In [1]:
from pyspark.ml.feature import RegexTokenizer, StopWordsRemover, Word2Vec, StringIndexer, IndexToString, Tokenizer
from pyspark.ml.classification import LogisticRegression, RandomForestClassifier
from pyspark.ml import Pipeline
from pyspark.ml.evaluation import MulticlassClassificationEvaluator
from pyspark.ml.tuning import CrossValidator, ParamGridBuilder 
from pyspark.sql.types import IntegerType, StringType
from pyspark.sql import functions as F
from wordcloud import WordCloud
import matplotlib.pyplot as plt
import pandas as pd
import numpy as np
import datetime

ModuleNotFoundError: No module named 'wordcloud'

In [2]:
def displayPartitions(df):
    #get the number of records by partition
    num = df.rdd.getNumPartitions()
    print("Total number of Partitions:", num)
    df.withColumn("partitionId", F.spark_partition_id())\
        .groupBy("partitionId")\
        .count()\
        .orderBy(F.asc("count"))\
        .show(num)


In [3]:
spark = SparkSession.builder.appName('Spark-Tweet-Sentiment-Analysis').getOrCreate()

:: loading settings :: url = jar:file:/usr/lib/spark/jars/ivy-2.5.1.jar!/org/apache/ivy/core/settings/ivysettings.xml


Ivy Default Cache set to: /root/.ivy2/cache
The jars for the packages stored in: /root/.ivy2/jars
com.johnsnowlabs.nlp#spark-nlp_2.12 added as a dependency
graphframes#graphframes added as a dependency
:: resolving dependencies :: org.apache.spark#spark-submit-parent-666281f9-0339-4af9-a656-e6945de17585;1.0
	confs: [default]
	found com.johnsnowlabs.nlp#spark-nlp_2.12;4.4.0 in central
	found com.typesafe#config;1.4.2 in central
	found org.rocksdb#rocksdbjni;6.29.5 in central
	found com.amazonaws#aws-java-sdk-bundle;1.11.828 in central
	found com.github.universal-automata#liblevenshtein;3.0.0 in central
	found com.google.protobuf#protobuf-java-util;3.0.0-beta-3 in central
	found com.google.protobuf#protobuf-java;3.0.0-beta-3 in central
	found com.google.code.gson#gson;2.3 in central
	found it.unimi.dsi#fastutil;7.0.12 in central
	found org.projectlombok#lombok;1.16.8 in central
	found com.google.cloud#google-cloud-storage;2.16.0 in central
	found com.google.guava#guava;31.1-jre in centra

In [4]:
#df = spark.read.json("gs://dataproc-staging-us-central1-575311154882-olmcvswv/notebooks/jupyter/")

df = spark.read.json("gs://bigdatafinaldatakira/covid  tweets/")
print(f"Reading {df.count()} tweets in total.")

23/05/22 13:57:50 WARN org.apache.spark.sql.catalyst.util.package: Truncated the string representation of a plan since it was too large. This behavior can be adjusted by setting 'spark.sql.debug.maxToStringFields'.

Reading 1442293 tweets in total.


                                                                                

## Word Cloud Generate

In [5]:
twitter_texts = df.select("text")
twitter_texts.show(5)

+--------------------+
|                text|
+--------------------+
|RT @BPEricAdams: ...|
|RT @realDonaldTru...|
|Hugging during Co...|
|RT @NPR: Maryland...|
|RT @DrW0mbat: Cor...|
+--------------------+
only showing top 5 rows



In [6]:
twitter_texts = twitter_texts.na.drop(subset=["text"])

# delete @mentions
twitter_texts = twitter_texts.withColumn('text', F.regexp_replace(F.col('text'), '@\S+', ''))

# delete RTs
twitter_texts = twitter_texts.withColumn('text', F.regexp_replace(F.col('text'), 'RT', ''))

# delete amp
twitter_texts = twitter_texts.withColumn('text', F.regexp_replace(F.col('text'), 'amp', ''))

# tolower
twitter_texts = twitter_texts.withColumn("text", F.lower(F.col("text")))



# Keep only alpha & num
twitter_texts = twitter_texts.withColumn("text", F.regexp_replace(F.col("text"), "[^a-zA-Z0-9\s]", ""))

# Remove unnecessary spaces
twitter_texts = twitter_texts.withColumn("text", F.trim(F.col("text")))

