# Incarcarea setului de date 

In [2]:
from pyspark.sql import SparkSession
import time

spark = SparkSession.builder \
    .appName("SteamReviewsClassifier") \
    .config("spark.driver.memory", "16g") \
    .config("spark.executor.memory", "16g") \
    .getOrCreate()
print("SparkSession creat")

path = "/kaggle/input/steam-reviews/dataset.csv"  

start = time.time()

df = spark.read.csv(path, header=True, inferSchema=True)
df = df.repartition(100)


end = time.time()
print("Timp încărcare dataset: {:.2f} secunde".format(end - start))

df.printSchema()
df.show(5)


Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
25/06/20 06:42:58 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable


SparkSession creat


                                                                                                    

Timp încărcare dataset: 26.49 secunde
root
 |-- app_id: integer (nullable = true)
 |-- app_name: string (nullable = true)
 |-- review_text: string (nullable = true)
 |-- review_score: integer (nullable = true)
 |-- review_votes: integer (nullable = true)





+------+--------------------+--------------------+------------+------------+
|app_id|            app_name|         review_text|review_score|review_votes|
+------+--------------------+--------------------+------------+------------+
| 17080|      Tribes: Ascend|Tribes: Ascend is...|          -1|           0|
| 10090|Call of Duty: Wor...|Died from the oth...|           1|           0|
| 17410|       Mirror's Edge|Probably my favou...|           1|           0|
|113400|        APB Reloaded|keeps being ♥♥♥♥♥...|           1|           0|
|107410|              Arma 3|10/10 would shoot...|           1|           0|
+------+--------------------+--------------------+------------+------------+
only showing top 5 rows



                                                                                                    

# Preprocesarea datelor

In [3]:
!pip install vaderSentiment
from pyspark.sql.functions import col, lower, trim, udf
from pyspark.sql.types import IntegerType
from vaderSentiment.vaderSentiment import SentimentIntensityAnalyzer
import time
import re

start = time.time()
# Normalizare si filtrare text 
df_clean = df.withColumn("review_text", lower(trim(col("review_text")))) \
             .withColumn("app_name", trim(col("app_name"))) \
             .filter(col("review_text").isNotNull()) \
             .filter(col("review_text") != "") \
             .filter(col("app_name").isNotNull()) \
             .filter(col("app_name") != "")

# UDF pentru analiza de sentiment
analyzer = SentimentIntensityAnalyzer()

def vader_sentiment(text):
    if text is None:
        return 1
    text = re.sub(r"[^\w\s]", "", text.lower())
    score = analyzer.polarity_scores(text)['compound']
    if score > 0.05:
        return 2
    elif score < -0.05:
        return 0
    else:
        return 1

vader_udf = udf(vader_sentiment, IntegerType())

df_labeled = df_clean.withColumn("label", vader_udf(col("review_text")))

# Vectorizare
from pyspark.ml.feature import Tokenizer, StopWordsRemover, HashingTF, IDF

tokenizer = Tokenizer(inputCol="review_text", outputCol="words")
remover = StopWordsRemover(inputCol="words", outputCol="filtered")
hashingTF = HashingTF(inputCol="filtered", outputCol="rawFeatures", numFeatures=10000)
idf = IDF(inputCol="rawFeatures", outputCol="features")

df_words = tokenizer.transform(df_labeled)
df_filtered = remover.transform(df_words)
df_featurized = hashingTF.transform(df_filtered)
idfModel = idf.fit(df_featurized)
df_tfidf = idfModel.transform(df_featurized)

end = time.time()
print(f"Timp preprocesare: {end - start:.2f} secunde")

# df_labeled.select("app_name", "review_text", "label").show(5, truncate=False)





Timp preprocesare: 306.69 secunde


                                                                                                    

# Naive Bayes Model

In [None]:
import time
from pyspark.ml.evaluation import MulticlassClassificationEvaluator
from pyspark.ml.classification import NaiveBayes

train_df, test_df = df_tfidf.randomSplit([0.8, 0.2], seed=42)
# Antrenare model 
start = time.time()
nb = NaiveBayes(featuresCol="features", labelCol="label")
nb_model = nb.fit(train_df)
end = time.time()
print(f"Timp antrenare Naive Bayes: {end - start:.2f} secunde")

