In [None]:
# LSTM Autoencoder
import numpy as np
import pandas as pd
from sklearn.preprocessing import StandardScaler
from tensorflow.keras.models import Model
from tensorflow.keras.layers import Input, LSTM, RepeatVector
import matplotlib.pyplot as plt

# ---------------------------------------------------------
# 1. Load and preprocess the data
# ---------------------------------------------------------
df = pd.read_csv("../data/USD_KRW.csv")

# Parse the dates BEFORE applying the cutoff so the filter is a real
# chronological comparison. Comparing the raw strings is lexicographic and is
# only correct when every value is zero-padded "YYYY.MM.DD".
df['date'] = pd.to_datetime(df['date'])
df = df.loc[df['date'] >= pd.Timestamp('1997.12.16'), ['date', 'close']].set_index('date')

# Return feature: close one row back divided by close five rows back, minus 1.
# NOTE(review): this is a 4-row change lagged by one row, not the usual
# 5-row return (close / close.shift(5) - 1) - confirm this is intended.
df['return'] = df['close'].shift() / df['close'].shift(5) - 1

# Rows with missing values (at least the first five, which have no complete
# history for the return) are removed.
df = df.dropna()

# ---------------------------------------------------------
# 2. 시계열 윈도우 만들기 함수
# ---------------------------------------------------------
def create_sequences(data, window_size):
    """Stack consecutive, overlapping slices of `data` into one array.

    A slice of length `window_size` is taken at every start offset from 0 up
    to (but not including) len(data) - window_size, so the result contains
    len(data) - window_size windows; the slice that would start at the last
    possible offset is not produced.

    Args:
        data: Sliceable sequence supporting len() (e.g. a NumPy array).
        window_size: Number of consecutive elements in each window.

    Returns:
        np.ndarray whose first axis indexes the windows.
    """
    n_windows = len(data) - window_size
    windows = [data[start:start + window_size] for start in range(n_windows)]
    return np.array(windows)

# ---------------------------------------------------------
# 3. Apply the sliding window
# ---------------------------------------------------------
WINDOW_SIZE = 5

# Only the 'return' column is windowed. The double brackets keep it 2-D, so
# every window has shape (WINDOW_SIZE, 1).
returns = df[['return']].to_numpy()
sequences = create_sequences(returns, WINDOW_SIZE)

# ---------------------------------------------------------
# 4. Train / validation / test split
# ---------------------------------------------------------
# sequences[i] covers rows i .. i + WINDOW_SIZE - 1 and is paired with the
# index entry at row i + WINDOW_SIZE, i.e. the row just after the window.
dates = df.index[WINDOW_SIZE:]
assert len(sequences) == len(dates)

train_end = pd.Timestamp("2018.01.01")
test_start = pd.Timestamp("2022.01.01")

# Boolean masks, one per subset, in chronological order:
# train < train_end <= validation < test_start <= test
train_idx = dates < train_end
val_idx = (dates >= train_end) & (dates < test_start)
test_idx = dates >= test_start

# Apply the masks
x_train, x_val, x_test = (
    sequences[mask] for mask in (train_idx, val_idx, test_idx)
)

# ---------------------------------------------------------
# 5. Standardisation (statistics are fitted on the training windows only)
# ---------------------------------------------------------
scaler = StandardScaler()

# The windows are flattened to (n_windows, WINDOW_SIZE) for the scaler, so
# each position inside the window is standardised as its own column, and
# then restored to (n_windows, WINDOW_SIZE, 1) for the LSTM.
flat_shape = (-1, WINDOW_SIZE)
lstm_shape = (-1, WINDOW_SIZE, 1)

x_train_2d = x_train.reshape(flat_shape)
x_val_2d = x_val.reshape(flat_shape)
x_test_2d = x_test.reshape(flat_shape)

scaler.fit(x_train_2d)
x_train_scaled = scaler.transform(x_train_2d).reshape(lstm_shape)
x_val_scaled = scaler.transform(x_val_2d).reshape(lstm_shape)
x_test_scaled = scaler.transform(x_test_2d).reshape(lstm_shape)

# ---------------------------------------------------------
# 6. LSTM autoencoder definition
# ---------------------------------------------------------
LATENT_DIM = 16

input_layer = Input(shape=(WINDOW_SIZE, 1))

# Encoder: compress the whole window into a single LATENT_DIM vector
# (return_sequences=False keeps only the final step's output).
encoder_lstm = LSTM(LATENT_DIM, activation='relu', return_sequences=False)
encoded = encoder_lstm(input_layer)

