In [0]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import when, col , lag
from pyspark.sql.types import IntegerType
from pyspark.sql.window import Window 
from pyspark.sql.functions import isnan, when, count,col, isnull,lit

spark = SparkSession.builder.appName("csv_example").getOrCreate()

df = spark.read.csv('/FileStore/tables/OBS_ASOS_DD_20240108155427.csv',header=True, encoding='cp949')
select_columns = df.columns
print("select_name",select_columns)
selected_columns = [col_name for col_name in df.columns if col_name not in ['지점', '지점명','일시']]

for col_name in selected_columns:
    df = df.withColumn(col_name, df[col_name].cast(IntegerType()))
df.show()


select_name ['지점', '지점명', '일시', '평균기온(°C)', '최저기온(°C)', '최고기온(°C)', '일강수량(mm)', '최대 풍속(m/s)', '평균 이슬점온도(°C)', '최소 상대습도(%)', '평균 상대습도(%)', '합계 일조시간(hr)', '평균 지면온도(°C)', '최저 초상온도(°C)']
+----+------+----------+------------+------------+------------+------------+--------------+-------------------+----------------+----------------+-----------------+-----------------+-----------------+
|지점|지점명|      일시|평균기온(°C)|최저기온(°C)|최고기온(°C)|일강수량(mm)|최대 풍속(m/s)|평균 이슬점온도(°C)|최소 상대습도(%)|평균 상대습도(%)|합계 일조시간(hr)|평균 지면온도(°C)|최저 초상온도(°C)|
+----+------+----------+------------+------------+------------+------------+--------------+-------------------+----------------+----------------+-----------------+-----------------+-----------------+
|  95|  철원|2013-12-01|           0|          -4|           6|        NULL|             3|                 -2|              41|              85|                5|                1|               -9|
|  95|  철원|2013-12-02|           0|          -4|           6|        NULL|         

In [0]:
columns_to_fill_forward = [
    '평균기온(°C)', '최저기온(°C)', '최고기온(°C)', '최대 풍속(m/s)', '평균 지면온도(°C)',
    '최저 초상온도(°C)', '평균 이슬점온도(°C)', '최소 상대습도(%)', '평균 상대습도(%)'
]
for col_name in columns_to_fill_forward:
    window_spec = Window.orderBy("일시")
    df = df.withColumn(col_name, when(col(col_name).isNull(), lag(col_name).over(window_spec)).otherwise(col(col_name)))
df = df.withColumn('합계 일조시간(hr)', when(col('합계 일조시간(hr)').isNull(), lag('합계 일조시간(hr)').over(window_spec)).otherwise(col('합계 일조시간(hr)')))
df = df.withColumn('일강수량(mm)', when(col('일강수량(mm)').isNull(), 0).otherwise(col('일강수량(mm)')))
df.show()

+----+------+----------+------------+------------+------------+------------+--------------+-------------------+----------------+----------------+-----------------+-----------------+-----------------+
|지점|지점명|      일시|평균기온(°C)|최저기온(°C)|최고기온(°C)|일강수량(mm)|최대 풍속(m/s)|평균 이슬점온도(°C)|최소 상대습도(%)|평균 상대습도(%)|합계 일조시간(hr)|평균 지면온도(°C)|최저 초상온도(°C)|
+----+------+----------+------------+------------+------------+------------+--------------+-------------------+----------------+----------------+-----------------+-----------------+-----------------+
|  95|  철원|2013-12-01|           0|          -4|           6|           0|             3|                 -2|              41|              85|                5|                1|               -9|
| 203|  이천|2013-12-01|           2|          -2|           8|           0|             3|                 -3|              40|              69|                6|                2|               -5|
| 261|  해남|2013-12-01|           4|           0|          11|       

In [0]:
df1 = spark.read.csv('/FileStore/tables/CLM_SPI_DD_20240108160110.csv', header=True, encoding='cp949')
  
# Convert the 'SPI1' column to integer type and apply the condition
df1 = df1.withColumn('SPI1', when(col('SPI1') <= -1, 1).otherwise(0).cast(IntegerType()))

