# Develop a CasADi Model of The Steady-State Solar Plant RTO Problem

Replicates the calculations in the following Excel spreadsheet and uses CasADi to solve the optimization problem.

- `Solar Plant Optimization of N-Pumps I-O 2025-08-29.xlsm`

In [None]:
import casadi as cas
import numpy as np
import pandas as pd
import matplotlib.pyplot as plt
from tqdm import tqdm

# Import functions from the solar_plant_rto module
from problems.solar_plant_rto.solar_plant_rto import (
    actual_pump_speed_from_scaled,
    calculate_pump_and_drive_efficiency,
    calculate_pump_fluid_power,
    calculate_collector_flow_rate,
    calculate_total_flowrate,
    calculate_boiler_dp,
    calculate_pump_dp,
    calculate_pressure_balance,
    make_pressure_balance_function,
    make_calculate_pump_and_drive_power_function,
    calculate_collector_oil_exit_temp,
    calculate_rms_oil_exit_temps,
    make_collector_exit_temps_and_pump_power_function,
    calculate_net_power,
    solar_plant_rto_solve,
    PUMP_SPEED_MIN,
    PUMP_SPEED_MAX,
    LOOP_THERMAL_EFFICIENCIES,
    OIL_EXIT_TEMPS_SP,
)

from tests.test_solar_plant_rto import test_data

In [None]:
# This data is from the Excel spreadsheet
test_data

## Pump and Flow Calcualtions

In [None]:
# Test the imported function
assert actual_pump_speed_from_scaled(0.2) == PUMP_SPEED_MIN
assert actual_pump_speed_from_scaled(1.0) == PUMP_SPEED_MAX

In [None]:
flow_rates = np.linspace(50, 90, 41)

plt.figure(figsize=(5, 3))
for pump_speed in [1200, 1800, 2400, 3000]:
    pump_and_drive_efficiency = calculate_pump_and_drive_efficiency(
        flow_rates, pump_speed
    )
    plt.plot(flow_rates, pump_and_drive_efficiency, label=pump_speed)
plt.xlabel('Flow rate (kg/s?)')
plt.ylabel('Efficiency')
plt.grid()
plt.legend(title="Speed (rpm)")
plt.title("Pump and Drive Efficiency Curves")
plt.tight_layout()
plt.show()

In [None]:
# Test calculation
pump_dp = test_data["pump_dp"]
total_flow_rate = test_data["total_flow_rate"]
pump_fluid_power = calculate_pump_fluid_power(total_flow_rate, pump_dp)
assert np.isclose(pump_fluid_power, test_data["pump_fluid_power"])

In [None]:
# Test calculation
valve_position = test_data["valve_positions"][1]
loop_dp = test_data["loop_dp"]
collector_flow_rate = calculate_collector_flow_rate(valve_position, loop_dp, sqrt=np.sqrt)
assert np.isclose(collector_flow_rate, test_data["collector_flow_rates"][1])

In [None]:
valve_positions = np.linspace(0.1, 1.0, 91)

plt.figure(figsize=(5, 3))
for loop_dp in [50, 60, 70]:
    flow_rate = calculate_collector_flow_rate(valve_positions, loop_dp, sqrt=np.sqrt)
    plt.plot(valve_positions, flow_rate, label=loop_dp)
plt.xlabel('Valve position (-)')
plt.ylabel('Flow rate (kg/s)')
plt.grid()
plt.legend(title="Loop dp (kPa)")
plt.title("Collector Flow Rate Curves")
plt.tight_layout()
plt.show()

In [None]:
# Test calculation
valve_positions = test_data["valve_positions"]
loop_dp = test_data["loop_dp"]
total_flowrate = calculate_total_flowrate(valve_positions, loop_dp, sum=np.sum, sqrt=np.sqrt)
assert np.isclose(total_flowrate, test_data["total_flow_rate"])

In [None]:
# Test calculation
total_flow_rate = test_data["total_flow_rate"]
boiler_dp = calculate_boiler_dp(total_flow_rate)
assert np.isclose(boiler_dp, test_data["boiler_dp"])

In [None]:
# Test calculation
actual_pump_speed = test_data["actual_pump_speed"]
m_pumps = 2
pump_dp = calculate_pump_dp(actual_pump_speed, total_flow_rate, m_pumps)
assert np.isclose(pump_dp, test_data["pump_dp"])

