## Imports

In [56]:
import psycopg2
from psycopg2 import connect, OperationalError, errorcodes, errors
import numpy as np
import math
import sys
from collections import defaultdict

## Setting up ENV and GLOBAL variables

In [57]:
dbname = "census"
user = "nightfury"
host = "localhost"
password = ""
db_relation = "adult"
reference_dataset = db_relation
target_dataset = "target_dataset"


# measure_attributes=['age','fnlwgt', 'education_num','capital_gain','capital_loss','hours_per_week']
# groupby_attributes=['workclass','education','occupation','relationship','race','sex','native_country','salary_range']
# aggregate_functions=['sum','avg','max','min','count']

measure_attributes=['age','capital_gain','hours_per_week']
groupby_attributes=['workclass','relationship','sex']
aggregate_functions=['avg','count']

#Connecting with local db
try:
    conn = psycopg2.connect(f"dbname='{dbname}' user='{user}' host='{host}' password='{password}'")
    cur = conn.cursor()
    print ("Connection to database successful")
except OperationalError as err:
    print_psycopg2_exception(err)
    conn = None

#To get top k utility views
K = 5

Connection to database successful


## Util functions

In [66]:
def print_psycopg2_exception(err):
    # get details about the exception
    err_type, err_obj, traceback = sys.exc_info()

    # get the line number when exception occured
    line_num = traceback.tb_lineno
    
    # print the connect() error
    print (f"""\npsycopg2 ERROR: {err} on linu number: {line_num}""")
    print (f"""psycopg2 traceback: {traceback} -- type: {err_type}""")

    # psycopg2 extensions.Diagnostics object attribute
    print (f"""\nextensions.Diagnostics: {err.diag}""")

    # print the pgcode and pgerror exceptions
    print (f"""pgerror: {err.pgerror}""")
    print (f"""pgcode: {err.pgcode}""", "\n")
    
def get_cursor(conn):
    cur = conn.cursor()
    return cur

def get_all_views(groupby_attributes, measure_attributes, aggregate_functions):
    views = np.array(np.meshgrid(groupby_attributes, measure_attributes, aggregate_functions)).T.reshape(-1,3)
    return views

def get_aggregate_function_measure_attribute_combinations(measure_attributes, aggregate_functions):
    combinations = np.array(np.meshgrid(measure_attributes, aggregate_functions)).T.reshape(-1,2)
    return combinations

def get_combined_aggregate_query(groupby_attribute, measure_attributes, aggregate_functions, target_relation, reference_relation):
    aggregate_function_measure_attribute_combinations = get_aggregate_function_measure_attribute_combinations(measure_attributes, aggregate_functions)
    aggregate_function_measure_combined_string = ','.join([f'{combination[1]}({combination[0]})' for combination in aggregate_function_measure_attribute_combinations])
    target_query, reference_query = combined_aggregate_query_generator(groupby_attribute, aggregate_function_measure_combined_string, target_relation, reference_relation)    
    return target_query, reference_query

def combined_aggregate_query_generator(groupby_attribute, aggregate_function_measure_combined_string, target_relation, reference_relation):
    target_query = f"""SELECT {groupby_attribute}, {aggregate_function_measure_combined_string} FROM {target_relation} GROUP BY {groupby_attribute}"""
    reference_query = f"""SELECT {groupby_attribute}, {aggregate_function_measure_combined_string} FROM {reference_relation} GROUP BY {groupby_attribute}"""
    return target_query, reference_query
    
def basic_query_generator(groupby_attribute, measure_attribute, aggregate_function, target_relation, reference_relation):
    target_query = f"""SELECT {groupby_attribute}, {aggregate_function}({measure_attribute}) FROM {target_relation} GROUP BY {groupby_attribute}"""
    reference_query = f"""SELECT {groupby_attribute}, {aggregate_function}({measure_attribute}) FROM {reference_relation} GROUP BY {groupby_attribute}"""
    return target_query, reference_query

def phased_query_generator(groupby_attribute, measure_attribute, aggregate_function, target_relation, reference_relation, target_boundary, reference_boundary):
    target_query = f"""SELECT {groupby_attribute}, {aggregate_function}({measure_attribute}) 
                        FROM (SELECT * from {target_relation} offset {target_boundary[0]} row fetch next {target_boundary[1]} rows only) as X  
                        GROUP BY {groupby_attribute}"""
    
    reference_query = f"""SELECT {groupby_attribute}, {aggregate_function}({measure_attribute}) 
                        FROM (SELECT * from {reference_relation} offset {reference_boundary[0]} row fetch next {reference_boundary[1]} rows only) as X  
                        GROUP BY {groupby_attribute}"""
    
    return target_query, reference_query

def execute_get_query(cursor, query):
    try:
        cursor.execute(query)
    except Exception as err:
        print_psycopg2_exception(err)
        conn = None
        
    rows = cursor.fetchall()
    return rows

