In [30]:
import pickle
import re
from collections import defaultdict
from itertools import islice

import numpy as np

In [31]:
# 1. Dataset (small toy example): each message is written next to its label
labeled_texts = [
    ("Box box box", "진입 명령"),
    ("My tyres are gone", "차량 상태"),
    ("Tell him to get out", "불만/요청"),
    ("We need to push", "전략/전술"),
    ("Engine is overheating", "차량 상태"),
    ("Good job keep pushing", "격려"),
    ("I'm losing power", "차량 상태"),
    ("Switching to plan B", "전략/전술"),
    ("Push now push now", "전략/전술"),
    ("Let me race please", "불만/요청"),
]
# Parallel lists: labels[i] is the class of texts[i]
texts = [text for text, _label in labeled_texts]
labels = [label for _text, label in labeled_texts]

In [32]:
def generate_ngrams(text, n=2):
    """Return the word n-grams of ``text`` as space-joined strings.

    The text is lower-cased and split into runs of word characters, so
    punctuation (including apostrophes) acts as a separator. Each n-gram is a
    window of ``n`` consecutive tokens; a text with fewer than ``n`` tokens
    yields an empty list.
    """
    words = re.findall(r'\b\w+\b', text.lower())
    grams = []
    for start in range(len(words) - n + 1):
        window = words[start:start + n]
        grams.append(' '.join(window))
    return grams

In [33]:
def build_vocab(texts, ngram_range=(1, 2)):
    """Map every distinct n-gram found in ``texts`` to an integer index.

    ``ngram_range`` is an inclusive ``(min_n, max_n)`` pair. Indices are
    handed out in order of first appearance, scanning the texts in order and,
    within each text, the n-gram sizes from smallest to largest.
    """
    sizes = range(ngram_range[0], ngram_range[1] + 1)
    index_of = {}
    for text in texts:
        for n in sizes:
            for gram in generate_ngrams(text, n):
                # A known n-gram keeps its index; a new one receives the next
                # free index, which is the current vocabulary size.
                index_of.setdefault(gram, len(index_of))
    return index_of

In [34]:
def vectorize_texts(texts, vocab, ngram_range=(1, 2)):
    """Convert texts to a count matrix over the vocabulary.

    Args:
        texts: Iterable of strings to encode.
        vocab: Mapping from n-gram string to column index.
        ngram_range: Inclusive ``(min_n, max_n)`` pair of n-gram sizes to count.

    Returns:
        A float array of shape ``(len(texts), len(vocab))`` where entry
        ``[i, j]`` is how many times the n-gram with index ``j`` occurs in
        ``texts[i]``. N-grams missing from ``vocab`` are ignored.
    """
    texts = list(texts)
    # Allocate the full matrix up front so that an empty ``texts`` still gives
    # a 2-D result with ``len(vocab)`` columns instead of a shape-(0,) array.
    vectors = np.zeros((len(texts), len(vocab)))
    for row, text in enumerate(texts):
        for n in range(ngram_range[0], ngram_range[1] + 1):
            for ngram in generate_ngrams(text, n):
                if ngram in vocab:
                    vectors[row, vocab[ngram]] += 1
    return vectors

In [35]:
# Build the vocabulary over 1- to 3-grams and vectorize with the same range
vocab = build_vocab(texts, ngram_range=(1, 3))
X = vectorize_texts(texts, vocab, ngram_range=(1, 3))

# Encode labels. The unique labels are sorted so that class indices are the
# same on every run: the iteration order of a set of strings can change
# between interpreter sessions because string hashing is randomized.
label2idx = {label: idx for idx, label in enumerate(sorted(set(labels)))}
y = np.array([label2idx[label] for label in labels])
num_classes = len(label2idx)

In [36]:
def train_test_split(X, y, test_size=0.3, seed=42):
    """Shuffle the samples and split them into train and test parts.

    Seeds NumPy's global random state with ``seed``, shuffles the row indices
    and assigns the first ``int(len(X) * (1 - test_size))`` of them to the
    training part and the remainder to the test part.

    Returns:
        ``(X_train, X_test, y_train, y_test)``.
    """
    n_samples = len(X)
    # Seeding the global generator also fixes any later np.random draws.
    np.random.seed(seed)
    order = np.arange(n_samples)
    np.random.shuffle(order)
    n_train = int(n_samples * (1 - test_size))
    train_rows = order[:n_train]
    test_rows = order[n_train:]
    return X[train_rows], X[test_rows], y[train_rows], y[test_rows]

# Hold out 30% of the samples for evaluation; seed 42 is the function default
X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=0.3, seed=42)

In [37]:
def softmax(z):
    """Row-wise softmax of a 2-D array of logits.

    The maximum of each row is subtracted before exponentiating so that large
    logits do not overflow; this shift does not change the result.
    """
    row_max = np.max(z, axis=1, keepdims=True)
    numerators = np.exp(z - row_max)
    denominators = np.sum(numerators, axis=1, keepdims=True)
    return numerators / denominators

def cross_entropy(probs, y_true):
    """Mean negative log-probability assigned to the true classes.

    ``probs`` holds one row of class probabilities per sample and ``y_true``
    the integer class index of each sample. A small constant (1e-9) is added
    before taking the logarithm to avoid ``log(0)``.
    """
    rows = np.arange(probs.shape[0])
    true_class_probs = probs[rows, y_true]
    return -np.mean(np.log(true_class_probs + 1e-9))

