# Generation of Synthetic Data for Examples and Exercises

## Import Libraries

In [1]:
import os
import pandas as pd
import numpy as np
import matplotlib.pyplot as plt
from scipy.special import wofz
from tqdm import tqdm
import synth_dataset as sd

## Generate Synthetic Data

In [2]:

def create_synth_signal(x_wave, peaks_range=[4,20], method=None, edge_tol=.05, c=.015, ph_min=0.1, ph_max=1, itx_min=0.05, itx_max=0.5, x_scale_min=0.01, x_scale_max=0.2, center_min=0, center_max=1000, bg_center_min=0, bg_center_max=0.5, bg_width_min=0.2, bg_width_max=2.0, bg_a_min=150, bg_a_max=1000, snr_min=10, snr_max=30, bg_type=None, plot=False, debug=False):
    """
    Build one synthetic signal from random peaks, an arctan curve, a background and noise.

    The components are produced by the ``synth_dataset`` helpers (``sd``), merged with
    ``sd.combine_all`` and finally passed through ``sd.add_noise``.

    :param x_wave: Axis passed to ``sd.random_peaks``; its length sets the length of
        every other component.
    :param peaks_range, method, edge_tol, c, ph_min, ph_max: Forwarded unchanged to
        ``sd.random_peaks``.
    :param itx_min, itx_max, x_scale_min, x_scale_max, center_min, center_max:
        Forwarded to ``sd.random_arctan_curve``. In debug mode ``center_min`` is
        replaced by ``min(peaks) * center_max * 1.1`` before the call.
    :param bg_center_min, bg_center_max, bg_width_min, bg_width_max: Forwarded to
        ``sd.gaussian_bg`` (``bg_type == 0``).
    :param bg_a_min, bg_a_max: Forwarded to ``sd.exponential_bg`` (``bg_type == 1``).
    :param snr_min, snr_max: Bounds for ``np.random.randint``; the drawn value is
        passed to ``sd.add_noise``. ``snr_max`` is exclusive.
    :param bg_type: 0 = ``sd.gaussian_bg``, 1 = ``sd.exponential_bg``,
        2 = ``sd.real_bg``, 3 = all-zero background. ``None`` draws a value with
        ``np.random.randint(1, 4)``.
    :param plot: When true, show the decomposition and the clean/noisy comparison.
    :param debug: When true, request parameter dictionaries from the ``sd`` helpers
        and return them alongside the signals.
    :return: A dictionary with keys ``'voigt'``, ``'arctan'``, ``'background'``,
        ``'voigt_arctan'``, ``'voigt_background'``, ``'all'`` and ``'noisy_all'``.
        With ``debug=True`` a tuple ``(signals, params)`` is returned, where
        ``params`` has keys ``'voigt'``, ``'arctan'``, ``'bg_type'``, ``'bg'``
        and ``'snr'``.
    :raises ValueError: If ``bg_type`` is not ``None``, 0, 1, 2 or 3.
    """
    # Reject an unsupported background type before any random numbers are consumed.
    if bg_type is not None and bg_type not in (0, 1, 2, 3):
        raise ValueError(f"bg_type must be None, 0, 1, 2 or 3, got {bg_type!r}")

    n_points = len(x_wave)
    # The debug keyword is only forwarded when requested, so the non-debug calls
    # into `sd` keep their original argument lists.
    debug_kwargs = {'debug': True} if debug else {}

    # `method` is forwarded in both modes (it used to be dropped when debug=False).
    peaks_out = sd.random_peaks(x_wave, peaks_range=peaks_range, method=method, edge_tol=edge_tol, c=c, ph_min=ph_min, ph_max=ph_max, **debug_kwargs)
    if debug:
        voigt, voigt_params = peaks_out
        # NOTE(review): this override needs the peak parameters, so it only happens
        # in debug mode; debug and non-debug runs therefore draw the arctan centre
        # from different ranges. Confirm whether that is intended.
        center_min = min(voigt_params['peaks']) * center_max * 1.1
    else:
        voigt, voigt_params = peaks_out, None

    arctan_out = sd.random_arctan_curve(n_points, itx_min=itx_min, itx_max=itx_max, x_scale_min=x_scale_min, x_scale_max=x_scale_max, center_min=center_min, center_max=center_max, **debug_kwargs)
    arctan, arctan_params = arctan_out if debug else (arctan_out, None)

    if bg_type is None:
        # randint's upper bound is exclusive: this draws from {1, 2, 3}.
        # NOTE(review): type 0 (gaussian) is never selected at random - confirm.
        bg_type = np.random.randint(1, 4)

    if bg_type == 0:
        bg_out = sd.gaussian_bg(n_points, center_min=bg_center_min, center_max=bg_center_max, width_min=bg_width_min, width_max=bg_width_max, **debug_kwargs)
    elif bg_type == 1:
        bg_out = sd.exponential_bg(n_points, a_min=bg_a_min, a_max=bg_a_max, **debug_kwargs)
    elif bg_type == 2:
        bg_out = sd.real_bg(n_points, **debug_kwargs)
    else:
        # bg_type == 3: no background at all.
        bg_out = (np.zeros(n_points), {}) if debug else np.zeros(n_points)
    bg, bg_params = bg_out if debug else (bg_out, None)

    # Mirror the background half of the time.
    if np.random.choice([True, False]):
        bg = np.flip(bg)

    snr = np.random.randint(snr_min, snr_max)
    v, a, b, va, vb, combined = sd.combine_all(voigt, arctan, bg)
    noisy_all = sd.add_noise(snr, combined)

    if plot:
        x_range = np.arange(n_points)

        plt.figure(figsize=(12, 6))
        plt.plot(x_range, v, label="pure signal")
        plt.plot(x_range, a, label="phase jump", linestyle='--')
        plt.plot(x_range, b, label="background", linestyle='-.')
        plt.plot(x_range, combined, label="raw signal", linestyle='-.')
        plt.title("Signal Decomposition")
        plt.xlabel("X-axis")
        plt.ylabel("Intensity")
        plt.legend()
        plt.show()

        plt.figure(figsize=(12, 6))
        plt.plot(x_range, noisy_all, label=f"raw signal - snr {snr}")
        plt.plot(x_range, combined, label="clean signal", linestyle='-.')
        plt.title("Clean and Noisy Signal")
        plt.xlabel("X-axis")
        plt.ylabel("Intensity")
        plt.legend()
        plt.show()

    signals = {'voigt': v, 'arctan': a, 'background': b, 'voigt_arctan': va, 'voigt_background': vb, 'all': combined, 'noisy_all': noisy_all}
    if debug:
        return signals, {'voigt': voigt_params, 'arctan': arctan_params, 'bg_type': bg_type, 'bg': bg_params, 'snr': snr}
    # Previously the non-debug path fell off the end and returned None.
    return signals


