## **1. Import Data**

In [None]:
import os
import sys
from pyspark.sql import SparkSession

# Initialize SparkSession (local mode, using every available core)
spark = (
    SparkSession.builder
    .appName("ProcessingBig")
    .config("spark.sql.shuffle.partitions", 6)
    # The eager-evaluation key is `spark.sql.repl.eagerEval.enabled`; the
    # previous spelling (`eagereval.enable`) is not a recognised setting, so
    # DataFrames were never rendered eagerly in the notebook.
    .config("spark.sql.repl.eagerEval.enabled", True)
    .master("local[*]")
    .config("spark.executor.memory", "4g")
    .config("spark.driver.memory", "4g")
    # The heartbeat interval is kept below the network timeout
    .config("spark.executor.heartbeatInterval", "200s")
    .config("spark.network.timeout", "300s")
    # Run Python workers and driver with the interpreter that runs this kernel
    .config("spark.pyspark.python", sys.executable)
    .config("spark.pyspark.driver.python", sys.executable)
    .getOrCreate()
)

# Set the current working directory
# NOTE(review): this absolute path only exists on the original author's
# machine; adjust it (or make it configurable) before running elsewhere.
os.chdir("C:\\Users\\migue\\OneDrive\\Documents\\GitHub\\BigData")
current_dir = os.getcwd()

# Provide path to gzipped JSON file
gzipped_json_path = os.path.join(current_dir, "Books.jsonl.gz")

# Read the file into a DataFrame
df = spark.read.json(gzipped_json_path)

# Drop the "images" column
df = df.drop("images")

## **2. Data Preparation**

In [None]:
# Keep only fully populated rows: a null in any column discards the row
df = df.dropna(how="any")

In [None]:
# !pip install nltk

In [None]:
# Import necessary libraries

from pyspark.ml.feature import Tokenizer, StopWordsRemover, HashingTF, IDF, SQLTransformer
from pyspark.ml import Pipeline
import nltk
from nltk.stem import WordNetLemmatizer
from pyspark.sql.functions import udf
from pyspark.sql.types import ArrayType, StringType

In [None]:
# Fetch the NLTK corpora needed for WordNet lemmatization
for nltk_package in ("wordnet", "omw-1.4"):
    nltk.download(nltk_package)
from nltk.stem import WordNetLemmatizer

# Single lemmatizer instance used by lemmatize_words
lemmatizer = WordNetLemmatizer()

# Define a UDF to lemmatize tokenized words
def lemmatize_words(words):
    """Lemmatize every token in a list of words.

    Args:
        words: List of string tokens, or None when the source array column
            is null for the row.

    Returns:
        A list with ``lemmatizer.lemmatize`` applied to each token, in the
        same order. A None input yields an empty list.
    """
    # Spark hands SQL NULL to Python UDFs as None; iterating over it would
    # raise TypeError and fail the whole job, so treat it as "no tokens".
    if words is None:
        return []
    return [lemmatizer.lemmatize(word) for word in words]

lemmatize_udf = udf(lemmatize_words, ArrayType(StringType()))

[nltk_data] Downloading package wordnet to
[nltk_data]     C:\Users\migue\AppData\Roaming\nltk_data...
[nltk_data]   Package wordnet is already up-to-date!
[nltk_data] Downloading package omw-1.4 to
[nltk_data]     C:\Users\migue\AppData\Roaming\nltk_data...
[nltk_data]   Package omw-1.4 is already up-to-date!


In [None]:
# Register the UDF so the SQL statement used by the SQLTransformer stage
# can call it by name
spark.udf.register("lemmatize_udf", lemmatize_udf)

# Define stages of the pipeline

# 1) Tokenization of the "text" column
tokenizer = Tokenizer(inputCol="text", outputCol="words")

# 2) Stop-word removal
remover = StopWordsRemover(inputCol="words", outputCol="filtered")

# 3) Lemmatization, applied through SQL on top of the registered UDF
lemmatizer_transformer = SQLTransformer(
    statement="SELECT *, lemmatize_udf(filtered) AS lemmatized FROM __THIS__"
)

