In [1]:
import pandas as pd 
import numpy as np 
import plotly.graph_objects as go
import streamlit as st  
from numba import jit
from plotly.subplots import make_subplots
import math 
import numpy as np
import pywt

In [2]:
import pandas as pd
import plotly.graph_objects as go
from plotly.subplots import make_subplots

# Load the data
df = pd.read_csv("s2.csv")

# Define the normalization function
def normalize_signal(signal, min_value=-100, max_value=100):
    signal_min = signal.min()
    signal_max = signal.max()
    # Scale the signal to the range [-100, 100]
    normalized_signal = (signal - signal_min) / (signal_max - signal_min) * (max_value - min_value) + min_value
    return normalized_signal

# Calculate time for each sample based on the sampling rate
sampling_rate = 2000  # Hz
time = df.index / sampling_rate  # Time in seconds

# Filter the rows from 7042 to 9238
filtered_time = time
filtered_gl = df["semg LT LAT.G"]
filtered_vl = df["semg LT LAT.V"]
filtered_baso_lt_foot = df["baso LT FOOT"]

# Normalize each signal
normalized_gl = normalize_signal(filtered_gl)
normalized_vl = normalize_signal(filtered_vl)
normalized_baso_lt_foot = normalize_signal(filtered_baso_lt_foot)

# Create subplots for Gastrocnemius Lateralis (GL), Vastus Lateralis (VL), and baso LT FOOT
fig = make_subplots(rows=3, cols=1, subplot_titles=("baso LT FOOT","Gastrocnemius Lateralis (GL)", "Vastus Lateralis (VL)"))

# baso LT FOOT signals
fig.add_trace(go.Scatter(x=filtered_time[7042:9238], y=normalized_baso_lt_foot[7042:9238], mode="lines", name="baso LT FOOT", line=dict(color="red")), row=1, col=1)

# Gastrocnemius Lateralis (GL) signals
fig.add_trace(go.Scatter(x=filtered_time[7042:9238], y=normalized_gl[7042:9238], mode="lines", name="GL (Left)", line=dict(color="blue")), row=2, col=1)

# Vastus Lateralis (VL) signals
fig.add_trace(go.Scatter(x=filtered_time[7042:9238], y=normalized_vl[7042:9238], mode="lines", name="VL (Left)", line=dict(color="green")), row=3, col=1)

# Update layout for titles and axis labels
fig.update_layout(height=900, width=800, title_text="Normalized Muscle Signals for Gastrocnemius Lateralis (GL), Vastus Lateralis (VL), and baso LT FOOT")
fig.update_xaxes(title_text="Time (seconds)", row=1, col=1)
fig.update_yaxes(title_text="Signal (uV)", range=[-100, 100], row=1, col=1)
fig.update_xaxes(title_text="Time (seconds)", row=2, col=1)
fig.update_yaxes(title_text="Signal (uV)", range=[-100, 100], row=2, col=1)
fig.update_xaxes(title_text="Time (seconds)", row=3, col=1)
fig.update_yaxes(title_text="Signal (uV)", range=[-100, 100], row=3, col=1)

# Show plot
fig.show()


In [3]:
import pandas as pd
import numpy as np
import plotly.graph_objs as go
from plotly.subplots import make_subplots

# Define the Butterworth bandpass filter using highpass and lowpass filters
def butterworth_lowpass_filter(signal, cutoff_frequency, sampling_period, order):
    y = np.zeros(len(signal)) 
    omega_c = 2 * np.pi * cutoff_frequency
    omega_c_squared = omega_c * omega_c
    sampling_period_squared = sampling_period * sampling_period
    if order == 1:
        for n in range(len(signal)):
            if n == 0:
                y[n] = (omega_c * signal[n]) / ((2 / sampling_period) + omega_c)
            else:
                y[n] = (((2 / sampling_period) - omega_c) * y[n-1] + omega_c * signal[n] + omega_c * signal[n-1]) / ((2 / sampling_period) + omega_c)
    elif order == 2:
        y[0] = (omega_c * signal[0]) / ((2 / sampling_period) + omega_c)
        y[1] = (((2 / sampling_period) - omega_c) * y[0] + omega_c * signal[1] + omega_c * signal[0]) / ((2 / sampling_period) + omega_c)
        for n in range(2, len(signal)):
            y[n] = (((8 / sampling_period_squared) - 2 * omega_c_squared) * y[n-1]
                    - ((4 / sampling_period_squared) - (2 * np.sqrt(2) * omega_c / sampling_period) + omega_c_squared) * y[n-2]
                    + omega_c_squared * signal[n]
                    + 2 * omega_c_squared * signal[n-1]
                    + omega_c_squared * signal[n-2]) / ((4 / sampling_period_squared) + (2 * np.sqrt(2) * omega_c / sampling_period) + omega_c_squared)
    return y

