In [1]:
import os
import urllib.request
import json
import pandas as pd
import matplotlib.pyplot as plt
from transformers import AutoTokenizer, AutoModelForSequenceClassification
import torch
from bs4 import BeautifulSoup
import requests
import time
from sklearn.preprocessing import MinMaxScaler
from sklearn.model_selection import train_test_split
from tensorflow.keras.models import Sequential
from tensorflow.keras.layers import GRU, Dense, Dropout
from tensorflow.keras.callbacks import EarlyStopping
from tensorflow.keras.optimizers import Adam

In [2]:
plt.rcParams['font.family'] = 'Malgun Gothic'  # Windows: 'Malgun Gothic', MacOS: 'AppleGothic', Linux: 'NanumGothic'
plt.rcParams['axes.unicode_minus'] = False     # 그래프에서 마이너스 기호 깨짐 방지

# 환경 변수에서 민감한 정보를 불러옴
client_id = os.getenv('NAVER_CLIENT_ID', 'default_client_id')
client_secret = os.getenv('NAVER_CLIENT_SECRET', 'default_client_secret')

In [3]:
# 업비트 데이터를 가져오는 함수
def fetch_upbit_data(market="KRW-BTC", count=200):
    url = "https://api.upbit.com/v1/candles/days"
    params = {"market": market, "count": count}
    response = requests.get(url, params=params)

    if response.status_code != 200:
        print(f"업비트 API에서 데이터를 가져오지 못했습니다. 상태 코드: {response.status_code}")
        return None

    data = response.json()

    # 데이터프레임으로 변환하고 필요한 열만 선택
    df = pd.DataFrame(data)
    df = df[["candle_date_time_kst", "trade_price"]]
    df.columns = ["date", "price"]
    df["date"] = pd.to_datetime(df["date"])
    df.sort_values(by="date", inplace=True)
    df.reset_index(drop=True, inplace=True)
    return df

In [4]:
# 뉴스 데이터를 가져오는 함수
def fetch_news_data(query, display=10):
    url = f"https://openapi.naver.com/v1/search/news.json?query={query}&display={display}"
    headers = {
        "X-Naver-Client-Id": client_id,
        "X-Naver-Client-Secret": client_secret
    }
    response = requests.get(url, headers=headers)

    if response.status_code != 200:
        print(f"뉴스 API에서 데이터를 가져오지 못했습니다. 상태 코드: {response.status_code}")
        return None

    data = response.json()
    return pd.DataFrame(data['items'])


In [5]:
# 감정 분석을 위한 함수
def analyze_sentiment(texts):
    tokenizer = AutoTokenizer.from_pretrained("nlptown/bert-base-multilingual-uncased-sentiment")
    model = AutoModelForSequenceClassification.from_pretrained("nlptown/bert-base-multilingual-uncased-sentiment")
    sentiments = []

    for text in texts:
        inputs = tokenizer(text, return_tensors="pt", truncation=True, padding=True, max_length=512)
        outputs = model(**inputs)
        sentiment = torch.argmax(outputs.logits, dim=1).item()
        sentiments.append(sentiment)

    return sentiments


In [6]:
# BTC/KRW 시장 데이터 가져오기
df_price = fetch_upbit_data()
if df_price is not None:
    print("샘플 가격 데이터:")
    print(df_price.head())

샘플 가격 데이터:
                 date       price
0 2024-05-25 09:00:00  96240000.0
1 2024-05-26 09:00:00  95610000.0
2 2024-05-27 09:00:00  96050000.0
3 2024-05-28 09:00:00  94600000.0
4 2024-05-29 09:00:00  93910000.0


In [7]:
# 뉴스 데이터 가져오기
news_df = fetch_news_data("비트코인")
if news_df is not None:
    print("샘플 뉴스 데이터:")
    print(news_df.head())
    news_df['sentiment'] = analyze_sentiment(news_df['title'])

뉴스 API에서 데이터를 가져오지 못했습니다. 상태 코드: 401


In [8]:
# 데이터 병합
if df_price is not None and news_df is not None:
    merged_df = df_price.copy()
    merged_df['sentiment'] = news_df['sentiment'].mean()
    print("병합된 데이터:")
    print(merged_df.head())

    # GRU 모델을 위한 데이터 전처리
    scaler = MinMaxScaler()
    merged_df['scaled_price'] = scaler.fit_transform(merged_df['price'].values.reshape(-1, 1))
    merged_df['scaled_sentiment'] = scaler.fit_transform(merged_df['sentiment'].values.reshape(-1, 1))

    # 멀티모달 데이터 준비
    def create_multimodal_sequences(prices, sentiments, seq_length):
        sequences = []
        for i in range(len(prices) - seq_length):
            seq = np.hstack((prices[i:i + seq_length], sentiments[i:i + seq_length]))
            label = prices[i + seq_length]
            sequences.append((seq, label))
        return sequences

    sequence_length = 10
    sequences = create_multimodal_sequences(
        merged_df['scaled_price'].values,
        merged_df['scaled_sentiment'].values,
        sequence_length
    )
    X, y = zip(*sequences)
    X = np.array(X)
    y = np.array(y)

    # 데이터를 학습 및 테스트용으로 분리
    X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=0.2, random_state=42)

    # GRU 모델 생성
    model = Sequential([
        GRU(128, input_shape=(sequence_length, 2), return_sequences=True),
        Dropout(0.2),
        GRU(64),
        Dropout(0.2),
        Dense(1)
    ])

    model.compile(optimizer=Adam(learning_rate=0.001), loss='mse')

    # 모델 학습
    history = model.fit(
        X_train, y_train,
        validation_data=(X_test, y_test),
        epochs=20,
        batch_size=16,
        callbacks=[EarlyStopping(patience=5, restore_best_weights=True)]
    )

    # 모델 평가
    loss = model.evaluate(X_test, y_test)
    print(f"테스트 손실: {loss}")

    # 예측값을 원래 스케일로 변환
    predictions = model.predict(X_test)
    original_scale_predictions = scaler.inverse_transform(predictions)
    original_scale_y_test = scaler.inverse_transform(y_test.reshape(-1, 1))

    # 예측값과 실제값 비교
    print("샘플 예측값과 실제값:")
    for pred, true in zip(original_scale_predictions[:5], original_scale_y_test[:5]):
        print(f"예측값: {pred[0]:.2f}, 실제값: {true[0]:.2f}")

    # 학습 기록 그래프
    plt.plot(history.history['loss'], label='훈련 손실')
    plt.plot(history.history['val_loss'], label='검증 손실')
    plt.legend()
    plt.show()