# Welcome

Welcome to the data prep notebook. For more information about this dataset and the code, visit [https://github.com/bradleyschacht/sample-data](https://github.com/bradleyschacht/sample-data). Complete the activities listed below before running this notebook.

Happy coding!

1. Attach a lakehouse to this notebook
1. Update the values in the **"Parameters"** section
   - **storage_connection_id** - The ID of the connection being used to connect to the ADLS Gen2 account where the source data is stored
   - **storage_account_name** - The name of the ADLS Gen2 account where the source data is stored
   - **storage_path_root** - The location of the sample dataset inside the ADLS Gen2 account in the format of **/_container_/_folder_/_path_/_here_**
   - **create_delta_v_order_disabled** - Specifies whether Delta files should be created with V-Order disabled
   - **create_delta_v_order_enabled** - Specifies whether Delta files should be created with V-Order enabled
   - **create_json** - Specifies whether JSON files should be created
1. If you're feeling lucky, update the values in the **Proceed with caution** section



# Parameters

In [None]:
# Define the variables
# Connection to the ADLS Gen2 account that stores the raw sample data
storage_connection_id = "xxxxxxxx-xxxx-xxxx-xxxx-xxxxxxxxxxx"  # ID of the connection used to reach the storage account
storage_account_name = "storage_account_name_Here"             # ADLS Gen2 account name
storage_path_root = "/container/FolderStructure/TPC-H"         # /container/folder/path of the sample dataset

# Output formats to produce
create_delta_v_order_disabled = True  # Delta tables written with V-Order disabled
create_delta_v_order_enabled = True   # Delta tables written with V-Order enabled
create_json = True                    # JSON files

# Proceed with caution
Only modify the code in this section after fully reviewing the notebook code for downstream impact. 

This notebook has been configured to work with a specific folder structure. While this section provides additional flexibility, exercise caution when making changes, as they could impact other sections of code.

In [None]:
# Build the other required variables
# !!!!!  Review all changes for downstream impact !!!!! #

# Name of the shortcut created under Files/ that points at storage_path_root.
adls_shortcut_name                          = 'TPC-H'
# Scale-factor folder of the dataset inside the shortcut.
dataset_scale_directory                     = 'GB_001'

# All data directories live inside the shortcut, so derive them from its name.
# Hard-coding 'TPC-H' here would silently break reads/writes if the shortcut
# were renamed above.
data_directory_root                         = f'Files/{adls_shortcut_name}/{dataset_scale_directory}'
data_directory_raw                          = f'{data_directory_root}/Raw'
data_directory_clean_delta_v_order_disabled = f'{data_directory_root}/Clean/Delta_V_Order_Disabled'
data_directory_clean_delta_v_order_enabled  = f'{data_directory_root}/Clean/Delta_V_Order_Enabled'
data_directory_clean_json                   = f'{data_directory_root}/Clean/JSON'

# The setup

In [None]:
#Import the relevant libraries
import json
import requests
import sempy.fabric as fabric
import time
from pyspark.sql.functions import col, when
from pyspark.sql.types import *

# IDs of the current workspace and of the attached lakehouse; the shortcut
# helper builds its REST API URLs from these two values.
workspace_id, lakehouse_id = fabric.get_workspace_id(), fabric.get_lakehouse_id()

In [None]:
def fn_create_lakehouse_shortcut(shortcut_path, shortcut_name, target, *, request_timeout=60):
    """Create a shortcut in the attached lakehouse unless one already exists.

    Uses the Fabric REST API for the workspace and lakehouse held in the
    notebook-level ``workspace_id`` and ``lakehouse_id`` variables. It
    authenticates with a token from ``mssparkutils.credentials.getToken("pbi")``.

    Args:
        shortcut_path: Lakehouse folder that will contain the shortcut (e.g. ``'Files'``).
        shortcut_name: Name of the shortcut.
        target: Value sent as the ``target`` property of the create request
            (e.g. an ``adlsGen2`` definition).
        request_timeout: Seconds to wait for each HTTP request before
            ``requests`` raises a timeout error.

    Raises:
        Exception: If the create request does not return HTTP 201. The message
            contains an error summary (with the bearer token redacted) and the
            raw response text.
    """
    request_headers = {
        "Authorization": "Bearer " + mssparkutils.credentials.getToken("pbi"),
        "Content-Type": "application/json"
    }

    request_body = {
        "path": shortcut_path,
        "name": shortcut_name,
        "target": target
    }

    shortcuts_url = f'https://api.fabric.microsoft.com/v1/workspaces/{workspace_id}/items/{lakehouse_id}/shortcuts'
    request_url = f'{shortcuts_url}?shortcutConflictPolicy=Abort'

    # A 200 from the "get shortcut" endpoint means the shortcut is already there.
    existing = requests.get(f'{shortcuts_url}/{shortcut_path}/{shortcut_name}', headers = request_headers, timeout = request_timeout)
    if existing.status_code == 200:
        print('The shortcut already exists')
        return

    response = requests.post(request_url, headers = request_headers, json = request_body, timeout = request_timeout)

    if response.status_code == 201:
        print (f'\nShortcut created successfully: {shortcut_path}/{shortcut_name}\n')
        time.sleep(5)
        return

    # Parse the error body defensively: a non-JSON body or a missing/empty
    # 'moreDetails' list must not replace the real HTTP failure with a
    # KeyError/IndexError/JSONDecodeError.
    try:
        error_body = response.json()
    except ValueError:
        error_body = None
    if not isinstance(error_body, dict):
        error_body = {}

    more_details = error_body.get('moreDetails')
    if isinstance(more_details, list) and more_details and isinstance(more_details[0], dict):
        first_detail = more_details[0]
    else:
        first_detail = {}

    error_summary = {
        'status_code' : response.status_code,
        'error_type' : error_body.get('errorCode'),
        'error_code' : first_detail.get('errorCode'),
        'error_message' : error_body.get('message'),
        'request_url' : request_url,
        # Never echo the live bearer token into (saved) notebook output.
        'request_headers' : {**request_headers, 'Authorization': 'Bearer <redacted>'},
        'request_body' : request_body
    }

    raise Exception(f'\n\nShortcut creation failed\n\nError summary:\n{json.dumps(error_summary, indent = 3)}\n\nFull response text:\n{response.text}')

In [None]:
def _fn_write_delta_table(df, table_name, delta_table_definition, destination_directory, v_order_enabled):
    """Recreate one Delta table at *destination_directory* and load *df* into it.

    Steps:
    1. Drop any table named *table_name*.
    2. Clear *destination_directory*.
    3. Run *delta_table_definition* (a CREATE TABLE ... LOCATION statement).
    4. Insert *df*, overwriting existing data.
    5. Drop the table again.

    Because the DDL specifies a LOCATION, the table is external. Dropping it is
    therefore expected to leave the written Delta files in place.

    The session-wide ``spark.sql.parquet.vorder.enabled`` setting is switched to
    match *v_order_enabled* for this write. It is restored to its previous value
    afterwards, even on failure, so it does not leak into later work in the same
    Spark session.
    """
    v_order_setting = 'spark.sql.parquet.vorder.enabled'
    previous_v_order = spark.conf.get(v_order_setting, None)

    action = 'Enabling' if v_order_enabled else 'Disabling'
    print (f'{table_name} - Delta - {action} V-Order on write')
    spark.conf.set(v_order_setting, 'true' if v_order_enabled else 'false')

    try:
        # The pauses between steps are kept from the original notebook;
        # presumably they give catalog/storage operations time to settle.
        time.sleep(5)

        print (f'{table_name} - Delta - Dropping the table if it exists')
        spark.sql(f'DROP TABLE IF EXISTS {table_name}')

        print (f'{table_name} - Delta - Removing the directory if it exists')
        if mssparkutils.fs.exists(destination_directory):
            mssparkutils.fs.rm(destination_directory, True)

        print (f'{table_name} - Delta - Creating the table')
        spark.sql(delta_table_definition)

        time.sleep(5)

        print (f'{table_name} - Delta - Writing to the table has started')
        df.write.format("delta").mode("overwrite").insertInto(table_name)
        print (f'{table_name} - Delta - Writing to the table has completed')

        print (f'{table_name} - Delta - Dropping the table if it exists \n')
        spark.sql(f'DROP TABLE IF EXISTS {table_name}')

        time.sleep(5)
    finally:
        if previous_v_order is None:
            spark.conf.unset(v_order_setting)
        else:
            spark.conf.set(v_order_setting, previous_v_order)


def fn_process_data (file_name, table_name, dataframe_schema, delta_table_definition):
    """Load one table from raw pipe-delimited files into the enabled output formats.

    Args:
        file_name: Glob, relative to ``data_directory_raw``, matching the table's raw files.
        table_name: Table name; also used as the output sub-folder name.
        dataframe_schema: Spark schema applied when reading the headerless raw files.
        delta_table_definition: CREATE TABLE statement containing the
            ``[DELTA_TABLE_LOCATION]`` placeholder. The placeholder is replaced
            with each Delta output directory.

    The notebook-level flags ``create_delta_v_order_disabled``,
    ``create_delta_v_order_enabled`` and ``create_json`` select which outputs
    are written. An existing output directory is removed before it is rewritten.
    """
    #####  Define source and destination location variables  #####
    source                                          = f'{data_directory_raw}/{file_name}'
    destination_directory_delta_v_order_disabled    = f'{data_directory_clean_delta_v_order_disabled}/{table_name}'
    destination_directory_delta_v_order_enabled     = f'{data_directory_clean_delta_v_order_enabled}/{table_name}'
    destination_directory_json                      = f'{data_directory_clean_json}/{table_name}'

    delta_table_definition_v_order_disabled = delta_table_definition.replace("[DELTA_TABLE_LOCATION]", destination_directory_delta_v_order_disabled)
    delta_table_definition_v_order_enabled  = delta_table_definition.replace("[DELTA_TABLE_LOCATION]", destination_directory_delta_v_order_enabled)

    #####  Print the variable values to the cell output  #####
    print (f'Table Name:                                {table_name}')
    print (f'Source File:                               {source}')
    print (f'Delta Table Location (V-Order Disabled):   {destination_directory_delta_v_order_disabled}')
    print (f'Delta Table Location (V-Order Enabled):    {destination_directory_delta_v_order_enabled}')
    print (f'JSON Location:                             {destination_directory_json} \n')

    # Explicit `== True` comparisons: a non-boolean value such as the string
    # 'False' (e.g. passed in as a notebook parameter) counts as disabled.
    write_v_order_disabled = create_delta_v_order_disabled == True
    write_v_order_enabled = create_delta_v_order_enabled == True
    write_json = create_json == True

    if not (write_v_order_disabled or write_v_order_enabled or write_json):
        return

    #####  Read the files  #####
    df = spark.read.load(source, format = 'csv', delimiter = '|', header = False, schema = dataframe_schema)

    #####  Create the delta table - V-Order Disabled  #####
    if write_v_order_disabled:
        _fn_write_delta_table(df, table_name, delta_table_definition_v_order_disabled, destination_directory_delta_v_order_disabled, v_order_enabled = False)

    #####  Create the delta table - V-Order Enabled  #####
    if write_v_order_enabled:
        _fn_write_delta_table(df, table_name, delta_table_definition_v_order_enabled, destination_directory_delta_v_order_enabled, v_order_enabled = True)

    #####  Create the JSON files  #####
    if write_json:
        print (f'{table_name} - JSON - Removing the directory if it exists')
        if mssparkutils.fs.exists(destination_directory_json):
            mssparkutils.fs.rm(destination_directory_json, True)

        print (f'{table_name} - JSON - Writing to the table has started')
        df.write.mode('overwrite').json(destination_directory_json)
        print (f'{table_name} - JSON - Writing to the table has completed \n')

In [None]:
# Create the lakehouse shortcut (under Files/) that points at the ADLS Gen2
# folder holding the sample data.
target = dict(
    adlsGen2=dict(
        location=f'https://{storage_account_name}.dfs.core.windows.net',
        subpath=storage_path_root,
        connectionId=storage_connection_id,
    )
)

fn_create_lakehouse_shortcut(shortcut_path='Files', shortcut_name=adls_shortcut_name, target=target)

# Fixed pause after the shortcut step; presumably lets the shortcut become
# readable before the data-processing cells use it.
time.sleep(10)

# The data processing

In [None]:
# customer: raw-file glob, read schema and Delta DDL handed to fn_process_data
table_name = 'customer'
file_name = f'{table_name}/*.tbl*'
dataframe_schema = StructType([
    StructField(column_name, column_type)
    for column_name, column_type in (
        ("c_custkey", LongType()),
        ("c_name", StringType()),
        ("c_address", StringType()),
        ("c_nationkey", IntegerType()),
        ("c_phone", StringType()),
        ("c_acctbal", DoubleType()),
        ("c_mktsegment", StringType()),
        ("c_comment", StringType()),
    )
])
delta_table_definition = """
CREATE TABLE customer
	(
		c_custkey 		bigint,
		c_name 			varchar(25),
		c_address 		varchar(40),
		c_nationkey		int,
		c_phone			varchar(15),
		c_acctbal 		decimal(15, 2),
		c_mktsegment	varchar(10),
		c_comment		varchar(117)
	)
	LOCATION '[DELTA_TABLE_LOCATION]'
"""

fn_process_data(
    file_name=file_name,
    table_name=table_name,
    dataframe_schema=dataframe_schema,
    delta_table_definition=delta_table_definition,
)

In [None]:
# lineitem: raw-file glob, read schema and Delta DDL handed to fn_process_data
table_name = 'lineitem'
file_name = f'{table_name}/*.tbl*'
dataframe_schema = StructType([
    StructField(column_name, column_type)
    for column_name, column_type in (
        ("l_orderkey", LongType()),
        ("l_partkey", LongType()),
        ("l_suppkey", LongType()),
        ("l_linenumber", IntegerType()),
        ("l_quantity", DoubleType()),
        ("l_extendedprice", DoubleType()),
        ("l_discount", DoubleType()),
        ("l_tax", DoubleType()),
        ("l_returnflag", StringType()),
        ("l_linestatus", StringType()),
        ("l_shipdate", DateType()),
        ("l_commitdate", DateType()),
        ("l_receiptdate", DateType()),
        ("l_shipInstruct", StringType()),
        ("l_shipmode", StringType()),
        ("l_comment", StringType()),
    )
])
delta_table_definition = """
CREATE TABLE lineitem
	(
		l_orderkey 			bigint ,
		l_partkey 			bigint ,
		l_suppkey 			bigint ,
		l_linenumber		int,
		l_quantity 			decimal(15, 2),
		l_extendedprice		decimal(15, 2),
		l_discount 			decimal(15, 2),
		l_tax 				decimal(15, 2),
		l_returnflag		varchar(1),
		l_linestatus		varchar(1),
		l_shipdate 			date,
		l_commitdate		date,
		l_receiptdate 		date,
		l_shipInstruct 		varchar(25),
		l_shipmode 			varchar(10),
		l_comment 			varchar(44)
	)
	LOCATION '[DELTA_TABLE_LOCATION]'
"""

fn_process_data(
    file_name=file_name,
    table_name=table_name,
    dataframe_schema=dataframe_schema,
    delta_table_definition=delta_table_definition,
)

In [None]:
# nation: raw-file glob, read schema and Delta DDL handed to fn_process_data
table_name = 'nation'
file_name = f'{table_name}/*.tbl*'
dataframe_schema = StructType([
    StructField(column_name, column_type)
    for column_name, column_type in (
        ("n_nationkey", IntegerType()),
        ("n_name", StringType()),
        ("n_regionkey", IntegerType()),
        ("n_comment", StringType()),
    )
])
delta_table_definition = """
CREATE TABLE nation
	(
		n_nationkey		int,
		n_name			varchar(25),
		n_regionkey		int,
		n_comment		varchar(152) 	
	)
	LOCATION '[DELTA_TABLE_LOCATION]'
"""

fn_process_data(
    file_name=file_name,
    table_name=table_name,
    dataframe_schema=dataframe_schema,
    delta_table_definition=delta_table_definition,
)

In [None]:
# orders: raw-file glob, read schema and Delta DDL handed to fn_process_data
table_name = 'orders'
file_name = f'{table_name}/*.tbl*'
dataframe_schema = StructType([
    StructField(column_name, column_type)
    for column_name, column_type in (
        ("o_orderkey", LongType()),
        ("o_custkey", LongType()),
        ("o_orderstatus", StringType()),
        ("o_totalprice", DoubleType()),
        ("o_orderdate", DateType()),
        ("o_orderpriority", StringType()),
        ("o_clerk", StringType()),
        ("o_shippriority", IntegerType()),
        ("o_comment", StringType()),
    )
])
delta_table_definition = """
CREATE TABLE orders
	(
		o_orderkey 			bigint,
		o_custkey 			bigint,
		o_orderstatus 		varchar(1),
		o_totalprice		decimal(15, 2),
		o_orderdate			date,
		o_orderpriority		varchar(15),
		o_clerk				varchar(15),
		o_shippriority		int,
		o_comment 			varchar(79)
	)
	LOCATION '[DELTA_TABLE_LOCATION]'
"""

fn_process_data(
    file_name=file_name,
    table_name=table_name,
    dataframe_schema=dataframe_schema,
    delta_table_definition=delta_table_definition,
)

In [None]:
# part: raw-file glob, read schema and Delta DDL handed to fn_process_data
table_name = 'part'
file_name = f'{table_name}/*.tbl*'
dataframe_schema = StructType([
    StructField(column_name, column_type)
    for column_name, column_type in (
        ("p_partkey", LongType()),
        ("p_name", StringType()),
        ("p_mfgr", StringType()),
        ("p_brand", StringType()),
        ("p_type", StringType()),
        ("p_size", IntegerType()),
        ("p_container", StringType()),
        ("p_retailprice", DoubleType()),
        ("p_comment", StringType()),
    )
])
delta_table_definition = """
CREATE TABLE part
	(
		p_partkey 		bigint,
		p_name 			varchar(55),
		p_mfgr 			varchar(25),
		p_brand			varchar(10),
		p_type 			varchar(25),
		p_size			int,
		p_container		varchar(10),
		p_retailprice	decimal(15, 2),
		p_comment 		varchar(23)
	)
	LOCATION '[DELTA_TABLE_LOCATION]'
"""

fn_process_data(
    file_name=file_name,
    table_name=table_name,
    dataframe_schema=dataframe_schema,
    delta_table_definition=delta_table_definition,
)

In [None]:
# partsupp: raw-file glob, read schema and Delta DDL handed to fn_process_data
table_name = 'partsupp'
file_name = f'{table_name}/*.tbl*'
dataframe_schema = StructType([
    StructField(column_name, column_type)
    for column_name, column_type in (
        ("ps_partkey", LongType()),
        ("ps_suppkey", LongType()),
        ("ps_availqty", IntegerType()),
        ("ps_supplycost", DoubleType()),
        ("ps_comment", StringType()),
    )
])
delta_table_definition = """
CREATE TABLE partsupp
	(
		ps_partkey 		bigint,
		ps_suppkey 		bigint,
		ps_availqty 	int,
		ps_supplycost 	decimal(15, 2),
		ps_comment 		varchar(199)
	)
	LOCATION '[DELTA_TABLE_LOCATION]'
"""

fn_process_data(
    file_name=file_name,
    table_name=table_name,
    dataframe_schema=dataframe_schema,
    delta_table_definition=delta_table_definition,
)

In [None]:
# region: raw-file glob, read schema and Delta DDL handed to fn_process_data
table_name = 'region'
file_name = f'{table_name}/*.tbl*'
dataframe_schema = StructType([
    StructField(column_name, column_type)
    for column_name, column_type in (
        ("r_regionkey", IntegerType()),
        ("r_name", StringType()),
        ("r_comment", StringType()),
    )
])
delta_table_definition = """
CREATE TABLE region
	(
		r_regionkey		int,
		r_name 			varchar(25),
		r_comment 		varchar(152)
	)
	LOCATION '[DELTA_TABLE_LOCATION]'
"""

fn_process_data(
    file_name=file_name,
    table_name=table_name,
    dataframe_schema=dataframe_schema,
    delta_table_definition=delta_table_definition,
)

In [None]:
# supplier: raw-file glob, read schema and Delta DDL handed to fn_process_data
table_name = 'supplier'
file_name = f'{table_name}/*.tbl*'
dataframe_schema = StructType([
    StructField(column_name, column_type)
    for column_name, column_type in (
        ("s_suppkey", LongType()),
        ("s_name", StringType()),
        ("s_address", StringType()),
        ("s_nationkey", IntegerType()),
        ("s_phone", StringType()),
        ("s_acctbal", DoubleType()),
        ("s_comment", StringType()),
    )
])
delta_table_definition = """
CREATE TABLE supplier
	(
		s_suppkey 		bigint,
		s_name 			varchar(25),
		s_address 		varchar(40),
		s_nationkey		int,
		s_phone			varchar(15),
		s_acctbal 		decimal(15, 2),
		s_comment 		varchar(101)
	)
	LOCATION '[DELTA_TABLE_LOCATION]'
"""

fn_process_data(
    file_name=file_name,
    table_name=table_name,
    dataframe_schema=dataframe_schema,
    delta_table_definition=delta_table_definition,
)