Phase 0 — Orientation & Tooling (Week 1)

* **Local dev environment ready**
    Repo folder: /Software_development/Python/projects/wallet_analytics
* **API key obtained**
    Alchemy API key: L1x4RXcfGHuBfGhvZSwof
* **Git repo initialized**
    https://github.com/Dev-Uchiha/wallet_analytics.git


To start, i created a repository folder on my laptop called wallet_analytics where everything related to the project can be stored.
I then installed a python package manager called "uv". A package manager is a python (programming language) toolkit which allows the user to install all the needed tools (packages of code) to create a project. For e.g to do maths using python, i would need to install "numpy" which will already have all the code needed to do calculations.
I chose uv specifically because it is much faster at downloading packages than the standard package manager "pip" and it is also what we use at work.
I then used uv to create a virtual environment inside the wallet_analytics folder. This virtual environment is the place where all the installed tools(code) related to the projects will reside. Each project should have its own virtual environment for code cleanliness.

I then created an account with Alchemy to get their API key (An API is a way for one software to speak to another). This key allows me to get data about Ethereum wallets from the Alchemy website.

Lastly, I installed "git", which allows me to keep track of the different changes i make to my code. I also connected it to "github" so that i can store my code on the github website.

In [None]:
#Creating the virtual environment

cd users/name/project
uv venv .venv        # once
uv sync              # whenever deps change
source .venv/bin/activate

Phase 1 — Raw On-Chain Data Ingestion (Weeks 2–3)
* Normal transactions
* ERC-20 token transfers

**Deliverable**

* Script that pulls data for 1 wallet
* Raw tables saved locally
* Re-runnable without manual edits


I then created various functions (blocks of code that have a spefic purpose), to return the last 10 transactions from vitalik buterins eth wallet, displaying the transaction hash (a unique transaction id), eth amount and data. This can be verified by putting his wallet address in an eth blockchain explorer (a website which allows you to track wallet data) to see if the last 10 transactions are the same. 

https://etherscan.io/address/0xd8dA6BF26964aF9D7eEd9e03E53415D37aA96045

The next step was to bring back all the relevant wallet data and save them locally as .csv files. 
This included:

**transactions**
tx_hash
block_number
timestamp
from_address
to_address
value_eth
gas_used
gas_price
status

**token_transfers**
tx_hash
token_symbol
token_contract
from_address
to_address
value
timestamp

The differences between transactions and token transfers are:
    - a transaction moves a value of eth (the base asset) from one wallet to another (e.g sending someone 5 eth). It initiates an execution and shows what was sent and who sent it.
    - a token transfer changes the ownership of a token from one wallet to another (e.g sending someone an nft). It shows what did the contract say happened. 
    - all token transfers happen within transactions, but a token transfer isnt needed for a transaction
    - transactions are recorded in the transaction object() but token transfers are recorded in the logs (a description of what happened in the contract)


In [None]:
# Return the last 10 transactions from an ethereum wallet
# Return the most relevant data headers

import requests
import pandas as pd
import time
import os
import json
from datetime import datetime

ALCHEMY_API_KEY = "L1x4RXcfGHuBfGhvZSwof"  # ideally: os.getenv("ALCHEMY_API_KEY")
ALCHEMY_BASE_URL = "https://eth-mainnet.g.alchemy.com/v2"


def get_alchemy_json(method, params):
    """Make a request to Alchemy JSON-RPC."""
    url = f"{ALCHEMY_BASE_URL}/{ALCHEMY_API_KEY}"
    payload = {"id": 1, "jsonrpc": "2.0", "method": method, "params": params}

    r = requests.post(url, json=payload, headers={"Content-Type": "application/json"})
    r.raise_for_status()

    data = r.json()
    if "error" in data:
        raise Exception(f"Alchemy API error: {data['error']}")

    return data


def get_eth_balance(address):
    """Return ETH balance for an address (in ETH)."""
    data = get_alchemy_json("eth_getBalance", [address, "latest"])
    balance_wei = int(data["result"], 16)
    return balance_wei / 10**18


