In [14]:
%reload_ext autoreload
%autoreload 2

import warnings

import polars as pl

# Silence all warnings for the whole session. The blanket filter already covers
# FutureWarning; the explicit category filter is kept so the intent stays visible.
warnings.simplefilter("ignore")
warnings.simplefilter("ignore", category=FutureWarning)

# Render at most 10 rows and 20 columns whenever a DataFrame is displayed.
_ = pl.Config.set_tbl_rows(10).set_tbl_cols(20)

In [15]:
from sdpc.data import (
    joined_dex_swaps_df,
    joined_token_transfers_df,
    joined_train_df,
    joined_transactions_df,
    test_data_df,
    wallet_addresses_df,
)

# Load each source table once through the project loaders imported above.
addresses: pl.DataFrame = wallet_addresses_df()
train_df: pl.DataFrame = joined_train_df()
transactions_df: pl.DataFrame = joined_transactions_df()
dex_swaps_df: pl.DataFrame = joined_dex_swaps_df()
token_transfers_df: pl.DataFrame = joined_token_transfers_df()
test_df: pl.DataFrame = test_data_df()

# Features start with addresses: later cells add columns to this frame, either
# through `with_columns` or through left joins keyed on "address".
features_df: pl.DataFrame = addresses

In [16]:
# Distinct wallet addresses present in the train and test datasets.
train_addresses = train_df.select(pl.col("address").unique())
test_addresses = test_df.select(pl.col("address").unique())

print(
    f"Number of unique addresses in train dataset: {train_addresses.height}",
    f"Number of unique addresses in test dataset: {test_addresses.height}",
    sep="\n",
)

# Addresses found in both datasets: both sides are already distinct, so the
# inner join yields one row per shared address.
overlap = train_addresses.join(other=test_addresses, on="address", how="inner")
print(f"Number of addresses that appear in both train and test: {overlap.height}")


Number of unique addresses in train dataset: 99067
Number of unique addresses in test dataset: 20369
Number of addresses that appear in both train and test: 2


## Flipside Data

In [17]:
# Address-level labels from the Flipside export. The left join keeps every
# wallet row; wallets without a label get nulls in the added columns.
# NOTE(review): assumes at most one label row per address -- duplicates would
# multiply rows in features_df. TODO confirm.
flipside_addresses_labels: pl.DataFrame = pl.read_parquet(
    source="../data/external/flipside_address_labels.parquet"
)

features_df = features_df.join(
    other=flipside_addresses_labels, on="address", how="left"
)

In [18]:
flipside_contracts_labels: pl.DataFrame = pl.read_parquet(
    source="../data/external/flipside_contracts_data.parquet"
)

# Series of contract addresses and of the addresses that created them.
flipside_contracts = flipside_contracts_labels["address"]
flipside_contracts_creators = flipside_contracts_labels["creator_address"]

# Two boolean membership flags per wallet: is it listed as a contract, and is
# it listed as the creator of any contract.
_wallet_address = pl.col("address")
features_df = features_df.with_columns(
    flipside_is_contract=_wallet_address.is_in(flipside_contracts),
    flipside_is_contract_creator=_wallet_address.is_in(flipside_contracts_creators),
)


## Known Sybil List Hits

In [19]:
# Keep only the wallet column of the CSV, renamed to "address", and drop rows
# whose value does not start with "0x". `ignore_errors=True` lets the reader
# continue past values it cannot parse.
zk_cluster_list = (
    pl.read_csv(source="../data/external/zk_cluster_list.csv", ignore_errors=True)
    .select(address=pl.col("Wallet Address"))
    .filter(pl.col("address").str.starts_with("0x"))
)

# Boolean flag: does the wallet appear in the zk cluster list?
features_df = features_df.with_columns(
    zk_cluster_list_hit=pl.col("address").is_in(zk_cluster_list["address"])
)

In [20]:
# Load the Layer Zero wallet list and pull out its address column.
layer_zero_wallet_list = pl.read_csv(
    source="../data/external/layer_zero_wallet_list.csv"
)
layer_zero_wallet_addresses = layer_zero_wallet_list["ADDRESS"]

# Boolean flag: does the wallet appear in the Layer Zero wallet list?
# NOTE(review): membership is an exact string match -- confirm both sides use
# the same letter casing for addresses.
features_df = features_df.with_columns(
    layer_zero_wallet_list_hit=pl.col("address").is_in(layer_zero_wallet_addresses)
)

In [21]:
# Raw LZ provisional sybil list.
# NOTE(review): the following cell reads the same file again and rebinds this
# name, so this load is redundant and could be removed.
lz_provisional_sybil_list = pl.read_parquet(
    source="../data/external/lz_provisional_sybil_list.parquet"
)

