In [1]:
from pyspark.sql import SparkSession

spark = SparkSession.builder.appName("JSON Reader").getOrCreate()
df = spark.read.json("Data.json")

Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).


25/05/24 09:20:09 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable


                                                                                

In [2]:
df.printSchema() 

root
 |-- asin: string (nullable = true)
 |-- helpful: array (nullable = true)
 |    |-- element: long (containsNull = true)
 |-- overall: double (nullable = true)
 |-- reviewText: string (nullable = true)
 |-- reviewTime: string (nullable = true)
 |-- reviewerID: string (nullable = true)
 |-- reviewerName: string (nullable = true)
 |-- summary: string (nullable = true)
 |-- unixReviewTime: long (nullable = true)



In [3]:
df.show(5)  

+----------+--------+-------+--------------------+-----------+--------------+--------------------+--------------------+--------------+
|      asin| helpful|overall|          reviewText| reviewTime|    reviewerID|        reviewerName|             summary|unixReviewTime|
+----------+--------+-------+--------------------+-----------+--------------+--------------------+--------------------+--------------+
|1384719342|  [0, 0]|    5.0|Not much to write...|02 28, 2014|A2IBPI20UZIR0U|cassandra tu "Yea...|                good|    1393545600|
|1384719342|[13, 14]|    5.0|The product does ...|03 16, 2013|A14VAT5EAX3D9S|                Jake|                Jake|    1363392000|
|1384719342|  [1, 1]|    5.0|The primary job o...|08 28, 2013|A195EZSQDW3E21|Rick Bennette "Ri...|It Does The Job Well|    1377648000|
|1384719342|  [0, 0]|    5.0|Nice windscreen p...|02 14, 2014|A2C00NNG1ZQQG2|RustyBill "Sunday...|GOOD WINDSCREEN F...|    1392336000|
|1384719342|  [0, 0]|    5.0|This pop filter i...|02 21

In [4]:
train, rest = df.randomSplit([0.8, 0.2], seed=42)
val, test = rest.randomSplit([0.5, 0.5], seed=42)
val =val.select("asin", "reviewText","reviewTime","reviewerName")

In [5]:
val.show(5)

+----------+--------------------+-----------+--------------------+
|      asin|          reviewText| reviewTime|        reviewerName|
+----------+--------------------+-----------+--------------------+
|B00005ML71|I got it to have ...|04 22, 2014|       Christopher C|
|B000068NVI|I've used a lot o...|09 17, 2013|R. Wristen "The P...|
|B000068NW5|I am not hard on ...| 06 8, 2013|           Dr. Freud|
|B000068NW5|Bought this for m...| 03 5, 2014|            C. Zemer|
|B000068NW5|This is good cabl...| 10 8, 2013|     grandpa "Randy"|
+----------+--------------------+-----------+--------------------+
only showing top 5 rows



In [6]:
import shutil
import os

val_single = val.coalesce(1)

temp_path = "temp_validation_output"
val_single.write.mode("overwrite").json(temp_path)

for file_name in os.listdir(temp_path):
    if file_name.endswith(".json"):
        source_file = os.path.join(temp_path, file_name)
        break

shutil.move(source_file, "data_validation.json")

shutil.rmtree(temp_path)

print("Fichier sauvegardé : data_validation.json")

Fichier sauvegardé : data_validation.json


In [7]:
from pyspark.sql.functions import col

train = train.select("reviewText", "overall")
train = train.na.drop(subset=["reviewText", "overall"])

In [8]:
train.show(5)  

+--------------------+-------+
|          reviewText|overall|
+--------------------+-------+
|Nice windscreen p...|    5.0|
|Not much to write...|    5.0|
|The primary job o...|    5.0|
|The product does ...|    5.0|
|I now use this ca...|    3.0|
+--------------------+-------+
only showing top 5 rows



In [9]:
test = test.select("reviewText", "overall")
test = test.na.drop(subset=["reviewText", "overall"])

In [10]:
test.show(5)

+--------------------+-------+
|          reviewText|overall|
+--------------------+-------+
|This pop filter i...|    5.0|
|I have used monst...|    5.0|
|Perfect for my Ep...|    5.0|
|Fender cords look...|    5.0|
|Cant go wrong. Gr...|    4.0|
+--------------------+-------+
only showing top 5 rows



In [11]:
from pyspark.sql.functions import when, col

train = train.withColumn(
    "label",
    when(col("overall") < 3, 0)
    .when(col("overall") == 3, 1)
    .otherwise(2)
)

