# 기술적 분석 (Technical Analysis)

In [None]:
import pandas as pd
import numpy as np
import warnings
warnings.filterwarnings('ignore')

# from config import UNIVERSE, START_DATE, END_DATE
UNIVERSE = 'KOSPI200'
START_DATE, END_DATE = '2020-01-01', '2024-12-31'

# Parameters read by the indicator, factor and signal functions in this file.
# All window lengths are counted in rows of the input series.
TECH_CONFIG = {
    # Simple moving-average windows (trend factor, MA-cross signal).
    'MA_SHORT': 20,
    'MA_LONG': 60,
    # Exponential moving-average spans.
    'EMA_SHORT': 12,
    'EMA_LONG': 26,
    # MACD: fast span, slow span and signal-line span.
    'MACD_FAST': 12,
    'MACD_SLOW': 26,
    'MACD_SIGNAL': 9,
    # RSI window and the thresholds used by the RSI signal.
    'RSI_PERIOD': 14,
    'RSI_OVERBOUGHT': 70,
    'RSI_OVERSOLD': 30,
    # Stochastic oscillator windows and thresholds.
    'STOCH_K': 14,
    'STOCH_D': 3,
    'STOCH_OVERBOUGHT': 80,
    'STOCH_OVERSOLD': 20,
    # Bollinger Bands: window and band width in standard deviations.
    'BB_PERIOD': 20,
    'BB_STD': 2,
    # Average True Range window (volatility factor).
    'ATR_PERIOD': 14,
    # Volume moving-average window (volume factor).
    'VOLUME_MA_PERIOD': 20,
}

In [None]:
# 기술적 지표 계산 함수

def calc_ma(close, period):
    """Return the simple moving average of ``close`` over ``period`` rows.

    Rows that do not yet have ``period`` observations behind them are NaN,
    which is pandas' default for a fixed-size rolling window.
    """
    window = close.rolling(period)
    return window.mean()

def calc_ema(close, period):
    """Return the exponential moving average of ``close`` with span ``period``.

    Uses the recursive (``adjust=False``) form, so the first output equals the
    first input and every row has a value.
    """
    smoother = close.ewm(span=period, adjust=False)
    return smoother.mean()

def calc_macd(close, fast, slow, signal):
    """Compute the MACD line, its signal line and the histogram.

    Args:
        close: Price series.
        fast: EMA span for the fast average.
        slow: EMA span for the slow average.
        signal: EMA span applied to the MACD line to get the signal line.

    Returns:
        Tuple ``(macd_line, signal_line, histogram)`` where the MACD line is
        fast EMA minus slow EMA and the histogram is MACD minus signal.
    """
    macd_line = calc_ema(close, fast) - calc_ema(close, slow)
    signal_line = calc_ema(macd_line, signal)
    return macd_line, signal_line, macd_line - signal_line

def calc_rsi(close, period):
    """Return the Relative Strength Index of ``close`` on a 0-100 scale.

    Average gain and average loss are simple rolling means over ``period``
    price changes (not Wilder's exponential smoothing).

    Args:
        close: Price series.
        period: Number of price changes in each averaging window.

    Returns:
        Series aligned with ``close``. The first ``period`` rows are NaN
        because ``period`` price changes need ``period + 1`` prices. A window
        with no losses yields 100; a window with neither gains nor losses
        (flat prices) yields NaN.
    """
    delta = close.diff()
    # Keep unknown changes as NaN. Zero-filling them (as a plain
    # ``where(cond, 0)`` does) would count the undefined first difference, and
    # any gap in ``close``, as a flat move and emit an RSI value built from
    # fewer than ``period`` real observations.
    known = delta.notna()
    gain = delta.where(delta > 0, 0.0).where(known)
    loss = (-delta).where(delta < 0, 0.0).where(known)
    avg_gain = gain.rolling(window=period).mean()
    avg_loss = loss.rolling(window=period).mean()
    # Division by a zero average loss gives inf, which maps to RSI = 100.
    rs = avg_gain / avg_loss
    return 100 - (100 / (1 + rs))

def calc_stochastic(high, low, close, k_period, d_period):
    """Compute the stochastic oscillator lines %K and %D.

    Args:
        high: High-price series.
        low: Low-price series.
        close: Close-price series.
        k_period: Look-back window for the highest high / lowest low.
        d_period: Window of the simple moving average applied to %K.

    Returns:
        Tuple ``(stoch_k, stoch_d)``. %K is the position of the close inside
        the look-back high/low range, scaled to 0-100; %D is its rolling mean.
    """
    floor = low.rolling(k_period).min()
    ceiling = high.rolling(k_period).max()
    percent_k = 100 * (close - floor) / (ceiling - floor)
    percent_d = percent_k.rolling(d_period).mean()
    return percent_k, percent_d

