# In [None]:
import pandas as pd
import numpy as np
import torch
from torch import nn
from torch.utils.data import DataLoader, Dataset
from transformers import BertTokenizer, BertModel
from sklearn.model_selection import train_test_split
from sklearn.preprocessing import StandardScaler, LabelEncoder
from sklearn.metrics import classification_report, roc_auc_score

# Step 1: Load and Preprocess Dataset
csv_path = "dataset/btc_tx_2011_2013.csv"
df = pd.read_csv(csv_path, encoding='latin-1')
# Remove exact duplicate rows first, then every row that has a missing value.
df = df.drop_duplicates().dropna()

# Selecting features as described in the paper
selected_features = ["indegree", "outdegree", "in_btc", "out_btc", "total_btc", "mean_in_btc", "mean_out_btc"]
if all(col in df.columns for col in selected_features):
    X = df[selected_features]
    y = df["out_and_tx_malicious"]
else:
    print("Selected features are missing. Using available numerical columns.")
    # Use last column as the target for simplicity, and keep that column out
    # of the feature matrix: if it is numeric it would otherwise be fed to
    # the model as an input, leaking the label.
    y = df.iloc[:, -1]
    X = df.select_dtypes(include=[np.number]).drop(columns=[df.columns[-1]], errors="ignore")

# Encode target variable as integer class ids
y = LabelEncoder().fit_transform(y)

# Train-Test Split
# The split happens BEFORE scaling so the scaler statistics are estimated from
# the training rows only and never from the held-out rows.
X_train, X_test, y_train, y_test = train_test_split(
    X, y, test_size=0.2, stratify=y, random_state=42
)

# Standardize Features: fit on the training split, re-use the same transform
# for the test split.
scaler = StandardScaler()
X_train = scaler.fit_transform(X_train)
X_test = scaler.transform(X_test)

# Full feature matrix under the train-fitted scaling, kept so that code which
# refers to `X_scaled` continues to work.
X_scaled = scaler.transform(X)

# Step 2: Feature Embedding and Dataset Preparation
class BitcoinDataset(Dataset):
    """Map-style dataset pairing one feature row with one integer label.

    Args:
        features: 2-D array-like of numeric features (NumPy array, pandas
            DataFrame, nested list, ...); one row per sample.
        labels: 1-D array-like of integer class ids, one per feature row.

    Raises:
        ValueError: if ``features`` and ``labels`` have different lengths.
    """

    def __init__(self, features, labels):
        # Convert once up front. Going through np.asarray makes indexing
        # purely positional (a pandas object with a non-default index would
        # otherwise be looked up by label) and avoids building a new tensor
        # on every __getitem__ call.
        self.features = torch.as_tensor(np.asarray(features), dtype=torch.float32)
        self.labels = torch.as_tensor(np.asarray(labels), dtype=torch.long)
        if len(self.features) != len(self.labels):
            raise ValueError(
                f"features and labels must have the same length, "
                f"got {len(self.features)} and {len(self.labels)}"
            )

    def __len__(self):
        """Return the number of samples."""
        return len(self.features)

    def __getitem__(self, idx):
        """Return ``(features, label)`` as a float32 tensor and a long tensor."""
        return self.features[idx], self.labels[idx]

# Wrap each split in a dataset, then expose it through a 64-sample mini-batch
# loader. Only the training loader shuffles; the test loader keeps its order.
train_dataset = BitcoinDataset(features=X_train, labels=y_train)
test_dataset = BitcoinDataset(features=X_test, labels=y_test)

train_loader = DataLoader(dataset=train_dataset, batch_size=64, shuffle=True)
test_loader = DataLoader(dataset=test_dataset, batch_size=64, shuffle=False)

