In [None]:
import os
import pickle
import utils
from datetime import datetime, timedelta
import gnss_tools.orbits as orbits
import gnss_tools.time as time
import gnss_tools.coords as coords
import utils.signals.gps_l1ca as gps_l1ca_utils
import numpy as np
import matplotlib.pyplot as plt

SPEED_OF_LIGHT = 299792458.0  # m/s

plt.rcParams.update({'font.size': 14})

In [None]:
# Get local-data directory for storing downloaded data files
data_dir = os.path.join(os.path.dirname(os.path.dirname(utils.__file__)), "local-data")
os.makedirs(data_dir, exist_ok=True)

In [None]:
# Load the samples and sat params from simulation
output_filepath = os.path.join(data_dir, "simulated_gps_l1ca_signal_samples.npy")
assert os.path.exists(output_filepath), f"File {output_filepath} does not exist!"
samples = np.load(output_filepath)
samp_rate = 5e6  # Assuming 5 MHz sampling rate

params_filepath = os.path.join(data_dir, "simulated_gps_l1ca_signal_params.npy")
assert os.path.exists(params_filepath), f"File {params_filepath} does not exist!"
with open(params_filepath, "rb") as f:
    params = pickle.load(f)

sat_params = params["sat_params"]
samp_rate = params["samp_rate"]
samples_time_arr = np.arange(len(samples)) / samp_rate

sat_id_list = sorted(list(sat_params.keys()))
print(sat_id_list)

In [None]:
sat_id = "G02"
sat_info = sat_params[sat_id]
print(sat_info)

In [None]:
def generate_l1ca_reference_samples(
        prn: int,
        samp_rate_hz: float,
        num_samples: int,
        initial_code_phase_chips: float,
        initial_carrier_phase_rad: float,
        doppler_hz: float,
        data_bits: np.ndarray | None = None,
) -> np.ndarray:
    """Generate reference samples for a given set of signal parameters."""
    time_arr = np.arange(num_samples) / samp_rate_hz

    # Generate code samples
    code_seq = 1 - 2 * gps_l1ca_utils.get_GPS_L1CA_code_sequence(prn).astype(np.int8)
    adjusted_code_rate = gps_l1ca_utils.CODE_RATE * (1 + doppler_hz / gps_l1ca_utils.CARRIER_FREQ)
    code_phase_chips = initial_code_phase_chips + time_arr * adjusted_code_rate
    code_chip_indices = code_phase_chips.astype(int) % gps_l1ca_utils.CODE_LENGTH
    code_samples = code_seq[code_chip_indices]

    # Generate carrier samples
    carrier_phase = initial_carrier_phase_rad + 2.0 * np.pi * doppler_hz * time_arr
    carrier_samples = np.exp(1j * carrier_phase)

    # Combine to form signal samples
    signal_samples = code_samples * carrier_samples

    if data_bits is not None:
        # Generate data samples
        code_phase_bits = code_phase_chips * gps_l1ca_utils.DATA_SYMBOL_RATE / gps_l1ca_utils.CODE_RATE
        data_bit_indices = code_phase_bits.astype(int) % len(data_bits)
        data_samples = data_bits[data_bit_indices]
        signal_samples *= data_samples

    return signal_samples

In [None]:
reference_duration = 1e-3
num_reference_samples = int(reference_duration * samp_rate)
prn = int(sat_id[1:])
reference_samples = generate_l1ca_reference_samples(
    prn=prn,
    samp_rate_hz=samp_rate,
    num_samples=num_reference_samples,
    initial_code_phase_chips=sat_info["initial_code_phase_chips"],
    initial_carrier_phase_rad=sat_info["initial_carrier_phase_rad"],
    doppler_hz=sat_info["initial_doppler_hz"],
)

In [None]:
fig = plt.figure(figsize=(12, 6))
ax1, ax2 = fig.subplots(2, 1, sharex=True)
_num_plot_samp = 1000
ax1.plot(samples_time_arr[:_num_plot_samp] * 1e6, np.real(samples[:_num_plot_samp]), label="Real", color="r", marker="o")
ax1.plot(samples_time_arr[:_num_plot_samp] * 1e6, np.imag(samples[:_num_plot_samp]), label="Imag", color="b", marker="o")
ax1.grid()
ax1.set_ylabel("Baseband Samples")
# ax1.set_xlabel("Time [µs]")

ax2.plot(samples_time_arr[:_num_plot_samp] * 1e6, np.real(reference_samples[:_num_plot_samp]), label="Real", color="r", marker="x")
ax2.plot(samples_time_arr[:_num_plot_samp] * 1e6, np.imag(reference_samples[:_num_plot_samp]), label="Imag", color="b", marker="x")
ax2.grid()
ax2.set_ylabel("Reference Samples")
ax2.set_xlabel("Time [µs]")

