In [4]:
!pip install pyspark



In [5]:
from pyspark.sql import SparkSession

In [6]:
import pandas as pd

<h2>Creating Spark Session</h2>

In [7]:
spark = SparkSession.builder \
  .appName("Sentiment Analysis") \
  .getOrCreate()


This will create our spark session.

In [8]:
# Loading the data file
tweets_df = spark.read.csv(r"Sentiment Analysis Dataset.csv", inferSchema=True, header=True)


In [9]:
tweets_df.show(6)

+------+---------+---------------+--------------------+
|ItemID|Sentiment|SentimentSource|       SentimentText|
+------+---------+---------------+--------------------+
|     1|        0|   Sentiment140|                 ...|
|     2|        0|   Sentiment140|                 ...|
|     3|        1|   Sentiment140|              omg...|
|     4|        0|   Sentiment140|          .. Omga...|
|     5|        0|   Sentiment140|         i think ...|
|     6|        0|   Sentiment140|         or i jus...|
+------+---------+---------------+--------------------+
only showing top 6 rows



In [10]:
tweets_df.describe()

DataFrame[summary: string, ItemID: string, Sentiment: string, SentimentSource: string, SentimentText: string]

<h2>Cleaning the data</h2>

<h3>Converting the text to lowercase</h3>

In [11]:
from pyspark.sql.functions import lower

tweets_df = tweets_df.withColumn("text", lower(tweets_df.SentimentText))


<h3>Checking for duplicates</h3>

In [12]:
# Considering all columns
duplicates = tweets_df.dropDuplicates()
num_duplicates = tweets_df.count() - duplicates.count()
print(f"Number of duplicate rows: {num_duplicates}")



Number of duplicate rows: 0


<h3>Checking for missing values</h3>

In [13]:
from pyspark.sql.functions import col, count


In [14]:
from pyspark.sql.functions import count

missing_counts = tweets_df.groupBy("SentimentText").agg(count("SentimentText").alias("count")).where(col("count") == 0)
missing_cols = missing_counts.select("SentimentText")

print("Columns with missing values:")
missing_cols.show()



Columns with missing values:
+-------------+
|SentimentText|
+-------------+
+-------------+



In [15]:
missing_cols.count()

0

<p>There are no missing values or duplicate values in the dataset</p>

<h2>Selecting the related data</h2>

In [16]:
sentiment_data = tweets_df.select("SentimentText", col("Sentiment").cast("Int").alias("label"))
sentiment_data.show(truncate = False,n=10)

+------------------------------------------------------------------------------------------------------------------------------------+-----+
|SentimentText                                                                                                                       |label|
+------------------------------------------------------------------------------------------------------------------------------------+-----+
|                     is so sad for my APL friend.............                                                                       |0    |
|                   I missed the New Moon trailer...                                                                                 |0    |
|              omg its already 7:30 :O                                                                                               |1    |
|          .. Omgaga. Im sooo  im gunna CRy. I've been at this dentist since 11.. I was suposed 2 just get a crown put on (30mins)...|0    |
|         i t

<p>Dividing the data into training data and test data</p>

In [17]:
#divide data, 70% for training, 30% for testing
train_test_data = sentiment_data.randomSplit([0.7, 0.3])
sentiment_train_data = train_test_data[0] #index 0 = data training
sentiment_test_data = train_test_data[1] #index 1 = data testing
training_data = sentiment_train_data.count()
testing_data = sentiment_test_data.count()
print ("Training data rows:", training_data, "; Testing data rows:", testing_data)

Training data rows: 1105272 ; Testing data rows: 473355


<h2>Preparing training data</h2>

In [18]:
from pyspark.ml.feature import Tokenizer

In [19]:
tokenizer = Tokenizer(inputCol="SentimentText", outputCol="SentimentWords")
tokenize_train_data = tokenizer.transform(sentiment_train_data)
tokenize_train_data.show(truncate=False, n=5)

+---------------------------------------------------------------------------+-----+--------------------------------------------------------------------------------------------------------------+
|SentimentText                                                              |label|SentimentWords                                                                                                |
+---------------------------------------------------------------------------+-----+--------------------------------------------------------------------------------------------------------------+
|                                     I miss her so much already...         |0    |[, , , , , , , , , , , , , , , , , , , , , , , , , , , , , , , , , , , , , i, miss, her, so, much, already...]|
|                     is so sad for my APL friend.............              |0    |[, , , , , , , , , , , , , , , , , , , , , is, so, sad, for, my, apl, friend.............]                    |
|                   I mis

<p>Removing Stop Words</p>

In [20]:
from pyspark.ml.feature import StopWordsRemover

In [21]:
stop_words_removal = StopWordsRemover(inputCol=tokenizer.getOutputCol(),
                       outputCol="MeaningfulWords")
