In [1]:
import pyspark

In [2]:
from pyspark.sql import SparkSession
from pyspark import SparkConf, SparkContext
from pyspark.sql import SQLContext

# Build (or reuse) the session first. Creating a bare SparkContext() up front
# has two problems: it raises "Cannot run multiple SparkContexts at once" when
# this cell is re-run or when the environment already provides a context, and
# the builder's appName is not applied to a context that already exists.
spark = SparkSession.builder.appName('Recommend_Function').getOrCreate()

# Keep the legacy handles available under their original names, both backed
# by the session's own context.
sc = spark.sparkContext
sqlContext = SQLContext(sc)

In [24]:
# Read the CSV data (header row present, column types inferred) and keep only
# the columns used by the rest of the notebook.
movies = (
    spark.read
    .option('header', True)
    .option('inferSchema', True)
    .csv('s3://msbx5420-2020/mount massive/')
    .select('title', 'titleType', 'startYear', 'genres', 'averageRating', 'numVotes')
)
movies.show(n=10)

+--------------------+---------+---------+--------------------+-------------+--------+
|               title|titleType|startYear|              genres|averageRating|numVotes|
+--------------------+---------+---------+--------------------+-------------+--------+
|          Carmencita|    short|   1894.0|   Documentary,Short|          5.6|    1550|
|      Pauvre Pierrot|    short|   1892.0|Animation,Comedy,...|          6.5|    1207|
|    Blacksmith Scene|    short|   1893.0|        Comedy,Short|          6.1|    1934|
|Corbett and Court...|    short|   1894.0|         Short,Sport|          5.5|     615|
|Edison Kinetoscop...|    short|   1894.0|   Documentary,Short|          5.4|    1667|
|La sortie de l'us...|    short|   1895.0|   Documentary,Short|          6.9|    5545|
|L'arrivée d'un tr...|    short|   1896.0|Action,Documentar...|          7.4|    9435|
|Neuville-sur-Saôn...|    short|   1895.0|   Documentary,Short|          5.7|    1447|
|   L'arroseur arrosé|    short|   1895.0| 

In [25]:
import pyspark.sql.functions as F
from pyspark.sql.types import *
from pyspark.sql.functions import udf, col
from pyspark.sql.functions import *

In [26]:
# Average of `averageRating` over every row, pulled back to the driver as a
# plain Python number.
# NOTE(review): the name `mean` shadows the `mean` function brought in by the
# wildcard `pyspark.sql.functions` import; it is kept because later cells read it.
mean = movies.agg({"averageRating": "avg"}).first()[0]
mean

6.386923529027673

In [27]:
# Attach the constant inputs of the weighted-rating formula to every row and
# make numVotes a float so all operands are numeric columns.
movies = (
    movies
    # Global mean rating, repeated on each row.
    .withColumn("AverageTotalRating", lit(mean).cast(FloatType()))
    # Placeholder; replaced once the weighted rating is computed.
    .withColumn("WeightedRating", lit(0).cast(FloatType()))
    # Constant vote count used as the `m` term of the formula.
    .withColumn("MinimumVotes", lit(100).cast(IntegerType()))
    .withColumn("numVotes", col("numVotes").cast(FloatType()))
)

In [28]:
# Preview the first ten rows, including the newly added helper columns.
movies.show(n=10)

+--------------------+---------+---------+--------------------+-------------+--------+------------------+--------------+------------+
|               title|titleType|startYear|              genres|averageRating|numVotes|AverageTotalRating|WeightedRating|MinimumVotes|
+--------------------+---------+---------+--------------------+-------------+--------+------------------+--------------+------------+
|          Carmencita|    short|   1894.0|   Documentary,Short|          5.6|  1550.0|         6.3869233|           0.0|         100|
|      Pauvre Pierrot|    short|   1892.0|Animation,Comedy,...|          6.5|  1207.0|         6.3869233|           0.0|         100|
|    Blacksmith Scene|    short|   1893.0|        Comedy,Short|          6.1|  1934.0|         6.3869233|           0.0|         100|
|Corbett and Court...|    short|   1894.0|         Short,Sport|          5.5|   615.0|         6.3869233|           0.0|         100|
|Edison Kinetoscop...|    short|   1894.0|   Documentary,Short

In [29]:
# Column objects for every column currently in `movies`.
# Bracket lookup is used instead of eval('movies.' + name): eval breaks on
# column names that are not valid Python identifiers and silently returns a
# DataFrame method/attribute when a column name collides with one.
all_original_cols = [movies[name] for name in movies.columns]
all_original_cols

[Column<b'title'>,
 Column<b'titleType'>,
 Column<b'startYear'>,
 Column<b'genres'>,
 Column<b'averageRating'>,
 Column<b'numVotes'>,
 Column<b'AverageTotalRating'>,
 Column<b'WeightedRating'>,
 Column<b'MinimumVotes'>]

