## 대기 오염 정보 분석 Hyper Parameter 구하기

여러가지 대기 오염 변수, 지역 변수, 시간 변수로 초 미세먼지(u_fine_dust)수치를 예측하는 기존 모델의 하이퍼 파라미터를 구한다.

In [1]:
from pyspark.sql import SparkSession

In [2]:
MAX_MEMORY = "5g"
spark = SparkSession.builder.appName("air-pollution-degree-analysis")\
                            .config("spark.executor.memory", MAX_MEMORY)\
                            .config("spark.driver.memory", MAX_MEMORY)\
                            .getOrCreate()

22/04/25 18:51:30 WARN Utils: Your hostname, devkhk-MacBook-Air.local resolves to a loopback address: 127.0.0.1; using 172.30.1.27 instead (on interface en0)
22/04/25 18:51:30 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to another address
Using Spark's default log4j profile: org/apache/spark/log4j-defaults.properties
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
22/04/25 18:51:31 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable


In [3]:
air_pollution_dir = "/Users/devkhk/Documents/public-data-engineering/data/air_pollution_degree/"
air_pollution_df = spark.read.csv(f"file:///{air_pollution_dir}air-pollution-degree.csv", encoding="euc-kr", header=True, inferSchema=True)\
                            .toDF("city", "city2", "region", "region2", "region_code", "measure_date", "sulfur_diox", "fine_dust", "ozone", "nitrogen_diox", "carbon_monox","u_fine_dust")

                                                                                

In [4]:
air_pollution_df.show()

+----+-----+------+-------+-----------+----------------+-----------+---------+-----+-------------+------------+-----------+
|city|city2|region|region2|region_code|    measure_date|sulfur_diox|fine_dust|ozone|nitrogen_diox|carbon_monox|u_fine_dust|
+----+-----+------+-------+-----------+----------------+-----------+---------+-----+-------------+------------+-----------+
|서울| 서울|  중구|   중구|     111121|2020-01-01 01:00|      0.003|    0.036|0.002|          0.5|          24|         19|
|서울| 서울|  중구|   중구|     111121|2020-01-01 02:00|      0.003|    0.039|0.001|          0.6|          25|         21|
|서울| 서울|  중구|   중구|     111121|2020-01-01 03:00|      0.003|    0.037|0.001|          0.9|          29|         23|
|서울| 서울|  중구|   중구|     111121|2020-01-01 04:00|      0.002|    0.036|0.001|          0.6|          26|         22|
|서울| 서울|  중구|   중구|     111121|2020-01-01 05:00|      0.002|    0.035|0.001|          0.6|          25|         19|
|서울| 서울|  중구|   중구|     111121|2020-01-01 06:00|

In [5]:
air_pollution_df.printSchema()

root
 |-- city: string (nullable = true)
 |-- city2: string (nullable = true)
 |-- region: string (nullable = true)
 |-- region2: string (nullable = true)
 |-- region_code: integer (nullable = true)
 |-- measure_date: string (nullable = true)
 |-- sulfur_diox: double (nullable = true)
 |-- fine_dust: double (nullable = true)
 |-- ozone: double (nullable = true)
 |-- nitrogen_diox: double (nullable = true)
 |-- carbon_monox: integer (nullable = true)
 |-- u_fine_dust: integer (nullable = true)



In [6]:
# air_pollution_df = air_pollution_df.withColumnRenamed("시도", "city")

In [7]:
air_pollution_df.createOrReplaceTempView("origin")

In [8]:
query = """
SELECT
    city,
    region,
    region_code,
    TO_DATE(measure_date) as date,
    HOUR(measure_date) as hour,
    sulfur_diox,
    fine_dust,
    ozone,
    nitrogen_diox,
    carbon_monox,
    u_fine_dust
FROM origin
"""
origin_df = spark.sql(query)

In [9]:
spark.sql(query).show(100)

