## **Machine Learning MatchSense**

### Load Data

In [None]:
#Import requirements

import tensorflow as tf
from tensorflow.keras.layers import *
from tensorflow.keras.regularizers import l2
from tensorflow.keras.preprocessing.text import Tokenizer
from tensorflow.keras.preprocessing.sequence import pad_sequences
from tensorflow.keras import backend as K
import pandas as pd
import numpy as np
import matplotlib.pyplot as plt
import json
import os

In [None]:
# Download the two Kaggle datasets through kagglehub.
# NOTE(review): the returned local paths (datagede_path, similaritytext_path)
# are not referenced anywhere else in the visible code; the hard-coded paths
# below are what the loaders actually read — confirm this is intended.
import kagglehub
datagede_path = kagglehub.dataset_download('gloriara2/datagede')
similaritytext_path = kagglehub.dataset_download('gloriara2/similaritytext')

# Paths
# BASE_PATH is the output directory (models, tokenizer, plots, predictions);
# the others are the input CSVs and the GloVe vectors file.
# NOTE(review): these look like Kaggle notebook mount points — adjust when
# running elsewhere.
BASE_PATH = "/kaggle/working/"
TRAIN_PATH = "/kaggle/input/datagede/train_balanced_150k.csv"
VAL_PATH = "/kaggle/input/datagede/val_balanced_10k.csv"
TEST_PATH = "/kaggle/input/similaritytext/test_cut.csv"
GLOVE_PATH = "/kaggle/input/glove6b300dtxt/glove.6B.300d.txt"

# Model / training hyper-parameters
max_length = 40       # every sentence is padded/truncated to this many tokens
embedding_dim = 300   # width of the embedding matrix; must match the GloVe file
dropout_rate = 0.3    # passed as the LSTM `dropout` argument
lstm_units = 128      # units per LSTM direction inside the Bidirectional wrapper
batch_size = 128      # used for both model.fit and model.predict
num_epochs = 30       # upper bound on epochs; early stopping may end sooner

#### Custom TensorFlow Layers

In [None]:
# Custom layers for TensorFlow operations
class AbsDiffLayer(tf.keras.layers.Layer):
    """Keras layer that returns ``|a - b|`` element-wise for a pair of tensors."""

    def call(self, inputs):
        # ``inputs`` must unpack into exactly two tensors.
        left, right = inputs
        difference = left - right
        return tf.abs(difference)

class ElementWiseMulLayer(tf.keras.layers.Layer):
    """Keras layer that returns the element-wise product of a pair of tensors."""

    def call(self, inputs):
        # ``inputs`` must unpack into exactly two tensors.
        left, right = inputs
        product = tf.multiply(left, right)
        return product


#### Load and Preprocess Data

In [None]:
def load_and_preprocess_data():
    """Read the train/val/test CSVs and turn them into padded id sequences.

    Each CSV must provide ``sentence1``, ``sentence2`` and ``label`` columns.
    Sentences are normalised (lower-cased, whitespace collapsed), tokenised
    with a single Keras ``Tokenizer`` and padded/truncated to ``max_length``.

    Note that the tokenizer vocabulary is fit on the text of all three
    splits (train, validation and test), not on the training split alone.

    Returns:
        ``(train_data, val_data, test_data, tokenizer)`` where every
        ``*_data`` item is a ``(padded_sentence1, padded_sentence2, labels)``
        tuple and ``labels`` is a float32 array.
    """
    text_columns = ('sentence1', 'sentence2')

    print("Loading datasets...")
    frames = [pd.read_csv(csv_path) for csv_path in (TRAIN_PATH, VAL_PATH, TEST_PATH)]

    def clean_text(text):
        # Non-string cells (e.g. NaN from empty CSV fields) become ''.
        if not isinstance(text, str):
            return ''
        return ' '.join(text.lower().strip().split())

    print("Cleaning text...")
    for frame in frames:
        for column in text_columns:
            frame[column] = frame[column].apply(clean_text)

    print("Tokenizing text...")
    tokenizer = Tokenizer(
        num_words=40000,
        filters='!"#$%&()*+,-./:;<=>?@[\\]^_`{|}~\t\n',
        lower=True,
        oov_token='<UNK>'
    )

    # Corpus order: train s1, train s2, val s1, val s2, test s1, test s2.
    corpus = np.concatenate(
        [frame[column].values for frame in frames for column in text_columns]
    )
    tokenizer.fit_on_texts(corpus)

    def prepare_data(df):
        encoded = []
        for column in text_columns:
            sequences = tokenizer.texts_to_sequences(df[column].values)
            encoded.append(
                pad_sequences(sequences, maxlen=max_length, padding='post', truncating='post')
            )
        first, second = encoded
        return first, second, df['label'].values.astype('float32')

    print("Preparing data...")
    train_data, val_data, test_data = [prepare_data(frame) for frame in frames]

    return train_data, val_data, test_data, tokenizer


