In [None]:
import dask.dataframe as dd
import joblib
from sklearn.tree import DecisionTreeClassifier
from dask_ml.model_selection import train_test_split
from dask.distributed import Client
from dask_ml.wrappers import ParallelPostFit


In [None]:
# Start a local Dask cluster whose workers are threads inside this kernel process
# (processes=False) rather than separate worker processes. Creating the Client
# registers it as the default scheduler for the dask computations that follow.
client = Client(processes=False) 

In [None]:
# Load the raw dataset from the local data/ directory
# (the path is relative to the notebook's working directory).
DATA_PATH = 'data/taiwanese-bankruptcy.csv'

# The raw CSV headers keep a leading space, so the dtype keys must include it too.
# These two columns are pinned to float64 up front. Presumably dask's sample-based
# dtype inference guessed a non-float dtype for them, which then clashes with later
# partitions — TODO confirm if other columns start failing.
df = dd.read_csv(
    DATA_PATH,
    dtype={
        ' Research and development expense rate': 'float64',
        ' Total Asset Growth Rate': 'float64',
    },
)

In [None]:
def start_pipeline(df: dd.DataFrame) -> dd.DataFrame:
    """Open a ``.pipe`` cleaning chain on a copy of ``df``.

    Starting from a copy means the later pipeline steps never operate on the
    object the caller passed in.

    Parameters
    ----------
    df : dd.DataFrame
        Raw frame, e.g. as returned by ``dd.read_csv``.

    Returns
    -------
    dd.DataFrame
        ``df.copy()``. For a dask DataFrame this copies the task graph, not the
        underlying data.
    """
    return df.copy()

def rename_columns(df: dd.DataFrame) -> dd.DataFrame:
    """Strip leading/trailing whitespace from every column name.

    The raw headers carry stray spaces (e.g. ``' Total Asset Growth Rate'``),
    which makes selecting columns by name error-prone.

    Parameters
    ----------
    df : dd.DataFrame
        Frame whose column labels are strings.

    Returns
    -------
    dd.DataFrame
        The same data with whitespace-stripped column labels.

    Raises
    ------
    ValueError
        If the stripped labels are not unique. This happens when two labels differ
        only in surrounding whitespace, or when a label was already duplicated.
    """
    original_names = df.columns.to_list()
    stripped_names = [name.strip() for name in original_names]
    # Duplicate labels would make selections like df['Bankrupt?'] ambiguous,
    # so fail loudly instead of silently producing them.
    if len(set(stripped_names)) != len(stripped_names):
        raise ValueError('column names are not unique after stripping whitespace')
    return df.rename(columns=dict(zip(original_names, stripped_names)))


# Build the cleaning chain (copy, then strip column names). Dask is lazy,
# so nothing is read or computed at this point.
cleaned_dataset = df.pipe(start_pipeline).pipe(rename_columns)

# persist() computes the cleaned frame once and keeps the result on the cluster.
# Later cells then reuse it instead of re-reading and re-cleaning the CSV every time.
# Try skipping this line and re-running the following cells to compare CPU usage.
cleaned_dataset = cleaned_dataset.persist()

In [None]:
# Label column; every other column of the cleaned frame is used as a feature.
TARGET_COLUMN = 'Bankrupt?'
X = cleaned_dataset.drop(TARGET_COLUMN, axis=1)
y = cleaned_dataset[TARGET_COLUMN]

In [None]:
# dask-ml's train_test_split (not sklearn's) splits the dask collections directly.
# A fixed random_state pins the shuffle, so the split, and the score computed
# later, is the same on every Restart & Run All.
X_train, X_test, y_train, y_test = train_test_split(
    X, y, shuffle=True, random_state=42
)

In [None]:
# ParallelPostFit does NOT parallelise fit. The tree is trained as a normal
# scikit-learn estimator on in-memory data. Its benefit is post-fit: predict and
# score run block-wise over the dask collections.
# random_state makes the tree's tie-breaking between features reproducible.
dt = ParallelPostFit(DecisionTreeClassifier(random_state=42))

# DecisionTreeClassifier.fit does not dispatch work through joblib, so this dask
# backend does not speed up this particular fit. It only matters for estimators
# that parallelise via joblib (those with an n_jobs parameter).
with joblib.parallel_backend('dask'):
    dt.fit(X_train, y_train)

# Accuracy on the held-out split (the classifier's default score).
dt.score(X_test, y_test)

In [None]:
# Close the Dask client once all work is done. The client was created without a
# scheduler address, so this also shuts down the local cluster it started.
client.close()