# RL 구현 참고용

In [87]:
#라이브러리
import math
import random
import numpy as np
import matplotlib
import matplotlib.pyplot as plt
from collections import namedtuple, deque
from itertools import count
from PIL import Image
import torch
import torch.nn as nn
import torch.optim as optim
import torch.nn.functional as F
import torchvision.transforms as T
import pandas as pd
device=torch.device("cuda" if torch.cuda.is_available() else "cpu") 

In [88]:
# 가짜 데이터 생성
temp=[random.randint(-10,100) for _ in range(100)]
property=[i*3.5+10+random.random()*2 for i in temp]
data=pd.DataFrame({'x':temp,'y':property})


In [89]:
#Environment 정의
from sklearn.linear_model import LinearRegression
model=LinearRegression().fit(np.array(temp).reshape(-1,1),np.array(property).reshape((-1,1)))
class Environment:
    def __init__(self,z,setPoint):
        self.z=z#현재 X값
        self.setPoint=setPoint#target point
        self.terminated=False#달성 여부
        self.state=None
    def reset(self):
        yPred=model.predict(np.array([self.z]).reshape(-1,1)).item()
        self.state=torch.tensor([self.setPoint-yPred])
        return self.state
    def step(self, action):
        if action==0:
            self.z-=1
        elif action==1:
            self.z+=1
        if self.z<-10 or self.z>100:
            reward=-100
            self.terminated=True
            return None, torch.tensor([reward]), self.terminated
        self.state=torch.tensor([self.setPoint-model.predict(np.array([self.z]).reshape(-1,1)).item()])
        if abs(self.state)<=5:
            self.terminated=True
        reward=1/abs(self.state)*5
        return self.state, reward, self.terminated
    def render(self):
        return self.z

In [90]:
#RL 모델 정의
class DQN(nn.Module):
    def __init__(self, outputs):
        super(DQN, self).__init__()
        self.linear1=nn.Linear(1,16,bias=True)
        self.linear2=nn.Linear(16,outputs,bias=True)
    def forward(self,x):
        x=x.to(device)
        x=F.relu(self.linear1(x))
        x=self.linear2(x)
        return torch.unsqueeze(F.log_softmax(x,dim=0),0)

In [91]:
# Select Action 정의
BATCH_SIZE=128
GAMMA=0.999
EPS_START=0.9
EPS_END=0.05
EPS_DECAY=200
TARGET_UPDATE=10
n_actions=2
policy_net=DQN(n_actions).to(device)
target_net=DQN(n_actions).to(device)
target_net.load_state_dict(policy_net.state_dict())
steps_done=0
def select_action(state):
    global steps_done
    sample=random.random()
    eps_threshold=EPS_END+(EPS_START-EPS_END)*math.exp(-1*steps_done/EPS_DECAY)
    steps_done+=1
    if sample>eps_threshold:
        with torch.no_grad():
            return policy_net(state).max(1)[1]
    else:
        return torch.tensor([random.randrange(n_actions)],device=device,dtype=torch.long)

In [92]:
#학습 정의
Transition=namedtuple('Transition',('state','action','next_state','reward'))
class ReplayMemory(object):
    def __init__(self,capacity):
        self.memory=deque([],maxlen=capacity)
    def push(self, *args):
        self.memory.append(Transition(*args))
    def sample(self, batch_size):
        return random.sample(self.memory,batch_size)
    def __len__(self):
        return len(self.memory)
optimizer=optim.RMSprop(policy_net.parameters())
memory=ReplayMemory(1000)

