# Ensemble + Fractional Translation

Key insights from the top kernel (jonathanchan/santa25-ensemble-sa-fractional-translation):
1. Collect best per-N solutions from MULTIPLE sources
2. Apply fractional translation - tiny position adjustments (0.001, 0.0005, etc.) in 8 directions
3. This finds improvements that SA misses

In [1]:
import sys
import os
os.chdir('/home/code/experiments/007_ensemble_fractional')
sys.path.insert(0, '/home/code')

import numpy as np
import pandas as pd
import json
import time
import glob
import math
from numba import njit

print("Imports done")

Imports done


In [2]:
# Tree polygon template
@njit
def make_polygon_template():
    """Build the outline of a single, unrotated tree.

    The outline starts at the tip, runs down the right-hand side (x >= 0),
    across the bottom of the trunk, and back up the left-hand side.

    Returns:
        Tuple ``(x, y)`` of float64 arrays holding the 15 vertex coordinates.
    """
    # Widths of the trunk and of the three tiers (bottom, middle, top).
    trunk_w = 0.15
    trunk_h = 0.2
    base_w = 0.7
    mid_w = 0.4
    top_w = 0.25

    # Heights at which the outline changes direction.
    tip_y = 0.8
    tier1_y = 0.5
    tier2_y = 0.25
    base_y = 0.0
    trunk_bottom_y = -trunk_h

    x = np.array(
        [
            0,
            top_w / 2, top_w / 4,
            mid_w / 2, mid_w / 4,
            base_w / 2,
            trunk_w / 2, trunk_w / 2,
            -trunk_w / 2, -trunk_w / 2,
            -base_w / 2,
            -mid_w / 4, -mid_w / 2,
            -top_w / 4, -top_w / 2,
        ],
        np.float64,
    )
    y = np.array(
        [
            tip_y,
            tier1_y, tier1_y,
            tier2_y, tier2_y,
            base_y,
            base_y, trunk_bottom_y,
            trunk_bottom_y, base_y,
            base_y,
            tier2_y, tier2_y,
            tier1_y, tier1_y,
        ],
        np.float64,
    )
    return x, y

# Module-level template coordinates; the vertex and scoring helpers below
# read TX/TY as globals when rotating and placing each tree.
TX, TY = make_polygon_template()
print(f"Tree template: {len(TX)} vertices")

Tree template: 15 vertices


In [3]:
@njit
def get_tree_vertices(x, y, deg):
    """Return the tree template rotated by ``deg`` degrees and moved to (x, y).

    Args:
        x: Translation applied to every vertex along the x axis.
        y: Translation applied to every vertex along the y axis.
        deg: Rotation angle in degrees (converted to radians internally).

    Returns:
        Tuple ``(vx, vy)`` of float64 arrays with the 15 transformed vertices.
    """
    r = deg * np.pi / 180.0
    c = np.cos(r)
    s = np.sin(r)
    vx = np.empty(15, dtype=np.float64)
    vy = np.empty(15, dtype=np.float64)
    for i in range(15):
        # Standard 2-D rotation: x' = c*x - s*y, y' = s*x + c*y.
        vx[i] = c * TX[i] - s * TY[i] + x
        # The sin/cos factors were previously swapped here, which disagreed
        # with score_group and distorted every rotated outline.
        vy[i] = s * TX[i] + c * TY[i] + y
    return vx, vy

@njit
def score_group(xs, ys, degs):
    """Score one group of trees by its bounding square.

    Every tree's template vertices are rotated by its angle (degrees) and
    shifted to its position; the score is the squared longer side of the
    axis-aligned bounding box over all those vertices, divided by the
    number of trees.

    Args:
        xs: Tree x positions.
        ys: Tree y positions.
        degs: Tree rotation angles in degrees.

    Returns:
        ``side * side / len(xs)``.
    """
    count = len(xs)
    min_x = 1e300
    min_y = 1e300
    max_x = -1e300
    max_y = -1e300
    for t in range(count):
        theta = degs[t] * np.pi / 180.0
        cos_t = np.cos(theta)
        sin_t = np.sin(theta)
        ox = xs[t]
        oy = ys[t]
        for k in range(15):
            px = cos_t * TX[k] - sin_t * TY[k] + ox
            py = sin_t * TX[k] + cos_t * TY[k] + oy
            min_x = min(min_x, px)
            max_x = max(max_x, px)
            min_y = min(min_y, py)
            max_y = max(max_y, py)
    width = max_x - min_x
    height = max_y - min_y
    side = max(width, height)
    return side * side / count

