Skip to content
This repository has been archived by the owner on Mar 7, 2018. It is now read-only.

Commit

Permalink
Merge pull request #27 from CatalystCode/add-publisher
Browse files Browse the repository at this point in the history
Add more fields required for Cassandra schema
  • Loading branch information
c-w committed Jun 23, 2017
2 parents 6dc0dcd + 2730c8c commit 0716350
Show file tree
Hide file tree
Showing 7 changed files with 33 additions and 7 deletions.
Original file line number Diff line number Diff line change
@@ -1,9 +1,11 @@
package com.microsoft.partnercatalyst.fortis.spark.dto

// Canonical record emitted by every ingest pipeline (Bing, Facebook, Instagram,
// Radio, Tadaweb, Twitter) before persistence to the Cassandra schema.
// NOTE(review): this is a rendered diff — `source` is the pre-change field while
// `createdAtEpoch`/`publisher`/`sourceUrl` are the additions; the final class
// most likely does NOT contain both `source` and `sourceUrl` — confirm against
// the merged file.
case class AnalyzedItem(
createdAtEpoch: Long, // seconds since the Unix epoch; callers pass Instant.now.getEpochSecond (ingestion time, not the item's own publish time)
body: String, // main text content (post message, transcription text, search snippet, ...)
title: String, // item title; pipelines pass "" where the source has none (e.g. tweets, radio)
source: String, // pre-change field: URL or name identifying where the item came from (superseded by sourceUrl in this change)
publisher: String, // constant per pipeline, e.g. "Bing", "Facebook", "Twitter", "Radio"
sourceUrl: String, // link back to the original item at its origin
sharedLocations: List[Location] = List(), // resolved geo locations; empty when none could be extracted
analysis: Analysis // NLP/CV analysis results; may be an empty Analysis() placeholder filled in later
)
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@
package com.microsoft.partnercatalyst.fortis.spark.pipeline

import java.time.Instant.now

