From d4db207ad10aefccc548b112316113906ef6ac7c Mon Sep 17 00:00:00 2001 From: Daniel Kershaw Date: Thu, 8 May 2014 21:06:10 +0100 Subject: [PATCH 1/2] Geolocation to twitter stream Adding option to allow geolocation streaming to the external twitter module. --- .../twitter/TwitterInputDStream.scala | 49 +++++++++++++----- .../streaming/twitter/TwitterUtils.scala | 51 +++++++++++-------- 2 files changed, 66 insertions(+), 34 deletions(-) diff --git a/external/twitter/src/main/scala/org/apache/spark/streaming/twitter/TwitterInputDStream.scala b/external/twitter/src/main/scala/org/apache/spark/streaming/twitter/TwitterInputDStream.scala index 7bca1407116fa..c6ee7df2ffac0 100644 --- a/external/twitter/src/main/scala/org/apache/spark/streaming/twitter/TwitterInputDStream.scala +++ b/external/twitter/src/main/scala/org/apache/spark/streaming/twitter/TwitterInputDStream.scala @@ -27,6 +27,7 @@ import org.apache.spark.streaming.dstream._ import org.apache.spark.storage.StorageLevel import org.apache.spark.Logging import org.apache.spark.streaming.receiver.Receiver +import scala.language.higherKinds /* A stream of Twitter statuses, potentially filtered by one or more keywords. * @@ -39,11 +40,12 @@ import org.apache.spark.streaming.receiver.Receiver */ private[streaming] class TwitterInputDStream( - @transient ssc_ : StreamingContext, - twitterAuth: Option[Authorization], - filters: Seq[String], - storageLevel: StorageLevel - ) extends ReceiverInputDStream[Status](ssc_) { + @transient ssc_ : StreamingContext, + twitterAuth: Option[Authorization], + keywordFilters: Seq[String], + storageLevel: StorageLevel, + geoLocationFilter: Seq[Seq[Double]] + ) extends ReceiverInputDStream[Status](ssc_) { private def createOAuthAuthorization(): Authorization = { new OAuthAuthorization(new ConfigurationBuilder().build()) @@ -52,16 +54,17 @@ class TwitterInputDStream( private val authorization = twitterAuth.getOrElse(createOAuthAuthorization()) override def getReceiver(): Receiver[Status] = { - new TwitterReceiver(authorization, filters, storageLevel) + new TwitterReceiver(authorization, keywordFilters, storageLevel, geoLocationFilter) } } private[streaming] class TwitterReceiver( - twitterAuth: Authorization, - filters: Seq[String], - storageLevel: StorageLevel - ) extends Receiver[Status](storageLevel) with Logging { + twitterAuth: Authorization, + keywordFilters: Seq[String], + storageLevel: StorageLevel, + geoLocationFilter: Seq[Seq[Double]] + ) extends Receiver[Status](storageLevel) with Logging { var twitterStream: TwitterStream = _ @@ -82,10 +85,21 @@ class TwitterReceiver( }) val query = new FilterQuery - if (filters.size > 0) { - query.track(filters.toArray) + if (keywordFilters.size > 0) { + query.track(keywordFilters.toArray) + } + + if (geoLocationFilter.size > 0){ + println(arrayToString(ss2aa(geoLocationFilter))) + query.locations(ss2aa(geoLocationFilter)) + + } + + if(keywordFilters.size + geoLocationFilter.size > 0){ + println(query.toString()) twitterStream.filter(query) - } else { + } + else { twitterStream.sample() } logInfo("Twitter receiver started") @@ -95,4 +109,13 @@ class TwitterReceiver( twitterStream.shutdown() logInfo("Twitter receiver stopped") } + + def arrayToString(a: Array[Array[Double]]) : String = { + val str = for (l <- a) yield l.mkString("{", ",", "}") + str.mkString("{",",\n","}") + } + + def ss2aa[A,B[_],C[_]](c: C[B[A]])( + implicit b2seq: B[A] => Seq[A], c2seq: C[B[A]] => Seq[B[A]], ma: ClassManifest[A] + ) = c2seq(c).map(b => b2seq(b).toArray).toArray } diff --git a/external/twitter/src/main/scala/org/apache/spark/streaming/twitter/TwitterUtils.scala b/external/twitter/src/main/scala/org/apache/spark/streaming/twitter/TwitterUtils.scala index c6a9a2b73714f..d5ff7ab5fed13 100644 --- a/external/twitter/src/main/scala/org/apache/spark/streaming/twitter/TwitterUtils.scala +++ b/external/twitter/src/main/scala/org/apache/spark/streaming/twitter/TwitterUtils.scala @@ -36,14 +36,23 @@ object TwitterUtils { * @param storageLevel Storage level to use for storing the received objects */ def createStream( - ssc: StreamingContext, - twitterAuth: Option[Authorization], - filters: Seq[String] = Nil, - storageLevel: StorageLevel = StorageLevel.MEMORY_AND_DISK_SER_2 - ): ReceiverInputDStream[Status] = { - new TwitterInputDStream(ssc, twitterAuth, filters, storageLevel) + ssc: StreamingContext, + twitterAuth: Option[Authorization], + filters: Seq[String] = Nil, + storageLevel: StorageLevel = StorageLevel.MEMORY_AND_DISK_SER_2 + ): ReceiverInputDStream[Status] = { + new TwitterInputDStream(ssc, twitterAuth, filters, storageLevel, Nil) } + def createGeoStream( + ssc: StreamingContext, + twitterAuth: Option[Authorization], + filters: Seq[String] = Nil, + storageLevel: StorageLevel = StorageLevel.MEMORY_AND_DISK_SER_2, + geofileter: Seq[Seq[Double]] = Nil + ):ReceiverInputDStream[Status] = { + new TwitterInputDStream(ssc, twitterAuth, filters, storageLevel, geofileter) + } /** * Create a input stream that returns tweets received from Twitter using Twitter4J's default * OAuth authentication; this requires the system properties twitter4j.oauth.consumerKey, @@ -66,7 +75,7 @@ object TwitterUtils { * @param filters Set of filter strings to get only those tweets that match them */ def createStream(jssc: JavaStreamingContext, filters: Array[String] - ): JavaReceiverInputDStream[Status] = { + ): JavaReceiverInputDStream[Status] = { createStream(jssc.ssc, None, filters) } @@ -80,10 +89,10 @@ object TwitterUtils { * @param storageLevel Storage level to use for storing the received objects */ def createStream( - jssc: JavaStreamingContext, - filters: Array[String], - storageLevel: StorageLevel - ): JavaReceiverInputDStream[Status] = { + jssc: JavaStreamingContext, + filters: Array[String], + storageLevel: StorageLevel + ): JavaReceiverInputDStream[Status] = { createStream(jssc.ssc, None, filters, storageLevel) } @@ -94,7 +103,7 @@ object TwitterUtils { * @param twitterAuth Twitter4J Authorization */ def createStream(jssc: JavaStreamingContext, twitterAuth: Authorization - ): JavaReceiverInputDStream[Status] = { + ): JavaReceiverInputDStream[Status] = { createStream(jssc.ssc, Some(twitterAuth)) } @@ -106,10 +115,10 @@ object TwitterUtils { * @param filters Set of filter strings to get only those tweets that match them */ def createStream( - jssc: JavaStreamingContext, - twitterAuth: Authorization, - filters: Array[String] - ): JavaReceiverInputDStream[Status] = { + jssc: JavaStreamingContext, + twitterAuth: Authorization, + filters: Array[String] + ): JavaReceiverInputDStream[Status] = { createStream(jssc.ssc, Some(twitterAuth), filters) } @@ -121,11 +130,11 @@ object TwitterUtils { * @param storageLevel Storage level to use for storing the received objects */ def createStream( - jssc: JavaStreamingContext, - twitterAuth: Authorization, - filters: Array[String], - storageLevel: StorageLevel - ): JavaReceiverInputDStream[Status] = { + jssc: JavaStreamingContext, + twitterAuth: Authorization, + filters: Array[String], + storageLevel: StorageLevel + ): JavaReceiverInputDStream[Status] = { createStream(jssc.ssc, Some(twitterAuth), filters, storageLevel) } } From eaf91192e2b4444be179a1f6bf9790c3b494c06c Mon Sep 17 00:00:00 2001 From: Daniel Kershaw Date: Fri, 22 Aug 2014 14:47:20 +0100 Subject: [PATCH 2/2] Changes to the indentation --- .../twitter/TwitterInputDStream.scala | 25 ++++----- .../streaming/twitter/TwitterUtils.scala | 56 +++++++++---------- 2 files changed, 40 insertions(+), 41 deletions(-) diff --git a/external/twitter/src/main/scala/org/apache/spark/streaming/twitter/TwitterInputDStream.scala b/external/twitter/src/main/scala/org/apache/spark/streaming/twitter/TwitterInputDStream.scala index c6ee7df2ffac0..51aface07d5d5 100644 --- a/external/twitter/src/main/scala/org/apache/spark/streaming/twitter/TwitterInputDStream.scala +++ b/external/twitter/src/main/scala/org/apache/spark/streaming/twitter/TwitterInputDStream.scala @@ -40,12 +40,12 @@ import scala.language.higherKinds */ private[streaming] class TwitterInputDStream( - @transient ssc_ : StreamingContext, - twitterAuth: Option[Authorization], - keywordFilters: Seq[String], - storageLevel: StorageLevel, - geoLocationFilter: Seq[Seq[Double]] - ) extends ReceiverInputDStream[Status](ssc_) { + @transient ssc_ : StreamingContext, + twitterAuth: Option[Authorization], + keywordFilters: Seq[String], + storageLevel: StorageLevel, + geoLocationFilter: Seq[Seq[Double]] + ) extends ReceiverInputDStream[Status](ssc_) { private def createOAuthAuthorization(): Authorization = { new OAuthAuthorization(new ConfigurationBuilder().build()) @@ -60,11 +60,11 @@ class TwitterInputDStream( private[streaming] class TwitterReceiver( - twitterAuth: Authorization, - keywordFilters: Seq[String], - storageLevel: StorageLevel, - geoLocationFilter: Seq[Seq[Double]] - ) extends Receiver[Status](storageLevel) with Logging { + twitterAuth: Authorization, + keywordFilters: Seq[String], + storageLevel: StorageLevel, + geoLocationFilter: Seq[Seq[Double]] + ) extends Receiver[Status](storageLevel) with Logging { var twitterStream: TwitterStream = _ @@ -98,8 +98,7 @@ class TwitterReceiver( if(keywordFilters.size + geoLocationFilter.size > 0){ println(query.toString()) twitterStream.filter(query) - } - else { + } else { twitterStream.sample() } logInfo("Twitter receiver started") diff --git a/external/twitter/src/main/scala/org/apache/spark/streaming/twitter/TwitterUtils.scala b/external/twitter/src/main/scala/org/apache/spark/streaming/twitter/TwitterUtils.scala index d5ff7ab5fed13..6b46cc73b4c24 100644 --- a/external/twitter/src/main/scala/org/apache/spark/streaming/twitter/TwitterUtils.scala +++ b/external/twitter/src/main/scala/org/apache/spark/streaming/twitter/TwitterUtils.scala @@ -36,21 +36,21 @@ object TwitterUtils { * @param storageLevel Storage level to use for storing the received objects */ def createStream( - ssc: StreamingContext, - twitterAuth: Option[Authorization], - filters: Seq[String] = Nil, - storageLevel: StorageLevel = StorageLevel.MEMORY_AND_DISK_SER_2 - ): ReceiverInputDStream[Status] = { + ssc: StreamingContext, + twitterAuth: Option[Authorization], + filters: Seq[String] = Nil, + storageLevel: StorageLevel = StorageLevel.MEMORY_AND_DISK_SER_2 + ): ReceiverInputDStream[Status] = { new TwitterInputDStream(ssc, twitterAuth, filters, storageLevel, Nil) } def createGeoStream( - ssc: StreamingContext, - twitterAuth: Option[Authorization], - filters: Seq[String] = Nil, - storageLevel: StorageLevel = StorageLevel.MEMORY_AND_DISK_SER_2, - geofileter: Seq[Seq[Double]] = Nil - ):ReceiverInputDStream[Status] = { + ssc: StreamingContext, + twitterAuth: Option[Authorization], + filters: Seq[String] = Nil, + storageLevel: StorageLevel = StorageLevel.MEMORY_AND_DISK_SER_2, + geofileter: Seq[Seq[Double]] = Nil + ):ReceiverInputDStream[Status] = { new TwitterInputDStream(ssc, twitterAuth, filters, storageLevel, geofileter) } /** @@ -75,7 +75,7 @@ object TwitterUtils { * @param filters Set of filter strings to get only those tweets that match them */ def createStream(jssc: JavaStreamingContext, filters: Array[String] - ): JavaReceiverInputDStream[Status] = { + ): JavaReceiverInputDStream[Status] = { createStream(jssc.ssc, None, filters) } @@ -88,12 +88,12 @@ object TwitterUtils { * @param filters Set of filter strings to get only those tweets that match them * @param storageLevel Storage level to use for storing the received objects */ - def createStream( - jssc: JavaStreamingContext, - filters: Array[String], - storageLevel: StorageLevel - ): JavaReceiverInputDStream[Status] = { - createStream(jssc.ssc, None, filters, storageLevel) + def createStream( + jssc: JavaStreamingContext, + filters: Array[String], + storageLevel: StorageLevel + ): JavaReceiverInputDStream[Status] = { + createStream(jssc.ssc, None, filters, storageLevel) } /** @@ -103,7 +103,7 @@ object TwitterUtils { * @param twitterAuth Twitter4J Authorization */ def createStream(jssc: JavaStreamingContext, twitterAuth: Authorization - ): JavaReceiverInputDStream[Status] = { + ): JavaReceiverInputDStream[Status] = { createStream(jssc.ssc, Some(twitterAuth)) } @@ -115,10 +115,10 @@ object TwitterUtils { * @param filters Set of filter strings to get only those tweets that match them */ def createStream( - jssc: JavaStreamingContext, - twitterAuth: Authorization, - filters: Array[String] - ): JavaReceiverInputDStream[Status] = { + jssc: JavaStreamingContext, + twitterAuth: Authorization, + filters: Array[String] + ): JavaReceiverInputDStream[Status] = { createStream(jssc.ssc, Some(twitterAuth), filters) } @@ -130,11 +130,11 @@ object TwitterUtils { * @param storageLevel Storage level to use for storing the received objects */ def createStream( - jssc: JavaStreamingContext, - twitterAuth: Authorization, - filters: Array[String], - storageLevel: StorageLevel - ): JavaReceiverInputDStream[Status] = { + jssc: JavaStreamingContext, + twitterAuth: Authorization, + filters: Array[String], + storageLevel: StorageLevel + ): JavaReceiverInputDStream[Status] = { createStream(jssc.ssc, Some(twitterAuth), filters, storageLevel) } }