# Sensitive Info Detector — Clean Colab Notebook
Run cells from top to bottom. You’ll get:
1) Synthetic data with span labels (EMAIL, SECRET)
2) Rules baseline (v1) → stricter rules (v2)
3) DistilBERT span tagger
4) Hybrid (rules v2 ∪ model) + metrics


In [22]:
!pip -q install "transformers==4.44.2" "datasets>=2.20.0" "accelerate>=0.34.2" -U
import transformers, datasets, accelerate, torch
print("transformers:", transformers.__version__, "torch:", torch.__version__)

transformers: 4.44.2 torch: 2.8.0+cu126


In [23]:
!mkdir -p /content/sensi_starter
%cd /content/sensi_starter

/content/sensi_starter


In [24]:
%%writefile data_synth.py
import json, random, argparse, os
from pathlib import Path
RANDOM_SEED = 42
random.seed(RANDOM_SEED)
ZWSP = "\u200b"
HOMO = str.maketrans({"a":"а","e":"е","o":"о","c":"с"})
EMAIL_USERS = ["alice","bob","charlie","dora","eve","frank","grace","heidi","ivan","judy"]
EMAIL_DOMAINS = ["corp.local","internal.corp","example.com","myco.io","product.dev"]
SECRET_PREFIXES = ["ghp_", "AKIA", "xoxb-", "xoxa-", "slk-"]
ALNUM = "ABCDEFGHIJKLMNOPQRSTUVWXYZabcdefghijklmnopqrstuvwxyz0123456789"
SENSITIVE_TEMPLATES = [
  "Please email me at {email} and use password {password}.",
  "Set the env var API_KEY={token} before running the job.",
  "Credentials: user alice, pwd {password}.",
  "Rotate the key {token} immediately.",
  "Contact {email} for access; temporary token: {token}",
  "YAML:\napi_key: {token}\nuser: service\n",
  "JSON: {\"password\": \"{password}\", \"email\": \"{email}\"}",
  ".env line: SECRET={token}",
]
SAFE_TEMPLATES = [
  "Please review the sprint notes and update Jira tickets.",
  "Reset your password via the self-service portal; do not share it.",
  "Email support at the official page (no direct addresses allowed).",
  "The weather is nice; deploy after the freeze window ends.",
  "Docs show examples like user@example.com and password=hunter2, but they are placeholders.",
  "Config format uses fields `api_key` and `password`, do not paste real values.",
  "We discuss security but avoid sharing secrets in chat.",
]
def rand_email():
  u = random.choice(EMAIL_USERS) + str(random.randint(1,99))
  d = random.choice(EMAIL_DOMAINS)
  return f"{u}@{d}"
def rand_password():
  return random.choice(["Password123!","p@ssw0rd!","admin123!!","Tr0ub4dor&3","LetMeIn99!"])
def rand_token():
  pfx = random.choice(SECRET_PREFIXES)
  body = "".join(random.choice(ALNUM) for _ in range(random.randint(24,36)))
  return pfx + body
def maybe_perturb(text):
  if random.random() > 0.25: return text
  f = random.choice([
    lambda x: x.replace("password", "pa ss word"),
    lambda x: x.replace("password", f"pa{ZWSP}ss{ZWSP}word"),
    lambda x: x.translate(HOMO),
    lambda x: x.replace("@", " [at] "),
  ])
  return f(text)
def insert_and_track(tmpl, **fields):
  text = tmpl; spans=[]
  for k,v in fields.items():
    ph = "{"+k+"}"
    if ph not in text: continue
    i = text.index(ph)
    text = text.replace(ph, v, 1)
    t = "EMAIL" if k=="email" else ("SECRET" if k in ("password","token","apikey","api_key","cc") else "OTHER")
    if t!="OTHER": spans.append((t, i, i+len(v)))
  spans.sort(key=lambda x:x[1])
  return text, spans
def make_dataset(n_pos=600, n_neg=600):
  rows=[]
  for _ in range(n_pos):
    t = random.choice(SENSITIVE_TEMPLATES)
    text, spans = insert_and_track(t, email=rand_email(), password=rand_password(), token=rand_token())
    text = maybe_perturb(text)
    spans = [s for s in spans if s[0] in ("EMAIL","SECRET")]
    rows.append({"text":text, "label":1, "spans":spans})
  for _ in range(n_neg):
    t = random.choice(SAFE_TEMPLATES)
    rows.append({"text":maybe_perturb(t), "label":0, "spans":[]})
  random.shuffle(rows); return rows
def split(rows, train=0.7, dev=0.15):
  n=len(rows); n_train=int(n*train); n_dev=int(n*dev)
  return rows[:n_train], rows[n_train:n_train+n_dev], rows[n_train+n_dev:]
def save_jsonl(path, rows):
  os.makedirs(os.path.dirname(path), exist_ok=True)
  with open(path, "w", encoding="utf-8") as f:
    for r in rows: json.dump(r, f, ensure_ascii=False); f.write("\n")
def main():
  ap = argparse.ArgumentParser()
  ap.add_argument("--out_dir", type=str, default="data")
  ap.add_argument("--n_pos", type=int, default=600)
  ap.add_argument("--n_neg", type=int, default=600)
  args = ap.parse_args()
  rows = make_dataset(args.n_pos, args.n_neg)
  tr, dv, te = split(rows)
  out = Path(args.out_dir); out.mkdir(parents=True, exist_ok=True)
  save_jsonl(out/"train.jsonl", tr)
  save_jsonl(out/"dev.jsonl", dv)
  save_jsonl(out/"test_balanced.jsonl", te)
  rows_skew = make_dataset(n_pos=50, n_neg=950)
  save_jsonl(out/"test_skewed.jsonl", rows_skew)
  print("Wrote:", out)
