In [1]:
#%%time
#create DASK scheduler and worker remote containers
#this will take at least one minute as there are delays added on purpose to allow containers to spawn
#on successful run you should see scheduler URL printed

#!python3 daskmaster.py

In [2]:
# Scheduler URL shared (as a module-level global) by the Dask helper functions in this notebook.
# An empty string means no scheduler has been launched yet.
daskschurl = ""

In [3]:
import cdsw
import os
import time

def dask_distributed_launch(nworkers=1,ncpu=2,nmemory=4):
    """Launch a Dask scheduler and ``nworkers`` Dask workers as CDSW engines.

    If the module-level ``daskschurl`` already holds a URL, nothing is
    launched and that URL is returned.

    Args:
        nworkers: number of worker engines to launch (one launch call each).
        ncpu: ``cpu`` value passed to ``cdsw.launch_workers`` for each worker.
        nmemory: ``memory`` value passed to ``cdsw.launch_workers`` for each
            worker. The scheduler engine is always launched with cpu=2 and
            memory=4, independent of these arguments.

    Returns:
        The scheduler URL as a string of the form ``tcp://<ip>:8786``.

    Raises:
        RuntimeError: if no scheduler engine was returned by the launch call,
            or if its IP address never showed up in ``cdsw.list_workers()``.
    """
    # The URL is kept in a module-level variable so other cells can reuse it.
    global daskschurl

    # Already launched: report and return the recorded URL.
    if daskschurl != "":
        print(" Dask Scheduler Already Launched " + daskschurl)
        return daskschurl

    # Launch the scheduler engine; it runs the given script in this project.
    dask_scheduler = cdsw.launch_workers(n=1, cpu=2, memory=4,
                                         kernel="python3", script="daskschedular.py")
    if not dask_scheduler:
        raise RuntimeError("cdsw.launch_workers returned no scheduler engine")
    schedulerid = dask_scheduler[0]["id"]

    # The IP of a freshly launched container is not known immediately, so
    # give it the same initial grace period as before ...
    time.sleep(30)

    # ... then look the scheduler up, retrying a bounded number of times in
    # case the address is still missing.
    # NOTE(review): a not-yet-assigned address is assumed to be falsy
    # (None or "") -- confirm against the cdsw API.
    ip_attempts = 6
    retry_delay = 5
    schedulerip = None
    for attempt in range(ip_attempts):
        for engine in cdsw.list_workers():
            if engine["id"] == schedulerid:
                schedulerip = engine["ip_address"]
        if schedulerip:
            break
        if attempt < ip_attempts - 1:
            time.sleep(retry_delay)

    if not schedulerip:
        raise RuntimeError(
            "Scheduler engine " + str(schedulerid) + " has no IP address yet; "
            "call dask_stop_workers() and try launching again"
        )

    print(" Scheduler IP: " + schedulerip)

    # Scheduler protocol and port - defaults from Dask
    schloc = "tcp://" + schedulerip + ":8786"
    print(" Scheduler URL: " + schloc)

    # Launch the workers one at a time, handing each the scheduler URL
    # through the DASKSCHURL environment variable.
    for _ in range(nworkers):
        cdsw.launch_workers(n=1, cpu=ncpu, memory=nmemory,
                            kernel="python3", script="daskworker.py",
                            env={"DASKSCHURL": schloc})

        # wait for a while until the container is launched successfully
        time.sleep(10)

    # Record the URL only once everything has been launched.
    daskschurl = schloc
    return schloc


def dask_stop_workers():
    """Stop the CDSW worker engines and clear the recorded scheduler URL.

    Calls ``cdsw.stop_workers()`` with no arguments, then resets the
    module-level ``daskschurl`` to the empty string.
    """
    global daskschurl

    cdsw.stop_workers()

    # Empty string marks "no scheduler launched".
    daskschurl = ""
    

