## Import libraries

In [1]:
from datetime import timedelta, datetime
from typing import List
from math import exp, isclose
from typing import List
from datetime import datetime, timedelta
from dataclasses import dataclass

import numpy as np
import matplotlib.pyplot as plt
import pytz


## Functions

### Calculate the decay constant

In [2]:
def calculate_decay_constant(initial_factor_level: float, measured_level: float, time_elapsed: float) -> float:
    return (np.log(measured_level) - np.log(initial_factor_level)) / time_elapsed

### Generate hours as numbers for calculation

In [3]:
def generate_refill_hours(refill_times: List[str], start_of_week: datetime) -> List[float]:
    refill_times_in_datetime_format = generate_refill_times_in_datetime_format(refill_times, start_of_week)
    refill_hours = generate_refill_times_in_hour_format(refill_times_in_datetime_format, start_of_week)
    return refill_hours


def generate_refill_times_in_hour_format(refill_times_in_datetime_format, start_of_week):
    refill_hours_in_numbers = []
    for refill_time_in_datetime in refill_times_in_datetime_format:
        refill_time_in_hour = (refill_time_in_datetime - start_of_week).total_seconds() / 3600
        refill_hours_in_numbers.append(refill_time_in_hour)
    return refill_hours_in_numbers


def generate_refill_times_in_datetime_format(refill_times, start_of_week):
    refill_datetimes = []

    for time_str in refill_times:
        refill_time_in_datetime = parse_refill_time(time_str, start_of_week)
        refill_datetimes.append(refill_time_in_datetime)

    return refill_datetimes


def parse_refill_time(time_str: str, start_of_week: datetime) -> datetime:
    weekdays = ["Monday", "Tuesday", "Wednesday", "Thursday", "Friday", "Saturday", "Sunday"]
    day_part, *time_part = time_str.split()
    day_of_week = weekdays.index(day_part)
    time_part = " ".join(time_part)
    time_obj = datetime.strptime(time_part, "%I:%M %p")
    time_obj = time_obj.replace(year=CURRENT_YEAR, month=CURRENT_MONTH)
    day_datetime = start_of_week + timedelta(days=day_of_week)
    return day_datetime.replace(hour=time_obj.hour, minute=time_obj.minute)

### Calculate the factor levels 

In [4]:
@dataclass
class FactorCalculationParameters:
    initial_factor_level: float
    decay_constant: float
    refill_hours: List[float]
    week_duration: int
        
def get_factor_levels(settings) -> dict:
    hours_in_a_week = 24 * 7

    start_of_week = CET.localize(
        datetime.combine(datetime.now(CET).date() - timedelta(days=datetime.now(CET).date().weekday()),
                         datetime.min.time()))

    decay_constant = calculate_decay_constant(initial_factor_level=settings.initial_factor_level,
                                              measured_level=settings.factor_measured_level, time_elapsed=settings.time_elapsed_until_measurement)

    refill_hours = generate_refill_hours(settings.refill_times, start_of_week)

    week_hours = np.arange(0, hours_in_a_week, 0.1).round(2)
    week_hours = week_hours.tolist()

    level_params = FactorCalculationParameters(
        refill_hours=refill_hours,
        initial_factor_level=settings.initial_factor_level,
        decay_constant=decay_constant,
        week_duration=hours_in_a_week
    )

    levels = calculate_levels(week_hours=week_hours, params=level_params)

    current_time = convert_to_datetime(settings.current_level)
    current_hour = (current_time - start_of_week).total_seconds() / 3600
    current_hour = float(f"{current_hour:.1f}")
    current_factor_level = levels[week_hours.index(current_hour)]
    
    return {
    "hours": week_hours,
    "start_of_week": start_of_week.isoformat(),
    "levels": levels,
    "current_time": current_time.isoformat(),
    "current_factor_level": [current_hour, current_factor_level]
}

