# 04 - Silver Data Analysis (Data Quality Report)

This notebook audits the generated silver NER dataset and produces a data quality report suitable for portfolio documentation.

It covers:
- split loading and summary,
- label distribution and co-occurrence,
- entity length behavior,
- source-level differences (`sanadset` vs `hadith_json`),
- gazetteer coverage and long-tail analysis,
- class imbalance flags and mitigation suggestions.


In [1]:
from __future__ import annotations

import json
import sys
from collections import Counter, defaultdict
from itertools import combinations
from pathlib import Path
from typing import Dict, Iterable, List

import matplotlib.pyplot as plt
import numpy as np
import pandas as pd
from IPython.display import display


def find_project_root(start: Path) -> Path:
    """Locate the project root by walking upward from ``start``.

    A directory qualifies when it contains both a ``data`` and a ``notebooks``
    entry. ``start`` is checked first, then each of its parents from nearest
    to farthest. When no directory qualifies, ``start`` is returned unchanged.
    """
    lineage = (start,) + tuple(start.parents)
    qualifying = (
        folder
        for folder in lineage
        if all((folder / marker).exists() for marker in ('data', 'notebooks'))
    )
    return next(qualifying, start)


# Resolve the project root and put it on sys.path so the project's `src`
# package can be imported from inside the notebook.
ROOT = find_project_root(Path.cwd().resolve())
if sys.path.count(str(ROOT)) == 0:
    sys.path.insert(0, str(ROOT))

# Must come after the sys.path tweak above, hence the mid-cell import.
from src.preprocessing.normalize import ArabicNormalizer

# Input locations: one JSON file per split, one text file per gazetteer type.
SILVER_DIR = ROOT / 'data' / 'silver'
SPLIT_PATHS = {
    split: SILVER_DIR / f'{split}.json'
    for split in ('train', 'dev', 'test_held_out')
}
GAZETTEER_PATHS = {
    label: ROOT / 'data' / 'gazetteers' / filename
    for label, filename in (
        ('SCHOLAR', 'scholars.txt'),
        ('BOOK', 'books.txt'),
        ('CONCEPT', 'concepts.txt'),
        ('PLACE', 'places.txt'),
    )
}
ENTITY_TYPES = ['SCHOLAR', 'BOOK', 'CONCEPT', 'PLACE', 'HADITH_REF']

# Set to an int (e.g. 50000) for fast exploratory runs; keep None for full analysis.
MAX_RECORDS_PER_SPLIT = None
PROGRESS_EVERY = 50_000

normalizer = ArabicNormalizer()

# Fail fast on the first split file that is not on disk.
first_missing = next((p for p in SPLIT_PATHS.values() if not p.exists()), None)
if first_missing is not None:
    raise FileNotFoundError(f'Missing split file: {first_missing}')

# Echo the resolved configuration so the run is self-describing.
print('Project root:', ROOT)
print('Silver dir:', SILVER_DIR)
print(
    'Splits:',
    *(f'  - {split_name}: {path}' for split_name, path in SPLIT_PATHS.items()),
    sep='\n',
)
print('MAX_RECORDS_PER_SPLIT =', MAX_RECORDS_PER_SPLIT)


Project root: C:\Users\diaab\islamic-ner
Silver dir: C:\Users\diaab\islamic-ner\data\silver
Splits:
  - train: C:\Users\diaab\islamic-ner\data\silver\train.json
  - dev: C:\Users\diaab\islamic-ner\data\silver\dev.json
  - test_held_out: C:\Users\diaab\islamic-ner\data\silver\test_held_out.json
MAX_RECORDS_PER_SPLIT = None


## 1) Load Silver Splits

This section streams `train/dev/test_held_out` and computes all downstream statistics.


