## Customized BGE-M3

In [None]:
# from transformers import TFAutoModel, TFPreTrainedModel, PretrainedConfig
# import torch
# import torch.nn as nn
# import tensorflow as tf

# class SentenceMultiHeadAttention(tf.keras.layers.Layer):
#   def __init__(self, d_model, num_heads, name="sentence_multi_head_attention"):
#     super(SentenceMultiHeadAttention, self).__init__(name=name)
#     self.d_model = d_model
#     self.num_heads = num_heads

#     assert d_model % num_heads == 0

#     # transformer 논문 기준 512 // 8
#     self.depth = d_model // num_heads

#     # WQ, WK, WV
#     self.query_dense = tf.keras.layers.Dense(units=d_model)
#     self.key_dense = tf.keras.layers.Dense(units=d_model)
#     self.value_dense = tf.keras.layers.Dense(units=d_model)

#     # W0
#     self.dense = tf.keras.layers.Dense(units=d_model)


#   def split_heads(self, inputs):
#     # (문장 갯수(bge-m3 의 batch_size), 모델 차원 d_model) -> (문장 갯수, num_heads, depth)
#     inputs = tf.reshape(inputs, shape=[-1, self.num_heads, self.depth])
#     return tf.transpose(inputs, perm=[1, 0, 2])


#   def scaled_dot_product(self, query, key, value):
#     # query, key, value : (num_heads, batch_size 문장갯수, d_model/num_heads)
#     # 문장 임베딩 끼리의 내적이므로 padding mask 는 필요없음
#     mat_mul = tf.matmul(query, key, transpose_b=True)
#     # dk = d_model/num_heads
#     dk = tf.cast(tf.shape(key)[-1], tf.float32)
#     logits = tf.divide(mat_mul, tf.math.sqrt(dk))

#     attention_weights = tf.nn.softmax(logits, axis=-1) # key 방향
#     output = tf.matmul(attention_weights, value)

#     return output, attention_weights

#   def call(self, input):
#     # query, key, value : (batch_size, d_model)
#     query, key, value = input['query'], input['key'], input['value']

#     # WQ
#     query = self.query_dense(query)
#     # WK
#     key = self.key_dense(key)
#     # WV
#     value = self.value_dense(value)

#     # -> (batch_size, num_heads, d_model/num_heads)
#     query = self.split_heads(query)
#     key = self.split_heads(key)
#     value = self.split_heads(value)

#     # (num_heads, batch_size, d_model / num_heads)
#     scaled_attention, _ = self.scaled_dot_product(query, key, value)
#     # (batch_size, num_heads, d_model / num_heads)
#     scaled_attention = tf.transpose(scaled_attention, perm=[1, 0, 2])

#     # (batch_size, d_model)
#     concat_attention = tf.reshape(scaled_attention, (-1, self.d_model))

#     # W0
#     outputs = self.dense(concat_attention)

#     return outputs

# # self attention of sentences
# class CustomBGEM3FlagModel(TFPreTrainedModel):
#   config_class = PretrainedConfig
#   def __init__(self,config):
#     super().__init__(config)
#     self.config = config
#     self.bge_m3 = TFAutoModel.from_pretrained("BAAI/bge-m3", from_pt=True)
#     self.dff = config.intermediate_size # 4096
#     # self.dff = 2048
#     self.d_model = 1024
#     self.num_heads = config.num_attention_heads # 16
#     self.dropout = config.hidden_dropout_prob # 0.1

#     # the additional layer
#     self.clustering_layer = self.sentence_clustering_layer(dff=self.dff, d_model=self.d_model, num_heads=self.num_heads, dropout=self.dropout)

#   def call(self,inputs, training=False):
#     # sequence_output : [batch_size, sequence_length, the dimension of bge-m3]
#     ids, attention_mask = inputs
#     # (batch_size(문장 수), 문장 길이(토큰 수), 모델 차원 d_model)
#     sequence_output = self.bge_m3(ids, attention_mask=attention_mask, training=training).last_hidden_state

#     # (batch_size, d_model)
#     # extract the 1st token's ([CLS]) embeddings
#     sentence_embeddings = sequence_output[:, 0, :]

#     output = self.clustering_layer(sentence_embeddings)
#     return output

#   def sentence_clustering_layer(self, dff, d_model, num_heads, dropout, name="bert_layer"):
#     # (문장 갯수 batch_size, d_model)
#     inputs = tf.keras.Input(shape=(d_model,), name="input")

#     # sub-layer 1 : Multi head self attention
#     attention = SentenceMultiHeadAttention(d_model, num_heads, name="sentence_multi_head_attention")({
#         'query' : inputs,
#         'key' : inputs,
#         'value' : inputs,
#     })

#     # drop-out
#     attention = tf.keras.layers.Dropout(rate=dropout)(attention)
#     # 잔차연결, 층 정규화
#     attention = tf.keras.layers.LayerNormalization(epsilon=1e-6)(inputs + attention)

