In [18]:
import sys
sys.path.append(r"C:\Users\hjia9\Documents\GitHub\data-analysis\read")
sys.path.append(r"C:\Users\hjia9\Documents\GitHub\data-analysis")

import numpy as np
import time

from read_scope_data import read_trc_data
from data_analysis_utils import Photons, PhotonPulse, hl_envelopes_idx
from plot_utils import plot_counts_per_bin

import matplotlib.pyplot as plt
from scipy.ndimage import gaussian_filter1d
from scipy.signal import fftconvolve, savgol_filter, find_peaks, find_peaks_cwt

from plot_utils import plot_original_and_baseline, plot_subtracted_signal
%matplotlib qt

In [37]:
ifn = r"E:\x-ray\20241102\C3--E-ring-p24-z13-x40-xray--00034.trc"
xray_data, tarr_x = read_trc_data(ifn)
print(xray_data.shape)

Reading data...
Done
(25000002,)


In [42]:
time_ms = tarr_x * 1000
filtered_xray_data = savgol_filter(xray_data, window_length=51, polyorder=3)

start_idx = 10000000
end_idx = 10500000

plt.figure(0)
plt.plot(time_ms[start_idx:end_idx], xray_data[start_idx:end_idx], label='Original')
plt.plot(time_ms[start_idx:end_idx], filtered_xray_data[start_idx:end_idx], label='Filtered')
plt.legend(loc='upper right')
plt.show()

