diff --git a/src/main/scala/com/microsoft/partnercatalyst/fortis/spark/dto/AnalyzedItem.scala b/src/main/scala/com/microsoft/partnercatalyst/fortis/spark/dto/AnalyzedItem.scala index 1a4c507..0657f10 100644 --- a/src/main/scala/com/microsoft/partnercatalyst/fortis/spark/dto/AnalyzedItem.scala +++ b/src/main/scala/com/microsoft/partnercatalyst/fortis/spark/dto/AnalyzedItem.scala @@ -1,9 +1,11 @@ package com.microsoft.partnercatalyst.fortis.spark.dto case class AnalyzedItem( + createdAtEpoch: Long, body: String, title: String, - source: String, + publisher: String, + sourceUrl: String, sharedLocations: List[Location] = List(), analysis: Analysis ) diff --git a/src/main/scala/com/microsoft/partnercatalyst/fortis/spark/pipeline/BingPipeline.scala b/src/main/scala/com/microsoft/partnercatalyst/fortis/spark/pipeline/BingPipeline.scala index 9cadbf9..fd72e60 100644 --- a/src/main/scala/com/microsoft/partnercatalyst/fortis/spark/pipeline/BingPipeline.scala +++ b/src/main/scala/com/microsoft/partnercatalyst/fortis/spark/pipeline/BingPipeline.scala @@ -1,5 +1,7 @@ package com.microsoft.partnercatalyst.fortis.spark.pipeline +import java.time.Instant.now + import com.github.catalystcode.fortis.spark.streaming.bing.dto.BingPost import com.microsoft.partnercatalyst.fortis.spark.dto.{Analysis, AnalyzedItem} import com.microsoft.partnercatalyst.fortis.spark.streamprovider.{ConnectorConfig, StreamProvider} @@ -15,9 +17,11 @@ object BingPipeline extends Pipeline { private def convertToSchema(stream: DStream[BingPost], transformContext: TransformContext): DStream[AnalyzedItem] = { stream.map(post => AnalyzedItem( + createdAtEpoch = now.getEpochSecond, body = post.snippet, title = post.name, - source = post.url, + publisher = "Bing", + sourceUrl = post.url, analysis = Analysis() )) } diff --git a/src/main/scala/com/microsoft/partnercatalyst/fortis/spark/pipeline/FacebookPipeline.scala b/src/main/scala/com/microsoft/partnercatalyst/fortis/spark/pipeline/FacebookPipeline.scala index 5f6d0c6..976aae2 100644 --- a/src/main/scala/com/microsoft/partnercatalyst/fortis/spark/pipeline/FacebookPipeline.scala +++ b/src/main/scala/com/microsoft/partnercatalyst/fortis/spark/pipeline/FacebookPipeline.scala @@ -1,5 +1,7 @@ package com.microsoft.partnercatalyst.fortis.spark.pipeline +import java.time.Instant.now + import com.github.catalystcode.fortis.spark.streaming.facebook.dto.FacebookPost import com.microsoft.partnercatalyst.fortis.spark.dto.{Analysis, AnalyzedItem} import com.microsoft.partnercatalyst.fortis.spark.streamprovider.{ConnectorConfig, StreamProvider} @@ -17,9 +19,11 @@ object FacebookPipeline extends Pipeline { import transformContext._ stream.map(post => AnalyzedItem( + createdAtEpoch = now.getEpochSecond, body = post.post.getMessage, title = "", - source = post.post.getPermalinkUrl.toString, + publisher = "Facebook", + sourceUrl = post.post.getPermalinkUrl.toString, sharedLocations = Option(post.post.getPlace).map(_.getLocation) match { case Some(location) => locationsExtractor.fetch(location.getLatitude, location.getLongitude).toList case None => List()}, diff --git a/src/main/scala/com/microsoft/partnercatalyst/fortis/spark/pipeline/InstagramPipeline.scala b/src/main/scala/com/microsoft/partnercatalyst/fortis/spark/pipeline/InstagramPipeline.scala index 23c7ec8..fadf6bb 100644 --- a/src/main/scala/com/microsoft/partnercatalyst/fortis/spark/pipeline/InstagramPipeline.scala +++ b/src/main/scala/com/microsoft/partnercatalyst/fortis/spark/pipeline/InstagramPipeline.scala @@ -1,5 +1,7 @@ package com.microsoft.partnercatalyst.fortis.spark.pipeline +import java.time.Instant.now + import com.github.catalystcode.fortis.spark.streaming.instagram.dto.InstagramItem import com.microsoft.partnercatalyst.fortis.spark.dto.AnalyzedItem import com.microsoft.partnercatalyst.fortis.spark.streamprovider.{ConnectorConfig, StreamProvider} @@ -16,6 +18,7 @@ object InstagramPipeline extends Pipeline { // do computer vision analysis val analysis = imageAnalyzer.analyze(instagram.images.standard_resolution.url) AnalyzedItem( + createdAtEpoch = now.getEpochSecond, body = analysis.summary.getOrElse(""), title = instagram.caption.text, sharedLocations = instagram.location match { @@ -23,7 +26,8 @@ object InstagramPipeline extends Pipeline { case None => List()}, analysis = analysis.copy( keywords = analysis.keywords.filter(tag => targetKeywords.contains(tag.name))), - source = instagram.link) + publisher = "Instagram", + sourceUrl = instagram.link) }) .map(analyzedItem => { // keyword extraction diff --git a/src/main/scala/com/microsoft/partnercatalyst/fortis/spark/pipeline/RadioPipeline.scala b/src/main/scala/com/microsoft/partnercatalyst/fortis/spark/pipeline/RadioPipeline.scala index 2147d8e..9faff3c 100644 --- a/src/main/scala/com/microsoft/partnercatalyst/fortis/spark/pipeline/RadioPipeline.scala +++ b/src/main/scala/com/microsoft/partnercatalyst/fortis/spark/pipeline/RadioPipeline.scala @@ -1,5 +1,7 @@ package com.microsoft.partnercatalyst.fortis.spark.pipeline +import java.time.Instant.now + import com.microsoft.partnercatalyst.fortis.spark.dto.{Analysis, AnalyzedItem} import com.microsoft.partnercatalyst.fortis.spark.streamprovider.{ConnectorConfig, StreamProvider} import com.microsoft.partnercatalyst.fortis.spark.streamwrappers.radio.RadioTranscription @@ -15,9 +17,11 @@ object RadioPipeline extends Pipeline { private def convertToSchema(stream: DStream[RadioTranscription], transformContext: TransformContext): DStream[AnalyzedItem] = { stream.map(transcription => AnalyzedItem( + createdAtEpoch = now.getEpochSecond, body = transcription.text, title = "", - source = transcription.radioUrl, + publisher = "Radio", + sourceUrl = transcription.radioUrl, analysis = Analysis( language = Some(transcription.language) ))) diff --git a/src/main/scala/com/microsoft/partnercatalyst/fortis/spark/pipeline/TadawebPipeline.scala b/src/main/scala/com/microsoft/partnercatalyst/fortis/spark/pipeline/TadawebPipeline.scala index 7a0a029..8bb0e80 100644 --- a/src/main/scala/com/microsoft/partnercatalyst/fortis/spark/pipeline/TadawebPipeline.scala +++ b/src/main/scala/com/microsoft/partnercatalyst/fortis/spark/pipeline/TadawebPipeline.scala @@ -1,5 +1,7 @@ package com.microsoft.partnercatalyst.fortis.spark.pipeline +import java.time.Instant.now + import com.microsoft.partnercatalyst.fortis.spark.dto.{Analysis, AnalyzedItem} import com.microsoft.partnercatalyst.fortis.spark.streamprovider.{ConnectorConfig, StreamProvider} import com.microsoft.partnercatalyst.fortis.spark.tadaweb.dto.TadawebEvent @@ -17,9 +19,11 @@ object TadawebPipeline extends Pipeline { import transformContext._ stream.map(tada => AnalyzedItem( + createdAtEpoch = now.getEpochSecond, body = tada.text, title = tada.title, - source = tada.tada.name, + publisher = "TadaWeb", + sourceUrl = tada.tada.name, sharedLocations = tada.cities.flatMap(city => city.coordinates match { case Seq(latitude, longitude) => locationsExtractor.fetch(latitude, longitude) case _ => None diff --git a/src/main/scala/com/microsoft/partnercatalyst/fortis/spark/pipeline/TwitterPipeline.scala b/src/main/scala/com/microsoft/partnercatalyst/fortis/spark/pipeline/TwitterPipeline.scala index f119f53..777016f 100644 --- a/src/main/scala/com/microsoft/partnercatalyst/fortis/spark/pipeline/TwitterPipeline.scala +++ b/src/main/scala/com/microsoft/partnercatalyst/fortis/spark/pipeline/TwitterPipeline.scala @@ -1,5 +1,7 @@ package com.microsoft.partnercatalyst.fortis.spark.pipeline +import java.time.Instant.now + import com.microsoft.partnercatalyst.fortis.spark.dto.{Analysis, AnalyzedItem} import com.microsoft.partnercatalyst.fortis.spark.streamprovider.{ConnectorConfig, StreamProvider} import org.apache.spark.streaming.StreamingContext @@ -17,9 +19,11 @@ object TwitterPipeline extends Pipeline { import transformContext._ stream.map(tweet => AnalyzedItem( + createdAtEpoch = now.getEpochSecond, body = tweet.getText, title = "", - source = s"https://twitter.com/statuses/${tweet.getId}", + publisher = "Twitter", + sourceUrl = s"https://twitter.com/statuses/${tweet.getId}", sharedLocations = Option(tweet.getGeoLocation) match { case Some(location) => locationsExtractor.fetch(location.getLatitude, location.getLongitude).toList case None => List()},