In [4]:
def dask_test():
    """Smoke-test the Dask cluster with a random-array reduction.

    Connects a ``dask.distributed.Client`` to the URL held in the
    module-level ``daskschurl``, builds a 40000 x 40000 random Dask array in
    1000 x 1000 chunks, and prints the sum of its element-wise exponential.
    """
    import dask.array as da
    from dask.distributed import Client

    # NOTE(review): the client is never closed; presumably it is left
    # registered so that later cells compute on the cluster -- confirm.
    client = Client(daskschurl)

    samples = da.random.random((40000,40000),chunks=(1000,1000))
    total = da.exp(samples).sum().compute()
    print(" Result of DASK distributed array test: " + str(total) + "\n")


In [5]:
#stop any previous dask distributed containers
dask_stop_workers()

In [6]:
#launch scheduler and worker container(s) - you can specify number of workers as argument
#to relaunch - first call dask_stop_workers() and then call this again
dask_distributed_launch(1,4,8)

 Scheduler IP: 10.10.17.63
 Scheduler URL: tcp://10.10.17.63:8786


'tcp://10.10.17.63:8786'

In [7]:
#check if global variable has the right URL
#we will use this to register a client
print(daskschurl)

tcp://10.10.17.63:8786


In [8]:
import pandas as pd
import cdsw
#pods stopped earlier may still be listed with status failed;
#only the ones reported as running are printed below
workers_list = cdsw.list_workers()
print(" === List of launched running pods === ")
print(" ===      (scheduler + workers)    === ")

#one row per listed pod, keeping only the relevant fields
workersl = [
    [pod["id"], pod["ip_address"], pod["cpu"], pod["memory"], pod["status"]]
    for pod in workers_list
]

workersactive = pd.DataFrame(
    workersl,
    columns=["Id","IP Address","CPUs","Memory","Status"],
)
is_running = workersactive.Status=="running"
print(workersactive[is_running])

 === List of launched running pods === 
 ===      (scheduler + workers)    === 
                 Id   IP Address  CPUs  Memory   Status
0  lkiof5w5zdk1loe2    10.10.7.8     4       8  running
1  2kwx483q1coujw8q  10.10.17.63     2       4  running


In [9]:
%%time
#check distributed dask is working
dask_test()

#Register a DASK client and run a test
#from dask.distributed import Client
#client = Client(daskschurl)
#import dask.array as da
#x = da.random.random((40000,40000),chunks=(1000,1000))
#y = da.exp(x).sum()
#print("DASK test result: ") 
#print(y.compute())

 Result of DASK distributed array test: 2749263366.92

CPU times: user 6.35 s, sys: 48.4 ms, total: 6.4 s
Wall time: 12.3 s


In [9]:
%%time
#Read file from S3
#configure access via aws configure in terminal
#need pip3 install awscli for it to work

#Dask dataframe has poor ability to infer column types
#Need to be fixed manually
#options can be modified in ~/.config/dask/distributed.yaml 


import dask.dataframe as dd
import s3fs
#lazy read of the loans CSV from S3 in 25MB blocks; three columns are pinned
#to object dtype explicitly instead of relying on inference from the sample
df = dd.read_csv(
    "s3://harshalpatil-s3/loans_accepted_2007_to_2018Q4.csv",
    blocksize="25MB",
    sample=10000000,
    dtype={
        "id": "object",
        "sec_app_earliest_cr_line": "object",
        "desc": "object",
    },
)

CPU times: user 801 ms, sys: 171 ms, total: 972 ms
Wall time: 12.5 s


In [10]:
#check number of partitions 

#df.known_divisions
#df.set_index("grade")
#df = df.repartition(npartitions=10000)
df.npartitions


68

In [11]:
#Count the non-null values in every column; a count of 0 marks a column whose values are ALL missing.
#Such columns may cause certain dask df functions to fail - the actual drop happens in a later cell.
nonnullcnt=pd.DataFrame(df.count().compute())

  args2 = [_execute_task(a, cache) for a in args]
  args2 = [_execute_task(a, cache) for a in args]
  args2 = [_execute_task(a, cache) for a in args]
  args2 = [_execute_task(a, cache) for a in args]
  args2 = [_execute_task(a, cache) for a in args]
  args2 = [_execute_task(a, cache) for a in args]
  args2 = [_execute_task(a, cache) for a in args]
  args2 = [_execute_task(a, cache) for a in args]


