In [None]:
import numpy as np 
import pandas as pd 
import os
import matplotlib.pyplot as plt
import csv
import pywt
from scipy import stats
from scipy.stats import pearsonr
from sklearn.metrics import mean_squared_error, r2_score, mean_absolute_error

# Seed NumPy's global random generator so the synthetic noise is repeatable
np.random.seed(42)

# Notebook-wide matplotlib defaults, applied in a single update
plt.rcParams.update({
    "figure.figsize": (15, 6),
    'lines.linewidth': 1,
    'lines.color': 'b',
    'axes.grid': True,
})

# Function for denoising ECG signals using wavelet transform
def denoise(data, wavelet='sym4', threshold=0.04):
    """Denoise a 1-D signal by thresholding its wavelet detail coefficients.

    The signal is decomposed to the maximum level allowed for its length,
    every detail band is passed through ``pywt.threshold`` (default mode)
    with a cut-off of ``threshold`` times the band's largest absolute
    coefficient, and the signal is reconstructed from the result.

    Parameters
    ----------
    data : array-like
        One-dimensional signal to denoise.
    wavelet : str, optional
        Name of the wavelet used for decomposition and reconstruction.
    threshold : float, optional
        Fraction of each band's peak magnitude used as the cut-off.

    Returns
    -------
    numpy.ndarray
        Reconstructed signal, trimmed to the length of ``data``.
    """
    data = np.asarray(data)
    w = pywt.Wavelet(wavelet)
    maxlev = pywt.dwt_max_level(len(data), w.dec_len)

    coeffs = pywt.wavedec(data, w, level=maxlev)
    # coeffs[0] is the approximation band and is left untouched
    for i in range(1, len(coeffs)):
        # Scale by the peak *magnitude*: the signed maximum is not a valid
        # cut-off and turns negative when a band has no positive coefficient.
        peak = np.max(np.abs(coeffs[i]))
        if peak > 0:  # an all-zero band has nothing to threshold
            coeffs[i] = pywt.threshold(coeffs[i], threshold * peak)

    datarec = pywt.waverec(coeffs, w)

    # The reconstruction can be one sample longer than an odd-length input;
    # trim so the result lines up sample-for-sample with the original.
    return datarec[:len(data)]

# Function to add different types of noise to ECG signals
def add_noise(data, noise_type="gaussian", noise_level=0.01):
    """Return a copy of ``data`` corrupted with synthetic noise.

    Parameters
    ----------
    data : array-like
        Signal to corrupt; converted with ``np.asarray``.
    noise_type : str
        One of ``"gaussian"``, ``"salt_pepper"``, ``"speckle"``,
        ``"uniform"`` or ``"exponential"``. Any other value adds no noise.
    noise_level : float
        Standard deviation (gaussian, speckle), impulse probability
        (salt_pepper), half-width (uniform) or scale (exponential).

    Returns
    -------
    numpy.ndarray
        ``data + noise``, same shape as ``data``.
    """
    data = np.asarray(data)
    if noise_type == "gaussian":
        noise = np.random.normal(0, noise_level, data.shape)
    elif noise_type == "salt_pepper":
        # 0/1 mask selecting the corrupted samples, then a random sign for each
        noise = np.random.choice([0, 1], size=data.shape, p=[1 - noise_level, noise_level])
        noise = noise * (np.random.choice([-1, 1], size=data.shape))
    elif noise_type == "speckle":
        # Multiplicative noise: only the data-proportional term is the noise.
        # Including `data` here as well would double the signal on return.
        noise = data * np.random.normal(0, noise_level, data.shape)
    elif noise_type == "uniform":
        noise = np.random.uniform(-noise_level, noise_level, data.shape)
    elif noise_type == "exponential":
        noise = np.random.exponential(noise_level, data.shape)
    else:
        # Unknown noise types leave the signal unchanged
        noise = np.zeros_like(data)
    return data + noise

# Directory holding the MIT-BIH Arrhythmia Database export
path = '/kaggle/input/mitbit-arrhythmia-database/mitbih_database/'
window_size = 1000  # Window size for segmenting ECG signals

# One (original, noisy, denoised) tuple is collected here per record
X = []

# Files directly inside the dataset directory, in sorted order
filenames = sorted(next(os.walk(path))[2])

# '.csv' files are the signal records; every other file is kept as an annotation
records = [path + name for name in filenames if os.path.splitext(name)[1] == '.csv']
annotations = [path + name for name in filenames if os.path.splitext(name)[1] != '.csv']

# Load each record, then normalise, corrupt and denoise it
for record_path in records:
    with open(record_path, 'rt') as csvfile:
        reader = csv.reader(csvfile, delimiter=',', quotechar='|')
        # The first row is skipped (presumably the header); column 1 of every
        # following row is parsed as an integer sample
        samples = [int(row[1]) for n, row in enumerate(reader) if n > 0]

    signals = stats.zscore(samples)

    # Corrupt the normalised signal with Gaussian noise, then try to recover it
    noisy_signals = add_noise(signals, noise_type="gaussian", noise_level=0.05)
    denoised_signals = denoise(noisy_signals)

    # Keep all three versions for plotting and evaluation
    X.append((signals, noisy_signals, denoised_signals))

# Visualize noise addition and removal for the first record: one stacked
# panel per signal version, limited to the first 3000 samples.
# (The figure is built once; a second identical copy and a trailing
# tight_layout()/show() pair that acted on an empty figure were removed.)
first_record = X[0]
panel_titles = ('Original Signal', 'Noisy Signal', 'Denoised Signal')

plt.figure(figsize=(15, 18))
for position, (trace, title) in enumerate(zip(first_record, panel_titles), start=1):
    plt.subplot(3, 1, position)
    plt.plot(trace[:3000])
    plt.title(title)

plt.tight_layout()
plt.show()

