In [1]:
import numpy as np
import pandas as pd
from scipy import interpolate
from scipy.optimize import brentq
from scipy.interpolate import CubicSpline
from scipy.signal import find_peaks, peak_widths

### Specifications of the resulting file.
Change sensor number:

In [38]:
save = True             # true to save file, false to just perform calculations
fpath = r"../data"      # file location to save results
sensor = 1
sensor_nr = f'{sensor}'         # sensor number for file name
addendum = ''           # to be appended at the end of the file name

### Specifications of the raw sensor data

In [39]:
path = r"../data"       # file location of raw data

file = f"/Sensor{sensor}_raw.csv"
df1 = pd.read_csv(path+file, delimiter=",")

df_all = pd.concat([df1])   #concatenation to accomodate multiple input files
df_all = df_all.reset_index(drop=True)

gain = 512
int_time = 1057

### Optional plot parameters.

In [40]:
plot_results = False

if plot_results: 
    import matplotlib.pyplot as plt
    
    # Colours for plots
    f1_colour = '#7600ed'       # λ = 415nm
    f2_colour = '#0028ff'       # λ = 445nm
    f3_colour = '#00d5ff'       # λ = 480nm
    f4_colour = '#1fff00'       # λ = 515nm
    f5_colour = '#b3ff00'       # λ = 555nm
    f6_colour = '#ffdf00'       # λ = 590nm
    f7_colour = '#ff4f00'       # λ = 630nm
    f8_colour = '#df0000'       # λ = 680nm
    nir_colour = '#555555'      # grey

    # Measurement Markers {'none' none | 'o' circle | '^' triangle | 's' square |'X' X | '+' plus}
    data_mark = 'o'

    # Marker Size
    mark_sz = 3

    # Font Style
    plt.rcParams["font.family"] = "serif"
    plt.rcParams["font.serif"] = "Times New Roman"

### Convert raw sensor counts to unitless basic values.

Calculated as per:
$$basic_k = \frac{raw_k}{(Gain \times Integration Time)}$$

In [41]:
columns_to_modify = ['f1', 'f2', 'f3', 'f4', 'f5', 'f6', 'f7', 'f8', 'clear', 'nir']
df_all[columns_to_modify] = df_all[columns_to_modify].div(gain * int_time)

### Calculations of irradiance on AS7341, considering wavelength dependency of the beam splitter.

Irradiance on the power meter:
$$I^{E}_{PM} = \frac{P^{E}_{PM}}{A_{PM}}$$

Correction for beam splitter non-ideality:
$$I^{E}_{AS7341} = I^{E}_{PM}\cdot{}\kappa_{BS}(\lambda{})$$

In [42]:
# Correct for the effects of the beam splitter
bs = pd.read_csv(r"BeamSplitterData.csv")
bs['ratio'] = bs['trans_adj_bs']/bs['refl_adj_bs']  # ratio of transmitted (DUT) to reflected (PM)
A_pm = np.pi*((2.5E-3)**2)      # Area of 5mm aperture on TLPM
planck = (6.626e-34)            # Planck's constant (J⋅s)
c_light = (3.0e8)               # Speed of light (m/s)
avogadro = 6.02214076e23        # Avogadro's constant (mol⁻¹)

# apply corrections for BS and calculate irradiance
df_all = pd.merge(df_all, bs[['wavelength', 'ratio']], on='wavelength', how='left')
P_pm = df_all['power']                          # Power (W)

I_dut = P_pm * df_all['ratio'] / A_pm           # W/m²

df_all['I_dut'] = I_dut
df_all['P_dut'] = P_pm * df_all['ratio']

In [43]:
df_all = df_all[df_all["clear"] > 0]    # remove measurements which may have failed, with a clear channel reading of 0
df_old = df_all.copy()                  # copy of dataframe to visualize effects of preprocessing

### Delineate between iterations of the measurement
To ensure data from seperate measurement cycles is not grouped together

In [44]:
df_all['cycle'] = 0

# Find the indices where 'wavelength' is 390, as this was the starting wavelength for each cycle
start_indices = df_all.index[df_all['wavelength'] == 390].tolist()

# Assign cycle numbers (to make a unique data set for each measurement)
for cycle_number in range(len(start_indices) - 1):
    start_idx = start_indices[cycle_number]
    end_idx = start_indices[cycle_number + 1]
    df_all.loc[start_idx:end_idx - 1, 'cycle'] = cycle_number + 1

# for the last cycle, if needed
if len(start_indices) > 1:
    df_all.loc[start_indices[-1]:, 'cycle'] = len(start_indices)

print(str(df_all['cycle'].max()))      # to see how many cycles are in this data. Total new rows of data will be 8* this value (for the number of bins)


5


