In [0]:
import pandas as pd
import numpy as np
from IPython.display import display

In [0]:
def read_data_from_unity_catalog():
    # Data via unity catalog
    # Set the current catalog and schema (database) if necessary
    spark.sql("USE CATALOG `edp-apac-uat`") # Unity Catalog name
    spark.sql("USE l1_asurion_apac") # Schema (database) name

    # Query a table
    preprocessed_data = spark.sql("SELECT * FROM demand_forecast") # Table name #spark.read.table
    #preprocessed_data.show()

    # Print the schema to understand data types
    # preprocessed_data.printSchema()
    preprocessed_data.display()
    preprocessed_data = preprocessed_data.toPandas()
    return preprocessed_data

## Thailand

In [0]:
%sql
SELECT count(*) from demand_forecast

In [0]:
preprocessed_data = read_data_from_unity_catalog()
preprocessed_data = preprocessed_data.toPandas()
preprocessed_data.shape

In [0]:
df_th = preprocessed_data[preprocessed_data['Country']=='Thailand']
df_th.shape



In [0]:
df_sg = preprocessed_data[preprocessed_data['Country']=='Singapore']
df_sg.shape


In [0]:

df_my = preprocessed_data[preprocessed_data['Country']=='Malaysia']
df_my.shape

In [0]:
preprocessed_data['Model'].nunique()

In [0]:
#Count non zero records overall
preprocessed_data_non_zero_count = (preprocessed_data['Shipped_Claim']!=0).sum()
preprocessed_data_non_zero_count

In [0]:
#Count non zero records for thailand
TH_non_zero_count = (df_th['Shipped_Claim']!=0).sum()
TH_non_zero_count

## Singapore

In [0]:
df_sg.shape

In [0]:
df_sg['Model'].nunique()

In [0]:
#Count non zero records
SG_non_zero_count = (df_sg['Shipped_Claim']!=0).sum()
SG_non_zero_count

## Malaysia

In [0]:
df_my.shape

In [0]:
df_my['Model'].nunique()

In [0]:
#Count non zero records
MY_non_zero_count = (df_my['Shipped_Claim']!=0).sum()
MY_non_zero_count

In [0]:
preprocessed_data.columns

In [0]:
# To extract raw data as CSV
# preprocessed_data.to_csv('df_raw.csv', index=False)

In [0]:
preprocessed_data.isnull().sum()

In [0]:
preprocessed_data

In [0]:
# to open data in browser
# import dtale
# d = dtale.show(preprocessed_data, open_browser=True)

# # Data Cleaning

In [0]:
preprocessed_data.shape

In [0]:
preprocessed_data.columns

In [0]:
print(preprocessed_data.dtypes)

In [0]:
# drop null values 
preprocessed_data.dropna(subset=['Shipped_Claim'], inplace=True)

In [0]:
preprocessed_data.shape

In [0]:

# Drop rows where both 'Shipped_Claim' and 'Closing_Base' are zero
condition = ~((preprocessed_data['Shipped_Claim'] == 0) & (preprocessed_data['Closing_Base'] == 0)) 
# Filter DataFrame based on condition
preprocessed_data = preprocessed_data[condition]

In [0]:
preprocessed_data

In [0]:
preprocessed_data.shape

In [0]:
preprocessed_data['Model'].nunique()

In [0]:
# read pred succ mapping
df_pred_fixed = pd.read_csv('df_pred_fixed-16feb.csv')
df_pred_fixed.shape

In [0]:
# Fill missing Predecessor in raw data
# Merge preprocessed_data with df_pred_fixed to map each Model to its Predecessor in df_pred_fixed
merged_df = preprocessed_data.merge(df_pred_fixed[['Model', 'Predecessor']], on='Model', how='left', suffixes=('', '_fixed'))
 
# Fill missing Predecessor values in preprocessed_data with those from df_pred_fixed
merged_df['Predecessor'] = merged_df['Predecessor'].fillna(merged_df['Predecessor_fixed'])
 
# Drop the temporary 'Predecessor_fixed' column after filling the missing values
merged_df.drop(columns=['Predecessor_fixed'], inplace=True)
 
# Updating preprocessed_data to reflect these changes
preprocessed_data = merged_df

In [0]:
preprocessed_data.shape

In [0]:
# Drop enteries having Model Age na
preprocessed_data.dropna(subset=['Model_Age_Days'], inplace =True)

