<a href="https://colab.research.google.com/github/farahBassoumi/defi-sandwich-attack-detection/blob/main/extracted_data_reduction_pipeline.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [None]:
import pandas as pd


In [None]:
# Load the address -> is_sybil label table for a quick inspection (comma is read_csv's default separator).
sybil_df = pd.read_csv("address_is_sybil.csv")

In [None]:
# Force the label column to a plain boolean dtype.
# NOTE(review): astype(bool) maps missing values (NaN) to True — confirm the file has no empty labels.
sybil_df = sybil_df.astype({"is_sybil": bool})

In [None]:
# Display every row except the last one (identical to head(-1)).
sybil_df.iloc[:-1]

Unnamed: 0,address,is_sybil
0,1ab1c070c7f1958dbfc5537340cd8056580c43fc,True
1,3c8cbd613857965267bcd4bdec7b794dd53969a0,True
2,2d93d0c4753e9b5c04a0e5b66a033ac71501fe6d,True
3,b10bd34199663ebfbf20d740959d773e34030b59,True
4,1e3a64ce9b4a573674284ab590b4fc538746fa21,True
...,...,...
220584,2286fe256579ee6c5074d3a4c6d0502094c1e47b,False
220585,3a46674c99e7be4d2f0aa5e46800f1cdb0ae7c50,False
220586,6d6a09aeec793c6ef96af5705a7b6a1ca4890af8,False
220587,6d762a25a43976d3a58eb3771ebe17e6faa33395,False


In [None]:
import pandas as pd
import numpy as np
from tqdm import tqdm

# ========== Step 0: Global Settings ==========
tqdm.pandas()  # registers progress_apply & friends on pandas objects
pd.options.display.max_columns = None  # never truncate columns when displaying frames

def report_memory(df, label=""):
    """Log the deep memory footprint of ``df`` in MB, prefixed with ``label``.

    ``deep=True`` makes pandas measure the contents of object columns rather
    than just their pointer size.
    """
    total_bytes = df.memory_usage(deep=True).sum()
    megabytes = total_bytes / (1024 * 1024)
    tqdm.write(f"{label}Memory usage: {megabytes:.2f} MB")

# ========== Step 1: Load CSV File ==========
def load_csv_with_progress(filepath="merged_13.csv", chunksize=10_000) -> pd.DataFrame:
    """Read a comma-separated file in chunks while showing a tqdm progress bar.

    Args:
        filepath: Path of the CSV file to read.
        chunksize: Rows per chunk passed to ``pd.read_csv``; also sets the
            progress-bar granularity.

    Returns:
        The whole file as one DataFrame with a fresh RangeIndex.
    """
    tqdm.write(f"========== Step 1: Loading CSV File ({filepath}) ==========")
    # Count physical lines only to size the progress bar. The handle is now
    # closed deterministically; binary mode skips text decoding entirely.
    with open(filepath, "rb") as fh:
        total_rows = max(sum(1 for _ in fh) - 1, 0)  # exclude header
    # Ceiling division: no phantom extra chunk when total_rows is an exact
    # multiple of chunksize (an estimate if records contain quoted newlines).
    total_chunks = (total_rows + chunksize - 1) // chunksize
    chunks = []
    with pd.read_csv(filepath, sep=',', chunksize=chunksize) as reader:
        for chunk in tqdm(reader, total=total_chunks, desc=f"📥 Reading {filepath}"):
            chunks.append(chunk)
    df = pd.concat(chunks, ignore_index=True)
    tqdm.write(f"✅ Loaded {len(df):,} rows from {filepath}")
    return df

# ========== Step 2: Optimize DataFrame ==========
def _looks_like_datetime(value):
    """Return True when ``pd.to_datetime`` can parse the scalar ``value``."""
    try:
        pd.to_datetime(value)
    except (ValueError, TypeError, OverflowError):
        return False
    return True


