2017년에 구글에서 발표한 "Attention is All You Need" 논문에서는 Transformer 모델이 제안

Transformer 모델은 어텐션 메커니즘을 기반으로 하여, 기존의 RNN이나 LSTM을 대체하는 순차 처리 대신 병렬 처리를 가능하게 했습니다.

특히 Self-Attention과 Multi-Head Attention 메커니즘이 핵심입니다.

 Transformer 모델 구현 코드 (구글 어텐션 논문 구현)

[트랜스포머는 크게 다음과 같은 주요 구성 요소]

입력 임베딩(Input Embedding): 입력 토큰을 고차원 벡터로 변환.

포지셔널 인코딩(Positional Encoding): 위치 정보를 입력 벡터에 추가.

인코더 블록(Encoder Block): Self-Attention과 피드포워드 층을 포함하는 인코더.

디코더 블록(Decoder Block): Masked Multi-Head Attention과 피드포워드 층을 포함하는 디코더

.
출력층(Output Layer): 예측된 토큰을 확률로 변환하는 소프트맥스 층.

# 영어를 한국어로

In [None]:
# i am a student

## 사전학습된 경우가 아닌 경우

In [None]:
# Transformer 모델을 직접 구현하기 위해서는 기본적으로 TensorFlow 또는 PyTorch 같은 딥러닝 프레임워크를 사용해야 하며, 기본적인 Transformer 구조를 정의해야 합니다.
# 여기서는 TensorFlow를 사용하여 Transformer 모델을 직접 구성하는 예제를 보여드리겠습니다.

In [None]:
# 1. Transformer 모델 구현 개요
# Transformer는 인코더와 디코더로 구성된 자연어 처리(NLP) 모델입니다. 번역 작업에서, 인코더는 입력 문장을 처리하고, 디코더는 번역된 결과 문장을 생성합니다. 우리는 직접 데이터를 준비해 Transformer 모델을 훈련시키고 영어 문장을 한국어로 번역할 수 있습니다.

# 이 코드 예제는 Transformer 모델을 직접 정의하여 사용하는 과정 중 일부만을 다룹니다.
# 실제로 이 모델을 학습하려면 많은 데이터와 연산 자원이 필요하지만, 여기서는 간단한 Transformer 모델 구조를 보여드립니다.

In [4]:
import tensorflow as tf
from tensorflow.keras.layers import Dense, Embedding, LayerNormalization, Dropout
from tensorflow.keras import Model

# Multi-head Attention Layer
class MultiHeadAttention(tf.keras.layers.Layer):
    def __init__(self, d_model, num_heads):
        super(MultiHeadAttention, self).__init__()
        self.num_heads = num_heads
        self.d_model = d_model

        assert d_model % self.num_heads == 0

        self.depth = d_model // self.num_heads

        self.wq = Dense(d_model)
        self.wk = Dense(d_model)
        self.wv = Dense(d_model)

        self.dense = Dense(d_model)

    def split_heads(self, x, batch_size):
        x = tf.reshape(x, (batch_size, -1, self.num_heads, self.depth))
        return tf.transpose(x, perm=[0, 2, 1, 3])

    def call(self, v, k, q, mask):
        batch_size = tf.shape(q)[0]

        q = self.wq(q)
        k = self.wk(k)
        v = self.wv(v)

        q = self.split_heads(q, batch_size)
        k = self.split_heads(k, batch_size)
        v = self.split_heads(v, batch_size)

        scaled_attention_logits = tf.matmul(q, k, transpose_b=True) / tf.math.sqrt(tf.cast(self.depth, tf.float32))

        if mask is not None:
            scaled_attention_logits += (mask * -1e9)

        attention_weights = tf.nn.softmax(scaled_attention_logits, axis=-1)

        output = tf.matmul(attention_weights, v)
        output = tf.transpose(output, perm=[0, 2, 1, 3])
        output = tf.reshape(output, (batch_size, -1, self.d_model))
        return self.dense(output)

# Transformer Encoder Layer
class EncoderLayer(tf.keras.layers.Layer):
    def __init__(self, d_model, num_heads, dff, dropout_rate=0.1):
        super(EncoderLayer, self).__init__()

        self.mha = MultiHeadAttention(d_model, num_heads)
        self.ffn = tf.keras.Sequential([Dense(dff, activation='relu'), Dense(d_model)])

        self.layernorm1 = LayerNormalization(epsilon=1e-6)
        self.layernorm2 = LayerNormalization(epsilon=1e-6)

        self.dropout1 = Dropout(dropout_rate)
        self.dropout2 = Dropout(dropout_rate)

    def call(self, x, training, mask):
        attn_output = self.mha(x, x, x, mask)
        attn_output = self.dropout1(attn_output, training=training)
        out1 = self.layernorm1(x + attn_output)

        ffn_output = self.ffn(out1)
        ffn_output = self.dropout2(ffn_output, training=training)
        return self.layernorm2(out1 + ffn_output)

# 전체 Transformer 모델 구현 (간단히 인코더만 포함)
class Transformer(Model):
    def __init__(self, num_layers, d_model, num_heads, dff, input_vocab_size, target_vocab_size, max_pos_encoding, dropout_rate=0.1):
        super(Transformer, self).__init__()

        self.encoder_layers = [EncoderLayer(d_model, num_heads, dff, dropout_rate) for _ in range(num_layers)]
        self.embedding = Embedding(input_vocab_size, d_model)
        self.pos_encoding = self.positional_encoding(max_pos_encoding, d_model)
        self.dropout = Dropout(dropout_rate)
        self.final_layer = Dense(target_vocab_size)

    def positional_encoding(self, position, d_model):
        angle_rads = self.get_angles(np.arange(position)[:, np.newaxis], np.arange(d_model)[np.newaxis, :], d_model)
        angle_rads[:, 0::2] = np.sin(angle_rads[:, 0::2])
        angle_rads[:, 1::2] = np.cos(angle_rads[:, 1::2])
        pos_encoding = angle_rads[np.newaxis, ...]
        return tf.cast(pos_encoding, dtype=tf.float32)

    def call(self, x, training, mask):
        seq_len = tf.shape(x)[1]
        x = self.embedding(x)
        x *= tf.math.sqrt(tf.cast(self.d_model, tf.float32))
        x += self.pos_encoding[:, :seq_len, :]
        x = self.dropout(x, training=training)

        for encoder_layer in self.encoder_layers:
            x = encoder_layer(x, training, mask)

        return self.final_layer(x)


In [14]:
import tensorflow as tf
from tensorflow.keras.layers import Dense, Embedding, LayerNormalization, Dropout
from tensorflow.keras import Model

# Multi-head Attention Layer
class MultiHeadAttention(tf.keras.layers.Layer):
    def __init__(self, d_model, num_heads):
        super(MultiHeadAttention, self).__init__()
        self.num_heads = num_heads
        self.d_model = d_model

        assert d_model % self.num_heads == 0

        self.depth = d_model // self.num_heads

        self.wq = Dense(d_model)
        self.wk = Dense(d_model)
        self.wv = Dense(d_model)

        self.dense = Dense(d_model)

    def split_heads(self, x, batch_size):
        x = tf.reshape(x, (batch_size, -1, self.num_heads, self.depth))
        return tf.transpose(x, perm=[0, 2, 1, 3])

    def call(self, v, k, q, mask):
        batch_size = tf.shape(q)[0]

        q = self.wq(q)
        k = self.wk(k)
        v = self.wv(v)

        q = self.split_heads(q, batch_size)
        k = self.split_heads(k, batch_size)
        v = self.split_heads(v, batch_size)

        scaled_attention_logits = tf.matmul(q, k, transpose_b=True) / tf.math.sqrt(tf.cast(self.depth, tf.float32))

        if mask is not None:
            scaled_attention_logits += (mask * -1e9)

        attention_weights = tf.nn.softmax(scaled_attention_logits, axis=-1)

        output = tf.matmul(attention_weights, v)
        output = tf.transpose(output, perm=[0, 2, 1, 3])
        output = tf.reshape(output, (batch_size, -1, self.d_model))
        return self.dense(output)

# Transformer Encoder Layer
class EncoderLayer(tf.keras.layers.Layer):
    def __init__(self, d_model, num_heads, dff, dropout_rate=0.1):
        super(EncoderLayer, self).__init__()

        self.mha = MultiHeadAttention(d_model, num_heads)
        self.ffn = tf.keras.Sequential([Dense(dff, activation='relu'), Dense(d_model)])

        self.layernorm1 = LayerNormalization(epsilon=1e-6)
        self.layernorm2 = LayerNormalization(epsilon=1e-6)

        self.dropout1 = Dropout(dropout_rate)
        self.dropout2 = Dropout(dropout_rate)

    def call(self, x, training, mask):
        attn_output = self.mha(x, x, x, mask)
        attn_output = self.dropout1(attn_output, training=training)
        out1 = self.layernorm1(x + attn_output)

        ffn_output = self.ffn(out1)
        ffn_output = self.dropout2(ffn_output, training=training)
        return self.layernorm2(out1 + ffn_output)