fig.align_labels()

In [None]:
reference_duration_ms = 20
num_reference_samples = int(reference_duration_ms * 1e-3 * samp_rate)
num_coh_samples = int(1e-3 * samp_rate)  # 1 ms coherent integration
prn = int(sat_id[1:])
reference_samples = generate_l1ca_reference_samples(
    prn=prn,
    samp_rate_hz=samp_rate,
    num_samples=num_reference_samples,
    initial_code_phase_chips=sat_info["initial_code_phase_chips"],
    initial_carrier_phase_rad=sat_info["initial_carrier_phase_rad"],
    doppler_hz=sat_info["initial_doppler_hz"],
)

fig = plt.figure(figsize=(12, 5))
ax1, ax2 = fig.subplots(2, 1, sharex=True)
_num_plot_samp = num_reference_samples

bb_conj_ref_samples = samples[:num_reference_samples] * np.conj(reference_samples)
coh_integrated_samples = np.sum(bb_conj_ref_samples.reshape((reference_duration_ms, -1)), axis=1)
_plot_time = samples_time_arr[:_num_plot_samp] * 1e3
ax1.scatter(_plot_time, np.real(bb_conj_ref_samples[:_num_plot_samp]), label="Real", color="r", s=1, alpha=.2)
ax1.scatter(_plot_time, np.imag(bb_conj_ref_samples[:_num_plot_samp]), label="Imag", color="b", s=1, alpha=.2)
ax1.grid()
ax1.set_ylabel("BB x conj(Ref.)\nSamples")

_plot_time = samples_time_arr[:_num_plot_samp:num_coh_samples] * 1e3
ax2.plot(_plot_time, np.real(coh_integrated_samples), label="Real", color="r", marker="x")
ax2.plot(_plot_time, np.imag(coh_integrated_samples), label="Imag", color="b", marker="x")
ax2.grid()
ax2.set_xticks(np.arange(0, reference_duration_ms + 1, 2))
ax2.set_ylabel("1ms Coherent Intgr.\nBB x conj(Ref.) Samples")
ax2.set_xlabel("Time [ms]")
ax2.legend(loc="lower right")

fig.align_labels()

In [None]:
10 * np.log10(10**(19.8/10) * 1e3)

In [None]:
fig = plt.figure(figsize=(6, 5))
ax1, ax2, ax3 = fig.subplots(3, 1, sharex=True)
_num_plot_samp = 500
ax1.plot(samples_time_arr[:_num_plot_samp] * 1e6, np.real(samples[:_num_plot_samp]), label="Real", color="r", marker="o")
ax1.plot(samples_time_arr[:_num_plot_samp] * 1e6, np.imag(samples[:_num_plot_samp]), label="Imag", color="b", marker="o")
ax1.grid()
ax1.set_ylabel("Baseband\nSamples")
# ax1.set_xlabel("Time [µs]")

ax2.plot(samples_time_arr[:_num_plot_samp] * 1e6, np.real(reference_samples[:_num_plot_samp]), label="Real", color="r", marker="x")
ax2.plot(samples_time_arr[:_num_plot_samp] * 1e6, np.imag(reference_samples[:_num_plot_samp]), label="Imag", color="b", marker="x")
ax2.grid()
ax2.set_ylabel("Ref.\nSamples")

_bb_conj_ref_samples = samples[:_num_plot_samp] * np.conj(reference_samples[:_num_plot_samp])
ax3.plot(samples_time_arr[:_num_plot_samp] * 1e6, np.real(_bb_conj_ref_samples[:_num_plot_samp]), label="Real", color="r", marker="x")
ax3.plot(samples_time_arr[:_num_plot_samp] * 1e6, np.imag(_bb_conj_ref_samples[:_num_plot_samp]), label="Imag", color="b", marker="x")
ax3.grid()
ax3.set_ylabel("Bb X conj(Ref.)\nSamples")
ax3.set_xlabel("Time [µs]")

ax1.legend(framealpha=1.0, loc="upper right")

fig.align_labels()

In [None]:
sat_id = sat_id_list[4]
sat_info = sat_params[sat_id]
prn = int(sat_id[1:])

print(f"Sat ID: {sat_id}, PRN: {prn}, Initial Code Phase (chips): {sat_info['initial_code_phase_chips']}, Initial Carrier Phase (rad): {sat_info['initial_carrier_phase_rad']}")
print(f"Initial Doppler (Hz): {sat_info['initial_doppler_hz']}, C/N0 (dB-Hz): {sat_info['cn0_dBHz']}")

