# STEP2 캐글 날씨 데이터 EDA, 데이터 재수집

In [3]:
import root
import json
import h5py
import glob
import time
import requests
import numpy as np
import pandas as pd
import xarray as xr
import netCDF4 as nc
import geopandas as gpd
from shapely import wkt
from netCDF4 import num2date
from sklearn.neighbors import KNeighborsClassifier
from module import eda

with open(root.get_path('config'), 'r') as config_file:
    config = json.load(config_file)

In [4]:
kaggle_weather = pd.read_csv(root.get_path('kaggle_weather'))
kaggle_weather['date'] = pd.to_datetime(kaggle_weather['date'])
kaggle_weather.sort_values(by='date', inplace=True)
title = '2019년 국가별 온도 추세'
x_label, x = '수집된 날', 'date'
y_label, y = '온도', 'temp'
hue = 'region'
eda.selectable_lines(df=kaggle_weather, title=title, x_label=x_label, y_label=y_label, x=x, y=y, hue=hue)

In [5]:
title = '2019년 국가별 습도 추세'
x_label, x = '수집된 날', 'date'
y_label, y = '습도', 'humidity'
hue = 'region'
eda.selectable_lines(df=kaggle_weather, title=title, x_label=x_label, y_label=y_label, x=x, y=y, hue=hue)

- 오스트리아를 제외한 모든 국가가 같은 데이터를 가지고 있음

### 데이터 재수집 & 필요한 데이터 추출

