In [55]:
from tab_transformer_pytorch import TabTransformer

In [56]:
import torch

In [57]:
import torch
import torch.nn as nn
import torch.nn.functional as F
from typing import Dict, List, Any, Optional, Union

class FTTransformer(nn.Module):
    """Feature Tokenizer Transformer - Base model"""
    def __init__(
        self,
        categories: List[int],
        num_continuous: int,
        dim: int,
        depth: int,
        heads: int,
        dim_out: int,
        attn_dropout: float = 0.1,
        ff_dropout: float = 0.1
    ):
        super().__init__()
        self.categories = categories
        self.num_continuous = num_continuous
        self.dim = dim
        self.dim_out = dim_out
        
        # Categorical embeddings
        self.categorical_embeddings = nn.ModuleList([
            nn.Embedding(cat_size, dim) for cat_size in categories
        ])
        
        # Continuous feature projection
        if num_continuous > 0:
            self.continuous_projection = nn.Linear(num_continuous, dim)
        
        # Positional embeddings
        total_tokens = len(categories) + (1 if num_continuous > 0 else 0)
        self.pos_embedding = nn.Parameter(torch.randn(total_tokens, dim))
        
        # Transformer layers
        self.transformer_layers = nn.ModuleList([
            TransformerBlock(dim, heads, attn_dropout, ff_dropout)
            for _ in range(depth)
        ])
        
        # Layer norm
        self.layer_norm = nn.LayerNorm(dim)
        
        # Output projection to get desired embedding dimension
        self.output_projection = nn.Linear(dim, dim_out)
        
    def forward(self, x_categorical: torch.Tensor, x_continuous: Optional[torch.Tensor] = None):
        batch_size = x_categorical.size(0)
        tokens = []
        
        # Process categorical features
        for i, embedding in enumerate(self.categorical_embeddings):
            cat_token = embedding(x_categorical[:, i])
            tokens.append(cat_token)
        
        # Process continuous features
        if x_continuous is not None and self.num_continuous > 0:
            cont_token = self.continuous_projection(x_continuous)
            tokens.append(cont_token)
        
        # Stack tokens and add positional embeddings
        x = torch.stack(tokens, dim=1)  # [batch_size, num_tokens, dim]
        x = x + self.pos_embedding.unsqueeze(0)
        
        # Apply transformer layers
        for layer in self.transformer_layers:
            x = layer(x)
        
        # Apply layer norm
        x = self.layer_norm(x)
        
        # Global average pooling across tokens
        x = x.mean(dim=1)  # [batch_size, dim]
        
        # Project to output dimension
        x = self.output_projection(x)  # [batch_size, dim_out]
        
        return x


class TransformerBlock(nn.Module):
    """Single transformer block with multi-head attention and feed-forward"""
    def __init__(self, dim: int, heads: int, attn_dropout: float = 0.1, ff_dropout: float = 0.1):
        super().__init__()
        self.attention = nn.MultiheadAttention(
            embed_dim=dim,
            num_heads=heads,
            dropout=attn_dropout,
            batch_first=True
        )
        self.norm1 = nn.LayerNorm(dim)
        self.norm2 = nn.LayerNorm(dim)
        
        # Feed-forward network
        self.ff = nn.Sequential(
            nn.Linear(dim, dim * 4),
            nn.GELU(),
            nn.Dropout(ff_dropout),
            nn.Linear(dim * 4, dim),
            nn.Dropout(ff_dropout)
        )
    
    def forward(self, x):
        # Multi-head attention with residual connection
        attn_out, _ = self.attention(x, x, x)
        x = self.norm1(x + attn_out)
        
        # Feed-forward with residual connection
        ff_out = self.ff(x)
        x = self.norm2(x + ff_out)
        
        return x