In [30]:
def weighted(v, m, C, R):
    """Blend an item's rating with a baseline rating, weighted by vote counts.

    Computes ``v/(v+m) * R + m/(v+m) * C``: the more votes ``v`` an item has
    relative to ``m``, the closer the result is to its own rating ``R``;
    with few votes the result is pulled toward the baseline ``C``.

    Args:
        v: Number of votes for the item.
        m: Vote-count weight given to the baseline.
        C: Baseline rating.
        R: The item's own rating.

    Returns:
        The blended rating as a float, or ``None`` when any input is ``None``
        or when ``v + m`` is zero (the result is undefined in both cases).
    """
    # When used as a Spark UDF, SQL NULLs arrive as None; return None (NULL)
    # instead of raising TypeError and failing the whole job.
    if v is None or m is None or C is None or R is None:
        return None

    total = v + m
    # Avoid ZeroDivisionError when there are no votes and no baseline weight.
    if total == 0:
        return None

    return (v / total * R) + (m / total * C)

In [31]:
# Wrap the Python function as a Spark UDF whose result column is FloatType.
weighted_udf = udf(f=weighted, returnType=FloatType())

In [32]:
# Column expression applying the UDF row by row; arguments follow the
# (v, m, C, R) parameter order of `weighted`.
WeightedRating = weighted_udf(
    movies["numVotes"],            # v
    movies["MinimumVotes"],        # m
    movies["AverageTotalRating"],  # C
    movies["averageRating"],       # R
)
WeightedRating

Column<b'weighted(numVotes, MinimumVotes, AverageTotalRating, averageRating)'>

In [33]:
# Overwrite the placeholder WeightedRating column with the computed values.
movies = movies.withColumn(
    colName="WeightedRating",
    col=WeightedRating.cast("float"),
)

In [34]:
# Drop the intermediate helper columns; keep the descriptive fields plus both
# the raw and the weighted rating, then preview ten rows.
movies = movies.select(
    ['title', 'titleType', 'startYear', 'genres', 'averageRating', 'WeightedRating']
)
movies.show(n=10)

+--------------------+---------+---------+--------------------+-------------+--------------+
|               title|titleType|startYear|              genres|averageRating|WeightedRating|
+--------------------+---------+---------+--------------------+-------------+--------------+
|          Carmencita|    short|   1894.0|   Documentary,Short|          5.6|      5.647692|
|      Pauvre Pierrot|    short|   1892.0|Animation,Comedy,...|          6.5|     6.4913483|
|    Blacksmith Scene|    short|   1893.0|        Comedy,Short|          6.1|      6.114106|
|Corbett and Court...|    short|   1894.0|         Short,Sport|          5.5|     5.6240454|
|Edison Kinetoscop...|    short|   1894.0|   Documentary,Short|          5.4|      5.455853|
|La sortie de l'us...|    short|   1895.0|   Documentary,Short|          6.9|      6.890911|
|L'arrivée d'un tr...|    short|   1896.0|Action,Documentar...|          7.4|      7.389375|
|Neuville-sur-Saôn...|    short|   1895.0|   Documentary,Short|       

In [35]:
# Expose the DataFrame to Spark SQL under the view name "movies_sql".
movies.createOrReplaceTempView(name="movies_sql")

In [36]:
def recommend(genre='Comedy', title_type='movie', min_year=2000, top_n=5,
              view='movies_sql'):
    """Return the highest weighted-rating titles matching the given filters.

    The defaults reproduce the original fixed query: titles whose `genres`
    contains 'Comedy', with titleType 'movie' and startYear > 2000, ordered by
    WeightedRating descending, limited to 5 rows.

    Args:
        genre: Substring that must appear in the `genres` column
            (case-sensitive).
        title_type: Required value of the `titleType` column.
        min_year: Exclusive lower bound on `startYear`.
        top_n: Maximum number of rows to return.
        view: Name of the registered table/temp view to query.

    Returns:
        A DataFrame with columns title, titleType, startYear, genres and
        WeightedRating.
    """
    # Filters are built with the DataFrame API rather than by formatting the
    # values into a SQL string, so caller-supplied values cannot alter the query.
    matches = spark.table(view).where(
        col('genres').contains(genre)
        & (col('titleType') == title_type)
        & (col('startYear') > min_year)
    )
    return (
        matches
        .select('title', 'titleType', 'startYear', 'genres', 'WeightedRating')
        .orderBy(col('WeightedRating').desc())
        .limit(top_n)
    )


# Same result set as the original hard-coded query.
recommendations_sql = recommend()


In [37]:
# Display the recommendations (show()'s defaults written out explicitly).
recommendations_sql.show(n=20, truncate=True)

+--------------------+---------+---------+--------------------+--------------+
|               title|titleType|startYear|              genres|WeightedRating|
+--------------------+---------+---------+--------------------+--------------+
|CM101MMXI Fundame...|    movie|   2013.0|  Comedy,Documentary|      9.193336|
|               Shibu|    movie|   2019.0|              Comedy|      8.924001|
|          Anbe Sivam|    movie|   2003.0|Adventure,Comedy,...|      8.782449|
|The Weight of Cha...|    movie|   2014.0|Comedy,Documentar...|      8.780598|
|Nuvvu Naaku Nachchav|    movie|   2001.0|Comedy,Family,Mus...|      8.628696|
+--------------------+---------+---------+--------------------+--------------+

