# Mock traces example

In [None]:
import os
import sys
from radiocalibrationtoolkit import *

from astropy.time import Time
from scipy.fft import rfft, rfftfreq
from scipy.stats import lognorm, gumbel_r, expon
from matplotlib.ticker import (MultipleLocator, FormatStrFormatter, AutoMinorLocator)


In [None]:
piko = 1e-12

In [None]:
# some global plot settings
plt.rcParams["axes.labelweight"] = "bold"
plt.rcParams["font.weight"] = "bold"
plt.rcParams["font.size"] = 16
plt.rcParams["legend.fontsize"] = 12

plt.rcParams["xtick.major.width"] = 2
plt.rcParams["ytick.major.width"] = 2

plt.rcParams["xtick.major.size"] = 5
plt.rcParams["ytick.major.size"] = 5

plt.rcParams["xtick.labelsize"] = 14
plt.rcParams["ytick.labelsize"] = 14

In [None]:

def fit_stacked_histogram_plot(
    ax,
    data,
    distribution=lognorm,
    quantity_label="trace STD [ADC]",
    xlim=(0, 5.5),
    bins=np.arange(0, 20, 0.1),
    label='',
    cdf_threshold=1-1e-3,
    save=False
):
    # fig, ax = plt.subplots()
    hist, bins, patches = ax.hist(data, bins=bins, density=True, label=label, alpha=0.5)
    bar_color = patches[0].get_facecolor()
    
    params = distribution.fit(data)
    y, x = np.histogram(data, bins=bins, density=True)
    x = (x + np.roll(x, -1))[:-1] / 2.0
    # Separate parts of parameters
    arg = params[:-2]
    loc = params[-2]
    scale = params[-1]

    # Calculate fitted PDF and error with fit in distribution
    pdf = distribution.pdf(x, loc=loc, scale=scale, *arg)
    sse = np.sum(np.power(y - pdf, 2.0))

    arg = params[:-2]
    loc = params[-2]
    scale = params[-1]

    # Get sane start and end points of distribution
    start = (
        distribution.ppf(0.0001, *arg, loc=loc, scale=scale)
        # if arg
        # else distribution.ppf(0.01, loc=loc, scale=scale)
    )
    end = (
        distribution.ppf(0.9999, *arg, loc=loc, scale=scale)
        # if arg
        # else distribution.ppf(0.99, loc=loc, scale=scale)
    )

    # Build PDF and turn into pandas Series
    x = np.linspace(start, end, 1000)
    y = distribution.pdf(x, loc=loc, scale=scale, *arg)
    ax.plot(x, y, color='grey')

    ax.set_xlim(xlim)
    ax.set_xlabel(quantity_label)
    ax.set_ylabel("probability density")

    # print the threshold
    threshold = distribution.ppf(cdf_threshold, *arg, loc=loc, scale=scale)
    ax.axes.axvline(threshold, ls="--", color=bar_color)
    print(threshold)
    fig.subplots_adjust(bottom=0.15,left=0.15)

    return threshold

In [None]:
# read HW response
hw_file_path = "./antenna_setup_files/HardwareProfileList_realistic.xml"
hw_dict = read_hw_file(hw_file_path, interp_args={"fill_value": "extrapolate"})

hw_reponse_1 = hw_dict["RResponse"]["LNA"]
hw_reponse_2 = hw_dict["RResponse"]["digitizer"]
hw_reponse_3 = hw_dict["RResponse"]["cable_fromLNA2digitizer"]
hw_reponse_4 = hw_dict["RResponse"]["impedance_matching_EW"]


# merge all hw responses to one function
def hw_response_func(x):
    return dB2PowerAmp(
        hw_reponse_1(x) + hw_reponse_2(x) + hw_reponse_3(x) + hw_reponse_4(x)
    )


# impedance function
impedance_func = hw_dict["IImpedance"][
    "antenna_EW"
]

# read sidereal voltage square spectral density
sidereal_voltage2_density_DF = pd.read_csv(
    "./voltage2_density/voltage2_density_Salla_EW_GSM16.csv",
    index_col=0,
)
sidereal_voltage2_density_DF.columns = sidereal_voltage2_density_DF.columns.astype(
    float
)

