# Initialize Variables

In [None]:
# TPC-H scale factor and the source tables handled by this notebook.
sf = 100
tpchTables = [
    'nation',
    'region',
    'customer',
    'supplier',
    'orders',
    'partsupp',
    'part',
    'lineitem',
]

# lakehousePath is the prefix used for local file-system access; baseSparkPath
# is the same location without that prefix (the form later handed to Spark).
lakehousePath = "/lakehouse/default/"
baseSparkPath = "Files/tpch"
baseFilePath = f"{lakehousePath}{baseSparkPath}"
# One sub-directory per scale factor, e.g. ".../sf100".
sfFilePath = f"{baseFilePath}/sf{sf}"

# Set Spark Config

In [None]:
# Session-level Delta settings, applied in the order listed:
#   * enable optimized writes for Delta output;
#   * disable the retention-duration check, which this notebook relies on for
#     its final "VACUUM ... RETAIN 0 HOURS" statements.
for conf_key, conf_value in (
    ("spark.microsoft.delta.optimizeWrite.enabled", True),
    ("spark.databricks.delta.retentionDurationCheck.enabled", False),
):
    spark.conf.set(conf_key, conf_value)

# Create TPCH tables if they don't exist already

In [None]:
import duckdb
import pathlib
import os
# Generate the TPC-H source data with DuckDB and export each table to Parquet.
#
# Existence is checked per table file rather than for the scale-factor
# directory as a whole: if an earlier run was interrupted after the directory
# was created, the tables that never got written are produced on the next run
# instead of being skipped.
# NOTE(review): a file left truncated by a hard kill mid-COPY would still count
# as present; delete it manually to force regeneration.
missingTables = [
    tbl for tbl in tpchTables
    if not os.path.exists(f"{sfFilePath}/{tbl}/{tbl}.parquet")
]
if missingTables:
    con = duckdb.connect()
    try:
        con.sql('PRAGMA disable_progress_bar;SET preserve_insertion_order=false')
        # dbgen populates all TPC-H tables inside the DuckDB connection.
        con.sql(f"CALL dbgen(sf={sf})")
        for tbl in missingTables:
            tblPath = sfFilePath + "/" + tbl
            pathlib.Path(tblPath).mkdir(parents=True, exist_ok=True)
            con.sql(f"COPY (SELECT * FROM {tbl}) TO '{tblPath}/{tbl}.parquet' ")
    finally:
        # Release the connection even when generation or export fails.
        con.close()

# Create Delta Tables

In [None]:
# Load each exported Parquet table and save it as a Delta table of the same
# name. Spark is given sfFilePath with the lakehousePath text removed.
sparkSfPath = sfFilePath.replace(lakehousePath, '')
for tbl in tpchTables:
    tblPath = f"{sparkSfPath}/{tbl}"
    spark.sql(f"drop table if exists {tbl}")
    parquetDf = spark.read.parquet(tblPath)
    parquetDf.write.mode('overwrite').format('delta').saveAsTable(tbl)

# Define SQL Queries for Facts and Dims

In [None]:
from collections import OrderedDict

