In [2]:
# ------------------------------------------------------------------------------------
# Single Jupyter Cell: Chunked CSV Loading + PINN for Kinematic Wave (Corrected)
# ------------------------------------------------------------------------------------

import sys
import os
import platform
import subprocess
import logging
import time

import torch
import torch.nn as nn
import torch.optim as optim
import numpy as np
import pandas as pd

################################################################################
# 1) LOGGING & ENVIRONMENT CHECK
################################################################################
logger = logging.getLogger("ChunkedKinematicPINN")
logger.setLevel(logging.INFO)

# Avoid adding multiple handlers if cell is re-run
if not logger.handlers:
    console_handler = logging.StreamHandler(sys.stdout)
    console_handler.setLevel(logging.INFO)
    formatter = logging.Formatter("[%(levelname)s] %(asctime)s - %(message)s")
    console_handler.setFormatter(formatter)
    logger.addHandler(console_handler)

logger.info("===== START: SINGLE-CELL CHUNKED KINEMATIC WAVE PINN =====")
logger.info(f"Python version : {sys.version}")
logger.info(f"Platform       : {platform.platform()}")
logger.info(f"PyTorch version: {torch.__version__}")
logger.info(f"CUDA available : {torch.cuda.is_available()}")

if torch.cuda.is_available():
    logger.info(f"CUDA device(s) : {torch.cuda.device_count()} GPU(s).")
    logger.info(f"GPU 0 name     : {torch.cuda.get_device_name(0)}")

# (Optional) attempt nvidia-smi
try:
    result = subprocess.run(["nvidia-smi"], capture_output=True, text=True)
    if result.returncode == 0:
        logger.info("nvidia-smi output:\n" + result.stdout)
    else:
        logger.warning("Could not run nvidia-smi (non-zero return).")
except FileNotFoundError:
    logger.warning("nvidia-smi not found. Skipping GPU memory check.")

logger.info("===== ENV CHECK DONE =====\n")

################################################################################
# 2) CHUNK-BASED CSV LOADING WITH SAMPLING
################################################################################

def load_csv_in_chunks(path, chunk_size=500_000, fraction=1.0, max_rows=None):
    """
    Reads a CSV in chunks to avoid loading huge files into memory at once.
    Optionally samples a fraction of rows from each chunk, 
    and stops after 'max_rows' if provided.

    Args:
      path (str): Path to CSV file.
      chunk_size (int): # of rows per chunk read.
      fraction (float): fraction of rows to keep (random sample) from each chunk.
      max_rows (int): if not None, stop after reading this many rows in total.

    Returns:
      pd.DataFrame: concatenated result of the chunked read.
    """
    logger.info(f"Loading CSV in chunks from: {path}")
    start_time = time.time()
    dfs = []
    total_rows = 0

    for chunk in pd.read_csv(path, chunksize=chunk_size):
        # random sample from this chunk if fraction < 1.0
        if fraction < 1.0:
            chunk = chunk.sample(frac=fraction)

        dfs.append(chunk)
        total_rows += len(chunk)

        if max_rows is not None and total_rows >= max_rows:
            logger.info(f"Reached max_rows={max_rows}; stopping chunk read.")
            break

    df = pd.concat(dfs, ignore_index=True)
    elapsed = time.time() - start_time
    logger.info(f"Loaded {len(df)} rows from {path} in {elapsed:.2f} s.")
    return df


