In [1]:
"""
Анализ транзакций и ответы на вопросы 1–11.
"""
import os
import sys
import json
import math
import warnings
from dataclasses import dataclass
from typing import Dict, List, Tuple, Optional

import numpy as np
import pandas as pd


In [6]:
DATA_DIR = os.getcwd()
FILE_TRANSACTIONS = "transaction_fraud_data.parquet"
FILE_EXCHANGE = "historical_currency_exchange.parquet"
OUTPUT_JSON = "answers.json"
OUTPUT_CSV = "answers.csv"

def _normalize_currency_code(code: str) -> str:
    if pd.isna(code):
        return "UNK"
    return str(code).strip().upper()

def _as_date_col(ts: pd.Series) -> pd.Series:
    """Приводит к datetime64[ns] naive и нормализует к началу дня."""
    ts = pd.to_datetime(ts, errors="coerce", utc=True)
    ts = ts.dt.tz_convert(None)
    ts = ts.astype("datetime64[ns]")
    return ts.dt.normalize()

def _prepare_exchange_long(exchange_wide: pd.DataFrame, currencies):
    """
    Преобразует wide-таблицу курсов в набор DataFrame по валютам: {'EUR': DataFrame[date, rate], ...}
    Ожидается колонка 'date' и отдельные столбцы с курсами валют (единиц валюты за 1 USD).
    """
    if "date" not in exchange_wide.columns:
        raise ValueError("В таблице курсов отсутствует колонка 'date'.")
    exch = exchange_wide.copy()
    exch["date"] = pd.to_datetime(exch["date"], errors="coerce", utc=True)
    exch["date"] = exch["date"].dt.tz_convert(None)
    exch["date"] = exch["date"].astype("datetime64[ns]").dt.normalize()
    exch = exch.sort_values("date")
    per_ccy = {}
    for c in currencies:
        if c in exch.columns:
            dfc = exch[["date", c]].rename(columns={c: "rate"}).dropna(subset=["rate"]).copy()
            dfc["date"] = dfc["date"].astype("datetime64[ns]")  # страховка
            per_ccy[c] = dfc
    return per_ccy

def _merge_nearest_rate(tx: pd.DataFrame, exch_ccy: pd.DataFrame) -> pd.DataFrame:
    """
    Для подмножества транзакций одной валюты выполняет merge_asof по ближайшей дате курса.
    Гарантируем одинаковый dtype у ключа: datetime64[ns] без TZ.
    """
    tx = tx.sort_values("date").copy()
    exch_ccy = exch_ccy.sort_values("date").copy()

    tx["date"] = pd.to_datetime(tx["date"], errors="coerce", utc=True).dt.tz_convert(None).astype("datetime64[ns]")
    exch_ccy["date"] = pd.to_datetime(exch_ccy["date"], errors="coerce", utc=True).dt.tz_convert(None).astype("datetime64[ns]")

    merged = pd.merge_asof(
        tx,
        exch_ccy,
        on="date",
        direction="nearest",
        tolerance=None  # можно добавить окно, если нужно
    )
    return merged

def convert_to_usd(df: pd.DataFrame, exchange_wide: pd.DataFrame, currency_col: str="currency",
                   amount_col: str="amount", ts_col: str="timestamp") -> pd.DataFrame:
    """
    Добавляет колонку amount_usd с пересчётом сумм транзакций в доллары США.
    Предполагается, что exchange_wide содержит столбец 'date' + валюты, где значение — число единиц валюты за 1 USD.
    Тогда: сумма в USD = amount / rate[currency], для USD rate=1.
    """
    out = df.copy()
    out["currency"] = out[currency_col].map(_normalize_currency_code)
    out["date"] = _as_date_col(out[ts_col])

    currencies = sorted(out["currency"].dropna().unique().tolist())
    exch_per_ccy = _prepare_exchange_long(exchange_wide, currencies)

    parts = []
    unknown_ccy = []
    for c in currencies:
        sub = out.loc[out["currency"] == c].copy()
        sub["date"] = pd.to_datetime(sub["date"], errors="coerce", utc=True).dt.tz_convert(None).astype("datetime64[ns]")
        if c == "USD":
            sub["rate"] = 1.0
            parts.append(sub)
            continue
        if c not in exch_per_ccy:
            unknown_ccy.append(c)
            continue
        merged = _merge_nearest_rate(sub, exch_per_ccy[c])
        parts.append(merged)

    if unknown_ccy:
        warnings.warn(f"Для валют отсутствуют курсы: {unknown_ccy}. Эти транзакции будут отброшены при расчётах в USD.")
    if not parts:
        raise RuntimeError("Не удалось сопоставить ни одной транзакции с курсами валют. Проверьте формат данных.")

    aligned = pd.concat(parts, ignore_index=True).dropna(subset=["rate"])
    aligned["date"] = pd.to_datetime(aligned["date"], errors="coerce").astype("datetime64[ns]")
    aligned["amount_usd"] = aligned[amount_col] / aligned["rate"]
    return aligned

