# Install DuckDB

In [None]:
pip install duckdb --upgrade

# Initialize Variables

In [1]:
# --- Source dataset ----------------------------------------------------------
DatasetURL = "https://uhesse.files.wordpress.com/2020/08/"
file = "mytpch_oracle.zip"
BaseURL = f"{DatasetURL}{file}"  # full download URL of the archive (base + file name)

# --- Naming / output settings ------------------------------------------------
contentName = "TPC-H-SF1"
dbName = "DuckDB"  # database file name passed to duckdb.connect (a relative path)
curatedFormat = "parquet"

# --- Local folder layout -----------------------------------------------------
basePath = f"/DEV/{contentName}"
rawPath, curatedPath = f"{basePath}/raw", f"{basePath}/curated"
zipPath = f"{rawPath}/{file}"  # where the downloaded archive is saved

# Create folders if they don't exist

In [7]:
from pathlib import Path

# Ensure both the raw (landing) and curated (parquet) folders exist.
# parents=True creates any missing ancestors; exist_ok=True keeps re-runs safe.
for folder in (rawPath, curatedPath):
    Path(folder).mkdir(parents=True, exist_ok=True)

# Download Data

In [8]:
from urllib.request import urlretrieve

# Fetch the zipped dataset into the raw folder; the cell displays the
# (local_path, headers) tuple returned by urlretrieve.
urlretrieve(url=BaseURL, filename=zipPath)

('/DEV/TPC-H-SF1/raw/mytpch_oracle.zip',
 <http.client.HTTPMessage at 0x1628eebc400>)

# Unzip Data

In [9]:
import zipfile

# Unpack every member of the downloaded archive into the raw folder.
with zipfile.ZipFile(zipPath, mode="r") as archive:
    archive.extractall(path=rawPath)

# Remove Unnecessary Files

In [10]:
import os

# Keep only the data extracts: delete the archive itself plus any .sql/.ctl
# files and the README that ship alongside them in the raw folder.
unwanted_suffixes = (".zip", ".sql", ".ctl", "README.txt")
for entry in os.listdir(rawPath):
    if entry.endswith(unwanted_suffixes):
        os.remove(os.path.join(rawPath, entry))

# Create Database

In [2]:
import duckdb

# Open the DuckDB database file named by `dbName` for writing.
# NOTE: despite its name, `cursor` holds the DuckDB connection object that the
# remaining cells query through.
cursor = duckdb.connect(database=dbName, read_only=False)

# Initialize TPC-H Table Schemas

In [3]:
# Column name -> DuckDB type for each raw TPC-H table. Each value is a *string*
# containing a struct literal, because it is spliced verbatim into the
# `columns=` argument of read_csv_auto when the raw views are created.
# Account balances (s_acctbal, c_acctbal) range from -999.99 to 9,999.99 per the
# TPC-H spec, which overflows decimal(4,2) (max 99.99); they use decimal(15,2)
# like the other monetary columns.
tpch_schemas = {
    "nation": "{'n_nationkey': 'int', 'n_name': 'string', 'n_regionkey': 'int', 'n_comment': 'string'}",
    "region": "{'r_regionkey': 'int', 'r_name': 'string', 'r_comment': 'string'}",
    "part": "{'p_partkey': 'int', 'p_name': 'string', 'p_mfgr': 'string', 'p_brand': 'string', 'p_type': 'string', 'p_size': 'int', 'p_container': 'string', 'p_retailprice': 'decimal(15,2)', 'p_comment': 'string'}",
    "supplier": "{'s_suppkey': 'int', 's_name': 'string', 's_address': 'string', 's_nationkey': 'int', 's_phone': 'string', 's_acctbal': 'decimal(15,2)', 's_comment': 'string'}",
    "customer": "{'c_custkey': 'int', 'c_name': 'string', 'c_address': 'string', 'c_nationkey': 'int', 'c_phone': 'string', 'c_acctbal': 'decimal(15,2)', 'c_mktsegment': 'string', 'c_comment': 'string'}",
    "partsupp": "{'ps_partkey': 'int', 'ps_suppkey': 'int', 'ps_availqty': 'int', 'ps_supplycost': 'decimal(15,2)', 'ps_comment': 'string'}",
    "orders": "{'o_orderkey': 'int', 'o_custkey': 'int', 'o_orderstatus': 'string', 'o_totalprice': 'decimal(15,2)', 'o_orderdate': 'date', 'o_orderpriority': 'string', 'o_clerk': 'string', 'o_shippriority': 'int', 'o_comment': 'string'}",
    "lineitem": "{'l_orderkey': 'int', 'l_partkey': 'int', 'l_suppkey': 'int', 'l_linenumber': 'int', 'l_quantity': 'int', 'l_extendedprice': 'decimal(15,2)', 'l_discount': 'decimal(2,2)', 'l_tax': 'decimal(2,2)', 'l_returnflag': 'string', 'l_linestatus': 'string', 'l_shipdate': 'date', 'l_commitdate': 'date', 'l_receiptdate': 'date', 'l_shipinstruct': 'string', 'l_shipmode': 'string', 'l_comment': 'string'}",
}

