In [None]:
!pip install numpy scikit-learn tensorflow



In [None]:
def transform_dataset(page_dataset, for_inference):
    """Convert raw page records into per-word (word, label) sequences.

    Each page supplies its words under page["representativeData"]["page_data_words"].
    In training mode the labels come from page["answers"][0]["answer"]: every entry
    has an "id" (the label) and a "data" list of {"start", "end"} word ranges, where
    "end" is exclusive. Words not covered by any range are labelled "O".

    Args:
        page_dataset: iterable of page dicts in the format described above.
        for_inference: when true, answers are ignored, every word is labelled "O"
            and each result is paired with the page's "taskId".

    Returns:
        Training mode: a list with one [(word, label), ...] list per page.
        Inference mode: a list with one (taskId, [(word, "O"), ...]) tuple per page.
    """
    labeled_text_dataset = []
    for page in page_dataset:
        page_words = page["representativeData"]["page_data_words"]

        # Maps word index -> label for every word covered by an annotated range.
        geo_dictionary = {}
        if not for_inference:
            # A page without (or with empty) "answers" simply has no labelled words;
            # only the first answer set is used.
            page_answers = page.get("answers") or []
            if page_answers:
                for page_answer in page_answers[0]["answer"]:
                    geo_label = page_answer["id"]
                    for geo_part in page_answer["data"]:
                        for index in range(geo_part["start"], geo_part["end"]):
                            geo_dictionary[index] = geo_label

        # In inference mode the dictionary is empty, so every word gets the letter "O"
        # (the placeholder used to be the digit "0", unlike every other label here).
        labeled_text = [
            (word, geo_dictionary.get(word_index, "O"))
            for word_index, word in enumerate(page_words)
        ]

        if for_inference:
            labeled_text_dataset.append((page["taskId"], labeled_text))
        else:
            labeled_text_dataset.append(labeled_text)

    return labeled_text_dataset

In [None]:
import json

def get_labeled_dataset(dataset_path, for_inference=False):
    """Load a JSON dataset file and convert it with transform_dataset.

    Args:
        dataset_path: path of a JSON file whose pages live under ["data"]["results"].
        for_inference: forwarded to transform_dataset.

    Returns:
        Whatever transform_dataset returns for the loaded pages.
    """
    # Read as UTF-8 explicitly instead of relying on the platform default encoding;
    # the notebook writes its own JSON output as UTF-8 as well.
    with open(dataset_path, encoding="utf-8") as json_dataset:
        dataset = json.load(json_dataset)

    return transform_dataset(dataset["data"]["results"], for_inference)

In [None]:
def get_validation_result(X_validation, y_pred):
    """Collapse per-word label predictions into labelled word ranges per task.

    Consecutive words carrying the same label form one segment; segments of the
    outside label "O" are dropped. Segment ends are exclusive.

    Args:
        X_validation: sequence of (task_id, anything) pairs; only task_id is used.
        y_pred: sequence of per-word label lists, parallel to X_validation.

    Returns:
        A list with one {"taskId": ..., "answer": [{"id": label, "data": [
        {"start": i, "end": j}, ...]}, ...]} dict per task. Labels appear in the
        order in which their first segment occurs.
    """
    from itertools import groupby

    validation_result = []

    for (task_id, _), predictions in zip(X_validation, y_pred):
        answers = {}
        position = 0

        # groupby yields one run per stretch of identical consecutive labels.
        for label, run in groupby(predictions):
            run_length = sum(1 for _ in run)
            # The outside label is the letter "O" (the old check against the digit
            # "0" never matched, leaving the filtering to a second comparison).
            if label != "O":
                answers.setdefault(label, []).append(
                    {"start": position, "end": position + run_length}
                )
            position += run_length

        validation_result.append({
            "taskId": task_id,
            "answer": [{"id": label, "data": segments} for label, segments in answers.items()],
        })

    return validation_result

In [None]:
import tensorflow as tf

