In [3]:
%matplotlib inline
import numpy as np
from seaborn import plt

# Some nice default configuration for plots
plt.rcParams['figure.figsize'] = 10, 7.5
plt.rcParams['axes.grid'] = True

In [4]:
%%javascript
$.getScript('http://asimjalis.github.io/ipyn-ext/js/ipyn-present.js')

<IPython.core.display.Javascript object>

<h1 class="tocheading">Cluster Computing</h1>
<div id="toc"></div>

# Distributed Model Selection and Assessment

## Pop Quiz

<details><summary>What does it mean for a task to be "embarrassingly parallel"?</summary>
Little or no effort is required to separate the problem into a number of parallel tasks.</details>

<details><summary>List some algorithms that are and are not embarrassingly parallel</summary>
<table><tr><th>Embarassingly Parallel<th>Not Embarassingly Parallel</th></tr>
<tr><td>Random Forests</td><td>AdaBoost</td></tr>
<tr><td>Grid Search</td><td>MaxEnt</td></tr>
<tr><td>...</td><td>...</td></tr>
</table></details>

## Motivation

When doing model evaluations and parameters tuning, many models must be trained independently on the same data. This is an embarrassingly parallel problem but having a copy of the dataset in memory for each process is waste of RAM:

<img src="https://s3-us-west-2.amazonaws.com/dsci6007/assets/grid_search_parameters.png" style="display:inline; width: 49%" />
<img src="https://s3-us-west-2.amazonaws.com/dsci6007/assets/grid_search_cv_splits.png" style="display:inline; width: 49%" />

When doing 3 folds cross validation on a 9 parameters grid, a naive implementation could read the data from the disk and load it in memory 27 times. If this happens concurrently (e.g. on a compute node with 32 cores) the RAM might blow up hence breaking the potential linear speed up.

### Pop Quiz
<details><summary>How many copies would be needed to do 5-fold cross-validation if we added `1000` as a possibility for `param_1`?</summary>
60</details>


## IPython.parallel, a Primer

This section gives a primer on some tools best utilizing computational resources when doing predictive modeling in the Python / NumPy ecosystem namely:

- optimal usage of available CPUs and cluster nodes with **`IPython.parallel`**

- optimal memory re-use using shared memory between Python processes using **`numpy.memmap`** and **`joblib`**

### What is so great about `IPython.parallel`:

- Single node multi-CPUs
- Multiple node multi-CPUs
- Interactive In-memory computing
- Jupyter integration with `%px` and `%%px` magics
- Possibility to interactively connect to individual computing processes to launch interactive debugger (`#priceless`)

### Let's get started:

Let start an IPython cluster using the `ipcluster` common (usually run from your operating system console). To make sure that we are not running several clusters on the same host, let's try to shut down any running IPython cluster first:

In [5]:
!ipcluster stop

2015-08-20 15:26:06.202 [IPClusterStop] CRITICAL | Could not read pid file, cluster is probably not running.


In [None]:
!ipcluster start -n=2 --daemon

Go to the "Cluster" tab of the notebook and **start a local cluster with 2 engines**. Then come back here. We should now be able to use our cluster from our notebook session (or any other Python process running on localhost):

In [None]:
from IPython.parallel import Client

client = Client()

len(client)

#### The %px and %%px magics

All the engines of the client can be accessed imperatively using the `%px` and `%%px` IPython cell magics:

In [None]:
%%px

import os
import socket

print("This is running in process with pid {0} on host '{1}'.".format(
      os.getpid(), socket.gethostname()))

The content of the `__main__` namespace can also be read and written via the `%px` magic:

In [None]:
%px a = 1

In [None]:
%px print(a)

In [None]:
%%px

a *= 2
print(a)

It is possible to restrict the `%px` and `%%px` magic instructions to specific engines:

In [None]:
%%px --targets=-1
a *= 2
print(a)

In [None]:
%px print(a)

#### The DirectView objects

Cell magics are very nice to work interactively from the notebook but it's also possible to replicate their behavior programmatically with more flexibility with a `DirectView` instance. A `DirectView` can be created by slicing the client object:

In [None]:
all_engines = client[:]
all_engines

The namespace of the `__main__` module of each running python engine can be accessed in read and write mode as a python dictionary:

In [None]:
all_engines['a'] = 1

In [None]:
all_engines['a']

Direct views can also execute the same code in parallel on each engine of the view:

In [None]:
def my_sum(a, b):
    return a + b

my_sum_apply_results = all_engines.apply(my_sum, 11, 31)
my_sum_apply_results

The ouput of the `apply` method is an asynchronous handle returned immediately without waiting for the end of the computation. To block until the results are ready use:

In [None]:
my_sum_apply_results.get()

Here is a more useful example to fetch the network hostname of each engine in the cluster. Let's study it in more details:

In [None]:
def hostname():
    """Return the name of the host where the function is being called"""
    import socket
    return socket.gethostname()

hostname_apply_result = all_engines.apply(hostname)

When doing the above, the `hostname` function is first defined locally (the client python process). The `DirectView.apply` method introspects it, serializes its name and bytecode and ships it to each engine of the cluster where it is reconstructed as local function on each engine. This function is then called on each engine of the view with the optionally provided arguments.

In return, the client gets a python object that serves as an handle to asynchronously fetch the list of the results of the calls:

In [None]:
hostname_apply_result

In [None]:
hostname_apply_result.get()

It is also possible to key the results explicitly with the engine ids with the `AsyncResult.get_dict` method. This is a very simple idiom to fetch metadata on the runtime environment of each engine of the direct view:

In [None]:
hostnames = hostname_apply_result.get_dict()
hostnames

It can be handy to invert this mapping to find one engine id per host in the cluster so as to execute host specific operation:

In [None]:
one_engine_by_host = dict((hostname, engine_id) for engine_id, hostname
                      in hostnames.items())
one_engine_by_host

In [None]:
one_engine_per_host_view = client[one_engine_by_host.values()]
one_engine_per_host_view

**Trick:** you can even use those engines ids to execute shell commands in parallel on each host of the cluster:

In [None]:
one_engine_by_host.values()

In [None]:
%%px --targets=-1  # replace with one_engine_by_host.values()

