# dAiv Recurrent Neural Network Special Lecture: Compressing and Generating Sequence Data [Arithmetic Calculator Edition]

## Imports

### For Local User

In [None]:
from platform import system

%pip install uv
!uv init
!uv sync

if system() == "Windows":
    # Windows: install the CUDA 12.8 build from PyTorch's own wheel index
    !uv add torch torchvision torchaudio --index-url https://download.pytorch.org/whl/cu128
else:
    !uv add torch torchvision torchaudio

!uv add matplotlib tqdm numpy pandas scipy jupyter ipywidgets
!uv add git+https://github.com/dAiv-CNU/torchdaiv.git

### For Colab User

In [None]:
%pip install matplotlib tqdm numpy pandas scipy jupyter ipywidgets
%pip install git+https://github.com/dAiv-CNU/torchdaiv.git

### Library Imports

In [None]:
import torch
from torch import nn, optim
from torch.utils.data import DataLoader

from torchdaiv.datasets import FixedLengthCalculatorDataset

from tqdm.auto import tqdm

## Load Data

In [None]:
# Generate sample data and check the output
train_dataset = FixedLengthCalculatorDataset(size=320000, max_length=[3, 4])
test_dataset = FixedLengthCalculatorDataset(size=3200, max_length=[3, 4])
longer_train_dataset = FixedLengthCalculatorDataset(size=320000, max_length=[5, 4])
longer_test_dataset = FixedLengthCalculatorDataset(size=3200, max_length=[5, 4])

for i in range(10):
    train_dataset.sample(i)
print()

for i, dt in zip(range(10), train_dataset):
    print(i, *dt)

print("\n", "-" * 20, "\n")

for i in range(10):
    longer_train_dataset.sample(i)
print()

for i, dt in zip(range(10), longer_train_dataset):
    print(i, *dt)
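The internal format of `FixedLengthCalculatorDataset` is not documented here. Judging from the `pipeline` helpers later in this notebook, each character appears to be encoded as its ASCII code divided by 127, with spaces standing in for `\x00` padding. A minimal sketch of that assumed encoding; `encode_expr` and `decode_values` are hypothetical helpers, not part of `torchdaiv`:

```python
# Hypothetical helpers illustrating the ASCII/127 encoding assumed from `pipeline`.
# They are NOT part of torchdaiv.


def encode_expr(expr: str) -> list[float]:
    """Map each character to ord(c) / 127, treating spaces as NUL padding."""
    padded = expr.replace(" ", "\x00")
    return [ord(c) / 127 for c in padded]


def decode_values(values: list[float]) -> str:
    """Invert encode_expr: scale back to 0..127, round, and map to characters."""
    return "".join(chr(round(v * 127)) for v in values)


encoded = encode_expr(" 35+ 12")
print(len(encoded))                   # 7 values, matching the MLP's input size
print(repr(decode_values(encoded)))   # NUL-padded version of the input
```

Rounding (rather than truncating) in `decode_values` guards against values like 51.9999 that floating-point arithmetic can produce.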

---
# PRACTICE 1: Classification (N-to-1): Building an Arithmetic Calculator (MLP)
---

---
## Model Definition
> Assumed setting: a simple MLP, without tokenization, trained on inputs that all have the same length

> Limitation: how should we handle inputs of varying length?

![MLP](https://miro.medium.com/v2/resize:fit:1400/1*KMmqs1A-PqGTmYUk_MpcDw.png)
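An MLP needs every input to have the same width, so variable-length operands have to be padded. Assuming each operand is right-aligned in a 3-character field (consistent with the `" 35+ 12"` example used later), a minimal sketch of that padding; `to_fixed_width` is a hypothetical helper:

```python
def to_fixed_width(num1: int, op: str, num2: int, width: int = 3) -> str:
    """Right-align each operand in a field of `width` characters, padded with spaces."""
    left, right = str(num1), str(num2)
    if len(left) > width or len(right) > width:
        raise ValueError("operand does not fit in the fixed width")
    return f"{left:>{width}}{op}{right:>{width}}"


print(repr(to_fixed_width(35, "+", 12)))  # ' 35+ 12'
print(len(to_fixed_width(7, "*", 123)))   # always 2 * width + 1 = 7
```

A four-digit operand raises `ValueError`, which is exactly the limitation above: the input width is baked into the first `nn.Linear(7, ...)` layer.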

In [None]:
# Create batches
BATCH_SIZE = 1024

train_loader = DataLoader(train_dataset, batch_size=BATCH_SIZE, shuffle=True, drop_last=True)
test_loader = DataLoader(test_dataset, batch_size=BATCH_SIZE, shuffle=False, drop_last=True)

# Inspect the first batch: `data` and `target` are lists of tensors (stacked later with torch.stack)
data, target = next(iter(train_loader))
print("Batch 1")
print(f">>> Data ({len(data)} items):")
for d in data:
    print(d)
print(f">>> Target ({len(target)} items):")
for t in target:
    print(t)
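The batch prints as a list of tensors rather than one matrix. Assuming each sample's input is a plain sequence of 7 numbers, PyTorch's default collate function transposes a batch of sequences into a list of per-position tensors of shape `(batch,)`, which is why the training loop calls `torch.stack(..., dim=1)`. A small sketch with a made-up dataset (`ToySeqDataset` is hypothetical):

```python
import torch
from torch.utils.data import DataLoader, Dataset


class ToySeqDataset(Dataset):
    """Hypothetical stand-in: each item is (7 input values, 4 target values)."""

    def __len__(self):
        return 8

    def __getitem__(self, idx):
        data = [float(idx)] * 7    # plain Python list of 7 numbers
        target = [float(idx)] * 4  # plain Python list of 4 numbers
        return data, target


loader = DataLoader(ToySeqDataset(), batch_size=4)
data, target = next(iter(loader))

print(type(data), len(data), data[0].shape)  # a list of 7 tensors, each of shape (4,)
batch = torch.stack(data, dim=1)             # -> (batch, positions)
print(batch.shape)                           # torch.Size([4, 7])
```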

In [None]:
class CalculatorMLP(nn.Module):
    def __init__(self, hidden_size=64):
        super().__init__()

        self.layers = nn.Sequential(
            nn.Linear(7, hidden_size),  # input: 3 (num1) + 1 (operator) + 3 (num2) = 7
            nn.ReLU(),
            nn.Linear(hidden_size, hidden_size//2),
            nn.ReLU(),
            nn.Linear(hidden_size//2, hidden_size//4),
            nn.ReLU(),
            nn.Linear(hidden_size//4, 4),  # output: 4 characters
            nn.Sigmoid()  # constrain each output to the range 0~1
        )

    def forward(self, x):
        return self.layers(x)
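For a sense of scale, the MLP above with the default `hidden_size=64` is tiny. Each `nn.Linear(in, out)` holds `in * out` weights plus `out` biases, so the parameter count can be checked by hand and against PyTorch. This snippet mirrors the layer stack of `CalculatorMLP` instead of importing it, so it runs on its own:

```python
from torch import nn

hidden_size = 64
layers = nn.Sequential(
    nn.Linear(7, hidden_size), nn.ReLU(),
    nn.Linear(hidden_size, hidden_size // 2), nn.ReLU(),
    nn.Linear(hidden_size // 2, hidden_size // 4), nn.ReLU(),
    nn.Linear(hidden_size // 4, 4), nn.Sigmoid(),
)

# By hand: weights + biases for each Linear layer
by_hand = (7 * 64 + 64) + (64 * 32 + 32) + (32 * 16 + 16) + (16 * 4 + 4)
by_torch = sum(p.numel() for p in layers.parameters())
print(by_hand, by_torch)  # 3188 3188
```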

In [None]:
device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
print(f"Using device: {device}")

model = CalculatorMLP()
model.to(device)

In [None]:
# Hyperparameters
EPOCHS = 10000
LEARNING_RATE = 1e-2, 1e-5  # (initial LR, minimum LR for cosine annealing)

criterion = nn.MSELoss()
optimizer = optim.AdamW(model.parameters(), lr=LEARNING_RATE[0])
scheduler = optim.lr_scheduler.CosineAnnealingLR(optimizer, T_max=EPOCHS, eta_min=LEARNING_RATE[1])
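`CosineAnnealingLR` follows the closed form eta_t = eta_min + 0.5 * (eta_max - eta_min) * (1 + cos(pi * t / T_max)), where t counts calls to `scheduler.step()`. Because `T_max` is measured in scheduler steps, it should match how often `step()` is called: `EPOCHS` when stepping once per epoch, or `EPOCHS * len(train_loader)` when stepping once per batch. A pure-Python sketch of the formula; `cosine_lr` is an illustrative helper, not a PyTorch function:

```python
import math


def cosine_lr(t: int, t_max: int, eta_max: float, eta_min: float) -> float:
    """Closed-form cosine annealing learning rate after t scheduler steps."""
    return eta_min + 0.5 * (eta_max - eta_min) * (1 + math.cos(math.pi * t / t_max))


eta_max, eta_min = 1e-2, 1e-5  # same values as LEARNING_RATE above
t_max = 10_000                 # EPOCHS above

for t in (0, t_max // 2, t_max):
    print(t, cosine_lr(t, t_max, eta_max, eta_min))
```

If `scheduler.step()` is called once per batch while `T_max=EPOCHS`, the rate reaches `eta_min` after only `EPOCHS` batches and then climbs back up, because the cosine curve is periodic in t.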

In [None]:
# Training loop
train_len = len(train_loader)
for epoch in tqdm(range(EPOCHS), desc="Epochs"):
    model.train()
    total_loss = 0

    for batch_data, batch_labels in tqdm(train_loader, desc="Train", leave=False):
        optimizer.zero_grad()

        batch_data = torch.stack(batch_data, dim=1).float().to(device)  # (batch, 7)
        batch_labels = torch.stack(batch_labels, dim=1).float().to(device) * 127  # rescale labels from 0~1 to 0~127

        outputs = model(batch_data) * 127  # rescale model outputs to 0~127 as well

        loss = criterion(outputs, batch_labels)  # MSELoss already averages over the batch
        loss.backward()
        total_loss += loss.item()

        optimizer.step()

    scheduler.step()  # step once per epoch so that T_max=EPOCHS spans the whole run

    # Overwrite the progress line every epoch; keep a permanent line every 10 epochs
    print(f"\rEpoch [{epoch+1}/{EPOCHS}], Loss: {total_loss/train_len}", end="\n" if (epoch + 1) % 10 == 0 else "")
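Multiplying both the outputs and the labels by 127 before `nn.MSELoss` only rescales the loss: MSE(127·y, 127·t) = 127² · MSE(y, t) = 16129 · MSE(y, t). This keeps the reported loss in units of squared character codes, so a loss around 1 corresponds to a root-mean-square error of about one character code. A quick check with random stand-in tensors:

```python
import torch
from torch import nn

torch.manual_seed(0)
criterion = nn.MSELoss()
outputs = torch.rand(8, 4)  # stand-in for sigmoid outputs in [0, 1]
labels = torch.rand(8, 4)   # stand-in for normalized targets in [0, 1]

scaled = criterion(outputs * 127, labels * 127)
unscaled = criterion(outputs, labels)
print(scaled.item(), (127 ** 2 * unscaled).item())  # equal up to float rounding
```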

In [None]:
# Inference (evaluation) loop
model.eval()
total_loss = 0

for batch_data, batch_labels in tqdm(test_loader):
    batch_data = torch.stack(batch_data, dim=1).float().to(device)
    batch_labels = torch.stack(batch_labels, dim=1).float().to(device)

    with torch.no_grad():
        outputs = model(batch_data)

    loss = criterion(outputs * 127, batch_labels * 127)  # same 0~127 scale as training; MSELoss already averages
    total_loss += loss.item()

print(f"Test Loss: {total_loss/len(test_loader)}")
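MSE alone does not say how many answers are actually right. Assuming the labels are ASCII codes divided by 127 (as the 127 rescaling above suggests), a stricter metric is exact-match accuracy: round both tensors back to integer codes and count the rows where every character matches. `exact_match` is a hypothetical helper, and the answers below are made up:

```python
import torch


def exact_match(outputs: torch.Tensor, labels: torch.Tensor) -> float:
    """Fraction of rows whose rounded character codes all match the labels.

    Both tensors are assumed to hold normalized codes in [0, 1], shape (batch, chars).
    """
    pred_codes = torch.round(outputs * 127).long()
    true_codes = torch.round(labels * 127).long()
    row_correct = (pred_codes == true_codes).all(dim=1)
    return row_correct.float().mean().item()


labels = torch.tensor([[ord(c) / 127 for c in "0047"],
                       [ord(c) / 127 for c in "0023"]])
outputs = labels.clone()
outputs[1, 3] += 2 / 127              # corrupt one character of the second answer
print(exact_match(outputs, labels))   # 0.5
```

Inside the inference loop, this value could be accumulated per batch alongside `total_loss`.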

In [None]:
@torch.no_grad()
def pipeline(model, input_str, device=device):
    model.eval()
    norm = lambda input_ascii: [x / 127 for x in map(ord, input_ascii)]  # ASCII code -> 0~1
    input_str = input_str.replace(" ", "\x00")  # spaces act as NUL padding
    input_data = torch.tensor(norm(input_str)).unsqueeze(0)  # (1, 7), already normalized by norm
    input_data = input_data.float().to(device)
    codes = torch.round(model(input_data) * 127).int().squeeze(0).tolist()  # back to integer ASCII codes
    return "".join(map(chr, codes))
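The `pipeline` above handles one expression at a time. A sketch of a batched variant, assuming the same encoding (spaces mapped to `\x00`, characters normalized once by 127, fixed-length inputs); `batch_pipeline` is a hypothetical name, and the tiny untrained model only demonstrates shapes:

```python
import torch
from torch import nn


@torch.no_grad()
def batch_pipeline(model: nn.Module, exprs: list[str], device: str = "cpu") -> list[str]:
    """Encode equal-length expressions, run the model once, and decode each row."""
    model.eval()
    rows = [[ord(c) / 127 for c in e.replace(" ", "\x00")] for e in exprs]
    inputs = torch.tensor(rows, dtype=torch.float32, device=device)  # (batch, 7)
    codes = torch.round(model(inputs) * 127).int().tolist()          # (batch, 4)
    return ["".join(map(chr, row)) for row in codes]


# Untrained stand-in with the same input/output sizes as CalculatorMLP.
toy_model = nn.Sequential(nn.Linear(7, 4), nn.Sigmoid())
print(batch_pipeline(toy_model, [" 35+ 12", "  7*  3"]))  # two 4-character strings, meaningless until trained
```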

In [None]:
print(pipeline(model, " 35+ 12"))

---
# PRACTICE 1-1: Reimplementing the Arithmetic Calculator with an RNN
---

---
## Model Definition
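> Recurrent encoding removes the assumption that every input has the same length

> Limitation: the model still emits its answer as a single fixed-size vector, so it cannot handle outputs that go beyond that one value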

![RNN Encoder](https://blog.kakaocdn.net/dn/dQIPiW/btrHKcZI8NY/FfecZoTxardfpZGGKzR1oK/img.png)
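With the default `tanh` nonlinearity, `nn.RNN` applies h_t = tanh(x_t W_ihᵀ + b_ih + h_{t-1} W_hhᵀ + b_hh) at every time step with the same weights, which is what lets one model read inputs of any length. A sketch that unrolls this recurrence by hand and checks it against `nn.RNN` (sizes match `CalculatorRNN`'s defaults, `input_dim=1` and `hidden_dim=64`):

```python
import torch
from torch import nn

torch.manual_seed(0)
rnn = nn.RNN(input_size=1, hidden_size=64, num_layers=1, batch_first=True)
x = torch.rand(2, 7, 1)  # (batch, seq_len, input_dim)

# Manual unroll using the layer's own parameters, starting from a zero hidden state.
h = torch.zeros(2, 64)
manual_outputs = []
for t in range(x.size(1)):
    h = torch.tanh(x[:, t] @ rnn.weight_ih_l0.T + rnn.bias_ih_l0
                   + h @ rnn.weight_hh_l0.T + rnn.bias_hh_l0)
    manual_outputs.append(h)
manual = torch.stack(manual_outputs, dim=1)  # (batch, seq_len, hidden)

out, h_n = rnn(x)
print(torch.allclose(out, manual, atol=1e-5))  # should print True
print(torch.allclose(out[:, -1], h_n[0]))      # last output equals the final hidden state
```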

In [None]:
# Create batches
LONGER_BATCH_SIZE = 1024

longer_train_loader = DataLoader(longer_train_dataset, batch_size=LONGER_BATCH_SIZE, shuffle=True, drop_last=True)
longer_test_loader = DataLoader(longer_test_dataset, batch_size=LONGER_BATCH_SIZE, shuffle=False, drop_last=True)

# Inspect the first batch: `data` and `target` are lists of tensors (stacked later with torch.stack)
data, target = next(iter(longer_train_loader))
print("Batch 1")
print(f">>> Data ({len(data)} items):")
for d in data:
    print(d)
print(f">>> Target ({len(target)} items):")
for t in target:
    print(t)

In [None]:
class CalculatorRNN(nn.Module):
    def __init__(self, input_dim=1, hidden_dim=64, output_dim=4, num_layers=1):
        super().__init__()
        self.rnn = nn.RNN(input_dim, hidden_dim, num_layers, batch_first=True)
        self.fc = nn.Linear(hidden_dim, output_dim)
        self.sigmoid = nn.Sigmoid()

    def forward(self, x):
        out, _ = self.rnn(x)
        out_last = out[:, -1, :]  # use only the output at the last time step
        logits = self.fc(out_last)  # feed into the output head
        return self.sigmoid(logits)
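`CalculatorRNN` reads `out[:, -1, :]`, which is the true last step only when every sequence in the batch has the same length. If a batch mixes lengths and is zero-padded, one option (a swapped-in technique, not something this notebook uses) is `pack_padded_sequence`, after which `h_n` holds each sequence's state at its own last real step. The sequences below are made up:

```python
import torch
from torch import nn
from torch.nn.utils.rnn import pack_padded_sequence, pad_sequence

torch.manual_seed(0)
rnn = nn.RNN(input_size=1, hidden_size=16, batch_first=True)

# Two made-up sequences of different lengths, each of shape (seq_len, input_dim).
seqs = [torch.rand(7, 1), torch.rand(5, 1)]
lengths = torch.tensor([len(s) for s in seqs])

padded = pad_sequence(seqs, batch_first=True)  # (2, 7, 1), zero-padded
packed = pack_padded_sequence(padded, lengths, batch_first=True, enforce_sorted=False)
_, h_n = rnn(packed)                           # h_n: (num_layers, batch, hidden)
last_states = h_n[-1]                          # (batch, hidden), one row per sequence

# Each row should equal running that sequence alone, without any padding.
for i, s in enumerate(seqs):
    _, h_alone = rnn(s.unsqueeze(0))
    print(torch.allclose(last_states[i], h_alone[-1, 0], atol=1e-5))
```

In a model, `self.fc(h_n[-1])` would then replace `self.fc(out[:, -1, :])`, at the cost of having the data loader also return each sequence's length.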

In [None]:
device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
print(f"Using device: {device}")

model = CalculatorRNN()
model.to(device)

In [None]:
# Hyperparameters
EPOCHS = 10000
LEARNING_RATE = 1e-2, 1e-5  # (initial LR, minimum LR for cosine annealing)

criterion = nn.MSELoss()
optimizer = optim.AdamW(model.parameters(), lr=LEARNING_RATE[0])
scheduler = optim.lr_scheduler.CosineAnnealingLR(optimizer, T_max=EPOCHS, eta_min=LEARNING_RATE[1])

In [None]:
# Training loop
train_len = len(longer_train_loader)
for epoch in tqdm(range(EPOCHS), desc="Epochs"):
    model.train()
    total_loss = 0

    for batch_data, batch_labels in tqdm(longer_train_loader, desc="Train", leave=False):
        optimizer.zero_grad()

        batch_data = torch.stack(batch_data, dim=1).unsqueeze(2).float().to(device)  # (batch, seq_len, 1)
        batch_labels = torch.stack(batch_labels, dim=1).float().to(device) * 127  # rescale labels from 0~1 to 0~127

        outputs = model(batch_data) * 127  # rescale model outputs to 0~127 as well

        loss = criterion(outputs, batch_labels)  # MSELoss already averages over the batch
        loss.backward()
        total_loss += loss.item()

        optimizer.step()

    scheduler.step()  # step once per epoch so that T_max=EPOCHS spans the whole run

    # Overwrite the progress line every epoch; keep a permanent line every 10 epochs
    print(f"\rEpoch [{epoch+1}/{EPOCHS}], Loss: {total_loss/train_len}", end="\n" if (epoch + 1) % 10 == 0 else "")

In [None]:
# Inference (evaluation) loop
model.eval()
total_loss = 0

for batch_data, batch_labels in tqdm(longer_test_loader):  # same kind of data the RNN was trained on
    batch_data = torch.stack(batch_data, dim=1).unsqueeze(2).float().to(device)
    batch_labels = torch.stack(batch_labels, dim=1).float().to(device) * 127  # rescale labels from 0~1 to 0~127

    with torch.no_grad():
        outputs = model(batch_data) * 127  # rescale model outputs to 0~127 as well

    loss = criterion(outputs, batch_labels)  # MSELoss already averages over the batch
    total_loss += loss.item()

print(f"Test Loss: {total_loss/len(longer_test_loader)}")

In [None]:
@torch.no_grad()
def pipeline(model, input_str, device=device):
    model.eval()
    norm = lambda input_ascii: [x / 127 for x in map(ord, input_ascii)]  # ASCII code -> 0~1
    input_str = input_str.replace(" ", "")  # spaces are simply removed for the RNN
    input_data = torch.tensor(norm(input_str)).unsqueeze(0)  # (1, seq_len), already normalized by norm
    input_data = input_data.unsqueeze(2).float().to(device)  # (1, seq_len, 1)
    codes = torch.round(model(input_data) * 127).int().squeeze(0).tolist()  # back to integer ASCII codes
    return "".join(map(chr, codes))

In [None]:
print(pipeline(model, "35+12-10"))

---
# PRACTICE 1-2: Building an RNN Encoder-Decoder (Try It Yourself)
---

---
## Model Definition
> To overcome the output-length limitation of Practice 1-1, write model code that performs recurrent decoding in addition to recurrent encoding.

![RNN Seq2Seq](https://blog.kakaocdn.net/dn/LUwms/btszM0Eg9wB/e1fPBEkRWGkkX1fSjYJLMk/img.jpg)

In [None]:
class CalculatorSeq2SeqRNN(nn.Module):
    def __init__(self, input_dim=1, hidden_dim=64, output_dim=1, num_layers=1):
        super().__init__()
        self.encoder = nn.RNN(input_dim, hidden_dim, num_layers, batch_first=True)
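        self.decoder = None  # TODO: define the decoder RNN here
        self.fc = nn.Linear(hidden_dim, output_dim)
        self.sigmoid = nn.Sigmoid()

    def forward(self, x):
        # Encoder step
        out, _ = self.encoder(x)
        out_last = out[:, -1, :]  # use only the output at the last time step

        # Decoder step (for now, the encoder's output is simply passed through)
        # TODO: unroll self.decoder step by step, starting from the encoder's final hidden state

        # Produce the output through the FC layer
        logits = self.fc(out_last)  # feed into the output head
        return self.sigmoid(logits)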
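One possible solution sketch for the exercise, not the course's reference answer: the encoder's final hidden state initializes a decoder `nn.RNN`, which then generates the answer one character at a time and feeds each prediction back in as its next input (greedy autoregressive decoding). The class name `Seq2SeqSketch`, the fixed `max_len=4` output length, the all-zero start input, and the optional teacher-forcing `targets` argument are all assumptions of this sketch:

```python
import torch
from torch import nn


class Seq2SeqSketch(nn.Module):
    """Hypothetical encoder-decoder: recurrent encoding, then recurrent decoding."""

    def __init__(self, input_dim=1, hidden_dim=64, output_dim=1, num_layers=1):
        super().__init__()
        self.encoder = nn.RNN(input_dim, hidden_dim, num_layers, batch_first=True)
        self.decoder = nn.RNN(output_dim, hidden_dim, num_layers, batch_first=True)
        self.fc = nn.Linear(hidden_dim, output_dim)
        self.sigmoid = nn.Sigmoid()

    def forward(self, x, max_len=4, targets=None):
        # Encoder: compress the whole input sequence into its final hidden state.
        _, hidden = self.encoder(x)  # (num_layers, batch, hidden_dim)

        # Decoder: start from an all-zero input and unroll one step at a time.
        step_input = x.new_zeros(x.size(0), 1, self.fc.out_features)
        outputs = []
        for t in range(max_len):
            out, hidden = self.decoder(step_input, hidden)
            pred = self.sigmoid(self.fc(out))  # (batch, 1, output_dim)
            outputs.append(pred)
            # Teacher forcing when targets are given, otherwise feed back the prediction.
            step_input = targets[:, t:t + 1] if targets is not None else pred
        return torch.cat(outputs, dim=1)  # (batch, max_len, output_dim)


sketch_model = Seq2SeqSketch()
x = torch.rand(3, 8, 1)         # (batch, seq_len, input_dim)
print(sketch_model(x).shape)    # torch.Size([3, 4, 1])
```

Producing truly variable-length answers would additionally need a stop condition, for example an end-of-sequence character that ends the loop early.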