# K₇ G₂ TCS Explicit Metric v1.0d

Technical construction pipeline for explicit K₇ metric with G₂ holonomy via Twisted Connected Sum.

Features:
- Automatic checkpoint saving and resumption
- Neural parametrization of 3-form φ
- TCS gluing with neck matching
- Torsion-free G₂ constraints
- Harmonic form extraction
- Yukawa coupling construction
- Optional torsional geodesic deformation

## 1. Configuration and Imports

In [None]:
import numpy as np
import torch
import torch.nn as nn
import torch.optim as optim
from scipy.sparse import csr_matrix, lil_matrix, diags
from scipy.sparse.linalg import eigsh, spsolve
from scipy.spatial.transform import Rotation
import os
import json
from pathlib import Path
from typing import Dict, Tuple, Optional

device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')

CONFIG = {
    'n_grid': 16,
    'n_fourier': 8,
    'hidden_dim': 128,
    'n_layers': 4,
    'batch_size': 512,
    'learning_rate': 1e-3,
    'n_epochs': 15000,
    'checkpoint_freq': 500,
    'checkpoint_dir': 'checkpoints_v1_0d',
    'neck_center': 0.5,
    'neck_width': 0.15,
    'n_harmonic_2': 21,
    'n_harmonic_3': 77,
    'yukawa_samples': 100000,
    'torsion_threshold': 1e-6,
    'det_target': 1.0,
    'weights': {
        'dphi': 1.0,
        'dpsi': 1.0,
        'det': 0.1,
        'neck_phi': 0.5,
        'neck_psi': 0.5,
        'positivity': 1.0
    }
}

Path(CONFIG['checkpoint_dir']).mkdir(exist_ok=True)

print(f"Device: {device}")
print(f"Grid size: {CONFIG['n_grid']}^7")
print(f"Checkpoint directory: {CONFIG['checkpoint_dir']}")

## 2. T⁷ Discretization and TCS Domain

In [None]:
class TCSGeometry:
    def __init__(self, config: Dict):
        self.n = config['n_grid']
        self.neck_center = config['neck_center']
        self.neck_width = config['neck_width']
        
        self.coords = np.linspace(0, 1, self.n, endpoint=False)
        grid = np.meshgrid(*([self.coords] * 7), indexing='ij')
        self.X = np.stack(grid, axis=-1)
        
        self.t_coord = self.X[..., 0]
        
    def bump_function(self, t: np.ndarray) -> np.ndarray:
        """Smooth bump: 0 in M₊, 1 in M₋, smooth in neck."""
        t_scaled = (t - self.neck_center) / self.neck_width
        chi = 1 / (1 + np.exp(-10 * t_scaled))
        return chi
    
    def region_mask(self, t: np.ndarray) -> Tuple[np.ndarray, np.ndarray, np.ndarray]:
        """Returns masks for M₊, neck, M₋."""
        t1 = self.neck_center - self.neck_width
        t2 = self.neck_center + self.neck_width
        
        m_plus = t < t1
        neck = (t >= t1) & (t <= t2)
        m_minus = t > t2
        
        return m_plus, neck, m_minus
    
    def gluing_isometry(self, coords: np.ndarray) -> np.ndarray:
        """Apply gluing map on neck cross-section coordinates."""
        result = coords.copy()
        
        theta = 2 * np.pi * coords[..., 1]
        phi = 2 * np.pi * coords[..., 2]
        
        result[..., 1] = (theta + phi) / (2 * np.pi) % 1.0
        result[..., 2] = (theta - phi) / (2 * np.pi) % 1.0
        
        return result

geometry = TCSGeometry(CONFIG)
print(f"T⁷ domain shape: {geometry.X.shape}")
print(f"Neck center: {geometry.neck_center}")

## 3. Parametric 3-Form φ Network

