Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

IGNITE-14911 Unify timeouts, add support for datetime.timedelta for expiry_policy #44

Closed
wants to merge 1 commit into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
6 changes: 3 additions & 3 deletions docs/async_examples.rst
Expand Up @@ -63,20 +63,20 @@ in cache settings dictionary on creation.
.. literalinclude:: ../examples/expiry_policy.py
:language: python
:dedent: 12
:lines: 72-75
:lines: 73-76

.. literalinclude:: ../examples/expiry_policy.py
:language: python
:dedent: 12
:lines: 81-89
:lines: 82-90

Secondly, expiry policy can be set for all cache operations, which are done under decorator. To create it use
:py:meth:`~pyignite.cache.BaseCache.with_expire_policy`

.. literalinclude:: ../examples/expiry_policy.py
:language: python
:dedent: 12
:lines: 96-105
:lines: 97-106

Transactions
------------
Expand Down
6 changes: 3 additions & 3 deletions docs/examples.rst
Expand Up @@ -97,20 +97,20 @@ in cache settings dictionary on creation.
.. literalinclude:: ../examples/expiry_policy.py
:language: python
:dedent: 12
:lines: 31-34
:lines: 32-35

.. literalinclude:: ../examples/expiry_policy.py
:language: python
:dedent: 12
:lines: 40-46
:lines: 41-47

Secondly, expiry policy can be set for all cache operations, which are done under decorator. To create it use
:py:meth:`~pyignite.cache.BaseCache.with_expire_policy`

.. literalinclude:: ../examples/expiry_policy.py
:language: python
:dedent: 12
:lines: 53-60
:lines: 54-61

Scan
====
Expand Down
9 changes: 5 additions & 4 deletions examples/expiry_policy.py
Expand Up @@ -14,6 +14,7 @@
# limitations under the License.
import asyncio
import time
from datetime import timedelta

from pyignite import Client, AioClient
from pyignite.datatypes import ExpiryPolicy
Expand All @@ -30,7 +31,7 @@ def main():
try:
ttl_cache = client.create_cache({
PROP_NAME: 'test',
PROP_EXPIRY_POLICY: ExpiryPolicy(create=1.0)
PROP_EXPIRY_POLICY: ExpiryPolicy(create=timedelta(seconds=1.0))
})
except NotSupportedByClusterError:
print("'ExpiryPolicy' API is not supported by cluster. Finishing...")
Expand All @@ -50,7 +51,7 @@ def main():
print("Create simple Cache and set TTL through `with_expire_policy`")
simple_cache = client.create_cache('test')
try:
ttl_cache = simple_cache.with_expire_policy(access=1.0)
ttl_cache = simple_cache.with_expire_policy(access=timedelta(seconds=1.0))
ttl_cache.put(1, 1)
time.sleep(0.5)
print(f"key = {1}, value = {ttl_cache.get(1)}")
Expand All @@ -71,7 +72,7 @@ async def async_main():
try:
ttl_cache = await client.create_cache({
PROP_NAME: 'test',
PROP_EXPIRY_POLICY: ExpiryPolicy(create=1.0)
PROP_EXPIRY_POLICY: ExpiryPolicy(create=timedelta(seconds=1.0))
})
except NotSupportedByClusterError:
print("'ExpiryPolicy' API is not supported by cluster. Finishing...")
Expand All @@ -93,7 +94,7 @@ async def async_main():
print("Create simple Cache and set TTL through `with_expire_policy`")
simple_cache = await client.create_cache('test')
try:
ttl_cache = simple_cache.with_expire_policy(access=1.0)
ttl_cache = simple_cache.with_expire_policy(access=timedelta(seconds=1.0))
await ttl_cache.put(1, 1)
await asyncio.sleep(0.5)
value = await ttl_cache.get(1)
Expand Down
4 changes: 2 additions & 2 deletions examples/transactions.py
Expand Up @@ -62,7 +62,7 @@ async def async_example():

