In [None]:
# 1. 단일 쓰레드, no batch, 종목 단위 트랜잭션

import pymysql
from openbb import obb

# MySQL 연결 설정
def connect_to_mysql():
    """Open and return a new PyMySQL connection.

    The host, user, password and database values are placeholder literals
    that must be replaced with real settings before running.
    """
    settings = {
        'host': 'host',
        'user': 'username',
        'password': 'password',
        'database': 'database',
    }
    return pymysql.connect(**settings)

# Ticker 목록 가져오기
def get_all_tickers(cursor):
    """Query ``stock_info`` through ``cursor`` and return every fetched row.

    The query selects the ``id`` and ``ticker`` columns, in that order.
    """
    cursor.execute("SELECT id, ticker FROM stock_info")
    rows = cursor.fetchall()
    return rows

# StockPrices 데이터 삽입 함수
def insert_stock_price(cursor, stock_info_id, trade_date, open_price, close_price, high_price, low_price, volume):
    """Insert one price row into ``stock_prices``, updating it on a key clash.

    When the row collides with an existing unique key, the stored open,
    close, high, low and volume values are overwritten with the new ones.
    The statement is only executed here; committing is left to the caller.
    """
    upsert_sql = """
    INSERT INTO stock_prices (ticker_id, trade_date, open_price, close_price, high_price, low_price, volume)
    VALUES (%s, %s, %s, %s, %s, %s, %s)
    ON DUPLICATE KEY UPDATE
    open_price = VALUES(open_price),
    close_price = VALUES(close_price),
    high_price = VALUES(high_price),
    low_price = VALUES(low_price),
    volume = VALUES(volume)
    """
    # Parameter order must match the column list in the INSERT clause.
    params = (
        stock_info_id,
        trade_date,
        open_price,
        close_price,
        high_price,
        low_price,
        volume,
    )
    cursor.execute(upsert_sql, params)

# OpenBB에서 특정 날짜의 가격 데이터 가져오기
def fetch_historical_price(ticker, start_date, end_date):
    """Request historical prices for ``ticker`` from OpenBB as a DataFrame.

    ``start_date`` and ``end_date`` are forwarded unchanged to
    ``obb.equity.price.historical``.
    """
    return obb.equity.price.historical(
        symbol=ticker,
        start_date=start_date,
        end_date=end_date,
    ).to_df()

# Main 실행
if __name__ == "__main__":
    # Date range requested from OpenBB for every ticker.
    start_date = "2024-12-01"
    end_date = "2024-12-30"

    connection = None
    try:
        # Connect to MySQL.
        connection = connect_to_mysql()

        # Use the cursor as a context manager so it is closed when the run
        # finishes instead of staying open until the connection is torn down.
        with connection.cursor() as cursor:
            # Step 1: load every ticker from stock_info.
            tickers = get_all_tickers(cursor)
            print(f"Found {len(tickers)} tickers in stock_info.")

            # Step 2: fetch and store prices ticker by ticker. Each ticker is
            # its own transaction, so one failure does not undo the others.
            for stock_info_id, ticker in tickers:
                try:
                    print(f"Processing {ticker}...")

                    # Download the price history through OpenBB.
                    historical_price_df = fetch_historical_price(ticker, start_date=start_date, end_date=end_date)
                    print(historical_price_df)

                    # Write one row per DataFrame entry (one statement per row).
                    for index, row in historical_price_df.iterrows():
                        trade_date = index.strftime('%Y-%m-%d')  # format the index as a date string
                        insert_stock_price(
                            cursor,
                            stock_info_id,
                            trade_date,
                            row['open'],      # open_price
                            row['close'],     # close_price
                            row['high'],      # high_price
                            row['low'],       # low_price
                            row['volume']     # volume
                        )

                    connection.commit()  # commit this ticker only
                    print(f"Price data for {ticker} inserted successfully.")
                except Exception as e:
                    # Report the failure and discard this ticker's partial rows.
                    print(f"Error processing {ticker}: {e}")
                    connection.rollback()

    except pymysql.MySQLError as e:
        print(f"MySQL Error: {e}")
    finally:
        if connection:
            connection.close()


In [None]:
# 2. INSERT IGNORE(중복 데이터 무시)

import pymysql
import logging
from openbb import obb

# MySQL 연결 설정
def connect_to_mysql():
    """Create a PyMySQL connection from the placeholder settings and return it."""
    conn_kwargs = dict(
        host='host',
        user='username',
        password='password',
        database='database',
    )
    return pymysql.connect(**conn_kwargs)