def focal_loss(alpha=0.25, gamma=2.):
    """Build a focal-loss function with the given weighting parameters.

    Args:
        alpha: constant factor applied to every loss term.
        gamma: exponent of the (1 - p) modulating factor.

    Returns:
        A function (y_true, y_pred) -> scalar tensor suitable as a Keras loss.
    """
    # Added to the predictions so the logarithm never sees an exact zero.
    epsilon = 1.e-9

    def focal_loss_parametrized(y_true, y_pred):
        targets = tf.convert_to_tensor(y_true, tf.float32)
        probabilities = tf.convert_to_tensor(y_pred, tf.float32) + epsilon

        # Multiplying by the targets keeps only the terms where y_true is non-zero.
        cross_entropy = targets * -tf.math.log(probabilities)
        modulating_weight = targets * tf.pow(1. - probabilities, gamma)
        weighted_loss = alpha * (modulating_weight * cross_entropy)

        # NOTE(review): axis=1 is the class axis only for 2-D (batch, classes) input.
        # This notebook's model emits one softmax per token, i.e. 3-D tensors, where
        # axis 1 is the token axis -- confirm whether axis=-1 was intended.
        return tf.reduce_mean(tf.reduce_max(weighted_loss, axis=1))

    return focal_loss_parametrized

In [None]:
def test_inference(batch_size):
    """Predict labels for the test split and return flat gold / predicted label lists.

    Relies on notebook-level names: model, X_test, test_dataset, labels and np.

    Args:
        batch_size: batch size handed to model.predict.

    Returns:
        (y_test_flat, y_pred_flat): two equally long lists holding, word by word,
        the gold label and the predicted label for every scored test word.
    """
    # use_multiprocessing / workers were dropped: they only ever applied to generator
    # or Sequence inputs and are not accepted by Keras 3's Model.predict.
    predictions = model.predict(X_test, batch_size=batch_size)

    y_test_flat = []
    y_pred_flat = []

    for text, text_prediction in zip(test_dataset, predictions):
        # X_test is padded without a `truncating` argument, so pad_sequences cuts
        # over-long texts from the FRONT: the predictions then belong to the last
        # len(text_prediction) words. Only words that have a prediction are scored,
        # which keeps both output lists the same length.
        if len(text) > len(text_prediction):
            scored_words = text[-len(text_prediction):]
        else:
            scored_words = text

        # Padding positions beyond the real words are ignored.
        label_ids = np.argmax(text_prediction[:len(scored_words)], axis=-1)
        for (_, gold_label), label_id in zip(scored_words, label_ids):
            y_test_flat.append(gold_label)
            y_pred_flat.append(labels[label_id])

    return y_test_flat, y_pred_flat

In [None]:
from google.colab import drive
# Mount Google Drive so the dataset files below are readable from this runtime.
drive.mount("/content/drive")

# Train and test files are loaded with their labels.
train_dataset = get_labeled_dataset(
    "/content/drive/My Drive/Colab Notebooks/datasets/train_geo_extractor.json"
)
test_dataset = get_labeled_dataset(
    "/content/drive/My Drive/Colab Notebooks/datasets/test_geo_extractor.json"
)

# The validation file is loaded in inference mode, so every entry is a
# (taskId, labeled_text) pair instead of a bare labeled_text.
validation_dataset = get_labeled_dataset(
    "/content/drive/My Drive/Colab Notebooks/datasets/val_no_answer_geo_extractor.json",
    for_inference=True,
)

Mounted at /content/drive


In [None]:
# Longest training text, in words; every sequence is padded to this length.
max_text_length = max(len(text) for text in train_dataset)

# Vocabulary in first-occurrence order, so the word -> index mapping is identical on
# every run (a set would reorder it per kernel).
special_tokens = ("UNKNOWN", "ENDPAD")
vocabulary = dict.fromkeys(word for text in train_dataset for word, _ in text)
words = [word for word in vocabulary if word not in special_tokens]
# The special tokens go last: the padding cells use index len(word2index) - 1 as the
# pad value, which is "ENDPAD" only if "ENDPAD" is the final vocabulary entry.
words.extend(special_tokens)

# Distinct labels, also in first-occurrence order for reproducibility.
labels = list(dict.fromkeys(label for text in train_dataset for _, label in text))

In [None]:
# Lookup tables from each word / label to its position in the `words` / `labels` list.
word2index = dict(zip(words, range(len(words))))
label2index = dict(zip(labels, range(len(labels))))

In [None]:
from tensorflow.keras.preprocessing.sequence import pad_sequences
from tensorflow.keras.utils import to_categorical

