In [8]:
# Helper packages
import pandas as pd
import json

# NLP related packages
import re
import string
from nltk.tokenize import word_tokenize,sent_tokenize
from nltk.corpus import stopwords
import nltk
from tqdm import tqdm

In [None]:
# Basic text cleaning
def text_cleaning(text: str) -> str:
    """
    Reduce text to lowercase ASCII alphanumeric words separated by single spaces.

    Steps, in order: collapse whitespace runs to one space, delete every character
    found in string.punctuation, lowercase, replace each run of characters outside
    [A-Za-z0-9] with one space, and strip the ends.

    Punctuation is deleted rather than replaced by a space, so hyphenated terms are
    fused: "2019-nCoV" becomes "2019ncov".
    """
    collapsed = re.sub(r"\s+", " ", text)
    # Deleting via a translation table drops the same characters as a per-character filter.
    without_punct = collapsed.translate(str.maketrans("", "", string.punctuation))
    lowered = without_punct.lower()
    return re.sub(r'[^A-Za-z0-9]+', ' ', lowered).strip()

# Light cleaning for sentence splitting: whitespace is normalized, punctuation is left alone
def light_clean(text: str) -> str:
    """Collapse every whitespace run to a single space and trim both ends; punctuation is kept."""
    single_spaced = re.sub(r"\s+", " ", text)
    return single_spaced.strip()

def normalize_token(t: str) -> str:
    """
    Normalize a single token for matching.

    The token is trimmed and lowercased, then any leading or trailing run of
    non-word characters is removed. Characters inside the token are untouched,
    so "COVID-19" becomes "covid-19" and a punctuation-only token becomes "".
    """
    lowered = t.strip().lower()
    return re.sub(r"^\W+|\W+$", "", lowered)


# train.csv is expected to provide the columns dataset_label, dataset_title and cleaned_label
df_train = pd.read_csv("../data/mex2/train.csv")

def _safe_clean_series(s: pd.Series):
    """Return the text_cleaning result for every entry of `s`, treating missing values as ""."""
    as_text = s.fillna("").astype(str)
    return list(map(text_cleaning, as_text))

temp_1 = _safe_clean_series(df_train['dataset_label'])
temp_2 = _safe_clean_series(df_train['dataset_title'])
temp_3 = _safe_clean_series(df_train['cleaned_label'])

# Unique, non-empty cleaned strings drawn from all three columns
existing_labels = {
    cleaned
    for cleaned in temp_1 + temp_2 + temp_3
    if cleaned
}

print(f"Loaded labels: {len(existing_labels):,}")
# Quick peek: the first ten labels in sorted order
sorted(existing_labels)[:10]

Loaded labels: 173


['2019 ncov complete genome sequences',
 '2019 ncov genome sequence',
 '2019 ncov genome sequences',
 '2019ncov complete genome sequences',
 '2019ncov genome sequence',
 '2019ncov genome sequences',
 'adni',
 'advanced national seismic system anss comprehensive catalog comcat',
 'advanced national seismic system comprehensive catalog',
 'aging integrated database']

In [12]:
from typing import List, Tuple

def _occurs_as_whole_tokens(sentence: str, label: str) -> bool:
    """True if `label` occurs in `sentence` with no alphanumeric character touching either end."""
    start = sentence.find(label)
    while start != -1:
        end = start + len(label)
        left_ok = start == 0 or not sentence[start - 1].isalnum()
        right_ok = end == len(sentence) or not sentence[end].isalnum()
        if left_ok and right_ok:
            return True
        # This occurrence is embedded in a longer word; look for a later one.
        start = sentence.find(label, start + 1)
    return False


def find_longest_label_in_sentence(clean_sentence_for_match: str,
                                   labels_cleaned: List[str]) -> str:
    """
    Return the longest cleaned label that occurs in the cleaned sentence, or "" if none does.

    Args:
        clean_sentence_for_match: sentence already passed through the same cleaning
            as the labels (lowercase alphanumeric words separated by spaces).
        labels_cleaned: iterable of cleaned label strings; empty entries are ignored.

    A label only counts when it lines up with word boundaries: "adni" is found in
    "we used adni data" but not inside "madnix". A plain substring test would accept
    such partial-word hits, which can never be tagged token by token and can displace
    a shorter label that really is present.

    Among labels of equal length the first one yielded by `labels_cleaned` wins.
    """
    longest = ""
    if not clean_sentence_for_match:
        return longest

    for lab in labels_cleaned:
        # Only strictly longer labels can replace the current best, so check length
        # before paying for the substring search.
        if not lab or len(lab) <= len(longest):
            continue
        if _occurs_as_whole_tokens(clean_sentence_for_match, lab):
            longest = lab
    return longest

def schwartz_hearst_abbrev(sent: str):
    """
    Extract (long_form, short_form) candidates from parenthesized text, in the
    spirit of the Schwartz-Hearst abbreviation pattern.

    For every "(...)" group in `sent`, the stripped content of the parentheses is
    the short form, and the long-form candidate is a window of the whitespace-separated
    words immediately before the opening parenthesis. The window holds at most
    2 * len(short_form) words (a heuristic; the length is counted in characters of
    the short form).

    Parentheticals are skipped when their content is empty after stripping or when
    no words precede them.

    Returns:
        list of (long_form, short_form) string tuples in order of appearance.
    """
    candidates = []
    for match in re.finditer(r"\(([^)]+)\)", sent):
        short = match.group(1).strip()
        if not short:
            # "( )" would otherwise give a window size of 0, and words[-0:] is the
            # whole preceding text rather than an empty window.
            continue
        preceding_words = sent[:match.start()].split()
        if not preceding_words:
            continue
        # Take a window before the parenthesis
        window = " ".join(preceding_words[-len(short) * 2:])  # heuristic
        candidates.append((window, short))
    return candidates

