In [None]:
import spacy
from spacy.tokens import DocBin
import numpy as np
import matplotlib.pyplot as plt
import seaborn as sns
import pandas as pd
import json
# from datasets import load_dataset
from collections import defaultdict
import ast

### Testing data

In [None]:

# Build a blank English pipeline; only its vocab is used below to deserialize the docs.
nlp = spacy.blank("en")  # Replace "en" with the appropriate language code

# Read the serialized training and development DocBin files from disk
train_doc_bin = DocBin().from_disk("data/train.spacy")
dev_doc_bin = DocBin().from_disk("data/dev.spacy")
# Deserialize the docs into in-memory lists
train_docs = list(train_doc_bin.get_docs(nlp.vocab))
dev_docs = list(dev_doc_bin.get_docs(nlp.vocab))

In [None]:
# Sanity check: print the text and label of every entity in the third training doc
for doc in train_docs[2:3]:
    for ent in doc.ents:
        print(ent.text, ent.label_)

In [None]:
def getLabelsCounts(docs):
    """Count how often each entity label occurs across ``docs``.

    Args:
        docs: Iterable of objects exposing ``.ents``, where every entity has a
            ``label_`` attribute (e.g. spaCy ``Doc`` objects).

    Returns:
        dict mapping each distinct label to its number of occurrences, with
        keys in the sorted order produced by ``np.unique``.
    """
    all_labels = [entity.label_ for document in docs for entity in document.ents]

    # np.unique yields the sorted distinct labels together with their counts.
    distinct, occurrences = np.unique(np.array(all_labels), return_counts=True)
    return dict(zip(distinct, occurrences))

In [None]:
# Entity-label frequency tables for the training and development docs
trainLabelsCounts = getLabelsCounts(train_docs)
devLabelsCounts = getLabelsCounts(dev_docs)

In [None]:
def saveLabelsPie(LabelsCounts, name):
    """Draw a pie chart of entity-label proportions, save it as a PNG and show it.

    Args:
        LabelsCounts: Mapping of label -> count (e.g. the dict returned by
            ``getLabelsCounts``).
        name: Chart title. It is also used as the file name: the figure is
            written to ``./plots/{name}.png``.
    """
    import os  # local import: only needed to make sure the output folder exists

    # Materialise the dict views so matplotlib receives plain sequences.
    labels = list(LabelsCounts.keys())
    sizes = list(LabelsCounts.values())

    plt.figure(figsize=(8, 8))
    # One colour per label, evenly spaced over the HSV colour map.
    colors = plt.cm.hsv(np.linspace(0, 1, len(labels)))
    _, texts, autotexts = plt.pie(
        sizes,
        labels=labels,
        autopct='%1.1f%%',
        colors=colors,
        startangle=60,
        wedgeprops=dict(edgecolor='w'),
    )
    for text in texts + autotexts:
        text.set_fontsize(9)
    plt.axis('equal')  # Equal aspect ratio ensures the pie chart is circular.
    plt.title(name, pad=30, fontdict={'fontsize': 20, 'fontstyle': 'oblique'})

    # savefig does not create missing directories, so make sure the target exists.
    os.makedirs("./plots", exist_ok=True)
    plt.savefig(f"./plots/{name}.png", bbox_inches='tight', transparent=True)
    plt.show()



In [None]:
# Render and save one pie chart per split; the title doubles as the PNG file name
saveLabelsPie(trainLabelsCounts, "Named entity proportions in training")
saveLabelsPie(devLabelsCounts, "Named entity proportions in development")

### Creating train and dev CSV files

In [None]:
import spacy
import pandas as pd
from collections import defaultdict
import re

# categories = 

def format_text(text):
    """Normalise whitespace in ``text``.

    Every run of whitespace (spaces, tabs, newlines, ...) is collapsed into a
    single space, and leading/trailing whitespace is removed.
    """
    collapsed = re.sub(r'\s+', ' ', text)
    # Newlines are already handled by the regex; the replace is kept for parity.
    return collapsed.replace('\n', ' ').strip()

# Load your .spacy file
def load_spacy_file(file_path):
    """Read a serialized ``DocBin`` from ``file_path`` and return its docs as a list.

    A blank English pipeline is created only to supply the vocab needed to
    deserialize the documents.
    """
    blank_pipeline = spacy.blank("en")  # replace "en" with your model's language if different
    doc_bin = DocBin().from_disk(file_path)
    return [document for document in doc_bin.get_docs(blank_pipeline.vocab)]

