In [1]:
%load_ext autoreload
%autoreload 2

Make plain text data, remembering the nodes that the text comes from.

In [2]:
import os
import sys
import json

from tf.app import use
from tf.convert.recorder import Recorder
from tf.core.helpers import specFromRangesLogical, rangesFromSet

In [3]:
# Load the NENA corpus from the local clone.
# NOTE(review): hoist=globals() is presumably what puts the Text-Fabric API
# handles (F, L, C, Fs) that later cells use undeclared into the notebook
# namespace - confirm against the Text-Fabric `use` documentation.
A = use("nena:clone", checkout="clone", hoist=globals())

In [4]:
# One row per node type: number of nodes of that type and their average size.
for (tp, av, start, end) in C.levels.data:
    nNodes = end - start + 1
    avSize = int(round(av))
    print(f"{nNodes:>6} {tp:<12} nodes average {avSize:>6} chars")

     2 dialect      nodes average 269689 chars
   126 text         nodes average   4281 chars
   350 paragraph    nodes average   1541 chars
  2544 line         nodes average    212 chars
 16326 sentence     nodes average     33 chars
 24497 subsentence  nodes average     22 chars
 36444 inton        nodes average     15 chars
 93766 stress       nodes average      6 chars
120151 word         nodes average      4 chars
539378 letter       nodes average      1 chars


# Generate full text

We use the `full` transcription.

We remember nodes of the types *letter*, *word*, *sentence*, *line*, and *text*.

We store the positions by node type.

In [5]:
# Where the exported data goes: the search app directory inside the local
# clone of the nena_tf repo, plus a `_local` directory for debug output.
GH = os.path.expanduser("~/github")
ORG = "CambridgeSemiticsLab"
REPO = "nena_tf"
REL = "nena2search/app"
OUTPUT = f"{GH}/{ORG}/{REPO}/{REL}"
DEBUG_OUTPUT = f"{GH}/{ORG}/{REPO}/_local"

# dumpData() writes into OUTPUT and, when debug=True, into DEBUG_OUTPUT.
# Create both up front so that a fresh clone (without `_local`) works too;
# exist_ok=True makes a separate existence check unnecessary.
for outputDir in (OUTPUT, DEBUG_OUTPUT):
    os.makedirs(outputDir, exist_ok=True)

In [6]:
# Placeholder for a missing phonetic property value; it is the `default` of
# the cls, voice, place and manner layers in DATA.
PH_ABSENT = "z"
# Placeholder for a letter without full text; it is the `default` of the
# `full` layer in DATA, and record() counts how often it occurs.
CH_ABSENT = "¿"