# Visualize comparison of original, noisy, and denoised signals in a single plot
plt.figure(figsize=(15, 6))  # Change the figure size as needed

plt.plot(first_record[0][:3000], label='Original Signal', color='blue')
plt.plot(first_record[1][:3000], label='Noisy Signal', color='red', alpha=0.7)
plt.plot(first_record[2][:3000], label='Denoised Signal', color='green', alpha=0.7)
plt.legend()
plt.title('Comparison of Original, Noisy, and Denoised Signals')
plt.xlabel('Time')
plt.ylabel('Amplitude')

plt.show()

# Evaluate the denoising performance
# Gather each version of every record into its own array (one entry per record)
original_signals = np.array([versions[0] for versions in X])
noisy_signals = np.array([versions[1] for versions in X])
denoised_signals = np.array([versions[2] for versions in X])

# Flatten once and reuse the 1-D copies for every metric below
reference_flat = original_signals.flatten()
noisy_flat = noisy_signals.flatten()
denoised_flat = denoised_signals.flatten()

# Metrics of the noisy input measured against the original signal
mse_noisy = mean_squared_error(reference_flat, noisy_flat)
r2_noisy = r2_score(reference_flat, noisy_flat)
mae_noisy = mean_absolute_error(reference_flat, noisy_flat)
corr_noisy, _ = pearsonr(reference_flat, noisy_flat)

# The same metrics for the denoised output
mse_denoised = mean_squared_error(reference_flat, denoised_flat)
r2_denoised = r2_score(reference_flat, denoised_flat)
mae_denoised = mean_absolute_error(reference_flat, denoised_flat)
corr_denoised, _ = pearsonr(reference_flat, denoised_flat)

print(f'MSE (Noisy Signals): {mse_noisy}')
print(f'MSE (Denoised Signals): {mse_denoised}')
print(f'R^2 (Noisy Signals): {r2_noisy}')
print(f'R^2 (Denoised Signals): {r2_denoised}')
print(f'MAE (Noisy Signals): {mae_noisy}')
print(f'MAE (Denoised Signals): {mae_denoised}')
print(f'Correlation (Noisy Signals): {corr_noisy}')
print(f'Correlation (Denoised Signals): {corr_denoised}')

# Tabulate the metrics: one row per metric, one column per signal version
metrics_df = pd.DataFrame({
    'Metric': ['MSE', 'R2', 'MAE', 'Correlation'],
    'Noisy': [mse_noisy, r2_noisy, mae_noisy, corr_noisy],
    'Denoised': [mse_denoised, r2_denoised, mae_denoised, corr_denoised]
})

# One bar chart per metric on a 2x2 grid, filled in row-major order
fig, axes = plt.subplots(nrows=2, ncols=2, figsize=(15, 10))
for ax, (_, row) in zip(axes.flat, metrics_df.iterrows()):
    metric_name = row['Metric']
    ax.bar(['Noisy', 'Denoised'], row[['Noisy', 'Denoised']], color=['red', 'green'])
    ax.set_title(metric_name)
    ax.set_ylabel(metric_name)

plt.tight_layout()
plt.show()

# Histogram of how many samples each loaded record contains
signal_lengths = [len(versions[0]) for versions in X]
plt.figure(figsize=(10, 6))
hist_ax = plt.gca()
hist_ax.hist(signal_lengths, bins=50, color='blue', edgecolor='black')
hist_ax.set_title('Distribution of Signal Lengths in the Dataset')
hist_ax.set_xlabel('Signal Length')
hist_ax.set_ylabel('Frequency')
plt.show()



In [None]:
import numpy as np 
import pandas as pd 
import os
import matplotlib.pyplot as plt
import csv
import pywt
from scipy import stats
from scipy.stats import pearsonr
from sklearn.metrics import mean_squared_error, r2_score, mean_absolute_error

# Seed NumPy's global random generator so the synthetic noise is repeatable
np.random.seed(42)

# Notebook-wide matplotlib defaults, applied in a single update
plt.rcParams.update({
    "figure.figsize": (15, 6),
    'lines.linewidth': 1,
    'lines.color': 'b',
    'axes.grid': True,
})

# Function for denoising ECG signals using wavelet transform
def denoise(data, wavelet='sym4', threshold=0.04):
    """Denoise a 1-D signal by thresholding its wavelet detail coefficients.

    The signal is decomposed to the maximum level allowed for its length,
    every detail band is passed through ``pywt.threshold`` (default mode)
    with a cut-off of ``threshold`` times the band's largest absolute
    coefficient, and the signal is reconstructed from the result.

    Parameters
    ----------
    data : array-like
        One-dimensional signal to denoise.
    wavelet : str, optional
        Name of the wavelet used for decomposition and reconstruction.
    threshold : float, optional
        Fraction of each band's peak magnitude used as the cut-off.

    Returns
    -------
    numpy.ndarray
        Reconstructed signal, trimmed to the length of ``data``.
    """
    data = np.asarray(data)
    w = pywt.Wavelet(wavelet)
    maxlev = pywt.dwt_max_level(len(data), w.dec_len)

    coeffs = pywt.wavedec(data, w, level=maxlev)
    # coeffs[0] is the approximation band and is left untouched
    for i in range(1, len(coeffs)):
        # Scale by the peak *magnitude*: the signed maximum is not a valid
        # cut-off and turns negative when a band has no positive coefficient.
        peak = np.max(np.abs(coeffs[i]))
        if peak > 0:  # an all-zero band has nothing to threshold
            coeffs[i] = pywt.threshold(coeffs[i], threshold * peak)

    datarec = pywt.waverec(coeffs, w)

    # The reconstruction can be one sample longer than an odd-length input;
    # trim so the result lines up sample-for-sample with the original.
    return datarec[:len(data)]

