In [1]:
import re
import time
from collections import namedtuple
from pyspark import SparkContext
from pyspark.streaming import StreamingContext
from pyspark.sql import SQLContext
from pyspark.sql import SparkSession
from textblob import TextBlob

# text classification
def polarity_detection(text):
    return TextBlob(text).sentiment.polarity


def preprocess(line):
    line = line.lower()
    line = re.sub('@\w+', '', line) # remove username
    line = re.sub('#', '', line) # remove hastags
    line = re.sub(r'http\S+', '', line) # remove links
    line = re.sub('RT', '', line) # remove RT marker
    line = re.sub(':', '', line)
    return line 


def mapper(line):
    processed = preprocess(line)
    sentiment = polarity_detection(line)
    return (processed, sentiment)

In [2]:
sc = SparkContext("local[2]", "AppName")
ssc = StreamingContext(sc, 10)
spark = SparkSession(sc)
sqlContext = SQLContext(sc)

fields = ('text', 'sentiment')
Tweet = namedtuple('Tweet', fields)

lines = ssc.socketTextStream("localhost", 5555)
pairs = lines.map(lambda line: mapper(line))
pairs_count = pairs.reduceByKey(lambda x, y: x + y)
pairs_count_rdds = pairs_count.map(lambda rec: Tweet( rec[0], rec[1]))
pairs_count_rdds.foreachRDD(lambda rdd: rdd.toDF().registerTempTable('Tweets'))

# Print the first ten elements of each RDD generated in this DStream to the console
pairs_count_rdds.pprint()

In [3]:
ssc.start()             # Start the computation
ssc.awaitTermination()  # Wait for the computation to terminate

-------------------------------------------
Time: 2021-04-27 23:31:10
-------------------------------------------
Tweet(text='dino dinotoken crypto cryptogemjust cashed out some crypto earnings 💅🏻rt  things move faster than the speed of light in crypto adoption....rt  crypto daily news from  ', sentiment=0.2)
Tweet(text='', sentiment=0.0)
Tweet(text='safemoon  has listed on zbg ', sentiment=0.0)
Tweet(text='🌋 somewhere deep in the land of crypto, in the fires of eth and bsc, the 1inch labs team forged a master ios wal…rt  thank you for 10k followers! ', sentiment=0.0)
Tweet(text='lets celebrate reaching 10k followers on twitter ', sentiment=0.0)
Tweet(text='we thank you all for your support 💚', sentiment=0.0)
Tweet(text='moo/susdt will be available on cointiger defi zone soon ', sentiment=1.2000000000000002)
Tweet(text='contract address', sentiment=0.0)
Tweet(text="  yolo bsc coin hit's $1 million dollar market cap, 1500 holders, ama news and it's only day 9!!!!  vi…   ", sentiment=0.0

KeyboardInterrupt: 

In [6]:
from IPython import display
import matplotlib.pyplot as plt
import seaborn as sns
%matplotlib inline # Only works for Jupyter Notebooks!

''' TODO: Visualisation
count = 0
while count < 2:
    time.sleep(3)
    tweets_collected = sqlContext.sql('SELECT text, sentiment FROM Tweets')
    tweets_collected_df = tweets_collected.toPandas()
    
    display.clear_output(wait=True)
    # plt.figure(figsize = (10, 8))
    # sns.barplot(x="text", y="sentiment", data=tweets_collected_df)
    # plt.show()
    tweets_collected_df.head(10)
    count = count + 1
'''

UsageError: unrecognized arguments: # Only works for Jupyter Notebooks!
