드라이브 마운트 및 라이브러리 설치

In [None]:
from google.colab import drive
drive.mount('/content/drive')

Mounted at /content/drive


In [None]:
# package 설치
!curl -L http://prdownloads.sourceforge.net/ta-lib/ta-lib-0.4.0-src.tar.gz -O && tar xzvf ta-lib-0.4.0-src.tar.gz
!cd ta-lib && ./configure --prefix=/usr && make && make install && cd - && pip install ta-lib
!pip install pygad
!pip install ta

라이브러리 임포트

In [None]:
import talib
import numpy as np
import pygad
import pandas as pd
import matplotlib.pyplot as plt
from talib import *

# 공통 함수들

In [None]:
# 주식 데이터를 불러오는 함수
def load_data(file_name):
    file_path = "/content/drive/MyDrive/0_Capstone/data/merged_completed/" + file_name


    # 주식 데이터 불러오기
    df = pd.read_csv(file_path)
    # 거꾸로 뒤집기
    df = df.loc[::-1]
    # index reset하기 - 기존 index 제거 O
    df = df.reset_index(drop=True)

    df.rename(columns={
        'TRD_DD': 'Timestamp',
        'TDD_CLSPRC': 'Close',
        'TDD_OPNPRC': 'Open',
        'TDD_HGPRC': 'High',
        'TDD_LWPRC': 'Low',
        'ACC_TRDVOL': 'Volume'
    }, inplace=True)

    df['Timestamp'] = pd.to_datetime(df['Timestamp'])

    # 쉼표 제거하기
    def remove_commas(value):
        if isinstance(value, str):
            return int(value.replace(',', ''))
        return value

    df = df.applymap(remove_commas)

    return df

# MDD 계산 함수
def cal_mdd(df):
  #MDD 계산
  # 1년간 영업일은 252일로 잡아서 window = 252로 설정
  window = 252

  # 2. 종가에서 1년기간 단위 최고치 peak를 구함
  peak = df['MKTCAP'].rolling(window, min_periods=1).max()

  # 3. 최고치 대비 현재 종가가 얼마나 하락했는지 구함
  drawdown = df['MKTCAP']/peak - 1.0

  # 4. drawdown에서 1년기간 단위로 최저치 max_dd를 구한다.
  # max_dd는 마이너스 값이기에 최저치가 MDD가 된다.
  max_dd = drawdown.rolling(window, min_periods=1).min()
  mdd = max_dd.min()

  return mdd

# RSI ✅

In [None]:
# 시그널 변환 함수

def make_rsi_sig(df, rsi, thres_sell, thres_buy, period, sig_column):
  # RSI 계산
  df[rsi] = talib.RSI(df['MKTCAP'], timeperiod=period) # RSI 기간 설정 (일반적으로 14일을 사용)

  # 시그널 생성
  df[sig_column] = 0  # 초기 시그널은 0으로 설정

  # RSI가 70 이상이면 Sell 시그널 (-1)
  df.loc[df[rsi] > thres_sell, sig_column] = -1

  # RSI가 30 이하이면 Buy 시그널 (1)
  df.loc[df[rsi] < thres_buy, sig_column] = 1

  return df

# ga 최적화 함수
def ga_rsi_optimize(df):
  #-----------fitness 정의 (함수 수정 완료)--------------------------------------------
  def rsi_fitness_func(ga_instance, solution, solution_idx):
      #mdd계산
      mdd = cal_mdd(df)

      # 솔루션 : 매도 임계값, 매수 임계값, 고려 기간
      rsi_sell_threshold = solution[0]
      rsi_buy_threshold = solution[1]
      rsi_timeperiod = solution[2]

      #생성된 매개변수로 시그널 생성
      make_rsi_ga_signal(df, rsi_sell_threshold, rsi_buy_threshold, rsi_timeperiod)

      #생성된 시그널을 바탕으로 수익률 계산
      profit = rsi_backtesting(df)
      fitness = 0.9*profit + 0.1*mdd  #유전자 적합도 판단 기준(=다음 세대를 위한 우월 개체 선정에 쓰일 가산점)

      return fitness

  # PyGAD 옵션 설정
  init_range_low = 0
  init_range_high = 100
  num_generations = 10  # 세대 수
  num_parents_mating = 10  # 부모 개체 수
  sol_per_pop = 20  # 개체 수
  parent_selection_type = "rws"
  mutation_type = "random"

  # PyGAD GA 객체 생성
  ga_instance = pygad.GA(num_generations=num_generations,
                        num_parents_mating=num_parents_mating,
                        sol_per_pop=sol_per_pop,
                        num_genes=3,  # 매개변수 수
                        init_range_low=init_range_low,
                        init_range_high=init_range_high,
                        # solution = [sell thres, buy thres, period]
                        gene_space = [{'low': 10, 'high': 90}, {'low': 10, 'high': 90}, {'low': 9, 'high': 28}],
                        fitness_func=rsi_fitness_func,
                        parent_selection_type=parent_selection_type,
                        mutation_type=mutation_type,
                        #on_generation=callback_generation,
                        )

  fitness_progress = [] # 각 세대에서의 최적의 적합도를 저장할 리스트 생성
  best_solutions = []  # 각 세대별 최적 해결 값을 저장할 리스트 생성

  #세대만큼 반복
  for generation in range(ga_instance.num_generations):
      best_solution = ga_instance.best_solution() # 각 세대 중 최적 결과
      best_solutions.append(best_solution[0])
      best_fitness = best_solution[1]
      fitness_progress.append(best_fitness)

      ga_instance.run() # 한 세대 실행하는 함수(현재 세대에서 새로운 개체 집단을 생성하고 업데이트)

  final_best_solution = best_solutions[-1]  # 마지막 세대의 최적 해결 값 저장
  final_best_solution = final_best_solution.tolist()  # numpy array를 list형으로 변환
  best_rsi_sell_threshold = final_best_solution[0]
  best_rsi_buy_threshold = final_best_solution[1]
  best_rsi_timeperiod = final_best_solution[2]

  return best_rsi_sell_threshold, best_rsi_buy_threshold, best_rsi_timeperiod

#-------------------------------------ga 바탕 rsi signal----------------------------------------------
# 시그널 생성 함수
def make_rsi_ga_signal(df, best_rsi_sell_threshold, best_rsi_buy_threshold, best_rsi_timeperiod):

  # RSI 계산
  df['ga_rsi'] = talib.RSI(df['MKTCAP'], timeperiod=best_rsi_timeperiod)

  # 시그널 생성
  df['ga_rsi_signal'] = 0  # 초기 시그널 값을 0으로 설정

  # RSI 값이 최적화된 매도 임계값을 초과하면 시그널 값 : -1
  # df['ga_rsi'] > best_rsi_sell_threshold 조건을 충족하는 행들에 대해, ga_signal 컬럼 값을 -1로 변경
  df.loc[df['ga_rsi'] > best_rsi_sell_threshold, 'ga_rsi_signal'] = -1

  # RSI 값이 최적화된 매수 임계값을 하회하면 시그널 값 : 1
  # df['rsi'] > best_rsi_buy_threshold 조건을 충족하는 행들에 대해, ga_signal 컬럼 값을 1로 변경
  df.loc[df['ga_rsi'] < best_rsi_buy_threshold, 'ga_rsi_signal'] = 1

  return df['ga_rsi_signal']


#-------------------------------------Backtesting-----------------------------------------
def rsi_backtesting(df):
  # 초기 자본금 설정
  initial_capital = 1000000000000000  #1000조

  # 보유 주식 수와 자본금 추적
  shares_held = 0
  capital = initial_capital
  capital_history = [capital]

  # 매수, 매도, 또는 보유 결정에 따른 자본금 변화 계산
  for i in range(1, len(df)):
      if df['ga_rsi_signal'][i] == 1:  # Buy 시그널인 경우
          shares_to_buy = capital // df['MKTCAP'][i]  # 보유 가능한 주식 수 계산
          shares_held += shares_to_buy
          capital -= shares_to_buy * df['MKTCAP'][i]
      elif df['ga_rsi_signal'][i] == -1:  # Sell 시그널인 경우
          capital += shares_held * df['MKTCAP'][i]  # 보유 주식 매도
          shares_held = 0

      capital_history.append(capital + shares_held * df['MKTCAP'][i])  # 자본금 변화 추적

  # 수익률 계산
  returns = (capital_history[-1] - initial_capital) / initial_capital * 100

  return returns

#---------------------매개변수 수렴확인---------------------
def rsi_best_profit(df):
  # 최적화된 매개변수 출력
  best_rsi_sell_threshold, best_rsi_buy_threshold, best_rsi_timeperiod = ga_rsi_optimize(df)

  # 수익률 확인 및 최적 매개변수로 시그널 갱신
  make_rsi_ga_signal(df, best_rsi_sell_threshold, best_rsi_buy_threshold, best_rsi_timeperiod)
  fit_returns = rsi_backtesting(df)

  return best_rsi_sell_threshold, best_rsi_buy_threshold, best_rsi_timeperiod, fit_returns, df