# rollback transaction on timeout.
try:
async with client.tx_start(timeout=1.0, label='long-tx') as tx:
async with client.tx_start(timeout=1000, label='long-tx') as tx:
await cache.put(key, 'fail')
await asyncio.sleep(2.0)
await tx.commit()
Expand Down Expand Up @@ -114,7 +114,7 @@ def sync_example():

# rollback transaction on timeout.
try:
with client.tx_start(timeout=1.0, label='long-tx') as tx:
with client.tx_start(timeout=1000, label='long-tx') as tx:
cache.put(key, 'fail')
time.sleep(2.0)
tx.commit()
Expand Down
8 changes: 4 additions & 4 deletions pyignite/aio_client.py
Expand Up @@ -489,15 +489,15 @@ def get_cluster(self) -> 'AioCluster':

def tx_start(self, concurrency: TransactionConcurrency = TransactionConcurrency.PESSIMISTIC,
isolation: TransactionIsolation = TransactionIsolation.REPEATABLE_READ,
timeout: Union[int, float] = 0, label: Optional[str] = None) -> 'AioTransaction':
timeout: int = 0, label: Optional[str] = None) -> 'AioTransaction':
"""
Start async thin client transaction. **Supported only python 3.7+**

:param concurrency: (optional) transaction concurrency, see
:py:class:`~pyignite.datatypes.transactions.TransactionConcurrency`
:py:class:`~pyignite.datatypes.transactions.TransactionConcurrency`,
:param isolation: (optional) transaction isolation level, see
:py:class:`~pyignite.datatypes.transactions.TransactionIsolation`
:param timeout: (optional) transaction timeout in seconds if float, in millis if int
:py:class:`~pyignite.datatypes.transactions.TransactionIsolation`,
:param timeout: (optional) transaction timeout in milliseconds,
:param label: (optional) transaction label.
:return: :py:class:`~pyignite.transaction.AioTransaction` instance.
"""
Expand Down
16 changes: 8 additions & 8 deletions pyignite/cache.py
Expand Up @@ -12,7 +12,7 @@
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

import datetime
from typing import Any, Iterable, Optional, Tuple, Union

from .api.tx_api import get_tx_connection
Expand Down Expand Up @@ -136,16 +136,16 @@ def cache_id(self) -> int:

def with_expire_policy(
self, expiry_policy: Optional[ExpiryPolicy] = None,
create: Union[int, float] = ExpiryPolicy.UNCHANGED,
update: Union[int, float] = ExpiryPolicy.UNCHANGED,
access: Union[int, float] = ExpiryPolicy.UNCHANGED
create: Union[int, datetime.timedelta] = ExpiryPolicy.UNCHANGED,
update: Union[int, datetime.timedelta] = ExpiryPolicy.UNCHANGED,
access: Union[int, datetime.timedelta] = ExpiryPolicy.UNCHANGED
):
"""
:param expiry_policy: optional :class:`~pyignite.datatypes.expiry_policy.ExpiryPolicy`
object. If it is set, other params will be ignored.
:param create: create TTL in seconds (float) or milliseconds (int),
:param update: Create TTL in seconds (float) or milliseconds (int),
:param access: Create TTL in seconds (float) or milliseconds (int).
object. If it is set, other params will be ignored,
:param create: TTL for create in milliseconds or :py:class:`~time.timedelta`,
:param update: TTL for update in milliseconds or :py:class:`~time.timedelta`,
:param access: TTL for access in milliseconds or :py:class:`~time.timedelta`,
:return: cache decorator with expiry policy set.
"""
if not self.client.protocol_context.is_expiry_policy_supported():
Expand Down
8 changes: 4 additions & 4 deletions pyignite/client.py
Expand Up @@ -744,15 +744,15 @@ def get_cluster(self) -> 'Cluster':

