# Production System Assignment

## Production System with PSP and RL agent optimization

In [15]:
from __future__ import annotations

import random
import statistics
from collections.abc import Callable, Sequence

import numpy as np
import simpy
from scipy import stats
from simpy.events import ProcessGenerator
from lib.server import Server
from lib.job import Job
from matplotlib import pyplot as plt
from lib.config import SEEDS
import gymnasium as gym
from gymnasium import spaces
from stable_baselines3 import DQN
from stable_baselines3.common.env_util import make_vec_env
from stable_baselines3.common.vec_env import VecNormalize
from stable_baselines3.common.running_mean_std import RunningMeanStd
import os

In [16]:
NUM_MACHINES = 6

In [13]:
class SystemEnv(gym.Env):
    def __init__(
        self,
        inter_arrival_time_distribution: Callable[[], float],
        processing_time_per_family_distribution: list[Callable[[], float]],
        families_distribution: Callable[[], float],
        due_dates_distribution: Callable[[], float],
        routing_distribution: dict[int, list[Callable[[], float]]],
        routing_prob: dict[int, list[float]],
        release_interval: float = 5.0,
        episode_duration: float = 60 * 24 * 7,
        reward_weights: dict[str, float] = None
    ) -> None:
        super().__init__()
        self.env: simpy.Environment | None = None
        self.inter_arrival_time_distribution = inter_arrival_time_distribution
        self.processing_time_per_family_distribution = processing_time_per_family_distribution
        self.families_distribution = families_distribution
        self.due_dates_distribution = due_dates_distribution
        self.routing_distribution = routing_distribution
        self.routing_prob = routing_prob
        self.release_interval = release_interval
        self.episode_duration = episode_duration

        self.running_throughput = RunningMeanStd()
        self.running_tardiness = RunningMeanStd()
        self.running_wip = RunningMeanStd()

        self.reward_weights = reward_weights if reward_weights is not None else {
            'throughput': 1.0,
            'wip_penalty': -1.0,
            'tardiness_penalty': -1.0
        }

        low_bounds = np.array([
            0.0,
            0.0,
            -5000.0,
            0.0,
            0.0,
            0.0,
            0.0,
            0.0,
            0.0,
            0.0
        ], dtype=np.float32)

        high_bounds = np.array([
            1.0,
            500.0,
            5000.0,
            1000.0,
            10000.0,
            10000.0,
            10000.0,
            10000.0,
            10000.0,
            10000.0
        ], dtype=np.float32)

        self.observation_space = spaces.Box(low=low_bounds, high=high_bounds, dtype=np.float32)

        # action space: 0 = Don't Release, 1 = Release
        self.action_space = spaces.Discrete(2)

        self.machines: list[Server] = []

        self.jobs: list[Job] = []

        self.pre_shop_pool: list[Job] = []

        self.idx_counter = 0

        self.jobs_completed_this_episode = 0
        self.total_tardiness_this_episode = 0.0
        self.current_wip = 0


    def _set_seed(self, seed: int | None = None):
        if seed is not None:
            random.seed(seed)
            np.random.seed(seed)

    def reset(self, seed: int | None = None, options: dict | None = None) -> tuple[np.ndarray, dict]:
        super().reset(seed=seed)
        self._set_seed(seed)

        self.env = simpy.Environment()
        self.machines = [Server(self.env, 1, f"WC{i+1}") for i in range(6)]
        self.pre_shop_pool = []
        self.jobs = []

        self.idx_counter = 0

        self.jobs_completed_this_episode = 0
        self.total_tardiness_this_episode = 0.0
        self.current_wip = 0

        self.env.process(self._run_job_arrivals())

        observation = self._get_obs()
        info = self._get_info()

        return observation, info

    def step(self, action: int) -> tuple[np.ndarray, float, bool, bool, dict]:
        assert self.action_space.contains(action), f"Non valid action: {action}"

        jobs_completed_before_step = self.jobs_completed_this_episode
        tardiness_before_step = self.total_tardiness_this_episode
        wip_at_start_of_interval = self.current_wip

        if action == 1:
            self._release_job_from_psp()

        self.env.run(until=self.env.now + self.release_interval)

        jobs_completed_in_interval = self.jobs_completed_this_episode - jobs_completed_before_step
        tardiness_in_interval = self.total_tardiness_this_episode - tardiness_before_step

        # self.current_wip = len([job for job in self.jobs if job.in_system and not job.done])
        if self.current_wip != 0:
            print(f"Current wip: {self.current_wip}")

        reward = self._calculate_reward(
            jobs_completed_in_interval,
            tardiness_in_interval,
            wip_at_start_of_interval
        )

        terminated = self.env.now >= self.episode_duration
        truncated = False

        observation = self._get_obs()
        info = self._get_info()

        return observation, reward, terminated, truncated, info

    def _get_obs(self) -> np.ndarray:
        presence_job_top = 1.0 if self.pre_shop_pool else 0.0

        job_top_proc_time = 0.0
        job_top_urgency = 0.0
        if presence_job_top == 1.0:
            job_top = self.pre_shop_pool[0]
            for m in job_top.routing:
                job_top_proc_time += job_top.process_time
            job_top_urgency = job_top.due_date - self.env.now

        raw_wip = self.current_wip

        machine_workloads = []
        for machine in self.machines:
            current_machine_workload = 0.0

            if machine.job_on_machine is not None:
                time_spent_on_op = self.env.now - machine.job_start_time
                remaining_op_time = machine.job_on_machine.process_time - time_spent_on_op
                current_machine_workload += max(0, remaining_op_time)

            for request in machine.queue:
                job_in_queue = request.associated_job
                current_machine_workload += job_in_queue.process_time

            machine_workloads.append(current_machine_workload)

        return np.array([
            presence_job_top,
            job_top_proc_time,
            job_top_urgency,
            float(raw_wip),
            *machine_workloads
        ], dtype=np.float32)

    def _run_job_arrivals(self) -> ProcessGenerator:
        while True:
            timeout_inter_arrival = self.inter_arrival_time_distribution()

            weight = self.families_distribution()
            if weight <= 0.1:
                family = 1
            elif weight <= 0.62:
                family = 2
            else:
                family = 3
            processing_time = self.processing_time_per_family_distribution[family-1]()
            due_date_offset = self.due_dates_distribution()

            family_routing_distr = self.routing_distribution[family]
            family_routing_prob = self.routing_prob[family]

            job_routing = []
            for i in range(6):
                if family_routing_distr[i]() <= family_routing_prob[i]:
                    job_routing.append(self.machines[i])

            yield self.env.timeout(timeout_inter_arrival)

            job = Job(
                env=self.env,
                routing=job_routing,
                arrival_time=self.env.now,
                process_time=processing_time,
                due_date=(self.env.now + due_date_offset),
                idx=self.idx_counter,
                family="F{}".format(family),
                completion_callback=self._job_completed_callback
            )

            # job_proc_time = 0
            # for m in job.routing:
                # job_proc_time += job.process_time

            # print(job_proc_time)

            self.idx_counter += 1
            self.jobs.append(job)
            self.pre_shop_pool.append(job)

    def _release_job_from_psp(self):
        if self.pre_shop_pool:
            job_to_release = self.pre_shop_pool.pop(0)
            job_to_release.in_system = True
            self.current_wip += 1
            self.env.process(job_to_release.main())


    def _job_completed_callback(self, job: Job):
        job.in_system = False
        self.current_wip -= 1
        self.jobs_completed_this_episode += 1
        self.total_tardiness_this_episode += job.tardiness


    def _calculate_reward(self, jobs_completed_in_interval: int, tardiness_in_interval: float, wip_at_start_of_interval: int) -> float:
        self.running_throughput.update(np.array([[jobs_completed_in_interval]]))
        self.running_tardiness.update(np.array([[tardiness_in_interval]]))
        self.running_wip.update(np.array([[wip_at_start_of_interval]]))

        norm_throughput = (jobs_completed_in_interval - self.running_throughput.mean[0]) / (np.sqrt(self.running_throughput.var[0]) + 1e-8)
        norm_tardiness = (tardiness_in_interval - self.running_tardiness.mean[0]) / (np.sqrt(self.running_tardiness.var[0]) + 1e-8)
        norm_wip = (wip_at_start_of_interval - self.running_wip.mean[0]) / (np.sqrt(self.running_wip.var[0]) + 1e-8)


        reward = (self.reward_weights['throughput'] * norm_throughput) + \
                 (self.reward_weights['tardiness_penalty'] * norm_tardiness) + \
                 (self.reward_weights['wip_penalty'] * norm_wip)
        return reward

    def _get_info(self) -> dict:
        info = {
            "current_time": self.env.now,
            "jobs_in_psp": len(self.pre_shop_pool),
            "jobs_in_system_wip": self.current_wip,
            "jobs_completed_episode": self.jobs_completed_this_episode,
            "total_tardiness_episode": self.total_tardiness_this_episode
        }
        return info


