# Learning Model

## Imports

In [3]:
from pyspark.sql import functions as F
from pyspark.sql.types import *
from elasticsearch import Elasticsearch, helpers
from pyspark.ml.feature import VectorAssembler
from pyspark.ml.classification import LogisticRegression

## Reading the Data

Reading from the index pattern we created using the Kibana interface: "trump_covid-19_tweets_*".
<br>
We manually made sure that the hashtags are loaded as an array of strings, as we saw in the workshop.

In [6]:
# Connection settings for the Elasticsearch cluster that holds the tweet indices.
ES_HOST = 'dds2019s-1002.eastus.cloudapp.azure.com'
index='trump_covid-19_tweets_*'
# NOTE(review): elasticsearch-py interprets `timeout` in seconds, so 60000 is
# roughly 16.7 hours — confirm this was not meant to be milliseconds.
es = Elasticsearch([{'host': ES_HOST}], timeout=60000)

# Fail fast before starting a Spark job against a missing index pattern.
# `index` is passed by keyword: elasticsearch-py 8.x no longer accepts it
# positionally, while the keyword form works on 7.x as well.
if not es.indices.exists(index=index):
    raise Exception("Index doesn't exist!")

# Load every index matching the pattern through the ES-Hadoop Spark connector.
# `es.read.field.as.array.include` forces `hashtags` to be read as an array
# column instead of a scalar.
data = (
    spark.read
         .format("org.elasticsearch.spark.sql")
         .option("es.nodes.wan.only","true")
         .option("es.port","9200")
         .option("es.nodes",ES_HOST)
         .option("pushdown", "true")
         .option("es.read.field.as.array.include",  "hashtags")
         .load(index)
)

In [7]:
# Preview the loaded tweets (same row count and truncation as show()'s defaults).
data.show(n=20, truncate=True)

## Preprocessing

In order to use the logistic regression model we need to transform the data so it will contain a 'label' column and a 'features' column.
<br>
To do so, we used a UDF that labels each tweet according to whether or not it contains any 'trump'-related hashtag.
<br>
After labeling the tweets we've used vector assembler to organize the data in a label column and a feature column so that the logistic regression model could use it.

In [10]:
def hashtag_to_label(hashtags):
  """Label a tweet by whether any of its hashtags mentions 'trump'.

  Args:
    hashtags: iterable of hashtag strings, or None when the tweet has no
      hashtags field. Individual entries may also be None.

  Returns:
    1 if at least one hashtag contains 'trump' (case-insensitive), else 0.
  """
  # A null column value reaches the UDF as None; treat it like "no hashtags"
  # instead of failing the whole Spark task with a TypeError.
  if not hashtags:
    return 0
  for word in hashtags:
    # Skip null entries inside the array rather than calling .lower() on None.
    if word is not None and 'trump' in word.lower():
      return 1
  return 0

# Expose the Python labelling function to Spark as an integer-returning UDF.
hashtag_to_label_udf = F.udf(hashtag_to_label, IntegerType())

# Keep only the columns the model needs, renaming the two numeric inputs,
# then derive the 0/1 label from the hashtags.
selected_columns = [
    F.col('favorite_count').alias('likes'),
    F.col('hashtags'),
    F.col('retweet_count').alias('retweets'),
]
model_df = (
    data.select(*selected_columns)
        .withColumn('label', hashtag_to_label_udf(F.col('hashtags')))
)

# Pack the numeric inputs into the single vector column expected by Spark ML.
assembler = VectorAssembler(inputCols=['likes', 'retweets'], outputCol='features')

final_df = assembler.transform(model_df).select('label', 'features')
display(final_df)

label,features
0,"List(1, 2, List(), List(0.0, 2.0))"
0,"List(1, 2, List(), List(0.0, 3.0))"
0,"List(1, 2, List(), List(0.0, 34263.0))"
0,"List(1, 2, List(), List(0.0, 6.0))"
0,"List(1, 2, List(), List(0.0, 33.0))"
0,"List(1, 2, List(), List(0.0, 2931.0))"
0,"List(1, 2, List(), List(0.0, 54.0))"
0,"List(1, 2, List(), List(0.0, 1.0))"
0,"List(1, 2, List(), List(0.0, 8.0))"
0,"List(1, 2, List(), List(0.0, 58.0))"


## Undersampling

