In [1]:
import numpy as np

In [None]:
class tanh:
    def feedforward(self, mat):
        return np.tanh(mat)

    def backward(self, grad_output):  # lr 제거
        return grad_output * (1 - np.tanh(grad_output) ** 2)

In [None]:
class sigmoid:
    def __init__(self):
        self.pred = None

    def feedforward(self, mat):
        self.pred = 1 / (1 + np.exp(-mat))  # 값 저장
        return self.pred

    def backward(self, grad_output):
        if self.pred is None:  # 예외 처리 추가
            raise ValueError("feedforward() should be called before backward()")
        return grad_output * self.pred * (1 - self.pred)


In [56]:
class LSTMCELL:
    def __init__(self, input_dim, output_node, return_sequence=False, batch_size=1):
        self.input_node = input_dim
        self.output_node = output_node
        self.return_sequence = return_sequence
        self.batch_size = batch_size
        self.kernel = np.random.randn(input_dim, 4 * output_node) * 0.01
        self.recurrent_kernel = np.random.randn(output_node, 4 * output_node) * 0.01
        self.bias = np.zeros((1, 4 * output_node))
        self.h_p_s = np.zeros((batch_size, output_node))
        self.c_p_s = np.zeros((batch_size, output_node))

        # Sigmoid와 Tanh 객체를 생성하여 재사용
        self.sigmoid = sigmoid()
        self.tanh = tanh()

        # 추가: self.input 초기화
        self.input = None

    def feedforward(self, mat, states):
        h_p_s, c_p_s = states  # 전달받은 동적 상태
        self.input = mat
        z = np.matmul(mat, self.kernel) + np.matmul(h_p_s, self.recurrent_kernel) + self.bias
        z_0, z_1, z_2, z_3 = np.split(z, 4, axis=1)
        f = self.sigmoid.feedforward(z_0)
        i = self.sigmoid.feedforward(z_1)
        o = self.sigmoid.feedforward(z_2)
        c_t = self.tanh.feedforward(z_3)
        c_s = f * c_p_s + i * c_t
        h_s = o * np.tanh(c_s)
        # 반환 시 동적 상태(c_p_s, h_p_s)도 함께 반환하도록 합니다.
        # (물론 실제로 c_p_s, h_p_s는 이번 계산에 의해 업데이트되었으므로, 이를 반환)
        return h_s, [h_s, c_s], [f, i, o, c_t, c_p_s, h_p_s]

    def backward(self, grad_output, states, gates, lr):
        
        f, i, o, c_t, c_p_s, h_p_s = gates
        h_s, c_s = states
        dc_s = grad_output * o * (1 - np.tanh(c_s) ** 2)
        dc_p_s = f * dc_s
        dc_t = i * dc_s
        # 고정 상태 대신 전달받은 동적 상태 사용
        df = dc_s * c_p_s
        di = c_t * dc_s
        do = np.tanh(c_s) * dc_s
        dz0 = self.sigmoid.backward(df)
        dz1 = self.sigmoid.backward(di)
        dz2 = self.sigmoid.backward(do)
        dz3 = self.tanh.backward(dc_t)
        dz = np.hstack([dz0, dz1, dz2, dz3])
        if self.input is None:
            raise ValueError("Error: self.input is None. Make sure feedforward() is called before backward().")
        dw = np.dot(self.input.T, dz)
        du = np.dot(h_p_s.T, dz)
        db = np.sum(dz, axis=0, keepdims=True)
        self.kernel -= lr * dw
        self.recurrent_kernel -= lr * du
        self.bias -= lr * db
        # 이전 은닉 상태에 대한 gradient도 동적 상태를 사용하여 반환
        return np.dot(dz, self.recurrent_kernel.T), [dc_p_s, h_p_s], [df, di, do, dc_t, c_p_s, h_p_s]

