In [1]:
import random
import pandas as pd

from ME_BPER import ME_BPER
from ME_IPER import extract_last_names
from ME_LOC import add_location
from ME_ORG import add_organisation


In [2]:
# reading label data from a given column
# this is the readNlu function from the provided span_f1 file
# minor modifications were made to make it usable with our data. 
def readNlu(path, target_column = 1): # default to index 1 (thats where DaN+ labels are)
    '''
    This function reads labeled annotations from a CoNLL-like file.

    It parses a file where each line typically represents a single token and its annotations,
    separated by tabs. Empty lines denote sentence boundaries. It extracts labels from a specified column
    (by default, column index 1), collecting them as a list of label sequences, one per sentence.

    Parameters:
        path (str): Path to the input file.
        target_column (int, optional): Index of the column to extract labels from. Defaults to 1.

    Returns:
        List[List[str]]: A list where each element is a list of labels (strings) corresponding
                         to tokens in a sentence.
    '''

    annotations = []    # list for storing all the label sequences (one per sentence)
    cur_annotation = [] # temp list for labels of the current sentence

    # reading through the file line by line
    for line in open(path, encoding='utf-8'):
        line = line.strip()                     # remove leading/trailing whitespaces

        # empty lines denotes end of sentence
        if line == '':
            annotations.append(cur_annotation)  # add current annotations to annotations list
            cur_annotation = []                 # reset for the next sentence
        
        # skipping comments (start with "#" and no tokens columns)
        elif line[0] == '#' and len(line.split('\t')) == 1:
            continue
        
        else:
            # extract the label from the specified column and add to current sentence
            cur_annotation.append(line.split('\t')[target_column])

    return annotations


# mapping funciton 
def mapping(path):
    '''
    This function generates mappings between labels and their corresponding integer IDs from a labeled dataset.

    It reads annotations from a CoNLL-like file using the `readNlu` function,
    filters out labels containing substrings like "part" or "deriv" (case-insensitive),
    and creates a bidirectional mapping between the remaining unique labels and integer IDs.

    Parameters:
        path (str): Path to the labeled data file.

    Returns:
        Tuple[Dict[str, int], Dict[int, str]]:
            - label2id: A dictionary mapping each label to a unique integer ID.
            - id2label: A reverse dictionary mapping each integer ID back to its label.
    '''

    # get the data labels
    data_labels = readNlu(path) 

    # create empty set to store unique labels
    label_set = set()

    for labels in data_labels:
        #  filter out any labels that contain 'part' or 'deriv' (case-insensitive)
        filtered = [label for label in labels if 'part' not in label.lower() and 'deriv' not in label.lower()]
        label_set.update(filtered)

    # count of unique filtered labels
    num_labels = len(label_set)

    # create a dictionary mapping each label to a unique integer ID
    label2id = {label: id for id, label in enumerate(label_set)}

    # create a dictionary mapping each unique integer ID to a label
    id2label = {id: label for label, id in label2id.items()}

    return label2id, id2label


# load data function
# heavily inspired by the solution from assignment 5
def read_tsv_file(path, label2id):
    '''
    This function reads a TSV file containing tokens and NER labels and converts it into structured data.
    It collects the tokens, their original labels, and their corresponding integer IDs (based on the provided `label2id` mapping) for each sentence.
    Sentences are separated by empty lines. 

    Each non-empty line in the file is expected to have at least two tab-separated columns:
    - The first column is the token.
    - The second column is the corresponding NER label.

    Parameters:
        path (str): Path to the TSV file to read.
        label2id (dict): A dictionary mapping NER label strings to their corresponding integer IDs.

    Returns:
        List[dict]: A list of dictionaries, one per sentence, with keys:
            - 'tokens': list of tokens.
            - 'ner_tags': list of original NER label strings.
            - 'tag_ids': list of integer tag IDs corresponding to the NER labels.
    '''

    data = []               # final list to hold all sentences as dictionaries
    current_words = []      # tokens for the current sentence
    current_tags = []       # NER tags for the current sentence
    current_tag_ids = []    # corresponding tag IDs for the current sentence

    for line in open(path, encoding='utf-8'):
        line = line.strip() # removes any leading and trailing whitespaces from the line

        if line:
            if line[0] == '#': 
                continue # skip comments

            # splitting at 'tab', as the data is tab separated 
            tok = line.split('\t')
            
            # extract the token (first column)
            token = tok[0]

            # check if the label is in the provided label2id dictionary
            # if it's not, replace the label with 'O'
            label = tok[1] if tok[1] in label2id else 'O'

            current_words.append(token)
            current_tags.append(label)
            current_tag_ids.append(label2id[label])
        
        else: # skip empty lines
            if current_words: # if current_words is not empty

                # add entry to dict where tokens and ner_tags are keys and the values are lists
                data.append({"tokens": current_words, "ner_tags": current_tags, "tag_ids": current_tag_ids})

            # start over  
            current_words = []
            current_tags = []
            current_tag_ids = []

    # check for last one
    if current_tags != []:
        data.append({"tokens": current_words, "ner_tags": current_tags, "tag_ids": current_tag_ids})
  
    return data