class TaskHead(nn.Module):
    """Task-specific head for different types of outputs"""
    def __init__(self, input_dim: int, task_config: Dict[str, Any]):
        super().__init__()
        self.task_type = task_config['type']
        
        if self.task_type == 'regression':
            output_dim = task_config.get('output_dim', 1)
            # Add output scaling for better initialization
            self.head = nn.Sequential(
                nn.Linear(input_dim, input_dim // 2),
                nn.ReLU(),
                nn.Dropout(0.5),
                nn.Linear(input_dim // 2, output_dim)
            )
            
            # Optional: Add output scaling parameters
            self.output_scale = task_config.get('output_scale', 100)
            self.output_bias = task_config.get('output_bias', 150)
            
        elif self.task_type == 'classification':
            num_classes = task_config['num_classes']
            self.head = nn.Sequential(
                nn.Linear(input_dim, input_dim // 2),
                nn.ReLU(),
                nn.Dropout(0.5),
                nn.Linear(input_dim // 2, num_classes)
            )
        else:
            raise ValueError(f"Unsupported task type: {self.task_type}")
    
    def forward(self, x):
        output = self.head(x)
        
        # Apply scaling for regression tasks
        if self.task_type == 'regression':
            output = output * self.output_scale + self.output_bias
            
        return output


class MultitaskFTTransformer(nn.Module):
    """Multitask wrapper for FT Transformer"""
    def __init__(self, base_model: FTTransformer, task_configs: Dict[str, Dict[str, Any]]):
        super().__init__()
        self.base_model = base_model
        self.task_configs = task_configs
        
        # Create task-specific heads
        self.task_heads = nn.ModuleDict({
            task_name: TaskHead(base_model.dim_out, config)
            for task_name, config in task_configs.items()
        })
    
    def forward(self, x_categorical: torch.Tensor, x_continuous: Optional[torch.Tensor] = None, 
                tasks: Optional[List[str]] = None):
        # Get shared representation from base model
        shared_repr = self.base_model(x_categorical, x_continuous)
        
        # Apply task-specific heads
        if tasks is None:
            tasks = list(self.task_heads.keys())
        
        outputs = {}
        for task_name in tasks:
            if task_name in self.task_heads:
                outputs[task_name] = self.task_heads[task_name](shared_repr)
        
        return outputs


def create_multitask_loss(outputs: Dict[str, torch.Tensor], 
                         targets: Dict[str, torch.Tensor],
                         task_configs: Dict[str, Dict[str, Any]],
                         task_weights: Optional[Dict[str, float]] = None) -> Dict[str, torch.Tensor]:
    """Create multitask loss function"""
    if task_weights is None:
        task_weights = {task: 1.0 for task in task_configs.keys()}
    
    losses = {}
    total_loss = 0.0
    
    for task_name, config in task_configs.items():
        if task_name not in outputs or task_name not in targets:
            continue
            
        pred = outputs[task_name]
        target = targets[task_name]
        weight = task_weights.get(task_name, 1.0)
        
        if config['type'] == 'regression':
            loss = F.mse_loss(pred, target)
        elif config['type'] == 'classification':
            loss = F.cross_entropy(pred, target)
        else:
            raise ValueError(f"Unsupported task type: {config['type']}")
        
        losses[task_name] = loss
        total_loss += weight * loss
    
    losses['total'] = total_loss
    return losses


# Example usage

# Advanced features for better multitask learning
class AdaptiveTaskWeighting(nn.Module):
    """Adaptive task weighting based on uncertainty"""
    def __init__(self, task_names: List[str]):
        super().__init__()
        self.task_names = task_names
        # Learnable log variance for each task
        self.log_vars = nn.Parameter(torch.zeros(len(task_names)))
    
    def forward(self, losses: Dict[str, torch.Tensor]) -> torch.Tensor:
        total_loss = 0.0
        for i, task_name in enumerate(self.task_names):
            if task_name in losses:
                # Uncertainty weighting: 1/(2*sigma^2) * loss + log(sigma)
                precision = torch.exp(-self.log_vars[i])
                total_loss += precision * losses[task_name] + self.log_vars[i]
        return total_loss


class GradientBalancing:
    """Gradient balancing for multitask learning"""
    def __init__(self, task_names: List[str], alpha: float = 0.12):
        self.task_names = task_names
        self.alpha = alpha
        self.task_losses_history = {task: [] for task in task_names}
    
    def compute_weights(self, current_losses: Dict[str, torch.Tensor]) -> Dict[str, float]:
        weights = {}
        
        for task_name in self.task_names:
            if task_name in current_losses:
                # Add current loss to history
                self.task_losses_history[task_name].append(current_losses[task_name].item())
                
                # Keep only recent history
                if len(self.task_losses_history[task_name]) > 100:
                    self.task_losses_history[task_name].pop(0)
                
                # Compute relative loss rate
                if len(self.task_losses_history[task_name]) > 1:
                    recent_avg = sum(self.task_losses_history[task_name][-10:]) / min(10, len(self.task_losses_history[task_name]))
                    overall_avg = sum(self.task_losses_history[task_name]) / len(self.task_losses_history[task_name])
                    rate = recent_avg / (overall_avg + 1e-8)
                    weights[task_name] = rate ** self.alpha
                else:
                    weights[task_name] = 1.0
        
        return weights


# Enhanced usage example
def train_multitask_model():
    """Complete training example with advanced features"""
    
    # Model setup
    base_model = FTTransformer(
        categories=[10, 5, 6],
        num_continuous=8,
        dim=128,
        depth=6,
        heads=8,
        dim_out=128
    )
    
    task_configs = {
        'price': {'type': 'regression', 'output_dim': 1},
        'category': {'type': 'classification', 'num_classes': 5},
    }
    
    model = MultitaskFTTransformer(base_model, task_configs)
    
    # Advanced loss components
    adaptive_weighting = AdaptiveTaskWeighting(list(task_configs.keys()))
    gradient_balancing = GradientBalancing(list(task_configs.keys()))
    
    # Optimizer
    optimizer = torch.optim.AdamW(
        list(model.parameters()) + list(adaptive_weighting.parameters()),
        lr=1e-3,
        weight_decay=1e-4
    )
    
    # Training loop example
    model.train()
    for epoch in range(10):
        # Generate dummy batch
        batch_size = 32
        x_categorical = torch.randint(0, 5, (batch_size, 3))
        x_continuous = torch.randn(batch_size, 8)
        
        targets = {
            'price': torch.randn(batch_size, 1) * 50 + 150,  # Price range: ~100-200
            'category': torch.randint(0, 5, (batch_size,)),   # Categories: 0-4
        }
        
        # Forward pass
        outputs = model(x_categorical, x_continuous)
        
        # Calculate individual task losses
        task_losses = {}
        for task_name, config in task_configs.items():
            pred = outputs[task_name]
            target = targets[task_name]
            
            if config['type'] == 'regression':
                task_losses[task_name] = F.mse_loss(pred, target)
            elif config['type'] == 'classification':
                task_losses[task_name] = F.cross_entropy(pred, target)
        
        # Apply adaptive weighting
        total_loss = adaptive_weighting(task_losses)
        
        # Alternative: Use gradient balancing
        # weights = gradient_balancing.compute_weights(task_losses)
        # total_loss = sum(weights[task] * loss for task, loss in task_losses.items())
        
        # Backward pass
        optimizer.zero_grad()
        total_loss.backward()
        
        # Gradient clipping
        torch.nn.utils.clip_grad_norm_(model.parameters(), max_norm=1.0)
        
        optimizer.step()
        
        if epoch % 2 == 0:
            print(f"Epoch {epoch}: Total Loss = {total_loss.item():.4f}")
            for task_name, loss in task_losses.items():
                print(f"  {task_name}: {loss.item():.4f}")


# Inference utilities
class MultitaskInference:
    """Utilities for inference with multitask model"""
    def __init__(self, model: MultitaskFTTransformer):
        self.model = model
        self.model.eval()
    
    def predict_single_task(self, x_categorical: torch.Tensor, 
                           x_continuous: Optional[torch.Tensor], 
                           task_name: str) -> torch.Tensor:
        """Predict for a single task only"""
        with torch.no_grad():
            outputs = self.model(x_categorical, x_continuous, tasks=[task_name])
            return outputs[task_name]
    
    def predict_all_tasks(self, x_categorical: torch.Tensor, 
                         x_continuous: Optional[torch.Tensor]) -> Dict[str, torch.Tensor]:
        """Predict for all tasks"""
        with torch.no_grad():
            return self.model(x_categorical, x_continuous)
    
    def get_embeddings(self, x_categorical: torch.Tensor, 
                      x_continuous: Optional[torch.Tensor]) -> torch.Tensor:
        """Get shared embeddings from base model"""
        with torch.no_grad():
            return self.model.base_model(x_categorical, x_continuous)


In [58]:
import pandas as pd

In [59]:
data = pd.read_csv(r'C:\Users\rayhn\College\MK Semester 7\Tugas Akhir 1\MY SKRIPSI GUE\MultiTask-Tab\data.csv', delimiter=';',)

In [60]:
data = data[data['Lapse'] <= 1]

In [61]:
data['Lapse'].value_counts()

Lapse
0    84007
1    20008
Name: count, dtype: int64

In [62]:
data.columns

Index(['ID', 'Date_start_contract', 'Date_last_renewal', 'Date_next_renewal',
       'Date_birth', 'Date_driving_licence', 'Distribution_channel',
       'Seniority', 'Policies_in_force', 'Max_policies', 'Max_products',
       'Lapse', 'Date_lapse', 'Payment', 'Premium', 'Cost_claims_year',
       'N_claims_year', 'N_claims_history', 'R_Claims_history', 'Type_risk',
       'Area', 'Second_driver', 'Year_matriculation', 'Power',
       'Cylinder_capacity', 'Value_vehicle', 'N_doors', 'Type_fuel', 'Length',
       'Weight'],
      dtype='object')

In [63]:
data.info()

<class 'pandas.core.frame.DataFrame'>
Index: 104015 entries, 0 to 105554
Data columns (total 30 columns):
 #   Column                Non-Null Count   Dtype  
---  ------                --------------   -----  
 0   ID                    104015 non-null  int64  
 1   Date_start_contract   104015 non-null  object 
 2   Date_last_renewal     104015 non-null  object 
 3   Date_next_renewal     104015 non-null  object 
 4   Date_birth            104015 non-null  object 
 5   Date_driving_licence  104015 non-null  object 
 6   Distribution_channel  104015 non-null  int64  
 7   Seniority             104015 non-null  int64  
 8   Policies_in_force     104015 non-null  int64  
 9   Max_policies          104015 non-null  int64  
 10  Max_products          104015 non-null  int64  
 11  Lapse                 104015 non-null  int64  
 12  Date_lapse            33790 non-null   object 
 13  Payment               104015 non-null  int64  
 14  Premium               104015 non-null  float64
 15  Cost_

In [64]:
# Ambil 500 sampel dari tiap kelas
data = (
    data.groupby("Lapse", group_keys=False)
        .apply(lambda x: x.sample(n=20000, random_state=42))
)

print(data["Lapse"].value_counts())


Lapse
0    20000
1    20000
Name: count, dtype: int64


In [65]:
data = data.reset_index(drop=True)

In [66]:
from sklearn.preprocessing import StandardScaler

In [67]:
# def train_multitask_model():
#     """Complete training example with advanced features"""
    
#     # Model setup
#     cat_cols = ['Seniority', 'Policies_in_force', 'Max_policies', 'Max_products', 'Type_risk']
#     num_cols = ['Cylinder_capacity', 'Power', 'Year_matriculation']
#     num_categories = [data[col].nunique() for col in cat_cols]

#     scaler = StandardScaler()
#     data[num_cols] = scaler.fit_transform(data[num_cols])

#     from sklearn.preprocessing import LabelEncoder

#     for col in cat_cols:
#         le = LabelEncoder()
#         data[col] = le.fit_transform(data[col])


#     base_model = FTTransformer(
#         categories=num_categories,
#         num_continuous=len(num_cols),
#         dim=128,
#         depth=6,
#         heads=8,
#         dim_out=128
#     )

    
#     task_configs = {
#         'Value_vehicle': {'type': 'regression', 'output_dim': 1},
#         'lapse': {'type': 'classification', 'num_classes': 2},
#     }
    
#     model = MultitaskFTTransformer(base_model, task_configs)
    
#     # Advanced loss components
#     adaptive_weighting = AdaptiveTaskWeighting(list(task_configs.keys()))
#     gradient_balancing = GradientBalancing(list(task_configs.keys()))
    
#     # Optimizer
#     optimizer = torch.optim.AdamW(
#         list(model.parameters()) + list(adaptive_weighting.parameters()),
#         lr=1e-3,
#         weight_decay=1e-4
#     )
    
#     # Training loop example
#     model.train()
#     for epoch in range(3):
#         # Generate dummy batch

#         x_categorical = torch.tensor(
#             data[cat_cols].astype('int64').values, dtype=torch.long
#         )

#         x_continuous = torch.tensor(
#             data[num_cols].astype('float32').values, dtype=torch.float32
#         )

#         targets = {
#             'Value_vehicle': torch.tensor(
#                 data['Value_vehicle'].astype('float32').values, dtype=torch.float32
#             ).unsqueeze(1),  # regression tetap (N,1)
#             'lapse': torch.tensor(
#                 data['Lapse'].astype('int64').values, dtype=torch.long
#             ),  # classification harus 1D (N,)
#         }

        
#         # Forward pass
#         outputs = model(x_categorical, x_continuous)
        
#         # Calculate individual task losses
#         task_losses = {}
#         for task_name, config in task_configs.items():
#             pred = outputs[task_name]
#             target = targets[task_name]
            
#             if config['type'] == 'regression':
#                 task_losses[task_name] = F.mse_loss(pred, target)
#             elif config['type'] == 'classification':
#                 task_losses[task_name] = F.cross_entropy(pred, target)
        
#         # Apply adaptive weighting
        
#         # Alternative: Use gradient balancing
#         weights = gradient_balancing.compute_weights(task_losses)
#         total_loss = sum(weights[task] * loss for task, loss in task_losses.items())
        
#         # Backward pass
#         optimizer.zero_grad()
#         total_loss.backward()
        
#         # Gradient clipping
#         torch.nn.utils.clip_grad_norm_(model.parameters(), max_norm=1.0)
        
#         optimizer.step()
        
#         if epoch % 2 == 0:
#             print(f"Epoch {epoch}: Total Loss = {total_loss.item():.4f}")
#             for task_name, loss in task_losses.items():
#                 print(f"  {task_name}: {loss.item():.4f}")



In [68]:
data[['Value_vehicle']].describe()

Unnamed: 0,Value_vehicle
count,40000.0
mean,18280.73156
std,9118.739966
min,270.46
25%,12950.0
50%,17480.0
75%,22466.2875
max,220675.8


In [69]:

# Model setup
cat_cols = ['Seniority', 'Policies_in_force', 'Max_policies', 'Max_products', 'Type_risk']
num_cols = ['Cylinder_capacity', 'Power', 'Year_matriculation']
num_categories = [data[col].nunique() for col in cat_cols]

scaler = StandardScaler()
data[num_cols] = scaler.fit_transform(data[num_cols])

# scaler_y = StandardScaler()
# data[['Value_vehicle']] = scaler_y.fit_transform(data[['Value_vehicle']])

data[['Value_vehicle']] = data[['Value_vehicle']] / 16500

from sklearn.preprocessing import LabelEncoder

for col in cat_cols:
    le = LabelEncoder()
    data[col] = le.fit_transform(data[col])


base_model = FTTransformer(
    categories=num_categories,
    num_continuous=len(num_cols),
    dim=32,
    depth=3,
    heads=8,
    dim_out=64
)


task_configs = {
    'Value_vehicle': {'type': 'regression', 'output_dim': 1},
    'lapse': {'type': 'classification', 'num_classes': 2},
}

model = MultitaskFTTransformer(base_model, task_configs)

# Advanced loss components
adaptive_weighting = AdaptiveTaskWeighting(list(task_configs.keys()))
gradient_balancing = GradientBalancing(list(task_configs.keys()))

# Optimizer
optimizer = torch.optim.AdamW(
    list(model.parameters()) + list(adaptive_weighting.parameters()),
    lr=0.0001,
    weight_decay=0
)

# Training loop example
model.train()
for epoch in range(2):
    # Generate dummy batch

    x_categorical = torch.tensor(
        data[cat_cols].astype('int64').values, dtype=torch.long
    )

    x_continuous = torch.tensor(
        data[num_cols].astype('float32').values, dtype=torch.float32
    )

    targets = {
        'Value_vehicle': torch.tensor(
        data['Value_vehicle'].astype('float32').values, dtype=torch.float32
        ).unsqueeze(1),  # regression tetap (N,1)
        'lapse': torch.tensor(
            data['Lapse'].astype('int64').values, dtype=torch.long
        ),  # classification harus 1D (N,)
    }

    
    # Forward pass
    outputs = model(x_categorical, x_continuous)
    
    # Calculate individual task losses
    task_losses = {}
    for task_name, config in task_configs.items():
        pred = outputs[task_name]
        target = targets[task_name]
        
        if config['type'] == 'regression':
            task_losses[task_name] = F.mse_loss(pred, target)
        elif config['type'] == 'classification':
            task_losses[task_name] = F.cross_entropy(pred, target)
    
    # Apply adaptive weighting
    
    # Alternative: Use gradient balancing
    weights = gradient_balancing.compute_weights(task_losses)
    total_loss = sum(weights[task] * loss for task, loss in task_losses.items())
    
    # Backward pass
    optimizer.zero_grad()
    total_loss.backward()
    
    # Gradient clipping
    torch.nn.utils.clip_grad_norm_(model.parameters(), max_norm=1.0)
    
    optimizer.step()
    
    if epoch % 2 == 0:
        print(f"Epoch {epoch}: Total Loss = {total_loss.item():.4f}")
        for task_name, loss in task_losses.items():
            print(f"  {task_name}: {loss.item():.4f}")



Epoch 0: Total Loss = 27549.7246
  Value_vehicle: 27549.0234
  lapse: 0.7016


In [None]:
model.train()
for epoch in range(10):
    # Generate dummy batch
    x_categorical = torch.tensor(
        data[cat_cols].astype('int64').values, dtype=torch.long
    )
    x_continuous = torch.tensor(
        data[num_cols].astype('float32').values, dtype=torch.float32
    )
    targets = {
        'Value_vehicle': torch.tensor(
            data['Value_vehicle'].astype('float32').values, dtype=torch.float32
        ).unsqueeze(1),  # regression tetap (N,1)
        'lapse': torch.tensor(
            data['Lapse'].astype('int64').values, dtype=torch.long
        ),  # classification harus 1D (N,)
    }

    # Forward pass
    outputs = model(x_categorical, x_continuous)

    # Calculate individual task losses
    task_losses = {}
    metrics = {}
    for task_name, config in task_configs.items():
        pred = outputs[task_name]
        target = targets[task_name]

        if config['type'] == 'regression':
            task_losses[task_name] = F.mse_loss(pred, target)
            # --- RMSE ---
            rmse = torch.sqrt(task_losses[task_name]).item()
            metrics[task_name] = {"rmse": rmse}

        elif config['type'] == 'classification':
            task_losses[task_name] = F.cross_entropy(pred, target)
            # --- Accuracy ---
            pred_labels = pred.argmax(dim=1)
            acc = (pred_labels == target).float().mean().item()
            metrics[task_name] = {"accuracy": acc}

    # Adaptive weighting
    weights = gradient_balancing.compute_weights(task_losses)
    total_loss = sum(weights[task] * loss for task, loss in task_losses.items())

    # Backward pass
    optimizer.zero_grad()
    total_loss.backward()
    torch.nn.utils.clip_grad_norm_(model.parameters(), max_norm=1.0)
    optimizer.step()

    if epoch % 2 == 0:
        print(f"Epoch {epoch}: Total Loss = {total_loss.item():.4f}")
        for task_name, loss in task_losses.items():
            print(f"  {task_name}: {loss.item():.4f}")
            if 'rmse' in metrics[task_name]:
                print(f"    RMSE     : {metrics[task_name]['rmse']:.4f}")
            if 'accuracy' in metrics[task_name]:
                print(f"    Accuracy : {metrics[task_name]['accuracy']:.4f}")


Epoch 0: Total Loss = 26932.1152
  Value_vehicle: 26931.4141
    RMSE     : 164.1079
  lapse: 0.7016
    Accuracy : 0.4961
Epoch 2: Total Loss = 26311.0176
  Value_vehicle: 26310.3164
    RMSE     : 162.2045
  lapse: 0.7003
    Accuracy : 0.4952
Epoch 4: Total Loss = 25687.5508
  Value_vehicle: 25686.8496
    RMSE     : 160.2712
  lapse: 0.7004
    Accuracy : 0.4973
Epoch 6: Total Loss = 25069.4043
  Value_vehicle: 25068.7031
    RMSE     : 158.3310
  lapse: 0.7004
    Accuracy : 0.4944
Epoch 8: Total Loss = 24365.1875
  Value_vehicle: 24382.0547
    RMSE     : 156.1475
  lapse: 0.6995
    Accuracy : 0.4965


In [71]:
base_model = FTTransformer(
    categories=num_categories,
    num_continuous=len(num_cols),
    dim=32,
    depth=3,
    heads=8,
    dim_out=64
)


task_configs = {
    'Value_vehicle': {'type': 'regression', 'output_dim': 1},
    'lapse': {'type': 'classification', 'num_classes': 2},
}

model = MultitaskFTTransformer(base_model, task_configs)


In [81]:
x_categorical = torch.tensor(
    data[cat_cols].astype('int64').values, dtype=torch.long
)
x_continuous = torch.tensor(
    data[num_cols].astype('float32').values, dtype=torch.float32
)

In [72]:
from torchinfo import summary

# Jika model membutuhkan dua input
summary(model, input_data=[x_categorical, x_continuous])



Layer (type:depth-idx)                        Output Shape              Param #
MultitaskFTTransformer                        [40000, 2]                --
├─FTTransformer: 1-1                          [40000, 64]               192
│    └─ModuleList: 2-1                        --                        --
│    │    └─Embedding: 3-1                    [40000, 32]               1,280
│    │    └─Embedding: 3-2                    [40000, 32]               512
│    │    └─Embedding: 3-3                    [40000, 32]               480
│    │    └─Embedding: 3-4                    [40000, 32]               128
│    │    └─Embedding: 3-5                    [40000, 32]               128
│    └─Linear: 2-2                            [40000, 32]               128
│    └─ModuleList: 2-3                        --                        --
│    │    └─TransformerBlock: 3-6             [40000, 6, 32]            12,704
│    │    └─TransformerBlock: 3-7             [40000, 6, 32]            12,704
│  

In [79]:
pip install torchviz

Collecting torchviz
  Downloading torchviz-0.0.3-py3-none-any.whl.metadata (2.1 kB)
Downloading torchviz-0.0.3-py3-none-any.whl (5.7 kB)
Installing collected packages: torchviz
Successfully installed torchviz-0.0.3
Note: you may need to restart the kernel to use updated packages.



[notice] A new release of pip is available: 24.3.1 -> 25.2
[notice] To update, run: python.exe -m pip install --upgrade pip


In [84]:
import torch
from torchviz import make_dot

# ----- Forward pass -----
y = model(x_categorical, x_continuous)

# Jika output dict, gabungkan semua tensor menjadi satu
output_tensor = torch.cat(list(y.values()), dim=1)

# ----- Buat graph landscape -----
dot = make_dot(
    output_tensor,
    params=dict(model.named_parameters()),
    graph_attr={'rankdir': 'LR'}  # LR = left-to-right (landscape)
)

dot.format = 'png'
dot.render('fttransformer_landscape')

print("Diagram FTTransformer tersimpan sebagai fttransformer_landscape.png")


TypeError: make_dot() got an unexpected keyword argument 'graph_attr'

In [35]:
model

MultitaskFTTransformer(
  (base_model): FTTransformer(
    (categorical_embeddings): ModuleList(
      (0): Embedding(40, 32)
      (1): Embedding(16, 32)
      (2): Embedding(15, 32)
      (3-4): 2 x Embedding(4, 32)
    )
    (continuous_projection): Linear(in_features=3, out_features=32, bias=True)
    (transformer_layers): ModuleList(
      (0-2): 3 x TransformerBlock(
        (attention): MultiheadAttention(
          (out_proj): NonDynamicallyQuantizableLinear(in_features=32, out_features=32, bias=True)
        )
        (norm1): LayerNorm((32,), eps=1e-05, elementwise_affine=True)
        (norm2): LayerNorm((32,), eps=1e-05, elementwise_affine=True)
        (ff): Sequential(
          (0): Linear(in_features=32, out_features=128, bias=True)
          (1): GELU(approximate='none')
          (2): Dropout(p=0.1, inplace=False)
          (3): Linear(in_features=128, out_features=32, bias=True)
          (4): Dropout(p=0.1, inplace=False)
        )
      )
    )
    (layer_norm): Lay

In [34]:

# Single task prediction
price_pred = MultitaskInference(model=model).predict_single_task(x_categorical=x_categorical, 
                                                    x_continuous=x_continuous, task_name='Value_vehicle')
print(f"Price predictions: {price_pred.flatten()}")

# All tasks prediction
all_preds = MultitaskInference(model=model).predict_all_tasks(x_categorical=x_categorical, 
                                                    x_continuous=x_continuous,)
print(f"All predictions: {list(all_preds.keys())}")

# Get embeddings
embeddings = MultitaskInference(model=model).get_embeddings(x_categorical=x_categorical, 
                                                    x_continuous=x_continuous,)
print(f"Embeddings shape: {embeddings.shape}")

Price predictions: tensor([121.0222, 122.4365, 119.9268,  ..., 122.0918, 125.3112, 117.6130])
All predictions: ['Value_vehicle', 'lapse']
Embeddings shape: torch.Size([40000, 64])


In [42]:
x = torch.tensor([[1,2,3],[4,5,6]])  # shape (2,3)
print(x.shape)  # (2,3)

x0 = x.unsqueeze(0)
print(x0.shape)  # (1,2,3)

x1 = x.unsqueeze(1)
print(x1.shape)  # (2,1,3)


torch.Size([2, 3])
torch.Size([1, 2, 3])
torch.Size([2, 1, 3])


In [45]:
x0

tensor([[[1, 2, 3],
         [4, 5, 6]]])