In [1]:
# Import necessary Spark and MLlib libraries
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, udf
from pyspark.sql.types import StructType, StructField, IntegerType, StringType
from pyspark.ml.classification import LogisticRegression
from pyspark.ml.evaluation import MulticlassClassificationEvaluator
from pyspark.ml.linalg import Vectors, Vector
from pyspark.ml.feature import Tokenizer, HashingTF, IDF, StopWordsRemover
from pyspark.ml import Pipeline



In [2]:
# Create (or reuse) the SparkSession for this notebook.
# - appName is the label shown for this application in the Spark UI.
# - getOrCreate() returns the already-running session when there is one,
#   and builds a new one otherwise.
spark = (
    SparkSession.builder
    .appName("PySparkModelTraining")
    .getOrCreate()
)

# Only log ERROR-level messages so cell output stays readable.
spark.sparkContext.setLogLevel("ERROR")

print("SparkSession initialized successfully.")



SparkSession initialized successfully.


In [3]:
# --- 1. Load Cleaned Data ---
# Reads the preprocessed CSV with an explicit schema, renames the target column
# to 'label', and drops rows that have an empty/NULL 'clean_comment' or a NULL
# 'label'. The result is bound to `df_cleaned`, which the later cells consume.

# Define path for cleaned data (CSV format)
cleaned_data_path = "../data/cleaned_data.csv"

try:
    # An explicit schema makes Spark read each column with the intended type
    # instead of guessing via inferSchema=True.
    cleaned_data_schema = StructType([
        StructField("content", StringType(), True),
        StructField("clean_comment", StringType(), True),
        StructField("sentiment", IntegerType(), True)  # target column, read as an integer
    ])

    # Load the CSV file using the defined schema
    df_cleaned = spark.read.csv(cleaned_data_path, header=True, schema=cleaned_data_schema)
    print(f"\nCleaned data loaded successfully from {cleaned_data_path}")

    # Rename 'sentiment' column to 'label' for MLlib compatibility
    df_cleaned = df_cleaned.withColumnRenamed("sentiment", "label")

    # The DataFrame is not cached, so every count() below is a separate Spark
    # action over the CSV. Each distinct row count is therefore computed exactly
    # once and reused, rather than re-counting an unchanged DataFrame.
    initial_spark_rows = df_cleaned.count()  # rows as loaded, before any filtering

    # Drop rows whose 'clean_comment' is NULL or the empty string, for
    # consistency with the preprocessing script's filtering.
    df_cleaned = df_cleaned.filter(col("clean_comment").isNotNull() & (col("clean_comment") != ""))
    initial_rows_before_label_filter = df_cleaned.count()  # rows left after the comment filter
    rows_filtered_in_spark = initial_spark_rows - initial_rows_before_label_filter
    if rows_filtered_in_spark > 0:
        print(f"Filtered {rows_filtered_in_spark} rows with empty or null 'clean_comment' after loading in Spark.")

    # Drop rows with NULL labels.
    # NOTE(review): a recorded run of this cell reported thousands of NULL labels;
    # presumably these are rows whose 'sentiment' field did not parse as an
    # integer -- verify against the preprocessing output.
    df_cleaned = df_cleaned.filter(col("label").isNotNull())
    final_row_count = df_cleaned.count()  # rows left after both filters
    rows_filtered_for_null_labels = initial_rows_before_label_filter - final_row_count
    if rows_filtered_for_null_labels > 0:
        print(f"Filtered {rows_filtered_for_null_labels} rows with NULL 'label' values.")

    print("\n--- Cleaned Data Sample (first 5 rows) ---")
    df_cleaned.show(5, truncate=False)
    df_cleaned.printSchema()
    print(f"Total rows in cleaned DataFrame after Spark-side filtering: {final_row_count}")
except Exception as e:
    print(f"Error loading cleaned data: {e}")
    print("Please ensure the preprocessing step ran successfully and 'cleaned_data.csv' was saved at the specified path.")
    # Best-effort fallback: build a tiny placeholder DataFrame so the rest of the
    # notebook can still execute.
    # NOTE(review): when this branch runs, the later cells train, evaluate and
    # save a model fitted on these three placeholder rows only.
    dummy_schema = StructType([
        StructField("content", StringType(), True),
        StructField("clean_comment", StringType(), True),
        StructField("label", IntegerType(), True)
    ])
    df_cleaned = spark.createDataFrame([
        ("This is a positive comment.", "positive comment", 0),
        ("This is a negative comment.", "negative comment", 1),
        ("This is a neutral comment.", "neutral comment", 2)
    ], schema=dummy_schema)
    print("Loaded dummy cleaned data for demonstration due to file loading issue.")



