In [1]:
# import findspark
import findspark
# initialize findspark with spark directory
findspark.init("C:\Program Files\Spark\spark-3.3.1-bin-hadoop3")
# import pyspark
import pyspark
# create spark context
sc = pyspark.SparkContext()
# create spark session 
spark = pyspark.sql.SparkSession(sc)

In [2]:
# import packages
import os 
import pickle
import re
from datetime import datetime
import requests
import pytz
import emoji
import pandas as pd
import numpy as np
import ast
import pyspark.sql.functions as F
from pyspark.sql.types import *
from pyspark.sql.functions import *
from pyspark.sql.functions import array_contains
import matplotlib.pyplot as plt 

## General

In this notebook we will buid a model that predicts if the trend of a certain topic goes up or down on a certain day based on Twitter data of that day.

## 1. Import Data

### 1.1 Google Trends

In [79]:
# read trend data 
trend = spark.read.csv(".././../data/Google_trends/daily_trends.csv", header=True, inferSchema=True, sep=';')

In [80]:
trend.show()

+----------+---------------+
|      date|dependent_vegan|
+----------+---------------+
|2021-10-04|              0|
|2021-10-05|              1|
|2021-10-06|              1|
|2021-10-07|              1|
|2021-10-08|              1|
|2021-10-09|              1|
|2021-10-10|              0|
|2021-10-11|              0|
|2021-10-12|              0|
|2021-10-13|              0|
|2021-10-14|              0|
|2021-10-15|              0|
|2021-10-16|              1|
|2021-10-17|              1|
|2021-10-18|              0|
|2021-10-19|              1|
|2021-10-20|              0|
|2021-10-21|              0|
|2021-10-22|              1|
|2021-10-23|              1|
+----------+---------------+
only showing top 20 rows



In [81]:
# create SQL view
trend.createOrReplaceTempView("trendSQL")

The binary variable indicates if the trend goes up or down.

### 1.2 Twitter

In [82]:
# define data dir
data_dir = "../../data/Topic_vegan/"

# get all twitter files
tweet_files = [os.path.join(data_dir, obs) for obs in os.listdir(data_dir)] 

In [83]:
# import twitter data 
#twitter_df = spark.read.json(tweet_files)

In [84]:
list_hashtags = ["vegan",
               "veganism",
               "vegetarian",
                "veganfood",
                "vegano",
                "veganrecipes",
                "vegansofig",
                "vegansofinstagram"]

data_dir = ".././../data/Topic_vegan/"
tweet_files = [os.path.join(data_dir, obs) for obs in os.listdir(data_dir)]
files_hashtags = [file for file in tweet_files if (file.find(list_hashtags[3]) != -1)]             
twitter_df = spark.read.option("multiline","true").json(files_hashtags) 

In [85]:
# select interesting features
twitter_df = twitter_df.select(F.col('user.name'),
                                F.col('user.screen_name'),
                                F.col('user.followers_count'),
                                F.col('user.following'),
                                F.col('user.statuses_count'),
                                F.col('user.listed_count'),
                                F.col('created_at'),
                                F.col('full_text'),
                                F.col('entities.hashtags'),
                                F.col('favorite_count'),
                                F.col('retweet_count'),
                                F.col('user.friends_count'))

## 2. Data Preprocessing

#### 2.1 Check time period

In [86]:
# function to convert Twitter date string format
def getDate(date):
    if date is not None:
        return str(datetime.strptime(date,'%a %b %d %H:%M:%S +0000 %Y').replace(tzinfo=pytz.UTC).strftime("%Y-%m-%d %H:%M:%S"))
    else:
        return None

# UDF declaration
date_udf = F.udf(getDate, StringType())

# apply udf
twitter_df = twitter_df.withColumn('post_created_at', F.to_utc_timestamp(date_udf("created_at"), "UTC"))