In [None]:
# Test calculation
loop_dp = test_data["loop_dp"]
pump_dp = test_data["pump_dp"]
boiler_dp = test_data["boiler_dp"]
pressure_balance = calculate_pressure_balance(loop_dp, pump_dp, boiler_dp)

assert np.isclose(pressure_balance, 0.0, atol=1e-7)

In [None]:
# Test calculation
valve_positions = test_data["valve_positions"]
loop_dp = test_data["loop_dp"]
total_flow_rate = calculate_total_flowrate(valve_positions, loop_dp, sum=np.sum, sqrt=np.sqrt)
assert np.isclose(total_flow_rate, test_data["total_flow_rate"])

pump_speed_scaled = test_data["pump_speed_scaled"]
actual_pump_speed = actual_pump_speed_from_scaled(pump_speed_scaled)
assert np.isclose(actual_pump_speed, test_data["actual_pump_speed"])

pump_dp = test_data["pump_dp"]
pump_fluid_power = calculate_pump_fluid_power(total_flow_rate, pump_dp)
assert np.isclose(pump_fluid_power, test_data["pump_fluid_power"])

m_pumps = 2
pump_and_drive_efficiency = calculate_pump_and_drive_efficiency(
    total_flow_rate, actual_pump_speed
)
assert np.isclose(pump_and_drive_efficiency, test_data["pump_and_drive_efficiency"])

pump_and_drive_power = pump_fluid_power / pump_and_drive_efficiency
assert np.isclose(pump_and_drive_power, test_data["pump_and_drive_power"])

In [None]:
n_lines = 15
m_pumps = 2
pressure_balance_function = make_pressure_balance_function(n_lines, m_pumps)
pressure_balance_function

In [None]:
# Test calculation
valve_positions = test_data["valve_positions"]
pump_speed_scaled = test_data["pump_speed_scaled"]
loop_dp = test_data["loop_dp"]

pressure_balance = pressure_balance_function(
    valve_positions, pump_speed_scaled, loop_dp
)
assert np.isclose(pressure_balance, [[0.0]], atol=1e-5)

## Use Rootfinder to Find Pressure Balance

In [None]:
def g(x):
    loop_dp = x
    return pressure_balance_function(
        cas.DM(valve_positions), cas.DM(pump_speed_scaled), loop_dp
    )

x = cas.SX.sym('x')
rf = cas.rootfinder('rf', 'newton', {'x': x, 'g': g(x)})

x_sol_rf = rf([30.0], [])
x_sol_rf

In [None]:
pump_and_drive_power_function = \
    make_calculate_pump_and_drive_power_function(n_lines, m_pumps)
pump_and_drive_power_function

In [None]:
# Test calculation
valve_positions = test_data["valve_positions"]
pump_speed_scaled = test_data["pump_speed_scaled"]
pump_and_drive_power_function(valve_positions, pump_speed_scaled)
assert np.isclose(pump_and_drive_power, test_data["pump_and_drive_power"])

In [None]:
%timeit pump_and_drive_power_function(valve_positions, pump_speed_scaled)

## Oil Exit Temperature Calculations

In [None]:
# LOOP_THERMAL_EFFICIENCIES now imported from module

In [None]:
# Test calculation
flow_rate = 8.968468201
oil_return_temp = 273
ambient_temp = 20
solar_rate = 900
loop_thermal_efficiency = 0.9
oil_exit_temp = calculate_collector_oil_exit_temp(
    flow_rate,
    oil_return_temp,
    ambient_temp,
    solar_rate,
    loop_thermal_efficiency,
    exp=np.exp,
    pi=np.pi
)
assert np.isclose(oil_exit_temp, 397.4882567363479)

# Test calculation - vectorized
collector_flow_rates = cas.DM(test_data["collector_flow_rates"])
oil_return_temp = test_data["oil_return_temp"]
ambient_temp = test_data["ambient_temp"]
solar_rate = test_data["solar_rate"]
loop_thermal_efficiencies = cas.DM(test_data["loop_thermal_efficiencies"])
oil_exit_temps = calculate_collector_oil_exit_temp(
    collector_flow_rates,
    oil_return_temp,
    ambient_temp,
    solar_rate,
    loop_thermal_efficiencies
)
assert np.allclose(
    oil_exit_temps,
    test_data["oil_exit_temps"].reshape(-1, 1)
)

