In [None]:
'''
https://velog.io/@gibonki77/ResNetwithPyTorch?fbclid=IwAR2uQa5gtIAXk_D-Sd4FLhQmLIntwvAgeSttG_cT-nxzk5sRXIU6nnF1m2g
'''

import torch
import torch.nn as nn
import torch.nn.functional as F

import torch.optim as optim

import torchvision.transforms as transforms  # 이미지 변환

class Conv_block(nn.Module):
    def __init__(self, in_channels, out_channels, activation=True):
        super(Conv_block, self).__init__()
        
        self.relu = nn.ReLU()
        self.conv = nn.Conv2d(in_channels, out_channels)
        self.batch_norm = nn.BatchNorm2d(out_channels)
        self.activation = activation
        
        
    def forward(self, x):
        if not self.activation:
            return self.batch_norm(self.conv(x))
        
        return self.relu(self.batch_norm(self.conv(x)))
        
        
class Res_block(nn.Module):
    def __init__(self, in_channels, red_channels, out_channels, is_plain=False):
        super(Res_block, self).__init__()
        self.relu = nn.ReLU()
        self.is_plain = is_plain
        
        if in_channels == 64:
            self.conv_seq = nn.Seqential(
                                    Conv_block(in_channels, red_channels, kernel_size = 1, padding=0)
                                    Conv_block(red_channels, red_channels, kernel_size = 3, padding =1)
                                    Conv_block(red_channels, out_channels, activation=False, kernel_size=1, padding=0)
            )
        elif in_channels == out_channels:
            self.conv_seq = nn.Seqential(
                                    Conv_block(in_channels, red_channels, kernel_size = 1, padding=0)
                                    Conv_block(red_channels, red_channels, kernel_size = 3, padding =1)
                                    Conv_block(red_channels, out_channels, activation=False, kernel_size=1, padding=0)
            )
            self.iden = nn.Identity()
        else :
            self.conv_seq = nn.Seqential(
                                    Conv_block(in_channels, red_channels, kernel_size = 1, padding=0, stride=2)
                                    Conv_block(red_channels, red_channels, kernel_size = 3, padding =1)
                                    Conv_block(red_channels, out_channels, activation=False, kernel_size=1, padding=0)
            )
            self.iden = nn.Identity()
        
    def forward(self, x):
        y = self.conv_seq(x)
        if self.is_plain:
            x = y
        else:
            x = y + self.iden(x)
        x = self.relu(x)
        return x
        
print("success")

In [None]:

class ResNet(nn.Module):
    def __init__(self, in_channels=3, num_classes=1000, is_plain=False):
        self.num_classes = num_classes
        
        super(ResNet, self).__init__()
        
        self.conv1 = Conv_block(in_channels=in_channels, out_channels=64, kernel_size=7, stride=2, padding=3)
        self.maxpool1 = nn.MaxPool2d(kernel_size = 3, stride=2, padding=1)
        
        self.conv2_x = nn.Sequential(
                                Res_block(64, 64, 256, is_plain),
                                Res_block(256, 64, 256, is_plain),
                                Res_block(256, 64, 256, is_plain)      
        )
        self.conv3_x = nn.Sequential(
                                Res_block(256, 128, 512, is_plain),
                                Res_block(512, 128, 512, is_plain),
                                Res_block(512, 128, 512, is_plain),
                                Res_block(512, 128, 512, is_plain)      
        )
        self.conv4_x = nn.Sequential(
                                Res_block(512, 256, 1024, is_plain),
                                Res_block(1024, 256, 1024, is_plain),
                                Res_block(1024, 256, 1024, is_plain),
                                Res_block(1024, 256, 1024, is_plain),
                                Res_block(1024, 256, 1024, is_plain),
                                Res_block(1024, 256, 1024, is_plain),      
        )
        self.conv5_x = nn.Sequential(
                                Res_block(1024, 512, 2048, is_plain),
                                Res_block(1024, 512, 2048, is_plain),
                                Res_block(1024, 512, 2048, is_plain),      
        )
        
        self.avg_pool = nn.AvgPool2d(kernel_size = 7, stride=1)
        self.fc = nn.Linear(2048, num_classes)
        
    def forward(self, x):
        x = self.conv1(x)
        x = self.maxpool1(x)
        x = self.conv2_x(x)
        x = self.conv3_x(x)
        x = self.conv4_x(x)
        x = self.conv5_x(x)
        x = self.avg_pool(x)
        x = x.reshape(x.shape[0], -1)
        x = self.fc(x)
        return x
        