In [2]:
def iter_json_array(path: Path, chunk_size: int = 1 << 16):
    """Yield the elements of a top-level JSON array without loading the whole file.

    The file is read in ``chunk_size``-character pieces and every array element
    is decoded as soon as enough text has been buffered. If the first
    non-whitespace character is not ``[``, the file is treated as JSONL instead
    (one JSON value per non-empty line).

    Args:
        path: File to read; it is decoded as UTF-8.
        chunk_size: Number of characters requested per read.

    Yields:
        Each decoded element, in file order. An empty or whitespace-only file
        yields nothing, and an array whose closing ``]`` is missing simply ends
        at end of file.

    Raises:
        ValueError: If a JSONL line is not valid JSON, or if an array element
            still cannot be decoded once the end of the file has been reached.
    """
    decoder = json.JSONDecoder()

    with path.open('r', encoding='utf-8') as handle:
        # Prime the buffer with the first chunk that has non-whitespace content.
        buffer = ''
        while not buffer:
            chunk = handle.read(chunk_size)
            if not chunk:
                return
            buffer = chunk.lstrip()

        # Fallback: support JSONL files by parsing one JSON value per non-empty line.
        if buffer[0] != '[':
            handle.seek(0)
            for line_no, line in enumerate(handle, start=1):
                text = line.strip()
                if not text:
                    continue
                try:
                    yield decoder.decode(text)
                except json.JSONDecodeError as exc:
                    raise ValueError(f'Invalid JSONL record at {path}:{line_no}: {exc}') from exc
            return

        pos = 1  # first character after the opening '['

        while True:
            # Skip whitespace and commas between array elements.
            while True:
                if pos >= len(buffer):
                    chunk = handle.read(chunk_size)
                    if not chunk:
                        return
                    # Everything buffered so far is consumed, so start a fresh
                    # buffer instead of growing (and re-copying) the old one.
                    buffer, pos = chunk, 0
                    continue

                ch = buffer[pos]
                if ch.isspace() or ch == ',':
                    pos += 1
                    continue
                break

            if ch == ']':
                return

            # Decode one element, pulling in more text while it is incomplete.
            while True:
                try:
                    obj, end = decoder.raw_decode(buffer, pos)
                except json.JSONDecodeError as exc:
                    chunk = handle.read(chunk_size)
                    if not chunk:
                        # `pos` is relative to the current buffer, not the file.
                        context = buffer[max(0, pos - 30): min(len(buffer), pos + 60)]
                        raise ValueError(
                            f'Malformed JSON in {path} near char {pos}: {exc.msg}. Context: {context!r}'
                        ) from exc
                    buffer += chunk
                    continue

                # A bare number has no closing delimiter of its own, so a prefix
                # of it ("123" of "123456", "1.5" of "1.5e3") decodes successfully.
                # Accept it only once a separator follows, or at end of file.
                is_bare_number = isinstance(obj, (int, float)) and not isinstance(obj, bool)
                if is_bare_number and (
                    end >= len(buffer) or not (buffer[end].isspace() or buffer[end] in ',]')
                ):
                    chunk = handle.read(chunk_size)
                    if chunk:
                        buffer += chunk
                        continue
                break

            yield obj
            pos = end

            # Trim consumed text periodically so the buffer stays bounded.
            if pos > 1_000_000:
                buffer = buffer[pos:]
                pos = 0


def extract_entities(tokens: List[str], tags: List[str]) -> List[Dict]:
    """Collect entity spans from parallel token and BIO-tag sequences.

    Only the first ``min(len(tokens), len(tags))`` positions are inspected.
    A span opens at any ``B-<type>`` or ``I-<type>`` tag (a stray ``I-`` is
    accepted as an opener) and extends over the directly following
    ``I-<type>`` tags of the same type. Tags without a ``-`` or with a prefix
    other than ``B``/``I`` are skipped.

    Returns:
        One dict per span with keys ``type``, ``start``, ``end`` (exclusive),
        ``length_tokens`` and ``text`` (tokens joined by single spaces).
    """
    limit = min(len(tokens), len(tags))
    spans: List[Dict] = []
    cursor = 0

    while cursor < limit:
        tag = tags[cursor]
        pieces = None if (tag == 'O' or '-' not in tag) else tag.split('-', 1)
        if pieces is None or pieces[0] not in {'B', 'I'}:
            cursor += 1
            continue

        kind = pieces[1]
        continuation = f'I-{kind}'

        # Advance past every continuation tag belonging to this span.
        stop = cursor + 1
        while stop < limit and tags[stop] == continuation:
            stop += 1

        spans.append(
            dict(
                type=kind,
                start=cursor,
                end=stop,
                length_tokens=stop - cursor,
                text=' '.join(tokens[cursor:stop]),
            )
        )
        cursor = stop

    return spans