In [87]:
# get first post
first_post = F.min('post_created_at').alias('earliest')
# get latest post
latest_post = F.max('post_created_at').alias('latest')
# show tweet period in our dataset
twitter_df.select(first_post, latest_post).show()

+-------------------+-------------------+
|           earliest|             latest|
+-------------------+-------------------+
|2021-10-27 21:28:18|2022-09-07 17:30:32|
+-------------------+-------------------+



#### 2.2 Remove retweets and duplicates

In [88]:
# drop all retweets from dataset
no_retweets_df = twitter_df.filter(~F.col("full_text").startswith("RT"))

In [89]:
# first sort no_retweets_df based on date in chronological order (most recent ones on top)
no_retweets_sorted_df = no_retweets_df.sort("post_created_at", ascending=False)

In [90]:
# number of observations before dropping duplicates
no_retweets_sorted_df.count()

12649

In [91]:
# drop duplicates based on tweet text and the profile it was posted from
final_no_duplicates_df = no_retweets_sorted_df.drop_duplicates(["full_text", "screen_name"])

In [92]:
# number of observations after dropping duplicates
final_no_duplicates_df.count()

12099

In [93]:
# rename dataframe
final_twitter_df = final_no_duplicates_df

## 3. Independent Variables

For our independent variables we need to design a pipeline that transforms the data into the desired aggregated metrics per day.

In [94]:
# create SQL view
final_twitter_df.createOrReplaceTempView("twitterSQL")

### 3.1 Volume of tweets 

In [95]:
# select the relevant data
tweet_volume = spark.sql("SELECT DATE_FORMAT(post_created_at, 'Y-M-dd') as date, COUNT(*) as tweet_volume \
                                    FROM twitterSQL \
                                    GROUP BY DATE_FORMAT(post_created_at, 'Y-M-dd') \
                                    ORDER BY DATE_FORMAT(post_created_at, 'Y-M-dd')")

In [96]:
# show 
spark.sql("set spark.sql.legacy.timeParserPolicy=LEGACY")
tweet_volume.show(100)

+----------+------------+
|      date|tweet_volume|
+----------+------------+
|2021-10-27|           1|
|2021-10-28|         118|
|2021-10-29|         136|
|2021-10-30|         117|
|2021-10-31|         116|
|2021-11-01|         400|
|2021-11-02|         178|
|2021-11-03|         182|
|2021-11-04|         164|
|2021-11-05|         102|
|2021-12-06|           4|
|2021-12-07|           4|
|2021-12-08|          39|
|2021-12-09|         204|
|2021-12-10|         170|
|2021-12-11|         147|
|2021-12-12|         152|
|2021-12-13|         190|
|2021-12-14|         150|
|2021-12-15|           1|
+----------+------------+
only showing top 20 rows



In [97]:
# create SQL view
tweet_volume.createOrReplaceTempView("tweet_volumeSQL")

### 3.2 Average likes

We exclude tweets with 0 likes.

In [98]:
# select the relevant data
avg_likes = spark.sql("SELECT DATE_FORMAT(post_created_at, 'Y-M-dd') as date, AVG(favorite_count) as avg_likes \
                           FROM twitterSQL \
                           WHERE favorite_count > 0 \
                           GROUP BY DATE_FORMAT(post_created_at, 'Y-M-dd') \
                           ORDER BY DATE_FORMAT(post_created_at, 'Y-M-dd')")

In [99]:
# show 
spark.sql("set spark.sql.legacy.timeParserPolicy=LEGACY")
avg_likes.show()

