This file loads pre-trained Word2Vec models for task of filling in gaps of Ancient Egyptian sentences.

The code is hidden by default for brevity. Click button Show code (or equivalent in your language) to delve in to the details.



# Setup

In [1]:
# @title Setup work space and download files { display-mode: "form" }

# Create working directory and move to it.
!echo -n "Populating working directory: "
!mkdir -p /content/data-word2vec
%cd /content/data-word2vec

from google.colab import userdata # Needed if using private repository
from tqdm.notebook import tqdm, tqdm_notebook
import requests
import os

rt_url = 'https://zenodo.org/records/4954597/files/ramses-trl_2021_05_29.zip?download=1'
aes_filenames = [
    "_aes_bbawamarna.json", "_aes_bbawhistbiospzt.json",
    "_aes_sawmedizin.json", "_aes_bbawarchive.json",
    "_aes_bbawpyramidentexte.json", "_aes_bbawbriefe.json",
    "_aes_bbawramessiden.json", "_aes_smaek.json",
    "_aes_bbawfelsinschriften.json", "_aes_bbawtempelbib.json", "_aes_tb.json",
    "_aes_bbawgrabinschriften.json", "_aes_bbawtotenlit.json",
    "_aes_tuebingerstelen.json", "_aes_bbawgraeberspzt.json",
    "_aes_sawlit.json" ]
aes_path = 'https://raw.githubusercontent.com/simondschweitzer/aes/main/files/aes/'
misc_files = [ 'preprocessing/final_files/gaps/all_gaps_id.txt',
               'data/aed-tei/dictionary.json',
               'translations/ge_en_dictionary.json',
               'data/marete-ramses/aligned/combined_dev.txt',
               'data/marete-ramses/aligned/combined_test.txt',
               'preprocessing/final_files/gaps/all_gaps_id_harmonized.txt' ]
egy_path='https://raw.githubusercontent.com/annasahola/egy-gaps/main/'
models = [ '2024-03-13a-W2V-AES-alt.model', '2024-03-13a-W2V.RTC-optimized.model',
           '2024-03-13a-W2V-AES-TR-alt.model', '2024-03-13a-W2V-TR-combined-2.01.model' ]
egy_release='https://github.com/annasahola/egy-gaps/releases/download/release-1.0pre1/'

for x in tqdm_notebook(range(4), desc="Downloading"):
  if x == 0 and os.path.exists('ramses-trl/data/src-train.txt') != True:
    print(f"Downloading {rt_url}")
    response = requests.get(rt_url)
    if not response.ok:
      raise(Exception(f"Failed to read {rt_url}"))
    open("ramses-trl_2021_05_29.zip", "wb").write(response.content)
    !unzip -q ramses-trl_2021_05_29.zip
  if x == 1 and os.path.exists('_aes_bbawamarna.json') != True:
    for file in aes_filenames:
      print(f"Downloading {file}")
      response = requests.get(aes_path + file)
      if not response.ok:
        raise(Exception(f"Failed to read {file}"))
      with open(file, "wb") as file_out:
        file_out.write(response.content)
  if x == 2:
    for path_file in misc_files:
      filename = os.path.basename(path_file)
      if not os.path.exists(filename):
        print(f"Downloading {filename}")
        response = requests.get(egy_path + path_file)
        if not response.ok:
          raise(Exception(f"Failed to download {egy_path + path_file}"))
        with open(filename, "wb") as file_out:
          file_out.write(response.content)
  if x == 3:
    for model in models:
      if not os.path.exists("models/" + model):
        !mkdir -p models
        print(f"Downloading models/{model}")
        response = requests.get(egy_release + model)
        if not response.ok:
          raise(Exception(f"Failed to download {egy_release + model}"))
        with open(f"models/{model}", "wb") as file_out:
          file_out.write(response.content)
    if not os.path.exists('models/2024-03-13a-W2V RTC-optimized.model'):
      !cp 'models/2024-03-13a-W2V.RTC-optimized.model' 'models/2024-03-13a-W2V RTC-optimized.model'


Populating working directory: /content/data-word2vec


Downloading:   0%|          | 0/4 [00:00<?, ?it/s]

Downloading https://zenodo.org/records/4954597/files/ramses-trl_2021_05_29.zip?download=1
Downloading _aes_bbawamarna.json
Downloading _aes_bbawhistbiospzt.json
Downloading _aes_sawmedizin.json
Downloading _aes_bbawarchive.json
Downloading _aes_bbawpyramidentexte.json
Downloading _aes_bbawbriefe.json
Downloading _aes_bbawramessiden.json
Downloading _aes_smaek.json
Downloading _aes_bbawfelsinschriften.json
Downloading _aes_bbawtempelbib.json
Downloading _aes_tb.json
Downloading _aes_bbawgrabinschriften.json
Downloading _aes_bbawtotenlit.json
Downloading _aes_tuebingerstelen.json
Downloading _aes_bbawgraeberspzt.json
Downloading _aes_sawlit.json
Downloading all_gaps_id.txt
Downloading dictionary.json
Downloading ge_en_dictionary.json
Downloading combined_dev.txt
Downloading combined_test.txt
Downloading all_gaps_id_harmonized.txt
Downloading models/2024-03-13a-W2V-AES-alt.model
Downloading models/2024-03-13a-W2V.RTC-optimized.model
Downloading models/2024-03-13a-W2V-AES-TR-alt.model
Down

In [2]:
# @title Check files exist. { display-mode: "form" }
import os

aes_filenames = [
    "_aes_bbawamarna.json", "_aes_bbawhistbiospzt.json",
    "_aes_sawmedizin.json", "_aes_bbawarchive.json",
    "_aes_bbawpyramidentexte.json", "_aes_bbawbriefe.json",
    "_aes_bbawramessiden.json", "_aes_smaek.json",
    "_aes_bbawfelsinschriften.json", "_aes_bbawtempelbib.json", "_aes_tb.json",
    "_aes_bbawgrabinschriften.json", "_aes_bbawtotenlit.json",
    "_aes_tuebingerstelen.json", "_aes_bbawgraeberspzt.json",
    "_aes_sawlit.json" ]
gaps_file = "all_gaps_id.txt"
aed_dictionary = "dictionary.json"
ge_en_dict = "ge_en_dictionary.json"

for fi in aes_filenames + [aed_dictionary, ge_en_dict, gaps_file]:
  if os.path.exists(fi) != True:
    print(f"file {fi} does not exist on File System")
    raise Exception("Missing file(s)")

for fi in [f'models/{model}' for model in models]:
  if os.path.exists(fi) != True:
    print(f"file {fi} does not exist on File system")
    raise Exception("Missing file(s)")

# Make sure ramses-trl directory exists.
for di in [ 'ramses-trl', 'ramses-trl/data' ]:
  if os.path.exists(di) != True:
    os.mkdir(di)

if os.path.exists('ramses-trl/data/src-train.txt') != True:
  raise(Exception(f"{rt_url} not yet loaded."))

rtc_path = 'ramses-trl/data/'
rtc_ext = '.txt'
rtc_src_files = [ 'src-train', 'src-val' ] # Use -train for training/gaps and -val for testing/gaps.
rtc_tgt_files = [ 'tgt-train', 'tgt-val' ]
rtc_files = [rtc_path+fname+rtc_ext for fname in rtc_src_files + rtc_tgt_files]
all_files = aes_filenames + [aed_dictionary] + rtc_files
for fi in all_files:
  if os.path.exists(fi) != True:
    print(f"file {fi} does not exist on Google Drive")
    raise Exception("Missing file(s)")
print("\U0001f44D All required files located.")

👍 All required files located.


In [3]:
# @title Install required libraries. { display-mode: "form" }
#%pip install pympler
%pip install Levenshtein
%pip install unidecode
%pip install geneticalgorithm2
%pip install func_timeout
%pip install gardiner2unicode
%pip install wikitextparser