!pip install scikit-learn

#### Note on Importing Modules on Remote Engines

In the previous example we put the `import socket` statement inside the body of the `hostname` function to make sure to make sure that is is available when the rest of the function is executed in the python processes of the remote engines.

Alternatively it is possible to import the required modules ahead of time on all the engines of a directview using a context manager / with syntax:

In [None]:
with all_engines.sync_imports():
    import numpy

However this method does **not** support alternative import syntaxes:
    
    >>> import numpy as np
    >>> from numpy import linalg

Hence the method of importing in the body of the "applied" functions is more flexible. Additionally, this does not pollute the `__main__` namespace of the engines as it only impact the local namespace of the function itself.

**Optional Exercise**:

- Write a function that returns the memory usage of each engine process in the cluster.
- Allocate a largish numpy array of zeros of known size (e.g. 100MB) on each engine of the cluster.

Hints:

Use the `psutil` module to collect the runtime info on a specific process or host. For instance to fetch the memory usage of the currently running process in MB:

    >>> import os
    >>> import psutil
    >>> psutil.Process(os.getpid()).get_memory_info().rss / 1e6

To allocate a numpy array with 1000 zeros stored as 64bit floats you can use:

    >>> import numpy as np
    >>> z = np.zeros(1000, dtype=np.float64)

The size in bytes of such a numpy array can then be fetched with ``z.nbytes``:
    
    >>> z.nbytes / 1e6
    0.008

#### Load Balanced View

`LoadBalancedView` is an alternative to the `DirectView` to run one function call at a time on a free engine.

In [None]:
lv = client.load_balanced_view()

In [None]:
def slow_square(x):
    import time
    time.sleep(2)
    return x ** 2

In [None]:
result = lv.apply(slow_square, 4)

In [None]:
result

In [None]:
result.ready()

In [None]:
result.get()  # blocking call

It is possible to spread some tasks among the engines of the LB view by passing a callable and an iterable of task arguments to the `LoadBalancedView.map` method:

In [None]:
results = lv.map(slow_square, [0, 1, 2, 3])
results

In [None]:
results.ready()

In [None]:
results.progress

In [None]:
# results.abort()

In [None]:
# Iteration on AsyncMapResult is blocking
for r in results:
    print(r)

The load balanced view will be used in the following to schedule work on the cluster while being able to monitor progress and occasionally add new computing nodes to the cluster while computing to speed up the processing when using EC2 and StarCluster (see later).

### Pop Quiz
<details><summary>What is the difference between a direct view and a load balanced view?</summary>
In a direct view, engines are addressed explicitly.<br />
In a load balanced view, the scheduler is trusted with assigning work to appropriate engines.</details>


## Sharing Read-only Data between Processes on the Same Host with Memmapping

The numpy package makes it possible to memory map large contiguous chunks of binary files as shared memory for all the Python processes running on a given host:

In [None]:
%px import numpy as np

Creating a `numpy.memmap` instance with the `w+` mode creates a file on the filesystem and zeros its content. Let's do it from the first engine process or our current IPython cluster:

In [None]:
%%px --targets=-1  
# replace with one_engine_by_host.values()

# Cleanup any existing file from past session (necessary for windows)
import os
if os.path.exists('small.mmap'):
    os.unlink('small.mmap')

mm_w = np.memmap('small.mmap', shape=10, dtype=np.float32, mode='w+')
print(mm_w)

This binary file can then be mapped as a new numpy array by all the engines having access to the same filesystem. The `mode='r+'` opens this shared memory area in read write mode:

In [None]:
%%px

mm_r = np.memmap('small.mmap', dtype=np.float32, mode='r+')
print(mm_r)

In [None]:
%%px --targets=-1  
# replace with one_engine_by_host.values()

mm_w[0] = 42
print(mm_w)
print(mm_r)

In [None]:
%px print(mm_r)

Memory mapped arrays created with `mode='r+'` can be modified and the modifications are shared with all the engines:

In [None]:
%%px --targets=1

mm_r[1] = 43

In [None]:
%%px
print(mm_r)

Be careful those, there is no builtin read nor write lock available on this such datastructures so it's better to avoid concurrent read & write operations on the same array segments unless there engine operations are made to cooperate with some synchronization or scheduling orchestrator.

Memmap arrays generally behave very much like regular in-memory numpy arrays:

In [None]:
%%px
print("sum={0:.3}, mean={1:.3}, std={2:.3}".format(
    mm_r.sum(), np.mean(mm_r), np.std(mm_r)))

Before allocating more data in memory on the cluster let us define a couple of utility functions from the previous exercise (and more) to monitor what is used by which engine and what is still free on the cluster as a whole:

In [None]:
def get_host_free_memory(client):
    """Free memory on each host of the cluster in MB."""
    all_engines = client[:]
    def hostname():
        import socket
        return socket.gethostname()
    
    hostnames = all_engines.apply(hostname).get_dict()
    one_engine_per_host = dict((hostname, engine_id)
                               for engine_id, hostname
                               in hostnames.items())

    def host_free_memory():
        import psutil
        return psutil.virtual_memory().free / 1e6
    
    
    host_mem = client[one_engine_per_host.values()].apply(
        host_free_memory).get_dict()
    
    return dict((hostnames[eid], m) for eid, m in host_mem.items())

In [None]:
get_host_free_memory(client)

Let's allocate a 80MB memmap array in the first engine and load it in readwrite mode in all the engines:

In [None]:
%%px --targets=-1  
# replace with one_engine_by_host.values()

# Cleanup any existing file from past session (necessary for windows)
import os
if os.path.exists('big.mmap'):
    os.unlink('big.mmap')

np.memmap('big.mmap', shape=10 * int(1e6), dtype=np.float64, mode='w+')

In [None]:
%%px
ls -lh big.mmap

In [None]:
get_host_free_memory(client)

No significant memory was used in this operation as we just asked the OS to allocate the buffer on the hard drive and just maitain a virtual memory area as a cheap reference to this buffer.

Let's open new references to the same buffer from all the engines at once:

In [None]:
%px %time big_mmap = np.memmap('big.mmap', dtype=np.float64, mode='r+')

In [None]:
%px big_mmap

