# Logistic Regression with Dask

In this notebook we walk through a simple example to test out an experimental implementation of gradient descent for fitting a logistic regression model. Our example dataset is taken from [here](http://mlcomp.org/datasets/872), and contains 270k intermediate game stats from a [dominion game server](https://dominion.isotropic.org/). The goal is to predict the game outcome from these stats.

The data is separated into a train and test dataset, stored in a sparse format. Each row first contains the game outcome (either `-1` or `1`), and is followed by a number of column indices (1 indexed) mapping to the value. There doesn't seem to be a standard reader for this format, so I rolled my own.

In [None]:
!head -n 1 ../dominion_stats/train

In [None]:
import numpy as np

def read(fp, n, p):
    """Read the sparse matrix format"""
    x = np.zeros((n, p), dtype='f8')
    y = np.zeros(n, dtype='i1')
    with open(fp) as f:
        for i, line in enumerate(f):
            chunks = line.split()
            if chunks[0] == '1':
                y[i] = 1
            for c in chunks[1:]:
                j, v = map(int, c.split(':'))
                x[i, j - 1] = v
    return x, y


X_train, y_train = read('../dominion_stats/train', 193657, 596)
X_test, y_test = read('../dominion_stats/test', 82996, 596)

In [None]:
print("Training Data: {0} samples".format(X_train.shape[0]))
print("Testing Data: {0} samples".format(X_test.shape[0]))

## In memory with Scikit-Learn

For comparison purposes, we'll first fit using scikit-learn, using the [LogisticRegression model](http://scikit-learn.org/stable/modules/generated/sklearn.linear_model.LogisticRegression.html#sklearn.linear_model.LogisticRegression). The `C` parameter is set high to minimize the effect of regularization (which isn't yet implemented in the dask version).

In [None]:
from sklearn import linear_model

logreg = linear_model.LogisticRegression(C=1e5)

%time logreg.fit(X_train, y_train)

Note that the timings here are just to provide a sense of the order of magnitude. This example is *not* to compete with scikit-learn.

---

After fitting, we can predict outcomes, and score our model on how well it fits using the test dataset. Scikit-learn has a nice standard interface for this, with the `predict` and `score` methods.

In [None]:
logreg.predict(X_test[:5])

In [None]:
s = logreg.score(X_test, y_test)
print("{0}% accurate".format(s * 100))

So ~69% of the game outcomes in the test dataset were predicted correctly. There may be some settings for this model that could be tuned to achieve better performance, but that's not the point of this example.

## In memory with dask-learn

As a proof-of-concept, an api compatible version of the `LogisticRegression` model has been implemented using dask. Fitting is done using gradient descent. The goal here is not to necessarily be speedy, but rather to handle larger datasets by working out-of-core.

Currently regularization isn't implemented, and only binomial problems are supported. Still, a few of the keywords avaialable to the scikit-learn version are also supported here, as well as the common `fit`, `predict`, and `score` methods.

Note that the code is generic between numpy and dask, and works fine with either. We'll start with numpy arrays.

In [None]:
from dask_learn.linear_model import LogisticRegression

dlogreg = LogisticRegression()

%time dlogreg.fit(X_train, y_train)

So the fitting happened quicker (on this problem). Again, it should be noted that this is not the point of this example. 

---

The interface of our `LogisticRegression` class is similar to that of the equivalent scikit-learn class.

In [None]:
sorted(m for m in dir(dlogreg) if not m.startswith('_'))

After fitting, we'll run the same `predict` and `score` methods:

In [None]:
dlogreg.predict(X_test[:5])

In [None]:
ds = dlogreg.score(X_test, y_test)
print("{0}% accurate".format(ds * 100))

The accuracy here is worse than that provided scikit-learn. I'm not sure why that is - there may be some things that could be done to improve the performance of our algorithm. Still, we're doing better than random guessing (50%).

## In Memory Dask Arrays

Next, we'll run the same model using dask arrays. Note that this is still in memory.

In [None]:
import dask.array as da

# Convert the numpy arrays to in memory dask arrays
dX_train = da.from_array(X_train, chunks=(10000, 596))
dy_train = da.from_array(y_train, chunks=10000)

In [None]:
# Imports for profiling
from dask.diagnostics import Profiler, ResourceProfiler, visualize
from bokeh.io import output_notebook
output_notebook(hide_banner=True)