In [None]:
fig, ax = plt.subplots()
frequencies = np.linspace(0,125,126)
ax.plot(frequencies, hw_reponse_1(frequencies), label='LNA')
ax.plot(frequencies, hw_reponse_2(frequencies), label='digitizer')
ax.plot(frequencies, hw_reponse_3(frequencies), label='cable')
ax.plot(frequencies, hw_reponse_4(frequencies), label='imp. match')
ax.set_xlabel('frequency [MHz]')
ax.set_ylabel('dB')
ax.legend()

In [None]:
fig, ax = plt.subplots()
frequencies = np.linspace(0,125,126)
ax.plot(frequencies, impedance_func(frequencies), label='impedance')
ax.set_xlabel('frequency [MHz]')
ax.set_ylabel('$\Omega$')
ax.legend()

The power obtained from the spectra calculated by applying the Discrete Fourier Transform (DFT) to time traces is calculated as follows:

 \begin{equation}
P_s = 2\frac{1}{T} \sum_{k=f}^{f+\delta f} \frac{|X(k)|^2}{R(f)} \Delta f
\end{equation}

where $\Delta t$ is the sampling time, $T$ the time trace length, $N$ the number of samples, $f_s$ sampling frequency and $R$ the impedance,

where

 \begin{equation}
X(k) = X(k)_{DFT} \Delta t = \frac{X(k)_{DFT}}{fs}
\end{equation}

where $X(k)_{DFT}$ is "one-sided" absolute spectrum obtained by applying discrete Fourier transform to the time trace. 

On the other hand, the power deposited to antenna by the galactic radio emission is calculated as:

 \begin{equation}
P_{sky}(t,f) = \frac{k_{b}}{c^{2}} \sum_{f} f^{2} \int_{\Omega} T_{sky}(t,f,\theta,\phi) \frac{|H(f,\theta,\phi)|^{2}Z_{0}}{R(f)} \Delta f d\Omega  
\end{equation} 

where $Z_0$ is the vacuum impedance, $R_r$ is the antenna impedance, $k_b$ the Boltzmann constant and $c$ is the speed of light.

Denoting power spectral density as 

 \begin{equation}
S_{P}(t,f) = \frac{k_{b}}{c^{2}} f^{2} \int_{\Omega} T_{sky}(t,f,\theta,\phi) \frac{|H(f,\theta,\phi)|^{2}Z_{0}}{R(f)} d\Omega  
\end{equation} 

we can rewrite the equation as

 \begin{equation}
P_{sky}(t,f) = \sum_{f} S_{P}(t,f)  \Delta f
\end{equation} 

Further, the voltage square density  $S_{\nu}(t,f)$ we defined as  

 \begin{equation}
S_{\nu}(t,f) = \frac{S_{P}(t,f)}{R(f)}
\end{equation} 

or as:

 \begin{equation}
S_{\nu}(t,f) = U_{\nu}^2 
\end{equation} 

Equating the two definitions of power yields:

 \begin{equation}
2\frac{1}{T} \sum_{k=f}^{f+\delta f} \frac{|X(k)|^2}{R(f)} \Delta f = 
\frac{k_{b}}{c^{2}} \sum_{f} f^{2} \int_{\Omega} T_{sky}(t,f,\theta,\phi) \frac{|H(f,\theta,\phi)|^{2}Z_{0}}{R_{r}} \Delta f d\Omega  
\end{equation}
 

 \begin{equation}
2\frac{1}{T} \frac{|X(k)|^2}{R(f)}  = 
\frac{k_{b}}{c^{2}} f^{2} \int_{\Omega} T_{sky}(t,f,\theta,\phi) \frac{|H(f,\theta,\phi)|^{2}Z_{0}}{R(f)}  d\Omega  
\end{equation}

 \begin{equation}
2\frac{1}{T} |X(k)|^2 = 
\frac{k_{b}}{c^{2}} f^{2} Z_{0}  \int_{\Omega} T_{sky}(t,f,\theta,\phi) |H(f,\theta,\phi)|^{2} d\Omega  
\end{equation}

 \begin{equation}
2\frac{1}{T} |X(k)|^2 = S_{\nu}(t,f)  
\end{equation}

 \begin{equation}
|X(k)|^2 = \frac{1}{2} T  S_{\nu}(t,f)
\end{equation}

 \begin{equation}
|X(k)_{DFT}|^2 \Delta t^2 = 
\frac{1}{2} T  S_{\nu}(t,f)
\end{equation}

 \begin{equation}
