Lecturer: `Sirasit Lochanachit`

Course: `06026213 Big Data Systems`

Term: `02/2024`

Exercise prepared by `Duangkamon Phobsungnoen (TA)`

## **Exercise 1 : Weather Analysis by Country**

ให้ใช้ข้อมูลสภาพอาการจาก kaggle เพื่อวิเคราะห์การเปลี่ยนแปลงสภาพอากาศในสถานที่ต่าง ๆ โดยใช้การแปลงข้อมูลใน PySpark เพื่อเตรียมข้อมูลสำหรับโมเดล Machine Learning โดยเราจะใช้ air_quality_PM2.5, air_quality_PM10, และ location_name ในการแบ่งกลุ่มสถานที่ตามคุณภาพอากาศ

**STEP 1 : ใช้ kaggle ในการดึงข้อมูล Weather dataset**

- 1.1 ติดตั้ง kaggle

In [None]:
!pip install kaggle



- 1.2 ใช้ kagglehub.dataset_download ในการดาวน์โหลดข้อมูลจาก Kaggle

In [None]:
# ตั้งค่าคีย์ API
import os
os.makedirs('/root/.kaggle', exist_ok=True)

# ดาวน์โหลด dataset
!kaggle datasets download -d nelgiriyewithana/global-weather-repository

# แตกไฟล์ zip
!unzip global-weather-repository.zip -d /content/


Dataset URL: https://www.kaggle.com/datasets/nelgiriyewithana/global-weather-repository
License(s): other
Downloading global-weather-repository.zip to /content
 76% 3.00M/3.96M [00:00<00:00, 5.47MB/s]
100% 3.96M/3.96M [00:00<00:00, 5.81MB/s]
Archive:  global-weather-repository.zip
  inflating: /content/GlobalWeatherRepository.csv  
  inflating: /content/state.db       


**STEP 2 : ตั้งค่า Spark Session และโหลดข้อมูล**

In [None]:
from pyspark.sql import SparkSession

spark = SparkSession.builder.appName("GlobalWeather").getOrCreate()

data = spark.read.csv("/content/GlobalWeatherRepository.csv", header=True, inferSchema=True)

data.show(5)

+-----------+----------------+--------+---------+--------------+------------------+-------------------+-------------------+----------------------+--------------+--------+--------+-----------+--------------+-----------+-----------+---------+---------+--------+-----+------------------+---------------------+-------------+----------------+--------+--------+--------+---------------------------+-----------------+----------------------------+---------------------------+-----------------+----------------+------------------------+--------------------------+--------+--------+--------+--------+--------------+-----------------+
|    country|   location_name|latitude|longitude|      timezone|last_updated_epoch|       last_updated|temperature_celsius|temperature_fahrenheit|condition_text|wind_mph|wind_kph|wind_degree|wind_direction|pressure_mb|pressure_in|precip_mm|precip_in|humidity|cloud|feels_like_celsius|feels_like_fahrenheit|visibility_km|visibility_miles|uv_index|gust_mph|gust_kph|air_quality_

**STEP 3 : ใช้ StringIndexer เพื่อแปลง location_name เป็นตัวเลข**

เนื่องจาก location_name เป็นข้อมูลประเภทหมวดหมู่ (Categorical Data) เราจึงใช้ StringIndexer เพื่อแปลงให้เป็นค่าตัวเลขที่สามารถใช้ในโมเดลได้

In [None]:
from pyspark.ml.feature import StringIndexer

indexer_globalWeather = StringIndexer(inputCol='location_name', outputCol='location_index', stringOrderType='frequencyDesc')
indexer_globalWeather

StringIndexer_c04bd29fa0eb

In [None]:
data_indexed = indexer_globalWeather.fit(data).transform(data)
data_indexed.show()

+-------------------+----------------+--------+---------+--------------------+------------------+-------------------+-------------------+----------------------+--------------------+--------+--------+-----------+--------------+-----------+-----------+---------+---------+--------+-----+------------------+---------------------+-------------+----------------+--------+--------+--------+---------------------------+-----------------+----------------------------+---------------------------+-----------------+----------------+------------------------+--------------------------+--------+--------+--------+----------+--------------+-----------------+--------------+
|            country|   location_name|latitude|longitude|            timezone|last_updated_epoch|       last_updated|temperature_celsius|temperature_fahrenheit|      condition_text|wind_mph|wind_kph|wind_degree|wind_direction|pressure_mb|pressure_in|precip_mm|precip_in|humidity|cloud|feels_like_celsius|feels_like_fahrenheit|visibility_km

