In [None]:
import torch
from torch.utils.data import Dataset, DataLoader
from tqdm import tqdm

# Select the compute device once: CUDA when PyTorch can see a GPU, CPU otherwise.
device = torch.device('cuda') if torch.cuda.is_available() else torch.device('cpu')

In [None]:
!pip install sentence_transformers

In [None]:
from sentence_transformers import SentenceTransformer
# Smoke test for the sentence encoder: embed two sample sentences and print the
# shape of the result.
sentences = ["This is an example sentence", "Each sentence is converted"]

# NOTE: `model` is a module-level name; later cells call model.encode and
# eventually rebind `model` to the classifier.
model = SentenceTransformer('sentence-transformers/all-MiniLM-L6-v2')
embeddings = model.encode(sentences)
print(embeddings.shape)

In [None]:
import json

# Load the training JSON. The encoding is stated explicitly so decoding does not
# depend on the platform's default text encoding.
with open('/kaggle/input/anlp-project-contract-nli/train.json', 'r', encoding='utf-8') as file:
    data = json.load(file)


In [None]:
import time

# Measure how long it takes to embed every hypothesis with the sentence encoder.
start_time = time.time()

labels = data['labels']
# hypothesis key -> hypothesis sentence
label_texts = {key: item['hypothesis'] for key, item in labels.items()}
# hypothesis key -> embedding returned by model.encode
label_embedding = {key: model.encode(text) for key, text in label_texts.items()}

end_time = time.time()
execution_time = end_time - start_time
print(f"Execution time: {execution_time} seconds")

In [None]:
# Rebuild the hypothesis tables (same computation as the timed cell, without the timer).
labels = data['labels']
# hypothesis key -> hypothesis sentence
label_texts = {key: item['hypothesis'] for key, item in labels.items()}
# hypothesis key -> embedding returned by model.encode
label_embedding = {key: model.encode(text) for key, text in label_texts.items()}

In [None]:
# document id -> {hypothesis key: evidence span indices}, read from the first
# annotation set of each training document.
evidence_spans_per_document = {
    document['id']: {
        key: annotation['spans']
        for key, annotation in document['annotation_sets'][0]['annotations'].items()
    }
    for document in data['documents']
}

In [None]:
# span text -> hypothesis key. Every span of every document starts out as 'null';
# identical span texts share a single entry because the text itself is the key.
dataset = {
    document['text'][span[0]:span[1]]: 'null'
    for document in data['documents']
    for span in document['spans']
}

In [None]:
# Tag each span text with the key of a hypothesis that lists the span's index as
# evidence. Writes overwrite one another, so a span text cited by several
# hypotheses keeps only the last key assigned.
for document in data['documents']:
    doc_text = document['text']
    doc_evidence = evidence_spans_per_document[document['id']]
    for span_index, bounds in enumerate(document['spans']):
        span_text = doc_text[bounds[0]:bounds[1]]
        for hypothesis_key, evidence_indices in doc_evidence.items():
            if span_index in evidence_indices:
                dataset[span_text] = hypothesis_key


In [None]:
# Number of distinct span texts collected in `dataset`.
print(len(dataset))

In [None]:
# hypothesis key -> hypothesis sentence, taken from the training labels.
labels = data['labels']
label_texts = {key: item['hypothesis'] for key, item in labels.items()}

In [None]:
# hypothesis key -> embedding of the hypothesis sentence.
label_embedding = {key: model.encode(text) for key, text in label_texts.items()}

In [None]:
# Hypothesis keys in the order they appear in the labels mapping.
label_keys = list(data['labels'])

In [None]:
# Pair every span text with every hypothesis; the target is 1 only for the
# hypothesis key recorded for that span in `dataset`, otherwise 0.
final_dataset = [
    (
        span_text,
        data['labels'][label_key]['hypothesis'],
        1 if assigned_key == label_key else 0,
    )
    for span_text, assigned_key in dataset.items()
    for label_key in label_keys
]

In [None]:
# Number of (span text, hypothesis, label) training triples.
print(len(final_dataset))

