# Batch-wise Training and Prediction with scikit-learn in Exasol

## Install Python Dependencies

In [119]:
!pip install pyexasol stopwatch.py



## Create connection to the DB

In [120]:
EXASOL_DSN="db_container_test:8888"
EXASOL_USER="sys"
EXASOL_PASSWORD="exasol" 
EXASOL_SCHEMA="ML_TEST"

def connect():
    import pyexasol
    c = pyexasol.connect(dsn=EXASOL_DSN, user=EXASOL_USER, password=EXASOL_PASSWORD)
    c.execute(f"CREATE SCHEMA IF NOT EXISTS {EXASOL_SCHEMA};")
    c.open_schema(EXASOL_SCHEMA)
    return c;

c=connect()
c.execute("select 1").fetchall()

[(1,)]

## Import Dataset
- First, we load the Dataset from a CSV directly into the database
- We use here the well known adult dataset. 
    - The task of the dataset is to predict whether the income exceeds $50K/yr based on census data.

In [99]:
c=connect()
from stopwatch import Stopwatch
import pandas as pd
c.execute("""
CREATE OR REPLACE TABLE adult (
  age integer,
  workclass varchar(2000000),
  fnlwgt integer,	
  education varchar(2000000),
  education_num integer,	
  marital_status varchar(2000000),
  occupation varchar(2000000),
  relationship varchar(2000000),
  race varchar(2000000),
  sex varchar(2000000),
  capitalgain integer,	
  capitalloss integer,	
  hoursperweek integer,
  native_country varchar(2000000),
  class varchar(2000000)
);
""")
c.execute("""
IMPORT INTO adult 
FROM CSV AT 'https://datahub.io/machine-learning/adult/r/' FILE '1.csv'
SKIP = 1;
""")

<ExaStatement session_id=1671745972762486026 stmt_idx=3>

### Number of instances

In [112]:
c=connect()
c.export_to_pandas("select count(*) from adult")

Unnamed: 0,COUNT(*)
0,390736


## Amplify the dataset for Speed Test

In [101]:
c=connect()
for i in range(3):
    c.execute("""
              insert into adult select * from  adult 
              """)

## Inspect the Dataset

### Number of instances

In [102]:
c=connect()
c.export_to_pandas("select count(*) from adult")

Unnamed: 0,COUNT(*)
0,390736


### Class/Target (Income) distribution

In [113]:
c=connect()
c.export_to_pandas("select class, count(*) from adult group by class")

Unnamed: 0,CLASS,COUNT(*)
0,<=50K\r,297240
1,>50K\r,93496


### Work class distribution

In [114]:
c=connect()
c.export_to_pandas("select workclass, count(*) from adult group by workclass")

Unnamed: 0,WORKCLASS,COUNT(*)
0,Without-pay,168
1,State-gov,15848
2,,22392
3,Self-emp-not-inc,30896
4,Never-worked,80
5,Local-gov,25088
6,Self-emp-inc,13560
7,Federal-gov,11456
8,Private,271248


### Two column statistics

In [115]:
c=connect()
c.export_to_pandas("""
    select workclass, class, count(*) 
    from adult group by workclass, class 
    order by workclass, class
    """)

Unnamed: 0,WORKCLASS,CLASS,COUNT(*)
0,Federal-gov,<=50K\r,6968
1,Federal-gov,>50K\r,4488
2,Local-gov,<=50K\r,17672
3,Local-gov,>50K\r,7416
4,Never-worked,<=50K\r,80
5,Private,<=50K\r,212152
6,Private,>50K\r,59096
7,Self-emp-inc,<=50K\r,6056
8,Self-emp-inc,>50K\r,7504
9,Self-emp-not-inc,<=50K\r,22280


## Preprocess the Dataset in the Database
We need to normalize the data before we can use it in scikit-learn. For that we do the following operations in the database to benfit from its speed and scalebility.
- Id generation for categorical columns
- Normalization of numerical columns

Both operations need compute functions on the whole dataset and can get slow for large datasets with the usual data-science libraries.

