In [None]:
import torch
from transformers import AutoTokenizer, AutoModel
from annoy import AnnoyIndex

# Define the hyperparameters
num_epochs = 10              # passes over the training data
learning_rate = 0.001        # step size handed to the Adam optimizers
embedding_size = 768         # presumably the encoder's pooled-output width — TODO confirm against the model config
num_trees = 10               # trees built by the Annoy index
num_nearest_neighbors = 10   # neighbours requested per query

# Load the data: one tab-separated record per line, read as [query, document, url, ...].
# Blank lines are skipped and short rows are reported with their line number
# instead of failing later with a bare IndexError.
data = []
with open('data.txt', 'r', encoding='utf-8') as f:
    for line_number, line in enumerate(f, start=1):
        if not line.strip():
            continue
        # Strip only the newline before splitting so an empty trailing field keeps its column.
        fields = [field.strip() for field in line.rstrip('\r\n').split('\t')]
        if len(fields) < 3:
            raise ValueError(
                f'data.txt line {line_number}: expected at least 3 tab-separated '
                f'fields (query, document, url), found {len(fields)}'
            )
        data.append(fields)

queries = [row[0] for row in data]
documents = [row[1] for row in data]
urls = [row[2] for row in data]

# Load the tokenizer and model
tokenizer = AutoTokenizer.from_pretrained('ProsusAI/finbert')
model = AutoModel.from_pretrained('ProsusAI/finbert')

# Define the query tower
class QueryTower(torch.nn.Module):
    """Encode tokenized queries into one pooled embedding per sequence.

    Args:
        encoder: Optional module called as
            ``encoder(input_ids=..., attention_mask=...)`` whose result exposes
            ``pooler_output``. When omitted, the notebook-level ``model`` is
            used, so existing ``QueryTower()`` calls behave as before. Pass a
            separate encoder to give this tower weights of its own.
    """

    def __init__(self, encoder=None):
        super().__init__()
        # Fall back to the shared notebook-level encoder for backward compatibility.
        self.model = model if encoder is None else encoder

    def forward(self, input_ids, attention_mask):
        """Return ``pooler_output`` of the wrapped encoder for the given batch."""
        outputs = self.model(input_ids=input_ids, attention_mask=attention_mask)
        return outputs.pooler_output

# Define the document tower
class DocumentTower(torch.nn.Module):
    """Encode tokenized documents into one pooled embedding per sequence.

    Args:
        encoder: Optional module called as
            ``encoder(input_ids=..., attention_mask=...)`` whose result exposes
            ``pooler_output``. When omitted, the notebook-level ``model`` is
            used, so existing ``DocumentTower()`` calls behave as before. Pass a
            separate encoder to give this tower weights of its own.
    """

    def __init__(self, encoder=None):
        super().__init__()
        # Fall back to the shared notebook-level encoder for backward compatibility.
        self.model = model if encoder is None else encoder

    def forward(self, input_ids, attention_mask):
        """Return ``pooler_output`` of the wrapped encoder for the given batch."""
        outputs = self.model(input_ids=input_ids, attention_mask=attention_mask)
        return outputs.pooler_output

# Define the loss function.
# CosineEmbeddingLoss compares two batches of embeddings row by row and needs a
# 1-D target with one entry per row: +1 pulls the pair together, -1 pushes it apart.
loss_fn = torch.nn.CosineEmbeddingLoss()

# Build the towers before their optimizers so each optimizer is bound to the
# instance that is actually trained, not to a throwaway object.
query_tower = QueryTower()
document_tower = DocumentTower()
query_tower.train()
document_tower.train()

# Define the optimizer for the query tower
optimizer_query = torch.optim.Adam(query_tower.parameters(), lr=learning_rate)