In [None]:
get_host_free_memory(client)

No physical memory was allocated in the operation as it just took a couple of ms to do so. This is also confirmed by the engines process stats:

Let's trigger an actual load of the data from the drive into the in-memory disk cache of the OS, this can take some time depending on the speed of the hard drive (on the order of 100MB/s to 300MB/s hence 3s to 8s for this dataset):

In [None]:
%%px --targets=-1  
# replace with one_engine_by_host.values()

%time np.sum(big_mmap)

In [None]:
get_host_free_memory(client)

We can see that the first engine has now access to the data in memory and the free memory on the host has decreased by the same amount.

We can now access this data from all the engines at once much faster as the disk will no longer be used: the shared memory buffer will instead accessed directly by all the engines:

In [None]:
%px %time np.sum(big_mmap)

In [None]:
get_host_free_memory(client)

So it seems that the engines have loaded a whole copy of the data but this actually not the case as the total amount of free memory was not impacted by the parallel access to the shared buffer. Furthermore, once the data has been preloaded from the hard drive using one process, all the of the other processes on the same host can access it almost instantly saving a lot of IO wait.

This strategy makes it very interesting to load the readonly datasets of machine learning problems, especially when the same data is reused over and over by concurrent processes as can be the case when doing learning curves analysis or grid search.

## Memmaping Nested Numpy-based Data Structures with Joblib

joblib is a utility library included in the sklearn package. Among other things it provides tools to serialize objects that comprise large numpy arrays and reload them as memmap backed datastructures.

To demonstrate it, let's create an arbitrary python datastructure involving numpy arrays:

In [None]:
import numpy as np

class MyDataStructure(object):
    
    def __init__(self, shape):
        self.float_zeros = np.zeros(shape, dtype=np.float32)
        self.integer_ones = np.ones(shape, dtype=np.int64)
        
data_structure = MyDataStructure((3, 4))
data_structure.float_zeros, data_structure.integer_ones

We can now persist this datastructure to disk:

In [None]:
from sklearn.externals import joblib

joblib.dump(data_structure, 'data_structure.pkl')

In [None]:
!ls -l data_structure*

A memmapped copy of this datastructure can then be loaded:

In [None]:
memmaped_data_structure = joblib.load('data_structure.pkl', mmap_mode='r+')
memmaped_data_structure.float_zeros, memmaped_data_structure.integer_ones

## Memmaping CV Splits for Multiprocess Dataset Sharing

We can leverage the previous tools to build a utility function that extracts Cross Validation splits ahead of time to persist them on the hard drive in a format suitable for memmaping by IPython engine processes.

In [None]:
from sklearn.externals import joblib
from sklearn.cross_validation import ShuffleSplit
import os

def persist_cv_splits(X, y, n_cv_iter=5, name='data',
    suffix="_cv_%03d.pkl", test_size=0.25, random_state=None):
    """Materialize randomized train test splits of a dataset."""

    cv = ShuffleSplit(X.shape[0], n_iter=n_cv_iter,
        test_size=test_size, random_state=random_state)
    cv_split_filenames = []
    
    for i, (train, test) in enumerate(cv):
        cv_fold = (X[train], y[train], X[test], y[test])
        cv_split_filename = name + suffix % i
        cv_split_filename = os.path.abspath(cv_split_filename)
        joblib.dump(cv_fold, cv_split_filename)
        cv_split_filenames.append(cv_split_filename)
    
    return cv_split_filenames

Let's try it on the digits dataset, we can run this from the :

In [None]:
from sklearn.datasets import load_digits

digits = load_digits()
digits_split_filenames = persist_cv_splits(digits.data, digits.target,
    name='digits', random_state=42)
digits_split_filenames

In [None]:
ls -lh digits*

Each of the persisted CV splits can then be loaded back again using memmaping:

In [None]:
X_train, y_train, X_test, y_test = joblib.load(
    'digits_cv_002.pkl',  mmap_mode='r+')

In [None]:
X_train

In [None]:
y_train

## Parallel Model Selection and Grid Search

Let's leverage IPython.parallel and the Memory Mapping features of joblib to write a custom grid search utility that runs on cluster in a memory efficient manner.

Assume that we want to reproduce the grid search from the previous session:

In [None]:
import numpy as np
from pprint import pprint

svc_params = {
    'C': np.logspace(-1, 2, 4),
    'gamma': np.logspace(-4, 0, 5),
}
pprint(svc_params)

`GridSearchCV` internally uses the following `ParameterGrid` utility iterator class to build the possible combinations of parameters:

In [None]:
from sklearn.grid_search import ParameterGrid

list(ParameterGrid(svc_params))

Let's write a function to load the data from a CV split file and compute the validation score for a given parameter set and model:

In [None]:
def compute_evaluation(cv_split_filename, model, params):
    """Function executed by a worker to evaluate a model on a CV split"""
    # All module imports should be executed in the worker namespace
    from sklearn.externals import joblib

    X_train, y_train, X_validation, y_validation = joblib.load(
        cv_split_filename, mmap_mode='c')
    
    model.set_params(**params)
    model.fit(X_train, y_train)
    validation_score = model.score(X_validation, y_validation)
    return validation_score

In [None]:
def grid_search(lb_view, model, cv_split_filenames, param_grid):
    """Launch all grid search evaluation tasks."""
    all_tasks = []
    all_parameters = list(ParameterGrid(param_grid))
    
    for i, params in enumerate(all_parameters):
        task_for_params = []
        
        for j, cv_split_filename in enumerate(cv_split_filenames):    
            t = lb_view.apply(
                compute_evaluation, cv_split_filename, model, params)
            task_for_params.append(t) 
        
        all_tasks.append(task_for_params)
        
    return all_parameters, all_tasks

Let's try on the digits dataset that we splitted previously as memmapable files:

In [None]:
from sklearn.svm import SVC

lb_view = client.load_balanced_view()
model = SVC()
svc_params = {
    'C': np.logspace(-1, 2, 4),
    'gamma': np.logspace(-4, 0, 5),
}

all_parameters, all_tasks = grid_search(
   lb_view, model, digits_split_filenames, svc_params)

In [None]:
import time
time.sleep(5)