In [None]:
class SentenceDataset(Dataset):
    """Map-style dataset over (sentence, hypothesis, label) triples."""

    def __init__(self, final_dataset):
        """Keep a reference to the sequence of triples; nothing is copied."""
        self.data = final_dataset

    def __len__(self):
        """Return the number of stored triples."""
        return len(self.data)

    def __getitem__(self, idx):
        """Return both texts unchanged and the label as a float tensor."""
        text, hyp, target = self.data[idx]
        label_tensor = torch.tensor(target, dtype=torch.float)
        return text, hyp, label_tensor

In [None]:
import torch
import torch.nn as nn
import torch.nn.functional as F
from sentence_transformers import SentenceTransformer


class SimilarityClassifier(nn.Module):
    """Binary classifier that scores a (sentence, hypothesis) pair.

    Both texts are embedded with the same SentenceTransformer encoder, the two
    embeddings are concatenated, and an MLP ending in a sigmoid maps the result
    to one score per pair.

    Args:
        model_name: Name or path of the SentenceTransformer checkpoint to load.
    """

    def __init__(self, model_name='sentence-transformers/all-MiniLM-L12-v2'):
        super().__init__()
        self.device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
        self.encoder = SentenceTransformer(model_name)

        # Mark every encoder weight as trainable so the encoder can be fine-tuned.
        for param in self.encoder.parameters():
            param.requires_grad = True

        self.encoder.to(self.device)

        self.embedding_dim = self.encoder.get_sentence_embedding_dimension()

        # MLP head over the concatenated [sentence ; hypothesis] embeddings.
        self.classifier = nn.Sequential(
            nn.Linear(self.embedding_dim * 2, 512),
            nn.ReLU(),
            nn.Dropout(0.2),
            nn.Linear(512, 256),
            nn.ReLU(),
            nn.Dropout(0.2),
            nn.Linear(256, 128),
            nn.ReLU(),
            nn.Dropout(0.2),
            nn.Linear(128, 64),
            nn.ReLU(),
            nn.Linear(64, 1),
            nn.Sigmoid()
        ).to(self.device)

    def encode_text(self, sentences):
        """Embed text with the encoder while keeping the autograd graph.

        ``SentenceTransformer.encode`` is an inference helper that runs without
        gradient tracking, so embeddings obtained from it cannot propagate a
        loss back into the encoder. Tokenizing and calling the encoder module
        directly keeps the encoder weights trainable.

        Args:
            sentences: A list/tuple of strings, or a single string.

        Returns:
            Tensor of shape (len(sentences), embedding_dim) on ``self.device``;
            a 1-D tensor when a single string was passed.
        """
        single = isinstance(sentences, str)
        batch = [sentences] if single else list(sentences)

        features = self.encoder.tokenize(batch)
        # Move tokenizer outputs next to the model; leave non-tensor entries alone.
        features = {
            name: value.to(self.device) if isinstance(value, torch.Tensor) else value
            for name, value in features.items()
        }
        embeddings = self.encoder(features)['sentence_embedding']
        return embeddings[0] if single else embeddings

    def forward(self, sentences, hypothesis):
        """Score each (sentence, hypothesis) pair.

        Args:
            sentences: Batch of sentence strings (list or tuple).
            hypothesis: Batch of hypothesis strings, same length as ``sentences``.

        Returns:
            ``(predictions, None)`` where ``predictions`` has shape (batch,).
            The second element is always ``None``.
        """
        # DataLoader's default collate hands string batches over as tuples.
        if isinstance(sentences, tuple):
            sentences = list(sentences)
        if isinstance(hypothesis, tuple):
            hypothesis = list(hypothesis)

        sentence_embeddings = self.encode_text(sentences)
        hypothesis_embedding = self.encode_text(hypothesis)

        combined = torch.cat((sentence_embeddings, hypothesis_embedding), dim=1)
        predictions = self.classifier(combined)

        # Drop only the trailing feature axis: a bare squeeze() would also remove
        # the batch axis for a batch of one and no longer match the label shape.
        return predictions.squeeze(-1), None