# Define the optimizer for the document tower.
# Parameters that both towers hold (e.g. one shared encoder) must be stepped only
# once per batch, so they stay with the query optimizer; when nothing is left for
# the document tower, both names refer to the same optimizer.
query_parameter_ids = {id(p) for p in query_tower.parameters()}
document_only_parameters = [
    p for p in document_tower.parameters() if id(p) not in query_parameter_ids
]
if document_only_parameters:
    optimizer_document = torch.optim.Adam(document_only_parameters, lr=learning_rate)
    active_optimizers = [optimizer_query, optimizer_document]
else:
    optimizer_document = optimizer_query
    active_optimizers = [optimizer_query]

# Train the two towers jointly on aligned (query, document) pairs
train_batch_size = 16
for epoch in range(num_epochs):
    epoch_loss_sum = 0.0
    num_batches = 0
    for start in range(0, len(queries), train_batch_size):
        stop = start + train_batch_size
        # Encode the batch of queries and the documents at the same positions
        query = tokenizer(queries[start:stop], padding=True, truncation=True, return_tensors='pt')
        document = tokenizer(documents[start:stop], padding=True, truncation=True, return_tensors='pt')
        # Forward pass
        query_embedding = query_tower(query['input_ids'], query['attention_mask'])
        document_embedding = document_tower(document['input_ids'], document['attention_mask'])
        # Compute the loss: each query should be similar to its own document ...
        batch_len = query_embedding.size(0)
        positive_target = torch.ones(batch_len, device=query_embedding.device)
        loss = loss_fn(query_embedding, document_embedding, positive_target)
        # ... and dissimilar to the next document of the batch (in-batch negative).
        # Without negatives, mapping everything to one vector would minimise the loss.
        if batch_len > 1:
            shifted_rows = list(range(1, batch_len)) + [0]
            loss = loss + loss_fn(query_embedding, document_embedding[shifted_rows], -positive_target)
        # Backward pass
        for active_optimizer in active_optimizers:
            active_optimizer.zero_grad()
        loss.backward()
        for active_optimizer in active_optimizers:
            active_optimizer.step()
        epoch_loss_sum += loss.item()
        num_batches += 1
    # Report the mean batch loss; NaN signals that there was no data to train on.
    mean_epoch_loss = epoch_loss_sum / num_batches if num_batches else float('nan')
    print(f'Epoch {epoch + 1}/{num_epochs} - Loss: {mean_epoch_loss:.4f}')

 

In [None]:
# NOTE(review): `index`, `encoded_data`, `index_file_path` and `encode_text` are not
# created in the cells shown here; they must exist in the kernel before this cell runs.

# Freeze the key order once: the Annoy item id of an entry is its position in this list.
encoded_keys = list(encoded_data.keys())
for i, key in enumerate(encoded_keys):
    index.add_item(i, encoded_data[key])

# Build the Annoy index
index.build(num_trees)

# Save the Annoy index to disk
index.save(index_file_path)

# Query the Annoy index for the nearest neighbors
for i, query in enumerate(queries):
    # Encode the query
    encoded_query = encode_text(query)
    # Get the nearest neighbors together with their distances to the query vector.
    # (index.get_distance(a, b) measures between two *indexed items*, so it cannot
    # be used with the query's loop position.)
    nearest_neighbors, neighbor_distances = index.get_nns_by_vector(
        encoded_query, num_nearest_neighbors, include_distances=True
    )
    # Print the results
    print(f'Query {i}: {query}')
    for neighbor, distance in zip(nearest_neighbors, neighbor_distances):
        key = encoded_keys[neighbor]
        print(f'  Neighbor: {key}')
        print(f'  Distance: {distance}')