In [93]:
#Q learning
def optimize_model():
    if len(memory)<BATCH_SIZE:
        return
    transitions=memory.sample(BATCH_SIZE)
    batch=Transition(*zip(*transitions))
    non_final_mask=torch.tensor(tuple(map(lambda s: s is not None, batch.next_state)),device=device, dtype=torch.bool)
    non_final_next_states=torch.cat([s for s in batch.next_state if s is not None])
    state_batch=torch.stack(batch.state)
    action_batch=torch.cat(batch.action)
    reward_batch=torch.cat(batch.reward)
    state_action_values=policy_net(state_batch).squeeze().gather(1,action_batch.unsqueeze(1))
    next_state_values=torch.zeros(BATCH_SIZE,device=device)
    next_state_values[non_final_mask]=target_net(non_final_next_states.reshape(non_final_next_states.size()[0],1)).unsqueeze().max(1)[0].detach()
    expected_state_action_values=(next_state_values*GAMMA)+reward_batch
    criterion=nn.SmoothL1Loss()
    loss=criterion(state_action_values,expected_state_action_values.unsqueeze(1))
    optimizer.zero_grad()
    loss.backward()
    for param in policy_net.parameters():
        param.grad.data.clamp_(-1,1)
    optimizer.step()


In [94]:
def Action(x):
    if x.item()==0:
        return "Down"
    else:
        return "Up"
    
num_episodes=100
for i_episode in range(num_episodes):
    epMemory=list()
    z=random.randrange(-10,100)
    setPoint=random.randrange(math.floor(min(data['y'])), math.ceil(max(data['y'])))
    env=Environment(z=z,setPoint=setPoint)
    state=env.reset()
    print("setPoint", setPoint, "을 맞추기 위해")
    for t in count():
        action=select_action(torch.tensor([state]).float())
        next_state, reward, done=env.step(action.item())
        reward=torch.tensor([reward],device=device)
        print(t,"-현재 X",env.render(),"에서",Action(action),"하면 'setPoint-y'값은",next_state,"reward는",round(reward.item(),2))
        epMemory.append([state,action,next_state,reward])
        state=next_state
        optimize_model()
        if done:
            if env.render()>=-10 and env.render()<=100:
                _=[memory.push(epMemory[i][0],epMemory[i][1],epMemory[i][2],epMemory[i][3]) for i in range(len(epMemory))]
                print("성공!")
            else:
                print("실패")
            print("")
            break
        if t>=100000:
            print("중단!")
            break
    if i_episode % TARGET_UPDATE==0:
        target_net.load_state_dict(policy_net.state_dict())
    print('complete')


setPoint 83 을 맞추기 위해
0 -현재 X 40 에서 Down 하면 'setPoint-y'값은 tensor([-68.0167]) reward는 0.07
1 -현재 X 41 에서 Up 하면 'setPoint-y'값은 tensor([-71.5154]) reward는 0.07
2 -현재 X 40 에서 Down 하면 'setPoint-y'값은 tensor([-68.0167]) reward는 0.07
3 -현재 X 39 에서 Down 하면 'setPoint-y'값은 tensor([-64.5179]) reward는 0.08
4 -현재 X 38 에서 Down 하면 'setPoint-y'값은 tensor([-61.0191]) reward는 0.08
5 -현재 X 39 에서 Up 하면 'setPoint-y'값은 tensor([-64.5179]) reward는 0.08
6 -현재 X 38 에서 Down 하면 'setPoint-y'값은 tensor([-61.0191]) reward는 0.08
7 -현재 X 37 에서 Down 하면 'setPoint-y'값은 tensor([-57.5204]) reward는 0.09
8 -현재 X 36 에서 Down 하면 'setPoint-y'값은 tensor([-54.0216]) reward는 0.09
9 -현재 X 37 에서 Up 하면 'setPoint-y'값은 tensor([-57.5204]) reward는 0.09
10 -현재 X 38 에서 Up 하면 'setPoint-y'값은 tensor([-61.0191]) reward는 0.08
11 -현재 X 39 에서 Up 하면 'setPoint-y'값은 tensor([-64.5179]) reward는 0.08
12 -현재 X 40 에서 Up 하면 'setPoint-y'값은 tensor([-68.0167]) reward는 0.07
13 -현재 X 39 에서 Down 하면 'setPoint-y'값은 tensor([-64.5179]) reward는 0.08
14 -현재 X 38 에서 Down 하

TypeError: unsqueeze() missing 1 required positional arguments: "dim"

# 포트폴리오 최적화 코드

