# **Prerequisites**

## Mount Google Drive


In [1]:
# Mount Google Drive so the project folder under MyDrive is reachable from this Colab runtime.
from google.colab import drive

MOUNT_POINT = "/content/drive"
drive.mount(MOUNT_POINT)

Mounted at /content/drive


## Install packages


In [2]:
# Install packages listed in the project's extraction requirements file (quietly, without pip's cache).
# NOTE(review): the Drive path is hard-coded here; the next cell stores the same directory in `project_path`.
%pip install -q -r /content/drive/MyDrive/Sem-6/coding/github/fraud_detection/Extract_requirements.txt --no-cache-dir



In [3]:
project_path = "/content/drive/MyDrive/Sem-6/coding/github/fraud_detection/"
%cd $project_path
%ls /content/drive/MyDrive/Sem-6/coding/github/fraud_detection

/content/drive/MyDrive/Sem-6/coding/github/fraud_detection
[0m[01;34mconfigs[0m/                       requirements-lock.txt
[01;34mdataset[0m/                       requirements.txt
Extract_requirements-lock.txt  [01;34mresults[0m/
Extract_requirements.txt       run_experiment.py
[01;34mfeatures[0m/                      sample_extract_requirements-lock.txt
[01;34mnotebooks[0m/                     [01;34msrc[0m/
README.md                      [01;34mtests[0m/


## Import libraries

In [4]:

import datetime
import os
import pandas as pd
import numpy as np
from scipy.stats import mode  # per-user most-frequent hour/day; note aggregate_features has a local parameter also named `mode`
import yaml
import logging
from tqdm import tqdm
import pandas as pd  # NOTE(review): duplicate of the pandas import above; harmless but redundant
from sklearn.preprocessing import StandardScaler  # NOTE(review): not used in the visible cells
from sklearn.manifold import TSNE  # NOTE(review): not used in the visible cells
import altair as alt  # NOTE(review): not used in the visible cells
# Render DataFrame outputs with Colab's interactive table viewer
from google.colab import data_table
data_table.enable_dataframe_formatter()
import seaborn as sns  # NOTE(review): not used in the visible cells
import matplotlib.pyplot as plt  # NOTE(review): not used in the visible cells




# Record the exact installed package versions in the current working directory (the project root after %cd)
%pip freeze > Extract_requirements-lock.txt



# Basic Methods

## Logging

In [5]:

# Make sure results directory exists
os.makedirs("results", exist_ok=True)

logging.basicConfig(
    level=logging.INFO,
    format="%(asctime)s [%(levelname)s] %(message)s",
    handlers=[
        logging.StreamHandler(),
        logging.FileHandler("results/data_extract.log")
    ]
)
logger = logging.getLogger(__name__)



## Config

In [6]:
def load_config(config_path="configs/extract_feature.yaml"):
    """Load the YAML extraction config and return it as a dict.

    Parameters
    ----------
    config_path : str
        Path to the YAML file; relative paths resolve against the current
        working directory.

    Returns
    -------
    dict
        The parsed configuration mapping.

    Raises
    ------
    FileNotFoundError
        If ``config_path`` does not exist.
    ValueError
        If the file is empty or its top level is not a mapping.
    """
    # Explicit utf-8 so parsing does not depend on the platform's default encoding.
    with open(config_path, "r", encoding="utf-8") as f:
        config = yaml.safe_load(f)
    # safe_load returns None for an empty file; fail here with a clear message
    # instead of a TypeError at the first config["..."] lookup downstream.
    if not isinstance(config, dict):
        raise ValueError(
            f"Config {config_path} must contain a YAML mapping, got {type(config).__name__}"
        )
    logger.info(f"Loaded config from {config_path}")
    return config

## CDR dataset

In [7]:
def load_cdr(file_path, nrows=None):
    """Read one CSV file and convert its datetime-named columns.

    Every column whose name contains "datetime" (case-insensitive) is passed
    through ``pd.to_datetime(errors="coerce")``, so unparseable values become
    NaT instead of raising. Header names are whitespace-stripped afterwards.

    Parameters
    ----------
    file_path : str
        Path handed to ``pd.read_csv``.
    nrows : int or None
        Optional row limit; ``None`` reads the whole file.

    Returns
    -------
    pd.DataFrame
    """
    logger.info(f"Loading file: {file_path} (nrows={nrows})")
    frame = pd.read_csv(file_path, nrows=nrows)

    datetime_columns = [name for name in frame.columns if "datetime" in name.lower()]
    for name in datetime_columns:
        frame[name] = pd.to_datetime(frame[name], errors="coerce")

    frame.columns = frame.columns.str.strip()
    return frame



def load_all_data(config):
    """
    Read every CSV listed under ``config["dataset"]["files"]``.

    Each file name is joined onto ``config["dataset"]["base_path"]`` and read
    with ``load_cdr``. When ``config["training"]["sample_size"]`` is set, it is
    used as the row limit for every file.

    Returns
    -------
    dict
        Maps each key of ``config["dataset"]["files"]`` to its DataFrame.
    """
    dataset_cfg = config["dataset"]
    base_dir = dataset_cfg["base_path"]
    file_map = dataset_cfg["files"]
    row_limit = config.get("training", {}).get("sample_size", None)

    loaded = {}
    for key, filename in file_map.items():
        full_path = os.path.join(base_dir, filename)
        frame = load_cdr(full_path, nrows=row_limit)
        loaded[key] = frame
        logger.info(f"Loaded {key} -> {frame.shape} from {full_path}")
    return loaded


# Feature Extraction

## Compute app risk levels

In [None]:
def compute_app_risk_levels(app_df, label_col="label", id_col="busi_name", user_col="phone_no_m"):
    """
    Compute the share of fraudulent users per app and bucket it into risk levels.

    Parameters
    ----------
    app_df : pd.DataFrame
        App-usage rows containing ``id_col``, ``user_col`` and ``label_col``.
        The same user may appear in several rows of one app.
    label_col : str
        Fraud label column; values > 0 after numeric coercion mark fraud.
    id_col : str
        Column identifying the app.
    user_col : str
        Column identifying the user.

    Returns
    -------
    pd.DataFrame
        One row per app with columns ``[id_col, "fraud_ratio", "risk_level"]``.
        ``fraud_ratio`` = distinct fraudulent users / distinct users.
        ``risk_level`` is "Low" / "Medium" / "High" / "Critical", split at the
        25th / 50th / 75th percentile of ``fraud_ratio`` across apps.

    Raises
    ------
    ValueError
        If ``label_col`` is missing.
    """
    if label_col not in app_df.columns:
        raise ValueError("❌ No label column found for app risk computation.")

    labels = pd.to_numeric(app_df[label_col], errors="coerce").fillna(0)

    # Count *distinct* users on both sides of the ratio. Summing the label over
    # rows would count a fraudulent user once per row and can push the ratio above 1.
    total_users = app_df.groupby(id_col)[user_col].nunique()
    fraud_users = (
        app_df.loc[labels > 0]
        .groupby(id_col)[user_col]
        .nunique()
        .reindex(total_users.index, fill_value=0)
    )
    summary = (
        pd.DataFrame({"total_users": total_users, "fraud_users": fraud_users})
        .rename_axis(id_col)
        .reset_index()
    )
    # An app whose user ids are all missing has total_users == 0 -> NaN ratio, not inf
    summary["fraud_ratio"] = summary["fraud_users"] / summary["total_users"].replace(0, np.nan)

    # Percentile-based thresholds so the buckets adapt to the observed distribution
    q1, q2, q3 = summary["fraud_ratio"].quantile([0.25, 0.5, 0.75])
    ratio = summary["fraud_ratio"]
    summary["risk_level"] = np.select(
        [ratio < q1, ratio < q2, ratio < q3],
        ["Low", "Medium", "High"],
        default="Critical",
    )

    return summary[[id_col, "fraud_ratio", "risk_level"]]


## Voice

In [8]:
def get_voc_feat(df):
    """
    Build one row of voice-call (VOC) features per user.

    Required columns: ``phone_no_m``, ``opposite_no_m``, ``start_datetime``.
    Optional columns:
      - ``call_dur`` adds duration statistics.
      - ``calltype_id`` adds ``call_out_in_ratio``.

    Rows whose ``start_datetime`` cannot be parsed are dropped before
    aggregating. The function works on a full history or on a single window slice.

    Raises
    ------
    ValueError
        If ``start_datetime`` is absent.
    """
    if "start_datetime" not in df.columns:
        raise ValueError("❌ Missing 'start_datetime' column in VOC data")

    def most_frequent(values):
        # scipy.stats.mode returns the smallest value when several are tied
        return mode(values, keepdims=True)[0][0] if len(values) > 0 else np.nan

    calls = df.copy()
    calls["start_datetime"] = pd.to_datetime(calls["start_datetime"], errors="coerce")
    calls = calls.dropna(subset=["start_datetime"]).copy()
    calls["hour"] = calls["start_datetime"].dt.hour
    calls["day"] = calls["start_datetime"].dt.day

    per_user = calls.groupby("phone_no_m")

    # Each entry is a per-user table (index = phone_no_m) merged onto the user list below
    feature_tables = [
        per_user["opposite_no_m"].agg(total_calls="count", unique_contacts="nunique"),
    ]
    if "call_dur" in calls.columns:
        feature_tables.append(
            per_user["call_dur"].agg(
                call_dur_mean="mean",
                call_dur_median="median",
                call_dur_max="max",
                call_dur_min="min",
                call_dur_sum="sum",
                call_dur_std="std",
            )
        )
    feature_tables.append(
        per_user["hour"].agg(voc_hour_mode=most_frequent, voc_active_hours="nunique")
    )
    feature_tables.append(
        per_user["day"].agg(voc_day_mode=most_frequent, voc_active_days="nunique")
    )

    if "calltype_id" in calls.columns:
        # Assumed coding: 1 = outgoing, 2 = incoming (TODO confirm against the dataset)
        direction = calls.groupby(["phone_no_m", "calltype_id"]).size().unstack(fill_value=0)
        zeros = pd.Series(0, index=direction.index)
        outgoing = direction[1] if 1 in direction.columns else zeros
        incoming = direction[2] if 2 in direction.columns else zeros
        # The 1e-5 term avoids division by zero. Users with no incoming calls
        # therefore get very large ratios (outgoing / 1e-5).
        direction["call_out_in_ratio"] = outgoing.astype(float) / (incoming.astype(float) + 1e-5)
        feature_tables.append(direction[["call_out_in_ratio"]])

    result = calls[["phone_no_m"]].drop_duplicates().copy()
    for table in feature_tables:
        result = result.merge(table, on="phone_no_m", how="left")
    return result


## SMS

In [9]:
def get_sms_feats(df):
    """
    Build one row of SMS features per user.

    Required columns: ``phone_no_m``, ``opposite_no_m``, ``request_datetime``.
    If ``calltype_id`` is present, a send/receive ratio is added as well.
    Messages whose ``request_datetime`` cannot be parsed are discarded first.
    The function can be applied to a whole history or to a window slice.

    Raises
    ------
    ValueError
        If ``request_datetime`` is absent.
    """
    if "request_datetime" not in df.columns:
        raise ValueError("❌ Missing 'request_datetime' column in SMS data")

    def peak_value(values):
        # Most frequent value; scipy.stats.mode returns the smallest one on ties
        return mode(values, keepdims=True)[0][0] if len(values) > 0 else np.nan

    messages = df.copy()
    messages["request_datetime"] = pd.to_datetime(messages["request_datetime"], errors="coerce")
    messages = messages.dropna(subset=["request_datetime"]).copy()
    messages["hour"] = messages["request_datetime"].dt.hour
    messages["day"] = messages["request_datetime"].dt.day

    grouped = messages.groupby("phone_no_m")
    blocks = [
        grouped["opposite_no_m"].agg(sms_total="count", sms_unique_contacts="nunique"),
        grouped["hour"].agg(sms_active_hours="nunique", sms_peak_hour=peak_value),
        grouped["day"].agg(sms_active_days="nunique", sms_peak_day=peak_value),
    ]

    if "calltype_id" in messages.columns:
        # Assumed coding: 1 = send, 2 = receive (TODO confirm against the dataset)
        by_type = messages.groupby(["phone_no_m", "calltype_id"]).size().unstack(fill_value=0)
        absent = pd.Series(0, index=by_type.index)
        sent = by_type[1] if 1 in by_type.columns else absent
        received = by_type[2] if 2 in by_type.columns else absent
        # The 1e-5 term only guards against division by zero
        by_type["sms_send_recv_ratio"] = sent.astype(float) / (received.astype(float) + 1e-5)
        blocks.append(by_type[["sms_send_recv_ratio"]])

    features = messages[["phone_no_m"]].drop_duplicates().copy()
    for block in blocks:
        features = features.merge(block, on="phone_no_m", how="left")
    return features


## App

In [10]:
def get_app_feats(df):
    """
    Summarise each user's app usage across months.

    Processing steps:
      1. Roll rows up per (user, month) into total traffic and the number of
         distinct apps.
      2. Roll those up per user into activity and traffic statistics.

    Cleaning rules:
      - Rows with an unparseable ``month_id`` are dropped.
      - Non-numeric ``flow`` counts as 0.
      - NaN statistics (e.g. std for a single month) are filled with 0.

    NOTE(review): ``month_id`` is passed through ``pd.to_datetime``. If it is
    stored as an integer such as 201908, pandas reads it as nanoseconds since
    the epoch. Distinct months still stay distinct for grouping, but confirm
    the stored format.

    Raises
    ------
    ValueError
        If ``month_id`` is absent.
    """
    if "month_id" not in df.columns:
        raise ValueError("❌ APP dataset must contain a 'month_id' column.")

    usage = df.copy()
    usage["month_id"] = pd.to_datetime(usage["month_id"], errors="coerce")
    usage = usage.dropna(subset=["month_id"]).copy()
    usage["flow"] = pd.to_numeric(usage["flow"], errors="coerce").fillna(0)

    per_month = usage.groupby(["phone_no_m", "month_id"]).agg(
        total_flow=("flow", "sum"),
        unique_apps=("busi_name", "nunique"),
    ).reset_index()

    per_user = per_month.groupby("phone_no_m").agg(
        app_months_active=("month_id", "nunique"),
        app_total_flow=("total_flow", "sum"),
        app_avg_flow=("total_flow", "mean"),
        app_std_flow=("total_flow", "std"),
        app_unique_apps_mean=("unique_apps", "mean"),
        app_unique_apps_max=("unique_apps", "max"),
    ).reset_index()

    return per_user.fillna(0)


## User

In [11]:
def _arpu_slope(values):
    """Least-squares slope of one user's ARPU values against month position.

    NaN months are skipped. Returns NaN when every month is missing, and 0 when
    fewer than two months are available (a line needs two points).
    """
    valid = ~np.isnan(values)
    if not valid.any():
        return np.nan
    if valid.sum() < 2:
        return 0
    positions = np.arange(len(values))
    return np.polyfit(positions[valid], values[valid], 1)[0]


def get_user_feats(df):
    """
    Extract per-user static and ARPU trend features from a wide monthly table.

    Every column whose name starts with ``arpu_`` is one monthly ARPU value.
    Values are coerced to numeric, with invalid entries becoming NaN.

    Parameters
    ----------
    df : pd.DataFrame
        One row per user. ``phone_no_m`` is required. ``city_name``,
        ``county_name``, ``idcard_cnt`` and ``label`` are passed through when
        present; ``label`` is absent on unlabeled inference data.

    Returns
    -------
    pd.DataFrame
        Columns, in order:
          - ``phone_no_m``
          - whichever optional static columns are present
          - ``arpu_mean``, ``arpu_std``, ``arpu_min``, ``arpu_max``,
            ``arpu_range``, ``arpu_trend``

    Raises
    ------
    ValueError
        If no ``arpu_*`` column exists.
    KeyError
        If ``phone_no_m`` is missing.
    """
    df = df.copy()

    arpu_cols = [c for c in df.columns if c.startswith("arpu_")]
    if not arpu_cols:
        raise ValueError("❌ No ARPU columns found in USER dataset.")

    df[arpu_cols] = df[arpu_cols].apply(pd.to_numeric, errors="coerce")
    arpu = df[arpu_cols]

    # --- Basic statistics (NaN months are skipped by pandas reductions)
    df["arpu_mean"] = arpu.mean(axis=1)
    df["arpu_std"] = arpu.std(axis=1)
    df["arpu_min"] = arpu.min(axis=1)
    df["arpu_max"] = arpu.max(axis=1)
    df["arpu_range"] = df["arpu_max"] - df["arpu_min"]

    # --- Trend: slope per column position.
    # NOTE(review): assumes arpu_* columns are already in chronological order.
    df["arpu_trend"] = [_arpu_slope(row) for row in arpu.to_numpy(dtype=float)]

    # phone_no_m is mandatory; the other static columns are kept only if present
    # so the function also runs on data without a label column.
    optional_static = ["city_name", "county_name", "idcard_cnt", "label"]
    keep_cols = (
        ["phone_no_m"]
        + [c for c in optional_static if c in df.columns]
        + ["arpu_mean", "arpu_std", "arpu_min", "arpu_max", "arpu_range", "arpu_trend"]
    )
    return df[keep_cols]


## Build aggregation

In [12]:
def _tag_snapshot(feat, phone, snapshot_time, snapshot_id=None):
    """Stamp an extractor result with the user id, snapshot time and optional snapshot id."""
    feat["phone_no_m"] = phone
    feat["snapshot_time"] = snapshot_time
    if snapshot_id is not None:
        feat["snapshot_id"] = snapshot_id
    return feat


def _n_event_snapshots(group, phone, window, time_col, extractor, rolling):
    """Snapshots over event-count windows for one user's time-ordered rows (0..n-1 index)."""
    if not rolling:
        subset = group.iloc[-window:]
        if subset.empty:
            return []
        return [_tag_snapshot(extractor(subset), phone, group[time_col].iloc[-1])]

    snapshots = []
    # Each snapshot covers `window` consecutive events; snapshot_id = events seen so far
    for end_idx in range(window, len(group) + 1):
        subset = group.iloc[end_idx - window:end_idx]
        snapshots.append(
            _tag_snapshot(extractor(subset), phone, group.loc[end_idx - 1, time_col], end_idx)
        )
    return snapshots


def _timely_snapshots(group, phone, window, time_col, extractor, rolling):
    """Snapshots over `window`-day windows for one user's time-ordered rows (0..n-1 index)."""
    span = pd.Timedelta(days=window)
    times = group[time_col]

    if not rolling:
        # max() rather than the last row: NaT sorts last but is skipped by max()
        end_time = times.max()
        subset = group[times.between(end_time - span, end_time)]
        if subset.empty:
            return []
        return [_tag_snapshot(extractor(subset), phone, end_time)]

    snapshots = []
    # One snapshot ending at every event, including the most recent one.
    # `window` is a number of days here, so it must not be used as an event index.
    for end_idx in range(len(group)):
        end_time = group.loc[end_idx, time_col]
        subset = group[times.between(end_time - span, end_time)]
        if subset.empty:  # e.g. end_time is NaT
            continue
        # snapshot_id counts events up to and including this one (same convention as n_events)
        snapshots.append(_tag_snapshot(extractor(subset), phone, end_time, end_idx + 1))
    return snapshots


def aggregate_features(
    df,
    mode="full",
    window=10,
    time_col=None,
    extractor=None,
    rolling=False,
):
    """
    Apply ``extractor`` to the data once, or per user over windows.

    Modes
    -----
    full
        ``extractor(df)`` on all rows at once.
    n_events
        Per user: the last ``window`` events (rolling=False), or every run of
        ``window`` consecutive events (rolling=True).
    timely
        Per user: events in the ``window`` days up to the latest event
        (rolling=False), or up to each event in turn (rolling=True).

    Parameters
    ----------
    df : pd.DataFrame
        Event rows. Windowed modes need ``phone_no_m`` and ``time_col``.
    mode : str
        "full", "n_events" or "timely".
    window : int or float
        Events (n_events) or days (timely); must be > 0 for windowed modes.
    time_col : str or None
        Column that orders events. When None, windowed modes fall back to a
        single ``extractor(df)`` call.
    extractor : callable
        DataFrame -> DataFrame of features.
    rolling : bool
        Emit one snapshot per window position instead of only the latest.

    Returns
    -------
    pd.DataFrame
        Extractor output. Windowed modes add ``phone_no_m`` and
        ``snapshot_time``, plus ``snapshot_id`` when rolling. Returns an empty
        frame with a ``phone_no_m`` column if no snapshot was produced.

    Raises
    ------
    ValueError
        On an unknown ``mode``, or a non-positive ``window`` in a windowed mode.
    """
    logger.info(
        f"Aggregating with mode={mode}, window={window}, time_col={time_col}, rolling={rolling}"
    )

    if mode not in ("full", "n_events", "timely"):
        raise ValueError(
            f"Unknown aggregation mode {mode!r}; expected 'full', 'n_events' or 'timely'"
        )

    # --- Full aggregation (simple case)
    if mode == "full":
        return extractor(df)

    # --- Check prerequisites
    if time_col is None:
        logger.info("No time column provided — running full aggregation only once.")
        return extractor(df)

    if window <= 0:
        raise ValueError(f"window must be positive for mode={mode!r}, got {window}")

    # One global sort; groupby preserves row order within each group, so every
    # group below is already time-ordered and needs no second sort.
    df = df.sort_values(by=["phone_no_m", time_col])
    build = _n_event_snapshots if mode == "n_events" else _timely_snapshots

    snapshots = []
    for phone, group in tqdm(df.groupby("phone_no_m"), desc=f"{mode} aggregation"):
        # 0..n-1 index so positional .loc lookups in the helpers are valid
        group = group.reset_index(drop=True)
        snapshots.extend(build(group, phone, window, time_col, extractor, rolling))

    # --- Final assembly
    if not snapshots:
        logger.warning(
            f"No snapshots generated for mode={mode}, window={window}. Returning empty DataFrame."
        )
        return pd.DataFrame(columns=["phone_no_m"])

    return pd.concat(snapshots, ignore_index=True)


## Extract features

In [13]:

def _latest_event_time(data):
    """Latest valid timestamp in the 'voice' / 'sms' tables of ``data``, or None."""
    latest_times = []
    for key, time_col in (("voice", "start_datetime"), ("sms", "request_datetime")):
        frame = data.get(key)
        if frame is None or frame.empty or time_col not in frame.columns:
            continue
        latest = pd.to_datetime(frame[time_col], errors="coerce").max()
        # An all-unparseable column yields NaT, which has no month to cut at
        if pd.notna(latest):
            latest_times.append(latest)
    return max(latest_times) if latest_times else None


def _month_key(values):
    """Convert month identifiers to comparable YYYYMM numbers.

    Conversion rules:
      - Numeric input is returned unchanged (taken to already be YYYYMM).
      - Text made only of numbers (e.g. "201908") goes through ``pd.to_numeric``.
      - Datetimes and other text are mapped to ``year * 100 + month``.
    Entries that cannot be converted become NaN.
    """
    if pd.api.types.is_datetime64_any_dtype(values):
        return values.dt.year * 100 + values.dt.month
    if pd.api.types.is_numeric_dtype(values):
        return values
    as_number = pd.to_numeric(values, errors="coerce")
    if as_number.notna().sum() == values.notna().sum():
        return as_number
    parsed = pd.to_datetime(values, errors="coerce")
    return parsed.dt.year * 100 + parsed.dt.month


def extract_all_features(data, config=None, mode="full", window=10, rolling=False):
    """
    Extract and aggregate all feature types, capping monthly tables at the event horizon.

    The event horizon is the latest valid timestamp in the voice/SMS tables.
    APP rows and USER ``arpu_YYYYMM`` columns from later months are dropped
    before extraction.

    Parameters
    ----------
    data : dict
        DataFrames keyed by any of 'voice', 'sms', 'app', 'user'. Missing or
        empty entries are skipped with a warning.
    config : dict or None
        Accepted for API compatibility; not read here.
    mode : str
        Aggregation mode for voice/SMS: 'full', 'n_events' or 'timely'.
    window : int
        Event count ('n_events') or day count ('timely') for voice/SMS windows.
    rolling : bool
        If True, voice/SMS produce one snapshot per window position.

    Returns
    -------
    dict
        Feature DataFrames keyed by 'voice', 'sms', 'app', 'user', for the
        inputs that were present. Empty dict if none were produced.
    """
    logger.info(f"🚀 Starting feature extraction: mode={mode}, window={window}, rolling={rolling}")
    results = {}

    def available(key):
        frame = data.get(key)
        return frame is not None and not frame.empty

    # --- Event horizon: monthly tables are cut at the month of the latest call/SMS
    max_event_time = _latest_event_time(data)
    logger.info(f"🕒 Latest event timestamp across all data: {max_event_time}")
    cutoff_month = int(max_event_time.strftime("%Y%m")) if max_event_time is not None else None

    # --- VOICE and SMS share the same windowed aggregation
    event_sources = [
        ("voice", "start_datetime", get_voc_feat,
         "📞 Extracting VOICE features...", "⚠️ VOICE data not found or empty."),
        ("sms", "request_datetime", get_sms_feats,
         "💬 Extracting SMS features...", "⚠️ SMS data not found or empty."),
    ]
    for key, time_col, extractor, start_msg, missing_msg in event_sources:
        if not available(key):
            logger.warning(missing_msg)
            continue
        logger.info(start_msg)
        results[key] = aggregate_features(
            df=data[key],
            mode=mode,
            window=window,
            time_col=time_col,
            extractor=extractor,
            rolling=rolling,
        )

    # --- APP features: monthly context up to the horizon
    if available("app"):
        logger.info("📱 Extracting APP (monthly) features...")
        app_df = data["app"].copy()
        if cutoff_month is not None:
            # Normalise month ids first so textual ids compare against the int cutoff
            app_df = app_df[_month_key(app_df["month_id"]) <= cutoff_month]
        results["app"] = aggregate_features(
            df=app_df,
            mode="full",  # static per user
            window=window,
            time_col=None,
            extractor=get_app_feats,
        )
    else:
        logger.warning("⚠️ APP data not found or empty.")

    # --- USER features: drop ARPU months after the horizon
    if available("user"):
        logger.info("👤 Extracting USER (static/trend) features...")
        user_df = data["user"].copy()
        if cutoff_month is not None:
            arpu_cols = [c for c in user_df.columns if c.startswith("arpu_")]
            # A suffix that is not all digits cannot be dated, so such columns are kept
            valid_arpu = [
                c for c in arpu_cols
                if not c.split("_")[1].isdigit() or int(c.split("_")[1]) <= cutoff_month
            ]
            keep_cols = [c for c in user_df.columns if not c.startswith("arpu_")] + valid_arpu
            user_df = user_df[keep_cols]
        results["user"] = aggregate_features(
            df=user_df,
            mode="full",
            window=window,
            time_col=None,
            extractor=get_user_feats,
        )
    else:
        logger.warning("⚠️ USER data not found or empty.")

    if not results:
        logger.warning("❌ No feature sets generated — returning empty result.")
        return {}

    logger.info(f"✅ Feature extraction complete — {len(results)} feature sets created.")
    return results


# Run extraction

In [15]:
config = load_config("configs/extract_feature.yaml")

# Load raw data
data = load_all_data(config)

# Aggregation settings; `rolling` is optional in the config and defaults to False
aggregation_cfg = config["aggregation"]
config_mode = aggregation_cfg["mode"]
config_window = aggregation_cfg["window"]
config_rolling = aggregation_cfg.get("rolling", False)

# Extract features
features = extract_all_features(
    data,
    config,
    mode=config_mode,
    window=config_window,
    rolling=config_rolling,
)

# Create output directories if they don't exist
features_dir = config["features"]["features_dir"]
os.makedirs(features_dir, exist_ok=True)

results_dir = config["output"].get("results_dir", features_dir)
os.makedirs(results_dir, exist_ok=True)

# Add timestamp to filenames
timestamp = datetime.datetime.now().strftime("%Y%m%d_%H%M%S")

# extract_all_features returns a dict keyed by feature type. Iterate its items:
# iterating the dict itself yields only the key strings, and pairing by position
# would mislabel files whenever one feature type is missing.
FILE_PREFIXES = {"voice": "voc"}
for feature_type, df in features.items():
    name = FILE_PREFIXES.get(feature_type, feature_type)
    output_filename = f"{name}_features_{config_mode}_{config_window}_{timestamp}.csv"
    output_path = os.path.join(features_dir, output_filename)
    df.to_csv(output_path, index=False)
    display(name)
    display(df.head())
    logger.info(f"✅ Saved {name} features → {output_path}")

'voc'

Unnamed: 0,phone_no_m,total_calls,unique_contacts,call_dur_mean,call_dur_median,call_dur_max,call_dur_min,call_dur_sum,call_dur_std,voc_hour_mode,voc_active_hours,voc_day_mode,voc_active_days,call_out_in_ratio
0,4b16c23a42fdf352951e035f80b0e0b5d0c168cb1cfd90...,39,13,-0.016437,-0.188986,1.817377,-0.413299,-0.641024,0.476297,9,3,20,1,3.33333
1,19bf579bbdfdec0102e257b3cc8e72a6f05ce07b3e42a6...,1496,367,0.128434,-0.089291,10.796161,-0.413299,192.137445,0.820084,17,18,30,31,0.07163324
2,4b61d1f3c6bd529f2da9e368a3644be6dc7d916021b598...,66,62,0.583556,0.057137,10.32884,-0.407068,38.514728,1.893961,13,5,3,3,6600000.0
3,d79ef1d1471faf94e2fa596b6d98694f94975a104e9b80...,364,349,0.044024,-0.28245,10.796161,-0.413299,16.024791,1.111393,16,10,23,6,36400000.0
4,ac2eb79b2020d27b76031ba4b7ef4a243d1d1363f0c1c1...,148,130,-0.032665,-0.285565,8.995419,-0.413299,-4.83439,0.905813,10,13,15,4,3.624999


'sms'

Unnamed: 0,phone_no_m,sms_total,sms_unique_contacts,sms_active_hours,sms_peak_hour,sms_active_days,sms_peak_day,sms_send_recv_ratio
0,e96ba2776ede5a56fffed9c86483f87725ab0d2cb7591c...,1020,62,20,7,31,16,0.088581
1,d0c57e1d2099b0abc33b61cc5e8a3454e96c363b5888df...,1127,126,20,17,31,16,0.033945
2,f45d7d167ce6a2f1c21048be3183d84e47f4acce53d1c8...,386,30,19,11,30,9,0.005208
3,d84a13e80df3055c59350ce185c139962ed8d5ed3b221a...,1090,147,20,16,31,15,0.00276
4,b8370fac19ea042d8bf5c213c604eb57e3a6e13ed4a8e3...,5506,453,24,14,31,2,0.025517


'app'

Unnamed: 0,phone_no_m,app_months_active,app_total_flow,app_avg_flow,app_std_flow,app_unique_apps_mean,app_unique_apps_max
0,0460a1f64869b7263e73c6ded8390d3afdd9b5eda757bc...,1,122.411707,122.411707,0.0,14.0,14
1,082bae239f712d32c5bc3016692d23f383cd457fe9d085...,2,2.04767,1.023835,1.442879,12.0,22
2,1038273218f1a164b36c8fac6fc182dc5016f4d5daf20a...,8,279797.230919,34974.653865,12075.486174,185.0,207
3,19bf579bbdfdec0102e257b3cc8e72a6f05ce07b3e42a6...,1,0.0,0.0,0.0,1.0,1
4,1a89e1e932b33899f6a7e0b9fc07f481a722fc4e0c8c96...,1,8.375146,8.375146,0.0,20.0,20


'user'

Unnamed: 0,phone_no_m,city_name,county_name,idcard_cnt,label,arpu_mean,arpu_std,arpu_min,arpu_max,arpu_range,arpu_trend
0,9855c1ff2c8b1e8c5bff0c0675ee1902cae41b5ad47f61...,Unknown,Unknown,1,1,-0.68696,0.098126,-0.796008,-0.518438,0.27757,0.007548
1,1d58e47df06b78b9e5c250cb6c21886f3a0243d5810c77...,成都,成都直属部门,1,1,-0.650643,0.132109,-0.796008,-0.38661,0.409398,0.027981
2,dbe2713eec4aa9b53ea5d002ae7eb4280018e7e44045e5...,成都,成华分公司,3,1,-0.539423,0.086161,-0.652475,-0.431199,0.221276,0.004976
3,664c045c24f1f5d7e019621684cd97f5a7469d7111f891...,成都,高新分公司,5,1,-0.522587,0.537671,-0.796008,0.796542,1.592551,0.11713
4,0460a1f64869b7263e73c6ded8390d3afdd9b5eda757bc...,成都,高新分公司,5,1,-0.625656,0.2324,-0.796008,-0.094362,0.701646,0.013387