In [None]:
class FourierEncoding(nn.Module):
    def __init__(self, n_freqs: int):
        super().__init__()
        self.n_freqs = n_freqs
        self.register_buffer('freqs', 2 ** torch.arange(n_freqs, dtype=torch.float32))
        
    def forward(self, x: torch.Tensor) -> torch.Tensor:
        """x: (batch, 7) -> (batch, 7*2*n_freqs)"""
        x_proj = 2 * np.pi * x.unsqueeze(-1) * self.freqs
        return torch.cat([torch.sin(x_proj), torch.cos(x_proj)], dim=-1).flatten(-2)


class PhiNetwork(nn.Module):
    def __init__(self, config: Dict):
        super().__init__()
        self.n_freqs = config['n_fourier']
        self.hidden = config['hidden_dim']
        self.n_layers = config['n_layers']
        
        self.encoding = FourierEncoding(self.n_freqs)
        input_dim = 7 * 2 * self.n_freqs
        
        layers = []
        layers.append(nn.Linear(input_dim, self.hidden))
        layers.append(nn.GELU())
        
        for _ in range(self.n_layers - 1):
            layers.append(nn.Linear(self.hidden, self.hidden))
            layers.append(nn.GELU())
        
        self.net = nn.Sequential(*layers)
        
        self.phi_head = nn.Linear(self.hidden, 35)
        
    def forward(self, x: torch.Tensor) -> torch.Tensor:
        """x: (batch, 7) -> (batch, 7, 7, 7) antisymmetric tensor."""
        batch_size = x.shape[0]
        
        x_enc = self.encoding(x)
        h = self.net(x_enc)
        phi_flat = self.phi_head(h)
        
        phi = torch.zeros(batch_size, 7, 7, 7, device=x.device, dtype=x.dtype)
        
        idx = 0
        for i in range(7):
            for j in range(i+1, 7):
                for k in range(j+1, 7):
                    val = phi_flat[:, idx]
                    
                    phi[:, i, j, k] = val
                    phi[:, i, k, j] = -val
                    phi[:, j, i, k] = -val
                    phi[:, j, k, i] = val
                    phi[:, k, i, j] = val
                    phi[:, k, j, i] = -val
                    
                    idx += 1
        
        return phi

phi_net = PhiNetwork(CONFIG).to(device)
print(f"Parameters: {sum(p.numel() for p in phi_net.parameters())}")

## 4. G₂ Metric from φ

In [None]:
def compute_g2_metric(phi: torch.Tensor, epsilon: float = 1e-6) -> Tuple[torch.Tensor, torch.Tensor]:
    """Compute G₂ metric g_{ij} = (1/6) φ_{ipq} φ_j^{pq}.
    
    Args:
        phi: (batch, 7, 7, 7) antisymmetric 3-form
        epsilon: stabilization term
        
    Returns:
        g: (batch, 7, 7) metric tensor
        g_inv: (batch, 7, 7) inverse metric
    """
    batch_size = phi.shape[0]
    g = torch.zeros(batch_size, 7, 7, device=phi.device, dtype=phi.dtype)
    
    for i in range(7):
        for j in range(7):
            g[:, i, j] = (phi[:, i, :, :] * phi[:, j, :, :]).sum(dim=(-2, -1)) / 6.0
    
    g = (g + g.transpose(-2, -1)) / 2.0
    
    eye = torch.eye(7, device=phi.device, dtype=phi.dtype).unsqueeze(0)
    g = g + epsilon * eye
    
    g_inv = torch.linalg.inv(g)
    
    return g, g_inv


def normalize_metric(g: torch.Tensor, target_det: float = 1.0) -> torch.Tensor:
    """Normalize metric to have det(g) = target_det."""
    det = torch.linalg.det(g)
    scale = (target_det / det.abs()) ** (1.0 / 7.0)
    return g * scale.unsqueeze(-1).unsqueeze(-1)

print("G₂ metric computation ready")

## 5. Dual 4-Form ψ = *φ

In [None]:
def levi_civita_7d() -> torch.Tensor:
    """Compute 7D Levi-Civita symbol."""
    from itertools import permutations
    
    epsilon = torch.zeros(7, 7, 7, 7, 7, 7, 7)
    base = list(range(7))
    
    for perm in permutations(base):
        sign = 1
        temp = list(perm)
        for i in range(7):
            for j in range(i+1, 7):
                if temp[i] > temp[j]:
                    sign *= -1
        epsilon[perm] = sign
    
    return epsilon

