In [None]:
from datasets import load_dataset
from transformers import BertTokenizer, BertModel, AdamW
from keras.models import Sequential
from keras.layers import Dense, Dropout
from tensorflow.keras.models import Model
from tensorflow.keras.optimizers.legacy import Adam
from tensorflow.keras.layers import Input,BatchNormalization
import numpy as np
import torch
from torch.nn.functional import cosine_similarity

# Checkpoint identifier shared by the tokenizer and the encoder so the two
# can never drift apart.
PRETRAINED_CHECKPOINT = 'bert-base-uncased'

# Fetch the phrase-similarity dataset by its hub identifier.
dataset = load_dataset("PiC/phrase_similarity")

# Tokenizer and BERT encoder loaded from the same checkpoint; return_dict=True
# makes the encoder's outputs accessible by attribute name.
tokenizer = BertTokenizer.from_pretrained(PRETRAINED_CHECKPOINT)
bert_model = BertModel.from_pretrained(PRETRAINED_CHECKPOINT, return_dict=True)

In [None]:

def get_embedding(phrase, sentence):
    """Return the mean BERT embedding of ``phrase`` as it occurs in ``sentence``.

    The sentence is tokenized and encoded with the module-level ``tokenizer``,
    the first occurrence of the phrase's token ids inside the sentence's token
    ids is located, and the ``bert_model`` hidden states of those positions are
    averaged.

    Args:
        phrase: Text of the phrase to embed.
        sentence: Text of the sentence that is expected to contain the phrase.

    Returns:
        A tensor of shape ``(1, hidden_size)``. It is all zeros when the phrase
        produces no tokens or when its token ids do not occur contiguously in
        the (possibly truncated) sentence encoding.
    """
    hidden_size = bert_model.config.hidden_size

    # Tokenize the phrase and find the corresponding token ids
    phrase_ids = tokenizer.convert_tokens_to_ids(tokenizer.tokenize(phrase))
    phrase_len = len(phrase_ids)

    # An empty phrase would trivially "match" at index 0 and the mean over a
    # zero-length slice would be NaN, so treat it like a phrase that is not found.
    if phrase_len == 0:
        return torch.zeros(1, hidden_size)

    # Tokenize and encode the sentence. truncation=True keeps the sequence
    # within the length the tokenizer reports for the model, so over-long
    # sentences no longer crash the forward pass.
    inputs = tokenizer(
        sentence,
        return_tensors='pt',
        add_special_tokens=True,
        truncation=True,
    )
    # Convert once to a plain list so the scan below compares Python lists
    # instead of re-materialising a tensor slice on every iteration.
    input_ids = inputs['input_ids'][0].tolist()

    # Locating phrase tokens (first occurrence wins)
    start_index = None
    for i in range(len(input_ids) - phrase_len + 1):
        if input_ids[i:i + phrase_len] == phrase_ids:
            start_index = i
            break

    # Edge case when start index is not found
    if start_index is None:
        return torch.zeros(1, hidden_size)

    # Getting the BERT embeddings. This is inference only, so skip building
    # the autograd graph.
    with torch.no_grad():
        outputs = bert_model(**inputs)
    embeddings = outputs.last_hidden_state

    # Averaging the embeddings for the phrase tokens
    phrase_embedding = embeddings[0, start_index:start_index + phrase_len, :].mean(dim=0)
    return phrase_embedding.unsqueeze(0)


def prepare_data(data_split):
    """Turn a dataset split into cosine-similarity features and labels.

    For every item, both phrases are embedded in the context of their
    sentences via ``get_embedding`` and the cosine similarity of the two
    embeddings becomes the item's single feature.

    Args:
        data_split: Iterable of items indexable by the keys ``'phrase1'``,
            ``'sentence1'``, ``'phrase2'``, ``'sentence2'``, ``'label'`` and
            ``'idx'``.

    Returns:
        A pair ``(features, labels)`` of NumPy arrays built from the per-item
        similarity arrays and the per-item labels, in iteration order.
    """
    features = []
    labels = []

    for example in data_split:
        first = get_embedding(example['phrase1'], example['sentence1'])
        second = get_embedding(example['phrase2'], example['sentence2'])

        # One similarity value per item, kept as a column-shaped array.
        similarity = cosine_similarity(first, second)
        features.append(similarity.reshape(-1, 1).detach().numpy())

        labels.append(example['label'])

        # Progress trace: echo the index of the item just processed.
        print(example['idx'])

    return np.array(features), np.array(labels)



In [None]:
# Shard selection applied to every split: shard 0 out of 1.
SHARD_KWARGS = dict(num_shards=1, index=0)

train_data = dataset['train'].shard(**SHARD_KWARGS)
val_data = dataset['validation'].shard(**SHARD_KWARGS)
test_data = dataset['test'].shard(**SHARD_KWARGS)

# Convert each split into (features, labels) arrays. The calls are kept as
# separate statements so a failure in a later split leaves the arrays of the
# earlier ones available in the kernel.
X_train, y_train = prepare_data(train_data)
X_val, y_val = prepare_data(val_data)
X_test, y_test = prepare_data(test_data)

In [None]:
def build_model(input_shape, hidden_units=(256, 128, 64), dropout_rate=0.3, learning_rate=0.001):
    """Build and compile a feed-forward binary classifier.

    The network is a stack of ``Dense(relu) -> BatchNormalization -> Dropout``
    groups, one per entry of ``hidden_units``, followed by a single sigmoid
    output unit.

    Args:
        input_shape: Shape of one sample, without the batch dimension. A bare
            int ``n`` is accepted and treated as ``(n,)``.
        hidden_units: Width of each hidden Dense layer, in order.
        dropout_rate: Dropout rate applied after every hidden group.
        learning_rate: Learning rate passed to the Adam optimizer.

    Returns:
        A compiled ``Model`` using binary cross-entropy loss and reporting
        accuracy.
    """
    # Input(shape=...) is documented to take a tuple; normalise a bare int so
    # callers such as build_model(1) do not depend on version-specific leniency.
    if isinstance(input_shape, int):
        input_shape = (input_shape,)

    input_layer = Input(shape=input_shape)

    x = input_layer
    for units in hidden_units:
        x = Dense(units, activation='relu')(x)
        x = BatchNormalization()(x)
        x = Dropout(dropout_rate)(x)

    # Output layer with a single unit for binary classification
    output_layer = Dense(1, activation='sigmoid')(x)

    # Create and compile the model
    model = Model(inputs=input_layer, outputs=output_layer)
    model.compile(
        optimizer=Adam(learning_rate=learning_rate),
        loss='binary_crossentropy',
        metrics=['accuracy'],
    )
    return model


# Each sample carries a single input feature, hence an input shape of 1.
model = build_model(1)

# Training schedule, named in one place so it is easy to spot and tune.
FIT_OPTIONS = dict(epochs=100, batch_size=32)
model.fit(X_train, y_train, validation_data=(X_val, y_val), **FIT_OPTIONS)

# Score the held-out test split; the result is unpacked as (loss, accuracy),
# matching the loss and the single metric the model was compiled with.
test_scores = model.evaluate(X_test, y_test)
loss, accuracy = test_scores
print(f'Test Accuracy: {accuracy}')