+----+------+-----------+----------+----+-----------+---------+-----+-------------+------------+-----------+
|city|region|region_code|      date|hour|sulfur_diox|fine_dust|ozone|nitrogen_diox|carbon_monox|u_fine_dust|
+----+------+-----------+----------+----+-----------+---------+-----+-------------+------------+-----------+
|서울|  중구|     111121|2020-01-01|   1|      0.003|    0.036|0.002|          0.5|          24|         19|
|서울|  중구|     111121|2020-01-01|   2|      0.003|    0.039|0.001|          0.6|          25|         21|
|서울|  중구|     111121|2020-01-01|   3|      0.003|    0.037|0.001|          0.9|          29|         23|
|서울|  중구|     111121|2020-01-01|   4|      0.002|    0.036|0.001|          0.6|          26|         22|
|서울|  중구|     111121|2020-01-01|   5|      0.002|    0.035|0.001|          0.6|          25|         19|
|서울|  중구|     111121|2020-01-01|   6|      0.002|    0.037|0.001|          0.5|          23|         19|
|서울|  중구|     111121|2020-01-01|   7|      

In [10]:
origin_df.printSchema()

root
 |-- city: string (nullable = true)
 |-- region: string (nullable = true)
 |-- region_code: integer (nullable = true)
 |-- date: date (nullable = true)
 |-- hour: integer (nullable = true)
 |-- sulfur_diox: double (nullable = true)
 |-- fine_dust: double (nullable = true)
 |-- ozone: double (nullable = true)
 |-- nitrogen_diox: double (nullable = true)
 |-- carbon_monox: integer (nullable = true)
 |-- u_fine_dust: integer (nullable = true)



In [11]:
origin_df.describe(["sulfur_diox", "ozone", "nitrogen_diox", "carbon_monox"]).show()

                                                                                

+-------+------------------+------------------+------------------+------------------+
|summary|       sulfur_diox|             ozone|     nitrogen_diox|      carbon_monox|
+-------+------------------+------------------+------------------+------------------+
|  count|           1048575|           1048575|           1048575|           1048575|
|   mean| -33.4756512686748|-49.20555195955904|-36.25456252532455|-20.03172019168872|
| stddev|179.79034530646655|216.23970480747434|188.07656606234713|239.99483465556153|
|    min|            -999.0|            -999.0|            -999.0|              -999|
|    max|              0.09|             0.196|               7.1|               383|
+-------+------------------+------------------+------------------+------------------+



In [12]:
origin_df.describe(["fine_dust", "u_fine_dust"]).show()


[Stage 7:>                                                          (0 + 8) / 8]

+-------+-------------------+-------------------+
|summary|          fine_dust|        u_fine_dust|
+-------+-------------------+-------------------+
|  count|            1048575|            1048575|
|   mean|-34.678330849487836|-29.780616551033546|
| stddev|  182.9309217096852| 222.66068734141197|
|    min|             -999.0|               -999|
|    max|              0.142|                158|
+-------+-------------------+-------------------+




                                                                                

In [13]:
origin_df.createOrReplaceTempView("origin_preprocess")

In [14]:
query = """
SELECT
    *
FROM
    origin_preprocess
WHERE
        sulfur_diox > 0
    and ozone > 0
    and nitrogen_diox > 0
    and carbon_monox > 0
    and carbon_monox < 200
    and fine_dust > 0
    and u_fine_dust > 0

"""

preprocessed_df = spark.sql(query)
preprocessed_df.show()

+----+------+-----------+----------+----+-----------+---------+-----+-------------+------------+-----------+
|city|region|region_code|      date|hour|sulfur_diox|fine_dust|ozone|nitrogen_diox|carbon_monox|u_fine_dust|
+----+------+-----------+----------+----+-----------+---------+-----+-------------+------------+-----------+
|서울|  중구|     111121|2020-01-01|   1|      0.003|    0.036|0.002|          0.5|          24|         19|
|서울|  중구|     111121|2020-01-01|   2|      0.003|    0.039|0.001|          0.6|          25|         21|
|서울|  중구|     111121|2020-01-01|   3|      0.003|    0.037|0.001|          0.9|          29|         23|
|서울|  중구|     111121|2020-01-01|   4|      0.002|    0.036|0.001|          0.6|          26|         22|
|서울|  중구|     111121|2020-01-01|   5|      0.002|    0.035|0.001|          0.6|          25|         19|
|서울|  중구|     111121|2020-01-01|   6|      0.002|    0.037|0.001|          0.5|          23|         19|
|서울|  중구|     111121|2020-01-01|   7|      