# Function to add different types of noise to ECG signals
def add_noise(data, noise_type="gaussian", noise_level=0.01):
    """Return a copy of ``data`` corrupted with synthetic noise.

    Parameters
    ----------
    data : array-like
        Signal to corrupt; converted with ``np.asarray``.
    noise_type : str
        One of ``"gaussian"``, ``"salt_pepper"``, ``"speckle"``,
        ``"uniform"`` or ``"exponential"``. Any other value adds no noise.
    noise_level : float
        Standard deviation (gaussian, speckle), impulse probability
        (salt_pepper), half-width (uniform) or scale (exponential).

    Returns
    -------
    numpy.ndarray
        ``data + noise``, same shape as ``data``.
    """
    data = np.asarray(data)
    if noise_type == "gaussian":
        noise = np.random.normal(0, noise_level, data.shape)
    elif noise_type == "salt_pepper":
        # 0/1 mask selecting the corrupted samples, then a random sign for each
        noise = np.random.choice([0, 1], size=data.shape, p=[1 - noise_level, noise_level])
        noise = noise * (np.random.choice([-1, 1], size=data.shape))
    elif noise_type == "speckle":
        # Multiplicative noise: only the data-proportional term is the noise.
        # Including `data` here as well would double the signal on return.
        noise = data * np.random.normal(0, noise_level, data.shape)
    elif noise_type == "uniform":
        noise = np.random.uniform(-noise_level, noise_level, data.shape)
    elif noise_type == "exponential":
        noise = np.random.exponential(noise_level, data.shape)
    else:
        # Unknown noise types leave the signal unchanged
        noise = np.zeros_like(data)
    return data + noise

# Directory holding the MIT-BIH Arrhythmia Database export
path = '/kaggle/input/mitbit-arrhythmia-database/mitbih_database/'
window_size = 1000  # Window size for segmenting ECG signals

# One (original, noisy, denoised) tuple is collected here per record
X = []

# Files directly inside the dataset directory, in sorted order
filenames = sorted(next(os.walk(path))[2])

# '.csv' files are the signal records; every other file is kept as an annotation
records = [path + name for name in filenames if os.path.splitext(name)[1] == '.csv']
annotations = [path + name for name in filenames if os.path.splitext(name)[1] != '.csv']

# Load each record, then normalise, corrupt and denoise it
for record_path in records:
    with open(record_path, 'rt') as csvfile:
        reader = csv.reader(csvfile, delimiter=',', quotechar='|')
        # The first row is skipped (presumably the header); column 1 of every
        # following row is parsed as an integer sample
        samples = [int(row[1]) for n, row in enumerate(reader) if n > 0]

    signals = stats.zscore(samples)

    # Corrupt the normalised signal with Gaussian noise, then try to recover it
    noisy_signals = add_noise(signals, noise_type="gaussian", noise_level=0.05)
    denoised_signals = denoise(noisy_signals)

    # Keep all three versions for plotting and evaluation
    X.append((signals, noisy_signals, denoised_signals))

# Show the first record's three versions as stacked panels (first 3000 samples)
first_record = X[0]
panel_titles = ('Original Signal', 'Noisy Signal', 'Denoised Signal')

plt.figure(figsize=(15, 18))
for position, (trace, title) in enumerate(zip(first_record, panel_titles), start=1):
    plt.subplot(3, 1, position)
    plt.plot(trace[:3000])
    plt.title(title)

plt.tight_layout()
plt.show()

# Overlay the same three traces on a single set of axes for direct comparison
overlay_styles = (
    ('Original Signal', {'color': 'blue'}),
    ('Noisy Signal', {'color': 'red', 'alpha': 0.7}),
    ('Denoised Signal', {'color': 'green', 'alpha': 0.7}),
)

plt.figure(figsize=(15, 6))  # Change the figure size as needed
for trace, (label, style) in zip(first_record, overlay_styles):
    plt.plot(trace[:3000], label=label, **style)
plt.legend()
plt.title('Comparison of Original, Noisy, and Denoised Signals')
plt.xlabel('Time')
plt.ylabel('Amplitude')

plt.show()

# Evaluate the denoising performance
# Gather each version of every record into its own array (one entry per record)
original_signals = np.array([versions[0] for versions in X])
noisy_signals = np.array([versions[1] for versions in X])
denoised_signals = np.array([versions[2] for versions in X])

# Flatten once and reuse the 1-D copies for every metric below
reference_flat = original_signals.flatten()
noisy_flat = noisy_signals.flatten()
denoised_flat = denoised_signals.flatten()

# Metrics of the noisy input measured against the original signal
mse_noisy = mean_squared_error(reference_flat, noisy_flat)
r2_noisy = r2_score(reference_flat, noisy_flat)
mae_noisy = mean_absolute_error(reference_flat, noisy_flat)
corr_noisy, _ = pearsonr(reference_flat, noisy_flat)

# The same metrics for the denoised output
mse_denoised = mean_squared_error(reference_flat, denoised_flat)
r2_denoised = r2_score(reference_flat, denoised_flat)
mae_denoised = mean_absolute_error(reference_flat, denoised_flat)
corr_denoised, _ = pearsonr(reference_flat, denoised_flat)

print(f'MSE (Noisy Signals): {mse_noisy}')
print(f'MSE (Denoised Signals): {mse_denoised}')
print(f'R^2 (Noisy Signals): {r2_noisy}')
print(f'R^2 (Denoised Signals): {r2_denoised}')
print(f'MAE (Noisy Signals): {mae_noisy}')
print(f'MAE (Denoised Signals): {mae_denoised}')
print(f'Correlation (Noisy Signals): {corr_noisy}')
print(f'Correlation (Denoised Signals): {corr_denoised}')