In [6]:
''' =========================================== 날씨 =========================================== '''
class weather:
    def __init__(self):
        self.h5_cols = ['Rainf', 'Snowf', 'PSurf', 'Qair', 'Tair', 'Wind']
        self.h5_pattern = {
            'Rainf': f"{root.get_path('h5py_rainf_pattern')}*.h5",
            'Snowf': f"{root.get_path('h5py_snowf_pattern')}*.h5",
            'PSurf': f"{root.get_path('h5py_air_pressure_pattern')}*.h5",
            'Qair': f"{root.get_path('h5py_humidity_pattern')}*.h5",
            'Tair': f"{root.get_path('h5py_temperature_pattern')}*.h5",
            'Wind': f"{root.get_path('h5py_wind_pattern')}*.h5"
        }
        self.change_columns = {
            'Rainf': 'RAINFALL',
            'Snowf': 'SNOWFALL',
            'PSurf': 'AIR_PRESSURE',
            'Qair': 'HUMIDITY',
            'Tair': 'TEMPERATURE',
            'Wind': 'WIND_SPEED'
        }

        # 국가 코드, 경계면 포함
        self.state = pd.read_csv(root.get_path('state'))
        self.state['GEOMETRY'] = self.state['GEOMETRY'].apply(lambda x: wkt.loads(x)) # WKT 문자열을 공간 데이터 객체로 변환
        self.state = gpd.GeoDataFrame(self.state, geometry='GEOMETRY')

    ''' ----------------------------------------- 날씨 전체 데이터 return ----------------------------------------- '''
    def get(self):
        dataframes = []

        for col in self.h5_cols:
            dataframe = pd.DataFrame()
            files = glob.glob(self.h5_pattern[col])

            for file in files:
                df = self.__partial_weather(path=file, col=col).rename(columns={'time': 'TIME', 'lat': 'LAT', 'lng': 'LNG'})
                df = self.time_group(df).dropna()
                dataframe = pd.concat([dataframe, df], ignore_index=True)
            dataframes.append(dataframe)

        merged_df = None

        for df in dataframes:
            df = df.groupby(['DATE', 'TIME_GROUP', 'LAT', 'LNG'], as_index=False).mean().reset_index(drop=True)
            if merged_df is None:
                merged_df = df
            else:
                merged_df = pd.merge(merged_df, df, on=['DATE', 'TIME_GROUP', 'LAT', 'LNG'], how='inner', validate="one_to_one")

        merged_df = (
            self.near_city(merged_df).groupby(['DATE', 'TIME_GROUP', 'CITY'], as_index=False).mean()
            .reset_index(drop=True)
        )
        merged_df.rename(columns=self.change_columns, inplace=True)

        selected_columns = ['DATE', 'TIME_GROUP', 'CITY'] + [col.upper() for col in self.change_columns.values()]
        merged_df = merged_df[selected_columns]

        # 상대습도 계산
        def calculate_relative_humidity(row):
            kelvin = row['TEMPERATURE'] # 온도(K)
            q = row['HUMIDITY'] # 비습(kg/kg)
            P = row['AIR_PRESSURE'] # 기압(Pa)
            celsius = kelvin - 273.15 # 섭씨 온도로 변환
            e_s = 6.112 * np.exp((17.67 * celsius) / (celsius + 243.5)) * 100 # 포화 수증기압 계산 (Pa)
            q_s = 0.622 * e_s / (P - e_s) # 포화 비습 계산 (kg/kg)            
            RH = (q / q_s) * 100 # 상대습도 계산 (%)

            if RH > 100: # 이상치 제거
                return None
            return RH # 상대습도(%)

        merged_df['HUMIDITY'] = merged_df.apply(lambda row:calculate_relative_humidity(row), axis=1)
        return merged_df.dropna()

    ''' **----------------- 파일 별 (강수량, 강설량 등) -----------------** '''
    def __partial_weather(self, **kwargs):
        """
        path: h5 파일 경로
        col: 선택 파일 컬럼
        :return: ['time', 'lat', 'lng', col] DataFrame
        """
        path = kwargs.get('path')
        col = kwargs.get('col')

        ds = xr.open_dataset(path, engine='netcdf4')
        select = ds[['time', 'lat', 'lon', col]]
        df = select.to_dataframe().reset_index(drop=True).rename(columns={'lon': 'lng'})
        df = df.astype({'lat': 'float32', 'lng': 'float32'})
        df['time'] = pd.to_datetime(df['time'], unit='s').dt.tz_localize('UTC')
        df = df[df['time'].dt.year == 2019]

        return df.groupby(['time', 'lat', 'lng'], as_index=False).mean().reset_index(drop=True)

    ''' **----------------- 시간별로 합치기 -----------------** '''
    def time_group(self, df):
        bins = [-1, 6, 12, 18, 24]
        labels = ['0-6', '6-12', '12-18', '18-24']
        df['DATE'] = df['TIME'].dt.date
        df['TIME_GROUP'] = pd.cut(df['TIME'].dt.hour, bins=bins, labels=labels, right=True)
        df.drop(columns=['TIME'], inplace=True)
        return df.groupby(['DATE', 'TIME_GROUP', 'LAT', 'LNG'], as_index=False).mean().reset_index(drop=True)

    ''' **----------------- 가까운 도시 찾기 -----------------** '''
    def near_city(self, df):
        """
        weather: 날씨 데이터 dataframe
        return: 날씨 데이터 dataframe 필터링 + 도시
        """
        df = gpd.GeoDataFrame(df, geometry=gpd.points_from_xy(df['LNG'], df['LAT']))
        df = gpd.sjoin(df, self.state, how="left", predicate="within")

        city = pd.read_csv(root.get_path('city'))
        city['POINT'] = city['POINT'].apply(lambda x: wkt.loads(x))

        temp = []  # 결과를 저장할 리스트 초기화

        for district, group in df.groupby('DISTRICT'):
            selected = city[city['DISTRICT'] == district]
            if selected.empty or district is None:
                continue
            X = np.array([[point.x, point.y] for point in selected['POINT']])
            y = selected['CITY'].values

            knn = KNeighborsClassifier(n_neighbors=1)
            knn.fit(X, y)
            new_point = np.array([[point.x, point.y] for point in group['geometry']])
            group['CITY'] = knn.predict(new_point)
            group.drop(columns=['geometry', 'DISTRICT', 'COUNTRY_CODE'], inplace=True)
            group = group.groupby(['DATE', 'TIME_GROUP', 'CITY'], as_index=False).mean().reset_index(drop=True)
            temp.append(group)

        return pd.concat(temp).dropna()


