### 설정

In [1]:
import os
from urllib.parse import quote

import mysql.connector
import numpy as np
import pandas as pd
from dotenv import load_dotenv
from sqlalchemy import create_engine

# Load variables from the local .env file first, so that both the database
# settings and the optional data-directory override below are visible.
load_dotenv()

# Directory that holds the cleaned CSV files. The DATA_DIR_CLEAN environment
# variable (for example set in .env) takes precedence; the fallback is the
# original author's local path, so existing setups keep working unchanged.
DATA_DIR_CLEAN = os.getenv(
    "DATA_DIR_CLEAN",
    r'C:\Users\WD\Desktop\DW_Project\00_cleand_data',
)

# Database connection settings. os.getenv returns None for a variable that is
# not set, so a missing entry only surfaces when a connection is attempted.
user = os.getenv("DB_USER")
password = os.getenv("DB_PASSWORD")
host = os.getenv("DB_HOST")
port = os.getenv("DB_PORT")
database = os.getenv("DB_NAME")

# CSV files found in the data directory. os.listdir returns names in arbitrary
# order, so sort them to get the same processing order on every machine.
clean_csv_files = sorted(f for f in os.listdir(DATA_DIR_CLEAN) if f.endswith('.csv'))

# Map "file name without extension" -> DataFrame read from that file.
dataframes = {
    os.path.splitext(f)[0]: pd.read_csv(os.path.join(DATA_DIR_CLEAN, f))
    for f in clean_csv_files
}

# # dataframe
# customers = dataframes['customers']
# geolocation = dataframes['geolocation']
# orders = dataframes['orders']
# order_items = dataframes['order_items']
# order_payments = dataframes['order_payments']
# order_reviews = dataframes['order_reviews']
# products = dataframes['products']
# product_category_name_translation = dataframes['product_category_name_translation']
# sellers = dataframes['sellers']

### CREATE TABLE

