# LSTM

Model : LSTM

Data : KOSDAQ 2020.01.01 ~ 2024.04.30

Feature : '시가' (open), '고가' (high), '저가' (low), '종가' (close), '거래량' (volume)

Scaler : StandardScaler (fit separately for each ticker)

## Data Load

In [1]:
import pandas as pd
import numpy as np
import tensorflow as tf
from sklearn.preprocessing import StandardScaler
from tensorflow.keras.models import Sequential
from tensorflow.keras.layers import LSTM, Dropout, Dense
from sklearn.model_selection import train_test_split
from tensorflow.keras.callbacks import Callback, ModelCheckpoint, EarlyStopping
from sklearn.metrics import f1_score, precision_score, recall_score
from sklearn.utils.class_weight import compute_class_weight
import matplotlib.pyplot as plt

In [2]:
# Location of the raw KOSDAQ daily price CSV.
file_path = '/Users/yangdong-yeong/Desktop/Fnguide/data/kosdaq_2020~2024.csv'

# Load the CSV. Reading 'ticker' as text keeps any leading zeros that are
# present in the file and stops pandas from inferring a dtype for that column.
df = pd.read_csv(file_path, dtype={'ticker': str})

# Left-pad every ticker with '0' up to six characters, in case the leading
# zeros were already lost when the file was written.
df['ticker'] = df['ticker'].astype(str).str.zfill(6)

# Preview the first rows after the conversion.
print(df.head(5))

  df = pd.read_csv(file_path)


            날짜    시가    고가    저가    종가     거래량       등락률  ticker
0   2020-01-02  2820  2845  2780  2795  131742 -0.886525  060310
1   2020-01-03  2795  2825  2775  2810  100374  0.536673  060310
2   2020-01-06  2800  2805  2650  2685  201779 -4.448399  060310
3   2020-01-07  2685  2745  2685  2745   62241  2.234637  060310
4   2020-01-08  2720  2745  2580  2585  203909 -5.828780  060310
5   2020-01-09  2645  2730  2640  2700   68577  4.448743  060310
6   2020-01-10  2710  2785  2710  2785   71797  3.148148  060310
7   2020-01-13  2770  2835  2740  2815  100353  1.077199  060310
8   2020-01-14  2830  2845  2795  2805   83827 -0.355240  060310
9   2020-01-15  2805  2825  2780  2800   81013 -0.178253  060310
10  2020-01-16  2825  2950  2800  2905  308911  3.750000  060310
11  2020-01-17  2900  2985  2885  2930  105773  0.860585  060310
12  2020-01-20  2965  2965  2860  2880  128911 -1.706485  060310
13  2020-01-21  2865  2920  2850  2890   88842  0.347222  060310
14  2020-01-22  2890  293

In [4]:
# Show the (rows, columns) dimensions of the loaded frame.
df.shape

(1597711, 8)

## Data Processing

In [5]:
# Parse the date column into datetime values.
df['날짜'] = pd.to_datetime(df['날짜'])

# Split the frame into one sub-frame per ticker.
# A single groupby pass replaces one full boolean scan of df per ticker.
# sort=False yields the groups in order of first appearance, which is the
# same order df['ticker'].unique() returns.
tickers = df['ticker'].unique()
ticker_df = {ticker: group for ticker, group in df.groupby('ticker', sort=False)}

In [None]:
# Standardise the price/volume features, with one scaler per ticker.
feature_columns = ['시가', '고가', '저가', '종가', '거래량']
scalers = {ticker: StandardScaler() for ticker in tickers}

for ticker in tickers:
    # Work on an independent copy so the per-ticker slice taken from df is
    # never written through.
    df_copy = ticker_df[ticker].copy()
    # Cast to float first so the scaled values are not assigned into
    # integer-typed columns.
    df_copy[feature_columns] = df_copy[feature_columns].astype(float)
    # Fit on this ticker's rows and overwrite the features with the result.
    df_copy[feature_columns] = scalers[ticker].fit_transform(df_copy[feature_columns])
    # Store the scaled copy back so later steps see the scaled values.
    ticker_df[ticker] = df_copy

