In [None]:
# Install the third-party packages for this notebook via shell escapes.
# `pygad` and `backtesting` are imported by later cells; `ta` is installed here
# but is not imported anywhere in the visible cells.
!pip install ta
!pip install pygad
!pip install backtesting

In [None]:
# Install the `fonts-nanum` apt package, rebuild the system font cache, and
# delete matplotlib's cache directory so its font list is regenerated.
# NOTE(review): presumably needed so the Korean text used in plot titles
# renders correctly — confirm; a runtime restart may be required afterwards.
!sudo apt-get install -y fonts-nanum
!sudo fc-cache -fv
!rm ~/.cache/matplotlib -rf

In [None]:
# Mount Google Drive at /content/drive; the price CSV read by later cells
# lives under /content/drive/MyDrive.
from google.colab import drive
drive.mount('/content/drive')

Mounted at /content/drive


In [None]:
import pandas as pd
import numpy as np
from sklearn.preprocessing import StandardScaler
import pygad
from backtesting import Backtest, Strategy
from datetime import timedelta
import matplotlib.pyplot as plt

# ✅ Smoothed price
def smooth_price(df, p=5, t=5):
    """Return a copy of ``df`` whose ``close`` column is smoothed twice.

    The close series is passed through an exponential moving average of span
    ``p`` and the result through a second one of span ``t``. The input frame
    is left untouched.
    """
    smoothed = df.copy()
    first_pass = smoothed['close'].ewm(span=p).mean()
    smoothed['close'] = first_pass.ewm(span=t).mean()
    return smoothed

# ✅ GDC 기반 전략 (매도 시 1% 수수료)
class GDCStrategy(Strategy):
    """Signal-following strategy for backtesting.py.

    Reads a precomputed ``signal`` column from the data:
    ``-1`` means buy (go long), ``1`` means sell (go short), ``0`` means hold.
    When a signal opposes the open position, the position is closed and
    reversed. Any open position is closed at the end of the data.
    """

    def init(self):
        # Kept for backward compatibility. This is the FULL-length signal array
        # captured at init time, so indexing it with [-1] inside next() would
        # always return the final bar's signal (look-ahead). next() therefore
        # reads self.data.signal directly instead.
        self.signal = self.data.signal
        self.last_signal = None
        # In init() self.data still spans the whole dataset; inside next() its
        # length grows bar by bar, so remember the total here.
        self.total_bars = len(self.data)

    def next(self):
        # End-of-data liquidation. Orders are filled on the bar after they are
        # placed (trade_on_close is not enabled in backtest()), so the close
        # must be issued on the second-to-last bar to be executed at all.
        # (The original compared the bound method `self.I` with an int, which
        # is never true, so this branch never ran.)
        if len(self.data) >= self.total_bars - 1:
            if self.position:
                self.position.close()
            return

        # Signal of the bar currently being processed.
        current_signal = self.data.signal[-1]

        if self.position:
            if current_signal == -1 and self.position.is_short:
                # Buy signal while short: flatten, then go long.
                self.position.close()
                self.buy()
            elif current_signal == 1 and self.position.is_long:
                # Sell signal while long: flatten, then go short.
                # Position.close() only accepts `portion`; the previous
                # size=/reduce_only= keywords raised TypeError.
                self.position.close()
                self.sell()
        else:
            if current_signal == -1:
                self.buy()
            elif current_signal == 1:
                self.sell()

# ✅ SMA, EMA, MACD, RSI, Stochastic, ROC만 사용
def compute_indicators(df, params):
    """Add SMA, EMA, MACD, RSI, Stochastic and ROC columns to a price frame.

    The close price is first smoothed with ``smooth_price``; every indicator
    is then derived from that smoothed close (Stochastic also uses the raw
    ``high``/``low`` columns).

    Args:
        df: frame with at least ``close``, ``high`` and ``low`` columns.
        params: sequence of nine window lengths, in order: SMA short, SMA long,
            EMA short, EMA long, MACD fast, MACD slow, RSI, Stochastic, ROC.
            Each entry is converted with ``int``.

    Returns:
        A new frame with the indicator columns added and every row containing
        a NaN removed.
    """
    frame = smooth_price(df.copy())
    (sma_short, sma_long, ema_short, ema_long,
     macd_fast_span, macd_slow_span,
     rsi_window, stoch_window, roc_window) = (int(params[i]) for i in range(9))
    close = frame['close']

    # SMA spread: short rolling mean minus long rolling mean.
    frame['SMA'] = close.rolling(sma_short).mean() - close.rolling(sma_long).mean()

    # EMA spread: short EMA minus long EMA.
    short_ema = close.ewm(span=ema_short, adjust=False).mean()
    long_ema = close.ewm(span=ema_long, adjust=False).mean()
    frame['EMA'] = short_ema - long_ema

    # MACD line and its 9-span EMA signal line.
    fast_line = close.ewm(span=macd_fast_span, adjust=False).mean()
    slow_line = close.ewm(span=macd_slow_span, adjust=False).mean()
    frame['MACD'] = fast_line - slow_line
    frame['MACD_Signal'] = frame['MACD'].ewm(span=9, adjust=False).mean()

    # RSI from simple rolling means of gains and losses; the 1e-10 term
    # avoids division by zero when there are no losses in the window.
    change = close.diff()
    gains = change.clip(lower=0)
    losses = -change.clip(upper=0)
    avg_gain = gains.rolling(rsi_window).mean()
    avg_loss = losses.rolling(rsi_window).mean()
    strength = avg_gain / (avg_loss + 1e-10)
    frame['RSI'] = 100 - (100 / (1 + strength))

    # Stochastic oscillator: %K and its 3-period mean %D.
    lowest = frame['low'].rolling(stoch_window).min()
    highest = frame['high'].rolling(stoch_window).max()
    frame['Stoch_K'] = (frame['close'] - lowest) / (highest - lowest + 1e-10) * 100
    frame['Stoch_D'] = frame['Stoch_K'].rolling(3).mean()

    # Rate of change, in percent.
    frame['ROC'] = frame['close'].pct_change(periods=roc_window) * 100

    # Drop the warm-up rows where any indicator is still NaN.
    return frame.dropna().copy()


