In [45]:
import tensorflow as tf
random_seed = 0

# 12.2 Use Tensorflow like Numpy

tf.constant(): create immutable constant tensor


In [53]:
t = tf.constant([[1., 2., 3.], [4., 5., 6.]])

print("1. Constant Tensor")
print(t) # Show variable
print('')

print(f't.shape: {t.shape}')
print(f't.dtype: {t.dtype}')
print(f't[:, 1:] \n {t[:, 1:]}')

1. Constant Tensor
tf.Tensor(
[[1. 2. 3.]
 [4. 5. 6.]], shape=(2, 3), dtype=float32)

t.shape: (2, 3)
t.dtype: <dtype: 'float32'>
t[:, 1:] 
 [[2. 3.]
 [5. 6.]]


In [54]:
t

<tf.Tensor: shape=(2, 3), dtype=float32, numpy=
array([[1., 2., 3.],
       [4., 5., 6.]], dtype=float32)>

### 12.2.1 Operation


In [55]:
# Matrix Multiplication
tf.square(t) 
t @ tf.transpose(t)

<tf.Tensor: shape=(2, 2), dtype=float32, numpy=
array([[14., 32.],
       [32., 77.]], dtype=float32)>

In [56]:
# Elementwise multiplication
t * t

<tf.Tensor: shape=(2, 3), dtype=float32, numpy=
array([[ 1.,  4.,  9.],
       [16., 25., 36.]], dtype=float32)>

#### tf.squeeze

- 크기가 1인 차원(Axis)을 제거


In [57]:
x = tf.constant([[[1], [2], [3]]])  # Shape: (1, 3, 1)
print(x)  # Output: [[[1], [2], [3]]]
print(f"shape before squeeze {x.shape}")
result = tf.squeeze(x)  # Shape: (3,)
print(f"shape after squeeze {result.shape}")
print(result)  # Output: [1, 2, 3]


tf.Tensor(
[[[1]
  [2]
  [3]]], shape=(1, 3, 1), dtype=int32)
shape before squeeze (1, 3, 1)
shape after squeeze (3,)
tf.Tensor([1 2 3], shape=(3,), dtype=int32)


### tf.tile(input, multiples)

- 특정 차원(axis)을 따라 복제(반복)합니다

Parameters

- input: 복제할 대상 텐서.
- multiples: 각 축(axis)에서 반복할 횟수를 나타내는 정수 리스트.


In [58]:
x = tf.constant([[1, 2],
                 [3, 4]])  # Shape: (2, 2)

# Repeat 2 times along axis 0, 3 times along axis 1
result = tf.tile(x, multiples=[2, 3])  # Shape: (4, 6)
print(result)


tf.Tensor(
[[1 2 1 2 1 2]
 [3 4 3 4 3 4]
 [1 2 1 2 1 2]
 [3 4 3 4 3 4]], shape=(4, 6), dtype=int32)


### 12.2.3 Type Casting

- tensorflow does not automatically cast any data types
- any operation between other data type will raise error


In [59]:
t2 = tf.constant(40., dtype = tf.float64)
t2 = tf.cast(t2, tf.float32)

### 12.2.4 Variable

- tf.constant is immutable
- for weight update, need variable to work with


In [64]:
v = tf.Variable([[1., 2., 3.,], [4., 5., 6.]])
v

<tf.Variable 'Variable:0' shape=(2, 3) dtype=float32, numpy=
array([[1., 2., 3.],
       [4., 5., 6.]], dtype=float32)>

#### assign

- tf variable valus cannot be normally changed
- need assign function to change variable values


In [71]:
v.assign(2 * v) #[2., 4., 6.]
v[0, 1].assign(42) # (0,1) element become 42
v[:, 2].assign([0.1, 1.0]) # all rows at 2nd column become 0.1, 1.0

# Assign new values for speicif elements
v.scatter_nd_update(
    indices = [[0,0], [1,2]], updates = [100., 200.]
)

<tf.Variable 'UnreadVariable' shape=(2, 3) dtype=float32, numpy=
array([[1.00e+02, 4.20e+01, 1.00e-01],
       [5.12e+02, 6.40e+02, 2.00e+02]], dtype=float32)>