# NOTE(review): each scaler is fit on the ticker's full history, so statistics
# from rows that later land in the evaluation split influence the training
# features. Consider fitting on the training period only.

# Save an intermediate file because of memory pressure.
all_data = pd.concat([ticker_df[ticker] for ticker in tickers], axis=0, ignore_index=True)
all_data.to_csv('/Users/yangdong-yeong/Downloads/ticker_df_combined.csv', index=False)

In [None]:
# Reload the scaled intermediate CSV.
# 'ticker' is read as text: without an explicit dtype pandas may parse the
# codes as integers, which drops their leading zeros.
file_path = '/Users/yangdong-yeong/Downloads/ticker_df_combined.csv'
ticker_df = pd.read_csv(file_path, dtype={'ticker': str})

# Convert the date column to datetime values.
ticker_df['날짜'] = pd.to_datetime(ticker_df['날짜'])

# Split into one sub-frame per ticker with a single groupby pass.
# sort=False keeps the order of first appearance, matching unique().
tickers = ticker_df['ticker'].unique()
ticker_data = {ticker: group for ticker, group in ticker_df.groupby('ticker', sort=False)}

In [None]:
# Build sliding-window sequences and their binary look-ahead labels.
def create_sequences_and_labels(df, sequence_length, threshold=20, look_forward=30):
    """Slice one frame into fixed-length feature windows with binary labels.

    Every window holds ``sequence_length`` consecutive rows of the five
    feature columns ('시가', '고가', '저가', '종가', '거래량'). Its label is 1
    when any '등락률' value in the ``look_forward`` rows immediately after
    the window is greater than or equal to ``threshold``, otherwise 0.

    Args:
        df: Frame holding the feature columns and '등락률'. Rows are used in
            their existing order; no sorting is done here.
        sequence_length: Number of rows in each input window.
        threshold: Minimum '등락률' value that makes a label positive.
        look_forward: Number of rows after the window inspected for the label.

    Returns:
        A ``(sequences, labels)`` tuple. ``sequences`` has shape
        ``(n_windows, sequence_length, 5)`` and ``labels`` is an integer
        array of shape ``(n_windows,)``. When ``df`` has fewer than
        ``sequence_length + look_forward`` rows, two empty arrays are returned.
    """
    feature_columns = ['시가', '고가', '저가', '종가', '거래량']

    n_windows = len(df) - sequence_length - look_forward + 1
    if n_windows < 1:
        # Not enough rows for even one window plus its look-ahead span.
        return np.array([]), np.array([])

    # Leave pandas once, outside the loop: per-window column selection and
    # iloc slicing on the DataFrame dominate the run time otherwise.
    features = df[feature_columns].values
    is_hit = (df['등락률'] >= threshold).values

    sequences = [features[start:start + sequence_length] for start in range(n_windows)]
    labels = [
        is_hit[start + sequence_length:start + sequence_length + look_forward].any()
        for start in range(n_windows)
    ]
    return np.array(sequences), np.array(labels).astype(int)
    
# Window configuration: each sample covers the previous 180 rows of the
# five feature columns.
sequence_length = 180
num_features = 5

# Build the (sequences, labels) pair for every ticker.
ticker_sequences_and_labels = {}
for ticker in tickers:
    ticker_sequences_and_labels[ticker] = create_sequences_and_labels(
        ticker_data[ticker], sequence_length
    )

# Keep only the tickers that produced at least one window.
non_empty_pairs = [
    (sequences, labels)
    for sequences, labels in ticker_sequences_and_labels.values()
    if sequences.size > 0
]

# Stack everything into one sequence array and one label array.
if non_empty_pairs:
    all_sequences = np.concatenate([sequences for sequences, _ in non_empty_pairs])
    all_labels = np.concatenate([labels for _, labels in non_empty_pairs])