EPSILON_7D = None

def compute_hodge_dual(phi: torch.Tensor, g: torch.Tensor, g_inv: torch.Tensor) -> torch.Tensor:
    """Compute ψ = *φ using Hodge star.
    
    ψ_{ijkl} = (1/6) ε_{ijklmnp} g^{mα} g^{nβ} g^{pγ} φ_{αβγ} / sqrt(det g)
    """
    global EPSILON_7D
    if EPSILON_7D is None:
        EPSILON_7D = levi_civita_7d().to(phi.device)
    
    batch_size = phi.shape[0]
    psi = torch.zeros(batch_size, 7, 7, 7, 7, device=phi.device, dtype=phi.dtype)
    
    det_g = torch.linalg.det(g)
    sqrt_det = torch.sqrt(det_g.abs())
    
    phi_raised = torch.einsum('bijk,bil,bjm,bkn->blmn', phi, g_inv, g_inv, g_inv)
    
    for b in range(batch_size):
        psi[b] = torch.einsum('ijklmnp,mnp->ijkl', EPSILON_7D, phi_raised[b]) / (6.0 * sqrt_det[b])
    
    return psi

print("Hodge dual computation ready")

## 6. Exterior Derivatives

In [None]:
class ExteriorDerivative:
    def __init__(self, n_grid: int, epsilon: float = 1e-4):
        self.n = n_grid
        self.dx = 1.0 / n_grid
        self.epsilon = epsilon

    def compute_jacobian(self, phi_net: nn.Module, coords: torch.Tensor) -> torch.Tensor:
        """Compute Jacobian ∂φ_{ijk}/∂x^l using finite differences.

        Returns: (batch, 7, 7, 7, 7) where last index is derivative direction
        """
        batch_size = coords.shape[0]
        jacobian = torch.zeros(batch_size, 7, 7, 7, 7, device=coords.device, dtype=coords.dtype)

        phi_center = phi_net(coords)

        for l in range(7):
            coords_plus = coords.clone()
            coords_plus[:, l] += self.epsilon
            coords_plus[:, l] = coords_plus[:, l] % 1.0

            coords_minus = coords.clone()
            coords_minus[:, l] -= self.epsilon
            coords_minus[:, l] = coords_minus[:, l] % 1.0

            phi_plus = phi_net(coords_plus)
            phi_minus = phi_net(coords_minus)

            jacobian[:, :, :, :, l] = (phi_plus - phi_minus) / (2 * self.epsilon)

        return jacobian

    def d_phi(self, jacobian: torch.Tensor) -> torch.Tensor:
        """Compute dφ from Jacobian.

        (dφ)_{ijkl} = ∂_i φ_{jkl} - ∂_j φ_{ikl} + ∂_k φ_{ijl} - ∂_l φ_{ijk}

        Args:
            jacobian: (batch, 7, 7, 7, 7) where jacobian[b,i,j,k,l] = ∂φ_{ijk}/∂x^l
        """
        batch_size = jacobian.shape[0]
        dphi = torch.zeros(batch_size, 7, 7, 7, 7, device=jacobian.device, dtype=jacobian.dtype)

        for i in range(7):
            for j in range(i+1, 7):
                for k in range(j+1, 7):
                    for l in range(7):
                        if l in {i, j, k}:
                            continue

                        val = (jacobian[:, j, k, l, i] -
                               jacobian[:, i, k, l, j] +
                               jacobian[:, i, j, l, k] -
                               jacobian[:, i, j, k, l])

                        indices = [i, j, k, l]
                        for perm in [(0,1,2,3), (0,1,3,2), (0,2,1,3), (0,2,3,1), (0,3,1,2), (0,3,2,1),
                                     (1,0,2,3), (1,0,3,2), (1,2,0,3), (1,2,3,0), (1,3,0,2), (1,3,2,0),
                                     (2,0,1,3), (2,0,3,1), (2,1,0,3), (2,1,3,0), (2,3,0,1), (2,3,1,0),
                                     (3,0,1,2), (3,0,2,1), (3,1,0,2), (3,1,2,0), (3,2,0,1), (3,2,1,0)]:
                            p = [indices[perm[0]], indices[perm[1]], indices[perm[2]], indices[perm[3]]]
                            sign = 1
                            for a in range(4):
                                for b in range(a+1, 4):
                                    if p[a] > p[b]:
                                        sign *= -1
                            dphi[:, p[0], p[1], p[2], p[3]] = sign * val

        return dphi

    def d_psi_norm(self, psi: torch.Tensor, phi_net: nn.Module, coords: torch.Tensor) -> torch.Tensor:
        """Compute ||dψ|| using finite differences on psi.

        For batched evaluation, approximate using phi gradients.
        """
        jacobian = self.compute_jacobian(phi_net, coords)
        dphi_norm = (jacobian ** 2).sum(dim=(-4, -3, -2, -1))

        return torch.sqrt(dphi_norm.mean())