In [None]:
from torchsummary import summary as summary_

def build_resnet(input_shape=(3,225,225), is_50 = True, is_plain=False):
    
    x = torch.randn(2, *input_shape).to(device)
    
    if is_50 :
        model = ResNet(is_plain=is_plain).to(device)
        
        assert = model(x).shape == torch.Size([2, model.num_classes])
        
        if is_plain == False:
            print("ResNet 50")
        if is_plain == True:
            print("PlainNet50 ")
            
        print(summary_(model, (3,225,225), batch_size = 2))
        
        return model
    
    model = ResNet_34(is_plain).to(device)
    
    assert model(x).shape == torch.Size([2, model.num_classes])
    
    if is_plain == False:
        print("ResNet 34 ")
    
    if is_plain == True:
        print("PlainNet 34")
    
    print(summary_(model, (3,225,255), batch_size = 2))
    
    return model
        
        
        

In [None]:
# device = "cuda" if torch.cuda.is_available() else "cpu"
# print("using {}".format(device))

device = torch.device("cuda")


model = build_resnet().to(device)

loss = nn.MSELoss(reduction = 'sum')
optimizer = optim.Adam(model.parameters(), lr = 0.001)


In [None]:
def train(model, loss_func, optimizer):
    
    best_model_weight = copy.deepcopy(model.state_dict())
    best_acc = 0
    
    EPOCH = 100
    for epoch in range(EPOCH):
        print("epoch : {}/{} \n".format(epoch,EPOCH))
        
        for phase in ['train', 'val']:
            if phase == 'train':
                model.train()
            else:
                model.eval()
                
            current_loss = 0
            current_correct = 0
            
            for inputs, labels in database['phase']:
                inputs = inputs.to(device)
                labels = labels.to(device)
                
                optimizer.zero_grad()
                
                with torch.set_grad_enabled(phase == 'train'):
                    outputs = model(inputs)
                    _, preds = torch.max(outputs,1)
                    loss = loss_func(outputs, labels)
                    
                    if phase == 'train':
                        loss.backward()
                        optimizer.step()
                        
                current_loss += loss.item() * inputs.size(0)
                current_correct += torch.sum(preds == labels.data)
                
            epoch_loss = current_loss / database[phase]
            epoch_acc = current_correct.double() / database[phase]
            
            
            if phase == 'val' and epoch_acc > best_acc:
                best_acc = epoch_acc
                best_model_weight = copy.deepcopy(model.state_dict())
    
    model.load_state_dict(best_model_wight)
    
    return model
    
def train(model):
    model.train(True)
    loss_fn = nn.MSELoss()
    optimizer = optim.SGD(model.parameters(), lr = 0.001)
    
    for _ in range(batches):
        inputs = dataset
    
        optimizer.zero_grad()  # 변화도 초기화 (0)
        outputs = model(inputs.to(device))  # 순전파 학습

        labels = labels.to(outputs.device)
        
        loss = loss_fn(outputs, labels)  # 결과 해석
        loss.backward() # 역전파 단계 / 해당 단계를 거치면 inputs.grad에 변화도(gradient) 저장
        
        optimizer.step()  # 경사 하강법 (gradient descent)
    
    
