In [None]:
import json
import os

import numpy as np
import pandas as pd
import pyspark.sql.functions as F
import requests
from dotenv import dotenv_values
from pyspark.sql import SparkSession
from pyspark.sql.types import StructType, StructField, IntegerType, FloatType, LongType

In [None]:
# Settings (jar path, database credentials, API key) are read from the local .env file
SECRET = dotenv_values('.env')

# Local Spark session named 'etl'; spark.jars is the path to the PSQL jar on this machine
spark = (
    SparkSession.builder
    .appName('etl')
    .master("local")
    .config("spark.jars", SECRET["PSQL_JAR"])
    .getOrCreate()
)

In [None]:
def load_in_database(df, table_name, mode='append'):
    """Write a PySpark dataframe to a PostgreSQL table over JDBC.

    The port, database name, user and password are taken from the
    module-level ``SECRET`` mapping; the host is always ``localhost``.

    Args:
        df: PySpark dataframe to persist.
        table_name: Name of the destination table.
        mode: Save mode handed to the Spark writer (default ``'append'``).
    """
    port = SECRET["PORT"]
    database = SECRET["DATABASE_NAME"]

    connection_options = {
        "url": f'jdbc:postgresql://localhost:{port}/{database}',
        "driver": "org.postgresql.Driver",
        "dbtable": f'{table_name}',
        "user": SECRET["DATABASE_USERNAME"],
        "password": SECRET["DATABASE_PASSWORD"],
    }

    # Apply the options one by one, in the order listed above
    writer = df.write.format("jdbc")
    for key, value in connection_options.items():
        writer = writer.option(key, value)
    writer.mode(mode).save()

    print(f'Successfully write {table_name} dataframe into database')

In [None]:
def read_from_blob(filename, custom_schema):
    """Load one MovieLens CSV file from the ``blob`` directory.

    Args:
        filename: Base name of the file, without the ``.csv`` extension.
        custom_schema: Spark schema applied to the file when reading it.

    Returns:
        PySpark dataframe read from ``<current working dir>/blob/<filename>.csv``.
    """
    filepath = f'{os.getcwd()}/blob/{filename}.csv'

    # Comma-separated file whose first line is a header row
    reader = spark.read.format("csv").schema(custom_schema)
    reader = reader.option("header", True).option("sep", ",")
    frame = reader.load(filepath)

    print(f'Successfully read {filename} file from blob')
    return frame

In [None]:
# Schema of ratings.csv: (column name, Spark type) pairs, all declared non-nullable
rating_schema = StructType([
    StructField(name, dtype, False)
    for name, dtype in (
        ("userId", IntegerType()),
        ("movieId", IntegerType()),
        ("rating", FloatType()),
        ("timestamp", LongType()),
    )
])
rating_df = read_from_blob('ratings', rating_schema)

In [None]:
# Schema of links.csv: three integer id columns, all declared non-nullable
links_schema = StructType([
    StructField(name, IntegerType(), False)
    for name in ("movieId", "imdbId", "tmdbId")
])
links_df = read_from_blob('links', links_schema)

In [None]:
# Database column names, keyed by the source column they come from
column_name_mapping = {'userId': 'user_id',
                       'tmdbId': 'movie_id',
                       'rating': 'rating'}

# The inner join removes ratings whose movieId has no entry in links; after the
# unused columns are dropped, rows with a missing value are discarded and the
# remaining columns are renamed to the database names.
rating_df = (
    rating_df
    .join(links_df, on='movieId', how='inner')
    .drop('movieId', 'timestamp', 'imdbId')
    .dropna()
    .select(*[F.col(source).alias(target)
              for source, target in column_name_mapping.items()])
)

rating_df.printSchema()

In [None]:
# TMDB supplies the extra movie information; these headers are sent with each
# request: JSON responses, authenticated with the API key as a bearer token
headers = dict(
    accept="application/json",
    Authorization=f"Bearer {SECRET['API_KEY']}",
)

In [None]:
# Fetch the TMDB movie genre list and store it in the `genres` table.
genres_url = "https://api.themoviedb.org/3/genre/movie/list?language=en"
# requests applies no timeout by default, so without one this cell could hang
# forever on an unresponsive server.
response = requests.get(genres_url, headers=headers, timeout=10)
response.raise_for_status()