def load_gazetteer_entries(path: Path) -> List[Dict]:
    """Parse a pipe-delimited gazetteer file into a list of entry dicts.

    Each non-empty line that does not start with ``#`` describes one entry as
    ``variant | variant | ...``. The first variant is taken as the canonical
    form. Every variant is passed through the module-level ``normalizer``;
    entries whose variants all normalize to an empty value are dropped.

    Args:
        path: Gazetteer text file (decoded as UTF-8).

    Returns:
        A list of dicts with keys ``canonical`` (first variant), ``variants``
        (stripped variants in file order) and ``variants_norm`` (set of
        non-empty normalized variants). A missing file yields an empty list.
    """
    entries: List[Dict] = []
    if not path.exists():
        return entries

    for raw in path.read_text(encoding='utf-8').splitlines():
        line = raw.strip()
        if not line or line.startswith('#'):
            continue

        variants = [piece.strip() for piece in line.split('|') if piece.strip()]
        if not variants:
            continue

        # Normalize each variant once and keep only the non-empty results.
        variants_norm = {norm for norm in map(normalizer.normalize, variants) if norm}
        if not variants_norm:
            continue

        entries.append(
            {
                'canonical': variants[0],
                'variants': variants,
                'variants_norm': variants_norm,
            }
        )

    return entries


# Sentence-level tallies, per split and per source.
split_sentence_counts = Counter()
source_sentence_counts = Counter()
source_entity_counts = defaultdict(Counter)
source_entity_sum = Counter()
source_entity_n = Counter()

# Token-level and span-level label tallies.
token_label_counts = Counter()
entity_count_by_type = Counter({entity_type: 0 for entity_type in ENTITY_TYPES})

# Sentence-level co-occurrence of entity types, keyed by (type, type).
cooccurrence = Counter()

# Span length statistics per entity type.
entity_length_sum = Counter()
entity_length_n = Counter()
entity_length_dist = defaultdict(Counter)

# Normalized entity texts seen per type, with frequencies and one raw example each.
observed_entities_by_type = defaultdict(set)
entity_text_counter_by_type = defaultdict(Counter)
representative_surface = defaultdict(dict)

# Data-quality counters: records that had to be skipped or truncated while streaming.
malformed_record_counts = Counter()
length_mismatch_counts = Counter()

for split_name, split_path in SPLIT_PATHS.items():
    print(f'Processing split: {split_name} -> {split_path.name}')

    for i, record in enumerate(iter_json_array(split_path), start=1):
        if MAX_RECORDS_PER_SPLIT is not None and i > MAX_RECORDS_PER_SPLIT:
            break

        # A record that is not a JSON object cannot carry tokens/tags; count and skip it.
        if not isinstance(record, dict):
            malformed_record_counts[split_name] += 1
            continue

        tokens = record.get('tokens') or []
        tags = record.get('ner_tags') or []
        if not isinstance(tokens, list) or not isinstance(tags, list):
            malformed_record_counts[split_name] += 1
            continue

        # Tokens and tags are aligned by truncating to the shorter sequence;
        # remember how often that was necessary so it shows up in the report.
        if len(tokens) != len(tags):
            length_mismatch_counts[split_name] += 1
        n = min(len(tokens), len(tags))
        tokens = tokens[:n]
        tags = tags[:n]

        source = str(record.get('source', 'unknown'))

        split_sentence_counts[split_name] += 1
        source_sentence_counts[source] += 1
        token_label_counts.update(tags)

        entities = extract_entities(tokens, tags)
        # Counts every extracted span, including types outside ENTITY_TYPES.
        source_entity_sum[source] += len(entities)
        source_entity_n[source] += 1

        # Each known type counts once per sentence: diagonal = sentences that
        # contain the type, off-diagonal = sentences that contain both types.
        sentence_types = sorted({e['type'] for e in entities if e['type'] in ENTITY_TYPES})
        for entity_type in sentence_types:
            cooccurrence[(entity_type, entity_type)] += 1
        for left, right in combinations(sentence_types, 2):
            cooccurrence[(left, right)] += 1
            cooccurrence[(right, left)] += 1

        for entity in entities:
            entity_type = entity['type']
            if entity_type not in ENTITY_TYPES:
                continue

            entity_count_by_type[entity_type] += 1
            source_entity_counts[source][entity_type] += 1

            length_tokens = int(entity['length_tokens'])
            entity_length_sum[entity_type] += length_tokens
            entity_length_n[entity_type] += 1
            entity_length_dist[entity_type][length_tokens] += 1

            normalized_entity_text = normalizer.normalize(entity['text'])
            if not normalized_entity_text:
                continue

            observed_entities_by_type[entity_type].add(normalized_entity_text)
            entity_text_counter_by_type[entity_type][normalized_entity_text] += 1
            # Keep the first raw surface form seen for each normalized text.
            representative_surface[entity_type].setdefault(normalized_entity_text, entity['text'])

        if i % PROGRESS_EVERY == 0:
            print(f'  {split_name}: processed {i:,} records...')

    # Surface skipped/truncated records instead of dropping them silently.
    if malformed_record_counts[split_name] or length_mismatch_counts[split_name]:
        print(
            f'  {split_name}: skipped {malformed_record_counts[split_name]:,} malformed record(s); '
            f'truncated {length_mismatch_counts[split_name]:,} record(s) with mismatched token/tag lengths'
        )