|X(k)_{DFT}|^2  = 
\frac{T}{\Delta t^2} \frac{1}{2}  S_{\nu}(t,f)  
\end{equation}

 \begin{equation}
|X(k)_{DFT}|^2  = 
\frac{N f_{s}^2}{ f_s} \frac{1}{2}   S_{\nu}(t,f) 
\end{equation}

 \begin{equation}
|X(k)_{DFT}|^2  =  \frac{N f_{s}}{2}   S_{\nu}(t,f)  
\end{equation}

 \begin{equation}
|X(k)_{DFT}|  = \sqrt{\frac{N f_{s} }{2}}   \sqrt{S_{\nu}(t,f)}  
\end{equation}

A small reminder of the system parameter relations:
 
 \begin{equation}
\Delta f = \frac{fs}{N} = \frac{1}{N\Delta t} = \frac{1}{T} 
\end{equation}

 \begin{equation}
fs = \frac{1}{\Delta t}
\end{equation}


Thermal noise:

 \begin{equation}
v^2 = 4 k_B T R
\end{equation}

where $k_B$ is the Boltzmann constant, $T$ is the temperature and $R$ is the impedance.

source: https://en.wikipedia.org/wiki/Johnson%E2%80%93Nyquist_noise

In [None]:
mock_trace_generator = Mock_trace_generator(
    sidereal_voltage2_density_DF=sidereal_voltage2_density_DF,
    hw_response_func=hw_response_func,
    impedance_func=impedance_func,
    voltage2ADC=2048,
    time_trace_size=2048,
    sampling_frequency_MHz=250,
)
freq_MHz_bins = mock_trace_generator.get_frequency_bins()

In [None]:
mock_traces_DF = mock_trace_generator.generate_mock_trace(5, nbi = {"67": 1}, nbi_err=0.3)

In [None]:
fig, ax = plt.subplots()
ax.plot(mock_traces_DF.columns.values[2:], mock_traces_DF.iloc[1,2:])
ax.set_xlabel('time [$\mu$s]')
ax.set_ylabel('amplitude [ADC]')


In [None]:
additional_noise = 5e-4*piko

debug_spectra_dict = mock_trace_generator.generate_mock_trace(
    1, lst=15, temp_celsius=4, nbi={"67": 1}, nbi_err=0.3, return_debug_dict=True, additional_noise=additional_noise
)[0]

In [None]:
debug_spectra_dict

In [None]:
x = debug_spectra_dict["freq_MHz_bins"]
fig, ax = plt.subplots()
# ax.set_title("Signal at antenna")

ax.plot(
    x,
    debug_spectra_dict["sidereal_voltage2_density"],
    lw=3,
    label="galactic signal",
    color="b",
)
ax.plot(
    x,
    debug_spectra_dict["thermal_noise_voltage2_density"],
    lw=3,
    label="thermal noise",
    color="green",
)

ax.plot(
    x,
    debug_spectra_dict["sidereal_voltage2_density"]
    + debug_spectra_dict["thermal_noise_voltage2_density"],
    label="galactic signal + thermal noise",
    lw=3,
    color="r",
)

ax.plot(
    x,
    debug_spectra_dict["total_voltage2_density_scattered"],
    alpha=0.4,
    label="galactic signal + thermal noise scattered",
    color="r",
)

ax.set_xlabel("frequency [MHz]")
ax.set_ylabel("amplitude [V/$\sqrt{Hz}]$")
ax.legend()


fig, ax = plt.subplots()
# ax.set_title("Signal in hardware")

volts2adc = 2048
ax.plot(
    x,
    debug_spectra_dict["spectrum_voltage_density_in_HW_with_additional_noise_with_NBI"] * volts2adc,
    alpha=0.5,
    label="spectrum + extra noise + narrowband RFI",
)
ax.plot(
    x,
    debug_spectra_dict["spectrum_voltage_density_in_HW_with_additional_noise"] * volts2adc,
    alpha=0.5,
    label="spectrum + extra noise",
)
ax.plot(
    x,
    debug_spectra_dict["spectrum_voltage_density_in_HW"] * volts2adc,
    alpha=1,
    label="spectrum",
)

ax.set_xlabel("frequency [MHz]")
ax.set_ylabel("amplitude [ADC]")
ax.set_ylim(0, 360)
ax.legend()

