In [1]:
from pyspark.sql import SparkSession, DataFrame, Column, Row
from pyspark.sql.types import IntegerType

In [2]:
builder = SparkSession.builder \
    .appName("Basic Cardinality Estimation") \
    .config("spark.sql.cbo.enabled", True) \
    .config("spark.master", "local[*]") \
    .config("spark.sql.cbo.joinReorder.enabled", True) \
    .config("spark.sql.cbo.joinReorder.dp.threshold", 16) \
    .config("spark.sql.statistics.histogram.enabled", True) \
    .config("spark.sql.statistics.histogram.numBins", 25) \
    .enableHiveSupport()

spark = builder.getOrCreate()

In [3]:
import pandas as pd
import numpy as np

In [31]:
def get_uniform_ints_df(max_val):
    """Returns a single column dataframe of 500 integers sampled uniformly from [0,50)"""
    pd_df = pd.DataFrame(np.random.randint(0,max_val,[500], np.int))
    df = spark.createDataFrame(pd_df, ["x"])
    return df

def create_uniform_tables():
    """Creates 4 tables with single column of uniform distribution and computes table and column statistics"""
    table_names = ["A", "B", "C", "D"]
    for i in range(4):
        table_name = table_names[i]
        df = get_uniform_ints_df(25*(i+1))
        df.write.mode("overwrite").saveAsTable(table_name)
        spark.sql("Analyze Table " + table_name + " compute statistics")
        spark.sql("Analyze Table " + table_name + " compute statistics for columns " + "x")
        spark.sql("Describe extended " + table_name).show();
    

In [32]:
create_uniform_tables()

