## Working with Pandas

In [32]:
import pandas as pd
from sklearn.linear_model import LinearRegression
from sklearn.model_selection import train_test_split
from sklearn.metrics import mean_squared_error

In [4]:
xls = pd.ExcelFile('movies.xlsx')
movies = pd.read_excel(xls, 'u.item')
movies.head(3)

Unnamed: 0,movie id,movie title,release date,video release date,IMDb URL,unknown,Action,Adventure,Animation,Children's,...,Fantasy,Film-Noir,Horror,Musical,Mystery,Romance,Sci-Fi,Thriller,War,Western
0,1,Toy Story (1995),01-Jan-1995,,http://us.imdb.com/M/title-exact?Toy%20Story%2...,0,0,0,1,1,...,0,0,0,0,0,0,0,0,0,0
1,2,GoldenEye (1995),01-Jan-1995,,http://us.imdb.com/M/title-exact?GoldenEye%20(...,0,1,1,0,0,...,0,0,0,0,0,0,0,1,0,0
2,3,Four Rooms (1995),01-Jan-1995,,http://us.imdb.com/M/title-exact?Four%20Rooms%...,0,0,0,0,0,...,0,0,0,0,0,0,0,1,0,0


In [39]:
movies = movies.drop_duplicates()

In [6]:
ratings = pd.read_excel(xls, 'u.data')
ratings.head(3)

Unnamed: 0,user id,item id,rating,timestamp,date,month year
0,196,242,3,881250949,1997-12-04 15:55:48.864,12.1997
1,186,302,3,891717742,1998-04-04 19:22:22.000,4.1998
2,22,377,1,878887116,1997-11-07 07:18:36.000,11.1997


In [40]:
ratings = ratings.drop_duplicates()

In [8]:
counts = ratings['user id'].value_counts().rename_axis('user id').reset_index(name='amount')
counts.head()

Unnamed: 0,user id,amount
0,405,737
1,655,685
2,13,636
3,450,540
4,276,518


In [9]:
count_max = counts[counts.amount == counts.amount.max()]
count_max.head()

Unnamed: 0,user id,amount
0,405,737


In [10]:
item_id = ratings[ratings['user id'].isin([405])]
item_id = item_id.drop_duplicates()
item_id  

Unnamed: 0,user id,item id,rating,timestamp,date,month year
12276,405,56,4,885544911,1998-01-23 08:41:51,1.1998
12383,405,592,1,885548670,1998-01-23 09:44:30,1.1998
12430,405,1582,1,885548670,1998-01-23 09:44:30,1.1998
12449,405,171,1,885549544,1998-01-23 09:59:04,1.1998
12460,405,580,1,885547447,1998-01-23 09:24:07,1.1998
...,...,...,...,...,...,...
98956,405,375,1,885546835,1998-01-23 09:13:55,1.1998
98978,405,445,4,885548435,1998-01-23 09:40:35,1.1998
99148,405,1246,1,885547735,1998-01-23 09:28:55,1.1998
99465,405,196,1,885546112,1998-01-23 09:01:52,1.1998


In [11]:
df = movies.merge(item_id, left_on='movie id', right_on='item id', how='inner')
df.head()

Unnamed: 0,movie id,movie title,release date,video release date,IMDb URL,unknown,Action,Adventure,Animation,Children's,...,Sci-Fi,Thriller,War,Western,user id,item id,rating,timestamp,date,month year
0,2,GoldenEye (1995),01-Jan-1995,,http://us.imdb.com/M/title-exact?GoldenEye%20(...,0,1,1,0,0,...,0,1,0,0,405,2,1,885547953,1998-01-23 09:32:33,1.1998
1,4,Get Shorty (1995),01-Jan-1995,,http://us.imdb.com/M/title-exact?Get%20Shorty%...,0,1,0,0,0,...,0,0,0,0,405,4,4,885547314,1998-01-23 09:21:54,1.1998
2,5,Copycat (1995),01-Jan-1995,,http://us.imdb.com/M/title-exact?Copycat%20(1995),0,0,0,0,0,...,0,1,0,0,405,5,4,885545070,1998-01-23 08:44:30,1.1998
3,8,Babe (1995),01-Jan-1995,,http://us.imdb.com/M/title-exact?Babe%20(1995),0,0,0,0,1,...,0,0,0,0,405,8,4,885545015,1998-01-23 08:43:35,1.1998
4,11,Seven (Se7en) (1995),01-Jan-1995,,http://us.imdb.com/M/title-exact?Se7en%20(1995),0,0,0,0,0,...,0,1,0,0,405,11,4,885545263,1998-01-23 08:47:43,1.1998


