###### IMPORTING AND CONNECTION

In [1]:
from pyspark import SparkContext
from pyspark.sql import SparkSession
from pyspark.sql.functions import isnan, when, count, col,trim
from pyspark import SparkContext
import pyspark.sql.functions as f
from pyspark.sql.functions import *
from pyspark.ml.feature import Tokenizer
from pyspark.ml.feature import StopWordsRemover
import subprocess

# Create (or reuse) the Spark session used by every cell below.
# The former `.config('spark.some.config.opttion', 'some-value')` call was the
# placeholder from the Spark quick-start (misspelled) and set a key that
# nothing reads, so it has been dropped.
spark = (
    SparkSession.builder
    .appName("ddam-8-step-1")
    .getOrCreate()
)

## Function definitions

In [2]:
def clean_text(df, colname):
    """Normalise a free-text column to single-space-separated ASCII letters.

    The column ``colname`` of ``df`` is rewritten in place (same column name)
    by applying these steps in order:

    1. URLs (``http:`` / ``https:`` up to the next whitespace) are removed.
    2. Hashtags (``#`` up to the next whitespace) are removed.
    3. Runs of ``. , ; : ! ? -`` become a word delimiter.
    4. Every remaining character that is neither an ASCII letter nor
       whitespace is deleted (digits, apostrophes, ``@``, ``_``, emoji, ...).
    5. Any run of whitespace (spaces, tabs, newlines) collapses to one space.
    6. Leading and trailing spaces are stripped.

    Unlike the previous version, whitespace other than the plain space is
    treated as a word boundary. That matters when a value spans several lines:
    a hashtag or URL at the end of a line no longer swallows the first word of
    the next line, and words separated only by a newline are no longer glued
    together.

    Args:
        df: Spark DataFrame containing ``colname``.
        colname: name of the string column to clean.

    Returns:
        A new DataFrame with ``colname`` replaced by its cleaned version; all
        other columns are untouched.
    """
    # Ordered (pattern, replacement) pairs. URLs go first because they may
    # contain '#' fragments that the hashtag rule would otherwise cut in half.
    steps = (
        (r'https?:\S*', ' '),       # remove url's
        (r'#\S*', ' '),             # remove hashtags
        (r'[.,;:!?-]+', ' '),       # change punctuation to delimiters
        (r'[^A-Za-z\s]', ''),       # remove any other special chars and digits
        (r'\s+', ' '),              # collapse multiple whitechars
        (r'^ +| +$', ''),           # remove leading and trailing spaces
    )

    cleaned = col(colname)
    for pattern, replacement in steps:
        cleaned = regexp_replace(cleaned, pattern, replacement)

    return df.withColumn(colname, cleaned)

In [3]:
#function to save spark dataframes
def save(df, filename, base_dir="hdfs://kddrtserver12.isti.cnr.it:9000/user/hpsa06"):
    """Write a Spark DataFrame as CSV (with header) to ``base_dir/filename``.

    Any output already present at the target path is replaced. This is done
    with Spark's own ``overwrite`` save mode instead of shelling out to
    ``hadoop fs -rm -r`` first, so no ``hadoop`` executable is needed and a
    failed delete can no longer go unnoticed until the write fails.

    Args:
        df: Spark DataFrame to persist.
        filename: name of the output (Spark writes a directory of part files).
        base_dir: directory the output is placed in; defaults to the project's
            HDFS home so existing two-argument calls behave as before.
    """
    some_path = f"{base_dir.rstrip('/')}/{filename}"
    df.write.csv(some_path, header=True, mode="overwrite")

# ANALYSIS DATAFRAME

In [4]:
# Load the two raw tweet exports from HDFS. Both reads use the same CSV
# options: comma separator, double-quote as quote and escape character,
# a header row, and multiLine parsing.
df_tr = spark.read.csv(
    "hdfs://kddrtserver12.isti.cnr.it:9000/user/hpsa06/hashtag_donaldtrump.csv",
    sep=",",
    quote='"',
    header=True,
    escape='"',
    multiLine=True,
)
df_bi = spark.read.csv(
    "hdfs://kddrtserver12.isti.cnr.it:9000/user/hpsa06/hashtag_joebiden.csv",
    sep=",",
    quote='"',
    header=True,
    escape='"',
    multiLine=True,
)

In [5]:
# Row counts of the two raw files, before any cleaning.
trump_rows = df_tr.count()
biden_rows = df_bi.count()
print("Trump file count:", trump_rows)
print("Biden file count:", biden_rows)

Trump file count: 970919
Biden file count: 776886


