In [1]:
import datetime
from glob import glob
from os.path import join

import numpy as np
import pandas as pd
import geopandas as gpd
from shapely.geometry import Point

from rasterio.warp import (
    calculate_default_transform,
    reproject,
    Resampling
)

from geocube.api.core import make_geocube

import xarray as xr
from rioxarray.rioxarray import _make_coords

import requests
from matplotlib import pyplot as plt
import h5py
from tqdm import tqdm

from multiprocessing import Pool

In [3]:
DATAPATH = "/media/user/dsk0/bggo/cmaqProjectdata"
URL = 'http://apis.data.go.kr/B552584/MsrstnInfoInqireSvc/getMsrstnList'
KEY = "k5wXUhoJHwee1cncQCBmm81YbQ+exttb0vdJcyF5GuGJn0mbGBNNL/ER2VfkrJMlExfc+FZjPeRuOM2bvgDYyQ=="

In [4]:
def get_emission_info() -> gpd.GeoDataFrame:
    em_meta_path = glob(join(DATAPATH, "시간단위_기상자료개방포털", "rawdata", "meta", "*.csv"))
    em_meta = pd.read_csv(em_meta_path[0], encoding='euc-kr')
    em_meta['geometry'] = list(map(
        lambda meta: Point(meta[1].경도, meta[1].위도),
        em_meta.iterrows()
    ))
    em_meta = gpd.GeoDataFrame(em_meta, geometry='geometry')
    em_meta.crs = {'init':'epsg:4326'}
    return em_meta

emission_info = get_emission_info()
emission_info.head()

  in_crs_string = _prepare_from_proj_string(in_crs_string)


Unnamed: 0,지점,시작일,종료일,지점명,지점주소,관리관서,위도,경도,노장해발고도(m),기압계(관측장비지상높이(m)),기온계(관측장비지상높이(m)),풍속계(관측장비지상높이(m)),강우계(관측장비지상높이(m)),geometry
0,90,1968-01-01,,속초,강원도 고성군토성면 봉포5길9 속초자동기상관측소,속초기상대(90),38.2509,128.5647,17.53,18.73,1.7,10.0,1.4,POINT (128.56470 38.25090)
1,93,2016-10-01,,북춘천,강원도 춘천시신북읍 산천리264(장본1길 12) 춘천기상대,춘천기상대(101),37.9474,127.7544,95.78,96.78,1.5,10.0,1.4,POINT (127.75440 37.94740)
2,95,1988-01-01,,철원,강원도 철원군갈말읍 명성로179번길 26 철원자동기상관측소,춘천기상대(101),38.1479,127.3042,155.48,156.98,1.8,13.0,1.5,POINT (127.30420 38.14790)
3,98,1998-02-01,,동두천,경기도 동두천시방죽로 16-47동두천서비스센터,수도권기상청(119),37.9019,127.0607,115.62,116.74,1.7,10.0,1.0,POINT (127.06070 37.90190)
4,99,2013-10-22,,파주,경기도 파주시문산읍 마정로46-29(파주기상대),수도권기상청(119),37.8859,126.7665,30.59,31.99,1.7,10.0,1.0,POINT (126.76650 37.88590)


In [5]:
def get_site_info() -> gpd.GeoDataFrame:
    params = {'serviceKey' : KEY, 'pageNo': 1, 'numOfRows': 640, 'returnType': 'json'}
    data = requests.get(URL, params=params, verify=False).json()
    site_info = pd.DataFrame(data['response']['body']['items'])

    def site_mapping(info):
        try: return Point(info[1].dmY, info[1].dmX)
        except: return None
    site_info['geometry'] = list(map(
        site_mapping,
        site_info.iterrows()
    ))
    site_info = site_info.loc[~site_info.isna().geometry.values]
    site_info = gpd.GeoDataFrame(site_info, geometry='geometry')
    site_info.crs = {'init':'epsg:4326'}
    return site_info

site_info = get_site_info()
site_info.head()

  in_crs_string = _prepare_from_proj_string(in_crs_string)


Unnamed: 0,dmX,item,mangName,year,addr,stationName,dmY,geometry
0,36.00799,"SO2, CO, O3, NO2, PM10, PM2.5",항만,2019,충남 서천군 장항읍 장산로 270-3 장항해양수산사무소 옥상,장항항,126.690312,POINT (126.69031 36.00799)
1,36.98556,"SO2, CO, O3, NO2, PM10, PM2.5",항만,2021,충남 당진시 송악읍 고대공단2길 79-30 고대관리부두 내,평택당진항(당진항),126.746821,POINT (126.74682 36.98556)
2,36.592906,"SO2, CO, O3, NO2, PM10, PM2.5",도시대기,2015,세종 조치원읍 군청로 87-16(신흥동) 세종특별자치시 조치원청사 옥상,신흥동,127.292253,POINT (127.29225 36.59291)
3,36.51212,"SO2, CO, O3, NO2, PM10, PM2.5",도시대기,2014,세종특별자치시 보듬3로 114 아름동커뮤니티센터 옥상 (아름동),아름동,127.24643,POINT (127.24643 36.51212)
4,36.474172,"SO2, CO, O3, NO2, PM10, PM2.5",도시대기,2018,"세종특별자치시 누리로 27 첫마을 6단지 관리사무소 옥상 (한솔동, 첫마을6단지)",한솔동,127.252529,POINT (127.25253 36.47417)


