In [1]:
import pandas as pd
import json
import re
import math
import numpy as np
from collections import defaultdict
import psycopg2
import time
import os

In [2]:


imdb_schema = {'title': ['t.id', 't.kind_id', 't.production_year'],
 'movie_companies': ['mc.id',
  'mc.company_id',
  'mc.movie_id',
  'mc.company_type_id'],
 'cast_info': ['ci.id', 'ci.movie_id', 'ci.person_id', 'ci.role_id'],
 'movie_info_idx': ['mi_idx.id', 'mi_idx.movie_id', 'mi_idx.info_type_id'],
 'movie_info': ['mi.id', 'mi.movie_id', 'mi.info_type_id'],
 'movie_keyword': ['mk.id', 'mk.movie_id', 'mk.keyword_id']}
t2alias = {'title':'t','movie_companies':'mc','cast_info':'ci',
          'movie_info_idx':'mi_idx','movie_info':'mi','movie_keyword':'mk'}
alias2t = {}
for k,v in t2alias.items(): alias2t[v] = k

In [3]:
conm = psycopg2.connect(database="imdb", user="wuy", host="127.0.0.1",password="wuy", port="5432")
conm.set_session(autocommit=True)
cur = conm.cursor()

In [4]:
def to_vals(data_list):
    '''
    convert a list of tuples to a numpy array of values
    '''
    for dat in data_list:
        val = dat[0]
        # finds first non-None value
        if val is not None: break 
    try:
        float(val)
        return np.array(data_list, dtype=float).squeeze()
    except:
#         print(val)
        res = []
        for dat in data_list:
            try:
                mi = dat[0].timestamp()
            except:
                mi = 0
            res.append(mi)
        return np.array(res)

## Histogram

In [5]:
hist_file_path ='data/imdb/new_hist_file.csv'
if os.path.exists(hist_file_path):
    hist_file = pd.read_csv(hist_file_path)
else:
    hist_file = pd.DataFrame(columns=['table','column','bins','table_column'])
    hist_file
    for table,columns in imdb_schema.items():
        for column in columns:
            cmd = 'select {} from {} as {}'.format(column, table,t2alias[table])
            cur.execute(cmd)
            col = cur.fetchall()
            col_array = to_vals(col)
            # calculate the percentile of the col_array, at intervals of 2%, forming the basis for the histogram bins.
            hists = np.nanpercentile(col_array, range(0,101,2), axis=0)
            res_dict = {
                'table':table,
                'column':column,
                'table_column': '.'.join((table, column)),
                'bins':hists
            }
            # hist_file = hist_file.append(res_dict,ignore_index=True)
            hist_file = pd.concat([hist_file, pd.DataFrame(res_dict)], ignore_index=True)
            hist_file.to_csv(hist_file_path, index=False)

In [6]:
hist_file

Unnamed: 0,table,column,bins,table_column
0,title,t.id,1.00,title.t.id
1,title,t.id,50567.22,title.t.id
2,title,t.id,101133.44,title.t.id
3,title,t.id,151699.66,title.t.id
4,title,t.id,202265.88,title.t.id
...,...,...,...,...
1015,movie_keyword,mk.keyword_id,46138.00,movie_keyword.mk.keyword_id
1016,movie_keyword,mk.keyword_id,58101.00,movie_keyword.mk.keyword_id
1017,movie_keyword,mk.keyword_id,71628.84,movie_keyword.mk.keyword_id
1018,movie_keyword,mk.keyword_id,84788.00,movie_keyword.mk.keyword_id


## Simpler Approach to sample without creating a smaller database

In [14]:
import pandas as pd
import numpy as np
import psycopg2
from sqlalchemy import create_engine
import os
import logging
# Configure logging
logging.basicConfig(level=logging.INFO, format='%(asctime)s %(levelname)s:%(message)s')

NUM_SAMPLES_PER_TABLE = 1000  # Adjust as needed

