In [1]:
import pandas as pd
import json
import re
import math
import numpy as np
from collections import defaultdict
import psycopg2
import time
import os
import sys
from tqdm import tqdm

In [2]:
dataset = 'tpcds_sf1'
tmp_data_dir = 'data/tpcds_sf1'


# column_type_file = os.path.join(os.path.dirname(__file__), f'../zsce/cross_db_benchmark/datasets/{dataset}/column_type.json')
column_type_file = os.path.join(f'/home/wuy/DB/memory_prediction/zsce/cross_db_benchmark/datasets/{dataset}/column_type.json')
with open(column_type_file, 'r') as f:
    column_type = json.load(f)

schema = {}
for table, columns in column_type.items():
    if table == 'dbgen_version':
        continue
    for column, type_ in columns.items():
        if table not in schema:
            schema[table] = []
        schema[table].append(column)

# Define table aliases as their original names for tpcds by iterating tpcds_schema, meanwhile get col2idx
t2alias = {}
col2idx = {}
for table, columns in schema.items():
    for column in columns:
        t2alias[table] = table
        col2idx[table + '.' + column] = len(col2idx)

alias2t = {v: k for k, v in t2alias.items()}

In [96]:
# Database connection parameters
DB_PARAMS = {
    'database': dataset,
    'user': "wuy",
    'host': "127.0.0.1",
    'password': "wuy",
    'port': "5432"
}
conn = psycopg2.connect(**DB_PARAMS)
conn.set_session(autocommit=True)
cur = conn.cursor()

In [97]:
def to_vals(data_list):
    for dat in data_list:
        val = dat[0]
        if val is not None: break
    try:
        float(val)
        return np.array(data_list, dtype=float).squeeze()
    except:
#         print(val)
        res = []
        for dat in data_list:
            try:
                mi = dat[0].timestamp()
            except:
                mi = 0
            res.append(mi)
        return np.array(res)

## Histogram

In [100]:
hist_file = pd.DataFrame(columns=['table','column','bins','table_column'])
# load hist_file if exists
hist_file_path = os.path.join(tmp_data_dir, 'hist_file.csv')
if os.path.exists(hist_file_path):
    hist_file = pd.read_csv(hist_file_path)
else:
    for table,columns in schema.items():
        for column in tqdm(columns, desc=table):
            cmd = 'select {} from {} as {}'.format(column, table,t2alias[table])
            cur.execute(cmd)
            col = cur.fetchall()
            col_array = to_vals(col)
            
            hists = np.nanpercentile(col_array, range(0,101,2), axis=0)
            freqs, _ = np.histogram(col_array, bins=hists)

            freq_bytes = freqs.astype('float32').tobytes()
            freq_hex = freq_bytes.hex()
            
            res_dict = {
                'table':table,
                'column':column,
                'table_column': '.'.join((table, column)),
                'bins':hists,
                'freq': freq_hex
            }
            hist_file = pd.concat([hist_file, pd.DataFrame([res_dict])], ignore_index=True)
        hist_file.to_csv(hist_file_path, index=False)



In [101]:
cur.close()
conn.close()

## Sample
### Steps (There may be other easier methods)
1. generate 1000 sample points for each table
2. duplicate database schema from full db
    > pg_dump imdb -s -O > imdb_schema.sql
3. create small base by in psql
    > create database imdb_sample
4. create schema using imdb_schema.sql
5. load the sample data using pandas and sqlalchemy
6. query the small base to get sample bitmaps for each predicate

Step 1

In [102]:
DB_PARAMS = {
    'database': 'tpcds_sample',
    'user': "wuy",
    'host': "127.0.0.1",
    'password': "wuy",
    'port': "5432"
}
conm = psycopg2.connect(**DB_PARAMS)
conm.set_session(autocommit=True)
cur = conm.cursor()

In [103]:
## sampling extension
try:
    cmd = 'CREATE EXTENSION tsm_system_rows'
    cur.execute(cmd)
except Exception as e:
    print(e)
    pass

extension "tsm_system_rows" already exists



In [12]:
import pickle
# load sample_data from file if exists
sample_data_file = os.path.join(tmp_data_dir, "sample_data.pkl")
if os.path.exists(sample_data_file):
    with open(sample_data_file, "rb") as f:
        sample_data = pickle.load(f)
