This repository has been archived by the owner on Mar 7, 2018. It is now read-only.

Add more fields required for Cassandra schema #27

Merged · 3 commits · Jun 23, 2017
@@ -1,9 +1,11 @@
 package com.microsoft.partnercatalyst.fortis.spark.dto

 case class AnalyzedItem(
+  createdAtEpoch: Long,
   body: String,
   title: String,
-  source: String,
+  publisher: String,
+  sourceUrl: String,
   sharedLocations: List[Location] = List(),
   analysis: Analysis
 )
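
For reference, a minimal sketch of constructing the updated AnalyzedItem with the new fields (the literal values below are placeholders for illustration, not taken from this PR):

import java.time.Instant.now

import com.microsoft.partnercatalyst.fortis.spark.dto.{Analysis, AnalyzedItem}

// sharedLocations is omitted here and falls back to its List() default.
val item = AnalyzedItem(
  createdAtEpoch = now.getEpochSecond,
  body = "Sample body text",
  title = "Sample title",
  publisher = "Example",
  sourceUrl = "https://example.com/posts/1",
  analysis = Analysis()
)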
@@ -1,5 +1,7 @@
 package com.microsoft.partnercatalyst.fortis.spark.pipeline

+import java.time.Instant.now
+
 import com.github.catalystcode.fortis.spark.streaming.bing.dto.BingPost
 import com.microsoft.partnercatalyst.fortis.spark.dto.{Analysis, AnalyzedItem}
 import com.microsoft.partnercatalyst.fortis.spark.streamprovider.{ConnectorConfig, StreamProvider}
@@ -15,9 +17,11 @@ object BingPipeline extends Pipeline {

   private def convertToSchema(stream: DStream[BingPost], transformContext: TransformContext): DStream[AnalyzedItem] = {
     stream.map(post => AnalyzedItem(
+      createdAtEpoch = now.getEpochSecond,
Contributor:

Question: Do these incoming posts have a date of their own? If so, do we prefer to use our own capture time as opposed to the timestamp in the post?

c-w (Contributor, Author) replied on Jun 23, 2017:

They have a crawl-date but not a post-date (BingPost.scala). Not sure if that's something that would be more useful to us than a timestamp for which we definitely know what it means.
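
If the crawl-date ever turns out to be preferable, a minimal sketch of a fallback strategy (the Option[String] crawl-date input and its ISO-8601 format are assumptions; BingPost.scala defines the actual field):

import java.time.Instant
import scala.util.Try

// Hypothetical helper: use the post-supplied date when it parses,
// otherwise fall back to our own capture time.
def resolveCreatedAtEpoch(crawlDate: Option[String]): Long =
  crawlDate
    .flatMap(s => Try(Instant.parse(s)).toOption)
    .map(_.getEpochSecond)
    .getOrElse(Instant.now.getEpochSecond)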

       body = post.snippet,
       title = post.name,
-      source = post.url,
+      publisher = "Bing",
+      sourceUrl = post.url,
       analysis = Analysis()
     ))
   }
@@ -1,5 +1,7 @@
 package com.microsoft.partnercatalyst.fortis.spark.pipeline

+import java.time.Instant.now
+
 import com.github.catalystcode.fortis.spark.streaming.facebook.dto.FacebookPost
 import com.microsoft.partnercatalyst.fortis.spark.dto.{Analysis, AnalyzedItem}
 import com.microsoft.partnercatalyst.fortis.spark.streamprovider.{ConnectorConfig, StreamProvider}
@@ -17,9 +19,11 @@ object FacebookPipeline extends Pipeline {
     import transformContext._

     stream.map(post => AnalyzedItem(
+      createdAtEpoch = now.getEpochSecond,
       body = post.post.getMessage,
       title = "",
-      source = post.post.getPermalinkUrl.toString,
+      publisher = "Facebook",
+      sourceUrl = post.post.getPermalinkUrl.toString,
       sharedLocations = Option(post.post.getPlace).map(_.getLocation) match {
         case Some(location) => locationsExtractor.fetch(location.getLatitude, location.getLongitude).toList
         case None => List()},
@@ -1,5 +1,7 @@
 package com.microsoft.partnercatalyst.fortis.spark.pipeline

+import java.time.Instant.now
+
 import com.github.catalystcode.fortis.spark.streaming.instagram.dto.InstagramItem
 import com.microsoft.partnercatalyst.fortis.spark.dto.AnalyzedItem
 import com.microsoft.partnercatalyst.fortis.spark.streamprovider.{ConnectorConfig, StreamProvider}
@@ -16,14 +18,16 @@ object InstagramPipeline extends Pipeline {
       // do computer vision analysis
       val analysis = imageAnalyzer.analyze(instagram.images.standard_resolution.url)
       AnalyzedItem(
+        createdAtEpoch = now.getEpochSecond,
         body = analysis.summary.getOrElse(""),
         title = instagram.caption.text,
         sharedLocations = instagram.location match {
           case Some(location) => locationsExtractor.fetch(location.latitude, location.longitude).toList
           case None => List()},
         analysis = analysis.copy(
           keywords = analysis.keywords.filter(tag => targetKeywords.contains(tag.name))),
-        source = instagram.link)
+        publisher = "Instagram",
+        sourceUrl = instagram.link)
     })
       .map(analyzedItem => {
         // keyword extraction
@@ -1,5 +1,7 @@
 package com.microsoft.partnercatalyst.fortis.spark.pipeline

+import java.time.Instant.now
+
 import com.microsoft.partnercatalyst.fortis.spark.dto.{Analysis, AnalyzedItem}
 import com.microsoft.partnercatalyst.fortis.spark.streamprovider.{ConnectorConfig, StreamProvider}
 import com.microsoft.partnercatalyst.fortis.spark.streamwrappers.radio.RadioTranscription
@@ -15,9 +17,11 @@ object RadioPipeline extends Pipeline {

   private def convertToSchema(stream: DStream[RadioTranscription], transformContext: TransformContext): DStream[AnalyzedItem] = {
     stream.map(transcription => AnalyzedItem(
+      createdAtEpoch = now.getEpochSecond,
       body = transcription.text,
       title = "",
-      source = transcription.radioUrl,
+      publisher = "Radio",
+      sourceUrl = transcription.radioUrl,
       analysis = Analysis(
         language = Some(transcription.language)
       )))
@@ -1,5 +1,7 @@
 package com.microsoft.partnercatalyst.fortis.spark.pipeline

+import java.time.Instant.now
+
 import com.microsoft.partnercatalyst.fortis.spark.dto.{Analysis, AnalyzedItem}
 import com.microsoft.partnercatalyst.fortis.spark.streamprovider.{ConnectorConfig, StreamProvider}
 import com.microsoft.partnercatalyst.fortis.spark.tadaweb.dto.TadawebEvent
@@ -17,9 +19,11 @@ object TadawebPipeline extends Pipeline {
     import transformContext._

     stream.map(tada => AnalyzedItem(
+      createdAtEpoch = now.getEpochSecond,
       body = tada.text,
       title = tada.title,
-      source = tada.tada.name,
+      publisher = "TadaWeb",
+      sourceUrl = tada.tada.name,
       sharedLocations = tada.cities.flatMap(city => city.coordinates match {
         case Seq(latitude, longitude) => locationsExtractor.fetch(latitude, longitude)
         case _ => None
@@ -1,5 +1,7 @@
 package com.microsoft.partnercatalyst.fortis.spark.pipeline

+import java.time.Instant.now
+
 import com.microsoft.partnercatalyst.fortis.spark.dto.{Analysis, AnalyzedItem}
 import com.microsoft.partnercatalyst.fortis.spark.streamprovider.{ConnectorConfig, StreamProvider}
 import org.apache.spark.streaming.StreamingContext
@@ -17,9 +19,11 @@ object TwitterPipeline extends Pipeline {
     import transformContext._

     stream.map(tweet => AnalyzedItem(
+      createdAtEpoch = now.getEpochSecond,
       body = tweet.getText,
       title = "",
-      source = s"https://twitter.com/statuses/${tweet.getId}",
+      publisher = "Twitter",
+      sourceUrl = s"https://twitter.com/statuses/${tweet.getId}",
       sharedLocations = Option(tweet.getGeoLocation) match {
         case Some(location) => locationsExtractor.fetch(location.getLatitude, location.getLongitude).toList
         case None => List()},
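
Not shown in this diff, but for context on where these fields are headed: a sketch of persisting the enriched stream with the Spark Cassandra connector (the keyspace and table names are assumptions, and the Cassandra columns would need to match the AnalyzedItem fields added here):

import com.datastax.spark.connector.streaming._
import com.microsoft.partnercatalyst.fortis.spark.dto.AnalyzedItem
import org.apache.spark.streaming.dstream.DStream

// Assumed keyspace/table names, for illustration only.
def sink(stream: DStream[AnalyzedItem]): Unit =
  stream.saveToCassandra("fortis", "analyzed_items")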