# Distributed Hyper-parameter searching

In [1]:
%matplotlib inline

# Hide info messages from paramiko
import logging
logging.basicConfig()
logger = logging.getLogger()
logger.setLevel(logging.WARN)

import time
import random
import threading
import pandas as pd
import numpy as np
import plotly.plotly as py
import plotly.graph_objs as go
import matplotlib.pyplot as plt
plt.rcParams['figure.figsize'] = (2, 2)

from distributed import progress, Client
from pprint import pprint

from sklearn.decomposition import PCA
from sklearn.neural_network import MLPClassifier
from sklearn.model_selection import cross_val_score
from sklearn.datasets import load_digits

from cclyde.cluster import Cluster

---
## Create and launch AWS instances.

In [2]:
# Describe the desired cluster: 4 nodes of type t2.micro, named 'default',
# reached with the 'default_linux' key pair. Instantiating only connects to AWS
# and prints a reminder to run cluster.configure() next.
cluster = Cluster(key_name='default_linux', n_nodes=4, cluster_name='default', instance_type='t2.micro')

Connecting to Boto3 and EC2 resources...Done. 
Ready to configure in preparation to launch cluster! Run: >>> cluster.configure()


In [3]:
# Check the AWS prerequisites before launching; the log shows the key pair, VPC,
# subnet, security group, internet gateway and route table being validated.
cluster.configure()

Checking keypair exists using key_name: "default_linux"...
	Found pem_key: "default_linux"...Done.
Checking for Cluster Clyde's Virtual Private Cloud (cclyde_vpc) on AWS...
	Found existing VPC...Done.
Checking for Cluster Clyde's Subnet in the VPC...
	Found existing subnet...Done.
Validating security group...Found existing cclyde security group, connecting to it...Done.
Configuring security group...
	Working on permission: tcp from port: 22 to port: 22...already exists! Passing.

	Working on permission: tcp from port: 80 to port: 8786...already exists! Passing.

	Working on permission: tcp from port: 80 to port: 8787...already exists! Passing.

	Working on permission: tcp from port: 0 to port: 65535...already exists! Passing.
Done configuring security group.
Checking for Cluster Clyde's internet gateway...found existing cclyde gateway...Done.
Attaching internet gateway to VPC if needed...gateway already associated with VPC...Done.
Confirming proper VPC route table configuration...Found

In [4]:
# Launch the instances without blocking the notebook. The launch runs in a
# background thread, which is inspected later via cluster.instance_launching_thread.
cluster.launch_instances_nonblocking()



All instances in running state, waiting for all to be reachable...
All 4 instances ready!
Setting node names...Done.


# Digits dataset (scikit-learn's small, MNIST-like `load_digits` set)
### 8x8 grayscale hand-written digits

In [None]:
# load_digits(return_X_y=True) already returns one flat feature row per image,
# so the rows can be used directly without flattening each one again.
X, y = load_digits(return_X_y=True)

# Preview the first three samples: each row is reshaped to its 8x8 image and
# titled with the matching label.
for i in range(3):
    plt.imshow(X[i].reshape((8, 8)), cmap='Greys_r')
    plt.title('Digit: {}'.format(y[i]))
    plt.show()

# Train a NN to predict the numbers (as simple as it gets)
## This also demonstrates the time problem of adjusting hyper-parameters

---


In [None]:
pca = PCA(n_components=30)

print 'Features before: ', X.shape[1]

X = pca.fit_transform(X)

print 'Features after: ', X.shape[1]
print '{}% Explained Variance'.format(round(sum(pca.explained_variance_ratio_) * 100, 1))

Train with some given parameters...

In [None]:
# First attempt: two hidden layers (10 and 5 units) trained with mini-batches of 10.
lr = MLPClassifier(hidden_layer_sizes=(10, 5), batch_size=10,
                   solver='sgd', learning_rate_init=0.01, early_stopping=True)

# Time a full 5-fold cross-validation to show what a single parameter choice costs.
start = time.time()
scores = cross_val_score(estimator=lr,
                         X=X,
                         y=y,
                         cv=5)

