In [None]:
# 전쳐리
# db 

In [77]:
# 전처리 
import pandas as pd 
import numpy as np
from datetime import datetime, timedelta
roadkill_df = pd.read_csv('한국도로공사_로드킬 데이터 정보_20250501.csv', encoding='cp949')
roadkill_df.columns = (
    roadkill_df.columns
    .str.replace('\ufeff', '', regex=True)
    .str.strip()
)
roadkill_df.head()
roadkill_df.info()

<class 'pandas.core.frame.DataFrame'>
RangeIndex: 59 entries, 0 to 58
Data columns (total 9 columns):
 #   Column  Non-Null Count  Dtype  
---  ------  --------------  -----  
 0   본부명     59 non-null     object 
 1   지사명     59 non-null     object 
 2   노선명     59 non-null     object 
 3   구간      59 non-null     object 
 4   방 향     59 non-null     object 
 5   5km     59 non-null     int64  
 6   발생건수    59 non-null     int64  
 7   위도      59 non-null     float64
 8   경도      59 non-null     float64
dtypes: float64(2), int64(2), object(5)
memory usage: 4.3+ KB


In [62]:
print(roadkill_df.columns.tolist())

['본부명', '지사명', '노선명', '구간', '방 향', '5km', '발생건수', '위도', '경도']


In [91]:
def add_timestamp(df: pd.DataFrame,
                  start_dt: str | None = None,
                  step_seconds: int = 240,
                  as_string: bool = True,
                  **kwargs) -> pd.DataFrame:
    # return_string 별칭 허용
    if 'return_string' in kwargs:
        as_string = kwargs.pop('return_string')

    out = df.copy()
    base = datetime.now().replace(microsecond=0) if start_dt is None else pd.to_datetime(start_dt).to_pydatetime()
    times = [base + timedelta(seconds=i * step_seconds) for i in range(len(out))]
    if as_string:
        out['timestamp'] = pd.to_datetime(times).strftime('%Y-%m-%d %H:%M:%S')
    else:
        out['timestamp'] = pd.to_datetime(times)
    return out

In [None]:
roadkill_loc = roadkill_df[['지사명', '위도', '경도']].copy()
roadkill_loc['위도'] = pd.to_numeric(roadkill_loc['위도'], errors='coerce')
roadkill_loc['경도'] = pd.to_numeric(roadkill_loc['경도'], errors='coerce')

# status 랜덤
np.random.seed(42)
# 0=사망, 1=발견, 2=재발견 (동일 확률)
roadkill_loc['status'] = np.random.choice([0, 1, 2], size=len(roadkill_loc)).astype('int8')

# timestamp 추가 
roadkill_loc = add_timestamp(roadkill_loc, step_seconds=240, return_string=True)
print(roadkill_loc.head())

# status
# 직사명 
roadkill_freq = roadkill_df[['지사명', '발생건수']]
# 발생건수 테이블 합계
roadkill_freq['발생건수'] = pd.to_numeric(roadkill_freq['발생건수'], errors='coerce').fillna(0).astype(int)
roadkill_freq_grouped = roadkill_freq.groupby('지사명', as_index=False)['발생건수'].sum()


print(roadkill_freq_grouped.head())




  지사명         위도          경도  status            timestamp
0  영동  36.317750  127.556418       2  2025-08-25 14:50:48
1  대전  36.349629  127.466302       0  2025-08-25 14:54:48
2  대전  36.465756  127.418342       2  2025-08-25 14:58:48
3  대전  36.550930  127.431874       2  2025-08-25 15:02:48
4  천안  36.625360  127.383686       0  2025-08-25 15:06:48
    지사명  발생건수
0    강릉     6
1  경기광주     6
2    공주     9
3    남원    11
4    논산     3


A value is trying to be set on a copy of a slice from a DataFrame.
Try using .loc[row_indexer,col_indexer] = value instead

