## DB 에서 값 가져오기(datetime)

In [None]:
import pymysql
from datetime import datetime, timedelta
from dateutil.relativedelta import relativedelta
from dateutil import parser
import json
import os

# DB 연결
def get_connection():
    host = input("DB 호스트: ")
    user = input("DB 사용자명: ")
    password = input("DB 비밀번호: ")
    db = input("DB 이름: ")
    port = int(input("포트 (기본 3306): ") or "3306")

    conn = pymysql.connect(
        host=host,
        user=user,
        password=password,
        db=db,
        port=port,
        charset='utf8'
    )
    print(f"\n>>> DB 연결 성공: {db}\n")
    return conn

# 테이블 목록
def get_table_list(cursor, db_name):
    cursor.execute(f"SHOW TABLES FROM `{db_name}`")
    return [row[0] for row in cursor.fetchall()]

# 컬럼 속성이 datetime이거나 timestamp인 컬럼 추출
def get_datetime_column(cursor, db_name, table):
    cursor.execute("""
        SELECT COLUMN_NAME, DATA_TYPE
        FROM information_schema.columns
        WHERE table_schema = %s AND table_name = %s
    """, (db_name, table))

    datetime_columns = [row[0] for row in cursor.fetchall()
                        if row[1].lower() in ("datetime", "timestamp")]

    if not datetime_columns:
        return None

    priority = ['reg_dt', 'created_at', 'insert_dt', 'log_time']
    for col in priority:
        if col in datetime_columns:
            return col

    return datetime_columns[0]


# 유효 값인지 확인
def is_valid_datetime(value):
    return value not in ("0000-00-00", "0000-00-00 00:00:00", None, "")

# 시작일, 종료일
def get_date_range(cursor, db_name, table, datetime_col):
    try:
        cursor.execute(f"""
            SELECT MIN(`{datetime_col}`), MAX(`{datetime_col}`)
            FROM `{db_name}`.`{table}`
            WHERE `{datetime_col}` IS NOT NULL
        """)
        result = cursor.fetchone()

        if not result or not is_valid_datetime(result[0]) or not is_valid_datetime(result[1]):
            return None, None

        start = parser.parse(result[0]) if isinstance(result[0], str) else result[0]
        end = parser.parse(result[1]) if isinstance(result[1], str) else result[1]
        return start, end

    except Exception as e:
        print(f"쿼리 실패: {table} - {e}")
        return None, None

# 행 길이
def get_avg_row_length(cursor, db_name, table):
    cursor.execute("""
        SELECT AVG_ROW_LENGTH
        FROM information_schema.tables
        WHERE table_schema = %s AND table_name = %s
    """, (db_name, table))
    result = cursor.fetchone()
    return result[0] if result and result[0] else 0

# 단위(주, 월, 년)
def date_range_by_unit(start, end, unit):
    ranges = []
    current = start

    while current < end:
        if unit == 'week':
            next_point = current + timedelta(days=7)
            label = f"{current:%Y-W%U}"
        elif unit == 'month':
            next_point = current + relativedelta(months=1)
            label = f"{current:%Y-%m}"
        elif unit == 'year':
            next_point = current + relativedelta(years=1)
            label = f"{current:%Y}"
        else:
            raise ValueError("단위 오류")

        ranges.append((label, current, next_point))
        current = next_point

    return ranges

# 시작일, 종료일 기준 용량
def estimate_storage(cursor, db_name, table, datetime_col, start, end, avg_len):
    cursor.execute(f"""
        SELECT COUNT(*)
        FROM `{db_name}`.`{table}`
        WHERE `{datetime_col}` >= %s AND `{datetime_col}` < %s
    """, (start, end))
    row_count = cursor.fetchone()[0]
    estimated_mb = row_count * avg_len / 1024 / 1024
    return round(estimated_mb, 2)

# 포맷
def safe_date_format(date_obj):
    return date_obj.strftime('%Y-%m-%d') if hasattr(date_obj, 'strftime') else str(date_obj)

