In [None]:
import logging
from datetime import datetime, timedelta
from re import sub

from pyspark.sql import DataFrame
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, input_file_name, current_date, date_format, current_timestamp, to_date, split, date_sub, count, when
from pyspark.sql.functions import lit
from pyspark.sql.types import StringType, IntegerType, DateType, FloatType

# Pipeline logger. Without a handler, INFO records would fall through to
# logging's last-resort handler, which only prints WARNING and above, so the
# progress messages below would never be shown. The guard keeps re-running
# this notebook cell from stacking duplicate handlers.
logger = logging.getLogger("hema-transform")
logger.setLevel(level=logging.INFO)
if not logger.handlers:
    _handler = logging.StreamHandler()
    _handler.setFormatter(
        logging.Formatter("%(asctime)s %(name)s %(levelname)s: %(message)s")
    )
    logger.addHandler(_handler)


In [None]:
def visualize(df, n=5):
    """Return the first *n* rows of a Spark DataFrame as a pandas DataFrame.

    ``limit`` is applied before ``toPandas`` so only *n* rows are collected to
    the driver, instead of materialising the whole DataFrame in pandas and
    then taking ``head``. The DataFrame is not ordered here, so which rows are
    shown is not guaranteed to be stable between runs.

    Args:
        df: Spark DataFrame to preview.
        n: Number of rows to return (defaults to 5, like ``pandas.head``).
    """
    return df.limit(n).toPandas()

In [None]:
# Reuse the active Spark session if one exists, otherwise create one.
spark = (
    SparkSession.builder
    .appName("hema-spark")
    .getOrCreate()
)

In [None]:
FILE_DIRECTORY = "./data"
FILENAME = "train.csv"

# Read the source CSV, taking column names from the header row. No schema is
# inferred; columns are cast later in raw_to_curated_pipeline.
source_path = f"{FILE_DIRECTORY}/{FILENAME}"
ingested_df = (
    spark.read
    .format("csv")
    .option("header", True)
    .load(source_path)
)

# The raw/landing zone is intended to hold the file in this as-read form.

In [None]:
# Target Spark type for each curated (camelCase) column. raw_to_curated_pipeline
# looks up every non-date, non-metadata column here and casts it accordingly.
dtypes = dict(
    rowId=IntegerType(),
    orderId=StringType(),
    shipMode=StringType(),
    customerId=StringType(),
    customerName=StringType(),
    segment=StringType(),
    country=StringType(),
    city=StringType(),
    state=StringType(),
    # NOTE(review): an integer cast drops leading zeros from postal codes
    # (e.g. "01852" -> 1852); consider StringType if codes must round-trip.
    postalCode=IntegerType(),
    region=StringType(),
    productId=StringType(),
    category=StringType(),
    subCategory=StringType(),
    productName=StringType(),
    sales=FloatType(),
)

# Metadata columns that augment_dataframe appends to every dataset.
augmented_cols = ["filename", "ingestionDate", "time"]

# Curated columns parsed with to_date instead of being cast through dtypes.
date_columns = ["orderDate", "shipDate"]

# define functions and other variables 
def camel_case(s):
    """Convert a label such as ``"Order ID"`` or ``"Sub-Category"`` to camelCase.

    Runs of ``_`` and ``-`` are treated as word separators like spaces. Each
    word is title-cased, separators are removed, and the first character is
    lower-cased. Because ``str.title`` lower-cases the rest of every word, an
    input that is already camelCase (``"orderId"``) becomes ``"orderid"``.

    Empty input, or input made only of separators, yields ``""`` rather than
    raising IndexError.
    """
    joined = sub(r"[_-]+", " ", s).title().replace(" ", "")
    # Slicing (not indexing) keeps the empty-string case safe.
    return joined[:1].lower() + joined[1:]

def augment_dataframe(df: DataFrame, filename: str) -> DataFrame:
    """Append ingestion metadata columns to *df*.

    Adds three columns at the end of the frame:
      - ``filename``: the literal *filename* string on every row,
      - ``ingestionDate``: ``current_date()``,
      - ``time``: ``current_timestamp()`` formatted as ``HH:mm:ss``.

    Any of these columns already present are dropped first. This way,
    re-augmenting an already augmented frame (raw -> curated) replaces the
    old values instead of keeping stale ones.

    Args:
        df: Input Spark DataFrame.
        filename: Source file name to record on every row.

    Returns:
        A new DataFrame with the metadata columns appended.
    """
    stale = [name for name in augmented_cols if name in df.columns]
    if stale:
        df = df.drop(*stale)

    return (
        df
        # withColumn requires a Column, so the plain string is wrapped in lit().
        .withColumn("filename", lit(filename))
        .withColumn("ingestionDate", current_date())
        .withColumn("time", date_format(current_timestamp(), "HH:mm:ss"))
    )