# ✅ 신호 생성 함수
def generate_signal(df):
    """Combine six indicator votes into one trading signal per row.

    Each indicator votes ``-1`` (buy), ``+1`` (sell) or ``0`` (neutral):
      * SMA / EMA spread > 0 -> buy, < 0 -> sell
      * MACD above its signal line -> buy, below -> sell
      * RSI < 30 (oversold) -> buy, RSI > 70 (overbought) -> sell
      * Stochastic %K above %D -> buy, below -> sell
      * ROC > 0 -> buy, < 0 -> sell
    Comparisons involving NaN are False, so such a vote is neutral.

    The votes are summed; the output is ``1`` (sell) when the sum is >= 1,
    ``-1`` (buy) when it is <= -1, and ``0`` otherwise.

    Args:
        df: frame with columns ``SMA``, ``EMA``, ``MACD``, ``MACD_Signal``,
            ``RSI``, ``Stoch_K``, ``Stoch_D`` and ``ROC``.

    Returns:
        Integer NumPy array with one signal per row of ``df``.
    """
    def _vote(buy_mask, sell_mask):
        # -1 where the buy condition holds, +1 where the sell condition holds.
        return np.where(buy_mask, -1, np.where(sell_mask, 1, 0))

    # Vectorised replacement for the former per-row iloc loop; this function
    # runs once per GA fitness evaluation, so the loop was a hot spot.
    sma = df['SMA'].to_numpy()
    ema = df['EMA'].to_numpy()
    macd = df['MACD'].to_numpy()
    macd_signal = df['MACD_Signal'].to_numpy()
    rsi = df['RSI'].to_numpy()
    stoch_k = df['Stoch_K'].to_numpy()
    stoch_d = df['Stoch_D'].to_numpy()
    roc = df['ROC'].to_numpy()

    score = (
        _vote(sma > 0, sma < 0)
        + _vote(ema > 0, ema < 0)
        + _vote(macd > macd_signal, macd < macd_signal)
        + _vote(rsi < 30, rsi > 70)
        + _vote(stoch_k > stoch_d, stoch_k < stoch_d)
        + _vote(roc > 0, roc < 0)
    )

    # Relaxed threshold: a net single vote is enough to emit a signal.
    return np.where(score >= 1, 1, np.where(score <= -1, -1, 0))

def safe_commission(trade, order):
    """Commission callback: charge 1% of the order value on sell-side orders.

    backtesting.py documents a callable ``commission`` as
    ``func(order_size, price)``, with a negative size for short (sell) orders.
    Under that convention ``trade`` holds the signed order size and ``order``
    the price. The original implementation only handled objects exposing
    ``is_short`` / ``side`` / ``value``, so with numeric arguments it always
    returned 0 and no fee was ever applied.

    The object-based branch is kept for backward compatibility.

    NOTE(review): whether the library invokes this at entry, at exit, or both
    depends on the backtesting.py version, and the callback may not be able to
    tell a long exit from a long entry — verify against the installed release.

    Returns:
        The fee in cash (0 for buy-side orders or on any unexpected input).
    """
    try:
        if hasattr(trade, 'is_short'):
            # Legacy object-style arguments.
            return 0.01 * trade.value if (trade.is_short or trade.side == 'sell') else 0
        order_size = float(trade)
        price = float(order)
        # Negative size == sell-side order: 1% of the traded notional.
        return 0.01 * abs(order_size) * price if order_size < 0 else 0
    except Exception:
        # Best-effort: never let a fee computation abort the backtest.
        return 0


# def safe_commission(trade, order):
#     # 매도 거래이고, 포지션을 새로 여는 거래가 아닐 경우 (즉, 청산 목적이면)
#     if trade.side == 'sell' and not trade.is_entry:
#         return 0.01 * trade.value
#     return 0


