# Installation

In [None]:
# Install a headless Java 8 JDK via apt; -qq plus the redirect to /dev/null keep the cell output quiet.
!apt-get install openjdk-8-jdk-headless -qq > /dev/null

In [None]:
# Download Spark
!wget -q https://dlcdn.apache.org/spark/spark-3.4.0/spark-3.4.0-bin-hadoop3.tgz

In [None]:
# Unzip the file
!tar xf spark-3.4.0-bin-hadoop3.tgz

In [None]:
import os

# Environment variables giving the locations of the Java runtime and of the
# extracted Spark distribution.
os.environ.update(
    JAVA_HOME="/usr/lib/jvm/java-8-openjdk-amd64",
    SPARK_HOME="/content/spark-3.4.0-bin-hadoop3",
)

In [None]:
# Install library for finding Spark
!pip install -q findspark
# Import the libary
import findspark
# Initiate findspark
findspark.init()
# Check the location for Spark
findspark.find()

In [None]:
# PySpark entry points
import pyspark as ps
from pyspark.sql import SparkSession

# Build a session against the "local" master, or reuse one that already exists
spark = (
    SparkSession.builder
    .master("local")
    .getOrCreate()
)

# Last expression of the cell: display the session summary
spark

In [None]:
import pyspark as ps
from pyspark.sql import SparkSession
from pyspark.sql import SQLContext
from pyspark.ml.evaluation import RegressionEvaluator
from pyspark.ml.recommendation import ALS
from pyspark.sql.functions import udf, col, when
import numpy as np
from google.colab import drive
from scipy.sparse import load_npz
import pandas as pd

# Obtain the session (named "recom" if it has to be created; an already-running
# session is returned as-is by getOrCreate) and expose its context objects.
spark = (
    SparkSession.builder
    .master("local")
    .appName("recom")
    .getOrCreate()
)
sc = spark.sparkContext
sqlContext = SQLContext(sc)

# Preparing data

In [None]:
# Mount Google Drive so the stored ratings matrix can be read.
drive.mount("/content/drive")
# NOTE: despite its name, `test_csr` is loaded from the *training* matrix file (train_csr.npz).
test_csr = load_npz("/content/drive/MyDrive/recom/train_csr.npz")

In [None]:
# Flatten the sparse matrix into one (user_id, movie_id, rating) row per non-zero entry.
#
# The COO view keeps row / col / data aligned element-for-element. Pairing
# `.nonzero()` with `.data` directly is fragile: `.nonzero()` skips explicitly
# stored zeros while `.data` still contains them, so the lengths can differ and
# the column assignment fails. Applying one mask to all three arrays avoids that.
coo = test_csr.tocoo()
keep = coo.data != 0  # the same entries `.nonzero()` reports
rows, cols = coo.row[keep], coo.col[keep]

df2 = pd.DataFrame({
    'user_id': cols,    # matrix column index is used as the user id
    'movie_id': rows,   # matrix row index is used as the movie id
    'rating': coo.data[keep],
})

# Written with pandas defaults: comma-separated, header row, leading unnamed index column.
df2.to_csv('df_s.csv')

In [None]:
from pyspark.sql.types import StructType, StructField, IntegerType, FloatType

# Schema for the CSV produced by `df2.to_csv('df_s.csv')` in the previous cell.
# Because an explicit schema is supplied, Spark assigns these names by POSITION and
# ignores the header text, so the order here must match the order the file was
# written in: index, user_id, movie_id, rating.
schema = StructType([
    StructField("index_", IntegerType(), True),    # pandas row index written by to_csv
    StructField("user_id", IntegerType(), True),
    StructField("movie_id", IntegerType(), True),
    # Float rather than integer: values such as "4.0" cannot be parsed as integers
    # and would silently become null; integer-formatted ratings still parse as floats.
    StructField("rank", FloatType(), True),
])

# Read the file that was just written (working directory, comma-separated) so the
# notebook is reproducible from a fresh kernel without a pre-existing copy elsewhere.
ratings = spark.read.csv(os.path.abspath('df_s.csv'), sep=',', header=True, schema=schema)

In [None]:
# 80/20 train/validation split. A fixed seed makes the split (and therefore the
# reported RMSE) repeatable across runs.
training, validation = ratings.randomSplit([.8, .2], seed=42)

# Model

In [None]:
# ALS hyper-parameters
rank = 4          # number of latent factors
iterat = 20       # maximum number of ALS iterations
regparam = 0.1    # regularisation strength
errors = []
err = 0

# Estimator configuration. Column names match the schema of `ratings`
# (the rating column is called "rank" there).
als = ALS(
    maxIter=iterat,
    regParam=regparam,
    rank=rank,
    userCol="user_id",
    itemCol="movie_id",
    ratingCol="rank",
    coldStartStrategy="drop",
    seed=42,  # fixed seed so the factor initialisation is repeatable between runs
)

In [None]:
# Fit the ALS estimator on the training split.
model = als.fit(training)

In [None]:
# Apply the fitted model to the validation split; the result carries a "prediction" column
# (it is filtered and evaluated under that name in the following cells).
predictions = model.transform(validation)

In [None]:
from pyspark.sql.functions import isnan

# Keep only rows whose prediction is a real number.
# `isnan` states the intent explicitly; a `!= np.nan` comparison reads like a test that
# can never exclude anything (NaN != NaN in Python/NumPy) and only works through
# Spark-specific NaN comparison rules.
# The ALS estimator is configured with coldStartStrategy="drop", so this is presumably
# just a safeguard — TODO confirm whether it ever removes rows.
new_predictions = predictions.filter(~isnan(col('prediction')))

In [None]:
# RMSE between the true rating column ("rank") and the model output ("prediction").
evaluator = RegressionEvaluator(
    metricName="rmse",
    labelCol="rank",
    predictionCol="prediction",
)

In [None]:
# Score the filtered validation predictions and report the error.
rmse = evaluator.evaluate(new_predictions)
print(f"RMSE={rmse}")