In [5]:
def calculate_levels(week_hours: List[float], params: FactorCalculationParameters) -> List[float]:
    levels = []
    peak_value = params.initial_factor_level

    for hour in week_hours:
        if not params.refill_hours:
            levels.append(0.0)
            continue

        if hour < params.refill_hours[0]:
            previous_week_last_refill = params.refill_hours[-1] - params.week_duration
            factor_value = params.initial_factor_level * exp(params.decay_constant * (hour - previous_week_last_refill))
            levels.append(factor_value)
            continue

        for i in range(len(params.refill_hours)):
            if isclose(hour, params.refill_hours[i]):
                if i == 0:
                    previous_week_last_refill = params.refill_hours[-1] - params.week_duration
                    previous_hour = hour - 0.1
                    previous_level_value = peak_value * exp(
                        params.decay_constant * (previous_hour - previous_week_last_refill))
                    level_value = params.initial_factor_level + previous_level_value
                    peak_value = level_value
                    levels.append(level_value)
                else:
                    previous_hour = hour - 0.1
                    last_refill_hour = params.refill_hours[i - 1]
                    previous_value = peak_value * exp(params.decay_constant * (previous_hour - last_refill_hour))
                    level_value = params.initial_factor_level + previous_value
                    peak_value = level_value
                    levels.append(level_value)
                break

            if hour < params.refill_hours[i]:
                if i == 0:
                    previous_week_last_refill = params.refill_hours[-1] - params.week_duration
                    level_value = params.initial_factor_level * exp(
                        params.decay_constant * (hour - previous_week_last_refill))
                    levels.append(level_value)
                else:
                    level_value = peak_value * exp(params.decay_constant * (hour - params.refill_hours[i - 1]))
                    levels.append(level_value)
                break

        if hour > params.refill_hours[-1]:
            levels.append(peak_value * exp(params.decay_constant * (hour - params.refill_hours[-1])))

    return levels

In [6]:
def convert_to_datetime(date_str):
    time_part = ' '.join(date_str.split()[1:])  # Gets "10:05 AM"
    datetime_obj = datetime.strptime(time_part, '%I:%M %p')
    datetime_obj = datetime_obj.replace(year=CURRENT_YEAR, month=CURRENT_MONTH)

    weekday_str = date_str.split()[0]  # Gets "Monday"
    weekdays = ["Monday", "Tuesday", "Wednesday", "Thursday", "Friday", "Saturday", "Sunday"]
    target_weekday = weekdays.index(weekday_str)
    current_weekday = NOW.weekday()

    days_difference = target_weekday - current_weekday
    correct_date = NOW + timedelta(days=days_difference)

    final_datetime = datetime_obj.replace(day=correct_date.day)

    return CET.localize(final_datetime)

### Constants

In [7]:
INITIAL_FACTOR_LEVEL = 60  # maximum factor level reached after infusion

WEEK_DURATION_IN_HOURS = 168
HOUR_INCR = 0.1  # Hour increment for time points

CET = pytz.timezone('Europe/Berlin')

NOW = datetime.now(CET)
NOW_FORMATTED = NOW.strftime("%A %I:%M %p")

CURRENT_YEAR = NOW.year
CURRENT_MONTH = NOW.month

## Measurement data for calculation

### Infusion events

In [8]:
refill_times = [
    "Monday 07:30 AM",
    "Wednesday 02:30 PM",
    "Friday 02:30 PM"
]

### Define measurements

In [9]:
first_measurement = {
    'initial_factor_level': INITIAL_FACTOR_LEVEL,
    'time_elapsed_until_measurement': 30,
    'factor_measured_level': 15,
    'refill_times': refill_times,
    'current_level': NOW_FORMATTED
}

second_measurement = {
    'initial_factor_level': INITIAL_FACTOR_LEVEL,
    'time_elapsed_until_measurement': 72,
    'factor_measured_level': 1,
    'refill_times': refill_times,
    'current_level': NOW_FORMATTED
}

third_measurement = {
    'initial_factor_level': INITIAL_FACTOR_LEVEL,
    'time_elapsed_until_measurement': 50,
    'factor_measured_level': 6,
    'refill_times': refill_times,
    'current_level': NOW_FORMATTED
}


measurements = [
    first_measurement,
    second_measurement,
    third_measurement,
]

### Validate the mean method for measurements

### Calculate the measurement decay constants

In [10]:
decay_constants = []

