In [308]:
# import pyspark modules
from pyspark.sql import SparkSession
from pyspark.sql.functions import udf, lower, col, regexp_replace, concat_ws, collect_list
from pyspark.ml.feature import Tokenizer, StopWordsRemover
from nltk.stem.snowball import SnowballStemmer
from pyspark.sql.types import StructType, ArrayType, StringType

# standard modules
import re
from datetime import datetime

In [2]:
# start session
spark = SparkSession.builder.appName("SentimentAnalysis").getOrCreate()

In [189]:
# read data
df_raw = spark.read.option("header", True).parquet("/Users/christopherkindl/Downloads/df_2.parquet")

In [156]:
# remove hashtags, usertags and links
x="@peter I really love that shirt at #Macy. https://t.co/GdddIJwkse"
' '.join(re.sub("(@[A-Za-z0-9]+)|([^0-9A-Za-z \t])|(\w+:\/\/\S+)"," ",x).split())

'I really love that shirt at Macy'

In [205]:
# lowercase text and remove special characters 
df_raw_lw = df_raw.select("date", "station", (lower(regexp_replace('tweets', "(@[A-Za-z0-9]+)|([^0-9A-Za-z \t])|(\w+:\/\/\S+)", "")).alias('tweets')))
df_raw_rm = df_raw_lw.select((regexp_replace('tweets',' +', ' ').alias('tweets')), "date", "station")
#df_lower = df_raw.select("date", "station", (lower(col("tweets")).alias('tweets')))

# tokenize text
tokenizer = Tokenizer(inputCol="tweets", outputCol="tweets_token")
df_tokens = tokenizer.transform(df_res).select("tweets_token", "date", "station")

# remove stop words
remover = StopWordsRemover(inputCol="tweets_token", outputCol="tweets_sw_removed")#, stopWords=stopwordList)
df_sw = remover.transform(df_tokens).select("tweets_sw_removed", "date", "station")

# stemming
stemmer = SnowballStemmer(language='english')
stemmer_udf = udf(lambda tokens: [stemmer.stem(token) for token in tokens], ArrayType(StringType()))
df_stemmed = df_sw.withColumn("tweets", stemmer_udf("tweets_sw_removed")).select("tweets", "date", "station")

In [214]:
# create string from list of strings
join_udf = udf(lambda x: ",".join(x))
df_clean = df_stemmed.withColumn("tweets", join_udf(col("tweets")))

In [215]:
df_clean.show()

+--------------------+-------------------+----------------+
|              tweets|               date|         station|
+--------------------+-------------------+----------------+
|,qlondon,without,...|2021-04-27 08:30:07|      Abbey Road|
|,jumper,arthur,be...|2021-04-26 08:49:34|      Abbey Road|
|walk,wild,side,lo...|2021-04-21 17:20:23|   Acton Central|
|walk,wild,side,lo...|2021-04-21 17:20:23| Acton Main Line|
|click,link,bio,se...|2021-04-27 13:33:20|      Acton Town|
|recommend,anyon,j...|2021-04-25 16:43:15|      Acton Town|
|your,look,work,lo...|2021-04-22 13:15:30|      Acton Town|
|rt,midst,london,b...|2021-04-28 14:26:17|         Aldgate|
|long,due,catchup,...|2021-04-28 14:05:31|         Aldgate|
|im,brace,thunder,...|2021-04-28 13:00:25|         Aldgate|
|natur,nail,hello,...|2021-04-28 11:17:59|         Aldgate|
|blackbottom,rubbe...|2021-04-28 00:34:13|         Aldgate|
|rt,midst,london,b...|2021-04-28 14:26:17|    Aldgate East|
|long,due,catchup,...|2021-04-28 14:05:3

In [259]:
# group by station and join tweets together
df_tmp = df_clean.select('tweets', 'station').groupby('station').agg(concat_ws(" ", collect_list("tweets")).alias("tweets"))

In [288]:
# set up bag of words computation
bow = df_tmp.rdd\
    .filter(lambda x: x.tweets)\
    .map( lambda x: x.tweets.replace(',',' ').replace('.',' ').replace('-',' '))\
    .flatMap(lambda x: x.split())\
    .map(lambda x: (x, 1))
    
# run bag of words
bow_tmp = bow.reduceByKey(lambda x,y:x+y)

# show top 5 words    
bow_sorted = bow_tmp.takeOrdered(10,lambda a: -a[1])

In [316]:
# convert list of topics into single string
topics = ""

for index in range(len(bow_sorted)):
    if bow_sorted[index][0] != 'london':
        topics += bow_sorted[index][0]
        topics += ", "     

In [317]:
# create dataFrame
columns = ["date","topics"]
curr_date = datetime.now().strftime('%Y-%m-%d')
data = [(curr_date, topics)]
df_final = spark.createDataFrame(data).toDF(*columns)

In [320]:
df_final.write.mode("overwrite").csv("/Users/christopherkindl/Desktop/df_test.csv")