In [15]:
# null, None 행 제거
preprocessed_df = preprocessed_df.na.drop("any")

In [16]:
preprocessed_df.show(200)

+----+------+-----------+----------+----+-----------+---------+-----+-------------+------------+-----------+
|city|region|region_code|      date|hour|sulfur_diox|fine_dust|ozone|nitrogen_diox|carbon_monox|u_fine_dust|
+----+------+-----------+----------+----+-----------+---------+-----+-------------+------------+-----------+
|서울|  중구|     111121|2020-01-01|   1|      0.003|    0.036|0.002|          0.5|          24|         19|
|서울|  중구|     111121|2020-01-01|   2|      0.003|    0.039|0.001|          0.6|          25|         21|
|서울|  중구|     111121|2020-01-01|   3|      0.003|    0.037|0.001|          0.9|          29|         23|
|서울|  중구|     111121|2020-01-01|   4|      0.002|    0.036|0.001|          0.6|          26|         22|
|서울|  중구|     111121|2020-01-01|   5|      0.002|    0.035|0.001|          0.6|          25|         19|
|서울|  중구|     111121|2020-01-01|   6|      0.002|    0.037|0.001|          0.5|          23|         19|
|서울|  중구|     111121|2020-01-01|   7|      

In [17]:
preprocessed_df.describe(["sulfur_diox", "ozone", "nitrogen_diox", "carbon_monox"]).show()
preprocessed_df.describe(["fine_dust", "u_fine_dust"]).show()

                                                                                

+-------+--------------------+--------------------+-------------------+-----------------+
|summary|         sulfur_diox|               ozone|      nitrogen_diox|     carbon_monox|
+-------+--------------------+--------------------+-------------------+-----------------+
|  count|              896919|              896919|             896919|           896919|
|   mean| 0.00310931577990743|0.026090863277507065|0.48527860375372783|38.59242807878972|
| stddev|0.001229118795065...|0.019442733724994665|0.21274261886750173| 23.0660859274277|
|    min|              1.0E-4|               0.001|                0.1|                1|
|    max|                0.09|               0.196|                7.1|              199|
+-------+--------------------+--------------------+-------------------+-----------------+





+-------+--------------------+------------------+
|summary|           fine_dust|       u_fine_dust|
+-------+--------------------+------------------+
|  count|              896919|            896919|
|   mean|  0.0231632228774271| 21.30682369310941|
| stddev|0.015494158096744386|15.493787803151514|
|    min|              1.0E-4|                 1|
|    max|               0.142|               152|
+-------+--------------------+------------------+




                                                                                

In [18]:
preprocessed_df.count()

                                                                                

896919

In [19]:
preprocessed_df.printSchema()

root
 |-- city: string (nullable = true)
 |-- region: string (nullable = true)
 |-- region_code: integer (nullable = true)
 |-- date: date (nullable = true)
 |-- hour: integer (nullable = true)
 |-- sulfur_diox: double (nullable = true)
 |-- fine_dust: double (nullable = true)
 |-- ozone: double (nullable = true)
 |-- nitrogen_diox: double (nullable = true)
 |-- carbon_monox: integer (nullable = true)
 |-- u_fine_dust: integer (nullable = true)



In [20]:
train_df, test_df =  preprocessed_df.randomSplit([0.8, 0.2], seed=1)

In [21]:
# pipeline stages 설계
from pyspark.ml.feature import StringIndexer, OneHotEncoder
from pyspark.ml.feature import VectorAssembler, StandardScaler

stages = []

cat_features = [
    "region_code",
    "hour",
]

std_features = [
    "sulfur_diox",
    "fine_dust",
    "ozone",
    "nitrogen_diox",
    "carbon_monox",
]


In [22]:
for c in cat_features:
    indexer = StringIndexer(inputCol=c , outputCol= c + "_idx").setHandleInvalid("skip")
    onehot = OneHotEncoder(inputCol=indexer.getOutputCol(), outputCol= c+ "_one")
    stages += [indexer, onehot]
for s in std_features:
    vassembler = VectorAssembler(inputCols=[s], outputCol=s + "_vc")
    stdscaler = StandardScaler(inputCol=vassembler.getOutputCol(), outputCol=s + "_std")
    stages += [vassembler, stdscaler]