import com.github.catalystcode.fortis.spark.streaming.bing.dto.BingPost
import com.microsoft.partnercatalyst.fortis.spark.dto.{Analysis, AnalyzedItem}
import com.microsoft.partnercatalyst.fortis.spark.streamprovider.{ConnectorConfig, StreamProvider}
Expand All @@ -15,9 +17,11 @@ object BingPipeline extends Pipeline {

/**
 * Maps raw Bing search results into the pipeline-wide AnalyzedItem schema.
 *
 * No analysis is performed at this stage — an empty Analysis() is attached
 * and presumably populated by later pipeline steps (not visible here).
 * `transformContext` is accepted for signature parity with the other
 * pipelines but is unused in this method.
 *
 * NOTE(review): rendered diff — `source = post.url` is the removed line and
 * `publisher`/`sourceUrl` are the added ones; the merged method should not
 * contain both `source` and `sourceUrl` — confirm against the merged file.
 */
private def convertToSchema(stream: DStream[BingPost], transformContext: TransformContext): DStream[AnalyzedItem] = {
stream.map(post => AnalyzedItem(
createdAtEpoch = now.getEpochSecond, // ingestion time; Bing posts' own timestamp is not used here
body = post.snippet,
title = post.name,
source = post.url,
publisher = "Bing", // constant publisher tag for this connector
sourceUrl = post.url,
analysis = Analysis() // placeholder; downstream steps fill in analysis
))
}
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@
package com.microsoft.partnercatalyst.fortis.spark.pipeline

import java.time.Instant.now

import com.github.catalystcode.fortis.spark.streaming.facebook.dto.FacebookPost
import com.microsoft.partnercatalyst.fortis.spark.dto.{Analysis, AnalyzedItem}
import com.microsoft.partnercatalyst.fortis.spark.streamprovider.{ConnectorConfig, StreamProvider}
Expand All @@ -17,9 +19,11 @@ object FacebookPipeline extends Pipeline {
import transformContext._

stream.map(post => AnalyzedItem(
createdAtEpoch = now.getEpochSecond,
body = post.post.getMessage,
title = "",
source = post.post.getPermalinkUrl.toString,
publisher = "Facebook",
sourceUrl = post.post.getPermalinkUrl.toString,
sharedLocations = Option(post.post.getPlace).map(_.getLocation) match {
case Some(location) => locationsExtractor.fetch(location.getLatitude, location.getLongitude).toList
case None => List()},
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@
package com.microsoft.partnercatalyst.fortis.spark.pipeline

import java.time.Instant.now

import com.github.catalystcode.fortis.spark.streaming.instagram.dto.InstagramItem
import com.microsoft.partnercatalyst.fortis.spark.dto.AnalyzedItem
import com.microsoft.partnercatalyst.fortis.spark.streamprovider.{ConnectorConfig, StreamProvider}
Expand All @@ -16,14 +18,16 @@ object InstagramPipeline extends Pipeline {
// do computer vision analysis
val analysis = imageAnalyzer.analyze(instagram.images.standard_resolution.url)
AnalyzedItem(
createdAtEpoch = now.getEpochSecond,
body = analysis.summary.getOrElse(""),
title = instagram.caption.text,
sharedLocations = instagram.location match {
case Some(location) => locationsExtractor.fetch(location.latitude, location.longitude).toList
case None => List()},
analysis = analysis.copy(
keywords = analysis.keywords.filter(tag => targetKeywords.contains(tag.name))),
source = instagram.link)
publisher = "Instagram",
sourceUrl = instagram.link)
})
.map(analyzedItem => {
// keyword extraction
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@
package com.microsoft.partnercatalyst.fortis.spark.pipeline

import java.time.Instant.now

import com.microsoft.partnercatalyst.fortis.spark.dto.{Analysis, AnalyzedItem}
import com.microsoft.partnercatalyst.fortis.spark.streamprovider.{ConnectorConfig, StreamProvider}
import com.microsoft.partnercatalyst.fortis.spark.streamwrappers.radio.RadioTranscription
Expand All @@ -15,9 +17,11 @@ object RadioPipeline extends Pipeline {

private def convertToSchema(stream: DStream[RadioTranscription], transformContext: TransformContext): DStream[AnalyzedItem] = {
stream.map(transcription => AnalyzedItem(
createdAtEpoch = now.getEpochSecond,
body = transcription.text,
title = "",
source = transcription.radioUrl,
publisher = "Radio",
sourceUrl = transcription.radioUrl,
analysis = Analysis(
language = Some(transcription.language)
)))
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@
package com.microsoft.partnercatalyst.fortis.spark.pipeline

import java.time.Instant.now

import com.microsoft.partnercatalyst.fortis.spark.dto.{Analysis, AnalyzedItem}
import com.microsoft.partnercatalyst.fortis.spark.streamprovider.{ConnectorConfig, StreamProvider}
import com.microsoft.partnercatalyst.fortis.spark.tadaweb.dto.TadawebEvent
Expand All @@ -17,9 +19,11 @@ object TadawebPipeline extends Pipeline {
import transformContext._

stream.map(tada => AnalyzedItem(
createdAtEpoch = now.getEpochSecond,
body = tada.text,
title = tada.title,
source = tada.tada.name,
publisher = "TadaWeb",
sourceUrl = tada.tada.name,
sharedLocations = tada.cities.flatMap(city => city.coordinates match {
case Seq(latitude, longitude) => locationsExtractor.fetch(latitude, longitude)
case _ => None
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@
package com.microsoft.partnercatalyst.fortis.spark.pipeline

import java.time.Instant.now

import com.microsoft.partnercatalyst.fortis.spark.dto.{Analysis, AnalyzedItem}
import com.microsoft.partnercatalyst.fortis.spark.streamprovider.{ConnectorConfig, StreamProvider}
import org.apache.spark.streaming.StreamingContext
Expand All @@ -17,9 +19,11 @@ object TwitterPipeline extends Pipeline {
import transformContext._

stream.map(tweet => AnalyzedItem(
createdAtEpoch = now.getEpochSecond,
body = tweet.getText,
title = "",
source = s"https://twitter.com/statuses/${tweet.getId}",
publisher = "Twitter",
sourceUrl = s"https://twitter.com/statuses/${tweet.getId}",
sharedLocations = Option(tweet.getGeoLocation) match {
case Some(location) => locationsExtractor.fetch(location.getLatitude, location.getLongitude).toList
case None => List()},
Expand Down

0 comments on commit 0716350

Please sign in to comment.