```sql
-- Schema for the cleaned e-commerce data set.
-- Tables are created parents-first so every FOREIGN KEY target already exists.
--
-- Type choices:
--   * Date/time columns use DATETIME rather than TIMESTAMP. TIMESTAMP values
--     are converted through the session time zone, are limited to 1970-2038,
--     and on servers where explicit_defaults_for_timestamp is off an inserted
--     NULL is replaced by the current time. The source data contains missing
--     dates that must stay NULL.
--   * Monetary columns use DECIMAL(10, 2); single-precision FLOAT cannot
--     represent currency amounts exactly.

-- Lookup table: category name -> English category name.
CREATE TABLE product_category_name_translation (
    product_category_name VARCHAR(255) PRIMARY KEY,
    product_category_name_english VARCHAR(255)
);

-- Coordinates per zip-code prefix. There is no primary key, so a prefix may
-- appear on several rows; the index supports the zip-code joins used when
-- checking customers and sellers against this table.
CREATE TABLE geolocation (
    geolocation_zip_code_prefix VARCHAR(255) NOT NULL,
    geolocation_lat DECIMAL(12, 8) NOT NULL,
    geolocation_lng DECIMAL(12, 8) NOT NULL,
    geolocation_city VARCHAR(255) NOT NULL,
    geolocation_state VARCHAR(255) NOT NULL,
    INDEX idx_geolocation_zip_code_prefix (geolocation_zip_code_prefix)
);

CREATE TABLE sellers (
    seller_id VARCHAR(255) PRIMARY KEY,
    seller_zip_code_prefix VARCHAR(255),
    seller_city VARCHAR(255),
    seller_state VARCHAR(255)
);

CREATE TABLE customers (
    customer_id VARCHAR(255) PRIMARY KEY,
    customer_unique_id VARCHAR(255) NOT NULL,
    customer_zip_code_prefix VARCHAR(255),
    customer_city VARCHAR(255),
    customer_state VARCHAR(255)
);

-- The length/quantity/dimension columns stay FLOAT: they are nullable
-- measurements, not exact amounts.
CREATE TABLE products (
    product_id VARCHAR(255) PRIMARY KEY,
    product_category_name VARCHAR(255),
    product_name_length FLOAT,
    product_description_length FLOAT,
    product_photos_qty FLOAT,
    product_weight_g FLOAT,
    product_length_cm FLOAT,
    product_height_cm FLOAT,
    product_width_cm FLOAT,
    FOREIGN KEY (product_category_name) REFERENCES product_category_name_translation (product_category_name)
);

CREATE TABLE orders (
    order_id VARCHAR(255) PRIMARY KEY,
    customer_id VARCHAR(255) NOT NULL,
    order_status VARCHAR(255),
    order_purchase_timestamp DATETIME,
    order_approved_at DATETIME,
    order_delivered_carrier_date DATETIME,
    order_delivered_customer_date DATETIME,
    order_estimated_delivery_date DATETIME,
    FOREIGN KEY (customer_id) REFERENCES customers (customer_id)
);

-- One row per item within an order.
CREATE TABLE order_items (
    order_id VARCHAR(255) NOT NULL,
    order_item_id INTEGER NOT NULL,
    product_id VARCHAR(255) NOT NULL,
    seller_id VARCHAR(255) NOT NULL,
    shipping_limit_date DATETIME,
    price DECIMAL(10, 2),
    freight_value DECIMAL(10, 2),
    PRIMARY KEY (order_id, order_item_id),
    FOREIGN KEY (order_id) REFERENCES orders (order_id),
    FOREIGN KEY (product_id) REFERENCES products (product_id),
    FOREIGN KEY (seller_id) REFERENCES sellers (seller_id)
);

-- One row per payment step of an order.
CREATE TABLE order_payments (
    order_id VARCHAR(255) NOT NULL,
    payment_sequential INTEGER NOT NULL,
    payment_type VARCHAR(255),
    payment_installments INTEGER,
    payment_value DECIMAL(10, 2),
    PRIMARY KEY (order_id, payment_sequential),
    FOREIGN KEY (order_id) REFERENCES orders (order_id)
);

-- NOTE(review): review_comment_message is free text; confirm that 255
-- characters is enough for the longest message in the cleaned data.
CREATE TABLE order_reviews (
    review_id VARCHAR(255) PRIMARY KEY,
    order_id VARCHAR(255) NOT NULL,
    review_score INTEGER,
    review_comment_title VARCHAR(255),
    review_comment_message VARCHAR(255),
    review_creation_date DATETIME,
    review_answer_timestamp DATETIME,
    FOREIGN KEY (order_id) REFERENCES orders (order_id)
);
```

### 데이터 적재

In [2]:
# 1. Reload every cleaned CSV and turn missing values into None


def load_clean_csv(path):
    """Read one cleaned CSV file and return it with missing values as None.

    Parameters
    ----------
    path : str
        Path of the CSV file to read.

    Returns
    -------
    pandas.DataFrame
        The file contents. NaN cells, and the strings 'nan', '' and 'None' in
        object columns, are replaced by None. Columns whose name ends in
        '_zip_code_prefix' are read as text.
    """
    # Zip-code prefixes are identifiers (the schema stores them as VARCHAR).
    # Left to type inference they are parsed as integers, which would drop any
    # leading zeros, so force those columns to str. The header is read first
    # (nrows=0) only to find out which of these columns the file has.
    header = pd.read_csv(path, nrows=0).columns
    text_columns = {col: str for col in header if col.endswith('_zip_code_prefix')}
    df = pd.read_csv(path, dtype=text_columns)

    # Core step: replace every NaN with Python None.
    df = df.replace({np.nan: None})

    # Also clear textual stand-ins for "missing" that may remain in object
    # columns. No datetime handling is needed here: read_csv is called without
    # parse_dates, so date columns arrive as plain strings.
    for col in df.columns:
        if pd.api.types.is_object_dtype(df[col]):
            df[col] = df[col].replace({'nan': None, '': None, 'None': None})

    return df


