In [12]:
import psycopg2
from psycopg2.extras import execute_batch
import pandas as pd

In [13]:
# establish a connection to the PostgreSQL database
conn = psycopg2.connect(
    dbname = "dw_cs", 
    user = "postgres", 
    #host= 'localhost',
    host= '172.31.160.1',
    password = "test31415",
    port = 5432
)

# Generic function to create table for partitioning

In [3]:
def create_partition(table, attribute, init_month, end_month, year):
    if end_month==12:
        end_month_part=1
        new_year=year+1
    else:
        end_month_part= str(int(end_month)+1)
        new_year=year
    
    query=f"""CREATE TABLE {table}_m{init_month}_m{end_month}_y{year} PARTITION OF {table}
    FOR VALUES FROM ('{year}-{init_month}-01') TO ('{new_year}-{end_month_part}-01');"""
    return query

# Lineitem on shipdate

In [14]:
with conn.cursor() as cur:
 
    cur.execute("DELETE FROM lineitem;") 
    cur.execute("DROP TABLE lineitem;")
    cur.execute("""
CREATE TABLE IF NOT EXISTS public.lineitem (
    l_orderkey INT,
    l_partkey INT,
    l_suppkey INT,
    l_linenumber INT,
    l_quantity DECIMAL,
    l_extendedprice DECIMAL,
    l_discount DECIMAL,
    l_tax DECIMAL,
    l_returnflag CHAR(1),
    l_linestatus CHAR(1),
    o_orderdate DATE,
    l_commitdate DATE,
    l_receiptdate DATE,
    l_shipinstruct CHAR(25),
    l_shipmode CHAR(10),
    l_comment VARCHAR(44)
) PARTITION BY RANGE (l_shipdate);
""")


In [None]:
def create_subpartition(table, init_month, end_month, year):
    if end_month==12:
        end_month_part=1
        new_year=year+1
    else:
        end_month_part= str(int(end_month)+1)
        new_year=year
    
    query=f"""CREATE TABLE {table}_m{init_month}_m{end_month}_y{year} PARTITION OF {table}
    FOR VALUES FROM ('{year}-{init_month}-01') TO ('{new_year}-{end_month_part}-01') PARTITION BY LIST(l_returnflag);

    CREATE TABLE {table}_m{init_month}_m{end_month}_y{year}_A PARTITION OF {table}_m{init_month}_m{end_month}_y{year}
    FOR VALUES IN ('A');
    
    CREATE TABLE {table}_m{init_month}_m{end_month}_y{year}_R PARTITION OF {table}_m{init_month}_m{end_month}_y{year}
    FOR VALUES IN ('R');
    
    CREATE TABLE {table}_m{init_month}_m{end_month}_y{year}_N PARTITION OF {table}_m{init_month}_m{end_month}_y{year}
    FOR VALUES IN ('N');"""
    return query

In [5]:
for year in range(1992,1999):
    for month in range(1,12,3):
        query=create_partition('lineitem', month, month+2, year)
        #create subpartition
        
        with conn.cursor() as cursor:
            cursor.execute(query)
            conn.commit()
            print('Partition created')

Partition created
Partition created
Partition created
Partition created
Partition created
Partition created
Partition created
Partition created
Partition created
Partition created
Partition created
Partition created
Partition created
Partition created
Partition created
Partition created
Partition created
Partition created
Partition created
Partition created
Partition created
Partition created
Partition created
Partition created
Partition created
Partition created
Partition created
Partition created


In [10]:
# Define the partition indexes
partition_indexes = [idx for idx in range(0, 7_000_000, 1_000_000)]    # 6e6 is the number of rows in the orders table

# Loop through each file
for i in range(1, 11):
    # Loop through each partition
    for j in range(len(partition_indexes) - 1):
        # Read the CSV file in chunks
        df = pd.read_csv(f"/home/saradsai/data_management/TPC-H/lineitem_8.csv", delimiter="|", header=None, skiprows=partition_indexes[j], nrows=partition_indexes[j + 1] - partition_indexes[j])
        
        # Convert the DataFrame to a list of tuples
        data = [tuple(row) for row in df.to_numpy()]
        print(f"Read {len(data)} rows from partition {j} of file {i}")

        # Insert the data into the database
        conn.rollback()
        with conn.cursor() as cur:
            execute_batch(
                cur,
                "INSERT INTO lineitem (l_orderkey, l_partkey, l_suppkey, l_linenumber, l_quantity, l_extendedprice, l_discount, l_tax, l_returnflag, l_linestatus, l_shipdate, l_commitdate, l_receiptdate, l_shipinstruct, l_shipmode, l_comment) VALUES (%s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s)",
                data
            )
            conn.commit()

    print(f"Inserted {len(data)} rows in the lineitem table from file {i}")