# Ticker 목록 가져오기
def get_all_tickers(cursor):
    """Return all rows of the ``id``/``ticker`` query against ``stock_info``."""
    ticker_query = "SELECT id, ticker FROM stock_info"
    cursor.execute(ticker_query)
    fetched = cursor.fetchall()
    return fetched

# StockPrices 데이터 삽입 함수
def insert_stock_price(cursor, stock_info_id, trade_date, open_price, close_price, high_price, low_price, volume):
    """Insert one price row into ``stock_prices`` using ``INSERT IGNORE``.

    The statement is only executed here; committing is left to the caller.
    """
    insert_sql = """
    INSERT IGNORE INTO stock_prices (ticker_id, trade_date, open_price, close_price, high_price, low_price, volume)
    VALUES (%s, %s, %s, %s, %s, %s, %s)
    """
    # Parameter order must match the column list above.
    values = (
        stock_info_id,
        trade_date,
        open_price,
        close_price,
        high_price,
        low_price,
        volume,
    )
    cursor.execute(insert_sql, values)

# OpenBB에서 특정 날짜의 가격 데이터 가져오기
def fetch_historical_price(ticker, start_date, end_date):
    """Return OpenBB's historical price result for ``ticker`` as a DataFrame."""
    response = obb.equity.price.historical(
        symbol=ticker,
        start_date=start_date,
        end_date=end_date,
    )
    return response.to_df()

# 로깅 설정
# Send INFO-and-above records to stock_price_update.log.
# NOTE: logging.basicConfig() is a no-op when the root logger already has
# handlers (for example if logging was configured earlier in the same
# kernel); pass force=True (Python 3.8+) if this setup must take effect.
logging.basicConfig(
    filename="stock_price_update.log",
    level=logging.INFO,
    format="%(asctime)s - %(levelname)s - %(message)s"
)

# Main 실행
if __name__ == "__main__":
    # Date range requested from OpenBB for every ticker.
    start_date = "2024-12-01"
    end_date = "2024-12-03"
    batch_size = 5  # number of tickers per logged batch

    connection = None
    try:
        # Connect to MySQL.
        connection = connect_to_mysql()

        # Use the cursor as a context manager so it is closed when the run
        # finishes instead of staying open until the connection is torn down.
        with connection.cursor() as cursor:
            # Step 1: load every ticker from stock_info.
            tickers = get_all_tickers(cursor)
            logging.info(f"Found {len(tickers)} tickers in stock_info.")

            # Step 2: walk the tickers in slices of batch_size. The slices
            # only group the log output; commits still happen per ticker.
            for batch_start in range(0, len(tickers), batch_size):
                batch = tickers[batch_start:batch_start + batch_size]
                logging.info(f"Processing batch {batch_start // batch_size + 1}: {len(batch)} tickers")

                for stock_info_id, ticker in batch:
                    try:
                        logging.info(f"Processing {ticker}...")

                        # Download the price history through OpenBB.
                        historical_price_df = fetch_historical_price(ticker, start_date=start_date, end_date=end_date)
                        logging.info(f"Fetched data for {ticker}. Rows: {len(historical_price_df)}")

                        # Write one row per DataFrame entry.
                        for index, row in historical_price_df.iterrows():
                            trade_date = index.strftime('%Y-%m-%d')  # format the index as a date string

                            insert_stock_price(
                                cursor,
                                stock_info_id,
                                trade_date,
                                row['open'],      # open_price
                                row['close'],     # close_price
                                row['high'],      # high_price
                                row['low'],       # low_price
                                row['volume']     # volume
                            )

                        connection.commit()
                        logging.info(f"Price data for {ticker} inserted successfully.")
                    except Exception as e:
                        logging.error(f"Error processing {ticker}: {e}")
                        connection.rollback()  # roll back this ticker only

    except pymysql.MySQLError as e:
        logging.critical(f"MySQL Error: {e}")
    finally:
        if connection:
            connection.close()
        logging.info("Database connection closed.")

In [None]:
# 3. +) 비동기처리 통한 동시 작업 & executemany 통한 네트워크 오버헤드 감소

import asyncio
import nest_asyncio
import aiomysql
import logging
from openbb import obb

# Patch asyncio via nest_asyncio so that an event loop which is already
# running (as in a notebook kernel) can be re-entered.
nest_asyncio.apply()

# Logging setup (records are written to a file).
# NOTE: logging.basicConfig() is a no-op when the root logger already has
# handlers, e.g. if logging was configured earlier in the same kernel.
logging.basicConfig(
    level=logging.INFO,  # log level
    format="%(asctime)s - %(levelname)s - %(message)s",  # log format
    handlers=[
        logging.FileHandler("stock_price_update.log"),  # write logs to a file
        #logging.StreamHandler()  # also echo logs to the terminal
    ]
)