def get_last_10_transactions(address):
    """Return the last 10 *normal* (external ETH) transfers involving the address."""
    base_params = {
        "fromBlock": "0x0",
        "toBlock": "latest",
        "category": ["external"],
        "maxCount": hex(10),
        "excludeZeroValue": False,
        "withMetadata": True,
        "order": "desc"
    }

    all_txs = []

    # Transfers TO the address
    params_to = base_params.copy()
    params_to["toAddress"] = address
    data_to = get_alchemy_json("alchemy_getAssetTransfers", [params_to])
    all_txs.extend(data_to.get("result", {}).get("transfers", []))

    time.sleep(0.1)  # rate limiting

    # Transfers FROM the address
    params_from = base_params.copy()
    params_from["fromAddress"] = address
    data_from = get_alchemy_json("alchemy_getAssetTransfers", [params_from])
    all_txs.extend(data_from.get("result", {}).get("transfers", []))

    df = pd.DataFrame(all_txs)
    if df.empty:
        return df

    # Deduplicate and sort so we truly return the most recent 10.
    if "hash" in df.columns:
        df = df.drop_duplicates(subset=["hash"], keep="first")

    if "blockNum" in df.columns:
        df["_block_num"] = df["blockNum"].apply(
            lambda x: int(x, 16)
            if isinstance(x, str) and x.startswith("0x")
            else -1
        )
        df = df.sort_values("_block_num", ascending=False).drop(columns=["_block_num"])

    return df.head(10).reset_index(drop=True)


def list_relevant_data_headers(address):
    """Print the main field names returned by the API.

    Think of these as the "column headers" you can use once you load the data into a DataFrame.
    """
    params = {
        "fromBlock": "0x0",
        "toBlock": "latest",
        "category": ["external"],
        "maxCount": hex(10),
        "excludeZeroValue": False,
        "toAddress": address,
    }

    data = get_alchemy_json("alchemy_getAssetTransfers", [params])
    transfers = (data.get("result", {}) or {}).get("transfers", []) or []

    headers = set()
    for t in transfers:
        if isinstance(t, dict):
            headers.update(t.keys())

    print("Relevant data headers:", sorted(list(headers)))


# Example usage
if ALCHEMY_API_KEY:
    eth_address = "0xd8dA6BF26964aF9D7eEd9e03E53415D37aA96045"  # example address

    balance = get_eth_balance(eth_address)
    print(f"ETH Balance: {balance:.6f} ETH")

    list_relevant_data_headers(eth_address)

    last_10 = get_last_10_transactions(eth_address)
    print(f"Last 10 transactions fetched: {len(last_10)}")

    pd.set_option('display.max_colwidth', None)
    pd.set_option('display.float_format', '{:.15f}'.format)

    display(last_10[['uniqueId', 'value']].assign(timestamp=last_10['metadata'].apply(lambda x: x['blockTimestamp'])))



else:
    print("Please set your ALCHEMY_API_KEY to use the API")


## Phase 2 — Wallet-Level Analytics (Weeks 4–5)


### Derived Metrics (Core)

Per wallet:

* Total ETH received / sent
* Net ETH balance (from txs, not live balance)
* Gas spent (ETH)
* Transaction count
* Active days
* First / last activity

Token-level:

* Tokens interacted with
* Net token inflow/outflow
* Frequency by token

### Behavioral Metrics (Differentiator)

* Avg tx size
* Tx frequency over time
* % outgoing vs incoming
* Gas per transaction trend
* Dormant periods

### Output Tables

**wallet_summary**

```
wallet
first_tx_date
last_tx_date
tx_count
total_eth_sent
total_eth_received
net_eth
total_gas_spent
```

**wallet_activity**

```
wallet
period
tx_count
eth_sent
eth_received
gas_spent
```

**Deliverable**

* Clean analytical tables
* Documented metric definitions
* Deterministic transformations

In [14]:
# Transactions and token_transfers in project schema — display last 10, CSV saves last 100

from datetime import datetime

OUTPUT_DIR = "output"
DISPLAY_COUNT = 10
CSV_COUNT = 100


def _fetch_external_transfers(address, max_count):
    """Fetch up to max_count external (ETH) transfers for address; reuses get_alchemy_json from above."""
    base_params = {
        "fromBlock": "0x0",
        "toBlock": "latest",
        "category": ["external"],
        "maxCount": hex(max_count),
        "excludeZeroValue": False,
        "withMetadata": True,
        "order": "desc",
    }
    all_txs = []
    for key, val in [("toAddress", address), ("fromAddress", address)]:
        p = {**base_params, key: val}
        data = get_alchemy_json("alchemy_getAssetTransfers", [p])
        all_txs.extend(data.get("result", {}).get("transfers", []))
        time.sleep(0.1)
    df = pd.DataFrame(all_txs)
    if df.empty:
        return df
    if "hash" in df.columns:
        df = df.drop_duplicates(subset=["hash"], keep="first")
    if "blockNum" in df.columns:
        df["_bn"] = df["blockNum"].apply(
            lambda x: int(x, 16) if isinstance(x, str) and x.startswith("0x") else -1
        )
        df = df.sort_values("_bn", ascending=False).drop(columns=["_bn"])
    return df.head(max_count).reset_index(drop=True)


