In [4]:
import numpy as np
import pandas as pd
from sentence_transformers import SentenceTransformer

def transform_sequences(df: pd.DataFrame, max_sequence_length: int = 10) -> tuple:
    """Transform data into padded sequences for attention layer.
    
    Args:
        df: Input DataFrame
        max_sequence_length: Maximum length of sequence (will pad/truncate to this length)
        
    Returns:
        tuple: (X_sequences, y_labels) where:
            - X_sequences is a list of lists of embedded texts
            - y_labels is corresponding labels for each sequence
    """
    sequences = []  # Will store lists of texts
    labels = []     # Will store corresponding labels
    
    current_sequence = []
    
    for index in df.index.tolist():
        text = str(df.loc[index, 'text'])
        
        if text.startswith('#'):
            if current_sequence:
                # Add the sequence and its labels
                sequences.append(current_sequence)
                labels.append(df.loc[index, ['Joy', 'Trust', 'Anticipation', 'Surprise', 
                                           'Fear', 'Sadness', 'Disgust', 'Anger',
                                           'Positive', 'Negative', 'Neutral']].values)
                current_sequence = []
        else:
            current_sequence.append(text)
    
    # Handle any remaining sequence
    if current_sequence:
        sequences.append(current_sequence)
        # Use neutral labels for sequences without hash
        labels.append(np.zeros(11))
    
    # Convert labels to numpy array
    labels = np.array(labels)
    
    # Pad sequences to max_sequence_length
    padded_sequences = []
    for seq in sequences:
        # Truncate if longer than max_sequence_length
        if len(seq) > max_sequence_length:
            seq = seq[:max_sequence_length]
        # Pad if shorter than max_sequence_length
        elif len(seq) < max_sequence_length:
            seq = seq + [''] * (max_sequence_length - len(seq))
        padded_sequences.append(seq)
    
    # Embed each text in the sequences
    model = SentenceTransformer('sdadas/st-polish-paraphrase-from-distilroberta')
    embedded_sequences = []
    
    for sequence in padded_sequences:
        # Embed each text in sequence
        sequence_embeddings = []
        for text in sequence:
            if text:  # If not empty padding
                embedding = model.encode(text)
            else:  # For padding, use zero vector
                embedding = np.zeros(768)  # 768 is the embedding dimension
            sequence_embeddings.append(embedding)
        embedded_sequences.append(sequence_embeddings)
    
    # Convert to numpy array
    X_sequences = np.array(embedded_sequences)
    y_labels = np.array(labels)
    
    return X_sequences, y_labels

# Usage example:
df = pd.read_csv('../../data/raw/train.csv')
X_sequences, y_labels = transform_sequences(df)

print("X shape:", X_sequences.shape)  # Should be (num_sequences, max_sequence_length, embedding_dim)
print("y shape:", y_labels.shape)     # Should be (num_sequences, num_labels)

X shape: (776, 10, 768)
y shape: (776, 11)


In [9]:
from keras.layers import Input, Dense, MultiHeadAttention, LayerNormalization, Dropout, GlobalAveragePooling1D
import keras
import tensorflow as tf

def create_attention_model(max_sequence_length, embedding_dim, num_labels):
    # Input shape: (batch_size, sequence_length, embedding_dim)
    inputs = Input(shape=(max_sequence_length, embedding_dim))
    
    # Multi-head attention layer
    attention = MultiHeadAttention(num_heads=8, key_dim=64)(inputs, inputs)
    attention = Dropout(0.1)(attention)
    attention = LayerNormalization(epsilon=1e-6)(inputs + attention)
    
    # Dense layers processing each timestep independently
    x = Dense(256, activation='relu')(attention)
    x = Dropout(0.1)(x)
    x = LayerNormalization(epsilon=1e-6)(x)
    
    x = Dense(128, activation='relu')(x)
    x = Dropout(0.1)(x)
    x = LayerNormalization(epsilon=1e-6)(x)
    
    # Global average pooling to combine sequence
    x = GlobalAveragePooling1D()(x)
    
    # Output layer
    outputs = Dense(num_labels, activation='sigmoid')(x)
    
    model = keras.Model(inputs=inputs, outputs=outputs)
    return model

