In [1]:
from pyspark.sql import SparkSession
from pyspark.sql import functions as F
from airflow.hooks.base_hook import BaseHook
import os

In [2]:
# Local-mode Spark session. The PostgreSQL JDBC driver jar is placed on the
# driver classpath so the `.jdbc()` reads and writes further down can load it.
spark = (
    SparkSession.builder
    .master('local')
    .appName('homework_lesson_16')
    .config('spark.driver.extraClassPath', '/home/user/shared/postgresql-42.3.1.jar')
    .getOrCreate()
)

Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).


In [3]:
# Display the SparkSession object as a quick check that the session started.
spark

In [4]:
# Look up the Airflow connection 'DWH' and derive the JDBC URL and the
# credential properties shared by every DWH read and write in this notebook.
gp_conn = BaseHook.get_connection('DWH')
gp_url = "jdbc:postgresql://{}:{}/{}".format(gp_conn.host, gp_conn.port, gp_conn.schema)
gp_creds = dict(user=gp_conn.login, password=gp_conn.password)

[2021-11-15 19:01:31,670] {base_hook.py:89} INFO - Using connection to: id: DWH. Host: 192.168.1.249, Port: 5433, Schema: postgres, Login: gpuser, Password: XXXXXXXX, extra: None


In [5]:
# Sanity check: the JDBC URL assembled from the Airflow 'DWH' connection.
print(gp_url)

jdbc:postgresql://192.168.1.249:5433/postgres


In [6]:
# Sanity check of the JDBC credential properties. The password is masked so the
# secret is never written into the saved notebook output.
print({key: ('********' if key == 'password' else value) for key, value in gp_creds.items()})

{'user': 'gpuser', 'password': '********'}


# Dimension Clients

In [7]:
# Silver-layer clients (parquet): the source of the clients dimension.
df_silver_clients = spark.read.format("parquet").load("/silver/dshop/clients")

                                                                                

In [8]:
# Clients dimension: keep the key and expose `fullname` as `client_name`.
df_dim_clients = df_silver_clients.select(
    F.col('client_id'),
    F.col('fullname').alias('client_name'),
)

In [9]:
# Overwrite the dim_clients table in the DWH with the freshly built dimension.
df_dim_clients.write\
    .mode('overwrite')\
    .jdbc(gp_url, 'dim_clients', properties=gp_creds)

                                                                                

# Dimension Products

In [10]:
# Silver-layer inputs for the products dimension, read in the same order as before.
df_silver_aisles, df_silver_departments, df_silver_products = [
    spark.read.parquet(path)
    for path in (
        "/silver/dshop/aisles",
        "/silver/dshop/departments",
        "/silver/dshop/products",
    )
]

In [11]:
# Products dimension: denormalise the aisle and department names onto each
# product. Left joins keep products whose aisle_id/department_id has no match.
df_dim_products = (
    df_silver_products
    .join(df_silver_aisles, on='aisle_id', how='left')
    .join(df_silver_departments, on='department_id', how='left')
    .select('product_id', 'product_name', 'aisle', 'department')
)

In [12]:
# Overwrite the dim_products table in the DWH with the freshly built dimension.
df_dim_products.write\
    .mode('overwrite')\
    .jdbc(gp_url, 'dim_products', properties=gp_creds)

                                                                                

# Dimension Date

In [13]:
# Single-row frame holding the first day of the date dimension as a string column `start`.
df_period = (
    spark
    .createDataFrame(["2000-01-01"], "string")
    .toDF("start")
)

In [14]:
# Append the upper bound of the date dimension: today's date, as column `stop`.
df_period = df_period.select("*", F.current_date().alias("stop"))

In [15]:
# Epoch-second values of the period's start and stop, obtained by casting each
# column to timestamp (Spark session time zone) and then to long.
start, stop = df_period.select(
    F.col("start").cast("timestamp").cast("long"),
    F.col("stop").cast("timestamp").cast("long"),
).first()

[Stage 8:>                                                          (0 + 1) / 1]                                                                                

In [16]:
# Date dimension: one row per calendar day from df_period.start (2000-01-01)
# through df_period.stop (current_date), both ends inclusive, plus calendar
# attributes.
# F.sequence steps over DATE values, so the result does not depend on the
# session time zone. The earlier spark.range over epoch seconds stepped by
# 86400 s. That could repeat one date and skip the next whenever a DST change
# moved local midnight back to 23:00. Its exclusive upper bound also left out
# today's date.
df_dim_date = (
    df_period
    .select(
        F.explode(
            F.sequence(F.to_date("start"), F.col("stop"), F.expr("interval 1 day"))
        ).alias("date")
    )
    .withColumn("year", F.year("date"))
    .withColumn("month", F.month("date"))
    .withColumn("day", F.dayofmonth("date"))
    .withColumn("day_of_week", F.dayofweek("date"))
    .withColumn("day_of_year", F.dayofyear("date"))
)

In [17]:
# Overwrite the dim_date table in the DWH with the generated calendar.
df_dim_date.write\
    .mode('overwrite')\
    .jdbc(gp_url, 'dim_date', properties=gp_creds)

                                                                                