# Star-schema definitions: target table name -> Spark SQL that produces it.
# Insertion order matters to consumers that iterate this mapping, because
# 'factorderline' reads 'dimorderinfo' and 'dimdate' reads 'factorderline'.
tpch_star_schema_sql = OrderedDict([
    # Customer attributes joined with nation and region names.
    ('dimcustomer', """
SELECT cus.c_custkey                AS custkey,
       cus.c_name                   AS customer_name,
       cus.c_address                AS customer_address,
       Substring(cus.c_phone, 1, 2) AS customer_country_code,
       cus.c_phone                  AS customer_phone_no,
       cus.c_acctbal                AS customer_account_balance,
       cus.c_mktsegment             AS market_segment,
       nat.n_name                   AS customer_nation,
       reg.r_name                   AS customer_region,
       cus.c_comment                AS customer_comment
FROM   customer cus
       INNER JOIN nation nat
               ON ( nat.n_nationkey = cus.c_nationkey )
       INNER JOIN region reg
               ON ( reg.r_regionkey = nat.n_regionkey ) 
"""),
    # Supplier attributes joined with nation and region names.
    ('dimsupplier', """
SELECT s_suppkey  AS suppkey,
       s_name     AS supplier_name,
       s_address  AS supplier_address,
       s_phone    AS supplier_phone_no,
       s_acctbal  AS supplier_account_balance,
       s_comment  AS supplier_comment,
       nat.n_name AS supplier_nation,
       reg.r_name AS supplier_region
FROM   supplier sup
       INNER JOIN nation nat
               ON ( nat.n_nationkey = sup.s_nationkey )
       INNER JOIN region reg
               ON ( reg.r_regionkey = nat.n_regionkey ) 
"""),
    # Renamed projection of the part table.
    ('dimpart', """
SELECT p_partkey   AS partkey,
       p_name      AS part_name,
       p_mfgr      AS manufacturer,
       p_brand     AS brand,
       p_type      AS type,
       p_size      AS size,
       p_container AS container
FROM   part
"""),
    # Distinct combinations of order/line-item status attributes, each given
    # a row-number surrogate key (orderinfokey).
    ('dimorderinfo', """
SELECT Row_number() OVER( ORDER BY NULL ) AS orderinfokey,
       t.*
FROM   (SELECT DISTINCT ord.o_shippriority  AS ship_priority,
                        lit.l_shipmode      AS ship_mode,
                        ord.o_orderpriority AS order_priority,
                        ord.o_orderstatus   AS order_status,
                        lit.l_shipinstruct  AS ship_instruct,
                        lit.l_returnflag    AS return_flag,
                        lit.l_linestatus    AS line_status
        FROM   orders ord
               INNER JOIN lineitem lit
                            ON ( lit.l_orderkey = ord.o_orderkey )) t 
"""),
    # Renamed projection of the partsupp table.
    ('factpartsupp', """
SELECT ps_partkey    AS partkey,
       ps_suppkey    AS suppkey,
       ps_availqty   AS availqty,
       ps_supplycost AS supplycost
FROM   partsupp 
"""),
    # One row per line item, enriched from orders, partsupp and part, and
    # linked to dimorderinfo by matching all seven status attributes.
    ('factorderline', """
SELECT lit.l_partkey     AS partkey,
       lit.l_suppkey     AS suppkey,
       ord.o_custkey     AS custkey,
       ord.o_orderkey    AS orderkey,
       inf.orderinfokey  AS orderinfokey,
       lit.l_shipdate    AS shipdate,
       ord.o_orderdate   AS orderdate,
       lit.l_commitdate  AS commitdate,
       lit.l_receiptdate AS receiptdate,
       lit.l_quantity    AS quantity,
       prt.p_retailprice AS retailprice,
       lit.l_discount    AS discount,
       psp.ps_supplycost AS supplycost,
       lit.l_tax         AS tax,
       ord.o_comment     AS order_comment
FROM   orders ord
       INNER JOIN lineitem lit
                    ON ( ord.o_orderkey = lit.l_orderkey )
       INNER JOIN partsupp psp
                    ON ( psp.ps_partkey = lit.l_partkey
                         AND psp.ps_suppkey = lit.l_suppkey )
       INNER JOIN part prt
                    ON ( prt.p_partkey = lit.l_partkey )
       INNER JOIN dimorderinfo inf
                    ON ( inf.ship_priority = ord.o_shippriority
                         AND inf.ship_mode = lit.l_shipmode
                         AND inf.order_priority = ord.o_orderpriority
                         AND inf.order_status = ord.o_orderstatus
                         AND inf.ship_instruct = lit.l_shipinstruct
                         AND inf.return_flag = lit.l_returnflag
                         AND inf.line_status = lit.l_linestatus ) 
"""),
    # Calendar with one row per day, spanning January 1st of the earliest
    # year to December 31st of the latest year found in factorderline's
    # four date columns.
    ('dimdate', """
with date_range as (
  select
    to_date(
      year(
        least(
          min(shipdate),
          min(orderdate),
          min(commitdate),
          min(receiptdate)
        )
      ) || '-01-01'
    ) start_date,
    to_date(
      year(
        greatest(
          max(shipdate),
          max(orderdate),
          max(commitdate),
          max(receiptdate)
        )
      ) || '-12-31'
    ) end_date
  from
    factorderline
),
dt_tbl AS (
  select
    explode(sequence(start_date, end_date, interval 1 day)) as date
  from
    date_range
)
select
  date,
  year(date) as year,
  'Q' || quarter(date) as quarter,
  month(date) as month_number,
  date_format(date, 'MMM') as month,
  year(date) * 12 + month(date) - 1 as year_month_number,
  date_format(date, 'MMM yy') as year_month
from
  dt_tbl
"""),
])

# Create Delta Tables for Facts and Dims

In [None]:
# Materialise every star-schema query as a Delta table, in mapping order
# (later queries read tables written by earlier ones), then run OPTIMIZE and
# a zero-retention VACUUM on the freshly written table.
for tbl, sql in tpch_star_schema_sql.items():
    spark.sql(f"drop table if exists {tbl}")
    resultDf = spark.sql(sql)
    resultDf.write.mode('overwrite').format('delta').saveAsTable(tbl)
    for maintenanceSql in (f"OPTIMIZE {tbl}", f"VACUUM {tbl} RETAIN 0 HOURS"):
        spark.sql(maintenanceSql)