print("Score functions defined")

Score functions defined


In [4]:
@njit
def _edge_normals_separate(ax, ay, bx, by):
    """Return True if some edge normal of polygon A separates A from B.

    Both polygons are projected onto the normal of each of A's 15 edges;
    the polygons count as separated when the two projection intervals do
    not overlap (intervals that merely touch count as separated).
    """
    for i in range(15):
        j = (i + 1) % 15
        nx = ay[j] - ay[i]
        ny = ax[i] - ax[j]

        first_a = nx * ax[0] + ny * ay[0]
        lo_a = first_a
        hi_a = first_a
        for k in range(1, 15):
            p = nx * ax[k] + ny * ay[k]
            lo_a = min(lo_a, p)
            hi_a = max(hi_a, p)

        first_b = nx * bx[0] + ny * by[0]
        lo_b = first_b
        hi_b = first_b
        for k in range(1, 15):
            p = nx * bx[k] + ny * by[k]
            lo_b = min(lo_b, p)
            hi_b = max(hi_b, p)

        if hi_a <= lo_b or hi_b <= lo_a:
            return True
    return False


@njit
def polygons_overlap(vx1, vy1, vx2, vy2):
    """Separating-axis overlap test for two 15-vertex polygons.

    Args:
        vx1, vy1: Vertex coordinates of the first polygon.
        vx2, vy2: Vertex coordinates of the second polygon.

    Returns:
        False as soon as any edge normal of either polygon separates the
        two projections; True otherwise.

    NOTE(review): the separating-axis test is only exact for convex
    polygons. For non-convex outlines a True result can be a false
    positive (no separating edge normal exists although the shapes do not
    intersect). A False result is always a genuine separation.
    """
    if _edge_normals_separate(vx1, vy1, vx2, vy2):
        return False
    if _edge_normals_separate(vx2, vy2, vx1, vy1):
        return False
    return True

@njit
def has_any_overlap(xs, ys, degs):
    """Report whether any pair of trees overlaps.

    The transformed vertices of every tree are computed once, then each
    unordered pair is passed to ``polygons_overlap``.

    Args:
        xs: Tree x positions.
        ys: Tree y positions.
        degs: Tree rotation angles in degrees.

    Returns:
        True on the first overlapping pair found, otherwise False.
    """
    count = len(xs)
    all_vx = np.empty((count, 15), dtype=np.float64)
    all_vy = np.empty((count, 15), dtype=np.float64)
    for t in range(count):
        tree_vx, tree_vy = get_tree_vertices(xs[t], ys[t], degs[t])
        all_vx[t, :] = tree_vx
        all_vy[t, :] = tree_vy

    for a in range(count):
        for b in range(a + 1, count):
            if polygons_overlap(all_vx[a], all_vy[a], all_vx[b], all_vy[b]):
                return True
    return False

print("Overlap check functions defined")

Overlap check functions defined


In [5]:
def strip(a):
    """Convert 's'-tagged values to a float64 array.

    Each element is turned into text, every 's' character is removed, and
    the remainder is parsed with ``float``.
    """
    cleaned = []
    for value in a:
        text = str(value)
        cleaned.append(float(text.replace('s', '')))
    return np.asarray(cleaned, dtype=np.float64)

