In [1]:
from pyspark.sql import SparkSession
import pyspark.sql.functions as F
from pyspark.sql.functions import when
from pyspark.sql.types import StructType, StructField, IntegerType
import datetime
import os

# Spark standalone master this notebook submits to.
master = "spark://zy-ubuntu:7077"

# Assemble the spark-submit arguments one flag per entry; the PostgreSQL JDBC
# driver is pulled in through --packages so the jdbc reads/writes below work.
submit_args = [
    f'--master {master}',
    '--driver-memory 4g',
    '--total-executor-cores 8',
    '--executor-memory 8g',
    '--packages org.postgresql:postgresql:42.1.1',
    'pyspark-shell',
]
os.environ['PYSPARK_SUBMIT_ARGS'] = ' '.join(submit_args)

In [2]:
# Create (or reuse) the Spark session for this notebook.
session_builder = SparkSession.builder.appName("dim casts")
spark = session_builder.getOrCreate()

In [3]:
# Load the tab-separated principals file; the first line holds the column names.
principals_reader = spark.read.option('sep', r'\t').option('header', True)
title_principals_df = principals_reader.csv("title.principals.tsv")
title_principals_df.printSchema()

root
 |-- tconst: string (nullable = true)
 |-- ordering: string (nullable = true)
 |-- nconst: string (nullable = true)
 |-- category: string (nullable = true)
 |-- job: string (nullable = true)
 |-- characters: string (nullable = true)



In [4]:
# Credit categories that are kept as cast members.
cast_categories = ['self', 'actor', 'actress']

# Build one row per (tconst, nconst, character) for cast credits only.
# The category filter runs first, while `category` still exists, so it does
# not depend on Spark resolving a column that was already dropped and so that
# only cast rows are cleaned and exploded.
# `title_principals_df` is left untouched, which keeps this cell re-runnable.
title_casts_df = (
    title_principals_df
    .filter(F.col('category').isin(cast_categories))
    .drop('ordering', 'job', 'category')
    # strip the square brackets and double quotes around the character list
    .withColumn('characters', F.regexp_replace(F.col('characters'), r'[\[\]"]', ''))
    # split on commas and emit one row per character
    .withColumn('characters', F.explode(F.split('characters', ',')))
    .withColumnRenamed('characters', 'character')
)
# title_casts_df.show(truncate=False)   

In [5]:
# read tsv file into df and rename primaryName -> name
name_basics_df = (
    spark.read.csv("name.basics.tsv", sep=r'\t', header=True)
    .withColumnRenamed('primaryName', 'name')
)

# The literal string '\N' is treated as "value unknown" for both year columns.
birth_known = F.col("birthYear") != '\\N'
death_known = F.col("deathYear") != '\\N'
death_unknown = F.col("deathYear") == '\\N'

# Cast the year strings once so both age branches do integer arithmetic.
birth_year = F.col('birthYear').cast(IntegerType())
death_year = F.col('deathYear').cast(IntegerType())
current_year = datetime.datetime.now().year

# age: death - birth when both are known, current year - birth when no death
# year is recorded, NULL when the birth year is unknown.
age_col = (
    F.when(birth_known & death_known, death_year - birth_year)
    .when(birth_known & death_unknown, F.lit(current_year) - birth_year)
    .otherwise(None)
)

# is_alive: a recorded death year means "not alive" even when the birth year
# is unknown (previously such rows were left NULL). With no death year the
# person is only marked alive when a birth year is known; otherwise NULL.
is_alive_col = (
    F.when(death_known, False)
    .when(birth_known & death_unknown, True)
    .otherwise(None)
)

name_basics_df = (
    name_basics_df
    .withColumn('age', age_col)
    .withColumn('is_alive', is_alive_col)
)

# drop unused columns
name_basics_df_dropped = name_basics_df.drop('primaryProfession', 'knownForTitles', 'birthYear', 'deathYear')

In [6]:
# Attach person attributes to every cast credit (inner join on nconst).
joined_casts = title_casts_df.join(name_basics_df_dropped, on='nconst')

# Turn the '\N' placeholder in `character` into a real NULL.
character_or_null = (
    F.when(F.col('character') == '\\N', F.lit(None))
    .otherwise(F.col('character'))
)
df_casts = joined_casts.withColumn('character', character_or_null)

# df_casts.show()