# Tabulate the metrics: one row per metric, one column per signal version
metrics_df = pd.DataFrame({
    'Metric': ['MSE', 'R2', 'MAE', 'Correlation'],
    'Noisy': [mse_noisy, r2_noisy, mae_noisy, corr_noisy],
    'Denoised': [mse_denoised, r2_denoised, mae_denoised, corr_denoised]
})

# One bar chart per metric on a 2x2 grid, filled in row-major order
fig, axes = plt.subplots(nrows=2, ncols=2, figsize=(15, 10))
for ax, (_, row) in zip(axes.flat, metrics_df.iterrows()):
    metric_name = row['Metric']
    ax.bar(['Noisy', 'Denoised'], row[['Noisy', 'Denoised']], color=['red', 'green'])
    ax.set_title(metric_name)
    ax.set_ylabel(metric_name)

plt.tight_layout()
plt.show()

# Histogram of how many samples each loaded record contains
signal_lengths = [len(versions[0]) for versions in X]
plt.figure(figsize=(10, 6))
hist_ax = plt.gca()
hist_ax.hist(signal_lengths, bins=50, color='blue', edgecolor='black')
hist_ax.set_title('Distribution of Signal Lengths in the Dataset')
hist_ax.set_xlabel('Signal Length')
hist_ax.set_ylabel('Frequency')
plt.show()


In [None]:
import numpy as np 
import pandas as pd 
import os
import matplotlib.pyplot as plt
import csv
import pywt
from scipy import stats
from scipy.stats import pearsonr
from sklearn.metrics import mean_squared_error, r2_score, mean_absolute_error

# Seed NumPy's global random generator so the synthetic noise is repeatable
np.random.seed(42)

# Notebook-wide matplotlib defaults, applied in a single update
plt.rcParams.update({
    "figure.figsize": (15, 6),
    'lines.linewidth': 1,
    'lines.color': 'b',
    'axes.grid': True,
})

# Function for denoising ECG signals using wavelet transform
def denoise(data, wavelet='sym4', threshold=0.04):
    """Denoise a 1-D signal by thresholding its wavelet detail coefficients.

    The signal is decomposed to the maximum level allowed for its length,
    every detail band is passed through ``pywt.threshold`` (default mode)
    with a cut-off of ``threshold`` times the band's largest absolute
    coefficient, and the signal is reconstructed from the result.

    Parameters
    ----------
    data : array-like
        One-dimensional signal to denoise.
    wavelet : str, optional
        Name of the wavelet used for decomposition and reconstruction.
    threshold : float, optional
        Fraction of each band's peak magnitude used as the cut-off.

    Returns
    -------
    numpy.ndarray
        Reconstructed signal, trimmed to the length of ``data``.
    """
    data = np.asarray(data)
    w = pywt.Wavelet(wavelet)
    maxlev = pywt.dwt_max_level(len(data), w.dec_len)

    coeffs = pywt.wavedec(data, w, level=maxlev)
    # coeffs[0] is the approximation band and is left untouched
    for i in range(1, len(coeffs)):
        # Scale by the peak *magnitude*: the signed maximum is not a valid
        # cut-off and turns negative when a band has no positive coefficient.
        peak = np.max(np.abs(coeffs[i]))
        if peak > 0:  # an all-zero band has nothing to threshold
            coeffs[i] = pywt.threshold(coeffs[i], threshold * peak)

    datarec = pywt.waverec(coeffs, w)

    # The reconstruction can be one sample longer than an odd-length input;
    # trim so the result lines up sample-for-sample with the original.
    return datarec[:len(data)]

# Function to add different types of noise to ECG signals
def add_noise(data, noise_type="gaussian", noise_level=0.01):
    """Return a copy of ``data`` corrupted with synthetic noise.

    Parameters
    ----------
    data : array-like
        Signal to corrupt; converted with ``np.asarray``.
    noise_type : str
        One of ``"gaussian"``, ``"salt_pepper"``, ``"speckle"``,
        ``"uniform"`` or ``"exponential"``. Any other value adds no noise.
    noise_level : float
        Standard deviation (gaussian, speckle), impulse probability
        (salt_pepper), half-width (uniform) or scale (exponential).

    Returns
    -------
    numpy.ndarray
        ``data + noise``, same shape as ``data``.
    """
    data = np.asarray(data)
    if noise_type == "gaussian":
        noise = np.random.normal(0, noise_level, data.shape)
    elif noise_type == "salt_pepper":
        # 0/1 mask selecting the corrupted samples, then a random sign for each
        noise = np.random.choice([0, 1], size=data.shape, p=[1 - noise_level, noise_level])
        noise = noise * (np.random.choice([-1, 1], size=data.shape))
    elif noise_type == "speckle":
        # Multiplicative noise: only the data-proportional term is the noise.
        # Including `data` here as well would double the signal on return.
        noise = data * np.random.normal(0, noise_level, data.shape)
    elif noise_type == "uniform":
        noise = np.random.uniform(-noise_level, noise_level, data.shape)
    elif noise_type == "exponential":
        noise = np.random.exponential(noise_level, data.shape)
    else:
        # Unknown noise types leave the signal unchanged
        noise = np.zeros_like(data)
    return data + noise

# Directory holding the MIT-BIH Arrhythmia Database export
path = '/kaggle/input/mitbit-arrhythmia-database/mitbih_database/'
window_size = 1000  # Window size for segmenting ECG signals

# One (original, noisy, denoised) tuple is collected here per record
X = []

# Files directly inside the dataset directory, in sorted order
filenames = sorted(next(os.walk(path))[2])

# '.csv' files are the signal records; every other file is kept as an annotation
records = [path + name for name in filenames if os.path.splitext(name)[1] == '.csv']
annotations = [path + name for name in filenames if os.path.splitext(name)[1] != '.csv']

