Skip to content

Commit

Permalink
Workaround for BPO-42130 (#802)
Browse files Browse the repository at this point in the history
  • Loading branch information
ods committed Dec 6, 2021
1 parent 444a1a7 commit 67f55c0
Show file tree
Hide file tree
Showing 6 changed files with 58 additions and 51 deletions.
37 changes: 19 additions & 18 deletions .github/workflows/tests.yml
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ jobs:
- name: Set up Python
uses: actions/setup-python@v2
with:
python-version: 3.7
python-version: 3.9

- name: Install system dependencies
run: |
Expand All @@ -39,10 +39,10 @@ jobs:
uses: actions/cache@v1
with:
path: ${{ steps.pip-cache.outputs.dir }}
key: ${{ runner.os }}-py-3.7-${{ hashFiles('requirements-ci.txt') }}-${{ hashFiles('setup.py') }}
key: ${{ runner.os }}-py-3.9-${{ hashFiles('requirements-ci.txt') }}-${{ hashFiles('setup.py') }}
# If miss on key takes any other cache with different hashes, will download correct ones on next step anyway
restore-keys: |
${{ runner.os }}-py-3.7-
${{ runner.os }}-py-3.9-
- name: Install python dependencies
run: |
Expand All @@ -68,14 +68,16 @@ jobs:

strategy:
matrix:
python: [3.6, 3.7, 3.8.5]
python: [3.6, 3.7, 3.8, 3.9]
include:
- python: 3.6
snappy_whl: tools/python_snappy-0.5.4-cp36-cp36m-win_amd64.whl
- python: 3.7
snappy_whl: tools/python_snappy-0.5.4-cp37-cp37m-win_amd64.whl
- python: 3.8.5
- python: 3.8
snappy_whl: tools/python_snappy-0.5.4-cp38-cp38-win_amd64.whl
- python: 3.9
snappy_whl: tools/python_snappy-0.5.4-cp39-cp39-win_amd64.whl

steps:
- uses: actions/checkout@v2
Expand Down Expand Up @@ -144,7 +146,7 @@ jobs:

strategy:
matrix:
python: [3.6, 3.7, 3.8.5]
python: [3.6, 3.7, 3.8, 3.9]

steps:
- uses: actions/checkout@v2
Expand Down Expand Up @@ -217,42 +219,41 @@ jobs:
strategy:
matrix:
include:
# FIXME Stick to 3.8.5 as 3.8.6 has broken `wait_for`
- python: 3.8.5
- python: 3.9
kafka: "2.4.0"
scala: "2.12"

# Older python versions against latest broker
- python: 3.6
kafka: "2.4.0"
scala: "2.12"

# Older python versions against latest broker
- python: 3.7
kafka: "2.4.0"
scala: "2.12"

- python: 3.8
kafka: "2.4.0"
scala: "2.12"

# Older brokers against latest python version
- python: 3.8.5
- python: 3.9
kafka: "0.9.0.1"
scala: "2.11"
- python: 3.8.5
- python: 3.9
kafka: "0.10.2.1"
scala: "2.11"
- python: 3.8.5
- python: 3.9
kafka: "0.11.0.3"
scala: "2.12"
- python: 3.8.5
- python: 3.9
kafka: "1.1.1"
scala: "2.12"
- python: 3.8.5
- python: 3.9
kafka: "2.1.1"
scala: "2.12"
- python: 3.8.5
- python: 3.9
kafka: "2.2.2"
scala: "2.12"
- python: 3.8.5
- python: 3.9
kafka: "2.3.1"
scala: "2.12"
fail-fast: false
Expand Down
14 changes: 7 additions & 7 deletions aiokafka/conn.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
import warnings
import weakref

import async_timeout
from kafka.protocol.api import RequestHeader
from kafka.protocol.admin import (
SaslHandShakeRequest, SaslAuthenticateRequest, ApiVersionRequest
Expand All @@ -21,7 +22,7 @@
GroupCoordinatorResponse_v0 as GroupCoordinatorResponse)

import aiokafka.errors as Errors
from aiokafka.util import create_future, create_task, get_running_loop
from aiokafka.util import create_future, create_task, get_running_loop, wait_for

from aiokafka.abc import AbstractTokenProvider

Expand Down Expand Up @@ -212,10 +213,9 @@ async def connect(self):
# Create streams same as `open_connection`, but using custom protocol
reader = asyncio.StreamReader(limit=READER_LIMIT, loop=loop)
protocol = AIOKafkaProtocol(self._closed_fut, reader, loop=loop)
transport, _ = await asyncio.wait_for(
loop.create_connection(
lambda: protocol, self.host, self.port, ssl=ssl),
timeout=self._request_timeout)
async with async_timeout.timeout(self._request_timeout):
transport, _ = await loop.create_connection(
lambda: protocol, self.host, self.port, ssl=ssl)
writer = asyncio.StreamWriter(transport, protocol, reader, loop)
self._reader, self._writer, self._protocol = reader, writer, protocol

Expand Down Expand Up @@ -441,7 +441,7 @@ def send(self, request, expect_response=True):
return self._writer.drain()
fut = self._loop.create_future()
self._requests.append((correlation_id, request.RESPONSE_TYPE, fut))
return asyncio.wait_for(fut, self._request_timeout)
return wait_for(fut, self._request_timeout)

def _send_sasl_token(self, payload, expect_response=True):
if self._writer is None:
Expand All @@ -462,7 +462,7 @@ def _send_sasl_token(self, payload, expect_response=True):

fut = self._loop.create_future()
self._requests.append((None, None, fut))
return asyncio.wait_for(fut, self._request_timeout)
return wait_for(fut, self._request_timeout)

def connected(self):
return bool(self._reader is not None and not self._reader.at_eof())
Expand Down
45 changes: 19 additions & 26 deletions aiokafka/consumer/fetcher.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
import time
from itertools import chain

import async_timeout
from kafka.protocol.offset import OffsetRequest

from aiokafka.protocol.fetch import FetchRequest
Expand Down Expand Up @@ -846,7 +847,7 @@ async def _update_fetch_positions(self, assignment, node_id, tps):
tp_state.reset_to(offset)
return needs_wakeup

async def _retrieve_offsets(self, timestamps, timeout_ms=float("inf")):
async def _retrieve_offsets(self, timestamps, timeout_ms=None):
""" Fetch offset for each partition passed in ``timestamps`` map.
Blocks until offsets are obtained, a non-retriable exception is raised
Expand All @@ -867,31 +868,23 @@ async def _retrieve_offsets(self, timestamps, timeout_ms=float("inf")):
if not timestamps:
return {}

timeout = timeout_ms / 1000
start_time = time.monotonic()
remaining = timeout
while True:
try:
offsets = await asyncio.wait_for(
self._proc_offset_requests(timestamps),
timeout=None if remaining == float("inf") else remaining
)
except asyncio.TimeoutError:
break
except Errors.KafkaError as error:
if not error.retriable:
raise error
if error.invalid_metadata:
self._client.force_metadata_update()
elapsed = time.monotonic() - start_time
remaining = max(0, remaining - elapsed)
if remaining < self._retry_backoff:
break
await asyncio.sleep(self._retry_backoff)
else:
return offsets
raise KafkaTimeoutError(
"Failed to get offsets by times in %s ms" % timeout_ms)
timeout = None if timeout_ms is None else timeout_ms / 1000
try:
async with async_timeout.timeout(timeout):
while True:
try:
offsets = await self._proc_offset_requests(timestamps)
except Errors.KafkaError as error:
if not error.retriable:
raise error
if error.invalid_metadata:
self._client.force_metadata_update()
await asyncio.sleep(self._retry_backoff)
else:
return offsets
except asyncio.TimeoutError:
raise KafkaTimeoutError(
"Failed to get offsets by times in %s ms" % timeout_ms)

async def _proc_offset_requests(self, timestamps):
""" Fetch offsets for each partition in timestamps dict. This may send
Expand Down
9 changes: 9 additions & 0 deletions aiokafka/util.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,8 @@
from distutils.version import StrictVersion
from typing import Awaitable, Dict, Tuple, TypeVar, Union

import async_timeout

from .structs import OffsetAndMetadata, TopicPartition


Expand All @@ -30,6 +32,13 @@ def create_future(loop: AbstractEventLoop = None) -> "asyncio.Future[T]":
return loop.create_future()


async def wait_for(fut: Awaitable[T], timeout: Union[None, int, float] = None) -> T:
# A replacement for buggy (since 3.8.6) `asyncio.wait_for()`
# https://bugs.python.org/issue42130
async with async_timeout.timeout(timeout):
return await fut


def parse_kafka_version(api_version: str) -> Tuple[int, int, int]:
version = StrictVersion(api_version).version
if not (0, 9) <= version < (3, 0):
Expand Down
1 change: 1 addition & 0 deletions requirements-ci.txt
Original file line number Diff line number Diff line change
Expand Up @@ -15,3 +15,4 @@ Pygments==2.10.0
gssapi==1.7.2
dataclasses==0.8; python_version<"3.7"
async_generator==1.10; python_version<"3.7"
async-timeout==4.0.1
3 changes: 3 additions & 0 deletions setup.py
Original file line number Diff line number Diff line change
Expand Up @@ -99,6 +99,7 @@ def build_extension(self, ext):


install_requires = [
"async-timeout",
"kafka-python>=2.0.0",
"dataclasses>=0.5; python_version<'3.7'",
]
Expand Down Expand Up @@ -136,6 +137,8 @@ def read_version():
"Programming Language :: Python :: 3",
"Programming Language :: Python :: 3.6",
"Programming Language :: Python :: 3.7",
"Programming Language :: Python :: 3.8",
"Programming Language :: Python :: 3.9",
"Operating System :: OS Independent",
"Topic :: System :: Networking",
"Topic :: System :: Distributed Computing",
Expand Down

0 comments on commit 67f55c0

Please sign in to comment.