def load_csv(fp):
    """Load a submission CSV and tag every row with its group size N.

    Args:
        fp: Path (or anything ``pd.read_csv`` accepts) of the submission.

    Returns:
        The DataFrame with an added integer column ``N`` (the part of ``id``
        before the first underscore), or None when the file cannot be read,
        lacks any of the ``id``/``x``/``y``/``deg`` columns, or has an ``id``
        whose prefix is not an integer.
    """
    required = {'id', 'x', 'y', 'deg'}
    try:
        df = pd.read_csv(fp)
        if not required.issubset(df.columns):
            return None
        df['N'] = df['id'].astype(str).str.split('_').str[0].astype(int)
    except Exception:
        # Best-effort loader: unreadable or malformed files are skipped.
        # Catching Exception (not a bare except) lets KeyboardInterrupt and
        # SystemExit still stop a long scan over thousands of files.
        return None
    return df

print("Utility functions defined")

Utility functions defined


In [6]:
# Gather every *.csv found anywhere below the snapshot directory
snapshot_dir = '/home/nonroot/snapshots/santa-2025'
csv_files = [
    os.path.join(folder, filename)
    for folder, _subdirs, filenames in os.walk(snapshot_dir)
    for filename in filenames
    if filename.endswith('.csv')
]

# Two extra sources go last: our own baseline, then the GitHub submission
csv_files.extend([
    '/home/code/experiments/001_valid_baseline/submission.csv',
    '/tmp/github_submission.csv',
])

print(f"Found {len(csv_files)} CSV files to ensemble")

Found 3531 CSV files to ensemble


In [7]:
# Build ensemble - best per-N from all sources
# Every entry starts with 'xs'/'ys'/'ds' set to None so that code reading
# best_per_n[n]['xs'] for an N with no usable source sees None instead of
# raising KeyError. 'data' is kept only for backward compatibility.
best_per_n = {
    n: {'score': 1e300, 'data': None, 'xs': None, 'ys': None, 'ds': None, 'src': None}
    for n in range(1, 201)
}

print("Processing CSV files...")
processed = 0
for fp in csv_files:
    df = load_csv(fp)
    if df is None:
        continue

    for n, g in df.groupby('N'):
        if n < 1 or n > 200:
            continue
        # The group for N must hold exactly N trees. A truncated or
        # duplicated group would be scored against the wrong tree count
        # and could displace a valid solution.
        if len(g) != n:
            continue
        try:
            xs = strip(g['x'].to_numpy())
            ys = strip(g['y'].to_numpy())
            ds = strip(g['deg'].to_numpy())
        except ValueError:
            # One unparsable value should drop this group, not abort the scan.
            continue
        sc = score_group(xs, ys, ds)

        if sc < best_per_n[n]['score']:
            best_per_n[n]['score'] = sc
            best_per_n[n]['xs'] = xs
            best_per_n[n]['ys'] = ys
            best_per_n[n]['ds'] = ds
            # Source label is the name of the directory containing the file.
            best_per_n[n]['src'] = fp.split('/')[-2]

    processed += 1
    if processed % 500 == 0:
        print(f"  Processed {processed} files...")

print(f"Processed {processed} files total")

# Calculate ensemble score (an N with no source contributes its 1e300 placeholder)
ensemble_score = sum(best_per_n[n]['score'] for n in range(1, 201))
print(f"\nEnsemble score (before fractional translation): {ensemble_score:.6f}")

Processing CSV files...


  Processed 500 files...


  Processed 1000 files...


  Processed 1500 files...


  Processed 2000 files...


  Processed 2500 files...


  Processed 3000 files...


  Processed 3500 files...


Processed 3520 files total

Ensemble score (before fractional translation): 27.414787