# MySQL 연결 설정
async def connect_to_mysql():
    """Open and return a single aiomysql connection.

    ``aiomysql.connect`` names the schema argument ``db`` and has no
    pool-sizing options; ``minsize``/``maxsize`` belong to
    ``aiomysql.create_pool`` and made this call fail with ``TypeError``.
    The credential values are placeholder literals.
    """
    return await aiomysql.connect(
        host='host',
        user='username',
        password='password',
        db='database',
    )

# Ticker 목록 가져오기
async def get_all_tickers(cursor):
    """Await the ``id``/``ticker`` query on ``cursor`` and return all rows."""
    await cursor.execute("SELECT id, ticker FROM stock_info")
    rows = await cursor.fetchall()
    return rows

# Batch Insert 함수
async def batch_insert_stock_prices(cursor, batch_data):
    """Send ``batch_data`` to ``stock_prices`` in one ``executemany`` call.

    Each element of ``batch_data`` supplies the seven values of one row, in
    the column order of the INSERT statement. Committing is left to the
    caller.
    """
    insert_sql = """
    INSERT IGNORE INTO stock_prices (ticker_id, trade_date, open_price, close_price, high_price, low_price, volume)
    VALUES (%s, %s, %s, %s, %s, %s, %s)
    """
    await cursor.executemany(insert_sql, batch_data)

# Ticker 데이터 처리
async def process_ticker(pool, stock_info_id, ticker, start_date, end_date, semaphore):
    """Download one ticker's prices and store them in ``stock_prices``.

    The OpenBB request is a blocking call, so it runs in a worker thread;
    calling it directly inside the coroutine stalls the event loop and
    makes the tickers run one after another. A pooled connection is taken
    only once the rows are ready, so it is not held during the download.

    Args:
        pool: aiomysql pool that connections are acquired from.
        stock_info_id: value written to the ``ticker_id`` column.
        ticker: symbol passed to OpenBB.
        start_date: start of the requested range, forwarded to OpenBB.
        end_date: end of the requested range, forwarded to OpenBB.
        semaphore: limits how many tickers are processed at the same time.

    Download and database errors are logged and swallowed, so one failing
    ticker does not abort the others.
    """

    def _load_rows():
        # Blocking download plus clean-up; executed in a worker thread.
        df = obb.equity.price.historical(symbol=ticker, start_date=start_date, end_date=end_date).to_df()
        # Drop incomplete rows and rows with non-positive prices.
        df = df.dropna(subset=['open', 'close', 'high', 'low', 'volume'])
        df = df[(df['open'] > 0) & (df['close'] > 0) & (df['high'] > 0) & (df['low'] > 0)]
        return [
            (
                stock_info_id,
                index.strftime('%Y-%m-%d'),
                row['open'],
                row['close'],
                row['high'],
                row['low'],
                row['volume']
            )
            for index, row in df.iterrows()
        ]

    batch_size = 500  # rows per executemany call for a single ticker

    async with semaphore:
        try:
            loop = asyncio.get_running_loop()
            batch_data = await loop.run_in_executor(None, _load_rows)
        except Exception as e:
            logging.error(f"Error processing {ticker}: {e}")
            return

        async with pool.acquire() as conn:
            async with conn.cursor() as cursor:
                try:
                    for i in range(0, len(batch_data), batch_size):
                        batch = batch_data[i:i + batch_size]
                        await batch_insert_stock_prices(cursor, batch)

                    await conn.commit()  # one commit per ticker
                    # The count is the number of rows submitted; INSERT IGNORE
                    # may skip some of them as duplicates.
                    logging.info(f"Processed {ticker}: {len(batch_data)} rows inserted.")
                except Exception as e:
                    logging.error(f"Error processing {ticker}: {e}")
                    await conn.rollback()

# 메인 함수
async def main():
    """Load prices for every ticker in ``stock_info`` concurrently.

    Creates the connection pool, reads the ticker list, runs one
    ``process_ticker`` task per ticker and waits for all of them. The pool
    is closed in a ``finally`` block so connections are released even when
    a step raises.
    """
    start_date = "2022-01-01"
    end_date = "2022-12-31"
    # create_pool forwards its extra keyword arguments to aiomysql.connect,
    # which names the schema argument ``db`` (``database`` is rejected).
    pool = await aiomysql.create_pool(
        host='host',
        user='username',
        password='password',
        db='database',
        minsize=1,
        maxsize=40  # upper bound on pooled MySQL connections
    )

    try:
        semaphore = asyncio.Semaphore(60)  # cap on tickers processed at once

        async with pool.acquire() as conn:
            async with conn.cursor() as cursor:
                tickers = await get_all_tickers(cursor)

        tasks = [
            process_ticker(pool, stock_info_id, ticker, start_date, end_date, semaphore)
            for stock_info_id, ticker in tickers
        ]
        await asyncio.gather(*tasks)  # run every ticker task concurrently
    finally:
        pool.close()
        await pool.wait_closed()

