In [1]:
import os
import pandas as pd
import sys
from collections import defaultdict
import fileinput
import numpy as np
import time
import datetime
import re
import json

## 인자값 입력

### 1) 병합할 파일들이 있는 디렉토리 경로
### 2) 출력파일 디렉토리 경로
### 3) 병합할 파일의 시간 필드명
### 4) 병합할 환경데이터 파일 디렉토리 경로

In [2]:
file_input_path = './data'
file_output_path = './out'
date_time = 'coll_dt'
target_data_info = "['weather','./env_data/weather_seoul/'],['traffic','..env_data/traffic_volume/'],['air','./env_data/air_pollution_gangnam/']"

In [3]:
def printProgressBar(iteration, total, prefix = 'Progress', suffix = 'Complete',\
                      decimals = 1, length = 50, fill = '█'): 
    # 작업의 진행상황을 표시
    percent = ("{0:." + str(decimals) + "f}").format(100 * (iteration / float(total)))
    filledLength = int(length * iteration // total)
    bar = fill * filledLength + '-' * (length - filledLength)
    print('\r%s |%s| %s%% %s' %(prefix, bar, percent, suffix), end='\r')
    sys.stdout.flush()
    if iteration == total:
        print()

def recursive_search_dir(_nowDir, _filelist):
    dir_list = []  # 현재 디렉토리의 서브디렉토리가 담길 list
    try:
        f_list = os.listdir(_nowDir)
    except FileNotFoundError:
        print("\n"+_nowDir)
        print("\n(병합 대상 파일이 존재하지 않습니다.)")
        sys.exit(1)

    for fname in f_list:
        if os.path.isdir(_nowDir + "/" + fname):
            dir_list.append(_nowDir + "/" + fname)
        elif os.path.isfile(_nowDir + "/" + fname):
            file_extension = os.path.splitext(fname)[1]
            if file_extension == ".csv" or file_extension == ".CSV":  # csv
                _filelist.append(_nowDir + "/" + fname)

    for toDir in dir_list:
        recursive_search_dir(toDir, _filelist)

In [4]:
def merging(merged_file_full_path, csv_file, target_data_info, date_time):
    df = pd.read_csv(csv_file, low_memory=False)

    # 병합할 환경데이터 카테고리 목록을 dict로 모두 저장 (index도 함께 저장)
    category = dict()
    for i in range(len(target_data_info)):
        category[target_data_info[i][0]] = i

    # 필드 병합할 때, 이미 불러온 데이터는 dictionary에 저장 (속도 향상 위해)
    weather_cache = dict()
    traffic_cache = dict()
    air_cache = dict()

    for i in range(len(df)):
        # YYYY mm dd HH 순서일 때
        date_time_digit = re.sub('-|:|/| ', '', str(df[date_time].iloc[i]))   # '-', ':', '/', ' ' 제거하고, 순수 숫자만 남김
        ref_date = date_time_digit[:8]
        ref_time = int(date_time_digit[8:10])

        # 날씨데이터를 병합할 경우
        if 'weather' in category.keys():
            weather_key = ref_date + str(ref_time) # 이미 받아온 값인지 확인하기 위한 key값 (날짜/시간)

            if weather_key in weather_cache:
                weather_value = weather_cache[weather_key]
            else:
                weather_file = target_data_info[category['weather']][1] + '/' + ref_date + '.json'
                try:
                    with open(weather_file, 'r') as file:
                        data = json.load(file)

                        # print("관측지점ID:", data[ref_time]['stnId'])
                        # print("기온(˚C):", data[ref_time]['ta'])
                        # print("풍속(m/s):", data[ref_time]['ws'])
                        # print("습도(%):", data[ref_time]['hm'])
                        # print("강수량(mm):", data[ref_time]['rn'])
                        # print("적설(cm):", data[ref_time]['dsnw'])

                        weather_value = [('weather_station_ID', data[ref_time]['stnId']), ('temperature', data[ref_time]['ta']), ('wind_speed', data[ref_time]['ws']), ('humidity', data[ref_time]['hm']), ('rainfall', data[ref_time]['rn']), ('snowfall', data[ref_time]['dsnw'])]

                # 해당 날짜의 환경데이터 파일이 없을 경우, NaN값으로 입력
                except FileNotFoundError:
                    weather_value = [('weather_station_ID', np.nan), ('temperature', np.nan), ('wind_speed', np.nan), ('humidity', np.nan), ('rainfall', np.nan), ('snowfall', np.nan)]

                # 불러온 환경데이터 값을 캐시에 저장 (속도 향상 위해)
                weather_cache[weather_key] = weather_value
            
            for value in weather_value:
                if i == 0:  # 날씨 값을 넣기 위해, 빈 컬럼 추가
                    df.insert(loc=len(df.columns), column=value[0], value='')
                df.loc[i, value[0]] = value[1]   # 날씨 값을 새로운 컬럼에 추가

        # 교통데이터를 병합할 경우
        if 'traffic' in category.keys():
            traffic_key = ref_date # 이미 받아온 값인지 확인하기 위한 key값 (날짜)

            if traffic_key in traffic_cache:
                traffic_value = traffic_cache[traffic_key]
            else:
                traffic_file = target_data_info[category['traffic']][1] + '/' + ref_date + '.json'
                try:
                    with open(traffic_file, 'r') as file:
                        data = json.load(file)

                        # print("교통량:", data[0]['trafficVolumn'])

                        traffic_value = [('traffic_volume', data[0]['trafficVolumn'])]

                # 해당 날짜의 환경데이터 파일이 없을 경우, NaN값으로 입력
                except FileNotFoundError:
                    traffic_value = [('traffic_volume', np.nan)]

                # 불러온 환경데이터 값을 캐시에 저장 (속도 향상 위해)
                traffic_cache[traffic_key] = traffic_value

            for value in traffic_value:
                if i == 0:  # 교통량 값을 넣기 위해, 빈 컬럼 추가
                    df.insert(loc=len(df.columns), column=value[0], value='')
                df.loc[i, value[0]] = value[1]   # 교통량 값을 새로운 컬럼에 추가

        # 대기오염 데이터를 병합할 경우
        if 'air' in category.keys():
            air_key = ref_date # 이미 받아온 값인지 확인하기 위한 key값 (날짜)

            if air_key in air_cache:
                air_value = air_cache[air_key]
            else:
                air_file = target_data_info[category['air']][1] + '/' + ref_date + '.json'
                try:
                    with open(air_file, 'r') as file:
                        data = json.load(file)

                        # print("일산화탄소 평균농도:", data[0]['coValue'])
                        # print("이산화질소 평균농도:", data[0]['no2Value'])
                        # print("오존 평균농도:", data[0]['o3Value'])
                        # print("미세먼지(PM10) 평균농도:", data[0]['pm10Value'])
                        # print("미세먼지(PM25) 평균농도:", data[0]['pm25Value'])
                        # print("아황산가스 평균농도:", data[0]['so2Value'])

                        air_value = [('air_co', data[0]['coValue']), ('air_no2', data[0]['no2Value']), ('air_o3', data[0]['o3Value']), ('air_pm10', data[0]['pm10Value']), ('air_pm25', data[0]['pm25Value']), ('air_so2', data[0]['so2Value'])]

                # 해당 날짜의 환경데이터 파일이 없을 경우, NaN값으로 입력
                except FileNotFoundError:
                    air_value = [('air_co', np.nan), ('air_no2', np.nan), ('air_o3', np.nan), ('air_pm10', np.nan), ('air_pm25', np.nan), ('air_so2', np.nan)]

                # 불러온 환경데이터 값을 캐시에 저장 (속도 향상 위해)
                air_cache[air_key] = air_value

            for value in air_value:
                if i == 0:  # 대기오염 값을 넣기 위해, 빈 컬럼 추가
                    df.insert(loc=len(df.columns), column=value[0], value='')
                df.loc[i, value[0]] = value[1]   # 대기오염 값을 새로운 컬럼에 추가

        # 추후 다른 환경데이터 병합 시, 여기에 코드 추가
        if 'blah_blah_blah' in category:
            return 0

    # 병합된 df를 csv 파일에 저장
    df.to_csv(merged_file_full_path, index=False)

    return 1

In [5]:
# 문자열을 list로 변환
target_data_info = eval(target_data_info)


if file_input_path[-1] == '/':
    file_input_path = file_input_path[:-1]

for info in target_data_info:
    if info[1][-1] == '/':
        info[1] = info[1][:-1]


print (" Input Files Dir. location = ", file_input_path)
print (" Additional Files Info. = ", target_data_info)
print (" Output Files Dir. location = ", file_output_path)
csv_list = []

print('\n======================================================')
print('필드 병합 시작')
print('======================================================')
print('CSV 파일 목록 불러오는 중..')
recursive_search_dir(file_input_path, csv_list)

print('총 CSV 파일 수 : {}'.format(len(csv_list)))

proc_start_time = time.time()

exist_cnt = 0
print('\n필드 병합 중..')
progress_cnt=0
result_cnt=0
for csv_file in csv_list:
    printProgressBar(progress_cnt, len(csv_list))
    # 병합 파일을 저장할 경로
    merged_file_name = csv_file.replace(file_input_path, '').split('/')[-1]   # "OOOOO.csv"
    merged_file_dir = file_output_path + csv_file.replace(file_input_path, '').replace(merged_file_name, '')

    if not os.path.isdir(merged_file_dir):
        os.makedirs(merged_file_dir)

    merged_file_full_path = merged_file_dir + merged_file_name

    # 병합 파일명이 이미 존재할 경우, 통과
    if os.path.isfile(merged_file_full_path):
        exist_cnt += 1
        continue

    result_cnt += merging(merged_file_full_path, csv_file, target_data_info, date_time)
    progress_cnt += 1
printProgressBar(progress_cnt, len(csv_list))

    # time.sleep(0.001) # CPU 부하 줄이기 위함

print('병합 완료한 파일 수 : {}'.format(result_cnt))
print('total running time : {:.2f} sec'.format(time.time()-proc_start_time))

 Input Files Dir. location =  ./data
 Additional Files Info. =  (['weather', './env_data/weather_seoul'], ['traffic', '..env_data/traffic_volume'], ['air', './env_data/air_pollution_gangnam'])
 Output Files Dir. location =  ./out

필드 병합 시작
CSV 파일 목록 불러오는 중..
총 CSV 파일 수 : 35

필드 병합 중..
Progress |██████████████████████████████████████████████████| 100.0% Complete

기존 병합 처리된 파일 수 : 0
새로 병합 완료한 파일 수 : 1
total running time : 8.36 sec