In [8]:
@njit
def fractional_translation_single_tree(xs, ys, ds, tree_idx, max_iter=100):
    """Greedily nudge one tree by tiny steps to shrink the group score.

    For each step size (largest first) and each of 8 directions, the tree
    is moved by ``direction * step``. A move is kept only when it lowers
    the score by more than 1e-12 and leaves the configuration free of
    overlaps according to ``has_any_overlap``. Passes repeat until one
    pass yields no accepted move or ``max_iter`` passes have run.

    Args:
        xs: Tree x positions (not modified; a copy is optimised).
        ys: Tree y positions (not modified; a copy is optimised).
        ds: Tree rotation angles in degrees (never changed).
        tree_idx: Index of the tree to move.
        max_iter: Maximum number of passes over all steps and directions.

    Returns:
        Tuple ``(best_xs, best_ys, best_score)``.
    """
    cur_xs = xs.copy()
    cur_ys = ys.copy()
    best_score = score_group(cur_xs, cur_ys, ds)

    frac_steps = np.array([0.001, 0.0005, 0.0002, 0.0001, 0.00005, 0.00002, 0.00001])
    directions = np.array([
        [0, 1], [0, -1], [1, 0], [-1, 0],
        [1, 1], [1, -1], [-1, 1], [-1, -1]
    ], dtype=np.float64)

    for _ in range(max_iter):
        improved = False
        for step in frac_steps:
            for d in range(8):
                # Trial move is made in place and undone if rejected, so no
                # arrays are copied per candidate.
                old_x = cur_xs[tree_idx]
                old_y = cur_ys[tree_idx]
                cur_xs[tree_idx] = old_x + directions[d, 0] * step
                cur_ys[tree_idx] = old_y + directions[d, 1] * step

                # Cheap score test first; the all-pairs overlap check only
                # runs for moves that would actually be accepted on score.
                new_score = score_group(cur_xs, cur_ys, ds)
                if new_score < best_score - 1e-12 and not has_any_overlap(cur_xs, cur_ys, ds):
                    best_score = new_score
                    improved = True
                else:
                    cur_xs[tree_idx] = old_x
                    cur_ys[tree_idx] = old_y

        if not improved:
            break

    return cur_xs, cur_ys, best_score

print("Fractional translation function defined")

Fractional translation function defined


In [9]:
@njit
def fractional_translation_all(xs, ys, ds, max_iter=50):
    """Greedily nudge every tree by tiny steps to shrink the group score.

    Each pass visits the trees in index order; for every tree, each step
    size (largest first) and each of 8 directions is tried. A move is kept
    only when it lowers the score by more than 1e-12 and leaves the
    configuration free of overlaps according to ``has_any_overlap``.
    Passes repeat until one pass yields no accepted move or ``max_iter``
    passes have run.

    Args:
        xs: Tree x positions (not modified; a copy is optimised).
        ys: Tree y positions (not modified; a copy is optimised).
        ds: Tree rotation angles in degrees (never changed).
        max_iter: Maximum number of passes over all trees.

    Returns:
        Tuple ``(best_xs, best_ys, best_score)``.
    """
    n = len(xs)
    cur_xs = xs.copy()
    cur_ys = ys.copy()
    best_score = score_group(cur_xs, cur_ys, ds)

    frac_steps = np.array([0.001, 0.0005, 0.0002, 0.0001, 0.00005])
    directions = np.array([
        [0, 1], [0, -1], [1, 0], [-1, 0],
        [1, 1], [1, -1], [-1, 1], [-1, -1]
    ], dtype=np.float64)

    for _ in range(max_iter):
        improved = False
        for i in range(n):
            for step in frac_steps:
                for d in range(8):
                    # Trial move is made in place and undone if rejected, so
                    # no arrays are copied per candidate.
                    old_x = cur_xs[i]
                    old_y = cur_ys[i]
                    cur_xs[i] = old_x + directions[d, 0] * step
                    cur_ys[i] = old_y + directions[d, 1] * step

                    # Cheap score test first; the all-pairs overlap check
                    # only runs for moves that would be accepted on score.
                    new_score = score_group(cur_xs, cur_ys, ds)
                    if new_score < best_score - 1e-12 and not has_any_overlap(cur_xs, cur_ys, ds):
                        best_score = new_score
                        improved = True
                    else:
                        cur_xs[i] = old_x
                        cur_ys[i] = old_y

        if not improved:
            break

    return cur_xs, cur_ys, best_score

print("Full fractional translation function defined")

