# Training 1: TfIdf + Random Forest + Ngram

- Code version: 1.0
- Python version: 3.11.6
- Owner: Aditya Patkar
- File created: 2023-11-16

## Configurations

In [1]:
#Set the JAVA_HOME environment variable to the path of Java installation.
import os

In [1]:
#Necessary imports
import warnings
warnings.filterwarnings("ignore")

import wandb

import findspark
findspark.init()
findspark.find()

import boto3

import pyspark as ps
from pyspark.sql import SQLContext
from pyspark.ml.feature import IDF, Tokenizer, CountVectorizer, StringIndexer, NGram,  VectorAssembler
from pyspark.ml import Pipeline

from pyspark.ml.classification import RandomForestClassifier
from pyspark.ml.evaluation import BinaryClassificationEvaluator
from pyspark.mllib.evaluation import MulticlassMetrics

In [2]:
#login to wandb and initialize the project
#wandb.login(relogin=True ) #uncomment this line if you are running this code for the first time
wandb.init(project="msml651-sentiment-analysis", entity="apatkar", name="rf+tfidf+ngram")

Failed to detect the name of this notebook, you can set it manually with the WANDB_NOTEBOOK_NAME environment variable to enable code saving.
[34m[1mwandb[0m: Currently logged in as: [33mapatkar[0m. Use [1m`wandb login --relogin`[0m to force relogin


In [3]:
#initialize spark context
try:
    # create SparkContext on all CPUs available)
    sc = ps.SparkContext( 'local[*]' )
    sqlContext = SQLContext(sc)
    print("Just created a SparkContext")
except ValueError:
    warnings.warn("SparkContext already exists in this scope")

Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
23/12/04 11:41:11 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable


Just created a SparkContext


In [4]:
sc.setLogLevel("ERROR")

## Dataset

In [24]:
#get the data from s3
s3 = boto3.resource('s3', region_name='us-east-1', aws_access_key_id="AKIAVMCC766MHUJBYMEJ", aws_secret_access_key="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")
bucket = s3.Bucket('msml651')
bucket.download_file('sentiment140_clean_no_stopwords.parquet', './data/sentiment140_clean_no_stopwords.parquet')

KeyboardInterrupt: 

In [5]:
#read the data into a spark dataframe
df = sqlContext.read.parquet('./data/sentiment140_clean_no_stopwords.parquet')
df.show(5)

+------+----------+--------------------+----------+---------------+--------------------+-----------------+----------------+---------------+----------------+-----------------------+
|target|  tweet_id|                date|query_flag|      user_name|               tweet|post_clean_length|pre_clean_length|pre_clean_words|post_clean_words|tweet_without_stopwords|
+------+----------+--------------------+----------+---------------+--------------------+-----------------+----------------+---------------+----------------+-----------------------+
|     0|1467810369|Mon Apr 06 22:19:...|  NO_QUERY|_TheSpecialOne_|awww that s a bum...|               44|             115|             19|               8|   awww bummer shoul...|
|     0|1467810672|Mon Apr 06 22:19:...|  NO_QUERY|  scotthamilton|is upset that he ...|               69|             111|             21|              11|   upset update face...|
|     0|1467810917|Mon Apr 06 22:19:...|  NO_QUERY|       mattycus|i dived many time...|       

In [6]:
#Set the config parameters
config = {
    'type': 'tfidf +ngram + rf',
    'train_size': 0.95,
    'test_size': 0.25,
    'val_size': 0.25,
    'max_depth': 5,
    'vocab_size': 10000,
    'idf_min_doc_freq': 5,
}
wandb.config.update(config)

In [7]:
#split the data into train, test and validation sets
(train_set, val_set, test_set) = df.randomSplit([config['train_size'], config['val_size'], config['test_size']], seed = 2000)

## Preprocessing

In [8]:
def create_rf_pipeline(input_column = 'tweet_without_stopwords', target_column = 'target', n=2):
    """
    Create Random Forest pipeline
    """

    tokenizer = [Tokenizer(inputCol=input_column, outputCol="words")] 
    ngrams = [NGram(n=i, inputCol="words", outputCol=f"{i}_grams") for i in range(1, n+1)]
    cv = [CountVectorizer(vocabSize=config['vocab_size'], inputCol=f"{i}_grams", outputCol=f"{i}_tf") for i in range(1, n+1)]
    idf = [IDF(minDocFreq=config['idf_min_doc_freq'], inputCol=f"{i}_tf", outputCol=f"{i}_tfidf") for i in range(1, n+1)]
    assembler = [VectorAssembler(inputCols=[f"{i}_tfidf" for i in range(1, n+1)], outputCol="features")]
    label_stringIdx = [StringIndexer(inputCol = target_column, outputCol = 'label')]
    rf = [RandomForestClassifier(labelCol="label", featuresCol="features", maxDepth=config['max_depth'])]
    #create the pipeline
    pipeline = Pipeline(stages=tokenizer + ngrams + cv + idf + assembler + label_stringIdx + rf)
    

    return pipeline



    
    

## Training

In [9]:
pipeline = create_rf_pipeline()

In [10]:
#fit the pipeline to the training data and transform the data
pipelineFit = pipeline.fit(train_set)
predictions = pipelineFit.transform(val_set)

                                                                                

## Evaluation

In [11]:
predictions.select('tweet_without_stopwords', 'rawPrediction', 'prediction', 'probability').show(10)



+-----------------------+--------------------+----------+--------------------+
|tweet_without_stopwords|       rawPrediction|prediction|         probability|
+-----------------------+--------------------+----------+--------------------+
|   upset update face...|[9.99801366078643...|       1.0|[0.49990068303932...|
|   hey long time see...|[8.79347892866582...|       1.0|[0.43967394643329...|
|                   nope|[9.99801366078643...|       1.0|[0.49990068303932...|
|      day get much done|[9.99801366078643...|       1.0|[0.49990068303932...|
|      im sad miss lilly|[9.99801366078643...|       1.0|[0.49990068303932...|
|   hacked account ai...|[9.71457499730105...|       1.0|[0.48572874986505...|
|   want go promote g...|[10.4798655889933...|       0.0|[0.52399327944966...|
|   wow tons replies ...|[9.99801366078643...|       1.0|[0.49990068303932...|
|   leaving parking l...|[10.2093155779572...|       0.0|[0.51046577889786...|
|   sure right need s...|[9.99801366078643...|      

                                                                                

In [12]:
#evaluate the predictions
evaluator = BinaryClassificationEvaluator(rawPredictionCol="rawPrediction") #create an evaluator
auc = evaluator.evaluate(predictions) #evaluate the predictions, this is the AUC
print("AUC on validation data = %g" % auc)



AUC on validation data = 0.670186


                                                                                

In [13]:
predictionAndLabels = predictions.select("prediction", "label").rdd #get the predictions and labels as an rdd because the MulticlassMetrics class needs an rdd
metrics = MulticlassMetrics(predictionAndLabels)


# Get confusion matrix
print(metrics.confusionMatrix().toArray()) 

# Get accuracy
print("Accuracy: %s" % (metrics.accuracy))

# Get precision, recall, f1

print("Precision for negative: %s" % (metrics.precision(label=1.0)))
print("Recall for negative: %s" % (metrics.recall(label=1.0)))
print("F1-Score for negative: %s" % (metrics.fMeasure(label=1.0, beta=1.0)))

print("Precision for positive: %s" % (metrics.precision(label=0.0)))
print("Recall for positive: %s" % (metrics.recall(label=0.0)))
print("F1-Score for positive: %s" % (metrics.fMeasure(label=0.0, beta=1.0)))

# calculate macro avg
precision = (metrics.precision(label=1.0) + metrics.precision(label=0.0))/2
recall = (metrics.recall(label=1.0) + metrics.recall(label=0.0))/2
f1 = (metrics.fMeasure(label=1.0, beta=1.0) + metrics.fMeasure(label=0.0, beta=1.0))/2

print("Macro Precision: %s" % (precision))
print("Macro Recall: %s" % (recall))
print("Macro F1-Score: %s" % (f1))






[[ 47636.  90058.]
 [ 18595. 119607.]]
Accuracy: 0.6061813146982921
Precision for negative: 0.5704671738249112
Recall for negative: 0.8654505723506172
F1-Score for negative: 0.6876593640673015
Precision for positive: 0.7192402349353022
Recall for positive: 0.3459555245689718
F1-Score for positive: 0.46719136937599604
Macro Precision: 0.6448537043801067
Macro Recall: 0.6057030484597945
Macro F1-Score: 0.5774253667216488


                                                                                

## Post-training

In [39]:
# log the results
wandb.log({"auc": auc, "accuracy": metrics.accuracy, "precision_negative": metrics.precision(label=1.0), "recall_negative": metrics.recall(label=1.0), "f1_negative": metrics.fMeasure(label=1.0, beta=1.0), "precision_positive": metrics.precision(label=0.0), "recall_positive": metrics.recall(label=0.0), "f1_positive": metrics.fMeasure(label=0.0, beta=1.0), "macro_precision": precision, "macro_recall": recall, "macro_f1": f1})

# save the model

pipeline.save("RFModel-ngram-tfidf")

# push the model to wandb
wandb.save('RFModel-ngram-tfidf')

# finish the run
wandb.finish()


VBox(children=(Label(value='0.001 MB of 0.001 MB uploaded\r'), FloatProgress(value=1.0, max=1.0)))

0,1
accuracy,▁
auc,▁
f1_negative,▁
f1_positive,▁
macro_f1,▁
macro_precision,▁
macro_recall,▁
precision_negative,▁
precision_positive,▁
recall_negative,▁

0,1
accuracy,0.63467
auc,0.67156
f1_negative,0.69984
f1_positive,0.53334
macro_f1,0.61659
macro_precision,0.66516
macro_recall,0.63427
precision_negative,0.59466
precision_positive,0.73565
recall_negative,0.85024