+----------+------------------+
|      date|         avg_likes|
+----------+------------------+
|2021-10-28|5.2615384615384615|
|2021-10-29|           9.09375|
|2021-10-30| 7.130434782608695|
|2021-10-31| 9.347222222222221|
|2021-11-01| 4.408256880733945|
|2021-11-02|               5.0|
|2021-11-03|6.8173076923076925|
|2021-11-04| 6.574712643678161|
|2021-11-05| 2.235294117647059|
|2021-12-06| 9.666666666666666|
|2021-12-07|               1.5|
|2021-12-08| 8.192307692307692|
|2021-12-09|20.021505376344088|
|2021-12-10| 5.390243902439025|
|2021-12-11| 9.817204301075268|
|2021-12-12| 7.447368421052632|
|2021-12-13| 6.813186813186813|
|2021-12-14| 6.945945945945946|
| 2022-2-11|10.666666666666666|
| 2022-2-12| 12.16793893129771|
+----------+------------------+
only showing top 20 rows



In [100]:
# create SQL view
avg_likes.createOrReplaceTempView("avg_likesSQL")

### 3.3 Average Retweets

We exclude tweets with 0 retweets.

In [101]:
# select the relevant data
avg_retweets = spark.sql("SELECT DATE_FORMAT(post_created_at, 'Y-M-dd') as date, AVG(retweet_count) as avg_retweets \
                          FROM twitterSQL \
                          WHERE retweet_count > 0 \
                          GROUP BY DATE_FORMAT(post_created_at, 'Y-M-dd') \
                          ORDER BY DATE_FORMAT(post_created_at, 'Y-M-dd')")

In [102]:
# show 
spark.sql("set spark.sql.legacy.timeParserPolicy=LEGACY")
avg_retweets.show()

+----------+------------------+
|      date|      avg_retweets|
+----------+------------------+
|2021-10-28|               2.0|
|2021-10-29|  2.41025641025641|
|2021-10-30| 3.303030303030303|
|2021-10-31|               2.9|
|2021-11-01| 2.515151515151515|
|2021-11-02|1.9523809523809523|
|2021-11-03|              2.12|
|2021-11-04| 2.510204081632653|
|2021-11-05|1.5217391304347827|
|2021-12-06|               4.5|
|2021-12-07|               1.0|
|2021-12-08|2.6153846153846154|
|2021-12-09|              5.04|
|2021-12-10|1.9142857142857144|
|2021-12-11| 4.416666666666667|
|2021-12-12|2.6792452830188678|
|2021-12-13|2.0317460317460316|
|2021-12-14|3.5609756097560976|
| 2022-2-11|2.1818181818181817|
| 2022-2-12| 3.532258064516129|
+----------+------------------+
only showing top 20 rows



In [103]:
# create SQL view
avg_retweets.createOrReplaceTempView("avg_retweetsSQL")

### 3.4 Engagement rate

We define engagement rate of a tweet as the sum of likes and retweets divided by the amount of followers of the account that sent out the tweet. For our purpose we will take the avergage engagement rate per day. We exclude accounts who have no followers and we only take tweets into account which are liked and retweeted at least once.

In [104]:
# select the relevant data
avg_engagement_rate = spark.sql("SELECT DATE_FORMAT(post_created_at, 'Y-M-dd') as date, AVG(engagement_rate) as avg_engagement_rate \
                                     FROM (  SELECT screen_name, post_created_at, (favorite_count+retweet_count)/followers_count as engagement_rate \
                                             FROM twitterSQL \
                                             WHERE favorite_count > 0 AND retweet_count > 0 AND followers_count > 0 ) \
                                     GROUP BY DATE_FORMAT(post_created_at, 'Y-M-dd') \
                                     ORDER BY DATE_FORMAT(post_created_at, 'Y-M-dd')")

In [105]:
# show
spark.sql("set spark.sql.legacy.timeParserPolicy=LEGACY")
avg_engagement_rate.show()

