Update from staging (#54)
* Threadpool executor (#22)

* Release v5.15.0

* update protobuf to v4.22.3

* Add threaded streamset calls

Using concurrent.futures.ThreadPoolExecutor
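
A minimal sketch of the pattern (the helper name is hypothetical; per-stream `values(start, end)` calls are assumed):

```
from concurrent.futures import ThreadPoolExecutor

def fetch_all(streams, start, end):
    # Fan the per-stream values() calls out across a thread pool
    # and collect results in the original stream order.
    with ThreadPoolExecutor(max_workers=len(streams)) as executor:
        futures = [executor.submit(s.values, start, end) for s in streams]
        # result() blocks and re-raises any worker exception.
        return [f.result() for f in futures]
```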

* Blacken code

* Update for failing tests

* Ignore flake8 as part of testing

pytest-flake8 seems to have issues with the later versions of flake8

tholo/pytest-flake8#92

* Update .gitignore

* Update ignore and remove extra print.

* Remove idea folder (pycharm)

---------

Co-authored-by: David Konigsberg <72822263+davidkonigsberg@users.noreply.github.com>
Co-authored-by: Jeff Lin <42981468+jleifnf@users.noreply.github.com>

* Threaded arrow (#23)

* Release v5.15.0

* update protobuf to v4.22.3

* Add threaded streamset calls

Using concurrent.futures.ThreadPoolExecutor

* Blacken code

* Update for failing tests

* Ignore flake8 as part of testing

pytest-flake8 seems to have issues with the later versions of flake8

tholo/pytest-flake8#92

* Update .gitignore

* Update proto definitions.

* Update endpoint to support arrow methods

* Support arrow endpoints

* Additional arrow updates

* Update transformers, add polars conversion

* Update .gitignore

* Update ignore and remove extra print.

* Remove idea folder (pycharm)

* Update requirements.txt

* Update btrdb/transformers.py

* Update the way to check for arrow-enabled btrdb

This has not been "turned on" yet though, since we don't know the version number this will be enabled for. The method is currently commented out, but can be re-enabled pretty easily.
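
A rough sketch of such a version gate (the (5, 30) cutoff comes from a later commit; `versionMinor` is an assumed attribute, only `versionMajor` is referenced elsewhere in this log):

```
_ARROW_MIN = (5, 30)  # assumed cutoff; the real number was unknown here

def _arrow_enabled(conn) -> bool:
    # Compare the server-reported version against the first
    # release expected to ship the arrow endpoints.
    return (conn.versionMajor, conn.versionMinor) >= _ARROW_MIN
```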

* Use IPC streams to send the arrow bytes for insert

Instead of writing out feather files to an `io.BytesIO` stream and then sending the feather files over the wire, this creates a buffered output stream and then sends that data back as bytes to btrdb.
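
The gist of the IPC approach, sketched with pyarrow's public API (the function name is illustrative):

```
import pyarrow as pa

def table_to_ipc_bytes(table: pa.Table) -> bytes:
    # Serialize the table as an Arrow IPC stream into an in-memory
    # buffer and return the raw bytes destined for the insert RPC.
    sink = pa.BufferOutputStream()
    with pa.ipc.new_stream(sink, table.schema) as writer:
        writer.write_table(table)
    return sink.getvalue().to_pybytes()
```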

* Create arrow specific stream methods.

* Update test conn object to support minor version

* Update tests and migrate arrow code.

* Arrow and standard streamset insert

* Create basic arrow to dataframe transformer

* Support multirawvalues, arrow transformers

* Multivalue arrow queries, in progress

* Update stream filter to properly filter for sampling frequency

* Update arrow values queries for multivalues

* Update param passing for sampling frequency

* Update index passing, and ignore depth

* benchmark raw values queries for arrow and current api

* Add aligned windows and run func

* Streamset read benchmarks (WIP)

In addition:
* update streamset.count to support the `precise` boolean flag.
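
Hypothetical usage of the flag (assuming it is forwarded straight to the count endpoint):

```
# precise=True requests an exact count rather than a fast
# estimate; slower, but suitable for correctness checks.
n = streamset.count(precise=True)
```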

* Update mock return value for versionMajor

* In progress validation of stream benchs

---------

Co-authored-by: David Konigsberg <72822263+davidkonigsberg@users.noreply.github.com>
Co-authored-by: Jeff Lin <42981468+jleifnf@users.noreply.github.com>

* Add 3.10 python to the testing matrix (#21)

* Add 3.10 python to the testing matrix

* Fix yaml parsing

* Update requirements to support 3.10

* Use the pip-tools `pip-compile` CLI tool to generate requirements.txt files from the updated pyproject.toml file
* Include pyproject.toml with basic features to support proper extra deps
* Support different ways to install btrdb from pip
  * `btrdb, btrdb[data], btrdb[all], btrdb[testing], btrdb[ray]`
* Update transformers.py to build up a numpy array when the subarrays are not the same size (number of entries); see the sketch after this list
  * This converts the main array's dtype to `object`
  * Tests still pass with this change
* Recompile the btrdb proto files with the latest protobuf and grpc plugins
* Create multiple requirements.txt files for easier updating in the future, as well as a locked version with pinned dependencies
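
A sketch of the ragged-array fallback mentioned above (illustrative, not the exact transformers.py code):

```
import numpy as np

def stack_columns(columns):
    # Equal-length columns stack into a regular 2-D array; ragged
    # columns fall back to a 1-D array with dtype=object.
    if len({len(col) for col in columns}) == 1:
        return np.array(columns)
    out = np.empty(len(columns), dtype=object)
    for i, col in enumerate(columns):
        out[i] = np.asarray(col)
    return out
```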

* Ignore protoc generated flake errors

* Update test requirements

* Include pre-commit and setup.

* Pre-commit lints.

* Update pre-commit.yaml

add staging to pre-commit checks

* Fix missing logging import, rerun pre-commit (#24)

* Add basic doc string to endpoint object (#25)

* Update benchmark scripts.

* Multistream read bench insert bench (#26)

* Fix multistream endpoint bugs

* The streamset was passing the incorrect params to the endpoint
* The endpoint does not return a `version` in its response, just `stat` and `arrowBytes`

Params have been updated, and None is passed around to stand in for the missing version info, which lets us use the same logic for all bytes decoding.
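
Roughly, the shared decode path looks like this (a sketch; `arrowBytes` follows the response shape described above, the rest is illustrative):

```
def _decode_responses(responses):
    # Single-stream responses carry a version; the multistream
    # endpoint does not, so None stands in and one code path
    # handles all bytes decoding.
    for resp in responses:
        yield getattr(resp, "version", None), resp.arrowBytes
```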

* Add multistream benchmark methods for timesnap and no timesnap.

* Add insert benchmarking methods (#27)

Benchmarking methods added for:

* stream inserts using tuples of (time, value) data
* stream inserts using pyarrow tables with timestamp and value columns

* streamset inserts using a dict mapping each streamset stream's uuid to a list of (time, value) tuples
* streamset inserts using a dict mapping each streamset stream's uuid to a pyarrow table of timestamps and values

* Fix arrow inserts (#28)

* Add insert benchmarking methods

Benchmarking methods added for:

* stream inserts using tuples of (time, value) data
* stream inserts using pyarrow tables with timestamp and value columns

* streamset inserts using a dict mapping each streamset stream's uuid to a list of (time, value) tuples
* streamset inserts using a dict mapping each streamset stream's uuid to a pyarrow table of timestamps and values

* Include nullable=False in pyarrow schema inserts

* This was the only difference in the schemas between Go and Python.
* Also use a BytesIO stream to act as the sink for the IPC bytes.
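
For reference, a pyarrow schema with `nullable=False` on every field, as described (the column names and types are assumptions):

```
import pyarrow as pa

# Mark each field non-nullable so the schema matches the one the
# Go server builds for inserts.
schema = pa.schema(
    [
        pa.field("time", pa.timestamp("ns", tz="UTC"), nullable=False),
        pa.field("value", pa.float64(), nullable=False),
    ]
)
```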

* Start integration test suite

* Add more streamset integration tests.

* Add support for authenticated requests without encryption.

* Optimize logging calls (#30)

Previously, the debug logging in the API would build its f-strings regardless of whether logging.DEBUG was the current log level.

This can impact performance, especially for benchmarking.

Now, a cached IS_DEBUG flag is used for the stream operations; in other locations, the logger.isEnabledFor boolean is checked.

Note that in stream.py this check is only executed once, and the result is cached for the rest of the logic.
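
In outline (a sketch, not the exact stream.py code):

```
import logging

logger = logging.getLogger(__name__)
# Evaluated once and cached instead of on every call.
IS_DEBUG = logger.isEnabledFor(logging.DEBUG)

def insert(points):
    if IS_DEBUG:
        # The f-string is only built when debug logging is on.
        logger.debug(f"inserting {len(points)} points")
```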

* Add more arrow tests and minor refactoring.

* More integration test cases

* Restructure tests.

* Mark new failing tests as expected failures for now.

* Disable gzip compression; it is very slow.
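
Assuming grpcio's channel-level compression option is the knob in question (a guess, not confirmed by this commit):

```
import grpc

# Explicitly opt out of gzip on the channel; compression cost can
# dominate for large arrow payloads.
channel = grpc.insecure_channel(
    "localhost:4410",  # hypothetical endpoint
    compression=grpc.Compression.NoCompression,
)
```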

* Re-enable test; the server has been fixed.

* Update pandas testing and fix flake8 issues (#31)

* Update pandas testing and fix flake8 issues

* Update stream logic for unpacking arrow tables, update integration tests.

* Add `__init__.py` for integration tests.

* Add additional tests for arrow methods vs their old api counterparts.

* Add tests for timesnap boundary conditions. (#32)

* Add more integration tests.

* Add additional integration tests; modify the name_callable ability of arrow_values.

* remove extraneous prints.

* Include retry logic.

* Update statpoint order in arrow, fix some bugs with the arrow methods.

* Update testing to account for NaNs.

* Update github action versions.

* Update tests, add in a test for duplicate values.

* Remove empty test, remove extraneous prints

---------

Co-authored-by: andrewchambers <andrewchamberss@gmail.com>

* Update docs for arrow (#35)

* Update docs, add in final enhanced edits.

* Only enable arrow-endpoints when version >= 5.30 (#36)

Once we have a v5.30 tag of the server with arrow/multistream, we can merge this and complete the ticket.

* Update arrow notes, small doc changes. (#38)

* fix: patch up stream object type and other bugs (#33)

* fix: patch up stream object type and other bugs

* fix: resolve depth errors in stream window

* fix: resolve remaining test warnings

* fix: resolve test imports

* chore: add pre-commit install to readme

* Update staging branch with latest `master` changes (#52)

---------

Co-authored-by: David Konigsberg <72822263+davidkonigsberg@users.noreply.github.com>
Co-authored-by: Jeff Lin <42981468+jleifnf@users.noreply.github.com>
Co-authored-by: Andrew Chambers <andrew@pingthings.io>
Co-authored-by: andrewchambers <andrewchamberss@gmail.com>
Co-authored-by: Taite Nazifi <135669716+pingt8@users.noreply.github.com>
6 people committed Sep 25, 2023
1 parent 1783796 commit 2f32815
Showing 7 changed files with 194 additions and 73 deletions.
26 changes: 13 additions & 13 deletions README.md
@@ -1,12 +1,10 @@
# BTrDB Bindings for Python

These are BTrDB Bindings for Python allowing you painless and productive access to the Berkeley Tree Database (BTrDB). BTrDB is a time series database focusing on blazing speed with respect to univariate time series data at the nanosecond scale.

## Sample Code

Our goal is to make BTrDB as easy to use as possible, focusing on integration with other tools and the productivity of our users. In keeping with this we continue to add new features such as easy transformation to numpy arrays, pandas Series, etc. See the sample code below and then check out our [documentation](https://btrdb.readthedocs.io/en/latest/) for more in-depth instructions.

import btrdb

@@ -40,8 +38,6 @@ Our goal is to make BTrDB as easy to use as possible, focusing on integration with other tools
>> StatPoint(1500000000300000000, 4.0, 5.0, 6.0, 3, 0.816496580927726)
>> StatPoint(1500000000600000000, 7.0, 8.0, 9.0, 3, 0.816496580927726)

You can also easily work with a group of streams for when you need to evaluate data across multiple time series or serialize to disk.

from btrdb.utils.timez import to_nanoseconds
@@ -69,17 +65,15 @@ You can also easily work with a group of streams for when you need to evaluate data across multiple time series or serialize to disk.
8 1500000000800000000 NaN 9.0
9 1500000000900000000 10.0 NaN


## Installation

See our documentation on [installing](https://btrdb.readthedocs.io/en/latest/installing.html) the bindings for more detailed instructions. However, to quickly get started using the latest available versions you can use `pip` to install from PyPI, with `conda` support coming in the near future.

$ pip install btrdb


## Tests

This project includes a suite of automated tests based upon [pytest](https://docs.pytest.org/en/latest/). For your convenience, a `Makefile` has been provided with a target for evaluating the test suite. Use the following command to run the tests.

$ make test

@@ -89,13 +83,13 @@ Note that the test suite has additional dependencies that must be installed for

## Releases

This codebase uses GitHub Actions to control the release process. To create a new release of the software, run `release.sh` with arguments for the new version as shown below. Make sure you are in the master branch when running this script.

```
./release.sh 5 11 4
```

This will tag and push the current commit and GitHub Actions will run the test suite, build the package, and push it to PyPI. If any issues are encountered with the automated tests, the build will fail and you will have a tag with no corresponding release.

After a release is created, you can manually edit the release description through github.

@@ -111,4 +105,10 @@ Note that the documentation also requires Sphinx and other dependencies to successfully build.

## Versioning

This codebase uses a form of [Semantic Versioning](http://semver.org/) to structure version numbers. In general, the major version number will track with the BTrDB codebase to transparently maintain version compatibility. Planned features between major versions will increment the minor version while any special releases (bug fixes, etc.) will increment the patch number.

## Pre-commit hooks

Run `pip install pre-commit` to install the tool, then run
`pre-commit run --all-files` to comb through changes and make sure they are formatted correctly.
Run `pre-commit install` so that the hooks run automatically on every `git commit`.
9 changes: 6 additions & 3 deletions btrdb/conn.py
@@ -21,6 +21,7 @@
import re
import uuid as uuidlib
from concurrent.futures import ThreadPoolExecutor
from typing import List

import certifi
import grpc

@@ -283,8 +284,7 @@ def streams(self, *identifiers, versions=None, is_collection_prefix=False):

        if versions and len(versions) != len(identifiers):
            raise ValueError("number of versions does not match identifiers")
        streams: List[Stream] = []
        for ident in identifiers:
            if isinstance(ident, uuidlib.UUID):
                streams.append(self.stream_from_uuid(ident))

@@ -305,7 +305,10 @@ def streams(self, *identifiers, versions=None, is_collection_prefix=False):

                    is_collection_prefix=is_collection_prefix,
                    tags={"name": parts[-1]},
                )
                if isinstance(found, Stream):
                    streams.append(found)
                    continue
                if isinstance(found, list) and len(found) == 1:
                    streams.append(found[0])
                    continue
            raise StreamNotFoundError(f"Could not identify stream `{ident}`")
20 changes: 18 additions & 2 deletions btrdb/stream.py
@@ -22,6 +22,7 @@
from collections import deque
from collections.abc import Sequence
from copy import deepcopy
from typing import List

import pyarrow as pa

@@ -1451,8 +1452,14 @@ class StreamSetBase(Sequence):
    A lightweight wrapper around a list of stream objects
    """

    def __init__(self, streams: List[Stream]):
        self._streams: List[Stream] = []
        for stream in streams:
            if not isinstance(stream, Stream):
                raise BTRDBTypeError(
                    f"streams must be of type Stream {stream}, {type(stream)}"
                )
            self._streams.append(stream)
        if len(self._streams) < 1:
            raise ValueError(
                f"Trying to create streamset with an empty list of streams {self._streams}."
            )

@@ -2231,6 +2238,15 @@ def __getitem__(self, item):

        return self._streams[item]

    def __contains__(self, item):
        if isinstance(item, str):
            for stream in self._streams:
                if str(stream.uuid()) == item:
                    return True
            return False

        return item in self._streams

    def __len__(self):
        return len(self._streams)
64 changes: 56 additions & 8 deletions tests/btrdb/test_conn.py
@@ -24,6 +24,54 @@
from btrdb.endpoint import Endpoint
from btrdb.exceptions import *
from btrdb.grpcinterface import btrdb_pb2
from btrdb.stream import Stream

##########################################################################
## Fixtures
##########################################################################


@pytest.fixture
def stream1():
    uu = uuidlib.UUID("0d22a53b-e2ef-4e0a-ab89-b2d48fb2592a")
    stream = Mock(Stream)
    stream.version = Mock(return_value=11)
    stream.uuid = Mock(return_value=uu)
    type(stream).collection = PropertyMock(return_value="fruits/apple")
    type(stream).name = PropertyMock(return_value="gala")
    stream.tags = Mock(return_value={"name": "gala", "unit": "volts"})
    stream.annotations = Mock(return_value=({"owner": "ABC", "color": "red"}, 11))
    stream._btrdb = Mock()
    return stream


@pytest.fixture
def stream2():
    uu = uuidlib.UUID("17dbe387-89ea-42b6-864b-f505cdb483f5")
    stream = Mock(Stream)
    stream.version = Mock(return_value=22)
    stream.uuid = Mock(return_value=uu)
    type(stream).collection = PropertyMock(return_value="fruits/orange")
    type(stream).name = PropertyMock(return_value="blood")
    stream.tags = Mock(return_value={"name": "blood", "unit": "amps"})
    stream.annotations = Mock(return_value=({"owner": "ABC", "color": "orange"}, 22))
    stream._btrdb = Mock()
    return stream


@pytest.fixture
def stream3():
    uu = uuidlib.UUID("17dbe387-89ea-42b6-864b-e2ef0d22a53b")
    stream = Mock(Stream)
    stream.version = Mock(return_value=33)
    stream.uuid = Mock(return_value=uu)
    type(stream).collection = PropertyMock(return_value="fruits/banana")
    type(stream).name = PropertyMock(return_value="yellow")
    stream.tags = Mock(return_value={"name": "yellow", "unit": "watts"})
    stream.annotations = Mock(return_value=({"owner": "ABC", "color": "yellow"}, 33))
    stream._btrdb = Mock()
    return stream


##########################################################################
## Connection Tests

@@ -91,7 +139,7 @@ def test_streams_recognizes_uuid(self, mock_func):
        """
        db = BTrDB(None)
        uuid1 = uuidlib.UUID("0d22a53b-e2ef-4e0a-ab89-b2d48fb2592a")
        mock_func.return_value = Stream(db, uuid1)
        db.streams(uuid1)

        mock_func.assert_called_once()

@@ -104,7 +152,7 @@ def test_streams_recognizes_uuid_string(self, mock_func):
        """
        db = BTrDB(None)
        uuid1 = "0d22a53b-e2ef-4e0a-ab89-b2d48fb2592a"
        mock_func.return_value = Stream(db, uuid1)
        db.streams(uuid1)

        mock_func.assert_called_once()

@@ -117,7 +165,9 @@ def test_streams_handles_path(self, mock_func):
        """
        db = BTrDB(None)
        ident = "zoo/animal/dog"
        mock_func.return_value = [
            Stream(db, "0d22a53b-e2ef-4e0a-ab89-b2d48fb2592a"),
        ]
        db.streams(ident, "0d22a53b-e2ef-4e0a-ab89-b2d48fb2592a")

        mock_func.assert_called_once()

@@ -139,12 +189,10 @@ def test_streams_raises_err(self, mock_func):
        with pytest.raises(StreamNotFoundError) as exc:
            db.streams(ident)

        # check that does not raise if one returned
        mock_func.return_value = [
            Stream(db, ident),
        ]
        db.streams(ident)

    def test_streams_raises_valueerror(self):
