In [None]:
# Cell 1: Load, Inspect & Plot Time Series

# 1) Imports
import os
import glob
import pandas as pd
import numpy as np
import matplotlib.pyplot as plt
from typing import List


import os
os.environ["PROTOCOL_BUFFERS_PYTHON_IMPLEMENTATION"] = "python"


# 2) Define the loader & plotter function
def load_time_series(data_dir: str, feature: str) -> List[np.ndarray]:
    """
    Reads every CSV in `data_dir` whose filename starts with "3C", extracts column `feature`,
    plots each series, and returns a list of 1D numpy arrays (one per file), 
    while printing and plotting detailed stats.
    """
    # only CSVs whose basename starts with "3C"
    pattern = os.path.join(data_dir, "3C*.csv")
    paths = glob.glob(pattern)
    if not paths:
        raise FileNotFoundError(f"No CSV files starting with '3C' found in {data_dir!r}")
    
    series_list: List[np.ndarray] = []
    lengths = []
    
    for p in sorted(paths):
        fname = os.path.basename(p)
        print(f"\n– Processing file: {fname}")
        df = pd.read_csv(p)
        
        if feature not in df.columns:
            print(f"⚠️  Feature {feature!r} not found → skipping")
            continue
        
        arr = df[feature].dropna().to_numpy(dtype=np.float32)
        series_list.append(arr)
        lengths.append(len(arr))
        print(f"✔️  Loaded series length: {len(arr)}")
        
        # --- Plot this single time series ---
        plt.figure(figsize=(8, 3))
        plt.plot(arr, alpha=0.7)
        plt.title(f"{feature!r} from {fname}")
        plt.xlabel("Index")
        plt.ylabel(feature)
        plt.tight_layout()
        plt.show()
    
    if not series_list:
        raise ValueError(f"No series extracted for feature {feature!r}")
    
    # Summary statistics over all loaded series
    lengths = np.array(lengths)
    print("\n=== Summary of Loaded Series ===")
    print(f"Total series: {len(lengths)}")
    print(f"Min length:   {lengths.min()}")
    print(f"Max length:   {lengths.max()}")
    print(f"Mean length:  {lengths.mean():.1f}")
    print(f"Std  length:  {lengths.std():.1f}")
    
    # Plot histogram of lengths
    plt.figure(figsize=(8, 3))
    plt.hist(lengths, bins=20, edgecolor="black")
    plt.title("Distribution of Time Series Lengths")
    plt.xlabel("Series Length")
    plt.ylabel("Count")
    plt.tight_layout()
    plt.show()
    
    return series_list

# 3) Execute the loader on your data directory
if __name__ == "__main__":
    #DATA_DIR     = "/workspace/BatterySOH/PINN4SOH/data/XJTU data"
    DATA_DIR     = "Payam"
    FEATURE_NAME = "CC charge time"
    
    print(f"\n==> Loading & plotting all '{FEATURE_NAME}' series from '3C*.csv' in {DATA_DIR!r}")
    all_series = load_time_series(DATA_DIR, FEATURE_NAME)
    
    print(f"\n✅ Completed loading and plotting {len(all_series)} series.")


In [None]:
# Cell 2: Build & Inspect the Sliding‐Window Dataset (with sampling plot)

import torch
from torch.utils.data import Dataset
import matplotlib.pyplot as plt
from typing import List, Tuple
import numpy as np

