<a href="https://colab.research.google.com/github/asmaakhaledd/PID-NN/blob/PID-Bolus/Inference_of_pid_LSTM.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [2]:
!pip install numpy pandas tensorflow



In [3]:
import numpy as np
import pandas as pd
import tensorflow as tf
import random
import math

Load model

In [4]:
# Load the trained model
def load_model(model_path):
    pid_model = tf.keras.models.load_model(model_path, compile=False)
    pid_model.compile(optimizer='adam', loss=tf.keras.losses.MeanSquaredError())
    return pid_model

Preprocess time features (cyclic encoding)

In [5]:
# Preprocess time features (Use timestep as timestamp)
def preprocess_time_features(timestep, glucose, weight):
    # Using cyclic encoding for time, but we will use timestep as time (1, 2, 3,...)
    hour = timestep % 24
    minute = 0  # Simulate with minute 0 for simplicity
    time_sin = np.sin(2 * np.pi * hour / 24)
    time_cos = np.cos(2 * np.pi * hour / 24)
    return [glucose - 110, glucose - random.uniform(70, 180), weight, time_sin, time_cos]

Prepare PID data for inference

In [6]:
# Prepare PID data for inference
def prepare_pid_data(timestep, glucose, weight):
    # Preprocess real-time data for the PID model
    return np.array([preprocess_time_features(timestep, glucose, weight)])

Adjust basal insulin dosage based on glucose and weight

In [7]:
def adjust_basal_insulin(glucose_level, weight, Kp, Ki, Kd, previous_glucose, cumulative_error):
    # Basal insulin rate calculation based on weight
    basal_rate_per_kg = 0.5
    TDI = 0.55 * weight  # Total daily insulin
    basal_insulin_dosage = basal_rate_per_kg * TDI  # Basal insulin dosage (50% of TDI)
    hourly_basal_rate = basal_insulin_dosage / 24  # Hourly basal rate

    # Target glucose level
    target_glucose = 100

    # Calculate the error (difference between current glucose and target)
    error = glucose_level - target_glucose

    # Proportional (Kp) term: Respond to immediate error
    adjustment_factor_proportional = Kp * error

    # Integral (Ki) term: Cumulative error over time
    cumulative_error += error  # Update cumulative error
    if cumulative_error > 10:
        cumulative_error = 10
    elif cumulative_error < -10:
        cumulative_error = -10
    adjustment_factor_integral = Ki * cumulative_error

    # Derivative (Kd) term: Rate of change in glucose
    glucose_change_rate = glucose_level - previous_glucose  # Difference between current and previous glucose levels
    adjustment_factor_derivative = Kd * glucose_change_rate

    # Total adjusted basal rate including all PID terms
    adjusted_basal_rate = hourly_basal_rate + adjustment_factor_proportional + adjustment_factor_integral + adjustment_factor_derivative

    # Apply limits to the basal rate to prevent it from becoming too low or too high
    min_basal_rate = 0.3
    max_basal_rate = 2.0
    adjusted_basal_rate = max(min_basal_rate, min(adjusted_basal_rate, max_basal_rate))


    # Return the adjusted basal rate and the updated cumulative error for next use
    return adjusted_basal_rate, cumulative_error, glucose_level

Bolus Calculation

In [18]:
def calculate_bolus(weight, glucose_level, target_glucose, meal_carbs, insulin_type='rapid'):
    # Calculate Total Daily Insulin (TDI)
    TDI = weight * 0.55  # For weight in kg

    # Calculate the Correction Factor based on insulin type
    if insulin_type == 'rapid':
        correction_factor = 1800 / TDI


    # Calculate correction dose if BG is above target
    correction_dose = 0
    if glucose_level > target_glucose:
        correction_dose = (glucose_level - target_glucose) / correction_factor

    # Calculate carb-to-insulin ratio
    carb_to_insulin_ratio = 500 / TDI

    # Calculate bolus insulin based on carbs in the meal
    bolus_for_meal = (meal_carbs / carb_to_insulin_ratio)*60

    # Total bolus insulin for the meal including correction
    total_bolus = bolus_for_meal + correction_dose

    # Limit the bolus insulin to a safe range
    # min_bolus = 0.5  # Minimum bolus insulin per meal
    # max_bolus = 10.0  # Maximum bolus insulin per meal
    # total_bolus = max(min_bolus, min(total_bolus, max_bolus))

    print(f"Timestep: {timestep}, Glucose: {glucose:.2f} mg/dL, Carbs: {meal_carbs}, Bolus for meal: {bolus_for_meal:.2f} units, Correction dose: {correction_dose:.2f} units, Total bolus: {total_bolus:.2f} units")

    return total_bolus

