## AF detection using RRI variance
Based on this paper https://ieeexplore.ieee.org/stamp/stamp.jsp?tp=&arnumber=1588177

In [23]:
import numpy as np
from concrete import fhe

### Load r_peak x coordinate 

In [24]:
coord_x_rpeaks = np.array(np.load('Pan_Tompkins_QRS_Detection/extracted_data/coord_x_r_peaks_no_107.npy'))
print("--- ECG signal infos ---")
print("List of R peaks: ", coord_x_rpeaks)
print("Number of R peaks: ", len(coord_x_rpeaks))
print("Duration of the signal: ", coord_x_rpeaks[-1]/360, "s")
print("Sampling frequency: 360")
print("------------------------")

--- ECG signal infos ---
List of R peaks:  [  88  210  385  510  701 1022 1335 1636 1934 2231 2537 2849 3163 3477
 3788]
Number of R peaks:  15
Duration of the signal:  10.522222222222222 s
Sampling frequency: 360
------------------------


![image.png](./Pan_Tompkins_QRS_Detection/extracted_data/r_peaks_no_107.png)

In [25]:
def compute_rri(coord_x_rpeaks, verbose=False):
    """
    Compute the R peaks intervals using numpy.diff. It represents the first step of the algorithm

    Parameters
    ----------
    coord_x_rpeaks : list of x coordinates of R peaks
    verbose : if True, print the R peaks intervals

    Returns
    -------
    rri : list of x coordinates of R peaks intervals
    """
    rri = np.diff(coord_x_rpeaks)
    if verbose:
        print("RRI: ", rri)
    return rri

def convert_rri_to_seconds(rri, fs=360, verbose=False):
    """
    Convert RRI to seconds

    Parameters
    ----------
    rri : list of x coordinates of R peaks intervals
    fs : sampling frequency
    verbose : if True, print the R peaks intervals in seconds

    Returns
    -------
    rri : list of x coordinates of R peaks intervals in seconds
    """
    rri_to_seconds = np.divide(rri, fs)
    if verbose:
        print("RRI to seconds: ", rri_to_seconds)
    return rri_to_seconds

def compute_moving_average(rri, verbose=False):
    """
    Compute the moving average of RRI. It represents the part 1 of the second step of the algorithm

    Parameters
    ----------
    rri : list of R peaks intervals in seconds
    verbose : if True, print the moving average of RRI

    Returns
    -------
    moving_average : list of moving average for each RRI
    """
    moving_average = []

    # Compute the moving average of RRI
    for i in range(len(rri)):
        if i == 0:
            moving_average.append(rri[i])
        else:
            moving_average.append(0.75 * moving_average[i-1] + 0.25 * rri[i])
    
    if verbose:
        print("Moving average: ", moving_average)
    
    return moving_average

def compute_moving_average_to_int(rri, verbose=False):
    moving_average = []

    # Compute the moving average of RRI
    for i in range(len(rri)):
        if i == 0:
            moving_average.append(rri[i])
        else:
            moving_average.append((0.75 * moving_average[i-1] + 0.25 * rri[i]).astype(np.int64))
    
    if verbose:
        print("Moving average: ", moving_average)
    
    return moving_average

def normalize_rri(rri, mvg_average, verbose=False):
    """
    Normalize each RRI. It represents the part 2 of the second step of the algorithm

    Parameters
    ----------
    rri : list of R peaks intervals in seconds
    mvg_average : list of moving average for each RRI
    verbose : if True, print the normalized RRI

    Returns
    -------
    rri_norm : list of normalized RRI
    """
    rri_norm = []

    # normalize each rri 
    for i in range(len(rri)):
        rri_norm.append(rri[i]*100 / mvg_average[i])
    
    if verbose:
        print("Normalized RRI: ", rri_norm)
    
    return rri_norm

def compute_rri_variance(rri_norm, verbose=False):
    """
    Compute the variance of the normalized RRI. It represents the third step of the algorithm

    Parameters
    ----------
    rri_norm : list of normalized RRI
    verbose : if True, print the variance of the normalized RRI

    Returns
    -------
    variance : variance of the normalized RRI
    """
    
    # compute the mean of the normalized rri
    mean_rri_norm = np.sum(rri_norm) / len(rri_norm)

    # compute the sum of the formula (the orginal formula use the absolute value before squaring, we should maybe avoid it with FHE)
    sum_formula = np.sum((abs(rri_norm - mean_rri_norm))**2)

    # compute the variance
    variance = sum_formula / (len(rri_norm)-1)

    if verbose:
        print("Mean of the normalized RRI: ", mean_rri_norm)
        print("Variance of the normalized RRI: ", variance)

    return variance

def af_detection(variance, threshold=200, verbose=False):
    """
    Detect AF using the variance of the normalized RRI. 
    According to the article, a typical threshold is 200. 
    It represents the fourth step of the algorithm

    Parameters
    ----------
    variance : variance of the normalized RRI
    threshold : threshold used to detect AF
    verbose : if True, print the details of the AF detection

    Returns
    -------
    af : True if AF is detected, False otherwise
    """

    close_to_threshold = False

    if variance > threshold:
        af = True
        # Check if the variance is close to the threshold
        if variance < threshold*1.1:
            close_to_threshold = True
    else:
        af = False
        # Check if the variance is close to the threshold
        if variance > threshold*0.9:
            close_to_threshold = True

    if verbose:
        print("-------------------")
    
        if close_to_threshold:
            print("WARNING : The variance is close to the threshold")

        if af:
            print("AF detected, the variance is higher than the threshold")
        else:
            print("AF not detected, the variance is lower than the threshold")

        print("Variance: ", variance)
        print("Threshold: ", threshold)
        print("-------------------")
    
    return af


