# Explore data

### Step 1: Load the JSON file

In [140]:
import json

# Step 1: Load the JSON file
with open("Data.json", "r", encoding="utf-8") as file:
    data = [json.loads(line) for line in file]

### Step 2: Display the type and keys (if it's a dict)

In [141]:
print("Type of data:", type(data))
if isinstance(data, dict):
    print("Top-level keys:", data.keys())

Type of data: <class 'list'>


### Step 3: Display first 3 records (if it's a list or inside a key)

In [142]:
if isinstance(data, list):
    for i, item in enumerate(data[:3]):
        print(f"Record {i+1}:")
        print(item)
        print()
elif isinstance(data, dict):
    for key in data:
        print(f"\nFirst 3 items under '{key}':")
        try:
            for i, item in enumerate(data[key][:3]):
                print(f"Record {i+1}:")
                print(item)
                print()
        except Exception as e:
            print(f"Couldn't display records for key '{key}':", e)


Record 1:
{'reviewerID': 'A2IBPI20UZIR0U', 'asin': '1384719342', 'reviewerName': 'cassandra tu "Yeah, well, that\'s just like, u...', 'helpful': [0, 0], 'reviewText': "Not much to write about here, but it does exactly what it's supposed to. filters out the pop sounds. now my recordings are much more crisp. it is one of the lowest prices pop filters on amazon so might as well buy it, they honestly work the same despite their pricing,", 'overall': 5.0, 'summary': 'good', 'unixReviewTime': 1393545600, 'reviewTime': '02 28, 2014'}

