In [None]:
import tensorflow as tf
from tensorflow import keras
from tensorflow.keras import layers
from tensorflow import nn
import pandas as pd
import numpy as np
import pickle

In [None]:
#증강 작업을 진행한 데이터셋 불러오기
with open('/content/drive/MyDrive/kcc_휴먼이해논문공모전/Text_x_train.pkl', "rb") as f:
    x_train = pickle.load(f)

with open('/content/drive/MyDrive/kcc_휴먼이해논문공모전/Text_y_train.pkl', "rb") as f:
    y_train = pickle.load(f)

with open('/content/drive/MyDrive/kcc_휴먼이해논문공모전/Text_x_valid.pkl', "rb") as f:
    x_valid = pickle.load(f)

with open('/content/drive/MyDrive/kcc_휴먼이해논문공모전/Text_y_valid.pkl', "rb") as f:
    y_valid = pickle.load(f)

with open('/content/drive/MyDrive/kcc_휴먼이해논문공모전/Text_x_test.pkl', "rb") as f:
    x_test = pickle.load(f)

with open('/content/drive/MyDrive/kcc_휴먼이해논문공모전/Text_y_test.pkl', "rb") as f:
    y_test = pickle.load(f)

In [None]:
#text 데이터의 98% 이상이 모두 50토큰 이내 이므로 max_len=50, 제로 패딩 진행
from tensorflow.keras.preprocessing.sequence import pad_sequences
x_train = pad_sequences(x_train, maxlen=50)
x_test = pad_sequences(x_test, maxlen=50)
x_valid = pad_sequences(x_valid, maxlen=50)

In [None]:
x_train

array([[   0,    0,    0, ..., 2506, 3206,  447],
       [   0,    0,    0, ..., 4683,    6, 6358],
       [   0,    0,    0, ..., 1980,  733, 6359],
       ...,
       [   0,    0,    0, ...,    0,    8,  436],
       [   0,    0,    0, ..., 1139,    2,  307],
       [   0,    0,    0, ..., 1139,    2,  307]], dtype=int32)

In [None]:
y_train

1        1
2        0
3        0
4        4
5        0
        ..
29653    3
29654    3
29655    3
29656    1
29658    1
Name: sentiment, Length: 24500, dtype: int64

In [None]:
class TokenAndPositionEmbedding(layers.Layer):
    def __init__(self, maxlen,vocab_size, embed_dim):
        super().__init__()
        self.t_emb = layers.Embedding(input_dim=vocab_size, output_dim=embed_dim) #텍스트 임베딩
        self.pos_emb = layers.Embedding(input_dim=maxlen, output_dim=embed_dim)

    def call(self, x):
        maxlen=50
        print("t")
        positions = tf.range(start=0, limit=maxlen, delta=1)
        positions = self.pos_emb(positions)

        x = self.t_emb(x) #텍스트 임베딩
        return x + positions
class TransformerBlock(layers.Layer):  #공통적인 트랜스포머 블럭
    def __init__(self, embed_dim, num_heads, ff_dim, rate=0.1):
        super().__init__()
        self.att = MultiHeadAttention(embedding_dim=embed_dim, num_heads=num_heads)
        self.ffn = keras.Sequential(
            [layers.Dense(ff_dim, activation="relu"), layers.Dense(embed_dim),]
        )
        self.layernorm1 = layers.LayerNormalization(epsilon=1e-6)
        self.layernorm2 = layers.LayerNormalization(epsilon=1e-6)
        self.dropout1 = layers.Dropout(rate)
        self.dropout2 = layers.Dropout(rate)

    def call(self, inputs, training):
        attn_output = self.att(inputs)
        #print('어텐션 후',attn_output.shape)
        attn_output = self.dropout1(attn_output, training=training)
        #print('드롭아웃',attn_output.shape)
        out1 = self.layernorm1(inputs + attn_output)
        #print('레이어놈',out1.shape)
        ffn_output = self.ffn(out1)
        #print('ffn후',ffn_output.shape)
        ffn_output = self.dropout2(ffn_output, training=training)
        print("트랜스포머")
        return self.layernorm2(out1 + ffn_output)