In [0]:
preprocessed_data.shape

In [0]:
# Drop rows where 'Model Age' is negative

preprocessed_data = preprocessed_data[preprocessed_data['Model_Age_Days']!='NULL']
preprocessed_data['Model_Age_Days'] = preprocessed_data['Model_Age_Days'].astype(int)
preprocessed_data = preprocessed_data[preprocessed_data['Model_Age_Days']>= 0].reset_index(drop=True)

In [0]:
#preprocessed_data['Model_Age_Days'].nunique() 
#Launch date is not available for those records

count_nulls=(preprocessed_data['Model_Age_Days'] == 'NULL').sum()
count_nulls


In [0]:
preprocessed_data.shape

In [0]:
count_nulls=(preprocessed_data['Model_Series'] == 'NA').sum()
count_nulls

In [0]:
#validating null, n/a, na values

#list(preprocessed_data['Model_Series'].unique()) #'NA'
# list(preprocessed_data['Predecessor'].unique()) #'N/A', 'NA', nan

list(preprocessed_data['Successor'].unique()) #'N/A', 'NA'

# list(preprocessed_data['Model_No_Color'].unique()) #'Watch', 'Ke',
#  'KEY2',
#  'KEY2LEC',
#  'KEY2LE',
#  'KEY2LESL',
#   None,

In [0]:
# Drop rows where values are NA, N/A, nan, NULL in predecessor
preprocessed_data['Predecessor'] = preprocessed_data['Predecessor'].replace('NA', 'None')
preprocessed_data['Predecessor'] = preprocessed_data['Predecessor'].replace('N/A', 'None')
preprocessed_data['Predecessor'] = preprocessed_data['Predecessor'].replace('NULL', 'None')
preprocessed_data['Predecessor'].fillna('None', inplace=True)


In [0]:
# Drop rows where values are NA, N/A, nan, NULL in successor
preprocessed_data['Successor'] = preprocessed_data['Successor'].replace('NA', 'None')
preprocessed_data['Successor'] = preprocessed_data['Successor'].replace('N/A', 'None')
preprocessed_data['Successor'] = preprocessed_data['Successor'].replace('NULL', 'None')
preprocessed_data['Successor'].fillna('None', inplace=True)
# preprocessed_data['Successor'].fillna('None', inplace=True)
# preprocessed_data['Model_Series'].fillna('None', inplace=True)

In [0]:
# Drop rows where values are NA, N/A, nan, NULL in Model_Series
preprocessed_data['Model_Series'] = preprocessed_data['Model_Series'].replace('NA', 'None')
preprocessed_data['Model_Series'] = preprocessed_data['Model_Series'].replace('N/A', 'None')
preprocessed_data['Model_Series'] = preprocessed_data['Model_Series'].replace('NULL', 'None')
preprocessed_data['Model_Series'].fillna('None', inplace=True)

In [0]:
# Drop rows where values are NA, N/A, nan, NULL in Model_No_Color
preprocessed_data['Model_No_Color'] = preprocessed_data['Model_No_Color'].replace('NA', 'None')
preprocessed_data['Model_No_Color'] = preprocessed_data['Model_No_Color'].replace('N/A', 'None')
preprocessed_data['Model_No_Color'] = preprocessed_data['Model_No_Color'].replace('NULL', 'None')
preprocessed_data['Model_No_Color'].fillna('None', inplace=True)

In [0]:
preprocessed_data.shape

In [0]:
preprocessed_data.isna().sum()

In [0]:
preprocessed_data['Product_Launch_Date'].unique()

In [0]:

preprocessed_data_filtered1 = preprocessed_data # backup var
preprocessed_data_filtered1.shape
# drop rows with null values for product launch date
preprocessed_data = preprocessed_data[preprocessed_data['Product_Launch_Date']!='NULL']

In [0]:
#convert mixed date formats into standard 
import pandas as pd 
def parse_mixed_dates(date_str): # Define the date formats to try 
    date_formats = ['%Y-%m-%d', '%d-%m-%Y'] 

    # Iterate over the date formats and try to convert the date string to a datetime object 
    for fmt in date_formats: 
        try: 
            return pd.to_datetime(date_str, format=fmt) 
        except ValueError: 
            continue 
    # If none of the formats work, return pd.NaT 
    return pd.NaT # Assuming 'df' is your DataFrame and 'mixed_date_col' is the column with mixed date formats

