In [1]:
# to reconnect to the cluster
import coiled
# cluster = coiled.Cluster(name='DarishSakeesing-e2b407d7-f')
cluster = coiled.Cluster(n_workers=10)

from dask.distributed import Client
client = Client(cluster)
print('Dashboard:', client.dashboard_link)

Output()

Dashboard: http://ec2-3-142-171-109.us-east-2.compute.amazonaws.com:8787

+---------+---------------+---------------+---------------+
| Package | client        | scheduler     | workers       |
+---------+---------------+---------------+---------------+
| python  | 3.8.5.final.0 | 3.8.8.final.0 | 3.8.8.final.0 |
| tornado | 6.0.4         | 6.1           | 6.1           |
+---------+---------------+---------------+---------------+


In [2]:
import dask.dataframe as dd

raw_data = dd.read_csv(
    "s3://lending-club/accepted_2007_to_2018Q4.csv",
    dtype={'desc': 'object', 
            'id': 'object',
            'sec_app_earliest_cr_line': 'object'}, 
    parse_dates = ['issue_d','earliest_cr_line'],
    low_memory=False,
    storage_options={"anon": True},
    blocksize="16 MiB",
)

In [3]:
# Filtering out the relavant features
raw_data = raw_data[['id',
                     'addr_state', # Need to dummify
                     'annual_inc',
                     'application_type', # Need to binarize 
                     'disbursement_method', # Need to binarize
                     'dti',
                     'earliest_cr_line',
                     'emp_length', # Need to convert to number and add NAs
                     'emp_title', # Needs to be encoded
                     'fico_range_high', 
                     'fico_range_low',
                     'grade', # Need to dummify or be ordinal encoded
                     'home_ownership', # Need to dummify
                     'initial_list_status', # Need to dummify (binarize)
                     'installment',
                     'int_rate',
                     'issue_d',
                     'loan_amnt',
                     'open_acc', 
                     'pub_rec', 
                     'pub_rec_bankruptcies',
                     'purpose', # Need to dummify
                     'sub_grade', # Need to dummify or be ordinal encoded
                     'term', # Need to convert to integer from string
                     'verification_status',
                     'zip_code',
                     'loan_status' # Need to dummify
                    ]]

In [4]:
# Look at the values in the term column

#print('Values in term \n', raw_data.term.unique().compute())

In [5]:
# Drop the observation with NAs in the term columns, then convert term to integer
raw_data = raw_data.dropna(subset=['term'])

raw_data.term = raw_data.term.str.strip(' months').astype(int)

In [6]:
# Separate 36 and 60 months loans
df_3 = raw_data[raw_data.term == 36]
df_5 = raw_data[raw_data.term == 60]

In [7]:
# Only consider loans that are either charged off or fully paid
df_3 = df_3[df_3['loan_status'].isin(['Charged Off','Fully Paid'])]
df_5 = df_5[df_5['loan_status'].isin(['Charged Off','Fully Paid'])]

In [8]:
# Check if there are NAs in the issue_d field
# print('NAs in issue_d among 3-year loans:', sum(df_3.issue_d.isna()))
# print('NAs in issue_d among 5-year loans:', sum(df_5.issue_d.isna()))

In [9]:
# drop application_type column
df_3 = df_3.drop(['application_type', 'term', 'open_acc'], axis=1)
df_5 = df_5.drop(['application_type', 'term', 'open_acc'], axis=1)

In [10]:
# Look at only a subset of years for each loan term
df_3 = df_3[df_3['issue_d'].dt.year <= 2015]
df_5 = df_5[df_5['issue_d'].dt.year <= 2013]

In [11]:
# Drop obs that have NAs in those columns because they are probably bad observations

df_3 = df_3.dropna(subset=['annual_inc',
                              'dti',
                              'pub_rec',
                              'pub_rec_bankruptcies',
                              'int_rate',
                              'loan_amnt',
                              'grade',
                              'sub_grade',
                              'verification_status'
                              ])
df_5 = df_5.dropna(subset=['annual_inc',
                              'dti',
                              'pub_rec',
                              'pub_rec_bankruptcies',
                              'int_rate',
                              'loan_amnt',
                              'grade',
                              'sub_grade',
                              'verification_status'
                              ])

In [12]:
# check the type columns
# print(df_3.dtypes)
# print(df_5.dtypes)

In [13]:
# Create feature that calculates the number of days between earliest credit line and issue_d
df_3['days_since_first_credit'] = (df_3['issue_d'] - df_3['earliest_cr_line']).dt.days
df_5['days_since_first_credit'] = (df_5['issue_d'] - df_5['earliest_cr_line']).dt.days

