Skip to content
Permalink
Browse files
Revert "Fix k8s pod.execute randomly stuck indefinitely by logs consu…
…mption (#23497) (#23618)" (#23656)

This reverts commit ee342b8.
  • Loading branch information
potiuk committed May 11, 2022
1 parent 422791f commit 2eeb120bf4da8b42eab8685979d5452b1b9b79a1
Show file tree
Hide file tree
Showing 2 changed files with 7 additions and 68 deletions.
@@ -15,8 +15,6 @@
# specific language governing permissions and limitations
# under the License.
"""Launches PODs"""
import asyncio
import concurrent
import json
import math
import time
@@ -195,40 +193,6 @@ def follow_container_logs(self, pod: V1Pod, container_name: str) -> PodLoggingSt
)
return self.fetch_container_logs(pod=pod, container_name=container_name, follow=True)

def log_iterable(self, logs: Iterable[bytes]) -> Optional[DateTime]:
timestamp = None
for line in logs:
timestamp, message = self.parse_log_line(line.decode('utf-8', errors="backslashreplace"))
self.log.info(message)
return timestamp

def consume_container_logs_stream(
self, pod: V1Pod, container_name: str, stream: Iterable[bytes]
) -> Optional[DateTime]:
async def async_await_container_completion() -> None:
await asyncio.sleep(1)
while self.container_is_running(pod=pod, container_name=container_name):
await asyncio.sleep(1)

loop = asyncio.get_event_loop()
await_container_completion = loop.create_task(async_await_container_completion())
log_stream = asyncio.ensure_future(loop.run_in_executor(None, self.log_iterable, stream))
tasks: Iterable[asyncio.Task] = {await_container_completion, log_stream}
loop.run_until_complete(asyncio.wait(tasks, return_when=asyncio.FIRST_COMPLETED))
if log_stream.done():
return log_stream.result()

log_stream.cancel()
try:
loop.run_until_complete(log_stream)
except concurrent.futures.CancelledError:
self.log.warning(
"Container %s log read was interrupted at some point caused by log rotation "
"see https://github.com/apache/airflow/issues/23497 for reference.",
container_name,
)
return None

def fetch_container_logs(
self, pod: V1Pod, container_name: str, *, follow=False, since_time: Optional[DateTime] = None
) -> PodLoggingStatus:
@@ -256,11 +220,10 @@ def consume_logs(*, since_time: Optional[DateTime] = None, follow: bool = True)
),
follow=follow,
)
if follow:
timestamp = self.consume_container_logs_stream(pod, container_name, logs)
else:
timestamp = self.log_iterable(logs)

for raw_line in logs:
line = raw_line.decode('utf-8', errors="backslashreplace")
timestamp, message = self.parse_log_line(line)
self.log.info(message)
except BaseHTTPError as e:
self.log.warning(
"Reading of logs interrupted with error %r; will retry. "
@@ -293,7 +256,7 @@ def consume_logs(*, since_time: Optional[DateTime] = None, follow: bool = True)
time.sleep(1)

def await_container_completion(self, pod: V1Pod, container_name: str) -> None:
while self.container_is_running(pod=pod, container_name=container_name):
while not self.container_is_running(pod=pod, container_name=container_name):
time.sleep(1)

def await_pod_completion(self, pod: V1Pod) -> V1Pod:
@@ -15,8 +15,6 @@
# specific language governing permissions and limitations
# under the License.
import logging
import time
from typing import Generator
from unittest import mock
from unittest.mock import MagicMock

@@ -314,7 +312,7 @@ def test_fetch_container_since_time(self, container_running, mock_now):
args, kwargs = self.mock_kube_client.read_namespaced_pod_log.call_args_list[0]
assert kwargs['since_seconds'] == 5

@pytest.mark.parametrize('follow, is_running_calls, exp_running', [(True, 4, False), (False, 1, True)])
@pytest.mark.parametrize('follow, is_running_calls, exp_running', [(True, 3, False), (False, 1, True)])
@mock.patch('airflow.providers.cncf.kubernetes.utils.pod_manager.container_is_running')
def test_fetch_container_running_follow(
self, container_running_mock, follow, is_running_calls, exp_running
@@ -324,35 +322,13 @@ def test_fetch_container_running_follow(
When called with follow=False, should return immediately even though still running.
"""
mock_pod = MagicMock()
container_running_mock.side_effect = [True, False, False, False] # called once when follow=False
container_running_mock.side_effect = [True, True, False] # only will be called once
self.mock_kube_client.read_namespaced_pod_log.return_value = [b'2021-01-01 hi']
ret = self.pod_manager.fetch_container_logs(pod=mock_pod, container_name='base', follow=follow)
assert len(container_running_mock.call_args_list) == is_running_calls
assert ret.last_log_time == DateTime(2021, 1, 1, tzinfo=Timezone('UTC'))
assert ret.running is exp_running

@pytest.mark.parametrize('follow, is_running_calls, exp_running', [(True, 3, False)])
@mock.patch('airflow.providers.cncf.kubernetes.utils.pod_manager.container_is_running')
def test_fetch_container_running_follow_when_kube_api_hangs(
self, container_running_mock, follow, is_running_calls, exp_running
):
"""
When called with follow, should keep looping even after disconnections, if pod still running.
"""
mock_pod = MagicMock()
container_running_mock.side_effect = [True, False, False]

def stream_logs() -> Generator:
while True:
time.sleep(1) # this is intentional: urllib3.response.stream() is not async
yield b'2021-01-01 hi'

self.mock_kube_client.read_namespaced_pod_log.return_value = stream_logs()
ret = self.pod_manager.fetch_container_logs(pod=mock_pod, container_name='base', follow=follow)
assert len(container_running_mock.call_args_list) == is_running_calls
assert ret.running is exp_running
assert ret.last_log_time is None


def params_for_test_container_is_running():
"""The `container_is_running` method is designed to handle an assortment of bad objects

0 comments on commit 2eeb120

Please sign in to comment.