# 목적

- 파이토치로 CNN을 구현하여 이미지 데이터가 어떤 이미지인지 분류하는 모델을 생성하겠다
- 객체 지향방식으로 모델을 생성
- 데이터
  - 치매/정상 뇌스캔 이미지
  - train/ad or normal: 80개
  - test/ad or normal: 60개
  - 형식 *.jpg
- 목표
  - 뇌스캔 사진 입력 -> 치매를 진단 (정상/치매)

# 모듈 가져오기, 환경 변수

In [1]:
import torch
import torch.nn as nn
import torch.nn.functional as F
import torch.optim as optim

# 참고 (연습용 내부 데이터) -> FashionMNIST
from torchvision import transforms, datasets

In [2]:
# GPU 설정
DEVICE = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
DEVICE

device(type='cuda')

## 학습 관련 용어 정리

- 종류 1
  - 온라인 : 학습된 모델이 실시간 반영 (리스크가 크다)
  - 오프라인 : 모델 교체시 시스템 다운 -> 교체 ->가동, 실제 서비스와 분리되어 있다

- 종류 2
  - 배치학습
    - 전체 데이터를 통으로 학습에 한번에 넣어서 진행 (많은 리소스 요구)
  - 미니베치학습
    - 전체 데이터를 n개로 분할해서 학습 회수를 늘려서 진행
- 전략 
  - 내가 직접 신경망을 만들어서 학습
  - 전이학습 (tranfer learning)
    - 타인이 오랜시간 대량의 데이터로 학습한(사전학습) 모델을 가져와서 사용
    - 파인튜닝
    - 프럼프트 러닝
    - 인컨텍스트 러닝
      - 제로샷 러닝
      - 원샷 러닝
      - 퓨샷 러닝
- 학습시 요소
  - epoch : 전체 데이터를 한번 다 사용하여 학습한 경우 '1 epoch' 라고 표현 (1세대 학습)
  - batch size : 1회 학습시 투입되는 데이터량 -> 미니 배치 학습
  - 예시, 데이터가 총 1,000,000 개 있다 
    - 1회 학습시 1,000개 사용
    ( 세대별로 1000개의 조합이 다르다 )
    - 10 에폭으로 학습 하겠다
    - 1 에폭 학습시 학습 행위가 몇번 진행되는가? 1,000번 학습이 진행
  - 조기학습 종료 ( early_stopping )
    - 더이상 학습의 성과가 없으면 학습행위를 중단한다

In [3]:
# 환경변수 (학습에 관련된 설정값)
# GPU 환경 : 30회 / CPU 환경 : 5회
EPOCHES = 30 if torch.cuda.is_available() else 5
BATCH_SIZE = 64 # 1회 학습시 64개 데이터를 공급받아서 학습하겠다

# 데이터 준비

- 데이터 공급자를 설계 제공

In [4]:
# 훈련용 데이터 공급자
train_loader = torch.utils.data.DataLoader( datasets.FashionMNIST( root='./data',# 데이터가 저장된 위치
                                                                  train= True,   # 훈련용
                                                                  download=True, # 부재시 다운로드 진행
                                                                  # 이미지를 전처리 (가공, 등등)
                                                                  transform=transforms.Compose([
                                                                      transforms.ToTensor(), # 이미지는 텐서로 받겠다
                                                                      transforms.Normalize( 0.2,0.3 ) # 정규화, 평균, 표준편차 설정값
                                                                  ])),
                                           batch_size=BATCH_SIZE, 
                                           shuffle=True )

In [5]:
test_loader = torch.utils.data.DataLoader( datasets.FashionMNIST(root='./data', # 데이터가 저장된 위치
                                                                  train= False,  # 테스트용
                                                                  download=True,# 부재시 다운로드 진행
                                                                  # 이미지를 전처리 (가공, 등등)
                                                                  transform=transforms.Compose([
                                                                      transforms.ToTensor(), # 이미지는 텐서로 받겠다
                                                                      transforms.Normalize( 0.2,0.3 ) # 정규화, 평균, 표준편차 설정값
                                                                  ])),
                                              batch_size=BATCH_SIZE, 
                                              shuffle=True )

# 인공신경망 구성

## 합성곱 요소간 관계식

- 합성곱을 통과한 feature map의 크기
- H = ((h + 2*p-k) / s) + 1
- W = ((w + 2*p-k) / s) + 1