def get_key_based_values(target_result, reference_result):
    key_set = set()
    target_result_dict = {}
    reference_result_dict = {}
    final_target_values = []
    final_reference_values = []
    
    for key, value in target_result:
        key_set.add(key)
        target_result_dict[key] = float(value)
        
    for key, value in reference_result:
        key_set.add(key)
        reference_result_dict[key] = float(value)
        
    for key in key_set:
        final_target_values.append(target_result_dict.get(key, float(10e-20)) if target_result_dict.get(key, float(10e-20)) != 0.0 else float(10e-20))
        final_reference_values.append(reference_result_dict.get(key, float(10e-20)) if reference_result_dict.get(key, float(10e-20)) != 0.0 else float(10e-20))
        
    return final_target_values, final_reference_values
    
def get_valid_gropby_value(group_by_value):
    if group_by_value == 0.0:
        group_by_value = float(10e-20)
    return float(group_by_value)
    
def get_aggregate_function_measure_attribute_based_values(target_result, reference_result, aggregate_function_measure_attribute_combinations):
    """
    query example: SELECT workclass, avg(age),count(age),avg(capital_gain),count(capital_gain),avg(hours_per_week),count(hours_per_week) FROM target_dataset GROUP BY workclass
    target_result example:
    [('Self-emp-not-inc', Decimal('43.0000000000000000'), 1, Decimal('0E-20'), 1, Decimal('45.0000000000000000'), 1), 
    ('Private', Decimal('48.3333333333333333'), 3, Decimal('0E-20'), 3, Decimal('33.3333333333333333'), 3)]
    """
    key_set = set()
    target_result_dict = {}
    reference_result_dict = {}
    final_target_values = []
    final_reference_values = []
    no_combinations = len(aggregate_function_measure_attribute_combinations)
    combination_value_dict = {}
    
    for values in target_result:
        key_set.add(values[0])
        target_result_dict[values[0]] = list(map(get_valid_gropby_value, values[1:]))
        
    for values in reference_result:
        key_set.add(values[0])
        reference_result_dict[values[0]] = list(map(get_valid_gropby_value, values[1:]))
        
    for i, combination in enumerate(aggregate_function_measure_attribute_combinations):
        combination = tuple(combination)
        combination_value_dict[combination] = {"target_values":[], "reference_values":[]}
        for key in key_set:
            target_value = target_result_dict.get(key, [float(10e-20)]*no_combinations)[i]
            reference_value = reference_result_dict.get(key, [float(10e-20)]*no_combinations)[i]
            
            combination_value_dict[combination]["target_values"].append(target_value)
            combination_value_dict[combination]["reference_values"].append(reference_value)
            
    return combination_value_dict

def get_aggregate_function_measure_attribute_based_KL_divergence(aggregate_function_measure_attribute_based_values):
    kl_divergence_dict = {}
    for aggregate_function_measure_attribute_pair in aggregate_function_measure_attribute_based_values:
        target_vector = aggregate_function_measure_attribute_based_values[aggregate_function_measure_attribute_pair]['target_values']
        reference_vector = aggregate_function_measure_attribute_based_values[aggregate_function_measure_attribute_pair]['reference_values']
        kl_divergence_dict[aggregate_function_measure_attribute_pair] = calculate_kl_divergence(target_vector, reference_vector)
        
    return kl_divergence_dict
        
def calculate_kl_divergence(vector1, vector2):
    return sum(vector1[i] * math.log2(vector1[i]/vector2[i]) for i in range(len(vector1)))

def basic_get_top_k_utility_views(kl_divergence_view_mapping_list, k):
    top_k_views = [view for kl_divergence, view in sorted(kl_divergence_view_mapping_list)[0:k]]
    return top_k_views

def phased_get_top_k_utility_views(kl_divergence_view_mapping, k):
    sorted_items = sorted(kl_divergence_view_mapping.items(), key=lambda kv: kv[1])[0:k]
    return [item[0] for item in sorted_items]