In [6]:
# For every column of the Trump frame, count the entries that are NaN or null.
print("Missing values Trump:")
missing_per_column_tr = [
    count(when(isnan(name) | col(name).isNull(), name)).alias(name)
    for name in df_tr.columns
]
df_tr.select(missing_per_column_tr).show()

Missing values Trump:
+----------+--------+-----+-----+-------------+------+-------+---------+----------------+----------------+--------------+--------------------+-------------+------+------+------+-------+---------+------+----------+------------+
|created_at|tweet_id|tweet|likes|retweet_count|source|user_id|user_name|user_screen_name|user_description|user_join_date|user_followers_count|user_location|   lat|  long|  city|country|continent| state|state_code|collected_at|
+----------+--------+-----+-----+-------------+------+-------+---------+----------------+----------------+--------------+--------------------+-------------+------+------+------+-------+---------+------+----------+------------+
|         0|       0|    0|    0|            0|   876|      0|       16|               0|          101266|             0|                   0|       294953|525200|525200|743732| 528171|   528154|650299|    670494|           0|
+----------+--------+-----+-----+-------------+------+-------+--------

In [7]:
# Per-column count of NaN/null entries in the Trump frame.
df_tr_mv = df_tr.select([count(when(isnan(c) | col(c).isNull(), c)).alias(c) for c in df_tr.columns])

# Keep only the columns that actually contain missing values: the dropped ones
# all show a count of 0 in the full table above. (The stray 'user_f' entry,
# which matched no column, has been removed from this list.)
df_tr_mv = df_tr_mv.drop('user_followers_count','created_at','tweet_id','tweet','likes','retweet_count','user_id','user_screen_name','user_join_date','collected_at')

# Register the summary as a SQL view and render it as a pandas table.
df_tr_mv.createOrReplaceTempView('trump_mv')
spark.sql('SELECT * FROM trump_mv').toPandas()

Unnamed: 0,source,user_name,user_description,user_location,lat,long,city,country,continent,state,state_code
0,876,16,101266,294953,525200,525200,743732,528171,528154,650299,670494


In [7]:
# For every column of the Biden frame, count the entries that are NaN or null.
print("Missing values Biden:")
missing_per_column_bi = [
    count(when(isnan(name) | col(name).isNull(), name)).alias(name)
    for name in df_bi.columns
]
df_bi.select(missing_per_column_bi).show()

Missing values Biden:
+----------+--------+-----+-----+-------------+------+-------+---------+----------------+----------------+--------------+--------------------+-------------+------+------+------+-------+---------+------+----------+------------+
|created_at|tweet_id|tweet|likes|retweet_count|source|user_id|user_name|user_screen_name|user_description|user_join_date|user_followers_count|user_location|   lat|  long|  city|country|continent| state|state_code|collected_at|
+----------+--------+-----+-----+-------------+------+-------+---------+----------------+----------------+--------------+--------------------+-------------+------+------+------+-------+---------+------+----------+------------+
|         0|       0|    0|    0|            0|   713|      0|       18|               0|           82006|             0|                   0|       233791|421593|421593|590014| 423107|   423089|516691|    532277|           0|
+----------+--------+-----+-----+-------------+------+-------+--------

In [8]:
# Per-column count of NaN/null entries in the Biden frame.
# The column list now comes from df_bi itself; it used to be taken from
# df_tr, which only worked because both files share the same schema.
df_bi_mv = df_bi.select([count(when(isnan(c) | col(c).isNull(), c)).alias(c) for c in df_bi.columns])

# Keep only the columns that actually contain missing values: the dropped ones
# all show a count of 0 in the full table above. (The stray 'user_f' entry,
# which matched no column, has been removed from this list.)
df_bi_mv = df_bi_mv.drop('user_followers_count','created_at','tweet_id','tweet','likes','retweet_count','user_id','user_screen_name','user_join_date','collected_at')

# Register the summary as a SQL view and render it as a pandas table.
df_bi_mv.createOrReplaceTempView('biden_mv')
spark.sql('SELECT * FROM biden_mv').toPandas()

Unnamed: 0,source,user_name,user_description,user_location,lat,long,city,country,continent,state,state_code
0,713,18,82006,233791,421593,421593,590014,423107,423089,516691,532277


There are no missing tweet text values; all other missing values are handled later in the analysis.

In [6]:
# Columns removed from both tweet frames; what remains is
# created_at, tweet, likes, lat, long and city.
unused_columns = [
    'tweet_id',
    'retweet_count',
    'source',
    'user_id',
    'user_name',
    'user_screen_name',
    'user_description',
    'user_join_date',
    'user_followers_count',
    'user_location',
    'country',
    'continent',
    'state',
    'state_code',
    'collected_at',
]
df_tr = df_tr.drop(*unused_columns)
df_bi = df_bi.drop(*unused_columns)