# Load each record, then normalise, corrupt and denoise it
for record_path in records:
    with open(record_path, 'rt') as csvfile:
        reader = csv.reader(csvfile, delimiter=',', quotechar='|')
        # The first row is skipped (presumably the header); column 1 of every
        # following row is parsed as an integer sample
        samples = [int(row[1]) for n, row in enumerate(reader) if n > 0]

    signals = stats.zscore(samples)

    # Corrupt the normalised signal with Gaussian noise, then try to recover it
    noisy_signals = add_noise(signals, noise_type="gaussian", noise_level=0.05)
    denoised_signals = denoise(noisy_signals)

    # Keep all three versions for plotting and evaluation
    X.append((signals, noisy_signals, denoised_signals))

# Show the first record's three versions as stacked panels (first 3000 samples)
panel_titles = ('Original Signal', 'Noisy Signal', 'Denoised Signal')

plt.figure(figsize=(15, 18))
for position, (trace, title) in enumerate(zip(X[0], panel_titles), start=1):
    plt.subplot(3, 1, position)
    plt.plot(trace[:3000])
    plt.title(title)

plt.tight_layout()
plt.show()

# Evaluate the denoising performance
# Gather each version of every record into its own array (one entry per record)
original_signals = np.array([versions[0] for versions in X])
noisy_signals = np.array([versions[1] for versions in X])
denoised_signals = np.array([versions[2] for versions in X])

# Flatten once and reuse the 1-D copies for every metric below
reference_flat = original_signals.flatten()
noisy_flat = noisy_signals.flatten()
denoised_flat = denoised_signals.flatten()

# Metrics of the noisy input measured against the original signal
mse_noisy = mean_squared_error(reference_flat, noisy_flat)
r2_noisy = r2_score(reference_flat, noisy_flat)
mae_noisy = mean_absolute_error(reference_flat, noisy_flat)
corr_noisy, _ = pearsonr(reference_flat, noisy_flat)

# The same metrics for the denoised output
mse_denoised = mean_squared_error(reference_flat, denoised_flat)
r2_denoised = r2_score(reference_flat, denoised_flat)
mae_denoised = mean_absolute_error(reference_flat, denoised_flat)
corr_denoised, _ = pearsonr(reference_flat, denoised_flat)

print(f'MSE (Noisy Signals): {mse_noisy}')
print(f'MSE (Denoised Signals): {mse_denoised}')
print(f'R^2 (Noisy Signals): {r2_noisy}')
print(f'R^2 (Denoised Signals): {r2_denoised}')
print(f'MAE (Noisy Signals): {mae_noisy}')
print(f'MAE (Denoised Signals): {mae_denoised}')
print(f'Correlation (Noisy Signals): {corr_noisy}')
print(f'Correlation (Denoised Signals): {corr_denoised}')

# Tabulate the metrics: one row per metric, one column per signal version
metrics_df = pd.DataFrame({
    'Metric': ['MSE', 'R2', 'MAE', 'Correlation'],
    'Noisy': [mse_noisy, r2_noisy, mae_noisy, corr_noisy],
    'Denoised': [mse_denoised, r2_denoised, mae_denoised, corr_denoised]
})

# One bar chart per metric on a 2x2 grid, filled in row-major order
fig, axes = plt.subplots(nrows=2, ncols=2, figsize=(15, 10))
for ax, (_, row) in zip(axes.flat, metrics_df.iterrows()):
    metric_name = row['Metric']
    ax.bar(['Noisy', 'Denoised'], row[['Noisy', 'Denoised']], color=['red', 'green'])
    ax.set_title(metric_name)
    ax.set_ylabel(metric_name)

plt.tight_layout()
plt.show()

# Histogram of how many samples each loaded record contains
signal_lengths = [len(versions[0]) for versions in X]
plt.figure(figsize=(10, 6))
hist_ax = plt.gca()
hist_ax.hist(signal_lengths, bins=50, color='blue', edgecolor='black')
hist_ax.set_title('Distribution of Signal Lengths in the Dataset')
hist_ax.set_xlabel('Signal Length')
hist_ax.set_ylabel('Frequency')
plt.show()

# Pie chart of signal counts per data type
data_types = ['Original', 'Noisy', 'Denoised']
# Each record contributes one signal of every type, so all three counts are len(X)
data_counts = [len(X)] * len(data_types)

plt.figure(figsize=(8, 8))
pie_ax = plt.gca()
pie_ax.pie(data_counts, labels=data_types, autopct='%1.1f%%', startangle=140)
pie_ax.set_title('Distribution of Dataset by Data Type')
pie_ax.axis('equal')  # Equal aspect ratio keeps the pie circular
plt.show()

In [None]:
import numpy as np 
import pandas as pd 
import os
import matplotlib.pyplot as plt
import csv
import pywt
from scipy import stats
from scipy.stats import pearsonr
from sklearn.metrics import mean_squared_error, r2_score, mean_absolute_error

# Seed NumPy's global random generator so the synthetic noise is repeatable
np.random.seed(42)

# Notebook-wide matplotlib defaults, applied in a single update
plt.rcParams.update({
    "figure.figsize": (15, 6),
    'lines.linewidth': 1,
    'lines.color': 'b',
    'axes.grid': True,
})