In [12]:
from IPython.display import display, HTML
display(HTML(nonnullcnt.to_html()))

Unnamed: 0,0
id,2260701
member_id,0
loan_amnt,2260668
funded_amnt,2260668
funded_amnt_inv,2260668
term,2260668
int_rate,2260668
installment,2260668
grade,2260668
sub_grade,2260668


In [15]:
####
display(HTML(df.head(1).to_html()))

Unnamed: 0,id,member_id,loan_amnt,funded_amnt,funded_amnt_inv,term,int_rate,installment,grade,sub_grade,emp_title,emp_length,home_ownership,annual_inc,verification_status,issue_d,loan_status,pymnt_plan,url,desc,purpose,title,zip_code,addr_state,dti,delinq_2yrs,earliest_cr_line,fico_range_low,fico_range_high,inq_last_6mths,mths_since_last_delinq,mths_since_last_record,open_acc,pub_rec,revol_bal,revol_util,total_acc,initial_list_status,out_prncp,out_prncp_inv,total_pymnt,total_pymnt_inv,total_rec_prncp,total_rec_int,total_rec_late_fee,recoveries,collection_recovery_fee,last_pymnt_d,last_pymnt_amnt,next_pymnt_d,last_credit_pull_d,last_fico_range_high,last_fico_range_low,collections_12_mths_ex_med,mths_since_last_major_derog,policy_code,application_type,annual_inc_joint,dti_joint,verification_status_joint,acc_now_delinq,tot_coll_amt,tot_cur_bal,open_acc_6m,open_act_il,open_il_12m,open_il_24m,mths_since_rcnt_il,total_bal_il,il_util,open_rv_12m,open_rv_24m,max_bal_bc,all_util,total_rev_hi_lim,inq_fi,total_cu_tl,inq_last_12m,acc_open_past_24mths,avg_cur_bal,bc_open_to_buy,bc_util,chargeoff_within_12_mths,delinq_amnt,mo_sin_old_il_acct,mo_sin_old_rev_tl_op,mo_sin_rcnt_rev_tl_op,mo_sin_rcnt_tl,mort_acc,mths_since_recent_bc,mths_since_recent_bc_dlq,mths_since_recent_inq,mths_since_recent_revol_delinq,num_accts_ever_120_pd,num_actv_bc_tl,num_actv_rev_tl,num_bc_sats,num_bc_tl,num_il_tl,num_op_rev_tl,num_rev_accts,num_rev_tl_bal_gt_0,num_sats,num_tl_120dpd_2m,num_tl_30dpd,num_tl_90g_dpd_24m,num_tl_op_past_12m,pct_tl_nvr_dlq,percent_bc_gt_75,pub_rec_bankruptcies,tax_liens,tot_hi_cred_lim,total_bal_ex_mort,total_bc_limit,total_il_high_credit_limit,revol_bal_joint,sec_app_fico_range_low,sec_app_fico_range_high,sec_app_earliest_cr_line,sec_app_inq_last_6mths,sec_app_mort_acc,sec_app_open_acc,sec_app_revol_util,sec_app_open_act_il,sec_app_num_rev_accts,sec_app_chargeoff_within_12_mths,sec_app_collections_12_mths_ex_med,sec_app_mths_since_last_major_derog,hardship_flag,hardship_type,h
ardship_reason,hardship_status,deferral_term,hardship_amount,hardship_start_date,hardship_end_date,payment_plan_start_date,hardship_length,hardship_dpd,hardship_loan_status,orig_projected_additional_accrued_interest,hardship_payoff_balance_amount,hardship_last_payment_amount,disbursement_method,debt_settlement_flag,debt_settlement_flag_date,settlement_status,settlement_date,settlement_amount,settlement_percentage,settlement_term
0,68407277,,3600.0,3600.0,3600.0,36 months,13.99,123.03,C,C4,leadman,10+ years,MORTGAGE,55000.0,Not Verified,Dec-2015,Fully Paid,n,https://lendingclub.com/browse/loanDetail.action?loan_id=68407277,,debt_consolidation,Debt consolidation,190xx,PA,5.91,0.0,Aug-2003,675.0,679.0,1.0,30.0,,7.0,0.0,2765.0,29.7,13.0,w,0.0,0.0,4421.723917,4421.72,3600.0,821.72,0.0,0.0,0.0,Jan-2019,122.67,,Mar-2019,564.0,560.0,0.0,30.0,1.0,Individual,,,,0.0,722.0,144904.0,2.0,2.0,0.0,1.0,21.0,4981.0,36.0,3.0,3.0,722.0,34.0,9300.0,3.0,1.0,4.0,4.0,20701.0,1506.0,37.2,0.0,0.0,148.0,128.0,3.0,3.0,1.0,4.0,69.0,4.0,69.0,2.0,2.0,4.0,2.0,5.0,3.0,4.0,9.0,4.0,7.0,0.0,0.0,0.0,3.0,76.9,0.0,0.0,0.0,178050.0,7746.0,2400.0,13734.0,,,,,,,,,,,,,,N,,,,,,,,,,,,,,,Cash,N,,,,,,


