In [None]:
pip install psycopg2-binary

In [None]:
import os

from pyspark.sql import SparkSession, DataFrameReader
import pyspark.sql.functions as f
from pyspark.sql.types import IntegerType
import psycopg2

# Create (or reuse) a single-machine Spark session and keep a handle on its
# underlying SparkContext.
spark = (
    SparkSession.builder
    .master("local")
    .getOrCreate()
)
sc = spark.sparkContext

# Use the LEGACY datetime parser so the 'MMMM dd, yyyy' / 'yyyy' patterns used
# when parsing dates are interpreted with the pre-Spark-3 (SimpleDateFormat-based)
# rules rather than raising on patterns the new parser treats differently.
spark.conf.set("spark.sql.legacy.timeParserPolicy", "LEGACY")

In [None]:
# Load the raw Netflix catalogue. The first line supplies column names, and
# Spark makes an extra pass over the file to infer each column's type.
df = spark.read.csv(
    "data/netflix_titles.csv",
    header=True,
    inferSchema=True,
)

In [None]:
# Turn the date-like columns into real DATE columns.
# - date_added is text such as "September 25, 2021". It is trimmed first
#   because surrounding whitespace makes the parse fail and yields null.
#   NOTE(review): some rows of the public netflix_titles.csv are known to carry
#   a leading space here — confirm against this copy of the data.
# - release_year is a bare year. inferSchema reads it as an integer, so cast it
#   to string explicitly instead of relying on implicit coercion (which ANSI
#   mode rejects). It becomes January 1st of that year.
df = (
    df.withColumn('date_added', f.to_date(f.trim(f.col('date_added')), 'MMMM dd, yyyy'))
      .withColumn('release_year', f.to_date(f.col('release_year').cast('string'), 'yyyy'))
)

In [None]:
# Peek at a handful of parsed rows, rendered as a pandas frame.
preview_rows = df.limit(3)
preview_rows.toPandas()

In [None]:
# Keep only movies and turn their "<n> min" duration text into an integer.
movie_df = df.filter(f.col("type") == "Movie")
# The pattern is a raw string: "\s" and "\d" in a plain literal are invalid
# escape sequences (SyntaxWarning on Python 3.12+). Values with no
# "<n> min" match extract to "" and, with ANSI mode off, cast to null.
movie_df = movie_df.withColumn(
    "duration",
    f.regexp_extract("duration", r'\s*(\d*)\s*min', 1).cast(IntegerType()),
)

# Export thresholds: movies shorter than MAX_DURATION_MINUTES that were
# released after RELEASED_AFTER_YEAR.
MAX_DURATION_MINUTES = 190
RELEASED_AFTER_YEAR = 2015

# release_year was parsed with the 'yyyy' pattern, so it is January 1st of the
# release year. Comparing the year number is therefore equivalent to the
# previous `> '2015-01-01'` date comparison (releases from 2016 on), but it
# avoids an implicit string-to-date cast.
filtered_movies = movie_df.filter(
    (f.col("duration") < MAX_DURATION_MINUTES)
    & (f.year("release_year") > RELEASED_AFTER_YEAR)
)

In [None]:
# Display a bounded sample of the filtered movies. Converting the whole result
# would pull every row to the driver only for pandas to truncate the display.
filtered_movies.limit(20).toPandas()

In [None]:
# Connect to the Postgres database that receives the exported movies.
# Settings are read from the standard libpq environment variables when present.
# Otherwise they fall back to the previous hard-coded development defaults.
# NOTE(review): drop the password fallback once PGPASSWORD is provided by the
# environment, so no credential lives in the notebook.
db_conn = psycopg2.connect(
    database=os.environ.get("PGDATABASE", "netflix"),
    host=os.environ.get("PGHOST", "postgres"),
    user=os.environ.get("PGUSER", "postgres"),
    password=os.environ.get("PGPASSWORD", "postgres"),
    port=os.environ.get("PGPORT", "5432"),
)
cursor = db_conn.cursor()

In [None]:
# Bring just the exported columns into driver memory as a list of Rows.
export_columns = ["title", "director", "country", "release_year", "duration", "description"]
movies = filtered_movies.select(*export_columns).collect()

In [None]:
# Insert all collected movies in one executemany call.
# Values are bound as query parameters, so quotes in titles or descriptions
# need no manual escaping. The old `description.replace("'", "")` result was
# never used, and it raised AttributeError on a null description.
# NOTE(review): a missing director/country is stored as the literal string
# 'None' rather than SQL NULL. This is kept for compatibility with existing
# rows; confirm it is intended.
insert_sql = (
    "INSERT INTO movies(title, director, country, release_year, duration_minutes, description) "
    "VALUES (%s, %s, %s, %s::DATE, %s, %s)"
)
cursor.executemany(
    insert_sql,
    [
        (
            movie.title,
            movie.director or 'None',
            movie.country or 'None',
            movie.release_year,
            movie.duration,
            movie.description,
        )
        for movie in movies
    ],
)

In [None]:
# Commit the INSERTs issued on this connection so they persist and become
# visible to other sessions (the connection was opened without enabling autocommit).
db_conn.commit()

In [None]:
# Read back a single row from the movies table as a quick sanity check that
# the inserts landed.
cursor.execute("SELECT * FROM movies;")
first_movie_row = cursor.fetchone()
first_movie_row