# Fact Orders

In [18]:
# Silver-layer orders (parquet): the source of the orders fact table.
df_silver_orders = spark.read.format("parquet").load("/silver/dshop/orders")

In [19]:
# Orders fact: keys to the product/client dimensions, the order date and the quantity.
fact_orders_columns = ['order_id', 'product_id', 'client_id', 'order_date', 'quantity']
df_fact_orders = df_silver_orders.select(*fact_orders_columns)

In [20]:
# Overwrite the fact_orders table in the DWH.
df_fact_orders.write\
    .mode('overwrite')\
    .jdbc(gp_url, 'fact_orders', properties=gp_creds)

                                                                                

In [21]:
# Re-read dim_clients from the DWH to get the current set of client keys

In [22]:
# Current contents of dim_clients as stored in the DWH.
df_dim_clients = spark.read.jdbc(
    url=gp_url,
    table='dim_clients',
    properties=gp_creds,
)

In [23]:
# Clients referenced by fact_orders but absent from dim_clients (as just read
# from the DWH). Each gets a placeholder 'Unknown' row so every
# fact_orders.client_id resolves against the dimension.
# NULL keys are dropped: a left-anti join never matches NULL, so a NULL
# client_id would otherwise be appended to dim_clients as a placeholder row.
# select + distinct yields exactly one row per missing key.
df_missing_dim_clients = df_fact_orders\
    .join(df_dim_clients, 'client_id', 'left_anti')\
    .where(F.col('client_id').isNotNull())\
    .select('client_id')\
    .distinct()\
    .withColumn('client_name', F.lit('Unknown'))

In [24]:
# Append the placeholder client rows to dim_clients (existing rows are kept).
df_missing_dim_clients.write\
    .mode('append')\
    .jdbc(gp_url, 'dim_clients', properties=gp_creds)

                                                                                

In [25]:
# Re-read dim_products from the DWH to get the current set of product keys

In [26]:
# Current contents of dim_products as stored in the DWH.
df_dim_products = spark.read.jdbc(
    url=gp_url,
    table='dim_products',
    properties=gp_creds,
)

In [27]:
# Products referenced by fact_orders but absent from dim_products (as just read
# from the DWH). Each gets a placeholder row with 'Unknown' attributes so every
# fact_orders.product_id resolves against the dimension.
# NULL keys are dropped: a left-anti join never matches NULL, so a NULL
# product_id would otherwise be appended to dim_products as a placeholder row.
# select + distinct yields exactly one row per missing key.
df_missing_dim_products = df_fact_orders\
    .join(df_dim_products, 'product_id', 'left_anti')\
    .where(F.col('product_id').isNotNull())\
    .select('product_id')\
    .distinct()\
    .withColumn('product_name', F.lit('Unknown'))\
    .withColumn('aisle', F.lit('Unknown'))\
    .withColumn('department', F.lit('Unknown'))

In [28]:
# Append the placeholder product rows to dim_products (existing rows are kept).
df_missing_dim_products.write\
    .mode('append')\
    .jdbc(gp_url, 'dim_products', properties=gp_creds)

                                                                                


# Fact out_of_stock

In [29]:
# Silver-layer out-of-stock data (parquet) loaded from the app's directory.
df_silver_out_of_stock_app = spark.read.format("parquet").load("/silver/out_of_stock_app/")

In [30]:
# The out-of-stock fact is the silver data as-is: every column is written.
# NOTE(review): unlike fact_orders there is no explicit column selection here;
# confirm the silver schema is exactly what fact_out_of_stock should contain.
df_fact_out_of_stock = df_silver_out_of_stock_app

In [31]:
# Overwrite the fact_out_of_stock table in the DWH.
df_fact_out_of_stock.write\
    .mode('overwrite')\
    .jdbc(gp_url, 'fact_out_of_stock', properties=gp_creds)

                                                                                

In [32]:
# Re-read dim_products from the DWH so it includes the placeholder rows appended above

In [33]:
# Current contents of dim_products as stored in the DWH.
df_dim_products = spark.read.jdbc(
    url=gp_url,
    table='dim_products',
    properties=gp_creds,
)

In [34]:
# Products referenced by fact_out_of_stock but absent from dim_products (as just
# read from the DWH). Each gets a placeholder row with 'Unknown' attributes so
# every fact_out_of_stock.product_id resolves against the dimension.
# NULL keys are dropped: a left-anti join never matches NULL, so a NULL
# product_id would otherwise be appended to dim_products as a placeholder row.
# select + distinct yields exactly one row per missing key.
df_missing_dim_products = df_fact_out_of_stock\
    .join(df_dim_products, 'product_id', 'left_anti')\
    .where(F.col('product_id').isNotNull())\
    .select('product_id')\
    .distinct()\
    .withColumn('product_name', F.lit('Unknown'))\
    .withColumn('aisle', F.lit('Unknown'))\
    .withColumn('department', F.lit('Unknown'))

In [35]:
# Append the placeholder product rows to dim_products (existing rows are kept).
df_missing_dim_products.write\
    .mode('append')\
    .jdbc(gp_url, 'dim_products', properties=gp_creds)

                                                                                