In [1]:
%scala
/* Unmount the S3 bucket if it is already mounted, so the next cell can (re)mount it.
 * dbutils.fs.unmount raises an error when nothing is mounted at the path, so check
 * the current mounts first instead of unmounting unconditionally. */
val MOUNT_NAME = "Trump100k"
val MOUNT_POINT = s"/mnt/$MOUNT_NAME"
if (dbutils.fs.mounts.exists(_.mountPoint == MOUNT_POINT)) {
  dbutils.fs.unmount(MOUNT_POINT)
}

In [2]:
import os
try:  # Python 3
    from urllib.parse import quote
except ImportError:  # Python 2
    from urllib import quote

import pyspark

# Mount the S3 bucket holding the scored tweets under /mnt/<MOUNT_NAME>.
# Credentials are read from the environment instead of being hard-coded in the
# notebook: set AWS_ACCESS_KEY_ID / AWS_SECRET_ACCESS_KEY on the cluster.
ACCESS_KEY = os.environ["AWS_ACCESS_KEY_ID"]
SECRET_KEY = os.environ["AWS_SECRET_ACCESS_KEY"]
# The secret is embedded in the s3a:// URI, so every URI-reserved character
# (not just "/") must be percent-encoded.
ENCODED_SECRET_KEY = quote(SECRET_KEY, safe="")
AWS_BUCKET_NAME = "twitteroutput"
MOUNT_NAME = "Trump100k"

dbutils.fs.mount("s3a://%s:%s@%s" % (ACCESS_KEY, ENCODED_SECRET_KEY, AWS_BUCKET_NAME), "/mnt/%s" % MOUNT_NAME)

display(dbutils.fs.ls("/mnt/%s" % MOUNT_NAME))

In [3]:
%sql
-- (Re)create the raw tweet table on top of the mounted CSV; the file's first row is a header.
drop table if exists TRUMP;

create table TRUMP (
  row_number      STRING,
  id_str          STRING,
  created_at      STRING,
  tweet_text      STRING,
  followers_count INT,
  retweet_count   INT,
  tweet_tmstmp    TIMESTAMP,
  tweet_date      DATE,
  tweet_time      STRING,
  neg_score       DECIMAL(10,2),
  neu_score       DECIMAL(10,2),
  pos_score       DECIMAL(10,2),
  cmp_score       DECIMAL(10,2)
)
using com.databricks.spark.csv
options (
  path   "dbfs:/mnt/Trump100k/Trump100k.csv",
  header "true"
);

In [4]:
%sql
-- Check that (id_str, tweet_text) is unique in the table (no duplicate rows):
-- total_rows and distinct_id_text_pairs must be equal.
-- Counting the pair directly avoids false merges from plain concatenation
-- (e.g. '1' || '23' and '12' || '3' both give '123').
-- count(DISTINCT a, b) ignores rows where either column is NULL; the next cell checks for those.
SELECT count(*)                           AS total_rows
     , count(DISTINCT id_str, tweet_text) AS distinct_id_text_pairs
FROM TRUMP;

In [5]:
%sql
-- Verify id_str and tweet_text are always populated:
-- any non-zero count means at least one of the two columns is NULL on some row.
SELECT count(*)
FROM TRUMP
WHERE id_str IS NULL
   OR tweet_text IS NULL;

In [6]:
%sql
-- Check for NULL compound sentiment scores.
-- count(cmp_score) skips NULLs, so the difference is the number of unscored rows;
-- scored_rows is the row count the filtered TRUMP1 table should end up with.
SELECT count(*)                    AS total_rows
     , count(cmp_score)            AS scored_rows
     , count(*) - count(cmp_score) AS null_cmp_score_rows
FROM TRUMP;

In [7]:
%sql
-- Working copy of TRUMP keeping only rows that have a compound sentiment score.
DROP TABLE IF EXISTS TRUMP1;
CREATE TABLE TRUMP1 AS
SELECT *
FROM TRUMP
WHERE cmp_score IS NOT NULL;

