#INSTALLATION

In [94]:
! apt-get install openjdk-8-jdk-headless -qq > /dev/null
# ! wget -q https://archive.apache.org/dist/spark/spark-3.5.4/spark-3.5.4-bin-hadoop3.tgz
! cp drive/MyDrive/MMDS/spark-3.5.4-bin-hadoop3.tgz .
! tar xf spark-3.5.4-bin-hadoop3.tgz
! pip install -q findspark
! pip install memory_profiler



In [95]:
! du -sh spark-3.5.4-bin-hadoop3.tgz

383M	spark-3.5.4-bin-hadoop3.tgz


In [96]:
import os
os.environ["JAVA_HOME"] = "/usr/lib/jvm/java-8-openjdk-amd64"
os.environ["SPARK_HOME"] = "/content/spark-3.5.4-bin-hadoop3"

In [97]:
import findspark
findspark.init()

#LOAD DATASET

In [98]:
from pyspark.sql.functions import *
from pyspark.sql.types import *
from pyspark.sql import *

In [99]:
spark = SparkSession.builder.appName("Task1").getOrCreate()

In [100]:
df = spark.read.csv('/content/drive/MyDrive/MMDS/ratings2k.csv',header=True,inferSchema=True)

In [101]:
df.show(1000,truncate=False)

+-----+----+----+------+
|index|user|item|rating|
+-----+----+----+------+
|0    |73  |52  |4.0   |
|1    |36  |239 |3.0   |
|2    |72  |26  |1.0   |
|3    |59  |430 |2.5   |
|4    |72  |284 |3.0   |
|5    |36  |277 |3.0   |
|6    |72  |426 |4.0   |
|7    |18  |163 |3.0   |
|8    |67  |93  |4.0   |
|9    |59  |22  |3.5   |
|10   |8   |174 |2.0   |
|11   |5   |149 |2.0   |
|12   |26  |322 |3.5   |
|13   |8   |416 |4.0   |
|14   |31  |25  |2.0   |
|15   |41  |83  |2.0   |
|16   |25  |321 |3.0   |
|17   |47  |193 |3.0   |
|18   |9   |455 |4.0   |
|19   |43  |216 |3.5   |
|20   |5   |170 |3.0   |
|21   |20  |176 |5.0   |
|22   |12  |368 |2.0   |
|23   |13  |128 |5.0   |
|24   |12  |428 |4.5   |
|25   |72  |49  |3.0   |
|26   |23  |465 |5.0   |
|27   |9   |319 |5.0   |
|28   |8   |390 |4.0   |
|29   |59  |176 |4.0   |
|30   |72  |184 |4.0   |
|31   |36  |328 |3.0   |
|32   |1   |167 |3.5   |
|33   |64  |144 |5.0   |
|34   |25  |81  |4.0   |
|35   |25  |277 |3.0   |
|36   |40  |288 |4.5   |


In [102]:
df.printSchema()

root
 |-- index: integer (nullable = true)
 |-- user: integer (nullable = true)
 |-- item: integer (nullable = true)
 |-- rating: double (nullable = true)



In [117]:
class CollaborativeFiltering:
    '''
        Algorithm to recommend items for individual users using PySpark basing on Item-item Collaborative Filtering.
    '''

    def __init__(self,N:list[int],dataset:DataFrame)->None:
        self.N = N

        self.dataset = dataset.select(
            col("user"),
            col("item"),
            col("rating")
        ).dropDuplicates(['user','item']).dropna()

        self.utility_matrix_item = self.dataset.groupBy('item')\
            .pivot('user')\
            .agg(first('rating'))\
            .fillna(0.0)
        self.item_profiles = self.utility_matrix_item\
            .withColumn('ratings',array(*[col for col in self.utility_matrix_item.columns if col != 'item']))\
            .select('item','ratings')



        self.utility_matrix_user = self.dataset.groupBy('user')\
                                .pivot('item')\
                                .agg(first('rating'))\
                                .fillna(0.0)
        self.user_profiles = self.utility_matrix_user\
            .withColumn('ratings',array(*[col for col in self.utility_matrix_user.columns if col != 'user']))\
            .select('user','ratings')

        # self.spark = self.dataset.sparkSession()
        self.item_similarities = None


    def pearson_similarity(self,min_common_users:int=3):
        '''
            Calculate Pearson similarity between items based on user's rating
            Args: Minimum number of common users
        '''
        #Create pyspark dataframe to self join implement
        df = self.utility_matrix_item.withColumn('ratings',array(*[col for col in self.utility_matrix_item.columns if col != 'item']))
        df1 = df.alias("df1")
        df2 =  df.alias("df2")

        df = df1.crossJoin(df2).where(col('df1.item')!=col('df2.item'))
        # df.show(truncate=False)

        '''Calculate Pearson Correlation Coefficient Similarity'''

        def cal_sim(ratings_r1,ratings_r2):
            sum_x, sum_y, sum_xy = 0,0,0
            sum_x2, sum_y2 = 0,0
            n = len(ratings_r1)
            for i in range(len(ratings_r1)):
                sum_x += ratings_r1[i]
                sum_y += ratings_r2[i]
                sum_xy += ratings_r1[i] + ratings_r2[i]
                sum_x2 += ratings_r1[i]**2
                sum_y2 += ratings_r2[i]**2
            numerator = n * sum_xy - sum_x*sum_y
            denominator = ((n*sum_x2-sum_x**2)*(n*sum_y2-sum_y**2))**(1/2)
            return numerator/denominator if denominator != 0 else 0.0

        cal_sim_udf = udf(cal_sim,FloatType())

        self.item_similarities = df.withColumn('similarity', cal_sim_udf(col('df1.ratings'),col('df2.ratings')))
        # df.select('df1.item','df1.ratings','df2.item','df2.ratings').show(truncate=False)
        self.item_similarities = self.item_similarities.where(col('similarity')>0)


    def predict(self,user:int,n:int)->None:
        '''
            Predict items and recommend those ones
            Input:
                integer number as user id
                &&
                expected number of recommended items
            Return: Pyspark Dataframe as consisting of recommended items sorted in the descening order of scores
        '''
        self.pearson_similarity()


        def dot_product_avg(similarities, ratings):
          if not similarities or not ratings or len(similarities) != len(ratings):
              return None
          n = len(similarities)
          sum_sim = 0
          dot_product=0
          for i in range(n):
            dot_product += similarities[i] * ratings[i]
            sum_sim += similarities[i] if ratings[i] != 0 else 0

          return  dot_product / sum_sim if sum_sim != 0 else 0
        predict_rating_udf = udf(dot_product_avg,FloatType())

        user_id = str(user)
        profile = self.item_similarities.select('df1.item',f'df1.{user_id}','df2.item',f'df2.{user_id}','similarity')
        # profile.show(2000,truncate=False)
        result_df = profile\
        .groupBy('df1.item',f'df1.{user_id}')\
        .agg(collect_list('df2.item').alias('items'),collect_list('similarity').alias('similarities'),collect_list(f'df2.{user_id}').alias('ratings'))\
        .where(col(f'df1.{user_id}')==0.0)\
        .select('item',user_id,'similarities','ratings')

        result_df = result_df\
        .withColumn('predict', predict_rating_udf(col('similarities'),col('ratings')))\
        .orderBy(col('predict').desc())\
        .select('item','predict')\
        .limit(n)

        result_df.show(1000,truncate=False)
        return result_df

    def draw_bar(self):
        pass


