<a href="https://colab.research.google.com/github/filippozuddas/ML-SRT-SETI/blob/main/setigen_test.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [None]:
!pip install setigen blimpy

Collecting setigen
  Downloading setigen-2.7.0-py3-none-any.whl.metadata (11 kB)
Collecting blimpy
  Downloading blimpy-2.1.4-py3-none-any.whl.metadata (6.8 kB)
Collecting sphinx-rtd-theme>=0.4.3 (from setigen)
  Downloading sphinx_rtd_theme-3.0.2-py2.py3-none-any.whl.metadata (4.4 kB)
Collecting pytest-cov>=4.1.0 (from setigen)
  Downloading pytest_cov-7.0.0-py3-none-any.whl.metadata (31 kB)
Collecting hdf5plugin (from blimpy)
  Downloading hdf5plugin-6.0.0-py3-none-manylinux_2_24_x86_64.manylinux_2_28_x86_64.whl.metadata (3.3 kB)
Collecting pyparsing==2.4.7 (from blimpy)
  Downloading pyparsing-2.4.7-py2.py3-none-any.whl.metadata (3.6 kB)
Collecting coverage>=7.10.6 (from coverage[toml]>=7.10.6->pytest-cov>=4.1.0->setigen)
  Downloading coverage-7.11.0-cp311-cp311-manylinux1_x86_64.manylinux_2_28_x86_64.manylinux_2_5_x86_64.whl.metadata (9.0 kB)
Collecting sphinxcontrib-jquery<5,>=4 (from sphinx-rtd-theme>=0.4.3->setigen)
  Downloading sphinxcontrib_jquery-4.1-py2.py3-none-any.whl.me

In [1]:
import matplotlib.pyplot as plt
import numpy as np
from numpy.random import uniform, randint, rand
import setigen as stg
from pathlib import Path
from astropy import units as u
from astropy.time import Time
from skimage.transform import downscale_local_mean
import scipy
import os

In [2]:
# Root of the project tree; every other path below is derived from it.
BASE_DIR = "/content/filippo"

# First-level project folders, all direct children of BASE_DIR.
DATA_DIR, MODELS_DIR, NOTEBOOKS_DIR, RESULTS_DIR, TEST_DIR = (
    os.path.join(BASE_DIR, folder)
    for folder in ("data", "models", "notebooks", "results", "test")
)

# Folder for test images, nested under TEST_DIR.
TEST_IMAGES_DIR = os.path.join(TEST_DIR, "images")

In [18]:
# Frame geometry (passed to stg.Frame as fchans / tchans).
FCHANS = 4096
TCHANS = 16
N_FRAMES = 6   # number of frames in one cadence

# Resolution and first-channel frequency (passed to stg.Frame as df / dt / fch1).
DF = 2.7939677238464355 * u.Hz
DT = 18.25361108 * u.s
FCH1 = 6095.214842353016 * u.MHz

# Downsampling factor applied to the frequency axis, and the resulting
# number of frequency channels.
RESIZE_FACTOR = 8
FCHANS_FINAL = FCHANS // RESIZE_FACTOR

# Duration of one frame in seconds, and the gap inserted between two frames.
OBS_LENGTH = (TCHANS * DT).to(u.s).value
SLEW_TIME_S = 15

# Start times (Unix seconds) of the N_FRAMES frames.
MJD_START = 56789  # arbitrary start epoch (MJD)
t_start_arr = [Time(MJD_START, format='mjd').unix]
for i in range(1, N_FRAMES):
    # Each frame starts after the previous one has ended plus the slew time.
    t_start_arr.append(t_start_arr[i - 1] + OBS_LENGTH + SLEW_TIME_S)

# Mean of the synthetic noise (passed to Frame.add_noise as x_mean).
NOISE_MEAN = 58348559

