In [1]:
# CÉLULA 2.1
import os
from pathlib import Path
import pandas as pd
import numpy as np
import matplotlib.pyplot as plt

print("CWD:", os.getcwd())  # deve ser .../blue-team-aml-portfolio/pandas
DATA = Path("../datasets")
assert DATA.exists(), f"Pasta {DATA} não encontrada."

paysim = pd.read_parquet(DATA / "paysim_sample.parquet")
pep    = pd.read_parquet(DATA / "opensanctions_pep_sample.parquet")  # pode não usar hoje

# normalizações idempotentes
if "ts" in paysim.columns:
    paysim["ts"] = pd.to_datetime(paysim["ts"], errors="coerce")
paysim = paysim.sort_values(["cpf","ts"]).reset_index(drop=True)

for col in ["cpf","device_id","ip","asn","city","channel","merchant_id","beneficiary_id"]:
    if col in paysim.columns:
        paysim[col] = paysim[col].astype(str).str.strip()

print("OK: dados carregados e normalizados")
paysim.head(3)


CWD: /Users/ricardoalmeida/Projetos/blue-team-aml-portfolio/pandas
OK: dados carregados e normalizados


Unnamed: 0,ts,cpf,device_id,ip,asn,city,amount,currency,channel,merchant_id,beneficiary_id,type
0,2025-09-01 10:00:00,111,devA,200.100.10.1,AS123,Campinas,120,BRL,app,m01,b01,payment
1,2025-09-01 10:02:00,111,devA,200.100.10.1,AS123,Campinas,180,BRL,app,m02,b02,payment
2,2025-09-01 10:03:00,111,devB,177.23.44.9,AS456,São Paulo,300,BRL,web,m03,b03,payment


In [2]:
# CÉLULA 3.1 — VELOCITY (10min)
assert {"cpf","ts","amount"}.issubset(paysim.columns), "Colunas necessárias ausentes."

# contagem de transações em janela móvel de 10 minutos por cpf
tx_count_10 = (
    paysim
    .groupby("cpf")
    .rolling("10min", on="ts")["amount"]
    .count()
    .reset_index(level=0, drop=True)
)
paysim["tx_count_10min"] = tx_count_10

# limiar inicial (ajustaremos depois): >= 3 transações em 10 min
paysim["flag_velocity_10m_ge3"] = (paysim["tx_count_10min"] >= 3).astype(int)

paysim[["ts","cpf","amount","tx_count_10min","flag_velocity_10m_ge3"]].head(10)


Unnamed: 0,ts,cpf,amount,tx_count_10min,flag_velocity_10m_ge3
0,2025-09-01 10:00:00,111,120,,0
1,2025-09-01 10:02:00,111,180,,0
2,2025-09-01 10:03:00,111,300,,0
3,2025-09-01 10:25:00,111,700,,0
4,2025-09-01 10:20:00,222,50,,0
5,2025-09-01 11:10:00,333,10,,0


In [3]:
# CÉLULA 4.1 — GEO-SHIFT
assert {"cpf","ts","city","asn"}.issubset(paysim.columns), "Colunas city/asn ausentes."

# valores anteriores por cpf
paysim["prev_city"] = paysim.groupby("cpf")["city"].shift(1)
paysim["prev_asn"]  = paysim.groupby("cpf")["asn"].shift(1)
paysim["prev_ts"]   = paysim.groupby("cpf")["ts"].shift(1)

# mudança de cidade/asn
paysim["city_changed"] = (paysim["city"] != paysim["prev_city"]).astype(int)
paysim["asn_changed"]  = (paysim["asn"]  != paysim["prev_asn"]).astype(int)

# delta de tempo em minutos desde a transação anterior do mesmo cpf
paysim["mins_since_prev"] = (paysim["ts"] - paysim["prev_ts"]).dt.total_seconds() / 60.0

# geo-shift "brusco": mudou cidade OU ASN e ocorreu em <= 60 minutos desde a anterior
paysim["flag_geoshift_60m"] = (
    (paysim["city_changed"].fillna(0).astype(int).eq(1) | paysim["asn_changed"].fillna(0).astype(int).eq(1))
    & (paysim["mins_since_prev"].fillna(1e9) <= 60)
).astype(int)

paysim[["ts","cpf","city","asn","prev_city","prev_asn","mins_since_prev","flag_geoshift_60m"]].head(10)