In [116]:
c=connect()
import textwrap
c.execute("""
CREATE OR REPLACE TABLE adult_with_id AS 
SELECT 
  rownum as id,
  AGE, WORKCLASS, FNLWGT, EDUCATION, EDUCATION_NUM, MARITAL_STATUS, OCCUPATION, 
  RELATIONSHIP, RACE, SEX, CAPITALGAIN, CAPITALLOSS, HOURSPERWEEK, NATIVE_COUNTRY, CLASS
FROM adult;
""")
c.execute(textwrap.dedent("""
CREATE OR REPLACE TABLE WORKCLASS_CATEGORIES AS SELECT rownum - 1 AS id, WORKCLASS 
FROM (SELECT DISTINCT WORKCLASS FROM adult_with_id);
"""))
c.execute(textwrap.dedent("""
CREATE OR REPLACE TABLE EDUCATION_CATEGORIES AS SELECT rownum - 1 AS id, EDUCATION
FROM (SELECT DISTINCT EDUCATION FROM adult_with_id);
"""))
c.execute(textwrap.dedent("""
CREATE OR REPLACE TABLE MARITAL_STATUS_CATEGORIES AS SELECT rownum - 1 AS id, MARITAL_STATUS 
FROM (SELECT DISTINCT MARITAL_STATUS FROM adult_with_id);
"""))
c.execute(textwrap.dedent("""
CREATE OR REPLACE TABLE OCCUPATION_CATEGORIES AS SELECT rownum - 1 AS id, OCCUPATION 
FROM (SELECT DISTINCT OCCUPATION FROM adult_with_id);
"""))
c.execute(textwrap.dedent("""
CREATE OR REPLACE TABLE RELATIONSHIP_CATEGORIES AS SELECT rownum - 1 AS id, RELATIONSHIP 
FROM (SELECT DISTINCT RELATIONSHIP FROM adult_with_id);
"""))
c.execute(textwrap.dedent("""
CREATE OR REPLACE TABLE RACE_CATEGORIES AS SELECT rownum - 1 AS id, RACE 
FROM (SELECT DISTINCT RACE FROM adult_with_id);
"""))
c.execute(textwrap.dedent("""
CREATE OR REPLACE TABLE SEX_CATEGORIES AS SELECT rownum - 1 AS id, SEX 
FROM (SELECT DISTINCT SEX FROM adult_with_id);
"""))
c.execute(textwrap.dedent("""
CREATE OR REPLACE TABLE NATIVE_COUNTRY_CATEGORIES AS SELECT rownum - 1 AS id, NATIVE_COUNTRY 
FROM (SELECT DISTINCT NATIVE_COUNTRY FROM adult_with_id);
"""))
c.execute(textwrap.dedent("""
CREATE OR REPLACE TABLE CLASS_CATEGORIES AS SELECT rownum - 1 AS id, CLASS 
FROM (SELECT DISTINCT CLASS FROM adult_with_id);
"""))
c.execute(textwrap.dedent("""
CREATE OR REPLACE TABLE adult_preprocessed AS 
SELECT
  adult.id,
  1.00 * ("AGE" - MIN("AGE") OVER()) / (MAX("AGE") OVER () - MIN("AGE") OVER ()) AS "AGE", 
  1.00 * ("FNLWGT" - MIN("FNLWGT") OVER()) / (MAX("FNLWGT") OVER () - MIN("FNLWGT") OVER ()) AS "FNLWGT", 
  1.00 * ("EDUCATION_NUM" - MIN("EDUCATION_NUM") OVER()) / (MAX("EDUCATION_NUM") OVER () - MIN("EDUCATION_NUM") OVER ()) AS "EDUCATION_NUM", 
  1.00 * ("CAPITALGAIN" - MIN("CAPITALGAIN") OVER()) / (MAX("CAPITALGAIN") OVER () - MIN("CAPITALGAIN") OVER ()) AS "CAPITALGAIN", 
  1.00 * ("CAPITALLOSS" - MIN("CAPITALLOSS") OVER()) / (MAX("CAPITALLOSS") OVER () - MIN("CAPITALLOSS") OVER ()) AS "CAPITALLOSS", 
  1.00 * ("HOURSPERWEEK" - MIN("HOURSPERWEEK") OVER()) / (MAX("HOURSPERWEEK") OVER () - MIN("HOURSPERWEEK") OVER ()) AS "HOURSPERWEEK", 
  WORKCLASS_CATEGORIES.id AS WORKCLASS, 
  EDUCATION_CATEGORIES.id AS EDUCATION, 
  MARITAL_STATUS_CATEGORIES.id AS MARITAL_STATUS, 
  OCCUPATION_CATEGORIES.id AS OCCUPATION, 
  RELATIONSHIP_CATEGORIES.id AS RELATIONSHIP, 
  RACE_CATEGORIES.id AS RACE, 
  SEX_CATEGORIES.id AS SEX, 
  NATIVE_COUNTRY_CATEGORIES.id AS NATIVE_COUNTRY, 
  CLASS_CATEGORIES.id AS CLASS 
FROM adult_with_id as adult
JOIN WORKCLASS_CATEGORIES ON adult.WORKCLASS = WORKCLASS_CATEGORIES.WORKCLASS 
JOIN EDUCATION_CATEGORIES ON adult.EDUCATION = EDUCATION_CATEGORIES.EDUCATION 
JOIN MARITAL_STATUS_CATEGORIES ON adult.MARITAL_STATUS = MARITAL_STATUS_CATEGORIES.MARITAL_STATUS 
JOIN OCCUPATION_CATEGORIES ON adult.OCCUPATION = OCCUPATION_CATEGORIES.OCCUPATION 
JOIN RELATIONSHIP_CATEGORIES ON adult.RELATIONSHIP = RELATIONSHIP_CATEGORIES.RELATIONSHIP 
JOIN RACE_CATEGORIES ON adult.RACE = RACE_CATEGORIES.RACE 
JOIN SEX_CATEGORIES ON adult.SEX = SEX_CATEGORIES.SEX 
JOIN NATIVE_COUNTRY_CATEGORIES ON adult.NATIVE_COUNTRY = NATIVE_COUNTRY_CATEGORIES.NATIVE_COUNTRY 
JOIN CLASS_CATEGORIES ON adult.CLASS = CLASS_CATEGORIES.CLASS
"""))
c.export_to_pandas("select * from adult_preprocessed limit 10")