def tx_start(self, concurrency: TransactionConcurrency = TransactionConcurrency.PESSIMISTIC,
isolation: TransactionIsolation = TransactionIsolation.REPEATABLE_READ,
timeout: Union[int, float] = 0, label: Optional[str] = None) -> 'Transaction':
timeout: int = 0, label: Optional[str] = None) -> 'Transaction':
"""
Start thin client transaction.

:param concurrency: (optional) transaction concurrency, see
:py:class:`~pyignite.datatypes.transactions.TransactionConcurrency`
:py:class:`~pyignite.datatypes.transactions.TransactionConcurrency`,
:param isolation: (optional) transaction isolation level, see
:py:class:`~pyignite.datatypes.transactions.TransactionIsolation`
:param timeout: (optional) transaction timeout in seconds if float, in millis if int
:py:class:`~pyignite.datatypes.transactions.TransactionIsolation`,
:param timeout: (optional) transaction timeout in milliseconds,
:param label: (optional) transaction label.
:return: :py:class:`~pyignite.transaction.Transaction` instance.
"""
Expand Down
22 changes: 18 additions & 4 deletions pyignite/datatypes/cache_properties.py
Expand Up @@ -14,6 +14,8 @@
# limitations under the License.

import ctypes
import math
from typing import Union

from . import ExpiryPolicy
from .prop_codes import *
Expand Down Expand Up @@ -137,6 +139,20 @@ async def from_python_async(cls, stream, value):
return cls.from_python(stream, value)


class TimeoutProp(PropBase):
prop_data_class = Long

@classmethod
def from_python(cls, stream, value: int):
if not isinstance(value, int) or value < 0:
raise ValueError(f'Timeout value should be a positive integer, {value} passed instead')
return super().from_python(stream, value)

@classmethod
async def from_python_async(cls, stream, value):
return cls.from_python(stream, value)


class PropName(PropBase):
prop_code = PROP_NAME
prop_data_class = String
Expand Down Expand Up @@ -227,9 +243,8 @@ class PropRebalanceDelay(PropBase):
prop_data_class = Long


class PropRebalanceTimeout(PropBase):
class PropRebalanceTimeout(TimeoutProp):
prop_code = PROP_REBALANCE_TIMEOUT
prop_data_class = Long


class PropRebalanceBatchSize(PropBase):
Expand Down Expand Up @@ -262,9 +277,8 @@ class PropCacheKeyConfiguration(PropBase):
prop_data_class = CacheKeyConfiguration


class PropDefaultLockTimeout(PropBase):
class PropDefaultLockTimeout(TimeoutProp):
prop_code = PROP_DEFAULT_LOCK_TIMEOUT
prop_data_class = Long


class PropMaxConcurrentAsyncOperation(PropBase):
Expand Down
27 changes: 16 additions & 11 deletions pyignite/datatypes/expiry_policy.py
Expand Up @@ -13,6 +13,8 @@
# See the License for the specific language governing permissions and
# limitations under the License.
import ctypes
import math
from datetime import timedelta
from io import SEEK_CUR
from typing import Union

Expand All @@ -22,13 +24,16 @@


def _positive(_, attrib, value):
if isinstance(value, timedelta):
value = value.total_seconds() * 1000

if value < 0 and value not in [ExpiryPolicy.UNCHANGED, ExpiryPolicy.ETERNAL]:
raise ValueError(f"'{attrib.name}' value must not be negative")


def _write_duration(stream, value):
if isinstance(value, float):
value = int(value * 1000)
if isinstance(value, timedelta):
value = math.floor(value.total_seconds() * 1000)

stream.write(value.to_bytes(8, byteorder=PROTOCOL_BYTE_ORDER, signed=True))

Expand All @@ -44,17 +49,17 @@ class ExpiryPolicy:
#: Set TTL eternal.
ETERNAL = -1

#: Set TTL for create in seconds(float) or millis(int)
create = attr.ib(kw_only=True, default=UNCHANGED,
validator=[attr.validators.instance_of((int, float)), _positive])
#: Set TTL for create in milliseconds or :py:class:`~time.timedelta`
create = attr.ib(kw_only=True, default=UNCHANGED, type=Union[int, timedelta],
validator=[attr.validators.instance_of((int, timedelta)), _positive])