#### Load GloVe Embeddings

In [None]:
def load_glove_embeddings(glove_path=None):
    """Read a GloVe text file into a ``{token: vector}`` dictionary.

    Every usable line holds a token followed by its whitespace-separated
    coefficients. Lines with fewer than two fields (blank lines, a stray
    token without coefficients) are skipped instead of raising.

    Args:
        glove_path: Path of the embeddings file. When omitted, the
            module-level ``GLOVE_PATH`` is used.

    Returns:
        dict mapping each token to a 1-D ``float32`` numpy array.

    Raises:
        OSError: if the file cannot be opened.
        ValueError: if a coefficient cannot be parsed as a float.
    """
    if glove_path is None:
        glove_path = GLOVE_PATH

    embeddings_index = {}
    print(f"Loading GloVe embeddings from {glove_path}...")
    with open(glove_path, 'r', encoding='utf-8') as file:
        for line in file:
            values = line.split()
            if len(values) < 2:
                # Nothing to parse: blank line or token with no vector.
                continue
            embeddings_index[values[0]] = np.asarray(values[1:], dtype='float32')
    print(f"Loaded {len(embeddings_index)} word vectors.")
    return embeddings_index


#### Create Embedding Matrix

In [None]:
def create_embedding_matrix(word_index, embeddings_index, dim=None):
    """Build the initial embedding weight matrix for a vocabulary.

    Row ``i`` holds the pretrained vector of the word whose index is ``i``.
    Row 0 and the rows of words missing from ``embeddings_index`` stay
    all-zero.

    Args:
        word_index: Mapping ``word -> positive integer index``.
        embeddings_index: Mapping ``word -> 1-D vector`` of pretrained
            embeddings.
        dim: Width of the matrix. When omitted, the module-level
            ``embedding_dim`` is used.

    Returns:
        numpy array of shape ``(len(word_index) + 1, dim)``.

    Raises:
        ValueError: if a pretrained vector's length differs from ``dim``
            (raised by numpy on row assignment).
    """
    if dim is None:
        dim = embedding_dim

    print("Creating embedding matrix...")
    # +1 because row 0 is left unassigned (all zeros).
    vocab_size = len(word_index) + 1
    embedding_matrix = np.zeros((vocab_size, dim))
    for word, i in word_index.items():
        embedding_vector = embeddings_index.get(word)
        if embedding_vector is not None:
            embedding_matrix[i] = embedding_vector
    print("Embedding matrix created.")
    return embedding_matrix


#### Create Model Architecture