''' =========================================== 음악 취향 =========================================== '''
class top_100_tags_for_preference:  # Last.fm API
    def __init__(self):
        self.api_key = config['lastfm_key']

    ''' ----------------------------------------- 유럽 국가 TAG + 한국(KOR) 태그 ----------------------------------------- '''
    def get_data(self):
        data = {}
        euros = pd.read_csv(root.get_path('country'))['COUNTRY_NAME']
        euros = pd.concat([euros, pd.Series(['Korea, Republic of'])], ignore_index=True)
        for name in euros:
            data[name] = self.__get_country_tags(name)
        return data

    ''' **----------------- 특정 국가 장르 태그 -----------------** '''
    def __get_country_tags(self, country):
        top_tracks = self.__get_top_tracks_by_country(country)
        genre_tags = []

        for track in top_tracks:
            track_name = track['name']
            artist_name = track['artist']['name']
            genre_tags.extend(self.__get_track_genre_tags(track_name, artist_name))

        return genre_tags

    ''' **----------------- 특정 국가 인기 트랙 -----------------** '''
    def __get_top_tracks_by_country(self, country):
        url = "http://ws.audioscrobbler.com/2.0/"
        params = {
            'method': 'geo.gettoptracks',
            'country': country,
            'api_key': self.api_key,
            'format': 'json',
            'limit': 1
        }
        response = requests.get(url, params=params)
        return response.json()['tracks']['track']

    ''' **----------------- 트랙의 장르 태그들 -----------------** '''
    def __get_track_genre_tags(self, track, artist):
        url = "http://ws.audioscrobbler.com/2.0/"
        params = {
            'method': 'track.gettoptags',
            'track': track,
            'artist': artist,
            'api_key': self.api_key,
            'format': 'json'
        }
        response = requests.get(url, params=params)
        tags = response.json().get('toptags', {}).get('tag', [])
        return [tag['name'] for tag in tags]


### 가공된 파일 저장