See the caveats in the documentation: https://pandas.pydata.org/pandas-docs/stable/user_guide/indexing.html#returning-a-view-versus-a-copy
  roadkill_freq['발생건수'] = pd.to_numeric(roadkill_freq['발생건수'], errors='coerce').fillna(0).astype(int)


In [64]:
dup_cnt = roadkill_loc.duplicated(subset=['위도','경도']).sum()
dup_cnt

np.int64(7)

In [65]:
# impor date time,데이터 insert할때의 시간 넣기 , 시간 (시, 분 , 초)

In [76]:
# -*- coding: utf-8 -*-
# roadkill_loc (위도+경도 PK), roadkill_freq (지사명 PK) 생성 및 적재 예시

import pandas as pd
import pymysql
from sqlalchemy import create_engine, text

# 1. MySQL 연결 
db_config = {
    'host': 'localhost',
    'user': 'root',
    'passwd': '1234',
    'db': 'roadkill_db',
    'charset': 'utf8'
}

# sqlalchemy engine 
engine = create_engine(
    f"mysql+pymysql://{db_config['user']}:{db_config['passwd']}@{db_config['host']}/{db_config['db']}?charset={db_config['charset']}"
)


# 3. DB테이블 생성

with engine.begin() as conn:
    conn.execute(text("DROP TABLE IF EXISTS roadkill_loc"))
    conn.execute(text("DROP TABLE IF EXISTS roadkill_freq"))

    # 위도, 경도, status pk
    conn.execute(text("""
        CREATE TABLE roadkill_loc (
            지사명      VARCHAR(100) NOT NULL,
            위도        DOUBLE NOT NULL,
            경도        DOUBLE NOT NULL,
            status      TINYINT NOT NULL,               -- 0=사망, 1=발견, 2=재발견
            created_at  DATETIME DEFAULT CURRENT_TIMESTAMP,
            PRIMARY KEY (위도, 경도, status),
            CHECK (status IN (0,1,2))
        ) CHARACTER SET utf8mb4 COLLATE utf8mb4_unicode_ci;
    """))

    conn.execute(text("""
        CREATE TABLE roadkill_freq (
            지사명      VARCHAR(100) NOT NULL PRIMARY KEY,
            발생건수     INT NOT NULL,
            created_at  DATETIME DEFAULT CURRENT_TIMESTAMP
        ) CHARACTER SET utf8mb4 COLLATE utf8mb4_unicode_ci;
    """))

# 4) 데이터 적재
# 4-1) roadkill_loc : 중복 좌표면 status=2로 갱신(재발견), 타임스탬프 업데이트
rows = roadkill_loc.to_dict(orient='records')

upsert_sql = text("""
    INSERT INTO roadkill_loc (지사명, 위도, 경도, status)
    VALUES (:지사명, :위도, :경도, :status) AS new
    ON DUPLICATE KEY UPDATE
        지사명 = new.지사명,              -- 최신 지사명으로 교체(원치 않으면 이 줄 제거)
        status = 2,                       -- 중복이면 재발견으로 강제
        created_at = CURRENT_TIMESTAMP;   -- 최신 시간으로 갱신
""")

with engine.begin() as conn:
    conn.execute(upsert_sql, rows)

# 4-2) roadkill_freq_grouped : 기존처럼 append
roadkill_freq_grouped.to_sql("roadkill_freq", con=engine, if_exists="append", index=False)

print("roadkill_loc 업서트 + roadkill_freq 적재 완료")

# =====================
# 5. 검증 (PyMySQL 커서)
# =====================
conn = pymysql.connect(**db_config)
cur = conn.cursor()

cur.execute("SELECT COUNT(*) FROM roadkill_loc;")
print("roadkill_loc row count:", cur.fetchone())

cur.execute("SELECT COUNT(*) FROM roadkill_freq;")
print("roadkill_freq row count:", cur.fetchone())

cur.close()
conn.close()


roadkill_loc 업서트 + roadkill_freq 적재 완료
roadkill_loc row count: (56,)
roadkill_freq row count: (25,)