# Function for denoising ECG signals using wavelet transform
def denoise(data, wavelet='sym4', threshold=0.04):
    """Denoise a 1-D signal by thresholding its wavelet detail coefficients.

    The signal is decomposed to the maximum level allowed for its length,
    every detail band is passed through ``pywt.threshold`` (default mode)
    with a cut-off of ``threshold`` times the band's largest absolute
    coefficient, and the signal is reconstructed from the result.

    Parameters
    ----------
    data : array-like
        One-dimensional signal to denoise.
    wavelet : str, optional
        Name of the wavelet used for decomposition and reconstruction.
    threshold : float, optional
        Fraction of each band's peak magnitude used as the cut-off.

    Returns
    -------
    numpy.ndarray
        Reconstructed signal, trimmed to the length of ``data``.
    """
    data = np.asarray(data)
    w = pywt.Wavelet(wavelet)
    maxlev = pywt.dwt_max_level(len(data), w.dec_len)

    coeffs = pywt.wavedec(data, w, level=maxlev)
    # coeffs[0] is the approximation band and is left untouched
    for i in range(1, len(coeffs)):
        # Scale by the peak *magnitude*: the signed maximum is not a valid
        # cut-off and turns negative when a band has no positive coefficient.
        peak = np.max(np.abs(coeffs[i]))
        if peak > 0:  # an all-zero band has nothing to threshold
            coeffs[i] = pywt.threshold(coeffs[i], threshold * peak)

    datarec = pywt.waverec(coeffs, w)

    # The reconstruction can be one sample longer than an odd-length input;
    # trim so the result lines up sample-for-sample with the original.
    return datarec[:len(data)]

# Function to add different types of noise to ECG signals
def add_noise(data, noise_type="gaussian", noise_level=0.01):
    """Return a copy of ``data`` corrupted with synthetic noise.

    Parameters
    ----------
    data : array-like
        Signal to corrupt; converted with ``np.asarray``.
    noise_type : str
        One of ``"gaussian"``, ``"salt_pepper"``, ``"speckle"``,
        ``"uniform"`` or ``"exponential"``. Any other value adds no noise.
    noise_level : float
        Standard deviation (gaussian, speckle), impulse probability
        (salt_pepper), half-width (uniform) or scale (exponential).

    Returns
    -------
    numpy.ndarray
        ``data + noise``, same shape as ``data``.
    """
    data = np.asarray(data)
    if noise_type == "gaussian":
        noise = np.random.normal(0, noise_level, data.shape)
    elif noise_type == "salt_pepper":
        # 0/1 mask selecting the corrupted samples, then a random sign for each
        noise = np.random.choice([0, 1], size=data.shape, p=[1 - noise_level, noise_level])
        noise = noise * (np.random.choice([-1, 1], size=data.shape))
    elif noise_type == "speckle":
        # Multiplicative noise: only the data-proportional term is the noise.
        # Including `data` here as well would double the signal on return.
        noise = data * np.random.normal(0, noise_level, data.shape)
    elif noise_type == "uniform":
        noise = np.random.uniform(-noise_level, noise_level, data.shape)
    elif noise_type == "exponential":
        noise = np.random.exponential(noise_level, data.shape)
    else:
        # Unknown noise types leave the signal unchanged
        noise = np.zeros_like(data)
    return data + noise

# Directory holding the MIT-BIH Arrhythmia Database export
path = '/kaggle/input/mitbit-arrhythmia-database/mitbih_database/'
window_size = 1000  # Window size for segmenting ECG signals

# One (original, noisy, denoised) tuple is collected here per record
X = []

# Files directly inside the dataset directory, in sorted order
filenames = sorted(next(os.walk(path))[2])

# '.csv' files are the signal records; every other file is kept as an annotation
records = [path + name for name in filenames if os.path.splitext(name)[1] == '.csv']
annotations = [path + name for name in filenames if os.path.splitext(name)[1] != '.csv']

# Load each record, then normalise, corrupt and denoise it
for record_path in records:
    with open(record_path, 'rt') as csvfile:
        reader = csv.reader(csvfile, delimiter=',', quotechar='|')
        # The first row is skipped (presumably the header); column 1 of every
        # following row is parsed as an integer sample
        samples = [int(row[1]) for n, row in enumerate(reader) if n > 0]

    signals = stats.zscore(samples)

    # Corrupt the normalised signal with Gaussian noise, then try to recover it
    noisy_signals = add_noise(signals, noise_type="gaussian", noise_level=0.05)
    denoised_signals = denoise(noisy_signals)

    # Keep all three versions for plotting and evaluation
    X.append((signals, noisy_signals, denoised_signals))

# Show the first record's three versions as stacked panels (first 3000 samples)
panel_titles = ('Original Signal', 'Noisy Signal', 'Denoised Signal')

plt.figure(figsize=(15, 18))
for position, (trace, title) in enumerate(zip(X[0], panel_titles), start=1):
    plt.subplot(3, 1, position)
    plt.plot(trace[:3000])
    plt.title(title)

plt.tight_layout()
plt.show()

# Evaluate the denoising performance
# Gather each version of every record into its own array (one entry per record)
original_signals = np.array([versions[0] for versions in X])
noisy_signals = np.array([versions[1] for versions in X])
denoised_signals = np.array([versions[2] for versions in X])

# Flatten once and reuse the 1-D copies for every metric below
reference_flat = original_signals.flatten()
noisy_flat = noisy_signals.flatten()
denoised_flat = denoised_signals.flatten()

# Metrics of the noisy input measured against the original signal
mse_noisy = mean_squared_error(reference_flat, noisy_flat)
r2_noisy = r2_score(reference_flat, noisy_flat)
mae_noisy = mean_absolute_error(reference_flat, noisy_flat)
corr_noisy, _ = pearsonr(reference_flat, noisy_flat)

# The same metrics for the denoised output
mse_denoised = mean_squared_error(reference_flat, denoised_flat)
r2_denoised = r2_score(reference_flat, denoised_flat)
mae_denoised = mean_absolute_error(reference_flat, denoised_flat)
corr_denoised, _ = pearsonr(reference_flat, denoised_flat)

