Install Dependencies:


1.   Java 8
2.   Apache Spark with hadoop and
3.   Findspark (used to locate the spark in the system)


In [None]:
!apt-get install openjdk-8-jdk-headless -qq > /dev/null
!wget -q http://archive.apache.org/dist/spark/spark-3.1.1/spark-3.1.1-bin-hadoop3.2.tgz
!tar xf spark-3.1.1-bin-hadoop3.2.tgz
!pip install -q findspark

Set Environment Variables:

In [None]:
import os
os.environ["JAVA_HOME"] = "/usr/lib/jvm/java-8-openjdk-amd64"
os.environ["SPARK_HOME"] = "/content/spark-3.1.1-bin-hadoop3.2"

In [None]:
import findspark
findspark.init()
from pyspark import SparkContext
#sc = SparkContext()

We are going to download some datasets called MovieLens
https://grouplens.org/datasets/movielens/

Let's use ML-25M dataset
- ratings.csv
- movies.csv


In [None]:
sc = SparkContext.getOrCreate();
from pyspark.sql import SparkSession
spark = SparkSession(sc)
movies = spark.read.csv('movies.csv',inferSchema=True, header=True)

In [None]:
movies.show()

+-------+--------------------+--------------------+
|movieId|               title|              genres|
+-------+--------------------+--------------------+
|      1|    Toy Story (1995)|Adventure|Animati...|
|      2|      Jumanji (1995)|Adventure|Childre...|
|      3|Grumpier Old Men ...|      Comedy|Romance|
|      4|Waiting to Exhale...|Comedy|Drama|Romance|
|      5|Father of the Bri...|              Comedy|
|      6|         Heat (1995)|Action|Crime|Thri...|
|      7|      Sabrina (1995)|      Comedy|Romance|
|      8| Tom and Huck (1995)|  Adventure|Children|
|      9| Sudden Death (1995)|              Action|
|     10|    GoldenEye (1995)|Action|Adventure|...|
|     11|American Presiden...|Comedy|Drama|Romance|
|     12|Dracula: Dead and...|       Comedy|Horror|
|     13|        Balto (1995)|Adventure|Animati...|
|     14|        Nixon (1995)|               Drama|
|     15|Cutthroat Island ...|Action|Adventure|...|
|     16|       Casino (1995)|         Crime|Drama|
|     17|Sen

###Count the number of movies from 1995. Assume the movie year is given at the end of the title.


In [None]:
from pyspark.sql.functions import udf
from pyspark.sql.types import StringType, IntegerType



movieYear = udf(lambda x: x[-5:-1] if x is not None else x, StringType())
movies = movies.withColumn('year', movieYear(movies['title']))
movies = movies.withColumn('year', movies.year.cast('integer'))
rdd = movies.select('year').rdd
df = rdd.toDF()
df.filter(df.year>= 1995).count()

36868

###Count the number of movies for each year. Assume the movie year is given at the end of the title.



year | count

In [None]:
yearCount = movies.groupBy('year').count()
yearCount = yearCount.sort('count', ascending= False)
yearCount.show()

+----+-----+
|year|count|
+----+-----+
|2015| 2512|
|2016| 2488|
|2014| 2403|
|2017| 2373|
|2013| 2164|
|2018| 2032|
|2012| 1959|
|2011| 1821|
|2009| 1721|
|2010| 1673|
|2008| 1625|
|2007| 1494|
|2006| 1440|
|2005| 1250|
|2004| 1168|
|2003| 1028|
|2002| 1022|
|2019|  993|
|2001|  969|
|2000|  929|
+----+-----+
only showing top 20 rows



###For each row, convert genres as a list  (rdd or dataframe both ok)




In [None]:
rdd = movies.select('genres').rdd
rdd2 = rdd.map(lambda x: x[0].split('|'))
rdd2.collect()

