In [1]:
import numpy as np

In [None]:

class TrainInductionEnv:
    """Single-step environment that scores a train-deployment decision.

    A state is the tuple ``(demand_level, peak, available_trains)`` and an
    action is the number of trains to deploy. ``step`` maps such a pair to a
    scalar reward; it does not produce a successor state.
    """

    def __init__(self, max_trains=20):
        # Stored on the instance; neither get_state nor step reads it.
        self.max_trains = max_trains

    def get_state(self, demand, peak, available_trains):
        """Bucket the raw demand and bundle it with the other state fields.

        Args:
            demand: Raw demand figure to discretize.
            peak: Peak-hour flag, passed through unchanged.
            available_trains: Number of trains available, passed through
                unchanged.

        Returns:
            ``(demand_level, peak, available_trains)`` where ``demand_level``
            is 0 (low) for demand below 2000, 1 (medium) for demand below
            5000, and 2 (high) otherwise.
        """
        if demand < 2000:
            return (0, peak, available_trains)  # Low
        if demand < 5000:
            return (1, peak, available_trains)  # Medium
        return (2, peak, available_trains)  # High

    def step(self, state, action):
        """Return the reward for deploying ``action`` trains in ``state``.

        Args:
            state: Tuple ``(demand_level, peak, available_trains)`` as built
                by ``get_state``.
            action: Number of trains to deploy.

        Returns:
            ``-100`` when the action exceeds ``available_trains`` or is below
            2; otherwise the negated sum of the waiting cost, the energy cost
            and the overcrowding penalty.
        """
        demand_level, peak, available_trains = state

        # --- Constraints: flat penalty for an infeasible deployment ---
        if action > available_trains or action < 2:
            return -100

        # --- Headway estimation: one unit less per deployed train, floor of 2 ---
        headway = max(2, 12 - action)

        # Waiting time is taken as half the headway and weighted by 10.
        wait_cost = (headway / 2) * 10
        energy_cost = 2 * action

        # Overcrowding applies only at peak with high demand when fewer than
        # 70% of the available trains are deployed.
        crowd_penalty = (
            50
            if (peak and demand_level == 2 and action < available_trains * 0.7)
            else 0
        )

        # --- Reward function: every component is a cost ---
        return -wait_cost - energy_cost - crowd_penalty


In [4]:
class QLearningAgent:
    """Tabular epsilon-greedy Q-learning agent.

    Q-values are kept in a dict keyed by ``(state, action)``; entries that
    have never been written read as 0.0.
    """

    def __init__(self, actions, alpha=0.1, gamma=0.9, epsilon=0.2):
        """Create an agent with an empty Q-table.

        Args:
            actions: Sequence of candidate actions to choose from.
            alpha: Learning rate applied to the temporal-difference error.
            gamma: Discount factor applied to the bootstrapped future value.
            epsilon: Probability of picking a uniformly random action.
        """
        self.q_table = {}
        self.actions = actions
        self.alpha = alpha
        self.gamma = gamma
        self.epsilon = epsilon

    def get_q(self, state, action):
        """Return the stored Q-value for ``(state, action)``, or 0.0 if unseen."""
        return self.q_table.get((state, action), 0.0)

    def _best_value(self, state):
        """Return the largest Q-value over all candidate actions in ``state``."""
        return max(self.get_q(state, a) for a in self.actions)

    def choose_action(self, state):
        """Pick an action epsilon-greedily.

        With probability ``epsilon`` a random action is drawn from
        ``self.actions``; otherwise the action with the highest Q-value is
        returned (``np.argmax`` keeps the first one on ties).
        """
        if np.random.rand() < self.epsilon:
            return np.random.choice(self.actions)
        qs = [self.get_q(state, a) for a in self.actions]
        return self.actions[np.argmax(qs)]

    def update(self, state, action, reward, next_state=None, done=False):
        """Apply one Q-learning update to ``(state, action)``.

        Args:
            state: State in which the action was taken.
            action: Action that was taken.
            reward: Reward observed for the transition.
            next_state: State reached after the action. When omitted, the
                future value is bootstrapped from ``state`` itself, which is
                the behaviour of the original three-argument call.
            done: When true the transition is treated as terminal and no
                future value is added to the target.
        """
        old_q = self.get_q(state, action)

        if done:
            # Terminal transition: nothing follows, so the target is the reward.
            best_future = 0.0
        else:
            # Bootstrap from the successor state when the caller supplies one;
            # fall back to the current state to stay compatible with callers
            # that only pass (state, action, reward).
            bootstrap_state = state if next_state is None else next_state
            best_future = self._best_value(bootstrap_state)

        new_q = old_q + self.alpha * (reward + self.gamma * best_future - old_q)
        self.q_table[(state, action)] = new_q


In [5]:
# Seed the global NumPy RNG so that Restart Kernel -> Run All reproduces the
# same Q-table; the scenario sampling below and the agent's epsilon-greedy
# draws all go through np.random.
SEED = 42
N_EPISODES = 3000
np.random.seed(SEED)

env = TrainInductionEnv()
actions = list(range(2, 21))  # candidate deployments: 2..20 trains

agent = QLearningAgent(actions)

# Training episodes: each one samples an independent scenario, takes a single
# action and learns from the reward returned for it.
for episode in range(N_EPISODES):
    demand = np.random.randint(1000, 7000)  # upper bound is exclusive
    peak = np.random.choice([0, 1])
    # NOTE(review): randint excludes its upper bound, so availability is 6..19
    # and action 20 is always rejected by env.step — confirm whether 20
    # available trains was meant to be reachable.
    available = np.random.randint(6, 20)

    state = env.get_state(demand, peak, available)
    action = agent.choose_action(state)
    reward = env.step(state, action)

    agent.update(state, action, reward)


In [6]:
import joblib
from pathlib import Path

# Persist the learned Q-table. The target directory is created first so the
# dump does not fail on a fresh checkout where ../model does not exist yet.
Q_TABLE_PATH = "../model/rl_q_table.pkl"
Path(Q_TABLE_PATH).parent.mkdir(parents=True, exist_ok=True)
joblib.dump(agent.q_table, Q_TABLE_PATH)


['../model/rl_q_table.pkl']