In [1]:
%load_ext autoreload
%autoreload 2

import pyspark as pys
from pyspark.sql.types import *
from pyspark.sql.functions import *

In [2]:
sc = pys.sql.SparkSession.builder.appName('filter_movielens').master('local[4]').getOrCreate()


movieSchema = StructType([StructField('MovieId',IntegerType()), \
                          StructField('Title', StringType()),   \
                          StructField('Genres', StringType())])

ratingSchema = StructType([StructField('UserId',IntegerType()),  \
                           StructField('MovieId',IntegerType()),  \
                           StructField('Rating', FloatType()),    \
                           StructField('Timestamp', TimestampType())])

#read as text first
movieText = sc.read.text('ml-10m/ml-10M100K/movies.dat')
movieText.select(split(movieText['value'],'::')).show()


+--------------------+
|    split(value, ::)|
+--------------------+
|[1, Toy Story (19...|
|[2, Jumanji (1995...|
|[3, Grumpier Old ...|
|[4, Waiting to Ex...|
|[5, Father of the...|
|[6, Heat (1995), ...|
|[7, Sabrina (1995...|
|[8, Tom and Huck ...|
|[9, Sudden Death ...|
|[10, GoldenEye (1...|
|[11, American Pre...|
|[12, Dracula: Dea...|
|[13, Balto (1995)...|
|[14, Nixon (1995)...|
|[15, Cutthroat Is...|
|[16, Casino (1995...|
|[17, Sense and Se...|
|[18, Four Rooms (...|
|[19, Ace Ventura:...|
|[20, Money Train ...|
+--------------------+
only showing top 20 rows



In [3]:
rddc = sc.sparkContext
movieLines = rddc.textFile('ml-10m/ml-10M100K/movies.dat')


In [4]:
movieParsed = movieLines.map(lambda l : l.split(sep='::'))

In [6]:
movies = sc.createDataFrame(movieParsed, schema = ['MovieId', 'Title', 'Genres'])
movies

DataFrame[MovieId: string, Title: string, Genres: string]

In [7]:
movies = movies.withColumn('MovieId', movies['MovieId'].cast(IntegerType()))

In [8]:
movies
movies.show()

+-------+--------------------+--------------------+
|MovieId|               Title|              Genres|
+-------+--------------------+--------------------+
|      1|    Toy Story (1995)|Adventure|Animati...|
|      2|      Jumanji (1995)|Adventure|Childre...|
|      3|Grumpier Old Men ...|      Comedy|Romance|
|      4|Waiting to Exhale...|Comedy|Drama|Romance|
|      5|Father of the Bri...|              Comedy|
|      6|         Heat (1995)|Action|Crime|Thri...|
|      7|      Sabrina (1995)|      Comedy|Romance|
|      8| Tom and Huck (1995)|  Adventure|Children|
|      9| Sudden Death (1995)|              Action|
|     10|    GoldenEye (1995)|Action|Adventure|...|
|     11|American Presiden...|Comedy|Drama|Romance|
|     12|Dracula: Dead and...|       Comedy|Horror|
|     13|        Balto (1995)|  Animation|Children|
|     14|        Nixon (1995)|               Drama|
|     15|Cutthroat Island ...|Action|Adventure|...|
|     16|       Casino (1995)|         Crime|Drama|
|     17|Sen

In [14]:
#load ratings
ratingLines = rddc.textFile('ml-10m/ml-10M100K/ratings.dat')
#split lines
ratingParsed = ratingLines.map(lambda l : l.split(sep='::'))
ratings = sc.createDataFrame(ratingParsed, schema = ['UserId', 'MovieId', 'Rating', 'Timestamp'])
ratings.show()
ratings


+------+-------+------+---------+
|UserId|MovieId|Rating|Timestamp|
+------+-------+------+---------+
|     1|    122|     5|838985046|
|     1|    185|     5|838983525|
|     1|    231|     5|838983392|
|     1|    292|     5|838983421|
|     1|    316|     5|838983392|
|     1|    329|     5|838983392|
|     1|    355|     5|838984474|
|     1|    356|     5|838983653|
|     1|    362|     5|838984885|
|     1|    364|     5|838983707|
|     1|    370|     5|838984596|
|     1|    377|     5|838983834|
|     1|    420|     5|838983834|
|     1|    466|     5|838984679|
|     1|    480|     5|838983653|
|     1|    520|     5|838984679|
|     1|    539|     5|838984068|
|     1|    586|     5|838984068|
|     1|    588|     5|838983339|
|     1|    589|     5|838983778|
+------+-------+------+---------+
only showing top 20 rows



DataFrame[UserId: string, MovieId: string, Rating: string, Timestamp: string]

In [15]:
ratings = ratings.withColumn('UserId', ratings['UserId'].cast(IntegerType()))
ratings = ratings.withColumn('MovieId', ratings['MovieId'].cast(IntegerType()))
ratings = ratings.withColumn('Rating', ratings['Rating'].cast(FloatType()))
ratings
ratings.show()

+------+-------+------+---------+
|UserId|MovieId|Rating|Timestamp|
+------+-------+------+---------+
|     1|    122|   5.0|838985046|
|     1|    185|   5.0|838983525|
|     1|    231|   5.0|838983392|
|     1|    292|   5.0|838983421|
|     1|    316|   5.0|838983392|
|     1|    329|   5.0|838983392|
|     1|    355|   5.0|838984474|
|     1|    356|   5.0|838983653|
|     1|    362|   5.0|838984885|
|     1|    364|   5.0|838983707|
|     1|    370|   5.0|838984596|
|     1|    377|   5.0|838983834|
|     1|    420|   5.0|838983834|
|     1|    466|   5.0|838984679|
|     1|    480|   5.0|838983653|
|     1|    520|   5.0|838984679|
|     1|    539|   5.0|838984068|
|     1|    586|   5.0|838984068|
|     1|    588|   5.0|838983339|
|     1|    589|   5.0|838983778|
+------+-------+------+---------+
only showing top 20 rows



In [37]:
ppdRatings = ratings.groupBy('MovieId').agg({'Rating' : 'avg'}).withColumnRenamed('avg(Rating)','Rating')
#ppdRatings.show() #can be optimized


In [39]:
filteredRatings = ppdRatings.join(movies, ppdRatings.MovieId == movies.MovieId, 'inner').select(movies.Title, ppdRatings.Rating)
#ppdRatings.show()
filteredRatings.first()

Row(Title='Awfully Big Adventure, An (1995)', Rating=2.8797468354430378)

In [45]:
filteredRatings.createOrReplaceTempView('ratings')


+-----+------+
|Title|Rating|
+-----+------+
+-----+------+



In [46]:
sqlDF = sc.sql("SELECT * FROM ratings WHERE Title LIKE 'August Rush%'")
sqlDF.show()

+------------------+-----------------+
|             Title|           Rating|
+------------------+-----------------+
|August Rush (2007)|3.476878612716763|
+------------------+-----------------+



In [131]:
import pandas as pd

scrappedData = pd.read_pickle('all_data.pkl')
print(scrappedData)


                                   name audience critics          date  \
0              children_of_a_lesser_god       79      81   Oct 3, 1986   
1                            changeland       66      63   Jun 7, 2019   
2                    japanese_war_bride     None    None  Jan 29, 1952   
3                                8_mile       54      75   Nov 8, 2002   
4                                 ted_2       50      45  Jun 26, 2015   
...                                 ...      ...     ...           ...   
4494                  two_english_girls       82      86  Oct 15, 1972   
4495                      3_generations       44      32   May 5, 2017   
4496                    prometheus_2012       68      73   Jun 8, 2012   
4497                      mrs_doubtfire       77      72  Nov 24, 1993   
4498  national_treasure_book_of_secrets       67      36  Dec 21, 2007   

                                                  genre             director  
0       [Drama, Musical &amp; Pe

In [132]:
import numpy as np
scrappedData.fillna(value = np.nan, inplace = True)

In [133]:
scrappedData

Unnamed: 0,name,audience,critics,date,genre,director
0,children_of_a_lesser_god,79,81,"Oct 3, 1986","[Drama, Musical &amp; Performing Arts, Romance]",[Randa Haines]
1,changeland,66,63,"Jun 7, 2019","[Comedy, Drama]",[Seth Green]
2,japanese_war_bride,,,"Jan 29, 1952","[Classics, Drama]",[King Vidor]
3,8_mile,54,75,"Nov 8, 2002",[Drama],[Curtis Hanson]
4,ted_2,50,45,"Jun 26, 2015",[Comedy],[Seth MacFarlane]
...,...,...,...,...,...,...
4494,two_english_girls,82,86,"Oct 15, 1972","[Art House &amp; International, Documentary, D...",[François Truffaut]
4495,3_generations,44,32,"May 5, 2017","[Comedy, Drama]",[]
4496,prometheus_2012,68,73,"Jun 8, 2012","[Action &amp; Adventure, Drama, Horror, Scienc...",[Ridley Scott]
4497,mrs_doubtfire,77,72,"Nov 24, 1993","[Comedy, Drama, Kids &amp; Family]",[Chris Columbus]


In [134]:
scrappedData = scrappedData.astype({'audience' : 'float', 'critics' : 'float'})
scrappedData.dtypes

name         object
audience    float64
critics     float64
date         object
genre        object
director     object
dtype: object

In [135]:
from datetime import *
for x in range(scrappedData.shape[0]):
    if scrappedData.at[x, 'date'] is np.nan:
        newTime = datetime(1677,9,22)
    else:
        newTime = datetime.strptime(scrappedData.at[x, 'date'], '%b %d, %Y')
    scrappedData.at[x,'date'] = newTime
scrappedData

Unnamed: 0,name,audience,critics,date,genre,director
0,children_of_a_lesser_god,79.0,81.0,1986-10-03 00:00:00,"[Drama, Musical &amp; Performing Arts, Romance]",[Randa Haines]
1,changeland,66.0,63.0,2019-06-07 00:00:00,"[Comedy, Drama]",[Seth Green]
2,japanese_war_bride,,,1952-01-29 00:00:00,"[Classics, Drama]",[King Vidor]
3,8_mile,54.0,75.0,2002-11-08 00:00:00,[Drama],[Curtis Hanson]
4,ted_2,50.0,45.0,2015-06-26 00:00:00,[Comedy],[Seth MacFarlane]
...,...,...,...,...,...,...
4494,two_english_girls,82.0,86.0,1972-10-15 00:00:00,"[Art House &amp; International, Documentary, D...",[François Truffaut]
4495,3_generations,44.0,32.0,2017-05-05 00:00:00,"[Comedy, Drama]",[]
4496,prometheus_2012,68.0,73.0,2012-06-08 00:00:00,"[Action &amp; Adventure, Drama, Horror, Scienc...",[Ridley Scott]
4497,mrs_doubtfire,77.0,72.0,1993-11-24 00:00:00,"[Comedy, Drama, Kids &amp; Family]",[Chris Columbus]


In [136]:
scrappedData = scrappedData.astype({'date' : 'datetime64'})
scrappedData.dtypes

name                object
audience           float64
critics            float64
date        datetime64[ns]
genre               object
director            object
dtype: object

In [137]:
import re
for x in range(scrappedData.shape[0]):
    if scrappedData.at[x, 'name'] is np.nan:
        pass
    else:
        scrappedData.at[x, 'name'] = re.subn('_',' ',scrappedData.at[x, 'name'])[0]



In [138]:
scrappedData

Unnamed: 0,name,audience,critics,date,genre,director
0,children of a lesser god,79.0,81.0,1986-10-03,"[Drama, Musical &amp; Performing Arts, Romance]",[Randa Haines]
1,changeland,66.0,63.0,2019-06-07,"[Comedy, Drama]",[Seth Green]
2,japanese war bride,,,1952-01-29,"[Classics, Drama]",[King Vidor]
3,8 mile,54.0,75.0,2002-11-08,[Drama],[Curtis Hanson]
4,ted 2,50.0,45.0,2015-06-26,[Comedy],[Seth MacFarlane]
...,...,...,...,...,...,...
4494,two english girls,82.0,86.0,1972-10-15,"[Art House &amp; International, Documentary, D...",[François Truffaut]
4495,3 generations,44.0,32.0,2017-05-05,"[Comedy, Drama]",[]
4496,prometheus 2012,68.0,73.0,2012-06-08,"[Action &amp; Adventure, Drama, Horror, Scienc...",[Ridley Scott]
4497,mrs doubtfire,77.0,72.0,1993-11-24,"[Comedy, Drama, Kids &amp; Family]",[Chris Columbus]


In [139]:
cleanData = scrappedData[~scrappedData.audience.isnull() & ~scrappedData.critics.isnull()]

In [105]:

scrappedData.dtypes

name        object
audience    object
critics     object
date        object
genre       object
director    object
dtype: object

In [140]:
cleanData[ (cleanData.date >= datetime(2010,1,1,0,0)) & ((cleanData.audience - cleanData.critics) > 30) ]

Unnamed: 0,name,audience,critics,date,genre,director
7,the smurfs 2,56.0,14.0,2013-07-31,"[Animation, Comedy]",[Raja Gosnell]
76,jacobs ladder 2019,41.0,5.0,2019-08-23,"[Horror, Mystery &amp; Suspense]",[]
83,midway 2019,92.0,42.0,2019-11-08,"[Action &amp; Adventure, Drama]",[Roland Emmerich]
98,the lion of judah,62.0,0.0,2011-03-25,"[Animation, Kids &amp; Family]",[]
119,alvin and the chipmunks chipwrecked,51.0,12.0,2011-12-16,"[Animation, Comedy, Kids &amp; Family]",[Mike Mitchell]
...,...,...,...,...,...,...
4249,bright,84.0,28.0,2017-12-22,[Drama],[David Ayer]
4251,tulip fever,43.0,10.0,2017-09-01,"[Drama, Romance]",[]
4267,dont let go 2019,79.0,39.0,2019-08-30,[Mystery &amp; Suspense],[Jacob Estes]
4294,inconceivable 2017,63.0,31.0,2017-06-30,[Mystery &amp; Suspense],[]


In [144]:
print(cleanData[ (cleanData.date >= datetime(2010,1,1,0,0)) & ((cleanData.audience - cleanData.critics) > 30) ].name.to_string())

7                                            the smurfs 2
76                                     jacobs ladder 2019
83                                            midway 2019
98                                      the lion of judah
119                   alvin and the chipmunks chipwrecked
133                                            the upside
158                                    the wedding ringer
175                       king arthur legend of the sword
180                                    the miracle season
190                               madeas big happy family
202                                       gods not dead 2
203                                          believe 2016
206                       star wars the rise of skywalker
303                                            son of god
319                                     collateral beauty
328                                         barefoot 2014
338                                       vampire academy
395           

In [147]:
scrappedData.to_pickle('all_data_clean.pkl', protocol = 4)

In [None]:
sqlSearchStr = "SELECT * FROM ratings WHERE Title LIKE "
new = 'August Rush%'
for i in range(superclean.name):
    appendStr = "'" + superclean.name.iloc[i]+"%'"
    sqlDF = sc.sql(sqlSearchStr + appendStr)
    