In [1]:
!pip install gym



In [2]:
import gym
from gym import spaces
import numpy as np

In [3]:
class CompressorEnv(gym.Env):
  def __init__(self):
    super(CompressorEnv, self).__init__()
    # (State Space):[Q_in, P_in, T_in, R_C, N]
    self.observation_space = spaces.Box(low=np.array([0, 1, 273, 1, 500]),
                                        high=np.array([100, 10, 373, 5, 2000]),
                                        dtype=np.float32)
    # (Action Space) = delta
    self.action_space = spaces.Box(low=np.array([-10, -1, - 0.1, -50]),
                                   high=np.array([10, 1, 0.1, 50]),
                                   dtype=np.float32)
    self.state = np.array([50.0, 1.0, 300.0, 3.0, 1000.0]) # [ Q_IN, P_IN, T_IN, R_C, N]
    self.gamma = 1.4
    self.cp = 1000.0

  def reset(self):
    self.state = np.array([50.0, 1.0, 300.0, 3.0, 1000.0])
    return self.state

  def step(self, action):
    Q_in, P_in,  T_in, R_C, N = self.state
    delta_Q_in, delta_P_in, delta_R_C, delta_N = action

    Q_in += delta_Q_in
    P_in += delta_P_in
    R_C += delta_R_C
    N += delta_N

    Q_in = np.clip(Q_in, 0, 100)
    P_in = np.clip(P_in, 1, 10)
    R_C = np.clip(R_C, 1, 5)
    N = np.clip(N, 500, 2000)


    P_out = P_in * R_C
    T_out = T_in * (R_C ** (self.gamma - 1) / self.gamma)
    energy_consumption = Q_in * self.cp * (T_out - T_in)
    efficiency = (P_out - P_in) / energy_consumption if energy_consumption > 0 else 0

    self.state = np.array([Q_in, P_in,  T_in, R_C, N])

    reward = efficiency - (energy_consumption / 1e6) - abs(T_out - 350)

    done = False

    if efficiency < 0.1 or energy_consumption > 1e6:
      done = True

    return self.state, reward, done, {}

In [4]:
!pip install shimmy



In [5]:
!pip install stable_baselines3



In [6]:
from stable_baselines3 import PPO
from stable_baselines3.common.env_checker import check_env

# ایجاد محیط
env = CompressorEnv()

# بررسی صحت محیط
# check_env(env)

# ایجاد مدل PPO
model = PPO("MlpPolicy", env, verbose=1)

# آموزش مدل
model.learn(total_timesteps=100000)

# ذخیره مدل
model.save("compressor_optimization_model")

  logger.warn(f"Box bound precision lowered by casting to {self.dtype}")


Using cuda device
Wrapping the env with a `Monitor` wrapper
Wrapping the env in a DummyVecEnv.




---------------------------------
| rollout/           |          |
|    ep_len_mean     | 1        |
|    ep_rew_mean     | -19.2    |
| time/              |          |
|    fps             | 782      |
|    iterations      | 1        |
|    time_elapsed    | 2        |
|    total_timesteps | 2048     |
---------------------------------
----------------------------------------
| rollout/                |            |
|    ep_len_mean          | 1          |
|    ep_rew_mean          | -17.5      |
| time/                   |            |
|    fps                  | 545        |
|    iterations           | 2          |
|    time_elapsed         | 7          |
|    total_timesteps      | 4096       |
| train/                  |            |
|    approx_kl            | 0.17259258 |
|    clip_fraction        | 0.656      |
|    clip_range           | 0.2        |
|    entropy_loss         | -5.61      |
|    explained_variance   | -2.38e-07  |
|    learning_rate        | 0.0003     |
|   

In [17]:
import time

# بارگذاری مدل
model = PPO.load("compressor_optimization_model")

# تست مدل
obs = env.reset()
for i in range(1000):
    action, _states = model.predict(obs)
    obs, reward, done, info = env.step(action)
    print(f"Step {i}: State={obs}, '\t'Reward={reward}")
    if done:
        obs = env.reset()
    time.sleep(0.1)