In [None]:
def semantic_search(query, query_tower, document_tower, tokenizer, annoy_index, num_nearest_neighbors, corpus=None):
    """Return the documents and URLs closest to ``query`` in ``annoy_index``.

    Args:
        query: Text to search for.
        query_tower: Callable ``(input_ids, attention_mask) -> tensor`` producing
            the query embedding.
        document_tower: Unused here; kept so existing calls keep working.
        tokenizer: Callable that turns ``query`` into a mapping with
            ``'input_ids'`` and ``'attention_mask'`` tensors.
        annoy_index: Object exposing ``get_nns_by_vector(vector, n)``.
        num_nearest_neighbors: Number of neighbours to request.
        corpus: Optional sequence of rows where ``row[1]`` is the document and
            ``row[2]`` its URL, indexed by the ids stored in ``annoy_index``.
            Defaults to the notebook-level ``data``.

    Returns:
        ``(documents, urls)``: two lists aligned with the neighbour order.
    """
    if corpus is None:
        corpus = data
    # Encode the query
    encoded = tokenizer(query, padding=True, truncation=True, return_tensors='pt')
    # Forward pass through the query tower; inference only, so no autograd graph is built
    with torch.no_grad():
        pooled = query_tower(encoded['input_ids'], encoded['attention_mask'])
    # .cpu() makes the NumPy conversion work when the tower runs on an accelerator
    query_embedding = pooled.detach().cpu().numpy().squeeze()
    # Find the nearest neighbors
    nearest_neighbors = annoy_index.get_nns_by_vector(query_embedding, num_nearest_neighbors)
    # Get the corresponding documents and URLs
    documents = [corpus[i][1] for i in nearest_neighbors]
    urls = [corpus[i][2] for i in nearest_neighbors]
    return documents, urls

In [None]:
import torch.nn as nn
from transformers import BertModel

# Load the FinBERT model
# One BertModel instance built from the 'ProsusAI/finbert' checkpoint. TwoTowerModel
# (defined below) reads this module-level object for its query and document encoders.
finbert_model = BertModel.from_pretrained('ProsusAI/finbert')

# Define the Two-Tower model architecture
class TwoTowerModel(nn.Module):
    """Binary relevance classifier over a (query, document) pair.

    Each side is encoded, the two pooled vectors are concatenated and passed
    through dropout, a 1536->256 linear layer with ReLU, and a 256->2 linear layer.

    Args:
        query_encoder: Optional encoder for queries. Called with ``input_ids`` and
            ``attention_mask`` keywords; element ``[1]`` of its result is used as
            the pooled vector. Defaults to the notebook-level ``finbert_model``.
        doc_encoder: Optional encoder for documents, same contract and default.
            With both defaults the two sides share one set of encoder weights.
    """

    def __init__(self, query_encoder=None, doc_encoder=None):
        super().__init__()
        self.query_encoder = finbert_model if query_encoder is None else query_encoder
        self.doc_encoder = finbert_model if doc_encoder is None else doc_encoder
        self.dropout = nn.Dropout(0.1)
        self.fc1 = nn.Linear(1536, 256)  # 1536 = two concatenated pooled vectors
        self.fc2 = nn.Linear(256, 2)
        self.relu = nn.ReLU()
        self.softmax = nn.Softmax(dim=1)

    def forward(self, query_inputs, doc_inputs, return_probabilities=False):
        """Score a batch of (query, document) pairs.

        Args:
            query_inputs: Mapping with ``'input_ids'`` and ``'attention_mask'``.
            doc_inputs: Mapping with ``'input_ids'`` and ``'attention_mask'``.
            return_probabilities: When True, apply softmax over the two classes.

        Returns:
            Tensor with one row per pair and two columns. By default these are raw
            logits, which is what ``nn.CrossEntropyLoss`` expects (it applies
            log-softmax itself; feeding it softmax output flattens the gradients).
            The argmax over the columns is the same for logits and probabilities.
        """
        query_outputs = self.query_encoder(
            input_ids=query_inputs['input_ids'],
            attention_mask=query_inputs['attention_mask'],
        )[1]
        doc_outputs = self.doc_encoder(
            input_ids=doc_inputs['input_ids'],
            attention_mask=doc_inputs['attention_mask'],
        )[1]
        merged_outputs = torch.cat([query_outputs, doc_outputs], dim=-1)
        merged_outputs = self.dropout(merged_outputs)
        hidden = self.relu(self.fc1(merged_outputs))
        logits = self.fc2(hidden)
        if return_probabilities:
            return self.softmax(logits)
        return logits