In [8]:
%sql
-- Check the filtered table kept exactly the scored rows of TRUMP:
-- the two columns must be equal.
SELECT (SELECT count(*) FROM TRUMP1)         AS trump1_rows
     , (SELECT count(cmp_score) FROM TRUMP) AS trump_scored_rows;

In [9]:
%sql
-- Identify unique tweets, ignoring id_str: text plus all four scores form the key,
-- and tweet_cnt is how many rows share it (most frequent first).
-- GROUP BY already yields one row per key, so no DISTINCT is needed.
SELECT tweet_text || ' ' || neg_score || ' ' || neu_score || ' ' || pos_score || ' ' || cmp_score AS unique_tweet
     , count(id_str) AS tweet_cnt
FROM TRUMP1
GROUP BY unique_tweet
ORDER BY tweet_cnt DESC;

In [10]:
%sql
-- Categorical value for sentiment.
-- cmp_score ranges from -1 to 1, hence the three buckets below.
-- Sanity check: the min/max cmp_score in each bucket must lie inside that bucket's range.
-- The CASE must stay identical (boundaries AND labels) to the one used to build TRUMP2,
-- otherwise this check validates a different mapping than the one stored.
-- BETWEEN is inclusive and the first matching WHEN wins, so -0.333 would map to NEGATIVE
-- and 0.333 to NEUTRAL; cmp_score is DECIMAL(10,2), so neither boundary value occurs.
SELECT sentiment
     , max(cmp_score) AS max_cmp_score
     , min(cmp_score) AS min_cmp_score
     , count(*)       AS row_count
FROM (
  SELECT cmp_score
       , CASE WHEN cmp_score BETWEEN -1     AND -0.333 THEN 'NEGATIVE'
              WHEN cmp_score BETWEEN -0.333 AND  0.333 THEN 'NEUTRAL'
              WHEN cmp_score BETWEEN  0.333 AND  1     THEN 'POSITIVE'
         END AS sentiment
  FROM TRUMP1
) AS t1
GROUP BY sentiment
ORDER BY sentiment;

In [11]:
%sql
-- Create table for distinct tweets, i.e. one row per tweet (removes duplicates caused by retweets).
-- Keeps the max retweet count and the earliest (tweet_date, tweet_time) seen for the tweet.
-- date and time are taken together from the same row, so the pair is a real observed
-- timestamp rather than two independent minima from different rows.
-- min() over a struct compares tweet_date first, then tweet_time.
-- NOTE(review): tweet_time is a STRING, so the tie-break is lexicographic; this assumes a
-- zero-padded format such as HH:mm:ss - confirm against the source CSV.
DROP TABLE IF EXISTS TRUMP2;
CREATE TABLE TRUMP2 AS
SELECT tweet_text
     , neg_score
     , neu_score
     , pos_score
     , cmp_score
     , sentiment
     , retweets
     , first_seen.tweet_date AS date
     , first_seen.tweet_time AS time
FROM (
  SELECT tweet_text
       , neg_score
       , neu_score
       , pos_score
       , cmp_score
       -- sentiment depends only on cmp_score (a grouping column), so it needs no GROUP BY entry
       , CASE WHEN cmp_score BETWEEN -1     AND -0.333 THEN 'NEGATIVE'
              WHEN cmp_score BETWEEN -0.333 AND  0.333 THEN 'NEUTRAL'
              WHEN cmp_score BETWEEN  0.333 AND  1     THEN 'POSITIVE'
         END AS sentiment
       , max(retweet_count) AS retweets
       -- rows with a NULL tweet_date are skipped, as a plain min(tweet_date) would skip them
       , min(CASE WHEN tweet_date IS NOT NULL
                  THEN named_struct('tweet_date', tweet_date, 'tweet_time', tweet_time)
             END) AS first_seen
  FROM TRUMP1
  GROUP BY tweet_text, neg_score, neu_score, pos_score, cmp_score
) AS per_tweet;

In [12]:
%sql
-- Top 10 most-retweeted tweets with their sentiment bucket.
SELECT DISTINCT
       tweet_text
     , retweets
     , sentiment