# 실행
# Top-level await: accepted in a Jupyter/IPython cell, but a SyntaxError in a
# plain Python script, where asyncio.run(main()) would be needed instead.
await main()

In [2]:
# 4. bulk insert 

import pandas as pd
import logging
import time
from sqlalchemy import text, create_engine, event
from openbb import obb
import functools

# 로깅 설정
# Send INFO-and-above records to query_log.log.
# NOTE: logging.basicConfig() is a no-op when the root logger already has
# handlers (for example if logging was configured earlier in the same
# kernel); pass force=True (Python 3.8+) if this setup must take effect.
logging.basicConfig(
    level=logging.INFO,
    format="%(asctime)s - %(levelname)s - %(message)s",
    handlers=[logging.FileHandler("query_log.log")],
)

# [Important] Global counter of INSERT statements; it is incremented by the
# before_cursor_execute listener registered in create_engine_for_mysql().
insert_query_count = 0

# ------------------------------------------------------------------------------
# 1) Decorator: 함수 실행 시간을 측정해서 로그를 남기는 데코레이터
# ------------------------------------------------------------------------------
def measure_time(func):
    """Decorator that logs how long each call to ``func`` takes.

    The duration is measured with ``time.perf_counter``, which is monotonic
    and therefore unaffected by system clock adjustments. The timing line
    is logged from a ``finally`` block, so it is written even when ``func``
    raises; the exception then propagates unchanged.

    Returns:
        A wrapper with the same call signature and return value as ``func``.
    """
    @functools.wraps(func)
    def wrapper(*args, **kwargs):
        started = time.perf_counter()
        try:
            return func(*args, **kwargs)
        finally:
            elapsed_time = time.perf_counter() - started
            logging.info(f"[TIMELOG] Function '{func.__name__}' took {elapsed_time:.4f} seconds.")
    return wrapper

# ------------------------------------------------------------------------------
# 2) MySQL 연결 엔진 생성. 이 때 이벤트 리스너 등록 (before/after_cursor_execute)
# ------------------------------------------------------------------------------
def create_engine_for_mysql():
    """Create the MySQL engine and attach query-logging event listeners.

    Two listeners are registered on the engine:

    * ``before_cursor_execute`` logs the statement and its parameters,
      counts INSERT statements in the global ``insert_query_count`` and
      records the start time on the execution context.
    * ``after_cursor_execute`` logs how long the statement took.

    The credential values are placeholder literals.

    Returns:
        The configured SQLAlchemy engine.
    """
    # A dict literal needs "key": value pairs; the keyword-style key='value'
    # form is a SyntaxError inside braces.
    DATABASE_CONFIG = {
        'host': 'host',
        'user': 'username',
        'password': 'password',
        'database': 'database',
    }
    connection_string = (
        f"mysql+pymysql://{DATABASE_CONFIG['user']}:{DATABASE_CONFIG['password']}@"
        f"{DATABASE_CONFIG['host']}/{DATABASE_CONFIG['database']}"
    )
    engine = create_engine(connection_string)

    # -- before_cursor_execute: called right before a statement is executed
    @event.listens_for(engine, "before_cursor_execute", retval=True)
    def before_cursor_execute(conn, cursor, statement, parameters, context, executemany):
        """Log the statement, count INSERTs and remember the start time."""
        logging.info(f"[SQL START] statement: {statement}")
        logging.info(f"[SQL START] parameters: {parameters}")

        # Count only statements that start with INSERT, so a query that
        # merely contains the word elsewhere is not counted.
        global insert_query_count
        if statement.lstrip().upper().startswith("INSERT"):
            insert_query_count += 1

        # Monotonic start time, read back by after_cursor_execute.
        context._query_start_time = time.perf_counter()

        # With retval=True the listener must return (statement, parameters).
        return statement, parameters

    # -- after_cursor_execute: called right after a statement has executed
    @event.listens_for(engine, "after_cursor_execute")
    def after_cursor_execute(conn, cursor, statement, parameters, context, executemany):
        """Log how long the statement took."""
        elapsed = time.perf_counter() - context._query_start_time
        logging.info(f"[SQL END] Query took {elapsed:.4f} seconds.")

    return engine