if __name__=="__main__":
  main()


Overwriting data_synth.py


In [25]:
%%writefile span_metrics.py
import json, argparse
def iou(a,b):
  s1,e1=a; s2,e2=b
  inter=max(0, min(e1,e2)-max(s1,s2))
  uni=max(e1,e2)-min(s1,s2)
  return inter/uni if uni>0 else 0.0
def match_spans(ts, ps, thr=0.5):
  mt=set(); mp=set(); tp=0
  for i,(tl,ts1,te1) in enumerate(ts):
    for j,(pl,ps1,pe1) in enumerate(ps):
      if j in mp or tl!=pl: continue
      if iou((ts1,te1),(ps1,pe1))>=thr:
        tp+=1; mt.add(i); mp.add(j); break
  fp=len(ps)-len(mp); fn=len(ts)-len(mt); return tp,fp,fn
def prf(tp,fp,fn):
  p=tp/(tp+fp) if tp+fp>0 else 0.0
  r=tp/(tp+fn) if tp+fn>0 else 0.0
  f=2*p*r/(p+r) if p+r>0 else 0.0
  return p,r,f
def evaluate(gold, preds):
  import json
  G=[json.loads(l) for l in open(gold,encoding="utf-8")]
  P=[json.loads(l) for l in open(preds,encoding="utf-8")]
  labels=["EMAIL","SECRET"]
  tot={lbl:{"tp":0,"fp":0,"fn":0} for lbl in labels}
  all={"tp":0,"fp":0,"fn":0}
  for g,p in zip(G,P):
    gs=[tuple(s) for s in g["spans"] if s[0] in labels]
    ps=[tuple(s) for s in p.get("spans",[]) if s[0] in labels]
    for lbl in labels:
      g1=[s for s in gs if s[0]==lbl]; p1=[s for s in ps if s[0]==lbl]
      tp,fp,fn=match_spans(g1,p1)
      tot[lbl]["tp"]+=tp; tot[lbl]["fp"]+=fp; tot[lbl]["fn"]+=fn
    tp,fp,fn=match_spans(gs,ps)
    all["tp"]+=tp; all["fp"]+=fp; all["fn"]+=fn
  def fmt(d):
    p,r,f=prf(d["tp"],d["fp"],d["fn"]); return {**d,"precision":p,"recall":r,"f1":f}
  out={"per_label":{k:fmt(v) for k,v in tot.items()}, "overall":fmt(all)}
  print(json.dumps(out, indent=2))
if __name__=='__main__':
  ap=argparse.ArgumentParser(); ap.add_argument('--gold'); ap.add_argument('--preds'); a=ap.parse_args(); evaluate(a.gold,a.preds)


Overwriting span_metrics.py