Unnamed: 0,ts,cpf,city,asn,prev_city,prev_asn,mins_since_prev,flag_geoshift_60m
0,2025-09-01 10:00:00,111,Campinas,AS123,,,,0
1,2025-09-01 10:02:00,111,Campinas,AS123,Campinas,AS123,2.0,0
2,2025-09-01 10:03:00,111,São Paulo,AS456,Campinas,AS123,1.0,1
3,2025-09-01 10:25:00,111,Rio,AS789,São Paulo,AS456,22.0,1
4,2025-09-01 10:20:00,222,Rio,AS789,,,,0
5,2025-09-01 11:10:00,333,São Paulo,AS15169,,,,0


In [4]:
# CÉLULA 5.1 — MULTI-DEVICE/IP
assert {"cpf","ts","device_id","ip"}.issubset(paysim.columns), "Colunas device_id/ip ausentes."

# contagem de devices/ips únicos por janela móvel (60min)
def rolling_nunique(series, window="60min", group_key=None):
    # pandas não tem nunique direto no rolling -> usamos truque com cumsum de mudanças
    # aqui vamos gerar um "marcador" para cada elemento (cpf + valor) e contar distintos via set no resample
    # como o dataset é pequeno para protótipo, usamos abordagem simples: aplicar função custom no rolling obj
    return (
        series
        .rolling(window, on="ts")
        .apply(lambda x: len(set(x)), raw=False)  # raw=False para usar objetos Python
    )

paysim_dev = (
    paysim[["cpf","ts","device_id"]]
    .groupby("cpf", group_keys=False)
    .apply(lambda g: rolling_nunique(g.set_index("ts")["device_id"].reset_index(), window="60min").reset_index(drop=True))
)
paysim_ip = (
    paysim[["cpf","ts","ip"]]
    .groupby("cpf", group_keys=False)
    .apply(lambda g: rolling_nunique(g.set_index("ts")["ip"].reset_index(), window="60min").reset_index(drop=True))
)

paysim["uniq_devices_60m"] = paysim_dev.values
paysim["uniq_ips_60m"]     = paysim_ip.values

# flags com limiares iniciais
paysim["flag_multidevice_60m_ge2"] = (paysim["uniq_devices_60m"] >= 2).astype(int)
paysim["flag_multiip_60m_ge2"]     = (paysim["uniq_ips_60m"]     >= 2).astype(int)

paysim[["ts","cpf","device_id","ip","uniq_devices_60m","uniq_ips_60m","flag_multidevice_60m_ge2","flag_multiip_60m_ge2"]].head(12)


DataError: Cannot aggregate non-numeric type: object

In [5]:
# CÉLULA 5.1 (FIX) — MULTI-DEVICE/IP (60 min) com códigos numéricos

import numpy as np

assert {"cpf","ts","device_id","ip"}.issubset(paysim.columns), "Colunas device_id/ip ausentes."
# Garantir ordenação e tipos (idempotente)
paysim["ts"] = pd.to_datetime(paysim["ts"], errors="coerce")
paysim = paysim.sort_values(["cpf","ts"]).reset_index(drop=True)

# 1) Converter strings para códigos numéricos estáveis
# factorize() retorna (codes, uniques); códigos são inteiros (NaN vira -1)
paysim["device_code"] = pd.factorize(paysim["device_id"], sort=True)[0].astype("int64")
paysim["ip_code"]     = pd.factorize(paysim["ip"], sort=True)[0].astype("int64")

# 2) Função helper: rolling nunique sobre CÓDIGOS numéricos (ignora -1)
def rolling_nunique_codes(df, code_col, window="60min"):
    # df precisa ter colunas: ['cpf','ts', code_col]
    return (
        df
        .groupby("cpf")
        .rolling(window, on="ts")[code_col]
        .apply(lambda a: np.unique(a[a >= 0]).size, raw=True)  # raw=True -> numpy array
        .reset_index(level=0, drop=True)
    )

# 3) Calcular distintos na janela de 60 min
paysim["uniq_devices_60m"] = rolling_nunique_codes(paysim[["cpf","ts","device_code"]].copy(), "device_code")
paysim["uniq_ips_60m"]     = rolling_nunique_codes(paysim[["cpf","ts","ip_code"]].copy(),     "ip_code")

# 4) Flags com limiares iniciais (ajustaremos em tuning)
paysim["flag_multidevice_60m_ge2"] = (paysim["uniq_devices_60m"] >= 2).astype(int)
paysim["flag_multiip_60m_ge2"]     = (paysim["uniq_ips_60m"]     >= 2).astype(int)

# 5) Visualização rápida
paysim[["ts","cpf","device_id","ip","uniq_devices_60m","uniq_ips_60m","flag_multidevice_60m_ge2","flag_multiip_60m_ge2"]].head(12)