+----------+--------------------+
|      date| avg_engagement_rate|
+----------+--------------------+
|2021-10-28|0.020606728893851515|
|2021-10-29| 0.07340379042367053|
|2021-10-30| 0.17919702575767021|
|2021-10-31| 0.08996355791000546|
|2021-11-01| 0.20946879040003968|
|2021-11-02| 0.03199078134961682|
|2021-11-03| 0.08867414385733928|
|2021-11-04| 0.08067982230339633|
|2021-11-05| 0.05866521462317866|
|2021-12-06|7.808669607732365E-4|
|2021-12-08|0.007915940641805837|
|2021-12-09| 0.28033717603505437|
|2021-12-10| 0.02277622745914704|
|2021-12-11| 0.14491531461862578|
|2021-12-12| 0.07350065473576302|
|2021-12-13| 0.15918884782728934|
|2021-12-14| 0.03254682747088554|
| 2022-2-11|0.024072641042102536|
| 2022-2-12| 0.09974280721272229|
| 2022-2-13| 0.04839131603550477|
+----------+--------------------+
only showing top 20 rows



In [106]:
# create SQL view
avg_engagement_rate.createOrReplaceTempView("avg_engagement_rateSQL")

### 3.5 Number of influencers

We will calculate how many influencers actively tweeted a certain day. We define an influencer as someone with:
- followers > 1000 
- engagement_rate > 0.20 
- weekly tweet frequency > 5

In [107]:
def get_influencers(follower_count_tresh, eng_rate_tresh, freq_week_tresh, data):

    #df
    df = data
    
    # get all users with their amount of followers
    influencers = df.groupBy("screen_name") \
                    .agg(first("followers_count").alias("followers_count"))

    # average engagement rate for each user
    eng_rate = df.withColumn('eng_rate', ((df['favorite_count'] + df['retweet_count'])/df['followers_count']))

    eng_rate_user = eng_rate.groupBy("screen_name") \
                            .agg(avg("eng_rate").alias("eng_rate"))

    # average freq_weekly per user
    freq_week = df.withColumn("year", year(df["post_created_at"]))
    freq_week = freq_week.withColumn('week', weekofyear('post_created_at'))

    freq_week = freq_week.groupBy('screen_name', 'year', 'week').agg(countDistinct("full_text"))\
                    .withColumnRenamed("count(full_text)", "freq") \
                        .sort('screen_name', 'year', 'week', ascending = True)
    freq_week = freq_week.select('screen_name', 'freq')

    freq_week = freq_week.groupby("screen_name").agg(avg(freq_week.freq).alias('freq'))

    # put the data together
    data_joined = eng_rate_user.join(influencers, "screen_name").join(freq_week, "screen_name")

    # filter the data
    data_joined = data_joined.filter((data_joined.followers_count > follower_count_tresh) & (data_joined.eng_rate > eng_rate_tresh) & (data_joined.freq > freq_week_tresh))
    
    # show the data
    data_joined.show()
    return data_joined

In [108]:
influencers = get_influencers(1000, 0.002, 2, final_twitter_df)

+---------------+--------------------+---------------+------------------+
|    screen_name|            eng_rate|followers_count|              freq|
+---------------+--------------------+---------------+------------------+
| _Alex_Greenway|0.009057542454856534|           1693|2.3333333333333335|
|   chefmompiche|0.002041559397617...|           1216|               3.0|
|    vivaluvegan| 0.07332462784595821|           3907|               3.0|
|     wiservegan| 0.06963865224637812|           1020|               3.0|
|   DarrenLong71|0.013008270573555896|           2217|             3.125|
| PlantBasedGent|0.002493369018065403|           1128|              21.6|
|      VeganGuys|0.004872359357709762|           3975|              2.75|
|  nolancharlene|0.003874202370100...|           1097|               4.0|
|         innkyo| 0.03430053334605656|           1268| 2.272727272727273|
|FearOfTheDuck74| 0.11069032095018164|           1255| 3.357142857142857|
|LornaMa03249374|0.014224769730596533|

In [109]:
# create SQL view
influencers.createOrReplaceTempView("influencersSQL")

