In [1]:
import findspark
findspark.init()

In [2]:
import json, requests, sys
from nltk.corpus import stopwords
from operator import add
from pyspark import SparkContext
from pyspark import StorageLevel
from pyspark.streaming import StreamingContext
from textblob import TextBlob

In [3]:
# text classification
def getSentiment(text):
    sent = TextBlob(text).sentiment.polarity
    neutral_threshold = 0.05
    
    if sent >= neutral_threshold:
        return (1, 0, 0) # positive
    elif sent > neutral_threshold:
        return (0, 1, 0) # neutral
    else:
        return (0, 0, 1) # negative

In [4]:
def getTweetsCounter(dstream_tweets_sentiment_analysed, window_length, sliding_interval):

    tweets_to_count = dstream_tweets_sentiment_analysed. \
        map(lambda x: ('count', (1, x[2])))

    tweets_count_acc_sent = tweets_to_count. \
        reduceByKeyAndWindow(lambda x, y: (x[0] + y[0], (x[1][0] + y[1][0], x[1][1] + y[1][1], x[1][2] + y[1][2])), None,
                             window_length, sliding_interval)

    total_count = tweets_count_acc_sent. \
        map(lambda x: x[1])
    
    total_count.pprint()
    return total_count
    
def sendTweetsCounter(sentiments, url):
    def takeAndSend(time, rdd):
        if not rdd.isEmpty():
            (name, (total, (pos, neutral, neg))) = rdd.first()

            json_data = {'positive': pos, 'neutral': neutral, 'negative': neg, 'total': total}
            #print(json_data)

            response = requests.post(url, data=json_data)

    sentiments.foreachRDD(takeAndSend)

In [5]:
def getTweets(kvs):
    tweets_text = kvs.map(lambda x: json.loads(x)) \
                .map(lambda json_object: (json_object["user"]["screen_name"], json_object["text"], json_object["user"]["followers_count"], json_object["id"])) \
                .transform(lambda rdd: rdd.sortBy(lambda x: x[2], ascending = False))
    
    tweets_text.pprint()
    return tweets_text
    
def sendTweets(tweets, url):
    def takeAndSend(time, rdd):
        if not rdd.isEmpty():
            tweets_data = rdd.take(10)

            users = []
            texts = []
            tweet_ids = []

            for (user, text, follower_count, tweet_id) in tweets_data:
                users.append(user)
                texts.append(text)
                tweet_ids.append(tweet_id)

            json_data = {'user': str(users), 'text': str(texts), 'id': str(tweet_ids)}
            print(json_data)

            response = requests.post(url, data=json_data)

    tweets.foreachRDD(takeAndSend)

In [6]:
def getTopWords(tweets, window_length, sliding_interval):
    lines = tweets.flatMap(lambda line: line[1].split(" "))

    ## This part does the word count
    sw = stopwords.words('english')
    sw.extend(['rt', 'https', 'http', 'coronavirus', 'covid19', 'covid-19'])
    
    counts = lines.map(lambda word: word.strip().lower()) \
                  .filter(lambda word: word not in sw) \
                  .filter(lambda word: len(word) >= 2 and word[0] != '#' and word[0] != '@') \
                  .map(lambda word: (word, 1)) \
                  .reduceByKeyAndWindow(add, None,  window_length, sliding_interval)\
                  .transform(lambda rdd: rdd.sortBy(lambda x: x[1], ascending = False))
    
    counts.pprint()
    return counts
    
def sendTopWords(counts, url):
    def takeAndSend(time, rdd):
        if not rdd.isEmpty():
            word_counts = rdd.take(10)

            words = []
            values = []

            for (word, count) in word_counts:
                words.append(word)
                values.append(count)

            json_data = {'words': str(words), 'counts': str(values)}
            print(json_data)

            response = requests.post(url, data=json_data)

    counts.foreachRDD(takeAndSend)