def butterworth_highpass_filter(signal, cutoff_frequency, sampling_period, order):
    y = np.zeros(len(signal))  # Initialize the output signal
    omega_c = 2 * np.pi * cutoff_frequency
    omega_c_squared = omega_c * omega_c
    sampling_period_squared = sampling_period * sampling_period

    if order == 1:
        for n in range(len(signal)):
            if n == 0:
                y[n] = (omega_c * signal[n]) / ((2 / sampling_period) + omega_c)
            else:
                y[n] = (((2 / sampling_period) - omega_c) * y[n-1] + (2 / sampling_period) * signal[n] + (2 / sampling_period) * signal[n-1]) / ((2 / sampling_period) + omega_c)

    elif order == 2:
        y[0] = (omega_c * signal[0]) / ((2 / sampling_period) + omega_c)
        y[1] = (((2 / sampling_period) - omega_c) * y[0] + (2 / sampling_period) * signal[1] + (2 / sampling_period) * signal[0]) / ((2 / sampling_period) + omega_c)

        for n in range(2, len(signal)):
            y[n] = ((4 / sampling_period_squared) * signal[n] - (8 / sampling_period_squared) * signal[n-1] + (4 / sampling_period_squared) * signal[n-2]
                    - (2 * omega_c - (8 / sampling_period_squared)) * y[n-1]
                    - (omega_c - (2 * np.sqrt(2) * omega_c / sampling_period) + (4 / sampling_period_squared)) * y[n-2]) / (omega_c + 2 * np.sqrt(2) * omega_c / sampling_period + (4 / sampling_period_squared))    
    return y

def butterworth_bandpass_filter(signal, low_cutoff, high_cutoff, sampling_period, order):
    highpassed_signal = butterworth_highpass_filter(signal, low_cutoff, sampling_period, order)
    bandpassed_signal = butterworth_lowpass_filter(highpassed_signal, high_cutoff, sampling_period, order)
    return bandpassed_signal

# Load the data
df = pd.read_csv("s2.csv")

# Define sampling parameters
sampling_rate = 2000  # Hz
sampling_period = 1 / sampling_rate

# Extract a subset of the data
start, end = 7042, 9238
t = np.arange(len(df)) / sampling_rate  # Time in seconds
signal_gl = df["semg LT LAT.G"].values  # Gastrocnemius Lateralis (GL)
signal_vl = df["semg LT LAT.V"].values  # Vastus Lateralis (VL)

# Filter parameters
cutoff_low = 20    # Lower cutoff frequency in Hz
cutoff_high = 450  # Upper cutoff frequency in Hz
order = 2          # Filter order

# Apply bandpass filter to both signals
df["filtered_gl"] = butterworth_bandpass_filter(signal_gl, cutoff_low, cutoff_high, sampling_period, order)
df["filtered_vl"] = butterworth_bandpass_filter(signal_vl, cutoff_low, cutoff_high, sampling_period, order)

# Select the segment to plot
start_index = 7042
end_index = 9238
time_segment = t[start_index:end_index]
original_gl_segment = df["semg LT LAT.G"].values[start_index:end_index]
original_vl_segment = df["semg LT LAT.V"].values[start_index:end_index]
filtered_gl_segment = filtered_gl[start_index:end_index]
filtered_vl_segment = filtered_vl[start_index:end_index]