def optimize_dataframe(df, category_ratio=0.5):
    """Shrink ``df`` in place and return it.

    Steps:
      1. Drop the ``input`` column if present (done first so the category
         scan below never touches it).
      2. Downcast float columns with ``downcast='float'`` and the remaining
         numeric columns with ``downcast='integer'``.
      3. Convert object columns to ``category`` when ``nunique / len(df)`` is
         below ``category_ratio``. A column whose first non-null value parses
         as a datetime is left as object.
      4. Parse ``detecttime`` / ``detect_date`` (if present) to datetimes;
         unparseable values become NaT.
      5. Drop exact duplicate rows.

    Args:
        df: Frame to optimize; it is modified in place.
        category_ratio: Exclusive upper bound on the distinct-value ratio for
            converting an object column to ``category``.

    Returns:
        The same (mutated) DataFrame.
    """
    tqdm.write("========== Step 2: Optimizing DataFrame ==========")

    if 'input' in df.columns:
        tqdm.write("🧹 Dropping 'input' column")
        df.drop(columns=['input'], inplace=True)

    for col in tqdm(df.select_dtypes(include='number').columns, desc="🔧 Downcasting numeric types"):
        target = 'float' if pd.api.types.is_float_dtype(df[col]) else 'integer'
        df[col] = pd.to_numeric(df[col], downcast=target)

    n_rows = len(df)
    for col in tqdm(df.select_dtypes(include='object').columns, desc="🔧 Converting object columns to category"):
        non_null = df[col].dropna()
        if not non_null.empty and _looks_like_datetime(non_null.iloc[0]):
            # Keep datetime-like text as object so it can be parsed later.
            continue
        # n_rows == 0 would divide by zero; an empty frame has nothing to shrink.
        if n_rows and df[col].nunique() / n_rows < category_ratio:
            df[col] = df[col].astype('category')

    for dt_col in ('detecttime', 'detect_date'):
        if dt_col in df.columns:
            df[dt_col] = pd.to_datetime(df[dt_col], errors='coerce')

    tqdm.write("🧹 Dropping duplicate rows")
    df.drop_duplicates(inplace=True)

    return df

# ========== Step 3: Enrich Confirmed Transactions ==========
def _most_common_or_none(values):
    """Return the most frequent non-null value of ``values``, or None if all are null.

    ``Series.mode`` ignores nulls and returns its modes sorted, so ties resolve
    to the smallest value.
    """
    modes = values.mode()
    return modes.iloc[0] if not modes.empty else None


def _summarize_status_events(df, status, reason_col, count_name, reason_name):
    """Summarize rows of ``df`` whose ``status`` equals ``status``, one row per hash.

    Returns a DataFrame with columns ``hash``, ``count_name`` (number of such
    rows for the hash) and ``reason_name`` (see ``_most_common_or_none``
    applied to ``reason_col``). ``reason_col`` is read only when at least one
    row has this status.
    """
    events = df[df["status"] == status]
    if events.empty:
        return pd.DataFrame({
            "hash": pd.Series([], dtype=df["hash"].dtype),
            count_name: pd.Series([], dtype="int64"),
            reason_name: pd.Series([], dtype=object),
        })
    # observed=True: with a categorical hash column, only hashes present among
    # these events become groups.
    grouped = events.groupby("hash", observed=True)
    summary = pd.DataFrame({
        count_name: grouped.size(),
        reason_name: grouped[reason_col].agg(_most_common_or_none),
    })
    return summary.rename_axis("hash").reset_index()


