In [7]:
import findspark

findspark.init()

In [8]:
from pyspark.sql.types import *

In [9]:
from pyspark.sql import SparkSession
spark = SparkSession.builder.master("local").appName("recommendation_system").getOrCreate()

## Movies dataset

In [10]:
path='movies.csv'

In [11]:
df=spark.read.csv(path,header=True)

In [12]:
df.show

<bound method DataFrame.show of DataFrame[movieId: string, title: string, genres: string]>

We need to define a schema so that each column gets the desired data type.

To do this, we create a StructType and add StructFields to it with the target data types. We then pass this schema when reading the file from the local machine, together with header=True. With header=True, the first line of the CSV file is used as the column names; it is False by default, in which case the columns get default names such as _c0 and _c1.

In [13]:
schema=(StructType().add("movieId",IntegerType()).add("title",StringType()).add("genres",StringType()))

In [14]:
moviesdf=spark.read.csv(path,schema=schema,header=True)

In [15]:
moviesdf.show

<bound method DataFrame.show of DataFrame[movieId: int, title: string, genres: string]>

In [16]:
moviesdf.show()

+-------+--------------------+--------------------+
|movieId|               title|              genres|
+-------+--------------------+--------------------+
|      1|    Toy Story (1995)|Adventure|Animati...|
|      2|      Jumanji (1995)|Adventure|Childre...|
|      3|Grumpier Old Men ...|      Comedy|Romance|
|      4|Waiting to Exhale...|Comedy|Drama|Romance|
|      5|Father of the Bri...|              Comedy|
|      6|         Heat (1995)|Action|Crime|Thri...|
|      7|      Sabrina (1995)|      Comedy|Romance|
|      8| Tom and Huck (1995)|  Adventure|Children|
|      9| Sudden Death (1995)|              Action|
|     10|    GoldenEye (1995)|Action|Adventure|...|
|     11|American Presiden...|Comedy|Drama|Romance|
|     12|Dracula: Dead and...|       Comedy|Horror|
|     13|        Balto (1995)|Adventure|Animati...|
|     14|        Nixon (1995)|               Drama|
|     15|Cutthroat Island ...|Action|Adventure|...|
|     16|       Casino (1995)|         Crime|Drama|
|     17|Sen

withColumn creates a new column in the dataframe, either computed from an expression or filled with a constant value. If the column already exists, as the genres column does here, withColumn replaces its contents with the result of the expression passed to it.

In [17]:
from pyspark.sql.functions import split,explode

In [18]:
moviesdf.withColumn("genres",explode(split("genres","[|]"))).show()

+-------+--------------------+---------+
|movieId|               title|   genres|
+-------+--------------------+---------+
|      1|    Toy Story (1995)|Adventure|
|      1|    Toy Story (1995)|Animation|
|      1|    Toy Story (1995)| Children|
|      1|    Toy Story (1995)|   Comedy|
|      1|    Toy Story (1995)|  Fantasy|
|      2|      Jumanji (1995)|Adventure|
|      2|      Jumanji (1995)| Children|
|      2|      Jumanji (1995)|  Fantasy|
|      3|Grumpier Old Men ...|   Comedy|
|      3|Grumpier Old Men ...|  Romance|
|      4|Waiting to Exhale...|   Comedy|
|      4|Waiting to Exhale...|    Drama|
|      4|Waiting to Exhale...|  Romance|
|      5|Father of the Bri...|   Comedy|
|      6|         Heat (1995)|   Action|
|      6|         Heat (1995)|    Crime|
|      6|         Heat (1995)| Thriller|
|      7|      Sabrina (1995)|   Comedy|
|      7|      Sabrina (1995)|  Romance|
|      8| Tom and Huck (1995)|Adventure|
+-------+--------------------+---------+
only showing top

We can filter the dataframe on a condition. Here we keep only the movies that have no genres listed.

In [19]:
df.filter(df.genres=="(no genres listed)").show()

