In [1]:
data_dir = "../custom_data/raw"
out_dir = "/home/omadbek/projects/run_all/out-notebooks-cta/rule-based-regex-results"
run_id   = 1

In [2]:
# Parameters
data_dir = "/home/omadbek/projects/Sherlock/custom_data/raw"
out_dir = "/home/omadbek/projects/run_all/out-notebooks-cta/rule-based-regex-results"
run_id = 10


In [3]:
%env PYTHONHASHSEED=13
%env PYTHONHASHSEED

env: PYTHONHASHSEED=13


'13'

In [4]:
from pathlib import Path
import pandas as pd
import numpy as np
from sklearn.metrics import classification_report, f1_score
import re
from enum import unique
from typing import Sequence, Optional
import time

In [5]:
ISO_DATE        = re.compile(r'^(19|20)\d{2}-[01]\d-[0-3]\d$')
SLASH_DATE      = re.compile(r'^[0-3]\d/[01]\d/(19|20)\d{2}$')
VACC_DATE       = re.compile(r'^\d{6}_\w{7,8}$')              # 240119_46576392
EXCEL_SERIAL    = re.compile(r'^\d{5,6}(\.0)?$')
HEX_ID          = re.compile(r'^[0-9a-f]{6,64}$', re.IGNORECASE)
GENOMIC_ID      = re.compile(r'^EPI_ISL_\d{6,9}$', re.IGNORECASE)
AGE_PAT         = re.compile(r'^(0?\d{1,2}|1[01]\d|120)(\.0)?$')

DATE_RE = re.compile(
    r'^(?:'
    # ISO dates: YYYY-MM-DD, years 1900–2099, months 01–12, days 01–31
    r'(?:19|20)\d{2}-(?:0[1-9]|1[0-2])-(?:0[1-9]|[12]\d|3[01])'
    r'|'
    # Slashed dates: DD/MM/YYYY, days 01–31, months 01–12, years 1900–2099
    r'(?:0[1-9]|[12]\d|3[01])/(?:0[1-9]|1[0-2])/(?:19|20)\d{2}'
    r'|'
    # Vaccine codes: six digits, underscore, 7–8 alnum chars
    r'\d{6}_[A-Za-z0-9]{7,8}'
    r'|'
    # Excel serials: 5–6 digits, optional “.0”
    r'\d{5,6}(?:\.0)?'
    r')$'
)

GENERAL_ID = re.compile(
    r'^(?=.*[A-Za-z])(?=.*\d)[A-Za-z0-9.-]{3,30}$',
    re.VERBOSE
)


YES_NO          = re.compile(
    r'^(yes|no|oui|non|unknown|inconnu|y|n|0\.0|1\.0)$',
    re.IGNORECASE
)
GENDER_PAT      = re.compile(
    r'^(male|female|m|f|other)$',
    re.IGNORECASE
)
OUTCOME_PAT     = re.compile(
    r'^(recovered|died|alive|deceased|discharged.*|not recovered|other)$',
    re.IGNORECASE
)
CASE_STATUS_PAT = re.compile(
    r'^(confirme?|prob|disc|suspected case|invalidée|confirmed case)$',
    re.IGNORECASE
)

SYM_DELIMS      = re.compile(r'[|;/]')   # just to token-split if you need

# comma-separated list of roles/persons
CONTACT_COMMENT_PAT = re.compile(
    r"^[A-Z][a-z'-]+ [A-Z][a-z'-]+"
    r"(?:, ?[A-Z][a-z'-]+ [A-Z][a-z'-]+)*$"
)

# confirmation method options
CONFIRMATION_METHOD_PAT = re.compile(
    r'^(?:clinical signs and symptoms only|laboratory confirmed|other|samples)'
    r'(?:,(?:clinical signs and symptoms only|laboratory confirmed|other|samples))*$',
    re.IGNORECASE
)

# crude pipe-delimited contact_animal entries
CONTACT_ANIMAL_PAT = re.compile(r'.*\|.*')

# comma-separated list of occupations (allow slash, hyphens, semicolons)
# strict list of known occupations, case‐insensitive
titles = [
    "doctor","nurse","teacher","driver","farmer","student","engineer",
    "chef","laborer","police officer","police","soldier","technician",
    "manager","cleaner","waiter"
]

# build an “alternation” of those titles, escaping spaces
escaped = [re.escape(t) for t in titles]
group   = "|".join(escaped)

OCCUPATION_PAT = re.compile(
    r'(?i)^'                   # inline case-insensitive, at the very start
    r'(?:' + group + r')'      # one of the known titles
                     r'(?:'                     # optional repeat...
                     r'(?:,\s*|;\s*|/\s*)'    #   separator
                     r'(?:' + group + r')'    #   another title
                                      r')*'                      # zero or more
                                      r'$'                       # end of string
)

LOCATION_PAT = re.compile(
    r"^[A-Z][A-Za-zÀ-ÿ' .-]{3,}(?:, ?[A-Z][A-Za-z]{2,})?$"
)

# known pathogens
PATHOGEN_PAT = re.compile(r'^(?:novhep|cholera)$', re.IGNORECASE)