In [None]:
class MultiHeadAttention(tf.keras.layers.Layer): #공통적인 트랜스포머 멀티헤드어텐션
    def __init__(self, embedding_dim, num_heads=4):
        super(MultiHeadAttention, self).__init__()
        self.embedding_dim = embedding_dim # d_model
        self.num_heads = num_heads

        assert embedding_dim % self.num_heads == 0

        self.projection_dim = embedding_dim // num_heads
        self.query_dense = tf.keras.layers.Dense(embedding_dim)
        self.key_dense = tf.keras.layers.Dense(embedding_dim)
        self.value_dense = tf.keras.layers.Dense(embedding_dim)
        self.dense = tf.keras.layers.Dense(embedding_dim)

    def scaled_dot_product_attention(self, query, key, value):
        matmul_qk = tf.matmul(query, key, transpose_b=True)
        depth = tf.cast(tf.shape(key)[-1], tf.float32)
        logits = matmul_qk / tf.math.sqrt(depth)
        attention_weights = tf.nn.softmax(logits, axis=-1)
        output = tf.matmul(attention_weights, value)
        return output, attention_weights

    def split_heads(self, x, batch_size):
        x = tf.reshape(x, (batch_size, -1, self.num_heads, self.projection_dim))
        return tf.transpose(x, perm=[0, 2, 1, 3])

    def call(self, inputs):
        # x.shape = [batch_size, seq_len, embedding_dim]
        batch_size = tf.shape(inputs)[0]

        # (batch_size, seq_len, embedding_dim)
        query = self.query_dense(inputs)
        key = self.key_dense(inputs)
        value = self.value_dense(inputs)

        # (batch_size, num_heads, seq_len, projection_dim)
        query = self.split_heads(query, batch_size)  
        key = self.split_heads(key, batch_size)
        value = self.split_heads(value, batch_size)

        scaled_attention, _ = self.scaled_dot_product_attention(query, key, value)
        # (batch_size, seq_len, num_heads, projection_dim)
        scaled_attention = tf.transpose(scaled_attention, perm=[0, 2, 1, 3])  

        # (batch_size, seq_len, embedding_dim)
        concat_attention = tf.reshape(scaled_attention, (batch_size, -1, self.embedding_dim))
        outputs = self.dense(concat_attention)
        return outputs

In [None]:
from tensorflow.keras import backend as K
def recall(y_target, y_pred):
    # clip(t, clip_value_min, clip_value_max) : clip_value_min~clip_value_max 이외 가장자리를 깎아 낸다
    # round : 반올림한다
    y_target_yn = K.round(K.clip(y_target, 0, 1)) # 실제값을 0(Negative) 또는 1(Positive)로 설정한다
    y_pred_yn = K.round(K.clip(y_pred, 0, 1)) # 예측값을 0(Negative) 또는 1(Positive)로 설정한다

    # True Positive는 실제 값과 예측 값이 모두 1(Positive)인 경우이다
    count_true_positive = K.sum(y_target_yn * y_pred_yn) 

    # (True Positive + False Negative) = 실제 값이 1(Positive) 전체
    count_true_positive_false_negative = K.sum(y_target_yn)

    # Recall =  (True Positive) / (True Positive + False Negative)
    # K.epsilon()는 'divide by zero error' 예방차원에서 작은 수를 더한다
    recall = count_true_positive / (count_true_positive_false_negative + K.epsilon())

    # return a single tensor value
    return recall


def precision(y_target, y_pred):
    # clip(t, clip_value_min, clip_value_max) : clip_value_min~clip_value_max 이외 가장자리를 깎아 낸다
    # round : 반올림한다
    y_pred_yn = K.round(K.clip(y_pred, 0, 1)) # 예측값을 0(Negative) 또는 1(Positive)로 설정한다
    y_target_yn = K.round(K.clip(y_target, 0, 1)) # 실제값을 0(Negative) 또는 1(Positive)로 설정한다

    # True Positive는 실제 값과 예측 값이 모두 1(Positive)인 경우이다
    count_true_positive = K.sum(y_target_yn * y_pred_yn) 

    # (True Positive + False Positive) = 예측 값이 1(Positive) 전체
    count_true_positive_false_positive = K.sum(y_pred_yn)

    # Precision = (True Positive) / (True Positive + False Positive)
    # K.epsilon()는 'divide by zero error' 예방차원에서 작은 수를 더한다
    precision = count_true_positive / (count_true_positive_false_positive + K.epsilon())

    # return a single tensor value
    return precision


