### *Disaster-prevention weather observation (AWS) data preprocessing, hourly*
#### - Weather is summarized for each of the 250 sigungu (municipalities)
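> 1. Compute the Euclidean distance between each sigungu and each sensor
    - Using the sigungu's longitude/latitude as the reference point, compute the Euclidean distance to every sensor's longitude/latitude
> 2. Compute distance-based weights and derive each weather variable
    - Join the sigungu-sensor distance table with the actual sensor readings (readings without a temperature are dropped, so only sensors with real data are used)
    - The times of the maximum/minimum temperature and of the maximum wind speed are converted to seconds before the weights are applied
    - Apply the weights to the top 3 of the joined sensors to derive each region's variable values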
    
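#### - Weighting formula
>  - $w_1 = \frac{1}{2}\left(1 - \frac{|d-a_1|}{|d-a_1| + |d-a_2| + |d-a_3|}\right)$
>  - $w_2 = \frac{1}{2}\left(1 - \frac{|d-a_2|}{|d-a_1| + |d-a_2| + |d-a_3|}\right)$
>  - $w_3 = \frac{1}{2}\left(1 - \frac{|d-a_3|}{|d-a_1| + |d-a_2| + |d-a_3|}\right)$
>  - $d$ is the sigungu location and $a_i$ the location of its $i$-th nearest sensor with data; since the three distance shares add up to 1, $w_1 + w_2 + w_3 = \frac{1}{2}(3 - 1) = 1$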
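A minimal plain-Python sketch of the weighting as the code in section 4 computes it (the denominator is the sum of the three top-3 distances). The helper names `top3_weights` and `weighted_estimate` are hypothetical, and the equal-weight fallback for three zero distances is an assumption, not something the notebook handles.

```python
def top3_weights(distances):
    """w_i = 1/2 * (1 - d_i / (d_1 + d_2 + d_3)) for the three nearest sensors."""
    if len(distances) != 3:
        raise ValueError("this scheme assumes exactly three sensors")
    total = sum(distances)
    if total == 0:
        # assumption: all three sensors sit on the sigungu point, so weight them equally
        return [1 / 3] * 3
    return [0.5 * (1 - d / total) for d in distances]


def weighted_estimate(values, distances):
    """Combine the three sensor values with the distance-based weights."""
    return sum(w * v for w, v in zip(top3_weights(distances), values))


# Hypothetical distances (in degrees) and temperatures
weights = top3_weights([0.05, 0.10, 0.15])  # roughly [0.417, 0.333, 0.25]
estimate = weighted_estimate([20.0, 21.0, 23.0], [0.05, 0.10, 0.15])
print(weights, sum(weights), estimate)
```

One property of this scheme worth knowing: no sensor can receive more than half of the weight, even when it sits exactly on the sigungu point (its share of the distance sum is then 0, giving $w = 1/2$).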

### - Source data files
>  - sigungu_xy.csv (sigungu coordinates)
>  - censerinfo.csv (sensor information)
>  - OBS_AWS_HH_202001_202006.csv (weather readings per sensor)
>  - OBS_AWS_HH_202007_202010.csv (weather readings per sensor)

In [1]:
import pyspark
from pyspark.sql import SparkSession
from pyspark.sql.functions import mean, col, split, regexp_extract, when, lit

spark = SparkSession.builder.master("local[6]") \
                    .appName('wtw') \
                    .getOrCreate()
spark

### 1. Load and preprocess the coordinates of the 250 sigungu

#### 1.1 Load the sigungu coordinates, fix the column types, and register the dataset as a SQL view

In [2]:
SigunguXY = spark\
          .read\
          .option("inferSchema", "true")\
          .option("header", "true")\
          .csv("data/sigungu_xy.csv")
# Cast the coordinates to float and register a temp view for Spark SQL
SigunguXY = SigunguXY.withColumn("sigungu_x", SigunguXY.sigungu_x.cast("float")) \
                     .withColumn("sigungu_y", SigunguXY.sigungu_y.cast("float"))
SigunguXY.createOrReplaceTempView("SigunguXY")

print(SigunguXY.count())
# 250

250


#### 1.2 Validation

In [3]:
print(SigunguXY.count())
print(SigunguXY.distinct().count())
SigunguXY.describe().show()
SigunguXY.show(2)
# 250

250
250
+-------+--------+-------+------------------+------------------+
|summary|    sido|sigungu|         sigungu_x|         sigungu_y|
+-------+--------+-------+------------------+------------------+
|  count|     250|    250|               250|               250|
|   mean|    null|   null|127.63907656860351| 36.41521324157715|
| stddev|    null|   null| 0.912011813226167|1.0729488769413968|
|    min|  강원도| 가평군|         124.67009|         33.254066|
|    max|충청북도| 횡성군|         130.90572|         38.380592|
+-------+--------+-------+------------------+------------------+

+------+-------+---------+---------+
|  sido|sigungu|sigungu_x|sigungu_y|
+------+-------+---------+---------+
|강원도| 강릉시| 128.8759| 37.75211|
|강원도| 고성군|128.46786|38.380592|
+------+-------+---------+---------+
only showing top 2 rows



### 2. Preprocess the data of the 524 sensors

#### 2.1 Load the data, fix the column types, and register the dataset as a SQL view

In [4]:
SenserInfo = spark\
          .read\
          .option("inferSchema", "true")\
          .option("header", "true")\
          .csv("data/censerinfo.csv")
SenserInfo = SenserInfo.withColumn("x", SenserInfo.x.cast("float")).withColumn("y", SenserInfo.y.cast("float"))
SenserInfo.createOrReplaceTempView("SenserInfo")

print(SenserInfo.count())
# 1893

1893


#### 2.2 Data cleanup: sensor locations are kept as a change history, so take the most recent record for each sensor and build a subset with only the needed columns

In [5]:
# Check the data
print(SenserInfo.distinct().count())

SenserInfo.describe().show()
# SenserInfo.show()
# 1893

1893
+-------+-----------------+-------------------+-------------------+-----------+------------------+------------------+------------------+------------------+------------------+
|summary|         senserid|          startdate|            enddate|sernsername|                 x|                 y|region_1depth_name|region_2depth_name|region_3depth_name|
+-------+-----------------+-------------------+-------------------+-----------+------------------+------------------+------------------+------------------+------------------+
|  count|             1893|               1893|               1893|       1893|              1893|              1893|              1893|              1893|              1893|
|   mean|693.0612783940835|               null|               null|       null|127.53129761419281|36.158902869045704|              null|              null|              null|
| stddev| 174.599815603351|               null|               null|       null|0.9911930175069503|1.3313135706812764|   

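#### 2.3 For sensors still in service, enddate is blank; for retired sensors, enddate holds the date they stopped operating (blank enddate values in the CSV were filled with the far-future year 2222, so max(enddate) picks each sensor's current record)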
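The SQL below keeps, for each senserid, the row whose enddate equals that sensor's maximum enddate. The same idea as a plain-Python sketch: the sentinel date `STILL_ACTIVE` is a hypothetical stand-in for the 2222 value written into blank enddate cells, and the sample rows are made up.

```python
from datetime import date

# Hypothetical sentinel: blank enddate cells were filled with a date in the year 2222
STILL_ACTIVE = date(2222, 12, 31)


def latest_sensor_records(rows):
    """For each senserid keep the row with the largest enddate (GROUP BY + max + self-join)."""
    best = {}
    for row in rows:
        end = row["enddate"] or STILL_ACTIVE  # blank enddate -> still in service
        sid = row["senserid"]
        if sid not in best or end > best[sid][0]:
            best[sid] = (end, row)
    return {sid: row for sid, (_, row) in best.items()}


history = [
    {"senserid": 12, "enddate": date(2015, 3, 1), "x": 126.30},   # old location (made up)
    {"senserid": 12, "enddate": None, "x": 126.3167},             # current location
    {"senserid": 90, "enddate": date(2018, 6, 30), "x": 128.56},  # retired sensor (made up)
]
latest = latest_sensor_records(history)
print(latest)
```

Retired sensors are kept with their last known location, just as the SQL keeps them. Unlike the SQL join, this sketch keeps only one row if two records tie on enddate.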

In [6]:
LastSenserInfo = spark.sql("""
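SELECT A.senserid, A.sernsername, A.x AS senser_x, A.y AS senser_y,
        A.region_1depth_name AS senser_region_1,
        -- note: region_3depth (eup/myeon/dong level) is aliased as senser_region_2 and
        -- region_2depth (si/gun/gu level) as senser_region_3; neither column is used later
        A.region_3depth_name AS senser_region_2,
        A.region_2depth_name AS senser_region_3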
FROM SenserInfo AS A

INNER JOIN (
    SELECT senserid, max(enddate) AS lenddate
    FROM SenserInfo
    GROUP BY senserid
    ) AS B
ON A.senserid = B.senserid
AND A.enddate = B.lenddate

WHERE 1 = 1

""")
LastSenserInfo.createOrReplaceTempView("LastSenserInfo")
LastSenserInfo.describe().show()
print(LastSenserInfo.count())
# 524

+-------+-----------------+-----------+------------------+------------------+---------------+---------------+---------------+
|summary|         senserid|sernsername|          senser_x|          senser_y|senser_region_1|senser_region_2|senser_region_3|
+-------+-----------------+-----------+------------------+------------------+---------------+---------------+---------------+
|  count|              524|        524|               524|               524|            524|            524|            524|
|   mean|679.4561068702291|       null|127.52589231593008|36.187192130634806|           null|           null|           null|
| stddev|184.2299668080408|       null|1.0150116146748691| 1.364833816970453|           null|           null|           null|
|    min|               12|     가거도|          124.6305|           33.1221|         강원도|         가산면|         가평군|
|    max|              978|       횡성|          131.8698|           38.5439|       충청북도|         흑산면|         횡성군|
+-------+-------

### 3. Compute Euclidean distances from the sigungu and sensor coordinates (250 × 524 = 131,000 pairs)

#### 3.1 Cross join to pair every sigungu with every sensor

In [7]:
Sigungu_dist = spark.sql("""
SELECT *
FROM SigunguXY AS A

-- every sigungu x every sensor (cartesian product, 250 * 524 rows)
CROSS JOIN LastSenserInfo AS B

WHERE 1 = 1
-- AND A.sigungu = '강릉시'

""")
Sigungu_dist.createOrReplaceTempView("Sigungu_dist")
print(Sigungu_dist.count())
Sigungu_dist.show(2)
# 250 * 524 = 131000

131000
+------+-------+---------+---------+--------+-----------+--------+--------+---------------+---------------+---------------+
|  sido|sigungu|sigungu_x|sigungu_y|senserid|sernsername|senser_x|senser_y|senser_region_1|senser_region_2|senser_region_3|
+------+-------+---------+---------+--------+-----------+--------+--------+---------------+---------------+---------------+
|강원도| 강릉시| 128.8759| 37.75211|      12| 안면도(감)|126.3167| 36.5333|       충청남도|         안면읍|         태안군|
|강원도| 고성군|128.46786|38.380592|      12| 안면도(감)|126.3167| 36.5333|       충청남도|         안면읍|         태안군|
+------+-------+---------+---------+--------+-----------+--------+--------+---------------+---------------+---------------+
only showing top 2 rows



#### 3.2 Euclidean distance and ranking (computed in pandas, because applying the distance function on the PySpark DataFrame proved difficult)

In [1]:
# Euclidean distance function (sanity check on one pair)
from scipy.spatial import distance
print(distance.euclidean((128.8759061, 37.75210808), (126.3167, 36.5333)))

2.8346126716273883
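The distance here is just $\sqrt{\Delta x^2 + \Delta y^2}$ on (longitude, latitude) pairs, so scipy is not strictly required. A stdlib sketch (the helper name `planar_distance` is hypothetical) reproduces the value above; in PySpark the same expression should be writable directly on columns with `pyspark.sql.functions.sqrt` and `pow` (not tested here), which would avoid the `toPandas()` round trip.

```python
import math


def planar_distance(lon1, lat1, lon2, lat2):
    """Straight-line distance in raw degree units, matching scipy's euclidean on (lon, lat)."""
    return math.hypot(lon1 - lon2, lat1 - lat2)


# Same coordinates as the scipy check above (sigungu 강릉시 vs. sensor 12)
d = planar_distance(128.8759061, 37.75210808, 126.3167, 36.5333)
print(d)
```

The result is in degrees, not kilometres. At Korean latitudes one degree of longitude covers only about 80% of the ground distance of one degree of latitude, so this distance is an approximation that is mainly used for ranking the nearby sensors.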


In [9]:
# scipy's euclidean() cannot be applied directly to Spark columns, so convert to pandas
from scipy.spatial import distance
# print(distance.euclidean((128.8759061, 37.75210808), (126.3167, 36.5333)))
print('before conversion', type(Sigungu_dist))
# Sigunguxy_dist['distance'] = Sigunguxy_dist.map(lambda x : distance.euclidean((x['sigungu_x'], x['sigungu_y']), (x['x'], x['y'])), axis=1)

# Convert to a pandas DataFrame (pdf) and compute the distance row by row
pdf_Sigungu_dist = Sigungu_dist.toPandas()
print('after conversion', type(pdf_Sigungu_dist))
pdf_Sigungu_dist['udistance'] = pdf_Sigungu_dist.apply(lambda x : distance.euclidean((x['sigungu_x'], x['sigungu_y']), (x['senser_x'], x['senser_y'])), axis=1)
# Convert back to a Spark DataFrame
Sigungu_dist = spark.createDataFrame(pdf_Sigungu_dist)
Sigungu_dist.createOrReplaceTempView("Sigungu_dist")
print('converted back after adding the Euclidean distance', type(Sigungu_dist))
print(Sigungu_dist.count())
Sigungu_dist.show(2)
# 131000

before conversion <class 'pyspark.sql.dataframe.DataFrame'>
after conversion <class 'pandas.core.frame.DataFrame'>
converted back after adding the Euclidean distance <class 'pyspark.sql.dataframe.DataFrame'>
131000
+------+-------+------------------+------------------+--------+-----------+------------------+-----------------+---------------+---------------+---------------+------------------+
|  sido|sigungu|         sigungu_x|         sigungu_y|senserid|sernsername|          senser_x|         senser_y|senser_region_1|senser_region_2|senser_region_3|         udistance|
+------+-------+------------------+------------------+--------+-----------+------------------+-----------------+---------------+---------------+---------------+------------------+
|강원도| 강릉시| 128.8759002685547| 37.75210952758789|      12| 안면도(감)|126.31670379638672|36.53329849243164|       충청남도|         안면읍|         태안군| 2.834605249867365|
|강원도| 고성군|128.46786499023438|38.380592346191406|      12| 안면도(감)|126.31670379638672|36.53329849243164|       충청남도|         안면읍|         태안군|2.8

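### <span style="color:red">  *Ideally we would extract each sigungu's top 3 sensors here to shrink the data, but some of those sensors have no actual readings at certain times, so the top 3 must be chosen per timestamp by comparing against all sensors. The logic was revised accordingly.*</span>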
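The note above can be illustrated with a small plain-Python sketch of what the next cells do in Spark: drop readings without a temperature, then, per (sigungu, timestamp), rank the remaining sensors by distance and keep the first three. The function name and the toy data are hypothetical.

```python
from collections import defaultdict


def nearest_k_with_data(distances, readings, k=3):
    """
    distances: {(sigungu, sensor_id): distance}
    readings:  {(sensor_id, timestamp): temperature or None}
    Returns {(sigungu, timestamp): [sensor_id, ...]} holding the k nearest
    sensors that actually reported a temperature at that timestamp.
    """
    # Index the timestamps at which each sensor has a non-null temperature
    valid_times = defaultdict(list)
    for (sensor, ts), temp in readings.items():
        if temp is not None:
            valid_times[sensor].append(ts)

    # Collect (distance, sensor) candidates per (sigungu, timestamp)
    candidates = defaultdict(list)
    for (sigungu, sensor), dist in distances.items():
        for ts in valid_times.get(sensor, []):
            candidates[(sigungu, ts)].append((dist, sensor))

    # Sort by distance (ties broken by sensor id) and keep the first k
    return {key: [sensor for _, sensor in sorted(cands)[:k]]
            for key, cands in candidates.items()}


distances = {("A", 1): 0.1, ("A", 2): 0.2, ("A", 3): 0.3, ("A", 4): 0.4}
readings = {(1, "t0"): 20.0, (2, "t0"): 21.0, (3, "t0"): 22.0, (4, "t0"): 23.0,
            (1, "t1"): None, (2, "t1"): 21.5, (3, "t1"): 22.5, (4, "t1"): 23.5}
top = nearest_k_with_data(distances, readings)
print(top)
```

At t1 the nearest sensor (1) has no temperature, so sensor 4 takes its place; a top-3 list fixed in advance ({1, 2, 3}) would have left only two sensors for that hour.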

### 3. Map the sigungu × sensor pairs onto the daily/hourly sensor readings

#### 3.1 Load the weather data and keep the required fields

In [10]:
AWS = spark.read.format("csv")\
            .option("inferSchema", "true")\
            .option("charset","euc-kr")\
            .option("header", "true")\
            .load("data/OBS_AWS_HH_201907*.csv")
#             .load("data/OBS_AWS_HH_20*.csv")

AWS = AWS.toDF("senserid", "sersername", "datetime", "temp", "winddeg", "windspeed", \
               "rainfall", "placehpa", "seahpa", "humidity")
AWS.createOrReplaceTempView("AWS")

print(AWS.count())
# AWS.show(2)
# AWS.printSchema()
# 702506
# 16826600 all years - too much for my machine
# 4381480 2017
# 4354207 2018
# 2174118 2019 1H
# 2216231 2019 2H

2216231


In [11]:
# AWS.describe().show()

In [12]:
SigunguWeather = spark.sql("""
SELECT A.sido, A.sigungu, B.senserid, B.sersername, A.udistance, B.datetime
       , B.temp
       , B.winddeg
       , B.windspeed
       , B.rainfall
       , B.placehpa
       , B.seahpa
       , B.humidity
FROM Sigungu_dist AS A

INNER JOIN AWS AS B
ON A.senserid = B.senserid
AND B.temp is not null
-- AND B.rainfall is not null 
-- AND B.humidity is not null
-- drop readings where a faulty sensor reported no temperature, then pick the nearest sensors per key
WHERE 1 = 1

""")
SigunguWeather.createOrReplaceTempView("SigunguWeather")

print(SigunguWeather.count())
# SigunguWeather.show(2)
# daily
# 175545250  (one sensor can be shared by several sigungu; ~4 years of daily data, ~176M rows)
# after removing null temperatures: 174405250
# hourly
# 4204704250  (one sensor can be shared by several sigungu; ~4 years of hourly data, ~4.2B rows)
# after removing null temperatures: 4199398000 (~4.2B)
# 1094328250 2017 (~1.1B)
# 1086771750 2018
# 553321000 2019 H

553321000


#### 3.2 Rank the sensors per key by ascending Euclidean distance and keep the top 3

In [13]:
from pyspark.sql.window import Window
from pyspark.sql.functions import row_number

# Rank the sensors by udistance within each (sido, sigungu, datetime)
windowSpec  = Window.partitionBy("sido", "sigungu", "datetime").orderBy("udistance")
SigunguWeather = SigunguWeather.withColumn("row_number", row_number().over(windowSpec))

SigunguWeather.createOrReplaceTempView("SigunguWeather")

print(SigunguWeather.count())
# SigunguWeather.show(2)
# 174405250
# 4199398000
# 1094328250 2017
# 1086771750 2018
# 553321000 2019 H

553321000


In [14]:
# Keep only the 3 nearest sensors by udistance
SigunguSenserTop3 = spark.sql("""
    SELECT *
    FROM SigunguWeather 
    WHERE 1 = 1
    AND row_number IN (1, 2, 3)
    """)
SigunguSenserTop3.createOrReplaceTempView("SigunguSenserTop3")

print(SigunguSenserTop3.count())
# SigunguSenserTop3.show(2)
# 2017	365 * 24 = 8760
# 2018	365 * 24 = 8760
# 2019	365 * 24 = 8760
# 2020	305 * 24 = 7320
# total 33600 hours
# 25200000 = 250 sigungu * 3 sensors * 33600 hours = 25.2 million
# 6570000 = 250 sigungu * 3 sensors * 8760 hours = 6.57 million (2017)
# 6570000 2018
# 3312000 2019 H

3312000
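The expected row counts in the comments above can be checked with a short sketch, assuming one reading per sensor per hour and exactly three surviving sensors per (sigungu, hour). The helper `expected_rows` is hypothetical.

```python
from datetime import datetime


def expected_rows(start, end, n_sigungu=250, n_sensors=3):
    """Hours in [start, end), rows after the top-3 filter, rows after aggregation per sigungu."""
    hours = int((end - start).total_seconds() // 3600)
    return hours, n_sigungu * n_sensors * hours, n_sigungu * hours


print(expected_rows(datetime(2019, 7, 1), datetime(2020, 1, 1)))   # second half of 2019
print(expected_rows(datetime(2017, 1, 1), datetime(2018, 1, 1)))   # full year 2017
print(expected_rows(datetime(2017, 1, 1), datetime(2020, 11, 1)))  # 2017 to Oct 2020
```

The second half of 2019 has 184 days, i.e. 4,416 hours, which gives the 3,312,000 rows printed above; the observed count is therefore consistent with every sigungu having at least three sensors reporting temperature in every hour.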


#### 3.3 Validation

In [15]:
# Takes a while; not needed
# maxSql = spark.sql("""
#     SELECT sido
#         , sigungu
#         , datetime
#         , count(*)
#     FROM SigunguSenserTop3
#     WHERE 1 = 1
#     GROUP BY sido, sigungu, datetime
#     HAVING count(*) < 3
# """)
# maxSql.show()

In [16]:
# This function returns (column, null count) for every column that contains nulls
# def null_value_count(df):
#   null_columns_counts = []
#   numRows = df.count()
#   for k in df.columns:
#     nullRows = df.where(col(k).isNull()).count()
#     if(nullRows > 0):
#       temp = k,nullRows
#       null_columns_counts.append(temp)
#   return(null_columns_counts)

# Runs for a very long time; the result was never seen
# null_columns_count_list = null_value_count(SigunguSenserTop3)
# null_columns_count_list

In [17]:
# Runs for a very long time
# LastSenserInfo.describe().show()

### 4. Compute the weights from the sensor-sigungu distances

In [18]:
# w_i = 1/2 * [1 - d_i / (d_1 + d_2 + d_3)], where d_i = |d - a_i| is the distance
# from the sigungu to its i-th nearest sensor; the three weights sum to 1
from pyspark.sql import functions as F
from pyspark.sql.window import Window

# Use the F module instead of `from pyspark.sql.functions import sum`,
# which would shadow Python's built-in sum() for the rest of the notebook
windowSp = Window.partitionBy("sido", "sigungu", "datetime")
SigunguSenserTop3 = SigunguSenserTop3.withColumn("sumUdist", F.sum("udistance").over(windowSp))

SigunguSenserTop3 = SigunguSenserTop3.withColumn(
    "weight", 0.5 * (1 - F.col("udistance") / F.col("sumUdist"))
)
SigunguSenserTop3.createOrReplaceTempView("SigunguSenserTop3")

print(SigunguSenserTop3.count())
# SigunguSenserTop3.show(2)
# 6570000 2017
# 6570000 2018
# 3312000 2019

3312000


In [19]:
# Validation
# maxSql = spark.sql("""
#     SELECT *
#     FROM SigunguSenserTop3
#     WHERE 1 = 1
#     AND sido = '강원도'
#     AND sigungu = '강릉시'
#     AND datetime = '2019-09-01 00:00'
# """)

# maxSql.show()

### 5. Extract the weighted hourly weather per sigungu

In [20]:
# Apply the weights to each variable (step 1: weighted term per sensor)
SigunguSenserTop3_w = spark.sql("""
    SELECT   *
            ,temp           * weight  AS w_temp      
            ,winddeg        * weight  AS w_winddeg   
            ,windspeed      * weight  AS w_windspeed 
            ,rainfall       * weight  AS w_rainfall  
            ,placehpa       * weight  AS w_placehpa  
            ,seahpa         * weight  AS w_seahpa    
            ,humidity       * weight  AS w_humidity  
    FROM SigunguSenserTop3 
    WHERE 1 = 1
    """)
SigunguSenserTop3_w.createOrReplaceTempView("SigunguSenserTop3_w")

print(SigunguSenserTop3_w.count())
# SigunguSenserTop3_w.show(2)
# 1050000 daily
# 6570000 2017
# 6570000 2018
# 3312000 2019 H

3312000
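Step 1 multiplies winddeg by the weight like any other variable. If winddeg is a compass direction in degrees (an assumption based on the column name), a linear weighted average can mislead near north: 350° and 10° with equal weights average to 180°, due south. A common alternative is to average unit vectors; the helper below is a hypothetical sketch, not part of the notebook.

```python
import math


def weighted_circular_mean_deg(directions_deg, weights):
    """Weighted mean direction in degrees, computed from unit vectors; result in [0, 360)."""
    s = sum(w * math.sin(math.radians(d)) for d, w in zip(directions_deg, weights))
    c = sum(w * math.cos(math.radians(d)) for d, w in zip(directions_deg, weights))
    if math.isclose(s, 0.0, abs_tol=1e-12) and math.isclose(c, 0.0, abs_tol=1e-12):
        return None  # the directions cancel out, so no mean direction exists
    angle = math.degrees(math.atan2(s, c)) % 360.0
    # guard: float rounding can turn a tiny negative angle into exactly 360.0
    return angle if angle < 360.0 else 0.0


linear = 0.5 * 350 + 0.5 * 10                                 # 180.0: points the wrong way
circular = weighted_circular_mean_deg([350, 10], [0.5, 0.5])  # close to 0 (north)
print(linear, circular)
```

In Spark the same idea would mean summing `weight * sin` and `weight * cos` per sigungu and hour and taking `atan2` of the two sums, instead of summing `winddeg * weight`.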


In [21]:
# Apply the weights to each variable (step 2: sum the weighted terms per sigungu and hour)
AWS_D_Weather = spark.sql("""
    SELECT  datetime 
            ,sido
            ,sigungu
            
            ,sum(w_temp)                 AS w_temp      
            ,sum(w_winddeg)              AS w_winddeg   
            ,sum(w_windspeed)            AS w_windspeed 
            ,sum(w_rainfall)             AS w_rainfall  
            ,sum(w_placehpa)             AS w_placehpa
            ,sum(w_seahpa)               AS w_seahpa    
            ,sum(w_humidity)             AS w_humidity  
            
    FROM SigunguSenserTop3_w 
    WHERE 1 = 1
    GROUP BY datetime  
            ,sido
            ,sigungu
    """)
AWS_D_Weather.createOrReplaceTempView("AWS_D_Weather")

print(AWS_D_Weather.count())
# AWS_D_Weather.show(2)
# 250 * 1400 = 350000
# 250 * 365 * 24 = 2190000 2017
# 2190000 2018
# 1104000 2019 H

1104000
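Only readings with a null temperature were dropped earlier, so rainfall, humidity and the other variables can still be null for one of the three sensors. Spark's SUM skips null products, so such a partial null silently shrinks the estimate (the remaining weights no longer add up to 1), and only an all-null trio yields NA. A plain-Python sketch of both behaviours; `renormalized_weighted_mean` is a hypothetical alternative, not what the notebook does.

```python
def sql_sum_weighted(values, weights):
    """Mimics SUM(value * weight): null products are skipped; all null -> None (NULL)."""
    terms = [v * w for v, w in zip(values, weights) if v is not None]
    return sum(terms) if terms else None


def renormalized_weighted_mean(values, weights):
    """Hypothetical alternative: rescale the weights over the sensors that reported a value."""
    pairs = [(v, w) for v, w in zip(values, weights) if v is not None]
    total_w = sum(w for _, w in pairs)
    if not pairs or total_w == 0:
        return None
    return sum(v * w for v, w in pairs) / total_w


w = [5 / 12, 1 / 3, 1 / 4]   # top-3 weights for distances 0.05, 0.10, 0.15
rain = [2.0, None, 2.0]      # the middle sensor has no rainfall value
biased = sql_sum_weighted(rain, w)              # 2.0 * (5/12 + 1/4), about 1.33
rescaled = renormalized_weighted_mean(rain, w)  # 2.0
print(biased, rescaled)
```

Whether rescaling is appropriate depends on why the value is missing; for rainfall, a null may also mean "no rain recorded", in which case treating it as 0 could be the intended reading.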


In [22]:
# Convert to a pandas DataFrame
pdf_AWS_D_Weather = AWS_D_Weather.toPandas()

# Save to CSV (NA values included)
pdf_AWS_D_Weather.to_csv("./AWS_H_Weather_na_2019_2H.csv", index = False)

Py4JJavaError: An error occurred while calling o139.collectToPython.
: org.apache.spark.SparkException: Job aborted due to stage failure: Task 105 in stage 82.0 failed 1 times, most recent failure: Lost task 105.0 in stage 82.0 (TID 3658, localhost, executor driver): java.io.IOException: not enough disk space
Caused by: java.io.IOException: not enough disk space


In [None]:
# Fill nulls with the previous value (e.g. maximum/minimum temperature and the times they occurred)
display(pdf_AWS_D_Weather.isnull().sum())
pdf_AWS_D_Weather.sort_values(by=['sido', 'sigungu', 'datetime'], axis=0, inplace=True)

pdf_AWS_D_Weather.head()

In [None]:
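# Forward-fill within each sigungu so a value never leaks into the next sigungu's first rows
fill_cols = [c for c in pdf_AWS_D_Weather.columns if c.startswith('w_')]
pdf_AWS_D_Weather[fill_cols] = pdf_AWS_D_Weather.groupby(['sido', 'sigungu'])[fill_cols].ffill()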
display(pdf_AWS_D_Weather.isnull().sum())

pdf_AWS_D_Weather.head()

In [None]:
# Save to CSV
pdf_AWS_D_Weather.to_csv("./AWS_H_Weather_2019_2H.csv", index = False)

In [None]:
seoulFlag = pdf_AWS_D_Weather['sido'] == '서울특별시'
datetimeFlag = pdf_AWS_D_Weather['datetime'] >= '2019-12-01 00:00'
pdf_AWS_D_Weather_test = pdf_AWS_D_Weather[seoulFlag & datetimeFlag]

display(pdf_AWS_D_Weather_test)

In [None]:
import matplotlib.pyplot as plt
import seaborn as sns
import numpy as np

# Work around matplotlib's Korean font rendering problem
from matplotlib import font_manager, rc
font_path = "C:/PyStexam/data/THEdog.ttf"   # location of the font file
font_name = font_manager.FontProperties(fname=font_path).get_name()
rc('font', family=font_name)

plt.figure(figsize=(35, 15))
sns.lineplot(x="datetime", y="w_temp", hue="sigungu", data=pdf_AWS_D_Weather_test)
plt.show()

In [None]:
plt.figure(figsize=(35, 15))
sns.boxplot(x='datetime', y='w_temp', hue='sigungu', data=pdf_AWS_D_Weather_test) 
plt.show()