Since the data is heavily skewed towards the 'No trump hashtag' class (class 0), we used a technique called 'undersampling': we kept every tweet that contained a 'trump' hashtag (class 1), counted how many of those we have, and then randomly sampled an approximately equal number of tweets from the other class (the sample size is approximate because `sample` draws each row independently with the given fraction).

In [13]:
# Split the labelled data by class: 0 = no 'trump' hashtag, 1 = 'trump' hashtag.
major_df = final_df.filter(F.col("label") == 0)
minor_df = final_df.filter(F.col("label") == 1)

major_count = major_df.count()
minor_count = minor_df.count()
# Without rows in both classes there is nothing to balance, and the ratio
# below would otherwise fail with an unexplained ZeroDivisionError.
if major_count == 0 or minor_count == 0:
    raise ValueError(
        "Cannot undersample: class 0 has %d rows and class 1 has %d rows"
        % (major_count, minor_count))

ratio = float(major_count)/float(minor_count)
# Sampling without replacement only accepts a fraction in [0, 1]; clamp it so
# the cell still works if class 0 turns out not to be the larger class.
sample_fraction = min(1.0, 1/ratio)
# Fixed seed keeps the drawn sample stable between runs on the same data.
sampled_majority_df = major_df.sample(False, sample_fraction, seed=1)
# union() keeps duplicates like unionAll(), without the deprecation warning
# that unionAll() emits on Spark 2.x.
undersampled_df = sampled_majority_df.union(minor_df)
undersampled_df.show()

## Train model

Splitting the data into train and test sets, for both the normal dataset (no changes made) and the undersampled dataset.

In [16]:
# Fixed seed so the 70/30 train/test splits — and every metric computed from
# them — can be reproduced on a re-run instead of changing each time.
SPLIT_SEED = 1
trainNormal, testNormal = final_df.randomSplit([0.7, 0.3], seed=SPLIT_SEED)
trainUnder, testUnder = undersampled_df.randomSplit([0.7, 0.3], seed=SPLIT_SEED)

Training both models: Normal model and undersampled model.

In [18]:
# One estimator configuration, fitted once per training set. The column names
# are spelled out but are the estimator's defaults.
lr = LogisticRegression(featuresCol='features', labelCol='label')
# Model trained on the full, imbalanced data.
fittedLRNormal = lr.fit(trainNormal)
# Model trained on the class-balanced (undersampled) data.
fittedLRUnder = lr.fit(trainUnder)

## Training Summary

In [20]:
# Print the coefficients and intercept of the model trained on the full data.
print("Coefficients with no transformation: \n" + str(fittedLRNormal.coefficientMatrix))
print("Intercept with no transformation: " + str(fittedLRNormal.interceptVector))
print("-"*6)
trainingSummaryNormal = fittedLRNormal.summary

# Print the coefficients and intercept of the model trained on the undersampled data.
print("Coefficients with undersampling: \n" + str(fittedLRUnder.coefficientMatrix))
print("Intercept with undersampling: " + str(fittedLRUnder.interceptVector))
print("-"*6)
trainingSummaryUnder = fittedLRUnder.summary

# Training-set metrics for the normal model.
accuracy = trainingSummaryNormal.accuracy
falsePositiveRate = trainingSummaryNormal.weightedFalsePositiveRate
truePositiveRate = trainingSummaryNormal.weightedTruePositiveRate
fMeasure = trainingSummaryNormal.weightedFMeasure()
precision = trainingSummaryNormal.weightedPrecision
recall = trainingSummaryNormal.weightedRecall
# The value tuple follows the label order of the template (Accuracy, FPR, TPR,
# Precision, Recall, F-measure) so that each number prints under its own label.
print("Measurments of normal training: Accuracy: %s\nFalse Positive Rate: %s\nTrue Positive Rate:  %s\nPrecision: %s\nRecall: %s\nF-measure: %s"
      % (accuracy, falsePositiveRate, truePositiveRate, precision, recall, fMeasure))

# Training-set metrics for the undersampled model.
accuracy = trainingSummaryUnder.accuracy
falsePositiveRate = trainingSummaryUnder.weightedFalsePositiveRate
truePositiveRate = trainingSummaryUnder.weightedTruePositiveRate
fMeasure = trainingSummaryUnder.weightedFMeasure()
precision = trainingSummaryUnder.weightedPrecision
recall = trainingSummaryUnder.weightedRecall

