# Steps to prepare the environment

- Create a conda env (see [Development Environment](https://fasrc.github.io/pycausalgps/setup_env_dev.html#setting-up-a-new-environment))
- Install pycausalgps

In [1]:
# Useful resources:
# - https://blog.dask.org/2017/03/28/dask-xgboost 

In [2]:
import numpy as np
import pandas as pd
import xgboost as xgb
import matplotlib.pyplot as plt
from pycausalgps.gps_utils import generate_syn_pop
from scipy.stats import norm
from scipy import stats
from sklearn.neighbors import KernelDensity

In [3]:
# Generate synthetic data

data_obj = generate_syn_pop(10000)
df = data_obj.data
df

Unnamed: 0,Y,treat,cf1,cf2,cf3,cf4,cf5,cf6
0,-33.710755,9.128798,1.088013,-1.040795,1.546794,0.647522,-2,-0.338821
1,-6.917718,9.526059,0.394032,-1.100086,-0.058984,0.108440,2,0.745696
2,-89.842294,2.363495,0.061547,0.701527,0.213724,-0.504911,-2,-0.374944
3,-55.530468,6.236718,2.283811,0.241855,-1.233216,-0.520288,-2,-1.040341
4,-0.388687,10.309740,-0.251576,0.530213,-0.270462,1.191965,1,0.894189
...,...,...,...,...,...,...,...,...
9995,-4.940754,15.202446,-0.063624,0.921056,0.238664,0.173765,0,-1.097896
9996,-35.728414,7.958106,1.417532,0.410957,-2.155039,-0.843747,1,2.088619
9997,-7.728335,14.782084,-1.781578,-0.009090,0.546043,-0.675359,-2,-0.321661
9998,-87.591738,2.171151,-0.041114,-0.585819,0.355793,-0.018440,1,-1.182776


## Workflow

- Step 1: Set up Dask client
   
   - This part requires some research to find the best combination. We want the users to be able to choose different schedulers without dealing with internal details.  
       
- Step 2: Load data
   
   - You need to load data into a dask data.frame directly. If you reading from a csv or any other physical file, dask can support it. In this example, I just generate the psuedo population and then convert it into dask dataframe. [TODO: see if it is necessary to generate a dask data frame directly.]
            
- Step 3: Data preprocessing and cleaning
   
    - This step also should happen on a dask dataframe to speed up the process.
    - Cleaning data includes:
        - Handling missing values
        - Handling column names (replace space with _)
        - Sanity check all columns (see the unique values)
        - Hotencoding
    
- Step 4: Hyper parameters and user defined parameters
   
   - percentage of data used for training
   - objective function       
   
- Step 5: Generate training and test data

     
- Step 6: Estimate GPS Value (parametric)
    
    - Estimate treatment (w) from counfonders --> e_gps
    - Compute standard deviation of (w - e_gps) --> e_gps_std
    - Compute normal distributeion of each value given mean (e_gps) and standard deviation (e_gps_std) --> gps
    
- Step 7: Estimate GPS Value (non-parametric)

    - Estimate treatment (w) from counfonders --> e_gps
    - Estimate standard deviation (abs(w-e_gps)) from confounders --> e_gps_std
    - Estimate residuals (w - g_gps)/(e_gps_std) --> w_resid
    - Compute gps from density --> gps
        - compute kernel density based on residuals (w_resid), drop na values.
        - approximate (interpolate) corrosponding kernel density for each residual value. 
    

In [4]:
# Step 1

import dask.dataframe as dd
from dask.distributed import Client

# local cluster
from dask.distributed import LocalCluster

# parameters
n_workers = 4

cluster = LocalCluster(n_workers=n_workers)
client = Client(cluster)
client

distributed.diskutils - INFO - Found stale lock file and directory '/Users/nak443/Documents/Naeem_folder_mac_h/Research_projects/F2021_001_Harvard/experiment_001_20211110_tasks/analysis/pycgps_estimategps_211109/dask-worker-space/worker-os6btywa', purging
distributed.diskutils - INFO - Found stale lock file and directory '/Users/nak443/Documents/Naeem_folder_mac_h/Research_projects/F2021_001_Harvard/experiment_001_20211110_tasks/analysis/pycgps_estimategps_211109/dask-worker-space/worker-pr2rzab8', purging
distributed.diskutils - INFO - Found stale lock file and directory '/Users/nak443/Documents/Naeem_folder_mac_h/Research_projects/F2021_001_Harvard/experiment_001_20211110_tasks/analysis/pycgps_estimategps_211109/dask-worker-space/worker-ice9u_y1', purging
distributed.diskutils - INFO - Found stale lock file and directory '/Users/nak443/Documents/Naeem_folder_mac_h/Research_projects/F2021_001_Harvard/experiment_001_20211110_tasks/analysis/pycgps_estimategps_211109/dask-worker-space/wo

0,1
Connection method: Cluster object,Cluster type: distributed.LocalCluster
Dashboard: http://127.0.0.1:8787/status,

0,1
Dashboard: http://127.0.0.1:8787/status,Workers: 4
Total threads: 12,Total memory: 16.00 GiB
Status: running,Using processes: True

0,1
Comm: tcp://127.0.0.1:53356,Workers: 4
Dashboard: http://127.0.0.1:8787/status,Total threads: 12
Started: Just now,Total memory: 16.00 GiB

0,1
Comm: tcp://127.0.0.1:53374,Total threads: 3
Dashboard: http://127.0.0.1:53376/status,Memory: 4.00 GiB
Nanny: tcp://127.0.0.1:53362,
Local directory: /Users/nak443/Documents/Naeem_folder_mac_h/Research_projects/F2021_001_Harvard/experiment_001_20211110_tasks/analysis/pycgps_estimategps_211109/dask-worker-space/worker-idj883u8,Local directory: /Users/nak443/Documents/Naeem_folder_mac_h/Research_projects/F2021_001_Harvard/experiment_001_20211110_tasks/analysis/pycgps_estimategps_211109/dask-worker-space/worker-idj883u8

0,1
Comm: tcp://127.0.0.1:53368,Total threads: 3
Dashboard: http://127.0.0.1:53369/status,Memory: 4.00 GiB
Nanny: tcp://127.0.0.1:53359,
Local directory: /Users/nak443/Documents/Naeem_folder_mac_h/Research_projects/F2021_001_Harvard/experiment_001_20211110_tasks/analysis/pycgps_estimategps_211109/dask-worker-space/worker-y553fd1b,Local directory: /Users/nak443/Documents/Naeem_folder_mac_h/Research_projects/F2021_001_Harvard/experiment_001_20211110_tasks/analysis/pycgps_estimategps_211109/dask-worker-space/worker-y553fd1b

0,1
Comm: tcp://127.0.0.1:53371,Total threads: 3
Dashboard: http://127.0.0.1:53372/status,Memory: 4.00 GiB
Nanny: tcp://127.0.0.1:53360,
Local directory: /Users/nak443/Documents/Naeem_folder_mac_h/Research_projects/F2021_001_Harvard/experiment_001_20211110_tasks/analysis/pycgps_estimategps_211109/dask-worker-space/worker-xis3gr_8,Local directory: /Users/nak443/Documents/Naeem_folder_mac_h/Research_projects/F2021_001_Harvard/experiment_001_20211110_tasks/analysis/pycgps_estimategps_211109/dask-worker-space/worker-xis3gr_8

0,1
Comm: tcp://127.0.0.1:53375,Total threads: 3
Dashboard: http://127.0.0.1:53378/status,Memory: 4.00 GiB
Nanny: tcp://127.0.0.1:53361,
Local directory: /Users/nak443/Documents/Naeem_folder_mac_h/Research_projects/F2021_001_Harvard/experiment_001_20211110_tasks/analysis/pycgps_estimategps_211109/dask-worker-space/worker-zf3czj5e,Local directory: /Users/nak443/Documents/Naeem_folder_mac_h/Research_projects/F2021_001_Harvard/experiment_001_20211110_tasks/analysis/pycgps_estimategps_211109/dask-worker-space/worker-zf3czj5e


In [5]:
# Step 2

# parameters
npartitions=4

ddf = dd.from_pandas(df, npartitions=npartitions)
ddf

Unnamed: 0_level_0,Y,treat,cf1,cf2,cf3,cf4,cf5,cf6
npartitions=4,Unnamed: 1_level_1,Unnamed: 2_level_1,Unnamed: 3_level_1,Unnamed: 4_level_1,Unnamed: 5_level_1,Unnamed: 6_level_1,Unnamed: 7_level_1,Unnamed: 8_level_1
0,float64,float64,float64,float64,float64,float64,int64,float64
2500,...,...,...,...,...,...,...,...
5000,...,...,...,...,...,...,...,...
7500,...,...,...,...,...,...,...,...
9999,...,...,...,...,...,...,...,...


In [6]:
# Step 3

# This step needs more attention in real data. For now, I just categorize cf5.
# Remember One-hot encoding is not good for linear and logestic regression
from dask_ml.preprocessing import DummyEncoder

# make cf5 category
ddf = ddf.categorize(columns=["cf5"])
dm = DummyEncoder()
ddf_enc = dm.fit_transform(ddf)
ddf_enc.head()

Unnamed: 0,Y,treat,cf1,cf2,cf3,cf4,cf6,cf5_-2,cf5_2,cf5_1,cf5_0,cf5_-1
0,-33.710755,9.128798,1.088013,-1.040795,1.546794,0.647522,-0.338821,1,0,0,0,0
1,-6.917718,9.526059,0.394032,-1.100086,-0.058984,0.10844,0.745696,0,1,0,0,0
2,-89.842294,2.363495,0.061547,0.701527,0.213724,-0.504911,-0.374944,1,0,0,0,0
3,-55.530468,6.236718,2.283811,0.241855,-1.233216,-0.520288,-1.040341,1,0,0,0,0
4,-0.388687,10.30974,-0.251576,0.530213,-0.270462,1.191965,0.894189,0,0,1,0,0


In [7]:
# Step 4

# percent of training data

tr_pr = 0.8
parametric = False


In [8]:
# Step 5
from dask_ml.model_selection import train_test_split

# create training data 
## features
X = ddf_enc.drop(["Y", "treat"], axis=1).copy()

# label
y = ddf_enc["Y"].copy()

X_train, X_test, y_train, y_test = train_test_split(X, y, train_size= tr_pr, random_state=123, shuffle=True)


In [9]:
print(f'Length of training data: {len(X_train)} which is {(len(X_train)/len(X))*100} percent of original data.')
print(f'Length of training data: {len(X_test)} which is {(len(X_test)/len(X))*100} percent of original data.')

Length of training data: 8065 which is 80.65 percent of original data.
Length of training data: 1935 which is 19.35 percent of original data.


In [10]:
# Step 6

if parametric:
    X = ddf_enc.drop(["Y", "treat"], axis=1).copy()
    y = ddf_enc["Y"].copy()
    X_train, X_test, y_train, y_test = train_test_split(X, y, train_size= tr_pr,
                                                        random_state=123, shuffle=True)
    dtrain = xgb.dask.DaskDMatrix(client, X_train, y_train)
    dall_data = xgb.dask.DaskDMatrix(client, X, y)
    output = xgb.dask.train(
        client, 
        {"verbosity":2, "objective":"reg:squarederror"},
        dtrain,
        num_boost_round=20,
        evals=[(dtrain, "train")],
    )
    # predict gps value for all data
    e_gps = xgb.dask.predict(client, output, dall_data)
    e_gps_tmp = (e_gps - ddf_enc["treat"])
    e_gps_obj = e_gps_tmp.std()
    e_gps_std = e_gps_obj.compute()
    gps = norm.pdf(ddf_enc["treat"], e_gps, e_gps_std)
    plt.plot(gps[1:100])

In [11]:
# Step 7 

if not parametric:
    X = ddf_enc.drop(["Y", "treat"], axis=1).copy()
    y = ddf_enc["Y"].copy()
    X_train, X_test, y_train, y_test = train_test_split(X, y, train_size= tr_pr,
                                                        random_state=123, shuffle=True)
    dtrain1 = xgb.dask.DaskDMatrix(client, X_train, y_train)
    dall_data_1 = xgb.dask.DaskDMatrix(client, X, y)
    output1 = xgb.dask.train(
        client, 
        {"verbosity":2, "objective":"reg:squarederror"},
        dtrain1,
        num_boost_round=20,
        evals=[(dtrain1, "train")],
    )
    # predict gps value for all data
    e_gps = xgb.dask.predict(client, output1, dall_data_1)
    
    y2 = (y - e_gps).abs()
    X_train_2, X_test_2, y_train_2, y_test_2 = train_test_split(X, y2, train_size= tr_pr,
                                                       random_state=123, shuffle=True)
    dtrain_2 = xgb.dask.DaskDMatrix(client, X_train_2, y_train_2)
    dall_data_2 = xgb.dask.DaskDMatrix(client, X, y2)
    output_2 = xgb.dask.train(
        client, 
        {"verbosity":2, "objective":"reg:squarederror"},
        dtrain_2,
        num_boost_round=4,
        evals=[(dtrain_2, "train")],
    )
    # predict standard deviation of gps value for all data
    e_gps_std = xgb.dask.predict(client, output_2, dall_data_2)
    
    # compute residual
    w_resid = (y - e_gps)/e_gps_std
    
    # compute kernel density estimate ("gaussain")
    kernel = stats.gaussian_kde(w_resid)
    gps = kernel(w_resid)
    


[0]	train-rmse:54.49600
[1]	train-rmse:49.09535
[2]	train-rmse:45.71896
[3]	train-rmse:43.50631
[4]	train-rmse:42.05592
[5]	train-rmse:41.08343
[6]	train-rmse:40.29062
[7]	train-rmse:39.78020
[8]	train-rmse:39.41623
[9]	train-rmse:39.00407
[10]	train-rmse:38.82005
[11]	train-rmse:38.34788
[12]	train-rmse:37.99800
[13]	train-rmse:37.75954
[14]	train-rmse:37.62613
[15]	train-rmse:37.13777
[16]	train-rmse:36.85887
[17]	train-rmse:36.56160
[18]	train-rmse:36.36369
[19]	train-rmse:36.19000


[18:04:27] task [xgboost.dask]:tcp://127.0.0.1:53371 got new rank 0
[18:04:27] task [xgboost.dask]:tcp://127.0.0.1:53375 got new rank 1
[18:04:27] task [xgboost.dask]:tcp://127.0.0.1:53374 got new rank 2
[18:04:27] task [xgboost.dask]:tcp://127.0.0.1:53368 got new rank 3
[18:04:27] INFO: /Users/runner/miniforge3/conda-bld/xgboost-split_1634712680264/work/src/gbm/gbtree.cc:138: Tree method is automatically selected to be 'approx' for distributed training.
[18:04:27] INFO: /Users/runner/miniforge3/conda-bld/xgboost-split_1634712680264/work/src/gbm/gbtree.cc:138: Tree method is automatically selected to be 'approx' for distributed training.
[18:04:27] INFO: /Users/runner/miniforge3/conda-bld/xgboost-split_1634712680264/work/src/gbm/gbtree.cc:138: Tree method is automatically selected to be 'approx' for distributed training.[18:04:27] INFO: /Users/runner/miniforge3/conda-bld/xgboost-split_1634712680264/work/src/gbm/gbtree.cc:138: Tree method is automatically selected to be 'approx' for dis

[0]	train-rmse:30.60576
[1]	train-rmse:27.52124
[2]	train-rmse:25.72144
[3]	train-rmse:24.66389


[18:04:28] task [xgboost.dask]:tcp://127.0.0.1:53368 got new rank 0
[18:04:28] task [xgboost.dask]:tcp://127.0.0.1:53371 got new rank 1
[18:04:28] task [xgboost.dask]:tcp://127.0.0.1:53375 got new rank 2
[18:04:28] task [xgboost.dask]:tcp://127.0.0.1:53374 got new rank 3
[18:04:28] INFO: /Users/runner/miniforge3/conda-bld/xgboost-split_1634712680264/work/src/gbm/gbtree.cc:138: Tree method is automatically selected to be 'approx' for distributed training.
[18:04:28] INFO: /Users/runner/miniforge3/conda-bld/xgboost-split_1634712680264/work/src/gbm/gbtree.cc:138: Tree method is automatically selected to be 'approx' for distributed training.
[18:04:28] INFO: /Users/runner/miniforge3/conda-bld/xgboost-split_1634712680264/work/src/gbm/gbtree.cc:138: Tree method is automatically selected to be 'approx' for distributed training.
[18:04:28] INFO: /Users/runner/miniforge3/conda-bld/xgboost-split_1634712680264/work/src/gbm/gbtree.cc:138: Tree method is automatically selected to be 'approx' for di