In [6]:
import spacy
import re
from spacy.tokens import Doc, DocBin, Span
from typing import List, Tuple, Dict, Any, Optional
from __future__ import annotations
from pathlib import Path
from collections import defaultdict
from typing import List, Tuple, Dict, Any
import random
import spacy
from spacy.tokens import DocBin
from spacy import displacy
from pathlib import Path
import json,ast
annotations = list()

In [None]:
def safe_parse(s: str):
    """
    Parse a string into Python dict safely.
    Tries JSON first, falls back to Python literal, 
    and fixes bad unicode escapes if needed.
    """
    if not isinstance(s, str):
        return s  # zaten dict ise direkt dön

    # 1. JSON
    try:
        return json.loads(s)
    except json.JSONDecodeError as e:
        pass  

    # 2. Fix Unicode
    try:
        fixed = re.sub(r'\\u[0-9a-fA-F]{0,3}([^0-9a-fA-F])', r'\\\\u\1', s)
        return json.loads(fixed)
    except json.JSONDecodeError:
        pass

    # 3. Python dict string → literal_eval
    try:
        return ast.literal_eval(s)
    except Exception:
        pass

    print("❌ Could not parse:", s, "...")
    raise ValueError("Couldn't parse")

turkish_map = str.maketrans("ğüşöçıİ", "gusocii")
def normalize(text):
    return text.lower().translate(turkish_map).replace("i̇","i")

In [3]:
def get_annotations(json_obj):
    temp_dict = {"entities":[]}
    real_address,llm_resp = list(json_obj.items())[0]
    llm_resp = safe_parse(llm_resp)
    #raw = llm_resp["raw"]
    for key,val in llm_resp.items():
        if not val:continue
        start = normalize(real_address).find(normalize(val))
        if start == -1: 
            # print(f"{key}-> {val} not found")
            # print(real_address)
            # print("-----------------------")
            continue
        end = start + len(val)
        temp_dict["entities"].append((start,end,key))
    return real_address,temp_dict

In [None]:
with open("../llmdatasetnew.jsonl","r",encoding="utf-8") as dataset:
    for index,line in enumerate(dataset):
        try:
            json_obj = safe_parse(line)
            annotations.append(get_annotations(json_obj))
        except:
            continue

In [None]:
# 1) Label priority (higher number = higher priority)
LABEL_PRIORITY = {
    "il": 6,
    "ilce": 5,
    "semt": 4,
    "mahalle": 3,
    "sokak": 2,
    "pk": 1,
}

def _priority_of(span: Span) -> int:
    return LABEL_PRIORITY.get(span.label_.lower(), 0)

def _length(span: Span) -> int:
    return span.end_char - span.start_char

def _overlaps(a: Span, b: Span) -> bool:
    # Do [a.start,a.end) and [b.start,b.end) intersect?
    return not (a.end_char <= b.start_char or b.end_char <= a.start_char)

# ──────────────────────────────────────────────────────────────────────────────
# 2) Helpers: trim whitespace, snap to token boundaries, span repair

def _trim_ws(text: str, s: int, e: int) -> Tuple[int, int]:
    while s < e and text[s].isspace(): s += 1
    while e > s and text[e-1].isspace(): e -= 1
    return s, e

def _snap_to_tokens(doc: Doc, s: int, e: int) -> Optional[Tuple[int,int]]:
    text = doc.text
    if not (0 <= s < e <= len(text)):
        return None

    # First token covering or following s
    si = None
    for i, tok in enumerate(doc):
        if tok.idx <= s < tok.idx + len(tok):
            si = i; break
        if s < tok.idx and text[s:tok.idx].strip() == "":
            si = i; break
    if si is None:
        return None

    # Token covering e or preceding it
    ei = None
    for i, tok in enumerate(doc):
        if tok.idx < e <= tok.idx + len(tok):
            ei = i; break
        if e <= tok.idx and text[e:tok.idx].strip() == "":
            ei = max(0, i-1); break
    if ei is None:
        ei = len(doc) - 1

    ns = doc[si].idx
    ne = doc[ei].idx + len(doc[ei])
    return (ns, ne) if ns < ne else None

def _repair_span(doc: Doc, s: int, e: int, label: str) -> Optional[Span]:
    """If char_span returns None, try contract/expand and token snapping."""
    text = doc.text
    s, e = _trim_ws(text, s, e)
    if s >= e: 
        return None

    sp = doc.char_span(s, e, label=label, alignment_mode="contract")
    if sp is not None:
        return sp

    sp = doc.char_span(s, e, label=label, alignment_mode="expand")
    if sp is not None:
        return sp

    snapped = _snap_to_tokens(doc, s, e)
    if snapped is not None:
        s2, e2 = snapped
        sp = doc.char_span(s2, e2, label=label, alignment_mode="expand") \
             or doc.char_span(s2, e2, label=label, alignment_mode="contract")
        return sp

    return None

# ──────────────────────────────────────────────────────────────────────────────
# 3) Overlap resolution: label priority > length > earlier start