Cleaned data loaded successfully from ../data/cleaned_data.csv
Filtered 8222 rows with empty or null 'clean_comment' after loading in Spark.
Filtered 5274 rows with NULL 'label' values.

--- Cleaned Data Sample (first 5 rows) ---
+-------------------------------------------------------------------------------------------------------------------+--------------------------------------------------------------------------+-----+
|content                                                                                                            |clean_comment                                                             |label|
+-------------------------------------------------------------------------------------------------------------------+--------------------------------------------------------------------------+-----+
|@switchfoot http://twitpic.com/2y1zl - Awww, that's a bummer.  You shoulda got David Carr of Third Day to do it. ;D|thats bummer shoulda got david carr third day          

In [4]:

# --- 2. Text Feature Engineering (TF-IDF) ---
# Builds a Tokenizer -> HashingTF -> IDF pipeline, fits it on `df_cleaned`, and
# stores the transformed result (with a 'features' column) in `df_features`.
#
# NOTE(review): the pipeline, including IDF, is fit on the full dataset here,
# while the train/test split is applied later to `df_features`. The IDF weights
# therefore also reflect rows that end up in the test set; consider fitting the
# pipeline on the training split only.

# Explicitly cast 'clean_comment' to string. Redundant when the earlier
# filtering/schema already guarantee a string column, but harmless.
df_cleaned = df_cleaned.withColumn("clean_comment", col("clean_comment").cast(StringType()))

# 2.1 Tokenization: split each comment into a list of words.
tokenizer = Tokenizer(inputCol="clean_comment", outputCol="words")

# 2.2 Stop-word removal is optional and left disabled: the preprocessing script
# already removes English stop words. To enable it, uncomment the line below
# (it consumes the tokenizer's 'words' column).
# remover = StopWordsRemover(inputCol=tokenizer.getOutputCol(), outputCol="filtered_words")

# 2.3 HashingTF: hash words into a fixed-size term-frequency vector.
# numFeatures is the dimension of that vector.
hashingTF = HashingTF(
    inputCol="words",
    outputCol="raw_features",
    numFeatures=10000,
)

# 2.4 IDF: rescale the term frequencies by inverse document frequency.
idf = IDF(inputCol="raw_features", outputCol="features")

# 2.5 Chain the three stages in a single Pipeline.
feature_stages = [tokenizer, hashingTF, idf]
pipeline = Pipeline(stages=feature_stages)

# Fit the pipeline, then transform the same data to obtain the 'features' column.
print("\n--- Building Feature Engineering Pipeline and Transforming Data ---")
pipeline_model = pipeline.fit(df_cleaned)
df_features = pipeline_model.transform(df_cleaned)

print("Feature engineering completed. Sample of features and labels:")
df_features.select("clean_comment", "features", "label").show(5, truncate=False)
df_features.printSchema()

# --- Feature vector validity check ---
print("\n--- Checking Feature Vector Validity ---")
# Only NULL feature vectors are detected and removed here.
features_missing = col("features").isNull()
null_features_count = df_features.where(features_missing).count()
if null_features_count > 0:
    print(f"WARNING: Found {null_features_count} rows with NULL feature vectors. These will be filtered out.")
    df_features = df_features.where(~features_missing)

# All-zero feature vectors are NOT checked. (For reference: a SparseVector's
# `size` is its dimension; the number of non-zero entries is `numNonzeros()`.)
# Detecting them would need a UDF or a filter on an empty 'words' array; for now
# the notebook relies on the earlier `clean_comment != ""` filter instead.

# Row count after feature engineering and any NULL-feature filtering.
print(f"Total rows in DataFrame after feature engineering and null feature filtering: {df_features.count()}")




--- Building Feature Engineering Pipeline and Transforming Data ---
Feature engineering completed. Sample of features and labels:
+--------------------------------------------------------------------------+--------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+-----+
|clean_comment                                                             |features                                                                                                                                                                                                                                                                                          |label|
+--------------------------------------------------------------------------+---------------------------------------------

In [5]:
# --- 3. Data Splitting and Model Training ---

# 3.1 Data Splitting
# 70/30 random split of the featurised rows; the fixed seed keeps the split
# repeatable across runs.
split_frames = df_features.randomSplit([0.7, 0.3], seed=42)
train_df = split_frames[0]
test_df = split_frames[1]