# ------------------------------------------------------------------------------
# 3) DB 유틸 함수들 (Decorator로 실행 시간 측정)
# ------------------------------------------------------------------------------
@measure_time
def test_db_connection(engine):
    """Check connectivity by running ``SELECT 1`` and logging the outcome.

    An unexpected result is logged as a warning. Any exception is logged
    as an error and then re-raised to the caller.
    """
    try:
        with engine.connect() as conn:
            first_row = conn.execute(text("SELECT 1")).fetchone()
            if not (first_row and first_row[0] == 1):
                logging.warning(f"DB connection unexpected result: {first_row}")
            else:
                logging.info("DB connection success: SELECT 1 returned 1.")
    except Exception as e:
        logging.error(f"DB connection failed: {e}")
        raise

@measure_time
def show_tables(engine):
    """Log the names returned by ``SHOW TABLES``.

    Failures are logged as errors and not re-raised.
    """
    try:
        with engine.connect() as conn:
            listing = conn.execute(text("SHOW TABLES")).fetchall()
            tables = [entry[0] for entry in listing]
            logging.info(f"Tables in DB: {tables}")
    except Exception as e:
        logging.error(f"Could not retrieve table list: {e}")

@measure_time
def count_inserted_rows(engine, table_name):
    """Log the ``COUNT(*)`` of ``table_name``.

    ``table_name`` is interpolated directly into the SQL text, so it must
    come from trusted code. Failures are logged and not re-raised.
    """
    try:
        with engine.connect() as conn:
            count_query = text(f"SELECT COUNT(*) FROM {table_name}")
            count_val = conn.execute(count_query).fetchone()[0]
            logging.info(f"'{table_name}' has {count_val} rows.")
    except Exception as e:
        logging.error(f"Error counting rows from {table_name}: {e}")

@measure_time
def get_all_tickers(engine):
    """Read the ``id`` and ``ticker`` columns of ``stock_info`` into a DataFrame."""
    with engine.connect() as conn:
        return pd.read_sql("SELECT id, ticker FROM stock_info", conn)

@measure_time
def process_ticker(ticker_id, ticker, start_date, end_date):
    """Fetch one ticker's prices from OpenBB, shaped like the price table.

    Rows with a missing open/close/high/low/volume value or a non-positive
    price are dropped. The filtered frame is copied before columns are
    added, so the assignments never touch a slice of the original frame
    (this avoids pandas' SettingWithCopyWarning).

    Args:
        ticker_id: value written to the ``ticker_id`` column.
        ticker: symbol passed to OpenBB.
        start_date: start of the requested range, forwarded to OpenBB.
        end_date: end of the requested range, forwarded to OpenBB.

    Returns:
        A DataFrame with the columns ``ticker_id``, ``trade_date``,
        ``open_price``, ``close_price``, ``high_price``, ``low_price`` and
        ``volume``. Any failure is logged and an empty DataFrame is
        returned instead.
    """
    try:
        df = obb.equity.price.historical(symbol=ticker, start_date=start_date, end_date=end_date).to_df()
        df = df.dropna(subset=['open', 'close', 'high', 'low', 'volume'])
        valid_prices = (df['open'] > 0) & (df['close'] > 0) & (df['high'] > 0) & (df['low'] > 0)
        df = df[valid_prices].copy()
        # strftime below needs a DatetimeIndex.
        if not isinstance(df.index, pd.DatetimeIndex):
            df.index = pd.to_datetime(df.index)

        # Add the key columns and rename the price columns to the table's names.
        df['ticker_id'] = ticker_id
        df['trade_date'] = df.index.strftime('%Y-%m-%d')
        df = df.rename(columns={
            'open': 'open_price',
            'close': 'close_price',
            'high': 'high_price',
            'low': 'low_price',
        })
        return df[['ticker_id', 'trade_date', 'open_price', 'close_price', 'high_price', 'low_price', 'volume']]
    except Exception as e:
        logging.error(f"Error processing {ticker}: {e}")
        return pd.DataFrame()

@measure_time
def bulk_insert_with_pandas(engine, df, table_name, batch_size=1000):
    """Append ``df`` to ``table_name`` with multi-row INSERT statements.

    The write runs inside ``engine.begin()``, which commits when the block
    exits normally and rolls back on error. With a plain
    ``engine.connect()`` and no explicit commit, SQLAlchemy 2.x rolls the
    transaction back when the connection closes, so the rows are lost on a
    transactional storage engine.

    Args:
        engine: SQLAlchemy engine to write through.
        df: rows to insert; an empty frame is skipped.
        table_name: target table, created by pandas if it does not exist.
        batch_size: number of rows per INSERT statement (``chunksize``).

    Errors are logged and not re-raised.
    """
    if df.empty:
        logging.info(f"No data to insert into table '{table_name}'.")
        return
    try:
        with engine.begin() as conn:
            df.to_sql(
                name=table_name,
                con=conn,
                if_exists="append",
                index=False,
                method="multi",         # several rows per INSERT statement
                chunksize=batch_size,
            )
        logging.info(f"Finished inserting {len(df)} rows into '{table_name}'.")
    except Exception as e:
        logging.error(f"Error inserting data into '{table_name}': {e}")

