In [37]:
from tensorflow.keras.datasets import cifar10
import numpy as np
# import cupy as np

# 데이터 로드 (NumPy 배열로 로드됨)
(X_train, y_train), (X_test, y_test) = cifar10.load_data()

# 데이터 정규화 및 NumPy에서 CuPy로 변환
X_train = np.array(X_train / 255.0)
X_test = np.array(X_test / 255.0)

# # 레이블을 원-핫 인코딩으로 변환 후 CuPy로 변환
y_train_onehot = np.array(np.eye(10)[y_train.reshape(-1)])
y_test_onehot = np.array(np.eye(10)[y_test.reshape(-1)])

# # 확인
print(f"X_train shape: {X_train.shape}, dtype: {X_train.dtype}")
print(f"y_train_onehot shape: {y_train_onehot.shape}, dtype: {y_train_onehot.dtype}")



X_train shape: (50000, 32, 32, 3), dtype: float64
y_train_onehot shape: (50000, 10), dtype: float64


In [2]:
class CNN:
  def __init__(self):
    self.layers = []
    self.filters = []
    self.weights = []
    self.biases = []
    self.caches = []

  def add_conv_layer(self,num_filters, filter_size, input_depth, padding=0):
    """합성곱 레이어 추가"""
    layer_filters = np.random.randn(num_filters, filter_size, filter_size, input_depth) * 0.01
    self.filters.append(layer_filters)
    self.layers.append(('conv',num_filters, filter_size, padding))

  def add_pool_layer(self, pool_size=2, stride=2):
    self.layers.append(('pool', pool_size, stride))

  def add_dense_layer(self, input_size, output_size):
    W = np.random.randn(input_size, output_size) * 0.01
    b = np.zeros((1, output_size))
    self.weights.append(W)
    self.biases.append(b)
    self.layers.append(('dense', input_size, output_size))

  def pad(self, X, pad_size):
    """패딩 메서드: 입력에 패딩 추가"""
    # 각 차원에 맞춰 패딩을 설정하며, 마지막 차원에는 패딩을 추가하지 않음
    padding_shape = [(pad_size, pad_size) if i < X.ndim - 1 else (0, 0) for i in range(X.ndim)]
    return np.pad(X, padding_shape, mode='constant', constant_values=0)


  def relu(self, x):
    return np.maximum(0, x)

  def relu_derivative(self, x):
    return (x > 0).astype(float)

  def softmax(self, x):
    exp_x = np.exp(x - np.max(x, axis=1, keepdims=True))
    return exp_x / np.sum(exp_x, axis=1, keepdims=True)

  def convolve(self, X, filters, padding=0):
    # 배치가 있는 경우와 없는 경우를 처리
    if len(X.shape) == 4:
        batch_size, h, w, d = X.shape
        outputs = []
        for i in range(batch_size):
            # 개별 샘플에 대해 패딩 및 합성곱 수행
            output = self.convolve_single(X[i], filters, padding)
            outputs.append(output)
        return np.array(outputs)
    else:
        # 배치가 없는 경우 단일 이미지에 대해 합성곱 수행
        return self.convolve_single(X, filters, padding)

  def convolve_single(self, X, filters, padding=0):
    """단일 이미지에 대한 합성곱 연산"""
    if padding > 0:
      X = self.pad(X, padding)

    h, w, d = X.shape
    num_filters, filter_height, filter_width, _ = filters.shape
    output_height = h - filter_height + 1
    output_width = w - filter_width + 1
    output = np.zeros((output_height, output_width, num_filters))

    for f in range(num_filters):
      filter = filters[f]
      for i in range(output_height):
        for j in range(output_width):
          region = X[i:i + filter_height, j:j + filter_width, :]
          output[i, j, f] = np.sum(region * filter)
    return output

  def pool(self, X, size=2, stride=2):
    """최대 풀링 연산 수행"""
    h, w, d = X.shape
    output = np.zeros((h//size, w//size, d))
    for i in range(0, h, stride):
      for j in rnage(0, w, stride):
        output[i // stride, j // stride, :] = np.max(X[i:i + size, j:j + size, :], axis=(0, 1))
    return output

  def flatten(self, X):
    """Flatten 레이어"""
    return X.reshape(1,-1)

  def fully_connected(self, X, W, b):
    """완전 연결 연산"""
    return np.dot(X, W) + b

  def forward(self,X):
    """순전파: 추가된 레이어들을 순서대로 처리"""
    out = X
    self.cache = []
    conv_layer_count = 0
    dense_layer_count = 0

    for layer in self.layers:
      if layer[0] == 'conv':
        num_filters, filter_size, padding = layer[1], layer[2], layer[3]
        filters = self.filters[conv_layer_count]
        out = self.convolve(out, filters, padding)
        out = self.relu(out)
        self.cache.append(out)
        conv_layer_count += 1

      elif layer[0] == 'pool':
        pool_size, stride = layer[1], layer[2]
        out = self.pool(out, pool_size, stride)
        self.cache.append((out, pool_size, stride))

      elif layer[0] == 'dense':
        input_size, output_size, = layer[1], layer[2]
        W, b = self.weights[dense_layer_count], self.biases[dense_layer_count]
        if dense_layer_count == 0:
          out = self.flatten(out)
        out = self.fully_connected(out, W, b)
        if dense_layer_count < len(self.weights) - 1:
          out = self.relu(out)
        self.cache.append(out)
        denxe_layer_count += 1

    out = self.softmax(out)
    self.cache.append(out)
    return out

  def compute_loss(self, y_pred, y_true):
    m = y_true.shape[0]
    log_likelihood = -np.log(y_pred[range(m), ytrue])
    return np.sum(log_likelihood) / m

  def bakward(self, y, learning_rate=0.01):
    """역전파를 통한 기울기 계산 및 가중치 업데이트"""
    d_out = self.cache.pop()
    d_out[range(len(y)), y] -= 1

    dense_layer_count = len(self.weigths) - 1
    conv_layer_count = len(self.filters) - 1

    for layer in reversed(self.layers):
      if layer[0] =='dense':
        dZ = d_out * self.relu_derivative(self.cache.pop())
        dW = np.dot(self.cache[-1].T, dZ)
        db = np.sum(dZ, axis=0, keepdims=True)
        self.weigths[dense_layer_count] -= learning_rate * dW
        self.biases[dense_layer_count] -= learning_rate * db
        d_out = np.dot(dZ, self.weights[dense_layer_count].T)
        dense_layer_count -= 1

      elif layer[0] == 'pool':
        d_out = self.pool_backward(d_out, self.cache.pop())

      elif layer[0] == 'conv':
        filters = self.filters[conv_layer_count]
        d_out, d_filters = self.conv_backward(d_out, self.cache.pop(), filters)
        self.filters[conv_layer_count] -= learning_rate * d_filters
        conv_layer_count -= 1

  def pool_backward(self, d_out, cache):
    d_pool, pool_size, stride = np.zeros_like(cache[0]), cache[1], cache[2]
    h, w, d = cache[0].shape
    for i in range(0, h, stride):
      for k in range(0, w, stride):
        for k in range(d):
          idx = np.unravel_index(np.argmax(cache[0][i:i + pool_size, j:j + pool_size, k]), (pool_size, pool_size))
          d_pool[i+idx[0], j+idx[1], k] = d_out[i//stride, j//stride,k]
    return d_pool

  def conv_backward(self, d_out, cache, filters):
    """Convolution Layer 역전파"""
    d_filters = np.zeros_like(filters)
    h, w, d = cache.shape
    num_filters, filter_height, filter_width, _ = filters.shape
    d_conv = np.zeros_like(cahce)

    for f in range(num_filters):
      filter = filters[f]
      for i in range(h - filter_height + 1):
        for j in range(w - filter_width + 1):
          for k in range(b):
            region = cache[i:i + filter_height, j:j + filter_width, :]
            d_filters[f, :, :, k] += region * d_out[i, j, f]
            d_conv[i:i+filter_height, j:j+filter_width, k] += filter * d_out[i,j,f]
    return d_conv, d_filters

  def train(self, X, y, epochs=10, learning_rate=0.01):
    for epoch in range(epochs):
      y_pred = self.forward(X)
      loss = self.compute_loss(y_pred, y)
      print(f"Epoch {epoch + 1}, Loss: {loss}")
      self.backward(y, learning_rate)

  def predict(self, X):
    y_pred = self.forward(X)
    return np.argmax(y_pred, axis=1)






In [None]:
cnn = CNN()
cnn.add_conv_layer(num_filters=8, filter_size=3, input_depth=3, padding=1)
cnn.add_pool_layer(pool_size=2, stride=2)
cnn.add_conv_layer(num_filters=16, filter_size=3, input_depth=8, padding=1)
cnn.add_pool_layer(pool_size=2, stride=2)
cnn.add_dense_layer(input_size=16 * 8 * 8, output_size=128)
cnn.add_dense_layer(input_size=128, output_size=10)


cnn.train(X_train, y_train_onehot, epochs=5, learning_rate=0.01)


In [6]:
from sklearn.metrics import confusion_matrix
import seaborn as sns
import matplotlib.pyplot as plt

# 예측 수행 (예시)
y_pred = cnn.predict(X_test)
y_test_cpu = y_test.get()  # CuPy 배열을 NumPy 배열로 변환
y_pred_cpu = y_pred.get()  # CuPy 배열을 NumPy 배열로 변환

# 혼동 행렬 계산
conf_matrix = confusion_matrix(y_test_cpu, y_pred_cpu)

# 혼동 행렬 시각화
plt.figure(figsize=(10, 8))
sns.heatmap(conf_matrix, annot=True, fmt="d", cmap="Blues", xticklabels=range(10), yticklabels=range(10))
plt.xlabel("Predicted Label")
plt.ylabel("True Label")
plt.title("Confusion Matrix for CIFAR-10 Classification")
plt.show()

(50000, 32, 32, 3)

In [None]:
# 간단한 CNN 모델 생성 및 레이어 추가
cnn = CNN()
cnn.add_conv_layer(num_filters=8, filter_size=3, input_depth=3, padding=1)  # 하나의 합성곱 층
cnn.add_pool_layer(pool_size=2, stride=2)                                   # 하나의 풀링 층
cnn.add_dense_layer(input_size=8 * 16 * 16, output_size=128)                # 하나의 Dense 층 (flatten 후)
cnn.add_dense_layer(input_size=128, output_size=10)                         # 출력층

# 학습 수행
cnn.train(X_train, y_train_onehot, epochs=5, learning_rate=0.01)


In [None]:
from sklearn.metrics import confusion_matrix
import seaborn as sns
import matplotlib.pyplot as plt

# 예측 수행 (예시)
y_pred = cnn.predict(X_test)
y_test_cpu = y_test.get()  # CuPy 배열을 NumPy 배열로 변환
y_pred_cpu = y_pred.get()  # CuPy 배열을 NumPy 배열로 변환

# 혼동 행렬 계산
conf_matrix = confusion_matrix(y_test_cpu, y_pred_cpu)

# 혼동 행렬 시각화
plt.figure(figsize=(10, 8))
sns.heatmap(conf_matrix, annot=True, fmt="d", cmap="Blues", xticklabels=range(10), yticklabels=range(10))
plt.xlabel("Predicted Label")
plt.ylabel("True Label")
plt.title("Confusion Matrix for CIFAR-10 Classification")
plt.show()

In [50]:
import keras
from keras.models import Sequential
from keras.layers import Input, Dense, Conv2D, MaxPool2D, Flatten, Dropout
from keras.utils import clear_session
from keras.callbacks import EarlyStopping
from keras.models import Model

In [71]:
clear_session()

il = Input(shape = (32,32,3))

hl = Conv2D(filters=8,
            kernel_size = (3,3),
            padding='same',
            activation='relu'
            )(il)

hl = MaxPool2D(pool_size = (2,2),
               strides = (2,2)
               )(hl)

hl = Conv2D(filters=16,
            kernel_size = (3,3),
            padding='same',
            activation='relu'
            )(hl)

hl = MaxPool2D(pool_size = (2,2),
               strides = (2,2)
               )(hl)


hl = Flatten()(hl)

hl = Dense(128,  activation='relu')(hl)

dr = Dropout(0.5)(hl)

ol = Dense(10,  activation='softmax')(dr)


model = Model(il, ol)

model.summary()

In [72]:
model.compile(optimizer='adam', loss=keras.losses.categorical_crossentropy, metrics=['accuracy'])

In [73]:
es = EarlyStopping(monitor = 'val_loss', min_delta=0, patience=7, restore_best_weights=True )

In [74]:
hist = model.fit(X_train, y_train_onehot, epochs = 10000, validation_split=.2, callbacks=[es])

Epoch 1/10000
[1m1250/1250[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m8s[0m 4ms/step - accuracy: 0.2725 - loss: 1.9671 - val_accuracy: 0.4837 - val_loss: 1.4356
Epoch 2/10000
[1m1250/1250[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m6s[0m 2ms/step - accuracy: 0.4533 - loss: 1.4989 - val_accuracy: 0.5262 - val_loss: 1.3293
Epoch 3/10000
[1m1250/1250[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m3s[0m 2ms/step - accuracy: 0.5062 - loss: 1.3826 - val_accuracy: 0.5641 - val_loss: 1.2430
Epoch 4/10000
[1m1250/1250[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m5s[0m 3ms/step - accuracy: 0.5301 - loss: 1.3156 - val_accuracy: 0.5736 - val_loss: 1.2003
Epoch 5/10000
[1m1250/1250[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m5s[0m 2ms/step - accuracy: 0.5453 - loss: 1.2656 - val_accuracy: 0.5920 - val_loss: 1.1572
Epoch 6/10000
[1m1250/1250[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m6s[0m 3ms/step - accuracy: 0.5693 - loss: 1.2048 - val_accuracy: 0.6032 - val_loss: 1.125

#torch

In [7]:
from tensorflow.keras.datasets import cifar10
import numpy as np

# CIFAR-10 데이터셋 로드
(X_train, y_train), (X_test, y_test) = cifar10.load_data()

# 데이터 정규화 (0-1 범위로 스케일링)
X_train = X_train / 255.0
X_test = X_test / 255.0

# 레이블을 원-핫 인코딩 없이 정수 형태로 변환
y_train = y_train.reshape(-1)
y_test = y_test.reshape(-1)

# NumPy 배열을 PyTorch 텐서로 변환
X_train_tensor = torch.tensor(X_train, dtype=torch.float32).permute(0, 3, 1, 2)  # (N, H, W, C) -> (N, C, H, W)
X_test_tensor = torch.tensor(X_test, dtype=torch.float32).permute(0, 3, 1, 2)
y_train_tensor = torch.tensor(y_train, dtype=torch.long)
y_test_tensor = torch.tensor(y_test, dtype=torch.long)


In [8]:
from torch.utils.data import TensorDataset, DataLoader

# TensorDataset을 통해 입력과 레이블을 묶기
train_dataset = TensorDataset(X_train_tensor, y_train_tensor)
test_dataset = TensorDataset(X_test_tensor, y_test_tensor)

# DataLoader 설정
train_loader = DataLoader(dataset=train_dataset, batch_size=64, shuffle=True)
test_loader = DataLoader(dataset=test_dataset, batch_size=64, shuffle=False)


In [15]:
import torch
import torch.nn as nn
import torch.nn.functional as F
import numpy as np
import matplotlib.pyplot as plt

In [29]:
class SimpleCNN(nn.Module):
  def __init__(self):
    super(SimpleCNN, self).__init__()

    self.conv1 = nn.Conv2d(in_channels=3, out_channels=32, kernel_size=3, padding=1)
    self.bn1 = nn.BatchNorm2d(32)
    self.pool = nn.MaxPool2d(kernel_size=2, stride=2)

    self.conv2 = nn.Conv2d(in_channels=32, out_channels=64, kernel_size=3, padding=1)
    self.bn2 = nn.BatchNorm2d(64)

    self.conv3 = nn.Conv2d(in_channels=64, out_channels=128, kernel_size=3, padding=1)
    self.bn3 = nn.BatchNorm2d(128)

    self.fc1 = nn.Linear(128 * 4 * 4, 256)
    self.dropout = nn.Dropout(0.5)
    self.fc2 = nn.Linear(256, 10)

  def forward(self, x):
    x = self.pool(F.relu(self.bn1(self.conv1(x))))
    x = self.pool(F.relu(self.bn2(self.conv2(x))))
    x = self.pool(F.relu(self.bn3(self.conv3(x))))


    #Flatten
    x = x.view(-1, 128 * 4 * 4)

    x = F.relu(self.fc1(x))
    x = self.dropout(x)
    x = self.fc2(x)

    return x

In [33]:
model = SimpleCNN()

In [34]:
print(model)

SimpleCNN(
  (conv1): Conv2d(3, 32, kernel_size=(3, 3), stride=(1, 1), padding=(1, 1))
  (bn1): BatchNorm2d(32, eps=1e-05, momentum=0.1, affine=True, track_running_stats=True)
  (pool): MaxPool2d(kernel_size=2, stride=2, padding=0, dilation=1, ceil_mode=False)
  (conv2): Conv2d(32, 64, kernel_size=(3, 3), stride=(1, 1), padding=(1, 1))
  (bn2): BatchNorm2d(64, eps=1e-05, momentum=0.1, affine=True, track_running_stats=True)
  (conv3): Conv2d(64, 128, kernel_size=(3, 3), stride=(1, 1), padding=(1, 1))
  (bn3): BatchNorm2d(128, eps=1e-05, momentum=0.1, affine=True, track_running_stats=True)
  (fc1): Linear(in_features=2048, out_features=256, bias=True)
  (dropout): Dropout(p=0.5, inplace=False)
  (fc2): Linear(in_features=256, out_features=10, bias=True)
)


In [35]:
criterion = nn.CrossEntropyLoss()
optimizer = torch.optim.Adam(model.parameters(), lr = 0.0001)

In [None]:
num_epochs = 100
for epoch in range(num_epochs):
  for images, labels in train_loader:
    output = model(images)

    loss = criterion(output, labels)

    optimizer.zero_grad()
    loss.backward()
    optimizer.step()

  if not epoch%10:
    print(f'Epoch [{epoch+1}/{num_epochs}], Loss : {loss.item():.6f}')

Epoch [1/100], Loss : 0.714551
Epoch [11/100], Loss : 0.033514
Epoch [21/100], Loss : 0.178189
Epoch [31/100], Loss : 0.061476
Epoch [41/100], Loss : 0.068716
Epoch [51/100], Loss : 0.044403
