# Import Libraries

In [1]:
import numpy as np
import tensorflow as tf
import keras
from keras import layers

# Transformer Block

## Positional Encoding

In [4]:
class PositionalEncoding(layers.Layer):
    """Add a fixed sinusoidal positional encoding to a batch of sequences.

    The encoding table is computed once in the constructor with shape
    (1, sequence_size, output_dim) and added to the inputs in `call`.

    Args:
        sequence_size: Number of positions for which the table is precomputed.
        output_dim: Width of the encoding table. It is added to the last axis
            of the inputs, so it must be broadcast-compatible with that axis.
        **kwargs: Forwarded to `layers.Layer`.
    """

    def __init__(self, sequence_size, output_dim, **kwargs):
        super().__init__(**kwargs)
        # Kept so that get_config() can reproduce the constructor arguments.
        self.sequence_size = sequence_size
        self.output_dim = output_dim
        self.pos_encoding = self.positional_encoding(sequence_size, output_dim)

    def get_angles(self, pos, i, output_dim):
        """Return pos / 10000 ** (2 * (i // 2) / output_dim).

        `pos` and `i` are combined by NumPy broadcasting; columns 2k and 2k + 1
        share the same rate because of the `i // 2`.
        """
        angle_rates = 1 / np.power(10000, (2 * (i // 2)) / np.float32(output_dim))
        return pos * angle_rates

    def positional_encoding(self, sequence_size, output_dim):
        """Build the (1, sequence_size, output_dim) float32 encoding table."""
        angle_rads = self.get_angles(np.arange(sequence_size)[:, np.newaxis],
                                     np.arange(output_dim)[np.newaxis, :],
                                     output_dim)
        # Sine of the angles taken from the even-numbered columns (0, 2, 4, ...).
        sines = np.sin(angle_rads[:, 0::2])
        # Cosine of the angles taken from the odd-numbered columns (1, 3, 5, ...).
        cosines = np.cos(angle_rads[:, 1::2])
        # The two parts are concatenated (all sines first, then all cosines);
        # they are NOT interleaved column by column.
        pos_encoding = np.concatenate([sines, cosines], axis=-1)
        # Leading axis of size 1 so the table broadcasts over the batch.
        pos_encoding = pos_encoding[np.newaxis, ...]
        return tf.cast(pos_encoding, dtype=tf.float32)

    def call(self, inputs):
        """Return `inputs` plus the encoding, cropped to the inputs' axis-1 length."""
        return inputs + self.pos_encoding[:, :tf.shape(inputs)[1], :]

    def get_config(self):
        """Return the constructor arguments so the layer can be re-created from config."""
        config = super().get_config()
        config.update({
            "sequence_size": self.sequence_size,
            "output_dim": self.output_dim,
        })
        return config


## Transformer Encoder Layer

In [5]:
def transformer_encoder(inputs, head_size, num_heads, ff_dim, dropout=0):
    """Apply one encoder block: self-attention and a feed-forward network.

    Each of the two sub-layers is followed by a residual addition and a
    LayerNormalization (epsilon=1e-6).

    Args:
        inputs: Tensor fed to the block; also used as query and value of the
            attention layer and as the first residual branch.
        head_size: Passed to MultiHeadAttention as `key_dim`.
        num_heads: Passed to MultiHeadAttention as `num_heads`.
        ff_dim: Units of the hidden (ReLU) Dense layer of the feed-forward part.
        dropout: Passed to MultiHeadAttention as `dropout`; it is not used
            anywhere else in this block.

    Returns:
        The normalized output of the feed-forward sub-layer, whose last
        dimension equals `inputs.shape[-1]`.
    """
    # Sub-layer 1: self-attention (inputs attend to themselves), then Add & Norm.
    self_attention = layers.MultiHeadAttention(
        key_dim=head_size,
        num_heads=num_heads,
        dropout=dropout,
    )
    attended = self_attention(inputs, inputs)
    attended = layers.LayerNormalization(epsilon=1e-6)(attended + inputs)

    # Sub-layer 2: expand to ff_dim, project back to the input width, then Add & Norm.
    model_width = inputs.shape[-1]
    expanded = layers.Dense(ff_dim, activation="relu")(attended)
    projected = layers.Dense(model_width)(expanded)
    return layers.LayerNormalization(epsilon=1e-6)(projected + attended)

## Build Model

In [7]:
def build_model(
    sequence_size,
    feature_dim,
    head_size,
    num_heads,
    ff_dim,
    num_transformer_blocks,
    mlp_units,
    dropout=0,
    mlp_dropout=0,
):
    """Assemble the regression model: encoding, encoder stack, pooling, MLP head.

    Args:
        sequence_size: Length of the input sequences (axis 1 of the input).
        feature_dim: Number of features per step (axis 2 of the input).
        head_size: Forwarded to every `transformer_encoder` call.
        num_heads: Forwarded to every `transformer_encoder` call.
        ff_dim: Forwarded to every `transformer_encoder` call.
        num_transformer_blocks: How many encoder blocks are stacked.
        mlp_units: Iterable of unit counts, one ReLU Dense layer per entry.
        dropout: Forwarded to every `transformer_encoder` call.
        mlp_dropout: Rate of the Dropout layer placed after each MLP Dense layer.

    Returns:
        An uncompiled `keras.Model` mapping (sequence_size, feature_dim) inputs
        to a single linear output unit.
    """
    model_input = keras.Input(shape=(sequence_size, feature_dim))

    # Inject position information before the encoder stack.
    hidden = PositionalEncoding(sequence_size, feature_dim)(model_input)

    for _ in range(num_transformer_blocks):
        hidden = transformer_encoder(
            hidden,
            head_size=head_size,
            num_heads=num_heads,
            ff_dim=ff_dim,
            dropout=dropout,
        )

    # Average over the sequence axis, leaving one vector per sample.
    hidden = layers.GlobalAveragePooling1D()(hidden)

    # MLP head: Dense(ReLU) + Dropout for every requested width.
    for width in mlp_units:
        hidden = layers.Dense(width, activation="relu")(hidden)
        hidden = layers.Dropout(mlp_dropout)(hidden)

    prediction = layers.Dense(1)(hidden)
    return keras.Model(inputs=model_input, outputs=prediction)

In [8]:
# Seed Python, NumPy and the backend RNGs before any layer is created so that
# weight initialisation (and therefore training) is repeatable after
# "Restart Kernel -> Run All".
keras.utils.set_random_seed(42)

# Build the network for inputs of 10 time steps x 4 features.
model = build_model(
    sequence_size=10,
    feature_dim=4,
    head_size=256,
    num_heads=4,
    ff_dim=512,
    num_transformer_blocks=4,
    mlp_units=[128, 64],
    dropout=0.1,
    mlp_dropout=0.1
)

# Regression setup: Adam optimizer with mean squared error as the loss.
# The same quantity is also tracked as a metric.
model.compile(
    optimizer="adam",
    loss="mean_squared_error",
    metrics=["mean_squared_error"]
)

2024-04-13 12:33:17.156509: I metal_plugin/src/device/metal_device.cc:1154] Metal device set to: Apple M2
2024-04-13 12:33:17.156531: I metal_plugin/src/device/metal_device.cc:296] systemMemory: 16.00 GB
2024-04-13 12:33:17.156539: I metal_plugin/src/device/metal_device.cc:313] maxCacheSize: 5.33 GB
2024-04-13 12:33:17.156557: I tensorflow/core/common_runtime/pluggable_device/pluggable_device_factory.cc:305] Could not identify NUMA node of platform GPU ID 0, defaulting to 0. Your kernel may not have been built with NUMA support.
2024-04-13 12:33:17.156573: I tensorflow/core/common_runtime/pluggable_device/pluggable_device_factory.cc:271] Created TensorFlow device (/job:localhost/replica:0/task:0/device:GPU:0 with 0 MB memory) -> physical PluggableDevice (device: 0, name: METAL, pci bus id: <undefined>)