In [7]:
# One row per person: drop the per-credit columns, then de-duplicate on nconst.
df_upload = df_casts.drop('tconst', 'character').dropDuplicates(['nconst'])
df_upload.printSchema()

root
 |-- nconst: string (nullable = true)
 |-- name: string (nullable = true)
 |-- age: integer (nullable = true)
 |-- is_alive: boolean (nullable = true)



In [8]:
# Append the de-duplicated cast rows to the dim_casts table over JDBC.
# NOTE(review): the connection credentials are hard-coded in this cell;
# consider reading them from the environment instead.
(
    df_upload.write
    .format('jdbc')
    .option('url', 'jdbc:postgresql://localhost:5433/imdb')
    .option('driver', 'org.postgresql.Driver')
    .option('dbtable', 'dim_casts')
    .option('user', 'admin')
    .option('password', 'password')
    .mode('append')
    .save()
)

In [9]:
# Read the dim_title_desc table back from PostgreSQL.
title_desc_df = (
    spark.read
    .format('jdbc')
    .option('url', 'jdbc:postgresql://localhost:5433/imdb')
    .option('driver', 'org.postgresql.Driver')
    .option('dbtable', 'dim_title_desc')
    .option('user', 'admin')
    .option('password', 'password')
    .load()
)
title_desc_df.printSchema()

root
 |-- id: integer (nullable = true)
 |-- tconst: string (nullable = true)
 |-- type: string (nullable = true)
 |-- primary_title: string (nullable = true)
 |-- original_title: string (nullable = true)
 |-- is_adult: boolean (nullable = true)
 |-- start_year: short (nullable = true)
 |-- end_year: short (nullable = true)
 |-- runtime_minutes: short (nullable = true)
 |-- av_rating: float (nullable = true)
 |-- num_votes: integer (nullable = true)
 |-- genre_1: string (nullable = true)
 |-- genre_2: string (nullable = true)
 |-- genre_3: string (nullable = true)



In [10]:
# Keep only the surrogate key and tconst, exposing the key as `title_id`.
title_desc_id_df = title_desc_df.select(F.col('id').alias('title_id'), 'tconst')

In [11]:
# Fact rows start as just the title surrogate key, named `title_desc_id`.
fact_titles_desc_df = title_desc_id_df.select(F.col('title_id').alias('title_desc_id'))

In [12]:
# Build a one-row DataFrame holding today's date as (year, month, day).
now = datetime.datetime.now()
data = [(now.year, now.month, now.day)]

# All three columns are nullable integers.
schema = StructType([
    StructField(field_name, IntegerType(), True)
    for field_name in ("year", "month", "day")
])

dl_date_df = spark.createDataFrame(data=data, schema=schema)
dl_date_df.show()

+----+-----+---+
|year|month|day|
+----+-----+---+
|2021|    6|  8|
+----+-----+---+



In [13]:
# Append today's download date row to dim_download_date.
(
    dl_date_df.write
    .format('jdbc')
    .option('url', 'jdbc:postgresql://localhost:5433/imdb')
    .option('driver', 'org.postgresql.Driver')
    .option('dbtable', 'dim_download_date')
    .option('user', 'admin')
    .option('password', 'password')
    .mode('append')
    .save()
)

In [14]:
# Read dim_download_date back so the database-assigned ids are available.
dl_date_pg_df = (
    spark.read
    .format('jdbc')
    .option('url', 'jdbc:postgresql://localhost:5433/imdb')
    .option('driver', 'org.postgresql.Driver')
    .option('dbtable', 'dim_download_date')
    .option('user', 'admin')
    .option('password', 'password')
    .load()
)

In [15]:
# Display the rows read back from dim_download_date.
dl_date_pg_df.show()

+---+----+-----+---+
| id|year|month|day|
+---+----+-----+---+
|  1|2021|    6|  8|
+---+----+-----+---+



In [16]:
# Find the id of the download-date row for this run.
# `now` is deliberately NOT recomputed here: the lookup must use the same
# timestamp that produced the row in dl_date_df, otherwise a run that crosses
# midnight would search for a date that was never inserted.
df_date_id = dl_date_pg_df.filter(
    (F.col('year') == now.year) &
    (F.col('month') == now.month) &
    (F.col('day') == now.day))

# Several rows can share a date when the notebook has been run more than once
# on the same day; take the highest id so the choice is deterministic.
latest_date_row = df_date_id.orderBy(F.col('id').desc()).select('id').first()
if latest_date_row is None:
    raise LookupError(
        f'no dim_download_date row found for {now.year}-{now.month}-{now.day}; '
        'run the cell that inserts the download date first'
    )
