In [1]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, length, when

# Create Spark session
spark = SparkSession.builder \
    .appName("Amazon Review Big Data ML") \
    .getOrCreate()

Using Spark's default log4j profile: org/apache/spark/log4j2-defaults.properties
25/06/03 00:20:55 WARN Utils: Your hostname, umair, resolves to a loopback address: 127.0.1.1; using 192.168.100.25 instead (on interface wlp2s0)
25/06/03 00:20:55 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to another address
Using Spark's default log4j profile: org/apache/spark/log4j2-defaults.properties
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
25/06/03 00:20:59 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable


In [None]:
!cd parquet_chunks && pwd && ls

In [2]:
# Load all chunks
df = spark.read.parquet("parquet_chunks/*.parquet")

25/06/03 00:21:12 WARN FileStreamSink: Assume no metadata directory. Error while looking for metadata directory in the path: parquet_chunks/*.parquet.
java.io.FileNotFoundException: File parquet_chunks/*.parquet does not exist
	at org.apache.hadoop.fs.RawLocalFileSystem.deprecatedGetFileStatus(RawLocalFileSystem.java:917)
	at org.apache.hadoop.fs.RawLocalFileSystem.getFileLinkStatusInternal(RawLocalFileSystem.java:1238)
	at org.apache.hadoop.fs.RawLocalFileSystem.getFileStatus(RawLocalFileSystem.java:907)
	at org.apache.hadoop.fs.FilterFileSystem.getFileStatus(FilterFileSystem.java:462)
	at org.apache.spark.sql.execution.streaming.FileStreamSink$.hasMetadata(FileStreamSink.scala:56)
	at org.apache.spark.sql.execution.datasources.DataSource.resolveRelation(DataSource.scala:381)
	at org.apache.spark.sql.catalyst.analysis.ResolveDataSource.org$apache$spark$sql$catalyst$analysis$ResolveDataSource$$loadV1BatchSource(ResolveDataSource.scala:143)
	at org.apache.spark.sql.catalyst.analysis.Res

In [3]:
print("✅ Schema:")
df.printSchema()

✅ Schema:
root
 |-- rating: double (nullable = true)
 |-- title: string (nullable = true)
 |-- text: string (nullable = true)
 |-- images: array (nullable = true)
 |    |-- element: struct (containsNull = true)
 |    |    |-- attachment_type: string (nullable = true)
 |    |    |-- large_image_url: string (nullable = true)
 |    |    |-- medium_image_url: string (nullable = true)
 |    |    |-- small_image_url: string (nullable = true)
 |-- asin: string (nullable = true)
 |-- parent_asin: string (nullable = true)
 |-- user_id: string (nullable = true)
 |-- timestamp: long (nullable = true)
 |-- helpful_vote: long (nullable = true)
 |-- verified_purchase: boolean (nullable = true)



In [4]:
print("🔢 Total records:", df.count())

[Stage 1:>                                                          (0 + 4) / 5]

🔢 Total records: 1500000


                                                                                

In [5]:
print("📊 Sample:")
df.show(5)

📊 Sample:


[Stage 4:>                                                          (0 + 1) / 1]

+------+--------------------+--------------------+--------------------+----------+-----------+--------------------+-------------+------------+-----------------+
|rating|               title|                text|              images|      asin|parent_asin|             user_id|    timestamp|helpful_vote|verified_purchase|
+------+--------------------+--------------------+--------------------+----------+-----------+--------------------+-------------+------------+-----------------+
|   1.0|Not a watercolor ...|It is definitely ...|[{IMAGE, https://...|B09BGPFTDB| B09BGPFTDB|AFKZENTNBQ7A7V7UX...|1642399598485|           0|             true|
|   5.0|Updated: after 1s...|Updated: after fi...|                  []|0593235657| 0593235657|AFKZENTNBQ7A7V7UX...|1640629604904|           1|             true|
|   5.0|Excellent! I love...|I bought it for t...|                  []|1782490671| 1782490671|AFKZENTNBQ7A7V7UX...|1640383495102|           0|             true|
|   5.0|Updated after 1st...|Updat

                                                                                

In [6]:
# Clean and transform the data
df_clean = (
    df.filter(col("text").isNotNull() & col("rating").isNotNull())
      .withColumn("textLength", length(col("text")))
      .withColumn("label", when(col("rating") >= 4, 1).otherwise(0))  # Binary sentiment label
)

# Optional: check sample rows
df_clean.select("rating", "text", "textLength", "label").show(5, truncate=False)

# Save cleaned data for ML
df_clean.write.mode("overwrite").parquet("cleaned_reviews.parquet")

+------+--------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------

                                                                                

In [7]:
from pyspark.ml.feature import Tokenizer, StopWordsRemover, HashingTF, IDF, StringIndexer
from pyspark.ml.classification import LogisticRegression
from pyspark.ml import Pipeline
from pyspark.ml.evaluation import BinaryClassificationEvaluator

In [8]:
# Reload cleaned data
df = spark.read.parquet("cleaned_reviews.parquet")

# Sample to avoid OOM (adjust as needed)
df = df.limit(500000)

In [9]:
# ✅ Correct column names used here
tokenizer = Tokenizer(inputCol="text", outputCol="words")
remover = StopWordsRemover(inputCol="words", outputCol="filtered")
tf = HashingTF(inputCol="filtered", outputCol="rawFeatures", numFeatures=5000)
idf = IDF(inputCol="rawFeatures", outputCol="features")
lr = LogisticRegression(featuresCol="features", labelCol="label")

# Build ML pipeline
pipeline = Pipeline(stages=[tokenizer, remover, tf, idf, lr])

In [10]:
# Train/Test Split
train, test = df.randomSplit([0.8, 0.2], seed=42)

In [11]:
# Train model
model = pipeline.fit(train)

25/06/03 00:22:49 WARN SparkStringUtils: Truncated the string representation of a plan since it was too large. This behavior can be adjusted by setting 'spark.sql.debug.maxToStringFields'.
25/06/03 00:24:21 WARN MemoryStore: Not enough space to cache rdd_56_0 in memory! (computed 113.1 MiB so far)
25/06/03 00:24:21 WARN BlockManager: Persisting block rdd_56_0 to disk instead.

In [12]:
# Predict
predictions = model.transform(test)

In [13]:
# Evaluate
evaluator = BinaryClassificationEvaluator(labelCol="label")
auc = evaluator.evaluate(predictions)

print(f"🎯 AUC: {auc:.4f}")

                                                                                

🎯 AUC: 0.8708