In [None]:
# OIL_EXIT_TEMPS_SP now imported from module
rms_dev = calculate_rms_oil_exit_temps(oil_exit_temps, cas.DM(OIL_EXIT_TEMPS_SP))
assert np.isclose(rms_dev, test_data["rms_dev"], atol=0.00001)

## Combine into Collector and Pumps System Model

In [None]:
calculate_exit_temps_and_pump_power = \
    make_collector_exit_temps_and_pump_power_function(n_lines, m_pumps)

calculate_exit_temps_and_pump_power

In [None]:
# Test calculation
valve_positions = test_data["valve_positions"]
pump_speed_scaled = test_data["pump_speed_scaled"]
oil_return_temp = test_data["oil_return_temp"]
ambient_temp = test_data["ambient_temp"]
solar_rate = test_data["solar_rate"]

collector_flow_rates, pump_and_drive_power, oil_exit_temps = \
    calculate_exit_temps_and_pump_power(
        valve_positions, pump_speed_scaled, oil_return_temp, ambient_temp, solar_rate
    )
assert np.allclose(
    collector_flow_rates,
    test_data["collector_flow_rates"].reshape(-1, 1)
)
assert np.isclose(pump_and_drive_power, test_data["pump_and_drive_power"])
assert np.allclose(
    oil_exit_temps,
    test_data["oil_exit_temps"].reshape(-1, 1)
)

In [None]:
%timeit calculate_exit_temps_and_pump_power(valve_positions, pump_speed_scaled, oil_return_temp, ambient_temp, solar_rate)

## Solve by Maximising Potential Work Output

In [None]:
# solar_plant_rto_solve now imported from module

In [None]:
%%time

solar_rate = 900
ambient_temp = 20
oil_return_temp = 273
m_pumps = 3
n_lines = 15

sol, variables = solar_plant_rto_solve(
    solar_rate,
    ambient_temp,
    oil_return_temp,
    m_pumps,
    n_lines
)

In [None]:
inputs = {
    "solar_rate": solar_rate,
    "ambient_temp": ambient_temp,
    "oil_return_temp": oil_return_temp,
    "m_pumps": m_pumps,
    "n_lines": n_lines
}
output_vars = {name: variables[name] for name in ['pump_speed_scaled', 'pump_and_drive_power', 'potential_work']}

print(pd.concat([pd.Series(inputs), pd.Series(output_vars)]).round(4))
pd.DataFrame(
    {name: variables[name] for name in ['valve_positions', 'oil_exit_temps', 'collector_flow_rates']},
    index=range(1, n_lines + 1)
).round(4)


In [None]:
# Check sparsity of Hessian
variables['hess_f'].sparsity().spy()

## Test Robustness of Solver to Initial Conditions

In [None]:
# Constants
solar_rate = 900
ambient_temp = 20
oil_return_temp = 273
m_pumps = 3
n_lines = 15

selected_stats = ['iter_count', 'n_call_nlp_f', 'n_call_nlp_g', 'success']

rng = np.random.default_rng(0)

# Disable solver and CasADi output reporting
solver_opts = {
    'ipopt.print_level': 0,    # Suppress IPOPT output
    'print_time': 0            # Suppress CasADi timing
}

results = []
for i in tqdm(range(100)):

    valve_positions_init = rng.uniform(0.1, 1.0, size=n_lines)
    pump_speed_scaled_init = rng.uniform(0.2, 1.0)

    sol, variables = solar_plant_rto_solve(
        solar_rate,
        ambient_temp,
        oil_return_temp,
        m_pumps,
        n_lines,
        valve_positions_init=valve_positions_init,
        pump_speed_scaled_init=pump_speed_scaled_init,
        solver_opts=solver_opts
    )
    sol_stats = sol.stats()
    sol_stats = {name: sol_stats[name] for name in selected_stats}
    sol_stats['f'] = sol.value(variables['f'])
    results.append(sol_stats)

results = pd.DataFrame(results)

In [None]:
assert results['success'].value_counts()[True] == len(results)
results['iter_count'].value_counts()

In [None]:
# Number of different solutions found
results['f'].round(6).value_counts()