[1] 모듈 로딩 및 데이터 준비

In [39]:
# 모듈 로딩
import torch
import torch.nn as nn
import torch.optim as optim 
from torch.utils.data import Dataset, DataLoader

import numpy as np
import pandas as pd

In [6]:
# 데이터 준비
x_data = torch.IntTensor([[10, 20, 30], [20, 30, 40], [30, 40, 50], [40, 50, 60], [50, 60 ,70]])
y_data = torch.IntTensor([[20], [30], [40], [50], [60]])
print(f'x_data => {x_data.shape}, {x_data.ndim}')
print(f'y_data => {y_data.shape}, {y_data.ndim}')

x_data => torch.Size([5, 3]), 2
y_data => torch.Size([5, 1]), 2


[2] 데이터셋 생성

[2-1] TensorDataset 활용 : Dataset의 sub_class

In [7]:
# TensorDataset 클래스 로딩
from torch.utils.data import TensorDataset

In [8]:
dataset = TensorDataset(x_data, y_data)
dataset

<torch.utils.data.dataset.TensorDataset at 0x19faf0bb400>

In [9]:
dataset.tensors

(tensor([[10, 20, 30],
         [20, 30, 40],
         [30, 40, 50],
         [40, 50, 60],
         [50, 60, 70]], dtype=torch.int32),
 tensor([[20],
         [30],
         [40],
         [50],
         [60]], dtype=torch.int32))

In [10]:
# __getitem__() 메서드 호출
dataset[0]

(tensor([10, 20, 30], dtype=torch.int32), tensor([20], dtype=torch.int32))

In [11]:
len(dataset)

5

[2-2] 사용자 정의 데이터셋 생성

In [12]:
# 데이터 준비
filename = '../DATA/iris.csv'

irisDF = pd.read_csv(filename)
irisDF.info()

<class 'pandas.core.frame.DataFrame'>
RangeIndex: 150 entries, 0 to 149
Data columns (total 5 columns):
 #   Column        Non-Null Count  Dtype  
---  ------        --------------  -----  
 0   sepal_length  150 non-null    float64
 1   sepal_width   150 non-null    float64
 2   petal_length  150 non-null    float64
 3   petal_width   150 non-null    float64
 4   species       150 non-null    object 
dtypes: float64(4), object(1)
memory usage: 6.0+ KB


In [13]:
irisNP = np.loadtxt(filename, delimiter = ',', usecols = [0, 1, 2, 3], skiprows = 1)
irisNP[:2]

array([[5.1, 3.5, 1.4, 0.2],
       [4.9, 3. , 1.4, 0.2]])

In [14]:
## 데이터의 타입 체크
type(irisDF), type(irisNP), irisDF.__class__.__name__, irisNP.__class__.__name__

(pandas.core.frame.DataFrame, numpy.ndarray, 'DataFrame', 'ndarray')

In [15]:
isinstance(irisDF, pd.DataFrame), isinstance(irisNP, pd.DataFrame), isinstance(irisNP, np.ndarray)

(True, False, True)

In [62]:
# 사용자 정의 DataSet 클래스
class DLDataset(Dataset):

    # 초기화 콜백 함수(callback function)
    def __init__(self, x_data, y_data):
        super().__init__()
        x_data = x_data.values if isinstance(x_data, pd.DataFrame) else x_data
        y_data = y_data.values if isinstance(y_data, pd.DataFrame) else y_data
        self.feature = torch.tensor(x_data)
        self.target = torch.tensor(y_data)

        # ndarray => tensor
        self.feature = torch.FloatTensor(x_data)
        self.target = torch.FloatTensor(y_data)
    
    # 데이터셋의 개수 체크 콜백 함수(callback function)
    def __len__(self):
        return self.target.shape[0]
    
    # 특정 인덱스 데이터 + 라벨 반환 콜백 함수(callback function)
    def __getitem__(self, index):
        return self.feature[index], self.target[index]

In [17]:
# 피처와 라벨 분리
featureDF = irisDF[irisDF.columns[:-1]]
targetSR = irisDF[irisDF.columns[-1]]

print(f'featureDF => {featureDF.shape}, {featureDF.ndim}D')
print(f'targerSR => {targetSR.shape}, {targetSR.ndim}D')

featureDF => (150, 4), 2D
targerDF => (150,), 1D


In [18]:
# 타겟이 object이므로 정수로 변환
from sklearn.preprocessing import LabelEncoder

targetNP = LabelEncoder().fit_transform(targetSR)
targetNP = targetNP.reshape(-1, 1)
print(targetNP.shape, targetNP.ndim)

