Commit

Support conversion to Pandas dataframe when file opened without reading data (#214)

adamreeve committed Aug 26, 2020
1 parent 4d5d1bf commit ce1810e
Showing 7 changed files with 142 additions and 37 deletions.
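In effect, this lets as_dataframe work on a file opened with TdmsFile.open, where channel data is only read on demand, instead of requiring the whole file to be loaded with TdmsFile.read first. A minimal usage sketch (the file path and group/channel names are illustrative):

from nptdms import TdmsFile

# Open the file lazily; no channel data is read up front.
with TdmsFile.open("example.tdms") as tdms_file:
    # Before this commit, converting to a dataframe required the file to
    # have been fully read; it now reads the required data on demand.
    df = tdms_file["Group"]["Channel1"].as_dataframe()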
16 changes: 16 additions & 0 deletions nptdms/channel_data.py
@@ -172,6 +172,22 @@ def append_data(self, new_data):
         self._data_insert_position += len(new_data)
 
 
+class RawDataSlice(object):
+    def __init__(self, data, scaler_data):
+        self.data = data
+        self.scaler_data = scaler_data
+
+
+def slice_raw_data(raw_data, offset, length=None):
+    if offset == 0 and length is None:
+        return raw_data
+    end = None if length is None else offset + length
+    data = None if raw_data.data is None else raw_data.data[offset:end]
+    scaler_data = dict(
+        (scale_id, scaler_data[offset:end]) for (scale_id, scaler_data) in raw_data.scaler_data.items())
+    return RawDataSlice(data, scaler_data)
+
+
 def _new_numpy_array(dtype, num_values, memmap_dir=None):
     """Initialise a new numpy array for data
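The new slice_raw_data helper returns its input untouched when no slicing is requested, and otherwise applies the same offset/length slice to the plain data array and to every per-scaler DAQmx array. A rough illustration of the semantics, constructing RawDataSlice directly purely for demonstration (in the library it wraps a channel's raw data):

import numpy as np
from nptdms.channel_data import RawDataSlice, slice_raw_data

raw = RawDataSlice(np.arange(10), {0: np.arange(10, 20)})

assert slice_raw_data(raw, 0) is raw  # offset 0 and no length: returned as-is
subset = slice_raw_data(raw, offset=2, length=3)
print(subset.data)            # [2 3 4]
print(subset.scaler_data[0])  # [12 13 14]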
10 changes: 6 additions & 4 deletions nptdms/export/pandas_export.py
@@ -66,15 +66,17 @@ def _channels_to_dataframe(channels_to_export, time_index=False, absolute_time=F
     for column_name, channel in channels_to_export.items():
         index = channel.time_track(absolute_time) if time_index else None
         if scaled_data:
-            dataframe_dict[column_name] = pd.Series(data=_array_for_pd(channel.data), index=index)
+            dataframe_dict[column_name] = pd.Series(data=_array_for_pd(channel[:]), index=index)
         elif channel.scaler_data_types:
             # Channel has DAQmx raw data
-            for scale_id, raw_data in channel.raw_scaler_data.items():
+            raw_data = channel.read_data(scaled=False)
+            for scale_id, scaler_data in raw_data.items():
                 scaler_column_name = column_name + "[{0:d}]".format(scale_id)
-                dataframe_dict[scaler_column_name] = pd.Series(data=raw_data, index=index)
+                dataframe_dict[scaler_column_name] = pd.Series(data=scaler_data, index=index)
         else:
             # Raw data for normal TDMS file
-            dataframe_dict[column_name] = pd.Series(data=_array_for_pd(channel.raw_data), index=index)
+            raw_data = channel.read_data(scaled=False)
+            dataframe_dict[column_name] = pd.Series(data=_array_for_pd(raw_data), index=index)
     return pd.DataFrame.from_dict(dataframe_dict)


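The raw (unscaled) export paths now go through channel.read_data(scaled=False), which works whether or not the file was read up front. For DAQmx channels that call returns a dictionary mapping scaler id to data, and the export writes one column per scaler, naming it by appending the id in brackets. A sketch of the resulting columns (file, group, and channel names are hypothetical):

from nptdms import TdmsFile

# A DAQmx channel with scaler ids 0 and 1 exports as two columns,
# e.g. "/'Group'/'Channel1'[0]" and "/'Group'/'Channel1'[1]".
with TdmsFile.open("daqmx_example.tdms") as tdms_file:
    df = tdms_file["Group"]["Channel1"].as_dataframe(scaled_data=False)
    print(df.columns.tolist())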
8 changes: 6 additions & 2 deletions nptdms/tdms.py
@@ -12,7 +12,7 @@
 from nptdms.log import log_manager
 from nptdms.common import ObjectPath
 from nptdms.reader import TdmsReader
-from nptdms.channel_data import get_data_receiver
+from nptdms.channel_data import get_data_receiver, slice_raw_data
 from nptdms.export import hdf_export, pandas_export
 from nptdms.base_segment import RawChannelDataChunk
 from nptdms.timestamp import TdmsTimestamp, TimestampArray
@@ -706,7 +706,11 @@ def read_data(self, offset=0, length=None, scaled=True):
             Set this parameter to False to return raw unscaled data.
             For DAQmx data a dictionary of scaler id to raw scaler data will be returned.
         """
-        raw_data = self._read_channel_data(offset, length)
+        if self._raw_data is None:
+            raw_data = self._read_channel_data(offset, length)
+        else:
+            raw_data = slice_raw_data(self._raw_data, offset, length)
 
         if raw_data is None:
             dtype = self.dtype if scaled else self._raw_data_dtype()
             return np.empty((0,), dtype=dtype)
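read_data now covers both modes: when the file was opened without reading data (self._raw_data is None) the requested range is read from the underlying reader, and when the data is already in memory the stored raw data is sliced instead. A usage sketch (file path and names are illustrative):

from nptdms import TdmsFile

# Request two raw (unscaled) values starting at index 1. The same call now
# works for a lazily opened file and, via slice_raw_data, for one whose
# data was already read into memory.
with TdmsFile.open("example.tdms") as tdms_file:
    raw = tdms_file["Group"]["Channel1"].read_data(offset=1, length=2, scaled=False)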
6 changes: 3 additions & 3 deletions nptdms/test/scenarios.py
@@ -46,12 +46,12 @@ def single_segment_with_one_channel():
     test_file.add_segment(
         ("kTocMetaData", "kTocRawData", "kTocNewObjList"),
         segment_objects_metadata(
-            channel_metadata("/'group'/'channel1'", TDS_TYPE_INT32, 2),
+            channel_metadata("/'group'/'channel1'", TDS_TYPE_INT32, 4),
         ),
-        "01 00 00 00" "02 00 00 00"
+        "01 00 00 00" "02 00 00 00" "03 00 00 00" "04 00 00 00"
     )
     expected_data = {
-        ('group', 'channel1'): np.array([1, 2], dtype=np.int32),
+        ('group', 'channel1'): np.array([1, 2, 3, 4], dtype=np.int32),
     }
     return test_file, expected_data

45 changes: 45 additions & 0 deletions nptdms/test/test_daqmx.py
@@ -597,6 +597,51 @@ def test_lazily_reading_a_subset_of_raw_channel_data():
             np.testing.assert_array_equal(data_2[0], [18, 19])
 
 
+@pytest.mark.parametrize('offset,length', [
+    (0, None),
+    (1, None),
+    (0, 2),
+    (1, 2),
+])
+def test_read_raw_data(offset, length):
+    # Single scale which is just the raw DAQmx scaler data
+    properties = {
+        "NI_Number_Of_Scales": (3, "01 00 00 00"),
+    }
+    scaler_1 = daqmx_scaler_metadata(1, 3, 0)
+    scaler_2 = daqmx_scaler_metadata(2, 3, 2)
+    metadata = segment_objects_metadata(
+        root_metadata(),
+        group_metadata(),
+        daqmx_channel_metadata("Channel1", 4, [4], [scaler_1], properties),
+        daqmx_channel_metadata("Channel2", 4, [4], [scaler_2], properties))
+    data = (
+        # Data for segment
+        "01 00"
+        "11 00"
+        "02 00"
+        "12 00"
+        "03 00"
+        "13 00"
+        "04 00"
+        "14 00"
+    )
+
+    test_file = GeneratedFile()
+    test_file.add_segment(segment_toc(), metadata, data)
+
+    end = None if length is None else offset + length
+    with test_file.get_tempfile() as temp_file:
+        tdms_file = TdmsFile.read(temp_file.file)
+        data_1 = tdms_file["Group"]["Channel1"].read_data(offset=offset, length=length, scaled=False)
+        assert data_1[1].dtype == np.int16
+        np.testing.assert_array_equal(data_1[1], [1, 2, 3, 4][offset:end])
+
+        data_2 = tdms_file["Group"]["Channel2"].read_data(offset=offset, length=length, scaled=False)
+        assert data_2[2].dtype == np.int16
+        np.testing.assert_array_equal(data_2[2], [17, 18, 19, 20][offset:end])
+
+
 def test_stream_data_chunks():
     """Test streaming chunks of DAQmx data from a TDMS file
     """
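The hex string in the new test is the interleaved raw buffer for the two scalers: each byte pair is a little-endian int16, alternating between Channel1's scaler and Channel2's. A standalone check of that decoding:

import struct

raw = bytes.fromhex("0100 1100 0200 1200 0300 1300 0400 1400")
values = struct.unpack("<8h", raw)  # eight little-endian int16 values
assert values == (1, 17, 2, 18, 3, 19, 4, 20)
# De-interleaved: Channel1's scaler reads [1, 2, 3, 4], Channel2's [17, 18, 19, 20]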
46 changes: 30 additions & 16 deletions nptdms/test/test_pandas.py
@@ -8,6 +8,7 @@
 except ImportError:
     pytest.skip("Skipping Pandas tests as Pandas is not installed", allow_module_level=True)
 
+from nptdms import TdmsFile
 from nptdms.test import scenarios
 from nptdms.test.test_daqmx import daqmx_channel_metadata, daqmx_scaler_metadata
 from nptdms.test.util import (
@@ -136,14 +137,14 @@ def timed_segment():
     return toc, metadata, data
 
 
-def test_file_as_dataframe():
+@pytest.mark.parametrize('lazy_load', [True, False])
+def test_file_as_dataframe(lazy_load):
     """Test converting file to Pandas dataframe"""
 
     test_file = GeneratedFile()
     test_file.add_segment(*timed_segment())
-    tdms_data = test_file.load()
 
-    df = tdms_data.as_dataframe()
+    df = on_test_file(test_file, lazy_load, lambda tdms_data: tdms_data.as_dataframe())
 
     assert len(df) == 2
     assert "/'Group'/'Channel1'" in df.keys()
@@ -191,14 +192,15 @@ def test_file_as_dataframe_with_absolute_time():
     assert (df.index == expected_start)[0]
 
 
-def test_group_as_dataframe():
+@pytest.mark.parametrize('lazy_load', [True, False])
+def test_group_as_dataframe(lazy_load):
     """Convert a group to dataframe"""
 
     test_file = GeneratedFile()
     test_file.add_segment(*timed_segment())
-    tdms_data = test_file.load()
 
-    df = tdms_data["Group"].as_dataframe()
+    df = on_test_file(test_file, lazy_load, lambda tdms_data: tdms_data["Group"].as_dataframe())
+
     assert len(df) == 2
     assert len(df.keys()) == 2
     assert "Channel1" in df.keys()
@@ -207,14 +209,16 @@ def test_group_as_dataframe():
     assert (df["Channel2"] == [3, 4]).all()
 
 
-def test_channel_as_dataframe():
+@pytest.mark.parametrize('lazy_load', [True, False])
+def test_channel_as_dataframe(lazy_load):
     """Convert a channel to dataframe"""
 
     test_file = GeneratedFile()
     test_file.add_segment(*timed_segment())
-    tdms_data = test_file.load()
 
-    df = tdms_data["Group"]["Channel2"].as_dataframe()
+    df = on_test_file(
+        test_file, lazy_load, lambda tdms_data: tdms_data["Group"]["Channel2"].as_dataframe())
+
     assert len(df) == 2
     assert len(df.keys()) == 1
     assert "/'Group'/'Channel2'" in df.keys()
@@ -265,19 +269,21 @@ def test_channel_as_dataframe_with_absolute_time():
     assert (df.index == expected_start)[0]
 
 
-def test_channel_as_dataframe_with_raw_data():
+@pytest.mark.parametrize('lazy_load', [True, False])
+def test_channel_as_dataframe_with_raw_data(lazy_load):
     """Convert a channel to Pandas dataframe using raw unscaled data"""
 
     test_file, _ = scenarios.scaled_data().values
     expected_raw_data = np.array([1, 2, 3, 4], dtype=np.int32)
-    tdms_data = test_file.load()
 
-    df = tdms_data["group"]["channel1"].as_dataframe(scaled_data=False)
+    df = on_test_file(
+        test_file, lazy_load, lambda tdms_data: tdms_data["group"]["channel1"].as_dataframe(scaled_data=False))
+
     np.testing.assert_equal(df["/'group'/'channel1'"], expected_raw_data)
 
 
-def test_raw_daqmx_channel_export():
+@pytest.mark.parametrize('lazy_load', [True, False])
+def test_raw_daqmx_channel_export(lazy_load):
     """ Test exporting raw daqmx data for a channel
     """
 
@@ -302,10 +308,9 @@ def test_raw_daqmx_channel_export():
     segment_toc = (
         "kTocMetaData", "kTocRawData", "kTocNewObjList", "kTocDAQmxRawData")
     test_file.add_segment(segment_toc, metadata, data)
-    tdms_data = test_file.load()
-    channel = tdms_data["Group"]["Channel1"]
+    dataframe = on_test_file(
+        test_file, lazy_load, lambda tdms_data: tdms_data["Group"]["Channel1"].as_dataframe(scaled_data=False))
 
-    dataframe = channel.as_dataframe(scaled_data=False)
     expected_data = {
         0: np.array([1, 2, 3, 4], dtype=np.int16),
         1: np.array([17, 18, 19, 20], dtype=np.int16),
@@ -339,3 +344,12 @@ def test_export_with_empty_channels():
     assert (df["channel1"] == [1, 2]).all()
     assert len(df["channel2"]) == 2
     assert np.isnan(df["channel2"]).all()
+
+
+def on_test_file(test_file, lazy_load, func):
+    if lazy_load:
+        with test_file.get_tempfile() as temp_file:
+            with TdmsFile.open(temp_file) as tdms_file:
+                return func(tdms_file)
+    else:
+        return func(test_file.load())
48 changes: 36 additions & 12 deletions nptdms/test/test_tdms_file.py
@@ -52,6 +52,18 @@ def test_lazily_read_channel_data(test_file, expected_data):
                 compare_arrays(actual_data, expected_data)
 
 
+def test_read_raw_channel_data():
+    """Test reading raw channel data"""
+
+    test_file, expected_data = scenarios.single_segment_with_one_channel().values
+    with test_file.get_tempfile() as temp_file:
+        tdms_file = TdmsFile.read(temp_file.file)
+        for ((group, channel), expected_data) in expected_data.items():
+            actual_data = tdms_file[group][channel].read_data(scaled=False)
+            assert actual_data.dtype == expected_data.dtype
+            compare_arrays(actual_data, expected_data)
+
+
 def test_lazily_read_raw_channel_data():
     """Test reading raw channel data lazily"""
 
@@ -64,6 +76,30 @@ def test_lazily_read_raw_channel_data():
                 compare_arrays(actual_data, expected_data)
 
 
+def test_read_raw_channel_data_slice():
+    """Test reading a slice of raw channel data"""
+
+    test_file, expected_data = scenarios.single_segment_with_one_channel().values
+    with test_file.get_tempfile() as temp_file:
+        tdms_file = TdmsFile.read(temp_file.file)
+        for ((group, channel), expected_data) in expected_data.items():
+            actual_data = tdms_file[group][channel].read_data(offset=1, length=2, scaled=False)
+            assert actual_data.dtype == expected_data.dtype
+            compare_arrays(actual_data, expected_data[1:3])
+
+
+def test_lazily_read_raw_channel_data_slice():
+    """Test reading a slice of raw channel data lazily"""
+
+    test_file, expected_data = scenarios.single_segment_with_one_channel().values
+    with test_file.get_tempfile() as temp_file:
+        with TdmsFile.open(temp_file.file) as tdms_file:
+            for ((group, channel), expected_data) in expected_data.items():
+                actual_data = tdms_file[group][channel].read_data(offset=1, length=2, scaled=False)
+                assert actual_data.dtype == expected_data.dtype
+                compare_arrays(actual_data, expected_data[1:3])
+
+
 def test_lazily_read_channel_data_with_file_path():
     """Test reading channel data lazily after initialising with a file path
     """
@@ -414,18 +450,6 @@ def test_read_data_after_close_throws():
         assert "Cannot read data after the underlying TDMS reader is closed" in str(exc_info.value)
 
 
-def test_read_data_after_open_in_read_mode_throws():
-    """ Trying to read channel data after reading all data initially should throw
-    """
-    test_file, expected_data = scenarios.single_segment_with_one_channel().values
-    group, channel = list(expected_data.keys())[0]
-    with test_file.get_tempfile() as temp_file:
-        tdms_file = TdmsFile.read(temp_file.file)
-        with pytest.raises(RuntimeError) as exc_info:
-            tdms_file[group][channel].read_data()
-        assert "Cannot read data after the underlying TDMS reader is closed" in str(exc_info.value)
-
-
 def test_access_data_property_after_opening_throws():
     """ Accessing the data property after opening without reading data should throw
     """
