In [85]:
import pandas as pd

In [86]:
imdb_titles_path = './data/imdb/title.basics.tsv'
imdb_ratings_path = './data/imdb/title.ratings.tsv'
imdb_crew_path = './data/imdb/title.crew.tsv'
imdb_names_path = './data/imdb/name.basics.tsv'
rotten_tomatoes_movies_path = './data/rotten_tomatoes/rotten_tomatoes_movies.csv'
netflix_titles_path = './data/netflix/netflix_titles.csv'

## IMDB dataset

In [87]:
imdb_titles_df = pd.read_csv(imdb_titles_path, '\t', nrows=100000)
imdb_titles_df.head(2)

  interactivity=interactivity, compiler=compiler, result=result)


Unnamed: 0,tconst,titleType,primaryTitle,originalTitle,isAdult,startYear,endYear,runtimeMinutes,genres
0,tt0000001,short,Carmencita,Carmencita,0,1894,\N,1,"Documentary,Short"
1,tt0000002,short,Le clown et ses chiens,Le clown et ses chiens,0,1892,\N,5,"Animation,Short"


In [88]:
imdb_ratings_df = pd.read_csv(imdb_ratings_path, '\t', nrows=100000)
imdb_ratings_df.head(2)

Unnamed: 0,tconst,averageRating,numVotes
0,tt0000001,5.7,1720
1,tt0000002,6.0,211


In [89]:
imdb_crew_df = pd.read_csv(imdb_crew_path, '\t', nrows=100000)
imdb_crew_df.head(2)

Unnamed: 0,tconst,directors,writers
0,tt0000001,nm0005690,\N
1,tt0000002,nm0721526,\N


In [90]:
imdb_names_df = pd.read_csv(imdb_names_path,'\t', nrows=100000)
imdb_names_df.head(2)

Unnamed: 0,nconst,primaryName,birthYear,deathYear,primaryProfession,knownForTitles
0,nm0000001,Fred Astaire,1899,1987,"soundtrack,actor,miscellaneous","tt0053137,tt0050419,tt0031983,tt0072308"
1,nm0000002,Lauren Bacall,1924,2014,"actress,soundtrack","tt0037382,tt0117057,tt0071877,tt0038355"


## Rotten Tomatoes Dataset

In [91]:
rotten_tomatoes_df = pd.read_csv(rotten_tomatoes_movies_path)
rotten_tomatoes_df.head(2)