def avg_tx_per_customer_per_hour(df: pd.DataFrame, ts_col: str="timestamp", cust_col: str="customer_id") -> float:
    """
    Интерпретация: среднее число транзакций в "активный час клиента".
    Т.е. группируем по (customer_id, час), считаем число транзакций в этом бакете и усредняем.
    """
    tmp = df.copy()
    if not np.issubdtype(tmp[ts_col].dtype, np.datetime64):
        tmp[ts_col] = pd.to_datetime(tmp[ts_col], errors="coerce")
    tmp["hour_bucket"] = tmp[ts_col].dt.floor("H")
    counts = tmp.groupby([cust_col, "hour_bucket"], as_index=False)["transaction_id"].count()
    counts.rename(columns={"transaction_id": "tx_count"}, inplace=True)
    return counts["tx_count"].mean()

def q11_count_risky_customers(df: pd.DataFrame, cust_col: str="customer_id",
                              lah_col: str="last_hour_activity") -> Tuple[int, float]:
    """
    Для каждого клиента берётся медианное значение last_hour_activity['unique_merchants'] по его транзакциям.
    Затем находим 95-й квантиль по этим медианам, считаем число клиентов, у которых медиана СТРОГО > квантиля.
    Возвращает (кол-во клиентов, порог_квантиль).
    """
    # Извлечение поля из STRUCT: ожидается, что last_hour_activity — dict-like с ключом unique_merchants
    lah = df[lah_col]
    # Если parquet был прочитан как столбец со словарями/структурами, следующий код извлечёт значение:
    if isinstance(lah.iloc[0], dict) or (hasattr(lah.iloc[0], "get")):
        um = lah.apply(lambda x: x.get("unique_merchants") if isinstance(x, dict) else np.nan)
    else:
        # Альтернативный вариант (если STRUCT уже "раскрыт" в отдельные колонки типа last_hour_activity.unique_merchants)
        # Попробуем найти колонку по возможным именам:
        cand_cols = [c for c in df.columns if "unique_merchants" in c and "last_hour" in c]
        if cand_cols:
            um = df[cand_cols[0]]
        else:
            raise ValueError("Не удалось извлечь last_hour_activity.unique_merchants — проверьте схему данных.")
    tmp = df[[cust_col]].copy()
    tmp["unique_merchants_lh"] = pd.to_numeric(um, errors="coerce")
    med_per_cust = tmp.groupby(cust_col)["unique_merchants_lh"].median().dropna()
    q95 = float(med_per_cust.quantile(0.95, interpolation="linear"))
    count_risky = int((med_per_cust > q95).sum())
    return count_risky, q95



In [7]:
path_tx = os.path.join(DATA_DIR, FILE_TRANSACTIONS)
path_ex = os.path.join(DATA_DIR, FILE_EXCHANGE)

# Загрузка
df = pd.read_parquet(path_tx)
ex = pd.read_parquet(path_ex)

# Базовые приведения типов
if not np.issubdtype(df["timestamp"].dtype, np.datetime64):
    df["timestamp"] = pd.to_datetime(df["timestamp"], errors="coerce")
df["is_fraud"] = df["is_fraud"].astype(bool)
if "is_high_risk_vendor" in df.columns:
    df["is_high_risk_vendor"] = df["is_high_risk_vendor"].astype(bool)

# Конвертация в USD (для вопросов 5–10 и части EDA)
df_usd = convert_to_usd(df, ex, currency_col="currency", amount_col="amount", ts_col="timestamp")



In [8]:
df_usd.head()

