In [None]:
!pip install --upgrade "protobuf<5.0.0"

import google.protobuf
print("protobuf version:", google.protobuf.__version__)

In [1]:
#### Newton's law of cooling (PINN) 

import numpy as np
import tensorflow as tf
import matplotlib.pyplot as plt

# Physical parameters and initial condition of the cooling problem
T_amb = 27.0 # Ambient temperature (°C)
T_o = 250.0  # Initial temperature at t = 0 (°C)
k = 0.45     # Cooling rate constant in the ODE dT/dt = -k (T - T_amb) = k (T_amb - T)


# NN configuration 
# Define the neural network model (w/ model parameters)
def create_model():
    """
    Build the layers of the fully connected network.

    Returns a dict mapping layer names to (not yet built) Keras Dense layers:
    'hidden_1', 'hidden_2', 'hidden_3' (50 tanh units each) followed by
    'output_layer' (a single linear unit with float32 dtype).

    The dict is filled in forward-pass order, so code that iterates over
    model.values() sees the layers from input side to output side.
    """
    layers = tf.keras.layers

    # Three identical tanh hidden layers, inserted in forward-pass order.
    model = {
        f'hidden_{idx}': layers.Dense(50, activation='tanh')
        for idx in (1, 2, 3)
    }

    # Linear read-out producing one scalar per input row.
    model['output_layer'] = layers.Dense(1, dtype='float32')

    return model

# function to call NN model 
def call_model(model, t):
    """
    Forward pass: feed `t` through hidden_1 -> hidden_2 -> hidden_3 -> output_layer.

    model : dict holding one callable per layer name listed above
    t     : network input; passed to model['hidden_1'] unchanged

    Returns whatever model['output_layer'] returns for the last hidden activation.
    """
    # Layers are looked up by name, so the result does not depend on dict ordering.
    layer_order = ('hidden_1', 'hidden_2', 'hidden_3', 'output_layer')

    activation = t
    for layer_name in layer_order:
        activation = model[layer_name](activation)

    return activation

    
# PDE residual: the differential equation
def pde(t, model):
    """
    Residual of the cooling ODE  dT/dt = k (T_amb - T)  for the network output.

    With T_pred(t) = call_model(model, t), the returned residual is
        f(t) = dT_pred/dt - k (T_amb - T_pred),
    which is zero wherever the network satisfies the ODE exactly.

    t     : tensor of time points fed to the network; it is watched explicitly,
            so it does not have to be a tf.Variable
    model : dict of layers as consumed by call_model

    Reads the module-level constants `k` and `T_amb`.
    Returns a tensor with the same shape as the network output.
    """
    # A single first derivative is needed, so a plain (non-persistent) tape is enough.
    with tf.GradientTape() as tape:
        tape.watch(t)
        T_pred = call_model(model, t)    # Network approximation T_hat(t)

    # Take the gradient after leaving the context: calling tape.gradient inside
    # the context of a persistent tape records the gradient ops on that tape as
    # well, which TensorFlow warns is significantly less efficient. An enclosing
    # tape (e.g. the one in train_step) still records this computation, so the
    # residual stays differentiable w.r.t. the network weights.
    # tape.gradient differentiates the sum of T_pred; this equals the per-point
    # derivative dT_hat/dt because each output row depends only on its own input row.
    T_t = tape.gradient(T_pred, t)

    return T_t - k * (T_amb - T_pred)



# Loss function 
def loss(model, t, t_ic, T_ic, w_pde=1.0, w_ic=1.0):
    """
    Total loss = w_pde * (ODE residual loss at interior points)
               + w_ic  * (initial condition loss at t = 0).

    model : dict of layers as consumed by call_model
    t     : interior time points (collocation points)
    t_ic  : time at the initial condition (here, [[0.0]])
    T_ic  : initial temperature T(0) = T_o
    w_pde : weight on the residual term (default 1.0)
    w_ic  : weight on the initial-condition term (default 1.0)

    With the default weights this is the plain sum of the two terms; pass
    e.g. w_pde=4, w_ic=2 to rebalance them.

    Returns a scalar tensor.
    """

    # Residual of the ODE at the interior points
    f = pde(t, model)

    # Mean squared error of the ODE residual
    loss_pde = tf.reduce_mean(tf.square(f))

    # Network prediction at the initial time
    T_ic_pred = call_model(model, t_ic)

    # Mean squared error on the initial condition, T(0) = T_o
    loss_ic = tf.reduce_mean(tf.square(T_ic - T_ic_pred))

    # Weighted total loss
    return w_pde * loss_pde + w_ic * loss_ic



# Training step

