In [2]:
import numpy as np

# Hypothetical medical environment for patient treatment decision-making
class MedicalEnv:
    def __init__(self, num_states=5):
        self.num_states = num_states
        self.terminal_state = num_states - 1

    def step(self, state, action):
        # Transition model (simplified for demonstration)
        next_state = min(state + action, self.terminal_state)
        reward = 1.0 if next_state == self.terminal_state else -0.1
        done = next_state == self.terminal_state
        return next_state, reward, done

    def reset(self):
        return 0  # initial patient state

# Feature representation (one-hot encoding for simplicity)
def feature(s, num_states):
    vec = np.zeros(num_states)
    vec[s] = 1
    return vec

# LSTD policy evaluation
def LSTD(samples, num_states, gamma=0.95):
    A = np.zeros((num_states, num_states))
    b = np.zeros(num_states)

    for s, r, s_next in samples:
        phi_s = feature(s, num_states)
        phi_s_next = feature(s_next, num_states)
        A += np.outer(phi_s, (phi_s - gamma * phi_s_next))
        b += phi_s * r

    theta = np.linalg.pinv(A) @ b
    return theta

# Federated averaging
def federated_avg(thetas):
    return np.mean(thetas, axis=0)

# Simulate local learning at hospitals
def local_training(env, episodes=20):
    samples = []
    for _ in range(episodes):
        s = env.reset()
        done = False
        while not done:
            # Simple random policy: action = 1 (treatment) or stay (0)
            action = np.random.choice([0, 1])
            s_next, reward, done = env.step(s, action)
            samples.append((s, reward, s_next))
            s = s_next
    return samples

# Main Federated Learning Loop
def federated_lstd(num_hospitals=3, rounds=5):
    num_states = 5
    global_theta = np.zeros(num_states)

    for round in range(rounds):
        local_thetas = []
        print(f"\n--- Federated Round {round+1} ---")
        for hospital in range(num_hospitals):
            env = MedicalEnv(num_states)
            samples = local_training(env)
            theta = LSTD(samples, num_states)
            local_thetas.append(theta)
            print(f"Hospital {hospital+1} local theta: {theta.round(3)}")

        # Aggregate using federated averaging
        global_theta = federated_avg(local_thetas)
        print(f"Aggregated global theta: {global_theta.round(3)}")

    return global_theta

if __name__ == "__main__":
    final_theta = federated_lstd()
    print("\nFinal Federated Policy Evaluation (theta values):")
    for idx, val in enumerate(final_theta):
        print(f"State {idx}: {val:.3f}")


--- Federated Round 1 ---
Hospital 1 local theta: [-0.169 -0.01   0.21   0.466 -0.478]
Hospital 2 local theta: [-0.242 -0.043  0.168  0.522 -0.43 ]
Hospital 3 local theta: [-0.188  0.003  0.188  0.441 -0.421]
Aggregated global theta: [-0.2   -0.017  0.189  0.476 -0.443]

--- Federated Round 2 ---
Hospital 1 local theta: [-0.197 -0.04   0.192  0.452 -0.397]
Hospital 2 local theta: [-0.175  0.061  0.234  0.428 -0.519]
Hospital 3 local theta: [-0.139  0.042  0.204  0.436 -0.504]
Aggregated global theta: [-0.17   0.021  0.21   0.438 -0.473]

--- Federated Round 3 ---
Hospital 1 local theta: [-0.241  0.004  0.205  0.443 -0.407]
Hospital 2 local theta: [-0.171  0.031  0.234  0.428 -0.493]
Hospital 3 local theta: [-0.172  0.021  0.207  0.451 -0.481]
Aggregated global theta: [-0.195  0.019  0.215  0.441 -0.46 ]

