Write header parser with Numpy
cdeil committed Oct 8, 2021
1 parent 44d7dad commit d35a1fe
Showing 2 changed files with 103 additions and 28 deletions.
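For context, the core of this change is parsing the Whisper file header directly with NumPy structured dtypes instead of calling into the whisper package. A minimal standalone sketch of that idea, using the dtypes introduced below (the example.wsp path is a placeholder):

    import numpy as np

    # Big-endian header layout, per the Graphite Whisper database format docs
    FMT_FILE_META = np.dtype(
        [
            ("aggregation_type", ">u4"),
            ("max_retention", ">u4"),
            ("x_files_factor", ">f4"),
            ("archive_count", ">u4"),
        ]
    )
    FMT_ARCHIVE_META = np.dtype(
        [("offset", ">u4"), ("seconds_per_point", ">u4"), ("points", ">u4")]
    )

    with open("example.wsp", "rb") as fh:  # placeholder path
        # One file-level metadata record, then one record per archive
        file_meta = np.fromfile(fh, dtype=FMT_FILE_META, count=1)[0]
        archives = np.fromfile(
            fh, dtype=FMT_ARCHIVE_META, count=int(file_meta["archive_count"])
        )

    for archive in archives:
        print(int(archive["offset"]), int(archive["seconds_per_point"]), int(archive["points"]))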
26 changes: 22 additions & 4 deletions test_whisper_pandas.py
@@ -2,7 +2,7 @@
import pandas as pd
from numpy.testing import assert_allclose

from whisper_pandas import WhisperFile, WhisperFileMeta
from whisper_pandas import WhisperFile, WhisperFileMeta, WhisperArchiveMeta


@pytest.fixture(scope="session")
@@ -22,9 +22,27 @@ def test_meta(meta):
assert meta.max_retention == 315363600
assert_allclose(meta.x_files_factor, 0.5)

assert meta.archives[0].seconds_per_point == 10
assert meta.archives[1].seconds_per_point == 60
assert meta.archives[2].seconds_per_point == 3600
assert meta.header_size == 52
assert meta.file_size == 82785664
assert meta.file_size_actual == 82785664
assert meta.file_size_mismatch is False

assert len(meta.archives) == 3
assert meta.archives[0] == WhisperArchiveMeta(
index=0, offset=52, seconds_per_point=10, points=1555200
)
assert meta.archives[1] == WhisperArchiveMeta(
index=1,
offset=18662452,
seconds_per_point=60,
points=5256000,
)
assert meta.archives[2] == WhisperArchiveMeta(
index=2,
offset=81734452,
seconds_per_point=3600,
points=87601,
)


def test_data_archive_1(wsp):
105 changes: 81 additions & 24 deletions whisper_pandas.py
@@ -6,7 +6,6 @@
from typing import List
import numpy as np
import pandas as pd
import whisper

__all__ = [
"WhisperFile",
@@ -16,11 +15,28 @@

# Whisper file element formats
# See https://graphite.readthedocs.io/en/latest/whisper.html#database-format

FMT_FILE_META = np.dtype(
[
("aggregation_type", ">u4"),
("max_retention", ">u4"),
("x_files_factor", ">f4"),
("archive_count", ">u4"),
]
)
FMT_ARCHIVE_META = np.dtype(
[("offset", ">u4"), ("seconds_per_point", ">u4"), ("points", ">u4")]
)
FMT_POINT = np.dtype([("time", ">u4"), ("val", ">f8")])
# TODO: change to format dtype and use it for parsing header
N_ARCHIVE = 12
N_META = 16
AGGREGATION_TYPE_TO_METHOD = {
1: "average",
2: "sum",
3: "last",
4: "max",
5: "min",
6: "avg_zero",
7: "absmax",
8: "absmin",
}


@dataclasses.dataclass
@@ -31,10 +47,23 @@ class WhisperArchiveMeta:
offset: int
seconds_per_point: int
points: int
retention: int

@classmethod
def _from_fh(cls, fh, index: int):
meta = np.fromfile(fh, dtype=FMT_ARCHIVE_META, count=1)[0]
return cls(
index=index,
offset=int(meta["offset"]),
seconds_per_point=int(meta["seconds_per_point"]),
points=int(meta["points"]),
)

@property
def retention(self) -> int:
return self.seconds_per_point * self.points

@property
def size(self):
def size(self) -> int:
return FMT_POINT.itemsize * self.points

def print_info(self):
@@ -56,32 +85,37 @@ class WhisperFileMeta:
x_files_factor: float
archives: List[WhisperArchiveMeta]

@staticmethod
def _meta_from_fh(fh):
meta = np.fromfile(fh, dtype=FMT_FILE_META, count=1)[0]
aggregation_method = AGGREGATION_TYPE_TO_METHOD[int(meta["aggregation_type"])]
return {
"aggregation_method": aggregation_method,
"max_retention": int(meta["max_retention"]),
"x_files_factor": float(meta["x_files_factor"]),
"archive_count": int(meta["archive_count"]),
}

@classmethod
def read(cls, path) -> "WhisperFileMeta":
info = whisper.info(path)
def _from_fh(cls, fh, path) -> "WhisperFileMeta":
file_meta = cls._meta_from_fh(fh)
archives = []
for index, _ in enumerate(info["archives"]):
archive = WhisperArchiveMeta(
index=index,
offset=_["offset"],
seconds_per_point=_["secondsPerPoint"],
points=_["points"],
retention=_["retention"],
)
archives.append(archive)
for idx in range(file_meta["archive_count"]):
archive_meta = WhisperArchiveMeta._from_fh(fh, idx)
archives.append(archive_meta)

return cls(
path=str(path),
aggregation_method=info["aggregationMethod"],
max_retention=info["maxRetention"],
x_files_factor=info["xFilesFactor"],
aggregation_method=file_meta["aggregation_method"],
max_retention=file_meta["max_retention"],
x_files_factor=file_meta["x_files_factor"],
archives=archives,
)

@property
def header_size(self) -> int:
"""Whisper file header size in bytes"""
return N_META + N_ARCHIVE * len(self.archives)
return FMT_FILE_META.itemsize + FMT_ARCHIVE_META.itemsize * len(self.archives)

@property
def file_size(self) -> int:
@@ -123,7 +157,11 @@ class WhisperFile:

@classmethod
def read(
cls, path, archives: List[int] = None, dtype: str = "float32"
cls,
path,
archives: List[int] = None,
dtype: str = "float32",
meta_only: bool = False,
) -> "WhisperFile":
"""Read Whisper archive into a pandas.Series.
@@ -137,12 +175,19 @@ def read(
Default: all
dtype : {"float32", "float64"}
Value float data type
meta_only : bool
Only read metadata from file header
"""
meta = WhisperFileMeta.read(path)
with Path(path).open("rb") as fh:
meta = WhisperFileMeta._from_fh(fh, path=path)

if meta_only:
return cls(meta=meta, data=[])

data = []
for archive_id in range(len(meta.archives)):
if archives is None or archive_id in archives:
# TODO: pass fh here, avoid 2x file open
series = read_whisper_archive(
path, info=meta.archives[archive_id], dtype=dtype
)
@@ -182,6 +227,18 @@ def read_whisper_archive(
return pd.Series(val, index).sort_index()


def read_whisper_archive_dataframe(
path: str, archive_id: int, dtype: str = "float32"
) -> pd.DataFrame:
info = WhisperFileMeta.read(path).archives[archive_id]
data = np.fromfile(path, dtype=FMT_POINT, count=info.points, offset=info.offset)
data = data[data["time"] != 0]
value = data["val"].astype(dtype)
time = data["time"].astype("uint32")
df = pd.DataFrame({"timestamp": time, "value": value}).sort_values("timestamp")
return df


def main():
"""Command line tool"""
parser = argparse.ArgumentParser()