# Span counts per entity type, in the canonical ENTITY_TYPES order.
entity_counts_series = pd.Series({entity_type: entity_count_by_type[entity_type] for entity_type in ENTITY_TYPES})

# Sentences per split.
split_counts_df = pd.DataFrame(
    {'split': list(split_sentence_counts.keys()), 'sentences': list(split_sentence_counts.values())}
).sort_values('split')

# Token-level balance between the `O` label and all other labels.
total_tokens = int(sum(token_label_counts.values()))
o_tokens = int(token_label_counts.get('O', 0))
entity_tokens = int(total_tokens - o_tokens)

token_balance_df = pd.DataFrame(
    [
        {'group': 'O', 'tokens': o_tokens, 'pct': (o_tokens / total_tokens * 100) if total_tokens else 0.0},
        {'group': 'Entity', 'tokens': entity_tokens, 'pct': (entity_tokens / total_tokens * 100) if total_tokens else 0.0},
    ]
)

# Explicit columns keep `sort_values('count')` valid when no tokens were read.
label_distribution_df = pd.DataFrame(
    [
        {
            'label': label,
            'count': count,
            'pct': (count / total_tokens * 100) if total_tokens else 0.0,
        }
        for label, count in token_label_counts.items()
    ],
    columns=['label', 'count', 'pct'],
).sort_values('count', ascending=False)

# Square sentence-level co-occurrence matrix over the known entity types.
cooccur_df = pd.DataFrame(0, index=ENTITY_TYPES, columns=ENTITY_TYPES, dtype=int)
for (left, right), value in cooccurrence.items():
    if left in cooccur_df.index and right in cooccur_df.columns:
        cooccur_df.loc[left, right] = int(value)

# Span counts per source and type. Start from an integer zero frame (as for
# `cooccur_df`) rather than an all-NaN object frame followed by `fillna(0)`.
source_entity_df = pd.DataFrame(
    0,
    index=sorted(source_sentence_counts.keys()),
    columns=ENTITY_TYPES,
    dtype=int,
)
for source in source_entity_df.index:
    for entity_type in ENTITY_TYPES:
        source_entity_df.loc[source, entity_type] = int(source_entity_counts[source][entity_type])
source_entity_df = source_entity_df.astype(int)

# Per-source sentence totals and average number of spans per sentence.
source_summary_df = pd.DataFrame(
    [
        {
            'source': source,
            'sentences': int(source_sentence_counts[source]),
            'entities_total': int(source_entity_sum[source]),
            'avg_entities_per_sentence': (
                source_entity_sum[source] / source_entity_n[source] if source_entity_n[source] else 0.0
            ),
        }
        for source in sorted(source_sentence_counts.keys())
    ],
    columns=['source', 'sentences', 'entities_total', 'avg_entities_per_sentence'],
)

