In [None]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import col

# Location of the reviews CSV (Colab upload path); change it here if the file moves.
DATA_PATH = "/content/Reviews.csv"

# Initialize (or reuse an already-running) Spark session
spark = (
    SparkSession.builder
    .appName("AmazonReviewsEDA")
    .getOrCreate()
)

# Load the dataset.
# With the default CSV options, numeric columns such as Score and Helpfulness* were
# inferred as strings despite inferSchema=True, i.e. some rows were mis-split.
# Presumably the review text contains commas, line breaks and doubled quotes ("");
# multiLine lets quoted fields span lines and escape='"' handles doubled quotes.
data = spark.read.csv(
    DATA_PATH,
    header=True,
    inferSchema=True,
    multiLine=True,
    escape='"',
)

# Filter data for reviews with scores >= 4. The explicit cast keeps the comparison
# numeric even if Score is still inferred as a string; values that do not cast
# become null and are excluded by the filter.
filtered_data = data.filter(col("Score").cast("int") >= 4)

# Aggregate reviews by ProductId and count
aggregated_data = filtered_data.groupBy("ProductId").count()

# Show results
aggregated_data.show()







+----------+-----+
| ProductId|count|
+----------+-----+
|B000FA398U|    1|
|B00523NRVO|    3|
|B002R7XYBQ|    1|
|B001RIXUS8|    3|
|B000JSQKNE|    2|
|B004N71J7O|    3|
|B000FKL0EU|   15|
|B0080HFENI|    2|
|B001EO62CG|    1|
|B005P0NI4K|    1|
|B001EQ4LAE|   11|
|B0018CJYPG|   33|
|B0052UOQY4|    1|
|B000HEA95K|    9|
|B00432EV3I|    6|
|B0042RNHVG|    1|
|B000YUOY30|   14|
|B004FWYAYG|    6|
|B001EQ5FQI|    6|
|B00142IAKU|    8|
+----------+-----+
only showing top 20 rows



In [None]:
# Print schema and sample data
data.printSchema()
data.show(5)

# Drop rows that lack the review text or the rating; both are required for training.
# (No na.fill on Text is needed afterwards: nulls in Text have just been dropped.)
data = data.na.drop(subset=["Text", "Score"])

# Score can come through as a string (see the printed schema), and a bad value led to
# label -1.0, which LogisticRegression rejects ("Labels MUST be in [0, 2147483647)").
# Cast explicitly and keep only valid 1-5 ratings so every label lands in [0, 4].
data = (
    data
    .withColumn("Score", col("Score").cast("int"))
    .filter(col("Score").between(1, 5))
)

# Create label column: 1-5 stars -> class indices 0-4, stored as double
data = data.withColumn("label", (col("Score") - 1).cast("double"))

# Check label distribution (ordered by class for readability)
data.groupBy("label").count().orderBy("label").show()

# Train-test split (seeded for reproducibility)
train_data, test_data = data.randomSplit([0.8, 0.2], seed=42)

# Check split counts
print(f"Training data count: {train_data.count()}")
print(f"Test data count: {test_data.count()}")

root
 |-- Id: integer (nullable = true)
 |-- ProductId: string (nullable = true)
 |-- UserId: string (nullable = true)
 |-- ProfileName: string (nullable = true)
 |-- HelpfulnessNumerator: string (nullable = true)
 |-- HelpfulnessDenominator: string (nullable = true)
 |-- Score: string (nullable = true)
 |-- Time: string (nullable = true)
 |-- Summary: string (nullable = true)
 |-- Text: string (nullable = false)
 |-- label: double (nullable = true)

+---+----------+--------------+--------------------+--------------------+----------------------+-----+----------+--------------------+--------------------+-----+
| Id| ProductId|        UserId|         ProfileName|HelpfulnessNumerator|HelpfulnessDenominator|Score|      Time|             Summary|                Text|label|
+---+----------+--------------+--------------------+--------------------+----------------------+-----+----------+--------------------+--------------------+-----+
|  1|B001E4KFG0|A3SGXH7AUHU8GW|          delmartian|       

In [None]:
# Text feature stages, previewed on the cleaned data
from pyspark.ml.feature import Tokenizer, StopWordsRemover, CountVectorizer, IDF
from pyspark.ml.classification import LogisticRegression
from pyspark.ml import Pipeline
from pyspark.ml.evaluation import MulticlassClassificationEvaluator

# Max characters shown per cell in the previews below; untruncated token arrays
# produce extremely wide tables.
PREVIEW_WIDTH = 120

# Tokenizer: splits the review Text into a "words" array column
tokenizer = Tokenizer(inputCol="Text", outputCol="words")
tokenized_data = tokenizer.transform(data)
tokenized_data.select("words").show(5, truncate=PREVIEW_WIDTH)

# StopWordsRemover. The result gets its own name so it does not overwrite
# `filtered_data` (the Score >= 4 reviews) from the first cell.
stopwords_remover = StopWordsRemover(inputCol="words", outputCol="filtered_words")
no_stopwords_data = stopwords_remover.transform(tokenized_data)
no_stopwords_data.select("filtered_words").show(5, truncate=PREVIEW_WIDTH)