print(f'MSE (Noisy Signals): {mse_noisy}')
print(f'MSE (Denoised Signals): {mse_denoised}')
print(f'R^2 (Noisy Signals): {r2_noisy}')
print(f'R^2 (Denoised Signals): {r2_denoised}')
print(f'MAE (Noisy Signals): {mae_noisy}')
print(f'MAE (Denoised Signals): {mae_denoised}')
print(f'Correlation (Noisy Signals): {corr_noisy}')
print(f'Correlation (Denoised Signals): {corr_denoised}')

# Tabulate the metrics: one row per metric, one column per signal version
metrics_df = pd.DataFrame({
    'Metric': ['MSE', 'R2', 'MAE', 'Correlation'],
    'Noisy': [mse_noisy, r2_noisy, mae_noisy, corr_noisy],
    'Denoised': [mse_denoised, r2_denoised, mae_denoised, corr_denoised]
})

# One bar chart per metric on a 2x2 grid, filled in row-major order
fig, axes = plt.subplots(nrows=2, ncols=2, figsize=(15, 10))
for ax, (_, row) in zip(axes.flat, metrics_df.iterrows()):
    metric_name = row['Metric']
    ax.bar(['Noisy', 'Denoised'], row[['Noisy', 'Denoised']], color=['red', 'green'])
    ax.set_title(metric_name)
    ax.set_ylabel(metric_name)

plt.tight_layout()
plt.show()

# Histogram of how many samples each loaded record contains
signal_lengths = [len(versions[0]) for versions in X]
plt.figure(figsize=(10, 6))
hist_ax = plt.gca()
hist_ax.hist(signal_lengths, bins=50, color='blue', edgecolor='black')
hist_ax.set_title('Distribution of Signal Lengths in the Dataset')
hist_ax.set_xlabel('Signal Length')
hist_ax.set_ylabel('Frequency')
plt.show()

# Pie chart comparing the number of record files with the number of other files
labels = ['Records', 'Annotations']
sizes = [len(records), len(annotations)]
plt.figure(figsize=(8, 6))
pie_ax = plt.gca()
pie_ax.pie(sizes, labels=labels, autopct='%1.1f%%', startangle=140, colors=['skyblue', 'lightgreen'])
pie_ax.set_title('Distribution of Data Types in the Dataset')
pie_ax.axis('equal')  # Equal aspect ratio keeps the pie circular
plt.show()


In [None]:
import numpy as np 
import pandas as pd 
import os
import matplotlib.pyplot as plt
import csv
import pywt
from scipy import stats
from scipy.stats import pearsonr
from sklearn.metrics import mean_squared_error, r2_score, mean_absolute_error

# Seed NumPy's global random generator so the synthetic noise is repeatable
np.random.seed(42)

# Notebook-wide matplotlib defaults, applied in a single update
plt.rcParams.update({
    "figure.figsize": (15, 6),
    'lines.linewidth': 1,
    'lines.color': 'b',
    'axes.grid': True,
})

# Function for denoising ECG signals using wavelet transform
def denoise(data, wavelet='sym4', threshold=0.04):
    """Denoise a 1-D signal by thresholding its wavelet detail coefficients.

    The signal is decomposed to the maximum level allowed for its length,
    every detail band is passed through ``pywt.threshold`` (default mode)
    with a cut-off of ``threshold`` times the band's largest absolute
    coefficient, and the signal is reconstructed from the result.

    Parameters
    ----------
    data : array-like
        One-dimensional signal to denoise.
    wavelet : str, optional
        Name of the wavelet used for decomposition and reconstruction.
    threshold : float, optional
        Fraction of each band's peak magnitude used as the cut-off.

    Returns
    -------
    numpy.ndarray
        Reconstructed signal, trimmed to the length of ``data``.
    """
    data = np.asarray(data)
    w = pywt.Wavelet(wavelet)
    maxlev = pywt.dwt_max_level(len(data), w.dec_len)

    coeffs = pywt.wavedec(data, w, level=maxlev)
    # coeffs[0] is the approximation band and is left untouched
    for i in range(1, len(coeffs)):
        # Scale by the peak *magnitude*: the signed maximum is not a valid
        # cut-off and turns negative when a band has no positive coefficient.
        peak = np.max(np.abs(coeffs[i]))
        if peak > 0:  # an all-zero band has nothing to threshold
            coeffs[i] = pywt.threshold(coeffs[i], threshold * peak)

    datarec = pywt.waverec(coeffs, w)

    # The reconstruction can be one sample longer than an odd-length input;
    # trim so the result lines up sample-for-sample with the original.
    return datarec[:len(data)]

# Function to add different types of noise to ECG signals
def add_noise(data, noise_type="gaussian", noise_level=0.01):
    """Return a copy of ``data`` corrupted with synthetic noise.

    Parameters
    ----------
    data : array-like
        Signal to corrupt; converted with ``np.asarray``.
    noise_type : str
        One of ``"gaussian"``, ``"salt_pepper"``, ``"speckle"``,
        ``"uniform"`` or ``"exponential"``. Any other value adds no noise.
    noise_level : float
        Standard deviation (gaussian, speckle), impulse probability
        (salt_pepper), half-width (uniform) or scale (exponential).

    Returns
    -------
    numpy.ndarray
        ``data + noise``, same shape as ``data``.
    """
    data = np.asarray(data)
    if noise_type == "gaussian":
        noise = np.random.normal(0, noise_level, data.shape)
    elif noise_type == "salt_pepper":
        # 0/1 mask selecting the corrupted samples, then a random sign for each
        noise = np.random.choice([0, 1], size=data.shape, p=[1 - noise_level, noise_level])
        noise = noise * (np.random.choice([-1, 1], size=data.shape))
    elif noise_type == "speckle":
        # Multiplicative noise: only the data-proportional term is the noise.
        # Including `data` here as well would double the signal on return.
        noise = data * np.random.normal(0, noise_level, data.shape)
    elif noise_type == "uniform":
        noise = np.random.uniform(-noise_level, noise_level, data.shape)
    elif noise_type == "exponential":
        noise = np.random.exponential(noise_level, data.shape)
    else:
        # Unknown noise types leave the signal unchanged
        noise = np.zeros_like(data)
    return data + noise

