In [1]:
import os
from datetime import datetime

import psycopg2

from pyspark.sql import SparkSession
from pyspark.sql.types import *
import pyspark.sql.functions as F

from hdfs import InsecureClient

In [2]:
spark = SparkSession.builder\
     .config('spark.driver.extraClassPath'
            , '/home/user/shared_folder/Distrib/postgresql-42.2.23.jar')\
    .master('local')\
    .appName("lesson_13")\
    .getOrCreate()

Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
22/02/22 12:50:47 WARN util.Utils: Service 'SparkUI' could not bind on port 4040. Attempting port 4041.
22/02/22 12:50:47 WARN util.Utils: Service 'SparkUI' could not bind on port 4041. Attempting port 4042.


In [3]:
hdfs_url = 'http://127.0.0.1:50070/'

In [4]:
pg_creds = {
    'host': '192.168.1.56',
    'port': '5432',
    'database': 'pagila',
    'user': 'pguser',
    'password': 'secret',
}

gp_url = "jdbc:postgresql://192.168.1.56:5433/gp"
gp_properties = {"user": "gpuser", "password": "secret"}

In [5]:
tables_to_load = (
          'film',
          'film_actor',
          'actor',
          'film_category',
          'category',
          'language'
         )

In [6]:
current_date = datetime.now().strftime('%Y-%m-%d')
current_date

'2022-02-22'

In [7]:
# Load to Bronze

In [8]:
client = InsecureClient(hdfs_url, user='user')

for table in tables_to_load:
    
    with psycopg2.connect(**pg_creds) as pg_connection:
        cursor = pg_connection.cursor()
    
        with client.write(
            os.path.join('/', 'datalake', 'bronze', 'pagila', table, current_date, table + '.csv'), overwrite=True
        ) as csv_file:
            cursor.copy_expert(f"COPY {table} TO STDOUT WITH HEADER CSV", 
                                   csv_file)

In [9]:
# LOAD TO SILVER

In [10]:
bronze_film_df = spark.read\
    .load(os.path.join('/', 'datalake', 'bronze', 'pagila', 'film', current_date, 'film.csv'),
          header = "true",
          inferSchema = "true",
          format = "csv"
         )

In [11]:
bronze_film_df.count()

2000

In [12]:
bronze_film_df = bronze_film_df\
    .drop('special_features')\
    .drop('fulltext')\
    .drop('last_update')\
    .drop('original_language_id')

bronze_film_df = bronze_film_df\
    .where(F.col('title').isNotNull() & F.col('description').isNotNull())

bronze_film_df = bronze_film_df.dropDuplicates()

In [13]:
bronze_film_df.count()

                                                                                

997

In [14]:
bronze_film_df.first()

Row(film_id=415, title='HIGH ENCINO', description='A Fateful Saga of a Waitress And a Hunter who must Outrace a Sumo Wrestler in Australia', release_year=2006, language_id=1, rental_duration=3, rental_rate=2.99, length=84, replacement_cost=23.99, rating='R')

In [15]:
bronze_film_df.write\
    .partitionBy('rating')\
    .parquet(os.path.join('/', 'datalake', 'silver', 'film'), mode = 'overwrite')

22/02/22 12:51:36 WARN hdfs.DataStreamer: Caught exception       (43 + 1) / 200]
java.lang.InterruptedException
	at java.lang.Object.wait(Native Method)
	at java.lang.Thread.join(Thread.java:1252)
	at java.lang.Thread.join(Thread.java:1326)
	at org.apache.hadoop.hdfs.DataStreamer.closeResponder(DataStreamer.java:980)
	at org.apache.hadoop.hdfs.DataStreamer.endBlock(DataStreamer.java:630)
	at org.apache.hadoop.hdfs.DataStreamer.run(DataStreamer.java:807)
                                                                                

In [16]:
film_actor_df = spark.read\
    .load(os.path.join('/', 'datalake', 'bronze', 'pagila', 'film_actor', current_date, 'film_actor.csv')
                                , header="true"
                                , inferSchema="true"
                                , format="csv")