def load_csv_as_dataset(paths, fraction=1.0, chunk_size=500_000, max_rows=None):
    """
    Loads multiple CSV files via chunk-based reading. Combines them into one DataFrame,
    then creates (x, t, rainrate) as input and (rg_qms) as output. 
    Returns torch tensors (in_tensor, out_tensor).

    fraction (float): fraction of data rows to keep. For huge files, try <1.0
    chunk_size (int): # of rows per chunk in read_csv
    max_rows (int): optional total row limit per file (stops reading after this many).
    """
    dfs = []
    for p in paths:
        df_chunk = load_csv_in_chunks(
            path=p,
            chunk_size=chunk_size,
            fraction=fraction,
            max_rows=max_rows
        )
        dfs.append(df_chunk)

    data = pd.concat(dfs, ignore_index=True)
    logger.info(f"Combined total rows: {len(data)}")

    # Convert "time" to numeric scale
    # Here we do a naive approach: convert to seconds, then offset by min, etc.
    data["t_numeric"] = pd.to_datetime(data["time"]).astype(int) / 1e9
    t0 = data["t_numeric"].min()
    data["t"] = (data["t_numeric"] - t0) / (60.0 * 60.0 * 24.0)  # days

    # Use "longitude" as x for demonstration
    data["x"] = data["longitude"]

    # Create input arrays
    inputs = np.stack([
        data["x"].values.astype(np.float32),
        data["t"].values.astype(np.float32),
        data["rainrate"].values.astype(np.float32)
    ], axis=1)

    # The target is flow "rg_qms"
    Q = data["rg_qms"].values.astype(np.float32).reshape(-1, 1)

    # Convert to torch
    in_tensor = torch.from_numpy(inputs)
    out_tensor = torch.from_numpy(Q)
    return in_tensor, out_tensor


################################################################################
# 3) PINN DEFINITION & PDE RESIDUAL (KINEMATIC WAVE)
################################################################################

class PINN(nn.Module):
    def __init__(self, input_dim=3, hidden_dim=64, output_dim=1, num_layers=4):
        """
        input_dim: (x, t, rainrate)
        output_dim: Q (flow)
        hidden_dim, num_layers: network size
        """
        super(PINN, self).__init__()
        layers = []
        in_features = input_dim
        for _ in range(num_layers):
            layers.append(nn.Linear(in_features, hidden_dim))
            layers.append(nn.Tanh())  # or ReLU, etc.
            in_features = hidden_dim
        layers.append(nn.Linear(hidden_dim, output_dim))
        self.net = nn.Sequential(*layers)

    def forward(self, x):
        # x shape: [batch, 3]
        return self.net(x)


def pde_residual(model, x, t, rain, v=1.0):
    """
    Kinematic wave PDE: dQ/dt + v * dQ/dx = rain
    => residual = dQ/dt + v*dQ/dx - rain

    Returns a tensor of shape [batch_size, 1].
    """
    inp = torch.stack([x, t, rain], dim=1)
    inp.requires_grad_()

    Q = model(inp)  # [batch_size, 1]

    # partial derivatives wrt x, t, (and ignoring rain derivative)
    grads = torch.autograd.grad(
        outputs=Q,
        inputs=inp,
        grad_outputs=torch.ones_like(Q),
        retain_graph=True,
        create_graph=True
    )[0]  # shape: [batch_size, 3]

    Q_x = grads[:, 0].unsqueeze(-1)  # derivative wrt x
    Q_t = grads[:, 1].unsqueeze(-1)  # derivative wrt t

    # PDE residual
    return Q_t + v * Q_x - rain.unsqueeze(-1)


