In [2]:
#!/usr/bin/env python3
# extract_from_logs.py


import pandas as pd
import json, ast, os
from typing import Any, Dict, List, Union

# ====== INPUT ======
# Directory that holds the CSV exports to process (machine-specific absolute path).
BASE_DIR = r"C:\Users\Multiplexon\Desktop\data\d7"
# Files that main() hands to process_one_file(); each one gets a sibling
# "<name>_from_logs.csv" output written next to it.
INPUT_FILES = [
    os.path.join(BASE_DIR, "entrypoint7_ALL.csv"),
    # add more files here if needed
]

# ---------- helpers: topics/data decode ----------
def _topic_to_address(topic_hex: str) -> str:
    if not isinstance(topic_hex, str):
        return ""
    h = topic_hex.lower()
    if h.startswith("0x"):
        h = h[2:]
    return "0x" + h[-40:] if len(h) >= 40 else ""

def _word_at(data_hex: str, i: int) -> str:
    if not isinstance(data_hex, str):
        return ""
    h = data_hex.lower()
    if h.startswith("0x"):
        h = h[2:]
    return h[64 * i : 64 * (i + 1)]

def _ihex(w: str) -> int:
    try:
        return int(w or "0", 16)
    except Exception:
        return 0

# ---------- robust parse logs ----------
def _parse_logs(cell: Any) -> Union[List, Dict]:
    """Trả list/dict từ cell 'logs' (JSON/escaped/literal)."""
    if isinstance(cell, (list, dict)):
        return cell
    if cell is None:
        return []
    s = str(cell)
    if not s or s.strip().lower() in ("nan", "none"):
        return []

    try:
        return json.loads(s)     # JSON trực tiếp
    except Exception:
        pass
    try:
        lit = ast.literal_eval(s)  # Python literal
        if isinstance(lit, (list, dict)):
            return lit
    except Exception:
        pass
    if s.startswith('"') and s.endswith('"'):  # JSON bị bọc chuỗi
        inner = s[1:-1]
        try:
            inner = inner.encode("utf-8").decode("unicode_escape")
            return json.loads(inner)
        except Exception:
            pass
    return []

def _extract_uoe_from_log_item(item: Dict[str, Any]) -> Dict[str, str]:
    """Extract the fields of one UserOperationEvent log object.

    Lookup order for each field:
      1. ``item["args"][field]``, then ``item[field]``;
      2. for ``sender`` / ``paymaster``: ``topics[2]`` / ``topics[3]``;
      3. for ``nonce``, ``success``, ``actualGasCost``, ``actualGasUsed``:
         the 1st..4th 32-byte word of ``item["data"]``.

    ``logIndex`` is read from ``item`` only.

    Returns a dict with the keys sender, paymaster, actualGasCost,
    actualGasUsed, nonce, success, logIndex; every value is a string and
    "" marks a field that could not be determined. ``success`` decoded from
    ``data`` is "1"/"0"; taken from args/item it is ``str()`` of that value.
    """
    args = item.get("args")
    if not isinstance(args, dict):
        args = {}

    def _pick(key: str) -> Any:
        # Only None/"" count as missing: chaining with `or` would throw away
        # legitimate falsy values such as nonce 0 or success False.
        for source in (args, item):
            value = source.get(key)
            if value is not None and value != "":
                return value
        return ""

    out = {
        "sender": _pick("sender"),
        "paymaster": _pick("paymaster"),
        "actualGasCost": _pick("actualGasCost"),
        "actualGasUsed": _pick("actualGasUsed"),
        "nonce": _pick("nonce"),
        "success": _pick("success"),
        "logIndex": item.get("logIndex", ""),
    }

    topics = item.get("topics")
    if not isinstance(topics, (list, tuple)):
        topics = []

    # Fallback from the indexed topics.
    if out["sender"] == "" and len(topics) >= 3:
        out["sender"] = _topic_to_address(topics[2])
    if out["paymaster"] == "" and len(topics) >= 4:
        out["paymaster"] = _topic_to_address(topics[3])

    # Fallback from the ABI-encoded data words.
    data_hex = item.get("data")
    if isinstance(data_hex, str) and data_hex:
        data_fields = ("nonce", "success", "actualGasCost", "actualGasUsed")
        for index, key in enumerate(data_fields):
            if out[key] != "":
                continue
            word = _word_at(data_hex, index)
            # A short or absent word means the value is unknown; do not
            # fabricate a "0" from a truncated payload.
            if len(word) != 64:
                continue
            number = _ihex(word)
            if key == "success":
                out[key] = "1" if number != 0 else "0"
            else:
                out[key] = str(number)

    # Normalise every value to a string.
    return {key: "" if value is None else str(value) for key, value in out.items()}