In [23]:
# Read the LZ provisional sybil list, namespace its metadata columns and tag
# every listed address with a constant True flag.
# NOTE(review): "lz_probisional_list_cluster" is misspelt ("probisional"). The
# name is kept because it becomes part of the written feature schema; rename it
# together with any downstream consumer.
lz_provisional_sybil_list = (
    pl.read_parquet("../data/external/lz_provisional_sybil_list.parquet")
    .rename({
        "source": "lz_provisional_list_source",
        "cluster": "lz_probisional_list_cluster",
    })
    .with_columns(pl.lit(True).alias("lz_provisional_sybil_list_hit"))
)

# The left join leaves the flag null for wallets that are not on the list.
# Fill those with False so this column is a plain True/False indicator like the
# other `*_hit` columns built with `is_in`.
# NOTE(review): assumes one row per address in the list -- duplicates would
# multiply rows in features_df. TODO confirm.
features_df = features_df.join(
    lz_provisional_sybil_list,
    on="address",
    how="left",
).with_columns(pl.col("lz_provisional_sybil_list_hit").fill_null(False))


## Sybil Labeling

In [25]:
# Attach the training label to each wallet. Addresses absent from train_df
# keep a null label.
# NOTE(review): assumes train_df has one row per address -- TODO confirm.
_train_labels = train_df.select("address", "label")
features_df = features_df.join(other=_train_labels, on="address", how="left")

## Transactions

In [26]:
# Raw transaction columns to keep; each is exposed under its lower-case name.
_TX_SOURCE_COLUMNS = (
    "BLOCK_NUMBER",
    "BLOCK_TIMESTAMP",
    "TX_HASH",
    "FROM_ADDRESS",
    "TO_ADDRESS",
    "VALUE",
    "TX_FEE",
    "GAS_PRICE",
    "GAS_LIMIT",
    "GAS_USED",
    "INPUT_DATA",
    "NETWORK",
)

# NOTE: this rebinds `transactions_df`. Re-running the cell without reloading
# the raw frame fails, because the upper-case columns no longer exist.
transactions_df = (
    transactions_df.select(
        [pl.col(name).alias(name.lower()) for name in _TX_SOURCE_COLUMNS]
    )
    # Sender-side wallet features keep their original column names.
    .join(features_df, left_on="from_address", right_on="address", how="left")
    # Recipient-side wallet features collide with the sender-side ones and
    # therefore receive the "_to" suffix.
    .join(
        features_df,
        left_on="to_address",
        right_on="address",
        how="left",
        suffix="_to",
    )
)

In [28]:
def _counterparty_aggregations(side: str, suffix: str = "") -> list[pl.Expr]:
    """Build the aggregations that describe one side of a transaction.

    Args:
        side: "from" or "to"; selects the `<side>_address` column.
        suffix: Suffix carried by that side's joined wallet-feature columns
            ("" for sender features, "_to" for recipient features).

    Returns:
        Expressions for the distinct address count, the distinct Flipside
        address-name count, and the mean and sum of the Flipside contract flag.
    """
    contract_flag = pl.col(f"flipside_is_contract{suffix}")
    return [
        pl.col(f"{side}_address").n_unique().alias(f"unique_{side}_addresses"),
        pl.col(f"flipside_address_name{suffix}")
        .n_unique()
        .alias(f"address_name_count{suffix}"),
        contract_flag.mean().alias(f"avg_flipside_is_contract{suffix}"),
        contract_flag.sum().alias(f"flipside_is_contract_count{suffix}"),
    ]


_block_time = pl.col("block_timestamp")
_tx_value = pl.col("value")
_tx_fee = pl.col("tx_fee")

# Aggregations shared by every transaction grouping below.
# TODO: community and passport-score aggregations are currently disabled.
common_aggregations = [
    pl.col("block_number").n_unique().alias("unique_block_numbers"),
    _block_time.min().alias("min_block_timestamp"),
    _block_time.max().alias("max_block_timestamp"),
    pl.col("tx_hash").n_unique().alias("unique_tx_hashes"),
    _tx_value.sum().alias("total_value"),
    _tx_value.mean().alias("avg_value"),
    _tx_value.max().alias("max_value"),
    _tx_value.min().alias("min_value"),
    _tx_fee.sum().alias("total_tx_fee"),
    _tx_fee.mean().alias("avg_tx_fee"),
    # For each gas column: total then average, both cast to Int64.
    *[
        gas_stat
        for gas_col in ("gas_price", "gas_limit", "gas_used")
        for gas_stat in (
            pl.col(gas_col).sum().cast(pl.Int64).alias(f"total_{gas_col}"),
            pl.col(gas_col).mean().cast(pl.Int64).alias(f"avg_{gas_col}"),
        )
    ],
    pl.col("network").n_unique().alias("unique_networks"),
]

