This notebook was used to test possible solutions to https://github.com/microsoft/LightGBM/issues/3713.

In [1]:
import dask.array as da

from dask.distributed import Client, LocalCluster, wait

from lightgbm.dask import DaskLGBMRegressor, DaskLGBMClassifier

Create a cluster with 3 workers. Since this is a `LocalCluster`, those workers are just 3 local processes.

In [2]:
n_workers = 3
cluster = LocalCluster(n_workers=n_workers)
client = Client(cluster)

In [3]:
print(f"View the dashboard: {cluster.dashboard_link}")

View the dashboard: http://127.0.0.1:8787/status


In [None]:
client.wait_for_workers(n_workers)

In [89]:
!pip install pytest

Collecting pytest
  Downloading pytest-6.2.1-py3-none-any.whl (279 kB)
[K     |████████████████████████████████| 279 kB 1.8 MB/s eta 0:00:01
Collecting pluggy<1.0.0a1,>=0.12
  Downloading pluggy-0.13.1-py2.py3-none-any.whl (18 kB)
Collecting iniconfig
  Downloading iniconfig-1.1.1-py2.py3-none-any.whl (5.0 kB)
Collecting toml
  Downloading toml-0.10.2-py2.py3-none-any.whl (16 kB)
Collecting py>=1.8.2
  Downloading py-1.10.0-py2.py3-none-any.whl (97 kB)
[K     |████████████████████████████████| 97 kB 1.5 MB/s eta 0:00:011
Installing collected packages: pluggy, iniconfig, toml, py, pytest
Successfully installed iniconfig-1.1.1 pluggy-0.13.1 py-1.10.0 pytest-6.2.1 toml-0.10.2


Click the link above to view a diagnostic dashboard while you run the training code below.

In [90]:
import os
import socket
import sys

import dask.array as da
import dask.dataframe as dd
import numpy as np
import pandas as pd
import scipy.sparse
from dask.array.utils import assert_eq
from distributed.utils_test import client, cluster_fixture, gen_cluster, loop
from sklearn.datasets import make_blobs, make_regression

import lightgbm
import lightgbm.dask as dlgbm

data_output = ['array', 'scipy_csr_matrix', 'dataframe']
data_centers = [[[-4, -4], [4, 4]], [[-4, -4], [4, 4], [-4, 4]]]


def _create_data(objective, n_samples=100, centers=2, output='array', chunk_size=50):
    if objective == 'classification':
        X, y = make_blobs(n_samples=n_samples, centers=centers, random_state=42)
    elif objective == 'regression':
        X, y = make_regression(n_samples=n_samples, random_state=42)
    else:
        raise ValueError(objective)
    rnd = np.random.RandomState(42)
    weights = rnd.random(X.shape[0]) * 0.01

    if output == 'array':
        dX = da.from_array(X, (chunk_size, X.shape[1]))
        dy = da.from_array(y, chunk_size)
        dw = da.from_array(weights, chunk_size)
    elif output == 'dataframe':
        X_df = pd.DataFrame(X, columns=['feature_%d' % i for i in range(X.shape[1])])
        y_df = pd.Series(y, name='target')
        dX = dd.from_pandas(X_df, chunksize=chunk_size)
        dy = dd.from_pandas(y_df, chunksize=chunk_size)
        dw = dd.from_array(weights, chunksize=chunk_size)
    elif output == 'scipy_csr_matrix':
        dX = da.from_array(X, chunks=(chunk_size, X.shape[1])).map_blocks(scipy.sparse.csr_matrix)
        dy = da.from_array(y, chunks=chunk_size)
        dw = da.from_array(weights, chunk_size)
    else:
        raise ValueError("Unknown output type %s" % output)

    return X, y, weights, dX, dy, dw

Right now, the Dask Arrays `data` and `labels` are lazy. Before training, you can force the cluster to compute them by running `.persist()` and then wait for that computation to finish by `wait()`-ing on them.

In [91]:
X, y, w, dX, dy, dw = _create_data(
    'classification',
    output="array",
    centers=data_centers[0]
)

With the data set up on the workers, train a model. `lightgbm.dask.DaskLGBMRegressor` has an interface that tries to stay as close as possible to the non-Dask scikit-learn interface to LightGBM (`lightgbm.sklearn.LGBMRegressor`).

In [59]:
dask_reg = DaskLGBMRegressor(
    random_state=708,
    objective="regression_l2",
    tree_learner="data",
    n_estimators=10
)

dask_reg.fit(
    client=client,
    X=dX,
    y=dy,
)

# predictions asking for predcontrib should add
# the contribution column
preds = dask_reg.predict(
    X,
    raw_score=True
).compute()
preds_with_contrib = dask_reg.predict(
    data[:1000, :],
    pred_contrib=True,
    raw_score=True
).compute()

In [60]:
from lightgbm.sklearn import LGBMRegressor

local_reg = LGBMRegressor(
    random_state=708,
    objective="regression_l2",
    tree_learner="data",
    n_estimators=10
)

local_reg.fit(
    X=data.compute(),
    y=reg_target.compute(),
)

# predictions asking for predcontrib should add
# the contribution column
local_preds = local_reg.predict(
    data.compute()[:1000, :],
    raw_score=True
)
local_preds_with_contrib = local_reg.predict(
    data.compute()[:1000, :],
    pred_contrib=True,
    raw_score=True
)

  return f(*args, **kwargs)


In [85]:
[x[100] for x in preds_with_contrib[:10]]

[0.5005821951251901,
 0.5005821951251901,
 0.5005821951251901,
 0.5005821951251901,
 0.5005821951251901,
 0.5005821951251901,
 0.5005821951251901,
 0.5005821951251901,
 0.5005821951251901,
 0.5005821951251901]

In [86]:
[x[100] for x in local_preds_with_contrib[:10]]

[0.500582195028606,
 0.500582195028606,
 0.500582195028606,
 0.500582195028606,
 0.500582195028606,
 0.500582195028606,
 0.500582195028606,
 0.500582195028606,
 0.500582195028606,
 0.500582195028606]

In [68]:
from dask.array.utils import assert_eq

assert_eq(preds_with_contrib, local_preds_with_contrib, atol=0.01)

AssertionError: 

The model produced by this training run is an instance of `DaskLGBMRegressor`. To get a regular non-Dask model (which can be pickled and saved), run `.to_local()`.

In [None]:
local_model = dask_reg.to_local()
type(local_model)

You can visualize this model by looking at a data frame representation of it.

In [None]:
local_model.booster_.trees_to_dataframe()