Hello.

# Task
The code below trains and evaluates a model.

Convert it into a Pipeline (you will need VectorAssembler), then compute the MAE with Spark. After that, try tuning the model's parameters.


In [129]:
# https://scikit-learn.org/stable/datasets/toy_dataset.html#boston-dataset

from pyspark.ml.evaluation import RegressionEvaluator, BinaryClassificationEvaluator

In [130]:
import pandas as pd
from sklearn.datasets import load_diabetes, load_iris, load_boston
from sklearn.metrics import mean_absolute_error

from pyspark.sql import SparkSession
from pyspark.ml.linalg import Vectors, VectorUDT
from pyspark.sql import functions as F
from pyspark.sql.types import *
from pyspark.ml.regression import RandomForestRegressor

spark = SparkSession.builder\
    .master("local[2]")\
    .appName("Lesson_2")\
    .config("spark.executor.instances",2)\
    .config("spark.executor.memory",'2g')\
    .config("spark.executor.cores",1)\
    .getOrCreate()
sc = spark.sparkContext

In [131]:
import warnings
warnings.filterwarnings("ignore")

In [132]:
data = load_boston()
dataset = pd.DataFrame(data['data'], columns=data['feature_names'])
dataset['target'] = data['target']

cols_to_vector = F.udf(lambda l: Vectors.dense(l), VectorUDT())

spark_dataset = spark.createDataFrame(dataset).select(cols_to_vector(F.array(*data['feature_names'])).alias('features'), 'target').cache()

In [133]:
train, test = spark_dataset.randomSplit([0.7, 0.3])

lr = RandomForestRegressor(labelCol='target')
lr = lr.fit(train)
train_predictions = lr.transform(train)
test_predictions = lr.transform(test)

In [134]:
# This is the part that needs to be replaced
pandas_train_predictions = train_predictions.toPandas()
pandas_test_predictions = test_predictions.toPandas()

print(f'''
    Scores:: 
        train: {mean_absolute_error(
            pandas_train_predictions.target, 
            pandas_train_predictions.prediction)}, 
        test: {mean_absolute_error(
            pandas_test_predictions.target, 
            pandas_test_predictions.prediction)}
    ''')


    Scores:: 
        train: 1.9248439814434293, 
        test: 2.1745401466446843
    


Use RegressionEvaluator. The evaluators are imported as follows:


```
from pyspark.ml.evaluation import RegressionEvaluator, BinaryClassificationEvaluator
```

In particular, see [RegressionEvaluator](https://spark.apache.org/docs/latest/api/python/reference/api/pyspark.ml.evaluation.RegressionEvaluator.html#pyspark.ml.evaluation.RegressionEvaluator.metricName)

# 1 Convert to a pipeline

In [135]:
from pyspark.ml import Pipeline

In [136]:
lr = RandomForestRegressor(labelCol='target')

In [137]:
pipe = Pipeline(stages=[lr])
fitted_pipe = pipe.fit(train)

In [138]:
train_pred = fitted_pipe.transform(train)
test_pred = fitted_pipe.transform(test)

train_pred.show(3), test_pred.show(3);

+--------------------+------+------------------+
|            features|target|        prediction|
+--------------------+------+------------------+
|[0.01311,90.0,1.2...|  35.4| 33.59250373927148|
|[0.0136,75.0,4.0,...|  18.9|20.605932795213498|
|[0.01381,80.0,0.4...|  50.0| 46.90220697080697|
+--------------------+------+------------------+
only showing top 3 rows

+--------------------+------+------------------+
|            features|target|        prediction|
+--------------------+------+------------------+
|[0.00632,18.0,2.3...|  24.0|28.985415548361846|
|[0.01432,100.0,1....|  31.6| 31.05084902873346|
|[0.01778,95.0,1.4...|  32.9|  33.1172993200348|
+--------------------+------+------------------+
only showing top 3 rows



# 2 Evaluate the model

In [139]:
from pyspark.ml.evaluation import RegressionEvaluator

In [140]:
regr_eval = RegressionEvaluator(predictionCol='prediction',
                                labelCol='target',
                                metricName='mae'
                               )

print(f'train mae: {regr_eval.evaluate(train_pred):0.2f}\ntest mae: {regr_eval.evaluate(test_pred):0.2f}')

train mae: 1.92
test mae: 2.17
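
For reference, MAE is the mean of the absolute differences between target and prediction. The plain-Python sketch below works through that arithmetic on the three train rows shown earlier, with predictions rounded to two decimals. It only illustrates the formula: the result is not expected to match the full-dataset value, because it covers three rows only.

```python
# (target, prediction) pairs taken from the three train rows shown earlier,
# with predictions rounded to two decimals.
pairs = [(35.4, 33.59), (18.9, 20.61), (50.0, 46.90)]

# Absolute error per row, then the mean over the rows.
abs_errors = [abs(target - prediction) for target, prediction in pairs]
mae_three_rows = sum(abs_errors) / len(abs_errors)

print(f"mae over three rows: {mae_three_rows:0.2f}")
```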


# 3 Improve the model

A grid search can help here.

```
from pyspark.ml.tuning import TrainValidationSplit, ParamGridBuilder


# Template: give each addGrid call a Param and a list of candidate values.
paramgrid = (ParamGridBuilder()
             .addGrid()
             .addGrid()
             .build())

tvs = TrainValidationSplit(
    estimator=pipe,
    evaluator=evaluator,
    estimatorParamMaps=paramgrid)
```

In [141]:
from pyspark.ml.tuning import TrainValidationSplit, ParamGridBuilder

In [153]:
paramgrid = (ParamGridBuilder().addGrid(lr.maxDepth, [5, 7, 12])
                               .addGrid(lr.numTrees, [15, 20, 25])
                               .addGrid(lr.subsamplingRate, [0.6, 0.8, 1])
                               .build()
            )

In [154]:
tvs = TrainValidationSplit(estimator=lr, 
                           estimatorParamMaps=paramgrid, 
                           evaluator=regr_eval,
                           parallelism=2, 
                           seed=42)

In [155]:
tvsModel = tvs.fit(train)

23/01/22 14:46:37 WARN DAGScheduler: Broadcasting large task binary with size 1065.6 KiB


In [156]:
tvsModel.validationMetrics

[2.3543210729703876,
 2.284143063004749,
 2.184314968620492,
 2.278105813138483,
 2.283447069275894,
 2.2486400987772868,
 2.2527475972267603,
 2.289586170874206,
 2.367354168127326,
 2.3540486839871444,
 2.238095361496401,
 2.1644573980245063,
 2.218304560941528,
 2.403887143839786,
 2.145432628748238,
 2.1546647509663437,
 2.256084084330268,
 2.3299761946014352,
 2.3665339159384113,
 2.1516069796400266,
 2.18613849030703,
 2.1620009971302103,
 2.3195837713111516,
 2.1220314837499648,
 2.0878127246037352,
 2.251026249293778,
 2.3056384295853958]

In [157]:
best_i = tvsModel.validationMetrics.index(min(tvsModel.validationMetrics))

In [158]:
paramgrid[best_i]

{Param(parent='RandomForestRegressor_01703731615d', name='maxDepth', doc='Maximum depth of the tree. (>= 0) E.g., depth 0 means 1 leaf node; depth 1 means 1 internal node + 2 leaf nodes. Must be in range [0, 30].'): 12,
 Param(parent='RandomForestRegressor_01703731615d', name='numTrees', doc='Number of trees to train (>= 1).'): 25,
 Param(parent='RandomForestRegressor_01703731615d', name='subsamplingRate', doc='Fraction of the training data used for learning each decision tree, in range (0, 1].'): 0.6}

In [159]:
print('best params: ', {param.name: value for param, value in paramgrid[best_i].items()})

best params:  {'maxDepth': 12, 'numTrees': 25, 'subsamplingRate': 0.6}
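
Three candidate values for each of three parameters give 3 × 3 × 3 = 27 combinations, which matches the 27 validation metrics above. The plain-Python sketch below reproduces the lookup without Spark. It assumes the grid is enumerated as a Cartesian product in the order of the `addGrid` calls, with the last parameter varying fastest; that assumption is consistent with the best parameters reported above. The metric values are copied from the run above and rounded to three decimals.

```python
from itertools import product

# Candidate values copied from the grid above.
max_depths = [5, 7, 12]
num_trees = [15, 20, 25]
subsampling_rates = [0.6, 0.8, 1]

# Assumed enumeration order: first parameter varies slowest, last varies fastest.
combos = list(product(max_depths, num_trees, subsampling_rates))

# Validation MAE values from the run above, rounded to three decimals.
validation_mae = [
    2.354, 2.284, 2.184, 2.278, 2.283, 2.249, 2.253, 2.290, 2.367,
    2.354, 2.238, 2.164, 2.218, 2.404, 2.145, 2.155, 2.256, 2.330,
    2.367, 2.152, 2.186, 2.162, 2.320, 2.122, 2.088, 2.251, 2.306,
]

# MAE is an error, so the best combination is the one with the smallest value.
best_index = min(range(len(validation_mae)), key=validation_mae.__getitem__)
print(len(combos), best_index, combos[best_index])
```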


The training MAE improved markedly (from 1.92 to 1.40), while the test MAE improved only slightly (from 2.17 to 2.08).

In [160]:
regr_eval.evaluate(tvsModel.transform(train)), regr_eval.evaluate(tvsModel.transform(test))

(1.3983229983020642, 2.0786862820981837)

In [161]:
spark.stop()