### Utilities

In [28]:
def add_signal_to_df(n_row, x, params, signal_df, peaks_df):
    """
    Append one signal's summary row and its per-peak rows to the two dataframes.

    :param n_row: Identifier stored in the 'row' column of both dataframes.
    :param x: Value stored in the 'wave_file' column of the signal row.
    :param params: Dictionary with the keys 'voigt', 'arctan', 'bg_type', 'bg' and 'snr'.
    :param signal_df: DataFrame holding one row per signal.
    :param peaks_df: DataFrame holding one row per peak.
    :return: Tuple (signal_df, peaks_df) including the new rows.
    """
    voigt = params['voigt']
    arctan = params['arctan']
    background = params['bg']

    # One summary row per signal; background fields absent from params fall back to -1.
    summary = {
        'row': n_row,
        'wave_file': x,
        'n_peaks': voigt['n_peaks'],
        'arctan_x_scale': arctan['x_scale'],
        'arctan_intensity': arctan['intensity'],
        'arctan_center': arctan['center'],
        'bg_type': params['bg_type'],
        'bg_center': background.get('center', -1),
        'bg_width': background.get('width', -1),
        'bg_a': background.get('a', -1),
        'bg_file': background.get('file', -1),
        'snr': params['snr'],
    }
    summary_frame = pd.DataFrame([summary])
    if signal_df.empty:
        signal_df = summary_frame
    else:
        signal_df = pd.concat([signal_df, summary_frame], ignore_index=True)

    # One row per peak, numbered from 1 and tagged with the signal identifier.
    peak_count = len(voigt['peaks'])
    peak_frame = pd.DataFrame({
        'peak_num': range(1, peak_count + 1),
        'row': [n_row] * peak_count,
        'peak_position': voigt['peaks'],
        'gaussian_width': voigt['gaussian_width'],
        'lorentz_width': voigt['lorentz_width'],
        'peak_height': voigt['peak_height'],
        'method': voigt['method'],
    })
    if peaks_df.empty:
        peaks_df = peak_frame
    else:
        peaks_df = pd.concat([peaks_df, peak_frame], ignore_index=True)

    return signal_df, peaks_df