+-------+--------------------+------------------+
|movieId|               title|            genres|
+-------+--------------------+------------------+
| 114335|   La cravate (1957)|(no genres listed)|
| 122888|      Ben-hur (2016)|(no genres listed)|
| 122896|Pirates of the Ca...|(no genres listed)|
| 129250|   Superfast! (2015)|(no genres listed)|
| 132084| Let It Be Me (1995)|(no genres listed)|
| 134861|Trevor Noah: Afri...|(no genres listed)|
| 141131|    Guardians (2016)|(no genres listed)|
| 141866|   Green Room (2015)|(no genres listed)|
| 142456|The Brand New Tes...|(no genres listed)|
| 143410|          Hyena Road|(no genres listed)|
| 147250|The Adventures of...|(no genres listed)|
| 149330|A Cosmic Christma...|(no genres listed)|
| 152037|  Grease Live (2016)|(no genres listed)|
| 155589|Noin 7 veljestä (...|(no genres listed)|
| 156605|            Paterson|(no genres listed)|
| 159161|Ali Wong: Baby Co...|(no genres listed)|
| 159779|A Midsummer Night...|(no genres listed)|


## Ratings dataset

In [20]:
ratings_path='ratings.csv'

In [21]:
ratingsdf=spark.read.csv(ratings_path,header=True)

We do not pass a schema here, so Spark assigns StringType to every column by default. We can check this with ratingsdf.schema.

In [22]:
ratingsdf.show

<bound method DataFrame.show of DataFrame[userId: string, movieId: string, rating: string, timestamp: string]>

In [23]:
ratingsdf.schema

StructType(List(StructField(userId,StringType,true),StructField(movieId,StringType,true),StructField(rating,StringType,true),StructField(timestamp,StringType,true)))

In [24]:
ratingsdf.show()

+------+-------+------+---------+
|userId|movieId|rating|timestamp|
+------+-------+------+---------+
|     1|      1|   4.0|964982703|
|     1|      3|   4.0|964981247|
|     1|      6|   4.0|964982224|
|     1|     47|   5.0|964983815|
|     1|     50|   5.0|964982931|
|     1|     70|   3.0|964982400|
|     1|    101|   5.0|964980868|
|     1|    110|   4.0|964982176|
|     1|    151|   5.0|964984041|
|     1|    157|   5.0|964984100|
|     1|    163|   5.0|964983650|
|     1|    216|   5.0|964981208|
|     1|    223|   3.0|964980985|
|     1|    231|   5.0|964981179|
|     1|    235|   4.0|964980908|
|     1|    260|   5.0|964981680|
|     1|    296|   3.0|964982967|
|     1|    316|   3.0|964982310|
|     1|    333|   5.0|964981179|
|     1|    349|   4.0|964982563|
+------+-------+------+---------+
only showing top 20 rows



As we can see, movieId is a key common to both dataframes, so we can join the two dataframes on it and get a single dataframe with the full information.

In [25]:
joindf=moviesdf.join(ratingsdf,moviesdf.movieId==ratingsdf.movieId).drop(ratingsdf.movieId)

In [26]:
joindf.show()

+-------+--------------------+--------------------+------+------+---------+
|movieId|               title|              genres|userId|rating|timestamp|
+-------+--------------------+--------------------+------+------+---------+
|      1|    Toy Story (1995)|Adventure|Animati...|     1|   4.0|964982703|
|      3|Grumpier Old Men ...|      Comedy|Romance|     1|   4.0|964981247|
|      6|         Heat (1995)|Action|Crime|Thri...|     1|   4.0|964982224|
|     47|Seven (a.k.a. Se7...|    Mystery|Thriller|     1|   5.0|964983815|
|     50|Usual Suspects, T...|Crime|Mystery|Thr...|     1|   5.0|964982931|
|     70|From Dusk Till Da...|Action|Comedy|Hor...|     1|   3.0|964982400|
|    101|Bottle Rocket (1996)|Adventure|Comedy|...|     1|   5.0|964980868|
|    110|   Braveheart (1995)|    Action|Drama|War|     1|   4.0|964982176|
|    151|      Rob Roy (1995)|Action|Drama|Roma...|     1|   5.0|964984041|
|    157|Canadian Bacon (1...|          Comedy|War|     1|   5.0|964984100|
|    163|   

moviesdf is the left table and ratingsdf is the right table.
This is an inner join: a row appears in the result only when the movieId matches in both dataframes. Because the movieId column is present in both dataframes, we drop the copy that comes from ratingsdf.

In [27]:
joindf2=moviesdf.join(ratingsdf,on=['movieId'],how='outer')

In [28]:
joindf2.show()

+-------+--------------------+------+------+------+----------+
|movieId|               title|genres|userId|rating| timestamp|
+-------+--------------------+------+------+------+----------+
|    148|Awfully Big Adven...| Drama|   191|   5.0| 829760897|
|    471|Hudsucker Proxy, ...|Comedy|    32|   3.0| 856737165|
|    471|Hudsucker Proxy, ...|Comedy|    57|   3.0| 969753604|
|    471|Hudsucker Proxy, ...|Comedy|    91|   1.0|1112713817|
|    471|Hudsucker Proxy, ...|Comedy|   104|   4.5|1238111129|
|    471|Hudsucker Proxy, ...|Comedy|   133|   4.0| 843491793|
|    471|Hudsucker Proxy, ...|Comedy|   136|   4.0| 832450058|
|    471|Hudsucker Proxy, ...|Comedy|   171|   3.0| 866905683|
|    471|Hudsucker Proxy, ...|Comedy|   176|   5.0| 840109075|
|    471|Hudsucker Proxy, ...|Comedy|   182|   4.5|1054779644|
|    471|Hudsucker Proxy, ...|Comedy|   216|   3.0| 975212641|
|    471|Hudsucker Proxy, ...|Comedy|   217|   2.0| 955943727|
|    471|Hudsucker Proxy, ...|Comedy|   218|   4.0|1111

Find the top 10 favorite (highest-rated) and worst (lowest-rated) movies for a user (for example, let the user ID be 544).

In [33]:
joindf.where((joindf.userId=='471')&(joindf.rating=='4')).show()

+-------+-----+------+------+------+---------+
|movieId|title|genres|userId|rating|timestamp|
+-------+-----+------+------+------+---------+
+-------+-----+------+------+------+---------+