def calc_atr(high, low, close, period):
    """Return the Average True Range as a simple rolling mean.

    The true range of a row is the largest of: high minus low, the absolute
    gap between the high and the previous close, and the absolute gap between
    the low and the previous close. The row-wise maximum ignores NaN, so the
    first row (which has no previous close) uses high minus low.

    Args:
        high: High-price series.
        low: Low-price series.
        close: Close-price series.
        period: Window of the rolling mean over the true range.
    """
    last_close = close.shift(1)
    candidates = [
        high - low,
        abs(high - last_close),
        abs(low - last_close),
    ]
    true_range = pd.concat(candidates, axis=1).max(axis=1)
    return true_range.rolling(period).mean()

def calc_bollinger(close, period, std_dev):
    """Compute Bollinger Bands plus band width and %B.

    Args:
        close: Price series.
        period: Rolling window for the mean and standard deviation.
        std_dev: Number of standard deviations between the middle and each band.

    Returns:
        Tuple ``(middle, upper, lower, width, pct_b)``. ``width`` is the band
        span divided by the middle band; ``pct_b`` is the position of the
        close between the lower band (0) and the upper band (1). The standard
        deviation is pandas' rolling default (sample, ``ddof=1``).
    """
    window = close.rolling(period)
    middle = window.mean()
    offset = std_dev * window.std()
    upper = middle + offset
    lower = middle - offset
    span = upper - lower
    return middle, upper, lower, span / middle, (close - lower) / span

def calc_obv(close, volume):
    """Return On-Balance Volume: the running sum of signed volume.

    Each row's volume is added when the close rose, subtracted when it fell
    and ignored when it was unchanged. The first row has no prior close and is
    counted as unchanged.
    """
    price_change = close.diff()
    direction = np.sign(price_change)
    # The first difference is undefined; treat it as "no move".
    direction.iloc[0] = 0
    signed_volume = direction * volume
    return signed_volume.cumsum()

def calc_volume_oscillator(volume, short, long):
    """Return the percentage gap between a short and a long volume average.

    Args:
        volume: Volume series.
        short: Window of the short simple moving average.
        long: Window of the long simple moving average.

    Returns:
        ``(short_ma - long_ma) / long_ma * 100`` per row; NaN until both
        windows are full.
    """
    fast_avg = volume.rolling(short).mean()
    slow_avg = volume.rolling(long).mean()
    relative_gap = (fast_avg - slow_avg) / slow_avg
    return relative_gap * 100

In [None]:
# 팩터 (0-100) 및 시그널 (-1, 0, 1) 계산

def calc_factor_momentum(close, config):
    """Return the momentum factor, which is the RSI of ``close``.

    The window length is read from ``config['RSI_PERIOD']``.
    """
    rsi_window = config['RSI_PERIOD']
    return calc_rsi(close, rsi_window)

def calc_factor_trend(close, config):
    """Return the trend factor on a 0-100 scale.

    The factor is the percentage by which the short moving average
    (``config['MA_SHORT']``) sits above or below the long one
    (``config['MA_LONG']``), clipped to +/-10 % and mapped linearly so that
    -10 % -> 0, 0 % -> 50 and +10 % -> 100.
    """
    fast_ma = calc_ma(close, config['MA_SHORT'])
    slow_ma = calc_ma(close, config['MA_LONG'])
    pct_gap = (fast_ma / slow_ma - 1) * 100
    bounded_gap = pct_gap.clip(-10, 10)
    return (bounded_gap + 10) * 5

def calc_factor_volatility(high, low, close, config):
    """Return the volatility factor on a 0-100 scale (higher = calmer).

    ATR over ``config['ATR_PERIOD']`` rows is expressed as a percentage of the
    close, clipped to the range 0-5 %, and inverted so that 0 % -> 100 and
    5 % or more -> 0.
    """
    average_range = calc_atr(high, low, close, config['ATR_PERIOD'])
    range_pct = average_range / close * 100
    penalty = range_pct.clip(0, 5) * 20
    return 100 - penalty