(150, 1) 2


In [19]:
# 데이터셋 생성 => DF, NP
my_dataset = DLDataset(featureDF, targetNP)
my_dataset[0]

(tensor([5.1000, 3.5000, 1.4000, 0.2000], dtype=torch.float64),
 tensor([0], dtype=torch.int32))

In [20]:
# 데이터셋 생성 => NP, NP
my_dataset2 = DLDataset(irisNP, targetNP)
my_dataset2[0]

(tensor([5.1000, 3.5000, 1.4000, 0.2000], dtype=torch.float64),
 tensor([0], dtype=torch.int32))

[2-3] 학습용, 검증용, 테스트용 Dataset

In [24]:
# 파이토치
from torch.utils.data import random_split

# 학습용, 검증용, 테스트 데이터 비율
seed = torch.Generator().manual_seed(42)

trainDS, validDS, testDS = random_split(my_dataset2, [0.7, 0.1, 0.2], generator = seed)

print(f'trainDS =>{len(trainDS)}개, validDS => {len(validDS)}개, testDS => {len(testDS)}개')

print(f'Subset 속성 =>\nindices : {trainDS.indices} \nindices : {trainDS.dataset}')
print(f'Subset 속성 =>\nindices : {validDS.indices} \nindices : {validDS.dataset}')
print(f'Subset 속성 =>\nindices : {testDS.indices} \nindices : {testDS.dataset}')

trainDS =>105개, validDS => 15개, testDS => 30개
Subset 속성 =>
indices : [42, 95, 30, 64, 52, 35, 130, 40, 82, 17, 108, 94, 68, 97, 117, 127, 41, 44, 57, 140, 149, 32, 23, 102, 16, 113, 71, 18, 67, 66, 0, 25, 101, 112, 91, 3, 59, 116, 86, 84, 106, 142, 43, 39, 26, 98, 93, 20, 87, 19, 120, 114, 7, 63, 76, 89, 36, 45, 37, 56, 58, 122, 51, 145, 24, 21, 105, 62, 15, 11, 48, 133, 88, 50, 6, 134, 111, 8, 49, 75, 69, 124, 4, 147, 80, 100, 99, 141, 47, 107, 13, 109, 129, 28, 38, 53, 121, 5, 55, 31, 73, 74, 54, 29, 12] 
indices : <__main__.DLDataset object at 0x0000019FB089BD30>
Subset 속성 =>
indices : [22, 104, 81, 1, 103, 125, 85, 2, 96, 128, 27, 118, 77, 110, 146] 
indices : <__main__.DLDataset object at 0x0000019FB089BD30>
Subset 속성 =>
indices : [72, 139, 131, 60, 65, 92, 135, 83, 14, 34, 137, 10, 119, 9, 148, 79, 78, 70, 144, 143, 123, 115, 61, 132, 90, 46, 126, 136, 33, 138] 
indices : <__main__.DLDataset object at 0x0000019FB089BD30>


[3] DataLoader 생성 : 학습용, 검증용, 테스트용

In [27]:
# DataLoader 생성
# drop_last 매개변수 : 배치 사이즈로 데이터셋 분리 후 남는 데이터 처리 방법 설정 [기본 : False]
batch = 10
trainDL = DataLoader(trainDS, batch_size = batch, drop_last = True)
validDL = DataLoader(validDS, batch_size = batch)
testDL = DataLoader(testDS, batch_size = batch)

len(trainDL), len(validDL), len(testDL)

(10, 2, 3)

In [34]:
# Epoch당 반복 단위
print(f'batch_size = {batch}')
print(f'trainDS => {len(trainDS)}개, validDS => {len(validDS)}개, testDS => {len(testDS)}개')
print(f'trainDL => {len(trainDL)}개, validDL => {len(validDL)}개, testDL => {len(testDL)}개')

batch_size = 10
trainDS => 105개, validDS => 15개, testDS => 30개
trainDL => 10개, validDL => 2개, testDL => 3개


In [None]:
batch = 10
trainDL = DataLoader(trainDS, batch_size = batch, drop_last = True)
validDL = DataLoader(validDS, batch_size = batch)
testDL = DataLoader(testDS, batch_size = batch)

len(trainDL), len(validDL), len(testDL)

In [29]:
# DataLoader 속성
for _, (feature, target) in enumerate(validDL):
    print(f'[{_}] feature {feature}')