train_row_count = train_df.count()
print(f"\nTraining data rows: {train_row_count}")
test_row_count = test_df.count()
print(f"Test data rows: {test_row_count}")

# 3.2 Model Definition: multinomial (multi-class) logistic regression that reads
# the 'features' vector column and the 'label' column.
lr = LogisticRegression(
    featuresCol="features",
    labelCol="label",
    family="multinomial",
)

# 3.3 Model Training on the training split only.
print("\n--- Starting Multi-class Logistic Regression Model Training ---")
lr_model = lr.fit(train_df)
print("Model training completed.")


Training data rows: 1123101
Test data rows: 480652

--- Starting Multi-class Logistic Regression Model Training ---
Model training completed.


In [6]:
# --- 4. Make Predictions and Model Evaluation ---
# Scores the held-out test split with the trained model, reports four
# multi-class metrics, saves the model to disk, and prints a sample of the
# fitted parameters.

# 4.1 Make Predictions
predictions = lr_model.transform(test_df)

print("\n--- Predictions Sample (first 10 rows) ---")
predictions.select("label", "prediction", "probability", "rawPrediction").show(10, truncate=False)

# 4.2 Model Evaluation (Multi-class)
# One evaluator per metric; all compare the 'label' and 'prediction' columns.
evaluator_accuracy = MulticlassClassificationEvaluator(labelCol="label", predictionCol="prediction", metricName="accuracy")
evaluator_f1 = MulticlassClassificationEvaluator(labelCol="label", predictionCol="prediction", metricName="f1")
evaluator_weighted_precision = MulticlassClassificationEvaluator(labelCol="label", predictionCol="prediction", metricName="weightedPrecision")
evaluator_weighted_recall = MulticlassClassificationEvaluator(labelCol="label", predictionCol="prediction", metricName="weightedRecall")

accuracy = evaluator_accuracy.evaluate(predictions)
f1_score = evaluator_f1.evaluate(predictions)
weighted_precision = evaluator_weighted_precision.evaluate(predictions)
weighted_recall = evaluator_weighted_recall.evaluate(predictions)

print(f"\nAccuracy on test set: {accuracy}")
print(f"F1 Score on test set: {f1_score}")
print(f"Weighted Precision on test set: {weighted_precision}")
print(f"Weighted Recall on test set: {weighted_recall}")

# 4.3 Save Trained Model
model_save_path = "../data/trained_model/lr"
print(f"\nSaving trained Logistic Regression model to {model_save_path}...")
try:
    # A plain save() raises when the target path already exists, which made every
    # re-run of the notebook fail here and leave the previous model on disk.
    # write().overwrite() replaces the existing model so the saved artefact
    # always matches the model trained in this run.
    lr_model.write().overwrite().save(model_save_path)
    print("Model saved successfully.")
except Exception as e:
    # Best-effort: a failed save is reported but does not abort the notebook.
    print(f"Error saving model: {e}")

try:
    print("\n--- Logistic Regression Model Coefficients and Intercept ---")
    print(f"Intercepts (per class): {lr_model.interceptVector}")
    # Only the first 10 feature columns of each class's coefficient row are shown.
    print(f"Coefficients (per class, sample of first 10): {lr_model.coefficientMatrix.toArray()[:, :10]}")
except Exception as e:
    print(f"Could not retrieve LR model or coefficients: {e}")



--- Predictions Sample (first 10 rows) ---
+-----+----------+--------------------------------------------------------------+-----------------------------------------------------------+
|label|prediction|probability                                                   |rawPrediction                                              |
+-----+----------+--------------------------------------------------------------+-----------------------------------------------------------+
|1    |0.0       |[0.5462467724006658,0.445209223408468,0.008544004190866308]   |[1.454122522043148,1.2495960209356465,-2.7037185429787947] |
|1    |0.0       |[0.7751438989565699,0.22479964688796647,5.6454155463643375E-5]|[3.5884047356894677,2.350565594734031,-5.9389703304234995] |
|1    |1.0       |[0.398928353698952,0.6010145160240701,5.713027697791523E-5]   |[2.813788547631738,3.2236257988915984,-6.037414346523336]  |
|1    |1.0       |[0.14401925133937357,0.8559804258634051,3.227972212731543E-7] |[3.742044227300343,5.52

In [7]:
# --- Stop SparkSession ---
# Shut down the session bound to `spark` once all work above has finished.
# Cells that use `spark` must not be run after this one without first
# re-running the session-initialisation cell.
spark.stop()
print("\nSparkSession stopped.")



SparkSession stopped.