# ✅ 커스텀 수수료 적용 (매도 시에만 1% 적용)
def backtest(df, signal):
    """Run ``GDCStrategy`` over ``df`` using the given per-row ``signal``.

    The frame is copied, the signal attached as a ``signal`` column, and the
    OHLCV columns renamed to the capitalised names backtesting.py expects.
    Rows containing NaN are dropped before the run.

    Returns:
        Tuple ``(return, max_drawdown, sharpe)`` where the first two are the
        library's percentage figures divided by 100.
    """
    column_map = {
        'open': 'Open',
        'high': 'High',
        'low': 'Low',
        'close': 'Close',
        'volume': 'Volume',
    }

    bt_frame = df.copy()
    bt_frame['signal'] = signal
    bt_frame = bt_frame[[*column_map, 'signal']].dropna().rename(columns=column_map)
    bt_frame.index.name = 'Date'

    engine = Backtest(
        bt_frame,
        GDCStrategy,
        cash=100_000,
        commission=safe_commission,
        exclusive_orders=True,
    )
    stats = engine.run()

    total_return = stats['Return [%]'] / 100
    max_drawdown = stats['Max. Drawdown [%]'] / 100
    return total_return, max_drawdown, stats['Sharpe Ratio']

# ✅ 데이터 로드
# Load the multi-ticker price CSV from the mounted Drive, normalise the column
# names (trim whitespace, lower-case), keep the OHLCV columns and sort by
# ticker and date.
df_all = pd.read_csv(
    "/content/drive/MyDrive/Colab Notebooks/stock/tictocstock.csv",
    parse_dates=['Date'],
)
df_all.columns = df_all.columns.str.strip().str.lower()
df_all = df_all[['date', 'ticker', 'open', 'high', 'low', 'close', 'volume']].sort_values(
    ['ticker', 'date']
)

# Select the TJX rows, index them by date and forward-fill missing values.
df = df_all.loc[df_all['ticker'] == 'TJX'].set_index('date').copy().ffill()

# ✅ GA 피트니스 함수
def fitness_func(ga, solution, idx):
    """PyGAD fitness: reward return and penalise drawdown on the full ``df``.

    Genes are rounded to integers (minimum 2). Candidates whose short window
    is not strictly below the matching long window (SMA, EMA, MACD) are
    rejected with ``-inf``.

    Fitness is ``0.8 * return + 0.2 / |max drawdown|``. backtesting.py reports
    drawdown as a non-positive number, so the original ``1 / mdd`` term was
    negative and grew more negative as the drawdown shrank (rewarding larger
    drawdowns), and was infinite at zero drawdown. The magnitude is used
    instead, floored at 1% so the term stays bounded (at most 0.2 / 0.01 = 20).

    Returns:
        The fitness value, or ``-np.inf`` when the candidate is invalid, the
        pipeline raises, or the result is not finite.
    """
    min_drawdown = 0.01  # floor for |max drawdown| to keep 1/x bounded
    try:
        params = [max(2, int(round(p))) for p in solution]
        if not (params[0] < params[1] and params[2] < params[3] and params[4] < params[5]):
            return -np.inf
        df_ind = compute_indicators(df, params)
        signal = generate_signal(df_ind)
        profit, mdd, _sharpe = backtest(df_ind, signal)
        drawdown = max(abs(mdd), min_drawdown)
        fitness = 0.8 * profit + 0.2 * (1 / drawdown)
        return fitness if np.isfinite(fitness) else -np.inf
    except Exception:
        # Was a bare `except:`, which also swallowed KeyboardInterrupt and
        # made a running GA impossible to stop from the notebook.
        return -np.inf

# ✅ 유전자 범위
# Search range (low, high) for each of the nine genes, in the order expected
# by compute_indicators.
gene_space = [
    dict(low=lower, high=upper)
    for lower, upper in (
        (5, 50),    # SMA short
        (10, 60),   # SMA long
        (5, 50),    # EMA short
        (10, 60),   # EMA long
        (5, 25),    # MACD fast
        (26, 60),   # MACD slow
        (5, 30),    # RSI
        (5, 40),    # Stochastic
        (5, 30),    # ROC
    )
]

# ✅ GA 실행
# Run the genetic algorithm over the nine indicator window lengths.
# A fixed random_seed makes the search reproducible across Restart & Run All;
# without it every run yields different "best" parameters.
ga = pygad.GA(
    num_generations=50,
    num_parents_mating=10,
    fitness_func=fitness_func,
    sol_per_pop=20,
    num_genes=9,
    gene_space=gene_space,
    gene_type=float,
    mutation_type="random",
    mutation_percent_genes=20,
    random_seed=42,
)

ga.run()

# Extract the best parameters. Passing the fitness values of the last
# generation avoids re-running a backtest for every individual just to find
# the best one.
best_solution, _best_fitness, _best_idx = ga.best_solution(
    pop_fitness=ga.last_generation_fitness
)
best_params = [int(round(g)) for g in best_solution]
print("✅ 최적 파라미터:", best_params)

