Merge pull request #39 from LamaAni/multi_container_pod_logs
Multi container pod logs
LamaAni committed Apr 27, 2021
2 parents c689d61 + bc52a09 commit 6a5f65d
Showing 14 changed files with 189 additions and 196 deletions.
11 changes: 9 additions & 2 deletions Pipfile
@@ -4,11 +4,18 @@ url = "https://pypi.org/simple"
verify_ssl = true

[dev-packages]
black = "*"

[packages]
apache-airflow = "==1.10.14"
zthreading = "*"
SQLAlchemy = "==1.3.23"
Flask-SQLAlchemy = "==2.4.4"
apache-airflow = "==1.10.15"
airflow-db-logger = "==1.0.5"
kubernetes = "*"
zthreading = "*"

[requires]
python_version = "3.6"

[pipenv]
allow_prereleases = true
42 changes: 37 additions & 5 deletions airflow_kubernetes_job_operator/kube_api/queries.py
@@ -34,7 +34,14 @@ class LogLine:
autodetect_kuberentes_log_level: bool = True
detect_kubernetes_log_level: Callable = None

def __init__(self, pod_name: str, namespace: str, message: str, timestamp: datetime = None):
def __init__(
self,
pod_name: str,
container_name: str,
namespace: str,
message: str,
timestamp: datetime = None,
):
"""GetPodLogs log line generated info object.
Args:
@@ -47,6 +54,7 @@ def __init__(self, pod_name: str, namespace: str, message: str, timestamp: datetime = None):
self.pod_name = pod_name
self.namespace = namespace
self.message = message
self.container_name = container_name
self.timestamp = timestamp or datetime.now()

def log(self, logger: Logger = kube_logger):
@@ -64,8 +72,14 @@ def __str__(self):
return self.message

def __repr__(self):
timestamp = f"[{self.timestamp}]" if self.show_kubernetes_log_timestamps else ""
return timestamp + f"[{self.namespace}/pods/{self.pod_name}]: {self.message}"
header_parts = [
f"{self.timestamp}" if self.show_kubernetes_log_timestamps else None,
f"{self.namespace}/pods/{self.pod_name}",
self.container_name,
]

header = "".join([f"[{p}]" for p in header_parts if p is not None])
return f"{header}: {self.message}"


class GetPodLogs(KubeApiRestQuery):
@@ -76,6 +90,8 @@ def __init__(
since: datetime = None,
follow: bool = False,
timeout: int = None,
container: str = None,
add_container_name_to_log: bool = None,
):
"""Returns the pod logs for a pod. Can follow the pod logs
in real time.
@@ -91,6 +107,7 @@
"""
assert not_empty_string(name), ValueError("name must be a non empty string")
assert not_empty_string(namespace), ValueError("namespace must be a non empty string")
assert container is None or not_empty_string(container), ValueError("container must be None or a non-empty string")

kind: KubeResourceKind = KubeResourceKind.get_kind("Pod")
super().__init__(
@@ -104,15 +121,22 @@ def __init__(
self.name: str = name
self.namespace: str = namespace
self.since: datetime = since
self.container = container
self.query_params = {
"follow": follow,
"pretty": False,
"timestamps": True,
}

if container is not None:
self.query_params["container"] = container

self.since = since
self._last_timestamp = None
self._active_namespace = None
self.add_container_name_to_log = (
add_container_name_to_log if add_container_name_to_log is not None else container is not None
)
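
A hedged usage sketch of the new container parameter; GetPodLogs and its arguments follow the diff above, while the KubeApiRestClient import path and setup are assumptions not shown in this change:

from airflow_kubernetes_job_operator.kube_api.queries import GetPodLogs
from airflow_kubernetes_job_operator.kube_api import KubeApiRestClient  # assumed import path

client = KubeApiRestClient()  # assumed default construction
query = GetPodLogs(
    name="multi-container-test",
    namespace="default",
    follow=True,
    container="container1",  # when container is set, add_container_name_to_log defaults to True
)
query.on(query.data_event_name, lambda line: line.log())
client.query_async(query)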

def pre_request(self, client: "KubeApiRestClient"):
super().pre_request(client)
@@ -158,7 +182,15 @@ def parse_data(self, message_line: str):
message = message.replace("\r", "")
lines = []
for message_line in message.split("\n"):
lines.append(LogLine(self.name, self.namespace, message_line, timestamp))
lines.append(
LogLine(
pod_name=self.name,
namespace=self.namespace,
message=message_line,
timestamp=timestamp,
container_name=self.container if self.add_container_name_to_log else None,
)
)
return lines
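
The query sends timestamps=True, so the API prefixes each raw line with an RFC3339 timestamp; the split and parse happen just above this hunk and are not part of the diff, but roughly amount to the following sketch (assuming python-dateutil, which Airflow already depends on):

from dateutil import parser as date_parser

raw_line = "2021-04-27T09:30:00.123456Z starting sleep..."
timestamp_text, _, message = raw_line.partition(" ")
timestamp = date_parser.parse(timestamp_text)  # timezone-aware datetime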

def emit_data(self, data):
@@ -271,7 +303,7 @@ def __init__(
)

def parse_data(self, data):
""" Override data parse """
"""Override data parse"""
rslt = json.loads(data)
prased = {}
for grp in rslt.get("groups", []):
82 changes: 57 additions & 25 deletions airflow_kubernetes_job_operator/kube_api/watchers.py
@@ -197,14 +197,25 @@ def emit_log(self, data):
self.emit(self.pod_log_event_name, data)

@thread_synchronized
def _create_pod_log_reader(self, uid: str, name: str, namespace: str, follow=True):
def _create_pod_log_reader(
self,
logger_id: str,
name: str,
namespace: str,
container: str = None,
follow=True,
is_single=False,
):
read_logs = GetPodLogs(
name=name,
namespace=namespace,
since=self.pod_log_since,
follow=follow,
container=container,
add_container_name_to_log=not is_single,
)
self._executing_pod_loggers[uid] = read_logs

self._executing_pod_loggers[logger_id] = read_logs
return read_logs

def process_data_state(self, data: dict, client: KubeApiRestClient):
@@ -226,42 +237,62 @@ def process_data_state(self, data: dict, client: KubeApiRestClient):
if state.deleted:
del self._object_states[uid]

if self.watch_pod_logs and kind == "pod" and uid not in self._executing_pod_loggers:
if self.watch_pod_logs and kind == "pod":
name = data["metadata"]["name"]
namespace = data["metadata"]["namespace"]
pod_status = data["status"]["phase"]

if pod_status != "Pending":
osw = self._object_states.get(uid)
read_logs = self._create_pod_log_reader(
uid=uid,
name=name,
namespace=namespace,
)
containers = data["spec"]["containers"]
is_single = len(containers) < 2
for container in containers:
if not isinstance(container, dict):
continue

osw.emit(self.pod_logs_reader_started_event_name)
container_name = container.get("name", None)

def handle_error(sender, *args):
# Don't throw error if not running.
if not self.is_running:
return
assert isinstance(container_name, str) and len(container_name.strip()) > 0, KubeApiException(
"Invalid container name when reading logs"
)

if len(args) == 0:
self.emit_error(KubeApiException("Unknown error from sender", sender))
else:
self.emit_error(args[0])
logger_id = f"{uid}/{container_name}"

# binding only relevant events.
read_logs.on(read_logs.data_event_name, lambda line: self.emit_log(line))
read_logs.on(read_logs.error_event_name, handle_error)
client.query_async(read_logs)
if logger_id in self._executing_pod_loggers:
continue

osw = self._object_states.get(uid)
read_logs = self._create_pod_log_reader(
logger_id=logger_id,
name=name,
namespace=namespace,
container=container_name,
is_single=is_single,
)

osw.emit(self.pod_logs_reader_started_event_name, container=container_name)

def handle_error(sender, *args):
# Don't throw error if not running.
if not self.is_running:
return

if len(args) == 0:
self.emit_error(KubeApiException("Unknown error from sender", sender))
else:
self.emit_error(args[0])

# binding only relevant events.
read_logs.on(read_logs.data_event_name, lambda line: self.emit_log(line))
read_logs.on(read_logs.error_event_name, handle_error)
client.query_async(read_logs)
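
Each container now gets its own GetPodLogs reader, keyed by pod uid plus container name so repeated watch events never start a duplicate reader. A toy illustration of the keying (values hypothetical):

uid = "7c9e6679-7425-40de-944b-e07fc1f90ae7"  # hypothetical pod uid
containers = [{"name": "container1"}, {"name": "container2"}]

executing_pod_loggers = {}
for container in containers:
    logger_id = f"{uid}/{container['name']}"
    if logger_id in executing_pod_loggers:
        continue  # this container's logs are already being read
    executing_pod_loggers[logger_id] = object()  # stands in for a GetPodLogs reader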

def _stop_all_loggers(
self,
timeout: float = None,
throw_error_if_not_running: bool = None,
):
for q in list(self._executing_pod_loggers.values()):
q.stop(timeout=timeout, throw_error_if_not_running=throw_error_if_not_running)
for pod_logger in list(self._executing_pod_loggers.values()):
pod_logger.stop(timeout=timeout, throw_error_if_not_running=throw_error_if_not_running)

def stop(
self,
@@ -294,7 +325,8 @@ def log_event(self, logger: Logger, ev: Event):
line.log(logger)
elif ev.name == self.pod_logs_reader_started_event_name:
osw: NamespaceWatchQueryResourceState = ev.sender
logger.info(f"[{osw.namespace}/{osw.kind_name.lower()}s/{osw.name}] Reading logs")
container_name = ev.kwargs.get("container", "[unknown container name]")
logger.info(f"[{osw.namespace}/{osw.kind_name.lower()}s/{osw.name}] Reading logs from {container_name}")

def pipe_to_logger(self, logger: Logger = kube_logger, allowed_event_names=None) -> int:
allowed_event_names = set(
41 changes: 41 additions & 0 deletions tests/dags/templates/test_multi_container_pod.yaml
@@ -0,0 +1,41 @@
apiVersion: v1
kind: Pod
metadata:
name: 'multi-container-test'
labels:
app: 'multi-container-test'
spec:
restartPolicy: Never
containers:
- name: container1
image: 'alpine:latest'
command:
- sh
- -c
- |
echo starting sleep...
sleep 10
echo end
resources:
limits:
cpu: 200m
memory: 500Mi
requests:
cpu: 100m
memory: 200Mi
- name: container2
image: 'alpine:latest'
command:
- sh
- -c
- |
echo starting sleep...
sleep 10
echo end
resources:
limits:
cpu: 200m
memory: 500Mi
requests:
cpu: 100m
memory: 200Mi
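
A hedged sketch of how a test DAG might exercise this template, following the pattern of the other files under tests/dags; body_filepath and the exact wiring are assumptions, not part of this diff:

from utils import default_args, name_from_file
from airflow import DAG
from airflow_kubernetes_job_operator.kubernetes_job_operator import KubernetesJobOperator

dag = DAG(
    name_from_file(__file__),
    default_args=default_args,
    description="Test multi container pod logging",
    schedule_interval=None,
)

KubernetesJobOperator(
    task_id="multi-container-pod",
    body_filepath="./templates/test_multi_container_pod.yaml",  # assumed parameter name
    dag=dag,
)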
4 changes: 2 additions & 2 deletions tests/dags/test_double_log.py
@@ -1,11 +1,11 @@
import os
from utils import default_args, resolve_file
from utils import default_args, name_from_file
from airflow import DAG
from airflow_kubernetes_job_operator.kubernetes_job_operator import KubernetesJobOperator


dag = DAG(
"kub-job-op-custom",
name_from_file(__file__),
default_args=default_args,
description="Test base job operator",
schedule_interval=None,
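The DAG id is now derived from the file name via the shared name_from_file helper in tests/dags/utils.py. That helper is not shown in this diff; a plausible sketch (hypothetical implementation):

import os

def name_from_file(fpath: str) -> str:
    # e.g. ".../test_double_log.py" -> "test-double-log"
    return os.path.splitext(os.path.basename(fpath))[0].replace("_", "-")
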
4 changes: 2 additions & 2 deletions tests/dags/test_job_operator.py
@@ -1,10 +1,10 @@
from utils import default_args
from utils import default_args, name_from_file
from datetime import timedelta
from airflow import DAG
from airflow_kubernetes_job_operator.kubernetes_job_operator import KubernetesJobOperator

dag = DAG(
"kub-job-op",
name_from_file(__file__),
default_args=default_args,
description="Test base job operator",
schedule_interval=None,
20 changes: 0 additions & 20 deletions tests/dags/test_job_operator_config_file.py

This file was deleted.

39 changes: 0 additions & 39 deletions tests/dags/test_job_operator_custom.py

This file was deleted.