Unnamed: 0,ID,AGE,FNLWGT,EDUCATION_NUM,CAPITALGAIN,CAPITALLOSS,HOURSPERWEEK,WORKCLASS,EDUCATION,MARITAL_STATUS,OCCUPATION,RELATIONSHIP,RACE,SEX,NATIVE_COUNTRY,CLASS
0,179473,0.75,0.12683,0.4,0,0.0,0.25,8,11,6,9,2,2,1,14,0
1,179607,1.0,0.063809,0.066667,0,0.0,0.5,1,8,6,1,2,0,1,14,0
2,180516,0.5,0.133625,0.6,0,0.75,0.5,8,13,6,0,2,2,1,14,0
3,181129,1.0,0.174735,0.533333,0,0.0,0.25,8,4,6,5,1,2,0,14,0
4,182317,0.5,0.192128,0.533333,0,0.0,0.5,8,4,5,0,5,2,1,14,0
5,183524,0.5,0.107002,0.333333,0,0.0,0.5,8,10,4,4,3,2,1,14,0
6,184066,0.25,0.115615,0.266667,0,0.0,0.75,8,5,6,3,2,2,1,14,0
7,184093,0.25,0.126554,0.533333,0,0.0,0.5,8,4,4,10,3,2,1,14,0
8,184142,0.25,0.151397,0.533333,0,0.0,0.0,8,4,6,2,0,2,0,14,0
9,185704,0.0,0.100308,0.533333,0,0.0,0.5,8,4,4,5,0,2,0,14,0


