## Import Modules

In [1]:
from pyspark.sql import SparkSession
import pyspark.pandas as ps
import numpy as np

from pyspark.sql.functions import col, isnull, count, when

## Set Spark Session

In [2]:
spark = SparkSession.builder.getOrCreate()

In [3]:
prev = spark.conf.get('spark.sql.execution.arrow.pyspark.enabled')
spark.conf.set('spark.sql.execution.arrow.pyspark.enabled', True)
ps.set_option('compute.default_index_type', 'distributed')

# Set its default value back.
#ps.reset_option("compute.default_index_type")
#spark.conf.set("spark.sql.execution.arrow.pyspark.enabled", prev)  

## Load Data

In [4]:
# load data
sdf = spark.read.csv("ParisHousing.csv", inferSchema=True, header=True)
# SQL
sdf.createOrReplaceTempView('dfTable')

                                                                                

## Inspect Data

In [5]:
sdf.printSchema()

root
 |-- squareMeters: integer (nullable = true)
 |-- numberOfRooms: integer (nullable = true)
 |-- hasYard: integer (nullable = true)
 |-- hasPool: integer (nullable = true)
 |-- floors: integer (nullable = true)
 |-- cityCode: integer (nullable = true)
 |-- cityPartRange: integer (nullable = true)
 |-- numPrevOwners: integer (nullable = true)
 |-- made: integer (nullable = true)
 |-- isNewBuilt: integer (nullable = true)
 |-- hasStormProtector: integer (nullable = true)
 |-- basement: integer (nullable = true)
 |-- attic: integer (nullable = true)
 |-- garage: integer (nullable = true)
 |-- hasStorageRoom: integer (nullable = true)
 |-- hasGuestRoom: integer (nullable = true)
 |-- price: double (nullable = true)



In [6]:
sdf.show(3)

+------------+-------------+-------+-------+------+--------+-------------+-------------+----+----------+-----------------+--------+-----+------+--------------+------------+---------+
|squareMeters|numberOfRooms|hasYard|hasPool|floors|cityCode|cityPartRange|numPrevOwners|made|isNewBuilt|hasStormProtector|basement|attic|garage|hasStorageRoom|hasGuestRoom|    price|
+------------+-------------+-------+-------+------+--------+-------------+-------------+----+----------+-----------------+--------+-----+------+--------------+------------+---------+
|       75523|            3|      0|      1|    63|    9373|            3|            8|2005|         0|                1|    4313| 9005|   956|             0|           7|7559081.5|
|       80771|           39|      1|      1|    98|   39381|            8|            6|2015|         1|                0|    3653| 2436|   128|             1|           2|8085989.5|
|       55712|           58|      0|      1|    19|   34457|            6|           

In [7]:
print(f'sdf shape: ({sdf.count()},{len(sdf.columns)})')

sdf shape: (10000,17)


In [8]:
for i in sdf.columns:
    print('{:18}: '.format(i), sdf.select(i).distinct().count(), '\t', [row[i] for row in sdf.select(i).distinct().collect()[:5]])

                                                                                

squareMeters      :  9483 	 [67753, 36131, 11458, 92188, 34061]
numberOfRooms     :  100 	 [31, 85, 65, 53, 78]
hasYard           :  2 	 [1, 0]
hasPool           :  2 	 [1, 0]
floors            :  100 	 [31, 85, 65, 53, 78]
cityCode          :  9509 	 [89878, 16503, 55155, 2142, 73305]
cityPartRange     :  10 	 [1, 6, 3, 5, 9]
numPrevOwners     :  10 	 [1, 6, 3, 5, 9]
made              :  32 	 [1990, 2003, 2007, 2018, 2015]
isNewBuilt        :  2 	 [1, 0]
hasStormProtector :  2 	 [1, 0]
basement          :  6352 	 [2142, 1238, 4519, 2366, 6620]
attic             :  6267 	 [7880, 6466, 6397, 7554, 4101]
garage            :  901 	 [463, 148, 496, 833, 471]
hasStorageRoom    :  2 	 [1, 0]
hasGuestRoom      :  11 	 [1, 6, 3, 5, 9]
price             :  10000 	 [9944705.3, 149234.3, 9626665.8, 2870499.1, 4851198.2]