# Report the mean accuracy and a +/- 2 standard deviation band, both in percent
# (the band used to be printed as a fraction next to a percentage).
print("\nAccuracy: {}% (+/- {})".format(round(scores.mean() * 100, 2), round(scores.std() * 2 * 100, 2)))
print('Finished in {}sec\n'.format(round(time.time() - start, 2)))

---
Alright, how about something else...

In [None]:
# Second attempt: two hidden layers of 10 units each and a larger batch size of 100.
lr = MLPClassifier(hidden_layer_sizes=(10, 10,), batch_size=100,
                   solver='sgd', learning_rate_init=0.01, early_stopping=True)

# Same timed 5-fold cross-validation as before, for a like-for-like comparison.
start = time.time()
scores = cross_val_score(estimator=lr,
                         X=X,
                         y=y,
                         cv=5)

# Report the mean accuracy and a +/- 2 standard deviation band, both in percent
# (the band used to be printed as a fraction next to a percentage).
print("\nAccuracy: {}% (+/- {})".format(round(scores.mean() * 100, 2), round(scores.std() * 2 * 100, 2)))
print('Finished in {}sec\n'.format(round(time.time() - start, 2)))

And now something different again...

In [None]:
# Third attempt: three hidden layers of 10 units each, batch size still 100.
lr = MLPClassifier(hidden_layer_sizes=(10, 10, 10,), batch_size=100,
                   solver='sgd', learning_rate_init=0.01, early_stopping=True)

# Same timed 5-fold cross-validation as before, for a like-for-like comparison.
start = time.time()
scores = cross_val_score(estimator=lr,
                         X=X,
                         y=y,
                         cv=5)

# Report the mean accuracy and a +/- 2 standard deviation band, both in percent
# (the band used to be printed as a fraction next to a percentage).
print("\nAccuracy: {}% (+/- {})".format(round(scores.mean() * 100, 2), round(scores.std() * 2 * 100, 2)))
print('Finished in {}sec\n'.format(round(time.time() - start, 2)))

## Issue: Which hyper-parameters are best?

Train a model for all (or most) combinations?

In [None]:

# Define hyper parameter ranges
batch_sizes = np.linspace(start=5, stop=750, num=30, dtype=np.int64)
n_layers = range(1, 8, 1)

# Dedicated, seeded generator: the neuron counts below are drawn at random, so
# a fixed seed makes the candidate list identical on every run without touching
# numpy's global random state.
rng = np.random.RandomState(42)

# Make a list with one candidate per (batch size, layer count) pair.
# Every hidden layer of a candidate shares a single randomly drawn width.
params = []
for batch_size in batch_sizes:
    for n_layer in n_layers:

        n_neuron = rng.randint(low=5, high=200)
        params.append({'batch_size': batch_size,
                       'hidden_layer_sizes': tuple(n_neuron for _ in range(n_layer)),
                       'solver': 'sgd',
                       'learning_rate_init': 0.01,
                       'early_stopping': True
                      })

# Single-argument print call: behaves the same under Python 2 and Python 3.
print('{} different combinations.'.format(len(params)))
pprint(params[:2])

# This will take a while, even if using all cores on a local machine; let's distribute the workload

---

Before executing the next few blocks, make sure the instances are ready to connect. If launched with the non-blocking thread, we can check if it's done with `.is_alive()`

In [5]:
print 'Lauching thread is alive: ', cluster.instance_launching_thread.is_alive()

Lauching thread is alive:  False


In [6]:
# Install Anaconda on the cluster; the log reports an exit code for each host.
cluster.install_anaconda()

Installing Anaconda on cluster...


-------------------

Host: 54.172.100.211	Exit Code: 0

-------------------

Host: 54.158.227.73	Exit Code: 0

-------------------

Host: 54.87.136.28	Exit Code: 0

-------------------

Host: 52.90.147.78	Exit Code: 0
Done.