## Create UDF Helper functions

- udf_utils: Abstraction for interating through the dataset in batches
- bucketfs_utils: Upload and download object to/from the bucketfs
- model_utils: Creating the scikit-learn model class and one-hot encoder for categorical data
- preprocess_batch: Building vectors from the columns of the rows
- compute_score_utils: Computes score (mean accuracy) batchwise for the model
- predict_utils: Computes prediction batch-wise for the model
- train_utils: Executes training batch-wise for the model

In [107]:
c=connect()
import textwrap
c.execute(textwrap.dedent("""
CREATE OR REPLACE PYTHON3 SCALAR SCRIPT udf_utils() RETURNS int AS

def reset(ctx):
  ctx.reset()
  df=ctx.get_dataframe(1,start_col=1)

def iterate_trough_dataset(ctx, batch_size, 
                          map_function,
                          init_function,
                          aggregate_function):
  reset(ctx)
  number_of_tuples_left=ctx.size()-1
  state = init_function()
  while True:
    if number_of_tuples_left<batch_size:
      if number_of_tuples_left>0:
        df = ctx.get_dataframe(number_of_tuples_left,start_col=1)
        number_of_tuples_left=0
      else:
        reset(ctx)
        break
    else:
      df = ctx.get_dataframe(batch_size,start_col=1)
      number_of_tuples_left=number_of_tuples_left-batch_size
    result = map_function(df)
    state = aggregate_function(state, result)
  return state
/
"""))
c.execute(textwrap.dedent("""
CREATE OR REPLACE PYTHON3 SCALAR SCRIPT bucketfs_utils(...)
RETURNS INT AS
import os
import requests
import uuid
from sklearn.externals import joblib

def upload_object_to_bucketfs(object, host, bucket_name, path, user, password, secure=True):
    temp_file = "/tmp/" + str(uuid.uuid4().hex + ".pkl")
    joblib.dump(object, temp_file, compress=True)
    protocol = 'https' if secure else 'http'

    with open(temp_file, "rb") as f:
        url = f"{protocol}://{user}:{password}@{host}/{bucket_name}/{path}"
        response=requests.put(url, data=f)
        if response.status_code != 200:
            raise Exception(str(response)+" "+url)
    try:
        os.remove(temp_file)
    except OSError:
        pass

def download_object_from_bucketfs(host, bucket_name, path, user, password, secure=True):
    protocol = 'https' if secure else 'http'
    url = f"{protocol}://{user}:{password}@{host}/{bucket_name}/{path}"
    response=requests.get(url)
    if response.status_code != 200:
        raise Exception(str(response))
    temp_file = "/tmp/" + str(uuid.uuid4().hex + ".pkl")
    with open(temp_file, "wb") as f:
        f.write(response.content)
    object = joblib.load(temp_file)
    try:
        os.remove(temp_file)
    except OSError:
        pass
    return object
/
"""))