# Directory holding the MIT-BIH Arrhythmia Database export
path = '/kaggle/input/mitbit-arrhythmia-database/mitbih_database/'
window_size = 1000  # Window size for segmenting ECG signals

# One (original, noisy, denoised) tuple is collected here per record
X = []

# Files directly inside the dataset directory, in sorted order
filenames = sorted(next(os.walk(path))[2])

# '.csv' files are the signal records; every other file is kept as an annotation
records = [path + name for name in filenames if os.path.splitext(name)[1] == '.csv']
annotations = [path + name for name in filenames if os.path.splitext(name)[1] != '.csv']

# Load each record, then normalise, corrupt and denoise it
for record_path in records:
    with open(record_path, 'rt') as csvfile:
        reader = csv.reader(csvfile, delimiter=',', quotechar='|')
        # The first row is skipped (presumably the header); column 1 of every
        # following row is parsed as an integer sample
        samples = [int(row[1]) for n, row in enumerate(reader) if n > 0]

    signals = stats.zscore(samples)

    # Corrupt the normalised signal with Gaussian noise, then try to recover it
    noisy_signals = add_noise(signals, noise_type="gaussian", noise_level=0.05)
    denoised_signals = denoise(noisy_signals)

    # Keep all three versions for plotting and evaluation
    X.append((signals, noisy_signals, denoised_signals))

# Grid of plots: one row per record, one column per signal version
plt.figure(figsize=(15, 18))

# Plot up to 5 records, or fewer when not that many were loaded
num_signals_to_plot = min(5, len(X))
column_titles = ('Original Signal', 'Noisy Signal', 'Denoised Signal')

for row_idx, versions in enumerate(X[:num_signals_to_plot]):
    for col_idx, (trace, title) in enumerate(zip(versions, column_titles)):
        # Subplot indices are 1-based and run left-to-right, top-to-bottom
        plt.subplot(num_signals_to_plot, 3, 3 * row_idx + col_idx + 1)
        plt.plot(trace[:3000])
        # Only the top row carries the column headings
        if row_idx == 0:
            plt.title(title)

plt.tight_layout()
plt.show()

# Evaluate the denoising performance
# Gather each version of every record into its own array (one entry per record)
original_signals = np.array([versions[0] for versions in X])
noisy_signals = np.array([versions[1] for versions in X])
denoised_signals = np.array([versions[2] for versions in X])

# Flatten once and reuse the 1-D copies for every metric below
reference_flat = original_signals.flatten()
noisy_flat = noisy_signals.flatten()
denoised_flat = denoised_signals.flatten()

# Metrics of the noisy input measured against the original signal
mse_noisy = mean_squared_error(reference_flat, noisy_flat)
r2_noisy = r2_score(reference_flat, noisy_flat)
mae_noisy = mean_absolute_error(reference_flat, noisy_flat)
corr_noisy, _ = pearsonr(reference_flat, noisy_flat)

# The same metrics for the denoised output
mse_denoised = mean_squared_error(reference_flat, denoised_flat)
r2_denoised = r2_score(reference_flat, denoised_flat)
mae_denoised = mean_absolute_error(reference_flat, denoised_flat)
corr_denoised, _ = pearsonr(reference_flat, denoised_flat)

print(f'MSE (Noisy Signals): {mse_noisy}')
print(f'MSE (Denoised Signals): {mse_denoised}')
print(f'R^2 (Noisy Signals): {r2_noisy}')
print(f'R^2 (Denoised Signals): {r2_denoised}')
print(f'MAE (Noisy Signals): {mae_noisy}')
print(f'MAE (Denoised Signals): {mae_denoised}')
print(f'Correlation (Noisy Signals): {corr_noisy}')
print(f'Correlation (Denoised Signals): {corr_denoised}')

# Tabulate the metrics: one row per metric, one column per signal version
metrics_df = pd.DataFrame({
    'Metric': ['MSE', 'R2', 'MAE', 'Correlation'],
    'Noisy': [mse_noisy, r2_noisy, mae_noisy, corr_noisy],
    'Denoised': [mse_denoised, r2_denoised, mae_denoised, corr_denoised]
})

# One bar chart per metric on a 2x2 grid, filled in row-major order
fig, axes = plt.subplots(nrows=2, ncols=2, figsize=(15, 10))
for ax, (_, row) in zip(axes.flat, metrics_df.iterrows()):
    metric_name = row['Metric']
    ax.bar(['Noisy', 'Denoised'], row[['Noisy', 'Denoised']], color=['red', 'green'])
    ax.set_title(metric_name)
    ax.set_ylabel(metric_name)

plt.tight_layout()
plt.show()

# Histogram of how many samples each loaded record contains
signal_lengths = [len(versions[0]) for versions in X]
plt.figure(figsize=(10, 6))
hist_ax = plt.gca()
hist_ax.hist(signal_lengths, bins=50, color='blue', edgecolor='black')
hist_ax.set_title('Distribution of Signal Lengths in the Dataset')
hist_ax.set_xlabel('Signal Length')
hist_ax.set_ylabel('Frequency')
plt.show()

# Pie chart comparing the number of record files with the number of other files
labels = ['Records', 'Annotations']
sizes = [len(records), len(annotations)]
plt.figure(figsize=(8, 6))
pie_ax = plt.gca()
pie_ax.pie(sizes, labels=labels, autopct='%1.1f%%', startangle=140, colors=['skyblue', 'lightgreen'])
pie_ax.set_title('Distribution of Data Types in the Dataset')
pie_ax.axis('equal')  # Equal aspect ratio keeps the pie circular
plt.show()