In [14]:
def make_env():
    return SystemEnv(
        inter_arrival_time_distribution=lambda: random.expovariate(lambd=0.65),
        processing_time_per_family_distribution=[
            lambda: random.gammavariate(2,2),
            lambda: random.gammavariate(4,0.5),
            lambda: random.gammavariate(6,1/6)
        ],
        families_distribution=lambda: random.random(),
        routing_distribution={
            1: [lambda: random.random(), lambda: random.random(), lambda: random.random(),lambda: random.random(),lambda: random.random(),lambda: random.random()],
            2: [lambda: random.random(), lambda: random.random(), lambda: random.random(),lambda: random.random(),lambda: random.random(),lambda: random.random()],
            3: [lambda: random.random(), lambda: random.random(), lambda: random.random(),lambda: random.random(),lambda: random.random(),lambda: random.random()]
        },
        routing_prob={
            1: [1,1,0,1,1,1],
            2: [0.8, 0.8, 1, 0.8, 0.8, 0.75],
            3: [0,0,1,0,0,0.75]
        },
        due_dates_distribution=lambda: random.uniform(30,50),
        reward_weights={
            'throughput': 1.0,
            'wip_penalty': -0.5,
            'tardiness_penalty': -0.5
        }
    )

num_envs = 4
seed = 42

