Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account-related emails.

Already on GitHub? Sign in to your account

Avoid crash in recovery by pessimistically looking up the earliest key #289

Merged
12 changes: 9 additions & 3 deletions faust/tables/recovery.py
Expand Up @@ -690,12 +690,18 @@ async def _build_offsets(
# Offsets may have been compacted, need to get to the recent ones
earliest = await consumer.earliest_offsets(*tps)
# FIXME To be consistent with the offset -1 logic
earliest = {tp: offset - 1 for tp, offset in earliest.items()}
earliest = {
tp: offset - 1 if offset is not None else None
for tp, offset in earliest.items()
}

for tp in tps:
last_value = destination[tp]
new_value = earliest[tp]
new_value = earliest.get(tp, None)

if last_value is None:
if last_value is None and new_value is None:
destination[tp] = -1
elif last_value is None:
destination[tp] = new_value
elif new_value is None:
destination[tp] = last_value
Expand Down
46 changes: 46 additions & 0 deletions tests/unit/tables/test_recovery.py
Expand Up @@ -237,6 +237,52 @@ async def test__build_offsets(self, *, recovery):
}
)

@pytest.mark.asyncio
async def test__build_offsets_with_none(self, *, recovery, app) -> None:
    """Check ``_build_offsets`` when some offsets are ``None``.

    The mocked consumer reports earliest offsets for four partitions, one
    of which (TP4) is ``None``; the destination mapping starts with a
    ``None`` entry of its own (TP1).  After the call every partition must
    hold a concrete integer offset.
    """
    reported_earliest = {TP1: 0, TP2: 3, TP3: 5, TP4: None}
    fake_consumer = Mock(
        name="consumer",
        earliest_offsets=AsyncMock(return_value=reported_earliest),
    )
    offsets = {TP1: None, TP2: 1, TP3: 8, TP4: -1}

    await recovery._build_offsets(
        fake_consumer, {TP1, TP2, TP3, TP4}, offsets, "some-title"
    )

    # TP1: no previous value, so the earliest offset minus one is used.
    # TP2 / TP3: both values known; the asserted result is the larger one.
    # TP4: earliest offset unknown, so the previous value is kept.
    assert offsets == {TP1: -1, TP2: 2, TP3: 8, TP4: -1}

@pytest.mark.asyncio
async def test__build_offsets_both_none(self, *, recovery, app) -> None:
    """Check the fallback when neither side knows an offset.

    Both the stored offset and the consumer's earliest offset for TP1 are
    ``None``; the destination entry is expected to end up as ``-1``.
    """
    offsets = {TP1: None}
    stub_consumer = Mock(
        name="consumer",
        earliest_offsets=AsyncMock(return_value={TP1: None}),
    )

    await recovery._build_offsets(stub_consumer, {TP1}, offsets, "some-title")

    # A single entry remains and it carries the -1 sentinel.
    assert offsets == {TP1: -1}

@pytest.mark.asyncio
async def test__build_offsets_partial_consumer_response(
    self, *, recovery, app
) -> None:
    """Partitions missing from the consumer's reply keep their offsets.

    The mocked consumer answers for TP1 only (and that answer is
    ``None``), while all four partitions are requested.  Every entry in
    ``destination`` already holds a value, so each one must be left
    untouched and no ``KeyError`` may escape for TP2, TP3 or TP4.
    """
    consumer = Mock(
        name="consumer",
        # Deliberately incomplete: TP2, TP3 and TP4 are absent.
        earliest_offsets=AsyncMock(return_value={TP1: None}),
    )
    # Request every partition so that looking up a key the consumer did
    # not return is actually exercised by this test.
    tps = {TP1, TP2, TP3, TP4}
    destination = {TP1: 3, TP2: 4, TP3: 5, TP4: 20}
    await recovery._build_offsets(consumer, tps, destination, "some-title")
    assert len(destination) == 4
    assert destination[TP1] == 3
    assert destination[TP2] == 4
    assert destination[TP3] == 5
    assert destination[TP4] == 20

@pytest.mark.asyncio
async def test__seek_offsets(self, *, recovery):
consumer = Mock(
Expand Down