In [7]:
class save:
    def __init__(self):
        # 데이터 저장할 위치
        self.country_path = root.get_path('country') # 국가
        self.genre_streams_path = root.get_path('genre_streams_by_country') # 국가, 장르 별 청취수
        self.weather_path = root.get_path('weather') # 수집된 날짜, 도시별 날씨 데이터
        self.top_100_tags_path = root.get_path('last_fm') # 국가별 음악 취향 찾기
        self.state_path = root.get_path('state') # 지리 데이터
        self.city_path = root.get_path('city')  # 지리 데이터

        # nc 파일 컬럼
        self.nc_cols = ['Rainf', 'Snowf', 'PSurf', 'Qair', 'Tair', 'Wind']
        
        # 캐글 날씨 데이터 with 장르 저장 경로
        self.kaggle_weather_with_genre_path = root.get_path('kaggle_weather_with_genre')

        # original 파일 저장 경로
        self.nc_path = {  # nc 파일 위치 패턴
            'Rainf': root.get_path('nc_rainfall_pattern'),
            'Snowf': root.get_path('nc_snowfall_pattern'),
            'PSurf': root.get_path('nc_air_pressure_pattern'),
            'Qair': root.get_path('nc_humidity_pattern'),
            'Tair': root.get_path('nc_temperature_pattern'),
            'Wind': root.get_path('nc_wind_pattern')
        }
        self.h5py_path = { # h5py 파일 저장 패턴
            'Rainf': root.get_path('h5py_rainf_pattern'),
            'Snowf': root.get_path('h5py_snowf_pattern'),
            'PSurf': root.get_path('h5py_air_pressure_pattern'),
            'Qair': root.get_path('h5py_humidity_pattern'),
            'Tair': root.get_path('h5py_temperature_pattern'),
            'Wind': root.get_path('h5py_wind_pattern')
        }        
        self.shp_path = { # 지리 정보 shp 파일 경로
            'country': root.get_path('natural_country'),  # 국가 코드, 도시명
            'city': root.get_path('natural_city'),  # 도시명, 행정 경계, 인구수
            'state': root.get_path('natural_state')
        }

    
    ''' ----------------------------------------- 캐글 날씨 데이터에서 필요한 데이터만 추출 ----------------------------------------- '''
    def streams(self):
        save_genre = pd.read_csv(self.kaggle_weather_with_genre_path)
        save_genre.rename(columns={
            'date': 'DATE',
            'region': 'COUNTRY_CODE',
            'genre': 'GENRE',
            'streams': 'STREAMS'
        }, inplace=True)
        save_genre = save_genre[['DATE', 'COUNTRY_CODE', 'GENRE', 'STREAMS']]
        save_genre.to_csv('./data/cleaned/GENRE_STREAMS_BY_COUNTRY.csv', index=False)

    ''' ----------------------------------------- 국가별 음악 취향 top_100_tags 데이터 저장 ----------------------------------------- '''

    def top_100_tags(self):
        data = top_100_tags_for_preference().get_data()
        rows = []
        for name, tags in data.items():
            for tag in tags:
                rows.append([name, tag])
        df_top_100_tags = pd.DataFrame(rows, columns=['COUNTRY_NAME', 'TAG'])
        df_top_100_tags.to_csv(self.top_100_tags_path, index=False)

    ''' ----------------------------------------- shp -> csv 지리 데이터 저장 ----------------------------------------- '''

    def geo_info(self):
        country = pd.read_csv(self.country_path)
        state = gpd.read_file(self.shp_path['state']).rename(columns={
            'iso_a2': 'GEO_CODE', 'iso_3166_2': 'DISTRICT', 'geometry': 'GEOMETRY'
        })
        state = pd.merge(state, country, on='GEO_CODE', how="inner", validate="many_to_one")

        state = state[['COUNTRY_CODE', 'DISTRICT', 'GEOMETRY']].drop_duplicates()

        city = gpd.read_file(self.shp_path['city']).rename(columns={
            'ISO_A2': 'GEO_CODE', 'NAME': 'CITY', 'POP_MAX': 'POPULATION', 'geometry': 'POINT'
        })

        # GeoDataFrame 생성
        state_gdf = gpd.GeoDataFrame(state, geometry='GEOMETRY')
        city_gdf = gpd.GeoDataFrame(city, geometry='POINT')

        city = gpd.sjoin(city_gdf, state_gdf, how="left", predicate="within")
        city = city[['CITY', 'DISTRICT', 'POPULATION', 'POINT']].drop_duplicates().dropna()

        state['GEOMETRY'] = state['GEOMETRY'].apply(lambda x: wkt.dumps(x))
        state.to_csv(self.state_path, index=False)
        city.to_csv(self.city_path, index=False)

    ''' ----------------------------------------- nc -> h5 파일로 2019년도 날씨 데이터 저장 ----------------------------------------- '''

    def nc_to_h5py(self):
        city = pd.read_csv(root.get_path('city'))
        points = city['POINT'].apply(lambda x: wkt.loads(x))  # WKT 문자열을 공간 데이터 객체로 변환

        # 경도와 위도의 최소값과 최대값
        min_lon, max_lon = points.apply(lambda x: x.x).min(), points.apply(lambda x: x.x).max()
        min_lat, max_lat = points.apply(lambda x: x.y).min(), points.apply(lambda x: x.y).max()

        for col in self.nc_cols:
            files = glob.glob(self.nc_path[col])

            for i, file in enumerate(files):

                # nc 파일 불러오기
                dataset = nc.Dataset(file, 'r')
                latitudes = dataset.variables['lat'][:]
                longitudes = dataset.variables['lon'][:]
                times = dataset.variables['time'][:]

                data_column = dataset.variables[col][:]
                time_units = dataset.variables['time'].units
                dates = num2date(times, units=time_units)

                # 2019년 데이터만 필터링
                indices_2019  = [idx for idx, date in enumerate(dates) if date.year == 2019]
                if not indices_2019 :
                    continue  # 2019년 데이터가 없으면 다음 파일로

                # 위도와 경도 범위에 맞는 인덱스 필터링
                lat_indices = np.nonzero((latitudes >= min_lat) & (latitudes <= max_lat))[0]
                lon_indices = np.nonzero((longitudes >= min_lon) & (longitudes <= max_lon))[0]

                latitudes_2019 = latitudes[lat_indices]
                longitudes_2019 = longitudes[lon_indices]
                times_2019 = np.array([int(time.mktime(dates[idx].timetuple())) for idx in indices_2019])  # 유닉스 타임스탬프로 변환
                data_column_2019 = data_column[np.ix_(indices_2019, lat_indices, lon_indices)]

                dataset.close()

                # HDF5 파일 생성
                with h5py.File(f'{self.h5py_path[col]}{i + 1}.h5', 'w') as hdf:
                    # 데이터셋 생성
                    hdf.create_dataset('lat', data=latitudes_2019)
                    hdf.create_dataset('lon', data=longitudes_2019)
                    hdf.create_dataset('time', data=times_2019)
                    hdf.create_dataset(col, data=data_column_2019)

    ''' ----------------------------------------- 날씨 데이터 저장 ----------------------------------------- '''

    def weather(self):
        weather_dataframe = weather().get()
        weather_dataframe.to_csv(self.weather_path, index=False)

In [8]:
save = save()
save.streams()
save.geo_info()
save.nc_to_h5py()
save.weather()
save.top_100_tags()