# Sender-side descriptors (used when grouping by recipient) and recipient-side
# descriptors (used when grouping by sender).
from_aggregations = _counterparty_aggregations("from")
to_aggregations = _counterparty_aggregations("to", suffix="_to")

# Row filters on the joined training label of the recipient / sender wallet.
# NOTE(review): presumably label == 1 marks a sybil, as the output names
# suggest -- confirm against the training data definition.
_recipient_is_sybil = pl.col("label_to") == 1
_sender_is_sybil = pl.col("label") == 1

# One row per sending wallet: shared metrics, recipient descriptors and counts
# of transactions / distinct recipients flagged by the recipient label.
from_all_metrics_df = (
    transactions_df.group_by("from_address")
    .agg(
        common_aggregations
        + to_aggregations
        + [
            pl.col("tx_hash")
            .filter(_recipient_is_sybil)
            .n_unique()
            .alias("num_transactions_to_sybil"),
            pl.col("to_address")
            .filter(_recipient_is_sybil)
            .n_unique()
            .alias("num_unique_to_sybil_addresses"),
        ]
    )
    .rename({"from_address": "address"})
)

# One row per receiving wallet: the mirror image of the frame above.
to_all_metrics_df = (
    transactions_df.group_by("to_address")
    .agg(
        common_aggregations
        + from_aggregations
        + [
            pl.col("tx_hash")
            .filter(_sender_is_sybil)
            .n_unique()
            .alias("num_transactions_from_sybil"),
            pl.col("from_address")
            .filter(_sender_is_sybil)
            .n_unique()
            .alias("num_unique_from_sybil_addresses"),
        ]
    )
    .rename({"to_address": "address"})
)

def _per_network_metrics(own_side: str, other_side: str) -> pl.DataFrame:
    """Aggregate transactions per (wallet, network) and pivot networks to columns.

    Args:
        own_side: "from" or "to"; the `<own_side>_address` column is the wallet
            the metrics are computed for.
        other_side: The opposite side, whose distinct addresses are counted.

    Returns:
        A frame with one row per wallet, keyed by "address", holding every
        aggregated metric once per network value.
    """
    wallet_col = f"{own_side}_address"
    counterparty_col = f"{other_side}_address"
    return (
        transactions_df.group_by([wallet_col, "network"])
        .agg(
            *common_aggregations,
            pl.col(counterparty_col).n_unique().alias(f"unique_{other_side}_addresses"),
        )
        .pivot(on="network", index=wallet_col)
        .rename({wallet_col: "address"})
    )


from_network_transactions_metrics_df = _per_network_metrics("from", "to")
to_network_transactions_metrics_df = _per_network_metrics("to", "from")


## DEX Swaps

In [29]:
# Raw swap columns to keep; each is exposed under its lower-case name.
_DEX_SOURCE_COLUMNS = (
    "BLOCK_NUMBER",
    "BLOCK_TIMESTAMP",
    "TX_HASH",
    "ORIGIN_FROM_ADDRESS",
    "ORIGIN_TO_ADDRESS",
    "CONTRACT_ADDRESS",
    "POOL_NAME",
    "AMOUNT_IN_USD",
    "AMOUNT_OUT_USD",
    "SENDER",
    "TX_TO",
    "PLATFORM",
    "TOKEN_IN",
    "TOKEN_OUT",
    "SYMBOL_IN",
    "SYMBOL_OUT",
    "NETWORK",
)
# USD amounts are cast to Int64.
# NOTE(review): this cast does not pass `wrap_numerical=True`, unlike the
# token-transfer amount cast -- confirm the difference is intended.
_DEX_INT_COLUMNS = frozenset({"AMOUNT_IN_USD", "AMOUNT_OUT_USD"})


def _dex_column(name: str) -> pl.Expr:
    """Select a raw swap column, casting USD amounts, under its lower-case name."""
    column = pl.col(name)
    if name in _DEX_INT_COLUMNS:
        column = column.cast(pl.Int64)
    return column.alias(name.lower())


# NOTE: this rebinds `dex_swaps_df`. Re-running the cell without reloading the
# raw frame fails, because the upper-case columns no longer exist.
dex_swaps_df = (
    dex_swaps_df.select([_dex_column(name) for name in _DEX_SOURCE_COLUMNS])
    # Features of the originating wallet keep their original column names.
    .join(features_df, left_on="origin_from_address", right_on="address", how="left")
    # Features of the origin-to address collide and receive the "_to" suffix.
    .join(
        features_df,
        left_on="origin_to_address",
        right_on="address",
        how="left",
        suffix="_to",
    )
)

