## **Importing Packages**

In [1]:
from scipy.io import wavfile
!pip install audiofile
import audiofile
from scipy.signal import butter, filtfilt, lfilter, find_peaks
from scipy.fft import fft, fftshift
import numpy as np
from IPython.display import Audio
import soundfile as sf

Looking in indexes: https://pypi.org/simple, https://us-python.pkg.dev/colab-wheels/public/simple/
Collecting audiofile
  Downloading audiofile-1.2.1-py3-none-any.whl (14 kB)
Collecting audmath>=1.2.1
  Downloading audmath-1.2.1-py3-none-any.whl (10 kB)
Collecting audeer
  Downloading audeer-1.19.0-py3-none-any.whl (20 kB)
Installing collected packages: audmath, audeer, audiofile
Successfully installed audeer-1.19.0 audiofile-1.2.1 audmath-1.2.1


## **Functions**

## **Heart & Lung Sound Signal Separation**

In [2]:
def butter_bandpass(lowcut, highcut, fs, order=5):
    nyq = 0.5*fs
    low = lowcut/nyq
    high = highcut/nyq
    b,a = butter(order, [low, high], btype='band')
    return b, a

# ------------------------------------------------------------------------------
def butter_bandpass_filter(data, lowcut, highcut, fs, order=5):
  b, a = butter_bandpass(lowcut, highcut, fs, order = order)
  y = lfilter(b, a, data)
  return y

In [3]:
def Heart_Lung_Separation(pcg_signal, fs):
  pcg_filtered = butter_bandpass_filter(pcg_signal, 20, 2000, fs, order=5)
  lung_signal = butter_bandpass_filter(pcg_filtered, 100, 1000, fs, order=5)
  heart_signal = pcg_filtered - lung_signal

  # Quality Index of heart signal:
  heart_signal_snr = Heart_SNR(heart_signal, fs)
  heart_quality_ind = Heart_Quality_Index(heart_signal_snr)

  # Quality Index of lung signal:
  lung_signal_snr = Lung_SNR(lung_signal, fs)
  lung_quality_ind = Lung_Quality_Index(lung_signal_snr)
  return [pcg_filtered, heart_signal, lung_signal, heart_quality_ind, lung_quality_ind]

## **Quality Index**

In [4]:
def Lung_SNR(signal, fs):
  FFT = abs(fftshift(fft(signal)))
  f = np.arange(0,fs/2,fs/len(FFT))
  FFT = (FFT[int(len(FFT)/2):])
  ind = [index for index,value in enumerate(f) if value > 50 and value < 2500]
  s1_list = [FFT[i] for i in ind]
  s2_list = np.delete(FFT, ind)
  S1 = sum(s1_list)
  S2 = sum(s2_list)
  snr = S1/S2

  print(' lung SNR: ', snr)
  return snr

# ------------------------------------------------------------------------------
def Heart_SNR(signal, fs):
  FFT = abs(fftshift(fft(signal)))
  f = np.arange(0,fs/2,fs/len(FFT))
  FFT = (FFT[int(len(FFT)/2):])
  ind = [index for index,value in enumerate(f) if value > 20 and value < 1200]
  s1_list = [FFT[i] for i in ind]
  s2_list = np.delete(FFT, ind)
  S1 = sum(s1_list)
  S2 = sum(s2_list)
  snr = S1/S2

  print(' heart SNR: ', snr)
  return snr

# ------------------------------------------------------------------------------
def Lung_Quality_Index(snr):
  max_snr = 25
  if snr > max_snr:
    quality_ind = 100
  elif snr < max_snr:
    quality_ind = snr*100/max_snr
  return quality_ind

# ------------------------------------------------------------------------------
def Heart_Quality_Index(snr):
  max_snr = 8
  if snr > max_snr:
    quality_ind = 100
  elif snr < max_snr:
    quality_ind = snr*100/max_snr
  return quality_ind

### **Butterworth Lowpass and Highpass Filter**



In [5]:
# Lowpass Butterworth filter:
def butterworth_low_pass_filter(original_signal, order, cutoff, fs):
  b, a = butter(order, 2*cutoff/fs, btype = 'low')
  low_pass_filtered_signal = filtfilt(b, a, original_signal)
  return low_pass_filtered_signal

# ------------------------------------------------------------------------------
# Highpass Butterworth filter:
def butterworth_high_pass_filter(original_signal, order, cutoff, fs):
  b, a = butter(order, 2*cutoff/fs, btype = 'high')
  high_pass_filtered_signal = filtfilt(b, a, original_signal)
  return high_pass_filtered_signal

## **Heart Signal Preprocessing**

In [7]:
def heart_signal_preprocessing(heart_signal, fs):
  fc = 100  # Cutoff frequency (Hz)
  # Define the order of the filter (higher order provides better attenuation)
  order = 4
  # Define the normalized cutoff frequency (between 0 and 1, where 1 is the Nyquist frequency)
  w = fc / (fs / 2)
  b, a = butter(order, w, 'low')
  filtered_signal = filtfilt(b, a, heart_signal)
  return filtered_signal