def resolve_overlaps_by_priority(spans: List[Span]) -> List[Span]:
    if not spans:
        return []

    # sort order: priority DESC, length DESC, start ASC
    spans_sorted = sorted(
        spans,
        key=lambda s: (_priority_of(s), _length(s), -s.start_char),
        reverse=True
    )
    kept: List[Span] = []
    for sp in spans_sorted:
        if any(_overlaps(sp, k) for k in kept):
            continue
        kept.append(sp)

    kept.sort(key=lambda s: s.start_char)  # for visual order
    return kept

# ──────────────────────────────────────────────────────────────────────────────
# 4) Main function: does everything

def annotations_to_docbin(
    annotations: List[Tuple[str, Dict[str, Any]]],
    outfile: str = "train_clean.spacy",
    lang: str = "tr",
    preview_first_n: int = 0,  # >0 → preview with displaCy
    verbose: bool = True,
) -> Tuple[List[Doc], Dict[str,int]]:
    """
    annotations → repair + priority resolution → produces DocBin (outfile) and returns it.
    """
    nlp = spacy.blank(lang)
    db = DocBin(store_user_data=True)
    docs: List[Doc] = []

    stats = dict(
        texts=len(annotations),
        spans_total=0,
        kept=0,
        fixed=0,          # (we don't log index difference, but count if repair happened)
        skipped=0,        # could not be produced at all
        overlapped_removed=0,
        duplicates_removed=0,
        docs_with_ents=0,
        docs_empty=0,
    )

    for text, meta in annotations:
        try:
            doc = nlp.make_doc(text)

            raw_spans = meta.get("entities")
            if raw_spans is None:
                # also support the typo (entitites)
                raw_spans = meta.get("entitites", [])

            stats["spans_total"] += len(raw_spans)

            repaired: List[Span] = []
            for (s, e, label) in raw_spans:
                sp = _repair_span(doc, int(s), int(e), str(label))
                if sp is None:
                    stats["skipped"] += 1
                else:
                    # note: we could also distinguish fixed/kept by comparing indices;
                    # in practice tracking "was it repaired?" is not essential. kept+fixed equals len(repaired).
                    repaired.append(sp)

            # remove duplicates (same start, end, label)
            uniq_key = set()
            dedup: List[Span] = []
            for sp in repaired:
                key = (sp.start_char, sp.end_char, sp.label_)
                if key not in uniq_key:
                    uniq_key.add(key)
                    dedup.append(sp)
            stats["duplicates_removed"] += (len(repaired) - len(dedup))

            # resolve overlap with priority rules
            non_overlap = resolve_overlaps_by_priority(dedup)
            stats["overlapped_removed"] += (len(dedup) - len(non_overlap))

            # write results to doc.ents (now overlap-free)
            if non_overlap:
                doc.ents = non_overlap
                stats["docs_with_ents"] += 1
            else:
                stats["docs_empty"] += 1

            # kept/fixed counts approx.: (produced span count)
            stats["kept"] += len(non_overlap)  # those that remain in the final result
            # we didn't track fixed separately; you could compare s/e in _repair_span to increment it

            db.add(doc)
            docs.append(doc)
        except:
            continue

    db.to_disk(outfile)

    if verbose:
        print(f"\n✔ DocBin written: {outfile}")
        print("Summary:")
        for k in ["texts","spans_total","kept","skipped","overlapped_removed","duplicates_removed","docs_with_ents","docs_empty"]:
            print(f"  - {k}: {stats[k]}")

    # optional preview
    if preview_first_n > 0:
        try:
            from spacy import displacy
            subset = [d for d in docs if len(list(d.ents))][:preview_first_n]
            if subset:
                displacy.render(subset, style="ent", jupyter=True)
            else:
                print("Warning: no entities to preview (all docs ended up empty).")
        except Exception as e:
            print("displaCy render error:", e)

    return docs, stats


In [None]:
# ==== Helpers ====

def get_entities(meta: Dict[str, Any]):
    return meta.get("entities") or meta.get("entitites") or []

def has_target_labels(meta: Dict[str, Any], targets={"il","ilce"}) -> bool:
    labs = {str(l).lower() for _,_,l in get_entities(meta)}
    return bool(labs & targets)

def emphasize_labels(annotations: List[Tuple[str, Dict[str, Any]]],
                     targets={"il","ilce"}, pos_k=2, neg_k=1):
    out = []
    for text, meta in annotations:
        k = pos_k if has_target_labels(meta, targets) else neg_k
        out.extend([(text, meta)] * k)
    return out