def get_transactions(address, save_csv=False):
    """
    Return normal (external ETH) transactions as a DataFrame with schema:
    tx_hash, block_number, timestamp, from_address, to_address, value_eth, gas_used, gas_price, status.
    Displays last 10; CSV (when save_csv=True) contains last 100. Saves to output/transactions_{date}.csv.
    """
    raw = _fetch_external_transfers(address, CSV_COUNT)
    if raw.empty:
        return pd.DataFrame(columns=[
            "tx_hash", "block_number", "timestamp", "from_address", "to_address",
            "value_eth", "gas_used", "gas_price", "status",
        ])

    rows = []
    for _, row in raw.iterrows():
        tx_hash = row.get("hash", "")
        block_hex = row.get("blockNum", "0x0")
        block_number = int(block_hex, 16) if isinstance(block_hex, str) and block_hex.startswith("0x") else 0
        meta = row.get("metadata") or {}
        timestamp = meta.get("blockTimestamp", "")
        from_addr = row.get("from", "")
        to_addr = row.get("to", "")
        val = row.get("value")
        if val is None:
            value_wei = 0
        elif isinstance(val, str) and str(val).startswith("0x"):
            value_wei = int(val, 16)
        else:
            value_wei = int(val) if val is not None else 0
        value_eth = value_wei / 10**18

        gas_used, gas_price, status = None, None, None
        try:
            tx_data = get_alchemy_json("eth_getTransactionByHash", [tx_hash])
            receipt = get_alchemy_json("eth_getTransactionReceipt", [tx_hash])
            time.sleep(0.05)
            if tx_data.get("result"):
                gas_price = int(tx_data["result"].get("gasPrice", "0x0"), 16)
            if receipt.get("result"):
                gas_used = int(receipt["result"].get("gasUsed", "0x0"), 16)
                status = "success" if int(receipt["result"].get("status", "0x0"), 16) == 1 else "failed"
        except Exception:
            pass

        rows.append({
            "tx_hash": tx_hash,
            "block_number": block_number,
            "timestamp": timestamp,
            "from_address": from_addr,
            "to_address": to_addr,
            "value_eth": value_eth,
            "gas_used": gas_used,
            "gas_price": gas_price,
            "status": status,
        })

    df = pd.DataFrame(rows)
    display(df.head(DISPLAY_COUNT))
    if save_csv:
        os.makedirs(OUTPUT_DIR, exist_ok=True)
        date_str = datetime.now().strftime("%Y-%m-%d")
        path = os.path.join(OUTPUT_DIR, f"transactions_{date_str}.csv")
        df.to_csv(path, index=False)
        print(f"Saved {len(df)} rows to {path}")
    return df


def _fetch_token_transfers(address, max_count):
    """Fetch up to max_count ERC-20 token transfers for address."""
    base_params = {
        "fromBlock": "0x0",
        "toBlock": "latest",
        "category": ["erc20"],
        "maxCount": hex(max_count),
        "excludeZeroValue": False,
        "withMetadata": True,
        "order": "desc",
    }
    all_txs = []
    for key, val in [("toAddress", address), ("fromAddress", address)]:
        p = {**base_params, key: val}
        data = get_alchemy_json("alchemy_getAssetTransfers", [p])
        all_txs.extend(data.get("result", {}).get("transfers", []))
        time.sleep(0.1)
    df = pd.DataFrame(all_txs)
    if df.empty:
        return df
    if "hash" in df.columns:
        df = df.drop_duplicates(subset=["hash"], keep="first")
    if "blockNum" in df.columns:
        df["_bn"] = df["blockNum"].apply(
            lambda x: int(x, 16) if isinstance(x, str) and x.startswith("0x") else -1
        )
        df = df.sort_values("_bn", ascending=False).drop(columns=["_bn"])
    return df.head(max_count).reset_index(drop=True)