def f1score(y_target, y_pred):
    _recall = recall(y_target, y_pred)
    _precision = precision(y_target, y_pred)
    # K.epsilon()는 'divide by zero error' 예방차원에서 작은 수를 더한다
    _f1score = ( 2 * _recall * _precision) / (_recall + _precision+ K.epsilon())
    
    # return a single tensor value
    return _f1score

In [None]:
embed_dim = 32 # Embedding size for each token
num_heads = 4  # Number of attention heads
ff_dim = 32  # Hidden layer size in feed forward network inside transformer
maxlen=50
training=1

text_input = layers.Input(shape=(50,)) #
t_embedding_layer = TokenAndPositionEmbedding(50,12440+1, embed_dim)
text_embedding = t_embedding_layer(text_input)  ########## 

transformer_block = TransformerBlock(32, num_heads, ff_dim) 

output = transformer_block(text_embedding)
output = transformer_block(output) #트랜스포머 블럭 1층 추가

x = layers.GlobalAveragePooling1D()(output)
x = layers.Dropout(0.1)(x)
x = layers.Dense(20, activation="sigmoid")(x) #활성함수로 시그모이드 함수 사용
x = layers.Dropout(0.1)(x)

outputs = layers.Dense(7, activation="softmax")(x)  #클래시파이어(분류기)
print(outputs.shape)

model = keras.Model(inputs=text_input, outputs=outputs)
model.compile(optimizer="adam", loss="sparse_categorical_crossentropy", metrics=['accuracy'])

t
트랜스포머
트랜스포머
(None, 7)


In [None]:
history = model.fit(
    x_train, y_train, batch_size=16, epochs=5, validation_data=(x_valid, y_valid)
)

Epoch 1/5
t
트랜스포머
트랜스포머
t
트랜스포머
트랜스포머
트랜스포머
트랜스포머
Epoch 2/5
Epoch 3/5
Epoch 4/5
Epoch 5/5


In [None]:
x_test_np = np.array(x_test)

list_col = x_test
list_col_array = np.array(list_col.tolist())
new_df = pd.DataFrame(list_col_array, columns=[f'col{i+1}' for i in range(list_col_array.shape[1])])

In [None]:
new_df #test에 사용할 데이터프레임 생성

Unnamed: 0,col1,col2,col3,col4,col5,col6,col7,col8,col9,col10,...,col41,col42,col43,col44,col45,col46,col47,col48,col49,col50
0,0,0,0,0,0,0,0,0,0,0,...,20,508,1900,28,17,38,1900,58,69,2018
1,0,0,0,0,0,0,0,0,0,0,...,0,0,0,0,9,632,564,58,126,5849
2,0,0,0,0,0,0,0,0,0,0,...,0,0,0,0,0,20,4,2771,5,1996
3,0,0,0,0,0,0,0,0,0,0,...,0,0,0,0,0,0,0,250,613,50
4,0,0,0,0,0,0,0,0,0,0,...,2857,148,4,871,303,77,296,1213,29,1327
...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...
2609,0,0,0,0,0,0,0,0,0,0,...,59,75,143,12438,29,751,746,29,751,44
2610,0,0,0,0,0,0,0,0,0,0,...,0,0,0,7,572,333,7,4603,494,466
2611,0,0,0,0,0,0,0,0,0,0,...,22,2453,246,75,22,29,2453,29,1999,495
2612,0,0,0,0,0,0,0,0,0,0,...,763,22,2453,26,2432,189,36,2,231,44


In [None]:
x_test_np = np.array(new_df)
x_test_tensor = tf.convert_to_tensor(x_test_np)
x_test_tensor = tf.cast(x_test_tensor, tf.int64) #텐서로 변환 작업 진행

In [None]:
y_pred = model.predict(x_test_tensor)

yp=[]
for i in y_pred:
    yp.append(i.argmax())

t
트랜스포머
트랜스포머


In [None]:
from sklearn import metrics

# Print the precision and recall, among other metrics
print("accuracy: ",metrics.classification_report(y_test.to_list(), yp, output_dict=True, digits=3)['accuracy'])
print("weighted avg: ",metrics.classification_report(y_test.to_list(), yp, output_dict=True, digits=3)['weighted avg'])

accuracy:  0.6966335118592196
weighted avg:  {'precision': 0.701616330759148, 'recall': 0.6966335118592196, 'f1-score': 0.6977080718299516, 'support': 2614}