# Mean span length (in tokens) per entity type; 0.0 when a type never occurred.
entity_length_avg_df = pd.DataFrame(
    [
        {
            'entity_type': entity_type,
            'avg_length_tokens': (
                entity_length_sum[entity_type] / entity_length_n[entity_type] if entity_length_n[entity_type] else 0.0
            ),
            'entity_count': int(entity_length_n[entity_type]),
        }
        for entity_type in ENTITY_TYPES
    ]
)

# Load every configured gazetteer once, keyed by entity type.
gazetteer_entries = dict(
    (label, load_gazetteer_entries(gazetteer_file))
    for label, gazetteer_file in GAZETTEER_PATHS.items()
)

# An entry counts as covered when at least one of its normalized variants was
# observed as an entity text of the same type in the silver data.
gazetteer_coverage_rows = []
for entity_type in ['SCHOLAR', 'BOOK', 'CONCEPT', 'PLACE']:
    type_entries = gazetteer_entries.get(entity_type, [])
    seen_texts = observed_entities_by_type[entity_type]

    hits = sum(1 for item in type_entries if item['variants_norm'] & seen_texts)
    n_entries = len(type_entries)
    coverage_pct = (hits / n_entries * 100) if n_entries else 0.0

    gazetteer_coverage_rows.append(
        dict(
            entity_type=entity_type,
            gazetteer_entries=n_entries,
            covered_entries=hits,
            coverage_pct=coverage_pct,
            unique_entities_observed_in_data=len(seen_texts),
        )
    )

gazetteer_coverage_df = pd.DataFrame(gazetteer_coverage_rows)

# Per entity type: the 20 most frequent normalized texts (with one raw example
# each) and the share of texts that were observed exactly once.
top20_by_type = {}
singleton_rows = []
for entity_type in ENTITY_TYPES:
    frequency = entity_text_counter_by_type[entity_type]
    surfaces = representative_surface[entity_type]

    top20_by_type[entity_type] = pd.DataFrame(
        [
            {
                'rank': position,
                'entity_normalized': text,
                'example_surface': surfaces.get(text, text),
                'count': int(hits),
            }
            for position, (text, hits) in enumerate(frequency.most_common(20), start=1)
        ]
    )

    n_unique = len(frequency)
    n_single = list(frequency.values()).count(1)
    singleton_rows.append(
        dict(
            entity_type=entity_type,
            unique_entities=n_unique,
            singletons=n_single,
            singleton_pct=(n_single / n_unique * 100) if n_unique else 0.0,
        )
    )

singleton_df = pd.DataFrame(singleton_rows)

# Share of each entity type among all counted spans (0.0 everywhere if none).
total_entities = int(entity_counts_series.sum())
entity_share_df = pd.DataFrame(
    {
        'entity_type': ENTITY_TYPES,
        'count': [int(entity_counts_series[entity_type]) for entity_type in ENTITY_TYPES],
    }
)
if total_entities > 0:
    entity_share_df['pct'] = (entity_share_df['count'] / total_entities * 100).to_numpy()
else:
    entity_share_df['pct'] = 0.0

# Imbalance heuristic: SCHOLAR holds at least 80% of spans while BOOK holds at most 2%.
share_by_type = entity_share_df.set_index('entity_type')['pct']
scholar_share = float(share_by_type['SCHOLAR'])
book_share = float(share_by_type['BOOK'])
imbalance_flag = scholar_share >= 80.0 and book_share <= 2.0

total_sentences = sum(split_sentence_counts.values())
print()
print('Completed streaming analysis.')
print(f'Total sentences analyzed: {total_sentences:,}')
print(f'Total tokens analyzed: {total_tokens:,}')
print(f'Total entities analyzed: {total_entities:,}')

for summary_frame in (split_counts_df, source_summary_df):
    display(summary_frame)


Processing split: train -> train.json


KeyboardInterrupt: 

## 2) Label Distribution Analysis

- Entity type frequencies
- `O` vs entity token share
- Entity type co-occurrence matrix


In [None]:
# Tables: span counts per type, O-vs-entity token split, 20 most frequent token labels.
display(entity_share_df.sort_values('count', ascending=False))
display(token_balance_df)
display(label_distribution_df.head(20))