# tf.function compiles this Python function into a TensorFlow graph,
# which usually runs faster than eager execution for repeated calls.
@tf.function
def train_step(model, t, t_ic, T_ic, optimizer):
    """
    Perform a single optimization step:
      1) compute the total loss,
      2) compute its gradients w.r.t. all trainable variables,
      3) update the parameters using the optimizer.

    model     : dict of layers as consumed by call_model
    t         : interior time points (collocation points)
    t_ic      : time at the initial condition
    T_ic      : temperature at the initial condition
    optimizer : optimizer object providing apply_gradients

    Returns the loss value computed before the update.
    """

    with tf.GradientTape() as tape:
        loss_value = loss(model, t, t_ic, T_ic)

    # Gather every layer's trainable variables into one flat list. This is done
    # after the forward pass because the layers are created without an input
    # shape and only create their weights the first time they are called.
    variables = [var
                 for layer in model.values()
                 for var in layer.trainable_variables]

    # One gradient per variable, returned in the same order as `variables`,
    # so the (gradient, variable) pairing below cannot drift out of sync.
    grads = tape.gradient(loss_value, variables)

    # Apply gradient updates (e.g., Adam step)
    optimizer.apply_gradients(zip(grads, variables))

    return loss_value


# Collocation points for the ODE: 50 evenly spaced times t ∈ [0, 10],
# arranged as a single column and stored as a float32 tensor.
t_train = tf.convert_to_tensor(
    np.linspace(0, 10, 50)[:, np.newaxis],
    dtype=tf.float32,
)

# Initial condition T(0) = T_o, each side held as a 1x1 float32 tensor.
t_ic = tf.convert_to_tensor(np.array([[0.0]], dtype=np.float32), dtype=tf.float32)
T_ic = tf.convert_to_tensor(np.array([[T_o]], dtype=np.float32), dtype=tf.float32)


## Model and optimizer
# Seed the Python, NumPy and TensorFlow random generators before any layer is
# created, so that the weight initialization (and therefore the training log
# below) is repeatable across Restart & Run All.
SEED = 0
tf.keras.utils.set_random_seed(SEED)

# Instantiate the PINN
model = create_model()

# Adam optimizer
# See '1d_Poisson eq.ipynb' for other optimizer choices
# (including a two-stage optimization process).

# Exponentially decaying learning rate for stable training:
#   lr(step) = 1e-3 * 0.95 ** (step / 500)
lr_schedule = tf.keras.optimizers.schedules.ExponentialDecay(
    initial_learning_rate=1e-3,
    decay_steps=500,
    decay_rate=0.95
)
optimizer = tf.keras.optimizers.Adam(learning_rate=lr_schedule)


## Main training loop
# Full-batch training: every epoch is one optimizer step on all collocation
# points plus the initial condition.
epochs = 5000
for epoch in range(epochs):
    loss_value = train_step(model, t_train, t_ic, T_ic, optimizer)

    # Report the loss only on every 200th epoch (starting with epoch 0).
    if epoch % 200 != 0:
        continue
    print(f"Epoch {epoch}: Loss = {loss_value.numpy()}")


2025-11-26 22:25:54.536869: E external/local_xla/xla/stream_executor/cuda/cuda_fft.cc:477] Unable to register cuFFT factory: Attempting to register factory for plugin cuFFT when one has already been registered
E0000 00:00:1764195954.852323      47 cuda_dnn.cc:8310] Unable to register cuDNN factory: Attempting to register factory for plugin cuDNN when one has already been registered
E0000 00:00:1764195954.942527      47 cuda_blas.cc:1418] Unable to register cuBLAS factory: Attempting to register factory for plugin cuBLAS when one has already been registered


AttributeError: 'MessageFactory' object has no attribute 'GetPrototype'

AttributeError: 'MessageFactory' object has no attribute 'GetPrototype'

AttributeError: 'MessageFactory' object has no attribute 'GetPrototype'

AttributeError: 'MessageFactory' object has no attribute 'GetPrototype'

AttributeError: 'MessageFactory' object has no attribute 'GetPrototype'

2025-11-26 22:26:15.578313: E external/local_xla/xla/stream_executor/cuda/cuda_driver.cc:152] failed call to cuInit: INTERNAL: CUDA error: Failed call to cuInit: UNKNOWN ERROR (303)


Epoch 0: Loss = 62646.11328125
Epoch 200: Loss = 52397.4921875
Epoch 400: Loss = 47728.5859375
Epoch 600: Loss = 43681.58984375
Epoch 800: Loss = 40068.62109375
Epoch 1000: Loss = 36814.81640625
Epoch 1200: Loss = 33873.1875
Epoch 1400: Loss = 31208.578125
Epoch 1600: Loss = 28792.447265625
Epoch 1800: Loss = 26600.705078125
Epoch 2000: Loss = 24612.48828125
Epoch 2200: Loss = 22809.533203125
Epoch 2400: Loss = 21175.609375
Epoch 2600: Loss = 19696.23828125
Epoch 2800: Loss = 18358.408203125
Epoch 3000: Loss = 17150.37109375
Epoch 3200: Loss = 16061.4384765625
Epoch 3400: Loss = 15081.90625
Epoch 3600: Loss = 14202.8466796875
Epoch 3800: Loss = 13416.0888671875
Epoch 4000: Loss = 12714.0849609375
Epoch 4200: Loss = 12089.8388671875
Epoch 4400: Loss = 11536.87109375
Epoch 4600: Loss = 11049.154296875
Epoch 4800: Loss = 10621.0263671875


