Merged
22 commits
7e1799d  BLD: Run mypy for neko inside its poetry env (Sep 15, 2022)
0d99098  MOD: Fix mypy warnings (cjdsellers, Sep 16, 2022)
940d9ad  BLD: Add `disallow_incomplete_defs` flag to mypy (Sep 16, 2022)
26cf26d  MOD: Apply standardizations and fix mypy warnings (cjdsellers, Sep 19, 2022)
7b9b5b5  REF: Fix more mypy warnings (cjdsellers, Sep 21, 2022)
33d75be  MOD: Standardize Python client API with backend (cjdsellers, Sep 23, 2022)
775f2ce  ADD: Add mypy.ini to Python client (cjdsellers, Sep 26, 2022)
90d4788  MOD: Make batch.submit_job start and end params required (cjdsellers, Sep 26, 2022)
5a15eb6  MOD: Rename get_shape to get_record_count (cjdsellers, Sep 29, 2022)
b402617  MOD: Remove 'nearest' gateway option (cjdsellers, Sep 30, 2022)
7df173d  MOD: Change struct field names and exposed data columns (cjdsellers, Oct 6, 2022)
a665007  MOD: Improve Python dev environment config (cjdsellers, Oct 6, 2022)
2daf9a6  MOD: Align list_fields optional dataset param (cjdsellers, Oct 11, 2022)
cf5368d  MOD: Standardize some enum naming (cjdsellers, Oct 13, 2022)
fb9f2c2  DOC: Update API key length (cjdsellers, Oct 19, 2022)
8d921a4  MOD: Standardize batch submit_job param ordering (cjdsellers, Oct 21, 2022)
33c3712  DOC: Change "batch data" to "batch download" (Oct 28, 2022)
ac05a37  DOC: Add timeseries.stream_async to Python client (cjdsellers, Oct 31, 2022)
c66963e  FMT: Enable whitespace hooks for all languages (threecgreen, Oct 28, 2022)
d911f71  MOD: Update Python client (cjdsellers, Nov 6, 2022)
74d83e9  VER: Release Python client 0.5.0 (cjdsellers, Nov 7, 2022)
e6977d9  DOC: Fix minor copy errors and some links (cjdsellers, Nov 6, 2022)
5 changes: 5 additions & 0 deletions CHANGELOG.md
@@ -1,5 +1,10 @@
 # Changelog
 
+## 0.5.0 - 2022-11-07
+- Fixed dataframe columns for derived data schemas (dropped `channel_id`)
+- Fixed `batch.submit_job` requests for `dbz` encoding
+- Updated `quickstart.ipynb` jupyter notebook
+
 ## 0.4.0 - 2022-09-14
 - Upgraded `dbz-python` to `0.1.5`
 - Added `map_symbols` option for `.to_df()` (experimental)
1 change: 0 additions & 1 deletion LICENSE
@@ -172,4 +172,3 @@ TERMS AND CONDITIONS FOR USE, REPRODUCTION, AND DISTRIBUTION
    defend, and hold each Contributor harmless for any liability
    incurred by, or claims asserted against, such Contributor by reason
    of your accepting any such warranty or additional liability.
-
6 changes: 3 additions & 3 deletions README.md
@@ -45,8 +45,8 @@ To install the latest stable version of the package from PyPI:
 ## Usage
 The library needs to be configured with an API key from your account.
 [Sign up](https://databento.com/signup) for free and you will automatically
-receive a set of API keys to start with. Each API key is a 28-character
-string that can be found on the API Keys page of your [Databento user portal](https://databento.com/platform/keys).
+receive a set of API keys to start with. Each API key is a 32-character
+string starting with `db-`, that can be found on the API Keys page of your [Databento user portal](https://databento.com/platform/keys).
 
 A simple Databento application looks like this:
 
@@ -75,7 +75,7 @@ array = data.to_ndarray()  # to ndarray
 ```
 
 Note that the API key was also passed as a parameter, which is
-[not recommended for production applications](https://docs0.databento.com/knowledge-base/new-users/securing-your-api-keys?historical=python&live=python).
+[not recommended for production applications](https://docs.databento.com/knowledge-base/kb-new-users/kb-new-security-managing-api-keys?historical=python&live=python).
 Instead, you can leave out this parameter to pass your API key via the `DATABENTO_API_KEY` environment variable:
 
 ```python
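 # The rest of this snippet is collapsed in the diff view. A minimal sketch of
 # the pattern described above, assuming Historical() falls back to the
 # DATABENTO_API_KEY environment variable when no key argument is passed:
 import databento as db

 client = db.Historical()
 ```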
30 changes: 30 additions & 0 deletions databento/__init__.py
@@ -1,6 +1,22 @@
 from typing import Optional
 
 from databento.common.bento import Bento, FileBento, MemoryBento
+from databento.common.enums import (
+    Compression,
+    Dataset,
+    Delivery,
+    Encoding,
+    FeedMode,
+    Flags,
+    HistoricalGateway,
+    LiveGateway,
+    Packaging,
+    RollRule,
+    Schema,
+    SplitDuration,
+    SType,
+    SymbologyResolution,
+)
 from databento.historical.api import API_VERSION
 from databento.historical.client import Historical
 from databento.historical.error import (
@@ -19,9 +35,23 @@
     "BentoError",
     "BentoHttpError",
     "BentoServerError",
+    "Compression",
+    "Dataset",
+    "Delivery",
+    "Encoding",
+    "FeedMode",
     "FileBento",
+    "Flags",
     "Historical",
+    "HistoricalGateway",
+    "LiveGateway",
     "MemoryBento",
+    "Packaging",
+    "RollRule",
+    "Schema",
+    "SplitDuration",
+    "SType",
+    "SymbologyResolution",
 ]
 
 # Set to either 'DEBUG' or 'INFO', controls console logging
52 changes: 22 additions & 30 deletions databento/common/bento.py
@@ -1,22 +1,26 @@
 import datetime as dt
 import io
 import os.path
-from typing import Any, BinaryIO, Callable, Dict, List, Optional, Tuple
+from typing import TYPE_CHECKING, Any, BinaryIO, Callable, Dict, List, Optional
 
 import numpy as np
 import pandas as pd
 import zstandard
-from databento.common.data import DBZ_COLUMNS, DBZ_STRUCT_MAP, DERIV_SCHEMAS
+from databento.common.data import COLUMNS, DERIV_SCHEMAS, STRUCT_MAP
 from databento.common.enums import Compression, Encoding, Schema, SType
 from databento.common.logging import log_debug
 from databento.common.metadata import MetadataDecoder
 from databento.common.symbology import ProductIdMappingInterval
 
 
+if TYPE_CHECKING:
+    from databento.historical.client import Historical
+
+
 class Bento:
     """The abstract base class for all Bento I/O classes."""
 
-    def __init__(self):
+    def __init__(self) -> None:
         self._metadata: Dict[str, Any] = {}
         self._dtype: Optional[np.dtype] = None
         self._product_id_index: Dict[dt.date, Dict[int, str]] = {}
@@ -31,7 +35,7 @@ def __init__(self):
         self._limit: Optional[int] = None
         self._encoding: Optional[Encoding] = None
         self._compression: Optional[Compression] = None
-        self._shape: Optional[Tuple] = None
+        self._record_count: Optional[int] = None
 
     def _check_metadata(self) -> None:
         if not self._metadata:
@@ -155,7 +159,7 @@ def dtype(self) -> np.dtype:
         """
         if self._dtype is None:
             self._check_metadata()
-            self._dtype = np.dtype(DBZ_STRUCT_MAP[self.schema])
+            self._dtype = np.dtype(STRUCT_MAP[self.schema])
 
         return self._dtype
 
@@ -336,24 +340,20 @@ def compression(self) -> Compression:
         return self._compression
 
     @property
-    def shape(self) -> Tuple:
+    def record_count(self) -> int:
         """
-        Return the shape of the data.
+        Return the record count.
 
         Returns
         -------
-        Tuple
-            The data shape.
+        int
 
         """
-        if self._shape is None:
+        if self._record_count is None:
             self._check_metadata()
-            self._shape = (
-                self._metadata["record_count"],
-                len(DBZ_STRUCT_MAP[self.schema]),
-            )
+            self._record_count = self._metadata["record_count"]
 
-        return self._shape
+        return self._record_count
 
     @property
     def mappings(self) -> Dict[str, List[Dict[str, Any]]]:
@@ -404,7 +404,7 @@ def to_ndarray(self) -> np.ndarray:
 
         """
         data: bytes = self.reader(decompress=True).read()
-        return np.frombuffer(data, dtype=DBZ_STRUCT_MAP[self.schema])
+        return np.frombuffer(data, dtype=STRUCT_MAP[self.schema])
 
     def to_df(
         self,
@@ -437,20 +437,12 @@ def to_df(
         df.set_index(self._get_index_column(), inplace=True)
 
         # Cleanup dataframe
-        if self.schema == Schema.MBO:
-            df.drop("channel_id", axis=1, inplace=True)
-            df = df.reindex(columns=DBZ_COLUMNS[self.schema])
+        df.drop(["length", "rtype"], axis=1, inplace=True)
+        if self.schema == Schema.MBO or self.schema in DERIV_SCHEMAS:
+            df = df.reindex(columns=COLUMNS[self.schema])
             df["flags"] = df["flags"] & 0xFF  # Apply bitmask
             df["side"] = df["side"].str.decode("utf-8")
             df["action"] = df["action"].str.decode("utf-8")
-        elif self.schema in DERIV_SCHEMAS:
-            df.drop(["nwords", "type", "depth"], axis=1, inplace=True)
-            df = df.reindex(columns=DBZ_COLUMNS[self.schema])
-            df["flags"] = df["flags"] & 0xFF  # Apply bitmask
-            df["side"] = df["side"].str.decode("utf-8")
-            df["action"] = df["action"].str.decode("utf-8")
-        else:
-            df.drop(["nwords", "type"], axis=1, inplace=True)
 
         if pretty_ts:
             df.index = pd.to_datetime(df.index, utc=True)
@@ -493,7 +485,7 @@ def replay(self, callback: Callable[[Any], None]) -> None:
             The callback to the data handler.
 
         """
-        dtype = DBZ_STRUCT_MAP[self.schema]
+        dtype = STRUCT_MAP[self.schema]
         reader: BinaryIO = self.reader(decompress=True)
         while True:
             raw: bytes = reader.read(self.struct_size)
@@ -590,7 +582,7 @@ def to_json(self, path: str) -> None:
         """
         self.to_df().to_json(path, orient="records", lines=True)
 
-    def request_symbology(self, client) -> Dict[str, Any]:
+    def request_symbology(self, client: "Historical") -> Dict[str, Any]:
         """
         Request symbology resolution based on the metadata properties.
 
@@ -622,7 +614,7 @@ def request_symbology(self, client) -> Dict[str, Any]:
 
     def request_full_definitions(
         self,
-        client,
+        client: "Historical",
         path: Optional[str] = None,
     ) -> "Bento":
         """
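Taken together, the `shape` to `record_count` rename and the `to_df` cleanup above change how callers inspect streamed data. A minimal sketch, assuming a `Historical` client with `DATABENTO_API_KEY` set; the `timeseries.stream` call, dataset, symbols, and dates are illustrative, not taken from this diff:

```python
import databento as db

client = db.Historical()  # assumes DATABENTO_API_KEY is set in the environment

data = client.timeseries.stream(
    dataset="GLBX.MDP3",
    symbols=["ESZ2"],
    schema="mbo",
    start="2022-06-10T14:30",
    end="2022-06-10T14:40",
)

# Previously: rows, cols = data.shape
print(data.record_count)  # the record count now comes straight from the metadata

# The dataframe drops the renamed header fields ('length', 'rtype'), and MBO
# frames keep their 'channel_id' column per the 0.5.0 changelog entry above.
df = data.to_df(pretty_ts=True)
```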
50 changes: 26 additions & 24 deletions databento/common/data.py
@@ -36,16 +36,16 @@ def get_deriv_ba_types(level: int) -> List[Tuple[str, Union[type, str]]]:
 )
 
 
-DBZ_COMMON_HEADER: List[Tuple[str, Union[type, str]]] = [
-    ("nwords", np.uint8),
-    ("type", np.uint8),
+RECORD_HEADER: List[Tuple[str, Union[type, str]]] = [
+    ("length", np.uint8),
+    ("rtype", np.uint8),
     ("publisher_id", np.uint16),
     ("product_id", np.uint32),
     ("ts_event", np.uint64),
 ]
 
 
-DBZ_MBP_MSG: List[Tuple[str, Union[type, str]]] = [
+MBP_MSG: List[Tuple[str, Union[type, str]]] = [
     ("price", np.int64),
     ("size", np.uint32),
     ("action", "S1"),  # 1 byte chararray
@@ -58,7 +58,7 @@ def get_deriv_ba_types(level: int) -> List[Tuple[str, Union[type, str]]]:
 ]
 
 
-DBZ_OHLCV_MSG: List[Tuple[str, Union[type, str]]] = [
+OHLCV_MSG: List[Tuple[str, Union[type, str]]] = [
     ("open", np.int64),
     ("high", np.int64),
     ("low", np.int64),
@@ -67,8 +67,8 @@ def get_deriv_ba_types(level: int) -> List[Tuple[str, Union[type, str]]]:
 ]
 
 
-DBZ_STRUCT_MAP: Dict[Schema, List[Tuple[str, Union[type, str]]]] = {
-    Schema.MBO: DBZ_COMMON_HEADER
+STRUCT_MAP: Dict[Schema, List[Tuple[str, Union[type, str]]]] = {
+    Schema.MBO: RECORD_HEADER
     + [
         ("order_id", np.uint64),
         ("price", np.int64),
@@ -81,9 +81,9 @@ def get_deriv_ba_types(level: int) -> List[Tuple[str, Union[type, str]]]:
         ("ts_in_delta", np.int32),
         ("sequence", np.uint32),
     ],
-    Schema.MBP_1: DBZ_COMMON_HEADER + DBZ_MBP_MSG + get_deriv_ba_types(0),  # 1
-    Schema.MBP_10: DBZ_COMMON_HEADER
-    + DBZ_MBP_MSG
+    Schema.MBP_1: RECORD_HEADER + MBP_MSG + get_deriv_ba_types(0),  # 1
+    Schema.MBP_10: RECORD_HEADER
+    + MBP_MSG
     + get_deriv_ba_types(0)  # 1
     + get_deriv_ba_types(1)  # 2
     + get_deriv_ba_types(2)  # 3
@@ -94,21 +94,21 @@ def get_deriv_ba_types(level: int) -> List[Tuple[str, Union[type, str]]]:
     + get_deriv_ba_types(7)  # 8
     + get_deriv_ba_types(8)  # 9
     + get_deriv_ba_types(9),  # 10
-    Schema.TBBO: DBZ_COMMON_HEADER + DBZ_MBP_MSG + get_deriv_ba_types(0),
-    Schema.TRADES: DBZ_COMMON_HEADER + DBZ_MBP_MSG,
-    Schema.OHLCV_1S: DBZ_COMMON_HEADER + DBZ_OHLCV_MSG,
-    Schema.OHLCV_1M: DBZ_COMMON_HEADER + DBZ_OHLCV_MSG,
-    Schema.OHLCV_1H: DBZ_COMMON_HEADER + DBZ_OHLCV_MSG,
-    Schema.OHLCV_1D: DBZ_COMMON_HEADER + DBZ_OHLCV_MSG,
-    Schema.STATUS: DBZ_COMMON_HEADER
+    Schema.TBBO: RECORD_HEADER + MBP_MSG + get_deriv_ba_types(0),
+    Schema.TRADES: RECORD_HEADER + MBP_MSG,
+    Schema.OHLCV_1S: RECORD_HEADER + OHLCV_MSG,
+    Schema.OHLCV_1M: RECORD_HEADER + OHLCV_MSG,
+    Schema.OHLCV_1H: RECORD_HEADER + OHLCV_MSG,
+    Schema.OHLCV_1D: RECORD_HEADER + OHLCV_MSG,
+    Schema.STATUS: RECORD_HEADER
     + [
         ("ts_recv", np.uint64),
         ("group", "S1"),  # 1 byte chararray
         ("trading_status", np.uint8),
         ("halt_reason", np.uint8),
         ("trading_event", np.uint8),
     ],
-    Schema.DEFINITION: DBZ_COMMON_HEADER
+    Schema.DEFINITION: RECORD_HEADER
     + [
         ("ts_recv", np.uint64),
         ("min_price_increment", np.int64),
@@ -191,24 +191,26 @@ def get_deriv_ba_fields(level: int) -> List[str]:
     ]
 
 
-DBZ_DERIV_HEADER_FIELDS = [
+DERIV_HEADER_FIELDS = [
     "ts_event",
     "ts_in_delta",
     "publisher_id",
     "product_id",
     "action",
     "side",
+    "depth",
     "flags",
     "price",
     "size",
     "sequence",
 ]
 
-DBZ_COLUMNS = {
+COLUMNS = {
     Schema.MBO: [
         "ts_event",
         "ts_in_delta",
         "publisher_id",
+        "channel_id",
         "product_id",
         "order_id",
         "action",
@@ -218,8 +220,8 @@ def get_deriv_ba_fields(level: int) -> List[str]:
         "size",
         "sequence",
     ],
-    Schema.MBP_1: DBZ_DERIV_HEADER_FIELDS + get_deriv_ba_fields(0),
-    Schema.MBP_10: DBZ_DERIV_HEADER_FIELDS
+    Schema.MBP_1: DERIV_HEADER_FIELDS + get_deriv_ba_fields(0),
+    Schema.MBP_10: DERIV_HEADER_FIELDS
     + get_deriv_ba_fields(0)
     + get_deriv_ba_fields(1)
     + get_deriv_ba_fields(2)
@@ -230,6 +232,6 @@ def get_deriv_ba_fields(level: int) -> List[str]:
     + get_deriv_ba_fields(7)
    + get_deriv_ba_fields(8)
     + get_deriv_ba_fields(9),
-    Schema.TBBO: DBZ_DERIV_HEADER_FIELDS + get_deriv_ba_fields(0),
-    Schema.TRADES: DBZ_DERIV_HEADER_FIELDS,
+    Schema.TBBO: DERIV_HEADER_FIELDS + get_deriv_ba_fields(0),
+    Schema.TRADES: DERIV_HEADER_FIELDS,
 }
8 changes: 3 additions & 5 deletions databento/common/enums.py
@@ -5,7 +5,6 @@
 class HistoricalGateway(Enum):
     """Represents a historical data center gateway location."""
 
-    NEAREST = "nearest"
     BO1 = "bo1"
 
 
@@ -14,7 +13,6 @@ class LiveGateway(Enum):
     """Represents a live data center gateway location."""
 
     ORIGIN = "origin"
-    NEAREST = "nearest"
     NY4 = "ny4"
     DC3 = "dc3"
 
@@ -72,8 +70,8 @@ class Compression(Enum):
 
 
 @unique
-class Duration(Enum):
-    """Represents the duration interval for each batch data file."""
+class SplitDuration(Enum):
+    """Represents the duration before splitting for each batched data file."""
 
     DAY = "day"
     WEEK = "week"
@@ -92,7 +90,7 @@ class Packaging(Enum):
 
 @unique
 class Delivery(Enum):
-    """Represents the delivery mechanism for batch data."""
+    """Represents the delivery mechanism for batched data."""
 
     DOWNLOAD = "download"
     S3 = "s3"
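Several commits in this PR touch the batch API (`start`/`end` made required, `submit_job` parameter ordering standardized, `Duration` renamed to `SplitDuration`). A hedged sketch of a submission using the renamed enums; the keyword names `split_duration`, `packaging`, and `delivery`, and the `Packaging.ZIP` member, are assumptions for illustration and not confirmed by this diff:

```python
import databento as db
from databento import Delivery, Packaging, SplitDuration

client = db.Historical()  # assumes DATABENTO_API_KEY is set in the environment

# start and end are now required (commit 90d4788); files are split per
# SplitDuration, with hypothetical keyword names mirroring the enum names.
job = client.batch.submit_job(
    dataset="GLBX.MDP3",
    symbols=["ESZ2"],
    schema="trades",
    start="2022-10-03",
    end="2022-10-04",
    encoding="dbz",
    split_duration=SplitDuration.DAY,
    packaging=Packaging.ZIP,
    delivery=Delivery.DOWNLOAD,
)
```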