def get_token_transfers(address, save_csv=False):
    """
    Return ERC-20 token transfers as a DataFrame with schema:
    tx_hash, token_symbol, token_contract, from_address, to_address, value, timestamp.
    Displays last 10; CSV (when save_csv=True) contains last 100. Saves to output/token_transfers_{date}.csv.
    """
    raw = _fetch_token_transfers(address, CSV_COUNT)
    if raw.empty:
        return pd.DataFrame(columns=[
            "tx_hash", "token_symbol", "token_contract", "from_address", "to_address",
            "value", "timestamp",
        ])

    rows = []
    for _, row in raw.iterrows():
        meta = row.get("metadata") or {}
        raw_contract = row.get("rawContract") or {}
        token_contract = raw_contract.get("address", "") or ""
        # Alchemy may return 'asset' as symbol (e.g. "USDC"); fallback to contract or "N/A"
        token_symbol = row.get("asset") or row.get("symbol") or token_contract or "N/A"
        if isinstance(token_symbol, dict):
            token_symbol = token_symbol.get("symbol", "N/A") or "N/A"
        value_raw = row.get("value")
        if value_raw is None:
            value = None
        elif isinstance(value_raw, (int, float)):
            value = float(value_raw)
        else:
            try:
                value = int(str(value_raw), 16) if str(value_raw).startswith("0x") else float(value_raw)
            except (ValueError, TypeError):
                value = value_raw
        rows.append({
            "tx_hash": row.get("hash", ""),
            "token_symbol": str(token_symbol),
            "token_contract": token_contract,
            "from_address": row.get("from", ""),
            "to_address": row.get("to", ""),
            "value": value,
            "timestamp": meta.get("blockTimestamp", ""),
        })

    df = pd.DataFrame(rows)
    display(df.head(DISPLAY_COUNT))
    if save_csv:
        os.makedirs(OUTPUT_DIR, exist_ok=True)
        date_str = datetime.now().strftime("%Y-%m-%d")
        path = os.path.join(OUTPUT_DIR, f"token_transfers_{date_str}.csv")
        df.to_csv(path, index=False)
        print(f"Saved {len(df)} rows to {path}")
    return df


# Example: display last 10 for both; CSV files contain last 100
if ALCHEMY_API_KEY:
    eth_address = "0xd8dA6BF26964aF9D7eEd9e03E53415D37aA96045"

    print("--- Last 10 transactions ---")
    tx_df = get_transactions(eth_address, save_csv=True)

    print("\n--- Last 10 token transfers ---")
    token_df = get_token_transfers(eth_address, save_csv=True)
else:
    print("Please set ALCHEMY_API_KEY (run the cell above first).")

Unnamed: 0,tx_hash,block_number,timestamp,from_address,to_address,value_eth,gas_used,gas_price,status
0,0x6d9e296f5880a91fc5d3791167aae70317d305dc275e4e6d15ad61098d5df2de,24398720,2026-02-06T15:36:35.000Z,0x81ea4f3505bf366f87a84f9cff55226cf4a4e98d,0xd8da6bf26964af9d7eed9e03e53415d37aa96045,0.0,31480,619195254,success
1,0x5b0d81bab4af24e07c797e193bdc988b80d232fa9614215efbebcbf589b1b898,24391001,2026-02-05T13:43:47.000Z,0xf8fc9a91349ebd2033d53f2b97245102f00aba96,0xd8da6bf26964af9d7eed9e03e53415d37aa96045,0.0,21062,2238889869,success
2,0x92434a4685614ce4d30b20e905587c7174909ff498def2447d20c695321b383d,24389810,2026-02-05T09:43:23.000Z,0xb063b093f7cd53165b4e7d32ff85803ae0572ea9,0xd8da6bf26964af9d7eed9e03e53415d37aa96045,0.0,21062,180950015,success
3,0x913636ad0b35bd643cdb81a165125a643bd76e8655440f960a8bef43bfee3c1a,24359893,2026-02-01T05:21:47.000Z,0xf8fc9a91349ebd2033d53f2b97245102f00aba96,0xd8da6bf26964af9d7eed9e03e53415d37aa96045,0.0,21062,1080380430,success
4,0x05ba4ddef39235564942c44ccf943500b982ce8fb38719491a5d683173a76391,24356474,2026-01-31T17:53:35.000Z,0xf8fc9a91349ebd2033d53f2b97245102f00aba96,0xd8da6bf26964af9d7eed9e03e53415d37aa96045,0.0,21062,9636431555,success
5,0x1fd61d1067de6e9724c09689ed385057c181d9f3c92c5f1188921f440aedff5a,24354284,2026-01-31T10:33:11.000Z,0xf8fcc291990c83536963c892b52b1442f00aba96,0xd8da6bf26964af9d7eed9e03e53415d37aa96045,0.0,21062,88422760,success
6,0xbd2a23052f6939a20a6b90305f383e9e9ca1c275e9d76e48c502a1121e52a662,24354282,2026-01-31T10:32:47.000Z,0xf8fc9a91349ebd2033d53f2b97245102f00aba96,0xd8da6bf26964af9d7eed9e03e53415d37aa96045,0.0,21062,1090140567,success
7,0xd1c84c52d163da645d84249419e65d6d7c401e7d6e35a049468c0fa6b99d3344,24341946,2026-01-29T17:16:23.000Z,0xd8da6bf26964af9d7eed9e03e53415d37aa96045,0x5256d6d94ed14667fa1661a99f5b142b1e051b8e,0.0,127089,932221462,success
8,0x47ae09af3f55c2cb3964b86d9c24463763d559f7f1af34b0ec6fedc5f70e099e,24341800,2026-01-29T16:47:11.000Z,0xd8da6bf26964af9d7eed9e03e53415d37aa96045,0x5256d6d94ed14667fa1661a99f5b142b1e051b8e,0.0,98818,1710473665,success
9,0xc170a260989474fb5b814236d223ba7ba7a78dced269cb41e2e49242c6db8386,24341581,2026-01-29T16:03:11.000Z,0xd8da6bf26964af9d7eed9e03e53415d37aa96045,0x5256d6d94ed14667fa1661a99f5b142b1e051b8e,0.0,83050,3938464499,success


