# Anon demo

**Assignment**:

1) Anon the user field and create a mapping
2) Anon the IP address field (which is expressed as a string, but can also map to a 32-bit integer [yet sparsely so in our data]).
3) Make vectors of the anonymous data and the clear data
4) Generate a "ragged array" of the data in this form (i.e. dump out the records which are of variable length)
Bonus) Tell us which users' jobs fail and how often, both as an absolute number and as a fraction of the whole dataset.

In [17]:
# --- Minimal dependencies ---
import hmac, hashlib, base64, ipaddress, json
from typing import Any, Iterable, Dict, Optional
import numpy as np
import pandas as pd
import datetime as _dt
import glob
import duckdb

In [18]:
#!pip install duckdb

### Helpers

In [19]:
# ---------- Helpers ----------
def _b(x: Any) -> bytes:
    """Coerce an arbitrary value to the byte string fed into HMAC.

    * ``bytes`` input is passed through untouched.
    * Missing values (``None`` or anything ``pd.isna`` reports as null)
      collapse to the empty byte string ``b""``.
    * Everything else is rendered with ``str()`` and UTF-8 encoded; characters
      that cannot be encoded are silently dropped.
    """
    if isinstance(x, bytes):
        return x

    missing = x is None
    if not missing:
        try:
            missing = bool(pd.isna(x))
        except Exception:
            # pd.isna returns an array for list-likes, whose truth value can be
            # ambiguous; such inputs are treated as "not missing".
            missing = False

    if missing:
        return b""
    return str(x).encode("utf-8", errors="ignore")

def token_hmac_b32(value: Any, key: bytes, length: int = 22) -> str:
    """Return a deterministic lowercase pseudonym for *value*.

    The value is normalised to bytes with ``_b``, signed with HMAC-SHA256 under
    *key*, Base32-encoded, stripped of ``=`` padding, cut to at most *length*
    characters and lower-cased. The same (value, key) pair always yields the
    same token, which makes it usable as a stable user pseudonym.
    """
    digest = hmac.new(key, _b(value), hashlib.sha256).digest()
    encoded = base64.b32encode(digest).decode("ascii")
    unpadded = encoded.rstrip("=")
    return unpadded[:length].lower()

def anon_ipv4(value: Any, key: bytes) -> Any:
    """
    Deterministically map an IPv4 address to a pseudonymous dotted-quad using HMAC.

    - Accepts a dotted-quad string (surrounding whitespace is ignored) or the
      equivalent 32-bit integer; both forms of one address yield the same
      pseudonym because the canonical dotted form is what gets hashed.
    - Fast and stable but NOT prefix-preserving.
    - If value is not a valid IPv4 address, it is returned unchanged
      (None/NaN/IPv6/garbage/out-of-range integers/booleans).

    Args:
        value: The address to pseudonymise.
        key: Secret HMAC key.

    Returns:
        A dotted-quad string built from the first four bytes of the
        HMAC-SHA256 digest, or *value* itself when it is not a valid IPv4.
    """
    if value is None:
        return value

    # bool is a subclass of int; never interpret True/False as addresses.
    is_int = isinstance(value, (int, np.integer)) and not isinstance(value, bool)
    try:
        addr = ipaddress.IPv4Address(int(value) if is_int else str(value).strip())
    except ValueError:
        # AddressValueError derives from ValueError: not an IPv4 address.
        return value

    dig = hmac.new(key, str(addr).encode("utf-8"), hashlib.sha256).digest()
    return ".".join(str(octet) for octet in dig[:4])

def ipv4_to_int(value: Any) -> Any:
    """Convert an IPv4 address to its 32-bit integer value (NaN if invalid/missing).

    Accepts a dotted-quad string (surrounding whitespace is ignored) or an
    integer that is already in 32-bit form. Anything that is not a valid IPv4
    address -- including None, NaN, booleans, floats and out-of-range
    integers -- yields ``np.nan``.
    """
    # bool is a subclass of int; never interpret True/False as addresses.
    is_int = isinstance(value, (int, np.integer)) and not isinstance(value, bool)
    try:
        return int(ipaddress.IPv4Address(int(value) if is_int else str(value).strip()))
    except ValueError:
        # AddressValueError derives from ValueError: not an IPv4 address.
        return np.nan