# Step 3: Transformer Model for Classification
class TabTransformer(nn.Module):
    """Transformer-encoder classifier over a single embedded feature vector.

    Each sample's feature vector is projected to ``hidden_dim``, treated as a
    length-1 sequence, passed through a Transformer encoder, mean-pooled over
    the sequence dimension and mapped to ``num_classes`` outputs.

    Args:
        input_dim: number of input features per sample.
        hidden_dim: embedding / model width; must be divisible by ``nhead``.
        num_classes: number of output classes.
        nhead: number of attention heads in every encoder layer.
        dim_feedforward: width of the feed-forward sub-layer.
        num_layers: number of stacked encoder layers.
    """

    def __init__(self, input_dim, hidden_dim, num_classes, nhead=4, dim_feedforward=256, num_layers=2):
        super().__init__()
        self.embedding = nn.Linear(input_dim, hidden_dim)
        # batch_first=True is required because forward() builds tensors shaped
        # (batch, seq=1, hidden). With the default (seq, batch, hidden) layout
        # the batch axis would be read as the sequence axis and samples of the
        # same mini-batch would attend to each other.
        encoder_layer = nn.TransformerEncoderLayer(
            d_model=hidden_dim,
            nhead=nhead,
            dim_feedforward=dim_feedforward,
            batch_first=True,
        )
        self.transformer = nn.TransformerEncoder(encoder_layer, num_layers=num_layers)
        self.fc = nn.Linear(hidden_dim, num_classes)
        # Not applied inside forward(); kept as an attribute for subclasses and
        # for callers that want probabilities: model.softmax(model(x)).
        self.softmax = nn.Softmax(dim=1)

    def forward(self, x):
        """Return raw class logits of shape ``(batch, num_classes)``.

        Logits (not probabilities) are returned because ``nn.CrossEntropyLoss``
        applies log-softmax itself; feeding it softmax output would normalise
        twice. ``argmax`` over the logits gives the same predicted class as
        ``argmax`` over the probabilities.
        """
        x = self.embedding(x).unsqueeze(1)  # Add sequence dimension
        x = self.transformer(x)
        x = x.mean(dim=1)  # Pooling over sequence dimension
        return self.fc(x)

# Hyper-parameters: the input width is taken from the training matrix, the
# hidden width and the class count are fixed.
num_classes = 2
hidden_dim = 64
input_dim = X_train.shape[1]

# Model, optimiser and loss used for the baseline run.
model = TabTransformer(input_dim=input_dim, hidden_dim=hidden_dim, num_classes=num_classes)
optimizer = torch.optim.Adam(params=model.parameters(), lr=1e-3)
criterion = nn.CrossEntropyLoss()

# Step 4: Train Transformer Model
def train_model(model, criterion, optimizer, train_loader, num_epochs=10):
    """Train ``model`` and report the mean batch loss after every epoch.

    Args:
        model: module called as ``model(batch_features)``.
        criterion: loss called as ``criterion(outputs, batch_labels)``.
        optimizer: optimizer whose ``zero_grad``/``step`` are called once per
            batch; it must have been built from ``model``'s parameters for the
            updates to affect ``model``.
        train_loader: iterable yielding ``(batch_features, batch_labels)``
            pairs. It does not need to support ``len()``.
        num_epochs: number of passes over ``train_loader``.

    Returns:
        A list with one mean batch loss per epoch. An epoch that yielded no
        batches contributes ``nan``.
    """
    model.train()
    epoch_losses = []
    for epoch in range(num_epochs):
        total_loss = 0.0
        # Count batches while iterating instead of calling len(train_loader):
        # this works for length-less iterables and avoids dividing by zero
        # when the loader is empty.
        num_batches = 0
        for batch_features, batch_labels in train_loader:
            optimizer.zero_grad()
            outputs = model(batch_features)
            loss = criterion(outputs, batch_labels)
            loss.backward()
            optimizer.step()
            total_loss += loss.item()
            num_batches += 1
        mean_loss = total_loss / num_batches if num_batches else float("nan")
        epoch_losses.append(mean_loss)
        print(f"Epoch {epoch + 1}/{num_epochs}, Loss: {mean_loss:.4f}")
    return epoch_losses

# Fit the baseline model with the default number of epochs.
train_model(
    model=model,
    criterion=criterion,
    optimizer=optimizer,
    train_loader=train_loader,
)

