<a href="https://colab.research.google.com/github/annakalinina18/star-fle/blob/main/corpus_to_xmi_inception.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [None]:
import pandas as pd, re, os, zipfile, shutil, time
from cassis import TypeSystem, Cas

# Input workbook; the relative path is resolved against the current working directory.
xlsx_path = "corpus_projected_with_types_sorted (2).xlsx"
# Whole corpus as one DataFrame. The code below reads one sentence per row from
# the "ud_tokens", "ud_lemmas" and "MWEs_projected"/"MWE_projected" columns.
df = pd.read_excel(xlsx_path)
# Column holding the sentence index; a value of 0 (on any row after the first)
# is used further down as the marker for the start of a new document.
sent_id_col = "sent_id"

# Elided French clitics. Tokens equal to one of these are dropped from an
# expression's search pattern (see expr_to_pattern_tokens).
CLITICS = {
    "l'",
    "d'",
    "s'",
    "n'",
    "qu'",
    "j'",
    "t'",
    "m'",
    "c'",
}

# Single-character tokens that attach to the preceding token when the sentence
# text is rebuilt (no space is inserted before them).
PUNCT_NO_SPACE_BEFORE = set(".,?!;:)]}»")

# Single-character tokens after which no space is inserted.
PUNCT_NO_SPACE_AFTER = set("([{«")

# Closed list of relation labels that may be emitted.
ALLOWED = [
    "Expression_idiomatique",
    "Collocation_opaque",
    "Collocation_transparente",
    "Expression_libre",
    "Autre",
]

def normalize_label(raw: str) -> str:
    """Map a free-form label to its canonical spelling.

    The input is trimmed, runs of whitespace are collapsed to one space, the
    text is lower-cased and spaces become underscores; the result is looked up
    in a fixed table.

    Args:
        raw: Label text as written in the corpus.

    Returns:
        The canonical label, or "Autre" when the text is not recognised.
    """
    canonical = {
        "expression_idiomatique": "Expression_idiomatique",
        "collocation_opaque": "Collocation_opaque",
        "collocation_transparente": "Collocation_transparente",
        "expression_libre": "Expression_libre",
        "autre": "Autre",
    }
    collapsed = re.sub(r"\s+", " ", raw.strip())
    lookup_key = collapsed.lower().replace(" ", "_")
    if lookup_key in canonical:
        return canonical[lookup_key]
    return "Autre"

def extract_labels(expr_raw: str):
    """Read the label list from the parenthesised suffix of an expression.

    The text between the parentheses that close the string is taken, an
    optional leading "ничья:" marker (Russian for "draw"; presumably flags an
    annotator tie -- confirm against the corpus guidelines) is removed, and the
    remainder is split on "/". Each piece is normalised with normalize_label.

    Args:
        expr_raw: Raw item text, e.g. "avoir faim (Collocation opaque)".

    Returns:
        The distinct labels that belong to ALLOWED, in order of first
        appearance; ["Autre"] when there is no parenthesised suffix or no
        label survives.
    """
    trailing = re.search(r"\((.*?)\)\s*$", expr_raw.strip())
    if trailing is None:
        return ["Autre"]

    body = re.sub(r"^\s*ничья\s*:\s*", "", trailing.group(1).strip(), flags=re.I)

    unique = []
    for piece in body.split("/"):
        piece = piece.strip()
        if not piece:
            continue
        label = normalize_label(piece)
        if label in ALLOWED and label not in unique:
            unique.append(label)
    return unique or ["Autre"]

def strip_parens(expr_raw: str) -> str:
    """Return the text that precedes the first "(" in *expr_raw*, trimmed.

    When the string contains no "(", the whole trimmed string is returned.
    """
    text = expr_raw.strip()
    paren = re.search(r"\s*\(", text)
    head = text if paren is None else text[:paren.start()]
    return head.strip()

def clean_expr(expr_raw: str) -> str:
    """Return the expression without its parenthesised suffix, single-spaced and lower-cased."""
    without_label = strip_parens(expr_raw)
    single_spaced = re.sub(r"\s+", " ", without_label)
    return single_spaced.lower()

def explode_apostrophes(expr: str) -> str:
    """Insert a space after a straight apostrophe that joins two runs of letters.

    For example "l'eau" becomes "l' eau". Matching is case-insensitive and
    covers the ASCII letters plus the accented letters listed in the pattern.
    """
    elided = re.compile(
        r"([a-zàâçéèêëîïôûùüÿñæœ]+')([a-zàâçéèêëîïôûùüÿñæœ]+)",
        re.I,
    )
    return elided.sub(lambda hit: hit.group(1) + " " + hit.group(2), expr)

def expr_to_pattern_tokens(expr_raw: str):
    """Turn a raw expression into the list of tokens to search for.

    The expression is cleaned with clean_expr, elisions are split with
    explode_apostrophes, the text is split on whitespace, and tokens listed in
    CLITICS are discarded.
    """
    separated = explode_apostrophes(clean_expr(expr_raw))
    kept = []
    for word in separated.split():
        if word not in CLITICS:
            kept.append(word)
    return kept

