# Objetivo

Presentar código que sirva como "template" o punto de partida para diseñar soluciones no triviales para aprendizaje basado en grandes escalas de datos (tanto en $n$ como en $p$).

Mostramos:
* Cómo aprender (y validar) progresivamente de un flujo masivo y continuo de datos (hasta billones de observaciones diarias) aprovechando la interfaz `partial_fit` de los estimadores SGD.
* Cómo crear interacciones ricas de variables categóricas y numéricas en estructuras muy "sparse" (potencialmente con millones de columnas).
* Cómo combatir la alta dimensionalidad resultante mediante el "hashing trick" y la regularización.
* Cómo despachar trabajos entre procesos y coordinarlos mediante archivos y estructuras elementales de IPC (inter-process communication) para explotar el paralelismo inherente a la selección de modelos.
* Cómo administrar eficientemente la memoria evitando cargar todos los datos y/o estimadores al mismo tiempo.

Notar que `GridSearchCV` no funciona con `partial_fit` ni validación progresiva, por lo que vamos a realizar una parte del trabajo a mano. Pero no será de gusto nada más, porque es importante tener algún contacto con las funcionalidades de multiprocesamiento y multithreading básicas, ya que en la práctica suelen presentarse situaciones que no son "de manual".

# Inicialización

In [1]:
import shelve
import operator

import multiprocessing as mp
import numpy as np
import pandas as pd

from collections import Counter
from functools import reduce
from itertools import count, product as iproduct

from sklearn.pipeline import Pipeline
from sklearn.preprocessing import FunctionTransformer
from sklearn.feature_extraction import FeatureHasher
from sklearn.linear_model import SGDClassifier
from sklearn.metrics import log_loss, mean_squared_error
from sklearn.base import BaseEstimator, ClassifierMixin

# Configuración

* `in_paths` es la lista de archivos de entrada.

* `out_path` es el shelve donde se guardan los scores y pipelines resultantes del fit.

* `gen_path` y `gen_size` indican el archivo y el tamaño de los datos de prueba simulados por el generador de números aleatorios.

* `n_jobs` es el número de procesos a correr en paralelo.

* `use_squared_loss` debe ser `True` para usar score $-rss$ o `False` para $-logloss$.

* `batch_size` es el tamaño de cada minibatch.

* `n_batches` es la cantidad de minibatches que se leen al mismo tiempo desde el archivo de entrada.

* `drop_scores` es el porcentaje de scores de minibatches iniciales a descartar. Con el resto se calcula el score medio.

* `grid` es una lista de diccionarios cada uno de los cuales tiene tres entradas (`"extractor"`, `"vectorizer"`, `"classifier"`) cuyos valores son, a su vez, diccionarios indicando parámetros para la etapa correspondiente del pipeline.

In [2]:
in_paths = ['data.csv']

out_path = 'models.db'

gen_path = 'data.csv'

gen_size = 100000

n_jobs = 8

use_squared_loss = True

batch_size = 1000

n_batches = 20

drop_scores = 0.7

grid = [dict(extractor=dict(features=features),
             vectorizer=dict(n_features=n_features, degree=degree),
             classifier=dict(loss='log', max_iter=1000, tol=1e-3,
                             l1_ratio=l1_ratio, alpha=alpha))
        for features in [['x1', 'x2'], ['x1', 'x2', 'x3']]
        for n_features in [2**20, 2**21, 2**22]
        for degree in [1, 2]
        for l1_ratio in [0.15, 0.5, 0.85]
        for alpha in [0.0001, 0.001, 0.01, 0.1]]

# Generación de datos de prueba

La siguiente celda genera datos de prueba en base a un modelo logístico de tres variables, sus cuadrados, sus interacciones y un intercept. Cada variable se distribuye de forma $N(0,1)$. Los 10 coeficientes se mueven de forma escalonada desde 0.9 (el intercept) hasta 0.1 (una de las interacciones) para facilitar la comparación con los coeficientes estimados por el classificador SGD. 

In [3]:
b0, b1, b2, b3, b12, b13, b23, b11, b22, b33 = reversed(
    np.linspace(0.1, 0.9, 10))