def enrich_confirmed_transactions(df):
    """Collapse each transaction's lifecycle onto its confirmed row(s).

    For every ``status == "confirmed"`` row this adds:
      * ``was_evicted`` / ``was_rejected``: number of ``evicted`` / ``rejected``
        rows sharing its hash (0 if none);
      * ``drop_reason`` / ``rejection_reason``: the most common ``dropreason`` /
        ``rejectionreason`` among those rows (smallest on ties), else None;
      * ``time_pending``: seconds from the earliest ``pending`` ``detecttime``
        of the hash to the confirmed ``detecttime`` (NaN without a pending row).

    Evicted and rejected rows are removed, all other rows are kept, the
    enriched confirmed rows are appended and the result is sorted by
    ``detecttime``.

    Side effect: ``df['detecttime']`` is parsed to datetimes in place.
    """
    tqdm.write("========== Step 3: Enriching Confirmed Transactions ==========")

    df['detecttime'] = pd.to_datetime(df['detecttime'], errors='coerce')
    confirmed_df = df[df["status"] == "confirmed"].copy()

    # One vectorized group-by per status replaces the former per-row
    # get_group() lookups via progress_apply (slow on large frames, and it
    # failed when there were no confirmed rows at all).
    for status, reason_col, count_name, reason_name in (
        ("evicted", "dropreason", "was_evicted", "drop_reason"),
        ("rejected", "rejectionreason", "was_rejected", "rejection_reason"),
    ):
        summary = _summarize_status_events(df, status, reason_col, count_name, reason_name)
        confirmed_df = confirmed_df.merge(summary, on="hash", how="left")
        confirmed_df[count_name] = confirmed_df[count_name].fillna(0).astype("int64")
        reasons = confirmed_df[reason_name].astype(object)
        confirmed_df[reason_name] = reasons.where(reasons.notna(), None)

    # Earliest pending sighting per hash. Aggregating first keeps the merge
    # one-to-one: previously a hash seen as pending several times duplicated
    # its confirmed row once per sighting.
    pending_times = (
        df[df['status'] == 'pending']
        .groupby('hash', observed=True)['detecttime']
        .min()
        .rename('pending_time')
        .reset_index()
    )
    confirmed_df = confirmed_df.merge(pending_times, on='hash', how='left')

    confirmed_df['pending_time'] = pd.to_datetime(confirmed_df['pending_time'], errors='coerce')
    confirmed_df['time_pending'] = (confirmed_df['detecttime'] - confirmed_df['pending_time']).dt.total_seconds()
    confirmed_df.drop(columns=['pending_time'], inplace=True)

    final_df = df[~df['status'].isin(['confirmed', 'evicted', 'rejected'])].copy()
    final_df = pd.concat([final_df, confirmed_df], ignore_index=True)
    # Stable sort: rows sharing a timestamp keep their relative order.
    final_df.sort_values(by='detecttime', kind='stable', inplace=True)
    final_df.reset_index(drop=True, inplace=True)

    return final_df

# ========== Step 4: Deduplicate Confirmed Transactions ==========
def deduplicate_confirmed_transactions(df):
    """Keep only the earliest confirmed row per transaction hash.

    Non-confirmed rows are returned unchanged, followed by the surviving
    confirmed rows; the index is reset.
    """
    tqdm.write("========== Step 4: Deduplicating Confirmed Transactions ==========")
    is_confirmed = df['status'] == 'confirmed'
    confirmed_df = df[is_confirmed].copy()
    confirmed_df['detecttime'] = pd.to_datetime(confirmed_df['detecttime'], errors='coerce')
    # A stable sort makes keep='first' deterministic: among confirmations with
    # the same timestamp, the one appearing first in ``df`` wins. NaT sorts
    # last, so an unparseable time is kept only if no timed row exists.
    confirmed_df = (
        confirmed_df
        .sort_values(by='detecttime', kind='stable')
        .drop_duplicates(subset='hash', keep='first')
    )

    df = pd.concat([df[~is_confirmed], confirmed_df], ignore_index=True)

    tqdm.write("✅ Confirmed transactions deduplicated.")
    return df

# ========== Step 5: Add Sybil Label ==========
def _normalize_address(addresses):
    """Lower-case address strings and strip a leading ``0x`` so both sides compare equal."""
    return addresses.str.lower().str.replace(r"^0x", "", regex=True)


def add_sybil_label(df, sybil_df):
    """Attach ``is_sybil`` to each row of ``df`` by sender address; drop unlabeled rows.

    ``df['fromaddress']`` is matched against ``sybil_df['address']`` after
    normalization (lower-case, no ``0x`` prefix). If an address appears several
    times in ``sybil_df``, its first label is used, so transaction rows are
    never multiplied. Neither input frame is modified.

    This step is best-effort: any failure is logged and the input ``df`` is
    returned unchanged (unlabeled).

    Returns:
        A new frame holding only the rows whose sender has a label, with an
        ``is_sybil`` column and a fresh RangeIndex.
    """
    tqdm.write("========== Step 5: Merging is_sybil Labels ==========")
    try:
        labels = pd.DataFrame({
            'address_clean': _normalize_address(sybil_df['address']),
            'is_sybil': sybil_df['is_sybil'],
        }).drop_duplicates(subset='address_clean', keep='first')
        label_by_address = labels.set_index('address_clean')['is_sybil']

        is_sybil = _normalize_address(df['fromaddress']).map(label_by_address)
        keep = is_sybil.notna()
        labeled = df[keep].copy()
        labeled['is_sybil'] = is_sybil[keep]
        labeled.reset_index(drop=True, inplace=True)
    except Exception as e:  # deliberate best-effort boundary: log, keep input intact
        tqdm.write(f"⚠️ Failed to merge is_sybil: {e}")
        return df

    tqdm.write(f"🧹 Dropped {len(df) - len(labeled)} rows without sybil label.")
    return labeled

