# **ALS Machine Learning Recommendation Model**
This model will be trained on users' ratings of a variety of anime.
### Imports
Setting up the imports needed for training the model.

In [1]:
from pyspark.sql import SparkSession
from pyspark.ml.recommendation import ALS
from pyspark.ml.evaluation import RegressionEvaluator

## **Preparation For Model Training**
### Spark Session
Creating a Spark session, the entry point to the Spark environment.

In [2]:
spark = SparkSession.builder \
            .appName('RecAnime') \
            .config('spark.driver.memory', '8g') \
            .config('spark.sql.pivotMaxValues', 20000) \
            .getOrCreate()

### Spark DataFrame
Reading the CSV file `user-filtered.csv` into a Spark DataFrame.

The parameter `header=True` indicates that the first row of the CSV file contains the column names, so Spark uses that row as the header instead of treating it as data.

The parameter `inferSchema=True` tells Spark to infer each column's data type from the contents of the CSV file. This costs an extra pass over the data; without it, every column is read as a string.

In [3]:
data = spark.read.csv('E:/RhaMo/CSV Files/Anime Dataset/user-filtered.csv', header=True, inferSchema=True)

Renaming the columns

In [4]:
# 'rating' keeps its original name, so only the user and anime ID columns are renamed
data = data.withColumnRenamed('user_id', 'userId') \
           .withColumnRenamed('anime_id', 'itemId')

Checking the data

In [5]:
data.show(10)

+------+------+------+
|userId|itemId|rating|
+------+------+------+
|     0|    67|     9|
|     0|  6702|     7|
|     0|   242|    10|
|     0|  4898|     0|
|     0|    21|    10|
|     0|    24|     9|
|     0|  2104|     0|
|     0|  4722|     8|
|     0|  6098|     6|
|     0|  3125|     9|
+------+------+------+
only showing top 10 rows



### Sampling the Data
Because the full dataset is large, I start by training the model on a random 40% sample (`fraction=0.4`) and will scale up from there.
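`DataFrame.sample(fraction=0.4)` without replacement keeps each row independently with probability 0.4, so the sample is approximately, not exactly, 40% of the rows. A minimal pure-Python sketch of that idea (a simplification, not Spark's implementation; the `bernoulli_sample` helper is hypothetical):

```python
import random

def bernoulli_sample(rows, fraction, seed):
    """Keep each row independently with probability `fraction` (sampling without replacement)."""
    rng = random.Random(seed)  # a fixed seed makes the sample reproducible
    return [row for row in rows if rng.random() < fraction]

rows = list(range(100_000))
sample = bernoulli_sample(rows, fraction=0.4, seed=123)
print(len(sample) / len(rows))  # close to 0.4, but not exactly 0.4
```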

In [6]:
sample_data = data.sample(fraction=0.4, seed=123)

### Splitting the Data
Next, the sample is split 80/20: 80% of the rows for training the model and 20% for testing it. The `seed` makes the split reproducible.
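`randomSplit` assigns rows at random, so some users or anime in the test set may never appear in the training set. ALS has no factors for them and predicts NaN by default, which is why the model below uses `coldStartStrategy='drop'`. A minimal pure-Python sketch of a seeded 80/20 split on toy (user, item, rating) triples that counts the test rows affected; the toy data and the `random_split` helper are hypothetical:

```python
import random

def random_split(rows, train_fraction, seed):
    """Assign each row to train with probability `train_fraction`, otherwise to test."""
    rng = random.Random(seed)
    train, test = [], []
    for row in rows:
        (train if rng.random() < train_fraction else test).append(row)
    return train, test

# Toy (userId, itemId, rating) triples: 200 users rating 3 random items each out of 500
rng = random.Random(0)
ratings = [(u, rng.randrange(500), rng.randint(1, 10)) for u in range(200) for _ in range(3)]

train, test = random_split(ratings, train_fraction=0.8, seed=123)
train_users = {u for u, _, _ in train}
train_items = {i for _, i, _ in train}

# Test rows whose user or item never appears in training: ALS would predict NaN for these
cold_start = [r for r in test if r[0] not in train_users or r[1] not in train_items]
print(len(train), len(test), len(cold_start))
```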

In [None]:
(train_data, test_data) = sample_data.randomSplit([0.8, 0.2], seed=123)

### Persisting the DataFrame
Persisting stores the DataFrames in memory (spilling to disk if needed) so they can be reused without being recomputed from the source file each time. This is useful because both DataFrames are used several times in this Spark application. Persisting is lazy: the data is cached the first time an action runs on it.

In [None]:
train_data.persist()
test_data.persist()

### Repartitioning the DataFrame
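Repartitioning reshuffles the rows of a DataFrame into a new number of partitions. Partitions are the chunks of data that Spark distributes across the nodes (or cores) of a cluster, so the number of partitions determines how much work can run in parallel. Note that `repartition` returns a new DataFrame: the persisted DataFrames above serve as its input, but the repartitioned result is not itself cached unless `persist()` is called on it.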
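Calling `repartition(30)` without any column arguments uses round-robin partitioning, so the 30 partitions end up with nearly equal row counts. A simplified pure-Python sketch of round-robin assignment (the `round_robin` helper is hypothetical, and Spark's version also randomizes the starting partition):

```python
def round_robin(rows, num_partitions):
    """Deal rows into partitions one at a time, like cards, so sizes differ by at most one."""
    partitions = [[] for _ in range(num_partitions)]
    for index, row in enumerate(rows):
        partitions[index % num_partitions].append(row)
    return partitions

parts = round_robin(list(range(1_000)), num_partitions=30)
sizes = [len(p) for p in parts]
print(min(sizes), max(sizes))  # 33 34
```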

In [None]:
train_data = train_data.repartition(30)
test_data = test_data.repartition(30)

## Model Training
This section configures and trains the ALS model.

### Range of Values

`rank` controls the number of latent factors (also called embeddings) learned for each user and each anime in the matrix factorization. Larger values let the model capture more complex relationships but can lead to overfitting if set too high; smaller values simplify the model but can result in underfitting. Range: it is common to experiment with values such as [5, 10, 20, 50, 100] to balance complexity and performance. Spark's default is 10.

`maxIter` sets the number of ALS iterations, each of which alternately updates the user factors and the item factors. More iterations give the algorithm more chance to converge to a better solution but increase computation time. Range: values such as [5, 10, 20, 50, 100] are typical, depending on the dataset and the available computational resources. Spark's default is 10.

`regParam` (often called lambda) controls the amount of regularization applied to the model. Regularization helps prevent overfitting by penalizing large values in the latent factor matrices. Smaller values mean less regularization and may lead to overfitting; larger values mean more regularization and may lead to underfitting. Range: values such as [0.01, 0.1, 0.5, 1.0] can be explored to find the right balance between fitting the training data and preventing overfitting. Spark's default is 0.1.
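To make the three parameters concrete, here is a minimal NumPy sketch of explicit-feedback ALS on a small dense toy matrix (not Spark's distributed implementation): `rank` is the width of the user and item factor matrices, `max_iter` is the number of alternating passes, and `reg_param` is the L2 penalty. Spark's explicit ALS scales each user's or item's penalty by its number of ratings (the ALS-WR approach), and the sketch mirrors that. The `als` and `masked_rmse` functions, the toy data, and the boolean `mask` marking observed ratings are assumptions for illustration.

```python
import numpy as np

def als(ratings, mask, rank, max_iter, reg_param, seed=0):
    """Explicit ALS: alternately solve regularized least squares for user and item factors."""
    rng = np.random.default_rng(seed)
    n_users, n_items = ratings.shape
    U = rng.normal(scale=0.1, size=(n_users, rank))  # user factors
    V = rng.normal(scale=0.1, size=(n_items, rank))  # item factors
    eye = np.eye(rank)
    for _ in range(max_iter):
        # Fix V and solve for each user's factors using only the items that user rated
        for u in range(n_users):
            idx = mask[u]
            if not idx.any():
                continue
            Vu = V[idx]
            A = Vu.T @ Vu + reg_param * idx.sum() * eye  # penalty scaled by rating count
            U[u] = np.linalg.solve(A, Vu.T @ ratings[u, idx])
        # Fix U and solve for each item's factors using only the users who rated it
        for i in range(n_items):
            idx = mask[:, i]
            if not idx.any():
                continue
            Ui = U[idx]
            A = Ui.T @ Ui + reg_param * idx.sum() * eye
            V[i] = np.linalg.solve(A, Ui.T @ ratings[idx, i])
    return U, V

def masked_rmse(ratings, mask, U, V):
    """RMSE over the observed entries only."""
    err = (ratings - U @ V.T)[mask]
    return float(np.sqrt(np.mean(err ** 2)))

# Toy data: a true rank-2 matrix (30 users x 20 items) with about 70% of entries observed
rng = np.random.default_rng(42)
true_ratings = rng.uniform(1, 3, size=(30, 2)) @ rng.uniform(1, 3, size=(2, 20))
mask = rng.random(true_ratings.shape) < 0.7

results = {}
for rank in (1, 2, 5):
    U, V = als(true_ratings, mask, rank=rank, max_iter=20, reg_param=0.01)
    results[rank] = masked_rmse(true_ratings, mask, U, V)
    print(f"rank={rank}: training RMSE={results[rank]:.3f}")
```

Because this RMSE is measured on the same entries used for fitting, a larger `rank` will usually look at least as good here; only held-out data, like the test split, reveals overfitting.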

In [None]:
als = ALS(
    rank=50,
    maxIter=20,
    regParam=0.1,
    userCol='userId',
    itemCol='itemId',
    ratingCol='rating',
    coldStartStrategy='drop'
)


Training the model on the training data

In [None]:
model = als.fit(train_data)

Generating predictions for the test data

In [None]:
predictions = model.transform(test_data)

### **Evaluate the model using the Regression Evaluator**
- `metricName`: `'rmse'` selects Root Mean Squared Error (RMSE) as the evaluation metric.

- `labelCol`: `'rating'` specifies the column in the DataFrame containing the actual ratings.

- `predictionCol`: `'prediction'` specifies the column in the DataFrame containing the predicted ratings.

In [None]:
evaluator = RegressionEvaluator(
    metricName='rmse',
    labelCol='rating',
    predictionCol='prediction'
)
rmse = evaluator.evaluate(predictions)
print(f'Root Mean Squared Error on test data: {rmse}')
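RMSE is the square root of the mean squared difference between actual and predicted ratings, sqrt((1/n) * sum((r_i - p_i)^2)). With ALS's default `coldStartStrategy='nan'`, a single NaN prediction makes the whole metric NaN, which is why the model drops those rows. A small pure-Python sketch of the computation; the `rmse` helper and the sample values are hypothetical:

```python
import math

def rmse(actual, predicted, drop_nan=True):
    """Root mean squared error; optionally skip pairs whose prediction is NaN (like coldStartStrategy='drop')."""
    pairs = [(a, p) for a, p in zip(actual, predicted) if not (drop_nan and math.isnan(p))]
    squared_errors = [(a - p) ** 2 for a, p in pairs]
    return math.sqrt(sum(squared_errors) / len(squared_errors))

actual = [9, 7, 10, 8]
predicted = [8.0, 7.5, float("nan"), 6.0]
print(rmse(actual, predicted))                  # ≈ 1.3229 (NaN row skipped)
print(rmse(actual, predicted, drop_nan=False))  # nan
```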

## Baseline Model
Making a baseline model that predicts the mean rating for every user/anime pair, to compare against my trained model. If the trained model's RMSE is lower than the baseline's, the model has picked up on patterns in the data beyond the average rating. The baseline provides a reference point for judging my model's performance.

In [None]:
import pandas as pd
from sklearn.metrics import mean_squared_error
import numpy as np

In [None]:
df = pd.read_csv("E:/RhaMo/CSV Files/Anime Dataset/user-filtered.csv")

### Calculations
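- We calculate the mean rating using the `.mean()` method of the DataFrame column.
- We create an array of baseline predictions using `np.full_like()`, filled with the mean rating.
- We use the `mean_squared_error` function from the `sklearn.metrics` module to calculate the mean squared error between the actual ratings and the baseline predictions, and take its square root to get the RMSE.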

In [None]:
# Step 1: Calculate the Mean
mean_rating = df['rating'].mean()

# Step 2: Make Predictions
# dtype=float keeps the fractional mean (the integer rating dtype would truncate it)
baseline_predictions = np.full_like(df['rating'], fill_value=mean_rating, dtype=float)

# Step 3: Compute RMSE
baseline_rmse = np.sqrt(mean_squared_error(df['rating'], baseline_predictions))
print("Baseline RMSE:", baseline_rmse)
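When the mean is computed on the same ratings it is scored against, the mean predictor's RMSE equals the population standard deviation of those ratings, so this baseline effectively reports the spread of all ratings. A stricter comparison with ALS fits the mean on the training split only and scores it on the test split. A NumPy sketch of both, on hypothetical toy ratings rather than the real dataset:

```python
import numpy as np

rng = np.random.default_rng(7)
ratings = rng.integers(1, 11, size=1_000).astype(float)  # toy ratings from 1 to 10

# Mean and RMSE on the same ratings: equals the population std (ddof=0)
full_mean_rmse = np.sqrt(np.mean((ratings - ratings.mean()) ** 2))
print(np.isclose(full_mean_rmse, ratings.std()))  # True

# Stricter baseline: fit the mean on an 80% training split, score it on the 20% test split
is_train = rng.random(ratings.size) < 0.8
train_mean = ratings[is_train].mean()
baseline_rmse = np.sqrt(np.mean((ratings[~is_train] - train_mean) ** 2))
print("Train-mean baseline RMSE on test split:", baseline_rmse)
```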

# **User-Based Collaborative Filtering (UBCF)**
I am going to try a different algorithm to see whether it produces better results.

In [7]:
from pyspark.ml.feature import Normalizer
from pyspark.ml.linalg import Vectors
from pyspark.ml.feature import VectorAssembler
from pyspark.sql import functions as F

Checking the number of unique `anime_id` values, so that `spark.sql.pivotMaxValues` (default 10,000) can be set to at least that many. The Spark session above sets it to 20,000.

In [None]:
df['anime_id'].nunique()

Converting user-item interactions to a user-item matrix

In [8]:
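# Unrated pairs are null after the pivot; fill with 0 since VectorAssembler errors on nulls by default
pivot_df = sample_data.groupBy('userId').pivot('itemId').agg(F.avg('rating')).na.fill(0)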
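`groupBy('userId').pivot('itemId')` turns the long (user, item, rating) table into one row per user and one column per anime; repeated pairs are averaged by `F.avg`, and anime a user never rated come out as null. A pure-Python sketch of the same reshaping on toy triples, using `None` for missing cells (the `pivot` helper and the data are hypothetical):

```python
from collections import defaultdict

def pivot(triples):
    """Reshape (user, item, rating) triples into one row per user and one column per item.

    Repeated (user, item) pairs are averaged; missing cells are None.
    """
    items = sorted({item for _, item, _ in triples})
    ratings_by_user = defaultdict(lambda: defaultdict(list))
    for user, item, rating in triples:
        ratings_by_user[user][item].append(rating)

    matrix = {}
    for user, by_item in sorted(ratings_by_user.items()):
        row = []
        for item in items:
            values = by_item.get(item)
            row.append(sum(values) / len(values) if values else None)
        matrix[user] = row
    return items, matrix

triples = [(0, 67, 9), (0, 242, 10), (1, 67, 7), (1, 21, 8), (1, 21, 6)]
items, matrix = pivot(triples)
print(items)   # [21, 67, 242]
print(matrix)  # {0: [None, 9.0, 10.0], 1: [7.0, 7.0, None]}
```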

In [9]:
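# Assemble each user's row of ratings into a single feature vector
assembler = VectorAssembler(inputCols=pivot_df.columns[1:], outputCol="features")
vector_df = assembler.transform(pivot_df).select('userId', 'features')

# L2-normalize each vector so that the dot product of two users is their cosine similarity
normalizer = Normalizer(inputCol="features", outputCol="normFeatures", p=2.0)
vector_df = normalizer.transform(vector_df).select('userId', F.col('normFeatures').alias('features'))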
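After L2 normalization (`Normalizer` with `p=2.0`, its default), the dot product of two user vectors equals their cosine similarity, cos(a, b) = a·b / (‖a‖‖b‖). A NumPy check on two hypothetical rating vectors, with 0 standing in for an unrated anime:

```python
import numpy as np

a = np.array([9.0, 0.0, 10.0, 7.0])
b = np.array([8.0, 6.0, 9.0, 0.0])

cosine = a @ b / (np.linalg.norm(a) * np.linalg.norm(b))

# What L2 normalization does to each vector: divide it by its Euclidean norm
a_unit = a / np.linalg.norm(a)
b_unit = b / np.linalg.norm(b)

print(np.isclose(a_unit @ b_unit, cosine))  # True
```

Filling unrated cells with 0 treats "not rated" like a very low score; mean-centering each user's ratings before computing similarity (Pearson-style) is a common alternative.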

In [10]:
def dot_product(v1, v2):
    # Vector.dot works for both DenseVector and SparseVector; cast to a plain float for Spark
    return float(v1.dot(v2))

In [11]:
from pyspark.sql.types import DoubleType

# Register the dot_product function as a UDF; without a return type it would default to strings
dot_product_udf = F.udf(dot_product, DoubleType())

# Alias the DataFrame so the two sides of the self-join can be told apart
vector_df = vector_df.alias("vector1")

In [12]:
similarity_df = vector_df.crossJoin(vector_df.alias("vector2")) \
    .withColumn("dot_product", dot_product_udf(F.col("vector1.features"), F.col("vector2.features"))) \
    .withColumn("norm_dot_product", F.col("dot_product"))

In [13]:
similarity_df = similarity_df.select(F.col("vector1.userId").alias("userId1"),
                                     F.col("vector2.userId").alias("userId2"), "norm_dot_product")

In [16]:
threshold = 0.5  # hypothetical cutoff, not from the original analysis; tune it for the data
similar_users = similarity_df.filter((F.col('userId1') != F.col('userId2')) & (F.col('norm_dot_product') > threshold)) \
                    .orderBy(F.col('norm_dot_product').desc())
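Once pairwise similarities are available, one common way to finish user-based collaborative filtering is to predict a user's rating for an anime as the similarity-weighted average of the ratings given by their most similar users who rated it. A minimal pure-Python sketch of that step; the `predict_rating` helper, the value of `k`, and the toy data are hypothetical and not part of the notebook:

```python
def predict_rating(target_user, item, similarities, ratings, k=2):
    """Similarity-weighted average of the k most similar users' ratings for `item`.

    similarities: {(user_a, user_b): similarity}; ratings: {user: {item: rating}}.
    Returns None when no similar user has rated the item.
    """
    neighbours = [
        (similarities[(target_user, other)], ratings[other][item])
        for other in ratings
        if other != target_user and item in ratings[other] and (target_user, other) in similarities
    ]
    top = sorted(neighbours, reverse=True)[:k]  # highest similarity first
    weight = sum(sim for sim, _ in top)
    if weight <= 0:
        return None
    return sum(sim * rating for sim, rating in top) / weight

ratings = {
    0: {67: 9, 242: 10},
    1: {67: 8, 242: 9, 21: 7},
    2: {67: 2, 21: 3},
    3: {242: 10, 21: 9},
}
similarities = {(0, 1): 0.9, (0, 2): 0.1, (0, 3): 0.8}
print(predict_rating(0, 21, similarities, ratings))  # (0.9*7 + 0.8*9) / 1.7 ≈ 7.94
```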