<a href="https://colab.research.google.com/github/cedamusk/.py/blob/main/Untitled2.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [1]:
import torch
import torch.nn as nn
import numpy as np
import matplotlib.pyplot as plt
from matplotlib.animation import FuncAnimation
from torch.optim.lr_scheduler import ReduceLROnPlateau
from tqdm import tqdm
from typing import Tuple, Dict, Any

# Set random seed for reproducibility
torch.manual_seed(42)
np.random.seed(42)

# Use GPU if available
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")


class ImprovedPINN(nn.Module):
    def __init__(self):
        super().__init__()
        # Define a deeper and wider network
        self.network = nn.Sequential(
            nn.Linear(1, 128),
            nn.Tanh(),
            nn.Linear(128, 128),
            nn.Tanh(),
            nn.Linear(128, 128),
            nn.Tanh(),
            nn.Linear(128, 64),
            nn.Tanh(),
            nn.Linear(64, 2)
        )

        # Apply Xavier initialization for all Linear layers
        for layer in self.network:
            if isinstance(layer, nn.Linear):
                nn.init.xavier_normal_(layer.weight)
                nn.init.zeros_(layer.bias)

    def forward(self, t: torch.Tensor) -> torch.Tensor:
        return self.network(t)

    def compute_derivatives(self, t: torch.Tensor) -> Tuple[torch.Tensor, torch.Tensor, torch.Tensor]:
        """
        Computes the output, its first derivative, and second derivative with respect to time t.
        """
        # Ensure gradients are computed with respect to t
        t.requires_grad_(True)
        xy = self.forward(t)

        # Compute first derivative d(xy)/dt
        dxy_dt = torch.autograd.grad(
            outputs=xy, inputs=t,
            grad_outputs=torch.ones_like(xy),
            create_graph=True,
            retain_graph=True
        )[0]

        # Compute second derivative d^2(xy)/dt^2
        d2xy_dt2 = torch.autograd.grad(
            outputs=dxy_dt, inputs=t,
            grad_outputs=torch.ones_like(dxy_dt),
            create_graph=True
        )[0]

        return xy, dxy_dt, d2xy_dt2


def generate_orbital_data(n_points: int = 1000, noise_level: float = 0.005) -> Tuple[np.ndarray, np.ndarray, np.ndarray]:
    """
    Generates orbital data (with reduced noise) for a stable circular orbit.

    Returns:
        t: Time array.
        x: X-coordinate positions.
        y: Y-coordinate positions.
    """
    t = np.linspace(0, 10, n_points)
    r = 1.0          # orbital radius
    omega = 2 * np.pi / 5  # angular velocity

    x = r * np.cos(omega * t)
    y = r * np.sin(omega * t)

    # Add small noise to simulate measurement error
    x += noise_level * np.random.randn(n_points)
    y += noise_level * np.random.randn(n_points)

    return t, x, y


def physics_loss(model: ImprovedPINN, t: torch.Tensor, normalize: bool = True) -> torch.Tensor:
    """
    Computes the physics-informed loss by enforcing the orbital dynamics ODE.

    Args:
        model: The PINN model.
        t: Input tensor for time.
        normalize: Whether to normalize the physics residual.

    Returns:
        A scalar tensor representing the physics loss.
    """
    xy, dxy_dt, d2xy_dt2 = model.compute_derivatives(t)

    # Gravitational parameter for the two-body problem (assuming unit masses and G=1)
    k = 4 * np.pi ** 2

    # Compute the distance from the origin
    r = torch.sqrt(xy[:, 0] ** 2 + xy[:, 1] ** 2)

    # Compute the residuals of the second order ODE: d²xy/dt² + k * xy / r³ = 0
    residual_x = d2xy_dt2[:, 0] + k * xy[:, 0] / (r ** 3)
    residual_y = d2xy_dt2[:, 1] + k * xy[:, 1] / (r ** 3)

    if normalize:
        # Normalize the residuals by the average magnitude of the gravitational term
        scale = torch.mean(torch.abs(k * xy / (r ** 3).unsqueeze(1)))
        residual_x /= scale
        residual_y /= scale

    return torch.mean(residual_x ** 2 + residual_y ** 2)