# 최종적으로 모듈화된 수익률 도출 함수
def rsi_test(df):
  # 기본 시그널 생성
  make_rsi_sig(df, 'rsi', 70, 30, 14, 'rsi_signal')

  # GA로 필요한 매개변수 최적화
  rsi_df = rsi_best_profit(df)[4]

  return rsi_df['ga_rsi_signal']

# SMA ✅

In [None]:
# 시그널 변환 함수

def make_sma_sig(df, sma, period, sig_column):
  # 이동평균 계산
  df[sma] = talib.SMA(df['MKTCAP'], timeperiod=period)
  # 포매팅
  df[sma].astype(float)

  # 시그널 생성
  df[sig_column] = 0  # 초기 시그널은 0으로 설정

  # 이동평균 상향 돌파 시그널
  # 전일 : mktcap < sma / 후일(현재 지점) : mktcap > sma
  df.loc[(df['MKTCAP'].shift() < df[sma].shift()) & (df['MKTCAP'] > df[sma]), sig_column] = 1

  # 이동평균 하향 돌파 시그널
  # 전일 : mktcap > sma / 후일(현재 지점) : mktcap < sma
  df.loc[(df['MKTCAP'].shift() > df[sma].shift()) & (df['MKTCAP'] < df[sma]), sig_column] = -1

  return df


# ga 최적화 함수
def ga_sma_optimize(df):
  #-----------fitness 정의 (함수 수정 완료)--------------------------------------------
  def sma_fitness_func(ga_instance, solution, solution_idx):
    # mdd계산
    mdd = cal_mdd(df)

    # 솔루션 : timeperiod
    period = solution[0]

    # 생성된 매개변수로 시그널 생성
    make_sma_ga_signal(df, period)

    # 생성된 시그널을 바탕으로 수익률 계산
    profit = sma_backtesting(df)
    fitness = 0.9*profit + 0.1*mdd  #유전자 적합도 판단 기준(=다음 세대를 위한 우월 개체 선정에 쓰일 가산점)

    return fitness

  # PyGAD 옵션 설정
  # 매개변수 범위 제한 : solution 배열의 값들을 3일~60일로 제한
  init_range_low = 3
  init_range_high = 60
  num_generations = 10  # 세대 수
  num_parents_mating = 10  # 부모 개체 수
  sol_per_pop = 20  # 개체 수
  parent_selection_type = "rws"
  mutation_type = "random"


  # PyGAD GA 객체 생성
  ga_instance = pygad.GA(num_generations=num_generations,
                        num_parents_mating=num_parents_mating,
                        sol_per_pop=sol_per_pop,
                        num_genes=1,  # 매개변수 수
                        init_range_low=init_range_low,
                        init_range_high=init_range_high,
                        gene_space = range(3, 60),
                        fitness_func=sma_fitness_func,
                        parent_selection_type=parent_selection_type,
                        mutation_type=mutation_type,
                        #on_generation=callback_generation,
                        )

  fitness_progress = [] #각 세대에서의 최적의 적합도를 저장할 리스트 생성
  best_solutions = []  # 각 세대별 최적 해결 값을 저장할 리스트 생성

  # 세대만큼 반복
  for generation in range(ga_instance.num_generations):
      best_solution = ga_instance.best_solution() # 각 세대 중 최적 결과(베스트를 갱신하는 형태)
      best_solutions.append(best_solution[0])
      best_fitness = best_solution[1]
      fitness_progress.append(best_fitness)

      ga_instance.run() # 한 세대 실행하는 함(현재 세대에서 새로운 개체 집단을 생성하고 업데이트)

  final_best_solution = best_solutions[-1]  # 마지막 세대의 최적 해결 값 저장
  final_best_solution = final_best_solution.tolist()  # numpy array를 list형으로 변환
  best_sma_timeperiod = final_best_solution[0]    # 최적화된 SMA timeperiod 저장

  return best_sma_timeperiod

#-------------------------------------ga 바탕 rsi signal----------------------------------------------
# 시그널 생성 함수
def make_sma_ga_signal(df, best_sma_timeperiod):
  # 이동평균 계산
  df['ga_sma'] = talib.SMA(df['MKTCAP'], timeperiod=best_sma_timeperiod)

  # 시그널 생성
  df['ga_sma_signal'] = 0  # 초기 시그널 값을 0으로 설정


  # 이동평균 상향 돌파 시그널
  # 전일 : mktcap < ga_sma / 후일(현재 지점) : mktcap > ga_sma
  df.loc[(df['MKTCAP'].shift() < df['ga_sma'].shift()) & (df['MKTCAP'] > df['ga_sma']), 'ga_sma_signal'] = 1

  # 이동평균 하향 돌파 시그널
  # 전일 : mktcap > ga_sma / 후일(현재 지점) : mktcap < ga_sma
  df.loc[(df['MKTCAP'].shift() > df['ga_sma'].shift()) & (df['MKTCAP'] < df['ga_sma']), 'ga_sma_signal'] = -1

  return df['ga_sma_signal']


#-------------------------------------Backtesting-----------------------------------------
# 원래는 학습, 테스트 데이터 분리해서 하려고 했는데 시간관계상 그냥 한 데이터로 함.
def sma_backtesting(df):
  # 초기 자본금 설정
  initial_capital = 1000000000000000  #1000조

  # 보유 주식 수와 자본금 추적
  shares_held = 0
  capital = initial_capital
  capital_history = [capital]

  # 매수, 매도, 또는 보유 결정에 따른 자본금 변화 계산
  for i in range(1, len(df)):
      if df['ga_sma_signal'][i] == 1:  # Buy 시그널인 경우
          shares_to_buy = capital // df['MKTCAP'][i]  # 보유 가능한 주식 수 계산
          shares_held += shares_to_buy
          capital -= shares_to_buy * df['MKTCAP'][i]
      elif df['ga_sma_signal'][i] == -1:  # Sell 시그널인 경우
          capital += shares_held * df['MKTCAP'][i]  # 보유 주식 매도
          shares_held = 0

      capital_history.append(capital + shares_held * df['MKTCAP'][i])  # 자본금 변화 추적

  # 수익률 계산
  returns = (capital_history[-1] - initial_capital) / initial_capital * 100


  return returns

#---------------------매개변수 수렴확인---------------------
def best_sma_profit(df):
  # 최적화된 매개변수 출력
  best_sma_period = ga_sma_optimize(df)

  # 수익률 확인 및 최적 매개변수로 시그널 갱신
  make_sma_ga_signal(df, best_sma_period)
  fit_returns = sma_backtesting(df)

  return best_sma_period, fit_returns, df


# 최종적으로 모듈화된 수익률 도출 함수

def sma_test(df):
  # 기본 시그널 생성
  make_sma_sig(df, 'sma', 20, 'sma_signal')

  # GA로 필요한 매개변수 최적화
  sma_df = best_sma_profit(df)[2]

  return sma_df['ga_sma_signal']

# 볼린저밴드 ✅

In [None]:
# 시그널 변환 함수

def make_bbnd_sig(df, upper_band, middle_band, lower_band, period, nbdevup, nbdevdn, sig_column):
  # 볼린저 밴드 계산
  upper, middle, lower = talib.BBANDS(df['MKTCAP'], timeperiod=period, nbdevup=nbdevup, nbdevdn=nbdevdn)
  df[upper_band] = upper
  df[middle_band] = middle
  df[lower_band] = lower

  # 시그널 생성
  df[sig_column] = 0  # 초기 시그널은 0으로 설정

  # 추가 설명 ! : 상향선 상향 돌파시 매수하는 법도 있지만, 다른 인덱스와 통일하기 위해 과매수, 과매도를 조건으로 시그널을 생성하는 방법 선택

  # 볼린저 밴드 상향 돌파 시그널
  # 전일 : mktcap < upper_band / 후일(현재 지점) : mktcap > upper_band
  df.loc[(df['MKTCAP'].shift() < df[upper_band].shift()) & (df['MKTCAP'] > df[upper_band]), sig_column] = 1

  # 볼린저 밴드 하향 돌파 시그널 : 주가가 하락세를 보이고 있기 때문에 매도함.
  # 전일 : mktcap > lower_band / 후일(현재 지점) : mktcap < lower_band
  df.loc[(df['MKTCAP'].shift() > df[lower_band].shift()) & (df['MKTCAP'] < df[lower_band]), sig_column] = -1

  return df