else:
    tables = list(schema.keys())
    sample_data = {}
    for table in tables:
        cur.execute("Select * FROM {} LIMIT 0".format(table))
        colnames = [desc[0] for desc in cur.description]

        ts = pd.DataFrame(columns = colnames)

        for num in tqdm(range(1000), desc=table):
            cmd = 'SELECT * FROM {} TABLESAMPLE SYSTEM_ROWS(1)'.format(table)
            cur.execute(cmd)
            samples = cur.fetchall()
            for i,row in enumerate(samples):
                ts.loc[num]=row
        
        sample_data[table] = ts
    with open(sample_data_file, "wb") as f:
        pickle.dump(sample_data, f)

In [10]:
sample_data.keys()

dict_keys(['customer_address', 'customer_demographics', 'date_dim', 'warehouse', 'ship_mode', 'time_dim', 'reason', 'income_band', 'item', 'store', 'call_center', 'customer', 'web_site', 'store_returns', 'household_demographics', 'web_page', 'promotion', 'catalog_page', 'inventory', 'catalog_returns', 'web_returns', 'web_sales', 'catalog_sales', 'store_sales'])

Step 5 (Do step 2-4 outside first)

In [13]:
from sqlalchemy import create_engine
engine = create_engine('postgresql://wuy:wuy@localhost:5432/tpcds_sample')

In [14]:
for k,v in tqdm(sample_data.items()):
    try:
        v['sid'] = list(range(1000))
        cmd = 'alter table {} add column sid integer'.format(k)
        cur.execute(cmd)
        v.to_sql(k,engine,if_exists='append',index=False)
    except Exception as e:
        print(e)

100%|██████████| 24/24 [00:00<00:00, 765.70it/s]

column "sid" of relation "customer_address" already exists

column "sid" of relation "customer_demographics" already exists

column "sid" of relation "date_dim" already exists

column "sid" of relation "warehouse" already exists

column "sid" of relation "ship_mode" already exists

column "sid" of relation "time_dim" already exists

column "sid" of relation "reason" already exists

column "sid" of relation "income_band" already exists

column "sid" of relation "item" already exists

column "sid" of relation "store" already exists

column "sid" of relation "call_center" already exists

column "sid" of relation "customer" already exists

column "sid" of relation "web_site" already exists

column "sid" of relation "store_returns" already exists

column "sid" of relation "household_demographics" already exists

column "sid" of relation "web_page" already exists

column "sid" of relation "promotion" already exists

column "sid" of relation "catalog_page" already exists

column "sid" of rela




Step 6

In [15]:
import sqlparse
from sqlparse.sql import Comparison, Where
from sqlparse.tokens import Keyword, DML
import re

def is_number(value):
    """
    Check if the given value is a number.
    
    Args:
        value (str): The value to check.
        
    Returns:
        bool: True if value is a number, False otherwise.
    """
    try:
        float(value)
        return True
    except ValueError:
        return False

def extract_numeric_predicates(sql_query):
    """
    Extract predicates from the SQL WHERE clause where the right side of the operator is a number.
    
    Args:
        sql_query (str): The SQL query string.
        
    Returns:
        list: A list of numeric predicates as strings.
    """
    # Parse the SQL query
    parsed = sqlparse.parse(sql_query)
    if not parsed:
        return []
    
    stmt = parsed[0]
    numeric_predicates = []

    def extract_from_tokens(tokens):
        """
        Recursively traverse tokens to find numeric comparisons.
        
        Args:
            tokens (list): List of sqlparse tokens.
        """
        for token in tokens:
            if isinstance(token, Comparison):
                # Extract the comparison string
                comparison = str(token).strip()
                
                # Regex to split the comparison into left, operator, and right
                match = re.match(r'(.+?)(=|<>|<=|>=|<|>)(.+)', comparison)
                if match:
                    left, operator, right = match.groups()
                    left = left.strip()
                    operator = operator.strip()
                    right = right.strip()
                    
                    # Remove surrounding quotes from strings
                    if right.startswith("'") and right.endswith("'"):
                        continue  # It's a string predicate; skip
                    if right.startswith('"') and right.endswith('"'):
                        continue  # It's a string predicate; skip
                    
                    # Check if the right side is a number
                    if is_number(right):
                        numeric_predicates.append(comparison)
            elif token.is_group:
                # Recursively handle sub-tokens
                extract_from_tokens(token.tokens)

    # Iterate through the tokens to find the WHERE clause
    for token in stmt.tokens:
        if isinstance(token, Where):
            extract_from_tokens(token.tokens)
            break  # Assuming only one WHERE clause

    return numeric_predicates