In [9]:
# Print the summary of the model built and compiled above.
model.summary()


# Train

In [10]:
# Generate synthetic data from a seeded generator so every run sees the same arrays.
rng = np.random.default_rng(42)
x_train = rng.random((1000, 10, 4))  # 1000 samples, each with 10 time steps and 4 features
y_train = rng.random((1000, 1))  # 1000 target values, drawn independently of x_train

# Train the model. The targets are random noise unrelated to the inputs, so the
# loss is not expected to drop far below the variance of a uniform [0, 1)
# variable (1/12, about 0.083).
# Keeping the returned History makes the per-epoch metrics available afterwards
# and stops its repr from being echoed as the cell output.
history = model.fit(x_train, y_train, epochs=10, batch_size=32, validation_split=0.2)


Epoch 1/10


2024-04-13 12:33:32.253465: I tensorflow/core/grappler/optimizers/custom_graph_optimizer_registry.cc:117] Plugin optimizer for device_type GPU is enabled.


[1m25/25[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m16s[0m 157ms/step - loss: 0.2097 - mean_squared_error: 0.2097 - val_loss: 0.0916 - val_mean_squared_error: 0.0881
Epoch 2/10
[1m25/25[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m2s[0m 67ms/step - loss: 0.0914 - mean_squared_error: 0.0914 - val_loss: 0.0910 - val_mean_squared_error: 0.0876
Epoch 3/10
[1m25/25[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m2s[0m 64ms/step - loss: 0.0890 - mean_squared_error: 0.0890 - val_loss: 0.0910 - val_mean_squared_error: 0.0876
Epoch 4/10
[1m25/25[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m2s[0m 63ms/step - loss: 0.0900 - mean_squared_error: 0.0900 - val_loss: 0.0939 - val_mean_squared_error: 0.0908
Epoch 5/10
[1m25/25[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m2s[0m 66ms/step - loss: 0.0906 - mean_squared_error: 0.0906 - val_loss: 0.0930 - val_mean_squared_error: 0.0894
Epoch 6/10
[1m25/25[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m2s[0m 63ms/step - loss: 0.0829 - me

<keras.src.callbacks.history.History at 0x30c854b20>

In [11]:
# Generate a new example input: 1 sample with 10 time steps and 4 features.
# A seeded generator keeps the input identical from run to run.
x_new = np.random.default_rng(7).random((1, 10, 4))

# Run the prediction and print the single output value of the single sample.
y_pred = model.predict(x_new)
print("Predicted Value:", y_pred[0, 0])

[1m1/1[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m2s[0m 2s/step
Predicted Value: 0.4558711
