In [2]:
import ast
import warnings
from collections import Counter
from itertools import chain

import pandas as pd
import torch
from sklearn.preprocessing import LabelEncoder

In [3]:
# Read the prescription table from disk; the 'utf-8-sig' codec is used for decoding.
data = pd.read_csv(
    "data/final_data.csv",
    encoding='utf-8-sig',
)
data.head()  # preview the first rows

Unnamed: 0,번호,약품일반성분명코드(ATC코드),질병명
0,1,['R03A'],천식
1,2,['C07AG02'],고혈압
2,3,['J01D'],세균감염
3,4,"['S', 'S']",non
4,5,['M01AE'],관절염


In [4]:
# Rows whose disease name is 'non' are treated as missing labels -> remove them.
# The '번호' (row number) column carries no information -> drop it.
# errors='ignore' keeps this cell re-runnable: on a second execution the column
# is already gone and a plain drop() would raise KeyError.
data = (
    data[data['질병명'] != 'non']
    .drop(columns=['번호'], errors='ignore')
)

data.info()

<class 'pandas.core.frame.DataFrame'>
Index: 173055 entries, 0 to 199999
Data columns (total 2 columns):
 #   Column            Non-Null Count   Dtype 
---  ------            --------------   ----- 
 0   약품일반성분명코드(ATC코드)  173055 non-null  object
 1   질병명               173055 non-null  object
dtypes: object(2)
memory usage: 4.0+ MB


In [5]:
# Build the ATC-code vocabulary.
# Each cell of the code column holds the string repr of a Python list
# (e.g. "['R03A']"). ast.literal_eval parses literals only, so a malformed or
# malicious CSV cell cannot execute arbitrary code the way eval() could.
parsed_code_lists = data['약품일반성분명코드(ATC코드)'].map(ast.literal_eval)

# Counter keeps first-seen insertion order, so ids are assigned in order of
# first appearance in `data` (row by row, code by code).
code_counter = Counter(chain.from_iterable(parsed_code_lists))

# Ids start at 1; id 0 is left free and is used downstream for padding and
# for codes that are not in the vocabulary.
# NOTE(review): the vocabulary is built from the full table, before the
# train/test split below -- confirm this is intended.
voca = {code: idx for idx, code in enumerate(code_counter, start=1)}

In [6]:
# Label encoding: learn the set of disease names, then replace each name in
# the '질병명' column by its integer class id.
label_encoder = LabelEncoder()
label_encoder.fit(data['질병명'])
data['질병명'] = label_encoder.transform(data['질병명'])
print(label_encoder.classes_)

['ADHD' '고지혈증' '고혈압' '관절염' '기침' '녹내장' '당뇨병' '대사질환' '말초혈관질환' '바이러스감염'
 '배뇨장애' '변비' '복합감염' '불면증' '불안장애' '비염' '세균감염' '심부전' '안과감염' '안질환' '역류성식도염'
 '영양결핍' '우울증' '위염' '정신질환' '진균감염' '천식' '통증' '통풍' '피부감염' '피부염' '해독' '혈전예방'
 '호흡기감염' '호흡기질환']


In [7]:
from sklearn.model_selection import train_test_split
train_df, test_df = train_test_split(
    data,
    test_size=0.2,    # hold out 20% of the rows for evaluation
    random_state=42,  # fixed seed so the split is reproducible
)

In [8]:
import torch
import ast
from torch.utils.data import Dataset, DataLoader

# PyTorch dataset definition
class PrescriptionDataset(Dataset):
    """Fixed-length ATC-code sequences paired with an integer disease label.

    Every row of ``df`` is converted once, at construction time, into a
    ``(token_ids, label_id)`` pair of ``torch.long`` tensors.

    Args:
        df: DataFrame with the columns '약품일반성분명코드(ATC코드)' (a list of
            ATC codes, or the string repr of such a list) and '질병명'
            (an integer class id).
        voca: Mapping from ATC code to token id. Codes missing from the
            mapping are encoded as 0, the same id that is used for padding.
        label_encoder: Stored on the instance; not used by the dataset itself.
        max_len: Sequence length. Longer code lists are truncated, shorter
            ones are right-padded with 0.
    """

    CODE_COLUMN = '약품일반성분명코드(ATC코드)'
    LABEL_COLUMN = '질병명'

    def __init__(self, df, voca, label_encoder, max_len=5):
        self.voca = voca
        self.label_encoder = label_encoder
        self.max_len = max_len
        self.samples = []

        # Iterate over the two columns directly instead of df.iterrows():
        # no per-row Series is built, and each column keeps its own dtype.
        for raw_codes, label_id in zip(df[self.CODE_COLUMN], df[self.LABEL_COLUMN]):
            # Cells read from CSV are strings such as "['R03A']"; already
            # parsed lists are accepted unchanged.
            if isinstance(raw_codes, str):
                codes = ast.literal_eval(raw_codes)
            else:
                codes = raw_codes

            token_ids = [voca.get(code, 0) for code in codes][:max_len]
            token_ids += [0] * (max_len - len(token_ids))

            # Explicit long dtype: these tensors feed nn.Embedding and
            # nn.CrossEntropyLoss, and an empty list would otherwise be float.
            self.samples.append((
                torch.tensor(token_ids, dtype=torch.long),
                torch.tensor(label_id, dtype=torch.long),
            ))

    def __len__(self):  # called by len()
        return len(self.samples)

    def __getitem__(self, idx):  # called by indexing with []
        return self.samples[idx]