# Process documents and extract entities
# def process_docs(docs):
#     data = []
#     for doc in docs:
#         text = doc.text
#         entities = defaultdict(set)
#         for ent in doc.ents:
#             entities[ent.label_].add(format_text(ent.text))
#         entities = {label: list(ents) for label, ents in entities.items()}
#         data.append([text, entities])
#     return data


def process_docs(docs):
    """Pair each document's text with its unique entities grouped by label.

    Args:
        docs: Iterable of objects exposing ``.text`` and ``.ents`` (each entity
            having ``.label_`` and ``.text``).

    Returns:
        A list of ``[text, {label: [entity, ...]}]`` items. Entity strings are
        whitespace-normalised with ``format_text`` and de-duplicated per label,
        keeping first-seen order.
    """
    records = []
    for document in docs:
        grouped = {}
        for entity in document.ents:
            # Dict keys give de-duplication while preserving insertion order.
            grouped.setdefault(entity.label_, {})[format_text(entity.text)] = None
        by_label = {label: list(found) for label, found in grouped.items()}
        records.append([document.text, by_label])
    return records

# Convert to DataFrame
def to_dataframe(data):
    """Turn ``[text, entities]`` records into a DataFrame with one column per label.

    Args:
        data: List of ``[text, {label: [entity, ...]}]`` items, as produced by
            ``process_docs``.

    Returns:
        DataFrame whose first column ``sentence`` holds the whitespace-normalised
        text, followed by one column per entity label (sorted alphabetically).
        Labels absent from a record are filled with an empty list.
    """
    # Union of every label seen in any record, in a stable sorted order.
    label_columns = sorted({label for _, entities in data for label in entities})

    rows = [
        [format_text(text), *(entities.get(label, []) for label in label_columns)]
        for text, entities in data
    ]
    return pd.DataFrame(rows, columns=['sentence'] + label_columns)

# Load the serialized docs for both splits (rebinds train_docs / dev_docs)
train_docs = load_spacy_file('data/train.spacy')
dev_docs = load_spacy_file('data/dev.spacy')

# Extract [text, {label: [entities]}] records from the docs
train_data = process_docs(train_docs)
dev_data = process_docs(dev_docs)

# One row per document, one column per entity label
train_df = to_dataframe(train_data)
dev_df = to_dataframe(dev_data)

# Export to CSV (optional); the row index is not written
train_df.to_csv('./data/raw/train_data.csv', index=False)
dev_df.to_csv('./data/raw/dev_data.csv', index=False)


In [None]:
# Reload the exported CSVs. Note that the file exported from the dev split is
# bound to the name `test` here.
train = pd.read_csv("./data/raw/train_data.csv")
test = pd.read_csv("./data/raw/dev_data.csv")

In [None]:
# Hold out a random 10% of the training rows as `dev`, then remove them from `train`
dev = train.sample(frac=0.1, random_state=42) # random_state for reproducibility
train = train.drop(dev.index)

In [None]:
# Spot check of the row at position 900.
# NOTE(review): the first line reads from `test`, the second from `train` — confirm this is intended.
print(test['sentence'].iloc[900])
print(train.iloc[900])

In [None]:
# Spot check: sentence and full row of the fourth-from-last dev example
print(dev['sentence'].iloc[-4])
print(dev.iloc[-4])

### Creating dataset that contains the prompts

In [None]:
def create_raw_entities_column(df):
    """Add a ``raw_entities`` column holding each row's entity columns as JSON.

    Every column is treated as an entity category except ``sentence``, the
    generated prompt column ``text`` and any column whose name contains
    "entities" (``raw_entities``, ``entities_dict``). Skipping those derived
    columns keeps the function idempotent: running it again on a frame that
    already carries them produces the same JSON instead of serializing the
    prompt as if it were an entity category.

    All entity categories are kept, including empty ones.

    Args:
        df: DataFrame with a ``sentence`` column and one column per category.
            It is modified in place.

    Returns:
        The same ``df`` object, with the ``raw_entities`` column added.
    """
    non_entity_columns = {'sentence', 'text'}

    def entities_to_string(row):
        # Keep only the per-category columns of this row.
        entities_dict = {
            category: entities
            for category, entities in row.items()
            if category not in non_entity_columns and "entities" not in category
        }
        # Convert the dictionary to a JSON string
        return json.dumps(entities_dict)

    # Apply the function to each row and create the new column
    df['raw_entities'] = df.apply(entities_to_string, axis=1)
    return df