# CountVectorizer: fitted on the whole dataset here only to inspect the output;
# the training Pipeline refits this stage on train_data.
# Errors propagate with their own traceback; no wrapper is needed.
vectorizer = CountVectorizer(inputCol="filtered_words", outputCol="raw_features")
vectorizer_model = vectorizer.fit(no_stopwords_data)
vectorized_data = vectorizer_model.transform(no_stopwords_data)
vectorized_data.select("raw_features").show(5, truncate=PREVIEW_WIDTH)

+-------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
|words                                                                                                                                                                                                                                                                                                                                                                                                                                                                          |
+-------------------------------------------------------------------

In [None]:
from pyspark.sql.functions import col, isnan

# IDF stage: rescales raw term counts into TF-IDF "features". Only the unfitted
# estimator is needed; the Pipeline below fits it on train_data alone, so the
# IDF weights never see test rows.
idf = IDF(inputCol="raw_features", outputCol="features")

# Logistic Regression on top of the text-feature stages
lr = LogisticRegression(featuresCol="features", labelCol="label")
pipeline = Pipeline(stages=[tokenizer, stopwords_remover, vectorizer, idf, lr])

# Train the model
if train_data.count() > 0:
    # LogisticRegression only reports invalid labels from deep inside a Spark task
    # (e.g. "Labels MUST be in [0, 2147483647), but got -1.0"). Check up front so the
    # failure points at the label-cleaning step instead.
    bad_label_count = train_data.filter(
        col("label").isNull() | isnan(col("label")) | (col("label") < 0)
    ).count()
    if bad_label_count > 0:
        raise ValueError(
            f"{bad_label_count} training rows have a null, NaN or negative label; "
            "fix the Score cleaning before training."
        )

    # No catch-all here: a failed fit must stop the notebook instead of printing
    # an error and leaving `model` undefined for later cells.
    model = pipeline.fit(train_data)
    predictions = model.transform(test_data)

    # Evaluate the model
    evaluator = MulticlassClassificationEvaluator(labelCol="label", metricName="accuracy")
    accuracy = evaluator.evaluate(predictions)
    print(f"Model Accuracy: {accuracy}")
else:
    print("Training data is empty. Please check your dataset or filtering logic.")

Training Error: An error occurred while calling o579.fit.
: org.apache.spark.SparkException: Job aborted due to stage failure: Task 1 in stage 69.0 failed 1 times, most recent failure: Lost task 1.0 in stage 69.0 (TID 85) (2384d3c32d3c executor driver): java.lang.RuntimeException: Labels MUST be in [0, 2147483647), but got -1.0
	at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage1.project_doConsume_0$(Unknown Source)
	at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage1.processNext(Unknown Source)
	at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
	at org.apache.spark.sql.execution.WholeStageCodegenEvaluatorFactory$WholeStageCodegenPartitionEvaluator$$anon$1.hasNext(WholeStageCodegenEvaluatorFactory.scala:43)
	at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:460)
	at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:460)
	at scala.collection.Ite

In [None]:
from pyspark.ml.feature import Tokenizer, StopWordsRemover, CountVectorizer, IDF
from pyspark.ml.classification import LogisticRegression
from pyspark.ml import Pipeline
from pyspark.ml.evaluation import MulticlassClassificationEvaluator
from pyspark.sql.functions import col

# Column names in this dataset (see the printed schema): the review text is in
# "Text" and the 1-5 star rating is in "Score". There are no "review_body" or
# "star_rating" columns here.
TEXT_COL = "Text"
RATING_COL = "Score"
SEED = 42

# Data preprocessing
tokenizer = Tokenizer(inputCol=TEXT_COL, outputCol="words")
stopwords_remover = StopWordsRemover(inputCol="words", outputCol="filtered_words")
vectorizer = CountVectorizer(inputCol="filtered_words", outputCol="raw_features")
idf = IDF(inputCol="raw_features", outputCol="features")

# Keep rows that have text and a valid 1-5 rating, then map ratings to class
# indices 0-4. Rows with null text are dropped so the Tokenizer always receives a
# string. The range filter keeps labels within what LogisticRegression accepts.
data = (
    data
    .na.drop(subset=[TEXT_COL, RATING_COL])
    .withColumn(RATING_COL, col(RATING_COL).cast("int"))
    .filter(col(RATING_COL).between(1, 5))
    .withColumn("label", (col(RATING_COL) - 1).cast("double"))
)

# Train-test split (seeded so the reported accuracy is reproducible)
train_data, test_data = data.randomSplit([0.8, 0.2], seed=SEED)

# Model pipeline
lr = LogisticRegression(featuresCol="features", labelCol="label")
pipeline = Pipeline(stages=[tokenizer, stopwords_remover, vectorizer, idf, lr])

# Train the model
model = pipeline.fit(train_data)

# Test the model
predictions = model.transform(test_data)

# Evaluate the model
evaluator = MulticlassClassificationEvaluator(labelCol="label", metricName="accuracy")
accuracy = evaluator.evaluate(predictions)

print(f"Model Accuracy: {accuracy}")
