In [None]:
!pip install pyspark

_**Documentación:** https://spark.apache.org/docs/latest/ml-features.html_

In [1]:
from pyspark.sql import SparkSession

# Start (or reuse, via getOrCreate) the Spark session used by every later cell.
spark = (
    SparkSession.builder
    .appName("LinearRegression")
    .getOrCreate()
)

### Transformaciones

In [2]:
# Load the toy customers CSV: the first row holds the header and column types are inferred.
df = (
    spark.read
    .options(header=True, inferSchema=True)
    .csv("../data/fake_customers.csv")
)

df.show()

+-------+----------+-----+
|   Name|     Phone|Group|
+-------+----------+-----+
|   John|4085552424|    A|
|   Mike|3105552738|    B|
| Cassie|4085552424|    B|
|  Laura|3105552438|    B|
|  Sarah|4085551234|    A|
|  David|3105557463|    C|
|   Zach|4085553987|    C|
|  Kiera|3105552938|    A|
|  Alexa|4085559467|    C|
|Karissa|3105553475|    A|
+-------+----------+-----+



In [4]:
# Number of rows per category in "Group".
group_counts = df.groupBy("Group").count()
group_counts.show()

+-----+-----+
|Group|count|
+-----+-----+
|    B|    3|
|    C|    3|
|    A|    4|
+-----+-----+



### StringIndexer

Para transformar los valores categóricos en índices numéricos.

In [5]:
from pyspark.ml.feature import StringIndexer

# Map each "Group" label to a numeric index. With the default ordering, the most
# frequent label gets 0.0 (here "A", which appears 4 times).
indexer1 = StringIndexer(inputCol="Group", outputCol="GroupIndex")
group_index_model = indexer1.fit(df)
indexed1 = group_index_model.transform(df)

indexed1.show()

+-------+----------+-----+----------+
|   Name|     Phone|Group|GroupIndex|
+-------+----------+-----+----------+
|   John|4085552424|    A|       0.0|
|   Mike|3105552738|    B|       1.0|
| Cassie|4085552424|    B|       1.0|
|  Laura|3105552438|    B|       1.0|
|  Sarah|4085551234|    A|       0.0|
|  David|3105557463|    C|       2.0|
|   Zach|4085553987|    C|       2.0|
|  Kiera|3105552938|    A|       0.0|
|  Alexa|4085559467|    C|       2.0|
|Karissa|3105553475|    A|       0.0|
+-------+----------+-----+----------+



In [6]:
# Same indexing for "Name". Every name occurs once, so the frequencies tie and
# the indices come out in alphabetical order (as the output below shows).
# NOTE: indexing a unique identifier like a name carries no predictive signal;
# this is done only to illustrate the transformer.
indexer2 = StringIndexer(inputCol="Name", outputCol="NameIndex")
name_index_model = indexer2.fit(indexed1)
indexed2 = name_index_model.transform(indexed1)

indexed2.show()

+-------+----------+-----+----------+---------+
|   Name|     Phone|Group|GroupIndex|NameIndex|
+-------+----------+-----+----------+---------+
|   John|4085552424|    A|       0.0|      3.0|
|   Mike|3105552738|    B|       1.0|      7.0|
| Cassie|4085552424|    B|       1.0|      1.0|
|  Laura|3105552438|    B|       1.0|      6.0|
|  Sarah|4085551234|    A|       0.0|      8.0|
|  David|3105557463|    C|       2.0|      2.0|
|   Zach|4085553987|    C|       2.0|      9.0|
|  Kiera|3105552938|    A|       0.0|      5.0|
|  Alexa|4085559467|    C|       2.0|      0.0|
|Karissa|3105553475|    A|       0.0|      4.0|
+-------+----------+-----+----------+---------+



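### VectorAssembler
Para combinar varias columnas numéricas en una única columna de tipo vector (`features`). Esto es obligatorio para los modelos de ML en PySpark, que esperan todas las variables de entrada en una sola columna vectorial.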

In [7]:
from pyspark.ml.linalg import Vectors
from pyspark.ml.feature import VectorAssembler

# Concatenate the numeric input columns into a single vector column "features",
# which is the input format PySpark ML estimators expect.
feature_cols = ["Phone", "GroupIndex", "NameIndex"]
assembler = VectorAssembler(inputCols=feature_cols, outputCol="features")

output = assembler.transform(indexed2)

output.select(feature_cols + ["features"]).show()

+----------+----------+---------+--------------------+
|     Phone|GroupIndex|NameIndex|            features|
+----------+----------+---------+--------------------+
|4085552424|       0.0|      3.0|[4.085552424E9,0....|
|3105552738|       1.0|      7.0|[3.105552738E9,1....|
|4085552424|       1.0|      1.0|[4.085552424E9,1....|
|3105552438|       1.0|      6.0|[3.105552438E9,1....|
|4085551234|       0.0|      8.0|[4.085551234E9,0....|
|3105557463|       2.0|      2.0|[3.105557463E9,2....|
|4085553987|       2.0|      9.0|[4.085553987E9,2....|
|3105552938|       0.0|      5.0|[3.105552938E9,0....|
|4085559467|       2.0|      0.0|[4.085559467E9,2....|
|3105553475|       0.0|      4.0|[3.105553475E9,0....|
+----------+----------+---------+--------------------+



In [8]:
# Small DataFrame (10 rows), so converting to pandas for a rich display is safe.
output_pdf = output.toPandas()
output_pdf

Unnamed: 0,Name,Phone,Group,GroupIndex,NameIndex,features
0,John,4085552424,A,0.0,3.0,"[4085552424.0, 0.0, 3.0]"
1,Mike,3105552738,B,1.0,7.0,"[3105552738.0, 1.0, 7.0]"
2,Cassie,4085552424,B,1.0,1.0,"[4085552424.0, 1.0, 1.0]"
3,Laura,3105552438,B,1.0,6.0,"[3105552438.0, 1.0, 6.0]"
4,Sarah,4085551234,A,0.0,8.0,"[4085551234.0, 0.0, 8.0]"
5,David,3105557463,C,2.0,2.0,"[3105557463.0, 2.0, 2.0]"
6,Zach,4085553987,C,2.0,9.0,"[4085553987.0, 2.0, 9.0]"
7,Kiera,3105552938,A,0.0,5.0,"[3105552938.0, 0.0, 5.0]"
8,Alexa,4085559467,C,2.0,0.0,"[4085559467.0, 2.0, 0.0]"
9,Karissa,3105553475,A,0.0,4.0,"[3105553475.0, 0.0, 4.0]"


## LinearRegression

In [9]:
from pyspark.ml.regression import LinearRegression

In [10]:
# The libsvm reader yields exactly the (label, features) layout used by Spark ML.
data = spark.read.load("../data/sample_linear_regression_data.txt", format="libsvm")

data.show()

+-------------------+--------------------+
|              label|            features|
+-------------------+--------------------+
| -9.490009878824548|(10,[0,1,2,3,4,5,...|
| 0.2577820163584905|(10,[0,1,2,3,4,5,...|
| -4.438869807456516|(10,[0,1,2,3,4,5,...|
|-19.782762789614537|(10,[0,1,2,3,4,5,...|
| -7.966593841555266|(10,[0,1,2,3,4,5,...|
| -7.896274316726144|(10,[0,1,2,3,4,5,...|
| -8.464803554195287|(10,[0,1,2,3,4,5,...|
| 2.1214592666251364|(10,[0,1,2,3,4,5,...|
| 1.0720117616524107|(10,[0,1,2,3,4,5,...|
|-13.772441561702871|(10,[0,1,2,3,4,5,...|
| -5.082010756207233|(10,[0,1,2,3,4,5,...|
|  7.887786536531237|(10,[0,1,2,3,4,5,...|
| 14.323146365332388|(10,[0,1,2,3,4,5,...|
|-20.057482615789212|(10,[0,1,2,3,4,5,...|
|-0.8995693247765151|(10,[0,1,2,3,4,5,...|
| -19.16829262296376|(10,[0,1,2,3,4,5,...|
|  5.601801561245534|(10,[0,1,2,3,4,5,...|
|-3.2256352187273354|(10,[0,1,2,3,4,5,...|
| 1.5299675726687754|(10,[0,1,2,3,4,5,...|
| -0.250102447941961|(10,[0,1,2,3,4,5,...|
+----------

Este es el formato que Spark usa para hacer ML: dos columnas, una llamada "label" y otra llamada "features", que equivalen a "y" y "X" respectivamente.

La columna "label" o "y" debe ser una columna numérica, igual que cuando hacíamos modelos con scikit-learn.

La columna "features" o "X" está formada por vectores; cada vector agrupa los valores de las columnas de entrada originales de esa fila.

In [11]:
# Build the estimator.
# By convention X is called "features", y is "label" and the output is "prediction";
# these are also the estimator's default column names, so they are spelled out here
# only for clarity. If the DataFrame uses other names, pass them instead.
lr = LinearRegression(
    featuresCol="features",
    labelCol="label",
    predictionCol="prediction",
)

# Fit on the full dataset.
model = lr.fit(data)

In [12]:
# Show the fitted coefficients (one per feature) and the intercept.
print(f"Coeficientes: {model.coefficients}")
print("\n")
print(f"Intercepcion:{model.intercept}")

Coeficientes: [0.0073350710225801715,0.8313757584337543,-0.8095307954684084,2.441191686884721,0.5191713795290003,1.1534591903547016,-0.2989124112808717,-0.5128514186201779,-0.619712827067017,0.6956151804322931]


Intercepcion:0.14228558260358093


### Summary

In [21]:
# `summary` is a property (not a method) of the fitted model: it holds the
# training summary, i.e. metrics computed on the same dataset passed to `fit`,
# so these are in-sample metrics, not an estimate of performance on new data.
sumario = model.summary

In [14]:
# Residuals: difference between the actual label and the prediction for each training row.
sumario.residuals.show()

for metric_name, metric_value in (("RMSE", sumario.rootMeanSquaredError), ("r2", sumario.r2)):
    print(f"{metric_name}: {metric_value}")

+-------------------+
|          residuals|
+-------------------+
|-11.011130022096554|
| 0.9236590911176538|
|-4.5957401897776675|
|  -20.4201774575836|
|-10.339160314788181|
|-5.9552091439610555|
|-10.726906349283922|
|  2.122807193191233|
|  4.077122222293811|
|-17.316168071241652|
| -4.593044343959059|
|  6.380476690746936|
| 11.320566035059846|
|-20.721971774534094|
| -2.736692773777401|
| -16.66886934252847|
|  8.242186378876315|
|-1.3723486332690233|
|-0.7060332131264666|
|-1.1591135969994064|
+-------------------+
only showing top 20 rows

RMSE: 10.16309157133015
r2: 0.027839179518600154


### Train/Test Split

PySpark no tiene una función **train_test_split** como scikit-learn, pero los DataFrames de Spark tienen el método **randomSplit**, que hace lo mismo.

In [30]:
# Peek at the first rows before splitting.
data.show(n=3)

+------------------+--------------------+
|             label|            features|
+------------------+--------------------+
|-9.490009878824548|(10,[0,1,2,3,4,5,...|
|0.2577820163584905|(10,[0,1,2,3,4,5,...|
|-4.438869807456516|(10,[0,1,2,3,4,5,...|
+------------------+--------------------+
only showing top 3 rows



In [31]:
# DataFrame.randomSplit plays the role of scikit-learn's train_test_split.
# The fixed seed makes the split repeatable (as long as the data and its partitioning don't change).
TRAIN_FRACTION, TEST_FRACTION = 0.7, 0.3
train, test = data.randomSplit([TRAIN_FRACTION, TEST_FRACTION], seed=42)

In [32]:
# First rows of the training split, then of the test split.
for split_df in (train, test):
    split_df.show(3)

+-------------------+--------------------+
|              label|            features|
+-------------------+--------------------+
|-28.571478869743427|(10,[0,1,2,3,4,5,...|
|-28.046018037776633|(10,[0,1,2,3,4,5,...|
|-26.736207182601724|(10,[0,1,2,3,4,5,...|
+-------------------+--------------------+
only showing top 3 rows

+-------------------+--------------------+
|              label|            features|
+-------------------+--------------------+
|-26.805483428483072|(10,[0,1,2,3,4,5,...|
|-22.949825936196074|(10,[0,1,2,3,4,5,...|
|-21.432387764165806|(10,[0,1,2,3,4,5,...|
+-------------------+--------------------+
only showing top 3 rows



In [46]:
import pandas as pd

In [51]:
# Inspect the feature vector of the first training row.
# `first()` fetches a single row, whereas `collect()[0]` would pull the whole
# split to the driver just to read one element.
train.select("features").first()[0]

SparseVector(10, {0: -0.4597, 1: -0.5489, 2: 0.3342, 3: -0.1599, 4: -0.731, 5: 0.1824, 6: -0.4839, 7: 0.0814, 8: -0.8401, 9: -0.8896})

In [63]:
# Refit on the training split only. This replaces the earlier `model`, which was fit on the full data.
model = lr.fit(dataset=train)

In [65]:
# In-sample R² on the training split.
train_summary = model.summary
train_summary.r2

0.07273225877410616

In [66]:
# evaluate() returns a summary object for the test set (metrics, residuals and a
# `predictions` DataFrame), not a column of predictions, despite the `y_hat` name.
y_hat = model.evaluate(dataset=test)

In [67]:
# Out-of-sample R². A negative value means the model does worse on the test set
# than always predicting the mean label.
test_r2 = y_hat.r2
test_r2

-0.14679155085585793

In [68]:
# Test rows with their actual label and the model's prediction, shown as a pandas table.
predictions_pdf = y_hat.predictions.toPandas()
predictions_pdf

Unnamed: 0,label,features,prediction
0,-26.805483,"(0.4572552704218824, -0.576096954000229, -0.20...",1.500419
1,-22.949826,"(0.4797855980916854, 0.01997502546020402, -0.8...",6.540722
2,-21.432388,"(-0.4785033857256795, 0.520350718059089, -0.29...",1.436978
3,-20.212077,"(0.5609065808412279, -0.9201904391147984, 0.90...",1.315605
4,-19.782763,"(-0.0388509668871313, -0.4166870051763918, 0.8...",-0.095102
...,...,...,...
121,17.403292,"(0.9155980216177384, -0.35593866074295355, 0.4...",-2.867510
122,18.479681,"(0.9635517137863321, 0.9954507816218203, 0.119...",2.136439
123,19.341343,"(-0.32052868280788616, 0.954507993011956, 0.38...",2.736410
124,20.456948,"(-0.21923785332346513, 0.11340668617783778, 0....",-2.750814


In [36]:
# Residuals and RMSE on the test set.
y_hat.residuals.show()
rmse_test = y_hat.rootMeanSquaredError
print(f"RMSE: {rmse_test}")

+-------------------+
|          residuals|
+-------------------+
|-28.305902730922302|
|-29.490547492772325|
| -22.86936529151847|
|-21.527682553818114|
|-19.687660427789638|
| -19.79380269286442|
|-18.994876037916928|
|-16.420875732937652|
|  -20.1251816195632|
|-19.488242333300025|
|-16.690287207468383|
|-17.732540358670345|
| -15.17297252570881|
|-13.517777209767612|
|-20.504549034361794|
| -17.52072428950006|
| -15.06463157411349|
|-18.728361362879223|
|-15.995976443402697|
| -15.16368146542394|
+-------------------+
only showing top 20 rows

RMSE: 11.929738804585622


Hasta aquí llegaría el modelo; ahora vamos a ver cómo hacer predicciones sobre datos nuevos.

Vamos a trabajar solo con la columna "features" de test: así nos quedamos únicamente con "X", como si fueran datos nuevos de los que no conocemos el valor de "y".

In [37]:
# Test-set R² again (still the evaluate() summary at this point in the notebook).
r2_on_test = y_hat.r2
r2_on_test

-0.14679155085585793

In [69]:
# Keep only X so the test rows behave like new data whose label is unknown.
nueva_data = test.select(test.features)
nueva_data.show(n=3)

+--------------------+
|            features|
+--------------------+
|(10,[0,1,2,3,4,5,...|
|(10,[0,1,2,3,4,5,...|
|(10,[0,1,2,3,4,5,...|
+--------------------+
only showing top 3 rows



In [70]:
# Use transform() instead of evaluate(). It needs only the feature column and returns
# a new DataFrame with an added "prediction" column. Note that `y_hat` now holds
# that DataFrame rather than the earlier summary object.
y_hat = model.transform(dataset=nueva_data)
y_hat.show()

+--------------------+--------------------+
|            features|          prediction|
+--------------------+--------------------+
|(10,[0,1,2,3,4,5,...|   1.500419302439231|
|(10,[0,1,2,3,4,5,...|   6.540721556576252|
|(10,[0,1,2,3,4,5,...|  1.4369775273526635|
|(10,[0,1,2,3,4,5,...|  1.3156052948594428|
|(10,[0,1,2,3,4,5,...|-0.09510236182489817|
|(10,[0,1,2,3,4,5,...| 0.12648407749270263|
|(10,[0,1,2,3,4,5,...|-0.40745999229762575|
|(10,[0,1,2,3,4,5,...| -1.3827504557268635|
|(10,[0,1,2,3,4,5,...|  2.6965070486236957|
|(10,[0,1,2,3,4,5,...|    2.42284270742401|
|(10,[0,1,2,3,4,5,...|-0.33620505674116263|
|(10,[0,1,2,3,4,5,...|  1.5811910073932323|
|(10,[0,1,2,3,4,5,...| -0.9126865153126812|
|(10,[0,1,2,3,4,5,...| -2.4337353560269603|
|(10,[0,1,2,3,4,5,...|  4.7238640017384945|
|(10,[0,1,2,3,4,5,...|  1.7972086764514907|
|(10,[0,1,2,3,4,5,...| -0.3727532193177282|
|(10,[0,1,2,3,4,5,...|   3.393593882956883|
|(10,[0,1,2,3,4,5,...|   1.173823533651508|
|(10,[0,1,2,3,4,5,...| 0.4009232

### Guardar el modelo

In [None]:
# Persist the fitted model. A plain `save()` raises if "modelo.model" already
# exists, so re-running the notebook would fail; `write().overwrite()` replaces
# any previous copy instead.
model.write().overwrite().save("modelo.model")

In [None]:
################################################################################################################################