In [23]:
stages

[StringIndexer_f2bdf32844b8,
 OneHotEncoder_84c303b07ac9,
 StringIndexer_9402ce39d799,
 OneHotEncoder_11b708bdc9b5,
 VectorAssembler_bdeae3e77800,
 StandardScaler_7afb8feadc98,
 VectorAssembler_7f65610a8e69,
 StandardScaler_fb66a5794d9e,
 VectorAssembler_14986d0939b8,
 StandardScaler_540bb8d9e7d7,
 VectorAssembler_b0723c18f8f1,
 StandardScaler_10a1bc253de4,
 VectorAssembler_a84ec5e548b1,
 StandardScaler_453252b463f7]

In [24]:
# vector된 데이터들을 하나로 모으는 assembler
assembler_list = [c + "_one" for c in cat_features ] + [s + "_std" for s in std_features]
assembler = VectorAssembler(inputCols=assembler_list, outputCol="features")
stages += [assembler]

In [26]:
from pyspark.ml.pipeline import Pipeline
from pyspark.ml.regression import LinearRegression
from pyspark.ml.tuning import CrossValidator, ParamGridBuilder
from pyspark.ml.evaluation import RegressionEvaluator

In [81]:
cv_lr = LinearRegression(
    maxIter=30,
    labelCol="u_fine_dust",
    solver="normal",
)

In [82]:
cv_stages = stages + [cv_lr]

In [83]:
cv_pipeline = Pipeline(stages=cv_stages)

In [124]:
param_grid = ParamGridBuilder()\
                .addGrid(cv_lr.regParam, [0.01, 0.02, 0.03, 0.04, 0.05])\
                .addGrid(cv_lr.elasticNetParam, [0.1, 0.2, 0.3, 0.4, 0.05])\
                .build()

In [125]:
cv = CrossValidator(
        estimator=cv_pipeline,
        estimatorParamMaps=param_grid,
        evaluator=RegressionEvaluator(labelCol="u_fine_dust"),
        numFolds=5
)

In [126]:
toy_df = train_df.sample(False, 0.1, seed=1)

In [127]:
cv_model = cv.fit(toy_df)

                                                                                

In [128]:
alpha = cv_model.bestModel.stages[-1]._java_obj.getElasticNetParam()
reg_param = cv_model.bestModel.stages[-1]._java_obj.getRegParam()

In [129]:
cv_model.bestModel.stages[-1]

LinearRegressionModel: uid=LinearRegression_693a03793ced, numFeatures=144

In [130]:
alpha, reg_param

(0.05, 0.01)

## Training

In [131]:
lr = LinearRegression(
    maxIter=100,
    labelCol="u_fine_dust",
    solver="normal",
    regParam=reg_param,
    elasticNetParam=alpha,
)

In [132]:
pipeline = Pipeline(stages=stages)

In [133]:
fitted_pipeline = pipeline.fit(train_df)

                                                                                

In [134]:
vtrain_df = fitted_pipeline.transform(train_df)

In [135]:
model = lr.fit(vtrain_df)

                                                                                

In [136]:
vtest_df = fitted_pipeline.transform(test_df)

In [137]:
prediction = model.transform(vtest_df)

In [138]:
prediction.select("u_fine_dust", "prediction").show()

+-----------+------------------+
|u_fine_dust|        prediction|
+-----------+------------------+
|         12|22.565848497382643|
|         26| 22.61179479657357|
|         22|22.421599870844112|
|         25| 29.03876323468124|
|         28| 30.69290317019555|
|         29| 38.29928217284746|
|         29|32.530432151116706|
|         29| 32.65144096834152|
|         37|38.793767292019666|
|         40| 39.41078509088461|
|         42| 47.83494517758343|
|         43| 48.98513041325915|
|         36| 37.40306985953547|
|         26|23.931340137296896|
|         30| 42.12812618116859|
|         31| 41.01461953096729|
|         26| 30.14923869602041|
|         27|  29.0673356245322|
|         24|24.836461706900046|
|         29| 35.89594522528131|
+-----------+------------------+
only showing top 20 rows



In [139]:
model.summary.rootMeanSquaredError

7.6730683446109955

In [140]:
model.summary.r2

0.7547696073508314

In [141]:
spark.stop()