In [12]:
ratings2 = df[['user id', 'movie title']]  
ratings2.head()

Unnamed: 0,user id,movie title
0,405,GoldenEye (1995)
1,405,Get Shorty (1995)
2,405,Copycat (1995)
3,405,Babe (1995)
4,405,Seven (Se7en) (1995)


In [13]:
with_genres = df[['movie id', 'user id', 'movie title', "release date", "Action", "Adventure", "Animation",
              "Children's", "Comedy", "Crime", "Documentary", "Drama", "Fantasy",
              "Film-Noir", "Horror", "Musical", "Mystery", "Romance", "Sci-Fi",
              "Thriller", "War", "Western"]]  
with_genres

Unnamed: 0,movie id,user id,movie title,release date,Action,Adventure,Animation,Children's,Comedy,Crime,...,Fantasy,Film-Noir,Horror,Musical,Mystery,Romance,Sci-Fi,Thriller,War,Western
0,2,405,GoldenEye (1995),01-Jan-1995,1,1,0,0,0,0,...,0,0,0,0,0,0,0,1,0,0
1,4,405,Get Shorty (1995),01-Jan-1995,1,0,0,0,1,0,...,0,0,0,0,0,0,0,0,0,0
2,5,405,Copycat (1995),01-Jan-1995,0,0,0,0,0,1,...,0,0,0,0,0,0,0,1,0,0
3,8,405,Babe (1995),01-Jan-1995,0,0,0,1,1,0,...,0,0,0,0,0,0,0,0,0,0
4,11,405,Seven (Se7en) (1995),01-Jan-1995,0,0,0,0,0,1,...,0,0,0,0,0,0,0,1,0,0
...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...
732,1588,405,Salut cousin! (1996),21-Feb-1997,0,0,0,0,1,0,...,0,0,0,0,0,0,0,0,0,0
733,1589,405,Schizopolis (1996),23-May-1997,0,0,0,0,1,0,...,0,0,0,0,0,0,0,0,0,0
734,1590,405,"To Have, or Not (1995)",06-Jun-1997,0,0,0,0,0,0,...,0,0,0,0,0,0,0,0,0,0
735,1591,405,Duoluo tianshi (1995),21-Jan-1998,0,0,0,0,0,0,...,0,0,0,0,0,0,0,0,0,0


In [14]:
sums = ratings[['item id', 'rating']].groupby(['item id']).sum().reset_index('item id')
sums.head()

Unnamed: 0,item id,rating
0,1,1753
1,2,420
2,3,273
3,4,742
4,5,284


In [16]:
counts2 = ratings[['item id', 'rating']].groupby(['item id']).count().reset_index('item id')
counts2.head()

Unnamed: 0,item id,rating
0,1,452
1,2,131
2,3,90
3,4,209
4,5,86


In [17]:
sum_count = counts2.merge(sums, left_on='item id', right_on='item id', how='inner')
sum_count['average'] = sum_count['rating_x'] / sum_count['rating_y']
sum_count   # rating_x = Counts, rating_y = Amount

Unnamed: 0,item id,rating_x,rating_y,average
0,1,452,1753,0.257844
1,2,131,420,0.311905
2,3,90,273,0.329670
3,4,209,742,0.281671
4,5,86,284,0.302817
...,...,...,...,...
1677,1678,1,1,1.000000
1678,1679,1,3,0.333333
1679,1680,1,2,0.500000
1680,1681,1,3,0.333333


In [18]:
merges = with_genres.merge(sum_count, left_on='movie id', right_on='item id', how='inner')
merges['year'] = merges['release date'].str[-4:]
merges.head()

Unnamed: 0,movie id,user id,movie title,release date,Action,Adventure,Animation,Children's,Comedy,Crime,...,Romance,Sci-Fi,Thriller,War,Western,item id,rating_x,rating_y,average,year
0,2,405,GoldenEye (1995),01-Jan-1995,1,1,0,0,0,0,...,0,0,1,0,0,2,131,420,0.311905,1995
1,4,405,Get Shorty (1995),01-Jan-1995,1,0,0,0,1,0,...,0,0,0,0,0,4,209,742,0.281671,1995
2,5,405,Copycat (1995),01-Jan-1995,0,0,0,0,0,1,...,0,0,1,0,0,5,86,284,0.302817,1995
3,8,405,Babe (1995),01-Jan-1995,0,0,0,1,1,0,...,0,0,0,0,0,8,219,875,0.250286,1995
4,11,405,Seven (Se7en) (1995),01-Jan-1995,0,0,0,0,0,1,...,0,0,1,0,0,11,236,908,0.259912,1995