# ========== Step 6: Save Final CSV ==========
def save_csv(df, prefix="df_labeled"):
    """Write ``df`` without its index to ``<prefix>.csv`` and log the path."""
    tqdm.write("========== Step 6: Saving Output CSV ==========")
    target = f"{prefix}.csv"
    df.to_csv(target, index=False)
    tqdm.write(f"✅ Saved as {target}")

# ========== Master Callable Function ==========
def run_full_pipeline(filepath: str, sybil_file: "str | None" = None, is_training: bool = True):
    """Run the whole reduction pipeline on ``filepath`` and write the result as CSV.

    The stages run in this order:

    1. Load the file in chunks.
    2. Optimize dtypes.
    3. Enrich confirmed transactions.
    4. Deduplicate confirmed transactions.
    5. Drop bookkeeping columns.

    Then the output depends on ``is_training``:

    * Training: label senders with ``is_sybil`` and save ``<stem>_labeled_sybil.csv``.
    * Prediction: save ``<stem>_for_prediction.csv``.

    ``<stem>`` is ``filepath`` without its final extension.

    Args:
        filepath: Input CSV path.
        sybil_file: CSV with ``address`` and ``is_sybil`` columns. Required
            when ``is_training`` is true.
        is_training: True to label and save for training; False to save a
            prediction input file.

    Raises:
        ValueError: If ``is_training`` is true and ``sybil_file`` is missing.
            The check runs before any data is loaded.
    """
    import os  # local import keeps this cell self-contained

    # Fail fast instead of after the expensive load/enrich stages.
    if is_training and not sybil_file:
        raise ValueError("You must provide sybil_file for training.")
    # splitext strips only the final extension; the old split('.')[0]
    # truncated paths with dots elsewhere (e.g. "./data/x.csv" -> "").
    stem = os.path.splitext(filepath)[0]

    tqdm.write("========== 🏁 Starting Full Pipeline ==========")

    df = load_csv_with_progress(filepath)
    report_memory(df, "Original ")

    df = optimize_dataframe(df)
    report_memory(df, "After Optimization ")

    df = enrich_confirmed_transactions(df)
    report_memory(df, "After Enrichment ")

    df = deduplicate_confirmed_transactions(df)
    report_memory(df, "After Deduplication ")

    df.drop(columns=['reorg', 'replace', 'dropreason', 'rejectionreason', 'network'], inplace=True, errors='ignore')

    if is_training:
        sybil_df = pd.read_csv(sybil_file)
        df = add_sybil_label(df, sybil_df)
        save_csv(df, prefix=f"{stem}_labeled_sybil")
    else:
        save_csv(df, prefix=f"{stem}_for_prediction")

    tqdm.write("✅ Full Pipeline Complete")


In [None]:
# Training run on example file names: label each transaction's sender with is_sybil.
# Use is_training=False (no sybil_file needed) to produce a prediction input file instead.
# NOTE(review): the labels inspected earlier come from address_is_sybil.csv — confirm
# sybil_addresses.csv is the intended label file.
run_full_pipeline(
    "merged_13.csv",
    sybil_file="sybil_addresses.csv",
    is_training=True,
)


In [None]:
# Prediction run: no sybil labels needed; writes prediction_df_for_day_for_prediction.csv.
# (Previously called the undefined run_sybil_processing_pipeline.)
run_full_pipeline(filepath="prediction_df_for_day.csv", is_training=False)
