Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 4 additions & 3 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -16,19 +16,20 @@ export FACEBOOK_AUTH_TOKEN="..."
sbt assembly

# run locally
java -cp target/scala-2.11/streaming-facebook-assembly-0.0.1.jar FacebookDemo standalone
java -cp target/scala-2.11/streaming-facebook-assembly-0.0.3.jar FacebookDemo standalone

# run on spark
spark-submit --class FacebookDemo --master local[2] target/scala-2.11/streaming-facebook-assembly-0.0.1.jar spark
spark-submit --class FacebookDemo --master local[2] target/scala-2.11/streaming-facebook-assembly-0.0.3.jar spark
```

## How does it work? ##

Facebook doesn't expose a firehose API so we resort to polling. The FacebookReceiver pings the Facebook API every few
seconds and pushes any new posts into Spark Streaming for further processing.

Currently, the following ways to read images are supported:
Currently, the following ways to read Facebook items are supported:
- by page ([sample data](https://www.facebook.com/pg/aljazeera/posts/))
- comments for page ([sample data](https://www.facebook.com/forbes/posts/10155598401707509?comment_id=10155598513972509))

## Release process ##

Expand Down
5 changes: 4 additions & 1 deletion src/main/scala/FacebookDemoSpark.scala
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,10 @@ class FacebookDemoSpark(pageIds: Set[String], auth: FacebookAuth) {
val sc = new SparkContext(conf)
val ssc = new StreamingContext(sc, Seconds(1))

FacebookUtils.createPageStreams(ssc, auth, pageIds).map(x => s"Post: ${x.post.getPermalinkUrl}").print()
val posts = FacebookUtils.createPageStreams(ssc, auth, pageIds).map(x => s"Post: ${x.post.getPermalinkUrl}")
val comments = FacebookUtils.createCommentsStreams(ssc, auth, pageIds).map(x => s"Comment: ${x.comment.getId}")

(posts union comments).print

// run forever
ssc.start()
Expand Down
3 changes: 2 additions & 1 deletion src/main/scala/FacebookDemoStandalone.scala
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ import com.github.catalystcode.fortis.spark.streaming.facebook.client.FacebookPa
class FacebookDemoStandalone(pageId: String, auth: FacebookAuth) {
def run(): Unit = {
val date = Some(new Date(new Date().getTime - 3600000 /* 1 hour */))
println(new FacebookPageClient(pageId, auth, Set("place", "comments", "message")).loadNewFacebooks(date).toList)
println(new FacebookPageClient(pageId, auth, Set("place", "comments", "message")).loadNewFacebookPosts(date).toList)
println(new FacebookPageClient(pageId, auth, Set("place", "comments", "message")).loadNewFacebookComments(date).toList)
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,61 @@
package com.github.catalystcode.fortis.spark.streaming.facebook

import java.util.Date

import com.github.catalystcode.fortis.spark.streaming.facebook.client.FacebookPageClient
import com.github.catalystcode.fortis.spark.streaming.facebook.dto.FacebookComment
import com.github.catalystcode.fortis.spark.streaming.{PollingReceiver, PollingSchedule}
import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming.StreamingContext
import org.apache.spark.streaming.dstream.ReceiverInputDStream
import org.apache.spark.streaming.receiver.Receiver

private class FacebookCommentsReceiver(
clients: Set[FacebookPageClient],
pollingSchedule: PollingSchedule,
storageLevel: StorageLevel,
pollingWorkers: Int
) extends PollingReceiver[FacebookComment](pollingSchedule, pollingWorkers, storageLevel) with Logger {

@volatile private var lastIngestedDate: Option[Date] = None

override protected def poll(): Unit = {
clients.par.foreach(_
.loadNewFacebookComments(lastIngestedDate)
.filter(x => {
logDebug(s"Got comment with id ${x.comment.getId} from page ${x.pageId}")
isNew(x)
})
.foreach(x => {
logInfo(s"Storing comment ${x.comment.getId} from page ${x.pageId}")
store(x)
markStored(x)
})
)
}

private def isNew(item: FacebookComment) = {
lastIngestedDate.isEmpty || item.comment.getCreatedTime.after(lastIngestedDate.get)
}

private def markStored(item: FacebookComment): Unit = {
if (isNew(item)) {
lastIngestedDate = Some(item.comment.getCreatedTime)
logDebug(s"Updating last ingested date to ${lastIngestedDate.get}")
}
}
}

class FacebookCommentsInputDStream(
ssc: StreamingContext,
clients: Set[FacebookPageClient],
pollingSchedule: PollingSchedule,
pollingWorkers: Int,
storageLevel: StorageLevel
) extends ReceiverInputDStream[FacebookComment](ssc) {

override def getReceiver(): Receiver[FacebookComment] = {
logDebug("Creating facebook receiver")
new FacebookCommentsReceiver(clients, pollingSchedule, storageLevel, pollingWorkers)
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ import org.apache.spark.streaming.StreamingContext
import org.apache.spark.streaming.dstream.ReceiverInputDStream
import org.apache.spark.streaming.receiver.Receiver

private class FacebookReceiver(
private class FacebookPostReceiver(
clients: Set[FacebookPageClient],
pollingSchedule: PollingSchedule,
storageLevel: StorageLevel,
Expand All @@ -20,10 +20,10 @@ private class FacebookReceiver(
@volatile private var lastIngestedDate: Option[Date] = None

override protected def poll(): Unit = {
clients.foreach(_
.loadNewFacebooks(lastIngestedDate)
clients.par.foreach(_
.loadNewFacebookPosts(lastIngestedDate)
.filter(x => {
logDebug(s"Got facebook ${x.post.getPermalinkUrl} from page ${x.pageId} time ${x.post.getCreatedTime} with ${x.comments.size} comments")
logDebug(s"Got facebook ${x.post.getPermalinkUrl} from page ${x.pageId} time ${x.post.getCreatedTime}")
isNew(x)
})
.foreach(x => {
Expand All @@ -46,7 +46,7 @@ private class FacebookReceiver(
}
}

class FacebookInputDStream(
class FacebookPostInputDStream(
ssc: StreamingContext,
clients: Set[FacebookPageClient],
pollingSchedule: PollingSchedule,
Expand All @@ -56,6 +56,6 @@ class FacebookInputDStream(

override def getReceiver(): Receiver[FacebookPost] = {
logDebug("Creating facebook receiver")
new FacebookReceiver(clients, pollingSchedule, storageLevel, pollingWorkers)
new FacebookPostReceiver(clients, pollingSchedule, storageLevel, pollingWorkers)
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -4,22 +4,28 @@ import java.util.concurrent.TimeUnit

import com.github.catalystcode.fortis.spark.streaming.PollingSchedule
import com.github.catalystcode.fortis.spark.streaming.facebook.client.FacebookPageClient
import com.github.catalystcode.fortis.spark.streaming.facebook.dto.FacebookPost
import com.github.catalystcode.fortis.spark.streaming.facebook.dto.{FacebookComment, FacebookPost}
import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming.StreamingContext
import org.apache.spark.streaming.dstream.ReceiverInputDStream

object FacebookUtils {
private val DefaultPageFields = Set("message", "place", "caption", "from", "name")
private val DefaultCommentsFields = Set("comments")
private val DefaultPollingSchedule = PollingSchedule(30, TimeUnit.SECONDS)
private val DefaultPollingWorkers = 1
private val DefaultStorageLevel = StorageLevel.MEMORY_ONLY

def createPageStreams(
ssc: StreamingContext,
auth: FacebookAuth,
pageIds: Set[String],
fields: Set[String] = Set("message", "place", "caption", "from", "name", "comments"),
pollingSchedule: PollingSchedule = PollingSchedule(30, TimeUnit.SECONDS),
pollingWorkers: Int = 1,
storageLevel: StorageLevel = StorageLevel.MEMORY_ONLY
fields: Set[String] = DefaultPageFields,
pollingSchedule: PollingSchedule = DefaultPollingSchedule,
pollingWorkers: Int = DefaultPollingWorkers,
storageLevel: StorageLevel = DefaultStorageLevel
): ReceiverInputDStream[FacebookPost] = {
new FacebookInputDStream(
new FacebookPostInputDStream(
ssc = ssc,
clients = pageIds.map(pageId => new FacebookPageClient(
pageId = pageId,
Expand All @@ -34,11 +40,31 @@ object FacebookUtils {
ssc: StreamingContext,
auth: FacebookAuth,
pageId: String,
fields: Set[String] = Set("message", "place", "caption", "from", "name", "comments"),
pollingSchedule: PollingSchedule = PollingSchedule(30, TimeUnit.SECONDS),
pollingWorkers: Int = 1,
storageLevel: StorageLevel = StorageLevel.MEMORY_ONLY
fields: Set[String] = DefaultPageFields,
pollingSchedule: PollingSchedule = DefaultPollingSchedule,
pollingWorkers: Int = DefaultPollingWorkers,
storageLevel: StorageLevel = DefaultStorageLevel
): ReceiverInputDStream[FacebookPost] = {
createPageStreams(ssc, auth, Set(pageId), fields, pollingSchedule, pollingWorkers, storageLevel)
}

def createCommentsStreams(
ssc: StreamingContext,
auth: FacebookAuth,
pageIds: Set[String],
fields: Set[String] = DefaultCommentsFields,
pollingSchedule: PollingSchedule = DefaultPollingSchedule,
pollingWorkers: Int = DefaultPollingWorkers,
storageLevel: StorageLevel = DefaultStorageLevel
): ReceiverInputDStream[FacebookComment] = {
new FacebookCommentsInputDStream(
ssc = ssc,
clients = pageIds.map(pageId => new FacebookPageClient(
pageId = pageId,
auth = auth,
fields = fields)),
pollingSchedule = pollingSchedule,
pollingWorkers = pollingWorkers,
storageLevel = storageLevel)
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ package com.github.catalystcode.fortis.spark.streaming.facebook.client
import java.util
import java.util.Date

import com.github.catalystcode.fortis.spark.streaming.facebook.dto.FacebookPost
import com.github.catalystcode.fortis.spark.streaming.facebook.dto.{FacebookComment, FacebookPost}
import com.github.catalystcode.fortis.spark.streaming.facebook.{FacebookAuth, Logger}
import facebook4j._
import facebook4j.auth.AccessToken
Expand All @@ -22,14 +22,14 @@ extends Serializable with Logger {
@transient private lazy val defaultLookback = new Date(new Date().getTime - 2 * 3600000 /* two hours */)
@transient private lazy val defaultFields = Set("permalink_url", "created_time")

def loadNewFacebooks(after: Option[Date] = None): Iterable[FacebookPost] = {
def loadNewFacebookPosts(after: Option[Date] = None): Iterable[FacebookPost] = {
val allPosts = ListBuffer[FacebookPost]()

try {
var posts = fetchFacebookResponse(after.getOrElse(defaultLookback))
while (posts != null && posts.getPaging != null) {
posts.asScala.foreach(post => {
allPosts += FacebookPost(pageId, post, fetchComments(post))
allPosts += FacebookPost(pageId, post)
})
logDebug(s"Got another page: ${Option(posts.getPaging.getNext).getOrElse("no")}")
posts = facebook.fetchNext(posts.getPaging)
Expand All @@ -42,6 +42,28 @@ extends Serializable with Logger {
allPosts
}

def loadNewFacebookComments(after: Option[Date] = None): Iterable[FacebookComment] = {
val allComments = ListBuffer[FacebookComment]()

try {
var posts = fetchFacebookResponse(after.getOrElse(defaultLookback))
while (posts != null && posts.getPaging != null) {
posts.asScala.foreach(post => {
fetchComments(post).foreach(comment => {
allComments += FacebookComment(pageId, post.getId, comment)
})
})
logDebug(s"Got another page: ${Option(posts.getPaging.getNext).getOrElse("no")}")
posts = facebook.fetchNext(posts.getPaging)
}
} catch {
case fbex: FacebookException =>
logError("Problem fetching response from Facebook", fbex)
}

allComments
}

private def fetchComments(post: Post): Seq[Comment] = {
val allComments = new util.ArrayList[Comment]()

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
package com.github.catalystcode.fortis.spark.streaming.facebook.dto

import facebook4j.Comment

case class FacebookComment(
pageId: String,
postId: String,
comment: Comment
)
Original file line number Diff line number Diff line change
@@ -1,9 +1,8 @@
package com.github.catalystcode.fortis.spark.streaming.facebook.dto

import facebook4j.{Comment, Post}
import facebook4j.Post

case class FacebookPost(
pageId: String,
post: Post,
comments: Seq[Comment]
post: Post
)
2 changes: 1 addition & 1 deletion version.sbt
Original file line number Diff line number Diff line change
@@ -1 +1 @@
version := "0.0.2"
version := "0.0.3"