In [1]:
#Assignment 3 - Part B: Practical Implementation & Deployment
#By Nanfuka Fatuma 
#B36106.

File 1 — Project: Optimizing Food Distribution Scheduling to Refugee Settlements in West Nile, Uganda.

In [2]:
# importing necessary libraries
!pip install gym numpy
import gym
from gym import spaces
import numpy as np

# Custom Gym Environment for Food Distribution Centers
class FoodcenterEnv(gym.Env):
    """Custom Gym environment simulating food distribution centers in a camp/day.

    Observation (flat float32 vector of length ``3 * num_centers + 3``), in order:
        - queue_lengths: one entry per center
        - staff_levels: one entry per center
        - trucks: int (number of trucks available)
        - time_of_day: int (0 morning, 1 noon, 2 afternoon)
        - weather: int (0 dry, 1 rainy)
        - supply_levels: one entry per center

    Actions (``Discrete(4 * num_centers + 1)``, with n = num_centers):
        - 0: no-op
        - 1 .. n: add one staff member to center i (capped at ``max_staff``)
        - n+1 .. 2n: remove one staff member from center i (floored at 0)
        - 2n+1 .. 3n: open auxiliary center i (+5 staff, capped at ``max_staff``)
        - 3n+1 .. 4n: close auxiliary center i (-5 staff, floored at 0)

    Reward: negative weighted sum of the mean queue length (proxy for waiting
    time), the queue standard deviation, a stockout flag and a "travel" penalty
    that grows once queues exceed 100.

    Notes:
        - ``trucks`` and ``weather`` are reported in the observation but do not
          currently influence the dynamics or the reward.
        - The class follows the classic Gym API: ``reset`` returns only the
          observation and ``step`` returns ``(obs, reward, done, info)``.
    """

    metadata = {"render.modes": ["human"]}

    def __init__(self, num_centers=4, max_queue=200, max_staff=10, episode_length=48):
        """Build the observation/action spaces and initialise the first episode.

        Args:
            num_centers: number of distribution centers simulated.
            max_queue: upper bound reported for each queue in the observation.
            max_staff: maximum staff level a single center can reach.
            episode_length: timesteps per episode (e.g., 48 half-hour steps).
        """
        super().__init__()
        self.num_centers = num_centers
        self.max_queue = max_queue
        self.max_staff = max_staff
        self.episode_length = episode_length

        # Random source. Defaults to numpy's global generator, so an external
        # np.random.seed(...) still controls the environment; reset(seed=...)
        # swaps in a private RandomState for self-contained reproducibility.
        self._rng = np.random

        # Observation: queues + staff + trucks + time + weather + supply
        n = self.num_centers
        low_obs = np.zeros(3 * n + 3, dtype=np.float32)
        high_obs = np.array(
            [self.max_queue] * n +
            [self.max_staff] * n +
            [5] +  # trucks
            [2] +  # time_of_day
            [1] +  # weather
            [1000] * n,  # supply levels
            dtype=np.float32
        )
        self.observation_space = spaces.Box(low=low_obs, high=high_obs, dtype=np.float32)

        # One no-op plus four per-center moves (add / remove staff, open / close auxiliary).
        self.num_base_actions = 4 * n + 1
        self.action_space = spaces.Discrete(self.num_base_actions)

        # internal state
        self.reset()

    def reset(self, seed=None):
        """Start a new episode and return the initial observation.

        Args:
            seed: optional integer. When given, the environment switches to a
                private ``np.random.RandomState(seed)`` so the episode is
                reproducible. When omitted, the current random source is kept.

        Returns:
            The initial observation vector (float32).
        """
        if seed is not None:
            self._rng = np.random.RandomState(seed)

        self.t = 0
        self.queue_lengths = self._rng.poisson(10, size=self.num_centers).astype(np.int32)
        # Every center starts with 2 staff, or max_staff if that is smaller.
        self.staff_levels = np.full(self.num_centers, min(2, self.max_staff), dtype=np.int32)
        self.trucks = 2
        self.time_of_day = 0
        self.weather = 0
        self.supply_levels = np.full(self.num_centers, 500, dtype=np.int32)
        self.done = False
        return self._get_obs()

    def _get_obs(self):
        """Assemble the flat observation vector from the internal state.

        Queues are not capped by the dynamics, so the vector is clipped to the
        declared Box bounds; the internal state (and hence the reward) still
        uses the true, unclipped queue lengths.
        """
        obs = np.concatenate([
            self.queue_lengths.astype(np.float32),
            self.staff_levels.astype(np.float32),
            np.array([self.trucks, self.time_of_day, self.weather], dtype=np.float32),
            self.supply_levels.astype(np.float32)
        ])
        return np.clip(obs, self.observation_space.low, self.observation_space.high)

    def step(self, action):
        """Advance the simulation by one timestep.

        Args:
            action: integer in ``[0, 4 * num_centers]`` (see class docstring).

        Returns:
            Tuple ``(obs, reward, done, info)`` where ``info`` carries the
            ``avg_wait``, ``queue_std`` and ``stockout`` terms of the reward.

        Raises:
            AssertionError: if ``action`` is not contained in ``action_space``.
        """
        assert self.action_space.contains(action), f"invalid action: {action!r}"

        # Apply action effects
        self._apply_action(action)

        # New arrivals (time-varying Poisson): rate 20 at noon, 15 otherwise.
        lam = 10 + 5 * (2 if self.time_of_day == 1 else 1)
        arrivals = self._rng.poisson(lam, size=self.num_centers)
        self.queue_lengths += arrivals

        # Service: each staff member handles 4-7 beneficiaries this step,
        # limited by the people actually queueing and by the remaining supply.
        capacity = self.staff_levels * self._rng.randint(4, 8, size=self.num_centers)
        processed = np.minimum(self.queue_lengths, capacity)
        processed = np.minimum(processed, self.supply_levels)
        self.queue_lengths -= processed
        self.supply_levels -= processed

        # Advance time; the day is split into three equal blocks. max(1, ...)
        # avoids a modulo-by-zero when episode_length is shorter than 3 steps.
        self.t += 1
        steps_per_period = max(1, self.episode_length // 3)
        if self.t % steps_per_period == 0:
            self.time_of_day = min(2, self.time_of_day + 1)

        # Random weather event: 5% chance of rain, redrawn every step.
        self.weather = 1 if self._rng.rand() < 0.05 else 0

        # Reward calculation
        avg_wait = np.mean(self.queue_lengths)  # proxy for waiting time
        queue_std = np.std(self.queue_lengths)
        stockout = 1 if np.any(self.supply_levels <= 0) else 0
        travel_penalty = np.mean(np.maximum(0, self.queue_lengths - 100)) / 10.0

        reward = -1.0 * avg_wait - 0.5 * queue_std - 5.0 * stockout - 0.2 * travel_penalty

        # Episode termination; keep the instance flag in sync with the return value.
        done = self.t >= self.episode_length
        self.done = done

        obs = self._get_obs()
        info = {
            "avg_wait": float(avg_wait),
            "queue_std": float(queue_std),
            "stockout": int(stockout)
        }

        return obs, reward, done, info

    def _apply_action(self, action):
        """Mutate ``staff_levels`` according to the discrete ``action`` code."""
        n = self.num_centers
        if action == 0:
            return
        elif 1 <= action <= n:
            idx = action - 1
            # allocate 1 extra staff (if below max)
            if self.staff_levels[idx] < self.max_staff:
                self.staff_levels[idx] += 1
        elif n+1 <= action <= 2*n:
            idx = action - (n+1)
            if self.staff_levels[idx] > 0:
                self.staff_levels[idx] -= 1
        elif 2*n+1 <= action <= 3*n:
            idx = action - (2*n+1)
            # Open auxiliary center -> increase capacity by 5 staff (simple simulation)
            self.staff_levels[idx] = min(self.max_staff, self.staff_levels[idx] + 5)
        elif 3*n+1 <= action <= 4*n:
            idx = action - (3*n+1)
            # Close auxiliary -> decrease staff
            self.staff_levels[idx] = max(0, self.staff_levels[idx] - 5)
        else:
            # Codes outside the ranges above are treated as a no-op.
            pass

    def render(self, mode='human'):
        """Print the current timestep, queues, staff levels and supply levels."""
        print(f"t={self.t}, queues={self.queue_lengths}, staff={self.staff_levels}, supply={self.supply_levels}")


if __name__ == '__main__':
    # Smoke test: drive a fresh environment with random actions for at most
    # 10 steps, printing the state after each one.
    env = FoodcenterEnv()
    obs = env.reset()
    for step_idx in range(10):
        next_obs, reward, finished, info = env.step(env.action_space.sample())
        env.render()
        if finished:
            # Episode ended before the step budget was used up.
            break




[notice] A new release of pip is available: 25.2 -> 25.3
[notice] To update, run: python.exe -m pip install --upgrade pip


t=1, queues=[19  5 10  7], staff=[2 3 2 2], supply=[492 482 486 490]
t=2, queues=[21  0 16  3], staff=[2 3 2 3], supply=[478 467 478 472]
t=3, queues=[24  0 14  2], staff=[2 3 3 3], supply=[468 449 457 457]
t=4, queues=[ 0  2 11  0], staff=[7 3 3 3], supply=[436 428 439 439]
t=5, queues=[0 0 9 0], staff=[7 8 3 3], supply=[421 411 418 423]
t=6, queues=[0 0 0 7], staff=[7 8 8 3], supply=[405 398 401 405]
t=7, queues=[0 0 0 4], staff=[6 8 8 3], supply=[384 389 381 384]
t=8, queues=[ 0  0  0 11], staff=[6 3 8 3], supply=[366 374 366 369]
t=9, queues=[ 0 16  0  0], staff=[6 0 8 3], supply=[347 374 352 351]
t=10, queues=[ 0 36  0  0], staff=[ 6  0 10  3], supply=[335 374 335 336]


Gym has been unmaintained since 2022 and does not support NumPy 2.0 amongst other critical functionality.
Please upgrade to Gymnasium, the maintained drop-in replacement of Gym, or contact the authors of your software and request that they upgrade.
Users of this version of Gym should be able to simply replace 'import gym' with 'import gymnasium as gym' in the vast majority of cases.
See the migration guide at https://gymnasium.farama.org/introduction/migration_guide/ for additional information.