# Initialize the Two-Tower model and move it to the device (e.g., GPU)
# `device` is not defined in any earlier cell shown here, so pick it explicitly:
# CUDA when PyTorch can see a GPU, otherwise the CPU.
device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
two_tower_model = TwoTowerModel().to(device)

In [None]:
# Placeholders: `[...]` is a literal list holding Ellipsis and must be replaced with
# real data. The training cell reads the three lists position by position, so entry i
# of each list has to describe the same (query, document, label) example.
query_texts = [...]  # list of strings containing the query text
doc_texts = [...]  # list of strings containing the document text
labels = [...]  # list of integers containing the labels (0 or 1)

In [None]:
# Tokenize all queries and documents up front, padded to a common length
query_encodings = tokenizer(query_texts, truncation=True, padding=True, max_length=512, return_tensors='pt')
doc_encodings = tokenizer(doc_texts, truncation=True, padding=True, max_length=512, return_tensors='pt')
# as_tensor keeps this cell re-runnable: `labels` may already be a tensor from an earlier run
labels = torch.as_tensor(labels, dtype=torch.long)

# len() of a tokenizer result counts its keys (input_ids, attention_mask, ...),
# not the examples, so take the sample count from the tensor itself.
num_train_samples = query_encodings['input_ids'].size(0)
if doc_encodings['input_ids'].size(0) != num_train_samples or labels.size(0) != num_train_samples:
    raise ValueError('query_texts, doc_texts and labels must contain the same number of items')

# Define the loss function and optimizer
# NOTE(review): CrossEntropyLoss applies log-softmax itself, so it should receive
# unnormalised scores — check what TwoTowerModel.forward returns.
criterion = nn.CrossEntropyLoss()
optimizer = torch.optim.AdamW(two_tower_model.parameters(), lr=2e-5)

# Train the Two-Tower model on the training set
num_epochs = 10
two_tower_model.train()
for epoch in range(num_epochs):
    running_loss = 0.0
    for i in range(num_train_samples):
        # Slice out example i and restore the batch dimension the model expects
        query_inputs = {key: val[i].unsqueeze(0).to(device) for key, val in query_encodings.items()}
        doc_inputs = {key: val[i].unsqueeze(0).to(device) for key, val in doc_encodings.items()}
        label = labels[i].unsqueeze(0).to(device)

        optimizer.zero_grad()

        outputs = two_tower_model(query_inputs, doc_inputs)
        loss = criterion(outputs, label)
        loss.backward()
        optimizer.step()

        running_loss += loss.item()

    # max(..., 1) avoids a ZeroDivisionError when the training set is empty
    epoch_loss = running_loss / max(num_train_samples, 1)
    print('Epoch %d loss: %.4f' % (epoch + 1, epoch_loss))

In [None]:
# Evaluate the Two-Tower model on the validation set
# NOTE(review): val_query_encodings, val_doc_encodings and val_labels are not created
# in the cells shown here; they must exist in the kernel before this cell runs.

# len() of a tokenizer result counts its keys, not the examples.
num_val_samples = val_query_encodings['input_ids'].size(0)

# Switch to evaluation mode so dropout does not randomise the predictions, and put
# every submodule back exactly as it was afterwards so later cells are unaffected.
previous_modes = [(module, module.training) for module in two_tower_model.modules()]
two_tower_model.eval()
correct = 0
total = 0
try:
    with torch.no_grad():
        for i in range(num_val_samples):
            query_inputs = {key: val[i].unsqueeze(0).to(device) for key, val in val_query_encodings.items()}
            doc_inputs = {key: val[i].unsqueeze(0).to(device) for key, val in val_doc_encodings.items()}
            label = val_labels[i].unsqueeze(0).to(device)

            outputs = two_tower_model(query_inputs, doc_inputs)
            predicted = outputs.argmax(dim=1)

            total += 1
            correct += (predicted == label).sum().item()