In [17]:
film_actor_df.write\
    .parquet(os.path.join('/', 'datalake', 'silver', 'film_actor'), mode='overwrite')

In [18]:
actor_df = spark.read\
    .load(os.path.join('/', 'datalake', 'bronze', 'pagila', 'actor', current_date, 'actor.csv')
                                , header="true"
                                , inferSchema="true"
                                , format="csv")

In [19]:
actor_df.write\
    .parquet(os.path.join('/', 'datalake', 'silver', 'actor'), mode='overwrite')

In [20]:
film_category_df = spark.read\
    .load(os.path.join('/', 'datalake', 'bronze', 'pagila', 'film_category', current_date, 'film_category.csv')
                                , header="true"
                                , inferSchema="true"
                                , format="csv")

In [21]:
film_category_df.write\
    .parquet(os.path.join('/', 'datalake', 'silver', 'film_category'), mode='overwrite')

In [22]:
category_df = spark.read\
    .load(os.path.join('/', 'datalake', 'bronze', 'pagila', 'category', current_date, 'category.csv')
                                , header="true"
                                , inferSchema="true"
                                , format="csv")

In [23]:
category_df.write\
    .parquet(os.path.join('/', 'datalake', 'silver', 'category'), mode='overwrite')

In [24]:
language_df = spark.read\
    .load(os.path.join('/', 'datalake', 'bronze', 'pagila', 'language', current_date, 'language.csv')
                                , header="true"
                                , inferSchema="true"
                                , format="csv")

In [25]:
language_df.write\
    .parquet(os.path.join('/', 'datalake', 'silver', 'language'), mode='overwrite')

In [26]:
# Load to GOLD

In [27]:
film_df = spark.read.parquet(os.path.join('/', 'datalake', 'silver', 'film'))
film_category_df = spark.read.parquet(os.path.join('/', 'datalake', 'silver', 'film_category'))
category_df = spark.read.parquet(os.path.join('/', 'datalake', 'silver', 'category'))
film_actor_df = spark.read.parquet(os.path.join('/', 'datalake', 'silver', 'film_actor'))
actor_df = spark.read.parquet(os.path.join('/', 'datalake', 'silver', 'actor'))
language_df = spark.read.parquet(os.path.join('/', 'datalake', 'silver', 'language'))

In [28]:
film_category_df = film_category_df.join(category_df
                      , film_category_df.category_id == category_df.category_id
                      , 'left')\
                .select(film_category_df.film_id, category_df.name.alias('category_name'))

In [29]:
film_df = film_df.join(film_category_df
                      , film_df.film_id == film_category_df.film_id
                      , 'left')\
                .select(film_df['*'], film_category_df.category_name)

In [30]:
film_actor_df = film_actor_df.join(actor_df
                      , film_actor_df.actor_id == actor_df.actor_id
                      , 'left')\
                .select(film_actor_df.film_id, actor_df.first_name, actor_df.last_name)

film_actor_df = film_actor_df.withColumn('name', F.concat(F.col('first_name'), F.lit(' '), F.col('last_name')))

film_actor_df = film_actor_df.drop('first_name').drop('last_name')

In [31]:
film_df = film_df.join(film_actor_df
                      , film_df.film_id == film_actor_df.film_id
                      , 'left')\
                .select(film_df['*'], film_actor_df.name.alias('actor_name'))

In [32]:
film_df = film_df.join(language_df
                      , film_df.language_id == language_df.language_id
                      , 'left')\
                .select(film_df['*'], language_df.name.alias('language'))

In [33]:
film_df.first()

                                                                                

Row(film_id=35, title='ARACHNOPHOBIA ROLLERCOASTER', description='A Action-Packed Reflection of a Pastry Chef And a Composer who must Discover a Mad Scientist in The First Manned Space Station', release_year=2006, language_id=1, rental_duration=4, rental_rate=2.99, length=147, replacement_cost=24.99, rating='PG-13', category_name='Horror', actor_name='HUMPHREY GARLAND', language='English             ')

In [34]:
film_df.select('film_id','title','rating','category_name','actor_name','language').show()