In [25]:
X = merges[['year', 'rating_x', 'rating_y', "Action", "Adventure", "Animation",
              "Children's", "Comedy", "Crime", "Documentary", "Drama", "Fantasy",
              "Film-Noir", "Horror", "Musical", "Mystery", "Romance", "Sci-Fi",
              "Thriller", "War", "Western"]]
y = merges[['average']]

In [26]:
X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=0.2)

In [35]:
lr = LinearRegression() 
lr.fit(X_train, y_train)
pred = lr.predict(X_test)
lr.coef_

array([[-2.24807572e-05, -1.90084198e-03,  2.59431087e-04,
         4.06345577e-02, -2.73102369e-02, -3.78780191e-02,
        -2.96205292e-02, -5.99242167e-03, -1.38373976e-02,
         4.59844357e-02, -2.35434186e-02,  7.84260556e-02,
         2.94336841e-02,  6.00618345e-02,  4.09848602e-02,
        -1.97913034e-02, -1.27094071e-02, -1.06791075e-02,
        -2.14287095e-02, -9.39175590e-03, -4.28855791e-02]])

In [29]:
mean_squared_error(y_train, lr.predict(X_train))

0.020958597916862987

In [28]:
mean_squared_error(y_test, lr.predict(X_test))

0.02950460092856953

Low MSE indicates a high accuracy of the model, with very low deviation.

## Working with Pyspark

In [None]:
!apt-get update

In [None]:
!apt-get install openjdk-8-jdk-headless -qq > /dev/null

In [None]:
!wget -q https://downloads.apache.org/spark/spark-3.2.3/spark-3.2.3-bin-hadoop2.7.tgz

In [None]:
!tar -xvf spark-3.2.3-bin-hadoop2.7.tgz

In [None]:
!pip install -q findspark

In [None]:
import os
os.environ["JAVA_HOME"] = "/usr/lib/jvm/java-8-openjdk-amd64"
os.environ["SPARK_HOME"] = "/content/spark-3.2.3-bin-hadoop2.7"

In [None]:
import findspark
findspark.init()
from pyspark.sql import SparkSession
from pyspark.sql.functions import col
from pyspark.sql.functions import when

spark = SparkSession.builder.master("local[*]").getOrCreate()

In [None]:
from pyspark.ml import Pipeline
from pyspark.ml.feature import StringIndexer, OneHotEncoder, VectorAssembler

In [None]:
ratings_sp = spark.read.options(delimiter=';').csv('ratings.csv', inferSchema="true")
ratings_sp = ratings_sp.select(col("_c0").alias("user_id"), col("_c1").alias("item_id"), col("_c2").alias("rating"), col("_c3").alias("timestamp"))
ratings_sp.show(2)

+-------+-------+------+---------+
|user_id|item_id|rating|timestamp|
+-------+-------+------+---------+
|    196|    242|     3|881250949|
|    186|    302|     3|891717742|
+-------+-------+------+---------+
only showing top 2 rows



In [None]:
movies_sp = spark.read.options(delimiter=';').csv('movies.csv', inferSchema="true")
movies_sp = movies_sp.select(col("_c0").alias("movie_id"), col("_c1").alias("movie_title"), col("_c2").alias("release_date"), col("_c3").alias("video_release_date"), col("_c4").alias("IMDb_URL"), col("_c5").alias("unknown"), col("_c6").alias("Action"), col("_c7").alias("Adventure"), col("_c8").alias("Animation"), col("_c9").alias("Childrens"), col("_c10").alias("Comedy"), col("_c11").alias("Crime"), col("_c12").alias("Documentary"), col("_c13").alias("Drama"), col("_c14").alias("Fantasy"), col("_c15").alias("Film_Noir"), col("_c16").alias("Horror"), col("_c17").alias("Musical"), col("_c18").alias("Mystery"), col("_c19").alias("Romance"), col("_c20").alias("Sci_Fi"), col("_c21").alias("Thriller"), col("_c22").alias("War"), col("_c23").alias("Western"))
movies_sp.show(10)