def bio_tags_for_sentence(tokens: List[str], label_clean: str) -> List[str]:
    """
    Produce one BIO tag per token for a single cleaned label.

    - `tokens` are the original (output) tokens; they are compared after
      normalize_token has been applied to each of them.
    - `label_clean` is split on whitespace and its pieces are compared as-is.
    - Only the first position where the label pieces match a contiguous run of
      normalized tokens is tagged: "B" on the first token, "I" on the rest.
    - With an empty label, or when no match exists, every tag is "O".
    """
    tags = ["O"] * len(tokens)

    pieces = label_clean.split() if label_clean else []
    if not pieces:
        return tags

    normalized = [normalize_token(tok) for tok in tokens]
    width = len(pieces)

    # First window start whose normalized tokens equal the label pieces, if any.
    # The range is empty when the label is longer than the sentence.
    first_hit = next(
        (
            start
            for start in range(len(normalized) - width + 1)
            if normalized[start:start + width] == pieces
        ),
        None,
    )

    if first_hit is not None:
        tags[first_hit:first_hit + width] = ["B"] + ["I"] * (width - 1)
    return tags

In [4]:
def iter_jsonl(jsonl_path: str):
    """
    Lazily yield one parsed JSON value per non-blank line of a UTF-8 JSONL file.

    Lines are stripped of surrounding whitespace before parsing; lines that are
    empty after stripping are skipped. The file stays open only while the
    generator is being consumed.
    """
    with open(jsonl_path, "r", encoding="utf-8") as handle:
        for payload in map(str.strip, handle):
            if payload:
                yield json.loads(payload)

In [10]:
def jsonl_to_conll(jsonl_path: str,
                   out_path: str,
                   labels_clean_set: set) -> Tuple[int, int, int]:
    """
    Convert a JSONL corpus into a CoNLL-style file of BIO-tagged tokens.

    Every record read from `jsonl_path` is expected to carry its document under the
    'text' key (other keys are ignored). Each document is split into sentences and
    tokens, and one line per token is written as:
        TOKEN<TAB>TAG
    with a blank line after every sentence.

    Args:
        jsonl_path: path of the input JSONL file.
        out_path: path of the CoNLL file to write; missing parent directories are
            created and an existing file is overwritten.
        labels_clean_set: known dataset labels, already cleaned with text_cleaning.

    Returns:
        (n_docs, n_sents, n_sents_with_BIO) where n_docs counts every record read
        (including those with empty text), n_sents counts sentences written, and
        n_sents_with_BIO counts sentences that received a "B" tag.
    """
    # Imported here so the function works no matter which cell imported Path, or when.
    from pathlib import Path

    n_docs = n_sents = n_tagged = 0

    out_path = Path(out_path)
    out_path.parent.mkdir(parents=True, exist_ok=True)

    with open(out_path, "w", encoding="utf-8") as w:
        for row in tqdm(iter_jsonl(jsonl_path), desc="Docs"):
            n_docs += 1
            text = (row.get("text") or "").strip()
            if not text:
                # Missing, null or blank text: the document is counted but yields no sentences.
                continue

            # Sentence splitting on the *lightly* cleaned text to avoid random breaks
            sentences = sent_tokenize(light_clean(text))

            for sent in sentences:
                tokens = word_tokenize(sent)

                # Candidate labels for this sentence
                candidates = []

                # 1) Gazetteer-based (longest match from known labels)
                clean_sent_for_match = text_cleaning(sent)
                longest_label_clean = find_longest_label_in_sentence(
                    clean_sent_for_match, labels_clean_set
                )
                if longest_label_clean:
                    candidates.append(longest_label_clean)

                # 2) Schwartz–Hearst detection (long-form + short-form acronyms)
                for lf, sf in schwartz_hearst_abbrev(sent):
                    lf_clean = text_cleaning(lf)
                    sf_clean = text_cleaning(sf)
                    if lf_clean:
                        candidates.append(lf_clean)
                    if sf_clean:
                        candidates.append(sf_clean)

                # Pick the longest candidate among all.
                # NOTE(review): only this single candidate is passed to the tagger; if it
                # cannot be located among the tokens, shorter candidates are not tried and
                # the sentence is written with all "O" tags.
                max_lab = max(candidates, key=len) if candidates else ""

                # Tag sentence
                tags = bio_tags_for_sentence(tokens, max_lab)

                if "B" in tags:
                    n_tagged += 1
                n_sents += 1

                # Write CoNLL lines
                for tok, tag in zip(tokens, tags):
                    w.write(f"{tok}\t{tag}\n")
                w.write("\n")  # sentence boundary

    return n_docs, n_sents, n_tagged

In [13]:
from pathlib import Path
# Input corpus: a JSONL file read record by record by jsonl_to_conll
JSONL_IN  = "../data/mex3/corpus_1000.jsonl"
# Destination of the CoNLL output (TOKEN<TAB>TAG lines, blank line between sentences)
CONLL_OUT = "../data/mex6/conll_1000.conll"

# Run the conversion using the cleaned label set built from train.csv
n_docs, n_sents, n_tagged = jsonl_to_conll(JSONL_IN, CONLL_OUT, existing_labels)
# Summarize the counters returned by the conversion
print(f"Done.\n  Docs: {n_docs}\n  Sentences: {n_sents}\n  Sentences with BIO tags: {n_tagged}\n  Saved: {CONLL_OUT}")


Docs: 1000it [01:34, 10.58it/s]

Done.
  Docs: 1000
  Sentences: 346535
  Sentences with BIO tags: 25878
  Saved: ../data/mex6/conll_1000.conll