def calc_factor_volume(close, volume, config):
    """Return the volume factor on a 0-100 scale from the volume oscillator.

    The oscillator (percentage gap between a short and a long volume moving
    average) is clipped to +/-50 and shifted by 50, so -50 % or less -> 0,
    0 % -> 50 and +50 % or more -> 100.

    Args:
        close: Close-price series. Unused; kept for a uniform factor signature.
        volume: Volume series.
        config: Parameter dict. ``VOLUME_MA_PERIOD`` is the long window. The
            optional ``VOLUME_MA_SHORT`` is the short window and defaults to a
            quarter of the long window (at least 1).

    Returns:
        Series aligned with ``volume``; NaN until the long window is full.
    """
    long_window = config['VOLUME_MA_PERIOD']
    # The short window used to be config['MA_SHORT'], which equals
    # VOLUME_MA_PERIOD (both 20) in TECH_CONFIG. Two identical windows make the
    # oscillator identically 0 and the factor a constant 50, so the short
    # window now comes from its own key and stays shorter than the long one.
    short_window = config.get('VOLUME_MA_SHORT', max(1, long_window // 4))
    vol_osc = calc_volume_oscillator(volume, short_window, long_window)
    return vol_osc.clip(-50, 50) + 50

def calc_signal_macd(close, config):
    """Return the MACD crossover signal as a series of -1, 0 and 1.

    A row is 1 when the MACD line is above the signal line and was at or below
    it on the previous row, -1 for the mirror-image downward cross, and 0
    otherwise. Spans come from ``MACD_FAST``, ``MACD_SLOW`` and ``MACD_SIGNAL``.
    """
    macd_line, signal_line, _ = calc_macd(
        close,
        config['MACD_FAST'],
        config['MACD_SLOW'],
        config['MACD_SIGNAL'],
    )
    prev_macd = macd_line.shift(1)
    prev_signal = signal_line.shift(1)

    bullish = (macd_line > signal_line) & (prev_macd <= prev_signal)
    bearish = (macd_line < signal_line) & (prev_macd >= prev_signal)

    out = pd.Series(0, index=close.index)
    out[bullish] = 1
    out[bearish] = -1
    return out

def calc_signal_rsi(close, config):
    """Return the RSI threshold-exit signal as a series of -1, 0 and 1.

    A row is 1 when RSI is above ``RSI_OVERSOLD`` and was at or below it on
    the previous row (leaving the oversold zone). It is -1 when RSI is below
    ``RSI_OVERBOUGHT`` and was at or above it on the previous row (leaving the
    overbought zone). All other rows are 0.
    """
    oversold = config['RSI_OVERSOLD']
    overbought = config['RSI_OVERBOUGHT']

    rsi_now = calc_rsi(close, config['RSI_PERIOD'])
    rsi_prev = rsi_now.shift(1)

    leaves_oversold = (rsi_now > oversold) & (rsi_prev <= oversold)
    leaves_overbought = (rsi_now < overbought) & (rsi_prev >= overbought)

    out = pd.Series(0, index=close.index)
    out[leaves_oversold] = 1
    out[leaves_overbought] = -1
    return out

def calc_signal_bb(close, config):
    """Return the Bollinger Band re-entry signal as a series of -1, 0 and 1.

    A row is 1 when the close is above the lower band and the previous close
    was at or below the previous lower band. It is -1 when the close is below
    the upper band and the previous close was at or above the previous upper
    band. All other rows are 0. Bands use ``BB_PERIOD`` and ``BB_STD``.
    """
    bands = calc_bollinger(close, config['BB_PERIOD'], config['BB_STD'])
    upper_band, lower_band = bands[1], bands[2]
    prev_close = close.shift(1)

    back_above_lower = (close > lower_band) & (prev_close <= lower_band.shift(1))
    back_below_upper = (close < upper_band) & (prev_close >= upper_band.shift(1))

    out = pd.Series(0, index=close.index)
    out[back_above_lower] = 1
    out[back_below_upper] = -1
    return out

def calc_signal_ma_cross(close, config):
    """Return the moving-average crossover signal as a series of -1, 0 and 1.

    A row is 1 when the short moving average (``MA_SHORT``) is above the long
    one (``MA_LONG``) and was at or below it on the previous row, -1 for the
    mirror-image downward cross, and 0 otherwise.
    """
    fast_ma = calc_ma(close, config['MA_SHORT'])
    slow_ma = calc_ma(close, config['MA_LONG'])
    prev_fast = fast_ma.shift(1)
    prev_slow = slow_ma.shift(1)

    golden_cross = (fast_ma > slow_ma) & (prev_fast <= prev_slow)
    dead_cross = (fast_ma < slow_ma) & (prev_fast >= prev_slow)

    out = pd.Series(0, index=close.index)
    out[golden_cross] = 1
    out[dead_cross] = -1
    return out

In [None]:
# 파이프라인 및 Export 함수

def process_single_ticker(ohlcv, config):
    """Compute all technical factors and signals for one ticker.

    Args:
        ohlcv: DataFrame with ``Close``, ``High``, ``Low`` and ``Volume`` columns.
        config: Parameter dict passed to every factor and signal function.

    Returns:
        DataFrame on the same index as ``ohlcv`` with four ``Factor_Tech_*``
        columns followed by four ``Signal_Tech_*`` columns.
    """
    close = ohlcv['Close']
    high = ohlcv['High']
    low = ohlcv['Low']
    volume = ohlcv['Volume']

    out = pd.DataFrame(index=ohlcv.index)

    # Continuous factors.
    out['Factor_Tech_Momentum'] = calc_factor_momentum(close, config)
    out['Factor_Tech_Trend'] = calc_factor_trend(close, config)
    out['Factor_Tech_Volatility'] = calc_factor_volatility(high, low, close, config)
    out['Factor_Tech_Volume'] = calc_factor_volume(close, volume, config)

    # Discrete -1 / 0 / 1 signals.
    out['Signal_Tech_MACD'] = calc_signal_macd(close, config)
    out['Signal_Tech_RSI'] = calc_signal_rsi(close, config)
    out['Signal_Tech_BB'] = calc_signal_bb(close, config)
    out['Signal_Tech_MA'] = calc_signal_ma_cross(close, config)

    return out

def generate_technical_factors(ohlcv_data, config):
    """Compute factors and signals for every ticker and stack them together.

    Args:
        ohlcv_data: Mapping of ticker -> OHLCV DataFrame (iterated with
            ``.items()``).
        config: Parameter dict passed to ``process_single_ticker``.

    Returns:
        DataFrame indexed by ``('date', 'ticker')`` and sorted by that index.
        An empty DataFrame is returned when no ticker could be processed.
    """
    results = []
    for ticker, ohlcv in ohlcv_data.items():
        try:
            df = process_single_ticker(ohlcv, config)
            df['ticker'] = ticker
            results.append(df)
        except Exception as e:
            # Best effort: one bad ticker is reported and skipped so the rest
            # of the universe is still processed.
            print(f"Error processing {ticker}: {e}")

    if not results:
        return pd.DataFrame()

    # Name the index level directly instead of renaming an 'index' column
    # after reset_index(). That column only exists when the input index is
    # unnamed; an index called e.g. 'Date' used to end in KeyError: 'date'.
    combined = pd.concat(results).rename_axis('date')
    return combined.set_index('ticker', append=True).sort_index()

def get_technical_factors(ohlcv_data=None, config=None, load_from_file=None):
    """Load technical factors from a Parquet file or compute them from OHLCV data.

    Args:
        ohlcv_data: Mapping of ticker -> OHLCV DataFrame; used when
            ``load_from_file`` is not given.
        config: Parameter dict; ``TECH_CONFIG`` is used when this is falsy.
        load_from_file: Parquet path. When truthy it takes precedence and the
            other arguments are ignored.

    Raises:
        ValueError: If neither ``load_from_file`` nor ``ohlcv_data`` is given.
    """
    if load_from_file:
        return pd.read_parquet(load_from_file)

    if ohlcv_data is not None:
        active_config = config if config else TECH_CONFIG
        return generate_technical_factors(ohlcv_data, active_config)

    raise ValueError("Either ohlcv_data or load_from_file must be provided")

def save_technical_factors(factors, path='technical_factors.parquet'):
    """Write ``factors`` to ``path`` as a Parquet file via ``to_parquet``."""
    factors.to_parquet(path=path)

def get_factor_columns():
    """Return the output column names grouped under 'factors' and 'signals'."""
    factor_names = [
        'Factor_Tech_Momentum',
        'Factor_Tech_Trend',
        'Factor_Tech_Volatility',
        'Factor_Tech_Volume',
    ]
    signal_names = [
        'Signal_Tech_MACD',
        'Signal_Tech_RSI',
        'Signal_Tech_BB',
        'Signal_Tech_MA',
    ]
    return {'factors': factor_names, 'signals': signal_names}