class TimeSeriesWindowDataset(Dataset):
    """
    Each item is:
      - a window of length `window_size`
      - the start index of that window
      - the total length of its parent series
    """
    def __init__(self, series_list: List[np.ndarray], window_size: int):
        self.series_list = series_list
        self.window_size = window_size
        self.index_map: List[Tuple[int, int]] = []  # (series_idx, start_idx)
        
        print(f"\n==> Constructing TimeSeriesWindowDataset (window_size={window_size})")
        skipped = 0
        for si, series in enumerate(series_list):
            L = len(series)
            if L < window_size:
                print(f"⚠️  Series #{si} (len={L}) shorter than window_size → skipped")
                skipped += 1
                continue
            n_windows = L - window_size + 1
            for start in range(n_windows):
                self.index_map.append((si, start))
        total_windows = len(self.index_map)
        
        print(f"✔️  Mapping complete")
        print(f"Total series provided: {len(series_list)}")
        print(f"Series skipped (too short): {skipped}")
        print(f"Total windows generated: {total_windows}")
        
        # Plot distribution of start indices
        starts = [start for (_, start) in self.index_map]
        plt.figure(figsize=(8, 3))
        plt.hist(starts, bins=min(30, max(starts)+1), edgecolor="black")
        plt.title("Distribution of Window Start Indices")
        plt.xlabel("Start Index")
        plt.ylabel("Count")
        plt.tight_layout()
        plt.show()

    def __len__(self):
        return len(self.index_map)

    def __getitem__(self, idx: int):
        si, start = self.index_map[idx]
        series = self.series_list[si]
        window = series[start : start + self.window_size]
        item = {
            "window":     torch.from_numpy(window),                # shape (window_size,)
            "start_idx":  torch.tensor(start, dtype=torch.long),  
            "series_len": torch.tensor(len(series), dtype=torch.long)
        }
        # Detailed print for this sample
        #print(f"→ Sample idx={idx}: series #{si}, start={start}, "
        #      f"series_len={len(series)}, window_shape={item['window'].shape}")
        return item