In [9]:
import torch.nn as nn

# Transformer-based classifier definition
class TransformerClassifier(nn.Module):
    """Embedding -> Transformer encoder -> mean pooling -> linear classifier.

    Args:
        vocab_size: Number of real tokens; the embedding table gets one extra
            row because index 0 is the padding index.
        embed_dim: Embedding size, also used as the encoder's ``d_model``.
        nhead: Number of attention heads per encoder layer.
        num_layers: Number of stacked encoder layers.
        num_classes: Size of the output logit vector.
    """

    def __init__(self, vocab_size, embed_dim, nhead, num_layers, num_classes):
        super().__init__()
        self.embedding = nn.Embedding(
            num_embeddings=vocab_size + 1,
            embedding_dim=embed_dim,
            padding_idx=0,
        )
        self.encoder = nn.TransformerEncoder(
            nn.TransformerEncoderLayer(d_model=embed_dim, nhead=nhead),
            num_layers=num_layers,
        )
        self.fc = nn.Linear(embed_dim, num_classes)

    def forward(self, x):
        """Map token ids of shape [batch, seq_len] to logits [batch, num_classes]."""
        embedded = self.embedding(x)           # [batch, seq_len, embed_dim]
        seq_first = embedded.transpose(0, 1)   # [seq_len, batch, embed_dim]
        encoded = self.encoder(seq_first)      # [seq_len, batch, embed_dim]
        pooled = encoded.mean(dim=0)           # average over every sequence position
        return self.fc(pooled)                 # [batch, num_classes]

In [10]:
# Tokenise both splits with the shared vocabulary.
train_dataset = PrescriptionDataset(df=train_df, voca=voca, label_encoder=label_encoder)
test_dataset = PrescriptionDataset(df=test_df, voca=voca, label_encoder=label_encoder)

# Only the training loader shuffles; both use mini-batches of 2 samples.
train_loader = DataLoader(dataset=train_dataset, batch_size=2, shuffle=True)
test_loader = DataLoader(dataset=test_dataset, batch_size=2)

In [11]:
# Sanity check: turn the tokenised training samples back into a readable table.
samples = []
label_ids = []
for x, y in train_dataset:
    samples.append({"input_sequence": x.tolist()})
    label_ids.append(y.item())

# Decode every label id in ONE inverse_transform call instead of one call per
# sample; the resulting records are the same, without the per-call overhead.
for sample, label in zip(samples, label_encoder.inverse_transform(label_ids)):
    sample["label"] = label

df = pd.DataFrame(samples)
print(df.head())

       input_sequence   label
0     [4, 0, 0, 0, 0]     관절염
1  [36, 28, 85, 0, 0]    복합감염
2   [45, 45, 0, 0, 0]  바이러스감염
3    [4, 22, 0, 0, 0]     관절염
4     [7, 0, 0, 0, 0]      비염


In [14]:
# Hyperparameters of the classifier; the vocabulary size and the number of
# classes are taken from the objects fitted earlier in the notebook.
model_hparams = dict(
    vocab_size=len(voca),
    embed_dim=32,
    nhead=4,
    num_layers=2,
    num_classes=len(label_encoder.classes_),
)
model = TransformerClassifier(**model_hparams)



In [None]:
# Optimisation setup: cross-entropy over the class logits, Adam with lr=0.001.
criterion = nn.CrossEntropyLoss()
optimizer = torch.optim.Adam(model.parameters(), lr=0.001)

for epoch in range(10):
    model.train()   # put the model in training mode for this epoch
    total_loss = 0  # sum of the per-batch losses of this epoch
    for x_batch, y_batch in train_loader:
        optimizer.zero_grad()
        output = model(x_batch)            # predictions (logits)
        loss = criterion(output, y_batch)  # error between predictions and true labels
        loss.backward()
        optimizer.step()
        total_loss += loss.item()
    # The printed value is the summed batch loss, not the per-batch average.
    print(f"Epoch {epoch+1}, Loss: {total_loss:.4f}")