# ga 최적화 함수
def ga_bbnd_optimize(df, timeperiod):
    #-----------fitness 정의 (함수 수정 완료)--------------------------------------------
    def bbnd_fitness_func(ga_instance, solution, solution_idx):
        #mdd계산
        mdd = cal_mdd(df)

        # 솔루션 : 상향선 기준 수치, 하향선 기준 수치
        nbdevup = solution[0]
        nbdevdown = solution[1]

        # 생성된 매개변수로 시그널 생성
        make_ga_bbnd_signal(df, nbdevup, nbdevdown, timeperiod)

        # 생성된 시그널을 바탕으로 수익률 계산
        profit = bbnd_backtesting(df)
        fitness = 0.9*profit + 0.1*mdd  # 유전자 적합도 판단 기준(=다음 세대를 위한 우월 개체 선정에 쓰일 가산점)

        return fitness

    # PyGAD 옵션 설정
    init_range_low = 0
    init_range_high = 5
    num_generations = 3  # 세대 수
    num_parents_mating = 10  # 부모 개체 수
    sol_per_pop = 20  # 개체 수
    parent_selection_type = "rws"
    mutation_type = "random"

    # PyGAD GA 객체 생성
    ga_instance = pygad.GA(num_generations=num_generations,
                        num_parents_mating=num_parents_mating,
                        sol_per_pop=sol_per_pop,
                        num_genes=2,  # 매개변수 수
                        # solution = [lowerbound, upperbound]
                        # 2.5부터 3까지 0.01 간격으로 배열을 생성헌 실수 범위
                        gene_space = [{'low': 0, 'high': 0.5}, {'low': 2.5, 'high': 3}],
                        fitness_func=bbnd_fitness_func,
                        parent_selection_type=parent_selection_type,
                        mutation_type=mutation_type,
                        # on_generation=callback_generation,
                        )

    fitness_progress = [] # 각 세대에서의 최적의 적합도를 저장할 리스트 생성
    best_solutions = []  # 각 세대별 최적 해결 값을 저장할 리스트 생성

    #세대만큼 반복
    for generation in range(ga_instance.num_generations):
        best_solution = ga_instance.best_solution() # 각 세대 중 최적 결과
        best_solutions.append(best_solution[0])
        best_fitness = best_solution[1]
        fitness_progress.append(best_fitness)

        ga_instance.run() # 한 세대 실행하는 함(현재 세대에서 새로운 개체 집단을 생성하고 업데이트)


    final_best_solution = best_solutions[-1] # 마지막 세대의 최적 해결 값 저장
    final_best_solution = final_best_solution.tolist()  # numpy array를 list형으로 변환
    best_nbdevup = final_best_solution[0]
    best_nbdevdown = final_best_solution[1]

    return best_nbdevup, best_nbdevdown

#-------------------------------------ga 바탕 볼린저밴드 signal----------------------------------------------
# 시그널 생성 함수
def make_ga_bbnd_signal(df, best_nbdevup, best_nbdevdown, best_timeperiod):
    # 시그널 생성
    df['ga_bbnd_signal'] = 0
    # 볼린저 밴드 계산
    ga_upper, ga_middle, ga_lower = talib.BBANDS(df['MKTCAP'], timeperiod=best_timeperiod, nbdevup=best_nbdevup, nbdevdn=best_nbdevdown)
    df['ga_upper_band'] = ga_upper
    df['ga_middle_band'] = ga_middle
    df['ga_lower_band'] = ga_lower

    # 볼린저 밴드 상향 돌파 시그널
    # 전일 : mktcap < upper_band / 후일(현재 지점) : mktcap > upper_band
    df.loc[(df['MKTCAP'].shift() < df['ga_upper_band'].shift()) & (df['MKTCAP'] > df['ga_upper_band']), 'ga_bbnd_signal'] = 1

    # 볼린저 밴드 하향 돌파 시그널 : 주가가 하락세를 보이고 있기 때문에 매도함.
    # 전일 : mktcap > lower_band / 후일(현재 지점) : mktcap < lower_band
    df.loc[(df['MKTCAP'].shift() > df['ga_lower_band'].shift()) & (df['MKTCAP'] < df['ga_lower_band']), 'ga_bbnd_signal'] = -1

    return df['ga_bbnd_signal']

#-------------------------------------Backtesting-----------------------------------------
# 원래는 학습, 테스트 데이터 분리해서 하려고 했는데 시간관계상 그냥 한 데이터로 함.
def bbnd_backtesting(df):
    # 초기 자본금 설정
    initial_capital = 1000000000000000  #1000조

    # 보유 주식 수와 자본금 추적
    shares_held = 0
    capital = initial_capital
    capital_history = [capital]

    # 매수, 매도, 또는 보유 결정에 따른 자본금 변화 계산
    for i in range(1, len(df)):
        if df['ga_bbnd_signal'][i] == 1:  # Buy 시그널인 경우
            shares_to_buy = capital // df['MKTCAP'][i]  # 보유 가능한 주식 수 계산
            shares_held += shares_to_buy
            capital -= shares_to_buy * df['MKTCAP'][i]
        elif df['ga_bbnd_signal'][i] == -1:  # Sell 시그널인 경우
            capital += shares_held * df['MKTCAP'][i]  # 보유 주식 매도
            shares_held = 0

        capital_history.append(capital + shares_held * df['MKTCAP'][i])  # 자본금 변화 추적

    # 수익률 계산
    returns = (capital_history[-1] - initial_capital) / initial_capital * 100

    return returns


#---------------------매개변수 수렴확인---------------------
def best_bbnd_profit(df, timeperiod):
    # 최적화된 매개변수 출력
    best_nbdevup, best_nbdevdown = ga_bbnd_optimize(df, timeperiod)

    # 수익률 확인 및 최적 매개변수로 시그널 갱신
    make_ga_bbnd_signal(df, best_nbdevup, best_nbdevdown, timeperiod)
    fit_returns = bbnd_backtesting(df)

    return best_nbdevup, best_nbdevdown, fit_returns, df


# 최종적으로 모듈화된 수익률 도출 함수
def bbnd_test(df):
  #------------------sma-----------------------------
  # 데이터에 기본 시그널 생성
  make_sma_sig(df, 'sma', 20, 'sma_signal')

  # GA로 필요한 매개변수 최적화
  sma_result = best_sma_profit(df)
  result_prd = sma_result[0]


  #---------------bbnd-------------------------------
  # bbnd 기본 시그널 생성
  make_bbnd_sig(df, 'upper_band', 'middle_band', 'lower_band', 20, 2, 2, 'bbnd_signal')

  # GA로 up, down 매개변수 최적화
  bbnd_df = best_bbnd_profit(df, result_prd)[3]

  return bbnd_df['ga_bbnd_signal']

# ROC ✅

In [None]:
# 시그널 변환 함수

def make_roc_sig(df, roc, period, sig_column):
  # ROC 계산 : 보통 10으로 설정
  df[roc] = talib.ROC(df['MKTCAP'], timeperiod=period)

  # 시그널 생성
  df[sig_column] = 0  # 초기 시그널은 0으로 설정

  # 이동평균 상향 돌파 시그널
  # 전일 : mktcap < 0 / 후일(현재 지점) : mktcap > 0
  df.loc[(df[roc].shift() < 0) & (df[roc] > 0), sig_column] = 1

  # 이동평균 하향 돌파 시그널
  # 전일 : mktcap > 0 / 후일(현재 지점) : mktcap < 0
  df.loc[(df[roc].shift() > 0) & (df[roc] < 0), sig_column] = -1

  return df


# ga 최적화 함수
def ga_roc_optimize(df):
  #-----------fitness 정의 (함수 수정 완료)--------------------------------------------
  def roc_fitness_func(ga_instance, solution, solution_idx):
      # mdd 계산
      mdd = cal_mdd(df)

      # 솔루션 : timeperiod
      period = solution[0]

      # 생성된 매개변수로 시그널 생성
      make_roc_ga_signal(df, period)

      # 생성된 시그널을 바탕으로 수익률 계산
      profit = roc_backtesting(df)
      fitness = 0.9*profit + 0.1*mdd  # 유전자 적합도 판단 기준(=다음 세대를 위한 우월 개체 선정에 쓰일 가산점)

      return fitness

  # PyGAD 옵션 설정
  init_range_low = 3
  init_range_high = 60
  num_generations = 10  # 세대 수
  num_parents_mating = 10  # 부모 개체 수
  sol_per_pop = 20  # 개체 수
  parent_selection_type = "rws"
  mutation_type = "random"

  # PyGAD GA 객체 생성
  ga_instance = pygad.GA(num_generations=num_generations,
                        num_parents_mating=num_parents_mating,
                        sol_per_pop=sol_per_pop,
                        num_genes=1,  # 매개변수 수
                        init_range_low=init_range_low,
                        init_range_high=init_range_high,
                        gene_space = range(3, 60),
                        fitness_func=roc_fitness_func,
                        parent_selection_type=parent_selection_type,
                        mutation_type=mutation_type,
                        #on_generation=callback_generation,
                        )

  fitness_progress = [] # 각 세대에서의 최적의 적합도를 저장할 리스트 생성
  best_solutions = []  # 각 세대별 최적 해결 값을 저장할 리스트 생성

  #세대만큼 반복
  for generation in range(ga_instance.num_generations):
      best_solution = ga_instance.best_solution() #각 세대 중 최적 결과(베스트를 갱신하는 형태)
      best_solutions.append(best_solution[0])
      best_fitness = best_solution[1]
      fitness_progress.append(best_fitness)

      ga_instance.run() # 한 세대 실행하는 함(현재 세대에서 새로운 개체 집단을 생성하고 업데이트)

  final_best_solution = best_solutions[-1]  # 마지막 세대의 최적 해결 값 저장
  final_best_solution = final_best_solution.tolist()  # numpy array를 list형으로 변환
  best_roc_timeperiod = final_best_solution[0]    # 최적화된 timeperiod 저장

  return best_roc_timeperiod