In [7]:
# Peek at the first five rows of the reduced Trump frame.
print("Trump:")
df_tr.show(n=5)

Trump:
+-------------------+--------------------+-----+----------+------------+----------+
|         created_at|               tweet|likes|       lat|        long|      city|
+-------------------+--------------------+-----+----------+------------+----------+
|2020-10-15 00:00:01|#Elecciones2020 |...|  0.0|  25.77427|   -80.19366|      null|
|2020-10-15 00:00:01|Usa 2020, Trump c...| 26.0|      null|        null|      null|
|2020-10-15 00:00:02|#Trump: As a stud...|  2.0|45.5202471|-122.6741949|  Portland|
|2020-10-15 00:00:02|2 hours since las...|  0.0|      null|        null|      null|
|2020-10-15 00:00:08|You get a tie! An...|  4.0|38.8949924| -77.0365581|Washington|
+-------------------+--------------------+-----+----------+------------+----------+
only showing top 5 rows



In [11]:
# Number of fully identical rows in each frame: total rows minus distinct rows.
duplicates_tr = df_tr.count() - df_tr.distinct().count()
duplicates_bi = df_bi.count() - df_bi.distinct().count()
print("number of duplicates in Trump file: ", duplicates_tr)
print("number of duplicates in Biden file: ", duplicates_bi)

number of duplicates in Trump file:  1351
number of duplicates in Biden file:  1851


In [8]:
# Keep a single copy of every fully identical row in both frames.
df_tr, df_bi = df_tr.distinct(), df_bi.distinct()

In [9]:
# Run the text normalisation on the "tweet" column of both frames; the
# uncleaned frames df_tr / df_bi stay available for comparison.
clean_tr, clean_bi = (clean_text(frame, "tweet") for frame in (df_tr, df_bi))

In [11]:
# Expose the uncleaned Trump frame to Spark SQL and preview five rows.
df_tr.createOrReplaceTempView('trump')
trump_preview = spark.sql('SELECT * FROM trump LIMIT 5')
trump_preview.toPandas()

Unnamed: 0,created_at,tweet,likes,lat,long,city
0,2020-10-15 00:12:49,#WhiteHouse #Trump #MelaniaTrump: Any of you E...,0.0,-31.952712100000003,115.8604796,Perth
1,2020-10-15 00:16:13,Under Section 230 Twitter and Facebook limit R...,0.0,39.7837304,-100.4458825,
2,2020-10-15 00:16:35,"#Trump: It's never gonna be the end for us, is...",2.0,45.5202471,-122.6741949,Portland
3,2020-10-15 00:22:57,@tteribul @MelliBitch @du_mob @jtlittle19 @Taz...,4.0,,,
4,2020-10-15 00:25:09,@andymstone Yeah and why do you let disinforma...,0.0,39.7837304,-100.4458825,


In [12]:
# Expose the cleaned Trump frame to Spark SQL and preview five rows.
clean_tr.createOrReplaceTempView('trump1')
trump_clean_preview = spark.sql('SELECT * FROM trump1 LIMIT 5')
trump_clean_preview.toPandas()

Unnamed: 0,created_at,tweet,likes,lat,long,city
0,2020-10-15 00:12:49,Any of you EVER tell the truth tested positive...,0.0,-31.952712100000003,115.8604796,Perth
1,2020-10-15 00:16:13,Under Section Twitter and Facebook limit Rudy ...,0.0,39.7837304,-100.4458825,
2,2020-10-15 00:16:35,Its never gonna be the end for us is it Crowd ...,2.0,45.5202471,-122.6741949,Portland
3,2020-10-15 00:22:57,tteribul MelliBitch dumob jtlittle Tazdad fuff...,4.0,,,
4,2020-10-15 00:25:09,andymstone Yeah and why do you let disinformat...,0.0,39.7837304,-100.4458825,


In [13]:
# Expose the uncleaned Biden frame to Spark SQL and preview five rows.
df_bi.createOrReplaceTempView('biden')
biden_preview = spark.sql('SELECT * FROM biden LIMIT 5')
biden_preview.toPandas()

Unnamed: 0,created_at,tweet,likes,lat,long,city
0,2020-10-15 00:03:17,"Hunter #Biden introduced his father, then-Vice...",1.0,40.7127281,-74.0060152,New York
1,2020-10-15 00:16:13,Under Section 230 Twitter and Facebook limit R...,0.0,39.7837304,-100.4458825,
2,2020-10-15 00:22:02,"No, Today we're going to make the evil nutter ...",1.0,,,
3,2020-10-15 00:22:57,@tteribul @MelliBitch @du_mob @jtlittle19 @Taz...,4.0,,,
4,2020-10-15 00:33:06,#WheresHunter #HunterBidenEmails #JoeBiden #El...,0.0,38.475840600000005,-80.84084150000001,


