In [1]:

import pyspark

from pyspark.sql.session import SparkSession
spark = SparkSession.builder.appName("spark_airport").getOrCreate()

In [45]:
from pyspark.ml import Pipeline
from pyspark.ml.evaluation import RegressionEvaluator
from pyspark.ml.feature import VectorAssembler, VectorIndexer
from pyspark.ml.regression import GBTRegressionModel, GBTRegressor

from pyspark.ml.feature import VectorAssembler


In [46]:
midata = spark.read.csv('./datasets/train.csv', header = 'true', inferSchema = 'true', sep = ','  )   

In [47]:
midata.printSchema()

root
 |-- Id: integer (nullable = true)
 |-- MSSubClass: integer (nullable = true)
 |-- MSZoning: string (nullable = true)
 |-- LotFrontage: string (nullable = true)
 |-- LotArea: integer (nullable = true)
 |-- Street: string (nullable = true)
 |-- Alley: string (nullable = true)
 |-- LotShape: string (nullable = true)
 |-- LandContour: string (nullable = true)
 |-- Utilities: string (nullable = true)
 |-- LotConfig: string (nullable = true)
 |-- LandSlope: string (nullable = true)
 |-- Neighborhood: string (nullable = true)
 |-- Condition1: string (nullable = true)
 |-- Condition2: string (nullable = true)
 |-- BldgType: string (nullable = true)
 |-- HouseStyle: string (nullable = true)
 |-- OverallQual: integer (nullable = true)
 |-- OverallCond: integer (nullable = true)
 |-- YearBuilt: integer (nullable = true)
 |-- YearRemodAdd: integer (nullable = true)
 |-- RoofStyle: string (nullable = true)
 |-- RoofMatl: string (nullable = true)
 |-- Exterior1st: string (nullable = true)
 |--

Hacemos un cast de nuestras variables a Double

In [5]:
import pyspark.ml.feature as ft
import pyspark.sql.types as typ
midata = midata.withColumn('YearBuilt', midata['YearBuilt'].cast(typ.DoubleType()))
midata = midata.withColumn('SalePrice', midata['SalePrice'].cast(typ.DoubleType()))
midata = midata.withColumn('TotalBsmtSF', midata['TotalBsmtSF'].cast(typ.DoubleType()))
midata = midata.withColumn('OverallQual', midata['OverallQual'].cast(typ.DoubleType()))
midata = midata.withColumn('1stFlrSF', midata['1stFlrSF'].cast(typ.DoubleType()))
midata = midata.withColumn('GrLivArea', midata['GrLivArea'].cast(typ.DoubleType()))
midata = midata.withColumn('GarageArea', midata['GarageArea'].cast(typ.DoubleType()))

In [7]:
from pyspark.ml.feature import VectorAssembler, VectorIndexer  

In [58]:
featuresCreator = VectorAssembler(inputCols=["YearBuilt","TotalBsmtSF","OverallQual","1stFlrSF","GrLivArea","GarageArea", ],outputCol='features')

In [59]:
train_data,test_data=midata.randomSplit([0.80,0.20])

In [60]:
gtb =GBTRegressor(labelCol='SalePrice' ,featuresCol='features', maxIter=100  )

In [61]:
pipeline = Pipeline(stages=[featuresCreator, gtb])

In [62]:
model = pipeline.fit(train_data)

In [63]:
predictions = model.transform(train_data)

In [64]:
predictions.select("prediction", "SalePrice", "features").show()

+------------------+---------+--------------------+
|        prediction|SalePrice|            features|
+------------------+---------+--------------------+
|  197408.686992863|   208500|[2003.0,856.0,7.0...|
|171861.22617900826|   181500|[1976.0,1262.0,6....|
|146539.94096394093|   140000|[1915.0,756.0,7.0...|
|146796.83186831567|   143000|[1993.0,796.0,5.0...|
| 285070.1061040049|   307000|[2004.0,1686.0,8....|
|  138023.381599761|   129500|[1965.0,1040.0,5....|
|342061.98114381643|   345000|[2005.0,1175.0,9....|
|118637.99273480011|   144000|[1962.0,912.0,5.0...|
|259204.59471091512|   279500|[2006.0,1494.0,7....|
|155368.35766589106|   157000|[1960.0,1253.0,6....|
|146545.36808150297|   149000|[1970.0,1004.0,6....|
| 157172.9703201445|   159000|[2004.0,1114.0,5....|
| 145824.9058573306|   139000|[1958.0,1029.0,5....|
|243274.72330742356|   230000|[2002.0,1777.0,8....|
|143148.40227495783|   129900|[1976.0,1040.0,5....|
|143260.92887491133|   154000|[1968.0,1060.0,5....|
| 256504.294

In [65]:
evaluator = RegressionEvaluator(labelCol='SalePrice', predictionCol='prediction',metricName = 'rmse')

Error cuadratico medio mide el promedio de los errores al cuadrado, es decir, la diferencia entre el estimador y lo que se estima

In [66]:
rmse = evaluator.evaluate(predictions, {evaluator.metricName: "rmse"})
print("MSE: %.3f" % rmse)

MSE: 9808.728


El coeficiente de determinacion determina la calidad del modelo para Predecir los resultados

In [67]:
r2 = evaluator.evaluate(predictions, {evaluator.metricName: "r2"})
print("r2: %.3f" %r2)

r2: 0.984


In [68]:
import pyspark.ml.regression as rg
from pyspark.ml.regression import GBTRegressor
import pyspark.ml.tuning as tune

In [69]:
boot = rg.GBTRegressor(labelCol = 'SalePrice')
grid = tune.ParamGridBuilder().addGrid(boot.maxDepth, [2, 4, 6]).addGrid(boot.maxBins, [20, 60]).addGrid(boot.maxIter, [10, 20]).build()

In [70]:
evaluatorG = RegressionEvaluator(predictionCol = 'prediction', labelCol = 'SalePrice')

CROSS VALIDATION

In [71]:
gbt1 = tune.CrossValidator(estimator = boot, estimatorParamMaps = grid, evaluator = evaluatorG)

In [72]:
pipeline = Pipeline(stages=[featuresCreator])
data_transformer = pipeline.fit(train_data)

In [73]:
rgModel = gbt1.fit(data_transformer.transform(train_data))

In [78]:
data_train = data_transformer.transform(test_data)
results = rgModel.transform(data_train)

print(evaluator.evaluate(results, {evaluatorG.metricName: 'areaUnderROC'}))
print(evaluator.evaluate(results, {evaluatorG.metricName: 'areaUnderPR'}))

36973.47067243472
36973.47067243472


In [80]:
results = [
    (
        [
            {key.name: paramValue} 
            for key, paramValue 
            in zip(
                params.keys(), 
                params.values())
        ], metric
    ) 
    for params, metric 
    in zip(
        rgModel.getEstimatorParamMaps(), 
        rgModel.avgMetrics
    )
]

sorted(results, key=lambda el: el[1], reverse=True)[0]

([{'maxDepth': 4}, {'maxBins': 60}, {'maxIter': 10}], 42715.66237601244)

EXTRACCIÓN DE CARACTERÍSTICAS - PCA