for f in clean_csv_files:
    name = os.path.splitext(f)[0]
    path = os.path.join(DATA_DIR_CLEAN, f)

    print(f"Loading and processing {name} from {path}...")
    df = load_clean_csv(path)

    # Overwrite the entry for this table with the processed frame.
    dataframes[name] = df
    print(f"{name} loaded and NaN processed. Shape: {df.shape}")
    print("")

Loading and processing customers from C:\Users\WD\Desktop\DW_Project\00_cleand_data\customers.csv...
customers loaded and NaN processed. Shape: (99441, 5)

Loading and processing geolocation from C:\Users\WD\Desktop\DW_Project\00_cleand_data\geolocation.csv...
geolocation loaded and NaN processed. Shape: (19901, 5)

Loading and processing orders from C:\Users\WD\Desktop\DW_Project\00_cleand_data\orders.csv...
orders loaded and NaN processed. Shape: (99438, 8)

Loading and processing order_items from C:\Users\WD\Desktop\DW_Project\00_cleand_data\order_items.csv...
order_items loaded and NaN processed. Shape: (112650, 7)

Loading and processing order_payments from C:\Users\WD\Desktop\DW_Project\00_cleand_data\order_payments.csv...
order_payments loaded and NaN processed. Shape: (103883, 5)

Loading and processing order_reviews from C:\Users\WD\Desktop\DW_Project\00_cleand_data\order_reviews.csv...
order_reviews loaded and NaN processed. Shape: (98407, 7)

Loading and processing products 

In [3]:
# 1. Load order: tables referenced by a foreign key come before the tables
#    that reference them.
load_order = [
    'geolocation',
    'product_category_name_translation',
    'customers',
    'sellers',
    'products',
    'orders',
    'order_items',
    'order_reviews',
    'order_payments'
]


# 2. Connect to MySQL
connect_kwargs = {
    'host': host,
    'user': user,
    'password': password,
    'database': database,
    'charset': 'utf8mb4',
    'collation': 'utf8mb4_general_ci',
}
# Honour DB_PORT when it is configured, so this cell talks to the same server
# as the SQLAlchemy URL built from DB_PORT in the integrity-check cell. When it
# is not set, the connector's own default port is used, as before.
if port:
    connect_kwargs['port'] = int(port)

conn = mysql.connector.connect(**connect_kwargs)
cursor = conn.cursor()

# Tables whose load was rolled back because an INSERT failed.
failed_tables = []

try:
    print("MySQL 적재를 시작합니다.")

    # 3. Load the tables one by one, in foreign-key order
    for name in load_order:
        df = dataframes.get(name)
        if df is None:
            print(f"⚠️ 테이블 '{name}'의 DataFrame이 존재하지 않습니다. 건너뜁니다.")
            continue

        print(f"\n--- 테이블: {name} 적재 중 ---")

        # Build the parameterised INSERT statement from the DataFrame columns
        columns = df.columns.tolist()
        placeholders = ", ".join(["%s"] * len(columns))
        columns_string = ", ".join(columns)

        sql_template = f"""
        INSERT INTO {name} (
            {columns_string}
        ) VALUES ({placeholders})
        """
        print(f"생성된 SQL 템플릿:\n{sql_template}")
        print(f"SQL 템플릿 컬럼 개수: {len(columns)}")
        print(f"SQL 템플릿 컬럼: {columns}")

        # Insert row by row so a failure can be reported with its row index.
        # itertuples(name=None) yields plain tuples of native Python scalars
        # and avoids building a Series per row as iterrows does.
        table_loaded = True
        rows = df.itertuples(index=False, name=None)
        for idx, row_data_tuple in zip(df.index, rows):
            try:
                cursor.execute(sql_template, row_data_tuple)
            except mysql.connector.Error as insert_err:
                print(f"!!! 삽입 오류 발생 (테이블: {name}, 행 인덱스: {idx}) !!!")
                print(f"  오류: {insert_err}")
                print(f"  삽입 시도 SQL: {sql_template}")
                print(f"  삽입 시도 데이터: {row_data_tuple}")
                # Undo the rows inserted for this table so far.
                conn.rollback()
                table_loaded = False
                break

        if table_loaded:
            conn.commit()
            print(f"테이블 '{name}' 적재 완료 및 커밋.")
        else:
            # Report the failure instead of claiming success, then carry on
            # with the next table (each table is its own transaction).
            failed_tables.append(name)
            print(f"테이블 '{name}' 적재 실패: 해당 테이블의 변경 사항은 롤백되었습니다.")

    if failed_tables:
        print(f"\n적재에 실패한 테이블: {failed_tables}")