#-------------------------------------ga 바탕 roc signal----------------------------------------------
# 시그널 생성
def make_roc_ga_signal(df, best_roc_timeperiod):
  # 이동평균 계산
  df['ga_roc'] = talib.ROC(df['MKTCAP'], timeperiod=best_roc_timeperiod)

  # 시그널 생성
  df['ga_roc_signal'] = 0  # 초기 시그널 값을 0으로 설정

  # 이동평균 상향 돌파 시그널
  # 전일 : mktcap < 0 / 후일(현재 지점) : mktcap > 0
  df.loc[(df['ga_roc'].shift() < 0) & (df['ga_roc'] > 0), 'ga_roc_signal'] = 1

  # 이동평균 하향 돌파 시그널
  # 전일 : mktcap > 0 / 후일(현재 지점) : mktcap < 0
  df.loc[(df['ga_roc'].shift() > 0) & (df['ga_roc'] < 0), 'ga_roc_signal'] = -1

  return df['ga_roc_signal']


#-------------------------------------Backtesting-----------------------------------------
# 원래는 학습, 테스트 데이터 분리해서 하려고 했는데 시간관계상 그냥 한 데이터로 함.
def roc_backtesting(df):
  # 초기 자본금 설정
  initial_capital = 1000000000000000  #1000조

  # 보유 주식 수와 자본금 추적
  shares_held = 0
  capital = initial_capital
  capital_history = [capital]

  # 매수, 매도, 또는 보유 결정에 따른 자본금 변화 계산
  for i in range(1, len(df)):
      if df['ga_roc_signal'][i] == 1:  # Buy 시그널인 경우
          shares_to_buy = capital // df['MKTCAP'][i]  # 보유 가능한 주식 수 계산
          shares_held += shares_to_buy
          capital -= shares_to_buy * df['MKTCAP'][i]
      elif df['ga_roc_signal'][i] == -1:  # Sell 시그널인 경우
          capital += shares_held * df['MKTCAP'][i]  # 보유 주식 매도
          shares_held = 0

      capital_history.append(capital + shares_held * df['MKTCAP'][i])  # 자본금 변화 추적

  # 수익률 계산
  returns = (capital_history[-1] - initial_capital) / initial_capital * 100


  return returns

#---------------------매개변수 수렴확인---------------------
def best_roc_profit(df):
  # 최적화된 매개변수 출력
  best_roc_period = ga_roc_optimize(df)

  # 수익률 확인 및 최적 매개변수로 시그널 갱신
  make_roc_ga_signal(df, best_roc_period)
  fit_returns = roc_backtesting(df)

  return best_roc_period, fit_returns, df



# 최종적으로 모듈화된 수익률 도출 함수
def roc_test(df):
  # train 데이터에 기본 시그널 생성
  make_roc_sig(df, 'roc', 10, 'roc_signal')

  # GA로 필요한 매개변수 최적화
  roc_df = best_roc_profit(df)[2]

  return roc_df['ga_roc_signal']

# DPO ✅

In [None]:
# ta 라이브러리 임포트 : DPO는 기존 talib에 없어서 ta 라이브러리를 별도로 설치하였습니다.
import ta
from ta.trend import DPOIndicator


# 시그널 변환 함수

def make_dpo_sig(df, dpo, period, sig_column):
  # DPO 계산 : 보통 20
  df[dpo] = ta.trend.DPOIndicator(close=df['MKTCAP'], window=period, fillna=False).dpo()

  # 시그널 생성s
  df[sig_column] = 0  # 초기 시그널은 0으로 설정

  # 이동평균 상향 돌파 시그널
  # 전일 : mktcap < 0 / 후일(현재 지점) : mktcap > 0
  df.loc[(df[dpo].shift() < 0) & (df[dpo] > 0), sig_column] = 1

  # 이동평균 하향 돌파 시그널
  # 전일 : mktcap > 0 / 후일(현재 지점) : mktcap < 0
  df.loc[(df[dpo].shift() > 0) & (df[dpo] < 0), sig_column] = -1

  return df



# ga 최적화 함수
def ga_dpo_optimize(df):
  #-----------fitness 정의 (함수 수정 완료)--------------------------------------------
  def dpo_fitness_func(ga_instance, solution, solution_idx):
      # mdd 계산
      mdd = cal_mdd(df)

      # 솔루션 : timeperiod
      period = int(solution[0])

      # 생성된 매개변수로 시그널 생성
      make_dpo_ga_signal(df, period)

      # 생성된 시그널을 바탕으로 수익률 계산
      profit = dpo_backtesting(df)
      fitness = 0.9*profit + 0.1*mdd  # 유전자 적합도 판단 기준(=다음 세대를 위한 우월 개체 선정에 쓰일 가산점)

      return fitness

  # PyGAD 옵션 설정
  init_range_low = 3
  init_range_high = 60
  num_generations = 10  # 세대 수
  num_parents_mating = 10  # 부모 개체 수
  sol_per_pop = 20  # 개체 수
  parent_selection_type = "rws"
  mutation_type = "random"

  # PyGAD GA 객체 생성
  ga_instance = pygad.GA(num_generations=num_generations,
                        num_parents_mating=num_parents_mating,
                        sol_per_pop=sol_per_pop,
                        num_genes=1,  # 매개변수 수
                        init_range_low=init_range_low,
                        init_range_high=init_range_high,
                        gene_space = range(3, 60),
                        fitness_func=dpo_fitness_func,
                        parent_selection_type=parent_selection_type,
                        mutation_type=mutation_type,
                        #on_generation=callback_generation,
                        )

  fitness_progress = [] # 각 세대에서의 최적의 적합도를 저장할 리스트 생성
  best_solutions = []  # 각 세대별 최적 해결 값을 저장할 리스트 생성

  #세대만큼 반복
  for generation in range(ga_instance.num_generations):
      best_solution = ga_instance.best_solution() # 각 세대 중 최적 결과(베스트를 갱신하는 형태)
      best_solutions.append(best_solution[0])
      best_fitness = best_solution[1]
      fitness_progress.append(best_fitness)

      ga_instance.run() # 한 세대 실행하는 함(현재 세대에서 새로운 개체 집단을 생성하고 업데이트)

  final_best_solution = best_solutions[-1]  # 마지막 세대의 최적 해결 값 저장
  final_best_solution = final_best_solution.tolist()  # numpy array를 list형으로 변환
  best_dpo_timeperiod = final_best_solution[0]    # 최적화된 timeperiod 저장

  return best_dpo_timeperiod

#-------------------------------------ga 바탕 dpo signal----------------------------------------------
# 시그널 생성
def make_dpo_ga_signal(df, best_dpo_timeperiod):
  # 이동평균 계산
  df['ga_dpo'] = ta.trend.DPOIndicator(close=df['MKTCAP'], window=best_dpo_timeperiod, fillna=False).dpo()

  # 시그널 생성
  df['ga_dpo_signal'] = 0  # 초기 시그널 값을 0으로 설정

  # 이동평균 상향 돌파 시그널
  # 전일 : mktcap < 0 / 후일(현재 지점) : mktcap > 0
  df.loc[(df['ga_dpo'].shift() < 0) & (df['ga_dpo'] > 0), 'ga_dpo_signal'] = 1

  # 이동평균 하향 돌파 시그널
  # 전일 : mktcap > 0 / 후일(현재 지점) : mktcap < 0
  df.loc[(df['ga_dpo'].shift() > 0) & (df['ga_dpo'] < 0), 'ga_dpo_signal'] = -1

  return df['ga_dpo_signal']


#-------------------------------------Backtesting-----------------------------------------
# 원래는 학습, 테스트 데이터 분리해서 하려고 했는데 시간관계상 그냥 한 데이터로 함.
def dpo_backtesting(df):
  # 초기 자본금 설정
  initial_capital = 1000000000000000  #1000조

  # 보유 주식 수와 자본금 추적
  shares_held = 0
  capital = initial_capital
  capital_history = [capital]

  # 매수, 매도, 또는 보유 결정에 따른 자본금 변화 계산
  for i in range(1, len(df)):
      if df['ga_dpo_signal'][i] == 1:  # Buy 시그널인 경우
          shares_to_buy = capital // df['MKTCAP'][i]  # 보유 가능한 주식 수 계산
          shares_held += shares_to_buy
          capital -= shares_to_buy * df['MKTCAP'][i]
      elif df['ga_dpo_signal'][i] == -1:  # Sell 시그널인 경우
          capital += shares_held * df['MKTCAP'][i]  # 보유 주식 매도
          shares_held = 0

      capital_history.append(capital + shares_held * df['MKTCAP'][i])  # 자본금 변화 추적

  # 수익률 계산
  returns = (capital_history[-1] - initial_capital) / initial_capital * 100


  return returns