Predict insulin dosage

In [10]:
def predict_insulin_dosage(pid_model, glucose, weight, timestep, previous_glucose, cumulative_error):
    # Prepare input data for PID model
    X_pid_test = prepare_pid_data(timestep, glucose, weight)

    # Make prediction using the trained PID model (predicting Kp, Ki, Kd)
    predicted_pid_gains = pid_model.predict(X_pid_test)

    # Extract predicted PID gains (Kp, Ki, Kd)
    Kp, Ki, Kd = predicted_pid_gains[0]

    # Adjust basal insulin based on the predicted PID gains and glucose level
    adjusted_basal_rate, cumulative_error, previous_glucose = adjust_basal_insulin(
        glucose, weight, Kp, Ki, Kd, previous_glucose, cumulative_error
    )

    # Print the result
    print(f"\nSample {timestep}:")
    print(f"Timestep: {timestep}")
    print(f"Glucose: {glucose:.2f} mg/dL")
    print(f"Adjusted Hourly Basal Insulin: {adjusted_basal_rate:.2f} U per hour")
    print(f"Weight: {weight} kg")
    print("-" * 50)

    # Return updated values to be used in the next timestep
    return previous_glucose, cumulative_error, glucose, adjusted_basal_rate

Dejavu-RM property specifications

In [11]:
def should_deliver_insulin(glucose):
    # Rule 1: Don't deliver insulin if the glucose sensor has failed
    if  glucose <= 0 or glucose > 1000:
        print(f"irregular reading detected (Glucose = {glucose}): Pausing insulin delivery.")
        return False

    # Rule 2: Do not deliver insulin if glucose < 50 mg/dL
    if glucose < 50:
        print(f"Hypoglycemia detected (Glucose = {glucose}): Pausing insulin delivery.")
        return False

     # Rule 3: Missing or unreadable glucose value
    if glucose is None or (isinstance(glucose, float) and math.isnan(glucose)):
        print("No glucose reading available: Pausing insulin delivery.")
        return False



    return True

parse_test_case

In [12]:
def parse_test_case(file_path):
    with open(file_path, 'r') as file:
        lines = file.readlines()

    weight = None
    sim_time = None
    meals = []

    for line in lines:
        if "Body Weight" in line:
            weight = float(line.strip().split(':')[1].split()[0])
        elif "Simulation Time" in line:
            sim_time = int(line.strip().split(':')[1].split()[0])
        elif "Meal" in line and "Time" in line:
            parts = line.strip().split(',')
            time = int(parts[0].split(':')[1].strip().split()[0])
            carbs = float(parts[1].split(':')[1].strip().split()[0])
            meals.append((time, carbs))

    return weight, sim_time, meals

Main

In [19]:
def initialize_simulation(weight):
    """
    Initializes the simulation with basic patient parameters, including
    Total Daily Insulin (TDI), daily basal limit, and other essential tracking variables.

    Args:
        weight (float): The weight of the patient.

    Returns:
        dict: A dictionary containing all the initial state variables for the simulation.
    """
    # Calculate Total Daily Insulin (TDI) as 55% of the body weight
    TDI = 0.55 * weight

    # Calculate daily basal limit as 50% of TDI (this will be used to cap basal insulin delivery)
    daily_basal_limit = 0.5 * TDI

    # Initialize the simulation state with various tracking variables
    return {
        "weight": weight,  # Store the weight of the patient
        "TDI": TDI,  # Store the calculated Total Daily Insulin
        "daily_basal_limit": daily_basal_limit,  # Store the daily basal insulin limit
        "previous_glucose": 100,  # Initialize previous glucose level (e.g., starting glucose)
        "cumulative_error": 0,  # Initialize cumulative error for PID controller
        "next_basal_timestep": 1,  # The next basal insulin delivery time step (minute)
        "basal_tracker": {},  # Tracker for basal insulin delivery per day
        "basal_rates": [],  # List to store basal insulin rates over time
        "bolus_insulin": [],  # List to store bolus insulin doses
        "carb_intake": [],  # List to store meal carbohydrate intake over time
        "infusion_values": []  # List to store infusion values (both basal and bolus insulin delivery)
    }

