This notebook imports Anaplan Sales Forecast extracts (000s) from the data science blob storage account. It appends all forecasts together into a final dataset.

For more information on outcomes, see documentation at this link (https://swirecocacola.sharepoint.com/:w:/s/ERA/Ec3i8vaHr95Ftz03OEsJ5YYBEWVDm6ir3HVcHOzY6bLPNA?e=BigXd6)

#Set Up

##Libraries

In [0]:
#Import spark library
from pyspark.sql.functions import *
import pyspark.sql.functions as F
from pyspark.sql.types import *

#import Python Libraries
from datetime import date, timedelta

##Snowflake Class
Run the connection script below to build the Snowflake class.

In [0]:
%run ./Snowflake_Connection_rgb

##Connect to Blob

In [0]:
#define the storage account and key for our blob storage
storage_account_name = 'saswazuredsdev001'
storage_account_key = dbutils.secrets.get(scope="azure_kv_blob_001", key="azblob-saswazuredsdev-001")

#Define our storage container and file type inside
container_name = "anaplan-sales-forecasting-dump"
file_type = "csv"
file_name = 'Azure Export.csv'

#set configuration to access blob container
#reuse the key retrieved above instead of fetching the secret a second time
spark.conf.set(
    f"fs.azure.account.key.{storage_account_name}.dfs.core.windows.net",
    storage_account_key)

If you have trouble connecting to the blob storage container, check the access level on the container in Azure. Using the connection method above requires "container" access instead of "private" access.

In [0]:
#view the file in the blob container
display(dbutils.fs.ls(f"abfss://{container_name}@{storage_account_name}.dfs.core.windows.net/"))

#old ways to view the contents of the blob container
#%fs ls wasbs://anaplan-sales-forecasting-dump@saswazuredsdev001.blob.core.windows.net/
#display(dbutils.fs.ls(f"wasbs://{container_name}@{storage_account_name}.blob.core.windows.net/"))

path,name,size,modificationTime
abfss://anaplan-sales-forecasting-dump@saswazuredsdev001.dfs.core.windows.net/Azure Export.csv,Azure Export.csv,7377675924,1673545223000
abfss://anaplan-sales-forecasting-dump@saswazuredsdev001.dfs.core.windows.net/year=2021/,year=2021/,0,0
abfss://anaplan-sales-forecasting-dump@saswazuredsdev001.dfs.core.windows.net/year=2022/,year=2022/,0,0
abfss://anaplan-sales-forecasting-dump@saswazuredsdev001.dfs.core.windows.net/year=2023/,year=2023/,0,0


#Read Data from Middle Extracts
07-04-2022 : 12-30-2022

##Section 1: Build File Path
This section takes a single date and turns it into a file path

###Function: get_date_parts

Parameters
- date

Output
- the year, month, and day as three separate variables. All are integers

In [0]:
def get_date_parts(date):
    #extract date parts
    year = date.year
    month = date.month
    day = date.day
    
    #return each individually
    return year, month, day

###Function: build_path

Parameters
- name of blob container as string
- blob storage account name as string
- date of the file as date
- file name as string

Output
- the path to the file as a string

In [0]:
def build_path(container_name, storage_account_name, date, file_name):
    #extract date parts
    year, month, day = get_date_parts(date)
    
    #create the path name
    #note we are using the updated blob driver
        #see here for details: https://docs.databricks.com/storage/azure-storage.html
    path = f"abfss://{container_name}@{storage_account_name}.dfs.core.windows.net/year={year}/month={month}/day={day}/{file_name}"
    
    return path

In [0]:
# path = build_path(container_name, storage_account_name, sat, file_name)
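
To illustrate the folder layout that `build_path` expects, the sketch below repeats the same f-string in a standalone helper and applies it to a single Saturday. The name `sketch_build_path` is hypothetical and exists only for this example. Month and day are not zero-padded, so the path uses `month=7/day=9` and not `month=07/day=09`.

```python
from datetime import date

def sketch_build_path(container_name, storage_account_name, file_date, file_name):
    # Hypothetical standalone copy of the f-string used in build_path
    return (
        f"abfss://{container_name}@{storage_account_name}.dfs.core.windows.net/"
        f"year={file_date.year}/month={file_date.month}/day={file_date.day}/{file_name}"
    )

# Build the path for one example extract date
example_path = sketch_build_path(
    "anaplan-sales-forecasting-dump",
    "saswazuredsdev001",
    date(2022, 7, 9),
    "Azure Export.csv",
)
print(example_path)
```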

##Section 2: Read Data
This section takes a path and reads the data. It applies all necessary transformations and outputs a dataframe ready to be written.

###Build List of Columns to Keep

In [0]:
# #keep only these columns
# columns_keep = ['year',
#                'week',
#                'profit_center',
#                'super_channel',
#                'retailer',
#                'material',
#                'forecast_volume',
#                'forecast_volume_spc']

###Function: build_date_table

Parameters
- none

Output
- dataframe with all Saturdays from 2021 to 2023 with corresponding Swire years and weeks

In [0]:
def build_date_table():
    #initialize the class
    sfdt = SnowflakeDataTool()
    
    #query the master date table for all saturdays in 2021-2023
    query = "select date, swire_year_number, swire_week_of_year_number \
    from db_swire_bi_p_edw.transformed.dim_mdm_date \
    where date > '2020-12-31' and date < '2023-12-31' \
    and swire_day_of_week_number = 1\
    ;"
    
    #call the sql method, which runs a query
    #store as variable
    date_table = sfdt.sql(query)
    
    #return the date table
    return date_table
    

In [0]:
# date_table = build_date_table()

###Function: read_and_clean_data

Parameters
- path to file as a string
- list of columns to include in output
- date to which the file corresponds
- snowflake date dimension table as a dataframe (output of build_date_table function)

Output
- dataframe with forecasts for current and future weeks
- includes a column with date forecast was pulled

In [0]:
def read_and_clean_data(path, columns_keep, date, date_table):
    #read data in from blob container
    #infer schema and keep header
    df = spark.read.option("inferSchema", "true").option("header", 'true').csv(path)
    
    #keep only needed columns
    df = df.select(columns_keep)
    
    ##filter for only future forecasts
    
    #extract the current year and week for the given date
    #collect once so the lookup runs a single Spark job instead of two
    date_row = date_table.filter(col('date') == date).collect()[0]
    year = date_row[1]
    week = date_row[2]
    
    #filter for only current or future rows 
    df = (df.filter(col('year') >= year) #keep only current and future years
          .filter( (col('year') > year) | (col('week') >= week) ) #keep all of future years and only current and future weeks from current year
     )
    
    #group by grain and sum over duplicates
        #see documentation for full explanation
    df = (df.groupby('year',
                     'week',
                     'profit_center',
                     'super_channel',
                     'retailer',
                     'material')
          .agg(F.sum('forecast_volume').alias('forecast_volume'),
               F.sum('forecast_volume_spc').alias('forecast_volume_spc')
              )
         )
    
    #add column with date forecast was pulled
    df = df.withColumn('forecast_date',lit(date))
    
    #return filtered df
    return df
    

In [0]:
# df = read_and_clean_data(path,columns_keep,sat,date_table)
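
The filter and aggregation inside `read_and_clean_data` can be hard to follow in Spark syntax, so here is a minimal plain-Python sketch of the same logic. The rows, the reduced set of key columns, and the current year and week are all made up for illustration. It is not the notebook's implementation, only the same condition applied to a small list.

```python
from collections import defaultdict

# Made-up rows: (year, week, material, forecast_volume)
rows = [
    (2022, 26, "A", 1.0),  # past week in the current year: dropped
    (2022, 27, "A", 2.0),  # current week: kept
    (2022, 27, "A", 3.0),  # duplicate key of the row above: summed with it
    (2023, 1, "A", 4.0),   # future year: kept even though week < current week
    (2021, 52, "A", 5.0),  # past year: dropped
]
current_year, current_week = 2022, 27  # assumed values for the extract date

# Same condition as the two chained filters in read_and_clean_data
kept = [
    r for r in rows
    if r[0] >= current_year and (r[0] > current_year or r[1] >= current_week)
]

# Sum forecast_volume over duplicate (year, week, material) keys
totals = defaultdict(float)
for year, week, material, volume in kept:
    totals[(year, week, material)] += volume

print(dict(totals))
```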

##Section 3: Write and Append
This section takes a dataframe and appends it to the delta table

###Function: drop_existing_table

Parameters
- path to hive table as string

Output
- Drop table if it exists

In [0]:
def drop_existing_table(db_table_name):
    #drop the table only if it exists, so a missing table does not raise an error
    #this replaces the earlier read-inside-a-bare-except check, which hid every kind of failure
    spark.sql(f"DROP TABLE IF EXISTS {db_table_name}")


###Function: append_to_table

Parameters
- dataframe to append
- delta table to which to append dataframe

Output
- data is appended

In [0]:
def append_to_table(df, db_table_name):
    #append dataframe to table
    #will write new table automatically if doesn't already exist 
    df.write.mode('append').saveAsTable(db_table_name)

##Section 4: Looping through the job
This section pulls data from blob storage and stores it as a Databricks table.

###Function: all_saturdays

Parameters
- start date
- end date

Output
- list of all saturdays in time range (inclusive)

In [0]:
#Generate a list of all saturdays in a range 
def all_saturdays(start_date, end_date):
    # Initialize a list to store the dates
    dates = []
    # Calculate the difference in days between the start date and the first Saturday
    delta = 5 - start_date.weekday()
    if delta < 0:
        delta += 7
    # Add the first Saturday to the list
    saturday = start_date + timedelta(days=delta)
    while saturday <= end_date:
        dates.append(saturday)
        # Add 7 days to get the next Saturday
        saturday += timedelta(days=7)
    return dates
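
As a quick check of the date logic, the sketch below runs a standalone copy of `all_saturdays` over the date range used for the middle extracts. The copy is included only so the example runs on its own. Python's `weekday()` returns 0 for Monday, so 5 is Saturday.

```python
from datetime import date, timedelta

def all_saturdays(start_date, end_date):
    # Standalone copy of the notebook function so this example is self-contained
    dates = []
    # Days from start_date to the first Saturday on or after it
    delta = 5 - start_date.weekday()
    if delta < 0:
        delta += 7
    saturday = start_date + timedelta(days=delta)
    while saturday <= end_date:
        dates.append(saturday)
        saturday += timedelta(days=7)
    return dates

saturdays = all_saturdays(date(2022, 7, 4), date(2022, 12, 30))
print(saturdays[0], saturdays[-1], len(saturdays))
```

Because the start date is a Monday and the end date is a Friday, the first and last entries fall strictly inside the range.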

###Function: mid_job

Parameters
- first date to look for files
- final date to look for files

Output
- table written to hive storage with all forecasts appended

In [0]:
def mid_job(start_date, end_date):
    
    #set up variables for blob storage
    storage_account_name = 'saswazuredsdev001'
    container_name = "anaplan-sales-forecasting-dump"
    file_name = 'Azure Export.csv'
    
    #set up variables for file manipulation
    #keep only these columns
    columns_keep = ['year',
                    'week',
                    'profit_center',
                    'super_channel',
                    'retailer',
                    'material',
                    'forecast_volume',
                    'forecast_volume_spc']
    #build date table
    date_table = build_date_table()
    
    #table name to save data
    syear = start_date.year
    smonth = start_date.month
    sday = start_date.day
    eyear = end_date.year
    emonth = end_date.month
    eday = end_date.day
    table_name = f'rgb_db.forecast_extract_{syear}_{smonth}_{sday}_{eyear}_{emonth}_{eday}'
    
    #drop destination table if exists
    drop_existing_table(table_name)
    
    
    #create a list of all file extract dates
    saturdays = all_saturdays(start_date, end_date)
    
    #loop through all saturdays
    for sat in saturdays:
        #build file path
        path = build_path(container_name, storage_account_name, sat, file_name)
        #pull and transform file from blob storage
        df = read_and_clean_data(path,columns_keep,sat,date_table)
        #append to delta table
        append_to_table(df, table_name)
        

In [0]:
#start and end dates of historical upload
start_date = date(2022,7,4)
end_date = date(2022,12,30)
#run job
mid_job(start_date, end_date)
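
The destination table name is derived from the start and end dates, which matters later because the Append All section reads each table back by name. A minimal sketch of that naming rule follows. The helper `sketch_table_name` is hypothetical and only mirrors the f-string inside `mid_job`.

```python
from datetime import date

def sketch_table_name(start_date, end_date):
    # Hypothetical helper mirroring the table_name f-string in mid_job
    return (
        f"rgb_db.forecast_extract_{start_date.year}_{start_date.month}_{start_date.day}"
        f"_{end_date.year}_{end_date.month}_{end_date.day}"
    )

table_name = sketch_table_name(date(2022, 7, 4), date(2022, 12, 30))
print(table_name)
```

Changing either date therefore changes the table name, so the names read in Append All have to be kept in sync with the dates passed to each job.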

#Read Data from New Extracts
12-31-2022 : 2-9-2023

##Section 1: New Steps
This section adds a function to fill spaces with underscores

###Function: fill_with_underscore

Parameters
- dataframe

Output
- dataframe with spaces in column titles replaced with underscores

In [0]:
def fill_with_underscore(df):
    
    #use a loop name other than `col` so it is not confused with the pyspark `col` function
    for name in df.columns:
        df = df.withColumnRenamed(name, name.replace(" ", "_"))
        
    return df
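
The renaming rule in `fill_with_underscore` is a plain string replacement on each column name. The sketch below applies the same rule to a list of strings. The raw headers are hypothetical, since the headers of the newer extracts are not shown in this notebook.

```python
# Hypothetical raw headers, chosen to match the column names the job selects afterwards
raw_columns = ["profit center", "super channel", "forecast volume spc", "week"]

# Same replacement that fill_with_underscore applies to every dataframe column
clean_columns = [name.replace(" ", "_") for name in raw_columns]
print(clean_columns)
```

Names without spaces pass through unchanged, which is presumably why the Old Extracts section can reuse `new_job` without a separate code path.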

###Function: new_read_and_clean_data

Parameters
- path to file as a string
- list of columns to include in output
- date to which the file corresponds
- snowflake date dimension table as a dataframe (output of build_date_table function)

Output
- dataframe with forecasts for current and future weeks
- includes a column with date forecast was pulled

In [0]:
def new_read_and_clean_data(path, columns_keep, date, date_table):
    #read data in from blob container
    #infer schema and keep header
    df = spark.read.option("inferSchema", "true").option("header", 'true').csv(path)
    
    #replace spaces with underscores
    df = fill_with_underscore(df)
    
    #keep only needed columns
    df = df.select(columns_keep)
    
    ##filter for only future forecasts
    
    #extract the current year and week for the given date
    #collect once so the lookup runs a single Spark job instead of two
    date_row = date_table.filter(col('date') == date).collect()[0]
    year = date_row[1]
    week = date_row[2]
    
    #filter for only current or future rows 
    df = (df.filter(col('year') >= year) #keep only current and future years
          .filter( (col('year') > year) | (col('week') >= week) ) #keep all of future years and only current and future weeks from current year
     )
    
    #group by grain and sum over duplicates
        #see documentation for full explanation
    df = (df.groupby('year',
                     'week',
                     'profit_center',
                     'super_channel',
                     'retailer',
                     'material')
          .agg(F.sum('forecast_volume').alias('forecast_volume'),
               F.sum('forecast_volume_spc').alias('forecast_volume_spc')
              )
         )
    
    #add column with date forecast was pulled
    df = df.withColumn('forecast_date',lit(date))
    
    #return filtered df
    return df
    

##Section 2: New Job
This section pulls data from blob storage and stores it as a Databricks table.

###Function: new_job

Parameters
- first date to look for files
- final date to look for files

Output
- table written to hive storage with all forecasts appended

In [0]:
def new_job(start_date, end_date):
    
    #set up variables for blob storage
    storage_account_name = 'saswazuredsdev001'
    container_name = "anaplan-sales-forecasting-dump"
    file_name = 'Azure Export.csv'
    
    #set up variables for file manipulation
    #keep only these columns
    columns_keep = ['year',
                    'week',
                    'profit_center',
                    'super_channel',
                    'retailer',
                    'material',
                    'forecast_volume',
                    'forecast_volume_spc']
    #build date table
    date_table = build_date_table()
    
    #table name to save data
    syear = start_date.year
    smonth = start_date.month
    sday = start_date.day
    eyear = end_date.year
    emonth = end_date.month
    eday = end_date.day
    table_name = f'rgb_db.forecast_extract_{syear}_{smonth}_{sday}_{eyear}_{emonth}_{eday}'
    
    #drop destination table if exists
    drop_existing_table(table_name)
    
    
    #create a list of all file extract dates
    saturdays = all_saturdays(start_date, end_date)
    
    #loop through all saturdays
    for sat in saturdays:
        #build file path
        path = build_path(container_name, storage_account_name, sat, file_name)
        #pull and transform file from blob storage
        df = new_read_and_clean_data(path,columns_keep,sat,date_table)
        #append to delta table
        append_to_table(df, table_name)
        

In [0]:
#start and end dates of historical upload
start_date = date(2022,12,31)
end_date = date(2023,2,9)
#run job
new_job(start_date, end_date)

#Read Data from Old Extracts
2-27-2021 : 7-3-2022

##Section 1: Old Job

Note that I renamed all files in blob storage to 'Azure Export.csv' so this code would work.

Note that the file for June 26, 2021 is missing. The job is split into separate date ranges so that it does not fail on the missing file.

Note that the file for October 23, 2021 is only 20% of the expected size and has incorrect data types. The job is split again so that this file is not pulled.
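
An alternative to splitting the date range by hand would be to generate every Saturday and skip the known bad dates. This is not what the notebook does. It is only a sketch of that idea in plain Python, using the two dates named in the notes above. The names `bad_dates` and `usable_saturdays` are introduced here for illustration.

```python
from datetime import date, timedelta

# Dates named in the notes above: one missing file and one malformed file
bad_dates = {date(2021, 6, 26), date(2021, 10, 23)}

start_date, end_date = date(2021, 2, 27), date(2022, 7, 3)

# First Saturday on or after start_date (weekday() returns 5 for Saturday)
current = start_date + timedelta(days=(5 - start_date.weekday()) % 7)

# Walk forward one week at a time, skipping the known bad dates
usable_saturdays = []
while current <= end_date:
    if current not in bad_dates:
        usable_saturdays.append(current)
    current += timedelta(days=7)

print(len(usable_saturdays), usable_saturdays[0], usable_saturdays[-1])
```

With this approach the whole period would land in one table instead of three, so the table names read in Append All would need to change accordingly.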

In [0]:
#start and end dates of historical upload
start_date = date(2021,2,27)
end_date = date(2021,6,25)
#run job
new_job(start_date, end_date)

In [0]:
#start and end dates of historical upload
start_date = date(2021,6,27)
end_date = date(2021,10,22)
#run job
new_job(start_date, end_date)

In [0]:
#start and end dates of historical upload
start_date = date(2021,10,24)
end_date = date(2022,7,3)
#run job
new_job(start_date, end_date)

#Append All

In [0]:
#pull all pieces
df1 = spark.read.table('rgb_db.forecast_extract_2021_2_27_2021_6_25')
df2 = spark.read.table('rgb_db.forecast_extract_2021_6_27_2021_10_22')
df3 = spark.read.table('rgb_db.forecast_extract_2021_10_24_2022_7_3')
df4 = spark.read.table('rgb_db.forecast_extract_2022_7_4_2022_12_30')
df5 = spark.read.table('rgb_db.forecast_extract_2022_12_31_2023_2_9')

In [0]:
drop_existing_table('rgb_db.forecast_extract_full')

#make list of df
frames = [df1, df2, df3, df4, df5]
#loop through appending to final table
for df in frames:
    append_to_table(df,'rgb_db.forecast_extract_full')

In [0]:
#make sure table was created
df = spark.read.table('rgb_db.forecast_extract_full')

In [0]:
#view final table
display(df)

year,week,profit_center,sales_office,super_channel,retailer,cold_drink_channel,material,forecast_volume,forecast_volume_spc,forecast_date
2021,10,4400222000,G222,A,0,,154850,0.3333333333333333,0.3333333333333333,2021-02-27
2021,27,4400291000,G291,A,0,,100287,4.374207571092193,4.374207571092193,2021-02-27
2021,33,4400294000,G294,A,0,,100281,2.9997857295907435,2.9997857295907435,2021-02-27
2021,20,4400144000,G144,A,0,,128832,0.16,0.16,2021-02-27
2021,33,4400281000,G281,A,0,,131673,0.0137931034482758,0.0137931034482758,2021-02-27
2021,46,4400281000,G281,A,0,,131673,0.0137931034482758,0.0137931034482758,2021-02-27
2021,21,4400141000,G141,A,0,,116305,6.36267867383132,6.36267867383132,2021-02-27
2021,46,4400143000,G143,A,0,,121750,10.491379217273954,10.491379217273954,2021-02-27
2021,32,4400235000,G235,A,0,,116307,6.391752577319587,6.391752577319587,2021-02-27
2021,15,4400236000,G236,A,0,,121750,2.8308062490047945,2.8308062490047945,2021-02-27


Final Table

name: forecast_extract_full

Schema:  
year - year of the forecasted week  
week - number of the forecasted week  
profit_center - location for the forecast   
super_channel - channel in which retailer is assigned  
retailer - the key account plan for the forecast  
material - unique material id for the forecast  
forecast_volume - raw cases forecasted for the given week, profit center, retailer, and material combination  
forecast_volume_spc - spc cases forecasted for the given week, profit center, retailer, and material combination  
forecast_date - date on which the forecast was made