In [7]:
# Install the listed packages on the nodes using conda; the log shows them being
# installed one package at a time with an exit code per host.
cluster.install_python_packages(['scikit-learn', 'numpy', 'pandas', 'dask', 'futures'], method='conda')


Installing package: scikit-learn
-------------------

Host: 54.172.100.211	Exit Code: 0

-------------------

Host: 54.158.227.73	Exit Code: 0

-------------------

Host: 54.87.136.28	Exit Code: 0

-------------------

Host: 52.90.147.78	Exit Code: 0

Installed scikit-learn

Installing package: numpy
-------------------

Host: 54.172.100.211	Exit Code: 0

-------------------

Host: 54.158.227.73	Exit Code: 0

-------------------

Host: 54.87.136.28	Exit Code: 0

-------------------

Host: 52.90.147.78	Exit Code: 0

Installed numpy

Installing package: pandas
-------------------

Host: 54.172.100.211	Exit Code: 0

-------------------

Host: 54.158.227.73	Exit Code: 0

-------------------

Host: 54.87.136.28	Exit Code: 0

-------------------

Host: 52.90.147.78	Exit Code: 0

Installed pandas

Installing package: dask
-------------------

Host: 54.172.100.211	Exit Code: 0

-------------------

Host: 54.158.227.73	Exit Code: 0

-------------------

Host: 54.87.136.28	Exit Code: 0

-------

True

In [8]:
# Per the log: installs dask.distributed on every host, starts the scheduler on
# the master node and workers on the other hosts, then prints the scheduler and
# dashboard addresses.
cluster.launch_dask()

Installing dask.distributed on cluster

-------------------

Host: 54.172.100.211	Exit Code: 0

-------------------

Host: 54.158.227.73	Exit Code: 0

-------------------

Host: 54.87.136.28	Exit Code: 0

-------------------

Host: 52.90.147.78	Exit Code: 0

Launching scheduler on master node...
-------------------

Host: 52.90.147.78	Exit Code: 0
	STDOUT for 52.90.147.78:
		()Done.

Launching workers...
-------------------

Host: 54.172.100.211	Exit Code: 0
	STDOUT for 54.172.100.211:
		()
-------------------

Host: 54.158.227.73	Exit Code: 0
	STDOUT for 54.158.227.73:
		()
-------------------

Host: 54.87.136.28	Exit Code: 0
	STDOUT for 54.87.136.28:
		()Done.

Scheduler should be available here: 52.90.147.78:8786
Web Dashboard should be available here: 52.90.147.78:8787

## Connect to the resulting scheduler

In [9]:
# Connect to the scheduler address printed by cluster.launch_dask().
# NOTE(review): the IP is hard-coded from one particular run; it will likely
# differ after the cluster is relaunched and must then be updated by hand.
c = Client(address='52.90.147.78:8786')
c

<Client: scheduler="52.90.147.78:8786" processes=3 cores=3>

# Define functions which will be distributed to workers...

In [10]:
def get_data(kwargs):
    """
    Load the digits data, reduce it with PCA and bundle it with ``kwargs``.

    Meant to run on a worker, which is why the imports are function-local.

    :param kwargs: model keyword arguments; passed through untouched so the
                   next pipeline stage receives them together with the data.
    :return: tuple ``(kwargs, X, y)`` where ``X`` holds the 30 PCA components
             of every sample and ``y`` the matching labels.
    """
    from sklearn.datasets import load_digits
    from sklearn.decomposition import PCA

    # return_X_y=True already yields one flat feature row per image, so the
    # data can be handed to PCA as-is (no per-row flatten required).
    X, y = load_digits(return_X_y=True)

    # Fixed random_state: this function runs once per parameter set, possibly on
    # different workers, and every candidate should be scored on the same
    # projection even when a randomized SVD solver is selected.
    X = PCA(n_components=30, random_state=0).fit_transform(X)

    return (kwargs, X, y)


