In [2]:
import lightning as L
from torch.utils.data import random_split, DataLoader, Dataset
import h5py
import numpy as np
from scipy.fft import fft, fftfreq
import torch
from noise import *

def fft_func_IQ_abs(signal_I, signal_Q):
    signal = signal_I + 1j*signal_Q
    N = len(signal)
    yf = fft(signal)
    return np.abs(yf)

def fft_func_IQ_complex_channels(signal_I, signal_Q):
    signal = signal_I + 1j*signal_Q
    N = len(signal)
    yf = fft(signal)
    real_part = yf.real
    imag_part = yf.imag
    return real_part, imag_part

class Project8Sim(Dataset):
    def __init__(self, inputs, variables, observables, path='/gpfs/gibbs/pi/heeger/hb637/ssm_files_pi_heeger/combined_data_fullsim.hdf5', cutoff=4000, norm=True, noise_const=1,
                 apply_filter = False, apply_fft=False, ts_and_fft=False, complex_channels=False):

        self.path = path
        self.inputs = inputs
        self.variables = variables
        self.observables = observables
        self.cutoff = cutoff
        self.norm = norm
        self.noise_const = noise_const
        self.apply_filter = apply_filter
        self.apply_fft = apply_fft
        self.ts_and_fft = ts_and_fft
        self.complex_channels = complex_channels

        with h5py.File(path, 'r') as f:
            
            # Load y data
            y_data_list = [f[v][:][:, np.newaxis] for v in variables]
            y_data = np.concatenate(y_data_list, axis=1)

            # Load obs data
            obs_data_list = [f[o][:][:, np.newaxis] for o in observables]
            obs_data = np.concatenate(obs_data_list, axis=1)

        # Normalization of y
        self.mu = 0.0
        self.stds = 1.0
        if norm:
            self.mu = np.mean(y_data, axis=0)
            self.stds = np.std(y_data, axis=0)
            # Prevent division by zero
            self.stds[self.stds == 0] = 1.0
            y_data = (y_data - self.mu) / self.stds
            
        self.vars = np.float32(y_data)
        self.obs = np.float32(obs_data)

        self.index_ts_I = inputs.index('output_ts_I') if ('output_ts_I' in inputs) else None
        self.index_ts_Q = inputs.index('output_ts_Q') if ('output_ts_Q' in inputs) else None

    def __len__(self):
        return self.vars.shape[0]

    def __getitem__(self, idx):
        # Open HDF5 file only for this sample
        with h5py.File(self.path, 'r') as f:
            X_list = []
            for i in self.inputs:
                data_slice = f[i][idx][:self.cutoff]
                X_list.append(data_slice[:, np.newaxis])
            
            X_sample = np.concatenate(X_list, axis=1)

        ts_sample = np.zeros_like(X_sample, dtype=np.float32)
        fft_combined = None 

        for j in range(X_sample.shape[1]): # Iterate over channels
            X_data = X_sample[:, j]
            
            noise_arr = noise_model(self.cutoff, self.noise_const)
            X_noise = X_data + noise_arr
            
            if self.apply_filter:
                X_noise = bandpass_filter(X_noise)
            
            std_X = np.std(X_noise) if self.norm else 1.0
            X_norm = X_noise / std_X
            ts_sample[:, j] = X_norm # Save time series sample
        
        if self.apply_fft and self.index_ts_I is not None and self.index_ts_Q is not None:
            I = ts_sample[:, self.index_ts_I]
            Q = ts_sample[:, self.index_ts_Q]
            
            if self.complex_channels:
                real, imag = fft_func_IQ_complex_channels(I, Q)
                fft_combined = np.stack([real, imag], axis=1).astype(np.float32) # shape: (cutoff, 2)
            else:
                mag = fft_func_IQ_abs(I, Q)
                fft_combined = mag[:, np.newaxis].astype(np.float32) # shape: (cutoff, 1)

        timeseries = ts_sample
        
        if self.ts_and_fft and fft_combined is not None:
            timeseries = np.concatenate([ts_sample, fft_combined], axis=1)
        
        var = self.vars[idx]
        obs = self.obs[idx]
        
        return torch.from_numpy(timeseries), torch.from_numpy(var), torch.from_numpy(obs)
    
    def __outdim__(self):
        return self.vars.shape[1]
    
    def __indim__(self):
        return self.cutoff

In [3]:
# import lightning as L
# from torch.utils.data import random_split, DataLoader, Dataset
# import h5py
# import numpy as np
# from scipy.fft import fft, fftfreq
# import torch
# from noise import *

# def fft_func_IQ_abs(signal_I, signal_Q):
#     signal = signal_I + 1j*signal_Q
#     N = len(signal)
#     yf = fft(signal)
#     return np.abs(yf)

# def fft_func_IQ_complex_channels(signal_I, signal_Q):
#     signal = signal_I + 1j*signal_Q
#     N = len(signal)
#     yf = fft(signal)
#     real_part = yf.real
#     imag_part = yf.imag
#     return real_part, imag_part