def ingestion_to_raw(df: DataFrame, filename: str):
    """Promote a freshly ingested DataFrame to the raw/landing format.

    The raw format is the ingested data unchanged, plus the metadata columns
    added by ``augment_dataframe`` for *filename*.
    """
    log = logger.info
    log("Processing from original to raw format...")
    result = augment_dataframe(df, filename)
    log("Done!")
    return result

def raw_to_curated_pipeline(df: DataFrame, filename: str) -> DataFrame:
    """Convert a raw DataFrame into the curated format.

    Steps:
      1. Rename every non-metadata column to camelCase with ``camel_case``
         (e.g. ``"Order ID"`` -> ``"orderId"``).
      2. Parse ``date_columns`` from ``dd/MM/yyyy`` strings with ``to_date``
         and cast every other renamed column to its type in ``dtypes``.
      3. Re-attach the metadata columns with ``augment_dataframe``.

    Columns listed in ``augmented_cols`` keep their name and type.

    Args:
        df: Raw DataFrame, presumably the output of ``ingestion_to_raw``.
        filename: Source file name recorded in the metadata columns.

    Returns:
        The curated DataFrame.

    Raises:
        KeyError: if a renamed, non-date column has no entry in ``dtypes``.
    """
    logger.info("Processing from raw to curated format...")

    # Rename everything in one step. toDF assigns names by position, so
    # source names containing spaces or punctuation need no escaping.
    new_names = [
        name if name in augmented_cols else camel_case(name)
        for name in df.columns
    ]
    df = df.toDF(*new_names)

    # Build all conversions as a single projection rather than one
    # withColumn call per column, each of which adds a plan node.
    projections = []
    for name in new_names:
        if name in augmented_cols:
            projections.append(col(name))
        elif name in date_columns:
            projections.append(to_date(col(name), "dd/MM/yyyy").alias(name))
        elif name in dtypes:
            projections.append(col(name).cast(dtypes[name]).alias(name))
        else:
            raise KeyError(f"No target type configured in dtypes for column {name!r}")
    df = df.select(*projections)

    # add metadata to dataframe
    curated_df = augment_dataframe(df, filename)
    logger.info("Done!")
    return curated_df


def curated_to_sales(df: DataFrame, filename: str) -> DataFrame:
    """Project the curated DataFrame onto the sales dataset.

    Keeps orderId, orderDate, shipDate, shipMode and city, then appends the
    metadata columns via ``augment_dataframe``.

    Args:
        df: Curated DataFrame (output of ``raw_to_curated_pipeline``).
        filename: Source file name recorded in the metadata columns.

    Returns:
        The sales DataFrame.
    """
    logger.info("Processing from curated to sales format...")
    sales = df.select(
        col("orderId"),
        col("orderDate"),
        col("shipDate"),
        col("shipMode"),
        col("city"),
    )
    sales = augment_dataframe(sales, filename)
    logger.info("Done!")
    return sales