c.execute(textwrap.dedent("""
CREATE OR REPLACE PYTHON3 SCALAR SCRIPT model_utils() RETURNS int AS
import pandas as pd
from pandas import DataFrame
import numpy as np
from sklearn.linear_model import SGDClassifier
from sklearn.preprocessing import OneHotEncoder

udf_utils = exa.import_script('ML_TEST.UDF_UTILS')

def get_config(ctx):
  df=ctx.get_dataframe(1,start_col=2)
  inputs = df.iloc[:,0:-1]
  categorical_input_column_names=[column for column in inputs.columns if inputs[column].iloc[0]>0]
  numerical_input_column_names=[column for column in inputs.columns if inputs[column].iloc[0]==-1]
  input_column_categories = np.array([int(inputs[column].iloc[0])+1 for column in categorical_input_column_names])
  target_column_name = df.columns[-1]
  target_classes=int(df[target_column_name].iloc[0])+1
  return {"categorical_input_column_names": categorical_input_column_names, 
          "numerical_input_column_names": numerical_input_column_names, 
          "input_column_categories":input_column_categories, 
          "target_column_name":target_column_name, 
          "target_classes":target_classes}

def create_model(ctx,data_config, **kwargs):
  input_column_one_hot_encoder=create_one_hot_encoder(ctx,
    data_config["input_column_categories"],
    data_config["categorical_input_column_names"])
  classifier=SGDClassifier(**kwargs)
  return {"input_column_one_hot_encoder":input_column_one_hot_encoder,"classifier":classifier}

def create_one_hot_encoder(ctx,categories,categorical_column_names):
  one_hot_encoder = OneHotEncoder(n_values=categories, sparse=False)
  udf_utils.reset(ctx)
  data=ctx.get_dataframe(2,start_col=2)[categorical_column_names].astype(int).values
  one_hot_encoder.fit(data)
  return one_hot_encoder

/
"""))
c.execute(textwrap.dedent("""
CREATE OR REPLACE PYTHON3 SCALAR SCRIPT preprocessing_utils() RETURNS int AS
import pandas as pd
from pandas import DataFrame
import numpy as np
from sklearn.linear_model import SGDClassifier
from sklearn.preprocessing import OneHotEncoder

def encode_categorical_columns(categorical_columns: DataFrame, one_hot_encoder: OneHotEncoder):
  transformed_columns = DataFrame(one_hot_encoder.transform(categorical_columns.values))
  return transformed_columns

def preprocess_batch(df, data_config, 
                    input_column_one_hot_encoder):
  categorical_input_column_names = data_config["categorical_input_column_names"]
  numerical_input_column_names = data_config["numerical_input_column_names"]
  target_column_name = data_config["target_column_name"]

  encoded_input_categorical_columns = encode_categorical_columns(
                                          df[categorical_input_column_names],
                                          input_column_one_hot_encoder)
  numerical_input_columns = df[numerical_input_column_names].astype(float)
  input_columns = pd.concat([encoded_input_categorical_columns, 
                            numerical_input_columns],axis=1)
  target_column = df[target_column_name].astype(int)
  return input_columns, target_column


/
"""))
c.execute(textwrap.dedent("""
CREATE OR REPLACE PYTHON3 SCALAR SCRIPT compute_score_utils() RETURNS int AS
import pandas as pd
from pandas import DataFrame
import numpy as np
from sklearn.linear_model import SGDClassifier
from sklearn.preprocessing import OneHotEncoder

udf_utils = exa.import_script('ML_TEST.UDF_UTILS')
model_utils = exa.import_script('ML_TEST.MODEL_UTILS')
preprocessing_utils = exa.import_script('ML_TEST.PREPROCESSING_UTILS')

def compute_score(ctx,batch_size, data_config, model):
  final_state = udf_utils.iterate_trough_dataset(
    ctx,batch_size,
    lambda df: compute_score_batch(ctx, df, data_config, model),
    lambda: {"count":0,"score_sum":0},
    lambda state, result: {"count":state["count"]+result[0],"score_sum":state["score_sum"]+result[1]})
  score=final_state["score_sum"]/final_state["count"]
  return score

def compute_score_batch(ctx, df, data_config, model):
  input_column_one_hot_encoder = model["input_column_one_hot_encoder"]
  input_columns, target_column = preprocessing_utils.preprocess_batch(
                                                  df, data_config,
                                                  input_column_one_hot_encoder)
  classifier = model["classifier"]
  score=classifier.score(input_columns.values,target_column.values)
  return len(df),len(df)*score
/
"""))
c.execute(textwrap.dedent("""
CREATE OR REPLACE PYTHON3 SCALAR SCRIPT predict_utils() RETURNS int AS
import pandas as pd
from pandas import DataFrame
import numpy as np
from sklearn.linear_model import SGDClassifier
from sklearn.preprocessing import OneHotEncoder

udf_utils = exa.import_script('ML_TEST.UDF_UTILS')
model_utils = exa.import_script('ML_TEST.MODEL_UTILS')
preprocessing_utils = exa.import_script('ML_TEST.PREPROCESSING_UTILS')

def predict_batch(ctx, df, data_config, model):
  input_column_one_hot_encoder = model["input_column_one_hot_encoder"]
  input_columns, target_column = preprocessing_utils.preprocess_batch(
                                                  df, data_config,
                                                  input_column_one_hot_encoder)
  target_classes=np.arange(data_config["target_classes"])
  classifier = model["classifier"]
  result = classifier.predict(input_columns.values)
  return DataFrame({"0":df["1"],"1":target_column,"2":result})

def predict(ctx,batch_size, data_config, model):
  udf_utils.iterate_trough_dataset(
    ctx,batch_size,
    lambda df: predict_batch(ctx, df, data_config, model),
    lambda: None,
    lambda state, result: ctx.emit( 
        result
      )
    )
/
"""))
c.execute(textwrap.dedent("""
CREATE OR REPLACE PYTHON3 SCALAR SCRIPT train_utils() RETURNS int AS
import pandas as pd
from pandas import DataFrame
import numpy as np
from sklearn.linear_model import SGDClassifier
from sklearn.preprocessing import OneHotEncoder

udf_utils = exa.import_script('ML_TEST.UDF_UTILS')
model_utils = exa.import_script('ML_TEST.MODEL_UTILS')
preprocessing_utils = exa.import_script('ML_TEST.PREPROCESSING_UTILS')

def train_batch(ctx, df, data_config, model):
  input_column_one_hot_encoder = model["input_column_one_hot_encoder"]
  input_columns, target_column = preprocessing_utils.preprocess_batch(
                                                  df, data_config,
                                                  input_column_one_hot_encoder)
  target_classes=np.arange(data_config["target_classes"])
  classifier = model["classifier"]
  classifier.partial_fit(input_columns.values,target_column.values, classes=target_classes)

def train(ctx,batch_size, data_config, model):
  udf_utils.iterate_trough_dataset(
    ctx,batch_size,
    lambda df: train_batch(ctx, df, data_config, model),
    lambda: None,
    lambda state, result: None)

/
"""))