def create_dict_column(df):
    """Add an ``entities_dict`` column by JSON-decoding ``raw_entities``.

    Strings that are not valid JSON are mapped to an empty dictionary rather
    than raising. ``df`` is modified in place and also returned.
    """
    def _loads_or_empty(raw):
        try:
            decoded = json.loads(raw)
        except json.JSONDecodeError:
            decoded = {}  # best effort: undecodable rows become an empty dict
        return decoded

    df['entities_dict'] = df['raw_entities'].apply(_loads_or_empty)
    return df

In [None]:
# Add the JSON `raw_entities` column to each split. The helper modifies its
# argument in place and returns it, so e.g. `train_data` and `train` are the same object.
train_data = create_raw_entities_column(train)
dev_data = create_raw_entities_column(dev)
test_data = create_raw_entities_column(test)

In [None]:
# Add the decoded `entities_dict` column to each split
train_data = create_dict_column(train_data)

dev_data = create_dict_column(dev_data)

test_data = create_dict_column(test_data)

In [None]:
# Display the JSON entity string of the fifth-from-last dev row
dev_data['raw_entities'].iloc[-5]

In [None]:
# Display the decoded entity dictionary of the last dev row
dev_data['entities_dict'].iloc[-1]

In [None]:
# Number of rows in each split (train, dev, test)
len(train_data), len(dev_data), len(test_data)

In [None]:
def create_text_col(row):
    """Build the instruction-formatted training example for one row.

    The result wraps a fixed NER instruction and the row's ``sentence`` in
    ``<s> [INST] ... [/INST]`` markers, followed by a newline, the row's
    ``raw_entities`` string as the expected answer, and a closing ``</s>``.
    """
    instruction = "You are solving the NER problem in indian legal documents. You have to extract from the text, entities related to each of the following categories: CASE_NUMBER, COURT, DATE, GPE, JUDGE, LAWYER, ORG, OTHER_PERSON, PETITIONER, PRECEDENT, PROVISION, RESPONDENT, STATUTE, WITNESS. Extract them exactly as they are in the text (Don't format them). Your output always should be a dictionary in a json readable format (category: list of entities)."
    sentence = row['sentence']
    answer = row['raw_entities']
    return f"<s> [INST] {instruction} Find the entities in the following text: {sentence} [/INST]\n {answer} </s>"

In [None]:
# Build the prompt/answer string for every row of each split
train_data['text'] = train_data.apply(create_text_col, axis=1)
dev_data['text'] = dev_data.apply(create_text_col, axis=1)
test_data['text'] = test_data.apply(create_text_col, axis=1)

In [None]:
# train_data['train'] = train_data['raw_entities']
# dev_data['train'] = dev_data['raw_entities']
# test_data['train'] = test_data['raw_entities']

# train_data['test'] = train_data['raw_entities']
# dev_data['test'] = dev_data['raw_entities']
# test_data['test'] = test_data['raw_entities']

In [None]:
# Keep only the columns needed for fine-tuning; the per-category columns are dropped
selected_columns = ['sentence', 'raw_entities', 'entities_dict', 'text']
train_data = train_data[selected_columns]
dev_data = dev_data[selected_columns]
test_data = test_data[selected_columns]

In [None]:
# Check that the first test row's `raw_entities` string decodes as JSON
json.loads(test_data['raw_entities'].iloc[0])

In [None]:
# Display the full prompt built for the first test row
test_data['text'].iloc[0]

In [None]:
# Output folder for the fine-tuning CSVs.
# NOTE(review): "Data" is capitalised here but lowercase ("data/") in the earlier
# cells — confirm this is the intended folder on case-sensitive filesystems.
path = "./Data/Finetuning/"

In [None]:

# Write the three fine-tuning splits under `path`, without the row index
train_data.to_csv(path+'train.csv', index=False)
dev_data.to_csv(path+'dev.csv', index=False)
test_data.to_csv(path+'test.csv', index=False)

In [None]:
import pandas as pd
from typing import List, Dict, Tuple
import ast
import spacy