### Normalization of data
Given the inherent properties of the QTH lamp, the irradiance $I^\text{E}_{\text{DUT}}(\lambda)$ does not exhibit uniform distribution across the spectral range. To determine the relative spectral response $s_{i, \text{rel}}(\lambda)$, it is necessary to normalize the raw data from each channel of the spectral sensor $s_{i, \text{raw}}(\lambda)$ by $I^\text{E}_{\text{DUT}}(\lambda)$:
\begin{equation}
    s_{i, \text{rel}}(\lambda) = \frac{s_{i, \text{raw}}(\lambda)}{I^\text{E}_{\text{DUT}}(\lambda)}
\end{equation}
To compute the absolute spectral response across each of the 10 channels, the relative spectral responses for all channels $i$ are compared to channel 8, which shows the peak response at $\lambda_p$:
\begin{equation}
    s_{i}(\lambda) = s_{8, \text{abs}}(\lambda_p) \cdot \frac{s_{i, \text{rel}}(\lambda)}{s_{8, \text{rel}}(\lambda_p)}
\end{equation}

In [45]:
# save a copy of the dataframe before correction to compare
df_old2 = df_all.copy()
df_all = []
df_all = pd.DataFrame()

columns_to_scale = [ 'f1', 'f2', 'f3', 'f4', 'f5', 'f6', 'f7', 'f8', 'nir', 'clear', 'I_dut']

for cycle_number in df_old2['cycle'].unique():
    # Filter rows for the current cycle
    cycle_data = df_old2[df_old2['cycle'] == cycle_number].copy()
    cycle_data.reset_index(inplace=True)

    # normalize to incident light 
    df_normalize = cycle_data.copy()
    df_normalize["f1"] = cycle_data["f1"]/cycle_data["I_dut"]
    df_normalize["f2"] = cycle_data["f2"]/cycle_data["I_dut"]
    df_normalize["f3"] = cycle_data["f3"]/cycle_data["I_dut"]
    df_normalize["f4"] = cycle_data["f4"]/cycle_data["I_dut"]
    df_normalize["f5"] = cycle_data["f5"]/cycle_data["I_dut"]
    df_normalize["f6"] = cycle_data["f6"]/cycle_data["I_dut"]
    df_normalize["f7"] = cycle_data["f7"]/cycle_data["I_dut"]
    df_normalize["f8"] = cycle_data["f8"]/cycle_data["I_dut"]
    df_normalize["clear"] = cycle_data["clear"]/cycle_data["I_dut"]
    df_normalize["nir"] = cycle_data["nir"]/cycle_data["I_dut"]
    df_normalize["I_dut"] = cycle_data["I_dut"]/cycle_data["I_dut"]

    # s_rel(l) / s_rel(l0)
    df_relative = df_normalize.copy()
    df_relative["f1"] = df_normalize["f1"]/df_normalize["f8"].max()
    df_relative["f2"] = df_normalize["f2"]/df_normalize["f8"].max()
    df_relative["f3"] = df_normalize["f3"]/df_normalize["f8"].max()
    df_relative["f4"] = df_normalize["f4"]/df_normalize["f8"].max()
    df_relative["f5"] = df_normalize["f5"]/df_normalize["f8"].max()
    df_relative["f6"] = df_normalize["f6"]/df_normalize["f8"].max()
    df_relative["f7"] = df_normalize["f7"]/df_normalize["f8"].max()
    df_relative["f8"] = df_normalize["f8"]/df_normalize["f8"].max()
    df_relative["clear"] = df_normalize["clear"]/df_normalize["f8"].max()
    df_relative["nir"] = df_normalize["nir"]/df_normalize["f8"].max()
    df_relative["I_dut"] = df_normalize["I_dut"]

    # absolute spectral response 
    # s_abs(l0) * s_rel(l) / s_rel(l0)
    df_absolute = df_relative.copy()
    df_absolute["f1"] = df_relative["f1"]*cycle_data["f8"].max()
    df_absolute["f2"] = df_relative["f2"]*cycle_data["f8"].max()
    df_absolute["f3"] = df_relative["f3"]*cycle_data["f8"].max()
    df_absolute["f4"] = df_relative["f4"]*cycle_data["f8"].max()
    df_absolute["f5"] = df_relative["f5"]*cycle_data["f8"].max()
    df_absolute["f6"] = df_relative["f6"]*cycle_data["f8"].max()
    df_absolute["f7"] = df_relative["f7"]*cycle_data["f8"].max()
    df_absolute["f8"] = df_relative["f8"]*cycle_data["f8"].max()
    df_absolute["clear"] = df_relative["clear"]*cycle_data["f8"].max()
    df_absolute["nir"] = df_relative["nir"]*cycle_data["f8"].max()
    df_absolute["I_dut"] = df_relative["I_dut"] * cycle_data["I_dut"].iloc[cycle_data["f8"].idxmax()]

    # store scaled data in df_all
    df_all = pd.concat([df_all, df_absolute], ignore_index=True)


df_all["PAR"] = (df_all['I_dut'] * df_all['wavelength']*1E-9).astype(float) / (planck * c_light) * ((1e6)/(avogadro))
df_old2["PAR"] = (df_old2['I_dut'] * df_old2['wavelength']*1E-9).astype(float) / (planck * c_light) * ((1e6)/(avogadro))