In [None]:
class Linear:
    def __init__(self, input_node, output_node):
        self.weights = np.random.randn(input_node, output_node) * 0.01
        self.bias = np.zeros((1, output_node))

    def feedforward(self, mat):
        self.input = mat
        return np.dot(mat, self.weights) + self.bias

    def backward(self, grad_output, lr, clip_value=1.0):
        grad_weights = np.dot(self.input.T, grad_output)
        grad_bias = np.sum(grad_output, axis=0, keepdims=True)

        grad_norm = np.linalg.norm(grad_weights)
        if grad_norm > clip_value:
            grad_weights = grad_weights * (clip_value / grad_norm)

        self.weights -= lr * grad_weights
        self.bias -= lr * grad_bias

        return np.dot(grad_output, self.weights.T)

In [None]:
class Softmax:
    def __init__(self):
        self.pred = None

    def feedforward(self, mat):
        ex = np.exp(mat - np.max(mat, axis=1, keepdims=True))
        self.pred = ex / np.sum(ex, axis=1, keepdims=True)
        return self.pred

    def backward(self, answer):  # lr 제거
        return self.pred - answer

In [None]:
def cross_entropy(y_pred, y_true):
    return -np.mean(y_true * np.log(y_pred + 1e-9))

def mse_derivative(y_pred, y_true):
    return 2 * (y_pred - y_true) / y_pred.shape[0]

In [69]:
import numpy as np

class Network:
    def __init__(self, layers=None, batch_size=32):
        if layers is None:
            self.layers = []
        else:
            self.layers = layers
        self.loss = None
        self.lr = None
        self.batch_size = batch_size

    def add(self, layer):
        self.layers.append(layer)

    def fit(self, x_data, y_data, epochs):
        prev_total_loss = float('inf')
        num_batches = int(np.ceil(x_data.shape[0] / self.batch_size))  # 배치 수 계산
        time_steps = x_data.shape[1]  # 시퀀스 길이

        for epoch in range(epochs):
            total_loss = 0

            # 각 배치마다 순전파/역전파 수행
            for batch in range(num_batches):
                batch_start = batch * self.batch_size
                batch_end = min(batch_start + self.batch_size, x_data.shape[0])
                x_batch = x_data[batch_start:batch_end]
                y_batch = y_data[batch_start:batch_end]
                current_batch_size = x_batch.shape[0]

                # forward propagation
                x_input = x_batch
                lstm_states = []   # 각 LSTM 레이어의 마지막 상태 저장
                lstm_gates = []    # 각 LSTM 레이어의 각 타임스텝 게이트 저장

                for layer in self.layers:
                    if isinstance(layer, LSTMCELL):
                        # 각 LSTM 레이어는 자기 배치 크기에 맞게 초기 상태를 생성
                        states = [np.zeros((current_batch_size, layer.output_node)),
                                  np.zeros((current_batch_size, layer.output_node))]
                        layer_outputs = []
                        layer_gates_list = []
                        for t in range(time_steps):
                            x_t = x_input[:, t, :]
                            h_s, states, gates = layer.feedforward(x_t, states)
                            layer_outputs.append(h_s)
                            layer_gates_list.append(gates)
                        # 만약 return_sequence가 True이면 시퀀스 전체를, False이면 마지막 타임스텝만 반환
                        if layer.return_sequence:
                            x_input = np.stack(layer_outputs, axis=1)
                        else:
                            x_input = layer_outputs[-1]
                        lstm_states.append(states)
                        lstm_gates.append(layer_gates_list)
                    else:
                        x_input = layer.feedforward(x_input)

                final_output = x_input
                loss = cross_entropy(final_output, y_batch)
                total_loss += loss

                # 손실에 대한 gradient 계산 (여기서는 예시로 MSE 미분을 사용; 필요에 따라 수정)
                grad_output = mse_derivative(final_output, y_batch)

                # backward propagation: 레이어 역순으로 진행
                for layer in reversed(self.layers):
                    if isinstance(layer, LSTMCELL):
                        # 저장해두었던 해당 레이어의 게이트 정보와 상태를 사용
                        layer_gates_list = lstm_gates.pop()
                        states = lstm_states.pop()
                        if layer.return_sequence:
                            # grad_output shape: (batch_size, time_steps, output_node)
                            new_grad = 0
                            for t in reversed(range(time_steps)):
                                grad_t = grad_output[:, t, :]
                                grad_t, states, _ = layer.backward(grad_t, states, layer_gates_list[t], self.lr)
                                new_grad += grad_t
                            grad_output = new_grad
                        else:
                            for t in reversed(range(time_steps)):
                                grad_output, states, _ = layer.backward(grad_output, states, layer_gates_list[t], self.lr)
                    elif isinstance(layer, (sigmoid, tanh, Softmax)):
                        grad_output = layer.backward(grad_output)
                    else:
                        grad_output = layer.backward(grad_output, self.lr)

            print(f"Epoch {epoch + 1}/{epochs}, Loss: {total_loss / num_batches}")

            if prev_total_loss < total_loss:
                break
            else:
                prev_total_loss = total_loss

    def pred(self, mat):
        batch_size, time_steps, input_dim = mat.shape
        x_input = mat

        for layer in self.layers:
            if isinstance(layer, LSTMCELL):
                states = [np.zeros((batch_size, layer.output_node)),
                          np.zeros((batch_size, layer.output_node))]
                layer_outputs = []
                for t in range(time_steps):
                    x_t = x_input[:, t, :]
                    h_s, states, _ = layer.feedforward(x_t, states)
                    layer_outputs.append(h_s)
                if layer.return_sequence:
                    x_input = np.stack(layer_outputs, axis=1)
                else:
                    x_input = layer_outputs[-1]
            else:
                x_input = layer.feedforward(x_input)
        
        return x_input

    def compile(self, loss="cross_entropy", lr=0.01):
        self.loss = loss
        self.lr = lr


