# Spark MLlib

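Nuestra fuente de datos (de nuevo) es ficticia: puntos $y = \log(x) + \varepsilon$, con $x$ uniforme en $[1, 100)$ y ruido gaussiano $\varepsilon \sim \mathcal{N}(0,\, 0.3^2)$.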

**ACLARACIÓN**: El objetivo es mostrar cómo usar `Spark ML`, no cómo hacer _machine learning_ de manera exitosa.

In [None]:
import pyspark
from pyspark.sql import SQLContext
# Reuse the running SparkContext if this cell is executed again: calling the
# SparkContext constructor a second time raises a ValueError ("Cannot run
# multiple SparkContexts at once"). Master 'local[*]' = all local cores.
sc = pyspark.SparkContext.getOrCreate(pyspark.SparkConf().setMaster('local[*]'))
# SQL entry point on top of `sc`; needed for rdd.toDF() / DataFrames below.
sqlContext = SQLContext(sc)

In [None]:
from pyspark.ml import Pipeline
from pyspark.ml.regression import LinearRegression
from pyspark.ml.regression import LinearRegressionModel
from pyspark.ml.regression import RandomForestRegressionModel
from pyspark.ml.regression import RandomForestRegressor
from pyspark.mllib.regression import LabeledPoint
from pyspark.mllib.linalg import Vector
from pyspark.mllib.linalg import Vectors

In [None]:
import numpy as np

# Synthetic data set: y = log(x) + gaussian noise.
N_PUNTOS = 1000      # number of samples
SIGMA_RUIDO = 0.3    # standard deviation of the noise added to log(x)

# Seeded generator so the generated data is reproducible from run to run.
rng = np.random.RandomState(42)
x = rng.uniform(1, 100, N_PUNTOS)
y = np.log(x) + rng.normal(0, SIGMA_RUIDO, N_PUNTOS)

In [None]:
%pylab inline
import pylab as pl

pl.figure(figsize=(10,10))
pl.scatter(x, y, s=1, label="log(x) con ruido")
pl.plot(np.arange(1, 100), np.log(np.arange(1, 100)), c="b", label="log(x)")
pl.xlabel("x")
pl.ylabel("f(x) = log(x)")
pl.legend(loc="best")
pl.title("Datos observados")

Guardemos los datos en un archivo.

In [None]:
import os

# open() does not create missing directories, so make sure 'data/' exists.
if not os.path.isdir('data'):
    os.makedirs('data')

# One "x y" pair per line ('{:f}' = fixed point, 6 decimals). Iterating with
# zip() keeps the file in sync with the arrays instead of assuming 1000 rows.
with open('data/log_datos.dat', 'w') as o:
    o.writelines('{0:f} {1:f} \n'.format(xi, yi) for xi, yi in zip(x, y))
    

In [None]:
# Show the first 10 lines of the file. Plain Python instead of `!head`, so the
# cell also works where the `head` command is unavailable (e.g. Windows).
import itertools
import sys

with open('data/log_datos.dat') as f:
    for linea in itertools.islice(f, 10):
        sys.stdout.write(linea)

In [None]:
from pyspark.sql import Row

In [None]:
puntos = sc.textFile("data/log_datos.dat")
Data = Row('x', 'y')

def parseDataToDF(linea):
    cells = linea.strip().split(' ')
    cells[0] = float(cells[0])
    cells[1] = float(cells[1])
    return Data(*cells)

def parseDataToFeatures(linea):
    cells = linea.strip().split(' ')
    cells[0] = float(cells[0])
    cells[1] = float(cells[1])
    return LabeledPoint(cells[1], Vectors.dense(cells[0]))

In [None]:
# Parse every line into a Row(x, y) and peek at the first record.
# NOTE(review): puntos_rdd is not used further below in this section.
puntos_rdd = puntos.map(parseDataToDF)
puntos_rdd.first()


In [None]:
# Parse every line into a LabeledPoint (label = y, features = [x]); this is
# the representation the models below are trained on (via df).
puntos_features = puntos.map(parseDataToFeatures)
puntos_features.first()

In [None]:
# LabeledPoint RDD -> DataFrame with `label` and `features` columns, the
# default input column names of the pyspark.ml regressors used below.
df = puntos_features.toDF()

In [None]:
# Elastic-net regularized linear regression: regParam is the overall penalty
# strength and elasticNetParam mixes the L2 (0.0) and L1 (1.0) penalties.
lr = LinearRegression(maxIter=10, regParam=0.3, elasticNetParam=0.8)

In [None]:
# Fit on the whole data set (no train/test split: the same df is transformed below).
lr_model = lr.fit(df)

In [None]:
# Append a `prediction` column to the same data the model was fitted on.
lr_transformed = lr_model.transform(df)

In [None]:
import pandas as pd

In [None]:
# Flatten each (features, label, prediction) row into plain (x, y, yp) columns
# and collect the result into a pandas DataFrame for plotting.
# DataFrame.map() was removed from PySpark in 2.0, so map over the underlying
# RDD explicitly; `.rdd` is available in Spark 1.x as well.
lr_predictions = (
    lr_transformed.rdd
    .map(lambda r: Row(x=float(r.features[0]), y=r.label, yp=r.prediction))
    .toDF()
    .toPandas()
)

In [None]:
# Display the collected table: x, observed y and predicted yp.
lr_predictions

In [None]:
# Linear-regression predictions (red) against the true curve log(x) (blue) and
# the noisy observations. Inline plotting is already enabled by the first
# plotting cell, so the `%pylab` magic is not repeated here.
pl.figure(figsize=(10, 10))
pl.scatter(lr_predictions['x'], lr_predictions['yp'], s=20, c="r", label="LR prediccion")
pl.plot(np.arange(1, 100), np.log(np.arange(1, 100)), c="b", label="log(x)")
pl.scatter(lr_predictions['x'], lr_predictions['y'], s=1, label="log(x) con ruido")
pl.xlabel("x")
pl.ylabel("f(x) = log(x)")
pl.legend(loc="best")
pl.title("Predicciones LR vs. datos observados")
pl.show()

In [None]:
# Very small forest (3 trees, depth <= 4), so the fit is coarse: tree
# ensembles predict a piecewise-constant function of x. Random forests sample
# rows/features randomly; pin the seed explicitly so the fitted forest is
# reproducible across runs and sessions.
rf = RandomForestRegressor(numTrees=3, maxDepth=4, seed=42)
rf_model = rf.fit(df)

In [None]:
# Append a `prediction` column to the same data the forest was fitted on.
rf_transformed = rf_model.transform(df)

In [None]:
# Flatten each (features, label, prediction) row into plain (x, y, yp) columns
# and collect the result into a pandas DataFrame for plotting.
# DataFrame.map() was removed from PySpark in 2.0, so map over the underlying
# RDD explicitly; `.rdd` is available in Spark 1.x as well.
rf_predictions = (
    rf_transformed.rdd
    .map(lambda r: Row(x=float(r.features[0]), y=r.label, yp=r.prediction))
    .toDF()
    .toPandas()
)

In [None]:
# Random-forest predictions (red) against the true curve log(x) (blue) and the
# noisy observations. Inline plotting is already enabled by the first
# plotting cell, so the `%pylab` magic is not repeated here.
pl.figure(figsize=(10, 10))
pl.scatter(rf_predictions['x'], rf_predictions['yp'], s=20, c="r", label="RF prediccion")
pl.plot(np.arange(1, 100), np.log(np.arange(1, 100)), c="b", label="log(x)")
pl.scatter(rf_predictions['x'], rf_predictions['y'], s=1, label="log(x) con ruido")
pl.xlabel("x")
pl.ylabel("f(x) = log(x)")
pl.legend(loc="best")
pl.title("Predicciones RF vs. datos observados")
pl.show()