def check_meal(timestep, meal_schedule):
    """
    Checks if there is a meal scheduled at the current timestep + 10 minutes.

    Args:
        timestep (int): The current simulation timestep.
        meal_schedule (dict): A dictionary with the meal times and their associated carb values.

    Returns:
        float: The amount of carbs for the scheduled meal (0 if no meal is scheduled).
    """
    return meal_schedule.get(timestep - 20, 0)

def deliver_bolus(timestep, glucose, meal_carbs, sim_vars):
    """
    Delivers a bolus insulin dose based on the meal carbohydrate intake and glucose level.

    Args:
        timestep (int): The current simulation timestep.
        glucose (float): The current glucose level of the patient.
        meal_carbs (float): The number of carbs in the current meal.
        sim_vars (dict): The current simulation state variables.

    Returns:
        tuple: Returns a tuple with 0 (reset meal_carbs) and the total bolus insulin delivered.
    """
    # Calculate the bolus insulin dose based on the weight, glucose, target glucose (110), and meal carbs
    total_bolus = calculate_bolus(sim_vars["weight"], glucose, 110, meal_carbs)

    # Append the bolus insulin, carb intake, and infusion values to the simulation state
    sim_vars["bolus_insulin"].append(total_bolus)
    sim_vars["carb_intake"].append(meal_carbs)
    sim_vars["infusion_values"].append(total_bolus)

    # Reset the meal carbs to 0 (as the bolus has been delivered)
    return 0, total_bolus

def deliver_basal(timestep, glucose, pid_model, sim_vars):
    """
    Delivers a basal insulin dose based on the PID model prediction and glucose level.

    Args:
        timestep (int): The current simulation timestep.
        glucose (float): The current glucose level of the patient.
        pid_model (tf.keras.Model): The trained PID model for predicting Kp, Ki, Kd.
        sim_vars (dict): The current simulation state variables.
    """
    # Predict the insulin dosage (Kp, Ki, Kd) from the PID model
    prev_g, cum_err, _, adjusted_basal_rate = predict_insulin_dosage(
        pid_model, glucose, sim_vars["weight"], timestep,
        sim_vars["previous_glucose"], sim_vars["cumulative_error"]
    )

    # Calculate the current day from the timestep (1440 timesteps per day)
    current_day = (timestep - 1) // 1440
    tracker = sim_vars["basal_tracker"]

    # Initialize the basal tracker for the current day if it doesn't exist
    if current_day not in tracker:
        tracker[current_day] = 0

    # Convert the adjusted basal rate from units per hour to units per minute (for more precision)
    adjusted_basal_rate_per_minute = (adjusted_basal_rate / 60.0) * 15  # Account for 15-minute intervals

    # Check if adding the adjusted basal rate will exceed the daily basal limit
    if tracker[current_day] + adjusted_basal_rate_per_minute > sim_vars["daily_basal_limit"]:
        # If the limit is exceeded, cap the basal rate to the remaining allowable basal insulin
        print(f"Limit exceeded! Timestep {timestep}: Attempting to deliver {adjusted_basal_rate:.2f} U per minute, but capping it.")
        adjusted_basal_rate_per_minute = max(0, sim_vars["daily_basal_limit"] - tracker[current_day])
        print(f"Capped basal rate to {adjusted_basal_rate_per_minute:.4f} U per minute to respect daily basal limit.")

    # Update the basal tracker with the adjusted basal rate for the current day
    tracker[current_day] += adjusted_basal_rate_per_minute

    # Append the adjusted basal rate to the basal rates and infusion values
    sim_vars["basal_rates"].append(adjusted_basal_rate_per_minute)
    sim_vars["infusion_values"].append(adjusted_basal_rate_per_minute)

    # Update the previous glucose and cumulative error for the next iteration
    sim_vars["previous_glucose"] = prev_g
    sim_vars["cumulative_error"] = cum_err

    # Schedule the next basal delivery 15 timesteps ahead
    sim_vars["next_basal_timestep"] = timestep + 15