### Set calculated PAR to zero for $\lambda$<400 nm and $\lambda$>700 nm

In [46]:
df_all.loc[(df_all['wavelength'] < 400) | (df_all['wavelength'] > 700), 'PAR'] = 0

In [47]:
df_old2A = df_old2[df_old2['cycle'] == 1].copy()
df_allA  = df_all[df_all['cycle'] == 1].copy()

if plot_results:
    fig, axes = plt.subplots(1, 2, figsize=(12, 5))  # 1 row, 2 columns

    axes[0].scatter(df_old2A["wavelength"], df_old2A["f1"], label='F1', color=f1_colour, s=mark_sz, marker=data_mark)
    axes[0].scatter(df_old2A["wavelength"], df_old2A["f2"], label='F2', color=f2_colour, s=mark_sz, marker=data_mark)
    axes[0].scatter(df_old2A["wavelength"], df_old2A["f3"], label='F3', color=f3_colour, s=mark_sz, marker=data_mark)
    axes[0].scatter(df_old2A["wavelength"], df_old2A["f4"], label='F4', color=f4_colour, s=mark_sz, marker=data_mark)
    axes[0].scatter(df_old2A["wavelength"], df_old2A["f5"], label='F5', color=f5_colour, s=mark_sz, marker=data_mark)
    axes[0].scatter(df_old2A["wavelength"], df_old2A["f6"], label='F6', color=f6_colour, s=mark_sz, marker=data_mark)
    axes[0].scatter(df_old2A["wavelength"], df_old2A["f7"], label='F7', color=f7_colour, s=mark_sz, marker=data_mark)
    axes[0].scatter(df_old2A["wavelength"], df_old2A["f8"], label='F8', color=f8_colour, s=mark_sz, marker=data_mark)
    axes[0].scatter(df_old2A["wavelength"], df_old2A["clear"], label='Clear', color='#b4c9ff', s=mark_sz, marker=data_mark)
    axes[0].scatter(df_old2A["wavelength"], df_old2A["nir"], label='NIR', color='#670505', s=mark_sz, marker=data_mark)
    axes[0].scatter(df_old2A["wavelength"], df_old2A["PAR"]/10, label='PAR', color='#8A9A5B', s=mark_sz, marker=data_mark)

    axes[0].set_xlabel('Wavelength (nm)')
    axes[0].set_ylabel('Basic Count')

    axes[0].set_title('Channel Response of AS7341')

    axes[1].scatter(df_allA["wavelength"], df_allA["f1"], label='F1', color=f1_colour, s=mark_sz, marker=data_mark)
    axes[1].scatter(df_allA["wavelength"], df_allA["f2"], label='F2', color=f2_colour, s=mark_sz, marker=data_mark)
    axes[1].scatter(df_allA["wavelength"], df_allA["f3"], label='F3', color=f3_colour, s=mark_sz, marker=data_mark)
    axes[1].scatter(df_allA["wavelength"], df_allA["f4"], label='F4', color=f4_colour, s=mark_sz, marker=data_mark)
    axes[1].scatter(df_allA["wavelength"], df_allA["f5"], label='F5', color=f5_colour, s=mark_sz, marker=data_mark)
    axes[1].scatter(df_allA["wavelength"], df_allA["f6"], label='F6', color=f6_colour, s=mark_sz, marker=data_mark)
    axes[1].scatter(df_allA["wavelength"], df_allA["f7"], label='F7', color=f7_colour, s=mark_sz, marker=data_mark)
    axes[1].scatter(df_allA["wavelength"], df_allA["f8"], label='F8', color=f8_colour, s=mark_sz, marker=data_mark)
    axes[1].scatter(df_allA["wavelength"], df_allA["clear"], label='Clear', color='#b4c9ff', s=mark_sz, marker=data_mark)
    axes[1].scatter(df_allA["wavelength"], df_allA["nir"], label='NIR', color='#670505', s=mark_sz, marker=data_mark)
    axes[1].scatter(df_allA["wavelength"], df_allA["PAR"]/10, label='PAR', color='#8A9A5B', s=mark_sz, marker=data_mark)

    axes[1].set_xlabel('Wavelength (nm)')
    axes[1].set_ylabel('Basic Count')

    axes[1].set_title('Channel Response of AS7341 (normalized to power)')

    plt.tight_layout()
    plt.show()

In [48]:
df_norm_mean = df_all.groupby("wavelength").mean(numeric_only=True).reset_index()

In [49]:
if save:
    # save the resulting data to a new dataframe and file
    file_name_normmean = (f'/Sensor{sensor}_preprocessed_normed+mean'+str(addendum)+'.csv')
    df_norm_mean.to_csv(fpath+file_name_normmean, index=False)
    file_name_norm = (f'/Sensor{sensor}_preprocessed_normed'+str(addendum)+'.csv')
    df_all.to_csv(fpath+file_name_norm, index=False)