In [0]:
from pyspark.sql.functions import (
    col, sum as _sum, filter, when,concat, coalesce, lit, trim, expr, substring, locate,date_format,count,collect_set,concat_ws,countDistinct, first,
     lower, rtrim, split, regexp_extract, regexp_replace, array_max)
import datetime
import re
import seaborn as sns
import sys
import pandas as pd
from pyspark.sql import functions as F
from pyspark.sql import SparkSession
from scipy import stats

# Read Parquet DATE/TIMESTAMP values in LEGACY rebase mode: Spark rebases values
# written with the legacy hybrid (Julian + Gregorian) calendar to the Proleptic
# Gregorian calendar on read, instead of using the EXCEPTION mode, which fails on
# such ambiguous (very old) values.
# NOTE(review): `spark` is never created in this notebook; it is assumed to be the
# session injected by the runtime (e.g. Databricks) -- confirm.
spark.conf.set("spark.sql.parquet.datetimeRebaseModeInRead", "LEGACY")

In [0]:
def load_dataset(table_names, dataframe_names):
    """Load Spark tables and publish each one as a notebook-level variable.

    ``table_names[i]`` is read with ``spark.table`` and bound in this module's
    global namespace (the notebook namespace when this cell is run there) under
    the name ``dataframe_names[i]``.

    Args:
        table_names: Iterable of fully qualified table names to read.
        dataframe_names: Iterable of variable names, parallel to ``table_names``.

    Returns:
        dict mapping each dataframe name to its loaded Spark DataFrame (the same
        objects that were published to ``globals()``).

    Raises:
        ValueError: if the two iterables have different lengths.
    """
    table_names = list(table_names)
    dataframe_names = list(dataframe_names)
    if len(table_names) != len(dataframe_names):
        raise ValueError("Mismatch between table names and dataframe names.")

    df_dict = {
        df_name: spark.table(table_name)
        for table_name, df_name in zip(table_names, dataframe_names)
    }
    # Publish once, after every table loaded, instead of on each loop iteration;
    # a failing table no longer leaves a partially published namespace behind.
    globals().update(df_dict)
    return df_dict

In [0]:
def trim_and_lower(df, col_list):
    """Normalise text columns by lower-casing them and trimming surrounding spaces.

    Args:
        df: Spark DataFrame to transform.
        col_list: Names of the columns to normalise; each is replaced under the
            same name by ``trim(lower(column))``. Other columns are untouched.

    Returns:
        The transformed Spark DataFrame.
    """
    result = df
    for column_name in col_list:
        # Rebind on every pass so each normalised column builds on the previous frame.
        result = result.withColumn(column_name, trim(lower(col(column_name))))
    return result


In [0]:
def format_actual_hours(df_actual_hours_raw):
    """Prepare the raw actual-hours table for joining.

    Steps:
      1. Parse ``ServiceDate`` from a ``dd-MM-yyyy`` string into a date.
      2. Derive ``ServiceMonthYear`` as a ``yyyy-MM`` string from that date.
      3. Keep only StockNo, TypeCode, Hours, ServiceSegment and ServiceMonthYear.
      4. Lower-case and trim StockNo and ServiceSegment.

    Args:
        df_actual_hours_raw: Spark DataFrame with at least the columns above
            (``ServiceDate`` as a ``dd-MM-yyyy`` string).

    Returns:
        The formatted Spark DataFrame.
    """
    selected_columns = ['StockNo', 'TypeCode', 'Hours', 'ServiceSegment', 'ServiceMonthYear']
    text_columns = ['StockNo', 'ServiceSegment']

    with_parsed_date = df_actual_hours_raw.withColumn(
        'ServiceDate', F.to_date('ServiceDate', 'dd-MM-yyyy')
    )
    with_month = with_parsed_date.withColumn(
        'ServiceMonthYear', date_format(col('ServiceDate'), 'yyyy-MM')
    )
    return trim_and_lower(with_month.select(selected_columns), text_columns)