integration_durations_ms = [1, 2, 5, 10, 20]
doppler_offsets = np.arange(-5000, 5000 + 100, 100)  # Hz
correlations = np.zeros((len(integration_durations_ms), len(doppler_offsets)), dtype=complex)

for i, duration_ms in enumerate(integration_durations_ms):
    num_reference_samples = int((duration_ms * 1e-3) * samp_rate)
    for j, doppler in enumerate(doppler_offsets):
        dopp_shifted_reference_samples = generate_l1ca_reference_samples(
            prn=prn,
            samp_rate_hz=samp_rate,
            num_samples=num_reference_samples,
            initial_code_phase_chips=sat_info["initial_code_phase_chips"],
            initial_carrier_phase_rad=sat_info["initial_carrier_phase_rad"],
            doppler_hz=doppler,
        )
        correlations[i, j] = np.sum(samples[:len(dopp_shifted_reference_samples)] * np.conj(dopp_shifted_reference_samples))

In [None]:
fig = plt.figure(figsize=(8, 4))
ax1 = fig.subplots(1, 1)
for i, duration_ms in enumerate(integration_durations_ms):
    color = plt.cm.viridis(i / len(integration_durations_ms))
    ax1.plot(doppler_offsets / 1e3, np.abs(correlations[i, :]), marker="o", label=f"{duration_ms} ms", color=color, zorder=10 - i)

ax1.set_xlabel("Doppler Offset [kHz]")
ax1.set_ylabel("Correlation Magnitude")
ax1.legend()
ax1.grid()
plt.show()

In [None]:
sat_id = sat_id_list[4]
sat_info = sat_params[sat_id]
prn = int(sat_id[1:])

print(f"Sat ID: {sat_id}, PRN: {prn}, Initial Code Phase (chips): {sat_info['initial_code_phase_chips']}, Initial Carrier Phase (rad): {sat_info['initial_carrier_phase_rad']}")
print(f"Initial Doppler (Hz): {sat_info['initial_doppler_hz']}, C/N0 (dB-Hz): {sat_info['cn0_dBHz']}")

integration_durations_ms = [1, 2, 5, 10, 20]
code_chip_offsets = np.arange(-5, 5, .1)  # chips
correlations = np.zeros((len(integration_durations_ms), len(code_chip_offsets)), dtype=complex)

for i, duration_ms in enumerate(integration_durations_ms):
    num_reference_samples = int((duration_ms * 1e-3) * samp_rate)
    for j, chip_offset in enumerate(code_chip_offsets):
        code_shifted_reference_samples = generate_l1ca_reference_samples(
            prn=prn,
            samp_rate_hz=samp_rate,
            num_samples=num_reference_samples,
            initial_code_phase_chips=sat_info["initial_code_phase_chips"] + chip_offset,
            initial_carrier_phase_rad=sat_info["initial_carrier_phase_rad"],
            doppler_hz=-sat_info["initial_doppler_hz"],
        )
        correlations[i, j] = np.sum(samples[:len(code_shifted_reference_samples)] * np.conj(code_shifted_reference_samples))

In [None]:
fig = plt.figure(figsize=(8, 4))
ax1 = fig.subplots(1, 1)
for i, duration_ms in enumerate(integration_durations_ms):
    color = plt.cm.viridis(i / len(integration_durations_ms))
    ax1.plot(code_chip_offsets, np.abs(correlations[i, :]), marker="o", label=f"{duration_ms} ms", color=color, zorder=10 - i)

ax1.set_xlabel("Code Phase Offset [chips]")
ax1.set_ylabel("Correlation Magnitude")
ax1.legend()
ax1.grid()
plt.show()

In [None]:
sat_id = sat_id_list[6]
sat_info = sat_params[sat_id]
prn = int(sat_id[1:])

print(f"Sat ID: {sat_id}, PRN: {prn}, Initial Code Phase (chips): {sat_info['initial_code_phase_chips']}, Initial Carrier Phase (rad): {sat_info['initial_carrier_phase_rad']}")
print(f"Initial Doppler (Hz): {sat_info['initial_doppler_hz']}, C/N0 (dB-Hz): {sat_info['cn0_dBHz']}")

fig = plt.figure(figsize=(10, 5))
num_reference_samples = int(1e-3 * samp_rate)
reference_samples = generate_l1ca_reference_samples(
    prn=prn,
    samp_rate_hz=samp_rate,
    num_samples=num_reference_samples,
    initial_code_phase_chips=0.0,
    initial_carrier_phase_rad=sat_info["initial_carrier_phase_rad"],
    doppler_hz=-sat_info["initial_doppler_hz"],
)
xcorr = np.fft.ifft(np.fft.fft(samples[:len(reference_samples)]) * np.conj(np.fft.fft(reference_samples)))