In [9]:
sdf.select('squareMeters', 'numberOfRooms', 'hasYard', 'hasPool', 'floors', 'cityCode', 'cityPartRange', 'numPrevOwners', 'made', 'isNewBuilt', 'hasStormProtector', 'basement', 'attic', 'garage', 'hasStorageRoom', 'hasGuestRoom', 'price').describe().to_pandas_on_spark()

                                                                                

Unnamed: 0,summary,squareMeters,numberOfRooms,hasYard,hasPool,floors,cityCode,cityPartRange,numPrevOwners,made,isNewBuilt,hasStormProtector,basement,attic,garage,hasStorageRoom,hasGuestRoom,price
0,count,10000.0,10000.0,10000.0,10000.0,10000.0,10000.0,10000.0,10000.0,10000.0,10000.0,10000.0,10000.0,10000.0,10000.0,10000.0,10000.0,10000.0
1,mean,49870.1312,50.3584,0.5087,0.4968,50.2763,50225.4861,5.5101,5.5217,2005.4885,0.4991,0.4999,5033.1039,5028.0106,553.1212,0.503,4.9946,4993447.525749963
2,stddev,28774.37535029503,28.81669636927458,0.4999493023602426,0.5000147612582521,28.88917127111252,29006.675799293174,2.87202417160515,2.856666792700276,9.308089589340009,0.5000241918339955,0.5000249918746555,2876.7295448116365,2894.3322098165813,262.0501698906411,0.5000160013441162,3.1764098913678978,2877424.109945015
3,min,89.0,1.0,0.0,0.0,1.0,3.0,1.0,1.0,1990.0,0.0,0.0,0.0,1.0,100.0,0.0,0.0,10313.5
4,max,99999.0,100.0,1.0,1.0,100.0,99953.0,10.0,10.0,2021.0,1.0,1.0,10000.0,10000.0,1000.0,1.0,10.0,10006771.2


In [10]:
quantile = sdf.approxQuantile(sdf.columns, [0.25, 0.5, 0.75], 0)
for i in range(len(sdf.columns)):
    print(f'{sdf.columns[i]}: \n quantile_25={quantile[i][0]} quantile_50={quantile[i][1]} quantile_75={quantile[i][2]}')

                                                                                

squareMeters: 
 quantile_25=25079.0 quantile_50=50102.0 quantile_75=74606.0
numberOfRooms: 
 quantile_25=25.0 quantile_50=50.0 quantile_75=75.0
hasYard: 
 quantile_25=0.0 quantile_50=1.0 quantile_75=1.0
hasPool: 
 quantile_25=0.0 quantile_50=0.0 quantile_75=1.0
floors: 
 quantile_25=25.0 quantile_50=50.0 quantile_75=76.0
cityCode: 
 quantile_25=24666.0 quantile_50=50693.0 quantile_75=75681.0
cityPartRange: 
 quantile_25=3.0 quantile_50=5.0 quantile_75=8.0
numPrevOwners: 
 quantile_25=3.0 quantile_50=5.0 quantile_75=8.0
made: 
 quantile_25=1997.0 quantile_50=2005.0 quantile_75=2014.0
isNewBuilt: 
 quantile_25=0.0 quantile_50=0.0 quantile_75=1.0
hasStormProtector: 
 quantile_25=0.0 quantile_50=0.0 quantile_75=1.0
basement: 
 quantile_25=2559.0 quantile_50=5092.0 quantile_75=7511.0
attic: 
 quantile_25=2512.0 quantile_50=5044.0 quantile_75=7540.0
garage: 
 quantile_25=327.0 quantile_50=554.0 quantile_75=777.0
hasStorageRoom: 
 quantile_25=0.0 quantile_50=1.0 quantile_75=1.0
hasGuestRoom: 

In [11]:
# correlation 相關性
#from pyspark.ml.stat import Correlation
from pyspark.mllib.stat import Statistics

sdf_features = sdf.select('squareMeters', 'numberOfRooms', 'hasYard', 'hasPool', 'floors', 'cityCode', 'cityPartRange', 'numPrevOwners', 'made', 'isNewBuilt', 'hasStormProtector', 'basement', 'attic', 'garage', 'hasStorageRoom', 'hasGuestRoom')
rdd_table = sdf_features.rdd.map(lambda row: row[0:])
corr_matrix = Statistics.corr(rdd_table, method='pearson')
print(corr_matrix)