# Count the occurrences of each value in the 'SPI1' column
df1.groupBy('SPI1').count().show()

+----+-----+
|SPI1|count|
+----+-----+
|   1| 1910|
|   0| 7558|
+----+-----+



In [0]:
result = df.join(df1, ['지점', '일시'], 'inner')

# 필요없는 열 제거
result1_df = result.drop('지점명', '일시', '지점')


In [0]:
from pyspark.ml.classification import GBTClassifier
from pyspark.ml.feature import VectorAssembler
from pyspark.ml.evaluation import MulticlassClassificationEvaluator
from pyspark.sql.functions import when

# VectorAssembler를 사용하여 feature 벡터 생성
vector_assembler = VectorAssembler(inputCols=result1_df.columns[:-1], outputCol="features")
transformed_df = vector_assembler.transform(result1_df)

# 데이터 분할
train_df, test_df = transformed_df.randomSplit([0.8, 0.2], seed=42)

# 클래스 불균형을 고려하여 가중치 계산
gbt = GBTClassifier(maxIter=100, maxDepth=5, seed=42, labelCol="SPI1")
gbt_model = gbt.fit(train_df)

# 테스트 데이터에 대한 예측
predictions = gbt_model.transform(test_df)

# 모델 성능 평가
evaluator = MulticlassClassificationEvaluator(labelCol="SPI1", predictionCol="prediction", metricName="accuracy")
accuracy = evaluator.evaluate(predictions)

# 결과 출력
print("Accuracy:", accuracy)


Downloading artifacts:   0%|          | 0/14 [00:00<?, ?it/s]

Uploading artifacts:   0%|          | 0/4 [00:00<?, ?it/s]

Accuracy: 0.8148959474260679


In [0]:
### PipeLine으로 실행

stage_1 = VectorAssembler(inputCols=result1_df.columns[:-1], outputCol="featured")
stage_2 = GBTClassifier(featuresCol="featured",maxIter=100, maxDepth=5, seed=42, labelCol="SPI1")
pipeline = Pipeline(stages=[stage_1,stage_2])
pipeline_model = pipeline.fit(train_df)

predictions = pipeline_model.transform(test_df)
evaluator_accruacy = MulticlassClassificationEvaluator(labelCol="SPI1", predictionCol="prediction", metricName="accuracy")
accuracy = evaluator.evaluate(predictions)
print("정확도",accuracy)

Downloading artifacts:   0%|          | 0/16 [00:00<?, ?it/s]

Uploading artifacts:   0%|          | 0/4 [00:00<?, ?it/s]

정확도 0.8148959474260679


In [0]:
display(predictions)