In [4]:
def preprocessing(data):
  """
  Apply natural-log scaling followed by min-max normalisation.

  The minimum and maximum are taken over the whole array, so the result is
  normalised globally (not per frame or per row).

  Parameters
  ----------
  data : array_like
      Input intensities. Values are expected to be strictly positive because
      the logarithm is taken first; non-positive values yield -inf/NaN that
      propagate to the output.

  Returns
  -------
  numpy.ndarray
      Array with the same shape as ``data``, scaled so that its minimum is 0
      and its maximum is 1. If every input value is identical, an array of
      zeros is returned.
  """
  log_data = np.log(data)
  shifted = log_data - log_data.min()
  peak = shifted.max()
  if peak == 0:
    # Constant input: the value range is zero, so dividing would give 0/0 = NaN.
    # The shifted array is already all zeros, which is the sensible result.
    return shifted
  return shifted / peak

def data_resizing(data_batch, factor):
  """
  Downsample the last (frequency) axis by local averaging.

  All leading axes are left untouched. For the cadence arrays used in this
  notebook the shapes are (6, 16, 4096) -> (6, 16, 512) with ``factor=8``.
  Input of any rank >= 1 is accepted; only the last axis is reduced.

  Parameters
  ----------
  data_batch : numpy.ndarray
      Array whose last axis is the one to downsample.
  factor : int
      Number of adjacent samples along the last axis averaged into one.

  Returns
  -------
  numpy.ndarray
      Array with the last axis reduced by ``factor``. According to the
      scikit-image documentation, an axis length that is not a multiple of
      ``factor`` is padded with zeros before averaging.
  """
  # Block size 1 on every leading axis so that only the last axis is reduced.
  block_shape = (1,) * (np.ndim(data_batch) - 1) + (factor,)
  return downscale_local_mean(data_batch, block_shape)

In [26]:
def create_false_sample():
  """
  Generate one fully synthetic negative sample (class 0).

  With probability 0.5 the cadence contains noise only; otherwise a
  zero-drift signal (RFI-like) is injected into all N_FRAMES frames.

  Returns
  -------
  data_processed : numpy.ndarray
      Downsampled and normalised cadence with a trailing channel axis, shape
      (N_FRAMES, TCHANS, FCHANS // RESIZE_FACTOR, 1), i.e. (6, 16, 512, 1)
      with the notebook's settings.
  cadence : stg.Cadence
      The underlying setigen cadence (full resolution), useful for plotting.
  """
  # Frames get the same start times as in create_true_sample, so both classes
  # are generated on an identical time axis.
  frame_list = [stg.Frame(fchans=FCHANS,
                          tchans=TCHANS,
                          df=DF,
                          dt=DT,
                          fch1=FCH1,
                          t_start=t_start_arr[i]) for i in range(N_FRAMES)]

  cadence = stg.Cadence(frame_list=frame_list)
  # NOTE(review): x_std may be ignored by setigen for chi2 noise — confirm.
  cadence.apply(lambda frame: frame.add_noise(x_mean=NOISE_MEAN,
                                              x_std=NOISE_MEAN/4,
                                              noise_type='chi2'))

  # A draw above 0.5 leaves the cadence as pure noise; otherwise inject RFI.
  if rand() <= 0.5:
    snr = uniform(10, 300)
    # Keep the start channel at least 100 channels away from either band edge.
    start_freq = cadence[0].get_frequency(index=randint(100, FCHANS - 100))

    # Called on the whole cadence, so the signal is added to every frame.
    cadence.add_signal(
        stg.constant_path(f_start=start_freq, drift_rate=0 * u.Hz/u.s),
        stg.constant_t_profile(level=cadence[0].get_intensity(snr=snr)),
        stg.gaussian_f_profile(width=uniform(10, 50) * u.Hz)
    )

  data_orig = np.array([frame.data for frame in cadence])  # (6, 16, 4096)
  data_resized = data_resizing(data_orig, factor=RESIZE_FACTOR)
  data_processed = preprocessing(data_resized)

  return data_processed[..., np.newaxis], cadence