### Implementation of the algorithm described in [this paper](https://ieeexplore.ieee.org/stamp/stamp.jsp?tp=&arnumber=1588177)
Step 5 is not implemented, see my report for more details.

In [29]:
# Step 1
# Fetch the interval between each R peak (RRI)
rri = compute_rri(coord_x_rpeaks)

# Convert RRI to seconds
rri_in_sec = convert_rri_to_seconds(rri,verbose=True)

# Step 2.1
# Compute the moving average of RRI
#mvg_average = compute_moving_average(rri, verbose=True)
mvg_average = compute_moving_average_to_int(rri, verbose=True)

# Step 2.2
# Normalize each RRI
rri_norm = normalize_rri(rri, mvg_average, verbose=True)

# Step 3
# Compute the variance of the normalized RRI
variance = compute_rri_variance(rri_norm, verbose=True)

# Step 4
# Detect AF using the variance of the normalized RRI
af_detected = af_detection(variance, verbose=True)


RRI to seconds:  [0.33888889 0.48611111 0.34722222 0.53055556 0.89166667 0.86944444
 0.83611111 0.82777778 0.825      0.85       0.86666667 0.87222222
 0.87222222 0.86388889]
Moving average:  [122, 135, 132, 146, 189, 220, 240, 254, 264, 274, 283, 290, 296, 299]
Normalized RRI:  [100.0, 129.62962962962962, 94.6969696969697, 130.82191780821918, 169.84126984126985, 142.27272727272728, 125.41666666666667, 117.32283464566929, 112.5, 111.67883211678833, 110.24734982332156, 108.27586206896552, 106.08108108108108, 104.0133779264214]
Mean of the normalized RRI:  118.7713227555521
Variance of the normalized RRI:  388.6966013131729
-------------------
AF detected, the variance is higher than the threshold
Variance:  388.6966013131729
Threshold:  200
-------------------


The mean is typically calculated as x.sum() / N, where N = len(x). If, however, ddof is specified, the divisor N - ddof is used instead. In standard statistical practice, ddof=1 provides an unbiased estimator of the variance of a hypothetical infinite population. ddof=0 provides a maximum likelihood estimate of the variance for normally distributed variables.

In [27]:
# Check with numpy formula if the result is the same
assert variance == np.var(rri_norm, ddof=1)

### Fully Homomorphic Encryption

Detection of AF (Step 4)

In [6]:
@fhe.compiler({"variance": "encrypted", "threshold": "encrypted"})
def af_detection_fhe(variance,threshold):
    return variance > threshold

inputset = [(np.random.randint(1000),200),
            (np.random.randint(1000),200),
            (np.random.randint(1000),200)]

circuit = af_detection_fhe.compile(inputset)

In [7]:
encrypted_data = circuit.run(circuit.encrypt(201,200))

In [8]:
clear_data = circuit.decrypt(encrypted_data)
print(clear_data)

1


Compute variance function (Step 3)

In [10]:
@fhe.compiler({"rri_norm": "encrypted"})
def compute_rri_variance_fhe(rri_norm):

    rri_norm_len = rri_norm.size

    mean_rri_norm = np.sum(rri_norm) // rri_norm_len

    sum_formula = np.sum((abs(rri_norm - mean_rri_norm))**2)

    variance = sum_formula // (rri_norm_len-1)
    
    return variance

rri_norm_max_value = np.max(rri_norm)
rri_norm_max_value_rounded = rri_norm_max_value + (50 - rri_norm_max_value % 50)

rri_norm_list_size = len(rri_norm)

inputset = [np.random.randint(rri_norm_max_value_rounded, size=rri_norm_list_size),
            np.random.randint(rri_norm_max_value_rounded, size=rri_norm_list_size),
            np.random.randint(rri_norm_max_value_rounded, size=rri_norm_list_size)]

circuit = compute_rri_variance_fhe.compile(inputset)

In [11]:
# convert rri_norm list to a list of int
rri_norm_int = [int(np.round(i)) for i in rri_norm]

encrypted_data = circuit.run(circuit.encrypt(rri_norm_int))

In [12]:
clear_data = circuit.decrypt(encrypted_data)
print(clear_data)

390


Normalization (Step 2.2)

That is not possible with this framework, because it does not support division between two encrypted values.

Moving average function (Step 2.1)

In [30]:
@fhe.compiler({"rri": "encrypted"})
def compute_moving_average_fhe(rri):
    
    rri_len = rri.size
    moving_average = fhe.zeros(rri_len)

    moving_average[0] = rri[0]

    for i in range(1, rri_len):
        
        moving_average[i] = np.floor_divide(np.multiply(moving_average[i-1], 3) + rri[i], 4)

    
    return moving_average

rri_max_value = np.max(rri)
rri_max_value_rounded = rri_max_value + (50 - rri_max_value % 50)

rri_list_size = len(rri)

inputset = [np.random.randint(rri_max_value_rounded, size=rri_list_size),
            np.random.randint(rri_max_value_rounded, size=rri_list_size),
            np.random.randint(rri_max_value_rounded, size=rri_list_size)]

circuit = compute_moving_average_fhe.compile(inputset)


In [32]:
encrypted_data = circuit.run(circuit.encrypt(rri))

In [33]:
clear_data = circuit.decrypt(encrypted_data)
print(clear_data)

[122 135 132 146 189 220 240 254 264 274 283 290 296 299]
