Skip to content

Commit

Permalink
*: fix mismatched task names for discovery, make output service conne…
Browse files Browse the repository at this point in the history
…ction task monitored

Signed-off-by: Jens Langhammer <jens@goauthentik.io>
  • Loading branch information
BeryJu committed Mar 15, 2023
1 parent 109f06c commit 8c1e25b
Show file tree
Hide file tree
Showing 13 changed files with 41 additions and 35 deletions.
6 changes: 3 additions & 3 deletions authentik/blueprints/apps.py
Expand Up @@ -55,11 +55,11 @@ def reconcile_load_blueprints_v1_tasks(self):
"""Load v1 tasks"""
self.import_module("authentik.blueprints.v1.tasks")

def reconcile_blueprints_discover(self):
def reconcile_blueprints_discovery(self):
"""Run blueprint discovery"""
from authentik.blueprints.v1.tasks import blueprints_discover, clear_failed_blueprints
from authentik.blueprints.v1.tasks import blueprints_discovery, clear_failed_blueprints

blueprints_discover.delay()
blueprints_discovery.delay()
clear_failed_blueprints.delay()

def import_models(self):
Expand Down
2 changes: 1 addition & 1 deletion authentik/blueprints/settings.py
Expand Up @@ -5,7 +5,7 @@

CELERY_BEAT_SCHEDULE = {
"blueprints_v1_discover": {
"task": "authentik.blueprints.v1.tasks.blueprints_discover",
"task": "authentik.blueprints.v1.tasks.blueprints_discovery",
"schedule": crontab(minute=fqdn_rand("blueprints_v1_discover"), hour="*"),
"options": {"queue": "authentik_scheduled"},
},
Expand Down
8 changes: 4 additions & 4 deletions authentik/blueprints/tests/test_v1_tasks.py
Expand Up @@ -6,7 +6,7 @@
from yaml import dump

from authentik.blueprints.models import BlueprintInstance, BlueprintInstanceStatus
from authentik.blueprints.v1.tasks import apply_blueprint, blueprints_discover, blueprints_find
from authentik.blueprints.v1.tasks import apply_blueprint, blueprints_discovery, blueprints_find
from authentik.lib.config import CONFIG
from authentik.lib.generators import generate_id

Expand Down Expand Up @@ -53,7 +53,7 @@ def test_valid(self):
file.seek(0)
file_hash = sha512(file.read().encode()).hexdigest()
file.flush()
blueprints_discover() # pylint: disable=no-value-for-parameter
blueprints_discovery() # pylint: disable=no-value-for-parameter
instance = BlueprintInstance.objects.filter(name=blueprint_id).first()
self.assertEqual(instance.last_applied_hash, file_hash)
self.assertEqual(
Expand Down Expand Up @@ -81,7 +81,7 @@ def test_valid_updated(self):
)
)
file.flush()
blueprints_discover() # pylint: disable=no-value-for-parameter
blueprints_discovery() # pylint: disable=no-value-for-parameter
blueprint = BlueprintInstance.objects.filter(name="foo").first()
self.assertEqual(
blueprint.last_applied_hash,
Expand All @@ -106,7 +106,7 @@ def test_valid_updated(self):
)
)
file.flush()
blueprints_discover() # pylint: disable=no-value-for-parameter
blueprints_discovery() # pylint: disable=no-value-for-parameter
blueprint.refresh_from_db()
self.assertEqual(
blueprint.last_applied_hash,
Expand Down
4 changes: 2 additions & 2 deletions authentik/blueprints/v1/tasks.py
Expand Up @@ -76,7 +76,7 @@ def on_any_event(self, event: FileSystemEvent):
return
if isinstance(event, FileCreatedEvent):
LOGGER.debug("new blueprint file created, starting discovery")
blueprints_discover.delay()
blueprints_discovery.delay()
if isinstance(event, FileModifiedEvent):
path = Path(event.src_path)
root = Path(CONFIG.y("blueprints_dir")).absolute()
Expand Down Expand Up @@ -134,7 +134,7 @@ def blueprints_find():
throws=(DatabaseError, ProgrammingError, InternalError), base=MonitoredTask, bind=True
)
@prefill_task
def blueprints_discover(self: MonitoredTask):
def blueprints_discovery(self: MonitoredTask):
"""Find blueprints and check if they need to be created in the database"""
count = 0
for blueprint in blueprints_find():
Expand Down
2 changes: 1 addition & 1 deletion authentik/events/monitored_tasks.py
Expand Up @@ -41,7 +41,7 @@ class TaskResult:

def with_error(self, exc: Exception) -> "TaskResult":
"""Since errors might not always be pickle-able, set the traceback"""
self.messages.append(str(exc))
self.messages.append(exception_to_string(exc))
return self


Expand Down
6 changes: 3 additions & 3 deletions authentik/outposts/settings.py
Expand Up @@ -19,9 +19,9 @@
"schedule": crontab(minute=fqdn_rand("outpost_token_ensurer"), hour="*/8"),
"options": {"queue": "authentik_scheduled"},
},
"outpost_local_connection": {
"task": "authentik.outposts.tasks.outpost_local_connection",
"schedule": crontab(minute=fqdn_rand("outpost_local_connection"), hour="*/8"),
"outpost_connection_discovery": {
"task": "authentik.outposts.tasks.outpost_connection_discovery",
"schedule": crontab(minute=fqdn_rand("outpost_connection_discovery"), hour="*/8"),
"options": {"queue": "authentik_scheduled"},
},
}
24 changes: 15 additions & 9 deletions authentik/outposts/tasks.py
Expand Up @@ -236,28 +236,33 @@ def _outpost_single_update(outpost: Outpost):
async_to_sync(closing_send)(channel, {"type": "event.update"})


@CELERY_APP.task()
def outpost_local_connection():
@CELERY_APP.task(
base=MonitoredTask,
bind=True,
)
def outpost_connection_discovery(self: MonitoredTask):
"""Checks the local environment and create Service connections."""
status = TaskResult(TaskResultStatus.SUCCESSFUL)
if not CONFIG.y_bool("outposts.discover"):
LOGGER.info("Outpost integration discovery is disabled")
status.messages.append("Outpost integration discovery is disabled")
self.set_status(status)
return
# Explicitly check against token filename, as that's
# only present when the integration is enabled
if Path(SERVICE_TOKEN_FILENAME).exists():
LOGGER.info("Detected in-cluster Kubernetes Config")
status.messages.append("Detected in-cluster Kubernetes Config")
if not KubernetesServiceConnection.objects.filter(local=True).exists():
LOGGER.debug("Created Service Connection for in-cluster")
status.messages.append("Created Service Connection for in-cluster")
KubernetesServiceConnection.objects.create(
name="Local Kubernetes Cluster", local=True, kubeconfig={}
)
# For development, check for the existence of a kubeconfig file
kubeconfig_path = Path(KUBE_CONFIG_DEFAULT_LOCATION).expanduser()
if kubeconfig_path.exists():
LOGGER.info("Detected kubeconfig")
status.messages.append("Detected kubeconfig")
kubeconfig_local_name = f"k8s-{gethostname()}"
if not KubernetesServiceConnection.objects.filter(name=kubeconfig_local_name).exists():
LOGGER.debug("Creating kubeconfig Service Connection")
status.messages.append("Creating kubeconfig Service Connection")
with kubeconfig_path.open("r", encoding="utf8") as _kubeconfig:
KubernetesServiceConnection.objects.create(
name=kubeconfig_local_name,
Expand All @@ -266,11 +271,12 @@ def outpost_local_connection():
unix_socket_path = urlparse(DEFAULT_UNIX_SOCKET).path
socket = Path(unix_socket_path)
if socket.exists() and access(socket, R_OK):
LOGGER.info("Detected local docker socket")
status.messages.append("Detected local docker socket")
if len(DockerServiceConnection.objects.filter(local=True)) == 0:
LOGGER.debug("Created Service Connection for docker")
status.messages.append("Created Service Connection for docker")
DockerServiceConnection.objects.create(
name="Local Docker connection",
local=True,
url=unix_socket_path,
)
self.set_status(status)
4 changes: 2 additions & 2 deletions authentik/root/celery.py
Expand Up @@ -73,12 +73,12 @@ def task_error_hook(task_id, exception: Exception, traceback, *args, **kwargs):
def _get_startup_tasks() -> list[Callable]:
"""Get all tasks to be run on startup"""
from authentik.admin.tasks import clear_update_notifications
from authentik.outposts.tasks import outpost_controller_all, outpost_local_connection
from authentik.outposts.tasks import outpost_controller_all, outpost_connection_discovery
from authentik.providers.proxy.tasks import proxy_set_defaults

return [
clear_update_notifications,
outpost_local_connection,
outpost_connection_discovery,
outpost_controller_all,
proxy_set_defaults,
]
Expand Down
4 changes: 2 additions & 2 deletions tests/e2e/test_provider_proxy.py
Expand Up @@ -17,7 +17,7 @@
from authentik.flows.models import Flow
from authentik.lib.generators import generate_id
from authentik.outposts.models import DockerServiceConnection, Outpost, OutpostConfig, OutpostType
from authentik.outposts.tasks import outpost_local_connection
from authentik.outposts.tasks import outpost_connection_discovery
from authentik.providers.proxy.models import ProxyProvider
from tests.e2e.utils import SeleniumTestCase, retry

Expand Down Expand Up @@ -210,7 +210,7 @@ class TestProviderProxyConnect(ChannelsLiveServerTestCase):
@reconcile_app("authentik_crypto")
def test_proxy_connectivity(self):
"""Test proxy connectivity over websocket"""
outpost_local_connection()
outpost_connection_discovery()
proxy: ProxyProvider = ProxyProvider.objects.create(
name="proxy_provider",
authorization_flow=Flow.objects.get(
Expand Down
4 changes: 2 additions & 2 deletions tests/integration/test_outpost_docker.py
Expand Up @@ -19,7 +19,7 @@
OutpostType,
default_outpost_config,
)
from authentik.outposts.tasks import outpost_local_connection
from authentik.outposts.tasks import outpost_connection_discovery
from authentik.providers.proxy.models import ProxyProvider
from tests.e2e.utils import get_docker_tag

Expand Down Expand Up @@ -58,7 +58,7 @@ def setUp(self):
self.ssl_folder = mkdtemp()
self.container = self._start_container(self.ssl_folder)
# Ensure that local connection have been created
outpost_local_connection()
outpost_connection_discovery()
self.provider: ProxyProvider = ProxyProvider.objects.create(
name="test",
internal_host="http://localhost",
Expand Down
4 changes: 2 additions & 2 deletions tests/integration/test_outpost_kubernetes.py
Expand Up @@ -10,7 +10,7 @@
from authentik.outposts.controllers.k8s.deployment import DeploymentReconciler
from authentik.outposts.controllers.k8s.triggers import NeedsUpdate
from authentik.outposts.models import KubernetesServiceConnection, Outpost, OutpostType
from authentik.outposts.tasks import outpost_local_connection
from authentik.outposts.tasks import outpost_connection_discovery
from authentik.providers.proxy.controllers.kubernetes import ProxyKubernetesController
from authentik.providers.proxy.models import ProxyProvider

Expand All @@ -21,7 +21,7 @@ class OutpostKubernetesTests(TestCase):
def setUp(self):
super().setUp()
# Ensure that local connection have been created
outpost_local_connection()
outpost_connection_discovery()
self.provider: ProxyProvider = ProxyProvider.objects.create(
name="test",
internal_host="http://localhost",
Expand Down
4 changes: 2 additions & 2 deletions tests/integration/test_proxy_docker.py
Expand Up @@ -18,7 +18,7 @@
OutpostType,
default_outpost_config,
)
from authentik.outposts.tasks import outpost_local_connection
from authentik.outposts.tasks import outpost_connection_discovery
from authentik.providers.proxy.controllers.docker import DockerController
from authentik.providers.proxy.models import ProxyProvider
from tests.e2e.utils import get_docker_tag
Expand Down Expand Up @@ -58,7 +58,7 @@ def setUp(self):
self.ssl_folder = mkdtemp()
self.container = self._start_container(self.ssl_folder)
# Ensure that local connection have been created
outpost_local_connection()
outpost_connection_discovery()
self.provider: ProxyProvider = ProxyProvider.objects.create(
name="test",
internal_host="http://localhost",
Expand Down
4 changes: 2 additions & 2 deletions tests/integration/test_proxy_kubernetes.py
Expand Up @@ -8,7 +8,7 @@
from authentik.core.tests.utils import create_test_flow
from authentik.outposts.controllers.kubernetes import KubernetesController
from authentik.outposts.models import KubernetesServiceConnection, Outpost, OutpostType
from authentik.outposts.tasks import outpost_local_connection
from authentik.outposts.tasks import outpost_connection_discovery
from authentik.providers.proxy.controllers.k8s.ingress import IngressReconciler
from authentik.providers.proxy.controllers.kubernetes import ProxyKubernetesController
from authentik.providers.proxy.models import ProxyMode, ProxyProvider
Expand All @@ -23,7 +23,7 @@ class TestProxyKubernetes(TestCase):

def setUp(self):
# Ensure that local connection have been created
outpost_local_connection()
outpost_connection_discovery()
self.controller = None

def tearDown(self) -> None:
Expand Down

0 comments on commit 8c1e25b

Please sign in to comment.