In [None]:
# some global plot settings
plt.rcParams["axes.labelweight"] = "bold"
plt.rcParams["font.weight"] = "bold"
plt.rcParams["font.size"] = 20
plt.rcParams["legend.fontsize"] = 18

plt.rcParams["xtick.major.width"] = 3
plt.rcParams["ytick.major.width"] = 3

plt.rcParams["xtick.major.size"] = 7
plt.rcParams["ytick.major.size"] = 7

plt.rcParams["xtick.labelsize"] = 18
plt.rcParams["ytick.labelsize"] = 18

In [None]:
mock_trace_generator = Mock_trace_generator(
    sidereal_voltage2_density_DF=sidereal_voltage2_density_DF,
    hw_response_func=hw_response_func,
    impedance_func=impedance_func,
    voltage2ADC=2048,
    time_trace_size=2048,
    sampling_frequency_MHz=250,
)
freq_MHz_bins = mock_trace_generator.get_frequency_bins()

additional_noise = 5e-4*piko
number_of_traces = 5000
mock_traces_DF = mock_trace_generator.generate_mock_trace(
    number_of_traces,
    # temp_celsius=870,
    additional_noise=additional_noise,
    nbi={"67.25": 1},
    nbi_err=0.3,
    rounding=True
)

In [None]:
rec_traces_all_DF = pd.read_csv('/home/tomas/gitlab/noise-files/triggered_ADC_traces_2021-09--2022-08_partially_cleaned_ch0.dat')
# for thesis: LS1736 Ch0 2022-05-01 00:00:00.00-2022-09-01 00:00:00.00 ~3000 traces after filtering
# 5000 mock traces

In [None]:
# 56, 1733, 1736, 1737, 1738, 1742, 1744
rec_traces_DF = rec_traces_all_DF[rec_traces_all_DF.station == 31736]

In [None]:
time_filter = (
    rec_traces_DF.gpsTime > Time("2022-05-01 00:00:00.000", format="iso").gps
) & (
    rec_traces_DF.gpsTime < Time("2022-09-01 00:00:00.000", format="iso").gps
)

In [None]:
rec_traces_DF = rec_traces_DF.loc[time_filter]
rec_traces_DF

In [None]:
temp_vals = rec_traces_DF.iloc[:,6:].sub(rec_traces_DF.iloc[:,6:].mean(axis=1).values, axis=0)
rec_traces_DF = pd.concat((rec_traces_DF.iloc[:,:6].copy(deep=True), temp_vals), axis=1)

In [None]:
def fft_df(df, dsc=2):
    temp_df = pd.DataFrame(np.abs(rfft(df.iloc[:,dsc:].values)))
    return pd.concat((df.iloc[:,:dsc], temp_df.set_index(df.index)), axis=1)

mock_spectra_DF = fft_df(mock_traces_DF, 2).iloc[:,2:]
rec_spectra_DF = fft_df(rec_traces_DF, 6).iloc[:,6:]
mock_spectra_DF.columns = [round(i,2) for i in rfftfreq(2048, 1/250)]
rec_spectra_DF.columns = [round(i,2) for i in rfftfreq(2048, 1/250)]

In [None]:
fig, ax = plt.subplots()
ax.plot(mock_traces_DF.iloc[0,2:].values, alpha=0.5, label='mock trace')
ax.plot(rec_traces_DF.iloc[0,6:].values, alpha=0.5, label='spectrum trace')

fig, ax = plt.subplots()
ax.plot(mock_spectra_DF.iloc[0,2:].values, alpha=0.5, label='mock trace')
ax.plot(rec_spectra_DF.iloc[0,6:].values, alpha=0.5, label='spectrum trace')

In [None]:
color1 = '#1f77b4'
color2 = 'darkorange'

fig, ax = plt.subplots(figsize=(6.5,4.75))
ax.plot(np.arange(2048)/250, debug_spectra_dict["time_trace"], alpha=0.5, label="mock trace", c=color1)
ax.plot(np.arange(2048)/250, rec_traces_DF.iloc[0, 6:].values.round(), alpha=0.5, label="recorded trace", c=color2)
ax.set_xlabel("time [$\mu$s]")
ax.set_ylabel("amplitude [ADC]")
ylim = ax.get_ylim()
ax.set_ylim(ylim[0], ylim[1] + 3)
ax.legend()
ax.yaxis.set_major_locator(MultipleLocator(5))