def train_pinn(model: ImprovedPINN,
               t_data: np.ndarray,
               xy_data: np.ndarray,
               n_epochs: int = 10000) -> Dict[str, np.ndarray]:
    """
    Trains the PINN model using both data loss and physics-informed loss.

    Args:
        model: The PINN model.
        t_data: Numpy array of time data.
        xy_data: Numpy array of shape (n_points, 2) containing x and y positions.
        n_epochs: Number of training epochs.

    Returns:
        A dictionary containing the training losses.
    """
    model.to(device)
    optimizer = torch.optim.Adam(model.parameters(), lr=0.001)
    scheduler = ReduceLROnPlateau(optimizer, mode='min', patience=100, factor=0.5)

    # Convert data to torch tensors and move to device
    t_torch = torch.FloatTensor(t_data).view(-1, 1).to(device)
    xy_torch = torch.FloatTensor(xy_data).to(device)

    # Normalize the position data
    xy_scale = torch.max(torch.abs(xy_torch))
    xy_torch /= xy_scale

    # To track losses
    losses = {
        "total_loss": [],
        "data_loss": [],
        "physics_loss": []
    }
    best_loss = float('inf')
    best_state: Dict[str, Any] = {}

    # Training loop with progress bar
    for epoch in tqdm(range(n_epochs), desc="Training", unit="epoch"):
        optimizer.zero_grad()

        # Forward pass for data loss
        xy_pred = model(t_torch)
        data_loss = torch.mean((xy_pred - xy_torch) ** 2)

        # Compute the physics-based loss
        phys_loss = physics_loss(model, t_torch)

        # Gradually increase the weight of the physics loss
        physics_weight = 0.01 * (1 - np.exp(-epoch / 1000))
        total_loss = data_loss + physics_weight * phys_loss

        total_loss.backward()
        optimizer.step()
        scheduler.step(total_loss)

        # Record losses
        losses["total_loss"].append(total_loss.item())
        losses["data_loss"].append(data_loss.item())
        losses["physics_loss"].append(phys_loss.item())

        # Save the best model (lowest total loss)
        if total_loss.item() < best_loss:
            best_loss = total_loss.item()
            best_state = {k: v.cpu() for k, v in model.state_dict().items()}

        if (epoch + 1) % 500 == 0:
            tqdm.write(f'Epoch [{epoch + 1}/{n_epochs}] - Total Loss: {total_loss.item():.4f}, '
                       f'Data Loss: {data_loss.item():.4f}, Physics Loss: {phys_loss.item():.4f}')

    # Load the best model parameters
    model.load_state_dict(best_state)
    return {key: np.array(val) for key, val in losses.items()}


def plot_losses(losses: Dict[str, np.ndarray]) -> None:
    """
    Plots the training losses on a semilog scale.
    """
    plt.figure(figsize=(10, 4))
    plt.semilogy(losses["total_loss"], label='Total Loss')
    plt.semilogy(losses["data_loss"], label='Data Loss')
    plt.semilogy(losses["physics_loss"], label='Physics Loss')
    plt.xlabel('Epoch')
    plt.ylabel('Loss (log scale)')
    plt.legend()
    plt.grid(True)
    plt.show()


def plot_results(t_data: np.ndarray, xy_data: np.ndarray, model: ImprovedPINN, xy_scale: float) -> None:
    """
    Plots the predicted orbit and time series comparisons.
    """
    model.eval()
    with torch.no_grad():
        t_torch = torch.FloatTensor(t_data).view(-1, 1).to(device)
        xy_pred = model(t_torch).cpu().numpy() * xy_scale  # Denormalize predictions

    # Plot orbit
    plt.figure(figsize=(12, 5))
    plt.subplot(121)
    plt.plot(xy_data[:, 0], xy_data[:, 1], 'b.', label='Data')
    plt.plot(xy_pred[:, 0], xy_pred[:, 1], 'r-', label='PINN Prediction')
    plt.plot(0, 0, 'y*', markersize=15, label='Central Body')
    plt.xlabel('X Position')
    plt.ylabel('Y Position')
    plt.legend()
    plt.axis('equal')
    plt.grid(True)

    # Plot x position over time
    plt.subplot(122)
    plt.plot(t_data, xy_data[:, 0], 'b.', label='Data X')
    plt.plot(t_data, xy_pred[:, 0], 'r-', label='PINN X')
    plt.xlabel('Time')
    plt.ylabel('X Position')
    plt.legend()
    plt.grid(True)
    plt.show()


def main() -> None:
    # Generate orbital data
    t_data, x_data, y_data = generate_orbital_data()
    xy_data = np.stack([x_data, y_data], axis=1)

    # Create the PINN model and train it
    model = ImprovedPINN()
    losses = train_pinn(model, t_data, xy_data)

    # Plot the loss curves
    plot_losses(losses)

    # For plotting predictions, we need the scaling factor used during training.
    # Since data was normalized by the max absolute value, we recalcualte that here.
    xy_scale = np.max(np.abs(xy_data))
    plot_results(t_data, xy_data, model, xy_scale)


if __name__ == "__main__":
    main()


Training:   0%|          | 0/10000 [00:00<?, ?epoch/s]


IndexError: index 1 is out of bounds for dimension 1 with size 1