In [7]:
def getTopHashTags(dstream_tweets_sentiment_analysed, window_length, sliding_interval):
    hashtags = dstream_tweets_sentiment_analysed.\
            map(lambda x: ((x[0], x[2]), x[1])).\
            flatMapValues(lambda text: text.split(" ")).\
            filter(lambda x: len(x[1]) > 1 and x[1][0] == '#'). \
            map(lambda x: (x[1], (1, x[0][1])))
    
    hashtags_count_acc_sent = hashtags. \
        reduceByKeyAndWindow(lambda x, y: (x[0] + y[0], (x[1][0] + y[1][0], x[1][1] + y[1][1], x[1][2] + y[1][2])), None,
                             window_length, sliding_interval)
    
    sorted_hashtags_count = hashtags_count_acc_sent. \
        map(lambda x: (x[1][0], (x[0], x[1][1]))). \
        transform(lambda rdd: rdd.sortByKey(False)). \
        map(lambda x: (x[1][0], (x[0], x[1][1])))

    
    sorted_hashtags_count.pprint()
    return sorted_hashtags_count

In [None]:
def send_top_to_dashboard(dstream_tweets_sentiment_analysed, url):

    num = 10

    def take_and_send(time, rdd):
        if not rdd.isEmpty():
            taken = rdd.take(num)

            labels = []
            negative = []
            neutral = []
            positive = []
            for (name, (count, (pos, neu, neg))) in taken:
                labels.append(name)
                negative.append(neg)
                neutral.append(neu)
                positive.append(pos)

            request_data = {'label': str(labels), 'negative': str(negative), 'neutral': str(neutral), 'positive': str(positive)}
            response = requests.post(url, data=request_data)

    dstream_tweets_sentiment_analysed.foreachRDD(take_and_send)


In [19]:
def getGeoData(kvs, window_length, sliding_interval):
    #geo_data =  kvs.window(window_length, sliding_interval)\
    geo_data=           kvs.map(lambda x: json.loads(x)) \
                .map(lambda json_object: (json_object["user"]["screen_name"], json_object["coordinates"])) \
                .map(lambda kv: (kv[1]['coordinates'][0], kv[1]['coordinates'][1]))
    
    geo_data.pprint()
    return geo_data
    
def sendGeoData(geo_data,url):
    geodata=geo_data.collect()
    longitudes = []
    latitudes = []

    for geotweet in geodata:
        longitudes.append(geotweet[0])
        latitudes.append(geotweet[1])
    
    json_data = {'longitude': str(longitudes), 'latitude': str(latitudes)}
    response = requests.post(url, data=json_data)

In [10]:
sc = SparkContext(appName="tweetStream")
# Create a local StreamingContext with batch interval of 2 second
batch_interval = 2
window_length = 15*60
sliding_interval = 6

ssc = StreamingContext(sc, batch_interval)
ssc.checkpoint("twittercheckpt")

# Create a DStream that conencts to hostname:port
tweetStream = ssc.socketTextStream("0.0.0.0", 5555)

In [11]:
tweets = tweetStream. \
        map(lambda  x: json.loads(x)). \
        map(lambda json_object: (json_object["user"]["screen_name"], json_object["text"]))

tweets_sentiment_analysed = tweets. \
        map(lambda x: (x[0], x[1], getSentiment(x[1])))

In [12]:
tweets_sentiment_analysed.persist(StorageLevel.MEMORY_AND_DISK)

<pyspark.streaming.dstream.TransformedDStream at 0x7fd4f0dc7f40>

In [None]:
server = 'http://localhost:5000/'

tweet_counters = getTweetsCounter(tweets_sentiment_analysed, window_length, sliding_interval)
sendTweetsCounter(tweet_counters,  server +'update_sentiments')

tweet_text= getTweets(tweetStream)
sendTweets(tweet_text, server + 'update_tweets')

