Primero se llama a una función que habilita las funcionalidades de Spark en esta sesión.

In [1]:
import findspark
findspark.init()

A continuación se cargan los datos descargados de la página indicada por el tutorial que se encuentran en formato CSV.
Una vez cargados en memoria, se muestra uno de los registros.

In [2]:
from pyspark import SparkConf, SparkContext
from pyspark.sql import SQLContext
sc= SparkContext()
sqlContext = SQLContext(sc)

house_df = sqlContext.read.format('com.databricks.spark.csv').options(header='true', inferschema='true').load('train.csv')
house_df.take(1)

[Row(ID=1, crim=0.00632, zn=18.0, indus=2.31, chas=0, nox=0.538, rm=6.575, age=65.2, dis=4.09, rad=1, tax=296, ptratio=15.3, black=396.9, lstat=4.98, medv=24.0)]

A continuación se muestran las columnas de cada registro para validar que la estructura coincida con la descrita por el proveedor de los datos.

In [3]:
house_df.cache()
house_df.printSchema()

root
 |-- ID: integer (nullable = true)
 |-- crim: double (nullable = true)
 |-- zn: double (nullable = true)
 |-- indus: double (nullable = true)
 |-- chas: integer (nullable = true)
 |-- nox: double (nullable = true)
 |-- rm: double (nullable = true)
 |-- age: double (nullable = true)
 |-- dis: double (nullable = true)
 |-- rad: integer (nullable = true)
 |-- tax: integer (nullable = true)
 |-- ptratio: double (nullable = true)
 |-- black: double (nullable = true)
 |-- lstat: double (nullable = true)
 |-- medv: double (nullable = true)



Ahora se usa el módulo VectorAssembler para preparar los datos en el formato que requiere el algoritmo, que es una matriz con un vector con los valores de las variables de entrada en una columna, y el listado de los valores de salida en la otra columna.

In [6]:
from pyspark.ml.feature import VectorAssembler

vectorAssembler = VectorAssembler(inputCols = ['ID', 'crim', 'zn', 'indus', 'chas', 'nox', 'rm', 'age', 'dis', 'rad', 'tax', 'ptratio', 'black', 'lstat'], outputCol = 'features')
vhouse_df = vectorAssembler.transform(house_df)
vhouse_df.take(1)

[Row(ID=1, crim=0.00632, zn=18.0, indus=2.31, chas=0, nox=0.538, rm=6.575, age=65.2, dis=4.09, rad=1, tax=296, ptratio=15.3, black=396.9, lstat=4.98, medv=24.0, features=DenseVector([1.0, 0.0063, 18.0, 2.31, 0.0, 0.538, 6.575, 65.2, 4.09, 1.0, 296.0, 15.3, 396.9, 4.98]))]

Con los datos preparados, se muestran algunos registros para visualizar la estructura con la que quedaron.

In [7]:
vhouse_df = vhouse_df.select(['features', 'medv'])
vhouse_df.show(3)