def _to_jsonable(v):
    """Convert common pandas/NumPy values to JSON-safe Python values.

    Conversions performed:
      * dict -> dict with string keys and converted values
      * list / tuple / np.ndarray -> list of converted elements
      * None, NaN, NaT, pd.NA and +/-infinity -> None
      * pd.Timestamp, np.datetime64, datetime/date/time -> ISO-8601 string
      * NumPy integer / float / bool scalars -> Python int / float / bool

    Anything else (str, int, ...) is returned unchanged.
    """
    # Containers first: pd.isna() on a container returns an array, and a
    # one-element container such as [nan] would otherwise be mistaken for a
    # missing scalar and collapsed to None instead of [None].
    if isinstance(v, dict):
        return {str(k): _to_jsonable(x) for k, x in v.items()}
    if isinstance(v, (list, tuple)):
        return [_to_jsonable(x) for x in v]
    if isinstance(v, np.ndarray):
        if v.ndim == 0:
            # 0-d arrays are not iterable; unwrap the single scalar.
            return _to_jsonable(v[()])
        return [_to_jsonable(x) for x in v]

    # None/NaN/NaT
    try:
        if pd.isna(v):
            return None
    except (TypeError, ValueError):
        # Non-scalar objects whose null mask has no single truth value.
        pass

    # pandas / numpy datetimes
    if isinstance(v, pd.Timestamp):
        # keep timezone info if present
        return v.isoformat()
    if isinstance(v, np.datetime64):
        return pd.to_datetime(v).isoformat()

    # date/time
    if isinstance(v, (_dt.datetime, _dt.date, _dt.time)):
        try:
            return v.isoformat()
        except Exception:
            return str(v)

    # booleans and integers
    if isinstance(v, (bool, np.bool_)):
        return bool(v)
    if isinstance(v, np.integer):
        return int(v)

    # floats (Python and NumPy): JSON cannot represent inf; NaN was handled above
    if isinstance(v, (float, np.floating)):
        f = float(v)
        return f if np.isfinite(f) else None

    # leave strings/ints and other objects as-is
    return v

def to_ragged_records(df: pd.DataFrame) -> list[dict]:
    """
    Produce a ragged list of dicts (only non-null fields), JSON-safe.

    One dict is emitted per row, keyed by column label. A field is omitted
    whenever ``_to_jsonable`` maps its value to ``None``, so records have
    variable length.

    Rows are walked with ``itertuples`` rather than ``iterrows``: ``iterrows``
    builds a Series per row, which forces a common dtype across the row (e.g.
    integers become floats in an all-numeric frame), whereas ``itertuples``
    keeps each column's own values.
    """
    columns = list(df.columns)
    records = []
    for values in df.itertuples(index=False, name=None):
        record = {}
        for col, raw in zip(columns, values):
            # _to_jsonable already maps null-like values to None.
            jv = _to_jsonable(raw)
            if jv is not None:
                record[col] = jv
        records.append(record)
    return records