# Same label order as above: Precision, Recall, then F-measure.
print("Measurments of undersampling training: Accuracy: %s\nFalse Positive Rate: %s\nTrue Positive Rate:  %s\nPrecision: %s\nRecall: %s\nF-measure: %s"
      % (accuracy, falsePositiveRate, truePositiveRate, precision, recall, fMeasure))

## Predicting

In [22]:
# Score the held-out test split with the model trained on the full dataset.
# The result keeps label/features and adds the rawPrediction, probability and
# prediction columns shown in the output below.
predictionsNormal = fittedLRNormal.transform(testNormal)
display(predictionsNormal)

label,features,rawPrediction,probability,prediction
0,"List(0, 2, List(), List())","List(1, 2, List(), List(5.71754224107022, -5.71754224107022))","List(1, 2, List(), List(0.9967229925298182, 0.0032770074701817544))",0.0
0,"List(0, 2, List(), List())","List(1, 2, List(), List(5.71754224107022, -5.71754224107022))","List(1, 2, List(), List(0.9967229925298182, 0.0032770074701817544))",0.0
0,"List(0, 2, List(), List())","List(1, 2, List(), List(5.71754224107022, -5.71754224107022))","List(1, 2, List(), List(0.9967229925298182, 0.0032770074701817544))",0.0
0,"List(0, 2, List(), List())","List(1, 2, List(), List(5.71754224107022, -5.71754224107022))","List(1, 2, List(), List(0.9967229925298182, 0.0032770074701817544))",0.0
0,"List(0, 2, List(), List())","List(1, 2, List(), List(5.71754224107022, -5.71754224107022))","List(1, 2, List(), List(0.9967229925298182, 0.0032770074701817544))",0.0
0,"List(0, 2, List(), List())","List(1, 2, List(), List(5.71754224107022, -5.71754224107022))","List(1, 2, List(), List(0.9967229925298182, 0.0032770074701817544))",0.0
0,"List(0, 2, List(), List())","List(1, 2, List(), List(5.71754224107022, -5.71754224107022))","List(1, 2, List(), List(0.9967229925298182, 0.0032770074701817544))",0.0
0,"List(0, 2, List(), List())","List(1, 2, List(), List(5.71754224107022, -5.71754224107022))","List(1, 2, List(), List(0.9967229925298182, 0.0032770074701817544))",0.0
0,"List(0, 2, List(), List())","List(1, 2, List(), List(5.71754224107022, -5.71754224107022))","List(1, 2, List(), List(0.9967229925298182, 0.0032770074701817544))",0.0
0,"List(0, 2, List(), List())","List(1, 2, List(), List(5.71754224107022, -5.71754224107022))","List(1, 2, List(), List(0.9967229925298182, 0.0032770074701817544))",0.0


In [23]:
# Score the undersampled test split with the model trained on undersampled data.
# The result keeps label/features and adds the rawPrediction, probability and
# prediction columns shown in the output below.
predictionsUnder = fittedLRUnder.transform(testUnder)
display(predictionsUnder)

label,features,rawPrediction,probability,prediction
0,"List(0, 2, List(), List())","List(1, 2, List(), List(-0.290701096514765, 0.290701096514765))","List(1, 2, List(), List(0.4278322357040579, 0.5721677642959422))",1.0
0,"List(0, 2, List(), List())","List(1, 2, List(), List(-0.290701096514765, 0.290701096514765))","List(1, 2, List(), List(0.4278322357040579, 0.5721677642959422))",1.0
0,"List(0, 2, List(), List())","List(1, 2, List(), List(-0.290701096514765, 0.290701096514765))","List(1, 2, List(), List(0.4278322357040579, 0.5721677642959422))",1.0
0,"List(0, 2, List(), List())","List(1, 2, List(), List(-0.290701096514765, 0.290701096514765))","List(1, 2, List(), List(0.4278322357040579, 0.5721677642959422))",1.0
0,"List(0, 2, List(), List())","List(1, 2, List(), List(-0.290701096514765, 0.290701096514765))","List(1, 2, List(), List(0.4278322357040579, 0.5721677642959422))",1.0
0,"List(0, 2, List(), List())","List(1, 2, List(), List(-0.290701096514765, 0.290701096514765))","List(1, 2, List(), List(0.4278322357040579, 0.5721677642959422))",1.0
0,"List(0, 2, List(), List())","List(1, 2, List(), List(-0.290701096514765, 0.290701096514765))","List(1, 2, List(), List(0.4278322357040579, 0.5721677642959422))",1.0
0,"List(0, 2, List(), List())","List(1, 2, List(), List(-0.290701096514765, 0.290701096514765))","List(1, 2, List(), List(0.4278322357040579, 0.5721677642959422))",1.0
0,"List(0, 2, List(), List())","List(1, 2, List(), List(-0.290701096514765, 0.290701096514765))","List(1, 2, List(), List(0.4278322357040579, 0.5721677642959422))",1.0
0,"List(0, 2, List(), List())","List(1, 2, List(), List(-0.290701096514765, 0.290701096514765))","List(1, 2, List(), List(0.4278322357040579, 0.5721677642959422))",1.0