[0] feature tensor([[4.6000, 3.6000, 1.0000, 0.2000],
        [6.5000, 3.0000, 5.8000, 2.2000],
        [5.5000, 2.4000, 3.7000, 1.0000],
        [4.9000, 3.0000, 1.4000, 0.2000],
        [6.3000, 2.9000, 5.6000, 1.8000],
        [7.2000, 3.2000, 6.0000, 1.8000],
        [6.0000, 3.4000, 4.5000, 1.6000],
        [4.7000, 3.2000, 1.3000, 0.2000],
        [5.7000, 2.9000, 4.2000, 1.3000],
        [6.4000, 2.8000, 5.6000, 2.1000]], dtype=torch.float64)
[1] feature tensor([[5.2000, 3.5000, 1.5000, 0.2000],
        [7.7000, 2.6000, 6.9000, 2.3000],
        [6.7000, 3.0000, 5.0000, 1.7000],
        [6.5000, 3.2000, 5.1000, 2.0000],
        [6.3000, 2.5000, 5.0000, 1.9000]], dtype=torch.float64)


[4] Model 클래스 정의 : 입출력 피처 수, 층 수, 은닉층의 노드 수
- 구조 설계
    * 입력층 : 입력 => 피처 개수, iris 데이터의 경우 4개
    * 은닉층 : 사용자가 직접 조정
    * 출력층 : [분류] => 타겟 클래스 개수, [회귀] => 1개

In [50]:
# 모델 클래스 정의
# 클래스명 : CModel
class CModel(nn.Module):

    # 구성요소 정의 함수
    def __init__(self, in_dim, out_dim):
        super().__init__()
        self.input_layer = nn.Linear(in_dim, 100)
        self.reul = nn.ReLU()
        self.hidden_layer = nn.Linear(100, 27)
        self.output_layer = nn.Linear(27, out_dim)
    
    # 순방향 학습 진행 함수
    def forward(self, x):
        y = self.input_layer(x) 
        y = self.relu(y)  # 결과 100개 반환
        y = self.hidden_layer(y) 
        y = self.output_layer(y)
        return y

[5] 학습 준비 : 실행디바이스, 모델, 최적화, 손실함수, 학습횟수, 학습함수, 평가함수, 예측함수

In [51]:
# 실행 디바이스 선택
DEVICE = 'cuda' if torch.cuda.is_available() else 'cpu'

# 학습 횟수
EPOCHS = 50

In [52]:
my_dataset2.target.shape[1]

1

In [53]:
np.unique(targetSR)

array(['setosa', 'versicolor', 'virginica'], dtype=object)

In [54]:
targetSR.nunique()

3

In [55]:
# 모델 인스턴스
IN_DIM, OUT_DIM = my_dataset2.feature.shape[1], targetSR.nunique()
model = CModel(IN_DIM, OUT_DIM)
print(f'IN : {IN_DIM}, OUT : {OUT_DIM}')

IN : 4, OUT : 3


In [56]:
print(model)

CModel(
  (input_layer): Linear(in_features=4, out_features=100, bias=True)
  (reul): ReLU()
  (hidden_layer): Linear(in_features=100, out_features=27, bias=True)
  (output_layer): Linear(in_features=27, out_features=3, bias=True)
)


In [59]:
# 손실함수
LOSS_FN = nn.CrossEntropyLoss().to(DEVICE)

# 최적화 인스턴스
import torch.optim as optim
OPTIMIZER = optim.Adam(model.parameters())

- 학습 및 검증관련 함수 정의

In [63]:
# 학습 진행함수
def training():
    # 학습모드 => 정규화, 경사하강법, 드롭아웃 등의 기능 활성화
    model.train()
    # 배치크기만큼 학습 진행 및 저장
    train_loss = []
    for cnt, (feature, target) in enumerate(trainDL):
        print(cnt, feature, target)
        feature, target = feature.to(DEVICE), target.to(DEVICE)
        target = target.

        # 학습
        pre_target = model(feature)
        print(f'pre_target => {pre_target.shape}, {pre_target.ndim}D')
        print(f'target => {target.shape}, {target.ndim}D')
        
        # 손실함수
        loss = LOSS_FN(pre_target, target)
        train_loss.append(loss)
        
        # W, b 업데이트
        OPTIMIZER.zero_grad()
        loss.backward()
        OPTIMIZER.step()

         # 배치 단위 학습 진행 메세지 출력
        print(f'[Train {cnt} Batch Loss] ==> {loss}')

    # 에포크 단위 학습 진행 메세지 출력
    print(f'[Train Loss] ==> {loss}')

    return train_loss

In [None]:
# 검증 및 평가 진행함수
def testing():
    

In [None]:
# 예측 함수
def predict():
    