In [30]:
# Aggregations shared by both DEX-swap groupings below.
# TODO: community aggregations are currently disabled.
common_aggregations = [
    pl.col("tx_hash").n_unique().alias("unique_tx_hashes"),
    pl.col("contract_address").n_unique().alias("unique_contract_addresses"),
    pl.col("block_timestamp").min().alias("min_block_timestamp"),
    pl.col("block_timestamp").max().alias("max_block_timestamp"),
    pl.col("pool_name").n_unique().alias("unique_pool_names"),
    pl.col("amount_in_usd").sum().alias("total_amount_in_usd"),
    pl.col("amount_in_usd").mean().alias("avg_amount_in_usd"),
    pl.col("amount_in_usd").max().alias("max_amount_in_usd"),
    pl.col("amount_in_usd").min().alias("min_amount_in_usd"),
    pl.col("amount_out_usd").sum().alias("total_amount_out_usd"),
    pl.col("amount_out_usd").mean().alias("avg_amount_out_usd"),
    pl.col("amount_out_usd").max().alias("max_amount_out_usd"),
    pl.col("amount_out_usd").min().alias("min_amount_out_usd"),
    pl.col("platform").n_unique().alias("unique_platforms"),
    # `value_counts` is unsorted by default, so taking its first entry would
    # return an arbitrary platform. `sort=True` orders by descending count so
    # the first entry really is the most frequent one (ties stay arbitrary).
    pl.col("platform")
    .value_counts(sort=True)
    .head(1)
    .struct.field("platform")
    .first()
    .alias("most_common_platform"),
]

# Distinct counterparties seen on each side of a swap's originating transaction.
from_aggregations = [
    pl.col("origin_from_address").n_unique().alias("unique_origin_from_addresses"),
]
to_aggregations = [
    pl.col("origin_to_address").n_unique().alias("unique_origin_to_addresses"),
]

# One row per originating wallet.
dex_from_all_metrics_df = (
    dex_swaps_df.group_by("origin_from_address")
    .agg(common_aggregations + to_aggregations)
    .rename({"origin_from_address": "address"})
)

# One row per origin-to address.
dex_to_all_metrics_df = (
    dex_swaps_df.group_by("origin_to_address")
    .agg(common_aggregations + from_aggregations)
    .rename({"origin_to_address": "address"})
)

## Token Transfers

In [31]:
# Raw transfer columns to keep; each is exposed under its lower-case name.
_TRANSFER_SOURCE_COLUMNS = (
    "BLOCK_NUMBER",
    "BLOCK_TIMESTAMP",
    "TX_HASH",
    "ORIGIN_FROM_ADDRESS",
    "ORIGIN_TO_ADDRESS",
    "CONTRACT_ADDRESS",
    "FROM_ADDRESS",
    "TO_ADDRESS",
    "AMOUNT_USD",
    "SYMBOL",
    "NETWORK",
)


def _transfer_column(name: str) -> pl.Expr:
    """Select a raw transfer column under its lower-case name.

    AMOUNT_USD is additionally cast to Int64 with `wrap_numerical=True`.
    """
    column = pl.col(name)
    if name == "AMOUNT_USD":
        column = column.cast(pl.Int64, wrap_numerical=True)
    return column.alias(name.lower())


# NOTE: this rebinds `token_transfers_df`. Re-running the cell without
# reloading the raw frame fails, because the upper-case columns no longer exist.
token_transfers_df = (
    token_transfers_df.select(
        [_transfer_column(name) for name in _TRANSFER_SOURCE_COLUMNS]
    )
    # Sender-side wallet features keep their original column names.
    .join(features_df, left_on="from_address", right_on="address", how="left")
    # Recipient-side wallet features collide and receive the "_to" suffix.
    .join(
        features_df,
        left_on="to_address",
        right_on="address",
        how="left",
        suffix="_to",
    )
)


In [32]:
# TODO: Add more aggregations (e.g: total_for_most_common_symbol, )