# Training inputs: every text as vocabulary indices, padded at the end up to
# max_text_length with the last vocabulary index.
X_train = pad_sequences(
    sequences=[[word2index[word] for word, _ in text] for text in train_dataset],
    maxlen=max_text_length,
    padding="post",
    value=len(word2index) - 1,
)

# Training targets: label indices padded the same way with the "O" class ...
y_train = pad_sequences(
    sequences=[[label2index[label] for _, label in text] for text in train_dataset],
    maxlen=max_text_length,
    padding="post",
    value=label2index["O"],
)
# ... then one-hot encoded, one array per text.
y_train = [to_categorical(label_row, num_classes=len(label2index)) for label_row in y_train]

In [None]:
# Test inputs: words missing from the training vocabulary map to "UNKNOWN"; padding
# matches the training inputs (at the end, with the last vocabulary index).
X_test = pad_sequences(
    sequences=[
        [word2index.get(word, word2index["UNKNOWN"]) for word, _ in text]
        for text in test_dataset
    ],
    maxlen=max_text_length,
    padding="post",
    value=len(word2index) - 1,
)

In [None]:
import os
import numpy as np
from sklearn.metrics import matthews_corrcoef
from tensorflow.keras.callbacks import Callback

class ComputeMCC(Callback):
    """Keras callback that prints the test-set Matthews correlation coefficient
    after every epoch."""

    def __init__(self):
        super().__init__()

    def on_epoch_end(self, epoch, logs=None):
        """Run test_inference with batch size 512 and print the resulting MCC.

        `epoch` is zero-based, hence the +1 in the message; `logs` is unused.
        """
        y_test_flat, y_pred_flat = test_inference(batch_size=512)

        print(f"Epoch {epoch + 1}. MCC: {matthews_corrcoef(y_test_flat, y_pred_flat)}")

In [None]:
import os
import numpy as np
from tensorflow.keras.models import Model
from tensorflow.keras.layers import Input, Embedding, Dropout, Conv1D, Dense

# Token-level tagger: embedding -> three dropout + Conv1D stages -> per-token softmax.
model_input = Input(shape=(max_text_length, ))
# `input_length` was dropped: the Input layer already fixes the sequence length and the
# argument is deprecated in current Keras.
# NOTE(review): the embedding width is set to max_text_length, tying it to the longest
# training text -- confirm this is intentional.
embedding_output = Embedding(input_dim=len(word2index), output_dim=max_text_length)(model_input)

# padding='same' keeps one output position per token at every convolution stage.
dropout_output = Dropout(0.5)(embedding_output)
conv1d_output = Conv1D(filters=300, kernel_size=3, padding='same', activation='relu')(dropout_output)

dropout_output = Dropout(0.3)(conv1d_output)
conv1d_output = Conv1D(filters=200, kernel_size=3, padding='same', activation='relu')(dropout_output)

dropout_output = Dropout(0.1)(conv1d_output)
conv1d_output = Conv1D(filters=100, kernel_size=3, padding='same', activation='relu')(dropout_output)

# One probability distribution over the labels for every token position.
model_output = Dense(len(label2index), activation="softmax")(conv1d_output)

model = Model(model_input, model_output)
model.compile(optimizer="adam", loss=focal_loss(), metrics=["accuracy"])

# use_multiprocessing / workers were dropped: they only ever applied to generator or
# Sequence inputs (the data here are in-memory arrays) and are not accepted by
# Keras 3's Model.fit.
model.fit(X_train, np.array(y_train), batch_size=512, epochs=15, callbacks=[ComputeMCC()])

Epoch 1/15
Epoch 1. MCC: 0.35735998684330006
Epoch 2/15
Epoch 2. MCC: 0.5976594902141319
Epoch 3/15
Epoch 3. MCC: 0.6147680821649801
Epoch 4/15
Epoch 4. MCC: 0.6701884496014346
Epoch 5/15
Epoch 5. MCC: 0.7077121169699299
Epoch 6/15
Epoch 6. MCC: 0.772902138595918
Epoch 7/15
Epoch 7. MCC: 0.7933368943862786
Epoch 8/15
Epoch 8. MCC: 0.813086135560299
Epoch 9/15
Epoch 9. MCC: 0.8255154100446584
Epoch 10/15
Epoch 10. MCC: 0.8365629718771677
Epoch 11/15
Epoch 11. MCC: 0.8392065156194172
Epoch 12/15
Epoch 12. MCC: 0.8442358369609005
Epoch 13/15
Epoch 13. MCC: 0.8443766485649706
Epoch 14/15
Epoch 14. MCC: 0.8485222075669824
Epoch 15/15
Epoch 15. MCC: 0.8492536796133454