Unnamed: 0,transaction_id,customer_id,card_number,timestamp,vendor_category,vendor_type,vendor,amount,currency,country,...,device_fingerprint,ip_address,is_outside_home_country,is_high_risk_vendor,is_weekend,last_hour_activity,is_fraud,date,rate,amount_usd
0,TX_7be21fc4,CUST_16193,376079286931183,2024-09-30 00:00:02.297466,Gas,major,Exxon,630.6,AUD,Australia,...,70423fa3a1e74d01203cf93b51b9631d,17.230.177.225,False,False,False,"{'num_transactions': 764, 'total_amount': 2201...",False,2024-09-30,1.443654,436.808287
1,TX_b3b57a8d,CUST_57563,5391006502213781,2024-09-30 16:58:35.766014,Entertainment,streaming,Disney+,2291.04,AUD,Australia,...,781998db5c7270a22e16fed47ad65fab,100.237.26.250,False,True,False,"{'num_transactions': 199, 'total_amount': 1111...",False,2024-09-30,1.443654,1586.973133
2,TX_afcfbc81,CUST_71733,376443120536188,2024-09-30 16:58:40.565281,Healthcare,pharmacy,DuaneReade,551.77,AUD,Australia,...,46c8a294d897a3a143c304ec31cb9658,213.63.147.100,False,False,False,"{'num_transactions': 752, 'total_amount': 3113...",False,2024-09-30,1.443654,382.203788
3,TX_c3eec5f9,CUST_27664,6210581840883021,2024-09-30 16:58:41.863885,Entertainment,streaming,Apple Music,243.89,AUD,Australia,...,0bf7e85ebe2c1f08b94a00a88d9dc1b2,176.217.174.154,True,True,False,"{'num_transactions': 1010, 'total_amount': 141...",False,2024-09-30,1.443654,168.93938
4,TX_a4a904c8,CUST_91033,376974624515898,2024-09-30 16:58:45.996185,Education,supplies,Barnes & Noble,1669.76,AUD,Australia,...,dba19e5a445b71eb6f9f349148777b1c,154.246.255.222,False,False,False,"{'num_transactions': 76, 'total_amount': 27279...",False,2024-09-30,1.443654,1156.620687


In [11]:

# ----------------------------
# 1) Доля мошеннических транзакций
# ----------------------------
q1_fraud_share = float(df["is_fraud"].mean())

# ----------------------------
# 2) Топ-5 стран по числу мошенничеств
# ----------------------------
q2_top5 = (
    df.loc[df["is_fraud"]]
      .groupby("country", dropna=False)["transaction_id"]
      .count()
      .sort_values(ascending=False)
      .head(5)
)
q2_top5_list = [((str(idx) if pd.notna(idx) else "UNKNOWN"), int(val)) for idx, val in q2_top5.items()]

# ----------------------------
# 3) Среднее число транзакций клиента за 1 час (в активные часы)
# ----------------------------
q3_avg_tx_per_hour = float(avg_tx_per_customer_per_hour(df, ts_col="timestamp", cust_col="customer_id"))

# ----------------------------
# 4) Доля мошенничества у high-risk вендоров
# ----------------------------
if "is_high_risk_vendor" in df.columns:
    sub_hr = df[df["is_high_risk_vendor"] == True]
    q4_fraud_share_high_risk = float(sub_hr["is_fraud"].mean()) if len(sub_hr) else float("nan")
else:
    q4_fraud_share_high_risk = float("nan")

# ----------------------------
# 5) Город с наибольшей средней суммой транзакции (в USD)
# ----------------------------
city_avg = (
    df_usd.groupby("city", dropna=False)["amount_usd"]
          .mean()
          .sort_values(ascending=False)
)
q5_city, q5_city_avg = (str(city_avg.index[0]) if len(city_avg) else "UNKNOWN",
                        float(city_avg.iloc[0]) if len(city_avg) else float("nan"))

# ----------------------------
# 6) Город с наибольшим средним чеком по fast_food (в USD)
# ----------------------------
if "vendor_type" in df_usd.columns:
    fast = df_usd[df_usd["vendor_type"].str.lower() == "fast_food"]
    if len(fast):
        fast_city_avg = (
            fast.groupby("city", dropna=False)["amount_usd"]
                .mean()
                .sort_values(ascending=False)
        )
        q6_city, q6_avg = (str(fast_city_avg.index[0]), float(fast_city_avg.iloc[0]))
    else:
        q6_city, q6_avg = "NO_FAST_FOOD", float("nan")
else:
    q6_city, q6_avg = "NO_VENDOR_TYPE", float("nan")

# ----------------------------
# 7–10) Средние и СКО в USD по (не)мошенническим операциям
# ----------------------------
nonfraud_usd = df_usd.loc[df_usd["is_fraud"] == False, "amount_usd"]
fraud_usd = df_usd.loc[df_usd["is_fraud"] == True, "amount_usd"]

q7_nonfraud_mean = float(nonfraud_usd.mean())
q8_nonfraud_std = float(nonfraud_usd.std(ddof=1))  # выборочное СКО
q9_fraud_mean = float(fraud_usd.mean())
q10_fraud_std = float(fraud_usd.std(ddof=1))