# Predictii
predictions = nb_model.transform(test_df)

# Evaluare model
evaluator_acc = MulticlassClassificationEvaluator(labelCol="label", predictionCol="prediction", metricName="accuracy")
evaluator_f1 = MulticlassClassificationEvaluator(labelCol="label", predictionCol="prediction", metricName="f1")
evaluator_precision = MulticlassClassificationEvaluator(labelCol="label", predictionCol="prediction", metricName="weightedPrecision")
evaluator_recall = MulticlassClassificationEvaluator(labelCol="label", predictionCol="prediction", metricName="weightedRecall")

accuracy = evaluator_acc.evaluate(predictions)
f1 = evaluator_f1.evaluate(predictions)
precision = evaluator_precision.evaluate(predictions)
recall = evaluator_recall.evaluate(predictions)

print(f"Acuratețe: {accuracy:.4f}")
print(f"F1-score: {f1:.4f}")
print(f"Precizie: {precision:.4f}")
print(f"Recall: {recall:.4f}")


# Linear SVC Model

In [3]:
from pyspark.ml.classification import LinearSVC, OneVsRest
from pyspark.ml.evaluation import MulticlassClassificationEvaluator
import time

train_df, test_df = df_tfidf.randomSplit([0.8, 0.2], seed=42)

# Definire clasificador linear SVM (binary)
svm = LinearSVC(maxIter=100, regParam=0.1, featuresCol="features", labelCol="label")

# OneVsRest pentru multiclass
ovr = OneVsRest(classifier=svm)

# Antrenare și timp
start = time.time()
ovr_model = ovr.fit(train_df)
end = time.time()
print(f"Timp antrenare OneVsRest SVM: {end - start:.2f} secunde")

# Predictii
predictions = ovr_model.transform(test_df)

# Evaluare
evaluator_acc = MulticlassClassificationEvaluator(labelCol="label", predictionCol="prediction", metricName="accuracy")
evaluator_f1 = MulticlassClassificationEvaluator(labelCol="label", predictionCol="prediction", metricName="f1")
evaluator_precision = MulticlassClassificationEvaluator(labelCol="label", predictionCol="prediction", metricName="weightedPrecision")
evaluator_recall = MulticlassClassificationEvaluator(labelCol="label", predictionCol="prediction", metricName="weightedRecall")

accuracy = evaluator_acc.evaluate(predictions)
f1 = evaluator_f1.evaluate(predictions)
precision = evaluator_precision.evaluate(predictions)
recall = evaluator_recall.evaluate(predictions)

print(f"Acuratețe: {accuracy:.4f}")
print(f"F1-score: {f1:.4f}")
print(f"Precizie: {precision:.4f}")
print(f"Recall: {recall:.4f}")


25/06/18 09:18:10 WARN DAGScheduler: Broadcasting large task binary with size 1076.8 KiB6 + 1) / 17]
25/06/18 09:58:05 WARN DAGScheduler: Broadcasting large task binary with size 1079.0 KiB6 + 1) / 17]
25/06/18 10:37:35 WARN DAGScheduler: Broadcasting large task binary with size 1115.5 KiB + 1) / 100]
25/06/18 10:37:51 WARN DAGScheduler: Broadcasting large task binary with size 1116.6 KiB + 1) / 100]
25/06/18 10:37:52 WARN DAGScheduler: Broadcasting large task binary with size 1116.0 KiB            
25/06/18 10:38:14 WARN DAGScheduler: Broadcasting large task binary with size 1117.1 KiB + 1) / 100]
25/06/18 10:38:14 WARN DAGScheduler: Broadcasting large task binary with size 1116.0 KiB            
25/06/18 10:38:16 WARN DAGScheduler: Broadcasting large task binary with size 1117.1 KiB + 4) / 100]
25/06/18 10:38:17 WARN DAGScheduler: Broadcasting large task binary with size 1116.0 KiB            
25/06/18 10:38:18 WARN DAGScheduler: Broadcasting large task binary with size 1117.1 KiB + 

Timp antrenare OneVsRest SVM: 9899.80 secunde