#     # sub-layer 2 : Position-wise FFNN
#     outputs = tf.keras.layers.Dense(units=dff, activation='relu')(attention)
#     outputs = tf.keras.layers.Dense(units=d_model)(outputs)

#     # drop-out
#     outputs = tf.keras.layers.Dropout(rate=dropout)(outputs)
#     # 잔차연결, 층 정규화
#     outputs = tf.keras.layers.LayerNormalization(epsilon=1e-6)(attention + outputs)

#     return tf.keras.Model(inputs=[inputs], outputs=outputs, name=name)

PyTorch

In [3]:
from transformers import AutoModel, PreTrainedModel, PretrainedConfig
import torch
import torch.nn as nn
import torch.nn.functional as F


class SentenceMultiHeadAttention(nn.Module):
    """Multi-head scaled dot-product attention over the rows of a batch.

    Each input row is treated as one embedding of width ``d_model``. The rows
    attend to each other, so the batch axis plays the role a sequence axis
    plays in ordinary self-attention. No mask is applied.

    Args:
        d_model: Width of the input and output embeddings.
        num_heads: Number of attention heads; must divide ``d_model`` evenly.
    """

    def __init__(self, d_model, num_heads):
        super().__init__()
        assert d_model % num_heads == 0

        self.d_model, self.num_heads = d_model, num_heads
        # Width handled by each individual head.
        self.depth = d_model // num_heads

        # Q/K/V projections followed by the output projection.
        self.query_dense = nn.Linear(d_model, d_model)
        self.key_dense = nn.Linear(d_model, d_model)
        self.value_dense = nn.Linear(d_model, d_model)
        self.out_dense = nn.Linear(d_model, d_model)

    def split_heads(self, x):
        """Reshape ``(batch_size, d_model)`` into ``(num_heads, batch_size, depth)``."""
        per_row = x.view(-1, self.num_heads, self.depth)
        return per_row.permute(1, 0, 2)

    def scaled_dot_product(self, query, key, value):
        """Return ``(attended_values, attention_weights)`` for per-head inputs.

        The scores are divided by the square root of the last dimension of
        ``query`` and normalised with a softmax over the key axis.
        """
        scale = query.size(-1) ** 0.5
        scores = torch.matmul(query, key.transpose(-2, -1)) / scale
        weights = torch.softmax(scores, dim=-1)
        return torch.matmul(weights, value), weights

    def forward(self, query, key, value):
        """Attend ``query`` rows over ``key``/``value`` rows.

        All three inputs are ``(batch_size, d_model)``; the result has the
        same layout as ``query``.
        """
        q_heads = self.split_heads(self.query_dense(query))
        k_heads = self.split_heads(self.key_dense(key))
        v_heads = self.split_heads(self.value_dense(value))

        context, _ = self.scaled_dot_product(q_heads, k_heads, v_heads)

        # (num_heads, batch_size, depth) -> (batch_size, num_heads * depth)
        merged = context.transpose(0, 1).reshape(-1, self.d_model)
        return self.out_dense(merged)


class SentenceClusteringLayer(nn.Module):
    """Attention + feed-forward block applied to a batch of sentence embeddings.

    Two sub-layers are applied in sequence, each followed by dropout, a
    residual connection and layer normalisation:

    1. ``SentenceMultiHeadAttention`` with the input used as query, key and value.
    2. A two-layer feed-forward network (``d_model -> dff -> d_model``) with ReLU.

    Args:
        dff: Hidden width of the feed-forward network.
        d_model: Width of the input and output embeddings.
        num_heads: Number of attention heads.
        dropout: Dropout probability used after each sub-layer.
    """

    def __init__(self, dff, d_model, num_heads, dropout):
        super().__init__()

        # Sub-layer 1: attention across the rows of the batch.
        self.attention = SentenceMultiHeadAttention(d_model, num_heads)
        self.dropout1 = nn.Dropout(dropout)
        self.norm1 = nn.LayerNorm(d_model)

        # Sub-layer 2: feed-forward network applied to each row independently.
        expand = nn.Linear(d_model, dff)
        project = nn.Linear(dff, d_model)
        self.ffn = nn.Sequential(expand, nn.ReLU(), project)
        self.dropout2 = nn.Dropout(dropout)
        self.norm2 = nn.LayerNorm(d_model)

    def forward(self, x):
        """Transform ``x`` of shape ``(batch_size, d_model)``; the shape is preserved."""
        # Residual connection around the attention sub-layer, then normalise.
        attended = self.norm1(x + self.dropout1(self.attention(x, x, x)))
        # Residual connection around the feed-forward sub-layer, then normalise.
        return self.norm2(attended + self.dropout2(self.ffn(attended)))


