In [53]:
import json
import numpy as np
import scipy as sp
import pandas as pd
import seaborn as sns
import matplotlib.pyplot as plt
from matplotlib import style
from scipy.signal import butter, filtfilt
from scipy.signal import find_peaks

import warnings
warnings.filterwarnings(action="ignore", module="scipy", message="^internal gelsd")

style.use("ggplot")

In [54]:
sampling_frequency = 50
# 50/7 is chosen for applying average filter to the data each time
average_filter_window_duration_hr = int((1 / 7) * sampling_frequency)
T = 1 / sampling_frequency
save_plots = True

In [55]:
# z-score normalization
# z = x-mean(x)/std(x)
def normalize(data):
    for i in range(0, 3):
        data[:, i] = sp.stats.zscore(data[:, i])
    return data

In [56]:
# average filter
# y = (x1 + x2 + x3 + ... + xn) / n
# window = 50/7 = 7 samples i.e averaging every 7 samples
def apply_average_filter(data, window):
    for i in range(0, 3):
        data[:, i] = np.array(pd.Series(data[:, i]).rolling(window=window).mean())
    return data

In [57]:
# plot one axis data
def plot(data, title, plot_save_path):
    N = len(data)
    y = np.linspace(0, N / sampling_frequency, N)
    sns.set_theme(style="whitegrid", rc={"axes.grid": True, "grid.color": "lightgray"})
    plt.figure(figsize=(15, 6))

    plt.plot(y, data)

    plt.title(title)
    plt.ylabel("ACCELERATION (m/s^2)")
    plt.xlabel("TIME (s)")
    draw_plot(plot_save_path)
    plt.close()


# plot 3 axis data
def plot_all_axis(data0, data1, data2, title, plot_save_path):
    N = len(data0)
    time = np.linspace(0, N / sampling_frequency, N)
    sns.set_theme(style="whitegrid", rc={"axes.grid": True, "grid.color": "lightgray"})

    plt.figure(figsize=(15, 6))
    plt.plot(time, data0, label="X-axis", color="r")
    plt.plot(time, data1, label="Y-axis", color="g")
    plt.plot(time, data2, label="Z-axis", color="b")

    plt.title(title)
    plt.xlabel("Time (s)")
    plt.ylabel("Acceleration (m/s²)")
    plt.legend()
    plt.grid(True)
    draw_plot(plot_save_path)
    plt.close()


# plot fft data
def plot_fft(fft_acc, freqs, plot_save_path):
    sns.set_theme(style="whitegrid", rc={"axes.grid": True, "grid.color": "lightgray"})
    plt.figure(figsize=(15, 6))
    plt.plot(freqs, fft_acc)
    plt.xlabel("Frequency in Hertz [Hz]")
    plt.ylabel("Magnitude")
    plt.title("FFT")
    draw_plot(plot_save_path)
    plt.close()


# draw the plot and save it
def draw_plot(plot_save_path):
    if save_plots:
        plt.savefig(plot_save_path)
    else:
        plt.draw()
        plt.pause(5)

In [58]:
# butter bandpass filter
def butter_bandpass(lowcut, highcut, fs, order):
    nyq = 0.5 * fs
    low = lowcut / nyq
    high = highcut / nyq
    # get the coefficients for the filter
    # b = numerator coefficients, a = denominator coefficients
    b, a = butter(order, [low, high], btype="band")
    return b, a


# bandpass filter
# y = b*x + a*y
def butter_bandpass_filter(data, lowcut, highcut, fs, order):
    b, a = butter_bandpass(lowcut, highcut, fs, order)
    # forward-backward filter to avoid phase distortion
    y = filtfilt(b, a, data)
    return y


# wrapper function for bandpass filter
def apply_bandpass_butterworth_filter(data, low_cutoff_freq, high_cutoff_freq):
    order = 2
    y = butter_bandpass_filter(
        data, low_cutoff_freq, high_cutoff_freq, sampling_frequency, order
    )
    return y

In [59]:
# aggregate all 3 components of acceleration into one
# y = sqrt(x1^2 + x2^2 + x3^2)
def aggregate_components(data):
    return np.array(
        list(map(lambda c: np.sqrt(np.sum(np.square(c))), data)), dtype=np.float64
    )

In [60]:
def FreqTransform(x, low_freqs, fft_len):
    fft_x = np.fft.rfft(x, fft_len)
    # Calculate magnitude of the lower frequencies
    mag_freq_x = np.abs(fft_x)[low_freqs]
    return mag_freq_x, fft_x

