In [1]:
from pyspark.sql import SparkSession

spark = SparkSession.builder.appName('MovieSimilarity').master('local[*]').getOrCreate()

In [2]:
from pyspark.sql.types import StructType, StructField, StringType, IntegerType

# user id | item id | rating | timestamp. 
schRating = StructType([StructField('user_id', IntegerType(), True),\
                        StructField('movie_id', IntegerType(), True),\
                        StructField('rating', IntegerType(), True),\
                        StructField('timestamp', StringType(), True)])

ratings = spark.read.schema(schema=schRating)\
                    .option('sep', '\t')\
                    .csv('file:////Users/giovanna/Documents/GitHub/pyspark/SparkCourse/ml-100k/u.data')\
                    .select('user_id', 'movie_id', 'rating')

In [3]:
schName = StructType([StructField('movie_id', IntegerType(), True),\
                        StructField('name', StringType(), True)])
names = spark.read.schema(schema=schName)\
                    .option('sep', '|')\
                    .option('charset', 'ISO-8859-1')\
                    .csv('file:////Users/giovanna/Documents/GitHub/pyspark/SparkCourse/ml-100k/u.item')

In [4]:
ratings.show(1)

+-------+--------+------+
|user_id|movie_id|rating|
+-------+--------+------+
|    196|     242|     3|
+-------+--------+------+
only showing top 1 row



In [5]:
from pyspark.sql import functions as F

In [6]:
# self-join, para cada usuário identifica par a par, removendo os filmes iguais.
movie_2a2 = ratings.alias('ratings1')\
                    .join(ratings.alias('ratings2'), \
                            (F.col('ratings1.user_id') == F.col('ratings2.user_id')) \
                                & (F.col('ratings1.movie_id') < F.col('ratings2.movie_id')))\
                    .select(F.col('ratings1.movie_id').alias('movie1'),\
                             F.col('ratings2.movie_id').alias('movie2'),\
                             F.col('ratings1.rating').alias('ratings1'),\
                             F.col('ratings2.rating').alias('ratings2'))
movie_2a2.show(1)

+------+------+--------+--------+
|movie1|movie2|ratings1|ratings2|
+------+------+--------+--------+
|   242|   269|       3|       3|
+------+------+--------+--------+
only showing top 1 row



In [7]:
# filtrar relações com baixo valor de recomendação
movie_2a2 = movie_2a2.filter(F.col('ratings2') < 1)

In [8]:
movie_2a2 = movie_2a2.withColumn('xx', F.col('ratings1') * F.col('ratings1'))\
                        .withColumn('yy', F.col('ratings2') * F.col('ratings2'))\
                        .withColumn('xy', F.col('ratings1') * F.col('ratings2'))

$\Large denominator = \frac{\sum xy}{\sqrt{\sum xx} * \sqrt{\sum yy}}$

In [9]:
movie_2a2_group = movie_2a2.groupby('movie1', 'movie2')\
                            .agg(F.sum(F.col('xy')).alias('numerator'),\
                                 (F.sqrt(F.sum(F.col('xx'))) * F.sqrt(F.sum(F.col('yy')))).alias('denominator'),\
                                 F.count(F.col('xy')).alias('total_revision'))
movie_2a2_group.show(1)

+------+------+---------+-----------+--------------+
|movie1|movie2|numerator|denominator|total_revision|
+------+------+---------+-----------+--------------+
+------+------+---------+-----------+--------------+



In [10]:
movie_scores = movie_2a2_group.withColumn('score', \
                                            F.when(F.col('denominator') != 0, \
                                                    F.col('numerator') / F.col('denominator'))\
                                              .otherwise(0))\
                                .drop('numerator', 'denominator')
movie_scores.show(1)

+------+------+--------------+-----+
|movie1|movie2|total_revision|score|
+------+------+--------------+-----+
+------+------+--------------+-----+



In [11]:
names.show(2)

+--------+----------------+
|movie_id|            name|
+--------+----------------+
|       1|Toy Story (1995)|
|       2|GoldenEye (1995)|
+--------+----------------+
only showing top 2 rows



In [12]:
minScore = 0.90
minRev = 50

In [13]:
movie = 50
names.filter(F.col('movie_id')==movie).collect()[0]['name']

'Star Wars (1977)'

In [14]:
recommendations = movie_scores.filter((F.col('movie1') == movie) \
                                        & (F.col('score') > minScore)\
                                        & (F.col('total_revision') > minRev))\
                                .orderBy('score', ascending=False)\
                                .take(10)

In [15]:
for rec_mov in recommendations:
    print (f"Filme {names.filter(F.col('movie_id') == rec_mov['movie2']).collect()[0]['name']} score {rec_mov['score']}")