# Bar chart of span counts, kept in the canonical ENTITY_TYPES order.
span_counts = entity_share_df.set_index('entity_type')['count'].reindex(ENTITY_TYPES)
fig, ax = plt.subplots(figsize=(8, 4))
span_counts.plot(kind='bar', ax=ax, color='#2E86AB')
ax.set(title='Entity Type Frequencies (Span Counts)', xlabel='Entity type', ylabel='Count')
ax.grid(axis='y', alpha=0.25)
plt.xticks(rotation=0)
plt.show()

# Share of tokens labelled O versus any entity label, annotated with the value.
fig, ax = plt.subplots(figsize=(5, 4))
ax.bar(token_balance_df['group'], token_balance_df['pct'], color=['#9E9E9E', '#3F8EFC'])
ax.set(title='Token Share: O vs Entity', ylabel='Percentage (%)', ylim=(0, 100))
for bar_index, pct in enumerate(token_balance_df['pct']):
    ax.text(bar_index, pct + 1, f'{pct:.2f}%', ha='center', fontsize=10)
ax.grid(axis='y', alpha=0.25)
plt.show()

display(cooccur_df)

# Heatmap of the sentence-level co-occurrence matrix, one annotated cell per pair.
tick_positions = range(len(ENTITY_TYPES))
fig, ax = plt.subplots(figsize=(7, 6))
im = ax.imshow(cooccur_df.values, cmap='YlGnBu')
ax.set_xticks(tick_positions)
ax.set_xticklabels(ENTITY_TYPES, rotation=45, ha='right')
ax.set_yticks(tick_positions)
ax.set_yticklabels(ENTITY_TYPES)
ax.set_title('Entity Type Co-occurrence (Sentence-level)')

for (row, col), cell in np.ndenumerate(cooccur_df.values):
    ax.text(col, row, str(int(cell)), ha='center', va='center', fontsize=8)

fig.colorbar(im, ax=ax, fraction=0.046, pad=0.04)
plt.tight_layout()
plt.show()


## 3) Entity Length Analysis

- Average token length per type
- Distribution of lengths per entity type


In [None]:
display(entity_length_avg_df)

# Bar chart of the mean span length per type, in the canonical ENTITY_TYPES order.
mean_lengths = entity_length_avg_df.set_index('entity_type')['avg_length_tokens'].reindex(ENTITY_TYPES)
fig, ax = plt.subplots(figsize=(8, 4))
mean_lengths.plot(kind='bar', ax=ax, color='#E07A5F')
ax.set(
    title='Average Entity Length by Type (tokens)',
    xlabel='Entity type',
    ylabel='Average length',
)
ax.grid(axis='y', alpha=0.25)
plt.xticks(rotation=0)
plt.show()


def bucket_lengths(counter: Counter, max_bucket: int = 8) -> Dict[str, int]:
    """Group a length histogram into string-keyed buckets with an overflow bin.

    Lengths up to and including ``max_bucket`` keep their own bucket, keyed by
    ``str(length)``; every longer length is summed into ``'<max_bucket + 1>+'``.

    Args:
        counter: Mapping from entity length to number of occurrences.
        max_bucket: Largest length that gets an individual bucket.

    Returns:
        A plain dict from bucket label to summed count.
    """
    overflow_key = f'{max_bucket + 1}+'
    grouped: Dict[str, int] = {}
    for length, count in counter.items():
        key = str(length) if length <= max_bucket else overflow_key
        grouped[key] = grouped.get(key, 0) + count
    return grouped


# One small bar chart per entity type on a 2x3 grid; lengths above 8 share the '9+' bin.
fig, axes = plt.subplots(2, 3, figsize=(14, 8))
axes = axes.flatten()

for idx, entity_type in enumerate(ENTITY_TYPES):
    ax = axes[idx]
    buckets = bucket_lengths(entity_length_dist[entity_type], max_bucket=8)

    labels = list(map(str, range(1, 9))) + ['9+']
    values = [buckets.get(label, 0) for label in labels]

    ax.bar(labels, values, color='#81B29A')
    ax.set(title=entity_type, xlabel='Entity length (tokens)', ylabel='Count')
    ax.grid(axis='y', alpha=0.2)

# The last grid cell is switched off.
axes[-1].axis('off')

plt.suptitle('Entity Length Distributions by Type', y=1.02, fontsize=14)
plt.tight_layout()
plt.show()


