Skip to content

Commit

Permalink
misc fixes, cleanup code, docstrings
Browse files Browse the repository at this point in the history
  • Loading branch information
jhazentia committed May 21, 2024
1 parent 47163e4 commit 74f16bc
Show file tree
Hide file tree
Showing 6 changed files with 91 additions and 95 deletions.
104 changes: 53 additions & 51 deletions sigmf/archive.py
Original file line number Diff line number Diff line change
Expand Up @@ -74,60 +74,62 @@ def __init__(self,
mode = "a" if fileobj is not None else "w"
sigmf_fileobj = self._get_output_fileobj()
try:
sigmf_archive = tarfile.TarFile(mode=mode,
fileobj=sigmf_fileobj,
format=tarfile.PAX_FORMAT)
except tarfile.ReadError:
# fileobj doesn't contain any archives yet, so reopen in 'w' mode
sigmf_archive = tarfile.TarFile(mode='w',
fileobj=sigmf_fileobj,
format=tarfile.PAX_FORMAT)

def chmod(tarinfo):
if tarinfo.isdir():
tarinfo.mode = 0o755 # dwrxw-rw-r
else:
tarinfo.mode = 0o644 # -wr-r--r--
return tarinfo

for sigmffile in self.sigmffiles:
sigmffile_name = os.path.normpath(sigmffile.name)
if os.path.isabs(sigmffile_name):
# remove root path component to make relative path for tarfile
sigmffile_name_split = sigmffile_name.split(os.path.sep)
sigmffile_name = os.path.sep.join(sigmffile_name_split[1:])
try:
sigmf_archive = tarfile.TarFile(mode=mode,
fileobj=sigmf_fileobj,
format=tarfile.PAX_FORMAT)
except tarfile.ReadError:
# fileobj doesn't contain any archives yet, so reopen in 'w' mode
sigmf_archive = tarfile.TarFile(mode='w',
fileobj=sigmf_fileobj,
format=tarfile.PAX_FORMAT)

def chmod(tarinfo):
if tarinfo.isdir():
tarinfo.mode = 0o755 # dwrxw-rw-r
else:
tarinfo.mode = 0o644 # -wr-r--r--
return tarinfo

for sigmffile in self.sigmffiles:
sigmffile_name = os.path.normpath(sigmffile.name)
if os.path.isabs(sigmffile_name):
raise SigMFFileError("Invalid SigMFFile name")
self._create_parent_dirs(sigmf_archive, sigmffile_name, chmod)
file_path = os.path.join(sigmffile_name,
os.path.basename(sigmffile_name))
sf_md_filename = file_path + SIGMF_METADATA_EXT
sf_data_filename = file_path + SIGMF_DATASET_EXT
metadata = sigmffile.dumps(pretty=pretty)
metadata_tarinfo = tarfile.TarInfo(sf_md_filename)
metadata_tarinfo.size = len(metadata)
metadata_tarinfo.mtime = time.time()
metadata_tarinfo = chmod(metadata_tarinfo)
metadata_buffer = BytesIO(metadata.encode("utf-8"))
sigmf_archive.addfile(metadata_tarinfo, fileobj=metadata_buffer)
data_tarinfo = sigmf_archive.gettarinfo(name=sigmffile.data_file,
arcname=sf_data_filename)
if sigmffile.offset_and_size:
data_tarinfo.size = sigmffile.offset_and_size[1]

data_tarinfo = chmod(data_tarinfo)
with open(sigmffile.data_file, "rb") as data_file:
# remove root path component to make relative path for tarfile
sigmffile_name_split = sigmffile_name.split(os.path.sep)
sigmffile_name = os.path.sep.join(sigmffile_name_split[1:])
if os.path.isabs(sigmffile_name):
raise SigMFFileError("Invalid SigMFFile name")
self._create_parent_dirs(sigmf_archive, sigmffile_name, chmod)
file_path = os.path.join(sigmffile_name,
os.path.basename(sigmffile_name))
sf_md_filename = file_path + SIGMF_METADATA_EXT
sf_data_filename = file_path + SIGMF_DATASET_EXT
metadata = sigmffile.dumps(pretty=pretty)
metadata_tarinfo = tarfile.TarInfo(sf_md_filename)
metadata_tarinfo.size = len(metadata)
metadata_tarinfo.mtime = time.time()
metadata_tarinfo = chmod(metadata_tarinfo)
metadata_buffer = BytesIO(metadata.encode("utf-8"))
sigmf_archive.addfile(metadata_tarinfo, fileobj=metadata_buffer)
data_tarinfo = sigmf_archive.gettarinfo(name=sigmffile.data_file,
arcname=sf_data_filename)
if sigmffile.offset_and_size:
data_file.seek(sigmffile.offset_and_size[0])
sigmf_archive.addfile(data_tarinfo, fileobj=data_file)

sigmf_archive.close()
if not fileobj:
sigmf_fileobj.close()
else:
sigmf_fileobj.seek(0) # ensure next open can read this as a tar
data_tarinfo.size = sigmffile.offset_and_size[1]

data_tarinfo = chmod(data_tarinfo)
with open(sigmffile.data_file, "rb") as data_file:
if sigmffile.offset_and_size:
data_file.seek(sigmffile.offset_and_size[0])
sigmf_archive.addfile(data_tarinfo, fileobj=data_file)

self.path = sigmf_archive.name
finally:
sigmf_archive.close()
if (not fileobj) and sigmf_fileobj:
sigmf_fileobj.close()
else:
sigmf_fileobj.seek(0) # ensure next open can read this as a tar

self.path = sigmf_archive.name

def _create_parent_dirs(self, _tarfile, sigmffile_name, set_permission):
""" Create parent directory TarInfo objects if tarfile doesn't
Expand Down
2 changes: 1 addition & 1 deletion sigmf/archivereader.py
Original file line number Diff line number Diff line change
Expand Up @@ -122,7 +122,7 @@ def __init__(self,

self.sigmffiles.append(sigmffile)

if not any([r.data for r in recordings]):
if not any(r.data for r in recordings):
warnings.warn(f"No file with {SIGMF_DATASET_EXT} extension"
" found in archive!")
finally:
Expand Down
1 change: 0 additions & 1 deletion sigmf/sigmffile.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,6 @@
from os import path
import warnings
from collections import OrderedDict
from os import path

import numpy as np

Expand Down
4 changes: 4 additions & 0 deletions sigmf/sigmffile_collection.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,8 @@


class AbstractSigMFFileCollection(ABC):
"""Abstract collection of SigMF files that can be
represented by a single SigMF or multiple SigMF files"""

@abstractmethod
def sigmffile_count(self):
Expand All @@ -24,6 +26,8 @@ def archive(self, name=None, fileobj=None, pretty=True):


class SigMFFileCollection(AbstractSigMFFileCollection):
"""Implementation of AbstractSigMFFileCollection representing
collection of multiple SigMF files"""

def __init__(self, sigmffiles: Iterable["sigmf.sigmffile.SigMFFile"]) -> None:
self.sigmffiles = sigmffiles
Expand Down
43 changes: 24 additions & 19 deletions tests/test_archive.py
Original file line number Diff line number Diff line change
Expand Up @@ -112,7 +112,6 @@ def test_tarfile_names_and_extensions(test_sigmffile):
assert basedir.name == test_sigmffile.name
archive_name = sigmf_tarfile.name
assert archive_name == temp.name
path.split(temp.name)[-1]
file_extensions = {SIGMF_DATASET_EXT, SIGMF_METADATA_EXT}

file1_name, file1_ext = path.splitext(file1.name)
Expand All @@ -127,6 +126,8 @@ def test_tarfile_names_and_extensions(test_sigmffile):


def test_tarfile_names_and_extensions_with_paths(test_sigmffile):
"""Verify SigMFFile with path like name has corresponding subfolders
when converted to an archive"""
with tempfile.NamedTemporaryFile() as temp:
test_sigmffile.name = os.path.join("test_folder", "test")
sigmf_tarfile = create_test_archive(test_sigmffile, temp)
Expand All @@ -135,7 +136,6 @@ def test_tarfile_names_and_extensions_with_paths(test_sigmffile):
assert subdir.name == test_sigmffile.name
archive_name = sigmf_tarfile.name
assert archive_name == temp.name
path.split(temp.name)[-1]
file_extensions = {SIGMF_DATASET_EXT, SIGMF_METADATA_EXT}

file1_name, file1_ext = path.splitext(file1.name)
Expand All @@ -153,6 +153,8 @@ def test_tarfile_names_and_extensions_with_paths(test_sigmffile):

def test_multirec_archive_into_fileobj(test_sigmffile,
test_alternate_sigmffile):
"""Test creating archive with multiple SigMFFiles by appending
to file object"""
with tempfile.NamedTemporaryFile() as t:
# add first sigmffile to the fileobj t
create_test_archive(test_sigmffile, t)
Expand Down Expand Up @@ -200,7 +202,8 @@ def test_tarfile_type(test_sigmffile):
assert sigmf_tarfile.format == tarfile.PAX_FORMAT


def test_create_archive_pathlike(test_sigmffile, test_alternate_sigmffile):
def test_create_archive_path(test_sigmffile, test_alternate_sigmffile):
"""Test creating an archive using a Path object"""
with tempfile.NamedTemporaryFile() as t:
input_sigmffiles = SigMFFileCollection([test_sigmffile, test_alternate_sigmffile])
arch = SigMFArchive(input_sigmffiles, path=Path(t.name))
Expand All @@ -210,22 +213,24 @@ def test_create_archive_pathlike(test_sigmffile, test_alternate_sigmffile):


def test_archive_names(test_sigmffile):
"""Test SigMF name when read from archive where
archive was created 3 different ways"""
with tempfile.NamedTemporaryFile(suffix=".sigmf") as t:
a = SigMFArchive(sigmffile_collection=test_sigmffile, path=t.name)
assert a.path == t.name
observed_sigmffile = sigmffile.fromarchive(t.name)
observed_sigmffile.name == test_sigmffile.name
assert observed_sigmffile.name == test_sigmffile.name

with tempfile.NamedTemporaryFile(suffix=".sigmf") as t:
archive_path = test_sigmffile.archive(t.name)
assert archive_path == t.name
observed_sigmffile = sigmffile.fromarchive(t.name)
observed_sigmffile.name == test_sigmffile.name
assert observed_sigmffile.name == test_sigmffile.name

with tempfile.NamedTemporaryFile(suffix=".sigmf") as t:
test_sigmffile.tofile(t.name, toarchive=True)
observed_sigmffile = sigmffile.fromarchive(t.name)
observed_sigmffile.name == test_sigmffile.name
assert observed_sigmffile.name == test_sigmffile.name


def test_archive_no_path_or_fileobj(test_sigmffile):
Expand All @@ -245,19 +250,19 @@ def test_fromfile_name_to_archive(test_sigmffile):
assert read_sigmffile.name == '/tmp/test_sigmf'
read_sigmffile.set_data_file(data_file=test_sigmffile.data_file)
read_sigmffile.archive('/tmp/testarchive.sigmf')
sigmf_tar = tarfile.open('/tmp/testarchive.sigmf')
basedir, subdir, file1, file2 = sigmf_tar.getmembers()
assert basedir.name == 'tmp'
assert subdir.name == 'tmp/test_sigmf'
if file1.name.endswith(SIGMF_DATASET_EXT):
sigmf_data = file1
sigmf_meta = file2
else:
sigmf_data = file2
sigmf_meta = file1

assert sigmf_data.name == 'tmp/test_sigmf/test_sigmf.sigmf-data'
assert sigmf_meta.name == 'tmp/test_sigmf/test_sigmf.sigmf-meta'
with tarfile.open('/tmp/testarchive.sigmf') as sigmf_tar:
basedir, subdir, file1, file2 = sigmf_tar.getmembers()
assert basedir.name == 'tmp'
assert subdir.name == 'tmp/test_sigmf'
if file1.name.endswith(SIGMF_DATASET_EXT):
sigmf_data = file1
sigmf_meta = file2
else:
sigmf_data = file2
sigmf_meta = file1

assert sigmf_data.name == 'tmp/test_sigmf/test_sigmf.sigmf-data'
assert sigmf_meta.name == 'tmp/test_sigmf/test_sigmf.sigmf-meta'
finally:
if os.path.exists('/tmp/test_sigmf.sigmf-meta'):
os.remove('/tmp/test_sigmf.sigmf-meta')
Expand Down
32 changes: 9 additions & 23 deletions tests/test_archivereader.py
Original file line number Diff line number Diff line change
Expand Up @@ -90,6 +90,7 @@ def test_access_data_without_untar(self):


def test_extract_single_recording(test_sigmffile):
"""Test reading an archive with 1 recording"""
with tempfile.NamedTemporaryFile() as tf:
expected_sigmffile = test_sigmffile
arch = SigMFArchive(expected_sigmffile, path=tf.name)
Expand All @@ -100,6 +101,7 @@ def test_extract_single_recording(test_sigmffile):


def test_extract_multi_recording(test_sigmffile, test_alternate_sigmffile):
"""Test reading an archive with 2 recordings"""
with tempfile.NamedTemporaryFile() as tf:
# Create a multi-recording archive
expected_sigmffiles = SigMFFileCollection([test_sigmffile, test_alternate_sigmffile])
Expand All @@ -110,19 +112,14 @@ def test_extract_multi_recording(test_sigmffile, test_alternate_sigmffile):
assert expected in reader.sigmffiles


def test_archivereader_different_folder(test_sigmffile,
def test_archivereader_subfolders_1(test_sigmffile,
test_alternate_sigmffile):
"""Test reading a SigMF archive containing 2 subfolders each containing a subfolder"""
try:
os.makedirs("folder1", exist_ok=True)
test_sigmffile.name = os.path.join("folder1", "test1")
os.makedirs("folder2", exist_ok=True)
test_alternate_sigmffile.name = os.path.join("folder2", "test2")
meta1_filepath = test_sigmffile.name + SIGMF_METADATA_EXT
with open(meta1_filepath, "w") as meta_fd:
test_sigmffile.dump(meta_fd)
meta2_filepath = test_alternate_sigmffile.name + SIGMF_METADATA_EXT
with open(meta2_filepath, "w") as meta_fd:
test_alternate_sigmffile.dump(meta_fd)

os.makedirs("archive_folder", exist_ok=True)
archive_path = os.path.join("archive_folder", "test_archive.sigmf")
Expand All @@ -133,10 +130,6 @@ def test_archivereader_different_folder(test_sigmffile,
for actual_sigmffile in reader:
assert actual_sigmffile in input_sigmffiles.get_sigmffiles()
finally:
if os.path.exists(meta1_filepath):
os.remove(meta1_filepath)
if os.path.exists(meta2_filepath):
os.remove(meta2_filepath)
if os.path.exists(archive_path):
os.remove(archive_path)
if os.path.exists("folder1"):
Expand All @@ -147,30 +140,23 @@ def test_archivereader_different_folder(test_sigmffile,
shutil.rmtree("archive_folder")


def test_archivereader_same_folder(test_sigmffile,
def test_archivereader_subfolders_2(test_sigmffile,
test_alternate_sigmffile):
"""Test reading a SigMF archive containing 1 subfolder containing 2 additional subfolders"""
try:
os.makedirs("folder1", exist_ok=True)
test_sigmffile.name = os.path.join("folder1", "test1")
test_alternate_sigmffile.name = os.path.join("folder1", "test2")
meta1_filepath = test_sigmffile.name + SIGMF_METADATA_EXT
with open(meta1_filepath, "w") as meta_fd:
test_sigmffile.dump(meta_fd)
meta2_filepath = test_alternate_sigmffile.name + SIGMF_METADATA_EXT
with open(meta2_filepath, "w") as meta_fd:
test_alternate_sigmffile.dump(meta_fd)
archive_path = os.path.join("folder1", "test_archive.sigmf")

os.makedirs("archive_folder", exist_ok=True)
archive_path = os.path.join("archive_folder", "test_archive.sigmf")
input_sigmffiles = SigMFFileCollection([test_sigmffile, test_alternate_sigmffile])
arch = SigMFArchive(input_sigmffiles, path=archive_path)
reader = SigMFArchiveReader(arch.path)
assert len(reader) == 2 # number of SigMFFiles
for actual_sigmffile in reader:
assert actual_sigmffile in input_sigmffiles.get_sigmffiles()
finally:
if os.path.exists(meta1_filepath):
os.remove(meta1_filepath)
if os.path.exists(meta2_filepath):
os.remove(meta2_filepath)
if os.path.exists(archive_path):
os.remove(archive_path)
if os.path.exists("folder1"):
Expand Down

0 comments on commit 74f16bc

Please sign in to comment.