In [None]:
# Optimizer factory: the encoder and the classifier head get separate learning rates.
def get_optimizer(model):
    """Build an AdamW optimizer with one parameter group per sub-module.

    Args:
        model: Object exposing ``encoder`` and ``classifier`` sub-modules.

    Returns:
        ``torch.optim.AdamW`` whose first group holds the encoder parameters
        (lr=1e-5) and whose second group holds the classifier parameters (lr=1e-3).
    """
    param_groups = [
        # Small steps for the encoder.
        {'params': list(model.encoder.parameters()), 'lr': 1e-5},
        # Larger steps for the classifier head.
        {'params': list(model.classifier.parameters()), 'lr': 1e-3},
    ]
    return torch.optim.AdamW(param_groups)

In [None]:

# Training function
from tqdm.auto import tqdm
def train_model(model, final_dataset, num_epochs=10, batch_size=64, lr=1e-3):
    """Train ``model`` on (sentence, hypothesis, label) triples with binary cross-entropy.

    Args:
        model: Module whose call returns ``(predictions, extra)``; it is moved to
            the module-level ``device`` and must expose the ``encoder`` and
            ``classifier`` attributes that ``get_optimizer`` reads.
        final_dataset: Sequence of (sentence, hypothesis, label) triples; it is
            wrapped in ``SentenceDataset``.
        num_epochs: Number of passes over the data.
        batch_size: Batch size handed to the DataLoader.
        lr: Not used. The learning rates come from ``get_optimizer``; the
            parameter is kept so existing calls keep working.

    Returns:
        None. The average loss of each epoch is printed.
    """
    dataset = SentenceDataset(final_dataset)
    dataloader = DataLoader(dataset, batch_size=batch_size, shuffle=True)

    criterion = torch.nn.BCELoss()

    model = model.to(device)
    optimizer = get_optimizer(model)

    for epoch in tqdm(range(num_epochs)):
        model.train()
        total_loss = 0.0
        # Iterate the labelled bar itself so only one bar is shown per epoch.
        progress_bar = tqdm(dataloader, desc=f'Epoch {epoch+1}/{num_epochs}')

        for batch_sentences, batch_hypotheses, batch_labels in progress_bar:
            optimizer.zero_grad()

            batch_labels = batch_labels.to(device)
            predictions, _ = model(batch_sentences, batch_hypotheses)
            # A trailing batch of one may come back as a 0-dim tensor; BCELoss
            # needs predictions and labels to have the same shape.
            predictions = predictions.reshape(batch_labels.shape)
            loss = criterion(predictions, batch_labels)

            loss.backward()
            optimizer.step()

            total_loss += loss.item()
            progress_bar.set_postfix({'loss': f'{loss.item():.4f}'})

        # max(..., 1) keeps the average defined when the loader yields no batches.
        avg_loss = total_loss / max(len(dataloader), 1)
        print(f'Epoch {epoch+1} - Average Loss: {avg_loss:.4f}')

# Usage: build the classifier, train it on the training triples, then save its
# weights (state_dict) to model.pt in the working directory.
model = SimilarityClassifier()
train_model(model, final_dataset)
torch.save(model.state_dict(), "model.pt")


In [None]:
# Rebuild the architecture and restore saved weights. map_location remaps the
# checkpoint tensors onto the device this model lives on, so a checkpoint saved
# on a GPU still loads on a CPU-only machine.
model = SimilarityClassifier()
model.load_state_dict(
    torch.load(
        "/kaggle/input/anlpproject/pytorch/default/1/model.pt",
        map_location=model.device,
    )
)

In [None]:
import json

# Load the test JSON. The encoding is stated explicitly so decoding does not
# depend on the platform's default text encoding.
with open('/kaggle/input/anlp-project-contract-nli/test.json', 'r', encoding='utf-8') as file:
    test_data = json.load(file)
    
    
    
    
    

In [None]:
# document id -> {hypothesis key: (evidence span indices, choice)}, read from the
# first annotation set of each test document. Rebinds the name used for the
# training data.
evidence_spans_per_document = {
    document['id']: {
        key: (annotation['spans'], annotation['choice'])
        for key, annotation in document['annotation_sets'][0]['annotations'].items()
    }
    for document in test_data['documents']
}