In [18]:
# Export specification, keyed by node type.
# The order of the node types matters: TYPE_SEQ and TYPES_LOWER are derived
# from it, and record() walks the corpus along these types.
#
# Per node type:
#   layers        the layers to export for nodes of this type (optional)
#   afterFeature  feature whose value is added after a node of this type ends
#   afterDefault  what to add after the node if that value is empty or if
#                 there is no afterFeature
#
# Per layer:
#   feature  name of the feature that supplies the value
#   map      optional translation of feature values; values that are not in
#            the map become `default`
#   default  value used when the feature value is empty or unmapped
#   pos      None: the layer gets its own recorder and position mapping;
#            otherwise: the name of the layer whose positions this layer shares
DATA = dict(
    letter=dict(
        layers=dict(
            # the transcription itself
            full=dict(
                feature="full",
                map=None,
                default=CH_ABSENT,
                pos=None,
            ),
            # phonetic class; this layer carries the positions for the
            # other phonetic layers (voice, place, manner)
            cls=dict(
                feature="phonetic_class",
                map={
                    "vowel": "V",
                    "consonant": "C",
                },
                default=PH_ABSENT,
                pos=None,
            ),
            voice=dict(
                feature="phonation",
                map={
                    "plain": "P",
                    "unvoiced_aspirated": "H",
                    "voiced": "V",
                    "unvoiced": "F",
                    "unvoiced_unaspirated": "G",
                    "emphatic": "X",
                },
                default=PH_ABSENT,
                pos="cls",
            ),
            place=dict(
                feature="phonetic_place",
                map={
                    "dental-alveolar": "D",
                    "labial": "B",
                    "palatal-alveolar": "C",
                    "palatal": "J",
                    "velar": "G",
                    "uvular": "X",
                    "pharyngeal": "Q",
                    "laryngeal": "H",
                },
                default=PH_ABSENT,
                pos="cls",
            ),
            manner=dict(
                feature="phonetic_manner",
                map={
                    "affricative": "A",
                    "nasal": "N",
                    "other": "X",
                    "fricative": "F",
                    "lateral": "L",
                    "sibilant": "S",
                },
                default=PH_ABSENT,
                pos="cls",
            ),
        ),
    ),
    word=dict(
        layers=dict(
            # values are numbered 1, 2, ... in the order of the feature's
            # freqList(); 0 stands for a missing value
            lang=dict(
                feature="lang",
                map={x[0]: i + 1 for (i, x) in enumerate(F.lang.freqList())},
                default=0,
                pos=None,
            ),
            speaker=dict(
                feature="speaker",
                map={x[0]: i + 1 for (i, x) in enumerate(F.speaker.freqList())},
                default=0,
                pos=None,
            ),
        ),
        afterFeature="full_end",
        afterDefault="/",
    ),
    # sentences have no layers of their own; they only contribute a separator
    sentence=dict(
        afterDefault="\n",
    ),
    line=dict(
        layers=dict(
            number=dict(
                feature="line_number",
                map=None,
                default=-1,
                pos=None,
            ),
        ),
        afterDefault="\n",
    ),
    text=dict(
        layers=dict(
            # the first layer of a type is the one whose value addValue()
            # in record() returns; for texts that is the title
            title=dict(
                feature="title",
                map=None,
                default="¿",
                pos=None,
            ),
            tid=dict(
                feature="text_id",
                map=None,
                default="¿",
                pos=None,
            ),
            place=dict(
                feature="place",
                map=None,
                default="¿",
                pos=None,
            ),
        ),
        afterDefault="\n",
    ),
)

# The node types in the order in which DATA declares them.
TYPE_SEQ = list(DATA)

# For every node type: that type preceded by all types declared before it.
TYPES_LOWER = {tp: TYPE_SEQ[: i + 1] for (i, tp) in enumerate(TYPE_SEQ)}

We take care that for every phonetic property, the value is always exactly one character, no more, no less.
That means that all recorded phonetic texts have the same mapping between character positions and slot numbers.

For the full text it is different: there are 18 letters with an empty full text, and some letters use multiple characters for their full text.

In the end, we only have to produce two mappings: for the full text and for the phonetics.
We choose the phonetic class text (layer `cls`) to carry the phonetic mapping; the other phonetic layers refer to it by means of `pos="cls"`.

As to the mapping from letter nodes to words, sentences, lines and texts: we only need to do that once, and we create it as a single *parent* relation (called `up` in the code), stored outside the recorders.

In [9]:
def compress(data):
    """Group the nodes of a node-to-value mapping by value.

    Parameters
    ----------
    data: dict
        Maps nodes (integers) to hashable values.

    Returns
    -------
    list
        Entries `[nodeSpec, value]`, one per distinct value, ordered by the
        first and then the last node that has the value.
        If exactly one node has the value, `nodeSpec` is that node.
        Otherwise it is `specFromRangesLogical(rangesFromSet(nodes))`
        (presumably a compact range specification - see the Text-Fabric
        helpers documentation).
    """
    sets = {}

    # visiting the nodes in sorted order keeps every group sorted as well
    for n in sorted(data):
        sets.setdefault(data[n], []).append(n)

    compressed = []

    for (value, nset) in sorted(sets.items(), key=lambda x: (x[1][0], x[1][-1])):
        # A singleton group is represented by its own node.
        # (This used to be `n`, the leftover variable of the loop above,
        # which is always the biggest node in `data`.)
        nodeSpec = (
            nset[0] if len(nset) == 1 else specFromRangesLogical(rangesFromSet(nset))
        )
        compressed.append([nodeSpec, value])

    return compressed

