In [None]:
from scandeval import load_dataset
from datasets import Dataset, DatasetDict
import random
import re
import pandas as pd
import warnings
from tqdm.auto import tqdm
# Register tqdm's progress-bar integration with pandas (e.g. `progress_apply`)
tqdm.pandas()
# Turn off pandas' chained-assignment (SettingWithCopy) warning for the whole session
pd.options.mode.chained_assignment = None

In [None]:
def flip_verbs(tokens: list, pos_tags: list) -> str:
    """Swap two randomly chosen verbs in a sentence.

    Tokens tagged 'VERB' or 'AUX' both count as verbs.

    Args:
        tokens: The tokens of the sentence.
        pos_tags: The POS tag of every token, aligned with `tokens`.

    Returns:
        The re-joined sentence with two verbs swapped, or None if the
        sentence has fewer than two verbs.
    """
    # Work on a copy so that the caller's token list stays untouched
    swapped = tokens.copy()

    # Positions of all verbs and auxiliaries
    verb_tags = ('VERB', 'AUX')
    verb_positions = []
    for position, tag in enumerate(pos_tags):
        if tag in verb_tags:
            verb_positions.append(position)

    # A swap needs two distinct verbs
    if len(verb_positions) < 2:
        return None

    # Draw the first verb, then a second one among the remaining verbs
    first = random.choice(verb_positions)
    remaining = [position for position in verb_positions if position != first]
    second = random.choice(remaining)

    # Exchange the two verbs
    swapped[first], swapped[second] = tokens[second], tokens[first]

    return join_tokens(swapped)


def flip_nouns(tokens: list, pos_tags: list) -> str:
    """Swap two randomly chosen nouns in a sentence.

    Only tokens tagged 'NOUN' are considered; proper nouns are left alone.

    Args:
        tokens: The tokens of the sentence.
        pos_tags: The POS tag of every token, aligned with `tokens`.

    Returns:
        The re-joined sentence with two nouns swapped, or None if the
        sentence has fewer than two nouns.
    """
    # Leave the caller's list alone
    result = tokens.copy()

    # Where are the nouns?
    noun_slots = [slot for slot, tag in enumerate(pos_tags) if tag == 'NOUN']

    # Without at least two nouns there is nothing to swap
    if len(noun_slots) < 2:
        return None

    # Pick one noun, then another noun different from the first
    slot_a = random.choice(noun_slots)
    slot_b = random.choice([slot for slot in noun_slots if slot != slot_a])

    # Exchange the two nouns
    result[slot_a], result[slot_b] = tokens[slot_b], tokens[slot_a]

    return join_tokens(result)


def change_case(tokens: list, pos_tags: list) -> str:
    """Corrupt a sentence by changing the casing of one noun or proper noun.

    A current casing ('lower', 'upper' or 'title') is drawn at random, and a
    random noun/proper noun written in that casing is converted to another
    casing. Lower- and title-cased tokens are upper-cased only 10% of the
    time; upper-cased tokens become lower or title case with equal chance.

    Only tokens whose text actually changes under some other casing are
    candidates. The previous `\\w` letter check also matched digits and
    underscores, so tokens such as '2020' could be "corrupted" into an
    identical string.

    Args:
        tokens: The tokens of the sentence.
        pos_tags: The POS tag of every token, aligned with `tokens`.

    Returns:
        The re-joined sentence with one re-cased token, or None if no noun
        or proper noun can be visibly re-cased.
    """
    # Copy the token list
    new_tokens = tokens.copy()

    # The conversion belonging to each casing. A token "is in" a casing when
    # the conversion leaves it unchanged.
    casings = {'lower': str.lower, 'upper': str.upper, 'title': str.title}

    # Loop until candidate indices have been found
    indices = []
    cases = ['lower', 'upper', 'title']
    while not indices and cases:

        # Randomly choose what case to change
        case_to_change = random.choice(cases)

        # Nouns and proper nouns that are in the chosen casing and that look
        # different in at least one other casing (this also rules out tokens
        # without cased letters, such as numbers and symbols)
        indices = [
            idx for idx, token in enumerate(new_tokens)
            if pos_tags[idx] in ('NOUN', 'PROPN')
            and casings[case_to_change](token) == token
            and any(convert(token) != token for convert in casings.values())
        ]

        # If there were no tokens to change with the given case, remove it
        # from the list of cases
        if not indices:
            cases.remove(case_to_change)

    # If we ran out of possible cases then return None
    if not cases:
        return None

    # Randomly choose an index to change
    rnd_idx = random.choice(indices)
    token = new_tokens[rnd_idx]

    # Randomly choose a new casing
    if case_to_change == 'upper':
        new_case = random.choice(['lower', 'title'])
    elif random.random() < 0.1:
        new_case = 'upper'
    else:
        new_case = 'title' if case_to_change == 'lower' else 'lower'

    # Apply the new casing. Tokens such as 'A' are both upper and title cased,
    # so the drawn casing can be a no-op; in that case the one remaining
    # casing is used, which is guaranteed to differ by the candidate filter.
    new_token = casings[new_case](token)
    if new_token == token:
        (fallback_case,) = set(casings) - {case_to_change, new_case}
        new_token = casings[fallback_case](token)
    new_tokens[rnd_idx] = new_token

    return join_tokens(new_tokens)


def change_verb_ending(tokens: list, pos_tags: list) -> str:
    """Corrupt a sentence by adding or removing a final 'r' on one verb.

    A verb (tag 'VERB' or 'AUX') ending in 'r' loses that letter, while a
    verb ending in a vowel gets an 'r' appended. When both kinds of verbs
    are present, the kind is drawn at random before the verb itself.

    Args:
        tokens: The tokens of the sentence.
        pos_tags: The POS tag of every token, aligned with `tokens`.

    Returns:
        The re-joined sentence with one modified verb, or None if no verb
        ends in 'r' or a vowel.
    """
    # Leave the caller's list alone
    result = tokens.copy()

    vowel_chars = frozenset('aeiouyæøåäöáíóúý')

    # Sort the verbs into those ending in 'r' and those ending in a vowel
    r_verbs = []
    vowel_verbs = []
    for position, tag in enumerate(pos_tags):
        if tag not in ('VERB', 'AUX'):
            continue
        last_char = result[position][-1].lower()
        if last_char == 'r':
            r_verbs.append(position)
        if last_char in vowel_chars:
            vowel_verbs.append(position)

    # Nothing to change
    if not r_verbs and not vowel_verbs:
        return None

    # Use the only non-empty group, or draw one of the two groups
    if not r_verbs:
        pool = vowel_verbs
    elif not vowel_verbs:
        pool = r_verbs
    else:
        pool = random.choice([vowel_verbs, r_verbs])

    # Draw the verb to modify
    chosen = random.choice(pool)
    word = result[chosen]

    # Drop a trailing 'r', or append one after a trailing vowel
    result[chosen] = word[:-1] if word[-1].lower() == 'r' else word + 'r'

    return join_tokens(result)


def _double_consonant_positions(token: str, consonants) -> list:
    """Return the indices, past the first character, that start a doubled consonant."""
    return [
        i for i in range(1, len(token) - 1)
        if token[i] == token[i + 1] and token[i].lower() in consonants
    ]


def _single_consonant_positions(token: str, consonants) -> list:
    """Return the indices, past the first character, of consonants not followed by themselves."""
    return [
        i for i in range(1, len(token))
        if (i == len(token) - 1 or token[i] != token[i + 1])
        and token[i].lower() in consonants
    ]


def change_double_consonant(tokens: list, pos_tags: list) -> str:
    """Corrupt a sentence by doubling a consonant or undoing a doubled one.

    One token is modified: either a consonant is duplicated, or one letter
    of a doubled consonant is removed. The first character of a token is
    never touched, neither when adding nor when removing.

    Args:
        tokens: The tokens of the sentence.
        pos_tags: The POS tag of every token. Unused here; accepted so that
            the function has the same signature as the other corruptions.

    Returns:
        The re-joined sentence with one modified token, or None if no token
        has a consonant after its first character.
    """
    # Copy the token list
    new_tokens = tokens.copy()

    consonants = frozenset('qwrtpsdfghjklzxcvbnmðþ')

    # For every token, record the character positions where a consonant can
    # be removed (doubled consonants) and where one can be added
    double_positions = {}
    single_positions = {}
    for idx, token in enumerate(new_tokens):
        doubles = _double_consonant_positions(token, consonants)
        singles = _single_consonant_positions(token, consonants)
        if doubles:
            double_positions[idx] = doubles
        if singles:
            single_positions[idx] = singles

    # If there are no tokens with consonants then return None
    if not double_positions and not single_positions:
        return None

    # Decide whether to add or to remove a consonant. This is kept as an
    # explicit flag: comparing the candidate lists instead would treat
    # "remove" as "add" whenever both lists happen to hold the same tokens.
    if not double_positions:
        add_consonant = True
    elif not single_positions:
        add_consonant = False
    else:
        add_consonant = random.random() < 0.5
    candidates = single_positions if add_consonant else double_positions

    # Get a random token, and a random character position within it
    rnd_idx = random.choice(list(candidates))
    rnd_char_idx = random.choice(candidates[rnd_idx])
    token = new_tokens[rnd_idx]

    if add_consonant:
        # Duplicate the consonant
        new_tokens[rnd_idx] = (
            token[:rnd_char_idx] + token[rnd_char_idx] + token[rnd_char_idx:]
        )
    else:
        # Remove the first letter of the doubled consonant
        new_tokens[rnd_idx] = token[:rnd_char_idx] + token[rnd_char_idx + 1:]

    return join_tokens(new_tokens)

In [None]:
def join_tokens(tokens: list) -> str:
    """Join tokens into a sentence with natural spacing around punctuation.

    Args:
        tokens: The tokens of the sentence.

    Returns:
        The tokens separated by single spaces, with the space removed before
        closing punctuation and after opening brackets. If the sentence
        holds an even number of double quotes, the spaces just inside each
        quoted span are removed as well.
    """
    text = ' '.join(tokens)

    # (spaced form, tight form) pairs, applied in this order
    spacing_fixes = (
        (' .', '.'),
        (' ,', ','),
        (' ;', ';'),
        (' :', ':'),
        ('( ', '('),
        (' )', ')'),
        ('[ ', '['),
        (' ]', ']'),
        ('{ ', '{'),
        (' }', '}'),
        (' ?', '?'),
        (' !', '!'),
    )
    for spaced, tight in spacing_fixes:
        text = text.replace(spaced, tight)

    # Unbalanced quotes cannot be paired up, so leave them as they are
    if text.count('"') % 2 != 0:
        return text

    # Tighten the whitespace inside each pair of quotes
    return re.sub('" ([^"]*) "', '"\\1"', text)


def delete(tokens: list, pos_tags: list) -> str:
    """Corrupt a sentence by deleting one randomly chosen token.

    Adjectives, adverbs, punctuation, symbols, determiners and numbers are
    never deleted, since the sentence would probably stay grammatical.
    Nouns and proper nouns are only deleted when neither neighbour is a noun
    or proper noun, as removing one of several adjacent nouns usually does
    not make the sentence incorrect either.

    Args:
        tokens: The tokens of the sentence.
        pos_tags: The POS tag of every token, aligned with `tokens`.

    Returns:
        The re-joined sentence with one token removed, or None if no token
        qualifies for deletion.
    """
    # Leave the caller's list alone
    remaining = tokens.copy()

    never_deleted = ('ADJ', 'ADV', 'PUNCT', 'SYM', 'DET', 'NUM')
    nominal = ('NOUN', 'PROPN')
    last = len(remaining) - 1

    def is_candidate(position, tag):
        """Tell whether deleting the token at `position` likely breaks the sentence."""
        if tag in never_deleted:
            return False
        if tag not in nominal:
            return True
        # A nominal token must not have a nominal neighbour on either side
        if position != 0 and pos_tags[position - 1] in nominal:
            return False
        return position == last or pos_tags[position + 1] not in nominal

    candidates = [position for position, tag in enumerate(pos_tags)
                  if is_candidate(position, tag)]

    # Nothing can be deleted
    if not candidates:
        return None

    # Drop one random candidate
    remaining.pop(random.choice(candidates))

    return join_tokens(remaining)


def flip_neighbours(tokens: list, pos_tags: list) -> str:
    """Corrupt a sentence by swapping two neighbouring words.

    Two adjacent tokens may be swapped only if neither is punctuation or a
    symbol, their POS tags differ, and they are not a pronoun/auxiliary pair
    (swapping those typically yields a valid question). Every pair is
    validated with the same rule, including pairs at the start and end of
    the sentence, so a word is never swapped with punctuation or with a
    pronoun/auxiliary partner.

    Args:
        tokens: The tokens of the sentence.
        pos_tags: The POS tag of every token, aligned with `tokens`.

    Returns:
        The re-joined sentence with two neighbours swapped, or None if no
        pair of neighbours may be swapped.
    """
    # Copy the token list
    new_tokens = tokens.copy()
    num_tokens = len(tokens)
    non_words = ('PUNCT', 'SYM')

    def can_flip(idx, neighbour_idx):
        """Tell whether the tokens at the two adjacent positions may be swapped."""
        if not 0 <= neighbour_idx < num_tokens:
            return False
        tag, neighbour_tag = pos_tags[idx], pos_tags[neighbour_idx]
        if tag in non_words or neighbour_tag in non_words:
            return False
        if tag == neighbour_tag:
            return False
        return {tag, neighbour_tag} != {'PRON', 'AUX'}

    # For every token, collect the neighbours it may be swapped with
    valid_neighbours = {}
    for idx in range(num_tokens):
        neighbours = [neighbour_idx for neighbour_idx in (idx - 1, idx + 1)
                      if can_flip(idx, neighbour_idx)]
        if neighbours:
            valid_neighbours[idx] = neighbours

    # If no pair can be swapped then return None
    if not valid_neighbours:
        return None

    # Get the first random index, and then one of its valid neighbours
    rnd_fst_idx = random.choice(list(valid_neighbours))
    rnd_snd_idx = random.choice(valid_neighbours[rnd_fst_idx])

    # Flip the two indices
    new_tokens[rnd_fst_idx] = tokens[rnd_snd_idx]
    new_tokens[rnd_snd_idx] = tokens[rnd_fst_idx]

    # If we flipped the first token, then ensure that the new first token
    # is title-cased and the second token is of lower case. We only do this if
    # they are not upper cased, however.
    if 0 in (rnd_fst_idx, rnd_snd_idx):
        if new_tokens[0] != new_tokens[0].upper():
            new_tokens[0] = new_tokens[0].title()
        if new_tokens[1] != new_tokens[1].upper():
            new_tokens[1] = new_tokens[1].lower()

    return join_tokens(new_tokens)


def corrupt(tokens: list, pos_tags: list, num_corruptions: int = 3,
            max_attempts: int = 1000) -> list:
    """Create distinct corrupted versions of a sentence.

    Each attempt applies either `flip_neighbours` or `delete`, drawn at
    random. A result is kept only if it differs from the original sentence
    and from every corruption found so far.

    Args:
        tokens: The tokens of the sentence.
        pos_tags: The POS tag of every token, aligned with `tokens`.
        num_corruptions: The number of distinct corruptions to produce.
        max_attempts: The number of attempts made before giving up, so that
            a sentence without enough possible corruptions cannot cause an
            endless loop.

    Returns:
        A list of `(corrupted_text, corruption_function_name)` tuples of
        length `num_corruptions`.

    Raises:
        ValueError: If `num_corruptions` distinct corruptions could not be
            found within `max_attempts` attempts.
    """
    if num_corruptions <= 0:
        return []

    corruptions = []

    # Texts that must not be produced (again). The duplicate check has to
    # look at the texts: `corruptions` holds (text, name) tuples, so testing
    # a text against that list never finds a match.
    seen_texts = {join_tokens(tokens)}

    for _ in range(max_attempts):
        corruption_fn = random.choice([
            flip_neighbours,
            delete,
        ])
        corruption = corruption_fn(tokens, pos_tags)
        if corruption is None or corruption in seen_texts:
            continue
        seen_texts.add(corruption)
        corruptions.append((corruption, corruption_fn.__name__))
        if len(corruptions) == num_corruptions:
            return corruptions

    raise ValueError(
        f'Could only create {len(corruptions)} of {num_corruptions} distinct '
        f'corruptions in {max_attempts} attempts for tokens {tokens!r}.'
    )
    
    
def prepare_df(df, split: str):
    """Build a shuffled dataset of correct and corrupted sentences.

    Every row of `df` contributes its original sentence with label
    'correct' and one corrupted version of it with label 'incorrect'. The
    input frame is only read; no columns are added to it.

    Args:
        df: A DataFrame with a `tokens` and a `pos_tags` column, each
            holding one list per row.
        split: The split name passed on to `Dataset.from_pandas`.

    Returns:
        A `Dataset` with the columns `text`, `corruption_type` (None for
        correct sentences, otherwise the name of the corruption function)
        and `label`.
    """
    # One list of (corrupted text, corruption name) tuples per sentence
    corrupted_list = [corrupt(tokens=tokens, pos_tags=pos_tags, num_corruptions=1)
                      for tokens, pos_tags in zip(df.tokens, df.pos_tags)]

    # Flatten, so that the number of labels below always matches the number
    # of corrupted sentences
    corrupted_pairs = [pair for pairs in corrupted_list for pair in pairs]

    originals = [join_tokens(tokens) for tokens in df.tokens]

    correct_df = pd.DataFrame(dict(
        text=originals,
        corruption_type=[None for _ in originals],
        label=['correct' for _ in originals],
    ))
    incorrect_df = pd.DataFrame(dict(
        text=[text for text, _ in corrupted_pairs],
        corruption_type=[name for _, name in corrupted_pairs],
        label=['incorrect' for _ in corrupted_pairs],
    ))

    # Mix correct and incorrect samples
    combined = (pd.concat([correct_df, incorrect_df])
                  .sample(frac=1.0)
                  .reset_index(drop=True))
    return Dataset.from_pandas(combined, split=split)

In [None]:
# Fetch the 'ddt-pos' dataset as feature and label frames
X_train, X_test, y_train, y_test = load_dataset('ddt-pos')

# Put the POS tags next to the tokens, one frame per split
train_df = pd.concat([X_train, y_train], axis=1)
test_df = pd.concat([X_test, y_test], axis=1)

# (column, keep-predicate) pairs. They are applied one after another, so a
# later predicate only ever sees rows that passed the earlier ones.
sample_filters = [
    # Keep samples with more than five tokens
    ('tokens', lambda lst: len(lst) > 5),
    # Keep samples with more than five distinct POS tags
    ('pos_tags', lambda lst: len(set(lst)) > 5),
    # Keep samples with an even number of quotes
    ('doc', lambda doc: doc.count('"') % 2 == 0),
    # Keep samples that do not start with punctuation
    ('pos_tags', lambda lst: lst[0] not in ['PUNCT', 'SYM']),
    # Keep samples with at most one '=' character, as this character is
    # used to indicate a tag
    ('doc', lambda doc: doc.count('=') <= 1),
]
for column, keep in sample_filters:
    train_df = train_df[train_df[column].map(keep)]
    test_df = test_df[test_df[column].map(keep)]

# Put the samples in random order, with a fresh 0..n-1 index
train_df = train_df.sample(frac=1.0).reset_index(drop=True)
test_df = test_df.sample(frac=1.0).reset_index(drop=True)

In [None]:
# Draw a random training sample to inspect
IDX = random.choice(train_df.index)
print('Sample:', IDX)

# Corrupt the sample once
sample = train_df.iloc[IDX]
tokens = sample.tokens
doc = join_tokens(tokens)
pos_tags = sample.pos_tags
corrupted = corrupt(tokens, pos_tags, num_corruptions=1)[0]

# Show the sentence before and after, and how it was corrupted
print(f'Original: "{doc}"')
print(f'Corrupted: "{corrupted[0]}"')
print(f'Method: {corrupted[1]}')
print('POS tags:')
print(list(zip(tokens, pos_tags)))

In [None]:
# (language code, POS dataset identifier) for every language to build
all_datasets = [
    ('da', 'ddt-pos'),
    ('sv', 'sdt-pos'),
    ('nb', 'ndt-nb-pos'),
    ('nn', 'ndt-nn-pos'),
    ('is', 'idt-pos'),
    ('fo', 'fdt-pos'),
]

# Build and upload one dataset per language
for language, dataset_id in tqdm(all_datasets):

    # Fetch the feature and label frames
    X_train, X_test, y_train, y_test = load_dataset(dataset_id)

    # Put the POS tags next to the tokens, one frame per split
    train_df = pd.concat([X_train, y_train], axis=1)
    test_df = pd.concat([X_test, y_test], axis=1)

    # (column, keep-predicate) pairs. They are applied one after another, so
    # a later predicate only ever sees rows that passed the earlier ones.
    sample_filters = [
        # Keep samples with more than five tokens
        ('tokens', lambda lst: len(lst) > 5),
        # Keep samples with more than five distinct POS tags
        ('pos_tags', lambda lst: len(set(lst)) > 5),
        # Keep samples with an even number of quotes
        ('doc', lambda doc: doc.count('"') % 2 == 0),
        # Keep samples that do not start with punctuation
        ('pos_tags', lambda lst: lst[0] not in ['PUNCT', 'SYM']),
        # Keep samples with at most one '=' character, as this character is
        # used to indicate a tag
        ('doc', lambda doc: doc.count('=') <= 1),
    ]
    for column, keep in sample_filters:
        train_df = train_df[train_df[column].map(keep)]
        test_df = test_df[test_df[column].map(keep)]

    # Drop samples containing 'SLUTORD', as this is used to indicate a tag
    train_df = train_df[~train_df.doc.str.contains('SLUTORD')]
    test_df = test_df[~test_df.doc.str.contains('SLUTORD')]

    # Put the samples in random order, with a fresh 0..n-1 index
    train_df = train_df.sample(frac=1.0).reset_index(drop=True)
    test_df = test_df.sample(frac=1.0).reset_index(drop=True)

    # The last 128 training samples become the validation set; the first 512
    # of the remaining training samples and of the test samples form the
    # small training set and the test set
    val_df, train_df = train_df.iloc[-128:], train_df.iloc[:-128]
    small_train_df = train_df.copy().iloc[:512]
    test_df = test_df.iloc[:512]

    # Add the corruptions to every split
    small_train = prepare_df(small_train_df, split='small_train')
    train = prepare_df(train_df, split='train')
    val = prepare_df(val_df, split='val')
    test = prepare_df(test_df, split='test')

    # Bundle the splits
    dataset = DatasetDict(
        small_train=small_train,
        train=train,
        val=val,
        test=test,
    )

    # Upload the dataset to the Hugging Face Hub
    dataset.push_to_hub(f'ScandEval/scala-{language}')