<ExaStatement session_id=1671745989781000370 stmt_idx=8>

## Create Training UDFs

In [108]:
c=connect()
c.execute(textwrap.dedent("""
CREATE OR REPLACE PYTHON3 SET SCRIPT train_sgd_classifier(...) EMITS (result_column VARCHAR(2000000)) AS
import pandas as pd
from pandas import DataFrame
import numpy as np
from sklearn.linear_model import SGDClassifier
from sklearn.preprocessing import OneHotEncoder

import uuid

model_utils = exa.import_script('ML_TEST.MODEL_UTILS')
bucketfs_utils = exa.import_script('ML_TEST.BUCKETFS_UTILS')
predict_utils = exa.import_script('ML_TEST.PREDICT_UTILS')
compute_score_utils = exa.import_script('ML_TEST.COMPUTE_SCORE_UTILS')
train_utils = exa.import_script('ML_TEST.TRAIN_UTILS')

def run(ctx):
  # Setup of training
  data_config = model_utils.get_config(ctx)
  model = model_utils.create_model(ctx, data_config,loss='log',warm_start=True) 
  epochs=1
  batch_size=100
  base_path = f"models/{uuid.uuid4()}"
  ctx.emit(base_path)
  
  # Run training
  for i in range(epochs):
    train_utils.train(ctx, batch_size, data_config, model)
    score=compute_score_utils.compute_score(ctx, batch_size, data_config, model)
    ctx.emit(f"epoch:{i} score:{score}")
    if score==1.0:
      break
      
  # Upload model to bucketfs
  model_path = f"{base_path}/model.joblib"
  data_config_path = f"{base_path}/data_config.joblib"
  bucketfs_utils.upload_object_to_bucketfs(
      model, "localhost:6583", "default", model_path, "w", "write", secure=False)
  bucketfs_utils.upload_object_to_bucketfs(
      data_config, "localhost:6583", "default", data_config_path, "w", "write", secure=False)
/
"""))

