diff --git a/src/main/scala/FacebookDemo.scala b/src/main/scala/FacebookDemo.scala index 59ef7f0..9c71656 100644 --- a/src/main/scala/FacebookDemo.scala +++ b/src/main/scala/FacebookDemo.scala @@ -6,7 +6,7 @@ object FacebookDemo { val mode = args.headOption.getOrElse("") // configure page for which to ingest posts - val pageId = "aljazeera" + val pageIds = List("aljazeera", "TheEconomist") // configure interaction with facebook api val auth = FacebookAuth(accessToken = System.getenv("FACEBOOK_AUTH_TOKEN"), appId = System.getenv("FACEBOOK_APP_ID"), appSecret = System.getenv("FACEBOOK_APP_SECRET")) @@ -16,7 +16,7 @@ object FacebookDemo { Logger.getRootLogger.setLevel(Level.ERROR) Logger.getLogger("libfacebook").setLevel(Level.DEBUG) - if (mode.contains("standalone")) new FacebookDemoStandalone(pageId, auth).run() - if (mode.contains("spark")) new FacebookDemoSpark(pageId, auth).run() + if (mode.contains("standalone")) new FacebookDemoStandalone(pageIds.head, auth).run() + if (mode.contains("spark")) new FacebookDemoSpark(pageIds.toSet, auth).run() } } diff --git a/src/main/scala/FacebookDemoSpark.scala b/src/main/scala/FacebookDemoSpark.scala index 046b2ef..5f111c2 100644 --- a/src/main/scala/FacebookDemoSpark.scala +++ b/src/main/scala/FacebookDemoSpark.scala @@ -2,14 +2,14 @@ import com.github.catalystcode.fortis.spark.streaming.facebook.{FacebookAuth, Fa import org.apache.spark.streaming.{Seconds, StreamingContext} import org.apache.spark.{SparkConf, SparkContext} -class FacebookDemoSpark(pageId: String, auth: FacebookAuth) { +class FacebookDemoSpark(pageIds: Set[String], auth: FacebookAuth) { def run(): Unit = { // set up the spark context and streams val conf = new SparkConf().setAppName("Facebook Spark Streaming Demo Application") val sc = new SparkContext(conf) val ssc = new StreamingContext(sc, Seconds(1)) - FacebookUtils.createPageStream(ssc, auth, pageId).map(x => s"Post: ${x.post.getPermalinkUrl}").print() + FacebookUtils.createPageStreams(ssc, auth, pageIds).map(x => s"Post: ${x.post.getPermalinkUrl}").print() // run forever ssc.start() diff --git a/src/main/scala/com/github/catalystcode/fortis/spark/streaming/facebook/FacebookInputDStream.scala b/src/main/scala/com/github/catalystcode/fortis/spark/streaming/facebook/FacebookInputDStream.scala index 983d223..ed66c6d 100644 --- a/src/main/scala/com/github/catalystcode/fortis/spark/streaming/facebook/FacebookInputDStream.scala +++ b/src/main/scala/com/github/catalystcode/fortis/spark/streaming/facebook/FacebookInputDStream.scala @@ -2,7 +2,7 @@ package com.github.catalystcode.fortis.spark.streaming.facebook import java.util.Date -import com.github.catalystcode.fortis.spark.streaming.facebook.client.FacebookClient +import com.github.catalystcode.fortis.spark.streaming.facebook.client.FacebookPageClient import com.github.catalystcode.fortis.spark.streaming.facebook.dto.FacebookPost import com.github.catalystcode.fortis.spark.streaming.{PollingReceiver, PollingSchedule} import org.apache.spark.storage.StorageLevel @@ -11,7 +11,7 @@ import org.apache.spark.streaming.dstream.ReceiverInputDStream import org.apache.spark.streaming.receiver.Receiver private class FacebookReceiver( - client: FacebookClient, + clients: Set[FacebookPageClient], pollingSchedule: PollingSchedule, storageLevel: StorageLevel, pollingWorkers: Int @@ -20,10 +20,10 @@ private class FacebookReceiver( @volatile private var lastIngestedDate: Option[Date] = None override protected def poll(): Unit = { - client + clients.foreach(_ .loadNewFacebooks(lastIngestedDate) .filter(x => { - logDebug(s"Got facebook ${x.post.getPermalinkUrl} from time ${x.post.getCreatedTime} with ${x.comments.size} comments") + logDebug(s"Got facebook ${x.post.getPermalinkUrl} from page ${x.pageId} time ${x.post.getCreatedTime} with ${x.comments.size} comments") isNew(x) }) .foreach(x => { @@ -31,6 +31,7 @@ private class FacebookReceiver( store(x) markStored(x) }) + ) } private def isNew(item: FacebookPost) = { @@ -47,7 +48,7 @@ private class FacebookReceiver( class FacebookInputDStream( ssc: StreamingContext, - client: FacebookClient, + clients: Set[FacebookPageClient], pollingSchedule: PollingSchedule, pollingWorkers: Int, storageLevel: StorageLevel @@ -55,6 +56,6 @@ class FacebookInputDStream( override def getReceiver(): Receiver[FacebookPost] = { logDebug("Creating facebook receiver") - new FacebookReceiver(client, pollingSchedule, storageLevel, pollingWorkers) + new FacebookReceiver(clients, pollingSchedule, storageLevel, pollingWorkers) } } diff --git a/src/main/scala/com/github/catalystcode/fortis/spark/streaming/facebook/FacebookUtils.scala b/src/main/scala/com/github/catalystcode/fortis/spark/streaming/facebook/FacebookUtils.scala index e15d906..a4b18b1 100644 --- a/src/main/scala/com/github/catalystcode/fortis/spark/streaming/facebook/FacebookUtils.scala +++ b/src/main/scala/com/github/catalystcode/fortis/spark/streaming/facebook/FacebookUtils.scala @@ -10,10 +10,10 @@ import org.apache.spark.streaming.StreamingContext import org.apache.spark.streaming.dstream.ReceiverInputDStream object FacebookUtils { - def createPageStream( + def createPageStreams( ssc: StreamingContext, auth: FacebookAuth, - pageId: String, + pageIds: Set[String], fields: Set[String] = Set("message", "place", "caption", "from", "name", "comments"), pollingSchedule: PollingSchedule = PollingSchedule(30, TimeUnit.SECONDS), pollingWorkers: Int = 1, @@ -21,12 +21,24 @@ object FacebookUtils { ): ReceiverInputDStream[FacebookPost] = { new FacebookInputDStream( ssc = ssc, - client = new FacebookPageClient( + clients = pageIds.map(pageId => new FacebookPageClient( pageId = pageId, auth = auth, - fields = fields), + fields = fields)), pollingSchedule = pollingSchedule, pollingWorkers = pollingWorkers, storageLevel = storageLevel) } + + def createPageStream( + ssc: StreamingContext, + auth: FacebookAuth, + pageId: String, + fields: Set[String] = Set("message", "place", "caption", "from", "name", "comments"), + pollingSchedule: PollingSchedule = PollingSchedule(30, TimeUnit.SECONDS), + pollingWorkers: Int = 1, + storageLevel: StorageLevel = StorageLevel.MEMORY_ONLY + ): ReceiverInputDStream[FacebookPost] = { + createPageStreams(ssc, auth, Set(pageId), fields, pollingSchedule, pollingWorkers, storageLevel) + } } diff --git a/src/main/scala/com/github/catalystcode/fortis/spark/streaming/facebook/client/FacebookClient.scala b/src/main/scala/com/github/catalystcode/fortis/spark/streaming/facebook/client/FacebookClient.scala deleted file mode 100644 index f43186e..0000000 --- a/src/main/scala/com/github/catalystcode/fortis/spark/streaming/facebook/client/FacebookClient.scala +++ /dev/null @@ -1,75 +0,0 @@ -package com.github.catalystcode.fortis.spark.streaming.facebook.client - -import java.util -import java.util.Date - -import com.github.catalystcode.fortis.spark.streaming.facebook.dto.FacebookPost -import com.github.catalystcode.fortis.spark.streaming.facebook.{FacebookAuth, Logger} -import facebook4j._ -import facebook4j.auth.AccessToken - -import collection.JavaConverters._ -import scala.collection.mutable.ListBuffer - -@SerialVersionUID(100L) -abstract class FacebookClient( - auth: FacebookAuth, - fields: Set[String]) -extends Serializable with Logger { - - @transient protected lazy val facebook: Facebook = createFacebook() - @transient private lazy val defaultLookback = new Date(new Date().getTime - 2 * 3600000 /* two hours */) - @transient private lazy val defaultFields = Set("permalink_url", "created_time") - - def loadNewFacebooks(after: Option[Date] = None): Iterable[FacebookPost] = { - val allPosts = ListBuffer[FacebookPost]() - - try { - var posts = fetchFacebookResponse(after.getOrElse(defaultLookback)) - while (posts != null && posts.getPaging != null) { - posts.asScala.foreach(post => { - allPosts += FacebookPost(post, fetchComments(post)) - }) - logDebug(s"Got another page: ${Option(posts.getPaging.getNext).getOrElse("no")}") - posts = facebook.fetchNext(posts.getPaging) - } - } catch { - case fbex: FacebookException => - logError("Problem fetching response from Facebook", fbex) - } - - allPosts - } - - private def fetchComments(post: Post): Seq[Comment] = { - val allComments = new util.ArrayList[Comment]() - - try { - var comments = post.getComments - while (comments != null && comments.getPaging != null) { - allComments.addAll(comments) - logDebug(s"Got another comments page: ${Option(comments.getPaging.getNext).getOrElse("no")}") - comments = facebook.fetchNext(comments.getPaging) - } - } catch { - case fbex: FacebookException => - logError(s"Problem fetching comments from Facebook for post ${post.getPermalinkUrl}", fbex) - } - - allComments.asScala - } - - private def createFacebook(): Facebook = { - val facebook = new FacebookFactory().getInstance() - facebook.setOAuthAppId(auth.appId, auth.appSecret) - facebook.setOAuthAccessToken(new AccessToken(auth.accessToken, null)) - facebook - } - - protected def createReading(after: Date): Reading = { - // todo: reduce limit if we got a too-large-request error - new Reading().since(after).fields((defaultFields ++ fields).toArray : _*).limit(100) - } - - protected def fetchFacebookResponse(after: Date): ResponseList[Post] -} diff --git a/src/main/scala/com/github/catalystcode/fortis/spark/streaming/facebook/client/FacebookPageClient.scala b/src/main/scala/com/github/catalystcode/fortis/spark/streaming/facebook/client/FacebookPageClient.scala index 63aebc8..32b34a4 100644 --- a/src/main/scala/com/github/catalystcode/fortis/spark/streaming/facebook/client/FacebookPageClient.scala +++ b/src/main/scala/com/github/catalystcode/fortis/spark/streaming/facebook/client/FacebookPageClient.scala @@ -1,16 +1,78 @@ package com.github.catalystcode.fortis.spark.streaming.facebook.client + +import java.util import java.util.Date -import com.github.catalystcode.fortis.spark.streaming.facebook.FacebookAuth -import facebook4j.{Post, ResponseList} +import com.github.catalystcode.fortis.spark.streaming.facebook.dto.FacebookPost +import com.github.catalystcode.fortis.spark.streaming.facebook.{FacebookAuth, Logger} +import facebook4j._ +import facebook4j.auth.AccessToken + +import collection.JavaConverters._ +import scala.collection.mutable.ListBuffer +@SerialVersionUID(100L) class FacebookPageClient( pageId: String, auth: FacebookAuth, fields: Set[String]) -extends FacebookClient(auth, fields) { +extends Serializable with Logger { + + @transient protected lazy val facebook: Facebook = createFacebook() + @transient private lazy val defaultLookback = new Date(new Date().getTime - 2 * 3600000 /* two hours */) + @transient private lazy val defaultFields = Set("permalink_url", "created_time") + + def loadNewFacebooks(after: Option[Date] = None): Iterable[FacebookPost] = { + val allPosts = ListBuffer[FacebookPost]() + + try { + var posts = fetchFacebookResponse(after.getOrElse(defaultLookback)) + while (posts != null && posts.getPaging != null) { + posts.asScala.foreach(post => { + allPosts += FacebookPost(pageId, post, fetchComments(post)) + }) + logDebug(s"Got another page: ${Option(posts.getPaging.getNext).getOrElse("no")}") + posts = facebook.fetchNext(posts.getPaging) + } + } catch { + case fbex: FacebookException => + logError("Problem fetching response from Facebook", fbex) + } + + allPosts + } + + private def fetchComments(post: Post): Seq[Comment] = { + val allComments = new util.ArrayList[Comment]() + + try { + var comments = post.getComments + while (comments != null && comments.getPaging != null) { + allComments.addAll(comments) + logDebug(s"Got another comments page: ${Option(comments.getPaging.getNext).getOrElse("no")}") + comments = facebook.fetchNext(comments.getPaging) + } + } catch { + case fbex: FacebookException => + logError(s"Problem fetching comments from Facebook for post ${post.getPermalinkUrl}", fbex) + } + + allComments.asScala + } + + private def createFacebook(): Facebook = { + val facebook = new FacebookFactory().getInstance() + facebook.setOAuthAppId(auth.appId, auth.appSecret) + facebook.setOAuthAccessToken(new AccessToken(auth.accessToken, null)) + facebook + } + + protected def createReading(after: Date): Reading = { + // todo: reduce limit if we got a too-large-request error + new Reading().since(after).fields((defaultFields ++ fields).toArray : _*).limit(100) + } - override def fetchFacebookResponse(after: Date): ResponseList[Post] = { + protected def fetchFacebookResponse(after: Date): ResponseList[Post] = { logDebug(s"Fetching posts for $pageId since $after") facebook.getPosts(pageId, createReading(after)) } diff --git a/src/main/scala/com/github/catalystcode/fortis/spark/streaming/facebook/dto/FacebookPost.scala b/src/main/scala/com/github/catalystcode/fortis/spark/streaming/facebook/dto/FacebookPost.scala index 4dbbad5..9fde1f1 100644 --- a/src/main/scala/com/github/catalystcode/fortis/spark/streaming/facebook/dto/FacebookPost.scala +++ b/src/main/scala/com/github/catalystcode/fortis/spark/streaming/facebook/dto/FacebookPost.scala @@ -3,6 +3,7 @@ package com.github.catalystcode.fortis.spark.streaming.facebook.dto import facebook4j.{Comment, Post} case class FacebookPost( + pageId: String, post: Post, comments: Seq[Comment] ) diff --git a/version.sbt b/version.sbt index a1c3c31..7a30cbf 100644 --- a/version.sbt +++ b/version.sbt @@ -1 +1 @@ -version := "0.0.1" +version := "0.0.2"