<a href="https://colab.research.google.com/github/JohnCimm/Two-Step-Deep-Learning-Approach-for-Denoising-RF-Transmitted-Images-Under-Jamming-/blob/main/sigmfToTXT.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [None]:
import os
import numpy as np
import datetime as dt

!pip install sigmf
from sigmf import SigMFFile, sigmffile

contact_email = 'aia-rf@mit.edu'


# adapted from "Use Case" in gnuradio/SigMF repository (https://github.com/gnuradio/SigMF)

def read_sigmf_file(filename=f'sample', folderpath=None):

    # Load a dataset
    full_filename = filename
    if folderpath is not None:
        full_filename = os.path.join(folderpath, filename)

    meta = sigmffile.fromfile(full_filename)

    # Get some metadata and all annotations
    sample_rate = meta.get_global_field(SigMFFile.SAMPLE_RATE_KEY)
    sample_count = meta.sample_count
    signal_duration = sample_count / sample_rate

    data = meta.read_samples(0, meta.sample_count)
    return data, meta

#data,meta = read_sigmf_file('/content/CommSignal2_train_frame_0000.sigmf-data')
#os.makedirs('/content/dataset')
#fullName = os.path.join('/content/dataset','CommSignal2_0000')
#print(data)
#data.tofile('/content/dataset/CommSignal2_0000',' ')


def write_sigmf_file(data, filename=f'sample', folderpath=None, fs=1, fc=0, description=''):
    assert data.dtype == 'complex64'

    full_filename = filename
    if folderpath is not None:
        if not os.path.exists(folderpath):
            os.makedirs(folderpath)
        full_filename = os.path.join(folderpath, filename)

    # write those samples to file in cf32_le
    data.tofile(f"{full_filename}.sigmf-data")

    # create the metadata
    meta = SigMFFile(
        data_file=f"{full_filename}.sigmf-data", # extension is optional
        global_info = {
            SigMFFile.DATATYPE_KEY: 'cf32_le',
            SigMFFile.SAMPLE_RATE_KEY: fs,
            SigMFFile.AUTHOR_KEY: contact_email,
            SigMFFile.DESCRIPTION_KEY: description,
            SigMFFile.VERSION_KEY: sigmf.__version__,
        }
    )

    # create a capture key at time index 0
    meta.add_capture(0, metadata={
        SigMFFile.FREQUENCY_KEY: fc,
        SigMFFile.DATETIME_KEY: dt.datetime.utcnow().isoformat()+'Z',
    })

    # check for mistakes & write to disk
    assert meta.validate()
    meta.tofile(f"{full_filename}.sigmf-meta") # extension is optional

    return data, meta

Collecting sigmf
  Downloading SigMF-1.2.6-py3-none-any.whl.metadata (9.5 kB)
Downloading SigMF-1.2.6-py3-none-any.whl (37 kB)
Installing collected packages: sigmf
Successfully installed sigmf-1.2.6


In [None]:
import os
import warnings
import numpy as np
import pickle


def load_dataset_sample(idx, dataset_type, sig_type):
    foldername = os.path.join('dataset',dataset_type,sig_type)
    filename = f'{sig_type}_{dataset_type}_{idx:04d}'
    # Special handling for "Separation" validation and test set; only using Comm2 vs [sig_type] for this iteration
    if 'sep_' in dataset_type:
        filename = f'CommSignal2_vs_{sig_type}_{dataset_type}_{idx:04d}'
    data, meta = read_sigmf_file(filename=filename, folderpath=foldername)
    return data, meta