The `grid_search` function is using the asynchronous API of the `LoadBalancedView`, we can hence monitor the progress:

In [None]:
def progress(tasks):
    return np.mean([task.ready() for task_group in tasks
                                 for task in task_group])

In [None]:
print("Tasks completed: {0}%".format(100 * progress(all_tasks)))

Even better, we can introspect the completed task to find the best parameters set so far:

In [None]:
def find_bests(all_parameters, all_tasks, n_top=5):
    """Compute the mean score of the completed tasks"""
    mean_scores = []
    
    for param, task_group in zip(all_parameters, all_tasks):
        scores = [t.get() for t in task_group if t.ready()]
        if len(scores) == 0:
            continue
        mean_scores.append((np.mean(scores), param))
                   
    return sorted(mean_scores, reverse=True)[:n_top]

In [None]:
from pprint import pprint

print("Tasks completed: {0}%".format(100 * progress(all_tasks)))
pprint(find_bests(all_parameters, all_tasks))

### Optimization Trick: Truncated Randomized Search

It is often wasteful to search all the possible combinations of parameters as done previously, especially if the number of parameters is large (e.g. more than 3).

To speed up the discovery of good parameters combinations, it is often faster to randomized the search order and allocate a budget of evaluations, e.g. 10 or 100 combinations.

