In [1]:
import os
import pandas as pd
import numpy as np
import matplotlib.pyplot as plt
import glob
from pathlib import Path
import re 
from datetime import datetime
import pywt

base_dir = "data"

In [2]:
def find_data_files(base_dir):
    """
    Recursively find all 'data' files (case-insensitive) within the given base directory.
    Returns a list of tuples (folder_name, file_path).
    """
    data_files = []

    # Search for all files in the directory and filter those named "binout" (case-insensitive)
    for file_path in Path(base_dir).rglob("*.txt"):
            folder_name = file_path.parent.name  # Extract folder name
            data_files.append((folder_name, str(file_path)))

    return data_files

def find_pulse_edges(edges, signal, pulse_window):
    """
    Identifies and classifies pulses using derivative-based edge detection and validates pulse windows.

    Parameters:
    edges (array-like): List of detected edge indices.
    signal (array-like): The reference signal (e.g., SHPB strain gauge data).
    pulse_window (float): Expected pulse duration (in index units).

    Returns:
    tuple: (List of valid pulse start indices, Dictionary of all pulse properties)
    """
    rising_edges_start_idx = []
    falling_edges_start_idx = [] 
    window_start_approx = []
    window_end_approx = []
    
    edge_results = {
        "start": [],
        "end": [],
        "n_points": [],
        "direction": []
    }
    
    if len(edges) == 0:
        return window_start_approx, edge_results  # Return empty result if no edges detected

    # Compute first derivative of the signal
    derivative = np.gradient(signal)

    # Sort edges to ensure order
    edges = np.sort(edges)

    # Group consecutive edges
    grouped_edges = []
    current_group = [edges[0]]

    for i in range(1, len(edges)):
        if edges[i] - edges[i - 1] <= 2:  # Allowing small gaps (adjustable)
            current_group.append(edges[i])
        else:
            grouped_edges.append(current_group)
            current_group = [edges[i]]
    
    grouped_edges.append(current_group)  # Add last group

    # Process grouped edges to extract pulse properties
    for group in grouped_edges:
        start_idx = group[0]
        end_idx = group[-1]
        n_points = len(group)

        # Compute average derivative over the pulse range
        avg_slope = np.mean(derivative[start_idx:end_idx])

        # Direction is determined by the sign of the average derivative
        if avg_slope > 0: 
            direction = 1
            rising_edges_start_idx.append(start_idx)
        else: 
            direction = -1
            falling_edges_start_idx.append(start_idx)

        # Store results
        edge_results["start"].append(start_idx)
        edge_results["end"].append(end_idx)
        edge_results["n_points"].append(n_points)
        edge_results["direction"].append(direction)

    # **Improved Logic for Pulse Window Matching**
    for rise_idx in rising_edges_start_idx:
        # Find the closest falling edge that meets pulse window constraints
        valid_falls = [
            fall_idx for fall_idx in falling_edges_start_idx
            if pulse_window * 0.90 <= np.abs(rise_idx - fall_idx) <= pulse_window * 1.30
        ]

        if valid_falls:
            # Select the closest falling edge
            best_fall_idx = min(valid_falls, key=lambda x: abs(rise_idx - x))
            window_start_approx.append(min(rise_idx, best_fall_idx))  # Store the earlier index as the start
            window_end_approx.append(max(rise_idx, best_fall_idx))

    return window_start_approx, window_end_approx

def extract_pulse_window(signal, signal_start_approx, signal_end_approx, pulse_points, pp_extra, negative = True):
    
    search_range = np.arange(int(signal_start_approx-(pulse_points*pp_extra)),
                             int(signal_start_approx+(pulse_points*(1+pp_extra)))
                             ,1)
    
    signal_subset = np.array(signal[search_range])    
    signal_gradient = np.gradient(signal_subset)

    if negative:
        signal_start_slope = np.argmin(signal_gradient)
        
    else: 
        signal_start_slope = np.argmax(signal_gradient)
        
    signal_start_zero = np.where(np.isclose(signal_subset[:signal_start_slope], 0, atol=5e-5))[0][-1]
    signal_end_zero = np.where(np.isclose(signal_subset[signal_start_slope:], 0, atol=11e-5))[0][0]
    
    window_range = np.arange(int(signal_start_zero), int(signal_start_slope + signal_end_zero), 1)    
    extracted_signal = signal_subset[window_range]
    time_start_idx = int(signal_start_approx-(pulse_points*pp_extra) + signal_start_zero)
    
    return extracted_signal, time_start_idx


In [3]:
data_files = find_data_files(base_dir)
print(f"Found {len(data_files)} tests for analysis")

Found 28 tests for analysis


In [25]:
wavelet = 'gaus1'
scales = np.arange(1, 300, 2)  # Multi-scale analysis 
pulse_data_points = 9850 # from previous analysis in database 

pulse_rise_data = pd.DataFrame()