In [12]:
train.show(5)

+--------------------+-------+-----+
|          reviewText|overall|label|
+--------------------+-------+-----+
|Nice windscreen p...|    5.0|    2|
|Not much to write...|    5.0|    2|
|The primary job o...|    5.0|    2|
|The product does ...|    5.0|    2|
|I now use this ca...|    3.0|    1|
+--------------------+-------+-----+
only showing top 5 rows



In [13]:
test = test.withColumn(
    "label",
    when(col("overall") < 3, 0)
    .when(col("overall") == 3, 1)
    .otherwise(2)
)

In [14]:
test.show(5)

+--------------------+-------+-----+
|          reviewText|overall|label|
+--------------------+-------+-----+
|This pop filter i...|    5.0|    2|
|I have used monst...|    5.0|    2|
|Perfect for my Ep...|    5.0|    2|
|Fender cords look...|    5.0|    2|
|Cant go wrong. Gr...|    4.0|    2|
+--------------------+-------+-----+
only showing top 5 rows



In [15]:
import re
from pyspark.sql.functions import udf, col
from pyspark.sql.types import StringType

import nltk
from nltk.stem import WordNetLemmatizer
from nltk.corpus import stopwords

nltk.download('stopwords')
nltk.download('wordnet')


stop_words = set(stopwords.words('english'))
lemmatizer = WordNetLemmatizer()

def clean_text(text):
    if text is None:
        return ""
    text = re.sub(r'[^a-zA-Z]', ' ', text.lower())
    tokens = text.split()
    tokens = [lemmatizer.lemmatize(word) for word in tokens if word not in stop_words]
    return ' '.join(tokens)

clean_text_udf = udf(clean_text, StringType())

train = train.withColumn("clean_text", clean_text_udf(col("reviewText")))
test = test.withColumn("clean_text", clean_text_udf(col("reviewText")))



[nltk_data] Downloading package stopwords to /opt/nltk_data...
[nltk_data]   Package stopwords is already up-to-date!
[nltk_data] Downloading package wordnet to /opt/nltk_data...
[nltk_data]   Package wordnet is already up-to-date!


In [16]:
train.show(5)

[Stage 8:>                                                          (0 + 1) / 1]

+--------------------+-------+-----+--------------------+
|          reviewText|overall|label|          clean_text|
+--------------------+-------+-----+--------------------+
|Nice windscreen p...|    5.0|    2|nice windscreen p...|
|Not much to write...|    5.0|    2|much write exactl...|
|The primary job o...|    5.0|    2|primary job devic...|
|The product does ...|    5.0|    2|product exactly q...|
|I now use this ca...|    3.0|    1|use cable run out...|
+--------------------+-------+-----+--------------------+
only showing top 5 rows



                                                                                

In [17]:
test.show(5)

[Stage 9:>                                                          (0 + 1) / 1]

+--------------------+-------+-----+--------------------+
|          reviewText|overall|label|          clean_text|
+--------------------+-------+-----+--------------------+
|This pop filter i...|    5.0|    2|pop filter great ...|
|I have used monst...|    5.0|    2|used monster cabl...|
|Perfect for my Ep...|    5.0|    2|perfect epiphone ...|
|Fender cords look...|    5.0|    2|fender cord look ...|
|Cant go wrong. Gr...|    4.0|    2|cant go wrong gre...|
+--------------------+-------+-----+--------------------+
only showing top 5 rows



                                                                                

In [18]:
from pyspark.ml.feature import Tokenizer, StopWordsRemover, HashingTF, IDF
from pyspark.ml import Pipeline

tokenizer = Tokenizer(inputCol="clean_text", outputCol="words")

remover = StopWordsRemover(inputCol="words", outputCol="filtered_words")

hashingTF = HashingTF(inputCol="filtered_words", outputCol="rawFeatures", numFeatures=5000)

# 4. IDF : calcule le TF-IDF
idf = IDF(inputCol="rawFeatures", outputCol="features")

In [19]:
words_data = tokenizer.transform(train)
filtered_data = remover.transform(words_data)
featurized_data = hashingTF.transform(filtered_data)
idf_model = idf.fit(featurized_data)
df_tfidf = idf_model.transform(featurized_data)

                                                                                

In [20]:
words_data = tokenizer.transform(test)
filtered_data_test = remover.transform(words_data)
featurized_data_test = hashingTF.transform(filtered_data_test)
idf_model_test = idf.fit(featurized_data_test)
df_tfidf_test = idf_model.transform(featurized_data_test)

                                                                                