# ----------------------------
# 11) Клиенты с потенциально опасным поведением
# ----------------------------
q11_count, q11_threshold = q11_count_risky_customers(df, cust_col="customer_id", lah_col="last_hour_activity")

# Сбор результатов
answers = {
    "q1_fraud_share": q1_fraud_share,
    "q2_top5_countries_fraud": q2_top5_list,
    "q3_avg_tx_per_customer_per_hour": q3_avg_tx_per_hour,
    "q4_fraud_share_high_risk_vendor": q4_fraud_share_high_risk,
    "q5_city_highest_avg_amount_usd": {"city": q5_city, "avg_amount_usd": q5_city_avg},
    "q6_city_highest_avg_fast_food_usd": {"city": q6_city, "avg_amount_usd": q6_avg},
    "q7_nonfraud_mean_usd": q7_nonfraud_mean,
    "q8_nonfraud_std_usd": q8_nonfraud_std,
    "q9_fraud_mean_usd": q9_fraud_mean,
    "q10_fraud_std_usd": q10_fraud_std,
    "q11_num_customers_potentially_risky": q11_count,
    "q11_threshold_q95_unique_merchants_median": q11_threshold
}

# Вывод и сохранение

with open(OUTPUT_JSON, "w", encoding="utf-8") as f:
    json.dump(answers, f, ensure_ascii=False, indent=2)

# Дублируем плоскую таблицу для удобства
flat_rows = []
flat_rows.append({"metric": "q1_fraud_share", "value": answers["q1_fraud_share"]})
for country, cnt in answers["q2_top5_countries_fraud"]:
    flat_rows.append({"metric": "q2_top5_country", "country": country, "count": cnt})
flat_rows.append({"metric": "q3_avg_tx_per_customer_per_hour", "value": answers["q3_avg_tx_per_customer_per_hour"]})
flat_rows.append({"metric": "q4_fraud_share_high_risk_vendor", "value": answers["q4_fraud_share_high_risk_vendor"]})
flat_rows.append({"metric": "q5_city_highest_avg_amount_usd", "city": answers["q5_city_highest_avg_amount_usd"]["city"], "value": answers["q5_city_highest_avg_amount_usd"]["avg_amount_usd"]})
flat_rows.append({"metric": "q6_city_highest_avg_fast_food_usd", "city": answers["q6_city_highest_avg_fast_food_usd"]["city"], "value": answers["q6_city_highest_avg_fast_food_usd"]["avg_amount_usd"]})
flat_rows.append({"metric": "q7_nonfraud_mean_usd", "value": answers["q7_nonfraud_mean_usd"]})
flat_rows.append({"metric": "q8_nonfraud_std_usd", "value": answers["q8_nonfraud_std_usd"]})
flat_rows.append({"metric": "q9_fraud_mean_usd", "value": answers["q9_fraud_mean_usd"]})
flat_rows.append({"metric": "q10_fraud_std_usd", "value": answers["q10_fraud_std_usd"]})
flat_rows.append({"metric": "q11_num_customers_potentially_risky", "value": answers["q11_num_customers_potentially_risky"]})
flat_rows.append({"metric": "q11_threshold_q95_unique_merchants_median", "value": answers["q11_threshold_q95_unique_merchants_median"]})

pd.DataFrame(flat_rows).to_csv(OUTPUT_CSV, index=False, encoding="utf-8")

print(json.dumps(answers, ensure_ascii=False, indent=2))

  tmp["hour_bucket"] = tmp[ts_col].dt.floor("H")


{
  "q1_fraud_share": 0.19972818498066347,
  "q2_top5_countries_fraud": [
    [
      "Russia",
      299425
    ],
    [
      "Mexico",
      298841
    ],
    [
      "Brazil",
      298629
    ],
    [
      "Nigeria",
      298600
    ],
    [
      "Australia",
      37652
    ]
  ],
  "q3_avg_tx_per_customer_per_hour": 2.4527761909397174,
  "q4_fraud_share_high_risk_vendor": 0.19998632451602943,
  "q5_city_highest_avg_amount_usd": {
    "city": "New York",
    "avg_amount_usd": 568.8724674229495
  },
  "q6_city_highest_avg_fast_food_usd": {
    "city": "Chicago",
    "avg_amount_usd": 264.4502809265648
  },
  "q7_nonfraud_mean_usd": 459.7826108501767,
  "q8_nonfraud_std_usd": 417.0077334756039,
  "q9_fraud_mean_usd": 874.6067503080816,
  "q10_fraud_std_usd": 1349.8826922067437,
  "q11_num_customers_potentially_risky": 229,
  "q11_threshold_q95_unique_merchants_median": 100.0
}