# Aggregations shared by both token-transfer groupings below.
common_aggregations = [
    pl.col("tx_hash").n_unique().alias("unique_tx_hashes"),
    pl.col("contract_address").n_unique().alias("unique_contract_addresses"),
    pl.col("block_timestamp").min().alias("min_block_timestamp"),
    pl.col("block_timestamp").max().alias("max_block_timestamp"),
    pl.col("symbol").n_unique().alias("unique_symbols"),
    pl.col("amount_usd").sum().alias("total_amount_usd"),
    pl.col("amount_usd").mean().alias("avg_amount_usd"),
    pl.col("amount_usd").max().alias("max_amount_usd"),
    pl.col("amount_usd").min().alias("min_amount_usd"),
    pl.col("network").n_unique().alias("unique_networks"),
    # `value_counts` is unsorted by default, so taking its first entry would
    # return an arbitrary symbol. `sort=True` orders by descending count so the
    # first entry really is the most frequent one (ties stay arbitrary).
    pl.col("symbol")
    .value_counts(sort=True)
    .head(1)
    .struct.field("symbol")
    .first()
    .alias("most_common_symbol"),
]

# Distinct counterparties on each side of a token transfer.
to_aggregations = [pl.col("to_address").n_unique().alias("unique_to_addresses")]
from_aggregations = [pl.col("from_address").n_unique().alias("unique_from_addresses")]

# One row per sending wallet.
token_transfers_from_all_metrics_df = (
    token_transfers_df.group_by("from_address")
    .agg(common_aggregations + to_aggregations)
    .rename({"from_address": "address"})
)

# One row per receiving wallet.
token_transfers_to_all_metrics_df = (
    token_transfers_df.group_by("to_address")
    .agg(common_aggregations + from_aggregations)
    .rename({"to_address": "address"})
)


## Aggregate Events

In [33]:
# Metric frames to attach, in order, with the suffix used for name collisions.
# The suffix is only appended to an incoming column whose name already exists
# in features_df, so the first frame to introduce a metric name keeps it
# unsuffixed and only later frames are disambiguated.
_metric_frames = (
    (from_all_metrics_df, "_from_all"),
    (to_all_metrics_df, "_to_all"),
    (from_network_transactions_metrics_df, "_from_network"),
    (to_network_transactions_metrics_df, "_to_network"),
    (dex_from_all_metrics_df, "_dex_from_all"),
    (dex_to_all_metrics_df, "_dex_to_all"),
    (token_transfers_from_all_metrics_df, "_token_transfers_from_all"),
    (token_transfers_to_all_metrics_df, "_token_transfers_to_all"),
)

# Left joins keep every wallet; wallets without activity in a frame get nulls.
for _metrics_df, _collision_suffix in _metric_frames:
    features_df = features_df.join(
        _metrics_df,
        on="address",
        how="left",
        suffix=_collision_suffix,
    )

## Extra Features

In [34]:
import polars.selectors as cs

# Every datetime-typed column currently in features_df takes part in the
# first/last interaction computation.
# NOTE(review): assumes all datetime columns are event timestamps -- TODO confirm.
_timestamp_columns = features_df.select(cs.datetime()).columns

# Fixed reference date against which "days since" features are measured.
_reference_date = pl.datetime(2025, 4, 26)

_first_seen = pl.col("first_interaction")
_last_seen = pl.col("last_interaction")

features_df = features_df.with_columns(
    # Row-wise earliest / latest timestamp across all datetime columns.
    first_interaction=pl.min_horizontal(_timestamp_columns),
    last_interaction=pl.max_horizontal(_timestamp_columns),
).with_columns(
    # Whole-day spans derived from the two columns added above.
    interaction_duration=(_last_seen - _first_seen).dt.total_days(),
    days_since_first_interaction=(_reference_date - _first_seen).dt.total_days(),
    days_since_last_interaction=(_reference_date - _last_seen).dt.total_days(),
)

In [23]:
# TODO: Same for token transfers and dex swaps

## Postprocessing

In [35]:
# Get columns that have only nulls in the train set and drop them everywhere.
# The train slice gets its own name so the raw `train_df` loaded at the top of
# the notebook (and read by the label-join cell) is not overwritten.
# NOTE(review): assumes features_df carries a "split" column, presumably
# provided by wallet_addresses_df() -- TODO confirm.
train_features_df = features_df.filter(pl.col("split") == "train")
train_row_count = train_features_df.height

# With zero train rows every column would trivially count as "all null"
# (0 == 0) and the whole frame would be dropped, so nothing is removed then.
train_null_cols = (
    [
        col
        for col in train_features_df.columns
        if train_features_df[col].null_count() == train_row_count
    ]
    if train_row_count > 0
    else []
)

features_df = features_df.drop(train_null_cols)

In [36]:
# TODO: Ratios
# TODO: Number of nulls in the row


In [37]:
# Persist the final feature table as zstd-compressed parquet, shrinking the
# frame's memory footprint first.
features_df.shrink_to_fit().write_parquet(
    file="../data/processed/features_df.parquet",
    compression="zstd",
)