In [52]:
def analyze_downsample_options(data, tarr, window_length=51, polyorder=3, min_timescale_ms=1e-3):
    """
    Analyze different downsample rates and their effect on filtered data.
    
    Args:
        data: Input signal array
        times: Time array in milliseconds
        window_length: Savitzky-Golay filter window length
        polyorder: Savitzky-Golay filter polynomial order
        min_timescale_ms: Minimum timescale to preserve in milliseconds
    
    Returns:
        int: Recommended downsample rate
    """
    # Calculate sampling rate
    dt = tarr[1] - tarr[0]
    min_samples = min_timescale_ms / dt  # minimum samples needed
    
    # Original filtered data
    filtered_orig = savgol_filter(data, window_length, polyorder)
    
    # Try different downsample rates
    rates = [2, 5, 10, 20, 50]
    errors = []
    
    print(f"\nAnalyzing downsample rates (minimum {min_samples:.1e} samples needed for {min_timescale_ms:.1e} ms features):")


    
    for rate in rates:
        # Skip rates that would undersample the minimum timescale
        if rate > min_samples/2:  # Nyquist criterion
            print(f"Rate {rate:2d}: Too high for {min_timescale_ms:.1e} ms features")
            continue
            
        # Downsample, filter, then upsample
        downsampled = data[::rate]
        filtered_down = savgol_filter(downsampled, 
                                    max(3, window_length//rate), 
                                    min(polyorder, (window_length//rate)-1))
        # Interpolate back to original size
        filtered_up = np.interp(np.arange(len(data)), 
                              np.arange(len(downsampled))*rate, 
                              filtered_down)
        
        # Compare with original
        error = np.abs(filtered_orig - filtered_up).mean()
        errors.append((rate, error))
        print(f"Rate {rate:2d}: Mean error = {error:.2e}, Samples in min_timescale = {min_samples/rate:.1f}")
    
    if errors:
        # Find best rate that preserves features
        best_rate = min(errors, key=lambda x: x[1])[0]
        print(f"\nRecommended downsample rate: {best_rate}")
        return best_rate
    else:
        print("\nNo suitable downsample rates found for given timescale")
        return 1

In [90]:
min_ts = 0.5e-3
analyze_downsample_options(filtered_xray_data, time_ms, window_length=51, polyorder=3, min_timescale_ms=min_ts)


Analyzing downsample rates (minimum 2.5e+02 samples needed for 5.0e-04 ms features):
Rate  2: Mean error = 1.55e-05, Samples in min_timescale = 125.0
Rate  5: Mean error = 2.40e-04, Samples in min_timescale = 50.0
Rate 10: Mean error = 1.59e-04, Samples in min_timescale = 25.0
Rate 20: Mean error = 8.44e-04, Samples in min_timescale = 12.5
Rate 50: Mean error = 1.31e-03, Samples in min_timescale = 5.0

Recommended downsample rate: 2


2

In [120]:
# Downsample data using recommended rate of 2
downsample_rate = 8

# Create downsampled arrays
time_ds = time_ms[::downsample_rate]
data_ds = filtered_xray_data[::downsample_rate]

# Calculate minimum distance between peaks based on min_timescale
sample_period = (time_ds[1] - time_ds[0])  # Time between samples
min_distance = int(min_ts / sample_period * 10)

# Plot comparison
plt.figure(figsize=(12,6))
plt.plot(time_ms, -filtered_xray_data, 'b-', label='Original', alpha=0.5)
plt.plot(time_ds, -data_ds, 'r-', label='Downsampled', linewidth=2)
plt.xlabel('Time (ms)')
plt.ylabel('Signal (V)')
plt.title(f'Original vs Downsampled Data (rate={downsample_rate})')
plt.legend(loc='upper right')
plt.grid(True)
plt.show()

In [121]:
larr, harr = hl_envelopes_idx(data_ds, dmin=1, dmax=min_distance, split=False)

plt.figure()
plt.plot(time_ds, data_ds)
plt.plot(time_ds[larr], data_ds[larr], label='Lower Envelope')
plt.plot(time_ds[harr], data_ds[harr], label='Upper Envelope')
plt.xlabel('Time (ms)')
plt.ylabel('Signal')
plt.legend(loc='upper right')
plt.show()

In [122]:
noise_sample = data_ds[:int(len(data_ds)*0.001)]
noise_amplitude = (np.max(np.abs(noise_sample)) - np.min(np.abs(noise_sample))) / 2

# Interpolate baseline using upper envelope points
baseline = np.interp(np.arange(len(data_ds)), harr, data_ds[harr]) - noise_amplitude
baseline_subtracted = data_ds - baseline

plt.figure()
plt.plot(time_ds, data_ds, label='Original')
plt.plot(time_ds, baseline, label='Baseline')
plt.plot(time_ds, baseline_subtracted, label='Baseline Subtracted')
plt.legend(loc='upper right')
plt.show()

In [125]:
data = -baseline_subtracted
# Use first 5% of data as noise sample for threshold determination
noise_sample = data[:int(len(data)*0.05)]
noise_mean = np.mean(noise_sample)
noise_std = np.std(noise_sample)

lower_threshold = noise_mean + 6*noise_std
upper_threshold = noise_mean + 100*noise_std

# Find peaks above lower threshold with minimum distance constraint
peak_indices, _ = find_peaks(data, height=lower_threshold, distance=min_distance)

# Remove peaks that exceed upper threshold and nearby peaks
mask = np.ones(len(peak_indices), dtype=bool)
for i, idx in enumerate(peak_indices):
    if data[idx] > upper_threshold:
        # Find all peaks within min_distance of this large peak
        nearby_mask = np.abs(peak_indices - idx) <= min_distance*20
        mask[nearby_mask] = False

# Apply mask to keep only valid peaks
peak_indices = peak_indices[mask]

# Convert peak indices back to original time coordinates
pulse_times = time_ds[peak_indices]
pulse_amplitudes = data[peak_indices]

plt.figure()
plt.plot(time_ds, data)
plt.axhline(y=lower_threshold, color='g', linestyle='--', label='Lower Threshold')
plt.axhline(y=upper_threshold, color='r', linestyle='--', label='Upper Threshold')
plt.scatter(pulse_times, pulse_amplitudes, color='red')
plt.xlabel('Time (ms)')
plt.ylabel('Signal')
plt.title('Detected Pulses')
plt.legend()
plt.show()


In [5]:
# Create histogram of raw signal data
plt.figure(figsize=(10,6))
plt.hist(xray_data, bins=1000, density=True)
plt.xlabel('Signal Amplitude (V)') 
plt.ylabel('Density')
plt.title('Histogram of X-ray Signal Amplitudes')
plt.grid(True)
plt.show()

In [None]:
# Create histogram of pulses over time with adjustable bin width
bin_width = 0.1

plt.figure(figsize=(10,6))
bins = np.arange(min(pulse_times), max(pulse_times) + bin_width, bin_width)
hist, bins = np.histogram(pulse_times, bins=bins)
bin_centers = (bins[:-1] + bins[1:])/2

plt.plot(bin_centers, hist)
plt.xlabel('Time (ms)')
plt.ylabel('Counts') 
plt.title(f'Photon Counts vs Time')
plt.grid(True)
plt.show()


In [119]:
print(min_distance)

31
