In [None]:
!conda install mysql -y
!python -m pip install mysql
!mkdir /opt/conda/data
!mysqld --initialize --datadir /opt/conda/data
!mysql.server start

In [None]:
# Replace the temporary root password printed by `mysqld --initialize` with an empty
# one, so the study can connect as `mysql://root@localhost/example`, then create that DB.
# The password is prompted for instead of being pasted into (and saved with) the notebook.
# On a re-run, when the password has already been cleared, just press Enter to skip.
import getpass
import subprocess

_initial_root_password = getpass.getpass("Temporary root password from mysqld --initialize: ")
if _initial_root_password:
    # An argument list avoids shell quoting problems with generated passwords.
    subprocess.run(
        ["mysqladmin", "--user=root", "--password=" + _initial_root_password, "password", ""],
        check=True,
    )
del _initial_root_password  # don't keep the secret in the kernel namespace

subprocess.run(
    ["mysql", "-u", "root", "-e", "CREATE DATABASE IF NOT EXISTS example"],
    check=True,
)

In [None]:
!pip install optuna pyarrow==5.0.0
# assume role to access S3 bucket
!rolypoly assume [put role here]

In [None]:
import datetime

import dask
import xgboost
import optuna
from dask import delayed
import dask_gateway
from dask_gateway import Gateway
from dask_ml.model_selection import KFold
from xgboost.dask import DaskXGBClassifier as XGBClassifier
from sklearn.metrics import roc_auc_score
import distributed

# Log library versions so results can be tied to a known environment.
print(*(module.__version__ for module in (dask, distributed, xgboost, dask_gateway)))
# 2021.07.0 2021.07.0 1.4.0 0.9.0

def initiateCluster(worker_cores=4, worker_memory=16, scheduler_cores=4,
                    scheduler_memory=8, n_workers=16,
                    role="put s3 bucket role here so cluster can read from s3"):
    """Start a Dask Gateway cluster, connect a client to it, and scale it.

    The defaults reproduce the original hard-coded configuration; pass keyword
    arguments to adjust a call instead of editing this function.

    Parameters
    ----------
    worker_cores, worker_memory, scheduler_cores, scheduler_memory
        Assigned to the gateway cluster options of the same name (units are
        whatever this gateway deployment defines -- presumably GiB for memory).
    n_workers : int
        Number of workers requested via ``cluster.scale``.
    role : str
        Value for the deployment-specific ``role`` option, which the cluster
        needs in order to read from S3.

    Returns
    -------
    (cluster, client)
        The new gateway cluster and the client obtained from ``cluster.get_client()``.
    """
    gateway = Gateway()
    options = gateway.cluster_options()
    options.worker_cores = worker_cores
    options.worker_memory = worker_memory
    options.scheduler_cores = scheduler_cores
    options.scheduler_memory = scheduler_memory
    options.role = role

    cluster = gateway.new_cluster(options)
    try:
        client = cluster.get_client()
        cluster.scale(n_workers)
    except Exception:
        # Don't leave an orphaned (billable) cluster running if setup fails.
        cluster.shutdown()
        raise
    return cluster, client

def createStudy():
    """Create the shared Optuna study, or load it if it already exists.

    Called by the launcher and by every ``train_task`` process; with
    ``load_if_exists=True`` they all attach to the same MySQL-backed study
    named 'test_study', whose objective is maximised.
    """
    study_settings = {
        'study_name': 'test_study',
        'direction': 'maximize',
        'storage': "mysql://root@localhost/example",
        'load_if_exists': True,
    }
    return optuna.create_study(**study_settings)

def calcRocAuc(cv_clf, X_val, y_val):
    """Return the ROC AUC of a fitted binary classifier on a validation split.

    ROC AUC is a ranking metric, so it is computed from the predicted
    probability of the positive class. Hard 0/1 labels from ``predict`` would
    collapse the ROC curve to a single operating point.

    Parameters
    ----------
    cv_clf : fitted classifier exposing ``predict_proba``
    X_val, y_val : validation features and true labels

    Returns
    -------
    float
        ``roc_auc_score(y_val, positive_class_probability)``.
    """
    proba = cv_clf.predict_proba(X_val)
    # predict_proba normally returns one column per class; keep the positive class.
    if proba.ndim == 2:
        proba = proba[:, 1]
    return roc_auc_score(y_val, proba)

def cv_estimate(X, y, params, n_splits=5):
    """Return the mean validation ROC AUC of an XGBoost classifier over K folds.

    Parameters
    ----------
    X, y : dask arrays of features and labels
    params : dict
        Keyword arguments for ``XGBClassifier``.
    n_splits : int
        Number of (unshuffled) ``KFold`` splits.

    Returns
    -------
    float
        Sum of the per-fold ROC AUC scores divided by ``n_splits``.
    """
    cv = KFold(n_splits=n_splits)
    cv_clf = XGBClassifier(**params)
    total_score = 0.0
    for fold, (train, val) in enumerate(cv.split(X, y)):
        print("KFold " + str(fold))
        start = datetime.datetime.now()
        cv_clf.fit(X[train], y[train])
        fold_score = calcRocAuc(cv_clf, X[val], y[val])
        total_score += fold_score
        end = datetime.datetime.now()
        # Log this fold's own score (not the running sum) and its wall-clock time.
        print("KFold " + str(fold) + ", " + str(fold_score) + ", " + str((end - start).total_seconds()))
    return total_score / n_splits