def test(model, dataset, loss, optimizer):
    model.eval()
    total_loss = 0
    with torch.no_grad():   # 변화도 계산 안함. eval이나 test용으로 사용
        for i in range(dataset):
            data, targets = get_batch(dataset, i)
            
# 매개변수 고정
model = Net(require_grad = True) # require_grad : True = 변화도 계산 / False = 변화도 계산 X (전이학습 용)

for param in model.parameters():
    param.requires_grad = False

model.fc = nn.Linear(126,10) # 분류기 layer / 10개의 항목 분류

optimizer = optim.Adam(model.parameters(), lr = 0.001)


In [None]:
Path = './*.pt'
# Path = './*.pth'

## state_dict 활용 모델 저장 & 호출 (모델의 매개변수만 저장, 사용할 땐 모델 선언 후 인수로 투입)
torch.save(model.state_dict(), Path)

model = build_resnet()
model.load_state_dict(torch.load(Path))
model.eval()


## 전체 모델 저장 & 호출 ()
torch.save(model, Path)

model = torch.load(Path)
model.eval()

## checkpoint 저장 & 호출
EPOCH = 100
LOSS = 0.01

torch.save({
    'epoch':EPOCH,
    'model_state_dict': model.state_dict(),
    'optimizer_state_dict' : optimizer.state_dict(),
    'loss':LOSS,
}, Path)

model = build_resnet()
optimizer = optim.Adam(model.parameter(), lr = 0.001)

checkpoint = torch.load(Path)

model.load_state_dict(checkpoint['model_state_dict'])
optimizer.load_state_dict(checkpoint['optimizer_state_dict'])
epoch = checkpoint['epoch']
loss = checkpoint['loss']

model.eval()
model.train()

## 여러 모델을 하나의 파일에 저장 & 호출

model_A = build_resnet()
model_B = build_resnet()

optimizer_A = optim.Adam(model_A.parameters(), lr = 0.001)
optimizer_B = optim.Adam(model_B.parameters(), lr = 0.001)

torch.save({
    'model_A_state_dict': model_A.state_dict(),
    'model_B_state_dict': model_B.state_dict(),
    'optimizer_A_state_dict': optimizer_A.state_dict(),
    'optimizer_B_state_dict': optimizer_B.state_dict()
}, Path)

## 부분적인 모델 파라미터 호출 예시

model_A = Net_A()
torch.save(model_A.state_dict(), Path)

model_B = Net_B()
model_B = load_state_dict(torch.load(Path), strict=False)

## 저장한 곳 -> 불러오는 곳 (GPU -> CPU / GPU -> GPU / CPU -> GPU)

device = torch.device('cpu')
model = Net()
model.load_state_dict(torch.load(Path, map_location = device))

device = torch.device("cuda")
model = Net()
model.load_state_dict(torch.load(Path))
model.to(device)

device = torch.device("cuda")
model = Net()
model.load_state_dict(torch.load(Path, map_location="cuda:0"))
model.to(device)

## 이미지 변환 함수
from PIL import Image

def transform_image(image):
    input_transforms = [transforms.Resize(255),
                        transforms.CenterCrop(225),
                        transforms.ToTensor(),
                        transforms.Normalize([]),        
    ]
    my_transforms = transforms.Compose(input_transforms)
    
    img = Image.open(image)
#     img = Imga.open(io.BytesIO(image)) # 이미지를 bytes 단위로 읽음
    
    t_img = my_transforms(img)
    t_img.unsqueeze_(0)
    
    return t_img

## 예측 결과, class 일치 여부 확인
def render_prediction(prediction_idx):
    str_idx = str(prediction_idx)
    class_name = 'unknown'
    
    if img_class_list is not None:
        if str_idx in img_class_list is not None:
            class_name = image_class_list[str_idx][1]
            
    return prediction_idx, class_name


In [None]:
### encoder & decoder























In [None]:
### torch GAN


    
    
    
    
    
    
    
    
    
    

In [None]:
# 모델 배포