<keras.src.callbacks.History at 0x7c1afff0e890>

In [None]:
from sklearn.metrics import classification_report, matthews_corrcoef

# Final evaluation of the trained model on the test split.
y_test_flat, y_pred_flat = test_inference(batch_size=512)

# Per-label precision / recall / F1, followed by the overall MCC.
per_label_report = classification_report(y_test_flat, y_pred_flat)
print(per_label_report)
print(f"Matthews Correlation Coefficient: {matthews_corrcoef(y_test_flat, y_pred_flat)}")

                   precision    recall  f1-score   support

                O       0.99      0.99      0.99     62822
     central_city       0.45      0.32      0.37       184
      geo_address       0.85      0.65      0.74      1040
     geo_building       0.78      0.77      0.77       453
         geo_city       0.83      0.78      0.81      1433
     geo_district       0.82      0.77      0.79       387
geo_microdistrict       0.62      0.58      0.60       382
       geo_region       0.99      0.98      0.99      1733
geo_region_oblast       0.87      0.78      0.82       297
       geo_street       0.70      0.76      0.73      1059

         accuracy                           0.97     69790
        macro avg       0.79      0.74      0.76     69790
     weighted avg       0.97      0.97      0.97     69790

Matthews Correlation Coefficient: 0.8492536796133454


In [None]:
# Validation inputs: every entry of validation_dataset is (task_id, text). Unknown words
# map to "UNKNOWN"; padding matches the training inputs (at the end, with the last
# vocabulary index).
X_validation = pad_sequences(
    sequences=[
        [word2index.get(word, word2index["UNKNOWN"]) for word, _ in text]
        for _, text in validation_dataset
    ],
    maxlen=max_text_length,
    padding="post",
    value=len(word2index) - 1,
)

In [None]:
# Per-token class probabilities for the validation texts.
# use_multiprocessing / workers were dropped: they only ever applied to generator or
# Sequence inputs and are not accepted by Keras 3's Model.predict.
y_pred_validation = model.predict(X_validation, batch_size=512)



In [None]:
# From here on X_validation holds the (task_id, text) pairs rather than the padded
# index matrix that was fed to the model.
X_validation = [(task_id, text) for task_id, text in validation_dataset]

# Most probable label for every padded position of every text.
y_pad_pred_validation = [[labels[np.argmax(prediction)] for prediction in text_prediction]
                         for text_prediction in y_pred_validation]

# Rebuild y_pred_validation as one [(word, predicted_label), ...] list per text,
# covering every word of the text and nothing of the padding.
y_pred_validation = []

for (_, text), padded_labels in zip(validation_dataset, y_pad_pred_validation):
    # The model input was padded without a `truncating` argument, so pad_sequences cut
    # over-long texts from the FRONT: the predictions belong to the last
    # len(padded_labels) words. Words that were cut off get "O", which keeps every
    # prediction at its true word index.
    dropped = max(0, len(text) - len(padded_labels))
    text_predictions = [
        (word, "O" if position < dropped else padded_labels[position - dropped])
        for position, (word, _) in enumerate(text)
    ]

    y_pred_validation.append(text_predictions)

In [None]:
import json

# Keep only the predicted label of every (word, label) pair.
y_pred_validation = [
    [label for _, label in text]
    for text in y_pred_validation
]

# Turn the per-word labels into per-task labelled word ranges.
validation_result = get_validation_result(X_validation, y_pred_validation)

# ensure_ascii=False writes non-ASCII characters as-is instead of \u escapes.
with open(
    "/content/drive/My Drive/Colab Notebooks/hybrid_validation_result.json",
    "w",
    encoding="utf-8",
) as file:
    json.dump(validation_result, file, ensure_ascii=False, indent=4)

print("Validation result has been saved!")

Validation result has been saved!