In [None]:
# table_samples = []
# for i,row in query_file.iterrows():
#     table_sample = {}
#     preds = row['predicate'].split(',')
#     for i in range(0,len(preds),3):
#         left, op, right = preds[i:i+3]
#         alias,col = left.split('.')
#         table = alias2t[alias]
#         pred_string = ''.join((col,op,right))
#         q = 'select sid from {} where {}'.format(table, pred_string)
#         cur.execute(q)
#         sps = np.zeros(1000).astype('uint8')
#         sids = cur.fetchall()
#         sids = np.array(sids).squeeze()
#         if sids.size>1:
#             sps[sids] = 1
#         if table in table_sample:
#             table_sample[table] = table_sample[table] & sps
#         else:
#             table_sample[table] = sps
#     table_samples.append(table_sample)

In [20]:
data_dir = '/home/wuy/DB/pg_mem_data'

# load table_sample from file if exists
table_sample_file = os.path.join(tmp_data_dir, 'table_samples.pkl')
if os.path.exists(table_sample_file):
    with open(table_sample_file, 'rb') as f:
        table_samples = pickle.load(f)
    print('Loaded table_samples from file.')
else:
    with open(os.path.join(data_dir, dataset, 'tiny_plans.json')) as f:
        plans = json.load(f)

    table_pattern = r'\"([a-zA-Z_]+)\"\.'
    column_pattern = r'\.\"([a-zA-Z_]+)\"'

    table_samples = []
    for plan in tqdm(plans):
        table_sample = {}
        predicates = extract_numeric_predicates(plan['sql'])
        # print(plan['sql'])
        for predicate in predicates:
            try:
                table_name = re.search(table_pattern, predicate).group(1)
                column_name = re.search(column_pattern, predicate).group(1)
                if column_type[table_name][column_name] == 'char':
                    continue
                q = 'select sid from {} where {}'.format(table_name, predicate)
                cur.execute(q)
                sps = np.zeros(1000).astype('uint8')
                sids = cur.fetchall()
                sids = np.array(sids).squeeze()
                if sids.size>1:
                    sps[sids] = 1
                if table_name in table_sample:
                    table_sample[table_name] = table_sample[table_name] & sps
                else:
                    table_sample[table_name] = sps
            except Exception as e:
                print(f"Error: {e}")
        # if len(table_sample) > 0:
        table_samples.append(table_sample)

    import pickle
    # Save table_samples to file
    with open(table_sample_file, 'wb') as f:
        pickle.dump(table_samples, f)


  0%|          | 0/100 [00:00<?, ?it/s]

100%|██████████| 100/100 [00:00<00:00, 265.60it/s]


In [19]:
table_samples

[{'store_sales': array([0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0,
         0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0,
         0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0,
         0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0,
         0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0,
         0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0,
         0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0,
         0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0,
         0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0,
         0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0,
         0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0,
         0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0,
         0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0,
         0

In [72]:
query_file = pd.read_csv('data/imdb/workloads/synthetic.csv',sep='#',header=None)
query_file.columns = ['table','join','predicate','card']

In [73]:
query_file.head()

Unnamed: 0,table,join,predicate,card
0,cast_info ci,,"ci.person_id,=,172968",838
1,"title t,movie_info mi",t.id=mi.movie_id,"t.kind_id,<,3,t.production_year,=,2008,mi.info...",297013
2,"title t,cast_info ci",t.id=ci.movie_id,"ci.person_id,<,3194645",31427248
3,"title t,cast_info ci,movie_info mi","t.id=ci.movie_id,t.id=mi.movie_id","ci.person_id,=,1742124,ci.role_id,>,2,mi.info_...",12
4,"title t,cast_info ci,movie_info_idx mi_idx","t.id=ci.movie_id,t.id=mi_idx.movie_id","t.kind_id,=,7,t.production_year,>,0,ci.role_id...",733244


In [76]:
conm = psycopg2.connect(database="imdb", user="wuy", host="127.0.0.1",password="wuy", port="5432")
conm.set_session(autocommit=True)
cur = conm.cursor()

In [77]:
table_samples = []
for i,row in query_file.iterrows():
    table_sample = {}
    preds = row['predicate'].split(',')
    for i in range(0,len(preds),3):
        left, op, right = preds[i:i+3]
        alias,col = left.split('.')
        table = alias2t[alias]
        pred_string = ''.join((col,op,right))
        q = 'select sid from {} where {}'.format(table, pred_string)
        cur.execute(q)
        sps = np.zeros(1000).astype('uint8')
        sids = cur.fetchall()
        sids = np.array(sids).squeeze()
        if sids.size>1:
            sps[sids] = 1
        if table in table_sample:
            table_sample[table] = table_sample[table] & sps
        else:
            table_sample[table] = sps
    table_samples.append(table_sample)

KeyError: 'ci'

In [85]:
# table_samples