Twitter Categorizer Project

Problem: 
Twitter is a popular online social networking platform that lets users post short texts called “tweets.” Hashtags group similar tweets and link them to other tweets that use the same tag. However, when a user doesn’t provide hashtags, it is hard to label a given tweet.

Objective:
This project aims to correctly categorize user-provided tweets into five topics (celebrity, crypto, stocks, sports, politics) using Big Data tools (MongoDB, Spark) and machine-learning techniques.

Process: 
- Collect and clean data from various sources (Harvard Dataverse, Kaggle, etc.)
- Load the data into MongoDB and import it into Spark
- Preprocess the text (remove special characters and stop words, then vectorize with TF-IDF)
- Train a classification model to categorize tweets

In [1]:
%pip install pyspark spark-nlp nltk

Note: you may need to restart the kernel to use updated packages.


In [2]:
import pyspark
import sparknlp
from pyspark import SparkContext, SparkConf
from pyspark.sql import SparkSession
from pyspark.sql.functions import regexp_replace
from pyspark.ml import Pipeline
from pyspark.ml.classification import LogisticRegression
from pyspark.ml.feature import (HashingTF, IDF, RegexTokenizer,
                                StopWordsRemover, StringIndexer, Tokenizer)
from nltk.corpus import stopwords
from sparknlp.base import Finisher, DocumentAssembler
# Alias the Spark NLP tokenizer so it does not shadow pyspark.ml.feature.Tokenizer
from sparknlp.annotator import (Tokenizer as NlpTokenizer, Normalizer,
                                LemmatizerModel, StopWordsCleaner)

In [3]:
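# SparkConf and SparkSession are imported in the previous cell.
conf = SparkConf()

# getOrCreate() reuses a running session, so the cell can be re-run safely
spark = SparkSession.builder.config(conf=conf).getOrCreate()
sc = spark.sparkContext
spark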

Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).


23/09/26 11:55:09 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable


In [265]:
# Load data
data = spark.read.format("csv")\
        .option("inferSchema", "true")\
        .option("header", "true")\
        .load("combined_tweets.csv") 

In [266]:
data.createOrReplaceTempView("df")

In [267]:
data.printSchema()

root
 |-- _c0: string (nullable = true)
 |-- Username: string (nullable = true)
 |-- ID: string (nullable = true)
 |-- Tweets: string (nullable = true)
 |-- Date: string (nullable = true)
 |-- Label: string (nullable = true)



In [268]:
data.limit(5).toPandas()

23/09/26 12:43:42 WARN CSVHeaderChecker: CSV header does not conform to the schema.
 Header: , Username, ID, Tweets, Date, Label
 Schema: _c0, Username, ID, Tweets, Date, Label
Expected: _c0 but found: 
CSV file: file:///Users/seungpang/combined_tweets.csv


Unnamed: 0,_c0,Username,ID,Tweets,Date,Label
0,3161,sports_user,3161.0,Its time for world cup,,
1,#WorldCup #WorldCup2022 #WorldcupQatar2022 #Qa...,2022-11-20 17:08:39+00:00,sports,,,
2,4453,TSLA,3454.0,Looks like @Tesla is/will be holding some spec...,,
3,“Enjoy a complimentary ice cream with your fri...,2022-08-06 00:36:53+00:00,stocks,,,
4,32374,Figgimus Maximus,253958386.0,Basically true. https://t.co/0vSbFCOp8k,2017-08-17 07:22:07,crypto
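
The preview above suggests that tweets containing line breaks are being split across rows: the hashtags, date, and label of the first tweet land in a separate row. The following is a minimal plain-Python sketch of that effect using the standard `csv` module. The sample data is made up, and it assumes the multi-line tweets are wrapped in quotes in the file, which has not been checked here. Spark's CSV reader has a `multiLine` option intended for this situation; whether it fixes this particular file is untested.

```python
import csv
import io

# Hypothetical two-record CSV; the first tweet contains a line break inside quotes.
raw = (
    ",Username,ID,Tweets,Date,Label\n"
    '0,sports_user,3161,"Its time for world cup\n#WorldCup",2022-11-20,sports\n'
    "1,TSLA,3454,Single-line tweet,2022-08-06,stocks\n"
)

# Naive approach: treat every physical line as one record.
naive_rows = [line.split(",") for line in raw.splitlines()[1:]]

# Quote-aware approach: csv.reader keeps the quoted newline inside one field.
parsed_rows = list(csv.reader(io.StringIO(raw, newline="")))[1:]

print(len(naive_rows))          # 3 physical lines
print(len(parsed_rows))         # 2 logical records
print(repr(parsed_rows[0][3]))  # the tweet text, newline included
```

With line-by-line splitting, the second half of the first tweet becomes its own row with the date and label shifted into the wrong columns, which matches the pattern in rows 1 and 3 of the preview.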


In [269]:
#Select necessary columns - Tweets, Label
data = data.select("Tweets", "Label")

In [270]:
#Check data shape
print((data.count(), len(data.columns)))

(72211, 2)


In [271]:
#Check null values
from pyspark.sql.functions import col
data.filter(col("Tweets").isNull()).show()

+------+-----+
|Tweets|Label|
+------+-----+
|  null| null|
|  null| null|
|  null| null|
|  null| null|
|  null| null|
|  null| null|
|  null| null|
|  null| null|
|  null| null|
|  null| null|
|  null| null|
|  null| null|
|  null| null|
|  null| null|
|  null| null|
|  null| null|
|  null| null|
|  null| null|
|  null| null|
|  null| null|
+------+-----+
only showing top 20 rows



In [272]:
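from pyspark.sql.functions import col, isnan, when, count
# Count rows whose Tweets value is null, empty, NaN, or contains a placeholder string.
# Note: the condition always tests "Tweets", so both output columns report the
# Tweets count; replace "Tweets" with c to count each column separately.
df2 = data.select([count(when(col("Tweets").contains('None') | \
                            col("Tweets").contains('NULL') | \
                            (col("Tweets") == '' ) | \
                            col("Tweets").isNull() | \
                            isnan("Tweets"), "Tweets" 
                           )).alias(c)
                    for c in data.columns])
df2.show()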

+------+-----+
|Tweets|Label|
+------+-----+
| 30723|30723|
+------+-----+
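
The check above matches any tweet that merely contains the text `None` or `NULL`, so ordinary tweets can be counted as missing. A small plain-Python sketch of the difference between substring matching and exact matching, counted per column, is shown below. The rows and both helper functions (`is_missing`, `contains_marker`) are hypothetical and only illustrate the idea.

```python
# Hypothetical rows standing in for the (Tweets, Label) columns.
rows = [
    {"Tweets": "None of these stocks look cheap", "Label": "stocks"},
    {"Tweets": None, "Label": None},
    {"Tweets": "", "Label": "sports"},
    {"Tweets": "Great match today", "Label": "NULL"},
]

MISSING_MARKERS = {"", "None", "NULL"}


def is_missing(value):
    """Treat None and exact placeholder strings as missing."""
    return value is None or value.strip() in MISSING_MARKERS


def contains_marker(value):
    """Substring check, similar in spirit to a contains() test."""
    return value is None or value == "" or "None" in value or "NULL" in value


columns = ["Tweets", "Label"]
exact_counts = {c: sum(is_missing(r[c]) for r in rows) for c in columns}
substring_counts = {c: sum(contains_marker(r[c]) for r in rows) for c in columns}

print(exact_counts)      # {'Tweets': 2, 'Label': 2}
print(substring_counts)  # {'Tweets': 3, 'Label': 2}
```

The first row is a legitimate tweet, yet the substring check counts it as missing because it starts with the word "None".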



In [273]:
#Replace null values
data = data.na.fill("this is null")

In [274]:
import pyspark.sql.functions as f
print((data.count(), len(data.columns)))

(72211, 2)


In [275]:
#Keep only rows with a valid label
correct_label = ["celebrity", "crypto", "stocks", "sports", "politics"]
data = data.filter(data.Label.isin(correct_label))

In [276]:
print((data.count(), len(data.columns)))

(29001, 2)


In [277]:
data.printSchema()

root
 |-- Tweets: string (nullable = false)
 |-- Label: string (nullable = false)



In [278]:
#Split into training and test sets (80/20); the seed value is arbitrary and only makes the split repeatable
(df_train, df_test) = data.randomSplit([0.8, 0.2], seed=42)

In [279]:
df_train.printSchema()

root
 |-- Tweets: string (nullable = false)
 |-- Label: string (nullable = false)



In [280]:
#Regex tokenize - lowercases the text and splits on runs of non-word characters, which drops punctuation
regexTokenizer = RegexTokenizer(inputCol="Tweets", outputCol="regex", pattern=r"\W+")
regexTokenized = regexTokenizer.transform(df_train)

In [281]:
regexTokenized.show(5)

+--------------------+------+--------------------+
|              Tweets| Label|               regex|
+--------------------+------+--------------------+
| ET5 delivery wil...|stocks|[et5, delivery, w...|
|         Hyderabad."|sports|         [hyderabad]|
| and $14.9K per s...|stocks|[and, 14, 9k, per...|
| and why the comp...|stocks|[and, why, the, c...|
| avoiding 400k to...|stocks|[avoiding, 400k, ...|
+--------------------+------+--------------------+
only showing top 5 rows
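
To see why `$14.9K` turns into `[14, 9k]` in the output above, here is a rough plain-Python equivalent of splitting on `\W+`. It is only an approximation of `RegexTokenizer`: Python's `re` module and the Java regex engine used by Spark can treat non-ASCII characters differently, and the sample strings are illustrative rather than taken from the dataset.

```python
import re


def regex_tokenize(text, pattern=r"\W+", min_token_length=1):
    """Lowercase, split on the pattern, and drop tokens that are too short."""
    tokens = re.split(pattern, text.lower())
    return [t for t in tokens if len(t) >= min_token_length]


print(regex_tokenize(" and $14.9K per second"))
# ['and', '14', '9k', 'per', 'second']
print(regex_tokenize('Hyderabad."'))
# ['hyderabad']
```

The dollar sign and the decimal point both count as non-word characters, so prices and tickers are broken into fragments. That may or may not be desirable for the stocks and crypto classes.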



In [282]:
#Remove stop words (default English list)
remover = StopWordsRemover(inputCol="regex", outputCol="Cleaned_Words")
clean_df = remover.transform(regexTokenized)

In [283]:
clean_df.show(5)

+--------------------+------+--------------------+--------------------+
|              Tweets| Label|               regex|       Cleaned_Words|
+--------------------+------+--------------------+--------------------+
| ET5 delivery wil...|stocks|[et5, delivery, w...|[et5, delivery, t...|
|         Hyderabad."|sports|         [hyderabad]|         [hyderabad]|
| and $14.9K per s...|stocks|[and, 14, 9k, per...|[14, 9k, per, sec...|
| and why the comp...|stocks|[and, why, the, c...|[company, correct...|
| avoiding 400k to...|stocks|[avoiding, 400k, ...|[avoiding, 400k, ...|
+--------------------+------+--------------------+--------------------+
only showing top 5 rows



In [284]:
#TF-IDF vectorizer (numFeatures=50 is far below Spark's default of 2^18, so many words share a bucket)
hashingTF = HashingTF(inputCol="Cleaned_Words", outputCol="rawfeatures", numFeatures=50)
featurizedData = hashingTF.transform(clean_df)

idf = IDF(inputCol="rawfeatures", outputCol="features")
idfModel = idf.fit(featurizedData)
df_train_tfidf = idfModel.transform(featurizedData)
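
The two stages above can be sketched in plain Python to show what they compute. This is an illustration under stated assumptions, not Spark's implementation: `zlib.crc32` stands in for the MurmurHash3 function Spark uses, so bucket indices will differ, and the documents and vocabulary are made up. The IDF weight follows the smoothed formula from the Spark documentation, log((m + 1) / (df + 1)), where m is the number of documents and df is the number of documents containing the bucket.

```python
import math
import zlib
from collections import Counter


def hashing_tf(tokens, num_features):
    """Map tokens to bucket counts; crc32 is a stand-in for Spark's MurmurHash3."""
    return dict(Counter(zlib.crc32(t.encode("utf-8")) % num_features for t in tokens))


def fit_idf(tf_vectors):
    """Smoothed IDF per bucket: log((m + 1) / (df + 1))."""
    m = len(tf_vectors)
    df = Counter(bucket for vec in tf_vectors for bucket in vec)
    return {bucket: math.log((m + 1) / (d + 1)) for bucket, d in df.items()}


def tfidf(tf_vector, idf):
    """Scale each bucket count by its IDF weight."""
    return {b: tf * idf.get(b, 0.0) for b, tf in tf_vector.items()}


def distinct_indices(words, num_features):
    """How many different buckets a vocabulary occupies."""
    return len({zlib.crc32(w.encode("utf-8")) % num_features for w in words})


# Hypothetical tokenized tweets.
docs = [["tesla", "stock", "price"], ["world", "cup", "goal"], ["tesla", "delivery"]]
tf_vectors = [hashing_tf(d, num_features=50) for d in docs]
idf = fit_idf(tf_vectors)
weighted = [tfidf(v, idf) for v in tf_vectors]

# Fewer buckets means more distinct words are forced to share an index.
vocab = [f"word{i}" for i in range(1000)]
for n in (50, 2 ** 18):
    print(n, "buckets ->", distinct_indices(vocab, n), "indices for", len(vocab), "words")
```

With 50 buckets, a vocabulary of 1000 words can occupy at most 50 indices, so unrelated words become indistinguishable to the classifier. A larger `numFeatures` reduces these collisions at the cost of wider vectors.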

In [285]:
#Index Label
string_indexer = StringIndexer(inputCol='Label', outputCol='Label_Indexed')
string_indexer_model = string_indexer.fit(df_train_tfidf)
df_train_final = string_indexer_model.transform(df_train_tfidf)
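
By default, `StringIndexer` assigns indices by descending label frequency, so index 0.0 is the most common class in the training split rather than the first one alphabetically. A minimal plain-Python sketch of that ordering follows. The label counts are invented for illustration, since the real class distribution is not shown in the notebook, and `fit_label_index` is a hypothetical helper.

```python
from collections import Counter


def fit_label_index(labels):
    """Most frequent label gets index 0.0; ties are broken alphabetically here."""
    counts = Counter(labels)
    ordered = sorted(counts, key=lambda label: (-counts[label], label))
    return {label: float(i) for i, label in enumerate(ordered)}


# Hypothetical label column with made-up class counts.
labels = (["celebrity"] * 5 + ["crypto"] * 4 + ["politics"] * 3
          + ["stocks"] * 2 + ["sports"])
index = fit_label_index(labels)
print(index)
# {'celebrity': 0.0, 'crypto': 1.0, 'politics': 2.0, 'stocks': 3.0, 'sports': 4.0}
```

Because the mapping depends on the training data, the fitted model has to be reused for the test set, as the notebook does, instead of fitting a second indexer.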

In [286]:
#Logistic regression model
lr = LogisticRegression(featuresCol=idf.getOutputCol(), labelCol=string_indexer_model.getOutputCol())
lr_model = lr.fit(df_train_final)

# Transform the test set with the stages fitted on the training set
df_test_token = regexTokenizer.transform(df_test)
df_test_stopwords = remover.transform(df_test_token)
df_test_tf = hashingTF.transform(df_test_stopwords)
df_test_tfidf = idfModel.transform(df_test_tf)
df_test_final = string_indexer_model.transform(df_test_tfidf)

In [None]:
#Prediction
prediction = lr_model.transform(df_test_final)
prediction = prediction.na.drop()
prediction.select("Cleaned_Words", "Label", "Label_Indexed", "probability", "prediction").show(10)

#Accuracy: share of test rows whose predicted index equals the true index
total = prediction.count()
correct = prediction.filter(prediction.Label_Indexed == prediction.prediction).count()
accuracy = correct / total
print("Accuracy : ", accuracy)

+--------------------+---------+-------------+--------------------+----------+
|       Cleaned_Words|    Label|Label_Indexed|         probability|prediction|
+--------------------+---------+-------------+--------------------+----------+
|[lot, tech, growt...|   stocks|          3.0|[0.81269633939627...|       0.0|
|[including, stron...|   stocks|          3.0|[0.42013330737820...|       1.0|
|[patience, https,...|   stocks|          3.0|[0.58706633731306...|       0.0|
|[much, still, bet...|   stocks|          3.0|[0.59512927151548...|       0.0|
|[https, co, zq3h6...|   crypto|          1.0|[0.57546092735675...|       0.0|
|[https, co, 1bszp...|   crypto|          1.0|[0.51041619335389...|       0.0|
|[https, co, grspb...|   crypto|          1.0|[0.47677987931319...|       0.0|
|[estreno, mundial...|celebrity|          0.0|[0.88801349257671...|       0.0|
|[abd, nin, charlo...|   crypto|          1.0|[0.20118143464386...|       1.0|
|[charlottesville,...|   crypto|          1.0|[0.048
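
A single accuracy number can hide a model that mostly predicts one class, which the sample predictions in this notebook hint at (most rows are assigned 0.0). The plain-Python sketch below computes overall accuracy alongside a per-class breakdown. The `(Label_Indexed, prediction)` pairs are hypothetical, not results from this model. In Spark itself, `MulticlassClassificationEvaluator` from `pyspark.ml.evaluation` offers comparable metrics.

```python
from collections import Counter


def accuracy(pairs):
    """Fraction of (label, prediction) pairs that match."""
    pairs = list(pairs)
    return sum(y == p for y, p in pairs) / len(pairs) if pairs else 0.0


def per_class_recall(pairs):
    """For each true label, the share of its rows that were predicted correctly."""
    totals, hits = Counter(), Counter()
    for y, p in pairs:
        totals[y] += 1
        hits[y] += (y == p)
    return {y: hits[y] / totals[y] for y in totals}


# Hypothetical (Label_Indexed, prediction) pairs skewed toward class 0.0.
pairs = [(0.0, 0.0), (0.0, 0.0), (0.0, 0.0), (1.0, 0.0), (1.0, 1.0), (3.0, 0.0)]
print(accuracy(pairs))          # 0.6666666666666666
print(per_class_recall(pairs))  # {0.0: 1.0, 1.0: 0.5, 3.0: 0.0}
```

In this made-up example the overall accuracy is about 67%, yet class 3.0 is never predicted correctly, which the per-class view makes visible.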