################################################################################
# 4) TRAIN FUNCTION (CORRECTED 'collocation_points' REFERENCE)
################################################################################
def train_pinn(
    train_data_paths,
    test_data_path=None,
    fraction=1.0,
    chunk_size=500_000,
    max_rows=None,
    wave_speed=1.0,
    num_epochs=500,
    lr=1e-3,
    pde_weight=1.0,
    batch_size=512,
    collocation_points=1000,  # corrected
    device="cuda"
):
    """
    1) Load training data from CSVs with chunking & optional row sampling.
    2) Create PDE collocation points in domain ranges.
    3) Train the PINN with data + PDE losses.
    4) (Optionally) test on separate data set.

    Args:
      fraction (float): fraction of data rows to keep. For huge files, try <1.0
      chunk_size (int): # of rows per chunk in read_csv
      max_rows (int): optional total row limit per file (stops reading after this many)
      wave_speed (float): constant v in PDE
      num_epochs (int): training epochs
      lr (float): learning rate
      pde_weight (float): weighting factor for PDE residual vs data loss
      batch_size (int): mini-batch size
      collocation_points (int): how many PDE points to sample
      device (str): "cuda" or "cpu"
    """
    logger.info("=== START TRAINING PINN ===")
    logger.info(f"Loading training data with fraction={fraction}, chunk_size={chunk_size}, max_rows={max_rows}")
    in_tensor, out_tensor = load_csv_as_dataset(train_data_paths, fraction, chunk_size, max_rows)
    n_data = in_tensor.shape[0]

    # Setup device
    dev = torch.device(device if torch.cuda.is_available() else "cpu")
    logger.info(f"Using device: {dev}")
    in_tensor = in_tensor.to(dev)
    out_tensor = out_tensor.to(dev)

    dataset = torch.utils.data.TensorDataset(in_tensor, out_tensor)
    data_loader = torch.utils.data.DataLoader(dataset, batch_size=batch_size, shuffle=True)

    # Collocation
    x_min, x_max = float(in_tensor[:,0].min()), float(in_tensor[:,0].max())
    t_min, t_max = float(in_tensor[:,1].min()), float(in_tensor[:,1].max())
    r_min, r_max = float(in_tensor[:,2].min()), float(in_tensor[:,2].max())
    logger.info(f"Spatial/time domain: x in [{x_min:.2f},{x_max:.2f}], t in [{t_min:.2f},{t_max:.2f}], rain in [{r_min:.2f},{r_max:.2f}]")

    # Generate random PDE collocation points within domain
    x_col = np.random.uniform(x_min, x_max, collocation_points).astype(np.float32)
    t_col = np.random.uniform(t_min, t_max, collocation_points).astype(np.float32)
    r_col = np.random.uniform(r_min, r_max, collocation_points).astype(np.float32)
    col_array = np.stack([x_col, t_col, r_col], axis=1)
    colloc_tensor = torch.from_numpy(col_array).to(dev)

    # Model
    model = PINN(input_dim=3, hidden_dim=64, output_dim=1, num_layers=4).to(dev)
    optimizer = optim.Adam(model.parameters(), lr=lr)
    loss_fn = nn.MSELoss()

    # Anomaly detection
    torch.autograd.set_detect_anomaly(True)

    logger.info(f"num_epochs={num_epochs}, lr={lr}, pde_weight={pde_weight}, batch_size={batch_size}, collocation_points={collocation_points}")
    logger.info(f"wave_speed={wave_speed}")

    start_time = time.time()

    # Training loop
    for epoch in range(num_epochs):
        model.train()
        epoch_data_loss = 0.0
        epoch_pde_loss = 0.0

        for (batch_in, batch_out) in data_loader:
            optimizer.zero_grad()

            # 1) Data loss
            Q_pred = model(batch_in)
            data_l = loss_fn(Q_pred, batch_out)

            # 2) PDE loss
            # sample sub-batch from collocation points
            idx = torch.randint(0, collocation_points, (batch_in.size(0),))
            c_batch = colloc_tensor[idx, :]  # shape [batch_size, 3]
            x_c = c_batch[:,0]
            t_c = c_batch[:,1]
            r_c = c_batch[:,2]

            res = pde_residual(model, x_c, t_c, r_c, v=wave_speed)
            pde_l = loss_fn(res, torch.zeros_like(res))

            # total
            total_loss = data_l + pde_weight * pde_l
            total_loss.backward()
            optimizer.step()

            epoch_data_loss += data_l.item() * batch_in.size(0)
            epoch_pde_loss  += pde_l.item()  * batch_in.size(0)

        epoch_data_loss /= n_data
        epoch_pde_loss  /= n_data

        # log every 50 epochs
        if (epoch+1) % 50 == 0:
            logger.info(f"Epoch [{epoch+1}/{num_epochs}] Data Loss: {epoch_data_loss:.4e}, PDE Loss: {epoch_pde_loss:.4e}")

    duration = (time.time() - start_time)/60.0
    logger.info(f"Training completed in {duration:.2f} minutes.")

    # (Optional) test
    if test_data_path is not None:
        logger.info("Loading test data...")
        test_in, test_out = load_csv_as_dataset([test_data_path], fraction=1.0, chunk_size=500_000, max_rows=None)
        test_in = test_in.to(dev)
        test_out = test_out.to(dev)
        model.eval()
        with torch.no_grad():
            Q_test = model(test_in)
            test_mse = loss_fn(Q_test, test_out).item()
        logger.info(f"Test MSE: {test_mse:.4e}")

    logger.info("=== END TRAINING PINN ===")
    return model