def load_dataset_sample_components(idx, dataset_type, sig_type):
    assert 'train' in dataset_type or 'val' in dataset_type or 'test' in dataset_type, f'Invalid dataset type requested for obtaining components: {dataset_type}'

    soi_name = 'Comm2' if 'sep_' in dataset_type else 'QPSK'
    foldername1 = os.path.join('dataset',dataset_type,'Components', sig_type, soi_name)
    filename1 = f'{sig_type}_{dataset_type}_{idx:04d}'
    # Special handling for "Separation" validation and test set; only using Comm2 vs [sig_type] for this iteration
    if 'sep_' in dataset_type:
        filename1 = f'CommSignal2_vs_{sig_type}_{dataset_type}_{idx:04d}'
    data1, meta1 = read_sigmf_file(filename=filename1, folderpath=foldername1)

    foldername2 = os.path.join('dataset',dataset_type,'Components', sig_type, 'Interference')
    filename2 = f'{sig_type}_{dataset_type}_{idx:04d}'
    # Special handling for "Separation" validation and test set; only using Comm2 vs [sig_type] for this iteration
    if 'sep_' in dataset_type:
        filename2 = f'CommSignal2_vs_{sig_type}_{dataset_type}_{idx:04d}'
    data2, meta2 = read_sigmf_file(filename=filename1, folderpath=foldername2)

    return data1, meta1, data2, meta2

def load_dataset_sample_demod_groundtruth(idx, dataset_type, sig_type):
    assert 'train' in dataset_type or 'val' in dataset_type or 'test' in dataset_type, f'Invalid dataset type requested for obtaining components: {dataset_type}'
    assert 'demod_' in dataset_type, f'Invalid dataset type requested for obtaining components: {dataset_type}'

    foldername = os.path.join('dataset',dataset_type,'Components',sig_type,'QPSK')
    filename = f'{sig_type}_{dataset_type}_{idx:04d}'
    data, meta = read_sigmf_file(filename=filename, folderpath=foldername)

    msg_folder = os.path.join('dataset',dataset_type,'QPSK_Bits',sig_type)
    msg_filename = f'{sig_type}_{dataset_type}_QPSK_bits_{idx:04d}'
    msg_bits, ground_truth_info = pickle.load(open(os.path.join(msg_folder,f'{msg_filename}.pkl'),'rb'))
    return data, meta, msg_bits, ground_truth_info


def demod_check_ber(bit_est, idx, dataset_type, sig_type):
    assert 'demod_' in dataset_type, f'Invalid dataset type requested for obtaining components: {dataset_type}'

    msg_folder = os.path.join('dataset',dataset_type,'QPSK_Bits',sig_type)
    msg_filename = f'{sig_type}_{dataset_type}_QPSK_bits_{idx:04d}'
    bit_true, _ = pickle.load(open(os.path.join(msg_folder,f'{msg_filename}.pkl'),'rb'))
    if len(bit_est) != len(bit_true):
        warnings.warn(f'Mismatch in estimated bit message length ({len(bit_est)}) and true bit message length ({len(bit_true)})')
        msg_len = min(len(bit_true), len(bit_est))
        bit_true = bit_true[:msg_len]
        bit_est = bit_est[:msg_len]
    ber = np.sum(np.abs(bit_est-bit_true))/len(bit_true)
    return ber


In [None]:
get_sinr = lambda s, i: 10*np.log10(np.mean(np.abs(s)**2)/np.mean(np.abs(i)**2))

# sig_type = "EMISignal1"
# sig_type = "CommSignal2"
sig_type = "CommSignal3"

num_train_frame = {"EMISignal1": 580, "CommSignal2": 150, "CommSignal3": 139}
# num_all_frame = {"EMISignal1": 580, "CommSignal2": 150, "CommSignal3": 189}
sig_dataset = []
for ii in range(num_train_frame[sig_type]):
    filename = f'{sig_type}_{"train_frame"}_{ii:04d}_{".txt"}'
    data,meta = load_dataset_sample(ii, "train_frame", sig_type)
    sig_dataset.append(data)

    fullName = os.path.join('/content/drive/MyDrive/CommSig3',filename)
    data.tofile(fullName,' ')
sig_dataset = np.array(sig_dataset)
#data,meta = read_sigmf_file('/content/CommSignal2_train_frame_0000.sigmf-data')