finally:
    for module, was_training in previous_modes:
        module.training = was_training

# Guard against an empty validation set instead of dividing by zero
accuracy = 100 * correct / total if total else 0.0
print('Validation accuracy: %.2f%%' % accuracy)

# Fine-tune the Two-Tower model on the validation set
# NOTE(review): once the model has been trained on these examples, accuracy measured
# on the same validation set no longer reflects held-out performance.
num_epochs = 5
# len() of a tokenizer result counts its keys, not the examples.
num_finetune_samples = val_query_encodings['input_ids'].size(0)
two_tower_model.train()
for epoch in range(num_epochs):
    running_loss = 0.0
    for i in range(num_finetune_samples):
        # Slice out example i and restore the batch dimension the model expects
        query_inputs = {key: val[i].unsqueeze(0).to(device) for key, val in val_query_encodings.items()}
        doc_inputs = {key: val[i].unsqueeze(0).to(device) for key, val in val_doc_encodings.items()}
        label = val_labels[i].unsqueeze(0).to(device)

        optimizer.zero_grad()

        outputs = two_tower_model(query_inputs, doc_inputs)
        loss = criterion(outputs, label)
        loss.backward()
        optimizer.step()

        running_loss += loss.item()

    # max(..., 1) avoids a ZeroDivisionError when the set is empty
    epoch_loss = running_loss / max(num_finetune_samples, 1)
    print('Epoch %d loss: %.4f' % (epoch + 1, epoch_loss))

In [None]:
# Embed every document with the model's document encoder.
# TwoTowerModel exposes `query_encoder` / `doc_encoder` (it has no `embedding_model`
# or `dense_layer`), and queries and documents must be embedded the same way for
# nearest-neighbour search to be meaningful: here, the first-token vector of the
# encoder's first output.
two_tower_model.eval()
# len() of a tokenizer result counts its keys, not the examples.
num_docs = doc_encodings['input_ids'].size(0)
doc_embeddings = []
with torch.no_grad():
    for i in range(num_docs):
        doc_inputs = {key: val[i].unsqueeze(0).to(device) for key, val in doc_encodings.items()}
        first_token_vector = two_tower_model.doc_encoder(**doc_inputs)[0][:, 0, :]
        # .cpu() makes the NumPy conversion work when the model runs on an accelerator
        doc_embeddings.append(first_token_vector.squeeze().cpu().numpy())

if not doc_embeddings:
    raise ValueError('doc_encodings is empty; there is nothing to index')

# Build the Annoy index for efficient nearest neighbor search
num_trees = 10
# Take the dimensionality from the embeddings themselves so it cannot disagree with the encoder
embedding_size = doc_embeddings[0].shape[-1]
annoy_index = AnnoyIndex(embedding_size, metric='angular')
for i, doc_embedding in enumerate(doc_embeddings):
    annoy_index.add_item(i, doc_embedding)
annoy_index.build(num_trees)

# Perform nearest neighbor search for each test query
# NOTE(review): test_query_encodings and test_query_texts are not created in the cells shown here.
k = 10  # number of nearest neighbors to retrieve
num_test_queries = test_query_encodings['input_ids'].size(0)
with torch.no_grad():
    for i in range(num_test_queries):
        query_inputs = {key: val[i].unsqueeze(0).to(device) for key, val in test_query_encodings.items()}
        query_embedding = two_tower_model.query_encoder(**query_inputs)[0][:, 0, :].squeeze().cpu().numpy()
        nearest_neighbors = annoy_index.get_nns_by_vector(query_embedding, k)
        print('Query:', test_query_texts[i])
        for neighbor in nearest_neighbors:
            print('Document:', doc_texts[neighbor])