In [21]:
from pyspark.sql.functions import col, rand

print("Avant sur-échantillonnage (train) :")
df_tfidf.groupBy("label").count().show()

counts = df_tfidf.groupBy("label").count().collect()
count_dict = {row["label"]: row["count"] for row in counts}

max_count = max(count_dict.values())

resampled = None

for label_val, count in count_dict.items():
    subset = df_tfidf.filter(col("label") == label_val)
    if count < max_count:
        ratio = max_count / count
        sampled_subset = subset.sample(withReplacement=True, fraction=ratio)
    else:
        sampled_subset = subset

    if resampled is None:
        resampled = sampled_subset
    else:
        resampled = resampled.union(sampled_subset)

resampled = resampled.orderBy(rand())

print("Après sur-échantillonnage (train) :")
resampled.groupBy("label").count().show()


Avant sur-échantillonnage (train) :
+-----+-----+
|label|count|
+-----+-----+
|    1|  616|
|    2| 7282|
|    0|  376|
+-----+-----+

Après sur-échantillonnage (train) :
+-----+-----+
|label|count|
+-----+-----+
|    1| 7226|
|    2| 7282|
|    0| 7299|
+-----+-----+



In [23]:
from pyspark.ml.classification import LogisticRegression

lr = LogisticRegression(featuresCol="features", labelCol="label")

# Entraîner le modèle sur la base d'entraînement équilibrée
lr_model = lr.fit(resampled)

# Faire des prédictions sur la base de test
predictions = lr_model.transform(df_tfidf_test)

# Afficher quelques résultats de la prédiction
predictions.select("features", "label", "prediction", "probability").show(5)




+--------------------+-----+----------+--------------------+
|            features|label|prediction|         probability|
+--------------------+-----+----------+--------------------+
|(5000,[306,385,64...|    2|       2.0|[3.04901616264223...|
|(5000,[370,668,10...|    2|       2.0|[1.53760404311251...|
|(5000,[157,281,81...|    2|       2.0|[1.04711785542006...|
|(5000,[157,659,75...|    2|       2.0|[3.62754488731072...|
|(5000,[86,750,102...|    2|       2.0|[2.71381590341281...|
+--------------------+-----+----------+--------------------+
only showing top 5 rows



                                                                                

In [24]:
from pyspark.ml.evaluation import MulticlassClassificationEvaluator

evaluator = MulticlassClassificationEvaluator(labelCol="label", predictionCol="prediction")

accuracy = evaluator.evaluate(predictions, {evaluator.metricName: "accuracy"})
f1 = evaluator.evaluate(predictions, {evaluator.metricName: "f1"})
precision = evaluator.evaluate(predictions, {evaluator.metricName: "weightedPrecision"})
recall = evaluator.evaluate(predictions, {evaluator.metricName: "weightedRecall"})

print(f"Accuracy: {accuracy:.4f}")
print(f"F1 Score: {f1:.4f}")
print(f"Precision (pondérée): {precision:.4f}")
print(f"Recall (pondéré): {recall:.4f}")


[Stage 352:>                                                        (0 + 2) / 2]

Accuracy: 0.8147
F1 Score: 0.8145
Precision (pondérée): 0.8159
Recall (pondéré): 0.8147


                                                                                

In [None]:
from pyspark.ml.evaluation import MulticlassClassificationEvaluator
from pyspark.sql.functions import col

# Prédictions
predictions = lr_model.transform(test)

# Matrice de confusion
confusion_df = predictions.groupBy("label", "prediction").count().orderBy("label", "prediction")
confusion_df.show()




+-----+----------+-----+
|label|prediction|count|
+-----+----------+-----+
|    0|       0.0|    9|
|    0|       1.0|    5|
|    0|       2.0|   28|
|    1|       0.0|    6|
|    1|       1.0|   13|
|    1|       2.0|   54|
|    2|       0.0|   29|
|    2|       1.0|   81|
|    2|       2.0|  757|
+-----+----------+-----+



                                                                                

In [25]:
pipeline = Pipeline(stages=[tokenizer, remover, hashingTF, idf, lr])

In [26]:
for colname in ["words", "filtered_words", "rawFeatures", "features"]:
    if colname in resampled.columns:
        resampled = resampled.drop(colname)


In [27]:
pipeline_model = pipeline.fit(resampled) 



In [None]:
pipeline_model.save("review_sentiment")

                                                                                