In [26]:
!python data_synth.py --out_dir data --n_pos 600 --n_neg 600
!wc -l data/*.jsonl
!head -n 2 data/train.jsonl

Wrote: data
   180 data/dev.jsonl
   180 data/test_balanced.jsonl
  1000 data/test_skewed.jsonl
   840 data/train.jsonl
  2200 total
{"text": ".env line: SECRET=slk-DOHpCLrg3clUCjtOD8tQQBr222WitCmPWG", "label": 1, "spans": [["SECRET", 18, 56]]}
{"text": "Please email me at eve42@product.dev and use password LetMeIn99!.", "label": 1, "spans": [["EMAIL", 19, 36], ["SECRET", 54, 64]]}


In [27]:
%%writefile regex_baseline_v1.py
import re, json, argparse, math

EMAIL_RE = re.compile(r"[A-Za-z0-9._%+\-]+@[A-Za-z0-9.\-]+\.[A-Za-z]{2,}")
TOK_RE   = re.compile(r"[A-Za-z0-9_\-]{16,}")
PREFIXES = ("ghp_", "AKIA", "xoxb-", "xoxa-", "slk-")
CTX      = {"password","passwd","pwd","secret","token","apikey","api_key"}

# only flag personal emails
PERSONAL_DOMAINS = {
    "gmail.com","hotmail.com","outlook.com","yahoo.com",
    "icloud.com","proton.me","protonmail.com","aol.com",
    "live.com","gmx.com","mail.com","zoho.com","yandex.com","pm.me"
}

def entropy(s):
    if not s: return 0.0
    counts = {}
    for ch in s: counts[ch] = counts.get(ch, 0) + 1
    n = len(s)
    return -sum((v/n) * math.log2(v/n) for v in counts.values())

def detect(text):
    spans = []
    # EMAIL (personal only)
    for m in EMAIL_RE.finditer(text):
        addr = text[m.start():m.end()]
        dom  = addr.split("@", 1)[-1].lower()
        if (dom in PERSONAL_DOMAINS) or any(dom.endswith(pd) for pd in PERSONAL_DOMAINS):
            spans.append(("EMAIL", m.start(), m.end()))
    # SECRET/TOKEN
    for m in TOK_RE.finditer(text):
        tok  = m.group(0)
        left = text[max(0, m.start()-40):m.start()].lower()
        if tok.startswith(PREFIXES) or (entropy(tok) > 3.5 and any(k in left for k in CTX)):
            start, end = m.start(), m.end()
            while end > start and text[end-1] in ";:,.)]}\"'": end -= 1
            spans.append(("SECRET", start, end))
    return spans

def run(gold, out):
    preds = []
    with open(gold, encoding='utf-8') as f:
        for line in f:
            ex = json.loads(line)
            preds.append({"spans": detect(ex["text"])})
    with open(out, 'w', encoding='utf-8') as f:
        for p in preds:
            json.dump(p, f); f.write('\n')

if __name__ == "__main__":
    ap = argparse.ArgumentParser()
    ap.add_argument("--gold", required=True)
    ap.add_argument("--out",  required=True)
    a = ap.parse_args()
    run(a.gold, a.out)


Overwriting regex_baseline_v1.py


In [28]:
!python regex_baseline_v1.py --gold data/test_balanced.jsonl --out preds_rules_bal_v1.jsonl
!python span_metrics.py --gold data/test_balanced.jsonl --preds preds_rules_bal_v1.jsonl
!python regex_baseline_v1.py --gold data/test_skewed.jsonl --out preds_rules_sk_v1.jsonl
!python span_metrics.py --gold data/test_skewed.jsonl --preds preds_rules_sk_v1.jsonl

{
  "per_label": {
    "EMAIL": {
      "tp": 0,
      "fp": 0,
      "fn": 39,
      "precision": 0.0,
      "recall": 0.0,
      "f1": 0.0
    },
    "SECRET": {
      "tp": 67,
      "fp": 0,
      "fn": 36,
      "precision": 1.0,
      "recall": 0.6504854368932039,
      "f1": 0.788235294117647
    }
  },
  "overall": {
    "tp": 67,
    "fp": 0,
    "fn": 75,
    "precision": 1.0,
    "recall": 0.47183098591549294,
    "f1": 0.6411483253588516
  }
}
{
  "per_label": {
    "EMAIL": {
      "tp": 0,
      "fp": 0,
      "fn": 16,
      "precision": 0.0,
      "recall": 0.0,
      "f1": 0.0
    },
    "SECRET": {
      "tp": 33,
      "fp": 1,
      "fn": 17,
      "precision": 0.9705882352941176,
      "recall": 0.66,
      "f1": 0.7857142857142857
    }
  },
  "overall": {
    "tp": 33,
    "fp": 1,
    "fn": 33,
    "precision": 0.9705882352941176,
    "recall": 0.5,
    "f1": 0.6599999999999999
  }
}


In [29]:
import re, json, math, unicodedata
EMAIL_RE = re.compile(r"[A-Za-z0-9._%+\-]+@[A-Za-z0-9.\-]+\.[A-Za-z]{2,}")
TOK_RE   = re.compile(r"[A-Za-z0-9_\-]{20,}")
PREFIXES = ("ghp_", "AKIA", "xoxb-", "xoxa-", "slk-")
CTX      = {"password","passwd","pwd","secret","token","apikey","api_key"}
ALLOW_DOMAINS = {"example.com","corp.local","internal.corp"}
def normalize(s):
  s = unicodedata.normalize("NFKC", s)
  return s.replace("\u200b","")
def entropy(s):
  from collections import Counter
  if not s: return 0.0
  c=Counter(s); n=len(s)
  return -sum((v/n)*math.log2(v/n) for v in c.values())
def detect_rules(text):
  t=normalize(text); spans=[]
  for m in EMAIL_RE.finditer(t):
    dom = t[m.start():m.end()].split("@")[-1].lower()
    if any(dom.endswith(ad) for ad in ALLOW_DOMAINS):
      continue
    spans.append(("EMAIL", m.start(), m.end()))
  for m in TOK_RE.finditer(t):
    tok=m.group(0); left=t[max(0,m.start()-40):m.start()].lower(); ent=entropy(tok)
    ok_prefix = tok.startswith(PREFIXES)
    ok_entropy = ent > 3.8
    ok_context = any(k in left for k in CTX)
    if ok_prefix or (ok_entropy and ok_context):
      start,end=m.start(),m.end()
      while end>start and t[end-1] in ",.;:)`'\"": end-=1
      spans.append(("SECRET", start, end))
  return spans
def run_rules(gold,out):
  preds=[]
  for line in open(gold,encoding='utf-8'):
    ex=json.loads(line); preds.append({"spans":[list(s) for s in detect_rules(ex['text'])]})
  with open(out,'w',encoding='utf-8') as f:
    for p in preds: json.dump(p,f); f.write('\n')
run_rules('data/test_balanced.jsonl','preds_rules_bal_v2.jsonl')
!python span_metrics.py --gold data/test_balanced.jsonl --preds preds_rules_bal_v2.jsonl
run_rules('data/test_skewed.jsonl','preds_rules_sk_v2.jsonl')
!python span_metrics.py --gold data/test_skewed.jsonl --preds preds_rules_sk_v2.jsonl

{
  "per_label": {
    "EMAIL": {
      "tp": 14,
      "fp": 0,
      "fn": 25,
      "precision": 1.0,
      "recall": 0.358974358974359,
      "f1": 0.5283018867924528
    },
    "SECRET": {
      "tp": 66,
      "fp": 0,
      "fn": 37,
      "precision": 1.0,
      "recall": 0.6407766990291263,
      "f1": 0.7810650887573964
    }
  },
  "overall": {
    "tp": 80,
    "fp": 0,
    "fn": 62,
    "precision": 1.0,
    "recall": 0.5633802816901409,
    "f1": 0.7207207207207207
  }
}
{
  "per_label": {
    "EMAIL": {
      "tp": 1,
      "fp": 0,
      "fn": 15,
      "precision": 1.0,
      "recall": 0.0625,
      "f1": 0.11764705882352941
    },
    "SECRET": {
      "tp": 33,
      "fp": 0,
      "fn": 17,
      "precision": 1.0,
      "recall": 0.66,
      "f1": 0.7951807228915663
    }
  },
  "overall": {
    "tp": 34,
    "fp": 0,
    "fn": 32,
    "precision": 1.0,
    "recall": 0.5151515151515151,
    "f1": 0.6799999999999999
  }
}


In [30]:
import json, torch, random
from datasets import Dataset
from transformers import AutoTokenizer, AutoModelForTokenClassification, DataCollatorForTokenClassification, Trainer, TrainingArguments
random.seed(42)
device = 'cuda' if torch.cuda.is_available() else 'cpu'
label2id={'O':0,'B-EMAIL':1,'B-SECRET':2}
id2label={v:k for k,v in label2id.items()}
def load_jsonl(p): return [json.loads(l) for l in open(p,encoding='utf-8')]
train=load_jsonl('data/train.jsonl'); dev=load_jsonl('data/dev.jsonl')
tok=AutoTokenizer.from_pretrained('distilbert-base-uncased')
def char_spans_to_token_labels(text, spans, max_len=256):
  enc=tok(text, return_offsets_mapping=True, truncation=True, max_length=max_len)
  labels=[0]*len(enc['input_ids'])
  for lbl,s,e in spans:
    tid=label2id.get('B-'+lbl,0)
    for i,(cs,ce) in enumerate(enc['offset_mapping']):
      if cs==ce: continue
      if max(cs,s) < min(ce,e): labels[i]=tid
  enc.pop('offset_mapping'); enc['labels']=labels; return enc
def ds(rows): return Dataset.from_list([char_spans_to_token_labels(r['text'],r['spans']) for r in rows])
train_ds=ds(train); dev_ds=ds(dev)
model=AutoModelForTokenClassification.from_pretrained('distilbert-base-uncased', num_labels=3, id2label=id2label, label2id=label2id).to(device)
coll=DataCollatorForTokenClassification(tokenizer=tok)
args=TrainingArguments(output_dir='runs/sensi-span', learning_rate=2e-5, per_device_train_batch_size=16,
                       per_device_eval_batch_size=32, num_train_epochs=4, eval_strategy='epoch',
                       logging_steps=50, save_strategy='epoch', report_to='none', load_best_model_at_end=True)
trainer=Trainer(model=model, args=args, train_dataset=train_ds, eval_dataset=dev_ds, tokenizer=tok, data_collator=coll)
trainer.train()
model.save_pretrained('model_distilbert'); tok.save_pretrained('model_distilbert')
print('Model saved.')

tokenizer_config.json:   0%|          | 0.00/48.0 [00:00<?, ?B/s]

config.json:   0%|          | 0.00/483 [00:00<?, ?B/s]

vocab.txt:   0%|          | 0.00/232k [00:00<?, ?B/s]

tokenizer.json:   0%|          | 0.00/466k [00:00<?, ?B/s]



model.safetensors:   0%|          | 0.00/268M [00:00<?, ?B/s]

Some weights of DistilBertForTokenClassification were not initialized from the model checkpoint at distilbert-base-uncased and are newly initialized: ['classifier.bias', 'classifier.weight']
You should probably TRAIN this model on a down-stream task to be able to use it for predictions and inference.


Epoch,Training Loss,Validation Loss
1,0.3284,0.040717
2,0.0417,0.025199
3,0.0307,0.021382
4,0.0237,0.020341




Model saved.


In [31]:
from transformers import AutoTokenizer, AutoModelForTokenClassification
import numpy as np, json, torch
tok=AutoTokenizer.from_pretrained('model_distilbert')
model=AutoModelForTokenClassification.from_pretrained('model_distilbert').to('cuda' if torch.cuda.is_available() else 'cpu').eval()
device = next(model.parameters()).device
def predict_jsonl(gold,out,max_len=256):
  O=[]
  for line in open(gold,encoding='utf-8'):
    ex=json.loads(line); text=ex['text']
    enc=tok(text, return_offsets_mapping=True, truncation=True, max_length=max_len, return_tensors='pt')
    offs=enc.pop('offset_mapping').squeeze(0).tolist()
    for k in enc: enc[k]=enc[k].to(device)
    with torch.no_grad():
      logits=model(**enc).logits.squeeze(0).softmax(-1).cpu().numpy()
    pred=np.argmax(logits,axis=-1).tolist(); spans=[]; i=0
    while i<len(pred):
      if pred[i] in (1,2):
        lbl='EMAIL' if pred[i]==1 else 'SECRET'
        cs,ce=offs[i]; j=i+1
        while j<len(pred) and pred[j]==pred[i]:
          cs=min(cs,offs[j][0]); ce=max(ce,offs[j][1]); j+=1
        if ce>cs: spans.append([lbl,cs,ce])
        i=j
      else: i+=1
    O.append({'spans':spans})
  with open(out,'w',encoding='utf-8') as f:
    for r in O: json.dump(r,f); f.write('\n')
  print('Wrote', out)
predict_jsonl('data/dev.jsonl','preds_model_dev.jsonl')
predict_jsonl('data/test_balanced.jsonl','preds_model_bal.jsonl')
predict_jsonl('data/test_skewed.jsonl','preds_model_sk.jsonl')

Wrote preds_model_dev.jsonl
Wrote preds_model_bal.jsonl
Wrote preds_model_sk.jsonl


In [32]:
!python span_metrics.py --gold data/dev.jsonl --preds preds_model_dev.jsonl
!python span_metrics.py --gold data/test_balanced.jsonl --preds preds_model_bal.jsonl
!python span_metrics.py --gold data/test_skewed.jsonl --preds preds_model_sk.jsonl

{
  "per_label": {
    "EMAIL": {
      "tp": 30,
      "fp": 0,
      "fn": 0,
      "precision": 1.0,
      "recall": 1.0,
      "f1": 1.0
    },
    "SECRET": {
      "tp": 79,
      "fp": 0,
      "fn": 0,
      "precision": 1.0,
      "recall": 1.0,
      "f1": 1.0
    }
  },
  "overall": {
    "tp": 109,
    "fp": 0,
    "fn": 0,
    "precision": 1.0,
    "recall": 1.0,
    "f1": 1.0
  }
}
{
  "per_label": {
    "EMAIL": {
      "tp": 38,
      "fp": 1,
      "fn": 1,
      "precision": 0.9743589743589743,
      "recall": 0.9743589743589743,
      "f1": 0.9743589743589743
    },
    "SECRET": {
      "tp": 103,
      "fp": 1,
      "fn": 0,
      "precision": 0.9903846153846154,
      "recall": 1.0,
      "f1": 0.9951690821256038
    }
  },
  "overall": {
    "tp": 141,
    "fp": 2,
    "fn": 1,
    "precision": 0.986013986013986,
    "recall": 0.9929577464788732,
    "f1": 0.9894736842105264
  }
}
{
  "per_label": {
    "EMAIL": {
      "tp": 16,
      "fp": 0,
      "fn": 0,
  

In [33]:
import json
def loadj(p): return [json.loads(l) for l in open(p,encoding='utf-8')]
def savej(p,rows):
  with open(p,'w',encoding='utf-8') as f:
    for r in rows: json.dump(r,f); f.write('\n')
rules_bal=loadj('preds_rules_bal_v2.jsonl')
rules_sk =loadj('preds_rules_sk_v2.jsonl')
model_bal=loadj('preds_model_bal.jsonl')
model_sk =loadj('preds_model_sk.jsonl')
def union(a,b):
  out=[]
  for x,y in zip(a,b):
    s=set(tuple(t) for t in x['spans']); s.update(tuple(t) for t in y['spans'])
    out.append({'spans':[list(t) for t in sorted(s, key=lambda z:(z[0],z[1],z[2]))]})
  return out
hyb_bal=union(rules_bal,model_bal)
hyb_sk =union(rules_sk, model_sk)
savej('preds_hybrid_bal.jsonl',hyb_bal)
savej('preds_hybrid_sk.jsonl', hyb_sk)
!python span_metrics.py --gold data/test_balanced.jsonl --preds preds_hybrid_bal.jsonl
!python span_metrics.py --gold data/test_skewed.jsonl  --preds preds_hybrid_sk.jsonl

{
  "per_label": {
    "EMAIL": {
      "tp": 38,
      "fp": 6,
      "fn": 1,
      "precision": 0.8636363636363636,
      "recall": 0.9743589743589743,
      "f1": 0.9156626506024097
    },
    "SECRET": {
      "tp": 103,
      "fp": 3,
      "fn": 0,
      "precision": 0.9716981132075472,
      "recall": 1.0,
      "f1": 0.985645933014354
    }
  },
  "overall": {
    "tp": 141,
    "fp": 9,
    "fn": 1,
    "precision": 0.94,
    "recall": 0.9929577464788732,
    "f1": 0.9657534246575342
  }
}
{
  "per_label": {
    "EMAIL": {
      "tp": 16,
      "fp": 1,
      "fn": 0,
      "precision": 0.9411764705882353,
      "recall": 1.0,
      "f1": 0.9696969696969697
    },
    "SECRET": {
      "tp": 49,
      "fp": 1,
      "fn": 1,
      "precision": 0.98,
      "recall": 0.98,
      "f1": 0.98
    }
  },
  "overall": {
    "tp": 65,
    "fp": 2,
    "fn": 1,
    "precision": 0.9701492537313433,
    "recall": 0.9848484848484849,
    "f1": 0.9774436090225564
  }
}


In [34]:
# Optional: show a few false positives for report
gold=[json.loads(l) for l in open('data/test_skewed.jsonl',encoding='utf-8')]
pred=[json.loads(l) for l in open('preds_hybrid_sk.jsonl',encoding='utf-8')]
def toset(sp): return {tuple(s) for s in sp}
samples=[]
for g,p in zip(gold,pred):
  gset,pset=toset(g['spans']),toset(p['spans'])
  fps=[s for s in pset if s not in gset]
  if fps: samples.append({'text':g['text'],'fp_spans':list(fps)})
  if len(samples)>=5: break
samples

[{'text': 'JSON: {"password": "Password123!", "email": "alice80@corp.local"}',
  'fp_spans': [('EMAIL', 44, 63)]},
 {'text': 'YAML:\nаpi_kеy: AKIAоC4m7аsоS1nYYiPuQ7nbBRсgTJEс\nusеr: sеrviсе\n',
  'fp_spans': [('SECRET', 15, 46)]},
 {'text': 'Please email me at ivan25 [at] internal.corp and use password Password123!.',
  'fp_spans': [('SECRET', 62, 74), ('EMAIL', 19, 44)]},
 {'text': 'Contact bob70 [at] corp.local for access; temporary token: xoxb-swnkgmg4xlc5J5HFc9Bu1igol9S7qgB002',
  'fp_spans': [('SECRET', 59, 98), ('EMAIL', 8, 29)]},
 {'text': 'JSON: {"password": "Password123!", "email": "heidi94@product.dev"}',
  'fp_spans': [('EMAIL', 44, 64), ('EMAIL', 45, 64)]}]

In [35]:
# === Span expansion + display/redaction helpers ===

def _is_email_char(c):   # characters that belong to an email/token
    return c.isalnum() or c in "._%+-@"

def _is_secret_char(c):
    return c.isalnum() or c in "_-"

def _expand_and_merge(text, spans):
    """Expand spans to token boundaries and merge overlaps (per label)."""
    # 1) expand to whole tokens
    expanded = []
    for lbl, s, e in sorted(spans, key=lambda x: x[1]):
        L, R = s, e
        if lbl == "EMAIL":
            while L > 0 and _is_email_char(text[L-1]): L -= 1
            while R < len(text) and _is_email_char(text[R]): R += 1
        else:  # SECRET
            while L > 0 and _is_secret_char(text[L-1]): L -= 1
            while R < len(text) and _is_secret_char(text[R]): R += 1
        expanded.append([lbl, L, R])

    # 2) merge overlaps of the same label
    merged = []
    for lbl, s, e in sorted(expanded, key=lambda x: (x[1], x[2])):
        if merged and merged[-1][0] == lbl and merged[-1][2] >= s:
            merged[-1][2] = max(merged[-1][2], e)
        else:
            merged.append([lbl, s, e])
    return merged

def highlight_text(text, spans):
    """Show inline tags like [SECRET:xxxxx] with word-boundary expansion."""
    spans = _expand_and_merge(text, spans)
    out, last = [], 0
    for lbl, s, e in spans:
        out.append(text[last:s])
        out.append(f"[{lbl}:{text[s:e]}]")
        last = e
    out.append(text[last:])
    return "".join(out)

def redact_text(text, spans):
    """Replace expanded spans with [REDACTED:<LABEL>]."""
    spans = _expand_and_merge(text, spans)
    chars = list(text)
    for lbl, s, e in sorted(spans, key=lambda x: x[1], reverse=True):
        chars[s:e] = f"[REDACTED:{lbl}]"
    return "".join(chars)


In [36]:
import re, json, torch
from transformers import AutoTokenizer, AutoModelForTokenClassification

# load your trained model + tokenizer
tok = AutoTokenizer.from_pretrained("model_distilbert")
model = AutoModelForTokenClassification.from_pretrained("model_distilbert").eval()
device = "cuda" if torch.cuda.is_available() else "cpu"
model.to(device)

def detect_hybrid(text):
    # --- Rules
    r_spans = detect_rules(text)

    # --- Model
    enc = tok(text, return_offsets_mapping=True, truncation=True, max_length=256, return_tensors="pt")
    offs = enc.pop("offset_mapping").squeeze(0).tolist()
    for k in enc: enc[k] = enc[k].to(device)
    with torch.no_grad():
        logits = model(**enc).logits.squeeze(0).softmax(-1).cpu().numpy()
    pred = logits.argmax(-1).tolist()

    m_spans = []
    i=0
    while i < len(pred):
        if pred[i] in (1,2):
            lbl = "EMAIL" if pred[i]==1 else "SECRET"
            cs,ce = offs[i]; j=i+1
            while j<len(pred) and pred[j]==pred[i]:
                cs=min(cs,offs[j][0]); ce=max(ce,offs[j][1]); j+=1
            if ce>cs: m_spans.append([lbl,cs,ce])
            i=j
        else: i+=1

    # --- Merge
    s=set(tuple(t) for t in r_spans); s.update(tuple(t) for t in m_spans)
    spans = [list(t) for t in sorted(s, key=lambda z:(z[1],z[2]))]
    return spans

def highlight_text(text, spans):
    """Return text with detected spans wrapped for easy reading"""
    out=""; last=0
    for lbl,s,e in spans:
        out += text[last:s]
        out += f"[{lbl}:{text[s:e]}]"
        last=e
    out += text[last:]
    return out

# 🔹 Try examples here
samples = [
    "Email me at alice99@corp.local and use password Tr0ub4dor&3",
    "Set API_KEY=ghp_ABC123456789xyz when deploying",
    "My name is Maitha"
    "My password is"
]

for txt in samples:
    spans = detect_hybrid(txt)
    print("Input:   ", txt)
    print("Detected:", spans)
    print("Marked:  ", highlight_text(txt, spans))
    print("-"*60)


Input:    Email me at alice99@corp.local and use password Tr0ub4dor&3
Detected: [['EMAIL', 12, 30], ['SECRET', 48, 59]]
Marked:   Email me at [EMAIL:alice99@corp.local] and use password [SECRET:Tr0ub4dor&3]
------------------------------------------------------------
Input:    Set API_KEY=ghp_ABC123456789xyz when deploying
Detected: [['SECRET', 12, 31]]
Marked:   Set API_KEY=[SECRET:ghp_ABC123456789xyz] when deploying
------------------------------------------------------------
Input:    My name is MaithaMy password is
Detected: []
Marked:   My name is MaithaMy password is
------------------------------------------------------------


In [37]:
import re, json, torch
from transformers import AutoTokenizer, AutoModelForTokenClassification

# load your trained model + tokenizer
tok = AutoTokenizer.from_pretrained("model_distilbert")
model = AutoModelForTokenClassification.from_pretrained("model_distilbert").eval()
device = "cuda" if torch.cuda.is_available() else "cpu"
model.to(device)

def detect_hybrid(text):
    # --- Rules
    r_spans = detect_rules(text)

    # --- Model
    enc = tok(text, return_offsets_mapping=True, truncation=True, max_length=256, return_tensors="pt")
    offs = enc.pop("offset_mapping").squeeze(0).tolist()
    for k in enc: enc[k] = enc[k].to(device)
    with torch.no_grad():
        logits = model(**enc).logits.squeeze(0).softmax(-1).cpu().numpy()
    pred = logits.argmax(-1).tolist()

    m_spans = []
    i=0
    while i < len(pred):
        if pred[i] in (1,2):
            lbl = "EMAIL" if pred[i]==1 else "SECRET"
            cs,ce = offs[i]; j=i+1
            while j<len(pred) and pred[j]==pred[i]:
                cs=min(cs,offs[j][0]); ce=max(ce,offs[j][1]); j+=1
            if ce>cs: m_spans.append([lbl,cs,ce])
            i=j
        else: i+=1

    # --- Merge rules+model
    s=set(tuple(t) for t in r_spans); s.update(tuple(t) for t in m_spans)
    spans = [list(t) for t in sorted(s, key=lambda z:(z[1],z[2]))]
    return spans

# ---------- Word-boundary helpers ----------
def _is_email_char(c):   # characters that belong to an email/token
    return c.isalnum() or c in "._%+-@"

def _is_secret_char(c):
    return c.isalnum() or c in "_-"

def _expand_and_merge(text, spans):
    """Expand spans to token boundaries and merge overlaps (per label)."""
    # 1) expand to whole tokens
    expanded = []
    for lbl, s, e in sorted(spans, key=lambda x: x[1]):
        L, R = s, e
        if lbl == "EMAIL":
            while L > 0 and _is_email_char(text[L-1]): L -= 1
            while R < len(text) and _is_email_char(text[R]): R += 1
        else:  # SECRET
            while L > 0 and _is_secret_char(text[L-1]): L -= 1
            while R < len(text) and _is_secret_char(text[R]): R += 1
        expanded.append([lbl, L, R])

    # 2) merge overlaps of the same label
    merged = []
    for lbl, s, e in sorted(expanded, key=lambda x: (x[1], x[2])):
        if merged and merged[-1][0] == lbl and merged[-1][2] >= s:
            merged[-1][2] = max(merged[-1][2], e)
        else:
            merged.append([lbl, s, e])
    return merged

def highlight_text(text, spans):
    """Show inline tags like [SECRET:xxxxx] with word-boundary expansion."""
    spans = _expand_and_merge(text, spans)
    out, last = [], 0
    for lbl, s, e in spans:
        out.append(text[last:s])
        out.append(f"[{lbl}:{text[s:e]}]")
        last = e
    out.append(text[last:])
    return "".join(out)

def redact_text(text, spans):
    """Replace expanded spans with [REDACTED:<LABEL>]."""
    spans = _expand_and_merge(text, spans)
    chars = list(text)
    for lbl, s, e in sorted(spans, key=lambda x: x[1], reverse=True):
        chars[s:e] = f"[REDACTED:{lbl}]"
    return "".join(chars)

# 🔹 Try examples here (FIX: add missing comma)
samples = [
    "Email me at alice99@corp.local and use password Tr0ub4dor&3",
    "Set API_KEY=ghp_ABC123456789xyz when deploying",
    "My name is Maitha",
    "My password is MHyr7"
]

for txt in samples:
    spans = detect_hybrid(txt)
    print("Input:    ", txt)
    print("Detected: ", spans)
    print("Marked:   ", highlight_text(txt, spans))
    print("Redacted: ", redact_text(txt, spans))
    print("-"*60)


Input:     Email me at alice99@corp.local and use password Tr0ub4dor&3
Detected:  [['EMAIL', 12, 30], ['SECRET', 48, 59]]
Marked:    Email me at [EMAIL:alice99@corp.local] and use password [SECRET:Tr0ub4dor&3]
Redacted:  Email me at [REDACTED:EMAIL] and use password [REDACTED:SECRET]
------------------------------------------------------------
Input:     Set API_KEY=ghp_ABC123456789xyz when deploying
Detected:  [['SECRET', 12, 31]]
Marked:    Set API_KEY=[SECRET:ghp_ABC123456789xyz] when deploying
Redacted:  Set API_KEY=[REDACTED:SECRET] when deploying
------------------------------------------------------------
Input:     My name is Maitha
Detected:  []
Marked:    My name is Maitha
Redacted:  My name is Maitha
------------------------------------------------------------
Input:     My password is MHyr7
Detected:  [['SECRET', 15, 19]]
Marked:    My password is [SECRET:MHyr7]
Redacted:  My password is [REDACTED:SECRET]
------------------------------------------------------------


In [40]:
def entities_view(text, spans):
    """Return ONLY the detected entities as [LABEL:VALUE] lines."""
    spans = _expand_and_merge(text, spans)
    if not spans: return "—"
    return "\n".join(f"[{lbl}:{text[s:e]}]" for lbl, s, e in spans)

def strip_text(text, spans):
    """Remove detected spans from the text (safe to share)."""
    spans = _expand_and_merge(text, spans)
    chars = list(text)
    for _, s, e in sorted(spans, key=lambda x: x[1], reverse=True):
        del chars[s:e]
    safe = "".join(chars)
    # tidy spacing
    safe = re.sub(r"\s{2,}", " ", safe)
    safe = re.sub(r"\s+([,.;:!?])", r"\1", safe)
    return safe.strip()

EXAMPLES = [
    "My email is MaithaHabib@hotmailcom",
    "Email me at alice99@corp.local and use password Tr0ub4dor&3",
    "Set API_KEY=ghp_ABC123456789xyz when deploying",
    "This is a safe line, nothing secret here",
]

In [43]:
def warning_box(spans):
    """
    Return a styled HTML notice:
      • Yellow warning box if any spans
      • Green success box if none
    """
    if not spans:
        return """
<div style="margin:8px 0;padding:12px;border-radius:10px;border:1px solid #b6e3b6;background:#eef9ee;color:#0f5132">
  ✅ No sensitive information detected.
</div>"""

    # counts per label for a pro touch
    counts = {}
    for lbl, _, _ in spans:
        counts[lbl] = counts.get(lbl, 0) + 1
    items = "".join(f"<li>{lbl}: <b>{n}</b></li>" for lbl, n in sorted(counts.items()))

    return f"""
<div style="margin:8px 0;padding:12px;border-radius:10px;border:1px solid #ffecb5;background:#fff8e1;color:#664d03">
  <b>⚠️ Sensitive information detected</b>
  <ul style="margin:8px 0 0 18px">{items}</ul>
  <div style="margin-top:6px">Please avoid sharing this information in public or insecure channels.</div>
</div>"""


In [51]:
# --- Web UI only (keep your pipeline unchanged) ---
!pip -q install gradio==4.44.0
import gradio as gr

# ---------- traffic-light styles ----------
TRAFFIC_CSS = """
.safe-box {
  background:#1e4620; color:#a3f7b5; padding:10px; border-radius:8px;
  border:1px solid #28a745; font-weight:600;
}
.warning-box {
  background:#2b2b2b; color:#ffcc00; padding:10px; border-radius:8px;
  border:1px solid #ffcc00; font-weight:600;
}
.critical-box {
  background:#460000; color:#ff7a7a; padding:10px; border-radius:8px;
  border:1px solid #ff1a1a; font-weight:600;
}
"""

def ui_predict(text, mode):
    if not text or not text.strip():
        return "<em>Type or paste some text above…</em>"

    # 1) detect
    spans = detect_hybrid(text)

    # 2) guardrail
    max_cover = int(0.7 * len(text))
    spans = [s for s in spans if s[2] > s[1] and (s[2]-s[1]) <= max_cover]

    # 3) counts per type
    counts = {}
    for lbl,_,_ in spans:
        counts[lbl] = counts.get(lbl, 0) + 1

    # 4) build multiple traffic-light boxes
    boxes = []

    if not spans:
        boxes.append("""
        <div class="safe-box">
            ✅ No sensitive information detected. Safe to share.
        </div>
        """)

    if counts.get("EMAIL"):
        boxes.append(f"""
        <div class="warning-box">
            ⚠️ Warning: Sensitive information detected.<br>
            <b>Detected:</b> {counts['EMAIL']} EMAIL{'s' if counts['EMAIL']>1 else ''}<br>
            Handle with caution before sharing.
        </div>
        """)

    if counts.get("SECRET"):
        boxes.append(f"""
        <div class="critical-box">
            🔴 Critical: Highly confidential information detected.<br>
            <b>Detected:</b> {counts['SECRET']} SECRET{'s' if counts['SECRET']>1 else ''}<br>
            Please do <u>not</u> share this in public or insecure channels.
        </div>
        """)

    # 5) output view
    original = text
    unsafe_list = "\n".join(f"[{lbl}:{text[s:e]}]" for lbl,s,e in spans) or "-"
    safe = strip_text(text, spans)

    if mode == "Unsafe":
        html = f"<h4>Original</h4><pre>{original}</pre><h4>Unsafe / Redacted</h4><pre>{unsafe_list}</pre>"
    elif mode == "Safe":
        html = f"<h4>Safe</h4><pre>{safe}</pre>"
    else:
        html = f"<h4>Original</h4><pre>{original}</pre><h4>Unsafe / Redacted</h4><pre>{unsafe_list}</pre><h4>Safe</h4><pre>{safe}</pre>"

    return "".join(boxes) + html




with gr.Blocks(theme="gradio/soft", css=TRAFFIC_CSS) as demo:
    gr.Markdown("# 🔒 Sensitive Info Detector")
    inp  = gr.Textbox(label="Input text", lines=7, placeholder="Paste text here…")
    view = gr.Radio(["Both","Unsafe","Safe"], value="Both", label="View")
    out  = gr.HTML(label="Output")
    btn  = gr.Button("Detect", variant="primary")

    # examples (optional)
    gr.Examples(
        examples=[
            "The meeting is at 10am tomorrow. Nothing sensitive here.",
            "Contact me at maria.lopez@gmail.com about the draft.",
            "Login password is Summer2024! and the API key is ghp_abc123DEF456ghi789.",
            "Please email me at david.smith@gmail.com. Also, the system password is Winter2025! and the deployment key is ghp_XYZ123456789abcd. Do not share this outside the team.",
        ],
        inputs=inp
    )

    btn.click(ui_predict, inputs=[inp, view], outputs=out)

demo.launch(share=True)


--------


Colab notebook detected. To show errors in colab notebook, set debug=True in launch()
Running on public URL: https://135e2f0707f8150600.gradio.live

This share link expires in 72 hours. For free permanent hosting and GPU upgrades, run `gradio deploy` from Terminal to deploy to Spaces (https://huggingface.co/spaces)