x1 = np.random.normal(size=gen_size)
x2 = np.random.normal(size=gen_size)
x3 = np.random.normal(size=gen_size)

b = (b0 + b1 * x1 + b2 * x2 + b3 * x3 +
     b12 * x1 * x2 + b13 * x1 * x3 + b23 * x2 * x3 +
     b11 * x1**2 + b22 * x2**2 + b33 * x3**2)
p = 1 / (1 + np.exp(-b))
y = np.random.binomial(1, p)

pd.DataFrame(dict(x1=x1, x2=x2, x3=x3, y=y)).to_csv(gen_path)


# Código principal

Levanta varios procesos/jobs cada uno de los cuales corre la función `job` iterando sobre una parte de la grilla y fiteando un modelo por cada parametrización.

Para ejecutarlo, correr la función `fit()` desde otra celda.

Para inspeccionar los `n` mejores resultados, correr la función `results(n)` desde otra celda. Ojo que si `n` es grande se van a cargar muchos pipeline a la vez en memoria!

In [4]:
class Extractor(FunctionTransformer):
    """
    Extrae files de un DataFrame como diccionarios con atributos `features`.
    """

    def __init__(self, features):
        self.features = features
        super().__init__(self._extract, validate=False)

    def _extract(self, X):
        return X.loc[:, self.features].to_dict('records')


class Vectorizer(FeatureHasher):
    """
    Vectoriza diccionarios de features teniendo en cuenta interacciones hasta
    grado `degree`.
    
    Las potencias de dummies y las interacciones entre dummies provenientes de
    la misma variable categórica no son incluidas.
    
    El resultado se hashea (hashing trick) en una tabla de tamaño 2**`n_features`.
    No se guarda el mapa de features a hashes, este transformer es "stateless".
    
    Por defecto, cualquier valor que no sea de tipo `float` o `np.float64` se
    considera categórico y se codifica como dummy. Otros tipos numéricos pueden
    indicarse mediante el parámetro `num_types`.
    """

    def __init__(self, degree=2, n_features=2**20,
                 num_types=[float, np.float64]):
        self.degree = degree
        self.num_types = num_types
        super().__init__(n_features=n_features)

    def transform(self, X):
        return super().transform(map(self._encode, X))

    def _encode(self, dic):
        dic = {k if type(v) in self.num_types else f'{k}={v}':
               float(v) if type(v) in self.num_types else 1
               for k, v in dic.items()}
        dic_keys = list(dic.keys())
        for deg in range(2, self.degree + 1):
            for term_keys in iproduct(dic_keys, repeat=deg):
                term_names, term_facts = [], []
                for k, n in Counter(term_keys).items():
                    v = dic[k]
                    if type(v) is int and n > 1:
                        break
                    term_names.append(k if n == 1 else f'{k}^{n}')
                    term_facts.append(v**n)
                else:  # No dummy feature was included more than once
                    dic['*'.join(sorted(term_names))] = product(term_facts)
        return dic

    
class BenchmarkClassifier(BaseEstimator, ClassifierMixin):
    """
    Clasificador "benchmark" con modelo nulo para p: un promedio móvil.
    """
    
    def __init__(self):
        self.sum_ = 0
        self.n_ = 0
    
    def partial_fit(self, X, y):
        self.sum_ += y.sum()
        self.n_ += len(y)
        return self
        
    def predict(self, X):
        return self.predict_proba(X)[:, 1] >= 0.5
    
    def predict_proba(self, X):
        p = np.full(X.shape[0], self.sum_ / self.n_)
        return np.column_stack([1 - p, p]) 


def product(iterable, start=1):
    """
    Devuelve la productoria de los números en `iterable` con base `start`.
    """
    return reduce(operator.mul, iterable, start)


def split(seq, n):
    """
    Divide la secuencia `seq` en partes de tamaño `n` o `n+1`, si es posible.
    """
    m, k = divmod(len(seq), n)
    i = 0
    for _ in range(n):
        j = i + m + (1 if k > 0 else 0)
        yield seq[i:j]
        i = j
        k -= 1