#---------------------매개변수 수렴확인---------------------
def best_dpo_profit(df):
  # 최적화된 매개변수 출력
  best_dpo_period = int(ga_dpo_optimize(df))

  # 수익률 확인 및 최적 매개변수로 시그널 갱신
  make_dpo_ga_signal(df, best_dpo_period)
  fit_returns = dpo_backtesting(df)

  return best_dpo_period, fit_returns, df



# 최종적으로 모듈화된 수익률 도출 함수
def dpo_test(df):
  # 기본 시그널 생성
  make_dpo_sig(df, 'dpo', 20, 'dpo_signal')

  # GA로 필요한 매개변수 최적화
  dpo_df = best_dpo_profit(df)[2]

  return dpo_df['ga_dpo_signal']

# STOCH ✅

In [None]:
# 시그널 변환 함수
def add_ratio(df):
  df['high_ratio'] = (df['MKTCAP'] * df['High']) / df['Close'] #high값을 MKTCAP에 상응하는 비율로 맞춘 열 추가
  df['low_ratio'] = (df['MKTCAP'] * df['Low']) / df['Close']  #low값을 MKTCAP에 상응하는 비율로 맞춘 열 추가
  return df

#시그널 생성함수 모듈화
def make_stoch_sig(df, k_line, d_line, fast_period, slow_period, sig_column):
  stoch = talib.STOCH(df["high_ratio"], df["low_ratio"], df["MKTCAP"], fastk_period=fast_period, slowk_period=slow_period, slowd_period=slow_period)

  df[k_line] = stoch[0]  # K선 값
  df[d_line] = stoch[1]  # D선 값

  # 시그널 생성
  df[sig_column] = 0  # 초기 시그널은 0으로 설정

  # K선이 D선 위로 돌파시 골든크로스 => 매수시점(1)
  # 전일: K_line < D_line / 후일(현재 지점) :  K_line > D_line
  df.loc[(df[k_line].shift() <= df[d_line].shift()) & (df[k_line] > df[d_line]), sig_column] = 1

  # K선이 D선 아래로 돌파시 데드크로스 => 매도시점(-1)
  # 전일: K_line > D_line / 후일(현재 지점) :  K_line < D_line
  df.loc[(df[k_line].shift() >= df[d_line].shift()) & (df[k_line] < df[d_line]), sig_column] = -1

  return df




# ga 최적화 함수
def ga_stoch_optimize(df):
  #-----------fitness 정의 (함수 수정 완료)--------------------------------------------
  def stoch_fitness_func(ga_instance, solution, solution_idx):
      #mdd계산
      mdd = cal_mdd(df)

      # 솔루션 : K선 결정 범위 기간 period_K, K선을 이동평균하는 기간 period_D
      period_K = solution[0]
      period_D = solution[1]

      # 생성된 매개변수로 시그널 생성
      make_stoch_ga_signal(df, period_K, period_D)

      # 생성된 시그널을 바탕으로 수익률 계산
      profit = stoch_backtesting(df)

      fitness = 0.9*profit + 0.1*mdd  # 유전자 적합도 판단 기준(=다음 세대를 위한 우월 개체 선정에 쓰일 가산점)

      return fitness

  # PyGAD 옵션 설정
  init_range_low = 0
  init_range_high = 100
  num_generations = 10  # 세대 수
  num_parents_mating = 10  # 부모 개체 수
  sol_per_pop = 20  # 개체 수
  parent_selection_type = "rws"
  mutation_type = "random"

  # PyGAD GA 객체 생성
  ga_instance = pygad.GA(num_generations=num_generations,
                        num_parents_mating=num_parents_mating,
                        sol_per_pop=sol_per_pop,
                        num_genes=2,  # 매개변수 수
                        gene_space = [range(5, 28), range(3, 10)], # 매개변수 범위제한 -> 해당 범위 잘 적용됨을 확인 함
                        fitness_func=stoch_fitness_func,
                        parent_selection_type=parent_selection_type,
                        mutation_type=mutation_type,
                        #on_generation=callback_generation,
                        )


  best_solution = ga_instance.best_solution() # 전체 최적 결과(현재 까지 찾은 최적 해와 적합도중 best를 출력)
  best_period_K = best_solution[0][0]
  best_period_D = best_solution[0][1]

  return best_period_K, best_period_D

#best_period_K, best_period_D = ga_optimize()
#print(f"Best Parameters: K = {best_period_K}, D = {best_period_D}")

#-------------------------------------ga 바탕 rsi signal----------------------------------------------
# 시그널 생성 함수
def make_stoch_ga_signal(df, best_period_K, best_period_D):
  #stoch값 계산
  stoch = talib.STOCH(df["high_ratio"], df["low_ratio"], df["MKTCAP"], fastk_period=best_period_K, slowk_period=best_period_D, slowd_period=best_period_D)
  df['ga_k_line'] = stoch[0]  # K선 값
  df['ga_D_line'] = stoch[1]  # D선 값

  # 시그널 생성
  df['ga_stoch_signal'] = 0  # 초기 시그널 값을 0으로 설정

  # K선이 D선 위로 돌파시 골든크로스 => 매수시점(1)
  # 전일: K_line < D_line / 후일(현재 지점) :  K_line > D_line
  df.loc[(df['ga_k_line'].shift() <= df['ga_D_line'].shift()) & (df['ga_k_line'] > df['ga_D_line']), 'ga_stoch_signal'] = 1

  # K선이 D선 아래로 돌파시 데드크로스 => 매도시점(-1)
  # 전일: K_line > D_line / 후일(현재 지점) :  K_line < D_line
  df.loc[(df['ga_k_line'].shift() >= df['ga_D_line'].shift()) & (df['ga_k_line'] < df['ga_D_line']), 'ga_stoch_signal'] = -1

  return df['ga_stoch_signal']

#-------------------------------------Backtesting 함수 정의-----------------------------------------
def stoch_backtesting(df):
  # 초기 자본금 설정
  initial_capital = 1000000000000000  #1000조

  # 보유 주식 수와 자본금 추적
  shares_held = 0
  capital = initial_capital
  capital_history = [capital]

  # 매수, 매도, 또는 보유 결정에 따른 자본금 변화 계산
  for i in range(1, len(df)):
      if df['ga_stoch_signal'][i] == 1:  # Buy 시그널인 경우
          shares_to_buy = capital // df['MKTCAP'][i]  # 보유 가능한 주식 수 계산
          shares_held += shares_to_buy
          capital -= shares_to_buy * df['MKTCAP'][i]
      elif df['ga_stoch_signal'][i] == -1:  # Sell 시그널인 경우
          capital += shares_held * df['MKTCAP'][i]  # 보유 주식 매도
          shares_held = 0

      capital_history.append(capital + shares_held * df['MKTCAP'][i])  # 자본금 변화 추적

  # 수익률 계산
  returns = (capital_history[-1] - initial_capital) / initial_capital * 100


  return returns

#---------------------매개변수 수렴확인---------------------

def best_stoch_profit(df):
  # 최적화된 매개변수 출력
  best_period_K, best_period_D = ga_stoch_optimize(df)

  # 수익률 확인 및 최적 매개변수로 시그널 갱신
  make_stoch_ga_signal(df, best_period_K, best_period_D)

  # 최적화된 매개변수 출력, 최적화된 수익 출력(mdd로 나눈 값 아님 주의 단순 수익률)
  return best_period_K, best_period_D, df



# 최종적으로 모듈화된 수익률 도출 함수
def stoch_test(df):
  #high_ratio, low_ratio 열추가(스토캐스틱만)
  df = add_ratio(df)

  # 기본 시그널 생성
  make_stoch_sig(df, 'k_line', 'd_line', 14, 3, 'stoch_signal')

  # ga를 이용하여 매개변수 최적화하기
  stoch_df = best_stoch_profit(df)[2]

  return stoch_df['ga_stoch_signal']

# MACD ✅

In [None]:
# 시그널 변환 함수
# 일반적인 기준으로 MACD 시그널 계산하는 함수 (fast_period=12, slow_period=26, signal_period=9)