In [None]:
def create_model(vocab_size, embedding_matrix):
    """Build and compile the two-input sentence-pair classifier.

    Both inputs go through one shared embedding layer, then through a
    BiLSTM + attention encoder and avg/max pooling. The two sentence vectors
    are combined as ``[u, v, |u - v|, u * v]`` and fed to a dense stack that
    ends in a single sigmoid unit.

    Args:
        vocab_size: Number of rows of the embedding layer.
        embedding_matrix: Initial embedding weights; presumably of shape
            ``(vocab_size, embedding_dim)`` — TODO confirm against caller.

    Returns:
        A compiled ``tf.keras.Model`` taking ``[input_1, input_2]`` and
        trained with binary cross-entropy, reporting accuracy.
    """
    # One input per sentence, each a sequence of `max_length` positions
    # (presumably the padded token ids produced by pad_sequences).
    input_1 = Input(shape=(max_length,))
    input_2 = Input(shape=(max_length,))

    # Single embedding layer instance, reused for both inputs. It is
    # initialised from `embedding_matrix` and left trainable (fine-tuned).
    # NOTE(review): no `mask_zero` is passed, so padded positions appear to
    # flow through the LSTM, attention and pooling like real tokens — confirm.
    embedding = Embedding(
        vocab_size,
        embedding_dim,
        weights=[embedding_matrix],
        trainable=True,
        name='embedding'
    )

    def encoder_block(x):
        # NOTE(review): every call of this function instantiates new layers,
        # and it is called once per input, so the two branches share only the
        # embedding, not the BiLSTM/attention weights — confirm intended.

        # BiLSTM returning the full sequence, with input dropout, recurrent
        # dropout and a small L2 penalty on the kernel
        lstm = Bidirectional(LSTM(
            lstm_units,
            return_sequences=True,
            dropout=dropout_rate,
            recurrent_dropout=0.2,
            kernel_regularizer=l2(1e-5),
        ))(x)

        # Layer normalization
        lstm = LayerNormalization(epsilon=1e-6)(lstm)

        # Attention weights: one ReLU score per timestep, softmax over the
        # timesteps, then repeated `lstm_units * 2` times and permuted so
        # they can be multiplied element-wise with `lstm`
        # (presumably (batch, max_length, lstm_units * 2) — TODO confirm).
        attention = Dense(1, activation='relu')(lstm)
        attention = Flatten()(attention)
        attention = Activation('softmax')(attention)
        attention = RepeatVector(lstm_units * 2)(attention)
        attention = Permute([2, 1])(attention)

        # Residual connection: output = lstm + lstm * attention
        attended = multiply([lstm, attention])
        output = Add()([lstm, attended])

        return output

    def encode_sequence(x):
        # Embed with the shared layer, then run a freshly built encoder block
        x = embedding(x)
        x = encoder_block(x)

        # Collapse the sequence with both average and max pooling
        avg_pool = GlobalAveragePooling1D()(x)
        max_pool = GlobalMaxPooling1D()(x)

        # Normalize each pooled vector separately before concatenating
        avg_pool = LayerNormalization(epsilon=1e-6)(avg_pool)
        max_pool = LayerNormalization(epsilon=1e-6)(max_pool)

        return concatenate([avg_pool, max_pool])

    # Encode both input sequences
    encoded_1 = encode_sequence(input_1)
    encoded_2 = encode_sequence(input_2)

    # Pairwise interaction features computed by the custom layers
    abs_diff = AbsDiffLayer()([encoded_1, encoded_2])
    mul = ElementWiseMulLayer()([encoded_1, encoded_2])

    # Combine features: [u, v, |u - v|, u * v]
    merged = concatenate([encoded_1, encoded_2, abs_diff, mul])

    # Classifier head: three Dense blocks (512, 256, 128), each with an L2
    # penalty, batch normalization, ReLU and dropout 0.3
    x = merged
    for units in [512, 256, 128]:
        x = Dense(units, kernel_regularizer=l2(1e-4))(x)
        x = BatchNormalization()(x)
        x = Activation('relu')(x)
        x = Dropout(0.3)(x)

    # Single sigmoid unit, also L2-regularized
    output = Dense(1, activation='sigmoid', kernel_regularizer=l2(1e-4))(x)

    # Create model
    model = tf.keras.Model(inputs=[input_1, input_2], outputs=output)

    # Learning rate schedule: polynomial decay with power=1.0 (i.e. linear)
    # from 1e-3 down to 1e-6 over `decay_steps` optimizer steps.
    # NOTE(review): decay_steps counts optimizer steps, not epochs — confirm
    # that 5000 matches the intended length of training.
    initial_learning_rate = 1e-3
    decay_steps = 5000
    min_lr = 1e-6

    lr_schedule = tf.keras.optimizers.schedules.PolynomialDecay(
        initial_learning_rate,
        decay_steps,
        end_learning_rate=min_lr,
        power=1.0
    )

    # AdamW with weight decay 1e-4 and gradient-norm clipping at 1.0
    optimizer = tf.keras.optimizers.AdamW(
        learning_rate=lr_schedule,
        weight_decay=1e-4,
        clipnorm=1.0
    )

    model.compile(
        loss='binary_crossentropy',
        optimizer=optimizer,
        metrics=['accuracy']
    )

    return model

#### Main Function