# ---------- Main pipeline ----------
def process_dataset(
    df: pd.DataFrame,
    user_col: str,
    ip_col: str,
    status_col: str,
    failure_values: Iterable[Any],
    user_key_hex: str,
    ip_key_hex: str,
    vector_cols_clear: Iterable[str] = (),
    vector_cols_anon: Iterable[str] = (),
) -> Dict[str, Any]:
    """
    End-to-end anonymization + analysis pipeline (no files written).

    Args:
        df: Input records; it is not modified.
        user_col: Column holding the user identifier to pseudonymise.
        ip_col: Column holding the IPv4 address to pseudonymise.
        status_col: Column holding the job status.
        failure_values: Values of ``status_col`` that count as a failure.
        user_key_hex: Hex-encoded HMAC key for user pseudonyms.
        ip_key_hex: Hex-encoded HMAC key for IP pseudonyms.
        vector_cols_clear: Columns of ``df`` to export as a NumPy array.
        vector_cols_anon: Columns of the anonymized frame to export as a
            NumPy array.

    Returns a dict:
      {
        "user_mapping": DataFrame[ user, anon_user ],
        "ip_mapping": DataFrame[ ip, anon_ip ],
        "df_anon": DataFrame  # original df + anon_user, anon_ip, ip_int, anon_ip_int
        "vectors_clear": np.ndarray | None,
        "vectors_anon": np.ndarray | None,
        "ragged_records": list[dict],  # variable-length records (non-null only)
        "failure_stats": DataFrame[ anon_user, fails, jobs, fail_fraction ],
      }

    ``failure_stats`` has one row per pseudonymous user, sorted by ``fails``
    descending. ``fails`` is the number of that user's rows whose status is in
    ``failure_values``, ``jobs`` is the user's total row count, and
    ``fail_fraction`` is ``fails / jobs``. Dividing ``fails`` by ``len(df)``
    gives the share relative to the whole dataset.

    Raises:
        ValueError: If ``user_col``, ``ip_col`` or ``status_col`` is not a
            column of ``df``, or if a key is not valid hex.
        KeyError: If a requested vector column does not exist.
    """
    # sanity checks
    for col, name in [(user_col, "user_col"), (ip_col, "ip_col"), (status_col, "status_col")]:
        if col not in df.columns:
            raise ValueError(f"Missing {name}: '{col}' not in df.columns")

    user_key = bytes.fromhex(user_key_hex)
    ip_key = bytes.fromhex(ip_key_hex)

    # Materialize the iterables once: a generator is always truthy and would be
    # exhausted by the missing-column check before it could be used.
    clear_cols = list(vector_cols_clear)
    anon_cols = list(vector_cols_anon)
    failure_list = list(failure_values)

    # 1) Anonymize user + mapping
    anon_user = df[user_col].map(lambda x: token_hmac_b32(x, user_key, 22))
    user_mapping = pd.DataFrame({user_col: df[user_col], "anon_user": anon_user}).drop_duplicates()

    # 2) Anonymize IP + mapping; also integer forms
    anon_ip = df[ip_col].map(lambda x: anon_ipv4(x, ip_key))
    ip_mapping = pd.DataFrame({ip_col: df[ip_col], "anon_ip": anon_ip}).drop_duplicates()

    # Build anonymized frame
    df_anon = df.copy()
    df_anon["anon_user"] = anon_user
    df_anon["anon_ip"] = anon_ip
    df_anon["ip_int"] = df[ip_col].map(ipv4_to_int)
    df_anon["anon_ip_int"] = df_anon["anon_ip"].map(ipv4_to_int)

    # 3) Vectors (clear + anonymized)
    vectors_clear = None
    vectors_anon = None
    if clear_cols:
        missing = [c for c in clear_cols if c not in df.columns]
        if missing:
            raise KeyError(f"vector_cols_clear missing columns: {missing}")
        vectors_clear = df.loc[:, clear_cols].to_numpy()

    if anon_cols:
        missing = [c for c in anon_cols if c not in df_anon.columns]
        if missing:
            raise KeyError(f"vector_cols_anon missing columns: {missing}")
        vectors_anon = df_anon.loc[:, anon_cols].to_numpy()

    # 4) Ragged records (variable length per row)
    ragged_records = to_ragged_records(df_anon)

    # Bonus) Per-user failure counts and fractions, keyed by pseudonym.
    # Group on the raw pseudonym values so no index alignment is involved.
    is_fail = df[status_col].isin(failure_list)
    per_user = is_fail.groupby(anon_user.to_numpy(), sort=False)
    failure_stats = (
        pd.DataFrame({"fails": per_user.sum().astype(int), "jobs": per_user.size()})
        .rename_axis("anon_user")
        .reset_index()
    )
    failure_stats["fail_fraction"] = failure_stats["fails"] / failure_stats["jobs"]
    failure_stats = failure_stats.sort_values(
        "fails", ascending=False, kind="stable"
    ).reset_index(drop=True)

    return {
        "user_mapping": user_mapping,
        "ip_mapping": ip_mapping,
        "df_anon": df_anon,
        "vectors_clear": vectors_clear,
        "vectors_anon": vectors_anon,
        "ragged_records": ragged_records,
        "failure_stats": failure_stats,
    }

### test code with demo data

In [20]:
# ---------- Example usage (delete/modify for your data) ----------
if __name__ == "__main__":
    # Toy frame covering repeated users/IPs, an invalid IP and missing values.
    demo = pd.DataFrame(
        {
            "user": ["alice", "bob", "alice", "carol", "bob", "dave", None],
            "ip": ["192.168.1.10", "10.0.0.4", "192.168.1.10", "8.8.8.8", "10.0.0.4", "bad-ip", None],
            "status": ["OK", "FAIL", "OK", "TIMEOUT", "FAIL", "OK", "FAIL"],
            "payload_size": [100, 200, 150, 180, 300, 120, 90],
        }
    )

    # Demo-only keys derived from fixed labels so the output is reproducible.
    user_key_hex = hashlib.sha256(b"user-demo-key").hexdigest()
    ip_key_hex = hashlib.sha256(b"ip-demo-key").hexdigest()

    out = process_dataset(
        df=demo,
        user_col="user",
        ip_col="ip",
        status_col="status",
        failure_values=("FAIL", "TIMEOUT"),
        user_key_hex=user_key_hex,
        ip_key_hex=ip_key_hex,
        vector_cols_clear=("payload_size",),
        vector_cols_anon=("payload_size", "anon_ip_int"),
    )

    # quick peeks
    for mapping_name in ("user_mapping", "ip_mapping"):
        print(out[mapping_name].head(), "\n")

    for label, vec_name in (
        ("vectors_clear shape:", "vectors_clear"),
        ("vectors_anon  shape:", "vectors_anon"),
    ):
        vec = out[vec_name]
        print(label, vec.shape if vec is not None else None)

    print("ragged_records sample:", json.dumps(out["ragged_records"][:2], indent=2))


    user               anon_user