# Step 5: Evaluate Transformer Model
def evaluate_model(model, test_loader):
    """Print a classification report and the ROC-AUC of ``model``.

    Args:
        model: module called as ``model(batch_features)`` and returning one
            row of per-class scores (logits or probabilities) per sample.
        test_loader: iterable yielding ``(batch_features, batch_labels)``.

    The model is switched to eval mode and left in that mode. ROC-AUC is only
    reported for two-column outputs whose labels contain both classes; in any
    other case a short explanatory line is printed instead.
    """
    model.eval()
    y_true = []
    y_pred = []
    y_score = []
    num_outputs = None
    with torch.no_grad():
        for batch_features, batch_labels in test_loader:
            outputs = model(batch_features)
            num_outputs = outputs.shape[1]
            preds = torch.argmax(outputs, dim=1)
            y_true.extend(batch_labels.cpu().numpy())
            y_pred.extend(preds.cpu().numpy())
            if num_outputs == 2:
                # ROC-AUC needs a continuous ranking score, not hard labels.
                # The class-1 minus class-0 margin orders samples exactly like
                # the class-1 probability does, whether the model emits logits
                # or already-normalised probabilities.
                margin = outputs[:, 1] - outputs[:, 0]
                y_score.extend(margin.cpu().numpy())
    print("Classification Report:")
    print(classification_report(y_true, y_pred))
    if num_outputs != 2:
        print("ROC-AUC: not computed (requires exactly two output classes)")
    elif len(np.unique(y_true)) < 2:
        # roc_auc_score raises when y_true holds a single class.
        print("ROC-AUC: undefined (only one class present in the labels)")
    else:
        print(f"ROC-AUC: {roc_auc_score(y_true, y_score):.4f}")

# Report held-out metrics for the baseline model.
evaluate_model(model=model, test_loader=test_loader)

# Step 6: Positional Encoding for Temporal Features
class PositionalEncoding(nn.Module):
    """Add fixed sinusoidal position encodings to a ``(batch, seq, d_model)`` input.

    Args:
        d_model: size of the last input dimension (even or odd).
        max_len: number of positions in the precomputed table; inputs with a
            longer sequence dimension are not supported.
    """

    def __init__(self, d_model, max_len=5000):
        super().__init__()
        pe = torch.zeros(max_len, d_model)
        position = torch.arange(0, max_len, dtype=torch.float).unsqueeze(1)
        div_term = torch.exp(torch.arange(0, d_model, 2).float() * (-np.log(10000.0) / d_model))
        pe[:, 0::2] = torch.sin(position * div_term)
        # For an odd d_model there is one fewer odd column than even columns,
        # so the cosine half uses only the first d_model // 2 frequencies.
        pe[:, 1::2] = torch.cos(position * div_term[: d_model // 2])
        # Registered as a buffer so the table follows the module through
        # .to(device) / .cuda(). persistent=False keeps it out of state_dict,
        # so the checkpoint keys are the same as with a plain attribute.
        self.register_buffer("pe", pe.unsqueeze(0), persistent=False)

    def forward(self, x):
        """Return ``x`` plus the encodings of its first ``x.size(1)`` positions."""
        x = x + self.pe[:, :x.size(1), :]
        return x

# Integrate Positional Encoding into TabTransformer
# Integrate Positional Encoding into TabTransformer
class TemporalTabTransformer(TabTransformer):
    """``TabTransformer`` variant that adds positional encodings after the embedding.

    Args:
        input_dim: number of input features per sample.
        hidden_dim: embedding / model width, also used as the encoding width.
        num_classes: number of output classes.
    """

    def __init__(self, input_dim, hidden_dim, num_classes):
        super().__init__(input_dim, hidden_dim, num_classes)
        self.positional_encoding = PositionalEncoding(d_model=hidden_dim)

    def forward(self, x):
        """Return raw class logits of shape ``(batch, num_classes)``.

        Logits are returned rather than softmax probabilities because
        ``nn.CrossEntropyLoss`` applies log-softmax itself; ``argmax`` over
        the logits yields the same predicted class. Probabilities remain
        available via ``self.softmax(self(x))``.
        """
        x = self.embedding(x).unsqueeze(1)  # Add sequence dimension
        # NOTE(review): the sequence dimension has length 1 here, so only the
        # position-0 encoding is added, identically for every sample.
        x = self.positional_encoding(x)
        x = self.transformer(x)
        x = x.mean(dim=1)  # Pooling over sequence dimension
        return self.fc(x)

temporal_model = TemporalTabTransformer(input_dim, hidden_dim, num_classes)
# The `optimizer` built earlier only holds the parameters of `model`, so
# passing it here would leave `temporal_model` untrained. Build an optimizer
# over this model's own parameters, with the same settings.
temporal_optimizer = torch.optim.Adam(temporal_model.parameters(), lr=1e-3)
train_model(temporal_model, criterion, temporal_optimizer, train_loader)
evaluate_model(temporal_model, test_loader)