# Decoder: repeat the latent vector once per time step and let a 1-unit LSTM
# emit one value per step, giving an output shaped like the input window.
repeat_latent = RepeatVector(WINDOW_SIZE)
decoder_lstm = LSTM(1, activation='linear', return_sequences=True)
decoded = decoder_lstm(repeat_latent(encoded))

autoencoder = Model(inputs=input_layer, outputs=decoded)
autoencoder.compile(optimizer='adam', loss='mse')

# ---------------------------------------------------------
# 7. Train the model
# ---------------------------------------------------------
# Input and target are the same array: the network learns to reproduce its
# own input. shuffle=False keeps the batches in their original order.
fit_options = dict(
    epochs=50,
    batch_size=32,
    validation_data=(x_val_scaled, x_val_scaled),
    shuffle=False,
)
history = autoencoder.fit(x_train_scaled, x_train_scaled, **fit_options)

# ---------------------------------------------------------
# 8. Reconstruction error on the test windows
# ---------------------------------------------------------
x_test_pred = autoencoder.predict(x_test_scaled)

# One mean-squared error per window (averaged over time steps and features)
residual = x_test_scaled - x_test_pred
mse = np.power(residual, 2).mean(axis=(1, 2))

# ---------------------------------------------------------
# 9. Anomaly detection (automatic threshold; IQR rule used as an example)
# ---------------------------------------------------------
# NOTE(review): the quartiles are taken from the TEST reconstruction errors
# themselves, so the cutoff adapts to whatever the test period looks like -
# confirm this is preferred over a threshold fitted on train/validation errors.
q1, q3 = np.percentile(mse, [25, 75])
iqr = q3 - q1
threshold = q3 + 1.5 * iqr

# Windows whose error exceeds the upper fence are flagged
anomalies = mse > threshold
anomaly_dates = dates[test_idx][anomalies]

# ---------------------------------------------------------
# 10. 시각화
# ---------------------------------------------------------
import matplotlib.dates as mdates

lstm_test_dates = dates[test_idx]
mse_series = pd.Series(mse, index=lstm_test_dates)

# Closing prices restricted to the test period
price_series = df.loc[lstm_test_dates, 'close']

# Closing price on each flagged date
anomaly_prices = price_series.loc[anomaly_dates]

# Draw the price line and mark the flagged dates on top of it
fig, ax = plt.subplots(figsize=(15, 5))
ax.plot(price_series, label='Exchange Rate', color='blue')
ax.scatter(anomaly_dates, anomaly_prices, color='red', marker='x', label='Anomaly')
ax.set_title("Exchange Rate with Anomalies Detected by LSTM Autoencoder")
ax.set_xlabel("Date")
ax.set_ylabel("Exchange Rate")
ax.legend()
ax.grid(True)
fig.tight_layout()
plt.show()

In [None]:
# Isolation Forest
import numpy as np
import pandas as pd
import matplotlib.pyplot as plt
from sklearn.ensemble import IsolationForest
from sklearn.preprocessing import StandardScaler

# --- Data loading and preprocessing ---
# df: full frame indexed by date; data: working copy that receives the
# engineered columns further down.
df = pd.read_csv('../data/USD_KRW.csv')

# Parse the dates BEFORE applying the cutoff so the filter is a real
# chronological comparison. Comparing the raw strings is lexicographic and is
# only correct when every value is zero-padded "YYYY.MM.DD".
df['date'] = pd.to_datetime(df['date'])
df = df.loc[df['date'] >= pd.Timestamp('1997.12.16')].set_index('date')

data = df.copy()

# Snapshot taken before any engineered column is added to `data`.
X = data.values  # (samples, features)
dates = data.index

# -------------------------
# 3. Remove the trend (from the exchange rate)
# -------------------------
rolling_window = 20
# Trailing mean over up to `rolling_window` rows; min_periods=1 means the
# earliest rows are averaged over however many rows exist so far.
trailing_mean = data['close'].rolling(window=rolling_window, min_periods=1).mean()
data['close_detrended'] = data['close'].sub(trailing_mean)

# -------------------------
# 4. Train / test split
# -------------------------
test_start = pd.Timestamp("2022.01.01")
before_test = data.index < test_start
from_test_on = data.index >= test_start
train_data = data.loc[before_test]
test_data = data.loc[from_test_on]