In [None]:
# span text -> (document id, hypothesis key, choice). Every span starts out with
# 'null' placeholders; when the same text occurs again, the later document's id
# replaces the earlier one.
test_dataset = {
    document['text'][span[0]:span[1]]: (document['id'], 'null', 'null')
    for document in test_data['documents']
    for span in document['spans']
}


In [None]:
# Number of distinct span texts collected in `test_dataset`.
print(len(test_dataset))

In [None]:
# Attach to each span text the key and choice of a hypothesis that lists the
# span's index as evidence. Writes overwrite one another, so a span text cited
# by several hypotheses keeps only the last one assigned.
for document in test_data['documents']:
    doc_id = document['id']
    doc_text = document['text']
    doc_evidence = evidence_spans_per_document[doc_id]
    for span_index, bounds in enumerate(document['spans']):
        span_text = doc_text[bounds[0]:bounds[1]]
        for hypothesis_key, evidence in doc_evidence.items():
            if span_index in evidence[0]:
                test_dataset[span_text] = (doc_id, hypothesis_key, evidence[1])


In [None]:
# Size after labelling. The labelling loop only overwrites existing keys, so
# this matches the count printed before it.
print(len(test_dataset))

In [None]:
# Separate MiniLM-L6 sentence encoder; the next cell uses it to embed the test
# hypotheses (`model` now refers to the classifier).
temp_model = SentenceTransformer('sentence-transformers/all-MiniLM-L6-v2')


In [None]:
# Rebuild the hypothesis tables from the test file (rebinds the training ones).
labels = test_data['labels']
# hypothesis key -> hypothesis sentence
label_texts = {key: item['hypothesis'] for key, item in labels.items()}
# hypothesis key -> embedding produced by temp_model
label_embedding = {key: temp_model.encode(text) for key, text in label_texts.items()}
# hypothesis keys in the order of the labels mapping
label_keys = list(labels)


In [None]:
# Pair every test span text with every hypothesis. A row is
# (document id, span text, hypothesis, label, choice); label is 1 with the
# recorded choice for the hypothesis key stored for that span, otherwise 0
# with 'null'.
final_test_dataset = [
    (doc_id, span_text, test_data['labels'][label_key]['hypothesis'])
    + ((1, choice) if assigned_key == label_key else (0, 'null'))
    for span_text, (doc_id, assigned_key, choice) in test_dataset.items()
    for label_key in label_keys
]

In [None]:
import random
random.seed(42)  # fixed seed so a fresh run always draws the same subset

# Record the real size before the list is replaced by its sample; multiplying
# the sample size by 10 afterwards only approximates it.
original_size = len(final_test_dataset)

# Size of a 10% sample (rounded down).
sample_size = int(0.1 * original_size)

# Draw distinct row indices without replacement.
sampled_indices = random.sample(range(original_size), sample_size)

# NOTE: this rebinds final_test_dataset, so re-running the cell without
# rebuilding the full list samples 10% of the previous sample.
final_test_dataset = [final_test_dataset[i] for i in sampled_indices]

print(f"Original dataset size: {original_size}")
print(f"Sampled dataset size: {len(final_test_dataset)}")

In [None]:
# Module-level collectors filled by evaluate_model. They are reset when this
# cell is re-run, but accumulate across repeated calls to the function.
positives = []        # raw scores of pairs whose true label is 1
negatives = []        # raw scores of all other pairs
total_spans = {}      # sentences predicted positive at least once
true_spans = {}       # sentences with at least one true-positive prediction
false_spans = {}      # sentences with at least one false-positive prediction

# Counts per gold inference label, and how many of those were predicted positive.
Total_entailment = 0
Total_contradiction = 0
predicted_entailment = 0
predicted_contradiction = 0

# Rows (doc_id, sentence, hypothesis, true_label, inference) predicted positive.
final_evidence_dataset = []


