Produce the vtreat data transform as a sequence of SQL statements instead
of one large statement. The idea: it may be better to handle a large
number of joins by sequencing `UPDATE` statements rather than composing them.

In [1]:

import datetime
import os
os.chdir('/Users/johnmount/Documents/work/pyvtreat/Examples/Database')
import sys
import numpy as np
import pandas as pd

from data_algebra.data_ops import *
import data_algebra.SQLite
import data_algebra.test_util
import vtreat
import vtreat.vtreat_db_adapter



In [2]:
# larger version of tests/test_db_adapter.py:test_db_adapter_monster()
def mk_example(n_rows:int = 100, n_vars:int = 50):
    """Build a synthetic data set, fit a vtreat treatment, and describe it.

    The frame has `n_vars` two-level ('a'/'b') columns named `v_0`, `v_1`,
    ... and a numeric outcome column `y`. `y` starts as standard normal
    noise and is shifted by +/- 1/sqrt(n_vars) for each variable depending
    on that variable's level.

    :param n_rows: number of rows to generate.
    :param n_vars: number of categorical variables to generate.
    :return: dict with keys
        'transform_as_data' (the fitted treatment's `description_matrix()`)
        and 'source_descr' (a `TableDescription` for table 'd_app' listing
        the sorted variable names followed by 'y').
    """
    effect = 1/np.sqrt(n_vars)
    frame_cols = dict()
    outcome = np.random.normal(size=n_rows)
    for idx in range(n_vars):
        levels = np.random.choice(['a', 'b'], replace=True, size=n_rows)
        # level 'a' pushes the outcome up, level 'b' pushes it down
        outcome = outcome + np.where(levels == 'a', effect, -effect)
        frame_cols[f'v_{idx}'] = levels
    # variable names are collected (sorted) before the outcome column is added
    var_names = sorted(frame_cols)
    frame_cols['y'] = outcome
    frame = pd.DataFrame(frame_cols)

    outcome_name = "y"
    cols_to_copy = [outcome_name]

    treatment = vtreat.NumericOutcomeTreatment(
        cols_to_copy=cols_to_copy,
        outcome_name=outcome_name,
        params=vtreat.vtreat_parameters(
            {"sparse_indicators": False, "filter_to_recommended": False,}
        ),
    )
    # called to fit the treatment; the treated training frame is not needed here
    treatment.fit_transform(frame)

    return {
        'transform_as_data': treatment.description_matrix(),
        'source_descr': TableDescription(
            table_name='d_app',
            column_names=var_names + cols_to_copy,
        ),
    }

# Small worked example: 5 variables, default row count.
ex = mk_example(n_vars=5)
source_descr, transform_as_data = ex['source_descr'], ex['transform_as_data']


In [3]:
# SQL dialect model used for quoting and SQL generation below.
db_model = data_algebra.SQLite.SQLiteModel()

# Table names used by the generated SQL.
treatment_table_name = 'transform_as_data'
stage_3_name = 'vtreat_stage_3_table'
result_name = 'data_treated'

# Split the vtreat transform into its pipeline pieces.
(
    ops,
    map_vars,
    mapping_steps,
    stage_3_ops,
) = vtreat.vtreat_db_adapter._build_data_pipelines_stages(
    source=source_descr,
    vtreat_descr=transform_as_data,
    treatment_table_name=treatment_table_name,
    stage_3_name=stage_3_name,
)

# give variables pre-update values
ops = ops.extend(dict.fromkeys(map_vars, '1.0'))

def update_code(i):
    """Return the SQL `UPDATE` statement that applies mapping step `i`.

    Reads the module-level `mapping_steps`, `db_model`,
    `treatment_table_name` and `stage_3_name`.

    The statement selects, from the treatment table, the
    (`value`, `replacement`) rows of the 'MappedCodeTransform' whose
    `orig_var` is the step's 'ov' and whose `variable` is the step's 'vi',
    then writes `replacement` into column `vi` of the stage-3 table on rows
    whose `ov` column equals `value`.

    :param i: index into `mapping_steps`.
    :return: SQL text (with leading and trailing newline, no terminating ';').
    """
    step_i = mapping_steps[i]
    ov = step_i['ov']
    vi = step_i['vi']
    q_ov = db_model.quote_identifier(ov)
    q_vi = db_model.quote_identifier(vi)
    # quote the target table the same way every other identifier is quoted
    q_stage_3 = db_model.quote_identifier(stage_3_name)
    # Clause order must be SELECT ... FROM ... WHERE, with no trailing comma
    # after the last selected column. The outer WHERE qualifies the join
    # column with the target table because the same column name also exists
    # in tmp_update.
    update_stmt = (f"""
WITH tmp_update AS (
  SELECT
    value AS {q_ov},
    replacement AS {q_vi}
  FROM
    {db_model.quote_identifier(treatment_table_name)}
  WHERE
    (treatment_class = {db_model.quote_string('MappedCodeTransform')})
    AND (orig_var = {db_model.quote_string(ov)})
    AND (variable = {db_model.quote_string(vi)})
)
UPDATE
  {q_stage_3}
SET {q_vi} = tmp_update.{q_vi}
FROM
  tmp_update
WHERE
   {q_stage_3}.{q_ov} = tmp_update.{q_ov}
""")
    return update_stmt


# Full statement sequence, in execution order:
#   1. drop any leftover stage-3 and result tables,
#   2. materialize the stage-3 table from `ops`,
#   3. apply one UPDATE per mapping step,
#   4. materialize the result table from `stage_3_ops`,
#   5. drop the stage-3 table.
# Table creation from a query requires `CREATE TABLE ... AS`; the `TABLE`
# keyword is not optional.
sql_sequence = (
    [ f'DROP TABLE IF EXISTS {db_model.quote_identifier(stage_3_name)} ;']
    + [ f'DROP TABLE IF EXISTS {db_model.quote_identifier(result_name)} ;']
    + [
        f'CREATE TABLE {db_model.quote_identifier(stage_3_name)} AS \n'
        + db_model.to_sql(ops)
        + ' ;'
    ]
    + [update_code(i) + ' ;' for i in range(len(mapping_steps))]
    + [
        f'CREATE TABLE {db_model.quote_identifier(result_name)} AS \n'
        + db_model.to_sql(stage_3_ops)
        + ' ;'
    ]
    + [ f'DROP TABLE {db_model.quote_identifier(stage_3_name)} ;']
)