Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
34 changes: 32 additions & 2 deletions cassandra/cluster.py
Original file line number Diff line number Diff line change
Expand Up @@ -70,6 +70,7 @@
from cassandra.query import (SimpleStatement, PreparedStatement, BoundStatement,
BatchStatement, bind_params, QueryTrace,
named_tuple_factory, dict_factory, tuple_factory, FETCH_SIZE_UNSET)
from cassandra.timestamps import MonotonicTimestampGenerator


def _is_eventlet_monkey_patched():
Expand Down Expand Up @@ -771,7 +772,8 @@ def __init__(self,
prepare_on_all_hosts=True,
reprepare_on_up=True,
execution_profiles=None,
allow_beta_protocol_version=False):
allow_beta_protocol_version=False,
timestamp_generator=None):
"""
``executor_threads`` defines the number of threads in a pool for handling asynchronous tasks such as
extablishing connection pools or refreshing metadata.
Expand Down Expand Up @@ -830,6 +832,13 @@ def __init__(self,
if connection_class is not None:
self.connection_class = connection_class

if timestamp_generator is not None:
if not callable(timestamp_generator):
raise ValueError("timestamp_generator must be callable")
self.timestamp_generator = timestamp_generator
else:
self.timestamp_generator = MonotonicTimestampGenerator()

self.profile_manager = ProfileManager()
self.profile_manager.profiles[EXEC_PROFILE_DEFAULT] = ExecutionProfile(self.load_balancing_policy,
self.default_retry_policy,
Expand Down Expand Up @@ -1893,6 +1902,27 @@ def default_serial_consistency_level(self, cl):
.. versionadded:: 2.1.0
"""

timestamp_generator = None
"""
When :attr:`use_client_timestamp` is set, sessions call this object and use
the result as the timestamp. (Note that timestamps specified within a CQL
query will override this timestamp.) By default, a new
:class:`~.MonotonicTimestampGenerator` is created for
each :class:`Cluster` instance.

Applications can set this value for custom timestamp behavior. For
example, an application could share a timestamp generator across
:class:`Cluster` objects to guarantee that the application will use unique,
increasing timestamps across clusters, or set it to to ``lambda:
int(time.time() * 1e6)`` if losing records over clock inconsistencies is
acceptable for the application. Custom :attr:`timestamp_generator` s should
be callable, and calling them should return an integer representing seconds
since some point in time, typically UNIX epoch.

.. versionadded:: 3.8.0
"""


encoder = None
"""
A :class:`~cassandra.encoder.Encoder` instance that will be used when
Expand Down Expand Up @@ -2085,7 +2115,7 @@ def _create_response_future(self, query, parameters, trace, custom_payload, time

start_time = time.time()
if self._protocol_version >= 3 and self.use_client_timestamp:
timestamp = int(start_time * 1e6)
timestamp = self.cluster.timestamp_generator()
else:
timestamp = None

Expand Down
107 changes: 107 additions & 0 deletions cassandra/timestamps.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,107 @@
# Copyright 2013-2016 DataStax, Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

"""
This module contains utilities for generating timestamps for client-side
timestamp specification.
"""

import logging
import time
from threading import Lock

log = logging.getLogger(__name__)

class MonotonicTimestampGenerator(object):
"""
An object that, when called, returns ``int(time.time() * 1e6)`` when
possible, but, if the value returned by ``time.time`` doesn't increase,
drifts into the future and logs warnings.
Exposed configuration attributes can be configured with arguments to
``__init__`` or by changing attributes on an initialized object.

.. versionadded:: 3.8.0
"""

warn_on_drift = True
"""
If true, log warnings when timestamps drift into the future as allowed by
:attr:`warning_threshold` and :attr:`warning_interval`.
"""

warning_threshold = 0
"""
This object will only issue warnings when the returned timestamp drifts
more than ``warning_threshold`` seconds into the future.
"""

warning_interval = 0
"""
This object will only issue warnings every ``warning_interval`` seconds.
"""

def __init__(self, warn_on_drift=True, warning_threshold=0, warning_interval=0):
self.lock = Lock()
with self.lock:
self.last = 0
self._last_warn = 0
self.warn_on_drift = warn_on_drift
self.warning_threshold = warning_threshold
self.warning_interval = warning_interval

def _next_timestamp(self, now, last):
"""
Returns the timestamp that should be used if ``now`` is the current
time and ``last`` is the last timestamp returned by this object.
Intended for internal and testing use only; to generate timestamps,
call an instantiated ``MonotonicTimestampGenerator`` object.

:param int now: an integer to be used as the current time, typically
representing the current time in seconds since the UNIX epoch
:param int last: an integer representing the last timestamp returned by
this object
"""
if now > last:
self.last = now
return now
else:
self._maybe_warn(now=now)
self.last = last + 1
return self.last

def __call__(self):
"""
Makes ``MonotonicTimestampGenerator`` objects callable; defers
internally to _next_timestamp.
"""
with self.lock:
return self._next_timestamp(now=int(time.time() * 1e6),
last=self.last)

def _maybe_warn(self, now):
# should be called from inside the self.lock.
diff = self.last - now
since_last_warn = now - self._last_warn

warn = (self.warn_on_drift and
(diff > self.warning_threshold * 1e6) and
(since_last_warn >= self.warning_interval * 1e6))
if warn:
log.warn(
"Clock skew detected: current tick ({now}) was {diff} "
"microseconds behind the last generated timestamp "
"({last}), returned timestamps will be artificially "
"incremented to guarantee monotonicity.".format(
now=now, diff=diff, last=self.last))
self._last_warn = now
2 changes: 2 additions & 0 deletions docs/api/cassandra/cluster.rst
Original file line number Diff line number Diff line change
Expand Up @@ -134,6 +134,8 @@

.. autoattribute:: use_client_timestamp

.. autoattribute:: timestamp_generator

.. autoattribute:: encoder

.. autoattribute:: client_protocol_handler
Expand Down
14 changes: 14 additions & 0 deletions docs/api/cassandra/timestamps.rst
Original file line number Diff line number Diff line change
@@ -0,0 +1,14 @@
``cassandra.timestamps`` - Timestamp Generation
=============================================

.. module:: cassandra.timestamps

.. autoclass:: MonotonicTimestampGenerator (warn_on_drift=True, warning_threshold=0, warning_interval=0)

.. autoattribute:: warn_on_drift

.. autoattribute:: warning_threshold

.. autoattribute:: warning_interval

.. automethod:: _next_timestamp
Loading