except mysql.connector.Error as err:
    print(f"\n전체 적재 과정 중 오류: {err}")
    if conn.is_connected():
        conn.rollback()
        print("트랜잭션 롤백됨.")

finally:
    if conn.is_connected():
        cursor.close()
        conn.close()
        print("MySQL 연결이 닫혔습니다.")

MySQL 적재를 시작합니다.

--- 테이블: geolocation 적재 중 ---
생성된 SQL 템플릿:

        INSERT INTO geolocation (
            geolocation_state, geolocation_city, geolocation_zip_code_prefix, geolocation_lat, geolocation_lng
        ) VALUES (%s, %s, %s, %s, %s)
        
SQL 템플릿 컬럼 개수: 5
SQL 템플릿 컬럼: ['geolocation_state', 'geolocation_city', 'geolocation_zip_code_prefix', 'geolocation_lat', 'geolocation_lng']
테이블 'geolocation' 적재 완료 및 커밋.

--- 테이블: product_category_name_translation 적재 중 ---
생성된 SQL 템플릿:

        INSERT INTO product_category_name_translation (
            product_category_name, product_category_name_english
        ) VALUES (%s, %s)
        
SQL 템플릿 컬럼 개수: 2
SQL 템플릿 컬럼: ['product_category_name', 'product_category_name_english']
테이블 'product_category_name_translation' 적재 완료 및 커밋.

--- 테이블: customers 적재 중 ---
생성된 SQL 템플릿:

        INSERT INTO customers (
            customer_id, customer_unique_id, customer_zip_code_prefix, customer_city, customer_state
        ) VALUES (%s, %s, %s, %s, %s)

### 데이터 정합성 검사 

In [4]:
# Build the SQLAlchemy connection URL.
# User name and password are percent-encoded so that characters such as '@',
# ':' or '/' in a credential cannot be misread as URL structure. The port is
# only included when DB_PORT is set; otherwise the literal text "None" would
# end up in the URL.
credentials = f"{quote(user or '', safe='')}:{quote(password or '', safe='')}"
location = f"{host}:{port}" if port else f"{host}"
db_connection_str = f'mysql+mysqlconnector://{credentials}@{location}/{database}'

# Charset and collation are set explicitly and passed straight through to
# mysql.connector via connect_args (chosen for compatibility with MariaDB 11.3.2).
connect_args_dict = {
    'charset': 'utf8mb4',
    'collation': 'utf8mb4_general_ci'
}

# Sentinel so the cleanup below only disposes an engine created by THIS run,
# not one left in the kernel namespace by an earlier execution of the cell.
engine = None