def _extract_uoe_fields_from_logs(logs_cell: Any) -> Dict[str, str]:
    """Pick the most complete UserOperationEvent entry from a 'logs' cell.

    The cell is parsed with ``_parse_logs``; a single dict is treated as a
    one-element list. Only dict entries whose ``event`` equals
    "UserOperationEvent" are considered. Each is scored by its number of
    non-empty extracted fields; the first entry with the highest score wins,
    and scanning stops as soon as an entry has all 7 fields filled.

    The returned dict always carries the 7 expected keys ("" when unknown).
    """
    field_names = (
        "sender", "paymaster", "actualGasCost", "actualGasUsed",
        "nonce", "success", "logIndex",
    )
    parsed = _parse_logs(logs_cell)
    entries = [parsed] if isinstance(parsed, dict) else parsed

    chosen: Dict[str, str] = {}
    top_score = -1
    if isinstance(entries, list):
        for entry in entries:
            is_uoe = isinstance(entry, dict) and str(entry.get("event", "")) == "UserOperationEvent"
            if not is_uoe:
                continue
            fields = _extract_uoe_from_log_item(entry)
            filled = len([value for value in fields.values() if value not in ("", None)])
            if filled > top_score:
                chosen, top_score = fields, filled
            if filled >= 7:  # every field present: stop early
                break

    for name in field_names:
        chosen.setdefault(name, "")
    return chosen

def process_one_file(path: str) -> None:
    """Append UserOperationEvent fields parsed from the 'logs' column to a CSV.

    Reads ``path`` with every column as a string, derives the columns
    sender, paymaster, actualGasCost, actualGasUsed, nonce, success and
    logIndex from the ``logs`` column (all empty when that column is absent),
    and writes the result to ``<path without extension>_from_logs.csv``.

    Args:
        path: Path of the input CSV file.
    """
    needed = ["sender", "paymaster", "actualGasCost", "actualGasUsed", "nonce", "success", "logIndex"]

    df = pd.read_csv(
        path, dtype=str, keep_default_na=False, na_filter=False,
        encoding="utf-8-sig", on_bad_lines="warn",
    )

    # Fields are derived from the logs column only.
    if "logs" in df.columns:
        records = [_extract_uoe_fields_from_logs(cell) for cell in df["logs"]]
        # Build the frame in one go: Series.apply(pd.Series) creates a Series
        # per row (very slow) and returns no columns at all for an empty input.
        exdf = pd.DataFrame(records, columns=needed)
        out_df = pd.concat([df.reset_index(drop=True), exdf], axis=1)
    else:
        out_df = df.copy()
        for c in needed:
            out_df[c] = ""

    base, _ = os.path.splitext(path)
    out_path = f"{base}_from_logs.csv"
    out_df.to_csv(out_path, index=False, encoding="utf-8-sig")
    print(f"✅ Saved: {out_path} (rows={len(out_df)}, cols={len(out_df.columns)})")

def main():
    """Run process_one_file on every existing path in INPUT_FILES.

    Paths that are not regular files are reported and skipped.
    """
    for csv_path in INPUT_FILES:
        if os.path.isfile(csv_path):
            process_one_file(csv_path)
        else:
            print(f"⚠️ Not found: {csv_path}")

# Run the extraction when this code is executed as a script (or as a notebook cell).
if __name__ == "__main__":
    main()


✅ Saved: C:\Users\Multiplexon\Desktop\data\d7\entrypoint7_ALL_from_logs.csv (rows=62766, cols=20)


In [None]:
#change length_byte 
#!/usr/bin/env python3
# keep_features_only.py
# Keep exactly the FEATURES + TARGET headers (in that order); drop every other column.

import os
import pandas as pd
import numpy as np

