# Score and Predict Large Datasets with Dask OpenStack Cloud Provider

This example combines the [Score and Predict Large Datasets example from the dask-examples repository](https://github.com/dask/dask-examples/blob/main/machine-learning/parallel-prediction.ipynb) with the Dask OpenStack Cloud Provider.

Details: https://examples.dask.org/machine-learning/parallel-prediction.html

## Prerequisites

```bash
pip install dask-cloudprovider
pip install numpy scikit-learn dask_ml
```

In [46]:
# (Optional) Suppress UserWarning messages to improve readability.
import warnings
warnings.filterwarnings('ignore', category=UserWarning)
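The filter above matches by warning category, so it hides `UserWarning` (and its subclasses) but leaves other categories such as `DeprecationWarning` untouched. The sketch below illustrates that behaviour in a scoped way with `warnings.catch_warnings`, so the global filter list is restored afterwards. The `noisy` function is a hypothetical stand-in and is not part of the notebook.

```python
import warnings


def noisy():
    """Hypothetical function that emits two different warning categories."""
    warnings.warn("user-level notice", UserWarning)
    warnings.warn("old API", DeprecationWarning)
    return 42


# record=True collects the warnings that pass the filters instead of printing them.
with warnings.catch_warnings(record=True) as caught:
    warnings.simplefilter("always")  # start from "show everything"
    warnings.filterwarnings("ignore", category=UserWarning)  # same filter as the notebook
    value = noisy()

# Only the categories that were not ignored remain.
categories = [w.category.__name__ for w in caught]
print(categories)
```

If the goal is to silence deprecation messages as well, a second `filterwarnings` call with `category=DeprecationWarning` would be needed.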

## Create and Connect to a Dask Cluster with the OpenStack Cloud Provider

In [49]:
import dask
import dask_cloudprovider
from instances import OpenStackCluster
from dask.distributed import Client, progress

In [50]:
cluster = OpenStackCluster(n_workers=2, shutdown_on_close=True, docker_image="armagankaratosun/dask-ml:latest")

Launching cluster with the following configuration: 
  OS Image: ubuntu-22-04 
  Flavor: 4vcpu-8gbram-50gbdisk 
  Docker Image: armagankaratosun/dask-ml:latest 
  Security Group: all-open 
Creating scheduler instance
dask-d523cb04-scheduler
	Internal IP: 10.0.30.30
	External IP: None
Waiting for scheduler to run at 10.0.30.30:8786
Scheduler is running
Creating worker instance
Creating worker instance
dask-d523cb04-worker-96fd7205
	Internal IP: 10.0.30.95
	External IP: None
dask-d523cb04-worker-e0f7591e
	Internal IP: 10.0.30.107
	External IP: None


In [None]:
client = Client(cluster)

In [55]:
client

Connection method: Cluster object
Cluster type: instances.OpenStackCluster
Dashboard: http://10.0.30.30:8787/status

Dashboard: http://10.0.30.30:8787/status
Workers: 2
Total threads: 8
Total memory: 15.32 GiB

Comm: tls://10.0.30.30:8786
Workers: 2
Dashboard: http://10.0.30.30:8787/status
Total threads: 8
Started: 4 minutes ago
Total memory: 15.32 GiB

Comm: tls://10.0.30.95:40477
Total threads: 4
Dashboard: http://10.0.30.95:43771/status
Memory: 7.66 GiB
Nanny: tls://10.0.30.95:44951
Local directory: /tmp/dask-scratch-space/worker-e5rljt6_

Comm: tls://10.0.30.107:35117
Total threads: 4
Dashboard: http://10.0.30.107:46297/status
Memory: 7.66 GiB
Nanny: tls://10.0.30.107:43675
Local directory: /tmp/dask-scratch-space/worker-5l075jui


## Score and Predict Large Datasets

In [56]:
import numpy as np
import dask.array as da
from sklearn.datasets import make_classification

We'll generate a small random dataset with scikit-learn.

In [57]:
X_train, y_train = make_classification(
    n_features=2, n_redundant=0, n_informative=2,
    random_state=1, n_clusters_per_class=1, n_samples=1000)
X_train[:5]

array([[ 1.53682958, -1.39869399],
       [ 1.36917601, -0.63734411],
       [ 0.50231787, -0.45910529],
       [ 1.83319262, -1.29808229],
       [ 1.04235568,  1.12152929]])

Next, we'll clone that dataset many times with `dask.array`. `X_large` and `y_large` stand in for a larger-than-memory dataset.

In [58]:
# Scale up: increase N, the number of times we replicate the data.
N = 100
X_large = da.concatenate([da.from_array(X_train, chunks=X_train.shape)
                          for _ in range(N)])
y_large = da.concatenate([da.from_array(y_train, chunks=y_train.shape)
                          for _ in range(N)])
X_large

| | Array | Chunk |
|---|---|---|
| Bytes | 1.53 MiB | 15.62 kiB |
| Shape | (100000, 2) | (1000, 2) |
| Dask graph | 100 chunks in 2 graph layers | 100 chunks in 2 graph layers |
| Data type | float64 numpy.ndarray | float64 numpy.ndarray |
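The byte counts in the array summary follow directly from the shape and the 8-byte `float64` item size. As a rough sketch, the arithmetic can be reproduced in plain Python. The helper `array_nbytes` is a hypothetical name introduced here for illustration, and the 7.66 GiB limit is taken from the per-worker memory reported by the cluster above.

```python
def array_nbytes(shape, itemsize=8):
    """Bytes needed for a dense array of the given shape (float64 uses 8 bytes per item)."""
    total = itemsize
    for dim in shape:
        total *= dim
    return total


n_samples, n_features, N = 1000, 2, 100

chunk_bytes = array_nbytes((n_samples, n_features))      # one replica of X_train
total_bytes = array_nbytes((n_samples * N, n_features))  # all N replicas

print(f"chunk: {chunk_bytes / 2**10:.2f} KiB")
print(f"total: {total_bytes / 2**20:.2f} MiB")

# Rough estimate of how many replicas it would take for X_large alone
# to exceed the memory of a single 7.66 GiB worker.
worker_limit_bytes = int(7.66 * 2**30)
min_replicas = worker_limit_bytes // chunk_bytes + 1
print(f"replicas needed to exceed one worker: {min_replicas}")
```

With `N = 100` the array is small enough to fit anywhere, so in this run it only stands in for a larger-than-memory dataset. Raising `N` scales the size linearly.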


Since our training dataset fits in memory, we can use a scikit-learn estimator as the actual estimator fit during training. But we know that we'll want to predict for a large dataset, so we'll wrap the scikit-learn estimator with ParallelPostFit.
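To make the idea concrete, here is a minimal NumPy-only sketch of block-wise prediction: a fitted estimator is applied to each chunk independently and the results are stitched together. This is an illustration of the concept, not dask-ml's actual implementation. `ThresholdClassifier` and `predict_blockwise` are hypothetical names introduced for the example.

```python
import numpy as np


class ThresholdClassifier:
    """Hypothetical stand-in for a fitted estimator: predicts 1 where the first feature is positive."""

    def predict(self, X):
        return (X[:, 0] > 0).astype(np.int64)


def predict_blockwise(estimator, X, chunk_rows):
    """Call estimator.predict on consecutive row blocks and concatenate the results."""
    parts = [
        estimator.predict(X[start:start + chunk_rows])
        for start in range(0, X.shape[0], chunk_rows)
    ]
    return np.concatenate(parts)


rng = np.random.default_rng(0)
X = rng.normal(size=(10_000, 2))
model = ThresholdClassifier()

blockwise = predict_blockwise(model, X, chunk_rows=1_000)

# Row-wise predictions do not depend on how the rows are grouped,
# so the block-wise result matches a single call on the full array.
assert np.array_equal(blockwise, model.predict(X))
```

Because each block is handled independently, the blocks can live on different workers, which is what makes this pattern suitable for data that does not fit on one machine.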

In [59]:
from sklearn.linear_model import LogisticRegressionCV
from dask_ml.wrappers import ParallelPostFit

In [60]:
clf = ParallelPostFit(LogisticRegressionCV(cv=3), scoring="r2")

See the note in the dask-ml documentation about when and why a `scoring` parameter is needed: https://ml.dask.org/modules/generated/dask_ml.wrappers.ParallelPostFit.html#dask_ml.wrappers.ParallelPostFit.

Now we'll call `clf.fit`. Dask-ML does nothing special here: the call is passed to the wrapped scikit-learn estimator, so this step can only use datasets that fit in memory.

In [61]:
clf.fit(X_train, y_train)

Now that training is done, we'll turn to predicting for the full (larger-than-memory) dataset.

In [62]:
y_pred = clf.predict(X_large)
y_pred

| | Array | Chunk |
|---|---|---|
| Bytes | 781.25 kiB | 7.81 kiB |
| Shape | (100000,) | (1000,) |
| Dask graph | 100 chunks in 3 graph layers | 100 chunks in 3 graph layers |
| Data type | int64 numpy.ndarray | int64 numpy.ndarray |


`y_pred` is a Dask array. Workers can write the predicted values to a shared file system, without ever having to collect the data on a single machine.

Alternatively, we can check the model's score on the entire large dataset. The computation is done in parallel, and no single machine has to hold all the data.
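One way to see why a score can be computed without gathering the data is that R² (the `scoring="r2"` choice used above) only needs a few running sums per chunk. The following NumPy sketch shows that arithmetic under the assumption that each chunk reports its own partial sums. It is an illustration, not how dask-ml computes the score internally, and `chunk_stats` and `r2_from_chunks` are hypothetical helper names.

```python
import numpy as np


def chunk_stats(y_true, y_pred):
    """Per-chunk sufficient statistics: count, sum(y), sum(y^2), sum of squared residuals."""
    y_true = y_true.astype(np.float64)
    y_pred = y_pred.astype(np.float64)
    return (
        y_true.size,
        float(y_true.sum()),
        float((y_true ** 2).sum()),
        float(((y_true - y_pred) ** 2).sum()),
    )


def r2_from_chunks(stats):
    """Combine per-chunk statistics into a global R^2 = 1 - SS_res / SS_tot."""
    n = sum(s[0] for s in stats)
    sum_y = sum(s[1] for s in stats)
    sum_y2 = sum(s[2] for s in stats)
    ss_res = sum(s[3] for s in stats)
    ss_tot = sum_y2 - sum_y ** 2 / n  # equals sum((y - mean(y))^2)
    return 1.0 - ss_res / ss_tot


# Synthetic binary labels with roughly 10% of predictions flipped.
rng = np.random.default_rng(0)
y_true = rng.integers(0, 2, size=10_000)
flip = rng.random(10_000) < 0.1
y_pred = np.where(flip, 1 - y_true, y_true)

stats = [
    chunk_stats(y_true[i:i + 1_000], y_pred[i:i + 1_000])
    for i in range(0, 10_000, 1_000)
]
chunked = r2_from_chunks(stats)

# Reference value computed on the full arrays in one go.
direct = 1.0 - ((y_true - y_pred) ** 2).sum() / ((y_true - y_true.mean()) ** 2).sum()
assert abs(chunked - direct) < 1e-9
```

Each chunk contributes four numbers regardless of its size, so only those small tuples would need to travel between machines.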

## Results

In [63]:
from dask.distributed import performance_report
from IPython.display import IFrame

In [64]:
with performance_report(filename="dask-report.html"):
    # Computations run inside this block are recorded in the report.
    result = clf.score(X_large, y_large)

In [65]:
result

np.float64(0.596)

In [66]:
# Display the performance report within the notebook
IFrame(src="dask-report.html", width="100%", height="500px")

## Clean Up

After the prediction, we can close the client and tear down the Dask OpenStack cluster.

In [67]:
client.close()

In [68]:
cluster.close()

Terminated instance dask-d523cb04-worker-96fd7205
Terminated instance dask-d523cb04-worker-e0f7591e
Terminated instance dask-d523cb04-scheduler


## Credits

All credit for [Score and Predict Large Datasets](https://github.com/dask/dask-examples/blob/main/machine-learning/parallel-prediction.ipynb) belongs to its respective developers.

* https://examples.dask.org/machine-learning/parallel-prediction.html
* https://github.com/dask/dask-examples/blob/main/machine-learning/parallel-prediction.ipynb