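Rather than installing PySpark on a local machine and running a notebook there, we do the hands-on work in Google Colab. Colab comes with many machine-learning libraries preinstalled and provides decent hardware.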

To do this, install the pyspark and Py4J packages. Py4J lets a Python program access objects that live inside the Java Virtual Machine. Spark runs in local mode on the Colab machine, with no separate cluster.

In [1]:
!pip install pyspark==3.0.1 py4j==0.10.9

Looking in indexes: https://pypi.org/simple, https://us-python.pkg.dev/colab-wheels/public/simple/
Collecting pyspark==3.0.1
  Downloading pyspark-3.0.1.tar.gz (204.2 MB)
     ━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━ 204.2/204.2 MB 5.7 MB/s eta 0:00:00
  Preparing metadata (setup.py) ... done
Collecting py4j==0.10.9
  Downloading py4j-0.10.9-py2.py3-none-any.whl (198 kB)
     ━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━ 198.6/198.6 kB 19.8 MB/s eta 0:00:00
Building wheels for collected packages: pyspark
  Building wheel for pyspark (setup.py) ... done
  Created wheel for pyspark: filename=pyspark-3.0.1-py2.py3-none-any.whl size=204612229 sha256=de42f022eda0cd8625192fa17ed56124dcc9faee3bdabcf76e72bb097f8f326e
  Stored in directory: /root/.cache/pip/wheels/19/b0/c8/6cb894117070e130fc44352c2a13f15b6c27e440d04a84fb48
Successfully built pyspark
Installing collected packages: py4j, py

In [2]:
from pyspark.sql import SparkSession

spark = SparkSession \
    .builder \
    .appName("Taipei Housing Price Prediction") \
    .getOrCreate()

# Building a Taipei housing price prediction model




## Dataset description

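The task is to build a regression model that predicts house prices from housing transaction records collected in the Sindian (Xindian) district of New Taipei City, Taiwan. More precisely, the model predicts the price per unit area. The training data consists of six features and a label that holds the price per unit area (per ping, a local unit of roughly 3.3 m²). Note again that the label is the price per unit area, not the house's total price.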

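The columns are described below. Every field except X4 is a real number.

* X1: transaction date encoded as a real number, where the fractional part gives the month. For example, 2013.250 means March 2013 (0.250 = 3/12).
* X2: house age (years)
* X3: distance to the nearest subway (MRT) station (meters)
* X4: number of convenience stores within walking distance
* X5: latitude of the house
* X6: longitude of the house
* Y: house price per unit area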
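The X1 encoding can be made concrete with a small decoding helper. The sketch assumes the month is `round(fraction * 12)`, which is the rule stated above. `decode_transaction_date` is a hypothetical name, and the notebook itself never decodes X1 because that column is dropped before modelling.

```python
import math


def decode_transaction_date(x1: float) -> tuple[int, int]:
    """Split an X1 value into (year, month) using the 'fraction * 12' rule.

    Assumption: month = round(fraction * 12). A fraction of .000 would give
    month 0, a case the dataset description does not define.
    """
    year = math.floor(x1)
    month = round((x1 - year) * 12)
    return year, month


print(decode_transaction_date(2013.250))  # (2013, 3)
print(decode_transaction_date(2012.917))  # (2012, 11)
```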



In [None]:
spark

In [3]:
!wget https://grepp-reco-test.s3.ap-northeast-2.amazonaws.com/Taipei_sindan_housing.csv

--2023-06-16 10:56:34--  https://grepp-reco-test.s3.ap-northeast-2.amazonaws.com/Taipei_sindan_housing.csv
Resolving grepp-reco-test.s3.ap-northeast-2.amazonaws.com (grepp-reco-test.s3.ap-northeast-2.amazonaws.com)... 3.5.141.138
Connecting to grepp-reco-test.s3.ap-northeast-2.amazonaws.com (grepp-reco-test.s3.ap-northeast-2.amazonaws.com)|3.5.141.138|:443... connected.
HTTP request sent, awaiting response... 200 OK
Length: 20014 (20K) [text/csv]
Saving to: ‘Taipei_sindan_housing.csv’


2023-06-16 10:56:36 (103 KB/s) - ‘Taipei_sindan_housing.csv’ saved [20014/20014]



In [4]:
!ls -tl

total 24
drwxr-xr-x 1 root root  4096 Jun 14 18:27 sample_data
-rw-r--r-- 1 root root 20014 Jul 17  2021 Taipei_sindan_housing.csv


In [5]:
data = spark.read.csv('./Taipei_sindan_housing.csv', header=True, inferSchema=True)

In [6]:
data.printSchema()

root
 |-- X1: double (nullable = true)
 |-- X2: double (nullable = true)
 |-- X3: double (nullable = true)
 |-- X4: integer (nullable = true)
 |-- X5: double (nullable = true)
 |-- X6: double (nullable = true)
 |-- Y: double (nullable = true)
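With `inferSchema=True`, Spark scans the CSV and picks a type for each column. X4 holds whole-number counts, so it became `integer`, while the other columns became `double`. The function below is a hypothetical, simplified sketch of only that integer-versus-double decision. Spark's actual inference handles more types.

```python
def _parses(cast, text: str) -> bool:
    """True if cast(text) succeeds, e.g. int("10") works but int("32.0") does not."""
    try:
        cast(text)
        return True
    except ValueError:
        return False


def infer_column_type(values: list[str]) -> str:
    """Hypothetical simplified inference: try integer, then double, else string."""
    if all(_parses(int, v) for v in values):
        return "integer"
    if all(_parses(float, v) for v in values):
        return "double"
    return "string"


print(infer_column_type(["10", "9", "5"]))          # integer (like X4)
print(infer_column_type(["32.0", "19.5", "13.3"]))  # double (like X2)
```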



In [7]:
data.show()

+--------+----+--------+---+--------+---------+----+
|      X1|  X2|      X3| X4|      X5|       X6|   Y|
+--------+----+--------+---+--------+---------+----+
|2012.917|32.0|84.87882| 10|24.98298|121.54024|37.9|
|2012.917|19.5|306.5947|  9|24.98034|121.53951|42.2|
|2013.583|13.3|561.9845|  5|24.98746|121.54391|47.3|
|  2013.5|13.3|561.9845|  5|24.98746|121.54391|54.8|
|2012.833| 5.0|390.5684|  5|24.97937|121.54245|43.1|
|2012.667| 7.1| 2175.03|  3|24.96305|121.51254|32.1|
|2012.667|34.5|623.4731|  7|24.97933|121.53642|40.3|
|2013.417|20.3|287.6025|  6|24.98042|121.54228|46.7|
|  2013.5|31.7|5512.038|  1|24.95095|121.48458|18.8|
|2013.417|17.9| 1783.18|  3|24.96731|121.51486|22.1|
|2013.083|34.8|405.2134|  1|24.97349|121.53372|41.4|
|2013.333| 6.3|90.45606|  9|24.97433| 121.5431|58.1|
|2012.917|13.0|492.2313|  5|24.96515|121.53737|39.3|
|2012.667|20.4|2469.645|  4|24.96108|121.51046|23.8|
|  2013.5|13.2|1164.838|  4|24.99156|121.53406|34.3|
|2013.583|35.7|579.2083|  2| 24.9824|121.54619

In [9]:
data.describe().show()

+-------+------------------+------------------+------------------+------------------+--------------------+--------------------+------------------+
|summary|                X1|                X2|                X3|                X4|                  X5|                  X6|                 Y|
+-------+------------------+------------------+------------------+------------------+--------------------+--------------------+------------------+
|  count|               414|               414|               414|               414|                 414|                 414|               414|
|   mean|2013.1489710144933| 17.71256038647343|1083.8856889130436| 4.094202898550725|  24.969030072463745|  121.53336108695667| 37.98019323671498|
| stddev|0.2819672402629999|11.392484533242524| 1262.109595407851|2.9455618056636177|0.012410196590450208|0.015347183004592374|13.606487697735316|
|    min|          2012.667|               0.0|          23.38284|                 0|            24.93207|           1

## Building a simple model
- Model using the remaining columns, excluding X1

In [53]:
features = data.select(data.columns[1:])

In [54]:
features.show()

+----+--------+---+--------+---------+----+
|  X2|      X3| X4|      X5|       X6|   Y|
+----+--------+---+--------+---------+----+
|32.0|84.87882| 10|24.98298|121.54024|37.9|
|19.5|306.5947|  9|24.98034|121.53951|42.2|
|13.3|561.9845|  5|24.98746|121.54391|47.3|
|13.3|561.9845|  5|24.98746|121.54391|54.8|
| 5.0|390.5684|  5|24.97937|121.54245|43.1|
| 7.1| 2175.03|  3|24.96305|121.51254|32.1|
|34.5|623.4731|  7|24.97933|121.53642|40.3|
|20.3|287.6025|  6|24.98042|121.54228|46.7|
|31.7|5512.038|  1|24.95095|121.48458|18.8|
|17.9| 1783.18|  3|24.96731|121.51486|22.1|
|34.8|405.2134|  1|24.97349|121.53372|41.4|
| 6.3|90.45606|  9|24.97433| 121.5431|58.1|
|13.0|492.2313|  5|24.96515|121.53737|39.3|
|20.4|2469.645|  4|24.96108|121.51046|23.8|
|13.2|1164.838|  4|24.99156|121.53406|34.3|
|35.7|579.2083|  2| 24.9824|121.54619|50.5|
| 0.0|292.9978|  6|24.97744|121.54458|70.1|
|17.7|350.8515|  1|24.97544|121.53119|37.4|
|16.9|368.1363|  8| 24.9675|121.54451|42.3|
| 1.5|23.38284|  7|24.96772|121.

In [55]:
from pyspark.ml.feature import VectorAssembler

assembler = VectorAssembler(inputCols=data.columns[1:-1], outputCol='features')
data_vec = assembler.transform(features)

In [56]:
data_vec.show()

+----+--------+---+--------+---------+----+--------------------+
|  X2|      X3| X4|      X5|       X6|   Y|            features|
+----+--------+---+--------+---------+----+--------------------+
|32.0|84.87882| 10|24.98298|121.54024|37.9|[32.0,84.87882,10...|
|19.5|306.5947|  9|24.98034|121.53951|42.2|[19.5,306.5947,9....|
|13.3|561.9845|  5|24.98746|121.54391|47.3|[13.3,561.9845,5....|
|13.3|561.9845|  5|24.98746|121.54391|54.8|[13.3,561.9845,5....|
| 5.0|390.5684|  5|24.97937|121.54245|43.1|[5.0,390.5684,5.0...|
| 7.1| 2175.03|  3|24.96305|121.51254|32.1|[7.1,2175.03,3.0,...|
|34.5|623.4731|  7|24.97933|121.53642|40.3|[34.5,623.4731,7....|
|20.3|287.6025|  6|24.98042|121.54228|46.7|[20.3,287.6025,6....|
|31.7|5512.038|  1|24.95095|121.48458|18.8|[31.7,5512.038,1....|
|17.9| 1783.18|  3|24.96731|121.51486|22.1|[17.9,1783.18,3.0...|
|34.8|405.2134|  1|24.97349|121.53372|41.4|[34.8,405.2134,1....|
| 6.3|90.45606|  9|24.97433| 121.5431|58.1|[6.3,90.45606,9.0...|
|13.0|492.2313|  5|24.965
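`VectorAssembler` concatenates the listed input columns of each row into a single vector column, in the order given by `inputCols`. `data.columns[1:-1]` skips the first column (X1) and the last one (Y), so the vector holds X2 through X6. The integer X4 is stored as a double: 5 becomes 5.0 in the output above.

A plain-Python sketch of the same per-row operation follows. `assemble` is a hypothetical helper, and Spark itself may store the result as a sparse or dense vector.

```python
def assemble(row: dict, input_cols: list[str]) -> list[float]:
    """Concatenate the given columns of one row into a flat float vector,
    in input_cols order (integer values such as X4 become floats)."""
    return [float(row[c]) for c in input_cols]


columns = ["X1", "X2", "X3", "X4", "X5", "X6", "Y"]
input_cols = columns[1:-1]  # drop X1 (first) and the label Y (last)
row = {"X1": 2012.917, "X2": 32.0, "X3": 84.87882, "X4": 10,
       "X5": 24.98298, "X6": 121.54024, "Y": 37.9}
print(assemble(row, input_cols))  # [32.0, 84.87882, 10.0, 24.98298, 121.54024]
```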

In [57]:
from pyspark.ml.feature import MinMaxScaler

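f_scaler = MinMaxScaler(inputCol="features", outputCol="features_scaled")
# Note: the scaler is fit on the full dataset here, before randomSplit, so the
# test rows' min/max also shape the scaling (a mild form of data leakage).
# The Pipeline section below avoids this by fitting the scaler inside the pipeline.
f_scaler_model = f_scaler.fit(data_vec)
data_vec = f_scaler_model.transform(data_vec)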

In [58]:
data_vec.show()

+----+--------+---+--------+---------+----+--------------------+--------------------+
|  X2|      X3| X4|      X5|       X6|   Y|            features|     features_scaled|
+----+--------+---+--------+---------+----+--------------------+--------------------+
|32.0|84.87882| 10|24.98298|121.54024|37.9|[32.0,84.87882,10...|[0.73059360730593...|
|19.5|306.5947|  9|24.98034|121.53951|42.2|[19.5,306.5947,9....|[0.44520547945205...|
|13.3|561.9845|  5|24.98746|121.54391|47.3|[13.3,561.9845,5....|[0.30365296803652...|
|13.3|561.9845|  5|24.98746|121.54391|54.8|[13.3,561.9845,5....|[0.30365296803652...|
| 5.0|390.5684|  5|24.97937|121.54245|43.1|[5.0,390.5684,5.0...|[0.11415525114155...|
| 7.1| 2175.03|  3|24.96305|121.51254|32.1|[7.1,2175.03,3.0,...|[0.16210045662100...|
|34.5|623.4731|  7|24.97933|121.53642|40.3|[34.5,623.4731,7....|[0.78767123287671...|
|20.3|287.6025|  6|24.98042|121.54228|46.7|[20.3,287.6025,6....|[0.46347031963470...|
|31.7|5512.038|  1|24.95095|121.48458|18.8|[31.7,5512.
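`MinMaxScaler` rescales each feature independently with (x − E_min) / (E_max − E_min), then maps the result onto [min, max] (default [0, 1]). If a feature is constant, Spark maps it to the midpoint, 0.5 × (max + min).

The first scaled X2 value, 0.7305936…, equals 32.0 / 43.8. This suggests X2 ranges from 0.0 to 43.8 in this data. The maximum is only inferred here, because the `describe()` output above is truncated.

A plain-Python sketch of the formula, with a hypothetical `min_max_scale` helper:

```python
def min_max_scale(column: list[float], new_min: float = 0.0,
                  new_max: float = 1.0) -> list[float]:
    """Linearly rescale so the column minimum maps to new_min and the maximum
    to new_max; a constant column maps to the midpoint of the target range."""
    lo, hi = min(column), max(column)
    if hi == lo:
        return [0.5 * (new_min + new_max)] * len(column)
    scale = (new_max - new_min) / (hi - lo)
    return [(x - lo) * scale + new_min for x in column]


print(min_max_scale([0.0, 19.5, 32.0, 43.8]))
```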

In [59]:
train, test = data_vec.randomSplit([0.7, 0.3])
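`randomSplit` normalizes the weights and assigns rows to splits at random. As a result, `[0.7, 0.3]` yields roughly, not exactly, 70% and 30% of the 414 rows. No seed is passed here, so each run produces a different split and therefore different RMSE values. For reproducibility, `randomSplit` also accepts a `seed` argument, for example `data_vec.randomSplit([0.7, 0.3], seed=42)`.

The plain-Python sketch below illustrates the per-row assignment with a hypothetical `random_split` helper. Spark's actual sampling works per partition and differs in detail.

```python
import random
from typing import Optional


def random_split(rows: list, weights: list[float],
                 seed: Optional[int] = None) -> list[list]:
    """Send each row to one split, chosen at random with probability proportional
    to its weight. Weights are normalized, so split sizes are only approximate."""
    total = sum(weights)
    cumulative, acc = [], 0.0
    for w in weights:
        acc += w / total
        cumulative.append(acc)
    rng = random.Random(seed)
    splits = [[] for _ in weights]
    for row in rows:
        u = rng.random()
        # First split whose cumulative boundary exceeds u; the last split
        # also catches any floating-point shortfall at the top end.
        index = next((i for i, bound in enumerate(cumulative) if u < bound),
                     len(weights) - 1)
        splits[index].append(row)
    return splits


train_rows, test_rows = random_split(list(range(414)), [0.7, 0.3], seed=42)
print(len(train_rows), len(test_rows))  # roughly 70/30, exact sizes depend on the seed
```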

In [66]:
from pyspark.ml.regression import RandomForestRegressor

algo = RandomForestRegressor(featuresCol="features_scaled", labelCol="Y")
model = algo.fit(train)

In [67]:
predictions = model.transform(test)

In [68]:
predictions.show()

+---+--------+---+--------+---------+----+--------------------+--------------------+------------------+
| X2|      X3| X4|      X5|       X6|   Y|            features|     features_scaled|        prediction|
+---+--------+---+--------+---------+----+--------------------+--------------------+------------------+
|0.0|274.0144|  1| 24.9748|121.53059|43.5|[0.0,274.0144,1.0...|[0.0,0.0387696192...| 45.14330304703081|
|0.0|274.0144|  1| 24.9748|121.53059|52.2|[0.0,274.0144,1.0...|[0.0,0.0387696192...| 45.14330304703081|
|0.0|338.9679|  9|24.96853|121.54413|50.8|[0.0,338.9679,9.0...|[0.0,0.0488171266...|   48.901020383442|
|1.0|193.5845|  6|24.96571|121.54089|50.7|[1.0,193.5845,6.0...|[0.02283105022831...|48.622495182723036|
|1.1|193.5845|  6|24.96571|121.54089|49.0|[1.1,193.5845,6.0...|[0.02511415525114...|48.622495182723036|
|1.5|23.38284|  7|24.96772|121.54102|49.7|[1.5,23.38284,7.0...|[0.03424657534246...| 51.64949439803301|
|1.7|329.9747|  5|24.98254|121.54395|50.4|[1.7,329.9747,5.0...|[

In [69]:
from pyspark.ml.evaluation import RegressionEvaluator

evaluator = RegressionEvaluator(labelCol='Y', metricName='rmse')  # 'rmse' is the default metric
evaluator.evaluate(predictions)

9.334829545287294
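`RegressionEvaluator` uses `metricName='rmse'` by default. The 9.33 above is therefore the root mean squared error on the test rows, in the same units as Y (price per unit area), and lower is better. Other metrics such as `'mae'` or `'r2'` can be selected through `metricName`.

For reference, the RMSE formula in plain Python (`rmse` is a hypothetical helper):

```python
import math


def rmse(labels: list[float], predictions: list[float]) -> float:
    """Root mean squared error: sqrt(mean((y - y_hat) ** 2))."""
    if len(labels) != len(predictions) or not labels:
        raise ValueError("need two non-empty sequences of equal length")
    squared = [(y - p) ** 2 for y, p in zip(labels, predictions)]
    return math.sqrt(sum(squared) / len(squared))


# First three test rows from the predictions shown above (Y vs. prediction)
print(rmse([43.5, 52.2, 50.8],
           [45.14330304703081, 45.14330304703081, 48.901020383442]))
```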

## Building pipelines
- Experiment with three models
  1. `LinearRegression`
  2. `RandomForestRegressor`
  3. `GBTRegressor`

### Shared feature transformers

In [118]:
feature_cols = ['X2','X3','X4','X5','X6']
# feature_cols = ['X2','X3','X4']

In [119]:
from pyspark.ml.feature import VectorAssembler, MinMaxScaler

assembler = VectorAssembler(inputCols=feature_cols, outputCol="features")
minmax_scaler = MinMaxScaler(inputCol="features", outputCol="features_scaled")

stages = [assembler, minmax_scaler]
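Putting `VectorAssembler` and `MinMaxScaler` into `stages` matters because `Pipeline.fit` fits each estimator only on the data it is given. Inside `CrossValidator`, this means the scaler is refit on each training fold, so held-out rows never influence the learned min/max. In the earlier cell, by contrast, the scaler was fit before `randomSplit`.

The toy classes below (`MinMaxEstimator`, `MinMaxModel`, `fit_pipeline`, `transform_pipeline`) are hypothetical, simplified stand-ins for that fit-then-transform chaining. They are not Spark APIs.

```python
class MinMaxModel:
    """Toy transformer: applies min/max learned at fit time."""

    def __init__(self, lo: float, hi: float):
        self.lo, self.hi = lo, hi

    def transform(self, data: list[float]) -> list[float]:
        return [(x - self.lo) / (self.hi - self.lo) for x in data]


class MinMaxEstimator:
    """Toy estimator: learns the min and max of the data it is fit on."""

    def fit(self, data: list[float]) -> MinMaxModel:
        return MinMaxModel(min(data), max(data))


def fit_pipeline(stages, data):
    """Fit stages in order: an estimator is fit on the output of the previous
    stages and replaced by its model; plain transformers are used as-is."""
    fitted = []
    for stage in stages:
        model = stage.fit(data) if hasattr(stage, "fit") else stage
        data = model.transform(data)
        fitted.append(model)
    return fitted


def transform_pipeline(fitted, data):
    for model in fitted:
        data = model.transform(data)
    return data


train_x = [0.0, 10.0, 20.0]
test_x = [25.0]
fitted = fit_pipeline([MinMaxEstimator()], train_x)
print(transform_pipeline(fitted, test_x))  # [1.25]: scaled with the training min/max
```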

In [120]:
from pyspark.ml.evaluation import RegressionEvaluator

evaluator = RegressionEvaluator(labelCol='Y', metricName='rmse')  # 'rmse' is the default metric

In [121]:
df = data.select(feature_cols+['Y'])
df.show()

+----+--------+---+--------+---------+----+
|  X2|      X3| X4|      X5|       X6|   Y|
+----+--------+---+--------+---------+----+
|32.0|84.87882| 10|24.98298|121.54024|37.9|
|19.5|306.5947|  9|24.98034|121.53951|42.2|
|13.3|561.9845|  5|24.98746|121.54391|47.3|
|13.3|561.9845|  5|24.98746|121.54391|54.8|
| 5.0|390.5684|  5|24.97937|121.54245|43.1|
| 7.1| 2175.03|  3|24.96305|121.51254|32.1|
|34.5|623.4731|  7|24.97933|121.53642|40.3|
|20.3|287.6025|  6|24.98042|121.54228|46.7|
|31.7|5512.038|  1|24.95095|121.48458|18.8|
|17.9| 1783.18|  3|24.96731|121.51486|22.1|
|34.8|405.2134|  1|24.97349|121.53372|41.4|
| 6.3|90.45606|  9|24.97433| 121.5431|58.1|
|13.0|492.2313|  5|24.96515|121.53737|39.3|
|20.4|2469.645|  4|24.96108|121.51046|23.8|
|13.2|1164.838|  4|24.99156|121.53406|34.3|
|35.7|579.2083|  2| 24.9824|121.54619|50.5|
| 0.0|292.9978|  6|24.97744|121.54458|70.1|
|17.7|350.8515|  1|24.97544|121.53119|37.4|
|16.9|368.1363|  8| 24.9675|121.54451|42.3|
| 1.5|23.38284|  7|24.96772|121.

In [122]:
train, test = df.randomSplit([0.7, 0.3])

### LinearRegression

In [124]:
from pyspark.ml.regression import LinearRegression

algo = LinearRegression(featuresCol="features_scaled", labelCol="Y")
lr_stages = stages + [algo]

In [125]:
from pyspark.ml import Pipeline
pipeline = Pipeline(stages = lr_stages)

In [126]:
algo.extractParamMap()

{Param(parent='LinearRegression_39eba1cfe932', name='aggregationDepth', doc='suggested depth for treeAggregate (>= 2).'): 2,
 Param(parent='LinearRegression_39eba1cfe932', name='solver', doc='The solver algorithm for optimization. Supported options: auto, normal, l-bfgs.'): 'auto',
 Param(parent='LinearRegression_39eba1cfe932', name='standardization', doc='whether to standardize the training features before fitting the model.'): True,
 Param(parent='LinearRegression_39eba1cfe932', name='fitIntercept', doc='whether to fit an intercept term.'): True,
 Param(parent='LinearRegression_39eba1cfe932', name='elasticNetParam', doc='the ElasticNet mixing parameter, in range [0, 1]. For alpha = 0, the penalty is an L2 penalty. For alpha = 1, it is an L1 penalty.'): 0.0,
 Param(parent='LinearRegression_39eba1cfe932', name='predictionCol', doc='prediction column name.'): 'prediction',
 Param(parent='LinearRegression_39eba1cfe932', name='featuresCol', doc='features column name.'): 'features_scaled',

In [127]:
from pyspark.ml.tuning import ParamGridBuilder, CrossValidator

paramGrid = (ParamGridBuilder()
             .addGrid(algo.maxIter, [10, 20, 30])
             .build())

cv = CrossValidator(
    estimator=pipeline,
    estimatorParamMaps=paramGrid,
    evaluator=evaluator,
    numFolds=5
)
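With `numFolds=5`, `CrossValidator` splits the training data into five folds. For each parameter map it trains on four folds and evaluates on the held-out fold, then stores the average of the five scores in `avgMetrics` (the rmse column of the results table below). Spark assigns rows to folds randomly, so fold sizes are only roughly equal.

The helper below is a hypothetical plain-Python sketch of that fold assignment, not Spark's own sampling code.

```python
import random
from typing import Optional


def k_fold_indices(n_rows: int, num_folds: int, seed: Optional[int] = None):
    """Assign each row index to one random fold, then yield (train, validation)
    index lists: fold k is the validation set, the other folds form the training set."""
    rng = random.Random(seed)
    fold_of = [rng.randrange(num_folds) for _ in range(n_rows)]
    for k in range(num_folds):
        train = [i for i, f in enumerate(fold_of) if f != k]
        validation = [i for i, f in enumerate(fold_of) if f == k]
        yield train, validation


for train_idx, val_idx in k_fold_indices(20, 5, seed=0):
    print(len(train_idx), len(val_idx))
```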


In [128]:
# Run cross validations.
cv_lr = cv.fit(train)
lr_cv_predictions = cv_lr.transform(test)
evaluator.evaluate(lr_cv_predictions)

7.7575969174858415

In [129]:
import pandas as pd

params = [{p.name: v for p, v in m.items()} for m in cv_lr.getEstimatorParamMaps()]
pd.DataFrame.from_dict([
    {cv_lr.getEvaluator().getMetricName(): metric, **ps}
    for ps, metric in zip(params, cv_lr.avgMetrics)
])

       rmse  maxIter
0  9.313108       10
1  9.313108       20
2  9.313108       30
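All three `maxIter` values produce the same average RMSE (9.313108). A plausible explanation is the default `solver='auto'` (shown in the param map above) combined with no regularization. In that setting, Spark's `LinearRegression` can solve least squares directly through the normal equations when there are few features (here five). `maxIter` only affects the iterative L-BFGS solver.

The one-feature closed-form fit below shows why no iteration count is involved. `least_squares_1d` is a hypothetical helper implementing the textbook OLS formula, not Spark's solver code.

```python
def least_squares_1d(xs: list[float], ys: list[float]) -> tuple[float, float]:
    """Closed-form ordinary least squares for y = intercept + slope * x.
    No iterations are involved, so a maxIter-like setting has nothing to change."""
    n = len(xs)
    mean_x = sum(xs) / n
    mean_y = sum(ys) / n
    sxx = sum((x - mean_x) ** 2 for x in xs)
    sxy = sum((x - mean_x) * (y - mean_y) for x, y in zip(xs, ys))
    slope = sxy / sxx
    intercept = mean_y - slope * mean_x
    return intercept, slope


print(least_squares_1d([0.0, 1.0, 2.0, 3.0], [1.0, 3.0, 5.0, 7.0]))  # (1.0, 2.0)
```

Tuning `regParam` or `elasticNetParam` instead of `maxIter` would make the grid search actually vary the model.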


In [130]:
lr_cv_predictions.select('Y', 'prediction').show()

+---+--------+---+--------+---------+----+--------------------+--------------------+------------------+
| X2|      X3| X4|      X5|       X6|   Y|            features|     features_scaled|        prediction|
+---+--------+---+--------+---------+----+--------------------+--------------------+------------------+
|0.0|185.4296|  0| 24.9711| 121.5317|37.9|[0.0,185.4296,0.0...|[0.0,0.0250666403...|42.993656131035486|
|0.0|185.4296|  0| 24.9711| 121.5317|45.5|[0.0,185.4296,0.0...|[0.0,0.0250666403...|42.993656131035486|
|0.0|185.4296|  0| 24.9711| 121.5317|52.2|[0.0,185.4296,0.0...|[0.0,0.0250666403...|42.993656131035486|
|0.0|185.4296|  0| 24.9711| 121.5317|55.2|[0.0,185.4296,0.0...|[0.0,0.0250666403...|42.993656131035486|
|0.0|185.4296|  0| 24.9711| 121.5317|55.3|[0.0,185.4296,0.0...|[0.0,0.0250666403...|42.993656131035486|
|0.0|274.0144|  1| 24.9748|121.53059|45.4|[0.0,274.0144,1.0...|[0.0,0.0387696192...| 44.75235966896983|
|1.1|193.5845|  6|24.96571|121.54089|45.1|[1.1,193.5845,6.0...|[

### RandomForestRegressor

In [131]:
from pyspark.ml.regression import RandomForestRegressor

algo = RandomForestRegressor(featuresCol="features_scaled", labelCol="Y")
rf_stages = stages + [algo]

In [132]:
from pyspark.ml import Pipeline
pipeline = Pipeline(stages=rf_stages)

In [133]:
algo.extractParamMap()

{Param(parent='RandomForestRegressor_77aa072fbfc3', name='seed', doc='random seed.'): 2740162091498019339,
 Param(parent='RandomForestRegressor_77aa072fbfc3', name='maxDepth', doc='Maximum depth of the tree. (>= 0) E.g., depth 0 means 1 leaf node; depth 1 means 1 internal node + 2 leaf nodes.'): 5,
 Param(parent='RandomForestRegressor_77aa072fbfc3', name='maxBins', doc='Max number of bins for discretizing continuous features.  Must be >=2 and >= number of categories for any categorical feature.'): 32,
 Param(parent='RandomForestRegressor_77aa072fbfc3', name='minInstancesPerNode', doc='Minimum number of instances each child must have after split. If a split causes the left or right child to have fewer than minInstancesPerNode, the split will be discarded as invalid. Should be >= 1.'): 1,
 Param(parent='RandomForestRegressor_77aa072fbfc3', name='minInfoGain', doc='Minimum information gain for a split to be considered at a tree node.'): 0.0,
 Param(parent='RandomForestRegressor_77aa072fbf

In [135]:
from pyspark.ml.tuning import ParamGridBuilder, CrossValidator

paramGrid = (ParamGridBuilder()
             .addGrid(algo.maxDepth, [3, 6, 9])
             .addGrid(algo.maxBins, [32, 64])
             .build())

cv = CrossValidator(
    estimator=pipeline,
    estimatorParamMaps=paramGrid,
    evaluator=evaluator,
    numFolds=5
)


In [136]:
# Run cross validations.
cv_rfr = cv.fit(train)
rfr_cv_predictions = cv_rfr.transform(test)
evaluator.evaluate(rfr_cv_predictions)

5.561983447536957

In [137]:
import pandas as pd

params = [{p.name: v for p, v in m.items()} for m in cv_rfr.getEstimatorParamMaps()]
pd.DataFrame.from_dict([
    {cv_rfr.getEvaluator().getMetricName(): metric, **ps}
    for ps, metric in zip(params, cv_rfr.avgMetrics)
])

       rmse  maxDepth  maxBins
0  8.210325         3       32
1  8.231432         3       64
2  7.890387         6       32
3  8.061040         6       64
4  8.008037         9       32
5  8.196898         9       64
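`CrossValidator` keeps the parameter map with the best entry in `avgMetrics`. For RMSE, smaller is better, so it takes the minimum, which here is `maxDepth=6, maxBins=32`. The refit model is available as `cv_rfr.bestModel`.

The selection step in plain Python, using the numbers from the table (`best_params` is a hypothetical helper):

```python
# Cross-validation results for RandomForestRegressor, copied from the table above.
rf_results = [
    ({"maxDepth": 3, "maxBins": 32}, 8.210325),
    ({"maxDepth": 3, "maxBins": 64}, 8.231432),
    ({"maxDepth": 6, "maxBins": 32}, 7.890387),
    ({"maxDepth": 6, "maxBins": 64}, 8.061040),
    ({"maxDepth": 9, "maxBins": 32}, 8.008037),
    ({"maxDepth": 9, "maxBins": 64}, 8.196898),
]


def best_params(results, larger_is_better=False):
    """Return the (params, metric) pair with the best average metric."""
    pick = max if larger_is_better else min
    return pick(results, key=lambda pair: pair[1])


params, metric = best_params(rf_results)
print(params, metric)  # {'maxDepth': 6, 'maxBins': 32} 7.890387
```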


In [138]:
rfr_cv_predictions.select('Y', 'prediction').show()

+---+--------+---+--------+---------+----+--------------------+--------------------+------------------+
| X2|      X3| X4|      X5|       X6|   Y|            features|     features_scaled|        prediction|
+---+--------+---+--------+---------+----+--------------------+--------------------+------------------+
|0.0|185.4296|  0| 24.9711| 121.5317|37.9|[0.0,185.4296,0.0...|[0.0,0.0250666403...| 51.86789447503139|
|0.0|185.4296|  0| 24.9711| 121.5317|45.5|[0.0,185.4296,0.0...|[0.0,0.0250666403...| 51.86789447503139|
|0.0|185.4296|  0| 24.9711| 121.5317|52.2|[0.0,185.4296,0.0...|[0.0,0.0250666403...| 51.86789447503139|
|0.0|185.4296|  0| 24.9711| 121.5317|55.2|[0.0,185.4296,0.0...|[0.0,0.0250666403...| 51.86789447503139|
|0.0|185.4296|  0| 24.9711| 121.5317|55.3|[0.0,185.4296,0.0...|[0.0,0.0250666403...| 51.86789447503139|
|0.0|274.0144|  1| 24.9748|121.53059|45.4|[0.0,274.0144,1.0...|[0.0,0.0387696192...|47.805040178571424|
|1.1|193.5845|  6|24.96571|121.54089|45.1|[1.1,193.5845,6.0...|[

### GBTRegressor
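`GBTRegressor` builds its trees sequentially. Each new tree is fit to the residuals of the ensemble so far (for squared loss, the residuals are the negative gradient). Its output is then added with a weight given by `stepSize` (default 0.1), and `maxIter` (default 20) controls the number of trees.

The sketch below shows that loop in its simplest generic form, with one feature and depth-1 stumps. The function names are hypothetical, and Spark's implementation differs in details such as how the first tree is initialized.

```python
def fit_stump(xs, residuals):
    """Fit a one-split regression stump minimizing squared error on one feature.
    Requires at least two distinct x values."""
    best = None
    for threshold in sorted(set(xs))[:-1]:
        left = [r for x, r in zip(xs, residuals) if x <= threshold]
        right = [r for x, r in zip(xs, residuals) if x > threshold]
        left_mean = sum(left) / len(left)
        right_mean = sum(right) / len(right)
        sse = (sum((r - left_mean) ** 2 for r in left)
               + sum((r - right_mean) ** 2 for r in right))
        if best is None or sse < best[0]:
            best = (sse, threshold, left_mean, right_mean)
    _, threshold, left_mean, right_mean = best
    return lambda x: left_mean if x <= threshold else right_mean


def gradient_boost(xs, ys, rounds=20, step_size=0.1):
    """Squared-loss gradient boosting: start from the mean label, then repeatedly
    fit a stump to the current residuals and add it scaled by step_size."""
    base = sum(ys) / len(ys)
    stumps = []

    def predict(x):
        return base + sum(step_size * stump(x) for stump in stumps)

    for _ in range(rounds):
        residuals = [y - predict(x) for x, y in zip(xs, ys)]
        stumps.append(fit_stump(xs, residuals))
    return predict


xs = [1, 2, 3, 4, 5, 6]
ys = [1.0, 1.0, 1.0, 5.0, 5.0, 5.0]
model = gradient_boost(xs, ys, rounds=50)
print([round(model(x), 2) for x in xs])
```

Because each tree corrects the ones before it, deep trees can fit noise quickly. That is consistent with the higher cross-validation RMSE for larger `maxDepth` in the results table below.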

In [139]:
from pyspark.ml.regression import GBTRegressor

algo = GBTRegressor(featuresCol="features_scaled", labelCol="Y")
gbt_stages = stages + [algo]

In [140]:
from pyspark.ml import Pipeline
pipeline = Pipeline(stages=gbt_stages)

In [141]:
algo.extractParamMap()

{Param(parent='GBTRegressor_921db267aa12', name='seed', doc='random seed.'): 1404129486762545858,
 Param(parent='GBTRegressor_921db267aa12', name='maxDepth', doc='Maximum depth of the tree. (>= 0) E.g., depth 0 means 1 leaf node; depth 1 means 1 internal node + 2 leaf nodes.'): 5,
 Param(parent='GBTRegressor_921db267aa12', name='maxBins', doc='Max number of bins for discretizing continuous features.  Must be >=2 and >= number of categories for any categorical feature.'): 32,
 Param(parent='GBTRegressor_921db267aa12', name='minInstancesPerNode', doc='Minimum number of instances each child must have after split. If a split causes the left or right child to have fewer than minInstancesPerNode, the split will be discarded as invalid. Should be >= 1.'): 1,
 Param(parent='GBTRegressor_921db267aa12', name='minInfoGain', doc='Minimum information gain for a split to be considered at a tree node.'): 0.0,
 Param(parent='GBTRegressor_921db267aa12', name='maxMemoryInMB', doc='Maximum memory in MB a

In [142]:
from pyspark.ml.tuning import ParamGridBuilder, CrossValidator

paramGrid = (ParamGridBuilder()
             .addGrid(algo.maxDepth, [3, 6, 9])
             .addGrid(algo.maxBins, [32, 64])
             .addGrid(algo.maxIter, [10, 20])
             .build())

cv = CrossValidator(
    estimator=pipeline,
    estimatorParamMaps=paramGrid,
    evaluator=evaluator,
    numFolds=5
)
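`ParamGridBuilder` takes the Cartesian product of the values passed to each `addGrid` call. This grid therefore holds 3 × 2 × 2 = 12 parameter maps, compared with 3 for the LinearRegression grid and 6 for the RandomForest grid. `CrossValidator` fits one model per map per fold, plus one final refit of the best map on the whole training set.

The plain-Python sketch below reproduces only that arithmetic with `itertools.product`. It does not call Spark.

```python
from itertools import product

# The GBT grid from the cell above: every combination of the listed values.
grid = {
    "maxDepth": [3, 6, 9],
    "maxBins": [32, 64],
    "maxIter": [10, 20],
}
num_folds = 5

param_maps = [dict(zip(grid, values)) for values in product(*grid.values())]
fits_during_cv = len(param_maps) * num_folds  # one fit per (param map, fold)
print(len(param_maps), fits_during_cv)  # 12 60
```

The results table below follows the same nesting order, with `maxIter` varying fastest, and shows only the first 10 of the 12 rows.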


In [143]:
# Run cross validations.
cv_gbt = cv.fit(train)
gbt_cv_predictions = cv_gbt.transform(test)
evaluator.evaluate(gbt_cv_predictions)

7.114056231385392

In [144]:
import pandas as pd

params = [{p.name: v for p, v in m.items()} for m in cv_gbt.getEstimatorParamMaps()]
pd.DataFrame.from_dict([
    {cv_gbt.getEvaluator().getMetricName(): metric, **ps}
    for ps, metric in zip(params, cv_gbt.avgMetrics)
])

        rmse  maxDepth  maxBins  maxIter
0   8.612472         3       32       10
1   8.675049         3       32       20
2   8.756168         3       64       10
3   8.706528         3       64       20
4   9.600942         6       32       10
5   9.655894         6       32       20
6   9.739656         6       64       10
7   9.828870         6       64       20
8  10.134659         9       32       10
9  10.118332         9       32       20


In [145]:
gbt_cv_predictions.select('Y', 'prediction').show()

+----+------------------+
|   Y|        prediction|
+----+------------------+
|37.9|46.267781551556205|
|45.5|46.267781551556205|
|52.2|46.267781551556205|
|55.2|46.267781551556205|
|55.3|46.267781551556205|
|45.4| 50.57533917126201|
|45.1| 49.38365942606742|
|49.0| 49.38365942606742|
|27.0| 27.52025351932325|
|33.4|27.725508633772044|
|45.4| 49.38365942606742|
|36.9| 48.46810983826049|
|55.0| 53.84624341010155|
|31.1| 27.12162313858227|
|47.7| 50.10847434238368|
|36.7| 53.84624341010155|
|28.6|27.725508633772044|
|62.1| 49.97082741349973|
|51.7| 45.52483548534321|
|45.2| 53.32731785158679|
+----+------------------+
only showing top 20 rows



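The `RandomForest` model performed best. `RandomForestRegressor` gave the lowest test RMSE (about 5.56), ahead of `GBTRegressor` (about 7.11) and `LinearRegression` (about 7.76). The train/test split is random and unseeded, so the exact numbers, and possibly the ranking, may change between runs.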