Full fractional translation function defined


In [10]:
# Debug - spot-check a handful of N values in the ensemble
print("Checking ensemble data...")
sample_ns = (1, 2, 5, 10, 50, 100, 200)
for n in sample_ns:
    entry = best_per_n[n]
    print(f"N={n}: score={entry['score']:.6f}, has_data={entry.get('xs') is not None}, src={entry.get('src')}")

Checking ensemble data...
N=1: score=0.661250, has_data=True, src=code
N=2: score=0.338427, has_data=True, src=experiments
N=5: score=0.206839, has_data=True, src=experiments
N=10: score=0.154792, has_data=True, src=experiments
N=50: score=0.034288, has_data=True, src=experiments
N=100: score=0.157260, has_data=True, src=experiments
N=200: score=0.124941, has_data=True, src=experiments


In [None]:
# Apply fractional translation to each N
print("Applying fractional translation...")
print("="*70)

improved_per_n = {}
improvements = []

for n in range(1, 201):
    entry = best_per_n[n]
    # Use .get(): an N for which no source was found may have no 'xs' key
    # at all, and indexing it directly would raise KeyError instead of
    # skipping that N.
    if entry.get('xs') is None:
        continue

    xs = entry['xs']
    ys = entry['ys']
    ds = entry['ds']
    old_score = entry['score']

    # Apply fractional translation
    new_xs, new_ys, new_score = fractional_translation_all(xs, ys, ds, max_iter=30)

    improved_per_n[n] = {
        'xs': new_xs,
        'ys': new_ys,
        'ds': ds,
        'score': new_score
    }

    # Only report changes above 1e-9
    if new_score < old_score - 1e-9:
        diff = old_score - new_score
        improvements.append((n, diff))
        print(f"N={n:3d}: {old_score:.9f} -> {new_score:.9f} (improved by {diff:.9f})")

    if n % 50 == 0:
        print(f"  Processed N=1 to {n}...")

print(f"\nTotal improvements: {len(improvements)}")
if improvements:
    total_improvement = sum(d for _, d in improvements)
    print(f"Total score improvement: {total_improvement:.9f}")

In [None]:
# Total the per-N scores produced by the fractional translation pass
per_n_scores = [improved_per_n[n]['score'] for n in range(1, 201)]
final_score = sum(per_n_scores)
print(f"\nFinal score after fractional translation: {final_score:.6f}")
print(f"Improvement from ensemble: {ensemble_score - final_score:.9f}")

In [None]:
# Write the optimised layouts out in submission format
import shutil

rows = []
for n in range(1, 201):
    entry = improved_per_n[n]
    xs, ys, ds = entry['xs'], entry['ys'], entry['ds']
    # One row per tree; ids are the zero-padded N followed by the tree index,
    # and every numeric field carries the 's' prefix.
    rows.extend(
        {
            'id': f'{n:03d}_{i}',
            'x': f's{xs[i]:.20f}',
            'y': f's{ys[i]:.20f}',
            'deg': f's{ds[i]:.20f}'
        }
        for i in range(n)
    )

submission_df = pd.DataFrame(rows)
submission_df.to_csv('submission.csv', index=False)
print(f"Saved submission.csv with {len(rows)} rows")

# Mirror the file into the submission folder
shutil.copy('submission.csv', '/home/submission/submission.csv')
print("Copied to /home/submission/")

In [None]:
# Persist a summary of this run next to the submission
top_improvements = [(n, float(d)) for n, d in improvements[:20]]  # first 20 only

metrics = {}
metrics['cv_score'] = final_score
metrics['ensemble_score'] = ensemble_score
metrics['improvement'] = ensemble_score - final_score
metrics['n_improvements'] = len(improvements)
metrics['improvements'] = top_improvements
metrics['notes'] = 'Ensemble from all snapshots + GitHub + fractional translation optimization'

with open('metrics.json', 'w') as f:
    json.dump(metrics, f, indent=2)

print(f"\nFinal CV Score: {final_score:.6f}")