# Create Views for All Raw Files

In [4]:
# Expose each raw extract as a view named after its TPC-H table: read it with
# the explicit column types from `tpch_schemas`, trim every VARCHAR column and
# pass all other columns through unchanged.
for name, schema in tpch_schemas.items():
    raw_relation = cursor.query(
        f"SELECT * from read_csv_auto('{rawPath}/{name}.txt', delim=',', header=False, columns={schema})"
    )
    column_types = dict(zip(raw_relation.columns, raw_relation.types))
    select_list = [
        f"trim({col}) AS {col}" if dtype == 'VARCHAR' else f"{col}"
        for col, dtype in column_types.items()
    ]
    raw_relation.project(",".join(select_list)).create_view(name)

# Set Up Star Schema SQL

In [5]:
from collections import OrderedDict

# SELECT statements for the star schema, keyed by target table name.
# The cell that writes each table to Parquet processes them in insertion order
# and creates a view over each file right after writing it. Order therefore
# matters: `dimorderinfo` must come before `factorderline`, whose LEFT JOIN
# reads the already-materialized `dimorderinfo` view to look up surrogate keys.
tpch_star_schema_sql = OrderedDict()

# Customer dimension, denormalized with its nation and region names.
tpch_star_schema_sql['dimcustomer'] = """
SELECT cus.c_custkey                AS custkey,
       cus.c_name                   AS customer_name,
       cus.c_address                AS customer_address,
       Substring(cus.c_phone, 1, 2) AS customer_country_code,
       cus.c_phone                  AS customer_phone_no,
       cus.c_acctbal                AS customer_account_balance,
       cus.c_mktsegment             AS market_segment,
       nat.n_name                   AS customer_nation,
       reg.r_name                   AS customer_region,
       cus.c_comment                AS customer_comment
FROM   customer cus
       INNER JOIN nation nat
               ON ( nat.n_nationkey = cus.c_nationkey )
       INNER JOIN region reg
               ON ( reg.r_regionkey = nat.n_regionkey )
"""

# Supplier dimension, denormalized with its nation and region names.
tpch_star_schema_sql['dimsupplier'] = """
SELECT sup.s_suppkey  AS suppkey,
       sup.s_name     AS supplier_name,
       sup.s_address  AS supplier_address,
       sup.s_phone    AS supplier_phone_no,
       sup.s_acctbal  AS supplier_account_balance,
       sup.s_comment  AS supplier_comment,
       nat.n_name     AS supplier_nation,
       reg.r_name     AS supplier_region
FROM   supplier sup
       INNER JOIN nation nat
               ON ( nat.n_nationkey = sup.s_nationkey )
       INNER JOIN region reg
               ON ( reg.r_regionkey = nat.n_regionkey )
"""

# Part dimension.
tpch_star_schema_sql['dimpart'] = """
SELECT p_partkey   AS partkey,
       p_name      AS part_name,
       p_mfgr      AS manufacturer,
       p_brand     AS brand,
       p_type      AS type,
       p_size      AS size,
       p_container AS container
FROM   part
"""