# -------------------------
# 5. Feature selection and scaling
# -------------------------
# Every column except the raw 'close' is used as a feature (the original
# author notes that 'close' could also be kept).
features = data.columns.drop('close')

# Scaling statistics come from the training period only
scaler = StandardScaler()
scaler.fit(train_data[features])
X_train = scaler.transform(train_data[features])
X_test = scaler.transform(test_data[features])

# -------------------------
# 6. Train the Isolation Forest
# -------------------------
iso_forest = IsolationForest(contamination=0.02, random_state=42).fit(X_train)

# predict() returns 1 for normal points and -1 for anomalies
y_pred = iso_forest.predict(X_test)
anomalies = (y_pred == -1)

# -------------------------
# 7. Visualisation
# -------------------------
test_dates = test_data.index
price_series = test_data['close']
anomaly_dates = test_dates[anomalies]
anomaly_prices = price_series.loc[anomaly_dates]

# Price line for the test period with the flagged dates marked on top
fig, ax = plt.subplots(figsize=(15, 5))
ax.plot(test_dates, price_series, label='Exchange Rate', color='blue')
ax.scatter(anomaly_dates, anomaly_prices, color='red', marker='x', label='Isolation Forest Anomaly')
ax.set_title("Isolation Forest Anomaly Detection on Exchange Rate (Detrended)")
ax.set_xlabel("Date")
ax.set_ylabel("Exchange Rate")
ax.legend()
ax.grid(True)
fig.tight_layout()
plt.show()

In [None]:
# One-Class SVM
import pandas as pd
import numpy as np
import matplotlib.pyplot as plt
from sklearn.preprocessing import StandardScaler
from sklearn.svm import OneClassSVM

# --- Data loading and preprocessing ---
# df: frame indexed by date; df_detrended: copy whose 'close' column has the
# moving average removed.
df = pd.read_csv('../data/USD_KRW.csv')

# The date column of this CSV is named 'date' (the other cells reading the
# same file use it); the previous lookup of '날짜' raised a KeyError.
# Dates are parsed first so the cutoff is a chronological comparison rather
# than a lexicographic string comparison.
df['date'] = pd.to_datetime(df['date'])
df = df.loc[df['date'] >= pd.Timestamp('1997.12.16')].set_index('date')

# Trend removal: 5-row moving average. center=True averages rows on both
# sides of each point; min_periods=1 lets the edges use the rows available.
df_trend = df['close'].rolling(window=5, min_periods=1, center=True).mean()
df_detrended = df.copy()
# Use this cell's own frame; the previous code read `data`, a variable that
# only exists if the Isolation Forest cell happened to run first.
df_detrended['close'] = df['close'] - df_trend

# Standardise every column of the detrended frame
scaler = StandardScaler()
data_scaled = scaler.fit_transform(df_detrended)

# Fit the One-Class SVM on the complete scaled data set
svm = OneClassSVM(nu=0.05, kernel='rbf', gamma='scale').fit(data_scaled)

# Score the same rows the model was fitted on; -1 marks an anomaly
pred = svm.predict(data_scaled)
anomalies = (pred == -1)
anomaly_dates = df.index[anomalies]
# Prices come from the original (non-detrended) frame so they can be plotted
anomaly_prices = df['close'].loc[anomalies]

# Plot the full price history with the flagged dates marked on top
fig, ax = plt.subplots(figsize=(15, 5))
ax.plot(df.index, df['close'], label='Exchange Rate')
ax.scatter(anomaly_dates, anomaly_prices, color='red', marker='x', label='One-Class SVM Anomaly')
ax.set_title("One-Class SVM Anomaly Detection on Exchange Rate")
ax.set_xlabel("Date")
ax.set_ylabel("Exchange Rate")
ax.legend()
ax.grid(True)
fig.tight_layout()
plt.show()

In [None]:
# Univariate Z-score 기반 이상 탐지 파이프라인
import pandas as pd
import numpy as np
import matplotlib.pyplot as plt
from sklearn.metrics import accuracy_score, precision_score, recall_score, f1_score, classification_report, confusion_matrix
from scipy.stats import zscore
import seaborn as sns

# Load the data
# NOTE(review): absolute Google Drive path - this only resolves on a Colab
# session with Drive mounted; consider a configurable data directory.
df = pd.read_csv('/content/drive/MyDrive/프로젝트/일일환율.csv')
df = df.assign(**{'날짜': pd.to_datetime(df['날짜'])}).set_index('날짜')

