Skip to content
This repository has been archived by the owner on Mar 24, 2021. It is now read-only.

Commit

Permalink
Merge pull request #837 from Parsely/bugfix/timestamp_interface
Browse files Browse the repository at this point in the history
Standardize offsets_before argument
  • Loading branch information
Emmett J. Butler committed Jul 19, 2018
2 parents 331fdb9 + 09602a0 commit 0f9a0f8
Show file tree
Hide file tree
Showing 4 changed files with 18 additions and 10 deletions.
2 changes: 2 additions & 0 deletions pykafka/common.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,10 +18,12 @@
limitations under the License.
"""
__all__ = ["Message", "CompressionType", "OffsetType"]
import datetime as dt
import logging


log = logging.getLogger(__name__)
EPOCH = dt.datetime(1970, 1, 1)


class Message(object):
Expand Down
11 changes: 7 additions & 4 deletions pykafka/partition.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,11 +17,12 @@
limitations under the License.
"""
__all__ = ["Partition"]
import datetime as dt
import logging
import time
import weakref

from .common import OffsetType
from .common import OffsetType, EPOCH
from .exceptions import LeaderNotFoundError
from .protocol import PartitionOffsetRequest

Expand Down Expand Up @@ -95,12 +96,14 @@ def fetch_offset_limit(self, offsets_before, max_offsets=1):
"""Use the Offset API to find a limit of valid offsets
for this partition.
:param offsets_before: Return an offset from before this timestamp (in
milliseconds)
:type offsets_before: int
:param offsets_before: Return an offset from before
this timestamp (in milliseconds). Deprecated::2.7,3.6: do not use int
:type offsets_before: `datetime.datetime` or int
:param max_offsets: The maximum number of offsets to return
:type max_offsets: int
"""
if isinstance(offsets_before, dt.datetime):
offsets_before = round((offsets_before - EPOCH).total_seconds() * 1000)
for i in range(self.topic._cluster._max_connection_retries):
if i > 0:
log.debug("Retrying offset limit fetch")
Expand Down
3 changes: 1 addition & 2 deletions pykafka/simpleconsumer.py
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@

from six import reraise

from .common import OffsetType
from .common import OffsetType, EPOCH
from .utils.compat import (Queue, Empty, iteritems, itervalues,
range, iterkeys, get_bytes, get_string)
from .exceptions import (UnknownError, OffsetOutOfRangeError, UnknownTopicOrPartition,
Expand All @@ -48,7 +48,6 @@


log = logging.getLogger(__name__)
EPOCH = dt.datetime(1970, 1, 1)
MAGIC_OFFSETS = [OffsetType.EARLIEST, OffsetType.LATEST]


Expand Down
12 changes: 8 additions & 4 deletions pykafka/topic.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,11 +17,12 @@
limitations under the License.
"""
__all__ = ["Topic"]
import datetime as dt
import logging
from collections import defaultdict

from .balancedconsumer import BalancedConsumer
from .common import OffsetType
from .common import OffsetType, EPOCH
from .exceptions import LeaderNotFoundError
from .managedbalancedconsumer import ManagedBalancedConsumer
from .partition import Partition
Expand Down Expand Up @@ -162,14 +163,15 @@ def fetch_offset_limits(self, offsets_before, max_offsets=1):
Thanks to Andras Beni from the Kafka users mailing list for providing
this example.
:param offsets_before: Epoch timestamp in milliseconds indicating the
:param offsets_before: Epoch timestamp in milliseconds or datetime indicating the
latest write time for returned offsets. Only offsets of messages
written before this timestamp will be returned. Permissible
special values are `common.OffsetType.LATEST`, indicating that
offsets from all available log segments should be returned, and
`common.OffsetType.EARLIEST`, indicating that only the offset of
the earliest available message should be returned.
:type offsets_before: int
the earliest available message should be returned. Deprecated::2.7,3.6:
do not use int
:type offsets_before: `datetime.datetime` or int
:param max_offsets: The maximum number of offsets to return when more
than one is available. In the case where `offsets_before ==
OffsetType.EARLIEST`, this parameter is meaningless since there is
Expand All @@ -178,6 +180,8 @@ def fetch_offset_limits(self, offsets_before, max_offsets=1):
latest `max_offsets` offsets.
:type max_offsets: int
"""
if isinstance(offsets_before, dt.datetime):
offsets_before = round((offsets_before - EPOCH).total_seconds() * 1000)
requests = defaultdict(list) # one request for each broker
for part in itervalues(self.partitions):
requests[part.leader].append(PartitionOffsetRequest(
Expand Down

0 comments on commit 0f9a0f8

Please sign in to comment.