### Item Rating Based Recommendation 

In [2]:
from pyspark.sql import SparkSession 
from pyspark.sql.types import IntegerType , StringType ,StructType  , StructField , FloatType , TimestampType , LongType
from pyspark.sql import functions as func

In [3]:
spark = SparkSession.builder.appName('Friends_Analysis').getOrCreate()

In [5]:
rating_schema = StructType([\
    StructField('user_id' , IntegerType() , True),
    StructField('movie_id' , IntegerType() , True),  
    StructField('rating' , IntegerType() , True)
])

ratings = spark.read.option('sep' , '\t').schema(rating_schema).csv('u.data')

movieName_schema = StructType([\
    StructField('Id' , IntegerType() , True),\
    StructField('Name' , StringType(), True)    
])

movieName = spark.read.option('sep' , '|').schema(movieName_schema).csv('u.item')

In [6]:

# doing a self join with ratings to get every pair of movie and their rating 
complete_movie_rating_data = ratings.alias('ratings1')\
                            .join( ratings.alias('ratings2') , ( (func.col('ratings1.user_id') == func.col('ratings2.user_id')) & ( 
                                func.col('ratings1.movie_id') < func.col('ratings2.movie_id') )  )  )\
                             .select( func.col('ratings1.movie_id').alias('movie1'),\
                                     func.col('ratings2.movie_id').alias('movie2'),\
                                     func.col('ratings1.rating').alias('rating1'),\
                                     func.col('ratings2.rating').alias('rating2')
                                    )   



In [7]:
## adding xx ,xy , yy data for recomendation 
pair_data = complete_movie_rating_data\
                .withColumn('xx' , func.col('rating1') * func.col('rating1') ) \
                .withColumn('yy' , func.col('rating2') * func.col('rating2') ) \
                .withColumn('xy' , func.col('rating1') * func.col('rating2') ) 

pair_data.show(10)

+------+------+-------+-------+---+---+---+
|movie1|movie2|rating1|rating2| xx| yy| xy|
+------+------+-------+-------+---+---+---+
|   242|   269|      3|      3|  9|  9|  9|
|   242|   845|      3|      4|  9| 16| 12|
|   242|  1022|      3|      4|  9| 16| 12|
|   242|   762|      3|      3|  9|  9|  9|
|   242|   411|      3|      4|  9| 16| 12|
|   242|  1007|      3|      4|  9| 16| 12|
|   242|  1241|      3|      3|  9|  9|  9|
|   242|   285|      3|      5|  9| 25| 15|
|   242|   382|      3|      4|  9| 16| 12|
|   242|   287|      3|      3|  9|  9|  9|
+------+------+-------+-------+---+---+---+
only showing top 10 rows



In [9]:
# calculating the numerator and denomerator to calculate the score for every pair of movie 

final_data = pair_data\
                .groupBy( func.col('movie1') , func.col( 'movie2') )\
                .agg(  func.sum( func.col('xy')).alias('Denom')  ,\
                    func.sqrt(  func.sum( func.col('xx')) * func.sum( func.col('yy'))  ).alias('numerator'),\
                    func.count( func.col('xy')).alias('numPairs') )\
                # .select( 'movie1' , 'movie2' , '')

final_data.show(10)

+------+------+-----+------------------+--------+
|movie1|movie2|Denom|         numerator|numPairs|
+------+------+-----+------------------+--------+
|    51|   924|  197|208.13457185196313|      15|
|   451|   529|  357| 410.3425398371463|      30|
|    86|   318| 1669|1745.2701796570066|      95|
|    40|   167|  241| 253.9921258622007|      23|
|   274|  1211|   78|  79.5989949685296|       7|
|  1042|  1067|   20|              20.0|       2|
|   118|   946|  402| 437.8858298689283|      40|
|   234|   461|  756| 794.0976010541777|      54|
|    88|   523| 1021|  1066.07692030172|      74|
|   796|  1036|   91| 94.91575211733824|       8|
+------+------+-----+------------------+--------+
only showing top 10 rows



In [10]:
# calculating the score using previously calculated demom and numerator also covering edge cases devide by zero 
result = final_data\
            .withColumn( 'score' , func.when( func.col('Denom') !=0 , func.round( func.col('numerator') / func.col('Denom') , 2 ) ).otherwise(0) )\
            .select('movie1' , 'movie2' , 'score' , 'numPairs')

result.show(10)


+------+------+-----+--------+
|movie1|movie2|score|numPairs|
+------+------+-----+--------+
|    51|   924| 1.06|      15|
|   451|   529| 1.15|      30|
|    86|   318| 1.05|      95|
|    40|   167| 1.05|      23|
|   274|  1211| 1.02|       7|
|  1042|  1067|  1.0|       2|
|   118|   946| 1.09|      40|
|   234|   461| 1.05|      54|
|    88|   523| 1.04|      74|
|   796|  1036| 1.04|       8|
+------+------+-----+--------+
only showing top 10 rows



In [11]:
## setting up threshold value for better performance 
pairThreshold = 50.0
scoreThreshold = 0.97 

In [12]:
## using a movieId to getting movie recomendation as per movie_id = 50  
findMovieId = 50

## adding condn. to get best cases based on threshold value
getActualMovieIds = result\
                    .filter( ( (func.col('movie1') == findMovieId)  |   (func.col('movie2') == findMovieId )) \
                           & (func.col('score') >= scoreThreshold ) & (func.col('numPairs') >= pairThreshold  ))

actual_result = getActualMovieIds.sort(func.col('score').desc()).take(10)

In [13]:
# to get the movie title as per the movie_id 
def getMovieName( movieName , actualMovieId):
    return movieName.filter( func.col('Id') == actualMovieId).select('Name').collect()[0][0]

In [14]:
## getting the movie_id 50 related recommendations 

for result in actual_result :
    actualMovieId = result.movie1
    if actualMovieId == findMovieId:
        actualMovieId = result.movie2
    print(f' Movie_id = { getMovieName(movieName,actualMovieId)} , score = {result.score}  , numPairs = {result.numPairs} ' ) 

 Movie_id = Crash (1996) , score = 1.16  , numPairs = 75 
 Movie_id = Jungle2Jungle (1997) , score = 1.13  , numPairs = 81 
 Movie_id = Cook the Thief His Wife & Her Lover, The (1989) , score = 1.13  , numPairs = 70 
 Movie_id = Event Horizon (1997) , score = 1.13  , numPairs = 89 
 Movie_id = Striptease (1996) , score = 1.13  , numPairs = 53 
 Movie_id = Beautician and the Beast, The (1997) , score = 1.12  , numPairs = 53 
 Movie_id = Cable Guy, The (1996) , score = 1.12  , numPairs = 95 
 Movie_id = Star Trek V: The Final Frontier (1989) , score = 1.12  , numPairs = 59 
 Movie_id = Bean (1997) , score = 1.11  , numPairs = 53 
 Movie_id = Serial Mom (1994) , score = 1.11  , numPairs = 50 