### Data Types

**1. `tf.SparseTensor`**

- **설명**: 희소 텐서를 표현. 대부분의 값이 0인 데이터를 효율적으로 저장.
- **사용 예**: 희소 행렬(예: 원-핫 인코딩, NLP에서 단어 벡터).
- **특징**:
  - 좌표(`indices`), 값(`values`), 전체 모양(`dense_shape`)으로 구성.
  - 메모리 사용량을 줄임.

**2. `tf.TensorArray`**

- **설명**: 가변 길이의 텐서 배열. 반복문이나 그래프 모드에서 동적으로 텐서를 저장/추출.
- **사용 예**: RNN, 동적 계산 그래프에서 중간 계산 결과 저장.
- **특징**:
  - 정적 크기 및 동적 크기 지원.
  - GPU 및 TPU와 호환 가능.

**3. `tf.RaggedTensor`**

- **설명**: 불규칙한 길이를 가진 데이터를 표현하는 텐서.
- **사용 예**: NLP(문장 길이가 다른 배치), 시퀀스 데이터.
- **특징**:
  - 행마다 길이가 다를 수 있음.
  - 예: `[[1, 2], [3], [4, 5, 6]]` (모양: `[3, None]`).

**4. `tf.string`**

- **설명**: 문자열 데이터를 다루는 텐서.
- **사용 예**: 텍스트 데이터 처리, 파일 경로 관리.
- **특징**:
  - 텐서 내부에 문자열 저장 가능.
  - 문자열은 이진 데이터도 포함 가능.

**5. `tf.sets`**

- **설명**: 집합 연산을 위한 API 제공(예: 교집합, 합집합, 차집합).
- **사용 예**: NLP에서 단어 집합 처리, 고유 데이터 계산.
- **특징**:
  - `SparseTensor` 기반으로 동작.
  - `tf.sets.intersection`, `tf.sets.union` 등의 함수 제공.

**6. `tf.queue`**

- **설명**: 입력 데이터의 대기열(queue)을 처리. (TensorFlow 1.x에서 주로 사용됨.)
- **사용 예**: 데이터 입력 파이프라인(큐에서 데이터 읽기/쓰기).
- **특징**:
  - FIFOQueue, RandomShuffleQueue 등의 구현 제공.
  - **TensorFlow 2.x에서는 `tf.data` API**로 대체됨.


# 12.3 Customized Model and Training Algorithm


### 12.3.1 Customized Loss Function

- **MSE** gives too much penalty for a large error, so it's very sensitive to noises or outliers
- **MAE** gives too less penalty for a large error, so the training won't be good enough or too slow to train
- **Huber** might be useful for this case
- Many loss functions are already included in the keras, but pretend it doesn exists


In [None]:
from sklearn.datasets import fetch_california_housing
from sklearn.model_selection import train_test_split
from sklearn.preprocessing import StandardScaler
from tensorflow import keras

housing = fetch_california_housing()
X_train_full, X_test, y_train_full, y_test = train_test_split(
    housing.data, housing.target.reshape(-1, 1), random_state=42)
X_train, X_valid, y_train, y_valid = train_test_split(
    X_train_full, y_train_full, random_state=42)

scaler = StandardScaler()
X_train_scaled = scaler.fit_transform(X_train)
X_valid_scaled = scaler.transform(X_valid)
X_test_scaled = scaler.transform(X_test)

input_shape = X_train_scaled.shape[1:]

tf.random.set_seed(42)
model = tf.keras.Sequential([
    tf.keras.layers.Input(input_shape),
    tf.keras.layers.Dense(30, activation="relu", kernel_initializer="he_normal"),
    tf.keras.layers.Dense(1),
])

class HuberLoss(tf.keras.losses.Loss):
    def __init__(self, threshold=1.0, **kwargs):
        self.threshold = threshold
        super().__init__(**kwargs)

    def call(self, y_true, y_pred):
        error = y_true - y_pred
        is_small_error = tf.abs(error) < self.threshold
        squared_loss = tf.square(error) / 2
        linear_loss  = self.threshold * tf.abs(error) - self.threshold**2 / 2
        return tf.where(is_small_error, squared_loss, linear_loss)

    def get_config(self):
        base_config = super().get_config()
        return {**base_config, "threshold": self.threshold}
    