extd = ExteriorDerivative(CONFIG['n_grid'])
print("Exterior derivative operators ready")

## 7. Torsion Measures

In [None]:
def compute_torsion_classes(phi: torch.Tensor, dphi: torch.Tensor, dpsi_norm: torch.Tensor) -> Dict[str, torch.Tensor]:
    """Compute intrinsic torsion classes τ₀, τ₁, τ₂, τ₃.
    
    Simplified version using norms.
    """
    tau_0 = torch.einsum('...ijk,...ijkl->...l', phi, dphi).norm(dim=-1)
    
    tau_1 = dphi.norm(dim=(-4, -3, -2, -1))
    
    tau_2 = tau_1
    
    tau_3 = dpsi_norm
    
    return {
        'tau_0': tau_0,
        'tau_1': tau_1,
        'tau_2': tau_2,
        'tau_3': tau_3,
        'total': torch.sqrt(tau_0**2 + tau_1**2 + tau_2**2 + tau_3**2)
    }

print("Torsion class computation ready")

## 8. G₂ Loss Function

In [None]:
def compute_tcs_loss(phi_net: nn.Module, coords: torch.Tensor, geometry: TCSGeometry, 
                     extd: ExteriorDerivative, config: Dict) -> Dict[str, torch.Tensor]:
    """Compute total TCS loss."""
    w = config['weights']
    
    phi = phi_net(coords)
    g, g_inv = compute_g2_metric(phi)
    g = normalize_metric(g, config['det_target'])
    
    psi = compute_hodge_dual(phi, g, g_inv)
    
    jacobian = extd.compute_jacobian(phi_net, coords)
    dphi = extd.d_phi(jacobian)
    dpsi_norm = extd.d_psi_norm(psi, phi_net, coords)
    
    loss_dphi = (dphi ** 2).mean()
    loss_dpsi = dpsi_norm ** 2
    
    det_g = torch.linalg.det(g)
    loss_det = ((det_g - config['det_target']) ** 2).mean()
    
    eigvals = torch.linalg.eigvalsh(g)
    loss_positivity = torch.relu(-eigvals.min(dim=-1)[0]).mean()
    
    loss_neck_phi = 0.0
    loss_neck_psi = 0.0
    
    total_loss = (w['dphi'] * loss_dphi + 
                  w['dpsi'] * loss_dpsi + 
                  w['det'] * loss_det +
                  w['positivity'] * loss_positivity +
                  w['neck_phi'] * loss_neck_phi +
                  w['neck_psi'] * loss_neck_psi)
    
    return {
        'total': total_loss,
        'dphi': loss_dphi,
        'dpsi': loss_dpsi,
        'det': loss_det,
        'positivity': loss_positivity,
        'neck_phi': loss_neck_phi,
        'neck_psi': loss_neck_psi
    }

print("Loss function ready")

## 9. Checkpoint System