In [110]:
# select the relevant data
number_of_influencers = spark.sql(" SELECT DATE_FORMAT(a.post_created_at, 'Y-M-dd') as date, COUNT(b.screen_name) as influencers \
                                    FROM twitterSQL a \
                                    RIGHT OUTER JOIN influencersSQL b ON a.screen_name = b.screen_name\
                                    GROUP BY DATE_FORMAT(post_created_at, 'Y-M-dd') \
                                    ORDER BY DATE_FORMAT(post_created_at, 'Y-M-dd')")

In [111]:
# show
spark.sql("set spark.sql.legacy.timeParserPolicy=LEGACY")
number_of_influencers.show()

+----------+-----------+
|      date|influencers|
+----------+-----------+
|2021-10-28|         11|
|2021-10-29|          9|
|2021-10-30|          5|
|2021-10-31|         10|
|2021-11-01|         16|
|2021-11-02|         17|
|2021-11-03|         18|
|2021-11-04|         12|
|2021-11-05|          9|
|2021-12-07|          1|
|2021-12-08|          1|
|2021-12-09|         21|
|2021-12-10|         13|
|2021-12-11|          5|
|2021-12-12|         10|
|2021-12-13|         11|
|2021-12-14|         14|
| 2022-2-11|          1|
| 2022-2-12|         10|
| 2022-2-13|         18|
+----------+-----------+
only showing top 20 rows



In [112]:
# create SQL view
number_of_influencers.createOrReplaceTempView("number_of_influencersSQL")

## 4. Basetable

In [123]:
# create basetable
basetable = spark.sql("SELECT a.date, a.dependent_vegan, b.tweet_volume, COALESCE(c.avg_likes,0) as avg_likes, \
                       COALESCE(d.avg_retweets,0) as avg_retweets, \
                       COALESCE(e.avg_engagement_rate,0) as avg_engagement_rate, COALESCE(f.influencers,0) as influencers \
                       FROM trendSQL a \
                       INNER JOIN tweet_volumeSQL b ON a.date = b.date \
                       LEFT OUTER JOIN avg_likesSQL c ON b.date = c.date \
                       LEFT OUTER JOIN avg_retweetsSQL d ON c.date = d.date \
                       LEFT OUTER JOIN avg_engagement_rateSQL e ON d.date = e.date \
                       LEFT OUTER JOIN number_of_influencersSQL f ON e.date = f.date")

In [124]:
# show
basetable.show(50)

+----------+---------------+------------+------------------+------------------+--------------------+-----------+
|      date|dependent_vegan|tweet_volume|         avg_likes|      avg_retweets| avg_engagement_rate|influencers|
+----------+---------------+------------+------------------+------------------+--------------------+-----------+
|2021-11-03|              1|         182|6.8173076923076925|              2.12| 0.08867414385733928|         18|
|2021-12-06|              0|           4| 9.666666666666666|               4.5|7.808669607732365E-4|          0|
|2021-12-08|              1|          39| 8.192307692307692|2.6153846153846154|0.007915940641805837|          1|
|2021-11-02|              0|         178|               5.0|1.9523809523809523| 0.03199078134961682|         17|
|2021-12-13|              0|         190| 6.813186813186813|2.0317460317460316| 0.15918884782728934|         11|
|2021-10-27|              0|           1|               0.0|               0.0|                 

In [126]:
# import the required functions
from pyspark.ml.feature import Binarizer, StringIndexer, VectorIndexer, VectorAssembler, OneHotEncoder
from pyspark.ml import Pipeline
from pyspark.sql.types import DoubleType

In [131]:
# define string indexer to index price 
SI = StringIndexer(inputCol = 'dependent_vegan', outputCol = 'label')

# define vector assembler for numeric variables
numColumns = ['avg_likes','avg_retweets','avg_engagement_rate','influencers']
VAnum = VectorAssembler(inputCols=numColumns, outputCol="numFeatures")