평균기온(°C),최저기온(°C),최고기온(°C),일강수량(mm),최대 풍속(m/s),평균 이슬점온도(°C),최소 상대습도(%),평균 상대습도(%),합계 일조시간(hr),평균 지면온도(°C),최저 초상온도(°C),SPI1,features,featured,rawPrediction,probability,prediction
-16,-18,-8,0,5,-24,33,48,7,-6,-22,1,"Map(vectorType -> dense, length -> 11, values -> List(-16.0, -18.0, -8.0, 0.0, 5.0, -24.0, 33.0, 48.0, 7.0, -6.0, -22.0))","Map(vectorType -> dense, length -> 11, values -> List(-16.0, -18.0, -8.0, 0.0, 5.0, -24.0, 33.0, 48.0, 7.0, -6.0, -22.0))","Map(vectorType -> dense, length -> 2, values -> List(0.4306927058624362, -0.4306927058624362))","Map(vectorType -> dense, length -> 2, values -> List(0.7029500252663139, 0.2970499747336861))",0.0
-14,-19,-8,0,4,-21,29,60,8,0,-10,0,"Map(vectorType -> dense, length -> 11, values -> List(-14.0, -19.0, -8.0, 0.0, 4.0, -21.0, 29.0, 60.0, 8.0, 0.0, -10.0))","Map(vectorType -> dense, length -> 11, values -> List(-14.0, -19.0, -8.0, 0.0, 4.0, -21.0, 29.0, 60.0, 8.0, 0.0, -10.0))","Map(vectorType -> dense, length -> 2, values -> List(1.3774086038654876, -1.3774086038654876))","Map(vectorType -> dense, length -> 2, values -> List(0.9401848317912331, 0.05981516820876687))",0.0
-14,-18,-10,0,2,-20,39,61,8,-5,-12,0,"Map(vectorType -> dense, length -> 11, values -> List(-14.0, -18.0, -10.0, 0.0, 2.0, -20.0, 39.0, 61.0, 8.0, -5.0, -12.0))","Map(vectorType -> dense, length -> 11, values -> List(-14.0, -18.0, -10.0, 0.0, 2.0, -20.0, 39.0, 61.0, 8.0, -5.0, -12.0))","Map(vectorType -> dense, length -> 2, values -> List(0.5538118941407474, -0.5538118941407474))","Map(vectorType -> dense, length -> 2, values -> List(0.7516858467670886, 0.2483141532329114))",0.0
-13,-18,-8,0,3,-24,24,38,8,-7,-18,1,"Map(vectorType -> dense, length -> 11, values -> List(-13.0, -18.0, -8.0, 0.0, 3.0, -24.0, 24.0, 38.0, 8.0, -7.0, -18.0))","Map(vectorType -> dense, length -> 11, values -> List(-13.0, -18.0, -8.0, 0.0, 3.0, -24.0, 24.0, 38.0, 8.0, -7.0, -18.0))","Map(vectorType -> dense, length -> 2, values -> List(0.7916397377165685, -0.7916397377165685))","Map(vectorType -> dense, length -> 2, values -> List(0.8296684703628696, 0.17033152963713039))",0.0
-12,-20,-4,0,2,-18,33,62,8,-8,-22,1,"Map(vectorType -> dense, length -> 11, values -> List(-12.0, -20.0, -4.0, 0.0, 2.0, -18.0, 33.0, 62.0, 8.0, -8.0, -22.0))","Map(vectorType -> dense, length -> 11, values -> List(-12.0, -20.0, -4.0, 0.0, 2.0, -18.0, 33.0, 62.0, 8.0, -8.0, -22.0))","Map(vectorType -> dense, length -> 2, values -> List(0.44639502325953306, -0.44639502325953306))","Map(vectorType -> dense, length -> 2, values -> List(0.7094656044826647, 0.2905343955173353))",0.0
-12,-18,-6,0,2,-20,26,52,8,-6,-20,0,"Map(vectorType -> dense, length -> 11, values -> List(-12.0, -18.0, -6.0, 0.0, 2.0, -20.0, 26.0, 52.0, 8.0, -6.0, -20.0))","Map(vectorType -> dense, length -> 11, values -> List(-12.0, -18.0, -6.0, 0.0, 2.0, -20.0, 26.0, 52.0, 8.0, -6.0, -20.0))","Map(vectorType -> dense, length -> 2, values -> List(0.10538810620830194, -0.10538810620830194))","Map(vectorType -> dense, length -> 2, values -> List(0.5524998310681319, 0.44750016893186806))",0.0
-11,-19,-4,0,4,-17,25,66,8,-6,-22,1,"Map(vectorType -> dense, length -> 11, values -> List(-11.0, -19.0, -4.0, 0.0, 4.0, -17.0, 25.0, 66.0, 8.0, -6.0, -22.0))","Map(vectorType -> dense, length -> 11, values -> List(-11.0, -19.0, -4.0, 0.0, 4.0, -17.0, 25.0, 66.0, 8.0, -6.0, -22.0))","Map(vectorType -> dense, length -> 2, values -> List(-0.33148040472925117, 0.33148040472925117))","Map(vectorType -> dense, length -> 2, values -> List(0.34007482073501477, 0.6599251792649852))",1.0
-11,-17,-5,0,1,-16,43,69,8,0,-11,0,"Map(vectorType -> dense, length -> 11, values -> List(-11.0, -17.0, -5.0, 0.0, 1.0, -16.0, 43.0, 69.0, 8.0, 0.0, -11.0))","Map(vectorType -> dense, length -> 11, values -> List(-11.0, -17.0, -5.0, 0.0, 1.0, -16.0, 43.0, 69.0, 8.0, 0.0, -11.0))","Map(vectorType -> dense, length -> 2, values -> List(1.6681883008110379, -1.6681883008110379))","Map(vectorType -> dense, length -> 2, values -> List(0.965655876276793, 0.034344123723207054))",0.0
-11,-14,-6,0,4,-21,26,42,8,-1,-17,0,"Map(vectorType -> dense, length -> 11, values -> List(-11.0, -14.0, -6.0, 0.0, 4.0, -21.0, 26.0, 42.0, 8.0, -1.0, -17.0))","Map(vectorType -> dense, length -> 11, values -> List(-11.0, -14.0, -6.0, 0.0, 4.0, -21.0, 26.0, 42.0, 8.0, -1.0, -17.0))","Map(vectorType -> dense, length -> 2, values -> List(0.802337355458967, -0.802337355458967))","Map(vectorType -> dense, length -> 2, values -> List(0.8326707267771206, 0.16732927322287938))",0.0
-11,-13,-6,0,5,-22,23,37,8,-5,-14,0,"Map(vectorType -> dense, length -> 11, values -> List(-11.0, -13.0, -6.0, 0.0, 5.0, -22.0, 23.0, 37.0, 8.0, -5.0, -14.0))","Map(vectorType -> dense, length -> 11, values -> List(-11.0, -13.0, -6.0, 0.0, 5.0, -22.0, 23.0, 37.0, 8.0, -5.0, -14.0))","Map(vectorType -> dense, length -> 2, values -> List(1.1661666538658049, -1.1661666538658049))","Map(vectorType -> dense, length -> 2, values -> List(0.9115197023780259, 0.08848029762197407))",0.0


