In [1]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, length, when
from pyspark.ml.feature import RegexTokenizer, StopWordsRemover, Word2Vec, StringIndexer, VectorAssembler
from pyspark.ml.classification import RandomForestClassifier
from pyspark.ml import Pipeline
from pyspark.ml.evaluation import MulticlassClassificationEvaluator

In [2]:
spark = SparkSession.builder.appName("SentimentAnalysis").getOrCreate()

In [3]:
df1 = spark.read.csv("Tweets.csv", header=True, encoding="ISO-8859-1", inferSchema=True)
df2 = spark.read.csv("train.csv", header=True, encoding="ISO-8859-1", inferSchema=True)


In [4]:
df1

DataFrame[tweet_id: string, airline_sentiment: string, airline_sentiment_confidence: string, negativereason: string, negativereason_confidence: string, airline: string, airline_sentiment_gold: string, name: string, negativereason_gold: string, retweet_count: int, text: string, tweet_coord: string, tweet_created: string, tweet_location: string, user_timezone: string]

In [5]:
df2

DataFrame[textID: string, text: string, selected_text: string, sentiment: string, Time of Tweet: string, Age of User: string, Country: string, Population -2020: int, Land Area (Km²): double, Density (P/Km²): int]

In [6]:
from pyspark.sql.functions import col, when

# Step 1: Clean and label df1 (airline dataset)
df1 = df1.select(
    col("text"),
    col("airline_sentiment").alias("sentiment")
).withColumn("source", when(col("text").isNotNull(), "airline"))

# Step 2: Clean and label df2 (general dataset)
df2 = df2.select(
    col("text"),
    col("sentiment")
).withColumn("source", when(col("text").isNotNull(), "general"))

# Step 3: Combine and drop nulls
combined_df = df1.unionByName(df2).na.drop(subset=["text", "sentiment"])


In [7]:
combined_df

DataFrame[text: string, sentiment: string, source: string]

In [10]:
from pyspark.ml import Pipeline
from pyspark.ml.feature import (
    RegexTokenizer, StopWordsRemover, HashingTF, IDF,
    VectorAssembler, StandardScaler
)
from pyspark.sql.functions import col, length, size, when


# Convert to string just in case
combined_df = combined_df.withColumn("processed_text", col("text").cast("string"))

# Create text_length feature
combined_df = combined_df.withColumn("text_length", length(col("text")))

# Create is_airline binary feature
combined_df = combined_df.withColumn("is_airline", when(col("source") == "airline", 1).otherwise(0))


# Tokenization
tokenizer = RegexTokenizer(inputCol="processed_text", outputCol="tokens", pattern="\\s+")

# Remove stop words
remover = StopWordsRemover(inputCol="tokens", outputCol="filtered")

# TF-IDF
hashingTF = HashingTF(inputCol="filtered", outputCol="tf", numFeatures=5000)
idf = IDF(inputCol="tf", outputCol="tfidf")

# Add word count after tokens
# We'll use a SQL transformer after tokenizer if needed — but here's a cleaner solution:
# Use the `size()` function on the tokens column *after* tokenization

# Build preprocessing pipeline up to tokenization for intermediate step
token_pipeline = Pipeline(stages=[tokenizer])
token_model = token_pipeline.fit(combined_df)
tokenized_df = token_model.transform(combined_df)

# Now add `word_count`
tokenized_df = tokenized_df.withColumn("word_count", size(col("tokens")))

# Re-apply the rest of the pipeline on tokenized_df
remover = StopWordsRemover(inputCol="tokens", outputCol="filtered")
hashingTF = HashingTF(inputCol="filtered", outputCol="tf", numFeatures=5000)
idf = IDF(inputCol="tf", outputCol="tfidf")

assembler = VectorAssembler(
    inputCols=["tfidf", "text_length", "word_count", "is_airline"],
    outputCol="assembled_features"
)

scaler = StandardScaler(inputCol="assembled_features", outputCol="features")

# Final pipeline (after tokenization)
final_pipeline = Pipeline(stages=[remover, hashingTF, idf, assembler, scaler])
final_model = final_pipeline.fit(tokenized_df)
preprocessed_df = final_model.transform(tokenized_df)


