In [None]:
import psycopg2
import math
import numpy as np
import pandas as pd
from typing import List, Literal, Dict, Any, Tuple, Optional
from decimal import Decimal, ROUND_HALF_UP
from datetime import datetime
import os

In [None]:
#construct PostgreSQL Database
db = psycopg2.connect(
    host="localhost",
    port=5432,
    user="postgres",
    #password="ipProject",
    database="tpch_db"
)
tpch_cursor = db.cursor()
tpch_cursor.execute("SELECT tablename FROM pg_tables WHERE schemaname='public';")

In [None]:
#Statements for Table creation
createTable = """
CREATE TABLE REGION  ( R_REGIONKEY  INTEGER NOT NULL,
                            R_NAME       CHAR(25) NOT NULL,
                            R_COMMENT    VARCHAR(152),
                     PRIMARY KEY(R_REGIONKEY));
CREATE TABLE NATION  ( N_NATIONKEY  INTEGER NOT NULL,
                            N_NAME       CHAR(25) NOT NULL,
                            N_REGIONKEY  INTEGER NOT NULL,
                            N_COMMENT    VARCHAR(152),
                     PRIMARY KEY(N_NATIONKEY),
                     FOREIGN KEY (N_REGIONKEY) REFERENCES REGION (R_REGIONKEY));
 
CREATE TABLE CUSTOMER ( C_CUSTKEY     INTEGER NOT NULL,
                             C_NAME        VARCHAR(25) NOT NULL,
                             C_ADDRESS     VARCHAR(40) NOT NULL,
                             C_NATIONKEY   INTEGER NOT NULL,
                             C_PHONE       CHAR(15) NOT NULL,
                             C_ACCTBAL     DECIMAL(15,2)   NOT NULL,
                             C_MKTSEGMENT  CHAR(10) NOT NULL,
                             C_COMMENT     VARCHAR(117) NOT NULL,
                             PRIMARY KEY(C_CUSTKEY),
                             FOREIGN KEY (C_NATIONKEY) REFERENCES NATION (N_NATIONKEY));

CREATE TABLE ORDERS  ( O_ORDERKEY       INTEGER NOT NULL,
                           O_CUSTKEY        INTEGER NOT NULL,
                           O_ORDERSTATUS    CHAR(1) NOT NULL,
                           O_TOTALPRICE     DECIMAL(15,2) NOT NULL,
                           O_ORDERDATE      DATE NOT NULL,
                           O_ORDERPRIORITY  CHAR(15) NOT NULL,  
                           O_CLERK          CHAR(15) NOT NULL, 
                           O_SHIPPRIORITY   INTEGER NOT NULL,
                           O_COMMENT        VARCHAR(79) NOT NULL,
                           PRIMARY KEY(O_ORDERKEY),
                           FOREIGN KEY(O_CUSTKEY) REFERENCES CUSTOMER (C_CUSTKEY));

CREATE TABLE SUPPLIER ( S_SUPPKEY     INTEGER NOT NULL,
                             S_NAME        CHAR(25) NOT NULL,
                             S_ADDRESS     VARCHAR(40) NOT NULL,
                             S_NATIONKEY   INTEGER NOT NULL,
                             S_PHONE       CHAR(15) NOT NULL,
                             S_ACCTBAL     DECIMAL(15,2) NOT NULL,
                             S_COMMENT     VARCHAR(101) NOT NULL, 
                            PRIMARY KEY(S_SUPPKEY),
                            FOREIGN KEY (S_NATIONKEY) REFERENCES NATION (N_NATIONKEY));

CREATE TABLE PART  ( P_PARTKEY     INTEGER NOT NULL,
                          P_NAME        VARCHAR(55) NOT NULL,
                          P_MFGR        CHAR(25) NOT NULL,
                          P_BRAND       CHAR(10) NOT NULL,
                          P_TYPE        VARCHAR(25) NOT NULL,
                          P_SIZE        INTEGER NOT NULL,
                          P_CONTAINER   CHAR(10) NOT NULL,
                          P_RETAILPRICE DECIMAL(15,2) NOT NULL,
                          P_COMMENT     VARCHAR(23) NOT NULL,
                        PRIMARY KEY(P_PARTKEY));

CREATE TABLE PARTSUPP ( PS_PARTKEY     INTEGER NOT NULL,
                             PS_SUPPKEY     INTEGER NOT NULL,
                             PS_AVAILQTY    INTEGER NOT NULL,
                             PS_SUPPLYCOST  DECIMAL(15,2)  NOT NULL,
                             PS_COMMENT     VARCHAR(199) NOT NULL,
                             PRIMARY KEY(PS_PARTKEY,PS_SUPPKEY),
                             FOREIGN KEY (PS_PARTKEY) REFERENCES PART (P_PARTKEY),
                             FOREIGN KEY (PS_SUPPKEY) REFERENCES SUPPLIER (S_SUPPKEY));

CREATE TABLE LINEITEM (L_ORDERKEY    INTEGER NOT NULL,
                             L_PARTKEY     INTEGER NOT NULL,
                             L_SUPPKEY     INTEGER NOT NULL,
                             L_LINENUMBER  INTEGER NOT NULL,
                             L_QUANTITY    DECIMAL(15,2) NOT NULL,
                             L_EXTENDEDPRICE  DECIMAL(15,2) NOT NULL,
                             L_DISCOUNT    DECIMAL(15,2) NOT NULL,
                             L_TAX         DECIMAL(15,2) NOT NULL,
                             L_RETURNFLAG  CHAR(1) NOT NULL,
                             L_LINESTATUS  CHAR(1) NOT NULL,
                             L_SHIPDATE    DATE NOT NULL,
                             L_COMMITDATE  DATE NOT NULL,
                             L_RECEIPTDATE DATE NOT NULL,
                             L_SHIPINSTRUCT CHAR(25) NOT NULL,
                             L_SHIPMODE     CHAR(10) NOT NULL,
                             L_COMMENT      VARCHAR(44) NOT NULL,
                            PRIMARY KEY(L_ORDERKEY, L_LINENUMBER),
                            FOREIGN KEY (L_ORDERKEY) REFERENCES ORDERS(O_ORDERKEY),
                            FOREIGN KEY (L_PARTKEY, L_SUPPKEY) REFERENCES PARTSUPP(PS_PARTKEY, PS_SUPPKEY));
"""