for measurement in measurements:
    initial = measurement['initial_factor_level']
    time_elapsed = measurement['time_elapsed_until_measurement']
    measured = measurement['factor_measured_level']
    decay_constant = (np.log(measured) - np.log(initial)) / time_elapsed
    decay_constants.append(decay_constant)

    
decay_constants = np.array(decay_constants)

print(f"Decay constants: {decay_constants}")

Decay constants: [-0.04620981 -0.0568659  -0.0460517 ]


### Calculate the mean value 

### 1-2 Mean, Median

In [22]:
mean_decay_constant = np.mean(decay_constants)
median_decay_constant = np.median(decay_constants)

print(f"Mean decay constant: {mean_decay_constant}")
print(f"Median decay constant: {median_decay_constant}")

Mean decay constant: -0.04970913686491326
Median decay constant: -0.04620981203732968


### 3. Huber Regression

In [12]:
from sklearn.linear_model import HuberRegressor

log_initial = np.log(INITIAL_FACTOR_LEVEL)  # ln(C0) is constant since initial_factor_level has always the same value
time_elapsed = np.array([measurement['time_elapsed_until_measurement'] for measurement in measurements])
log_measured = np.array([np.log(measurement['factor_measured_level']) for measurement in measurements])

# Use Huber regression (a robust regression method)
X = time_elapsed.reshape(-1, 1)  # Reshape for sklearn
y = log_initial - log_measured  # Target variable for regression

huber = HuberRegressor().fit(X, y)
huber_decay_constant = huber.coef_[0]


print(f"Huber decay constant: {huber_decay_constant}")

Huber decay constant: 0.06447852046553139


### 4. Theoretical decay constant (from manufacturer)

In [13]:
elocta_hour_decay_constant = (np.log(50) - np.log(100)) / 18
altuviiio_decay_constant = (np.log(10) - np.log(100)) / 168
twelve_hour_decay_constant = (np.log(50) - np.log(100)) / 12

### 5. Measurement decay constants

In [14]:
first_measurement_decay_constant = decay_constants[0]
second_measurement_decay_constant = decay_constants[1]
third_measurement_decay_constant = decay_constants[2]

### Calculate the curve values with the decay constants

In [15]:
time_elapsed_values = np.linspace(0, 72, 72)

In [16]:
median_decay_values = [INITIAL_FACTOR_LEVEL * exp(median_decay_constant * (hour)) for hour in time_elapsed_values]
mean_decay_values = [INITIAL_FACTOR_LEVEL * exp(mean_decay_constant * (hour)) for hour in time_elapsed_values]
huber_decay_values = [INITIAL_FACTOR_LEVEL * exp(huber_decay_constant * (hour)) for hour in time_elapsed_values]

elocta_hour_values = [INITIAL_FACTOR_LEVEL * exp(elocta_hour_decay_constant * (hour)) for hour in time_elapsed_values]
altuviiio_hour_values = [INITIAL_FACTOR_LEVEL * exp(altuviiio_decay_constant * (hour)) for hour in time_elapsed_values]
twelve_hour_values = [INITIAL_FACTOR_LEVEL * exp(twelve_hour_decay_constant * (hour)) for hour in time_elapsed_values]

first_measurement_values = [INITIAL_FACTOR_LEVEL * exp(first_measurement_decay_constant * (hour)) for hour in time_elapsed_values]
second_measurement_values = [INITIAL_FACTOR_LEVEL * exp(second_measurement_decay_constant * (hour)) for hour in time_elapsed_values]
third_measurement_values = [INITIAL_FACTOR_LEVEL * exp(third_measurement_decay_constant * (hour)) for hour in time_elapsed_values]

### Plot

In [17]:
from math import exp
import random

import numpy as np
import plotly.graph_objs as go
from plotly.offline import init_notebook_mode, iplot

# Initialize Plotly to run in offline mode within Jupyter Notebook
init_notebook_mode(connected=True)

median_line_data = go.Scatter(
    x=time_elapsed_values,
    y=median_decay_values,
    mode='lines',
    name='Median Method Decay Constant',
    line=dict(color=f"rgba({random.randint(0, 255)}, {random.randint(0, 255)}, {random.randint(0, 255)}, 0.5)", dash='dot'),
)