In [None]:
#라이브러리
import math
import random
import numpy as np
import matplotlib
import matplotlib.pyplot as plt
from collections import namedtuple, deque
from itertools import count
from PIL import Image
import torch
import torch.nn as nn
import torch.optim as optim
import torch.nn.functional as F
import torchvision.transforms as T
import pandas as pd
device=torch.device("cuda" if torch.cuda.is_available() else "cpu") 

## 필요 함수, 객체들 정의

In [None]:
#미국 주식 데이터 데이터 
import yfinance as yf
from torch.utils.data import Dataset, DataLoader
tickers = ["XLK","XLF","XLV","XLY","XLP","XLC","XLI","XLE","XLB","XLU","XLRE"]

df_close = yf.download(
    tickers,
    start="2015-01-01",
    interval="1d",
    auto_adjust=True,
    progress=False
)["Close"]
split_idx = int(len(df_close) * 0.8)
train_close = df_close.iloc[:split_idx]
test_close  = df_close.iloc[split_idx:]
class TickerSeqDataset(Dataset):
    def __init__(self, close_df: pd.DataFrame, window: int, use_zscore: bool = True):
        self.close_df = close_df
        self.window = window
        self.use_zscore = use_zscore

        self.tickers = list(close_df.columns)
        self.series = {}
        self.mu = {}
        self.sd = {}
        for t in self.tickers:
            s = close_df[t].dropna().values.astype(np.float32)
            self.series[t] = s

            if use_zscore:
                m = float(s.mean())
                v = float(s.std())
                self.mu[t] = m
                self.sd[t] = v if v > 1e-8 else 1.0
        self.index = []
        for ti, t in enumerate(self.tickers):
            L = len(self.series[t])
            if L >= window + 1:
                for end in range(window - 1, L - 1):
                    self.index.append((ti, end))

    def __len__(self):
        return len(self.index)

    def __getitem__(self, idx):
        ti, end = self.index[idx]
        t = self.tickers[ti]
        s = self.series[t]

        x = s[end - self.window + 1 : end + 1]  
        y = s[end + 1]                            
        if self.use_zscore:
            x = (x - self.mu[t]) / self.sd[t]
            y = (y - self.mu[t]) / self.sd[t]

        x_seq = torch.from_numpy(x).unsqueeze(-1)  
        y_t = torch.tensor([y], dtype=torch.float32)      
        return x_seq, y_t

window_size = 30
batch_size = 64

train_ds = TickerSeqDataset(train_close, window=window_size, use_zscore=True)
test_ds  = TickerSeqDataset(test_close,  window=window_size, use_zscore=True)

train_loader = DataLoader(train_ds, batch_size=batch_size, shuffle=True, drop_last=True)
test_loader  = DataLoader(test_ds,  batch_size=batch_size, shuffle=False, drop_last=False)

# 4) 모델 차원 (종목 단위이므로 1->1)
input_dim = 1
output_dim = 1


In [None]:
#한국 데이터 준비
import FinanceDataReader as fdr


In [None]:
# 모델들 사전 학습
from sklearn.linear_model import LinearRegression
us_lstm = us_bol(
        input_dim=input_dim,
        hidden_dim=CONFIG["hidden_dim"],
        out_dim=output_dim,
        num_layers=CONFIG["num_layers"],
        window_size=20
    )
us_bol_trained = train_model(us_lstm, train_loader, CONFIG)
us_lstm_loss=evaluate_model(us_bol_trained,test_loader)

Epoch [1/50], Loss: 1.055490


KeyboardInterrupt: 

In [None]:
# 커스텀 손실함수
class loss_fucntion():
    def __init__(self):
        pass
    def loss_mse(y,y_pred):
        return ((y-y_pred)**2).mean()

#미국 주식 볼린저밴드 계산(LSTM)
import torch
import torch.nn as nn
import torch.nn.functional as F

class us_bol(nn.Module):
    def __init__(self, input_dim, hidden_dim, num_layers, out_dim, window_size):  # out_dim = 종목 수
        super().__init__()
        self.lstm = nn.LSTM(
            input_size=input_dim,
            hidden_size=hidden_dim,
            num_layers=num_layers,
            batch_first=True,
            dropout=0.2
        )
        self.head = nn.Linear(hidden_dim, out_dim)
        self.window = window_size
    def forward(self, x_seq):
        out, (h_n, c_n) = self.lstm(x_seq)
        y_pred = self.head(h_n[-1])      
        return y_pred