Saved 100 rows to output/transactions_2026-02-07.csv

--- Last 10 token transfers ---


Unnamed: 0,tx_hash,token_symbol,token_contract,from_address,to_address,value,timestamp
0,0xabb13858c469d1c300974f1878e6db2ce12240cad567b31be0b3e9decea8a9c0,BTC2,0xbc2ecbe2195114b82f03680ed4270fa7008f3be0,0x24e70b286513300a8a89a6d6433cee18412b6ace,0xd8da6bf26964af9d7eed9e03e53415d37aa96045,6969.0,2026-02-06T22:51:47.000Z
1,0x02c1c34095d99162a10666a96acb57395c12cae36aa447675b0033b25e022469,0xe6da45b87b14733956c57ee9fc380388a4ae927d,0xe6da45b87b14733956c57ee9fc380388a4ae927d,0x014567348739248526856af7452db548dbf5cb6f,0xd8da6bf26964af9d7eed9e03e53415d37aa96045,,2026-02-06T20:22:59.000Z
2,0x4c59a06940e7a0069a86b8d20f13eb2d3175e33c4a5d87f5be3a17288b4bb25d,USDC,0xa0b86991c6218b36c1d19d4a2e9eb0ce3606eb48,0xf70da97812cb96acdf810712aa562db8dfa3dbef,0xd8da6bf26964af9d7eed9e03e53415d37aa96045,4703.710425,2026-02-06T18:53:47.000Z
3,0xae2abe21a62ba8a593566760b4bd6cdd39e68c5aed49691784ffc73966c6a85c,0xe6da45b87b14733956c57ee9fc380388a4ae927d,0xe6da45b87b14733956c57ee9fc380388a4ae927d,0x014567348739248526856af7452db548dbf5cb6f,0xd8da6bf26964af9d7eed9e03e53415d37aa96045,,2026-02-06T14:32:11.000Z
4,0x89fae3c739a6a7a7081cd9b6ce2ba707cf4de475a6664d74b921db1eddf0709d,ezSKATE,0xc12e4d31e92cedc1ad4c8c23dbce2c5f7cb52998,0xef750c1ca825441c9ab8368a5b1556a0273ba50f,0xd8da6bf26964af9d7eed9e03e53415d37aa96045,0.405227714273364,2026-02-06T04:46:47.000Z
5,0x86ea03e7834982d01957f67845dff8c68b0074df3606068684173fcebaf18536,BITCOIN,0x72e4f9f808c49a2a61de9c5896298920dc4eeea9,0x11b37e48a1973cf3efc69cc44b49a561ff5ac2f3,0xd8da6bf26964af9d7eed9e03e53415d37aa96045,0.41308808,2026-02-06T01:44:47.000Z
6,0xba2b8b31d7a4eba3c55a3a9d53a2e5dc24840388357b82e80b506272003be9eb,CTO,0xd714a9c3836edd56198576ebfbc8d23ea3cb405e,0x74c10e4bbe847d68ce02a9abb4bab8dbedfd4675,0xd8da6bf26964af9d7eed9e03e53415d37aa96045,1000000.0,2026-02-05T16:31:47.000Z
7,0x67852cbf184812f2e8ffa8e586af56da1018ee39de765f755b8b3255b2c4bbb0,ELON,0x761d38e5ddf6ccf6cf7c55759d5210750b5d60f3,0xb06896fbc28370a70b86bda84db0931f09f99ea9,0xd8da6bf26964af9d7eed9e03e53415d37aa96045,6.66e-05,2026-02-05T09:42:47.000Z
8,0xd5efd7dcf8f738cb3ffaa9ae76d3ef964d381b844e354b59b2f65b2c8997d69c,ELON,0x761d38e5ddf6ccf6cf7c55759d5210750b5d60f3,0xf250259b35bda8c3e1b3f0b46ce4cd9b503c865b,0xd8da6bf26964af9d7eed9e03e53415d37aa96045,6.66e-05,2026-02-05T09:40:35.000Z
9,0x10b679c984084d9687ef65372dabad0bcc0c2e6cb9c96868deab5bfc971bed7b,OSCAR,0xebb66a88cedd12bfe3a289df6dfee377f2963f12,0xb4594c71d4969dbabbaf39e9d09bfdf64c2ee060,0xd8da6bf26964af9d7eed9e03e53415d37aa96045,1000.0,2026-02-04T14:47:11.000Z