In [None]:
def main():
    """Train the sentence-pair model end to end and persist its artefacts.

    Steps: load GloVe vectors, load and encode the datasets, build the
    embedding matrix and the model, fit with early stopping and best-model
    checkpointing (both monitoring ``val_accuracy``), then write the final
    model, the tokenizer JSON and a training-history plot under
    ``BASE_PATH``. The encoded test split is loaded but not used here.

    Returns:
        ``(model, history, tokenizer)``.
    """
    glove_vectors = load_glove_embeddings()
    train_data, val_data, test_data, tokenizer = load_and_preprocess_data()
    word_index = tokenizer.word_index
    embedding_matrix = create_embedding_matrix(word_index, glove_vectors)
    model = create_model(len(word_index) + 1, embedding_matrix)

    early_stopping = tf.keras.callbacks.EarlyStopping(
        monitor='val_accuracy',
        patience=10,
        restore_best_weights=True
    )
    checkpoint = tf.keras.callbacks.ModelCheckpoint(
        os.path.join(BASE_PATH, 'best_model.keras'),
        monitor='val_accuracy',
        save_best_only=True
    )

    train_first, train_second, train_labels = train_data
    val_first, val_second, val_labels = val_data

    history = model.fit(
        [train_first, train_second],
        train_labels,
        validation_data=([val_first, val_second], val_labels),
        epochs=num_epochs,
        batch_size=batch_size,
        callbacks=[early_stopping, checkpoint],
        verbose=1
    )

    model.save(os.path.join(BASE_PATH, 'final_model.h5'))
    # The tokenizer's JSON string is itself passed through json.dumps, so the
    # file content is a JSON-encoded string; readers must decode accordingly.
    with open(os.path.join(BASE_PATH, 'tokenizer.json'), 'w') as f:
        f.write(json.dumps(tokenizer.to_json()))

    # One panel per metric: (train key, validation key, title, y-axis label).
    panels = (
        ('accuracy', 'val_accuracy', 'Model Accuracy', 'Accuracy'),
        ('loss', 'val_loss', 'Model Loss', 'Loss'),
    )
    curves = history.history
    plt.figure(figsize=(12, 4))
    for position, (train_key, val_key, title, y_label) in enumerate(panels, start=1):
        plt.subplot(1, 2, position)
        plt.plot(curves[train_key], label='Training')
        plt.plot(curves[val_key], label='Validation')
        plt.title(title)
        plt.xlabel('Epoch')
        plt.ylabel(y_label)
        plt.legend()
    plt.tight_layout()
    plt.savefig(os.path.join(BASE_PATH, 'training_history.png'))

    return model, history, tokenizer


# Entry point: run the training pipeline and keep its results as module-level
# names. The prediction code further down reads `model` and `tokenizer` from
# this scope, so it must run after this statement.
if __name__ == "__main__":
    model, history, tokenizer = main()


### Predict

In [None]:
def predict_test_data(model, test_path, text_tokenizer=None):
    """Score every sentence pair of a CSV file and save the predictions.

    The CSV must provide ``sentence1`` and ``sentence2`` columns. Text is
    normalised (lower-cased, whitespace collapsed), encoded and padded to
    ``max_length`` before being passed to the model. Two columns are added:
    ``similarity_score`` (the model output) and ``predicted_label`` (1 when
    the score is above 0.5, else 0). The result is written to
    ``test_predictions.csv`` under ``BASE_PATH``.

    Args:
        model: Trained model exposing ``predict`` on ``[pad1, pad2]``.
        test_path: Path of the CSV file to score.
        text_tokenizer: Fitted tokenizer used to encode the sentences. When
            omitted, the module-level ``tokenizer`` is used, which preserves
            the previous behaviour of reading that global implicitly.

    Returns:
        The DataFrame with the two prediction columns added.

    Raises:
        NameError: if ``text_tokenizer`` is omitted and no module-level
            ``tokenizer`` has been defined yet.
    """
    if text_tokenizer is None:
        # Backward-compatible fallback to the tokenizer published at module
        # level by the training entry point.
        text_tokenizer = tokenizer

    print("Predicting test data...")

    # Load test data
    print("Loading test data...")
    df_test = pd.read_csv(test_path)

    def clean_text(text):
        # Non-string cells (e.g. NaN from empty CSV fields) become ''.
        if isinstance(text, str):
            return ' '.join(text.lower().strip().split())
        return ''

    # Clean text data
    print("Cleaning test data...")
    for column in ('sentence1', 'sentence2'):
        df_test[column] = df_test[column].apply(clean_text)

    # Encode with the same padding/truncation settings used for training
    print("Encoding test data...")
    seq1 = text_tokenizer.texts_to_sequences(df_test['sentence1'].values)
    seq2 = text_tokenizer.texts_to_sequences(df_test['sentence2'].values)
    pad1 = pad_sequences(seq1, maxlen=max_length, padding='post', truncating='post')
    pad2 = pad_sequences(seq2, maxlen=max_length, padding='post', truncating='post')

    # Predict similarity scores
    print("Predicting...")
    predictions = model.predict([pad1, pad2], batch_size=batch_size, verbose=1)
    # Flatten the single-unit output so each new column receives a 1-D array
    # rather than relying on pandas accepting a one-column 2-D array.
    scores = np.asarray(predictions).reshape(-1)
    df_test['similarity_score'] = scores
    df_test['predicted_label'] = (scores > 0.5).astype(int)

    # Save predictions to file
    output_path = os.path.join(BASE_PATH, 'test_predictions.csv')
    df_test.to_csv(output_path, index=False)
    print(f"Predictions saved to {output_path}!")

    return df_test


In [None]:
# Call the prediction function on the test set
test_results = predict_test_data(model, TEST_PATH)

# Show the first few rows of the prediction results
print("Test data predictions completed!")
print(test_results.head())