mean_line_data = go.Scatter(
    x=time_elapsed_values,
    y=mean_decay_values,
    mode='lines',
    name='Mean Method Decay Constant',
    line=dict(color=f"rgba({random.randint(0, 255)}, {random.randint(0, 255)}, {random.randint(0, 255)}, 0.5)", dash='dash'),
)

huber_line_data = go.Scatter(
    x=time_elapsed_values,
    y=huber_decay_values,
    mode='lines',
    name='Huber Method Decay Constant',
    line=dict(color=f"rgba({random.randint(0, 255)}, {random.randint(0, 255)}, {random.randint(0, 255)}, 0.5)", dash='dashdot'),
)

elocta_data = go.Scatter(
    x=time_elapsed_values,
    y=elocta_hour_values,
    mode='lines',
    name='Elocta Hour Constant',
    line=dict(color=f"rgb({random.randint(0, 255)}, {random.randint(0, 255)}, {random.randint(0, 255)})")
)


twelve_hour_data = go.Scatter(
    x=time_elapsed_values,
    y=twelve_hour_values,
    mode='lines',
    name='Twelve Hour Constant',
    line=dict(color=f"rgb({random.randint(0, 255)}, {random.randint(0, 255)}, {random.randint(0, 255)})")
)

altuviiio_hour_data = go.Scatter(
    x=time_elapsed_values,
    y=altuviiio_hour_values,
    mode='lines',
    name='Altuviiio Hour Constant',
    line=dict(color=f"rgb({random.randint(0, 255)}, {random.randint(0, 255)}, {random.randint(0, 255)})")
)

first_measuremenet_data = go.Scatter(
    x=time_elapsed_values,
    y=first_measurement_values,
    mode='lines',
    name='First Measurement',
    line=dict(color=f"rgb({random.randint(0, 255)}, {random.randint(0, 255)}, {random.randint(0, 255)})")
)

second_measuremenet_data = go.Scatter(
    x=time_elapsed_values,
    y=second_measurement_values,
    mode='lines',
    name='Second Measurement',
    line=dict(color=f"rgb({random.randint(0, 255)}, {random.randint(0, 255)}, {random.randint(0, 255)})")
)

third_measuremenet_data = go.Scatter(
    x=time_elapsed_values,
    y=third_measurement_values,
    mode='lines',
    name='Third Measurement',
    line=dict(color=f"rgb({random.randint(0, 255)}, {random.randint(0, 255)}, {random.randint(0, 255)})")
)


layout = go.Layout(
    title='Factor VIII Decay Constants with Robust Measures',
    xaxis=dict(title='Time Elapsed (hours)'),
    yaxis=dict(title='Decay Constant'),
    showlegend=True
)

# fig = go.Figure(data=[median_line_data, mean_line_data, twelve_hour_data, eighteen_hour_data, new_hour_data], layout=layout)

# fig = go.Figure(data=[median_line_data, mean_line_data], layout=layout)

fig = go.Figure(data=[mean_line_data, median_line_data, first_measuremenet_data, second_measuremenet_data, third_measuremenet_data], layout=layout)



# Print the robust decay constants
print(f"First measurement Decay Constant: {first_measurement_decay_constant}")
print(f"Second measurement Decay Constant: {second_measurement_decay_constant}")
print(f"Third measurement Decay Constant: {third_measurement_decay_constant}")
print(f"")
print(f"Median Decay Constant: {median_decay_constant}")
print(f"Mean Decay Constant: {mean_decay_constant}")
print(f"Huber Decay Constant: {huber_decay_constant}")
print(f"Twelve Hour Decay Constant: {twelve_hour_decay_constant}")
print(f"Eighteen Hour Decay Constant: {elocta_hour_decay_constant}")
print(f"New Constant: {altuviiio_decay_constant}")

First measurement Decay Constant: -0.04620981203732968
Second measurement Decay Constant: -0.05686589669752917
Third measurement Decay Constant: -0.04605170185988091