In [45]:
# Softmax-regression parameters: one weight column and one bias per class
input_dim = X_train.shape[1]
# Draw the initial weights from a dedicated, seeded generator so that
# re-running this cell always gives the same starting point, regardless of
# what has already consumed NumPy's global random state.
init_rng = np.random.default_rng(42)
W = init_rng.standard_normal((input_dim, num_classes)) * 0.01
b = np.zeros((1, num_classes))
learning_rate = 0.1


In [47]:
epochs = 500
for epoch in range(epochs):
    # Forward pass: class probabilities and loss on the training split
    logits = X_train.dot(W) + b
    probs = softmax(logits)
    loss = cross_entropy(probs, y_train)

    # Manual gradient: for softmax followed by cross-entropy, the gradient of
    # the mean loss w.r.t. the logits is (probs - one_hot) / N
    N = X_train.shape[0]
    one_hot = np.zeros_like(probs)
    one_hot[np.arange(N), y_train] = 1
    dL_dz = (probs - one_hot) / N

    # Weight and bias gradients
    dW = X_train.T.dot(dL_dz)
    db = dL_dz.sum(axis=0, keepdims=True)

    # Gradient-descent update. W and b are modified in place, so running this
    # cell again continues training from the current weights.
    W -= learning_rate * dW
    b -= learning_rate * db

    # Report the loss every 10th epoch
    if epoch % 10 == 0:
        print(f"Epoch {epoch}: Loss = {loss:.4f}")

Epoch 0: Loss = 0.0112
Epoch 10: Loss = 0.0110
Epoch 20: Loss = 0.0108
Epoch 30: Loss = 0.0106
Epoch 40: Loss = 0.0104
Epoch 50: Loss = 0.0102
Epoch 60: Loss = 0.0100
Epoch 70: Loss = 0.0098
Epoch 80: Loss = 0.0096
Epoch 90: Loss = 0.0094
Epoch 100: Loss = 0.0093
Epoch 110: Loss = 0.0091
Epoch 120: Loss = 0.0090
Epoch 130: Loss = 0.0088
Epoch 140: Loss = 0.0087
Epoch 150: Loss = 0.0085
Epoch 160: Loss = 0.0084
Epoch 170: Loss = 0.0083
Epoch 180: Loss = 0.0082
Epoch 190: Loss = 0.0080
Epoch 200: Loss = 0.0079
Epoch 210: Loss = 0.0078
Epoch 220: Loss = 0.0077
Epoch 230: Loss = 0.0076
Epoch 240: Loss = 0.0075
Epoch 250: Loss = 0.0074
Epoch 260: Loss = 0.0073
Epoch 270: Loss = 0.0072
Epoch 280: Loss = 0.0071
Epoch 290: Loss = 0.0070
Epoch 300: Loss = 0.0069
Epoch 310: Loss = 0.0068
Epoch 320: Loss = 0.0067
Epoch 330: Loss = 0.0066
Epoch 340: Loss = 0.0066
Epoch 350: Loss = 0.0065
Epoch 360: Loss = 0.0064
Epoch 370: Loss = 0.0063
Epoch 380: Loss = 0.0063
Epoch 390: Loss = 0.0062
Epoch 400: 

In [50]:
# Evaluate on the held-out split: the predicted class is the most probable
# one, and accuracy is the fraction of predictions that match y_test
test_logits = X_test.dot(W) + b
test_probs = softmax(test_logits)
preds = test_probs.argmax(axis=1)
acc = (preds == y_test).mean()

print("\n테스트 정확도:", acc)


테스트 정확도: 0.3333333333333333


In [51]:
# Predict labels for new, unseen texts
new_texts = [
    "Push harder now",  # example sentence
    "Engine is failing"
]

# 1. N-gram vectorization. Reuse the training vectorizer with the same (1, 3)
#    range the vocabulary was built with, so new texts are encoded exactly
#    like the training data; n-grams outside the vocabulary are ignored.
new_X = vectorize_texts(new_texts, vocab, ngram_range=(1, 3))

# 2. Prediction
new_logits = np.dot(new_X, W) + b
new_probs = softmax(new_logits)
new_y_pred = np.argmax(new_probs, axis=1)

# 3. Map predicted class indices back to label strings
idx2label = {v: k for k, v in label2idx.items()}
new_y_pred_labels = [idx2label[idx] for idx in new_y_pred]

# 4. Print the results
for text, label in zip(new_texts, new_y_pred_labels):
    print(f"Text: \"{text}\" | Predicted Label: \"{label}\"")

Text: "Push harder now" | Predicted Label: "전략/전술"
Text: "Engine is failing" | Predicted Label: "불만/요청"


In [52]:
# Save the model together with everything needed to vectorize new texts and
# decode predictions
model_data = {
    "W": W,  # weight matrix
    "b": b,  # bias
    "vocab": vocab,  # n-gram -> column index
    "label2idx": label2idx,  # label -> class index
    "idx2label": {v: k for k, v in label2idx.items()},  # class index -> label
    # N-gram range; must match the (1, 3) range used to build `vocab` and the
    # training matrix, otherwise a loader would encode texts differently
    "ngram_range": (1, 3),
}

with open("ngram_model.pkl", "wb") as f:
    pickle.dump(model_data, f)

print("모델이 성공적으로 저장되었습니다!")

모델이 성공적으로 저장되었습니다!