In [6]:
if 0:
  class Net(nn.Module):
    # 맴버 변수
    # 맴버 함수(상속 받은 함수 재정의 or 직접 구현)
    def forward(self,x): 
      '''
        부모로 부터 받은 상속 함수
        모델의 순방향으로 네트워크를 구성하는 함수
        네트워크 연결 (=keras의 add())
      '''
      pass
    # 생성자 ( self <-> this)
    def __init__(self):
      '''
        1. 부모 생성자 호출 -> 통상 상속받으면 진행
      '''
      super(Net,self).__init__()
      '''
        네트워크의 각 요소를 생성 -> 합성곱층, 풀링층, 전결합층, 출력층
        MNIST(28,28)  -> (14,14) -> (7,7) -> FC -> 출력층으로 수렴
      '''
      # 1F 합성곱층 -> 맴버변수 -> 객체.맴버
      # 1개 이미지 -> 합성곱 통과시키면 32장으로 증폭 # 행렬의 곱을 이용해서 특징을 추출
      self.conv1 = nn.Conv2d( 1, 32, 5, 1, padding='same' )
      # 2F 합성곱층
      self.conv2 = nn.Conv2d( 32, 32*2, 5, 1, padding='same' )
      # 과적합 방지 층 -> 학습 방해 (10% 방해하겠다) -> 학습이 되지않게 조절하는 값
      self.dropout = nn.Dropout2d(0.1)
      # 3F FC : 4D -> 2D로 Flattern 후에 출력에 수렴하기전 적당한 크기로 완충하는 역할
      # 원본이미지 784 pixel -> 3136 pixel -> 1024 <- feature
      self.fc = nn.Linear( 7*7*32*2, 1024 )
      # 4F output 생성
      self.output = nn.Linear( 1024,10 )
      
      pass


In [7]:
class Net(nn.Module):
  # 맴버 변수
  # 맴버 함수(상속 받은 함수 재정의 or 직접 구현)
  def forward(self,x): # 순정신경망 (x->y)
    # x는 입력,stride가 2이므로 이미지 크기는 반으로 줄어든다
    # 커널의 절반만 뽑는다 -> 커널의 크기 절반 스트라이드 2
    x = F.relu( F.max_pool2d( self.conv1( x ), 2) )   # 28 -> 14
    # 이미지는 반으로 줄어든다
    x = F.relu( F.max_pool2d( self.dropout( self.conv2( x ) ), 2) )  # 14 -> 7
    # Flattern 4D -> 2D view()
    x = x.view( -1 , 7*7*32*2 ) # 이미지 1개에 대해서 총량을 맞추면 총 개수는 알아서 설정
    x = F.relu(self.fc(x)) # 3136 -> 1024 (feature size)
    x = F.dropout ( x, training=self.training) # 학습률로 표시하는 예
    # 출력층
    x = F.relu( self.output(x) ) # 1024 -> 10
    # x를 리턴
    return F.log_softmax( x, dim=1 )
    pass
    

  def __init__(self):
    super(Net,self).__init__()
    self.conv1 = nn.Conv2d( 1, 32, 5, 1, padding='same' ) 
    self.conv2 = nn.Conv2d( 32, 32*2, 5, 1, padding='same' ) 
    self.dropout = nn.Dropout2d(0.1) 
    self.fc = nn.Linear( 7*7*32*2, 1024 )
    self.output = nn.Linear( 1024,10 )    
    pass


In [8]:
# 모델 생성 -> GPU 적용
model = Net().to( DEVICE )

In [9]:
# 파라미터 확인 -> W,b 
model

Net(
  (conv1): Conv2d(1, 32, kernel_size=(5, 5), stride=(1, 1), padding=same)
  (conv2): Conv2d(32, 64, kernel_size=(5, 5), stride=(1, 1), padding=same)
  (dropout): Dropout2d(p=0.1, inplace=False)
  (fc): Linear(in_features=3136, out_features=1024, bias=True)
  (output): Linear(in_features=1024, out_features=10, bias=True)
)

In [10]:
# 최적화 도구 준비 -> 케라스( compile() 연결된다 )
# ( 튜닝대상(W,b), 학습계수(적용비율), 속도 )
optimizer = optim.SGD( model.parameters(), lr=0.01, momentum=0.5 ) # 경사 하강법

# 학습 구성

# 학습 실행 및 테스트