In [72]:
import numpy as np

# 네트워크 구성 및 학습
seq_length = 10
batch_size = 32
x_data = np.random.randn(batch_size, seq_length, 4)
y_data = np.random.randint(0, 2, size=(batch_size, 1))

network = Network(batch_size=batch_size)
network.add(LSTMCELL(input_dim=4, output_node=16, return_sequence=False, batch_size=batch_size))
network.add(Linear(input_node=16, output_node=1))
network.add(sigmoid())  # Softmax 대신 Sigmoid 사용

network.compile(loss="cross_entropy", lr=0.1)
network.fit(x_data, y_data, epochs=10000)


# 예측 수행
test_sample = np.random.randn(1, seq_length, 4)
prediction = network.pred(test_sample)
print("Prediction:", prediction)


Epoch 1/10000, Loss: 0.4548836447029083
Epoch 2/10000, Loss: 0.4523245379352947
Epoch 3/10000, Loss: 0.449807293735205
Epoch 4/10000, Loss: 0.44733121524940744
Epoch 5/10000, Loss: 0.44489561274435724
Epoch 6/10000, Loss: 0.44249980388823
Epoch 7/10000, Loss: 0.44014311400527933
Epoch 8/10000, Loss: 0.43782487630386413
Epoch 9/10000, Loss: 0.43554443207946547
Epoch 10/10000, Loss: 0.43330113089398503
Epoch 11/10000, Loss: 0.4310943307325861
Epoch 12/10000, Loss: 0.4289233981393067
Epoch 13/10000, Loss: 0.4267877083326363
Epoch 14/10000, Loss: 0.42468664530221434
Epoch 15/10000, Loss: 0.422619601887769
Epoch 16/10000, Loss: 0.4205859798413749
Epoch 17/10000, Loss: 0.41858518987407345
Epoch 18/10000, Loss: 0.41661665168785345
Epoch 19/10000, Loss: 0.4146797939939555
Epoch 20/10000, Loss: 0.41277405451842064
Epoch 21/10000, Loss: 0.41089887999576474
Epoch 22/10000, Loss: 0.4090537261516226
Epoch 23/10000, Loss: 0.4072380576751644
Epoch 24/10000, Loss: 0.40545134818205125
Epoch 25/10000, L