In [7]:
col_list = [
    '기온(°C)', '강수량(mm)', '풍속(m/s)', '풍향(16방위)', '습도(%)', '증기압(hPa)', 
    '이슬점온도(°C)', '현지기압(hPa)', '해면기압(hPa)', '일조(hr)', '일사(MJ/m2)', 
    '적설(cm)', '3시간신적설(cm)', '전운량(10분위)', '중하층운량(10분위)'
]

def get_meteorological_info():
    air_pollution_df = pd.read_csv(join(DATAPATH, "merged_airpol_20132022.csv"), encoding='euc-kr')
    asos_df = pd.read_csv(join(DATAPATH, "merged_asos_20132022.csv"), encoding='euc-kr')

    def to_datetime(date):
        return datetime.datetime(int(date[:4]),int(date[4:6]),int(date[6:8]),int(date[8:] if date[8:] !='24' else '0'))
    air_pollution_df.loc[:,'datetime'] = list(map(
        to_datetime,
        air_pollution_df.측정일시.values.astype(str)
    ))
    asos_df.loc[:,'datetime'] = pd.to_datetime(asos_df.일시)

    asos_df = asos_df.loc[:, ['지점', '일시', 'datetime']+col_list]
    air_pollution_df = air_pollution_df.loc[:,['지역','측정소명','측정소코드','측정일시','SO2','CO','O3','NO2','PM10','주소','datetime']]

    return air_pollution_df, asos_df

air_pollution_df, asos_df = get_meteorological_info()

  air_pollution_df = pd.read_csv(join(DATAPATH, "merged_airpol_20132022.csv"), encoding='euc-kr')
  asos_df = pd.read_csv(join(DATAPATH, "merged_asos_20132022.csv"), encoding='euc-kr')


In [8]:
air_pollution_df.info()

<class 'pandas.core.frame.DataFrame'>
RangeIndex: 35004984 entries, 0 to 35004983
Data columns (total 11 columns):
 #   Column    Dtype         
---  ------    -----         
 0   지역        object        
 1   측정소명      object        
 2   측정소코드     int64         
 3   측정일시      int64         
 4   SO2       float64       
 5   CO        float64       
 6   O3        float64       
 7   NO2       float64       
 8   PM10      float64       
 9   주소        object        
 10  datetime  datetime64[ns]
dtypes: datetime64[ns](1), float64(5), int64(2), object(3)
memory usage: 2.9+ GB


In [9]:
asos_df.info()

<class 'pandas.core.frame.DataFrame'>
RangeIndex: 8248947 entries, 0 to 8248946
Data columns (total 18 columns):
 #   Column       Dtype         
---  ------       -----         
 0   지점           int64         
 1   일시           object        
 2   datetime     datetime64[ns]
 3   기온(°C)       float64       
 4   강수량(mm)      float64       
 5   풍속(m/s)      float64       
 6   풍향(16방위)     float64       
 7   습도(%)        float64       
 8   증기압(hPa)     float64       
 9   이슬점온도(°C)    float64       
 10  현지기압(hPa)    float64       
 11  해면기압(hPa)    float64       
 12  일조(hr)       float64       
 13  일사(MJ/m2)    float64       
 14  적설(cm)       float64       
 15  3시간신적설(cm)   float64       
 16  전운량(10분위)    float64       
 17  중하층운량(10분위)  float64       
dtypes: datetime64[ns](1), float64(15), int64(1), object(1)
memory usage: 1.1+ GB


In [10]:
time_dropdup_list = list(map(str, list(set(air_pollution_df.측정일시))))
time_dropdup_datetime_list = list(map(
    lambda date: datetime.datetime(int(date[:4]),int(date[4:6]),int(date[6:8]),int(date[8:] if date[8:] !='24' else '0')),
    time_dropdup_list
))
time_dropdup_datetime_list.sort()
time_dropdup_datetime_df = pd.DataFrame(time_dropdup_datetime_list, columns=['datetime'])
time_dropdup_datetime_df.loc[:,'year'] = time_dropdup_datetime_df.datetime.dt.year
year_list = list(range(2013, 2023, 1))