Collecting Levenshtein
  Downloading Levenshtein-0.25.1-cp310-cp310-manylinux_2_17_x86_64.manylinux2014_x86_64.whl (177 kB)
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m177.4/177.4 kB[0m [31m3.7 MB/s[0m eta [36m0:00:00[0m
[?25hCollecting rapidfuzz<4.0.0,>=3.8.0 (from Levenshtein)
  Downloading rapidfuzz-3.8.1-cp310-cp310-manylinux_2_17_x86_64.manylinux2014_x86_64.whl (3.4 MB)
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m3.4/3.4 MB[0m [31m13.3 MB/s[0m eta [36m0:00:00[0m
[?25hInstalling collected packages: rapidfuzz, Levenshtein
Successfully installed Levenshtein-0.25.1 rapidfuzz-3.8.1
Collecting unidecode
  Downloading Unidecode-1.3.8-py3-none-any.whl (235 kB)
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m235.5/235.5 kB[0m [31m5.1 MB/s[0m eta [36m0:00:00[0m
[?25hInstalling collected packages: unidecode
Successfully installed unidecode-1.3.8
Collecting geneticalgorithm2
  Downloading geneticalgorithm2-6.9.2-py3-non

In [4]:
# @title Load required libraries. { display-mode: "form" }
import nltk
import pandas as pd
import matplotlib.pyplot as plt
import numpy as np
import re
import json
import io
import re
import string
import random
from tqdm.notebook import tqdm, tqdm_notebook
import numpy as np
import collections
import gensim.models
import Levenshtein
import heapq
from unidecode import unidecode
from geneticalgorithm2 import geneticalgorithm2 as ga2
from collections import Counter
import time
from tabulate import tabulate
from gardiner2unicode import GardinerToUnicodeMap
from ipywidgets import interact, interactive, fixed, interact_manual, widgets
from gensim.models.callbacks import CallbackAny2Vec
from IPython.display import clear_output

Import libraries (all libraries should be installed above)

# Data extraction

In [5]:
# @title Parse AES Files from Google Drive into `aes_file_contents`, `aes_sentence_dictionary`. Click `Show code` to view details. { display-mode: "form" }

aes_print_sentences = []
written_form_to_lemma_id = dict()
written_form_to_mdc = dict()
def read_aes_files():
  aes_file_contents = []
  aes_sentence_dictionary = {}
  global aes_print_sentences

  notebook = tqdm_notebook(aes_filenames,desc="Loading...")
  for filename in notebook:
    # go through each file
    notebook.desc = filename
    with open(filename, "r") as file:
      data = json.load(file)
      sentences = []

      # go through all elements (sentences) in the JSON data
      for e in data:
        sentence = []
        aes_sentence_dictionary[e] = {}

        sentence_object = data[e]
        token_list = sentence_object["token"]

        for token in token_list:
          # written_form and mdc exist for all tokens
          cotext_translation = token["cotext_translation"] if "cotext_translation" in token else ""
          lemma_form = token["lemma_form"] if "lemma_form" in token else ""
          lemma_id = token["lemmaID"] if "lemmaID" in token else "-1"
          pos = token["pos"] if "pos" in token else ""
          token_object = (token["mdc"], cotext_translation, lemma_form, lemma_id, pos)
          if 'written_form' in token:
            written_form_to_lemma_id[token['written_form']] = lemma_id
          if 'written_form' in token:
            written_form_to_mdc[token['written_form']] = token["mdc"]
          sentence.append(token_object)

        sentences.append(sentence)

        aes_sentence_dictionary[e]["text"] = sentence_object["text"]
        aes_sentence_dictionary[e]["translation"] = sentence_object["sentence_translation"]
        aes_sentence_dictionary[e]["corpus"] = sentence_object["corpus"]
        aes_sentence_dictionary[e]["date"] = sentence_object["date"]
        aes_sentence_dictionary[e]["tokens"] = sentence
        aes_sentence_dictionary[e]["token_list"] = "<s> " + " ".join([token[3] for token in sentence]) + " </s>" # list of token lemma IDs separated by whitespace
        aes_sentence_dictionary[e]["sentence_transliteration"] = " ".join([token[0] for token in sentence])
        # You may add here search term for interesting sentences. I search for sentence with numbers and fractions.
        if len(aes_print_sentences) < 20: # First 20
            aes_print_sentences.append(aes_sentence_dictionary[e])
        if len(aes_print_sentences) < 100 and sum(["5" in token["mdc"] for token in token_list]) and sum(["1/8" in token["mdc"] for token in token_list]):
            aes_print_sentences.append(aes_sentence_dictionary[e])

      aes_file_contents.append(sentences)
  notebook.desc = "Loading AES"
  return (aes_file_contents, aes_sentence_dictionary)
aes_file_contents, aes_sentence_dictionary = read_aes_files()
print(f"AES Loaded: {len(aes_file_contents)} files with {len(aes_sentence_dictionary.items())} sentences")

# Show interesting sentences. The reason to use subset is that all
# sentences could be too much.
aes_print_sentences_compact = [[sentence['text'], sentence['token_list'], sentence['sentence_transliteration']] for sentence in aes_print_sentences[:100]]
pd.set_option('display.max_rows', 5)
aes_print_dataframe = pd.DataFrame(aes_print_sentences_compact,
                         columns=['Text', 'Token list', 'Transliteration'])
aes_print_dataframe


Loading...:   0%|          | 0/16 [00:00<?, ?it/s]

AES Loaded: 16 files with 101796 sentences


Unnamed: 0,Text,Token list,Transliteration
0,25IKMZZWRBARVG5WSM7JVGVBDE,<s> -1 856314 -1 </s>,"[...] [nfr-xpr,pl]-raw-wa-[n-raw] [...]"
1,25IKMZZWRBARVG5WSM7JVGVBDE,<s> -1 73330 850814 64830 550077 851446 51990 ...,"[...] [mH] 4 [m-mj,tt] ⸢SAa⸣-m pA [wD] [rs,j-j..."
...,...,...,...
22,S36FKQSICNFDLADBECABA64XXE,<s> 26870 10100 69410 110440 850815 850815 850...,"jni.n =sn 〈m〉fkA,t HqA,t 14 1/4 1/8 1/32 (w)DA..."
23,S36FKQSICNFDLADBECABA64XXE,<s> 26870 10100 112330 178610 850814 850814 -1...,jni.n =sn HD dbn 1676 1/2 [...] ⸢Hzmn⸣ ⸢dbn⸣ ⸢...


In [6]:
# @title Create test set of AES sentences. Click `Show code` to view details. { display-mode: "form" }

# Divide randomically 20% of sentences to test.
def is_test(token_list):
  h = hash(token_list)
  return True if (h % 100) < 20 else False # Select 20% inputs

def get_aes(ae_dictionary):
  aecorpus = collections.namedtuple('AncientEgyptianCorpus', 'aes aes_1 aes_t aes_g aesen aesen_1 aesen_t aed rtc rtc_1 rtc_t rtc_g')

  aes_g = []
  aes = []
  aes_1 = []
  aes_t = []
  c = 5
  print("Examples of sentences with gaps:")
  for id, sentence_all in ae_dictionary.items():
    tokens = sentence_all["tokens"]
    gaps = True
    try:
      # Consider unreadable as gap.
      gaps = any(token[3] == "-1" for token in tokens)
      if gaps == True and c > 0:
        c -= 1
        print(sentence_all["token_list"], "    ", sentence_all["sentence_transliteration"])
    except:
      #print("Gaps check fail: ", sentence)
      pass
    if gaps:
      aes_g.append(sentence_all)
    else:
      cl = is_test(sentence_all["token_list"]) # classification is per token sequence.
      aes.append(sentence_all)
      if cl == False:
        aes_1.append(sentence_all)
      else:
        aes_t.append(sentence_all)

  aecorpus.aes_g = aes_g
  aecorpus.aes = aes
  aecorpus.aes_t = aes_t
  aecorpus.aes_1 = aes_1
  return aecorpus

aecorpus = get_aes(aes_sentence_dictionary)
print(f"AES partitioned: usable sentences={len(aecorpus.aes)} incl: training={len(aecorpus.aes_1)} test={len(aecorpus.aes_t)} gapped: {len(aecorpus.aes_g)}")

Examples of sentences with gaps:
<s> -1 856314 -1 </s>      [...] [nfr-xpr,pl]-raw-wa-[n-raw] [...]
<s> -1 73330 850814 64830 550077 851446 51990 96090 -1 </s>      [...] [mH] 4 [m-mj,tt] ⸢SAa⸣-m pA [wD] [rs,j-jmn,tj] [...]
<s> -1 851809 78870 33360 850814 -1 </s>      [...] jri.w [n] ⸢jtr,w⸣ [6] [...]
<s> 119600 851427 854537 851446 850814 51990 550077 851446 182830 20570 -1 </s>      [xr] [jr] ⸢Xnw⸣ pA 4 wD ⸢SAa-m⸣ pA [Dw] [jAb,tj] [...]
<s> 21881 10090 78870 32820 10030 859387 856317 -1 64360 182830 64360 114300 64360 141480 64360 66830 -1 64360 158000 64360 30750 81660 851809 851446 850317 550021 10030 32820 142050 10100 91900 86570 181400 </s>      [jw] =[s] [n] [jt(j)] =[j] [raw-Hr,w-Ax,tj-Hai-m-Ax,t] [m-rn≡f-m-Sw-n,tj-m-jtn] [...] [m] ⸢Dw.pl⸣ [m] ⸢xAs,t.pl⸣ m sx,t.pl m mAw,t.pl [...] m StA.pl [m] [(j)x,t] [nb.t] jri.w pA jtn ⸢pAy⸣ =j jt(j) ⸢sxpr⸣ =sn r nHH [D,t]
AES partitioned: usable sentences=79357 incl: training=63877 test=15480 gapped: 22439


In [7]:
# @title Read Ancient Egyptian Dictionary. Click `Show code` to view details. { display-mode: "form" }

def read_aed():
  aed_dict = {}

  with open(aed_dictionary, "r") as dictionary_file:
    dictionary_data = json.load(dictionary_file)
  dictionary_entries = dictionary_data["TEI"]["text"]["body"]["entry"]

  for entry in dictionary_entries:
    xml_id = entry["_xml:id"]
    lemma_id = xml_id[3:]
    entry_translations = [(e["quote"], e["_xml:lang"]) for e in entry["sense"]["cit"] if "quote" and "_xml:lang" and "_type" in e and e["_type"] == "translation"]
    aed_dict[lemma_id] = {
        "form": entry["form"]["orth"],
        "grammarGroup": entry["gramGrp"]["term"],
        "translations": entry_translations
    }

  print('Example entry in the dictionary:')
  print(aed_dict["10"])

  print('The size of the dictionary is:', len(aed_dict))

  unique_lemma_ids = set([token[3] for file in aes_file_contents for sentence in file for token in sentence])
  missing_vocabulary = [lemma_id for lemma_id in unique_lemma_ids if lemma_id not in aed_dict.keys()]
  missing_vocabulary_count = len(missing_vocabulary) - 1 # account for lemma_id -1 for lacunae
  print('The number of missing vocabulary is:', missing_vocabulary_count)

  vocabulary_items_with_en_translations = {k: v for k, v in aed_dict.items() if any(translation[1] == 'en' for translation in v["translations"])}
  print('The size of the English-only dictionary:', len(vocabulary_items_with_en_translations))
  print('Portion of vocabulary in the dictionary with English translations:', len(vocabulary_items_with_en_translations)/len(aed_dict))
  return aed_dict
aecorpus.aed = read_aed()

Example entry in the dictionary:
{'form': 'ꜣj.wj', 'grammarGroup': 'substantive/substantive_masc', 'translations': [('Doppelverband, Kreuzverband (?) (med.)', 'de'), ('pair of bandages (med.)', 'en')]}
The size of the dictionary is: 35052
The number of missing vocabulary is: 0
The size of the English-only dictionary: 16971
Portion of vocabulary in the dictionary with English translations: 0.484166381376241


Create English translations where applicable and attach them to AES corpus.

In [8]:
# @title Add English Translations to Ancient Egyptian Sentences. Click `Show code` to view details. { display-mode: "form" }
from textblob import TextBlob
from textblob.exceptions import NotTranslated, TranslatorError
import json
ge_en = dict() # A small dictionary for mappings texts to English
ge_en_changed = False
ge_en_new_entries = 0

if os.path.exists("ge_en_dictionary.json"):
  with open('ge_en_dictionary.json') as f:
    ge_en = json.load(f)

def new_translation_from_de(de):
  blob = TextBlob(de)
  try:
    res = blob.translate(from_lang='de', to='en')
  except NotTranslated:
    res = de
  except TranslatorError:
    res=" "

  global ge_en
  ge_en[de] = str(res)
  global ge_en_changed
  ge_en_changed = True
  global ge_en_new_entries
  ge_en_new_entries += 1
  # If added over 2000 entries it is good to store the database.
  if ge_en_new_entries >= 2000:
    with open('ge_en_dictionary.json', 'w') as f:
      json.dump(ge_en, f)
      print(f"Stored updated ge/en dictionary: {len(ge_en)} entries")
    ge_en_new_entries = 0
    ge_en_changed = False
  return str(res)

def add_english_translations(aes_dictionary, aed_dictionary):
  translations = 0
  translations_full = 0
  translations_full_lookup = 0
  translations_part = 0
  translations_part_lookup = 0
  en_translations = dict()
  de_translations = dict()
  for id,ent in aed_dictionary.items():
    t = list(filter(lambda translation: translation[1] == 'en', ent["translations"]))
    if len(t) > 0:
      en_translations[id] = str(t[0][0])
      translations += 1
    t_de = list(filter(lambda translation: translation[1] == 'de', ent["translations"]))
    if len(t) > 0:
      de_translations[id] = str(t_de[0][0])
  print(f"Using dictionary with {len(en_translations.items())} translations")
  translations = 0
  for id, sentence in tqdm_notebook(aes_sentence_dictionary.items(),desc="Translating"):
    if 'translation' in sentence and len(sentence['translation']) > 1:
      # Found german translation, using it.
      de_translations = sentence['translation']
      if de_translations in ge_en:
        sentence["en"] = ge_en[de_translations]
        if translations_full_lookup < 5:
          print('FULL', sentence["text"], sentence["sentence_transliteration"], sentence["en"])
        translations_full_lookup +=1
      else:
        res = new_translation_from_de(de_translations)
        sentence["en"] = res
        if translations_full < 5:
          print('FULL', sentence["text"], sentence["sentence_transliteration"], sentence["en"])
        translations_full += 1
      continue # Translated full sentence
    for token in sentence["tokens"]:
      lemma_id = token[3]
      if lemma_id not in en_translations:
        if lemma_id in de_translations:
          de_translation = de_translations[lemma_id]
          if de_translation in ge_en:
            en_translations[lemma_id] = ge_en[de_translation]
            if translations_part_lookup < 5:
              print('PART', sentence["text"], sentence["sentence_transliteration"], "PART:", en_translations[lemma_id])
            translations_part_lookup += 1
            continue
          else:
            res = new_translation_from_de(de_translation)
            en_translations[lemma_id] = res
            if translations_part < 5:
              translations_part += 1
              print('PART', sentence["text"], sentence["sentence_transliteration"], "PART:", en_translations[lemma_id])
        break
    else:
      translated_tokens = [en_translations[token[3]] for token in sentence["tokens"]]
      sentence["en"] = "  ".join(translated_tokens)
      if translations < 5:
        print('CONSTRUCTED', sentence["text"], sentence["sentence_transliteration"], sentence["en"])
      translations += 1
  print(f"AES: Used {translations_full + translations_full_lookup} English sentence translation from de")
  print(f"AES: Constructed {translations_part + translations_part_lookup} English sentence part translations from de")
  print(f"AES: Constructed {translations} English translations from dictionary")

add_english_translations(aes_sentence_dictionary, aecorpus.aed)

aecorpus.aesen = list([sentence for sentence in aecorpus.aes if "en" in sentence and "translation" in sentence])
aecorpus.aesen_1 = list([sentence for sentence in aecorpus.aes_1 if "en" in sentence and "translation" in sentence])
aecorpus.aesen_t = list([sentence for sentence in aecorpus.aes_t if "en" in sentence and "translation" in sentence])
aecorpus_readable = list([[sentence['text'], sentence['sentence_transliteration'], sentence['en'], sentence['translation']] for sentence in aecorpus.aesen])

pd.DataFrame(aecorpus_readable, columns=['sentence id', 'transliteration', 'translation', 'de translation'])

# Store updated ge/en dictionary if it has been updated
if ge_en_new_entries >= 5:
  #print(list(ge_en.items())[:10])
  with open('ge_en_dictionary.json', 'w') as f:
    json.dump(ge_en, f)
    print(f"Updated ge/en dictionary: {len(ge_en)} entries")

Using dictionary with 16971 translations


Translating:   0%|          | 0/101796 [00:00<?, ?it/s]

FULL 25IKMZZWRBARVG5WSM7JVGVBDE [...] [nfr-xpr,pl]-raw-wa-[n-raw] [...] ... [Nefer-cheper] -re-Wa [en-re] ...
FULL 25IKMZZWRBARVG5WSM7JVGVBDE [...] [mH] 4 [m-mj,tt] ⸢SAa⸣-m pA [wD] [rs,j-jmn,tj] [...] ... 4 Ellen, [equally] from the [southwestern stele] ...
FULL 25IKMZZWRBARVG5WSM7JVGVBDE [...] jri.w [n] ⸢jtr,w⸣ [6] [...] ... it makes [6] JTr, W lengths ...
FULL 25IKMZZWRBARVG5WSM7JVGVBDE [xr] [jr] ⸢Xnw⸣ pA 4 wD ⸢SAa-m⸣ pA [Dw] [jAb,tj] [...] [As for the] area within these four steles [concerns], starting with the [eastern mountain] ...
FULL 25IKMZZWRBARVG5WSM7JVGVBDE [jw] =[s] [n] [jt(j)] =[j] [raw-Hr,w-Ax,tj-Hai-m-Ax,t] [m-rn≡f-m-Sw-n,tj-m-jtn] [...] [m] ⸢Dw.pl⸣ [m] ⸢xAs,t.pl⸣ m sx,t.pl m mAw,t.pl [...] m StA.pl [m] [(j)x,t] [nb.t] jri.w pA jtn ⸢pAy⸣ =j jt(j) ⸢sxpr⸣ =sn r nHH [D,t] [And it belongs to my father] "[Re-Harachten, who cheers sunlight in his name, which is in the sun]" (= aton), ... [consisting of] mountains and deserts, from Weideland, new territory and ... from tree pla

There are differences between RTC and AES/AED transliterations. In order to investigate the differances, this short function `lemma_id_closest` was written. The function is generally not fast enough to run for all the words, but may prove useful when debugging the differences.

# RTC


In [9]:
# @title Lemmatization for RTC texts (partially incomplete). Click `Show code` to view details. { display-mode: "form" }
# Incomplete conversion from text to lemma form
# Unable to handle cases where there are multiple translations etc.

verbose_lemmatize=False # @param {type:"boolean"}

repl_dict = { 'A': 'ꜣ', 'H': 'ḥ', 'S': 'š', ',': '.', 'D': 'ḏ', 'x': 'ḫ', 'a': 'ꜥ', 's': 'z', 'i': 'i̯' }
rem_list = [ '⸢', '⸣', '[', ']', '(', ')', '{', '}', '<', '>', '⸮', '?' ]
lemmatize_dict = dict()
lemmatize_dict_id = dict()
lemma_id_dict = dict()
def lemmatize(txt):
  global repl_dict
  global rem_list
  global lemmatize_dict
  if txt in lemmatize_dict:
    return lemmatize_dict[txt]
  remtxt = ''
  for char in txt: # Remove '⸢', '⸣', '[', ']' etc.
    if char not in rem_list:
      remtxt += char
  if remtxt in lemmatize_dict:
    return lemmatize_dict[remtxt]
  if remtxt.isnumeric():
    return '1...n'
  out = []
  for char in remtxt:
    if char in repl_dict:
      out.append(repl_dict[char])
    else:
      repl_dict[char] = char
      out.append(char)
  out_str = "".join(out)
  # Some replacements I've seen. Not complete list.
  out_str = out_str.replace('jtn', 'Jtn')
  out_str = out_str.replace('ꜣḫ', 'Ꜣḫ')
  out_str = out_str.replace('.tt', '.tjt')
  lemmatize_dict[txt] = out_str
  return out_str

def get_lemma_id(lemma):
  global lemma_id_dict
  if lemma in lemma_id_dict:
     return lemma_id_dict[lemma]
  remtxt = ''
  for char in lemma: # Remove '⸢', '⸣', '[', ']' etc.
    if char not in rem_list:
      remtxt += char
  if remtxt in lemma_id_dict:
     return lemma_id_dict[remtxt]
  return None

mismatch_c = 0

def add_to_lemmatize_dict(txt, lemma, lemmaid, id, transl, transl2):
  global mismatch_c
  global lemmaize_dict
  if txt in lemmatize_dict:
    if lemma != lemmatize_dict[txt] and mismatch_c < 10 and verbose_lemmatize:
      print('Lemmatization mismatch: ', id, "v", lemmatize_dict_id[txt], txt, lemma, lemmatize_dict[txt])
      print(transl)
      print(transl2)
      mismatch_c += 1
    return
  lemmatize_dict[txt] = lemma
  lemma_id_dict[txt] = lemmaid
  lemmatize_dict_id[txt] = id
  remtxt = ''
  for char in txt:
    if char not in rem_list:
      remtxt += char
  if remtxt in lemmatize_dict:
    return
  lemmatize_dict[remtxt] = lemma
  lemma_id_dict[remtxt] = lemmaid
  lemmatize_dict_id[remtxt] = id

print("Lemmatization db")
add_to_lemmatize_dict('LACUNA', 'LACUNA', -1, 'LACUNA', 'LACUNA', 'LACUNA')
add_to_lemmatize_dict('1...n', '1...n', "850814", '1...n', '1...n', '1...n')

# Processing numbers
def add_numbers_to_lemmatize():
  digits = list([digit for digit in range(1,10)])
  magnitudes = list([pow(10,magnitude) for magnitude in range(7)])
  for num in [digit * magnitude for magnitude in magnitudes for digit in digits]:
    add_to_lemmatize_dict(str(num), '1...n', "850814", '1...n', '1...n', '1...n')
  for sfraction in range(1,9):
    add_to_lemmatize_dict("1/" + str(pow(2, sfraction)), '1...n', "850814", '1...n', '1...n', '1...n')
  for fraction in range(3,9):
    add_to_lemmatize_dict(f"1/{fraction}", '1...n', "850814", '1...n', '1...n', '1...n')
  add_to_lemmatize_dict("2/3", '1...n', "850814", '1...n', '1...n', '1...n')
  add_to_lemmatize_dict("3/4", '1...n', "850814", '1...n', '1...n', '1...n')
  add_to_lemmatize_dict("3/8", '1...n', "850814", '1...n', '1...n', '1...n')
  for fraction in range(1,10):
    add_to_lemmatize_dict("1/" + str(10*fraction), '1...n', "850814", '1...n', '1...n', '1...n')

add_numbers_to_lemmatize()

tr = set()
for sentence in aecorpus.aes:
  for token in sentence['tokens']:
    triplet = (token[3], token[0], token[2])
    tr.add(triplet)
    add_to_lemmatize_dict(triplet[1], triplet[2], triplet[0], sentence['text'], sentence['sentence_transliteration'], [token[2] for token in sentence['tokens']])

print("Built lemmatization database")

Lemmatization db
Built lemmatization database


In [10]:
# @title Load Ramses Transliteration Corpus (RTC). Click `Show code` to view details. [*Ramses Automated Transliterator*](https://gitlab.cnam.fr/gitlab/rosmorse/ramses-trl) by [**Université de Liège/Projet Ramsès**](http://ramses.ulg.ac.be/) & [**Serge Rosmorduc**](http://cedric.cnam.fr/lab/en/author/Rosmorduc/) is licensed under `CC BY-NC-SA` 4.0. { display-mode: "form" }

rtc_cannot_lemmatize = {}
def process_src_line(src):
  chars = src.rstrip('\n')
  return (chars, chars.split(" "))
def process_tgt_line(dst):
  dst = dst.rstrip('\n')
  chars = dst.split()
  chars = "".join(chars)
  chars = chars.replace('_', ' ').rstrip(' ')
  return (chars, chars.split())
count_err = 0
def lemma_id_translate(token):
  s = ""
  if token is None:
    return s
  # There is difference between j/i
  if token in aecorpus.aed:
    t = aecorpus.aed[token]
    if 'translations' in t:
      tr = t['translations']
      if len(tr) > 0 and len(tr[0]) > 0:
        tr1 = tr[:1][:1]
        tr0 = list(filter(lambda translation: translation[1] == 'en', tr))
        if len(tr0) > 0:
          tr1 = tr0
        s += "/" + str(tr1[0][0])
  return s
g2u_map = GardinerToUnicodeMap()
g2u_alt_map={ # Replacements for characters missing somewhere.
    'MISSING': "\uFFFD",
    '': "\u25A1",
    'Ff1': g2u_map.to_unicode_char('Z14')
}
g2u_alt_entries = len(g2u_alt_map)
def g2u(gardiner_list):
  u = []
  for g in gardiner_list.split(" "):
    if g in g2u_alt_map:
      out_char = g2u_alt_map[g]
    else:
      out_char = g2u_map.to_unicode_char(g)
    if out_char is None:
      out_char = "\uFFFD"
      g2u_alt_map[g] = out_char
    else:
      u.append(out_char)
  return " ".join(u)

def read_rtc():
  rtc = []
  rtc_g = []
  rtc_t = []
  rtc_1 = []

  c = 0
  c_found = 0
  model_sentences = []
  full_count = 0
  gaps_count = 0
  total_count = 0
  for i in range(len(rtc_src_files)):
    sentences = []
    print("Processing " + rtc_src_files[i])
    with open(rtc_path + rtc_src_files[i] + rtc_ext, "r") as src_f:
      src_lines = src_f.readlines()
    with open(rtc_path + rtc_tgt_files[i] + rtc_ext, "r") as tgt_f:
      tgt_lines = tgt_f.readlines()
    assert len(src_lines) == len(tgt_lines)
    for src_, tgt_ in zip(src_lines, tgt_lines):
      src, src_list = process_src_line(src_)
      tgt, tgt_list = process_tgt_line(tgt_)
      sentence = {
          'text': tgt,
          'translation': '', # No full sentence translations
          'gardiner': src,
          'gardiner_list': src_list,
          'corpus': rtc_src_files[i],
          'tokens': [],
          'token_list': [], # TODO
          'sentence_transliteration': tgt,
          'gaps': False
      }
      num_found = 0
      tokens_found = True
      print_sentence=False
      # Mapping between gardiner and transliteration would be good.
      # For now the tokens are based on transliteration.
      for idx,tr in enumerate(tgt_list):
        if tr == "//":
          continue # Skip spaces
        # Construct tokens similar to ones with AES.
        tr_ = tr.replace("i", "j").replace("I", "J") # Common difference
        if tr == "---" or tr == "": # Missing symbols have different notation.
          tr_ = "LACUNA"
        trl = lemmatize(tr_)
        trid = get_lemma_id(tr)
        if trid is None:
          trid = get_lemma_id(tr_)
        if trid is None:
          trid = get_lemma_id(trl)

        if len(tr) > 0 and trid is None:
            pattern = r'[^0-9\/[]'
            if re.search(pattern, tr):
              pass
            else:
              # All numbers should be handled
              print(f"Number {tr} not handled")
              global count_err
              count_err += 1
              print_sentence = True
              if count_err > 5:
                raise "Too many errors"
        if trid is None:
          rtc_cannot_lemmatize[tr] = trl
          #print('Cannot lemmatize', tr, trl, trid)
          #print('Closest matches', lemma_id_closest(tr))
          c += 1
        else:
          num_found += 1
        token = (
            tr,
            lemma_id_translate(trid),
            trl, # Rough lemmatization.
            "" if trid is None else trid, # Use lemma id if found
            "" # TODO: Find pos.
        )
        if trid is None:
          tokens_found = False
        sentence["tokens"].append(token)
      sentence["token_list"] = list([token[3] for token in sentence["tokens"]])

      # Check if there are gaps in sentence
      # TODO: Consider partially readable symbols etc.
      if 'LACUNA' in src_list or 'LACUNA' in tgt_list:
        sentence['gaps'] = True
        tokens_found = False
        gaps_count += 1
      sentence['tokens_found'] = tokens_found
      full_count += 1 if tokens_found else 0
      total_count += 1
      if sentence['gaps'] == False and tokens_found: # previously: (tokens_found or num_found > 3):
        if c_found < 5:
          model_sentences.append(sentence)
        if len(model_sentences) < 7 and '5' in sentence['sentence_transliteration'] and '1/8' in sentence['sentence_transliteration']:
          model_sentences.append(sentence)
      c_found += 1
      if sentence['gaps']:
        rtc_g.append(sentence)
      elif i == 1: # Validation set
        rtc.append(sentence)
        rtc_t.append(sentence)
      else:
        rtc.append(sentence)
        rtc_1.append(sentence)
      if print_sentence:
        print("Debug sentence:")
        print("  Text:", sentence["sentence_transliteration"])
        print("  Token_list", sentence["token_list"])
        print("  Gardiner:", sentence["gardiner"])
        print("  Unicode:", g2u(sentence["gardiner"]))
        print_sentence = False

  print("Model sentences")
  for idx, sentence in enumerate(model_sentences):
    print(idx+1)
    print("  Text:", sentence["sentence_transliteration"])
    print("  Token_list", sentence["token_list"])
    #print("  Tokens", sentence["tokens"])
    print("  Gardiner:", sentence["gardiner"])
    print("  Unicode:", g2u(sentence["gardiner"]))

  if g2u_alt_entries < len(g2u_alt_map):
    print(f"Unmapped Gardiner symbols: {len(g2u_alt_map) - g2u_alt_entries}")


  print()

  print(f"All Sentences: {total_count}; fully lemmatized and no gaps: {full_count}")
  #print(f"gapped: {gaps_count}") (already on the next line)
  return (rtc, rtc_1, rtc_t, rtc_g)

print("Reading and processing RTC.")
rtc = read_rtc()
aecorpus.rtc = rtc[0]
aecorpus.rtc_1 = rtc[1]
aecorpus.rtc_t = rtc[2]
aecorpus.rtc_g = rtc[3]
print(f"RTC partitioned: usable sentences={len(aecorpus.rtc)} incl: training={len(aecorpus.rtc_1)} test={len(aecorpus.rtc_t)} gapped: {len(aecorpus.rtc_g)}")

Reading and processing RTC.
Processing src-train
Processing src-val
Model sentences
1
  Text: iw iw =i r swr m =f
  Token_list ['21881', '21881', '10030', '91900', '130360', '64360', '10050']
  Gardiner: M17 Z7 M17 Z7 A1 D21 S29 G36 D21 N35A A2 M17 G17 I9 
  Unicode: 𓇋 𓏲 𓇋 𓏲 𓀀 𓂋 𓋴 𓅨 𓂋 𓈗 𓀁 𓇋 𓅓 𓆑 □
2
  Text: pA hrw n ms pA inw
  Token_list ['851446', '99060', '78870', '74700', '851446', '850830']
  Gardiner: G41 G1 O4 D21 Z7 N5 Z1 N35 MISSING G41 G1 W25 N35 W24 Z7 Y1 Z2 
  Unicode: 𓅯 𓄿 𓉔 𓂋 𓏲 𓇳 𓏤 𓈖 � 𓅯 𓄿 𓏎 𓈖 𓏌 𓏲 𓏛 𓏥 □
3
  Text: pA sS XAr 1/8 1/32 1/40 r 5 1/3
  Token_list ['851446', '144360', '122580', '850814', '850814', '850814', '91900', '850814', '850814']
  Gardiner: G41 G1 Y3 A1 V19 D11 F16 D21 V20 V20 V20 V20 D21 Z2 2 D21 Z2 
  Unicode: 𓅯 𓄿 𓏞 𓀀 𓎅 𓂁 𓄏 𓂋 𓎆 𓎆 𓎆 𓎆 𓂋 𓏥 𓂋 𓏥 □
4
  Text: s 50 6 wa nb XAr ir.w n XAr 10 2 1/8
  Token_list ['147350', '850814', '850814', '400101', '81650', '122580', '851809', '78870', '122580', '850814', '850814', '850814']
  Gardiner: O34 A1 V20 V20 V20 V20 V

# [Word2Vec](https://radimrehurek.com/gensim/models/word2vec.html)

Generate Word2Vec and FastText models, test them and allow user to view some results.

The models are implemented using gensim libraries.


In [11]:
# @title Details: Helper functions. Click `Show code` to view details. { display-mode: "form" }
# @markdown Helper functions are useful to make actual model generation functions easy to read.
# @markdown `print_sentence`, `token_to_text`, `start_pad`, `end_pad`, `is_pad`, `has_pad`, `token_seq`, `token_seq_tr`, `token_seq_rtc`
def print_sentence(sentence):
  print(f'{sentence["text"]}')
  print(f'. tokens: {sentence["token_list"]}')
  print(f'. transliteration: {sentence["sentence_transliteration"]}')
  print(f'. translation: {sentence["translation"]}')
  # Optional English translation (just one sentence here)
  # The sentence dictionary for German to English translations can be used for more.
  if sentence["sentence_transliteration"] == "bn ftt =tw =f":
    print('. It (the inscription on the stele) should not be erased.') # Google Translate (from de)
  elif 'en' in sentence: # Does dictionary lookup translation exist?
    print(f'. en: {sentence["en"]}')

def token_to_text(token):
  s = token
  if token in aecorpus.aed:
    t = aecorpus.aed[token]
    if 'form' in t:
      s += "/" + t['form']
    if 'translations' in t:
      tr = t['translations']
      if len(tr) > 0 and len(tr[0]) > 0:
        tr1 = tr[:1][:1]
        tr0 = list(filter(lambda translation: translation[1] == 'en', tr))
        if len(tr0) > 0:
          tr1 = tr0
        s += "/" + str(tr1[0][0])
  return s

def start_pad(padlen=2):
  if padlen == 2:
    return ['<s0>', '<s1>']
  return [f'<s{i}>' for i in range(padlen)]

def end_pad(padlen=2):
  if padlen == 2:
    return ['</s1>', '</s0>']
  return [f'</s{padlen-i-1}>' for i in range(padlen)]

def is_pad(x):
  return x in start_pad(9) + end_pad(9)

def has_pad(a):
  for x in a:
    if is_pad(x):
      return True
  return False

def token_seq(sentence, replace_numbers=False, padlen=2, nopad=False):
    # Note: replace numbers is no-op here. replacement has already been performed
    token_seq = [token[3] for token in sentence['tokens']] # With lemma id
    if nopad:
      return token_seq
    return start_pad(padlen) + token_seq + end_pad(padlen)

def token_seq_tr(sentence, replace_numbers=False, padlen=2, nopad=False):
    # Note: replace numbers is no-op here. replacement has already been performed
    token_seq = sentence["sentence_transliteration"].split(" ") # Before lemmatization
    if nopad:
      return token_seq
    return start_pad(padlen) + token_seq + end_pad(padlen)

re_fraction=re.compile('^[0-9]+\/[0-9]+$')
def token_seq_rtc(sentence, replace_numbers=False, padlen=2, nopad=False):
    try:
      token_seq = [token[0] for token in sentence['tokens']] # With text - for RTC currently best pick
    except:
      print("Error with: ", sentence)
      raise "Errorred"
    if replace_numbers:
      # Handle numbers
      global re_fraction
      for idx in range(len(token_seq)):
        if token_seq[idx].isnumeric():
          token_seq[idx] = "1...n"
        if re_fraction.match(token_seq[idx]):
          token_seq[idx] = "1...n"
    #token_seq = [token[3] for token in sentence['tokens']] # With lemma id
    if nopad:
      return token_seq
    return start_pad(padlen) + token_seq + end_pad(padlen)


To work in beginning or ending of sentences padding may be useful.
Two pads seemed to work better than 1.

In [12]:
# @title Prediction statistics -- `PredictionStats`. Click `Show code` in the code cell. { display-mode: "form" }
# @markdown
class PredictionStats:
    """Collect statistics regarding correctness of tests"""

    def __init__(self, name):
      self.name = name
      self.c = Counter()
      self.fails = 0
      self.start = time.time()
      self.end = float("nan")

    def start_testing(self):
      self.start = time.time()
      return self
    def end_testing(self):
      self.end = time.time()
      return self
    def miss(self):
      self.c[-1] += 1
    def fail(self):
      self.fails += 1   # Count failures outside N
    def predicted(self, item, predictions):
      if item in predictions:
        idx = predictions.index(item) + 1
        self.c[idx] += 1
      else:
        self.miss()
        idx = -1
      return idx
    def get_name(self):
      return self.name
    def get_n(self):
      # Failures are counted in n.
      return self.c.total() + self.fails
    def get_heading(self, include_mrr5=False):
      if include_mrr5:
        return ['hit', 'hit@5', 'hit@10', 'missed', 'MRR@5', 'MRR@10', 'N', 'untestable', 'ms/test']
      return ['hit', 'hit@5', 'hit@10', 'missed', 'MRR', 'N', 'untestable', 'ms/test']
    def get_stats(self, include_mrr5=False):
      accu = 0
      accu_ = 0
      accu5 = 0
      accu10 = 0
      for r in range(1, 6):
        accu_ += self.c[r]/r
      for r in range(1, 11):
        accu += self.c[r]/r
        accu5 += self.c[r] if r <= 5 else 0
        accu10 += self.c[r]
      n = self.get_n()
      if n < 1:
        n = 1
      if include_mrr5:
        return [
          self.c[1] / n,
          accu5 / n,
          accu10 / n,
          self.c[-1] / n,
          accu_ / n,
          accu / n,
          n,
          self.fails,
          ((self.end - self.start) * 1000) / n
        ]
      return [
          self.c[1] / n,
          accu5 / n,
          accu10 / n,
          self.c[-1] / n,
          accu / n,
          n,
          self.fails,
          ((self.end - self.start) * 1000) / n
      ]

# These variables collect statistics
prs = [None] * 2
stats = dict()

In [13]:
# @title Prediction Function `predict_words`. Use model(s) to predict word. Click `Show code` in the code cell. { display-mode: "form" }
# @markdown Prediction function has restrictions as follows:
# @markdown
# @markdown - It can only predict one part of text.
# @markdown - It can only work on single word missing parts.

def predict_words(tokens,model,model_score=None,lacuna="LACUNA",verbose=False,omitted=None,vocab=None,algo=0,topn=10,padlen=2,stat_append=None,stat_id=None,nopad=False):
  """Predict missing word(s) from sentence.

    Parameters
    ------------
        tokens: list of str
            Sentence (token sequence) to predict (including start_pad() and end_pad())
        model: Word2Vec model
            Model used for predicting with predict_output_word
        model_score: Word2Vec model
            Model used for scoring with score
        lacuna: str
            Placeholder for missing word
        verbose: bool
            Perform debug prints (default=False)
        omitted: str
            Provide omitted token (only needed for verbose mode debug prints)
        vocab: list of str or None
            Possible words - if none, all words are possible. (only for algo=2)
        algo: int [-1, 2]
            Select algorithm to perform prediction
        topn: int
            Select how many result to return (default=10)

    Return
    -----------
        predictions: list of tuples (str, float)
            List of predictions in the best first order with weights.

        May raise exceptions on exceptional situations.

    Bugs
    ------------
        current implementation is able to predict only one missing word.
  """
  if vocab is None:
    if hasattr(model, 'wv'):
      vocab = model.wv.index_to_key

  try:
    idx = tokens.index(lacuna)
  except ValueError:
    return [(1.0), " ".join(tokens)]*topn

  predict = predict_words_internal(idx, tokens,model,model_score,vocab,algo,topn,padlen,nopad)
  if verbose:
    print(f"OMITTED = {omitted}")
    for i, p in enumerate(predict):
      if p[0] == omitted:
        print(f"PREDICTED {i} {token_to_text(p[0])} (prob: {p[1]}) match [rank={i+1}]")
      else:
        if hasattr(model, 'wv'):
          similarity = model.wv.n_similarity(omitted, p[0])
        else:
          similarity = 1.0 - i / topn # No similarity function
        print(f"PREDICTED {i} {token_to_text(p[0])} (prob: {p[1]}) similarity with orig: {similarity}")
  if stat_append is not None:
    tokens_ = filter(lambda t: not has_pad(t), tokens)
    simi=[]
    hit_idx = None
    for i, p in enumerate(predict):
      if hasattr(model, 'wv'):
        similarity = model.wv.n_similarity(omitted, p[0])
      else:
        similarity = 1.0 - i / topn # No similarity function
      simi.append([p[0],p[1],similarity])
      if similarity == 1:
        hit_idx = i + 1
    stat_append.append((stat_id, tokens_, omitted, hit_idx, tuple([p[0] for p in predict]), simi))
  return predict

# Underlying worker function
def predict_words_internal(idx, tokens,model,model_score=None,vocab=None,algo=0,topn=10,padlen=2,nopad=False):
  token_seq_before = tokens[:idx]
  token_seq_after = tokens[idx + 1:]

  if algo == -1:
    # Simply predict top vocabulary words.
    i = 0
    while is_pad(vocab[i]):
      i = i + 1
    return [(vocab[i+l], 1/(l+1)) for l in range(0, 10)]
  if algo == 0:
    if len(token_seq_before) > 0 and len(token_seq_after) > 0:
      if len(token_seq_before) > len(token_seq_after):
        token_seq_before = token_seq_before[-len(token_seq_after):]
      elif len(token_seq_before) < len(token_seq_after):
        token_seq_after = token_seq_after[:len(token_seq_before)]
      maxpad=4
      predict = model.predict_output_word(token_seq_before + token_seq_after,topn=topn+maxpad)
      # Filter predictions, remove padding symbols
      predict_out = []
      for x in predict:
        if not is_pad(x[0]) and x[0] is not None:
          predict_out.append(x)
        if len(predict_out) == topn:
          break
      return predict_out
    else:
      return ValueError("Input not supported: no before or after tokens")
  if algo == 1:
    if len(token_seq_before) > 0 and len(token_seq_after) > 0:
      if len(token_seq_before) > len(token_seq_after):
        token_seq_before = token_seq_before[-len(token_seq_after):]
      elif len(token_seq_before) < len(token_seq_after):
        token_seq_after = token_seq_after[:len(token_seq_before)]
      predict = model.predict_output_word(token_seq_before + token_seq_after,topn=topn*10)
      sentences = []
      for pr in predict:
        tokens = token_seq_before + [pr[0]] + token_seq_after
        if nopad:
          tokens = tokens[len(start_pad(padlen)):len(tokens) - len(end_pad(padlen))]
        sentences.append(" ".join(tokens))
      scores = model_score.score(sentences)
      adjusted_predictions=[(predict[i][0], scores[i]) for i in range(len(scores))]
      adjusted_predictions.sort(key = lambda x: -x[1])
      predict = adjusted_predictions[:topn]
      return predict
    else:
      return ValueError("Input not supported: no before or after tokens")
  if algo == 2:
    sentences = []
    for token in vocab:
      tokens = token_seq_before + [token] + token_seq_after
      if nopad:
        tokens = tokens[len(start_pad(padlen)):len(tokens) - len(end_pad(padlen))]
      sentences.append(" ".join(tokens))
    scores = model_score.score(sentences)
    adjusted_predictions=[(vocab[i], scores[i]) for i in range(len(scores))]
    # TODO: Quicker sorting here
    adjusted_predictions.sort(key = lambda x: -x[1])
    predict = adjusted_predictions[:topn]
    return predict
  raise ValueError("algo is not supported")



In [14]:
# @title Define Model Generation Progress Class. Click `Show code` in the code cell. { display-mode: "form" }
# @markdown

class TqdmModelProgress(CallbackAny2Vec):
     '''Callback to log information about training'''

     def __init__(self, epochs):
         self.epoch = 0
         self.epochs = epochs
         self.tqdm = tqdm_notebook(desc="Starting", total=self.epochs,
                                   unit="epoch", leave=False)

     def on_epoch_begin(self, model):
         if self.epoch == 0:
             self.tqdm.desc = "Training"

     def on_epoch_end(self, model):
         self.epoch += 1
         if self.epoch == self.epochs:
             self.tqdm.description = "Finished"
         loss = model.get_latest_training_loss()
         if loss is not None:
             self.tqdm.set_postfix({'loss=': loss})
         self.tqdm.update(1)

     def close(self):
         self.tqdm.close()

In [15]:
# @title Define Model Testing Functions
# @markdown `test_w2v` for testing AES with lemma id. Use `seq` parameter to override `token_seq`to test AES/TR or RTC.
# @markdown Alternatively you may use convenience wrappers `test_w2v_tr` and `test_w2v_rtc`.
def test_w2v(name, w2v, sentences, silent=False, padlen=2, algo=0, seq=token_seq, omitted_indexes=None):
  pr = PredictionStats(name)
  spadlen = padlen
  fpadlen = padlen * 2
  random.seed(42)
  if silent == False:
    print("Processing test sentences with single hidden word")
  c=0
  idx=0
  for sentence in sentences:
    tokens = seq(sentence,padlen=padlen)
    if len(tokens) == fpadlen:
      pr.fail() # Failed processing
      continue
    if omitted_indexes is None:
      token_omitted = random.randrange(len(tokens) - fpadlen) + spadlen
    else:
      token_omitted = omitted_indexes[idx] + spadlen
      idx = idx + 1
    token_orig = tokens[token_omitted]
    tokens[token_omitted] = "LACUNA"
    verbose = False
    if c < 5 and silent == False:
      verbose = True
    if c == 5 and silent == False:
        print("Processing remaining sentences quietly")
    c += 1

    try:
      predict = predict_words(tokens, w2v, None, verbose=verbose, omitted=token_orig, algo=algo, padlen=padlen)
      if len(predict) == 0:
        raise Exception("No predictions")
      try:
        pr.predicted(token_orig, [x[0] for x in predict])
      except:
        pr.fail() # Failed processing predictions
    except:
      pr.fail() # Failed processing predicting

  return pr.end_testing()

def test_w2v_tr(name, w2v, sentences, silent=False, padlen=2, algo=0):
  return test_w2v(name, w2v, sentences, silent, padlen, algo, seq=token_seq_tr)

def test_w2v_rtc(name, w2v, w2v_hs=None, algo=0, sentences=aecorpus.rtc_t, padlen=2, silent=False, nopad=False):
  return test_w2v(name, w2v, sentences, silent, padlen, algo, seq=token_seq_rtc)



In [16]:
# @title Model Training function `get_gensim_gen`

# @markdown The function automates storage and retrieval of models.
# @markdown
# @markdown Update model id when inputs or model settings are updated.
#
gensim_model_prefix="2024-03-13a" # @param {type:"string"}
gensim_model_store = True # @param {type:"boolean"}
all_models_pregenerated = True # @param {type:"boolean"}

def get_gensim_gen(corpus, vector_size=200, epochs=100, padlen=2,
                   silent=False, token_seq_func = token_seq, nobar=False,
                   model = gensim.models.Word2Vec, id=None,
                   noload=False, nosave=False, test=None, testlimit=0.1,
                   prev_model = None,
                   **kwargs):
  global gensim_model_id
  global gensim_model_store
  if id is None:
    id = model.__class__.__name__
  filepath = f"models/{gensim_model_prefix}-{id}.model"
  # If all_models_pregenerated is set we expect to
  # find all models encountered in processing and
  # raise error if model is not found.
  if all_models_pregenerated == True:
    if not os.path.exists(filepath):
      raise(Exception(f"Model path {filepath} does not exist"))
  if gensim_model_store and not noload and os.path.exists(filepath):
    model_out = model.load(filepath)
    if test is not None:
      if test(model_out) < testlimit:
        raise Exception("Loaded Model failed to validate")
    return model_out

  c, cn, cn2 = (0, 0, 0)
  sentences_list = []
  token_kwargs = {key:kwargs[key] for key in ['nopad', 'replace_numbers'] if key in kwargs}
  for sentence in corpus:
    tokens = token_seq_func(sentence, padlen=padlen, **token_kwargs)
    if silent == False:
      if c < 5:
        print(tokens)
        c += 1
      elif cn < 2 and '1...n' in tokens:
        print(tokens)
        cn += 1
      elif cn2 < 2 and '123' in tokens:
        print(tokens)
        cn2 += 1
    sentences_list.append(tokens)
  in_kwargs = dict(kwargs)
  if 'window' not in in_kwargs:
    in_kwargs['window']=5
  if 'min_count' not in in_kwargs:
    in_kwargs['min_count']=1
  if 'replace_numbers' in in_kwargs:
    del in_kwargs['replace_numbers']
  if 'nopad' in in_kwargs:
    del in_kwargs['nopad']
  callbacks = []
  if not nobar:
    callbacks = [TqdmModelProgress(epochs)]

  if prev_model is None:
    model_out = model(sentences=sentences_list, vector_size=vector_size,
                      workers=10, epochs=epochs, callbacks=callbacks,
                      **in_kwargs)
  else:
    #model_out = prev_model.train(sentences=sentences_list, vector_size=vector_size,
    #                  workers=10, epochs=epochs-prev_model.epochs, callbacks=callbacks,
    #                  **in_kwargs)
    # Some arguments only apply in creation of model.
    if 'vector_size' in in_kwargs:
      del in_kwargs['vector_size']
    model_out = prev_model.train(corpus_iterable=sentences_list,
                      epochs=epochs, callbacks=callbacks,
                      **in_kwargs)

  if test is not None:
    if test(model_out) < testlimit:
      raise Exception("Trained model failed to validate")
    if gensim_model_store and not nosave:
      model_out.save(filepath)
  callbacks[0].close()
  return model_out

def get_gensim_ft_rtc(corpus, **kwargs):
  return get_gensim_gen(corpus, model=gensim.models.FastText,
                        token_seq_func = token_seq_rtc, **kwargs)

def get_gensim_ft_aes(corpus, **kwargs):
  return get_gensim_gen(corpus, model=gensim.models.FastText,
                        token_seq_func = token_seq_tr, **kwargs)




In [17]:
# @title Load or train models and perform testing. { display-mode: "form" }
# @markdown The trained Word2Vec models are stored on Google drive.
# @markdown Optional trained model is FastText (FT) model. It cannot be
# @markdown stored, but are always retrained. All models (loaded and trained)
# @markdown are tested and test results are stored for later processing or display.
# @markdown
# @markdown When all models are retrained expect execution of the cell to take
# @markdown around 20 minutes. If only FastText models are retrained, expect
# @markdown execution to succeed in 6-7 minutes.
# @markdown If no models are retrained this stage runs quickly.

train_ft=False # @param {type:"boolean"}
train_unoptimized_variants=False  # @param {type:"boolean"}

def store_stats(pr):
  stats[pr.get_name()] = pr
  values = pr.get_stats()
  return values[4]
for x in tqdm_notebook(range(7), desc="Model", leave=False):
  if x == 0:
    opts_known_good_150 = {'ns_exponent': 0.21821168418246245, 'negative': 21, 'sg': 1,
                   'vector_size': 227, 'window': 2, 'min_count': 4,
                   'shrink_windows': False, 'compute_loss': True, 'epochs': 150}
    w2v = get_gensim_gen(aecorpus.aes_1, token_seq_func = token_seq,
                         silent=True, id="W2V-AES-alt",
                         test=lambda model: store_stats(test_w2v("W2V AES", model, aecorpus.aes_t, silent=True)),
                         **opts_known_good_150) # Takes around 2 minutes
  if x == 1 and train_unoptimized_variants:
    opts_nontested_p0 = {'ns_exponent': 0.21821168418246245, 'negative': 21, 'sg': 1,
                         'vector_size': 227, 'window': 2, 'min_count': 1,
                         'shrink_windows': False, 'compute_loss': True, 'epochs': 15}
    w2vp0 = get_gensim_gen(aecorpus.aes_1, token_seq_func = token_seq,
                           silent=True, padlen=0, nopad=True, id="W2V-AES-p0v1.2",
                           test=lambda model: store_stats(test_w2v("W2V-p0 AES", model, aecorpus.aes_t, padlen=0, silent=True)),
                           **opts_nontested_p0) # Takes around 2 minutes
  if x == 2 and train_unoptimized_variants:
    opts_good = {'ns_exponent': 0.21821168418246245, 'negative': 21, 'sg': 1,
                   'vector_size': 227, 'window': 2, 'min_count': 4,
                   'shrink_windows': False, 'compute_loss': True, 'epochs': 15}

    w2vp1 = get_gensim_gen(aecorpus.aes_1, token_seq_func = token_seq,
                           silent=True, padlen=1, id="W2V-AES-p1",
                           test=lambda model: store_stats(test_w2v("W2V-p1 AES", model, aecorpus.aes_t, padlen=1, silent=True)),
                           **opts_good) # Takes around 2 minutes
  if x == 3 and train_unoptimized_variants:
    opts_good_p3 = {'ns_exponent': 0.30145585, 'negative': 26, 'sg': 1, 'vector_size': 172, 'window': 2, 'shrink_windows': False, 'compute_loss': True, 'epochs': 15}
    w2vp3 = get_gensim_gen(aecorpus.aes_1, token_seq_func = token_seq,
                           silent=True, padlen=3, id="W2V-AES-p3",
                           test=lambda model: store_stats(test_w2v("W2V-p3 AES", model, aecorpus.aes_t, padlen=3, silent=True)),
                           **opts_good_p3) # Takes around 2 minutes
  if x == 4:
    optimized_w2v_opts = {'ns_exponent': 0.05, 'negative': 21, 'sg': 0, 'vector_size': 200,
                      'window': 2, 'sample': 0.01, 'shrink_windows': True,
                      'compute_loss': True, 'epochs': 150, 'compute_loss': True}
    w2v_rtc = get_gensim_gen(aecorpus.rtc_1, token_seq_func = token_seq_rtc, silent=True, id="W2V RTC-optimized",
                             test=lambda model: store_stats(test_w2v_rtc("W2V RTC", model, None, 0, aecorpus.rtc_t, silent=True)),
                             **optimized_w2v_opts) # Takes around 20 minutes
  if x == 5 and train_ft:
    opts_good_ft = {'ns_exponent': 0.21821168418246245, 'negative': 21,
                    'sg': 1, 'vector_size': 227, 'window': 2, 'min_count': 4,
                    'shrink_windows': False, 'silent': True, 'epochs': 15}
    ft_rtc = get_gensim_ft_rtc(aecorpus.rtc_1, id="ft_rtc", nosave=True, noload=True,
                               test=lambda model: store_stats(test_w2v_rtc("FT RTC", model, None, 0, aecorpus.rtc_t, silent=True)),
                               **opts_good_ft) # Takes around 1,5 minutes
    #todo: try more variants like
    #ft_rtc_nopad = get_gensim_ft_rtc(aecorpus.rtc_1, id="ft_rtc", nopad=True, **opts_good_ft) # Takes around 1,5 minutes
  if x == 6:
    opts_aes_tr_150 = {'ns_exponent': 0.21821168418246245, 'negative': 21, 'sg': 1,
                       'vector_size': 227, 'window': 2, 'min_count': 4,
                       'shrink_windows': False, 'compute_loss': True, 'epochs': 150}
    w2v_tr = get_gensim_gen(aecorpus.aes_1, token_seq_func = token_seq_tr,
                         silent=True, id="W2V-AES-TR-alt",
                         test=lambda model: store_stats(test_w2v_tr("W2V AES TR", model, aecorpus.aes_t, silent=True)),
                         **opts_aes_tr_150) # Takes around 20 minutes

print("Trained Models are now available: " + (", ".join(stats.keys())))


Model:   0%|          | 0/7 [00:00<?, ?it/s]

Trained Models are now available: W2V AES, W2V RTC, W2V AES TR


In [18]:
#@title Train optional additional models

#@markdown The optional models test performance of different parameter combinations.
#@markdown
#@markdown The optional models are not part of the release, so they will be trained if additional model training is requested.
additional_experimental_models = False # @param {type:"boolean"}

out = widgets.Output(layout={'border': '1px solid black'})
display(out)

def stats_show(stats):
  row_names = stats.keys()
  ps = [pr.get_stats(include_mrr5=True) for pr in stats.values()]
  column_names = [ '&', 'hit@1', '&', 'hit@5', '&', 'hit@10', '&', 'MRR@10', '&', 'Processed sentences', '\\\\' ]
  dfdata = np.row_stack([('&', f'{x[0]:.6f}', '&', f'{x[1]:.6f}', '&', f'{x[2]:.6f}', '&', f'{x[5]:.6f}', '&', f'{x[6]}', '\\\\') for x in ps])
  pd.set_option('display.max_rows', 1000)
  pd.set_option('display.min_rows', 1000)
  text_stats_df = pd.DataFrame(dfdata, columns=column_names, index=row_names)
  text_stats_df.index = row_names
  out.clear_output(wait=True)
  with out:
    print(text_stats_df)

stats_new = dict()
def store_stats_print(pr):
  stats[pr.get_name()] = pr
  stats_new[pr.get_name()] = pr
  values = pr.get_stats()
  stats_show(stats_new)
  return values[4]

# Optimized parameters from two round optimization process with smallers sentence sets.
optimized_w2v_opts = {'ns_exponent': 0.05, 'negative': 21, 'sg': 0, 'vector_size': 200,
                      'window': 2, 'sample': 0.01, 'shrink_windows': True,
                      'compute_loss': True, 'epochs': 150, 'compute_loss': True}

optimized_w2v_sg_opts = {'ns_exponent': 0.05, 'negative': 21, 'sg': 1, 'vector_size': 200,
                         'window': 2, 'sample': 0.01, 'shrink_windows': True,
                         'compute_loss': True, 'epochs': 150, 'compute_loss': True}

if additional_experimental_models:
  # Compare against basic models and previous experimental models
  for x in ["W2V AES TR", "W2V AES TR-alt",
            "W2V AES TR-optimized",
            "W2V AES TR-optimized-sg",
            "W2V AES", "W2V AES-alt", "W2V AES-optimized", "W2V AES-optimized-sg",
            "W2V RTC", "W2V RTC-alt", "W2V RTC-optimized", "W2V RTC-optimized-sg"]:
    if x in stats:
      store_stats_print(stats[x])
  for x in tqdm_notebook(range(7, 8), desc="Model", leave=False):
    if x == 2:
      opts_known_good_150 = {'ns_exponent': 0.21821168418246245, 'negative': 21, 'sg': 1,
                   'vector_size': 227, 'window': 2, 'min_count': 4,
                   'shrink_windows': False, 'compute_loss': True, 'epochs': 150}
      w2v_alt = get_gensim_gen(aecorpus.aes_1, token_seq_func = token_seq,
                           silent=True, id="W2V-AES-alt",
                           test=lambda model: store_stats_print(test_w2v("W2V AES-alt", model, aecorpus.aes_t, silent=True)),
                           **opts_known_good_150) # Takes around 20 minutes
    if x == 6:
      w2v_alt2 = get_gensim_gen(aecorpus.aes_1, token_seq_func = token_seq,
                           silent=True, id="W2V-AES-optimized",
                           test=lambda model: store_stats_print(test_w2v("W2V AES-optimized", model, aecorpus.aes_t, silent=True)),
                           **optimized_w2v_opts) # Takes around 20 minutes
    if x == 7:
      w2v_alt3 = get_gensim_gen(aecorpus.aes_1, token_seq_func = token_seq,
                           silent=True, id="W2V-AES-optimized-sg",
                           test=lambda model: store_stats_print(test_w2v("W2V AES-optimized-sg", model, aecorpus.aes_t, silent=True)),
                           **optimized_w2v_sg_opts) # Takes around 20 minutes
    if x == 1:
      opts_good_rtc = {
        'ns_exponent': 0.17955186844534993, 'negative': 22, 'sg': 1,
        'vector_size': 259, 'window': 2, 'min_count': 2,
        'epochs': 150}
      w2v_rtc_alt = get_gensim_gen(aecorpus.rtc_1, token_seq_func = token_seq_rtc, silent=True, id="W2V-RTC-alt",
                                   test=lambda model: store_stats_print(test_w2v_rtc("W2V RTC-alt", model, None, 0, aecorpus.rtc_t, silent=True)),
                                   **opts_good_rtc) # Takes around 20 minutes
    if x == 4:
      w2v_rtc_alt2 = get_gensim_gen(aecorpus.rtc_1, token_seq_func = token_seq_rtc, silent=True, id="W2V-RTC-optimized",
                                   test=lambda model: store_stats_print(test_w2v_rtc("W2V RTC-optimized", model, None, 0, aecorpus.rtc_t, silent=True)),
                                   **optimized_w2v_opts) # Takes around 20 minutes
    if x == 5:
      w2v_rtc_alt3 = get_gensim_gen(aecorpus.rtc_1, token_seq_func = token_seq_rtc, silent=True, id="W2V-RTC-optimized-sg",
                                   test=lambda model: store_stats_print(test_w2v_rtc("W2V RTC-optimized-sg", model, None, 0, aecorpus.rtc_t, silent=True)),
                                   **optimized_w2v_sg_opts) # Takes around 20 minutes
    if x == 0:
      opts_aes_tr_150 = {'ns_exponent': 0.21821168418246245, 'negative': 21, 'sg': 1,
                   'vector_size': 227, 'window': 2, 'min_count': 4,
                   'shrink_windows': False, 'compute_loss': True, 'epochs': 150}
      w2v_tr_alt = get_gensim_gen(aecorpus.aes_1, token_seq_func = token_seq_tr,
                         silent=True, id="W2V-AES-TR-alt",
                           test=lambda model: store_stats_print(test_w2v_tr("W2V AES TR-alt", model, aecorpus.aes_t, silent=True)),
                           **opts_aes_tr_150) # Takes around 20 minutes
    if x == 3:
      w2v_tr_alt3 = get_gensim_gen(aecorpus.aes_1, token_seq_func = token_seq_tr,
                           silent=True, id="W2V-AES-TR-optimized",
                           test=lambda model: store_stats_print(test_w2v_tr("W2V AES TR-optimized", model, aecorpus.aes_t, silent=True)),
                           **optimized_w2v_sg_opts) # Takes around 20 minutes



Output(layout=Layout(border='1px solid black'))




# Use MaReTe transliterations

There are combined transliterations, using both AES and Ramses sentences.
These use slightly different transliteration than AES MdC so use different models.

In [19]:
#@title MaReTe transliterations Word2Vec model

# The cell needs to be cleaned up, but works.
# Remove exact same sentences from testing data than
# are present in training data. Default is false as
# some overlap is to be expected.
filter_duplicates=False # @ param {type:"boolean"}
w2v_tr_combined_variants=1 # @ param{type:"int"}

def sentence_transform_in(sentence):
  return {"sentence_transliteration": sentence}

def token_seq_out(sentence, replace_numbers=False, padlen=2, nopad=False):
    # Note: replace numbers has not been implemented
    token_seq = sentence["sentence_transliteration"].split(" ")
    if nopad:
      return token_seq
    return start_pad(padlen) + token_seq + end_pad(padlen)

# Read test materials
with open("combined_dev.txt", "r") as train_file:
  marete_train_file_lines = train_file.readlines()
  marete_train_file_lines = [x.replace("\n", "") for x in marete_train_file_lines]

with open("combined_test.txt", "r") as test_file:
  marete_test_file_lines = test_file.readlines()
  marete_test_file_lines = [x.replace("\n", "") for x in marete_test_file_lines]

marete_train_sentences_in = [sentence_transform_in(sentence) for sentence in marete_train_file_lines]
if filter_duplicates:
  marete_test_sentences_in = [sentence_transform_in(sentence) for sentence in marete_test_file_lines
                                if sentence not in marete_train_file_lines]
else:
  marete_test_sentences_in = [sentence_transform_in(sentence) for sentence in marete_test_file_lines]

for x in marete_train_file_lines:
  if '\n' in x:
    print("Found linefeed in marete_train_file_lines")
  if '\xA0' in x:
    print("Found nonbreaking space in marete_train_file_lines")
for x in marete_test_file_lines:
  if '\n' in x:
    print("Found linefeed in marete_test_file_lines")
  if '\xA0' in x:
    print("Found nonbreaking space in marete_test_file_lines")

filtered_tests = len(marete_test_file_lines) - len(marete_test_sentences_in)
print(f"Using {len(marete_train_sentences_in)} training sentences and {len(marete_test_sentences_in)} test sentences. {filtered_tests} filtered out.")

marete_train_sentences_in_q=marete_train_sentences_in[::10]
marete_test_sentences_in_q=marete_test_sentences_in[::10]

def stats_show(stats):
  row_names = stats.keys()
  ps = [pr.get_stats(include_mrr5=True) for pr in stats.values()]
  column_names = [ '&', 'hit@1', '&', 'hit@5', '&', 'hit@10', '&', 'MRR@10', '&', 'Processed sentences', '\\\\' ]
  dfdata = np.row_stack([('&', f'{x[0]:.6f}', '&', f'{x[1]:.6f}', '&', f'{x[2]:.6f}', '&', f'{x[5]:.6f}', '&', f'{x[6]}', '\\\\') for x in ps])
  pd.set_option('display.max_rows', 1000)
  pd.set_option('display.min_rows', 1000)
  text_stats_df = pd.DataFrame(dfdata, columns=column_names, index=row_names)
  text_stats_df.index = row_names
  print(text_stats_df)

w2v_tr_combined_array = dict()
w2v_tr_combined = None
w2v_tr_combined_score = -1
w2v_tr_combined_stats = dict()

w2v_tr_combined_data = []
w2v_tr_combined_opts = None
w2v_tr_opts = dict()
w2v_tr_id = dict()

# A table of results used to form set of models to try, currently 2 entries.
# Item #1 is optimized using optimization process outlayed in the doc.
resprev = """
1      0.050000        21   0     150          200      2          False  0.010000 4  1.0
2          0.1         21   0     150          200      5           True  0.01  4     0.5
""".split()
resprev = [
    (float(resprev[i + 10]),
     [float(resprev[i + 1]), int(resprev[i + 2]),
      int(resprev[i + 3]), int(resprev[i + 4]), int(resprev[i + 5]),
      int(resprev[i + 6]), bool(resprev[i + 7]), float(resprev[i + 8]), int(resprev[i + 9])],
     int(resprev[i])) for i in range(0,len(resprev),11)]
resprev.sort(key=lambda element: (-element[0], element[2], element[1]))
for test in resprev:
  ns_exponent, negative, sg, epochs, vector_size, window, shrink_window, sample, min_count = test[1]
  t=(ns_exponent, negative, sg, epochs, vector_size, window, shrink_window, sample, min_count)
  if t in w2v_tr_opts:
    continue
  opts = {'ns_exponent': ns_exponent, 'negative': negative, 'sg': sg,
          'vector_size': vector_size, 'window': window,
          'sample': sample,
          'shrink_windows': (shrink_window == 1), 'compute_loss': True, 'epochs': epochs}
  w2v_tr_opts[t] = opts
  w2v_tr_id[t] = f"W2V TR nse={ns_exponent}-neg={negative}-{'sg' if sg else 'cbow'}-epochs={epochs}-vs={vector_size}-w={window}-{'shrink' if shrink_window == 1 else 'static'}-sample={sample}-min={min_count}"
  if len(w2v_tr_opts) >= w2v_tr_combined_variants:
    break

res = None
def get_stats(pr):
  global res
  values = pr.get_stats()
  res = pr
  return values[4]

for t, opts in tqdm_notebook(w2v_tr_opts.items(), desc="Variants"):
  id = w2v_tr_id[t]
  short_id = "W2V-TR-combined-2.01"
  w2v_tr_combined_array[t] = get_gensim_gen(marete_train_sentences_in, token_seq_func = token_seq_out,
                                            silent=True, id=short_id, **opts, # Takes around 2 minutes (testing) + 20 minutes training on the first run
                                            test=lambda model: get_stats(test_w2v(short_id, model, marete_test_sentences_in, seq=token_seq_out, silent=True)))
  #w2v_tr_combined_array[t] = get_gensim_gen(aecorpus.aes_1, token_seq_func = token_seq_tr,
  #                                          silent=True, id="W2V-AES-TR-combined-1.5", noload=True,
  #                                          nosave = True, **opts) # Takes around 2 minutes
  #res = test_w2v(id, w2v_tr_combined_array[t], aecorpus.aes_t, seq=token_seq_tr, silent=True)
  mrr = res.get_stats()[4]
  w2v_tr_combined_stats[id] = res
  # Keep track of best (incase resprev has multiple variants)
  if mrr > w2v_tr_combined_score:
    w2v_tr_combined_score = mrr
    w2v_tr_combined = w2v_tr_combined_array[t]
    w2v_tr_combined_opts = opts
  w2v_tr_combined_data.append([*t,mrr])
pd.set_option('display.width', 1000)
stats_show(w2v_tr_combined_stats)
store_stats(res)
print(w2v_tr_combined_opts)
print(w2v_tr_combined_score)


Using 101466 training sentences and 25377 test sentences. 0 filtered out.


Variants:   0%|          | 0/1 [00:00<?, ?it/s]

                                                    &     hit@1  &     hit@5  &    hit@10  &    MRR@10  & Processed sentences  \\
W2V TR nse=0.05-neg=21-cbow-epochs=150-vs=200-w...  &  0.118336  &  0.328408  &  0.434527  &  0.206749  &               25377  \\
{'ns_exponent': 0.05, 'negative': 21, 'sg': 0, 'vector_size': 200, 'window': 2, 'sample': 0.01, 'shrink_windows': True, 'compute_loss': True, 'epochs': 150}
0.20674875262001402


In [20]:
# @title Vocabulary viewer { display-mode: "form" }
# @markdown Examine vocabularies of the models interactively.

if train_unoptimized_variants:
  w2v_extra = [("Word2Vec p0 / AES", (w2vp0, "token", "text")),
               ("Word2Vec p3 / AES", (w2vp3, "token", "text"))]
else:
  w2v_extra = []

@interact(model=[("Word2Vec / RTC", (w2v_rtc, "transliteration")),
                 ("Word2Vec / AES", (w2v, "token", "text")),
                 ("Word2Vec / AES TR", (w2v_tr, "transliteration")),] + w2v_extra)
def view_vocabulary(model):
  clear_output()
  fields = list(model)[1:]
  model = model[0]
  if model is not None and model != "":
      vocab = list(model.wv.index_to_key)
      d = dict()
      for f in fields:
        if f == "text":
          d[f.capitalize()] = [token_to_text(token) for token in vocab]
        else:
          d[f.capitalize()] = vocab
      print(f"Vocabulary size: {len(vocab)}")
      pd.set_option('display.min_rows', 25)
      pd.set_option('display.max_rows', 25)
      vocabulary_entries_df = pd.DataFrame(d)
      display(vocabulary_entries_df)



interactive(children=(Dropdown(description='model', options=(('Word2Vec / RTC', (<gensim.models.word2vec.Word2…

In [21]:
# @title Prediction statistics from training set. { display-mode: "form" }

def prediction_stats_to_widget(stats_predict):
  predict_html = f"""
  <table border="1">
   <tr>
    <th>Model / #</th>
    <th>Sentence</th>
    <th>Omitted word</th>
    <th>Hit idx</th>
    <th>Guesses</th>
   </tr>
  """
  for stat in stats_predict:
    subtab = tabulate(stat[5], headers=[ "Guess", "Probability", "Similarity with right guess" ], tablefmt='html')
    sentence = " ".join(stat[1])
    if len(sentence) > 32:
      sentence = sentence[:30] + "\u2026"
    predict_html += f"""
     <tr>
      <td>{stat[0]}</td>
      <td>{sentence}</td>
      <td>{stat[2]}</td>
      <td>{stat[3]}</td>
      <td><details><summary>{" ".join(stat[4])}</summary>
           <p>{subtab}</p></details></td>
     </tr>
    """
  predict_html += f"""
  </table>
  """
  return widgets.HTML(
    value=predict_html,
    placeholder='...Processing...',
    description='Predictions:')

print("Some manual trials of predict_words()")
stats_predict=[]
for idx,sentence in enumerate(aecorpus.rtc_t[:5]):
  seq = token_seq_rtc(sentence)
  omitted = seq[3]
  seq[3] = "LACUNA"
  res = predict_words(seq,w2v_rtc,stat_append=stats_predict,stat_id=f"W2V_RTC #{idx+1}", omitted=omitted)
  if train_ft:
    res = predict_words(seq,ft_rtc,stat_append=stats_predict,stat_id=f"FT_RTC #{idx+1}", omitted=omitted)
  res = predict_words(seq,w2v_rtc,stat_append=stats_predict,stat_id=f"Vocab_RTC #{idx+1}", omitted=omitted, algo=-1)
for idx, sentence in enumerate(aecorpus.aes_t[:5]):
  seq = token_seq(sentence)
  omitted = seq[3]
  seq[3] = "LACUNA"
  res = predict_words(seq,w2v,stat_append=stats_predict,stat_id=f"W2V_AES #{idx+1}", omitted=omitted)
  seq = token_seq(sentence,padlen=3)
  omitted = seq[4]
  seq[4] = "LACUNA"
  if train_unoptimized_variants:
    res = predict_words(seq,w2vp3,stat_append=stats_predict,stat_id=f"W2VAES p3 #{idx+1}",omitted=omitted)
  res = predict_words(seq,w2v,stat_append=stats_predict,stat_id=f"Vocab_AES #{idx+1}", omitted=omitted, algo=-1)

prediction_stats_to_widget(stats_predict)


Some manual trials of predict_words()


HTML(value='\n  <table border="1">\n   <tr>\n    <th>Model / #</th>\n    <th>Sentence</th>\n    <th>Omitted wo…

# Results

In [22]:
# @title Display statistics collected from model testing. { display-mode: "form" }

def stats_update():
  row_names = stats.keys()
  column_names = list(stats.values())[0].get_heading(include_mrr5=True)
  ps = [pr.get_stats(include_mrr5=True) for pr in stats.values()]
  dfdata = np.row_stack([(f'{x[0]:.4f}', f'{x[1]:.4f}', f'{x[2]:.4f}', f'{x[3]:.4f}', f'{x[4]:.4f}' , f'{x[5]:.4f}', f'{x[6]}', f'{x[7]}', f'{x[8]:.4f}') for x in ps])
  pd.set_option('display.max_rows', 40)
  pd.set_option('display.min_rows', 40)
  text_stats_df = pd.DataFrame(dfdata, columns=column_names, index=row_names)
  text_stats_df.index = row_names
  display(text_stats_df)

def stats_update_text():
  """Textual statistics for cut&paste"""
  row_names = stats.keys()
  column_names = list(stats.values())[0].get_heading(include_mrr5=True)
  ps = [pr.get_stats(include_mrr5=True) for pr in stats.values()]
  dfdata = np.row_stack([(f'{x[0]:.4f}', f'{x[1]:.4f}', f'{x[2]:.4f}', f'{x[3]:.4f}', f'{x[4]:.4f}' , f'{x[5]:.4f}', f'{x[6]}', f'{x[7]}', f'{x[8]:.4f}') for x in ps])
  pd.set_option('display.max_rows', 90)
  pd.set_option('display.min_rows', 90)
  text_stats_df = pd.DataFrame(dfdata, columns=column_names, index=row_names)
  text_stats_df.index = row_names
  pd.set_option('display.width', 1000)
  print(text_stats_df)
def stats_update_latex():
  """Latex format statistics for cut&paste"""
  row_names = stats.keys()
  column_names = [ '&', 'hit@1', '&', 'hit@5', '&', 'hit@10', '&', 'MRR@10', '&', 'Processed sentences', '\\\\' ]

  #list(stats.values())[0].get_heading(include_mrr5=True)
  ps = [pr.get_stats(include_mrr5=True) for pr in stats.values()]
  dfdata = np.row_stack([('&', f'{x[0]:.6f}', '&', f'{x[1]:.6f}', '&', f'{x[2]:.6f}', '&', f'{x[5]:.6f}', '&', f'{x[6]}', '\\\\') for x in ps])
  pd.set_option('display.max_rows', 90)
  pd.set_option('display.min_rows', 90)
  text_stats_df = pd.DataFrame(dfdata, columns=column_names, index=row_names)
  text_stats_df.index = row_names
  pd.set_option('display.width', 1000)
  print(text_stats_df)

def on_button_clicked(stats_button):
    with stats_output:
        clear_output(wait = True)
        global stats_format
        if stats_format == 'Table':
          stats_update()
        elif stats_format == 'Text':
          stats_update_text()
        else:
          stats_update_latex()

stats_button = widgets.Button(description="Refresh Stats")
stats_output = widgets.Output()
stats_format = 'Table'
with stats_output:
  on_button_clicked(None)
display_type = widgets.Dropdown(
    options=['Table', 'Text', 'Latex'],
    value='Table',
    description='Format:',
    disabled=False,
)

def on_change(change):
  if change['type'] == 'change' and change['name'] == 'value':
    global stats_format
    stats_format = change['new']
    on_button_clicked(None)

display_type.observe(on_change)
display(stats_button, display_type, stats_output)
stats_button.on_click(on_button_clicked)



Button(description='Refresh Stats', style=ButtonStyle())

Dropdown(description='Format:', options=('Table', 'Text', 'Latex'), value='Table')

Output()

# Gaps

In [23]:
# @title Use `W2V AES` and `AES TR` model against known gaps. { display-mode: "form" }

# @markdown Select language to use for translations.
prefer_en = True #@param {type:"boolean"}

lemmaform_map = dict()
def to_lemma_id(transliteration):
  if transliteration in lemmaform_map:
    return lemmaform_map[transliteration].most_common(1)[0][0]
  if transliteration in written_form_to_mdc:
    transliteration = written_form_to_mdc[transliteration]
    if transliteration in lemmaform_map:
      return lemmaform_map[transliteration].most_common(1)[0][0]
  return None

for sentence in aecorpus.aes:
  for token in sentence['tokens']:
    if token[2] not in lemmaform_map:
      lemmaform_map[token[2]] = Counter()
    lemmaform_map[token[2]] += Counter({token[3]: 1})

slw = []
cn=0
c=0
with open("all_gaps_id.txt") as gaps:
  lacunae = re.compile(r'LACUNAE')
  lacunae_word = re.compile(r'<LACUNAE_WORD>')
  for l in gaps.readlines():
    id,tr = l.split(" ", 1)
    id = id[1:-1]
    tr = tr.replace("\n", "")
    if id not in aes_sentence_dictionary:
      if c < 10:
        print(f"{id} not found")
      cn += 1
      continue
    sd = aes_sentence_dictionary[id]
    tokens = []
    if 'tokens' in sd:
      if '850830' in sd['token_list']:
        continue # Missing item is a name => generally not possible to match with models.
      tokens = " ".join([f"{x[3]}/{x[0]}" for x in sd['tokens']])
      ftokens = [(True if x[0] == '[___]' else False) for x in sd['tokens']]
      token_list = [('LACUNA' if x[0] == '[___]' else x[3]) for x in sd['tokens']]
      if 'LACUNA' not in token_list:
        continue # Could not find LACUNA in tokens.
    if prefer_en == False:
      transl = sd['translation'] if 'translation' in sd else "No translation"
    else:
      transl = sd['en'] if 'en' in sd else (sd['translation'] if 'translation' in sd else "No translation")
    tl = sd['sentence_transliteration'] if 'sentence_transliteration' in sd else ''
#  extra_info[id] = (s['token_list'], s['sentence_transliteration'], trans)
    t = [tr.split(" "), transl, tokens, tl, token_list, id ]
    res = lacunae.findall(tr)
    if len(res) == 1 and len(lacunae_word.findall(tr)) == 1:
      slw.append(t)
      c += 1
if cn > 0:
  print(f"{c} suitable sentences; {cn} sentences not found")
else:
  print(f"{c} suitable sentences")
def get_predictions(list_of_inputs):
  out = [
    f"""<form>
      """ ]
  for idx,t in enumerate(list_of_inputs):
    tokens = t[0]
    #print(tokens)
    pr = predict_words(start_pad() + tokens + end_pad(),w2v_tr,lacuna='<LACUNAE_WORD>')
    prx = predict_words(start_pad() + t[4] + end_pad(),w2v,lacuna='LACUNA')
    #print(start_pad() + t[4] + end_pad())
    pr2 = Counter()
    for p in prx:
      if not hasattr(p, '__iter__'):
        continue
      if is_pad(p[0]):
        continue
      #print(p)
      lid = p[0]
      lform = None
      transl = None
      if lid is not None:
        #if len(lid) > 1:
        #  not unique
        lid = str(lid)
        if lid in aecorpus.aed:
          lform = aecorpus.aed[lid]['form']
          if 'translations' in aecorpus.aed[lid]:
            transl = aecorpus.aed[lid]['translations']
            d = dict([(k[1], k[0]) for k in transl])
            if prefer_en == False and 'de' in d:
              transl = d['de']
            elif 'en' in d:
              transl = d['en']
            elif 'de' in d:
              transl = d['de']
            else:
              transl = str(d)
        else:
          lform = "(unknown)"
      pr2[(lform, lform, lid, transl)] += p[1]
    for p in pr:
      if isinstance(p, float):
        print(f"Unexpected float {p}")
        print(start_pad() + t[4] + end_pad())
        print(start_pad() + t[0] + end_pad())
        continue
      if not isinstance(p[0], str):
        print(f"Unexpected {p[0]}")
        continue
      if is_pad(p[0]):
        continue
      lid = to_lemma_id(p[0])
      lform = None
      transl = None
      if lid is not None:
        #if len(lid) > 1:
        #  not unique
        lid = str(lid)
        if lid in aecorpus.aed:
          lform = aecorpus.aed[lid]['form']
          if 'translations' in aecorpus.aed[lid]:
            transl = aecorpus.aed[lid]['translations']
            d = dict([(k[1], k[0]) for k in transl])
            if 'en' in d:
              transl = d['en']
            elif 'de' in d:
              transl = d['de']
            else:
              transl = str(d)
        else:
          lform = "(unknown)"
      pr2[(p[0], lform, lid, transl)] += p[1]
    # Combine results from both models
    pr2 = pr2.most_common()
    pr2 = [[p[1]] + list(p[0]) for p in pr2]
    #print(pr2)
    if False: # Use details/summary elements (old style)
        subtab = tabulate(pr2, headers=[ "Probability", "Guess", "Lemma Form", "Lemma ID", "Translation" ], tablefmt='html')
        s1 = '<details style="display: inline"><summary style="display: inline">'
        s2 = f'</summary> <p style="display: inline">{subtab}</p></details>'
    elif True:
      # Use datalist and input
      s1 = f'<input size="20" style="width: 300px; -webkit-rtl-ordering: logical;" type=text id=options-input-{idx} list=options-{idx}><!-- ' # value="{pr2[0][1]}"><!--
      s2 = f'--><datalist class="keepDatalistOptions" size="20" id=options-{idx}>'
      for p in pr2:
        s2 += f'<option value="{p[1]}">{p[2]} {p[3]} {p[4]} ({p[0]})'
      s2 += f'</datalist>'
    elif False:
      # Use select/optgroup/option
      js_opts = ''
      # alt behaviour - shows all options:
      size=len(pr2)
      js_opts = f"onfocus='this.size={size*2};' onblur='this.size=1;' onchange='this.size=1; this.blur();'"
      s1 = f'<select style="width: 500px;" type=text {js_opts} value="{pr2[0][1]}"><!-- '
      s2 = f'-->'
      for p in pr2:
        s2 += f'<optgroup label="{p[2]} {p[3]} {p[4]} ({p[0]:0.5f})"><option value="{p[1]}">{p[1]}</optgroup>'

    out.append(
        f"""
           <p>{" ".join(tokens).replace(">", "&extended-gt;").replace("<", s1 + "&lt;").replace("&extended-gt;", "&gt;" + s2)}</p>
        """)
    # optional extra context information
    #out.append("<p>"+(str(t[2]).replace("<", "&lt;").replace(">", "&gt;"))+"</p>")
    #out.append("<p>"+str(t[4])+"</p>")
    #out.append("<p>"+str(t[3])+"</p>")
    out.append("<p>"+str(t[1])+"</p>")
    out.append("<hr>")
  out[-1] = '</form>'
  return "".join(out)
w = widgets.HTML(
    value=get_predictions(slw[:50]),
    placeholder='...Processing...',
    description='Predictions for gaps:')
display(w)

IBUBd7TKhu7mKkCKjWrfxxqiU not found
IBcDSKGEkyxdVkVhmy2OjSmFc not found
70 suitable sentences; 104 sentences not found


HTML(value='<form>\n      \n           <p>ztp.n sw jtj =f Jmn m <input size="20" style="width: 300px; -webkit-…

In [24]:
#@title Word2Vec predictions, with MaReTe Transliterations format inputs/output

model=w2v_tr_combined
def get_predictions_w2v(list_of_inputs):
  predictions = []
  out = [ "" ]
  for idx,t in enumerate(list_of_inputs):
    tokens = t[0]
    transl = t[1]
    t[0] = [x.replace("\n", "") for x in t[0]]
    #print(tokens)
    pr = predict_words(start_pad() + t[0] + end_pad(),model,lacuna='<LACUNAE_WORD>')
    #print(pr)
    for s in pr:
      if "\n" in s[0]:
        print(t[0])
        print(s[0])
        raise Exception(f"sentence #{idx} {' '.join(tokens)}")
    d = {
      "sentence": " ".join(tokens),
      "predictions": [s[0].replace("\n","") for s in pr]
    }
    if transl is not None:
      d["translation"] = transl
    d["id"] = t[5]
    predictions.append(d)

    words = [p[0] for p in pr]
    s1 = ""
    s2 = ""
    tokens = " ".join(t[0])
    tokens = tokens.replace("<LACUNAE_WORD>", "\u226A"+(" | ".join(words))+"\u226B")
    out.append(
        f"""
           <p>{tokens.replace(">", "&extended-gt;").replace("<", s1 + "&lt;").replace("&extended-gt;", "&gt;" + s2)}</p>
        """)
    if transl is not None:
      out.append(f"<p><i>{transl}</i></p>")
    out.append("<hr>")
  out[-1] = '</form>'
  return ("".join(out), predictions)

html, obj = get_predictions_w2v(slw[:50])
w = widgets.HTML(
    value=html,
    placeholder='...Processing...',
    description='Predictions for gaps:')
display(w)

import json
import io

file = "predictions-w2vcombined.json"
with open(file, "w") as pf:
  print(json.dumps(obj, indent=2), file=pf)

print(f"Also written predictions to {file}")



HTML(value='\n           <p>ztp.n sw jtj =f Jmn m ≪pr | iw | n | sw | Hr | Dd | nfr | rdi.t | r | di≫ m rḫ.n =…

Also written predictions to predictions-w2vcombined.json


In [25]:
# @title Download the prediction results

from google.colab import files

download_button = widgets.Button(description="Download predictions")
def on_download_button(button):
  files.download('predictions-w2vcombined.json')

download_button.on_click(on_download_button)
display(download_button)


Button(description='Download predictions', style=ButtonStyle())