def bollinger_pred_break_nextband(self, y_pred_next, df_close_upto_prev):
    y = y_pred_next[0] if y_pred_next.dim() == 2 else y_pred_next
    y_np = y.detach().cpu().numpy()
    df_ext = df_close_upto_prev.copy()
    y_np =df_ext.loc[df_ext.index[-1] + pd.Timedelta(days=1)]
    ma = df_ext.rolling(self.window).mean()
    std = df_ext.rolling(self.window).std()
    upper_next = ma.iloc[-1].to_numpy()
    lower_next = ma.iloc[-1].to_numpy()
    upper_next = ma.iloc[-1].to_numpy() + 2 * std.iloc[-1].to_numpy()
    lower_next = ma.iloc[-1].to_numpy() - 2 * std.iloc[-1].to_numpy()
    flags = np.zeros_like(y_np, dtype=np.int64)
    flags[y_np > upper_next] = 1
    flags[y_np < lower_next] = -1
    return torch.tensor(flags, device=y_pred_next.device)

 
#한국 주식 correlation, 지표, 거래량 같은 거 계산
class kor():
    def __init__(self):
        pass


In [None]:
#피처 추출 모델 학습
def train_model(model, train_loader, config):
    model.to(device)
    criterion = nn.MSELoss()
    optimizer = optim.Adam(model.parameters(), lr=config["learning_rate"])
    
    for epoch in range(config["epochs"]):
        model.train()
        total_loss = 0.0
        for X_seq,y in train_loader:
            X_seq = X_seq.to(device)
            y = y.to(device)
            optimizer.zero_grad()
            outputs = model(X_seq)
            loss = criterion(outputs, y)
            loss.backward()
            optimizer.step()
            total_loss += loss.item() * X_seq.size(0)
        avg_loss = total_loss / len(train_loader.dataset)
        print(f"Epoch [{epoch+1}/{config['epochs']}], Loss: {avg_loss:.6f}")
    return model
def evaluate_model(model, X_test, y_test):
    model.eval()
    criterion=nn.MSELoss()
    with torch.no_grad():
        for x_seq,y in test_loader:
            x_seq = x_seq.to(device)
            y = y.to(device)
            pred = model(x_seq)    
            loss = criterion(pred, y)
            total_loss += loss.item()
            total_n += y.size(0)

    
    return total_loss / total_n

In [125]:
CONFIG={
    "hidden_dim":64,
    "num_layers":12,
    "learning_rate":0.1,
    "epochs":50
}

In [126]:
# 모델들 사전 학습
from sklearn.linear_model import LinearRegression
us_lstm = us_bol(
        input_dim=input_dim,
        hidden_dim=CONFIG["hidden_dim"],
        out_dim=output_dim,
        num_layers=CONFIG["num_layers"],
        window_size=20
    )
us_bol_trained = train_model(us_lstm, train_loader, CONFIG)
us_lstm_loss=evaluate_model(us_bol_trained,test_loader)

Epoch [1/50], Loss: 1.055490


KeyboardInterrupt: 

