In [1]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import col

# SparkSession 초기화
spark = SparkSession.builder.appName("Spark PostgreSQL Example").config("spark.jars", "/path/to/postgresql/jdbc/driver.jar").getOrCreate()

# PostgreSQL 연결 정보 설정
database_url = "jdbc:postgresql://172.24.80.1:5432/postgres"

properties = {"user": "postgres", "driver": "org.postgresql.Driver"}

# PostgreSQL에서 데이터 로드
df = spark.read.jdbc(url=database_url, table="kwater", properties=properties)

# 데이터 확인
df.show()


+----------------+------+------+------+----------+--------+----------+-------+------------+--------+-----------+
|         logTime|  탁도|    pH|  수온|전기전도도|알칼리도|PACS투입률|cluster|원수유입유량|예측탁도|update_time|
+----------------+------+------+------+----------+--------+----------+-------+------------+--------+-----------+
| 2013-01-01 1:00|7.7017|7.6528|2.5044|  314.8864| 99.5002|  19.44016|    0.0|    2425.803|    NULL|       NULL|
| 2013-01-01 2:00|7.2569|7.6505|2.4357|  316.8224| 99.5422| 19.246073|    0.0|   2423.7048|    NULL|       NULL|
| 2013-01-01 3:00| 6.919|7.6477|2.6493|  318.1849| 99.5546|  19.09852|    0.0|   2441.6343|    NULL|       NULL|
| 2013-01-01 4:00|6.9069| 7.648|2.3842|  319.0541| 99.5257| 19.233006|    0.0|   2442.0156|    NULL|       NULL|
| 2013-01-01 5:00|6.7461|7.6477|2.4758|  320.2157| 99.5107| 19.553661|    0.0|    2435.912|    NULL|       NULL|
| 2013-01-01 6:00|6.6437|7.6478|2.5177|   321.575| 99.5215| 19.545351|    0.0|   2441.6343|    NULL|       NULL|
| 2013-01

In [2]:
from pyspark.sql.functions import col
from pyspark.ml.feature import VectorAssembler
from pyspark.ml.regression import RandomForestRegressor, GBTRegressor, LinearRegression
from pyspark.ml.evaluation import RegressionEvaluator
from pyspark.ml.tuning import ParamGridBuilder, CrossValidator

# 클러스터가 0인 데이터만 필터링
df_cluster_0 = df.filter(df['Cluster'] == 0).select("탁도", "pH", "수온", "전기전도도", "알칼리도", "PACS투입률")

# 특성 선택 및 벡터 어셈블러 정의
assembler = VectorAssembler(inputCols=["탁도", "pH", "수온", "전기전도도", "알칼리도"], outputCol="features")
df_assembled = assembler.transform(df_cluster_0)

# 데이터 분할
train_df, test_df = df_assembled.randomSplit([0.8, 0.2])

# 모델 정의
rf = RandomForestRegressor(featuresCol="features", labelCol="PACS투입률")
gbt = GBTRegressor(featuresCol="features", labelCol="PACS투입률")
lr = LinearRegression(featuresCol="features", labelCol="PACS투입률")

# 랜덤포레스트 모델을 위한 파라미터 그리드
paramGrid_rf = ParamGridBuilder() \
    .addGrid(rf.numTrees, [10, 20]) \
    .addGrid(rf.maxDepth, [5, 10]) \
    .build()

# 랜덤포레스트 모델을 위한 교차 검증
crossval_rf = CrossValidator(estimator=rf,
                             estimatorParamMaps=paramGrid_rf,
                             evaluator=RegressionEvaluator(labelCol="PACS투입률", metricName="rmse"),
                             numFolds=3)

# 교차검증, 파라미터 튜닝을 통한 최적 모델로 예측
cvModel_rf = crossval_rf.fit(train_df)
best_rf_model = cvModel_rf.bestModel
rf_predictions = best_rf_model.transform(test_df).withColumnRenamed("prediction", "rf_pred")

# GBT 모델을 위한 파라미터 그리드
paramGrid_gbt = ParamGridBuilder() \
    .addGrid(gbt.maxDepth, [10, 20]) \
    .addGrid(gbt.maxIter, [5, 10]) \
    .build()

# GBT 모델을 위한 교차 검증
crossval_gbt = CrossValidator(estimator=gbt,
                              estimatorParamMaps=paramGrid_gbt,
                              evaluator=RegressionEvaluator(labelCol="PACS투입률", metricName="rmse"),
                              numFolds=3)

# 교차검증, 파라미터 튜닝을 통한 최적 모델로 예측
cvModel_gbt = crossval_gbt.fit(train_df)
best_gbt_model = cvModel_gbt.bestModel
gbt_predictions = best_gbt_model.transform(test_df).withColumnRenamed("prediction", "gbt_pred")



lr_model = lr.fit(train_df)
lr_predictions = lr_model.transform(test_df).withColumnRenamed("prediction", "lr_pred")

# 예측 데이터 결합
df_combined = rf_predictions \
    .join(gbt_predictions, ["탁도", "pH", "수온", "전기전도도", "알칼리도", "PACS투입률","features"]) \
    .join(lr_predictions, ["탁도", "pH", "수온", "전기전도도", "알칼리도", "PACS투입률","features"])

# 메타 특성 생성
meta_assembler = VectorAssembler(inputCols=["rf_pred", "gbt_pred", "lr_pred"], outputCol="meta_features")
df_meta_assembled = meta_assembler.transform(df_combined)

# 메타 모델 정의 및 학습
meta_lr = LinearRegression(featuresCol="meta_features", labelCol="PACS투입률")
meta_model = meta_lr.fit(df_meta_assembled)

# 메타 모델을 사용한 예측
meta_predictions = meta_model.transform(df_meta_assembled)

# 모델 평가
evaluator = RegressionEvaluator(labelCol="PACS투입률", metricName="rmse")
rmse = evaluator.evaluate(meta_predictions)
print(f"Root Mean Squared Error (RMSE) on test data = {rmse}")


Root Mean Squared Error (RMSE) on test data = 2.287942214665739