# Remove empty text
twitter_texts = twitter_texts.filter(F.length(F.trim(twitter_texts.text)) > 0)

# Show
twitter_texts.show(5)

+--------------------+
|                text|
+--------------------+
|the  is an integr...|
|the fake news kno...|
|hugging during co...|
|maryland is repor...|
|coronavirus in sc...|
+--------------------+
only showing top 5 rows



In [7]:
# Tokenization
tokenizer = Tokenizer(inputCol="text", outputCol="words")

# Stop words removal
remover = StopWordsRemover(inputCol="words", outputCol="filtered")

In [8]:
word_cloud_pipeline = Pipeline(stages=[tokenizer, remover])
twitter_texts = word_cloud_pipeline.fit(twitter_texts).transform(twitter_texts)
twitter_texts = twitter_texts.select("filtered")
twitter_texts.show(5)

[Stage 7:>                                                          (0 + 1) / 1]

+--------------------+
|            filtered|
+--------------------+
|[, integral, comp...|
|[fake, news, know...|
|[hugging, corona,...|
|[maryland, report...|
|[coronavirus, sco...|
+--------------------+
only showing top 5 rows



                                                                                

In [9]:
# convert to pandas DataFrame for word cloud generation
pandas_df = twitter_texts.toPandas()
pandas_df.head()

                                                                                

Unnamed: 0,filtered
0,"[, integral, component, transportation, networ..."
1,"[fake, news, knows, thanks, katie, httpstcobub..."
2,"[hugging, corona, httpstcogijgmnahl3]"
3,"[maryland, reporting, 1784, newly, confirmed, ..."
4,"[coronavirus, scotland, garden, centres, plead..."


In [None]:
from sklearn.feature_extraction.text import CountVectorizer
# Initialize an empty dictionary to store the total word frequencies
total_frequencies = {}

# Initialize a CountVectorizer to extract word frequencies
vectorizer = CountVectorizer()
i = 0
# Loop over chunks of the DataFrame
for chunk in np.array_split(pandas_df, 10):
    print(f"Reading chunk {i}")
    # Adjust number of chunks as per memory capacity
    # Get the text from this chunk
    text = ' '.join([' '.join(row.filtered) for index, row in chunk.iterrows()])
    
    # Calculate word frequencies for this chunk
    frequencies = vectorizer.fit_transform([text]).toarray().ravel()
    words = vectorizer.get_feature_names()

    # Add the word frequencies of this chunk to the total frequencies
    for word, freq in zip(words, frequencies):
        total_frequencies[word] = total_frequencies.get(word, 0) + freq
    
    i += 1

# Generate the wordcloud
wordcloud = WordCloud(width = 1000, height = 800).generate_from_frequencies(total_frequencies)

# plot the WordCloud image                       
plt.figure(figsize = (8, 8), facecolor = None) 
plt.imshow(wordcloud) 
plt.axis("off") 
plt.tight_layout(pad = 0) 
plt.show()  


Reading chunk 0
Reading chunk 1
Reading chunk 2
Reading chunk 3
Reading chunk 4
Reading chunk 5
Reading chunk 6
Reading chunk 7


## Sentiment Analysis

In [5]:
df_labeled_data = spark.read.csv("gs://bigdatafinaldatakira/Sentimentdata.csv"
                         ,header=True, inferSchema=True)
df_labeled_data.show(5)

                                                                                

+----+---------+--------------------+
|  id|sentiment|                text|
+----+---------+--------------------+
|3204|      sad|agree the poor in...|
|1431|      joy|if only i could h...|
| 654|      joy|will nature conse...|
|2530|      sad|"coronavirus disa...|
|2296|      sad|uk records lowest...|
+----+---------+--------------------+
only showing top 5 rows



In [6]:
df_labeled_data = df_labeled_data.cache()

In [7]:
df_labeled_data.count()

3090

In [8]:
displayPartitions(df_labeled_data)

Total number of Partitions: 1
+-----------+-----+
|partitionId|count|
+-----------+-----+
|          0| 3090|
+-----------+-----+



In [9]:
df_labeled_data.printSchema()

root
 |-- id: integer (nullable = true)
 |-- sentiment: string (nullable = true)
 |-- text: string (nullable = true)



In [10]:
# Tokenization
tokenizer = RegexTokenizer(inputCol="text", outputCol="words", pattern="\\W")

# Stop words removal
remover = StopWordsRemover(inputCol="words", outputCol="filtered")

# Word2Vec
word2Vec = Word2Vec(vectorSize=100, minCount=2, inputCol="filtered", outputCol="features")

# Indexer
indexer = StringIndexer(inputCol="sentiment", outputCol="label")

In [11]:
# Build the NLP preprocessing pipeline
nlp_pipeline = Pipeline(stages=[tokenizer, remover, word2Vec, indexer])
nlp_model = nlp_pipeline.fit(df_labeled_data)
processed_data = nlp_model.transform(df_labeled_data)
processed_data.show(5)

23/05/22 13:59:47 WARN com.github.fommil.netlib.BLAS: Failed to load implementation from: com.github.fommil.netlib.NativeSystemBLAS
23/05/22 13:59:47 WARN com.github.fommil.netlib.BLAS: Failed to load implementation from: com.github.fommil.netlib.NativeRefBLAS


+----+---------+--------------------+--------------------+--------------------+--------------------+-----+
|  id|sentiment|                text|               words|            filtered|            features|label|
+----+---------+--------------------+--------------------+--------------------+--------------------+-----+
|3204|      sad|agree the poor in...|[agree, the, poor...|[agree, poor, ind...|[-0.0123396027047...|  1.0|
|1431|      joy|if only i could h...|[if, only, i, cou...|[spent, cutie, vc...|[-0.0019741141702...|  3.0|
| 654|      joy|will nature conse...|[will, nature, co...|[nature, conserva...|[-0.0118956502733...|  3.0|
|2530|      sad|"coronavirus disa...|[coronavirus, dis...|[coronavirus, dis...|[-0.0166720755223...|  1.0|
|2296|      sad|uk records lowest...|[uk, records, low...|[uk, records, low...|[-0.0230484596368...|  1.0|
+----+---------+--------------------+--------------------+--------------------+--------------------+-----+
only showing top 5 rows



In [12]:
lr = LogisticRegression()

# Build the classification pipeline using Logistic Regression
pipeline = Pipeline(stages=[lr])

# Find the best hyperparameter
paramGrid = ParamGridBuilder() \
    .addGrid(lr.regParam, [0.001, 0.01, 0.05, 0.1]) \
    .addGrid(lr.maxIter, [30, 50, 100])\
    .build()

# Do cross validation
crossval = CrossValidator(estimator = pipeline,
                          estimatorParamMaps = paramGrid,
                          evaluator = MulticlassClassificationEvaluator(),
                          numFolds = 5)  # use 5 folds

# Split data into train and test
train, test = processed_data.randomSplit([0.8, 0.2], seed=13)

# Run cross-validation, and choose the best set of parameters.
lrCvModel = crossval.fit(train)

                                                                                

In [13]:
predictions = lrCvModel.transform(test)
evaluator = MulticlassClassificationEvaluator(labelCol="label", predictionCol="prediction", metricName="accuracy")
accuracy = evaluator.evaluate(predictions)
print("Test set accuracy = " + str(accuracy))

# Get the best model
bestModel = lrCvModel.bestModel

# Assuming the LogisticRegression stage is the last one in the pipeline
best_lr_model = bestModel.stages[-1]  

print("Best regularization parameter: ", best_lr_model._java_obj.getRegParam())

Test set accuracy = 0.5380794701986755
Best regularization parameter:  0.001


In [14]:
columns_to_keep = ["created_at", "id", "retweet_count", "favorite_count", "text"]
df = df.select(*columns_to_keep)
df.show(5)

+--------------------+-------------------+-------------+--------------+--------------------+
|          created_at|                 id|retweet_count|favorite_count|                text|
+--------------------+-------------------+-------------+--------------+--------------------+
|Wed Jul 01 15:10:...|1278345250428370944|            0|             1|@davidmweissman Y...|
|Wed Jul 01 15:10:...|1278345250684182528|           94|             0|RT @AdoptionsUk: ...|
|Wed Jul 01 15:10:...|1278345250071666688|         3700|             0|RT @davidplouffe:...|
|Wed Jul 01 15:10:...|1278345250549923840|        93417|             0|RT @lookitstaylor...|
|Wed Jul 01 15:10:...|1278345249782239233|         4037|             0|RT @RepSwalwell: ...|
+--------------------+-------------------+-------------+--------------+--------------------+
only showing top 5 rows



In [15]:
# delete @mentions
df = df.withColumn('text', F.regexp_replace(F.col('text'), '@\S+', ''))

# delete RTs
df = df.withColumn('text', F.regexp_replace(F.col('text'), 'RT', ''))
df.show(5)

+--------------------+-------------------+-------------+--------------+--------------------+
|          created_at|                 id|retweet_count|favorite_count|                text|
+--------------------+-------------------+-------------+--------------+--------------------+
|Wed Jul 01 15:10:...|1278345250428370944|            0|             1| Yes. I read SPY ...|
|Wed Jul 01 15:10:...|1278345250684182528|           94|             0|  Happy news Silv...|
|Wed Jul 01 15:10:...|1278345250071666688|         3700|             0|  COVID-19 may ha...|
|Wed Jul 01 15:10:...|1278345250549923840|        93417|             0|  Shaming people ...|
|Wed Jul 01 15:10:...|1278345249782239233|         4037|             0|  I’m so angry wi...|
+--------------------+-------------------+-------------+--------------+--------------------+
only showing top 5 rows



In [16]:
df = df.withColumn("text", df["text"].cast(StringType()))

# remove na
df = df.na.drop(subset=["text"])

# tolower
df = df.withColumn("text", F.lower(F.col("text")))

# Keep only alpha & num
df = df.withColumn("text", F.regexp_replace(F.col("text"), "[^a-zA-Z0-9\s]", ""))

# Remove unnecessary spaces
df = df.withColumn("text", F.trim(F.col("text")))

# Remove empty text
df = df.filter(F.length(F.trim(df.text)) > 0)

# Show
df.show(5)

+--------------------+-------------------+-------------+--------------+--------------------+
|          created_at|                 id|retweet_count|favorite_count|                text|
+--------------------+-------------------+-------------+--------------+--------------------+
|Wed Jul 01 15:10:...|1278345250428370944|            0|             1|yes i read spy ma...|
|Wed Jul 01 15:10:...|1278345250684182528|           94|             0|happy news silver...|
|Wed Jul 01 15:10:...|1278345250071666688|         3700|             0|covid19 may have ...|
|Wed Jul 01 15:10:...|1278345250549923840|        93417|             0|shaming people fo...|
|Wed Jul 01 15:10:...|1278345249782239233|         4037|             0|im so angry with ...|
+--------------------+-------------------+-------------+--------------+--------------------+
only showing top 5 rows



In [17]:
spark.conf.set("spark.sql.legacy.timeParserPolicy", "LEGACY")


# first transform created_at column to unix timestamp and then get the correct time_stamp
df = df.withColumn(
    "created_at", 
    F.to_timestamp(F.unix_timestamp(df["created_at"], "EEE MMM dd HH:mm:ss Z yyyy"))
)

df.show(5)

+-------------------+-------------------+-------------+--------------+--------------------+
|         created_at|                 id|retweet_count|favorite_count|                text|
+-------------------+-------------------+-------------+--------------+--------------------+
|2020-07-01 15:10:32|1278345250428370944|            0|             1|yes i read spy ma...|
|2020-07-01 15:10:32|1278345250684182528|           94|             0|happy news silver...|
|2020-07-01 15:10:32|1278345250071666688|         3700|             0|covid19 may have ...|
|2020-07-01 15:10:32|1278345250549923840|        93417|             0|shaming people fo...|
|2020-07-01 15:10:32|1278345249782239233|         4037|             0|im so angry with ...|
+-------------------+-------------------+-------------+--------------+--------------------+
only showing top 5 rows



In [18]:
df = df.withColumn("retweet_count", df["retweet_count"].cast(IntegerType()))
df = df.withColumn("favorite_count", df["favorite_count"].cast(IntegerType()))
df.show(5)
df.printSchema()

+-------------------+-------------------+-------------+--------------+--------------------+
|         created_at|                 id|retweet_count|favorite_count|                text|
+-------------------+-------------------+-------------+--------------+--------------------+
|2020-07-01 15:10:32|1278345250428370944|            0|             1|yes i read spy ma...|
|2020-07-01 15:10:32|1278345250684182528|           94|             0|happy news silver...|
|2020-07-01 15:10:32|1278345250071666688|         3700|             0|covid19 may have ...|
|2020-07-01 15:10:32|1278345250549923840|        93417|             0|shaming people fo...|
|2020-07-01 15:10:32|1278345249782239233|         4037|             0|im so angry with ...|
+-------------------+-------------------+-------------+--------------+--------------------+
only showing top 5 rows

root
 |-- created_at: timestamp (nullable = true)
 |-- id: long (nullable = true)
 |-- retweet_count: integer (nullable = true)
 |-- favorite_count

In [19]:
# Check the types
df.printSchema()

root
 |-- created_at: timestamp (nullable = true)
 |-- id: long (nullable = true)
 |-- retweet_count: integer (nullable = true)
 |-- favorite_count: integer (nullable = true)
 |-- text: string (nullable = true)



In [20]:
# Create a new nlp pipeline
model_pipeline = Pipeline(stages=[tokenizer, remover, word2Vec, best_lr_model])

# Fit the tweet data
model = model_pipeline.fit(df)

                                                                                

In [21]:
# Make prediction
predictions = model.transform(df)

# Convert prediction back to original sentiment
indexer_model = indexer.fit(df_labeled_data)
labelConverter = IndexToString(inputCol="prediction", outputCol="predicted_sentiment", labels=indexer_model.labels)

# Apply IndexToString to the DataFrame with predictions
converted = labelConverter.transform(predictions)

# Select the columns
final_result = converted.select("created_at", "text", "predicted_sentiment")

# Show results
final_result.show()


[Stage 4055:>                                                       (0 + 1) / 1]

+-------------------+--------------------+-------------------+
|         created_at|                text|predicted_sentiment|
+-------------------+--------------------+-------------------+
|2020-07-01 15:10:32|yes i read spy ma...|                joy|
|2020-07-01 15:10:32|happy news silver...|                joy|
|2020-07-01 15:10:32|covid19 may have ...|                joy|
|2020-07-01 15:10:32|shaming people fo...|                joy|
|2020-07-01 15:10:32|im so angry with ...|              anger|
|2020-07-01 15:10:32|my stepsister who...|                sad|
|2020-07-01 15:10:32|the idsi strongly...|                joy|
|2020-07-01 15:10:32|          great news|                joy|
|2020-07-01 15:10:32|trumps gop allies...|                joy|
|2020-07-01 15:10:32|aaaaaand this is ...|                joy|
|2020-07-01 15:10:32|the us represents...|              anger|
|2020-07-01 15:10:32|much like the pan...|                joy|
|2020-07-01 15:10:32|give 2000month to...|             

                                                                                

In [22]:
final_result = final_result.withColumn("date", F.to_date(F.col("created_at")))

# Count by date & lable
daily_counts = final_result.groupBy("date", "predicted_sentiment").count()


# Get the pandas DataFrame
df_pd = daily_counts.orderBy("date").toPandas()


# Set the date as index
df_pd.set_index("date", inplace=True)

# Filter the DataFrame to include only data on or after 2020-03-25
df_pd = df_pd[df_pd.index >= datetime.date(2020, 3, 25)]

# Calculate the total number of tweets per day
total_daily_counts = df_pd.groupby("date")["count"].sum().reset_index()
total_daily_counts.columns = ["date", "total_count"]

# Merge the total counts back to the original dataframe
df_pd = pd.merge(df_pd, total_daily_counts, on="date")

# Calculate the proportion of each sentiment per day
df_pd["proportion"] = df_pd["count"] / df_pd["total_count"]

# Set the `date` as index again
df_pd.set_index("date", inplace=True)

# Get unique sentiment types
sentiment_types = df_pd['predicted_sentiment'].unique()

# Plot each sentiment type in the same plot
plt.figure(figsize=(15,8))
for sentiment in sentiment_types:
    df_sentiment = df_pd[df_pd['predicted_sentiment'] == sentiment]
    plt.plot(df_sentiment.index, df_sentiment['proportion'], label=sentiment)

plt.xticks(rotation=45)
plt.legend()
plt.show()

                                                                                

NameError: name 'datetime' is not defined

In [None]:
# Save the result to the csv file
columns_to_keep = ["created_at", "text", 'predicted_sentiment']
final_result = final_result.select(*columns_to_keep)
final_result.repartition(1).write.csv('gs://dataproc-staging-us-central1-575311154882-olmcvswv/notebooks/jupyter/output', header=True)