<ExaStatement session_id=1671745990213075781 stmt_idx=2>

## Run batch-wise training

We use the linear SGDClassifier from scikit-learnas model, because it allows the batch-wise learning for large datasets and use the database to randomize the order.

https://scikit-learn.org/stable/modules/generated/sklearn.linear_model.SGDClassifier.html

**Disclaimer**: For the sake of simplicity we left out the split in trainings and test set. However, it is now problem for the database to split the dataset randomly, similar how we randomize the order.

In [124]:
c=connect()
sw=Stopwatch()
r=c.execute("""
select train_sgd_classifier(
                pos, id, age, fnlwgt, education_num, capitalgain, capitalloss, hoursperweek, 
                workclass, education, marital_status, occupation, relationship, race, sex, native_country, class
                order by pos asc)
from (
  -- first input line contains the config for the one-hot-encoder
  select gr, 0 as pos, null as id,
  -1 as age, -1 as fnlwgt, -1 as education_num, -1 as capitalgain, -1 as capitalloss, -1 hoursperweek, 
  (SELECT MAX(id) FROM WORKCLASS_CATEGORIES) as workclass,
  (SELECT MAX(id) FROM EDUCATION_CATEGORIES) as education,
  (SELECT MAX(id) FROM MARITAL_STATUS_CATEGORIES) as marital_status,
  (SELECT MAX(id) FROM OCCUPATION_CATEGORIES) as occupation,
  (SELECT MAX(id) FROM RELATIONSHIP_CATEGORIES) as relationship,
  (SELECT MAX(id) FROM RACE_CATEGORIES) as race,
  (SELECT MAX(id) FROM SEX_CATEGORIES) as sex,
  (SELECT MAX(id) FROM NATIVE_COUNTRY_CATEGORIES) as native_country,
  (SELECT MAX(id) FROM CLASS_CATEGORIES) as class
  from values 1,2,3 as T(gr)
  union all
  select floor(rand(1,4)) as gr, rand(1,2) as pos, -- randomize the order of the training data
         id, age, fnlwgt, education_num, capitalgain, capitalloss, hoursperweek, 
         workclass, education, marital_status, occupation, relationship, race, sex, native_country, class
  from adult_preprocessed
) as q
group by gr
""")
sw.stop()
print(f"Runtime: {sw}")
base_path=r.fetchone()[0]
print("base_path:",base_path)
for i,t in enumerate(r.fetchall()):
  print(t[0])
  if i>50:
    break


Runtime: 20.99s
base_path: models/454ceb7b-a956-45f8-b3ef-5fe97fbdc939
epoch:0 score:0.8457788297828164
models/4083cdcc-615a-486e-b9db-c96bffc6b125
epoch:0 score:0.8446782229517438
models/f875251d-1e03-4215-a01d-3700e5031994
epoch:0 score:0.8271135625786789


## Create Prediction UDFs

In [110]:
c=connect()
c.execute(textwrap.dedent("""
CREATE OR REPLACE PYTHON3 SET SCRIPT predict_sgd_classifier(...) 
EMITS (id int, class int, predicted_class int) AS
import pandas as pd
from pandas import DataFrame
import numpy as np
from sklearn.linear_model import SGDClassifier
from sklearn.preprocessing import OneHotEncoder

bucketfs_utils = exa.import_script('ML_TEST.BUCKETFS_UTILS')
predict_utils = exa.import_script('ML_TEST.PREDICT_UTILS')
compute_score_utils = exa.import_script('ML_TEST.COMPUTE_SCORE_UTILS')

def init(ctx):
  # Get model path from first row
  df=ctx.get_dataframe(num_rows=1)
  ctx.reset()
  base_path=df['0'][0]
  batch_size=1000
  
  # Load model
  model_path = f"{base_path}/model.joblib"
  data_config_path = f"{base_path}/data_config.joblib"
  model=bucketfs_utils.download_object_from_bucketfs(
      "localhost:6583", "default", model_path, "w", "write", secure=False)
  data_config=bucketfs_utils.download_object_from_bucketfs(
      "localhost:6583", "default", data_config_path, "w", "write", secure=False)
  return model, data_config, batch_size

def run(ctx):
  model, data_config, batch_size = init(ctx)
  
  # Run prediction
  predict_utils.predict(ctx, batch_size, data_config, model)
/
"""))

