In [None]:
import sys
from pathlib import Path
# The project root is taken to be the parent of the current working directory,
# i.e. the notebook is expected to be launched from a direct subdirectory of the
# repository — TODO confirm the intended launch directory.
PROJECT_ROOT = Path.cwd().parent
# Put the project root first on sys.path so the `src.*` imports below resolve.
sys.path.insert(0, str(PROJECT_ROOT))

import itertools
import pandas as pd
import torch
from transformers import AutoTokenizer

from src.data_loader import load_dimabsa_dataset, create_dataloaders
from src.models.deberta_dimabsa import DeBERTaDimABSA, DeBERTaDimABSAConfig
from src.models.baselines import AspectMeanPoolingModel, TransformerVARegressor
from src.trainer import TrainingConfig, train_model
from src.lexicon import create_lexicon

# --- Paths, backbone name and compute device ---
MODEL_NAME = 'microsoft/deberta-v3-base'
DATA_DIR = PROJECT_ROOT.joinpath('DimABSA2026', 'task-dataset', 'track_a', 'subtask_1')
OUTPUT_DIR = PROJECT_ROOT.joinpath('outputs', 'ablation')
OUTPUT_DIR.mkdir(parents=True, exist_ok=True)
DEVICE = torch.device('cuda') if torch.cuda.is_available() else torch.device('cpu')

# --- Tokenizer for the chosen backbone ---
tokenizer = AutoTokenizer.from_pretrained(MODEL_NAME)

# --- English laptop data, with a dev split carved out by the loader ---
raw = load_dimabsa_dataset(DATA_DIR, lang='eng', domain='laptop', split_dev=True)
train_df, dev_df = raw['train'], raw['dev']
print(f'Train: {len(train_df)}, Dev: {len(dev_df)}')

# --- Lexicon feature extractor ---
# If the lexicon cannot be created, fall back to an extractor that returns an
# all-zero (batch, 8) float tensor so downstream code can still call it.
try:
    lexicon = create_lexicon(PROJECT_ROOT / 'NRC-VAD-Lexicon-v2.1', use_dependency_parsing=False)
except Exception as e:
    print('Lexicon not available:', e)

    def lexicon_extractor(texts, aspects):
        """Placeholder extractor: one zero vector of size 8 per input text."""
        return torch.zeros((len(texts), 8), dtype=torch.float32)
else:
    def lexicon_extractor(texts, aspects):
        """Delegate batch feature extraction to the loaded lexicon."""
        return lexicon.extract_batch_features(texts, aspects)

    print('Lexicon loaded')


In [None]:
# Fast-experiment switch: train on a random slice of the training data.
USE_SUBSET = True
SUBSET_FRACTION = 0.2   # share of the training rows to keep
SUBSET_MAX = 1500       # hard cap on the number of sampled rows

# Default to the full frame; replace it with a sample only when requested.
train_df_small = train_df
if USE_SUBSET:
    fraction_rows = max(1, int(len(train_df) * SUBSET_FRACTION))
    n = min(len(train_df), fraction_rows, SUBSET_MAX)
    train_df_small = train_df.sample(n=n, random_state=42).reset_index(drop=True)
print('Using train subset size =', len(train_df_small))

# Factors varied by the ablation study
scalings = ['sigmoid', 'tanh', 'linear']
poolings = ['attention', 'mean', 'cls']
lexicon_flags = [True, False]

# Full grid of (scaling, pooling, use_lex) tuples; the last factor varies fastest.
ablation_exps = [
    (scaling, pooling, use_lex)
    for scaling in scalings
    for pooling in poolings
    for use_lex in lexicon_flags
]
len(ablation_exps)


In [None]:
# Runner for ablation experiments.
# NOTE: executing this cell trains one model per configuration in `ablation_exps`
# and writes the per-run metrics to ablation_results.csv, the file the
# visualization cells read.
SEED = 42  # same value as the random_state used for the train subsample
RESULTS_CSV = OUTPUT_DIR / 'ablation_results.csv'

results = []
for i, (scaling, pooling, use_lex) in enumerate(ablation_exps, 1):
    print(f'Experiment {i}/{len(ablation_exps)} - scaling={scaling}, pooling={pooling}, lexicon={use_lex}')

    # Re-seed before building each model so every configuration starts from the
    # same RNG state, which keeps runs reproducible and comparable.
    torch.manual_seed(SEED)

    # Choose the model class for this pooling strategy.
    # NOTE(review): only the attention model is configured with `use_lexicon`.
    # For the 'mean' and 'cls' baselines the flag only decides whether the
    # extractor is handed to train_model below — confirm those models use it,
    # otherwise their lexicon on/off runs are duplicates.
    if pooling == 'attention':
        cfg = DeBERTaDimABSAConfig(model_name=MODEL_NAME, use_lexicon=use_lex, lexicon_feature_dim=8, output_scaling=scaling)
        model = DeBERTaDimABSA(cfg)
    elif pooling == 'mean':
        model = AspectMeanPoolingModel(model_name=MODEL_NAME, output_scaling=scaling)
    else:
        model = TransformerVARegressor(model_name=MODEL_NAME, pooling='cls', output_scaling=scaling)

    # Dataloaders over the (possibly subsampled) train set and the dev set
    train_loader, dev_loader = create_dataloaders(train_df_small, dev_df, tokenizer, batch_size=16, max_length=128, use_aspect_aware=True)

    cfg_train = TrainingConfig(learning_rate=2e-5, num_epochs=3, checkpoint_dir=str(OUTPUT_DIR / 'checkpoints'), device=str(DEVICE))

    # Train; the lexicon extractor is passed only for lexicon-on configurations.
    res = train_model(model, train_loader, dev_loader, lexicon_extractor=lexicon_extractor if use_lex else None, config=cfg_train, model_name=f'ablation_{i}')
    results.append({'scaling': scaling, 'pooling': pooling, 'use_lex': use_lex, 'best_metric': res['best_metric']})

    # Rewrite the results file after every run so finished experiments survive
    # a crash or an interrupted kernel.
    pd.DataFrame(results).to_csv(RESULTS_CSV, index=False)

