In [18]:
#MySQL 연결
import pymysql
from sqlalchemy import create_engine, text #연결 관리용으로만 가볍게
from pathlib import Path
from pprint import pprint

#데이터 처리/시각화
import pandas as pd
import numpy as np
import matplotlib.pyplot as plt
import seaborn as sns
import plotly.express as px

#외부 API 호출하기
import requests
from requests.exceptions import Timeout, ConnectionError, RequestException

#.env 파일 활용하기
from dotenv import load_dotenv
import os
import time

#env 파일 읽기
load_dotenv()

#API 요청 정보
url = 'https://bigdata.kepco.co.kr/openapi/v1/powerUsage/houseAve.do'
api_key = os.getenv('ELECTRIC_API_KEY')

#db에서 level 1인 지역 코드를 가져왔다. 앞의 두자리를 잘라서
#사용할 예정입니다.
region = [
"1100000000","2600000000","2700000000","2800000000","2900000000","3000000000","3100000000","3600000000","4100000000","4300000000","4400000000","4600000000","4700000000","4800000000","5000000000","5100000000","5200000000"
]

#SQLAlchemy의 engine을 미리 만들어 놓습니다.
DB_URL = f"mysql+pymysql://{os.getenv('DB_USERNAME')}:{os.getenv('DB_PASSWORD')}@{os.getenv('DB_HOST')}/{os.getenv('DB_NAME')}"
engine = create_engine(DB_URL)

In [19]:
#engine 연결을 확인합니다.
with engine.connect() as conn:
    result = conn.execute(text("SELECT 1"))
    print(result.fetchone())  # (1,)

(1,)


In [33]:
#단건의 간단한 호출 테스트 할 수 있습니다.
url = 'https://bigdata.kepco.co.kr/openapi/v1/powerUsage/houseAve.do'
api_key = os.getenv('ELECTRIC_API_KEY')

params = {
    'year' : '2013',
    'month' : '05',
    'metroCd' : '11',
    'apiKey' : os.getenv('ELECTRIC_API_KEY')
}

response = requests.get(url, params=params)

if response.status_code == 200:
    df = pd.DataFrame(response.json()['data'])
    print(df.head())
else:
    print(f'Erorr: {response.status_code}')


   year month  metro city  houseCnt  powerUsage   bill
0  2013    05  서울특별시   중구     61652      211.55  26835
1  2013    05  서울특별시  용산구    115636      227.71  32179
2  2013    05  서울특별시  광진구    167472      200.41  24371
3  2013    05  서울특별시  중랑구    172540      213.22  24754
4  2013    05  서울특별시  양천구    177707      243.22  29801


In [34]:
result = response.json()
def convert_input(data):
    return {
        'sd_code' : int(data.get('sd_code')),
        'year' : int(data.get('year')),
        'month' : int(data.get('month')),
        'sd_name' : data.get('metro'),
        'sgg_name' : data.get('city'),
        'house_cnt' : int(data.get('houseCnt')),
        'power_usage' : float(data.get('powerUsage')),
        'bill' : int(data.get('bill'))
    }

def save_batch(data_list):
    """배치 데이터 DB 저장 - Raw SQL"""
    if not data_list:
        return
    
    # 실제 컬럼명에 맞게 수정 필요
    query = text("""
        INSERT INTO household_power 
        (sd_code, year, month, sd_name, sgg_name, house_cnt, power_usage, bill)
        VALUES (:sd_code, :year, :month, :sd_name, :sgg_name, :house_cnt, :power_usage, :bill)
    """)
    
    with engine.connect() as conn:
        conn.execute(query, data_list)
        conn.commit()

def is_already_collected(year, month, region_code):
    """이미 수집했는지 확인"""
    query = text("""
        SELECT COUNT(*) as cnt 
        FROM household_power 
        WHERE year=:year AND month=:month AND sd_code=:sd_code
    """)
    
    with engine.connect() as conn:
        result = conn.execute(query, {'year': year, 'month': month, 'metro': region_code})
        return result.fetchone()[0] > 0

batch_data = []
BATCH_SIZE = 20
r = 11
if 'data' in result and result['data']:
    
    for d in result['data'] :
        d['sd_code'] = r

    convert_data = [convert_input(d) for d in result['data']]
    
    batch_data.extend(convert_data)  # dict 리스트 추가
    print(f"{len(result['data'])}건")
    
    if len(batch_data) >= BATCH_SIZE:
        save_batch(batch_data)  # Raw SQL로 저장
        batch_data.clear()

25건


In [None]:
def convert_input(data):
    return {
        'sd_code' : int(data.get('sd_code')),
        'year' : int(data.get('year')),
        'month' : int(data.get('month')),
        'sd_name' : data.get('metro'),
        'sgg_name' : data.get('city'),
        'house_cnt' : int(data.get('houseCnt')),
        'power_usage' : float(data.get('powerUsage')),
        'bill' : int(data.get('bill'))
    }


def save_batch(data_list):
    """배치 데이터 DB 저장 - Raw SQL"""
    if not data_list:
        return
    
    # 실제 컬럼명에 맞게 수정 필요
    query = text("""
        INSERT INTO raw_power_data 
        (sd_code, year, month, sd_name, sgg_name, house_cnt, power_usage, bill)
        VALUES (:year, :month, :metro_cd, :usage, :bill, :house_cnt)
    """)
    
    with engine.connect() as conn:
        conn.execute(query, data_list)
        conn.commit()