# ✅ 슬라이딩 윈도우 백테스트 함수 추가 : 기존 백테스트 함수 하나를 여러 번 호출해서 결과를 누적하는 구조임
def run_quarterly_backtests(df, params, window_months=3, stride_months=3):
    """Backtest consecutive date windows of ``df`` and tabulate the results.

    Starting at the first index value, a window of ``window_months`` is moved
    forward by ``stride_months`` until its end passes the last index value.
    For every window the indicators, signal and backtest are computed with
    the same ``params``.

    A window is skipped (with a printed message) when it has fewer rows than
    ``max(params)``, when no rows survive the indicator computation, when
    fewer than five non-zero signals are produced, or when anything in the
    pipeline raises.

    Returns:
        DataFrame with one row per successful window and the columns
        ``start``, ``end``, ``profit`` (%), ``mdd`` (%) and ``sharpe``.
    """
    window_span = pd.DateOffset(months=window_months)
    last_day = df.index.max()
    rows = []

    win_start = pd.to_datetime(df.index.min())
    win_end = win_start + window_span

    while win_end <= last_day:
        window_df = df.loc[win_start:win_end].copy()

        if len(window_df) >= max(params):
            try:
                indicators = compute_indicators(window_df, params)

                if indicators.empty:
                    print(f"📭 [{win_start.date()} ~ {win_end.date()}] 지표 계산 결과 없음 → 스킵")
                    raise ValueError("Empty indicators")

                window_signal = generate_signal(indicators)

                # Show how many -1 / 0 / 1 signals this window produced.
                levels, freq = np.unique(window_signal, return_counts=True)
                print(f"📊 [{win_start.date()} ~ {win_end.date()}] 시그널 분포:", dict(zip(levels, freq)))

                if np.count_nonzero(window_signal) < 5:
                    print(f"🚫 거래 신호 부족 → 스킵")
                    raise ValueError("Too few signals")

                profit, mdd, sharpe = backtest(indicators, window_signal)

                # Return and drawdown are stored as percentages.
                rows.append({
                    'start': win_start,
                    'end': win_end,
                    'profit': round(profit * 100, 2),
                    'mdd': round(mdd * 100, 2),
                    'sharpe': round(sharpe, 3)
                })
            except Exception as err:
                print(f"⚠️ 백테스트 오류: {win_start}~{win_end} 구간 → {err}")
        else:
            # Not enough rows to fill the longest indicator window.
            print(f"❌ [{win_start.date()} ~ {win_end.date()}] 데이터 너무 짧음 ({len(window_df)} rows) → 스킵")

        # Slide the window forward.
        win_start = win_start + pd.DateOffset(months=stride_months)
        win_end = win_start + window_span

    return pd.DataFrame(rows)


# ✅ 실행 예시
# Backtest the optimised parameters quarter by quarter (3-month window,
# 3-month stride) and print the per-window results.
result_df = run_quarterly_backtests(df, best_params, window_months=3, stride_months=3)
print(result_df)

 # 위 코드 결과 시각화

In [None]:
import matplotlib.pyplot as plt
import numpy as np

# ✅ 1. 각 분기별 매수/매도 시점 차트 그리기
def plot_signals_by_quarter(df, signal, start, end):
    """Plot the close price of one window with buy/sell markers.

    The signal convention matches ``generate_signal`` and ``GDCStrategy``:
    ``-1`` is a buy and ``1`` is a sell. (The masks used to be inverted, so
    buy markers were drawn on sell signals and vice versa.)

    Args:
        df: frame with a ``close`` column, aligned row-for-row with ``signal``.
        signal: array of -1 / 0 / 1 values, one per row of ``df``.
        start, end: window bounds, used only in the title (must offer ``.date()``).
    """
    plt.figure(figsize=(12, 4))
    plt.plot(df['close'], label='Close Price', alpha=0.7)

    buy_signals = df[signal == -1]
    sell_signals = df[signal == 1]

    plt.scatter(buy_signals.index, buy_signals['close'], marker='^', color='green', label='Buy Signal', s=80)
    plt.scatter(sell_signals.index, sell_signals['close'], marker='v', color='red', label='Sell Signal', s=80)

    plt.title(f'Buy/Sell Signals from {start.date()} to {end.date()}')
    plt.legend()
    plt.grid(True)
    plt.show()

# ✅ 2. 유전자 알고리즘 피트니스 그래프
def plot_ga_fitness(ga_instance):
    """Draw the best fitness value of each GA generation as a line chart.

    Args:
        ga_instance: object exposing ``best_solutions_fitness``, the sequence
            of best fitness values that is plotted.
    """
    _, ax = plt.subplots(figsize=(8, 4))
    ax.plot(
        ga_instance.best_solutions_fitness,
        color='blue',
        label='Best Fitness per Generation',
    )
    ax.set_title('Genetic Algorithm Fitness Over Generations')
    ax.set_xlabel('Generation')
    ax.set_ylabel('Fitness')
    ax.grid(True)
    ax.legend()
    plt.show()

