In [1]:
import pyspark
from pyspark.sql import SparkSession, DataFrame
from pyspark.ml.feature import VectorAssembler, StringIndexer
from pyspark.ml.classification import RandomForestClassifier
from pyspark.ml.evaluation import MulticlassClassificationEvaluator
from pyspark.ml import Pipeline
from pyspark.ml.tuning import ParamGridBuilder, TrainValidationSplit
from pyspark.sql.functions import col
import os
from pyspark.sql.functions import array_join, col

from pyspark.ml.linalg import Vectors, VectorUDT
from pyspark.ml.tuning import TrainValidationSplit, ParamGridBuilder
from pathlib import Path
import shutil

def create_spark_session():
    spark_master_url = os.getenv("SPARK_MASTER_URL", "spark://localhost:7077")
    return (
        SparkSession.builder.appName("ReviewClassification")
        .config("spark.master", "local[1]")
        .config("spark.broadcast.compress", "true")
        .config("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
        .config("spark.kryoserializer.buffer.max", "512m")
        .config("spark.driver.maxResultSize", "2g")
        .getOrCreate()
    )

In [2]:
def load_data_from_csv(spark, file_path):
    return spark.read.option("multiline", "true").option("escape", "\"").csv(file_path, header=True, inferSchema=True, sep=",")

df = load_data_from_csv(create_spark_session(), "spark/data/vectorized_reviews.csv")

Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
25/04/29 19:56:22 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
                                                                                

In [3]:
df

DataFrame[_id: string, username: string, faculty: string, year: int, opinion_weight: double, date: timestamp, professor: string, rating: string, vote_rate: double, course: string, review: string, post_url: string, language: string, vectors: string]

In [4]:
#show language column from the dataframe
df.show(1)

+--------------------+---------+-------+----+--------------+-------------------+---------------+------+---------+--------------------+--------------------+--------------------+--------+--------------------+
|                 _id| username|faculty|year|opinion_weight|               date|      professor|rating|vote_rate|              course|              review|            post_url|language|             vectors|
+--------------------+---------+-------+----+--------------+-------------------+---------------+------+---------+--------------------+--------------------+--------------------+--------+--------------------+
|680fc4a256db6a858...|Anonymous|   NULL|NULL|          NULL|2008-02-07 16:05:00|Barbara Kołwzan|   2,5|      0.0|tutaj Mikrobiolog...|Kurs: tutaj Mikro...|https://polwro.co...|      pl|-0.20756520330905...|
+--------------------+---------+-------+----+--------------+-------------------+---------------+------+---------+--------------------+--------------------+-----------------

In [5]:
# show only vectors
# df.select("vectors").show(1, truncate=False)

In [6]:
from pyspark.sql.functions import udf, split
from pyspark.sql.types import ArrayType, DoubleType

def prepare_features(df: DataFrame) -> DataFrame:
    # Convert string of space-separated numbers to vector
    def string_to_vector(vec_str):
        try:
            # Split string by whitespace and convert to float array
            vec_list = [float(x) for x in vec_str.split()]
            return Vectors.dense(vec_list)
        except:
            return None
    
    string_to_vector_udf = udf(string_to_vector, VectorUDT())
    
    return df.select(
        string_to_vector_udf(col("vectors")).alias("vectors"),
        col("rating").cast("string")
    ).repartition(4)

prepared_df = prepare_features(df)


In [7]:
prepared_df

DataFrame[vectors: vector, rating: string]

In [8]:
prepared_df.show(5)

[Stage 3:>                                                          (0 + 1) / 1]

+--------------------+------+
|             vectors|rating|
+--------------------+------+
|[-0.1488369554281...|   2,5|
|[0.07919885218143...|   4,5|
|[-0.0209472067654...|     5|
|[-0.0632937103509...|     3|
|[-0.0620504021644...|   3,5|
+--------------------+------+
only showing top 5 rows



                                                                                

In [9]:
def split_data(df, train_ratio=0.8):
    train_data, test_data = df.randomSplit([train_ratio, 1 - train_ratio], seed=42)
    return train_data, test_data

train_data, test_data = split_data(prepared_df)
print('Training Data Count:', train_data.count())
print('Test Data Count:', test_data.count())
train_data.show(5)
test_data.show(5)

25/04/29 19:56:36 WARN GarbageCollectionMetrics: To enable non-built-in garbage collector(s) List(G1 Concurrent GC), users should configure it(them) to spark.eventLog.gcMetrics.youngGenerationGarbageCollectors or spark.eventLog.gcMetrics.oldGenerationGarbageCollectors
                                                                                

Training Data Count: 21767


                                                                                

Test Data Count: 5227


                                                                                

+--------------------+------+
|             vectors|rating|
+--------------------+------+
|[-0.5281942486763...|     5|
|[-0.4362084865570...|     2|
|[-0.4344595372676...|   2,5|
|[-0.4194297492504...|   4,5|
|[-0.4090240895748...|     5|
+--------------------+------+
only showing top 5 rows



[Stage 21:>                                                         (0 + 1) / 1]

+--------------------+------+
|             vectors|rating|
+--------------------+------+
|[-0.4356518983840...|     5|
|[-0.4043717980384...|     3|
|[-0.3931698203086...|   4,5|
|[-0.3765598237514...|     5|
|[-0.3444329798221...|   5,5|
+--------------------+------+
only showing top 5 rows



                                                                                

In [10]:
def create_pipeline():
    string_indexer = StringIndexer(inputCol="rating", outputCol="label")
    assembler = VectorAssembler(inputCols=["vectors"], outputCol="features")

    rf = RandomForestClassifier(
        featuresCol="features",
        labelCol="label",
        numTrees=10,
        maxDepth=5,
        seed=42,
        cacheNodeIds=False,
        maxBins=32,
    )

    return Pipeline(stages=[string_indexer, assembler, rf])

def train_model(pipeline, train_data):
    paramGrid = (
        ParamGridBuilder()
        .addGrid(pipeline.getStages()[-1].numTrees, [10, 20, 30])
        .addGrid(pipeline.getStages()[-1].maxDepth, [5, 10, 15])
        .build()
    )

    evaluator = MulticlassClassificationEvaluator(
        labelCol="label", predictionCol="prediction", metricName="accuracy"
    )

    tvs = TrainValidationSplit(
        estimator=pipeline,
        estimatorParamMaps=paramGrid,
        evaluator=evaluator,
        trainRatio=0.8,
        parallelism=2,
    )

    model = tvs.fit(train_data)
    return model


def evaluate_model(model: TrainValidationSplit, test_data):
    predictions = model.bestModel.transform(test_data)
    evaluator = MulticlassClassificationEvaluator(labelCol="label")

    accuracy = evaluator.evaluate(predictions, {evaluator.metricName: "accuracy"})
    f1 = evaluator.evaluate(predictions, {evaluator.metricName: "f1"})
    precision = evaluator.evaluate(
        predictions, {evaluator.metricName: "weightedPrecision"}
    )
    recall = evaluator.evaluate(predictions, {evaluator.metricName: "weightedRecall"})

    print(f"Accuracy: {accuracy}")
    print(f"F1 Score: {f1}")
    print(f"Precision: {precision}")
    print(f"Recall: {recall}")

    # Print classification mapping
    label_mapping = model.bestModel.stages[0].labels
    print("\nRating to Label Mapping:")
    for idx, rating in enumerate(label_mapping):
        print(f"Rating {rating} -> Label {idx}")

    # Print feature importances
    rf_model = model.bestModel.stages[-1]
    feature_importances = rf_model.featureImportances
    print("\nFeature Importances:")
    print(feature_importances)

    return predictions


pipeline = create_pipeline()
model = train_model(pipeline, train_data)

predictions = evaluate_model(model, test_data)


25/04/29 19:57:43 WARN DAGScheduler: Broadcasting large task binary with size 1547.8 KiB
25/04/29 19:57:45 WARN DAGScheduler: Broadcasting large task binary with size 2.4 MiB
25/04/29 19:57:51 WARN DAGScheduler: Broadcasting large task binary with size 1908.8 KiB
25/04/29 19:58:09 WARN DAGScheduler: Broadcasting large task binary with size 1547.8 KiB
25/04/29 19:58:11 WARN DAGScheduler: Broadcasting large task binary with size 2.4 MiB
25/04/29 19:58:14 WARN DAGScheduler: Broadcasting large task binary with size 3.7 MiB
25/04/29 19:58:18 WARN DAGScheduler: Broadcasting large task binary with size 5.3 MiB
25/04/29 19:58:21 WARN DAGScheduler: Broadcasting large task binary with size 7.0 MiB
25/04/29 19:58:26 WARN DAGScheduler: Broadcasting large task binary with size 8.7 MiB
25/04/29 19:58:37 WARN DAGScheduler: Broadcasting large task binary with size 10.5 MiB
25/04/29 19:58:42 WARN DAGScheduler: Broadcasting large task binary with size 1218.6 KiB
25/04/29 19:58:42 WARN DAGScheduler: Broa

Accuracy: 0.37784580065046874
F1 Score: 0.3568867934716005
Precision: 0.41438791483458204
Recall: 0.3778458006504687

Rating to Label Mapping:
Rating 5 -> Label 0
Rating 5,5 -> Label 1
Rating 4,5 -> Label 2
Rating 4 -> Label 3
Rating 3,5 -> Label 4
Rating 3 -> Label 5
Rating 2 -> Label 6
Rating 2,5 -> Label 7

Feature Importances:
(768,[0,1,2,3,4,5,6,7,8,9,10,11,12,13,14,15,16,17,18,19,20,21,22,23,24,25,26,27,28,29,30,31,32,33,34,35,36,37,38,39,40,41,42,43,44,45,46,47,48,49,50,51,52,53,54,55,56,57,58,59,60,61,62,63,64,65,66,67,68,69,70,71,72,73,74,75,76,77,78,79,80,81,82,83,84,85,86,87,88,89,90,91,92,93,94,95,96,97,98,99,100,101,102,103,104,105,106,107,108,109,110,111,112,113,114,115,116,117,118,119,120,121,122,123,124,125,126,127,128,129,130,131,132,133,134,135,136,137,138,139,140,141,142,143,144,145,146,147,148,149,150,151,152,153,154,155,156,157,158,159,160,161,162,163,164,165,166,167,168,169,170,171,172,173,174,175,176,177,178,179,180,181,182,183,184,185,186,187,188,189,190,191,192

                                                                                

In [11]:
model.bestModel.write().overwrite().save("spark/models/review_classification_model")

25/04/29 20:05:07 WARN TaskSetManager: Stage 584 contains a task of very large size (22978 KiB). The maximum recommended task size is 1000 KiB.


In [12]:

# MODEL_PATH = "/app/models/professor_raing_classifier"
# temp_dir = "/opt/bitnami/spark/work/models/professor_raing_classifier"
# model.bestModel.write().overwrite().save(temp_dir)
# os.makedirs(os.path.dirname(MODEL_PATH), exist_ok=True)
# shutil.move(temp_dir, MODEL_PATH)

# print(f"Model saved to {MODEL_PATH}")