# extracting tokens to check for overlap in train, dev and test sets
def extract_labeled_tokens(dataset, exclude_label = "O", include_label_pair=False):
    '''
    This function extracts tokens from a dataset that have a string label different from `exclude_label`.
    Optionally, it can return the (token, label) pairs instead of just tokens.

    Parameters:
        dataset (List[dict]): The token-tagged dataset.
        exclude_label (str): The label to ignore (default is 'O').
        include_label_pair (bool): Whether to include the (token, label) pairs in the result (default is False).
        
    Returns:
         Set[str] or Set[Tuple[str, str]]: 
            - A set of tokens with meaningful (non-O) labels if `include_label_pair` is False.
            - A set of (token, label) pairs if `include_label_pair` is True.
    '''

    # create empty set to store the unique tokens
    labeled_tokens = set()
    
    for sentence in dataset:
        # iterate over each token and its corresponding tag ID
        for token, label in zip(sentence["tokens"], sentence["ner_tags"]):
            if label != exclude_label:                      # check if the tag is not the excluded one
                if include_label_pair:
                    labeled_tokens.add((token, label))      # add (token, label) pair if the flag is True
                else:
                    labeled_tokens.add(token)               # add just the token if the flag is False
    
    return labeled_tokens


In [3]:
df = pd.read_csv("../data_aug_sources/the-middle-east-cities.csv", sep = ";", skiprows = 1)
unique_city_da = df["city_da"].drop_duplicates()
ME_LOC = [add_location(loc) for loc in unique_city_da]

In [4]:
df = pd.read_csv("../data_aug_sources/middle_eastern_organisations.csv", sep = ";", skiprows = 1)
unique_orgs = df["org"].drop_duplicates()
ME_ORG = [add_organisation(org) for org in unique_orgs]

In [5]:
def collect_entity_strings(data, target_label_prefix):
    """
    Collects labeled spans as strings from the dataset. Multi-token spans are joined with spaces.
    
    Args:
        data (List[Dict]): Dataset containing 'tokens' and 'tags' for each sentence.
        target_label_prefix (str): Label prefix to filter for (e.g., 'B-LOC', 'B-ORG').
        
    Returns:
        Set[str]: A set of labeled token strings (e.g., {'Beirut', 'Al Mawsil al Jadidah'})
    """
    grouped_strings = set()

    for item in data:
        tokens = item['tokens']
        tags = item['ner_tags']

        i = 0
        while i < len(tokens):
            tag = tags[i]

            if tag.startswith(target_label_prefix):
                span_tokens = [tokens[i]]
                i += 1
                while i < len(tokens) and tags[i].startswith('I'):
                    span_tokens.append(tokens[i])
                    i += 1

                # Join tokens into a single string and add to the set
                entity_string = ' '.join(span_tokens)
                grouped_strings.add(entity_string)
            else:
                i += 1

    return grouped_strings


In [6]:
# overlap between train, dev, test and MENAPT NEs
ME_LOC_tokens = collect_entity_strings(ME_LOC, target_label_prefix = "B-LOC")

ME_ORG_tokens = collect_entity_strings(ME_ORG, target_label_prefix = "B-ORG")

print(ME_LOC_tokens)
print(len(ME_LOC_tokens))
print(len(ME_LOC))

print("\n")

print(ME_ORG_tokens)
print(len(ME_ORG_tokens))
print(len(ME_ORG))

ME_BPER_tokens = set(ME_BPER)
#ME_IPER_tokens = set(ME_IPER)


KeyError: 'ner_tags'

In [None]:
# path to the data files
path_train = "../data/da_news_train.tsv"
path_dev = "../data/da_news_dev.tsv"
path_test = "../data/da_news_test.tsv"

# create mapping
label2id, id2label = mapping(path_train)

