
Auto reconnect fix #25

Merged: 1 commit, Nov 11, 2020
2 changes: 1 addition & 1 deletion airflow_kubernetes_job_operator/kube_api/client.py
@@ -300,7 +300,7 @@ def can_reconnect():
             self._set_connection_state(KubeApiRestQueryConnectionState.Disconnected)

             # Case auto_reconnect has changed.
-            if not self.auto_reconnect and do_reconnect:
+            if not self.auto_reconnect or not do_reconnect:
                 break

             kube_logger.debug(f"[{self.resource_path}] Connection lost, reconnecting..")
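The one-line change above is the fix the PR is named for. With the old "and", the retry loop broke out only in the single case where auto_reconnect was disabled while a reconnect had been requested; in every other failure case, including can_reconnect() returning false, it kept reconnecting forever. A self-contained truth table makes the difference visible (names are illustrative, not the library's API):

def should_break_old(auto_reconnect: bool, do_reconnect: bool) -> bool:
    # Pre-fix condition: breaks only when auto_reconnect is off AND a
    # reconnect was requested.
    return not auto_reconnect and do_reconnect

def should_break_new(auto_reconnect: bool, do_reconnect: bool) -> bool:
    # Post-fix condition: break whenever reconnecting is disabled OR the
    # connection cannot be re-established.
    return not auto_reconnect or not do_reconnect

for auto in (True, False):
    for do in (True, False):
        print(
            f"auto_reconnect={auto!s:<5} do_reconnect={do!s:<5} "
            f"old_breaks={should_break_old(auto, do)!s:<5} "
            f"new_breaks={should_break_new(auto, do)!s:<5}"
        )

The problem rows are auto_reconnect=True/do_reconnect=False and auto_reconnect=False/do_reconnect=False: the old condition keeps looping in both, which is exactly the runaway-reconnect behavior this PR stops.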
5 changes: 2 additions & 3 deletions airflow_kubernetes_job_operator/kube_api/queries.py
@@ -1,7 +1,6 @@
 from logging import Logger
 import logging
 from datetime import datetime
-import os
 import json
 from typing import Union, List, Callable
 import dateutil.parser
@@ -35,7 +34,7 @@ class LogLine:
     autodetect_kuberentes_log_level: bool = True
     detect_kubernetes_log_level: Callable = None

-    def __init__(self, pod_name: str, namespace: str, message: str, timestamp: datetime):
+    def __init__(self, pod_name: str, namespace: str, message: str, timestamp: datetime = None):
         """GetPodLogs log line generated info object.

         Args:
@@ -48,7 +47,7 @@ def __init__(self, pod_name: str, namespace: str, message: str, timestamp: datet
         self.pod_name = pod_name
         self.namespace = namespace
         self.message = message
-        self.timestamp = timestamp
+        self.timestamp = timestamp or datetime.now()

     def log(self, logger: Logger = kube_logger):
         msg = self.__repr__()
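The LogLine change makes timestamp optional and falls back to the time the line was parsed, so log emission keeps working when a pod log line arrives without a parsable timestamp. A minimal runnable sketch of just this pattern (the real class also carries log-level detection fields not shown here):

from datetime import datetime

class LogLineSketch:
    def __init__(self, pod_name: str, namespace: str, message: str,
                 timestamp: datetime = None):
        self.pod_name = pod_name
        self.namespace = namespace
        self.message = message
        # Fall back to "now" when the API line carried no timestamp.
        self.timestamp = timestamp or datetime.now()

line = LogLineSketch("hello-python", "default", "A")  # no timestamp supplied
print(line.timestamp)  # the parse time, never None

Since datetime instances are always truthy, "timestamp or datetime.now()" behaves the same as an explicit "timestamp is None" check here; the "or" form is just the more compact idiom.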
33 changes: 26 additions & 7 deletions airflow_kubernetes_job_operator/kube_api/watchers.py
@@ -20,6 +20,8 @@
     LogLine,
 )

+from zthreading.decorators import thread_synchronized
+

 class NamespaceWatchQueryResourceState(EventHandler):
     def __init__(
@@ -132,6 +134,7 @@ class NamespaceWatchQuery(KubeApiRestQuery):
     state_changed_event_name = "state_changed"
     deleted_event_name = "deleted"
     watch_started_event_name = "watch_started"
+    pod_logs_reader_started_event_name = "pod_logs_reader_started"

     def __init__(
         self,
@@ -193,6 +196,17 @@ def watched_objects(self) -> List[NamespaceWatchQueryResourceState]:
     def emit_log(self, data):
         self.emit(self.pod_log_event_name, data)

+    @thread_synchronized
+    def _create_pod_log_reader(self, uid: str, name: str, namespace: str, follow=True):
+        read_logs = GetPodLogs(
+            name=name,
+            namespace=namespace,
+            since=self.pod_log_since,
+            follow=follow,
+        )
+        self._executing_pod_loggers[uid] = read_logs
+        return read_logs
+
     def process_data_state(self, data: dict, client: KubeApiRestClient):
         if not self.is_running:
             return
@@ -213,17 +227,19 @@ def process_data_state(self, data: dict, client: KubeApiRestClient):
             del self._object_states[uid]

         if self.watch_pod_logs and kind == "pod" and uid not in self._executing_pod_loggers:
-            namespace = data["metadata"]["namespace"]
             name = data["metadata"]["name"]
+            namesoace = data["metadata"]["namespace"]
             pod_status = data["status"]["phase"]
             if pod_status != "Pending":
-                read_logs = GetPodLogs(
+                osw = self._object_states.get(uid)
+                read_logs = self._create_pod_log_reader(
+                    uid=uid,
                     name=name,
-                    namespace=namespace,
-                    since=self.pod_log_since,
-                    follow=True,
+                    namespace=namesoace,
                 )
+
+                osw.emit(self.pod_logs_reader_started_event_name)

                 def handle_error(sender, *args):
                     # Don't throw error if not running.
                     if not self.is_running:
@@ -237,7 +253,6 @@ def handle_error(sender, *args):
                 # binding only relevant events.
                 read_logs.on(read_logs.data_event_name, lambda line: self.emit_log(line))
                 read_logs.on(read_logs.error_event_name, handle_error)
-                self._executing_pod_loggers[uid] = read_logs
                 client.query_async(read_logs)

def _stop_all_loggers(
@@ -271,17 +286,21 @@ def log_event(self, logger: Logger, ev: Event):
                 f"[{get_ns_objs.namespace}/{get_ns_objs.kind.plural}] "
                 + f"Watch collection for {get_ns_objs.kind.plural} lost, attempting to reconnect..."
             )
-        if ev.name == self.state_changed_event_name:
+        elif ev.name == self.state_changed_event_name:
             osw: NamespaceWatchQueryResourceState = ev.sender
             logger.info(f"[{osw.namespace}/{osw.kind_name.lower()}s/{osw.name}]" + f" {osw.state}")
         elif ev.name == self.pod_log_event_name:
             line: LogLine = ev.args[0]
             line.log(logger)
+        elif ev.name == self.pod_logs_reader_started_event_name:
+            osw: NamespaceWatchQueryResourceState = ev.sender
+            logger.info(f"[{osw.namespace}/{osw.kind_name.lower()}s/{osw.name}] Reading logs")

     def pipe_to_logger(self, logger: Logger = kube_logger, allowed_event_names=None) -> int:
         allowed_event_names = set(
             allowed_event_names
             or [
+                self.pod_logs_reader_started_event_name,
                 self.state_changed_event_name,
                 self.pod_log_event_name,
             ]
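Taken together, the watchers.py changes target duplicated pod logs. Before, the reader was added to self._executing_pod_loggers only after its event handlers were bound, so two watch events for the same pod arriving close together could both pass the "uid not in self._executing_pod_loggers" guard and start twin GetPodLogs readers. (Note the merged diff's local variable is spelled "namesoace"; it is used consistently, so the code runs.) The fix registers the reader the moment it is created, inside a method serialized by zthreading's thread_synchronized decorator. A self-contained sketch of the same pattern using only the standard library; names are illustrative, and the in-lock membership check is an addition of this sketch, while the PR keeps that check in process_data_state:

import threading

class PodLogWatcherSketch:
    def __init__(self):
        self._executing_pod_loggers = {}
        self._lock = threading.Lock()  # stands in for @thread_synchronized

    def _create_pod_log_reader(self, uid, name, namespace):
        with self._lock:
            if uid in self._executing_pod_loggers:
                return None  # a reader already exists for this pod
            reader = ("GetPodLogs", name, namespace)  # stand-in for the real query
            # Register at creation time, before any handler wiring,
            # so a second watch event cannot slip in a duplicate.
            self._executing_pod_loggers[uid] = reader
            return reader

    def process_pod_event(self, uid, name, namespace):
        reader = self._create_pod_log_reader(uid, name, namespace)
        if reader is None:
            return
        # ... bind data/error handlers and start the query asynchronously ...

watcher = PodLogWatcherSketch()
watcher.process_pod_event("uid-1", "hello-python", "default")
watcher.process_pod_event("uid-1", "hello-python", "default")  # no-op
print(len(watcher._executing_pod_loggers))  # 1, not 2

The new pod_logs_reader_started event rides along so that pipe_to_logger can announce "Reading logs" exactly once per pod, which also makes any duplicate-reader regression easy to spot in task output.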
3 changes: 1 addition & 2 deletions airflow_kubernetes_job_operator/kubernetes_job_operator.py
@@ -1,6 +1,5 @@
 from typing import List, Union

-from airflow import configuration
 from airflow.utils.decorators import apply_defaults
 from airflow.operators import BaseOperator
 from airflow_kubernetes_job_operator.kube_api import KubeResourceState
@@ -254,4 +253,4 @@ def on_kill(self):
         except Exception:
             self.log.error("Failed to delete an aborted/killed" + " job! The job may still be executing.")

-        return super().on_kill()
\ No newline at end of file
+        return super().on_kill()
20 changes: 20 additions & 0 deletions tests/dags/test_double_log.py
@@ -0,0 +1,20 @@
import os
from utils import default_args, resolve_file
from airflow import DAG
from airflow_kubernetes_job_operator.kubernetes_job_operator import KubernetesJobOperator


dag = DAG(
    "kub-job-op-custom",
    default_args=default_args,
    description="Test base job operator",
    schedule_interval=None,
    catchup=False,
)

with dag:
    KubernetesJobOperator(task_id="test_dbl_log", body_filepath=__file__ + ".yaml")

if __name__ == "__main__":
    dag.clear()
    dag.run()
18 changes: 18 additions & 0 deletions tests/dags/test_double_log.py.yaml
@@ -0,0 +1,18 @@
apiVersion: batch/v1
kind: Job
spec:
  template:
    spec:
      restartPolicy: Never
      containers:
        - name: hello-python
          image: python:3.8.6-slim
          command:
            - python
            - '-c'
            - |
              import time
              print("A")
              time.sleep(0)
              print("B")
          imagePullPolicy: Always
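The DAG and job spec above read as the regression test for the duplicate-reader bug: the container prints exactly two lines, "A" and "B", the idea being that if the watcher ever attaches two log readers to the same pod, each line shows up twice in the task log (hence the file name test_double_log).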
1 change: 1 addition & 0 deletions tests/dags/test_job_operator.py
@@ -79,6 +79,7 @@
     delete_policy=default_delete_policy,
 )

+
 if __name__ == "__main__":
     dag.clear(reset_dag_runs=True)
     dag.run()
4 changes: 2 additions & 2 deletions tests/local_airflow/airflow.cfg
@@ -31,10 +31,10 @@ logging_config_class =
 # Log format
 # Colour the logs when the controlling terminal is a TTY.
 colored_console_log = True
-colored_log_format = [%%(blue)s%%(asctime)s%%(reset)s] {%%(blue)s%%(filename)s:%%(reset)s%%(lineno)d} %%(log_color)s%%(levelname)s%%(reset)s - %%(log_color)s%%(message)s%%(reset)s
+colored_log_format = [%%(blue)s%%(asctime)s%%(reset)s] %%(log_color)s%%(levelname)s%%(reset)s - %%(log_color)s%%(message)s%%(reset)s
 colored_formatter_class = airflow.utils.log.colored_log.CustomTTYColoredFormatter

-log_format = [%%(asctime)s] {%%(filename)s:%%(lineno)d} %%(levelname)s - %%(message)s
+log_format = [%%(asctime)s] %%(levelname)s - %%(message)s
 simple_log_format = %%(asctime)s %%(levelname)s - %%(message)s

 # Log filename format
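The config change drops the {filename:lineno} segment from both log formats, likely because every watcher log line already carries its own [namespace/pods/name] prefix, making the source location redundant noise. The doubled %% is ConfigParser interpolation escaping for a literal %. A quick, runnable way to see the before/after rendering (record values are illustrative only):

import logging

record = logging.LogRecord(
    "kube_api", logging.INFO, "watchers.py", 123, "Reading logs", None, None
)
old = logging.Formatter("[%(asctime)s] {%(filename)s:%(lineno)d} %(levelname)s - %(message)s")
new = logging.Formatter("[%(asctime)s] %(levelname)s - %(message)s")
print(old.format(record))  # [...] {watchers.py:123} INFO - Reading logs
print(new.format(record))  # [...] INFO - Reading logs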