snr = np.max(np.abs(xcorr)**2) / np.var(np.abs(xcorr))
print(f"SNR: {10 * np.log10(snr):.2f} dB")

ax1 = fig.subplots(1, 1)
ax1.plot(np.arange(len(xcorr)), np.abs(xcorr), marker="o", alpha=.5)
ax1.grid()
ax1.set_ylabel("Correlation Magnitude")
ax1.set_xlabel("Code Phase Offset [samples]")
ax1.set_xlim(0, 5000)

In [None]:
cn0_dBHz = 44.1
snr_in_dB = cn0_dBHz - 10 * np.log10(samp_rate)
snr_out_dB = snr_in_dB + 10 * np.log10(1000)
print(f"SNR in: {snr_in_dB:.2f} dB, SNR out (1 ms): {snr_out_dB:.2f} dB")

In [None]:
cn0_dBHz - 10 * np.log10(1/4) - 10 * np.log10(1 / 1e-3)

In [None]:
10 * np.log10(1 / 4)

In [None]:
# C/N0 = A**2 / N0
# SNR = A**2 / 4 / (N0 * B) = (C/N0) / (4 * B)

snr_in_dB = cn0_dBHz - 10 * np.log10(samp_rate) - 10 * np.log10(4)
snr_in_dB + 10 * np.log10(1e3)

In [None]:
10 * np.log10(5e6 / 1e3)

In [None]:
cn0_dBHz + 10 * np.log10(5e-3) - 10 * np.log10(1/4)

In [None]:
10 * np.log10(samp_rate / 1000)

In [None]:
snr_dB = 21.6
snr = 10**(snr_dB / 10)
bandwidth_hz = 1e3
cn0_dBHz = 10 * np.log10(snr * bandwidth_hz)
print(f"C/N0: {cn0_dBHz:.2f} dB-Hz")

In [None]:
total_integration_duration_ms = 40

sat_id = sat_id_list[4]
sat_info = sat_params[sat_id]
prn = int(sat_id[1:])

print(f"Sat ID: {sat_id}, PRN: {prn}, Initial Code Phase (chips): {sat_info['initial_code_phase_chips']}, Initial Carrier Phase (rad): {sat_info['initial_carrier_phase_rad']}")
print(f"Initial Doppler (Hz): {sat_info['initial_doppler_hz']}, C/N0 (dB-Hz): {sat_info['cn0_dBHz']}")

integration_durations_ms = [1, 2, 5, 10, 20]
doppler_offsets = np.arange(-5000, 5000 + 100, 100)  # Hz
correlations = np.zeros((len(integration_durations_ms), len(doppler_offsets)), dtype=complex)

for i, duration_ms in enumerate(integration_durations_ms):
    num_ncoherent_integrations = total_integration_duration_ms // duration_ms
    num_coh_samples = int((duration_ms * 1e-3) * samp_rate)
    num_reference_samples = num_coh_samples * num_ncoherent_integrations
    for j, doppler in enumerate(doppler_offsets):
        reference_samples = generate_l1ca_reference_samples(
            prn=prn,
            samp_rate_hz=samp_rate,
            num_samples=num_reference_samples,
            initial_code_phase_chips=sat_info["initial_code_phase_chips"],
            initial_carrier_phase_rad=sat_info["initial_carrier_phase_rad"],
            doppler_hz=doppler,
        )
        coh_integrations = np.sum((samples[:len(reference_samples)] * np.conj(reference_samples)).reshape((num_ncoherent_integrations, num_coh_samples)), axis=1)
        correlations[i, j] = np.sum(np.abs(coh_integrations)**2, axis=0)

In [None]:
fig = plt.figure(figsize=(8, 4))
ax1 = fig.subplots(1, 1)
for i, duration_ms in enumerate(integration_durations_ms):
    color = plt.cm.viridis(i / len(integration_durations_ms))
    ax1.plot(doppler_offsets / 1e3, np.abs(correlations[i, :]), marker="o", label=f"{duration_ms} ms", color=color, zorder=10 - i)

ax1.set_xlabel("Doppler Offset [kHz]")
ax1.set_ylabel("Correlation Magnitude")
ax1.legend()
ax1.grid()
plt.show()

In [None]:
total_integration_duration_ms = 40

sat_id = sat_id_list[4]
sat_info = sat_params[sat_id]
prn = int(sat_id[1:])