FROM TRUMP2
ORDER BY retweets DESC
LIMIT 10;

In [13]:
%sql
-- Tweet(s) with the most positive sentiment (highest cmp_score) and their total retweets.
-- Several tweets can share the maximum score, so rows are ordered before LIMIT to make
-- the 10 rows shown deterministic. GROUP BY already yields one row per tweet (no DISTINCT needed).
SELECT tweet_text
     , cmp_score
     , sum(retweets) AS total_retweets
FROM TRUMP2
WHERE cmp_score = (SELECT max(cmp_score) FROM TRUMP2)
GROUP BY tweet_text, cmp_score
ORDER BY total_retweets DESC, tweet_text
LIMIT 10;

In [14]:
%sql
-- Tweet(s) with the most negative sentiment (lowest cmp_score) and their total retweets,
-- most-retweeted first. GROUP BY already yields one row per tweet (no DISTINCT needed).
SELECT tweet_text
     , cmp_score
     , sum(retweets) AS total_retweets
FROM TRUMP2
WHERE cmp_score = (SELECT min(cmp_score) FROM TRUMP2)
GROUP BY tweet_text, cmp_score
ORDER BY total_retweets DESC, tweet_text;

In [15]:
%sql
-- Distribution of sentiment of tweets about Trump: volume per compound score.
-- Each TRUMP2 row is one distinct tweet, so it contributes itself (+1) plus its retweets.
-- Rows with a compound score of exactly 0 are excluded.
SELECT cmp_score
     , sum(retweets + 1) AS tweets
FROM TRUMP2
WHERE cmp_score <> 0
GROUP BY cmp_score
ORDER BY cmp_score;

In [16]:
%sql
-- Overall volume (each tweet itself + its retweets) per sentiment bucket.
SELECT sentiment
     , sum(retweets + 1) AS tweets
FROM TRUMP2
GROUP BY sentiment
ORDER BY sentiment;

In [17]:
%sql
-- Latest and earliest tweet time in TRUMP2.
-- time comes from the STRING column tweet_time, so max/min compare lexicographically.
SELECT max(time)
     , min(time)
FROM TRUMP2;

In [18]:
%sql
-- Create the labelled table used for the Spark ML model below.
-- Label: 1 when the compound score is positive, 0 otherwise (a score of exactly 0 is labelled 0).
-- TRUMP1 still contains the duplicate texts caused by retweets (TRUMP2 removes them for the
-- same reason). A random train/validation/test split over those rows would put identical texts
-- on both sides and inflate the measured accuracy, so keep one row per distinct tweet text.
-- id_str is the smallest id_str (string order) seen for that text.
DROP TABLE IF EXISTS TRUMP_SNT;
CREATE TABLE TRUMP_SNT AS
SELECT min(id_str) AS id_str
     , tweet_text
     , CASE WHEN cmp_score <= 0 THEN 0
            WHEN cmp_score > 0  THEN 1
       END AS sentiment
FROM TRUMP1
GROUP BY tweet_text, cmp_score;

In [19]:
# Load the labelled tweet table as a Spark DataFrame and take a first look at it.
trump = spark.table(tableName="TRUMP_SNT")
print(trump)
trump.head()

In [20]:
from pyspark.ml import Pipeline

# Load the reference sentiment dataset (first row is the header; no schema
# inference is requested, so columns load as strings).
sentiments_df = spark.read.csv("/FileStore/tables/sentiments.csv", header=True)
sentiments_df.printSchema()

In [21]:
from pyspark.sql import functions as fn

# Peek at rows labelled positive (sentiment == 1) in the reference dataset.
positive_rows = sentiments_df.filter(fn.col('sentiment') == 1)
positive_rows.show(5)

In [22]:
# Alias the labelled tweet DataFrame for the ML steps and peek at its positive rows.
trump_df = trump
trump_df.filter(trump_df['sentiment'] == 1).show(5)

In [23]:
from pyspark.ml.feature import RegexTokenizer