In [128]:
Filter = CollaborativeFiltering(10,df)
result_df = Filter.predict(75,5000)

+----+---------+
|item|predict  |
+----+---------+
|14  |3.5489235|
|111 |3.5489235|
|242 |3.5489235|
|313 |3.5489235|
|330 |3.5489235|
|418 |3.5489235|
|466 |3.5489235|
|189 |3.5472455|
|235 |3.5472455|
|261 |3.5472455|
|38  |3.5455806|
|64  |3.5455806|
|72  |3.5455806|
|101 |3.5455806|
|105 |3.5455806|
|106 |3.5455806|
|108 |3.5455806|
|134 |3.5455806|
|137 |3.5455806|
|143 |3.5455806|
|149 |3.5455806|
|166 |3.5455806|
|220 |3.5455806|
|238 |3.5455806|
|252 |3.5455806|
|289 |3.5455806|
|296 |3.5455806|
|304 |3.5455806|
|305 |3.5455806|
|315 |3.5455806|
|334 |3.5455806|
|358 |3.5455806|
|205 |3.5439289|
|227 |3.5439289|
|292 |3.5439289|
|6   |3.5422902|
|19  |3.5422902|
|23  |3.5422902|
|27  |3.5422902|
|32  |3.5422902|
|43  |3.5422902|
|48  |3.5422902|
|49  |3.5422902|
|51  |3.5422902|
|70  |3.5422902|
|73  |3.5422902|
|85  |3.5422902|
|87  |3.5422902|
|92  |3.5422902|
|104 |3.5422902|
|109 |3.5422902|
|113 |3.5422902|
|120 |3.5422902|
|121 |3.5422902|
|126 |3.5422902|
|142 |3.542290

In [129]:
Filter.utility_matrix_item.select('item','10').where(col('10')==0).show(1000,truncate=False)

+----+---+
|item|10 |
+----+---+
|148 |0.0|
|463 |0.0|
|392 |0.0|
|243 |0.0|
|31  |0.0|
|85  |0.0|
|451 |0.0|
|137 |0.0|
|458 |0.0|
|65  |0.0|
|255 |0.0|
|53  |0.0|
|133 |0.0|
|296 |0.0|
|322 |0.0|
|78  |0.0|
|362 |0.0|
|375 |0.0|
|155 |0.0|
|108 |0.0|
|211 |0.0|
|193 |0.0|
|34  |0.0|
|368 |0.0|
|115 |0.0|
|101 |0.0|
|126 |0.0|
|385 |0.0|
|183 |0.0|
|436 |0.0|
|28  |0.0|
|210 |0.0|
|412 |0.0|
|406 |0.0|
|300 |0.0|
|76  |0.0|
|26  |0.0|
|332 |0.0|
|27  |0.0|
|384 |0.0|
|159 |0.0|
|44  |0.0|
|271 |0.0|
|192 |0.0|
|253 |0.0|
|236 |0.0|
|103 |0.0|
|460 |0.0|
|329 |0.0|
|12  |0.0|
|350 |0.0|
|336 |0.0|
|223 |0.0|
|417 |0.0|
|388 |0.0|
|91  |0.0|
|409 |0.0|
|285 |0.0|
|222 |0.0|
|22  |0.0|
|372 |0.0|
|128 |0.0|
|330 |0.0|
|209 |0.0|
|319 |0.0|
|230 |0.0|
|122 |0.0|
|93  |0.0|
|190 |0.0|
|225 |0.0|
|232 |0.0|
|157 |0.0|
|233 |0.0|
|346 |0.0|
|246 |0.0|
|367 |0.0|
|360 |0.0|
|224 |0.0|
|111 |0.0|
|47  |0.0|
|140 |0.0|
|177 |0.0|
|416 |0.0|
|444 |0.0|
|152 |0.0|
|132 |0.0|
|355 |0.0|
|353 |0.0|

In [130]:
Filter.dataset.count() - Filter.dataset.where(col('user')==75).count()

2305

In [131]:
result_df.count()

407