In [14]:
# Expose the cleaned Biden frame to Spark SQL and preview five rows.
clean_bi.createOrReplaceTempView('biden1')
biden_clean_preview = spark.sql('SELECT * FROM biden1 LIMIT 5')
biden_clean_preview.toPandas()

Unnamed: 0,created_at,tweet,likes,lat,long,city
0,2020-10-15 00:03:17,Hunter introduced his father then Vice Preside...,1.0,40.7127281,-74.0060152,New York
1,2020-10-15 00:16:13,Under Section Twitter and Facebook limit Rudy ...,0.0,39.7837304,-100.4458825,
2,2020-10-15 00:22:02,No Today were going to make the evil nutter a ...,1.0,,,
3,2020-10-15 00:22:57,tteribul MelliBitch dumob jtlittle Tazdad fuff...,4.0,,,
4,2020-10-15 00:33:06,,0.0,38.475840600000005,-80.84084150000001,


In [11]:
# Persist both cleaned tweet frames to HDFS (Trump first, then Biden).
cleaned_outputs = {'df_Trump.csv': clean_tr, 'df_Biden.csv': clean_bi}
for target_name, cleaned_frame in cleaned_outputs.items():
    save(cleaned_frame, target_name)

In [12]:
# Row counts of the cleaned frames.
clean_trump_rows = clean_tr.count()
clean_biden_rows = clean_bi.count()
print("Trump file count:", clean_trump_rows)
print("Biden file count:", clean_biden_rows)

Trump file count: 969568
Biden file count: 775035


# Model Dataframe

In [16]:
# Load the labelled test and train CSVs from HDFS, letting Spark infer
# the column types from the data.
df_test = spark.read.csv(
    "hdfs://kddrtserver12.isti.cnr.it:9000/user/hpsa06/test.csv",
    header=True,
    inferSchema=True,
)
df_train = spark.read.csv(
    "hdfs://kddrtserver12.isti.cnr.it:9000/user/hpsa06/train.csv",
    header=True,
    inferSchema=True,
)

In [17]:
# Remove the columns that are not carried into the combined model frame.
train_only_columns = ('selected_text', 'TextID')
df_train_d = df_train.drop(*train_only_columns)
df_test_d = df_test.drop('TextID')

In [18]:
# Stack train and test into one frame. unionByName matches columns by name,
# whereas union() pairs them purely by position and would silently mix up
# columns if the two files ever listed them in a different order.
df_union = df_train_d.unionByName(df_test_d)
df_union.count()

31015

In [19]:
# Report how many rows contain at least one missing value, then discard them.
rows_with_missing = df_union.count() - df_union.dropna().count()
print("number of missing value rows: ", rows_with_missing)
df_union = df_union.dropna()

number of missing value rows:  3


In [20]:
# Report how many fully identical rows exist, then keep one copy of each.
duplicate_rows = df_union.count() - df_union.distinct().count()
print("number of duplicates: ", duplicate_rows)
df_union = df_union.distinct()

number of duplicates:  0


In [21]:
# Expose the combined, still uncleaned model frame to Spark SQL and preview it.
df_union.createOrReplaceTempView('model')
model_preview = spark.sql('SELECT * FROM model LIMIT 5')
model_preview.toPandas()

Unnamed: 0,text,sentiment
0,"i lost all my friends, i`m alone and sleepy..i...",negative
1,"yeah I was thinking about that ,ahaha",positive
2,"The birds are out,, oh man... That`s NOT cool ...",negative
3,I`m missing crab legs and attending my going a...,negative
4,there were attempts to somehow extend inner c...,neutral


In [22]:
# Normalise the "text" column, then re-register the 'model' view so that it
# now points at the cleaned frame, and preview five rows.
clean_mod = clean_text(df_union, "text")
clean_mod.createOrReplaceTempView('model')
clean_model_preview = spark.sql('SELECT * FROM model LIMIT 5')
clean_model_preview.toPandas()

Unnamed: 0,text,sentiment
0,i lost all my friends im alone and sleepy i wa...,negative
1,yeah I was thinking about that ahaha,positive
2,The birds are out oh man Thats NOT cool I didn...,negative
3,Im missing crab legs and attending my going aw...,negative
4,there were attempts to somehow extend inner c...,neutral


In [23]:
# Persist the cleaned model frame to HDFS.
save(df=clean_mod, filename='df_Model.csv')