Traceback (most recent call last):                                              
  File "/usr/local/spark/python/lib/pyspark.zip/pyspark/daemon.py", line 186, in manager
  File "/usr/local/spark/python/lib/pyspark.zip/pyspark/daemon.py", line 74, in worker
  File "/usr/local/spark/python/lib/pyspark.zip/pyspark/worker.py", line 663, in main
    if read_int(infile) == SpecialLengths.END_OF_STREAM:
  File "/usr/local/spark/python/lib/pyspark.zip/pyspark/serializers.py", line 564, in read_int
    raise EOFError
EOFError
22/05/04 16:47:05 WARN InstanceBuilder$NativeBLAS: Failed to load implementation from:dev.ludovic.netlib.blas.JNIBLAS
22/05/04 16:47:05 WARN InstanceBuilder$NativeBLAS: Failed to load implementation from:dev.ludovic.netlib.blas.ForeignLinkerBLAS
                                                                                

[[ 1.00000000e+00  9.57277600e-03 -6.64985476e-03 -5.59433537e-03
   1.10931387e-03 -1.54055371e-03  8.75822010e-03  1.66186661e-02
  -7.20714802e-03 -1.06671233e-02  7.48003701e-03 -3.96028379e-03
  -5.88037866e-04 -1.72459023e-02 -3.48628971e-03 -6.22953798e-04]
 [ 9.57277600e-03  1.00000000e+00 -1.12400597e-02  1.70154244e-02
   2.22442495e-02  9.03965019e-03  8.34014538e-03  1.67658622e-02
   3.97843162e-03 -2.86497594e-03 -1.65635599e-03 -1.39900157e-02
   1.20606364e-02  2.31877803e-02 -4.75973114e-03 -1.55287913e-02]
 [-6.64985476e-03 -1.12400597e-02  1.00000000e+00  1.55140264e-02
  -8.83125295e-04  6.75990352e-03  5.02334401e-03  4.27942172e-03
   2.21358520e-03 -8.36996070e-03 -7.59767037e-03 -8.55807452e-03
  -3.08491974e-03 -4.62565237e-03 -9.50601021e-03 -7.27572793e-03]
 [-5.59433537e-03  1.70154244e-02  1.55140264e-02  1.00000000e+00
  -4.00633983e-03  8.07198022e-03  1.46125504e-02 -6.84801560e-03
   1.89379810e-03  1.88484166e-04 -1.00130053e-03 -7.26773521e-03
  -1.19

In [None]:
#plottong.plt(sdf.select('squareMeters'))

## Missing data

In [12]:
# missing spark_dataframe
sdf.select([count(when(isnull(c), c)).alias(c) for c in sdf.columns]).to_pandas_on_spark().sum()

squareMeters         0
numberOfRooms        0
hasYard              0
hasPool              0
floors               0
cityCode             0
cityPartRange        0
numPrevOwners        0
made                 0
isNewBuilt           0
hasStormProtector    0
basement             0
attic                0
garage               0
hasStorageRoom       0
hasGuestRoom         0
price                0
dtype: int64

In [13]:
for c in sdf.columns:
    print(c, sdf.where(col(c).isNull()).count())

squareMeters 0
numberOfRooms 0
hasYard 0
hasPool 0
floors 0
cityCode 0
cityPartRange 0
numPrevOwners 0
made 0
isNewBuilt 0
hasStormProtector 0
basement 0
attic 0
garage 0
hasStorageRoom 0
hasGuestRoom 0
price 0


In [14]:
for c in sdf.columns:
    print(c, sdf.filter(col(c).isNull()).count())

squareMeters 0
numberOfRooms 0
hasYard 0
hasPool 0
floors 0
cityCode 0
cityPartRange 0
numPrevOwners 0
made 0
isNewBuilt 0
hasStormProtector 0
basement 0
attic 0
garage 0
hasStorageRoom 0
hasGuestRoom 0
price 0


In [15]:
# missing ps_dataframe
psdf = sdf.to_pandas_on_spark()
psdf.isnull().sum()

squareMeters         0
numberOfRooms        0
hasYard              0
hasPool              0
floors               0
cityCode             0
cityPartRange        0
numPrevOwners        0
made                 0
isNewBuilt           0
hasStormProtector    0
basement             0
attic                0
garage               0
hasStorageRoom       0
hasGuestRoom         0
price                0
dtype: int64

## Model Prepare

In [16]:
from pyspark.ml.linalg import Vectors
from pyspark.ml.feature import VectorAssembler

In [17]:
from pyspark.ml.tuning import ParamGridBuilder, CrossValidator
from pyspark.ml.evaluation import RegressionEvaluator

In [18]:
features = list(sdf.columns[:-1])
print(features)

['squareMeters', 'numberOfRooms', 'hasYard', 'hasPool', 'floors', 'cityCode', 'cityPartRange', 'numPrevOwners', 'made', 'isNewBuilt', 'hasStormProtector', 'basement', 'attic', 'garage', 'hasStorageRoom', 'hasGuestRoom']


In [19]:
assembler = VectorAssembler(inputCols=features, 
                            outputCol='features', handleInvalid="keep")

In [20]:
output = assembler.transform(sdf)
output.show(3)

22/05/04 16:47:30 WARN package: Truncated the string representation of a plan since it was too large. This behavior can be adjusted by setting 'spark.sql.debug.maxToStringFields'.


+------------+-------------+-------+-------+------+--------+-------------+-------------+----+----------+-----------------+--------+-----+------+--------------+------------+---------+--------------------+
|squareMeters|numberOfRooms|hasYard|hasPool|floors|cityCode|cityPartRange|numPrevOwners|made|isNewBuilt|hasStormProtector|basement|attic|garage|hasStorageRoom|hasGuestRoom|    price|            features|
+------------+-------------+-------+-------+------+--------+-------------+-------------+----+----------+-----------------+--------+-----+------+--------------+------------+---------+--------------------+
|       75523|            3|      0|      1|    63|    9373|            3|            8|2005|         0|                1|    4313| 9005|   956|             0|           7|7559081.5|[75523.0,3.0,0.0,...|
|       80771|           39|      1|      1|    98|   39381|            8|            6|2015|         1|                0|    3653| 2436|   128|             1|           2|8085989.5|[8

In [21]:
final_df = output.selectExpr('features', 'price as label')
final_df.show(3)

+--------------------+---------+
|            features|    label|
+--------------------+---------+
|[75523.0,3.0,0.0,...|7559081.5|
|[80771.0,39.0,1.0...|8085989.5|
|[55712.0,58.0,0.0...|5574642.1|
+--------------------+---------+
only showing top 3 rows



In [22]:
train_data, test_data = final_df.randomSplit([0.7, 0.3], seed=42)

In [23]:
train_data.describe().to_pandas_on_spark()

                                                                                

Unnamed: 0,summary,label
0,count,7104.0
1,mean,4966910.833741547
2,stddev,2886153.040712149
3,min,10313.5
4,max,10006771.2


In [24]:
test_data.describe().to_pandas_on_spark()

Unnamed: 0,summary,label
0,count,2896.0
1,mean,5058543.057527635
2,stddev,2855350.286167349
3,min,13229.1
4,max,9998411.0


## 1. Linear Regression

In [25]:
from pyspark.ml import pipeline
from pyspark.ml.regression import LinearRegression

In [26]:
lr = LinearRegression()

In [27]:
lr_model = lr.fit(train_data)

22/05/04 16:47:39 WARN Instrumentation: [94391890] regParam is zero, which might cause numerical instability and overfitting.
22/05/04 16:47:40 WARN InstanceBuilder$NativeLAPACK: Failed to load implementation from:dev.ludovic.netlib.lapack.JNILAPACK
                                                                                

In [28]:
unpre_data = test_data.select('features')
prediction = lr_model.transform(unpre_data)
prediction.show(3)

+--------------------+------------------+
|            features|        prediction|
+--------------------+------------------+
|[123.0,61.0,0.0,0...|13194.961464998556|
|[143.0,27.0,0.0,0...| 17812.20253481645|
|[149.0,28.0,1.0,1...|23825.246280130203|
+--------------------+------------------+
only showing top 3 rows



In [29]:
predict = lr_model.transform(test_data)
predict.show(3)

+--------------------+-------+------------------+
|            features|  label|        prediction|
+--------------------+-------+------------------+
|[123.0,61.0,0.0,0...|13229.1|13194.961464998556|
|[143.0,27.0,0.0,0...|17071.0| 17812.20253481645|
|[149.0,28.0,1.0,1...|23653.1|23825.246280130203|
+--------------------+-------+------------------+
only showing top 3 rows



## Evaluate Linear Regression Model

In [30]:
# 回歸係數
import pandas as pd
pd.DataFrame({'Coefficients':lr_model.coefficients}, index=sdf.columns[:-1])

Unnamed: 0,Coefficients
squareMeters,100.000384
numberOfRooms,0.641165
hasYard,3006.583988
hasPool,2961.059771
floors,54.592653
cityCode,-0.000642
cityPartRange,45.508321
numPrevOwners,1.551113
made,-1.484274
isNewBuilt,119.845654


In [31]:
# 評估誤差-1
res = lr_model.evaluate(test_data)

In [32]:
# 殘差
res.residuals.show()



+-------------------+
|          residuals|
+-------------------+
| 34.138535001444325|
| -741.2025348164498|
| -172.1462801302041|
| -674.1307109770787|
|  67.72837277026338|
| -2661.198823280123|
|-1191.5557354118791|
|-355.17211949058037|
|  748.8388887156252|
|-1903.2873417641022|
|-1330.3612016867992|
| 1253.3220612419973|
|  -1446.12624930944|
| -402.1927624577984|
|  39.82893388398952|
| -646.8377220557595|
| -2808.118655675302|
| -1033.509804676105|
| 1488.7461616748915|
|-280.78234800041537|
+-------------------+
only showing top 20 rows



In [33]:
print('RAE: ', res.meanAbsoluteError)
print('MSE: ', res.meanSquaredError)
print('RMSE: ', res.rootMeanSquaredError)
print('r2: ', res.r2)
print('r2adj: ', res.r2adj)

RAE:  1494.763464072096
MSE:  3694918.1953876577
RMSE:  1922.2170000776857
r2:  0.9999995466474849
r2adj:  0.999999544127985


In [34]:
# 評估誤差-2
evaluatorRMSE = RegressionEvaluator().setLabelCol('label').setPredictionCol('prediction').setMetricName('rmse')
lr_RMSE_test = evaluatorRMSE.evaluate(predict)
print(lr_RMSE_test)

1922.2170000776857


In [35]:
evaluatorR2 = RegressionEvaluator().setLabelCol('label').setPredictionCol('prediction').setMetricName('r2')
lr_R2 = evaluatorR2.evaluate(predict)
lr_R2

0.9999995466474849

## 2. Random Forest

In [36]:
from pyspark.ml.regression import RandomForestRegressor

In [37]:
rf = RandomForestRegressor()

In [38]:
rf_model = rf.fit(train_data)

                                                                                

In [39]:
rf_prediction = rf_model.transform(test_data)
rf_prediction.show(3)

+--------------------+-------+------------------+
|            features|  label|        prediction|
+--------------------+-------+------------------+
|[123.0,61.0,0.0,0...|13229.1|1448624.5544290647|
|[143.0,27.0,0.0,0...|17071.0|1602939.1396955396|
|[149.0,28.0,1.0,1...|23653.1|1405611.2925890624|
+--------------------+-------+------------------+
only showing top 3 rows



## Evaluate Random Forest Model

In [40]:
rf_RMSE = evaluatorRMSE.evaluate(predict)
print('rf_RMSE:', rf_RMSE)

rf_RMSE: 1922.2170000776857


In [41]:
rf_R2 = evaluatorR2.evaluate(predict)
print('rf_R2:', rf_R2)

rf_R2: 0.9999995466474849


## Hyperparameter Tuning

In [43]:
paramGrid = (ParamGridBuilder()
            .addGrid(rf.maxDepth, [10,30])
            .addGrid(rf.maxBins, [10,15])
            .addGrid(rf.numTrees, [8])
            .build())

In [45]:
evaluator = RegressionEvaluator()

In [46]:
cv = CrossValidator(estimator=rf, estimatorParamMaps=paramGrid,
                   evaluator=evaluator, numFolds=7)

In [47]:
cv_model = cv.fit(train_data)

22/05/04 16:52:56 WARN DAGScheduler: Broadcasting large task binary with size 1168.0 KiB
22/05/04 16:53:01 WARN DAGScheduler: Broadcasting large task binary with size 1109.2 KiB
22/05/04 16:53:06 WARN DAGScheduler: Broadcasting large task binary with size 1168.0 KiB
22/05/04 16:53:07 WARN DAGScheduler: Broadcasting large task binary with size 1874.7 KiB
22/05/04 16:53:08 WARN DAGScheduler: Broadcasting large task binary with size 2.7 MiB
22/05/04 16:53:10 WARN DAGScheduler: Broadcasting large task binary with size 3.8 MiB
22/05/04 16:53:12 WARN DAGScheduler: Broadcasting large task binary with size 4.9 MiB
22/05/04 16:53:14 WARN DAGScheduler: Broadcasting large task binary with size 5.8 MiB
22/05/04 16:53:16 WARN DAGScheduler: Broadcasting large task binary with size 6.6 MiB
22/05/04 16:53:18 WARN DAGScheduler: Broadcasting large task binary with size 7.2 MiB
22/05/04 16:53:19 WARN DAGScheduler: Broadcasting large task binary with size 7.6 MiB
22/05/04 16:53:21 WARN DAGScheduler: Broad

In [48]:
best_rf_model = cv_model.bestModel

In [49]:
f'Best Param(maxBins): {best_rf_model._java_obj.getMaxBins()}'

'Best Param(maxBins): 15'

## Evaluate Tuned Model

In [50]:
best_predict = best_rf_model.transform(test_data)

In [51]:
best_rf_RMSE = evaluatorRMSE.evaluate(best_predict)
print('best_rf_RMSE:', best_rf_RMSE)

best_rf_RMSE: 261558.49694467278


In [52]:
best_rf_R2 = evaluatorR2.evaluate(best_predict)
print('best_rf_R2:', best_rf_R2)

best_rf_R2: 0.9916060018766223


In [53]:
best_rf_model.featureImportances

SparseVector(16, {0: 0.9774, 1: 0.0024, 2: 0.0004, 3: 0.0005, 4: 0.0021, 5: 0.0019, 6: 0.0018, 7: 0.0018, 8: 0.0023, 9: 0.0004, 10: 0.0004, 11: 0.0021, 12: 0.0023, 13: 0.0019, 14: 0.0006, 15: 0.0017})

In [88]:
from pyspark.sql.types import *
def ExtractFeatureImp(featureImp, dataset, featuresCol):
    list_extract = []
    for i in dataset.schema[featuresCol].metadata['ml_attr']['attrs']:
        list_extract = list_extract + dataset.schema[featuresCol].metadata['ml_attr']['attrs'][i]
    varlist = ps.DataFrame(list_extract)
    varlist['score'] = varlist['idx'].apply(lambda x: featureImp[x])
    return varlist.sort_values('score', ascending = False)

In [89]:
ExtractFeatureImp(best_rf_model.featureImportances, final_df, 'features')

                                                                                

Unnamed: 0,idx,name,score
0,0,squareMeters,0.977401
1,1,numberOfRooms,0.002428
12,12,attic,0.002268
8,8,made,0.002259
11,11,basement,0.002145
4,4,floors,0.002051
13,13,garage,0.001921
5,5,cityCode,0.001916
6,6,cityPartRange,0.001805
7,7,numPrevOwners,0.00175


In [67]:
data = [0, (best_rf_model.featureImportances)]
data

[0,
 SparseVector(16, {0: 0.9774, 1: 0.0024, 2: 0.0004, 3: 0.0005, 4: 0.0021, 5: 0.0019, 6: 0.0018, 7: 0.0018, 8: 0.0023, 9: 0.0004, 10: 0.0004, 11: 0.0021, 12: 0.0023, 13: 0.0019, 14: 0.0006, 15: 0.0017})]

In [77]:
['features']

['features']