# class Project8Sim(Dataset):
#     def __init__(self, inputs, variables, observables, path='/gpfs/gibbs/pi/heeger/hb637/ssm_files_pi_heeger/combined_data_fullsim.hdf5', cutoff=4000, norm=True, noise_const=1,
#             apply_filter = False, apply_fft=False, ts_and_fft=False, complex_channels=False):
#         arr = {}
#         with h5py.File(path, 'r') as f:
#             for i in inputs+variables+observables:
#                 #arr[i] = f[i][:100]
#                 arr[i] = f[i][:]
#                 arr[i] = arr[i][:, np.newaxis]
#         X = np.concatenate([arr[i] for i in inputs], axis = 1)
#         X = np.swapaxes(X,1,2)[:,:cutoff, :]
#         y = np.concatenate([arr[v] for v in variables], axis = 1)
#         obs = np.concatenate([arr[o] for o in observables], axis = 1)
#         # add noise and normalize by std

#         if ('output_ts_I' in inputs) and ('output_ts_Q' in inputs) and apply_fft:
#             index_ts_I = inputs.index('output_ts_I')
#             index_ts_Q = inputs.index('output_ts_Q')

#             ts_list = []
#             fft_list = []

#             for i in range(X.shape[0]):
#                 ts_sample = np.zeros_like(X[i])  # (cutoff, channels)
#                 for j in range(X.shape[2]):
#                     noise_arr = noise_model(cutoff, noise_const)
#                     X_noise = X[i, :, j] + noise_arr
#                     if apply_filter:
#                         X_noise = bandpass_filter(X_noise)
#                     std_X = np.std(X_noise) if norm else 1.0
#                     X_norm = X_noise / std_X
#                     ts_sample[:, j] = X_norm  # Save time series

#                 ts_list.append(ts_sample)

#                 if ts_and_fft:
#                     I = ts_sample[:, index_ts_I]
#                     Q = ts_sample[:, index_ts_Q]
#                     if complex_channels:
#                         real, imag = fft_func_IQ_complex_channels(I, Q)
#                         fft_combined = np.stack([real, imag], axis=1)  # shape: (cutoff, 2)
#                     else:
#                         mag = fft_func_IQ_abs(I, Q)
#                         fft_combined = mag[:, np.newaxis]  # shape: (cutoff, 1)

#                     fft_list.append(fft_combined)

#             X_ts = np.stack(ts_list, axis=0)  # shape: (samples, cutoff, ts_channels)
#             if ts_and_fft:
#                 X_fft = np.stack(fft_list, axis=0)  # shape: (samples, cutoff, fft_channels)
#                 X = np.concatenate([X_ts, X_fft], axis=2)  # concat along channels
#             else:
#                 X = X_ts  # just the FFT (was applied in-place above if needed)

#         if norm:
#             mu_y = np.mean(y, axis=0)
#             stds_y = np.std(y, axis=0)
#             y = (y-mu_y)/stds_y
            
#         self.mu = mu_y
#         self.stds = stds_y
#         self.timeseries = np.float32(X)
#         self.vars = np.float32(y)
#         self.obs = np.float32(obs)
        
#     def __len__(self):
#         return self.vars.shape[0]
    
#     def __getitem__(self, idx):
#         times = self.timeseries[idx, :, :]
#         var = self.vars[idx]
#         obs = self.obs[idx, :]
#         return times, var, obs
    
#     def __outdim__(self):
#         return self.vars.shape[1]
    
#     def __indim__(self):
#         return self.timeseries.shape[1]

In [6]:
# import lightning as L
# from torch.utils.data import random_split, DataLoader, Dataset
# import h5py
# import numpy as np
# from scipy.fft import fft, fftfreq
# import torch
# from noise import *
# #from utils import fft_func_complex

# def fft_func_IQ_abs(signal_I, signal_Q):
#     signal = signal_I + 1j*signal_Q
#     N = len(signal)
#     yf = fft(signal)
#     return np.abs(yf)

# def fft_func_IQ_complex_channels(signal_I, signal_Q):
#     signal = signal_I + 1j*signal_Q
#     N = len(signal)
#     yf = fft(signal)
#     real_part = yf.real
#     imag_part = yf.imag
#     return real_part, imag_part

# class Project8Sim(Dataset):
#     def __init__(self, inputs, variables, observables, path='/gpfs/gibbs/pi/heeger/hb637/ssm_files_pi_heeger/combined_data_fullsim.hdf5', cutoff=4000, norm=True, noise_const=1,
#             apply_filter = False, apply_fft=False, complex_channels=False):
#         arr = {}
#         with h5py.File(path, 'r') as f:
#             for i in inputs+variables+observables:
#                 #arr[i] = f[i][:100]
#                 arr[i] = f[i][:]
#                 arr[i] = arr[i][:, np.newaxis]
#         X = np.concatenate([arr[i] for i in inputs], axis = 1)
#         X = np.swapaxes(X,1,2)[:,:cutoff, :]
#         y = np.concatenate([arr[v] for v in variables], axis = 1)
#         obs = np.concatenate([arr[o] for o in observables], axis = 1)
#         # add noise and normalize by std