for test_paths in data_files:
    folder_date, data_path = test_paths

    strain_data = pd.read_csv(data_path, sep="\t")
    strain_data.drop(index=0, inplace=True)

    incident_raw = strain_data["Channel A"].astype(np.float32)
    transmitted_raw = strain_data["Channel B"].astype(np.float32)
    time = strain_data["Time"].astype(np.float32)

    # Compute CWT
    incident_coeffs, _ = pywt.cwt(incident_raw, scales, wavelet)
    transmitted_coeffs, _ = pywt.cwt(transmitted_raw, scales, wavelet)

    # Find strong edges by taking the absolute max of wavelet coefficients
    incident_edges = np.where(np.abs(incident_coeffs).max(axis=0) > np.percentile(np.abs(incident_coeffs), 98.5))[0]
    transmitted_edges = np.where(np.abs(transmitted_coeffs).max(axis=0) > np.percentile(np.abs(transmitted_coeffs), 98.5))[0]

    incident_start_approx, incident_end_approx = find_pulse_edges(incident_edges, incident_raw,
                                                              pulse_data_points)
    transmitted_start_approx, transmitted_end_approx = find_pulse_edges(transmitted_edges, transmitted_raw ,
                                                                        pulse_data_points)

    dt = np.mean(np.diff(time))
    try:
        incident_start = np.where(np.isclose(incident_raw[:incident_start_approx[0]], 0, atol=0.3))[0][-1]
        incident_pulse_gradient = np.gradient(incident_raw[incident_start:incident_start+pulse_data_points])
        incident_min = np.argmin(incident_pulse_gradient)
        incident_peak =  np.where(np.isclose(incident_pulse_gradient[incident_min:], 0, atol=5e-6))[0][0]
        incident_rise_time = dt * (incident_min + incident_peak)
        
    
        transmitted_start = np.where(np.isclose(transmitted_raw[:transmitted_start_approx[0]], 0, atol=0.3))[0][-1]
        transmitted_pulse_gradient = np.gradient(transmitted_raw[transmitted_start:transmitted_start+pulse_data_points])
        transmitted_min = np.argmin(transmitted_pulse_gradient)
        transmitted_peak =  np.where(np.isclose(transmitted_pulse_gradient[transmitted_min:], 0, atol=5e-6))[0][0]
        transmitted_rise_time = dt * (transmitted_min + transmitted_peak)
    
        results = pd.DataFrame({
            "Date": [folder_date],
            "Test": [data_path.split("\\")[-1]],
            "Incident Rise Time (ms)": [incident_rise_time],
            "Transmitted Rise Time (ms)": [transmitted_rise_time]       })    

        pulse_rise_data = pd.concat([pulse_rise_data, results], axis=0, ignore_index=True) 
    
            # Create figure and axis
        fig, ax = plt.subplots(figsize=(11, 6))
        
        # Plot raw signals
        ax.plot(time, incident_raw, label="Incident", color="royalblue", linewidth=2)
        ax.plot(time, transmitted_raw, label="Transmitted", color="red", linewidth=2)
        
        # Plot approximate start and end points
        ax.scatter(time[incident_start], incident_raw[incident_start], color="purple", s=100, label="Incident Start",
                   edgecolors="black", zorder=4)
        ax.scatter(time[incident_start + incident_min + incident_peak ], incident_raw[incident_start + incident_min + incident_peak], color="darkturquoise",
                   s=100, label="Incident End", edgecolors="black", zorder=4)
        ax.scatter(time[transmitted_start], transmitted_raw[transmitted_start], color="plum", s=100, label="Transmitted Start",
                   edgecolors="black", zorder=4)
        ax.scatter(time[transmitted_start + transmitted_min + transmitted_peak], transmitted_raw[transmitted_start+ transmitted_min + transmitted_peak],
                   color="salmon", s=100, label="Transmitted End", edgecolors="black", zorder=4)
        
        # Customize plot appearance
        ax.set_title("Rise Time Calculations on Raw Pulse Data", fontsize=18)
        ax.set_xlabel("Time (ms)", fontsize=14)
        ax.set_ylabel("Voltage", fontsize=14)
        ax.grid(True, linestyle="--", linewidth=0.5, color="grey")
        ax.legend(fontsize=12, frameon=True, edgecolor="black")
    
        figure_folder = "figures"
        os.makedirs(figure_folder, exist_ok=True)
        plt.savefig(os.path.join(figure_folder, folder_date + "_"+data_path.split('\\')[-1].split('.txt')[0] + ".png"))
        plt.close()
        
    except Exception as e:
        print(data_path.split('\\')[-1] + " not processed.")
        print(e)


Elastic_Test01.txt not processed.
list index out of range
Elastic_Test02.txt not processed.
list index out of range
Elastic_Test03.txt not processed.
list index out of range
Elastic_Test04.txt not processed.
list index out of range
Elastic_Test05.txt not processed.
list index out of range
Elastic_Test06.txt not processed.
list index out of range
Elastic_Test10.txt not processed.
list index out of range


In [26]:
pulse_rise_data.to_csv( "pulse_rise_data.csv")