Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
The table of contents is too big for display.
Diff view
Diff view
  •  
  •  
  •  
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ def __init__(
*,
amqp_transport: AmqpTransport,
max_buffer_length: int,
max_wait_time: float = 1
max_wait_time: float = 1,
):
self._buffered_queue: queue.Queue = queue.Queue()
self._max_buffer_len = max_buffer_length
Expand All @@ -59,9 +59,7 @@ def start(self):
self._running = True
if self._max_wait_time:
self._last_send_time = time.time()
self._check_max_wait_time_future = self._executor.submit(
self.check_max_wait_time_worker
)
self._check_max_wait_time_future = self._executor.submit(self.check_max_wait_time_worker)

def stop(self, flush=True, timeout_time=None, raise_error=False):
self._running = False
Expand All @@ -80,9 +78,7 @@ def stop(self, flush=True, timeout_time=None, raise_error=False):
try:
self._check_max_wait_time_future.result(remain_timeout)
except Exception as exc: # pylint: disable=broad-except
_LOGGER.warning(
"Partition %r stopped with error %r", self.partition_id, exc
)
_LOGGER.warning("Partition %r stopped with error %r", self.partition_id, exc)
self._producer.close()

def put_events(self, events, timeout_time=None):
Expand All @@ -102,9 +98,7 @@ def put_events(self, events, timeout_time=None):
# flush the buffer
self.flush(timeout_time=timeout_time)
if timeout_time and time.time() > timeout_time:
raise OperationTimeoutError(
"Failed to enqueue events into buffer due to timeout."
)
raise OperationTimeoutError("Failed to enqueue events into buffer due to timeout.")
with self._lock:
try:
# add single event into current batch
Expand Down Expand Up @@ -157,9 +151,7 @@ def flush(self, timeout_time=None, raise_error=True):
_LOGGER.info("Partition %r is sending.", self.partition_id)
self._producer.send(
batch,
timeout=timeout_time - time.time()
if timeout_time
else None,
timeout=timeout_time - time.time() if timeout_time else None,
)
_LOGGER.info(
"Partition %r sending %r events succeeded.",
Expand All @@ -184,14 +176,10 @@ def flush(self, timeout_time=None, raise_error=True):
finally:
self._cur_buffered_len -= len(batch)
else:
_LOGGER.info(
"Partition %r fails to flush due to timeout.", self.partition_id
)
_LOGGER.info("Partition %r fails to flush due to timeout.", self.partition_id)
if raise_error:
raise OperationTimeoutError(
"Failed to flush {!r} within {}".format(
self.partition_id, timeout_time
)
"Failed to flush {!r} within {}".format(self.partition_id, timeout_time)
)
break
# after finishing flushing, reset cur batch and put it into the buffer
Expand All @@ -202,9 +190,7 @@ def check_max_wait_time_worker(self):
while self._running:
if self._cur_buffered_len > 0:
now_time = time.time()
_LOGGER.info(
"Partition %r worker is checking max_wait_time.", self.partition_id
)
_LOGGER.info("Partition %r worker is checking max_wait_time.", self.partition_id)
# flush the partition if the producer is running beyond the waiting time
# or the buffer is at max capacity
if (now_time - self._last_send_time > self._max_wait_time) or (
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ def __init__(
amqp_transport: AmqpTransport,
max_buffer_length: int = 1500,
max_wait_time: float = 1,
executor: Optional[Union[ThreadPoolExecutor, int]] = None
executor: Optional[Union[ThreadPoolExecutor, int]] = None,
):
self._buffered_producers: Dict[str, BufferedProducer] = {}
self._partition_ids: List[str] = partitions
Expand Down Expand Up @@ -62,20 +62,14 @@ def _get_partition_id(self, partition_id, partition_key):
if partition_id:
if partition_id not in self._partition_ids:
raise ConnectError(
"Invalid partition {} for the event hub {}".format(
partition_id, self._eventhub_name
)
"Invalid partition {} for the event hub {}".format(partition_id, self._eventhub_name)
)
return partition_id
if isinstance(partition_key, str):
return self._partition_resolver.get_partition_id_by_partition_key(
partition_key
)
return self._partition_resolver.get_partition_id_by_partition_key(partition_key)
return self._partition_resolver.get_next_partition_id()

def enqueue_events(
self, events, *, partition_id=None, partition_key=None, timeout_time=None
):
def enqueue_events(self, events, *, partition_id=None, partition_key=None, timeout_time=None):
pid = self._get_partition_id(partition_id, partition_key)
with self._lock:
try:
Expand All @@ -90,7 +84,7 @@ def enqueue_events(
executor=self._executor,
max_wait_time=self._max_wait_time,
max_buffer_length=self._max_buffer_length,
amqp_transport = self._amqp_transport,
amqp_transport=self._amqp_transport,
)
buffered_producer.start()
self._buffered_producers[pid] = buffered_producer
Expand All @@ -105,9 +99,7 @@ def flush(self, timeout_time=None):
futures.append(
(
pid,
self._executor.submit(
producer.flush, timeout_time=timeout_time
),
self._executor.submit(producer.flush, timeout_time=timeout_time),
)
)

Expand All @@ -123,9 +115,7 @@ def flush(self, timeout_time=None):
_LOGGER.info("Flushing all partitions succeeded")
return

_LOGGER.warning(
"Flushing all partitions partially failed with result %r.", exc_results
)
_LOGGER.warning("Flushing all partitions partially failed with result %r.", exc_results)
raise EventDataSendError(
message="Flushing all partitions partially failed, failed partitions are {!r}"
" Exception details are {!r}".format(exc_results.keys(), exc_results)
Expand Down Expand Up @@ -166,9 +156,7 @@ def close(self, *, flush=True, timeout_time=None, raise_error=False):
if raise_error:
raise EventHubError(
message="Stopping all partitions partially failed, failed partitions are {!r}"
" Exception details are {!r}".format(
exc_results.keys(), exc_results
)
" Exception details are {!r}".format(exc_results.keys(), exc_results)
)

if not self._existing_executor:
Expand All @@ -182,6 +170,4 @@ def get_buffered_event_count(self, pid):

@property
def total_buffered_event_count(self):
return sum(
(self.get_buffered_event_count(pid) for pid in self._buffered_producers)
)
return sum((self.get_buffered_event_count(pid) for pid in self._buffered_producers))
Original file line number Diff line number Diff line change
Expand Up @@ -109,136 +109,46 @@ def compute_hash(data, init_val=0, init_val2=0):

p = 0 # string offset
while lenpos > 12:
a += (
ord(data[p + 0])
+ (ord(data[p + 1]) << 8)
+ (ord(data[p + 2]) << 16)
+ (ord(data[p + 3]) << 24)
)
a += ord(data[p + 0]) + (ord(data[p + 1]) << 8) + (ord(data[p + 2]) << 16) + (ord(data[p + 3]) << 24)
a &= 0xFFFFFFFF
b += (
ord(data[p + 4])
+ (ord(data[p + 5]) << 8)
+ (ord(data[p + 6]) << 16)
+ (ord(data[p + 7]) << 24)
)
b += ord(data[p + 4]) + (ord(data[p + 5]) << 8) + (ord(data[p + 6]) << 16) + (ord(data[p + 7]) << 24)
b &= 0xFFFFFFFF
c += (
ord(data[p + 8])
+ (ord(data[p + 9]) << 8)
+ (ord(data[p + 10]) << 16)
+ (ord(data[p + 11]) << 24)
)
c += ord(data[p + 8]) + (ord(data[p + 9]) << 8) + (ord(data[p + 10]) << 16) + (ord(data[p + 11]) << 24)
c &= 0xFFFFFFFF
a, b, c = mix(a, b, c)
p += 12
lenpos -= 12

if lenpos == 12:
c += (
ord(data[p + 8])
+ (ord(data[p + 9]) << 8)
+ (ord(data[p + 10]) << 16)
+ (ord(data[p + 11]) << 24)
)
b += (
ord(data[p + 4])
+ (ord(data[p + 5]) << 8)
+ (ord(data[p + 6]) << 16)
+ (ord(data[p + 7]) << 24)
)
a += (
ord(data[p + 0])
+ (ord(data[p + 1]) << 8)
+ (ord(data[p + 2]) << 16)
+ (ord(data[p + 3]) << 24)
)
c += ord(data[p + 8]) + (ord(data[p + 9]) << 8) + (ord(data[p + 10]) << 16) + (ord(data[p + 11]) << 24)
b += ord(data[p + 4]) + (ord(data[p + 5]) << 8) + (ord(data[p + 6]) << 16) + (ord(data[p + 7]) << 24)
a += ord(data[p + 0]) + (ord(data[p + 1]) << 8) + (ord(data[p + 2]) << 16) + (ord(data[p + 3]) << 24)
if lenpos == 11:
c += ord(data[p + 8]) + (ord(data[p + 9]) << 8) + (ord(data[p + 10]) << 16)
b += (
ord(data[p + 4])
+ (ord(data[p + 5]) << 8)
+ (ord(data[p + 6]) << 16)
+ (ord(data[p + 7]) << 24)
)
a += (
ord(data[p + 0])
+ (ord(data[p + 1]) << 8)
+ (ord(data[p + 2]) << 16)
+ (ord(data[p + 3]) << 24)
)
b += ord(data[p + 4]) + (ord(data[p + 5]) << 8) + (ord(data[p + 6]) << 16) + (ord(data[p + 7]) << 24)
a += ord(data[p + 0]) + (ord(data[p + 1]) << 8) + (ord(data[p + 2]) << 16) + (ord(data[p + 3]) << 24)
if lenpos == 10:
c += ord(data[p + 8]) + (ord(data[p + 9]) << 8)
b += (
ord(data[p + 4])
+ (ord(data[p + 5]) << 8)
+ (ord(data[p + 6]) << 16)
+ (ord(data[p + 7]) << 24)
)
a += (
ord(data[p + 0])
+ (ord(data[p + 1]) << 8)
+ (ord(data[p + 2]) << 16)
+ (ord(data[p + 3]) << 24)
)
b += ord(data[p + 4]) + (ord(data[p + 5]) << 8) + (ord(data[p + 6]) << 16) + (ord(data[p + 7]) << 24)
a += ord(data[p + 0]) + (ord(data[p + 1]) << 8) + (ord(data[p + 2]) << 16) + (ord(data[p + 3]) << 24)
if lenpos == 9:
c += ord(data[p + 8])
b += (
ord(data[p + 4])
+ (ord(data[p + 5]) << 8)
+ (ord(data[p + 6]) << 16)
+ (ord(data[p + 7]) << 24)
)
a += (
ord(data[p + 0])
+ (ord(data[p + 1]) << 8)
+ (ord(data[p + 2]) << 16)
+ (ord(data[p + 3]) << 24)
)
b += ord(data[p + 4]) + (ord(data[p + 5]) << 8) + (ord(data[p + 6]) << 16) + (ord(data[p + 7]) << 24)
a += ord(data[p + 0]) + (ord(data[p + 1]) << 8) + (ord(data[p + 2]) << 16) + (ord(data[p + 3]) << 24)
if lenpos == 8:
b += (
ord(data[p + 4])
+ (ord(data[p + 5]) << 8)
+ (ord(data[p + 6]) << 16)
+ (ord(data[p + 7]) << 24)
)
a += (
ord(data[p + 0])
+ (ord(data[p + 1]) << 8)
+ (ord(data[p + 2]) << 16)
+ (ord(data[p + 3]) << 24)
)
b += ord(data[p + 4]) + (ord(data[p + 5]) << 8) + (ord(data[p + 6]) << 16) + (ord(data[p + 7]) << 24)
a += ord(data[p + 0]) + (ord(data[p + 1]) << 8) + (ord(data[p + 2]) << 16) + (ord(data[p + 3]) << 24)
if lenpos == 7:
b += ord(data[p + 4]) + (ord(data[p + 5]) << 8) + (ord(data[p + 6]) << 16)
a += (
ord(data[p + 0])
+ (ord(data[p + 1]) << 8)
+ (ord(data[p + 2]) << 16)
+ (ord(data[p + 3]) << 24)
)
a += ord(data[p + 0]) + (ord(data[p + 1]) << 8) + (ord(data[p + 2]) << 16) + (ord(data[p + 3]) << 24)
if lenpos == 6:
b += (ord(data[p + 5]) << 8) + ord(data[p + 4])
a += (
ord(data[p + 0])
+ (ord(data[p + 1]) << 8)
+ (ord(data[p + 2]) << 16)
+ (ord(data[p + 3]) << 24)
)
a += ord(data[p + 0]) + (ord(data[p + 1]) << 8) + (ord(data[p + 2]) << 16) + (ord(data[p + 3]) << 24)
if lenpos == 5:
b += ord(data[p + 4])
a += (
ord(data[p + 0])
+ (ord(data[p + 1]) << 8)
+ (ord(data[p + 2]) << 16)
+ (ord(data[p + 3]) << 24)
)
a += ord(data[p + 0]) + (ord(data[p + 1]) << 8) + (ord(data[p + 2]) << 16) + (ord(data[p + 3]) << 24)
if lenpos == 4:
a += (
ord(data[p + 0])
+ (ord(data[p + 1]) << 8)
+ (ord(data[p + 2]) << 16)
+ (ord(data[p + 3]) << 24)
)
a += ord(data[p + 0]) + (ord(data[p + 1]) << 8) + (ord(data[p + 2]) << 16) + (ord(data[p + 3]) << 24)
if lenpos == 3:
a += ord(data[p + 0]) + (ord(data[p + 1]) << 8) + (ord(data[p + 2]) << 16)
if lenpos == 2:
Expand Down
Loading