def invert(data):
    """Return a new dict in which the values of `data` map back to their keys.

    If several keys have the same value, the key that comes last in the
    iteration order of `data` wins.
    """
    inverted = {}

    for (key, value) in data.items():
        inverted[value] = key

    return inverted

In [27]:
def record():
    """Walk through the corpus and record the text of every layer in DATA.

    The corpus is traversed text by text, and within texts by line, sentence,
    word and letter. For every node the values of the layers of its type are
    added to the recorder or accumulator of that layer, and after the node
    the "after" material of its type is added.

    Returns
    -------
    dict
        captions: dict with the title of the search interface
        ntypes: the node types of DATA in reverse order
        layers: per node type and layer name: the value map and the name of
            the layer that holds the positions
        texts: per node type and layer name: the recorded text
        positions: per node type and layer name (only layers with their own
            recorder): a list with a node or None per entry of the
            recorder's positions
        up: the node-to-parent mapping, compressed by `compress()`
    """
    A.indent(reset=True)
    A.info("preparing ... ")

    A.info("start recording")

    # maps each line, sentence, word and letter to the node one level up
    up = {}
    layers = {}
    texts= {}
    positions = {}
    recorders = {}
    accumulators = {}
    
    # Set up the bookkeeping per node type; types without layers are skipped.
    # Layers with pos=None get a Recorder (text plus positions),
    # layers with another pos get a plain list of strings (text only).
    for (nType, typeInfo) in DATA.items():
        ti = typeInfo.get("layers", None)
        if ti is None:
            continue
            
        # a layer without pos is its own position layer
        layers[nType] = {name: dict(map=ti[name]["map"], pos=ti[name]["pos"] or name) for name in ti}
        texts[nType] = {name: None for name in ti}
        positions[nType] = {name: None for name in ti if ti[name]["pos"] is None}
        recorders[nType] = {name: Recorder(A.api) for name in ti if ti[name]["pos"] is None}
        accumulators[nType] = {name: [] for name in ti if ti[name]["pos"] is not None}

    nChAbsent = 0

    def addValue(node):
        """Add the value of `node` to every layer of its node type.

        Returns the (stringified) value of the first layer of the type,
        or None if the type has no layers.
        """
        returnValue = None

        nType = F.otype.v(node)
        typeInfo = DATA[nType]
        theseLayers = typeInfo.get("layers", {})

        first = True
        
        for name in theseLayers:
            info = theseLayers[name]
            default = info["default"]
            pos = info["pos"]
            value = Fs(info["feature"]).v(node)
            # print(f"{name=} {default=} {pos=} {value=}")
            vMap = info["map"]
            if vMap:
                # values that are not in the map become the default
                value = vMap.get(value, default)
            else:
                # any falsy value (None, "", 0) becomes the default
                value = value or default
            value = str(value)

            if pos is None:
                recorders[nType][name].add(value)
            else:
                accumulators[nType][name].append(value)

            if first:
                returnValue = value
                first = False

        return returnValue

    def addAfterValue(node):
        """Add the material that follows `node` to the layers concerned.

        The value comes from the `afterFeature` of the node type, if any;
        when that yields nothing, `afterDefault` is used, if any.
        Nothing is added when the outcome is empty.
        """
        nType = F.otype.v(node)
        typeInfo = DATA[nType]
        afterFeature = typeInfo.get("afterFeature", None)
        afterDefault = typeInfo.get("afterDefault", None)
        value = ""
        if afterFeature is not None:
            value = Fs(afterFeature).v(node)
        if afterDefault is not None:
            if not value:
                value = afterDefault
        if value:
            addAll(nType, value)

    def addAll(nType, value):
        """Add `value` to all layers of `nType` and of the types listed
        before it in TYPE_SEQ (see TYPES_LOWER)."""
        nTypes = TYPES_LOWER[nType]
        # note: the loop variable reuses the name of the parameter `nType`
        for nType in nTypes:
            if nType in recorders:
                for x in recorders[nType].values():
                    x.add(value)
            if nType in accumulators:
                for x in accumulators[nType].values():
                    x.append(value)

    def deliverAll():
        """Collect the results of all recorders and accumulators
        into `texts` and `positions`."""
        for (nType, typeInfo) in recorders.items():
            for (name, x) in typeInfo.items():
                texts[nType][name] = x.text()
                # keep one node per entry of x.positions(), None for empty entries
                # NOTE(review): presumably there is one entry per character
                # of the recorded text - confirm against the Recorder docs
                positions[nType][name] = [
                    tuple(nodes)[0] if nodes else None for nodes in x.positions()
                ]

        for (nType, typeInfo) in accumulators.items():
            for (name, x) in typeInfo.items():
                texts[nType][name] = "".join(x)

    def startNode(node):
        """Call `start(node)` on all recorders of the node's type."""
        nType = F.otype.v(node)

        if nType in recorders:
            for rec in recorders[nType].values():
                rec.start(node)

    def endNode(node):
        """Call `end(node)` on all recorders of the node's type."""
        nType = F.otype.v(node)

        if nType in recorders:
            for rec in recorders[nType].values():
                rec.end(node)

    # The walk: for each node we do start - add value - (children) - end -
    # add after value, so that the after material is added once the node
    # is no longer active in the recorders of its own type.
    for (i, text) in enumerate(F.otype.s("text")):
        startNode(text)
        title = addValue(text)
        # progress indicator that overwrites itself on the same output line
        sys.stdout.write("\r" + f"{i + 1:>3} {title:<80}")

        for line in L.d(text, otype="line"):
            up[line] = text
            startNode(line)
            addValue(line)

            for sent in L.d(line, otype="sentence"):
                up[sent] = line
                startNode(sent)
                addValue(sent)

                for word in L.d(sent, otype="word"):
                    up[word] = sent
                    startNode(word)
                    addValue(word)

                    for letter in L.d(word, otype="letter"):
                        up[letter] = word
                        startNode(letter)

                        # the first letter layer is `full`: count the
                        # letters for which the placeholder was used
                        ch = addValue(letter)
                        if ch == CH_ABSENT:
                            nChAbsent += 1

                        endNode(letter)
                        addAfterValue(letter)

                    endNode(word)
                    addAfterValue(word)

                endNode(sent)
                addAfterValue(sent)

            endNode(line)
            addAfterValue(line)

        endNode(text)
        addAfterValue(text)

    deliverAll()

    # finish the line of the progress indicator
    sys.stdout.write("\n")

    A.info(f"{nChAbsent} letter nodes with empty full text")

    data = dict(
        captions=dict(
            title="NENA phono search",
        ),
        ntypes=TYPE_SEQ[::-1],
        layers=layers,
        texts=texts,
        positions=positions,
        up=compress(up),
    )

    return data