model.compile(loss=HuberLoss(2.), optimizer="nadam", metrics=["mae"])

model.fit(X_train_scaled, y_train, epochs=2,
          validation_data=(X_valid_scaled, y_valid))


# Save the model with customized loss
model.save("./120/custom_loss_model.keras")

# Load the model with customized loss
loaded_model = tf.keras.models.load_model("./120/custom_loss_model.keras",
                                   custom_objects={"HuberLoss": HuberLoss})

loaded_model.compile(loss = HuberLoss(3.0), optimizer = 'adam')
#  정상적으로 로드되고 모델을 정상적으로 사용할 수 있음을 보여줍니다.
loaded_model.fit(X_train_scaled, y_train, epochs=2,
          validation_data=(X_valid_scaled, y_valid))

Epoch 1/2
[1m363/363[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m0s[0m 462us/step - loss: 2.5190 - mae: 2.0277 - val_loss: 0.4348 - val_mae: 0.6962
Epoch 2/2
[1m363/363[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m0s[0m 320us/step - loss: 0.3911 - mae: 0.6600 - val_loss: 0.2791 - val_mae: 0.5451
Epoch 1/2
[1m363/363[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m0s[0m 436us/step - loss: 0.2888 - mae: 0.5493 - val_loss: 0.2514 - val_mae: 0.4894
Epoch 2/2
[1m363/363[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m0s[0m 303us/step - loss: 0.2400 - mae: 0.5004 - val_loss: 0.2085 - val_mae: 0.4625


<keras.src.callbacks.history.History at 0x2a82e70e0>

### 12.3.3 Customizing activation function, initializer, regulation, and limitation


In [None]:
housing = fetch_california_housing()
X_train_full, X_test, y_train_full, y_test = train_test_split(
    housing.data, housing.target.reshape(-1, 1), random_state=42)
X_train, X_valid, y_train, y_valid = train_test_split(
    X_train_full, y_train_full, random_state=42)

scaler = StandardScaler()
X_train_scaled = scaler.fit_transform(X_train)
X_valid_scaled = scaler.transform(X_valid)
X_test_scaled = scaler.transform(X_test)

input_shape = X_train_scaled.shape[1:]


# Define the customized functions
def my_softplus(z):
    return tf.math.log(1.0 + tf.exp(z))

def my_glorot_initializer(shape, dtype=tf.float32):
    stddev = tf.sqrt(2. / (shape[0] + shape[1]))
    return tf.random.normal(shape, stddev=stddev, dtype=dtype)

def my_l1_regularizer(weights):
    return tf.reduce_sum(tf.abs(0.01 * weights))

def my_positive_weights(weights):  # 반환 값은 tf.nn.relu(weights)입니다.
    return tf.where(weights < 0., tf.zeros_like(weights), weights)

@keras.utils.register_keras_serializable()
class MyL1Regularizer(tf.keras.regularizers.Regularizer):
    def __init__(self, factor):
        self.factor = factor

    def __call__(self, weights):
        return tf.reduce_sum(tf.abs(self.factor * weights))

    def get_config(self):
        return {"factor": self.factor}
    

input_shape = X_train_scaled.shape[1:]
    
# train with the customized model
tf.random.set_seed(42)
model = tf.keras.Sequential([
    tf.keras.layers.Input(input_shape),
    tf.keras.layers.Dense(30, activation="relu", kernel_initializer="he_normal"),
    tf.keras.layers.Dense(1, activation=my_softplus,
                          kernel_initializer=my_glorot_initializer,
                          kernel_regularizer=MyL1Regularizer(0.01))
])

model.compile(loss="mse", optimizer="nadam", metrics=["mae"])

model.fit(X_train_scaled, y_train, epochs=2,
          validation_data=(X_valid_scaled, y_valid))

model.save("./120/my_model_with_many_custom_parts.keras")

loaded_model = tf.keras.models.load_model(
    "./120/my_model_with_many_custom_parts.keras",
    custom_objects={
       "my_l1_regularizer": MyL1Regularizer,
       "my_glorot_initializer": my_glorot_initializer,
       "my_softplus": my_softplus,
    }
)

loaded_model.fit(X_train_scaled, y_train, epochs=2,
          validation_data=(X_valid_scaled, y_valid))


loaded_model.layers[-1].kernel_regularizer.get_config()

Epoch 1/2
[1m363/363[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m0s[0m 467us/step - loss: 3.0827 - mae: 1.2805 - val_loss: 0.7606 - val_mae: 0.6066
Epoch 2/2
[1m363/363[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m0s[0m 324us/step - loss: 0.7635 - mae: 0.6057 - val_loss: 0.5594 - val_mae: 0.5060
Epoch 1/2
[1m363/363[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m0s[0m 475us/step - loss: 0.5965 - mae: 0.5237 - val_loss: 0.4857 - val_mae: 0.4651
Epoch 2/2
[1m363/363[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m0s[0m 330us/step - loss: 0.5292 - mae: 0.4865 - val_loss: 0.4530 - val_mae: 0.4453


{'factor': 0.01}

### 12.3.4 Customized metrics

- loss and metric are similar but different
- loss is used for training, differentiable and gradient is not for all domain
- metric have no condition but it need to be easy to interpret


In [116]:
model = tf.keras.Sequential([
    tf.keras.layers.Input(input_shape),
    tf.keras.layers.Dense(30, activation="relu", kernel_initializer="he_normal",),
    tf.keras.layers.Dense(1),
])

model.compile(loss = 'mse', optimizer='nadam', metrics = [HuberLoss(2.0)])

model.fit(X_train_scaled, y_train, epochs=2)

Epoch 1/2
[1m363/363[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m0s[0m 262us/step - huber_loss_5: 1.2148 - loss: 2.8134
Epoch 2/2
[1m363/363[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m0s[0m 239us/step - huber_loss_5: 0.3255 - loss: 0.6772


<keras.src.callbacks.history.History at 0x2b3322c90>

### Streaming Metric

- update the metric as per batch


In [40]:
import tensorflow as tf
from sklearn.datasets import fetch_california_housing
from sklearn.model_selection import train_test_split
from sklearn.preprocessing import StandardScaler
import numpy as np

# 🔹 캘리포니아 주택 가격 데이터 로드 및 전처리
housing = fetch_california_housing()
X_train_full, X_test, y_train_full, y_test = train_test_split(
    housing.data, housing.target.reshape(-1, 1), random_state=42)  # 데이터셋을 학습/테스트로 분할
X_train, X_valid, y_train, y_valid = train_test_split(
    X_train_full, y_train_full, random_state=42)  # 학습 데이터에서 검증 데이터 분할

# 🔹 데이터 정규화 (표준화)
scaler = StandardScaler()
X_train_scaled = scaler.fit_transform(X_train)
X_valid_scaled = scaler.transform(X_valid)
X_test_scaled = scaler.transform(X_test)

# 🔹 입력 데이터 차원 설정
input_shape = X_train.shape[1:]  # (특성 개수,)

#  Huber 손실 함수 (손실이 크면 MAE, 작으면 MSE 적용)
class HuberLoss(tf.keras.losses.Loss):
    def __init__(self, threshold=1.0, **kwargs):
        """
        Huber 손실 함수 초기화
        Args:
            threshold (float): MSE와 MAE를 구분하는 임계값
        """
        self.threshold = threshold
        super().__init__(**kwargs)  # 부모 클래스(tf.keras.losses.Loss) 초기화

    def call(self, y_true, y_pred):
        """
        손실 값 계산
        Args:
            y_true: 실제 값 (정답)
            y_pred: 모델의 예측 값
        Returns:
            Tensor: Huber 손실 값
        """
        error = y_true - y_pred  # 예측 오차 계산
        is_small_error = tf.abs(error) < self.threshold  # 오차 크기 비교

        squared_loss = tf.square(error) / 2  # 작은 오차일 때 MSE 방식
        linear_loss = self.threshold * (tf.abs(error) - self.threshold / 2)  # 큰 오차일 때 MAE 방식

        return tf.where(is_small_error, squared_loss, linear_loss)  # 두 손실 중 해당되는 값 반환

    def get_config(self):
        """
        모델 저장 시 설정 정보 반환
        Returns:
            dict: 설정 값 (threshold 포함)
        """
        base_config = super().get_config()
        return {**base_config, "threshold": self.threshold}

#  Huber 손실을 메트릭으로 사용하기 위한 함수 (HuberLoss와 동일한 수식 사용)
def create_huber(threshold=1.0):
    """
    Huber 손실 계산을 위한 함수 (Metric 용)
    Args:
        threshold (float): MSE와 MAE를 구분하는 임계값
    Returns:
        function: 손실 계산 함수
    """
    def huber_fn(y_true, y_pred):
        error = y_true - y_pred
        is_small_error = tf.abs(error) < threshold
        squared_loss = tf.square(error) / 2
        linear_loss = threshold * (tf.abs(error) - threshold / 2)
        return tf.where(is_small_error, squared_loss, linear_loss)
    
    return huber_fn

#  Huber 손실을 평균 계산하여 평가 지표(Metric)로 활용
class HuberMetric(tf.keras.metrics.Mean):
    def __init__(self, threshold=1.0, name='custom_huber_metric', dtype=None):
        """
        Huber 손실을 기반으로 한 평균 메트릭
        Args:
            threshold (float): MSE와 MAE를 구분하는 임계값
        """
        self.threshold = threshold
        self.huber_fn = create_huber(threshold)  # Huber 손실 함수 생성
        super().__init__(name=name, dtype=dtype)  # 부모 클래스(tf.keras.metrics.Mean) 초기화

    def update_state(self, y_true, y_pred, sample_weight=None):
        """
        배치(batch) 단위로 메트릭 값을 업데이트
        Args:
            y_true: 실제 값
            y_pred: 예측 값
            sample_weight (Tensor, optional): 샘플 가중치 (각 데이터 포인트별 중요도 조정)
        """
        metric = self.huber_fn(y_true, y_pred)  # Huber 손실 계산
        super(HuberMetric, self).update_state(metric, sample_weight)  # 평균 값 업데이트

    def get_config(self):
        """
        모델 저장 시 설정 정보 반환
        Returns:
            dict: 설정 값 (threshold 포함)
        """
        base_config = super().get_config()
        return {**base_config, "threshold": self.threshold}

# 🔹 랜덤 시드 설정 (재현 가능성 확보)
tf.random.set_seed(42)

#  모델 생성 (Dense 레이어 2개 사용)
model = tf.keras.Sequential([
    tf.keras.layers.Dense(30, activation="relu", kernel_initializer="he_normal",
                          input_shape=input_shape),  # 첫 번째 Dense 레이어 (30개 뉴런, He 초기화)
    tf.keras.layers.Dense(1),  # 출력층 (1개의 뉴런, 회귀 문제)
])

#  모델 컴파일 (손실 함수, 옵티마이저, 평가 메트릭 설정)
model.compile(loss=HuberLoss(2.0),  # Huber 손실 적용
              optimizer="nadam",  # Nadam 옵티마이저 사용
              metrics=[HuberMetric(2.0)])  # Huber 메트릭 적용

#  샘플 가중치 생성 (랜덤 가중치 부여)
sample_weight = np.random.rand(len(y_train))

#  모델 학습
history = model.fit(X_train_scaled, y_train, epochs=2,
                    sample_weight=sample_weight)  # 샘플 가중치 적용하여 학습

#  학습 손실 및 메트릭 확인
print(
    history.history["loss"][0],  # 첫 번째 에포크 손실
    history.history["custom_huber_metric"][0] * sample_weight.mean()  # 가중치를 반영한 평균 메트릭
)

#  모델 저장
model.save("./120/my_model_with_a_custom_metric.keras")

#  저장된 모델 로드 (커스텀 객체 포함)
loaded_model = tf.keras.models.load_model(
    "./120/my_model_with_a_custom_metric.keras",
    custom_objects={
        "HuberLoss": HuberLoss,
        "HuberMetric": HuberMetric
    }
)

#  로드한 모델 추가 학습
loaded_model.fit(X_train_scaled, y_train, epochs=2)


Epoch 1/2


  super().__init__(activity_regularizer=activity_regularizer, **kwargs)


[1m363/363[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m0s[0m 283us/step - custom_huber_metric: 0.5967 - loss: 0.2909
Epoch 2/2
[1m363/363[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m0s[0m 247us/step - custom_huber_metric: 0.3324 - loss: 0.1654
Epoch 1/2
[1m363/363[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m0s[0m 264us/step - custom_huber_metric: 0.2548 - loss: 0.2548
Epoch 2/2
[1m363/363[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m0s[0m 249us/step - custom_huber_metric: 0.2147 - loss: 0.2147


<keras.src.callbacks.history.History at 0x291b20230>

In [39]:
history.history

{'custom_huber_metric': [1.2572898864746094, 0.3699382245540619],
 'loss': [0.6316739916801453, 0.18845683336257935]}

In [36]:
model.metrics[-1]

<CompileMetrics name=compile_metrics>

In [37]:
model.lossx 

<__main__.HuberLoss at 0x287fe6090>

### 12.3.5 Custom Layer


In [55]:
import tensorflow as tf

class MyDense(tf.keras.layers.Layer):  
    def __init__(self, units, activation=None, **kwargs):
        """
        사용자 정의 Dense 레이어 초기화 함수
        
        Args:
            units (int): 뉴런 개수 (출력 차원)
            activation (str 또는 함수, optional): 사용할 활성화 함수 (예: "relu", "sigmoid")
            **kwargs: 추가적인 Keras Layer 인자 (예: name, dtype)
        """
        super().__init__(**kwargs)  # 🔹 부모 클래스(tf.keras.layers.Layer)의 초기화 메서드 호출
        self.units = units  # 🔹 출력 뉴런 개수 저장
        self.activation = tf.keras.activations.get(activation)  # 🔹 활성화 함수 지정 (Keras 제공 함수 사용)

    def build(self, batch_input_shape):
        """
        입력 데이터를 기반으로 가중치 (kernel)와 편향 (bias) 정의

        Args:
            batch_input_shape (tuple): 입력 데이터의 shape (배치 크기 포함)
        """
        # 🔹 가중치 행렬(kernel) 생성 → (입력 차원, 출력 차원)
        self.kernel = self.add_weight(
            name="kernel",  # 변수명 지정
            shape=[batch_input_shape[-1], self.units],  # (입력 특성 개수, 출력 뉴런 개수)
            initializer="he_normal"  # He 초기화 사용 (ReLU 활성화 함수에 적합)
        )

        # 🔹 편향 벡터(bias) 생성 → (출력 차원,)
        self.bias = self.add_weight(
            name="bias",
            shape=[self.units],  # 편향은 출력 뉴런 개수만큼 존재
            initializer="zeros"  # 기본적으로 0으로 초기화
        )

    def call(self, X):
        """
        순전파 (Forward Propagation) 연산 수행
        Args:
            X (Tensor): 입력 데이터 (batch_size, input_dim)
        Returns:
            Tensor: 활성화 함수를 적용한 출력 결과 (batch_size, output_dim)
        """
        # 🔹 Dense 레이어 수식 적용: Y = activation(X @ W + b)
        return self.activation(X @ self.kernel + self.bias)

    def get_config(self):
        """
        모델 저장 및 로드 시 필요한 설정 반환
        Returns:
            dict: 객체의 설정 정보 딕셔너리
        """
        base_config = super().get_config()  # 부모 클래스의 설정 정보 가져오기
        return {
            **base_config,  # 기존 설정 유지
            "units": self.units,  # 뉴런 개수 추가 저장
            "activation": tf.keras.activations.serialize(self.activation)  # 활성화 함수 직렬화
        }

        
        
# 추가 코드 - 사용자 정의 층을 보통처럼 사용할 수 있음을 보여줍니다.
tf.random.set_seed(42)
model = tf.keras.Sequential([
    tf.keras.layers.Input(input_shape),
    MyDense(30, activation="relu"),
    MyDense(1)
])
model.compile(loss="mse", optimizer="nadam")
model.fit(X_train_scaled, y_train, epochs=2,
          validation_data=(X_valid_scaled, y_valid))
model.evaluate(X_test_scaled, y_test)
model.save("./120/my_model_with_a_custom_layer.keras")


loaded_model = tf.keras.models.load_model(
    "./120/my_model_with_a_custom_layer.keras",
    custom_objects={
        "MyDense":MyDense
    }
)
loaded_model.fit(X_train_scaled, y_train, epochs=2)
loaded_model.predict(X_test_scaled)

Epoch 1/2
[1m363/363[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m0s[0m 463us/step - loss: 4.2519 - val_loss: 0.8888
Epoch 2/2
[1m363/363[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m0s[0m 297us/step - loss: 0.7923 - val_loss: 0.6242
[1m162/162[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m0s[0m 174us/step - loss: 0.6235
Epoch 1/2
[1m363/363[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m0s[0m 262us/step - loss: 0.6419
Epoch 2/2
[1m363/363[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m0s[0m 221us/step - loss: 0.5540
[1m162/162[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m0s[0m 235us/step


array([[0.26484483],
       [1.2860062 ],
       [3.5598931 ],
       ...,
       [1.2615829 ],
       [2.261335  ],
       [3.5348647 ]], dtype=float32)

## Custom Model

**`keras.Model`을 상속한 커스텀 모델 구성 요소**
| 구성 요소 | 설명 | 주요 역할 |
|-----------|--------------------------------|------------------|
| `__init__()` | 모델의 층(layer)과 하이퍼파라미터 정의 | 네트워크 구조 생성 |
| `call(inputs)` | 순전파(forward pass) 정의 | 데이터 입력 → 출력 변환 |
| `build(input_shape)` | 입력 데이터 크기를 기반으로 가중치 초기화 | 가중치 생성 및 배치 크기 고려 |
| `compile()` | 손실 함수, 옵티마이저, 평가 지표 설정 | 학습을 위한 설정 |
| `fit()` | 데이터 학습 | 모델 훈련 실행 |
| `evaluate()` | 테스트 데이터 평가 | 모델 성능 평가 |
| `predict()` | 입력 데이터 예측 | 새로운 데이터에 대한 예측 |
| `get_config()` | 모델 저장 및 로드 시 설정 유지 | 모델 직렬화 지원 |

---


In [None]:
import tensorflow as tf
from tensorflow import keras
from tensorflow.keras import layers

class CustomModel(keras.Model):
    def __init__(self, num_classes=10, dropout_rate=0.5, **kwargs):
        """
        사용자 정의 CNN 모델 (Keras Model 상속)

        Args:
            num_classes (int): 출력 클래스 개수
            dropout_rate (float): 드롭아웃 비율
            **kwargs: 추가적인 Keras Model 설정
        """
        super().__init__(**kwargs)
        self.num_classes = num_classes
        self.dropout_rate = dropout_rate

        # 🔹 CNN 기반 신경망 정의
        self.conv1 = layers.Conv2D(32, (3, 3), activation='relu', name="conv1")
        self.pool1 = layers.MaxPooling2D((2, 2), name="pool1")
        self.conv2 = layers.Conv2D(64, (3, 3), activation='relu', name="conv2")
        self.pool2 = layers.MaxPooling2D((2, 2), name="pool2")
        self.flatten = layers.Flatten(name="flatten")
        self.fc1 = layers.Dense(128, activation='relu', name="fc1")
        self.dropout = layers.Dropout(self.dropout_rate, name="dropout")
        self.fc2 = layers.Dense(num_classes, activation='softmax', name="fc2")

    def call(self, inputs):
        """
        순전파(Forward Propagation) 정의
        """
        x = self.conv1(inputs)
        x = self.pool1(x)
        x = self.conv2(x)
        x = self.pool2(x)
        x = self.flatten(x)
        x = self.fc1(x)
        x = self.dropout(x)
        return self.fc2(x)

    def get_config(self):
        """
        모델 저장 및 로드 시 필요한 설정 정보 반환
        Returns:
            dict: 모델의 설정 값 저장 (num_classes, dropout_rate 등)
        """
        config = super().get_config()  # 부모 클래스(Keras Model)의 설정 정보 가져오기
        config.update({
            "num_classes": self.num_classes,  # 클래스 개수 저장
            "dropout_rate": self.dropout_rate  # 드롭아웃 비율 저장
        })
        return config  # 직렬화 가능한 딕셔너리 반환