0  alice  2py4pkutebquzeyxcpn3jl
1    bob  ogmmf5dnd5itcvtdzsccob
3  carol  rtf2tr6d4rvskvy6mavpjo
5   dave  midzqblkeynmz43s6mtd55
6   None  jmzyeev7rykqcvkzfstsuy 

             ip       anon_ip
0  192.168.1.10   45.54.22.29
1      10.0.0.4   70.4.146.73
3       8.8.8.8  131.15.92.55
5        bad-ip        bad-ip
6          None          None 

vectors_clear shape: (7, 1)
vectors_anon  shape: (7, 2)
ragged_records sample: [
  {
    "user": "alice",
    "ip": "192.168.1.10",
    "status": "OK",
    "payload_size": 100,
    "anon_user": "2py4pkutebquzeyxcpn3jl",
    "anon_ip": "45.54.22.29",
    "ip_int": 3232235786.0,
    "anon_ip_int": 758519325.0
  },
  {
    "user": "bob",
    "ip": "10.0.0.4",
    "status": "FAIL",
    "payload_size": 200,
    "anon_user": "ogmmf5dnd5itcvtdzsccob",
    "anon_ip": "70.4.146.73",
    "ip_int": 167772164.0,
    "anon_ip_int": 1174704713.0
  }
]


In [21]:
# NOTE(review): presumably the columns that must be anonymized; this list is
# not referenced by the other visible cells -- confirm it is still needed.
anonymized_columns = ["x509UserProxyEmail","User","JobsubClientIpAddress"]
DATA_DIR = "../data"
USER_COL = "User"
IP_COL   = "JobsubClientIpAddress"  # can be IPv4 string or 32-bit int in your data
FAILED_COL = "DAG_NodesFailed"  # boolean-ish

# Locate the parquet inputs. glob returns matches in arbitrary order, so sort
# them to make the files[:n_files] selection reproducible between runs.
fnames_in = f"{DATA_DIR}/fifebatch-history-*.parquet"
files = sorted(glob.glob(fnames_in))
print(f"Found {len(files)} files")
n_files = 2  # how many of the matched files to load

Found 6 files


**Read data using pandas native read_parquet()**

In [22]:
# Load the first n_files parquet files with fastparquet and stack them into a
# single frame with a fresh 0..N-1 index.
selected_files = files[:n_files]
frames = [pd.read_parquet(path, engine="fastparquet") for path in selected_files]
df_all = pd.concat(frames, ignore_index=True)
print(f"Read {len(selected_files)} and loaded demo:{len(df_all)}")

Read 2 and loaded demo:74500


**Read data using duckdb**

In [23]:
# Select only the columns needed below, reading every parquet file in
# DATA_DIR (note: a broader pattern than the fifebatch-history-* glob).
pattern = f"{DATA_DIR}/*.parquet"
selected_columns = ", ".join(
    [
        "User",
        "RequestMemory",
        "CumulativeSlotTime",
        "JobsubClientIpAddress",
        "MATCH_EXP_JOB_Site",
        "DAG_NodesFailed",
        "NumJobCompletions",
        "NumJobStarts",
    ]
)
query = f"SELECT {selected_columns} FROM '{pattern}'"
rel_obj = duckdb.sql(query)

In [24]:
# Materialize the DuckDB query result as `df`; later cells use it through
# .iloc and column attribute access (presumably a pandas DataFrame).
df = rel_obj.df()

In [25]:
# Number of rows loaded through DuckDB.
len(df)

281874

In [26]:
# Column roles for the real dataset.
USER_COL = "User"
IP_COL = "JobsubClientIpAddress"  # can be IPv4 string or 32-bit int in your data
FAILED_COL = "DAG_NodesFailed"  # boolean-ish