def loss(classifier, X, y):
    """
    Calcula la pérdida apropiada de predecir con `classifier` sobre `X` vs `y`.
    """
    if use_squared_loss:
        return mean_squared_error(y, classifier.predict_proba(X)[:, 1])
    else:
        return log_loss(y, classifier.predict(X))


def job(n_job):
    """
    Crea un pipeline de clasificación para el split `n_job` de la grilla.
    
    Lee de a minibatches los archivos de entrada, alimentando progresivamente
    al pipeline con ellos. Calcula el score medio usando validación progresiva.
    El i-ésimo pipeline se almacena en un shelve bajo la clave f'{n_job}-{i}'.
    Su score se guarda con esa mismo identificador bajo la clave 'scores'.
    Los parámetros que controlan la operación se detallan en la sección
    Configuración arriba.    
    """
    for i, params in enumerate(list(split(grid, n_jobs))[n_job]):
        extractor = Extractor(**params['extractor'])
        vectorizer = Vectorizer(**params['vectorizer'])
        sgd_classifier = SGDClassifier(**params['classifier'])
        sgd_scores = []
        bench_classifier = BenchmarkClassifier()
        bench_scores = []
        for in_path in in_paths:
            for j in count(0, batch_size * n_batches):
                batches = pd.read_csv(in_path,
                                      nrows=batch_size * n_batches,
                                      skiprows=range(1, j + 1))
                if len(batches) == 0:
                    break
                for batch in split(batches, n_batches):
                    X = extractor.transform(batch)
                    X = vectorizer.transform(X)
                    y = batch.loc[:, 'y']
                    if j > 0:
                        sgd_scores.append(-loss(sgd_classifier, X, y))
                        bench_scores.append(-loss(bench_classifier, X, y))
                    sgd_classifier.partial_fit(X, y, classes=[0, 1])
                    bench_classifier.partial_fit(X, y)
            key = f'{n_job}-{i}'
        pipe = Pipeline([('extractor', extractor),
                         ('vectorizer', vectorizer),
                         ('classifier', sgd_classifier)])
        from_scores = int(drop_scores * len(sgd_scores))
        score = np.mean(sgd_scores[from_scores:])
        bench_score = np.mean(bench_scores[from_scores:])
        r2_score = abs((score - bench_score) / bench_score)
        with lock: # Lockea el archivo ya que todos los procesos van a escribir en un  mismo archivo
            with shelve.open(out_path) as out:
                # Guardamos los scores bajo otra clave para no tener que cargar
                # en memoria todos los estimadores al ordenarlos en results()
                out[key] = pipe
                scores = out['scores']
                scores[key] = score, r2_score
                out['scores'] = scores
        print(f'Fitted {key} with score {score, r2_score} and params {params}\n\n')

    
def fit():
    """
    Despacha jobs de aprendizaje a diferentes procesos y espera que finalicen. 
    """
    print(f'Fitting grid of size {len(grid)}\n\n')
    with shelve.open(out_path, 'n') as out:  # Recrea la base
        out['scores'] = {}
    if n_jobs > 1:
        procs = []
        for n_job in range(n_jobs):
            proc = mp.Process(target=job, args=[n_job])
            proc.start()
            procs.append(proc)
        for proc in procs:
            proc.join() #La funcion join se encarga de que los procesos terminen
    else:
        fit(0)


def results(n=5):
    """
    Lee y presenta los `n` mejores resultados del fit almacenados en el shelve de salida.
    """
    with lock:
        with shelve.open(out_path) as out:
            rank = sorted(((s, k) for k, s in out['scores'].items()),
                          reverse=True)
            return [(s, out[k]) for s, k in rank[:n]]


# Usamos este lock para garantizar que solo un proceso lee/escribe a la vez en el shelve
lock = mp.Lock()

# Fit y análisis de resultados

In [5]:
# Recordar que esta función queda corriendo en background

fit()

Fitting grid of size 144