if results:
    print('Ablation results saved to', RESULTS_CSV)

# Save planned experiments
pd.DataFrame([{'scaling': s, 'pooling': p, 'use_lex': l} for s,p,l in ablation_exps]).to_csv(OUTPUT_DIR / 'ablation_plan.csv', index=False)
print('Ablation plan saved to', OUTPUT_DIR / 'ablation_plan.csv')


## Visualizations

The cells below create three visualizations if `outputs/ablation/ablation_results.csv` exists: a grouped bar plot (x = scaling, hue = pooling, aggregated over the lexicon setting), a violin plot (distribution of the best metric by pooling, colored by scaling), and a heatmap (mean metric for pooling × scaling, restricted to lexicon-on runs).


In [None]:
import pandas as pd
import seaborn as sns
import matplotlib.pyplot as plt
from pathlib import Path

# Grouped bar plot of best_metric by output scaling (x) and pooling (hue).
# Bars aggregate over the remaining rows (e.g. lexicon on/off); error bars show
# the standard deviation.
OUT_DIR = Path.cwd().parent / 'outputs' / 'ablation'
OUT_DIR.mkdir(parents=True, exist_ok=True)

results_path = OUT_DIR / 'ablation_results.csv'
if not results_path.exists():
    print('No ablation results file found at', results_path)
    print('Run the ablation experiments to produce', results_path)
else:
    df = pd.read_csv(results_path)
    # Ensure columns exist: scaling, pooling, use_lex, best_metric
    if not {'scaling','pooling','use_lex','best_metric'}.issubset(df.columns):
        print('Expected columns missing in', results_path)
    else:
        # catplot is figure-level and creates its own figure, so no plt.figure()
        # beforehand (that would only leave an empty extra figure behind).
        # `errorbar='sd'` is the current spelling of the deprecated `ci='sd'`.
        grid = sns.catplot(data=df, x='scaling', y='best_metric', hue='pooling', kind='bar', errorbar='sd', height=5, aspect=1.5)
        grid.ax.set_title('Mean Best Metric by Output Scaling and Pooling')
        grid.tight_layout()
        out_file = OUT_DIR / 'ablation_barplot_scaling_pooling.png'
        # Save through the grid so the legend drawn outside the axes is kept.
        grid.savefig(out_file, dpi=200)
        plt.show()
        print('Saved', out_file)


In [None]:
# Violin plot: distribution of best_metric by pooling (hue=scaling)
import numpy as np

results_path = OUT_DIR / 'ablation_results.csv'
if not results_path.exists():
    print('No ablation results file found at', results_path)
else:
    df = pd.read_csv(results_path)
    # 'scaling' is used as the hue below, so it must be present as well.
    if not {'pooling', 'scaling', 'best_metric'}.issubset(df.columns):
        print('Expected columns missing in', results_path)
    else:
        # Split violins are only meaningful for exactly two hue levels, and older
        # seaborn releases raise otherwise; the ablation grid has three scalings,
        # so only request a split when the data really has two.
        use_split = df['scaling'].nunique() == 2
        fig, ax = plt.subplots(figsize=(8, 5))
        sns.violinplot(data=df, x='pooling', y='best_metric', hue='scaling', split=use_split, inner='quartile', ax=ax)
        ax.set_title('Distribution of Best Metric by Pooling (colored by scaling)')
        fig.tight_layout()
        out_file = OUT_DIR / 'ablation_violin_pooling.png'
        fig.savefig(out_file, dpi=200)
        plt.show()
        print('Saved', out_file)


In [None]:
# Heatmap: mean best_metric for pooling × scaling, using lexicon-on runs when available
results_path = OUT_DIR / 'ablation_results.csv'
if not results_path.exists():
    print('No ablation results file found at', results_path)
else:
    df = pd.read_csv(results_path)
    # Prefer lexicon-on experiments for a clearer pivot, and remember which
    # subset is actually plotted so the title never claims a filter that was
    # not applied.
    df_on, subset_label = df, 'all runs'
    if 'use_lex' in df.columns:
        lexicon_rows = df[df['use_lex'] == True]
        if lexicon_rows.empty:
            print('No lexicon-on rows found; using all rows for heatmap')
        else:
            df_on, subset_label = lexicon_rows, 'lexicon-on'
    # Best-effort plotting: report the problem instead of failing the notebook.
    try:
        pivot = df_on.pivot_table(index='pooling', columns='scaling', values='best_metric', aggfunc='mean')
        fig, ax = plt.subplots(figsize=(6, 4))
        sns.heatmap(pivot, annot=True, fmt='.4f', cmap='vlag', ax=ax)
        ax.set_title(f'Mean Best Metric ({subset_label}) — Pooling vs Scaling')
        fig.tight_layout()
        out_file = OUT_DIR / 'ablation_heatmap_pooling_scaling.png'
        fig.savefig(out_file, dpi=200)
        plt.show()
        print('Saved', out_file)
    except Exception as e:
        print('Could not create heatmap:', e)