#         if ('output_ts_I' in inputs) and ('output_ts_Q' in inputs) and apply_fft:
#             index_ts_I = inputs.index('output_ts_I')
#             index_ts_Q = inputs.index('output_ts_Q')
#             for i in range(X.shape[0]):
#                 for j in range(X.shape[2]):
#                     noise_arr = noise_model(cutoff, noise_const)
#                     X_noise = X[i, :, j] + noise_arr
#                     if(apply_filter): # whether to apply the band pass filter
#                         X_noise = bandpass_filter(X_noise)
#                     if(norm): # whether to normalize by std
#                         std_X = np.std(X_noise)
#                     else:
#                         std_X = 1
#                     X[i, :, j] = X_noise/std_X

#                 if(complex_channels):
#                     X[i,:,index_ts_I], X[i,:,index_ts_Q] = fft_func_IQ_complex_channels(X[i, :, index_ts_I], X[i, :, index_ts_Q])
#                 else:
#                     X[i,:,0] = fft_func_IQ_abs(X[i, :, index_ts_I], X[i, :, index_ts_Q])


#             if apply_fft and not(complex_channels):
#                 X = X[:,:,0]
#                 X = X[:, :, np.newaxis]
        
#         else:
#             for i in range(X.shape[0]):
#                 for j in range(X.shape[2]):
#                     noise_arr = noise_model(cutoff, noise_const)
#                     X_noise = X[i, :, j] + noise_arr
#                     if(apply_filter): # whether to apply the band pass filter
#                         X_noise = bandpass_filter(X_noise)
#                     if(apply_fft):
#                         X_noise = fft_func(X_noise)
#                         if len(X_noise) < cutoff:
#                             X_noise = np.pad(X_noise, (0, cutoff - len(X_noise)), mode='constant')
#                     if(norm and not(apply_fft)): # whether to normalize by std
#                         std_X = np.std(X_noise)
#                     else:
#                         std_X = 1
#                     X[i, :, j] = X_noise/std_X

#         if norm:
#             mu_y = np.mean(y, axis=0)
#             stds_y = np.std(y, axis=0)
#             y = (y-mu_y)/stds_y
            
#         self.mu = mu_y
#         self.stds = stds_y
#         self.timeseries = np.float32(X)
#         self.vars = np.float32(y)
#         self.obs = np.float32(obs)
        
#     def __len__(self):
#         return self.vars.shape[0]
    
#     def __getitem__(self, idx):
#         times = self.timeseries[idx, :, :]
#         var = self.vars[idx]
#         obs = self.obs[idx, :]
#         return times, var, obs
    
#     def __outdim__(self):
#         return self.vars.shape[1]
    
#     def __indim__(self):
#         return self.timeseries.shape[1]

In [2]:
inputs = ['output_ts_I', "output_ts_Q"]
variables = ['energy_eV', 'pitch_angle_deg']
observables = ['avg_axial_frequency_Hz', 'avg_carrier_frequency_Hz', 'radius_m']

data = Project8Sim(inputs, variables, observables, path='/gpfs/gibbs/pi/heeger/hb637/ssm_files_pi_heeger/combined_data_fullsim.hdf5', cutoff=8000, norm=True, noise_const=0,
            apply_filter = False, apply_fft=True, complex_channels=False)

In [4]:
inputs = ['output_ts_I', 'output_ts_Q']
variables = ['energy_eV', 'pitch_angle_deg']
observables = ['avg_axial_frequency_Hz', 'avg_carrier_frequency_Hz', 'radius_m']

dataset = Project8Sim(inputs=inputs, variables=variables, observables=observables, path='/gpfs/gibbs/pi/heeger/hb637/ssm_files_pi_heeger/combined_data_fullsim.hdf5',
        cutoff=4000, norm=True, apply_fft=True, ts_and_fft=True, complex_channels=True)
    
sample_index = 18900

timeseries_tensor, var_tensor, obs_tensor = dataset[sample_index]

amplitudes = timeseries_tensor[:, 3].numpy()

cutoff = dataset.cutoff
times = np.linspace(0, 1, cutoff) 

plt.figure(figsize=(10, 5))
plt.plot(times, amplitudes)
plt.title(f'Time Series Amplitudes (Channel 0) - Sample Index {sample_index}')
plt.xlabel('Time (Arbitrary Units)')
plt.ylabel('Normalized Amplitude')
plt.grid(True, linestyle='--', alpha=0.6)
plt.show()

NameError: name 'plt' is not defined