# ------------------------------------------------------------------------------
# 4) 메인 함수
# ------------------------------------------------------------------------------
def main():
    """Fetch prices for every ticker sequentially and bulk-insert them.

    Steps: create the engine, check the connection, list the tables, fetch
    and clean each ticker's prices, insert everything in one bulk
    operation, then log the row count, the timings and the number of
    INSERT statements that were executed.
    """
    overall_start_time = time.time()

    # 1. Date range
    start_date = "2024-02-01"
    end_date   = "2024-02-28"

    # 2. Create the engine (also registers the event listeners)
    engine = create_engine_for_mysql()

    # 3. Check the DB connection
    test_db_connection(engine)

    # 4. List the tables in the DB
    show_tables(engine)

    # 5. Collect the data of every ticker
    ticker_start = time.time()
    tickers_df = get_all_tickers(engine)
    all_data = []
    for _, row in tickers_df.iterrows():
        ticker_data = process_ticker(row["id"], row["ticker"], start_date, end_date)
        all_data.append(ticker_data)
    # pd.concat raises ValueError on an empty list, which happens when
    # stock_info has no rows; fall back to an empty frame in that case.
    final_df = pd.concat(all_data, ignore_index=True) if all_data else pd.DataFrame()
    logging.info(f"[DEBUG] final_df dtypes:\n{final_df.dtypes}")
    logging.info("\n" + final_df.head(10).to_string())
    logging.info(f"[TIMELOG] All ticker processing took {time.time() - ticker_start:.4f} seconds.")

    # 6. Bulk insert
    # NOTE(review): the earlier cells of this notebook write to
    # `stock_prices`; confirm that `stock_price` is the intended table.
    insert_start = time.time()
    bulk_insert_with_pandas(engine, final_df, "stock_price", batch_size=1000)
    logging.info(f"[TIMELOG] Bulk insert took {time.time() - insert_start:.4f} seconds.")

    # 7. Check the table's row count
    count_inserted_rows(engine, "stock_price")

    # 8. Total run time
    elapsed_time = time.time() - overall_start_time
    logging.info(f"[TIMELOG] Total execution time: {elapsed_time:.2f} seconds")

    # 9. Number of INSERT statements (read-only access to the module global)
    logging.info(f"[TIMELOG] Total INSERT queries executed: {insert_query_count}")

if __name__ == "__main__":
    main()


In [None]:
# 5. bulk insert + 멀티스레드(concurrent.futures)
import pandas as pd
import logging
import time
from sqlalchemy import text, create_engine, event
from openbb import obb
import functools
import concurrent.futures  # 추가

# 로깅 설정
# Send INFO-and-above records to query_log.log.
# NOTE: logging.basicConfig() is a no-op when the root logger already has
# handlers (for example if logging was configured earlier in the same
# kernel); pass force=True (Python 3.8+) if this setup must take effect.
logging.basicConfig(
    level=logging.INFO,
    format="%(asctime)s - %(levelname)s - %(message)s",
    handlers=[logging.FileHandler("query_log.log")],
)

# Global counter of INSERT statements; it is incremented by the
# before_cursor_execute listener registered in create_engine_for_mysql().
insert_query_count = 0

def measure_time(func):
    """Decorator that logs the run time of every call to ``func``.

    Uses the monotonic ``time.perf_counter`` so clock adjustments cannot
    distort the measurement, and logs from a ``finally`` block so the
    timing line is written even when ``func`` raises. Exceptions propagate
    unchanged.

    Returns:
        A wrapper with the same call signature and return value as ``func``.
    """
    @functools.wraps(func)
    def wrapper(*args, **kwargs):
        began = time.perf_counter()
        try:
            return func(*args, **kwargs)
        finally:
            elapsed_time = time.perf_counter() - began
            logging.info(f"[TIMELOG] Function '{func.__name__}' took {elapsed_time:.4f} seconds.")
    return wrapper