# read in the DaN+ data
train_data = read_tsv_file(path_train, label2id)
dev_data = read_tsv_file(path_dev, label2id)
test_data = read_tsv_file(path_test, label2id)

In [None]:
def collect_all_entity_strings(data, exclude_label="O"):
    """
    Collects all labeled (non-"O") entity spans as strings from the dataset.
    Multi-token spans are joined with spaces.

    Args:
        data (List[Dict]): Dataset with 'tokens' and 'tags' per sentence.
        exclude_label (str): Label to ignore (default is 'O').

    Returns:
        Set[str]: Set of labeled entity strings (e.g., {'Beirut', 'Al Mawsil al Jadidah'})
    """
    grouped_strings = set()

    for item in data:
        tokens = item['tokens']
        tags = item['ner_tags']
        i = 0
        while i < len(tokens):
            tag = tags[i]

            if tag != exclude_label and tag.startswith('B-'):
                span_tokens = [tokens[i]]
                i += 1
                # Collect I-XXX continuation tags
                while i < len(tokens) and tags[i].startswith('I-'):
                    span_tokens.append(tokens[i])
                    i += 1

                entity_string = ' '.join(span_tokens)
                grouped_strings.add(entity_string)
            else:
                i += 1

    return grouped_strings


In [None]:
train_tokens = collect_all_entity_strings(train_data)
dev_tokens = collect_all_entity_strings(dev_data)
test_tokens = collect_all_entity_strings(test_data)

In [None]:
print("overlap loc train: ", train_tokens & ME_LOC_tokens)
print("overlap loc dev: ", dev_tokens & ME_LOC_tokens)
print("overlap loc test: ", test_tokens & ME_LOC_tokens)

print("overlap org train: ", train_tokens & ME_ORG_tokens)
print("overlap org dev: ", dev_tokens & ME_ORG_tokens)
print("overlap org test: ", test_tokens & ME_ORG_tokens)

print("overlap BPER train: ", train_tokens & ME_BPER_tokens)
print("overlap BPER dev: ", dev_tokens & ME_BPER_tokens)
print("overlap BPER test: ", test_tokens & ME_BPER_tokens)


overlap loc train:  {'Bush', 'Irak', 'Ankara', 'Abu Dhabi', 'Syrien', 'Luxor', 'Erzincan', 'Kuwait'}
overlap loc dev:  {'Bahrain', 'Oman'}
overlap loc test:  {'Irak', 'Bagdad'}
overlap org train:  {'CBC'}
overlap org dev:  {'CBC'}
overlap org test:  set()
overlap BPER train:  {'S', 'Bassam', 'Elias', 'K'}
overlap BPER dev:  set()
overlap BPER test:  {'Z', 'K'}


In [None]:
def remove_entity_span_overlap(data, overlap_entities, is_list_of_dicts=True):
    """
    Removes overlapping entities from either:
    - A list of dicts with 'tokens' and 'tags' (for NER-style data).
    - A simple list of entity strings (for name lists like ME_BPER or ME_IPER).

    Args:
        data (List[Dict] or List[str]): The dataset to filter.
        overlap_entities (Set[str]): Set of full entity spans to exclude.
        is_list_of_dicts (bool): Set True if data is NER-style with tokens/tags, False if it's a simple list of strings.

    Returns:
        List: Filtered dataset with overlaps removed.
    """

    if not is_list_of_dicts:
        # Data is a list of entity strings (e.g., ME_BPER)
        return [entity for entity in data if entity not in overlap_entities]

    # Helper: extract entity spans from token/tag lists
    def extract_entity_spans(tokens, tags):
        spans = []
        i = 0
        while i < len(tokens):
            if tags[i].startswith("B-"):
                span_tokens = [tokens[i]]
                i += 1
                while i < len(tokens) and tags[i].startswith("I-"):
                    span_tokens.append(tokens[i])
                    i += 1
                spans.append(" ".join(span_tokens))
            else:
                i += 1
        return set(spans)

    # NER-style case: remove full examples that contain overlapping spans
    filtered_data = []
    for item in data:
        spans = extract_entity_spans(item["tokens"], item["ner_tags"])
        if not spans & overlap_entities:
            filtered_data.append(item)
    return filtered_data


In [None]:
overlap_loc_spans = (
    collect_entity_strings(train_data, "B-LOC") |
    collect_entity_strings(dev_data, "B-LOC") |
    collect_entity_strings(test_data, "B-LOC")
)

