In [1]:
# %% [markdown]
# IMDB Sentiment (PyTorch, CPU-only, no torchtext) — RNN/LSTM with GloVe + trainable embeddings
# Tasks satisfied:
#   1) GloVe + Vanilla RNN
#   2) GloVe + LSTM
#   3) Repeat [1] & [2] with on-the-fly trainable embeddings (nn.Embedding)

# ---------- Environment: ensure CPU-only Torch 2.3.1 (avoids Windows autograd/optimizer quirks) ----------
import os, sys, subprocess, importlib
def _pip_install(pkgs, extra_args=None):
    cmd = [sys.executable, "-m", "pip", "install", "-q", "-U", *pkgs]
    if extra_args: cmd.extend(extra_args)
    subprocess.check_call(cmd)

def ensure_cpu_torch():
    try:
        import torch
        ver = getattr(torch, "__version__", "")
        has_cuda = getattr(torch.version, "cuda", None) is not None
        if has_cuda or not ver.startswith("2.3.1"):
            raise RuntimeError("Switching to CPU wheels for stability")
    except Exception:
        subprocess.call([sys.executable, "-m", "pip", "uninstall", "-y", "torch", "torchvision", "torchaudio"])
        subprocess.call([sys.executable, "-m", "pip", "cache", "purge"])
        subprocess.check_call([sys.executable, "-m", "pip", "install",
                               "--index-url", "https://download.pytorch.org/whl/cpu",
                               "torch==2.3.1+cpu", "torchvision==0.18.1+cpu", "torchaudio==2.3.1+cpu"])
        # Restart so the new wheels load cleanly
        os._exit(0)

ensure_cpu_torch()

# ---------- Deps ----------
for p in ["kagglehub", "pandas", "scikit-learn"]:
    try: importlib.import_module(p if p!="scikit-learn" else "sklearn")
    except ImportError: _pip_install([p])

# ---------- Imports ----------
import re, time, random
from pathlib import Path
from typing import List, Dict, Tuple

import numpy as np
import pandas as pd
import kagglehub

import torch
import torch.nn as nn
from torch.utils.data import Dataset, DataLoader
from sklearn.metrics import f1_score, roc_auc_score, accuracy_score

# ---------- Config ----------
SEED           = 42
BATCH_SIZE     = 256
EPOCHS         = 2              # bump to 3–5 for better scores
MAX_LEN        = 150
EMB_DIM        = 100            # GloVe 6B-100d
HIDDEN_DIM     = 128
LAYERS         = 1
BIDIRECTIONAL  = True
DROPOUT        = 0.2
MIN_FREQ       = 2
LR             = 2e-3
NUM_WORKERS    = 0
DEVICE         = torch.device("cpu")  # stay CPU-only for stability
SAVE_DIR       = Path("./artifacts_pt_cpu"); SAVE_DIR.mkdir(parents=True, exist_ok=True)

# Small speed nicety
if hasattr(torch, "set_float32_matmul_precision"):
    try: torch.set_float32_matmul_precision("high")
    except: pass

def set_seed(seed=SEED):
    random.seed(seed); np.random.seed(seed)
    torch.manual_seed(seed)

set_seed(SEED)

# ---------- Basic text utils ----------
def basic_english(text: str) -> List[str]:
    return re.findall(r"[a-z0-9']+", str(text).lower())

def pad_sequences(seqs, pad_idx=0):
    lengths = torch.tensor([max(1, len(s)) for s in seqs], dtype=torch.long)
    max_len = int(lengths.max().item()) if len(lengths) else 1
    out = torch.full((len(seqs), max_len), pad_idx, dtype=torch.long)
    for i, s in enumerate(seqs):
        if len(s): out[i, :len(s)] = torch.tensor(s, dtype=torch.long)
    return out, lengths