## 4) Source Comparison

Compare entity profiles between `sanadset` and `hadith_json` sourced sentences.


In [None]:
display(source_summary_df)
display(source_entity_df)

# Stacked bars: absolute span counts per source, split by entity type.
fig, ax = plt.subplots(figsize=(9, 5))
source_entity_df.reindex(columns=ENTITY_TYPES).plot(kind='bar', stacked=True, ax=ax)
ax.set(title='Entity Counts by Source', xlabel='Source', ylabel='Entity count')
ax.legend(title='Entity type', bbox_to_anchor=(1.02, 1), loc='upper left')
ax.grid(axis='y', alpha=0.25)
plt.tight_layout()
plt.show()

# Row-normalized view: each type's percentage of its source's spans
# (sources without any spans become all zeros).
spans_per_source = source_entity_df.sum(axis=1)
source_entity_rate_df = source_entity_df.div(spans_per_source, axis=0).fillna(0) * 100
display(source_entity_rate_df)

# SCHOLAR density is compared only when both sources are present.
if all(name in source_entity_df.index for name in ('sanadset', 'hadith_json')):
    sentences_by_source = source_summary_df.set_index('source')['sentences']

    sanadset_scholar_per_sentence = (
        source_entity_df.loc['sanadset', 'SCHOLAR'] / max(sentences_by_source['sanadset'], 1)
    )
    hadith_scholar_per_sentence = (
        source_entity_df.loc['hadith_json', 'SCHOLAR'] / max(sentences_by_source['hadith_json'], 1)
    )

    print('SCHOLAR entities per sentence:')
    print(f"  sanadset: {sanadset_scholar_per_sentence:.3f}")
    print(f"  hadith_json: {hadith_scholar_per_sentence:.3f}")


## 5) Coverage Analysis

- Gazetteer coverage in actual silver entities
- Top 20 entities per type
- Singleton analysis (entities seen once)


In [None]:
display(gazetteer_coverage_df)
display(singleton_df)

# One ranked table per entity type, separated by a ruled header.
divider = '=' * 80
for entity_type in ENTITY_TYPES:
    ranking = top20_by_type[entity_type]
    print()
    print(divider)
    print(f'Top 20 entities: {entity_type}')
    if not ranking.empty:
        display(ranking)
    else:
        print('No entities observed.')


## 6) Potential Issues and Mitigation

Flags imbalance risks and proposes practical mitigation options for training.


In [None]:
display(entity_share_df)

# Headline imbalance numbers computed earlier in the notebook.
print('Class imbalance checks:')
for report_line in (
    f"- SCHOLAR share: {scholar_share:.2f}%",
    f"- BOOK share: {book_share:.2f}%",
    f"- Imbalance flag (SCHOLAR >= 80% and BOOK <= 2%): {imbalance_flag}",
):
    print(report_line)

# Entity types that make up less than 5% of all spans.
rare_type_rows = entity_share_df[entity_share_df['pct'] < 5.0].copy()
has_rare_types = not rare_type_rows.empty
print()
if has_rare_types:
    print('Entity types under 5% of all entities:')
    display(rare_type_rows.sort_values('pct'))
else:
    print('No entity type under 5%.')

print()
print('Suggested mitigation actions:')

# Each triggered check contributes its recommendations, in a fixed order.
mitigations = []
if imbalance_flag:
    mitigations.extend(
        [
            'Oversample sentences containing BOOK entities during training batches.',
            'Use class-weighted loss or focal loss to reduce SCHOLAR dominance.',
            'Track per-class F1 and accept lower BOOK F1 only with explicit reporting.',
        ]
    )
if has_rare_types:
    mitigations.append('Create targeted mini-corpora for low-frequency classes (BOOK/PLACE/HADITH_REF).')
if singleton_df['singleton_pct'].max() > 60:
    mitigations.append('High singleton tail detected: prefer subword-aware models and contextual augmentation.')
if len(mitigations) == 0:
    mitigations.append('Current class balance is acceptable; proceed with standard stratified training.')

# The list is never empty here, so the joined output matches line-by-line printing.
print('\n'.join(f'{idx}. {item}' for idx, item in enumerate(mitigations, start=1)))

print()
print('Data quality report cell completed.')