def parse_items(cell):
    """Split a spreadsheet cell into expression items.

    Args:
        cell: Cell value; items are separated by "|".

    Returns:
        A list of {"raw": item_text, "labels": extract_labels(item_text)}
        dicts, one per non-blank item. Missing cells, blank cells and the
        placeholder "_" give an empty list.
    """
    if pd.isna(cell):
        return []
    text = str(cell)
    if text.strip() in ("", "_"):
        return []

    items = []
    for chunk in text.split("|"):
        chunk = chunk.strip()
        if chunk:
            items.append({"raw": chunk, "labels": extract_labels(chunk)})
    return items

def subseq_match_indices(forms, lemmas, pattern, start_pos=0):
    """Greedily locate *pattern* as an in-order subsequence of a sentence.

    Each pattern token is matched at the earliest position, at or after the
    current search position, where it equals either the surface form or the
    lemma; the search then resumes just after that position.

    Args:
        forms: Surface forms of the sentence tokens.
        lemmas: Lemmas, indexable at every position of *forms*.
        pattern: Tokens to find, in order (gaps between them are allowed).
        start_pos: Index at which the search for the first token begins.

    Returns:
        The matched indices, one per pattern token, or None if some token
        cannot be found. An empty pattern gives an empty list.
    """
    hits = []
    cursor = start_pos
    total = len(forms)
    for wanted in pattern:
        hit = next(
            (
                k
                for k in range(cursor, total)
                if forms[k] == wanted or lemmas[k] == wanted
            ),
            None,
        )
        if hit is None:
            return None
        hits.append(hit)
        cursor = hit + 1
    return hits

def all_subseq_matches(forms, lemmas, pattern):
    """Collect the greedy subsequence match for every possible starting point.

    The search is restarted just after the first index of each match found, so
    the returned matches have strictly increasing first indices.

    Args:
        forms: Surface forms of the sentence tokens.
        lemmas: Lemmas, indexable at every position of *forms*.
        pattern: Tokens to find, in order.

    Returns:
        A list of index lists (one index per pattern token), ordered by the
        position of their first token. Empty when *pattern* is empty or does
        not occur.
    """
    # An empty pattern "matches" with no indices; there is then no first index
    # to restart from, so report no matches instead of failing on idxs[0].
    if not pattern:
        return []

    matches = []
    start = 0
    while start < len(forms):
        idxs = subseq_match_indices(forms, lemmas, pattern, start)
        if idxs is None:
            break
        matches.append(idxs)
        start = idxs[0] + 1
    # No sort needed: every restart begins after the previous match's first
    # index, so the list is already ordered by first index, with no ties.
    return matches

def reconstruct_sentence(tokens):
    """Join tokens into sentence text and record where each token starts.

    A single space separates consecutive tokens, except that no space is
    inserted before the first token, before a token in PUNCT_NO_SPACE_BEFORE,
    after a token in PUNCT_NO_SPACE_AFTER, or before a bare apostrophe token
    (straight or typographic).

    Args:
        tokens: Token strings in sentence order.

    Returns:
        (text, offsets) where offsets[i] is the character index in *text* at
        which tokens[i] begins.
    """
    pieces = []
    starts = []
    position = 0
    previous = None
    for token in tokens:
        glued = (
            previous is None
            or token in PUNCT_NO_SPACE_BEFORE
            or previous in PUNCT_NO_SPACE_AFTER
            or token in {"'", "’"}
        )
        if not glued:
            pieces.append(" ")
            position += 1
        starts.append(position)
        pieces.append(token)
        position += len(token)
        previous = token
    return "".join(pieces), starts

# TypeSystem
# Declares the three annotation types written to the XMI files: a sentence
# type, a span type for individual expression tokens, and a relation type that
# links two spans and carries a string label.
ts = TypeSystem()
Sentence = ts.create_type(
    "de.tudarmstadt.ukp.dkpro.core.api.segmentation.type.Sentence",
    "uima.tcas.Annotation",
)
SpanEP = ts.create_type("webanno.custom.Span_EP", "uima.tcas.Annotation")
RelEP = ts.create_type("webanno.custom.Relation_EP", "uima.tcas.Annotation")

# Relation features: both endpoints are Span_EP annotations, the label is a string.
for feature_name, range_type in (
    ("Governor", "webanno.custom.Span_EP"),
    ("Dependent", "webanno.custom.Span_EP"),
    ("label", "uima.cas.String"),
):
    ts.create_feature(RelEP, feature_name, range_type)

# document boundaries
# Missing sentence ids become -1. Row 0 always opens the first document; any
# later row whose sentence id is 0 opens a new one.
sent_ids = df[sent_id_col].fillna(-1).astype(int).tolist()
doc_starts = (
    [0]
    + [row_no for row_no, sid in enumerate(sent_ids) if row_no > 0 and sid == 0]
    + [len(df)]
)
# Half-open (start_row, end_row) pairs, one per document.
doc_ranges = list(zip(doc_starts, doc_starts[1:]))