# Rebinds the global `nlp` to the small English pipeline; tokenize_and_tag reads it as a global
nlp = spacy.load('en_core_web_sm')
# The 14 entity categories (same list as in the prompt instruction)
categories = ['CASE_NUMBER', 'COURT', 'DATE', 'GPE', 'JUDGE', 'LAWYER', 'ORG', 'OTHER_PERSON', 'PETITIONER', 'PRECEDENT', 'PROVISION', 'RESPONDENT', 'STATUTE', 'WITNESS']
def tokenize_and_tag(df: pd.DataFrame, categories: List[str]) -> pd.DataFrame:
    """Tokenize each sentence and derive BIO tags from its entity dictionary.

    Sentences and entity strings are both split with the tokenizer of the
    module-level ``nlp`` pipeline, so an entity such as ``"Section 302,"`` is
    compared token-for-token against the sentence tokens. Every occurrence of
    an entity's token sequence is tagged ``B-<category>`` on its first token and
    ``I-<category>`` on the rest; all other tokens keep the tag ``O``. When
    entities overlap, the one processed last overwrites earlier tags.

    Args:
        df: Frame with a ``sentence`` column and an ``entities_dict`` column.
            Each ``entities_dict`` value maps a category to its entities, given
            either as a list or as the string repr of a list (the form the
            values take after a CSV round trip).
        categories: Accepted for interface compatibility; currently unused —
            every category present in ``entities_dict`` is tagged.

    Returns:
        DataFrame with two columns, ``tokens`` and ``ner_tags``, holding one
        list per input row.
    """
    # Define tag prefixes
    B_PREFIX = 'B-'
    I_PREFIX = 'I-'
    O_TAG = 'O'

    # Prepare output data
    output_data = {'tokens': [], 'ner_tags': []}

    for _, row in df.iterrows():
        # Only tokenization is needed, so skip the rest of the pipeline.
        tokens = [token.text for token in nlp.make_doc(row['sentence'])]

        # Initialize tags as 'Outside' for each token
        tags = [O_TAG] * len(tokens)

        # Update tags based on entities
        for category, entity_list in row['entities_dict'].items():
            if isinstance(entity_list, str):
                entity_list = ast.literal_eval(entity_list)
            for entity in entity_list:
                # Use the same tokenizer as for the sentence so the sequences are comparable.
                entity_tokens = [token.text for token in nlp.make_doc(entity)]
                span = len(entity_tokens)
                if span == 0:
                    # An empty entity would otherwise match at every position.
                    continue
                # Find all occurrences of the entity in the tokens
                for i in range(len(tokens) - span + 1):
                    if tokens[i:i + span] == entity_tokens:
                        tags[i] = B_PREFIX + category
                        tags[i + 1:i + span] = [I_PREFIX + category] * (span - 1)

        output_data['tokens'].append(tokens)
        output_data['ner_tags'].append(tags)

    return pd.DataFrame(output_data)


In [None]:
# Trial run of the BIO tagging on the test split
test_prova = tokenize_and_tag(test_data, categories)

In [None]:
# Compare the first row's entity dictionary with its (token, tag) pairs
print(test_data['entities_dict'].iloc[0])
list(zip(test_prova['tokens'].iloc[0], test_prova['ner_tags'].iloc[0]))

### Computing F1 score from Mistral model results

In [None]:
# Entity categories expected in every model output; read as a global by check_categories
categories = ['CASE_NUMBER', 'COURT', 'DATE', 'GPE', 'JUDGE', 'LAWYER', 'ORG', 'OTHER_PERSON', 'PETITIONER', 'PRECEDENT', 'PROVISION', 'RESPONDENT', 'STATUTE', 'WITNESS']

In [None]:
import pandas as pd
import json
import re


# Function to correct syntax errors
def correct_syntax_errors(string):
    """Apply two textual repairs to ``string`` before it is parsed as JSON.

    First every backslash-apostrophe sequence gets its backslash doubled, then
    every semicolon is turned into a comma. Returns the repaired string.
    """
    repairs = (
        ("\\'", "\\\\'"),
        (';', ','),
    )
    for broken, fixed in repairs:
        string = string.replace(broken, fixed)
    return string