# 전체 Transformer 모델 구현 (간단히 인코더만 포함)
class Transformer(Model):
    def __init__(self, num_layers, d_model, num_heads, dff, input_vocab_size, target_vocab_size, max_pos_encoding, dropout_rate=0.1):
        super(Transformer, self).__init__()

        self.encoder_layers = [EncoderLayer(d_model, num_heads, dff, dropout_rate) for _ in range(num_layers)]
        self.embedding = Embedding(input_vocab_size, d_model)
        self.pos_encoding = self.positional_encoding(max_pos_encoding, d_model)
        self.dropout = Dropout(dropout_rate)
        self.final_layer = Dense(target_vocab_size)

    def positional_encoding(self, position, d_model):
        angle_rads = self.get_angles(np.arange(position)[:, np.newaxis], np.arange(d_model)[np.newaxis, :], d_model)
        angle_rads[:, 0::2] = np.sin(angle_rads[:, 0::2])
        angle_rads[:, 1::2] = np.cos(angle_rads[:, 1::2])
        pos_encoding = angle_rads[np.newaxis, ...]
        return tf.cast(pos_encoding, dtype=tf.float32)

    def call(self, x, training, mask):
        seq_len = tf.shape(x)[1]
        x = self.embedding(x)
        x *= tf.math.sqrt(tf.cast(self.d_model, tf.float32))
        x += self.pos_encoding[:, :seq_len, :]
        x = self.dropout(x, training=training)

        for encoder_layer in self.encoder_layers:
            x = encoder_layer(x, training, mask)

        return self.final_layer(x)


# Multi-head Attention Layer (이미 구현된 부분)
class MultiHeadAttention(tf.keras.layers.Layer):
    def __init__(self, d_model, num_heads):
        super(MultiHeadAttention, self).__init__()
        self.num_heads = num_heads
        self.d_model = d_model

        assert d_model % self.num_heads == 0

        self.depth = d_model // self.num_heads
        self.wq = Dense(d_model)
        self.wk = Dense(d_model)
        self.wv = Dense(d_model)
        self.dense = Dense(d_model)

    def split_heads(self, x, batch_size):
        x = tf.reshape(x, (batch_size, -1, self.num_heads, self.depth))
        return tf.transpose(x, perm=[0, 2, 1, 3])

    def call(self, v, k, q, mask):
        batch_size = tf.shape(q)[0]
        q = self.wq(q)
        k = self.wk(k)
        v = self.wv(v)
        q = self.split_heads(q, batch_size)
        k = self.split_heads(k, batch_size)
        v = self.split_heads(v, batch_size)

        scaled_attention_logits = tf.matmul(q, k, transpose_b=True) / tf.math.sqrt(tf.cast(self.depth, tf.float32))

        if mask is not None:
            scaled_attention_logits += (mask * -1e9)

        attention_weights = tf.nn.softmax(scaled_attention_logits, axis=-1)
        output = tf.matmul(attention_weights, v)
        output = tf.transpose(output, perm=[0, 2, 1, 3])
        output = tf.reshape(output, (batch_size, -1, self.d_model))
        return self.dense(output)

# 디코더 레이어
class DecoderLayer(tf.keras.layers.Layer):
    def __init__(self, d_model, num_heads, dff, dropout_rate=0.1):
        super(DecoderLayer, self).__init__()

        self.mha1 = MultiHeadAttention(d_model, num_heads)  # Self-attention
        self.mha2 = MultiHeadAttention(d_model, num_heads)  # Enc-Dec attention

        self.ffn = tf.keras.Sequential([Dense(dff, activation='relu'), Dense(d_model)])

        self.layernorm1 = LayerNormalization(epsilon=1e-6)
        self.layernorm2 = LayerNormalization(epsilon=1e-6)
        self.layernorm3 = LayerNormalization(epsilon=1e-6)

        self.dropout1 = Dropout(dropout_rate)
        self.dropout2 = Dropout(dropout_rate)
        self.dropout3 = Dropout(dropout_rate)

    def call(self, x, enc_output, training, look_ahead_mask, padding_mask):
        attn1 = self.mha1(x, x, x, look_ahead_mask)  # Self-attention
        attn1 = self.dropout1(attn1, training=training)
        out1 = self.layernorm1(attn1 + x)

        attn2 = self.mha2(enc_output, enc_output, out1, padding_mask)  # Enc-Dec attention
        attn2 = self.dropout2(attn2, training=training)
        out2 = self.layernorm2(attn2 + out1)

        ffn_output = self.ffn(out2)  # Feed forward network
        ffn_output = self.dropout3(ffn_output, training=training)
        out3 = self.layernorm3(ffn_output + out2)

        return out3

# 전체 Transformer 모델
class Transformer(Model):
    def __init__(self, num_layers, d_model, num_heads, dff, input_vocab_size, target_vocab_size, max_pos_encoding, dropout_rate=0.1):
        super(Transformer, self).__init__()

        self.encoder_layers = [EncoderLayer(d_model, num_heads, dff, dropout_rate) for _ in range(num_layers)]
        self.decoder_layers = [DecoderLayer(d_model, num_heads, dff, dropout_rate) for _ in range(num_layers)]

        self.embedding = Embedding(input_vocab_size, d_model)
        self.pos_encoding = self.positional_encoding(max_pos_encoding, d_model)
        self.dropout = Dropout(dropout_rate)
        self.final_layer = Dense(target_vocab_size)

    def positional_encoding(self, position, d_model):
        angle_rads = self.get_angles(np.arange(position)[:, np.newaxis], np.arange(d_model)[np.newaxis, :], d_model)
        angle_rads[:, 0::2] = np.sin(angle_rads[:, 0::2])
        angle_rads[:, 1::2] = np.cos(angle_rads[:, 1::2])
        pos_encoding = angle_rads[np.newaxis, ...]
        return tf.cast(pos_encoding, dtype=tf.float32)

    def call(self, inp, tar, training, enc_padding_mask, look_ahead_mask, dec_padding_mask):
        # 인코더 처리
        enc_output = inp
        for encoder_layer in self.encoder_layers:
            enc_output = encoder_layer(enc_output, training, enc_padding_mask)

        # 디코더 처리
        dec_output = tar
        for decoder_layer in self.decoder_layers:
            dec_output = decoder_layer(dec_output, enc_output, training, look_ahead_mask, dec_padding_mask)

        # 최종 출력
        final_output = self.final_layer(dec_output)
        return final_output



In [18]:
# 가상 데이터 예시
import numpy as np

# 입력 데이터
sample_input = np.array([[1, 2, 3, 4]])  # "I am a student"에 해당하는 인덱스
sample_target = np.array([[5, 6, 7, 8]])  # "나는 학생입니다"에 해당하는 인덱스

import numpy as np
import tensorflow as tf
from tensorflow.keras.layers import Dense, Embedding, LayerNormalization, Dropout
from tensorflow.keras import Model