dl_date_id = latest_date_row[0]

In [18]:
# fact_titles_desc_df.show()
# Stamp every fact row with the id of this run's download date.
download_date_col = F.lit(dl_date_id)
fact_titles_desc_df = fact_titles_desc_df.withColumn('download_date_id', download_date_col)

In [19]:
# Append the fact rows to the fact_titles table.
(
    fact_titles_desc_df.write
    .format('jdbc')
    .option('url', 'jdbc:postgresql://localhost:5433/imdb')
    .option('driver', 'org.postgresql.Driver')
    .option('dbtable', 'fact_titles')
    .option('user', 'admin')
    .option('password', 'password')
    .mode('append')
    .save()
)

In [20]:
# Read dim_casts back so each nconst can be mapped to its database id.
casts_from_pg_df = (
    spark.read
    .format('jdbc')
    .option('url', 'jdbc:postgresql://localhost:5433/imdb')
    .option('driver', 'org.postgresql.Driver')
    .option('dbtable', 'dim_casts')
    .option('user', 'admin')
    .option('password', 'password')
    .load()
)
casts_from_pg_df.printSchema()

root
 |-- id: integer (nullable = true)
 |-- name: string (nullable = true)
 |-- age: short (nullable = true)
 |-- is_alive: boolean (nullable = true)
 |-- nconst: string (nullable = true)



In [21]:
# Print the schemas of the two frames that get joined further down.
for frame in (df_casts, title_desc_id_df):
    frame.printSchema()

root
 |-- nconst: string (nullable = true)
 |-- tconst: string (nullable = true)
 |-- character: string (nullable = true)
 |-- name: string (nullable = true)
 |-- age: integer (nullable = true)
 |-- is_alive: boolean (nullable = true)

root
 |-- title_id: integer (nullable = true)
 |-- tconst: string (nullable = true)



In [22]:
# Map nconst to the dim_casts surrogate key, exposed as `cast_id`.
casts_id_df = casts_from_pg_df.select(F.col('id').alias('cast_id'), 'nconst')

In [23]:
# Columns needed to link a person to a title together with the role played.
composite_columns = ['nconst', 'tconst', 'character']
df_casts_composite_table = df_casts.select(*composite_columns)

In [24]:
# Attach cast_id to every (nconst, tconst, character) row via an inner join.
df_tmp = casts_id_df.join(df_casts_composite_table, on='nconst')

In [25]:
# Resolve tconst to title_id, then drop the natural keys so only
# cast_id, character and title_id remain.
df_titles_casts = (
    df_tmp
    .join(title_desc_id_df, on='tconst')
    .drop('nconst', 'tconst')
)
df_titles_casts.show()

+-------+--------------------+--------+
|cast_id|           character|title_id|
+-------+--------------------+--------+
| 228806|         Bert Duncan|     382|
| 711763|       Wedding Guest|     382|
|1390991|       Wedding Guest|     382|
|1892894|Nat Berry - The L...|     382|
|2083602|Polly Berry - the...|     382|
|2079655|          Tom Atkins|     382|
|1239779|The Young Woman's...|     386|
|1267188|      The Blacksmith|     386|
|1382635|       At Barn Dance|     386|
|1960610|The Young Woman's...|     386|
|2074687|The Young Woman's...|     386|
|2083602|     The Young Woman|     386|
|2088111|                null|     386|
| 483378|       Mary's Mother|     391|
| 579232|          The Artist|     391|
| 616371|                Mary|     391|
|1239779|       Mary's Father|     391|
|1523742|      Bobby's Mother|     391|
|1960610|               Bobby|     391|
|2316200|The Artist's Fiancée|     391|
+-------+--------------------+--------+
only showing top 20 rows



In [26]:
# Row count of the cast/title link rows that the next cell writes to the database.
df_titles_casts.count()

5307973

In [27]:
# Append the cast/title link rows to the titles_casts table.
(
    df_titles_casts.write
    .format('jdbc')
    .option('url', 'jdbc:postgresql://localhost:5433/imdb')
    .option('driver', 'org.postgresql.Driver')
    .option('dbtable', 'titles_casts')
    .option('user', 'admin')
    .option('password', 'password')
    .mode('append')
    .save()
)