def maintain_previous(sim_vars):
    """
    Maintains the previous basal insulin delivery if no bolus or basal was delivered at the current timestep.

    Args:
        sim_vars (dict): The current simulation state variables.
    """
    # Maintain the last basal rate if no new bolus or basal insulin was delivered
    last_basal = sim_vars["basal_rates"][-1] if sim_vars["basal_rates"] else 1.0
    sim_vars["infusion_values"].append(last_basal)
    sim_vars["basal_rates"].append(last_basal)
    sim_vars["bolus_insulin"].append(0)
    sim_vars["carb_intake"].append(0)


In [20]:
if __name__ == "__main__":
    # Load model and test case
    # Load the trained PID model for insulin prediction
    model_path = '/content/drive/MyDrive/GP PID/opt_pid_tuning_model_2.h5'
    pid_model = load_model(model_path)  # Load the model from the given path

    # Load the test case data, including weight, simulation time, and meal schedule
    test_case_path = '/content/drive/MyDrive/GP PID/TestCases.txt'
    weight, sim_time, meals = parse_test_case(test_case_path)  # Parse test case file

    # Create a meal schedule dictionary (time: carbs) for fast lookups during simulation
    meal_schedule = {time: carbs for time, carbs in meals}

    # Initialize the simulation with the patient's weight and other state variables
    sim_vars = initialize_simulation(weight)

    # Set the initial timestep to 1 (starting point of the simulation)
    timestep = 1
    meal_carbs = 0  # Initially no meal carbs

    # Main loop to run the simulation for each timestep up to the simulation time
    while timestep <= sim_time:
        # Randomly generate the glucose level between 70 and 180 mg/dL at each timestep
        glucose = random.uniform(70, 180)

        # Check if insulin should be delivered based on glucose level
        if should_deliver_insulin(glucose):
            adjusted_basal_rate = 0  # No basal insulin if bolus is given

            # Get the carbs for the current meal (scheduled 10 minutes ahead)
            meal_carbs = check_meal(timestep, meal_schedule)
            give_bolus = meal_carbs > 0  # Determine if a bolus should be delivered
            give_basal_now = timestep == sim_vars["next_basal_timestep"] and not give_bolus  # Check if basal is due

            # If bolus needs to be delivered (i.e., meal exists)
            if give_bolus:
                # Deliver bolus insulin and reset meal_carbs
                meal_carbs, _ = deliver_bolus(timestep, glucose, meal_carbs, sim_vars)

                # Delay basal insulin delivery if it's scheduled for the current timestep
                if sim_vars["next_basal_timestep"] == timestep:
                    sim_vars["next_basal_timestep"] = timestep + 1  # Schedule the next basal insulin for 15 minutes later

            # If basal insulin is due and no bolus is needed
            elif give_basal_now:
                # Deliver basal insulin based on the PID model prediction
                deliver_basal(timestep, glucose, pid_model, sim_vars)

                # Append 0 to bolus insulin and carb intake to maintain structure
                sim_vars["bolus_insulin"].append(0)
                sim_vars["carb_intake"].append(0)

        # If no insulin needs to be delivered, maintain the previous basal rate
        else:
            maintain_previous(sim_vars)

        timestep += 1  # Move to the next timestep in the simulation

    # Print final summary of basal insulin delivery per day
    print("\n Basal insulin summary per day:")
    # Iterate through each day and print the total basal insulin delivered for that day
    for day, total in sim_vars["basal_tracker"].items():
        print(f"  Day {day + 1}: {total:.2f} U / {sim_vars['daily_basal_limit']:.2f} U")

[1m1/1[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m0s[0m 174ms/step

Sample 1:
Timestep: 1
Glucose: 81.74 mg/dL
Adjusted Hourly Basal Insulin: 0.30 U per hour
Weight: 75.75 kg
--------------------------------------------------
[1m1/1[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m0s[0m 30ms/step

Sample 16:
Timestep: 16
Glucose: 100.58 mg/dL
Adjusted Hourly Basal Insulin: 1.01 U per hour
Weight: 75.75 kg
--------------------------------------------------
[1m1/1[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m0s[0m 29ms/step

Sample 31:
Timestep: 31
Glucose: 94.79 mg/dL
Adjusted Hourly Basal Insulin: 0.30 U per hour
Weight: 75.75 kg
--------------------------------------------------
[1m1/1[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m0s[0m 28ms/step

Sample 46:
Timestep: 46
Glucose: 173.98 mg/dL
Adjusted Hourly Basal Insulin: 2.00 U per hour
Weight: 75.75 kg
--------------------------------------------------
[1m1/1[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m0s[0m 29ms/