#### Step 1: Load the Data

*    mount s3 bucket

In [0]:
%pip install vaderSentiment


In [0]:
from pyspark.sql import SparkSession
spark = SparkSession \
        .builder \
        .appName('WeCloud Spark Training') \
        .getOrCreate()
print('Session created')

In [0]:
def mount_s3_bucket(access_key, secret_key, bucket_name, mount_folder):
  ACCESS_KEY_ID = access_key
  SECRET_ACCESS_KEY = secret_key
  ENCODED_SECRET_KEY = SECRET_ACCESS_KEY.replace("/", "%2F")

  print ("Mounting", bucket_name)

  try:
    # Unmount the data in case it was already mounted.
    dbutils.fs.unmount("/mnt/%s" % mount_folder)
    
  except:
    # If it fails to unmount it most likely wasn't mounted in the first place
    print ("Directory not unmounted: ", mount_folder)
    
  finally:
    # Lastly, mount our bucket.
    dbutils.fs.mount("s3a://%s:%s@%s" % (ACCESS_KEY_ID, ENCODED_SECRET_KEY, bucket_name), "/mnt/%s" % mount_folder)
    #dbutils.fs.mount("s3a://"+ ACCESS_KEY_ID + ":" + ENCODED_SECRET_KEY + "@" + bucket_name, mount_folder)
    print ("The bucket", bucket_name, "was mounted to", mount_folder, "\n")

In [0]:
# Set AWS programmatic access credentials
ACCESS_KEY = "AKIA4VDBMHXDRBOBMLXQ"
SECRET_ACCESS_KEY = "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"

In [0]:
mount_s3_bucket(ACCESS_KEY, SECRET_ACCESS_KEY, "boqi-bucket", "chatgpt")

In [0]:
# List the files in the mounted directory
dbutils.fs.ls("/mnt/chatgpt/")


In [0]:
%fs ls 'dbfs:/mnt/chatgpt/'


Since We don't have any information about the dataset (like column names or data types), the best approach is to first preview the dataset before manually defining the schema.

### Step 2: Create a Schema for Dataframe
*       Define the schema of the dataset and create a Spark DataFrame accordingly.

In [0]:
from pyspark.sql.types import StructType, StructField, StringType, IntegerType, BooleanType, TimestampType

# Define the schema for the tweet dataset with appropriate data types
schema = StructType([
    StructField("tweet_id", StringType(), True),
    StructField("tweet_created", StringType(), True),  # we can later convert this to TimestampType
    StructField("tweet_extracted", StringType(), True),  # Same for this
    StructField("text", StringType(), True),
    StructField("lang", StringType(), True),
    StructField("user_id", StringType(), True),
    StructField("user_name", StringType(), True),
    StructField("user_username", StringType(), True),
    StructField("user_location", StringType(), True),
    StructField("user_description", StringType(), True),
    StructField("user_created", StringType(), True),  # we can convert to TimestampType later if needed
    StructField("user_followers_count", IntegerType(), True),
    StructField("user_following_count", IntegerType(), True),
    StructField("user_tweet_count", IntegerType(), True),
    StructField("user_verified", BooleanType(), True),  # Boolean field
    StructField("source", StringType(), True),
    StructField("retweet_count", IntegerType(), True),
    StructField("like_count", IntegerType(), True),
    StructField("reply_count", IntegerType(), True),
    StructField("impression_count", IntegerType(), True)
])


In [0]:
# Load the dataset into a DataFrame using the defined schema
chatgpt_tweets_df = spark.read.option("header", "true").schema(schema).csv("dbfs:/mnt/chatgpt/chatgpt_daily_tweets.csv")

# Show the first few rows of the DataFrame
chatgpt_tweets_df.show(5)

# Print the schema to verify that everything is correctly typed
chatgpt_tweets_df.printSchema()


Lets get familiar with the dataframe

In [0]:
# show the dataframe
display(chatgpt_tweets_df)

In [0]:
# show the column names 
chatgpt_tweets_df.columns

In [0]:
# Count the number of rows in the DataFrame
chatgpt_tweets_df.count()


### Step 3: Preprocessing

#### 3.1 Filter out empty rows
*        Remove any rows with missing or empty values from the dataset.