# Log-return feature: log(close) of each row minus log(close) of the NEXT row.
# NOTE(review): diff(-1) yields a conventional "today vs. previous day" return
# only if the rows are ordered newest-first - confirm the file's ordering.
log_close = np.log(df['종가'])
df = df.assign(**{'return': log_close.diff(-1)}).dropna()

# Standardise the returns
df['z_score'] = zscore(df['return'])

# Automatic thresholds: IQR fences computed on the z-scores
q1, q3 = df['z_score'].quantile([0.25, 0.75])
iqr = q3 - q1
upper = q3 + 1.5 * iqr
lower = q1 - 1.5 * iqr

# A row is flagged (1) when its z-score lies outside either fence
above_upper = df['z_score'].gt(upper)
below_lower = df['z_score'].lt(lower)
df['anomaly'] = (above_upper | below_lower).astype(int)

# Build reference labels (replace this step if real labels are available):
# a row counts as an anomaly when |return| is strictly greater than 1.5%.
# NOTE(review): the labels are derived from the same 'return' series the
# detector uses, so the scores below measure agreement between two threshold
# rules rather than accuracy against independent ground truth.
threshold_label = 0.015
df['label'] = df['return'].abs().gt(threshold_label).astype(int)

# Evaluation metrics
y_true, y_flag = df['label'], df['anomaly']
accuracy = accuracy_score(y_true, y_flag)
precision = precision_score(y_true, y_flag)
recall = recall_score(y_true, y_flag)
f1 = f1_score(y_true, y_flag)

for caption, score in (
    ("📊 accuracy:", accuracy),
    ("📊 Precision:", precision),
    ("📊 Recall:", recall),
    ("📊 F1 Score:", f1),
):
    print(caption, round(score, 3))

# Confusion matrix (rows: reference label, columns: detector output)
cm = confusion_matrix(y_true, y_flag)
sns.heatmap(cm, annot=True, fmt='d')
plt.title('Confusion Matrix')
plt.show()

# Visualisation: closing price with the flagged rows marked on top
flagged = df['anomaly'] == 1

fig, ax = plt.subplots(figsize=(15, 5))
ax.plot(df.index, df['종가'], label='Exchange Rate')
ax.scatter(df.index[flagged], df['종가'][flagged], color='red', marker='x', label='Detected Anomaly')
ax.set_title("📈 Z-score 기반 환율 이상 탐지 결과")
ax.set_xlabel("Date")
ax.set_ylabel("Exchange Rate")
ax.legend()
ax.grid(True)
fig.tight_layout()
plt.show()

In [None]:
# Z-Score 기반 예측

import pandas as pd
import numpy as np
from xgboost import XGBClassifier
from sklearn.model_selection import train_test_split
from sklearn.metrics import classification_report, confusion_matrix, roc_auc_score, roc_curve

# === Parameters ===
window_size = 20      # number of past rows turned into lag features
future_window = 5     # number of upcoming rows checked for an outlier
z_thresh = 1.5        # |z-score| above which a row counts as an outlier

# === Data loading ===
df = pd.read_csv('../data/독립변수.csv')

# === Basic features ===
close_px = df['종가']
df['return'] = close_px.pct_change()

# Z-score of the close relative to its own 20-row rolling mean and std
roll20 = close_px.rolling(20)
df['zscore'] = (close_px - roll20.mean()) / roll20.std()

# === RSI 계산 함수 ===
def compute_rsi(series, period=14):
    """Relative Strength Index based on simple rolling means.

    Row-to-row changes are split into gains (positive changes, otherwise 0)
    and losses (magnitude of negative changes, otherwise 0). Each is averaged
    over a rolling window of `period` rows and combined as
    100 - 100 / (1 + mean_gain / mean_loss).

    Args:
        series: pandas Series of prices.
        period: Rolling window length in rows (default 14).

    Returns:
        pandas Series aligned with `series`; entries without a full window
        are NaN.
    """
    change = series.diff()

    # The undefined first change fails both comparisons and so counts as 0
    ups = change.where(change > 0, 0)
    downs = -change.where(change < 0, 0)

    mean_up = ups.rolling(period).mean()
    mean_down = downs.rolling(period).mean()

    strength = mean_up / mean_down
    return 100 - (100 / (1 + strength))

