This repository has been archived by the owner on Mar 7, 2018. It is now read-only.

Implement custom event pipeline
c-w committed Jun 28, 2017
1 parent c2fc843 · commit 36790f8
Showing 7 changed files with 84 additions and 0 deletions.
README.md: 5 additions & 0 deletions
@@ -38,6 +38,11 @@ export TADAWEB_EH_POLICY_KEY="..."
export TADAWEB_EH_NAMESPACE="..."
export TADAWEB_EH_NAME="..."
export TADAWEB_EH_PARTITION_COUNT="..."
export CUSTOMEVENTS_EH_POLICY_NAME="..."
export CUSTOMEVENTS_EH_POLICY_KEY="..."
export CUSTOMEVENTS_EH_NAMESPACE="..."
export CUSTOMEVENTS_EH_NAME="..."
export CUSTOMEVENTS_EH_PARTITION_COUNT="..."
export EH_PROGRESS_DIR="..."

# compile scala, run tests, build fat jar
@@ -10,6 +10,7 @@ import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.{SparkConf, SparkContext}

import scala.reflect.runtime.universe.TypeTag
import scala.util.Properties.envOrElse

object ProjectFortis extends App {

@@ -70,6 +71,7 @@ object ProjectFortis extends App {
pipeline("facebook", new FacebookAnalyzer),
pipeline("instagram", new InstagramAnalyzer),
pipeline("tadaweb", new TadawebAnalyzer),
pipeline("customevents", new CustomEventAnalyzer),
pipeline("bing", new BingAnalyzer),
pipeline("radio", new RadioAnalyzer)
).flatten.reduceOption(_.union(_))
@@ -168,6 +170,19 @@ object ProjectFortis extends App {
"consumerGroup" -> "$Default"
)
)
),
"customevents" -> List(
ConnectorConfig(
"CustomEvents",
Map (
"policyName" -> envOrElse("CUSTOMEVENTS_EH_POLICY_NAME", "project-fortis-spark"),
"policyKey" -> System.getenv("CUSTOMEVENTS_EH_POLICY_KEY"),
"namespace" -> envOrElse("CUSTOMEVENTS_EH_NAMESPACE", "fortiscustomevents"),
"name" -> envOrElse("CUSTOMEVENTS_EH_NAME", "customevents"),
"partitionCount" -> envOrElse("CUSTOMEVENTS_EH_PARTITION_COUNT", "1"),
"consumerGroup" -> envOrElse("CUSTOMEVENTS_EH_CONSUMER_GROUP", "$Default")
)
)
)
)(pipeline)
}
@@ -4,6 +4,7 @@ import java.io.File

import com.microsoft.partnercatalyst.fortis.spark.streamfactories._
import com.microsoft.partnercatalyst.fortis.spark.streamprovider.StreamProvider
import com.microsoft.partnercatalyst.fortis.spark.streamwrappers.customevents.CustomEventsAdapter
import com.microsoft.partnercatalyst.fortis.spark.streamwrappers.tadaweb.TadawebAdapter

object StreamProviderFactory {
@@ -42,6 +43,12 @@ object StreamProviderFactory {
new File(settings.progressDir, Constants.EventHubProgressDir).getPath)
)
)
.withFactories(
List(
new EventHubStreamFactory("CustomEvents", CustomEventsAdapter.apply,
new File(settings.progressDir, Constants.EventHubProgressDir).getPath)
)
)

streamProvider
}
@@ -0,0 +1,23 @@
package com.microsoft.partnercatalyst.fortis.spark.analyzer

import java.time.Instant.now
import java.util.UUID.randomUUID

import com.microsoft.partnercatalyst.fortis.spark.streamwrappers.customevents.CustomEvent
import com.microsoft.partnercatalyst.fortis.spark.transforms.image.ImageAnalyzer

@SerialVersionUID(100L)
class CustomEventAnalyzer extends Analyzer[CustomEvent] with Serializable
with AnalysisDefaults.EnableAll[CustomEvent] {
override def toSchema(item: CustomEvent, locationFetcher: LocationFetcher, imageAnalyzer: ImageAnalyzer): ExtendedDetails[CustomEvent] = {
ExtendedDetails(
id = randomUUID(),
createdAtEpoch = now.getEpochSecond,
body = item.message,
title = item.title.getOrElse(""),
publisher = item.source.getOrElse("CustomEvent"),
sourceUrl = item.link.getOrElse(""),
original = item
)
}
}
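
A minimal sketch, not part of this commit, of how the fallbacks above behave when an event arrives without the optional fields (the field values here are made up):

// Sketch only: illustrates the getOrElse fallbacks applied by toSchema above.
val event = CustomEvent(
  RowKey = "row-1",
  created_at = "2017-06-28T00:00:00Z",
  featureCollection = CustomEventFeatureCollection("FeatureCollection", List.empty),
  message = "Road closed near the city center",
  language = "en",
  link = None,
  source = None,
  title = None)

event.title.getOrElse("")              // title falls back to ""
event.source.getOrElse("CustomEvent")  // publisher falls back to "CustomEvent"
event.link.getOrElse("")               // sourceUrl falls back to ""
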
@@ -57,5 +57,8 @@ class KryoRegistrator extends BaseKryoRegistrator {
kryo.register(classOf[com.microsoft.partnercatalyst.fortis.spark.analyzer.TwitterAnalyzer])
kryo.register(classOf[com.microsoft.partnercatalyst.fortis.spark.transforms.locations.LocationsExtractorFactory])
kryo.register(classOf[com.microsoft.partnercatalyst.fortis.spark.transforms.ZipModelsProvider])
kryo.register(classOf[com.microsoft.partnercatalyst.fortis.spark.streamwrappers.customevents.CustomEvent])
kryo.register(classOf[com.microsoft.partnercatalyst.fortis.spark.streamwrappers.customevents.CustomEventFeature])
kryo.register(classOf[com.microsoft.partnercatalyst.fortis.spark.streamwrappers.customevents.CustomEventFeatureCollection])
}
}
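
These registrations only matter if the job is configured to use Kryo with this registrator; a minimal sketch of the standard Spark settings involved (the registrator's fully qualified class name below is a placeholder, since its package is not shown in this diff):

import org.apache.spark.SparkConf

// Sketch only, not part of this commit: enable Kryo serialization and point it
// at a custom registrator. Replace the placeholder class name with the actual
// package of the KryoRegistrator above.
val conf = new SparkConf()
  .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
  .set("spark.kryo.registrator", "com.example.KryoRegistrator") // placeholder
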
@@ -0,0 +1,19 @@
package com.microsoft.partnercatalyst.fortis.spark.streamwrappers.customevents

case class CustomEventFeature(
`type`: String,
coordinates: List[Float])

case class CustomEventFeatureCollection(
`type`: String,
features: List[CustomEventFeature])

case class CustomEvent(
RowKey: String,
created_at: String,
featureCollection: CustomEventFeatureCollection,
message: String,
language: String,
link: Option[String],
source: Option[String],
title: Option[String])
@@ -0,0 +1,12 @@
package com.microsoft.partnercatalyst.fortis.spark.streamwrappers.customevents

import net.liftweb.json

import scala.util.Try

object CustomEventsAdapter {
def apply(input: String): Try[CustomEvent] = {
implicit val _ = json.DefaultFormats
Try(json.parse(input)).flatMap(body => Try(body.extract[CustomEvent]))
}
}
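
A minimal usage sketch, not part of this commit, showing the JSON shape the adapter expects; the field names mirror the CustomEvent case class and the values are made up:

// Sketch only: a hypothetical well-formed payload and how the adapter handles it.
val payload =
  """{
    |  "RowKey": "row-1",
    |  "created_at": "2017-06-28T00:00:00Z",
    |  "featureCollection": {
    |    "type": "FeatureCollection",
    |    "features": [{ "type": "Point", "coordinates": [6.13, 49.61] }]
    |  },
    |  "message": "Road closed near the city center",
    |  "language": "en",
    |  "link": "http://example.com/events/1",
    |  "source": "example",
    |  "title": "Road closure"
    |}""".stripMargin

CustomEventsAdapter(payload)  // Success(CustomEvent(...)); malformed JSON or a
                              // missing required field yields a Failure instead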