In [33]:
def dumpData(data, debug=False):
    """Write the recorded data to disk.

    The data is always written as one compact JavaScript file
    `corpus.js` in OUTPUT, in the form `const corpus = <json>`.

    Parameters
    ----------
    data: dict
        The result of `record()`: keys `captions`, `ntypes`, `up`, and
        further keys (`layers`, `texts`, `positions`) whose values are
        dicts keyed by node type and then by layer name.
    debug: boolean, optional False
        If True, the same data is also written to DEBUG_OUTPUT as separate,
        human readable files: `ntypes` as one tab-separated line,
        `captions` and `up` as indented json, and one file per
        kind, node type and layer for the rest (texts as `.txt`,
        positions as `.tsv` with one entry per line, anything else as json).

    All files are written as UTF-8, because the data contains non-ASCII
    characters and is dumped with `ensure_ascii=False`; relying on the
    platform default encoding could fail or produce unreadable files.
    The directories are created if they do not exist yet.
    """
    A.indent(reset=True)
    A.info("Dumping data to a single compact json file")

    os.makedirs(OUTPUT, exist_ok=True)
    fileName = f"{OUTPUT}/corpus.js"

    with open(fileName, "w", encoding="utf8") as fh:
        fh.write("const corpus = ")
        json.dump(data, fh, ensure_ascii=False, indent=None, separators=(",", ":"))

    A.info(f"Data written to file {fileName}")

    if not debug:
        return

    A.info("Writing same data as separate, human readable files")
    os.makedirs(DEBUG_OUTPUT, exist_ok=True)

    for (kind, subData) in data.items():
        if kind == "ntypes":
            fileName = f"{DEBUG_OUTPUT}/{kind}.tsv"
            A.info(fileName)
            with open(fileName, "w", encoding="utf8") as fh:
                fh.write("\t".join(subData) + "\n")
            continue

        if kind in {"captions", "up"}:
            fileName = f"{DEBUG_OUTPUT}/{kind}.json"
            A.info(fileName)
            with open(fileName, "w", encoding="utf8") as fh:
                json.dump(subData, fh, ensure_ascii=False, indent=1)
            continue

        # the file format depends on the kind only, not on the layer
        ext = "txt" if kind == "texts" else "tsv" if kind == "positions" else "json"

        for (nType, typeData) in subData.items():
            for (name, layerData) in typeData.items():
                fileName = f"{DEBUG_OUTPUT}/{kind}-{nType}-{name}.{ext}"
                A.info(fileName)
                with open(fileName, "w", encoding="utf8") as fh:
                    if ext == "json":
                        json.dump(layerData, fh, ensure_ascii=False, indent=1)
                    elif ext == "tsv":
                        # one position per line; an empty line for None
                        for entry in layerData:
                            fh.write(("" if entry is None else f"{entry}") + "\n")
                    else:
                        fh.write(layerData)