In [None]:
table = ['region','nation','customer','orders','supplier','part','partsupp','lineitem']

In [None]:
#Table creation
for i, sql in enumerate(createTable.split(';')[:-1]):
    tpch_cursor.execute(sql)
    print(f'Table {table[i]} Created...')

In [None]:
#Store database
db.commit()

In [None]:
#Insert Data

In [None]:
def changeType(dataType, data):
    if dataType == 'integer':
        return int(data)
    elif dataType == 'character' or dataType == 'character varying':
        return data
    elif dataType == 'numeric':
        return float(round(Decimal(data),2))
    elif dataType == 'date':
        return datetime.strptime(data, '%Y-%m-%d').date()
    #print(1)
    #return data

In [None]:
def getDataType(db,cursor):
    cursor.execute("select tablename from pg_tables where schemaname = 'public';")
    tables = [t[0] for t in cursor.fetchall()]

    dicType = {}
    dicCol = {}
    for table in tables:
        try:
            cursor.execute(f"""
                SELECT column_name, data_type
                FROM information_schema.columns
                WHERE table_name = '{table}'
                ORDER BY ordinal_position;
            """)
        except Exception as e:
            print('Wrong SQL statement: ',e)
            db.rollback()
            
        li1 = []
        li2 = []
        for col_name, data_type in cursor.fetchall():
            li1.append(col_name)
            li2.append(data_type)
        dicCol[table] = li1
        dicType[table] = li2

    return dicType, dicCol

In [None]:
dbType,dbCol = getDataType(db,tpch_cursor)

In [None]:
#Insert Data
for table in list(dbType.keys()):
    rows = []
    with open(f'tpch-dbgen/{table.lower()}.tbl', 'r', encoding='utf-8') as f:
        count = 1
        for line in f:
            insert = line.strip().split('|')[:-1]
            #print(insert)
            #print(insert)
            for i in range(len(insert)):
                insert[i] = changeType(dbType[table][i], insert[i])
                
            if any(x is None for x in insert):
                print(f'{count} None Data Exist in table {table}')
                #print(insert)
                count += 1
                #break
        #break
            rows.append(insert)
            
    placeholder = ','.join(['%s']*len(dbCol[table]))
    col = ','.join(dbCol[table])
    #print(rows)
    #print(tuple(insert))
    try:
        tpch_cursor.executemany(
            f'''
            insert into {table} ({col}) values({placeholder})
            ''',rows)
    except Exception as e:
            print('Wrong SQL statement: ',e)
            db.rollback()
    
    db.commit()
    print(f'Done for {table}')

In [None]:
#count the number of records
tpch_cursor.execute(
    '''
    select count(*) from orders;
    '''
)
tpch_cursor.fetchone()

In [None]:
db.close()