model = Net()
model.eval()

# 성능을 동일하게 유지하면서 모델의 크기를 줄이기 위한 모델 양자화(quantization)
# LSTM 등 시계열 모델은 동적 양자화에 최적화

backend = "fbgemm"

model.qconfig = torch.quantization.get_default_qconfig(backend)
torch.backends.quantized.engine = backend

quantized_model = torch.quantization.quantize_dynamic(model, qconfig_spec = {torch.nn.Linear}, dtype = torch.qint8)

# 양자화된 모델을 모바일에서 실행 가능할 수 있도록 TorchScript 형식으로 변환
# 모델에 제어흐름이 있는 경우는 script, 제어가 없는 간단한 모델일 경우 trace


# scripted_quantized_model = torch.jit.trace(quantized_model)
scripted_quantized_model = torch.jit.script(quantized_model)
scripted_quantized_model.save(Path + "/scripted_qauntized_model.pt")


# 모바일 최적화
from torch.utils.mobile_optimizer import optimize_for_mobile
optimized_scripted_quantized_model = optimize_for_mobile(scripted_quantized_model)
optimized_scripted_quantized_model.save(Path + "/optimized_scripted_qauntized_model.pt")


# 라이트 인터프리터 적용 (용량 감소로 속도 증가 도모)
optimized_scripted_quantized_model._save_for_lite_interpreter("lite_optimized_scripted_qauntized_model.pt")


# 테스트 코드 / 보통 모바일에서는 cpu를 사용하니 cpu 성능으로 테스트
with torch.autograd.profiler.profile(use_cuda=False) as profile:
    output = optimized_scripted_quantized_model(input_image)
    
print("{:.2f}".format(profile.self_cpu_time_total)/1000)




In [None]:
## 가지치기

import torch.nn.utils.prune as prune

model = Net().to(device)

module = model.conv1

prune.random_unstructured(module, name = "test", amount=0.3)






In [None]:
## Pytorch 모델을 C++ 에서 로드하기

# LibTorch 사용

include <torch/script.h>


In [6]:
## Flask 배포 (Rest API)

from flask import Flask, jsonify
from flask import request

app = Flask(__name__)

model = Net()
model.eval()

## 예측

def get_prediction(image):
    tensor = transform_image(image)
    outputs = model.forward(tensor)
    _, y_hat = outputs.max(1)
    
    image_class = y_hat.item()

    return image_class



@app.route('/predict', methods=['POST'])
def predict():
    if request.method == 'POST':
        file = request.files['file']
        
        image = file.read()
        
        class_id, class_name = get_prediction(image)
    
        return jsonify({'class_id': 'id', 'class_name':'name'})


if __name__ == '__main__':
    app.run()

    
# $ FLASK_ENV = development FLASK_APP = app.py flask run

# 잘못된 형식의 파일을 전송할 경우를 대비해 예외 처리가 필요함.
# 모델이 파일의 데이터 자체를 인식 못할 경우가 발생하니 이를 방지해야함.
# 

In [None]:
## Module sharing

class Module_A(nn.Module):
    def __init__(self, Module_S):
        super(Module_A, self).__init__()
        
        self.Module_S = Module_S()
        
    def forward(self, input_data ):
        X1 = self.Module_S(input_data)
        X2 = 
        
        X = torch.cat((X1, X2), dim=1)
        X = F.relu(~~)
        
        return X
    

class Module_B(nn.Module):
    def __init__(self, Module_S):
        super(Module_B, self).__init__()
        
        self.Module_S = Module_S()
        
    def forward(self, input_data ):
        X1 = self.Module_S(input_data)
        X2 = 
        
        X = torch.cat((X1, X2), dim=1)
        X = F.relu(~~)
        
        return X

    
class Module_S(nn.Module):
    def __init__(self,):
        super(Module_S, self).__init__()
        
    def forward(self, input, ):
        
        return output

    
Shared_Module = Module_S()

