In [122]:
#%run ./lib.py

In [123]:
from pyspark import SparkConf, SparkContext
from pyspark.sql.session import SparkSession
from pyspark.sql.types import StructType, StructField, LongType, StringType, DoubleType, DecimalType, TimestampType, DataType, IntegerType
from pyspark.sql.functions import col, lit, trim, substring, concat, udf, upper, initcap
from datetime import datetime
import os

In [124]:
# Spark configuration for this notebook.
conf = (
    SparkConf()
    .setAppName("projeto_pbi")
    .setSparkHome('./spark/home')
)

# Root of the data lake; every dataset path in this notebook is built from it.
# Overridable through the LAKE_HOME environment variable.
LAKE_HOME = os.getenv("LAKE_HOME", "/spark/home")

# Use the documented builder entry point: getOrCreate() returns the already-active
# session when this cell is re-run, instead of wrapping the same SparkContext in a
# fresh SparkSession object each time.
spark = SparkSession.builder.config(conf=conf).getOrCreate()
sc = spark.sparkContext

In [125]:
# Logical name of the table built by this notebook; used for both the surrogate-key
# file (sk_<TABLE_NAME>.csv) and the final output file.
TABLE_NAME = "fact_order"

def create_sk(spark, df, key_column_name, table_name):
    """Assign a 1-based surrogate key to each value of ``key_column_name`` and persist it.

    Rows are numbered in the order ``RDD.zipWithIndex`` visits them (partition order,
    then position within the partition), starting at 1.

    Args:
        spark: active SparkSession, used to build the DataFrame that is written out.
        df: DataFrame containing ``key_column_name``.
        key_column_name: column whose values are the natural keys.
        table_name: suffix of the output file ``sk_<table_name>.csv``.

    Returns:
        dict mapping each key value to its surrogate key (int). If a key value occurs
        more than once, the later-numbered occurrence wins (``collectAsMap`` semantics).

    Side effects:
        Overwrites ``{LAKE_HOME}/dataset/e-commerce/02_surrogate_key/sk_<table_name>.csv``
        with header columns ``key`` and ``SK``. The file contains exactly the pairs in
        the returned dict.
    """
    indexed = df.select(col(key_column_name).alias("key")).rdd.zipWithIndex()
    # zipWithIndex yields (Row(key=...), 0-based index); surrogate keys start at 1.
    key_to_sk = indexed.map(lambda pair: (pair[0][0], pair[1] + 1))
    new_sk_map = key_to_sk.collectAsMap()

    sk_schema = StructType([
        StructField('key', StringType(), True),
        StructField('SK', LongType(), True),
    ])

    # Persist the mapping we already collected instead of re-evaluating the RDD.
    # A second evaluation is a separate Spark job whose numbering could differ on
    # non-deterministic input, and with duplicate keys it would write rows that the
    # returned dict does not contain.
    sk_frame = spark.createDataFrame([[key, sk] for key, sk in new_sk_map.items()], sk_schema)
    sk_frame.write.mode('overwrite').csv(
        '{}/dataset/e-commerce/02_surrogate_key/sk_{}.csv'.format(LAKE_HOME, table_name),
        header=True,
    )

    return new_sk_map

def locate_sk(mapping: dict):
    """Build a Spark UDF that looks a natural key up in ``mapping``.

    Args:
        mapping: dict of natural key -> surrogate key, e.g. as returned by
            ``create_sk`` or ``map_sk``.

    Returns:
        A UDF returning the surrogate key for its input, or null when the key is
        not present in ``mapping``.
    """
    # Declared as LongType to match the 'SK' column type written by create_sk;
    # an IntegerType UDF would not match the stored 64-bit keys.
    # NOTE(review): the dict is captured in the UDF closure and shipped with the
    # function; for very large mappings a broadcast variable or a join may be cheaper.
    return udf(lambda x: mapping.get(x), LongType())


def map_sk(spark, table_name):
    """Load a persisted surrogate-key table back into a dict.

    Reads ``{LAKE_HOME}/dataset/e-commerce/02_surrogate_key/sk_<table_name>.csv``
    (with header) and maps the first column (natural key) to the second column
    converted to ``int`` (surrogate key).
    """
    sk_path = '{}/dataset/e-commerce/02_surrogate_key/sk_{}.csv'.format(LAKE_HOME, table_name)
    sk_rows = spark.read.csv(sk_path, header=True).rdd
    # Columns are addressed by position: 0 = key, 1 = SK (read back as a string).
    return sk_rows.map(lambda r: (r[0], int(r[1]))).collectAsMap()

In [126]:
df = spark.read.csv(
    '{}/dataset/e-commerce/01_extract/orders_dataset.csv'.format(LAKE_HOME),
    header=True,
)

# Rename and type the raw order columns: (source column, target column, target type).
df_order = df.select(*[
    col(source).cast(target_type).alias(target)
    for source, target, target_type in (
        ('order_id', 'ID_ORDER', StringType()),
        ('customer_id', 'ID_CUSTOMER', StringType()),
        ('order_status', 'ORD_STATUS', StringType()),
        ('order_purchase_timestamp', 'DAT_PURCHASE', TimestampType()),
        ('order_approved_at', 'DAT_APPROVED', TimestampType()),
        ('order_delivered_carrier_date', 'DAT_SHIPPED', TimestampType()),
        ('order_delivered_customer_date', 'DAT_DELIVERED', TimestampType()),
        ('order_estimated_delivery_date', 'DAT_ESTIMATED_DELIVERY', TimestampType()),
    )
])

# Customer surrogate keys persisted for the 'dim_customer' table
# (presumably produced by the dim_customer notebook — must exist before this runs).
customer_sk = map_sk(spark=spark, table_name='dim_customer')

In [138]:
# Number the orders (and persist the mapping as sk_fact_order.csv).
sk = create_sk(spark=spark, df=df_order, key_column_name='ID_ORDER', table_name=TABLE_NAME)

# Attach surrogate keys: SK_ORDER from the mapping just built,
# SK_CUSTOMER from the persisted dim_customer mapping (null when not found).
fact_order = (
    df_order
    .withColumn('SK_ORDER', locate_sk(sk)(col('ID_ORDER')))
    .withColumn('SK_CUSTOMER', locate_sk(customer_sk)(col('ID_CUSTOMER')))
)

                                                                                

In [140]:
# Final column order: surrogate keys, natural keys, status, then the order timeline.
fact_order = fact_order.select(*[
    col(name)
    for name in (
        'SK_ORDER',
        'SK_CUSTOMER',
        'ID_ORDER',
        'ID_CUSTOMER',
        'ORD_STATUS',
        'DAT_PURCHASE',
        'DAT_APPROVED',
        'DAT_SHIPPED',
        'DAT_DELIVERED',
        'DAT_ESTIMATED_DELIVERY',
    )
])

# NOTE(review): this fact table is written under the 03_dim folder — confirm that
# downstream consumers expect it there rather than in a separate fact folder.
fact_order.write.mode('overwrite').csv(
    '{}/dataset/e-commerce/03_dim/{}.csv'.format(LAKE_HOME, TABLE_NAME),
    header=True,
)

                                                                                