else:
    all_sequences, all_labels = np.array([]), np.array([])

print(f"Combined Sequences Shape: {all_sequences.shape if all_sequences.size > 0 else 'No sequences available'}")
print(f"Combined Labels Shape: {all_labels.shape if all_labels.size > 0 else 'No labels available'}")


In [None]:
# LSTM classifier: two stacked LSTM layers, each followed by dropout,
# feeding a single sigmoid output unit.
model = Sequential()
model.add(LSTM(64, return_sequences=True, input_shape=(sequence_length, num_features)))
model.add(Dropout(0.5))
model.add(LSTM(32))
model.add(Dropout(0.5))
model.add(Dense(1, activation='sigmoid'))

# Compile with Adam and binary cross-entropy, tracking accuracy, precision
# and recall.
adam_optimizer = tf.keras.optimizers.legacy.Adam(learning_rate=0.001)
tracked_metrics = ['accuracy', tf.keras.metrics.Precision(), tf.keras.metrics.Recall()]
model.compile(
    optimizer=adam_optimizer,
    loss='binary_crossentropy',
    metrics=tracked_metrics,
)

In [None]:
# Callback that tracks precision, recall and F1 on a held-out set.
class F1ScoreCallback(tf.keras.callbacks.Callback):
    """Record precision, recall and F1 on a fixed dataset after every epoch.

    Predictions are thresholded at 0.5. The per-epoch scores are kept in
    ``precision_scores``, ``recall_scores`` and ``f1_scores`` and can be
    drawn with :meth:`plot_metrics`.
    """

    def __init__(self, X_val, y_val):
        """Store the evaluation inputs ``X_val`` and their labels ``y_val``."""
        super().__init__()
        self.X_val = X_val
        self.y_val = y_val
        self.precision_scores = []
        self.recall_scores = []
        self.f1_scores = []

    def on_epoch_end(self, epoch, logs=None):
        """Score the current model on the stored data and append the results.

        The F1 score is also written to ``logs['val_f1']`` so callbacks that
        run after this one, and the training history, can read it.
        """
        # verbose=0 keeps predict from printing a progress bar every epoch.
        probabilities = self.model.predict(self.X_val, verbose=0)
        # Flatten so predictions are a 1-D label vector for scikit-learn.
        val_predict = (probabilities > 0.5).astype(int).ravel()
        val_targ = self.y_val

        # zero_division=0 reports 0 instead of warning when a class is never
        # predicted.
        precision = precision_score(val_targ, val_predict, zero_division=0)
        recall = recall_score(val_targ, val_predict, zero_division=0)
        f1 = f1_score(val_targ, val_predict, zero_division=0)

        self.precision_scores.append(precision)
        self.recall_scores.append(recall)
        self.f1_scores.append(f1)

        if logs is not None:
            logs['val_f1'] = f1

    def plot_metrics(self):
        """Plot the recorded F1, precision and recall against the epoch number."""
        epochs = range(1, len(self.f1_scores) + 1)
        plt.figure(figsize=(10, 6))
        plt.plot(epochs, self.f1_scores, label='F1 Score')
        plt.plot(epochs, self.precision_scores, label='Precision')
        plt.plot(epochs, self.recall_scores, label='Recall')
        plt.xlabel('Epochs')
        plt.ylabel('Score')
        plt.legend()
        plt.title('Model Performance Metrics')
        plt.grid(True)
        plt.show()

In [None]:
# Split into train and test sets, carrying each sample's position in
# all_sequences along so predictions can be traced back later.
# NOTE(review): this is a random split over sliding windows. Neighbouring
# windows of the same ticker share most of their rows, so near-duplicates of
# test samples can land in the training set. Consider a chronological or
# per-ticker split.
X_train, X_test, y_train, y_test, indices_train, indices_test = train_test_split(
    all_sequences, all_labels, np.arange(all_sequences.shape[0]), test_size=0.2, random_state=42)


