In [4]:
# Import the modules
import pandas as pd
from pathlib import Path

import findspark
findspark.init()
from pyspark.sql import SparkSession
import time



In [5]:
# Read in the CSV file as a Pandas DataFrame
titles_df = pd.read_csv(
    Path("Data/titles.csv")
)

rating_df = pd.read_csv(
    Path("Data/ratings.csv")
)

movies_df = pd.read_csv(
    Path("Data/movies.csv")
)


In [6]:
movies_df['title'].head()

0                      Toy Story (1995)
1                        Jumanji (1995)
2               Grumpier Old Men (1995)
3              Waiting to Exhale (1995)
4    Father of the Bride Part II (1995)
Name: title, dtype: object

In [7]:
# Split title on (
movies_df[['titles', 'year']] = movies_df['title'].str.split('(', n=1, expand=True)

# Remove whitespace at end of title
movies_df['titles'] = movies_df['titles'].str.strip()

# Strip ) off of year
movies_df['year'] = movies_df['year'].str.rstrip(')')

In [8]:
movies_df = movies_df[movies_df['genres'].notna()]

movies_df

Unnamed: 0,movieId,title,genres,titles,year
0,1,Toy Story (1995),Adventure|Animation|Children|Comedy|Fantasy,Toy Story,1995
1,2,Jumanji (1995),Adventure|Children|Fantasy,Jumanji,1995
2,3,Grumpier Old Men (1995),Comedy|Romance,Grumpier Old Men,1995
3,4,Waiting to Exhale (1995),Comedy|Drama|Romance,Waiting to Exhale,1995
4,5,Father of the Bride Part II (1995),Comedy,Father of the Bride Part II,1995
...,...,...,...,...,...
62418,209157,We (2018),Drama,We,2018
62419,209159,Window of the Soul (2001),Documentary,Window of the Soul,2001
62420,209163,Bad Poems (2018),Comedy|Drama,Bad Poems,2018
62421,209169,A Girl Thing (2001),(no genres listed),A Girl Thing,2001


In [9]:
movie_id = []

for i, movie in movies_df.iterrows():
    try:
        year = int(movie['year'])
    except:
        print('ValueError')

    if year < 2010:
        movie_id.append(movie['movieId'])



ValueError
ValueError
ValueError
ValueError
ValueError
ValueError
ValueError
ValueError
ValueError
ValueError
ValueError
ValueError
ValueError
ValueError
ValueError
ValueError
ValueError
ValueError
ValueError
ValueError
ValueError
ValueError
ValueError
ValueError
ValueError
ValueError
ValueError
ValueError
ValueError
ValueError
ValueError
ValueError
ValueError
ValueError
ValueError
ValueError
ValueError
ValueError
ValueError
ValueError
ValueError
ValueError
ValueError
ValueError
ValueError
ValueError
ValueError
ValueError
ValueError
ValueError
ValueError
ValueError
ValueError
ValueError
ValueError
ValueError
ValueError
ValueError
ValueError
ValueError
ValueError
ValueError
ValueError
ValueError
ValueError
ValueError
ValueError
ValueError
ValueError
ValueError
ValueError
ValueError
ValueError
ValueError
ValueError
ValueError
ValueError
ValueError
ValueError
ValueError
ValueError
ValueError
ValueError
ValueError
ValueError
ValueError
ValueError
ValueError
ValueError
ValueError
ValueError

In [10]:
len(movie_id)

41459

In [11]:
# Drop rows that have same movieID as listed in movie_id
movies_reduced = movies_df[~movies_df['movieId'].isin(movie_id)]

movies_reduced.count()

movieId    20964
title      20964
genres     20964
titles     20964
year       20744
dtype: int64

In [12]:
rating_df.count()

userId       25000095
movieId      25000095
rating       25000095
timestamp    25000095
dtype: int64

In [13]:
rating_reduced = rating_df[~rating_df['movieId'].isin(movie_id)]

rating_reduced.count()

userId       2673702
movieId      2673702
rating       2673702
timestamp    2673702
dtype: int64

In [14]:
rating_reduced

Unnamed: 0,userId,movieId,rating,timestamp
712,3,73268,4.0,1439475098
713,3,73321,4.0,1439473925
714,3,73323,4.0,1566091399
715,3,74458,4.0,1492785558
716,3,74789,4.0,1566089982
...,...,...,...,...
24999773,162538,111617,0.5,1438784109
24999774,162538,112138,4.0,1438784075
24999775,162538,112556,3.5,1438784733
24999776,162538,116797,4.5,1438781121


In [15]:
# Create a SparkSession
spark = SparkSession.builder\
    .appName("panda to spark")\
    .config("spark.sql.debug.maxToStringFields", 2000)\
    .config("spark.driver.memory", "2g")\
    .getOrCreate()

In [16]:
# Copy Pandas dataframe to sparks
rating_spark_df = spark.createDataFrame(rating_reduced)
rating_spark_df.show()