print(f"Sat ID: {sat_id}, PRN: {prn}, Initial Code Phase (chips): {sat_info['initial_code_phase_chips']}, Initial Carrier Phase (rad): {sat_info['initial_carrier_phase_rad']}")
print(f"Initial Doppler (Hz): {sat_info['initial_doppler_hz']}, C/N0 (dB-Hz): {sat_info['cn0_dBHz']}")

integration_durations_ms = [1, 2, 5, 10, 20]
num_ncoh_integrations = [total_integration_duration_ms // d for d in integration_durations_ms]
num_reference_samples = int((total_integration_duration_ms * 1e-3) * samp_rate)
code_chip_offsets = np.arange(-15, 15, .1)  # chips
correlations = np.zeros((len(integration_durations_ms), len(code_chip_offsets)), dtype=complex)

for i, duration_ms in enumerate(integration_durations_ms):
    num_ncoherent_integrations = num_ncoh_integrations[i]
    num_coh_samples = int((duration_ms * 1e-3) * samp_rate)
    for j, chip_offset in enumerate(code_chip_offsets):
        reference_samples = generate_l1ca_reference_samples(
            prn=prn,
            samp_rate_hz=samp_rate,
            num_samples=num_reference_samples,
            initial_code_phase_chips=sat_info["initial_code_phase_chips"] + chip_offset,
            initial_carrier_phase_rad=sat_info["initial_carrier_phase_rad"],
            doppler_hz=-sat_info["initial_doppler_hz"],
        )
        coh_integrations = np.sum((samples[:len(reference_samples)] * np.conj(reference_samples)).reshape((num_ncoherent_integrations, num_coh_samples)), axis=1)
        correlations[i, j] = np.sum(np.abs(coh_integrations)**2, axis=0)

In [None]:
fig = plt.figure(figsize=(8, 4))
ax1 = fig.subplots(1, 1)
for i, duration_ms in enumerate(integration_durations_ms):
    num_ncoh = num_ncoh_integrations[i]
    color = plt.cm.viridis(i / len(integration_durations_ms))
    ax1.plot(code_chip_offsets, np.abs(correlations[i, :]), marker="o", label=f"{num_ncoh} x {duration_ms} ms", color=color, markersize=5, zorder=10 - i)

ax1.set_xlabel("Code Phase Offset [chips]")
ax1.set_ylabel("Correlation Magnitude")
ax1.legend()
ax1.grid()
plt.show()

In [None]:
sat_id = sat_id_list[4]
sat_info = sat_params[sat_id]
prn = int(sat_id[1:])

print(f"Sat ID: {sat_id}, PRN: {prn}, Initial Code Phase (chips): {sat_info['initial_code_phase_chips']}, Initial Carrier Phase (rad): {sat_info['initial_carrier_phase_rad']}")
print(f"Initial Doppler (Hz): {sat_info['initial_doppler_hz']}, C/N0 (dB-Hz): {sat_info['cn0_dBHz']}")

fig = plt.figure(figsize=(7, 5))
coh_duration_ms = 1
num_ncoh = 5
num_reference_samples = int(coh_duration_ms * 1e-3 * samp_rate)
reference_samples = generate_l1ca_reference_samples(
    prn=prn,
    samp_rate_hz=samp_rate,
    num_samples=num_reference_samples,
    initial_code_phase_chips=0.0,
    initial_carrier_phase_rad=sat_info["initial_carrier_phase_rad"],
    doppler_hz=-sat_info["initial_doppler_hz"],
)
xcorr = np.fft.ifft(np.fft.fft(samples[:num_reference_samples * num_ncoh].reshape((num_ncoh, -1)), axis=-1) * np.conj(np.fft.fft(reference_samples))[None, :], axis=1)
xcorr = np.sqrt(np.sum(np.abs(xcorr)**2, axis=0))

snr = np.max(np.abs(xcorr)) / np.std(np.abs(xcorr))
print(f"SNR: {20 * np.log10(snr):.2f} dB")

ax1 = fig.subplots(1, 1)
ax1.plot(np.arange(len(xcorr)), np.abs(xcorr), marker="o", alpha=.5)
ax1.grid()
ax1.set_ylabel("Correlation Magnitude")
ax1.set_xlabel("Code Phase Offset [samples]")
ax1.set_xlim(0, 5000)
ax1.set_ylim(0, 1500)

In [None]:
import gnss_tools.time

In [None]:
gnss_tools.time.GPS_TAI_OFFSET

In [None]:
gnss_tools.time.leap_seconds.utc_tai_offset(datetime.now())