Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 3 additions & 3 deletions src/main/scala/FacebookDemo.scala
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ object FacebookDemo {
val mode = args.headOption.getOrElse("")

// configure page for which to ingest posts
val pageId = "aljazeera"
val pageIds = List("aljazeera", "TheEconomist")

// configure interaction with facebook api
val auth = FacebookAuth(accessToken = System.getenv("FACEBOOK_AUTH_TOKEN"), appId = System.getenv("FACEBOOK_APP_ID"), appSecret = System.getenv("FACEBOOK_APP_SECRET"))
Expand All @@ -16,7 +16,7 @@ object FacebookDemo {
Logger.getRootLogger.setLevel(Level.ERROR)
Logger.getLogger("libfacebook").setLevel(Level.DEBUG)

if (mode.contains("standalone")) new FacebookDemoStandalone(pageId, auth).run()
if (mode.contains("spark")) new FacebookDemoSpark(pageId, auth).run()
if (mode.contains("standalone")) new FacebookDemoStandalone(pageIds.head, auth).run()
if (mode.contains("spark")) new FacebookDemoSpark(pageIds.toSet, auth).run()
}
}
4 changes: 2 additions & 2 deletions src/main/scala/FacebookDemoSpark.scala
Original file line number Diff line number Diff line change
Expand Up @@ -2,14 +2,14 @@ import com.github.catalystcode.fortis.spark.streaming.facebook.{FacebookAuth, Fa
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.{SparkConf, SparkContext}

class FacebookDemoSpark(pageId: String, auth: FacebookAuth) {
class FacebookDemoSpark(pageIds: Set[String], auth: FacebookAuth) {
def run(): Unit = {
// set up the spark context and streams
val conf = new SparkConf().setAppName("Facebook Spark Streaming Demo Application")
val sc = new SparkContext(conf)
val ssc = new StreamingContext(sc, Seconds(1))

FacebookUtils.createPageStream(ssc, auth, pageId).map(x => s"Post: ${x.post.getPermalinkUrl}").print()
FacebookUtils.createPageStreams(ssc, auth, pageIds).map(x => s"Post: ${x.post.getPermalinkUrl}").print()

// run forever
ssc.start()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ package com.github.catalystcode.fortis.spark.streaming.facebook

import java.util.Date

import com.github.catalystcode.fortis.spark.streaming.facebook.client.FacebookClient
import com.github.catalystcode.fortis.spark.streaming.facebook.client.FacebookPageClient
import com.github.catalystcode.fortis.spark.streaming.facebook.dto.FacebookPost
import com.github.catalystcode.fortis.spark.streaming.{PollingReceiver, PollingSchedule}
import org.apache.spark.storage.StorageLevel
Expand All @@ -11,7 +11,7 @@ import org.apache.spark.streaming.dstream.ReceiverInputDStream
import org.apache.spark.streaming.receiver.Receiver

private class FacebookReceiver(
client: FacebookClient,
clients: Set[FacebookPageClient],
pollingSchedule: PollingSchedule,
storageLevel: StorageLevel,
pollingWorkers: Int
Expand All @@ -20,17 +20,18 @@ private class FacebookReceiver(
@volatile private var lastIngestedDate: Option[Date] = None

override protected def poll(): Unit = {
client
clients.foreach(_
.loadNewFacebooks(lastIngestedDate)
.filter(x => {
logDebug(s"Got facebook ${x.post.getPermalinkUrl} from time ${x.post.getCreatedTime} with ${x.comments.size} comments")
logDebug(s"Got facebook ${x.post.getPermalinkUrl} from page ${x.pageId} time ${x.post.getCreatedTime} with ${x.comments.size} comments")
isNew(x)
})
.foreach(x => {
logInfo(s"Storing facebook ${x.post.getPermalinkUrl}")
store(x)
markStored(x)
})
)
}

private def isNew(item: FacebookPost) = {
Expand All @@ -47,14 +48,14 @@ private class FacebookReceiver(

class FacebookInputDStream(
ssc: StreamingContext,
client: FacebookClient,
clients: Set[FacebookPageClient],
pollingSchedule: PollingSchedule,
pollingWorkers: Int,
storageLevel: StorageLevel
) extends ReceiverInputDStream[FacebookPost](ssc) {

override def getReceiver(): Receiver[FacebookPost] = {
logDebug("Creating facebook receiver")
new FacebookReceiver(client, pollingSchedule, storageLevel, pollingWorkers)
new FacebookReceiver(clients, pollingSchedule, storageLevel, pollingWorkers)
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -10,23 +10,35 @@ import org.apache.spark.streaming.StreamingContext
import org.apache.spark.streaming.dstream.ReceiverInputDStream

object FacebookUtils {
def createPageStream(
def createPageStreams(
ssc: StreamingContext,
auth: FacebookAuth,
pageId: String,
pageIds: Set[String],
fields: Set[String] = Set("message", "place", "caption", "from", "name", "comments"),

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

let's remove comments as this should be populated from createPageCommentsStream

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is now implemented in #4.

pollingSchedule: PollingSchedule = PollingSchedule(30, TimeUnit.SECONDS),
pollingWorkers: Int = 1,
storageLevel: StorageLevel = StorageLevel.MEMORY_ONLY
): ReceiverInputDStream[FacebookPost] = {
new FacebookInputDStream(
ssc = ssc,
client = new FacebookPageClient(
clients = pageIds.map(pageId => new FacebookPageClient(
pageId = pageId,
auth = auth,
fields = fields),
fields = fields)),
pollingSchedule = pollingSchedule,
pollingWorkers = pollingWorkers,
storageLevel = storageLevel)
}

def createPageStream(
ssc: StreamingContext,
auth: FacebookAuth,
pageId: String,
fields: Set[String] = Set("message", "place", "caption", "from", "name", "comments"),
pollingSchedule: PollingSchedule = PollingSchedule(30, TimeUnit.SECONDS),
pollingWorkers: Int = 1,
storageLevel: StorageLevel = StorageLevel.MEMORY_ONLY
): ReceiverInputDStream[FacebookPost] = {
createPageStreams(ssc, auth, Set(pageId), fields, pollingSchedule, pollingWorkers, storageLevel)
}
}

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

we're going to need to expose a createPageCommentsStream utility to focus on newly posted comments since the previous processed time. This stream should crawl facebook4j getPostComments http://facebook4j.github.io/javadoc/facebook4j/api/PostMethods.html#getPostComments(java.lang.String, facebook4j.Reading)

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Okay. Will do in a follow-up PR, see #3 for tracking.

This file was deleted.

Original file line number Diff line number Diff line change
@@ -1,16 +1,78 @@
package com.github.catalystcode.fortis.spark.streaming.facebook.client

import java.util
import java.util.Date

import com.github.catalystcode.fortis.spark.streaming.facebook.FacebookAuth
import facebook4j.{Post, ResponseList}
import com.github.catalystcode.fortis.spark.streaming.facebook.dto.FacebookPost
import com.github.catalystcode.fortis.spark.streaming.facebook.{FacebookAuth, Logger}
import facebook4j._
import facebook4j.auth.AccessToken

import collection.JavaConverters._
import scala.collection.mutable.ListBuffer

@SerialVersionUID(100L)
class FacebookPageClient(
pageId: String,
auth: FacebookAuth,
fields: Set[String])
extends FacebookClient(auth, fields) {
extends Serializable with Logger {

@transient protected lazy val facebook: Facebook = createFacebook()
@transient private lazy val defaultLookback = new Date(new Date().getTime - 2 * 3600000 /* two hours */)
@transient private lazy val defaultFields = Set("permalink_url", "created_time")

def loadNewFacebooks(after: Option[Date] = None): Iterable[FacebookPost] = {
val allPosts = ListBuffer[FacebookPost]()

try {
var posts = fetchFacebookResponse(after.getOrElse(defaultLookback))
while (posts != null && posts.getPaging != null) {
posts.asScala.foreach(post => {
allPosts += FacebookPost(pageId, post, fetchComments(post))
})
logDebug(s"Got another page: ${Option(posts.getPaging.getNext).getOrElse("no")}")
posts = facebook.fetchNext(posts.getPaging)
}
} catch {
case fbex: FacebookException =>
logError("Problem fetching response from Facebook", fbex)
}

allPosts
}

private def fetchComments(post: Post): Seq[Comment] = {
val allComments = new util.ArrayList[Comment]()

try {
var comments = post.getComments
while (comments != null && comments.getPaging != null) {
allComments.addAll(comments)
logDebug(s"Got another comments page: ${Option(comments.getPaging.getNext).getOrElse("no")}")
comments = facebook.fetchNext(comments.getPaging)
}
} catch {
case fbex: FacebookException =>
logError(s"Problem fetching comments from Facebook for post ${post.getPermalinkUrl}", fbex)
}

allComments.asScala
}

private def createFacebook(): Facebook = {
val facebook = new FacebookFactory().getInstance()
facebook.setOAuthAppId(auth.appId, auth.appSecret)
facebook.setOAuthAccessToken(new AccessToken(auth.accessToken, null))
facebook
}

protected def createReading(after: Date): Reading = {
// todo: reduce limit if we got a too-large-request error
new Reading().since(after).fields((defaultFields ++ fields).toArray : _*).limit(100)
}

override def fetchFacebookResponse(after: Date): ResponseList[Post] = {
protected def fetchFacebookResponse(after: Date): ResponseList[Post] = {
logDebug(s"Fetching posts for $pageId since $after")
facebook.getPosts(pageId, createReading(after))

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What happens to the comments that were posted since after? Keep in mind that FB comments are the most useful piece of data for trend detection in fortis. These posts can have comments that carry across for a very long time.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Posts and Comments are linked in the Facebook API. As such, you can get a post for date T and then look up its linked comments for it that may have been posted at a date after T.

}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package com.github.catalystcode.fortis.spark.streaming.facebook.dto
import facebook4j.{Comment, Post}

case class FacebookPost(
pageId: String,
post: Post,
comments: Seq[Comment]
)
2 changes: 1 addition & 1 deletion version.sbt
Original file line number Diff line number Diff line change
@@ -1 +1 @@
version := "0.0.1"
version := "0.0.2"