--- Federated Round 4 ---
Hospital 1 local theta: [-0.231 -0.003  0.197  0.457 -0.416]
Hospital 2 local theta: [-0.222  0.012  0.251  0.446 -0.48 ]
Hospital 3 local theta: [-0.115  0.

This code only implements federated version if you want to play with federated LSTD only. The following code, implements non-federated or centralized version and compares each other. Run the code and compare both. What do you observe?

In [3]:
# Non-Federated Centralized LSTD
def centralized_training(env, num_states, episodes=60):  # same total episodes
    samples = []
    for _ in range(episodes):
        s = env.reset()
        done = False
        while not done:
            action = np.random.choice([0, 1])
            s_next, reward, done = env.step(s, action)
            samples.append((s, reward, s_next))
            s = s_next
    theta = LSTD(samples, num_states)
    return theta

# Modify the main function to run both versions
def compare_federated_vs_centralized(num_hospitals=3, rounds=5):
    num_states = 5
    global_theta = np.zeros(num_states)

    # Federated Learning
    print("\n--- Federated LSTD Training ---")
    for round in range(rounds):
        local_thetas = []
        for hospital in range(num_hospitals):
            env = MedicalEnv(num_states)
            samples = local_training(env)
            theta = LSTD(samples, num_states)
            local_thetas.append(theta)
        global_theta = federated_avg(local_thetas)

    print(f"\nFinal Federated Theta: {global_theta.round(3)}")

    # Centralized Learning (single node, equivalent amount of data)
    print("\n--- Centralized LSTD Training ---")
    env = MedicalEnv(num_states)
    centralized_theta = centralized_training(env, num_states, episodes=num_hospitals*20)
    print(f"\nFinal Centralized Theta: {centralized_theta.round(3)}")

    # Comparison
    print("\n--- Theta Comparison (Federated vs. Centralized) ---")
    for i in range(num_states):
        print(f"State {i}: Federated: {global_theta[i]:.3f} | Centralized: {centralized_theta[i]:.3f}")

if __name__ == "__main__":
    compare_federated_vs_centralized()


--- Federated LSTD Training ---

Final Federated Theta: [-0.188 -0.014  0.192  0.457 -0.432]

--- Centralized LSTD Training ---

Final Centralized Theta: [-0.163  0.001  0.208  0.442 -0.46 ]

--- Theta Comparison (Federated vs. Centralized) ---
State 0: Federated: -0.188 | Centralized: -0.163
State 1: Federated: -0.014 | Centralized: 0.001
State 2: Federated: 0.192 | Centralized: 0.208
State 3: Federated: 0.457 | Centralized: 0.442
State 4: Federated: -0.432 | Centralized: -0.460


Below is the code for synthetic medical data. This code can now be extended to some large medical data such as MIMIC-III with access to good GPUs. First run and understand this code, before going full scale on a realistic medical data.

In [4]:
import numpy as np

class MedicalEnv:
    """
    A simple medical decision-making environment.
    States: 0 (healthy) to num_states-1 (critical).
    Terminal states: 0 (recovered) and num_states-1 (severe condition).
    Actions: 0 = no treatment, 1 = treatment.
    """
    def __init__(self, num_states=5):
        self.num_states = num_states
        self.terminal_states = [0, self.num_states - 1]

    def reset(self):
        # Initialize state to a value between 1 and num_states-2 (i.e., not already terminal)
        s = np.random.randint(1, self.num_states - 1)
        return s

    def step(self, s, action):
        """
        Simulate a step in the environment.
        - Action 1 (treatment): With 70% chance, state improves (decreases by 1),
          with 30% chance, state worsens (increases by 1), if possible.
        - Action 0 (no treatment): With 50% chance, state worsens (increases by 1),
          otherwise stays the same.
        A treatment cost is subtracted from the reward.
        Reward is defined as negative of the new state (lower is better) and treatment cost.
        """
        if action == 1:  # Treatment applied
            if np.random.rand() < 0.7:
                s_next = max(s - 1, 0)
            else:
                s_next = min(s + 1, self.num_states - 1)
            cost = 2  # Cost for treatment
        else:  # No treatment
            if np.random.rand() < 0.5:
                s_next = min(s + 1, self.num_states - 1)
            else:
                s_next = s
            cost = 0

        # Reward: lower state (better health) gives higher reward; treatment incurs cost.
        reward = - s_next - cost

        done = s_next in self.terminal_states
        return s_next, reward, done

def local_training(env, episodes=20):
    """
    Collect samples locally on a single hospital environment.
    Each sample is a tuple (state, reward, next_state).
    """
    samples = []
    for _ in range(episodes):
        s = env.reset()
        done = False
        while not done:
            action = np.random.choice([0, 1])
            s_next, reward, done = env.step(s, action)
            samples.append((s, reward, s_next))
            s = s_next
    return samples

def centralized_training(env, num_states, episodes=60):
    """
    Centralized training: Collect samples over multiple episodes on one environment.
    """
    samples = []
    for _ in range(episodes):
        s = env.reset()
        done = False
        while not done:
            action = np.random.choice([0, 1])
            s_next, reward, done = env.step(s, action)
            samples.append((s, reward, s_next))
            s = s_next
    theta = LSTD(samples, num_states)
    return theta

def LSTD(samples, num_states, gamma=0.9):
    """
    Least-Squares Temporal Difference (LSTD) learning.
    Uses one-hot encoding for state features.
    Solves: A theta = b, where:
      A = sum(phi(s) (phi(s)- gamma phi(s_next))^T)
      b = sum(phi(s) * reward)
    """
    A = np.zeros((num_states, num_states))
    b = np.zeros(num_states)
    for s, reward, s_next in samples:
        phi_s = np.zeros(num_states)
        phi_s[int(s)] = 1.0
        phi_s_next = np.zeros(num_states)
        phi_s_next[int(s_next)] = 1.0
        A += np.outer(phi_s, (phi_s - gamma * phi_s_next))
        b += phi_s * reward
    # Add a small regularization term for numerical stability
    reg = 1e-5 * np.eye(num_states)
    theta = np.linalg.solve(A + reg, b)
    return theta

def federated_avg(local_thetas):
    """
    Federated averaging: simply compute the mean of local theta estimates.
    """
    return np.mean(local_thetas, axis=0)

def compare_federated_vs_centralized(num_hospitals=3, rounds=5):
    num_states = 5
    global_theta = np.zeros(num_states)

    # Federated Learning
    print("\n--- Federated LSTD Training ---")
    for r in range(rounds):
        local_thetas = []
        for hospital in range(num_hospitals):
            env = MedicalEnv(num_states)
            samples = local_training(env)
            theta = LSTD(samples, num_states)
            local_thetas.append(theta)
        global_theta = federated_avg(local_thetas)
        print(f"Round {r+1}: Averaged Theta: {global_theta.round(3)}")

    print(f"\nFinal Federated Theta: {global_theta.round(3)}")

    # Centralized Learning (single node, equivalent amount of data)
    print("\n--- Centralized LSTD Training ---")
    env = MedicalEnv(num_states)
    centralized_theta = centralized_training(env, num_states, episodes=num_hospitals * 20)
    print(f"\nFinal Centralized Theta: {centralized_theta.round(3)}")

    # Comparison
    print("\n--- Theta Comparison (Federated vs. Centralized) ---")
    for i in range(num_states):
        print(f"State {i}: Federated: {global_theta[i]:.3f} | Centralized: {centralized_theta[i]:.3f}")

if __name__ == "__main__":
    compare_federated_vs_centralized()


--- Federated LSTD Training ---
Round 1: Averaged Theta: [  0.     -7.649 -11.227  -9.307   0.   ]
Round 2: Averaged Theta: [  0.     -7.96  -11.963 -10.801   0.   ]
Round 3: Averaged Theta: [ -0.     -8.183 -11.197  -9.327   0.   ]
Round 4: Averaged Theta: [  0.     -8.789 -11.684  -9.397   0.   ]
Round 5: Averaged Theta: [ -0.     -9.397 -11.628  -8.741   0.   ]

Final Federated Theta: [ -0.     -9.397 -11.628  -8.741   0.   ]

--- Centralized LSTD Training ---

Final Centralized Theta: [  0.     -7.152 -11.497  -9.515   0.   ]

--- Theta Comparison (Federated vs. Centralized) ---
State 0: Federated: -0.000 | Centralized: 0.000
State 1: Federated: -9.397 | Centralized: -7.152
State 2: Federated: -11.628 | Centralized: -11.497
State 3: Federated: -8.741 | Centralized: -9.515
State 4: Federated: 0.000 | Centralized: 0.000


In [5]:
!pip3 install mpi4py



In [6]:
%%writefile mpi_federated_lstd.py
from mpi4py import MPI
import numpy as np

class MedicalEnv:
    """
    A simple medical decision-making environment.
    States: 0 (healthy) to num_states-1 (critical).
    Terminal states: 0 (recovered) and num_states-1 (severe condition).
    Actions: 0 = no treatment, 1 = treatment.
    """
    def __init__(self, num_states=5):
        self.num_states = num_states
        self.terminal_states = [0, self.num_states - 1]

    def reset(self):
        # Initialize state to a value between 1 and num_states-2 (i.e., not terminal)
        s = np.random.randint(1, self.num_states - 1)
        return s

    def step(self, s, action):
        """
        Simulate a step in the environment.
        - Action 1 (treatment): With 70% chance, state improves (decreases by 1),
          with 30% chance, state worsens (increases by 1), if possible.
        - Action 0 (no treatment): With 50% chance, state worsens (increases by 1),
          otherwise stays the same.
        A treatment cost is subtracted from the reward.
        Reward is defined as negative of the new state (lower is better) and treatment cost.
        """
        if action == 1:  # Treatment applied
            if np.random.rand() < 0.7:
                s_next = max(s - 1, 0)
            else:
                s_next = min(s + 1, self.num_states - 1)
            cost = 2  # Treatment cost
        else:  # No treatment
            if np.random.rand() < 0.5:
                s_next = min(s + 1, self.num_states - 1)
            else:
                s_next = s
            cost = 0

        reward = - s_next - cost
        done = s_next in self.terminal_states
        return s_next, reward, done

def local_training(env, episodes=20):
    """
    Local training: collect samples from a single hospital (environment).
    Each sample is a tuple (state, reward, next_state).
    """
    samples = []
    for _ in range(episodes):
        s = env.reset()
        done = False
        while not done:
            action = np.random.choice([0, 1])
            s_next, reward, done = env.step(s, action)
            samples.append((s, reward, s_next))
            s = s_next
    return samples

def centralized_training(env, num_states, episodes=60):
    """
    Centralized training on one environment using all data.
    """
    samples = []
    for _ in range(episodes):
        s = env.reset()
        done = False
        while not done:
            action = np.random.choice([0, 1])
            s_next, reward, done = env.step(s, action)
            samples.append((s, reward, s_next))
            s = s_next
    theta = LSTD(samples, num_states)
    return theta

def LSTD(samples, num_states, gamma=0.9):
    """
    Least-Squares Temporal Difference (LSTD) learning.
    Uses one-hot encoding for state features.
    Solves: A theta = b, where:
      A = sum(phi(s) (phi(s)- gamma phi(s_next))^T)
      b = sum(phi(s) * reward)
    """
    A = np.zeros((num_states, num_states))
    b = np.zeros(num_states)
    for s, reward, s_next in samples:
        phi_s = np.zeros(num_states)
        phi_s[int(s)] = 1.0
        phi_s_next = np.zeros(num_states)
        phi_s_next[int(s_next)] = 1.0
        A += np.outer(phi_s, (phi_s - gamma * phi_s_next))
        b += phi_s * reward
    reg = 1e-5 * np.eye(num_states)
    theta = np.linalg.solve(A + reg, b)
    return theta

def federated_avg(local_thetas):
    """
    Federated averaging: average local theta estimates.
    """
    return np.mean(local_thetas, axis=0)

def federated_training_mpi(rounds=5, num_states=5):
    """
    Federated training using MPI.
    Each MPI process represents one hospital.
    """
    comm = MPI.COMM_WORLD
    rank = comm.Get_rank()
    size = comm.Get_size()

    global_theta = np.zeros(num_states)
    for r in range(rounds):
        env = MedicalEnv(num_states)
        samples = local_training(env, episodes=20)
        local_theta = LSTD(samples, num_states)
        print(f"Rank {rank} in round {r+1}: local theta = {local_theta.round(3)}", flush=True)
        theta_sum = np.zeros(num_states)
        # Sum all local thetas across processes
        comm.Allreduce(local_theta, theta_sum, op=MPI.SUM)
        global_theta = theta_sum / size
        if rank == 0:
            print(f"Round {r+1}: Averaged Theta: {global_theta.round(3)}")
    return global_theta

def compare_federated_vs_centralized_MPI():
    """
    Compare federated vs. centralized LSTD training using MPI.
    Rank 0 performs the centralized training and prints comparisons.
    """
    comm = MPI.COMM_WORLD
    rank = comm.Get_rank()
    size = comm.Get_size()
    num_states = 5

    if rank == 0:
        print("\n--- Federated LSTD Training (MPI) ---")
    global_theta = federated_training_mpi(rounds=5, num_states=num_states)

    # if rank == 0:
    #     print(f"\nFinal Federated Theta: {global_theta.round(3)}")
    #     print("\n--- Centralized LSTD Training ---")
    #     env = MedicalEnv(num_states)
    #     # Use an equivalent amount of episodes: total episodes = (num_hospitals * 20)
    #     centralized_theta = centralized_training(env, num_states, episodes=size * 20)
    #     print(f"\nFinal Centralized Theta: {centralized_theta.round(3)}")
    #     print("\n--- Theta Comparison (Federated vs. Centralized) ---")
    #     for i in range(num_states):
    #         print(f"State {i}: Federated: {global_theta[i]:.3f} | Centralized: {centralized_theta[i]:.3f}")

    if rank == 0:
        print("\n--- Federated LSTD Training (MPI) ---", flush=True)
    global_theta = federated_training_mpi(rounds=5, num_states=num_states)

    if rank == 0:
        print(f"\nFinal Federated Theta: {global_theta.round(3)}", flush=True)
        print("\n--- Centralized LSTD Training ---", flush=True)
        env = MedicalEnv(num_states)
        centralized_theta = centralized_training(env, num_states, episodes=size * 20)
        print(f"\nFinal Centralized Theta: {centralized_theta.round(3)}", flush=True)
        print("\n--- Theta Comparison (Federated vs. Centralized) ---", flush=True)
        for i in range(num_states):
            print(f"State {i}: Federated: {global_theta[i]:.3f} | Centralized: {centralized_theta[i]:.3f}", flush=True)

if __name__ == "__main__":
    compare_federated_vs_centralized_MPI()


Overwriting mpi_federated_lstd.py


In [7]:
!mpiexec --allow-run-as-root -n 3 python mpi_federated_lstd.py

--------------------------------------------------------------------------
prterun was unable to find the specified executable file, and therefore did
not launch the job.  This error was first reported for process rank
0; it may have occurred for other processes as well.

NOTE: A common cause for this error is misspelling a prterun command
   line parameter option (remember that prterun interprets the first
   unrecognized command line token as the executable).

   Node:       Hitens-Macbook-Pro
   Executable: python
--------------------------------------------------------------------------


Try this on ADA HPC of IIIT-H