Model_A = Module_A(Module_S = Shared_Module)
Model_B = Module_B(Module_S = Shared_Module)

def train(Model_A, Model_B, dataset):
    global Shared_Module
    
    EPOCH = 100
    
    For_A, For_B = dataset
    
    for i in range(dataset):
        
        if i % 2 == 0:
            inputs, targets = get_batch(For_A)
            output = Model_A.train(inputs)
            
            loss_A = loss_function(output, targets)
            loss_A.backward()
            
        else:
            inputs, targets = get_batch(For_B)
            output = Model_B.train(inputs, tragets)
            
            loss_B = loss_function(output, targets)
            loss_B.backward()
            
    Shared_Module.save(Path)

    
    

In [14]:
from sklearn.datasets import load_digits
import torch

digits = load_digits()
X = digits.data
y = digits.target

print(X)
print(y)

print(type(X))
print(type(y))

print(X.shape)
print(y.shape)

X = torch.tensor(X)
y = torch.tensor(y)

print(type(X))
print(type(y))

print(X.shape)
print(y.shape)

print(y[0])

[[ 0.  0.  5. ...  0.  0.  0.]
 [ 0.  0.  0. ... 10.  0.  0.]
 [ 0.  0.  0. ... 16.  9.  0.]
 ...
 [ 0.  0.  1. ...  6.  0.  0.]
 [ 0.  0.  2. ... 12.  0.  0.]
 [ 0.  0. 10. ... 12.  1.  0.]]
[0 1 2 ... 8 9 8]
<class 'numpy.ndarray'>
<class 'numpy.ndarray'>
(1797, 64)
(1797,)
<class 'torch.Tensor'>
<class 'torch.Tensor'>
torch.Size([1797, 64])
torch.Size([1797])
tensor(0)


In [16]:
print(X.shape)
print(X[0].shape)
print(X[0][0].shape)

torch.Size([1797, 64])
torch.Size([64])
torch.Size([])


IndexError: invalid index of a 0-dim tensor. Use `tensor.item()` in Python or `tensor.item<T>()` in C++ to convert a 0-dim tensor to a number

In [19]:
digits.data

{'data': array([[ 0.,  0.,  5., ...,  0.,  0.,  0.],
        [ 0.,  0.,  0., ..., 10.,  0.,  0.],
        [ 0.,  0.,  0., ..., 16.,  9.,  0.],
        ...,
        [ 0.,  0.,  1., ...,  6.,  0.,  0.],
        [ 0.,  0.,  2., ..., 12.,  0.,  0.],
        [ 0.,  0., 10., ..., 12.,  1.,  0.]]),
 'target': array([0, 1, 2, ..., 8, 9, 8]),
 'frame': None,
 'feature_names': ['pixel_0_0',
  'pixel_0_1',
  'pixel_0_2',
  'pixel_0_3',
  'pixel_0_4',
  'pixel_0_5',
  'pixel_0_6',
  'pixel_0_7',
  'pixel_1_0',
  'pixel_1_1',
  'pixel_1_2',
  'pixel_1_3',
  'pixel_1_4',
  'pixel_1_5',
  'pixel_1_6',
  'pixel_1_7',
  'pixel_2_0',
  'pixel_2_1',
  'pixel_2_2',
  'pixel_2_3',
  'pixel_2_4',
  'pixel_2_5',
  'pixel_2_6',
  'pixel_2_7',
  'pixel_3_0',
  'pixel_3_1',
  'pixel_3_2',
  'pixel_3_3',
  'pixel_3_4',
  'pixel_3_5',
  'pixel_3_6',
  'pixel_3_7',
  'pixel_4_0',
  'pixel_4_1',
  'pixel_4_2',
  'pixel_4_3',
  'pixel_4_4',
  'pixel_4_5',
  'pixel_4_6',
  'pixel_4_7',
  'pixel_5_0',
  'pixel_5_1',
 