# ✅ 3. 최적 파라미터 보기 좋게 출력
def explain_best_params(params):
    """Print each parameter value next to a human-readable label.

    Labels are paired positionally with ``params`` (index 0 is the SMA short
    window, index 8 the ROC period); extra entries on either side are ignored.
    """
    labels = (
        "SMA Short Window",
        "SMA Long Window",
        "EMA Short Window",
        "EMA Long Window",
        "MACD Fast",
        "MACD Slow",
        "RSI Period",
        "Stochastic Period",
        "ROC Period",
    )
    print("✅ 최적 파라미터 해석")
    for label, value in zip(labels, params):
        # Labels are left-aligned in a 20-character column.
        print(f" - {label:20}: {value}")

# ✅ 예시 실행
# 설명 출력
# Describe the parameters actually found by the GA. Hard-coded lists were used
# here before, and a different one for the plots below, so neither matched the
# backtested results.
explain_best_params(best_params)

# Fitness curve of the GA run.
plot_ga_fitness(ga)

# Signal chart for every successfully backtested quarter, recomputed with the
# same parameters that produced result_df.
for _, row in result_df.iterrows():
    df_quarter = df.loc[row['start']:row['end']].copy()
    df_quarter = compute_indicators(df_quarter, best_params)
    signal = generate_signal(df_quarter)
    plot_signals_by_quarter(df_quarter, signal, row['start'], row['end'])


# 버전 2: 슬라이딩 윈도우 미적용 코드 (6개월치 백테스팅)

In [None]:
import pandas as pd
import numpy as np
from sklearn.preprocessing import StandardScaler
import pygad
from backtesting import Backtest, Strategy
from datetime import timedelta
import matplotlib.pyplot as plt

# ✅ Smoothed price
def smooth_price(df, p=5, t=5):
    """Return a copy of ``df`` whose ``close`` column is smoothed twice.

    The close series is passed through an exponential moving average of span
    ``p`` and the result through a second one of span ``t``. The input frame
    is left untouched.
    """
    smoothed = df.copy()
    first_pass = smoothed['close'].ewm(span=p).mean()
    smoothed['close'] = first_pass.ewm(span=t).mean()
    return smoothed

# ✅ GDC 기반 전략 (매도 시 1% 수수료)
class GDCStrategy(Strategy):
    """Signal-following strategy for backtesting.py.

    Reads a precomputed ``signal`` column from the data:
    ``-1`` means buy (go long), ``1`` means sell (go short), ``0`` means hold.
    When a signal opposes the open position, the position is closed and
    reversed. Any open position is closed at the end of the data.
    """

    def init(self):
        # Kept for backward compatibility. This is the FULL-length signal array
        # captured at init time, so indexing it with [-1] inside next() would
        # always return the final bar's signal (look-ahead). next() therefore
        # reads self.data.signal directly instead.
        self.signal = self.data.signal
        self.last_signal = None
        # In init() self.data still spans the whole dataset; inside next() its
        # length grows bar by bar, so remember the total here.
        self.total_bars = len(self.data)

    def next(self):
        # End-of-data liquidation. Orders are filled on the bar after they are
        # placed (trade_on_close is not enabled in backtest()), so the close
        # must be issued on the second-to-last bar to be executed at all.
        # (The original compared the bound method `self.I` with an int, which
        # is never true, so this branch never ran.)
        if len(self.data) >= self.total_bars - 1:
            if self.position:
                self.position.close()
            return

        # Signal of the bar currently being processed.
        current_signal = self.data.signal[-1]

        if self.position:
            if current_signal == -1 and self.position.is_short:
                # Buy signal while short: flatten, then go long.
                self.position.close()
                self.buy()
            elif current_signal == 1 and self.position.is_long:
                # Sell signal while long: flatten, then go short.
                # Position.close() only accepts `portion`; the previous
                # size=/reduce_only= keywords raised TypeError.
                self.position.close()
                self.sell()
        else:
            if current_signal == -1:
                self.buy()
            elif current_signal == 1:
                self.sell()

# ✅ SMA, EMA, MACD, RSI, Stochastic, ROC만 사용
def compute_indicators(df, params):
    """Add SMA, EMA, MACD, RSI, Stochastic and ROC columns to a price frame.

    The close price is first smoothed with ``smooth_price``; every indicator
    is then derived from that smoothed close (Stochastic also uses the raw
    ``high``/``low`` columns).

    Args:
        df: frame with at least ``close``, ``high`` and ``low`` columns.
        params: sequence of nine window lengths, in order: SMA short, SMA long,
            EMA short, EMA long, MACD fast, MACD slow, RSI, Stochastic, ROC.
            Each entry is converted with ``int``.

    Returns:
        A new frame with the indicator columns added and every row containing
        a NaN removed.
    """
    frame = smooth_price(df.copy())
    (sma_short, sma_long, ema_short, ema_long,
     macd_fast_span, macd_slow_span,
     rsi_window, stoch_window, roc_window) = (int(params[i]) for i in range(9))
    close = frame['close']

    # SMA spread: short rolling mean minus long rolling mean.
    frame['SMA'] = close.rolling(sma_short).mean() - close.rolling(sma_long).mean()

    # EMA spread: short EMA minus long EMA.
    short_ema = close.ewm(span=ema_short, adjust=False).mean()
    long_ema = close.ewm(span=ema_long, adjust=False).mean()
    frame['EMA'] = short_ema - long_ema

    # MACD line and its 9-span EMA signal line.
    fast_line = close.ewm(span=macd_fast_span, adjust=False).mean()
    slow_line = close.ewm(span=macd_slow_span, adjust=False).mean()
    frame['MACD'] = fast_line - slow_line
    frame['MACD_Signal'] = frame['MACD'].ewm(span=9, adjust=False).mean()

    # RSI from simple rolling means of gains and losses; the 1e-10 term
    # avoids division by zero when there are no losses in the window.
    change = close.diff()
    gains = change.clip(lower=0)
    losses = -change.clip(upper=0)
    avg_gain = gains.rolling(rsi_window).mean()
    avg_loss = losses.rolling(rsi_window).mean()
    strength = avg_gain / (avg_loss + 1e-10)
    frame['RSI'] = 100 - (100 / (1 + strength))

    # Stochastic oscillator: %K and its 3-period mean %D.
    lowest = frame['low'].rolling(stoch_window).min()
    highest = frame['high'].rolling(stoch_window).max()
    frame['Stoch_K'] = (frame['close'] - lowest) / (highest - lowest + 1e-10) * 100
    frame['Stoch_D'] = frame['Stoch_K'].rolling(3).mean()

    # Rate of change, in percent.
    frame['ROC'] = frame['close'].pct_change(periods=roc_window) * 100

    # Drop the warm-up rows where any indicator is still NaN.
    return frame.dropna().copy()