def sample_table(cursor, table, alias, num_samples):
    """
    Samples a specified number of rows from a table using TABLESAMPLE SYSTEM_ROWS.

    Args:
        cursor: psycopg2 cursor object.
        table (str): Table name.
        alias (str): Alias for the table.
        num_samples (int): Number of samples to fetch.

    Returns:
        pd.DataFrame: Sampled data as a DataFrame.
    """
    try:
        # Calculate the sampling rate to approximate the desired number of samples
        # Note: SYSTEM_ROWS may not guarantee exact sample size; adjust as needed
        sampling_cmd = f"SELECT * FROM {table} AS {alias} TABLESAMPLE SYSTEM_ROWS({num_samples}) "
        cursor.execute(sampling_cmd)
        samples = cursor.fetchall()
        colnames = [desc[0] for desc in cursor.description]
        df = pd.DataFrame(samples, columns=colnames)
        logging.info(f"Sampled {len(df)} rows from table '{table}'.")
        return df
    except Exception as e:
        logging.error(f"Error sampling table '{table}': {e}")
        return pd.DataFrame()  # Return empty DataFrame on error

# Dictionary to hold sampled data
sampled_data = {}

for table, columns in imdb_schema.items():
    alias = t2alias[table]
    df = sample_table(cur, table, alias, NUM_SAMPLES_PER_TABLE)
    if not df.empty:
        # Assign a unique sample ID
        df['sid'] = range(len(df))
        sampled_data[table] = df


2024-10-18 19:53:41,203 INFO:Sampled 1000 rows from table 'title'.
2024-10-18 19:53:41,208 INFO:Sampled 1000 rows from table 'movie_companies'.
2024-10-18 19:53:41,213 INFO:Sampled 1000 rows from table 'cast_info'.
2024-10-18 19:53:41,217 INFO:Sampled 1000 rows from table 'movie_info_idx'.
2024-10-18 19:53:41,221 INFO:Sampled 1000 rows from table 'movie_info'.
2024-10-18 19:53:41,225 INFO:Sampled 1000 rows from table 'movie_keyword'.


In [15]:
# Path to the synthetic workload file
synthetic_workload_path = 'data/imdb/workloads/synthetic.csv'

# Check if the file exists
if not os.path.exists(synthetic_workload_path):
    logging.error(f"Synthetic workload file not found at '{synthetic_workload_path}'.")
    raise FileNotFoundError(f"Synthetic workload file not found at '{synthetic_workload_path}'.")

# Read the synthetic workload CSV
query_file = pd.read_csv(synthetic_workload_path, sep='#', header=None)
query_file.columns = ['table', 'join', 'predicate', 'card']
logging.info("Loaded synthetic workload queries.")


2024-10-18 19:54:01,452 INFO:Loaded synthetic workload queries.


In [17]:
def apply_predicates(sampled_data, predicates):
    """
    Applies a list of predicates to the sampled data and returns bitmaps.

    Args:
        sampled_data (dict): Dictionary of sampled DataFrames per table.
        predicates (list): List of predicates in triplet form [left, op, right].

    Returns:
        dict: Dictionary of bitmaps per table.
    """
    table_bitmaps = {}
    for i in range(0, len(predicates), 3):
        left, op, right = predicates[i:i+3]
        alias, col = left.split('.')
        table = alias2t.get(alias)
        if not table:
            logging.warning(f"Alias '{alias}' not found in alias2t mapping.")
            continue
        df = sampled_data.get(table)
        if df is None:
            logging.warning(f"No sampled data found for table '{table}'.")
            continue
        
        # Convert right to appropriate type
        # This example assumes integer comparisons; adjust as needed
        try:
            right_val = int(right)
        except ValueError:
            try:
                right_val = float(right)
            except ValueError:
                # Assume string if not numeric
                right_val = right.strip("'")  # Remove quotes if any
        
        # Build the query string dynamically
        if op in ['=', '==']:
            condition = df[col] == right_val
        elif op == '>':
            condition = df[col] > right_val
        elif op == '<':
            condition = df[col] < right_val
        elif op == '>=':
            condition = df[col] >= right_val
        elif op == '<=':
            condition = df[col] <= right_val
        elif op == '!=':
            condition = df[col] != right_val
        else:
            logging.warning(f"Unsupported operator '{op}' in predicate.")
            continue
        
        # Generate bitmap
        bitmap = condition.astype('uint8').values  # 1 for True, 0 for False
        if table in table_bitmaps:
            table_bitmaps[table] &= bitmap  # Combine with existing bitmap
        else:
            table_bitmaps[table] = bitmap
    return table_bitmaps


In [18]:
# List to hold bitmap results for each query
table_samples = []

for idx, row in query_file.iterrows():
    predicates = row['predicate'].split(',')
    bitmaps = apply_predicates(sampled_data, predicates)
    table_samples.append(bitmaps)
    logging.info(f"Processed query {idx}: Generated bitmaps.")