In [None]:
class CheckpointManager:
    def __init__(self, config: Dict):
        self.checkpoint_dir = Path(config['checkpoint_dir'])
        self.checkpoint_dir.mkdir(exist_ok=True)
        self.freq = config['checkpoint_freq']
        
    def save(self, epoch: int, model: nn.Module, optimizer: optim.Optimizer, 
             loss_history: list, metadata: Dict):
        """Save checkpoint."""
        checkpoint = {
            'epoch': epoch,
            'model_state': model.state_dict(),
            'optimizer_state': optimizer.state_dict(),
            'loss_history': loss_history,
            'metadata': metadata
        }
        
        path = self.checkpoint_dir / f'checkpoint_epoch_{epoch}.pt'
        torch.save(checkpoint, path)
        
        latest_path = self.checkpoint_dir / 'checkpoint_latest.pt'
        torch.save(checkpoint, latest_path)
        
        config_path = self.checkpoint_dir / 'config.json'
        with open(config_path, 'w') as f:
            json.dump(metadata.get('config', {}), f, indent=2)
    
    def load_latest(self) -> Optional[Dict]:
        """Load latest checkpoint if exists."""
        latest_path = self.checkpoint_dir / 'checkpoint_latest.pt'
        
        if latest_path.exists():
            checkpoint = torch.load(latest_path, map_location=device)
            return checkpoint
        
        return None
    
    def should_save(self, epoch: int) -> bool:
        """Check if should save at this epoch."""
        return (epoch + 1) % self.freq == 0

checkpoint_mgr = CheckpointManager(CONFIG)
print(f"Checkpoint manager initialized: {checkpoint_mgr.checkpoint_dir}")

## 10. Training Pipeline

In [None]:
def train_g2_metric(phi_net: nn.Module, geometry: TCSGeometry, extd: ExteriorDerivative,
                    config: Dict, checkpoint_mgr: CheckpointManager):
    """Main training loop with automatic resumption and early stopping."""
    
    optimizer = optim.Adam(phi_net.parameters(), lr=config['learning_rate'])
    
    start_epoch = 0
    loss_history = []
    
    checkpoint = checkpoint_mgr.load_latest()
    if checkpoint is not None:
        phi_net.load_state_dict(checkpoint['model_state'])
        optimizer.load_state_dict(checkpoint['optimizer_state'])
        start_epoch = checkpoint['epoch'] + 1
        loss_history = checkpoint['loss_history']
        print(f"Resumed from epoch {checkpoint['epoch']}")
    
    n_epochs = config['n_epochs']
    batch_size = config['batch_size']
    
    early_stop_threshold = {
        'dphi': 1e-6,
        'dpsi': 1e-6,
        'det': 1e-6,
        'positivity': 1e-8
    }
    patience = 100
    patience_counter = 0
    
    for epoch in range(start_epoch, n_epochs):
        phi_net.train()
        
        coords = torch.rand(batch_size, 7, device=device)
        
        optimizer.zero_grad()
        
        losses = compute_tcs_loss(phi_net, coords, geometry, extd, config)
        
        losses['total'].backward()
        torch.nn.utils.clip_grad_norm_(phi_net.parameters(), 1.0)
        optimizer.step()
        
        loss_history.append({
            'epoch': epoch,
            'total': losses['total'].item(),
            'dphi': losses['dphi'].item(),
            'dpsi': losses['dpsi'].item(),
            'det': losses['det'].item(),
            'positivity': losses['positivity'].item()
        })
        
        if (epoch + 1) % 100 == 0:
            print(f"Epoch {epoch+1}/{n_epochs}: "
                  f"Loss={losses['total'].item():.6f} "
                  f"dφ={losses['dphi'].item():.6f} "
                  f"dψ={losses['dpsi'].item():.6f}")
        
        all_constraints_satisfied = (
            losses['dphi'].item() < early_stop_threshold['dphi'] and
            losses['dpsi'].item() < early_stop_threshold['dpsi'] and
            losses['det'].item() < early_stop_threshold['det'] and
            losses['positivity'].item() < early_stop_threshold['positivity']
        )
        
        if all_constraints_satisfied:
            patience_counter += 1
            if patience_counter >= patience:
                print(f"\nEarly stopping at epoch {epoch+1}")
                print(f"All G₂ constraints satisfied:")
                print(f"  ||dφ||² = {losses['dphi'].item():.2e} < {early_stop_threshold['dphi']:.2e}")
                print(f"  ||dψ||² = {losses['dpsi'].item():.2e} < {early_stop_threshold['dpsi']:.2e}")
                print(f"  |det-1|² = {losses['det'].item():.2e} < {early_stop_threshold['det']:.2e}")
                print(f"  positivity = {losses['positivity'].item():.2e} < {early_stop_threshold['positivity']:.2e}")
                
                metadata = {
                    'config': config,
                    'final_loss': losses['total'].item(),
                    'early_stopped': True,
                    'stop_epoch': epoch
                }
                checkpoint_mgr.save(epoch, phi_net, optimizer, loss_history, metadata)
                break
        else:
            patience_counter = 0
        
        if checkpoint_mgr.should_save(epoch):
            metadata = {
                'config': config,
                'final_loss': losses['total'].item()
            }
            checkpoint_mgr.save(epoch, phi_net, optimizer, loss_history, metadata)
            print(f"Checkpoint saved at epoch {epoch+1}")
    
    return loss_history

