In [1]:
#!pip install opencv-python zstandard python_on_whales scikit-image

In [2]:
import os

import data
import auto_label

In [3]:


def prepare_and_label(file:str, model:str):

    try: 
        sig_data = data.Data(file)
    except ValueError:
        print(f"Warning: Skipping {file}")
        return None
    
    # # print(f"Sample Rate: {sig_data.sample_rate}")
    # # print(f"Center Frequency: {sig_data.center_freq}")
    # # print(f"Duration: {sig_data.duration}")
    # print("Exporting SIGMF data...")

    try:
        sig_data.export_sigmf_data(overwrite=True)
    except:
        return None
    # sig_data.zst_to_sigmf_data()
    # n_fft = 1024
    # time_dim = 512 # equivalent to tune_step_fft parameter in GamutRF
    # n_samples = n_fft * time_dim
    # print(f"Generating spectrograms with n_samples = {n_samples} and n_fft = {n_fft}...")
    # sig_data.generate_spectrograms(n_samples, n_fft)
    # print("Auto-labeling spectrograms...")
    # sig_data.auto_label_spectrograms(model)
    # # print("Exporting annotation IQ data...")
    # # sig_data.export_annotation_iq()

def auto_label_ZST_directory(path:str, model:str):
    if os.path.isdir(path):
        for file in os.listdir(path):
            if file.endswith('.zst'):
                # Process the file here
                full_path = os.path.join(path, file)
                print(full_path)
                prepare_and_label(full_path, model)
    else:
        print(f"{path} is not a directory.")
    

In [4]:
path = "dev_data/torchsig_test/fhss_css"
auto_label_ZST_directory(path, model="fhss_css")

dev_data/torchsig_test/fhss_css/1690987701_2450000000Hz_20480000sps.ci16.zst


In [5]:
n_annotations = 0
for file in os.listdir(path):
    if file.endswith('.zst'):
        # Process the file here
        full_path = os.path.join(path, file)
        d = data.Data(full_path)
        n_annotations += len(d.metadata["annotations"])

In [6]:
n_annotations

296

In [7]:


def auto_label_directory(path:str, force_model:str = None):


    
    if not os.path.isdir(path):
        print(f"Error: {path} is not a directory")
    
    sample_files = [os.path.join(dirpath, f)  for dirpath, dirnames, filenames in os.walk(path) for f in filenames if f.endswith((".zst", ".raw")) ]
    for f in sample_files:

        model = force_model if force_model else get_model(f)

        if model:    
            print(f"Info: Trying to label {f} with model {model}")
            prepare_and_label(f, model)

def get_model(filepath:str):

    model_path_filters = {
        #TODO: add Immersion_Ghost/open_air_collect/fpv_medium
        #TODO: debug Immersion_Ghost/open_air_collect/fpv_small
        "wifi": {
            "include": ["wifi"],
            "exclude": [],
        },
        "tbs_crossfire": {
            "include": ["TBS_CRSF"],
            "exclude": [],
        },
        "fhss_css": {
            "include": ["Immersion_Ghost/open_air_collect/fpv_large", "Immersion_Ghost/faraday_cage_collect"],
            "exclude": ["msk", "fpv_drone_only"],
        },
        "msk": {
            "include": ["msk"],
            "exclude": [],
        },
        "mini2": {
            "include": ["mini2", "pdx_field_day", "leesburg_field_day"],
            "exclude": [],
        },
        "mavic3": {
            "include": ["mavic3"],
            "exclude": [],
        },        
    }

    models = []
    for model_name in model_path_filters:
        match = False
        for include in model_path_filters[model_name]["include"]:
            if include in filepath:
                match = True
                break
        if match:
            for exclude in model_path_filters[model_name]["exclude"]:
                if exclude in filepath:
                    match = False
                    break
        if match:
            models.append(model_name)
    if len(models) == 0: 
        print(f"Warning: Could not find a matching model for {filepath}")
        return None
    elif len(models) > 1: 
        print(f"Warning: Found multiple matching models [{models}] for {filepath}")
        return None
    else:
        return models[0]


In [10]:
path = "data/gamutrf/gamutrf-fpv"

auto_label_directory(path)

Info: Trying to label data/gamutrf/gamutrf-fpv/Immersion_Ghost/open_air_collect/fpv_large/fpv_pairing/2.45e9_20480000_10_10s_1690987570.ci16.zst with model fhss_css
Info: Trying to label data/gamutrf/gamutrf-fpv/Immersion_Ghost/open_air_collect/fpv_large/fpv_pairing/2.45e9_20480000_10_10s_1690987644.ci16.zst with model fhss_css
Info: Trying to label data/gamutrf/gamutrf-fpv/Immersion_Ghost/open_air_collect/fpv_large/fpv_pairing/2.45e9_20480000_10_10s_1690987608.ci16.zst with model fhss_css
Info: Trying to label data/gamutrf/gamutrf-fpv/Immersion_Ghost/open_air_collect/fpv_large/fpv_no_telemetry/2.45e9_20480000_0_5s_1690988185.ci16.zst with model fhss_css
Info: Trying to label data/gamutrf/gamutrf-fpv/Immersion_Ghost/open_air_collect/fpv_large/fpv_no_telemetry/2.46e9_20480000_10_5s_1690988089.ci16.zst with model fhss_css
Info: Trying to label data/gamutrf/gamutrf-fpv/Immersion_Ghost/open_air_collect/fpv_large/fpv_no_telemetry/2.45e9_20480000_10_5s_1690988055.ci16.zst with model fhss_css