#시그널 생성함수 모듈화
def make_macd_sig(df, macd, signal_line, fast_period, slow_period, sig_period, sig_column):
  macdval =talib.MACD(df['MKTCAP'],fast_period, slow_period, sig_period)

  df[macd] = macdval[0]  # macd 값
  df[signal_line] = macdval[1]  # macd 시그널 라인값

  # 시그널 생성
  df[sig_column] = 0  # 초기 시그널은 0으로 설정

  # 골든크로스
  # 전일 : macd < signal line / 후일(현재 지점) : macd > signal line (macd가 치고 올라가는-매수 타이밍)
  df.loc[(df[macd].shift() < df[signal_line].shift()) & (df[macd] > df[signal_line]), sig_column] = 1
  # 데드크로스
  # 전일 : macd > signal line / 후일(현재 지점) : macd < signal line (macd가 아래로 내려가는-매도 타이밍)
  df.loc[(df[macd].shift() > df[signal_line].shift()) & (df[macd] < df[signal_line]), sig_column] = -1

  return df



# ga 최적화 함수
def ga_macd_optimize(df):
  #-----------fitness 정의 (함수 수정 완료)--------------------------------------------
  def macd_fitness_func(ga_instance, solution, solution_idx):
      #mdd계산
      mdd = cal_mdd(df)

      # 솔루션 : 단기, 장기, 시그널 기간
      fast_period = solution[0]
      slow_period = solution[1]
      signal_period = solution[2]

      # 생성된 매개변수로 시그널 생성
      make_macd_ga_signal(df, fast_period, slow_period, signal_period)

      # 생성된 시그널을 바탕으로 수익률 계산
      profit = macd_backtesting(df)

      fitness = 0.9*profit + 0.1*mdd  # 유전자 적합도 판단 기준(=다음 세대를 위한 우월 개체 선정에 쓰일 가산점)

      return fitness

  # PyGAD 옵션 설정
  init_range_low = 0
  init_range_high = 100
  num_generations = 10  # 세대 수
  num_parents_mating = 10  # 부모 개체 수
  sol_per_pop = 20  # 개체 수
  parent_selection_type = "rws"
  mutation_type = "random"

  # PyGAD GA 객체 생성
  ga_instance = pygad.GA(num_generations=num_generations,
                        num_parents_mating=num_parents_mating,
                        sol_per_pop=sol_per_pop,
                        num_genes=3,  # 매개변수 수
                       gene_space = [range(3, 20), range(15, 30), range(3, 15)], # 매개변수 범위제한 -> 해당 범위 잘 적용됨을 확인 함
                        fitness_func=macd_fitness_func,
                        parent_selection_type=parent_selection_type,
                        mutation_type=mutation_type,
                        #on_generation=callback_generation,
                        )


  best_solution = ga_instance.best_solution() # 전체 최적 결과(현재 까지 찾은 최적 해와 적합도중 best를 출력)
  best_fastperiod = best_solution[0][0]
  best_slowperiod = best_solution[0][1]
  best_signalperiod = best_solution[0][2]

  return best_fastperiod, best_slowperiod, best_signalperiod


#-------------------------------------ga 바탕 rsi signal----------------------------------------------
# 시그널 생성 함수
def make_macd_ga_signal(df,  best_fastperiod, best_slowperiod, best_signalperiod):
  #macd값 계산
  macdval, macdsignal, macdhist = talib.MACD(df['Close'], best_fastperiod, best_slowperiod, best_signalperiod)
  df['ga_macd'] = macdval
  df['ga_signal_line'] = macdsignal

  # 시그널 생성
  df['ga_macd_signal'] = 0  # 초기 시그널 값을 0으로 설정

  # macd 곡선이 시그널 곡선 상향 골든크로스 => 매수시점(1)
  # 전일 : macd < signal line / 후일(현재 지점) : macd > signal line (macd가 치고 올라가는-매수 타이밍)
  df.loc[(df['ga_macd'].shift() < df['ga_signal_line'].shift()) & (df['ga_macd'] > df['ga_signal_line']), 'ga_macd_signal'] = 1

  # macd 곡선이 시그널 곡선 하향 데드크로스 => 매도시점(-1)
  # 전일 : macd > signal line / 후일(현재 지점) : macd < signal line (macd가 아래로 내려가는-매도 타이밍)
  df.loc[(df['ga_macd'].shift() > df['ga_signal_line'].shift()) & (df['ga_macd'] < df['ga_signal_line']), 'ga_macd_signal'] = -1

  return df['ga_macd_signal']

#-------------------------------------Backtesting 함수 정의-----------------------------------------
def macd_backtesting(df):
  # 초기 자본금 설정
  initial_capital = 1000000000000000  #1000조

  # 보유 주식 수와 자본금 추적
  shares_held = 0
  capital = initial_capital
  capital_history = [capital]

  # 매수, 매도, 또는 보유 결정에 따른 자본금 변화 계산
  for i in range(1, len(df)):
      if df['ga_macd_signal'][i] == 1:  # Buy 시그널인 경우
          shares_to_buy = capital // df['MKTCAP'][i]  # 보유 가능한 주식 수 계산
          shares_held += shares_to_buy
          capital -= shares_to_buy * df['MKTCAP'][i]
      elif df['ga_macd_signal'][i] == -1:  # Sell 시그널인 경우
          capital += shares_held * df['MKTCAP'][i]  # 보유 주식 매도
          shares_held = 0

      capital_history.append(capital + shares_held * df['MKTCAP'][i])  # 자본금 변화 추적

  # 수익률 계산
  returns = (capital_history[-1] - initial_capital) / initial_capital * 100


  return returns

#---------------------매개변수 수렴확인---------------------

def best_macd_profit(df):
  # 최적화된 매개변수 출력
  best_fastperiod, best_slowperiod, best_signalperiod = ga_macd_optimize(df)

  # 수익률 확인 및 최적 매개변수로 시그널 갱신
  make_macd_ga_signal(df, best_fastperiod, best_slowperiod, best_signalperiod)

  # 최적화된 매개변수 출력, 최적화된 수익 출력(mdd로 나눈 값 아님 주의 단순 수익률)
  return best_fastperiod, best_slowperiod, best_signalperiod, df



# 최종적으로 모듈화된 수익률 도출 함수
def merged_macd_test(df):
  #train 데이터에 기본 시그널 생성
  make_macd_sig(df, 'macd', 'signal_line', 12, 26, 9, 'macd_signal')

  #ga를 이용하여 매개변수 최적화하기
  macd_df = best_macd_profit(df)[3]

  return macd_df['ga_macd_signal']

# GDC ✅

In [None]:
# 시그널 변환 함수

def make_gdc_sig(df, short_ma, long_ma, short_period, long_period, sig_column):
  # 단기 이동 평균선 계산
  df[short_ma] = talib.SMA(df['MKTCAP'], timeperiod=short_period)

  # 중장기 이동 평균선 계산
  df[long_ma] = talib.SMA(df['MKTCAP'], timeperiod=long_period)

  # 포매팅
  df['short_ma'].astype(float)
  df['long_ma'].astype(float)

  # 시그널 생성
  df[sig_column] = 0 # 초기 시그널은 0으로 설정

  # 골든크로스
  # 전일 : 단기이동평균선 < 중장기 이동평균선 / 후일(현재 지점) : 단기이동평균선 > 중장기 이동평균선
  df.loc[(df[short_ma].shift() < df[long_ma].shift()) & (df[short_ma] > df[long_ma]), sig_column] = 1

  # 데드크로스
  # 전일 : 단기이동평균선 > 중장기 이동평균선 / 후일(현재 지점) : 단기이동평균선 < 중장기 이동평균선
  df.loc[(df[short_ma].shift() > df[long_ma].shift()) & (df[short_ma] < df[long_ma]), sig_column] = -1

  return df



# ga 최적화 함수
def ga_gdc_optimize(df):
  def gdc_fitness_func(ga_instance, solution, solution_idx):
      #mdd계산
      mdd = cal_mdd(df)

      # 솔루션 : 단기 이동평균 기간 L, 장기 이동평균 기간 S
      short_period = solution[0]
      long_period = solution[1]

      # 생성된 L,S 값을 이용하여 시그널 생성
      make_gdc_ga_signal(df,short_period,long_period)

      # 백테스팅으로 반환된 수익률 자체를 fitness함수의 최적화 평가 지표로 이용
      # 생성된 시그널을 바탕으로 수익률 계산
      profit = gdc_backtesting(df)
      fitness = 0.9 * profit + 0.1 * mdd  # 유전자 적합도 판단 기준(=다음 세대를 위한 우월 개체 선정에 쓰일 가산점)
      return fitness

  # PyGAD 옵션 설정
  num_generations = 10         # 최적화를 반복할 세대의 수
  num_parents_mating = 5       # 각 세대에서 선택되는 부모 개체의 수
  sol_per_pop = 10             # 각 세대에서 생성되는 개체의 수
  num_genes = 2                # 개체의 유전자 수
  gene_space = [{'low': 3, 'high': 14}, {'low': 30, 'high': 60}]  # 각 유전자의 가능한 값 범위
  # 초기 개체의 유전자 값이 선택되는 범위
  init_range_low = 0
  init_range_high = 100
  # 부모 개체를 선택하는 방법
  parent_selection_type = "rws" # 룰렛 휠 선택 방식
  # 돌연변이 유형
  mutation_type = "random"      # 무작위 돌연변이

  ga_instance = pygad.GA(num_generations=num_generations,
                          num_parents_mating=num_parents_mating,
                          sol_per_pop=sol_per_pop,
                          num_genes=num_genes,
                          gene_space=gene_space,
                          init_range_low=init_range_low,
                          init_range_high=init_range_high,
                          parent_selection_type=parent_selection_type,
                          mutation_type=mutation_type,
                          fitness_func=gdc_fitness_func
                          )

  best_solution = ga_instance.best_solution() #전체 최적 결과(현재 까지 찾은 최적 해와 적합도중 best를 출력)
  best_period_short = best_solution[0][0]
  best_period_long = best_solution[0][1]

  return best_period_short, best_period_long