# ✅ 신호 생성 함수
def generate_signal(df):
    """Combine six indicator votes into one trading signal per row.

    Each indicator votes ``-1`` (buy), ``+1`` (sell) or ``0`` (neutral):
      * SMA / EMA spread > 0 -> buy, < 0 -> sell
      * MACD above its signal line -> buy, below -> sell
      * RSI < 30 (oversold) -> buy, RSI > 70 (overbought) -> sell
      * Stochastic %K above %D -> buy, below -> sell
      * ROC > 0 -> buy, < 0 -> sell
    Comparisons involving NaN are False, so such a vote is neutral.

    The votes are summed; the output is ``1`` (sell) when the sum is >= 1,
    ``-1`` (buy) when it is <= -1, and ``0`` otherwise.

    Args:
        df: frame with columns ``SMA``, ``EMA``, ``MACD``, ``MACD_Signal``,
            ``RSI``, ``Stoch_K``, ``Stoch_D`` and ``ROC``.

    Returns:
        Integer NumPy array with one signal per row of ``df``.
    """
    def _vote(buy_mask, sell_mask):
        # -1 where the buy condition holds, +1 where the sell condition holds.
        return np.where(buy_mask, -1, np.where(sell_mask, 1, 0))

    # Vectorised replacement for the former per-row iloc loop; this function
    # runs once per GA fitness evaluation, so the loop was a hot spot.
    sma = df['SMA'].to_numpy()
    ema = df['EMA'].to_numpy()
    macd = df['MACD'].to_numpy()
    macd_signal = df['MACD_Signal'].to_numpy()
    rsi = df['RSI'].to_numpy()
    stoch_k = df['Stoch_K'].to_numpy()
    stoch_d = df['Stoch_D'].to_numpy()
    roc = df['ROC'].to_numpy()

    score = (
        _vote(sma > 0, sma < 0)
        + _vote(ema > 0, ema < 0)
        + _vote(macd > macd_signal, macd < macd_signal)
        + _vote(rsi < 30, rsi > 70)
        + _vote(stoch_k > stoch_d, stoch_k < stoch_d)
        + _vote(roc > 0, roc < 0)
    )

    # Relaxed threshold: a net single vote is enough to emit a signal.
    return np.where(score >= 1, 1, np.where(score <= -1, -1, 0))

def safe_commission(trade, order):
    """Commission callback: charge 1% of the order value on sell-side orders.

    backtesting.py documents a callable ``commission`` as
    ``func(order_size, price)``, with a negative size for short (sell) orders.
    Under that convention ``trade`` holds the signed order size and ``order``
    the price. The original implementation only handled objects exposing
    ``is_short`` / ``side`` / ``value``, so with numeric arguments it always
    returned 0 and no fee was ever applied.

    The object-based branch is kept for backward compatibility.

    NOTE(review): whether the library invokes this at entry, at exit, or both
    depends on the backtesting.py version, and the callback may not be able to
    tell a long exit from a long entry — verify against the installed release.

    Returns:
        The fee in cash (0 for buy-side orders or on any unexpected input).
    """
    try:
        if hasattr(trade, 'is_short'):
            # Legacy object-style arguments.
            return 0.01 * trade.value if (trade.is_short or trade.side == 'sell') else 0
        order_size = float(trade)
        price = float(order)
        # Negative size == sell-side order: 1% of the traded notional.
        return 0.01 * abs(order_size) * price if order_size < 0 else 0
    except Exception:
        # Best-effort: never let a fee computation abort the backtest.
        return 0


# def safe_commission(trade, order):
#     # 매도 거래이고, 포지션을 새로 여는 거래가 아닐 경우 (즉, 청산 목적이면)
#     if trade.side == 'sell' and not trade.is_entry:
#         return 0.01 * trade.value
#     return 0