Step 0: State=[  43.8838048     1.33652824  300.            3.1        1000.2861152 ], '	'Reward=-14.691951453277335
Step 1: State=[  43.87018442    2.          300.            3.1        1000.08182021], '	'Reward=-14.691447611075848
Step 2: State=[ 44.01420784   1.60686284 300.           3.1        999.70155483], '	'Reward=-14.696766712855284
Step 3: State=[4.39248829e+01 1.00000000e+00 3.00000000e+02 3.10000000e+00
 1.00124758e+03], '	'Reward=-14.693468847569495
Step 4: State=[  43.89135265    1.15047136  300.            3.1        1000.87116963], '	'Reward=-14.692230426478885
Step 5: State=[ 43.97965574   1.69857353 300.           3.1        999.57766029], '	'Reward=-14.695490631543233
Step 6: State=[4.39504676e+01 1.00000000e+00 3.00000000e+02 3.10000000e+00
 1.00087406e+03], '	'Reward=-14.69441365577383
Step 7: State=[ 43.8990159    1.07957639 300.           3.1        999.34122252], '	'Reward=-14.692513511783272
Step 8: State=[ 43.92398024   1.         300.           3.1        9

In [19]:
import numpy as np
import gym
from gym import spaces

class CompressorEnv(gym.Env):
    def __init__(self):
        super(CompressorEnv, self).__init__()

        # فضای حالت (State Space): [Q_in, P_in, T_in, R_c, N]
        self.observation_space = spaces.Box(low=np.array([0, 1, 273, 1, 500]),
                                            high=np.array([100, 10, 373, 5, 2000]),
                                            dtype=np.float32)

        # فضای عمل (Action Space): [ΔQ_in, ΔP_in, ΔR_c, ΔN]
        self.action_space = spaces.Box(low=np.array([-10, -1, -0.1, -50]),
                                       high=np.array([10, 1, 0.1, 50]),
                                       dtype=np.float32)

        # پارامترهای اولیه
        self.state = np.array([50.0, 1.0, 300.0, 3.0, 1000.0]) # [Q_in, P_in, T_in, R_c, N]
        self.gamma = 1.4 # نسبت ظرفیت‌های خاص گاز
        self.cp = 1000.0 # گرمای مخصوص ثابت فشار (J/kg.K)

    def reset(self):
        # بازنشانی حالت به حالت اولیه
        self.state = np.array([50.0, 1.0, 300.0, 3.0, 1000.0])
        return self.state

    def step(self, action):
        # اعمال عمل به حالت فعلی
        Q_in, P_in, T_in, R_c, N = self.state
        delta_Q_in, delta_P_in, delta_R_c, delta_N = action

        # بروزرسانی پارامترها
        Q_in += delta_Q_in
        P_in += delta_P_in
        R_c += delta_R_c
        N += delta_N

        # محدود کردن مقادیر در بازه مجاز
        Q_in = np.clip(Q_in, 0, 100)
        P_in = np.clip(P_in, 1, 10)
        R_c = np.clip(R_c, 1, 5)
        N = np.clip(N, 500, 2000)

        # محاسبه خروجی‌ها
        P_out = P_in * R_c
        T_out = T_in * (R_c ** ((self.gamma - 1) / self.gamma))
        energy_consumption = Q_in * self.cp * (T_out - T_in)
        efficiency = (P_out - P_in) / energy_consumption if energy_consumption > 0 else 0

        # به روز رسانی حالت
        self.state = np.array([Q_in, P_in, T_in, R_c, N])

        # تعریف تابع جایزه
        reward = efficiency - (energy_consumption / 1e6) - abs(T_out - 350) # بهینه‌سازی کارایی و دما

        # تشخیص پایان اپیزود
        done = False
        if efficiency < 0.1 or energy_consumption > 1e6:
            done = True

        info = {}  # اضافه کردن info برای رفع خطا
        return self.state, reward, done, info

# Genetic Algorithm Implementation
def genetic_algorithm(env, population_size=20, generations=50, mutation_rate=0.1):
    # Define the bounds for actions
    action_low = env.action_space.low
    action_high = env.action_space.high

    # Initialize population
    population = np.random.uniform(action_low, action_high, (population_size, len(action_low)))

    for generation in range(generations):
        # Evaluate fitness of each individual
        fitness_scores = []
        for individual in population:
            obs = env.reset()
            total_reward = 0
            done = False

            while not done:
                obs, reward, done, _ = env.step(individual)
                total_reward += reward

            fitness_scores.append(total_reward)

        # Print the best fitness score in this generation
        best_fitness = max(fitness_scores)
        print(f"Generation {generation}: Best Fitness = {best_fitness}")

        # Select parents based on fitness scores
        probabilities = np.array(fitness_scores) / sum(fitness_scores)
        selected_indices = np.random.choice(range(population_size), size=population_size, p=probabilities)
        parents = population[selected_indices]

        # Crossover
        offspring = []
        for i in range(0, population_size, 2):
            parent1, parent2 = parents[i], parents[i + 1]
            crossover_point = np.random.randint(1, len(parent1))
            child1 = np.concatenate((parent1[:crossover_point], parent2[crossover_point:]))
            child2 = np.concatenate((parent2[:crossover_point], parent1[crossover_point:]))
            offspring.extend([child1, child2])

        # Mutation
        for individual in offspring:
            if np.random.rand() < mutation_rate:
                mutation_index = np.random.randint(len(individual))
                individual[mutation_index] = np.random.uniform(action_low[mutation_index], action_high[mutation_index])

        # Replace population with offspring
        population = np.array(offspring)

    # Return the best individual
    best_index = np.argmax(fitness_scores)
    return population[best_index]


# Run Genetic Algorithm
env = CompressorEnv()
best_action = genetic_algorithm(env, population_size=20, generations=50, mutation_rate=0.1)

# Print the result in a user-friendly format
print(f"ΔQ_in = {best_action[0]:.4f}")
print("این مقدار نشان‌دهنده تغییر در نرخ جریان ورودی (Q_in) است.")
print(f"به این معنی که بهترین عمل پیشنهاد می‌کند نرخ جریان ورودی را حدوداً {abs(best_action[0]):.2f} واحد {'افزایش' if best_action[0] > 0 else 'کاهش'} دهید.\n")

print(f"ΔP_in = {best_action[1]:.4f}")
print("این مقدار نشان‌دهنده تغییر در فشار ورودی (P_in) است.")
print(f"به این معنی که فشار ورودی را باید حدوداً {abs(best_action[1]):.2f} واحد {'افزایش' if best_action[1] > 0 else 'کاهش'} دهید.\n")

print(f"ΔR_c = {best_action[2]:.4f}")
print("این مقدار نشان‌دهنده تغییر در نسبت فشار فشرده‌ساز (R_c) است.")
print(f"به این معنی که نسبت فشار را باید حدوداً {abs(best_action[2]):.2f} واحد {'افزایش' if best_action[2] > 0 else 'کاهش'} دهید.\n")

print(f"ΔN = {best_action[3]:.4f}")
print("این مقدار نشان‌دهنده تغییر در سرعت چرخش فشرده‌ساز (N) است.")
print(f"به این معنی که سرعت چرخش را باید حدوداً {abs(best_action[3]):.2f} واحد {'افزایش' if best_action[3] > 0 else 'کاهش'} دهید.")

Generation 0: Best Fitness = -64.01060523782355
Generation 1: Best Fitness = -62.764902162205736
Generation 2: Best Fitness = -63.838711036860154
Generation 3: Best Fitness = -63.838711036860154
Generation 4: Best Fitness = -63.83871086391697
Generation 5: Best Fitness = -63.83871086391697
Generation 6: Best Fitness = -63.08876327151208
Generation 7: Best Fitness = -64.010605273152
Generation 8: Best Fitness = -63.624784123698554
Generation 9: Best Fitness = -64.01060544076525
Generation 10: Best Fitness = -63.624784123698554
Generation 11: Best Fitness = -64.01060544076525
Generation 12: Best Fitness = -65.48673983555051
Generation 13: Best Fitness = -65.80111634347097
Generation 14: Best Fitness = -65.52695320828252
Generation 15: Best Fitness = -65.52695320828252
Generation 16: Best Fitness = -65.8011161827452
Generation 17: Best Fitness = -65.52695320828252
Generation 18: Best Fitness = -62.72134683964714
Generation 19: Best Fitness = -62.72134683964714
Generation 20: Best Fitness 

In [20]:
class CompressorSimulator:
    def __init__(self, env, model):
        """
        Initialize the simulator.
        :param env: The compressor environment (CompressorEnv).
        :param model: A pre-trained model (e.g., genetic algorithm or RL model).
        """
        self.env = env
        self.model = model

    def generate_data(self, num_samples=1000000):
        """
        Generate sensor data for the compressor system.
        :param num_samples: Number of data points to generate.
        :return: List of states and corresponding actions.
        """
        states = []
        actions = []

        obs = self.env.reset()
        for _ in range(num_samples):
            # Predict action using the model
            action, _ = self.model.predict(obs)

            # Store the current state and predicted action
            states.append(obs)
            actions.append(action)

            # Step the environment
            obs, _, done, _ = self.env.step(action)
            if done:
                obs = self.env.reset()

        return np.array(states), np.array(actions)

    def simulate_real_time(self, num_steps=100):
        """
        Simulate the compressor system in real-time and display outputs.
        :param num_steps: Number of steps to simulate.
        """
        obs = self.env.reset()
        for step in range(num_steps):
            # Predict action using the model
            action, _ = self.model.predict(obs)

            # Step the environment
            obs, reward, done, _ = self.env.step(action)

            # Display the results
            print(f"Step {step + 1}:")
            print(f"  State: {obs}")
            print(f"  Predicted Action: {action}")
            print(f"  Reward: {reward:.4f}\n")

            if done:
                obs = self.env.reset()


# Example Usage
if __name__ == "__main__":
    # Create the environment
    env = CompressorEnv()

    # Load your pre-trained model (replace this with your actual model)
    from stable_baselines3 import PPO
    model = PPO.load("compressor_optimization_model")

    # Create the simulator
    simulator = CompressorSimulator(env, model)

    # Generate 1,000,000 data points
    print("Generating 1,000,000 data points...")
    states, actions = simulator.generate_data(num_samples=1000000)
    print("Data generation complete.")

    # Simulate in real-time for 100 steps
    print("\nSimulating in real-time for 100 steps:")
    simulator.simulate_real_time(num_steps=100)

Generating 1,000,000 data points...
Data generation complete.

Simulating in real-time for 100 steps:
Step 1:
  State: [ 43.96978712   1.20782398 300.           3.1        999.04325634]
  Predicted Action: [-6.030213    0.20782398  0.1        -0.95674366]
  Reward: -69.5204

Step 2:
  State: [  43.92019701    1.32110226  300.            3.1        1000.02205462]
  Predicted Action: [-6.079803    0.32110226  0.1         0.02205462]
  Reward: -69.5147

Step 3:
  State: [ 43.92714739   1.         300.           3.1        999.09252083]
  Predicted Action: [-6.0728526  -1.          0.1        -0.90747917]
  Reward: -69.5155

Step 4:
  State: [4.39456983e+01 1.00000000e+00 3.00000000e+02 3.10000000e+00
 1.00093182e+03]
  Predicted Action: [-6.0543017  -0.10602164  0.1         0.9318199 ]
  Reward: -69.5176

Step 5:
  State: [ 43.85265446   1.         300.           3.1        999.59847495]
  Predicted Action: [-6.1473455  -0.12315427  0.1        -0.40152505]
  Reward: -69.5070

Step 6:
  St

In [21]:
class CompressorSimulator:
    def __init__(self, env, model):
        """
        Initialize the simulator.
        :param env: The compressor environment (CompressorEnv).
        :param model: A pre-trained model (e.g., genetic algorithm or RL model).
        """
        self.env = env
        self.model = model

    def generate_data(self, num_samples=1000000):
        """
        Generate sensor data for the compressor system.
        :param num_samples: Number of data points to generate.
        :return: List of states and corresponding actions.
        """
        states = []
        actions = []
        obs = self.env.reset()
        for _ in range(num_samples):
            # Predict action using the model
            action, _ = self.model.predict(obs)

            # Store the current state and predicted action
            states.append(obs)
            actions.append(action)

            # Step the environment
            obs, _, done, _ = self.env.step(action)
            if done:
                obs = self.env.reset()
        return np.array(states), np.array(actions)

    def simulate_real_time(self, num_steps=100):
        """
        Simulate the compressor system in real-time and display outputs.
        :param num_steps: Number of steps to simulate.
        """
        obs = self.env.reset()
        for step in range(num_steps):
            # Predict action using the model
            action, _ = self.model.predict(obs)

            # Step the environment
            obs, reward, done, _ = self.env.step(action)

            # Display the results in a user-friendly format
            print(f"Step {step + 1}:")
            print("وضعیت فعلی سیستم:")
            print(f"  Q_in = {obs[0]:.2f} : نرخ جریان ورودی.")
            print(f"  P_in = {obs[1]:.2f} : فشار ورودی.")
            print(f"  T_in = {obs[2]:.2f} : دمای ورودی.")
            print(f"  R_c = {obs[3]:.2f} : نسبت فشار فشرده‌ساز.")
            print(f"  N = {obs[4]:.2f} : سرعت چرخش فشرده‌ساز.")

            print("\nعمل پیشنهادی:")
            print(f"  ΔQ_in = {action[0]:+.2f} : نرخ جریان ورودی را حدوداً {abs(action[0]):.2f} واحد {'افزایش' if action[0] > 0 else 'کاهش'} دهید.")
            print(f"  ΔP_in = {action[1]:+.2f} : فشار ورودی را حدوداً {abs(action[1]):.2f} واحد {'افزایش' if action[1] > 0 else 'کاهش'} دهید.")
            print(f"  ΔR_c = {action[2]:+.2f} : نسبت فشار را حدوداً {abs(action[2]):.2f} واحد {'افزایش' if action[2] > 0 else 'کاهش'} دهید.")
            print(f"  ΔN = {action[3]:+.2f} : سرعت چرخش را حدوداً {abs(action[3]):.2f} واحد {'افزایش' if action[3] > 0 else 'کاهش'} دهید.")

            print(f"\nپاداش (Reward): {reward:.4f}")
            print("-" * 50)

            if done:
                obs = self.env.reset()


# Example Usage
if __name__ == "__main__":
    # Create the environment
    env = CompressorEnv()

    # Load your pre-trained model (replace this with your actual model)
    from stable_baselines3 import PPO
    model = PPO.load("compressor_optimization_model")

    # Create the simulator
    simulator = CompressorSimulator(env, model)

    # Generate 1,000,000 data points
    print("Generating 1,000,000 data points...")
    states, actions = simulator.generate_data(num_samples=1000000)
    print("Data generation complete.")

    # Simulate in real-time for 100 steps
    print("\nSimulating in real-time for 100 steps:")
    simulator.simulate_real_time(num_steps=100)

Generating 1,000,000 data points...
Data generation complete.

Simulating in real-time for 100 steps:
Step 1:
وضعیت فعلی سیستم:
  Q_in = 43.96 : نرخ جریان ورودی.
  P_in = 1.00 : فشار ورودی.
  T_in = 300.00 : دمای ورودی.
  R_c = 3.10 : نسبت فشار فشرده‌ساز.
  N = 1000.40 : سرعت چرخش فشرده‌ساز.

عمل پیشنهادی:
  ΔQ_in = -6.04 : نرخ جریان ورودی را حدوداً 6.04 واحد کاهش دهید.
  ΔP_in = -1.00 : فشار ورودی را حدوداً 1.00 واحد کاهش دهید.
  ΔR_c = +0.10 : نسبت فشار را حدوداً 0.10 واحد افزایش دهید.
  ΔN = +0.40 : سرعت چرخش را حدوداً 0.40 واحد افزایش دهید.

پاداش (Reward): -69.5190
--------------------------------------------------
Step 2:
وضعیت فعلی سیستم:
  Q_in = 43.93 : نرخ جریان ورودی.
  P_in = 1.81 : فشار ورودی.
  T_in = 300.00 : دمای ورودی.
  R_c = 3.10 : نسبت فشار فشرده‌ساز.
  N = 1000.64 : سرعت چرخش فشرده‌ساز.

عمل پیشنهادی:
  ΔQ_in = -6.07 : نرخ جریان ورودی را حدوداً 6.07 واحد کاهش دهید.
  ΔP_in = +0.81 : فشار ورودی را حدوداً 0.81 واحد افزایش دهید.
  ΔR_c = +0.10 : نسبت فشار را حدوداً 0.

In [22]:
!pip install serial

  and should_run_async(code)


Collecting serial
  Downloading serial-0.0.97-py2.py3-none-any.whl.metadata (889 bytes)
Collecting iso8601>=0.1.12 (from serial)
  Downloading iso8601-2.1.0-py3-none-any.whl.metadata (3.7 kB)
Downloading serial-0.0.97-py2.py3-none-any.whl (40 kB)
[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m40.9/40.9 kB[0m [31m2.4 MB/s[0m eta [36m0:00:00[0m
[?25hDownloading iso8601-2.1.0-py3-none-any.whl (7.5 kB)
Installing collected packages: iso8601, serial
Successfully installed iso8601-2.1.0 serial-0.0.97


In [None]:
import numpy as np
import time
import random  # For simulating sensor data

class CompressorEnv:
    def __init__(self):
        # فضای حالت (State Space): [Q_in, P_in, T_in, R_c, N]
        self.observation_space = {
            "low": np.array([0, 1, 273, 1, 500]),
            "high": np.array([100, 10, 373, 5, 2000])
        }
        # فضای عمل (Action Space): [ΔQ_in, ΔP_in, ΔR_c, ΔN]
        self.action_space = {
            "low": np.array([-10, -1, -0.1, -50]),
            "high": np.array([10, 1, 0.1, 50])
        }
        # پارامترهای اولیه
        self.state = np.array([50.0, 1.0, 300.0, 3.0, 1000.0])  # [Q_in, P_in, T_in, R_c, N]
        self.gamma = 1.4  # نسبت ظرفیت‌های خاص گاز
        self.cp = 1000.0  # گرمای مخصوص ثابت فشار (J/kg.K)

    def reset(self):
        self.state = np.array([50.0, 1.0, 300.0, 3.0, 1000.0])
        return self.state

    def step(self, action):
        Q_in, P_in, T_in, R_c, N = self.state
        delta_Q_in, delta_P_in, delta_R_c, delta_N = action

        # بروزرسانی پارامترها
        Q_in += delta_Q_in
        P_in += delta_P_in
        R_c += delta_R_c
        N += delta_N

        # محدود کردن مقادیر در بازه مجاز
        Q_in = np.clip(Q_in, 0, 100)
        P_in = np.clip(P_in, 1, 10)
        R_c = np.clip(R_c, 1, 5)
        N = np.clip(N, 500, 2000)

        # محاسبه خروجی‌ها
        P_out = P_in * R_c
        T_out = T_in * (R_c ** ((self.gamma - 1) / self.gamma))
        energy_consumption = Q_in * self.cp * (T_out - T_in)
        efficiency = (P_out - P_in) / energy_consumption if energy_consumption > 0 else 0

        # به روز رسانی حالت
        self.state = np.array([Q_in, P_in, T_in, R_c, N])

        # تعریف تابع جایزه
        reward = efficiency - (energy_consumption / 1e6) - abs(T_out - 350)

        # تشخیص پایان اپیزود
        done = False
        if efficiency < 0.1 or energy_consumption > 1e6:
            done = True

        return self.state, reward, done


import socket

def get_sensor_data():
    HOST = '127.0.0.1'  # Replace with your sensor IP
    PORT = 65432        # Replace with your sensor port
    with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as s:
        s.connect((HOST, PORT))
        data = s.recv(1024).decode('utf-8').strip()
    sensor_values = list(map(float, data.split(',')))
    return np.array(sensor_values)



import serial

def get_sensor_data():
    ser = serial.Serial('COM3', 9600)  # Replace 'COM3' with your port
    line = ser.readline().decode('utf-8').strip()
    sensor_values = list(map(float, line.split(',')))
    ser.close()
    return np.array(sensor_values)


# Main Simulation Loop
def main():
    env = CompressorEnv()
    num_steps = 10  # Number of simulation steps

    for step in range(num_steps):
        print(f"\nStep {step + 1}:")

        # Fetch sensor data (current state)
        sensor_data = get_sensor_data()
        print("Sensor Data (Current State):")
        print(f"  Q_in = {sensor_data[0]:.2f}")
        print(f"  P_in = {sensor_data[1]:.2f}")
        print(f"  T_in = {sensor_data[2]:.2f}")
        print(f"  R_c = {sensor_data[3]:.2f}")
        print(f"  N = {sensor_data[4]:.2f}")

        # Set the sensor data as the current state
        env.state = sensor_data

        # Predict an action (replace this with your model's prediction logic)
        action = np.array([-5, -0.5, 0.05, 10])  # Example action

        # Step the environment
        next_state, reward, done = env.step(action)

        # Display results
        print("\nAction Taken:")
        print(f"  ΔQ_in = {action[0]:+.2f}")
        print(f"  ΔP_in = {action[1]:+.2f}")
        print(f"  ΔR_c = {action[2]:+.2f}")
        print(f"  ΔN = {action[3]:+.2f}")

        print("\nNext State:")
        print(f"  Q_in = {next_state[0]:.2f}")
        print(f"  P_in = {next_state[1]:.2f}")
        print(f"  T_in = {next_state[2]:.2f}")
        print(f"  R_c = {next_state[3]:.2f}")
        print(f"  N = {next_state[4]:.2f}")

        print(f"\nReward: {reward:.4f}")
        print(f"Episode Done: {done}")

        # Simulate a delay for real-time behavior
        time.sleep(1)


if __name__ == "__main__":
    main()

In [None]:
import numpy as np
import gym
from gym import spaces
from stable_baselines3 import PPO
from stable_baselines3.common.env_checker import check_env
from stable_baselines3.common.callbacks import EvalCallback

class CompressorEnv(gym.Env):
    def __init__(self):
        super(CompressorEnv, self).__init__()

        # فضای حالت (State Space): [Q_in, P_in, T_in, R_c, N]
        self.observation_space = spaces.Box(
            low=np.array([0, 1, 273, 1, 500]),
            high=np.array([100, 10, 373, 5, 2000]),
            dtype=np.float32
        )

        # فضای عمل (Action Space): [ΔQ_in, ΔP_in, ΔR_c, ΔN]
        self.action_space = spaces.Box(
            low=np.array([-10, -1, -0.1, -50]),
            high=np.array([10, 1, 0.1, 50]),
            dtype=np.float32
        )

        # پارامترهای اولیه
        self.state = np.array([50.0, 1.0, 300.0, 3.0, 1000.0])  # [Q_in, P_in, T_in, R_c, N]
        self.gamma = 1.4  # نسبت ظرفیت‌های خاص گاز
        self.cp = 1000.0  # گرمای مخصوص ثابت فشار (J/kg.K)

    def reset(self):
        # بازنشانی حالت به حالت اولیه
        self.state = np.array([50.0, 1.0, 300.0, 3.0, 1000.0])
        return self.normalize_state(self.state)

    def step(self, action):
        # اعمال عمل به حالت فعلی
        Q_in, P_in, T_in, R_c, N = self.denormalize_state(self.state)
        delta_Q_in, delta_P_in, delta_R_c, delta_N = action

        # بروزرسانی پارامترها
        Q_in += delta_Q_in
        P_in += delta_P_in
        R_c += delta_R_c
        N += delta_N

        # محدود کردن مقادیر در بازه مجاز
        Q_in = np.clip(Q_in, 0, 100)
        P_in = np.clip(P_in, 1, 10)
        R_c = np.clip(R_c, 1, 5)
        N = np.clip(N, 500, 2000)

        # محاسبه خروجی‌ها
        P_out = P_in * R_c
        T_out = T_in * (R_c ** ((self.gamma - 1) / self.gamma))
        energy_consumption = Q_in * self.cp * (T_out - T_in)
        efficiency = (P_out - P_in) / energy_consumption if energy_consumption > 0 else 0

        # به روز رسانی حالت
        self.state = np.array([Q_in, P_in, T_in, R_c, N])

        # تعریف تابع جایزه (بهبود شده)
        reward = (
            10 * efficiency  # وزن بالا برای کارایی
            - 0.0001 * energy_consumption  # جریمه برای مصرف انرژی
            - 0.1 * abs(T_out - 350)  # جریمه برای انحراف دما
        )

        # تشخیص پایان اپیزود
        done = False
        if efficiency < 0.1 or energy_consumption > 1e6:
            done = True

        info = {}  # اضافه کردن info برای رفع خطا
        return self.normalize_state(self.state), reward, done, info

    def normalize_state(self, state):
        """Normalize state values to [0, 1] range."""
        low = self.observation_space.low
        high = self.observation_space.high
        return (state - low) / (high - low)

    def denormalize_state(self, normalized_state):
        """Denormalize state values from [0, 1] range."""
        low = self.observation_space.low
        high = self.observation_space.high
        return normalized_state * (high - low) + low


# Train the Model
def train_model():
    # Create the environment
    env = CompressorEnv()

    # Check the environment
    # check_env(env)

    # Define the model (PPO with optimized hyperparameters)
    model = PPO(
        "MlpPolicy",
        env,
        learning_rate=3e-4,
        n_steps=2048,
        batch_size=64,
        n_epochs=10,
        gamma=0.99,
        gae_lambda=0.95,
        clip_range=0.2,
        ent_coef=0.01,
        verbose=1
    )

    # Define evaluation callback
    eval_callback = EvalCallback(
        env,
        best_model_save_path="./best_model/",
        log_path="./logs/",
        eval_freq=1000,
        deterministic=True,
        render=False
    )

    # Train the model
    model.learn(total_timesteps=500_000, callback=eval_callback)

    # Save the model
    model.save("compressor_optimization_model")


# Simulate Real-Time Performance
def simulate_real_time():
    # Load the trained model
    model = PPO.load("compressor_optimization_model")

    # Create the environment
    env = CompressorEnv()

    # Simulate in real-time
    obs = env.reset()
    for step in range(100):
        # Predict action
        action, _ = model.predict(obs)

        # Step the environment
        obs, reward, done, _ = env.step(action)

        # Display results
        print(f"Step {step + 1}:")
        print(f"  State: {env.denormalize_state(obs)}")
        print(f"  Action: {action}")
        print(f"  Reward: {reward:.4f}")

        if done:
            obs = env.reset()


if __name__ == "__main__":
    # Train the model
    print("Training the model...")
    train_model()

    # Simulate real-time performance
    print("\nSimulating real-time performance...")
    simulate_real_time()

Training the model...
Using cuda device
Wrapping the env with a `Monitor` wrapper
Wrapping the env in a DummyVecEnv.




Eval num_timesteps=1000, episode_reward=-181499.41 +/- 0.00
Episode length: 1.00 +/- 0.00
----------------------------------
| eval/              |           |
|    mean_ep_length  | 1         |
|    mean_reward     | -1.81e+05 |
| time/              |           |
|    total_timesteps | 1000      |
----------------------------------




[1;30;43mStreaming output truncated to the last 5000 lines.[0m
|    learning_rate        | 0.0003        |
|    loss                 | 1.62e+10      |
|    n_updates            | 2570          |
|    policy_gradient_loss | 0             |
|    std                  | 1.01          |
|    value_loss           | 3.24e+10      |
-------------------------------------------
Eval num_timesteps=528000, episode_reward=-181499.41 +/- 0.00
Episode length: 1.00 +/- 0.00
----------------------------------
| eval/              |           |
|    mean_ep_length  | 1         |
|    mean_reward     | -1.81e+05 |
| time/              |           |
|    total_timesteps | 528000    |
----------------------------------
----------------------------------
| rollout/           |           |
|    ep_len_mean     | 1         |
|    ep_rew_mean     | -1.81e+05 |
| time/              |           |
|    fps             | 500       |
|    iterations      | 258       |
|    time_elapsed    | 1056      |
|    total