key_words=getTopWords(tweets, window_length, sliding_interval)
sendTopWords(key_words, server + 'update_counts')

hashtag=getTopHashTags(tweets_sentiment_analysed, window_length, sliding_interval)
send_top_to_dashboard(hashtag,  server + 'update_hashtagcounts')

geo_data= getGeoData(tweetStream, window_length, sliding_interval)
sendGeoData(geo_data, server + 'update_geodata')

In [20]:
getGeoData(tweetStream, window_length, sliding_interval)

Py4JJavaError: An error occurred while calling None.org.apache.spark.streaming.api.python.PythonTransformedDStream.
: java.lang.IllegalStateException: Adding new inputs, transformations, and output operations after starting a context is not supported
	at org.apache.spark.streaming.dstream.DStream.validateAtInit(DStream.scala:226)
	at org.apache.spark.streaming.dstream.DStream.<init>(DStream.scala:67)
	at org.apache.spark.streaming.api.python.PythonDStream.<init>(PythonDStream.scala:224)
	at org.apache.spark.streaming.api.python.PythonTransformedDStream.<init>(PythonDStream.scala:241)
	at java.base/jdk.internal.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method)
	at java.base/jdk.internal.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:62)
	at java.base/jdk.internal.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45)
	at java.base/java.lang.reflect.Constructor.newInstance(Constructor.java:490)
	at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:247)
	at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
	at py4j.Gateway.invoke(Gateway.java:238)
	at py4j.commands.ConstructorCommand.invokeConstructor(ConstructorCommand.java:80)
	at py4j.commands.ConstructorCommand.execute(ConstructorCommand.java:69)
	at py4j.GatewayConnection.run(GatewayConnection.java:238)
	at java.base/java.lang.Thread.run(Thread.java:834)


In [12]:
getTopHashTags(tweets_sentiment_analysed, window_length, sliding_interval)



<pyspark.streaming.dstream.TransformedDStream at 0x7f8bb7ed51c0>

In [13]:
getTopMentioned(tweets_sentiment_analysed, window_length, sliding_interval)

<pyspark.streaming.dstream.TransformedDStream at 0x7f8bb7ed5d30>

In [14]:
# Start computing
ssc.start()        
# Wait for termination
ssc.awaitTermination()
ssc.stop(stopGraceFully = True)