In [0]:
# necessary libraries for preprocessing
from pyspark.sql.functions import col, lower, regexp_replace, to_timestamp
import pyspark.sql.functions as F

In [0]:
# Filter out rows where 'text' is null or empty
chatgpt_tweets_df_filtered = chatgpt_tweets_df.filter((col("text").isNotNull()) & (col("text") != ""))

# Show the filtered DataFrame
display(chatgpt_tweets_df_filtered)


#### 3.2 Clean and Preprocess text data

*      Perform text preprocessing steps, such as:
          *  Create a new column for the original text (original_text).
          *  Convert tweet_created to datetime format.
          *  Convert tweet text to lowercase.
          *  Remove Twitter handles (e.g., @username).
          *  Remove hashtags (e.g., #hashtag).
          *  Remove URLs.
          *  Remove special characters.
          *  Remove single characters (e.g., stand-alone letters).
          *  Replace multiple spaces with a single space.

In [0]:
from pyspark.sql.functions import regexp_replace, lower, col, length

# Step 1: Remove rows with null or empty text
chatgpt_tweets_df_filtered = chatgpt_tweets_df_filtered.filter((col("text").isNotNull()) & (length(col("text")) > 0))

# Step 2: Convert text to lowercase
chatgpt_tweets_df_filtered = chatgpt_tweets_df_filtered.withColumn("text", lower(col("text")))

# Step 3: Remove Twitter handles (@username)
chatgpt_tweets_df_filtered = chatgpt_tweets_df_filtered.withColumn("text", regexp_replace(col("text"), r'@\w+', ''))

# Step 4: Remove hashtags (but keep the associated text)
chatgpt_tweets_df_filtered = chatgpt_tweets_df_filtered.withColumn("text", regexp_replace(col("text"), r'#', ''))

# Step 5: Remove URLs
chatgpt_tweets_df_filtered = chatgpt_tweets_df_filtered.withColumn("text", regexp_replace(col("text"), r'http\S+', ''))

# Step 6: Remove all special characters (punctuation, emojis, etc.)
chatgpt_tweets_df_filtered = chatgpt_tweets_df_filtered.withColumn("text", regexp_replace(col("text"), r'[^a-zA-Z\s]', ''))

# Step 7: Replace multiple spaces with a single space
chatgpt_tweets_df_filtered = chatgpt_tweets_df_filtered.withColumn("text", regexp_replace(col("text"), r'\s+', ' '))

# Step 8: Verify that text is clean and consistent
chatgpt_tweets_df_filtered.select("text").show(5, truncate=False)


In [0]:
# Step 1: Create a new column for the original tweet text
chatgpt_tweets_df_filtered = chatgpt_tweets_df.withColumn("original_tweet", col("text"))

In [0]:
# Step 2: Convert 'tweet_created' to datetime format and remove timezone information

chatgpt_tweets_df_filtered = chatgpt_tweets_df_filtered.withColumn("datetime", to_timestamp(col("tweet_created")))



In [0]:
# Step 3: Preprocess the tweet text (convert to lowercase)
chatgpt_tweets_df_filtered = chatgpt_tweets_df.withColumn("text", lower(col("text")))

In [0]:
# Step 4: Remove Twitter handles (@username)
chatgpt_tweets_df_filtered = chatgpt_tweets_df.withColumn("text", regexp_replace(col("text"), r'@[^\s]+', ''))


In [0]:
# Step 5: Remove hashtags (#hashtag)
chatgpt_tweets_df_filtered = chatgpt_tweets_df.withColumn("text", regexp_replace(col("text"), r'\B#\S+', ''))


In [0]:
# Step 6: Remove URLs
chatgpt_tweets_df_filtered = chatgpt_tweets_df.withColumn("text", regexp_replace(col("text"), r'http\S+', ''))


In [0]:

# Step 7: Remove all special characters, leaving only alphanumeric and spaces
chatgpt_tweets_df_filtered = chatgpt_tweets_df.withColumn("text", regexp_replace(col("text"), r'\W+', ' '))


In [0]:

# Step 8: Remove all single characters
chatgpt_tweets_df_filtered = chatgpt_tweets_df.withColumn("text", regexp_replace(col("text"), r'\s+[a-zA-Z]\s+', ''))


In [0]:
# Step 9: Substitute multiple spaces with a single space
chatgpt_tweets_df_filtered = chatgpt_tweets_df.withColumn("text", regexp_replace(col("text"), r'\s+', ' '))


In [0]:

chatgpt_tweets_df_filtered.filter((col("text").isNull()) | (length(col("text")) == 0)).show()

In [0]:
# Step 10: Remove rows with null or empty in the 'text'
chatgpt_tweets_df_filtered = chatgpt_tweets_df_filtered.filter((col("text").isNotNull()) & (length(col("text")) > 0))


In [0]:
from pyspark.sql.functions import to_timestamp

# Create 'datetime' column from 'tweet_created'
chatgpt_tweets_df_filtered = chatgpt_tweets_df_filtered.withColumn("datetime", to_timestamp(col("tweet_created")))

# Show a sample of the DataFrame to ensure 'datetime' is created
chatgpt_tweets_df_filtered.select("tweet_created", "datetime").show(5, truncate=False)


In [0]:
display(chatgpt_tweets_df_filtered)

### Step 4: Feature Engineering

#### 4.1  Extract Features




*      Tokenize the text into words and calculate:
     *      Word count for each tweet.
     *      Sentence length (total number of characters in the tweet).

In [0]:
from pyspark.sql.functions import split, size, length, col, udf
from pyspark.sql.types import IntegerType

# Step 1: Tokenization (Extract words)
chatgpt_tweets_df_filtered = chatgpt_tweets_df_filtered.withColumn("words", split(col("text"), r"\W+"))

# Step 2: Word Count (Number of words)
chatgpt_tweets_df_filtered = chatgpt_tweets_df_filtered.withColumn("words_count", size(col("words")))

# Step 3: Sentence Length (Length of the sentence)
chatgpt_tweets_df_filtered = chatgpt_tweets_df_filtered.withColumn("sentence_length", length(col("text")))

# Show the extracted features
chatgpt_tweets_df_filtered.select("text", "words", "words_count", "sentence_length").show(5, truncate=False)



#### 4.2 Date and Time (Hour, Date, Month, Year)

Extract hour, date, month, and year from the tweet_created column (now in datetime format).

In [0]:
from pyspark.sql.functions import hour, year, month, dayofmonth, date_format

# Extract hour, date, month, year from the 'datetime' column
chatgpt_tweets_df_filtered = chatgpt_tweets_df_filtered.withColumn("hour", hour(col("datetime")))
chatgpt_tweets_df_filtered = chatgpt_tweets_df_filtered.withColumn("date", date_format(col("datetime"), "yyyy-MM-dd"))
chatgpt_tweets_df_filtered = chatgpt_tweets_df_filtered.withColumn("month", month(col("datetime")))
chatgpt_tweets_df_filtered = chatgpt_tweets_df_filtered.withColumn("year", year(col("datetime")))

# Show the time-related features
chatgpt_tweets_df_filtered.select("datetime", "hour", "date", "month", "year").show(5, truncate=False)


#### 4.3 Sentiment Extraction using VADER

*      Use VADER sentiment analysis to generate sentiment scores for each tweet 

For sentiment analysis, you can use VADER (from vaderSentiment library). Since VADER isn’t available directly in PySpark, you need to use a UDF (User Defined Function) to apply it across the Spark DataFrame.


##### 4.3.1  Overall Sentiment

In [0]:
from vaderSentiment.vaderSentiment import SentimentIntensityAnalyzer
from pyspark.sql.functions import udf
from pyspark.sql.types import FloatType

# Initialize the VADER sentiment analyzer
sia = SentimentIntensityAnalyzer()

# Define a UDF to extract the compound sentiment score, handling None values
def get_sentiment(text):
    if text is None or len(text.strip()) == 0:  # Check if the text is None or empty
        return 0.0  # Neutral score for missing/empty text
    return float(sia.polarity_scores(text)['compound'])

# Register the UDF in PySpark
sentiment_udf = udf(get_sentiment, FloatType())

# Apply the UDF to calculate sentiment score for each tweet on the filtered DataFrame
chatgpt_tweets_df_filtered = chatgpt_tweets_df_filtered.withColumn("sentiment", sentiment_udf(col("text")))

# Show the resulting DataFrame with sentiment scores
chatgpt_tweets_df_filtered.select("text", "sentiment").show(5, truncate=False)


#### 4.3.2 Label the sentiment 

In [0]:
def label_sentiment(score):
    if score > 0.35:
        return "positive"
    elif score < -0.05:
        return "negative"
    else:
        return "neutral"

# Register the UDF for labeling
label_udf = udf(label_sentiment, StringType())

# Apply the UDF to create labeled sentiments
chatgpt_tweets_df_filtered = chatgpt_tweets_df_filtered.withColumn("overall_sentiment", label_udf(col("sentiment")))

In [0]:
# check the dataframe
display(chatgpt_tweets_df_filtered.select("text", "overall_sentiment"))

In [0]:
chatgpt_tweets_df_filtered.columns

####   4.4 Convert Sentiment Labels To Numeric Form

*      Use StringIndexer to convert the sentiment labels (negative, neutral, positive) into numeric form for modeling

In [0]:
from pyspark.ml.feature import StringIndexer

# Apply StringIndexer to the entire dataset
indexer = StringIndexer(inputCol="overall_sentiment", outputCol="overall_sentiment_index")

# Fit the indexer and apply it to the entire dataset
chatgpt_tweets_df_encoded = indexer.fit(chatgpt_tweets_df_filtered).transform(chatgpt_tweets_df_filtered)

# Check the encoded labels
display(chatgpt_tweets_df_encoded.select("overall_sentiment", "overall_sentiment_index").show(5))




In [0]:
display(chatgpt_tweets_df_encoded.select("overall_sentiment_index", "overall_sentiment", "text"))

#### 4.5 Handle Imbalance Data

##### 4.5.1 Check the Class distribution first

In [0]:
# Check class distribution in the dataset
chatgpt_tweets_df_encoded.groupBy("overall_sentiment_index").count().show()


Highly imbalanced data. So, We need to split the data first and then balance the train data itself, leaving the test portion to make it more realistic

#### 4.5.2 Split the Data - Train-Test

In [0]:
# Split the dataset into 80% training and 20% testing sets
train_df, test_df = chatgpt_tweets_df_encoded.randomSplit([0.8, 0.2], seed=42)

# Check the class distribution in the training set
train_df.groupBy("overall_sentiment_index").count().show()

# Check the class distribution in the test set (should remain imbalanced)
test_df.groupBy("overall_sentiment_index").count().show()


#### 4.5.3 Undersample The Majority Class (neutral) in the Training Set

Now, let's undersample the neutral class (101,156 rows) in the training set to balance it with the positive (7,673 rows) and negative (5,021 rows) classes.

In [0]:
# Separate the classes in the training set
neutral_train_df = train_df.filter(train_df["overall_sentiment_index"] == 0)
positive_train_df = train_df.filter(train_df["overall_sentiment_index"] == 1)
negative_train_df = train_df.filter(train_df["overall_sentiment_index"] == 2)

# Downsample the neutral class
neutral_sampled_df = neutral_train_df.sample(withReplacement=False, fraction=0.1, seed=42)  # Adjust fraction to control the size

# Combine the downsampled neutral class with positive and negative classes
balanced_train_df = positive_train_df.union(negative_train_df).union(neutral_sampled_df)

# Check the new class distribution in the balanced training set
balanced_train_df.groupBy("overall_sentiment_index").count().show()


This is significantly more balanced compared to the original distribution where the neutral class was heavily over-represented. While it's not perfectly equal, this distribution should work well for training ML model without the risk of overwhelming bias toward the neutral class.

####  4.6 Vectorize the Text Data (TF-IDF)


*     Code to Vectorize the Text:
         *     Tokenize the text.
         *     Remove stopwords.
         *     Apply HashingTF (term frequency).
         *     Apply IDF (inverse document frequency) to get the TF-IDF features.

Drop the conflicted columns and proceed 

In [0]:
from pyspark.sql.functions import col

# Step 1: Ensure all values in the 'text' column are strings
balanced_train_df = balanced_train_df.withColumn("text", col("text").cast("string"))
chatgpt_tweets_df_filtered = chatgpt_tweets_df_filtered.withColumn("text", col("text").cast("string"))

# Step 2: Drop conflicting columns from both DataFrames if they exist
for column in ['tokenized_words', 'filtered_words', 'features', 'raw_features']:
    if column in balanced_train_df.columns:
        balanced_train_df = balanced_train_df.drop(column)
    if column in chatgpt_tweets_df_filtered.columns:
        chatgpt_tweets_df_filtered = chatgpt_tweets_df_filtered.drop(column)

# Step 3: Proceed with Tokenization and TF-IDF
from pyspark.ml.feature import Tokenizer, StopWordsRemover, HashingTF, IDF
from pyspark.ml import Pipeline

# Tokenize the text
tokenizer = Tokenizer(inputCol="text", outputCol="tokenized_words")

# Remove stopwords
remover = StopWordsRemover(inputCol="tokenized_words", outputCol="filtered_words")

# Convert text into numerical features using TF-IDF
hashingTF = HashingTF(inputCol="filtered_words", outputCol="raw_features", numFeatures=10000)
idf = IDF(inputCol="raw_features", outputCol="features")

# Create a pipeline
pipeline = Pipeline(stages=[tokenizer, remover, hashingTF, idf])

# Step 4: Fit the pipeline only on the training data
pipeline_model = pipeline.fit(balanced_train_df)

# Step 5: Transform the training and test data using the fitted pipeline
balanced_train_df = pipeline_model.transform(balanced_train_df)
test_df = pipeline_model.transform(test_df)

# Step 6: Show the transformed data
balanced_train_df.select("text", "tokenized_words", "filtered_words", "features").show(5, truncate=False)


## Step 5: ML models

*    Train the Models 
*    LR, RF classifiers
*    Evaluate the Models
*    Model hyperparameter-tuning

### 5.1  Logistic Regression

In [0]:
from pyspark.ml.classification import LogisticRegression

# Initialize the Logistic Regression model
lr = LogisticRegression(featuresCol="features", labelCol="overall_sentiment_index")

# Train the Logistic Regression model using the balanced training data
lr_model = lr.fit(balanced_train_df)

# Make predictions on the test set
lr_predictions = lr_model.transform(test_df)

# Show the predictions
display(lr_predictions.select("text", "overall_sentiment_index", "prediction"))


### 5.2 Random Forest 

In [0]:
from pyspark.ml.classification import RandomForestClassifier

# Initialize the Random Forest Classifier
rf = RandomForestClassifier(featuresCol="features", labelCol="overall_sentiment_index", numTrees=100)

# Train the Random Forest model using the balanced training data
rf_model = rf.fit(balanced_train_df)

# Make predictions on the test set
rf_predictions = rf_model.transform(test_df)

# Show the predictions
display(rf_predictions.select("text", "overall_sentiment_index", "prediction"))


### 5.3: Model Evaluation 



In [0]:
from pyspark.ml.evaluation import MulticlassClassificationEvaluator

# Initialize the evaluators
accuracy_evaluator = MulticlassClassificationEvaluator(labelCol="overall_sentiment_index", predictionCol="prediction", metricName="accuracy")
precision_evaluator = MulticlassClassificationEvaluator(labelCol="overall_sentiment_index", predictionCol="prediction", metricName="weightedPrecision")
recall_evaluator = MulticlassClassificationEvaluator(labelCol="overall_sentiment_index", predictionCol="prediction", metricName="weightedRecall")
f1_evaluator = MulticlassClassificationEvaluator(labelCol="overall_sentiment_index", predictionCol="prediction", metricName="f1")

# Evaluate the Logistic Regression model
print("=== Logistic Regression Model Evaluation ===")
lr_accuracy = accuracy_evaluator.evaluate(lr_predictions)
lr_precision = precision_evaluator.evaluate(lr_predictions)
lr_recall = recall_evaluator.evaluate(lr_predictions)
lr_f1 = f1_evaluator.evaluate(lr_predictions)

print(f"Logistic Regression Accuracy: {lr_accuracy}")
print(f"Logistic Regression Precision: {lr_precision}")
print(f"Logistic Regression Recall: {lr_recall}")
print(f"Logistic Regression F1 Score: {lr_f1}")
print()

# Evaluate the Random Forest model
print("=== Random Forest Model Evaluation ===")
rf_accuracy = accuracy_evaluator.evaluate(rf_predictions)
rf_precision = precision_evaluator.evaluate(rf_predictions)
rf_recall = recall_evaluator.evaluate(rf_predictions)
rf_f1 = f1_evaluator.evaluate(rf_predictions)

print(f"Random Forest Accuracy: {rf_accuracy}")
print(f"Random Forest Precision: {rf_precision}")
print(f"Random Forest Recall: {rf_recall}")
print(f"Random Forest F1 Score: {rf_f1}")


In [0]:
balanced_train_df.cache()


## Step 6:  ML models with Cross_validation, f-1 score, Model evaluation, Model selection

#### 6.1 Logistic Regression with K-fold :

In [0]:
from pyspark.ml.classification import LogisticRegression
from pyspark.ml.tuning import ParamGridBuilder, CrossValidator
from pyspark.ml.evaluation import MulticlassClassificationEvaluator

# Initialize Logistic Regression
lr = LogisticRegression(featuresCol="features", labelCol="overall_sentiment_index")

# Set up a smaller parameter grid for Logistic Regression
paramGrid_lr = ParamGridBuilder() \
    .addGrid(lr.regParam, [0.1, 0.5]) \
    .build()

# Define the evaluator using F1-score
evaluator = MulticlassClassificationEvaluator(
    labelCol="overall_sentiment_index", predictionCol="prediction", metricName="f1")

# Set up the CrossValidator for Logistic Regression with fewer folds and params
crossval_lr = CrossValidator(
    estimator=lr,
    estimatorParamMaps=paramGrid_lr,
    evaluator=evaluator,
    numFolds=3)  # Reduce folds to 3

# Fit the Logistic Regression model with cross-validation
cv_model_lr = crossval_lr.fit(balanced_train_df)

# Make predictions on the test set
lr_predictions = cv_model_lr.transform(test_df)


In [0]:
display(lr_predictions.select("text", "overall_sentiment_index", "prediction"))

### 6.2 Logistic Regression evaluation


In [0]:
from pyspark.ml.evaluation import MulticlassClassificationEvaluator

# Evaluators for multiple metrics
accuracy_evaluator = MulticlassClassificationEvaluator(labelCol="overall_sentiment_index", predictionCol="prediction", metricName="accuracy")
precision_evaluator = MulticlassClassificationEvaluator(labelCol="overall_sentiment_index", predictionCol="prediction", metricName="weightedPrecision")
recall_evaluator = MulticlassClassificationEvaluator(labelCol="overall_sentiment_index", predictionCol="prediction", metricName="weightedRecall")
f1_evaluator = MulticlassClassificationEvaluator(labelCol="overall_sentiment_index", predictionCol="prediction", metricName="f1")

# Evaluate Logistic Regression model
lr_accuracy = accuracy_evaluator.evaluate(lr_predictions)
lr_precision = precision_evaluator.evaluate(lr_predictions)
lr_recall = recall_evaluator.evaluate(lr_predictions)
lr_f1 = f1_evaluator.evaluate(lr_predictions)

# Print the evaluation results
print(f"Logistic Regression - Accuracy: {lr_accuracy}")
print(f"Logistic Regression - Precision: {lr_precision}")
print(f"Logistic Regression - Recall: {lr_recall}")
print(f"Logistic Regression - F1 Score: {lr_f1}")


### 6.3  Random Forest Classifier with K-fold

In [0]:
from pyspark.ml.classification import RandomForestClassifier
from pyspark.ml.tuning import ParamGridBuilder, CrossValidator
from pyspark.ml.evaluation import MulticlassClassificationEvaluator

# Initialize Random Forest Classifier
rf = RandomForestClassifier(featuresCol="features", labelCol="overall_sentiment_index")

# Set up the parameter grid for Random Forest
paramGrid_rf = ParamGridBuilder() \
    .addGrid(rf.numTrees, [50, 100]) \
    .addGrid(rf.maxDepth, [5, 10]) \
    .build()

# Define the evaluator (using F1-score for tuning)
evaluator = MulticlassClassificationEvaluator(
    labelCol="overall_sentiment_index", predictionCol="prediction", metricName="f1")

# Set up the CrossValidator for Random Forest
crossval_rf = CrossValidator(
    estimator=rf,
    estimatorParamMaps=paramGrid_rf,
    evaluator=evaluator,
    numFolds=3)  # Use 3-fold cross-validation

# Fit the Random Forest model with cross-validation
cv_model_rf = crossval_rf.fit(balanced_train_df)

# Make predictions on the test set
rf_predictions = cv_model_rf.transform(test_df)

# Show the predictions
display(rf_predictions.select("text", "overall_sentiment_index", "prediction"))


### 6.4 Random Forest Classifier evaluation

In [0]:
from pyspark.ml.evaluation import MulticlassClassificationEvaluator

# Evaluators for multiple metrics
accuracy_evaluator = MulticlassClassificationEvaluator(labelCol="overall_sentiment_index", predictionCol="prediction", metricName="accuracy")
precision_evaluator = MulticlassClassificationEvaluator(labelCol="overall_sentiment_index", predictionCol="prediction", metricName="weightedPrecision")
recall_evaluator = MulticlassClassificationEvaluator(labelCol="overall_sentiment_index", predictionCol="prediction", metricName="weightedRecall")
f1_evaluator = MulticlassClassificationEvaluator(labelCol="overall_sentiment_index", predictionCol="prediction", metricName="f1")

# Evaluate Random Forest Classifier predictions
rf_accuracy = accuracy_evaluator.evaluate(rf_predictions)
rf_precision = precision_evaluator.evaluate(rf_predictions)
rf_recall = recall_evaluator.evaluate(rf_predictions)
rf_f1 = f1_evaluator.evaluate(rf_predictions)

# Print the evaluation results
print(f"Random Forest Classifier - Accuracy: {rf_accuracy}")
print(f"Random Forest Classifier - Precision: {rf_precision}")
print(f"Random Forest Classifier - Recall: {rf_recall}")
print(f"Random Forest Classifier - F1 Score: {rf_f1}")


### 6.5 Decision Tree Classifier with K-fold

In [0]:
from pyspark.ml.classification import DecisionTreeClassifier

# Initialize Decision Tree Classifier
dt = DecisionTreeClassifier(featuresCol="features", labelCol="overall_sentiment_index")

# Set up the parameter grid for tuning
paramGrid_dt = ParamGridBuilder() \
    .addGrid(dt.maxDepth, [5, 10]) \
    .addGrid(dt.minInstancesPerNode, [1, 2]) \
    .build()

# Set up the CrossValidator for Decision Tree
crossval_dt = CrossValidator(
    estimator=dt,
    estimatorParamMaps=paramGrid_dt,
    evaluator=evaluator,
    numFolds=3)  # Use 3-fold cross-validation

# Fit the Decision Tree model with cross-validation
cv_model_dt = crossval_dt.fit(balanced_train_df)

# Make predictions on the test set
dt_predictions = cv_model_dt.transform(test_df)

# Show predictions
display(dt_predictions.select("text", "overall_sentiment_index", "prediction"))


### 6.6 RTC evaluation


In [0]:
# Evaluate Decision Tree model
dt_accuracy = evaluator.evaluate(dt_predictions, {evaluator.metricName: "accuracy"})
dt_precision = evaluator.evaluate(dt_predictions, {evaluator.metricName: "weightedPrecision"})
dt_recall = evaluator.evaluate(dt_predictions, {evaluator.metricName: "weightedRecall"})
dt_f1 = evaluator.evaluate(dt_predictions, {evaluator.metricName: "f1"})

# Print Decision Tree evaluation metrics
print(f"Decision Tree Classifier - Accuracy: {dt_accuracy}")
print(f"Decision Tree Classifier - Precision: {dt_precision}")
print(f"Decision Tree Classifier - Recall: {dt_recall}")
print(f"Decision Tree Classifier - F1 Score: {dt_f1}")


### 6.7 Naive Bayes Classifier with K-fold

In [0]:
from pyspark.ml.classification import NaiveBayes
from pyspark.ml.tuning import ParamGridBuilder, CrossValidator

# Initialize Naive Bayes Classifier
nb = NaiveBayes(featuresCol="features", labelCol="overall_sentiment_index")

# Set up the parameter grid for tuning
paramGrid_nb = ParamGridBuilder() \
    .addGrid(nb.smoothing, [0.0, 0.5, 1.0]) \
    .build()

# Set up the CrossValidator for Naive Bayes
crossval_nb = CrossValidator(
    estimator=nb,
    estimatorParamMaps=paramGrid_nb,
    evaluator=evaluator,
    numFolds=3)  # Use 3-fold cross-validation

# Fit the Naive Bayes model with cross-validation
cv_model_nb = crossval_nb.fit(balanced_train_df)

# Make predictions on the test set
nb_predictions = cv_model_nb.transform(test_df)

# Show predictions
display(nb_predictions.select("text", "overall_sentiment_index", "prediction"))


### 6.8 Naive Bayes classifier Evaluation 

In [0]:
# Evaluate Naive Bayes model
nb_accuracy = evaluator.evaluate(nb_predictions, {evaluator.metricName: "accuracy"})
nb_precision = evaluator.evaluate(nb_predictions, {evaluator.metricName: "weightedPrecision"})
nb_recall = evaluator.evaluate(nb_predictions, {evaluator.metricName: "weightedRecall"})
nb_f1 = evaluator.evaluate(nb_predictions, {evaluator.metricName: "f1"})

# Print Naive Bayes evaluation metrics
print(f"Naive Bayes Classifier - Accuracy: {nb_accuracy}")
print(f"Naive Bayes Classifier - Precision: {nb_precision}")
print(f"Naive Bayes Classifier - Recall: {nb_recall}")
print(f"Naive Bayes Classifier - F1 Score: {nb_f1}")


## Step 7: Model Selection



#### Logistic Regression
 -   Logistic Regression - Accuracy: 0.8159488559892328 
 -   Logistic Regression - Precision: 0.8745705488045877
 -   Logistic Regression - Recall: 0.8159488559892328
 -   Logistic Regression - F1 Score: 0.8350467718837626

After evaluating several machine learning models, Logistic Regression was selected as the best-performing model for the sentiment analysis task. This decision was based on the following key evaluation metrics:

Accuracy: The Logistic Regression model achieved an accuracy of 81.6%, meaning it correctly predicted the sentiment label for over 80% of the data. This demonstrates that the model is highly reliable in predicting the correct sentiment.

Precision: With a precision score of 87.5%, the model consistently predicted the correct class when it made a positive prediction. This high precision is important, as it shows that the model minimizes false positives, particularly in classifying positive sentiments.

Recall: The recall score of 81.6% indicates that the model is able to identify a substantial proportion of the actual sentiment labels. This balance between precision and recall shows that the model is not biased towards any specific sentiment class.

F1 Score: The F1 score of 83.5% strikes a good balance between precision and recall, confirming that the model performs well across all sentiment categories (positive, negative, and neutral). This makes Logistic Regression a strong candidate for accurately classifying various sentiment labels.

## Step 8: Save prediction into my s3 bucket

    * save as a csv file

In [0]:
spark.conf.set("fs.s3a.access.key", "AKIA4VDBMHXDRBOBMLXQ")
spark.conf.set("fs.s3a.secret.key", "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")


In [0]:
# Select only the necessary columns you want to save
lr_predictions_flat = lr_predictions.select("text", "overall_sentiment_index", "prediction")

# Save the selected columns to S3 as a CSV
lr_predictions_flat.write.format("csv").save("s3a://boqi-bucket/sentiment_lr_predictions/")


In [0]:
# Save the preprocessed data (with necessary columns) to S3 as a CSV
chatgpt_tweets_df_filtered.select(*columns_to_keep) \
    .write.format("csv").option("header", "true").save("s3a://boqi-bucket/tweets_original_dat/")

In [0]:
display(chatgpt_tweets_df_filtered.select(*columns_to_keep))