# -------------------------------
# Example instantiation, smoke-test & plotting
# -------------------------------
if __name__ == "__main__":
    WINDOW_SIZE = 10

    print(f"\n==> Instantiating dataset with window_size={WINDOW_SIZE}")
    ds = TimeSeriesWindowDataset(all_series, window_size=WINDOW_SIZE)

    print(f"\nDataset length (total windows): {len(ds)}\n")

    # Choose a few sample indices
    sample_idxs = [0, len(ds)//2, len(ds)-1]

    # Print details and collect windows
    windows = []
    for test_idx in sample_idxs:
        print(f"\n--- Fetching sample at flattened index {test_idx} ---")
        item = ds[test_idx]
        windows.append((test_idx, item["window"].numpy()))

    # Plot those sample windows
    plt.figure(figsize=(10, 4))
    for idx, window in windows:
        plt.plot(window, label=f"window @{idx}")
    plt.title("Sample Windows")
    plt.xlabel("Time Step")
    plt.ylabel("Value")
    plt.legend()
    plt.tight_layout()
    plt.show()


In [None]:
# Cell 3: Cosine Schedule + Sinusoidal Embeddings + Improved 1D UNet

import math
import torch
import torch.nn as nn
import torch.nn.functional as F
import matplotlib.pyplot as plt

# 1) Cosine schedule
def cosine_beta_schedule(T: int, s: float = 0.008):
    """
    Returns torch tensors betas, alphas, alpha_bars of length T
    following the cosine schedule from Nichol & Dhariwal (2021).
    """
    steps = T + 1
    t = torch.linspace(0, T, steps)
    f = torch.cos(((t / T) + s) / (1 + s) * math.pi / 2) ** 2
    alpha_bars = f / f[0]
    # β_t = 1 - ᾱ_t / ᾱ_{t-1}
    betas = 1 - alpha_bars[1:] / alpha_bars[:-1]
    betas = torch.clamp(betas, max=0.999)
    alphas = 1 - betas
    alpha_bars = torch.cumprod(alphas, dim=0)

    # Quick plot
    plt.figure(figsize=(6,3))
    plt.plot(betas.cpu(), label="βₜ")
    plt.plot(alpha_bars.cpu(), label="ᾱₜ")
    plt.title("Cosine β-Schedule")
    plt.xlabel("t")
    plt.legend()
    plt.tight_layout()
    plt.show()

    return betas, alphas, alpha_bars

# 2) Sinusoidal embedding (from Transformer)
def get_sinusoidal_embedding(x: torch.Tensor, dim: int):
    """
    x: LongTensor of shape (B,) or (B,1)
    returns: FloatTensor (B, dim)
    """
    if x.dim()==2: x = x.squeeze(-1)
    device = x.device
    half = dim // 2
    freqs = torch.exp(-math.log(10000) * torch.arange(0, half, device=device) / half)
    args = x.float().unsqueeze(-1) * freqs.unsqueeze(0)  # (B, half)
    emb = torch.cat([torch.sin(args), torch.cos(args)], dim=-1)
    if dim % 2 == 1:  # pad if odd
        emb = F.pad(emb, (0,1))
    return emb  # (B, dim)

# 3) Residual Block with FiLM on time+pos embeddings
class ResBlock1D(nn.Module):
    def __init__(self, c: int, emb_dim: int):
        super().__init__()
        self.conv1 = nn.Conv1d(c, c, 3, padding=1)
        self.gn1   = nn.GroupNorm(8, c)
        self.conv2 = nn.Conv1d(c, c, 3, padding=1)
        self.gn2   = nn.GroupNorm(8, c)
        self.film  = nn.Linear(emb_dim, c)  # produce an additive bias

    def forward(self, x: torch.Tensor, emb: torch.Tensor):
        """
        x: (B, C, W)
        emb: (B, emb_dim)
        """
        h = self.conv1(x)
        h = self.gn1(h)
        # FiLM bias
        bias = self.film(emb).unsqueeze(-1)  # (B, C, 1)
        h = h + bias
        h = F.silu(h)

        h = self.conv2(h)
        h = self.gn2(h)
        return x + h  # residual

# 4) Improved U-Net model
class ImprovedDiffusionUNet1D(nn.Module):
    def __init__(self, window_size: int,
                 time_emb_dim: int = 128,
                 base_channels: int = 64,
                 n_res_blocks: int = 4):
        super().__init__()
        self.W = window_size
        self.base_c = base_channels
        self.time_emb_dim = time_emb_dim

        # input conv
        self.init_conv = nn.Conv1d(1, base_channels, 3, padding=1)

        # MLP to process sinusoidal time embeddings
        self.time_mlp = nn.Sequential(
            nn.Linear(time_emb_dim, time_emb_dim*4),
            nn.SiLU(),
            nn.Linear(time_emb_dim*4, time_emb_dim),
        )

        # MLP to fuse start_idx & series_len embeddings
        # we will concat their sin'embs → 2*time_emb_dim
        self.cond_mlp = nn.Sequential(
            nn.Linear(time_emb_dim*2, time_emb_dim),
            nn.SiLU(),
            nn.Linear(time_emb_dim, time_emb_dim),
        )

        # stack of residual blocks
        self.res_blocks = nn.ModuleList([
            ResBlock1D(base_channels, time_emb_dim) for _ in range(n_res_blocks)
        ])

        # output conv
        self.out_conv = nn.Conv1d(base_channels, 1, 3, padding=1)

    def forward(self,
                x: torch.Tensor,
                t: torch.Tensor,
                start_idx: torch.Tensor,
                series_len: torch.Tensor):
        """
        x: (B, W)
        t: LongTensor (B,)
        start_idx, series_len: LongTensor (B,)
        """
        B = x.shape[0]
        # Prepare input
        h = x.unsqueeze(1)  # (B,1,W)
        h = self.init_conv(h)  # → (B, C, W)

        # time embedding
        t_emb = get_sinusoidal_embedding(t, self.time_emb_dim)
        t_emb = self.time_mlp(t_emb)  # (B, time_emb_dim)

        # cond embedding
        se = get_sinusoidal_embedding(start_idx, self.time_emb_dim)
        le = get_sinusoidal_embedding(series_len, self.time_emb_dim)
        cond = torch.cat([se, le], dim=-1)  # (B, 2*time_emb_dim)
        cond_emb = self.cond_mlp(cond)      # (B, time_emb_dim)

        # combined
        emb = t_emb + cond_emb              # (B, time_emb_dim)

        # pass through residual blocks
        for block in self.res_blocks:
            h = block(h, emb)

        out = self.out_conv(F.silu(h))      # (B,1,W)
        return out.squeeze(1)               # (B,W)

print("✅ ImprovedDiffusionUNet1D initialized")


In [None]:
# Cell 4: Training Loop with the Improved Model

import torch
import torch.nn as nn
import torch.optim as optim
from torch.utils.data import DataLoader
import matplotlib.pyplot as plt

# Hyperparameters
T          = 1000             # number of diffusion timesteps
EPOCHS     = 500
BATCH_SIZE = 64
LR         = 1e-3
DEVICE     = torch.device("cuda" if torch.cuda.is_available() else "cpu")

print(f"Training on: {DEVICE}")

# 1) Build cosine schedule and move to DEVICE
betas_tensor, alphas_tensor, alpha_bars_tensor = cosine_beta_schedule(T)
betas_tensor      = betas_tensor.to(DEVICE)
alphas_tensor     = alphas_tensor.to(DEVICE)
alpha_bars_tensor = alpha_bars_tensor.to(DEVICE)

# 2) Dataset & DataLoader
dataset = TimeSeriesWindowDataset(all_series, window_size=WINDOW_SIZE)
dataloader = DataLoader(dataset, batch_size=BATCH_SIZE, shuffle=True, drop_last=True)

# 3) Model, optimizer, loss
model = ImprovedDiffusionUNet1D(window_size=WINDOW_SIZE).to(DEVICE)
optimizer = optim.Adam(model.parameters(), lr=LR)
criterion = nn.MSELoss()

loss_hist = []

for epoch in range(1, EPOCHS + 1):
    model.train()
    running_loss = 0.0

    for batch in dataloader:
        x0    = batch["window"].to(DEVICE).float()       # (B, window_size)
        start = batch["start_idx"].to(DEVICE)            # (B,)
        slen  = batch["series_len"].to(DEVICE)           # (B,)

        # Sample random timesteps
        t = torch.randint(0, T, (x0.size(0),), device=DEVICE)

        # Generate noise
        eps = torch.randn_like(x0)

        # Forward diffusion q(x_t | x_0)
        sqrt_ab   = torch.sqrt(alpha_bars_tensor[t]).unsqueeze(-1)
        sqrt_1_ab = torch.sqrt(1 - alpha_bars_tensor[t]).unsqueeze(-1)
        xt = sqrt_ab * x0 + sqrt_1_ab * eps

        # Predict noise
        optimizer.zero_grad()
        eps_pred = model(xt, t, start, slen)
        loss = criterion(eps_pred, eps)
        loss.backward()
        optimizer.step()

        running_loss += loss.item()

    avg_loss = running_loss / len(dataloader)
    loss_hist.append(avg_loss)
    print(f"Epoch {epoch}/{EPOCHS} — Avg Loss: {avg_loss:.4f}")

    # Qualitative check every 5 epochs
    if epoch % 5 == 0:
        model.eval()
        with torch.no_grad():
            x0s   = dataset[0]["window"].unsqueeze(0).to(DEVICE).float()
            st0   = dataset[0]["start_idx"].unsqueeze(0).to(DEVICE)
            sl0   = dataset[0]["series_len"].unsqueeze(0).to(DEVICE)
            t_hi  = torch.tensor([T - 1], device=DEVICE)
            eps0  = torch.randn_like(x0s)
            xt0   = (torch.sqrt(alpha_bars_tensor[t_hi]).unsqueeze(-1) * x0s
                    + torch.sqrt(1 - alpha_bars_tensor[t_hi]).unsqueeze(-1) * eps0)
            epred = model(xt0, t_hi, st0, sl0)

            # One reverse step
            ab  = alpha_bars_tensor[t_hi].unsqueeze(-1)
            a   = alphas_tensor[t_hi].unsqueeze(-1)
            b   = betas_tensor[t_hi].unsqueeze(-1)
            pred_x0 = (xt0 - torch.sqrt(1 - ab) * epred) / torch.sqrt(a)
            reco = pred_x0.squeeze(0).cpu().numpy()
            clean= x0s.squeeze(0).cpu().numpy()

        plt.figure(figsize=(5, 3))
        plt.plot(clean, label="x₀")
        plt.plot(reco, label="one‐step reverse", alpha=0.8)
        plt.title(f"Reverse Check @ Epoch {epoch}")
        plt.legend()
        plt.tight_layout()
        plt.show()

# 4) Plot training loss curve
plt.figure(figsize=(6, 3))
plt.plot(loss_hist, marker="o")
plt.title("Training Loss over Epochs")
plt.xlabel("Epoch")
plt.ylabel("MSE Loss")
plt.tight_layout()
plt.show()


In [None]:
# Cell 5: Sampling windows across a full-length series (conditional on start_idx & series_len)

import torch
import numpy as np
import matplotlib.pyplot as plt
from typing import List

def sample_windows_conditional(
    model,
    T: int,
    betas: torch.Tensor,
    alphas: torch.Tensor,
    alpha_bars: torch.Tensor,
    window_size: int,
    start_idxs: List[int],
    series_len: int,
    device: torch.device
) -> torch.Tensor:
    """
    Reverse-diffuse one window per start index, conditioning on start_idx & series_len.
    start_idxs: list of ints of length num_windows
    Returns: Tensor of shape (num_windows, window_size) on CPU.
    """
    model.eval()
    num_samples = len(start_idxs)
    with torch.no_grad():
        x_t = torch.randn(num_samples, window_size, device=device)
        start_batch      = torch.tensor(start_idxs, dtype=torch.long, device=device)
        series_len_batch = torch.full((num_samples,), series_len, dtype=torch.long, device=device)
        for t in reversed(range(T)):
            t_batch = torch.full((num_samples,), t, dtype=torch.long, device=device)
            eps_pred = model(x_t, t_batch, start_batch, series_len_batch)
            a_t    = alphas[t].unsqueeze(-1)
            ab_t   = alpha_bars[t].unsqueeze(-1)
            b_t    = betas[t].unsqueeze(-1)
            x0_pred = (x_t - torch.sqrt(1 - ab_t) * eps_pred) / torch.sqrt(a_t)
            if t > 0:
                noise = torch.randn_like(x_t)
                x_t = torch.sqrt(a_t) * x0_pred + torch.sqrt(b_t) * noise
            else:
                x_t = x0_pred
        return x_t.cpu()

# Example usage for a series of length 300:
series_len  = 300
window_size = WINDOW_SIZE  # e.g. 10
num_windows = series_len - window_size   # 290
start_idxs  = list(range(num_windows))

samples = sample_windows_conditional(
    model, T,
    betas_tensor, alphas_tensor, alpha_bars_tensor,
    window_size, start_idxs, series_len, DEVICE
)  # shape: (290, 10)

# plot aligned windows
plt.figure(figsize=(12,6))
for i, w in enumerate(samples.numpy()):
    xs = np.arange(i, i+window_size)
    plt.plot(xs, w, alpha=0.1)
plt.xlim(0, series_len)
plt.title(f"{num_windows} sampled windows (L={series_len}, W={window_size})")
plt.xlabel("Time Index")
plt.ylabel("Value")
plt.tight_layout()
plt.show()


In [None]:
# Cell 6: Seeded Sampling & Reconstruction Using First 10 Real Points

import numpy as np
import matplotlib.pyplot as plt
from typing import List

# --- Helper: reconstruct from overlapping windows ---
def reconstruct_series(samples: np.ndarray, window_size: int, series_len: int) -> np.ndarray:
    """
    Given `samples` with shape (num_windows, window_size),
    reconstruct the full series of length `series_len` by averaging overlaps.
    """
    recon = np.zeros(series_len, dtype=float)
    counts = np.zeros(series_len, dtype=int)
    for start, window in enumerate(samples):
        recon[start:start+window_size] += window
        counts[start:start+window_size] += 1
    counts[counts == 0] = 1
    return recon / counts

# --- Main loop over all series ---
for idx, real_series in enumerate(all_series):
    L = len(real_series)
    W = WINDOW_SIZE
    N = L - W
    if N <= 0:
        print(f"Series #{idx} (len={L}) too short—skipping.")
        continue

    print(f"\nSeries #{idx} (len={L}): sampling {N} windows, seeding first window with real data")

    # 1) Prepare start indices for every window
    start_idxs: List[int] = list(range(N))

    # 2) Sample windows via reverse diffusion
    sampled = sample_windows_conditional(
        model, T,
        betas_tensor, alphas_tensor, alpha_bars_tensor,
        window_size=W,
        start_idxs=start_idxs,
        series_len=L,
        device=DEVICE
    ).numpy()  # shape: (N, W)

    # 3) Overwrite first window (start=0) with ground-truth data
    sampled[0] = real_series[:W]

    # 4) Reconstruct the series by averaging overlapping windows
    reconstructed = reconstruct_series(sampled, W, L)

    # 5) Plot real vs. reconstructed
    plt.figure(figsize=(10, 4))
    plt.plot(real_series, label="Real Series", linewidth=2)
    plt.plot(reconstructed, "--", label="Reconstructed (seeded)", linewidth=2)
    plt.title(f"Series #{idx}: Real vs. Reconstructed (Seeded)")
    plt.xlabel("Time Index")
    plt.ylabel("Value")
    plt.legend()
    plt.tight_layout()
    plt.show()


In [None]:
# Cell 7: Final Test – Plot All Real Series and Sampled Windows with Uniformly Sampled Lengths

import random
import numpy as np
import matplotlib.pyplot as plt
from typing import List

# 1) Determine min/max lengths across real series
lengths = [len(s) for s in all_series]
min_len = max(min(lengths), WINDOW_SIZE)  # ensure at least WINDOW_SIZE
max_len = max(lengths)
print(f"Real series lengths range from {min(lengths)} to {max_len}")
print(f"Sampling synthetic series lengths uniformly in [{min_len}, {max_len}]")

# 2) Prepare the figure
plt.figure(figsize=(14, 7))

# 3) Plot all real series in light gray
for real in all_series:
    xs = np.arange(len(real))
    plt.plot(xs, real, color='gray', alpha=0.5, linewidth=1)

# 4) For each real series, sample a synthetic length and overlay sampled windows
for idx, _ in enumerate(all_series):
    # a) Draw a synthetic series length uniformly
    L_synth = random.randint(min_len, max_len)

    # b) Define start indices at steps of 30
    start_idxs: List[int] = list(range(0, L_synth - WINDOW_SIZE + 1, 30))
    if not start_idxs:
        continue

    # c) Sample windows via reverse diffusion conditioned on start & length
    windows = sample_windows_conditional(
        model, T,
        betas_tensor, alphas_tensor, alpha_bars_tensor,
        WINDOW_SIZE, start_idxs, L_synth, DEVICE
    ).numpy()  # shape (num_windows, WINDOW_SIZE)

    # d) Plot each sampled window as a red dashed segment
    for start, w in zip(start_idxs, windows):
        xs = np.arange(start, start + WINDOW_SIZE)
        plt.plot(xs, w, linestyle='--', color='red', alpha=0.7, linewidth=1)

# 5) Finalize plot
plt.xlim(0, max_len)
plt.title("Real Series (gray) and Sampled Windows on Synthetic Lengths (red dashed)")
plt.xlabel("Time Index")
plt.ylabel("Value")
plt.tight_layout()
plt.show()