Saved 100 rows to output/token_transfers_2026-02-07.csv


In [11]:
# Phase 2 — Core metrics and Tableau-ready tables
# Depends on: get_alchemy_json, get_transactions, OUTPUT_DIR (from previous cells)

import os
import pandas as pd
from datetime import datetime, date



def _parse_ts(ts):
    """Parse timestamp string to date for grouping. Handles ISO and common formats."""
    if ts is None or (isinstance(ts, float) and pd.isna(ts)):
        return None
    s = str(ts).strip()
    if not s:
        return None
    try:
        if "T" in s:
            return datetime.fromisoformat(s.replace("Z", "+00:00")).date()
        return datetime.strptime(s[:10], "%Y-%m-%d").date()
    except Exception:
        return None


def _gas_spent_eth(row):
    """Gas spent in ETH from gas_used and gas_price (wei)."""
    gu, gp = row.get("gas_used"), row.get("gas_price")
    if pd.isna(gu) or pd.isna(gp) or gu is None or gp is None:
        return 0.0
    try:
        return (int(gu) * int(gp)) / 1e18
    except (TypeError, ValueError):
        return 0.0


# --- Core metric functions (from transactions DataFrame for one wallet) ---

def total_eth_sent(tx_df, wallet_address):
    """Total ETH sent by wallet (outgoing value_eth where from_address == wallet)."""
    if tx_df.empty or "from_address" not in tx_df.columns:
        return 0.0
    mask = tx_df["from_address"].str.lower() == wallet_address.lower()
    return float(tx_df.loc[mask, "value_eth"].sum())


def total_eth_received(tx_df, wallet_address):
    """Total ETH received by wallet (incoming value_eth where to_address == wallet)."""
    if tx_df.empty or "to_address" not in tx_df.columns:
        return 0.0
    mask = tx_df["to_address"].str.lower() == wallet_address.lower()
    return float(tx_df.loc[mask, "value_eth"].sum())


def net_eth_from_txs(tx_df, wallet_address):
    """Net ETH from transactions (received - sent)."""
    return total_eth_received(tx_df, wallet_address) - total_eth_sent(tx_df, wallet_address)


def total_gas_spent_eth(tx_df):
    """Total gas spent in ETH across all transactions."""
    if tx_df.empty:
        return 0.0
    return float(tx_df.apply(_gas_spent_eth, axis=1).sum())


def first_last_activity_dates(tx_df):
    """Return (first_tx_date, last_tx_date) as date objects or (None, None)."""
    if tx_df.empty or "timestamp" not in tx_df.columns:
        return None, None
    dates = tx_df["timestamp"].apply(_parse_ts).dropna()
    if dates.empty:
        return None, None
    return dates.min(), dates.max()


def active_days_count(tx_df):
    """Number of distinct days with at least one transaction."""
    if tx_df.empty or "timestamp" not in tx_df.columns:
        return 0
    dates = tx_df["timestamp"].apply(_parse_ts).dropna()
    return int(dates.nunique())


# --- Tableau output tables ---

def get_wallet_summary_table(address, save_csv=False):
    """
    Build wallet_summary table for one ETH wallet. Uses get_transactions() from previous cells.

    Columns: wallet, first_tx_date, last_tx_date, tx_count, total_eth_sent, total_eth_received, net_eth, total_gas_spent

    Optional: save_csv=True writes to output/wallet_summary_{date}.csv for Tableau.
    """
    try:
        get_transactions
    except NameError:
        raise NameError(
            "get_transactions is not defined. Run the Phase 1 cell "
            "'Transactions and token_transfers in project schema' first."
        )
    tx_df = get_transactions(address, save_csv=False)
    wallet = address

    if tx_df.empty:
        row = {
            "wallet": wallet,
            "first_tx_date": None,
            "last_tx_date": None,
            "tx_count": 0,
            "total_eth_sent": 0.0,
            "total_eth_received": 0.0,
            "net_eth": 0.0,
            "total_gas_spent": 0.0,
        }
    else:
        first_d, last_d = first_last_activity_dates(tx_df)
        row = {
            "wallet": wallet,
            "first_tx_date": first_d,
            "last_tx_date": last_d,
            "tx_count": len(tx_df),
            "total_eth_sent": total_eth_sent(tx_df, address),
            "total_eth_received": total_eth_received(tx_df, address),
            "net_eth": net_eth_from_txs(tx_df, address),
            "total_gas_spent": total_gas_spent_eth(tx_df),
        }

    summary_df = pd.DataFrame([row])
    if save_csv:
        os.makedirs(OUTPUT_DIR, exist_ok=True)
        date_str = datetime.now().strftime("%Y-%m-%d")
        path = os.path.join(OUTPUT_DIR, f"wallet_summary_{date_str}.csv")
        summary_df.to_csv(path, index=False)
        print(f"Saved wallet summary to {path}")
    return summary_df