In [132]:
# define pipeline stages
stages = [SI, VAnum]
# define pipeline and fit on data
preprocessingPipeline = Pipeline().setStages(stages).fit(basetable)
# apply pipeline on data
basetable = preprocessingPipeline.transform(basetable)

In [133]:
# select features and labels
basetable = basetable.select(["numFeatures", "label"])

In [134]:
# check
basetable.show(5)

+--------------------+-----+
|         numFeatures|label|
+--------------------+-----+
|[6.81730769230769...|  1.0|
|[9.66666666666666...|  0.0|
|[8.19230769230769...|  1.0|
|[5.0,1.9523809523...|  0.0|
|[6.81318681318681...|  0.0|
+--------------------+-----+
only showing top 5 rows



**Logistic Regression**
- Split the data in a train en test set (70/30).
- Build one pipeline that:
  - standardizes the numerical variables
  - applies a logistic regression to the data
  - check the performance using the AUC.

In [135]:
# split data in train and test set
train, test = basetable.randomSplit([0.70, 0.30])

In [136]:
# check number of observations in train and test set
print(train.count())
print(test.count())

12
8


In [138]:
# inspect distribution of label in train and test set
basetable.groupBy("label").count().show()
train.groupBy("label").count().show()
test.groupBy("label").count().show()

+-----+-----+
|label|count|
+-----+-----+
|  0.0|   13|
|  1.0|    7|
+-----+-----+

+-----+-----+
|label|count|
+-----+-----+
|  0.0|    6|
|  1.0|    6|
+-----+-----+

+-----+-----+
|label|count|
+-----+-----+
|  0.0|    7|
|  1.0|    1|
+-----+-----+



In [139]:
# import required features
from pyspark.ml.feature import StandardScaler, VectorAssembler
from pyspark.ml.classification import LogisticRegression
from pyspark.ml.evaluation import BinaryClassificationEvaluator

In [142]:
# define scaler
SS = StandardScaler(inputCol = 'numFeatures', outputCol = 'scaledNumFeatures', withStd = True, withMean = False)

# define vector assembler
VA = VectorAssembler(inputCols = ['scaledNumFeatures'], outputCol = 'features')

# define logistic regression model
LR = LogisticRegression(labelCol = 'label', featuresCol = 'features', maxIter = 10)

In [143]:
# define pipeline stages
stages = [SS, VA, LR]
# create pipeline and fit on training set
lrModelPipeline = Pipeline().setStages(stages).fit(train)
# apply pipeline on test set to get predictions
predictions = lrModelPipeline.transform(test)

In [144]:
# inspect predictions
predictions.show(5)

+--------------------+-----+--------------------+--------------------+--------------------+--------------------+----------+
|         numFeatures|label|   scaledNumFeatures|            features|       rawPrediction|         probability|prediction|
+--------------------+-----+--------------------+--------------------+--------------------+--------------------+----------+
|           (4,[],[])|  0.0|           (4,[],[])|           (4,[],[])|[8.77354943982320...|[0.99984525070483...|       0.0|
|   [1.5,1.0,0.0,0.0]|  0.0|[0.50169269403248...|[0.50169269403248...|[8.04133983171041...|[0.99967822617218...|       0.0|
|[4.40825688073394...|  0.0|[1.47439351365511...|[1.47439351365511...|[2.76031739599873...|[0.94049339975351...|       0.0|
|[5.0,1.9523809523...|  0.0|[1.67230898010829...|[1.67230898010829...|[3.75278600554792...|[0.97708509122137...|       0.0|
|[5.26153846153846...|  0.0|[1.75978360368319...|[1.75978360368319...|[3.46122900384395...|[0.96956425485154...|       0.0|
+-------

In [145]:
# define evaluator
evaluator = BinaryClassificationEvaluator()
# get evaluation metric
lrAUC = evaluator.evaluate(predictions, {evaluator.metricName: 'areaUnderROC'})
# inspect model performance
print('AUC lr: %f' %(lrAUC))

AUC lr: 0.857143