<ExaStatement session_id=1671746038782335291 stmt_idx=2>

## Run parallel prediction
We run the prediction in parallel in multiple instances of our UDFs. This allows us to parallelize algorithms for prediction which usually don't provide functionallity for parallel execution. To achieve this, you only need to add a `group by` clause to the query. 

In [118]:
c=connect()
sw=Stopwatch()
c.execute(f"""
create or replace table prediction_results as
select adult_with_id.id, age, fnlwgt, education_num, capitalgain, capitalloss, hoursperweek, 
       workclass, education, marital_status, occupation, relationship, race, sex, native_country,
       adult_with_id.class, CLASS_CATEGORIES.class as predicted_class
from (
        select predict_sgd_classifier(
                        '{base_path}', id, age, fnlwgt, education_num, capitalgain, capitalloss, hoursperweek, 
                        workclass, education, marital_status, occupation, relationship, race, sex, native_country, 
                        class)
        from adult_preprocessed
        group by iproc(), mod(rownum,3) -- parallelize prediction with 2 instances per node
    ) as prediction
join adult_with_id -- join source tables and resolve predicted categories
on prediction.id=adult_with_id.id
join CLASS_CATEGORIES
on prediction.predicted_class=CLASS_CATEGORIES.id
order by adult_with_id.id
""")
sw.stop()
print(f"Runtime: {sw}")
c.export_to_pandas("select * from prediction_results limit 10")

Runtime: 10.87s


Unnamed: 0,ID,AGE,FNLWGT,EDUCATION_NUM,CAPITALGAIN,CAPITALLOSS,HOURSPERWEEK,WORKCLASS,EDUCATION,MARITAL_STATUS,OCCUPATION,RELATIONSHIP,RACE,SEX,NATIVE_COUNTRY,CLASS,PREDICTED_CLASS
0,1,2,77516,13,1,0,2,State-gov,Bachelors,Never-married,Adm-clerical,Not-in-family,White,Male,United-States,<=50K\r,<=50K\r
1,2,3,83311,13,0,0,0,Self-emp-not-inc,Bachelors,Married-civ-spouse,Exec-managerial,Husband,White,Male,United-States,<=50K\r,<=50K\r
2,3,2,215646,9,0,0,2,Private,HS-grad,Divorced,Handlers-cleaners,Not-in-family,White,Male,United-States,<=50K\r,<=50K\r
3,4,3,234721,7,0,0,2,Private,11th,Married-civ-spouse,Handlers-cleaners,Husband,Black,Male,United-States,<=50K\r,<=50K\r
4,5,1,338409,13,0,0,2,Private,Bachelors,Married-civ-spouse,Prof-specialty,Wife,Black,Female,Cuba,<=50K\r,<=50K\r
5,6,2,284582,14,0,0,2,Private,Masters,Married-civ-spouse,Exec-managerial,Wife,White,Female,United-States,<=50K\r,>50K\r
6,7,3,160187,5,0,0,0,Private,9th,Married-spouse-absent,Other-service,Not-in-family,Black,Female,Jamaica,<=50K\r,<=50K\r
7,8,3,209642,9,0,0,2,Self-emp-not-inc,HS-grad,Married-civ-spouse,Exec-managerial,Husband,White,Male,United-States,>50K\r,<=50K\r
8,9,1,45781,14,4,0,3,Private,Masters,Never-married,Prof-specialty,Not-in-family,White,Female,United-States,>50K\r,>50K\r
9,10,2,159449,13,2,0,2,Private,Bachelors,Married-civ-spouse,Exec-managerial,Husband,White,Male,United-States,>50K\r,>50K\r