## Visualizing
Visualizing the proportion of true labeling and wrong labeling

In [25]:
def calcAccuracy(a,b):
  """Return 'True Prediction' when `a` equals `b`, otherwise 'Wrong Prediction'."""
  return 'True Prediction' if a==b else 'Wrong Prediction'

# Spark UDF that tags each row as a correct or a wrong prediction.
acc_udf = F.udf(calcAccuracy, StringType())

# Tag every prediction of the normal model, then count rows per outcome.
normal_outcome = acc_udf(F.col('prediction'), F.col('label'))
NormalDFAcc = predictionsNormal.withColumn('final_res', normal_outcome)
normal_outcome_counts = NormalDFAcc.groupby('final_res').count()
display(normal_outcome_counts)

final_res,count
True Prediction,4366896
Wrong Prediction,10525


In [26]:
# Tag every prediction of the undersampled model, then count rows per outcome.
under_outcome = acc_udf(F.col('prediction'), F.col('label'))
UnderDFAcc = predictionsUnder.withColumn('final_res', under_outcome)
under_outcome_counts = UnderDFAcc.groupby('final_res').count()
display(under_outcome_counts)

final_res,count
True Prediction,12742
Wrong Prediction,8042


## Food For Thought
As we can see from both of the models' accuracy visualization, the undersampled model predicted poorly (almost 40% wrong predictions) whilst the normal model predicted overwhelmingly well (almost 0% wrong predictions).
<br>
At this point we've formed a hypothesis: training the model on the entire dataset made the model very biased towards the "No Trump" class. Moreover, since the proportion of the "Trump" class in the whole data is extremely small, even a model that classified every tweet as "No Trump" would have a negligible percentage of wrong predictions.
<br>
On the other hand, training the model on the undersampled data showed us an ugly truth: either the algorithm we chose is not a good fit for this task, or the features we chose for this task are not a good indication of tweets having "Trump"-related hashtags.

## Evaluating the model based on test sets

In [29]:
# Evaluate each fitted model on its own held-out test split; the returned
# summaries supply the metrics printed in the next cell.
resultsNormalEval, resultsUNderEval = (
    fittedLRNormal.evaluate(testNormal),
    fittedLRUnder.evaluate(testUnder),
)

In [30]:
# Each report pairs its print template with the evaluation summary it
# describes; the value order matches the template's labels
# (Accuracy, FPR, TPR, F-measure, Precision, Recall).
evalReports = [
    ("Accuracy based on normal dataset: %s\nFPR: %s\nTPR: %s\nF-measure: %s\nPrecision: %s\nRecall: %s",
     resultsNormalEval),
    ("Accuracy based on undersampling: %s\nFPR: %s\nTPR: %s\nF-measure: %s\nPrecision: %s\nRecall: %s",
     resultsUNderEval),
]

for reportTemplate, evalSummary in evalReports:
    accuracy = evalSummary.accuracy
    falsePositiveRate = evalSummary.weightedFalsePositiveRate
    truePositiveRate = evalSummary.weightedTruePositiveRate
    fMeasure = evalSummary.weightedFMeasure()
    precision = evalSummary.weightedPrecision
    recall = evalSummary.weightedRecall
    print(reportTemplate
          % (accuracy, falsePositiveRate, truePositiveRate, fMeasure, precision, recall))

After evaluating both models on the test sets we can see that our hypothesis is likely correct. <BR>
As our hypothesis indicated, the model trained on the entire dataset is very biased and the undersampled model works poorly for this sort of task.