In [14]:
#drop earliest_cr_line and issue_d
df_3 = df_3.drop(['earliest_cr_line', 'issue_d'], axis=1)
df_5 = df_5.drop(['earliest_cr_line', 'issue_d'], axis=1)

In [15]:
# Check for any remaining NAs in all the columns
# Takes a long time to run (maybe i am not using the full potential of clusters, use map_partitions?)

# print('===== 3 YEAR LOANS =====')
# for col in df_3.columns:
#     print(f'NAs in {col}:', sum(df_3[col].isna()))
# print('===== 5 YEAR LOANS =====')
# for col in df_5.columns:
#     print(f'NAs in {col}:', sum(df_5[col].isna()))

In [16]:
# Clean emp_length
import dask.array as da

## 3 YEARS
df_3.emp_length = df_3.emp_length.replace(to_replace='< 1 year', value='0')
df_3.emp_length = df_3.emp_length.str.strip('<+ years')

mean_emp_length_3 = da.floor(df_3.emp_length.dropna().astype(int).mean())
df_3.emp_length = df_3.emp_length.fillna(mean_emp_length_3)
df_3.emp_length = df_3.emp_length.astype(int)

## 5 YEARS
df_5.emp_length = df_5.emp_length.replace(to_replace='< 1 year', value='0')
df_5.emp_length = df_5.emp_length.str.strip('<+ years')

mean_emp_length_5 = da.floor(df_5.emp_length.dropna().astype(int).mean())
df_5.emp_length = df_5.emp_length.fillna(mean_emp_length_5)
df_5.emp_length = df_5.emp_length.astype(int)

In [17]:
# Check the different values in loan_status

# print('Values in loan_status (3 years): \n', df_3.loan_status.unique().compute())
# print('Values in loan_status (5 years): \n', df_5.loan_status.unique().compute())

In [18]:
# Only consider grade A, B, C, D, E
df_3 = df_3[df_3['grade'].isin(['A', 'B', 'C', 'D', 'E'])]
df_5 = df_5[df_5['grade'].isin(['A', 'B', 'C', 'D', 'E'])]

# drop grade and emp_titlte after that
df_3 = df_3.drop(['grade', 'emp_title'], axis=1)
df_5 = df_5.drop(['grade','emp_title'], axis=1)

In [19]:
# Remove outliers in annual_inc
df_3.annual_inc = df_3.annual_inc[df_3.annual_inc < 2e7]
df_5.annual_inc = df_5.annual_inc[df_5.annual_inc < 2e7]

In [20]:
# Separate target and features
y_3 = df_3.loan_status
X_3 = df_3[['addr_state', 'annual_inc', 'disbursement_method', 'dti', 'emp_length', 'fico_range_high', 'fico_range_low', 'home_ownership', 'initial_list_status', 'installment', 'int_rate', 'loan_amnt', 'pub_rec', 'pub_rec_bankruptcies', 'purpose', 'sub_grade', 'verification_status', 'days_since_first_credit']]

y_5 = df_5.loan_status
X_5 = df_5[['addr_state', 'annual_inc', 'disbursement_method', 'dti', 'emp_length', 'fico_range_high', 'fico_range_low', 'home_ownership', 'initial_list_status', 'installment', 'int_rate', 'loan_amnt', 'pub_rec', 'pub_rec_bankruptcies', 'purpose', 'sub_grade', 'verification_status', 'days_since_first_credit']]

In [21]:
# Dummy encode categorical variables in X
from dask_ml.preprocessing import Categorizer, DummyEncoder, LabelEncoder

ce_3 = Categorizer(columns=['addr_state', 'disbursement_method', 'emp_length', 'home_ownership', 'initial_list_status', 'purpose', 'verification_status', 'sub_grade'])
X_3 = ce_3.fit_transform(X_3)
ce_5 = Categorizer(columns=['addr_state', 'disbursement_method', 'emp_length', 'home_ownership', 'initial_list_status', 'purpose', 'verification_status', 'sub_grade'])
X_5 = ce_5.fit_transform(X_5)

de_3 = DummyEncoder(columns=['addr_state', 'disbursement_method', 'emp_length', 'home_ownership', 'initial_list_status', 'purpose', 'verification_status', 'sub_grade'])
X_3 = de_3.fit_transform(X_3)
de_5 = DummyEncoder(columns=['addr_state', 'disbursement_method', 'emp_length', 'home_ownership', 'initial_list_status', 'purpose', 'verification_status', 'sub_grade'])
X_5 = de_5.fit_transform(X_5)