In [64]:
for eps in range(EPOCHS):
    # 학습
    train_loss = training()
    
    # 검증
    testing()
    print(f'[{eps}/{EPOCHS}] {sum(train_loss)}/{len(train_loss)}')

0 tensor([[4.4000, 3.2000, 1.3000, 0.2000],
        [5.7000, 3.0000, 4.2000, 1.2000],
        [4.8000, 3.1000, 1.6000, 0.2000],
        [5.6000, 2.9000, 3.6000, 1.3000],
        [6.9000, 3.1000, 4.9000, 1.5000],
        [5.0000, 3.2000, 1.2000, 0.2000],
        [7.4000, 2.8000, 6.1000, 1.9000],
        [5.0000, 3.5000, 1.3000, 0.3000],
        [5.8000, 2.7000, 3.9000, 1.2000],
        [5.1000, 3.5000, 1.4000, 0.3000]], dtype=torch.float64) tensor([[0],
        [1],
        [0],
        [1],
        [1],
        [0],
        [2],
        [0],
        [1],
        [0]], dtype=torch.int32)


RuntimeError: mat1 and mat2 must have the same dtype, but got Double and Float

In [31]:
class LogicsticLinear(nn.Module):
    
    def __init__(self, in_, out_):
        super().__init__()
        self.layers = nn.Sequential(
            nn.Linear(in_, 4),
            nn.ReLU(),
            nn.Linear(4, out_),
            nn.Sigmoid()
        )
    def forward(self, x):
        x = self.layers(x)
        return x

In [36]:
# 학습 인스턴스 생성
in_, out_ = len(trainDL), 3
model = LogicsticLinear(in_, out_)
model

LogicsticLinear(
  (layers): Sequential(
    (0): Linear(in_features=10, out_features=4, bias=True)
    (1): ReLU()
    (2): Linear(in_features=4, out_features=3, bias=True)
    (3): Sigmoid()
  )
)

In [38]:
# 최적화 인스턴스 생성 => 모델에서 사용하는 W, b 변수들 전달
optimizer = optim.Adam(model.parameters())

NameError: name 'optim' is not defined

In [None]:
loss_list = [[], []]
accuaray_list = [[], []]
EPOCHS= 1000  # 학습 횟수 => 샘플 처음부터 끝까지 읽는 것을 기준으로 횟수 지정

for epoch in range(EPOCHS):
         
    train_prediction = model(trainDL)
    # print(f'예측값 : {train_prediction}')

    train_loss = F.cross_entropy(train_prediction, y_train)
    loss_list[0].append(round(train_loss.item(), 2))
    # print(f'[{epoch}] Loss => {round(train_loss.item(), 2)}')

    with torch.no_grad():
        val_prediction = model(validDL)
        val_loss = F.cross_entropy(val_prediction, y_val)
        loss_list[1].append(val_loss.item())
        assert val_loss.requires_grad == False
    
    # W, b 업데이트
    optimizer.zero_grad()
    train_loss.backward()   # 손실함수 계산값으로 미분 진행하여 새로운 W, b 계산
    optimizer.step()  # 새로운 값으로 W, b 업데이트

    train_accuracy = (y_train.argmax(dim = 1) == train_prediction.argmax(dim = 1)).sum() / y_train.shape[0]
    accuaray_list[0].append(round(train_accuracy.item(), 2))
    # print(f'훈련 정확도 => {round(train_accuracy.item(), 2)}')

    val_accuracy = (y_val.argmax(dim = 1) == val_prediction.argmax(dim = 1)).sum() / y_val.shape[0]
    accuaray_list[1].append(round(val_accuracy.item(), 2))
    # print(f'검증 정확도 => {round(val_accuracy.item(), 2)}')

    loss_diff = (val_loss - train_loss).abs()
    if epoch % 10 == 0:
        print(
            f"Epoch {epoch:4d}/{EPOCHS} Train_loss: {train_loss.item():.4f} Val_loss: {val_loss.item():.4f} Diff: {loss_diff.item():.6f}    Train_accuracy: {train_accuracy:.4f} Val_accuracy: {val_accuracy:.4f}"
        )

    if loss_diff.item() >= 0.03:
        count += 1
    else:
        count = 0

    if count >= 5:
        print(
            f"Epoch {epoch:4d}/{EPOCHS} Train_loss: {train_loss.item():.4f} Val_loss: {val_loss.item():.4f} Diff: {loss_diff.item():.6f}    Train_accuracy: {train_accuracy:.4f} Val_accuracy: {val_accuracy:.4f}"
        )
        break