Unnamed: 0,ts,cpf,device_id,ip,uniq_devices_60m,uniq_ips_60m,flag_multidevice_60m_ge2,flag_multiip_60m_ge2
0,2025-09-01 10:00:00,111,devA,200.100.10.1,,,0,0
1,2025-09-01 10:02:00,111,devA,200.100.10.1,,,0,0
2,2025-09-01 10:03:00,111,devB,177.23.44.9,,,0,0
3,2025-09-01 10:25:00,111,devC,18.5.6.7,,,0,0
4,2025-09-01 10:20:00,222,devC,18.5.6.7,,,0,0
5,2025-09-01 11:10:00,333,devD,8.8.8.8,,,0,0


In [7]:
# CÉLULA 6.1 — CONSOLIDAÇÃO
flag_cols = [
    "flag_velocity_10m_ge3",
    "flag_geoshift_60m",
    "flag_multidevice_60m_ge2",
    "flag_multiip_60m_ge2",
]
for c in flag_cols:
    assert c in paysim.columns, f"{c} não encontrado"

paysim["risk_score_v1"] = paysim[flag_cols].sum(axis=1)

paysim[["ts","cpf","amount"] + flag_cols + ["risk_score_v1"]].sort_values(["risk_score_v1","ts"], ascending=[False,True]).head(20)


Unnamed: 0,ts,cpf,amount,flag_velocity_10m_ge3,flag_geoshift_60m,flag_multidevice_60m_ge2,flag_multiip_60m_ge2,risk_score_v1
2,2025-09-01 10:03:00,111,300,0,1,0,0,1
3,2025-09-01 10:25:00,111,700,0,1,0,0,1
0,2025-09-01 10:00:00,111,120,0,0,0,0,0
1,2025-09-01 10:02:00,111,180,0,0,0,0,0
4,2025-09-01 10:20:00,222,50,0,0,0,0,0
5,2025-09-01 11:10:00,333,10,0,0,0,0,0


In [8]:
# CÉLULA 7.1 — MÉTRICAS SIMPLES
def hit_rate(s):
    return float(s.sum()) / max(len(s),1)

summary = pd.DataFrame({
    "rule": flag_cols + ["risk_score_v1_ge1"],
    "hit_rate": [hit_rate(paysim[c]) for c in flag_cols] + [hit_rate((paysim["risk_score_v1"]>=1).astype(int))]
}).sort_values("hit_rate", ascending=False)

summary


Unnamed: 0,rule,hit_rate
1,flag_geoshift_60m,0.333333
4,risk_score_v1_ge1,0.333333
0,flag_velocity_10m_ge3,0.0
2,flag_multidevice_60m_ge2,0.0
3,flag_multiip_60m_ge2,0.0


In [9]:
# CÉLULA 8.1 — SALVAR ARTEFATOS
OUT = Path("../reports/rules_v1")
OUT.mkdir(parents=True, exist_ok=True)

# Amostra de linhas "sinalizadas"
(paysim.loc[paysim["risk_score_v1"]>=1, ["ts","cpf","amount"] + flag_cols + ["risk_score_v1"]]
 .sort_values(["cpf","ts"])
 .head(50)
 .to_csv(OUT / "sample_flagged_rows.csv", index=False))

# Tabela de métricas
summary.to_csv(OUT / "rule_hit_rates.csv", index=False)

# (Opcional) Gráfico de barras das taxas
ax = summary.set_index("rule")["hit_rate"].plot(kind="bar", title="Rule hit rates (V1)")
fig = ax.get_figure()
fig.savefig(OUT / "rule_hit_rates.png", bbox_inches="tight")
plt.close(fig)

# Salvar dataset com features/flags para uso posterior
paysim.to_parquet(Path("../datasets") / "paysim_rules_v1.parquet", index=False)

print("OK: artefatos salvos em reports/rules_v1/ e datasets/paysim_rules_v1.parquet")


OK: artefatos salvos em reports/rules_v1/ e datasets/paysim_rules_v1.parquet


In [10]:
from IPython.display import Markdown as md
md("""
**Checklist Dia 3 – Regras V1**
- [x] Velocity (10min) com flag >=3
- [x] Geo-shift (mudança cidade/ASN em <=60min)
- [x] Multi-device e multi-IP (60min) com flags >=2
- [x] Risk score simples (soma de flags)
- [x] Métricas de hit rate por regra + gráfico
- [x] Artefatos salvos (CSV/PNG/Parquet)
""")



**Checklist Dia 3 – Regras V1**
- [x] Velocity (10min) com flag >=3
- [x] Geo-shift (mudança cidade/ASN em <=60min)
- [x] Multi-device e multi-IP (60min) com flags >=2
- [x] Risk score simples (soma de flags)
- [x] Métricas de hit rate por regra + gráfico
- [x] Artefatos salvos (CSV/PNG/Parquet)
