# TPCDS: Preprocessing, DB Setup and Data Load Script

In [96]:
import sys, os, re
import psycopg2
import numpy as np
import pandas as pd
from dotenv import load_dotenv
from psycopg2 import Error
from psycopg2.extensions import ISOLATION_LEVEL_AUTOCOMMIT
load_dotenv()


True

In [97]:
# set up connection variables
db_host = os.getenv("PGHOST")
db_port = os.getenv("PGPORT")
db_user = os.getenv("PGUSER")
db_pass = os.getenv("PGPASSWORD")

# function to connect with postgres
def connect_postgres(db_name = 'postgres'):
    try:
        # Connect to an existing database
        connection = psycopg2.connect(database = db_name)
        # Set auto-commit
        connection.set_isolation_level(ISOLATION_LEVEL_AUTOCOMMIT);
        # Create a cursor to perform database operations
        cur = connection.cursor()
        # Print PostgreSQL details
        print("PostgreSQL server information")
        print(connection.get_dsn_parameters(), "\n")
        # Executing a SQL query
        cur.execute("SELECT version();")
        # Fetch result
        record = cur.fetchone()
        print("You are connected to - ", record, "\n")

    except (Exception, Error) as error:
        print("Error while connecting to PostgreSQL", error)
    else:
        return cur

In [98]:
# connect to postgres

cur = connect_postgres()

PostgreSQL server information
{'user': 'postgres', 'channel_binding': 'prefer', 'dbname': 'postgres', 'host': 'localhost', 'port': '25433', 'options': '', 'sslmode': 'prefer', 'sslcompression': '0', 'sslcertmode': 'allow', 'sslsni': '1', 'ssl_min_protocol_version': 'TLSv1.2', 'gssencmode': 'prefer', 'krbsrvname': 'postgres', 'gssdelegation': '0', 'target_session_attrs': 'any', 'load_balance_hosts': 'disable'} 

You are connected to -  ('PostgreSQL 17.0 (Debian 17.0-1.pgdg120+1) on x86_64-pc-linux-gnu, compiled by gcc (Debian 12.2.0-14) 12.2.0, 64-bit',) 



In [99]:
# drop tpcds db

db_name = "tpcds"

cur.execute(
    f"DROP DATABASE IF EXISTS {db_name} WITH (FORCE);"
)
print("SQL Status Output:\n", cur.statusmessage)

SQL Status Output:
 DROP DATABASE


In [100]:
# change win1252 encoding temp db to normal before drop
try:
    cur.execute(
        "ALTER DATABASE win1252_temp is_template false;"
    )
except Exception as e:
    print(e)
else:
    print("SQL Status Output:\n", cur.statusmessage)

SQL Status Output:
 ALTER DATABASE


In [101]:
# drop win1252 encoding temp db (after set to normal db)
cur.execute(
    "DROP DATABASE IF EXISTS win1252_temp WITH (FORCE);"
)
print("SQL Status Output:\n", cur.statusmessage)

SQL Status Output:
 DROP DATABASE


In [102]:
# create win1252 encoding temp db
cur.execute(
    """
    
    CREATE DATABASE win1252_temp
        WITH
        OWNER = postgres
        TEMPLATE = template0
        ENCODING = 'WIN1252'
        CONNECTION LIMIT = -1
        IS_TEMPLATE = True;

    """
)
print("SQL Status Output:\n", cur.statusmessage)

SQL Status Output:
 CREATE DATABASE


In [103]:
# create tpcds db
cur.execute(
    f"""

    CREATE DATABASE {db_name}
        WITH
        OWNER = postgres
        TEMPLATE = win1252_temp
        ENCODING = 'WIN1252'
        CONNECTION LIMIT = -1
        IS_TEMPLATE = False;
        
    """
)
print("SQL Status Output:\n", cur.statusmessage)

SQL Status Output:
 CREATE DATABASE


In [104]:
# connect to tpcds db
cur = connect_postgres(db_name)

PostgreSQL server information
{'user': 'postgres', 'channel_binding': 'prefer', 'dbname': 'tpcds', 'host': 'localhost', 'port': '25433', 'options': '', 'sslmode': 'prefer', 'sslcompression': '0', 'sslcertmode': 'allow', 'sslsni': '1', 'ssl_min_protocol_version': 'TLSv1.2', 'gssencmode': 'prefer', 'krbsrvname': 'postgres', 'gssdelegation': '0', 'target_session_attrs': 'any', 'load_balance_hosts': 'disable'} 

You are connected to -  ('PostgreSQL 17.0 (Debian 17.0-1.pgdg120+1) on x86_64-pc-linux-gnu, compiled by gcc (Debian 12.2.0-14) 12.2.0, 64-bit',) 



In [105]:
# create tables for db
cur.execute(open("DSGen-software-code-3.2.0rc1/tools/tpcds.sql", "r").read())
print("SQL Status Output:\n", cur.statusmessage)
cur.execute(open("DSGen-software-code-3.2.0rc1/tools/tpcds_source.sql", "r").read())
print("SQL Status Output:\n", cur.statusmessage)

SQL Status Output:
 CREATE TABLE
SQL Status Output:
 CREATE TABLE


In [106]:
# get dir path

path = os.path.join(os.getcwd(), 'data', 'tmp_1')
files = os.listdir(path)
print(path)

/home/alfio/Desktop/DataWarehouse/TPC_DS/tpcds-benchmark/data/tmp_1


In [107]:
# function to get full absolute path of csv files containing data

def get_absolute_path(d):
    return [os.path.join(d, f) for f in os.listdir(d)]