+-------+--------------------+------+-------------+------------------+--------------------+
|film_id|               title|rating|category_name|        actor_name|            language|
+-------+--------------------+------+-------------+------------------+--------------------+
|     35|ARACHNOPHOBIA ROL...| PG-13|       Horror|  HUMPHREY GARLAND|English             |
|     35|ARACHNOPHOBIA ROL...| PG-13|       Horror|      EWAN GOODING|English             |
|     35|ARACHNOPHOBIA ROL...| PG-13|       Horror|     RITA REYNOLDS|English             |
|     35|ARACHNOPHOBIA ROL...| PG-13|       Horror|        CUBA ALLEN|English             |
|     35|ARACHNOPHOBIA ROL...| PG-13|       Horror|    MORGAN HOPKINS|English             |
|     35|ARACHNOPHOBIA ROL...| PG-13|       Horror|    DARYL WAHLBERG|English             |
|     35|ARACHNOPHOBIA ROL...| PG-13|       Horror|         JUDY DEAN|English             |
|     35|ARACHNOPHOBIA ROL...| PG-13|       Horror|      GRACE MOSTEL|English   

In [35]:
films = film_df.groupBy(
    F.col('film_id'),
    F.col('title'),
    F.col('description'),
    F.col('release_year'),
    F.col('rating'),
    F.col('category_name'),
    F.col('language'),
).agg(F.collect_list(F.col('actor_name')).alias('actors'))

In [36]:
films.select('film_id','title','actors').show(truncate=False)



+-------+-------------------+-----------------------------------------------------------------------------------------------------------------------------------+
|film_id|title              |actors                                                                                                                             |
+-------+-------------------+-----------------------------------------------------------------------------------------------------------------------------------+
|211    |DARLING BREAKING   |[JAYNE NEESON, FRANCES DAY-LEWIS]                                                                                                  |
|50     |BAKED CLEOPATRA    |[MICHELLE MCCONAUGHEY]                                                                                                             |
|957    |WAR NOTTING        |[HUMPHREY WILLIS, FAY WINSLET, SALMA NOLTE, CUBA ALLEN, MINNIE ZELLWEGER, MARY TANDY, NATALIE HOPKINS, HELEN VOIGHT, GRACE MOSTEL] |
|628    |NORTHWEST POLISH   

                                                                                

In [37]:
films.write.parquet(os.path.join('/','datalake','gold','films'), mode = 'overwrite')

                                                                                

In [38]:
# Process to DWH

In [39]:
films_df = spark.read.parquet(os.path.join('/', 'datalake', 'gold', 'films'))

In [40]:
films_df = films.select(
        films['*'], F.explode(F.col('actors')).alias('actor')
    ).drop('actors')

In [41]:
films_df.select('film_id','actor').show()



+-------+--------------------+
|film_id|               actor|
+-------+--------------------+
|    211|        JAYNE NEESON|
|    211|   FRANCES DAY-LEWIS|
|     50|MICHELLE MCCONAUGHEY|
|    957|     HUMPHREY WILLIS|
|    957|         FAY WINSLET|
|    957|         SALMA NOLTE|
|    957|          CUBA ALLEN|
|    957|    MINNIE ZELLWEGER|
|    957|          MARY TANDY|
|    957|     NATALIE HOPKINS|
|    957|        HELEN VOIGHT|
|    957|        GRACE MOSTEL|
|    628|         MARY KEITEL|
|    628|          REESE WEST|
|    628|          CUBA BIRCH|
|    628|    JEFF SILVERSTONE|
|    628|    LAURENCE BULLOCK|
|    628|       RITA REYNOLDS|
|    628|         MAE HOFFMAN|
|    628|MICHELLE MCCONAUGHEY|
+-------+--------------------+
only showing top 20 rows



                                                                                

In [42]:
films_df.write.jdbc(gp_url
                   , table = 'films'
                   , properties = gp_properties
                   , mode = 'overwrite')

22/02/22 13:45:56 WARN spark.HeartbeatReceiver: Removing executor driver with no recent heartbeats: 173412 ms exceeds timeout 120000 ms
22/02/22 13:45:56 WARN spark.SparkContext: Killing executors is not supported by current scheduler.
