In [None]:
import os
import json
import numpy as np
import pandas as pd

In [None]:
# =========================
# 0) Paths (relative to the current working directory)
# =========================
BASE_DIR = os.getcwd()  # working directory that holds the input and output files

# Input: merged transaction table.
INPUT_CSV = os.path.join(BASE_DIR, "final_df.csv")

# Outputs: gzip-compressed cleaned base table and the JSON data-quality report.
OUTPUT_BASE, OUTPUT_REPORT = (
    os.path.join(BASE_DIR, file_name)
    for file_name in ("transactions_base.csv.gz", "transactions_base_report.json")
)

# Rows per read_csv chunk; lower to 200_000-300_000 if the kernel runs out of memory.
CHUNKSIZE = 500_000

# Columns every input chunk must contain (checked by Gate 0 in the chunk loop).
REQUIRED_COLS = [
    "id",
    "date",
    "client_id",
    "card_id",
    "amount",
    "use_chip",
    "merchant_id",
    "merchant_city",
    "merchant_state",
    "zip",
    "mcc",
    "errors",
    "fraud",
]

# Recognised use_chip values; anything else is tallied in the report.
ALLOWED_USE_CHIP = {"Swipe Transaction", "Chip Transaction", "Online Transaction"}

In [None]:
# =========================
# Helper functions
# =========================
def parse_amount_to_float(amount_series: pd.Series) -> pd.Series:
    """Turn currency strings such as "$14.57" or "$-77.00" into floats.

    Every "$" is removed and the remainder parsed with ``pd.to_numeric``.
    Anything that still does not parse (including missing values) becomes NaN.
    """
    return (
        amount_series
        .astype(str)
        .str.replace("$", "", regex=False)
        .pipe(pd.to_numeric, errors="coerce")
    )

def zip_to_string(zip_series: pd.Series, online_mask: pd.Series, pad_width: int = 5) -> pd.Series:
    """Normalise the ``zip`` column to strings.

    The raw column may mix floats (e.g. 58523.0), numeric strings and missing values.

    - online rows (``online_mask`` True): "ONLINE"
    - offline rows with a numeric zip: integer digits as text, left-padded with
      zeros to ``pad_width`` (58523.0 -> "58523", 2108.0 -> "02108").
      Storing ZIPs as numbers drops their leading zeros; padding restores them.
      Pass ``pad_width=0`` to disable padding.
    - everything else: pd.NA

    Args:
        zip_series: raw zip values.
        online_mask: boolean Series aligned with ``zip_series``.
        pad_width: minimum digit count for offline zips (default 5, US ZIP length).

    Returns:
        An object-dtype Series with the same index as ``zip_series``.
    """
    z = pd.to_numeric(zip_series, errors="coerce")  # keep only values that parse as numbers
    out = pd.Series(pd.NA, index=zip_series.index, dtype="object")
    offline_mask = ~online_mask

    # Offline and numeric: float -> int (drops ".0") -> str -> zero-pad.
    m = offline_mask & z.notna()
    digits = z.loc[m].astype(np.int64).astype(str)
    if pad_width:
        digits = digits.str.zfill(pad_width)
    out.loc[m] = digits

    # Online rows have no physical location; mark them explicitly.
    out.loc[online_mask] = "ONLINE"
    return out

def error_group_from_raw(errors_series: pd.Series) -> pd.Series:
    """Map raw ``errors`` text to a small, explainable set of error groups.

    Groups: NO_ERROR, AUTH_CREDENTIAL, FUNDS, SYSTEM, and OTHER (the default).
    The rules are simple and can later be extended with team-specific ones.

    Missing values, and the literal text "nan", count as NO_ERROR. Pattern rules
    are case-insensitive regex searches applied in order. A later match overwrites
    an earlier one, so a value holding several errors
    (e.g. "Bad PIN,Technical Glitch") resolves with the precedence
    SYSTEM > FUNDS > AUTH_CREDENTIAL.

    Returns:
        An object-dtype Series aligned with ``errors_series``.
    """
    text = errors_series.fillna("NO_ERROR").astype(str)
    groups = pd.Series(np.full(len(text), "OTHER", dtype=object), index=text.index)

    groups.loc[text.eq("NO_ERROR") | text.eq("nan")] = "NO_ERROR"

    # Order matters: later rules win on overlap (see precedence above).
    pattern_rules = (
        ("AUTH_CREDENTIAL", r"CVV|PIN|Expiration|Bad Card|Card Number"),
        ("FUNDS", r"Insufficient|Balance"),
        ("SYSTEM", r"Technical|Timeout|Network|System"),
    )
    for label, pattern in pattern_rules:
        hit = text.str.contains(pattern, case=False, regex=True, na=False)
        groups.loc[hit] = label

    return groups


In [None]:
# =========================
# 1) 사전 체크
# =========================
if not os.path.exists(INPUT_CSV):
    raise FileNotFoundError(f"final_df.csv를 찾을 수 없습니다. 현재 폴더: {BASE_DIR}")

# Remove outputs from a previous run. The report is removed as well: otherwise a run
# that fails part-way would leave an old report beside a partially written base file.
# A later cell uses rows_total from that report as its row-count gate.
for stale_path in (OUTPUT_BASE, OUTPUT_REPORT):
    if os.path.exists(stale_path):
        os.remove(stale_path)

# Running data-quality counters, filled by the chunk loop and saved as JSON at the end.
report = {
    "base_dir": BASE_DIR,
    "input_csv": INPUT_CSV,
    "rows_total": 0,
    "rows_online": 0,
    "amount_parse_fail": 0,
    "unknown_use_chip_values": {},
    "offline_state_missing": 0,
    "offline_zip_missing": 0,
    "gate2_failed_chunks": 0,
    "gate_notes": []
}

# The chunk loop truncates the output and writes the header only while this is True.
first_write = True


In [None]:
# =========================
# 2) 청크 처리
# =========================
# Uses names from the config and pre-check cells: INPUT_CSV, CHUNKSIZE, REQUIRED_COLS,
# ALLOWED_USE_CHIP, OUTPUT_BASE, report, first_write.
if not first_write:
    # Output is appended chunk by chunk. Re-running this cell without the pre-check
    # cell would append a second copy of the data and double every report counter.
    raise RuntimeError("Output already written in this session; re-run the pre-check cell (1) first.")

# Fixed output column order for every chunk.
keep_cols = [
    "id", "date", "client_id", "card_id",
    "amount_num", "use_chip", "is_online",
    "merchant_id", "merchant_city", "merchant_state", "zip",
    "mcc", "has_error", "error_raw", "error_group",
    "fraud"
]

# New counter: dates that fail to parse are coerced to NaT silently otherwise.
report.setdefault("date_parse_fail", 0)

# The context manager closes the input file even if a gate raises mid-way.
with pd.read_csv(INPUT_CSV, chunksize=CHUNKSIZE, low_memory=False) as reader:
    for chunk_idx, chunk in enumerate(reader, start=1):
        # ---------- Gate 0: schema ----------
        if "Unnamed: 0" in chunk.columns:
            chunk = chunk.drop(columns=["Unnamed: 0"])

        missing = [c for c in REQUIRED_COLS if c not in chunk.columns]
        if missing:
            raise ValueError(f"[Gate0 Fail] Missing required columns: {missing}")

        # ---------- Stage 1: types / formats ----------
        chunk["date"] = pd.to_datetime(chunk["date"], errors="coerce")
        report["date_parse_fail"] += int(chunk["date"].isna().sum())
        chunk["amount_num"] = parse_amount_to_float(chunk["amount"])
        report["amount_parse_fail"] += int(chunk["amount_num"].isna().sum())

        # Tally unexpected use_chip values by number of ROWS. The previous code added
        # 1 per chunk in which a value appeared, which under-counted.
        use_chip_counts = chunk["use_chip"].value_counts()
        unknown_counts = use_chip_counts[~use_chip_counts.index.isin(list(ALLOWED_USE_CHIP))]
        for value, cnt in unknown_counts.items():
            key = str(value)
            report["unknown_use_chip_values"][key] = report["unknown_use_chip_values"].get(key, 0) + int(cnt)

        # ---------- Stage 2: is_online + structural missingness of online rows ----------
        # Derive is_online from use_chip only, not from merchant_state.
        chunk["is_online"] = (chunk["use_chip"] == "Online Transaction").astype("int8")

        online_mask = chunk["is_online"].eq(1)
        report["rows_online"] += int(online_mask.sum())

        # Online rows have no physical merchant state; fill with a sentinel.
        chunk.loc[online_mask, "merchant_state"] = "ONLINE"

        # zip -> string ("ONLINE" for online rows)
        chunk["zip"] = zip_to_string(chunk["zip"], online_mask)

        # Missing state/zip on offline rows are potentially genuine gaps; count them.
        offline_mask = ~online_mask
        report["offline_state_missing"] += int(chunk.loc[offline_mask, "merchant_state"].isna().sum())
        report["offline_zip_missing"] += int(chunk.loc[offline_mask, "zip"].isna().sum())

        # ---------- Gate 2: per-chunk sanity check ----------
        # is_online is derived from the same condition, so this only trips if the
        # derivation above is edited in a way that breaks it.
        if (chunk["use_chip"].eq("Online Transaction").any()) and (chunk["is_online"].sum() == 0):
            report["gate2_failed_chunks"] += 1
            raise ValueError("[Gate2 Fail] Online Transaction exists but is_online is all zeros. Check logic immediately.")

        # ---------- Stage 3: normalise errors ----------
        chunk["has_error"] = chunk["errors"].notna().astype("int8")
        chunk["error_raw"] = chunk["errors"].fillna("NO_ERROR").astype(str)
        chunk["error_group"] = error_group_from_raw(chunk["errors"])

        # ---------- Stage 4: fixed output columns ----------
        out = chunk[keep_cols].copy()

        # Integer downcast, only for columns without missing values in this chunk.
        for col in ["id", "client_id", "card_id", "merchant_id", "mcc", "fraud"]:
            if out[col].notna().all():
                out[col] = pd.to_numeric(out[col], downcast="integer")

        report["rows_total"] += len(out)

        # ---------- Write ----------
        # First chunk truncates the file and writes the header; later chunks append
        # (appending yields a multi-member gzip stream, which pandas reads back fine).
        out.to_csv(
            OUTPUT_BASE,
            mode="wt" if first_write else "at",
            header=first_write,
            index=False,
            compression="gzip"
        )
        first_write = False

        print(f"[chunk {chunk_idx}] wrote {len(out):,} rows | total {report['rows_total']:,}")

[chunk 1] wrote 500,000 rows | total 500,000
[chunk 2] wrote 500,000 rows | total 1,000,000
[chunk 3] wrote 500,000 rows | total 1,500,000
[chunk 4] wrote 500,000 rows | total 2,000,000
[chunk 5] wrote 500,000 rows | total 2,500,000
[chunk 6] wrote 500,000 rows | total 3,000,000
[chunk 7] wrote 500,000 rows | total 3,500,000
[chunk 8] wrote 500,000 rows | total 4,000,000
[chunk 9] wrote 500,000 rows | total 4,500,000
[chunk 10] wrote 500,000 rows | total 5,000,000
[chunk 11] wrote 500,000 rows | total 5,500,000
[chunk 12] wrote 500,000 rows | total 6,000,000
[chunk 13] wrote 500,000 rows | total 6,500,000
[chunk 14] wrote 500,000 rows | total 7,000,000
[chunk 15] wrote 500,000 rows | total 7,500,000
[chunk 16] wrote 500,000 rows | total 8,000,000
[chunk 17] wrote 500,000 rows | total 8,500,000
[chunk 18] wrote 414,963 rows | total 8,914,963


In [None]:
# =========================
# 3) 리포트 저장
# =========================
# Persist the data-quality counters collected by the chunk loop (UTF-8, non-ASCII kept).
with open(OUTPUT_REPORT, mode="w", encoding="utf-8") as report_fh:
    report_fh.write(json.dumps(report, ensure_ascii=False, indent=2))

print("\nDONE")
print("Output:", OUTPUT_BASE)
print("Report:", OUTPUT_REPORT)


DONE
Output: /content/transactions_base.csv.gz
Report: /content/transactions_base_report.json


In [None]:
import pandas as pd

# Decompress the cleaned gzip output into a plain CSV, chunk by chunk, so the whole
# table never has to fit in memory.
in_path = "transactions_base.csv.gz"
out_path = "transactions_base.csv"

# The context manager closes the input handle even if a chunk fails to parse or write.
with pd.read_csv(in_path, chunksize=500_000) as reader:
    first = True
    for chunk in reader:
        # The first chunk truncates the target and writes the header; later chunks append.
        chunk.to_csv(out_path, mode="w" if first else "a", header=first, index=False)
        first = False

print("DONE:", out_path)


DONE: transactions_base.csv


In [9]:
import os, glob, pandas as pd, json

# 1) Delete parts from a previous run so stale files cannot inflate the gate count.
for p in glob.glob("transactions_base_part_*.parquet"):
    os.remove(p)
print("deleted old parts")

# 2) Write each CSV chunk as its own parquet part.
#    Note: dtypes are inferred per chunk, so a column's type can differ between parts.
CSV_PATH = "transactions_base.csv"
CHUNKSIZE = 500_000

with pd.read_csv(CSV_PATH, chunksize=CHUNKSIZE, low_memory=False) as reader:
    for i, chunk in enumerate(reader, start=1):
        out = f"transactions_base_part_{i:02d}.parquet"
        chunk.to_parquet(out, index=False)
        print("wrote", out, "rows:", len(chunk))

# 3) Gate: the total rows over all parts must equal rows_total from the cleaning report.
#    The `with` block closes the report file (json.load(open(...)) leaked the handle).
with open("transactions_base_report.json", "r", encoding="utf-8") as f:
    rep = json.load(f)
target = rep["rows_total"]

parts = sorted(glob.glob("transactions_base_part_*.parquet"))
# Reading a single column is enough to count rows per part.
total = sum(len(pd.read_parquet(part, columns=["fraud"])) for part in parts)

print("target:", target, "parts:", len(parts), "total:", total)
assert total == target, "Gate Fail: parquet parts rows != rows_total"
print("Gate PASS")


deleted old parts
wrote transactions_base_part_01.parquet rows: 500000
wrote transactions_base_part_02.parquet rows: 500000
wrote transactions_base_part_03.parquet rows: 500000
wrote transactions_base_part_04.parquet rows: 500000
wrote transactions_base_part_05.parquet rows: 500000
wrote transactions_base_part_06.parquet rows: 500000
wrote transactions_base_part_07.parquet rows: 500000
wrote transactions_base_part_08.parquet rows: 500000
wrote transactions_base_part_09.parquet rows: 500000
wrote transactions_base_part_10.parquet rows: 500000
wrote transactions_base_part_11.parquet rows: 500000
wrote transactions_base_part_12.parquet rows: 500000
wrote transactions_base_part_13.parquet rows: 500000
wrote transactions_base_part_14.parquet rows: 500000
wrote transactions_base_part_15.parquet rows: 500000
wrote transactions_base_part_16.parquet rows: 500000
wrote transactions_base_part_17.parquet rows: 500000
wrote transactions_base_part_18.parquet rows: 414963
target: 8914963 parts: 18 to

In [11]:
import glob
import pandas as pd
import numpy as np

# Every parquet part produced by the conversion cell (18 expected for this dataset).
parts = sorted(glob.glob("transactions_base_part_*.parquet"))
assert len(parts) == 18

# Stream the parts one at a time, reading only the fraud column. np.nansum tolerates
# a float-typed fraud column that contains missing values.
total_n = 0
total_f = 0
for part_path in parts:
    fraud_col = pd.read_parquet(part_path, columns=["fraud"])["fraud"]
    total_n += len(fraud_col)
    total_f += int(np.nansum(fraud_col.values))

base_rate = total_f / total_n
print("BASE fraud rate:", base_rate, "| N:", total_n, "| fraud:", total_f)


BASE fraud rate: 0.0014954633014180765 | N: 8914963 | fraud: 13332


In [12]:
from collections import defaultdict

def group_agg_over_parts(parts, group_cols, cols_needed, read_fn=None):
    """Count transactions and frauds per group across several parquet part files.

    Each part is aggregated on its own with ``groupby(..., dropna=False)``. The partial
    results are then concatenated and re-grouped. Re-grouping with pandas, rather than
    summing into a dict, makes NaN group keys from different parts merge into one
    group (NaN != NaN, so dict keys would keep them apart).

    Args:
        parts: iterable of part paths (anything ``read_fn`` accepts).
        group_cols: columns to group by.
        cols_needed: columns to load from each part; must include group_cols and "fraud".
        read_fn: ``read_fn(part, columns=[...]) -> DataFrame``; defaults to
            ``pd.read_parquet``.

    Returns:
        DataFrame with columns [*group_cols, "n", "fraud_cnt", "fraud_rate"].
    """
    if read_fn is None:
        read_fn = pd.read_parquet

    partials = []
    for p in parts:
        df = read_fn(p, columns=cols_needed)

        # Normalise 0/1 flags: parts may store them as float with missing values.
        if "fraud" in df.columns:
            df["fraud"] = df["fraud"].fillna(0).astype("int8")
        for c in ["is_online", "has_error"]:
            if c in df.columns:
                df[c] = df[c].fillna(0).astype("int8")

        partials.append(
            df.groupby(group_cols, dropna=False)["fraud"].agg(["count", "sum"]).reset_index()
        )

    if not partials:
        return pd.DataFrame(columns=[*group_cols, "n", "fraud_cnt", "fraud_rate"])

    out = (
        pd.concat(partials, ignore_index=True)
        .groupby(group_cols, dropna=False)[["count", "sum"]]
        .sum()
        .reset_index()
        .rename(columns={"count": "n", "sum": "fraud_cnt"})
        .astype({"n": "int64", "fraud_cnt": "int64"})
    )
    out["fraud_rate"] = out["fraud_cnt"] / out["n"]
    return out


In [13]:
def add_lift(df, base):
    """Return a copy of ``df`` with ``lift = fraud_rate / base``; ``df`` itself is untouched."""
    return df.assign(lift=df["fraud_rate"] / base)

def _lift_table(col):
    """Fraud count / rate / lift per value of ``col``, sorted by fraud_rate (highest first)."""
    table = group_agg_over_parts(parts, [col], [col, "fraud"])
    return add_lift(table, base_rate).sort_values("fraud_rate", ascending=False)


# use_chip
t_use_chip = _lift_table("use_chip")
display(t_use_chip)

# is_online
t_online = _lift_table("is_online")
display(t_online)

# has_error
t_has_error = _lift_table("has_error")
display(t_has_error)

# error_group: only groups with at least 5,000 transactions are shown.
t_err = _lift_table("error_group")
display(t_err.loc[t_err["n"] >= 5000].head(30))


Unnamed: 0,use_chip,n,fraud_cnt,fraud_rate,lift
0,Online Transaction,1043975,8779,0.008409,5.623144
2,Chip Transaction,3202776,3176,0.000992,0.663099
1,Swipe Transaction,4668212,1377,0.000295,0.197246


Unnamed: 0,is_online,n,fraud_cnt,fraud_rate,lift
1,1,1043975,8779,0.008409,5.623144
0,0,7870988,4553,0.000578,0.386805


Unnamed: 0,has_error,n,fraud_cnt,fraud_rate,lift
1,1,141767,569,0.004014,2.683869
0,0,8773196,12763,0.001455,0.97279


Unnamed: 0,error_group,n,fraud_cnt,fraud_rate,lift
0,AUTH_CREDENTIAL,34975,364,0.010407,6.959338
1,FUNDS,88012,173,0.001966,1.314403
4,SYSTEM,18013,32,0.001776,1.187923
2,NO_ERROR,8773196,12763,0.001455,0.97279


In [15]:
import pandas as pd

# Card and user master tables. "Unnamed: 0" (a saved pandas index) is dropped when present.
cards = pd.read_csv("card_common.csv", low_memory=False).drop(columns=["Unnamed: 0"], errors="ignore")
users = pd.read_csv("user_common.csv", low_memory=False).drop(columns=["Unnamed: 0"], errors="ignore")

# card_id -> card_type lookup used to enrich the transaction parts.
card_type_map = cards.set_index("card_id")["card_type"].to_dict()

def score_bucket(x):
    """Bucket a credit score into a label.

    Buckets: "<580", "580-669", "670-739", "740-799" and "800+".
    Missing scores map to "NA".
    """
    if pd.isna(x):
        return "NA"
    # (exclusive upper bound, label), checked from the lowest band upwards.
    for upper, label in ((580, "<580"), (670, "580-669"), (740, "670-739"), (800, "740-799")):
        if x < upper:
            return label
    return "800+"

# client_id -> credit-score bucket label produced by score_bucket (a missing credit_score gives "NA").
# If a client_id appears more than once in user_common.csv, to_dict keeps the last row's bucket.
score_bucket_map = users.set_index("client_id")["credit_score"].map(score_bucket).to_dict()

def group_agg_over_parts_with_maps(parts, group_cols, cols_needed,
                                   card_types=None, score_buckets=None,
                                   base=None, read_fn=None):
    """Like ``group_agg_over_parts`` but can also group by looked-up attributes.

    Derived columns:
    - ``card_type`` from ``card_id`` via ``card_types`` (unmatched -> "NA")
    - ``score_bucket`` from ``client_id`` via ``score_buckets`` (unmatched -> "NA")

    Partial per-part aggregates are concatenated and re-grouped, so NaN group keys
    from different parts merge correctly.

    Args:
        parts: iterable of part paths (anything ``read_fn`` accepts).
        group_cols: columns to group by (may include "card_type" / "score_bucket").
        cols_needed: columns to load; must include "fraud" and the source id columns.
        card_types: card_id -> card_type mapping; defaults to the notebook's ``card_type_map``.
        score_buckets: client_id -> bucket mapping; defaults to ``score_bucket_map``.
        base: baseline fraud rate for ``lift``; defaults to the notebook's ``base_rate``.
        read_fn: ``read_fn(part, columns=[...]) -> DataFrame``; defaults to ``pd.read_parquet``.

    Returns:
        DataFrame with columns [*group_cols, "n", "fraud_cnt", "fraud_rate", "lift"].
    """
    # Defaults resolve to the notebook-level objects at call time.
    if card_types is None:
        card_types = card_type_map
    if score_buckets is None:
        score_buckets = score_bucket_map
    if base is None:
        base = base_rate
    if read_fn is None:
        read_fn = pd.read_parquet

    partials = []
    for p in parts:
        df = read_fn(p, columns=cols_needed)

        df["fraud"] = df["fraud"].fillna(0).astype("int8")
        if "is_online" in df.columns:
            df["is_online"] = df["is_online"].fillna(0).astype("int8")

        if "card_id" in df.columns:
            df["card_type"] = df["card_id"].map(card_types).fillna("NA")
        if "client_id" in df.columns:
            df["score_bucket"] = df["client_id"].map(score_buckets).fillna("NA")

        partials.append(
            df.groupby(group_cols, dropna=False)["fraud"].agg(["count", "sum"]).reset_index()
        )

    if not partials:
        return pd.DataFrame(columns=[*group_cols, "n", "fraud_cnt", "fraud_rate", "lift"])

    out = (
        pd.concat(partials, ignore_index=True)
        .groupby(group_cols, dropna=False)[["count", "sum"]]
        .sum()
        .reset_index()
        .rename(columns={"count": "n", "sum": "fraud_cnt"})
        .astype({"n": "int64", "fraud_cnt": "int64"})
    )
    out["fraud_rate"] = out["fraud_cnt"] / out["n"]
    out["lift"] = out["fraud_rate"] / base
    return out

# Cross 1) is_online x card_type. card_type is looked up per card_id inside the helper,
# which also adds the lift column.
t_cross_ct = group_agg_over_parts_with_maps(
    parts,
    ["is_online", "card_type"],
    ["is_online", "card_id", "fraud"],
)
t_cross_ct = t_cross_ct.sort_values("fraud_rate", ascending=False)
display(t_cross_ct.query("n >= 30000").head(30))

# Cross 2) is_online x error_group. Plain aggregation; lift is added here.
t_cross_err = (
    group_agg_over_parts(
        parts,
        ["is_online", "error_group"],
        ["is_online", "error_group", "fraud"],
    )
    .sort_values("fraud_rate", ascending=False)
    .assign(lift=lambda tbl: tbl["fraud_rate"] / base_rate)
)
display(t_cross_err.query("n >= 5000").head(30))


Unnamed: 0,is_online,card_type,n,fraud_cnt,fraud_rate,lift
5,1,Debit (Prepaid),79568,992,0.012467,8.336763
3,1,Credit,313472,2801,0.008935,5.975009
4,1,Debit,649004,4986,0.007683,5.137232
2,0,Debit (Prepaid),532562,386,0.000725,0.484665
0,0,Credit,2437521,1689,0.000693,0.463346
1,0,Debit,4889978,2453,0.000502,0.33544


Unnamed: 0,is_online,error_group,n,fraud_cnt,fraud_rate,lift
5,1,AUTH_CREDENTIAL,13483,252,0.01869,12.497935
6,1,FUNDS,8290,120,0.014475,9.679456
7,1,NO_ERROR,1020075,8388,0.008223,5.49858
0,0,AUTH_CREDENTIAL,21492,112,0.005211,3.4847
4,0,SYSTEM,15886,13,0.000818,0.547209
1,0,FUNDS,79722,53,0.000665,0.444551
2,0,NO_ERROR,7753121,4375,0.000564,0.377334


# **구현 방식: DuckDB로 전수 Parquet를 바로 집계(정확+빠름)**

transactions_base_part_*.parquet가 18개로 잘 쪼개져 있으니, DuckDB가 가장 깔끔합니다.
(pandas로 distinct count까지 정확하게 하려면 메모리/시간이 크게 늘어남)

In [16]:
!pip -q install duckdb

import duckdb
con = duckdb.connect()

In [17]:
# Input: every parquet part.
parquet_glob = "transactions_base_part_*.parquet"

# One row per (card_id, day); rows with a NULL amount_num are excluded.
# - `date` may be stored as text in the parts, hence CAST(date AS TIMESTAMP).
# - merchant_id / mcc may be stored as floats, hence CAST to BIGINT before COUNT(DISTINCT).
# - Event counts use COUNT(*) FILTER (...), which returns BIGINT. The previous
#   SUM(CASE ... THEN 1 ELSE 0 END) returns HUGEINT, which showed up as floats
#   (1.0 / 0.0) in the pandas preview.
con.execute(f"""
CREATE OR REPLACE TABLE card_daily_log AS
SELECT
  card_id,
  DATE_TRUNC('day', CAST(date AS TIMESTAMP)) AS day,
  COUNT(*) AS tx_cnt,
  SUM(amount_num) AS amt_sum,
  MAX(amount_num) AS amt_max,

  -- 온라인
  COUNT(*) FILTER (WHERE CAST(is_online AS INTEGER)=1) AS online_cnt,
  AVG(CASE WHEN CAST(is_online AS INTEGER)=1 THEN 1.0 ELSE 0.0 END) AS online_ratio,

  -- 오류
  COUNT(*) FILTER (WHERE CAST(has_error AS INTEGER)=1) AS error_cnt,
  COUNT(*) FILTER (WHERE error_group='AUTH_CREDENTIAL') AS auth_cred_err_cnt,

  -- 다양성(설명 가능 신호로 유용): distinct는 DuckDB가 정확하고 빠르게 처리
  COUNT(DISTINCT CAST(merchant_id AS BIGINT)) AS uniq_merchant_cnt,
  COUNT(DISTINCT CAST(mcc AS BIGINT)) AS uniq_mcc_cnt

FROM read_parquet('{parquet_glob}')
WHERE amount_num IS NOT NULL
GROUP BY 1, 2
""")

# Save
con.execute("COPY card_daily_log TO 'card_daily_log.parquet' (FORMAT PARQUET)")
print("saved: card_daily_log.parquet")

# Quick check: row / card counts and the most recent rows.
print(con.execute("SELECT COUNT(*) AS rows, COUNT(DISTINCT card_id) AS cards FROM card_daily_log").fetchdf())
print(con.execute("SELECT * FROM card_daily_log ORDER BY day DESC LIMIT 5").fetchdf())

FloatProgress(value=0.0, layout=Layout(width='auto'), style=ProgressStyle(bar_color='black'))

saved: card_daily_log.parquet
      rows  cards
0  5390353   4070
   card_id        day  tx_cnt  amt_sum  amt_max  online_cnt  online_ratio  \
0     4605 2019-10-31       4    46.85    68.00         1.0      0.250000   
1       14 2019-10-31       6   293.27    99.00         0.0      0.000000   
2     5857 2019-10-31       2    71.83    71.47         0.0      0.000000   
3     5650 2019-10-31       1     2.82     2.82         0.0      0.000000   
4     4041 2019-10-31       3    70.92    36.96         1.0      0.333333   

   error_cnt  auth_cred_err_cnt  uniq_merchant_cnt  uniq_mcc_cnt  
0        0.0                0.0                  2             2  
1        0.0                0.0                  5             5  
2        0.0                0.0                  2             2  
3        0.0                0.0                  1             1  
4        1.0                0.0                  3             3  


In [18]:
# Rolling per-card baselines over card_daily_log (one row per card per active day).
# NOTE(review): the frame is ROWS BETWEEN 30 PRECEDING AND 1 PRECEDING. That is the card's
# previous 30 *rows* (days with at least one transaction), excluding the current row,
# not 30 calendar days, even though the "_30d" suffix and the in-SQL comment say "30 days".
# Use a RANGE frame on `day` if calendar-day semantics are intended.
# A card's first row has an empty frame, so its mean/std are NULL; STDDEV_SAMP also
# needs at least 2 rows in the frame.
# Ratio columns are NULL unless the baseline mean is > 0; z-scores are NULL unless the
# baseline std is > 0.
con.execute("""
CREATE OR REPLACE TABLE card_daily_features AS
WITH base AS (
  SELECT
    *,
    -- 과거 30일(현재일 제외) 이동 평균/표준편차
    AVG(tx_cnt) OVER w AS tx_cnt_mean_30d,
    STDDEV_SAMP(tx_cnt) OVER w AS tx_cnt_std_30d,

    AVG(amt_sum) OVER w AS amt_sum_mean_30d,
    STDDEV_SAMP(amt_sum) OVER w AS amt_sum_std_30d,

    AVG(online_ratio) OVER w AS online_ratio_mean_30d,
    STDDEV_SAMP(online_ratio) OVER w AS online_ratio_std_30d,

    AVG(error_cnt) OVER w AS error_cnt_mean_30d,
    STDDEV_SAMP(error_cnt) OVER w AS error_cnt_std_30d

  FROM card_daily_log
  WINDOW w AS (
    PARTITION BY card_id
    ORDER BY day
    ROWS BETWEEN 30 PRECEDING AND 1 PRECEDING
  )
)
SELECT
  *,
  -- ratio(현재/평균): 평균이 0이면 NULL 처리
  CASE WHEN tx_cnt_mean_30d > 0 THEN tx_cnt * 1.0 / tx_cnt_mean_30d END AS tx_cnt_ratio_30d,
  CASE WHEN amt_sum_mean_30d > 0 THEN amt_sum * 1.0 / amt_sum_mean_30d END AS amt_sum_ratio_30d,
  CASE WHEN online_ratio_mean_30d > 0 THEN online_ratio * 1.0 / online_ratio_mean_30d END AS online_ratio_ratio_30d,
  CASE WHEN error_cnt_mean_30d > 0 THEN error_cnt * 1.0 / error_cnt_mean_30d END AS error_cnt_ratio_30d,

  -- z-score(현재-평균)/표준편차: 표준편차가 0이면 NULL 처리
  CASE WHEN tx_cnt_std_30d > 0 THEN (tx_cnt - tx_cnt_mean_30d) / tx_cnt_std_30d END AS tx_cnt_z_30d,
  CASE WHEN amt_sum_std_30d > 0 THEN (amt_sum - amt_sum_mean_30d) / amt_sum_std_30d END AS amt_sum_z_30d,
  CASE WHEN online_ratio_std_30d > 0 THEN (online_ratio - online_ratio_mean_30d) / online_ratio_std_30d END AS online_ratio_z_30d,
  CASE WHEN error_cnt_std_30d > 0 THEN (error_cnt - error_cnt_mean_30d) / error_cnt_std_30d END AS error_cnt_z_30d

FROM base
""")

# Persist the feature table as parquet.
con.execute("COPY card_daily_features TO 'card_daily_features.parquet' (FORMAT PARQUET)")
print("saved: card_daily_features.parquet")

# Check: most recent rows.
print(con.execute("SELECT * FROM card_daily_features ORDER BY day DESC LIMIT 5").fetchdf())

FloatProgress(value=0.0, layout=Layout(width='auto'), style=ProgressStyle(bar_color='black'))

FloatProgress(value=0.0, layout=Layout(width='auto'), style=ProgressStyle(bar_color='black'))

saved: card_daily_features.parquet
   card_id        day  tx_cnt  amt_sum  amt_max  online_cnt  online_ratio  \
0      304 2019-10-31       2   160.00    80.00         0.0           0.0   
1      339 2019-10-31       1    28.31    28.31         0.0           0.0   
2     1302 2019-10-31       1    50.65    50.65         0.0           0.0   
3     2019 2019-10-31       2    38.74    37.81         0.0           0.0   
4     2168 2019-10-31       1   220.18   220.18         0.0           0.0   

   error_cnt  auth_cred_err_cnt  uniq_merchant_cnt  ...  error_cnt_mean_30d  \
0        0.0                0.0                  1  ...            0.000000   
1        0.0                0.0                  1  ...            0.033333   
2        0.0                0.0                  1  ...            0.033333   
3        0.0                0.0                  2  ...            0.033333   
4        0.0                0.0                  1  ...            0.000000   

   error_cnt_std_30d  tx_cn

In [20]:
import duckdb, pandas as pd
# Fresh in-memory connection: tables from the previous `con` are gone, so the features
# are re-read from card_daily_features.parquet below.
con = duckdb.connect()

# 1) Card-day fraud label: fraud_day = MAX(fraud) over that card's transactions on that
#    day, i.e. 1 if any of them is fraud (NULL fraud values are ignored by MAX).
con.execute("""
CREATE OR REPLACE TABLE card_day_fraud AS
SELECT
  card_id,
  DATE_TRUNC('day', CAST(date AS TIMESTAMP)) AS day,
  MAX(CAST(fraud AS INTEGER)) AS fraud_day
FROM read_parquet('transactions_base_part_*.parquet')
GROUP BY 1,2
""")

# 2) Attach the label to the feature rows; card-days without a label row get fraud_day = 0.
con.execute("""
CREATE OR REPLACE TABLE card_daily_final AS
SELECT f.*, COALESCE(d.fraud_day, 0) AS fraud_day
FROM read_parquet('card_daily_features.parquet') f
LEFT JOIN card_day_fraud d
USING(card_id, day)
""")

# 3) Quick screen: does the fraud-day rate rise with the online-ratio z-score bucket?
#    'NA' = no z-score for that card-day (NULL online_ratio_z_30d).
q = con.execute("""
SELECT
  CASE
    WHEN online_ratio_z_30d IS NULL THEN 'NA'
    WHEN online_ratio_z_30d >= 3 THEN 'z>=3'
    WHEN online_ratio_z_30d >= 2 THEN '2<=z<3'
    WHEN online_ratio_z_30d >= 1 THEN '1<=z<2'
    ELSE 'z<1'
  END AS bucket,
  COUNT(*) AS n_days,
  SUM(fraud_day) AS fraud_days,
  AVG(fraud_day) AS fraud_day_rate
FROM card_daily_final
GROUP BY 1
ORDER BY
  CASE bucket
    WHEN 'z>=3' THEN 1
    WHEN '2<=z<3' THEN 2
    WHEN '1<=z<2' THEN 3
    WHEN 'z<1' THEN 4
    ELSE 5
  END
""").fetchdf()

q


FloatProgress(value=0.0, layout=Layout(width='auto'), style=ProgressStyle(bar_color='black'))

FloatProgress(value=0.0, layout=Layout(width='auto'), style=ProgressStyle(bar_color='black'))

Unnamed: 0,bucket,n_days,fraud_days,fraud_day_rate
0,z>=3,154618,1365.0,0.008828
1,2<=z<3,117783,1019.0,0.008652
2,1<=z<2,243476,990.0,0.004066
3,z<1,4273912,2736.0,0.00064
4,,600564,518.0,0.000863


In [24]:
!pip -q install duckdb

import duckdb
con = duckdb.connect()

# 1) fraud_day(카드-일자 라벨) 다시 생성
con.execute("""
CREATE OR REPLACE TABLE card_day_fraud AS
SELECT
  card_id,
  DATE_TRUNC('day', CAST(date AS TIMESTAMP)) AS day,
  MAX(CAST(fraud AS INTEGER)) AS fraud_day
FROM read_parquet('transactions_base_part_*.parquet')
GROUP BY 1,2
""")

# 2) features + fraud_day 결합 테이블 재생성
con.execute("""
CREATE OR REPLACE TABLE card_daily_final AS
SELECT f.*, COALESCE(d.fraud_day, 0) AS fraud_day
FROM read_parquet('card_daily_features.parquet') f
LEFT JOIN card_day_fraud d
USING(card_id, day)
""")

# 3) (1) 인증오류 횟수 버킷
q_auth = con.execute("""
SELECT
  CASE
    WHEN auth_cred_err_cnt >= 3 THEN '>=3'
    WHEN auth_cred_err_cnt = 2 THEN '2'
    WHEN auth_cred_err_cnt = 1 THEN '1'
    WHEN auth_cred_err_cnt = 0 THEN '0'
    ELSE 'NA'
  END AS bucket,
  COUNT(*) AS n_days,
  SUM(fraud_day) AS fraud_days,
  AVG(fraud_day) AS fraud_day_rate
FROM card_daily_final
GROUP BY 1
ORDER BY
  CASE bucket
    WHEN '>=3' THEN 1
    WHEN '2' THEN 2
    WHEN '1' THEN 3
    WHEN '0' THEN 4
    ELSE 5
  END
""").fetchdf()

# 4) (2) 거래량 z-score 버킷
q_txz = con.execute("""
SELECT
  CASE
    WHEN tx_cnt_z_30d IS NULL THEN 'NA'
    WHEN tx_cnt_z_30d >= 3 THEN 'z>=3'
    WHEN tx_cnt_z_30d >= 2 THEN '2<=z<3'
    WHEN tx_cnt_z_30d >= 1 THEN '1<=z<2'
    ELSE 'z<1'
  END AS bucket,
  COUNT(*) AS n_days,
  SUM(fraud_day) AS fraud_days,
  AVG(fraud_day) AS fraud_day_rate
FROM card_daily_final
GROUP BY 1
ORDER BY
  CASE bucket
    WHEN 'z>=3' THEN 1
    WHEN '2<=z<3' THEN 2
    WHEN '1<=z<2' THEN 3
    WHEN 'z<1' THEN 4
    ELSE 5
  END
""").fetchdf()

# 5) (3) 금액 z-score 버킷
q_amtz = con.execute("""
SELECT
  CASE
    WHEN amt_sum_z_30d IS NULL THEN 'NA'
    WHEN amt_sum_z_30d >= 3 THEN 'z>=3'
    WHEN amt_sum_z_30d >= 2 THEN '2<=z<3'
    WHEN amt_sum_z_30d >= 1 THEN '1<=z<2'
    ELSE 'z<1'
  END AS bucket,
  COUNT(*) AS n_days,
  SUM(fraud_day) AS fraud_days,
  AVG(fraud_day) AS fraud_day_rate
FROM card_daily_final
GROUP BY 1
ORDER BY
  CASE bucket
    WHEN 'z>=3' THEN 1
    WHEN '2<=z<3' THEN 2
    WHEN '1<=z<2' THEN 3
    WHEN 'z<1' THEN 4
    ELSE 5
  END
""").fetchdf()

q_auth, q_txz, q_amtz


FloatProgress(value=0.0, layout=Layout(width='auto'), style=ProgressStyle(bar_color='black'))

FloatProgress(value=0.0, layout=Layout(width='auto'), style=ProgressStyle(bar_color='black'))

(  bucket   n_days  fraud_days  fraud_day_rate
 0    >=3        2         0.0        0.000000
 1      2      209        10.0        0.047847
 2      1    34551       371.0        0.010738
 3      0  5355591      6247.0        0.001166,
    bucket   n_days  fraud_days  fraud_day_rate
 0    z>=3   141413      1439.0        0.010176
 1  2<=z<3   201167       781.0        0.003882
 2  1<=z<2   522660      1275.0        0.002439
 3     z<1  4501688      3096.0        0.000688
 4      NA    23425        37.0        0.001580,
    bucket   n_days  fraud_days  fraud_day_rate
 0    z>=3   149346      2031.0        0.013599
 1  2<=z<3   150775       550.0        0.003648
 2  1<=z<2   407921       852.0        0.002089
 3     z<1  4674164      3187.0        0.000682
 4      NA     8147         8.0        0.000982)

# 1) 결과 해석(표별)
(A) 인증오류 횟수 버킷(q_auth)

출력 결과:

>=3 : n_days=2 (너무 작아서 의미 없음)

2 : n_days=209, fraud_day_rate=0.0478 (표본이 작고 변동이 큼)

1 : n_days=34,551, fraud_day_rate=0.01074

0 : n_days=5,355,591, fraud_day_rate=0.001166

핵심은 **“하루 중 AUTH_CREDENTIAL 오류가 1회라도 있으면”** 사기일 비율이 크게 오른다는 점입니다:

auth_cred_err_cnt=1일 때 1.07%

auth_cred_err_cnt=0일 때 0.1166%
→ 약 9.2배(=0.010738 / 0.001166)

즉, “인증오류 1회”만으로도 강력한 신호고, 2회/3회는 희귀해서(표본 부족) 현재 버킷으로는 안정적으로 말하기 어렵습니다.
실무적으로도 이런 경우는 보통 **0 vs 1+**로 쓰는 게 맞습니다.

(B) 거래량 급증 z-score 버킷(q_txz)

출력 결과(요약):

z>=3 : 1.0176%

2<=z<3 : 0.3882%

1<=z<2 : 0.2439%

z<1 : 0.0688%

여기서 강한 메시지:

z>=3 vs z<1 → 약 14.8배
(0.010176 / 0.000688)

즉, “평소 대비 거래 횟수 급증”은 사기일과 매우 강하게 동행합니다.

(C) 금액 급증 z-score 버킷(q_amtz)

너 출력:

z>=3 : 1.3599%

2<=z<3 : 0.3648%

1<=z<2 : 0.2089%

z<1 : 0.0682%

z>=3 vs z<1 → 약 19.9배
(0.013599 / 0.000682)

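“평소 대비 지출 총액 급증”이 거래량 급증보다도 더 강하게 잡히는 패턴입니다.
단, 이 지표들은 같은 날의 사기 거래 자체를 포함해 집계되므로 동시 발생(연관) 관계입니다. 실시간 탐지용 예측 변수로 쓰려면 해당 거래 이전의 정보만으로 다시 계산해야 합니다.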

In [25]:
# Collapse the sparse 1 / 2 / >=3 auth-error buckets into ">=1" vs "0". In q_auth the
# "2" and ">=3" buckets have very few card-days. Uses card_daily_final on the `con`
# from the previous cell.
q_auth2 = con.execute("""
SELECT
  CASE
    WHEN auth_cred_err_cnt >= 1 THEN '>=1'
    WHEN auth_cred_err_cnt = 0 THEN '0'
    ELSE 'NA'
  END AS bucket,
  COUNT(*) AS n_days,
  SUM(fraud_day) AS fraud_days,
  AVG(fraud_day) AS fraud_day_rate
FROM card_daily_final
GROUP BY 1
ORDER BY CASE bucket WHEN '>=1' THEN 1 WHEN '0' THEN 2 ELSE 3 END
""").fetchdf()
q_auth2


Unnamed: 0,bucket,n_days,fraud_days,fraud_day_rate
0,>=1,34762,381.0,0.01096
1,0,5355591,6247.0,0.001166


In [26]:
import glob, zipfile, os

# Bundle all parquet parts into one archive for download/sharing.
parts = sorted(glob.glob("transactions_base_part_*.parquet"))
assert len(parts) == 18, f"parts={len(parts)} (18개여야 함)"

zip_name = "transactions_base_parts.zip"
# Remove the archive from a previous run before writing a fresh one.
if os.path.exists(zip_name):
    os.remove(zip_name)

with zipfile.ZipFile(zip_name, mode="w", compression=zipfile.ZIP_DEFLATED) as archive:
    for part_path in parts:
        # Store each part at the archive root, without any directory prefix.
        archive.write(part_path, arcname=os.path.basename(part_path))

print("saved:", zip_name, "| files:", len(parts))

saved: transactions_base_parts.zip | files: 18


In [27]:
!pip -q install duckdb
import duckdb, os

con = duckdb.connect()
out_full = "transactions_base_full.parquet"

if os.path.exists(out_full):
    os.remove(out_full)

con.execute("""
COPY (
  SELECT * FROM read_parquet('transactions_base_part_*.parquet')
) TO 'transactions_base_full.parquet' (FORMAT PARQUET);
""")
print("saved:", out_full)

FloatProgress(value=0.0, layout=Layout(width='auto'), style=ProgressStyle(bar_color='black'))

saved: transactions_base_full.parquet


In [28]:
import os, duckdb

db_path = "analysis.duckdb"
# Start from a clean database. A write-ahead log left by an interrupted run would
# otherwise be replayed into the new database file.
for stale_path in (db_path, db_path + ".wal"):
    if os.path.exists(stale_path):
        os.remove(stale_path)

con = duckdb.connect(db_path)
try:
    # Source table: every transaction from the parquet parts.
    con.execute("""
CREATE TABLE transactions_base AS
SELECT * FROM read_parquet('transactions_base_part_*.parquet');
""")

    # Turn previously exported results into tables, only when their parquet file exists.
    derived_inputs = {
        "card_daily_log": "card_daily_log.parquet",
        "card_daily_features": "card_daily_features.parquet",
    }
    for table_name, parquet_path in derived_inputs.items():
        if os.path.exists(parquet_path):
            con.execute(f"""
CREATE TABLE {table_name} AS
SELECT * FROM read_parquet('{parquet_path}');
""")
        else:
            print("skipped (file not found):", parquet_path)

    # Card-day fraud label: 1 if any transaction of that card on that day is fraud.
    con.execute("""
CREATE TABLE card_day_fraud AS
SELECT
  card_id,
  DATE_TRUNC('day', CAST(date AS TIMESTAMP)) AS day,
  MAX(CAST(fraud AS INTEGER)) AS fraud_day
FROM transactions_base
GROUP BY 1,2;
""")

    # Features + label; only possible when card_daily_features was loaded above.
    if os.path.exists(derived_inputs["card_daily_features"]):
        con.execute("""
CREATE TABLE card_daily_final AS
SELECT f.*, COALESCE(d.fraud_day, 0) AS fraud_day
FROM card_daily_features f
LEFT JOIN card_day_fraud d USING(card_id, day);
""")
finally:
    # Always release the file lock, even if a statement above fails.
    con.close()

print("saved:", db_path)

FloatProgress(value=0.0, layout=Layout(width='auto'), style=ProgressStyle(bar_color='black'))

FloatProgress(value=0.0, layout=Layout(width='auto'), style=ProgressStyle(bar_color='black'))

FloatProgress(value=0.0, layout=Layout(width='auto'), style=ProgressStyle(bar_color='black'))

FloatProgress(value=0.0, layout=Layout(width='auto'), style=ProgressStyle(bar_color='black'))

FloatProgress(value=0.0, layout=Layout(width='auto'), style=ProgressStyle(bar_color='black'))

saved: analysis.duckdb