# ====== CONFIG ======
BASE_DIR  = r"C:\Users\Multiplexon\Desktop\data\d7"
FILE_NAME = "combine 2025_from_logs.csv"   # input produced by the extract_from_logs step
IN_PATH   = os.path.join(BASE_DIR, FILE_NAME)
# The output is written next to the input as "<input stem>_feats_target.csv".
OUT_PATH  = os.path.join(BASE_DIR, f"{os.path.splitext(FILE_NAME)[0]}_feats_target.csv")

# Columns kept in the output, in this order; TARGET is appended after them.
FEATURES = ['Original_len','Txn Fee','logIndex','actualGasCost','Blockno','DateTime_ts','nonce']
TARGET   = 'Gas Used'

def hex_len_bytes(x):
    """Return the byte length of a ``0x``-prefixed hex string.

    The value is converted with ``str`` and stripped; the length is the number
    of characters after the prefix divided by two (rounded down). Values
    without a ``0x``/``0X`` prefix, or that cannot be converted, give NaN.
    """
    try:
        text = str(x).strip()
        has_prefix = text.lower().startswith("0x")
    except Exception:
        return np.nan
    if not has_prefix:
        return np.nan
    # Two hex characters per byte, not counting the "0x" prefix.
    return (len(text) - 2) // 2

def to_number(x):
    """Convert a cell value to a number, or NaN when that is not possible.

    ``None`` and blank strings give NaN. Text starting with ``0x``/``0X`` is
    parsed as a base-16 integer; everything else goes through
    ``pd.to_numeric`` with ``errors="coerce"``.
    """
    if x is None:
        return np.nan
    text = str(x).strip()
    if not text:
        return np.nan
    try:
        if text[:2].lower() == "0x":
            result = int(text, 16)
        else:
            result = pd.to_numeric(text, errors="coerce")
    except Exception:
        result = np.nan
    return result

def main():
    """Reduce the extracted CSV to the numeric FEATURES + TARGET columns.

    Steps:
      1. Read IN_PATH with every column as a string (returns early with a
         message if the file does not exist).
      2. Derive ``Original_len`` from ``Original`` and ``DateTime_ts`` (Unix
         seconds) from ``DateTime (UTC)`` when those derived columns are absent.
      3. Convert every FEATURES/TARGET column that exists with ``to_number``.
      4. Write the existing FEATURES columns (in order) followed by TARGET to
         OUT_PATH and report any requested column that was not available.
    """
    if not os.path.isfile(IN_PATH):
        print(f"⚠️ Not found: {IN_PATH}")
        return

    df = pd.read_csv(IN_PATH, dtype=str, keep_default_na=False, na_filter=False, encoding="utf-8-sig")

    # Add the two derived columns when they are missing.
    if "Original_len" not in df.columns and "Original" in df.columns:
        df["Original_len"] = df["Original"].apply(hex_len_bytes)
    if "DateTime_ts" not in df.columns and "DateTime (UTC)" in df.columns:
        dt = pd.to_datetime(df["DateTime (UTC)"], errors="coerce", utc=True)
        # Subtract the epoch instead of reinterpreting the raw int64 values:
        # unparseable dates (NaT) then become NaN rather than a huge negative
        # number, and the deprecated Series.view is avoided.
        epoch = pd.Timestamp("1970-01-01", tz="UTC")
        df["DateTime_ts"] = (dt - epoch) // pd.Timedelta(seconds=1)

    # Coerce the columns we keep to numeric values.
    for col in dict.fromkeys(FEATURES + [TARGET]):
        if col in df.columns:
            df[col] = df[col].apply(to_number)

    # Keep only the requested headers, in the requested order.
    wanted = [c for c in FEATURES if c in df.columns]
    if TARGET in df.columns:
        wanted += [TARGET]

    out_df = df.reindex(columns=wanted)
    out_df.to_csv(OUT_PATH, index=False, encoding="utf-8-sig")

    print(f"✅ Saved: {OUT_PATH} (shape={out_df.shape})")
    missing = [c for c in FEATURES + [TARGET] if c not in wanted]
    if missing:
        print(f"ℹ️ Missing in input (không thể xuất): {missing}")

# Run the feature/target export when this code is executed as a script (or as a notebook cell).
if __name__ == "__main__":
    main()