# generic comma-separated source identifiers (allow underscores, slashes)
SOURCE_PAT = re.compile(
    r'^(?:[A-Z]{2,5}\d{2,}|https?://)', re.IGNORECASE
)
# ------------------------------------------------------------------ #
# RULES  label → list[pattern]                                       #
# first pattern that matches *one* value wins                        #
# ------------------------------------------------------------------ #
RULES = {
    "date":               [ISO_DATE, SLASH_DATE, VACC_DATE, EXCEL_SERIAL, DATE_RE],
    "id":                 [GENERAL_ID],
    "age":                [AGE_PAT],
    "medical_boolean":    [YES_NO],
    "gender":             [GENDER_PAT],
    "outcome":            [OUTCOME_PAT],
    "case_status":        [CASE_STATUS_PAT],
    "genomics_metadata":  [GENOMIC_ID],
    "location":           [LOCATION_PAT],
    "symptoms":           [re.compile(r'[A-Za-z]{3,}.*\|.*')],     # crude pipe-list
    "pre_existing_condition": [
        re.compile(r'^(diab|hyper|neuro|none|preg|yes|no)$', re.IGNORECASE)
    ],
    "contact_setting":    [
        re.compile(r'^(famille|funerailles|communaute|household|school)$', re.IGNORECASE)
    ],
    "transmission":       [
        re.compile(r'^(funerailles|communautaire|nosocomiale)$', re.IGNORECASE)
    ],
    "vaccine_name":       [
        re.compile(r'^(pfizer|moderna|com)$', re.IGNORECASE)
    ],
    "demographic":        [
        re.compile(r'^(male|female|other|age|m|f)$', re.IGNORECASE)
    ],
    # new fields:
    "contact_comment":    [CONTACT_COMMENT_PAT],
    "confirmation_method": [CONFIRMATION_METHOD_PAT],
    "contact_animal":      [CONTACT_ANIMAL_PAT],
    "occupation":          [OCCUPATION_PAT],
    "pathogen":            [PATHOGEN_PAT],
    "source":              [SOURCE_PAT],

}


DELIM_RE = re.compile(r'[|,;/]')

def classify_column(values: Sequence[str]) -> Optional[str]:
    for label, patterns in RULES.items():
        for pat in patterns:
            # look at every raw cell…
            for raw in values:
                if raw is None:
                    continue
                # split into pieces
                for token in DELIM_RE.split(str(raw).strip()):
                    if pat.match(token.strip()):
                        return label
    return None

In [6]:
df_vals = pd.read_parquet(f"{data_dir}/test_data.parquet")
df_labels = pd.read_parquet(f"{data_dir}/test_labels.parquet").values.flatten()   # same order as df “columns”

value_col  = df_vals.columns[1]            # change if needed
label_col  = df_labels

values = df_vals[value_col].astype(str).tolist()
y_true = label_col

# optional: same remap/lowercase you used in training

y_true = np.array([x.lower() for x in y_true])

In [7]:
#len(df_labels)

In [8]:
start = time.perf_counter()
# Rule-based predictions
y_pred = [classify_column([v]) for v in values]   # wrap each val in list
end = time.perf_counter()

inference_time = end - start
print(f"Inference time: {inference_time:.2f}s")


#print("Predicted by model")
#print(y_pred)
# if classify_column returned None, keep it as special token
y_pred = ["__none__" if p is None else p for p in y_pred]

#print("After pred")
#print(y_pred)

#print(y_pred)

Inference time: 0.01s


In [9]:
# Output metrics
macro_f1    = f1_score(y_true, y_pred, average="macro", zero_division=0)
weighted_f1 = f1_score(y_true, y_pred, average="weighted", zero_division=0)

print(f"Macro-F1   : {macro_f1:.4f}")
print(f"Weighted F1: {weighted_f1:.4f}")

present_ids   = np.unique(np.concatenate([y_true, y_pred]))
print(classification_report(
    y_true, y_pred,
    labels=present_ids,
    target_names=present_ids,
    digits=3,
    zero_division=0))

Macro-F1   : 0.4267
Weighted F1: 0.7209
                 precision    recall  f1-score   support

       __none__      0.000     0.000     0.000         0
            age      0.400     1.000     0.571         4
    case_status      1.000     0.600     0.750         5
contact_setting      1.000     0.500     0.667         2
           date      1.000     0.960     0.980        25
         gender      1.000     1.000     1.000         4
             id      0.875     0.778     0.824         9
       location      0.000     0.000     0.000        12
medical_boolean      0.939     0.775     0.849        40
     occupation      0.000     0.000     0.000         2
        outcome      0.500     0.250     0.333         4
       pathogen      0.000     0.000     0.000         0
         source      0.000     0.000     0.000         0
       symptoms      0.000     0.000     0.000         2

       accuracy                          0.688       109
      macro avg      0.480     0.419     0.427

In [10]:
import os



run_name = f"rule-based-regex-run-{run_id}"

# 1. get the report dict
raw_report = classification_report(
    y_true, y_pred,
    output_dict=True,
    zero_division=0
)

n = len(y_true)

# 2. flatten into a single dict
flat = {}
for label, metrics in raw_report.items():
    if label == "accuracy":
        flat["accuracy"] = metrics
    else:
        for metric_name, val in metrics.items():
            # replace any dashes so your CSV columns are valid identifiers
            clean_metric = metric_name.replace("-", "_")
            flat[f"{label}_{clean_metric}"] = val

# 3. add summary fields _after_ flattening
flat["total_entries"] = n
flat["run_name"]      = run_name
flat["inference_time"] = f"{inference_time:.2f}s"

# 4. write once
df = pd.DataFrame([flat])
metrics_csv = os.path.join(out_dir, "rule-based-metrics.csv")

# only write header if file doesn’t exist
if not os.path.isfile(metrics_csv):
    df.to_csv(metrics_csv, index=False, float_format="%.4f")
else:
    df.to_csv(metrics_csv, mode="a", header=False, index=False, float_format="%.4f")