# Plot the original and filtered segments for GL and VL
fig = make_subplots(rows=2, cols=2, subplot_titles=("Original GL Signal", "Filtered GL Signal", "Original VL Signal", "Filtered VL Signal"))

# Original GL signal segment
fig.add_trace(go.Scatter(x=time_segment, y=original_gl_segment, mode='lines', name="Original GL", line=dict(color="blue")), row=1, col=1)

# Filtered GL signal segment
fig.add_trace(go.Scatter(x=time_segment, y=filtered_gl_segment, mode='lines', name="Filtered GL", line=dict(color="blue")), row=1, col=2)

# Original VL signal segment
fig.add_trace(go.Scatter(x=time_segment, y=original_vl_segment, mode='lines', name="Original VL", line=dict(color="green")), row=2, col=1)

# Filtered VL signal segment
fig.add_trace(go.Scatter(x=time_segment, y=filtered_vl_segment, mode='lines', name="Filtered VL", line=dict(color="green")), row=2, col=2)

# Update layout for titles and axis labels
fig.update_layout(
    height=700, width=1000,
    title_text="Original and Bandpass Filtered Signal Segments for Gastrocnemius Lateralis (GL) and Vastus Lateralis (VL)"
)
fig.update_xaxes(title_text="Time (seconds)", row=1, col=1)
fig.update_yaxes(title_text="Amplitude (uV)", row=1, col=1)
fig.update_xaxes(title_text="Time (seconds)", row=2, col=1)
fig.update_yaxes(title_text="Amplitude (uV)", row=2, col=1)
fig.update_xaxes(title_text="Time (seconds)", row=1, col=2)
fig.update_yaxes(title_text="Amplitude (uV)", row=1, col=2)
fig.update_xaxes(title_text="Time (seconds)", row=2, col=2)
fig.update_yaxes(title_text="Amplitude (uV)", row=2, col=2)

fig.show()

In [7]:
import numpy as np
import pywt

def denoise_dwt(signal, level=8):
    # Perform wavelet decomposition using db4
    coeffs = pywt.wavedec(signal, 'db4', level=level)
    
    # Calculate noise standard deviation using robust median estimator
    sigma = np.median(np.abs(coeffs[-1])) / 0.6745
    
    # Initialize empty list for modified coefficients
    denoised_coeffs = []
    
    # Threshold multiplication factor (increased for stronger denoising)
    threshold_factor = 20.0  # Meningkatkan dari default sqrt(2)
    
    # Process each decomposition level with adaptive thresholding
    for i, coeff in enumerate(coeffs):
        if i == 0:  # Skip approximation coefficients
            denoised_coeffs.append(coeff)
            continue
            
        # Calculate level-dependent threshold with higher base threshold
        N = len(signal)
        level_factor = 1.5 / np.sqrt(2**(i))  # Increased level factor
        threshold = sigma * threshold_factor * np.sqrt(np.log(N)) * level_factor
        
        # Apply more aggressive thresholding
        coeff_abs = np.abs(coeff)
        sign = np.sign(coeff)
        
        # Increased threshold multiplier for classification
        is_small = coeff_abs <= 2.5 * threshold  # Increased from 2.0
        is_large = coeff_abs > 2.5 * threshold
        
        modified_coeff = np.zeros_like(coeff)
        # Only keep very large coefficients
        modified_coeff[is_large] = coeff[is_large]
        # More aggressive soft thresholding for small coefficients
        modified_coeff[is_small] = sign[is_small] * (coeff_abs[is_small] - 1.5 * threshold) * (coeff_abs[is_small] > 1.5 * threshold)
        
        denoised_coeffs.append(modified_coeff)
    
    # Reconstruct the signal using db4
    denoised_signal = pywt.waverec(denoised_coeffs, 'db4')
    
    # Ensure the output length matches input length
    if len(denoised_signal) > len(signal):
        denoised_signal = denoised_signal[:len(signal)]
    elif len(denoised_signal) < len(signal):
        denoised_signal = np.pad(denoised_signal, (0, len(signal) - len(denoised_signal)), 'edge')
    
    return denoised_signal

# Apply DWT denoising on filtered signals
df["GL_dwt"] = denoise_dwt(df["filtered_gl"])
df["VL_dwt"] = denoise_dwt(df["filtered_vl"])