Record 2:
{'reviewerID': 'A14VAT5EAX3D9S', 'asin': '1384719342', 'reviewerName': 'Jake', 'helpful': [13, 14], 'reviewText': "The product does exactly as it should and is quite affordable.I did not realized it was double screened until it arrived, so it was even better than I had expected.As an added bonus, one of the screens carries a small hint of the smell of an old grape candy I used to buy, so for reminiscent's sake, I cannot stop putting the pop filter next

# Prepare data

In [143]:
import pandas as pd
# Convert to DataFrame
df = pd.DataFrame(data)

In [144]:
import re
import html
from nltk.corpus import stopwords
import nltk

nltk.download('stopwords')
stop_words = set(stopwords.words('english'))

def clean_review(text):
    if not isinstance(text, str):
        return ""
    text = html.unescape(text)
    text = text.lower()
    text = re.sub(r"[^a-z0-9\s]", " ", text)
    text = re.sub(r"\s+", " ", text).strip()
    tokens = [word for word in text.split() if word not in stop_words]
    return " ".join(tokens)

[nltk_data] Downloading package stopwords to
[nltk_data]     C:\Users\HP\AppData\Roaming\nltk_data...
[nltk_data]   Package stopwords is already up-to-date!


In [145]:
df = df[["reviewText", "overall"]].dropna()

# Apply text cleaning
df["reviewText"] = df["reviewText"].apply(clean_review)

In [146]:
# Keep only reviewText and overall
df = df[["reviewText", "overall"]].dropna()

# Create label column based on overall rating
def label_sentiment(rating):
    if rating < 3:
        return 0  # negative
    elif rating == 3:
        return 1  # neutral
    else:
        return 2  # positive
# Create label column based on overall rating
def sentiment(rating):
    if rating < 3:
        return "negative"  # negative
    elif rating == 3:
        return "neutral"  # neutral
    elif rating >3:
        return "positive"  # positive

df["sentiment"] = df["overall"].apply(sentiment)

df["label"] = df["overall"].apply(label_sentiment)

In [147]:
# Show label distribution
print("Label counts:")
print(df["label"].value_counts())

# Show sample data
print("\nSample rows:")
print(df.head(5))

# Save the cleaned dataset to CSV
df.to_csv("cleaned_amazon_reviews.csv", index=False, encoding="utf-8")
print("✅ Cleaned data saved to 'cleaned_amazon_reviews.csv'")

Label counts:
label
2    9022
1     772
0     467
Name: count, dtype: int64

Sample rows:
                                          reviewText  overall sentiment  label
0  much write exactly supposed filters pop sounds...      5.0  positive      2
1  product exactly quite affordable realized doub...      5.0  positive      2
2  primary job device block breath would otherwis...      5.0  positive      2
3  nice windscreen protects mxl mic prevents pops...      5.0  positive      2
4  pop filter great looks performs like studio fi...      5.0  positive      2
✅ Cleaned data saved to 'cleaned_amazon_reviews.csv'


# Training amazon reviews

In [148]:
from pyspark.sql import SparkSession
from pyspark.ml.feature import Tokenizer, StopWordsRemover, HashingTF, IDF, StringIndexer
from pyspark.ml import Pipeline
from pyspark.ml.classification import LogisticRegression, RandomForestClassifier, NaiveBayes, DecisionTreeClassifier
from pyspark.ml.evaluation import MulticlassClassificationEvaluator
from pyspark.sql import DataFrame


In [149]:
import findspark
findspark.init()

from pyspark.sql import SparkSession
import os

# Configuration Hadoop pour Windows
os.environ["HADOOP_HOME"] = "C:/hadoop"  # Remplace si ton chemin est différent
os.environ["JAVA_HOME"] = "C:\Program Files\jdk1.8.0_202"
os.environ["PATH"] += os.pathsep + os.path.join(os.environ["HADOOP_HOME"], "bin")

# Configuration pour éviter les problèmes de connexion
os.environ['PYSPARK_PYTHON'] = 'python'
os.environ['PYSPARK_DRIVER_PYTHON'] = 'python'

spark = SparkSession.builder \
    .appName("SimpleTest") \
    .master("local[*]") \
    .config("spark.driver.bindAddress", "127.0.0.1") \
    .config("spark.driver.host", "127.0.0.1") \
    .config("spark.executor.memory", "8g") \
    .config("spark.driver.memory", "8g") \
    .config("spark.sql.shuffle.partitions", "8") \
    .config("spark.rpc.message.maxSize", "512") \
    .config("spark.python.worker.timeout", "120") \
    .getOrCreate()



In [150]:
df = spark.read.csv("cleaned_amazon_reviews.csv", header=True, inferSchema=True)

In [151]:
from pyspark.sql.functions import length
from pyspark.ml.feature import Tokenizer, HashingTF, IDF, StringIndexer,NGram,CountVectorizer
from pyspark.ml import Pipeline


# Read CSV
df = spark.read.csv("cleaned_amazon_reviews.csv", header=True, inferSchema=True)

# ✅ Clean out bad or short entries
df = df.filter(df["reviewText"].isNotNull())
df = df.filter(df["reviewText"] != "")


# Pipeline steps
tokenizer = Tokenizer(inputCol="reviewText", outputCol="words")
remover = StopWordsRemover(inputCol="words", outputCol="filtered")
ngram = NGram(n=2, inputCol="filtered", outputCol="bigrams")
vectorizer = CountVectorizer(inputCol="bigrams", outputCol="rawFeatures", vocabSize=3000)
idf = IDF(inputCol="rawFeatures", outputCol="features")
labelIndexer = StringIndexer(inputCol="sentiment", outputCol="indexedLabel")


In [152]:
# Étape 1 : split 80% train + 20% temporaire (val + test)
train_data, temp_data = df.randomSplit([0.8, 0.2], seed=42)

# Étape 2 : split 10% val + 10% test
validation_data, test_data = temp_data.randomSplit([0.5, 0.5], seed=42)

# Vérification
print(f"Train count     : {train_data.count()}")
print(f"Validation count: {validation_data.count()}")
print(f"Test count      : {test_data.count()}")


Train count     : 8283
Validation count: 1012
Test count      : 959


In [153]:
from pyspark.sql.functions import rand

# Number of times to duplicate minority classes
oversample_factor = 5.0

# Duplicate neutral and negative classes
neutral_oversampled = neutral.sample(withReplacement=True, fraction=oversample_factor, seed=42)
negative_oversampled = negative.sample(withReplacement=True, fraction=oversample_factor, seed=42)

# Combine with positive class (keep all positives)
balanced_train = positive.union(neutral_oversampled).union(negative_oversampled)

# Check new distribution
balanced_train.groupBy("label").count().show()


+-----+-----+
|label|count|
+-----+-----+
|    2| 7270|
|    1| 3233|
|    0| 1948|
+-----+-----+



In [154]:
from pyspark.ml.classification import LogisticRegression, RandomForestClassifier, NaiveBayes

# Liste des modèles à tester
models = {
    "Logistic Regression": LogisticRegression(featuresCol="features", labelCol="label", maxIter=10),
    "Random Forest": RandomForestClassifier(featuresCol="features", labelCol="label", numTrees=20),
    "Naive Bayes": NaiveBayes(featuresCol="features", labelCol="label", smoothing=1.0)
}


In [155]:
from pyspark.ml.tuning import TrainValidationSplit, ParamGridBuilder
from pyspark.ml.evaluation import MulticlassClassificationEvaluator
from pyspark.ml import Pipeline

def evaluate_model_numeric_label(model_name, model_algo, train_data, test_data, validation_data):
    # Assuming tokenizer, remover, ngram, vectorizer, idf are defined and produce features

    # Build pipeline without StringIndexer since labels are numeric already
    pipeline = Pipeline(stages=[tokenizer, labelIndexer, remover, ngram, vectorizer, idf,model_algo])

    # Parameter grid setup
    paramGrid = ParamGridBuilder()

    if model_name.lower() == "logistic regression":
        paramGrid = paramGrid.addGrid(model_algo.regParam, [0.01, 0.1, 0.5])
    elif model_name.lower() == "random forest":
        paramGrid = paramGrid.addGrid(model_algo.numTrees, [50, 100])
        paramGrid = paramGrid.addGrid(model_algo.maxDepth, [5, 10])

    paramGrid = paramGrid.build()

    evaluator = MulticlassClassificationEvaluator(labelCol="label", predictionCol="prediction", metricName="f1")

    tvs = TrainValidationSplit(estimator=pipeline,
                               estimatorParamMaps=paramGrid,
                               evaluator=evaluator,
                               trainRatio=0.8)

    tvs_model = tvs.fit(train_data)

    best_model = tvs_model.bestModel

    # Predict on test and validation datasets
    test_predictions = best_model.transform(test_data)
    val_predictions = best_model.transform(validation_data)

    evaluator_f1 = MulticlassClassificationEvaluator(labelCol="label", predictionCol="prediction", metricName="f1")
    evaluator_acc = MulticlassClassificationEvaluator(labelCol="label", predictionCol="prediction", metricName="accuracy")

    f1_test = evaluator_f1.evaluate(test_predictions)
    acc_test = evaluator_acc.evaluate(test_predictions)
    f1_val = evaluator_f1.evaluate(val_predictions)
    acc_val = evaluator_acc.evaluate(val_predictions)

    print(f"🔎 {model_name} — Test F1: {f1_test:.4f} | Test Acc: {acc_test:.4f} | Val F1: {f1_val:.4f} | Val Acc: {acc_val:.4f}")

    return model_name, f1_test, acc_test, f1_val, acc_val, best_model


In [156]:
results = []
for name, algo in models.items():
    result = evaluate_model_numeric_label(name, algo, train_data, test_data, validation_data)
    results.append(result)


🔎 Logistic Regression — Test F1: 0.8396 | Test Acc: 0.8801 | Val F1: 0.8453 | Val Acc: 0.8854
🔎 Random Forest — Test F1: 0.8299 | Test Acc: 0.8843 | Val F1: 0.8330 | Val Acc: 0.8864
🔎 Naive Bayes — Test F1: 0.7635 | Test Acc: 0.7226 | Val F1: 0.7834 | Val Acc: 0.7490


In [157]:
# Trouver le modèle avec le meilleur F1 Score
best_model_info = max(results, key=lambda x: x[1])  # x[1] = F1 Score
best_model_name, best_f1_test, best_acc_test, best_f1_val,best_acc_val,best_pipeline_model = best_model_info

print(f"\n💾 Meilleur modèle : {best_model_name} — Test F1 Score : {best_f1_test:.4f} | Test Accuracy: {best_acc_test:.4f}")



💾 Meilleur modèle : Logistic Regression — Test F1 Score : 0.8396 | Test Accuracy: 0.8801


In [158]:
import os
from pyspark.ml import PipelineModel

save_path = "file:///" + os.path.abspath("Random Fores")
best_pipeline_model.write().overwrite().save(save_path)


# consumer

In [159]:
from kafka import KafkaConsumer
from json import loads
from pymongo import MongoClient
from pyspark.ml import PipelineModel
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, udf
from pyspark.sql.types import StringType
import re
import datetime
import logging
import traceback

# Setup logging
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger("AmazonReviewConsumer")

# Initialize Spark session
spark = SparkSession.builder \
    .appName("AmazonReviewStreaming") \
    .master("local[*]") \
    .config("spark.driver.memory", "2g") \
    .getOrCreate()

# Load trained Spark model
model = best_pipeline_model  

# MongoDB connection
mongo_client = MongoClient('mongodb://localhost:27017/')
db = mongo_client['amazon']
collection = db['sentiments']

# Kafka Consumer config
consumer = KafkaConsumer(
    'amazon_reviews',
    group_id='amazon_consumer_group',
    bootstrap_servers=['localhost:9092'],
    auto_offset_reset='earliest',
    enable_auto_commit=True,
    value_deserializer=lambda x: loads(x.decode('utf-8'))
)

# Text cleaner
def clean_text(text):
    if not text:
        return ""
    text = text.lower()
    text = re.sub(r'<.*?>', '', text)
    text = re.sub(r'http\S+|www\S+', '', text)
    text = re.sub(r'\d+', '', text)
    text = re.sub(r'[^\w\s]', '', text)
    emoji_pattern = re.compile("[" u"\U0001F600-\U0001F64F"
                               u"\U0001F300-\U0001F5FF"
                               u"\U0001F680-\U0001F6FF"
                               u"\U0001F1E0-\U0001F1FF" "]+", flags=re.UNICODE)
    text = emoji_pattern.sub(r'', text)
    return text.strip()

# Label to sentiment string
def pred_to_sentiment(prediction):
    mapping = {0: "Negative", 1: "Neutral", 2: "Positive"}
    return mapping.get(int(prediction), "Unknown")

pred_to_sentiment_udf = udf(pred_to_sentiment, StringType())

# Main logic
def consume_and_predict():
    for message in consumer:
        try:
            data = message.value
            if not isinstance(data, dict) or 'reviewText' not in data:
                logger.warning("❗ Invalid message skipped.")
                continue

            review_text = data.get('reviewText', '')
            if not review_text.strip():
                logger.warning("⚠️ Empty review skipped.")
                continue

            cleaned_text = clean_text(review_text)
            df = spark.createDataFrame([(cleaned_text,)], ['reviewText'])

            prediction_df = model.transform(df)
            prediction_df = prediction_df.withColumn("predicted_sentiment", pred_to_sentiment_udf(col("prediction")))
            row = prediction_df.select("prediction", "predicted_sentiment").first()

            document = {
                'reviewText': review_text,
                'cleanedReview': cleaned_text,
                'predicted_label': int(row['prediction']),
                'predicted_sentiment': row['predicted_sentiment'],
                'summary': data.get('summary'),
                'overall': data.get('overall'),
                'label': data.get('label'),
                'created_at': datetime.datetime.utcnow()
            }

            document = {k: v for k, v in document.items() if v is not None}  # Clean nulls

            collection.insert_one(document)
            logger.info(f"✅ Inserted: {document['predicted_sentiment']} - {review_text[:60]}...")

        except Exception as e:
            logger.error("❌ Error occurred:")
            traceback.print_exc()

if __name__ == "__main__":
    consume_and_predict()

INFO:kafka.conn:<BrokerConnection client_id=kafka-python-2.2.7, node_id=bootstrap-0 host=localhost:9092 <connecting> [IPv6 ('::1', 9092, 0, 0)]>: connecting to localhost:9092 [('::1', 9092, 0, 0) IPv6]
INFO:kafka.conn:<BrokerConnection client_id=kafka-python-2.2.7, node_id=bootstrap-0 host=localhost:9092 <checking_api_versions_recv> [IPv6 ('::1', 9092, 0, 0)]>: Broker version identified as 2.6
INFO:kafka.conn:<BrokerConnection client_id=kafka-python-2.2.7, node_id=bootstrap-0 host=localhost:9092 <connected> [IPv6 ('::1', 9092, 0, 0)]>: Connection complete.
INFO:kafka.consumer.subscription_state:Updating subscribed topics to: ('amazon_reviews',)
INFO:kafka.conn:<BrokerConnection client_id=kafka-python-2.2.7, node_id=1 host=localhost:9092 <connecting> [IPv6 ('::1', 9092, 0, 0)]>: connecting to localhost:9092 [('::1', 9092, 0, 0) IPv6]
INFO:kafka.conn:<BrokerConnection client_id=kafka-python-2.2.7, node_id=1 host=localhost:9092 <connected> [IPv6 ('::1', 9092, 0, 0)]>: Connection complete.

KeyboardInterrupt: 

In [139]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import udf, col
from pyspark.sql.types import StringType

# Assuming Spark session is already created and model is loaded as 'trained_model'

def pred_to_sentiment(prediction):
    mapping = {0: "Negative", 1: "Neutral", 2: "Positive"}
    return mapping.get(int(prediction), "Unknown")

pred_to_sentiment_udf = udf(pred_to_sentiment, StringType())

def predict_single_review(trained_model, review_text):
    # Create Spark DataFrame with one row
    df = spark.createDataFrame([(review_text,)], ["reviewText"])
    
    # Predict
    prediction_df = trained_model.transform(df)
    
    # Map numeric prediction to label
    prediction_df = prediction_df.withColumn("predicted_sentiment", pred_to_sentiment_udf(col("prediction")))
    
    # Get the prediction result
    row = prediction_df.select("prediction", "predicted_sentiment").first()
    
    print(f"Review: {review_text}")
    print(f"Predicted label: {int(row['prediction'])}")
    print(f"Predicted sentiment: {row['predicted_sentiment']}")
    
    return row['predicted_sentiment']

# Example usage:
sentiment = predict_single_review(best_pipeline_model, "pro cheapo hated thing noisy cables feel really cheap gummy like drop bucks get something else")


Review: pro cheapo hated thing noisy cables feel really cheap gummy like drop bucks get something else
Predicted label: 0
Predicted sentiment: Negative