**STEP 4 : การใช้ Imputer เพื่อเติมค่าที่ขาดหาย**

ในบางกรณี อาจมีข้อมูลที่ขาดหาย เช่น ค่าของ air_quality_PM2.5 หรือ air_quality_PM10 ซึ่งเราสามารถใช้ Imputer เพื่อเติมข้อมูลที่ขาดหายไป

เพิ่มเติม: ใช้ withColumnRenamed() เพื่อเปลี่ยนชื่อคอลัมน์ air_quality_PM2.5 เป็น air_quality_PM2_5 หากไม่เปลี่ยนอาจจะเจอปัญหาเรื่อง (.)

[Imputer Doc](https://spark.apache.org/docs/latest/api/python/reference/api/pyspark.ml.feature.Imputer.html)

In [None]:
data_indexed = data_indexed.withColumnRenamed('air_quality_PM2.5', 'air_quality_PM2_5')
data_indexed.columns

['country',
 'location_name',
 'latitude',
 'longitude',
 'timezone',
 'last_updated_epoch',
 'last_updated',
 'temperature_celsius',
 'temperature_fahrenheit',
 'condition_text',
 'wind_mph',
 'wind_kph',
 'wind_degree',
 'wind_direction',
 'pressure_mb',
 'pressure_in',
 'precip_mm',
 'precip_in',
 'humidity',
 'cloud',
 'feels_like_celsius',
 'feels_like_fahrenheit',
 'visibility_km',
 'visibility_miles',
 'uv_index',
 'gust_mph',
 'gust_kph',
 'air_quality_Carbon_Monoxide',
 'air_quality_Ozone',
 'air_quality_Nitrogen_dioxide',
 'air_quality_Sulphur_dioxide',
 'air_quality_PM2_5',
 'air_quality_PM10',
 'air_quality_us-epa-index',
 'air_quality_gb-defra-index',
 'sunrise',
 'sunset',
 'moonrise',
 'moonset',
 'moon_phase',
 'moon_illumination',
 'location_index']

In [None]:
from pyspark.ml.feature import Imputer
imputer = Imputer()   # default Strategy = 'mean'
imputer.setInputCols(['air_quality_PM2_5', 'air_quality_PM10'])
imputer.setOutputCols(['pm2_5_imputed', 'pm10_imputed'])
data_imputed = imputer.fit(data_indexed).transform(data_indexed)
data_imputed.show(3)

+-----------+-------------+--------+---------+--------------+------------------+-------------------+-------------------+----------------------+--------------+--------+--------+-----------+--------------+-----------+-----------+---------+---------+--------+-----+------------------+---------------------+-------------+----------------+--------+--------+--------+---------------------------+-----------------+----------------------------+---------------------------+-----------------+----------------+------------------------+--------------------------+--------+--------+--------+--------+--------------+-----------------+--------------+-------------+------------+
|    country|location_name|latitude|longitude|      timezone|last_updated_epoch|       last_updated|temperature_celsius|temperature_fahrenheit|condition_text|wind_mph|wind_kph|wind_degree|wind_direction|pressure_mb|pressure_in|precip_mm|precip_in|humidity|cloud|feels_like_celsius|feels_like_fahrenheit|visibility_km|visibility_miles|uv_

**STEP 5 : ใช้ VectorAssembler เพื่อรวมฟีเจอร์ที่เป็นตัวเลข**

หลังจากที่เราได้ข้อมูลที่สมบูรณ์แล้ว (ไม่มีค่าที่ขาดหาย), เราจะใช้ VectorAssembler เพื่อรวมฟีเจอร์ที่เป็นตัวเลข (เช่น pm2_5_imputed, pm10_imputed, และ location_index) ให้เป็นเวกเตอร์ ซึ่งจะทำให้ข้อมูลพร้อมใช้งานในโมเดล Machine Learning

In [None]:
from pyspark.ml.feature import VectorAssembler
assembler_globalWeather = VectorAssembler(inputCols=["location_index", "pm2_5_imputed", "pm10_imputed"],
                                   outputCol='features')
assembler_globalWeather

VectorAssembler_69883118e28d

In [None]:
data_assembled = assembler_globalWeather.transform(data_imputed)
data_assembled.show(3)

+-----------+-------------+--------+---------+--------------+------------------+-------------------+-------------------+----------------------+--------------+--------+--------+-----------+--------------+-----------+-----------+---------+---------+--------+-----+------------------+---------------------+-------------+----------------+--------+--------+--------+---------------------------+-----------------+----------------------------+---------------------------+-----------------+----------------+------------------------+--------------------------+--------+--------+--------+--------+--------------+-----------------+--------------+-------------+------------+----------------+
|    country|location_name|latitude|longitude|      timezone|last_updated_epoch|       last_updated|temperature_celsius|temperature_fahrenheit|condition_text|wind_mph|wind_kph|wind_degree|wind_direction|pressure_mb|pressure_in|precip_mm|precip_in|humidity|cloud|feels_like_celsius|feels_like_fahrenheit|visibility_km|vis

**STEP 6 : ใช้ StandardScaler เพื่อปรับขนาดข้อมูล**

การสเกลข้อมูลเป็นสิ่งสำคัญในการวิเคราะห์สภาพอากาศ เนื่องจากคุณสมบัติ (features) เช่น PM2.5 และ PM10 อาจมีค่าต่างกันมาก ซึ่งสามารถส่งผลต่อผลลัพธ์ของโมเดลได้ ดังนั้นการใช้ StandardScaler จะทำให้ข้อมูลทุกฟีเจอร์มีค่าเฉลี่ยเป็น 0 และค่าเบี่ยงเบนมาตรฐานเป็น 1

In [None]:
from pyspark.ml.feature import StandardScaler

scaler = StandardScaler(inputCol="features", outputCol="scaledFeatures")
scaler_globalWeather = scaler.fit(data_assembled)

In [None]:
data_scaled = scaler_globalWeather.transform(data_assembled)
data_scaled.select("features", "scaledFeatures").show(truncate=False)

+------------------+---------------------------------------------------------------+
|features          |scaledFeatures                                                 |
+------------------+---------------------------------------------------------------+
|[23.0,8.4,26.6]   |[0.4070373205582843,0.18434345834084384,0.1848560804058028]    |
|[53.0,1.1,2.0]    |[0.9379555647647421,0.02414021478272955,0.013898953413970135]  |
|[60.0,10.4,18.4]  |[1.0618364884129154,0.22823475794580667,0.12787037140852522]   |
|[4.0,0.7,0.9]     |[0.0707890992275277,0.015361954861736984,0.0062545290362865606]|
|[28.0,183.4,262.3]|[0.4955236945926939,4.02483217377509,1.8228477402421832]       |
|[175.0,1.2,4.5]   |[3.097023091204337,0.02633477976297769,0.0312726451814328]     |
|[131.0,4.0,5.3]   |[2.318342999701532,0.08778259920992564,0.036832226547020855]   |
|[59.0,0.8,0.9]    |[1.0441392136060335,0.017556519841985127,0.0062545290362865606]|
|[75.0,3.7,5.4]    |[1.3272956105161444,0.08119890426918122,0.037

**STEP 7 : การสร้างโมเดลเพื่อการวิเคราะห์**

เราสามารถใช้โมเดล Machine Learning เช่น KMeans Clustering (สำหรับการแบ่งกลุ่มสถานที่ตามคุณภาพอากาศ)

อ่านเพิ่มเติม: https://spark.apache.org/docs/latest/ml-clustering.html#k-means

ตั้งค่า parameter k = 3 สำหรับ KMeans

In [None]:
from pyspark.ml.clustering import KMeans

kmeans = KMeans().setK(3).setSeed(1)
model = kmeans.fit(data_scaled)

**STEP 8 : การวิเคราะห์ผลลัพธ์**

หลังจากทำการจัดกลุ่มแล้ว เราสามารถวิเคราะห์ผลลัพธ์เพื่อดูว่ากลุ่มใดมีคุณภาพอากาศที่ดีที่สุดหรือแย่ที่สุด เช่น การเปรียบเทียบค่าของ PM2.5 และ PM10 ในแต่ละกลุ่ม และสถานที่ในแต่ละกลุ่มสามารถใช้ในการวางแผนหรือนโยบายต่าง ๆ ได้

In [None]:
from pyspark.ml.evaluation import ClusteringEvaluator

predictions = model.transform(data_scaled)
evaluator = ClusteringEvaluator()
silhouette = evaluator.evaluate(predictions)
print("Silhouette with squared euclidean distance = " + str(silhouette))

Silhouette with squared euclidean distance = 0.9528116624443366


In [None]:
predictions.show(30)

+--------------------+-------------------+--------+---------+--------------------+------------------+-------------------+-------------------+----------------------+--------------------+--------+--------+-----------+--------------+-----------+-----------+---------+---------+--------+-----+------------------+---------------------+-------------+----------------+--------+--------+--------+---------------------------+-----------------+----------------------------+---------------------------+-----------------+----------------+------------------------+--------------------------+--------+--------+--------+----------+--------------+-----------------+--------------+-------------+------------+------------------+--------------------+----------+
|             country|      location_name|latitude|longitude|            timezone|last_updated_epoch|       last_updated|temperature_celsius|temperature_fahrenheit|      condition_text|wind_mph|wind_kph|wind_degree|wind_direction|pressure_mb|pressure_in|pre

In [None]:
# Shows the result.
centers = model.clusterCenters()
print("Cluster Centers: ")
for center in centers:
    print(center)

Cluster Centers: 
[97.21320997 20.0396616  33.71955433]
[ 150.          338.16678571 3616.5602381 ]
[ 81.77255985 196.19958011 546.10031952]


In [None]:
# Analyze the composition of each cluster
cluster_counts = predictions.groupBy("prediction").count()
cluster_counts.show()

+----------+-----+
|prediction|count|
+----------+-----+
|         2| 1086|
|         0|49932|
|         1|   42|
+----------+-----+



In [None]:
# Investigate specific clusters
cluster_0_data = predictions.filter(predictions["prediction"] == 0)
cluster_0_data.show()

+-------------------+----------------+--------+---------+--------------------+------------------+-------------------+-------------------+----------------------+--------------------+--------+--------+-----------+--------------+-----------+-----------+---------+---------+--------+-----+------------------+---------------------+-------------+----------------+--------+--------+--------+---------------------------+-----------------+----------------------------+---------------------------+-----------------+----------------+------------------------+--------------------------+--------+--------+--------+----------+--------------+-----------------+--------------+-------------+------------+-----------------+--------------------+----------+
|            country|   location_name|latitude|longitude|            timezone|last_updated_epoch|       last_updated|temperature_celsius|temperature_fahrenheit|      condition_text|wind_mph|wind_kph|wind_degree|wind_direction|pressure_mb|pressure_in|precip_mm|pr

In [None]:
# Investigate specific clusters
cluster_1_data = predictions.filter(predictions["prediction"] == 1)
cluster_1_data.show(5)

+------------+-------------+--------+---------+-----------+------------------+-------------------+-------------------+----------------------+--------------+--------+--------+-----------+--------------+-----------+-----------+---------+---------+--------+-----+------------------+---------------------+-------------+----------------+--------+--------+--------+---------------------------+-----------------+----------------------------+---------------------------+-----------------+----------------+------------------------+--------------------------+--------+--------+--------+--------+---------------+-----------------+--------------+-------------+------------+--------------------+--------------------+----------+
|     country|location_name|latitude|longitude|   timezone|last_updated_epoch|       last_updated|temperature_celsius|temperature_fahrenheit|condition_text|wind_mph|wind_kph|wind_degree|wind_direction|pressure_mb|pressure_in|precip_mm|precip_in|humidity|cloud|feels_like_celsius|feels_

In [None]:
# Investigate specific clusters
cluster_2_data = predictions.filter(predictions["prediction"] == 2)
cluster_2_data.show()

+---------+-------------+--------+---------+----------------+------------------+-------------------+-------------------+----------------------+--------------+--------+--------+-----------+--------------+-----------+-----------+---------+---------+--------+-----+------------------+---------------------+-------------+----------------+--------+--------+--------+---------------------------+-----------------+----------------------------+---------------------------+-----------------+----------------+------------------------+--------------------------+--------+--------+--------+--------+--------------+-----------------+--------------+-------------+------------+-------------------+--------------------+----------+
|  country|location_name|latitude|longitude|        timezone|last_updated_epoch|       last_updated|temperature_celsius|temperature_fahrenheit|condition_text|wind_mph|wind_kph|wind_degree|wind_direction|pressure_mb|pressure_in|precip_mm|precip_in|humidity|cloud|feels_like_celsius|feel

## **Exercise 2 : Predicting House Prices in Boston**

ให้ใช้ข้อมูลราคาบ้านในบอสตันจาก kaggle เพื่อสร้างโมเดลทำนายราคาบ้านในบอสตัน โดยผ่านกระบวนการต่างๆ เช่น การเลือกฟีเจอร์, การรวมฟีเจอร์เป็นเวกเตอร์, การสร้างโมเดล Linear Regression และการประเมินผลด้วย RMSE (Root Mean Squared Error) เพื่อวัดความแม่นยำของโมเดล

**STEP 1 : ใช้ kaggle ในการดึงข้อมูล Boston Housing dataset**

- 1.1 ติดตั้ง kaggle

In [1]:
!pip install kaggle



- 1.2 ใช้ kagglehub.dataset_download ในการดาวน์โหลดข้อมูลจาก Kaggle

In [2]:
# ตั้งค่าคีย์ API
import os
os.makedirs('/root/.kaggle', exist_ok=True)

# ดาวน์โหลด dataset
!kaggle datasets download -d arunjangir245/boston-housing-dataset

# แตกไฟล์ zip
!unzip boston-housing-dataset.zip -d /content/

Dataset URL: https://www.kaggle.com/datasets/arunjangir245/boston-housing-dataset
License(s): DbCL-1.0
Downloading boston-housing-dataset.zip to /content
  0% 0.00/11.6k [00:00<?, ?B/s]
100% 11.6k/11.6k [00:00<00:00, 16.6MB/s]
Archive:  boston-housing-dataset.zip
  inflating: /content/BostonHousing.csv  


**STEP 2 : ตั้งค่า Spark Session และโหลดข้อมูล**

In [3]:
from pyspark.sql import SparkSession

spark = SparkSession.builder.appName("BostonHousing").getOrCreate()

data = spark.read.csv("/content/BostonHousing.csv", header=True, inferSchema=True)

data.show(5)

+-------+----+-----+----+-----+-----+----+------+---+---+-------+------+-----+----+
|   crim|  zn|indus|chas|  nox|   rm| age|   dis|rad|tax|ptratio|     b|lstat|medv|
+-------+----+-----+----+-----+-----+----+------+---+---+-------+------+-----+----+
|0.00632|18.0| 2.31|   0|0.538|6.575|65.2|  4.09|  1|296|   15.3| 396.9| 4.98|24.0|
|0.02731| 0.0| 7.07|   0|0.469|6.421|78.9|4.9671|  2|242|   17.8| 396.9| 9.14|21.6|
|0.02729| 0.0| 7.07|   0|0.469|7.185|61.1|4.9671|  2|242|   17.8|392.83| 4.03|34.7|
|0.03237| 0.0| 2.18|   0|0.458|6.998|45.8|6.0622|  3|222|   18.7|394.63| 2.94|33.4|
|0.06905| 0.0| 2.18|   0|0.458|7.147|54.2|6.0622|  3|222|   18.7| 396.9| 5.33|36.2|
+-------+----+-----+----+-----+-----+----+------+---+---+-------+------+-----+----+
only showing top 5 rows



**STEP 3 : การเลือกฟีเจอร์ที่ต้องการใช้ (Feature Selection)**

เลือกเฉพาะคอลัมน์ที่เกี่ยวข้องกับการทำนาย ได้แก่ crim, zn, indus, chas, nox, rm, age, dis, rad, tax, ptratio, b, lstat, medv

In [4]:
data.columns

['crim',
 'zn',
 'indus',
 'chas',
 'nox',
 'rm',
 'age',
 'dis',
 'rad',
 'tax',
 'ptratio',
 'b',
 'lstat',
 'medv']

In [44]:
columns_to_keep = data.columns
selected_data = data.select(*columns_to_keep)
# selected_data = selected_data.na.drop()
selected_data.show(5)

+-------+----+-----+----+-----+-----+----+------+---+---+-------+------+-----+----+
|   crim|  zn|indus|chas|  nox|   rm| age|   dis|rad|tax|ptratio|     b|lstat|medv|
+-------+----+-----+----+-----+-----+----+------+---+---+-------+------+-----+----+
|0.00632|18.0| 2.31|   0|0.538|6.575|65.2|  4.09|  1|296|   15.3| 396.9| 4.98|24.0|
|0.02731| 0.0| 7.07|   0|0.469|6.421|78.9|4.9671|  2|242|   17.8| 396.9| 9.14|21.6|
|0.02729| 0.0| 7.07|   0|0.469|7.185|61.1|4.9671|  2|242|   17.8|392.83| 4.03|34.7|
|0.03237| 0.0| 2.18|   0|0.458|6.998|45.8|6.0622|  3|222|   18.7|394.63| 2.94|33.4|
|0.06905| 0.0| 2.18|   0|0.458|7.147|54.2|6.0622|  3|222|   18.7| 396.9| 5.33|36.2|
+-------+----+-----+----+-----+-----+----+------+---+---+-------+------+-----+----+
only showing top 5 rows



In [45]:
data.count(), selected_data.count()

(506, 506)

**STEP 4 : การแปลงข้อมูลเป็น Feature Vector (Feature Engineering)**

ใช้ VectorAssembler เพื่อรวมคอลัมน์ที่เลือกเป็น feature vector ที่สามารถใช้กับโมเดลได้

In [46]:
from pyspark.ml.feature import VectorAssembler
assembler_housePrice = VectorAssembler(inputCols=["crim", "zn", "indus", "chas",
                                                  "nox", "rm", "age", "dis",
                                                  "rad", "tax", "ptratio",
                                                  "b", "lstat"],
                                   outputCol='features', handleInvalid='skip')
assembler_housePrice

VectorAssembler_d83e46acb167

In [47]:
data_assembled = assembler_housePrice.transform(selected_data)
data_assembled.show(3)

+-------+----+-----+----+-----+-----+----+------+---+---+-------+------+-----+----+--------------------+
|   crim|  zn|indus|chas|  nox|   rm| age|   dis|rad|tax|ptratio|     b|lstat|medv|            features|
+-------+----+-----+----+-----+-----+----+------+---+---+-------+------+-----+----+--------------------+
|0.00632|18.0| 2.31|   0|0.538|6.575|65.2|  4.09|  1|296|   15.3| 396.9| 4.98|24.0|[0.00632,18.0,2.3...|
|0.02731| 0.0| 7.07|   0|0.469|6.421|78.9|4.9671|  2|242|   17.8| 396.9| 9.14|21.6|[0.02731,0.0,7.07...|
|0.02729| 0.0| 7.07|   0|0.469|7.185|61.1|4.9671|  2|242|   17.8|392.83| 4.03|34.7|[0.02729,0.0,7.07...|
+-------+----+-----+----+-----+-----+----+------+---+---+-------+------+-----+----+--------------------+
only showing top 3 rows



**STEP 5 : การเลือกฟีเจอร์ที่สำคัญ (Feature Selection using VectorSlicer)**

เลือกฟีเจอร์ที่สำคัญบางตัวจาก assembled_features โดยใช้ VectorSlicer เพื่อเลือกฟีเจอร์ ตั้งแต่ 0 ถึง 8

In [48]:
from pyspark.ml.feature import VectorSlicer

vs = VectorSlicer(inputCol="features", outputCol="selected_features",
                      indices=list(range(9)))

In [49]:
data_slice = vs.transform(data_assembled)
data_slice.select(['features', 'selected_features']).show(3)

+--------------------+--------------------+
|            features|   selected_features|
+--------------------+--------------------+
|[0.00632,18.0,2.3...|[0.00632,18.0,2.3...|
|[0.02731,0.0,7.07...|[0.02731,0.0,7.07...|
|[0.02729,0.0,7.07...|[0.02729,0.0,7.07...|
+--------------------+--------------------+
only showing top 3 rows



**STEP 6 : การสร้างโมเดล Linear Regression**

ใช้ LinearRegression เพื่อสร้างโมเดลทำนายราคาบ้าน (medv) โดยใช้ฟีเจอร์ที่เลือกไว้

In [50]:
from pyspark.ml.regression import LinearRegression

linearR = LinearRegression(featuresCol='selected_features',
                           labelCol='medv')

**STEP 7 : การสร้าง Pipeline**

รวมทุกขั้นตอน (STEP 4, 5, 6) ที่ได้ทำไว้ใน Pipeline เพื่อให้การประมวลผลเป็นระบบ และสามารถฝึกโมเดลได้

In [51]:
from pyspark.ml import Pipeline

pipeline = Pipeline(stages=[assembler_housePrice, vs, linearR])

**STEP 8 : การแบ่งข้อมูลเป็นชุดฝึกและชุดทดสอบ**

แบ่งข้อมูลออกเป็นชุดฝึก (training set) 80% และชุดทดสอบ (test set) 20% เพื่อฝึกโมเดลและทดสอบประสิทธิภาพ

เพิ่มเติม: ลองใช้ randomSplit ในการแบ่งข้อมูลออกเป็นหลายๆส่วน

In [52]:
train_data, test_data = selected_data.randomSplit([0.8, 0.2], seed=50)

**STEP 9 : การฝึกโมเดล**

ใช้ชุดข้อมูลฝึก (training data) เพื่อฝึกโมเดล

In [53]:
model = pipeline.fit(train_data)

**STEP 10 : การทำนายและประเมินผล**

ใช้โมเดลที่ฝึกแล้วมาทำนายบนชุดข้อมูลทดสอบ (test data) และใช้ RegressionEvaluator เพื่อประเมินผลของโมเดล

Return ค่า RMSE

In [54]:
predictions = model.transform(test_data)
predictions.show(5)

+-------+----+-----+----+------+-----+----+------+---+---+-------+------+-----+----+--------------------+--------------------+------------------+
|   crim|  zn|indus|chas|   nox|   rm| age|   dis|rad|tax|ptratio|     b|lstat|medv|            features|   selected_features|        prediction|
+-------+----+-----+----+------+-----+----+------+---+---+-------+------+-----+----+--------------------+--------------------+------------------+
|0.01501|80.0| 2.01|   0| 0.435|6.635|29.7| 8.344|  4|280|   17.0|390.94| 5.99|24.5|[0.01501,80.0,2.0...|[0.01501,80.0,2.0...|27.951901764273472|
| 0.0187|85.0| 4.15|   0| 0.429|6.516|27.7|8.5353|  4|351|   17.9|392.43| 6.36|23.1|[0.0187,85.0,4.15...|[0.0187,85.0,4.15...|26.937665616270152|
|0.03427| 0.0| 5.19|   0| 0.515|5.869|46.3|5.2311|  5|224|   20.2| 396.9|  9.8|19.5|[0.03427,0.0,5.19...|[0.03427,0.0,5.19...|   19.872937312234|
|0.03445|82.5| 2.03|   0| 0.415|6.162|38.4|  6.27|  2|348|   14.7|393.77| 7.43|24.1|[0.03445,82.5,2.0...|[0.03445,82.5,2.0..

In [55]:
from pyspark.ml.evaluation import RegressionEvaluator

evaluator = RegressionEvaluator(labelCol="medv", predictionCol="prediction")

In [56]:
rmse = evaluator.evaluate(predictions, {evaluator.metricName: "rmse"})
rmse

5.870106118815353

In [None]:


print(f"Root Mean Squared Error (RMSE): {rmse}")

Root Mean Squared Error (RMSE): 5.789227147392104


**STEP 11: สร้าง 5-Fold Cross Validation Pipeline**

กำหนด Parameter ดังนี้
- regParam: [0.5, 0.1, 0.01]
- elasticNetParam: [0.0, 0.25, 0.75]

และแบ่งข้อมูล Train-test สัดส่วนเป็น 70%-30% โดยใช้ค่า seed = 50

In [57]:
from pyspark.ml.tuning import ParamGridBuilder, CrossValidator

paramGrid = ParamGridBuilder().addGrid(linearR.regParam,
 [0.5, 0.1, 0.01]).addGrid(linearR.elasticNetParam,
  [0.0, 0.25, 0.75]).build()

paramGrid

[{Param(parent='LinearRegression_e5cf4839653f', name='regParam', doc='regularization parameter (>= 0).'): 0.5,
  Param(parent='LinearRegression_e5cf4839653f', name='elasticNetParam', doc='the ElasticNet mixing parameter, in range [0, 1]. For alpha = 0, the penalty is an L2 penalty. For alpha = 1, it is an L1 penalty.'): 0.0},
 {Param(parent='LinearRegression_e5cf4839653f', name='regParam', doc='regularization parameter (>= 0).'): 0.5,
  Param(parent='LinearRegression_e5cf4839653f', name='elasticNetParam', doc='the ElasticNet mixing parameter, in range [0, 1]. For alpha = 0, the penalty is an L2 penalty. For alpha = 1, it is an L1 penalty.'): 0.25},
 {Param(parent='LinearRegression_e5cf4839653f', name='regParam', doc='regularization parameter (>= 0).'): 0.5,
  Param(parent='LinearRegression_e5cf4839653f', name='elasticNetParam', doc='the ElasticNet mixing parameter, in range [0, 1]. For alpha = 0, the penalty is an L2 penalty. For alpha = 1, it is an L1 penalty.'): 0.75},
 {Param(parent

In [58]:
crossval = CrossValidator(estimator=pipeline,
                          estimatorParamMaps=paramGrid,
                          evaluator=evaluator,
                          numFolds=5)
crossval

CrossValidator_4876398c05d2

In [59]:
train_data, test_data = selected_data.randomSplit([0.7,0.3],seed=50)

In [60]:
model = crossval.fit(train_data)

In [61]:
model.extractParamMap()

{Param(parent='CrossValidatorModel_3fd4d692650c', name='seed', doc='random seed.'): -6182074962225257354,
 Param(parent='CrossValidatorModel_3fd4d692650c', name='numFolds', doc='number of folds for cross validation'): 5,
 Param(parent='CrossValidatorModel_3fd4d692650c', name='foldCol', doc="Param for the column name of user specified fold number. Once this is specified, :py:class:`CrossValidator` won't do random k-fold split. Note that this column should be integer type with range [0, numFolds) and Spark will throw exception on out-of-range fold numbers."): '',
 Param(parent='CrossValidatorModel_3fd4d692650c', name='estimator', doc='estimator to be cross-validated'): Pipeline_18d8e99ecb58,
 Param(parent='CrossValidatorModel_3fd4d692650c', name='estimatorParamMaps', doc='estimator param maps'): [{Param(parent='LinearRegression_e5cf4839653f', name='regParam', doc='regularization parameter (>= 0).'): 0.5,
   Param(parent='LinearRegression_e5cf4839653f', name='elasticNetParam', doc='the El

In [62]:
bestModel = model.bestModel
bestParams = bestModel.stages[-1]  # LinearRegression is the last stage in the pipeline

print(f"Best regParam: {bestParams._java_obj.getRegParam()}")
print(f"Best elasticNetParam: {bestParams._java_obj.getElasticNetParam()}")

Best regParam: 0.1
Best elasticNetParam: 0.0


In [63]:
predictions = bestModel.transform(test_data)
predictions.show(5)

+-------+----+-----+----+------+-----+----+------+---+---+-------+------+-----+----+--------------------+--------------------+------------------+
|   crim|  zn|indus|chas|   nox|   rm| age|   dis|rad|tax|ptratio|     b|lstat|medv|            features|   selected_features|        prediction|
+-------+----+-----+----+------+-----+----+------+---+---+-------+------+-----+----+--------------------+--------------------+------------------+
|0.01501|80.0| 2.01|   0| 0.435|6.635|29.7| 8.344|  4|280|   17.0|390.94| 5.99|24.5|[0.01501,80.0,2.0...|[0.01501,80.0,2.0...|27.971481983128832|
| 0.0187|85.0| 4.15|   0| 0.429|6.516|27.7|8.5353|  4|351|   17.9|392.43| 6.36|23.1|[0.0187,85.0,4.15...|[0.0187,85.0,4.15...|26.951666751171466|
|0.02009|95.0| 2.68|   0|0.4161|8.034|31.9| 5.118|  4|224|   14.7|390.55| 2.88|50.0|[0.02009,95.0,2.6...|[0.02009,95.0,2.6...| 43.43913128584396|
|0.02729| 0.0| 7.07|   0| 0.469|7.185|61.1|4.9671|  2|242|   17.8|392.83| 4.03|34.7|[0.02729,0.0,7.07...|[0.02729,0.0,7.07..

Return ค่า RMSE

In [64]:
rmse = evaluator.evaluate(predictions, {evaluator.metricName: "rmse"})
rmse

5.764406705718443

Return ค่า MAE

In [65]:
rmse = evaluator.evaluate(predictions, {evaluator.metricName: "mae"})
rmse

3.8655075955351292