class Transformer(Model):
    def __init__(self, num_layers, d_model, num_heads, dff, input_vocab_size, target_vocab_size, max_pos_encoding, dropout_rate=0.1):
        super(Transformer, self).__init__()

        # 인코더와 디코더 레이어 생성
        self.encoder_layers = [EncoderLayer(d_model, num_heads, dff, dropout_rate) for _ in range(num_layers)]
        self.decoder_layers = [DecoderLayer(d_model, num_heads, dff, dropout_rate) for _ in range(num_layers)]

        self.embedding = Embedding(input_vocab_size, d_model)
        self.pos_encoding = self.positional_encoding(max_pos_encoding, d_model)
        self.dropout = Dropout(dropout_rate)
        self.final_layer = Dense(target_vocab_size)

    def get_angles(self, pos, i, d_model):
        """
        포지셔널 인코딩을 위한 각도 계산 함수
        pos: 각 단어의 위치
        i: 임베딩 벡터의 차원
        d_model: 모델의 임베딩 차원
        """
        angle_rates = 1 / np.power(10000, (2 * (i // 2)) / np.float32(d_model))
        return pos * angle_rates

    def positional_encoding(self, position, d_model):
        angle_rads = self.get_angles(np.arange(position)[:, np.newaxis], np.arange(d_model)[np.newaxis, :], d_model)
        angle_rads[:, 0::2] = np.sin(angle_rads[:, 0::2])  # 짝수 인덱스에 대해 사인 적용
        angle_rads[:, 1::2] = np.cos(angle_rads[:, 1::2])  # 홀수 인덱스에 대해 코사인 적용
        pos_encoding = angle_rads[np.newaxis, ...]
        return tf.cast(pos_encoding, dtype=tf.float32)

    def call(self, inp, tar, training, enc_padding_mask, look_ahead_mask, dec_padding_mask):
        # 인코더 처리
        enc_output = inp
        for encoder_layer in self.encoder_layers:
            enc_output = encoder_layer(enc_output, training, enc_padding_mask)

        # 디코더 처리
        dec_output = tar
        for decoder_layer in self.decoder_layers:
            dec_output = decoder_layer(dec_output, enc_output, training, look_ahead_mask, dec_padding_mask)

        # 최종 출력
        final_output = self.final_layer(dec_output)
        return final_output


# Transformer 모델 생성
sample_transformer = Transformer(
    num_layers=2,
    d_model=512,
    num_heads=8,
    dff=2048,
    input_vocab_size=10000,
    target_vocab_size=10000,
    max_pos_encoding=5000
)

# Transformer 모델 생성 후 호출할 때
output = sample_transformer(
    inp=sample_input,              # 입력 인자
    tar=sample_target,             # 타겟 인자
    training=False,                # training을 명시적인 키워드 인자로 전달
    enc_padding_mask=None,         # 패딩 마스크
    look_ahead_mask=None,          # look-ahead 마스크
    dec_padding_mask=None          # 디코더 패딩 마스크
)


# 모델을 통해 번역 결과 생성
output = sample_transformer(
    inp=sample_input,
    tar=sample_target,
    training=False,
    enc_padding_mask=None,
    look_ahead_mask=None,
    dec_padding_mask=None
)

print("번역된 결과:", output)


ValueError: Exception encountered when calling Transformer.call().

[1mOnly input tensors may be passed as positional arguments. The following argument value should be passed as a keyword argument: False (of type <class 'bool'>)[0m

Arguments received by Transformer.call():
  • inp=tf.Tensor(shape=(1, 4), dtype=int64)
  • tar=tf.Tensor(shape=(1, 4), dtype=int64)
  • training=False
  • enc_padding_mask=None
  • look_ahead_mask=None
  • dec_padding_mask=None

In [9]:
# 디코드 과정

In [7]:
import tensorflow as tf
from tensorflow.keras.layers import Dense, Embedding, LayerNormalization, Dropout
from tensorflow.keras import Model

# Multi-head Attention Layer (이미 구현된 부분)
class MultiHeadAttention(tf.keras.layers.Layer):
    def __init__(self, d_model, num_heads):
        super(MultiHeadAttention, self).__init__()
        self.num_heads = num_heads
        self.d_model = d_model

        assert d_model % self.num_heads == 0

        self.depth = d_model // self.num_heads
        self.wq = Dense(d_model)
        self.wk = Dense(d_model)
        self.wv = Dense(d_model)
        self.dense = Dense(d_model)

    def split_heads(self, x, batch_size):
        x = tf.reshape(x, (batch_size, -1, self.num_heads, self.depth))
        return tf.transpose(x, perm=[0, 2, 1, 3])

    def call(self, v, k, q, mask):
        batch_size = tf.shape(q)[0]
        q = self.wq(q)
        k = self.wk(k)
        v = self.wv(v)
        q = self.split_heads(q, batch_size)
        k = self.split_heads(k, batch_size)
        v = self.split_heads(v, batch_size)

        scaled_attention_logits = tf.matmul(q, k, transpose_b=True) / tf.math.sqrt(tf.cast(self.depth, tf.float32))

        if mask is not None:
            scaled_attention_logits += (mask * -1e9)

        attention_weights = tf.nn.softmax(scaled_attention_logits, axis=-1)
        output = tf.matmul(attention_weights, v)
        output = tf.transpose(output, perm=[0, 2, 1, 3])
        output = tf.reshape(output, (batch_size, -1, self.d_model))
        return self.dense(output)

# 디코더 레이어
class DecoderLayer(tf.keras.layers.Layer):
    def __init__(self, d_model, num_heads, dff, dropout_rate=0.1):
        super(DecoderLayer, self).__init__()

        self.mha1 = MultiHeadAttention(d_model, num_heads)  # Self-attention
        self.mha2 = MultiHeadAttention(d_model, num_heads)  # Enc-Dec attention

        self.ffn = tf.keras.Sequential([Dense(dff, activation='relu'), Dense(d_model)])

        self.layernorm1 = LayerNormalization(epsilon=1e-6)
        self.layernorm2 = LayerNormalization(epsilon=1e-6)
        self.layernorm3 = LayerNormalization(epsilon=1e-6)

        self.dropout1 = Dropout(dropout_rate)
        self.dropout2 = Dropout(dropout_rate)
        self.dropout3 = Dropout(dropout_rate)

    def call(self, x, enc_output, training, look_ahead_mask, padding_mask):
        attn1 = self.mha1(x, x, x, look_ahead_mask)  # Self-attention
        attn1 = self.dropout1(attn1, training=training)
        out1 = self.layernorm1(attn1 + x)

        attn2 = self.mha2(enc_output, enc_output, out1, padding_mask)  # Enc-Dec attention
        attn2 = self.dropout2(attn2, training=training)
        out2 = self.layernorm2(attn2 + out1)

        ffn_output = self.ffn(out2)  # Feed forward network
        ffn_output = self.dropout3(ffn_output, training=training)
        out3 = self.layernorm3(ffn_output + out2)

        return out3

# 전체 Transformer 모델
class Transformer(Model):
    def __init__(self, num_layers, d_model, num_heads, dff, input_vocab_size, target_vocab_size, max_pos_encoding, dropout_rate=0.1):
        super(Transformer, self).__init__()

        self.encoder_layers = [EncoderLayer(d_model, num_heads, dff, dropout_rate) for _ in range(num_layers)]
        self.decoder_layers = [DecoderLayer(d_model, num_heads, dff, dropout_rate) for _ in range(num_layers)]

        self.embedding = Embedding(input_vocab_size, d_model)
        self.pos_encoding = self.positional_encoding(max_pos_encoding, d_model)
        self.dropout = Dropout(dropout_rate)
        self.final_layer = Dense(target_vocab_size)

    def positional_encoding(self, position, d_model):
        angle_rads = self.get_angles(np.arange(position)[:, np.newaxis], np.arange(d_model)[np.newaxis, :], d_model)
        angle_rads[:, 0::2] = np.sin(angle_rads[:, 0::2])
        angle_rads[:, 1::2] = np.cos(angle_rads[:, 1::2])
        pos_encoding = angle_rads[np.newaxis, ...]
        return tf.cast(pos_encoding, dtype=tf.float32)

    def call(self, inp, tar, training, enc_padding_mask, look_ahead_mask, dec_padding_mask):
        # 인코더 처리
        enc_output = inp
        for encoder_layer in self.encoder_layers:
            enc_output = encoder_layer(enc_output, training, enc_padding_mask)

        # 디코더 처리
        dec_output = tar
        for decoder_layer in self.decoder_layers:
            dec_output = decoder_layer(dec_output, enc_output, training, look_ahead_mask, dec_padding_mask)

        # 최종 출력
        final_output = self.final_layer(dec_output)
        return final_output


## 사전학습모델 소개(작동 문제)

In [None]:
# Transformer 모델을 이용하여 "I am a student" 문장을 한국어로 번역하는 코드를 구현하려면,
# 사전 학습된 Transformer 기반 번역 모델(예: Hugging Face의 transformers 라이브러리에서 제공하는 MarianMT 모델)을 사용할 수 있습니다. 이 모델은 다양한 언어 간 번역 작업을 쉽게 수행할 수 있도록 지원합니다.
# 여기서는 Hugging Face의 transformers 라이브러리를 사용하여 영어-한국어 번역을 하는 Transformer 모델 예시 코드를 보여드리겠습니다.

In [None]:
# Hugging Face는 **자연어 처리(NLP)**와 머신러닝을 위한 오픈소스 도구와 라이브러리를 제공하는 AI 연구 및 개발 회사입니다. 특히 Transformer 모델과 관련된 연구와 도구 개발에 큰 기여를 하고 있습니다. Hugging Face의 대표적인 제품 중 하나는 transformers 라이브러리로, 다양한 사전 학습된 모델을 쉽게 사용할 수 있도록 지원합니다.

# Hugging Face의 주요 특징 및 도구
# 1. Transformers 라이브러리
# Transformers 라이브러리는 Hugging Face에서 제공하는 가장 대표적인 오픈소스 라이브러리로, BERT, GPT-2, T5, RoBERTa 등과 같은 Transformer 기반의 사전 학습된 모델을 지원합니다. 이 라이브러리는 매우 다양한 언어 모델을 손쉽게 사용할 수 있게 하며, 텍스트 생성, 번역, 질문 답변, 텍스트 분류, 감정 분석 등 여러 NLP 작업을 수행할 수 있습니다.

# 사전 학습된 모델: Hugging Face는 수천 개의 사전 학습된 모델을 제공하여 사용자가 자신만의 데이터를 사용해 쉽게 모델을 파인튜닝하거나 바로 사용할 수 있습니다.
# 지원되는 모델 종류: BERT, GPT, GPT-2, T5, RoBERTa, DistilBERT, MarianMT, BART 등.
# 다양한 작업: 텍스트 분류, 텍스트 생성, 번역, 질문 답변, 감정 분석 등.
# 2. Datasets 라이브러리
# Datasets 라이브러리는 Hugging Face에서 제공하는 또 다른 중요한 도구로, 다양한 공개 데이터셋에 대한 간편한 접근을 제공합니다. NLP 연구에서 자주 사용되는 대규모 데이터셋을 손쉽게 다운로드하고 사용할 수 있습니다.

# 대규모 데이터셋 지원: Datasets 라이브러리를 통해 대규모 NLP 데이터셋을 쉽게 로드할 수 있습니다. 예를 들어, GLUE, SQuAD, COCO, CNN/DailyMail 등의 데이터셋이 지원됩니다.
# 빠르고 효율적: 대규모 데이터셋도 메모리 효율적으로 처리할 수 있으며, 다양한 포맷을 지원합니다.
# 3. Hugging Face Model Hub
# Hugging Face Model Hub는 다양한 사전 학습된 모델이 저장된 플랫폼으로, 수천 개의 프리트레인된 모델을 커뮤니티가 공유하고 사용할 수 있는 저장소입니다. 사용자는 여기서 텍스트 분류, 번역, 요약, 질문 답변 등의 작업에 적합한 모델을 검색하여 사용할 수 있습니다.

# 모델 검색 및 다운로드: 특정 작업이나 언어에 적합한 사전 학습된 모델을 검색하고, 간단한 코드 한 줄로 불러올 수 있습니다.
# 모델 공유: 자신이 훈련한 모델을 커뮤니티와 공유할 수 있으며, 다른 사용자가 제공한 모델도 쉽게 사용할 수 있습니다.
# 4. Tokenizers 라이브러리
# Tokenizers는 고속으로 텍스트를 토큰화할 수 있도록 설계된 라이브러리로, Hugging Face의 모델을 사용할 때 필수적인 토큰화 과정을 매우 빠르고 효율적으로 수행합니다.

# 다양한 토크나이저 지원: BERT, GPT-2, T5 등 각 모델에 최적화된 토크나이저를 제공합니다.
# 병렬 처리: 매우 빠른 속도로 대규모 텍스트 데이터를 병렬 처리할 수 있어 대규모 데이터셋을 다룰 때 유리합니다.
# 5. Gradio 및 Streamlit과의 통합
# Hugging Face는 Gradio와 Streamlit 같은 라이브러리와 통합되어, 머신러닝 모델을 쉽게 웹 인터페이스로 배포할 수 있습니다. 이를 통해 NLP 모델을 웹 애플리케이션으로 빠르게 배포하고, 실시간으로 결과를 확인할 수 있습니다.

# 6. Hugging Face Spaces
# Hugging Face Spaces는 사용자가 쉽게 자신의 모델을 배포하고 공유할 수 있는 클라우드 플랫폼입니다. Streamlit이나 Gradio와 같은 인터페이스를 사용하여 웹 애플리케이션 형태로 모델을 배포할 수 있으며, 이를 통해 누구나 사용할 수 있도록 쉽게 공유할 수 있습니다.

# 7. Community and Open Source
# Hugging Face는 오픈소스 커뮤니티에 큰 비중을 두고 있습니다. 많은 연구자, 개발자가 Hugging Face를 통해 자신들의 연구 결과나 툴을 공유하며, 그 결과 커뮤니티 주도로 모델과 데이터셋이 빠르게 발전하고 있습니다.

# Hugging Face의 주요 응용 분야
# Hugging Face는 다양한 NLP 작업에 매우 유용합니다. 다음은 Hugging Face의 도구들이 사용되는 대표적인 분야입니다:

# 텍스트 분류: 감정 분석, 주제 분류, 스팸 필터링 등.
# 기계 번역: 한 언어에서 다른 언어로 텍스트 번역.
# 텍스트 생성: 텍스트 완성, 이야기 생성, 챗봇 등.
# 질문 답변: 질문에 대한 자연스러운 답변 생성.
# 텍스트 요약: 긴 문서나 기사 등을 짧게 요약.
# Hugging Face의 장점
# 사용자 친화적: 코드 몇 줄만으로 강력한 NLP 모델을 불러와 사용할 수 있으며, 모델을 쉽게 파인튜닝할 수 있습니다.
# 풍부한 모델 및 데이터셋: 다양한 사전 학습된 모델과 데이터셋이 제공되어 연구 및 개발에 매우 유용합니다.
# 오픈소스: Hugging Face의 도구들은 오픈소스로 제공되어 누구나 무료로 사용할 수 있습니다.
# 빠른 배포: 모델을 학습하고 배포하는 과정이 매우 간단하며, Gradio, Streamlit 등을 통해 웹에서 모델을 실시간으로 사용할 수 있습니다.

In [None]:
# *MarianTokenizer**가 SentencePiece 라이브러리를 필요
# SentencePiece는 Google에서 만든 텍스트 토크나이저이자 디토크나이저로, NLP 모델에서 자주 사용됩니다.

In [None]:
pip install sentencepiece

Collecting sentencepiece
  Downloading sentencepiece-0.2.0-cp39-cp39-win_amd64.whl.metadata (8.3 kB)
Downloading sentencepiece-0.2.0-cp39-cp39-win_amd64.whl (991 kB)
   ---------------------------------------- 991.5/991.5 kB 5.8 MB/s eta 0:00:00
Installing collected packages: sentencepiece
Successfully installed sentencepiece-0.2.0
Note: you may need to restart the kernel to use updated packages.


    matplotlib-inline (<0.2.0appnope,>=0.1.0) ; platform_system == "Darwin"
                      ~~~~~~~~^


In [None]:
from transformers import MarianMTModel, MarianTokenizer

# 사전 학습된 영어 -> 한국어 번역 모델 로드
model_name = 'Helsinki-NLP/opus-mt-en-ko'
tokenizer = MarianTokenizer.from_pretrained(model_name)
model = MarianMTModel.from_pretrained(model_name)

# 번역할 문장
english_text = "I am a student."

# 입력 문장을 토큰화
inputs = tokenizer(english_text, return_tensors="pt", padding=True)

# 모델을 사용해 번역
translated = model.generate(**inputs)

# 번역된 텍스트 디코딩
korean_translation = tokenizer.decode(translated[0], skip_special_tokens=True)

# 결과 출력
print("Original English Text:", english_text)
print("Translated Korean Text:", korean_translation)


OSError: Helsinki-NLP/opus-mt-en-ko is not a local folder and is not a valid model identifier listed on 'https://huggingface.co/models'
If this is a private repository, make sure to pass a token having permission to this repo either by logging in with `huggingface-cli login` or by passing `token=<your_token>`

In [None]:
import tensorflow as tf
from tensorflow.keras.layers import Dense, Embedding, LayerNormalization, Dropout
import numpy as np

# 단순한 임베딩 레이어 및 포지셔널 인코딩을 포함한 Transformer 모델
class Transformer(tf.keras.Model):
    def __init__(self, num_layers, d_model, num_heads, dff, input_vocab_size, target_vocab_size, max_pos_encoding, rate=0.1):
        super(Transformer, self).__init__()

        self.tokenizer = tf.keras.layers.Embedding(input_vocab_size, d_model)
        self.pos_encoding = self.positional_encoding(max_pos_encoding, d_model)

        self.enc_layers = [EncoderLayer(d_model, num_heads, dff, rate) for _ in range(num_layers)]
        self.dropout = Dropout(rate)

    def positional_encoding(self, position, d_model):
        angle_rads = self.get_angles(np.arange(position)[:, np.newaxis],
                                     np.arange(d_model)[np.newaxis, :],
                                     d_model)
        angle_rads[:, 0::2] = np.sin(angle_rads[:, 0::2])  # 짝수 인덱스: 사인 적용
        angle_rads[:, 1::2] = np.cos(angle_rads[:, 1::2])  # 홀수 인덱스: 코사인 적용
        pos_encoding = angle_rads[np.newaxis, ...]
        return tf.cast(pos_encoding, dtype=tf.float32)

    def get_angles(self, pos, i, d_model):
        angle_rates = 1 / np.power(10000, (2 * (i // 2)) / np.float32(d_model))
        return pos * angle_rates

    def call(self, x, training, mask):
        seq_len = tf.shape(x)[1]
        x = self.tokenizer(x)  # 임베딩 레이어 적용
        x += self.pos_encoding[:, :seq_len, :]
        x = self.dropout(x, training=training)

        for i in range(len(self.enc_layers)):
            x = self.enc_layers[i](x, training, mask)

        return x

# 기본적인 Encoder 레이어 구조
class EncoderLayer(tf.keras.layers.Layer):
    def __init__(self, d_model, num_heads, dff, rate=0.1):
        super(EncoderLayer, self).__init__()

        self.mha = MultiHeadAttention(d_model, num_heads)
        self.ffn = point_wise_feed_forward_network(d_model, dff)

        self.layernorm1 = LayerNormalization(epsilon=1e-6)
        self.layernorm2 = LayerNormalization(epsilon=1e-6)

        self.dropout1 = Dropout(rate)
        self.dropout2 = Dropout(rate)

    def call(self, x, training, mask):
        attn_output = self.mha(x, x, x, mask)  # Self-attention
        attn_output = self.dropout1(attn_output, training=training)
        out1 = self.layernorm1(x + attn_output)  # Residual 연결 + Layernorm

        ffn_output = self.ffn(out1)  # Feed-forward network
        ffn_output = self.dropout2(ffn_output, training=training)
        out2 = self.layernorm2(out1 + ffn_output)  # Residual 연결 + Layernorm

        return out2

# 멀티헤드 어텐션 레이어
class MultiHeadAttention(tf.keras.layers.Layer):
    def __init__(self, d_model, num_heads):
        super(MultiHeadAttention, self).__init__()
        self.num_heads = num_heads
        self.d_model = d_model

        assert d_model % self.num_heads == 0

        self.depth = d_model // self.num_heads
        self.wq = Dense(d_model)
        self.wk = Dense(d_model)
        self.wv = Dense(d_model)

        self.dense = Dense(d_model)

    def split_heads(self, x, batch_size):
        x = tf.reshape(x, (batch_size, -1, self.num_heads, self.depth))
        return tf.transpose(x, perm=[0, 2, 1, 3])

    def call(self, v, k, q, mask):
        batch_size = tf.shape(q)[0]

        q = self.wq(q)  # (batch_size, seq_len, d_model)
        k = self.wk(k)  # (batch_size, seq_len, d_model)
        v = self.wv(v)  # (batch_size, seq_len, d_model)

        q = self.split_heads(q, batch_size)  # (batch_size, num_heads, seq_len_q, depth)
        k = self.split_heads(k, batch_size)  # (batch_size, num_heads, seq_len_k, depth)
        v = self.split_heads(v, batch_size)  # (batch_size, num_heads, seq_len_v, depth)

        scaled_attention, _ = scaled_dot_product_attention(q, k, v, mask)
        scaled_attention = tf.transpose(scaled_attention, perm=[0, 2, 1, 3])  # (batch_size, seq_len_q, num_heads, depth)
        concat_attention = tf.reshape(scaled_attention, (batch_size, -1, self.d_model))  # (batch_size, seq_len_q, d_model)

        output = self.dense(concat_attention)  # (batch_size, seq_len_q, d_model)
        return output

# Scaled dot-product attention
def scaled_dot_product_attention(q, k, v, mask):
    matmul_qk = tf.matmul(q, k, transpose_b=True)  # (batch_size, num_heads, seq_len_q, seq_len_k)
    dk = tf.cast(tf.shape(k)[-1], tf.float32)
    scaled_attention_logits = matmul_qk / tf.math.sqrt(dk)

    if mask is not None:
        scaled_attention_logits += (mask * -1e9)  # 마스킹된 부분은 매우 작은 값으로 설정

    attention_weights = tf.nn.softmax(scaled_attention_logits, axis=-1)  # (batch_size, num_heads, seq_len_q, seq_len_k)
    output = tf.matmul(attention_weights, v)  # (batch_size, num_heads, seq_len_q, depth_v)

    return output, attention_weights

# 포인트-와이즈 피드포워드 네트워크
def point_wise_feed_forward_network(d_model, dff):
    return tf.keras.Sequential([
        Dense(dff, activation='relu'),  # (batch_size, seq_len, dff)
        Dense(d_model)  # (batch_size, seq_len, d_model)
    ])

# "I am a student"에 대한 임의 입력 데이터
tokenized_sentence = [1, 4, 3, 5]  # "I am a student"라는 문장의 단순화된 토큰화 (단어에 대한 임의 번호)
tokenized_sentence = tf.convert_to_tensor([tokenized_sentence])

# Transformer 모델 생성
transformer = Transformer(num_layers=2, d_model=512, num_heads=8, dff=2048, input_vocab_size=10000, target_vocab_size=10000, max_pos_encoding=5000)

# 모델 출력
output = transformer(tokenized_sentence, training=False, mask=None)
print(output.shape)  # (batch_size, input_seq_len, d_model)


RuntimeError: module compiled against API version 0x10 but this version of numpy is 0xe

SystemError: initialization of _pywrap_checkpoint_reader raised unreported exception

In [None]:
import tensorflow as tf
import numpy as np

# 하이퍼파라미터 설정
d_model = 512  # 임베딩 벡터의 차원
num_heads = 8  # Multi-Head Attention에서의 헤드 수
num_layers = 6  # 인코더와 디코더 층의 개수
dff = 2048  # 피드포워드 네트워크의 차원
input_vocab_size = 8500  # 입력 어휘 크기
target_vocab_size = 8000  # 출력 어휘 크기
max_seq_len = 100  # 최대 시퀀스 길이
dropout_rate = 0.1  # 드롭아웃 비율

# 포지셔널 인코딩(Positional Encoding) 구현
def positional_encoding(position, d_model):
    angle_rads = get_angles(np.arange(position)[:, np.newaxis], np.arange(d_model)[np.newaxis, :], d_model)

    # 짝수 인덱스는 sin, 홀수 인덱스는 cos 적용
    angle_rads[:, 0::2] = np.sin(angle_rads[:, 0::2])  # 짝수 인덱스
    angle_rads[:, 1::2] = np.cos(angle_rads[:, 1::2])  # 홀수 인덱스

    pos_encoding = angle_rads[np.newaxis, ...]

    return tf.cast(pos_encoding, dtype=tf.float32)

def get_angles(pos, i, d_model):
    angle_rates = 1 / np.power(10000, (2 * (i // 2)) / np.float32(d_model))
    return pos * angle_rates

# 멀티헤드 어텐션(Multi-Head Attention) 구현
class MultiHeadAttention(tf.keras.layers.Layer):
    def __init__(self, d_model, num_heads):
        super(MultiHeadAttention, self).__init__()
        self.num_heads = num_heads
        self.d_model = d_model

        assert d_model % self.num_heads == 0

        self.depth = d_model // self.num_heads

        self.wq = tf.keras.layers.Dense(d_model)
        self.wk = tf.keras.layers.Dense(d_model)
        self.wv = tf.keras.layers.Dense(d_model)

        self.dense = tf.keras.layers.Dense(d_model)

    def split_heads(self, x, batch_size):
        """입력을 (batch_size, num_heads, seq_len, depth)로 분리"""
        x = tf.reshape(x, (batch_size, -1, self.num_heads, self.depth))
        return tf.transpose(x, perm=[0, 2, 1, 3])

    def call(self, v, k, q, mask):
        batch_size = tf.shape(q)[0]

        q = self.wq(q)  # (batch_size, seq_len, d_model)
        k = self.wk(k)  # (batch_size, seq_len, d_model)
        v = self.wv(v)  # (batch_size, seq_len, d_model)

        q = self.split_heads(q, batch_size)  # (batch_size, num_heads, seq_len_q, depth)
        k = self.split_heads(k, batch_size)  # (batch_size, num_heads, seq_len_k, depth)
        v = self.split_heads(v, batch_size)  # (batch_size, num_heads, seq_len_v, depth)

        scaled_attention, attention_weights = scaled_dot_product_attention(q, k, v, mask)
        scaled_attention = tf.transpose(scaled_attention, perm=[0, 2, 1, 3])  # (batch_size, seq_len_q, num_heads, depth)

        concat_attention = tf.reshape(scaled_attention, (batch_size, -1, self.d_model))  # (batch_size, seq_len_q, d_model)

        output = self.dense(concat_attention)  # (batch_size, seq_len_q, d_model)

        return output, attention_weights

# Scaled Dot-Product Attention 구현
def scaled_dot_product_attention(q, k, v, mask):
    """어텐션 가중치를 계산"""
    matmul_qk = tf.matmul(q, k, transpose_b=True)  # (..., seq_len_q, seq_len_k)

    dk = tf.cast(tf.shape(k)[-1], tf.float32)
    scaled_attention_logits = matmul_qk / tf.math.sqrt(dk)

    if mask is not None:
        scaled_attention_logits += (mask * -1e9)

    attention_weights = tf.nn.softmax(scaled_attention_logits, axis=-1)

    output = tf.matmul(attention_weights, v)  # (..., seq_len_q, depth_v)

    return output, attention_weights

# 포지션별 피드포워드 네트워크
def point_wise_feed_forward_network(d_model, dff):
    return tf.keras.Sequential([
        tf.keras.layers.Dense(dff, activation='relu'),  # (batch_size, seq_len, dff)
        tf.keras.layers.Dense(d_model)  # (batch_size, seq_len, d_model)
    ])

# 인코더 블록 구현
class EncoderLayer(tf.keras.layers.Layer):
    def __init__(self, d_model, num_heads, dff, rate=0.1):
        super(EncoderLayer, self).__init__()

        self.mha = MultiHeadAttention(d_model, num_heads)
        self.ffn = point_wise_feed_forward_network(d_model, dff)

        self.layernorm1 = tf.keras.layers.LayerNormalization(epsilon=1e-6)
        self.layernorm2 = tf.keras.layers.LayerNormalization(epsilon=1e-6)

        self.dropout1 = tf.keras.layers.Dropout(rate)
        self.dropout2 = tf.keras.layers.Dropout(rate)

    def call(self, x, training, mask):
        attn_output, _ = self.mha(x, x, x, mask)
        attn_output = self.dropout1(attn_output, training=training)
        out1 = self.layernorm1(x + attn_output)

        ffn_output = self.ffn(out1)
        ffn_output = self.dropout2(ffn_output, training=training)
        out2 = self.layernorm2(out1 + ffn_output)

        return out2

# 인코더 구현
class Encoder(tf.keras.layers.Layer):
    def __init__(self, num_layers, d_model, num_heads, dff, input_vocab_size, maximum_position_encoding, rate=0.1):
        super(Encoder, self).__init__()

        self.d_model = d_model
        self.num_layers = num_layers

        self.embedding = tf.keras.layers.Embedding(input_vocab_size, d_model)
        self.pos_encoding = positional_encoding(maximum_position_encoding, d_model)

        self.enc_layers = [EncoderLayer(d_model, num_heads, dff, rate) for _ in range(num_layers)]
        self.dropout = tf.keras.layers.Dropout(rate)

    def call(self, x, training, mask):
        seq_len = tf.shape(x)[1]

        # 임베딩 및 포지셔널 인코딩 추가
        x = self.embedding(x)  # (batch_size, input_seq_len, d_model)
        x *= tf.math.sqrt(tf.cast(self.d_model, tf.float32))
        x += self.pos_encoding[:, :seq_len, :]

        x = self.dropout(x, training=training)

        for i in range(self.num_layers):
            x = self.enc_layers[i](x, training, mask)

        return x  # (batch_size, input_seq_len, d_model)

# Transformer 모델 구현 (인코더 중심)
class Transformer(tf.keras.Model):
    def __init__(self, num_layers, d_model, num_heads, dff, input_vocab_size, target_vocab_size, pe_input, pe_target, rate=0.1):
        super(Transformer, self).__init__()

        self.encoder = Encoder(num_layers, d_model, num_heads, dff, input_vocab_size, pe_input, rate)

    def call(self, inp, training, enc_padding_mask):
        enc_output = self.encoder(inp, training, enc_padding_mask)  # (batch_size, inp_seq_len, d_model)
        return enc_output


RuntimeError: module compiled against API version 0x10 but this version of numpy is 0xe

SystemError: initialization of _pywrap_checkpoint_reader raised unreported exception

# 준비

## 라이브러리

In [None]:
# TensorFlow와 Keras를 사용하여 Transformer의 핵심 요소인 Multi-Head Attention과 Encoder-Decoder 구조를 구현하는 방식입니다.

In [None]:
import numpy as np
import tensorflow as tf
from tensorflow.keras.layers import Dense, LayerNormalization, Dropout
from tensorflow.keras.models import Model

## Multi-Head Attention Layer 정의

In [None]:
# MultiHeadAttention 클래스는 논문에서 설명된 다중 어텐션 메커니즘을 구현합니다.

# split_heads 함수는 입력 벡터를 여러 헤드로 나누고, 각 헤드마다 다른 Self-Attention을 적용한 후 다시 결합합니다.
# scaled_dot_product_attention는 어텐션 메커니즘의 핵심인 어텐션 점수를 계산하여, 각 값에 가중합을 적용합니다.

In [None]:
# Multi-Head Attention Layer 정의
class MultiHeadAttention(tf.keras.layers.Layer):
    def __init__(self, d_model, num_heads):
        super(MultiHeadAttention, self).__init__()
        self.num_heads = num_heads
        self.d_model = d_model

        assert d_model % self.num_heads == 0

        self.depth = d_model // self.num_heads

        self.wq = Dense(d_model)
        self.wk = Dense(d_model)
        self.wv = Dense(d_model)

        self.dense = Dense(d_model)

    def split_heads(self, x, batch_size):
        x = tf.reshape(x, (batch_size, -1, self.num_heads, self.depth))
        return tf.transpose(x, perm=[0, 2, 1, 3])

    def call(self, v, k, q, mask):
        batch_size = tf.shape(q)[0]

        q = self.wq(q)
        k = self.wk(k)
        v = self.wv(v)

        q = self.split_heads(q, batch_size)
        k = self.split_heads(k, batch_size)
        v = self.split_heads(v, batch_size)

        scaled_attention, attention_weights = scaled_dot_product_attention(q, k, v, mask)
        scaled_attention = tf.transpose(scaled_attention, perm=[0, 2, 1, 3])
        concat_attention = tf.reshape(scaled_attention, (batch_size, -1, self.d_model))

        output = self.dense(concat_attention)
        return output, attention_weights


## Scaled Dot-Product Attention 정의

In [None]:
# Scaled Dot-Product Attention 정의
def scaled_dot_product_attention(q, k, v, mask):
    matmul_qk = tf.matmul(q, k, transpose_b=True)
    dk = tf.cast(tf.shape(k)[-1], tf.float32)
    scaled_attention_logits = matmul_qk / tf.math.sqrt(dk)

    if mask is not None:
        scaled_attention_logits += (mask * -1e9)

    attention_weights = tf.nn.softmax(scaled_attention_logits, axis=-1)
    output = tf.matmul(attention_weights, v)
    return output, attention_weights


## Transformer Encoder Layer 정의

In [None]:

class EncoderLayer(tf.keras.layers.Layer):
    def __init__(self, d_model, num_heads, dff, rate=0.1):
        super(EncoderLayer, self).__init__()

        self.mha = MultiHeadAttention(d_model, num_heads)
        self.ffn = point_wise_feed_forward_network(d_model, dff)

        self.layernorm1 = LayerNormalization(epsilon=1e-6)
        self.layernorm2 = LayerNormalization(epsilon=1e-6)

        self.dropout1 = Dropout(rate)
        self.dropout2 = Dropout(rate)

    def call(self, x, training, mask):
        attn_output, _ = self.mha(x, x, x, mask)
        attn_output = self.dropout1(attn_output, training=training)
        out1 = self.layernorm1(x + attn_output)

        ffn_output = self.ffn(out1)
        ffn_output = self.dropout2(ffn_output, training=training)
        out2 = self.layernorm2(out1 + ffn_output)

        return out2

# Feed Forward Network 정의
def point_wise_feed_forward_network(d_model, dff):
    return tf.keras.Sequential([
        Dense(dff, activation='relu'),
        Dense(d_model)
    ])


## Feed Forward Network 정의

In [None]:

def point_wise_feed_forward_network(d_model, dff):
    return tf.keras.Sequential([
        Dense(dff, activation='relu'),
        Dense(d_model)
    ])

## Positional Encoding

In [None]:
# Transformer 모델은 RNN과 달리 순차 데이터에 대한 순서 정보를 자연스럽게 처리할 수 없기 때문에, Positional Encoding을 추가하여 입력 순서 정보를 반영합니다.
# positional_encoding 함수는 입력 시퀀스의 각 위치에 대해 사인과 코사인 함수를 적용하여, 모델이 위치 정보를 학습할 수 있도록 합니다.

In [None]:
def positional_encoding(position, d_model):
    angle_rads = get_angles(np.arange(position)[:, np.newaxis], np.arange(d_model)[np.newaxis, :], d_model)

    sines = np.sin(angle_rads[:, 0::2])
    cosines = np.cos(angle_rads[:, 1::2])

    pos_encoding = np.concatenate([sines, cosines], axis=-1)
    pos_encoding = pos_encoding[np.newaxis, ...]

    return tf.cast(pos_encoding, dtype=tf.float32)

def get_angles(pos, i, d_model):
    angle_rates = 1 / np.power(10000, (2 * (i // 2)) / np.float32(d_model))
    return pos * angle_rates


## Transformer Encoder 정의

In [None]:
# Encoder Layer:

# EncoderLayer는 하나의 인코더 층을 정의합니다. 각 층은 Multi-Head Attention과 **Feed Forward Network(FFN)**으로 구성됩니다.
# Residual 연결 및 Layer Normalization을 통해 학습을 안정적으로 수행할 수 있습니다.

# Encoder:

# Encoder 클래스는 여러 개의 EncoderLayer를 쌓아 전체 인코더 블록을 구현합니다.
# 입력 시퀀스는 임베딩된 후, 위치 정보를 반영하여 각 인코더 층을 거쳐 처리됩니다.

In [None]:
# Transformer Encoder 정의
class Encoder(tf.keras.layers.Layer):
    def __init__(self, num_layers, d_model, num_heads, dff, input_vocab_size, maximum_position_encoding, rate=0.1):
        super(Encoder, self).__init__()

        self.d_model = d_model
        self.num_layers = num_layers

        self.embedding = tf.keras.layers.Embedding(input_vocab_size, d_model)
        self.pos_encoding = positional_encoding(maximum_position_encoding, d_model)

        self.enc_layers = [EncoderLayer(d_model, num_heads, dff, rate) for _ in range(num_layers)]
        self.dropout = Dropout(rate)

    def call(self, x, training, mask):
        seq_len = tf.shape(x)[1]

        x = self.embedding(x)
        x *= tf.math.sqrt(tf.cast(self.d_model, tf.float32))
        x += self.pos_encoding[:, :seq_len, :]

        x = self.dropout(x, training=training)

        for i in range(self.num_layers):
            x = self.enc_layers[i](x, training, mask)

        return x

    # 샘플 모델 학습
sample_transformer = Encoder(num_layers=2, d_model=512, num_heads=8, dff=2048, input_vocab_size=8500, maximum_position_encoding=10000)
temp_input = tf.random.uniform((64, 62))
sample_transformer_output = sample_transformer(temp_input, training=False, mask=None)
print(sample_transformer_output.shape)

ValueError: Exception encountered when calling Encoder.call().

[1mOnly input tensors may be passed as positional arguments. The following argument value should be passed as a keyword argument: False (of type <class 'bool'>)[0m

Arguments received by Encoder.call():
  • x=tf.Tensor(shape=(64, 62), dtype=float32)
  • training=False
  • mask=None

# 종합

In [None]:
import tensorflow as tf
from tensorflow.keras.layers import Dense, LayerNormalization, Dropout
from tensorflow.keras.models import Model

# Multi-Head Attention Layer 정의
class MultiHeadAttention(tf.keras.layers.Layer):
    def __init__(self, d_model, num_heads):
        super(MultiHeadAttention, self).__init__()
        self.num_heads = num_heads
        self.d_model = d_model

        assert d_model % self.num_heads == 0

        self.depth = d_model // self.num_heads

        self.wq = Dense(d_model)
        self.wk = Dense(d_model)
        self.wv = Dense(d_model)

        self.dense = Dense(d_model)

    def split_heads(self, x, batch_size):
        x = tf.reshape(x, (batch_size, -1, self.num_heads, self.depth))
        return tf.transpose(x, perm=[0, 2, 1, 3])

    def call(self, v, k, q, mask):
        batch_size = tf.shape(q)[0]

        q = self.wq(q)
        k = self.wk(k)
        v = self.wv(v)

        q = self.split_heads(q, batch_size)
        k = self.split_heads(k, batch_size)
        v = self.split_heads(v, batch_size)

        scaled_attention, attention_weights = scaled_dot_product_attention(q, k, v, mask)
        scaled_attention = tf.transpose(scaled_attention, perm=[0, 2, 1, 3])
        concat_attention = tf.reshape(scaled_attention, (batch_size, -1, self.d_model))

        output = self.dense(concat_attention)
        return output, attention_weights

# Scaled Dot-Product Attention 정의
def scaled_dot_product_attention(q, k, v, mask):
    matmul_qk = tf.matmul(q, k, transpose_b=True)
    dk = tf.cast(tf.shape(k)[-1], tf.float32)
    scaled_attention_logits = matmul_qk / tf.math.sqrt(dk)

    if mask is not None:
        scaled_attention_logits += (mask * -1e9)

    attention_weights = tf.nn.softmax(scaled_attention_logits, axis=-1)
    output = tf.matmul(attention_weights, v)
    return output, attention_weights

# Transformer Encoder Layer 정의
class EncoderLayer(tf.keras.layers.Layer):
    def __init__(self, d_model, num_heads, dff, rate=0.1):
        super(EncoderLayer, self).__init__()

        self.mha = MultiHeadAttention(d_model, num_heads)
        self.ffn = point_wise_feed_forward_network(d_model, dff)

        self.layernorm1 = LayerNormalization(epsilon=1e-6)
        self.layernorm2 = LayerNormalization(epsilon=1e-6)

        self.dropout1 = Dropout(rate)
        self.dropout2 = Dropout(rate)

    def call(self, x, training, mask):
        attn_output, _ = self.mha(x, x, x, mask)
        attn_output = self.dropout1(attn_output, training=training)
        out1 = self.layernorm1(x + attn_output)

        ffn_output = self.ffn(out1)
        ffn_output = self.dropout2(ffn_output, training=training)
        out2 = self.layernorm2(out1 + ffn_output)

        return out2

# Feed Forward Network 정의
def point_wise_feed_forward_network(d_model, dff):
    return tf.keras.Sequential([
        Dense(dff, activation='relu'),
        Dense(d_model)
    ])

# Positional Encoding 정의
def positional_encoding(position, d_model):
    angle_rads = get_angles(np.arange(position)[:, np.newaxis], np.arange(d_model)[np.newaxis, :], d_model)

    sines = np.sin(angle_rads[:, 0::2])
    cosines = np.cos(angle_rads[:, 1::2])

    pos_encoding = np.concatenate([sines, cosines], axis=-1)
    pos_encoding = pos_encoding[np.newaxis, ...]

    return tf.cast(pos_encoding, dtype=tf.float32)

def get_angles(pos, i, d_model):
    angle_rates = 1 / np.power(10000, (2 * (i // 2)) / np.float32(d_model))
    return pos * angle_rates

# Transformer Encoder 정의
class Encoder(tf.keras.layers.Layer):
    def __init__(self, num_layers, d_model, num_heads, dff, input_vocab_size, maximum_position_encoding, rate=0.1):
        super(Encoder, self).__init__()

        self.d_model = d_model
        self.num_layers = num_layers

        self.embedding = tf.keras.layers.Embedding(input_vocab_size, d_model)
        self.pos_encoding = positional_encoding(maximum_position_encoding, d_model)

        self.enc_layers = [EncoderLayer(d_model, num_heads, dff, rate) for _ in range(num_layers)]
        self.dropout = Dropout(rate)

    def call(self, x, training, mask):
        seq_len = tf.shape(x)[1]

        x = self.embedding(x)
        x *= tf.math.sqrt(tf.cast(self.d_model, tf.float32))
        x += self.pos_encoding[:, :seq_len, :]

        x = self.dropout(x, training=training)

        for i in range(self.num_layers):
            x = self.enc_layers[i](x, training, mask)

        return x

# 샘플 모델 학습
sample_transformer = Encoder(num_layers=2, d_model=512, num_heads=8, dff=2048, input_vocab_size=8500, maximum_position_encoding=10000)
temp_input = tf.random.uniform((64, 62))
sample_transformer_output = sample_transformer(temp_input, training=False, mask=None)
print(sample_transformer_output.shape)


# Decoder

In [None]:
# Decoder: 인코더와 유사한 구조로, 입력 시퀀스를 받아 최종 출력을 생성하는 과정이 필요합니다. 디코더는 인코더-디코더 어텐션을 통해 인코더에서 생성된 정보와 상호작용합니다.
# Loss function & Optimizer: 손실 함수는 Sparse Categorical Crossentropy와 같은 분류용 손실 함수를 사용하고, 최적화는 Adam을 사용합니다.

In [None]:
#  Transformer 모델의 디코더(Decoder) 부분을 구현한 코드입니다.
#     디코더는 **인코더-디코더 어텐션(Encoder-Decoder Attention)**과 Masked Multi-Head Attention을 포함하며, 이를 통해 입력 시퀀스를 받아 최종 출력을 생성하는 역할을 합니다.

In [None]:
import tensorflow as tf
from tensorflow.keras.layers import Dense, LayerNormalization, Dropout


In [None]:
# Masked Multi-Head Attention:
# 디코더에서 첫 번째 Multi-Head Attention은 Masked Self-Attention입니다.
# 이는 디코더의 출력이 순차적으로 생성되기 때문에, 현재 토큰 이후의 정보는 마스킹하여 미래 정보를 참조하지 않도록 합니다.



In [None]:

# Scaled Dot-Product Attention 정의 (위 코드와 동일)
def scaled_dot_product_attention(q, k, v, mask):
    matmul_qk = tf.matmul(q, k, transpose_b=True)
    dk = tf.cast(tf.shape(k)[-1], tf.float32)
    scaled_attention_logits = matmul_qk / tf.math.sqrt(dk)

    if mask is not None:
        scaled_attention_logits += (mask * -1e9)

    attention_weights = tf.nn.softmax(scaled_attention_logits, axis=-1)
    output = tf.matmul(attention_weights, v)
    return output, attention_weights


In [None]:

# Encoder-Decoder Attention:
# 디코더에서 두 번째 Multi-Head Attention은 인코더 출력을 입력으로 받아, 인코더에서 처리된 정보를 디코더가 참조할 수 있게 합니다.
# 이를 통해 인코더-디코더 간 상호작용이 이루어집니다.


In [None]:
# Multi-Head Attention Layer 정의 (위 코드와 동일)
class MultiHeadAttention(tf.keras.layers.Layer):
    def __init__(self, d_model, num_heads):
        super(MultiHeadAttention, self).__init__()
        self.num_heads = num_heads
        self.d_model = d_model

        assert d_model % self.num_heads == 0

        self.depth = d_model // self.num_heads

        self.wq = Dense(d_model)
        self.wk = Dense(d_model)
        self.wv = Dense(d_model)

        self.dense = Dense(d_model)

    def split_heads(self, x, batch_size):
        x = tf.reshape(x, (batch_size, -1, self.num_heads, self.depth))
        return tf.transpose(x, perm=[0, 2, 1, 3])

    def call(self, v, k, q, mask):
        batch_size = tf.shape(q)[0]

        q = self.wq(q)
        k = self.wk(k)
        v = self.wv(v)

        q = self.split_heads(q, batch_size)
        k = self.split_heads(k, batch_size)
        v = self.split_heads(v, batch_size)

        scaled_attention, attention_weights = scaled_dot_product_attention(q, k, v, mask)
        scaled_attention = tf.transpose(scaled_attention, perm=[0, 2, 1, 3])
        concat_attention = tf.reshape(scaled_attention, (batch_size, -1, self.d_model))

        output = self.dense(concat_attention)
        return output, attention_weights


In [None]:
# Feed Forward Network:

# FFN(Point-Wise Feed Forward Network)는 인코더와 마찬가지로 각 디코더 층에서 사용됩니다.
# 이는 각 층의 출력을 비선형적으로 변환하는 역할을 합니다.


In [None]:
# Feed Forward Network 정의 (위 코드와 동일)
def point_wise_feed_forward_network(d_model, dff):
    return tf.keras.Sequential([
        Dense(dff, activation='relu'),
        Dense(d_model)
    ])

In [None]:
# Transformer Decoder Layer 정의
class DecoderLayer(tf.keras.layers.Layer):
    def __init__(self, d_model, num_heads, dff, rate=0.1):
        super(DecoderLayer, self).__init__()

        self.mha1 = MultiHeadAttention(d_model, num_heads)
        self.mha2 = MultiHeadAttention(d_model, num_heads)

        self.ffn = point_wise_feed_forward_network(d_model, dff)

        self.layernorm1 = LayerNormalization(epsilon=1e-6)
        self.layernorm2 = LayerNormalization(epsilon=1e-6)
        self.layernorm3 = LayerNormalization(epsilon=1e-6)

        self.dropout1 = Dropout(rate)
        self.dropout2 = Dropout(rate)
        self.dropout3 = Dropout(rate)

    def call(self, x, enc_output, training, look_ahead_mask, padding_mask):
        # 첫 번째 Multi-Head Attention (Masked Multi-Head Attention)
        attn1, attn_weights_block1 = self.mha1(x, x, x, look_ahead_mask)
        attn1 = self.dropout1(attn1, training=training)
        out1 = self.layernorm1(attn1 + x)

        # 두 번째 Multi-Head Attention (Encoder-Decoder Attention)
        attn2, attn_weights_block2 = self.mha2(enc_output, enc_output, out1, padding_mask)
        attn2 = self.dropout2(attn2, training=training)
        out2 = self.layernorm2(attn2 + out1)

        # Feed Forward Network
        ffn_output = self.ffn(out2)
        ffn_output = self.dropout3(ffn_output, training=training)
        out3 = self.layernorm3(ffn_output + out2)

        return out3, attn_weights_block1, attn_weights_block2



In [None]:

# Decoder Class:
# 디코더는 여러 개의 DecoderLayer를 쌓아서 구성되며,
# 각 층에서는 Masked Multi-Head Attention, Encoder-Decoder Attention, FFN을 통해 데이터를 처리합니다.

In [None]:
# Transformer Decoder 정의
class Decoder(tf.keras.layers.Layer):
    def __init__(self, num_layers, d_model, num_heads, dff, target_vocab_size, maximum_position_encoding, rate=0.1):
        super(Decoder, self).__init__()

        self.d_model = d_model
        self.num_layers = num_layers

        self.embedding = tf.keras.layers.Embedding(target_vocab_size, d_model)
        self.pos_encoding = positional_encoding(maximum_position_encoding, d_model)

        self.dec_layers = [DecoderLayer(d_model, num_heads, dff, rate) for _ in range(num_layers)]
        self.dropout = Dropout(rate)

    def call(self, x, enc_output, training, look_ahead_mask, padding_mask):
        seq_len = tf.shape(x)[1]
        attention_weights = {}

        x = self.embedding(x)
        x *= tf.math.sqrt(tf.cast(self.d_model, tf.float32))
        x += self.pos_encoding[:, :seq_len, :]

        x = self.dropout(x, training=training)

        for i in range(self.num_layers):
            x, block1, block2 = self.dec_layers[i](x, enc_output, training, look_ahead_mask, padding_mask)
            attention_weights[f'decoder_layer{i+1}_block1'] = block1
            attention_weights[f'decoder_layer{i+1}_block2'] = block2

        return x, attention_weights



In [None]:
# Positional Encoding:
# 인코더와 마찬가지로 디코더에서도 Positional Encoding을 사용하여 입력 시퀀스의 위치 정보를 모델이 학습할 수 있도록 합니다.


In [None]:
# Positional Encoding 정의 (위 코드에서 이미 사용된 함수)
import numpy as np

def positional_encoding(position, d_model):
    angle_rads = get_angles(np.arange(position)[:, np.newaxis], np.arange(d_model)[np.newaxis, :], d_model)

    sines = np.sin(angle_rads[:, 0::2])
    cosines = np.cos(angle_rads[:, 1::2])

    pos_encoding = np.concatenate([sines, cosines], axis=-1)
    pos_encoding = pos_encoding[np.newaxis, ...]

    return tf.cast(pos_encoding, dtype=tf.float32)

def get_angles(pos, i, d_model):
    angle_rates = 1 / np.power(10000, (2 * (i // 2)) / np.float32(d_model))
    return pos * angle_rates

# 샘플 디코더 테스트
sample_decoder = Decoder(num_layers=2, d_model=512, num_heads=8, dff=2048, target_vocab_size=8000, maximum_position_encoding=5000)
temp_input = tf.random.uniform((64, 26))
sample_decoder_output, attn_weights = sample_decoder(temp_input, enc_output=sample_transformer_output, training=False, look_ahead_mask=None, padding_mask=None)

print(sample_decoder_output.shape)

In [None]:
# 종합

In [None]:
import tensorflow as tf
from tensorflow.keras.layers import Dense, LayerNormalization, Dropout

# Scaled Dot-Product Attention 정의 (위 코드와 동일)
def scaled_dot_product_attention(q, k, v, mask):
    matmul_qk = tf.matmul(q, k, transpose_b=True)
    dk = tf.cast(tf.shape(k)[-1], tf.float32)
    scaled_attention_logits = matmul_qk / tf.math.sqrt(dk)

    if mask is not None:
        scaled_attention_logits += (mask * -1e9)

    attention_weights = tf.nn.softmax(scaled_attention_logits, axis=-1)
    output = tf.matmul(attention_weights, v)
    return output, attention_weights

# Multi-Head Attention Layer 정의 (위 코드와 동일)
class MultiHeadAttention(tf.keras.layers.Layer):
    def __init__(self, d_model, num_heads):
        super(MultiHeadAttention, self).__init__()
        self.num_heads = num_heads
        self.d_model = d_model

        assert d_model % self.num_heads == 0

        self.depth = d_model // self.num_heads

        self.wq = Dense(d_model)
        self.wk = Dense(d_model)
        self.wv = Dense(d_model)

        self.dense = Dense(d_model)

    def split_heads(self, x, batch_size):
        x = tf.reshape(x, (batch_size, -1, self.num_heads, self.depth))
        return tf.transpose(x, perm=[0, 2, 1, 3])

    def call(self, v, k, q, mask):
        batch_size = tf.shape(q)[0]

        q = self.wq(q)
        k = self.wk(k)
        v = self.wv(v)

        q = self.split_heads(q, batch_size)
        k = self.split_heads(k, batch_size)
        v = self.split_heads(v, batch_size)

        scaled_attention, attention_weights = scaled_dot_product_attention(q, k, v, mask)
        scaled_attention = tf.transpose(scaled_attention, perm=[0, 2, 1, 3])
        concat_attention = tf.reshape(scaled_attention, (batch_size, -1, self.d_model))

        output = self.dense(concat_attention)
        return output, attention_weights

# Feed Forward Network 정의 (위 코드와 동일)
def point_wise_feed_forward_network(d_model, dff):
    return tf.keras.Sequential([
        Dense(dff, activation='relu'),
        Dense(d_model)
    ])

# Transformer Decoder Layer 정의
class DecoderLayer(tf.keras.layers.Layer):
    def __init__(self, d_model, num_heads, dff, rate=0.1):
        super(DecoderLayer, self).__init__()

        self.mha1 = MultiHeadAttention(d_model, num_heads)
        self.mha2 = MultiHeadAttention(d_model, num_heads)

        self.ffn = point_wise_feed_forward_network(d_model, dff)

        self.layernorm1 = LayerNormalization(epsilon=1e-6)
        self.layernorm2 = LayerNormalization(epsilon=1e-6)
        self.layernorm3 = LayerNormalization(epsilon=1e-6)

        self.dropout1 = Dropout(rate)
        self.dropout2 = Dropout(rate)
        self.dropout3 = Dropout(rate)

    def call(self, x, enc_output, training, look_ahead_mask, padding_mask):
        # 첫 번째 Multi-Head Attention (Masked Multi-Head Attention)
        attn1, attn_weights_block1 = self.mha1(x, x, x, look_ahead_mask)
        attn1 = self.dropout1(attn1, training=training)
        out1 = self.layernorm1(attn1 + x)

        # 두 번째 Multi-Head Attention (Encoder-Decoder Attention)
        attn2, attn_weights_block2 = self.mha2(enc_output, enc_output, out1, padding_mask)
        attn2 = self.dropout2(attn2, training=training)
        out2 = self.layernorm2(attn2 + out1)

        # Feed Forward Network
        ffn_output = self.ffn(out2)
        ffn_output = self.dropout3(ffn_output, training=training)
        out3 = self.layernorm3(ffn_output + out2)

        return out3, attn_weights_block1, attn_weights_block2

# Transformer Decoder 정의
class Decoder(tf.keras.layers.Layer):
    def __init__(self, num_layers, d_model, num_heads, dff, target_vocab_size, maximum_position_encoding, rate=0.1):
        super(Decoder, self).__init__()

        self.d_model = d_model
        self.num_layers = num_layers

        self.embedding = tf.keras.layers.Embedding(target_vocab_size, d_model)
        self.pos_encoding = positional_encoding(maximum_position_encoding, d_model)

        self.dec_layers = [DecoderLayer(d_model, num_heads, dff, rate) for _ in range(num_layers)]
        self.dropout = Dropout(rate)

    def call(self, x, enc_output, training, look_ahead_mask, padding_mask):
        seq_len = tf.shape(x)[1]
        attention_weights = {}

        x = self.embedding(x)
        x *= tf.math.sqrt(tf.cast(self.d_model, tf.float32))
        x += self.pos_encoding[:, :seq_len, :]

        x = self.dropout(x, training=training)

        for i in range(self.num_layers):
            x, block1, block2 = self.dec_layers[i](x, enc_output, training, look_ahead_mask, padding_mask)
            attention_weights[f'decoder_layer{i+1}_block1'] = block1
            attention_weights[f'decoder_layer{i+1}_block2'] = block2

        return x, attention_weights

# Positional Encoding 정의 (위 코드에서 이미 사용된 함수)
import numpy as np

def positional_encoding(position, d_model):
    angle_rads = get_angles(np.arange(position)[:, np.newaxis], np.arange(d_model)[np.newaxis, :], d_model)

    sines = np.sin(angle_rads[:, 0::2])
    cosines = np.cos(angle_rads[:, 1::2])

    pos_encoding = np.concatenate([sines, cosines], axis=-1)
    pos_encoding = pos_encoding[np.newaxis, ...]

    return tf.cast(pos_encoding, dtype=tf.float32)

def get_angles(pos, i, d_model):
    angle_rates = 1 / np.power(10000, (2 * (i // 2)) / np.float32(d_model))
    return pos * angle_rates

# 샘플 디코더 테스트
sample_decoder = Decoder(num_layers=2, d_model=512, num_heads=8, dff=2048, target_vocab_size=8000, maximum_position_encoding=5000)
temp_input = tf.random.uniform((64, 26))
sample_decoder_output, attn_weights = sample_decoder(temp_input, enc_output=sample_transformer_output, training=False, look_ahead_mask=None, padding_mask=None)

print(sample_decoder_output.shape)