# === 향후 이상치 여부 타겟 생성 ===
def label_future_outliers(z_scores, window, threshold):
    """Flag rows that are followed by an outlier within the next `window` rows.

    For the row at position i, the rows at positions i+1 .. i+window are
    inspected. Positions are always row offsets (`.iloc`), independent of what
    kind of index `z_scores` carries; plain `[]` slicing on a Series can be
    interpreted as label-based for some index types.

    Args:
        z_scores: pandas Series of z-scores.
        window: Number of upcoming rows to inspect.
        threshold: A row is an outlier when |z-score| > threshold.

    Returns:
        list with one entry per row: 1 if any inspected row is an outlier,
        0 if none is, and np.nan when fewer than `window` rows remain or any
        inspected z-score is missing.
    """
    labels = []
    for pos in range(len(z_scores)):
        # Named `upcoming` so it does not shadow the module-level
        # `future_window` parameter.
        upcoming = z_scores.iloc[pos + 1:pos + 1 + window]
        if len(upcoming) < window or upcoming.isna().any():
            labels.append(np.nan)
        else:
            labels.append(int((upcoming.abs() > threshold).any()))
    return labels

# Target: 1 when any of the next `future_window` rows has |zscore| > z_thresh
df['is_outlier_future'] = label_future_outliers(df['zscore'], future_window, z_thresh)

# === Additional features ===
close_px = df['종가']

roll5 = close_px.rolling(5)
df['rolling_mean_5'] = roll5.mean()
df['rolling_std_5'] = roll5.std()
df['ema_5'] = close_px.ewm(span=5).mean()

# Bollinger-style bands: 5-row mean +/- two 5-row standard deviations
band_width = 2 * df['rolling_std_5']
df['bollinger_upper'] = df['rolling_mean_5'] + band_width
df['bollinger_lower'] = df['rolling_mean_5'] - band_width

# Z-score of the return against its own 30-row rolling mean and std
ret_roll30 = df['return'].rolling(30)
df['zscore_return'] = (df['return'] - ret_roll30.mean()) / ret_roll30.std()

df['momentum_5'] = close_px - close_px.shift(5)
df['rsi_14'] = compute_rsi(close_px, 14)

# === Lag features ===
# Collected first and attached in one step; the column order is
# lag_1, return_lag_1, lag_2, return_lag_2, ...
lagged = {}
for k in range(1, window_size + 1):
    lagged[f'lag_{k}'] = close_px.shift(k)
    lagged[f'return_lag_{k}'] = df['return'].shift(k)
df = pd.concat([df, pd.DataFrame(lagged)], axis=1)

# Every column except the first one is a candidate feature
# (presumably the first column is the date - verify against the CSV).
features = df.columns[1:]


# === Prepare the modelling data ===
# Rows with any missing value (warm-up rows of the rolling/lag features and
# the unlabeled tail) are discarded.
df_model = df.dropna(axis=0)
X = df_model[features].drop(columns='is_outlier_future')
y = df_model['is_outlier_future']

# === Train/Test split ===
# Chronological split: the first 70% of the rows train, the rest test
split_at = int(len(X) * 0.7)
X_train, X_test = X.iloc[:split_at], X.iloc[split_at:]
y_train, y_test = y.iloc[:split_at], y.iloc[split_at:]

# === Class-imbalance correction ===
# Ratio of negative to positive training rows
n_positive = sum(y_train)
pos_weight = (len(y_train) - n_positive) / n_positive

# === XGBoost model ===
# NOTE(review): newer XGBoost releases deprecate `use_label_encoder` -
# confirm against the installed version.
xgb_params = dict(
    n_estimators=100,
    max_depth=4,
    learning_rate=0.05,
    scale_pos_weight=pos_weight,
    use_label_encoder=False,
    eval_metric='logloss',
)
model = XGBClassifier(**xgb_params)
model.fit(X_train, y_train)

# === Evaluation ===
y_pred = model.predict(X_test)

print(confusion_matrix(y_test, y_pred))
print(classification_report(y_test, y_pred))

# === 피처 중요도 출력 (선택 사항) ===
import matplotlib.pyplot as plt
import seaborn as sns

feature_importances = pd.Series(model.feature_importances_, index=X.columns)

# The 15 largest importances, sorted once and reused for both axes
top_importances = feature_importances.sort_values(ascending=False).head(15)

plt.figure(figsize=(10, 6))
sns.barplot(x=top_importances, y=top_importances.index)
plt.title("Top 15 Feature Importances")
plt.show()