# DASK ML

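Use Dask when computation time or memory usage is the bottleneck, for example when the data does not fit in memory or a fit takes too long on a single core. For small workloads it offers no advantage.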

Many of scikit-learn’s ML functions, including cross-validation, hyperparameter search, clustering, regression, imputation, and scoring methods, have a Dask-ML equivalent. Make sure your data is a Dask collection (a Dask DataFrame or a Dask array), and when you convert a DataFrame with `to_dask_array()`, make the chunk sizes explicit, for example with `lengths=True`.



In [1]:
import dask.dataframe as dd
from dask_ml.linear_model import LinearRegression
from dask_ml.model_selection import train_test_split
from sklearn.metrics import r2_score

# https://archive.ics.uci.edu/dataset/504/qsar+fish+toxicity
# Data set containing values for 6 attributes (molecular descriptors) 
# of 908 chemicals used to predict quantitative acute aquatic toxicity 
# towards the fish Pimephales promelas (fathead minnow).

path = "data/qsar_fish_toxicity.csv"
column_names = [
    'CIC0', 'SM1_Dz(Z)', 'GATS1i', 'NdsCH', 'NdssC', 'MLOGP', 'LC50'
]
df = dd.read_csv(path, names=column_names, sep=';')

# Prepare the data for regression
regr_X = df.drop(columns=['LC50'])
regr_y = df[['LC50']]

# Split the data into training and testing sets
X_train, X_test, y_train, y_test = train_test_split(regr_X, 
                                                    regr_y, 
                                                    test_size=0.2, 
                                                    random_state=42, 
                                                    shuffle=False)

# Convert to Dask arrays
X_train = X_train.to_dask_array(lengths=True)
X_test = X_test.to_dask_array(lengths=True)
y_train = y_train.to_dask_array(lengths=True)
y_test = y_test.to_dask_array(lengths=True)

# Train the linear regression model
reg = LinearRegression()
reg.fit(X_train, y_train)

# Predict on the test set
y_pred = reg.predict(X_test)

# Compute the R^2 score
score = r2_score(y_test.compute(), y_pred.compute())  # Compute only at evaluation step
print(f'R^2 score: {score}')


R^2 score: 0.591318253692751


If a function exists in scikit-learn or another data science library but not in Dask-ML, you can wrap the scikit-learn calls with `dask.delayed` so that Dask schedules them as parallel tasks.

In [3]:
import dask
from sklearn.linear_model import LinearRegression 

# List of estimators
estimators = [LinearRegression()]

# Delayed tasks for training (fit returns the fitted estimator)
train_chunks = [dask.delayed(estimator.fit)(X_train, y_train) for estimator in estimators]

# Delayed tasks for prediction, one per fitted estimator
predict_chunks = [dask.delayed(estimator.predict)(X_test) for estimator in train_chunks]

# Delayed tasks for scoring
scores = [dask.delayed(r2_score)(y_test, y_pred) for y_pred in predict_chunks]

# Execute the entire pipeline
scores = dask.compute(*scores)
print("R^2 Scores:", scores)

R^2 Scores: (0.5920439021777781,)


# Distributed (Dask Client and Scheduler)

Imagine you're solving a problem that requires significant computational power, like tuning a Ridge regression model on a large dataset over a grid of hyperparameters. Dask works well locally on a single machine, but its real strength is the distributed scheduler. By connecting to a Dask cluster, you can scale the same code across multiple workers, whether they run in a local cluster on one machine or across many machines. The scheduler distributes tasks over whatever workers are available, which makes large-scale hyperparameter tuning and other computationally intensive workloads faster and more scalable. Before you can run your code on a cluster, you need to start a scheduler and one or more workers:

Start the Dask scheduler and workers with the `dask-scheduler` and `dask-worker` commands in separate terminal windows.
```{bash}
# Start the Dask scheduler
dask-scheduler

# In a separate terminal window, start one or more Dask workers
dask-worker tcp://127.0.0.1:8786

# you can also specify memory and threads
dask-worker tcp://127.0.0.1:8786 --nthreads 4 --memory-limit 2GB
```
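As an alternative to the command-line tools, a scheduler and workers can also be started from Python with `LocalCluster`. The following is a minimal sketch, not the setup used for the outputs in this notebook. It assumes in-process (thread-based) workers via `processes=False` and disables the dashboard to keep the example lightweight. The worker counts are arbitrary.

```python
from dask.distributed import Client, LocalCluster

# Assumption for this sketch: two single-threaded workers that live
# inside the current Python process (processes=False), no dashboard
cluster = LocalCluster(
    n_workers=2,
    threads_per_worker=1,
    processes=False,
    dashboard_address=None,
)
client = Client(cluster)

# Ask the scheduler how many workers it knows about
n_workers = len(client.scheduler_info()["workers"])
print(f"Number of workers: {n_workers}")

# Submit a trivial task to the cluster and wait for its result
future = client.submit(sum, [1, 2, 3])
total = future.result()
print(f"Result computed on the cluster: {total}")

# Shut everything down again
client.close()
cluster.close()
```

For CPU-bound work, process-based workers (the default when `processes` is not set) usually parallelize better. In a script they should be started under an `if __name__ == "__main__":` guard.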



In [13]:
from dask.distributed import Client 
client = Client('127.0.0.1:8786', timeout=60)  # Increase timeout to 60 seconds
# client = Client(n_workers=4)  # alternative: start a local cluster with 4 workers
print(client)
# Get the status of the workers
workers = client.scheduler_info()['workers']
print(f"Number of workers: {len(workers)}")
for worker, info in workers.items():
    print(f"Worker: {worker}")
    print(f"  Memory: {info['memory_limit'] / 1e9:.2f} GB")
    print(f"  CPU: {info['nthreads']} threads")
    print(f"  Tasks: {info['nthreads']} tasks")

<Client: 'tcp://192.168.2.186:8786' processes=2 threads=12, memory=17.86 GiB>
Number of workers: 2
Worker: tcp://127.0.0.1:56658
  Memory: 17.18 GB
  CPU: 8 threads
  Tasks: 8 tasks
Worker: tcp://127.0.0.1:58338
  Memory: 2.00 GB
  CPU: 4 threads
  Tasks: 4 tasks


With the Dask scheduler and workers running, we can now direct the joblib backend to Dask:

    from joblib import parallel_backend  # use Dask as the joblib backend
    with parallel_backend('dask'):
        ...  # scikit-learn calls made here run on the Dask workers
    

In [17]:
%time

from joblib import parallel_backend #to use dask as backend
from sklearn.model_selection import GridSearchCV
from sklearn.linear_model import Ridge
from sklearn.datasets import make_regression
from sklearn.metrics import mean_squared_error

# Generate synthetic regression data
X, y = make_regression(n_samples=1000, n_features=20, noise=0.1, random_state=42)

# Initialize the regression model
ridge = Ridge()

# Set up the grid search
param_grid = {'alpha': [0.1, 1.0, 10.0]}
gs = GridSearchCV(ridge, param_grid=param_grid, scoring='neg_mean_squared_error', cv=5)

# Perform the grid search with Dask parallel backend
with parallel_backend('dask'):
    gs.fit(X, y)

# Print the results
print(gs.cv_results_)

# Best estimator
best_ridge = gs.best_estimator_

# Predict on the training data
y_pred = best_ridge.predict(X)

# Compute the mean squared error
mse = mean_squared_error(y, y_pred)
print(f'Mean Squared Error: {mse}')

CPU times: user 3 μs, sys: 1e+03 ns, total: 4 μs
Wall time: 7.15 μs
{'mean_fit_time': array([0.00681005, 0.00672436, 0.00682592]), 'std_fit_time': array([0.00359283, 0.0032681 , 0.00354043]), 'mean_score_time': array([0.00041399, 0.00041456, 0.00044198]), 'std_score_time': array([6.61573710e-05, 5.13300085e-05, 3.99079942e-05]), 'param_alpha': masked_array(data=[0.1, 1.0, 10.0],
             mask=[False, False, False],
       fill_value=1e+20), 'params': [{'alpha': 0.1}, {'alpha': 1.0}, {'alpha': 10.0}], 'split0_test_score': array([-0.01217785, -0.09662753, -7.84541016]), 'split1_test_score': array([-0.01304742, -0.07148277, -5.926423  ]), 'split2_test_score': array([-0.00964671, -0.06847693, -6.18101719]), 'split3_test_score': array([-0.01071038, -0.06766663, -5.44388354]), 'split4_test_score': array([-0.01060364, -0.07510655, -6.60444719]), 'mean_test_score': array([-0.0112372 , -0.07587208, -6.40023621]), 'std_test_score': array([0.00121411, 0.01070195, 0.81454235]), 'rank_test_scor
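The raw `cv_results_` dictionary is hard to read when printed. One way to inspect it, sketched here under the assumption that pandas is available, is to load it into a DataFrame and keep only the columns of interest. To stay self-contained, this sketch refits the same grid search locally without the Dask backend. The names `search` and `results` are illustrative.

```python
import pandas as pd
from sklearn.datasets import make_regression
from sklearn.linear_model import Ridge
from sklearn.model_selection import GridSearchCV

# Same synthetic data and parameter grid as in the cell above
X, y = make_regression(n_samples=1000, n_features=20, noise=0.1, random_state=42)
search = GridSearchCV(
    Ridge(),
    param_grid={"alpha": [0.1, 1.0, 10.0]},
    scoring="neg_mean_squared_error",
    cv=5,
)
search.fit(X, y)

# Keep a few informative columns and sort by rank (1 = best)
columns = ["param_alpha", "mean_test_score", "std_test_score", "rank_test_score"]
results = pd.DataFrame(search.cv_results_)[columns].sort_values("rank_test_score")
print(results)
print("Best parameters:", search.best_params_)
```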

# XGBOOST

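XGBoost ships with a Dask integration (`xgboost.dask`) that builds on Dask, a parallel computing framework for Python, to distribute and scale the training of XGBoost models across multiple CPUs or GPUs, machines, or clusters. Note that the example below still materializes the Dask arrays with `.compute()` and trains with the regular `xgb.train` in a single process.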

https://xgboost.readthedocs.io/en/stable/tutorials/dask.html

In [9]:
import xgboost as xgb
from dask import array as da
from dask_ml.model_selection import train_test_split

from sklearn.datasets import load_breast_cancer
data = load_breast_cancer()
feature_names = data.feature_names.tolist()
X, y = data.data, data.target

# Convert to Dask arrays
X = da.from_array(X, chunks=(100, X.shape[1]))
y = da.from_array(y, chunks=(100,))

# Split the data into training and testing sets
X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=0.2, random_state=42, shuffle=True)

# Convert to DMatrix
dtrain = xgb.DMatrix(X_train.compute(), label=y_train.compute(), feature_names=feature_names)
dvalid = xgb.DMatrix(X_test.compute(), label=y_test.compute(), feature_names=feature_names)
watchlist = [(dtrain, 'train'), (dvalid, 'valid')]

# Set XGBoost parameters
xgb_pars = {
    'min_child_weight': 1,
    'eta': 0.5,
    'colsample_bytree': 0.9,
    'max_depth': 6,
    'subsample': 0.9,
    'lambda': 1.,
    'nthread': -1, #here it uses all the available threads
    'booster': 'gbtree',
    'verbosity': 0,  # replaces the deprecated 'silent' flag
    'eval_metric': 'rmse',
    'objective': 'reg:squarederror'
}

# Train the model
model = xgb.train(xgb_pars, dtrain, 10, watchlist, early_stopping_rounds=2, maximize=False, verbose_eval=1)
print('Modeling RMSE %.5f' % model.best_score)

# Plot feature importance
xgb.plot_importance(model, max_num_features=28, height=0.7)

# Predict on the test set
pred = model.predict(dvalid)