vec_env = make_vec_env(make_env, n_envs=num_envs, seed=seed)
vec_env = VecNormalize(
    vec_env,
    norm_obs=True,
    norm_reward=True,
    clip_obs=10,
    clip_reward=10,
    gamma=0.99
)

model = DQN(
    "MlpPolicy",
    vec_env,
    verbose=1,
    learning_rate=1e-4,
    buffer_size=50000,
    learning_starts=1000,
    batch_size=32,
    tau=1.0,
    gamma=0.99,
    train_freq=(1, "step"),
    gradient_steps=1,
    exploration_fraction=0.1,
    exploration_initial_eps=1.0,
    exploration_final_eps=0.05,
    target_update_interval=1000,
    seed=seed
)

print("Starting DQN training...")
total_timesteps = 1000000
model.learn(total_timesteps=total_timesteps, log_interval=10)
print("DQN training finished")

model.save("production_system_agent_dqn_final")
vec_env.save("vec_normalize_stats_dqn_final.pkl")

Using cpu device
Starting DQN training...
Current wip: 1
Current wip: 1
Current wip: 1
Current wip: 1
Current wip: 1
Current wip: 2
Current wip: 1
Current wip: 1
Current wip: 2
Current wip: 2
Current wip: 1
Current wip: 1
Current wip: 1
Current wip: 1
Current wip: 1
Current wip: 1
Current wip: 1
Current wip: 1
Current wip: 2
Current wip: 1
Current wip: 1
Current wip: 2
Current wip: 1
Current wip: 1
Current wip: 3
Current wip: 1
Current wip: 2
Current wip: 1
Current wip: 1
Current wip: 1
Current wip: 2
Current wip: 1
Current wip: 2
Current wip: 2
Current wip: 1
Current wip: 2
Current wip: 1
Current wip: 2
Current wip: 1
Current wip: 2
Current wip: 1
Current wip: 2
Current wip: 2
Current wip: 1
Current wip: 2
Current wip: 2
Current wip: 1
Current wip: 1
Current wip: 3
Current wip: 1
Current wip: 3
Current wip: 3
Current wip: 1
Current wip: 3
Current wip: 1
Current wip: 1
Current wip: 2
Current wip: 1
Current wip: 1
Current wip: 3
Current wip: 1
Current wip: 4
Current wip: 1
Current wip: 


KeyboardInterrupt



In [16]:
print("\nStarting evaluation phase...")

eval_env = make_vec_env(make_env, n_envs=1, seed=seed + 1)

eval_env = VecNormalize.load("vec_normalize_stats_dqn_final.pkl", eval_env)

eval_env.training = False
eval_env.norm_reward = False

model = DQN.load("production_system_agent_dqn_final", env=eval_env)

num_eval_episodes = 10
episode_rewards = []
episode_tardiness = []
episode_throughput = []


for episode in range(num_eval_episodes):
    obs = eval_env.reset()
    print(f"Type of reset_result: {type(obs)}")
    print(f"Value of reset_result: {obs}")

    done = False
    total_reward = 0
    while not done:
        action, _states = model.predict(obs, deterministic=True)
        obs, reward, done_status_array, info = eval_env.step(action)
        print(obs, reward, done_status_array, info)
        total_reward += reward[0]
        if done_status_array[0]:
            episode_rewards.append(total_reward)
            episode_tardiness.append(info[0]["total_tardiness_episode"])
            episode_throughput.append(info[0]["jobs_completed_episode"])
            done = True

print(f"\nEvaluation on {num_eval_episodes} episodes:")
print(f"Average Reward per episode: {np.mean(episode_rewards):.2f}")
print(f"Average total tardiness per episode: {np.mean(episode_tardiness):.2f}")
print(f"Average Throughput per episode: {np.mean(episode_throughput):.2f}")

eval_env.close()
vec_env.close()