#-------------------------------------ga 바탕 rsi signal----------------------------------------------
# 시그널 생성 함수
def make_gdc_ga_signal(df, best_period_short, best_period_long):
  # 단기와 장기 이동평균을 계산
  df['ga_short_ma'] = talib.SMA(df['MKTCAP'], timeperiod=best_period_short)
  df['ga_long_ma'] = talib.SMA(df['MKTCAP'], timeperiod=best_period_long)

  # 시그널 생성
  df['ga_gdc_signal'] = 0 # 초기 시그널 값을 0으로 설정

  # 단기 이동평균선이 중장기 이동평균선을 상향돌파  => 매수시점(1)
  # 전일 : 단기이동평균선 < 중장기 이동평균선 / 후일(현재 지점) : 단기이동평균선 > 중장기 이동평균선
  df.loc[(df['ga_short_ma'].shift() < df['ga_long_ma'].shift()) & (df['ga_short_ma'] > df['ga_long_ma']), 'ga_gdc_signal'] = 1

  # 단기 이동평균선이 중장기 이동평균선을 하향돌파  => 매도시점(-1)
  # 전일 : 단기이동평균선 > 중장기 이동평균선 / 후일(현재 지점) : 단기이동평균선 < 중장기 이동평균선
  df.loc[(df['ga_short_ma'].shift() > df['ga_long_ma'].shift()) & (df['ga_short_ma'] < df['ga_long_ma']), 'ga_gdc_signal'] = -1

  return df['ga_gdc_signal']

#-------------------------------------Backtesting-----------------------------------------
def gdc_backtesting(df):
  # 초기 자본금 설정
  initial_capital = 1000000000000000  #1000조

  # 보유 주식 수와 자본금 추적
  shares_held = 0
  capital = initial_capital
  capital_history = [capital]

  # 매수, 매도, 또는 보유 결정에 따른 자본금 변화 계산
  for i in range(1, len(df)):
      if df['ga_gdc_signal'][i] == 1:  # Buy 시그널인 경우
          shares_to_buy = capital // df['MKTCAP'][i]  # 보유 가능한 주식 수 계산
          shares_held += shares_to_buy
          capital -= shares_to_buy * df['MKTCAP'][i]
      elif df['ga_gdc_signal'][i] == -1:  # Sell 시그널인 경우
          capital += shares_held * df['MKTCAP'][i]  # 보유 주식 매도
          shares_held = 0

      capital_history.append(capital + shares_held * df['MKTCAP'][i])  # 자본금 변화 추적

  # 수익률 계산
  returns = (capital_history[-1] - initial_capital) / initial_capital * 100


  return returns

#-------------------------------최고 수익률을 내는 임계값-----------------------------
def best_gdc_profit(df):
  best_period_short, best_period_long = ga_gdc_optimize(df)

  # 수익률 확인 및 최적 매개변수로 시그널 갱신
  make_gdc_ga_signal(df, best_period_short, best_period_long)

  return best_period_short, best_period_long, df



# 최종적으로 모듈화된 수익률 도출 함수
def gdc_test(df):
  # train 데이터에 기본 시그널 생성
  make_gdc_sig(df, 'short_ma', 'long_ma', 5, 120, 'gdc_signal')

  # GA로 필요한 매개변수 최적화
  gdc_df = best_gdc_profit(df)[2]

  return gdc_df['ga_gdc_signal']


# TREND_보간법O

In [None]:
# 1. FNC_Func_PST

import numpy as np
from scipy.signal import argrelextrema

def FNC_Func_PST(T, P, c):
    # c : 데이터 자체
    c = np.array(c, dtype=float)
    n = len(c)  # 전체 데이터의 길이를 구함

    c_min = argrelextrema(c, np.less)[0]  # c 데이터 국소 최솟값의 인덱스
    c_max = argrelextrema(c, np.greater)[0]  # c 데이터 국소 최댓값의 인덱스

    cp1 = np.concatenate((c_max, c_min))  # cp1 : 최댓값 최솟값의 위치를 병합한 배열
    cp2 = np.concatenate((c[c_max], c[c_min]))  # cp2 : 최댓값 최솟값 자체를 병합한 배열
    cp = np.column_stack((cp1[np.argsort(cp1)], cp2[np.argsort(cp1)])) # 각각 정렬 후 [cp1, cp2]로 병합

    if len(cp) == 0:  # cp 배열이 비어있다면 빈 리스트와 0을 반환하고 종료
        return [], 0

    index = 0
    sp = np.zeros((len(cp), 2)) # 2차원 배열 생성
    sp[index] = cp[index] # sp에 첫 번째 국소 최댓값/최솟값의 위치와 값 저장

    i = 1
    while i < len(cp) - 1:
        # 최댓값/최솟값의 위치와 현재 위치 사이의 간격이 T보다 작거나
        # 현재 위치와 다음 위치 사이의 값 차이가 평균값에 대비하여 P보다 작으면 스킵.
        if (cp[i+1,0] - cp[i,0] < T) and (abs(cp[i+1,1] - cp[i,1]) / ((cp[i,1] + cp[i+1,1]) / 2) < P):
            i += 2

        # 위의 두 조건에 해당하지 않는다면 현재 위치를 sp에 저장하고 인덱스 +!
        else:
            index += 1
            sp[index] = cp[i]
            i += 1

    # 스킵되지 않은 값들만 포함되도록 sp 자르기
    sp = sp[:index+1]

    # 만약  sp 배열의 길이가 1이라면
    if len(sp) == 1:
        # 결과 배열을 0으로 채우고
        out_arg1 = np.zeros(n)
        # out_arg2를 0으로 설정 후 함수 종료
        out_arg2 = 0

    # sp 배열의 길이가 1이 아닌 경우
    else:
        # sp 배열에서 값 추출하여 temp에 저장
        temp = sp[:,1]
        # temp 배열에서 국소 최소값과 국소 최대값의 위치 찾기
        temp_min = argrelextrema(temp, np.less)[0]
        temp_max = argrelextrema(temp, np.greater)[0]

        # 최소값 최대값의 위치를 병합
        t1 = np.concatenate((temp_max, temp_min))
        t2 = np.concatenate((temp[temp_max], temp[temp_min]))

        # 위치 + 해당 값 배열 생성
        rp_t = np.column_stack((sp[t1[np.argsort(t1)],0], t2[np.argsort(t1)]))

        if len(rp_t) == 0:
            return [], 0

        # rp_t와 동일한 구조를 가지고 0으로 채워진 rp 배열 생성
        rp = np.zeros_like(rp_t)

        # 처음 두 개 값 설정
        rp[0,0] = rp_t[0,0]
        rp[1,0] = rp_t[1,0]

        # 첫 번째 값이 두 번째 값보다 작으면
        if rp_t[0,1] < rp_t[1,1]:
            # +1
            rp[0,1] = 1
            # 아니면 1
            rp[1,1] = -1
        else:
            rp[0,1] = -1
            rp[1,1] = 1

        # 나머지 값은 이전 값과 비교하여 1 -1 설정
        for i in range(2, len(rp_t)):
            rp[i,0] = rp_t[i,0]
            if rp_t[i-1,1] < rp_t[i,1]:
                rp[i,1] = -1
            else:
                rp[i,1] = 1

        # out_arg1 배열 초기화
        out_arg1 = np.zeros(n)

        # rp 배열에 대하여 선형회귀 실행
        for i in range(len(rp)-1):
            # 1차 다항식 회귀 계산식 : 기울기와 y 절편을 반환함
            a = np.polyfit(rp[i:i+2,0], rp[i:i+2,1], 1)

            # y = mx + b 계산식 수행
            for j in range(int(rp[i,0]), int(rp[i+1,0])):
                out_arg1[j] = j*a[0] + a[1]

        # 나머지 범위에 대해서는 out_arg1을 0으로 설정
        for i in range(int(rp[-1,0]), n):
            out_arg1[i] = 0

        # rp 배열의 위치에 해당하는 인덱스에 값을 설정하여 최종 out_arg1 배열 구성
        # 세그먼트에 대한 예측 값을 할당함
        out_arg1[rp[:,0].astype(int)] = rp[:,1]

        # 배열의 마지막 위치값 저장
        out_arg2 = rp[-1,0]

    return out_arg1, out_arg2



