# Clean datasets within Spark & save to parquet

There are currently 2 sources of dataset:
1. [IMDB](https://datasets.imdbws.com/) - Obtained directly from IMDB. Stored as 6 tsv files. 
2. Kaggle - [The Movies Dataset](https://www.kaggle.com/rounakbanik/the-movies-dataset), obtained from the official GroupLens website. Stored as 5 csv.

## Import credentials & set spark session

In [52]:
# Import and definition statements
import configparser
import os
import pandas as pd
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, when, concat, col, lit, length
from pyspark.sql.types import IntegerType, DoubleType, LongType

# Function to convert "\N" to None/null
def blank_as_null(x):
    return when(col(x) != r"\N", col(x)).otherwise(None)

In [None]:
config = configparser.ConfigParser()
config.read_file(open('dwh.cfg'))

os.environ["AWS_ACCESS_KEY_ID"] = config.get('AWS','key')
os.environ["AWS_SECRET_ACCESS_KEY"] = config.get('AWS','secret')

print("Key: ", os.environ["AWS_ACCESS_KEY_ID"])
print("Secret: ", os.environ["AWS_SECRET_ACCESS_KEY"])

In [3]:
# Create spark session
spark = SparkSession.builder\
                     .config("spark.jars.packages","org.apache.hadoop:hadoop-aws:2.7.0")\
                     .getOrCreate()

# Load 6 IMDB tsv files from S3
## Load title-principals.tsv

In [59]:
s3_bucket = "s3a://udacity-dend-capstone-1995/datasets/imdb/"

In [60]:
title_principals_path = s3_bucket + 'title-principals.tsv'
title_principals_path

's3a://udacity-dend-capstone-1995/datasets/imdb/title-principals.tsv'

In [61]:
principal_df = spark.read.csv(title_principals_path, sep=r'\t', header=True)

In [63]:
principal_df = principal_df.withColumn("characters", blank_as_null("characters"))
principal_df = principal_df.withColumn("job", blank_as_null("job"))

# Change the type of 'ordering' column to int
principal_df = principal_df.withColumn("ordering", principal_df["ordering"].cast(IntegerType()))

# Rename columns
principal_df = principal_df.selectExpr("tconst as TITLE_ID", "nconst as NAME_ID", "ordering as ORDERING", "category as CATEGORY", 
                                      "job as JOB", "characters as CHARACTERS")

print('Num rows: ' , principal_df.count())
principal_df.printSchema()
principal_df.show(5)

Num rows:  42750666
root
 |-- TITLE_ID: string (nullable = true)
 |-- NAME_ID: string (nullable = true)
 |-- ORDERING: integer (nullable = true)
 |-- CATEGORY: string (nullable = true)
 |-- JOB: string (nullable = true)
 |-- CHARACTERS: string (nullable = true)

+---------+---------+--------+---------------+--------------------+----------+
| TITLE_ID|  NAME_ID|ORDERING|       CATEGORY|                 JOB|CHARACTERS|
+---------+---------+--------+---------------+--------------------+----------+
|tt0000001|nm1588970|       1|           self|                null|  ["Self"]|
|tt0000001|nm0005690|       2|       director|                null|      null|
|tt0000001|nm0374658|       3|cinematographer|director of photo...|      null|
|tt0000002|nm0721526|       1|       director|                null|      null|
|tt0000002|nm1335271|       2|       composer|                null|      null|
+---------+---------+--------+---------------+--------------------+----------+
only showing top 5 rows



In [66]:
# Check length of string columns
length_df = principal_df.withColumn('TITLE_ID', length("TITLE_ID")) \
                   .withColumn('NAME_ID', length("NAME_ID"))       \
                   .withColumn('CATEGORY', length("CATEGORY"))     \
                   .withColumn('JOB', length("JOB")).withColumn('CHARACTERS', length("CHARACTERS"))          
                   

max_length = length_df.agg({"TITLE_ID": "max", "NAME_ID": "max" , "CATEGORY": "max",
                            "JOB": "max", "CHARACTERS": "max"
                               }).collect()

max_length

[Row(max(JOB)=286, max(CATEGORY)=19, max(TITLE_ID)=10, max(CHARACTERS)=463, max(NAME_ID)=10)]

In [8]:
# Save parquet to S3 bucket
output_path = "s3a://udacity-dend-capstone-1995/parquet/imdb-principal/imdb_principal.parquet"
principal_df.write.parquet(output_path, 'overwrite')

In [67]:
del principal_df

## Load title-akas.tsv

In [48]:
title_akas_path = s3_bucket + 'title-akas.tsv'
title_akas_path

's3a://udacity-dend-capstone-1995/datasets/imdb/title-akas.tsv'

In [49]:
# Read and clean table
akas_df = spark.read.csv(title_akas_path, sep=r'\t', header=True)

# Replace '\N' with nulls
akas_df = akas_df.withColumn("language", blank_as_null("language"))
akas_df = akas_df.withColumn("types", blank_as_null("types"))
akas_df = akas_df.withColumn("attributes", blank_as_null("attributes"))
akas_df = akas_df.withColumn("title", blank_as_null("title"))

# Change column type
akas_df = akas_df.withColumn("ordering", akas_df["ordering"].cast(IntegerType()))
akas_df = akas_df.withColumn("isOriginalTitle", akas_df["isOriginalTitle"].cast(IntegerType()))

# Rename columns
akas_df = akas_df.selectExpr("titleId as TITLE_ID", "ordering as ORDERING", "title as TITLE", 
                             "region as REGION", "language as LANGUAGE", "types as TYPES", 
                             "attributes as ATTRIBUTES", "isOriginalTitle as IS_ORIGINAL_TITLE" )

print("Row counts: ", akas_df.count())
akas_df.printSchema()
akas_df.show(5)

Row counts:  24978357
root
 |-- TITLE_ID: string (nullable = true)
 |-- ORDERING: integer (nullable = true)
 |-- TITLE: string (nullable = true)
 |-- REGION: string (nullable = true)
 |-- LANGUAGE: string (nullable = true)
 |-- TYPES: string (nullable = true)
 |-- ATTRIBUTES: string (nullable = true)
 |-- IS_ORIGINAL_TITLE: integer (nullable = true)

+---------+--------+--------------------+------+--------+-----------+-------------+-----------------+
| TITLE_ID|ORDERING|               TITLE|REGION|LANGUAGE|      TYPES|   ATTRIBUTES|IS_ORIGINAL_TITLE|
+---------+--------+--------------------+------+--------+-----------+-------------+-----------------+
|tt0000001|       1|          Карменсіта|    UA|    null|imdbDisplay|         null|                0|
|tt0000001|       2|          Carmencita|    DE|    null|       null|literal title|                0|
|tt0000001|       3|Carmencita - span...|    HU|    null|imdbDisplay|         null|                0|
|tt0000001|       4|          Καρμε

In [58]:
# Check length of string columns
length_df = akas_df.withColumn('length_TITLE_ID', length(akas_df.TITLE_ID)) \
                   .withColumn('length_TITLE', length(akas_df.TITLE))       \
                   .withColumn('length_region', length(akas_df.REGION))     \
                   .withColumn('length_language', length(akas_df.LANGUAGE)) \
                   .withColumn('length_types', length(akas_df.TYPES))       \
                   .withColumn('length_attributes', length(akas_df.ATTRIBUTES))

max_length = length_df.agg({"length_TITLE_ID": "max", "length_TITLE": "max" , "length_region": "max",
                            "length_language": "max", "length_types": "max", "length_attributes": "max"
                               }).collect()

max_length

[Row(max(length_TITLE)=831, max(length_language)=3, max(length_TITLE_ID)=10, max(length_attributes)=62, max(length_region)=4, max(length_types)=20)]

In [12]:
# Save to S3 bucket
output_path = "s3a://udacity-dend-capstone-1995/parquet/imdb-akas/imdb_akas.parquet"
akas_df.write.parquet(output_path, 'overwrite')

del akas_df

## Load title-basics.tsv

In [68]:
title_basics_path = s3_bucket + 'title-basics.tsv'
title_basics_path

's3a://udacity-dend-capstone-1995/datasets/imdb/title-basics.tsv'

In [69]:
# Read and clean table
basics_df = spark.read.csv(title_basics_path, sep=r'\t', header=True)

# Replace all invalid values with null
basics_df = basics_df.withColumn("titleType", blank_as_null("titleType"))
basics_df = basics_df.withColumn("primaryTitle", blank_as_null("primaryTitle"))
basics_df = basics_df.withColumn("originalTitle", blank_as_null("originalTitle"))
basics_df = basics_df.withColumn("isAdult", blank_as_null("isAdult"))
basics_df = basics_df.withColumn("startYear", blank_as_null("startYear"))
basics_df = basics_df.withColumn("endYear", blank_as_null("endYear"))
basics_df = basics_df.withColumn("runtimeMinutes", blank_as_null("runtimeMinutes"))
basics_df = basics_df.withColumn("genres", blank_as_null("genres"))

# Cast column types
basics_df = basics_df.withColumn("isAdult", basics_df["isAdult"].cast(IntegerType()))
basics_df = basics_df.withColumn("startYear", basics_df["startYear"].cast(IntegerType()))
basics_df = basics_df.withColumn("endYear", basics_df["endYear"].cast(IntegerType()))
basics_df = basics_df.withColumn("runtimeMinutes", basics_df["runtimeMinutes"].cast(IntegerType()))

# Rename columns
basics_df = basics_df.selectExpr("tconst as TITLE_ID", "titleType as TITLE_TYPE", "primaryTitle as PRIMARY_TITLE", 
                             "originalTitle as ORIGINAL_TITLE", "isAdult as IS_ADULT", "startYear as START_YEAR", 
                             "endYear as END_YEAR", "runtimeMinutes as RUNTIME_MINUTES", "genres as GENRES" )

print("Row counts: ", basics_df.count())
basics_df.printSchema()
basics_df.show(5)

Row counts:  7554298
root
 |-- TITLE_ID: string (nullable = true)
 |-- TITLE_TYPE: string (nullable = true)
 |-- PRIMARY_TITLE: string (nullable = true)
 |-- ORIGINAL_TITLE: string (nullable = true)
 |-- IS_ADULT: integer (nullable = true)
 |-- START_YEAR: integer (nullable = true)
 |-- END_YEAR: integer (nullable = true)
 |-- RUNTIME_MINUTES: integer (nullable = true)
 |-- GENRES: string (nullable = true)

+---------+----------+--------------------+--------------------+--------+----------+--------+---------------+--------------------+
| TITLE_ID|TITLE_TYPE|       PRIMARY_TITLE|      ORIGINAL_TITLE|IS_ADULT|START_YEAR|END_YEAR|RUNTIME_MINUTES|              GENRES|
+---------+----------+--------------------+--------------------+--------+----------+--------+---------------+--------------------+
|tt0000001|     short|          Carmencita|          Carmencita|       0|      1894|    null|              1|   Documentary,Short|
|tt0000002|     short|Le clown et ses c...|Le clown et ses c...| 

In [70]:
# Check length of string columns
length_df = basics_df.withColumn('TITLE_ID', length("TITLE_ID")) \
                   .withColumn('TITLE_TYPE', length("TITLE_TYPE"))       \
                   .withColumn('PRIMARY_TITLE', length("PRIMARY_TITLE"))     \
                   .withColumn('ORIGINAL_TITLE', length("ORIGINAL_TITLE")).withColumn('GENRES', length("GENRES"))          
                   

max_length = length_df.agg({"TITLE_ID": "max", "TITLE_TYPE": "max" , "PRIMARY_TITLE": "max",
                            "ORIGINAL_TITLE": "max", "GENRES": "max"
                               }).collect()

max_length

[Row(max(PRIMARY_TITLE)=419, max(TITLE_ID)=10, max(ORIGINAL_TITLE)=419, max(TITLE_TYPE)=12, max(GENRES)=32)]

In [15]:
# Save to parquet on S3
output_path = "s3a://udacity-dend-capstone-1995/parquet/imdb-basics/imdb_basics.parquet"
basics_df.write.parquet(output_path, 'overwrite')

del basics_df

## Load title-crew.tsv

In [71]:
title_crew_path = s3_bucket + 'title-crew.tsv'
title_crew_path

's3a://udacity-dend-capstone-1995/datasets/imdb/title-crew.tsv'

In [72]:
# Read and clean table
crew_df = spark.read.csv(title_crew_path, sep=r'\t', header=True)

crew_df = crew_df.withColumn("writers", blank_as_null("writers"))
crew_df = crew_df.withColumn("directors", blank_as_null("directors"))

crew_df = crew_df.selectExpr("tconst as TITLE_ID", "directors as DIRECTORS", "writers as WRITERS")
                             
print("Row counts: ", crew_df.count())
crew_df.printSchema()
crew_df.show(5)

Row counts:  7554298
root
 |-- TITLE_ID: string (nullable = true)
 |-- DIRECTORS: string (nullable = true)
 |-- WRITERS: string (nullable = true)

+---------+---------+-------+
| TITLE_ID|DIRECTORS|WRITERS|
+---------+---------+-------+
|tt0000001|nm0005690|   null|
|tt0000002|nm0721526|   null|
|tt0000003|nm0721526|   null|
|tt0000004|nm0721526|   null|
|tt0000005|nm0005690|   null|
+---------+---------+-------+
only showing top 5 rows



In [73]:
# Check length of string columns
length_df = crew_df.withColumn('TITLE_ID', length("TITLE_ID")) \
                   .withColumn('DIRECTORS', length("DIRECTORS"))       \
                   .withColumn('WRITERS', length("WRITERS"))     
                        
                   

max_length = length_df.agg({"TITLE_ID": "max", "DIRECTORS": "max" , "WRITERS": "max"
                               }).collect()

max_length

[Row(max(TITLE_ID)=10, max(WRITERS)=12469, max(DIRECTORS)=4697)]

In [18]:
# Save to parquet on S3
output_path = "s3a://udacity-dend-capstone-1995/parquet/imdb-crew/imdb_crew.parquet"
crew_df.write.parquet(output_path, 'overwrite')

del crew_df

## Load title-episode.tsv

In [74]:
title_episode_path = s3_bucket + 'title-episode.tsv'
title_episode_path

's3a://udacity-dend-capstone-1995/datasets/imdb/title-episode.tsv'

In [75]:
# Read and clean table
episode_df = spark.read.csv(title_episode_path, sep=r'\t', header=True)

episode_df = episode_df.withColumn("parentTconst", blank_as_null("parentTconst"))
episode_df = episode_df.withColumn("seasonNumber", blank_as_null("seasonNumber"))
episode_df = episode_df.withColumn("episodeNumber", blank_as_null("episodeNumber"))

episode_df = episode_df.withColumn("seasonNumber", episode_df["seasonNumber"].cast(IntegerType()))
episode_df = episode_df.withColumn("episodeNumber", episode_df["episodeNumber"].cast(IntegerType()))

episode_df = episode_df.selectExpr("tconst as TITLE_ID", "parentTconst as PARENT_TITLE_ID", "seasonNumber as SEASON_NUMBER", 
                                   "episodeNumber as EPISODE_NUMBER")
        
print("Row counts: ", episode_df.count())
episode_df.printSchema()
episode_df.show(5)

Row counts:  5471901
root
 |-- TITLE_ID: string (nullable = true)
 |-- PARENT_TITLE_ID: string (nullable = true)
 |-- SEASON_NUMBER: integer (nullable = true)
 |-- EPISODE_NUMBER: integer (nullable = true)

+---------+---------------+-------------+--------------+
| TITLE_ID|PARENT_TITLE_ID|SEASON_NUMBER|EPISODE_NUMBER|
+---------+---------------+-------------+--------------+
|tt0041951|      tt0041038|            1|             9|
|tt0042816|      tt0989125|            1|            17|
|tt0042889|      tt0989125|         null|          null|
|tt0043426|      tt0040051|            3|            42|
|tt0043631|      tt0989125|            2|            16|
+---------+---------------+-------------+--------------+
only showing top 5 rows



In [76]:
# Check length of string columns
length_df = episode_df.withColumn('TITLE_ID', length("TITLE_ID")) \
                   .withColumn('PARENT_TITLE_ID', length("PARENT_TITLE_ID"))                          

max_length = length_df.agg({"TITLE_ID": "max", "PARENT_TITLE_ID": "max"
                               }).collect()

max_length

[Row(max(TITLE_ID)=10, max(PARENT_TITLE_ID)=10)]

In [21]:
# Save to parquet on S3
output_path = "s3a://udacity-dend-capstone-1995/parquet/imdb-episode/imdb_episode.parquet"
episode_df.write.parquet(output_path, 'overwrite')

del episode_df

## Load title-ratings.tsv

In [77]:
title_ratings_path = s3_bucket + 'title-ratings.tsv'
title_ratings_path

's3a://udacity-dend-capstone-1995/datasets/imdb/title-ratings.tsv'

In [78]:
# Read and clean table
rating_df = spark.read.csv(title_ratings_path, sep=r'\t', header=True)

rating_df = rating_df.withColumn("averageRating", blank_as_null("averageRating"))
rating_df = rating_df.withColumn("numVotes", blank_as_null("numVotes"))

rating_df = rating_df.withColumn("numVotes", rating_df["numVotes"].cast(IntegerType()))
rating_df = rating_df.withColumn("averageRating", rating_df["averageRating"].cast(DoubleType()))

rating_df = rating_df.selectExpr("tconst as TITLE_ID", "averageRating as AVERAGE_RATING", "numVotes as NUM_VOTES")
                                   
print("Row counts: ", rating_df.count())
rating_df.printSchema()
rating_df.show(5)

Row counts:  1116071
root
 |-- TITLE_ID: string (nullable = true)
 |-- AVERAGE_RATING: double (nullable = true)
 |-- NUM_VOTES: integer (nullable = true)

+---------+--------------+---------+
| TITLE_ID|AVERAGE_RATING|NUM_VOTES|
+---------+--------------+---------+
|tt0000001|           5.7|     1677|
|tt0000002|           6.1|      208|
|tt0000003|           6.5|     1403|
|tt0000004|           6.2|      123|
|tt0000005|           6.2|     2197|
+---------+--------------+---------+
only showing top 5 rows



In [24]:
# Save to parquet on S3
output_path = "s3a://udacity-dend-capstone-1995/parquet/imdb-rating/imdb_rating.parquet"
rating_df.write.parquet(output_path, 'overwrite')

del rating_df

## Load name-basics.tsv

In [79]:
name_basics_path = s3_bucket + 'name-basics.tsv'
name_basics_path

's3a://udacity-dend-capstone-1995/datasets/imdb/name-basics.tsv'

In [80]:
# Read and clean table
name_basic_df = spark.read.csv(name_basics_path, sep=r'\t', header=True)

name_basic_df = name_basic_df.withColumn("primaryName", blank_as_null("primaryName"))
name_basic_df = name_basic_df.withColumn("birthYear", blank_as_null("birthYear"))
name_basic_df = name_basic_df.withColumn("deathYear", blank_as_null("deathYear"))
name_basic_df = name_basic_df.withColumn("primaryProfession", blank_as_null("primaryProfession"))
name_basic_df = name_basic_df.withColumn("knownForTitles", blank_as_null("knownForTitles"))

name_basic_df = name_basic_df.withColumn("birthYear", name_basic_df["birthYear"].cast(IntegerType()))
name_basic_df = name_basic_df.withColumn("deathYear", name_basic_df["deathYear"].cast(IntegerType()))

name_basic_df = name_basic_df.selectExpr("nconst as NAME_ID", "primaryName as PRIMARY_NAME", "birthYear as BIRTH_YEAR", 
                                         "deathYear as DEATH_YEAR", "primaryProfession as PRIMARY_PROFESSION", "knownForTitles as KNOWN_FOR_TITLES")

print("Row counts: ", name_basic_df.count())
name_basic_df.printSchema()
name_basic_df.show(5)

Row counts:  10671837
root
 |-- NAME_ID: string (nullable = true)
 |-- PRIMARY_NAME: string (nullable = true)
 |-- BIRTH_YEAR: integer (nullable = true)
 |-- DEATH_YEAR: integer (nullable = true)
 |-- PRIMARY_PROFESSION: string (nullable = true)
 |-- KNOWN_FOR_TITLES: string (nullable = true)

+---------+---------------+----------+----------+--------------------+--------------------+
|  NAME_ID|   PRIMARY_NAME|BIRTH_YEAR|DEATH_YEAR|  PRIMARY_PROFESSION|    KNOWN_FOR_TITLES|
+---------+---------------+----------+----------+--------------------+--------------------+
|nm0000001|   Fred Astaire|      1899|      1987|soundtrack,actor,...|tt0050419,tt00531...|
|nm0000002|  Lauren Bacall|      1924|      2014|  actress,soundtrack|tt0117057,tt00383...|
|nm0000003|Brigitte Bardot|      1934|      null|actress,soundtrac...|tt0049189,tt00599...|
|nm0000004|   John Belushi|      1949|      1982|actor,soundtrack,...|tt0080455,tt00779...|
|nm0000005| Ingmar Bergman|      1918|      2007|writer,direc

In [84]:
# Check length of string columns
length_df = name_basic_df.withColumn('NAME_ID', length("NAME_ID")) \
                   .withColumn('PRIMARY_NAME', length("PRIMARY_NAME")).withColumn('PRIMARY_PROFESSION', length("PRIMARY_PROFESSION")).withColumn('KNOWN_FOR_TITLES', length("KNOWN_FOR_TITLES"))      

max_length = length_df.agg({"NAME_ID": "max", "PRIMARY_NAME": "max",
                            "PRIMARY_PROFESSION": "max", "KNOWN_FOR_TITLES": "max"
                               }).collect()

max_length

[Row(max(NAME_ID)=10, max(KNOWN_FOR_TITLES)=72, max(PRIMARY_NAME)=105, max(PRIMARY_PROFESSION)=66)]

In [27]:
# Save to parquet on S3
output_path = "s3a://udacity-dend-capstone-1995/parquet/imdb-name-basic/imdb_name_basic.parquet"
name_basic_df.write.parquet(output_path, 'overwrite')

del name_basic_df

# Load kaggle-movie-dataset
Contains 5 csv files: movies_metadata.csv, keywords.csv, links.csv, credits.csv, ratings.csv

## Load movies_metadata.csv

In [85]:
filepath = "s3a://udacity-dend-capstone-1995/datasets/kaggle-movie-dataset/movies_metadata.csv"
filepath

's3a://udacity-dend-capstone-1995/datasets/kaggle-movie-dataset/movies_metadata.csv'

In [91]:
# Read and clean table
movie_df = spark.read.csv(filepath, header=True)

movie_df = movie_df.withColumn("budget", movie_df["budget"].cast(IntegerType()))
movie_df = movie_df.withColumn("revenue", movie_df["revenue"].cast(IntegerType()))
movie_df = movie_df.withColumn("id", movie_df["id"].cast(IntegerType()))
movie_df = movie_df.where(col("id").isNotNull())
movie_df = movie_df.withColumn("runtime", movie_df["runtime"].cast(DoubleType()))
movie_df = movie_df.withColumn("vote_average", movie_df["vote_average"].cast(DoubleType()))
movie_df = movie_df.withColumn("vote_count", movie_df["vote_count"].cast(IntegerType()))

movie_df = movie_df.selectExpr("adult as ADULT", "belongs_to_collection as BELONGS_TO_COLLECTION", "budget as BUDGET", "genres as GENRES",
                               "homepage as HOMEPAGE", "id as TMDB_ID", "imdb_id as IMDB_ID", "original_language as ORIGINAL_LANGUAGE",
                               "original_title as ORIGINAL_TITLE", "overview as OVERVIEW", "popularity as POPULARITY", "poster_path as POSTER_PATH",
                               "production_companies as PRODUCTION_COMPANIES", "production_countries as PRODUCTION_COUNTRIES", "release_date as RELEASE_DATE", "revenue as REVENUE",
                               "runtime as RUNTIME", "spoken_languages as SPOKEN_LANGUAGES", "status as STATUS", "tagline as TAGLINE",
                               "title as TITLE", "video as IS_VIDEO", "vote_average as VOTE_AVERAGE", "vote_count as VOTE_COUNT"
                              )

print("Row counts: ", movie_df.count())
movie_df.printSchema()
movie_df.limit(5).toPandas().head(5)

Row counts:  45363
root
 |-- ADULT: string (nullable = true)
 |-- BELONGS_TO_COLLECTION: string (nullable = true)
 |-- BUDGET: integer (nullable = true)
 |-- GENRES: string (nullable = true)
 |-- HOMEPAGE: string (nullable = true)
 |-- TMDB_ID: integer (nullable = true)
 |-- IMDB_ID: string (nullable = true)
 |-- ORIGINAL_LANGUAGE: string (nullable = true)
 |-- ORIGINAL_TITLE: string (nullable = true)
 |-- OVERVIEW: string (nullable = true)
 |-- POPULARITY: string (nullable = true)
 |-- POSTER_PATH: string (nullable = true)
 |-- PRODUCTION_COMPANIES: string (nullable = true)
 |-- PRODUCTION_COUNTRIES: string (nullable = true)
 |-- RELEASE_DATE: string (nullable = true)
 |-- REVENUE: integer (nullable = true)
 |-- RUNTIME: double (nullable = true)
 |-- SPOKEN_LANGUAGES: string (nullable = true)
 |-- STATUS: string (nullable = true)
 |-- TAGLINE: string (nullable = true)
 |-- TITLE: string (nullable = true)
 |-- IS_VIDEO: string (nullable = true)
 |-- VOTE_AVERAGE: double (nullable = tru

Unnamed: 0,ADULT,BELONGS_TO_COLLECTION,BUDGET,GENRES,HOMEPAGE,TMDB_ID,IMDB_ID,ORIGINAL_LANGUAGE,ORIGINAL_TITLE,OVERVIEW,...,RELEASE_DATE,REVENUE,RUNTIME,SPOKEN_LANGUAGES,STATUS,TAGLINE,TITLE,IS_VIDEO,VOTE_AVERAGE,VOTE_COUNT
0,False,"{'id': 10194, 'name': 'Toy Story Collection', ...",30000000,"[{'id': 16, 'name': 'Animation'}, {'id': 35, '...",http://toystory.disney.com/toy-story,862,tt0114709,en,Toy Story,"Led by Woody, Andy's toys live happily in his ...",...,1995-10-30,373554033.0,81.0,"[{'iso_639_1': 'en', 'name': 'English'}]",Released,,Toy Story,False,7.7,5415.0
1,False,,65000000,"[{'id': 12, 'name': 'Adventure'}, {'id': 14, '...",,8844,tt0113497,en,Jumanji,When siblings Judy and Peter discover an encha...,...,1995-12-15,262797249.0,104.0,"[{'iso_639_1': 'en', 'name': 'English'}, {'iso...",Released,Roll the dice and unleash the excitement!,Jumanji,False,6.9,2413.0
2,False,"{'id': 119050, 'name': 'Grumpy Old Men Collect...",0,"[{'id': 10749, 'name': 'Romance'}, {'id': 35, ...",,15602,tt0113228,en,Grumpier Old Men,A family wedding reignites the ancient feud be...,...,1995-12-22,0.0,101.0,"[{'iso_639_1': 'en', 'name': 'English'}]",Released,Still Yelling. Still Fighting. Still Ready for...,Grumpier Old Men,False,6.5,92.0
3,False,,16000000,"[{'id': 35, 'name': 'Comedy'}, {'id': 18, 'nam...",,31357,tt0114885,en,Waiting to Exhale,"""Cheated on, mistreated and stepped on, the wo...",...,/16XOMpEaLWkrcPqSQqhTmeJuqQl.jpg,,,1995-12-22,81452156,127.0,"[{'iso_639_1': 'en', 'name': 'English'}]",Released,,
4,False,"{'id': 96871, 'name': 'Father of the Bride Col...",0,"[{'id': 35, 'name': 'Comedy'}]",,11862,tt0113041,en,Father of the Bride Part II,Just when George Banks has recovered from his ...,...,1995-02-10,76578911.0,106.0,"[{'iso_639_1': 'en', 'name': 'English'}]",Released,Just When His World Is Back To Normal... He's ...,Father of the Bride Part II,False,5.7,173.0


In [88]:
# Check length of string columns
length_df = movie_df.withColumn('ADULT', length(movie_df.ADULT)).withColumn('BELONGS_TO_COLLECTION', length(movie_df.BELONGS_TO_COLLECTION)).withColumn('GENRES', length(movie_df.GENRES)) \
                   .withColumn('HOMEPAGE', length(movie_df.HOMEPAGE)).withColumn('IMDB_ID', length(movie_df.IMDB_ID)).withColumn('ORIGINAL_LANGUAGE', length(movie_df.ORIGINAL_LANGUAGE))       \
                   .withColumn('ORIGINAL_TITLE', length(movie_df.ORIGINAL_TITLE)).withColumn('OVERVIEW', length(movie_df.OVERVIEW)).withColumn('POPULARITY', length(movie_df.POPULARITY))     \
                   .withColumn('POSTER_PATH', length(movie_df.POSTER_PATH)).withColumn('PRODUCTION_COMPANIES', length(movie_df.PRODUCTION_COMPANIES)).withColumn('PRODUCTION_COUNTRIES', length(movie_df.PRODUCTION_COUNTRIES)) \
                   .withColumn('RELEASE_DATE', length(movie_df.RELEASE_DATE)).withColumn('SPOKEN_LANGUAGES', length(movie_df.SPOKEN_LANGUAGES)).withColumn('STATUS', length(movie_df.STATUS))       \
                   .withColumn('TAGLINE', length(movie_df.TAGLINE)).withColumn('TITLE', length(movie_df.TITLE)).withColumn('IS_VIDEO', length(movie_df.IS_VIDEO))

max_length = length_df.agg({"ADULT": "max", "BELONGS_TO_COLLECTION": "max" , "GENRES": "max",
                            "HOMEPAGE": "max", "IMDB_ID": "max", "ORIGINAL_LANGUAGE": "max",
                            "ORIGINAL_TITLE": "max", "OVERVIEW": "max", "POPULARITY": "max",
                            "POSTER_PATH": "max", "PRODUCTION_COMPANIES": "max", "PRODUCTION_COUNTRIES": "max",
                            "RELEASE_DATE": "max", "SPOKEN_LANGUAGES": "max", "STATUS": "max",
                            "TAGLINE": "max", "TITLE": "max", "IS_VIDEO": "max",
                               }).collect()

max_length

[Row(max(POSTER_PATH)=958, max(POPULARITY)=666, max(IMDB_ID)=126, max(SPOKEN_LANGUAGES)=765, max(ORIGINAL_LANGUAGE)=131, max(STATUS)=753, max(ORIGINAL_TITLE)=165, max(TITLE)=295, max(TAGLINE)=350, max(BELONGS_TO_COLLECTION)=313, max(IS_VIDEO)=295, max(RELEASE_DATE)=694, max(GENRES)=264, max(PRODUCTION_COUNTRIES)=1039, max(HOMEPAGE)=251, max(OVERVIEW)=1000, max(PRODUCTION_COMPANIES)=1252, max(ADULT)=413)]

In [92]:
# Save to parquet on S3
output_path = "s3a://udacity-dend-capstone-1995/parquet/kaggle-movie-metadata/kaggle_movie_metadata.parquet"
movie_df.write.parquet(output_path, 'overwrite')

del movie_df

## Load keywords.csv

In [93]:
filepath = "s3a://udacity-dend-capstone-1995/datasets/kaggle-movie-dataset/keywords.csv"
filepath

's3a://udacity-dend-capstone-1995/datasets/kaggle-movie-dataset/keywords.csv'

In [95]:
# Read and clean table
keyword_df = spark.read.csv(filepath, header=True)

keyword_df = keyword_df.withColumn("id", keyword_df["id"].cast(IntegerType()))
keyword_df = keyword_df.where(col("id").isNotNull())
keyword_df = keyword_df.selectExpr("id as TMDB_ID", "keywords as KEYWORDS")

print("Row counts: ", keyword_df.count())
keyword_df.printSchema()
keyword_df.limit(5).toPandas().head(5)

Row counts:  46419
root
 |-- TMDB_ID: integer (nullable = true)
 |-- KEYWORDS: string (nullable = true)



Unnamed: 0,TMDB_ID,KEYWORDS
0,862,"[{'id': 931, 'name': 'jealousy'}, {'id': 4290,..."
1,8844,"""[{'id': 10090, 'name': 'board game'}, {'id': ..."
2,15602,"[{'id': 1495, 'name': 'fishing'}, {'id': 12392..."
3,31357,"[{'id': 818, 'name': 'based on novel'}, {'id':..."
4,11862,"[{'id': 1009, 'name': 'baby'}, {'id': 1599, 'n..."


In [96]:
# Check length of string columns
length_df = keyword_df.withColumn('TMDB_ID', length(keyword_df.TMDB_ID))\
                   .withColumn('KEYWORDS', length(keyword_df.KEYWORDS))

max_length = length_df.agg({"TMDB_ID": "max", "KEYWORDS": "max"
                               }).collect()

max_length

[Row(max(TMDB_ID)=6, max(KEYWORDS)=5305)]

In [33]:
# Save to parquet on S3
output_path = "s3a://udacity-dend-capstone-1995/parquet/kaggle-movie-keywords/kaggle_movie_keywords.parquet"
keyword_df.write.parquet(output_path, 'overwrite')

del keyword_df

## Load credits.csv

In [97]:
filepath = "s3a://udacity-dend-capstone-1995/datasets/kaggle-movie-dataset/credits.csv"
filepath

's3a://udacity-dend-capstone-1995/datasets/kaggle-movie-dataset/credits.csv'

In [98]:
# Read and clean table
credit_df = spark.read.csv(filepath, header=True)

credit_df = credit_df.withColumn("id", credit_df["id"].cast(IntegerType()))
credit_df = credit_df.where(col("id").isNotNull())

credit_df = credit_df.selectExpr("id as TMDB_ID", "crew as MOVIE_CREW", "cast as MOVIE_CAST")

print("Row counts: ", credit_df.count())
credit_df.printSchema()
credit_df.limit(5).toPandas().head(5)

Row counts:  30457
root
 |-- TMDB_ID: integer (nullable = true)
 |-- MOVIE_CREW: string (nullable = true)
 |-- MOVIE_CAST: string (nullable = true)



Unnamed: 0,TMDB_ID,MOVIE_CREW,MOVIE_CAST
0,8844,"[{'credit_id': '52fe44bfc3a36847f80a7cd1', 'de...","[{'cast_id': 1, 'character': 'Alan Parrish', '..."
1,15602,"[{'credit_id': '52fe466a9251416c75077a89', 'de...","[{'cast_id': 2, 'character': 'Max Goldman', 'c..."
2,11862,"[{'credit_id': '52fe44959251416c75039ed7', 'de...","[{'cast_id': 1, 'character': 'George Banks', '..."
3,11860,"[{'credit_id': '52fe44959251416c75039da9', 'de...","[{'cast_id': 1, 'character': 'Linus Larrabee',..."
4,45325,"[{'credit_id': '52fe46bdc3a36847f810f797', 'de...","[{'cast_id': 2, 'character': 'Tom Sawyer', 'cr..."


In [100]:
# Check length of string columns
length_df = credit_df.withColumn('MOVIE_CREW', length(credit_df.MOVIE_CREW))\
                   .withColumn('MOVIE_CAST', length(credit_df.MOVIE_CAST))

max_length = length_df.agg({"MOVIE_CREW": "max", "MOVIE_CAST": "max"
                               }).collect()

max_length

[Row(max(MOVIE_CAST)=24701, max(MOVIE_CREW)=33198)]

In [101]:
# Save to parquet on S3
output_path = "s3a://udacity-dend-capstone-1995/parquet/kaggle-movie-credits/kaggle_movie_credits.parquet"
credit_df.write.parquet(output_path, 'overwrite')

del credit_df

## Load links.csv

In [102]:
filepath = "s3a://udacity-dend-capstone-1995/datasets/kaggle-movie-dataset/links.csv"
filepath

's3a://udacity-dend-capstone-1995/datasets/kaggle-movie-dataset/links.csv'

In [103]:
# Read and clean table
links_df = spark.read.csv(filepath, header=True)

links_df = links_df.withColumn("movieId", links_df["movieId"].cast(IntegerType()))
links_df = links_df.withColumn("tmdbId", links_df["tmdbId"].cast(IntegerType()))

links_df = links_df.withColumn("imdb_id", concat(lit("tt"), col("imdbId") ) )

links_df = links_df.drop('imdbId')

links_df = links_df.selectExpr("movieId as LINKS_ID", "tmdbId as TMDB_ID", "imdb_id as IMDB_ID")

print("Row counts: ", links_df.count())
links_df.printSchema()
links_df.limit(5).toPandas().head(5)

Row counts:  45843
root
 |-- LINKS_ID: integer (nullable = true)
 |-- TMDB_ID: integer (nullable = true)
 |-- IMDB_ID: string (nullable = true)



Unnamed: 0,LINKS_ID,TMDB_ID,IMDB_ID
0,1,862,tt0114709
1,2,8844,tt0113497
2,3,15602,tt0113228
3,4,31357,tt0114885
4,5,11862,tt0113041


In [105]:
# Check length of string columns
length_df = links_df.withColumn('IMDB_ID', length(links_df.IMDB_ID))
                   
max_length = length_df.agg({"IMDB_ID": "max"
                               }).collect()

max_length

[Row(max(IMDB_ID)=9)]

In [39]:
# Save to parquet on S3
output_path = "s3a://udacity-dend-capstone-1995/parquet/kaggle-movie-links/kaggle_movie_links.parquet"
links_df.write.parquet(output_path, 'overwrite')

del links_df

## Load ratings.csv

In [106]:
filepath = "s3a://udacity-dend-capstone-1995/datasets/kaggle-movie-dataset/ratings.csv"
filepath

's3a://udacity-dend-capstone-1995/datasets/kaggle-movie-dataset/ratings.csv'

In [107]:
# Read and clean table
rating_df = spark.read.csv(filepath, header=True)

rating_df = rating_df.withColumn("userId", rating_df["userId"].cast(IntegerType()))
rating_df = rating_df.withColumn("movieId", rating_df["movieId"].cast(IntegerType()))
rating_df = rating_df.withColumn("rating", rating_df["rating"].cast(DoubleType()))
rating_df = rating_df.withColumn("timestamp", rating_df["timestamp"].cast(LongType()))

rating_df = rating_df.selectExpr("userId as USER_ID", "movieId as TMDB_ID", "rating as RATING", "timestamp as TIMESTAMP")

print("Row counts: ", rating_df.count())
rating_df.printSchema()
rating_df.limit(5).toPandas().head(5)

Row counts:  26024289
root
 |-- USER_ID: integer (nullable = true)
 |-- TMDB_ID: integer (nullable = true)
 |-- RATING: double (nullable = true)
 |-- TIMESTAMP: long (nullable = true)



Unnamed: 0,USER_ID,TMDB_ID,RATING,TIMESTAMP
0,1,110,1.0,1425941529
1,1,147,4.5,1425942435
2,1,858,5.0,1425941523
3,1,1221,5.0,1425941546
4,1,1246,5.0,1425941556


In [42]:
# Save to parquet on S3
output_path = "s3a://udacity-dend-capstone-1995/parquet/kaggle-movie-ratings/kaggle_movie_ratings.parquet"
rating_df.write.parquet(output_path, 'overwrite')

del rating_df