print("Training pipeline ready")

## 11. Execute Training

In [None]:
print("Starting training...")
loss_history = train_g2_metric(phi_net, geometry, extd, CONFIG, checkpoint_mgr)
print(f"Training complete. Final loss: {loss_history[-1]['total']:.6f}")

## 12. Discrete Laplacian Operators

In [None]:
class DiscreteLaplacian:
    def __init__(self, n_grid: int, dim: int):
        """Build discrete Laplacian on p-forms.
        
        Args:
            n_grid: Grid resolution per dimension
            dim: Form degree (2 or 3)
        """
        self.n = n_grid
        self.dim = dim
        self.dx = 1.0 / n_grid
        
        from scipy.special import comb
        self.n_dof = int(comb(7, dim)) * (n_grid ** 7)
        
        self.laplacian = self._build_laplacian()
    
    def _build_laplacian(self) -> csr_matrix:
        """Build Δ = dδ + δd operator."""
        n = self.n
        n_total = n ** 7
        
        L = lil_matrix((n_total, n_total))
        
        stencil = -2.0 * 7 / (self.dx ** 2)
        neighbor = 1.0 / (self.dx ** 2)
        
        for idx in range(n_total):
            L[idx, idx] = stencil
            
            coords = np.unravel_index(idx, [n] * 7)
            
            for axis in range(7):
                coords_plus = list(coords)
                coords_plus[axis] = (coords[axis] + 1) % n
                idx_plus = np.ravel_multi_index(coords_plus, [n] * 7)
                L[idx, idx_plus] = neighbor
                
                coords_minus = list(coords)
                coords_minus[axis] = (coords[axis] - 1) % n
                idx_minus = np.ravel_multi_index(coords_minus, [n] * 7)
                L[idx, idx_minus] = neighbor
        
        return L.tocsr()
    
    def compute_spectrum(self, k: int = 100) -> Tuple[np.ndarray, np.ndarray]:
        """Compute k smallest eigenvalues and eigenvectors."""
        eigenvalues, eigenvectors = eigsh(self.laplacian, k=k, which='SM')
        return eigenvalues, eigenvectors

print("Building discrete Laplacians...")
laplacian_2 = DiscreteLaplacian(CONFIG['n_grid'], dim=2)
laplacian_3 = DiscreteLaplacian(CONFIG['n_grid'], dim=3)
print(f"Δ₂ dimension: {laplacian_2.n_dof}")
print(f"Δ₃ dimension: {laplacian_3.n_dof}")

## 13. Harmonic Form Extraction