In [29]:
data = record()

  0.00s preparing ... 
  0.00s start recording
126 Women Do Things Best                                                            
    12s 18 letter nodes with empty full text


In [34]:
dumpData(data, debug=True)

  0.00s Dumping data to a single compact json file
  1.97s Data written to file ~/github/CambridgeSemiticsLab/nena_tf/nena2search/app/corpus.js
  1.97s Writing same data as separate, human readable files
  1.97s ~/github/CambridgeSemiticsLab/nena_tf/_local/captions.json
  1.97s ~/github/CambridgeSemiticsLab/nena_tf/_local/ntypes.tsv
  1.97s ~/github/CambridgeSemiticsLab/nena_tf/_local/layers-letter-full.json
  1.97s ~/github/CambridgeSemiticsLab/nena_tf/_local/layers-letter-cls.json
  1.97s ~/github/CambridgeSemiticsLab/nena_tf/_local/layers-letter-voice.json
  1.97s ~/github/CambridgeSemiticsLab/nena_tf/_local/layers-letter-place.json
  1.97s ~/github/CambridgeSemiticsLab/nena_tf/_local/layers-letter-manner.json
  1.97s ~/github/CambridgeSemiticsLab/nena_tf/_local/layers-word-lang.json
  1.97s ~/github/CambridgeSemiticsLab/nena_tf/_local/layers-word-speaker.json
  1.97s ~/github/CambridgeSemiticsLab/nena_tf/_local/layers-line-number.json
  1.98s ~/github/CambridgeSemiticsLab/nena_tf/_