if __name__ == "__main__":
    # Keys derived from fixed labels so repeated runs give the same pseudonyms.
    user_key_hex = hashlib.sha256(b"user-fnal-key").hexdigest()
    ip_key_hex = hashlib.sha256(b"ip-fnal-key").hexdigest()

    # Only the first 10000 rows are processed.
    # NOTE(review): failure_values holds status strings while status_col is
    # DAG_NodesFailed -- confirm these values actually occur in that column.
    out = process_dataset(
        df=df.iloc[:10000],
        user_col=USER_COL,
        ip_col=IP_COL,
        status_col=FAILED_COL,
        failure_values=("FAIL", "TIMEOUT"),
        user_key_hex=user_key_hex,
        ip_key_hex=ip_key_hex,
        vector_cols_clear=("User",),
        vector_cols_anon=("User", "anon_ip_int"),
    )

    # quick peeks
    for mapping_name in ("user_mapping", "ip_mapping"):
        print(out[mapping_name].head(), "\n")

    for label, vec_name in (
        ("vectors_clear shape:", "vectors_clear"),
        ("vectors_anon  shape:", "vectors_anon"),
    ):
        vec = out[vec_name]
        print(label, vec.shape if vec is not None else None)

    print("ragged_records sample:", json.dumps(out["ragged_records"][:2], indent=2))

                  User               anon_user
0   uboonepro@fnal.gov  qmr6f7pxor74ewisnff3bb
4   icaruspro@fnal.gov  zm54y4nwfcvespmillgesm
6     gputnam@fnal.gov  mm5sjkr7sqoe5k4oyjcexs
28   amakovec@fnal.gov  qpms2iqoxvfvggtsis4yzz
43    novapro@fnal.gov  rys4gtfwdsoxmvxelfei7e 

   JobsubClientIpAddress         anon_ip
0        131.225.240.146  157.114.23.207
4         131.225.240.90    197.91.33.50
6        131.225.240.140    35.37.41.247
28       131.225.240.190  76.116.142.231
43         131.225.67.73   91.150.29.119 

vectors_clear shape: (10000, 1)
vectors_anon  shape: (10000, 2)
ragged_records sample: [
  {
    "User": "uboonepro@fnal.gov",
    "RequestMemory": 2000,
    "CumulativeSlotTime": 727,
    "JobsubClientIpAddress": "131.225.240.146",
    "MATCH_EXP_JOB_Site": "FermiGrid",
    "NumJobCompletions": "1",
    "NumJobStarts": 1,
    "anon_user": "qmr6f7pxor74ewisnff3bb",
    "anon_ip": "157.114.23.207",
    "ip_int": 2212622482.0,
    "anon_ip_int": 2641500111.0
  },
  

In [28]:
# Dataset-wide failure estimate: job starts that were not matched by a
# completion, as an absolute count and as a share of all starts.
total_starts = df["NumJobStarts"].sum()
total_completions = df["NumJobCompletions"].astype("int").sum()
n_job_failures = total_starts - total_completions
job_failure_frac = n_job_failures / total_starts
print(f"job failure fraction %: {job_failure_frac:.3%}, job failure abs number: {n_job_failures}")

job failure fraction %: 9.581%, job failure abs number: 24400


In [29]:
# Distinct users having at least one row with job starts but zero completions.
completions = df["NumJobCompletions"].astype("int")
started_never_completed = (completions == 0) & (df["NumJobStarts"] > 0)
df.loc[started_never_completed, "User"].unique().tolist()

['uboonepro@fnal.gov',
 'osg@fnal.gov',
 'novapro@fnal.gov',
 'mu2epro@fnal.gov',
 'normanm@fnal.gov',
 'dunegli@fnal.gov',
 'minervapro@fnal.gov',
 'vnagasl@fnal.gov',
 'cmsgli@fnal.gov',
 'gm2pro@fnal.gov',
 'hcc@fnal.gov',
 'omorenop@fnal.gov',
 'icaruspro@fnal.gov',
 'anezkak@fnal.gov',
 'imawby@fnal.gov',
 'carneiro@fnal.gov',
 'ichong@fnal.gov',
 'zdar@fnal.gov',
 'gputnam@fnal.gov',
 'laliaga@fnal.gov',
 'c7cnz47c4mtyh22tfbpn3f',
 '4qvket2bv3jfhsqcxavpcl',
 'co7gjxiwgktaid3nx24kgt',
 'sdgawgbkhw7ld4ig3ndebc',
 '3updl67txqg64cq6sd4ul4',
 'lnphkrdnyj5vljafrncy4f',
 '5vbh7xkbwphb3gyvpanxs6',
 'j4gj4l3bkzyd4plbalpmhd',
 '2szjjkxxg5tkoqpwnah2g4',
 'gakaqchwlk37xxojwxepxg']

In [None]:
import itables

In [None]:
# Display the first 1000 rows of df_all via itables.show.
itables.show(df_all.iloc[:1000])

In [None]:
# Column names of df_all that contain "job" (case-insensitive).
[name for name in df_all.columns if "job" in name.lower()]