def get_wallet_activity_table(address, start_date=None, end_date=None, save_csv=False):
    """
    Build wallet_activity table with daily aggregates. For Tableau.

    Columns: wallet, period (date), tx_count, eth_sent, eth_received, gas_spent.

    - start_date / end_date: optional date-like (str YYYY-MM-DD or date). If both None, returns daily activity for full history (default).
    - If provided, only transactions within [start_date, end_date] are included, still aggregated by day.
    """
    try:
        get_transactions
    except NameError:
        raise NameError(
            "get_transactions is not defined. Run the Phase 1 cell "
            "'Transactions and token_transfers in project schema' first."
        )
    tx_df = get_transactions(address, save_csv=False)
    wallet = address

    if tx_df.empty:
        out = pd.DataFrame(columns=["wallet", "period", "tx_count", "eth_sent", "eth_received", "gas_spent"])
        return out

    tx_df = tx_df.copy()
    tx_df["_date"] = tx_df["timestamp"].apply(_parse_ts)
    tx_df = tx_df.dropna(subset=["_date"])

    if start_date is not None or end_date is not None:
        if isinstance(start_date, str):
            start_date = datetime.strptime(start_date[:10], "%Y-%m-%d").date()
        if isinstance(end_date, str):
            end_date = datetime.strptime(end_date[:10], "%Y-%m-%d").date()
        if start_date is not None:
            tx_df = tx_df[tx_df["_date"] >= start_date]
        if end_date is not None:
            tx_df = tx_df[tx_df["_date"] <= end_date]

    if tx_df.empty:
        out = pd.DataFrame(columns=["wallet", "period", "tx_count", "eth_sent", "eth_received", "gas_spent"])
        return out

    tx_df["_gas_eth"] = tx_df.apply(_gas_spent_eth, axis=1)
    addr_lower = address.lower()

    def eth_sent_for_group(g):
        return g.loc[g["from_address"].str.lower() == addr_lower, "value_eth"].sum()

    def eth_received_for_group(g):
        return g.loc[g["to_address"].str.lower() == addr_lower, "value_eth"].sum()

    by_date = tx_df.groupby("_date", as_index=False).agg(
        tx_count=("tx_hash", "count"),
        gas_spent=("_gas_eth", "sum"),
    )
    sent_by_date = tx_df.groupby("_date").apply(eth_sent_for_group)
    received_by_date = tx_df.groupby("_date").apply(eth_received_for_group)
    by_date["eth_sent"] = by_date["_date"].map(sent_by_date).fillna(0)
    by_date["eth_received"] = by_date["_date"].map(received_by_date).fillna(0)
    by_date["wallet"] = wallet
    by_date = by_date.rename(columns={"_date": "period"})
    by_date = by_date[["wallet", "period", "tx_count", "eth_sent", "eth_received", "gas_spent"]]

    if save_csv:
        os.makedirs(OUTPUT_DIR, exist_ok=True)
        date_str = datetime.now().strftime("%Y-%m-%d")
        path = os.path.join(OUTPUT_DIR, f"wallet_activity_{date_str}.csv")
        by_date.to_csv(path, index=False)
        print(f"Saved wallet activity to {path}")
    return by_date