In [61]:
def fft(acc_data, plot_save_path):
    acc_data = acc_data[~np.isnan(acc_data)]
    acc_data = sp.signal.detrend(acc_data)
    N = len(acc_data) * 4
    freqs = np.fft.rfftfreq(N, 1 / sampling_frequency)
    low_freqs = (freqs >= (45 / 60)) & (freqs <= (150 / 60))

    mag_freq_acc, fft_acc = FreqTransform(acc_data, low_freqs, N)
    peaks_acc = find_peaks(mag_freq_acc, height=30, distance=1)[0]
    # Sort peaks in order of peak magnitude
    sorted_freq_peaks_acc = sorted(
        peaks_acc, key=lambda j: mag_freq_acc[j], reverse=True
    )
    if len(sorted_freq_peaks_acc) == 0:
        print("No peak found")
        return
    use_peak = sorted_freq_peaks_acc[0]
    chosen_freq = freqs[low_freqs][use_peak]
    prediction = chosen_freq * 60
    plot_fft(fft_acc, freqs, plot_save_path)
    print("Heart rate:", prediction)

In [65]:
def calculate_heart_rate(normalized_data):
  # smoothen the data using average filter
  smooth_data = apply_average_filter(normalized_data, average_filter_window_duration_hr)
  smooth_data = np.array(list(filter(lambda row: np.isfinite(np.sum(row)), smooth_data)), dtype=np.float64)
  plot_all_axis(smooth_data[:,0],smooth_data[:,1],smooth_data[:,2], 'Smoothened Accelerometer Data - HR', 'plots/bio_watch/smoothened_accelerometer_data.png')

  # filter coarse and high frequency noise using bandpass filter
  low_cutoff_freq = 7
  high_cutoff_freq = 13
  smooth_data[:,0] = apply_bandpass_butterworth_filter(smooth_data[:,0], low_cutoff_freq, high_cutoff_freq)
  smooth_data[:,1] = apply_bandpass_butterworth_filter(smooth_data[:,1], low_cutoff_freq, high_cutoff_freq)
  smooth_data[:,2] = apply_bandpass_butterworth_filter(smooth_data[:,2], low_cutoff_freq, high_cutoff_freq)
  plot_all_axis(smooth_data[:,0],smooth_data[:,1],smooth_data[:,2], 'Bandpass-1 Accelerometer Data', 'plots/bio_watch/after_bandpass1.png')

  # aggregate all 3 components of acceleration into one
  aggregated_data = aggregate_components(smooth_data)

  # only consider desired frequency range for heart rate calculation
  high_cutoff_freq = 3.66
  low_cutoff_freq = 0.5
  bandpass2_data = apply_bandpass_butterworth_filter(aggregated_data, low_cutoff_freq, high_cutoff_freq)
  plot(bandpass2_data, 'Pulse wave from Accelerometer Data', 'plots/bio_watch/aggregated_pulse_wave.png')
  # calculate heart rate using fft
  fft(bandpass2_data, 'plots/bio_watch/hr_fft.png')

In [63]:
def bio_watch(data, sampling_freq):
  global sampling_frequency
  sampling_frequency = sampling_freq
  plot_all_axis(data[:,0], data[:,1], data[:,2], 'Raw Accelerometer Data', 'plots/bio_watch/raw_accelerometer_data.png')
  normalized_data = normalize(data)
  plot_all_axis(normalized_data[:,0],normalized_data[:,1],normalized_data[:,2], 'Normalized Accelerometer Data', 'plots/bio_watch/normalized_accelerometer_data.png')
  # calculate heart rate from the normalized data
  calculate_heart_rate(normalized_data)

In [66]:
# Example usage
input_file_path = f"accelerometer_data_from_phone/short_data.txt"
data = pd.read_csv(input_file_path, header=None)
data = data.iloc[:, 1:] # ignore the first column (timestamp)
data = data.values
print("Number of records:", len(data))
bio_watch(data, sampling_frequency)

Number of records: 13288
Heart rate: 79.73196807709684


  return math.isfinite(val)
  return np.asarray(x, float)


In [None]:
# fitbit heart rate data
dates1 = ["03-25", "03-26", "03-27", "03-28", "03-29", "03-30", "03-31", "04-01" , "04-02", "04-03"]
for i in dates1:
    file_path = f'fitbit_hr/heart_rate-2025-{i}.json'
    with open(file_path, 'r') as file:
        data = json.load(file)
        
    counter = 0
    sum = 0
    for i in data:
        sum += i['value']['bpm']
        counter += 1
    print("Average heart rate:", sum/counter)

Average heart rate: 78.35426429240862
Average heart rate: 83.5953370953371
Average heart rate: 79.58814085411538
Average heart rate: 96.88546129677235
Average heart rate: 92.44304152637486
Average heart rate: 95.6619901273058
Average heart rate: 93.11899179366941
Average heart rate: 91.95290944922726
Average heart rate: 115.9402230742419
Average heart rate: 112.15412808641975