In [0]:
def format_EAC_hours(df_EAC_hours_raw):
    """Project the raw EAC table to the columns used downstream and normalise text.

    Keeps StockNo, ProjectNo, SellPriceCAD, IndustryCode, Model and Stocktype,
    then lower-cases and trims every kept column except SellPriceCAD.

    Args:
        df_EAC_hours_raw: Spark DataFrame containing the columns listed above.

    Returns:
        The formatted Spark DataFrame.
    """
    text_columns = ['StockNo', 'ProjectNo', 'IndustryCode', 'Model', 'Stocktype']
    projected = df_EAC_hours_raw.select(
        ['StockNo', 'ProjectNo', 'SellPriceCAD', 'IndustryCode', 'Model', 'Stocktype']
    )
    return trim_and_lower(projected, text_columns)


In [0]:
def join_EAC_to_actual(df_actual_hours, df_EAC_hours):
    """Attach EAC attributes to every actual-hours row via a left join on StockNo.

    Rows of ``df_actual_hours`` without a matching StockNo in ``df_EAC_hours``
    are kept, with the EAC columns set to null.

    Args:
        df_actual_hours: Left-hand Spark DataFrame (actual hours).
        df_EAC_hours: Right-hand Spark DataFrame (EAC attributes).

    Returns:
        The joined Spark DataFrame.
    """
    return df_actual_hours.join(df_EAC_hours, on="StockNo", how="left")

In [0]:
def clean_joined(df_dirty):
    """Drop joined rows that cannot be attributed to a project or carry no hours.

    Keeps only rows where ``ProjectNo`` is not null (e.g. drops actual-hours
    rows that found no EAC match in ``join_EAC_to_actual``'s left join) and
    ``Hours`` is strictly positive. A null ``Hours`` makes the comparison null,
    so such rows are dropped too.

    Args:
        df_dirty: Joined Spark DataFrame with ``ProjectNo`` and ``Hours`` columns.

    Returns:
        The filtered Spark DataFrame.
    """
    # "Hours" matches the column name produced by format_actual_hours; the old
    # lowercase "hours" only resolved while spark.sql.caseSensitive is false.
    return df_dirty.filter(col("ProjectNo").isNotNull() & (col("Hours") > 0))

In [0]:
def add_recency_ind(df_with_ServiceMonthYear, cutoff_month='2020-01'):
    """Add an integer ``recency_ind`` flag: 1 for service months on/after a cutoff.

    The comparison is a string comparison, which is chronological only if
    ``ServiceMonthYear`` is a zero-padded ``yyyy-MM`` string, as produced by
    ``format_actual_hours``. Rows with a null ``ServiceMonthYear`` get 0.

    Args:
        df_with_ServiceMonthYear: Spark DataFrame with a ``ServiceMonthYear`` column.
        cutoff_month: Inclusive ``yyyy-MM`` lower bound for "recent" (default ``'2020-01'``).

    Returns:
        The input frame with a ``recency_ind`` column added (or replaced).
    """
    # Bug fix: the original body referenced an unrelated global `df` instead of
    # the parameter, raising NameError or silently using stale notebook state.
    is_recent = col('ServiceMonthYear') >= cutoff_month
    return df_with_ServiceMonthYear.withColumn(
        'recency_ind', F.when(is_recent, 1).otherwise(0)
    )

In [0]:
def compress_to_project_level(df_stock_level):
    """Aggregate the input frame to one row per ``ProjectNo``.

    Output columns, in order:
      * ``ProjectNo``
      * ``stock_count``: count of non-null ``StockNo`` values (rows, not distinct stocks)
      * ``UniqueModelCount``: number of distinct non-null ``Model`` values
      * ``recency_ind``: median of the 0/1 ``recency_ind`` flag, cast to int. With
        median interpolation an exact 50/50 split gives 0.5, which truncates to 0,
        so the flag is 1 only when more than half of the rows are recent.
      * ``hours``: sum of ``Hours``
      * ``total_sell_price``: sum of ``SellPriceCAD``
      * ``industry_codes``: ``first`` ``IndustryCode`` in the group; Spark documents
        ``first`` as non-deterministic after a shuffle.

    NOTE(review): ``count``/``_sum`` work per input row. If the input still has
    several rows per StockNo (e.g. one per service month/segment, as
    ``format_actual_hours`` produces), ``stock_count`` counts rows and
    ``total_sell_price`` repeats each stock's price. Confirm the input really is
    one row per stock.

    Args:
        df_stock_level: Spark DataFrame with ProjectNo, StockNo, Model,
            recency_ind, Hours, SellPriceCAD and IndustryCode columns.

    Returns:
        The project-level Spark DataFrame.
    """
    return (
        df_stock_level
        .groupBy("ProjectNo")
        .agg(
            count("StockNo").alias("stock_count"),
            # "Model" matches the column selected in format_EAC_hours; the old
            # lowercase "model" relied on case-insensitive resolution.
            countDistinct("Model").alias("UniqueModelCount"),
            F.median("recency_ind").cast("int").alias("recency_ind"),
            _sum("Hours").alias("hours"),
            _sum("SellPriceCAD").alias("total_sell_price"),
            first("IndustryCode").alias("industry_codes"),
        )
    )