def export_to_hyper(summary_df, activity_df, path=None):
    """
    Create a Tableau .hyper file from the wallet_summary and wallet_activity DataFrames.
    No CSV is used; data is written directly from the DataFrames.
    Opens in Tableau via Data > New Data Source > Tableau Hyper.
    """
    try:
        from tableauhyperapi import (
            HyperProcess, Telemetry, Connection, CreateMode,
            TableDefinition, SqlType, Inserter, Date as HyperDate,
        )
    except ImportError as e:
        raise ImportError(
            "tableauhyperapi is required for export_to_hyper. Run: uv sync"
        ) from e

    def _to_hyper_val(val):
        if val is None or pd.isna(val):
            return None
        if isinstance(val, pd.Timestamp):
            val = val.date()
        elif hasattr(val, "date") and callable(getattr(val, "date", None)):
            val = val.date()
        if isinstance(val, date):
            return HyperDate.from_date(val)
        return val

    if path is None:
        os.makedirs(OUTPUT_DIR, exist_ok=True)
        date_str = datetime.now().strftime("%Y-%m-%d")
        path = os.path.join(OUTPUT_DIR, f"wallet_analytics_{date_str}.hyper")
    path = os.path.abspath(path)
    os.makedirs(os.path.dirname(path) or ".", exist_ok=True)

    summary_schema = TableDefinition(
        "wallet_summary",
        [
            TableDefinition.Column("wallet", SqlType.text()),
            TableDefinition.Column("first_tx_date", SqlType.date()),
            TableDefinition.Column("last_tx_date", SqlType.date()),
            TableDefinition.Column("tx_count", SqlType.big_int()),
            TableDefinition.Column("total_eth_sent", SqlType.double()),
            TableDefinition.Column("total_eth_received", SqlType.double()),
            TableDefinition.Column("net_eth", SqlType.double()),
            TableDefinition.Column("total_gas_spent", SqlType.double()),
        ],
    )
    activity_schema = TableDefinition(
        "wallet_activity",
        [
            TableDefinition.Column("wallet", SqlType.text()),
            TableDefinition.Column("period", SqlType.date()),
            TableDefinition.Column("tx_count", SqlType.big_int()),
            TableDefinition.Column("eth_sent", SqlType.double()),
            TableDefinition.Column("eth_received", SqlType.double()),
            TableDefinition.Column("gas_spent", SqlType.double()),
        ],
    )

    with HyperProcess(telemetry=Telemetry.SEND_USAGE_DATA_TO_TABLEAU, user_agent="wallet_analytics") as hyper:
        with Connection(hyper.endpoint, path, CreateMode.CREATE_AND_REPLACE) as connection:
            connection.catalog.create_table(summary_schema)
            connection.catalog.create_table(activity_schema)

            if not summary_df.empty:
                summary_rows = []
                for _, row in summary_df.iterrows():
                    summary_rows.append([
                        _to_hyper_val(row.get("wallet")),
                        _to_hyper_val(row.get("first_tx_date")),
                        _to_hyper_val(row.get("last_tx_date")),
                        int(row["tx_count"]) if pd.notna(row.get("tx_count")) else None,
                        float(row["total_eth_sent"]) if pd.notna(row.get("total_eth_sent")) else None,
                        float(row["total_eth_received"]) if pd.notna(row.get("total_eth_received")) else None,
                        float(row["net_eth"]) if pd.notna(row.get("net_eth")) else None,
                        float(row["total_gas_spent"]) if pd.notna(row.get("total_gas_spent")) else None,
                    ])
                with Inserter(connection, summary_schema) as inserter:
                    inserter.add_rows(summary_rows)
                    inserter.execute()

            if not activity_df.empty:
                activity_rows = []
                for _, row in activity_df.iterrows():
                    activity_rows.append([
                        _to_hyper_val(row.get("wallet")),
                        _to_hyper_val(row.get("period")),
                        int(row["tx_count"]) if pd.notna(row.get("tx_count")) else None,
                        float(row["eth_sent"]) if pd.notna(row.get("eth_sent")) else None,
                        float(row["eth_received"]) if pd.notna(row.get("eth_received")) else None,
                        float(row["gas_spent"]) if pd.notna(row.get("gas_spent")) else None,
                    ])
                with Inserter(connection, activity_schema) as inserter:
                    inserter.add_rows(activity_rows)
                    inserter.execute()

    print(f"Exported to {path} (open in Tableau: Data > New Data Source > Tableau Hyper)")
    return path


# --- Example: build tables for Tableau (run after previous cells) ---
try:
    _has_key = ALCHEMY_API_KEY
except NameError:
    _has_key = False
if _has_key:
    eth_address = "0xd8dA6BF26964aF9D7eEd9e03E53415D37aA96045"
    summary = get_wallet_summary_table(eth_address, save_csv=True)
    activity = get_wallet_activity_table(eth_address, save_csv=True)  # full history, daily
    # activity = get_wallet_activity_table(eth_address, start_date="2024-01-01", end_date="2024-12-31", save_csv=True)
    export_to_hyper(summary, activity)  # creates output/wallet_analytics_{date}.hyper from DataFrames
    display(summary)
    display(activity.head(20))

ModuleNotFoundError: No module named 'tableauhyperapi'