<a href="https://colab.research.google.com/github/ST10117200/GettingStartedWithPySpark/blob/main/Pyspark.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [1]:
%pip install pyspark

Collecting pyspark
  Downloading pyspark-3.4.1.tar.gz (310.8 MB)
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m310.8/310.8 MB[0m [31m4.2 MB/s[0m eta [36m0:00:00[0m
[?25h  Preparing metadata (setup.py) ... [?25l[?25hdone
Building wheels for collected packages: pyspark
  Building wheel for pyspark (setup.py) ... [?25l[?25hdone
  Created wheel for pyspark: filename=pyspark-3.4.1-py2.py3-none-any.whl size=311285397 sha256=eacbea58c4d9fbb90d979a6d872e1245272a8abe28684b4c1dda9d68bb9977e8
  Stored in directory: /root/.cache/pip/wheels/0d/77/a3/ff2f74cc9ab41f8f594dabf0579c2a7c6de920d584206e0834
Successfully built pyspark
Installing collected packages: pyspark
Successfully installed pyspark-3.4.1


In [2]:
import numpy as np
import pandas as pd
import matplotlib.pyplot as plt
import seaborn as sns
import sklearn
import random
import os

from pyspark.context import SparkContext
from pyspark.conf import SparkConf
from pyspark.sql import SparkSession
from pyspark.ml import Pipeline
from pyspark.sql import SQLContext
from pyspark.sql.functions import mean, col, split, regexp_extract, when, lit
from pyspark.ml.feature import StringIndexer, VectorAssembler
from pyspark.ml.evaluation import MulticlassClassificationEvaluator
from pyspark.ml.feature import QuantileDiscretizer

In [3]:
from google.colab import files

uploaded = files.upload()

Saving movie_ratings_df.csv to movie_ratings_df.csv


In [4]:
spark = SparkSession.builder.appName('recommender_system').getOrCreate()

In [5]:
import io

df = spark.read.csv('movie_ratings_df.csv', header = True, inferSchema = True)
df.count()

100000

In [6]:
df.head()

Row(userId=196, title='Kolya (1996)', rating=3)

In [9]:
df.describe()

DataFrame[summary: string, userId: string, title: string, rating: string]

In [11]:
from pyspark.ml.feature import StringIndexer, IndexToString

stringIndexer = StringIndexer(inputCol='title', outputCol='title_new')

# Applying stringindexer object on dataframe movie title column.
model = stringIndexer.fit(df)

# Creating new dataframe with transformed values.
indexed = model.transform(df)

# Validate the numerical title values.
indexed.limit(5).toPandas()

Unnamed: 0,userId,title,rating,title_new
0,196,Kolya (1996),3,287.0
1,63,Kolya (1996),3,287.0
2,226,Kolya (1996),5,287.0
3,154,Kolya (1996),3,287.0
4,306,Kolya (1996),5,287.0


In [12]:
# Split the data into training and test datatset.
train, test = indexed.randomSplit([0.75,0.25])

from pyspark.ml.recommendation import ALS

# Training the recommender model using train datatset.
rec=ALS( maxIter=10
        ,regParam=0.01
        ,userCol='userId'
        ,itemCol='title_new'
        ,ratingCol='rating'
        ,nonnegative=True
        ,coldStartStrategy="drop")

# Fit the model on train set.
rec_model=rec.fit(train)

# Making predictions on test set.
predicted_ratings=rec_model.transform(test)
predicted_ratings.limit(5).toPandas()

Unnamed: 0,userId,title,rating,title_new,prediction
0,148,2001: A Space Odyssey (1968),5,59.0,2.084151
1,148,Amadeus (1984),1,50.0,3.975547
2,148,Beauty and the Beast (1991),4,114.0,3.912307
3,148,Being There (1979),5,290.0,3.703876
4,148,Blade Runner (1982),5,52.0,4.478418


In [13]:
# Importing Regression Evaluator to measure RMSE.
from pyspark.ml.evaluation import RegressionEvaluator

# Create Regressor evaluator object for measuring accuracy.
evaluator=RegressionEvaluator(metricName='rmse',predictionCol='prediction',labelCol='rating')

# Apply the RE on predictions dataframe to calculate RMSE.
rmse=evaluator.evaluate(predicted_ratings)

# Print RMSE error.
print(rmse)

1.0187834259697948


In [14]:
# First we need to create dataset of all distinct movies.
unique_movies=indexed.select('title_new').distinct()

# Create function to recommend top 'n' movies to any particular user.
def top_movies(user_id,n):
    """
    This function returns the top 'n' movies that user has not seen yet but might like

    """
    # Assigning alias name 'a' to unique movies df.
    a = unique_movies.alias('a')

    # Creating another dataframe which contains already watched movie by active user.
    watched_movies=indexed.filter(indexed['userId'] == user_id).select('title_new')

    # Assigning alias name 'b' to watched movies df
    b=watched_movies.alias('b')

    # Joining both tables on left join.
    total_movies = a.join(b, a.title_new == b.title_new,how='left')

    # Selecting movies which active user is yet to rate or watch.
    remaining_movies=total_movies.where(col("b.title_new").isNull()).select(a.title_new).distinct()


    # Adding new column of user_Id of active useer to remaining movies df.
    remaining_movies=remaining_movies.withColumn("userId",lit(int(user_id)))


    # Making recommendations using ALS recommender model and selecting only top 'n' movies.
    recommendations=rec_model.transform(remaining_movies).orderBy('prediction',ascending=False).limit(n)


    # Adding columns of movie titles in recommendations.
    movie_title = IndexToString(inputCol="title_new", outputCol="title",labels=model.labels)
    final_recommendations=movie_title.transform(recommendations)

    # Return the recommendations to active user.
    return final_recommendations.show(n,False)

In [15]:
# Test: recommend 5 movies for user of id=60.
top_movies(60,5)

+---------+------+----------+----------------------------------------------------+
|title_new|userId|prediction|title                                               |
+---------+------+----------+----------------------------------------------------+
|1103.0   |60    |5.919969  |Stalker (1979)                                      |
|1394.0   |60    |5.5498056 |So Dear to My Heart (1949)                          |
|1189.0   |60    |5.41212   |Grace of My Heart (1996)                            |
|1347.0   |60    |5.2975926 |Angel Baby (1995)                                   |
|846.0    |60    |5.2341633 |Shanghai Triad (Yao a yao yao dao waipo qiao) (1995)|
+---------+------+----------+----------------------------------------------------+