class CustomBGEM3FlagModel(PreTrainedModel):
    """BGE-M3 encoder followed by a sentence-level clustering layer.

    The pretrained ``BAAI/bge-m3`` backbone encodes every input sequence; the
    hidden state of the first token is taken as the sentence embedding, and
    the batch of sentence embeddings is passed through a
    ``SentenceClusteringLayer``.

    Args:
        config: Configuration object providing ``intermediate_size``,
            ``num_attention_heads`` and ``hidden_dropout_prob``.
    """

    config_class = PretrainedConfig

    def __init__(self, config):
        super().__init__(config)
        self.config = config
        self.dff = config.intermediate_size
        self.num_heads = config.num_attention_heads
        self.dropout = config.hidden_dropout_prob

        # NOTE(review): the pretrained backbone is loaded on every construction,
        # presumably also when this class is restored via `from_pretrained` and
        # the weights are about to be replaced by the checkpoint -- confirm
        # whether that extra load is acceptable.
        self.bge_m3 = AutoModel.from_pretrained("BAAI/bge-m3")

        # Read the embedding width from the loaded backbone instead of
        # hard-coding 1024, so the clustering layer always matches the encoder.
        self.d_model = self.bge_m3.config.hidden_size

        self.clustering_layer = SentenceClusteringLayer(
            dff=self.dff,
            d_model=self.d_model,
            num_heads=self.num_heads,
            dropout=self.dropout,
        )

    def forward(self, input_ids, attention_mask=None):
        """Encode a batch of tokenised sentences.

        Args:
            input_ids: Token ids of shape ``(batch_size, seq_len)``.
            attention_mask: Optional mask forwarded to the backbone.

        Returns:
            The output of the clustering layer, one row of width ``d_model``
            per input sentence.
        """
        encoded = self.bge_m3(input_ids=input_ids, attention_mask=attention_mask)
        # (batch_size, seq_len, d_model)
        sequence_output = encoded.last_hidden_state

        # The first token's hidden state serves as the sentence embedding.
        sentence_embeddings = sequence_output[:, 0, :]

        return self.clustering_layer(sentence_embeddings)


In [4]:
# Build the custom model from the published BGE-M3 configuration.
# NOTE(review): the warning shown in this cell's output reports an empty target
# model type; presumably that comes from loading through the generic
# PretrainedConfig class -- confirm whether a concrete config class is wanted.
config = PretrainedConfig.from_pretrained("BAAI/bge-m3") # use the upstream config unchanged
model = CustomBGEM3FlagModel(config)

You are using a model of type xlm-roberta to instantiate a model of type . This is not supported for all configurations of models and can yield errors.


model.safetensors:   0%|          | 0.00/2.27G [00:00<?, ?B/s]

In [8]:
# Imported in this cell so it also runs on a fresh kernel: AutoTokenizer is
# otherwise only imported by a later cell.
from transformers import AutoTokenizer

path = "./sas-bge-m3"

# Write the model and the original BGE-M3 tokenizer into the same directory
# so both can be reloaded from `path`.
model.save_pretrained(path)
tokenizer = AutoTokenizer.from_pretrained("BAAI/bge-m3")
tokenizer.save_pretrained(path)

('./sas-bge-m3/tokenizer_config.json',
 './sas-bge-m3/special_tokens_map.json',
 './sas-bge-m3/sentencepiece.bpe.model',
 './sas-bge-m3/added_tokens.json',
 './sas-bge-m3/tokenizer.json')

In [10]:
from transformers import AutoTokenizer

# Disabled alternative loading path, kept for reference (it passes
# `from_pt=True` and calls `.summary()`):
# model_path = "./sas-bge-m3"
# config = PretrainedConfig.from_pretrained(model_path)
# custom_model = CustomBGEM3FlagModel.from_pretrained(model_path, config=config, from_pt=True)

# custom_model.summary()

# Reload the model and tokenizer from the directory stored in `path`.
model = CustomBGEM3FlagModel.from_pretrained(path) # PyTorch model
tokenizer = AutoTokenizer.from_pretrained(path)

In [13]:
# Two Korean sentences built from the same characters but spaced differently
# (roughly "Father goes into the bag." vs. "Father goes into the room.").
texts = ["아버지 가방에 들어가십니다.", "아버지가 방에 들어가십니다."]
inputs = tokenizer(texts, padding=True, truncation=True, return_tensors="pt")  # PyTorch tensors

# Inference only: skip autograd tracking so no computation graph is kept alive.
with torch.no_grad():
    embeddings = model(**inputs)

# Last expression of the cell, so the tensor is displayed.
embeddings

tensor([[ 0.7489,  0.4860, -2.1363,  ..., -0.2958, -0.1386,  0.3238],
        [ 0.9537,  0.5229, -2.3019,  ..., -1.0513,  0.4561,  0.7512]],
       grad_fn=<NativeLayerNormBackward0>)