def stratified_cluster_split(
    annotations: List[Tuple[str, Dict[str, Any]]],
    dev_ratio: float = 0.1,
    seed: int = 42,
    targets={"il","ilce"},
):
    """
    If the same text appears in multiple records, split by cluster to avoid
    leakage. (The same text will not be placed in both train and dev.)
    Additionally, it splits stratified: those containing 'il/ilce' and those
    not containing them are separated and divided by the given ratio.
    """
    random.seed(seed)

    # 1) Cluster by text
    clusters = defaultdict(list)
    for item in annotations:
        clusters[item[0]].append(item)

    # 2) Split clusters into two groups based on target labels
    pos_clusters, neg_clusters = [], []
    for text, items in clusters.items():
        # A cluster is considered positive if any record has a target label
        is_pos = any(has_target_labels(meta, targets) for _, meta in items)
        (pos_clusters if is_pos else neg_clusters).append(items)

    def split_clusters(cls, ratio):
        random.shuffle(cls)
        dev_count = max(1, int(len(cls) * ratio)) if len(cls) > 0 else 0
        dev_part = cls[:dev_count]
        train_part = cls[dev_count:]
        # flatten
        dev = [it for group in dev_part for it in group]
        train = [it for group in train_part for it in group]
        return train, dev

    train_pos, dev_pos = split_clusters(pos_clusters, dev_ratio)
    train_neg, dev_neg = split_clusters(neg_clusters, dev_ratio)

    train = train_pos + train_neg
    dev   = dev_pos + dev_neg

    # shuffle
    random.shuffle(train); random.shuffle(dev)
    return train, dev


In [7]:
train_ann, dev_ann = stratified_cluster_split(annotations, dev_ratio=0.1, seed=42, targets={"il","ilce","mahalle"})
weighted_train = emphasize_labels(train_ann, targets={"il","ilce"}, pos_k=2, neg_k=1)


In [None]:
annotations_to_docbin(dev_ann, "data/dev.spacy", lang="tr")
annotations_to_docbin(weighted_train, "data/train.spacy", lang="tr")

  from .autonotebook import tqdm as notebook_tqdm



✔ DocBin yazıldı: data/dev2.spacy
Özet:
  - texts: 5445
  - spans_total: 22685
  - kept: 20650
  - skipped: 0
  - overlapped_removed: 2035
  - duplicates_removed: 0
  - docs_with_ents: 5441
  - docs_empty: 4

✔ DocBin yazıldı: data/train2.spacy
Özet:
  - texts: 80778
  - spans_total: 363643
  - kept: 327303
  - skipped: 0
  - overlapped_removed: 36340
  - duplicates_removed: 0
  - docs_with_ents: 80728
  - docs_empty: 50


([7402/3 sokak numara2kat2 daire6 İzmir örnekköy yusufoğlu apartmanı,
  7402/3 sokak numara2kat2 daire6 İzmir örnekköy yusufoğlu apartmanı,
  Koyunbaba Mahallesi, Çetinkaya Sokak, Gümüşkaya Sitesi, Villa No:23, Bodrum,
  Koyunbaba Mahallesi, Çetinkaya Sokak, Gümüşkaya Sitesi, Villa No:23, Bodrum,
  Fevziçakmak Mah. Rüya sokak no 12/A,
  Davutlar Mah. Adnan Kahveci Cad.  6179 Sok. Ayday Sitesi No:6/8 (Eski 1 Numara) Kuşadası/Aydın 09400,
  Davutlar Mah. Adnan Kahveci Cad.  6179 Sok. Ayday Sitesi No:6/8 (Eski 1 Numara) Kuşadası/Aydın 09400,
  Şanlılar tıbbi cihaz ltd şti: 
  İTOB OSB. Atatürk Caddesi 9.sokak No:24 Tekeli/Menderes/İzmir İTOB OSB MENDERES İzmir,
  Şanlılar tıbbi cihaz ltd şti: 
  İTOB OSB. Atatürk Caddesi 9.sokak No:24 Tekeli/Menderes/İzmir İTOB OSB MENDERES İzmir,
  Bestekar Sadi Hoşses Sk. 37 Kat 5 daire 21,
  Yamanlar, 7321 Sk.no:10 kat:2 daire:2,
  Çamköy mahallesi değirmenbaşı caddesi no 27 kat 2,
  turan mah. zahire pazarı cad. belediye işhanı no.2 /109 Sarayköy Tica

After preparing necesarry docbins train NER by writing: 

python -m spacy config.cfg --output ./training


After that we load the model and test it

In [10]:
import spacy
from IPython.display import display, HTML
import pandas as pd
best = spacy.load("training/new_only/model-best/")
test_df = pd.read_csv("../dataset/test.csv")
liste = test_df.address.head(100).to_list()

In [4]:
docs = [best(doc) for doc in liste]

In [15]:
best.get_pipe("ner").labels

('diger', 'il', 'ilce', 'mahalle', 'pk', 'semt', 'sokak')

In [None]:
colors = {
    "diger": "#ffd166",    # soft yellow
    "il": "#118ab2",       # blue
    "ilce": "#06d6a0",     # mint/green
    "mahalle": "#ef476f",  # pink/red
    "pk": "#073b4c",       # dark blue
    "semt": "#f9844a",     # orange
    "sokak": "#8338ec",    # purple
}

options = {"colors": colors}

for doc in docs:
    displacy.render(doc, style="ent", options=options, jupyter=True)