Fitted 0-0 with score (-0.19201208885367613, 0.023538565665445457) and params {'extractor': {'features': ['x1', 'x2']}, 'vectorizer': {'n_features': 1048576, 'degree': 1}, 'classifier': {'loss': 'log', 'max_iter': 1000, 'tol': 0.001, 'l1_ratio': 0.15, 'alpha': 0.0001}}


Fitted 4-0 with score (-0.1907473375626859, 0.02997035267974086) and params {'extractor': {'features': ['x1', 'x2', 'x3']}, 'vectorizer': {'n_features': 1048576, 'degree': 1}, 'classifier': {'loss': 'log', 'max_iter': 1000, 'tol': 0.001, 'l1_ratio': 0.15, 'alpha': 0.0001}}


Fitted 3-0 with score (-0.18717844825509342, 0.048119641055686155) and params {'extractor': {'features': ['x1', 'x2']}, 'vectorizer': {'n_features': 4194304, 'degree': 1}, 'classifier': {'loss': 'log', 'max_iter': 1000, 'tol': 0.001, 'l1_ratio': 0.5, 'alpha': 0.01}}


Fitted 7-0 with score (-0.18372455697053566, 0.0656841165941247) and params {'extractor': {'features': ['x1', 'x2', 'x3']}, 'vectorizer': {'n_features': 419


Fitted 4-7 with score (-0.2161587457900901, 0.0992572406157057) and params {'extractor': {'features': ['x1', 'x2', 'x3']}, 'vectorizer': {'n_features': 1048576, 'degree': 1}, 'classifier': {'loss': 'log', 'max_iter': 1000, 'tol': 0.001, 'l1_ratio': 0.5, 'alpha': 0.1}}


Fitted 3-5 with score (-0.2197339788564591, 0.1174387896468576) and params {'extractor': {'features': ['x1', 'x2']}, 'vectorizer': {'n_features': 4194304, 'degree': 1}, 'classifier': {'loss': 'log', 'max_iter': 1000, 'tol': 0.001, 'l1_ratio': 0.85, 'alpha': 0.1}}


Fitted 1-2 with score (-0.19549610124948846, 0.005820911732485032) and params {'extractor': {'features': ['x1', 'x2']}, 'vectorizer': {'n_features': 1048576, 'degree': 2}, 'classifier': {'loss': 'log', 'max_iter': 1000, 'tol': 0.001, 'l1_ratio': 0.85, 'alpha': 0.0001}}


Fitted 0-8 with score (-0.19096272861649197, 0.028874999472565492) and params {'extractor': {'features': ['x1', 'x2']}, 'vectorizer': {'n_features': 1048576, 'degree': 1}, 'classifier': {'lo


Fitted 1-7 with score (-0.1871797487717107, 0.048113027387093346) and params {'extractor': {'features': ['x1', 'x2']}, 'vectorizer': {'n_features': 2097152, 'degree': 1}, 'classifier': {'loss': 'log', 'max_iter': 1000, 'tol': 0.001, 'l1_ratio': 0.15, 'alpha': 0.001}}


Fitted 7-7 with score (-0.17377374280395919, 0.11628815060000673) and params {'extractor': {'features': ['x1', 'x2', 'x3']}, 'vectorizer': {'n_features': 4194304, 'degree': 2}, 'classifier': {'loss': 'log', 'max_iter': 1000, 'tol': 0.001, 'l1_ratio': 0.15, 'alpha': 0.001}}


Fitted 5-3 with score (-0.1743235218359699, 0.11349229526939077) and params {'extractor': {'features': ['x1', 'x2', 'x3']}, 'vectorizer': {'n_features': 1048576, 'degree': 2}, 'classifier': {'loss': 'log', 'max_iter': 1000, 'tol': 0.001, 'l1_ratio': 0.85, 'alpha': 0.001}}


Fitted 0-14 with score (-0.1849764729038675, 0.05931760271874535) and params {'extractor': {'features': ['x1', 'x2']}, 'vectorizer': {'n_features': 1048576, 'degree': 2}, 'classi


Fitted 7-9 with score (-0.18770886773174755, 0.045422237126252436) and params {'extractor': {'features': ['x1', 'x2', 'x3']}, 'vectorizer': {'n_features': 4194304, 'degree': 2}, 'classifier': {'loss': 'log', 'max_iter': 1000, 'tol': 0.001, 'l1_ratio': 0.15, 'alpha': 0.1}}


Fitted 3-12 with score (-0.18454724651003807, 0.061500397680641444) and params {'extractor': {'features': ['x1', 'x2']}, 'vectorizer': {'n_features': 4194304, 'degree': 2}, 'classifier': {'loss': 'log', 'max_iter': 1000, 'tol': 0.001, 'l1_ratio': 0.5, 'alpha': 0.01}}


Fitted 5-6 with score (-0.19147522517950813, 0.026268741022892308) and params {'extractor': {'features': ['x1', 'x2', 'x3']}, 'vectorizer': {'n_features': 2097152, 'degree': 1}, 'classifier': {'loss': 'log', 'max_iter': 1000, 'tol': 0.001, 'l1_ratio': 0.15, 'alpha': 0.0001}}


Fitted 6-5 with score (-0.17397333950223295, 0.11527311826819679) and params {'extractor': {'features': ['x1', 'x2', 'x3']}, 'vectorizer': {'n_features': 2097152, 'degree': 2},

Process Process-6:
Traceback (most recent call last):
  File "/home/paulati/miniconda2/envs/dhds/lib/python3.7/multiprocessing/process.py", line 297, in _bootstrap
    self.run()
  File "/home/paulati/miniconda2/envs/dhds/lib/python3.7/multiprocessing/process.py", line 99, in run
    self._target(*self._args, **self._kwargs)
  File "<ipython-input-4-7d05f51b95be>", line 156, in job
    out[key] = pipe
  File "/home/paulati/miniconda2/envs/dhds/lib/python3.7/shelve.py", line 125, in __setitem__
    self.dict[key.encode(self.keyencoding)] = f.getvalue()
  File "/home/paulati/miniconda2/envs/dhds/lib/python3.7/dbm/dumb.py", line 208, in __setitem__
    self._addkey(key, self._addval(val))
  File "/home/paulati/miniconda2/envs/dhds/lib/python3.7/dbm/dumb.py", line 170, in _addval
    f.write(val)
OSError: [Errno 28] No space left on device
Process Process-3:
Traceback (most recent call last):
  File "/home/paulati/miniconda2/envs/dhds/lib/python3.7/multiprocessing/process.py", line 297, in

In [6]:
# Ojo que si n es grande se van a cargar muchos pipeline a la vez en memoria!

results(3)

[((-0.17377374280395919, 0.11628815060000673), Pipeline(memory=None,
           steps=[('extractor', Extractor(features=['x1', 'x2', 'x3'])),
                  ('vectorizer',
                   Vectorizer(degree=2, n_features=4194304,
                              num_types=[<class 'float'>,
                                         <class 'numpy.float64'>])),
                  ('classifier',
                   SGDClassifier(alpha=0.001, average=False, class_weight=None,
                                 early_stopping=False, epsilon=0.1, eta0=0.0,
                                 fit_intercept=True, l1_ratio=0.15,
                                 learning_rate='optimal', loss='log',
                                 max_iter=1000, n_iter_no_change=5, n_jobs=None,
                                 penalty='l2', power_t=0.5, random_state=None,
                                 shuffle=True, tol=0.001, validation_fraction=0.1,
                                 verbose=0, warm_start=False))],
 

In [7]:
best = results(1)[0][1].named_steps['classifier']
idx = list(best.coef_[0].nonzero()[0])
coefs = list(best.coef_[0][idx]) + [best.intercept_[0]]
idx = idx + [None]
list(sorted(zip(coefs, idx), key=lambda coef_idx: abs(coef_idx[0])))

[(-0.05135134906205183, 1895073),
 (-0.17522086774407616, 3674509),
 (0.22589380820274818, 3540263),
 (0.35957813028630814, 610204),
 (-0.443522956457671, 2084643),
 (0.46683769893610566, 1019690),
 (0.6712174093200043, 969760),
 (-0.706163176665084, 477344),
 (-0.7528122945467871, 3270775),
 (0.8392415570481887, None)]