In [0]:
# read all csv files into a single RDD
import os.path

# reads all filenames that match the patter. Currently only two within the DBFS, 0819 and 0820
files = '/FileStore/tables/*_UkraineCombinedTweetsDeduped.csv'
tweets = spark.read.csv(files,header=True,sep=",",multiLine=True)

In [0]:
# remove columns not needed
dropped_tweets = tweets.drop("original_tweet_id", \
                            "original_tweet_userid", \
                            "original_tweet_username", \
                            "in_reply_to_status_id", \
                            "in_reply_to_user_id", \
                            "in_reply_to_screen_name", \
                            "quoted_status_id", \
                            "quoted_status_userid", \
                            "quoted_status_username", \
                            "extractedts")

# dropped_tweets.show(1)

dropped_tweets.first()['text']

Out[44]: 'Dear vaccine advocate\n\nDo take the COVID19 mRNA shot and boosters, but do know that @OurWorldInData data shows it offers zero protection, actually accelerates death of vaccinated.\n\nRegards\n#Pfizer #AstraZeneca #Moderna #NWO #Agenda2030 #COP27 #Biden #Obama #Trudeau #Jacinda #life https://t.co/VTbfuqiDvu'

In [0]:
# get only english tweets
english_tweets = dropped_tweets.filter(dropped_tweets.language == "en" )
# english_tweets.show(5)

In [0]:
# get only non-retweets
non_rt_tweets = english_tweets.filter(english_tweets.is_retweet == "False")
# non_rt_tweets.show(5)

In [0]:
%sh pip install textblob

You should consider upgrading via the '/local_disk0/.ephemeral_nfs/envs/pythonEnv-0ca6db4e-da87-4859-819c-1a4b7b669c4b/bin/python -m pip install --upgrade pip' command.


In [0]:
%sh pip install vaderSentiment

You should consider upgrading via the '/local_disk0/.ephemeral_nfs/envs/pythonEnv-0ca6db4e-da87-4859-819c-1a4b7b669c4b/bin/python -m pip install --upgrade pip' command.


In [0]:
from pyspark.sql.functions import col, udf, to_date, least, lit, log
from pyspark.sql.types import FloatType
from textblob import TextBlob
from pyspark.sql.window import Window
from pyspark.sql import functions as F
from vaderSentiment.vaderSentiment import SentimentIntensityAnalyzer

# get average number of tweets per day from each user

# make copy of dataframe and convert each tweet created time stamp into a time stamp object
df = non_rt_tweets
df = df.withColumn("date", to_date(col("tweetcreatedts")))

# create window partitioned by user and ordered by date
window = Window.partitionBy("username").orderBy("date")

# Calculate the number of tweets per day for each user
df = df.withColumn("daily_tweet_count", F.count("text").over(window))

# calculate average sentiment for each user
analyzer = SentimentIntensityAnalyzer()
def get_positive_score(tweet):
    vs = analyzer.polarity_scores(tweet)
    return vs['pos']

def get_negative_score(tweet):
    vs = analyzer.polarity_scores(tweet)
    return vs['neg']

# Register the sentiment analysis function as a Spark UDF
sentiment_analysis_pos_udf = udf(get_positive_score, FloatType())
sentiment_analysis_neg_udf = udf(get_negative_score, FloatType())

# Apply sentiment analysis to each tweet and create a new column
df = df.withColumn("positivity", sentiment_analysis_pos_udf(col("text")))
df = df.withColumn("negativity", sentiment_analysis_neg_udf(col("text")))

# Calculate the average sentimentality for each user and the average number of tweets per day
avg_tweet_sentimentality_and_volume = df.groupBy("username").agg(
    # ((F.avg("sentiment_score") + 1) * 50 * 2).alias("avg_sentimentality"),
    F.avg("daily_tweet_count").alias("avg_tweets_per_day"),
    (F.avg("positivity") * 300).alias("avg_positivity"),
    (F.avg("negativity") * 300).alias("avg_negativity")
)
avg_tweet_sentimentality_and_volume = avg_tweet_sentimentality_and_volume.withColumn("avg_tweets_per_day", log(1.5, (col("avg_tweets_per_day") + 1)))

# Calculate the mean and stddev foravg_tweets_per_day
avg_tweets_mean = avg_tweet_sentimentality_and_volume.agg({"avg_tweets_per_day": "mean"}).collect()[0][0]
avg_tweets_stddev = avg_tweet_sentimentality_and_volume.agg({"avg_tweets_per_day": "stddev"}).collect()[0][0]

constant_column = lit(9)