def is_already_collected(year, month, region_code):
    """이미 수집했는지 확인"""
    query = text("""
        SELECT COUNT(*) as cnt 
        FROM raw_power_data 
        WHERE year=:year AND month=:month AND metro_cd=:metro
    """)
    
    with engine.connect() as conn:
        result = conn.execute(query, {'year': year, 'month': month, 'metro': region_code})
        return result.fetchone()[0] > 0

# 메인 로직
batch_data = []
BATCH_SIZE = 50

for year in range(2013, 2026):
    for month in range(1, 13):
        if (year == 2013 and month < 5) or (year == 2025 and month > 11):
            continue
        
        for r in region:
            region_code = r[:2]
            
            if is_already_collected(year, month, region_code):
                print(f"{year}-{month:02d} {region_code}: 이미 수집됨")
                continue
            
            try:
                response = requests.get(url, params=params, timeout=15)
                response.raise_for_status()
                result = response.json()
                
                if 'data' in result and result['data']:
                    batch_data.extend(result['data'])  # dict 리스트 추가
                    print(f"✓ {year}-{month:02d} {region_code}: {len(result['data'])}건")
                    
                    if len(batch_data) >= BATCH_SIZE:
                        save_batch(batch_data)  # Raw SQL로 저장
                        batch_data.clear()
                        
            except Exception as e:
                print(f"✗ {year}-{month:02d} {region_code}: {type(e).__name__}")
            
            time.sleep(5)

save_batch(batch_data)

In [None]:
print('데이터 전처리..')

df = pd.DataFrame(all_data)

#컬럼명 맞추기
df = df.rename(columns={
        'metro':'sd_name',
        'city':'sgg_name',
        'houseCnt':'house_cnt',
        'powerUsage':'power_usage'
})

# 날짜 컬럼 생성
# date이기 때문에 모두 1일로 생성해준다.
df['date'] = pd.to_datetime(df['year'] + '-' + df['month'] + '-01')

df_to_insert = df[['date', 'sd_name', 'sgg_name', 'house_cnt', 'power_usage', 'bill']]

# 중복제거
df_to_insert = df_to_insert.drop_duplicates(subset=['date', 'sd_name', 'sgg_name'])

with MySQLConnector() as db:
    insert_query = """
    INSERT INTO household_power_usage
        (date, sd_name, sgg_name, house_cnt, power_usage, bill)
    VALUES ( %s, %s, %s, %s, %s, %s)
    """

    data_to_insert = [tuple(row) for row in df_to_insert.values]

    db.cursor.executemany(insert_query, data_to_insert)
    db.conn.commit()

    print(f'db 삽입완료: {len(data_to_insert)}건')

In [None]:
failed_requests

In [None]:
# 실패한 요청 다시 시도

retry_data = []
re_success = 0
re_fails = []

for f in failed_requests:
    fail = f.split('-')
    print(fail)
    params = {
        'year' : fail[0],
        'month' : fail[1],
        'metroCd' : fail[2],
        'apiKey' : os.getenv('ELECTRIC_API_KEY')
     }
    
    try:
        response = requests.get(url, params=params, timeout=10)
        print(f"{response.url}")
        response.raise_for_status()
        result = response.json()
        if 'data' in result and result['data']:
            retry_data.extend(result['data'])
            re_success += 1
            print(f"✓ {year}-{month_str} {r[:2]}: {len(result['data'])}건")
            
    except (ConnectionError, Timeout):
        # 연결 실패 - 조용히 기록만 하고 계속
        re_fails.append(f"{year}-{month_str}-{r[:2]}")
        print(f"✗ {year}-{month_str} {r[:2]}: 연결 실패 (무시)")
        
    except Exception as e:
        # 기타 예외
        failed_requests.append(f"{year}-{month_str}-{r[:2]}")
        print(f"✗ {year}-{month_str} {r[:2]}: {type(e).__name__}")
    
    time.sleep(5)  # 현재 설정 유지

print(f"\n=== 수집 결과 ===")
print(f"✓ 성공: {re_success}개 요청, {len(retry_data)}건 데이터")
print(f"✗ 실패: {len(re_fails)}개 요청")
print(f"성공률: {re_success/(re_success+len(re_fails))*100:.1f}%")

In [None]:
print(retry_data)

In [None]:
print('데이터 전처리..')

df = pd. DataFrame(retry_data)

#컬럼명 맞추기
df = df.rename(columns={
        'metro':'sd_name',
        'city':'sgg_name',
        'houseCnt':'house_cnt',
        'powerUsage':'power_usage'
})

# 날짜 컬럼 생성
# date이기 때문에 모두 1일로 생성해준다.
df['date'] = pd.to_datetime(df['year'] + '-' + df['month'] + '-01')

df_to_insert = df[['date', 'sd_name', 'sgg_name', 'house_cnt', 'power_usage', 'bill']]

# 중복제거
df_to_insert = df_to_insert.drop_duplicates(subset=['date', 'sd_name', 'sgg_name'])

with MySQLConnector() as db:
    insert_query = """
    INSERT INTO household_power_usage
        (date, sd_name, sgg_name, house_cnt, power_usage, bill)
    VALUES ( %s, %s, %s, %s, %s, %s)
    """

    data_to_insert = [tuple(row) for row in df_to_insert.values]

    db.cursor.executemany(insert_query, data_to_insert)
    db.conn.commit()

    print(f'db 삽입완료: {len(data_to_insert)}건')