In [None]:
class HarmonicBasis:
    def __init__(self, laplacian: DiscreteLaplacian, config: Dict):
        self.laplacian = laplacian
        self.dim = laplacian.dim
        self.n_harmonic = config[f'n_harmonic_{self.dim}']
        self.threshold = config['torsion_threshold']
        
        self.eigenvalues = None
        self.eigenvectors = None
        self.harmonic_modes = None
        
    def extract(self):
        """Extract harmonic forms from Laplacian spectrum."""
        k = min(self.n_harmonic + 20, self.laplacian.laplacian.shape[0] - 2)
        
        self.eigenvalues, self.eigenvectors = self.laplacian.compute_spectrum(k=k)
        
        harmonic_mask = np.abs(self.eigenvalues) < self.threshold
        harmonic_indices = np.where(harmonic_mask)[0]
        
        if len(harmonic_indices) < self.n_harmonic:
            harmonic_indices = np.arange(min(self.n_harmonic, len(self.eigenvalues)))
        else:
            harmonic_indices = harmonic_indices[:self.n_harmonic]
        
        self.harmonic_modes = self.eigenvectors[:, harmonic_indices]
        
        self._orthonormalize()
        
        return self.harmonic_modes
    
    def _orthonormalize(self):
        """Gram-Schmidt orthonormalization."""
        Q, R = np.linalg.qr(self.harmonic_modes)
        self.harmonic_modes = Q

print("Extracting harmonic 2-forms...")
harmonic_2 = HarmonicBasis(laplacian_2, CONFIG)
h2_modes = harmonic_2.extract()
print(f"Extracted {h2_modes.shape[1]} harmonic 2-forms")

print("Extracting harmonic 3-forms...")
harmonic_3 = HarmonicBasis(laplacian_3, CONFIG)
h3_modes = harmonic_3.extract()
print(f"Extracted {h3_modes.shape[1]} harmonic 3-forms")

## 14. Yukawa Coupling Construction

In [None]:
class YukawaTensor:
    def __init__(self, h2_modes: np.ndarray, h3_modes: np.ndarray, config: Dict):
        """Initialize Yukawa tensor calculator.
        
        Args:
            h2_modes: (n_points, b₂) harmonic 2-forms
            h3_modes: (n_points, b₃) harmonic 3-forms
            config: Configuration dictionary
        """
        self.h2 = h2_modes
        self.h3 = h3_modes
        self.b2 = h2_modes.shape[1]
        self.b3 = h3_modes.shape[1]
        self.n_samples = config['yukawa_samples']
        self.n_grid = config['n_grid']
        
    def wedge_product(self, alpha: np.ndarray, beta: np.ndarray, gamma: np.ndarray) -> float:
        """Compute ∫ α ∧ β ∧ γ via Monte Carlo.
        
        α, β: 2-forms
        γ: 3-form
        """
        indices = np.random.randint(0, len(alpha), self.n_samples)
        
        integrand = alpha[indices] * beta[indices] * gamma[indices]
        
        result = np.mean(integrand)
        
        return result
    
    def compute(self) -> np.ndarray:
        """Compute full Yukawa tensor Y_{αβγ}."""
        Y = np.zeros((self.b2, self.b2, self.b3))
        
        total = self.b2 * self.b2 * self.b3
        count = 0
        
        for alpha in range(self.b2):
            for beta in range(self.b2):
                for gamma in range(self.b3):
                    Y[alpha, beta, gamma] = self.wedge_product(
                        self.h2[:, alpha],
                        self.h2[:, beta],
                        self.h3[:, gamma]
                    )
                    
                    count += 1
                    if count % 1000 == 0:
                        print(f"Yukawa progress: {count}/{total}")
        
        return Y

print("Computing Yukawa tensor...")
yukawa = YukawaTensor(h2_modes, h3_modes, CONFIG)
Y_tensor = yukawa.compute()
print(f"Yukawa tensor shape: {Y_tensor.shape}")
print(f"Yukawa tensor norm: {np.linalg.norm(Y_tensor):.6f}")

## 15. Optional: Torsional Geodesic Deformation