In [None]:
## (Optional 1) Warm start: pre-training on initial conditions

# warm_start_epochs = 500
# for epoch in range(warm_start_epochs):
#     loss_value = train_step(model, t_ic, t_ic, T_ic, optimizer)
#     if epoch % 100 == 0:
#         print(f"Warm Start Epoch {epoch}: Loss = {loss_value.numpy()}")

In [None]:
# ## (Optional 2 approach) loss weight change

# def loss_curriculum(model, t, t_ic, T_ic, w_pde=1.0, w_ic=1.0):
#     """
#     Weighted loss:
#         L = w_pde * L_pde + w_ic * L_ic
#     """
#     # PDE residual
#     f = pde(t, model)
#     loss_pde = tf.reduce_mean(tf.square(f))

#     # Initial condition
#     T_ic_pred = call_model(model, t_ic)
#     loss_ic = tf.reduce_mean(tf.square(T_ic - T_ic_pred))

#     # early epochs: set λ_IC large, λ_PDE smaller
#     # later epochs: gradually increase λ_PDE and/or decrease λ_IC
#     return w_pde * loss_pde + w_ic * loss_ic


# @tf.function
# def train_step_curriculum(model, t, t_ic, T_ic, optimizer, w_pde, w_ic):
#     """
#     One training step with curriculum weights for PDE and IC.
#     """
#     with tf.GradientTape() as tape:
#         loss_value = loss_curriculum(model, t, t_ic, T_ic, w_pde, w_ic)

#     grads = tape.gradient(
#         loss_value,
#         [layer.trainable_variables for layer in model.values()]
#     )
#     grads = [g for grad_list in grads for g in grad_list]

#     variables = [
#         var
#         for layer in model.values()
#         for var in layer.trainable_variables
#     ]

#     optimizer.apply_gradients(zip(grads, variables))
#     return loss_value


# epochs = 5000
# for epoch in range(epochs):
#     """
#     Example) Simple curriculum:
#             (1) first half: emphasize IC
#             (2) second half: emphasize PDE
#     """
#     if epoch < epochs // 2:
#         w_ic = 5.0   # strong initial condition
#         w_pde = 1.0
#     else:
#         w_ic = 1.0
#         w_pde = 5.0  # stronger PDE residual

#     loss_value = train_step_curriculum(
#         model, t_train, t_ic, T_ic, optimizer, w_pde, w_ic
#     )

#     if epoch % 500 == 0:
#         print(f"[Curriculum] Epoch {epoch} "
#               f"(w_pde={w_pde}, w_ic={w_ic}): loss = {loss_value.numpy():.6e}")


In [None]:
# ## (Optional 3 approach) Output transform to enforce IC/BC exactly ("Hard constraints")

# # Define 'Raw' network + transform
# def call_raw(model, t):
#     """
#     Base network N(t). This does NOT yet enforce the initial condition.
#     """
#     z = model['hidden_1'](t)
#     z = model['hidden_2'](z)
#     z = model['hidden_3'](z)
#     out = model['output_layer'](z)
#     return out

# def call_model_constrained(model, t):
#     """
#     Enforce T(0) = T_o by construction:

#         T_hat(t) = T_o + t * N(t)

#     No matter what N(t) is, T_hat(0) = T_o.
#     """
#     N_t = call_raw(model, t)
#     T_hat = T_o + t * N_t
#     return T_hat


# ## using T_hat for PDE residual
# def pde_constrained(t, model):
#     """
#     PDE residual using the constrained output T_hat(t):
#         dT_hat/dt - k (T_amb - T_hat) = 0
#     """
#     with tf.GradientTape(persistent=True) as tape:
#         tape.watch(t)
#         T_pred = call_model_constrained(model, t)
#         T_t = tape.gradient(T_pred, t)

#     del tape
#     return T_t - k * (T_amb - T_pred)

# # Without loss_IC because IC is already enforced.
# def loss_constrained(model, t):
#     """
#     Only PDE residual loss.
#     The initial condition T(0) = T_o is automatically satisfied
#     by the output transform.
#     """
#     f = pde_constrained(t, model)
#     loss_pde = tf.reduce_mean(tf.square(f))
#     return loss_pde