# ✅ 커스텀 수수료 적용 (매도 시에만 1% 적용)
def backtest(df, signal):
    """Run ``GDCStrategy`` over ``df`` using the given per-row ``signal``.

    The frame is copied, the signal attached as a ``signal`` column, and the
    OHLCV columns renamed to the capitalised names backtesting.py expects.
    Rows containing NaN are dropped before the run.

    Returns:
        Tuple ``(return, max_drawdown, sharpe)`` where the first two are the
        library's percentage figures divided by 100.
    """
    column_map = {
        'open': 'Open',
        'high': 'High',
        'low': 'Low',
        'close': 'Close',
        'volume': 'Volume',
    }

    bt_frame = df.copy()
    bt_frame['signal'] = signal
    bt_frame = bt_frame[[*column_map, 'signal']].dropna().rename(columns=column_map)
    bt_frame.index.name = 'Date'

    engine = Backtest(
        bt_frame,
        GDCStrategy,
        cash=100_000,
        commission=safe_commission,
        exclusive_orders=True,
    )
    stats = engine.run()

    total_return = stats['Return [%]'] / 100
    max_drawdown = stats['Max. Drawdown [%]'] / 100
    return total_return, max_drawdown, stats['Sharpe Ratio']


# =====================================================================================================================================
# 추가
# ✅ 데이터 로드
# Load the multi-ticker price CSV from the mounted Drive, normalise the column
# names (trim whitespace, lower-case), keep the OHLCV columns and sort by
# ticker and date.
df_all = pd.read_csv(
    "/content/drive/MyDrive/Colab Notebooks/stock/tictocstock.csv",
    parse_dates=['Date'],
)
df_all.columns = df_all.columns.str.strip().str.lower()
df_all = df_all[['date', 'ticker', 'open', 'high', 'low', 'close', 'volume']].sort_values(
    ['ticker', 'date']
)

# Select the TJX rows, index them by date and forward-fill missing values.
df = df_all.loc[df_all['ticker'] == 'TJX'].set_index('date').copy().ffill()

# Full date range of the series (original author's notes: 2022-05-13 and
# 2025-05-12 respectively).
start_date = df.index.min()
end_date = df.index.max()

# Hold out the final six months for testing; everything earlier is training.
split_date = end_date - pd.DateOffset(months=6)
is_train = df.index < split_date
df_train = df.loc[is_train].copy()
df_test = df.loc[df.index >= split_date].copy()

# Report the training date range.
train_first, train_last = df_train.index.min().date(), df_train.index.max().date()
print("📘 학습 데이터 기간:", train_first, "~", train_last)

# Report the hold-out (backtest) date range.
test_first, test_last = df_test.index.min().date(), df_test.index.max().date()
print("📙 백테스트 데이터 기간:", test_first, "~", test_last)



# ✅ GA 피트니스 함수 (학습용 데이터로만 사용)
def fitness_func(ga, solution, idx):
    """PyGAD fitness evaluated on the training split ``df_train`` only.

    Genes are rounded to integers (minimum 2). Candidates whose short window
    is not strictly below the matching long window (SMA, EMA, MACD) are
    rejected with ``-inf``.

    Fitness is ``0.8 * return + 0.2 / |max drawdown|``. backtesting.py reports
    drawdown as a non-positive number, so the original ``1 / mdd`` term was
    negative and grew more negative as the drawdown shrank (rewarding larger
    drawdowns), and was infinite at zero drawdown. The magnitude is used
    instead, floored at 1% so the term stays bounded (at most 0.2 / 0.01 = 20).

    Returns:
        The fitness value, or ``-np.inf`` when the candidate is invalid, the
        pipeline raises, or the result is not finite.
    """
    min_drawdown = 0.01  # floor for |max drawdown| to keep 1/x bounded
    try:
        params = [max(2, int(round(p))) for p in solution]
        if not (params[0] < params[1] and params[2] < params[3] and params[4] < params[5]):
            return -np.inf
        df_ind = compute_indicators(df_train, params)
        signal = generate_signal(df_ind)
        profit, mdd, _sharpe = backtest(df_ind, signal)
        drawdown = max(abs(mdd), min_drawdown)
        fitness = 0.8 * profit + 0.2 * (1 / drawdown)
        return fitness if np.isfinite(fitness) else -np.inf
    except Exception:
        # Was a bare `except:`, which also swallowed KeyboardInterrupt and
        # made a running GA impossible to stop from the notebook.
        return -np.inf

# ✅ 유전자 범위
# Search range (low, high) for each of the nine genes, in the order expected
# by compute_indicators.
gene_space = [
    dict(low=lower, high=upper)
    for lower, upper in (
        (5, 50),    # SMA short
        (10, 60),   # SMA long
        (5, 50),    # EMA short
        (10, 60),   # EMA long
        (5, 25),    # MACD fast
        (26, 60),   # MACD slow
        (5, 30),    # RSI
        (5, 40),    # Stochastic
        (5, 30),    # ROC
    )
]