avg_tweet_sentimentality_and_volume = avg_tweet_sentimentality_and_volume.withColumn("avg_tweets_per_day_zscore", (col("avg_tweets_per_day") - avg_tweets_mean) / avg_tweets_stddev)
avg_tweet_sentimentality_and_volume = avg_tweet_sentimentality_and_volume.withColumn("avg_tweets_per_day_normalized", ((least(col("avg_tweets_per_day_zscore"), constant_column) + 3) / 6)*70)

avg_tweet_sentimentality_and_volume = avg_tweet_sentimentality_and_volume.drop("avg_tweets_per_day_zscore", "avg_tweets_per_day")

# avg_tweet_sentimentality_and_volume.display()

In [0]:
# Normalize following and followers
from pyspark.sql.functions import col, mean, count, sum, expr, least, lit, unix_timestamp, current_timestamp, log
from pyspark.sql.types import LongType

current_time = current_timestamp()

# Make sure you operate on unique users, in case of duplicates get the average for the user
df_unique_users = english_tweets.groupBy("username", "usercreatedts").agg(
    sum((col("is_quote_status") == "True").cast("int")).alias("is_quote_status_true_count"),
    sum((col("is_quote_status") == "False").cast("int")).alias("is_quote_status_false_count"),
    mean("followers").alias("followers"),
    mean("following").alias("following"),
    count("username").alias("tweet_count"),
    (unix_timestamp(current_time) - unix_timestamp(expr("substring(usercreatedts, 1, 19)"))).cast(LongType()).alias("account_age")
)

df_unique_users = df_unique_users.withColumn("followers", log(10.0, col("followers")))
df_unique_users = df_unique_users.withColumn("following", log(7.0, col("following")))
df_unique_users = df_unique_users.withColumn("tweet_count", log(2.0, col("tweet_count")))

# Calculate the mean and stddev for followers and following
followers_mean = df_unique_users.agg({"followers": "mean"}).collect()[0][0]
followers_stddev = df_unique_users.agg({"followers": "stddev"}).collect()[0][0]
following_mean = df_unique_users.agg({"following": "mean"}).collect()[0][0]
following_stddev = df_unique_users.agg({"following": "stddev"}).collect()[0][0]

# Calculate the mean and stddev for account age
account_age_mean = df_unique_users.agg({"account_age": "mean"}).collect()[0][0]
account_age_stddev = df_unique_users.agg({"account_age": "stddev"}).collect()[0][0]

# Calculate the mean and stddev for tweet count
tweet_count_mean = df_unique_users.agg({"tweet_count": "mean"}).collect()[0][0]
tweet_count_stddev = df_unique_users.agg({"tweet_count": "stddev"}).collect()[0][0]

# Calculate the z_scores for each entry
df_unique_users = df_unique_users.withColumn("followers_zscore", (col("followers") - followers_mean) / followers_stddev)
df_unique_users = df_unique_users.withColumn("following_zscore", (col("following") - following_mean) / following_stddev)

df_unique_users = df_unique_users.withColumn("account_age_zscore", (col("account_age") - account_age_mean) / account_age_stddev)

df_unique_users = df_unique_users.withColumn("tweet_count_zscore", (col("tweet_count") - tweet_count_mean) / tweet_count_stddev)

# Use lit to create a constant column with the integer value
# (constant_column + 3) / 6 *100
constant_column = lit(9)

# Calculate normalized values
# 99.7% of the data lie within 3 standard deviations of the mean, these values will be between 0-100, outliers will be outside this range
df_unique_users = df_unique_users.withColumn("followers_normalized", ((least(col("followers_zscore"), constant_column) + 3) / 6)*100)
df_unique_users = df_unique_users.withColumn("following_normalized", ((least(col("following_zscore"), constant_column) + 3) / 6)*100)

df_unique_users = df_unique_users.withColumn("account_age_normalized", ((least(col("account_age_zscore"), constant_column) + 3) / 6)*100)
df_unique_users = df_unique_users.withColumn("tweet_count_normalized", ((least(col("tweet_count_zscore"), constant_column) + 3) / 6)*100)

# Unique users
df_unique_users = df_unique_users.withColumn("percentage_of_quotes", 100*(col("is_quote_status_true_count") / (col("is_quote_status_true_count") + col("is_quote_status_false_count"))))

# Drop unnecessary columns
df_normalized_follow = df_unique_users.drop("followers_zscore", "following_zscore", "followers", "following", "is_quote_status_true_count", "is_quote_status_false_count", "usercreatedts", "account_age_zscore", "account_age", "tweet_count", "tweet_count_zscore")

# display(df_normalized_follow)

In [0]:
from pyspark.sql.types import ArrayType, StringType
from pyspark.sql.window import Window
from pyspark.sql import functions as F
import re