# @tf.function
# def train_step_constrained(model, t, optimizer):
#     with tf.GradientTape() as tape:
#         loss_value = loss_constrained(model, t)

#     grads = tape.gradient(
#         loss_value,
#         [layer.trainable_variables for layer in model.values()]
#     )
#     grads = [g for grad_list in grads for g in grad_list]

#     variables = [
#         var
#         for layer in model.values()
#         for var in layer.trainable_variables
#     ]
#     optimizer.apply_gradients(zip(grads, variables))
#     return loss_value



# epochs = 5000
# for epoch in range(epochs):
#     loss_value = train_step_constrained(model, t_train, optimizer)
#     if epoch % 500 == 0:
#         print(f"[Hard IC] Epoch {epoch}: loss = {loss_value.numpy():.6e}")


In [None]:
# ## (Optional 4 approach) Pre-training with analytic solutions + PINN fine-tuning

# def data_loss(model, t_data, T_data):
#     """
#     Pure data-driven loss: MSE between PINN prediction and given data.
#     """
#     T_pred = call_model(model, t_data)
#     return tf.reduce_mean(tf.square(T_data - T_pred))


# @tf.function
# def train_step_data(model, t_data, T_data, optimizer):
#     with tf.GradientTape() as tape:
#         loss_value = data_loss(model, t_data, T_data)

#     grads = tape.gradient(
#         loss_value,
#         [layer.trainable_variables for layer in model.values()]
#     )
#     grads = [g for grad_list in grads for g in grad_list]

#     variables = [
#         var
#         for layer in model.values()
#         for var in layer.trainable_variables
#     ]
#     optimizer.apply_gradients(zip(grads, variables))
#     return loss_value

# ## Stage 1: pre-training by using analytic solution
# # Generate synthetic "measurement" data from the analytical solution
# t_data = np.linspace(0, 10, 100).reshape(-1, 1).astype(np.float32)
# T_data = T_amb + (T_o - T_amb) * np.exp(-k * t_data)

# t_data = tf.convert_to_tensor(t_data, dtype=tf.float32)
# T_data = tf.convert_to_tensor(T_data, dtype=tf.float32)

# optimizer_data = tf.keras.optimizers.Adam(learning_rate=1e-3)

# pretrain_epochs = 2000
# for epoch in range(pretrain_epochs):
#     loss_value = train_step_data(model, t_data, T_data, optimizer_data)
#     if epoch % 200 == 0:
#         print(f"[Stage 1: Data pretrain] Epoch {epoch}: loss = {loss_value.numpy():.6e}")


# ## Stage 2: Fine-tuning by using PINN loss 
# optimizer_pinn = tf.keras.optimizers.Adam(learning_rate=5e-4)

# fine_tune_epochs = 5000
# for epoch in range(fine_tune_epochs):
#     loss_value = train_step(
#         model,        # reuse the existing PINN train_step
#         t_train,
#         t_ic,
#         T_ic,
#         optimizer_pinn
#     )
#     if epoch % 500 == 0:
#         print(f"[Stage 2: PINN fine-tune] Epoch {epoch}: loss = {loss_value.numpy():.6e}")



In [None]:
## PINN prediction
# Dense evaluation grid on [0, 10]: the float32 tensor feeds the network, and an
# explicit NumPy copy of the same values is used for the closed-form solution
# and for plotting (instead of relying on implicit tensor -> array conversion).
t_test = np.linspace(0, 10, 1000).reshape(-1, 1)
t_test = tf.convert_to_tensor(t_test, dtype=tf.float32)
t_plot = t_test.numpy()
T_pred = call_model(model, t_test).numpy()

## Analytical solution for comparison
# Exact solution of dT/dt = -k (T - T_amb) with T(0) = T_o:
#   T(t) = T_amb + (T_o - T_amb) * exp(-k t)
T_true = T_amb + (T_o - T_amb) * np.exp(-k * t_plot)

# Plot the results
fig, ax = plt.subplots(figsize=(10, 6))
ax.plot(t_plot, T_true, 'b-', label='Analytical Solution')
ax.plot(t_plot, T_pred, 'r--', label='PINN Solution')
ax.set_xlabel('Time (sec)')
# Degree sign written as an escape so the label survives file re-encoding.
ax.set_ylabel('T (\u00b0C)')
ax.legend()
# ax.set_title('Comparison of Analytical Solution and PINN Solution')
plt.show()

# Plot PDE residuals
# pde_residuals = pde(t_test, model).numpy()
# pde_error = abs(T_true - T_pred)*100/abs(T_true)
# plt.figure()
# plt.plot(t_test, pde_error)
# plt.xlabel("Time (sec)")
# plt.ylabel("PDE Error (%)")
# # plt.title("PDE Residuals")
# plt.show()