################################################################################
# 5) EXAMPLE USAGE
################################################################################

if __name__ == "__main__":
    # Just an example set of paths (edit these to your actual CSVs)
    train_csv_paths = [
        "/media/data-ssd/PINN/DATA/model data after proccessing/first expirence train on 2011 2012 2013 2014 and test on 2015/lag_model/Northern Dead Sea_2011_merged.csv",
        "/media/data-ssd/PINN/DATA/model data after proccessing/first expirence train on 2011 2012 2013 2014 and test on 2015/lag_model/Northern Dead Sea_2012_merged.csv",
        "/media/data-ssd/PINN/DATA/model data after proccessing/first expirence train on 2011 2012 2013 2014 and test on 2015/lag_model/Northern Dead Sea_2013_merged.csv",
        "/media/data-ssd/PINN/DATA/model data after proccessing/first expirence train on 2011 2012 2013 2014 and test on 2015/lag_model/Northern Dead Sea_2014_merged.csv",
    ]
    test_csv_path = "/media/data-ssd/PINN/DATA/model data after proccessing/first expirence train on 2011 2012 2013 2014 and test on 2015/lag_model/Northern Dead Sea_2015_merged.csv"

    # Example: read only 10% of rows from each chunk, up to 500k rows per chunk,
    # and stop after 2M total rows per file:
    fraction_to_keep = 0.1
    chunk_size = 500_000
    max_rows_per_file = 2_000_000

    model = train_pinn(
        train_data_paths=train_csv_paths,
        test_data_path=test_csv_path,
        fraction=fraction_to_keep,
        chunk_size=chunk_size,
        max_rows=max_rows_per_file,
        wave_speed=1.0,
        num_epochs=300,   # fewer epochs for memory test
        lr=1e-3,
        pde_weight=1.0,
        batch_size=512,
        collocation_points=1000,  # note corrected usage here
        device="cuda"
    )

    # Save model
    torch.save(model.state_dict(), "kinematic_wave_pinn_chunked.pth")
    logger.info("Model saved to kinematic_wave_pinn_chunked.pth.")

logger.info("===== SINGLE-CELL SCRIPT DEFINITION COMPLETE =====")


[INFO] 2025-01-20 14:02:35,404 - ===== START: SINGLE-CELL CHUNKED KINEMATIC WAVE PINN =====
[INFO] 2025-01-20 14:02:35,405 - Python version : 3.11.9 | packaged by conda-forge | (main, Apr 19 2024, 18:36:13) [GCC 12.3.0]
[INFO] 2025-01-20 14:02:35,405 - Platform       : Linux-6.8.0-51-generic-x86_64-with-glibc2.35
[INFO] 2025-01-20 14:02:35,406 - PyTorch version: 2.2.2
[INFO] 2025-01-20 14:02:35,407 - CUDA available : True
[INFO] 2025-01-20 14:02:35,407 - CUDA device(s) : 1 GPU(s).
[INFO] 2025-01-20 14:02:35,408 - GPU 0 name     : NVIDIA RTX A6000
[INFO] 2025-01-20 14:02:35,741 - nvidia-smi output:
Mon Jan 20 14:02:35 2025       
+-----------------------------------------------------------------------------------------+
| NVIDIA-SMI 560.35.03              Driver Version: 560.35.03      CUDA Version: 12.6     |
|-----------------------------------------+------------------------+----------------------+
| GPU  Name                 Persistence-M | Bus-Id          Disp.A | Volatile Uncorr. E

OutOfMemoryError: CUDA out of memory. Tried to allocate 45.34 GiB. GPU 0 has a total capacity of 47.42 GiB of which 43.70 GiB is free. Including non-PyTorch memory, this process has 3.43 GiB memory in use. Of the allocated memory 3.09 GiB is allocated by PyTorch, and 23.30 MiB is reserved by PyTorch but unallocated. If reserved but unallocated memory is large try setting PYTORCH_CUDA_ALLOC_CONF=expandable_segments:True to avoid fragmentation.  See documentation for Memory Management  (https://pytorch.org/docs/stable/notes/cuda.html#environment-variables)

In [None]:
##sum: good run - 17 hours for 2M rows in each year. I got a model and not evaluate it on 2015. meaning it didnt save the shp file.