In [31]:
#member_id has a non-null count of 0 in nonnullcnt (every value is missing), so drop it
df=df.drop(columns=["member_id"])

In [33]:
summarydf=df.describe().compute()

In [17]:
#Use the wine dataset
from sklearn.datasets import load_wine
data = load_wine()

from dask import dataframe as dd
#the data is a numpy array, so dd.from_array is used instead of dd.from_pandas(data['data'])
#dask has various ways to convert numpy and pandas to dask dataframes
#NOTE: this rebinds df, replacing the loans dataframe loaded earlier in the notebook
df = dd.from_array(data['data'])
df.columns = data['feature_names']

#print a few lines
print("\n Dataframe: ")
print(df.head())

#Get target variable
dt = dd.from_array(data['target'])
#NOTE(review): data['target'] looks one-dimensional, so dt is presumably a dask Series and
#this assignment may not actually rename anything - confirm
dt.columns = ["target"]

#print target classes example
print("\n Target: ")
print(dt.head())

# train and test split
from dask_ml.model_selection import train_test_split
train, test, train_labels, test_labels = train_test_split(df,dt,random_state=123)

#xgboost
from dask_ml.xgboost import XGBClassifier
est = XGBClassifier()

#fit model
model = est.fit(train, train_labels)

#which features contribute most
import pandas as pd
featureimp = pd.DataFrame(model.feature_importances_)
featureimp.columns = ['classifier_feature_importance']
featureimp["variable"] = data['feature_names']
print("\n\n === Xgboost Classifier Feature Importance: === ")
print(featureimp.sort_values(by="classifier_feature_importance", ascending=False))
#featureimp.to_csv()


#predictions - built once here and reused for the accuracy check below
ypred = model.predict(test)

#sample some predictions
print("\n Sample initial five predictions: ")
print(ypred[[0,1,2,3,4]].compute())

#ensure model is predicting all classes - not just 0
print("\n Check classes other than zero predicted: ")
print(ypred[ypred>0].compute())

#check accuracy on test set
#reuse ypred instead of building a second, identical model.predict(test)
from dask_ml import metrics
print("\n\n Model Accuracy: ")
print(metrics.accuracy_score(test_labels,ypred))

print("\n === End Dask Xgboost === \n")



 Dataframe: 
   alcohol  malic_acid   ash  alcalinity_of_ash  magnesium  total_phenols  \
0    14.23        1.71  2.43               15.6      127.0           2.80   
1    13.20        1.78  2.14               11.2      100.0           2.65   
2    13.16        2.36  2.67               18.6      101.0           2.80   
3    14.37        1.95  2.50               16.8      113.0           3.85   
4    13.24        2.59  2.87               21.0      118.0           2.80   

   flavanoids  nonflavanoid_phenols  proanthocyanins  color_intensity   hue  \
0        3.06                  0.28             2.29             5.64  1.04   
1        2.76                  0.26             1.28             4.38  1.05   
2        3.24                  0.30             2.81             5.68  1.03   
3        3.49                  0.24             2.18             7.80  0.86   
4        2.69                  0.39             1.82             4.32  1.04   

   od280/od315_of_diluted_wines  proline  
0    