In [0]:
def filter_outliers(df_uncapped, cutoffs=None):
    """Drop rows whose values exceed per-column upper bounds.

    A row is kept only if, for every ``column -> max_value`` pair in ``cutoffs``,
    ``column <= max_value`` (inclusive). Rows where a listed column is null are
    dropped as well, because the comparison evaluates to null.

    Args:
        df_uncapped: Spark DataFrame containing every column named in ``cutoffs``.
        cutoffs: Optional mapping of column name to inclusive upper bound. When
            ``None`` (the default), the notebook's original caps are used:
            ``total_sell_price <= 1e8``, ``hours <= 1500``, ``stock_count <= 500``.

    Returns:
        The filtered Spark DataFrame; with an empty mapping, the input unchanged.
    """
    if cutoffs is None:
        # Built per call so callers can never mutate a shared default.
        cutoffs = {'total_sell_price': 1E8, 'hours': 1500, 'stock_count': 500}

    df_filtered = df_uncapped
    for column_name, max_value in cutoffs.items():
        df_filtered = df_filtered.filter(col(column_name) <= max_value)
    return df_filtered

In [0]:
def bc_transform(df_raw_hours, column='hours', spark_session=None):
    """Box-Cox transform one numeric column of a Spark DataFrame.

    The frame is collected to the driver with ``toPandas()``. ``column`` is then
    replaced by its Box-Cox transform, with lambda fitted by
    ``scipy.stats.boxcox`` (log-likelihood maximisation when no lambda is
    given). The result is rebuilt as a Spark DataFrame; all other columns pass
    through unchanged.

    Args:
        df_raw_hours: Spark DataFrame containing ``column``.
        column: Name of the column to transform (default ``'hours'``).
        spark_session: Session used to rebuild the Spark DataFrame; defaults to
            the notebook-level ``spark`` global.

    Returns:
        ``(df_transformed, bc_lambda)``: the transformed Spark DataFrame and the
        fitted lambda, which is needed to invert the transform later.

    Raises:
        ValueError: if ``column`` is empty, contains null/NaN, or contains
            non-positive values (Box-Cox is only defined for x > 0). scipy
            additionally raises ValueError for constant data.
    """
    # NOTE(review): toPandas() pulls every row to the driver; presumably fine for
    # project-level data -- confirm the expected row count.
    df_raw_pd = df_raw_hours.toPandas()

    # astype(float) also normalises Decimal/integer results of Spark aggregations.
    raw_values = df_raw_pd[column].astype(float)
    # Explicit checks: on empty input scipy returns a bare array (the tuple unpack
    # would then fail confusingly), and NaN slips past its positivity check.
    if raw_values.empty:
        raise ValueError(f"Cannot Box-Cox transform an empty '{column}' column.")
    if raw_values.isna().any():
        raise ValueError(f"Column '{column}' contains null/NaN values; Box-Cox needs complete data.")
    if (raw_values <= 0).any():
        raise ValueError(f"Column '{column}' must be strictly positive for Box-Cox.")

    transformed_values, bc_lambda = stats.boxcox(raw_values.to_numpy())

    df_transformed_pd = df_raw_pd.copy()
    df_transformed_pd[column] = transformed_values

    # Resolve the global `spark` only when no session was supplied.
    session = spark if spark_session is None else spark_session
    return session.createDataFrame(df_transformed_pd), bc_lambda