# 4) Hashed term frequencies (1000 buckets), then 5) IDF rescaling
hashingTF = HashingTF(inputCol="lemmatized", outputCol="rawFeatures", numFeatures=1000)
idf = IDF(inputCol="rawFeatures", outputCol="features")

# Build the pipeline from the stages in processing order
text_stages = [tokenizer, remover, lemmatizer_transformer, hashingTF, idf]
pipeline = Pipeline(stages=text_stages)

# NOTE(review): the pipeline (including IDF) is fitted on the full DataFrame,
# while the train/test split is only made later in the notebook, so IDF
# statistics also see the rows that end up in the test set.
# Fit the pipeline to the DataFrame
model = pipeline.fit(df)

# Transform the DataFrame (adds words, filtered, lemmatized, rawFeatures, features)
df = model.transform(df)

## **3. Classification**

In [None]:
# Import necessary libraries
from pyspark.sql.functions import col, lit, when
from pyspark.ml.classification import LogisticRegression, DecisionTreeClassifier, RandomForestClassifier, LinearSVC, OneVsRest
from pyspark.ml.evaluation import MulticlassClassificationEvaluator
from pyspark.ml.tuning import CrossValidator, ParamGridBuilder
from pyspark.ml import Pipeline

In [None]:
# One seed for the split and every stochastic estimator, so that a
# Restart & Run All reproduces the same folds, trees and scores
SEED = 42

# Calculate class weights as inverse class frequency: total rows / rows in class
class_counts = df.groupBy("rating").count().collect()
# The per-class counts already add up to the row count, so no second
# Spark job (df.count()) is needed
total_count = sum(row["count"] for row in class_counts)
class_weights = {
    row["rating"]: total_count / row["count"]
    for row in class_counts
    if row["rating"] is not None
}
if not class_weights:
    raise ValueError("Cannot compute class weights: no non-null ratings found")

# Add a weight column based on class weights. The CASE WHEN chain is built
# from the ratings actually present, so a missing class no longer raises
# KeyError and no rating value is hard-coded.
weight_expr = None
for rating_value, class_weight in sorted(class_weights.items()):
    is_class = col("rating") == rating_value
    weight_expr = (
        when(is_class, class_weight)
        if weight_expr is None
        else weight_expr.when(is_class, class_weight)
    )
df = df.withColumn("weight", weight_expr)

# Split the data into training and test sets
(trainingData, testData) = df.randomSplit([0.7, 0.3], seed=SEED)
# Both splits are reused many times (every CV fit, every metric), so cache
# them instead of recomputing the text pipeline on each pass
trainingData.cache()
testData.cache()

# Define the models with the weight column
lr = LogisticRegression(featuresCol="features", labelCol="rating", weightCol="weight")
dt = DecisionTreeClassifier(featuresCol="features", labelCol="rating", weightCol="weight", seed=SEED)
rf = RandomForestClassifier(featuresCol="features", labelCol="rating", weightCol="weight", seed=SEED)

# Define evaluator (F1 is also the metric cross-validation selects on)
evaluator = MulticlassClassificationEvaluator(labelCol="rating", predictionCol="prediction", metricName="f1")

# Create parameter grid for each model with a small set of parameters
# (the dataset is large and computational capacity is limited)
lr_paramGrid = (ParamGridBuilder()
                .addGrid(lr.regParam, [0.1, 0.01])
                .build())

dt_paramGrid = (ParamGridBuilder()
                .addGrid(dt.maxDepth, [10, 20])
                .build())

rf_paramGrid = (ParamGridBuilder()
                .addGrid(rf.numTrees, [15, 30])
                .addGrid(rf.maxDepth, [5, 10])
                .build())


# Set up CrossValidator for each model; the seed fixes the fold assignment
lr_cv = CrossValidator(estimator=lr, estimatorParamMaps=lr_paramGrid, evaluator=evaluator, numFolds=3, seed=SEED)
dt_cv = CrossValidator(estimator=dt, estimatorParamMaps=dt_paramGrid, evaluator=evaluator, numFolds=3, seed=SEED)
rf_cv = CrossValidator(estimator=rf, estimatorParamMaps=rf_paramGrid, evaluator=evaluator, numFolds=3, seed=SEED)