+--------------------+----+
|            features|medv|
+--------------------+----+
|[1.0,0.00632,18.0...|24.0|
|[2.0,0.02731,0.0,...|21.6|
|[4.0,0.03237,0.0,...|33.4|
+--------------------+----+
only showing top 3 rows



En el siguiente paso se divide la colección generada previamente de modo que una parte se usará para el entrenamiento y otra más pequeña para la validación posterior.

In [8]:
splits = vhouse_df.randomSplit([0.7, 0.3])
train_df = splits[0]
test_df = splits[1]

Llega el momento de calcular el modelo, lo cual se hace con la clase LinearRegression. En este caso se realizan 10 iteraciones, y con la función fit se ejecuta el entrenamiento como tal.
Luego se muestran los coeficientes obtenidos.

In [10]:
from pyspark.ml.regression import LinearRegression

lr = LinearRegression(featuresCol = 'features', labelCol='medv', maxIter=10, regParam=0.3, elasticNetParam=0.8)
lr_model = lr.fit(train_df)
print("Coefficients: " + str(lr_model.coefficients))
print("Intercept: " + str(lr_model.intercept))

Coefficients: [0.0,-0.026895761647479616,0.0016809042183590526,-0.02088242972506097,4.027246690305449,-0.8491001863067169,4.546433607112968,-0.007751772613229762,-0.5548430910818342,0.0,0.0,-0.5439745978373777,0.011492398377539742,-0.5340979987762313]
Intercept: 9.873740121004404


Los valores de los coeficientes indican la correlación entre las columnas que representan y los valores de salida, siendo aquellos cercanos a cero los que menos influencia tienen.
Ahora se calcula el error medio, que según el tutorial se debe analizar a la par con los valores mínimo y máximo de la variable de salida y su valor promedio, lo cual se hace en el siguiente bloque.

In [11]:
trainingSummary = lr_model.summary
print("RMSE: %f" % trainingSummary.rootMeanSquaredError)
print("r2: %f" % trainingSummary.r2)

RMSE: 4.688934
r2: 0.706394


In [12]:
train_df.describe().show()

+-------+------------------+
|summary|              medv|
+-------+------------------+
|  count|               231|
|   mean|22.654545454545456|
| stddev| 8.672287880086712|
|    min|               5.6|
|    max|              50.0|
+-------+------------------+



Con el modelo calculado se pueden hacer predicciones usando la colección de datos de prueba, y en este ejemplo se muestran sólo cinco registros.
El valor de R al cuadrado indica el porcentaje de valores que son representados por el modelo obtenido, que si bien es alto, no incluye a la mayoría. Sería óptimo un valor de 90% o más.

In [13]:
lr_predictions = lr_model.transform(test_df)
lr_predictions.select("prediction","medv","features").show(5)

from pyspark.ml.evaluation import RegressionEvaluator
lr_evaluator = RegressionEvaluator(predictionCol="prediction", \
                 labelCol="medv",metricName="r2")
print("R Squared (R2) on test data = %g" % lr_evaluator.evaluate(lr_predictions))

+------------------+----+--------------------+
|        prediction|medv|            features|
+------------------+----+--------------------+
|21.911970389472913|20.4|[14.0,0.62976,0.0...|
|21.278192127157062|18.2|[15.0,0.63796,0.0...|
|22.986537309662076|23.1|[17.0,1.05393,0.0...|
|12.757345633273603|12.7|[31.0,1.13081,0.0...|
|19.666229494534186|14.5|[32.0,1.35472,0.0...|
+------------------+----+--------------------+
only showing top 5 rows

R Squared (R2) on test data = 0.687931


In [14]:
test_result = lr_model.evaluate(test_df)
print("Root Mean Squared Error (RMSE) on test data = %g" % test_result.rootMeanSquaredError)

Root Mean Squared Error (RMSE) on test data = 5.70307


Ahora se muestra un resumen del proceso de cálculo, donde se indican las iteraciones realizadas, y los residuos calculados para cada punto de los datos de entrenamiento del mejor modelo generado.

In [15]:
print("numIterations: %d" % trainingSummary.totalIterations)
print("objectiveHistory: %s" % str(trainingSummary.objectiveHistory))
trainingSummary.residuals.show()

numIterations: 11
objectiveHistory: [0.5000000000000009, 0.43559638992295074, 0.2446733951536692, 0.2189648178145387, 0.18996521723876508, 0.18668164521224045, 0.18544547818184298, 0.18500211685254375, 0.1847540013515694, 0.18469144293499798, 0.1846448822958096]
+-------------------+
|          residuals|
+-------------------+
| -6.095562787911838|
|-3.5491424123211885|
|  3.071528591027473|
|  6.510617882814174|
|0.24606985442336793|
| -4.339900174348621|
|-2.8098134644748285|
| 1.1411217866334837|
|-1.5193145345208805|
| 2.9053671197045503|
|0.25329291185277825|
| 0.4730843494881469|
|-2.1601470251526074|
|-0.5942806209686768|
|-1.6225845145054798|
| -1.148034260831734|
|  1.957127425808526|
| 2.9768314407289154|
|-0.4324113729466639|
|-2.1563863634632945|
+-------------------+
only showing top 20 rows