overlap_org_spans = (
    collect_entity_strings(train_data, "B-ORG") |
    collect_entity_strings(dev_data, "B-ORG") |
    collect_entity_strings(test_data, "B-ORG")
)

overlap_BPER_spans = (
    train_tokens & ME_BPER_tokens |
    dev_tokens & ME_BPER_tokens |
    test_tokens & ME_BPER_tokens
)

KeyError: 'tags'

In [None]:
# For list of dicts (e.g., ME_LOC or ME_ORG)
updated_ME_LOC = remove_entity_span_overlap(ME_LOC, overlap_loc_spans, is_list_of_dicts=True)
print("Before ME_LOC:", len(ME_LOC))
print("After ME_LOC:", len(updated_ME_LOC))

# For list of dicts (e.g., ME_LOC or ME_ORG)
updated_ME_ORG = remove_entity_span_overlap(ME_ORG, overlap_org_spans, is_list_of_dicts=True)
print("Before ME_ORG:", len(ME_ORG))
print("After ME_ORG:", len(updated_ME_ORG))

# For simple list (e.g., ME_BPER)
updated_ME_BPER = remove_entity_span_overlap(ME_BPER, overlap_bper_spans, is_list_of_dicts=False)
print("Before ME_BPER:", len(ME_BPER))
print("After ME_BPER:", len(updated_ME_BPER))


In [None]:
print("overlap loc train: ", train_tokens & ME_LOC_tokens)
print("overlap loc dev: ", dev_tokens & ME_LOC_tokens)
print("overlap loc test: ", test_tokens & ME_LOC_tokens)

print("overlap org train: ", train_tokens & ME_ORG_tokens)
print("overlap org dev: ", dev_tokens & ME_ORG_tokens)
print("overlap org test: ", test_tokens & ME_ORG_tokens)

print("overlap BPER train: ", train_tokens & set(updated_ME_BPER_tokens))
print("overlap BPER dev: ", dev_tokens & set(updated_ME_BPER_tokens))
print("overlap BPER test: ", test_tokens & set(updated_ME_BPER_tokens))


overlap loc train:  {'Abu Dhabi'}
overlap loc dev:  set()
overlap loc test:  set()
overlap org train:  set()
overlap org dev:  set()
overlap org test:  set()
overlap BPER train:  set()
overlap BPER dev:  set()
overlap BPER test:  set()


In [None]:
def data_aug_replace(dataset, sentence_amount):
    # First, filter sentences that have at least one non-"O" tag
    eligible_sentences = [sent for sent in dataset if any(tag != "O" for tag in sent["ner_tags"])]

    # Select up to sentence_amount randomly from the eligible ones
    selected_sentences = random.sample(eligible_sentences, min(sentence_amount, len(eligible_sentences)))


    for sent in selected_sentences:
        i = 0

        while i<len(sent["tokens"]):
            tag = sent["ner_tags"][i]

            if tag == 'B-PER':
                replace = random.choice(ME_BPER)
                sent["ner_tags"][i] = replace
                ME_BPER.remove(replace)
                i+=1

            elif tag == 'I-PER':
                replace = random.choice(ME_IPER)
                sent["ner_tags"][i] = replace
                ME_IPER.remove(replace)
                i+=1

            elif tag == 'B-LOC':
                span_start = i
                span_len = 1
                i += 1
                while i < len(sent["ner_tags"]) and sent["ner_tags"][i] == "I-LOC":
                    span_len += 1
                    i += 1

                same_length_LOC = []
                for loc in ME_LOC:
                    if len(loc) == span_len:
                        same_length_LOC.append(loc)
                if same_length_LOC:
                    replace = random.choice(same_length_LOC)
                    sent["tokens"][span_start:span_start+span_len] = replace
                    ME_LOC.remove(replace)
            
            elif tag == 'B-ORG':
                span_start = i
                span_len = 1
                i += 1
                while i < len(sent["ner_tags"]) and sent["ner_tags"][i] == "I-ORG":
                    span_len += 1
                    i += 1

                same_length_ORG = []
                for org in ME_ORG:
                    if len(org) == span_len:
                        same_length_ORG.append(org)
                if same_length_ORG:
                    replace = random.choice(same_length_ORG)
                    sent["tokens"][span_start:span_start+span_len] = replace
                    ME_ORG.remove(replace)

            else:
                i+=1


In [None]:
dev_data = 
what = 

SyntaxError: invalid syntax (2143288949.py, line 1)