def create_engine_for_mysql():
    """Create the MySQL engine and attach query-logging event listeners.

    ``before_cursor_execute`` logs each statement with its parameters,
    counts INSERT statements in the global ``insert_query_count`` and
    stores the start time on the execution context;
    ``after_cursor_execute`` logs the elapsed time. The credential values
    are placeholder literals.

    Returns:
        The configured SQLAlchemy engine.
    """
    # A dict literal needs "key": value pairs; the keyword-style key='value'
    # form is a SyntaxError inside braces.
    DATABASE_CONFIG = {
        'host': 'host',
        'user': 'username',
        'password': 'password',
        'database': 'database',
    }
    connection_string = (
        f"mysql+pymysql://{DATABASE_CONFIG['user']}:{DATABASE_CONFIG['password']}@"
        f"{DATABASE_CONFIG['host']}/{DATABASE_CONFIG['database']}"
    )
    engine = create_engine(connection_string)

    @event.listens_for(engine, "before_cursor_execute", retval=True)
    def before_cursor_execute(conn, cursor, statement, parameters, context, executemany):
        """Log the statement, count INSERTs and remember the start time."""
        logging.info(f"[SQL START] statement: {statement}")
        logging.info(f"[SQL START] parameters: {parameters}")

        # Count only statements that start with INSERT, so a query that
        # merely contains the word elsewhere is not counted.
        global insert_query_count
        if statement.lstrip().upper().startswith("INSERT"):
            insert_query_count += 1

        # Monotonic start time, read back by after_cursor_execute.
        context._query_start_time = time.perf_counter()
        # With retval=True the listener must return (statement, parameters).
        return statement, parameters

    @event.listens_for(engine, "after_cursor_execute")
    def after_cursor_execute(conn, cursor, statement, parameters, context, executemany):
        """Log how long the statement took."""
        elapsed = time.perf_counter() - context._query_start_time
        logging.info(f"[SQL END] Query took {elapsed:.4f} seconds.")

    return engine

@measure_time
def test_db_connection(engine):
    """Run ``SELECT 1`` to confirm the database is reachable.

    Logs success or an unexpected result; on any exception, logs the error
    and re-raises it.
    """
    try:
        with engine.connect() as conn:
            probe = conn.execute(text("SELECT 1"))
            row = probe.fetchone()
            healthy = row and row[0] == 1
            if healthy:
                logging.info("DB connection success: SELECT 1 returned 1.")
            else:
                logging.warning(f"DB connection unexpected result: {row}")
    except Exception as e:
        logging.error(f"DB connection failed: {e}")
        raise

@measure_time
def show_tables(engine):
    """Log the table names reported by ``SHOW TABLES``; errors are only logged."""
    try:
        with engine.connect() as conn:
            tables = []
            for record in conn.execute(text("SHOW TABLES")).fetchall():
                tables.append(record[0])
            logging.info(f"Tables in DB: {tables}")
    except Exception as e:
        logging.error(f"Could not retrieve table list: {e}")

@measure_time
def count_inserted_rows(engine, table_name):
    """Log how many rows ``table_name`` holds, using ``COUNT(*)``.

    ``table_name`` is placed directly into the SQL text, so it must come
    from trusted code. Errors are logged and not re-raised.
    """
    try:
        with engine.connect() as conn:
            counted = conn.execute(text(f"SELECT COUNT(*) FROM {table_name}"))
            first = counted.fetchone()
            count_val = first[0]
            logging.info(f"'{table_name}' has {count_val} rows.")
    except Exception as e:
        logging.error(f"Error counting rows from {table_name}: {e}")

@measure_time
def get_all_tickers(engine):
    """Return a DataFrame with the ``id`` and ``ticker`` columns of ``stock_info``."""
    ticker_sql = "SELECT id, ticker FROM stock_info"
    with engine.connect() as conn:
        return pd.read_sql(ticker_sql, conn)