def curated_to_customers(df: DataFrame, filename: str) -> DataFrame:
    """Build the customers dataset from the curated DataFrame.

    Output columns:
      - customerId, customerName, country, city, customerSegment
      - customerFirstName, customerLastName
      - quantityOfOrdersLast5Days, quantityOfOrdersLast15Days,
        quantityOfOrdersLast30Days, totalQuantityOfOrders
      - the metadata columns added by ``augment_dataframe``

    Args:
        df: Curated DataFrame (output of ``raw_to_curated_pipeline``), whose
            ``orderDate`` has already been parsed with ``to_date``.
        filename: Source file name recorded in the metadata columns.

    Returns:
        One row per distinct customer attribute combination, joined with
        that customer's order counts.
    """
    logger.info("Processing from curated to customers format...")

    name_parts = split(col("customerName"), " ")
    customers_info = (
        df
        .select(
            col("customerId"),
            col("customerName"),
            col("country"),
            col("city"),
            col("segment"),
        )
        # The curated data holds one row per order line, so the same customer
        # attributes repeat; keep each combination once.
        .distinct()
        # NOTE(review): splitting on a single space keeps only the first two
        # tokens, so middle names / multi-word surnames are truncated.
        .withColumn("customerFirstName", name_parts.getItem(0))
        .withColumn("customerLastName", name_parts.getItem(1))
        .withColumnRenamed("segment", "customerSegment")
    )

    # Cutoffs are computed by Spark as dates, so comparing them with the date
    # orderDate column covers whole days. The result does not depend on the
    # driver's time of day.
    recent_order_counts = [
        count(when(col("orderDate") >= date_sub(current_date(), days), 1))
        .alias(f"quantityOfOrdersLast{days}Days")
        for days in (5, 15, 30)
    ]

    # Aggregate per customerId (the customer key) rather than by name.
    # NOTE(review): this counts curated rows (order lines); use a distinct
    # count of orderId if "orders" should mean whole orders.
    customers_orders = df.groupBy("customerId").agg(
        *recent_order_counts,
        count(col("orderDate")).alias("totalQuantityOfOrders"),
    )

    # Joining on the column name yields a single customerId column instead of
    # an ambiguous duplicate from each side.
    customers = customers_info.join(customers_orders, on="customerId")

    customers = augment_dataframe(customers, filename)
    logger.info("Done!")
    return customers

In [None]:
# Run the full pipeline. Every stage records the source file name in its
# metadata columns, so FILENAME is passed through explicitly.
raw = ingestion_to_raw(ingested_df, FILENAME)
curated = raw_to_curated_pipeline(raw, FILENAME)
sales = curated_to_sales(curated, FILENAME)
customers = curated_to_customers(curated, FILENAME)

The sales dataset might contain the attributes below:
- orderId
- orderDate (YYYY/MM/DD format) 
- shipDate (YYYY/MM/DD format) 
- shipMode
- city

The customer dataset might contain the attributes below; the order-quantity fields should be calculated from the raw data:
- customerId 
- customerName 
- customerFirstName 
- customerLastName 
- customerSegment 
- country
- city 
- quantityOfOrders(last5Days) 
- quantityOfOrders(last15Days) 
- quantityOfOrders(last30Days) 
- totalQuantityOfOrders

The customer dataset should be overwritten on every run of the pipeline.

In [None]:
# Preview the columns that make up the sales dataset (no metadata columns).
sales = curated.select(
    *map(col, ("orderId", "orderDate", "shipDate", "shipMode", "city"))
)
visualize(sales)

In [None]:
# Exploratory build of the customers dataset (no metadata columns).
customers_info = (
    curated
    .select(
        col('customerId'),
        col('customerName'),
        col('country'),
        col('city'),
        col('segment'),
    )
    # The curated data holds one row per order line; keep each distinct
    # customer attribute combination once.
    .distinct()
    .withColumn("customerFirstName", split(col("customerName"), " ").getItem(0))
    .withColumn("customerLastName", split(col("customerName"), " ").getItem(1))
    .withColumnRenamed("segment", "customerSegment")
)

# Cutoff dates for 5, 15 and 30 days ago, evaluated by Spark as dates so the
# comparison with the date orderDate column covers whole days.
cutoff_5d = date_sub(current_date(), 5)
cutoff_15d = date_sub(current_date(), 15)
cutoff_30d = date_sub(current_date(), 30)

# Per customerId (the customer key): number of rows with an orderDate in the
# last 5, 15 and 30 days, and in total.
customers_orders = curated.groupBy("customerId").agg(
    count(when(col("orderDate") >= cutoff_5d, 1)).alias("quantityOfOrdersLast5Days"),
    count(when(col("orderDate") >= cutoff_15d, 1)).alias("quantityOfOrdersLast15Days"),
    count(when(col("orderDate") >= cutoff_30d, 1)).alias("quantityOfOrdersLast30Days"),
    count(col('orderDate')).alias("totalQuantityOfOrders"),
)

# Joining on the column name keeps a single customerId column.
customers = customers_info.join(customers_orders, on="customerId")

In [None]:
# Preview the first rows of the customers dataset.
visualize(df=customers)