In [None]:
# Fit, with profiling
dlogreg = LogisticRegression()

with Profiler() as prof, ResourceProfiler(0.1) as rprof:
    dlogreg.fit(dX_train, dy_train)
rprof.close()

In [None]:
visualize([prof, rprof])

Looking at the above profile plot, one can see the distinct periods where dask was computing things, and the gaps in between where the next step was going determined (click on the wheel zoom to zoom in to see this). My machine has 4 real cores (8 virtual cores), and I'm seeing ~400% cpu usage throughout. So we're making good use of parallelism.

The change in memory usage is negligible, which makes sense as we're only persisting the training arrays. However, the size of the training data is ~1 GB. For data of that size, having it all in memory is fine, but for more samples or features you'd potentially want to move to an out-of-core solution (see below).

In [None]:
dlogreg.predict(X_test[:5])

In [None]:
ds = dlogreg.score(X_test, y_test)
print("{0}% accurate".format(ds * 100))

From this we can see that we get the same results using either numpy or dask arrays (as expected).

## Out-of-core Dask Arrays

Now we'll store the training data into an `hdf5` file, and run the same fitting code out-of-core. Even though the data fits in memory, this will hopefully be a descent test of how well this code performs out-of-core.

In [None]:
import h5py
import os

# Store the data into an hdf5 file, if it doesn't already exist
if not os.path.exists('dominion.hdf5'):
    with h5py.File('dominion.hdf5') as f:
        f.create_dataset('/X', data=X_train, chunks=(10000, 596))
        f.create_dataset('/y', data=y_train, chunks=(10000,))

In [None]:
# Load the training data into out-of-core dask arrays
f = h5py.File('dominion.hdf5')
dX_train = da.from_array(f['X'], chunks=(10000, 596))
dy_train = da.from_array(f['y'], chunks=10000)

In [None]:
# Fit, with profiling
dlogreg = LogisticRegression()

with Profiler() as prof, ResourceProfiler(0.2) as rprof:
    dlogreg.fit(dX_train, dy_train)
rprof.close()

In [None]:
visualize([prof, rprof])

As expected, fitting takes longer due to the overhead of reading the array from disk every iteration. We're averaging ~130% cpu usage, so this is only slightly running in parallel. This makes sense as h5py holds the GIL, which means that all disk reads (our bottleneck) must be done in serial.

What's interesting here is that the change in memory usage is only ~300 MB - roughly a third the size of our training data (the initial offset is due to the data also being in memory elsewhere, but we're not using that here). Using a different chunking might reduce this even more.

In [None]:
dlogreg.score(X_test, y_test)
print("{0}% accurate".format(ds * 100))

As expected, the fit is the same out-of-core as it is in memory.

---

Finally, we'll try this same experiment, but using the synchronous scheduler. This will be a bit more conservative on memory, as it won't try and load more than one subarray at once. This shouldn't decrease our runtime significantly (might even make it faster), since h5py holds the GIL.

In [None]:
from dask.async import get_sync
from dask.context import set_options

dlogreg = LogisticRegression()

with set_options(get=get_sync), ResourceProfiler(0.2) as rprof:
    dlogreg.fit(dX_train, dy_train)
rprof.close()

In [None]:
rprof.visualize()

As expected, the change in memory usage decreased to only ~70 MB. It also ran slightly faster, which is probably due to not fighting with the GIL when using threading.

## Conclusion

### What worked well

- The code for this model was fairly cheap to create - roughly 150 lines total. The algorithm used to fit is simple, and written using familiar numpy operations.
- By writing in a generic style, the code is able to work transparently with either numpy or dask arrays (or even a mix of both).
- When using dask with some on disk storage, the fit was able to be done out-of-core using minimal memory, without any changes to our code.

### What could be better

- Gradient descent results in more iterations, with the benefit that each iteration is fairly cheap. When running out-of-core the loading cost of reading from disk is larger than the cost of computation (I/O bound). In this case it might make more sense to use a second order method with the hope of using fewer iterations.
- There are almost certainly ways to make our gradient descent code more performant - both in terms of speed and in terms of goodness-of-fit.
- There are many niceties that the scikit-learn model provides that aren't yet implemented.

### Questions

Is this work useful? Are there things I'm doing that are wrong/could be better? If it is useful, what would be a good next step?