In [None]:
#install pyspark
!sudo apt update
!apt-get install openjdk-8-jdk-headless -qq > /dev/null
!wget -q https://dlcdn.apache.org/spark/spark-3.2.1/spark-3.2.1-bin-hadoop3.2.tgz
!tar xf spark-3.2.1-bin-hadoop3.2.tgz
!pip install -q findspark
!pip install pyspark
!pip install py4j

Get:1 http://security.ubuntu.com/ubuntu jammy-security InRelease [129 kB]
Get:2 https://cloud.r-project.org/bin/linux/ubuntu jammy-cran40/ InRelease [3,626 B]
Hit:3 http://archive.ubuntu.com/ubuntu jammy InRelease
Hit:4 https://developer.download.nvidia.com/compute/cuda/repos/ubuntu2204/x86_64  InRelease
Get:5 https://r2u.stat.illinois.edu/ubuntu jammy InRelease [6,555 B]
Get:6 http://archive.ubuntu.com/ubuntu jammy-updates InRelease [128 kB]
Get:7 http://archive.ubuntu.com/ubuntu jammy-backports InRelease [127 kB]
Get:8 http://security.ubuntu.com/ubuntu jammy-security/restricted amd64 Packages [3,429 kB]
Get:9 http://security.ubuntu.com/ubuntu jammy-security/main amd64 Packages [2,503 kB]
Get:10 https://r2u.stat.illinois.edu/ubuntu jammy/main all Packages [8,551 kB]
Get:11 https://r2u.stat.illinois.edu/ubuntu jammy/main amd64 Packages [2,632 kB]
Hit:12 https://ppa.launchpadcontent.net/deadsnakes/ppa/ubuntu jammy InRelease
Hit:13 https://ppa.launchpadcontent.net/graphics-drivers/ppa/ub

In [None]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, concat_ws, when, lower
from pyspark.ml import Pipeline
from pyspark.ml.feature import Tokenizer, StopWordsRemover, CountVectorizer
from pyspark.ml.classification import LogisticRegression
from pyspark.ml.evaluation import MulticlassClassificationEvaluator, BinaryClassificationEvaluator
from google.colab import drive

# Mount Google Drive at /content/drive
drive.mount('/content/drive')

# Create (or reuse) the Spark session; driver and executor memory are both set to 8g
spark = (
    SparkSession.builder
    .appName("AmazonReviewsPolarityClassification")
    .config("spark.driver.memory", "8g")
    .config("spark.executor.memory", "8g")
    .getOrCreate()
)

#------------------------------------------
# Load Training and Testing Data
#------------------------------------------
# Both files are read without a header row using an explicit three-column schema;
# the quote and escape characters are both the double quote.
schema = "polarity INT, title STRING, text STRING"
reader_options = {"schema": schema, "header": False, "quote": '"', "escape": '"'}
train_df = spark.read.csv("/content/drive/MyDrive/550_proj/train.csv", **reader_options)
test_df = spark.read.csv("/content/drive/MyDrive/550_proj/test.csv", **reader_options)


def _with_combined_text(df):
    """Lower-case title and text, then join them with a space into combined_text."""
    lowered = df.withColumn("title", lower(col("title"))).withColumn("text", lower(col("text")))
    return lowered.withColumn("combined_text", concat_ws(" ", col("title"), col("text")))


def _with_label(df):
    """Add a label column: 0.0 where polarity == 1, otherwise 1.0."""
    is_polarity_one = col("polarity") == 1
    return df.withColumn("label", when(is_polarity_one, 0.0).otherwise(1.0))


# Schema, row count and a five-row sample of each split, train first
print("=== Initial DataFrames Loaded ===")
for heading, frame in (("Train DataFrame Schema:", train_df), ("Test DataFrame Schema:", test_df)):
    print(heading)
    frame.printSchema()

for heading, frame in (("Train DataFrame Count:", train_df), ("Test DataFrame Count:", test_df)):
    print(heading, frame.count())

for heading, frame in (
    ("Sample Rows from Train DataFrame:", train_df),
    ("Sample Rows from Test DataFrame:", test_df),
):
    print(heading)
    frame.show(5, truncate=False)