Py4JJavaError: An error occurred while calling o23.awaitTermination.
: org.apache.spark.SparkException: An exception was raised by Python:
Traceback (most recent call last):
  File "/usr/local/opt/apache-spark/libexec/python/pyspark/streaming/util.py", line 68, in call
    r = self.func(t, *rdds)
  File "/usr/local/opt/apache-spark/libexec/python/pyspark/streaming/dstream.py", line 173, in takeAndPrint
    taken = rdd.take(num + 1)
  File "/usr/local/opt/apache-spark/libexec/python/pyspark/rdd.py", line 1446, in take
    res = self.context.runJob(self, takeUpToNumLeft, p)
  File "/usr/local/opt/apache-spark/libexec/python/pyspark/context.py", line 1118, in runJob
    sock_info = self._jvm.PythonRDD.runJob(self._jsc.sc(), mappedRDD._jrdd, partitions)
  File "/usr/local/opt/apache-spark/libexec/python/lib/py4j-0.10.9-src.zip/py4j/java_gateway.py", line 1304, in __call__
    return_value = get_return_value(
  File "/usr/local/opt/apache-spark/libexec/python/lib/py4j-0.10.9-src.zip/py4j/protocol.py", line 326, in get_return_value
    raise Py4JJavaError(
py4j.protocol.Py4JJavaError: An error occurred while calling z:org.apache.spark.api.python.PythonRDD.runJob.
: org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 1.0 failed 1 times, most recent failure: Lost task 0.0 in stage 1.0 (TID 1, xiaoyuans-mbp, executor driver): org.apache.spark.api.python.PythonException: Traceback (most recent call last):
  File "/usr/local/opt/apache-spark/libexec/python/lib/pyspark.zip/pyspark/worker.py", line 605, in main
    process()
  File "/usr/local/opt/apache-spark/libexec/python/lib/pyspark.zip/pyspark/worker.py", line 597, in process
    serializer.dump_stream(out_iter, outfile)
  File "/usr/local/opt/apache-spark/libexec/python/lib/pyspark.zip/pyspark/serializers.py", line 271, in dump_stream
    vs = list(itertools.islice(iterator, batch))
  File "<ipython-input-9-c172036fca7d>", line 5, in <lambda>
TypeError: 'NoneType' object is not subscriptable

	at org.apache.spark.api.python.BasePythonRunner$ReaderIterator.handlePythonException(PythonRunner.scala:503)
	at org.apache.spark.api.python.PythonRunner$$anon$3.read(PythonRunner.scala:638)
	at org.apache.spark.api.python.PythonRunner$$anon$3.read(PythonRunner.scala:621)
	at org.apache.spark.api.python.BasePythonRunner$ReaderIterator.hasNext(PythonRunner.scala:456)
	at org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:37)
	at scala.collection.Iterator.foreach(Iterator.scala:941)
	at scala.collection.Iterator.foreach$(Iterator.scala:941)
	at org.apache.spark.InterruptibleIterator.foreach(InterruptibleIterator.scala:28)
	at org.apache.spark.api.python.PythonRDD$.writeIteratorToStream(PythonRDD.scala:295)
	at org.apache.spark.api.python.PythonRunner$$anon$2.writeIteratorToStream(PythonRunner.scala:607)
	at org.apache.spark.api.python.BasePythonRunner$WriterThread.$anonfun$run$1(PythonRunner.scala:383)
	at org.apache.spark.util.Utils$.logUncaughtExceptions(Utils.scala:1932)
	at org.apache.spark.api.python.BasePythonRunner$WriterThread.run(PythonRunner.scala:218)

Driver stacktrace:
	at org.apache.spark.scheduler.DAGScheduler.failJobAndIndependentStages(DAGScheduler.scala:2059)
	at org.apache.spark.scheduler.DAGScheduler.$anonfun$abortStage$2(DAGScheduler.scala:2008)
	at org.apache.spark.scheduler.DAGScheduler.$anonfun$abortStage$2$adapted(DAGScheduler.scala:2007)
	at scala.collection.mutable.ResizableArray.foreach(ResizableArray.scala:62)
	at scala.collection.mutable.ResizableArray.foreach$(ResizableArray.scala:55)
	at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:49)
	at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:2007)
	at org.apache.spark.scheduler.DAGScheduler.$anonfun$handleTaskSetFailed$1(DAGScheduler.scala:973)
	at org.apache.spark.scheduler.DAGScheduler.$anonfun$handleTaskSetFailed$1$adapted(DAGScheduler.scala:973)
	at scala.Option.foreach(Option.scala:407)
	at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:973)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:2239)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2188)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2177)
	at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:49)
	at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:775)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:2099)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:2120)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:2139)
	at org.apache.spark.api.python.PythonRDD$.runJob(PythonRDD.scala:154)
	at org.apache.spark.api.python.PythonRDD.runJob(PythonRDD.scala)
	at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
	at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
	at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
	at java.base/java.lang.reflect.Method.invoke(Method.java:566)
	at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
	at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
	at py4j.Gateway.invoke(Gateway.java:282)
	at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
	at py4j.commands.CallCommand.execute(CallCommand.java:79)
	at py4j.GatewayConnection.run(GatewayConnection.java:238)
	at java.base/java.lang.Thread.run(Thread.java:834)