logging.info("Completed processing all synthetic workload queries.")


2024-10-18 19:59:18,652 INFO:Processed query 0: Generated bitmaps.
2024-10-18 19:59:18,656 INFO:Processed query 1: Generated bitmaps.
2024-10-18 19:59:18,657 INFO:Processed query 2: Generated bitmaps.
2024-10-18 19:59:18,658 INFO:Processed query 3: Generated bitmaps.
2024-10-18 19:59:18,660 INFO:Processed query 4: Generated bitmaps.
2024-10-18 19:59:18,662 INFO:Processed query 5: Generated bitmaps.
2024-10-18 19:59:18,664 INFO:Processed query 6: Generated bitmaps.
2024-10-18 19:59:18,666 INFO:Processed query 7: Generated bitmaps.
2024-10-18 19:59:18,668 INFO:Processed query 8: Generated bitmaps.
2024-10-18 19:59:18,670 INFO:Processed query 9: Generated bitmaps.
2024-10-18 19:59:18,671 INFO:Processed query 10: Generated bitmaps.
2024-10-18 19:59:18,673 INFO:Processed query 11: Generated bitmaps.
2024-10-18 19:59:18,674 INFO:Processed query 12: Generated bitmaps.
2024-10-18 19:59:18,678 INFO:Processed query 13: Generated bitmaps.
2024-10-18 19:59:18,681 INFO:Processed query 14: Generated

In [26]:
table_samples[1].keys()

dict_keys(['title', 'movie_info'])

In [28]:
sum(table_samples[1]['movie_info'])

828

In [21]:
bitmaps_df = pd.DataFrame(table_samples)
bitmaps_df.to_pickle('./data/imdb/workloads/synthetic_bimatps.pkl')

In [None]:
cur.close()
conn.close()

## Sample
### Steps (There may be other easier methods)
1. generate 1000 sample points for each table
2. duplicate database schema from full db
    > pg_dump imdb -s -O > imdb_schema.sql
3. create small base by in psql
    > create database imdb_sample
4. create schema using imdb_schema.sql
5. load the sample data using pandas and sqlalchemy
6. query the small base to get sample bitmaps for each predicate

Step 1

In [8]:
## sampling extension
cmd = 'CREATE EXTENSION tsm_system_rows' # enable system rows sampling
cur.execute(cmd)

DuplicateObject: extension "tsm_system_rows" already exists


