This repository has been archived by the owner on Mar 7, 2018. It is now read-only.

Merge branch 'master' into rename-customevent
Smarker committed Aug 11, 2017
2 parents 89e8f14 + d675bbb commit fc94da4
Showing 9 changed files with 13 additions and 96 deletions.
77 changes: 1 addition & 76 deletions README.md
@@ -13,82 +13,7 @@ This project contains a Spark Streaming job that ingests data into the Fortis system
 3. Narrow down the stream of events based on user-defined geo-areas, target keywords and blacklisted terms.
 4. Perform trend detection and aggregate the metrics that back Project Fortis.
 
-At the end of the ingestion pipeline, we publish the events to Kafka from where any downstream processors or aggregators
-can consume the data. The schema of the data in Kafka is as follows:
-
-```json
-{
-  "title": "FortisEvent",
-  "type": "object",
-  "properties": {
-    "language": {
-      "type": "string"
-    },
-    "locations": {
-      "description": "The ids of all places mentioned in the event",
-      "type": "array",
-      "items": {
-        "description": "A Who's-On-First id",
-        "type": "string"
-      }
-    },
-    "sentiments": {
-      "type": "array",
-      "items": {
-        "description": "Neutral sentiment is 0.6, 0 is most negative, 1 is most positive.",
-        "type": "number",
-        "minimum": 0,
-        "maximum": 1
-      }
-    },
-    "keywords": {
-      "type": "array",
-      "items": {
-        "type": "string"
-      }
-    },
-    "entities": {
-      "type": "array",
-      "items": {
-        "type": "string"
-      }
-    },
-    "summary": {
-      "type": "string"
-    },
-    "id": {
-      "type": "string"
-    },
-    "createdAtEpoch": {
-      "type": "number"
-    },
-    "body": {
-      "type": "string"
-    },
-    "title": {
-      "type": "string"
-    },
-    "publisher": {
-      "type": "string"
-    },
-    "sourceUrl": {
-      "type": "string"
-    },
-    "sharedLocations": {
-      "description": "The ids of all places explicitly tagged in the event",
-      "type": "array",
-      "items": {
-        "description": "A Who's-On-First id",
-        "type": "string"
-      }
-    }
-  },
-  "required": [
-    "id",
-    "createdAtEpoch"
-  ]
-}
-```
+At the end of the ingestion pipeline, we publish the events and various aggregations to Cassandra.
 
 
 ## Development setup ##
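
The replacement text points downstream consumers at Cassandra instead of Kafka. A minimal consumer sketch under that model; the keyspace "fortis" appears in CassandraSchema later in this commit, while the "events" table name and connection host are illustrative assumptions:

```scala
import com.datastax.spark.connector._
import org.apache.spark.{SparkConf, SparkContext}

object FortisEventsReader extends App {
  // Connection host and table name are assumptions for illustration;
  // "fortis" matches CassandraSchema.KeyspaceName in this commit.
  val conf = new SparkConf()
    .setAppName("fortis-events-reader")
    .setIfMissing("spark.master", "local[*]")
    .set("spark.cassandra.connection.host", "127.0.0.1")
  val sc = new SparkContext(conf)

  val events = sc.cassandraTable("fortis", "events")
  println(s"ingested events: ${events.count()}")
}
```
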
3 changes: 1 addition & 2 deletions build.sbt
@@ -18,11 +18,10 @@ parallelExecution in Test := false
 
 // Dependencies provided by the Spark distro
 libraryDependencies ++= Seq(
-  "com.datastax.spark" %% "spark-cassandra-connector" % "2.0.2",
   "org.apache.spark" %% "spark-core" % sparkVersion,
   "org.apache.spark" %% "spark-sql" % sparkVersion,
   "org.apache.spark" %% "spark-streaming" % sparkVersion
-)
+).map(_ % "provided")
 
 // Bundled dependencies
 libraryDependencies ++= Seq(
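
Mapping `% "provided"` over the whole Seq marks every Spark dependency as provided in one place, so the assembled jar omits libraries that the Spark distro already ships. It is shorthand for the per-dependency form (sketch; sparkVersion is defined earlier in build.sbt):

```scala
// Equivalent per-dependency form of the mapped Seq above.
libraryDependencies ++= Seq(
  "org.apache.spark" %% "spark-core" % sparkVersion % "provided",
  "org.apache.spark" %% "spark-sql" % sparkVersion % "provided",
  "org.apache.spark" %% "spark-streaming" % sparkVersion % "provided"
)
```
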
@@ -3,11 +3,12 @@ package com.microsoft.partnercatalyst.fortis.spark
 object Constants {
   val SparkAppName = "project-fortis-spark"
   val SparkMasterDefault = "local[*]"
-  val SparkStreamingBatchSizeDefault = 1
+  val SparkStreamingBatchSizeDefault = 5
 
   val EventHubProgressDir = "eventhubProgress"
 
   object Env {
+    val SparkStreamingBatchSize = "FORTIS_STREAMING_DURATION_IN_SECONDS"
     val HighlyAvailableProgressDir = "HA_PROGRESS_DIR"
     val AppInsightsKey = "FORTIS_APPINSIGHTS_IKEY"
     val LanguageModelDir = "FORTIS_MODELS_DIRECTORY"
@@ -52,7 +52,7 @@ object ProjectFortis extends App {
   Logger.getLogger("liblocations").setLevel(Level.DEBUG)
 
   private def createStreamingContext(): StreamingContext = {
-    val batchDuration = Seconds(Constants.SparkStreamingBatchSizeDefault)
+    val batchDuration = Seconds(envOrElse(Constants.Env.SparkStreamingBatchSize, Constants.SparkStreamingBatchSizeDefault.toString).toLong)
     val conf = new SparkConf()
       .setAppName(Constants.SparkAppName)
       .setIfMissing("spark.master", Constants.SparkMasterDefault)
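
Together with the Constants change above, the batch interval is now read from the FORTIS_STREAMING_DURATION_IN_SECONDS environment variable, falling back to the new 5-second default. A sketch of the resolution logic, assuming envOrElse simply wraps sys.env.getOrElse (the helper's actual definition is not part of this diff):

```scala
import org.apache.spark.streaming.Seconds

// Assumed shape of the envOrElse helper used in createStreamingContext.
def envOrElse(name: String, default: String): String =
  sys.env.getOrElse(name, default)

// FORTIS_STREAMING_DURATION_IN_SECONDS=30 gives 30-second batches;
// when unset, this yields Seconds(5), the new default.
val batchDuration = Seconds(
  envOrElse("FORTIS_STREAMING_DURATION_IN_SECONDS", "5").toLong)
```
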
@@ -13,7 +13,7 @@ class RedditAnalyzer extends Analyzer[RedditObject] with Serializable
       body = item.data.description.getOrElse(""),
       title = item.data.title.getOrElse(""),
       externalsourceid = item.data.author.getOrElse(""),
-      pipelinekey = "Reddit",
+      pipelinekey = "reddit",
       sourceurl = item.data.url.getOrElse(""),
       original = item
     )
@@ -22,7 +22,7 @@ class CassandraConfigurationManager extends ConfigurationManager with Serializable
   }
 
     val pipelineConfigRows = sparkContext.cassandraTable[CassandraSchema.Table.Stream](CassandraSchema.KeyspaceName,
-      CassandraSchema.Table.StreamsName).collect()
+      CassandraSchema.Table.StreamsName).where("pipelinekey = ?", pipeline).collect()
 
     pipelineConfigRows.map(stream => {
       val trustedSources = connectorToTrustedSources.computeIfAbsent(stream.streamfactory, (fetchTrustedSources _).asJava)
@@ -46,7 +46,6 @@ class CassandraConfigurationManager extends ConfigurationManager with Serializable
     siteSettingRow match {
       case Some(row) =>
         SiteSettings(
-          id = row.id,
           siteName = row.sitename,
           geofence = Geofence(row.geofence(0), row.geofence(1), row.geofence(2), row.geofence(3)),
           languages = row.languages,
@@ -57,7 +56,7 @@ class CassandraConfigurationManager extends ConfigurationManager with Serializable
           cogSpeechSvcToken = row.cogspeechsvctoken,
           cogVisionSvcToken = row.cogvisionsvctoken,
           cogTextSvcToken = row.cogtextsvctoken,
-          insertionTime = row.insertionTime
+          insertiontime = row.insertiontime
         )
       case None =>
         val ex = new Exception(s"Table '${CassandraSchema.Table.SiteSettingsName}' must have at least 1 entry.")
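
The added where clause is pushed down to Cassandra as a CQL predicate, so only the requested pipeline's rows are fetched rather than collecting the entire streams table on the driver; since the match is exact, it also fits the lowercase "reddit" pipeline key above. A sketch, with the table name assumed:

```scala
import com.datastax.spark.connector._
import org.apache.spark.SparkContext

// "fortis" matches CassandraSchema.KeyspaceName; the "streams" table
// name is an assumption, and sc is an existing SparkContext configured
// for Cassandra. The where clause executes as a CQL WHERE on the server.
def streamConfigs(sc: SparkContext, pipeline: String) =
  sc.cassandraTable("fortis", "streams")
    .where("pipelinekey = ?", pipeline)
    .collect()
```
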
@@ -1,7 +1,5 @@
 package com.microsoft.partnercatalyst.fortis.spark.dba
 
-import java.util.UUID
-
 object CassandraSchema {
   val KeyspaceName = "fortis"
 
@@ -13,18 +11,17 @@ object CassandraSchema
     val TrustedSourcesName = "trustedsources"
 
     case class SiteSetting(
-      id: UUID,
       sitename: String,
       geofence: Seq[Double],
-      languages: Set[String],
+      languages: Seq[String],
       defaultzoom: Int,
       title: String,
       logo: String,
       translationsvctoken: String,
       cogspeechsvctoken: String,
       cogvisionsvctoken: String,
       cogtextsvctoken: String,
-      insertionTime: Long
+      insertiontime: Long
     )
 
     case class Stream(
@@ -1,18 +1,15 @@
 package com.microsoft.partnercatalyst.fortis.spark.dto
 
-import java.util.UUID
-
 case class SiteSettings(
-  id: UUID,
   siteName: String,
   geofence: Geofence,
-  languages: Set[String],
+  languages: Seq[String],
   defaultZoom: Int,
   title: String,
   logo: String,
   translationSvcToken: String,
   cogSpeechSvcToken: String,
   cogVisionSvcToken: String,
   cogTextSvcToken: String,
-  insertionTime: Long
+  insertiontime: Long
 )
@@ -76,7 +76,6 @@ case class PopularTopicAggregate(
 ) extends AggregationRecordTile with Serializable
 
 case class SiteSetting(
-  id: String,
   sitename: String,
   geofence: Seq[Double],
   languages: Set[String],
@@ -87,7 +86,7 @@ case class SiteSetting(
   cogspeechsvctoken: String,
   cogvisionsvctoken: String,
   cogtextsvctoken: String,
-  insertion_time: Long
+  insertiontime: Long
 )
 
 case class Stream(
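
The renames across the last three files (insertionTime and insertion_time both becoming insertiontime, plus the dropped id fields) appear to align the case classes with the actual Cassandra column names: the connector's default column mapper derives the expected column name from each field name, so a field without a matching column fails at read time. A hypothetical illustration:

```scala
import com.datastax.spark.connector._
import org.apache.spark.SparkContext

// Under the connector's default mapping, a camelCase field such as
// insertionTime is looked up as column "insertion_time"; a table whose
// column is literally "insertiontime" therefore needs the lowercase field.
case class SiteSettingRow(sitename: String, insertiontime: Long)

// "sitesettings" is an assumed table name; the keyspace is from CassandraSchema.
def readSiteSettings(sc: SparkContext): Array[SiteSettingRow] =
  sc.cassandraTable[SiteSettingRow]("fortis", "sitesettings").collect()
```
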