def main():
    conn = get_connection()
    cursor = conn.cursor()
    db_name = conn.db.decode() if isinstance(conn.db, bytes) else conn.db

    tables = get_table_list(cursor, db_name)
    final_result = {}
    skipped = {}

    for table in tables:
        datetime_col = get_datetime_column(cursor, db_name, table)
        if not datetime_col:
            reason = "datetime 컬럼 없음"
            print(f"건너뜀: {table} ({reason})")
            skipped[table] = reason
            continue

        print(f"테이블 분석 중: {table} (기준 컬럼: {datetime_col})")

        start_date, end_date = get_date_range(cursor, db_name, table, datetime_col)
        if not start_date or not end_date:
            reason = "날짜 정보 없음 또는 이상값 존재"
            print(f"건너뜀: {table} ({reason})")
            skipped[table] = reason
            continue

        avg_len = get_avg_row_length(cursor, db_name, table)
        if avg_len == 0:
            reason = "AVG_ROW_LENGTH = 0"
            print(f"건너뜀: {table} ({reason})")
            skipped[table] = reason
            continue

        table_result = {
            "startDate": safe_date_format(start_date),
            "endDate": safe_date_format(end_date),
            "week": {},
            "month": {},
            "year": {}
        }

        for unit in ['week', 'month', 'year']:
            for label, s, e in date_range_by_unit(start_date, end_date, unit):
                mb = estimate_storage(cursor, db_name, table, datetime_col, s, e, avg_len)
                if mb > 0:
                    table_result[unit][label] = f"{mb:,.3f}"

        final_result[table] = table_result

    if skipped:
        final_result["skipped"] = skipped

    cursor.close()
    conn.close()

    output = json.dumps(final_result, indent=2, ensure_ascii=False)

    print("\n결과값:")
    print(output)

    filename = f"db_size_estimate_{datetime.now().strftime('%Y%m%d_%H%M%S')}.json"
    with open(filename, "w", encoding="utf-8") as f:
        f.write(output)

    print(f"\n분석 완료! 결과 파일: {filename}")
    print(f"파일 경로: {os.path.abspath(filename)}")

if __name__ == "__main__":
    main()

## DB column 확인(datetime)

In [11]:
import pymysql
import json

# DB 연결
def get_connection():
    host = input("DB 호스트: ")
    user = input("DB 사용자명: ")
    password = input("DB 비밀번호: ")
    db = input("DB 이름: ")
    port = int(input("포트 (기본 3306): ") or "3306")

    return pymysql.connect(
        host=host,
        user=user,
        password=password,
        db=db,
        port=port,
        charset='utf8'
    )

# 컬럼 속성이 datetime이거나 timestamp인 것만 추출
def get_datetime_columns():
    conn = get_connection()
    cursor = conn.cursor()
    db_name = conn.db.decode() if isinstance(conn.db, bytes) else conn.db

    cursor.execute(f"SHOW TABLES FROM `{db_name}`")
    tables = [row[0] for row in cursor.fetchall()]

    result = {}

    for table in tables:
        cursor.execute("""
            SELECT COLUMN_NAME, DATA_TYPE
            FROM information_schema.columns
            WHERE table_schema = %s AND table_name = %s
        """, (db_name, table))

        datetime_cols = [
            row[0] for row in cursor.fetchall()
            if row[1].lower() in ("datetime", "timestamp")
        ]

        result[table] = datetime_cols

    cursor.close()
    conn.close()

    # 터미널 출력
    print(json.dumps(result, indent=2, ensure_ascii=False))

    # JSON 파일 저장
    output_file = 'table_columns.json'
    with open(output_file, 'w', encoding='utf-8') as f:
        json.dump(result, f, ensure_ascii=False, indent=2)

    print(f"\n저장 완료: {output_file}")

if __name__ == "__main__":
    get_datetime_columns()