def parse_json_string(json_str):
    """Repair and decode a JSON string without raising on bad input.

    Args:
        json_str: The text to decode. Non-string values (for example the NaN
            that ``pd.read_csv`` produces for an empty cell) are reported as a
            parse failure instead of raising.

    Returns:
        ``(True, decoded_value)`` on success, ``(False, None)`` when the input
        is not a string or is not valid JSON after ``correct_syntax_errors``.
        Failing inputs are printed for inspection.
    """
    if not isinstance(json_str, str):
        print(f"Problematic string: {json_str}")
        return False, None
    try:
        corrected_string = correct_syntax_errors(json_str)
        return True, json.loads(corrected_string)
    except json.JSONDecodeError:
        print(f"Problematic string: {json_str}")
        return False, None

def extract_ground_truth_dict(row):
    """Decode the JSON in ``row['GroundTruth']``; return ``None`` if it cannot be parsed."""
    parsed_ok, parsed = parse_json_string(row['GroundTruth'])
    if not parsed_ok:
        return None
    return parsed

def check_categories(dictionary):
    """Ensure ``dictionary`` has an entry for every name in the global ``categories``.

    Missing categories are added with the string ``"[]"`` as value. The
    dictionary is updated in place and returned.
    """
    for expected_category in categories:
        dictionary.setdefault(expected_category, "[]")
    return dictionary

def extract_model_output_dict(row):
    """Decode the model's answer in ``row['ModelOutput']`` into a category dict.

    Only the text after the last ``[/INST]`` marker (followed by a newline) is
    parsed; carriage returns are removed first so Windows line endings do not
    hide the marker.

    Returns:
        The decoded dictionary, completed with any missing categories by
        ``check_categories``, or the sentinel string ``'drop'`` when the output
        is not a string, is not valid JSON, or decodes to something other than
        a JSON object.
    """
    model_output = row['ModelOutput']
    if not isinstance(model_output, str):
        # e.g. NaN read from an empty CSV cell
        return 'drop'
    model_output_part = model_output.replace("\r", "").split("[/INST]\n")[-1]
    success, result = parse_json_string(model_output_part)
    # Valid JSON that is not an object (a list, a number, ...) cannot be scored.
    if not success or not isinstance(result, dict):
        return 'drop'
    return check_categories(result)

def parseRawOutput(results):
    """Parse ground truth and model output of every row and drop unparsable rows.

    Adds the columns ``GroundTruthDict`` and ``ModelOutputDict`` to ``results``
    (in place). A row is kept only when both values are dictionaries. This
    covers the ``'drop'`` sentinel returned by ``extract_model_output_dict`` as
    well as the ``None`` returned by ``extract_ground_truth_dict`` on failure.

    Args:
        results: DataFrame with ``GroundTruth`` and ``ModelOutput`` columns.

    Returns:
        ``(kept_rows, dropped_index)``: the rows that parsed successfully and
        the index labels of the rows that were removed.
    """
    # Apply the functions to each row to create new columns
    results['GroundTruthDict'] = results.apply(extract_ground_truth_dict, axis=1)
    results['ModelOutputDict'] = results.apply(extract_model_output_dict, axis=1)

    truth_ok = results['GroundTruthDict'].apply(lambda value: isinstance(value, dict))
    output_ok = results['ModelOutputDict'].apply(lambda value: isinstance(value, dict))
    parsed_ok = truth_ok & output_ok

    dropped = results[~parsed_ok].index
    print(f"dropped {len(dropped)} rows due to parsing errors")
    results = results[parsed_ok]
    return results, dropped

In [None]:
# Merge the two partial result files into a single CSV.
# NOTE(review): `results1` is read from model_results_2.csv and `results2` from
# model_results_1.csv — confirm the concatenation order is intended.
results1 = pd.read_csv("./results/model_results_2.csv")
results2 = pd.read_csv("./results/model_results_1.csv")
# ignore_index=True gives the combined frame a fresh 0..n-1 index. The previous
# reset_index(drop=True) call discarded its result and therefore had no effect.
results = pd.concat([results1, results2], axis=0, ignore_index=True)
results.to_csv("./results/model_results.csv", index=False)

In [None]:
# Reload the merged results from disk
results = pd.read_csv("./results/model_results.csv")

In [None]:
# Parse both columns; `dropped` holds the index labels of the rows that were removed
results_cleaned, dropped = parseRawOutput(results)