Unnamed: 0,rotten_tomatoes_link,movie_title,movie_info,critics_consensus,content_rating,genres,directors,authors,actors,original_release_date,...,production_company,tomatometer_status,tomatometer_rating,tomatometer_count,audience_status,audience_rating,audience_count,tomatometer_top_critics_count,tomatometer_fresh_critics_count,tomatometer_rotten_critics_count
0,m/0814255,Percy Jackson & the Olympians: The Lightning T...,"Always trouble-prone, the life of teenager Per...",Though it may seem like just another Harry Pot...,PG,"Action & Adventure, Comedy, Drama, Science Fic...",Chris Columbus,"Craig Titley, Chris Columbus, Rick Riordan","Logan Lerman, Brandon T. Jackson, Alexandra Da...",2010-02-12,...,20th Century Fox,Rotten,49.0,149.0,Spilled,53.0,254421.0,43,73,76
1,m/0878835,Please Give,Kate (Catherine Keener) and her husband Alex (...,Nicole Holofcener's newest might seem slight i...,R,Comedy,Nicole Holofcener,Nicole Holofcener,"Catherine Keener, Amanda Peet, Oliver Platt, R...",2010-04-30,...,Sony Pictures Classics,Certified-Fresh,87.0,142.0,Upright,64.0,11574.0,44,123,19


## Netflix Dataset

In [26]:
netflix_df = pd.read_csv(netflix_titles_path)
netflix_df.head(2)

Unnamed: 0,show_id,type,title,director,cast,country,date_added,release_year,rating,duration,listed_in,description
0,s1,TV Show,3%,,"João Miguel, Bianca Comparato, Michel Gomes, R...",Brazil,"August 14, 2020",2020,TV-MA,4 Seasons,"International TV Shows, TV Dramas, TV Sci-Fi &...",In a future where the elite inhabit an island ...
1,s2,Movie,7:19,Jorge Michel Grau,"Demián Bichir, Héctor Bonilla, Oscar Serrano, ...",Mexico,"December 23, 2016",2016,TV-MA,93 min,"Dramas, International Movies",After a devastating earthquake hits Mexico Cit...


# Table Extractions and Transformations

## Read tables

In [33]:
from pyspark.sql import SparkSession
spark = SparkSession.builder.getOrCreate()

In [41]:
from pyspark.sql.types import *

In [111]:
titles_schema = StructType([
    StructField("tconst", StringType(), False),
    StructField("titleType", StringType(), False),
    StructField("primaryTitle", StringType(), False),
    StructField("originalTitle", StringType(), False),
    StructField("isAdult", BooleanType(), False),
    StructField("startYear", IntegerType(), True),
    StructField("endYear", IntegerType(), True),
    StructField("runtimeMinutes", IntegerType(), True),
    StructField("genres", StringType(), True)
])

In [177]:
imdb_titles = spark.read.csv(imdb_titles_path, sep=r'\t', header=True, schema=titles_schema)
imdb_ratings = spark.read.csv(imdb_ratings_path, sep=r'\t', header=True)
imdb_crew = spark.read.csv(imdb_crew_path, sep=r'\t', header=True)
imdb_names = spark.read.csv(imdb_names_path, sep=r'\t', header=True)
rotten_tomatoes_titles = spark.read.csv(rotten_tomatoes_movies_path, header=True)
netflix_titles = spark.read.csv(netflix_titles_path, header=True)

## Build Final Tables

### Genre Table

In [178]:
from pyspark.sql.functions import regexp_replace

In [179]:
imdb_titles = imdb_titles.filter(imdb_titles.genres!='\\N')
imdb_titles = imdb_titles.withColumn('genre', F.explode(F.split(F.trim(imdb_titles.genres), ',')))

In [180]:
imdb_titles.select('genre').distinct().collect()

[Row(genre='Crime'),
 Row(genre='Romance'),
 Row(genre='Thriller'),
 Row(genre='Adventure'),
 Row(genre='Drama'),
 Row(genre='War'),
 Row(genre='Documentary'),
 Row(genre='Reality-TV'),
 Row(genre='Family'),
 Row(genre='Fantasy'),
 Row(genre='Game-Show'),
 Row(genre='Adult'),
 Row(genre='History'),
 Row(genre='Mystery'),
 Row(genre='Musical'),
 Row(genre='Animation'),
 Row(genre='Music'),
 Row(genre='Film-Noir'),
 Row(genre='Short'),
 Row(genre='Horror'),
 Row(genre='Western'),
 Row(genre='Biography'),
 Row(genre='Comedy'),
 Row(genre='Sport'),
 Row(genre='Action'),
 Row(genre='Talk-Show'),
 Row(genre='Sci-Fi'),
 Row(genre='News')]

In [111]:
import psycopg2

In [112]:
conn = psycopg2.connect(
    host="localhost",
    database="movies_db")
conn.autocommit= True
cur = conn.cursor()


## Tables Create Statements

todo: fix in schema titles.id to varchar

In [None]:
imdb_titles = spark.read.csv(imdb_titles_path, sep=r'\t', header=True, schema=titles_schema)
imdb_ratings = spark.read.csv(imdb_ratings_path, sep=r'\t', header=True)
imdb_crew = spark.read.csv(imdb_crew_path, sep=r'\t', header=True)
imdb_names = spark.read.csv(imdb_names_path, sep=r'\t', header=True)
rotten_tomatoes_titles = spark.read.csv(rotten_tomatoes_movies_path, header=True)
netflix_titles = spark.read.csv(netflix_titles_path, header=True)

In [139]:
def create_database():

    # connect to default database
    conn = psycopg2.connect("host=127.0.0.1 dbname=default user=antoniam")
    conn.set_session(autocommit=True)
    cur = conn.cursor()
    
    # create sparkify database with UTF8 encoding
    cur.execute("DROP DATABASE IF EXISTS movies_db")
    cur.execute("CREATE DATABASE movies_db WITH ENCODING 'utf8' TEMPLATE template0")

    # close connection to default database
    conn.close()    
    
    # connect to sparkify database
    conn = psycopg2.connect("host=127.0.0.1 dbname=movies_db")
    cur = conn.cursor()
    
    return cur, conn

In [140]:
cur, conn = create_database()

OperationalError: could not connect to server: Connection refused
	Is the server running on host "127.0.0.1" and accepting
	TCP/IP connections on port 5432?


In [106]:
create_staging_imdb_titles ="""CREATE TABLE IF NOT EXISTS movies_db.staging_imdb_titles(
tconst VARCHAR NOT NULL PRIMARY KEY,
titleType VARCHAR,
primaryTitle VARCHAR,
originalTitle VARCHAR,
isAdult BOOLEAN,
startYear VARCHAR,
endYear VARCHAR,
runtimeMinutes INT4,
genres VARCHAR);
"""

create_staging_imdb_ratings ="""CREATE TABLE IF NOT EXISTS movies_db.staging_imdb_ratings(
tconst VARCHAR NOT NULL,
averageRating FLOAT,
numVotes INT);
"""

create_staging_imdb_crew = """CREATE TABLE IF NOT EXISTS movies_db.staging_imdb_crew(
tconst VARCHAR NOT NULL,
directors VARCHAR,
writers VARCHAR);
"""

create_staging_imdb_names = """CREATE TABLE IF NOT EXISTS movies_db.staging_imdb_names(
nconst VARCHAR NOT NULL PRIMARY KEY,
primaryName VARCHAR NOT NULL,
birthYear VARCHAR,
deathYear VARCHAR,
primaryProfession VARCHAR,
knownForTitles VARCHAR);
"""

create_staging_rotten_tomatoes_titles = """CREATE TABLE IF NOT EXISTS movies_db.staging_rotten_tomatoes_titles(
rotten_tomatoes_link VARCHAR, 
movie_title VARCHAR,
movie_info VARCHAR,
critics_consensus VARCHAR, 
content_rating VARCHAR, 
genres VARCHAR, 
directors VARCHAR, 
authors VARCHAR,
actors VARCHAR, 
original_release_date DATE, 
streaming_release_date DATE, 
runtime FLOAT,
production_company VARCHAR, 
tomatometer_status VARCHAR, 
tomatometer_rating FLOAT,
tomatometer_count INT, 
audience_status VARCHAR, 
audience_rating FLOAT,
audience_count INT, 
tomatometer_top_critics_count INT,
tomatometer_fresh_critics_count INT, 
tomatometer_rotten_critics_count INT);
"""

create_staging_netflix_titles = """CREATE TABLE IF NOT EXISTS movies_db.staging_netflix_titles(
show_id VARCHAR NOT NULL PRIMARY KEY,
type VARHCAR,
director VARCHAR,
cast VARCHAR,
country VARCHAR,
date_added DATE,
release_year INT4,
rating VARCHAR,
duration VARCHAR,
listed_in VARCHAR,
description VARCHAR);
"""

In [107]:
create_titles = \
"""CREATE TABLE IF NOT EXISTS titles(
  id VARCHAR NOT NULL PRIMARY KEY,
  name VARCHAR,
  description VARCHAR,
  year INT4,
  runtime_minutes INT,
  country VARCHAR,
  isAdult BOOLEAN,
  type VARCHAR,
  imdb_avg_score FLOAT,
  imdb_n_ratings INT,
  rt_critics_score FLOAT,
  rt_n_critics INT,
  rt_audience_score FLOAT,
  rt_n_audience INT);
"""

create_roles = \
"""CREATE TABLE IF NOT EXISTS roles(
title_id VARCHAR,
person_id VARCHAR,
role_name VARCHAR NOT NULL
);
"""

create_persons = \
"""CREATE TABLE IF NOT EXISTS persons(
id VARCHAR NOT NULL PRIMARY KEY,
full_name VARCHAR NOT NULL,
birth_year INT4,
death_year INT4
);
"""

create_title_genres = \
"""CREATE TABLE IF NOT EXISTS title_genres(
title_id VARCHAR,
genre_id VARCHAR);"""

create_genres = \
"""CREATE TABLE IF NOT EXISTS genres(
id VARCHAR NOT NULL PRIMARY KEY,
genre_name VARCHAR NOT NULL);"""

In [109]:
staging_tables_sql = [create_staging_imdb_titles, create_staging_imdb_ratings, create_staging_imdb_crew, create_staging_imdb_names, 
 create_staging_rotten_tomatoes_titles, create_staging_netflix_titles]

In [113]:
cur.execute(create_titles)
cur.execute(create_persons)
cur.execute(create_title_genres)
cur.execute(create_genres)

In [115]:
for s in staging_tables_sql:
    cur.execute(s)

ProgrammingError: schema "movies_db" does not exist
LINE 1: CREATE TABLE IF NOT EXISTS movies_db.staging_imdb_titles(
                                   ^