See [this JMLR paper by James Bergstra](http://jmlr.csail.mit.edu/papers/v13/bergstra12a.html) for an empirical analysis of the problem. The interested reader should also have a look at [hyperopt](https://github.com/jaberg/hyperopt) that further refines this parameter search method using meta-optimizers.

Randomized Parameter Search has just been implemented in the master branch of scikit-learn be part of the 0.14 release.

## A More Complete Parallel Model Selection and Assessment Example

In [None]:
%matplotlib inline
import matplotlib.pyplot as plt
import numpy as np

# Some nice default configuration for plots
plt.rcParams['figure.figsize'] = 10, 7.5
plt.rcParams['axes.grid'] = True
plt.gray();

In [None]:
lb_view = client.load_balanced_view()
model = SVC()

In [None]:
import sys, imp
from collections import OrderedDict
sys.path.append('..')
import model_selection, mmap_utils
imp.reload(model_selection), imp.reload(mmap_utils)

lb_view.abort()

svc_params = OrderedDict([
    ('gamma', np.logspace(-4, 0, 5)),
    ('C', np.logspace(-1, 2, 4)),
])

search = model_selection.RandomizedGridSeach(lb_view)
search.launch_for_splits(model, svc_params, digits_split_filenames)

In [None]:
time.sleep(5)

In [None]:
print(search.report())

In [None]:
time.sleep(5)

In [None]:
print(search.report())
search.boxplot_parameters(display_train=False)

# StarCluster  

<img src=http://star.mit.edu/cluster/docs/latest/_images/scoverview.png />

StarCluster comes out of the STAR program at MIT. STAR stands for "Software Tools for Academics and Researchers". It is used to quickly provision a cluster of EC2 instances. Like EMR, it automatically configures them to be used as a cluster (rather than as independent machines) with one controller and many workers. Unlike EMR, it does not have a GUI. Also, Hadoop is just one of many plugins for StarCluster. The most important plugin, however, is IPython Cluster.

<!--

## Distributing the Computation on EC2 Spot Instances with StarCluster

### Installation

To provision a cheap transient compute cluster on Amazon EC2, the first step is to register on EC2 with a credit card and put your EC2 credentials as environment variables. For instance under Linux / OSX:

    [laptop]% export AWS_ACCESS_KEY_ID=XXXXXXXXXXXXXXXXXXXXX
    [laptop]% export AWS_SECRET_ACCESS_KEY=XXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXX

You can put those exports in your `~/.bashrc` to automatically get those credentials loaded in new shell sessions.

Then proceed to the installation of StarCluster it-self:

    [laptop]% pip install StarCluster

### Configuration

Let's run the help command a first time and create a template configuration file:

    [laptop]% starcluster help
    StarCluster - (http://star.mit.edu/cluster)
    Software Tools for Academics and Researchers (STAR)
    Please submit bug reports to starcluster@mit.edu
    
    cli.py:87 - ERROR - config file /home/user/.starcluster/config does not exist
    
    Options:
    --------
    [1] Show the StarCluster config template
    [2] Write config template to /home/user/.starcluster/config
    [q] Quit
    
    Please enter your selection:
    2

and create a password-less ssh key that will be dedicated to this transient cluster:
    
    [laptop]% starcluster createkey mykey -o ~/.ssh/mykey.rsa

    
You can now edit the file `/home/user/.starcluster/config` and remplace its content with the following sample configuration:
    
    [global]
    DEFAULT_TEMPLATE=iptemplate
    REFRESH_INTERVAL=5
    
    [key mykey]
    KEY_LOCATION=~/.ssh/mykey.rsa
    
    [plugin ipcluster]
    SETUP_CLASS = starcluster.plugins.ipcluster.IPCluster
    ENABLE_NOTEBOOK = True
    NOTEBOOK_PASSWD = aaaa
    
    [plugin ipclusterstop]
    SETUP_CLASS = starcluster.plugins.ipcluster.IPClusterStop
    
    [plugin ipclusterrestart]
    SETUP_CLASS = starcluster.plugins.ipcluster.IPClusterRestartEngines
    
    [plugin pypackages]
    setup_class = starcluster.plugins.pypkginstaller.PyPkgInstaller
    packages = scikit-learn, psutil
    
    # Base configuration for IPython.parallel cluster
    [cluster iptemplate]
    KEYNAME = mykey
    CLUSTER_SIZE = 1
    CLUSTER_USER = ipuser
    CLUSTER_SHELL = bash
    REGION = us-east-1
    NODE_IMAGE_ID = ami-5b3fb632     # REGION and NODE_IMAGE_ID go in pair
    NODE_INSTANCE_TYPE = c1.xlarge   # 8 CPUs
    DISABLE_QUEUE = True             # We don't need SGE, faster cluster startup
    PLUGINS = pypackages, ipcluster

### Launching a Cluster

Start a new cluster using the `myclustertemplate` section of the `~/.startcluster/config` file:

    [laptop]% starcluster start -c iptemplate -s 3 -b 0.5 mycluster
    
- The `-s` option makes it possible to select the number of EC2 instance to start.

- The `-b` option makes it possible to provision non-master instances on the Spot Instance market

- To also provision the master node on the Spot Instance market you can further add the `--force-spot-master` flag to the previous commandline.

- Provisioning Spot Instances is typically up to 5x cheaper than regular instances for largish instance types such as `c1.xlarge` but you run the risk of having your instances shut down if the price goes up. Also provisioning new instances on the Spot market can be slower: often a couple of minutes instead of 30s for On Demand instances.

- You can access the price history of spot instances of a specific region with:

        [laptop]% starcluster -r us-west-1 spothistory c1.xlarge
        StarCluster - (http://star.mit.edu/cluster) (v. 0.9999)
        Software Tools for Academics and Researchers (STAR)
        Please submit bug reports to starcluster@mit.edu

        >>> Current price: $0.11
        >>> Max price: $0.75
        >>> Average price: $0.13

Connect to the master node via ssh:

    [laptop]% starcluster sshmaster -A -u ipuser

- The `-A` flag makes it possible to use your local ssh agent to manage your keys: makes it possible to `git clone` / `git push` github repositories from the master node as you would from your local folder.

- The StarCluster AMI comes with `tmux` installed by default.

It is possible to ssh into other cluster nodes from the master using local DNS aliases such as:

    [myuser@master]% ssh node001

### Dynamically Resizing the Cluster

When using the `LoadBalancedView` API of `IPython.parallel.Client` is it possible to dynamically grow the cluster to shorten the duration of the processing of a queue of task without having to restart from scratch.

This can be achieved using the `addnode` command, for instance to add 3 more nodes using \$0.50 bid price on the Spot Instance market:
    
    [laptop]% starcluster addnode -s 3 -b 0.5 mycluster
    
Each node will automatically run the `IPCluster` plugin and register new `IPEngine` processes to the existing `IPController` process running on master.

It is also possible to terminate individual running nodes of the cluster with `removenode` command but this will kill any task running on that node and IPython.parallel will **not** restart the failed task automatically.

### Terminating a Cluster

Once your are done with your computation, don't forget to shutdown the whole cluster and EBS volume so as to only pay for the resources you used.

Before doing so, don't forget to backup any result file you would like to keep, by either pushing them to the S3 storage service (recommended for large files that you would want to reuse on EC2 later) or fetching them locally using the `starcluster get` command.

The cluster shutdown itself can be achieved with a single command:

    [laptop]% starcluster terminate mycluster

Alternatively to can also keep your data by preserving the EBS volume attached to the master node by remplacing the `terminate` command with the `stop` command:

    [laptop]% starcluster stop mycluster

You can then later restart the same cluster again with the `start` command to automatically remount the EBS volume.
-->

Add the `ipcluster` plugin if you haven't already.

Near the bottom of `.starcluster/config`:
```bash
######################
## Built-in Plugins ##
######################
# The following plugins ship with StarCluster and should work out-of-the-box.
# Uncomment as needed. Don't forget to update your PLUGINS list!
# See http://star.mit.edu/cluster/docs/latest/plugins for plugin details.
# .
# .
# .
[plugin ipcluster]
SETUP_CLASS = starcluster.plugins.ipcluster.IPCluster
# Enable the IPython notebook server (optional)
ENABLE_NOTEBOOK = True
# Set a password for the notebook for increased security
# This is optional but *highly* recommended
NOTEBOOK_PASSWD = a-secret-password
```

Set `CLUSTER_SIZE` to `3` for more memory *(see [aws.amazon.com/ec2/instance-types](http://aws.amazon.com/ec2/instance-types/) for details)*:
```bash
[cluster smallcluster]
# number of ec2 instances to launch
CLUSTER_SIZE = 3
NODE_IMAGE_ID = ami-6b211202
PLUGINS = ipcluster
SPOT_BID = 0.10
```
Also set `SPOT_BID` to `0.10` (or less?) to save \$\$\$ *(see [aws.amazon.com/ec2/purchasing-options/spot-instances](http://aws.amazon.com/ec2/purchasing-options/spot-instances/) for details)*

Start your new cluster:

`$ starcluster start my_cluster`

Copy your credentials to your cluster:

`$ starcluster put my_cluster --user sgeadmin ~/Downloads/credentials.csv /home/sgeadmin/`

This should, as a side effect, add your cluster to the list of known hosts on your machine. 
In my experience, it often doesn't, however. 
Therefore, **you will want to:**

```bash
starcluster sshmaster my_cluster
```
**before you do the following (or `Client` will hang forever):**

In [None]:
from os.path import expanduser
url_file = expanduser('~/.starcluster/ipcluster/SecurityGroup:@sc-my_medium_cluster-us-east-1.json')
sshkey = expanduser('~/.ssh/mykey.pem')  # replace with your pem
client = Client(url_file, sshkey = sshkey)

Check to see how many engines you have running:

In [None]:
dview = client.direct_view()
len(client.ids)

If there are fewer engines running than expected, that's probably because they didn't start on some of the instances. We can use the following to see which instances have engines:

In [None]:
%%px
%%bash
ec2metadata --local-ipv4

In [None]:
%%bash
starcluster put dat12 --user sgeadmin digits_cv_00* /mnt/sgeadmin/

We need to set up each of our engines individually. We can do this quickly with the `%%px` magic:

In [None]:
%%px -t0
%%bash
scp /mnt/sgeadmin/digits_cv_00* node001:/mnt/sgeadmin/
scp /mnt/sgeadmin/digits_cv_00* node002:/mnt/sgeadmin/

In [None]:
remote_filenames = ['/mnt/sgeadmin/' + filename.split('/')[-1] for filename in digits_split_filenames]
remote_filenames

In [None]:
from sklearn.svm import SVC

lb_view = client.load_balanced_view()
model = SVC()
svc_params = {
    'C': np.logspace(-1, 2, 4),
    'gamma': np.logspace(-4, 0, 5),
}

all_parameters, all_tasks = grid_search(
   lb_view, model, remote_filenames, svc_params)

<!--

# Large Scale Text Classification for Sentiment Analysis

## Outline of the Section

- Limitations of the Vocabulary-Based Vectorizer
- The **Hashing Trick**
- **Online / Streaming** Text Feature Extraction and Classification
- **Parallel** Text Feature Extraction and Classification

## Scalability Issues

The `sklearn.feature_extraction.text.CountVectorizer` and `sklearn.feature_extraction.text.TfidfVectorizer` classes suffer from a number of scalability issues that all stem from the internal usage of the `vocabulary_` attribute (a Python dictionary) used to map the unicode string feature names to the integer feature indices.

The main scalability issues are:

- **Memory usage of the text vectorizer**: the all the string representations of the features are loaded in memory
- **Parallelization problems for text feature extraction**: the `vocabulary_` would be a shared state: complex synchronization and overhead
- **Impossibility to do online or out-of-core / streaming learning**: the `vocabulary_` needs to be learned from the data: its size cannot be known before making one pass over the full dataset
    
    
To better understand the issue let's have a look at how the `vocabulary_` attribute work. At `fit` time the tokens of the corpus are uniquely indentified by a integer index and this mapping stored in the vocabulary:

from sklearn.feature_extraction.text import CountVectorizer

vectorizer = CountVectorizer(min_df=1)

vectorizer.fit([
    "The cat sat on the mat.",
])
vectorizer.vocabulary_

The vocabulary is used at `transform` time to build the occurence matrix:

X = vectorizer.transform([
    "The cat sat on the mat.",
    "This cat is a nice cat.",
]).toarray()

print(len(vectorizer.vocabulary_))
print(vectorizer.get_feature_names())
print(X)

Let's refit with a slightly larger corpus:

vectorizer = CountVectorizer(min_df=1)

vectorizer.fit([
    "The cat sat on the mat.",
    "The quick brown fox jumps over the lazy dog.",
])
vectorizer.vocabulary_

The `vocabulary_` is the (logarithmically) growing with the size of the training corpus. Note that we could not have built the vocabularies in parallel on the 2 text documents as they share some words hence would require some kind of shared datastructure or synchronization barrier which is complicated to setup, especially if we want to distribute the processing on a cluster.

With this new vocabulary, the dimensionality of the output space is now larger:

X = vectorizer.transform([
    "The cat sat on the mat.",
    "This cat is a nice cat.",
]).toarray()

print(len(vectorizer.vocabulary_))
print(vectorizer.get_feature_names())
print(X)

## The Sentiment 140 Dataset

To illustrate the scalabitiy issues of the vocabulary-based vectorizers, let's load a more reallistic dataset for a classical text classification task: sentiment analysis on tweets. The goald is to tell appart negative from positive tweets on a variety of topics.

Assuming that the `../fetch_data.py` script was run successfully the following files should be available:

import os

sentiment140_folder = os.path.join('datasets', 'sentiment140')
training_csv_file = os.path.join(sentiment140_folder, 'training.1600000.processed.noemoticon.csv')
testing_csv_file = os.path.join(sentiment140_folder, 'testdata.manual.2009.06.14.csv')

Those files were downloaded from the research archive of the http://www.sentiment140.com/ project. The first file was gathered using the twitter streaming API by running stream queries for the positive ":)" and negative ":(" emoticons to collect tweets that are explicitly positive or negative. To make the classification problem non-trivial, the emoticons were stripped out of the text in the CSV files:

!ls -lh datasets/sentiment140/training.1600000.processed.noemoticon.csv

!wc -l datasets/sentiment140/training.1600000.processed.noemoticon.csv

Let's parse the CSV files and load everything in memory. As loading everything can take up to 2GB, let's limit the collection to 100K tweets of each (positive and negative) out of the total of 1.6M tweets.

FIELDNAMES = ('polarity', 'id', 'date', 'query', 'author', 'text')

def read_csv(csv_file, fieldnames=FIELDNAMES, max_count=None,
             n_partitions=1, partition_id=0):
    
    import csv  # put the import inside for use in IPython.parallel
    
    texts = []
    targets = []
    with open(csv_file, 'rb') as f:
        reader = csv.DictReader(f, fieldnames=fieldnames,
                                delimiter=',', quotechar='"')
        pos_count, neg_count = 0, 0
        for i, d in enumerate(reader):
            if i % n_partitions != partition_id:
                # Skip entry if not in the requested partition
                continue
                
            # only include strong positives (4) and negatives (0) for now
            if d['polarity'] == '4':
                if max_count and pos_count >= max_count / 2:
                    # we have enough positive tweets, don't collect any more
                    continue
                pos_count += 1
                texts.append(d['text'])
                targets.append(1)  # 4 becomes 1 for positive

            elif d['polarity'] == '0':
                if max_count and neg_count >= max_count / 2:
                    # we have enough negative tweets, don't collect any more
                    continue
                neg_count += 1
                texts.append(d['text'])
                targets.append(-1)  # 0 becomes -1 for negative

    return texts, targets  # include only text and new targets (ignore id, etc.)

%time text_train_all, target_train_all = read_csv(training_csv_file, max_count=200000)

len(text_train_all), len(target_train_all)

Let's display the first samples:

for text in text_train_all[:3]:
    print(text + "\n")

print(target_train_all[:3])

A polarity of "0" means negative while a polarity of "4" means positive. All the positive tweets are at the end of the file:

for text in text_train_all[-3:]:
    print(text + "\n")

print(target_train_all[-3:])

Let's split the training CSV file into a smaller training set and a validation set with 100k random tweets each:

from sklearn.cross_validation import train_test_split

text_train_small, text_validation, target_train_small, target_validation = train_test_split(
    text_train_all, np.array(target_train_all), test_size=.5, random_state=42)

len(text_train_small)

(target_train_small == -1).sum(), (target_train_small == 1).sum()

len(text_validation)

(target_validation == -1).sum(), (target_validation == 1).sum()

Let's open the manually annotated tweet files. The evaluation set also has neutral tweets with a polarity of "2" which we ignore. We can build the final evaluation set with only the positive and negative tweets of the evaluation CSV file:

text_test_all, target_test_all = read_csv(testing_csv_file)

len(text_test_all), len(target_test_all)

## The Hashing Trick

To workaround the limitations of the vocabulary-based vectorizers, one can use the hashing trick. Instead of building and storing an explicit mapping from the feature names to the feature indices in a Python dict, we can just use a hash function and a modulus operation:

from sklearn.utils.murmurhash import murmurhash3_bytes_u32

for word in "the cat sat on the mat".split():
    print("{0} => {1}".format(
        word, murmurhash3_bytes_u32(word, 0) % 2 ** 20))

This mapping is completly stateless and the dimensionality of the output space is explicitly fixed in advance (here we use a modulo `2 ** 20` which means roughly 1M dimensions). The makes it possible to workaround the limitations of the vocabulary based vectorizer both for parallelizability and online / out-of-core learning.

The `HashingVectorizer` class is an alternative to the `TfidfVectorizer` class with `use_idf=False` that internally uses the murmurhash hash function:

from sklearn.feature_extraction.text import HashingVectorizer

h_vectorizer = HashingVectorizer(encoding='latin-1')
h_vectorizer

It shares the same "preprocessor", "tokenizer" and "analyzer" infrastructure:

analyzer = h_vectorizer.build_analyzer()
analyzer('This is a test sentence.')

We can vectorize our datasets into a scipy sparse matrix exactly as we would have done with the `CountVectorizer` or `TfidfVectorizer`, except that we can directly call the `transform` method: there is no need to `fit` as `HashingVectorizer` is a stateless transformer:

%time X_train_small = h_vectorizer.transform(text_train_small)

The dimension of the output is fixed ahead of time to `n_features=2 ** 20` by default (nearly 1M features) to minimize the rate of collision on most classification problem while having reasonably sized linear models (1M weights in the `coef_` attribute):

X_train_small

As only the non-zero elements are stored, `n_features` has little impact on the actual size of the data in memory. We can combine the hashing vectorizer with a Passive-Aggressive linear model in a pipeline:

from sklearn.linear_model import PassiveAggressiveClassifier
from sklearn.pipeline import Pipeline

h_pipeline = Pipeline((
    ('vec', HashingVectorizer(encoding='latin-1')),
    ('clf', PassiveAggressiveClassifier(C=1, n_iter=1)),
))

%time h_pipeline.fit(text_train_small, target_train_small).score(text_validation, target_validation)

Let's check that the score on the validation set is reasonably in line with the set of manually annotated tweets:

h_pipeline.score(text_test_all, target_test_all)

As the `text_train_small` dataset is not that big we can still use a vocabulary based vectorizer to check that the hashing collisions are not causing any significative performance drop on the validation set (**WARNING** this is twice as slow as the hashing vectorizer version, skip this cell if your computer is too slow):

from sklearn.feature_extraction.text import TfidfVectorizer

vocabulary_vec = TfidfVectorizer(encoding='latin-1', use_idf=False)
vocabulary_pipeline = Pipeline((
    ('vec', vocabulary_vec),
    ('clf', PassiveAggressiveClassifier(C=1, n_iter=1)),
))

%time vocabulary_pipeline.fit(text_train_small, target_train_small).score(text_validation, target_validation)

We get almost the same score but almost twice as slower with also a big, slow to (un)pickle datastructure in memory:

len(vocabulary_vec.vocabulary_)

More info and reference for the original papers on the Hashing Trick in the answers to this http://metaoptimize.com/qa question: [What is the Hashing Trick?](http://metaoptimize.com/qa/questions/6943/what-is-the-hashing-trick).

## Out-of-Core learning

Out-of-Core learning is the task of training a machine learning model on a dataset that does not fit in the main memory. This requires the following conditions:
    
- a **feature extraction** layer with **fixed output dimensionality**
- knowing the list of all classes in advance (in this case we only have positive and negative tweets)
- a machine learning **algorithm that supports incremental learning** (the `partial_fit` method in scikit-learn).

Let us simulate an infinite tweeter stream that can generate batches of annotated tweet texts and there polarity. We can do this by recombining randomly pairs of positive or negative tweets from our fixed dataset:

from random import Random


class InfiniteStreamGenerator(object):
    """Simulate random polarity queries on the twitter streaming API"""
    
    def __init__(self, texts, targets, seed=0, batchsize=100):
        self.texts_pos = [text for text, target in zip(texts, targets)
                               if target > 0]
        self.texts_neg = [text for text, target in zip(texts, targets)
                               if target <= 0]
        self.rng = Random(seed)
        self.batchsize = batchsize

    def next_batch(self, batchsize=None):
        batchsize = self.batchsize if batchsize is None else batchsize
        texts, targets = [], []
        for i in range(batchsize):
            # Select the polarity randomly
            target = self.rng.choice((-1, 1))
            targets.append(target)
            
            # Combine 2 random texts of the right polarity
            pool = self.texts_pos if target > 0 else self.texts_neg
            text = self.rng.choice(pool) + " " + self.rng.choice(pool)
            texts.append(text)
        return texts, targets

infinite_stream = InfiniteStreamGenerator(text_train_small, target_train_small)

texts_in_batch, targets_in_batch = infinite_stream.next_batch(batchsize=3)

for t in texts_in_batch:
    print(t + "\n")

targets_in_batch

We can now use our infinte tweet source to train an online machine learning algorithm using the hashing vectorizer. Note the use of the `partial_fit` method of the `PassiveAggressiveClassifier` instance in place of the traditional call to the `fit` method that needs access to the full training set.

From time to time, we evaluate the current predictive performance of the model on our validation set that is guaranteed to not overlap with the infinite training set source:

n_batches = 1000
validation_scores = []
training_set_size = []

# Build the vectorizer and the classifier
h_vectorizer = HashingVectorizer(encoding='latin-1')
clf = PassiveAggressiveClassifier(C=1)

# Extract the features for the validation once and for all
X_validation = h_vectorizer.transform(text_validation)
classes = np.array([-1, 1])

n_samples = 0
for i in range(n_batches):
    
    texts_in_batch, targets_in_batch = infinite_stream.next_batch()    
    n_samples += len(texts_in_batch)

    # Vectorize the text documents in the batch
    X_batch = h_vectorizer.transform(texts_in_batch)
    
    # Incrementally train the model on the new batch
    clf.partial_fit(X_batch, targets_in_batch, classes=classes)
    
    if n_samples % 100 == 0:
        # Compute the validation score of the current state of the model
        score = clf.score(X_validation, target_validation)
        validation_scores.append(score)
        training_set_size.append(n_samples)

    if i % 100 == 0:
        print("n_samples: {0}, score: {1:.4f}".format(n_samples, score))

We can now plot the collected validation score values, versus the number of samples generated by the infinite source and feed to the model:

plt.plot(training_set_size, validation_scores)
plt.ylim(0.5, 1)
plt.xlabel("Number of samples")
plt.ylabel("Validation score")

## Parallelizing Text Classification

### Partitioning the Data and Training in Parallel

As the `HashingVectorizer` is stateless, one can use a separate instance (with the same parameters) in parallel or distributed processes to extract the features on independant partitions of a big text dataset. Each partition of extracted features can then be fed to independent instances of a linear classifier model on each computing node:

<img src="parallel_text_clf.png" style="width: 500px" />

### Final Linear Model Averaging

Once all the nodes are ready we can average the linear models:
    
<img src="parallel_text_clf_average.png" style="width: 500px" />

### Sample Implementation on the Tweet Data

Let's use IPython parallel to read partitions of the train CSV in different Python processes using the interactive IPython.parallel interface:

```bash
starcluster put dat12 --user sgeadmin fetch_data.py /mnt/sgeadmin/
```


```bash
starcluster sn dat12 master 'cd /mnt/sgeadmin/ && python fetch_data.py sentiment140'
starcluster sn dat12 node001 'cd /mnt/sgeadmin/ && python fetch_data.py sentiment140'
starcluster sn dat12 node002 'cd /mnt/sgeadmin/ && python fetch_data.py sentiment140'
```

Let's tell each engine which partition of the data it will have to handle:

dv = client.direct_view()

dv.scatter('partition_ids', range(len(client)), block=True)

%px print(partition_ids)

%px partition_id = partition_ids[0]

%px print(partition_id)

Let's send all we need to the engines

from sklearn.feature_extraction.text import HashingVectorizer

h_vectorizer = HashingVectorizer(encoding='latin-1')
dv['h_vectorizer'] = h_vectorizer
dv['read_csv'] = read_csv
dv['training_csv_file'] = '/mnt/sgeadmin/' + training_csv_file
dv['n_partitions'] = len(client)

%px print(training_csv_file)
%px print(n_partitions)

We are now ready to read the data partition from the CSV file, vectorize it, and train an indepenent model on each IPython.parallel engine:

%%px

max_count = 50000
print("Parsing %d items for partition %d..." % (max_count, partition_id))

texts, targets = read_csv(training_csv_file, n_partitions=n_partitions,
                         partition_id=partition_id, max_count=50000)

%%px
print("Shuffling the positive and negative examples...")

from sklearn.utils import shuffle
texts, targets = shuffle(texts, targets, random_state=1)

%%px
print("Vectorizing text data...")

vectors = h_vectorizer.transform(texts)

%%px
print("Fitting a linear model...")

from sklearn.linear_model import Perceptron
clf = Perceptron(n_iter=1).fit(vectors, targets)

print("Done!")

classifiers = dv.gather('clf', block=True)
classifiers

We can now compute the average linear model:

from copy import copy

def average_linear_model(models):
    """Compute a linear model that is the average of the others"""
    avg = copy(models[0])

    avg.coef_ = np.sum([m.coef_ for m in models], axis=0)
    avg.coef_ /= len(models)
    
    avg.intercept_ = np.sum([m.intercept_ for m in models], axis=0)
    avg.intercept_ /= len(models)

    return avg
    

clf = average_linear_model(classifiers)

Let's compare the score of the average model with the scores of the individual classifiers. The average models can have a better generalization than the individual models being averaged:

clf.score(h_vectorizer.transform(text_test_all), target_test_all)

for c in classifiers:
    print(c.score(h_vectorizer.transform(text_test_all), target_test_all))

Averaging linear models learned on different datasets that follow the same distribution is a form of Ensemble method. Other Ensemble methods include:
    
- Boosted models (see [Gradient Tree Boosting](http://scikit-learn.org/dev/modules/ensemble.html#gradient-tree-boosting) available in 0.13 and [AdaBoost](http://scikit-learn.org/dev/modules/ensemble.html#adaboost) in master),
- Bagging (Bootstrap Aggregating) as done in [Random Forests](http://scikit-learn.org/dev/modules/ensemble.html#random-forests). Decision Trees as the base model
- Other non-bootstraped, randomized aggregate of Decision Trees such as [Extremely Randomized Trees](http://scikit-learn.org/dev/modules/ensemble.html#extremely-randomized-trees).
- Averaging the probabilistic estimate of a library of randomized and / or heterogeneous linear or non-linear models.
- Stacking, for instance: training a final Random Forest on the probabilistic class assignment output of a library of randomized and / or heterogeneous linear or non-linear models.

### Limitations of the Hashing Vectorizer

Using the Hashing Vectorizer makes it possible to implement streaming and parallel text classification but can also introduce some issues:
    
- The collisions can introduce too much noise in the data and degrade prediction quality,
- The `HashingVectorizer` does not provide "Inverse Document Frequency" reweighting (lack of a `use_idf=True` option).
- There is no easy way to inverse the mapping and find the feature names from the feature index.

The collision issues can be controlled by increasing the `n_features` parameters.

The IDF weighting might be reintroduced by appending a `TfidfTransformer` instance on the output of the vectorizer. However computing the `idf_` statistic used for the feature reweighting will require to do at least one additional pass over the training set before being able to start training the classifier: this breaks the online learning scheme.

The lack of inverse mapping (the `get_feature_names()` method of `TfidfVectorizer`) is even harder to workaround. That would require extending the `HashingVectorizer` class to add a "trace" mode to record the mapping of the most important features to provide statistical debugging information.

In the mean time to debug feature extraction issues, it is recommended to use `TfidfVectorizer(use_idf=False)` on a small-ish subset of the dataset to simulate a `HashingVectorizer()` instance that have the `get_feature_names()` method and no collision issues.

-->