## **Homomorphic Envelope with Hilbert Transform**

In [8]:
from scipy.signal import hilbert

def Homomorphic_Envelope_with_Hilbert(input_signal, fs, lpf_frequency):
  # 8Hz, 1st order, Butterworth LPF
  B_low, A_low = butter(1, 2*lpf_frequency/fs, btype = 'low')

  homomorphic_envelope = np.exp(filtfilt(B_low, A_low, np.log(abs(hilbert(input_signal)))))

  # Remove spurious spikes in first sample:
  homomorphic_envelope[0] = homomorphic_envelope[1]
  return homomorphic_envelope

## **Heart Rate**

In [9]:
def HeartRate(data, fs):
  preprocessed_data = heart_signal_preprocessing(data, fs)
  # Find the homomorphic envelope
  homomorphic_envelope = Homomorphic_Envelope_with_Hilbert(preprocessed_data, fs, 8)
  # Find the autocorrelation:
  y = homomorphic_envelope-np.mean(homomorphic_envelope)
  lags, c = xcorr(y, y, detrend = False, maxlags = len(y)-1)
  signal_autocorrelation = c[len(homomorphic_envelope):]

  min_index = int(0.5*fs)
  max_index = 2*fs

  index = np.argmax(signal_autocorrelation[min_index-1:max_index])
  true_index = index+min_index-1

  heartRate = 60/(true_index/fs)
  return round(heartRate), preprocessed_data

## -----------------------------------------------------------------------------
def xcorr(x, y, normed = True, detrend = False, maxlags = 10):
    # Cross correlation of two signals of equal length
    # Returns the coefficients when normed=True
    # Returns inner products when normed=False
    # Usage: lags, c = xcorr(x,y,maxlags=len(x)-1)
    # Optional detrending e.g. mlab.detrend_mean

    Nx = len(x)
    if Nx != len(y):
        raise ValueError('x and y must be equal length')
    
    if detrend:
        import matplotlib.mlab as mlab
        x = mlab.detrend_mean(np.asarray(x)) # can set your preferences here
        y = mlab.detrend_mean(np.asarray(y))
    
    c = np.correlate(x, y, mode='full')

    if normed:
        n = np.sqrt(np.dot(x, x) * np.dot(y, y)) # this is the transformation function
        c = np.true_divide(c,n)

    if maxlags is None:
        maxlags = Nx - 1

    if maxlags >= Nx or maxlags < 1:
        raise ValueError('maglags must be None or strictly '
                         'positive < %d' % Nx)

    lags = np.arange(-maxlags, maxlags + 1)
    c = c[Nx - 1 - maxlags:Nx + maxlags]
    return lags, c

## **Lung Signal Preprocessing**

In [10]:
def lung_signal_preprocessing(lung_signal, fs):
  # Define the filter parameters
  cutoff = 250 # Hz  100
  order = 4
  # Create a Butterworth high-pass filter
  b, a = butter(order, cutoff/(fs/2), btype='highpass')
  # Apply the filter to the signal
  filtered_signal = filtfilt(b, a, lung_signal)
  return filtered_signal

## **Respiration Rate**

In [11]:
def Respiration_Rate(signal, fs):
  preprocessed_data = lung_signal_preprocessing(signal, fs)
  one_breath = 3*fs  # average duration of one breath is 3s 
  peaks, _ = find_peaks(signal, distance = one_breath)
  time = len(signal)/fs
  RR = len(peaks)*60/time
  return round(RR), preprocessed_data

# **Final Functions for Software Implementation**

In [12]:
def Processes_in_Heart_Position(pcg_signal, fs):
    pcg_filtered, heart_signal, lung_signal, heart_quality_ind, lung_quality_ind = Heart_Lung_Separation(pcg_signal, fs)
    heart_rate, filtered_heart_signal = HeartRate(heart_signal, fs)
    respiration_rate, filtered_lung_signal = Respiration_Rate(lung_signal, fs)
    return pcg_filtered, heart_signal, filtered_heart_signal, lung_signal, filtered_lung_signal, heart_quality_ind, lung_quality_ind, heart_rate, respiration_rate

In [13]:
def Processes_in_Optional_Position(pcg_signal, fs):
    pcg_filtered, heart_signal, lung_signal, heart_quality_ind, lung_quality_ind = Heart_Lung_Separation(pcg_signal, fs)
    filtered_heart_signal = heart_signal_preprocessing(heart_signal, fs)
    filtered_lung_signal = lung_signal_preprocessing(lung_signal, fs)
    return pcg_filtered, heart_signal, filtered_heart_signal, lung_signal, filtered_lung_signal, heart_quality_ind, lung_quality_ind