In [27]:
def create_true_sample():
  """
  Generate one fully synthetic positive sample (class 1).

  A drifting signal (non-zero drift rate) is injected only into the frames
  labelled "A" of an "ABABAB" ordered cadence.

  Returns
  -------
  data_processed : numpy.ndarray
      Downsampled and normalised cadence with a trailing channel axis,
      (6, 16, 512, 1) with the notebook's settings.
  cadence : stg.OrderedCadence
      The underlying setigen cadence (full resolution), useful for plotting.
  """
  # Settings shared by every frame; only the start time differs per frame.
  frame_kwargs = dict(fchans=FCHANS, tchans=TCHANS, df=DF, dt=DT, fch1=FCH1)
  frames = [stg.Frame(t_start=t_start_arr[idx], **frame_kwargs)
            for idx in range(N_FRAMES)]
  cadence = stg.OrderedCadence(frame_list=frames, order="ABABAB")

  def _add_noise(frame):
    # Same chi2 noise settings for every frame of the cadence.
    return frame.add_noise(x_mean=NOISE_MEAN,
                           x_std=NOISE_MEAN/4,
                           noise_type='chi2')

  cadence.apply(_add_noise)

  # Random signal parameters (drawn in this fixed order).
  snr = uniform(10, 300)
  drift_magnitude = rand() * 2 + 1        # in [1, 3)
  drift_sign = (-1) ** randint(1, 3)      # exponent is 1 or 2 -> sign -1 or +1
  drift_rate = drift_magnitude * drift_sign
  # NOTE(review): the start channel is only kept 100 channels from the band
  # edges; at |drift| up to 3 Hz/s the signal can move by several hundred
  # channels within one frame and may leave the band — confirm this is intended.
  start_index = randint(100, FCHANS - 100)
  start_freq = cadence[0].get_frequency(index=start_index)

  signal_path = stg.constant_path(f_start=start_freq,
                                  drift_rate=drift_rate * u.Hz/u.s)
  time_profile = stg.constant_t_profile(level=cadence[0].get_intensity(snr=snr))
  freq_profile = stg.gaussian_f_profile(width=uniform(10, 50) * u.Hz)
  bandpass_profile = stg.constant_bp_profile(level=1)

  # Only the "A" frames receive the signal.
  on_frames = cadence.by_label("A")
  on_frames.add_signal(signal_path, time_profile, freq_profile, bandpass_profile)

  stacked = np.array([frame.data for frame in cadence])
  downsampled = data_resizing(stacked, factor=RESIZE_FACTOR)
  normalised = preprocessing(downsampled)

  return normalised[..., np.newaxis], cadence

In [34]:
print("--- Esecuzione Sanity Check ---")

# savefig() does not create missing directories, so make sure the output
# folder exists; otherwise this cell fails on a fresh runtime.
os.makedirs(TEST_IMAGES_DIR, exist_ok=True)

# Shape every generated sample must have: frames, time bins, downsampled
# frequency bins, and one trailing channel axis.
expected_shape = (N_FRAMES, TCHANS, FCHANS_FINAL, 1)

# --- 1. Generate and plot a false (class 0) sample ---
print("Generazione Campione false (Classe 0)...")
false_data, false_cadence = create_false_sample()
print(f"Forma finale dati false: {false_data.shape}")
assert false_data.shape == expected_shape, false_data.shape

# The figure is created first so it is the current pyplot figure when the
# cadence is plotted.
# NOTE(review): presumably Cadence.plot() draws into the current figure — confirm.
fig_false = plt.figure(figsize=(10, 10))
fig_false.suptitle("Campione false (Classe 0) - Tutti i 6 frame dovrebbero essere identici", fontsize=16)
false_cadence.plot()
fig_false.savefig(os.path.join(TEST_IMAGES_DIR, "1_test_generazione_false.png"))

print("\n" + "="*30 + "\n")

# --- 2. Generate and plot a true (class 1) sample ---
print("Generazione Campione true (Classe 1)...")
true_data, true_cadence = create_true_sample()
print(f"Forma finale dati true: {true_data.shape}")
assert true_data.shape == expected_shape, true_data.shape

fig_vero = plt.figure(figsize=(10, 10))
fig_vero.suptitle("Campione true (Classe 1) - Pattern ON-OFF (Frame 0, 2, 4)", fontsize=16)
true_cadence.plot()  # setigen's built-in cadence plotter
fig_vero.savefig(os.path.join(TEST_IMAGES_DIR, "2_test_generazione_true.png"))


--- Esecuzione Sanity Check ---
Generazione Campione false (Classe 0)...
Forma finale dati false: (6, 16, 512, 1)


Generazione Campione true (Classe 1)...
Forma finale dati true: (6, 16, 512, 1)