In [12]:
resolution = 9000
x_dim, y_dim = 68, 83
projout = '+proj=lcc +lat_1=30 +lat_2=60 +lon_1=126 +lat_0=38 +lon_0=126 +ellps=GRS80 +units=m'
x = np.arange(-180_000, -180_000 + resolution * x_dim, resolution, dtype=np.float32)
y = np.arange(-585_000, -585_000 + resolution * y_dim, resolution, dtype=np.float32)
x_m, y_m = np.meshgrid(x, y)
G = np.dstack([x_m, y_m]).reshape(-1, 2)
grid_points = list(map(Point, G))

grid_data = pd.DataFrame(grid_points, columns=['geometry'])
grid_data = gpd.GeoDataFrame(grid_data, geometry='geometry')
grid_data.crs = site_info.to_crs(projout).crs
grid_data.loc[:,'x_m'] = grid_data.geometry.x
grid_data.loc[:,'y_m'] = grid_data.geometry.y
grid_data.loc[:,'value'] = 0

grid_data

Unnamed: 0,geometry,x_m,y_m,value
0,POINT (-180000.000 -585000.000),-180000.0,-585000.0,0
1,POINT (-171000.000 -585000.000),-171000.0,-585000.0,0
2,POINT (-162000.000 -585000.000),-162000.0,-585000.0,0
3,POINT (-153000.000 -585000.000),-153000.0,-585000.0,0
4,POINT (-144000.000 -585000.000),-144000.0,-585000.0,0
...,...,...,...,...
5639,POINT (387000.000 153000.000),387000.0,153000.0,0
5640,POINT (396000.000 153000.000),396000.0,153000.0,0
5641,POINT (405000.000 153000.000),405000.0,153000.0,0
5642,POINT (414000.000 153000.000),414000.0,153000.0,0


In [14]:
base_grid = make_geocube(vector_data=grid_data, measurements=["value"],resolution=(resolution, resolution), fill=0, output_crs=projout)
base_grid_rio = base_grid["value"].rio

base_crs = '+proj=lcc +lat_1=30 +lat_2=60 +lon_1=126 +lat_0=38 +lon_0=126 +ellps=GRS80 +units=m'

base_transform, base_width, base_height = calculate_default_transform(
    base_grid_rio.crs, base_crs, base_grid_rio.width, base_grid_rio.height, *base_grid_rio.bounds()
)
base_coords = _make_coords(
    base_grid, base_transform, base_width, base_height, base_crs
)
base_xs, base_ys = base_coords["x"], base_coords["y"]

base_grid

In [33]:
time_train_list[-24:]

[datetime.datetime(2013, 12, 31, 0, 0),
 datetime.datetime(2013, 12, 31, 1, 0),
 datetime.datetime(2013, 12, 31, 2, 0),
 datetime.datetime(2013, 12, 31, 3, 0),
 datetime.datetime(2013, 12, 31, 4, 0),
 datetime.datetime(2013, 12, 31, 5, 0),
 datetime.datetime(2013, 12, 31, 6, 0),
 datetime.datetime(2013, 12, 31, 7, 0),
 datetime.datetime(2013, 12, 31, 8, 0),
 datetime.datetime(2013, 12, 31, 9, 0),
 datetime.datetime(2013, 12, 31, 10, 0),
 datetime.datetime(2013, 12, 31, 11, 0),
 datetime.datetime(2013, 12, 31, 12, 0),
 datetime.datetime(2013, 12, 31, 13, 0),
 datetime.datetime(2013, 12, 31, 14, 0),
 datetime.datetime(2013, 12, 31, 15, 0),
 datetime.datetime(2013, 12, 31, 16, 0),
 datetime.datetime(2013, 12, 31, 17, 0),
 datetime.datetime(2013, 12, 31, 18, 0),
 datetime.datetime(2013, 12, 31, 19, 0),
 datetime.datetime(2013, 12, 31, 20, 0),
 datetime.datetime(2013, 12, 31, 21, 0),
 datetime.datetime(2013, 12, 31, 22, 0),
 datetime.datetime(2013, 12, 31, 23, 0)]

In [34]:
pm_col = ['SO2', 'CO', 'O3', 'NO2', 'PM10']
me_col = [
    '기온(°C)', '강수량(mm)', '풍속(m/s)', '풍향(16방위)', '습도(%)', '증기압(hPa)', 
    '이슬점온도(°C)', '현지기압(hPa)', '해면기압(hPa)', '일조(hr)', '일사(MJ/m2)', 
    '적설(cm)', '3시간신적설(cm)', '전운량(10분위)', '중하층운량(10분위)'
]

f.close()