+--------+--------------------+------------+------------------+--------------------+-------+------+---------+---------+---------+------+-----+-----------+-----+-------+---------+------+-------+-------+-------+------+--------+---+-------+
|movie_id|         movie_title|release_date|video_release_date|            IMDb_URL|unknown|Action|Adventure|Animation|Childrens|Comedy|Crime|Documentary|Drama|Fantasy|Film_Noir|Horror|Musical|Mystery|Romance|Sci_Fi|Thriller|War|Western|
+--------+--------------------+------------+------------------+--------------------+-------+------+---------+---------+---------+------+-----+-----------+-----+-------+---------+------+-------+-------+-------+------+--------+---+-------+
|       1|    Toy Story (1995)| 01-Jan-1995|              null|http://us.imdb.co...|      0|     0|        0|        1|        1|     1|    0|          0|    0|      0|        0|     0|      0|      0|      0|     0|       0|  0|      0|
|       2|    GoldenEye (1995)| 01-Jan-1995|    

In [None]:
ratings_sp.createOrReplaceTempView('ratings_sp')
movies_sp.createOrReplaceTempView('movies_sp')
general = spark.sql('select item_id, count(rating) as count, sum(rating) as rating_sp, sum(rating)/count(rating) as average from ratings_sp group by item_id order by item_id').show(2)

+-------+-----+---------+------------------+
|item_id|count|rating_sp|           average|
+-------+-----+---------+------------------+
|      1|  452|     1753|3.8783185840707963|
|      2|  131|      420|3.2061068702290076|
+-------+-----+---------+------------------+
only showing top 2 rows



Average rating for each movie (by id)

In [None]:
ratings_sp.createOrReplaceTempView('ratings_sp')
movies_sp.createOrReplaceTempView('movies_sp')
general_gen = spark.sql('with cte1 as (with cte as (select item_id, count(rating) as count_sp, sum(rating) as rating_sp, sum(rating)/count(rating) as average_sp from ratings_sp group by item_id) select movies_sp.*, * from cte inner join movies_sp on movies_sp.movie_id = cte.item_id order by movie_id) select movie_id, case when Action == 1 then average_sp end Action, case when Adventure == 1 then average_sp end Adventure, case when Animation == 1 then average_sp end Animation, case when Childrens == 1 then average_sp end Childrens, case when Comedy == 1 then average_sp end Comedy, case when Crime == 1 then average_sp end Crime, case when Documentary == 1 then average_sp end Documentary, case when Drama == 1 then average_sp end Drama, case when Fantasy == 1 then average_sp end Fantasy, case when Film_Noir == 1 then average_sp end Film_Noir, case when Horror == 1 then average_sp end Horror, case when Musical == 1 then average_sp end Musical, case when Mystery == 1 then average_sp end Mystery, case when Romance == 1 then average_sp end Romance, case when Sci_Fi == 1 then average_sp end Sci_Fi, case when Thriller == 1 then average_sp end Thriller, case when War == 1 then average_sp end War, case when Western == 1 then average_sp end Western from cte1').show(10)

+--------+------------------+------------------+------------------+------------------+------------------+-----------------+-----------+------------------+-------+---------+------+-------+-------+-------+-----------------+------------------+-----------------+-------+
|movie_id|            Action|         Adventure|         Animation|         Childrens|            Comedy|            Crime|Documentary|             Drama|Fantasy|Film_Noir|Horror|Musical|Mystery|Romance|           Sci_Fi|          Thriller|              War|Western|
+--------+------------------+------------------+------------------+------------------+------------------+-----------------+-----------+------------------+-------+---------+------+-------+-------+-------+-----------------+------------------+-----------------+-------+
|       1|              null|              null|3.8783185840707963|3.8783185840707963|3.8783185840707963|             null|       null|              null|   null|     null|  null|   null|   null|   n

Average rating for each movie (by genre)

In [None]:
general_max = spark.sql('with cte as (select item_id, count(rating) as count_sp, sum(rating) as rating_sp, sum(rating)/count(rating) as average_sp from ratings_sp group by item_id) select movies_sp.movie_title, cte.count_sp from cte inner join movies_sp on movies_sp.movie_id = cte.item_id order by cte.count_sp desc')
general_max.show(5)

+--------------------+--------+
|         movie_title|count_sp|
+--------------------+--------+
|    Star Wars (1977)|     583|
|      Contact (1997)|     509|
|        Fargo (1996)|     508|
|Return of the Jed...|     507|
|    Liar Liar (1997)|     485|
+--------------------+--------+
only showing top 5 rows



Top 5 by popularity

In [None]:
general_min = spark.sql('with cte as (select item_id, count(rating) as count_sp, sum(rating) as rating_sp, sum(rating)/count(rating) as average_sp from ratings_sp group by item_id) select movies_sp.movie_title, cte.count_sp from cte inner join movies_sp on movies_sp.movie_id = cte.item_id order by cte.count_sp')
general_min.show(5)

5 less popular