<a href="https://colab.research.google.com/github/haedal-uni/analysis/blob/main/work/2025-04/Robo-Advisor/%EA%B3%84%EC%A2%8C%20%2B%20%EB%B8%8C%EB%A1%9C%EC%BB%A4%20%EA%B0%9D%EC%B2%B4%20%2B%20%ED%8F%AC%ED%8A%B8%ED%8F%B4%EB%A6%AC%EC%98%A4%20%EA%B8%B0%EB%B0%98.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

클래스 기반 구조 (Account, Broker)

포트폴리오 구성 및 리밸런싱 수행

SMA와 EMA 전략 비교

MDD, 샤프지수, 연간 수익률 등 성과 지표 계산

Plotly를 활용한 인터랙티브 시각화

해당 [링크](https://github.com/ald0met/RoboAdvisor_with_Python/blob/main/market_timing_module1.ipynb)를 보고 진행함


In [1]:
import warnings
import matplotlib
import sys
import numpy as np
import os
import pandas as pd
warnings.filterwarnings(action='ignore')
if 'google.colab' in sys.modules:
    !echo 'debconf debconf/frontend select Noninteractive' | debconf-set-selections
    # 나눔 폰트를 설치
    !sudo apt-get -qq -y install fonts-nanum
    import matplotlib.font_manager as fm
    font_files = fm.findSystemFonts(fontpaths=['/usr/share/fonts/truetype/nanum'])
    for fpath in font_files:
        fm.fontManager.addfont(fpath)
matplotlib.rcParams['font.family'] = 'NanumGothic'
matplotlib.rcParams['axes.unicode_minus'] = False

Selecting previously unselected package fonts-nanum.
(Reading database ... 126315 files and directories currently installed.)
Preparing to unpack .../fonts-nanum_20200506-1_all.deb ...
Unpacking fonts-nanum (20200506-1) ...
Setting up fonts-nanum (20200506-1) ...
Processing triggers for fontconfig (2.13.1-4.2ubuntu5) ...


In [2]:
import pandas as pd
import numpy as np
from typing import Optional, Dict, List
import plotly.express as px

# --- 사용자 정의 클래스 ---
class Account:
    def __init__(self, initial_cash):
        self.cash = initial_cash
        self.holdings = {}  # 보유 주식 수량
        self.orders = []
        self.account_history = []
        self.portfolio_history = []

    def update_position(self, transactions):
        for trans in transactions:
            ticker = trans['ticker']
            price = trans['price']
            quantity = trans['quantity']
            cost = price * quantity

            self.cash -= cost
            self.holdings[ticker] = self.holdings.get(ticker, 0) + quantity

    def update_portfolio(self, dt, data):
        value = self.cash
        for _, row in data.iterrows():
            ticker = row['ticker']
            price = row['close']
            quantity = self.holdings.get(ticker, 0)
            value += price * quantity

        self.portfolio_history.append({'date': dt, 'value': value})

    def update_order(self):
        self.orders = []  # 매일 주문 초기화


class Broker:
    def process_order(self, dt, data, orders):
        results = []
        for order in orders:
            ticker = order['ticker']
            weight = order['weight']
            close_price = data.loc[data['ticker'] == ticker, 'close'].values
            if len(close_price) == 0:
                continue
            price = close_price[0]
            total_value = order['total_value']
            quantity = int(total_value // price)
            if quantity <= 0:
                continue
            results.append({
                'ticker': ticker,
                'price': price,
                'quantity': quantity
            })
        return results


def rebalance(dt, data, account, weights):
    total_value = account.cash
    for ticker, weight in weights.items():
        invest_amount = total_value * weight
        order = {
            'ticker': ticker,
            'weight': weight,
            'total_value': invest_amount
        }
        account.orders.append(order)

# --- 데이터 불러오기 ---
df1 = pd.read_csv('미국 철강 코일 선물 과거 데이터.csv', parse_dates=['날짜'], thousands=",")
df2 = pd.read_csv('구리 선물 과거 데이터.csv', parse_dates=['날짜'], thousands=",")
df1['ticker'] = 'STEEL'
df2['ticker'] = 'COPPER'
df = pd.concat([df1, df2], ignore_index=True)

# 전처리
df['거래량'] = df['거래량'].apply(lambda x: float(str(x).replace('K', '')) * 1000 if 'K' in str(x) else float(x))
df['변동 %'] = df['변동 %'].apply(lambda x: float(str(x).replace('%', '')) / 100)
df.sort_values('날짜', inplace=True)
df.dropna(inplace=True)
df.rename(columns={'날짜': 'date', '종가': 'close'}, inplace=True)

# --- 이동평균 계산 ---
def calculate_MA(ohlcv_data: pd.DataFrame, period: int, ma_type: str) -> pd.DataFrame:
    close_data = ohlcv_data[['date', 'ticker', 'close']].copy()
    close_data = close_data.set_index(['date', 'ticker']).unstack(level=1)['close']

    if ma_type == 'sma':
        ma = close_data.rolling(window=period).mean()
    elif ma_type == 'ema':
        ma = close_data.ewm(span=period).mean()
    else:
        raise ValueError("Invalid MA type")

    return ma

# --- 이동평균 전략 함수 ---
def get_moving_average_weights(ohlcv_data: pd.DataFrame, ma_data: pd.Series) -> Optional[Dict]:
    if ma_data.isnull().any():
        return None

    portfolio = {}
    stocks_to_buy = []

    for ticker in ohlcv_data['ticker'].unique():
        price = ohlcv_data.loc[ohlcv_data['ticker'] == ticker, 'close'].values[0]
        ma_price = ma_data.get(ticker)

        if ma_price is None:
            continue

        if price > ma_price:
            stocks_to_buy.append(ticker)
        else:
            portfolio[ticker] = 0.0

    if not stocks_to_buy:
        return portfolio

    weight = 1 / len(stocks_to_buy)
    for ticker in stocks_to_buy:
        portfolio[ticker] = weight

    return portfolio

# --- 전략 실행 ---
def simulate_moving_average(ohlcv_data: pd.DataFrame, ma_type: str, period: int) -> Account:
    account = Account(initial_cash=100000000)
    broker = Broker()
    ma = calculate_MA(ohlcv_data, period, ma_type)

    for date, group in ohlcv_data.groupby('date'):
        ohlcv = group
        ma_slice = ma.loc[date] if date in ma.index else None
        if ma_slice is None:
            continue

        weights = get_moving_average_weights(ohlcv, ma_slice)
        if weights is None:
            continue

        rebalance(date, ohlcv, account, weights)
        transactions = broker.process_order(date, ohlcv, account.orders)
        account.update_position(transactions)
        account.update_portfolio(date, ohlcv)
        account.update_order()

    return account

# --- 실행 및 결과 수익률 시각화 ---
period = 3
sma_result = simulate_moving_average(df, 'sma', period)
ema_result = simulate_moving_average(df, 'ema', period)

# 포트폴리오 히스토리 DataFrame 변환
sma_df = pd.DataFrame(sma_result.portfolio_history)
ema_df = pd.DataFrame(ema_result.portfolio_history)

sma_df['strategy'] = 'SMA'
ema_df['strategy'] = 'EMA'
result_df = pd.concat([sma_df, ema_df])

# 기준일 대비 수익률
result_df['value'] = result_df['value'].astype(float)
result_df.sort_values(['strategy', 'date'], inplace=True)
result_df['cum_return'] = result_df.groupby('strategy')['value'].transform(lambda x: x / x.iloc[0] - 1)
result_df.sort_values(['strategy', 'date'], inplace=True)

# 시각화
fig = px.line(result_df, x='date', y='cum_return', color='strategy', title='이동평균 전략 누적 수익률 비교')
fig.show()


In [3]:
import pandas as pd
import numpy as np
import plotly.express as px
from typing import Optional, Dict

# --- 사용자 정의 클래스 ---
class Account:
    def __init__(self, initial_cash):
        self.cash = initial_cash
        self.holdings = {}
        self.orders = []
        self.portfolio_history = []

    def update_position(self, transactions):
        for trans in transactions:
            ticker = trans['ticker']
            price = trans['price']
            quantity = trans['quantity']
            cost = price * quantity
            self.cash -= cost
            self.holdings[ticker] = self.holdings.get(ticker, 0) + quantity

    def update_portfolio(self, dt, data):
        value = self.cash
        for _, row in data.iterrows():
            ticker = row['ticker']
            price = row['close']
            quantity = self.holdings.get(ticker, 0)
            value += price * quantity
        self.portfolio_history.append({'date': dt, 'value': value})

    def update_order(self):
        self.orders = []

class Broker:
    def process_order(self, dt, data, orders):
        results = []
        for order in orders:
            ticker = order['ticker']
            weight = order['weight']
            close_price = data.loc[data['ticker'] == ticker, 'close'].values
            if len(close_price) == 0:
                continue
            price = close_price[0]
            total_value = order['total_value']
            quantity = int(total_value // price)
            if quantity <= 0:
                continue
            results.append({
                'ticker': ticker,
                'price': price,
                'quantity': quantity
            })
        return results

def rebalance(dt, data, account, weights):
    total_value = account.cash
    for ticker, weight in weights.items():
        invest_amount = total_value * weight
        account.orders.append({
            'ticker': ticker,
            'weight': weight,
            'total_value': invest_amount
        })

# --- 데이터 로드 및 전처리 ---
df1 = pd.read_csv('미국 철강 코일 선물 과거 데이터.csv', parse_dates=['날짜'], thousands=",")
df2 = pd.read_csv('구리 선물 과거 데이터.csv', parse_dates=['날짜'], thousands=",")
df1['ticker'] = 'STEEL'
df2['ticker'] = 'COPPER'
df = pd.concat([df1, df2], ignore_index=True)

df['거래량'] = df['거래량'].apply(lambda x: float(str(x).replace('K', '')) * 1000 if 'K' in str(x) else float(x))
df['변동 %'] = df['변동 %'].apply(lambda x: float(str(x).replace('%', '')) / 100)
df.sort_values('날짜', inplace=True)
df.dropna(inplace=True)
df.rename(columns={'날짜': 'date', '종가': 'close'}, inplace=True)

# --- 이동평균 계산 함수 ---
def calculate_MA(ohlcv_data: pd.DataFrame, period: int, ma_type: str) -> pd.DataFrame:
    close_data = ohlcv_data[['date', 'ticker', 'close']].copy()
    close_data = close_data.set_index(['date', 'ticker']).unstack(level=1)['close']
    if ma_type == 'sma':
        ma = close_data.rolling(window=period).mean()
    elif ma_type == 'ema':
        ma = close_data.ewm(span=period).mean()
    else:
        raise ValueError("Invalid MA type")
    return ma

# --- 전략별 비중 계산 함수 ---
def get_moving_average_weights(ohlcv_data: pd.DataFrame, ma_data: pd.Series) -> Optional[Dict]:
    if ma_data.isnull().any():
        return None
    portfolio = {}
    stocks_to_buy = []
    for ticker in ohlcv_data['ticker'].unique():
        price = ohlcv_data.loc[ohlcv_data['ticker'] == ticker, 'close'].values[0]
        ma_price = ma_data.get(ticker)
        if ma_price is None:
            continue
        if price > ma_price:
            stocks_to_buy.append(ticker)
        else:
            portfolio[ticker] = 0.0
    if not stocks_to_buy:
        return portfolio
    weight = 1 / len(stocks_to_buy)
    for ticker in stocks_to_buy:
        portfolio[ticker] = weight
    return portfolio

# --- 전략 실행 함수 ---
def simulate_moving_average(ohlcv_data: pd.DataFrame, ma_type: str, period: int, label: str) -> pd.DataFrame:
    account = Account(initial_cash=100000000)
    broker = Broker()
    ma = calculate_MA(ohlcv_data, period, ma_type)
    for date, group in ohlcv_data.groupby('date'):
        ohlcv = group
        if date not in ma.index:
            continue
        ma_slice = ma.loc[date]
        weights = get_moving_average_weights(ohlcv, ma_slice)
        if weights is None:
            continue
        rebalance(date, ohlcv, account, weights)
        transactions = broker.process_order(date, ohlcv, account.orders)
        account.update_position(transactions)
        account.update_portfolio(date, ohlcv)
        account.update_order()
    df_result = pd.DataFrame(account.portfolio_history)
    df_result['strategy'] = label
    return df_result

# --- 전략 실행 ---
sma_result = simulate_moving_average(df, 'sma', period=3, label='SMA')
ema_result = simulate_moving_average(df, 'ema', period=3, label='EMA')
result_df = pd.concat([sma_result, ema_result], ignore_index=True)

# --- 성과 지표 계산 ---
def calculate_metrics(df: pd.DataFrame) -> pd.DataFrame:
    df = df.copy()
    df['date'] = pd.to_datetime(df['date'])
    df.sort_values(['strategy', 'date'], inplace=True)

    # 누적 수익률
    df['cum_return'] = df.groupby('strategy')['value'].transform(lambda x: x / x.iloc[0] - 1)

    # 일간 수익률
    df['daily_return'] = df.groupby('strategy')['value'].pct_change()

    # 연간 수익률
    df['year'] = df['date'].dt.year
    annual_return = df.groupby(['strategy', 'year']).apply(lambda x: x['value'].iloc[-1] / x['value'].iloc[0] - 1).reset_index()
    annual_return.columns = ['strategy', 'year', 'annual_return']

    # 최대 낙폭 (MDD)
    def calc_mdd(x):
        cumulative = x / x.iloc[0]
        peak = cumulative.cummax()
        drawdown = cumulative / peak - 1
        return drawdown.min()

    mdd_df = df.groupby('strategy')['value'].apply(calc_mdd).reset_index()
    mdd_df.columns = ['strategy', 'MDD']

    # 샤프지수 (무위험수익률 0%)
    sharpe_df = df.groupby('strategy')['daily_return'].agg(['mean', 'std']).reset_index()
    sharpe_df['sharpe'] = sharpe_df['mean'] / sharpe_df['std'] * np.sqrt(252)
    sharpe_df = sharpe_df[['strategy', 'sharpe']]

    # 결과 병합
    summary = pd.merge(mdd_df, sharpe_df, on='strategy')
    return df, annual_return, summary

# --- 지표 계산 및 시각화 ---
result_df, annual_return_df, summary_df = calculate_metrics(result_df)

# --- 누적 수익률 시각화 ---
fig = px.line(result_df, x='date', y='cum_return', color='strategy', title='전략별 누적 수익률', width=750, height=450)
fig.show()

# --- 연간 수익률 바 차트 ---
fig2 = px.bar(annual_return_df, x='year', y='annual_return', color='strategy', barmode='group', title='연간 수익률', width=750, height=450)
fig2.show()

# --- 성과 지표 출력 ---
print("📈 성과 요약 (Sharpe, MDD):")
print(summary_df.round(4))


📈 성과 요약 (Sharpe, MDD):
  strategy     MDD  sharpe
0      EMA -1.0000  0.7641
1      SMA -0.3496  0.5242