Read 1000000 rows from partition 0 of file 9
Read 1000000 rows from partition 1 of file 9
Read 1000000 rows from partition 2 of file 9
Read 1000000 rows from partition 3 of file 9
Read 1000000 rows from partition 4 of file 9
Read 999992 rows from partition 5 of file 9
Inserted 999992 rows in the lineitem table from file 9
Read 1000000 rows from partition 0 of file 10
Read 1000000 rows from partition 1 of file 10
Read 1000000 rows from partition 2 of file 10
Read 1000000 rows from partition 3 of file 10
Read 1000000 rows from partition 4 of file 10
Read 999992 rows from partition 5 of file 10
Inserted 999992 rows in the lineitem table from file 10


execution time for the population of the partition:
58 m 24 s+ 1m 56 s + 14 m 49 s

# Orders on orderdate

In [12]:
conn.close()

In [4]:
with conn.cursor() as cur:
    
    cur.execute("DELETE FROM orders;")  
    cur.execute("DROP TABLE orders;")
    cur.execute("""CREATE TABLE IF NOT EXISTS public.orders (
    o_orderkey INT,
    o_custkey INT,
    o_orederstatus CHAR(1),
    o_totalprice DECIMAL,
    o_orderdate DATE,
    o_orederpriority CHAR(15),
    o_clerk CHAR(15),
    o_shippriority INT,
    o_comment VARCHAR(79)
    --CONSTRAINT fk_customer FOREIGN KEY (o_custkey) REFERENCES customer(c_custkey)
)
PARTITION BY RANGE (o_orderdate);
                
""")
    conn.commit()


In [5]:
for year in range(1992,1999):
    for month in range(1,12,3):
        query=create_partition('orders', 'o_orderdate', month, month+2, year)
        with conn.cursor() as cursor:
            cursor.execute(query)
            conn.commit()
            print('Partition created')

Partition created
Partition created
Partition created
Partition created
Partition created
Partition created
Partition created
Partition created
Partition created
Partition created
Partition created
Partition created
Partition created
Partition created
Partition created
Partition created
Partition created
Partition created
Partition created
Partition created
Partition created
Partition created
Partition created
Partition created
Partition created
Partition created
Partition created
Partition created


In [6]:
# Define the partition indexes
partition_indexes = [idx for idx in range(0, 7_000_000, 1_000_000)]    # 6e6 is the number of rows in the orders table


# Loop through each partition
for j in range(len(partition_indexes) - 1):
    # Read the CSV file in chunks
    df = pd.read_csv(f"/home/saradsai/data_management/TPC-H/orders.csv", delimiter="|", header=None, skiprows=partition_indexes[j], nrows=partition_indexes[j + 1] - partition_indexes[j])
    
    # Convert the DataFrame to a list of tuples
    data = [tuple(row) for row in df.to_numpy()]
    print(f"Read {len(data)} rows from partition {j} of file")

    # Insert the data into the database
    conn.rollback()
    with conn.cursor() as cur:
        execute_batch(
            cur,
            "INSERT INTO orders (o_orderkey, o_custkey, o_orederstatus, o_totalprice, o_orderdate, o_orederpriority, o_clerk, o_shippriority, o_comment) VALUES (%s, %s, %s, %s, %s, %s, %s, %s, %s)",
            data
        )
        conn.commit()


Read 1000000 rows from partition 0 of file
Read 1000000 rows from partition 1 of file
Read 1000000 rows from partition 2 of file
Read 1000000 rows from partition 3 of file
Read 1000000 rows from partition 4 of file
Read 1000000 rows from partition 5 of file


In [7]:
def insert_primary(table, attribute, init_month, end_month, year):
    
    query=f"""ALTER TABLE {table}_m{init_month}_m{end_month}_y{year} ADD PRIMARY KEY({attribute});
    ALTER TABLE {table}_m{init_month}_m{end_month}_y{year} ADD FOREIGN KEY(o_custkey) REFERENCES customer(c_custkey) """
    return query

In [8]:
for year in range(1992,1999):
    for month in range(1,12,3):
        query=insert_primary('orders', 'o_orderkey', month, month+2, year)
        with conn.cursor() as cursor:
            cursor.execute(query)
            conn.commit()
            print('Primary key created')

Primary key created
Primary key created
Primary key created
Primary key created
Primary key created
Primary key created
Primary key created
Primary key created
Primary key created
Primary key created
Primary key created
Primary key created
Primary key created
Primary key created
Primary key created
Primary key created
Primary key created
Primary key created
Primary key created
Primary key created
Primary key created
Primary key created
Primary key created
Primary key created
Primary key created
Primary key created
Primary key created
Primary key created


In [11]:
conn.close()