In [15]:
def train( model, train_loader, optimizerm, epoch ):
  '''
    model         : 모델 (학습 대상)
    train_loader  : 학습시 사용할 데이터를 제공하는 공급자
    optimizer     : 모델 학습시 손실값을 최소로 줄일 수 있게 조정하는 주체
                    최적의 W,b를 계산(조정, 수정)하는 주체
  '''
  # 학습 모드로 전환
  model.train()
  # 미니 배치 학습 진행 -> 1회 반복시 64개 데이터를 추출(batch_size 만큼)
  for idx, (data, target) in enumerate(train_loader):
    # data : 훈련용 이미지 (텐서 [64,]), target : 정답
    # 데이터 형태
    # torch.Size([64, 1, 28, 28]) tensor([7, 8, 5, 1, 8, 2, 5, 5, 6, 7, 3, 8, 0, 4, 8, 7, 4, 0, 7, 4, 1, 5, 2, 0,
    #     5, 6, 4, 8, 6, 1, 4, 7, 1, 3, 4, 2, 2, 0, 0, 6, 1, 6, 4, 7, 4, 7, 2, 6,
    #     8, 5, 4, 9, 0, 0, 5, 9, 5, 8, 5, 0, 8, 8, 3, 0])
    # print(data.size(),target)
    # 1. 학습에 사용된 데이터를 gpu 혹은 cpu 사용 설정
    data   = data.to( DEVICE )
    target = target.to( DEVICE )
    # 2. 최적화 도구에 남아있는 값을 초기화한다 (이전 세대에서 사용한 잔재 초기화)
    optimizer.zero_grad()
    # 3. 모델에 데이터 삽입 -> 출력 획득 -> 예측 수행되었음
    output = model(data)
    # print( output.size() ) # (64,10) : torch.Size([64, 10])
    # 4. 정답과 예측값의 차이를 계산 -> 양으로 계산 -> 손실값
    #     cross_entropy() : 손실함수
    loss = F.cross_entropy( output, target )
    # 5. 최적화 수행 -> 오차 역전파 수행 (y->x) -> 이를 위해 w,b 조정
    loss.backward()
    # 6. 최적화된 도구 모델 파라미터 최종에 반영
    optimizer.step()
    # 7. 로그 출력 세대별 로그
    if idx % 200 == 0:
      # 학습세대 회차/세개별 학습총회차 손실값
      print( f'Ephoch:{epoch}, {idx*len(data)} / {len(train_loader.dataset)} loss:{loss.item()}')
    # break
  pass

In [16]:
def test(model,test_loader):
  # 테스트 모드로 전환
  model.eval()
  loss = 0
  acc = 0
  # 테스트 행위간 발생되는 모든 내용을 기록하지 않는다
  with torch.no_grad():
    for data,target in test_loader:
      data,target = data.to(DEVICE), target.to(DEVICE)
      output = model(data)
      # 손실값 획득
      loss += F.cross_entropy( output,target,reduction='sum' ).item()
      # 예측값 추출 -> 최고값을 가진 인덱스 추출 -> numpy.argmax() ;주어진 배열에서 최고값을 가진 인덱스의 번호
      pred = output.max(1, keepdim=True)[1] # 최고값을 가지는 방번호를 모아서 텐서로 리턴
      # 정확도 체크 -> 행렬 비교 -> True -> 1 -> sum
      acc += pred.eq( target.view_as(pred) ).sum().item()
      pass
    pass
  # 평균 손실(값)
  mean_loss = loss / len(test_loader.dataset)
  # 평균 정확도(%)
  mean_acc = acc / len(test_loader.dataset) * 100
  return mean_loss,mean_acc
  pass

In [17]:
# 조기학습 종료 컨셉이 없으므로 EPOCHES 설정 횟수만큼 진행
for epoch in range(1,EPOCHES+1):
  # 1세대 학습이 끝나면 바로 테스트
  # 훈련
  train( model, train_loader, optimizer, epoch )
  # 테스트
  loss, acc = test(model, test_loader)
  # 로그
  print( f'epoch:{epoch}, 손실:{loss}, 정확도:{acc}%' )
  # 조기학습 종료 추가 (전체 세대 학습을 채울 필요는 없다), 여기서는 생략
  # break

Ephoch:1, 0 / 60000 loss:0.5058769583702087
Ephoch:1, 12800 / 60000 loss:0.41434499621391296
Ephoch:1, 25600 / 60000 loss:0.3074445128440857
Ephoch:1, 38400 / 60000 loss:0.290014386177063
Ephoch:1, 51200 / 60000 loss:0.43489858508110046
epoch:1, 손실:0.3854799113035202, 정확도:86.06%
Ephoch:2, 0 / 60000 loss:0.3072800636291504
Ephoch:2, 12800 / 60000 loss:0.4672187566757202
Ephoch:2, 25600 / 60000 loss:0.27181383967399597
Ephoch:2, 38400 / 60000 loss:0.30188512802124023
Ephoch:2, 51200 / 60000 loss:0.4909271001815796
epoch:2, 손실:0.3538678282260895, 정확도:87.13%
Ephoch:3, 0 / 60000 loss:0.3351293206214905
Ephoch:3, 12800 / 60000 loss:0.20735657215118408
Ephoch:3, 25600 / 60000 loss:0.2031710296869278
Ephoch:3, 38400 / 60000 loss:0.3410649001598358
Ephoch:3, 51200 / 60000 loss:0.41748952865600586
epoch:3, 손실:0.3285999725818634, 정확도:87.94999999999999%
Ephoch:4, 0 / 60000 loss:0.3049285411834717
Ephoch:4, 12800 / 60000 loss:0.20629073679447174
Ephoch:4, 25600 / 60000 loss:0.19549855589866638
Epho

In [14]:
# 학습 결과 시각화 >
torch.save(model,)