# ✅ GA 실행 후 최적 파라미터 출력 (기존과 동일)
# Run the genetic algorithm over the nine indicator window lengths.
# A fixed random_seed makes the search reproducible across Restart & Run All;
# without it every run yields different "best" parameters.
ga = pygad.GA(
    num_generations=50,
    num_parents_mating=10,
    fitness_func=fitness_func,
    sol_per_pop=20,
    num_genes=9,
    gene_space=gene_space,
    gene_type=float,
    mutation_type="random",
    mutation_percent_genes=20,
    random_seed=42,
)
ga.run()

# Passing the fitness values of the last generation avoids re-running a
# backtest for every individual just to find the best one.
best_solution, _best_fitness, _best_idx = ga.best_solution(
    pop_fitness=ga.last_generation_fitness
)
best_params = [int(round(g)) for g in best_solution]
print("✅ 최적 파라미터:", best_params)

# ✅ 백테스트 (검증용 데이터에 대해 수행)
# Evaluate the optimised parameters on the held-out six months.
df_test_ind = compute_indicators(df_test, best_params)
signal_test = generate_signal(df_test_ind)
profit, mdd, sharpe = backtest(df_test_ind, signal_test)

# Report return and drawdown as percentages, Sharpe ratio to three decimals.
profit_pct = round(profit * 100, 2)
mdd_pct = round(mdd * 100, 2)
sharpe_rounded = round(sharpe, 3)

print("📊 백테스트 결과 (최근 6개월)")
print("수익률:", profit_pct, "%")
print("최대 낙폭:", mdd_pct, "%")
print("Sharpe Ratio:", sharpe_rounded)

# from collections import Counter

# # ✅ 시그널 분포 출력
# signal_counter = Counter(signal_test)
# # np.int64로 출력 형식 맞춤
# formatted_counts = {f"np.int64({k})": f"np.int64({v})" for k, v in signal_counter.items()}
# print(f"📊 [{df_test_ind.index.min().date()} ~ {df_test_ind.index.max().date()}] 시그널 분포:", formatted_counts)

# # ✅ 성과 요약 테이블 출력
# import pandas as pd
# result_df = pd.DataFrame([{
#     'start': df_test_ind.index.min().date(),
#     'end': df_test_ind.index.max().date(),
#     'profit': round(profit * 100, 2),
#     'mdd': round(mdd * 100, 2),
#     'sharpe': round(sharpe, 3)
# }])

# print(result_df.to_string(index=False))

# 위 코드 결과 시각화

In [None]:
import matplotlib.pyplot as plt
import numpy as np

# ✅ 2. 유전자 알고리즘 피트니스 그래프
def plot_ga_fitness(ga_instance):
    """Draw the best fitness value of each GA generation as a line chart.

    Args:
        ga_instance: object exposing ``best_solutions_fitness``, the sequence
            of best fitness values that is plotted.
    """
    _, ax = plt.subplots(figsize=(8, 4))
    ax.plot(
        ga_instance.best_solutions_fitness,
        color='blue',
        label='Best Fitness per Generation',
    )
    ax.set_title('Genetic Algorithm Fitness Over Generations')
    ax.set_xlabel('Generation')
    ax.set_ylabel('Fitness')
    ax.grid(True)
    ax.legend()
    plt.show()

# ✅ 3. 최적 파라미터 보기 좋게 출력
def explain_best_params(params):
    """Print each parameter value next to a human-readable label.

    Labels are paired positionally with ``params`` (index 0 is the SMA short
    window, index 8 the ROC period); extra entries on either side are ignored.
    """
    labels = (
        "SMA Short Window",
        "SMA Long Window",
        "EMA Short Window",
        "EMA Long Window",
        "MACD Fast",
        "MACD Slow",
        "RSI Period",
        "Stochastic Period",
        "ROC Period",
    )
    print("✅ 최적 파라미터 해석")
    for label, value in zip(labels, params):
        # Labels are left-aligned in a 20-character column.
        print(f" - {label:20}: {value}")

# ✅ 예시 실행
# 설명 출력
# Describe the parameters actually found by the GA (a hard-coded list from an
# earlier run was used here before).
explain_best_params(best_params)

# Fitness curve of the GA run.
plot_ga_fitness(ga)

# Close price of the hold-out period with the generated signals.
plt.figure(figsize=(14, 6))
plt.plot(df_test_ind.index, df_test_ind['close'], label='Close Price', color='gray')

# Signal convention used by generate_signal / GDCStrategy: -1 = buy, 1 = sell.
# The masks were previously inverted, mislabelling every marker.
buy_signals = df_test_ind[signal_test == -1]
sell_signals = df_test_ind[signal_test == 1]

plt.scatter(buy_signals.index, buy_signals['close'], marker='^', color='green', label='Buy Signal', s=100)
plt.scatter(sell_signals.index, sell_signals['close'], marker='v', color='red', label='Sell Signal', s=100)

plt.title("📈 시그널 기반 종가 시각화")
plt.xlabel("Date")
plt.ylabel("Price")
plt.legend()
plt.grid(True)
plt.tight_layout()
plt.show()