# Junk dimension holding the distinct combinations of order/line status
# attributes. The surrogate key is numbered in a fixed attribute order (rather
# than ORDER BY NULL) so that re-running the notebook assigns the same key to
# the same combination.
tpch_star_schema_sql['dimorderinfo'] = """
SELECT Row_number() OVER( ORDER BY t.ship_priority,
                                   t.ship_mode,
                                   t.order_priority,
                                   t.order_status,
                                   t.ship_instruct,
                                   t.return_flag,
                                   t.line_status ) AS orderinfokey,
       t.*
FROM   (SELECT DISTINCT ord.o_shippriority  AS ship_priority,
                        lit.l_shipmode      AS ship_mode,
                        ord.o_orderpriority AS order_priority,
                        ord.o_orderstatus   AS order_status,
                        lit.l_shipinstruct  AS ship_instruct,
                        lit.l_returnflag    AS return_flag,
                        lit.l_linestatus    AS line_status
        FROM   orders ord
               INNER JOIN lineitem lit
                            ON ( lit.l_orderkey = ord.o_orderkey )) t
"""

# Fact table at part/supplier grain: available quantity and supply cost.
tpch_star_schema_sql['factpartsupp'] = """
SELECT ps_partkey    AS partkey,
       ps_suppkey    AS suppkey,
       ps_availqty   AS availqty,
       ps_supplycost AS supplycost
FROM   partsupp
"""

# Fact table intended at order-line grain: lineitem inner-joined to orders,
# partsupp and part, plus the orderinfokey looked up from the materialized
# dimorderinfo view (matched on all seven attribute columns).
tpch_star_schema_sql['factorderline'] = """
SELECT lit.l_partkey     AS partkey,
       lit.l_suppkey     AS suppkey,
       ord.o_custkey     AS custkey,
       ord.o_orderkey    AS orderkey,
       inf.orderinfokey  AS orderinfokey,
       lit.l_shipdate    AS shipdate,
       ord.o_orderdate   AS orderdate,
       lit.l_commitdate  AS commitdate,
       lit.l_receiptdate AS receiptdate,
       lit.l_quantity    AS quantity,
       prt.p_retailprice AS retailprice,
       lit.l_discount    AS discount,
       psp.ps_supplycost AS supplycost,
       lit.l_tax         AS tax,
       ord.o_comment     AS order_comment
FROM   orders ord
       INNER JOIN lineitem lit
                    ON ( ord.o_orderkey = lit.l_orderkey )
       INNER JOIN partsupp psp
                    ON ( psp.ps_partkey = lit.l_partkey
                         AND psp.ps_suppkey = lit.l_suppkey )
       INNER JOIN part prt
                    ON ( prt.p_partkey = lit.l_partkey )
       LEFT OUTER JOIN dimorderinfo inf
                    ON ( inf.ship_priority = ord.o_shippriority
                         AND inf.ship_mode = lit.l_shipmode
                         AND inf.order_priority = ord.o_orderpriority
                         AND inf.order_status = ord.o_orderstatus
                         AND inf.ship_instruct = lit.l_shipinstruct
                         AND inf.return_flag = lit.l_returnflag
                         AND inf.line_status = lit.l_linestatus )
"""

# Write Each Star-Schema Table to Parquet and Build a View on Top

In [6]:
# Materialize each star-schema query as a file in the curated folder, then
# immediately expose that file as a view of the same name. The view must exist
# before the next query runs, because later queries (factorderline) join on it.
for table_name, select_sql in tpch_star_schema_sql.items():
    target_file = f"{curatedPath}/{table_name}.{curatedFormat}"
    copy_statement = f"COPY ({select_sql}) TO '{target_file}' (FORMAT '{curatedFormat}');"
    view_statement = f"CREATE OR REPLACE VIEW {table_name} AS SELECT * from '{target_file}'"
    cursor.execute(copy_statement)
    cursor.execute(view_statement)

# Test Data

In [7]:
# Sanity check on the finished fact table. This notebook loads TPC-H scale
# factor 1, whose lineitem table has 6,001,215 rows; every order line should
# survive the inner joins in factorderline exactly once.
EXPECTED_FACTORDERLINE_ROWS = 6_001_215

sqlTest = """
SELECT
    COUNT(*)
FROM
    factorderline
"""
factorderline_count = cursor.execute(sqlTest).fetchdf()
actual_rows = int(factorderline_count.iloc[0, 0])
assert actual_rows == EXPECTED_FACTORDERLINE_ROWS, (
    f"factorderline has {actual_rows} rows, expected {EXPECTED_FACTORDERLINE_ROWS}"
)
factorderline_count

Unnamed: 0,count_star()
0,6001215