In [None]:
# Function to compute F1 score for a category
def f1_score_category(truth, prediction):
    """Set-based F1 score between the true and predicted entities of one category.

    Duplicates are ignored (both inputs are converted to sets). When both sets
    are empty the prediction counts as fully correct and the score is 1.
    Precision, recall and F1 fall back to 0 whenever their denominator is 0.
    """
    gold = set(truth)
    guessed = set(prediction)

    if not gold and not guessed:
        # Nothing to find and nothing predicted: treat as a perfect match.
        tp, fp, fn = 1, 0, 0
    else:
        tp = len(gold & guessed)
        fp = len(guessed - gold)
        fn = len(gold - guessed)

    precision = tp / (tp + fp) if tp + fp != 0 else 0
    recall = tp / (tp + fn) if tp + fn != 0 else 0

    if precision + recall == 0:
        return 0
    return 2 * precision * recall / (precision + recall)

def createDictionary(dictionary):
    """Convert a ``{category: "<list literal>"}`` mapping into ``{category: list}``.

    Each value is evaluated with ``ast.literal_eval`` and must yield a list,
    tuple or set. Anything else (for example a bare string, which would later
    be scored as a set of characters) marks the whole mapping as unusable.

    Args:
        dictionary: Mapping of category name to the string representation of
            its entity list.

    Returns:
        A new dict of category -> list of entities, or ``None`` when
        ``dictionary`` is not a dict or any value cannot be evaluated to a
        collection.
    """
    if not isinstance(dictionary, dict):
        return None

    new_dict = {}
    for category, literal in dictionary.items():
        try:
            parsed = ast.literal_eval(literal)
        except (ValueError, SyntaxError, TypeError):
            # Return None if ast.literal_eval fails
            return None
        if not isinstance(parsed, (list, tuple, set)):
            return None
        new_dict[category] = list(parsed)
    return new_dict

# Function to process a row and compute F1 scores
def process_row(row):
    """Compute the per-category F1 scores for one results row.

    Reads ``row['GroundTruthDict']`` and ``row['ModelOutputDict']``, converts
    both with ``createDictionary`` and scores every ground-truth category.

    Returns:
        dict of category -> F1 score, or ``None`` when either dictionary could
        not be converted.
    """
    expected_by_category = createDictionary(row['GroundTruthDict'])
    predicted_by_category = createDictionary(row['ModelOutputDict'])
    # Either conversion failing makes the row unusable.
    if expected_by_category is None or predicted_by_category is None:
        return None

    return {
        category: f1_score_category(expected, predicted_by_category[category])
        for category, expected in expected_by_category.items()
    }


In [None]:
def compute_f1_score(df):
    """Average the per-row F1 scores per category and compute their macro average.

    Args:
        df: DataFrame with ``GroundTruthDict`` and ``ModelOutputDict`` columns.

    Returns:
        ``(average_f1_scores, macro_f1_score, indices_to_drop)`` where
        ``average_f1_scores`` maps each category to its mean F1 over the rows
        that could be scored, ``macro_f1_score`` is the unweighted mean of
        those category averages (``0.0`` when no row could be scored), and
        ``indices_to_drop`` lists the index labels of the skipped rows.
    """
    # Apply the function to each row and aggregate results
    category_f1_scores = defaultdict(list)
    indices_to_drop = []

    for index, row in df.iterrows():
        row_scores = process_row(row)

        # Skip rows where process_row returns None
        if row_scores is None:
            indices_to_drop.append(index)
            continue

        for category, score in row_scores.items():
            category_f1_scores[category].append(score)

    # Calculate average F1 scores for each category and macro F1 score for the dataset
    average_f1_scores = {
        category: sum(scores) / len(scores)
        for category, scores in category_f1_scores.items()
    }
    if average_f1_scores:
        macro_f1_score = sum(average_f1_scores.values()) / len(average_f1_scores)
    else:
        # No row could be scored (empty frame or every row skipped); avoid dividing by zero.
        macro_f1_score = 0.0

    print("Average F1 Scores by Category:", average_f1_scores)
    print("Macro F1 Score for the Dataset:", macro_f1_score)
    return average_f1_scores, macro_f1_score, indices_to_drop

In [None]:
# Score the cleaned results; `indices_to_drop` lists the rows that could not be scored
average_f1_scores, macro_f1_score, indices_to_drop = compute_f1_score(results_cleaned)

In [None]:
# Remove the rows that compute_f1_score had to skip
results_cleaned = results_cleaned.drop(index=indices_to_drop)

In [None]:
# Number of distinct index labels left after dropping unparsable/unscored rows
len(np.unique(results_cleaned.index))