+--------------------+--------------------+-------+
|            col_name|           data_type|comment|
+--------------------+--------------------+-------+
|                   x|              bigint|   null|
|                    |                    |       |
|# Detailed Table ...|                    |       |
|            Database|             default|       |
|               Table|                   a|       |
|               Owner|        amogkamsetty|       |
|        Created Time|Sun Nov 18 02:35:...|       |
|         Last Access|Thu Jan 01 00:00:...|       |
|          Created By|         Spark 2.4.0|       |
|                Type|             MANAGED|       |
|            Provider|             parquet|       |
|    Table Properties|[transient_lastDd...|       |
|          Statistics|2560 bytes, 500 rows|       |
|            Location|file:/mnt/c/Users...|       |
|       Serde Library|org.apache.hadoop...|       |
|         InputFormat|org.apache.hadoop...|       |
|        Out

In [33]:
stats = {}
stats["A"] = spark.sql("describe extended A x").rdd.collectAsMap()
stats["B"] = spark.sql("describe extended B x").rdd.collectAsMap()
stats["C"] = spark.sql("describe extended C x").rdd.collectAsMap()
stats["D"] = spark.sql("describe extended D x").rdd.collectAsMap()

In [34]:
result = spark.sql("Select * from A where A.x > 25")
print(result._jdf.queryExecution().optimizedPlan().stats().rowCount().get())
result.count()
stats["A"]

0


{'avg_col_len': '8',
 'bin_0': 'lower_bound: 0.0, upper_bound: 0.0, distinct_count: 1',
 'bin_1': 'lower_bound: 0.0, upper_bound: 2.0, distinct_count: 2',
 'bin_10': 'lower_bound: 10.0, upper_bound: 11.0, distinct_count: 1',
 'bin_11': 'lower_bound: 11.0, upper_bound: 12.0, distinct_count: 1',
 'bin_12': 'lower_bound: 12.0, upper_bound: 13.0, distinct_count: 1',
 'bin_13': 'lower_bound: 13.0, upper_bound: 13.0, distinct_count: 1',
 'bin_14': 'lower_bound: 13.0, upper_bound: 14.0, distinct_count: 1',
 'bin_15': 'lower_bound: 14.0, upper_bound: 15.0, distinct_count: 1',
 'bin_16': 'lower_bound: 15.0, upper_bound: 16.0, distinct_count: 1',
 'bin_17': 'lower_bound: 16.0, upper_bound: 17.0, distinct_count: 1',
 'bin_18': 'lower_bound: 17.0, upper_bound: 18.0, distinct_count: 1',
 'bin_19': 'lower_bound: 18.0, upper_bound: 20.0, distinct_count: 2',
 'bin_2': 'lower_bound: 2.0, upper_bound: 3.0, distinct_count: 1',
 'bin_20': 'lower_bound: 20.0, upper_bound: 20.0, distinct_count: 1',
 'bin_21

## Graph Nets

In [96]:
import collections
import time

from graph_nets import utils_np
from graph_nets import utils_tf
from graph_nets.demos import models
import matplotlib.pyplot as plt
import tensorflow as tf
import itertools

In [167]:
SEED = 1
np.random.seed(SEED)
tf.set_random_seed(SEED)
table_names = ["A", "B", "C", "D"]
node_features = {}

In [178]:
def featurize_col_stats():
    for table_name, column_stats in stats.items():
        row_count = spark.sql("select * from "+table_name).count()
        feature = [row_count, \
                   column_stats['distinct_count'], \
                   column_stats['num_nulls'], \
                   column_stats['min'], \
                   column_stats['max'], \
                  column_stats['avg_col_len'], \
                  column_stats['max_col_len']]
        feature_np = np.asarray(feature, dtype=np.int32)
        node_features[table_name] = tf.convert_to_tensor(feature_np, dtype=tf.float32) 

In [179]:
featurize_col_stats()
node_features

{'A': <tf.Tensor 'Const_22:0' shape=(7,) dtype=float32>,
 'B': <tf.Tensor 'Const_25:0' shape=(7,) dtype=float32>,
 'C': <tf.Tensor 'Const_24:0' shape=(7,) dtype=float32>,
 'D': <tf.Tensor 'Const_23:0' shape=(7,) dtype=float32>}

In [195]:
def create_graph_dicts(num_examples, selectivity=True):
    """Generates the input and target graphs for training.
        Returns an array of input_graphs with featurized nodes
        Without selectivity, 4 choose 2 = 6 possible graphs
        With selectivity, generate 2 tables, and create a random selectivity on one of them"""
    input_graphs = []
    relation_tuples = []
    if not selectivity:
        #Do not want to use selectivity when generating samples
        #Total of 6 input graphs, ignore num_examples
        for combo in list(itertools.combinations(range(4), 2)):
            table1_name = table_names[combo[0]]
            table2_name = table_names[combo[1]]
            table1_features = node_features[table1_name]
            table2_features = node_features[table2_name]
            input_graphs.append({"nodes": [table1_features, table2_features], "receivers": [0, 1], "senders": [1, 0]})
            relation_tuples.append(combo)
        return input_graphs, relation_tuples
    else:
        return None, None

def create_target_graphs(batch_size, input_graphs, relation_tuples):
    target_graphs = []
    for i in range(batch_size):
        input_graph = utils_tf.get_graph(input_graphs, i)
        table1 = relation_tuples[i][0]
        table2 = relation_tuples[i][1]
        cardinality = spark.sql("select * from "+table_names[table1]+","+table_names[table2]+" where "+table_names[table1]+".x = "+table_names[table2]+".x").count()
        num_edges = input_graph.n_edge
        edges = tf.constant([[cardinality], [cardinality]])
        target_graphs.append(input_graph._replace(edges=edges, nodes=None))
    return utils_tf.concat(target_graphs, axis=0)

def create_data_ops(batch_size, selectivity=True):
    if selectivity:
        pass
    else:
        #ignore batch_size
        inputs_op, relation_tuples = create_graph_dicts(6, False)
        inputs_op = utils_tf.data_dicts_to_graphs_tuple(inputs_op)
        #inputs_op = utils_tf.fully_connect_graph_dynamic(inputs_op)
    
        targets_op = create_target_graphs(6, inputs_op, relation_tuples)
    
    return inputs_op, targets_op

def make_all_runnable_in_session(*args):
    return [utils_tf.make_runnable_in_session(a) for a in args]
        
        

In [196]:
#tf.reset_default_graph()
inputs_op_tf, targets_op_tf = create_data_ops(6, False)
inputs_op_tf, targets_op_tf

(GraphsTuple(nodes=<tf.Tensor 'data_dicts_to_graphs_tuple_12/concat_1:0' shape=(12, 7) dtype=float32>, edges=None, receivers=<tf.Tensor 'data_dicts_to_graphs_tuple_12/add:0' shape=(12,) dtype=int32>, senders=<tf.Tensor 'data_dicts_to_graphs_tuple_12/add_1:0' shape=(12,) dtype=int32>, globals=None, n_node=<tf.Tensor 'data_dicts_to_graphs_tuple_12/stack_1:0' shape=(6,) dtype=int32>, n_edge=<tf.Tensor 'data_dicts_to_graphs_tuple_12/stack:0' shape=(6,) dtype=int32>),
 GraphsTuple(nodes=None, edges=<tf.Tensor 'graph_concat_10/concat_edges:0' shape=(12, 1) dtype=int32>, receivers=<tf.Tensor 'graph_concat_10/add:0' shape=(12,) dtype=int32>, senders=<tf.Tensor 'graph_concat_10/add_1:0' shape=(12,) dtype=int32>, globals=None, n_node=<tf.Tensor 'graph_concat_10/concat_n_node:0' shape=(6,) dtype=int32>, n_edge=<tf.Tensor 'graph_concat_10/concat_n_edge:0' shape=(6,) dtype=int32>))

In [197]:
inputs_op_tf, targets_op_tf = make_all_runnable_in_session(inputs_op_tf, targets_op_tf)

with tf.Session() as sess:
    inputs_graphs, targets_graphs = sess.run([inputs_op_tf, targets_op_tf])
    print("Input Graphs: ",inputs_graphs)
    print("Target Graphs: ", targets_graphs)

Input Graphs:  GraphsTuple(nodes=array([[ 500.,   25.,    0.,    0.,   24.,    8.,    8.],
       [ 500.,   52.,    0.,    0.,   49.,    8.,    8.],
       [ 500.,   25.,    0.,    0.,   24.,    8.,    8.],
       [ 500.,   76.,    0.,    0.,   74.,    8.,    8.],
       [ 500.,   25.,    0.,    0.,   24.,    8.,    8.],
       [ 500.,  101.,    0.,    0.,   99.,    8.,    8.],
       [ 500.,   52.,    0.,    0.,   49.,    8.,    8.],
       [ 500.,   76.,    0.,    0.,   74.,    8.,    8.],
       [ 500.,   52.,    0.,    0.,   49.,    8.,    8.],
       [ 500.,  101.,    0.,    0.,   99.,    8.,    8.],
       [ 500.,   76.,    0.,    0.,   74.,    8.,    8.],
       [ 500.,  101.,    0.,    0.,   99.,    8.,    8.]], dtype=float32), edges=None, receivers=array([ 0,  1,  2,  3,  4,  5,  6,  7,  8,  9, 10, 11], dtype=int32), senders=array([ 1,  0,  3,  2,  5,  4,  7,  6,  9,  8, 11, 10], dtype=int32), globals=None, n_node=array([2, 2, 2, 2, 2, 2], dtype=int32), n_edge=array([2, 2, 2, 

In [211]:
def compute_accuracy(target, output):
    """Computes accuracy of the final graph. Returns fraction of correctly solved cardinalities
    
        Target is the target graph
        Output is the output graph"""
    target_dicts = utils_np.graphs_tuple_to_data_dicts(target)
    output_dicts = utils_np.graphs_tuple_to_data_dicts(output)
    correctly_solved = []
    for td, od in zip(target_dicts, output_dicts):
        predicted1 = od['edges'][0]
        predicted2 = od['edges'][1]
        actual1 = td['edges'][0]
        actual2 = td['edges'][1]
        if predicted1==actual1 and predicted2==actual2:
            correctly_solved.append(1)
        else:
            correctly_solved.append(0)
    correct = np.mean(correctly_solved)
    return correct

def create_loss_ops(target_op, output_ops):
    if not isinstance(output_ops, collections.Sequence):
        output_ops = [output_ops]
    
    loss_ops = [
        tf.losses.mean_squared_error(target_op.edges, output_op.edges) for output_op in output_ops
    ]
    
    return loss_ops
    

## Training

In [218]:
#tf.reset_default_graph()

num_processing_steps_tr = 10
num_processing_steps_ge = 10

num_training_iterations = 10000000
batch_size_tr = 6
batch_size_ge = 6

inputs_op_tr, targets_op_tr = create_data_ops(
    batch_size_tr, False)
inputs_op_tr = utils_tf.set_zero_edge_features(inputs_op_tr, 1)
inputs_op_tr = utils_tf.set_zero_global_features(inputs_op_tr, 1)
#target_op_tr = utils_tf.set_zero_node_features(targets_op_tr, 1)

inputs_op_ge, targets_op_ge = create_data_ops(
    batch_size_ge, False)
inputs_op_ge = utils_tf.set_zero_edge_features(inputs_op_ge, 1)
inputs_op_ge = utils_tf.set_zero_global_features(inputs_op_ge, 1)
#target_op_ge = utils_tf.set_zero_node_features(targets_op_ge, 1)

# Instantiate the model.
model = models.EncodeProcessDecode(edge_output_size=1, node_output_size=None)
# A list of outputs, one per processing step.
output_ops_tr = model(inputs_op_tr, num_processing_steps_tr)
output_ops_ge = model(inputs_op_ge, num_processing_steps_ge)

# Loss.
loss_ops_tr = create_loss_ops(targets_op_tr, output_ops_tr)
loss_op_tr = sum(loss_ops_tr) / num_processing_steps_tr  # loss_ops_tr
loss_ops_ge = create_loss_ops(targets_op_ge, output_ops_ge)
loss_op_ge = loss_ops_ge[-1]

# Optimizer.
learning_rate = 1e-3
optimizer = tf.train.AdamOptimizer(learning_rate)
step_op = optimizer.minimize(loss_op_tr)

# Lets an iterable of TF graphs be output from a session as NP graphs.
inputs_op_tr, targets_op_tr = make_all_runnable_in_session(
    inputs_op_tr, targets_op_tr)
inputs_op_ge, targets_op_ge = make_all_runnable_in_session(
    inputs_op_ge, targets_op_ge)



In [219]:
# This cell resets the Tensorflow session, but keeps the same computational
# graph.

try:
  sess.close()
except NameError:
  pass
sess = tf.Session()
sess.run(tf.global_variables_initializer())

last_iteration = 0
logged_iterations = []
losses_tr = []
corrects_tr = []
solveds_tr = []
losses_ge = []
corrects_ge = []
solveds_ge = []

In [None]:
# How much time between logging and printing the current results.
log_every_seconds = 20

print("# (iteration number), T (elapsed seconds), "
      "Ltr (training loss), Lge (test/generalization loss), "
      "Ctr (training fraction nodes/edges labeled correctly), "
      "Str (training fraction examples solved correctly), "
      "Cge (test/generalization fraction nodes/edges labeled correctly), "
      "Sge (test/generalization fraction examples solved correctly)")

start_time = time.time()
last_log_time = start_time
for iteration in range(last_iteration, num_training_iterations):
  last_iteration = iteration
  train_values = sess.run({
      "step": step_op,
      "inputs": inputs_op_tr,
      "targets": targets_op_tr,
      "loss": loss_op_tr,
      "outputs": output_ops_tr
  })
  the_time = time.time()
  elapsed_since_last_log = the_time - last_log_time
  if elapsed_since_last_log > log_every_seconds:
    last_log_time = the_time
    test_values = sess.run({
        "targets": targets_op_ge,
        "loss": loss_op_ge,
        "outputs": output_ops_ge,
    })
    #print(train_values["targets"])
    #print(train_values["outputs"])
    correct_tr = compute_accuracy(train_values["targets"],
                                             train_values["outputs"][-1])
    correct_ge = compute_accuracy(test_values["targets"],
                                             test_values["outputs"][-1])
    elapsed = time.time() - start_time
    losses_tr.append(train_values["loss"])
    corrects_tr.append(correct_tr)
    losses_ge.append(test_values["loss"])
    corrects_ge.append(correct_ge)
    logged_iterations.append(iteration)
    print("# {:05d}, T {:.1f}, Ltr {:.4f}, Lge {:.4f}, Ctr {:.4f}, "
          "Cge {:.4f}".format(
              iteration, elapsed, train_values["loss"], test_values["loss"],
              correct_tr, correct_ge))

# (iteration number), T (elapsed seconds), Ltr (training loss), Lge (test/generalization loss), Ctr (training fraction nodes/edges labeled correctly), Str (training fraction examples solved correctly), Cge (test/generalization fraction nodes/edges labeled correctly), Sge (test/generalization fraction examples solved correctly)
# 01436, T 22.0, Ltr 9331994.0000, Lge 9330773.0000, Ctr 0.0000, Cge 0.0000
# 03353, T 40.0, Ltr 6370661.0000, Lge 6368956.0000, Ctr 0.0000, Cge 0.0000
# 05481, T 60.0, Ltr 3008800.2500, Lge 3007464.7500, Ctr 0.0000, Cge 0.0000
# 07578, T 80.0, Ltr 1052513.3750, Lge 1052007.0000, Ctr 0.0000, Cge 0.0000
# 09642, T 100.0, Ltr 349020.5938, Lge 348837.3125, Ctr 0.0000, Cge 0.0000
# 11680, T 120.0, Ltr 68396.0625, Lge 68312.1719, Ctr 0.0000, Cge 0.0000
# 13790, T 140.0, Ltr 791.1899, Lge 788.3756, Ctr 0.0000, Cge 0.0000
# 15920, T 160.0, Ltr 0.0169, Lge 0.0021, Ctr 0.0000, Cge 0.0000
# 18030, T 180.1, Ltr 7.8798, Lge 0.2998, Ctr 0.0000, Cge 0.0000
# 20121, T 200.1, Lt