# for i, y_tar in enumerate(year_list[:2]):
def worker(y_tar):
    f = h5py.File(f'hdf5/dataset_monitoring_{y_tar}.hdf5', 'w')

    time_train_list = np.array(time_dropdup_datetime_list)[time_dropdup_datetime_df.year == y_tar].tolist()[1:-24]

    for j, datetime_idx in enumerate(time_train_list):
        print(f"\r{j}/{len(time_train_list)} : {datetime_idx}", end="")

        air_pollution_datetime_df = air_pollution_df.loc[air_pollution_df.datetime == datetime_idx]
        air_pollution_datetime_df.index = range(len(air_pollution_datetime_df))
        air_pollution_datetime_df = pd.merge(
            air_pollution_datetime_df, 
            site_info.loc[:,['stationName', 'geometry']], 
            how='left',
            left_on='측정소명',
            right_on='stationName'
        )
        air_pollution_datetime_df = air_pollution_datetime_df.loc[~air_pollution_datetime_df.stationName.isna()]

        air_pollution_datetime_gdf = gpd.GeoDataFrame(air_pollution_datetime_df, geometry='geometry')
        air_pollution_datetime_gdf = air_pollution_datetime_gdf.fillna(0)
        air_pollution_datetime_gdf = air_pollution_datetime_gdf.loc[~(air_pollution_datetime_gdf.PM10 == -999)]  # 값이 -999찍혀있을때가 있음 기기오작동으로보고 제거
        air_pollution_datetime_gdf.index = range(len(air_pollution_datetime_gdf))

        pm_grid = make_geocube(
            vector_data  = air_pollution_datetime_gdf, 
            measurements = ['SO2', 'CO', 'O3', 'NO2', 'PM10'],
            resolution   = (resolution, resolution), 
            fill         = 0, 
            output_crs   = base_crs
        ) # for most crs negative comes first in resolution
        pm_grid_rio = pm_grid.rio
        pm_transform, pm_width, pm_height = calculate_default_transform(
            pm_grid_rio.crs, base_crs, pm_grid_rio.width, pm_grid_rio.height, *pm_grid_rio.bounds()
        )

        pm_bands = len(pm_grid.to_array())
        pm_data = np.empty((pm_bands, base_height, base_width), dtype=np.float64)
        for i in range(pm_bands):
            reproject(
                source        = pm_grid.to_array(),
                destination   = pm_data,
                src_transform = pm_grid_rio.transform(),
                src_crs       = pm_grid_rio.crs,
                dst_transform = base_transform,
                dst_crs       = base_crs,
                resampling    = Resampling.nearest
            )

        

        pm_reproj_grid = xr.Dataset(
            data_vars=dict(zip(
                pm_col, list(map(
                    lambda data: (["y", "x"], data), pm_data
                ))
            )),
            coords=dict(
                y=(["y"], base_ys),
                x=(["x"], base_xs)
            )
        )

        asos_datetime_df = asos_df.loc[asos_df.datetime == datetime_idx]
        asos_datetime_df.index = range(len(asos_datetime_df))
        asos_datetime_df = pd.merge(
            asos_datetime_df,
            emission_info.loc[:,['지점', 'geometry']],
            how='left',
            on = '지점'
        )
        asos_datetime_df = asos_datetime_df.loc[~asos_datetime_df.geometry.isna()]
        asos_datetime_gdf = gpd.GeoDataFrame(asos_datetime_df, geometry='geometry')
        asos_datetime_gdf = asos_datetime_gdf.fillna(0)

        me_grid = make_geocube(
            vector_data  = asos_datetime_gdf, 
            measurements = me_col,
            resolution   = (resolution, resolution), 
            fill         = 0, 
            output_crs   = projout
        )
        me_grid_rio = me_grid.rio
        me_transform, me_width, me_height = calculate_default_transform(
            me_grid_rio.crs, base_crs, me_grid_rio.width, me_grid_rio.height, *me_grid_rio.bounds()
        )

        me_bands = len(me_grid.to_array())
        me_data = np.empty((me_bands, base_height, base_width), dtype=np.float64)
        for i in range(me_bands):
            reproject(
                source        = me_grid.to_array()[i],
                destination   = me_data[i],
                src_transform = me_grid_rio.transform(),
                src_crs       = me_grid_rio.crs,
                dst_transform = base_transform,
                dst_crs       = base_crs,
                resampling    = Resampling.nearest
            )

        me_reproj_grid = xr.Dataset(
            data_vars=dict(zip(
                me_col, list(map(
                    lambda data: (["y", "x"], data), me_data
                ))
            )),
            coords=dict(
                y=(["y"], base_ys),
                x=(["x"], base_xs)
            )
        )

        group_key = f'{datetime_idx.month}/{datetime_idx.day}/{datetime_idx.hour}'

        f[f'{group_key}/air_quality'] = pm_reproj_grid.to_array()
        f[f'{group_key}/weather'] = me_reproj_grid.to_array()

    print()
    f.close()

with Pool(processes=16) as pool:
    pool.map(worker, year_list)

160/8759 : 2020-01-07 17:00:00