# Create pipelines
lr_pipeline = Pipeline(stages=[lr_cv])
dt_pipeline = Pipeline(stages=[dt_cv])
rf_pipeline = Pipeline(stages=[rf_cv])

# Fit the models using cross-validation
lr_model = lr_pipeline.fit(trainingData)
dt_model = dt_pipeline.fit(trainingData)
rf_model = rf_pipeline.fit(trainingData)

# Predict the test data
lr_predictions = lr_model.transform(testData)
dt_predictions = dt_model.transform(testData)
rf_predictions = rf_model.transform(testData)

# Evaluate the models
lr_f1 = evaluator.evaluate(lr_predictions)
dt_f1 = evaluator.evaluate(dt_predictions)
rf_f1 = evaluator.evaluate(rf_predictions)

# Print the results
print(f"Logistic Regression F1-Score: {lr_f1:.4f}")
print(f"Decision Tree F1-Score: {dt_f1:.4f}")
print(f"Random Forest F1-Score: {rf_f1:.4f}")

Logistic Regression F1-Score: 0.5799
Decision Tree F1-Score: 0.4303
Random Forest F1-Score: 0.5639


## **4. Evaluation**

In [None]:
import pandas as pd

# Evaluators for the remaining metrics; all compare "rating" with "prediction"
evaluator_accuracy, evaluator_precision, evaluator_recall = (
    MulticlassClassificationEvaluator(labelCol="rating", predictionCol="prediction", metricName=metric_name)
    for metric_name in ("accuracy", "weightedPrecision", "weightedRecall")
)

# Evaluate the models
def evaluate_model(predictions):
    """Score a predictions DataFrame with the four notebook-level evaluators.

    Args:
        predictions: DataFrame passed to each evaluator's ``evaluate``.

    Returns:
        Tuple ``(accuracy, precision, recall, f1)``, taken in that order from
        ``evaluator_accuracy``, ``evaluator_precision``, ``evaluator_recall``
        and ``evaluator``.
    """
    scorers = (evaluator_accuracy, evaluator_precision, evaluator_recall, evaluator)
    return tuple(scorer.evaluate(predictions) for scorer in scorers)

# Score each model on the held-out predictions
lr_accuracy, lr_precision, lr_recall, lr_f1 = evaluate_model(lr_predictions)
dt_accuracy, dt_precision, dt_recall, dt_f1 = evaluate_model(dt_predictions)
rf_accuracy, rf_precision, rf_recall, rf_f1 = evaluate_model(rf_predictions)

# Create a summary table in a single constructor call. Growing an empty
# DataFrame with pd.concat emits a FutureWarning and leaves every row with
# index 0; building it once gives a clean 0..n-1 index.
results = pd.DataFrame(
    [
        ("Logistic Regression", lr_accuracy, lr_precision, lr_recall, lr_f1),
        ("Decision Tree", dt_accuracy, dt_precision, dt_recall, dt_f1),
        ("Random Forest", rf_accuracy, rf_precision, rf_recall, rf_f1),
    ],
    columns=["Model", "Accuracy", "Precision", "Recall", "F1 Score"],
)

# Print the summary table
results

  results = pd.concat([results, pd.DataFrame({"Model": ["Logistic Regression"], "Accuracy": [lr_accuracy], "Precision": [lr_precision], "Recall": [lr_recall], "F1 Score": [lr_f1]})])


Unnamed: 0,Model,Accuracy,Precision,Recall,F1 Score
0,Logistic Regression,0.53439,0.6566,0.53439,0.579931
0,Decision Tree,0.358803,0.615046,0.358803,0.430266
0,Random Forest,0.528991,0.616412,0.528991,0.563894


