In [1]:
pip install tensorflow


[1m[[0m[34;49mnotice[0m[1;39;49m][0m[39;49m A new release of pip is available: [0m[31;49m23.2.1[0m[39;49m -> [0m[32;49m23.3.2[0m
[1m[[0m[34;49mnotice[0m[1;39;49m][0m[39;49m To update, run: [0m[32;49m/usr/local/bin/python3 -m pip install --upgrade pip[0m
Note: you may need to restart the kernel to use updated packages.


In [2]:
pip install scikit-learn


[1m[[0m[34;49mnotice[0m[1;39;49m][0m[39;49m A new release of pip is available: [0m[31;49m23.2.1[0m[39;49m -> [0m[32;49m23.3.2[0m
[1m[[0m[34;49mnotice[0m[1;39;49m][0m[39;49m To update, run: [0m[32;49m/usr/local/bin/python3 -m pip install --upgrade pip[0m
Note: you may need to restart the kernel to use updated packages.


## Example Python pipeline

Because some steps, such as the Word2Vec text featurization, cannot currently be implemented in SQL, I have commented those sections out of the pipeline so that only the remaining operations are compared. The intermediate outputs are stored as CSV files, and the final train and test data that would be fed to model training are saved as TXT files. These files are used later to check that the data produced by the SQL operations is consistent with the data produced by the Python pipeline.

In [3]:
# pylint: disable-all
import os
import warnings

import numpy 
import pandas as pd
os.environ['TF_CPP_MIN_LOG_LEVEL'] = '3'
import tensorflow
from sklearn.compose import ColumnTransformer
from sklearn.metrics import accuracy_score
from sklearn.pipeline import Pipeline
from sklearn.preprocessing import OneHotEncoder, StandardScaler
from tensorflow.keras.layers import Dense, Dropout
from tensorflow.keras.models import Sequential

#from example_pipelines.healthcare.healthcare_utils import MyW2VTransformer, MyKerasClassifier

# Disable tensorflow API and optimization warnings for a readable output
warnings.filterwarnings('ignore')

# Seed every random number generator used by the pipeline so that repeated
# runs are reproducible.
seed = 1234
tensorflow.random.set_seed(seed)
# numpy must be seeded by *calling* numpy.random.seed(); assigning to
# ``numpy.seed`` merely creates an unused module attribute and leaves the
# global NumPy generator unseeded.
numpy.random.seed(seed)


def combine(patients, patient_histories, consent_required):
    """Join patient records with their histories on the ``ssn`` column.

    When ``consent_required`` is truthy, only patients whose ``gave_consent``
    value equals ``True`` take part in the join.  The join is pandas'
    default (inner) merge, so patients without a history row are dropped.
    """
    eligible = patients
    if consent_required:
        consented = patients['gave_consent'].eq(True)
        eligible = patients.loc[consented]
    return eligible.merge(patient_histories, on="ssn")


def create_neural_net(input_dim):
    """Build and compile a small feed-forward binary classifier.

    Architecture: Dense(8, relu) -> Dropout(0.3) -> Dense(4, relu) ->
    Dense(2, softmax), trained with sparse categorical cross-entropy and
    the Adam optimizer.

    Args:
        input_dim: number of input features fed to the first Dense layer.

    Returns:
        The compiled ``Sequential`` model.
    """
    # Model definition
    nn = Sequential([
        Dense(8, activation='relu', input_dim=input_dim),
        Dropout(0.3),
        Dense(4, activation='relu'),
        Dense(2, activation='softmax')])
    # Keras documents ``metrics`` as a list of metrics; a bare string is not
    # accepted by every Keras release, so pass a one-element list.
    nn.compile(loss='sparse_categorical_crossentropy', optimizer='adam', metrics=['accuracy'])
    return nn


def featurization():
    """Assemble, print and return the (unfitted) column-wise feature encoder.

    ``weight`` is standardised with ``StandardScaler`` and ``smokes`` is
    one-hot encoded; categories not seen while fitting are ignored.
    """
    numerical = ('numerical_features', StandardScaler(), ['weight'])
    categorical = ('categorical_features', OneHotEncoder(handle_unknown='ignore'), ['smokes'])
    # Disabled because the text embedding step has no SQL counterpart yet:
    # ('textual_features', MyW2VTransformer(min_count=1, size=5), ['notes'])
    encode = ColumnTransformer(transformers=[numerical, categorical])
    print(encode)
    return encode


def execute_pipeline():
    """Run the relational preprocessing and featurization, saving every stage.

    Reads ``patients.csv``, ``histories.csv`` and ``test_histories.csv`` from
    the working directory, writes the intermediate frames (sorted by ``ssn``)
    to CSV files and the transformed train/test matrices to TXT files so they
    can be compared with the SQL results.  Always returns ``1`` because the
    learner and accuracy computation are currently disabled.
    """
    # Relational preprocessing
    patients = pd.read_csv("patients.csv")
    histories = pd.read_csv("histories.csv")
    test_histories = pd.read_csv("test_histories.csv")

    # Keep only the histories recorded at the three hospitals of interest.
    in_scope = histories['hospital'].isin(["AL", "AK", "AR"])
    histories = histories[in_scope]
    # Every intermediate frame is exported sorted by SSN so that it can be
    # inspected and compared against the SQL output later on.
    histories.sort_values('ssn').to_csv('histories_pipeline.csv', index=False)

    train = combine(patients, histories, consent_required=True)
    train.sort_values('ssn').to_csv('train_pipeline_initial.csv', index=False)

    test = combine(patients, test_histories, consent_required=True)
    test.sort_values('ssn').to_csv('test_pipeline_initial.csv', index=False)

    # Only the featurization step is active; the Keras learner
    # (MyKerasClassifier(create_neural_net, epochs=5, verbose=0)) and the
    # accuracy computation on the test set stay disabled.
    encode_and_learn = Pipeline([('features', featurization())])
    model = encode_and_learn.fit(train, train['has_complication'])
    print(model)

    # Store the final train and test datasets for convenient comparison at the end.
    outputs = [
        ('train_transformed.txt', encode_and_learn.transform(train)),
        ('test_transformed.txt', encode_and_learn.transform(test)),
    ]
    for path, matrix in outputs:
        numpy.savetxt(path, matrix)

    return 1


# Run the pipeline for its side effects (the exported CSV/TXT files).  The
# returned value is the placeholder 1, so printing it as a score is disabled.
score = execute_pipeline()
#print(f"Score: {score}")

ColumnTransformer(transformers=[('numerical_features', StandardScaler(),
                                 ['weight']),
                                ('categorical_features',
                                 OneHotEncoder(handle_unknown='ignore'),
                                 ['smokes'])])
Pipeline(steps=[('features',
                 ColumnTransformer(transformers=[('numerical_features',
                                                  StandardScaler(),
                                                  ['weight']),
                                                 ('categorical_features',
                                                  OneHotEncoder(handle_unknown='ignore'),
                                                  ['smokes'])]))])


## Implementation in DuckDB

In [4]:
pip install duckdb


[1m[[0m[34;49mnotice[0m[1;39;49m][0m[39;49m A new release of pip is available: [0m[31;49m23.2.1[0m[39;49m -> [0m[32;49m23.3.2[0m
[1m[[0m[34;49mnotice[0m[1;39;49m][0m[39;49m To update, run: [0m[32;49m/usr/local/bin/python3 -m pip install --upgrade pip[0m
Note: you may need to restart the kernel to use updated packages.


Connect to DuckDB

In [5]:
import duckdb
import pandas as pd

# Load the SQL magic extension that provides %sql / %%sql.
%load_ext sql
# connect() without a path opens a fresh in-memory DuckDB database.
conn = duckdb.connect()
# Hand the connection to the SQL magic under the alias "duckdb".
%sql conn --alias duckdb

Create tables in the database

In [6]:
%%sql
-- Load each CSV file into its own table. OR REPLACE keeps the cell
-- re-runnable: a plain CREATE TABLE fails once the table already exists.
CREATE OR REPLACE TABLE histories AS SELECT * FROM read_csv_auto('histories.csv');
SELECT * FROM histories;

CREATE OR REPLACE TABLE patients AS SELECT * FROM read_csv_auto('patients.csv');
SELECT * FROM patients;

CREATE OR REPLACE TABLE test_histories AS SELECT * FROM read_csv_auto('test_histories.csv');
SELECT * FROM test_histories;

notes,has_complication,ssn,hospital
normal risk,False,819-66-5146,AK
normal risk,False,140-45-9448,AR
high risk,False,643-13-0728,AL
normal risk,False,661-10-1912,AZ
normal risk,False,702-18-9881,AL
normal risk,True,168-69-3664,AR
normal risk,True,078-52-8543,AR
normal risk,True,567-03-8647,AZ
normal risk,False,296-18-9628,AL
normal risk,False,813-71-5848,AZ


In [7]:
%sql SELECT * FROM histories;

notes,has_complication,ssn,hospital
normal risk,False,844-25-3185,AK
normal risk,True,704-22-1415,AR
normal risk,False,291-52-4957,AL
normal risk,False,554-60-6241,AL
normal risk,False,371-22-8266,AK
normal risk,True,761-31-5342,AR
high risk,False,423-64-3297,AZ
normal risk,False,763-29-0394,AK
normal risk,False,365-08-4223,AK
high risk,True,150-03-7624,AZ


In [8]:
%sql SELECT * FROM patients;

smokes,weight,gave_consent,ssn
no,57.98341830412999,True,844-25-3185
no,62.741663157827546,True,704-22-1415
no,81.73846816241817,True,291-52-4957
no,58.33552143345234,True,554-60-6241
no,77.18224566942787,False,371-22-8266
yes,71.09854327546942,True,761-31-5342
yes,81.13481270935448,True,423-64-3297
no,74.23670445374523,True,763-29-0394
no,84.64247606449092,True,365-08-4223
yes,61.95273301921088,True,150-03-7624


In [9]:
%sql SELECT * FROM test_histories;

notes,has_complication,ssn,hospital
normal risk,False,819-66-5146,AK
normal risk,False,140-45-9448,AR
high risk,False,643-13-0728,AL
normal risk,False,661-10-1912,AZ
normal risk,False,702-18-9881,AL
normal risk,True,168-69-3664,AR
normal risk,True,078-52-8543,AR
normal risk,True,567-03-8647,AZ
normal risk,False,296-18-9628,AL
normal risk,False,813-71-5848,AZ


### CTE

Following the example in the paper, I first translated the Python code of each node in the DAG into a CTE. To make it easier to compare the results obtained from SQL with those obtained by running the Python code, I then converted each CTE into a view (see the next section).

In [10]:
%%sql

-- Each CTE mirrors one node of the Python pipeline's DAG. The rowid columns
-- carry row identity through the query so that separately derived feature
-- columns can be joined back together.

-- CTE: Original History
WITH orig_history AS (
    SELECT rowid AS history_rowid, * FROM histories
),

-- CTE: Original Test
orig_test AS (
    SELECT rowid AS test_rowid, * FROM test_histories
),

-- CTE: Original Patient
orig_patient AS (
    SELECT rowid AS patient_rowid, * FROM patients
),

-- CTE: Block 3
block_3 AS (
    SELECT hospital, history_rowid FROM orig_history
),

-- CTE: Block 4
block_4 AS (
    SELECT * FROM block_3 WHERE hospital IN ('AL', 'AK', 'AR')
),

-- CTE: Block 5 (histories restricted to the three hospitals)
block_5 AS (
    SELECT * FROM orig_history WHERE hospital IN ('AL', 'AK', 'AR')
),

-- CTE: Block 6
block_6 AS (
    SELECT gave_consent, patient_rowid FROM orig_patient
),

-- CTE: Block 7
block_7 AS (
    SELECT * FROM block_6 WHERE gave_consent = TRUE
),

-- CTE: Block 8 (consenting patients, training side)
block_8 AS (
    SELECT * FROM orig_patient WHERE gave_consent = TRUE
),

-- CTE: Block 9 (training set: filtered histories joined with consenting patients)
block_9 AS (
    SELECT * FROM block_5 tb1 INNER JOIN block_8 tb2 ON tb1.ssn = tb2.ssn
),

-- CTE: Block 10
block_10 AS (
    SELECT gave_consent, patient_rowid FROM orig_patient
),

-- CTE: Block 11 (reads block_10, its own predecessor, rather than block_6)
block_11 AS (
    SELECT * FROM block_10 WHERE gave_consent = TRUE
),

-- CTE: Block 12 (consenting patients, test side)
block_12 AS (
    SELECT * FROM orig_patient WHERE gave_consent = TRUE
),

-- CTE: Block 13 (test set: consenting patients joined with test histories)
block_13 AS (
    SELECT * FROM block_12 tb1 INNER JOIN orig_test tb2 ON tb1.ssn = tb2.ssn
),

-- CTE: Block 14
block_14 AS (
    SELECT has_complication, history_rowid, patient_rowid FROM block_9
),

-- CTE: Block 15
block_15 AS (
    SELECT weight, history_rowid, patient_rowid FROM block_9
),

-- CTE: Block 16 (standard scaling with the training mean / population stddev)
block_16 AS (
    SELECT (weight - (SELECT AVG(weight) FROM block_9)) / (SELECT STDDEV_POP(weight) FROM block_9) AS weight, history_rowid, patient_rowid FROM block_9
),

-- CTE: Block 17
block_17 AS (
    SELECT smokes, history_rowid, patient_rowid FROM block_9
),

-- CTE: One Hot Encoding Helper
-- Categories are ranked in sorted order so that the position of the 1 is
-- deterministic and follows the sorted category order used by scikit-learn's
-- OneHotEncoder; ROW_NUMBER() without ORDER BY gives no ordering guarantee.
one_hot_help AS (
    SELECT
        smokes,
        list_resize(list_value(0), rank-1, 0) || [1] || list_resize(list_value(0), CAST((SELECT COUNT(DISTINCT smokes) FROM block_17) AS int)-rank, 0) AS one_hot
    FROM (
        SELECT smokes, CAST(ROW_NUMBER() OVER (ORDER BY smokes) AS int) AS rank
        FROM (SELECT DISTINCT(smokes) FROM block_17) oh
    ) ranked
),

-- CTE: Block 18
block_18 AS (
    SELECT b9.history_rowid, b9.patient_rowid, o1.one_hot AS smokes FROM one_hot_help o1, block_9 b9 WHERE b9.smokes = o1.smokes
),

-- CTE: Block 19
block_19 AS (
    SELECT notes, history_rowid, patient_rowid FROM block_9
),

-- CTE: Block 21 (training features)
-- Join on both rowids: a joined row is identified by the pair, and a single
-- key would multiply rows whenever that key repeats.
block_21 AS (
    SELECT * FROM block_16 b16 INNER JOIN block_18 b18 ON b16.history_rowid = b18.history_rowid AND b16.patient_rowid = b18.patient_rowid
),

-- CTE: Block 25
block_25 AS (
    SELECT weight, patient_rowid, test_rowid FROM block_13
),

-- CTE: Block 26 (test weights scaled with the *training* statistics)
block_26 AS (
    SELECT (weight - (SELECT AVG(weight) FROM block_9)) / (SELECT STDDEV_POP(weight) FROM block_9) AS weight, patient_rowid, test_rowid FROM block_25
),

-- CTE: Block 27
block_27 AS (
    SELECT smokes, patient_rowid, test_rowid FROM block_13
),

-- CTE: One Hot Encoding Helper 2
-- The test set must be encoded with the categories learned from the training
-- set (the encoder is fitted on train only). Each distinct test value is
-- looked up in the training encoding; a value unseen in training becomes an
-- all-zero vector of the training width, like handle_unknown='ignore'.
one_hot_help_2 AS (
    SELECT
        seen.smokes,
        COALESCE(
            o1.one_hot,
            list_resize(list_value(0), CAST((SELECT COUNT(DISTINCT smokes) FROM block_17) AS int), 0)
        ) AS one_hot
    FROM (SELECT DISTINCT(smokes) FROM block_27) seen
    LEFT JOIN one_hot_help o1 ON seen.smokes = o1.smokes
),

-- CTE: Block 28
block_28 AS (
    SELECT patient_rowid, test_rowid, o2.one_hot AS smokes FROM one_hot_help_2 o2, block_27 b27 WHERE b27.smokes = o2.smokes
),

-- CTE: Block 29
block_29 AS (
    SELECT notes, patient_rowid, test_rowid FROM block_13
),

-- CTE: Block 31 (test features; joined on both rowids, see block_21)
block_31 AS (
    SELECT * FROM block_26 b26 INNER JOIN block_28 b28 ON b26.patient_rowid = b28.patient_rowid AND b26.test_rowid = b28.test_rowid
),

-- CTE: Block 34
block_34 AS (
    SELECT has_complication, patient_rowid, test_rowid FROM block_13
)

-- Main Query
SELECT * FROM block_34;


has_complication,patient_rowid,test_rowid
False,10001,1
False,10002,2
False,10004,4
True,10005,5
True,10006,6
True,10007,7
False,10008,8
False,10009,9
False,10011,11
False,10013,13


### View

In [11]:
%%sql
-- One view per DAG node of the Python pipeline. OR REPLACE keeps the cell
-- re-runnable: a plain CREATE VIEW fails once the view already exists.
CREATE OR REPLACE VIEW orig_history AS
SELECT rowid AS history_rowid, * FROM histories;

CREATE OR REPLACE VIEW orig_test AS
SELECT rowid AS test_rowid, * FROM test_histories;

CREATE OR REPLACE VIEW orig_patient AS
SELECT rowid AS patient_rowid, * FROM patients;

CREATE OR REPLACE VIEW block_3 AS
SELECT hospital, history_rowid FROM orig_history;

CREATE OR REPLACE VIEW block_4 AS
SELECT * FROM block_3 WHERE hospital IN ('AL', 'AK', 'AR');

CREATE OR REPLACE VIEW block_5 AS
SELECT * FROM orig_history WHERE hospital IN ('AL', 'AK', 'AR');

CREATE OR REPLACE VIEW block_6 AS
SELECT gave_consent, patient_rowid FROM orig_patient;

CREATE OR REPLACE VIEW block_7 AS
SELECT * FROM block_6 WHERE gave_consent = TRUE;

CREATE OR REPLACE VIEW block_8 AS
SELECT * FROM orig_patient WHERE gave_consent = TRUE;

CREATE OR REPLACE VIEW block_9 AS
SELECT * FROM block_5 tb1 INNER JOIN block_8 tb2 ON tb1.ssn = tb2.ssn;

CREATE OR REPLACE VIEW block_10 AS
SELECT gave_consent, patient_rowid FROM orig_patient;

-- block_11 reads block_10, its own predecessor, rather than block_6.
CREATE OR REPLACE VIEW block_11 AS
SELECT * FROM block_10 WHERE gave_consent = TRUE;

CREATE OR REPLACE VIEW block_12 AS
SELECT * FROM orig_patient WHERE gave_consent = TRUE;

CREATE OR REPLACE VIEW block_13 AS
SELECT * FROM block_12 tb1 INNER JOIN orig_test tb2 ON tb1.ssn = tb2.ssn;

CREATE OR REPLACE VIEW block_14 AS
SELECT has_complication, history_rowid, patient_rowid FROM block_9;

CREATE OR REPLACE VIEW block_15 AS
SELECT weight, history_rowid, patient_rowid FROM block_9;

CREATE OR REPLACE VIEW block_16 AS
SELECT (weight - (SELECT AVG(weight) FROM block_9)) / (SELECT STDDEV_POP(weight) FROM block_9) AS weight, history_rowid, patient_rowid FROM block_9;

CREATE OR REPLACE VIEW block_17 AS
SELECT smokes, history_rowid, patient_rowid FROM block_9;

-- Categories are ranked in sorted order so the position of the 1 is
-- deterministic and follows the sorted category order used by scikit-learn's
-- OneHotEncoder; ROW_NUMBER() without ORDER BY gives no ordering guarantee.
CREATE OR REPLACE VIEW one_hot_help AS
SELECT smokes, list_resize(list_value(0), rank-1, 0) || [1] || list_resize(list_value(0), CAST((SELECT COUNT(DISTINCT smokes) FROM block_17) AS int)-rank, 0) AS one_hot
FROM(SELECT smokes, CAST(ROW_NUMBER() OVER (ORDER BY smokes) AS int) AS rank
FROM (SELECT DISTINCT(smokes) FROM block_17) oh) ranked;

CREATE OR REPLACE VIEW block_18 AS
SELECT b9.history_rowid, b9.patient_rowid, o1.one_hot AS smokes FROM one_hot_help o1, block_9 b9 WHERE b9.smokes = o1.smokes;

CREATE OR REPLACE VIEW block_19 AS
SELECT notes, history_rowid, patient_rowid FROM block_9;

-- Join on both rowids: a joined row is identified by the pair, and a single
-- key would multiply rows whenever that key repeats.
CREATE OR REPLACE VIEW block_21 AS
SELECT * FROM block_16 b16 INNER JOIN block_18 b18 ON b16.history_rowid = b18.history_rowid AND b16.patient_rowid = b18.patient_rowid;

CREATE OR REPLACE VIEW block_25 AS
SELECT weight, patient_rowid, test_rowid FROM block_13;

CREATE OR REPLACE VIEW block_26 AS
SELECT (weight - (SELECT AVG(weight) FROM block_9)) / (SELECT STDDEV_POP(weight) FROM block_9) AS weight, patient_rowid, test_rowid FROM block_25;

CREATE OR REPLACE VIEW block_27 AS
SELECT smokes, patient_rowid, test_rowid FROM block_13;

-- The test set must be encoded with the categories learned from the training
-- set (the encoder is fitted on train only). Each distinct test value is
-- looked up in the training encoding; a value unseen in training becomes an
-- all-zero vector of the training width, like handle_unknown='ignore'.
CREATE OR REPLACE VIEW one_hot_help_2 AS
SELECT seen.smokes, COALESCE(o1.one_hot, list_resize(list_value(0), CAST((SELECT COUNT(DISTINCT smokes) FROM block_17) AS int), 0)) AS one_hot
FROM (SELECT DISTINCT(smokes) FROM block_27) seen
LEFT JOIN one_hot_help o1 ON seen.smokes = o1.smokes;

CREATE OR REPLACE VIEW block_28 AS
SELECT patient_rowid, test_rowid, o2.one_hot AS smokes FROM one_hot_help_2 o2, block_27 b27 WHERE b27.smokes = o2.smokes;

CREATE OR REPLACE VIEW block_29 AS
SELECT notes, patient_rowid, test_rowid FROM block_13;

-- Joined on both rowids for the same reason as block_21.
CREATE OR REPLACE VIEW block_31 AS
SELECT * FROM block_26 b26 INNER JOIN block_28 b28 ON b26.patient_rowid = b28.patient_rowid AND b26.test_rowid = b28.test_rowid;

CREATE OR REPLACE VIEW block_34 AS
SELECT has_complication, patient_rowid, test_rowid FROM block_13;

Count


## Comparison

### Comparison 1

Python code for comparison: `histories = histories[histories['hospital'].isin(["AL", "AK", "AR"])]`

Corresponding files: `histories_pipeline.csv`

In [12]:
%%sql
-- Preview the hospital-filtered histories (block_5) without the helper rowid column.
SELECT notes,has_complication,ssn,hospital FROM block_5;

-- Export the same columns, ordered by ssn, to a CSV file with a header row.
COPY (SELECT notes,has_complication,ssn,hospital FROM block_5 ORDER BY ssn) TO 'histories_duckdb.csv' WITH HEADER DELIMITER ',';

Count
7577


In [13]:
file1_path = "histories_pipeline.csv"
# This frame holds the *pipeline* export, so name it accordingly instead of
# reusing the name of the DuckDB export loaded in the next cell.
histories_pipeline = pd.read_csv(file1_path, index_col=None)
histories_pipeline.head()

Unnamed: 0,notes,has_complication,ssn,hospital
0,high risk,True,001-15-5323,AL
1,normal risk,True,001-24-5429,AR
2,normal risk,False,001-44-2119,AK
3,normal risk,False,001-63-3296,AR
4,normal risk,True,001-98-3642,AL


In [14]:
file2_path = "histories_duckdb.csv"
histories_duckdb = pd.read_csv(file2_path, index_col=None)
histories_duckdb.head()

Unnamed: 0,notes,has_complication,ssn,hospital
0,high risk,True,001-15-5323,AL
1,normal risk,True,001-24-5429,AR
2,normal risk,False,001-44-2119,AK
3,normal risk,False,001-63-3296,AR
4,normal risk,True,001-98-3642,AL


I've written a function to check if the contents of two CSV files are identical. Since the order of data in the two CSV files might differ, the function takes a 'sort' parameter for sorting.

In [15]:
import pandas as pd

def compare_csv_files(file1, file2, sort):
    """Report whether two CSV files contain the same rows, ignoring row order.

    Both files are loaded, sorted and compared cell by cell (by position).
    The first rows of each sorted frame are printed for a visual check.

    Args:
        file1: path of the first CSV file.
        file2: path of the second CSV file.
        sort: column name (or list of column names) to sort by first.

    Returns:
        True when both files hold the same values, otherwise False.
    """
    def _load_sorted(path):
        frame = pd.read_csv(path, index_col=None)
        keys = [sort] if isinstance(sort, str) else list(sort)
        # Break ties on the remaining columns so that rows sharing a sort key
        # end up in the same order regardless of their order in the file.
        tie_breakers = [col for col in frame.columns if col not in keys]
        return frame.sort_values(by=keys + tie_breakers, kind='stable')

    df1 = _load_sorted(file1)
    print(df1.head())
    df2 = _load_sorted(file2)
    print(df2.head())

    print()

    identical = df1.shape == df2.shape
    if identical:
        left = df1.to_numpy(dtype=object)
        right = df2.to_numpy(dtype=object)
        # A missing value never equals itself, so treat a pair of missing
        # cells as a match instead of reporting a spurious difference.
        both_missing = pd.isna(left) & pd.isna(right)
        identical = bool(((left == right) | both_missing).all())

    if identical:
        print("Files are identical.")
        return True
    print("Files are not identical.")
    return False


In [16]:
file1_path = "histories_pipeline.csv"
file2_path = "histories_duckdb.csv"

compare_csv_files(file1_path, file2_path, 'ssn')

         notes  has_complication          ssn hospital
0    high risk              True  001-15-5323       AL
1  normal risk              True  001-24-5429       AR
2  normal risk             False  001-44-2119       AK
3  normal risk             False  001-63-3296       AR
4  normal risk              True  001-98-3642       AL
         notes  has_complication          ssn hospital
0    high risk              True  001-15-5323       AL
1  normal risk              True  001-24-5429       AR
2  normal risk             False  001-44-2119       AK
3  normal risk             False  001-63-3296       AR
4  normal risk              True  001-98-3642       AL

Files are identical.


True

### Comparison 2

Python code for comparison: `train = combine(patients, histories, consent_required=True)`

Corresponding files: `train_pipeline_initial.csv`

In [17]:
%%sql
-- Preview the joined training set (block_9) without the helper rowid columns.
SELECT smokes,weight,gave_consent,ssn,notes,has_complication,hospital FROM block_9;

-- Export the same columns to a CSV file with a header row (row order is not fixed here).
COPY (SELECT smokes,weight,gave_consent,ssn,notes,has_complication,hospital FROM block_9) TO 'train_duckdb_initial.csv' WITH HEADER DELIMITER ',';

Count
7414


In [18]:
file1_path = "train_pipeline_initial.csv"
file2_path = "train_duckdb_initial.csv"

compare_csv_files(file1_path, file2_path, 'ssn')

  smokes     weight  gave_consent          ssn        notes  has_complication  \
0    yes  89.015953          True  001-15-5323    high risk              True   
1     no  81.807574          True  001-24-5429  normal risk              True   
2     no  60.288345          True  001-44-2119  normal risk             False   
3     no  60.803919          True  001-63-3296  normal risk             False   
4     no  79.317285          True  001-98-3642  normal risk              True   

  hospital  
0       AL  
1       AR  
2       AK  
3       AR  
4       AL  
     smokes     weight  gave_consent          ssn        notes  \
3311    yes  89.015953          True  001-15-5323    high risk   
2693     no  81.807574          True  001-24-5429  normal risk   
3680     no  60.288345          True  001-44-2119  normal risk   
5396     no  60.803919          True  001-63-3296  normal risk   
114      no  79.317285          True  001-98-3642  normal risk   

      has_complication hospital  
3311

True

### Comparison 3

Python code for comparison: `test = combine(patients, test_histories, consent_required=True)`

Corresponding files: `test_pipeline_initial.csv`

In [19]:
%%sql
-- Preview the joined test set (block_13) without the helper rowid columns.
SELECT smokes,weight,gave_consent,ssn,notes,has_complication,hospital FROM block_13;

-- Export the same columns to a CSV file with a header row (row order is not fixed here).
COPY (SELECT smokes,weight,gave_consent,ssn,notes,has_complication,hospital FROM block_13) TO 'test_duckdb_initial.csv' WITH HEADER DELIMITER ',';

Count
3910


In [20]:
file1_path = "test_pipeline_initial.csv"
file2_path = "test_duckdb_initial.csv"

compare_csv_files(file1_path, file2_path, 'ssn')

  smokes     weight  gave_consent          ssn        notes  has_complication  \
0    yes  82.339362          True  001-50-6900  normal risk             False   
1     no  81.674947          True  001-64-3715  normal risk             False   
2     no   0.000000          True  001-91-5590  normal risk             False   
3     no  64.290579          True  001-94-8928  normal risk              True   
4    yes  82.575749          True  002-03-5384  normal risk             False   

  hospital  
0       AL  
1       AR  
2       AK  
3       AZ  
4       AR  
     smokes     weight  gave_consent          ssn        notes  \
2690    yes  82.339362          True  001-50-6900  normal risk   
3679     no  81.674947          True  001-64-3715  normal risk   
1853     no   0.000000          True  001-91-5590  normal risk   
347      no  64.290579          True  001-94-8928  normal risk   
77      yes  82.575749          True  002-03-5384  normal risk   

      has_complication hospital  
2690

True

### Comparison 4

Python code for comparison:

 `('numerical_features', StandardScaler(), ['weight'])`

  `('categorical_features', OneHotEncoder(handle_unknown='ignore'), ['smokes'])`

Corresponding files: `train_transformed.txt, test_transformed.txt`

First, I converted two text files to CSV files.

In [21]:
import csv

input_txt_file = 'train_transformed.txt'
output_csv_file = 'train_transformed.csv'

# The TXT file holds one row per line with single-space separated values;
# split every line into its fields first, then write them all out as CSV.
with open(input_txt_file, 'r') as txt_file:
    rows = [raw_line.strip().split(' ') for raw_line in txt_file]

with open(output_csv_file, 'w', newline='') as csv_file:
    csv_writer = csv.writer(csv_file, delimiter=',', quotechar='"', quoting=csv.QUOTE_MINIMAL)
    csv_writer.writerows(rows)

In [22]:
import csv

input_txt_file = 'test_transformed.txt'
output_csv_file = 'test_transformed.csv'

# The TXT file holds one row per line with single-space separated values;
# split every line into its fields first, then write them all out as CSV.
with open(input_txt_file, 'r') as txt_file:
    rows = [raw_line.strip().split(' ') for raw_line in txt_file]

with open(output_csv_file, 'w', newline='') as csv_file:
    csv_writer = csv.writer(csv_file, delimiter=',', quotechar='"', quoting=csv.QUOTE_MINIMAL)
    csv_writer.writerows(rows)

### train

In [23]:
%%sql
-- Preview the scaled weight and one-hot encoded smokes columns of the training features (block_21).
SELECT weight, smokes FROM block_21;

-- Export the same two columns to a CSV file with a header row.
COPY (SELECT weight, smokes FROM block_21) TO 'train_duckdb.csv' WITH HEADER DELIMITER ',';

Count
7414


Process the data obtained from DuckDB and the pipeline to ensure consistent formatting.

In [24]:
file_path = 'train_duckdb.csv'
df = pd.read_csv(file_path, index_col=None)
df.head()

Unnamed: 0,weight,smokes
0,1.085348,"[1, 0]"
1,-1.089705,"[1, 0]"
2,-0.741307,"[1, 0]"
3,1.010239,"[0, 1]"
4,-0.919553,"[1, 0]"


In [25]:
import ast

df = df.rename(columns={'weight': 'col_0'})
# The one-hot lists were read back from CSV as text such as "[1, 0]".
# ast.literal_eval only accepts Python literals, so unlike eval() it cannot
# execute arbitrary code that might be present in the file.
one_hot = df['smokes'].apply(ast.literal_eval)

# Create separate columns for 'col_1' and 'col_2'
df[['col_1', 'col_2']] = pd.DataFrame(one_hot.to_list(), columns=['col_1', 'col_2'], index=df.index)

# Drop the original 'smokes' column
df = df.drop('smokes', axis=1).sort_values(by=['col_0', 'col_1'])

df.head()


Unnamed: 0,col_0,col_1,col_2
1361,-1.903985,1,0
5891,-1.85877,1,0
282,-1.852974,1,0
6869,-1.814613,1,0
165,-1.776032,0,1


In [26]:
df.to_csv('train_duckdb_final.csv', index=False)

In [27]:
file_path = 'train_transformed.csv'
# The converted pipeline output has no header row, so the columns come back
# numbered 0..n-1; relabel them col_0..col_{n-1}.
df = pd.read_csv(file_path, index_col=None, header=None).rename(columns=lambda i: f'col_{i}')
# The one-hot indicators were stored as floats; cast them to integers and
# sort so the rows line up with the DuckDB export.
df = df.astype({'col_1': int, 'col_2': int}).sort_values(by=['col_0', 'col_1'])
df.head()

Unnamed: 0,col_0,col_1,col_2
2226,-1.903985,1,0
4890,-1.85877,1,0
1304,-1.852974,1,0
7392,-1.814613,1,0
753,-1.776032,0,1


In [28]:
df.to_csv('train_pipeline_final.csv', index=False)

### test

In [29]:
%%sql
-- Preview the scaled weight and one-hot encoded smokes columns of the test features (block_31).
SELECT weight, smokes FROM block_31;

-- Export the same two columns to a CSV file with a header row.
COPY (SELECT weight, smokes FROM block_31) TO 'test_duckdb.csv' WITH HEADER DELIMITER ',';

Count
3910


In [30]:
file_path = 'test_duckdb.csv'
df = pd.read_csv(file_path, index_col=None)
df.head()

Unnamed: 0,weight,smokes
0,0.713701,"[1, 0]"
1,5569.845352,"[0, 1]"
2,5569.845352,"[0, 1]"
3,0.636801,"[1, 0]"
4,5569.845352,"[1, 0]"


In [31]:
df.describe()

Unnamed: 0,weight
count,3910.0
mean,1100.102419
std,2219.069214
min,-6.511366
25%,-1.075109
50%,0.264598
75%,1.389069
max,5569.845352


In [32]:
import ast

df = df.rename(columns={'weight': 'col_0'})
# The one-hot lists were read back from CSV as text such as "[1, 0]".
# ast.literal_eval only accepts Python literals, so unlike eval() it cannot
# execute arbitrary code that might be present in the file.
one_hot = df['smokes'].apply(ast.literal_eval)

# Create separate columns for 'col_1' and 'col_2'
df[['col_1', 'col_2']] = pd.DataFrame(one_hot.to_list(), columns=['col_1', 'col_2'], index=df.index)

# Drop the original 'smokes' column
df = df.drop('smokes', axis=1)
df = df.sort_values(by=['col_0', 'col_1'])
df.head()

Unnamed: 0,col_0,col_1,col_2
28,-6.511366,0,1
31,-6.511366,0,1
47,-6.511366,0,1
97,-6.511366,0,1
116,-6.511366,0,1


In [33]:
df.to_csv('test_duckdb_final.csv', index=False)

In [34]:
file_path = 'test_transformed.csv'
# The converted pipeline output has no header row, so the columns come back
# numbered 0..n-1; relabel them col_0..col_{n-1}.
df = pd.read_csv(file_path, index_col=None, header=None).rename(columns=lambda i: f'col_{i}')
# The one-hot indicators were stored as floats; cast them to integers and
# sort so the rows line up with the DuckDB export.
df = df.astype({'col_1': int, 'col_2': int}).sort_values(by=['col_0', 'col_1'])
df.head()

Unnamed: 0,col_0,col_1,col_2
42,-6.511366,0,1
50,-6.511366,0,1
53,-6.511366,0,1
56,-6.511366,0,1
83,-6.511366,0,1


In [35]:
df.to_csv('test_pipeline_final.csv', index=False)

Given the potential variability in precision across different systems, we establish a tolerance to facilitate the comparison of floating-point data.

In [38]:
import numpy as np

def compare_csv_with_tolerance(file1, file2, tolerance=1e-6):
    """Compare two numeric CSV files cell by cell within a tolerance.

    Rows are matched by position and columns by the labels of ``file1``.
    Two cells match when ``numpy.isclose`` accepts them with ``tolerance``
    used as both the relative and the absolute tolerance; two missing values
    in the same cell also count as a match.  The outcome is printed, and
    every mismatching cell is listed.

    Args:
        file1: path of the reference CSV file.
        file2: path of the CSV file to compare against it.
        tolerance: relative and absolute tolerance for the comparison.

    Returns:
        True when the files have the same shape and all cells match,
        otherwise False.
    """
    df1 = pd.read_csv(file1)
    df2 = pd.read_csv(file2)

    if df1.shape != df2.shape:
        print("CSV files have different shapes.")
        return False

    # Compare the whole tables at once instead of looping over single cells.
    left = df1.to_numpy(dtype=float)
    right = df2[df1.columns].to_numpy(dtype=float)
    close = np.isclose(left, right, rtol=tolerance, atol=tolerance, equal_nan=True)

    # argwhere yields (row, column position) pairs in row-major order.
    differences = [(int(row), df1.columns[col_pos]) for row, col_pos in np.argwhere(~close)]

    if differences:
        print("CSV files are not equal. Differences found at:")
        for diff in differences:
            print(f"Row: {diff[0]}, Column: {diff[1]}")
        return False

    print("CSV files are equal.")
    return True


In [39]:
# train
file1_path = "train_pipeline_final.csv"
file2_path = "train_duckdb_final.csv"

compare_csv_with_tolerance(file1_path, file2_path)

CSV files are equal.


In [40]:
# test
file3_path = "test_pipeline_final.csv"
file4_path = "test_duckdb_final.csv"

compare_csv_with_tolerance(file3_path, file4_path)

CSV files are equal.


### All checks have been completed at this point, and the data obtained after DuckDB operations matches the data generated by the Python pipeline.