Caused by: org.apache.spark.api.python.PythonException: Traceback (most recent call last):
  File "/usr/local/opt/apache-spark/libexec/python/lib/pyspark.zip/pyspark/worker.py", line 605, in main
    process()
  File "/usr/local/opt/apache-spark/libexec/python/lib/pyspark.zip/pyspark/worker.py", line 597, in process
    serializer.dump_stream(out_iter, outfile)
  File "/usr/local/opt/apache-spark/libexec/python/lib/pyspark.zip/pyspark/serializers.py", line 271, in dump_stream
    vs = list(itertools.islice(iterator, batch))
  File "<ipython-input-9-c172036fca7d>", line 5, in <lambda>
TypeError: 'NoneType' object is not subscriptable

	at org.apache.spark.api.python.BasePythonRunner$ReaderIterator.handlePythonException(PythonRunner.scala:503)
	at org.apache.spark.api.python.PythonRunner$$anon$3.read(PythonRunner.scala:638)
	at org.apache.spark.api.python.PythonRunner$$anon$3.read(PythonRunner.scala:621)
	at org.apache.spark.api.python.BasePythonRunner$ReaderIterator.hasNext(PythonRunner.scala:456)
	at org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:37)
	at scala.collection.Iterator.foreach(Iterator.scala:941)
	at scala.collection.Iterator.foreach$(Iterator.scala:941)
	at org.apache.spark.InterruptibleIterator.foreach(InterruptibleIterator.scala:28)
	at org.apache.spark.api.python.PythonRDD$.writeIteratorToStream(PythonRDD.scala:295)
	at org.apache.spark.api.python.PythonRunner$$anon$2.writeIteratorToStream(PythonRunner.scala:607)
	at org.apache.spark.api.python.BasePythonRunner$WriterThread.$anonfun$run$1(PythonRunner.scala:383)
	at org.apache.spark.util.Utils$.logUncaughtExceptions(Utils.scala:1932)
	at org.apache.spark.api.python.BasePythonRunner$WriterThread.run(PythonRunner.scala:218)


	at org.apache.spark.streaming.api.python.TransformFunction.callPythonTransformFunction(PythonDStream.scala:95)
	at org.apache.spark.streaming.api.python.TransformFunction.apply(PythonDStream.scala:78)
	at org.apache.spark.streaming.api.python.PythonDStream$.$anonfun$callForeachRDD$1(PythonDStream.scala:179)
	at org.apache.spark.streaming.api.python.PythonDStream$.$anonfun$callForeachRDD$1$adapted(PythonDStream.scala:179)
	at org.apache.spark.streaming.dstream.ForEachDStream.$anonfun$generateJob$2(ForEachDStream.scala:51)
	at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:23)
	at org.apache.spark.streaming.dstream.DStream.createRDDWithLocalProperties(DStream.scala:417)
	at org.apache.spark.streaming.dstream.ForEachDStream.$anonfun$generateJob$1(ForEachDStream.scala:51)
	at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:23)
	at scala.util.Try$.apply(Try.scala:213)
	at org.apache.spark.streaming.scheduler.Job.run(Job.scala:39)
	at org.apache.spark.streaming.scheduler.JobScheduler$JobHandler.$anonfun$run$1(JobScheduler.scala:256)
	at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:23)
	at scala.util.DynamicVariable.withValue(DynamicVariable.scala:62)
	at org.apache.spark.streaming.scheduler.JobScheduler$JobHandler.run(JobScheduler.scala:256)
	at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
	at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
	at java.base/java.lang.Thread.run(Thread.java:834)


In [5]:
server = 'http://localhost:5000/'
geodata_path='/Users/shawvin/Desktop/Big data project/geo_tweets.txt'

sendGeoData(geodata_path, server + 'update_geodata')
sendTweetsFromStream(kvs, server + 'update_tweets')
sendTopHashtagsFromStream(kvs, server + 'update_hashtagcounts')
sendTopWordsFromStream(kvs, server + 'update_counts')
sendTweetSentimentsFromStream(kvs, server + 'update_sentiments')