# Output directory for the XMI files; any previous contents are deleted so the
# export always starts from an empty directory.
out_dir = "/mnt/data/inception_span_relation_labels_noNV"
if os.path.exists(out_dir):
    shutil.rmtree(out_dir)
os.makedirs(out_dir, exist_ok=True)

# Paths of the XMI files written so far, and the start time used for the
# elapsed-seconds figure reported at the end.
xmi_paths = []
t0 = time.time()

def _cell_text(value) -> str:
    """Return a spreadsheet cell as text, treating a missing value as empty."""
    # Empty cells are read as NaN; str(NaN) is "nan", which would otherwise be
    # tokenized below as if it were a real word of the sentence.
    if pd.isna(value):
        return ""
    return str(value)


# Build one CAS per document range and write it to "doc_NNN.xmi" in out_dir.
for doc_idx, (a, b) in enumerate(doc_ranges, start=1):
    doc_df = df.iloc[a:b]
    cas = Cas(typesystem=ts)
    full_text_parts = []  # sentence texts, each followed by a "\n" separator
    sent_spans = []       # (begin, end) character offsets of every sentence
    rel_records = []      # (governor span, dependent span, label) triples
    cursor = 0            # document offset at which the next sentence begins

    for _, row in doc_df.iterrows():
        tokens = _cell_text(row.get("ud_tokens", "")).split()
        if not tokens:
            # Rows without tokens contribute nothing to the document text.
            continue
        forms = [t.lower() for t in tokens]
        lemmas_raw = [t.lower() for t in _cell_text(row.get("ud_lemmas", "")).split()]
        # Pad with "_" and truncate so lemmas has exactly one entry per token.
        lemmas = (lemmas_raw + ["_"] * len(tokens))[:len(tokens)]

        sent_text, tok_offsets = reconstruct_sentence(tokens)
        sent_begin = cursor
        full_text_parts.append(sent_text)
        sent_end = sent_begin + len(sent_text)
        sent_spans.append((sent_begin, sent_end))

        items = parse_items(row.get("MWEs_projected", row.get("MWE_projected", "")))
        # Tokens already claimed by an earlier expression of this sentence.
        used = [False] * len(tokens)

        for item in items:
            pat = expr_to_pattern_tokens(item["raw"])
            if not pat:
                continue
            matches = all_subseq_matches(forms, lemmas, pat)
            # Take the first candidate that does not reuse a claimed token.
            chosen = None
            for idxs in matches:
                if not any(used[i] for i in idxs):
                    chosen = idxs
                    break
            if chosen is None:
                continue

            # One span annotation per matched token, in sentence order.
            span_fs = []
            for i in chosen:
                used[i] = True
                tb = sent_begin + tok_offsets[i]
                te = tb + len(tokens[i])
                fs = SpanEP(begin=tb, end=te)
                cas.add(fs)
                span_fs.append(fs)

            # The first matched token governs every other token of the
            # expression; one relation is recorded per label.
            gov = span_fs[0]
            for dep in span_fs[1:]:
                for lab in item["labels"]:
                    rel_records.append((gov, dep, lab))

        full_text_parts.append("\n")
        cursor = sent_end + 1  # +1 accounts for the "\n" separator

    # The separator after the last sentence is dropped from the document text.
    full_text = "".join(full_text_parts).rstrip("\n")
    cas.sofa_string = full_text
    cas.sofa_mime = "text/plain"

    for b0, e0 in sent_spans:
        if e0 <= len(full_text):
            cas.add(Sentence(begin=b0, end=e0))

    # A relation spans from the start of its governor to the end of its dependent.
    for gov, dep, lab in rel_records:
        cas.add(RelEP(begin=gov.begin, end=dep.end, Governor=gov, Dependent=dep, label=lab))

    xmi_path = os.path.join(out_dir, f"doc_{doc_idx:03d}.xmi")
    cas.to_xmi(xmi_path, pretty_print=False)
    xmi_paths.append(xmi_path)

# Write the type system next to the XMI files.
typesystem_path = os.path.join(out_dir, "TypeSystem.xml")
with open(typesystem_path, "wb") as f:
    f.write(ts.to_xml().encode("utf-8"))

# Bundle the type system and every XMI file into one flat zip archive,
# replacing any archive left over from a previous run.
zip_path = "/mnt/data/inception_span_relation_labels_noNV.zip"
if os.path.exists(zip_path):
    os.remove(zip_path)

archive_members = [(typesystem_path, "TypeSystem.xml")]
archive_members.extend((path, os.path.basename(path)) for path in xmi_paths)
with zipfile.ZipFile(zip_path, "w", compression=zipfile.ZIP_DEFLATED) as z:
    for member_path, member_name in archive_members:
        z.write(member_path, arcname=member_name)

# Cell result: archive path, number of documents, elapsed seconds, archive size in bytes.
(zip_path, len(xmi_paths), round(time.time() - t0, 2), os.path.getsize(zip_path))