# Display key columns
preprocessed_df.select(
    "text", "tokens", "filtered", "text_length", "word_count", "is_airline", "features"
).show(5, truncate=False)


+----------------------------------------------------------------------------------------------------------------------------------+----------------------------------------------------------------------------------------------------------------------------------------------------+--------------------------------------------------------------------------------------------------------------------+-----------+----------+----------+-------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
|text                                                                                                                              |tokens                                                                                           

In [11]:
preprocessed_df

DataFrame[text: string, sentiment: string, source: string, processed_text: string, text_length: int, word_count: int, is_airline: int, tokens: array<string>, filtered: array<string>, tf: vector, tfidf: vector, assembled_features: vector, features: vector]

In [25]:
from pyspark.ml.feature import StandardScaler
from pyspark.ml.classification import LogisticRegression
from pyspark.ml import Pipeline
from pyspark.ml.evaluation import MulticlassClassificationEvaluator

# Prepare scaler and classifier
scaler = StandardScaler(inputCol="features", outputCol="scaled_features", withStd=True, withMean=False)
lr = LogisticRegression(featuresCol="scaled_features", labelCol="label", maxIter=20)

# Pipeline
pipeline_lr = Pipeline(stages=[scaler, lr])

# Train model
model_lr = pipeline_lr.fit(preprocessed_df)

# Predict
predictions_lr = model_lr.transform(preprocessed_df)

# Evaluate
evaluator = MulticlassClassificationEvaluator(labelCol="label", predictionCol="prediction", metricName="accuracy")
accuracy_lr = evaluator.evaluate(predictions_lr)

print(f"Logistic Regression Accuracy: {accuracy_lr:.4f}")

Logistic Regression Accuracy: 0.8700


In [26]:
from pyspark.ml.classification import RandomForestClassifier

rf = RandomForestClassifier(featuresCol="scaled_features", labelCol="label", numTrees=50)

pipeline_rf = Pipeline(stages=[scaler, rf])

model_rf = pipeline_rf.fit(preprocessed_df)

predictions_rf = model_rf.transform(preprocessed_df)

accuracy_rf = evaluator.evaluate(predictions_rf)

print(f"Random Forest Accuracy: {accuracy_rf:.4f}")


Random Forest Accuracy: 0.8460


In [30]:
from pyspark.ml.classification import DecisionTreeClassifier

dt = DecisionTreeClassifier(featuresCol="scaled_features", labelCol="label")

pipeline_dt = Pipeline(stages=[scaler, dt])

model_dt = pipeline_dt.fit(preprocessed_df)

predictions_dt = model_dt.transform(preprocessed_df)

accuracy_dt = evaluator.evaluate(predictions_dt)

print(f"Decision Tree Accuracy: {accuracy_dt:.4f}")

Decision Tree Accuracy: 0.8587


In [32]:
from pyspark.ml.feature import MinMaxScaler
from pyspark.ml.classification import NaiveBayes
from pyspark.ml import Pipeline

# Assuming 'features' column exists
scaler = MinMaxScaler(inputCol="features", outputCol="scaled_features")

nb = NaiveBayes(featuresCol="scaled_features", labelCol="label")

pipeline_nb = Pipeline(stages=[scaler, nb])

model_nb = pipeline_nb.fit(preprocessed_df)

predictions_nb = model_nb.transform(preprocessed_df)

accuracy_nb = evaluator.evaluate(predictions_nb)

print(f"Naive Bayes Accuracy: {accuracy_nb:.4f}")


Naive Bayes Accuracy: 0.8347


In [34]:
from pyspark.ml.classification import OneVsRest, LinearSVC

# Initialize the base classifier
svm = LinearSVC(featuresCol="scaled_features", labelCol="label")

# One-vs-Rest classifier
ovr = OneVsRest(classifier=svm, labelCol="label", featuresCol="scaled_features")

pipeline_svm = Pipeline(stages=[scaler, ovr])

model_svm = pipeline_svm.fit(preprocessed_df)

predictions_svm = model_svm.transform(preprocessed_df)

accuracy = evaluator.evaluate(predictions_svm)
print(f"SVM One-vs-Rest Accuracy: {accuracy:.4f}")


SVM One-vs-Rest Accuracy: 0.8605