25/06/18 12:03:10 WARN DAGScheduler: Broadcasting large task binary with size 1364.5 KiB6 + 1) / 17]
25/06/18 12:44:12 WARN DAGScheduler: Broadcasting large task binary with size 1364.5 KiB6 + 1) / 17]
25/06/18 13:24:51 WARN DAGScheduler: Broadcasting large task binary with size 1364.5 KiB5 + 2) / 17]
25/06/18 14:05:49 WARN DAGScheduler: Broadcasting large task binary with size 1364.5 KiB6 + 1) / 17]

Acuratețe: 0.7928
F1-score: 0.7630
Precizie: 0.8198
Recall: 0.7928


                                                                                                    

 # Random Forest Classifier

In [4]:
from pyspark.ml.classification import RandomForestClassifier
from pyspark.ml.evaluation import MulticlassClassificationEvaluator
import time

# Împărțirea datelor
train_df, test_df = df_tfidf.randomSplit([0.8, 0.2], seed=42)

# Definirea clasificatorului Random Forest
rf = RandomForestClassifier(featuresCol="features", labelCol="label", numTrees=20, maxDepth=5, seed=42)

# Antrenare și măsurare timp
start = time.time()
rf_model = rf.fit(train_df)
end = time.time()
print(f"Timp antrenare Random Forest: {end - start:.2f} secunde")

# Predictii pe test
predictions = rf_model.transform(test_df)

# Evaluare
evaluator_acc = MulticlassClassificationEvaluator(labelCol="label", predictionCol="prediction", metricName="accuracy")
evaluator_f1 = MulticlassClassificationEvaluator(labelCol="label", predictionCol="prediction", metricName="f1")
evaluator_precision = MulticlassClassificationEvaluator(labelCol="label", predictionCol="prediction", metricName="weightedPrecision")
evaluator_recall = MulticlassClassificationEvaluator(labelCol="label", predictionCol="prediction", metricName="weightedRecall")

accuracy = evaluator_acc.evaluate(predictions)
f1 = evaluator_f1.evaluate(predictions)
precision = evaluator_precision.evaluate(predictions)
recall = evaluator_recall.evaluate(predictions)

print(f"Acuratețe: {accuracy:.4f}")
print(f"F1-score: {f1:.4f}")
print(f"Precizie: {precision:.4f}")
print(f"Recall: {recall:.4f}")


25/06/20 06:50:39 WARN DAGScheduler: Broadcasting large task binary with size 1086.4 KiB6 + 1) / 17]
25/06/20 07:32:37 WARN DAGScheduler: Broadcasting large task binary with size 1108.8 KiB6 + 1) / 17]
25/06/20 07:33:21 WARN DAGScheduler: Broadcasting large task binary with size 1108.8 KiB            
25/06/20 08:14:38 WARN DAGScheduler: Broadcasting large task binary with size 1213.0 KiB            
25/06/20 08:55:59 WARN DAGScheduler: Broadcasting large task binary with size 1424.0 KiB            
25/06/20 08:57:59 WARN MemoryStore: Not enough space to cache rdd_81_0 in memory! (computed 1553.0 MiB so far)
25/06/20 08:57:59 WARN BlockManager: Persisting block rdd_81_0 to disk instead.
25/06/20 08:59:46 WARN MemoryStore: Not enough space to cache rdd_81_6 in memory! (computed 129.6 MiB so far)
25/06/20 08:59:46 WARN BlockManager: Persisting block rdd_81_6 to disk instead.
25/06/20 08:59:46 WARN MemoryStore: Not enough space to cache rdd_81_4 in memory! (computed 443.6 MiB so far)
25/0

Timp antrenare Random Forest: 11693.62 secunde


25/06/20 10:04:38 WARN DAGScheduler: Broadcasting large task binary with size 1215.6 KiB6 + 1) / 17]
25/06/20 10:47:08 WARN DAGScheduler: Broadcasting large task binary with size 1215.6 KiB6 + 1) / 17]
25/06/20 11:28:48 WARN DAGScheduler: Broadcasting large task binary with size 1215.6 KiB6 + 1) / 17]
25/06/20 12:10:28 WARN DAGScheduler: Broadcasting large task binary with size 1215.6 KiB6 + 1) / 17]

Acuratețe: 0.7640
F1-score: 0.6969
Precizie: 0.6734
Recall: 0.7640


                                                                                                    