# NER SPACY Training

## First Step (MarkUp our datasets)
- special `.spacy` file, in which you have markup tags in all text  

```
{
    "text": "–ö—É–ø–∏—Ç—å iPhone 15 Pro Max –ø–æ –≤—ã–≥–æ–¥–Ω–æ–π —Ü–µ–Ω–µ",
    "entities": [(6, 20, "PRODUCT")]  # iPhone 15 Pro Max
}
```

In [None]:
import re
import spacy
from spacy.tokens import DocBin
from tqdm import tqdm
from spacy.util import filter_spans


def mark_and_save_spacy(df, output_path, label="PRODUCT"):
    nlp = spacy.blank("en")
    doc_bin = DocBin()
    total_targets = 0
    found_targets = 0
    skipped_texts = 0

    for _, row in tqdm(df.iterrows(), total=len(df), desc="üìå –†–∞–∑–º–µ—Ç–∫–∞"):
        targets = str(row['Informative_Text'])
        text = str(row['Texts'])
        entities = []

        for target in targets.split(','): # prepare tags
            target = target.strip()
            
            if not target:
                continue
            
            total_targets += 1
            
            matched = False
            for match in re.finditer(re.escape(target), text, flags=re.IGNORECASE): # find matches
                start, end = match.span()                                           
                entities.append((start, end, label))                                
                matched = True
            
            if matched:
                found_targets += 1

        # skip texts without markup targets
        if not entities:
            skipped_texts += 1
            continue

        doc = nlp.make_doc(text)
        spans = [doc.char_span(start, end, label=label) for start, end, label in entities]
        spans = [s for s in spans if s is not None]
        spans = filter_spans(spans)
        doc.ents = spans
        doc_bin.add(doc)

    doc_bin.to_disk(output_path)
    print(f"\n –†–∞–∑–º–µ—á–µ–Ω–Ω—ã–µ –¥–∞–Ω–Ω—ã–µ —Å–æ—Ö—Ä–∞–Ω–µ–Ω—ã –≤ {output_path}")
    print(f"‚è≠ –ü—Ä–æ–ø—É—â–µ–Ω–æ —Ç–µ–∫—Å—Ç–æ–≤ –±–µ–∑ —Å—É—â–Ω–æ—Å—Ç–µ–π: {skipped_texts}")
    if total_targets > 0:
        coverage = found_targets / total_targets * 100
        print(f"–ù–∞–π–¥–µ–Ω–æ {found_targets}/{total_targets} —Ç–∞—Ä–≥–µ—Ç–æ–≤ ({coverage:.2f}%)")
    else:
        print("–ù–µ—Ç –Ω–∏ –æ–¥–Ω–æ–≥–æ —Ç–∞—Ä–≥–µ—Ç–∞ –≤ –¥–∞–Ω–Ω—ã—Ö.")

## Second Step (Calculate F1 Metric)

![image.png](attachment:image.png)

Precision = TruePositives + FalsePositives / TruePositives

Recall = TruePositives + FalseNegatives / TruePositives
‚Äã
- **True Positives (TP)**: Correctly predicted positive instances.
- **False Positives (FP)**: Incorrectly predicted positive instances.
- **True Negatives (TN)**: Correctly predicted negative instances.
- **False Negatives (FN)**: Incorrectly predicted negative instances.

F_1 = (2 √ó Precision + Recall) / Precision √ó Recall

In [6]:
from sklearn.metrics import f1_score