#Returns Hashtags within a string as an array
def extractHastags(tweet):
    return re.findall(r"#(\w+)",tweet)


#Resgister extractHastags as a Spark UDF
extract_hashtags_udf = F.udf(extractHastags, ArrayType(StringType()))

#Drop unused columns
hashtag_df = english_tweets.drop("acctdesc", \
                                "location", \
                                "following", \
                                "followers", \
                                "totaltweets", \
                                "usercreatedts", \
                                "tweetcreatedts", \
                                "retweetcount", \
                                "language", \
                                "coordinates", \
                                "favorite_count", \
                                "is_retweet", \
                                "is_quote_status", \
                                "_c0", \
                                "userid", \
                                "tweetid", \
                                )

#Map Hashtags to users
hashtag_df = hashtag_df.withColumn("hashtags", extract_hashtags_udf(col("text")))

#Explode hastag list
hashtag_df = hashtag_df.select("username", F.explode("hashtags").alias("hashtag"))

#Count the number of each hashtag by user
grouped_hashtag_df = hashtag_df.groupBy("username", "hashtag").count()

#Calculate the max number of one hashtag for each user#
result_hashtag_df = grouped_df.groupBy("username").agg(
    log(2.0, F.max("count")).alias("hashtags")
).orderBy("username")

hashtag_mean = result_hashtag_df.agg({"hashtags": "mean"}).collect()[0][0]
hashtag_stddev = result_hashtag_df.agg({"hashtags": "stddev"}).collect()[0][0]

result_hashtag_df = result_hashtag_df.withColumn("hashtags_zscore", (col("hashtags") - hashtag_mean) / hashtag_stddev)
result_hashtag_df = result_hashtag_df.withColumn("hashtags_normalized", ((least(col("hashtags_zscore"), constant_column) + 3) / 6)*100)
result_hashtag_df = result_hashtag_df.drop("hashtags_zscore", "hashtags")
#Display preview
# result_hashtag_df.display()

In [0]:
# df_final_non_rt = avg_tweet_volume.join(avg_tweet_sentimentality, on="username", how="inner")
# df_final = df_final_non_rt.join(df_normalized_follow, on="username", how="left").join(df_quote_percentage, on="username", how="inner").join(df_account_age, on="username", how="inner").join(username_counts, on="username", how="inner")
df_final = avg_tweet_sentimentality_and_volume.join(df_normalized_follow, on="username", how="left").join(result_hashtag_df, on="username", how="inner")
# display(df_final)

In [0]:
from pyspark.sql import SparkSession
from pyspark.ml.feature import VectorAssembler
from pyspark.ml.clustering import KMeans

selected_columns = [col for col in df_final.columns if col not in ['username', 'userid']]
assembler = VectorAssembler(inputCols=selected_columns, outputCol="features")
df_assembled = assembler.transform(df_final)

# Specify the number of clusters (k)
k = 8

# Create a KMeans instance
kmeans = KMeans().setK(k).setSeed(1)

# Fit the model to the assembled DataFrame
model = kmeans.fit(df_assembled)

# Get the cluster centers
centers = model.clusterCenters()

# Predict the cluster for each data point
predictions = model.transform(df_assembled)

# Display the results
print("Cluster Centers:")
for index, center in enumerate(centers):
    print(index, " ", center)

print("Predictions:")
# predictions.select("username", "features", "prediction").display()


Cluster Centers:
0   [79.4582301   9.37295388 31.12492265 50.31629143 47.50125698 50.02783938
 44.16757506  0.45206337 44.22859348]
1   [  9.75376011 101.75124524  31.28379159  53.80605094  51.90431381
  51.38576525  44.45289773   4.09480885  44.99443181]
2   [22.67255065 26.58061289 57.09833993 73.8459879  48.0474814  48.50011235
 81.62421914  8.22598116 81.26155144]
3   [16.48347323 35.95984086 33.90163911 52.8513481  55.27651539 53.29812745
 48.82990557 92.09263685 48.26641704]
4   [ 29.85606252  25.38780931  37.39058825  64.68661733 200.
  39.19916836  53.35428681   2.64267172  53.58557445]
5   [19.17825515 48.33130063 32.4104372  46.16476758 48.42624243 48.06500434
 46.58622082  0.79323226 46.63354119]
6   [13.71288403  5.38467493 31.19247773 50.24374736 50.13561785 50.34012812
 44.44514756  0.56561086 44.70452818]
7   [86.06376644 10.26917972 31.34332314 54.08307502 57.38662859 54.89789563
 44.77784645 94.97874396 44.7525081 ]
Predictions:
