In [8]:
# List the packages (with pinned versions) installed in this kernel's environment.
%pip freeze

anyio==3.5.0
appnope==0.1.2
argon2-cffi==21.3.0
argon2-cffi-bindings==21.2.0
asttokens==2.0.5
attrs==21.4.0
Babel==2.9.1
backcall==0.2.0
black==21.12b0
bleach==4.1.0
certifi==2021.10.8
cffi==1.15.0
charset-normalizer==2.0.10
click==8.0.3
cycler==0.11.0
debugpy==1.5.1
decorator==5.1.1
defusedxml==0.7.1
distlib==0.3.4
entrypoints==0.3
executing==0.8.2
filelock==3.4.2
fonttools==4.28.5
idna==3.3
ipykernel==6.7.0
ipython==8.0.0
ipython-genutils==0.2.0
jedi==0.18.1
Jinja2==3.0.3
joblib==1.1.0
json5==0.9.6
jsonschema==4.4.0
jupyter-client==7.1.1
jupyter-core==4.9.1
jupyter-server==1.13.3
jupyterlab==3.2.8
jupyterlab-pygments==0.1.2
jupyterlab-server==2.10.3
kiwisolver==1.3.2
MarkupSafe==2.0.1
matplotlib==3.5.1
matplotlib-inline==0.1.3
mistune==0.8.4
mypy-extensions==0.4.3
nbclassic==0.3.5
nbclient==0.5.10
nbconvert==6.4.0
nbformat==5.1.3
nest-asyncio==1.5.4
notebook==6.4.7
numpy==1.22.1
packaging==21.3
pandas==1.3.5
pandocfilters==1.5.0
parso==0.8.3
pathspec==0.9.0
pexpect==4.8.0
pickleshare

In [1]:
import re
from typing import Optional

from pyspark.sql import SparkSession
from pyspark.sql.types import StringType, StructType, FloatType
from pyspark.sql import functions as F
from pyspark.sql.functions import *

In [2]:
import os
import sys
from textblob import TextBlob

# Polarity cut-off for labelling: a score must be strictly beyond +/-TRESHOLD
# to count as Positive/Negative; anything in between is Neutral.
TRESHOLD = 0.3

# Point both PySpark interpreter settings at the interpreter running this kernel.
os.environ.update(
    PYSPARK_PYTHON=sys.executable,
    PYSPARK_DRIVER_PYTHON=sys.executable,
)

ModuleNotFoundError: No module named 'textblob'

In [None]:
# Create a function to get the polarity
def getPolarity(tweet: Optional[str]) -> Optional[float]:
    """Return the TextBlob sentiment polarity of ``tweet``.

    Args:
        tweet: Text to score. ``None`` is accepted because a SQL NULL reaches
            a Python UDF as ``None``.

    Returns:
        ``TextBlob(tweet).sentiment.polarity``, or ``None`` when ``tweet`` is
        ``None`` so the missing value propagates instead of raising.
    """
    # Guard first: TextBlob is only meaningful for real text.
    if tweet is None:
        return None
    return TextBlob(tweet).sentiment.polarity

# Create a function to get sentimental category
def getSentiment(polarityValue: Optional[float]) -> Optional[str]:
    """Map a polarity score to a sentiment label using ``TRESHOLD``.

    Args:
        polarityValue: Polarity score, or ``None`` when no score is available.

    Returns:
        ``'Negative'`` when the score is below ``-TRESHOLD``, ``'Positive'``
        when it is above ``TRESHOLD`` and ``"Neutral"`` otherwise. Both bounds
        are exclusive, so a score exactly equal to +/-TRESHOLD is Neutral.
        ``None`` is returned for a ``None`` score rather than raising
        ``TypeError`` on the comparison.
    """
    if polarityValue is None:
        return None
    if polarityValue < -TRESHOLD:
        return 'Negative'
    if polarityValue > TRESHOLD:
        return 'Positive'
    return "Neutral"

In [None]:
# Words removed from every cleaned tweet; a frozenset gives constant-time lookups
# and is built once instead of on every call.
_STOPWORDS = frozenset(["for", "on", "an", "a", "of", "and", "in", "the", "to", "from"])


# Clean the tweet
def clean_tweet(tweet: Optional[str]) -> Optional[str]:
    """Normalise a raw tweet into lowercase, space-separated alphanumeric words.

    Steps, in order: lowercase; drop apostrophes; strip @mentions, #hashtags
    and ``http...`` URLs; blank out ``()!?`` and ``[...]`` spans; replace every
    remaining character outside ``a-z0-9`` with a space; remove stopwords.

    Args:
        tweet: Raw tweet text, or ``None`` for a missing value.

    Returns:
        The cleaned text (possibly an empty string), or ``None`` when
        ``tweet`` is ``None`` so a missing value propagates instead of raising
        ``AttributeError`` on ``.lower()``.
    """
    if tweet is None:
        return None
    r = tweet.lower()
    # Delete apostrophes outright so a contraction stays one token ("don't" -> "dont").
    r = re.sub(r"'", "", r)
    r = re.sub(r"@[A-Za-z0-9_]+", "", r)
    r = re.sub(r"#[A-Za-z0-9_]+", "", r)
    r = re.sub(r"http\S+", "", r)
    r = re.sub(r"[()!?]", " ", r)
    # Raw string: "\[" is not a valid escape in a normal string literal.
    r = re.sub(r"\[.*?\]", " ", r)
    r = re.sub(r"[^a-z0-9]", " ", r)
    return " ".join(w for w in r.split() if w not in _STOPWORDS)


In [None]:
# Get (or create) the Spark session for this notebook, using a local master.
spark = (
    SparkSession.builder
    .appName("TwitterSentimentAnalysis")
    .master("local[*]")
    .getOrCreate()
)

In [None]:
# Set Spark's log level to ERROR to keep the notebook output readable.
spark.sparkContext.setLogLevel('ERROR')

In [None]:
# Streaming source: read from the TCP socket at 127.0.0.1:3333.
df = (
    spark.readStream
    .format("socket")
    .options(host="127.0.0.1", port=3333)
    .load()
)

In [None]:
# Show the schema of the raw socket stream before any parsing.
df.printSchema()

In [None]:
# Layout expected in each incoming JSON tweet; all three fields are read as strings.
tweet_schema = (
    StructType()
    .add("ID", StringType())
    .add("text", StringType())
    .add("created_at", StringType())
)

In [None]:
# Parse the "value" column as JSON using tweet_schema; the resulting struct column is named "tweet".
values = df.select(
    F.from_json(F.col("value").cast("string"), tweet_schema).alias("tweet")
)

In [None]:
# Show the schema after JSON parsing (a single "tweet" struct column).
values.printSchema()

In [None]:
# Flatten the parsed struct so its fields become top-level columns.
df1 = values.select("tweet.*")

# Wrap the plain-Python helpers as Spark UDFs with explicit return types.
clean_tweets = F.udf(clean_tweet, StringType())
polarity = F.udf(getPolarity, FloatType())
sentiment = F.udf(getSentiment, StringType())

# Clean the tweet text
raw_tweets = df1.withColumn('processed_text', clean_tweets(F.col("text")))

# Score the cleaned text, then classify every tweet against the sentiment threshold
polarity_tweets = raw_tweets.withColumn("polarity", polarity(F.col("processed_text")))
sentiment_tweets = polarity_tweets.withColumn("sentiment", sentiment(F.col("polarity")))

In [None]:
# Show the final schema, including the processed_text, polarity and sentiment columns added above.
sentiment_tweets.printSchema()

In [None]:
# Count tweets per sentiment label within 60-second windows over created_at.
# NOTE(review): created_at is declared as a string in tweet_schema, so the windowing
# depends on Spark interpreting it as a timestamp - confirm the incoming format parses.
window_tweets = (
    sentiment_tweets
    .groupBy(
        F.window(sentiment_tweets["created_at"], "60 seconds"),
        sentiment_tweets["sentiment"],
    )
    .agg(F.count("*").alias("numEvents"))
)

In [None]:
    # Start a streaming query named "tweetquery" that writes the windowed counts
    # to the console sink in "complete" output mode.
    writeTweet = (
        window_tweets.writeStream
        .outputMode("complete")
        .format("console")
        .queryName("tweetquery")
        .start()
    )

    # awaitTermination() blocks this cell until the query ends or the kernel is
    # interrupted. The finally clause stops the query in either case so it is not
    # left running in the background after an interrupt or a failure.
    try:
        writeTweet.awaitTermination()
    finally:
        writeTweet.stop()

In [None]:
# Stop the Spark session once the streaming cell above has finished or been interrupted.
spark.stop()