# Discard every row that has a null in any column, then report the new sizes
train_df = train_df.dropna()
test_df = test_df.dropna()

print("Train DataFrame Count after dropping nulls:", train_df.count())
print("Test DataFrame Count after dropping nulls:", test_df.count())

# Lower-case title/text and merge them into a single combined_text column
train_df = _with_combined_text(train_df)
test_df = _with_combined_text(test_df)

for heading, frame in (
    ("Sample combined_text from Train DataFrame:", train_df),
    ("Sample combined_text from Test DataFrame:", test_df),
):
    print(heading)
    frame.select("polarity", "combined_text").show(5, truncate=False)

# Map labels: polarity 1 -> 0.0, any other polarity -> 1.0
train_df = _with_label(train_df)
test_df = _with_label(test_df)

#------------------------------------------
# Pre-processing Pipeline (no bigrams, no cross-validation)
#------------------------------------------
# Tokenize combined_text, then strip stop words from the resulting tokens
tokenizer = Tokenizer(inputCol="combined_text", outputCol="tokens")
stopwords_remover = StopWordsRemover(inputCol="tokens", outputCol="filtered_tokens")

# Term-count features; vocabSize and minDF are set explicitly rather than left at
# the CountVectorizer defaults
cv = CountVectorizer(
    inputCol="filtered_tokens",
    outputCol="features",
    vocabSize=65536,
    minDF=5,
)

# Logistic regression on the count features against the label column
lr = LogisticRegression(featuresCol="features", labelCol="label")

stages = [tokenizer, stopwords_remover, cv, lr]
pipeline = Pipeline(stages=stages)

# Single direct fit on the training split (no cross-validation)
print("Starting model training...")
model = pipeline.fit(train_df)
print("Model training complete.")

# Evaluate on Test Set
predictions = model.transform(test_df)
print("Predictions on Test Set complete.")

# `predictions` is a lazy plan: without caching, show() and each of the five
# evaluate() calls below would re-run the CSV read, tokenisation, vectorisation and
# scoring. Keep only the columns that are displayed or evaluated and cache them once.
eval_df = predictions.select(
    "polarity", "label", "prediction", "probability", "rawPrediction"
).cache()

# Show sample predictions
print("Sample Predictions:")
eval_df.select("polarity", "label", "prediction", "probability").show(5, truncate=False)

# One multiclass evaluator is reused; the metric is overridden per call via a param map.
multiclass_evaluator = MulticlassClassificationEvaluator(labelCol="label", predictionCol="prediction")


def _multiclass_metric(metric_name):
    """Evaluate the cached predictions with the given MulticlassClassificationEvaluator metric."""
    return multiclass_evaluator.evaluate(eval_df, {multiclass_evaluator.metricName: metric_name})


accuracy = _multiclass_metric("accuracy")
# Note: precision and recall are the label-weighted variants.
precision = _multiclass_metric("weightedPrecision")
recall = _multiclass_metric("weightedRecall")
f1_score = _multiclass_metric("f1")

binary_evaluator = BinaryClassificationEvaluator(labelCol="label", rawPredictionCol="rawPrediction", metricName="areaUnderROC")
auc = binary_evaluator.evaluate(eval_df)

print("=== Test Set Metrics ===")
print(f" - Accuracy:  {accuracy}")
print(f" - Precision: {precision}")
print(f" - Recall:    {recall}")
print(f" - F1-Score:  {f1_score}")
print(f" - ROC AUC:   {auc}")

# Release the cached predictions, then shut the session down
eval_df.unpersist()
spark.stop()


Mounted at /content/drive
=== Initial DataFrames Loaded ===
Train DataFrame Schema:
root
 |-- polarity: integer (nullable = true)
 |-- title: string (nullable = true)
 |-- text: string (nullable = true)

Test DataFrame Schema:
root
 |-- polarity: integer (nullable = true)
 |-- title: string (nullable = true)
 |-- text: string (nullable = true)

Train DataFrame Count: 3600000
Test DataFrame Count: 400000
Sample Rows from Train DataFrame:
+--------+------------------------------------------------------+--------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------