# 2. Preprocessing
import numpy as np
import scipy.io
from sklearn.preprocessing import StandardScaler, MinMaxScaler
from pathlib import Path
import pandas as pd

def FNC_02_Preprocessing(df): #df를 전달하는 것으로 수정함
    # 데이터 로딩
    data = pd.DataFrame()
    scaler = MinMaxScaler(feature_range=(0,1))
    # df['MKTCAP'] = df['MKTCAP'].str.replace(',', '').astype(float)

    # KOSPI KOSDAQ는 시가총액, 나머지는 종가를 기준으로 가격 값을 일정 범위로 스케일링
    # KOSPI KOSDAQ 경우 증자 감자가 때문에 가격이 급격하게 변할 수 있으므로 시가총액을 기준으로 함.
    for i, data in enumerate(df):
        scaler = StandardScaler()
        df['A1'] = scaler.fit_transform(df['MKTCAP'].values.reshape(-1, 1))

        scaler = MinMaxScaler(feature_range=(0, 1))
        df['A2'] = scaler.fit_transform(df['MKTCAP'].values.reshape(-1, 1))

        scaler = MinMaxScaler(feature_range=(-1, 1))
        df['A3'] = scaler.fit_transform(df['MKTCAP'].values.reshape(-1, 1))

        # print(f"Scaling: {i+1} Done.")

    # 주가의 Target으로 사용할 Price_Series_Transform
    # 지그재그형 주가를 T, P를 기준으로 smooting

    # PST 파라매터 설정
    # T=5  # time interval
    # P=1.00  # percentage

    # PST
    for j, data in enumerate(df):
        # nPST=0 means there is no BUY and SELL point by PST
        if df['A2'].size == 0 or len(df['A2']) <= 10:
            # too short time series to calculate pst
            df['PST'] = []
            df['nPST'] = 0
        else:
            df['PST'], df['nPST'] = FNC_Func_PST(T, P, df['A2'].values)

        #print(f"PST: {j+1} Done.")

    # Save
    return df['PST']


# T, P, gridSearch

In [None]:
import warnings

def trend_gridSearch(df, file_name):
  array_T = list(range(2, 11))
  array_P = [0.5 + 0.5 * i for i in range(int(10.00 / 0.5) + 1)]

  max_returns = float('-inf')  # Initialize max_returns to negative infinity
  best_T = None
  best_P = None

  for T in array_T:
      for P in array_P:
          try:
              # 트렌드 만들기
              result = FNC_02_Preprocessing(T, P, df)

              # Backtest--------------------------------------
              # 초기 자본금 설정
              initial_capital = 1000000000000000  # 1000조

              # 보유 주식 수와 자본금 추적
              shares_held = 0
              capital = initial_capital
              capital_history = [capital]

              # 매수, 매도, 또는 보유 결정에 따른 자본금 변화 계산
              for i in range(1, len(df)):
                with warnings.catch_warnings():
                    warnings.simplefilter("ignore", category=RuntimeWarning)
                    if result[i] == 1:  # Buy 시그널인 경우
                        shares_to_buy = capital // df['MKTCAP'][i]  # 보유 가능한 주식 수 계산
                        shares_held += shares_to_buy
                        capital -= shares_to_buy * df['MKTCAP'][i]
                    elif result[i] == -1:  # Sell 시그널인 경우
                        capital += shares_held * df['MKTCAP'][i]  # 보유 주식 매도
                        shares_held = 0

                    capital_history.append(capital + shares_held * df['MKTCAP'][i])  # 자본금 변화 추적
              # 수익률 계산
              returns = (capital_history[-1] - initial_capital) / initial_capital * 100

              # print(f"T: {T}, P: {P:.2f}, Returns: {returns}")

              # Update max_returns and corresponding T, P values
              if returns > max_returns:
                  max_returns = returns
                  best_T = T
                  best_P = P

          # 예외 처리
          except (IndexError, ValueError) as e:
              # print(f"{type(e).__name__} occurred. Skipping T: {T}, P: {P:.2f}")

              # Break out of the inner loop when either IndexError or ValueError occurs
              break

  print(f"Filename : {file_name}, Best T: {best_T}, Best P: {best_P:.2f}, Max Returns: {max_returns}")



# csv로 저장

In [None]:
file_name = "new_KR7051910008.csv"
#---------------- ** 추출해야 되는 파일 ** ---------------
# "new_KR7005930003.csv"
# "new_KR7000250001.csv"
# "new_KR7036570000.csv"
# "new_KR7051910008.csv"
# "new_KR7066700006.csv"
# "new_KR7066970005.csv"
# "new_KR7068760008.csv"
# "new_KR7078600004.csv"
# "new_KR7096770003.csv"
# "new_KR7128940004.csv"
# "new_KR7185750007.csv"
# "new_KR7192080000.csv"
# "new_KR7207940008.csv"
# "new_KR7225570001.csv"
# "new_KR7247540008.csv"
# "new_KR7251270005.csv"
# "new_KR7263750002.csv"
# "new_KR7293490009.csv"




# 데이터 불러오기
df = load_data(file_name)

# 시그널 열 저장------------------------------------------------------------------
#-------INDEX-------
df['RSI_sig'] = rsi_test(df)
df['SMA_sig'] = sma_test(df)
df['BBND_sig'] = bbnd_test(df)
df['ROC_sig'] = roc_test(df)
df['DPO_sig'] = dpo_test(df)
df['STOCH_sig'] = stoch_test(df)
df['MACD_sig'] = merged_macd_test(df)
df['GDC_sig'] = gdc_test(df)

#-------TREND-------
df['TREND'] = FNC_02_Preprocessing(df)
# --------------------------------------------------------------------------------

# 새로운 csv 열 구성하기
df.rename(columns={'Timestamp': 'TRD_DD'}, inplace=True)

# 여러 개의 열 선택 (열 이름 기반)
selected_columns = df[['TRD_DD', 'RSI_sig', 'SMA_sig', 'BBND_sig', 'ROC_sig', 'DPO_sig', 'STOCH_sig', 'MACD_sig', 'GDC_sig', 'TREND']]
selected_columns

If you do not want to mutate any gene, please set mutation_type=None.
If you do not want to mutate any gene, please set mutation_type=None.
If you do not want to mutate any gene, please set mutation_type=None.
If you do not want to mutate any gene, please set mutation_type=None.
If you do not want to mutate any gene, please set mutation_type=None.
If you do not want to mutate any gene, please set mutation_type=None.
If you do not want to mutate any gene, please set mutation_type=None.
If you do not want to mutate any gene, please set mutation_type=None.
If you do not want to mutate any gene, please set mutation_type=None.
A value is trying to be set on a copy of a slice from a DataFrame

See the caveats in the documentation: https://pandas.pydata.org/pandas-docs/stable/user_guide/indexing.html#returning-a-view-versus-a-copy
  df['diff'][i] = diff
A value is trying to be set on a copy of a slice from a DataFrame

See the caveats in the documentation: https://pandas.pydata.org/pandas-doc

Unnamed: 0,TRD_DD,MKTCAP,RSI_sig,SMA_sig,BBND_sig,ROC_sig,DPO_sig,STOCH_sig,MACD_sig,GDC_sig,TREND,TREND_N
0,2001-04-25,837525832000,0,0,0,0,0,0,0,0,0.0,-1
1,2001-04-26,798870793600,0,0,0,0,0,0,0,0,0.0,0
2,2001-04-27,818198312800,0,0,0,0,0,0,0,0,0.0,0
3,2001-04-30,824640819200,0,0,0,0,0,0,0,0,0.0,0
4,2001-05-02,837525832000,0,0,0,0,0,0,0,0,0.0,0
...,...,...,...,...,...,...,...,...,...,...,...,...
5446,2023-05-08,51250041018000,1,0,0,0,0,0,0,-1,0.0,0
5447,2023-05-09,50967671646000,1,0,0,0,1,0,0,0,0.0,0
5448,2023-05-10,50049971187000,1,0,0,0,0,0,0,0,0.0,0
5449,2023-05-11,48779309013000,1,0,0,0,0,0,0,0,0.0,0


# 간단한 BACKTESTING

In [None]:
rsi = rsi_backtesting(df)
sma = sma_backtesting(df)
bbnd = bbnd_backtesting(df)
roc = roc_backtesting(df)
dpo = dpo_backtesting(df)
stoch = stoch_backtesting(df)
macd = macd_backtesting(df)
gdc = gdc_backtesting(df)

print("rsi : ", rsi)
print("sma : ", sma)
print("bbnd : ", bbnd)
print("roc : ", roc)
print("dpo : ", dpo)
print("stoch : ", stoch)
print("macd : ", macd)
print("gdc : ", gdc)

rsi :  20066.869153998214
sma :  3045.03339732939
bbnd :  2285.42553200056
roc :  7721.94519032216
dpo :  67609.5147065528
stoch :  327.23389456257
macd :  1274.8298207101
gdc :  1424.8431786046901