Starting evaluation phase...
Type of reset_result: <class 'numpy.ndarray'>
Value of reset_result: [[ 9.8931116e-07  4.4548702e-01  1.7232019e+00 -4.0032331e-02
   0.0000000e+00  0.0000000e+00  0.0000000e+00 -5.1195906e-03
  -1.5382705e-02 -2.8069865e-02]]
[[ 9.8931116e-07  1.3947836e-01  1.7008340e+00 -4.0032331e-02
   0.0000000e+00  0.0000000e+00  0.0000000e+00 -5.1195906e-03
  -1.5382705e-02 -2.8069865e-02]] [0.00657092] [False] [{'current_time': 180.0, 'jobs_in_psp': 118, 'jobs_in_system_wip': 0, 'jobs_completed_episode': 1, 'total_tardiness_episode': 85.13001805818635, 'TimeLimit.truncated': False}]
[[ 9.8931116e-07 -5.4652011e-01  1.6818308e+00 -4.0032331e-02
   0.0000000e+00  0.0000000e+00  0.0000000e+00 -5.1195906e-03
  -1.5382705e-02 -2.8069865e-02]] [-0.0449926] [False] [{'current_time': 240.0, 'jobs_in_psp': 153, 'jobs_in_system_wip': 0, 'jobs_completed_episode': 2, 'total_tardiness_episode': 230.52442504695833, 'TimeLimit.truncated': False}]
[[ 9.8931116e-07 -6.7774725e-01 

In [3]:
N = 10
M = N + 100
O = M + 100
P = O + 100
Q = P + 100
R = Q + 100
S = R + 100
T = S + 100
U = T + 10
V = U + 100

In [5]:
def run_system(seed: int | None, until: float = 60*120) -> System:
    random.seed(seed)
    production_system = System(
        env=simpy.Environment(),
        inter_arrival_time_distribution=lambda: random.expovariate(lambd=0.65),
        processing_time_per_family_distribution=[
            lambda: random.gammavariate(2,2),
            lambda: random.gammavariate(4,0.5),
            lambda: random.gammavariate(6,1/6)
        ],
        families_distribution=lambda: random.random(),
        routing_distribution={
            1: [lambda: random.random(), lambda: random.random(), lambda: random.random(),lambda: random.random(),lambda: random.random(),lambda: random.random()],
            2: [lambda: random.random(), lambda: random.random(), lambda: random.random(),lambda: random.random(),lambda: random.random(),lambda: random.random()],
            3: [lambda: random.random(), lambda: random.random(), lambda: random.random(),lambda: random.random(),lambda: random.random(),lambda: random.random()]
        },
        routing_prob={
            1: [1,1,0,1,1,1],
            2: [0.8, 0.8, 1, 0.8, 0.8, 0.75],
            3: [0,0,1,0,0,0.75]
        },
        due_dates_distribution=lambda: random.uniform(30,50)
    )
    production_system.env.run(until=until)
    return production_system

def main_system(*SEEDS: int, until) -> list[System]:
    return [run_system(seed, until=until) for seed in SEEDS]

In [12]:
class Welch:
    def __init__(self, process: np.ndarray, window_size: int, tol: float) -> None:
        self.process = process
        self.window_size = window_size
        self.tol = tol
        self.replications_mean = np.mean(process, axis=0)
        self.averaged_process = self._welch()
        self.diff, self.warmup_period = self._find_steady_state()

    @staticmethod
    def moving_average(arr: np.ndarray, window_size: int) -> np.ndarray:
        weights = np.ones(window_size) / window_size
        return np.convolve(arr, weights, mode="valid")

    def _welch(self) -> np.ndarray:
        averaged_process = []
        for i in range(1, self.replications_mean.shape[0] - self.window_size):
            if i <= self.window_size:
                averaged_process.append(self.replications_mean[: 2 * i - 1].mean())
            else:
                # averaged_process.append(
                #    self.replications_mean[
                #        i - self.window_size // 2 : i + self.window_size // 2
                #    ].mean()
                #)
                averaged_process.append(
                    self.replications_mean[
                        (i - 1 - self.window_size) : (i + self.window_size)
                    ].mean()
                )
        return np.array(averaged_process)

    def _find_steady_state(self) -> tuple[np.ndarray, int]:
        arr = self.averaged_process# self.moving_average(self.averaged_process, self.window_size)
        diff = np.diff(arr.flatten())
        for i, d in enumerate(diff):
            if d < self.tol:
                return diff, i + self.window_size
        return diff, -1

    def plot(self):
        plt.plot(self.averaged_process, label="Averaged Process")
        plt.axvline(
            self.warmup_period,
            color="r",
            linestyle="--",
            label=f"Warmup period: {self.warmup_period}",
        )
        plt.legend(loc="best")
        plt.show()
