In [1]:
# Turn on case-sensitive identifier resolution in Spark SQL for this session.
# `spark` is not created in this notebook; it is expected to be supplied by the
# notebook runtime.
# NOTE(review): presumably needed because some column names rely on exact
# casing (e.g. "isSubtotal") — confirm the actual motivation.
spark.conf.set('spark.sql.caseSensitive', True)
from pyspark.sql.functions import lpad, col, expr, round as spark_round
from pyspark.sql.types import StructType, StructField, StringType, IntegerType, DateType, DoubleType, LongType
import requests, json
from datetime import datetime
from notebookutils import mssparkutils

In [2]:
# Target Fabric workspace and the OneLake (ABFSS) root of its lakehouse.
workspace_name = "FPA_ENG"
lake_abfss = f"abfss://{workspace_name}@onelake.dfs.fabric.microsoft.com/Lakehouse.Lakehouse"

# Lakehouse sub-folders: staged CSV files and Delta tables respectively.
files_path = f"{lake_abfss}/Files/"
tables_path = f"{lake_abfss}/Tables/"

# Base URL the sample CSV files are downloaded from.
blob_path = "https://raw.githubusercontent.com/alisonpezzott/budget-versus-actuals-sample-data/refs/heads/main/"


In [11]:
# DimAccount
# Downloads DimAccount.csv, stages it under the lakehouse Files folder, reads it
# with an explicit all-string schema, zero-pads the four key columns and
# overwrites the DimAccount Delta table.
table_name = "DimAccount"
url = blob_path + table_name + ".csv"

# Bound the request and fail on HTTP errors (404/500, ...) so that an error
# page is never written to disk and then loaded as if it were CSV data.
# The 30 s timeout is an arbitrary upper bound; tune if the source is slow.
response = requests.get(url, timeout=30)
response.raise_for_status()

# Spark cannot read the HTTP response directly: write it to the driver's local
# disk first, then copy that file into the lakehouse.
local_path = f"/tmp/{table_name}.csv"
with open(local_path, 'wb') as f:
    f.write(response.content)

csv_path = files_path + table_name + ".csv"
mssparkutils.fs.cp(f"file:{local_path}", csv_path)

# Every column is declared as a nullable string.
schema = StructType([
    StructField(column_name, StringType(), True)
    for column_name in (
        "AccountGroupKey",
        "AccountSubgroupKey",
        "AccountSubgroup",
        "ControlAccountKey",
        "ControlAccount",
        "SubsidiaryAccountKey",
        "SubsidiaryAccount",
    )
])

# Left-pad each key with "0" to its fixed width (2 / 5 / 8 / 11 characters).
df = (
    spark.read.format("csv")
    .option("header", "true")
    .schema(schema)
    .load(csv_path)
    .withColumn("AccountGroupKey", lpad(col("AccountGroupKey"), 2, "0"))
    .withColumn("AccountSubgroupKey", lpad(col("AccountSubgroupKey"), 5, "0"))
    .withColumn("ControlAccountKey", lpad(col("ControlAccountKey"), 8, "0"))
    .withColumn("SubsidiaryAccountKey", lpad(col("SubsidiaryAccountKey"), 11, "0"))
)

# Replace the table contents and schema on every run.
(
    df.write.format("delta")
    .mode("overwrite")
    .option("overwriteSchema", "true")
    .save(tables_path + table_name)
)


In [4]:
# DimAccountGroup
# Downloads DimAccountGroup.csv, stages it under the lakehouse Files folder,
# reads it with an explicit all-string schema, zero-pads AccountGroupKey and
# overwrites the DimAccountGroup Delta table.
table_name = "DimAccountGroup"
url = blob_path + table_name + ".csv"

# Bound the request and fail on HTTP errors (404/500, ...) so that an error
# page is never written to disk and then loaded as if it were CSV data.
# The 30 s timeout is an arbitrary upper bound; tune if the source is slow.
response = requests.get(url, timeout=30)
response.raise_for_status()

# Spark cannot read the HTTP response directly: write it to the driver's local
# disk first, then copy that file into the lakehouse.
local_path = f"/tmp/{table_name}.csv"
with open(local_path, 'wb') as f:
    f.write(response.content)

csv_path = files_path + table_name + ".csv"
mssparkutils.fs.cp(f"file:{local_path}", csv_path)

# Every column is declared as a nullable string (isSubtotal included).
schema = StructType([
    StructField(column_name, StringType(), True)
    for column_name in ("AccountGroup", "AccountGroupKey", "isSubtotal")
])

# Left-pad the group key with "0" to 2 characters.
df = (
    spark.read.format("csv")
    .option("header", "true")
    .schema(schema)
    .load(csv_path)
    .withColumn("AccountGroupKey", lpad(col("AccountGroupKey"), 2, "0"))
)

# Replace the table contents and schema on every run.
(
    df.write.format("delta")
    .mode("overwrite")
    .option("overwriteSchema", "true")
    .save(tables_path + table_name)
)
    

In [5]:
# DimBranch
# Downloads DimBranch.csv, stages it under the lakehouse Files folder, reads it
# with an explicit all-string schema, zero-pads BranchKey and overwrites the
# DimBranch Delta table.
table_name = "DimBranch"
url = blob_path + table_name + ".csv"