In [6]:
def convert_single_signal(data):
    """
    Stack the seven components of one signal into an array of shape (1, n_points, 7).

    :param data: Dictionary with keys 'voigt', 'arctan', 'background', 'voigt_arctan',
                 'voigt_background', 'all' and 'noisy_all', each holding an array of
                 the same shape.
    :return: NumPy array whose last axis follows the key order above, with a new
             leading axis of length 1.
    """
    channel_order = ('voigt', 'arctan', 'background', 'voigt_arctan',
                     'voigt_background', 'all', 'noisy_all')
    # The last axis indexes the component, in the order listed above.
    stacked = np.stack([data[name] for name in channel_order], axis=-1)
    # Leading axis of length 1 so results can be concatenated along axis 0.
    return stacked[np.newaxis, ...]

def add_signal_to_array(existing_array, new_data):
    """
    Append one signal to an array of previously collected signals.

    :param existing_array: Array of signals stacked along axis 0 (may be empty).
    :param new_data: Signal dictionary in the format expected by convert_single_signal.
    :return: The converted new signal alone if existing_array has no elements,
             otherwise existing_array with the new signal concatenated along axis 0.
    """
    converted = convert_single_signal(new_data)

    # Non-empty accumulator: append along the first axis.
    if existing_array.size:
        return np.concatenate((existing_array, converted), axis=0)

    # Empty accumulator: the new signal becomes the whole array.
    return converted


def save_signals_to_files(signal_array, file_prefix):
    """
    Write every slice along the third axis of the signal array to its own .npy file.

    Files are named "<file_prefix>_dim_<i>.npy"; a confirmation line is printed
    for each file written.

    :param signal_array: Three-dimensional signal array; one file per index of the last axis.
    :param file_prefix: Prefix (may include a directory) for the file names.
    """
    n_channels = signal_array.shape[-1]
    for channel in range(n_channels):
        target = f"{file_prefix}_dim_{channel}.npy"
        channel_slice = signal_array[:, :, channel]
        np.save(target, channel_slice)
        print(f"Saved: {target}")

## Example Plots

In [None]:
# Generate three example signals of random length and plot their components.
for i in range(3):
    x_wave_length = np.random.randint(150, 1025)
    x_wave = np.linspace(0, 0.5, x_wave_length)
    padding_length = 1024 - x_wave_length

    signal, params = create_synth_signal(
        x_wave,
        peaks_range=[1, 2],
        method=0,
        # peaks
        c=.015,
        ph_min=.1,
        ph_max=1.,
        # arctan
        x_scale_min=0.05,
        x_scale_max=1.,
        itx_min=0.0,
        itx_max=0.,
        center_min=0,
        center_max=len(x_wave)*.7,
        # background
        bg_width_min=1.5,
        bg_width_max=3.,
        bg_a_min=.15,
        bg_a_max=1.,
        bg_type=1,
        # noise
        snr_min=30,
        snr_max=40,
        debug=True,
    )

    # (signal key, legend label, extra plot options) for each curve, in drawing order.
    curves = (
        ('voigt', "pure signal", {}),
        ('arctan', "phase jump", {'linestyle': '--'}),
        ('background', "background", {'linestyle': '-.'}),
        ('noisy_all', "raw signal", {'linestyle': '-.'}),
    )
    plt.figure(figsize=(12, 6))
    for curve_key, curve_label, curve_options in curves:
        plt.plot(signal[curve_key], label=curve_label, **curve_options)
    plt.title(f"Signal Decomposition {i}")
    plt.xlabel("X-axis")
    plt.ylabel("Intensity")
    plt.legend()
    plt.show()

## Synthetic Dataset Production

In [None]:
# Fixed seed so the generated dataset is reproducible.
np.random.seed(42)

n_files = 1
n_spectra = 1000
n_points = 341    # samples per spectrum
n_channels = 7    # arrays per signal: voigt, arctan, background, voigt_arctan, voigt_background, all, noisy_all

# The sampling axis is the same for every spectrum, so build it once.
x_wave = np.linspace(0, 1, n_points)

