# The purpose of this notebook is to load business plans from an Excel file into an existing budget planning table.

Step 1 - Importing the required Python packages

In [1]:
import pandas as pd
import numpy as np
import uuid
from datetime import datetime
from pyspark.sql.types import *


Step 2 - Importing the Excel file into a pandas data frame. A notable function used in this cell:

pandas.melt() "unpivots" (or "melts") a DataFrame from a wide format to a long format: the identifier columns (id_vars) are kept as they are, and every other column is stacked into rows as a pair of column name and value. Think of it as taking columns and stacking them into rows. Here it turns one column per time period into one row per time period, which matches the time_period/value layout of the target table.
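A minimal sketch of what the melt call in this cell does, on a made-up two-row plan. The column names match the notebook's id_vars; the period headers and values are invented for illustration.

```python
import pandas as pd

# Hypothetical wide plan: one row per (Plan Version, Model, Metric), one column per period
wide = pd.DataFrame({
    "Plan Version": ["FY25 Plan", "FY25 Plan"],
    "Model": ["Model A", "Model A"],
    "Metric": ["Revenue", "Units"],
    "2025-01": [100.0, 10.0],
    "2025-02": [110.0, 11.0],
})

# Every column that is not an identifier becomes a (Time Period, value) pair
long = pd.melt(
    wide,
    id_vars=["Plan Version", "Model", "Metric"],
    var_name="Time Period",
    value_name="value",
)
print(long)
```

Each (Plan Version, Model, Metric) row becomes one row per period column. If the period headers come through from Excel as dates rather than text, the time period column holds dates, which is presumably why the notebook casts it to str afterwards.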

In [2]:
#path to the Excel file with the targets
excel_path = "./builtin/fake_budget_update1.xlsx"
#name of the worksheet that holds the targets
sheet_name = "Sheet1"
#reading the Excel sheet into a pandas data frame
df = pd.read_excel(excel_path, sheet_name=sheet_name)

#replacing cells that are empty or contain only white space with NaN (null) in every column
df = df.replace(r'^\s*$', np.nan, regex=True)
#dropping rows where every value is null
df = df.dropna(how="all")
#unpivoting every column except the id_vars into (Time Period, value) pairs
df = pd.melt(df, id_vars=["Plan Version", "Model", "Metric"], value_vars=None, var_name="Time Period", value_name="value")
#converting column names to snake_case (lower case, words joined by underscores)
df.columns = ['_'.join(col.strip().split()).lower() for col in df.columns]
#converting time period to string
df['time_period'] = df['time_period'].astype(str)


Step 3 - Adding the columns a Type 2 (slowly changing dimension) table needs: record start, record end, transaction type, active record flag, and a unique ID. A notable function used in this cell:

uuid.uuid5 - uuid5 takes a namespace UUID and a name string as input. It hashes the namespace bytes followed by the name with SHA-1, keeps the first 128 bits, and sets the version and variant bits to produce a UUID. Crucially, the same namespace and name always produce the same UUID. This is the key property of version 5 UUIDs.
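The hashing described above can be reproduced with hashlib, which is a quick way to confirm that uuid5 is deterministic. The namespace is the one used later in this notebook; the name string is a made-up key.

```python
import hashlib
import uuid

# Namespace UUID copied from the notebook; the name is an invented example key
namespace = uuid.UUID("1f925fee-159c-44e3-b9d0-aadbf58cf7ff")
name = "FY25 Plan-Model A-Revenue-2025-01"

# Standard-library version 5 UUID
expected = uuid.uuid5(namespace, name)

# Same value by hand: SHA-1 over namespace bytes + UTF-8 name, keep the first 16 bytes,
# then let UUID() stamp the version (5) and the RFC 4122 variant bits
digest = hashlib.sha1(namespace.bytes + name.encode("utf-8")).digest()
manual = uuid.UUID(bytes=digest[:16], version=5)

print(expected, manual == expected)
```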

Why use uuid5?

- Reproducibility: Given the same name and namespace, you get the same UUID every time. In this notebook that means a plan version, model, metric and time period loaded in a later run receive the same id as the row already in the table, which is what lets the later steps match incoming rows to existing ones.
- Uniqueness within a namespace: Different names in the same namespace produce different UUIDs, barring a SHA-1 collision, which is vanishingly unlikely. Unlike random version 4 UUIDs, uniqueness therefore depends on the names themselves being distinct. Both versions are 128 bits, so neither has a storage advantage.
- Namespace separation: The same name hashed under two different namespaces yields two different UUIDs, so identifiers generated for different purposes do not collide even when their name strings match.
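A sketch of how the deterministic id behaves across loads, using the same namespace and '-'-joined key as the notebook's Step 3 cell. row_id is a hypothetical helper name and the key values are invented.

```python
import uuid

# Namespace UUID copied from the notebook
NAMESPACE = uuid.UUID("1f925fee-159c-44e3-b9d0-aadbf58cf7ff")

def row_id(plan_version, model, metric, time_period):
    """Build the same '-'-joined business key the notebook uses and hash it with uuid5."""
    key = "-".join(str(part) for part in (plan_version, model, metric, time_period))
    return str(uuid.uuid5(NAMESPACE, key))

first_load = row_id("FY25 Plan", "Model A", "Revenue", "2025-01")
second_load = row_id("FY25 Plan", "Model A", "Revenue", "2025-01")  # same key in a later run
other_period = row_id("FY25 Plan", "Model A", "Revenue", "2025-02")

# Parts that contain '-' can join to the same string, and therefore the same id
ambiguous = row_id("A-B", "C", "D", "E") == row_id("A", "B-C", "D", "E")

print(first_load == second_load, first_load == other_period, ambiguous)
```

The last comparison shows a caveat of the '-' separator. Changing the separator now would also change every id already stored in the table, so it is not a drop-in fix.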


In [3]:
#adding the Type 2 tracking columns
df['record_valid_start'] = pd.to_datetime('1900-01-01')
df['record_valid_end'] = pd.Timestamp.max
df['transaction_typ'] = 'I'
#named active_record_flg to match the Spark schema and the SQL below
df['active_record_flg'] = True
#calculating a deterministic id with uuid5 from plan version, model, metric and time period
namespace = uuid.UUID('1f925fee-159c-44e3-b9d0-aadbf58cf7ff')
df['id'] = df.apply(lambda row: str(uuid.uuid5(namespace, '-'.join([str(row['plan_version']), str(row['model']), str(row['metric']), str(row['time_period'])]))), axis=1)
#moving the id column to the front of the data frame
df = df[['id'] + [col for col in df.columns if col != 'id']]


Step 4 - Converting the pandas data frame to a Spark DataFrame with an explicit schema. Defining a StructType for a Spark DataFrame helps ensure data quality, consistency, and efficient processing:

- Schema Enforcement: A StructType defines the schema of your DataFrame: the column names, their data types (e.g., String, Integer, Double, Date), and whether they can be null. Defining it explicitly means Spark uses these types instead of guessing them when reading or creating the DataFrame, which helps prevent inconsistencies and errors. What happens to non-conforming data depends on the source: file readers such as CSV and JSON offer PERMISSIVE (malformed records are kept with null fields), DROPMALFORMED (the record is dropped) and FAILFAST (an error is raised) modes.
- Data Type Safety: Specifying data types ensures Spark treats the data correctly. For example, a column used in numerical calculations should be a numeric type (Integer, Double, etc.); if Spark inferred it as String instead, calculations could fail or give unexpected results.
- Performance Optimization: When the types are known up front, Spark does not need to infer them, which for file sources such as CSV or JSON otherwise requires an extra pass over the data.
- Metadata Management: The StructType provides metadata about your data, which can be very useful for data governance and documentation.  It makes it clear what each column represents and what type of data it should contain.
- Interoperability:  Defining a schema makes it easier to exchange data between different systems.  If you have a well-defined schema, other systems can understand the structure and data types of your Spark DataFrame.
- Data Validation: The only constraint a StructType itself expresses is nullability (the third argument to StructField). Range rules such as minimum or maximum values must be checked separately, for example with a CHECK constraint on a Delta table.
- Working with Nested Data: StructType is essential for working with nested data structures.  You can define complex schemas that include nested structs, arrays, and maps. This is crucial for handling semi-structured data like JSON.
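To make the type and nullability points concrete without a Spark session, here is a plain-Python sketch of the per-field checks an explicit schema allows. Field and validate are hypothetical names; Spark performs its own validation internally, and this is not its implementation. As with a StructField, the only constraint attached to each field is nullability.

```python
from dataclasses import dataclass
from datetime import date

@dataclass(frozen=True)
class Field:
    """Hypothetical stand-in for a StructField: name, expected Python type, nullability."""
    name: str
    py_type: type
    nullable: bool

# Mirrors three fields of the notebook's schema, with types mapped to Python equivalents
SCHEMA = [
    Field("id", str, False),
    Field("value", float, True),
    Field("record_valid_start", date, False),
]

def validate(row: dict) -> list[str]:
    """Return a list of problems; an empty list means the row fits the schema."""
    problems = []
    for f in SCHEMA:
        v = row.get(f.name)
        if v is None:
            if not f.nullable:
                problems.append(f"{f.name}: null in non-nullable field")
        elif not isinstance(v, f.py_type):
            problems.append(f"{f.name}: expected {f.py_type.__name__}, got {type(v).__name__}")
    return problems

print(validate({"id": "abc", "value": None, "record_valid_start": date(1900, 1, 1)}))
print(validate({"id": None, "value": "12", "record_valid_start": date(1900, 1, 1)}))
```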


In [4]:
#defining an explicit schema so Spark does not infer the column types incorrectly
schema = StructType([
    StructField("id", StringType(), False),
    StructField("plan_version", StringType(), False),
    StructField("model", StringType(), False),
    StructField("metric", StringType(), False),
    StructField("time_period", StringType(), False),
    StructField("value", DoubleType(), True),
    StructField("record_valid_start", DateType(), False),
    StructField("record_valid_end", DateType(), False),
    StructField("transaction_typ", StringType(), False),
    StructField("active_record_flg", BooleanType(), False) 
])
#converting the pandas data frame to a spark data frame
spark_df = spark.createDataFrame(df, schema=schema)
#creating a temporary view with the records to merge into the existing table
spark_df.createOrReplaceTempView("targets_update")


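Step 5 - Storing the current date and time once, when the notebook runs, gives every record in this load the same timestamp. The same value is used both as the start of each new record version and as the end of the version it replaces, so the two periods meet exactly with no gap or overlap.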
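A sketch of how the single timestamp is used, with a fixed datetime standing in for datetime.now() so the result is repeatable. It shows the string the f-string places inside to_timestamp(...), the Python equivalent of date_trunc('second', ...), and that the closing and opening versions share one boundary value. The dictionaries are hypothetical stand-ins for table rows.

```python
from datetime import datetime

# Fixed value standing in for datetime.now(), captured once as in the notebook
record_update_time = datetime(2025, 3, 14, 9, 30, 15, 123456)

# What the f-string in the SQL query receives: str(datetime)
sql_literal = f"to_timestamp('{record_update_time}')"

# Python equivalent of date_trunc('second', ...): drop the microseconds
truncated = record_update_time.replace(microsecond=0)

# The same value closes the old version and opens the new one
old_version = {"value": 100.0, "record_valid_end": truncated, "active_record_flg": False}
new_version = {"value": 120.0, "record_valid_start": truncated, "active_record_flg": True}

print(sql_literal)
print(old_version["record_valid_end"] == new_version["record_valid_start"])
```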

In [5]:
#storing the run timestamp once so every statement below uses the same value
record_update_time = datetime.now()


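Step 6 - Defining a SQL query that identifies records that are either new or updates to existing records. New ids keep their default start date and transaction type 'I'. Ids whose value differs from the currently active version get transaction type 'U' and a start time equal to the timestamp stored in Step 5, which is inserted into the query with f-string interpolation.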
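The detection logic can be tried outside Spark. This sketch uses an in-memory SQLite database with cut-down versions of the two tables (a subset of the columns, invented ids and values). SQLite spells null-safe inequality IS NOT, where Spark SQL uses NOT (a <=> b); the second query shows that a plain <> misses a value that changes from null.

```python
import sqlite3

con = sqlite3.connect(":memory:")
con.executescript("""
    CREATE TABLE targets (id TEXT, value REAL, transaction_typ TEXT, active_record_flg INTEGER);
    CREATE TABLE targets_update (id TEXT, value REAL, transaction_typ TEXT, active_record_flg INTEGER);

    INSERT INTO targets VALUES ('a', 100, 'I', 1), ('b', 200, 'I', 1), ('c', NULL, 'I', 1);
    INSERT INTO targets_update VALUES
        ('a', 100, 'I', 1),   -- unchanged
        ('b', 250, 'I', 1),   -- changed value
        ('c', 300, 'I', 1),   -- was NULL, now has a value
        ('d', 400, 'I', 1);   -- brand new id
""")

changes = con.execute("""
    -- New ids: no row with this id exists in targets
    SELECT tu.id, tu.value, tu.transaction_typ
    FROM targets_update AS tu
    LEFT JOIN targets AS t ON tu.id = t.id
    WHERE t.id IS NULL
    UNION ALL
    -- Changed ids: the active row differs (IS NOT is SQLite's null-safe inequality)
    SELECT tu.id, tu.value, 'U'
    FROM targets_update AS tu
    JOIN targets AS t ON tu.id = t.id
    WHERE tu.value IS NOT t.value
      AND t.active_record_flg = 1
    ORDER BY 1
""").fetchall()

# A plain <> treats the NULL comparison for 'c' as unknown, so only 'b' is found
plain = con.execute(
    "SELECT COUNT(*) FROM targets_update tu JOIN targets t ON tu.id = t.id "
    "WHERE tu.value <> t.value"
).fetchone()[0]

print(changes, plain)
```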

In [6]:
#defining the SQL Query
query = f'''
    select distinct tu.id
    , tu.plan_version
    , tu.model
    , tu.metric
    , tu.time_period
    , tu.value
    , tu.record_valid_start
    , tu.record_valid_end
    , tu.transaction_typ
    , tu.active_record_flg
    from targets_update as tu
    left join targets as t 
        on tu.id = t.id
    where t.id is null 
    union all 
    select distinct tu.id
    , tu.plan_version
    , tu.model
    , tu.metric
    , tu.time_period
    , tu.value
    , date_trunc('second', to_timestamp('{record_update_time}')) record_valid_start
    , tu.record_valid_end
    , 'U' transaction_typ
    , tu.active_record_flg
    from targets_update as tu
    inner join targets as t 
        on tu.id = t.id
    where not (tu.value <=> t.value)  -- null-safe, so changes to or from null are also caught
    and t.active_record_flg = true
'''


Step 6 (continued) - Loading the query results into a staging table. The results must be stored in a table because of Spark's lazy execution: Spark delays running transformations until an action, such as a write, is triggered.

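Because of this, a DataFrame built from a SQL query is not a fixed result; each action re-runs the query against the data as it is at that moment. Here the query reads the targets table, and the next step modifies targets: once the old versions are marked inactive, re-running the query would no longer find the updated records. Writing the results to a staging table first freezes them, so the merge and the insert both work from the same set of rows.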
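An analogy for the lazy-execution problem, using SQLite rather than Spark: a view is re-evaluated on every read, much like an uncached DataFrame whose query is re-run by each action, while a table created from the query is a snapshot. Table names follow the notebook; the contents are invented.

```python
import sqlite3

con = sqlite3.connect(":memory:")
con.executescript("""
    CREATE TABLE targets (id TEXT, value REAL, active_record_flg INTEGER);
    CREATE TABLE targets_update (id TEXT, value REAL);
    INSERT INTO targets VALUES ('b', 200, 1);
    INSERT INTO targets_update VALUES ('b', 250);
""")

detect = """
    SELECT tu.id, tu.value FROM targets_update tu
    JOIN targets t ON tu.id = t.id
    WHERE tu.value <> t.value AND t.active_record_flg = 1
"""
# Re-evaluated on every read, like a DataFrame that is not materialized
con.execute(f"CREATE VIEW changes_view AS {detect}")
# Evaluated once, now, like the staging table
con.execute(f"CREATE TABLE changes_snapshot AS {detect}")

# Close the old version, as the merge step does
con.execute("UPDATE targets SET active_record_flg = 0 WHERE id = 'b'")

view_rows = con.execute("SELECT * FROM changes_view").fetchall()
snapshot_rows = con.execute("SELECT * FROM changes_snapshot").fetchall()
print(view_rows, snapshot_rows)
```

After the old version is closed, the view no longer returns the change while the snapshot still does; the staging table in the notebook plays the role of the snapshot.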

In [7]:
#executing the SQL query from above into a Spark DataFrame
spark_df_changes = spark.sql(query)
#materializing the result in a staging table before targets is modified in the next step
spark_df_changes.write.mode("overwrite").format("delta").saveAsTable("targets_staging")


Step 7 - Merging the updates into the existing targets table involves three actions: closing the currently active version of each updated record (setting its end date to the timestamp stored in Step 5 and its active record flag to false), inserting the new and updated records from the staging table, and finally dropping the staging table.
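A sketch of the same three actions in SQLite, with invented rows and fixed strings standing in for the Step 5 timestamp and the far-future end date. SQLite has no MERGE INTO, so closing the old versions is written as an UPDATE ... WHERE id IN (...). The active_record_flg = 1 condition limits the update to the current version, so rows closed by earlier loads keep their original end dates.

```python
import sqlite3

STAMP = "2025-03-14 09:30:15"  # stands in for date_trunc('second', record_update_time)
FAR_FUTURE = "2262-04-11"      # stands in for the record_valid_end default

con = sqlite3.connect(":memory:")
con.executescript(f"""
    CREATE TABLE targets (id TEXT, value REAL, record_valid_start TEXT,
                          record_valid_end TEXT, transaction_typ TEXT, active_record_flg INTEGER);
    CREATE TABLE targets_staging (id TEXT, value REAL, record_valid_start TEXT,
                                  record_valid_end TEXT, transaction_typ TEXT, active_record_flg INTEGER);

    -- Existing history: one active row for 'b'
    INSERT INTO targets VALUES ('b', 200, '1900-01-01', '{FAR_FUTURE}', 'I', 1);
    -- Output of the change-detection step: an update for 'b' and a new id 'd'
    INSERT INTO targets_staging VALUES
        ('b', 250, '{STAMP}', '{FAR_FUTURE}', 'U', 1),
        ('d', 400, '1900-01-01', '{FAR_FUTURE}', 'I', 1);
""")

with con:  # one transaction: close superseded rows, append staged rows, drop staging
    con.execute(
        """
        UPDATE targets
        SET record_valid_end = ?, active_record_flg = 0
        WHERE active_record_flg = 1
          AND id IN (SELECT id FROM targets_staging WHERE transaction_typ = 'U')
        """,
        (STAMP,),
    )
    con.execute("INSERT INTO targets SELECT * FROM targets_staging")
    con.execute("DROP TABLE targets_staging")

history = con.execute(
    "SELECT id, value, record_valid_start, record_valid_end, active_record_flg "
    "FROM targets ORDER BY id, record_valid_start"
).fetchall()
for row in history:
    print(row)
```

Here all three statements run in one SQLite transaction; in the notebook each spark.sql call is a separate statement against the Delta table.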

In [8]:
#SQL query that closes the currently active version of every updated record
query = f'''
    merge into targets t
    using targets_staging ts
        on t.id = ts.id
        and ts.transaction_typ = 'U'
        and t.active_record_flg = true
    when matched then
    update set t.record_valid_end = date_trunc('second', to_timestamp('{record_update_time}')),
    t.active_record_flg = false
'''
spark.sql(query)
#SQL query that inserts the new and updated records
query = '''
    insert into targets
    select * from targets_staging
'''
spark.sql(query)
#SQL query that drops the staging table
query = '''
    drop table targets_staging
'''
spark.sql(query)


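Step 8 - Checking the result by querying the full history of one id. For a record whose value changed, the query returns two rows: the superseded version (active_record_flg = false) and the new active version.

In [9]: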
%%sql
select *
from targets as a 
where a.id = '61eb66f1-40ec-5eb6-87dc-5a2906281be1'
order by record_valid_start desc


<Spark SQL result set with 2 rows and 10 fields>