Median Decay Constant: -0.04620981203732968
Mean Decay Constant: -0.04970913686491326
Huber Decay Constant: 0.06447852046553139
Twelve Hour Decay Constant: -0.05776226504666215
Eighteen Hour Decay Constant: -0.03850817669777477
New Constant: -0.013705863648774083


### Measurements and methods

In [18]:
fig = go.Figure(data=[mean_line_data, median_line_data, huber_line_data, first_measuremenet_data, second_measuremenet_data, third_measuremenet_data], layout=layout)


iplot(fig)

### Define input data for the calculation

In [19]:
@dataclass
class InputData:
    initial_factor_level: float
    time_elapsed_until_measurement: float
    factor_measured_level: int
    refill_times: List[str]
    current_level: str

In [23]:
first_measurement_input_data = InputData(**first_measurement)
second_measurement_input_data = InputData(**second_measurement)
third_measurement_input_data = InputData(**third_measurement)

In [27]:
results_first_measurement = get_factor_levels(settings=first_measurement_input_data)
results_second_measurement = get_factor_levels(settings=second_measurement_input_data)
results_third_measurement = get_factor_levels(settings=third_measurement_input_data)

### Define plotting function

In [28]:
from datetime import datetime, timedelta
import plotly.graph_objs as go
from plotly.offline import init_notebook_mode, iplot

# Initialize Plotly to run in offline mode within Jupyter Notebook
init_notebook_mode(connected=True)

def plot_factor_level_chart(measurements):
    traces = []
    max_y_value = 0
    line_colors = ['rgb(75, 192, 192)', 'rgb(192, 75, 192)', 'rgb(192, 192, 75)', 'rgb(75, 75, 192)', 'rgb(192, 75, 75)']
    marker_colors = ['red', 'blue', 'green', 'purple', 'orange']
    
    for i, data in enumerate(measurements):
        # Convert the ISO 8601 datetime string to a datetime object
        start_of_week = datetime.fromisoformat(data['start_of_week'])
        hours = [start_of_week + timedelta(hours=hour) for hour in data['hours']]
        
        # Create the main trace
        trace = go.Scatter(
            x=hours,
            y=data['levels'],
            mode='lines',
            name=f'Factor Level Over Time {i+1}',
            line=dict(color=line_colors[i % len(line_colors)])
        )
        traces.append(trace)
        
        # Update the maximum Y axis value
        max_y_value = max(max_y_value, max(data['levels']))
        
        # Current factor level marker
        current_factor_level = data['current_factor_level']
        factor_level_x_value = start_of_week + timedelta(hours=current_factor_level[0])
        factor_level_y_value = current_factor_level[1]
        
        factor_level_trace = go.Scatter(
            x=[factor_level_x_value],
            y=[factor_level_y_value],
            mode='markers',
            name=f'Current Factor Level {i+1} ({factor_level_y_value:.2f}%)',
            text=[f'Current Measured Level Marker: {factor_level_y_value:.2f}%'],
            marker=dict(color=marker_colors[i % len(marker_colors)], size=10, symbol='cross')
        )
        traces.append(factor_level_trace)
    
    # Define the maximum Y axis value
    y_axis_upper_limit = max_y_value if max_y_value > 100 else 100
    
    # Define x-axis tickvals and ticktext
    start_of_week = datetime.fromisoformat(measurements[0]['start_of_week'])
    tickvals = []
    ticktext = []
    for i in range(7):
        tick_time = start_of_week + timedelta(days=i)
        tick_time = tick_time.replace(hour=9, minute=30)
        tickvals.append(tick_time)
        ticktext.append(tick_time.strftime('%a %H:%M'))
    
    # Layout settings
    layout = go.Layout(
        xaxis=dict(
            title='Time of the Week',
            type='date',
            tickformat='%a %b %d, %H:%M',
            tickvals=tickvals,
            ticktext=ticktext
        ),
        yaxis=dict(
            title='Factor Level (%)',
            range=[0, y_axis_upper_limit],
            hoverformat='.2f'
        )
    )
    
    # Create the figure and plot
    fig = go.Figure(data=traces, layout=layout)
    iplot(fig)


# Plot the chart with multiple measurements
plot_factor_level_chart([results_first_measurement, results_second_measurement, results_third_measurement])