+------+-------+------+----------+
|userId|movieId|rating| timestamp|
+------+-------+------+----------+
|     3|  73268|   4.0|1439475098|
|     3|  73321|   4.0|1439473925|
|     3|  73323|   4.0|1566091399|
|     3|  74458|   4.0|1492785558|
|     3|  74789|   4.0|1566089982|
|     3|  76077|   2.0|1484756109|
|     3|  76093|   4.0|1439473084|
|     3|  76251|   4.0|1566089828|
|     3|  77561|   4.0|1439473240|
|     3|  78469|   3.0|1439474257|
|     3|  78499|   4.0|1439473014|
|     3|  79091|   4.0|1566091612|
|     3|  79132|   5.0|1439474473|
|     3|  79224|   2.0|1566091314|
|     3|  79293|   4.0|1439474106|
|     3|  79695|   3.0|1439474170|
|     3|  80363|   3.0|1484756077|
|     3|  80463|   4.0|1492785561|
|     3|  81229|   4.0|1439474063|
|     3|  81537|   3.0|1566091015|
+------+-------+------+----------+
only showing top 20 rows



In [17]:
# Create a temporary view of the DataFrame.
rating_spark_df.createOrReplaceTempView('rating_data')
rating_spark_df.printSchema()

root
 |-- userId: long (nullable = true)
 |-- movieId: long (nullable = true)
 |-- rating: double (nullable = true)
 |-- timestamp: long (nullable = true)



In [35]:
# calculate the average rating for each movie id
start_time = time.time()
rating_grouped_df = spark.sql("""
SELECT movieId AS MovieId
        , ROUND(AVG(rating),2) AS AVERAGE_RATING
        , COUNT(*) AS RATINGS_CNT  
FROM rating_data          
  
GROUP BY movieId
ORDER BY movieId           
""")
rating_grouped_df.show()

+-------+--------------+-----------+
|MovieId|AVERAGE_RATING|RATINGS_CNT|
+-------+--------------+-----------+
|  73268|          3.22|       1512|
|  73270|           3.2|         54|
|  73319|          3.31|        957|
|  73321|          3.46|       5629|
|  73323|          3.84|       1684|
|  73929|          2.62|        864|
|  74115|          2.74|         36|
|  74131|           3.1|        129|
|  74135|          3.56|         17|
|  74154|          3.02|        681|
|  74156|          3.18|        677|
|  74450|          2.92|       1092|
|  74452|          2.65|        631|
|  74458|          3.96|      18886|
|  74530|          2.93|       1981|
|  74532|          2.66|        631|
|  74545|          3.59|       2021|
|  74580|          2.75|        376|
|  74595|          2.92|          6|
|  74613|           3.5|         12|
+-------+--------------+-----------+
only showing top 20 rows



In [36]:
rating_grouped_df.count()

20539

In [37]:
# Copy Pandas dataframe to sparks
movies_spark_df = spark.createDataFrame(movies_reduced)
movies_spark_df.show()

+-------+--------------------+--------------------+--------------------+--------------------+
|movieId|               title|              genres|              titles|                year|
+-------+--------------------+--------------------+--------------------+--------------------+
|  73268|  Daybreakers (2010)|Action|Drama|Horr...|         Daybreakers|                2010|
|  73270|Milk of Sorrow, T...|               Drama| Milk of Sorrow, The|Teta asustada, La...|
|  73319|    Leap Year (2010)|      Comedy|Romance|           Leap Year|                2010|
|  73321|Book of Eli, The ...|Action|Adventure|...|    Book of Eli, The|                2010|
|  73323|Girl Who Kicked t...|Action|Crime|Mystery|Girl Who Kicked t...|Luftslottet som s...|
|  73929|       Legion (2010)|Action|Fantasy|Ho...|              Legion|                2010|
|  74115|Dante's Inferno A...|Action|Animation|...|Dante's Inferno A...|                2010|
|  74131|Extraordinary Mea...|               Drama|Extraordi

In [38]:
movies_spark_df.createOrReplaceTempView('movies_data')
movies_spark_df.printSchema()
rating_grouped_df.createOrReplaceTempView('rating_grouped')
rating_grouped_df.printSchema()

root
 |-- movieId: long (nullable = true)
 |-- title: string (nullable = true)
 |-- genres: string (nullable = true)
 |-- titles: string (nullable = true)
 |-- year: string (nullable = true)

root
 |-- MovieId: long (nullable = true)
 |-- AVERAGE_RATING: double (nullable = true)
 |-- RATINGS_CNT: long (nullable = false)



In [42]:
# join the ratings data to the Movies dropping ones that dont match
start_time = time.time()
rating_movies_df = spark.sql("""
SELECT MD.*
     , RG.AVERAGE_RATING
     , RG.RATINGS_CNT                                                
FROM movies_data     MD
   , rating_grouped  RG                                  
WHERE MD.movieID == RG.MovieID  
""")
rating_movies_df.show()

+-------+--------------------+--------------------+--------------------+--------------------+--------------+-----------+
|movieId|               title|              genres|              titles|                year|AVERAGE_RATING|RATINGS_CNT|
+-------+--------------------+--------------------+--------------------+--------------------+--------------+-----------+
| 106002| Ender's Game (2013)|Action|Adventure|...|        Ender's Game|                2013|           3.4|       3752|
| 106100|Dallas Buyers Clu...|               Drama|  Dallas Buyers Club|                2013|          3.91|       6783|
|  91628|New Year's Eve (2...|      Comedy|Romance|      New Year's Eve|                2011|          2.79|        372|
| 102993|Way, Way Back, Th...|        Comedy|Drama|  Way, Way Back, The|                2013|          3.77|       1383|
| 142507|Pawn Sacrifice (2...|               Drama|      Pawn Sacrifice|                2015|          3.56|        298|
|  87992|      Mammuth (2010)|  

In [43]:
rating_movies_df.count()

20539

In [45]:
pandas_df = rating_movies_df.toPandas()
pandas_df.to_csv("Data/movieRating.csv", index=False)