# Create subplots for each signal, showing both the filtered and DWT-denoised signals
fig = make_subplots(rows=2, cols=1, subplot_titles=("GL Left", "VL Left"))

# GL Left
fig.add_trace(go.Scatter(x=time[7042:9238], y=df["filtered_gl"][7042:9238], mode="lines", name="GL Left (Filtered)", line=dict(color="blue")), row=1, col=1)
fig.add_trace(go.Scatter(x=time[7042:9238], y=df["GL_dwt"][7042:9238], mode="lines", name="GL Left (DWT-Denoised)", line=dict(color="orange")), row=1, col=1)

# VL Left
fig.add_trace(go.Scatter(x=time[7042:9238], y=df["filtered_vl"][7042:9238], mode="lines", name="VL Left (Filtered)", line=dict(color="green")), row=2, col=1)
fig.add_trace(go.Scatter(x=time[7042:9238], y=df["VL_dwt"][7042:9238], mode="lines", name="VL Left (DWT-Denoised)", line=dict(color="red")), row=2, col=1)

# Update layout
fig.update_layout(height=800, width=1000, title_text="Muscle Signals Before and After DWT Denoising")
fig.update_xaxes(title_text="Time (seconds)", row=1, col=1)
fig.update_yaxes(title_text="Signal (uV)", row=1, col=1)
fig.update_xaxes(title_text="Time (seconds)", row=2, col=1)
fig.update_yaxes(title_text="Signal (uV)", row=2, col=1)

# Show plot
fig.show()

In [None]:
from numba import jit

coloumncount = 100
rowcount = 100
a = 0.0001
da = 0.0001
dt = 1/8000
f0 = 0.849
w0 = 2*np.pi* f0

@jit(nopython=True)
def cwt(coloumncount, rowcount, a, da, dt, f0, y):
    w0 = 2 * np.pi * f0
    Ndata = len(y)
    pi = np.pi
    db = (Ndata - 1) * dt / coloumncount
    cwtre = np.zeros((coloumncount, rowcount))
    cwtim = np.zeros((coloumncount, rowcount))
    cwt = np.zeros((coloumncount, rowcount))

    for i in range(rowcount):
        b = 0.0
        for j in range(coloumncount):
            t = 0.0
            cwtre_sum = 0.0
            cwtim_sum = 0.0
            for k in range(Ndata):
                rem = (1 / np.sqrt(a)) * (1 / np.power(pi, 0.25)) * np.exp(-((t - b) / a) ** 2 / 2.0) * np.cos(w0 * (t - b) / a)
                imm = (1 / np.sqrt(a)) * (-1 / np.power(pi, 0.25)) * np.exp(-((t - b) / a) ** 2 / 2.0) * np.sin(w0 * (t - b) / a)
                cwtre_sum += y[k] * rem
                cwtim_sum += y[k] * imm
                t += dt

            cwtre[j, i] = cwtre_sum
            cwtim[j, i] = cwtim_sum
            cwt[j, i] = np.sqrt(cwtre[j, i] ** 2 + cwtim[j, i] ** 2)
            b += db

        a += da
    return cwt

# Run CWT on denoised signals
gl_cwt_result = cwt(coloumncount, rowcount, a, da, dt, f0, df["GL_dwt"][7042:9238].values)
vl_cwt_result = cwt(coloumncount, rowcount, a, da, dt, f0, df["VL_dwt"][7042:9238].values)

In [None]:
X, Y = np.meshgrid(np.arange(rowcount), np.arange(coloumncount))
Z_gl = gl_cwt_result.T  

fig = go.Figure(data=[go.Surface(z=Z_gl, x=X, y=Y, colorscale='Viridis')])
fig.update_layout(title='Continuous Wavelet Transform (CWT) 3D Plot',
                  scene=dict(xaxis_title='Time',
                             yaxis_title='Freq',
                             zaxis_title='Magnitude'),
                  autosize=True)
fig.show()

In [None]:
X, Y = np.meshgrid(np.arange(rowcount), np.arange(coloumncount))
Z_vl = vl_cwt_result.T  

