# Setup

In [1]:
import sys
if '/home/hungnp5/ecommerce-datawarehouse' not in sys.path:
    sys.path.append('/home/hungnp5/ecommerce-datawarehouse')
from helpers.config.spark_init import init_spark
from helpers.util.utils import fetch_sql_table, write_sql_table

In [2]:
import pyspark.sql.functions as f
from pyspark.sql.functions import col, count
from pyspark.sql import Window
from functools import reduce

In [3]:
spark, sqlContext = init_spark(application_name='DWH', cores_total=40, cors_per_executors=10)

# Functions

In [16]:
def check_null(df):
    df.select([count(f.when(col(c).isNull(), c)).alias(c) for c in df.columns]).show()

In [17]:
def check_duplicates(df, cols):
    df.groupby(cols).count().where('count > 1').show(truncate=False)

# Fetch Data

In [10]:
staging_customers = fetch_sql_table(spark, db='STAGING', db_table='STAGING_CUSTOMERS')
staging_order_payments = fetch_sql_table(spark, db='STAGING', db_table='STAGING_ORDER_PAYMENTS')
staging_orders = fetch_sql_table(spark, db='STAGING', db_table='STAGING_ORDERS')
staging_products = fetch_sql_table(spark, db='STAGING', db_table='STAGING_PRODUCTS')
staging_sellers = fetch_sql_table(spark, db='STAGING', db_table='STAGING_SELLERS')

# Inspect

## Count

In [11]:
print(f"Customers: {staging_customers.count()}")
print(f"Order Payments: {staging_order_payments.count()}")
print(f"Orders: {staging_orders.count()}")
print(f"Products: {staging_products.count()}")
print(f"Sellers: {staging_sellers.count()}")

Customers: 96096
Order Payments: 103886
Orders: 114100
Products: 32951
Sellers: 3095


# ETL

# Customer

In [25]:
customers = staging_customers.where('CUSTOMER_GEO_LAT is not null and CUSTOMER_GEO_LONG is not null')

In [26]:
check_null(customers)
        .withColumn('CUSTOMER_KEY', f.monotonically_increasing_id())

+------------------+----------------+-------------+--------------+----------------+-----------------+
|CUSTOMER_UNIQUE_ID|CUSTOMER_ZIPCODE|CUSTOMER_CITY|CUSTOMER_STATE|CUSTOMER_GEO_LAT|CUSTOMER_GEO_LONG|
+------------------+----------------+-------------+--------------+----------------+-----------------+
|                 0|               0|            0|             0|               0|                0|
+------------------+----------------+-------------+--------------+----------------+-----------------+



In [27]:
write_sql_table(customers, db='DWH', db_table='CUSTOMERS', mode='append')

# Payment

In [31]:
staging_order_payments.columns

['ORDER_ID',
 'PAYMENT_SEQUENTIAL',
 'PAYMENT_TYPE',
 'PAYMENT_INSTALLMENTS',
 'PAYMENT_VALUE']

In [32]:
payments = staging_order_payments.withColumn('PAYMENT_TYPE', f.regexp_replace('PAYMENT_TYPE', '_', ''))

In [35]:
write_sql_table(payments, db='DWH', db_table='ORDER_PAYMENTS', mode='append')

# Products

In [44]:
staging_products.columns

['PRODUCT_ID',
 'PRODUCT_CATEGORY_NAME',
 'PRODUCT_CATEGORY_NAME_ENGLISH',
 'PRODUCT_NAME_LENGTH',
 'PRODUCT_DESCRIPTION_LENGTH',
 'PRODUCT_PHOTOS_QTY',
 'PRODUCT_WEIGHT_GRAMS',
 'PRODUCT_LENGTH_CM',
 'PRODUCT_HEIGHT_CM',
 'PRODUCT_WIDTH_CM']

In [42]:
products = staging_products.withColumn('PRODUCT_CATEGORY_NAME', f.regexp_replace('PRODUCT_CATEGORY_NAME', '_', ''))\
    .withColumn('PRODUCT_CATEGORY_NAME_ENGLISH', f.regexp_replace('PRODUCT_CATEGORY_NAME_ENGLISH', '_', ''))

In [43]:
write_sql_table(products, db='DWH', db_table='PRODUCTS', mode='append')

AnalysisException: Column "PRODUCT_PHOTOS_QTY" not found in schema Some(StructType(StructField(PRODUCT_KEY,LongType,false), StructField(PRODUCT_ID,StringType,false), StructField(PRODUCT_CATEGORY_NAME,StringType,false), StructField(PRODUCT_CATEGORY_NAME_ENGLISH,StringType,false), StructField(PRODUCT_NAME_LENGTH,IntegerType,false), StructField(PRODUCT_DESCRIPTION_LENGTH,IntegerType,false), StructField(PRODUCT_PHOTO_QTY,IntegerType,false), StructField(PRODUCT_WEIGHT_GRAMS,IntegerType,false), StructField(PRODUCT_LENGTH_CM,IntegerType,false), StructField(PRODUCT_HEIGHT_CM,IntegerType,false), StructField(PRODUCT_WIDTH_CM,IntegerType,false)))