for j in tqdm(range(n_files)):
    # Empty accumulator whose trailing dimensions match the arrays appended below
    # (the previous initial shape declared 6 channels although 7 are stored).
    data = np.empty((0, n_points, n_channels))
    signal_df = pd.DataFrame()
    peaks_df = pd.DataFrame()

    for i in range(n_spectra):
        signal, params = create_synth_signal(x_wave,
                                             peaks_range=[1,2],
                                             method=0,
                                             # peaks
                                             c=.015,
                                             ph_min=.1, ph_max=1.,
                                             # arctan
                                             x_scale_min=0.05, x_scale_max=1.,
                                             itx_min=0.0, itx_max=0.,
                                             center_min=0, center_max=len(x_wave)*.7,
                                             # background
                                             bg_width_min=1.5, bg_width_max=3.,
                                             bg_a_min=.15, bg_a_max=1.,
                                             bg_type=1,
                                             # noise
                                             snr_min=30, snr_max=40,
                                             debug=True)
        data = add_signal_to_array(data, signal)
        signal_df, peaks_df = add_signal_to_df(i, 0, params, signal_df, peaks_df)

    # One array file and two CSV files per batch, all indexed by j.
    os.makedirs('../data/', exist_ok=True)
    np.save(f'../data/data_{j}.npy', data)
    signal_df.to_csv(f'../data/signal_{j}.csv', index=False)
    peaks_df.to_csv(f'../data/peaks_{j}.csv', index=False)

### Inspect and Merge Generated Files

In [None]:
# Load the first generated array file back from disk and display its shape.
d = np.load('../data/data_0.npy')
d.shape

In [None]:
# Split the loaded array into its seven components (one per index of the last axis)
# and drop the now-singleton last axis of each.
# The combined signal is named `combined` so the builtin `all` is not shadowed
# for the rest of the session.
v, a, b, va, vb, combined, noisy_all = (
    np.squeeze(part, axis=-1) for part in np.split(d, 7, axis=-1)
)

# Pick a random spectrum; the bound follows the number of spectra actually loaded
# instead of a hard-coded 1000.
i = np.random.randint(0, d.shape[0])

plt.figure(figsize=(12, 6))
plt.plot(v[i,:], label="pure signal")
plt.plot(a[i,:], label="phase jump", linestyle='--')
plt.plot(b[i,:], label="background", linestyle='-.')
plt.plot(combined[i,:], label="raw signal", linestyle='-.')
plt.title("Signal Decomposition")
plt.xlabel("X-axis")
plt.ylabel("Intensity")
plt.legend()
plt.show()

In [None]:
directory = '../data/'

# Every peaks_<i>.csv that exists for i in 0..299 is read, in index order.
dataframes = []

for i in range(300):
    file_path = os.path.join(directory, f'peaks_{i}.csv')
    # Missing indices are skipped silently.
    if not os.path.exists(file_path):
        continue
    df = pd.read_csv(file_path)
    dataframes.append(df)
len(dataframes)

In [55]:
# Merge all collected peak tables into one frame with a fresh 0..n-1 index.
merged_df = pd.concat(dataframes, ignore_index=True)
# os.path.join keeps the output path valid whether or not `directory` ends with a separator.
merged_df.to_csv(os.path.join(directory, 'peaks_gaussian.csv'), index=False)

In [56]:
# Load every data_<i>.npy that exists for i in 0..n_files-1, in index order.
arrays = []
for i in range(n_files):
    file_path = os.path.join(directory, f'data_{i}.npy')
    if os.path.exists(file_path):
        array = np.load(file_path)
        arrays.append(array)

# Fail with a clear message instead of numpy's generic error on an empty list.
if not arrays:
    raise FileNotFoundError(f"No data_<i>.npy files found in {directory!r}")

# Concatenate all arrays along the spectrum axis
merged_array = np.concatenate(arrays, axis=0)

# Explicit check (not `assert`, which is stripped under -O) that every expected
# file was present and has the expected layout.
expected_shape = (n_files * n_spectra, 341, 7)
if merged_array.shape != expected_shape:
    raise ValueError(
        f"Merged array has shape {merged_array.shape}, expected {expected_shape}"
    )

# Save the merged array; os.path.join does not depend on a trailing separator in `directory`.
np.save(os.path.join(directory, 'data_gaussian.npy'), merged_array)