In [1]:
from pyspark.sql import SparkSession

# Obtain the notebook's Spark session (an existing one is reused if present),
# requesting port 4040 for the Spark web UI.
spark = (
    SparkSession.builder
    .appName("HDFS_Test")
    .config("spark.ui.port", "4040")
    .getOrCreate()
)

print("Spark version:", spark.version)

# HDFS location of the input Parquet dataset -- adjust for your cluster.
hdfs_path = "hdfs://namenode:9000/output/cleaned_reviews"

# Read the Parquet data into the DataFrame that the following cell works on.
df = spark.read.parquet(hdfs_path)

# Preview five rows without truncating column values, then print the
# address of the Spark web UI.
df.show(n=5, truncate=False)
print(spark.sparkContext.uiWebUrl)

Spark version: 3.5.0
+------+-----------------------------------------+------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+----------+----------------------------+------------+-----------------+-------------+-------------------+---------+--------------------+
|rating|title                                    |text                                                                                                                                                                                                                                                                                                        |asin      |user_id                     |helpful_vote|verified_purchase|timestamp    |date               |has_image|final_best_image_url|
+--

In [2]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, when, concat_ws
from pyspark.ml.feature import Tokenizer, HashingTF, IDF
from pyspark.ml.feature import VectorAssembler
from pyspark.ml.classification import LogisticRegression
from pyspark.ml.evaluation import BinaryClassificationEvaluator
from pyspark.ml import Pipeline

from pyspark.sql import Row

# 1. Load preprocessed data
# NOTE: nothing is read in this cell -- `df` and `spark` come from the previous
# cell, which must have been executed first.
print("Loading data...")
print("Data loaded from HDFS successfully.")
df.show(5, truncate=False)
df.printSchema()

# 2. Create label (1 for potential fake reviews)
# Heuristic proxy label, not ground truth: a 5-star review with zero helpful
# votes is marked 1.0, everything else 0.0.
print("Creating labels...")
df = df.withColumn(
    "label",
    when((col("rating") == 5.0) & (col("helpful_vote") == 0), 1.0).otherwise(0.0)
)

# 3. Combine title and text for better feature extraction
# concat_ws skips null inputs, so combined_text is never null for the tokenizer.
df = df.withColumn(
    "combined_text",
    concat_ws(" ", col("title"), col("text"))
)

# 4. Text processing pipeline: tokenize -> hashed term frequencies -> TF-IDF
tokenizer = Tokenizer(inputCol="combined_text", outputCol="words")
hashingTF = HashingTF(inputCol="words", outputCol="raw_features", numFeatures=10000)
idf = IDF(inputCol="raw_features", outputCol="text_features")

# 5. Create feature vector
# `helpful_vote` is deliberately NOT a feature: the label above is computed
# from it, so feeding it to the model leaks the target and inflates every
# metric reported below.
# handleInvalid="skip" drops rows that have invalid (e.g. null) values in the
# assembled columns.
assembler = VectorAssembler(
    inputCols=["text_features", "verified_purchase"],
    outputCol="features",
    handleInvalid="skip"
)

# 6. Define logistic regression
lr = LogisticRegression(
    featuresCol="features",
    labelCol="label",
    maxIter=10,
    regParam=0.01
)

# 7. Create pipeline
pipeline = Pipeline(stages=[
    tokenizer,
    hashingTF,
    idf,
    assembler,
    lr
])

# 8. Split data (fixed seed for a reproducible 80/20 split)
print("Splitting data...")
train_df, test_df = df.randomSplit([0.8, 0.2], seed=42)

# 9. Train model
print("Training model...")
model = pipeline.fit(train_df)

# 10. Make predictions
print("Making predictions...")
predictions = model.transform(test_df)

# Every metric below is a separate Spark action. Cache a slim projection of the
# scored rows so the whole read -> featurize -> score lineage runs once instead
# of once per action. `predictions` itself is left untouched for later use.
scored_df = predictions.select(
    "user_id", "asin", "label", "rawPrediction", "prediction"
).cache()

# 11. Evaluate with multiple metrics
print("Evaluating model...")

evaluator_roc = BinaryClassificationEvaluator(
    labelCol="label",
    rawPredictionCol="rawPrediction",
    metricName="areaUnderROC"
)
auc = evaluator_roc.evaluate(scored_df)

evaluator_pr = BinaryClassificationEvaluator(
    labelCol="label",
    rawPredictionCol="rawPrediction",
    metricName="areaUnderPR"
)
auc_pr = evaluator_pr.evaluate(scored_df)

# Confusion Matrix components, gathered in a single aggregation rather than
# four filter+count jobs. Keys are (label, prediction) as ints.
confusion = {
    (int(row["label"]), int(row["prediction"])): row["count"]
    for row in scored_df.groupBy("label", "prediction").count().collect()
}
tp = confusion.get((1, 1), 0)
tn = confusion.get((0, 0), 0)
fp = confusion.get((0, 1), 0)
fn = confusion.get((1, 0), 0)

# Label and prediction are always 0 or 1, so the four cells cover every row.
total = tp + tn + fp + fn
fake = tp + fp

# Derived metrics. Fallbacks are float 0.0 (not int 0) so the metrics row below
# always infers double columns, and an empty test split cannot divide by zero.
accuracy = (tp + tn) / total if total > 0 else 0.0
precision = tp / (tp + fp) if (tp + fp) > 0 else 0.0
recall = tp / (tp + fn) if (tp + fn) > 0 else 0.0
f1 = (2 * precision * recall) / (precision + recall) if (precision + recall) > 0 else 0.0

# Print all metrics neatly
print("\n📊 Evaluation Metrics:")
print(f"AUC-ROC:           {auc:.4f}")
print(f"AUC-PR:            {auc_pr:.4f}")
print(f"Accuracy:          {accuracy:.4f}")
print(f"Precision:         {precision:.4f}")
print(f"Recall:            {recall:.4f}")
print(f"F1-Score:          {f1:.4f}")
print("\nConfusion Matrix:")
print(f"TP: {tp}, FP: {fp}, TN: {tn}, FN: {fn}")

# Save evaluation metrics in HDFS (single-row DataFrame, overwritten each run)
metrics_df = spark.createDataFrame([
    Row(AUC_ROC=auc, AUC_PR=auc_pr, Accuracy=accuracy, Precision=precision, Recall=recall, F1=f1)
])
metrics_df.write.mode("overwrite").parquet("hdfs://namenode:9000/output/metrics")

# 12. Save predictions (identifiers, label and predicted class only)
print("Saving predictions...")
scored_df.select("user_id", "asin", "label", "prediction") \
    .write.mode("overwrite") \
    .parquet("hdfs://namenode:9000/output/predictions")

print("Pipeline completed successfully!")

# Print some statistics
fake_pct = (fake / total) * 100 if total > 0 else 0.0
print(f"\nStats:")
print(f"Total reviews processed: {total}")
print(f"Predicted fake reviews: {fake} ({fake_pct:.2f}%)")

# Release the cached projection now that all actions have run.
scored_df.unpersist()


Loading data...
Data loaded from HDFS successfully.
+------+-----------------------------------------+------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+----------+----------------------------+------------+-----------------+-------------+-------------------+---------+--------------------+
|rating|title                                    |text                                                                                                                                                                                                                                                                                                        |asin      |user_id                     |helpful_vote|verified_purchase|timestamp    |date               |has_