# Path template for saved checkpoints.
checkpoint_path = '/Users/yangdong-yeong/Desktop/Fnguide/Check_Point/model_{epoch:02d}-{val_loss:.2f}.h5'

# Create the callback instances.
# NOTE(review): the test split is also used as the validation set that drives
# early stopping and checkpoint selection, so scores on X_test are optimistic.
f1_callback = F1ScoreCallback(X_test, y_test)
early_stopping = EarlyStopping(monitor='val_loss', patience=3, restore_best_weights=True)
model_checkpoint_callback = ModelCheckpoint(
    filepath=checkpoint_path,
    save_weights_only=True,
    monitor='val_loss',
    mode='min',
    save_best_only=True)


# Compute balanced class weights.
# Key the weights by the actual class labels, not by position:
# dict(enumerate(...)) would attach the weight to label 0 if y_train
# contained only label 1.
train_classes = np.unique(y_train)
class_weights = compute_class_weight('balanced', classes=train_classes, y=y_train)
class_weight_dict = {
    int(label): float(weight) for label, weight in zip(train_classes, class_weights)
}

In [None]:
# Train the model.
training_callbacks = [f1_callback, early_stopping, model_checkpoint_callback]

history = model.fit(
    x=X_train,
    y=y_train,
    batch_size=64,
    epochs=10,
    verbose=2,
    callbacks=training_callbacks,
    validation_data=(X_test, y_test),
    class_weight=class_weight_dict,  # apply the class weights
)

# Plot the per-epoch metrics once training has finished.
f1_callback.plot_metrics()

In [None]:
# Predict on the test split and threshold at 0.5.
predictions = model.predict(X_test)
# Flatten to a 1-D label vector so each printed label is a scalar.
predicted_labels = (predictions > 0.5).astype(int).ravel()

# Positions within X_test predicted positive, and the matching positions in
# all_sequences.
positive_indices = np.where(predicted_labels == 1)[0]
positive_actual_indices = indices_test[positive_indices]
positive_predicted_labels = predicted_labels[positive_indices]

# Map every row of all_sequences back to its ticker.
# indices_test indexes all_sequences, not the rows of df, so df.loc cannot be
# used for this lookup. all_sequences was built by concatenating the non-empty
# per-ticker arrays in the order of ticker_sequences_and_labels, so repeating
# each ticker by its window count reproduces that layout.
tickers_with_sequences = [
    ticker
    for ticker, (sequences, _) in ticker_sequences_and_labels.items()
    if sequences.size > 0
]
sequence_counts = [
    len(ticker_sequences_and_labels[ticker][0]) for ticker in tickers_with_sequences
]
sequence_tickers = np.repeat(tickers_with_sequences, sequence_counts)

positive_tickers = sequence_tickers[positive_actual_indices]
actual_labels = y_test[positive_indices]  # true labels of the positive predictions

# Print one line per sample predicted positive.
for idx, ticker, pred_label, act_label in zip(positive_actual_indices, positive_tickers, positive_predicted_labels, actual_labels):
    print(f"Original Index: {idx}, Ticker: {ticker}, Predicted: {pred_label}, Actual: {act_label}")

In [None]:
total_samples = 0
positive_samples = 0

# Count all labels and the positive labels across every ticker.
for sequences, labels in ticker_sequences_and_labels.values():
    total_samples += len(labels)
    # Vectorised sum. int() keeps the counter an integer even for the empty
    # float arrays returned for tickers with too little data.
    positive_samples += int(np.sum(labels))

# Share of positive labels. Report NaN instead of raising when no samples
# exist.
target_ratio = positive_samples / total_samples if total_samples else float('nan')

print(f"Total samples: {total_samples}")
print(f"Positive samples: {positive_samples}")
print(f"Target ratio: {target_ratio:.2f}")

Total samples: 1284939
Positive samples: 1091650
Target ratio: 0.85