# Create and compile model
model = create_attention_model(
    max_sequence_length=10,
    embedding_dim=768,
    num_labels=11
)

model.compile(
    optimizer='adam',
    loss='binary_crossentropy',
    metrics=['accuracy']
)

In [10]:
# Get the sequences and labels for training
train_df = pd.read_csv('../../data/raw/train.csv')
val_df = pd.read_csv('../../data/raw/val.csv')
test_df = pd.read_csv('../../data/raw/test.csv')

# Transform data into sequences
X_train, y_train = transform_sequences(train_df)
X_val, y_val = transform_sequences(val_df)
X_test, y_test = transform_sequences(test_df)

In [12]:
from keras.metrics import Precision, Recall
from keras.callbacks import EarlyStopping, ReduceLROnPlateau

X_train = X_train.astype('float32')
X_val = X_val.astype('float32')
X_test = X_test.astype('float32')

y_train = y_train.astype('float32')
y_val = y_val.astype('float32')
y_test = y_test.astype('float32')

print("X_train shape:", X_train.shape)
print("y_train shape:", y_train.shape)

# Create and compile the attention model
model = create_attention_model(
    max_sequence_length=10,
    embedding_dim=768,
    num_labels=11
)

model.compile(
    optimizer='adam',
    loss='binary_crossentropy',
    metrics=['accuracy', Precision(name='precision'), Recall(name='recall')]
)

# Add callbacks
early_stop = EarlyStopping(
    monitor='val_loss', 
    patience=5,
    restore_best_weights=True,
    verbose=1
)

reduce_lr = ReduceLROnPlateau(
    monitor='val_loss',
    factor=0.5,
    patience=3,
    min_lr=1e-6,
    verbose=1
)

# Train the model
history = model.fit(
    X_train, y_train,
    validation_data=(X_val, y_val),
    epochs=20,
    batch_size=16,
    callbacks=[early_stop, reduce_lr]
)



X_train shape: (776, 10, 768)
y_train shape: (776, 11)
Epoch 1/20


2025-05-28 11:08:20.383685: I tensorflow/core/grappler/optimizers/custom_graph_optimizer_registry.cc:117] Plugin optimizer for device_type GPU is enabled.