def evaluate_model(model, test_dataset, threshold=None):
    """Score every test row with ``model`` and report confusion-matrix metrics.

    Args:
        model: Callable as ``model([sentence], [hypothesis])`` returning
            ``(score_tensor, extra)`` where ``score_tensor.item()`` is the score;
            must provide ``eval()``.
        test_dataset: Sized iterable of
            ``(doc_id, sentence, hypothesis, true_label, inference)`` rows with
            ``true_label`` equal to 0 or 1.
        threshold: Scores greater than or equal to this value count as a
            positive prediction. Defaults to 0.003 when ``None``.

    Returns:
        Dict with ``'confusion_matrix'`` (tp/fp/tn/fn counts) and ``'metrics'``
        (accuracy, precision, recall, f1). Metrics whose denominator is zero
        are reported as 0.

    Side effects:
        Appends to the module-level ``positives``, ``negatives`` and
        ``final_evidence_dataset`` lists, writes to ``total_spans``,
        ``true_spans`` and ``false_spans``, and increments the four
        entailment/contradiction counters.
    """
    global Total_entailment, Total_contradiction, predicted_entailment, predicted_contradiction

    # Honour the caller's threshold; fall back to the notebook's value otherwise.
    if threshold is None:
        threshold = 0.003

    true_positives = 0
    false_positives = 0
    true_negatives = 0
    false_negatives = 0

    model.eval()

    with torch.no_grad():
        for doc_id, sentence, hypothesis, true_label, inference in tqdm(test_dataset, position=0, leave=True):
            pred, _ = model([sentence], [hypothesis])
            score = pred.item()

            # Keep the raw score for the histograms, split by gold label.
            if true_label == 1:
                positives.append(score)
            else:
                negatives.append(score)

            predicted_label = 1 if score >= threshold else 0
            # Gold labels are 0/1; binarise them independently of the score
            # threshold so a custom threshold cannot flip the ground truth.
            true_label = 1 if true_label >= 0.5 else 0

            if predicted_label == 1:
                final_evidence_dataset.append((doc_id, sentence, hypothesis, true_label, inference))

            if inference == "Entailment":
                Total_entailment += 1
                if predicted_label == 1:
                    predicted_entailment += 1
            elif inference == "Contradiction":
                Total_contradiction += 1
                if predicted_label == 1:
                    predicted_contradiction += 1

            # Update the confusion matrix and the per-sentence bookkeeping.
            if predicted_label == 1 and true_label == 1:
                true_positives += 1
                total_spans[sentence] = True
                true_spans[sentence] = True
            elif predicted_label == 1 and true_label == 0:
                false_positives += 1
                total_spans[sentence] = True
                false_spans[sentence] = True
            elif predicted_label == 0 and true_label == 0:
                true_negatives += 1
            else:
                false_negatives += 1

    total = len(test_dataset)
    # Guard every ratio so an empty dataset yields zeros, not ZeroDivisionError.
    accuracy = (true_positives + true_negatives) / total if total > 0 else 0

    precision = true_positives / (true_positives + false_positives) if (true_positives + false_positives) > 0 else 0
    recall = true_positives / (true_positives + false_negatives) if (true_positives + false_negatives) > 0 else 0
    f1 = 2 * (precision * recall) / (precision + recall) if (precision + recall) > 0 else 0

    print("\nConfusion Matrix:")
    print(f"True Positives: {true_positives}")
    print(f"False Positives: {false_positives}")
    print(f"True Negatives: {true_negatives}")
    print(f"False Negatives: {false_negatives}")

    print("\nMetrics:")
    print(f"Accuracy: {accuracy:.4f}")
    print(f"Precision: {precision:.4f}")
    print(f"Recall: {recall:.4f}")
    print(f"F1 Score: {f1:.4f}")

    return {
        'confusion_matrix': {
            'tp': true_positives,
            'fp': false_positives,
            'tn': true_negatives,
            'fn': false_negatives
        },
        'metrics': {
            'accuracy': accuracy,
            'precision': precision,
            'recall': recall,
            'f1': f1
        }
    }

# Usage: score the test rows with the classifier and keep the returned summary dict.
results = evaluate_model(model, final_test_dataset)

In [None]:
# Counters updated by evaluate_model: rows whose inference is "Entailment" /
# "Contradiction", then how many of each were predicted positive.
print(Total_entailment)
print(Total_contradiction)
print(predicted_entailment)
print(predicted_contradiction)