In [0]:
### 하이퍼파라미터 조정코드
from pyspark.ml.tuning import ParamGridBuilder, CrossValidator

param_grid = ParamGridBuilder() \
    .addGrid(gbt.maxIter, [50, 100, 200]) \
    .addGrid(gbt.maxDepth, [3, 5, 7]) \
    .build()

cross_val = CrossValidator(estimator=gbt,
                           estimatorParamMaps=param_grid,
                           evaluator=evaluator,
                           numFolds=3)

pipeline_01 = Pipeline(stages=[stage_1,cross_val])
pipeline_model_01 = pipeline_01.fit(train_df)
print(pipeline_model_01.stages)
print(type(pipeline_model_01.stages[-1]))


com.databricks.backend.common.rpc.CommandCancelledException
	at com.databricks.spark.chauffeur.SequenceExecutionState.$anonfun$cancel$3(SequenceExecutionState.scala:103)
	at scala.Option.getOrElse(Option.scala:189)
	at com.databricks.spark.chauffeur.SequenceExecutionState.$anonfun$cancel$2(SequenceExecutionState.scala:103)
	at com.databricks.spark.chauffeur.SequenceExecutionState.$anonfun$cancel$2$adapted(SequenceExecutionState.scala:100)
	at scala.collection.immutable.Range.foreach(Range.scala:158)
	at com.databricks.spark.chauffeur.SequenceExecutionState.cancel(SequenceExecutionState.scala:100)
	at com.databricks.spark.chauffeur.ExecContextState.cancelRunningSequence(ExecContextState.scala:714)
	at com.databricks.spark.chauffeur.ExecContextState.$anonfun$cancel$1(ExecContextState.scala:430)
	at scala.Option.getOrElse(Option.scala:189)
	at com.databricks.spark.chauffeur.ExecContextState.cancel(ExecContextState.scala:430)
	at com.databricks.spark.chauffeur.ChauffeurState.cancelExecutio