In [9]:
tables = list(imdb_schema.keys())
sample_data = {}
for table in tables:
    cur.execute("Select * FROM {} LIMIT 0".format(table))
    colnames = [desc[0] for desc in cur.description] # cur.description provides metadata about the result set, including column names and data types.

    ts = pd.DataFrame(columns = colnames)

    for num in range(1000):
        cmd = 'SELECT * FROM {} TABLESAMPLE SYSTEM_ROWS(1)'.format(table) # return 1 row per table
        cur.execute(cmd)
        samples = cur.fetchall()
        for i,row in enumerate(samples):
            ts.loc[num]=row
    
    sample_data[table] = ts

  ts.loc[num]=row
  ts.loc[num]=row
  ts.loc[num]=row
  ts.loc[num]=row
  ts.loc[num]=row
  ts.loc[num]=row
  ts.loc[num]=row
  ts.loc[num]=row
  ts.loc[num]=row
  ts.loc[num]=row
  ts.loc[num]=row
  ts.loc[num]=row
  ts.loc[num]=row
  ts.loc[num]=row
  ts.loc[num]=row
  ts.loc[num]=row
  ts.loc[num]=row
  ts.loc[num]=row
  ts.loc[num]=row
  ts.loc[num]=row
  ts.loc[num]=row
  ts.loc[num]=row
  ts.loc[num]=row
  ts.loc[num]=row
  ts.loc[num]=row
  ts.loc[num]=row
  ts.loc[num]=row
  ts.loc[num]=row
  ts.loc[num]=row
  ts.loc[num]=row
  ts.loc[num]=row
  ts.loc[num]=row
  ts.loc[num]=row
  ts.loc[num]=row
  ts.loc[num]=row
  ts.loc[num]=row
  ts.loc[num]=row
  ts.loc[num]=row
  ts.loc[num]=row
  ts.loc[num]=row
  ts.loc[num]=row
  ts.loc[num]=row
  ts.loc[num]=row
  ts.loc[num]=row
  ts.loc[num]=row
  ts.loc[num]=row
  ts.loc[num]=row
  ts.loc[num]=row
  ts.loc[num]=row
  ts.loc[num]=row
  ts.loc[num]=row
  ts.loc[num]=row
  ts.loc[num]=row
  ts.loc[num]=row
  ts.loc[num]=row
  ts.loc[n

In [16]:
sample_data['title']

Unnamed: 0,id,title,imdb_index,kind_id,production_year,imdb_id,phonetic_code,episode_of_id,season_nr,episode_nr,series_years,md5sum
0,932539,You're On!,,7,2012,,Y65,932530,,,,a8966ecaddc04537483668396786e6ab
1,602982,(#1.50),,7,1988,,,599479,1,50,,3a9d3d3b953b1e0307bb48b97f1c0c0e
2,1418446,The Finale,,7,2009,,F54,1418427,1,4,,3f49c17f0698ebc54367fd05c6c61941
3,1066816,(#1.68),,7,2006,,,1066746,1,68,,eb23b6c6274ded914bd6603fe3d43a97
4,1637227,1981 Liberty Bowl,,3,1981,,L1631,,,,,67bc4d25f1186ff177f8f9448686fe25
...,...,...,...,...,...,...,...,...,...,...,...,...
995,1873124,Esercito e marina a Bir Tobras,,1,1912,,E2623,,,,,4adf2f9b06e9916bba2a006dadc94954
996,379939,(#1.4),,7,2007,,,379934,1,4,,966f98e1424f4a8d05442e950d5e96e0
997,1222418,(1966-06-02),,7,1966,,,1222059,,,,b78f601e69b05330112f0973c7d9092e
998,1708517,Backyard Fury,,1,2008,,B2631,,,,,dd17969cd068b60c2300fb5d1d53bf45


Step 5 (Do step 2-4 outside first)

In [50]:
from sqlalchemy import create_engine
engine = create_engine('postgresql://postgres:admin@localhost:5432/imdb_sample')

In [72]:
for k,v in sample_data.items():
    v['sid'] = list(range(1000))
    cmd = 'alter table {} add column sid integer'.format(k)
    cur.execute(cmd)
    v.to_sql(k,engine,if_exists='append',index=False) # insert the DataFrame `v` into the table `k` within the database `imdb_sample`

Step 6

In [44]:
query_file = pd.read_csv('data/imdb/workloads/synthetic.csv',sep='#',header=None)
query_file.columns = ['table','join','predicate','card']

In [53]:
query_file.head()

Unnamed: 0,table,join,predicate,card
0,cast_info ci,,"ci.person_id,=,172968",838
1,"title t,movie_info mi",t.id=mi.movie_id,"t.kind_id,<,3,t.production_year,=,2008,mi.info...",297013
2,"title t,cast_info ci",t.id=ci.movie_id,"ci.person_id,<,3194645",31427248
3,"title t,cast_info ci,movie_info mi","t.id=ci.movie_id,t.id=mi.movie_id","ci.person_id,=,1742124,ci.role_id,>,2,mi.info_...",12
4,"title t,cast_info ci,movie_info_idx mi_idx","t.id=ci.movie_id,t.id=mi_idx.movie_id","t.kind_id,=,7,t.production_year,>,0,ci.role_id...",733244


In [69]:
conm = psycopg2.connect(database="imdb_sample", user="postgres", host="127.0.0.1",password="admin", port="5432")
conm.set_session(autocommit=True)
cur = conm.cursor()

In [83]:
table_samples = []
for i,row in query_file.iterrows():
    table_sample = {}
    preds = row['predicate'].split(',')
    for i in range(0,len(preds),3):
        left, op, right = preds[i:i+3]
        alias,col = left.split('.')
        table = alias2t[alias]
        pred_string = ''.join((col,op,right))
        # Constructs a SQL query to select the `sid` (sample ID) from the current `table` where the predicate holds true.
        q = 'select sid from {} where {}'.format(table, pred_string)
        cur.execute(q)
        sps = np.zeros(1000).astype('uint8')
        sids = cur.fetchall()
        sids = np.array(sids).squeeze()
        if sids.size>1: # at least 1 sample satisfies the predicate
            sps[sids] = 1
        if table in table_sample:
            table_sample[table] = table_sample[table] & sps
        else:
            table_sample[table] = sps
    table_samples.append(table_sample)

In [85]:
# table_samples