In [None]:
# Persist the rows that evaluate_model predicted positive as a JSON array.
serialized = json.dumps(final_evidence_dataset)
with open('final_evidence_dataset.json', 'w') as out_file:
    out_file.write(serialized)
    

In [None]:
# Distinct sentences predicted positive: overall, with a true-positive
# prediction, and with a false-positive prediction. Sizes only; printing the
# false_spans dict itself would dump every sentence into the output.
print(len(total_spans))
print(len(true_spans))
print(len(false_spans))

In [None]:
# Number of raw scores collected for rows whose true label is not 1.
print(len(negatives))

In [None]:
import matplotlib.pyplot as plt

# Side-by-side histograms of the raw scores: `positives` on the left,
# `negatives` on the right.
fig, (ax1, ax2) = plt.subplots(1, 2, figsize=(12, 5))

panels = (
    (ax1, positives, 'Histogram of Array 1', 'blue'),
    (ax2, negatives, 'Histogram of Array 2', 'green'),
)
for axis, scores, title, colour in panels:
    axis.hist(scores, bins=100, alpha=0.7, color=colour)
    axis.set_title(title)
    axis.set_xlabel('Values')
    axis.set_ylabel('Frequency')

plt.tight_layout()
plt.show()

In [None]:
import numpy as np
# Convert the collected scores to float arrays, then keep only the scores
# strictly below 0.005.
positives_np, negatives_np = (
    np.asarray(scores, dtype=float) for scores in (positives, negatives)
)
positives_new = positives_np[np.less(positives_np, 0.005)]
negatives_new = negatives_np[np.less(negatives_np, 0.005)]


In [None]:
# Same two-panel histogram, restricted to the scores below 0.005.
fig, (ax1, ax2) = plt.subplots(1, 2, figsize=(12, 5))

panels = (
    (ax1, positives_new, 'Histogram of Array 1', 'blue'),
    (ax2, negatives_new, 'Histogram of Array 2', 'green'),
)
for axis, scores, title, colour in panels:
    axis.hist(scores, bins=100, alpha=0.7, color=colour)
    axis.set_title(title)
    axis.set_xlabel('Values')
    axis.set_ylabel('Frequency')

plt.tight_layout()
plt.show()

In [None]:
# Binned counts and running total of the low-score negatives. Nothing in this
# cell reads them; plt.hist below bins the data itself.
counts, bins = np.histogram(negatives_new, bins=20)
cumulative = np.cumsum(counts)

# Normalised cumulative distributions of the scores below 0.005, one step curve
# per class. Each curve gets its own legend label so the two can be told apart.
plt.figure(figsize=(10, 6))
plt.hist(negatives_new, bins=1000, density=True, cumulative=True,
         histtype='step', label='Negatives', color='blue')
plt.hist(positives_new, bins=1000, density=True, cumulative=True,
         histtype='step', label='Positives', color='orange')
plt.grid(True, alpha=0.3)
plt.xticks(np.arange(0, 0.011, 0.005))  # ticks at 0, 0.005 and 0.01
plt.xticks(rotation=45)
plt.xlabel('Values')
plt.ylabel('Cumulative Frequency')
plt.title('Cumulative Histogram')
plt.legend()
plt.show()

In [None]:
def count_parameters(model):
    """Print the total and trainable parameter counts of ``model``, then one line per named parameter.

    Args:
        model: Object exposing ``parameters()`` and ``named_parameters()``
            (e.g. a ``torch.nn.Module``).

    Returns:
        None. Everything is written to standard output.
    """
    sizes = [(tensor.numel(), tensor.requires_grad) for tensor in model.parameters()]
    total_params = sum(count for count, _ in sizes)
    trainable_params = sum(count for count, needs_grad in sizes if needs_grad)

    print(f'Total Parameters: {total_params:,}')
    print(f'Trainable Parameters: {trainable_params:,}')

    # Per-tensor listing, in the order named_parameters() yields them.
    print('\nParameter breakdown:')
    for name, param in model.named_parameters():
        print(f'{name}: {param.numel():,} parameters')

# Report the parameter counts of the classifier currently bound to `model`.
count_parameters(model)