def evaluate_ner_model(nlp, examples, label="PRODUCT", verbose=True):
    y_true = []
    y_pred = []

    for example in examples:
        pred_doc = nlp(example.text)
        pred_ents = {(ent.start_char, ent.end_char) for ent in pred_doc.ents if ent.label_ == label}
        true_ents = {(ent.start_char, ent.end_char) for ent in example.reference.ents if ent.label_ == label}

        y_true.append(true_ents)
        y_pred.append(pred_ents)

    """
    –ë–∏–Ω–∞—Ä–∏–∑–∞—Ü–∏—è –ø–æ —Å—É—â–Ω–æ—Å—Ç—è–º (—Å–æ–≤–ø–∞–¥–∞–µ—Ç –ª–∏ —Ö–æ—Ç—è –±—ã –æ–¥–Ω–∞ —Å—É—â–Ω–æ—Å—Ç—å –ø–æ –ø–æ–∑–∏—Ü–∏–∏)
    –ü—Ä–∏–º–µ—Ä:
        –ü—É—Å—Ç—å y_true —Å–æ–¥–µ—Ä–∂–∏—Ç: [{(0, 5)}, {(10, 15)}] (–¥–≤–µ —Å—É—â–Ω–æ—Å—Ç–∏).
        –ü—É—Å—Ç—å y_pred —Å–æ–¥–µ—Ä–∂–∏—Ç: [{(0, 5)}] (–æ–¥–Ω–∞ —Å—É—â–Ω–æ—Å—Ç—å).
    
    –í —Ä–µ–∑—É–ª—å—Ç–∞—Ç–µ:
        –î–ª—è –ø–µ—Ä–≤–æ–≥–æ –ø—Ä–∏–º–µ—Ä–∞: true_labels –ø–æ–ª—É—á–∏—Ç 1, pred_labels –ø–æ–ª—É—á–∏—Ç 1.
        –î–ª—è –≤—Ç–æ—Ä–æ–≥–æ –ø—Ä–∏–º–µ—Ä–∞: true_labels –ø–æ–ª—É—á–∏—Ç 1, pred_labels –ø–æ–ª—É—á–∏—Ç 0 (–ø–æ—Ç–æ–º—É —á—Ç–æ y_pred –ø—É—Å—Ç–æ–π).
    """
    true_labels = []
    pred_labels = []

    for t_set, p_set in zip(y_true, y_pred):
        true_labels.append(1 if t_set else 0)
        pred_labels.append(1 if p_set else 0)

    f1 = f1_score(true_labels, pred_labels)
    if verbose:
        print(f"F1 –¥–ª—è {label}: {f1:.2f}")
    return f1

### additional features

In [4]:
def load_spacy_data(path):
    doc_bin = DocBin().from_disk(path)
    return list(doc_bin.get_docs(spacy.blank("en").vocab))

In [5]:
import os
#from google.colab import files


# –°–æ—Ö—Ä–∞–Ω–µ–Ω–∏–µ –∏ —Å–∫–∞—á–∏–≤–∞–Ω–∏–µ –º–æ–¥–µ–ª–∏
def save_and_download_model(model, model_name):
    output_dir = f"/content/{model_name}"
    model.to_disk(output_dir)

    # –°–æ–∑–¥–∞—ë–º zip-–∞—Ä—Ö–∏–≤ –¥–ª—è —Å–∫–∞—á–∏–≤–∞–Ω–∏—è
    os.system(f"zip -r {model_name}.zip {output_dir}")
    #files.download(f"{model_name}.zip")
    print(f"–ú–æ–¥–µ–ª—å {model_name} —Å–æ—Ö—Ä–∞–Ω–µ–Ω–∞ –∏ –≥–æ—Ç–æ–≤–∞ –∫ —Å–∫–∞—á–∏–≤–∞–Ω–∏—é.")

## Third Step (Train PreTrained model)

![image.png](attachment:image.png)

`model_name="en_core_web_sm"` - pretrained model


In [7]:
from spacy.training import Example
from sklearn.model_selection import KFold
from tqdm import trange
import random
import numpy as np