@measure_time
def process_ticker(ticker_id, ticker, start_date, end_date):
    """Fetch and clean one ticker's prices; called from worker threads.

    Rows with a missing open/close/high/low/volume value or a non-positive
    price are dropped. The filtered frame is copied before columns are
    added, so the assignments never touch a slice of the original frame
    (this avoids pandas' SettingWithCopyWarning).

    Args:
        ticker_id: value written to the ``ticker_id`` column.
        ticker: symbol passed to OpenBB.
        start_date: start of the requested range, forwarded to OpenBB.
        end_date: end of the requested range, forwarded to OpenBB.

    Returns:
        A DataFrame with the columns ``ticker_id``, ``trade_date``,
        ``open_price``, ``close_price``, ``high_price``, ``low_price`` and
        ``volume``. Any failure is logged and an empty DataFrame is
        returned instead.
    """
    try:
        df = obb.equity.price.historical(symbol=ticker, start_date=start_date, end_date=end_date).to_df()
        df = df.dropna(subset=['open', 'close', 'high', 'low', 'volume'])
        valid_prices = (df['open'] > 0) & (df['close'] > 0) & (df['high'] > 0) & (df['low'] > 0)
        df = df[valid_prices].copy()
        # strftime below needs a DatetimeIndex.
        if not isinstance(df.index, pd.DatetimeIndex):
            df.index = pd.to_datetime(df.index)

        df['ticker_id'] = ticker_id
        df['trade_date'] = df.index.strftime('%Y-%m-%d')
        df = df.rename(columns={
            'open': 'open_price',
            'close': 'close_price',
            'high': 'high_price',
            'low': 'low_price',
        })
        return df[['ticker_id', 'trade_date', 'open_price', 'close_price', 'high_price', 'low_price', 'volume']]
    except Exception as e:
        logging.error(f"Error processing {ticker}: {e}")
        return pd.DataFrame()

@measure_time
def bulk_insert_with_pandas(engine, df, table_name, batch_size=50000):
    """Append ``df`` to ``table_name`` with multi-row INSERT statements.

    The write runs inside ``engine.begin()``, which commits when the block
    exits normally and rolls back on error. With a plain
    ``engine.connect()`` and no explicit commit, SQLAlchemy 2.x rolls the
    transaction back when the connection closes, so the rows are lost on a
    transactional storage engine.

    Args:
        engine: SQLAlchemy engine to write through.
        df: rows to insert; an empty frame is skipped.
        table_name: target table, created by pandas if it does not exist.
        batch_size: number of rows per INSERT statement (``chunksize``).

    Errors are logged and not re-raised.
    """
    if df.empty:
        logging.info(f"No data to insert into table '{table_name}'.")
        return
    try:
        with engine.begin() as conn:
            df.to_sql(
                name=table_name,
                con=conn,
                if_exists="append",
                index=False,
                method="multi",
                chunksize=batch_size,
            )
        logging.info(f"Finished inserting {len(df)} rows into '{table_name}'.")
    except Exception as e:
        logging.error(f"Error inserting data into '{table_name}': {e}")

def main():
    """Fetch every ticker's prices on a thread pool, then bulk-insert them.

    Steps: create the engine, check the connection, list the tables,
    download the tickers in parallel with ten worker threads, insert the
    combined frame, then log the row count, the timings and the number of
    INSERT statements that were executed.
    """
    overall_start_time = time.time()

    start_date, end_date = "2023-01-01", "2023-12-31"

    engine = create_engine_for_mysql()
    test_db_connection(engine)
    show_tables(engine)

    tickers_df = get_all_tickers(engine)
    logging.info(f"Total tickers: {len(tickers_df)}")

    # (1) Download the ticker data on worker threads.
    ticker_start_time = time.time()
    all_data = []
    # The work is I/O bound, so max_workers could be raised (e.g. 10, 20).
    with concurrent.futures.ThreadPoolExecutor(max_workers=10) as executor:
        future_to_ticker = {}
        for _, row in tickers_df.iterrows():
            submitted = executor.submit(process_ticker, row["id"], row["ticker"], start_date, end_date)
            future_to_ticker[submitted] = row["ticker"]

        # as_completed yields each future as soon as it finishes.
        for finished in concurrent.futures.as_completed(future_to_ticker):
            ticker_symbol = future_to_ticker[finished]
            try:
                all_data.append(finished.result())
            except Exception as exc:
                logging.error(f"Ticker {ticker_symbol} generated an exception: {exc}")

    if all_data:
        final_df = pd.concat(all_data, ignore_index=True)
    else:
        final_df = pd.DataFrame()
    processing_time = time.time() - ticker_start_time
    logging.info(f"[TIMELOG] All ticker processing took {processing_time:.2f} seconds.")
    logging.info(f"final_df shape: {final_df.shape}")
    logging.info("\n" + final_df.head(10).to_string())

    # (2) Insert into the database.
    insert_start_time = time.time()
    bulk_insert_with_pandas(engine, final_df, "stock_price", batch_size=50000)
    logging.info(f"[TIMELOG] Bulk insert took {time.time() - insert_start_time:.2f} seconds.")

    # (3) Row count of the target table.
    count_inserted_rows(engine, "stock_price")

    # (4) Total run time.
    elapsed_time = time.time() - overall_start_time
    logging.info(f"[TIMELOG] Total execution time: {elapsed_time:.2f} seconds")

    global insert_query_count
    logging.info(f"[TIMELOG] Total INSERT queries executed: {insert_query_count}")

if __name__ == "__main__":
    main()