# With gaps=False the pattern describes the tokens themselves: each token is a
# maximal run of Unicode letters, so digits, punctuation, '#' and '@' are dropped.
tokenizer = RegexTokenizer(
    gaps=False,
    pattern="\\p{L}+",
    inputCol="tweet_text",
    outputCol="words",
)

In [24]:
# Apply the tokenizer to the labelled tweets (adds the 'words' column).
tweet_words_df = tokenizer.transform(dataset=trump_df)
print(tweet_words_df)

In [25]:
# Show the tokenized words: one array of words per tweet (the arrays are not exploded into rows).
tweet_words_df.show(n=5)

In [26]:
# Build the stop-word list used to filter tokens before they are counted.
import requests

STOP_WORDS_URL = 'http://ir.dcs.gla.ac.uk/resources/linguistic_utils/stop_words'
# Fail fast instead of hanging, and never split an HTTP error page into "stop words".
response = requests.get(STOP_WORDS_URL, timeout=30)
response.raise_for_status()
stop_words = response.text.split()

# add rt as a stopword since it will appear on all retweets
stop_words.append(u'rt')

In [27]:
# Inspect the stop-word list without dumping every entry: its size, a sample,
# and confirmation that the custom 'rt' entry was added.
print(len(stop_words))
print(stop_words[:20])
print(u'rt' in stop_words)

In [28]:
from pyspark.ml.feature import StopWordsRemover

# Remove stop words from the token arrays, matching case-insensitively.
sw_filter = StopWordsRemover(
    inputCol="words",
    outputCol="filtered",
    stopWords=stop_words,
    caseSensitive=False,
)

In [29]:
from pyspark.ml.feature import CountVectorizer

# Term-frequency vectors over the filtered tokens.
# minDF=5.: a word must appear in at least 5 documents (tweets) to enter the vocabulary,
#           i.e. words seen in fewer than 5 tweets are dropped.
# minTF=1.: within a tweet, a term is counted if it occurs at least once.
# vocabSize=2**17: upper bound on the vocabulary size.
cv = CountVectorizer(minTF=1., minDF=5., vocabSize=2**17,
                     inputCol="filtered", outputCol="tf")

In [30]:
# Fit tokenizer -> stop-word filter -> count vectorizer, then preview the 'tf' column.
# NOTE(review): fitted on all of trump_df, so the vocabulary also sees the rows that
# later land in the validation/testing splits.
cv_stages = [tokenizer, sw_filter, cv]
cv_pipeline = Pipeline(stages=cv_stages).fit(trump_df)
cv_preview_df = cv_pipeline.transform(trump_df)
cv_preview_df.show(5)

In [31]:
from pyspark.ml.feature import IDF

# Re-weight term frequencies by inverse document frequency so that terms common
# across many tweets count for less.
idf = IDF(inputCol='tf', outputCol='tfidf')

idf_pipeline = Pipeline(stages=[cv_pipeline, idf]).fit(trump_df)
idf_pipeline.transform(trump_df).show(5)

In [32]:
# Split 60/30/10 (fixed seed) into training, validation and testing sets: validation is for
# comparing candidate models against each other, testing for evaluating the chosen one.
split_weights = [0.6, 0.3, 0.1]
training_df, validation_df, testing_df = trump_df.randomSplit(split_weights, seed=0)

In [33]:
from pyspark.ml.classification import LogisticRegression

# Elastic-net logistic regression on the TF-IDF features.
# lambda_par -> regParam (regularization strength);
# alpha_par  -> elasticNetParam (0 = pure L2 penalty, 1 = pure L1 penalty).
lambda_par = 0.02
alpha_par = 0.3
en_lr = LogisticRegression(
    featuresCol='tfidf',
    labelCol='sentiment',
    maxIter=100,
    regParam=lambda_par,
    elasticNetParam=alpha_par,
)

en_lr_pipeline = Pipeline(stages=[idf_pipeline, en_lr]).fit(training_df)