preprocessed_data['Product_Launch_Date'] = preprocessed_data['Product_Launch_Date'].apply(parse_mixed_dates)
# Verify the result 
print(preprocessed_data['Product_Launch_Date'].head())

In [0]:
preprocessed_data['Product_Launch_Date'].unique()

In [0]:
preprocessed_data['Model_Series_Launch_Date'].unique()

In [0]:
preprocessed_data['Model_Series_Launch_Date'] = preprocessed_data['Model_Series_Launch_Date'].apply(parse_mixed_dates)

In [0]:
preprocessed_data['Model_Series_Launch_Date'].unique()

In [0]:
preprocessed_data.isnull().sum()

In [0]:
# Extract year and month into new columns for 'Product_Launch_Date' 
preprocessed_data['Product_Launch_Year'] = preprocessed_data['Product_Launch_Date'].dt.year 
preprocessed_data['Product_Launch_Month'] = preprocessed_data['Product_Launch_Date'].dt.month 
# Extract year and month into new columns for 'Model_Series_Launch_Date' 
preprocessed_data['Model_Series_Launch_Year'] = preprocessed_data['Model_Series_Launch_Date'].dt.year 
preprocessed_data['Model_Series_Launch_Month'] = preprocessed_data['Model_Series_Launch_Date'].dt.month 

In [0]:
preprocessed_data.isnull().sum()

In [0]:
preprocessed_data.display()

In [0]:
# Make Model Type columns
type_patterns = ['PLUS', 'ULTRA', 'FOLD', 'EDGE', 'DUO', 'FLIP', 'FE','STAR','LITE','PRO MAX', 'PRO', 'MINI', 'MAX']

# Extract MODEL_TYPE based on patterns
preprocessed_data['Model_Type'] = preprocessed_data['Model_Family'].str.extract('(' + '|'.join(type_patterns) + ')', expand=False)

# Fill missing values in MODEL_TYPE with 'Regular'
preprocessed_data['Model_Type'].fillna('BASIC', inplace=True)

In [0]:
preprocessed_data.display()

In [0]:
preprocessed_data.sort_values(by='YearMonth', inplace=True)

In [0]:
preprocessed_data.display()

In [0]:
preprocessed_data.shape

In [0]:
# Convert the 'YearMonth' column to datetime (ignoring missing values)
preprocessed_data['YearMonth'] = pd.to_datetime(preprocessed_data['YearMonth'], format='%Y%m', errors='coerce')
 
# Extract the 'Year' and 'Month' into new columns
preprocessed_data['Year'] = preprocessed_data['YearMonth'].dt.year
preprocessed_data['Month'] = preprocessed_data['YearMonth'].dt.month

In [0]:
# # Import necessary libraries  
# from pyspark.sql import SparkSession  
  
# # Create a Spark Session  
# spark = SparkSession.builder.getOrCreate()  

# # Convert the pandas DataFrame to a Spark DataFrame  
# spark_df = spark.createDataFrame(preprocessed_data)  
  
# # Write the Spark DataFrame to DBFS in Delta format  
# spark_df.write.format("delta").save("/preprocessed_data_15feb2024")
# spark.sql("USE CATALOG `edp-apac-uat`") # Unity Catalog name
# spark.sql("USE l1_asurion_apac") # Schema (database) name
# # Create a table in the Databricks Unified Data Catalog  
# spark.sql("CREATE TABLE preprocessed_data_15feb USING DELTA LOCATION '/preprocessed_data_15feb2024'") 

list(preprocessed_data.columns)

In [0]:
# Storing ML results into unity catalog

from pyspark.sql import SparkSession
from pyspark.sql.functions import abs, col
import pyspark.sql.functions as F

# Initialize Spark Session
spark = SparkSession.builder.appName("preprocessed_data_15feb2024").getOrCreate()
# Set the current catalog and schema (database) if necessary
spark.sql("USE CATALOG `edp-apac-uat`") # Unity Catalog name
spark.sql("USE l1_asurion_apac") # Schema (database) name

# Create DataFrame
results_df = spark.createDataFrame(preprocessed_data, schema=list(preprocessed_data.columns))

# Display the DataFrame
results_df.show()

# Write the DataFrame to a Unity Catalog table
results_df.write.mode("overwrite").saveAsTable("l1_asurion_apac.preprocessed_data_15feb2024")