def train_with_cv(input_paths, n_iter=10, model_name="en_core_web_sm", n_folds=2, label="PRODUCT"):
    # load folds
    docs = load_spacy_data(input_paths[0])
    kf = KFold(n_splits=n_folds, shuffle=True, random_state=42)
    
    if not docs:
        print("–ù–µ—Ç –¥–æ–∫—É–º–µ–Ω—Ç–æ–≤.")
        return None

    fold_results = []

    for fold, (train_idx, test_idx) in enumerate(kf.split(docs), 1):
        print(f"\n –ù–∞—á–∞–ª–æ –æ–±—É—á–µ–Ω–∏—è –¥–ª—è —Ñ–æ–ª–¥–∞ {fold}/{n_folds}")
        train_docs = [docs[i] for i in train_idx]
        test_docs = [docs[i] for i in test_idx]

        nlp = spacy.load(model_name) # pretrained model

        # add pipe ner
        if "ner" not in nlp.pipe_names: 
            ner = nlp.add_pipe("ner", last=True)
        else:
            ner = nlp.get_pipe("ner")

        ner.add_label(label) 

        # model trained by Examples
        train_examples = [Example(doc, doc) for doc in train_docs] 
        test_examples = [Example(doc, doc) for doc in test_docs]

        optimizer = nlp.resume_training() 
        optimizer.L2 = 0.01

        best_f1 = 0
        for epoch in trange(n_iter, desc=f"–û–±—É—á–µ–Ω–∏–µ —Ñ–æ–ª–¥–∞ {fold}"):
            random.shuffle(train_examples)
            losses = {}
            nlp.update(train_examples, drop=0.3, losses=losses, sgd=optimizer)
            print(f"üìâ –§–æ–ª–¥ {fold} | Epoch {epoch+1} ‚Äî Loss: {losses.get('ner', 0):.4f}")

            current_f1 = evaluate_ner_model(nlp, test_examples, label, verbose=False)
            if current_f1 > best_f1:
                best_f1 = current_f1

        fold_results.append(best_f1)
        print(f"üèÜ –õ—É—á—à–∏–π F1 –¥–ª—è —Ñ–æ–ª–¥–∞ {fold}: {best_f1:.4f}")

    print("\n–†–µ–∑—É–ª—å—Ç–∞—Ç—ã –∫—Ä–æ—Å—Å-–≤–∞–ª–∏–¥–∞—Ü–∏–∏:")
    for fold, f1 in enumerate(fold_results, 1):
        print(f"–§–æ–ª–¥ {fold}: F1 = {f1:.4f}")
    print(f"–°—Ä–µ–¥–Ω–∏–π F1: {np.mean(fold_results):.4f}")

    return nlp

## MAIN

In [None]:
import pandas as pd
import os
from sklearn.model_selection import train_test_split
from spacy.training import Example

def main():
    CSV_PATH = "dataset_result.csv"
    OUTPUT_DIR = "fold_data"
    TEST_SIZE = 0.2
    N_FOLDS = 2
    
    os.makedirs(OUTPUT_DIR, exist_ok=True)
    df = pd.read_csv(CSV_PATH)
    df = df.sample(frac=1, random_state=42).reset_index(drop=True) 

    # train/test
    train_df, test_df = train_test_split(df, test_size=TEST_SIZE, random_state=42)

    # save test .spacy
    test_path = os.path.join(OUTPUT_DIR, "test.spacy")
    mark_and_save_spacy(test_df, test_path)

    # save train .spacy
    train_path = os.path.join(OUTPUT_DIR, "train.spacy")
    mark_and_save_spacy(train_df, train_path)

    trained_nlp = train_with_cv([train_path], 10, N_FOLDS, 42)

    if trained_nlp:
        # save&download model
        trained_nlp.to_disk("trained_model")
        save_and_download_model(trained_nlp, 'trained_model')
        print("–ú–æ–¥–µ–ª—å —Å–æ—Ö—Ä–∞–Ω–µ–Ω–∞ –≤ 'trained_model'")

        # test
        print("\n –¢–µ—Å—Ç–∏—Ä–æ–≤–∞–Ω–∏–µ –º–æ–¥–µ–ª–∏:")
        test_docs = load_spacy_data(test_path)
        test_examples = [Example(doc, doc) for doc in test_docs if len(doc.ents) > 0]

        if test_examples:
            test_f1 = evaluate_ner_model(trained_nlp, test_examples, verbose=True)
            print(f"\n–†–µ–∑—É–ª—å—Ç–∞—Ç—ã –Ω–∞ —Ç–µ—Å—Ç–æ–≤–æ–π –≤—ã–±–æ—Ä–∫–µ:")
            print(f"F1-score: {test_f1:.4f}")
        else:
            print("!!! –ù–µ—Ç –¥–æ–∫—É–º–µ–Ω—Ç–æ–≤ –¥–ª—è —Ç–µ—Å—Ç–∏—Ä–æ–≤–∞–Ω–∏—è")

if __name__ == "__main__":
    main()

![image.png](attachment:image.png)