#: Set TTL for update in seconds(float) or millis(int)
update = attr.ib(kw_only=True, default=UNCHANGED, type=Union[int, float],
validator=[attr.validators.instance_of((int, float)), _positive])
#: Set TTL for update in milliseconds or :py:class:`~time.timedelta`
update = attr.ib(kw_only=True, default=UNCHANGED, type=Union[int, timedelta],
validator=[attr.validators.instance_of((int, timedelta)), _positive])

#: Set TTL for access in seconds(float) or millis(int)
access = attr.ib(kw_only=True, default=UNCHANGED, type=Union[int, float],
validator=[attr.validators.instance_of((int, float)), _positive])
#: Set TTL for access in milliseconds or :py:class:`~time.timedelta`
access = attr.ib(kw_only=True, default=UNCHANGED, type=Union[int, timedelta],
validator=[attr.validators.instance_of((int, timedelta)), _positive])

class _CType(ctypes.LittleEndianStructure):
_pack_ = 1
Expand Down
46 changes: 32 additions & 14 deletions pyignite/transaction.py
Expand Up @@ -13,30 +13,50 @@
# See the License for the specific language governing permissions and
# limitations under the License.

import math
from typing import Union
from enum import IntEnum
from typing import Union, Type

from pyignite.api.tx_api import tx_end, tx_start, tx_end_async, tx_start_async
from pyignite.datatypes import TransactionIsolation, TransactionConcurrency
from pyignite.exceptions import CacheError
from pyignite.utils import status_to_exception


def _convert_to_millis(timeout: Union[int, float]) -> int:
if isinstance(timeout, float):
return math.floor(timeout * 1000)
return timeout
def _validate_int_enum_param(value: Union[int, IntEnum], cls: Type[IntEnum]):
if value not in cls:
raise ValueError(f'{value} not in {cls}')
return value


class Transaction:
def _validate_timeout(value):
if not isinstance(value, int) or value < 0:
raise ValueError(f'Timeout value should be a positive integer, {value} passed instead')
return value


def _validate_label(value):
if value and not isinstance(value, str):
raise ValueError(f'Label should be str, {type(value)} passed instead')
return value


class _BaseTransaction:
def __init__(self, client, concurrency=TransactionConcurrency.PESSIMISTIC,
isolation=TransactionIsolation.REPEATABLE_READ, timeout=0, label=None):
self.client = client
self.concurrency = _validate_int_enum_param(concurrency, TransactionConcurrency)
self.isolation = _validate_int_enum_param(isolation, TransactionIsolation)
self.timeout = _validate_timeout(timeout)
self.label, self.closed = _validate_label(label), False


class Transaction(_BaseTransaction):
"""
Thin client transaction.
"""
def __init__(self, client, concurrency=TransactionConcurrency.PESSIMISTIC,
isolation=TransactionIsolation.REPEATABLE_READ, timeout=0, label=None):
self.client, self.concurrency = client, concurrency
self.isolation, self.timeout = isolation, _convert_to_millis(timeout)
self.label, self.closed = label, False
super().__init__(client, concurrency, isolation, timeout, label)
self.tx_id = self.__start_tx()

def commit(self) -> None:
Expand Down Expand Up @@ -77,15 +97,13 @@ def __end_tx(self, committed):
return tx_end(self.tx_id, committed)


class AioTransaction:
class AioTransaction(_BaseTransaction):
"""
Async thin client transaction.
"""
def __init__(self, client, concurrency=TransactionConcurrency.PESSIMISTIC,
isolation=TransactionIsolation.REPEATABLE_READ, timeout=0, label=None):
self.client, self.concurrency = client, concurrency
self.isolation, self.timeout = isolation, _convert_to_millis(timeout)
self.label, self.closed = label, False
super().__init__(client, concurrency, isolation, timeout, label)

def __await__(self):
return (yield from self.__aenter__().__await__())
Expand Down