# TPC-H Loading

Load TPC-H data from pipe-separated files stored in ADLS Gen2 into a Lakehouse in Fabric.

Approximate run times by scale factor:  
1: ~1 min  
10: ~3 mins  
100: ~7 mins  
1000: ~43 mins  

**Agenda**  
Intro to Notebooks; language; startup time; markdown; table of contents; parameters; magics; frozen cell; parallel run; charting;  
variables; shortcuts; notebook resources; refresh table view; max workers; struct; dictionary; schema datatypes;  
collaboration; Settings: About; change name; endorsement; scheduling; open in VS Code

## Setup

### Parameters
file_path - the path to the folder holding the TPC-H files to be loaded into the lakehouse

Examples:  
Local files: "Files/scale 1"  
Shortcut files: "Files/tpch csv/scale 100"
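
As a rough illustration of how `file_path` is used: the loader functions later in this notebook append the table name and a `.tbl` extension to it. The sketch below reproduces only that string construction. `tpch_file` is a hypothetical helper name that is not part of the notebook, and stripping a trailing slash is an added convenience, not something the notebook does.

```python
def tpch_file(file_path: str, table_name: str) -> str:
    """Build the path of one table's pipe-separated file (hypothetical helper)."""
    # rstrip("/") tolerates a file_path given with a trailing slash
    return f"{file_path.rstrip('/')}/{table_name}.tbl"


print(tpch_file("Files/scale 1", "customer"))
print(tpch_file("Files/tpch csv/scale 100/", "lineitem"))
```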

In [19]:
file_path = "Files/tpch1 (1)/raw/tpc-h/scale1/csv"

### Imports
Define any imports that will be used in the notebook

In [20]:
from pyspark.sql.types import StructType, StructField, IntegerType, StringType, FloatType, DateType

### Functions
Define the functions that will be used in the notebook

In [21]:
# This function gets the pipe-separated text files, loads them to a dataframe with the input schema and then loads to a lakehouse table
def get_and_load_tpch_file(file_path, table_name, schema):
    df = spark.read.option("sep", "|").csv(f"{file_path}/{table_name}.tbl", header=False, schema=schema)
    df.write.mode("overwrite").format("delta").save("Tables/" + table_name)


# Read the pipe-separated text file into a dataframe with the given schema and return that dataframe
def get_tpch_file(file_path, table_name, schema):
    df = spark.read.option("sep", "|").csv(f"{file_path}/{table_name}.tbl", header=False, schema=schema)
    return df


# Save the input dataframe as a delta table in the lakehouse
def save_tpch_file(df, table_name):
    df.write.mode("overwrite").format("delta").save("Tables/" + table_name)
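
To make the input format concrete, here is a small plain-Python sketch (no Spark needed) of what a pipe-separated row looks like once split into columns. The sample rows imitate the `region` table and are made up for illustration. The trailing `|` on each row is an assumption about how the files were generated (the standard TPC-H `dbgen` tool writes one), so check your own files. `parse_tbl` is a hypothetical helper, not part of the notebook.

```python
import csv
import io

# Made-up sample rows in the shape of region.tbl (assumed trailing "|")
SAMPLE = "0|AFRICA|sample comment|\n1|AMERICA|another comment|\n"


def parse_tbl(text, column_names):
    """Split pipe-separated rows into dicts keyed by column name."""
    rows = []
    for fields in csv.reader(io.StringIO(text), delimiter="|"):
        # zip() stops at the shorter input, so the empty field produced
        # by the trailing "|" is dropped
        rows.append(dict(zip(column_names, fields)))
    return rows


rows = parse_tbl(SAMPLE, ["regionkey", "name", "comment"])
print(rows[0])
```

All values come back as strings here. In the notebook, the schema passed to `spark.read` is what converts them to integers, floats and dates.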

### Reset
Drop all TPC-H tables from the lakehouse so that the load starts from a clean state

In [22]:
%%sql
DROP TABLE IF EXISTS customer;
DROP TABLE IF EXISTS lineitem;
DROP TABLE IF EXISTS nation;
DROP TABLE IF EXISTS orders;
DROP TABLE IF EXISTS part;
DROP TABLE IF EXISTS partsupp;
DROP TABLE IF EXISTS region;
DROP TABLE IF EXISTS supplier;

<Spark SQL result set with 0 rows and 0 fields>

### Schemas
Define the schemas for all tables to be imported.  
This code uses a dictionary to hold each table name and its schema.

See section 5.5, "Dictionaries", of the Python tutorial:  
https://docs.python.org/3/tutorial/datastructures.html

Dictionaries are similar to arrays, but they hold key:value pairs and are indexed by key rather than by position; keys must be unique within a dictionary.
This dictionary (dict_schemas) maps each table name (key) to a struct (value) containing the schema, including column name, datatype and nullability.
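
The dictionary behaviour described above can be sketched in plain Python without Spark. `toy_schemas` is a hypothetical, trimmed stand-in for `dict_schemas`: it uses tuples of (column name, type name, nullable) where the notebook uses `StructField` objects.

```python
# Hypothetical stand-in for dict_schemas: table name -> list of
# (column name, type name, nullable) tuples
toy_schemas = {
    "region": [("regionkey", "int", False), ("name", "string", False)],
    "nation": [("nationkey", "int", False), ("name", "string", False)],
}

# Look up one schema by its key
print(toy_schemas["region"])

# Assigning to an existing key replaces its value, so keys stay unique
toy_schemas["region"] = toy_schemas["region"] + [("comment", "string", True)]
print(len(toy_schemas))

# Iterate over key/value pairs, as the notebook does with dict_schemas.items()
for table_name, columns in toy_schemas.items():
    print(table_name, [name for name, _type, _nullable in columns])
```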

In [23]:
# Create a dictionary holding the table name and accompanying schema
dict_schemas = {
    'customer': StructType([
        StructField("custkey", IntegerType(), nullable=False),
        StructField("name", StringType(), nullable=False),
        StructField("address", StringType(), nullable=False),
        StructField("nationkey", IntegerType(), nullable=True),
        StructField("phone", StringType(), nullable=True),
        StructField("acctbal", FloatType(), nullable=True),
        StructField("mktsegment", StringType(), nullable=True),
        StructField("comment", StringType(), nullable=True)
    ]),
    'lineitem': StructType([
        StructField("orderkey", IntegerType(), nullable=False),
        StructField("partkey", IntegerType(), nullable=False),
        StructField("suppkey", IntegerType(), nullable=False),
        StructField("linenumber", IntegerType(), nullable=False),
        StructField("quantity", FloatType(), nullable=True),
        StructField("extendedprice", FloatType(), nullable=True),
        StructField("discount", FloatType(), nullable=True),
        StructField("tax", FloatType(), nullable=True),
        StructField("returnflag", StringType(), nullable=False),
        StructField("linestatus", StringType(), nullable=False),
        StructField("shipdate", DateType(), nullable=False),
        StructField("commitdate", DateType(), nullable=False),
        StructField("receiptdate", DateType(), nullable=False),
        StructField("shipinstruct", StringType(), nullable=False),
        StructField("shipmode", StringType(), nullable=False),
        StructField("comment", StringType(), nullable=True)
    ]),
    'nation': StructType([
        StructField("nationkey", IntegerType(), nullable=False),
        StructField("name", StringType(), nullable=False),
        StructField("regionkey", IntegerType(), nullable=False),
        StructField("comment", StringType(), nullable=True)
    ]),
    'orders': StructType([
        StructField("orderkey", IntegerType(), nullable=False),
        StructField("custkey", IntegerType(), nullable=False),
        StructField("orderstatus", StringType(), nullable=False),
        StructField("totalprice", FloatType(), nullable=True),
        StructField("orderdate", DateType(), nullable=False),
        StructField("orderpriority", StringType(), nullable=False),
        StructField("clerk", StringType(), nullable=False),
        StructField("shippriority", IntegerType(), nullable=False),
        StructField("comment", StringType(), nullable=True)
    ]),
    'part': StructType([
        StructField("partkey", IntegerType(), nullable=False),
        StructField("name", StringType(), nullable=True),
        StructField("mfgr", StringType(), nullable=True),
        StructField("brand", StringType(), nullable=True),
        StructField("type", StringType(), nullable=True),
        StructField("size", IntegerType(), nullable=True),
        StructField("container", StringType(), nullable=True),
        StructField("retailprice", FloatType(), nullable=True),
        StructField("comment", StringType(), nullable=True)
    ]),
    'partsupp': StructType([
        StructField("partkey", IntegerType(), nullable=False),
        StructField("suppkey", IntegerType(), nullable=False),
        StructField("availqty", IntegerType(), nullable=False),
        StructField("supplycost", FloatType(), nullable=True),
        StructField("comment", StringType(), nullable=True)
    ]),
    'region': StructType([
        StructField("regionkey", IntegerType(), nullable=False),
        StructField("name", StringType(), nullable=False),
        StructField("comment", StringType(), nullable=True)
    ]),
    'supplier': StructType([
        StructField("suppkey", IntegerType(), nullable=False),
        StructField("name", StringType(), nullable=False),
        StructField("address", StringType(), nullable=False),
        StructField("nationkey", IntegerType(), nullable=True),
        StructField("phone", StringType(), nullable=True),
        StructField("acctbal", FloatType(), nullable=True),
        StructField("comment", StringType(), nullable=True)
    ])
}


for table_name, schema in dict_schemas.items():
    print(table_name, '\r\n', schema, '\r\n')

customer 
 StructType([StructField('custkey', IntegerType(), False), StructField('name', StringType(), False), StructField('address', StringType(), False), StructField('nationkey', IntegerType(), True), StructField('phone', StringType(), True), StructField('acctbal', FloatType(), True), StructField('mktsegment', StringType(), True), StructField('comment', StringType(), True)]) 

lineitem 
 StructType([StructField('orderkey', IntegerType(), False), StructField('partkey', IntegerType(), False), StructField('suppkey', IntegerType(), False), StructField('linenumber', IntegerType(), False), StructField('quantity', FloatType(), True), StructField('extendedprice', FloatType(), True), StructField('discount', FloatType(), True), StructField('tax', FloatType(), True), StructField('returnflag', StringType(), False), StructField('linestatus', StringType(), False), StructField('shipdate', DateType(), False), StructField('commitdate', DateType(), False), StructField('receiptdate', DateType(), Fal

## Main
Load all the tables into the lakehouse

### Serial load

In [None]:
for table_name, schema in dict_schemas.items():
    print(table_name, '\r\n', schema, '\r\n')
    get_and_load_tpch_file(file_path, table_name, schema)
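
One way to compare the serial and parallel approaches is to time each table load. The sketch below is a minimal, Spark-free illustration: `timed_load` and `fake_load` are hypothetical names, and `fake_load` just sleeps briefly in place of a real load.

```python
import time


def timed_load(load_fn, table_names):
    """Call load_fn once per table, in order, and record the seconds each call took."""
    durations = {}
    for table_name in table_names:
        start = time.perf_counter()
        load_fn(table_name)
        durations[table_name] = time.perf_counter() - start
    return durations


def fake_load(table_name):
    # Stand-in for a real table load (assumption for this sketch)
    time.sleep(0.01)


durations = timed_load(fake_load, ["nation", "region"])
for table_name, seconds in durations.items():
    print(f"{table_name}: {seconds:.2f}s")
```

In the notebook, the stand-in could be replaced with something like `lambda t: get_and_load_tpch_file(file_path, t, dict_schemas[t])`.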

### Parallel load
Use concurrent.futures to loop through the schema dictionary in parallel

In [24]:
# Loop through the schemas dictionary in parallel
from concurrent.futures import ThreadPoolExecutor

def load_table(table_name):
    schema = dict_schemas[table_name]
    get_and_load_tpch_file(file_path, table_name, schema)


# Use a ThreadPoolExecutor to run the tasks in parallel.
# list() consumes the results so that an exception raised in a worker is re-raised here
with ThreadPoolExecutor(max_workers=4) as executor:
    list(executor.map(load_table, dict_schemas.keys()))
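
`executor.map` returns a lazy iterator, and an exception raised inside a worker is only re-raised when the corresponding result is retrieved, so a failed table load can pass unnoticed if the results are never read. As a hedged alternative, the sketch below uses `submit` with `as_completed` to report success or failure per table. `load_all` and `fake_load` are hypothetical names, and `fake_load` simulates a failure for one table in place of a real Spark load.

```python
from concurrent.futures import ThreadPoolExecutor, as_completed


def fake_load(table_name):
    # Stand-in for a real table load; fails for one table on purpose
    if table_name == "orders":
        raise ValueError("simulated failure")
    return table_name


def load_all(load_fn, table_names, max_workers=4):
    """Run load_fn for every table in parallel and collect a status per table."""
    results = {}
    with ThreadPoolExecutor(max_workers=max_workers) as executor:
        futures = {executor.submit(load_fn, name): name for name in table_names}
        for future in as_completed(futures):
            name = futures[future]
            error = future.exception()  # None when the task succeeded
            results[name] = "ok" if error is None else f"failed: {error}"
    return results


results = load_all(fake_load, ["nation", "orders", "region"])
for name in sorted(results):
    print(name, results[name])
```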

In [25]:
# Report how many CPUs are available on the node running this Python code (the driver)
import os
print(os.cpu_count())

8
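
The CPU count can serve as a rough guide when choosing `max_workers`. The sketch below is one possible heuristic, not a recommendation from the notebook: `pick_max_workers` is a hypothetical helper that never asks for more threads than there are tasks. Bear in mind that the threads here mostly wait on Spark jobs, so the best value may differ from the CPU count and is worth measuring.

```python
import os


def pick_max_workers(task_count, cap=None):
    """Choose a thread count no larger than the number of tasks or the cap."""
    # os.cpu_count() can return None, so fall back to 1
    cpu_count = os.cpu_count() or 1
    limit = cap if cap is not None else cpu_count
    return max(1, min(task_count, limit))


# Eight tables, capped at four threads as in the parallel load cell
print(pick_max_workers(8, cap=4))
```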


### Checks
Report the row counts to confirm how many rows have been loaded into each table

In [26]:
%%sql
SELECT 'customer' AS source, COUNT(*) AS records FROM customer
UNION ALL
SELECT 'lineitem' AS source, COUNT(*) AS records FROM lineitem
UNION ALL
SELECT 'nation' AS source, COUNT(*) AS records FROM nation
UNION ALL
SELECT 'orders' AS source, COUNT(*) AS records FROM orders
UNION ALL
SELECT 'part' AS source, COUNT(*) AS records FROM part
UNION ALL
SELECT 'partsupp' AS source, COUNT(*) AS records FROM partsupp
UNION ALL
SELECT 'region' AS source, COUNT(*) AS records FROM region
UNION ALL
SELECT 'supplier' AS source, COUNT(*) AS records FROM supplier;

<Spark SQL result set with 8 rows and 2 fields>
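
To judge whether the reported counts look right, they can be compared with the table cardinalities given in the TPC-H specification, which scale linearly with the scale factor except for `nation` and `region`. The sketch below is a plain-Python helper under that assumption; `expected_rows` is a hypothetical name, and the `lineitem` figure is approximate (the specification lists about 6,000,000 rows per scale factor, and the exact count differs slightly).

```python
# Row counts at scale factor 1, per the TPC-H specification
BASE_ROWS = {
    "customer": 150_000,
    "lineitem": 6_000_000,  # approximate; the real count differs slightly
    "nation": 25,
    "orders": 1_500_000,
    "part": 200_000,
    "partsupp": 800_000,
    "region": 5,
    "supplier": 10_000,
}

# These two tables have the same size at every scale factor
FIXED_SIZE = {"nation", "region"}


def expected_rows(scale_factor):
    """Return the expected row count per table for a given scale factor."""
    return {
        table: rows if table in FIXED_SIZE else rows * scale_factor
        for table, rows in BASE_ROWS.items()
    }


for table, rows in expected_rows(10).items():
    print(f"{table}: {rows:,}")
```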

In [27]:
%%sql
SHOW TABLES

<Spark SQL result set with 8 rows and 3 fields>