def get_phase_partitions(total_rows, phases):
    return [total_rows // num_phases + (1 if x < total_rows % phases else 0) for x in range (phases)]


## Getting user input and setting the target_db

In [59]:
#test_query = select * from adult where relationship =' Unmarried'; 
try:
    attribute_value_input = input("Enter the attribute and associative value for the where clause seperated by space.")
    attribute, value = attribute_value_input.split(" ")
    query = f"select * from adult where {attribute} = '{value}'" 
    #cur = get_cursor(conn)
    cur.execute(f"""DROP table IF EXISTS {target_dataset};""")
    print(f"""Target dataset create command: create table {target_dataset} as {query};""")
    cur.execute(f"""create table {target_dataset} as {query};""")
    conn.commit()
    print(f"Creation of target table successfull.")
except Exception as err:
    print("Error in establishing target db")
    print_psycopg2_exception(err)
    conn = None
    
print(f"The reference dataset is {reference_dataset}")

Enter the attribute and associative value for the where clause seperated by space.relationship Unmarried
Target dataset create command: create table target_dataset as select * from adult where relationship = 'Unmarried';
Creation of target table successfull.
The reference dataset is adult


## Get all views

In [60]:
views = get_all_views(groupby_attributes, measure_attributes, aggregate_functions)

## Basic Implementation

In [73]:
def basic_implementation(views):
    kl_divergence_view_mapping_list = []
    for groupby_attribute, measure_attribute, aggregate_function in views:
        target_query, reference_query = basic_query_generator(groupby_attribute, measure_attribute, aggregate_function, target_dataset, reference_dataset)
        target_result = execute_get_query(get_cursor(conn), target_query)
        reference_result = execute_get_query(get_cursor(conn), reference_query)
        target_values, reference_values = get_key_based_values(target_result, reference_result)
        kl_divergence = calculate_kl_divergence(target_values, reference_values)
        kl_divergence_view_mapping_list.append((kl_divergence, (groupby_attribute, measure_attribute, aggregate_function)))
    
    return kl_divergence_view_mapping_list

## Phased Implementation

In [76]:
def phased_implementation(views):
    num_phases = 3
    target_dataset_rows = execute_get_query(get_cursor(conn), f"""select count(*) from {target_dataset};""")[0][0]
    reference_dataset_rows = execute_get_query(get_cursor(conn), f"""select count(*) from {reference_dataset};""")[0][0]
    target_dataset_partitions = get_phase_partitions(target_dataset_rows, num_phases)
    reference_dataset_partitions = get_phase_partitions(reference_dataset_rows, num_phases)

    # 13, 33
    # 13 -> 5, 4, 4 [1, 2, 3, 4, 5], [6, 7, 8, 9], [10, 11, 12, 13]
    # 33 -> 11, 11, 11
    target_start = reference_start = 0
    kl_divergence_view_mapping = defaultdict(int)
    for phase in range(num_phases):
        target_offset = target_dataset_partitions[phase]
        reference_offset = reference_dataset_partitions[phase]
        for groupby_attribute, measure_attribute, aggregate_function in views:
            target_query, reference_query = phased_query_generator(groupby_attribute, measure_attribute, 
                                                            aggregate_function, target_dataset, 
                                                            reference_dataset, [target_start, target_offset], 
                                                            [reference_start, reference_offset])
            
            target_result = execute_get_query(get_cursor(conn), target_query)
            reference_result = execute_get_query(get_cursor(conn), reference_query)
            target_values, reference_values = get_key_based_values(target_result, reference_result)
            curr_kl_divergence = calculate_kl_divergence(target_values, reference_values)
            kl_divergence_view_mapping[(groupby_attribute, measure_attribute, aggregate_function)] = (kl_divergence_view_mapping[(groupby_attribute, measure_attribute, aggregate_function)]*phase + curr_kl_divergence) / (phase + 1)
            
        target_start += target_offset
        reference_start += reference_offset
        
    return kl_divergence_view_mapping

## Final Result

In [77]:
kl_divergence_view_mapping_list = basic_implementation(views)
top_k_views = basic_get_top_k_utility_views(kl_divergence_view_mapping_list,  K)
print(top_k_views)

[('workclass', 'age', 'count'), ('workclass', 'capital_gain', 'count'), ('workclass', 'hours_per_week', 'count'), ('sex', 'age', 'count'), ('sex', 'capital_gain', 'count')]


In [78]:
kl_divergence_view_mapping = phased_implementation(views)
top_k_views = phased_get_top_k_utility_views(kl_divergence_view_mapping,  K)
print(top_k_views)

NameError: name 'num_phases' is not defined

## Shared Optimization

In [81]:
kl_divergence_view_mapping_list = []
aggregate_function_measure_attribute_combinations = get_aggregate_function_measure_attribute_combinations(measure_attributes, aggregate_functions)
for groupby_attribute in groupby_attributes:
    target_query, reference_query = get_combined_aggregate_query(groupby_attribute, measure_attributes, aggregate_functions, target_dataset, reference_dataset)
    target_result = execute_get_query(get_cursor(conn), target_query)
    reference_result = execute_get_query(get_cursor(conn), reference_query)
    aggregate_function_measure_attribute_based_values = get_aggregate_function_measure_attribute_based_values(target_result, reference_result, aggregate_function_measure_attribute_combinations)
    aggregate_function_measure_attribute_based_KL_divergence = get_aggregate_function_measure_attribute_based_KL_divergence(aggregate_function_measure_attribute_based_values)
    for aggregate_function_measure_attribute_pair in aggregate_function_measure_attribute_based_KL_divergence:
        kl_divergence = aggregate_function_measure_attribute_based_KL_divergence[aggregate_function_measure_attribute_pair]
        kl_divergence_view_mapping_list.append((kl_divergence, (groupby_attribute, aggregate_function_measure_attribute_pair[0], aggregate_function_measure_attribute_pair[1])))

In [82]:
top_k_views = basic_get_top_k_utility_views(kl_divergence_view_mapping_list,  K)
top_k_views

[('workclass', 'age', 'count'),
 ('workclass', 'capital_gain', 'count'),
 ('workclass', 'hours_per_week', 'count'),
 ('sex', 'age', 'count'),
 ('sex', 'capital_gain', 'count')]