try:
    # Create the SQLAlchemy engine, forwarding connect_args to the driver
    engine = create_engine(db_connection_str, connect_args=connect_args_dict)
    print("SQLAlchemy 엔진이 성공적으로 생성되었습니다.")

    # Referential-integrity checks. Each query returns the distinct key values
    # of a child table that have no matching row in the parent table, so an
    # empty result means the relationship is consistent.
    queries = {
        "orders_not_in_customers": """
            SELECT DISTINCT o.customer_id
            FROM orders o
            LEFT JOIN customers c ON o.customer_id = c.customer_id
            WHERE c.customer_id IS NULL;
        """,
        "products_not_in_translation": """
            SELECT DISTINCT p.product_category_name
            FROM products p
            LEFT JOIN product_category_name_translation pcnt
            ON p.product_category_name = pcnt.product_category_name
            WHERE pcnt.product_category_name IS NULL AND p.product_category_name IS NOT NULL;
        """,
        "order_items_product_mismatch": """
            SELECT DISTINCT oi.product_id
            FROM order_items oi
            LEFT JOIN products p ON oi.product_id = p.product_id
            WHERE p.product_id IS NULL;
        """,
        "order_items_seller_mismatch": """
            SELECT DISTINCT oi.seller_id
            FROM order_items oi
            LEFT JOIN sellers s ON oi.seller_id = s.seller_id
            WHERE s.seller_id IS NULL;
        """,
        "customers_zipcode_not_in_geo": """
            SELECT DISTINCT cust.customer_zip_code_prefix
            FROM customers cust
            LEFT JOIN geolocation geo ON cust.customer_zip_code_prefix = geo.geolocation_zip_code_prefix
            WHERE geo.geolocation_zip_code_prefix IS NULL AND cust.customer_zip_code_prefix IS NOT NULL;
        """,
        "sellers_zipcode_not_in_geo": """
            SELECT DISTINCT s.seller_zip_code_prefix
            FROM sellers s
            LEFT JOIN geolocation geo ON s.seller_zip_code_prefix = geo.geolocation_zip_code_prefix
            WHERE geo.geolocation_zip_code_prefix IS NULL AND s.seller_zip_code_prefix IS NOT NULL;
        """
    }

    # Run every check and keep its result frame under the check's name
    results = {}
    for name, query in queries.items():
        print(f"\n🔍 [{name}] 쿼리 실행 중...")
        df = pd.read_sql(query, engine)
        results[name] = df
        print(f"[{name}] 결과 미존재 수: {len(df)}")

        if not df.empty:
            print("첫 5개 행:")
            print(df.head())
        else:
            print("결과 없음 (미존재 데이터가 없습니다).")


except Exception as e:
    # Deliberately broad: report any connection or query failure in the cell
    # output instead of raising.
    print(f"\n데이터베이스 연결 또는 쿼리 실행 중 오류 발생: {e}")

finally:
    if engine is not None:
        engine.dispose()
        print("SQLAlchemy 엔진의 모든 연결이 해제되었습니다.")

SQLAlchemy 엔진이 성공적으로 생성되었습니다.

🔍 [orders_not_in_customers] 쿼리 실행 중...
[orders_not_in_customers] 결과 미존재 수: 0
결과 없음 (미존재 데이터가 없습니다).

🔍 [products_not_in_translation] 쿼리 실행 중...
[products_not_in_translation] 결과 미존재 수: 0
결과 없음 (미존재 데이터가 없습니다).

🔍 [order_items_product_mismatch] 쿼리 실행 중...
[order_items_product_mismatch] 결과 미존재 수: 0
결과 없음 (미존재 데이터가 없습니다).

🔍 [order_items_seller_mismatch] 쿼리 실행 중...
[order_items_seller_mismatch] 결과 미존재 수: 0
결과 없음 (미존재 데이터가 없습니다).

🔍 [customers_zipcode_not_in_geo] 쿼리 실행 중...
[customers_zipcode_not_in_geo] 결과 미존재 수: 0
결과 없음 (미존재 데이터가 없습니다).

🔍 [sellers_zipcode_not_in_geo] 쿼리 실행 중...
[sellers_zipcode_not_in_geo] 결과 미존재 수: 0
결과 없음 (미존재 데이터가 없습니다).
SQLAlchemy 엔진의 모든 연결이 해제되었습니다.