In [None]:
# environment 정의
class Environment:
    def __init__(self, df_close_us: pd.DataFrame, df_close_kr:pd.DataFrame, predictor, device,
                 seq_window=30, bol_window=20, cost_rate=0.0005,
                 normalize_action=True):
        self.df_us = df_close_us.dropna().copy()
        self.df_kr = df_close_kr.dropna().copy()
        self.predictor = predictor.to(device)
        self.device = device
        self.seq_window = seq_window
        self.bol_window = bol_window
        self.cost_rate = cost_rate
        self.normalize_action = normalize_action
        self.tickers = list(self.df_kr.columns)
        self.N = len(self.tickers)
        self.t = None
        self.prev_w = None

    def reset(self,start_idx=None):
        self.t = (self.seq_window - 1 if start_idx is None else max(start_idx, self.seq_window - 1))
        self.prev_w = np.ones(self.N, dtype=np.float32) / self.N
        return self._get_state()

    def step(self, action):
        if isinstance(action, torch.Tensor):
            w = action.detach().cpu().numpy().astype(np.float32).reshape(-1)
        else:
            w = np.asarray(action, dtype=np.float32).reshape(-1)
        #softmax로 변환
        expw = np.exp(w - np.max(w))
        w = expw / (np.sum(expw) + 1e-12)
        turnover = float(np.sum(np.abs(w - self.prev_w)))
        cost = self.cost_rate * turnover
        t0 = self.t
        t1 = self.t + 1
        if t1 >= len(self.df_kr):
            return None, 0.0, True, {"reason": "end_of_data"}
        p0 = self.df_kr.iloc[t0].values.astype(np.float32)  
        p1 = self.df_kr.iloc[t1].values.astype(np.float32) 
        asset_ret = (p1 / (p0 + 1e-12)) - 1.0            
        port_ret = float(np.dot(w, asset_ret))
        reward = port_ret - cost
        self.prev_w = w
        self.t = t1
        next_state = self._get_state()
        info = {"t": self.t, "port_ret": port_ret, "cost": cost, "turnover": turnover}
        done = False
        return next_state, reward, done, info
    def _get_state(self):
        t = self.t
        us_aligned = self.df_us.reindex(self.df_kr.index, method="ffill").shift(1)
        df_upto_t_us = us_aligned.iloc[:t+1]
        df_upto_t_kr = self.df_kr.iloc[:t+1]
        cur_prices = df_upto_t_kr.iloc[-1].values.astype(np.float32)
        pred_break = self.predictor.get_pred_break(df_upto_t_us, t)
        prev_w = self.prev_w.astype(np.float32)
        state = torch.tensor(
            np.concatenate([cur_prices, pred_break, prev_w], axis=0),
            dtype=torch.float32,
            device=self.device
        )
        return state

In [None]:
#custom policy에서 사용할 네트워크 만들기
class CustomNetwork(nn.Module):
    def __init__(self, features_dim: int, latent_dim_pi: int = 64, latent_dim_vf: int = 64):
        super().__init__()
        self.latent_dim_pi = latent_dim_pi
        self.latent_dim_vf = latent_dim_vf

        self.policy_net = nn.Sequential(
            nn.Linear(features_dim, 256),
            nn.ReLU(),
            nn.Linear(256, latent_dim_pi),
            nn.ReLU(),
        )

        self.value_net = nn.Sequential(
            nn.Linear(features_dim, 256),
            nn.ReLU(),
            nn.Linear(256, latent_dim_vf),
            nn.ReLU(),
        )

    def forward(self, features: torch.Tensor):
        return self.forward_actor(features), self.forward_critic(features)

    def forward_actor(self, features: torch.Tensor):
        return self.policy_net(features)

    def forward_critic(self, features: torch.Tensor):
        return self.value_net(features)


In [None]:
# custom policy 만들기(PPO)기반
from stable_baselines3.common.policies import ActorCriticPolicy
class CustomPolicy(ActorCriticPolicy):
    def __init__(self, observation_space, action_space, lr_schedule,
                 net_arch=None, activation_fn=nn.Tanh, ortho_init=False, *args, **kwargs):
        super().__init__(observation_space, action_space, lr_schedule,
                         net_arch=net_arch, activation_fn=activation_fn,
                         ortho_init=ortho_init, *args, **kwargs)

    def _build_mlp_extractor(self):
        self.mlp_extractor = CustomNetwork(features_dim=self.features_dim,
                                           latent_dim_pi=self.latent_dim_pi,
                                           latent_dim_vf=self.latent_dim_vf)


In [None]:
# PPO 방식으로 custompolicy이용해서 학습
from stable_baselines3 import PPO
env= Environment()
model = PPO(
    CustomPolicy,
    env,
    policy_kwargs={"latent_dim_pi": 128, "latent_dim_vf": 128},
    verbose=2
)
model.learn(total_timesteps=1000)