# Sub Class 모델링

모델이란 것은 Input을 Output으로 만들어주는 수식이다.

해당 기능을 수행하는 두가지 클래스가 tf.keras.layers.Layer 와 tf.keras.layers.Mdel클래스이다.

두가지 모두 **연산을 추상화** 하는 것으로 동일한 역할을 하지만, tf.keras.layers.Model 클래스의 경우 모델을 저장하는 기능과 fit 함수를 사용할 수 있다는 점에서 차이가 있다.

- tf.keras.layers.Layer
- tf.keras.layers.Model

In [1]:
import numpy as np
import pandas as pd
import tensorflow as tf

import matplotlib.pyplot as plt
import seaborn as sns
%matplotlib inline

In [2]:
np.random.seed(7777)
tf.random.set_seed(7777)

### Linear Regression을 Layer로 만들어 보자

In [7]:
class LinearRegression(tf.keras.layers.Layer):
    def __init__(self, units=1):
        super(LinearRegression, self).__init__()
        self.units = units
        
    def build(self, input_shape):
        self.w = self.add_weight(
            shape=(input_shape[-1], self.units),
            initializer='random_normal',
            trainable=True
        )
        self.b = tf.Variable(0.0)
        
    def call(self, inputs):
        return tf.matmul(inputs, self.w) + self.b

#### 가상데이터

In [8]:
W_true = np.array([3., 2., 4., 1.]).reshape(4,1)
B_true = np.array([1.])

In [9]:
X = tf.random.normal((500, 4))
noise = tf.random.normal((500, 4))

y = X @ W_true + B_true + noise

In [10]:
opt = tf.keras.optimizers.SGD(learning_rate=0.03)

linear_layer = LinearRegression(1)

In [12]:
for epoch in range(100):
    with tf.GradientTape() as tape:
        y_hat = linear_layer(X)
        loss = tf.reduce_mean(tf.square(y - y_hat))
        
    grads = tape.gradient(loss, linear_layer.trainable_weights)
    opt.apply_gradients(zip(grads, linear_layer.trainable_weights))
    
    if epoch % 10 == 0:
        print('epoch : {} loss : {}'.format(epoch, loss.numpy()))

epoch : 0 loss : 34.02492904663086
epoch : 10 loss : 9.377575874328613
epoch : 20 loss : 3.130983352661133
epoch : 30 loss : 1.5325844287872314
epoch : 40 loss : 1.1196198463439941
epoch : 50 loss : 1.0118862390518188
epoch : 60 loss : 0.9835036993026733
epoch : 70 loss : 0.9759520888328552
epoch : 80 loss : 0.9739224910736084
epoch : 90 loss : 0.9733714461326599


In [13]:
type(y_hat)

tensorflow.python.framework.ops.EagerTensor

### ResNet - Sub Class로 구현하기

1. Residual Block - Layer
2. ResNet - Model

In [15]:
from tensorflow.keras.layers import Input, Conv2D, MaxPool2D, Flatten, Dense, Add

In [30]:
class ResidualBlock(tf.keras.layers.Layer):
    
    def __init__(self, filters=32, filter_match=False):
        super(ResidualBlock, self).__init__()
        self.conv1 = Conv2D(filters, kernel_size=1, padding='same', activation='relu')
        self.conv2 = Conv2D(filters, kernel_size=3, padding='same', activation='relu')
        self.conv3 = Conv2D(filters, kernel_size=1, padding='same', activation='relu')
        self.add = Add()
        self.filters = filters
        self.filter_match = filter_match
        if self.filter_match:
            self.conv_ext = Conv2D(filters, kernel_size=1, padding='same')
            
    def call(self, inputs):
        net1 = self.conv1(inputs)
        net2 = self.conv2(net1)
        net3 = self.conv3(net2)
        if self.filter_match:
            res = self.add([self.conv_ext(inputs), net3])
        else:
            res = self.add([inputs, net3])
            
        return res


In [31]:
class ResNet(tf.keras.Model):
    def __init__(self, num_classes):
        super(ResNet, self).__init__()
        
        self.conv1 = Conv2D(32, kernel_size=3, strides=2, padding='same', activation='relu')
        self.maxp1 = MaxPool2D()
        self.block1 = ResidualBlock(64, True)
        self.block2 = ResidualBlock(64)
        self.maxp2 = MaxPool2D()
        self.flat = Flatten()
        self.dense = Dense(num_classes)
        
    def call(self, inputs):
        x = self.conv1(inputs)
        x = self.maxp1(x)
        x = self.block1(x)
        x = self.block2(x)
        x = self.maxp2(x)
        x = self.flat(x)
        return self.dense(x)

In [32]:
model = ResNet(10)

#### 학습 시켜보기

In [33]:
class DataLoader():
    
    def __init__(self):
        (self.train_x, self.train_y),(self.test_x, self.test_y) = tf.keras.datasets.cifar10.load_data()
        
    def validate_pixel_scale(self, x):
        return 255 >= x.max() and 0 <= x.min()
    
    def scale(self, x):
        return (x / 255.0).astype(np.float32)
    
    def preprocess_dataset(self, dataset):
        feature, target = dataset
        
        validated_x = np.array([x for x in feature if self.validate_pixel_scale(x)])
        validated_y = np.array([y for x, y in zip(feature, target) if self.validate_pixel_scale(x)])
        
        # scale
        scaled_x = np.array([self.scale(x) for x in validated_x])
    
        # label encoding
        ohe_y = np.array([tf.keras.utils.to_categorical(y, num_classes=10) for y in validated_y])
        
        return scaled_x, np.squeeze(ohe_y, axis=1)
    
    def get_train_dataset(self):
        return self.preprocess_dataset((self.train_x, self.train_y))
    
    def get_test_dataset(self):
        return self.preprocess_dataset((self.test_x, self.test_y))

In [34]:
loader = DataLoader()

train_x, train_y = loader.get_train_dataset()

test_x, test_y = loader.get_train_dataset()

print(train_x.shape)
print(test_x.shape)
print(train_y.shape)
print(test_y.shape)

(50000, 32, 32, 3)
(50000, 32, 32, 3)
(50000, 10)
(50000, 10)


In [35]:
lr = 0.03
opt = tf.keras.optimizers.Adam(lr)
loss = tf.keras.losses.categorical_crossentropy

model.compile(optimizer=opt, loss=loss, metrics=['accuracy'])

In [37]:
hist = model.fit(train_x, train_y, epochs=1, batch_size=128, validation_data=(test_x, test_y))