# The genre records sit under the 'genres' key; their `name` field becomes
# the `genre` column of the table.
genres_pdf = pd.DataFrame(response.json()['genres'])
genres_pdf = genres_pdf.rename(columns={'name': 'genre'})
genres_table = spark.createDataFrame(genres_pdf)

genres_table.printSchema()

load_in_database(genres_table, 'genres')

In [None]:
# Split movie_ids into 100 chunks; each chunk is written to the database as
# soon as it has been fetched.
movie_np_array = rating_df.select('movie_id').distinct().toPandas().values.reshape(-1)
movie_chunks_list = np.array_split(movie_np_array, 100)

# Some movies in dataset are outdated in TMDB database; their ids are collected here.
# NOTE(review): every HTTP error status lands in this list, not only "not found".
fail_request_movie_id = []

# Fields of the TMDB movie record that are stored in the `movies` table
movie_columns = ['id', 'title', 'release_date', 'runtime', 'overview', 'popularity',
                 'vote_average', 'vote_count', 'poster_path', 'backdrop_path']

# A single session reuses the HTTP connection across the per-movie requests.
with requests.Session() as session:
    for chunk in movie_chunks_list:
        success_request_chunk = []

        for movie_id in chunk:
            try:
                url = f"https://api.themoviedb.org/3/movie/{movie_id}?language=en-US"
                # requests has no default timeout; without one a stalled
                # connection would block the whole loop indefinitely.
                response = session.get(url, headers=headers, timeout=10)
                response.raise_for_status()
                success_request_chunk.append(response.json())

            except requests.exceptions.HTTPError:
                fail_request_movie_id.append(movie_id)

        # np.array_split yields empty chunks when there are fewer than 100 ids,
        # and every request of a chunk may fail. An empty frame has none of the
        # expected columns, so there is nothing to load for such a chunk.
        if not success_request_chunk:
            continue

        pdf = pd.DataFrame(success_request_chunk)

        # Filter out useful data; copy so the cast below works on an independent frame
        movies_pdf = pdf.loc[:, movie_columns].copy()
        # Cast types
        movies_pdf['release_date'] = pd.to_datetime(movies_pdf['release_date'])
        movie_table = spark.createDataFrame(movies_pdf)
        load_in_database(movie_table, 'movies')

        # Flatten the list-of-dictionaries `genres` field into unique
        # (movie_id, genre_id) pairs, keeping first-seen order.
        genre_pairs = list(dict.fromkeys(
            (movie['id'], genre['id'])
            for movie in success_request_chunk
            for genre in movie['genres']
        ))

        # Skip the write when no movie of the chunk lists any genre.
        if genre_pairs:
            movie_genres_pdf = pd.DataFrame(genre_pairs, columns=['movie_id', 'genre_id'])
            movie_genres_table = spark.createDataFrame(movie_genres_pdf)
            load_in_database(movie_genres_table, 'movie_genres')

In [None]:
# Convert the collected ids to plain Python ints before handing them to Spark
fail_request_movie_id = list(map(int, fail_request_movie_id))

# Keep only the ratings of movies whose TMDB request did not fail
missing_in_tmdb = F.col('movie_id').isin(fail_request_movie_id)
rating_table = rating_df.filter(~missing_in_tmdb)

In [None]:
# One row per distinct user that has at least one remaining rating
user_table = rating_table.select(rating_table.user_id.alias('id')).distinct()

# Generate a placeholder username and password for every user in the dataset.
# Both are derived from the user id instead of F.monotonically_increasing_id():
# that function is non-deterministic (its values depend on partitioning), so the
# credentials would change from run to run, and two independent calls are not
# guaranteed to produce the same number for the same row.
# NOTE(review): the password column holds a plain-text placeholder string.
user_id_text = F.col('id').cast('string')
user_table = (user_table
              .withColumn('username', F.concat(F.lit('username'), user_id_text))
              .withColumn('password', F.concat(F.lit('password'), user_id_text)))
load_in_database(user_table, 'users')

In [None]:
# Keep a single row per (user_id, movie_id) pair before writing the ratings
unique_rating_key = ['user_id', 'movie_id']
rating_table = rating_table.dropDuplicates(unique_rating_key)

load_in_database(rating_table, 'ratings')

In [None]:
# Stop the Spark session created at the top of the notebook
spark.stop()