fig, ax = plt.subplots(figsize=(6.5,4.75))

x = debug_spectra_dict['freq_MHz_bins']
# ax.plot(np.abs(rfft(debug_spectra_dict['time_trace'])), alpha=0.5, label='mock spectrum 1')
ax.plot(x,
    np.sqrt(1)
    * debug_spectra_dict[
        "spectrum_voltage_density_in_HW_with_additional_noise_with_NBI"
    ]
    * 2048,
    alpha=0.5,
    label="mock spectrum",
    c=color1
)
ax.plot(x, rec_spectra_DF.iloc[0, :].values, alpha=0.5, label="recorded spectrum", c=color2)
ax.yaxis.set_major_locator(MultipleLocator(50))

ax.set_ylim(0, 360)
ax.set_xlabel("frequency [MHz]")
ax.set_ylabel("amplitude [ADC]")

ax.legend()

In [None]:
freq_ind = 394
data_mock = mock_spectra_DF.iloc[:,freq_ind].values
data_rec = rec_spectra_DF.iloc[:,freq_ind].values
print(mock_spectra_DF.columns[freq_ind])

bins = np.arange(0, 50000, 1000)
fig, ax = plt.subplots()
ax.hist(np.sqrt(1)*data_mock**2, bins=bins, density=True, alpha=0.5, label='mock data')
ax.hist(data_rec**2, bins=bins, density=True, alpha=0.5, label='recorded data')
ax.set_ylabel('density')
ax.set_xlabel('amplitude [ADC$^2$]')
ax.legend()

In [None]:
fig, ax = plt.subplots()
values_std = rec_traces_DF.iloc[:,6:].std(axis=1)
data = values_std[(values_std > 1) & (values_std < 3)]

upper_boundary_std = fit_stacked_histogram_plot(
    ax,
    data,
    distribution=lognorm,
    quantity_label="trace STD [ADC]",
    xlim=(0, 5.5),
    bins=np.arange(0, 20, 0.1),
    label='',
    cdf_threshold=1-1e-3,
    save=False
)

fig, ax = plt.subplots()
values_max_peaks = np.abs(rec_traces_DF.iloc[:,6:]).max(axis=1).values
data = values_max_peaks[(values_max_peaks > 10) & (values_max_peaks < 30) & (values_std > 1) & (values_std<upper_boundary_std)]

upper_boundary_max_peak = fit_stacked_histogram_plot(
    ax,
    data,
    distribution=gumbel_r,
    quantity_label="max peak [ADC]",
    xlim=(0, 30),
    bins=np.arange(5, 30, 1),
    label='',
    cdf_threshold=1-1e-3,
    save=False
)

rec_data_filter =((values_max_peaks < upper_boundary_max_peak) & (values_std > 1) & (values_std<upper_boundary_std) )

In [None]:
freq_ind = 394
data_mock = mock_spectra_DF.iloc[:,freq_ind].values
data_rec = rec_spectra_DF.iloc[:,freq_ind].loc[rec_data_filter].values
print(mock_spectra_DF.columns[freq_ind])

########################################################################
bins = np.arange(0, 51000, 1000)
fig, ax = plt.subplots()
ax.hist(np.sqrt(1)*data_mock**2, bins=bins, density=True, alpha=0.5, label='mock data', color=color1)
ax.hist(data_rec**2, bins=bins, density=True, alpha=0.5, label='recorded data', color=color2)
ax.set_ylabel('density')
ax.set_xlabel('amplitude [ADC$^2$]')
ax.ticklabel_format(style='sci', axis='both', scilimits=(0, 0))

params = expon.fit((data_mock**2)[(data_mock**2) < 50000])
x = np.linspace(100, bins[-1], 1000)
y = expon.pdf(x, loc=params[0], scale=params[1])
ax.plot(x, y, color=color1, lw=3)

params = expon.fit((data_rec**2)[(data_rec**2) < 50000])
x = np.linspace(100, bins[-1], 1000)
y = expon.pdf(x, loc=params[0], scale=params[1])
ax.plot(x, y, color=color2, lw=3)
ax.set_xlim(0, 5e+4)
ax.legend()

In [None]:
data_rec.size

In [None]:
data_mock.size