{
  "MDCL_RSV": [],
  "SC_LOG": [
    "TR_SENDDATE",
    "TR_RSLTDATE",
    "TR_MODIFIED"
  ],
  "SC_TRAN": [
    "TR_SENDDATE",
    "TR_RSLTDATE",
    "TR_MODIFIED"
  ],
  "Surveys": [],
  "_tiaraEvent": [],
  "autologic_surgeryfix_percent": [
    "reg_date",
    "upd_date",
    "autoDt1",
    "autoDt2"
  ],
  "autologic_surgeryfix_percent_cal": [
    "reg_dt",
    "del_dt"
  ],
  "autologic_surgeryfix_percent_pre": [
    "reg_date"
  ],
  "bandbanner": [
    "reg_dt",
    "upd_dt"
  ],
  "call_test": [
    "call_date",
    "call_s_time",
    "call_e_time",
    "call_a_time",
    "call_recall_date"
  ],
  "contents": [],
  "crm_cstt_dtal_cntn": [
    "reg_date"
  ],
  "crm_cust_info": [
    "reg_date"
  ],
  "crm_mdcl_rsv": [
    "reg_date"
  ],
  "crm_prsc_rsv_cntn": [
    "reg_date"
  ],
  "crm_rsv_custinfo": [
    "reg_date"
  ],
  "crontab": [],
  "crontab_model_progress_picture_run_history": [
    "run_dt"
  ],
  "crontab_run_history": [
    "run_dt"
  ],
  "cust_info": [
    "re

## 결과값에서 연도만 추출

In [5]:
import json
import pandas as pd

# JSON 파일 읽기
with open('db_size_estimate_20250619.json', 'r', encoding='utf-8') as f:
    data = json.load(f)

# "skipped" 항목 제거
data = {k: v for k, v in data.items() if k != "skipped"}

# 연도별 데이터만 추출
df_data = {}
for table, info in data.items():
    year_data = info.get('year', {})
    df_data[table] = {
        year: float(size.replace(',', '')) for year, size in year_data.items()
    }

# DataFrame으로 변환
df = pd.DataFrame.from_dict(df_data, orient='index')

# 터미널 출력
print("연도 추출 결과:")
print(df)

# JSON 파일 저장
output_file = 'storage_summary.json'
with open(output_file, 'w', encoding='utf-8') as f:
    json.dump(df_data, f, ensure_ascii=False, indent=2)

print(f"\n저장 완료: {output_file}")


연도 추출 결과:
                   2016  2017  2018  2019  2020   2021  2022   2023   2024  \
SC_TRAN            2.26  6.20  2.41  2.87  5.92  15.05  8.79  50.39  17.93   
md_board           3.63  3.82  2.15  0.89  0.85   0.17  0.05    NaN   0.77   
md_board_mng       0.01   NaN   NaN   NaN   NaN    NaN   NaN    NaN    NaN   
md_email           0.04  0.06  0.03  0.02   NaN    NaN   NaN    NaN    NaN   
md_files           0.01   NaN   NaN   NaN   NaN    NaN  0.05    NaN    NaN   
...                 ...   ...   ...   ...   ...    ...   ...    ...    ...   
th_board_gallery2   NaN   NaN   NaN   NaN   NaN    NaN   NaN    NaN    NaN   
th_board_qna2       NaN   NaN   NaN   NaN   NaN    NaN   NaN    NaN    NaN   
th_board_qna4       NaN   NaN   NaN   NaN   NaN    NaN   NaN    NaN    NaN   
th_board_notice     NaN   NaN   NaN   NaN   NaN    NaN   NaN    NaN    NaN   
th_board_qna3       NaN   NaN   NaN   NaN   NaN    NaN   NaN    NaN    NaN   

                   2025  2008  2009  2010  2011  2012

## 엑셀 내보내기

In [1]:
import json
import pandas as pd
import os

def flatten_json_to_dataframe(json_data):
    all_months = set()
    all_years = set()
    table_rows = []

    for table_name, table_data in json_data.items():
        if table_name == "skipped":
            continue

        months = table_data.get("month", {})
        years = table_data.get("year", {})

        all_months.update(months.keys())
        all_years.update(years.keys())

    all_months = sorted(all_months)
    all_years = sorted(all_years)
    columns = ["table"] + all_months + all_years + ["총합 (MB)"]

    for table_name, table_data in json_data.items():
        if table_name == "skipped":
            continue

        row = {"table": table_name}
        total = 0.0

        # 월 단위
        for m in all_months:
            val = table_data.get("month", {}).get(m, "")
            row[m] = val or ""

        # 연 단위 + 총합 계산
        for y in all_years:
            val = table_data.get("year", {}).get(y, "")
            row[y] = val or ""
            if val:
                try:
                    total += float(val.replace(",", ""))
                except ValueError:
                    pass

        row["총합 (MB)"] = f"{total:,.3f}" if total else ""
        table_rows.append(row)

    return pd.DataFrame(table_rows, columns=columns)

def main():
    filename = "db_size_estimate_20250619.json"  # 파일명
    if not os.path.exists(filename):
        print(f"파일이 존재하지 않습니다: {filename}")
        return

    with open(filename, "r", encoding="utf-8") as f:
        json_data = json.load(f)

    df = flatten_json_to_dataframe(json_data)

    output_filename = "db_datetime_size_summary.xlsx"
    df.to_excel(output_filename, index=False)

    print(f"\n✔ 엑셀 파일 생성 완료: {output_filename}")
    print(f"경로: {os.path.abspath(output_filename)}")

if __name__ == "__main__":
    main()



✔ 엑셀 파일 생성 완료: db_datetime_size_summary.xlsx
경로: /Users/sc301/Desktop/yhy/project/lagacy_db/db_datetime_size_summary.xlsx


## 연도별 line chart

In [None]:
import json
import pandas as pd
from datetime import datetime
import plotly.express as px
import os

def load_json(filename):
    with open(filename, "r", encoding="utf-8") as f:
        return json.load(f)

def preprocess_data(data):
    records = []

    for table_name, table_data in data.items():
        if table_name == "skipped":
            continue

        year_data = table_data.get("year", {})
        for year, size in year_data.items():
            try:
                size_mb = float(size.replace(",", ""))
                records.append({
                    "테이블": table_name,
                    "연도": int(year),
                    "용량(MB)": size_mb
                })
            except ValueError:
                continue

    df = pd.DataFrame(records)
    df = df.sort_values(by=["테이블", "연도"])
    return df

def save_to_excel(df, output_path):
    pivot_df = df.pivot(index="연도", columns="테이블", values="용량(MB)")
    pivot_df = pivot_df.fillna("")
    pivot_df.to_excel(output_path, index=True)
    print(f"엑셀 파일 저장 완료: {os.path.abspath(output_path)}")

def save_plot(df, output_path):
    fig = px.line(
        df,
        x="연도",
        y="용량(MB)",
        color="테이블",
        markers=True,
        title="테이블별 연도별 용량 추이 (MB)"
    )
    fig.write_html(output_path)
    print(f"차트 HTML 파일 저장 완료: {os.path.abspath(output_path)}")

def main():
    input_json = "db_size_estimate_20250619.json"  # JSON 파일명
    if not os.path.exists(input_json):
        print(f"파일이 존재하지 않습니다: {input_json}")
        return

    data = load_json(input_json)
    df = preprocess_data(data)

    if df.empty:
        print("처리할 연도 데이터가 없습니다.")
        return

    excel_output = "db_datetime_yearly_size_summary.xlsx"
    chart_output = "db_datetime_yearly_size_chart.html"

    save_to_excel(df, excel_output)
    save_plot(df, chart_output)

if __name__ == "__main__":
    main()


엑셀 파일 저장 완료: /Users/sc301/Desktop/yhy/project/lagacy_db/db_yearly_size_summary.xlsx
차트 HTML 파일 저장 완료: /Users/sc301/Desktop/yhy/project/lagacy_db/db_yearly_size_chart.html
