Skip to content

Commit

Permalink
Implement indexing into channels (#172)
Browse files Browse the repository at this point in the history
  • Loading branch information
adamreeve committed Mar 30, 2020
1 parent 8376132 commit a957c07
Show file tree
Hide file tree
Showing 5 changed files with 250 additions and 14 deletions.
27 changes: 27 additions & 0 deletions nptdms/reader.py
Original file line number Diff line number Diff line change
Expand Up @@ -152,6 +152,33 @@ def read_raw_data_for_channel(self, channel_path, offset=0, length=None):

segment_index += 1

def read_channel_chunk_for_index(self, channel_path, index):
    """ Read the chunk containing the given index

    :returns: Tuple of raw channel data chunk and the integer offset
        to the beginning of the chunk
    :rtype: (RawChannelDataChunk, int)
    """
    if self._segments is None:
        raise RuntimeError("Cannot read data unless metadata has first been read")
    if self._segment_channel_offsets is None:
        # Lazily build the index of cumulative offsets on first use
        with Timer(log, "Build data index"):
            self._build_index()

    # Cumulative value counts at the end of each segment for this channel
    offsets = self._segment_channel_offsets[channel_path]

    # Binary search for the first segment whose end offset is past the index
    segment_number = np.searchsorted(offsets, index, side='right')
    target_segment = self._segments[segment_number]
    values_per_chunk = self._segment_chunk_sizes[channel_path][segment_number]
    segment_start = 0 if segment_number == 0 else offsets[segment_number - 1]

    # Locate the chunk within the segment that holds the requested index
    chunk_number = (index - segment_start) // values_per_chunk
    chunk = next(target_segment.read_raw_data_for_channel(
        self._file, channel_path, chunk_number, 1))
    return chunk, segment_start + chunk_number * values_per_chunk

def _update_object_metadata(self, segment):
""" Update object metadata using the metadata read from a single segment
"""
Expand Down
81 changes: 81 additions & 0 deletions nptdms/tdms.py
Original file line number Diff line number Diff line change
Expand Up @@ -263,6 +263,12 @@ def _read_channel_data_chunks(self, channel):
for chunk in self._reader.read_raw_data_for_channel(channel.path):
yield chunk

def _read_channel_data_chunk_for_index(self, channel, index):
if self._reader is None:
raise RuntimeError(
"Cannot read channel data after the underlying TDMS reader is closed")
return self._reader.read_channel_chunk_for_index(channel.path, index)

def _read_channel_data(self, channel, offset=0, length=None):
if offset < 0:
raise ValueError("offset must be non-negative")
Expand Down Expand Up @@ -503,6 +509,9 @@ def __init__(
self._raw_data = None
self._data_scaled = None

self._cached_chunk = None
self._cached_chunk_bounds = None

def __repr__(self):
return "<TdmsChannel with path %s>" % self.path

Expand All @@ -515,6 +524,18 @@ def __iter__(self):
else:
return self._read_data_values()

def __getitem__(self, index):
if self._raw_data is not None:
return self.data[index]
elif index is Ellipsis:
return self.read_data()
elif isinstance(index, slice):
return self._read_slice(index.start, index.stop, index.step)
elif isinstance(index, int):
return self._read_at_index(index)
else:
raise TypeError("Invalid index type '%s', expected int, slice or Ellipsis" % type(index).__name__)

@_property_builtin
def path(self):
""" Path to the TDMS object for this channel
Expand Down Expand Up @@ -703,6 +724,66 @@ def _read_data_values(self):
for value in chunk:
yield value

def _read_slice(self, start, stop, step):
if step == 0:
raise ValueError("Step size cannot be zero")

# Replace None values with defaults
step = 1 if step is None else step
if start is None:
start = 0 if step > 0 else -1
if stop is None:
stop = self._length if step > 0 else -1 - self._length

# Handle negative indices
if start < 0:
start = self._length + start
if stop < 0:
stop = self._length + stop

# Check for empty ranges
if stop == start:
return np.empty((0, ), dtype=self.dtype)
if step > 0 and (stop < start or start >= self._length or stop < 0):
return np.empty((0,), dtype=self.dtype)
if step < 0 and (stop > start or stop >= self._length or start < 0):
return np.empty((0,), dtype=self.dtype)

# Trim values outside bounds
if start < 0:
start = 0
if start >= self._length:
start = self._length - 1
if stop > self._length:
stop = self._length
if stop < -1:
stop = -1

# Read data and handle step size
if step > 0:
read_data = self.read_data(start, stop - start)
return read_data[::step] if step > 1 else read_data
else:
read_data = self.read_data(stop + 1, start - stop)
return read_data[::step]

def _read_at_index(self, index):
if index < 0 or index >= self._length:
raise IndexError("Index {0} is outside of the channel bounds [0, {1}]".format(index, self._length - 1))

if self._cached_chunk is not None:
# Check if we've already read and cached the chunk containing this index
bounds = self._cached_chunk_bounds
if bounds[0] <= index < bounds[1]:
return self._cached_chunk[index - bounds[0]]

chunk, chunk_offset = self._tdms_file._read_channel_data_chunk_for_index(self, index)
scaled_chunk = self._scale_data(chunk)
self._cached_chunk = scaled_chunk
self._cached_chunk_bounds = (chunk_offset, chunk_offset + len(scaled_chunk))

return scaled_chunk[index - chunk_offset]

def _scale_data(self, raw_data):
scale = self._get_scaling()
if scale is not None:
Expand Down
24 changes: 12 additions & 12 deletions nptdms/test/scenarios.py
Original file line number Diff line number Diff line change
Expand Up @@ -375,22 +375,22 @@ def chunked_segment():
channel_metadata("/'group'/'channel1'", TDS_TYPE_INT32, 2),
channel_metadata("/'group'/'channel2'", TDS_TYPE_INT32, 2),
),
"01 00 00 00" "02 00 00 00"
"03 00 00 00" "04 00 00 00"
"05 00 00 00" "06 00 00 00"
"07 00 00 00" "08 00 00 00"
"00 00 00 00" "01 00 00 00"
"0A 00 00 00" "0B 00 00 00"
"02 00 00 00" "03 00 00 00"
"0C 00 00 00" "0D 00 00 00"
)
test_file.add_segment(
("kTocRawData", ),
"",
"07 00 00 00" "08 00 00 00"
"05 00 00 00" "06 00 00 00"
"03 00 00 00" "04 00 00 00"
"01 00 00 00" "02 00 00 00"
"04 00 00 00" "05 00 00 00"
"0E 00 00 00" "0F 00 00 00"
"06 00 00 00" "07 00 00 00"
"10 00 00 00" "11 00 00 00"
)
expected_data = {
('group', 'channel1'): np.array([1, 2, 5, 6, 7, 8, 3, 4], dtype=np.int32),
('group', 'channel2'): np.array([3, 4, 7, 8, 5, 6, 1, 2], dtype=np.int32),
('group', 'channel1'): np.array([0, 1, 2, 3, 4, 5, 6, 7], dtype=np.int32),
('group', 'channel2'): np.array([10, 11, 12, 13, 14, 15, 16, 17], dtype=np.int32),
}
return test_file, expected_data

Expand Down Expand Up @@ -696,10 +696,10 @@ def scaled_data():
segment_objects_metadata(
channel_metadata("/'group'/'channel1'", TDS_TYPE_INT32, 2, properties),
),
"01 00 00 00" "02 00 00 00"
"01 00 00 00" "02 00 00 00" "03 00 00 00" "04 00 00 00"
)
expected_data = {
('group', 'channel1'): np.array([12, 14], dtype=np.float64),
('group', 'channel1'): np.array([12, 14, 16, 18], dtype=np.float64),
}
return test_file, expected_data

Expand Down
2 changes: 1 addition & 1 deletion nptdms/test/test_pandas.py
Original file line number Diff line number Diff line change
Expand Up @@ -267,7 +267,7 @@ def test_channel_as_dataframe_with_raw_data():
"""Convert channel to Pandas dataframe with absolute time index"""

test_file, _ = scenarios.scaled_data().values
expected_raw_data = np.array([1, 2], dtype=np.int32)
expected_raw_data = np.array([1, 2, 3, 4], dtype=np.int32)
tdms_data = test_file.load()

df = tdms_data["group"]["channel1"].as_dataframe(scaled_data=False)
Expand Down
130 changes: 129 additions & 1 deletion nptdms/test/test_tdms_file.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@
from collections import defaultdict
import os
import tempfile
from hypothesis import (assume, given, example, strategies)
from hypothesis import (assume, given, example, settings, strategies)
import numpy as np
import pytest
from nptdms import TdmsFile
Expand Down Expand Up @@ -233,6 +233,134 @@ def test_iterate_file_and_groups():
compare_arrays(channel.data, expected_channel_data)


def test_indexing_channel_after_read_data():
    """ Test indexing into a channel after reading all data
    """
    test_file, expected_data = scenarios.chunked_segment().values
    with test_file.get_tempfile() as temp_file:
        tdms_file = TdmsFile.read(temp_file.file)
        for (group_name, channel_name), expected_values in expected_data.items():
            channel = tdms_file[group_name][channel_name]
            assert channel[0] == expected_values[0]
            compare_arrays(channel[:], expected_values)


@given(index=strategies.integers(0, 7))
def test_indexing_channel_with_integer(index):
    """ Test indexing into a channel with an integer index
    """
    test_file, expected_data = scenarios.chunked_segment().values
    with test_file.get_tempfile() as temp_file, \
            TdmsFile.open(temp_file.file) as tdms_file:
        for (group_name, channel_name), expected_values in expected_data.items():
            channel = tdms_file[group_name][channel_name]
            assert channel[index] == expected_values[index]


def test_indexing_channel_with_integer_and_caching():
    """ Test indexing into a channel with an integer index, reusing the same file to test caching
    """
    test_file, expected_data = scenarios.chunked_segment().values
    with test_file.get_tempfile() as temp_file, \
            TdmsFile.open(temp_file.file) as tdms_file:
        for (group_name, channel_name), expected_values in expected_data.items():
            channel = tdms_file[group_name][channel_name]
            # Read every value in turn so later reads hit the chunk cache
            read_values = [channel[i] for i in range(len(channel))]
            compare_arrays(read_values, expected_values)


def test_indexing_scaled_channel_with_integer():
    """ Test indexing into a channel with an integer index when the channel is scaled
    """
    test_file, expected_data = scenarios.scaled_data().values
    with test_file.get_tempfile() as temp_file, \
            TdmsFile.open(temp_file.file) as tdms_file:
        for (group_name, channel_name), expected_values in expected_data.items():
            channel = tdms_file[group_name][channel_name]
            # Read every value individually so scaling is applied per chunk
            read_values = [channel[i] for i in range(len(channel))]
            compare_arrays(read_values, expected_values)


def test_indexing_channel_with_ellipsis():
    """ Test indexing into a channel with ellipsis returns all data
    """
    test_file, expected_data = scenarios.chunked_segment().values
    with test_file.get_tempfile() as temp_file, \
            TdmsFile.open(temp_file.file) as tdms_file:
        for (group_name, channel_name), expected_values in expected_data.items():
            compare_arrays(tdms_file[group_name][channel_name][...], expected_values)


@pytest.fixture(scope="module")
def opened_tdms_file():
""" Allow re-use of an opened TDMS file
"""
test_file, expected_data = scenarios.chunked_segment().values
with test_file.get_tempfile() as temp_file:
with TdmsFile.open(temp_file.file) as tdms_file:
yield tdms_file, expected_data


@given(
    start=strategies.none() | strategies.integers(-10, 10),
    stop=strategies.none() | strategies.integers(-10, 10),
    step=strategies.none() | strategies.integers(-5, 5).filter(lambda i: i != 0),
)
@settings(max_examples=1000)
def test_indexing_channel_with_slice(opened_tdms_file, start, stop, step):
    """ Test indexing into a channel with a slice
    """
    tdms_file, expected_data = opened_tdms_file
    for (group_name, channel_name), expected_values in expected_data.items():
        channel = tdms_file[group_name][channel_name]
        compare_arrays(channel[start:stop:step], expected_values[start:stop:step])


@pytest.mark.parametrize('index', [-9, 8])
def test_indexing_channel_with_invalid_integer_raises_error(index):
    """ Test indexing into a channel with an invalid integer index
    """
    test_file, expected_data = scenarios.chunked_segment().values
    with test_file.get_tempfile() as temp_file, \
            TdmsFile.open(temp_file.file) as tdms_file:
        for group_name, channel_name in expected_data.keys():
            with pytest.raises(IndexError):
                _ = tdms_file[group_name][channel_name][index]


def test_indexing_channel_with_zero_step_raises_error():
    """ Test indexing into a channel with a slice with zero step size raises an error
    """
    test_file, expected_data = scenarios.chunked_segment().values
    with test_file.get_tempfile() as temp_file, \
            TdmsFile.open(temp_file.file) as tdms_file:
        for group_name, channel_name in expected_data.keys():
            with pytest.raises(ValueError) as exc_info:
                _ = tdms_file[group_name][channel_name][::0]
            assert str(exc_info.value) == "Step size cannot be zero"


@pytest.mark.parametrize('index', ["test", None])
def test_indexing_channel_with_invalid_type_raises_error(index):
    """ Test indexing into a channel with an invalid index type
    """
    test_file, expected_data = scenarios.chunked_segment().values
    with test_file.get_tempfile() as temp_file, \
            TdmsFile.open(temp_file.file) as tdms_file:
        for group_name, channel_name in expected_data.keys():
            with pytest.raises(TypeError) as exc_info:
                _ = tdms_file[group_name][channel_name][index]
            assert "Invalid index type" in str(exc_info.value)


def test_invalid_offset_in_read_data_throws():
""" Exception is thrown when reading a subset of data with an invalid offset
"""
Expand Down

0 comments on commit a957c07

Please sign in to comment.