# Label Encode target variable
le_3 = LabelEncoder()
y_3 = le_3.fit_transform(y_3)
le_5 = LabelEncoder()
y_5 = le_5.fit_transform(y_5)

In [22]:
# Scale features
from dask_ml.preprocessing import StandardScaler
scaler_3 = StandardScaler()
X_3 = scaler_3.fit_transform(X_3)

scaler_5 = StandardScaler()
X_5 = scaler_5.fit_transform(X_5)

In [23]:
print(X_3.dtypes)
print(X_3.dtypes)
print(da.unique(y_3).compute())
print(da.unique(y_5).compute())

annual_inc         float64
dti                float64
fico_range_high    float64
fico_range_low     float64
installment        float64
                    ...   
sub_grade_E5       float64
sub_grade_A3       float64
sub_grade_D5       float64
sub_grade_E2       float64
sub_grade_E4       float64
Length: 123, dtype: object
annual_inc         float64
dti                float64
fico_range_high    float64
fico_range_low     float64
installment        float64
                    ...   
sub_grade_E5       float64
sub_grade_A3       float64
sub_grade_D5       float64
sub_grade_E2       float64
sub_grade_E4       float64
Length: 123, dtype: object
[0 1]
[0 1]


In [24]:
# Create train_test_split
from dask_ml.model_selection import train_test_split

X_3_train, X_3_test, y_3_train, y_3_test = train_test_split(X_3, y_3, test_size=0.2, shuffle=True, convert_mixed_types=True)
X_5_train, X_5_test, y_5_train, y_5_test = train_test_split(X_5, y_5, test_size=0.2, shuffle=True, convert_mixed_types=True)

In [25]:
#Constructing priors list
priors = []
for x in range(0, 101, 1):
    priors.append([x/100, (100-x)/100])

In [26]:
import joblib
from sklearn.model_selection import GridSearchCV
from sklearn.discriminant_analysis import LinearDiscriminantAnalysis

In [27]:
from sklearn.metrics import confusion_matrix
def custom_scoring(estimator, X, y):
    cm = confusion_matrix(y, estimator.predict(X))
    score = 0.7*(cm[0, 1]) + 0.3*(cm[1, 0])
    return score

In [28]:
params = {'priors': priors}
lda_3 = LinearDiscriminantAnalysis()
lda_5 = LinearDiscriminantAnalysis()

grid_search_3 = GridSearchCV(estimator=lda_3, param_grid=params, scoring='balanced_accuracy', n_jobs=-1, cv=3, verbose=3)
grid_search_5 = GridSearchCV(estimator=lda_5, param_grid=params, scoring='balanced_accuracy', n_jobs=-1, cv=3, verbose=3)

In [29]:
print(type(X_3_train))
print(type(y_3_train))

<class 'dask.array.core.Array'>
<class 'dask.array.core.Array'>


In [30]:
from sklearn.model_selection import train_test_split as sktt

In [31]:
with joblib.parallel_backend('dask', n_jobs=-1, scatter=[X_3_train, y_3_train]):
    X_3_train, X_3_test, y_3_train, y_3_test = sktt(X_3.compute(), y_3.compute(), test_size=0.2, shuffle=True)
    X_5_train, X_5_test, y_5_train, y_5_test = sktt(X_5.compute(), y_5.compute(), test_size=0.2, shuffle=True)
    grid_search_3.fit(X_3_train, y_3_train)
    grid_search_5.fit(X_5_train, y_5_train)

Fitting 3 folds for each of 101 candidates, totalling 303 fits
[Parallel(n_jobs=-1)]: Using backend DaskDistributedBackend with 40 concurrent workers.
tornado.application - ERROR - Exception in callback functools.partial(<bound method IOLoop._discard_future_result of <zmq.eventloop.ioloop.ZMQIOLoop object at 0x7fbdf98afbb0>>, <Task finished name='Task-1214' coro=<DaskDistributedBackend.apply_async.<locals>.f() done, defined at /Users/darishsakeesing/opt/anaconda3/lib/python3.8/site-packages/joblib/_dask.py:316> exception=CommClosedError('in <closed TLS>: BrokenPipeError: [Errno 32] Broken pipe')>)
Traceback (most recent call last):
  File "/Users/darishsakeesing/opt/anaconda3/lib/python3.8/site-packages/tornado/iostream.py", line 971, in _handle_write
    num_bytes = self.write_to_fd(self._write_buffer.peek(size))
  File "/Users/darishsakeesing/opt/anaconda3/lib/python3.8/site-packages/tornado/iostream.py", line 1568, in write_to_fd
    return self.socket.send(data)  # type: ignore
  F

KeyboardInterrupt: 