In [108]:
# get full abosolute path of csv files containing data

files_abs_path = [p.replace('\\', '/') for p in get_absolute_path(path)]
print("Total files:", len(files_abs_path))
print("First few files...")
files_abs_path[:5]

Total files: 31
First few files...


['/home/alfio/Desktop/DataWarehouse/TPC_DS/tpcds-benchmark/data/tmp_1/customer_demographics_2_4.csv',
 '/home/alfio/Desktop/DataWarehouse/TPC_DS/tpcds-benchmark/data/tmp_1/customer_demographics_4_4.csv',
 '/home/alfio/Desktop/DataWarehouse/TPC_DS/tpcds-benchmark/data/tmp_1/dbgen_version_1_4.csv',
 '/home/alfio/Desktop/DataWarehouse/TPC_DS/tpcds-benchmark/data/tmp_1/store_1_4.csv',
 '/home/alfio/Desktop/DataWarehouse/TPC_DS/tpcds-benchmark/data/tmp_1/ship_mode_1_4.csv']

In [109]:
def exclude_non_csv_files(file_list):
    return list(filter(lambda x: x.endswith('.csv'), file_list))

files = exclude_non_csv_files(files)
files_abs_path = exclude_non_csv_files(files_abs_path)

In [110]:
import re

# Assume files_abs_path is defined and contains the list of file paths
file_count = 0

for iteration in range(0, 1):
    for file in files_abs_path:
        if 'dbgen_version' in file:
            with open(file, 'r', encoding='latin-1') as f:
                lines = f.readlines()
            new_lines = []
            for line in lines:
                # Remove the last '^' and any spaces after it
                line = re.sub(r'\^(?!.*\^)\s*', '', line)
                new_lines.append(line)
            with open(file, 'w', encoding='latin-1') as f:
                f.writelines(new_lines)
            file_count += 1
    print(f'\nIteration {iteration + 1} done!')
    print(f'{file_count} file(s) updated for extra column exclusion.')
    file_count = 0


Iteration 1 done!
1 file(s) updated for extra column exclusion.


In [111]:
# generate sql commands for loading data from csv to postgres db
# considers that csv files were generated in parallel stream

sql_commands_file = open('data_load_script.sql','w')

for file in files:
    underscore_index = [underscore_ind.start() for underscore_ind in re.finditer('_', file)]
    file_name = file[:underscore_index[-2]]
    file_path = os.path.join(path, file)
    sql_command = "COPY public."+file_name+" FROM '"+file_path+"' delimiter '^' CSV;\n"
    sql_commands_file.write(sql_command)

sql_commands_file.close()

In [81]:
os.environ

environ{'CLUTTER_IM_MODULE': 'ibus',
        'COLORTERM': 'truecolor',
        'CONDA_EXE': '/home/alfio/anaconda3/bin/conda',
        'CONDA_PYTHON_EXE': '/home/alfio/anaconda3/bin/python',
        'CONDA_SHLVL': '0',
        'DBUS_SESSION_BUS_ADDRESS': 'unix:path=/run/user/1000/bus',
        'DEBUGINFOD_URLS': 'https://debuginfod.ubuntu.com ',
        'DEFAULTS_PATH': '/usr/share/gconf/awesome.default.path',
        'DESKTOP_SESSION': 'awesome',
        'DISPLAY': ':1',
        'GDMSESSION': 'awesome',
        'GNOME_TERMINAL_SCREEN': '/org/gnome/Terminal/screen/8e760709_52bd_405a_bfdb_eca49d85649c',
        'GNOME_TERMINAL_SERVICE': ':1.52',
        'GPG_AGENT_INFO': '/run/user/1000/gnupg/S.gpg-agent:0:1',
        'GTK_IM_MODULE': 'ibus',
        'GTK_MODULES': 'gail:atk-bridge',
        'HOME': '/home/alfio',
        'JAVA_HOME': '/usr/lib/jvm/java-11-openjdk-amd64',
        'LANG': 'en_US.UTF-8',
        'LC_ADDRESS': 'it_IT.UTF-8',
        'LC_IDENTIFICATION': 'it_IT.UTF-8',
    

In [112]:
#insert

import subprocess
import os

db_name = "tpcds"

# psql command
command = [
    "psql",
    "-d", db_name,      # dbname
    "-f", ""  # SQL file to execute
]

# copy the env
env = os.environ.copy()

# run command as subprocess
subprocess.run(command, env=env)

 sr_returned_date_sk | sr_return_time_sk | sr_item_sk | sr_customer_sk | sr_cdemo_sk | sr_hdemo_sk | sr_addr_sk | sr_store_sk | sr_reason_sk | sr_ticket_number | sr_return_quantity | sr_return_amt | sr_return_tax | sr_return_amt_inc_tax | sr_fee | sr_return_ship_cost | sr_refunded_cash | sr_reversed_charge | sr_store_credit | sr_net_loss 
---------------------+-------------------+------------+----------------+-------------+-------------+------------+-------------+--------------+------------------+--------------------+---------------+---------------+-----------------------+--------+---------------------+------------------+--------------------+-----------------+-------------
(0 rows)



CompletedProcess(args=['psql', '-d', 'tpcds', '-c', 'select * from store_returns'], returncode=0)

In [72]:
# add constraints to db

cur.execute(open("tpcds_ri.sql", "r").read())
print("SQL Status Output:\n", cur.statusmessage)


SQL Status Output:
 ALTER TABLE


In [73]:
# close connection to db

cur.close()

#### End of script.