diff --git a/.github/workflows/dxapi-python-build.yml b/.github/workflows/dxapi-python-build.yml index 7b83e7e..aef2b79 100644 --- a/.github/workflows/dxapi-python-build.yml +++ b/.github/workflows/dxapi-python-build.yml @@ -3,10 +3,14 @@ name: Build dxapi on: workflow_dispatch: push: - branches: [main, wip-workflow] + branches: + - "*" + - "!release-*" pull_request: types: [opened, synchronize] - branches: [main, wip-workflow] + branches: + - "*" + - "!release-*" jobs: build-dxapi: @@ -233,10 +237,10 @@ jobs: needs: [build-dxapi-python-linux] strategy: matrix: - py: ['3.10'] + py: ['3.6', '3.7', '3.8', '3.9', '3.10'] services: timebase: - image: finos/timebase-ce-server + image: finos/timebase-ce-server:6.1 env: JAVA_OPTS: "-DQuantServer.enableRemoteMonitoring=true -DTimeBase.version=5.0" ports: diff --git a/dxapi b/dxapi index 960859b..3cbbee8 160000 --- a/dxapi +++ b/dxapi @@ -1 +1 @@ -Subproject commit 960859b2d47e5d50bbb73db30a5113f0abb43c58 +Subproject commit 3cbbee85f2a8f1f38a31fc14a993fceb05d650f1 diff --git a/pydoc-markdown.yml b/pydoc-markdown.yml new file mode 100644 index 0000000..251ab29 --- /dev/null +++ b/pydoc-markdown.yml @@ -0,0 +1,15 @@ +renderer: + type: markdown + +loaders: + - type: python + search_path: [./bin] + modules: + - dxapi + +renderer: + type: markdown + render_toc: false + descriptive_class_title: false + docstrings_as_blockquote: true + insert_header_anchors: false diff --git a/src/swig/common.i b/src/swig/common.i index 3eae96b..a6416aa 100644 --- a/src/swig/common.i +++ b/src/swig/common.i @@ -1,66 +1,32 @@ +%pythoncode %{ +from contextlib import contextmanager -#include - -namespace DxApi { - -/* -class InstrumentType { -public: - enum Enum { - EQUITY = 0, - OPTION, - FUTURE, - BOND, - FX, - INDEX, // = 5 - ETF, - CUSTOM, - SIMPLE_OPTION, - EXCHANGE, - TRADING_SESSION, // = 10 - STREAM, - DATA_CONNECTOR, - EXCHANGE_TRADED_SYNTHETIC, - SYSTEM, - CFD, // Contract-For-Difference-Synthetic = 15 - UNKNOWN = 0xFF // unknown/invalid - }; - - InstrumentType(InstrumentType::Enum value); - InstrumentType(const char *value); -}; -%extend InstrumentType { - uint8_t __int__() { - return $self->toInt(); - } - - const char *__str__() { - return $self->toString(); - } -} - -struct InstrumentIdentity { -private: - InstrumentIdentity(); - -public: - InstrumentIdentity(DxApi::InstrumentType type, const char * symbol); +JAVA_LONG_MIN_VALUE = -9223372036854775808 +JAVA_LONG_MAX_VALUE = 9223372036854775807 - bool operator==(const DxApi::InstrumentIdentity &other) const; +class InstrumentMessage(object): + def __str__(self): + return str(vars(self)) +%} - std::string symbol; - DxApi::InstrumentType type; -}; +#include -%extend InstrumentIdentity { - InstrumentIdentity(DxApi::InstrumentType::Enum type, const char *symbol) { - return new DxApi::InstrumentIdentity(DxApi::InstrumentType(type), symbol); - } -} -*/ +namespace DxApi { +%feature("autodoc", "Determines the scope of a stream's durability, if any. +Example: + ``` + scope = dxapi.StreamScope('TRANSIENT') + ``` +Possible values: + ``` + DURABLE, + EXTERNAL_FILE, + TRANSIENT, + RUNTIME + ```"); class StreamScope { public: enum Enum { @@ -70,6 +36,7 @@ public: RUNTIME }; + %feature("autodoc", ""); StreamScope(StreamScope::Enum value); StreamScope(const char *value); }; @@ -84,6 +51,25 @@ public: } } +%feature("autodoc", "APPEND: Adds only new data into a stream without truncations. +REPLACE: Adds data into a stream and removes previous data older that first message time + [truncate(first message time + 1)]. +REWRITE: Default. Adds data into a stream and removes previous data by truncating using first message time. + [truncate(first message time)]. +TRUNCATE: Stream truncated every time when loader writes a messages earlier than last message time. + +Example: + ``` + mode = dxapi.StreamScope('TRUNCATE') + ``` + +Possible values: + ``` + APPEND, + REPLACE, + REWRITE, + TRUNCATE + ```"); class WriteMode { public: enum Enum { @@ -93,6 +79,7 @@ public: TRUNCATE }; + %feature("autodoc", ""); WriteMode(WriteMode::Enum value); WriteMode(const char *value); }; @@ -107,81 +94,182 @@ public: } } -class TimeUnit { -public: - enum Enum { - MILLISECOND = 0, - SECOND, - MINUTE, - HOUR, - DAY, - /* Weeks have fixed day order, Monday through Sunday */ - WEEK, - MONTH, - /* Quarters are calendar quarters, starting in January, April, July and */ - /* October. This is different from fiscal quarters, which can start in */ - /* any calendar month. */ - QUARTER, - YEAR, - UNKNOWN = 0xFF - }; - - TimeUnit(TimeUnit::Enum value); - TimeUnit(const char *value); -}; - -%extend TimeUnit { - uint8_t __int__() { - return $self->toInt(); - } - - const char *__str__() { - return $self->toString(); - } -} +%feature("autodoc", "Options for selecting data from a stream. +Example: + ``` + so = dxapi.SelectionOptions() + so._from = 0 + so.to = 100000 + so.useCompression = False + ... + ```"); class SelectionOptions { public: + + %feature("autodoc", ""); SelectionOptions(); + %feature("autodoc", "startTime (int): Start timestamp in millis."); int64_t from; + + %feature("autodoc", "endTime (int): End timestamp in millis."); int64_t to; + + %feature("autodoc", "useCompression (bool): Use compression."); bool useCompression; + + %feature("autodoc", "live (bool): Instead of returning false from next () at the end of the stream, + wait for live data to be added."); bool live; + + %feature("autodoc", "reverse (bool): Specify cursor direction."); bool reverse; - bool isBigEndian; + + %feature("autodoc", "allowLateOutOfOrder (bool): Output out-of-order late messages. + Timebase consumers receive historical messages they requested strictly ordered by their time. + For scenarios when new messages arrive in the middle of consumer's session (So called 'live' mode) + it is possible that newly arrived message has a timestamp in already consumer time region. + In this cases this flag allows consumer to receive these 'late' messages even if they out of order + with respect to current baseline time. + + NOTE: Late Messages that are timestamped prior to consumer's select time or last reset time + will not be delivered even with this flag enabled."); bool allowLateOutOfOrder; + + + %feature("autodoc", "realTimeNotification (bool): Enabled/Disables sending system messages when cursor switches from historical to realtime mode."); bool realTimeNotification; + + %feature("autodoc", "minLatency (bool): try to receive messages ASAP, with minimal buffering. + Can potentially reduce max throughput and increase CPU use / network bandwidth usage."); bool minLatency; + }; +%feature("autodoc", "Options for loading data into a stream. + +Example: + ``` + lo = dxapi.LoadingOptions() + lo.writeMode = dxapi.WriteMode('TRUNCATE') + so.space = 'myspace' + ... + ```"); class LoadingOptions { public: + + %feature("autodoc", "writeMode (WriteMode): see WriteMode class description."); WriteMode writeMode; - bool raw; + + %feature("autodoc", "minLatency (bool): try to send messages ASAP, with minimal buffering. + Can potentially reduce max throughput and increase CPU use / network bandwidth usage."); bool minLatency; + + %feature("autodoc", "space (str): Data Partition. Contains unique number of instruments or Time Ranges."); std::string space; + %feature("autodoc", ""); LoadingOptions(); }; +%feature("autodoc", "Stream definition attributes. + +Example: + ``` + so = dxapi.StreamOptions() + so.name = key + so.description = key + so.scope = dxapi.StreamScope('DURABLE') + so.distributionFactor = 1 + so.highAvailability = False + so.polymorphic = False + so.metadata = schema + + db.createStream(key, options) + ```"); class StreamOptions { public: - enum { - MAX_DISTRIBUTION = 0 - }; - - //TODO: buffer options - //BufferOptions bufferOptions; - +%pythoncode %{ + def name(self, name: str = None) -> None: + '''Optional user-readable name.''' + if name == None: + return self.__getName() + else: + self.__setName(name) + return name + + def description(self, description: str = None) -> None: + '''Optional multi-line description.''' + if description == None: + return self.__getDescription() + else: + self.__setDescription(description) + return description + + def owner(self, owner: str = None) -> None: + '''Optional owner of stream. + During stream creation it will be set + equals to authenticated user name. + ''' + if owner == None: + return self.__getOwner() + else: + self.__setOwner(owner) + return owner + + def location(self, location: str = None) -> None: + '''Location of the stream (by default null). When defined this attribute provides alternative stream location (rather than default location under QuantServerHome)''' + if location == None: + return self.__getLocation() + else: + self.__setLocation(location) + return location + + def distributionRuleName(self, distributionRuleName: str = None) -> None: + '''Class name of the distribution rule''' + if distributionRuleName == None: + return self.__getDistributionRuleName() + else: + self.__setDistributionRuleName(distributionRuleName) + return distributionRuleName + + def metadata(self, metadata: str = None) -> None: + '''Stream metadata in XML format. To build metadata programatically, use dxapi.SchemaDef class.''' + if metadata == None: + return self.__getMetadata() + else: + self.__setMetadata(metadata) + return metadata +%} + + %feature("autodoc", "scope (StreamScope): Determines persistent properties of a stream."); DxApi::StreamScope scope; + + %feature("autodoc", "distributionFactor (int): The number of M-files into which to distribute the + data. Supply MAX_DISTRIBUTION to keep a separate file for each instrument (default)."); int32_t distributionFactor; + + %feature("autodoc", "duplicatesAllowed (bool): Indicates that loader will ignore binary similar messages(for 'unique' streams only)."); bool duplicatesAllowed; + + %feature("autodoc", "highAvailability (bool): High availability durable streams are cached on startup."); bool highAvailability; + + %feature("autodoc", "unique (bool): Unique streams maintain in-memory cache of resent messages. + This concept assumes that stream messages will have some field(s) marked as primary key. + Primary key may be a simple field (e.g. symbol) or composite (e.g. symbol and portfolio ID). + For each key TimeBase runtime maintains a copy of the last message received for this key (cache). + Each new consumer will receive a snapshot of current cache at the beginning of live data subscription."); bool unique; + + %feature("autodoc", "polymorphic (bool)."); bool polymorphic; + + %feature("autodoc", "periodicity (bool): Stream periodicity, if known."); std::string periodicity; + %feature("autodoc", ""); bool operator==(const StreamOptions &value) const; StreamOptions(); @@ -189,72 +277,62 @@ public: %extend StreamOptions { - const char * name() { + const char * __getName() { return $self->name.has_value() ? $self->name.get().c_str() : NULL; } - void name(const char *name) { + void __setName(const char *name) { $self->name = std::string(name); } - const char * description() { + const char * __getDescription() { return $self->description.has_value() ? $self->description.get().c_str() : NULL; } - void description(const char *description) { + void __setDescription(const char *description) { $self->description = std::string(description); } - const char * owner() { + const char * __getOwner() { return $self->owner.has_value() ? $self->owner.get().c_str() : NULL; } - void owner(const char *owner) { + void __setOwner(const char *owner) { $self->owner = std::string(owner); } - - const char * location() { + + const char * __getLocation() { return $self->location.has_value() ? $self->location.get().c_str() : NULL; } - void location(const char *location) { + void __setLocation(const char *location) { $self->location = std::string(location); } - const char * distributionRuleName() { + const char * __getDistributionRuleName() { return $self->distributionRuleName.has_value() ? $self->distributionRuleName.get().c_str() : NULL; } - void distributionRuleName(const char *distributionRuleName) { + void __setDistributionRuleName(const char *distributionRuleName) { $self->distributionRuleName = std::string(distributionRuleName); } - const char * metadata() { + const char * __getMetadata() { return $self->metadata.has_value() ? $self->metadata.get().c_str() : NULL; } - void metadata(const char *metadata) { + void __setMetadata(const char *metadata) { $self->metadata = std::string(metadata); } }; -class Interval { -public: - Interval(int64_t numUnits = 0, TimeUnit unitType = TimeUnitEnum::MILLISECOND); - - bool isZero() const; - bool isNegative() const; - bool isPositive() const; - - int64_t numUnits; - DxApi::TimeUnit unitType; -}; - +%feature("autodoc", "Input parameter definition for a prepared statement."); struct QueryParameter { std::string name; std::string type; + %feature("autodoc", ""); QueryParameter(const std::string &name_, const std::string &type_, const std::string value_) : name(name_), type(type_), value(value_) {} @@ -280,4 +358,5 @@ struct QueryParameter { } } -} // namespace DxApi \ No newline at end of file +%feature("autodoc", ""); +} // namespace DxApi diff --git a/src/swig/dxapi.i b/src/swig/dxapi.i index 1d0d6d8..8fdfbdf 100644 --- a/src/swig/dxapi.i +++ b/src/swig/dxapi.i @@ -55,6 +55,8 @@ else: import _dxapi del _swig_python_version_info del _swig_python_platform + +version = (6, 1) " %enddef @@ -85,7 +87,6 @@ typedef int64_t TimestampNs; %} %feature("director") DxApi::TickLoader::ErrorListener; -//%feature("director") DxApi::TickLoader::SubscriptionListener; %exception { try { @@ -108,12 +109,6 @@ typedef int64_t TimestampNs; } } -%pythoncode %{ -class InstrumentMessage(object): - def __str__(self): - return str(vars(self)) -%} - typedef int64_t TimestampMs; typedef int64_t TimestampNs; diff --git a/src/swig/tick_cursor.i b/src/swig/tick_cursor.i index 6643d32..f254efc 100644 --- a/src/swig/tick_cursor.i +++ b/src/swig/tick_cursor.i @@ -6,6 +6,22 @@ enum NextResult { OK, END_OF_CURSOR, UNAVAILABLE }; +%feature("autodoc", "A cursor (also known as iterator, or result set) for reading data from a +stream. This class provides methods for dynamically reconfiguring the feed, +as well as method reset for essentially re-opening the cursor on a completely different timestamp. + +To get a cursor, use select method from TickDb or TickStream objects, +or call executeQuery to open cursor to QQL result set. + +Also cursor can be created with createCursor method, but it will be not initialized cursor, +so cursor should be configured with types, entities and read time calling reset: + ``` + options = dxapi.SelectionOptions() + cursor = tickdb.createCursor(stream, options) + cursor.subscribeToAllEntities() + cursor.subscribeToAllTypes() + cursor.reset(timestamp) + ```"); class TickCursor { private: TickCursor(); @@ -13,45 +29,256 @@ private: public: ~TickCursor(); +%pythoncode %{ + def next(self) -> bool: + '''Moves cursor on to the next message. This method blocks until the next message becomes available, + or until the cursor is determined to be at the end of the sequence. + This method is illegal to call if isAtEnd() returns true. + + Returns: + bool: false if at the end of the cursor. + ''' + return self.__next() + + def getMessage(self) -> 'InstrumentMessage': + '''Returns an InstrumentMessage object cursor points at.''' + return self.__getMessage() + + def isAtEnd(self) -> bool: + '''Returns true if the last call to next() returned false. Returns false if next() has not been called yet. + This method is legal to call any number of times at any point in the cursor's lifecycle. + ''' + return self.__isAtEnd() + + def nextIfAvailable(self) -> int: + '''Moves cursor on to the next message, but this method NOT blocks until the next message becomes available. + + Returns: + NextResult: OK (0) if new message is available, + END_OF_CURSOR (1) if cursor was closed, + otherwise, UNAVAILABLE (2) + ''' + return self.__nextIfAvailable() + + def isClosed(self) -> bool: + '''Returns true, if cursor was closed''' + return self.__isClosed() + + def close(self) -> None: + '''Close the cursor''' + return self.__close() + + def getCurrentStreamKey(self) -> str: + '''Return the key of the stream that is the source of the current message.''' + return self.__getCurrentStreamKey() + + def reset(self, timestamp: int, entities: 'list[str]' = None) -> None: + '''Reposition the message source to a new point in time, while + preserving current subscription. + + Args: + timestamp (int): The new position in time in millis. + entities ('list[str]'): list of entities to reset + ''' + if entities == None: + return self.__reset(timestamp) + else: + return self.__resetEntities(timestamp, entities) + + def subscribeToAllEntities(self) -> None: + '''Subscribe to all available entities.''' + return self.__subscribeToAllEntities() + + def clearAllEntities(self) -> None: + '''Switch to selective subscription mode (if necessary) and clear the list.''' + return self.__clearAllEntities() + + def addEntity(self, entity: str) -> None: + '''Add the specified entity to subscription. The type and symbol are copied + from the incoming object, if necessary, so the argument can be re-used + after the call. + + Special note about options: + The following fragment will subscribe to specific option contract "DAV 100417P00085000": + cursor.addEntity('DAV 100417P00085000'); + + While the following will subscribe to option root (and you will get all instruments with this root): + cursor.addEntity("DAV "); + ''' + return self.__addEntity(entity) + + def addEntities(self, entities: 'list[str]') -> None: + '''Bulk add the specified entities to subscription. The type and symbol are copied + from the incoming objects, if necessary, so the arguments can be re-used + after the call. + ''' + return self.__addEntities(entities) + + def removeEntities(self, entities: 'list[str]') -> None: + '''Remove the specified entities from subscription. The type and symbol are copied + from the incoming objects, if necessary, so the arguments can be re-used + after the call. + ''' + return self.__removeEntities(entities) + + def removeEntity(self, entity: str) -> None: + '''Remove the specified entity from subscription. The type and symbol are copied + from the incoming object, if necessary, so the argument can be re-used + after the call. + ''' + return self.__removeEntity(entity) + + def subscribeToAllTypes(self) -> None: + '''Subscribe to all available types (no filtering).''' + return self.__subscribeToAllTypes() + + def addTypes(self, types: 'list[str]') -> None: + '''Add the specified type names to subscription.''' + return self.__addTypes(types) + + def removeTypes(self, types: 'list[str]') -> None: + '''Remove the specified types from subscription.''' + return self.__removeTypes(types) + + def setTypes(self, types: 'list[str]') -> None: + '''Subscribe to specified types.''' + return self.__setTypes(types) + + def add(self, types: 'list[str]', entities: 'list[str]') -> None: + '''Add the specified entities and types to subscription. The type and symbol are copied + from the incoming object, if necessary, so the argument can be re-used + after the call. + + Args: + types (list[str]): not-null array of type names to subscribe. + entities (list[str]): not-null array of instruments to subscribe. + ''' + return self.__add(types, entities) + + def remove(self, types: 'list[str]', entities: 'list[str]') -> None: + '''Remove the specified entities and types from subscription. The type and symbol are copied + from the incoming objects, if necessary, so the arguments can be re-used + after the call. + + Args: + types (list[str]): not-null array of type names to unsubscribe. + entities (list[str]): not-null array of instruments to unsubscribe. + ''' + return self.__remove(types, entities) + + def addStreams(self, streams: 'list[TickStream]') -> None: + '''Add streams to subscription. Current time and filter is used to query + data from new sources. + + Args: + streams ('list[TickStream]'): Streams to add. + ''' + return self.__addStreams(streams) + + def removeStreams(self, streams: 'list[TickStream]') -> None: + '''Remove streams from subscription. + + Args: + streams (list[TickStream]): Streams to remove. + ''' + return self.__removeStreams(streams) + + def removeAllStreams(self) -> None: + '''Remove all streams from subscription.''' + return self.__removeAllStreams() + + def setTimeForNewSubscriptions(self, timestamp: int) -> None: + '''This method affects subsequent "add subscription" methods, + such as, for instance, addEntity(). New subscriptions start at + the specified time. + + Args: + timestamp (int): The time to use. + ''' + return self.__setTimeForNewSubscriptions(timestamp) +%} + + %feature("autodoc", ""); + + %rename(__next) next; bool next(); + + %rename(__nextIfAvailable) nextIfAvailable; NextResult nextIfAvailable(); + + %rename(__getMessage) getMessage; PyObject * getMessage(); + %rename(__isAtEnd) isAtEnd; bool isAtEnd() const; + + %rename(__isClosed) isClosed; bool isClosed() const; + + %rename(__close) close; void close(); + %rename(__getCurrentStreamKey) getCurrentStreamKey; const char * getCurrentStreamKey(); + %rename(__reset) reset; void reset(DxApi::TimestampMs dt); + + %rename(__resetEntities) reset; void reset(DxApi::TimestampMs dt, const std::vector *entities); + %rename(__subscribeToAllEntities) subscribeToAllEntities; void subscribeToAllEntities(); + + %rename(__clearAllEntities) clearAllEntities; void clearAllEntities(); + %rename(__addEntities) addEntities; void addEntities(const std::vector *entities); + + %rename(__addEntity) addEntity; void addEntity(const std::string &entity); + %rename(__removeEntities) removeEntities; void removeEntities(const std::vector *entities); + + %rename(__removeEntity) removeEntity; void removeEntity(const std::string &entity); + %rename(__subscribeToAllTypes) subscribeToAllTypes; void subscribeToAllTypes(); + %rename(__addTypes) addTypes; void addTypes(const std::vector *types); + + %rename(__removeTypes) removeTypes; void removeTypes(const std::vector *types); + + %rename(__setTypes) setTypes; void setTypes(const std::vector *types); + %rename(__add) add; void add(const std::vector *types, const std::vector * entities); + + %rename(__remove) remove; void remove(const std::vector *types, const std::vector * entities); + %rename(__addStreams) addStreams; void addStreams(const std::vector *streams); + + %rename(__removeStreams) removeStreams; void removeStreams(const std::vector *streams); + + %rename(__removeAllStreams) removeAllStreams; void removeAllStreams(); + %rename(__setTimeForNewSubscriptions) setTimeForNewSubscriptions; void setTimeForNewSubscriptions(DxApi::TimestampMs dt); }; // TickCursor +%feature("autodoc", ""); + } } diff --git a/src/swig/tick_db.i b/src/swig/tick_db.i index 52d1266..c0378d3 100644 --- a/src/swig/tick_db.i +++ b/src/swig/tick_db.i @@ -4,33 +4,329 @@ namespace DxApi { %newobject TickDb::createFromUrl(const char *); %newobject TickDb::createFromUrl(const char *, const char *, const char *); +%feature("autodoc", "The top-level implementation to the methods of the Deltix Tick +Database engine. Instances of this class are created by static method +createFromUrl: + +``` +db = dxapi.TickDb.createFromUrl('dxtick://localhost:8011') +``` + +or + +``` +db = dxapi.TickDb.createFromUrl('dxtick://localhost:8011', 'user', 'password') +```"); + class TickDb { public: +%pythoncode %{ + + @staticmethod + def createFromUrl(url: str, user: str = None, password: str = None) -> "TickDb": + '''Creates a new database instance with the specified root folder, or URL. + + Args: + url (str): Connection URL. + user (str): User. + password (str): Password. + + Returns: + TickDb: An un-opened TickDB instance. + ''' + if user == None: + return _dxapi.TickDb___createFromUrl(url) + else: + return _dxapi.TickDb___createFromUrlWithUser(url, user, password) + + @staticmethod + @contextmanager + def openFromUrl(url: str, readonly: bool, user: str = None, password: str = None): + '''Creates a new database instance with the specified root folder, or URL, and opens it. + + Args: + url (str): Connection URL. + readonly (bool): Open data store in read-only mode. + user (str): User. + password (str): Password. + + Returns: + TickDb: An opened TickDB instance. + ''' + db = TickDb.createFromUrl(url, user, password) + try: + db.open(readonly) + yield db + finally: + if db.isOpen(): + db.close() + + def isReadOnly(self) -> bool: + '''Determines whether the store is open as read-only.''' + return self.__isReadOnly() + + def isOpen(self) -> bool: + '''Determines whether the store is open.''' + return self.__isOpen() + + def open(self, readOnlyMode: bool) -> bool: + '''Open the data store. + + Args: + readOnlyMode (bool): Open data store in read-only mode. + ''' + return self.__open(readOnlyMode) + + def close(self) -> None: + '''Closes data store.''' + return self.__close() + + def format(self) -> bool: + '''Create a new object on disk and format internally. + The data store is left open for read-write at the end of this method. + ''' + return self.__format() + + def listStreams(self) -> 'list[TickStream]': + '''Enumerates existing streams. + + Returns: + list[TickStream]: An array of existing stream objects. + ''' + return self.__listStreams() + + def getStream(self, key: str) -> 'TickStream': + '''Looks up an existing stream by key. + + Args: + key (str): Identifies the stream. + + Returns: + TickStream: A stream object, or None if the key was not found. + ''' + return self.__getStream(key) + + def createStream(self, key: str, options: StreamOptions) -> 'TickStream': + '''Creates a new stream within the database. + + Args: + key (str): A required key later used to identify the stream. + options (StreamOptions): Options for creating the stream. + + Returns: + TickStream: A new instance of TickStream. + ''' + return self.__createStream(key, options) + + def createFileStream(self, key: str, dataFile: str) -> 'TickStream': + '''Creates a new stream mount to the given data file. + + Args: + key (str): A required key later used to identify the stream. + dataFile (str): Path to the data file (on server side). + + Returns: + TickStream: A new instance of TickStream. + ''' + return self.__createFileStream(key) + + def createCursor(self, stream: 'TickStream', options: SelectionOptions) -> 'TickCursor': + '''Opens an initially empty cursor for reading data from multiple streams, + according to the specified options. The messages + are returned from the cursor strictly ordered by time. Within the same + exact timestamp, the order of messages is undefined and may vary from + call to call, i.e. it is non-deterministic. + + The cursor is returned initially empty and must be reset. + The TickCursor class provides + methods for dynamically re-configuring the subscription, or jumping to + a different timestamp. + + Args: + stream (TickStream): Stream from which data will be selected. + options (SelectionOptions): Selection options. + + Returns: + TickCursor: A cursor used to read messages. + ''' + return self.__createCursor(stream, options) + + @contextmanager + def tryCursor(self, stream: 'TickStream', options: SelectionOptions) -> 'TickCursor': + '''contextmanager version of createCursor. Usage: + ``` + with db.newCursor(stream, options) as cursor: + while cursor.next(): + message = cursor.getMessage() + ``` + ''' + cursor = None + try: + cursor = self.__createCursor(stream, options) + yield cursor + finally: + cursor.close() + + def select(self, timestamp: int, streams: 'list[TickStream]', options: SelectionOptions, types: 'list[str]', entities: 'list[str]') -> 'TickCursor': + '''Opens a cursor for reading data from multiple streams, + according to the specified options. The messages + are returned from the cursor strictly ordered by time. Within the same + exact time stamp, the order of messages is undefined and may vary from + call to call, i.e. it is non-deterministic. + + Note that the arguments of this method only determine the initial + configuration of the cursor. The TickCursor clsas provides + methods for dynamically re-configuring the subscription, or jumping to + a different timestamp. + + Args: + timestamp (int): The start timestamp in millis. + streams (list[TickStream]): Streams from which data will be selected. + options (SelectionOptions): Selection options. + types (list[str]): Specified message types to be subscribed. If null, then all types will be subscribed. + entities (list[str]): Specified entities to be subscribed. If null, then all entities will be subscribed. + + Returns: + TickCursor: A cursor used to read messages. + ''' + return self.__select(timestamp, streams, options, types, entities) + + @contextmanager + def trySelect(self, timestamp: int, streams: 'list[TickStream]', options: SelectionOptions, types: 'list[str]', entities: 'list[str]') -> 'TickCursor': + '''Contextmanager version of select. Usage: + ``` + with db.newSelect(timestamp, streams, options, types, entities) as cursor: + while cursor.next(): + message = cursor.getMessage() + ``` + ''' + cursor = None + try: + cursor = self.__select(timestamp, streams, options, types, entities) + yield cursor + finally: + cursor.close() + + def createLoader(self, stream: 'TickStream', options: LoadingOptions) -> 'TickLoader': + '''Creates a channel for loading data. The loader must be closed + when the loading process is finished. + + Args: + stream (TickStream): stream for loading data. + options (SelectionOptions): Loading Options. + + Returns: + TickLoader: created loader. + ''' + return self.__createLoader(stream, options) + + @contextmanager + def tryLoader(self, stream: 'TickStream', options: LoadingOptions) -> 'TickLoader': + '''Contextmanager version of createLoader. Usage: + + with db.newLoader(stream, options) as loader: + loader.send(message) + ''' + loader = None + try: + loader = self.__createLoader(stream, options) + yield loader + finally: + loader.close() + + def executeQuery(self, query: str, options: SelectionOptions = None, timestamp: int = JAVA_LONG_MIN_VALUE, entities: 'list[str]' = None, params: 'list[QueryParameter]' = []) -> 'TickCursor': + '''Execute Query and creates a message source for reading data from it, + according to the specified options. The messages + are returned from the cursor strictly ordered by time. Within the same + exact time stamp, the order of messages is undefined and may vary from + call to call, i.e. it is non-deterministic. + + Args: + query (str): Query text element. + options (SelectionOptions): Selection options. + timestamp (int): The start timestamp in millis. + entities (list[str]): Specified entities to be subscribed. + If null, then all entities will be subscribed. + params (list[QueryParameter]): The parameter values of the query. + + Returns: + TickCursor: An iterable message source to read messages. + ''' + if options == None: + return self.__executeQuery(query) + else: + return self.__executeQueryFull(query, options, timestamp, entities, params); + + @contextmanager + def tryExecuteQuery(self, query: str, options: SelectionOptions = None, timestamp: int = JAVA_LONG_MIN_VALUE, entities: 'list[str]' = None, params: 'list[QueryParameter]' = []) -> 'TickCursor': + '''Contextmanager version of executeQuery. Usage: + ``` + with db.newExecuteQuery('select * from stream') as cursor: + while cursor.next(): + message = cursor.getMessage() + ``` + ''' + cursor = None + try: + if options == None: + cursor = self.__executeQuery(query) + else: + cursor = self.__executeQueryFull(query, options, timestamp, entities, params); + yield cursor + finally: + cursor.close() +%} + + %feature("autodoc", ""); + + %rename(__createFromUrl) createFromUrl; static TickDb * createFromUrl(const char * url); + + %rename(__createFromUrlWithUser) createFromUrl; static TickDb * createFromUrl(const char * url, const char * username, const char * password); + %rename(__isReadOnly) isReadOnly; bool isReadOnly() const; + + %rename(__isOpen) isOpen; bool isOpen() const; + %rename(__open) open; bool open(bool readOnlyMode); + + %rename(__close) close; void close(); + + %rename(__format) format; bool format(); + %rename(__listStreams) listStreams; std::vector listStreams(); + + %rename(__getStream) getStream; DxApi::TickStream * getStream(const std::string &key); - DxApi::TickStream * createStream(const std::string &key, const std::string &name, const std::string &description, int distributionFactor); + + %rename(__createStream) createStream; DxApi::TickStream * createStream(const std::string &key, const DxApi::StreamOptions &options); + + %rename(__createFileStream) createFileStream; DxApi::TickStream * createFileStream(const std::string &key, const std::string &dataFile); + %rename(__createCursor) createCursor; DxApi::TickCursor * createCursor(const DxApi::TickStream * stream, const DxApi::SelectionOptions &options); + + %rename(__select) select; DxApi::TickCursor * select( DxApi::TimestampMs time, const std::vector *streams, const DxApi::SelectionOptions &options, const std::vector *types, const std::vector *entities); + + %rename(__createLoader) createLoader; DxApi::TickLoader * createLoader(const DxApi::TickStream * stream, const DxApi::LoadingOptions &options); + %rename(__executeQueryFull) executeQuery; DxApi::TickCursor* executeQuery( const std::string &qql, const DxApi::SelectionOptions &options, @@ -38,21 +334,6 @@ public: const std::vector *instruments, const std::vector ¶ms); - DxApi::TickCursor* executeQuery( - const std::string &qql, - const DxApi::SelectionOptions &options, - DxApi::TimestampMs time, - const std::vector ¶ms); - - DxApi::TickCursor* executeQuery( - const std::string &qql, - const DxApi::SelectionOptions &options, - const std::vector ¶ms); - - DxApi::TickCursor* executeQuery( - const std::string &qql, - const std::vector ¶ms); - ~TickDb(); protected: @@ -64,11 +345,13 @@ protected: %extend TickDb { + %rename(__executeQuery) executeQuery; DxApi::TickCursor* executeQuery(const std::string &qql) { return $self->executeQuery(qql, std::vector()); } } +%feature("autodoc", ""); -} // namespace DxApi \ No newline at end of file +} // namespace DxApi diff --git a/src/swig/tick_loader.i b/src/swig/tick_loader.i index 77fbf7c..7e946e1 100644 --- a/src/swig/tick_loader.i +++ b/src/swig/tick_loader.i @@ -2,29 +2,134 @@ namespace DxApiImpl { namespace Python { +%feature("autodoc", "Object which consumes messages. + +Create loader from TickDb: + options = dxapi.LoadingOptions() + stream = tickdb.createLoader(stream, options) + +Create loader from TickStream: + options = dxapi.LoadingOptions() + stream = stream.createLoader(options)"); + class TickLoader { public: TickLoader(DxApi::TickLoader *loader); ~TickLoader(); +%pythoncode %{ + + def send(self, message: InstrumentMessage) -> None: + '''This method is invoked to send a message to the object. + + Args: + message (InstrumentMessage): A temporary buffer with the message. + By convention, the message is only valid for the duration of this call. + ''' + return self.__send(message) + + def flush(self) -> None: + '''Flushes all buffered messages by sending them to server. + Note that calling 'send' method not guaranty that all messages will be delivered and stored to server. + ''' + return self.__flush() + + def close(self) -> None: + '''Flushes and closes the loader''' + return self.__close() + + def addListener(self, listener: 'ErrorListener') -> None: + '''Register error listener. All writing data errors will be delivered to the listener. + + Args: + listener (ErrorListener): error listener to register. + ''' + return self.__addListener(listener) + + def removeListener(self, listener: 'ErrorListener') -> None: + '''Unsubscribe registered error listener. + + Args: + listener (ErrorListener): error listener to unsubscribe. + ''' + return self.__removeListener(listener) + + def nErrorListeners(self) -> int: + '''Returns number of registered error listeners''' + return self.__nErrorListeners() + + def registerType(self, type: str) -> int: + '''Register type of sending message to get type id. For performance reasons, + you could specify type id instead of type name, for example: + + ``` + message = dxapi.InstrumentMessage() + message.typeId = loader.registerType("deltix.timebase.api.messages.universal.PackageHeader") + // as alternative, you could write: + // message.typeName = "deltix.timebase.api.messages.universal.PackageHeader" + loader.send(message) + ``` + + Args: + type (str): name of type to register. + + Returns: + int: id of registered type. + ''' + return self.__registerType(type) + + def registerInstrument(self, symbol: str) -> int: + '''Register instrument of sending message to get instrument id. For performance reasons, + you could specify instrument id instead of symbol and instrument type, for example: + + ``` + message = dxapi.InstrumentMessage() + message.instrumentId = loader.registerInstrument('AAPL') + // as alternative, you could write: + // message.symbol = 'AAPL' + loader.send(message) + ``` + + Args: + symbol (str): instrument ticker. + + Returns: + int: id of registered instrument. + ''' + return self.__registerInstrument(symbol) + +%} + + %feature("autodoc", ""); + + %rename(__registerType) registerType; uint32_t registerType(const std::string &type_name); + + %rename(__registerInstrument) registerInstrument; uint32_t registerInstrument(const std::string &instrument); + %rename(__send) send; void send(PyObject *message); + + %rename(__flush) flush; void flush(); + + %rename(__close) close; void close(); + %rename(__addListener) addListener; void addListener(DxApi::TickLoader::ErrorListener * listener); - //void addListener(DxApi::TickLoader::SubscriptionListener *listener); + %rename(__removeListener) removeListener; void removeListener(DxApi::TickLoader::ErrorListener *listener); - //void removeListener(DxApi::TickLoader::SubscriptionListener *listener); + %rename(__nErrorListeners) nErrorListeners; size_t nErrorListeners(); - size_t nSubscriptionListeners(); }; // TickLoader +%feature("autodoc", ""); + } } @@ -33,27 +138,18 @@ namespace TickLoader { class ErrorListener { public: + +%pythoncode %{ + def onError(self, errorClass: str, errorMsg: str) -> None: + return self.__onError() +%} + + %rename(__onError) onError; virtual void onError(const std::string &errorClass, const std::string &errorMsg) = 0; virtual ~ErrorListener(); }; -/* -class SubscriptionListener { -public: - virtual void typesAdded(const std::vector &types) = 0; - virtual void typesRemoved(const std::vector &types) = 0; - virtual void allTypesAdded() = 0; - virtual void allTypesRemoved() = 0; - - virtual void entitiesAdded(const std::vector &entities) = 0; - virtual void entitiesRemoved(const std::vector &entities) = 0; - virtual void allEnititesAdded() = 0; - virtual void allEnititesRemoved() = 0; - -protected: - ~SubscriptionListener(); -}; -*/ +%feature("autodoc", ""); } // TickLoader -} // DxApi \ No newline at end of file +} // DxApi diff --git a/src/swig/tick_stream.i b/src/swig/tick_stream.i index bbad82f..7f102f2 100644 --- a/src/swig/tick_stream.i +++ b/src/swig/tick_stream.i @@ -1,42 +1,389 @@ namespace DxApi { +%feature("autodoc", "The stream is a time series of messages for a number of +financial instruments ('entities'). Messages can be price bars, trade ticks, +bid/offer ticks, or any of the many more built-in and user-defined types. +In the simplest case, a database will have a single stream of data. +Multiple streams can be used to represent data of different frequencies, or completely +different factors. For instance, separate streams can represent +1-minute price bars and ticks for the same set of entities. Or, +you can have price bars and volatility bars in separate streams. + +Get stream: + ``` + stream = tickdb.getStream('stream_key') + ``` + +List stream: + ``` + streams = tickdb.listStreams() + ```"); + class TickStream { public: + +%pythoncode %{ + def key(self) -> str: + '''Returns the key, which uniquely identifies the stream within its database.''' + return self.__key() + + def name(self) -> str: + '''Returns a user-readable short name.''' + return self.__name() + + def distributionFactor(self) -> int: + '''Returns the target number of files to be used for storing data.''' + return self.__distributionFactor() + + def description(self) -> str: + '''Returns a user-readable multi-line description.''' + return self.__description() + + def owner(self) -> str: + '''Returns stream owner.''' + return self.__owner() + + def location(self) -> str: + '''Returns stream location.''' + return self.__location() + + def metadata(self) -> str: + '''Returns stream schema (in xml format).''' + return self.__metadata() + + def scope(self) -> StreamScope: + '''Returns stream schema (in xml format).''' + return self.__scope() + + def highAvailability(self) -> bool: + '''Returns stream memory caching parameter. High availability durable streams are cached on startup.''' + return self.__highAvailability() + + def unique(self) -> bool: + '''Unique streams maintain in-memory cache of resent messages. + This concept assumes that stream messages will have some field(s) marked as primary key. + Primary key may be a simple field (e.g. symbol) or composite (e.g. symbol and portfolio ID). + For each key TimeBase runtime maintains a copy of the last message received for this key (cache). + Each new consumer will receive a snapshot of current cache at the beginning of live data subscription. + ''' + return self.__unique() + + def polymorphic(self) -> bool: + '''Returns whether the stream is configured as polymorphic.''' + return self.__polymorphic() + + def periodicity(self) -> str: + '''Returns Stream periodicity, if known.''' + return self.__periodicity() + + def options(self) -> StreamOptions: + '''Returns stream options object.''' + return self.__options() + + def describe(self) -> str: + '''Returns stream DDL description.''' + return self.__describe() + + def setSchema(self, options: StreamOptions) -> bool: + '''Changes stream schema. + + Args: + options (StreamOptions): Stream options, that contains new schema xml. + + Returns + bool: True, if schema was changed successfully. + ''' + return self.__setSchema(options) + + def listEntities(self) -> 'list[str]': + '''Return an inclusive range of times for which the specified entities + have data in the database. + + Returns: + list[str]: selected entities. + ''' + return self.__listEntities() + + def truncate(self, timestamp: int, entities: 'list[str]' = None) -> bool: + '''Truncates stream data for the given entities from given time + + Args: + timestamp (int): Timestamp in millis. If time less than stream start time, then all stream data will be deleted. + entities (list[str]): A list of entities. If None, all stream entities will be used. + + Returns: + bool: true, if stream was truncated successfully. + ''' + if entities == None: + return self.__truncate(timestamp) + else: + return self.__truncateEntities(timestamp, entities) + + def clear(self, entities: 'list[str]' = None) -> bool: + '''Clear stream data for the given entities. + + Args: + entities (list[str]): A list of entities. If None, all stream entities will be used. + ''' + if entities == None: + return self.__clear() + else: + return self.__clearEntities(entities) + + def purge(self, timestamp: int) -> bool: + '''Deletes stream data that is older than a specified time + + Args: + timestamp (int):Purge time in milliseconds. + + Returns: + bool: true, if stream was purged successfully. + ''' + return self.__purge(timestamp) + + def deleteStream(self) -> bool: + '''Deletes this stream + + Returns: + bool: true, if stream was deleted successfully. + ''' + return self.__deleteStream() + + def abortBackgroundProcess(self) -> bool: + '''Aborts active background process if any exists''' + return self.__abortBackgroundProcess() + + def select(self, timestamp: int, options: SelectionOptions, types: 'list[str]', entities: 'list[str]') -> 'TickCursor': + '''Opens a cursor for reading data from this stream, according to the + specified options. The messages + are returned from the cursor strictly ordered by time. Within the same + exact time stamp, the order of messages is undefined and may vary from + call to call, i.e. it is non-deterministic. + + Note that the arguments of this method only determine the initial + configuration of the cursor. The TickCursor interface provides + methods for dynamically re-configuring the subscription, or jumping to + a different timestamp. + + Args: + timestamp (int): The start timestamp in millis. + options (SelectionOptions): Selection options. + types (list[str]): Specified message types to be subscribed. If null, then all types will be subscribed. + entities (list[str]): Specified entities to be subscribed. If null, then all entities will be subscribed. + + Returns: + TickCursor: A cursor used to read messages. + ''' + return self.__select(timestamp, options, types, entities) + + @contextmanager + def trySelect(self, timestamp: int, options: SelectionOptions, types: 'list[str]', entities: 'list[str]') -> 'TickCursor': + '''Contextmanager version of select. Usage: + ``` + with stream.newSelect(timestamp, options, types, entities) as cursor: + while cursor.next(): + message = cursor.getMessage() + ``` + ''' + cursor = None + try: + cursor = self.__select(timestamp, options, types, entities) + yield cursor + finally: + cursor.close() + + def createCursor(self, options: SelectionOptions) -> 'TickCursor': + '''Creates a cursor for reading data from this stream, according to the + specified options, but initially with a fully restricted filter. + The user must call TickCursor.reset at least once, in order to + begin retrieving data. This method is equivalent to (but is + slightly more optimal than) calling createCursor(options) + + Args: + options (SelectionOptions): Selection Options. + + Returns: + A cursor used to read messages. Never null. + ''' + return self.__createCursor(options) + + @contextmanager + def tryCursor(self, options: SelectionOptions) -> 'TickCursor': + '''contextmanager version of createCursor. Usage: + ``` + with stream.newCursor(options) as cursor: + while cursor.next(): + message = cursor.getMessage() + ``` + ''' + cursor = None + try: + cursor = self.__createCursor(options) + yield cursor + finally: + cursor.close() + + def createLoader(self, options: LoadingOptions) -> 'TickLoader': + '''Creates a channel for loading data. The loader must be closed + when the loading process is finished. + + Args: + options (SelectionOptions): Loading Options. + + Returns: + TickLoader: created loader. + ''' + return self.__createLoader(options) + + @contextmanager + def tryLoader(self, options: LoadingOptions) -> 'TickLoader': + '''Contextmanager version of createLoader. Usage: + ``` + with stream.newLoader(options) as loader: + loader.send(message) + ``` + ''' + loader = None + try: + loader = self.__createLoader(options) + yield loader + finally: + loader.close() + + def listSpaces(self) -> 'list[str]': + '''Returns all created "spaces" for the stream. + Default space returns as "" (empty string). + If backing stream does not support spaces None will be returned. + ''' + return self.__listSpaces() + + def renameSpace(self, newName: str, oldName: str) -> None: + '''Rename existing space. + + Args: + nameName (str): space to rename. + oldName (str): new space name. + ''' + return self.__renameSpace(newName, oldName) + + def deleteSpaces(self, spaces: 'list[str]') -> None: + '''Removed given 'spaces' permanently. + + Args: + spaces (list[str]): list of spaces names to delete. + ''' + return self.__deleteSpaces(spaces) + + def getTimeRange(self, entities: 'list[str]' = None) -> 'list[int]': + '''Return an inclusive range of times for which the specified entities + have data in the database. + + Args: + entities (list[str]): A list of entities. If empty, return for all. + ''' + if entities == None: + return self.__getTimeRange() + else: + return self.__getTimeRangeEntities(entities) + + def getSpaceTimeRange(self, space: str) -> 'list[int]': + '''An array consisting of two long timestamps (from and to) or None if no data was found. + + Args: + space (str): space name. + ''' + return self.__getSpaceTimeRange(space) +%} + + %feature("autodoc", ""); + + %rename(__key) key; const std::string& key() const; + + %rename(__distributionFactor) distributionFactor; int32_t distributionFactor() const; + + %rename(__name) name; const DxApi::Nullable& name() const; + + %rename(__description) description; const DxApi::Nullable& description() const; + + %rename(__owner) owner; const DxApi::Nullable& owner() const; + + %rename(__location) location; const DxApi::Nullable& location() const; + + %rename(__metadata) metadata; const DxApi::Nullable& metadata() const; + + %rename(__scope) scope; DxApi::StreamScope scope() const; + + %rename(__highAvailability) highAvailability; bool highAvailability() const; + + %rename(__unique) unique; bool unique() const; + + %rename(__polymorphic) polymorphic; bool polymorphic() const; + + %rename(__periodicity) periodicity; const std::string& periodicity() const; + + %rename(__options) options; const DxApi::StreamOptions& options() const; + %rename(__describe) describe; + const std::string describe() const; + + %rename(__setSchema) setSchema; bool setSchema(const DxApi::StreamOptions & options); - + + %rename(__listEntities) listEntities; std::vector listEntities(); + %rename(__truncate) truncate; bool truncate(TimestampMs millisecondTime) const; + + %rename(__truncateEntities) truncate; bool truncate(TimestampMs millisecondTime, const std::vector * const entities) const; + + %rename(__clear) clear; bool clear() const; + + %rename(__clearEntities) clear; bool clear(const std::vector * const entities) const; + + %rename(__purge) purge; bool purge(TimestampMs millisecondTime) const; + %rename(__deleteStream) deleteStream; bool deleteStream(); + + %rename(__abortBackgroundProcess) abortBackgroundProcess; bool abortBackgroundProcess() const; + %rename(__select) select; DxApi::TickCursor * select(DxApi::TimestampMs millisecondTime, const DxApi::SelectionOptions &options, const std::vector * types, const std::vector * entities) const; + + %rename(__createCursor) createCursor; DxApi::TickCursor * createCursor(const DxApi::SelectionOptions &options) const; + + %rename(__createLoader) createLoader; DxApi::TickLoader * createLoader(const DxApi::LoadingOptions &options) const; + %rename(__listSpaces) listSpaces; std::vector listSpaces() const; + + %rename(__renameSpace) renameSpace; void renameSpace(const std::string &newName, const std::string &oldName) const; + + %rename(__deleteSpaces) deleteSpaces; void deleteSpaces(const std::vector& spaces) const; static void operator delete(void* ptr, size_t sz); //not supported by swig, will be skipped @@ -50,7 +397,8 @@ protected: }; // class TickStream %extend TickStream { - + + %rename(__getTimeRange) getTimeRange; std::vector getTimeRange() { std::vector result(2); @@ -70,6 +418,7 @@ protected: } } + %rename(__getTimeRangeEntities) getTimeRange; std::vector getTimeRange(const std::vector * const entities) { std::vector result(2); @@ -89,6 +438,7 @@ protected: } } + %rename(__getSpaceTimeRange) getTimeRange; std::vector getTimeRange(const std::string &space) { std::vector result(2); @@ -110,5 +460,6 @@ protected: } +%feature("autodoc", ""); -} // namespace DxApi \ No newline at end of file +} // namespace DxApi diff --git a/src/swig/tick_utils.i b/src/swig/tick_utils.i index f7f791e..6ece469 100644 --- a/src/swig/tick_utils.i +++ b/src/swig/tick_utils.i @@ -1,37 +1,11 @@ %pythoncode %{ -from contextlib import contextmanager - -@contextmanager -def open_TickDb(url, readonly): - db = TickDb.createFromUrl(url) - try: - db.open(readonly) - yield db - finally: - if db.isOpen(): - db.close() - - -@contextmanager -def open_TickCursor(stream, ts_from=0, options=SelectionOptions(), types=None, entities=None): - cursor = None - try: - cursor = stream.select(ts_from, options, types, entities) - yield cursor - finally: - cursor.close() - - - from collections import defaultdict -__JAVA_LONG_MAX_VALUE = 922337203685477580 - -def stream_to_dict(db, stream, fields=None, ts_from=0, ts_to=__JAVA_LONG_MAX_VALUE): - if ts_to > __JAVA_LONG_MAX_VALUE: - ts_to = __JAVA_LONG_MAX_VALUE +def stream_to_dict(db, stream, fields=None, ts_from=0, ts_to=JAVA_LONG_MAX_VALUE): + if ts_to > JAVA_LONG_MAX_VALUE: + ts_to = JAVA_LONG_MAX_VALUE if not db.isOpen(): raise Exception('Database is not opened.') options = SelectionOptions() @@ -174,4 +148,4 @@ def __getSchema(self): TickStream.getSchema = __getSchema -%} \ No newline at end of file +%} diff --git a/tests/TestAll.py b/tests/TestAll.py index 984b98e..93488a3 100644 --- a/tests/TestAll.py +++ b/tests/TestAll.py @@ -15,6 +15,7 @@ 'TestMemoryManagement', 'TestNextIfAvailable', 'TestMultithreaded' + #'TestEntities' ] suite = unittest.TestSuite() diff --git a/tests/TestCursor.py b/tests/TestCursor.py index 51e4226..6a3760c 100644 --- a/tests/TestCursor.py +++ b/tests/TestCursor.py @@ -253,5 +253,29 @@ def test_GetCurrentStreamKey(self): elif "BarMessage" in typeName: self.assertEqual(stream, "bars1min") + def test_ContextManager(self): + stream = self.db.getStream(self.streamKeys[0]) + + with self.db.tryCursor(stream, dxapi.SelectionOptions()) as cursor: + self.assertTrue(cursor.next()) + self.assertTrue(cursor.getMessage() != None) + + with stream.tryCursor(dxapi.SelectionOptions()) as cursor: + self.assertTrue(cursor.next()) + self.assertTrue(cursor.getMessage() != None) + + with self.db.trySelect(0, [stream], dxapi.SelectionOptions(), None, None) as cursor: + self.assertTrue(cursor.next()) + self.assertTrue(cursor.getMessage() != None) + + with stream.trySelect(0, dxapi.SelectionOptions(), None, None) as cursor: + self.assertTrue(cursor.next()) + self.assertTrue(cursor.getMessage() != None) + + with self.db.tryExecuteQuery("select * from " + str(stream.key())) as cursor: + self.assertTrue(cursor.next()) + self.assertTrue(cursor.getMessage() != None) + + if __name__ == '__main__': - unittest.main() \ No newline at end of file + unittest.main() diff --git a/tests/TestLoader.py b/tests/TestLoader.py index cfca344..10e7a6d 100644 --- a/tests/TestLoader.py +++ b/tests/TestLoader.py @@ -26,6 +26,21 @@ def test_LoadFixed(self): finally: self.deleteStream(key) + def test_ContextManager(self): + key = self.streamKeys[0] + try: + stream = self.createStream(key, False) + self.assertIsNotNone(stream) + self.assertEqual(self.streamCount(key), 0) # check stream is empty + + count = 12345 + loadCount = testutils.loadWithBars(stream, count) + self.assertEqual(count, loadCount) + readCount = self.streamCount(key) + self.assertEqual(readCount, loadCount) + finally: + self.deleteStream(key) + def test_LoadPolymorphic(self): key = self.streamKeys[1] try: @@ -222,4 +237,4 @@ def streamCountQQL(self, key): cursor.close() if __name__ == '__main__': - unittest.main() \ No newline at end of file + unittest.main() diff --git a/tests/TestQQL.py b/tests/TestQQL.py index eb834a6..629df3c 100644 --- a/tests/TestQQL.py +++ b/tests/TestQQL.py @@ -15,7 +15,7 @@ def test_ExecuteQueryWithOptions(self): options = dxapi.SelectionOptions() options._from = 5000 options.to = 6000 - cursor = self.db.executeQuery("select * from bars1min", options, []) + cursor = self.db.executeQuery("select * from bars1min", options) self.assertTrue(cursor.next()) message = cursor.getMessage() @@ -41,4 +41,4 @@ def test_ExecuteQueryException(self): if __name__ == '__main__': - unittest.main() \ No newline at end of file + unittest.main() diff --git a/tests/TestStream.py b/tests/TestStream.py index 0154d6a..291cd3d 100644 --- a/tests/TestStream.py +++ b/tests/TestStream.py @@ -25,6 +25,12 @@ def test_StreamOptions(self): self.assertEqual(options.polymorphic, True) self.assertEqual(options.periodicity, 'IRREGULAR') + def test_StreamDescribe(self): + stream = self.db.getStream(self.streamKeys[1]) + description = stream.describe() + print(description) + self.assertIsNotNone(description) + def test_ListEntities(self): stream = self.db.getStream(self.streamKeys[2]) self.assertIsNotNone(stream) @@ -111,7 +117,7 @@ def test_Spaces(self): time.sleep(2) # Test Time Range of space - actualRange = stream.getTimeRange('SpaceY') + actualRange = stream.getSpaceTimeRange('SpaceY') self.assertEqual(actualRange[0], 0) self.assertEqual(actualRange[1], 12344000) diff --git a/tests/TestTickDB.py b/tests/TestTickDB.py index 9b96e0c..57022e0 100644 --- a/tests/TestTickDB.py +++ b/tests/TestTickDB.py @@ -19,6 +19,10 @@ def test_isOpen(self): def test_isReadOnly(self): self.assertFalse(self.db.isReadOnly()) + def test_open(self): + with dxapi.TickDb.openFromUrl(self.dxtickURL(), True) as tickdb: + self.assertTrue(tickdb.isOpen()) + #def test_createStream(self): # key = self.streamKeys[1] # try: @@ -114,4 +118,4 @@ def deleteStreams(self): self.deleteStream(key) if __name__ == '__main__': - unittest.main() \ No newline at end of file + unittest.main() diff --git a/tests/testutils.py b/tests/testutils.py index 160f791..c50fb11 100644 --- a/tests/testutils.py +++ b/tests/testutils.py @@ -44,6 +44,22 @@ def loadBars(stream, count, startTime = 0, timeInterval = 1000000000, symbols = if loader != None: loader.close() +def loadWithBars(stream, count, startTime = 0, timeInterval = 1000000000, symbols = ['MSFT', 'ORCL'], space = None): + options = dxapi.LoadingOptions() + if space != None: + options.space = space + with stream.tryLoader(options) as loader: + loadCount = 0 + barGenerator = generators.BarGenerator(startTime, timeInterval, count, symbols) + while barGenerator.next(): + message = barGenerator.getMessage() + loader.send(message) + loadCount = loadCount + 1 + printLoadingInfo(loadCount, message) + + print("Total loaded " + str(loadCount) + " messages") + return loadCount + def loadL2(stream, count, actionsCount = 5, startTime = 0, timeInterval = 1000000000, symbols = ['MSFT', 'ORCL']): loader = stream.createLoader(dxapi.LoadingOptions()) try: