# Segmentation
Import the dependencies and define the names of the data files.

In [None]:
from sklearn.model_selection import train_test_split, KFold
from transformers import AutoTokenizer
from typing import List,Tuple
from tqdm import tqdm  
import regex as re
import random
import json
import torch.nn as nn
import torch
from torch.utils.data import DataLoader, TensorDataset
# File-name suffixes available for each edition (E2 is split into parts "a" and "b").
datafiles = dict(
    E1=[""],
    E2=["a", "b"],
    E3=[""],
    E4=[""],
)

# Use the GPU when PyTorch can see one, otherwise stay on the CPU.
device = torch.device("cuda") if torch.cuda.is_available() else torch.device("cpu")
device

Function that extracts headwords out of \<b\> tags to build a headword dataset.

In [None]:
def build_b_tag_dataset(datastring, next_chars = 500, verbose=False):
  """Build [text, label] pairs for headword extraction from marked-up text.

  Positive examples: for every match of the <b>...</b> pattern the label is the
  matched bold text, with closing tags (and anything between one closing tag and
  a later opening tag) replaced by a space. The text is that label followed by
  the first run of non-'<' characters found in the ``next_chars`` characters
  after the match, with whitespace collapsed. Matches for which that run is
  empty after stripping are skipped.

  Negative examples: spans that start with two newlines and an uppercase letter
  followed by 10 to 500 characters other than '<'; they are whitespace-collapsed
  and labelled "<NO_HEADWORD>".

  Args:
    datastring: The full text to scan.
    next_chars: Size of the window (in characters) inspected after a bold match.
    verbose: When True, show tqdm progress bars.

  Returns:
    A list of [text, label] lists; all positives come before the negatives.
  """
  examples = []

  # Positive examples: bold runs plus the text that follows them.
  bold_pattern = r"((?<=<b>).+<\/b>)(.*(?<=<b>).+<\/b>)*"
  for bold_match in tqdm(re.finditer(bold_pattern, datastring), disable=(not verbose)):
    headword = re.sub(r"</b>.*<b>|</b>"," ",bold_match.group(0)).strip()

    # Only look at a bounded window directly after the bold match.
    window_start = bold_match.end()
    window = datastring[window_start:window_start+next_chars]
    following = re.search(r"([^<]{1,"+str(next_chars)+r"})(?=<|$)", window)
    following_text = following.group(0) if following else ""

    short_def = re.sub(r"\s+", " ", following_text).strip()
    if short_def:
      examples.append([f"{headword} {short_def}", headword])

  # Negative examples: paragraphs that do not start with a bold headword.
  plain_pattern = r"(\n\n\p{Upper}[^<]{10,500})(?=\n|$|<)"
  for plain_match in tqdm(re.finditer(plain_pattern, datastring), disable=(not verbose)):
    paragraph = re.sub(r"\s+", " ", plain_match.group(0)).strip()
    examples.append([paragraph, "<NO_HEADWORD>"])

  return examples

Build the headword datasets for the first and second editions (E1 \& E2) where for each entry there is:
  - Feature: A paragraph or piece of text that starts with a headword, followed by up to <i>next_chars</i> number of characters, default is 500.
  - Label: The headword at the beginning of the corresponding feature, or `<NO_HEADWORD>` if the feature is not a <i>"headword"</i> paragraph.

Save results to json files:
```json
  ["Lund, uppstad i Malmöhus län...beskaffenhet. I all", "Lund,"]
  ["betjenade sig af rapporter från...till privatlifvet", "<NO_HEADWORD>"]
```

In [None]:
# NOTE(review): this whole cell is commented out. It looks like the one-off step that
# writes the ./dataset/NF_<edition>_B.json files read further down — confirm before removing.
# for i,edition in enumerate(['E1', 'E2']):

#   dataset = ""
#   for file in datafiles.get(edition):
#     with open(f"./dataset/NF_{edition}{file}.txt", "r", encoding='utf-8') as fr:
#       dataset += fr.read()
#       fr.close()
      
#   b_tag_dict = build_b_tag_dataset(dataset, verbose=True)
#   print(f"{edition} has {len(b_tag_dict):,} entries")

#   with open(f"./dataset/NF_{edition}_B.json", "w") as b_json:
#     json.dump(b_tag_dict, b_json, indent=2, ensure_ascii=False)
# del i, edition, dataset, file, fr, b_tag_dict, b_json

In [None]:
# Load the tokenizer published under the "KB/bert-base-swedish-cased" checkpoint name.
tokenizer = AutoTokenizer.from_pretrained("KB/bert-base-swedish-cased")
# Disabled: registering "<NO_HEADWORD>" as an extra token of its own.
#_ = tokenizer.add_tokens(["<NO_HEADWORD>"])

In [None]:
# Load the [text, label] examples of every edition used for training.
# There is a single NF_<edition>_B.json per edition, so each file is read exactly once.
# Looping over the per-edition file suffixes here would append the same JSON once per
# suffix (twice for E2), duplicating examples that then leak across the train/test split.
dataset = []
for edition in ['E1', 'E2']:
  with open(f"./dataset/NF_{edition}_B.json", "r", encoding='utf-8') as b_json:
    dataset += json.load(b_json)

# Mix the editions and the positive/negative examples before splitting.
random.shuffle(dataset)

def process_data(sentence, headword, sentence_max_length=100, headword_max_length=20):
    """Tokenize one (sentence, headword) pair into fixed-length token-id tensors.

    Both texts are encoded with the notebook-level ``tokenizer``: special tokens
    ([CLS]/[SEP]) are added, and the result is padded or truncated to the given
    maximum length.

    Args:
        sentence: Input text (the feature).
        headword: Target text (the label).
        sentence_max_length: Padded/truncated length of the sentence encoding.
        headword_max_length: Padded/truncated length of the headword encoding.

    Returns:
        A tuple ``(sentence_ids, headword_ids)`` holding the ``input_ids`` of the
        first (and only) sequence in each encoding.
    """
    def encode(text, max_length):
        # Same tokenizer settings for both texts; only the target length differs.
        encoded = tokenizer(
            text,
            add_special_tokens=True,  # Add [CLS] and [SEP] tokens
            padding='max_length',     # Pad to a maximum length
            max_length=max_length,
            truncation=True,          # Truncate if longer than max length
            return_tensors='pt'       # Return PyTorch tensors
        )
        return encoded['input_ids'][0]

    return encode(sentence, sentence_max_length), encode(headword, headword_max_length)

def extract_features_labels(dataset) -> Tuple[torch.Tensor, torch.Tensor]:
    """Tokenize every [text, label] entry and stack the results into two tensors.

    Args:
        dataset: Sequence of entries where ``entry[0]`` is the input text and
            ``entry[1]`` is the headword label.

    Returns:
        A tuple ``(features, labels)`` of stacked token-id tensors, one row per
        entry, both moved to the notebook-level ``device``.

    Raises:
        ValueError: If ``dataset`` is empty (there is nothing to stack).
    """
    if len(dataset) == 0:
        raise ValueError("extract_features_labels received an empty dataset")

    x = []
    y = []
    for entry in tqdm(dataset):
        sentence_ids, headword_ids = process_data(entry[0], entry[1])
        x.append(sentence_ids)
        y.append(headword_ids)
    return torch.stack(x).to(device), torch.stack(y).to(device)

# Optional: keep only the first 20% of the (already shuffled) examples for quicker runs.
#dataset = dataset[:int(0.2*len(dataset))]
# Tokenize every example into the stacked input/target id tensors.
X, y = extract_features_labels(dataset)

In [None]:
# Hold out 20% of the examples for evaluation.
# NOTE(review): no random_state is passed, so the split differs between runs.
X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=0.20)
# Show the first training pair as a quick visual check of the encoding.
print(X_train[0], y_train[0])

In [None]:
# Compare the tokenizer's vocabulary size with the largest token ids in the training
# tensors; the model's embedding table is sized from the vocabulary size.
print("Vocab size:", tokenizer.vocab_size)
print("Max input ID:", torch.max(X_train))
print("Max target ID:", torch.max(y_train))
# Disabled: look up the id of the "<NO_HEADWORD>" token.
#tokenizer.convert_tokens_to_ids('<NO_HEADWORD>')
# NOTE(review): 50325 is a hard-coded id — presumably the first id past the base
# vocabulary, i.e. where an added token would land; confirm.
print(tokenizer.convert_ids_to_tokens(50325))

In [None]:
# Size the model from len(tokenizer) rather than tokenizer.vocab_size: the latter does not
# count tokens registered with add_tokens(), so an added token would get an id that falls
# outside the embedding table and the output layer.
vocab_size = len(tokenizer)

class HeadwordPredictorLSTM(nn.Module):
    """LSTM encoder-decoder that maps input token ids to headword token ids.

    The input sequence is embedded and run through the encoder LSTM; its final
    hidden and cell states initialise the decoder LSTM, which emits one
    vocabulary distribution (logits) per decoding step. Encoder and decoder
    share the same embedding table.

    Args:
        vocab_size: Number of token ids; size of the embedding table and of the
            output layer.
        embedding_dim: Dimension of the token embeddings.
        hidden_dim: Hidden size of both LSTMs.
        tokenizer: Object exposing ``cls_token_id``, used as the first decoder input.
        max_length: Number of decoding steps when no target sequence is given.
    """

    def __init__(self, vocab_size, embedding_dim, hidden_dim, tokenizer, max_length=20):
        super().__init__()
        # Keep the size on the instance so forward() does not depend on a notebook global.
        self.vocab_size = vocab_size
        self.embedding = nn.Embedding(vocab_size, embedding_dim)
        self.encoder = nn.LSTM(embedding_dim, hidden_dim, batch_first=True)
        self.decoder = nn.LSTM(embedding_dim, hidden_dim, batch_first=True)
        self.fc = nn.Linear(hidden_dim, vocab_size)
        self.tokenizer = tokenizer
        self.max_length = max_length

    def forward(self, input_seq, target_seq=None, teacher_forcing_ratio=0.5):
        """Decode a headword for each input sequence.

        Args:
            input_seq: Integer tensor of token ids, shape (batch, input_length).
            target_seq: Optional integer tensor of target token ids, shape
                (batch, target_length). When given, it fixes the number of
                decoding steps and supplies the teacher-forcing inputs.
            teacher_forcing_ratio: Probability, drawn independently at every
                step, of feeding the target token instead of the model's own
                prediction as the next decoder input. Ignored without a target.

        Returns:
            Float tensor of logits with shape (batch, steps, vocab_size), where
            ``steps`` is ``target_seq.size(1)`` or ``self.max_length``.
        """
        batch_size = input_seq.size(0)
        run_device = input_seq.device

        # Encode the input; only the final states are passed on to the decoder.
        _, (decoder_hidden, decoder_cell) = self.encoder(self.embedding(input_seq))

        target_length = target_seq.size(1) if target_seq is not None else self.max_length

        outputs = torch.zeros(batch_size, target_length, self.vocab_size, device=run_device)

        # Decoding always starts from the [CLS] token.
        decoder_input = torch.full(
            (batch_size, 1), self.tokenizer.cls_token_id, dtype=torch.long, device=run_device
        )

        for t in range(target_length):
            decoder_embeddings = self.embedding(decoder_input)
            decoder_output, (decoder_hidden, decoder_cell) = self.decoder(
                decoder_embeddings, (decoder_hidden, decoder_cell)
            )

            output = self.fc(decoder_output)
            outputs[:, t:t+1, :] = output

            # One draw per step for the whole batch; never teacher-force without a target.
            use_teacher_forcing = (
                target_seq is not None and torch.rand(1).item() < teacher_forcing_ratio
            )

            if use_teacher_forcing:
                decoder_input = target_seq[:, t:t+1].long()
            else:
                # Greedy choice: feed back the most likely token.
                decoder_input = output.argmax(2)

        return outputs

In [None]:
# Size of the tokenizer as reported by len(), for comparison with tokenizer.vocab_size.
print(len(tokenizer))

In [None]:
# Decode token ids 0-4 to see which tokens occupy the lowest ids.
print(tokenizer.decode([0,1,2,3,4]))

In [None]:
# Model size.
embedding_dim, hidden_dim = 128, 256
# Number of decoding steps when the model is called without a target sequence.
max_length = 20
# Optimisation settings.
batch_size, num_epochs, learning_rate = 32, 20, 1e-3

model = HeadwordPredictorLSTM(
    vocab_size=vocab_size,
    embedding_dim=embedding_dim,
    hidden_dim=hidden_dim,
    tokenizer=tokenizer,
    max_length=max_length,
)
model = model.to(device)
optimizer = torch.optim.Adam(params=model.parameters(), lr=learning_rate)

# Target positions holding the padding token are excluded from the loss.
criterion = nn.CrossEntropyLoss(ignore_index=tokenizer.pad_token_id)

In [None]:
# Pair inputs with targets (both cast to int64) for the train and the test split.
train_dataset, test_dataset = (
    TensorDataset(features.long(), targets.long())
    for features, targets in ((X_train, y_train), (X_test, y_test))
)

# Only the training batches are reshuffled every epoch.
train_loader = DataLoader(dataset=train_dataset, batch_size=batch_size, shuffle=True)
test_loader = DataLoader(dataset=test_dataset, batch_size=batch_size, shuffle=False)

In [None]:
for epoch in range(num_epochs):
    # ---- optimisation pass over the training batches ----
    model.train()
    train_loss = 0

    for input_batch, target_batch in train_loader:
        optimizer.zero_grad()
        outputs = model(input_batch, target_batch, teacher_forcing_ratio=0.5)
        # Flatten (batch, step) so every decoding step is scored as one classification.
        loss = criterion(outputs.reshape(-1, vocab_size), target_batch.reshape(-1))
        loss.backward()
        optimizer.step()

        train_loss += loss.item()

    avg_train_loss = train_loss / len(train_loader)
    print(f"Epoch {epoch+1}/{num_epochs}, Train Loss: {avg_train_loss:.4f}")

    # ---- evaluation pass: no gradients, no teacher forcing ----
    model.eval()
    test_loss = 0
    with torch.no_grad():
        for input_batch, target_batch in tqdm(test_loader, desc = "Testing"):
            outputs = model(input_batch, target_batch, teacher_forcing_ratio=0)
            loss = criterion(outputs.reshape(-1, vocab_size), target_batch.reshape(-1))
            test_loss += loss.item()

    avg_test_loss = test_loss/len(test_loader)

    print(f"Test Loss: {avg_test_loss:.4f}")


In [None]:

# Try the trained model on a single hand-written sentence.
model.eval()
input_sentence = "En stad i skåne"
# Pad/truncate to 100 tokens, the length the training inputs were encoded with: the
# encoder reads the padding too, so a different length at inference would feed it more
# trailing [PAD] steps than it ever saw during training.
encoded_input = tokenizer(
    input_sentence,
    return_tensors="pt",
    padding="max_length",
    max_length=100,
    truncation=True,
).to(device)

# No target sequence: the model decodes greedily for its own max_length steps.
with torch.no_grad():
    output = model(encoded_input['input_ids'])

# Most likely token at every step for the single sentence in the batch.
predicted_indices = output.argmax(2)[0].cpu().tolist()
predicted_headword = tokenizer.decode(predicted_indices, skip_special_tokens=True)

print("Predicted headword:", predicted_headword)

# Persist the trained weights.
torch.save(model.state_dict(),"headword_predictor.pth")