In [34]:
# Validation accuracy: mean over rows of 1.0 when prediction == label, else 0.0.
is_correct = fn.expr('float(prediction = sentiment)')
validation_predictions = en_lr_pipeline.transform(validation_df)
validation_predictions.select(fn.avg(is_correct)).show()

In [35]:
import pandas as pd

# Pair every vocabulary term with its logistic-regression coefficient to see which
# words push predictions towards each class.
# idf_pipeline.stages[0] is the fitted count-vectorizer pipeline; its last stage holds the vocabulary.
count_vectorizer_model = idf_pipeline.stages[0].stages[-1]
vocabulary = count_vectorizer_model.vocabulary

lr_model = en_lr_pipeline.stages[-1]
en_weights = lr_model.coefficients.toArray()
en_coeffs_df = pd.DataFrame({'word': vocabulary, 'weight': en_weights})

# negative words: the 10 smallest coefficients
en_coeffs_df.sort_values('weight').head(10)

In [36]:
# positive words: the 10 largest coefficients
en_coeffs_df.sort_values(by='weight', ascending=False).head(n=10)

In [37]:
from pyspark.ml.tuning import ParamGridBuilder

# Unfitted pipeline plus a 3 x 3 grid over regularization strength (regParam) and
# L1/L2 mix (elasticNetParam): 9 parameter maps in total.
en_lr_estimator = Pipeline(stages=[idf_pipeline, en_lr])

reg_values = [0., 0.01, 0.02]
elastic_net_values = [0., 0.2, 0.4]
grid = (ParamGridBuilder()
        .addGrid(en_lr.regParam, reg_values)
        .addGrid(en_lr.elasticNetParam, elastic_net_values)
        .build())

grid

In [38]:
# Fit one model per parameter map, keeping them in grid order (model N <-> grid[N-1]).
all_models = []
for model_number, param_map in enumerate(grid, start=1):
    print("Fitting model {}".format(model_number))
    model = en_lr_estimator.fit(training_df, param_map)
    all_models.append(model)

In [39]:
#use ROC curve to evaluate different models
from pyspark.mllib.evaluation import BinaryClassificationMetrics as metric
from sklearn.metrics import roc_curve, auc
import numpy as np
import seaborn as sns
from matplotlib import pyplot as plt

# One colour per grid model (cycled if the grid grows beyond 9 models).
ROC_COLORS = ['#3498db', '#e74c3c', '#2ecc71', '#9b59b6', '#FFA500',
              '#878E88', '#5F5AA2', '#EE92C2', '#999900']


def roc_for_model(model, eval_df):
    """Return (fpr, tpr, roc_auc) for a fitted pipeline model evaluated on eval_df.

    The score is probability[0] (P(class 0)) and the label is 1 - sentiment, i.e.
    class 0 is treated as the positive class. Flipping score and label together
    leaves the AUC the same as scoring class 1.
    """
    rows = model.transform(eval_df).select('probability', 'sentiment').collect()
    y_score = [float(row['probability'][0]) for row in rows]
    y_true = [1.0 - float(row['sentiment']) for row in rows]
    fpr, tpr, _ = roc_curve(y_true, y_score)
    return fpr, tpr, auc(fpr, tpr)


# Compare the grid models on the validation split: per the split cell, testing_df is
# reserved for the chosen model, so it must not drive model selection.
fig, ax = plt.subplots()
roc_aucs = []
for model_idx, model in enumerate(all_models):
    fpr, tpr, roc_auc = roc_for_model(model, validation_df)
    roc_aucs.append(roc_auc)
    ax.plot(fpr, tpr, linewidth=1.5,
            color=ROC_COLORS[model_idx % len(ROC_COLORS)],
            label='Model %d = %0.3f' % (model_idx + 1, roc_auc))

ax.plot([0, 1], [0, 1], 'k--')
ax.set_xlim([0.0, 1.0])
ax.set_ylim([0.0, 1.05])
ax.set_xlabel('False Positive Rate')
ax.set_ylabel('True Positive Rate')
ax.set_title('ROC Curve (validation set)')
ax.legend(loc="lower right")
display(fig)