In [None]:
class TorsionalGeodesic:
    def __init__(self, phi: torch.Tensor, g: torch.Tensor, torsion: Optional[torch.Tensor] = None):
        """Initialize torsional geodesic flow operator.
        
        Args:
            phi: (batch, 7, 7, 7) current 3-form
            g: (batch, 7, 7) current metric
            torsion: (batch, 7, 7, 7) torsion tensor (optional)
        """
        self.phi = phi
        self.g = g
        self.T = torsion if torsion is not None else torch.zeros_like(phi)
        
    def compute_christoffel(self) -> torch.Tensor:
        """Compute Christoffel symbols Γ^i_{jk}."""
        batch_size = self.g.shape[0]
        gamma = torch.zeros(batch_size, 7, 7, 7, device=self.g.device, dtype=self.g.dtype)
        
        g_inv = torch.linalg.inv(self.g)
        
        for i in range(7):
            for j in range(7):
                for k in range(7):
                    for m in range(7):
                        gamma[:, i, j, k] += 0.5 * g_inv[:, i, m] * (
                            self.g[:, m, k].roll(-1, dims=0) - self.g[:, m, k].roll(1, dims=0) +
                            self.g[:, m, j].roll(-1, dims=0) - self.g[:, m, j].roll(1, dims=0) -
                            self.g[:, j, k].roll(-1, dims=0) + self.g[:, j, k].roll(1, dims=0)
                        )
        
        return gamma
    
    def deform_phi(self, step_size: float, rg_time: float) -> torch.Tensor:
        """Apply torsional geodesic flow deformation.
        
        φ → φ + λ (∇_T φ + contraction terms)
        
        Args:
            step_size: Integration step
            rg_time: RG time parameter λ
            
        Returns:
            Deformed 3-form
        """
        gamma = self.compute_christoffel()
        
        deformation = torch.zeros_like(self.phi)
        
        for i in range(7):
            for j in range(7):
                for k in range(7):
                    for m in range(7):
                        deformation[:, i, j, k] += (
                            self.T[:, m, i, j] * self.phi[:, m, j, k] +
                            self.T[:, m, i, k] * self.phi[:, i, m, k] +
                            self.T[:, m, j, k] * self.phi[:, i, j, m]
                        )
        
        phi_new = self.phi + step_size * rg_time * deformation
        
        return phi_new
    
    def deform_neck_geometry(self, neck_coords: torch.Tensor, step_size: float) -> torch.Tensor:
        """Apply geodesic flow to neck region coordinates."""
        flow = torch.zeros_like(neck_coords)
        
        for i in range(7):
            flow[:, i] = (self.T[:, i, :, :] * neck_coords.unsqueeze(-1).unsqueeze(-1)).sum(dim=(-2, -1))
        
        return neck_coords + step_size * flow

def deform_phi_via_geodesic(phi: torch.Tensor, g: torch.Tensor, T: Optional[torch.Tensor],
                           step: float, lambda_rg: float) -> torch.Tensor:
    """Convenience function for geodesic deformation."""
    geodesic = TorsionalGeodesic(phi, g, T)
    return geodesic.deform_phi(step, lambda_rg)

print("Torsional geodesic deformation module ready")

## 16. Save Final Results

In [None]:
output_dir = Path('outputs_v1_0d')
output_dir.mkdir(exist_ok=True)

np.save(output_dir / 'harmonic_2forms.npy', h2_modes)
np.save(output_dir / 'harmonic_3forms.npy', h3_modes)
np.save(output_dir / 'yukawa_tensor.npy', Y_tensor)

phi_net.eval()
with torch.no_grad():
    test_coords = torch.rand(1000, 7, device=device)
    phi_samples = phi_net(test_coords).cpu().numpy()
    np.save(output_dir / 'phi_samples.npy', phi_samples)

metadata = {
    'version': '1.0d',
    'config': CONFIG,
    'b2_effective': h2_modes.shape[1],
    'b3_effective': h3_modes.shape[1],
    'yukawa_norm': float(np.linalg.norm(Y_tensor)),
    'final_loss': loss_history[-1]['total'] if loss_history else None
}

with open(output_dir / 'metadata.json', 'w') as f:
    json.dump(metadata, f, indent=2)

import shutil
archive_name = 'K7_G2_TCS_v1_0d_results'
shutil.make_archive(archive_name, 'zip', output_dir)

print(f"Results saved to {output_dir}")
print(f"Archive created: {archive_name}.zip")

from google.colab import files
files.download(f'{archive_name}.zip')
print(f"Download started: {archive_name}.zip")

print("Pipeline complete.")