# Join, then convert a CSV file to Parquet using Spark

This notebook converts a CSV dataset to Parquet, a compressed columnar format that is typically faster to read in later steps. First, though, we add a `YearOfRelease` column to the ratings data by joining it with the file `movie_titles.csv`.

In [4]:
from pyspark import SparkContext, SparkConf
from pyspark.sql import SparkSession
from pyspark.sql.types import StructType, StructField, IntegerType, DateType, StringType, FloatType
from pyspark.sql.functions import rand

import os

In [5]:
# Input and output data paths
data_csv = "cleaned-dataset-csv/cleaned_*.csv"
movie_titles_path = "netflix-dataset/movie_titles.csv"
output_data_parquet = "data-parquet"

In [6]:
# Skip the conversion if the Parquet output already exists
skip = os.path.exists(output_data_parquet)

if not skip:
    # Start a local SparkSession, or reuse the one that is already running
    spark = (SparkSession.builder
             .master("local[*]")
             .appName("CSV to Parquet")
             .getOrCreate())

22/01/31 13:17:03 WARN Utils: Your hostname, Jakobs-MBP.local resolves to a loopback address: 127.0.0.1; using 192.168.0.10 instead (on interface en0)
22/01/31 13:17:03 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to another address
Using Spark's default log4j profile: org/apache/spark/log4j-defaults.properties
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
22/01/31 13:17:33 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
22/01/31 13:17:34 WARN Utils: Service 'SparkUI' could not bind on port 4040. Attempting port 4041.


The two datasets have different columns and data types, so we define an explicit schema for each of them.

In [7]:
# Define the name and data type of each column, one schema per DataFrame
schema_data = StructType([
    StructField("MovieID", IntegerType(), True),
    StructField("CustomerID", IntegerType(), True),
    StructField("Rating", FloatType(), True),
    StructField("RatingDate", DateType(), True)])

schema_movies = StructType([
    StructField("MovieID", IntegerType(), True),
    StructField("YearOfRelease", IntegerType(), True)])

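As a plain-Python illustration (no Spark needed) of what a typed schema does, the sketch below applies the same column names and types as `schema_data` to one raw CSV line. The line `1,1488844,3,2005-09-06` is a hypothetical example modeled on the first rating row shown below, and `SCHEMA_DATA` and `parse_row` are illustrative names, not PySpark APIs; Spark performs its own, more thorough conversion when it is given `schema=`.

```python
from datetime import date

# Hypothetical plain-Python mirror of schema_data: (column name, converter) pairs
SCHEMA_DATA = [
    ("MovieID", int),
    ("CustomerID", int),
    ("Rating", float),
    ("RatingDate", date.fromisoformat),
]


def parse_row(line, schema):
    """Split a CSV line and convert each field with its column's converter.

    Fields that fail to convert become None, loosely like a nullable column.
    """
    values = line.strip().split(",")
    row = {}
    for (name, convert), raw in zip(schema, values):
        try:
            row[name] = convert(raw)
        except ValueError:
            row[name] = None
    return row


row = parse_row("1,1488844,3,2005-09-06", SCHEMA_DATA)
print(row)
```

Without a schema, every CSV column would arrive as a string; declaring the types up front also avoids a separate pass over the data to infer them.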
Using the schemas, we load the datasets into two different DataFrames. 

In [8]:
# Load the datasets with the specified schemas
if not skip:
    df = spark.read.csv(data_csv,
                        header=False, 
                        schema=schema_data)
    movie_titles = spark.read.csv(movie_titles_path, 
                                  schema=schema_movies)
    
    movie_titles.show(5)
    df.show(5)

+-------+-------------+
|MovieID|YearOfRelease|
+-------+-------------+
|      1|         2003|
|      2|         2004|
|      3|         1997|
|      4|         1994|
|      5|         2004|
+-------+-------------+
only showing top 5 rows

+-------+----------+------+----------+
|MovieID|CustomerID|Rating|RatingDate|
+-------+----------+------+----------+
|      1|   1488844|   3.0|2005-09-06|
|      1|    822109|   5.0|2005-05-13|
|      1|    885013|   4.0|2005-10-19|
|      1|     30878|   4.0|2005-12-26|
|      1|    823519|   3.0|2004-05-03|
+-------+----------+------+----------+
only showing top 5 rows



In [70]:
df.cube("Rating").count().show()



+------+---------+
|Rating|    count|
+------+---------+
|   3.0| 28811247|
|   5.0| 23168232|
|   4.0| 33750958|
|   1.0|  4617990|
|  null|100480507|
|   2.0| 10132080|
+------+---------+
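The `null` row in this output is not a count of missing ratings: `cube` adds a grand-total row over all groups, and the five per-rating counts do sum to 100,480,507. The plain-Python sketch below reproduces that behavior for a single grouping column, using `None` as the total key the way Spark uses `null`; `cube_count` is a hypothetical helper and the sample ratings are made up. In PySpark, `df.groupBy("Rating").count()` returns only the per-rating rows.

```python
from collections import Counter


def cube_count(values):
    """Count rows per value, plus a grand-total row keyed by None.

    For one grouping column, cube() yields one row per distinct value and one
    extra row covering all rows. Assumes the input contains no None values,
    otherwise the total would overwrite that key.
    """
    counts = Counter(values)
    counts[None] = len(values)
    return dict(counts)


# Made-up sample ratings, not taken from the dataset
sample = [3.0, 5.0, 4.0, 4.0, 1.0, 3.0, 2.0, 4.0]
print(cube_count(sample))

# The per-rating counts from the notebook output add up to the null row
per_rating = [28811247, 23168232, 33750958, 4617990, 10132080]
print(sum(per_rating))  # 100480507
```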


Next, we left-join the two DataFrames on their common column `MovieID`, keeping every column of `df` but only `YearOfRelease` from `movie_titles`. Because both DataFrames contain a `MovieID` column, we give each one an alias so that the selection can refer to their columns unambiguously.

In [9]:
# Specify alias for the DataFrames
df = df.alias("df")
movie_titles = movie_titles.alias("movie_titles")

# Join the two DataFrames on MovieID. Select only columns of interest
df = df.join(movie_titles,
             df.MovieID == movie_titles.MovieID,
             "left").select("df.*", "movie_titles.YearOfRelease")

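A left join keeps every row of `df` and attaches `YearOfRelease` where a matching `MovieID` exists; ratings whose movie is missing from `movie_titles` get `null`. The plain-Python sketch below shows those semantics on made-up rows (`left_join` is an illustrative helper, not a Spark function). As an aside, PySpark's `df.join(movie_titles, on="MovieID", how="left")` joins on the shared column name and keeps a single `MovieID` column, which would make the aliases unnecessary.

```python
from collections import defaultdict


def left_join(left_rows, right_rows, key, right_cols):
    """Left-join two lists of dicts on `key`, taking only right_cols from the right.

    Each left row appears once per matching right row, or once with None in
    right_cols when nothing matches. Unlike SQL, a None key can match here.
    """
    # Group the right-hand rows by join key for quick lookup
    index = defaultdict(list)
    for right in right_rows:
        index[right[key]].append(right)

    joined = []
    for left in left_rows:
        matches = index.get(left[key], [])
        if matches:
            for match in matches:
                joined.append({**left, **{c: match[c] for c in right_cols}})
        else:
            # No match: keep the left row and fill the right columns with None
            joined.append({**left, **{c: None for c in right_cols}})
    return joined


# Made-up rows for illustration; MovieID 99 has no entry in titles
ratings = [
    {"MovieID": 1, "CustomerID": 1488844, "Rating": 3.0},
    {"MovieID": 99, "CustomerID": 30878, "Rating": 4.0},
]
titles = [{"MovieID": 1, "YearOfRelease": 2003}]

for row in left_join(ratings, titles, "MovieID", ["YearOfRelease"]):
    print(row)
```

Note that duplicate keys on the right side multiply rows, so the joined DataFrame keeps the original row count only if each `MovieID` appears once in `movie_titles`.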
In [10]:
# Shuffle the rows with a random global sort. Because df is reassigned,
# the shuffled order is also what gets written to Parquet below.
df = df.orderBy(rand())
df.show(5)



+-------+----------+------+----------+-------------+
|MovieID|CustomerID|Rating|RatingDate|YearOfRelease|
+-------+----------+------+----------+-------------+
|   1367|    851525|   4.0|2005-10-31|         1993|
|  12945|    697889|   5.0|2004-05-21|         2001|
|   1435|    804335|   5.0|2003-06-23|         2001|
|    607|   1266203|   4.0|2005-03-04|         1994|
|   3958|   1257921|   4.0|2005-10-21|         2003|
+-------+----------+------+----------+-------------+
only showing top 5 rows
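`orderBy(rand())` shuffles by attaching a random number to each row and sorting on it, which in Spark means a full global sort. Passing a seed, as in `rand(seed=42)`, generally makes the random values repeatable as long as the input is partitioned the same way. The plain-Python sketch below shows the same sort-by-random-key idea; `shuffle_by_random_key` is a hypothetical helper, not part of PySpark.

```python
import random


def shuffle_by_random_key(rows, seed=None):
    """Return the rows in random order by sorting on a random key per row.

    Conceptually similar to df.orderBy(rand(seed)): each row gets a random
    number and the rows are sorted by it. A fixed seed gives a repeatable order.
    """
    rng = random.Random(seed)
    keyed = [(rng.random(), row) for row in rows]
    # Sort on the random key only, so the rows themselves are never compared
    keyed.sort(key=lambda pair: pair[0])
    return [row for _, row in keyed]


rows = list(range(10))
print(shuffle_by_random_key(rows, seed=42))
```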

In [11]:
%%time

# Write to Parquet file
if not skip:
    df.write.parquet(output_data_parquet)


CPU times: user 53.3 ms, sys: 25.3 ms, total: 78.6 ms
Wall time: 1min 6s


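The write took about a minute of wall time; the low CPU time only covers the notebook's Python process, while the actual work runs in Spark's JVM. Let's confirm the files are there.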

In [12]:
os.listdir(output_data_parquet)

['part-00012-d2c77d45-4888-4ff3-8a09-dee7a771837a-c000.snappy.parquet',
 'part-00033-d2c77d45-4888-4ff3-8a09-dee7a771837a-c000.snappy.parquet',
 '.part-00032-d2c77d45-4888-4ff3-8a09-dee7a771837a-c000.snappy.parquet.crc',
 '.part-00011-d2c77d45-4888-4ff3-8a09-dee7a771837a-c000.snappy.parquet.crc',
 'part-00038-d2c77d45-4888-4ff3-8a09-dee7a771837a-c000.snappy.parquet',
 '.part-00023-d2c77d45-4888-4ff3-8a09-dee7a771837a-c000.snappy.parquet.crc',
 'part-00005-d2c77d45-4888-4ff3-8a09-dee7a771837a-c000.snappy.parquet',
 '.part-00000-d2c77d45-4888-4ff3-8a09-dee7a771837a-c000.snappy.parquet.crc',
 'part-00024-d2c77d45-4888-4ff3-8a09-dee7a771837a-c000.snappy.parquet',
 'part-00019-d2c77d45-4888-4ff3-8a09-dee7a771837a-c000.snappy.parquet',
 '.part-00020-d2c77d45-4888-4ff3-8a09-dee7a771837a-c000.snappy.parquet.crc',
 'part-00015-d2c77d45-4888-4ff3-8a09-dee7a771837a-c000.snappy.parquet',
 '.part-00003-d2c77d45-4888-4ff3-8a09-dee7a771837a-c000.snappy.parquet.crc',
 '.DS_Store',
 'part-00028-d2c77d4
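Besides the `part-*.snappy.parquet` data files, the listing also contains Spark's hidden `.crc` checksum files and a macOS `.DS_Store` file. A small, hypothetical helper like `parquet_data_files` below keeps only the data files, which is handy for counting how many Parquet files a write produced; `count_parquet_files` applies it to a directory.

```python
import os


def parquet_data_files(names):
    """Keep only Parquet data files, dropping checksums and hidden files.

    Spark writes its checksums as hidden '.part-....crc' files, and macOS adds
    '.DS_Store'; names starting with '.' or '_' are skipped here.
    """
    return sorted(
        n for n in names
        if n.endswith(".parquet") and not n.startswith((".", "_"))
    )


def count_parquet_files(path):
    """Count the Parquet data files in a Spark output directory."""
    return len(parquet_data_files(os.listdir(path)))


# A few names copied from the listing above
listing = [
    "part-00012-d2c77d45-4888-4ff3-8a09-dee7a771837a-c000.snappy.parquet",
    ".part-00032-d2c77d45-4888-4ff3-8a09-dee7a771837a-c000.snappy.parquet.crc",
    ".DS_Store",
    "part-00005-d2c77d45-4888-4ff3-8a09-dee7a771837a-c000.snappy.parquet",
]
print(parquet_data_files(listing))
```

In the notebook, `count_parquet_files(output_data_parquet)` would give the number of data files directly.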

**Note for later**: Partitioning the DataFrame into 40 Parquet files instead of the original 20 cut the final model training time by roughly 40 minutes, from 126 min to 85 min.
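For a rough sense of what 40 files instead of 20 means, the sketch below splits the 100,480,507 ratings counted above evenly across n partitions, which is approximately what a round-robin `df.repartition(n)` aims for. Whether the notebook actually used `repartition` is an assumption, and `even_partition_sizes` is a hypothetical helper.

```python
def even_partition_sizes(total_rows, num_partitions):
    """Split total_rows as evenly as possible into num_partitions parts."""
    base, extra = divmod(total_rows, num_partitions)
    # The first `extra` partitions get one additional row
    return [base + 1] * extra + [base] * (num_partitions - extra)


TOTAL_RATINGS = 100_480_507  # null (grand-total) row of the cube output

for n in (20, 40):
    sizes = even_partition_sizes(TOTAL_RATINGS, n)
    print(f"{n} files: about {max(sizes):,} rows per file")
```

In PySpark, writing 40 files could look like `df.repartition(40).write.parquet(output_data_parquet)`; the note above does not say how the 40 partitions were created.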