fig = go.Figure(data=[go.Surface(z=Z_vl, x=X, y=Y, colorscale='Viridis')])
fig.update_layout(title='Continuous Wavelet Transform (CWT) 3D Plot',
                  scene=dict(xaxis_title='Time',
                             yaxis_title='Freq',
                             zaxis_title='Magnitude'),
                  autosize=True)
fig.show()

In [None]:

def plot_thresholded_cwt_with_boundaries(Z, threshold_coef=0.1):
    rowcount, colcount = Z.shape
    
    # Cari nilai maksimum dan minimum dari Z
    Z_max = np.max(Z)
    Z_min = np.min(Z)
    
    # Hitung threshold berdasarkan koefisien thresholding
    threshold = Z_min + threshold_coef * Z_max
    
    # Buat matriks thresholded Z (binary)
    Z_tr = np.where(Z > threshold, 1, 0)
    
    # Buat variabel anotate untuk menandai area threshold
    anotate = np.zeros(colcount)
    for x in range(colcount):
        if np.any(Z_tr[:, x] == 1):  
            anotate[x] = 1
    
    # Menghapus noise: jika ada nilai 1 tunggal di antara nilai 0, ubah menjadi 0
    for i in range(1, len(anotate) - 1):
        if anotate[i] == 1 and anotate[i - 1] == 0 and anotate[i + 1] == 0:
            anotate[i] = 0
    
    # Identifikasi batas antara area 0 dan 1
    boundaries = []
    for i in range(1, len(anotate)):
        if anotate[i] != anotate[i - 1]:  # Jika ada perubahan dari 0 ke 1 atau sebaliknya
            boundaries.append(i)

    # Buat subplot dengan 2 heatmap
    fig = make_subplots(
        rows=1, cols=2,
        subplot_titles=('Original CWT', 'Thresholded CWT with Boundaries'),
        horizontal_spacing=0.1
    )
    
    # Plot original CWT
    fig.add_trace(
        go.Heatmap(
            z=Z,
            colorscale='Jet',
            showscale=True,
            colorbar=dict(x=0.45, title='Magnitude')
        ),
        row=1, col=1
    )
    
    # Plot thresholded CWT
    fig.add_trace(
        go.Heatmap(
            z=Z_tr,
            colorscale='Jet',
            showscale=True,
            colorbar=dict(x=1.0, title='Binary (0/1)')
        ),
        row=1, col=2
    )
    
    # Tambahkan garis batas pada plot thresholded CWT
    for boundary in boundaries:
        fig.add_shape(
            type="line",
            x0=boundary, y0=0, x1=boundary, y1=rowcount,
            line=dict(color="red", width=2),
            xref="x2", yref="y2"  # Mengacu ke subplot kedua
        )

    # Update layout
    fig.update_layout(
        title='CWT Analysis: Original vs Thresholded with Boundaries',
        height=600,
        width=1200,
    )
    
    # Update axes labels
    fig.update_xaxes(title_text="Time (columns)", row=1, col=1)
    fig.update_xaxes(title_text="Time (columns)", row=1, col=2)
    fig.update_yaxes(title_text="Frequency (rows)", row=1, col=1)
    fig.update_yaxes(title_text="Frequency (rows)", row=1, col=2)
    
    # Show plot
    fig.show()
    
    return Z_tr, anotate

# Jalankan fungsi dengan data Z yang sudah ada
Z_tr_gl, anotate_gl = plot_thresholded_cwt_with_boundaries(Z_gl, threshold_coef=0.2)
Z_tr_vl, anotate_vl = plot_thresholded_cwt_with_boundaries(Z_vl, threshold_coef=0.15)


In [None]:
# Plot hasil anotate menggunakan Plotly
fig = go.Figure()
fig.add_trace(go.Scatter(
    y=anotate_gl,
    mode='lines+markers',
    line=dict(color='blue'),
    marker=dict(symbol='circle', color='blue')
))

# Pengaturan tampilan plot
fig.update_layout(
    title="Plot of Anotate Values After Noise Thresholding",
    xaxis_title="Index",
    yaxis_title="Anotate Value"
)

fig.show()