def model_tester(package):
    """
    Function which is mapped to cluster. Trains and scores one parameter set.

    :param package: tuple ``(kwargs, X, y)``; ``kwargs`` are passed to
                    ``MLPClassifier`` and ``X``/``y`` are the data to score on.
    :return: tuple ``(kwargs, mean_score, score_std)`` taken over the 5
             cross-validation scores.
    """
    kwargs, X, y = package

    # Function-local imports: the function body executes on a worker.
    from sklearn.neural_network import MLPClassifier
    from sklearn.model_selection import cross_val_score

    # Initialize model with given kwargs and score it with 5-fold cross-validation
    model = MLPClassifier(**kwargs)
    scores = cross_val_score(estimator=model,
                             X=X,
                             y=y,
                             cv=5)
    return (kwargs, scores.mean(), scores.std())


def score_combiner(package):
    """
    Not needed, but more functions == more pretty colors

    Merges a parameter set and its scores into one flat record.

    :param package: tuple ``(kwargs, score_mean, score_std)``.
    :return: new dict holding every entry of ``kwargs`` plus the keys
             ``'score'`` and ``'std'``.
    """
    import time
    import random

    # Artificial sub-second delay, kept on purpose so this stage is visible as
    # its own step while the pipeline runs.
    time.sleep(random.random())

    kwargs, score_m, score_std = package

    # Build a fresh dict instead of updating kwargs in place: kwargs is part of
    # the upstream task's result, and task functions should not mutate inputs.
    combined = dict(kwargs)
    combined.update({'score': score_m, 'std': score_std})

    return combined


def double(n):
    """
    Useless worker function # 1: pause for under a second, then hand back
    ``n`` doubled together with the constant 2.
    """
    import random
    import time
    import sklearn

    delay = random.random()
    time.sleep(delay)

    doubled = n * 2
    return doubled, 2


def add_two(package):
    """
    Useless worker function # 2: pause for under a second, then return the sum
    of the pair's two members.
    """
    first, second = package

    import random
    import time

    delay = random.random()
    time.sleep(delay)

    total = first + second
    return total
    

## Test out some test functions...

In [13]:
# Smoke-test the cluster with the two toy functions over 250 inputs.
# Feeding the futures of the first map into the second chains the stages:
# each add_two task consumes the matching double result.
futures = c.map(double, range(250))
futures = c.map(add_two, futures)
progress(futures)  # progress display for the final stage

Widget Javascript not detected.  It may not be installed properly. Did you enable the widgetsnbextension? If not, then run "jupyter nbextension enable --py --sys-prefix widgetsnbextension"


## Distribute the actual work

In [None]:
# Three-stage pipeline, one chain per parameter set:
# get_data -> model_tester -> score_combiner. Each stage receives the futures
# of the previous one, so `futures` ends up referring to the final records.
futures = c.map(get_data, params)
futures = c.map(model_tester, futures)
futures = c.map(score_combiner, futures)
progress(futures)  # progress display for the final stage

In [None]:
# Fetch the finished results and display them.
# NOTE(review): this echoes every record; the results are gathered again into
# `results` in the following cell.
c.gather(futures)

In [None]:
# Pull the final records (one per parameter set) back into the notebook.
results = c.gather(futures)

In [None]:
# One row per evaluated parameter set, with the network shape split into two
# plain columns: number of hidden layers and width of the first layer.
df = pd.DataFrame(results)
df['n_layers'] = df['hidden_layer_sizes'].map(len)
df['n_neurons'] = df['hidden_layer_sizes'].map(lambda sizes: sizes[0])
df.head()

In [None]:
# Sanity check: which layer counts are present in the results.
df.n_layers.unique()

In [None]:

# Build one scatter trace per layer count so each count gets its own legend entry.
# The loop variable is deliberately NOT called n_layers: that name holds the
# hyper-parameter range defined earlier and must not be overwritten here.
data = []
for layer_count in df.n_layers.unique():

    layer_df = df[df.n_layers == layer_count]

    trace = go.Scatter(
        x=layer_df.n_neurons,
        y=layer_df.n_layers,
        mode='markers',
        # Hover text: score in percent plus the layer count
        text=['{}%<br>Layers: {}'.format(round(score * 100, 2), layers)
              for score, layers in zip(layer_df.score.values, layer_df.n_layers.values)],
        name='{} layers'.format(layer_count),
        marker=dict(
            size=layer_df.batch_size / 20.0,  # marker size tracks batch size
            color=layer_df.score,  # set color equal to the score
            colorscale='Viridis',
            showscale=False
        )
    )
    data.append(trace)

layout = dict(title='Best performing models.<br>(size = batch size)',
              xaxis=dict(zeroline=False, title='Neuron Count'),
              yaxis=dict(zeroline=False, title='Layer Count'),
             )

fig = dict(data=data, layout=layout)
py.iplot(fig, filename='styled-scatter')


In [None]:
# Show the row of the best-scoring parameter set.
# idxmax() returns the index label of the highest score and .loc looks that
# label up. This replaces the deprecated .ix indexer and avoids relying on
# Series.argmax(), whose result changed from label to position across pandas
# versions.
df.loc[df.score.idxmax(), :]

# Also create a distributed queue system...

In [None]:
# NOTE(review): `Queue` is the Python 2 module name (Python 3 calls it `queue`).
from Queue import Queue
local_q = Queue()  # jobs are put on this queue locally by the producer thread
# NOTE(review): presumably scattering a queue yields a queue of futures for the
# items put on local_q -- confirm against the installed distributed version.
remote_q = c.scatter(local_q)

def long_calc1(n):
    """Stand-in for slow work: pause for under a second, then return ``n + 2``."""
    import random
    import time

    pause = random.random()
    time.sleep(pause)

    shifted_up = n + 2
    return shifted_up

def long_calc2(n):
    """Stand-in for slow work: pause for under a second, then return ``n * 2``."""
    import random
    import time

    pause = random.random()
    time.sleep(pause)

    doubled = n * 2
    return doubled

def long_calc3(n):
    """Stand-in for slow work: pause for under a second, then return ``n - 2``."""
    import random
    import time

    pause = random.random()
    time.sleep(pause)

    shifted_down = n - 2
    return shifted_down


# Chain the three calculations queue-to-queue: the output of each map feeds the
# next one, and the last stage is gathered into result_q.
# NOTE(review): mapping over and gathering queues presumably returns queues that
# fill as work completes (result_q is later used via .qsize()/.get()) -- confirm
# against the installed distributed version.
long_calc1_q = c.map(long_calc1, remote_q)
long_calc2_q = c.map(long_calc2, long_calc1_q)
long_calc3_q = c.map(long_calc3, long_calc2_q)
result_q = c.gather(long_calc3_q)



## The result queue is currently empty...

In [None]:
# Number of results currently waiting in the result queue.
result_q.qsize()

## Start submitting jobs to the queue with a thread

In [None]:
def start_jobs():
    """
    Feed the job numbers 0..499 into ``local_q``, waiting a random sub-second
    interval before each put.
    """
    for job in range(500):
        delay = random.random()
        time.sleep(delay)
        local_q.put(job)


# Submit from a background thread so the notebook stays usable meanwhile.
thread = threading.Thread(target=start_jobs)
thread.start()

## and begin receiving the results...

In [None]:
def get_jobs():
    """
    Print results from ``result_q`` forever, waiting on ``get()`` for each one.

    The loop has no exit condition, so this is only suitable as the target of
    a background thread.
    """
    while True:
        # Single-argument print call: same output under Python 2 and Python 3.
        print(result_q.get())


# Daemon thread: get_jobs never returns, and a non-daemon thread stuck in it
# would keep the interpreter from shutting down.
finish_thread = threading.Thread(target=get_jobs)
finish_thread.daemon = True
finish_thread.start()

In [15]:
# Tear the cluster down once the work is finished.
# NOTE(review): presumably this terminates the EC2 instances launched earlier -- confirm in cclyde.
cluster.terminate_cluster()