# Bound the request and fail on HTTP errors (404/500, ...) so that an error
# page is never written to disk and then loaded as if it were CSV data.
# The 30 s timeout is an arbitrary upper bound; tune if the source is slow.
response = requests.get(url, timeout=30)
response.raise_for_status()

# Spark cannot read the HTTP response directly: write it to the driver's local
# disk first, then copy that file into the lakehouse.
local_path = f"/tmp/{table_name}.csv"
with open(local_path, 'wb') as f:
    f.write(response.content)

csv_path = files_path + table_name + ".csv"
mssparkutils.fs.cp(f"file:{local_path}", csv_path)

# Both columns are declared as nullable strings.
schema = StructType([
    StructField(column_name, StringType(), True)
    for column_name in ("BranchKey", "Branch")
])

# Left-pad the branch key with "0" to 2 characters.
df = (
    spark.read.format("csv")
    .option("header", "true")
    .schema(schema)
    .load(csv_path)
    .withColumn("BranchKey", lpad(col("BranchKey"), 2, "0"))
)

# Replace the table contents and schema on every run.
(
    df.write.format("delta")
    .mode("overwrite")
    .option("overwriteSchema", "true")
    .save(tables_path + table_name)
)
    

In [6]:
# FactBudgetActual
# Downloads FactBudgetActual.csv, stages it under the lakehouse Files folder,
# reads it with an explicit typed schema, zero-pads the two key columns and
# overwrites the FactBudgetActual Delta table.
table_name = "FactBudgetActual"
url = blob_path + table_name + ".csv"

# Bound the request and fail on HTTP errors (404/500, ...) so that an error
# page is never written to disk and then loaded as if it were CSV data.
# The 30 s timeout is an arbitrary upper bound; tune if the source is slow.
response = requests.get(url, timeout=30)
response.raise_for_status()

# Spark cannot read the HTTP response directly: write it to the driver's local
# disk first, then copy that file into the lakehouse.
local_path = f"/tmp/{table_name}.csv"
with open(local_path, 'wb') as f:
    f.write(response.content)

csv_path = files_path + table_name + ".csv"
mssparkutils.fs.cp(f"file:{local_path}", csv_path)

# Keys are strings, Date is a date and both amounts are doubles; all nullable.
# NOTE(review): no dateFormat option is set, so the reader's default date
# pattern applies — confirm it matches the format used in the CSV.
schema = StructType([
    StructField("BranchKey", StringType(), True),
    StructField("Date", DateType(), True),
    StructField("SubsidiaryAccountKey", StringType(), True),
    StructField("BudgetAmount", DoubleType(), True),
    StructField("ActualAmount", DoubleType(), True),
])

# Left-pad the keys with "0" to 2 and 11 characters respectively.
df = (
    spark.read.format("csv")
    .option("header", "true")
    .schema(schema)
    .load(csv_path)
    .withColumn("BranchKey", lpad(col("BranchKey"), 2, "0"))
    .withColumn("SubsidiaryAccountKey", lpad(col("SubsidiaryAccountKey"), 11, "0"))
)

# Replace the table contents and schema on every run.
(
    df.write.format("delta")
    .mode("overwrite")
    .option("overwriteSchema", "true")
    .save(tables_path + table_name)
)


In [3]:
# DimClient
# Downloads DimClient.csv, stages it under the lakehouse Files folder, reads it
# with an explicit all-string schema and overwrites the DimClient Delta table.
# No column is transformed in this cell.
table_name = "DimClient"
url = blob_path + table_name + ".csv"

# Bound the request and fail on HTTP errors (404/500, ...) so that an error
# page is never written to disk and then loaded as if it were CSV data.
# The 30 s timeout is an arbitrary upper bound; tune if the source is slow.
response = requests.get(url, timeout=30)
response.raise_for_status()

# Spark cannot read the HTTP response directly: write it to the driver's local
# disk first, then copy that file into the lakehouse.
local_path = f"/tmp/{table_name}.csv"
with open(local_path, 'wb') as f:
    f.write(response.content)

csv_path = files_path + table_name + ".csv"
mssparkutils.fs.cp(f"file:{local_path}", csv_path)

# Both columns are declared as nullable strings.
schema = StructType([
    StructField(column_name, StringType(), True)
    for column_name in ("Branch", "Client")
])

df = (
    spark.read.format("csv")
    .option("header", "true")
    .schema(schema)
    .load(csv_path)
)

# Replace the table contents and schema on every run.
(
    df.write.format("delta")
    .mode("overwrite")
    .option("overwriteSchema", "true")
    .save(tables_path + table_name)
)


In [8]:
# Measures
# Writes an empty Delta table named "__Measures" with a single nullable string
# column, "Value".
# NOTE(review): presumably a placeholder table for hosting measures in the
# downstream semantic model — confirm.
table_name = "__Measures"

schema = StructType([StructField("Value", StringType(), True)])

# No rows: only the schema is materialised.
df = spark.createDataFrame([], schema)

(
    df.write.format("delta")
    .mode("overwrite")
    .option("overwriteSchema", "true")
    .save(tables_path + table_name)
)