In [None]:
def confusion_matrix(predictions):
    """Build a confusion matrix with row and column totals.

    Args:
        predictions: Object exposing
            ``groupBy("rating", "prediction").count().toPandas()`` (a Spark
            DataFrame of model predictions in this notebook).

    Returns:
        pandas DataFrame indexed by actual rating with one column per
        predicted rating, integer counts, a trailing "Total" column (row
        sums) and a trailing "Total" row (column sums, grand total in the
        corner).
    """
    # Aggregate in Spark and bring only the small count table to pandas
    pair_counts = predictions.groupBy("rating", "prediction").count().toPandas()

    # Pivot to rows = actual rating, columns = predicted rating
    matrix = pair_counts.pivot(index="rating", columns="prediction", values="count")

    # Use the same label set on both axes so the matrix stays square even
    # when a class is never predicted (or never occurs), and cast back to
    # int: the NaN introduced for missing pairs would otherwise turn every
    # count into a float.
    labels = sorted(set(matrix.index) | set(matrix.columns))
    matrix = (
        matrix.reindex(index=labels, columns=labels)
        .fillna(0)
        .astype("int64")
        .rename_axis(index="rating", columns="prediction")
    )

    # Row sums first, so the column sums below also total the "Total" column
    matrix["Total"] = matrix.sum(axis=1)
    total_row = matrix.sum(axis=0)
    total_row.name = "Total"

    # Append the totals row to the confusion matrix
    return pd.concat([matrix, total_row.to_frame().T])


# Print one confusion matrix per model, in the order the models were trained
for model_label, model_predictions in (
    ("Logistic Regression", lr_predictions),
    ("Decision Tree", dt_predictions),
    ("Random Forest", rf_predictions),
):
    print(f"\n {model_label} Confusion Matrix: \n")
    print(confusion_matrix(model_predictions))


 Logistic Regression Confusion Matrix: 

prediction   1.0  2.0  3.0   4.0   5.0  Total
1.0          179   54   51    29    69    382
2.0           84   57   70    53    55    319
3.0          126   99  163   103   101    592
4.0          174  120  208   396   428   1326
5.0          570  326  420   827  3758   5901
Total       1133  656  912  1408  4411   8520

 Decision Tree Confusion Matrix: 

prediction   1.0  2.0  3.0   4.0   5.0  Total
1.0          229   30   41    31    51    382
2.0          143   33   44    37    62    319
3.0          229   47  107   102   107    592
4.0          425   76  203   290   332   1326
5.0         1954  329  484   736  2398   5901
Total       2980  515  879  1196  2950   8520

 Random Forest Confusion Matrix: 

prediction   1.0  2.0  3.0   4.0   5.0  Total
1.0          142   23   43    47   127    382
2.0           84   33   55    54    93    319
3.0          118   44  127   124   179    592
4.0          153   70  173   385   545   1326
5.0         

In [None]:
def show_specific_predictions(predictions, model_name):
    """Print example reviews for four (predicted, actual) rating pairs.

    Args:
        predictions: DataFrame with "text", "rating" and "prediction" columns.
        model_name: Label printed in the header line.

    Shows two rows for predicted 5 / actual 1 and one row for each of the
    other three pairs, with the text column untruncated.
    """
    print(f"{model_name} Predictions")

    # (predicted rating, actual rating, rows to show, text before the heading)
    cases = (
        (5, 1, 2, ""),
        (5, 5, 1, "\n\n "),
        (1, 5, 1, "\n\n "),
        (1, 1, 1, "\n\n "),
    )
    for predicted, actual, n_rows, lead in cases:
        print(f"{lead}Model predicted rating {predicted}, actual rating {actual}: \n")
        matching = predictions.filter((col("prediction") == predicted) & (col("rating") == actual))
        matching.select("text", "rating", "prediction").show(n_rows, truncate=False)

# Logistic Regression Predictions
show_specific_predictions(lr_predictions, "Logistic Regression")

Logistic Regression Predictions
Model predicted rating 5, actual rating 1: 

+----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------

## **5. Conclusions**

**Most accurate model**: Overall, the logistic regression model achieves the best metrics.
The tree-based models do not reach the same performance and appear to cope worse with the class imbalance in the data (see the confusion matrices).

**Limitations**: The size of the data and the limited processing capacity do not allow a more thorough search for the best combination of parameters: cross-validation has to run over large sets of data, so the parameter grids are restricted to small search spaces.