[1m49/49[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m13s[0m 90ms/step - accuracy: 0.2181 - loss: 0.4636 - precision: 0.6402 - recall: 0.6496 - val_accuracy: 0.2874 - val_loss: 0.2807 - val_precision: 0.8488 - val_recall: 0.7768 - learning_rate: 0.0010
Epoch 2/20
[1m49/49[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m2s[0m 49ms/step - accuracy: 0.3223 - loss: 0.2794 - precision: 0.8366 - recall: 0.7862 - val_accuracy: 0.2395 - val_loss: 0.3058 - val_precision: 0.8665 - val_recall: 0.7526 - learning_rate: 0.0010
Epoch 3/20
[1m49/49[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m2s[0m 46ms/step - accuracy: 0.1868 - loss: 0.2751 - precision: 0.8482 - recall: 0.7781 - val_accuracy: 0.4012 - val_loss: 0.2990 - val_precision: 0.8387 - val_recall: 0.8183 - learning_rate: 0.0010
Epoch 4/20
[1m49/49[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m0s[0m 42ms/step - accuracy: 0.3878 - loss: 0.2430 - precision: 0.8730 - recall: 0.8258
Epoch 4: ReduceLROnPlateau reducing learning rate to 

In [25]:
# Evaluate on test set
texts_metrics = evaluate_texts(model, X_test=X_test, y_test=y_test)
sentences_metrics = evaluate_sentences_v2(model, X_test=X_test, y_test=y_test)
final_score = calculate_final_score(texts_metrics, sentences_metrics)

print("Final score:", final_score)

[1m6/6[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m0s[0m 34ms/step
[1m6/6[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m0s[0m 13ms/step
Final score: 0.3731994532702938


In [20]:
import numpy as np
from typing import Dict

def evaluate_texts(model, X_test: pd.DataFrame, y_test: pd.DataFrame, threshold: float=0.5) -> Dict:
    y_pred = model.predict(X_test)
    y_pred_binary = (y_pred > threshold).astype(int)
    
    # y_test = y_test.values
    
    labels = ['Joy', 'Trust', 'Anticipation', 'Surprise', 'Fear', 'Sadness', 'Disgust', 'Anger', 'Positive', 'Negative', 'Neutral']
    metrics = {}
    
    metrics['F1-score macro texts'] = 0
    for label in labels:
        metrics[f"Precision {label}"] = 0
        metrics[f"Recall {label}"] = 0
        metrics[f"F1-score {label}"] = 0
        metrics[f"TP {label}"] = 0
        metrics[f"FP {label}"] = 0
        metrics[f"TN {label}"] = 0
        metrics[f"FN {label}"] = 0
    
    for i, label in enumerate(labels):
        metrics[f"TP {label}"] = int(np.sum((y_pred_binary[:, i] == 1) & (y_test[:, i] == 1)))
        metrics[f"FP {label}"] = int(np.sum((y_pred_binary[:, i] == 1) & (y_test[:, i] == 0)))
        metrics[f"TN {label}"] = int(np.sum((y_pred_binary[:, i] == 0) & (y_test[:, i] == 0)))
        metrics[f"FN {label}"] = int(np.sum((y_pred_binary[:, i] == 0) & (y_test[:, i] == 1)))
        
    for label in labels:
        metrics[f"Precision {label}"] = metrics[f"TP {label}"] / (metrics[f"TP {label}"] + metrics[f"FP {label}"] + 1e-8)
        metrics[f"Recall {label}"] = metrics[f"TP {label}"] / (metrics[f"TP {label}"] + metrics[f"FN {label}"] + 1e-8)
        metrics[f"F1-score {label}"] = 2 * (metrics[f"Precision {label}"] * metrics[f"Recall {label}"]) / (metrics[f"Precision {label}"] + metrics[f"Recall {label}"] + 1e-8)

    metrics['F1-score macro texts'] = sum(metrics[f"F1-score {label}"] for label in labels) / len(labels)
    
    metrics = {k: v for (k, v) in metrics.items() if ('Precision' in k) or ('Recall' in k) or ('F1' in k)}

    return metrics

In [21]:
def evaluate_sentences_v1(model, X_test: pd.DataFrame, y_test: pd.DataFrame, threshold: float=0.5) -> Dict:
    y_pred = model.predict(X_test)
    y_pred_binary = (y_pred > threshold).astype(int)
    
    # y_test = y_test.values
    
    labels = ['Joy', 'Trust', 'Anticipation', 'Surprise', 'Fear', 'Sadness', 'Disgust', 'Anger', 'Positive', 'Negative', 'Neutral']
    metrics = {}
    
    metrics['F1-score macro sentences'] = 0
    for label in labels:
        metrics[f"Precision {label}"] = 0
        metrics[f"Recall {label}"] = 0
        metrics[f"F1-score {label}"] = 0
        metrics[f"TP {label}"] = 0
        metrics[f"FP {label}"] = 0
        metrics[f"TN {label}"] = 0
        metrics[f"FN {label}"] = 0
    
    for i, label in enumerate(labels):
        metrics[f"TP {label}"] = int(np.sum((y_pred_binary[:, i] == 1) & (y_test[:, i] == 1)))
        metrics[f"FP {label}"] = int(np.sum((y_pred_binary[:, i] == 1) & (y_test[:, i] == 0)))
        metrics[f"TN {label}"] = int(np.sum((y_pred_binary[:, i] == 0) & (y_test[:, i] == 0)))
        metrics[f"FN {label}"] = int(np.sum((y_pred_binary[:, i] == 0) & (y_test[:, i] == 1)))
        
    for label in labels:
        metrics[f"Precision {label}"] = metrics[f"TP {label}"] / (metrics[f"TP {label}"] + metrics[f"FP {label}"] + 1e-8)
        metrics[f"Recall {label}"] = metrics[f"TP {label}"] / (metrics[f"TP {label}"] + metrics[f"FN {label}"] + 1e-8)
        metrics[f"F1-score {label}"] = 2 * (metrics[f"Precision {label}"] * metrics[f"Recall {label}"]) / (metrics[f"Precision {label}"] + metrics[f"Recall {label}"] + 1e-8)

    metrics['F1-score macro sentences'] = sum(metrics[f"F1-score {label}"] for label in labels) / len(labels)

    metrics = {k: v for (k, v) in metrics.items() if ('Precision' in k) or ('Recall' in k) or ('F1' in k)}

    return metrics


In [17]:
from typing import List
def get_hash_indeces(df: pd.DataFrame) -> List:
    hash_indices = []
    for index in df.index.tolist():
        if str(df.loc[index, 'text']).startswith('#'):
            hash_indices.append(index)
          
    result_hash_indices = []   
    i = 0;
    for index in hash_indices:
        index = index - i
        result_hash_indices.append(index)
        i += 1
    
    return result_hash_indices

In [22]:
def evaluate_sentences_v2(model: keras.Model, X_test: pd.DataFrame, y_test: pd.DataFrame, threshold: float=1): # -> Dict:
    raw_data = encode_labels(pd.read_csv('../../data/raw/test.csv'))
    hash_indices = get_hash_indeces(raw_data)
    
    y_pred = model.predict(X_test)
    # y_test = y_test.values
    
    labels = ['Joy', 'Trust', 'Anticipation', 'Surprise', 'Fear', 'Sadness', 'Disgust', 'Anger', 'Positive', 'Negative', 'Neutral']
    metrics = {}
    
    metrics['F1-score macro sentences'] = 0
    for label in labels:
        metrics[f"Precision {label}"] = 0
        metrics[f"Recall {label}"] = 0
        metrics[f"F1-score {label}"] = 0
        metrics[f"TP {label}"] = 0
        metrics[f"FP {label}"] = 0
        metrics[f"TN {label}"] = 0
        metrics[f"FN {label}"] = 0
    
    y_true_segments = []
    y_pred_segments = []
    
    i = 0
    start = 0
    for index in hash_indices:
        pred_sum = np.zeros(11, dtype=np.float64)
        for y_pred_i in y_pred[start:index]:
            pred_sum += y_pred_i
            
        y_true_segments.append(raw_data.iloc[index+0, 1:].to_numpy())
        y_pred_segments.append((pred_sum >= threshold).astype(int))
        
        # y_true_segments.append(raw_data.iloc[index+0, 1:].to_numpy())
        # y_pred_segments.append((sum >= threshold).astype(int))
            
        i += 1
        start = index

    y_true_segments = np.array(y_true_segments)
    y_pred_segments = np.array(y_pred_segments)

    for i, label in enumerate(labels):
        metrics[f"TP {label}"] = int(np.sum((y_pred_segments[:, i] == 1) & (y_true_segments[:, i] == 1)))
        metrics[f"FP {label}"] = int(np.sum((y_pred_segments[:, i] == 1) & (y_true_segments[:, i] == 0)))
        metrics[f"TN {label}"] = int(np.sum((y_pred_segments[:, i] == 0) & (y_true_segments[:, i] == 0)))
        metrics[f"FN {label}"] = int(np.sum((y_pred_segments[:, i] == 0) & (y_true_segments[:, i] == 1)))
        
    for label in labels:
        metrics[f"Precision {label}"] = metrics[f"TP {label}"] / (metrics[f"TP {label}"] + metrics[f"FP {label}"] + 1e-8)
        metrics[f"Recall {label}"] = metrics[f"TP {label}"] / (metrics[f"TP {label}"] + metrics[f"FN {label}"] + 1e-8)
        metrics[f"F1-score {label}"] = 2 * (metrics[f"Precision {label}"] * metrics[f"Recall {label}"]) / (metrics[f"Precision {label}"] + metrics[f"Recall {label}"] + 1e-8)

    metrics['F1-score macro sentences'] = sum(metrics[f"F1-score {label}"] for label in labels) / len(labels)

    metrics = {k: v for (k, v) in metrics.items() if ('Precision' in k) or ('Recall' in k) or ('F1' in k)}

    return metrics

In [24]:
def calculate_final_score(text_metrics: Dict, sentences_metrics: Dict) -> float:
    return (text_metrics['F1-score macro texts'] + sentences_metrics['F1-score macro sentences']) / 2