## 1. Setup and Imports

First, we import the `jneqsim` gateway, which uses JPype to expose NeqSim's Java classes directly to Python.

In [None]:
# Install dependencies if needed
# !pip install neqsim numpy matplotlib

In [None]:
# Import NeqSim - Direct Java Access via jneqsim
from neqsim import jneqsim
import numpy as np
import matplotlib.pyplot as plt

print("NeqSim loaded via jneqsim gateway")

In [None]:
# Import NeqSim ML classes through jneqsim gateway
StateVector = jneqsim.process.ml.StateVector
ActionVector = jneqsim.process.ml.ActionVector
Constraint = jneqsim.process.ml.Constraint
ConstraintManager = jneqsim.process.ml.ConstraintManager
GymEnvironment = jneqsim.process.ml.GymEnvironment
RLEnvironment = jneqsim.process.ml.RLEnvironment
TrainingDataCollector = jneqsim.process.ml.TrainingDataCollector

# ML examples
SeparatorGymEnv = jneqsim.process.ml.examples.SeparatorGymEnv
SeparatorLevelControlEnv = jneqsim.process.ml.examples.SeparatorLevelControlEnv
SeparatorCompressorMultiAgentEnv = jneqsim.process.ml.examples.SeparatorCompressorMultiAgentEnv
MultiAgentEnvironment = jneqsim.process.ml.multiagent.MultiAgentEnvironment

# Import process equipment
Separator = jneqsim.process.equipment.separator.Separator
Compressor = jneqsim.process.equipment.compressor.Compressor
Stream = jneqsim.process.equipment.stream.Stream
SystemSrkEos = jneqsim.thermo.system.SystemSrkEos

print("NeqSim ML classes imported successfully!")

## 2. StateVector - Normalized State Representation

The `StateVector` class provides:
- Automatic normalization to [0, 1] for neural network inputs
- Physical bounds tracking
- Unit awareness
- JSON/array export for ML frameworks
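
The normalization listed above can be illustrated in plain Python. This is a minimal sketch that assumes min-max scaling, `(value - min) / (max - min)`, clipped to [0, 1]. The actual `StateVector` implementation may differ, and the helper name `min_max_normalize` is hypothetical.

```python
def min_max_normalize(value, lower, upper):
    """Scale value into [0, 1] from its physical bounds (assumed min-max scaling)."""
    if upper <= lower:
        raise ValueError("upper bound must exceed lower bound")
    scaled = (value - lower) / (upper - lower)
    # Clip so that out-of-range readings still map into [0, 1].
    return min(1.0, max(0.0, scaled))


# Same (value, min, max) triples as the StateVector example in this section.
features = {
    "temperature": (350.0, 200.0, 500.0),
    "pressure": (50.0, 0.0, 100.0),
    "liquid_level": (0.6, 0.0, 1.0),
}
normalized = {name: min_max_normalize(*spec) for name, spec in features.items()}
print(normalized)
```

Under this assumption, a temperature of 350 K with bounds of 200 K and 500 K sits exactly halfway through its range.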

In [None]:
# Create a state vector manually
state = StateVector()
state.add("temperature", 350.0, 200.0, 500.0, "K")  # value, min, max, unit
state.add("pressure", 50.0, 0.0, 100.0, "bar")
state.add("liquid_level", 0.6, 0.0, 1.0, "fraction")

# Access values
print(f"Temperature: {state.getValue('temperature'):.1f} K")
print(f"Temperature (normalized): {state.getNormalized('temperature'):.3f}")

# Convert to numpy array for ML
raw_array = np.array(state.toArray())
norm_array = np.array(state.toNormalizedArray())

print(f"\nRaw array: {raw_array}")
print(f"Normalized array: {norm_array}")
print(f"Feature names: {list(state.getFeatureNames())}")

## 3. Equipment StateVectorProvider

Key equipment classes (`Separator`, `Compressor`, `HeatExchanger`) implement the `StateVectorProvider` interface, so their state can be extracted directly for RL.

In [None]:
# Create a simple separation process
fluid = SystemSrkEos(280.0, 50.0)  # T=280K, P=50bar
fluid.addComponent("methane", 0.8)
fluid.addComponent("ethane", 0.1)
fluid.addComponent("propane", 0.05)
fluid.addComponent("n-pentane", 0.05)
fluid.setMixingRule("classic")
fluid.setMultiPhaseCheck(True)

# Create and run separator
feed = Stream("Feed", fluid)
feed.setFlowRate(1000.0, "kg/hr")
feed.run()

separator = Separator("Sep-001", feed)
separator.run()

# Extract state vector directly from equipment
sep_state = separator.getStateVector()

print("Separator State Vector:")
print("-" * 40)
for name in sep_state.getFeatureNames():
    value = sep_state.getValue(name)
    norm = sep_state.getNormalized(name)
    print(f"{name:20s}: {value:10.3f} (norm: {norm:.3f})")

## 4. Constraint Management

The `ConstraintManager` handles physical and safety constraints with:
- **HARD** constraints: Terminate episode on violation (safety limits)
- **SOFT** constraints: Penalize reward (operational targets)
- Automatic penalty computation for RL rewards
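
The difference between hard and soft constraints can be sketched in plain Python. The penalty rule below (a weight times the distance outside the allowed range) is an assumption made for illustration, not the formula used by `ConstraintManager`. The names `RangeConstraint`, `evaluate_constraints`, and `soft_weight` are hypothetical.

```python
from dataclasses import dataclass


@dataclass
class RangeConstraint:
    name: str
    feature: str
    lower: float
    upper: float
    hard: bool

    def excess(self, value):
        """Distance by which value lies outside [lower, upper]; 0.0 if inside."""
        if value < self.lower:
            return self.lower - value
        if value > self.upper:
            return value - self.upper
        return 0.0


def evaluate_constraints(constraint_list, state, soft_weight=10.0):
    """Return (hard_violated, penalty) for a dict of feature values."""
    hard_violated = False
    penalty = 0.0
    for c in constraint_list:
        excess = c.excess(state[c.feature])
        if excess > 0.0:
            if c.hard:
                hard_violated = True  # would terminate the episode
            else:
                penalty += soft_weight * excess  # assumed linear penalty
    return hard_violated, penalty


sketch_constraints = [
    RangeConstraint("max_pressure", "pressure", 0.0, 80.0, hard=True),
    RangeConstraint("level_bounds", "liquid_level", 0.1, 0.9, hard=True),
    RangeConstraint("optimal_level", "liquid_level", 0.4, 0.6, hard=False),
]

for state in ({"pressure": 50.0, "liquid_level": 0.5},
              {"pressure": 50.0, "liquid_level": 0.75},
              {"pressure": 90.0, "liquid_level": 0.5}):
    print(state, evaluate_constraints(sketch_constraints, state))
```

In this sketch a hard violation only sets a flag and leaves the penalty unchanged, which keeps the "terminate" and "penalize" behaviors separate.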

In [None]:
# Setup constraints
constraints = ConstraintManager()

# Hard constraints (safety limits)
constraints.addHardRange("max_pressure", "pressure", 0.0, 80.0, "bar")
constraints.addHardRange("level_bounds", "liquid_level", 0.1, 0.9, "fraction")

# Soft constraints (operational targets)
constraints.addSoftRange("optimal_level", "liquid_level", 0.4, 0.6, "fraction")

# Test against different states
test_cases = [
    {"pressure": 50.0, "liquid_level": 0.5},  # All OK
    {"pressure": 50.0, "liquid_level": 0.75}, # Soft violation
    {"pressure": 90.0, "liquid_level": 0.5},  # Hard violation!
]

for i, case in enumerate(test_cases):
    state = StateVector()
    state.add("pressure", case["pressure"], 0.0, 100.0, "bar")
    state.add("liquid_level", case["liquid_level"], 0.0, 1.0, "fraction")
    
    constraints.evaluate(state)
    
    print(f"\nCase {i+1}: P={case['pressure']} bar, Level={case['liquid_level']}")
    print(f"  Hard violation: {constraints.hasHardViolation()}")
    print(f"  Penalty: {constraints.getTotalViolationPenalty():.2f}")
    if constraints.getViolations().size() > 0:
        print(f"  Explanation: {constraints.explainViolations()}")

## 5. Single-Agent Gym Environment

The `SeparatorGymEnv` provides a Gymnasium-compatible interface:

**Observation Space (8-dim):**
- liquid_level, pressure, temperature, feed_flow
- gas_density, liquid_density, level_error, valve_position

**Action Space (1-dim):**
- valve_delta: Change in valve position [-0.1, 0.1]

**Reward:**
- Setpoint tracking: $-10 \times (level - setpoint)^2$
- Action smoothness: $-0.1 \times action^2$
- Survival bonus: $+1.0$
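
As a worked example, the three reward terms above can be combined in a small function. This sketch assumes the terms are simply summed at every step. The function name `level_reward` and its keyword arguments are hypothetical and are not part of `SeparatorGymEnv`.

```python
def level_reward(level, setpoint, action,
                 tracking_weight=10.0, action_weight=0.1, survival_bonus=1.0):
    """Sum of the three reward terms listed above (summation is an assumption)."""
    tracking = -tracking_weight * (level - setpoint) ** 2
    smoothness = -action_weight * action ** 2
    return tracking + smoothness + survival_bonus


# At the setpoint with no valve movement only the survival bonus remains.
print(level_reward(level=0.5, setpoint=0.5, action=0.0))

# A 0.1 level offset with a 0.1 valve move gives roughly 1 - 0.1 - 0.001.
print(level_reward(level=0.6, setpoint=0.5, action=0.1))
```

With these weights the tracking term dominates: a level error of 0.1 costs about a hundred times more than an action of the same magnitude.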

In [None]:
# Create Gym environment
env = SeparatorGymEnv()
env.setMaxEpisodeSteps(200)
env.setLevelSetpoint(0.5)

print("Environment Specification:")
print(f"  Observation dim: {env.getObservationDim()}")
print(f"  Action dim: {env.getActionDim()}")
print(f"  Observation names: {list(env.getObservationNames())}")
print(f"  Action bounds: [{env.getActionLow()[0]}, {env.getActionHigh()[0]}]")

In [None]:
# Run episode with simple P-controller
def simple_controller(obs, setpoint=0.5, Kp=0.5):
    """Simple proportional controller for level control."""
    level = obs[0]  # liquid_level is first observation
    error = setpoint - level
    action = Kp * error
    return np.clip(action, -0.1, 0.1)

# Run episode
reset_result = env.reset()
obs = np.array(reset_result.observation)

history = {
    'level': [], 'valve': [], 'reward': [], 'action': []
}

total_reward = 0
step = 0

while not env.isDone():
    # Get action from controller
    action = simple_controller(obs)
    
    # Step environment
    result = env.step([float(action)])
    obs = np.array(result.observation)
    
    # Record history
    history['level'].append(obs[0])
    history['valve'].append(obs[7])
    history['reward'].append(result.reward)
    history['action'].append(action)
    total_reward += result.reward
    step += 1

print(f"Episode finished after {step} steps")
print(f"Total reward: {total_reward:.2f}")
print(f"Final level: {history['level'][-1]:.3f} (setpoint: 0.5)")

In [None]:
# Plot results
fig, axes = plt.subplots(2, 2, figsize=(12, 8))

axes[0, 0].plot(history['level'], 'b-', linewidth=2)
axes[0, 0].axhline(y=0.5, color='r', linestyle='--', label='Setpoint')
axes[0, 0].set_ylabel('Liquid Level')
axes[0, 0].set_xlabel('Step')
axes[0, 0].set_title('Level Control')
axes[0, 0].legend()
axes[0, 0].grid(True, alpha=0.3)

axes[0, 1].plot(history['valve'], 'g-', linewidth=2)
axes[0, 1].set_ylabel('Valve Position')
axes[0, 1].set_xlabel('Step')
axes[0, 1].set_title('Valve Position')
axes[0, 1].grid(True, alpha=0.3)

axes[1, 0].plot(history['action'], 'm-', linewidth=1)
axes[1, 0].set_ylabel('Action (valve delta)')
axes[1, 0].set_xlabel('Step')
axes[1, 0].set_title('Control Actions')
axes[1, 0].grid(True, alpha=0.3)

axes[1, 1].plot(np.cumsum(history['reward']), 'orange', linewidth=2)
axes[1, 1].set_ylabel('Cumulative Reward')
axes[1, 1].set_xlabel('Step')
axes[1, 1].set_title('Reward Accumulation')
axes[1, 1].grid(True, alpha=0.3)

plt.tight_layout()
plt.show()

## 6. Multi-Agent Environment

The `MultiAgentEnvironment` coordinates multiple agents controlling interconnected equipment.

### Example: Separator + Compressor Train

```
Feed ──► [Separator] ──► Gas ──► [Compressor] ──► Compressed Gas
              │
              └──► Liquid ──► [Valve] ──► Export
```

**Agents:**
1. **SeparatorAgent**: Controls liquid level via outlet valve
2. **CompressorAgent**: Controls discharge pressure via speed

**Coordination Modes:**
- `INDEPENDENT`: Each agent optimizes local reward
- `COOPERATIVE`: All agents share team reward
- `CTDE`: Centralized training, decentralized execution
- `COMMUNICATING`: Agents exchange messages
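
The first two coordination modes differ only in which reward each agent receives, which can be sketched in plain Python. The sketch assumes the team reward is the sum of the local rewards; `MultiAgentEnvironment` may aggregate differently. The helper `assign_rewards` is hypothetical. `CTDE` and `COMMUNICATING` are left out because they typically change what information agents can access rather than how rewards are assigned.

```python
def assign_rewards(local_rewards, mode):
    """Map per-agent local rewards to the reward each agent is trained on."""
    if mode == "INDEPENDENT":
        # Each agent keeps its own local reward.
        return dict(local_rewards)
    if mode == "COOPERATIVE":
        # Every agent receives the same team reward (assumed here to be the sum).
        team = sum(local_rewards.values())
        return {agent: team for agent in local_rewards}
    raise ValueError(f"mode not covered by this sketch: {mode}")


local = {"separator": 1.0, "compressor": -0.25}
print(assign_rewards(local, "INDEPENDENT"))
print(assign_rewards(local, "COOPERATIVE"))
```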

In [None]:
# Create multi-agent environment
multi_env = SeparatorCompressorMultiAgentEnv()
multi_env.setCoordinationMode(MultiAgentEnvironment.CoordinationMode.COOPERATIVE)
multi_env.setMaxEpisodeSteps(100)

print("Multi-Agent Environment:")
print(f"  Number of agents: {multi_env.getNumAgents()}")
print(f"  Agent IDs: {list(multi_env.getAgentIds())}")

# Get agent details
for agent_id in multi_env.getAgentIds():
    agent = multi_env.getAgent(agent_id)
    print(f"\n  {agent_id}:")
    print(f"    Observation dim: {agent.getObservationDim()}")
    print(f"    Action dim: {agent.getActionDim()}")

In [None]:
# Run multi-agent episode
obs = multi_env.reset()

multi_history = {
    'sep_reward': [], 'comp_reward': [],
    'team_reward': []
}

step = 0
while not multi_env.isDone() and step < 100:
    # Simple policies for each agent
    sep_obs = np.array(obs.get("separator"))
    comp_obs = np.array(obs.get("compressor"))
    
    # Separator: P-control on level error (sep_obs[2])
    sep_action = -0.3 * sep_obs[2] if len(sep_obs) > 2 else 0.0
    sep_action = np.clip(sep_action, -0.1, 0.1)
    
    # Compressor: P-control on pressure error (comp_obs[6])
    comp_action = -0.2 * comp_obs[6] if len(comp_obs) > 6 else 0.0
    comp_action = np.clip(comp_action, -0.05, 0.05)
    
    # Create action dict
    actions = {
        "separator": [float(sep_action)],
        "compressor": [float(comp_action)]
    }
    
    # Step
    result = multi_env.step(actions)
    obs = result.observations
    
    # Record
    multi_history['sep_reward'].append(result.rewards.get("separator"))
    multi_history['comp_reward'].append(result.rewards.get("compressor"))
    multi_history['team_reward'].append(
        result.rewards.get("separator") + result.rewards.get("compressor")
    )
    step += 1

print(f"Multi-agent episode finished after {step} steps")
print(f"Total team reward: {sum(multi_history['team_reward']):.2f}")

In [None]:
# Plot multi-agent results
fig, ax = plt.subplots(figsize=(10, 5))

ax.plot(np.cumsum(multi_history['sep_reward']), 'b-', 
        label='Separator Agent', linewidth=2)
ax.plot(np.cumsum(multi_history['comp_reward']), 'r-', 
        label='Compressor Agent', linewidth=2)
ax.plot(np.cumsum(multi_history['team_reward']), 'g--', 
        label='Team Total', linewidth=2, alpha=0.7)

ax.set_xlabel('Step')
ax.set_ylabel('Cumulative Reward')
ax.set_title('Multi-Agent Cooperative Rewards')
ax.legend()
ax.grid(True, alpha=0.3)

plt.tight_layout()
plt.show()

## 7. Training Data Collection for Surrogates

The `TrainingDataCollector` generates datasets for training neural network surrogates of expensive thermodynamic calculations.

In [None]:
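# Access the Java class through the jneqsim gateway, as for the other NeqSim classes
ThermodynamicOperations = jneqsim.thermodynamicoperations.ThermodynamicOperations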

# Create collector for flash calculation surrogate
collector = TrainingDataCollector("flash_surrogate")
collector.defineInput("temperature", "K", 250.0, 350.0)
collector.defineInput("pressure", "bar", 10.0, 80.0)
collector.defineOutput("vapor_fraction", "mole_frac", 0.0, 1.0)
collector.defineOutput("gas_density", "kg/m3", 0.0, 100.0)

# Generate training data
fluid = SystemSrkEos(280.0, 50.0)
fluid.addComponent("methane", 0.85)
fluid.addComponent("ethane", 0.10)
fluid.addComponent("propane", 0.05)
fluid.setMixingRule("classic")

ops = ThermodynamicOperations(fluid)

# Sample grid
n_samples = 0
for T in np.linspace(250, 350, 15):
    for P in np.linspace(10, 80, 15):
        try:
            fluid.setTemperature(T, "K")
            fluid.setPressure(P, "bar")
            ops.TPflash()
            
            collector.startSample()
            collector.recordInput("temperature", T)
            collector.recordInput("pressure", P)
            
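            # Phase 0 is not guaranteed to be gas, so look the gas phase up by type
            if fluid.hasPhaseType("gas"):
                fluid.initProperties()  # refresh physical properties before reading density
                vapor_frac = fluid.getPhase("gas").getBeta()
                gas_density = fluid.getPhase("gas").getDensity("kg/m3")
            else:
                vapor_frac = 0.0
                gas_density = 0.0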
            
            collector.recordOutput("vapor_fraction", vapor_frac)
            collector.recordOutput("gas_density", gas_density)
            collector.endSample()
            n_samples += 1
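        except Exception as exc:
            # Skip points where the flash fails, but report them instead of hiding them
            print(f"Skipped T={T:.1f} K, P={P:.1f} bar: {exc}")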

print(f"Collected {n_samples} samples")
print(f"\nInput statistics:")
for name, stats in collector.getInputStatistics().items():
    print(f"  {name}: mean={stats['mean']:.2f}, std={stats['std']:.2f}")

In [None]:
# Export to CSV (can be used with PyTorch/TensorFlow)
csv_data = collector.toCSV()
print("CSV Preview (first 500 chars):")
print(csv_data[:500])

# Save to file
# collector.exportCSV("flash_training_data.csv")
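
To feed the exported CSV into an ML framework, it first has to be parsed into arrays. The sketch below uses NumPy only. The CSV text is a made-up placeholder, not NeqSim output, and it assumes a single header row whose names match the defined inputs and outputs; the real layout produced by `toCSV()` may differ.

```python
import io

import numpy as np

# Placeholder text standing in for str(collector.toCSV()).
# Header names, column order, and all numbers are assumptions for illustration.
csv_text = """temperature,pressure,vapor_fraction,gas_density
250.0,10.0,0.9,10.0
300.0,45.0,0.8,20.0
350.0,80.0,0.7,30.0
"""

# names=True reads the header row and returns a structured array keyed by column.
table = np.genfromtxt(io.StringIO(csv_text), delimiter=",", names=True)

X = np.column_stack([table["temperature"], table["pressure"]])
Y = np.column_stack([table["vapor_fraction"], table["gas_density"]])

print(X.shape, Y.shape)
```

`X` and `Y` can then be converted to tensors with the framework of choice, for example `torch.from_numpy(X)`.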

## 8. Integration with stable-baselines3

The template below wraps a NeqSim environment in the Gymnasium API so that it can be trained with stable-baselines3 or similar RL libraries.

### Gym Wrapper

In [None]:
import gymnasium as gym
from gymnasium import spaces

class NeqSimGymWrapper(gym.Env):
    """Gymnasium wrapper for NeqSim environments."""
    
    def __init__(self, java_env):
        super().__init__()
        self.java_env = java_env
        
        # Define spaces; cast the Java double[] bounds to float32 to match the declared dtype
        self.observation_space = spaces.Box(
            low=np.array(java_env.getObservationLow(), dtype=np.float32),
            high=np.array(java_env.getObservationHigh(), dtype=np.float32),
            dtype=np.float32
        )
        
        self.action_space = spaces.Box(
            low=np.array(java_env.getActionLow(), dtype=np.float32),
            high=np.array(java_env.getActionHigh(), dtype=np.float32),
            dtype=np.float32
        )
    
    def reset(self, seed=None, options=None):
        super().reset(seed=seed)
        result = self.java_env.reset()
        obs = np.array(result.observation, dtype=np.float32)
        info = dict(result.info)
        return obs, info
    
    def step(self, action):
        # Convert to plain Python floats so JPype can map the list to a Java double[]
        result = self.java_env.step(np.asarray(action, dtype=float).tolist())
        obs = np.array(result.observation, dtype=np.float32)
        reward = float(result.reward)
        terminated = bool(result.terminated)
        truncated = bool(result.truncated)
        info = dict(result.info)
        return obs, reward, terminated, truncated, info

# Create wrapped environment
java_env = SeparatorGymEnv()
java_env.setMaxEpisodeSteps(200)
wrapped_env = NeqSimGymWrapper(java_env)

print(f"Observation space: {wrapped_env.observation_space}")
print(f"Action space: {wrapped_env.action_space}")

In [None]:
# Example: Training with stable-baselines3 (uncomment to run)
# from stable_baselines3 import PPO
# from stable_baselines3.common.env_checker import check_env

# # Validate environment
# check_env(wrapped_env)

# # Train PPO agent
# model = PPO("MlpPolicy", wrapped_env, verbose=1)
# model.learn(total_timesteps=10000)

# # Evaluate
# obs, _ = wrapped_env.reset()
# for _ in range(200):
#     action, _ = model.predict(obs, deterministic=True)
#     obs, reward, terminated, truncated, info = wrapped_env.step(action)
#     if terminated or truncated:
#         break

print("stable-baselines3 integration ready!")
print("Uncomment the code above to train a PPO agent.")

## 9. Java-Only Testing (No Python ML Required)

NeqSim includes simple controllers that can be used to test the RL infrastructure directly from Java without requiring Python or ML libraries.

### Available Controllers:

| Controller | Description | Use Case |
|------------|-------------|----------|
| `ProportionalController` | P-control: action = -Kp × error | Simple setpoint tracking |
| `PIDController` | PID with anti-windup | Industrial control baseline |
| `BangBangController` | On-off with hysteresis | Simple threshold control |
| `RandomController` | Uniform random actions | Baseline comparison |
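
The "anti-windup" and "hysteresis" entries in the table can be illustrated with a plain-Python sketch. It follows the sign convention `action = -Kp × error` from the table and assumes conditional integration as the anti-windup scheme. The Java controllers may use a different scheme, and the class names `PIDSketch` and `BangBangSketch` are hypothetical.

```python
class PIDSketch:
    """PID controller with output clamping and conditional-integration anti-windup."""

    def __init__(self, kp, ki, kd, u_min, u_max, dt):
        self.kp, self.ki, self.kd = kp, ki, kd
        self.u_min, self.u_max, self.dt = u_min, u_max, dt
        self.integral = 0.0
        self.prev_error = None

    def act(self, error):
        derivative = 0.0 if self.prev_error is None else (error - self.prev_error) / self.dt
        candidate = self.integral + error * self.dt
        u = -(self.kp * error + self.ki * candidate + self.kd * derivative)
        clipped = min(self.u_max, max(self.u_min, u))
        # Anti-windup: keep the integral frozen while the output is saturated.
        if clipped == u:
            self.integral = candidate
        self.prev_error = error
        return clipped


class BangBangSketch:
    """On-off controller that switches only when the error leaves the hysteresis band."""

    def __init__(self, band, u_low, u_high):
        self.band, self.u_low, self.u_high = band, u_low, u_high
        self.output = 0.0  # no action until the error first leaves the band

    def act(self, error):
        if error > self.band:
            self.output = self.u_low
        elif error < -self.band:
            self.output = self.u_high
        # Inside the band the previous output is held, which avoids chattering.
        return self.output


pid = PIDSketch(kp=0.3, ki=0.1, kd=0.05, u_min=-0.1, u_max=0.1, dt=1.0)
print(pid.act(1.0))   # a large error saturates the output at the lower bound
print(pid.integral)   # the integral was not accumulated during saturation
```

Freezing the integral during saturation prevents it from growing while the actuator cannot respond, which is the failure mode anti-windup is meant to avoid.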

### Java Usage Example:

```java
import java.util.List;
import neqsim.process.ml.EpisodeRunner;
import neqsim.process.ml.controllers.*;
import neqsim.process.ml.examples.SeparatorGymEnv;

// Create environment
SeparatorGymEnv env = new SeparatorGymEnv();
env.setMaxEpisodeSteps(500);

// Create controllers
Controller pController = new ProportionalController("P-Level", 6, 0.5, -0.1, 0.1);
Controller pidController = new PIDController("PID-Level", 6, 0.3, 0.1, 0.05, -0.1, 0.1, 1.0);

// Run and compare
EpisodeRunner runner = new EpisodeRunner(env).setVerbose(true);

List<Controller> controllers = List.of(pController, pidController, 
    new RandomController("Random", -0.1, 0.1));

List<BenchmarkResult> results = runner.compareControllers(controllers, 10, 500);
EpisodeRunner.printComparison(results);
```

In [None]:
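# Java controllers can also be accessed from Python through the jneqsim gateway
ProportionalController = jneqsim.process.ml.controllers.ProportionalController
PIDController = jneqsim.process.ml.controllers.PIDController
RandomController = jneqsim.process.ml.controllers.RandomController
EpisodeRunner = jneqsim.process.ml.EpisodeRunner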

# Create environment and controllers
env = SeparatorGymEnv()
env.setMaxEpisodeSteps(100)

# P-Controller: index 6 is level_error, Kp=0.5, action bounds [-0.1, 0.1]
p_controller = ProportionalController("P-Level", 6, 0.5, -0.1, 0.1)

# PID Controller: Kp=0.3, Ki=0.1, Kd=0.05, dt=1.0
pid_controller = PIDController("PID-Level", 6, 0.3, 0.1, 0.05, -0.1, 0.1, 1.0)

# Random baseline
random_controller = RandomController("Random", -0.1, 0.1)

# Create runner
runner = EpisodeRunner(env)

# Run single episode with P controller
result = runner.runEpisode(p_controller, 100)
print(f"P-Controller Episode:")
print(f"  Steps: {result.steps}")
print(f"  Total reward: {result.totalReward:.2f}")
print(f"  Mean reward: {result.getMeanReward():.3f}")
print(f"  Terminated: {result.terminated}")

In [None]:
# Benchmark multiple controllers
import jpype.imports  # enables `from java... import` style imports
from java.util import ArrayList

controllers = ArrayList()
controllers.add(p_controller)
controllers.add(pid_controller)
controllers.add(random_controller)

# Run 5 episodes each, max 100 steps
benchmark_results = runner.compareControllers(controllers, 5, 100)

# Print comparison table
EpisodeRunner.printComparison(benchmark_results)

# Access individual results
print("\nDetailed Results:")
for result in benchmark_results:
    print(f"  {result.controllerName}: mean={result.meanReward:.2f}, "
          f"std={result.stdReward:.2f}, success={result.successRate*100:.0f}%")

In [None]:
# Extract and plot trajectory from episode result
result = runner.runEpisode(pid_controller, 100)

# Get level trajectory (feature index 0)
level_trajectory = np.array(result.getFeatureTrajectory(0))

fig, axes = plt.subplots(1, 2, figsize=(12, 4))

axes[0].plot(level_trajectory, 'b-', linewidth=2)
axes[0].axhline(y=0.5, color='r', linestyle='--', label='Setpoint')
axes[0].set_xlabel('Step')
axes[0].set_ylabel('Liquid Level')
axes[0].set_title(f'PID Controller Level Control (Reward: {result.totalReward:.1f})')
axes[0].legend()
axes[0].grid(True, alpha=0.3)

# Plot rewards over time
rewards = [result.rewards.get(i) for i in range(int(result.steps))]
axes[1].plot(np.cumsum(rewards), 'g-', linewidth=2)
axes[1].set_xlabel('Step')
axes[1].set_ylabel('Cumulative Reward')
axes[1].set_title('Reward Accumulation')
axes[1].grid(True, alpha=0.3)

plt.tight_layout()
plt.show()

## 10. Summary

### Key Classes

| Class | Purpose |
|-------|---------|
| `StateVector` | Normalized state representation with bounds |
| `ActionVector` | Bounded action representation |
| `ConstraintManager` | Safety/operational constraint handling |
| `GymEnvironment` | Gymnasium-compatible base class |
| `MultiAgentEnvironment` | Multi-agent coordination |
| `TrainingDataCollector` | Surrogate model data generation |
| `EpisodeRunner` | Java-based episode execution and benchmarking |

### Java Controllers (No Python Required)

| Controller | Formula | Use Case |
|------------|---------|----------|
| `ProportionalController` | $u = -K_p \cdot e$ | Simple setpoint tracking |
| `PIDController` | $u = -K_p e - K_i \int e \, dt - K_d \dot{e}$ | Industrial baseline |
| `BangBangController` | On/off with deadband | Threshold control |
| `RandomController` | $u \sim \text{Uniform}(a_{min}, a_{max})$ | Baseline comparison |

### Equipment with StateVectorProvider

- `Separator` - level, pressure, densities, flows
- `Compressor` - pressures, temperatures, efficiency, surge margin
- `HeatExchanger` - temperatures, duty, effectiveness

### Testing Strategy

1. **Unit test with Java controllers** - Fast, no Python deps
2. **Benchmark baselines** - P, PID, random comparison
3. **Train RL in Python** - Use stable-baselines3 or RLlib
4. **Compare RL vs baselines** - Measure improvement

### Next Steps

1. **Train RL agents** using stable-baselines3 or RLlib
2. **Build surrogates** with collected training data
3. **Extend to dynamic simulation** for time-stepping control
4. **Add more equipment agents** (valves, pumps, columns)

In [None]:
print("NeqSim ML Integration Tutorial Complete!")
print("\nFor more information, see:")
print("  - docs/ml_integration.md")
print("  - neqsim.process.ml package JavaDoc")
print("  - https://github.com/equinor/neqsim")