KeyboardInterrupt: 

In [None]:
# Save the trained model (left commented out to prevent accidental re-execution)
# torch.save(model.state_dict(), "trained_transformer_model.pth")

In [15]:
# Restore the trained weights from disk.
# map_location="cpu": the load also works on a machine without the device the
#   checkpoint was saved from.
# weights_only=True: restricts unpickling to tensors and plain containers, so
#   a tampered checkpoint file cannot execute arbitrary code while loading.
# NOTE: the save cell is commented out, so this file must already exist.
state_dict = torch.load(
    "trained_transformer_model.pth",
    map_location="cpu",
    weights_only=True,
)
model.load_state_dict(state_dict)
model.eval()

TransformerClassifier(
  (embedding): Embedding(152, 32, padding_idx=0)
  (encoder): TransformerEncoder(
    (layers): ModuleList(
      (0-1): 2 x TransformerEncoderLayer(
        (self_attn): MultiheadAttention(
          (out_proj): NonDynamicallyQuantizableLinear(in_features=32, out_features=32, bias=True)
        )
        (linear1): Linear(in_features=32, out_features=2048, bias=True)
        (dropout): Dropout(p=0.1, inplace=False)
        (linear2): Linear(in_features=2048, out_features=32, bias=True)
        (norm1): LayerNorm((32,), eps=1e-05, elementwise_affine=True)
        (norm2): LayerNorm((32,), eps=1e-05, elementwise_affine=True)
        (dropout1): Dropout(p=0.1, inplace=False)
        (dropout2): Dropout(p=0.1, inplace=False)
      )
    )
  )
  (fc): Linear(in_features=32, out_features=35, bias=True)
)

In [16]:
def evaluate(model, dataloader):
    """Return the classification accuracy of ``model`` over ``dataloader``.

    Args:
        model: Module mapping an input batch to logits of shape
            [batch, num_classes]. It is switched to eval mode and left there.
        dataloader: Iterable of ``(x_batch, y_batch)`` pairs.

    Returns:
        Fraction of correctly predicted samples as a float in [0, 1].
        Returns 0.0 when the dataloader yields no samples, instead of
        raising ZeroDivisionError.
    """
    model.eval()
    correct = total = 0
    with torch.no_grad():  # inference only, no gradient bookkeeping
        for x_batch, y_batch in dataloader:
            preds = model(x_batch).argmax(dim=1)
            correct += (preds == y_batch).sum().item()
            total += y_batch.size(0)
    if total == 0:
        return 0.0
    return correct / total

# Print the accuracy on the held-out test set
accuracy = evaluate(model=model, dataloader=test_loader)
print(f"Test Accuracy: {accuracy:.4f}")


Test Accuracy: 0.9335


In [None]:
def predict(model, atc_list, vocab, label_encoder, max_len=5):
    """Predict the disease label for one prescription.

    Args:
        model: Classifier mapping a [1, max_len] tensor of token ids to logits.
        atc_list: Iterable of ATC code strings.
        vocab: Mapping from ATC code to token id.
        label_encoder: Object whose ``inverse_transform`` maps class ids back
            to label names.
        max_len: Sequence length; the input is truncated or right-padded
            with 0 to this length.

    Returns:
        The decoded label of the highest-scoring class.

    Warns:
        UserWarning: if any code is missing from ``vocab``. Such codes are
            encoded as 0, the same id as padding, so the model cannot see
            them and the prediction may not be meaningful.
    """
    model.eval()
    codes = list(atc_list)  # materialise once: the codes are traversed twice

    unknown_codes = [code for code in codes if code not in vocab]
    if unknown_codes:
        warnings.warn(
            f"ATC codes not in vocabulary (encoded as padding id 0): {unknown_codes}",
            stacklevel=2,
        )

    tokens = [vocab.get(code, 0) for code in codes][:max_len]
    tokens += [0] * (max_len - len(tokens))
    # Explicit long dtype so the embedding lookup always receives integer ids.
    x = torch.tensor(tokens, dtype=torch.long).unsqueeze(0)  # [1, seq_len]
    with torch.no_grad():
        pred = model(x).argmax(dim=1).item()
    return label_encoder.inverse_transform([pred])[0]

In [None]:
# Example: predict the disease for a prescription containing two ATC codes.
# NOTE(review): predict() encodes any code missing from `voca` as 0 (the
# padding id) -- confirm both codes are in the vocabulary before trusting
# the result.
predict(model, ['A10A', 'C10C'], voca, label_encoder)

'피부염'