def bucket_sort_by_length(examples, bucket_size=50):
    buckets = {}
    for ex in examples:
        L = len(ex["input_ids"]); key = (L // bucket_size) * bucket_size
        buckets.setdefault(key, []).append(ex)
    ordered = []
    for k in sorted(buckets.keys()):
        ordered.extend(buckets[k])
    return ordered

# ---------- Data ----------
def load_imdb():
    print("[info] Downloading IMDB via kagglehub…")
    path = kagglehub.dataset_download("lakshmi25npathi/imdb-dataset-of-50k-movie-reviews")
    print("[info] kagglehub path:", path)
    csv_path = Path(path) / "IMDB Dataset.csv"
    df = pd.read_csv(csv_path)
    df = df.sample(frac=1.0, random_state=123).reset_index(drop=True)
    train_df = df.iloc[:40000].reset_index(drop=True)
    val_df   = df.iloc[40000:45000].reset_index(drop=True)
    test_df  = df.iloc[45000:].reset_index(drop=True)
    return train_df, val_df, test_df

train_df, val_df, test_df = load_imdb()

def build_vocab(tokenizer, texts, min_freq=MIN_FREQ, specials=("<pad>", "<unk>")) -> Dict[str,int]:
    freq = {}
    for t in texts:
        for tok in tokenizer(t):
            if re.search(r"[a-z0-9']", tok):
                freq[tok] = freq.get(tok, 0) + 1
    vocab = {specials[0]:0, specials[1]:1}
    idx = len(vocab)
    for tok, c in sorted(freq.items(), key=lambda x: (-x[1], x[0])):
        if c >= min_freq and tok not in vocab:
            vocab[tok] = idx; idx += 1
    return vocab

print("[info] Building vocabulary…")
vocab = build_vocab(basic_english, train_df["review"].tolist(), min_freq=MIN_FREQ)
pad_idx, unk_idx = vocab["<pad>"], vocab["<unk>"]
print(f"[info] Vocab size: {len(vocab):,}")

class IMDBDataset(Dataset):
    def __init__(self, rows, max_len, vocab, tokenizer):
        self.vocab = vocab
        self.max_len = max_len
        self.tokenizer = tokenizer
        self.pad_idx = vocab["<pad>"]; self.unk_idx = vocab["<unk>"]
        self.data = []
        for _, r in rows.iterrows():
            text = str(r["review"])
            tokens = [t for t in tokenizer(text) if re.search(r"[a-z0-9']", t)]
            tokens = tokens[:max_len]
            if not tokens: tokens = ["<unk>"]
            ids = [vocab.get(t, self.unk_idx) for t in tokens]
            label = 1 if str(r["sentiment"]).lower() == "positive" else 0
            self.data.append({"input_ids": ids, "label": label})
        self.data = bucket_sort_by_length(self.data, bucket_size=50)
    def __len__(self): return len(self.data)
    def __getitem__(self, idx):
        item = self.data[idx]
        return item["input_ids"], item["label"]
    def collate(self, batch):
        seqs, labels = zip(*batch)
        x, lengths = pad_sequences(seqs, pad_idx=self.pad_idx)
        y = torch.tensor(labels, dtype=torch.float32)
        return x, lengths, y

train_ds = IMDBDataset(train_df, MAX_LEN, vocab, basic_english)
val_ds   = IMDBDataset(val_df,   MAX_LEN, vocab, basic_english)
test_ds  = IMDBDataset(test_df,  MAX_LEN, vocab, basic_english)

train_loader = DataLoader(train_ds, batch_size=BATCH_SIZE, shuffle=True,
                          collate_fn=train_ds.collate, num_workers=NUM_WORKERS)
val_loader   = DataLoader(val_ds,   batch_size=BATCH_SIZE, shuffle=False,
                          collate_fn=val_ds.collate, num_workers=NUM_WORKERS)
test_loader  = DataLoader(test_ds,  batch_size=BATCH_SIZE, shuffle=False,
                          collate_fn=test_ds.collate, num_workers=NUM_WORKERS)

# ---------- (Optional) GloVe ----------
def maybe_load_glove_100d():
    slugs = [
        "danielwillgeorge/glove6b100dtxt",
        "parthplc/glove6b100dtxt",
        "anindya2906/glove6b",
    ]
    for slug in slugs:
        try:
            print(f"[info] Trying GloVe via kagglehub: {slug}")
            gpath = kagglehub.dataset_download(slug)
            for name in ["glove.6B.100d.txt", "glove.6b.100d.txt"]:
                txt = Path(gpath) / name
                if txt.exists():
                    stoi, vecs = {}, []
                    with open(txt, "r", encoding="utf-8") as f:
                        for line in f:
                            parts = line.rstrip().split(" ")
                            if len(parts) < 101:  # word + 100 dims
                                continue
                            w = parts[0]
                            vec = np.asarray(parts[1:], dtype=np.float32)
                            if vec.shape[0] != 100: continue
                            stoi[w] = len(stoi)
                            vecs.append(torch.from_numpy(vec))
                    if vecs:
                        print(f"[info] Loaded {len(stoi):,} GloVe tokens.")
                        return stoi, torch.stack(vecs)
        except Exception as e:
            print(f"[warn] GloVe fetch failed for {slug}: {e}")
    print("[warn] Could not load GloVe 6B-100d. Will skip GloVe-based models.")
    return None, None

def build_embedding_matrix(vocab, glove_stoi, glove_vecs, dim=100):
    mat = torch.randn(len(vocab), dim) * 0.05
    mat[vocab["<pad>"]] = torch.zeros(dim)
    if glove_stoi is None or glove_vecs is None:
        return mat
    hit = 0
    for tok, idx in vocab.items():
        gi = glove_stoi.get(tok)
        if gi is not None:
            mat[idx] = glove_vecs[gi]; hit += 1
    print(f"[info] GloVe coverage: {hit}/{len(vocab)} = {100.0*hit/len(vocab):.2f}%")
    return mat

glove_stoi, glove_vecs = maybe_load_glove_100d()
emb_matrix = None
if glove_stoi is not None and glove_vecs is not None:
    emb_matrix = build_embedding_matrix(vocab, glove_stoi, glove_vecs, dim=EMB_DIM)

# ---------- Models ----------
class RNNClassifier(nn.Module):
    def __init__(self, vocab_size, emb_dim, hidden_dim, num_layers, bidirectional,
                 dropout, pad_idx, pretrained_emb=None, trainable_embed=False, cell="rnn"):
        super().__init__()
        self.embedding = nn.Embedding(vocab_size, emb_dim, padding_idx=pad_idx)
        if pretrained_emb is not None:
            with torch.no_grad(): self.embedding.weight.copy_(pretrained_emb)
        self.embedding.weight.requires_grad = trainable_embed
        rnn_drop = dropout if num_layers > 1 else 0.0
        if cell == "rnn":
            self.rnn = nn.RNN(emb_dim, hidden_dim, num_layers=num_layers,
                              batch_first=True, bidirectional=bidirectional, dropout=rnn_drop)
        elif cell == "lstm":
            self.rnn = nn.LSTM(emb_dim, hidden_dim, num_layers=num_layers,
                               batch_first=True, bidirectional=bidirectional, dropout=rnn_drop)
        else:
            raise ValueError("cell must be 'rnn' or 'lstm'")
        self.cell = cell
        self.dropout = nn.Dropout(dropout)
        out_dim = hidden_dim * (2 if bidirectional else 1)
        self.fc = nn.Linear(out_dim, 1)
    def forward(self, x, lengths):
        emb = self.embedding(x)
        packed = nn.utils.rnn.pack_padded_sequence(emb, lengths.cpu(), batch_first=True, enforce_sorted=False)
        if self.cell == "lstm":
            _, (h, _) = self.rnn(packed)
        else:
            _, h = self.rnn(packed)
        if self.rnn.bidirectional:
            h = torch.cat([h[-2], h[-1]], dim=1)
        else:
            h = h[-1]
        h = self.dropout(h)
        return self.fc(h).squeeze(1)

# ---------- Train/Eval ----------
@torch.no_grad()
def evaluate(model, loader, device):
    model.eval()
    ys, ps, losses = [], [], []
    criterion = nn.BCEWithLogitsLoss()
    for x, lengths, y in loader:
        x, lengths, y = x.to(device), lengths.to(device), y.to(device)
        logits = model(x, lengths)
        loss = criterion(logits, y)
        probs = torch.sigmoid(logits).cpu().numpy()
        ys.extend(y.cpu().numpy().tolist()); ps.extend(probs.tolist()); losses.append(loss.item())
    ys = np.array(ys); ps = np.array(ps)
    preds = (ps >= 0.5).astype(int)
    acc = accuracy_score(ys, preds)
    f1  = f1_score(ys, preds)
    try: auroc = roc_auc_score(ys, ps)
    except: auroc = float("nan")
    return float(np.mean(losses)), acc, f1, auroc

def train_one(model, train_loader, val_loader, device, epochs=EPOCHS, lr=LR,
              max_grad_norm=1.0, early_stopping_patience=2):
    optimizer = torch.optim.SGD(filter(lambda p: p.requires_grad, model.parameters()),
                                lr=lr, momentum=0.9, nesterov=True)
    criterion = nn.BCEWithLogitsLoss()
    best_f1, best_state, no_improve = -1.0, None, 0
    for epoch in range(1, epochs+1):
        model.train(); losses=[]
        for x, lengths, y in train_loader:
            x, lengths, y = x.to(device), lengths.to(device), y.to(device)
            optimizer.zero_grad(set_to_none=True)
            logits = model(x, lengths)
            loss = criterion(logits, y)
            loss.backward()
            nn.utils.clip_grad_norm_(model.parameters(), max_grad_norm)
            optimizer.step()
            losses.append(loss.item())
        val_loss, val_acc, val_f1, val_auroc = evaluate(model, val_loader, device)
        print(f"[epoch {epoch:02d}] train_loss={np.mean(losses):.4f} | "
              f"val_loss={val_loss:.4f} acc={val_acc:.4f} f1={val_f1:.4f} auroc={val_auroc:.4f}")
        if val_f1 > best_f1:
            best_f1 = val_f1
            best_state = {k: v.cpu().clone() for k, v in model.state_dict().items()}
            no_improve = 0
        else:
            no_improve += 1
            if no_improve >= early_stopping_patience:
                print("[info] Early stopping."); break
    if best_state is not None: model.load_state_dict(best_state)
    return model

# ---------- Orchestrate 4 runs ----------
configs = []
if emb_matrix is not None:
    configs.append(("RNN + GloVe",  dict(cell="rnn",  pretrained=emb_matrix, trainable=False)))
    configs.append(("LSTM + GloVe", dict(cell="lstm", pretrained=emb_matrix, trainable=False)))
else:
    print("[warn] Skipping GloVe models (vectors unavailable).")
configs.append(("RNN + Trainable",  dict(cell="rnn",  pretrained=None, trainable=True)))
configs.append(("LSTM + Trainable", dict(cell="lstm", pretrained=None, trainable=True)))

results = []
for name, cfg in configs:
    print("\n" + "="*72); print(f"[run] {name}"); print("="*72)
    model = RNNClassifier(
        vocab_size=len(vocab),
        emb_dim=EMB_DIM,
        hidden_dim=HIDDEN_DIM,
        num_layers=LAYERS,
        bidirectional=BIDIRECTIONAL,
        dropout=DROPOUT,
        pad_idx=pad_idx,
        pretrained_emb=cfg["pretrained"],
        trainable_embed=cfg["trainable"],
        cell=cfg["cell"],
    ).to(DEVICE)

    model = train_one(model, train_loader, val_loader, DEVICE, epochs=EPOCHS, lr=LR,
                      max_grad_norm=1.0, early_stopping_patience=2)
    tl, ta, tf1, auc = evaluate(model, test_loader, DEVICE)
    print(f"[test] loss={tl:.4f} acc={ta:.4f} f1={tf1:.4f} auroc={auc:.4f}")

    out_path = SAVE_DIR / (name.replace(" ","_").replace("+","").replace("/","-") + ".pt")
    torch.save({"model_state": model.state_dict(), "vocab": vocab}, out_path)
    print(f"[info] Saved -> {out_path}")
    results.append({"model": name, "test_loss": round(tl,4), "test_acc": round(ta,4),
                    "test_f1": round(tf1,4), "test_auroc": round(auc,4)})

print("\n=== Summary ===")
print(pd.DataFrame(results).to_string(index=False))


  from .autonotebook import tqdm as notebook_tqdm


[info] Downloading IMDB via kagglehub…
[info] kagglehub path: C:\Users\jaypr\.cache\kagglehub\datasets\lakshmi25npathi\imdb-dataset-of-50k-movie-reviews\versions\1
[info] Building vocabulary…
[info] Vocab size: 64,729
[info] Trying GloVe via kagglehub: danielwillgeorge/glove6b100dtxt
[info] Loaded 400,000 GloVe tokens.
[info] GloVe coverage: 52213/64729 = 80.66%

[run] RNN + GloVe
[epoch 01] train_loss=0.6928 | val_loss=0.6890 acc=0.5356 f1=0.4708 auroc=0.5585
[epoch 02] train_loss=0.6864 | val_loss=0.6823 acc=0.5668 f1=0.6314 auroc=0.5916
[test] loss=0.6852 acc=0.5578 f1=0.6243 auroc=0.5826
[info] Saved -> artifacts_pt_cpu\RNN__GloVe.pt

[run] LSTM + GloVe
[epoch 01] train_loss=0.6920 | val_loss=0.6912 acc=0.5476 f1=0.5000 auroc=0.5718
[epoch 02] train_loss=0.6906 | val_loss=0.6900 acc=0.5612 f1=0.5302 auroc=0.5888
[test] loss=0.6908 acc=0.5476 f1=0.5138 auroc=0.5700
[info] Saved -> artifacts_pt_cpu\LSTM__GloVe.pt

[run] RNN + Trainable
[epoch 01] train_loss=0.6974 | val_loss=0.6914 a

In [10]:


# ======== SET YOUR CSV PATH HERE ========
CSV_PATH = r"C:/Users/jaypr/Downloads/date_parser_testcases (1).csv"
# Example:
# CSV_PATH = r"C:\Users\you\Downloads\date_parser_testcases.csv"
# ========================================

import re
import csv
import os
from datetime import datetime

# -----------------------------
# Month dictionary & utilities
# -----------------------------
_MONTHS = {
    "january": 1, "jan": 1,
    "february": 2, "feb": 2,
    "march": 3, "mar": 3,
    "april": 4, "apr": 4,
    "may": 5,
    "june": 6, "jun": 6,
    "july": 7, "jul": 7,
    "august": 8, "aug": 8,
    "september": 9, "sept": 9, "sep": 9,
    "october": 10, "oct": 10,
    "november": 11, "nov": 11,
    "december": 12, "dec": 12,
}
# Build a regex group for months (longer keys first to avoid partial greedy matches)
_MONTH_KEYS_ORDERED = sorted(_MONTHS.keys(), key=len, reverse=True)
_MONTH_RE = r"(?:%s)" % "|".join(map(re.escape, _MONTH_KEYS_ORDERED))

# Common ordinal suffixes
_ORDINAL_SUFFIX_RE = r"(?:st|nd|rd|th)"

# -----------------------------
# Helpers
# -----------------------------
def _to_int(s):
    try:
        return int(s)
    except Exception:
        return None

def _expand_year(y):
    """Expand 2-digit year to 4-digit using a simple rule:
       00–49 → 2000–2049, 50–99 → 1950–1999
    """
    if y is None:
        return None
    if y >= 100:
        return y
    return 2000 + y if y <= 49 else 1900 + y

def _valid_date(y, m, d):
    try:
        datetime(y, m, d)
        return True
    except ValueError:
        return False

def _fmt_ddmmyyyy(y, m, d):
    return f"{d:02d}/{m:02d}/{y:04d}"

def _month_to_num(s):
    if s is None:
        return None
    key = s.strip().lower().rstrip(".")  # allow trailing period in "Sep."
    return _MONTHS.get(key)

def _strip_ordinals(text):
    # Replace "21st", "3rd" → "21", "3"
    return re.sub(rf"\b(\d{{1,2}}){_ORDINAL_SUFFIX_RE}\b", r"\1", text, flags=re.IGNORECASE)

# -----------------------------
# Regex patterns (compiled)
# -----------------------------
FLAGS = re.IGNORECASE

# Textual month patterns
PAT_TXT_DMY = re.compile(
    rf"\b(?P<d>\d{{1,2}})(?:{_ORDINAL_SUFFIX_RE})?(?:\s+of)?[\s,\-\/]*"
    rf"(?P<month>{_MONTH_RE})\.?[\s,\-\/]*,?\s*(?:'|’)?(?P<y>\d{{2,4}})\b", FLAGS
)

PAT_TXT_MDY = re.compile(
    rf"\b(?P<month>{_MONTH_RE})\.?[\s,\-\/]*"
    rf"(?P<d>\d{{1,2}})(?:{_ORDINAL_SUFFIX_RE})?[\s,\-\/]*,?\s*(?:'|’)?(?P<y>\d{{2,4}})\b", FLAGS
)

PAT_TXT_YMD = re.compile(
    rf"\b(?P<y>\d{{4}})[\s,\-\/]+(?P<month>{_MONTH_RE})\.?[\s,\-\/]+"
    rf"(?P<d>\d{{1,2}})(?:{_ORDINAL_SUFFIX_RE})?\b", FLAGS
)

# ISO-like numeric: YYYY-MM-DD / YYYY/MM/DD / YYYY.MM.DD
PAT_ISO_YMD = re.compile(
    r"\b(?P<y>\d{4})[./-](?P<m>\d{1,2})[./-](?P<d>\d{1,2})\b"
)

# Numeric D/M/Y with separators ./- or spaces
PAT_NUM_DMY = re.compile(
    r"\b(?P<d>\d{1,2})[.\-\/ ](?P<m>\d{1,2})[.\-\/ ](?P<y>\d{2,4})\b"
)

# Numeric M/D/Y with separators ./- or spaces
PAT_NUM_MDY = re.compile(
    r"\b(?P<m>\d{1,2})[.\-\/ ](?P<d>\d{1,2})[.\-\/ ](?P<y>\d{2,4})\b"
)

# Compact textual like "21Jun2024" or "Jun21'24"
PAT_TXT_COMPACT_DMY = re.compile(
    rf"\b(?P<d>\d{{1,2}})(?P<month>{_MONTH_RE})\.?(?:'|’)?(?P<y>\d{{2,4}})\b", FLAGS
)
PAT_TXT_COMPACT_MDY = re.compile(
    rf"\b(?P<month>{_MONTH_RE})\.?(?P<d>\d{{1,2}})(?:'|’)?(?P<y>\d{{2,4}})\b", FLAGS
)

# -----------------------------
# Core parsing logic
# -----------------------------
def _try_build_date_from_parts(y_str=None, m_str=None, d_str=None, month_name=None):
    """Return (dd/mm/yyyy) or None."""
    # month name → number
    if month_name:
        m = _month_to_num(month_name)
    else:
        m = _to_int(m_str)

    d = _to_int(d_str)
    y = _to_int(y_str)

    if y is None or m is None or d is None:
        return None

    if y < 100:
        y = _expand_year(y)

    if not (1 <= m <= 12 and 1 <= d <= 31 and 1900 <= y <= 2100):
        return None

    if not _valid_date(y, m, d):
        return None

    return _fmt_ddmmyyyy(y, m, d)

def parse_date(text):
    """
    Extract the first valid date found in `text` and return "DD/MM/YYYY".
    Returns None if no date can be parsed.
    """
    if not text or not isinstance(text, str):
        return None

    # Pre-clean ordinals like 21st → 21 (helps numeric patterns)
    cleaned = _strip_ordinals(text)

    candidates = []

    def add_candidates(pattern, kind):
        for m in pattern.finditer(cleaned):
            start = m.start()
            gd = m.groupdict()
            # unify keys to y, m, d
            if "month" in gd:
                month_name = gd.get("month")
                d = gd.get("d")
                y = gd.get("y")
                s = _try_build_date_from_parts(y_str=y, d_str=d, month_name=month_name)
            else:
                y = gd.get("y"); m_ = gd.get("m"); d = gd.get("d")
                s = _try_build_date_from_parts(y_str=y, m_str=m_, d_str=d)
            if s:
                candidates.append((start, kind, s))

    # Prefer textual dates first (less ambiguous), then ISO, then numeric
    add_candidates(PAT_TXT_DMY, "txt_dmy")
    add_candidates(PAT_TXT_MDY, "txt_mdy")
    add_candidates(PAT_TXT_YMD, "txt_ymd")
    add_candidates(PAT_TXT_COMPACT_DMY, "txt_compact_dmy")
    add_candidates(PAT_TXT_COMPACT_MDY, "txt_compact_mdy")
    add_candidates(PAT_ISO_YMD, "iso_ymd")
    add_candidates(PAT_NUM_DMY, "num_dmy")
    add_candidates(PAT_NUM_MDY, "num_mdy")

    if not candidates:
        return None

    # If multiple, choose the earliest occurrence in text.
    candidates.sort(key=lambda x: x[0])
    earliest_start = candidates[0][0]
    tied = [c for c in candidates if c[0] == earliest_start]

    if len(tied) == 1:
        return tied[0][2]

    # Preference within ties: textual first, then iso, then numeric; within numeric prefer DMY.
    kind_priority = {
        "txt_dmy": 1, "txt_mdy": 1, "txt_ymd": 1, "txt_compact_dmy": 1, "txt_compact_mdy": 1,
        "iso_ymd": 2,
        "num_dmy": 3, "num_mdy": 4
    }
    tied.sort(key=lambda x: kind_priority.get(x[1], 9))
    return tied[0][2]

# -----------------------------
# CSV I/O
# -----------------------------
def read_csv_rows(path):
    """
    Yield rows from a local CSV path as dicts (if header) or lists (no header).
    """
    if not os.path.exists(path):
        raise FileNotFoundError(f"CSV not found: {path}")

    with open(path, "r", encoding="utf-8-sig", newline="") as f:
        sample = f.read(2048)
        f.seek(0)
        has_header = False
        try:
            has_header = csv.Sniffer().has_header(sample)
        except Exception:
            pass

        if has_header:
            reader = csv.DictReader(f)
            for row in reader:
                yield row
        else:
            reader = csv.reader(f)
            for row in reader:
                yield row

def pick_text_from_row(row):
    """
    Try to get the text field from a CSV row (dict or list).
    Looks for common column names; falls back to first cell.
    """
    if isinstance(row, dict):
        for key in row.keys():
            if key.lower() in {"text", "sentence", "input", "review", "utterance", "query", "content"}:
                return row[key]
        # Otherwise, join all values into one string
        return " ".join(str(v) for v in row.values())
    elif isinstance(row, (list, tuple)):
        return row[0] if row else ""
    else:
        return str(row)

def process_csv(input_path, output_path="parsed_dates_output.csv"):
    rows = list(read_csv_rows(input_path))
    out_rows = []
    for row in rows:
        text = pick_text_from_row(row)
        parsed = parse_date(text)
        out_rows.append({"text": text, "parsed_date": parsed if parsed else ""})

    with open(output_path, "w", encoding="utf-8", newline="") as f:
        writer = csv.DictWriter(f, fieldnames=["text", "parsed_date"])
        writer.writeheader()
        writer.writerows(out_rows)

    print(f"[done] Read {len(rows)} rows from: {input_path}")
    print(f"[done] Wrote results to: {output_path}")

# -----------------------------
# Run (reads your CSV_PATH)
# -----------------------------
if __name__ == "__main__":
    if not CSV_PATH or CSV_PATH.strip() == "/path/to/your/date_parser_testcases.csv":
        print("Please set CSV_PATH at the top of the script to your CSV file path.")
    else:
        process_csv(CSV_PATH)


[done] Read 100 rows from: C:/Users/jaypr/Downloads/date_parser_testcases (1).csv
[done] Wrote results to: parsed_dates_output.csv


In [11]:
# --- single-text inference examples ---
print(parse_date("I went to London on 21st June, 2024"))     # -> 21/06/2024
print(parse_date("Meet me June 7, 2023"))                    # -> 07/06/2023
print(parse_date("Event: 2024-06-21"))                       # -> 21/06/2024
print(parse_date("She wrote 06/21/2024"))                    # -> 21/06/2024 (prefers D/M/Y on ambiguous)


21/06/2024
07/06/2023
21/06/2024
21/06/2024


In [1]:
#!/usr/bin/env python3
"""
Rules-based Gendered Pronoun Transformer (pure Python + regex)

Goal:
  Swap gendered pronouns in a sentence from one gender to the opposite while
  preserving grammatical correctness and meaning as much as possible using rules.

Examples:
  Input : "He gave her his book."
  Mode  : "swap"
  Output: "She gave him her book."

What it handles:
  • Subject/Object pronouns: he ↔ she, him ↔ her
  • Possessives (determiner vs pronoun): his (det) ↔ her (det), his (pronoun) ↔ hers, her (det) ↔ his, hers ↔ his
  • Reflexives: himself ↔ herself
  • Common contractions: he's ↔ she's, he'll ↔ she'll, he'd ↔ she'd (both straight and curly apostrophes)
  • Case preservation: "He" → "She"; "HE" → "SHE"
  • Basic disambiguation for "his" / "her" as determiner vs pronoun:
      - If next meaningful word is noun-like → treat as determiner (e.g., "his book" → "her book")
      - If at end / before punctuation or a function word → treat as pronoun (e.g., "The book is his." → "The book is hers.")
      - If "her" is followed by preposition/conjunction/etc. → treat as object pronoun ("I told her that..." → "...him that...")

Modes:
  - "swap" (default): swap both male↔female forms throughout (this matches the example).
  - "to_female": change male→female only, leave female as-is.
  - "to_male"  : change female→male only, leave male as-is.

CSV (optional):
  - If you set CSV_PATH to a file (with a column named "text", or else the first column is used),
    the script will transform each row and write "pronoun_transform_output.csv".
"""

import re
import csv
import os
from typing import List, Tuple

# ===================== USER CONFIG (optional) =====================
CSV_PATH = "C:/Users/jaypr/Downloads/pronoun_testcases (1).csv"  # e.g., r"C:\Users\you\Downloads\pronoun_testcases.csv"
# If left as None, the script won't try to read/write CSV.
# If you uploaded to /mnt/data, you can set:
# CSV_PATH = r"/mnt/data/pronoun_testcases (1).csv"
MODE = "swap"    # "swap" (default), "to_female", or "to_male"
# =================================================================

# --- Small word lists used for "his/her" disambiguation ---
_PREPOSITIONS = {
    "at","on","in","by","for","with","about","against","between","into","through",
    "during","before","after","above","below","to","from","up","down","of","off",
    "over","under","again","further","then","once","as","per","via","within","without","onto","upon"
}
_AUX_OR_FUNCTION = {
    # Auxiliaries / copulas
    "is","am","are","was","were","be","being","been",
    "has","have","had","do","does","did","can","could","will","would","shall","should","may","might","must",
    # Conjunctions / determiners / wh-words / misc function words
    "that","this","these","those","who","whom","whose","which","what","when","where","why","how",
    "and","or","but","nor","so","yet","if","than","then","because","although","though","while","whereas"
}
# Words that often appear after possessive determiners (still treat as determiner)
_DET_FOLLOWERS = {"own","same","entire","whole","only","former","latter"}

# Regex tokenization: preserve words (incl. contractions) vs whitespace vs punctuation
# - words: letters + optional apostrophe + letters, e.g., he's, she’s
# - numbers allowed but ignored for pronoun logic
_TOKEN_RE = re.compile(r"\s+|[A-Za-z]+(?:[’'][A-Za-z]+)?|[0-9]+|[^\s]")

def _is_word(tok: str) -> bool:
    return bool(re.fullmatch(r"[A-Za-z]+(?:[’'][A-Za-z]+)?", tok))

def _lower_ascii(s: str) -> str:
    # Normalize to lowercase and normalize curly apostrophes to straight for matching
    return s.replace("’", "'").lower()

def _preserve_case(src: str, repl: str) -> str:
    # Preserve all-caps, Titlecase, or lowercase style of the source token
    if src.isupper():
        return repl.upper()
    if src[:1].isupper() and src[1:].islower():
        return repl.capitalize()
    return repl

def _next_meaningful_word(tokens: List[str], i: int) -> Tuple[int, str]:
    """
    Return (index, next_word_lower) for the next alphabetic token after index i,
    skipping whitespace and punctuation. Returns (-1, "") if none found.
    """
    j = i + 1
    while j < len(tokens):
        t = tokens[j]
        if _is_word(t):
            return j, _lower_ascii(t)
        # skip everything else (spaces, punctuation, numbers)
        j += 1
    return -1, ""

def _looks_like_determiner(tokens: List[str], i: int) -> bool:
    """
    Heuristic: treat 'his'/'her' as a possessive determiner if a plausible noun-like word follows.
    If next meaningful word is:
      - in prepositions / auxiliaries / function words -> NOT determiner (so treat as pronoun/object)
      - in DET_FOLLOWERS (e.g., 'own') -> determiner
      - ends with 'ing' (gerund often acts as noun) -> determiner
      - otherwise: determiner
    """
    j, nxt = _next_meaningful_word(tokens, i)
    if j == -1 or not nxt:
        return False  # end of sentence → likely pronoun (e.g., "The book is his.")
    if nxt in _PREPOSITIONS or nxt in _AUX_OR_FUNCTION:
        return False
    if nxt in _DET_FOLLOWERS:
        return True
    if nxt.endswith("ing"):  # often a noun/gerundial usage ("his cooking", "her singing")
        return True
    # fallback: assume it modifies a noun
    return True

def transform_pronouns(text: str, mode: str = "swap") -> str:
    """
    Transform gendered pronouns according to `mode`:
      - "swap":      male <-> female everywhere (default; matches the example)
      - "to_female": only male -> female
      - "to_male":   only female -> male
    """
    if not text:
        return text

    tokens = _TOKEN_RE.findall(text)
    out = []

    for i, tok in enumerate(tokens):
        if not _is_word(tok):
            out.append(tok)
            continue

        base = _lower_ascii(tok)

        # --- Reflexives ---
        if base == "himself":
            repl = "herself"
            if mode == "to_male": repl = None
        elif base == "herself":
            repl = "himself"
            if mode == "to_female": repl = None

        # --- Contractions ---
        elif base in {"he's","he’ll","he'll","he’d","he'd"}:
            key = base.replace("’", "'")
            c_map = {"he's": "she's", "he'll": "she'll", "he'd": "she'd"}
            repl = c_map.get(key)
            if mode == "to_male": repl = None
        elif base in {"she's","she’ll","she'll","she’d","she'd"}:
            key = base.replace("’", "'")
            c_map = {"she's": "he's", "she'll": "he'll", "she'd": "he'd"}
            repl = c_map.get(key)
            if mode == "to_female": repl = None

        # --- Simple subject/object pronouns ---
        elif base == "he":
            repl = "she"
            if mode == "to_male": repl = None
        elif base == "she":
            repl = "he"
            if mode == "to_female": repl = None
        elif base == "him":
            repl = "her"
            if mode == "to_male": repl = None
        elif base == "her":
            # 'her' can be object pronoun or possessive determiner
            if _looks_like_determiner(tokens, i):
                # determiner 'her' → 'his'
                repl = "his"
                if mode == "to_female": repl = None  # leave as-is if only converting male→female
            else:
                # object pronoun 'her' → 'him'
                repl = "him"
                if mode == "to_female": repl = None

        # --- Possessives ---
        elif base == "his":
            # 'his' can be possessive determiner or possessive pronoun
            if _looks_like_determiner(tokens, i):
                # determiner 'his' → 'her'
                repl = "her"
            else:
                # pronoun 'his' → 'hers'
                repl = "hers"
            if mode == "to_male": repl = None
        elif base == "hers":
            repl = "his"
            if mode == "to_female": repl = None

        else:
            repl = None

        if repl is None:
            out.append(tok)
        else:
            out.append(_preserve_case(tok, repl))

    return "".join(out)

# ---------------- CSV Helpers (optional) ----------------
def _read_csv_rows(path: str):
    if not os.path.exists(path):
        raise FileNotFoundError(f"CSV not found: {path}")
    with open(path, "r", encoding="utf-8-sig", newline="") as f:
        sample = f.read(2048)
        f.seek(0)
        has_header = False
        try:
            has_header = csv.Sniffer().has_header(sample)
        except Exception:
            pass
        if has_header:
            reader = csv.DictReader(f)
            rows = list(reader)
            # Guess column
            text_col = None
            if rows:
                keys = [k for k in rows[0].keys()]
                # prefer a 'text' column name if present
                for k in keys:
                    if k.lower() in {"text","sentence","input","utterance"}:
                        text_col = k
                        break
                if text_col is None:
                    text_col = keys[0]
            return [r.get(text_col, "") for r in rows]
        else:
            return [row[0] if row else "" for row in csv.reader(f)]

def _write_csv_rows(path: str, rows: List[dict], fieldnames: List[str]):
    with open(path, "w", encoding="utf-8", newline="") as f:
        w = csv.DictWriter(f, fieldnames=fieldnames)
        w.writeheader()
        w.writerows(rows)

# ---------------- Demo / Main ----------------
if __name__ == "__main__":
    # Quick sanity checks
    examples = [
        ('He gave her his book.', "swap"),
        ("SHE SAID HE'LL HELP HER.", "swap"),
        ("The book is his.", "swap"),
        ("I told her that it was important.", "swap"),
        ("He’s sure she’d finish before him.", "swap"),
        ("Her idea and her OWN plan were approved.", "swap"),
        ("He hurt himself; she blamed herself.", "swap"),
        ("She will see him on Friday.", "swap"),
        ("He will see her in LA.", "swap"),
    ]
    for s, m in examples:
        print(f"{s}  ->  {transform_pronouns(s, m)}")

    # Optional CSV processing
    if CSV_PATH:
        print(f"\n[info] Reading CSV: {CSV_PATH}")
        texts = _read_csv_rows(CSV_PATH)
        results = []
        for t in texts:
            results.append({"text": t, "transformed": transform_pronouns(t, MODE)})
        out_path = "pronoun_transform_output.csv"
        _write_csv_rows(out_path, results, fieldnames=["text","transformed"])
        print(f"[done] Wrote {len(results)} rows to {out_path}")


He gave her his book.  ->  She gave his her book.
SHE SAID HE'LL HELP HER.  ->  HE SAID SHE'LL HELP HIM.
The book is his.  ->  The book is hers.
I told her that it was important.  ->  I told him that it was important.
He’s sure she’d finish before him.  ->  She's sure he'd finish before her.
Her idea and her OWN plan were approved.  ->  His idea and his OWN plan were approved.
He hurt himself; she blamed herself.  ->  She hurt herself; he blamed himself.
She will see him on Friday.  ->  He will see her on Friday.
He will see her in LA.  ->  She will see him in LA.

[info] Reading CSV: C:/Users/jaypr/Downloads/pronoun_testcases (1).csv
[done] Wrote 27 rows to pronoun_transform_output.csv