train_data_stopword_removed = stop_words_removal.transform(tokenize_train_data)
train_data_stopword_removed.show(truncate=False, n=5)

+---------------------------------------------------------------------------+-----+--------------------------------------------------------------------------------------------------------------+--------------------------------------------------------------------------------------------------+
|SentimentText                                                              |label|SentimentWords                                                                                                |MeaningfulWords                                                                                   |
+---------------------------------------------------------------------------+-----+--------------------------------------------------------------------------------------------------------------+--------------------------------------------------------------------------------------------------+
|                                     I miss her so much already...         |0    |[, , , , , , , , , , , , , , , , , 

<p>Now converting words into numbers using Austin Appleby's MurmurHash 3 Algorithm</p>

In [22]:
from pyspark.ml.feature import HashingTF

In [23]:
hashTF = HashingTF(inputCol=stop_words_removal.getOutputCol(), outputCol="features")
numericTrainData = hashTF.transform(train_data_stopword_removed).select(
    'label', 'MeaningfulWords', 'features')
numericTrainData.show(truncate=False, n=3)

+-----+--------------------------------------------------------------------------------------------------+-------------------------------------------------------------------+
|label|MeaningfulWords                                                                                   |features                                                           |
+-----+--------------------------------------------------------------------------------------------------+-------------------------------------------------------------------+
|0    |[, , , , , , , , , , , , , , , , , , , , , , , , , , , , , , , , , , , , , miss, much, already...]|(262144,[76764,232735,249180,260036],[1.0,1.0,37.0,1.0])           |
|0    |[, , , , , , , , , , , , , , , , , , , , , sad, apl, friend.............]                         |(262144,[23825,74989,125638,249180],[1.0,1.0,1.0,21.0])            |
|0    |[, , , , , , , , , , , , , , , , , , , missed, new, moon, trailer...]                             |(262144,[89833,1653

In [24]:
!pip install -U -q PyDrive


In [25]:
def train_and_evaluate(model, training_data, param_grid):
    """
    Trains a model with hyperparameter tuning and returns the best model and its evaluation metrics.

    Args:
        model: Spark machine learning model object.
        training_data: Spark DataFrame containing training data.
        param_grid: ParamGridBuilder object defining the hyperparameter search space.

    Returns:
        A tuple containing the best trained model and its evaluation metrics (accuracy).
    """
    # Define the evaluator for multi-class classification
    evaluator = MulticlassClassificationEvaluator(labelCol="label", predictionCol="prediction")

    # Create the CrossValidator for hyperparameter tuning with 3-fold cross-validation (adjust folds as needed)
    cv = CrossValidator(estimator=model, evaluator=evaluator, estimatorParamMaps=param_grid, numFolds=3)

    # Train the model with hyperparameter tuning
    best_model = cv.fit(training_data)

    # Evaluate the best model on the entire training data
    accuracy = evaluator.evaluate(best_model.bestModel.transform(training_data))

    return best_model, accuracy

In [28]:
from pyspark.ml.classification import LogisticRegression






In [30]:
from pyspark.ml.classification import LinearSVC
from pyspark.ml.evaluation import MulticlassClassificationEvaluator
from pyspark.ml.tuning import ParamGridBuilder, CrossValidator





possible_labels = ["positive", "negative", "neutral"]

# Support Vector Machine (SVM)
svm = LinearSVC(maxIter=100, regParam=0.01)  #

# Define the hyperparameter search space
param_grid = ParamGridBuilder().build()

# Train and evaluate SVM with hyperparameter tuning
best_svm_model, svm_accuracy = train_and_evaluate(svm, numericTrainData, param_grid)

# Print the performance metrics
print("Best SVM Accuracy:", svm_accuracy)


model_path_drive = "/content/gdrive/MyDrive/Datasets/spark_ml_model"

# Save the model
best_svm_model.bestModel.save(model_path_drive)





Best SVM Accuracy: 0.8637992975612889


In [31]:
lr = LogisticRegression(labelCol="label", featuresCol="features",
                        maxIter=10, regParam=0.01)
#model = lr.fit(numericTrainData)
param_grid = ParamGridBuilder().build()

best_lr_model, lr_accuracy = train_and_evaluate(lr, numericTrainData, param_grid)
print ("Training is done!")

Training is done!


In [32]:
print("Best lr Accuracy:", lr_accuracy)

Best lr Accuracy: 0.8618769931434351


In [33]:
print("Best SVM Accuracy:", svm_accuracy)

Best SVM Accuracy: 0.8637992975612889


In [34]:
import matplotlib as plt

</h2>Prepare testing data</h2>

In [36]:
test_tokens = tokenizer.transform(sentiment_test_data)
test_data_stop_words_removed = stop_words_removal.transform(test_tokens)
numericTest = hashTF.transform(test_data_stop_words_removed).select(
    'Label', 'MeaningfulWords', 'features')
numericTest.show(truncate=False, n=2)

+-----+-------------------------------------------------------------------------------------------------+---------------------------------------------------------------------------------------------------+
|Label|MeaningfulWords                                                                                  |features                                                                                           |
+-----+-------------------------------------------------------------------------------------------------+---------------------------------------------------------------------------------------------------+
|0    |[, , , , , , , , , , , , , , , , , , , , , , , , , , , , , , , , , , , , , , , , , , , exhausted]|(262144,[148003,249180],[1.0,43.0])                                                                |
|0    |[, , , , , , , , , , , , , , , noooooooooo, friends, twitter, makes, sad, someone, follow]       |(262144,[1512,125638,130047,148039,182401,199581,213767,249180],[1.0,1.

<h2>Prediction</h2>

In [37]:
prediction_model = best_svm_model.transform(numericTest)
prediction_final = prediction_model.select(
    "MeaningfulWords", "prediction", "Label")
prediction_final.show(n=20, truncate = False)
prediction = prediction_final.filter(
    prediction_final['prediction'] == prediction_final['Label']).count()
data = prediction_final.count()
print("correct prediction:", prediction, ", total data:", data,
      ", accuracy:", prediction/data)

+-------------------------------------------------------------------------------------------------+----------+-----+
|MeaningfulWords                                                                                  |prediction|Label|
+-------------------------------------------------------------------------------------------------+----------+-----+
|[, , , , , , , , , , , , , , , , , , , , , , , , , , , , , , , , , , , , , , , , , , , exhausted]|0.0       |0    |
|[, , , , , , , , , , , , , , , noooooooooo, friends, twitter, makes, sad, someone, follow]       |0.0       |0    |
|[, , , , , , , , , , , , , , omg, already, 7:30, :o]                                             |0.0       |1    |
|[, , , , , , , , , , life, lazzzzyyyy!!!!!!!, , , , , , , , , , , , ***********]                 |0.0       |1    |
|[, , , , , , , , heart, hurts, badly...]                                                         |0.0       |0    |
|[, , , , , , , , wish, ella, somebody,,,,]                     

In [40]:
prediction_model_lr = best_lr_model.transform(numericTest)
prediction_final_lr = prediction_model.select(
    "MeaningfulWords", "prediction", "Label")
prediction_final_lr.show(n=20, truncate = False)
prediction_lr = prediction_final_lr.filter(
    prediction_final_lr['prediction'] == prediction_final_lr['Label']).count()
data_lr = prediction_final_lr.count()
print("correct prediction:", prediction_lr, ", total data:", data,
      ", accuracy:", prediction_lr/data_lr)

+-------------------------------------------------------------------------------------------------+----------+-----+
|MeaningfulWords                                                                                  |prediction|Label|
+-------------------------------------------------------------------------------------------------+----------+-----+
|[, , , , , , , , , , , , , , , , , , , , , , , , , , , , , , , , , , , , , , , , , , , exhausted]|0.0       |0    |
|[, , , , , , , , , , , , , , , noooooooooo, friends, twitter, makes, sad, someone, follow]       |0.0       |0    |
|[, , , , , , , , , , , , , , omg, already, 7:30, :o]                                             |0.0       |1    |
|[, , , , , , , , , , life, lazzzzyyyy!!!!!!!, , , , , , , , , , , , ***********]                 |0.0       |1    |
|[, , , , , , , , heart, hurts, badly...]                                                         |0.0       |0    |
|[, , , , , , , , wish, ella, somebody,,,,]                     

In [41]:
from pyspark.ml.classification import LinearSVC
from pyspark.ml import PipelineModel

# Assuming you have the trained model object available in your notebook
loaded_model = best_svm_model.bestModel

def preprocess_tweet(tweet):
    # Create a DataFrame with a single column named "SentimentText" containing the tweet
    tweet_df = spark.createDataFrame([(tweet,)], ["SentimentText"])

    # Tokenize the tweet
    tokens = tokenizer.transform(tweet_df)

    # Remove stop words
    words_removed = stop_words_removal.transform(tokens)

    # Convert to features
    features = hashTF.transform(words_removed).select("SentimentText", "features")

    return features



def predict_sentiment(tweet):
    # Preprocess the tweet
    tweet_features = preprocess_tweet(tweet)

    # Use the model to predict sentiment
    prediction_model = loaded_model.transform(tweet_features)

    # Extract prediction label
    prediction_label = prediction_model.select("prediction").collect()[0][0]

    return prediction_label

# Get input tweet from user
tweet_input = input("Enter your tweet: ")

# Predict sentiment label
predicted_label = predict_sentiment(tweet_input)

print("Predicted sentiment label:", predicted_label)


Enter your tweet: We should send rockets not at each other, but rather to the stars
Predicted sentiment label: 1.0