[['Adventure', 'Animation', 'Children', 'Comedy', 'Fantasy'],
 ['Adventure', 'Children', 'Fantasy'],
 ['Comedy', 'Romance'],
 ['Comedy', 'Drama', 'Romance'],
 ['Comedy'],
 ['Action', 'Crime', 'Thriller'],
 ['Comedy', 'Romance'],
 ['Adventure', 'Children'],
 ['Action'],
 ['Action', 'Adventure', 'Thriller'],
 ['Comedy', 'Drama', 'Romance'],
 ['Comedy', 'Horror'],
 ['Adventure', 'Animation', 'Children'],
 ['Drama'],
 ['Action', 'Adventure', 'Romance'],
 ['Crime', 'Drama'],
 ['Drama', 'Romance'],
 ['Comedy'],
 ['Comedy'],
 ['Action', 'Comedy', 'Crime', 'Drama', 'Thriller'],
 ['Comedy', 'Crime', 'Thriller'],
 ['Crime', 'Drama', 'Horror', 'Mystery', 'Thriller'],
 ['Action', 'Crime', 'Thriller'],
 ['Drama', 'Sci-Fi'],
 ['Drama', 'Romance'],
 ['Drama'],
 ['Children', 'Drama'],
 ['Drama', 'Romance'],
 ['Adventure', 'Drama', 'Fantasy', 'Mystery', 'Sci-Fi'],
 ['Crime', 'Drama'],
 ['Drama'],
 ['Mystery', 'Sci-Fi', 'Thriller'],
 ['Adventure', 'Romance', 'IMAX'],
 ['Children', 'Drama'],
 ['Drama', '

###What is the most frequent genre that appears with action?


In [None]:
rdd3 = rdd2.filter(lambda x: x[0]=='Action')
rdd3.collect()

[['Action', 'Crime', 'Thriller'],
 ['Action'],
 ['Action', 'Adventure', 'Thriller'],
 ['Action', 'Adventure', 'Romance'],
 ['Action', 'Comedy', 'Crime', 'Drama', 'Thriller'],
 ['Action', 'Crime', 'Thriller'],
 ['Action', 'Crime', 'Drama'],
 ['Action', 'Adventure', 'Fantasy'],
 ['Action', 'Drama', 'Thriller'],
 ['Action', 'Sci-Fi', 'Thriller'],
 ['Action', 'Comedy', 'Horror', 'Thriller'],
 ['Action'],
 ['Action', 'Sci-Fi', 'Thriller'],
 ['Action', 'Crime', 'Drama', 'Thriller'],
 ['Action', 'Adventure', 'Drama'],
 ['Action', 'Thriller'],
 ['Action', 'Adventure', 'Thriller'],
 ['Action', 'Thriller'],
 ['Action', 'Drama', 'War'],
 ['Action', 'Adventure', 'Comedy', 'Crime'],
 ['Action', 'Drama'],
 ['Action', 'Comedy', 'Crime', 'Drama', 'Thriller'],
 ['Action', 'Drama', 'Romance', 'War'],
 ['Action', 'Adventure', 'Comedy', 'Crime'],
 ['Action', 'Adventure', 'Mystery', 'Sci-Fi'],
 ['Action', 'Romance', 'Western'],
 ['Action', 'Crime', 'Thriller'],
 ['Action', 'Drama', 'Romance'],
 ['Action', 

###Read the ratings.csv file, and find the earliest review. When was it?


In [None]:
from datetime import datetime
ts = 1538364257

# if you encounter a "year is out of range" error the timestamp
# may be in milliseconds, try `ts /= 1000` in that case
print(datetime.utcfromtimestamp(ts).strftime('%Y-%m-%d %H:%M:%S'))

2018-10-01 03:24:17


In [None]:
ratings = spark.read.csv('ratings.csv',inferSchema=True, header=True)

In [None]:
ratings.show()

+------+-------+------+----------+
|userId|movieId|rating| timestamp|
+------+-------+------+----------+
|     1|    296|   5.0|1147880044|
|     1|    306|   3.5|1147868817|
|     1|    307|   5.0|1147868828|
|     1|    665|   5.0|1147878820|
|     1|    899|   3.5|1147868510|
|     1|   1088|   4.0|1147868495|
|     1|   1175|   3.5|1147868826|
|     1|   1217|   3.5|1147878326|
|     1|   1237|   5.0|1147868839|
|     1|   1250|   4.0|1147868414|
|     1|   1260|   3.5|1147877857|
|     1|   1653|   4.0|1147868097|
|     1|   2011|   2.5|1147868079|
|     1|   2012|   2.5|1147868068|
|     1|   2068|   2.5|1147869044|
|     1|   2161|   3.5|1147868609|
|     1|   2351|   4.5|1147877957|
|     1|   2573|   4.0|1147878923|
|     1|   2632|   5.0|1147878248|
|     1|   2692|   5.0|1147869100|
+------+-------+------+----------+
only showing top 20 rows



In [None]:
from pyspark.sql.functions import min
minTimeStamp = ratings.select("timestamp").rdd.min()[0]
print(datetime.utcfromtimestamp(minTimeStamp).strftime('%Y-%m-%d %H:%M:%S'))

1995-01-09 11:46:49


###Read the ratings.csv file, and find the number of total users



In [None]:
usersCnt = ratings.groupBy("userId").count()
usersCnt.count()

109328

### How many movies were reviewed by the same user more than once?



In [None]:
ratingCnt = ratings.groupBy(["userId", "movieId"]).count().where('count>1')
ratingCnt.count()

0

###Is every movie in the rating.csv file in movie.csv?



In [None]:
joined = ratings.join(movies, on= 'movieId')
joined.count()

16861087

In [None]:
joined.na.drop().count()

16851769

A: No

### How many movies in movies.csv never been reviewd by any user from rating.csv?  



In [None]:
a = joined.count()
b = joined.na.drop().count()

print(a-b)

9318


### Design own recommender engine based on quantity of review, quality of review, freshness of review. Walk through your idea with codes. Combine three components equally.

In [None]:
from pyspark.sql import functions as F

Pyspark sql supports lot of functions :)

https://spark.apache.org/docs/latest/api/python/reference/pyspark.sql/functions.html

see aggregate functions !!!

In [None]:
ratings.groupBy("movieId").agg(F.mean('rating')).show()

+-------+------------------+
|movieId|       avg(rating)|
+-------+------------------+
|   1088| 3.248096842630725|
|   1580|3.5772621208192894|
|   3175|3.6128885748889785|
|  44022| 3.250076663600123|
| 175197|2.6970443349753697|
|   1645| 3.552041151362592|
|    471|3.6644782790892583|
|   3794| 3.159047619047619|
|   8638|3.9774045801526716|
|  33722|3.5354330708661417|
|   2142| 3.043080054274084|
|   2366| 3.539308905380334|
|   6658| 2.923780487804878|
|   1959| 3.615288999378496|
|   6620| 3.774051360674588|
|  54190|3.5474642392717817|
|   3918|2.9422246220302375|
|  68135|3.0502244668911334|
|   1342| 2.969703301295445|
|   1591|2.6231944444444446|
+-------+------------------+
only showing top 20 rows



what about I want to aggregate more than 1... (mean and count)

In [None]:
ratings.groupBy("movieId").agg(F.mean('rating'), F.count('rating')).show()

+-------+------------------+-------------+
|movieId|       avg(rating)|count(rating)|
+-------+------------------+-------------+
|   1088| 3.248096842630725|         8013|
|   1580|3.5772621208192894|        26999|
|   3175|3.6128885748889785|         9908|
|  44022| 3.250076663600123|         3261|
| 175197|2.6970443349753697|          406|
|   1645| 3.552041151362592|         9137|
|    471|3.6644782790892583|         7159|
|   3794| 3.159047619047619|          525|
|   8638|3.9774045801526716|         3275|
|  33722|3.5354330708661417|          127|
|   2142| 3.043080054274084|         1474|
|   2366| 3.539308905380334|         4312|
|   6658| 2.923780487804878|          492|
|   1959| 3.615288999378496|         3218|
|   6620| 3.774051360674588|         2609|
|  54190|3.5474642392717817|         1538|
|   3918|2.9422246220302375|          926|
|  68135|3.0502244668911334|         1782|
|   1342| 2.969703301295445|         2393|
|   1591|2.6231944444444446|         3600|
+-------+--

Do you want to apply a function on both column and create another column?

Let's use UDF!

In [None]:
ratingsAgg = ratings.groupBy("movieId").agg(F.mean('rating'), F.count('rating'))

In [None]:
from pyspark.sql.functions import udf


rs = udf(lambda x : x )
rsScores = ratingsAgg.withColumn('rsScore', rs(ratingsAgg['avg(rating)']))
rsScores.show()

+-------+------------------+-------------+------------------+
|movieId|       avg(rating)|count(rating)|           rsScore|
+-------+------------------+-------------+------------------+
|     29|3.9108723135271806|         2373|3.9108723135271806|
|   3091| 4.049411764705883|          425| 4.049411764705883|
|  60756|3.3919098143236073|          754|3.3919098143236073|
|  71530| 3.277027027027027|          592| 3.277027027027027|
|  72011|3.7143366619115548|         1402|3.7143366619115548|
| 106002| 3.391304347826087|          598| 3.391304347826087|
| 106100|3.8762135922330097|         1030|3.8762135922330097|
|    474|3.7230081906180192|         5372|3.7230081906180192|
|   2927|4.0618181818181816|          275|4.0618181818181816|
|   2529|3.6230748446365846|         3701|3.6230748446365846|
|  60408|3.7848837209302326|           86|3.7848837209302326|
|     26|3.6283185840707963|          791|3.6283185840707963|
|   5385| 4.062730627306273|          271| 4.062730627306273|
|  96829