def objective(trial, X, y, n_splits=5):
    """Optuna objective: sample XGBoost hyperparameters and score them by CV ROC AUC.

    Parameters
    ----------
    trial : optuna.trial.Trial
    X, y : features and labels passed through to ``cv_estimate``
    n_splits : int
        Number of cross-validation folds (default 5, as before).

    Returns
    -------
    float
        Mean validation ROC AUC from ``cv_estimate``.
    """
    # change parameter ranges as needed
    # suggest_float(name, low, high) is the non-deprecated equivalent of suggest_uniform.
    params = {
        'n_estimators': trial.suggest_int('n_estimators', 100, 1000),
        'learning_rate': trial.suggest_float('learning_rate', 0.01, 0.99),
        'subsample': trial.suggest_float('subsample', 0.1, 0.9),
        'max_depth': trial.suggest_int('max_depth', 1, 10),
        'colsample_bytree': trial.suggest_float('colsample_bytree', 0.1, 0.9),
        'min_child_weight': trial.suggest_int('min_child_weight', 1, 9),
    }
    roc_auc = cv_estimate(X, y, params, n_splits)
    return roc_auc

In [None]:
!pip install mysqlclient
!pip install pymysql
import pymysql  
pymysql.install_as_MySQLdb()
import _mysql

In [None]:
import optuna
import datetime

def train_task(identifier, n_trials=2, rows_per_chunk=30000):
    """Run one Optuna worker on its own Dask Gateway cluster.

    Starts a cluster, loads features/labels from S3 parquet, runs ``n_trials``
    trials of the shared study, and always shuts the cluster down afterwards.
    Used as a ``multiprocessing.Process`` target.

    Parameters
    ----------
    identifier : int
        Label used only in log messages.
    n_trials : int
        Number of Optuna trials this worker runs (default 2, as before).
    rows_per_chunk : int
        Row partition size for reading the parquet data and chunking the arrays.
    """
    p_string = "Process " + str(identifier)
    print("Starting " + p_string)
    print(p_string + ": Creating cluster...")
    cluster, client = initiateCluster()
    try:
        print(p_string + ": " + client.dashboard_link)

        print(p_string + ": Starting XGB fit")
        start = datetime.datetime.now()

        # update S3 paths and rows_per_chunk as needed
        X_p = dask.dataframe.read_parquet("s3 path to file", engine="pyarrow", chunksize=rows_per_chunk)
        y_p = dask.dataframe.read_parquet("s3 path to file", engine="pyarrow", chunksize=rows_per_chunk)
        X = X_p.to_dask_array(lengths=True)
        y = y_p.to_dask_array(lengths=True)
        if X.shape[0] != y.shape[0]:
            raise ValueError(
                p_string + ": feature rows (" + str(X.shape[0]) + ") != label rows (" + str(y.shape[0]) + ")"
            )

        # Partition by rows only, keeping all columns in one chunk as xgboost.dask requires.
        # (Array.rechunk's 2nd positional parameter is `threshold`, not a column chunk size.)
        X = X.rechunk({0: rows_per_chunk, 1: -1})
        # Give the labels the same row partitions as the features.
        y = y.rechunk({0: X.chunks[0]})

        # Chunk sizes are already known (lengths=True), so no compute_chunk_sizes pass is needed.
        print(X)
        print(len(X))
        print(len(y))

        study = createStudy()
        study.optimize(lambda trial: objective(trial, X, y), n_trials=n_trials)

        end = datetime.datetime.now()
        print(p_string + " finished train an xgboost classifier")
        print(p_string + " classifier train time: " + str((end - start).total_seconds()))
    finally:
        # Release this worker's cluster even if loading or optimization fails.
        client.close()
        cluster.shutdown()

from multiprocessing import Process

# 2021.07.0 2021.07.0 1.4.0 0.9.0
print(dask.__version__, distributed.__version__, xgboost.__version__, dask_gateway.__version__)

# change number of processes as needed
n = 4

# create a number of processes with different clusters
processes = [Process(target=train_task, args=(i, )) for i in range(n)]

# Create (or load) the study in this process before the workers start;
# the results cell reads `study.best_params` from this object.
study = createStudy()
print('Starting Proccesses')
for process in processes:
    process.start()

# wait for all processes to finish
for process in processes:
    process.join()

# A crashed worker would otherwise go unnoticed and its trials silently be missing.
failed = [str(i) for i, process in enumerate(processes) if process.exitcode != 0]
if failed:
    print("WARNING: process(es) " + ", ".join(failed)
          + " exited with a non-zero status; the study may be missing trials")

In [None]:
# Report the best hyperparameters recorded so far in the shared Optuna study.
print("best params")
print(f"{study.best_params}")

In [None]:
from time import monotonic, sleep
from dask_gateway import Gateway

# Upper bound on how long to wait for clusters to stop before giving up.
SHUTDOWN_TIMEOUT_S = 300

gateway = Gateway()
clusters = gateway.list_clusters()
print(clusters)

# close clusters
print("Closing clusters...")
for cluster in clusters:
    gateway.connect(cluster.name).shutdown()

# Poll until no clusters remain, but never spin forever on a stuck cluster.
deadline = monotonic() + SHUTDOWN_TIMEOUT_S
while gateway.list_clusters() and monotonic() < deadline:
    sleep(0.5)

remaining = gateway.list_clusters()
if remaining:
    print("WARNING: " + str(len(remaining)) + " cluster(s) still listed after "
          + str(SHUTDOWN_TIMEOUT_S) + "s")
else:
    print("Finished closing clusters")
remaining