From 9781bd831b58c45da26cddc9846005db722fa310 Mon Sep 17 00:00:00 2001 From: Girum Bizuayehu Date: Mon, 13 Oct 2025 14:09:53 +0300 Subject: [PATCH 1/6] Add DagsterWebSocketProxyConsumer and update the routing to support websocket connections see HEA-752 --- apps/common/consumers.py | 104 +++++++++++++++++++++++++++++++++++++++ apps/common/routing.py | 8 +++ docker/app/run_django.sh | 5 +- hea/asgi.py | 19 ++++++- hea/settings/base.py | 4 ++ requirements/base.txt | 2 + 6 files changed, 139 insertions(+), 3 deletions(-) create mode 100644 apps/common/consumers.py create mode 100644 apps/common/routing.py diff --git a/apps/common/consumers.py b/apps/common/consumers.py new file mode 100644 index 00000000..26f978ed --- /dev/null +++ b/apps/common/consumers.py @@ -0,0 +1,104 @@ +import asyncio +import logging +import os + +import websockets +from channels.exceptions import DenyConnection +from channels.generic.websocket import AsyncWebsocketConsumer +from django.contrib.auth.models import AnonymousUser + +logger = logging.getLogger(__name__) + + +class DagsterWebSocketProxyConsumer(AsyncWebsocketConsumer): + + async def connect(self): + logger.info(f"WebSocket connection attempt: {self.scope['path']}") + + # Authentication check + if isinstance(self.scope["user"], AnonymousUser): + logger.error("Authentication required") + raise DenyConnection("Authentication required") + + if not self.scope["user"].has_perm("common.access_dagster_ui"): + logger.error(f"User {self.scope['user'].username} lacks permission") + raise DenyConnection("Permission denied") + + logger.info(f"User {self.scope['user'].username} authenticated") + + # Build upstream URL + dagster_url = os.environ.get("DAGSTER_WEBSERVER_URL", "http://localhost:3000") + dagster_prefix = os.environ.get("DAGSTER_WEBSERVER_PREFIX", "") + + path = self.scope["path"] + if path.startswith("/pipelines/"): + path = path[len("/pipelines/") :] + + # Convert http to ws + if dagster_url.startswith("https://"): + ws_url = dagster_url.replace("https://", "wss://", 1) + else: + ws_url = dagster_url.replace("http://", "ws://", 1) + + # Build target URL + if dagster_prefix: + target_url = f"{ws_url}/{dagster_prefix}/{path}" + else: + target_url = f"{ws_url}/{path}" + + # Add query string + if self.scope.get("query_string"): + target_url += f"?{self.scope['query_string'].decode()}" + + logger.info(f"Connecting to upstream: {target_url}") + + # Get subprotocols from client + subprotocols = self.scope.get("subprotocols", []) + + try: + self.websocket = await websockets.connect( + target_url, + max_size=2097152, + ping_interval=20, + subprotocols=subprotocols if subprotocols else None, + ) + logger.info("Connected to upstream") + except Exception as e: + logger.error(f"Failed to connect: {e}") + raise DenyConnection(f"Connection to upstream failed: {e}") + + await self.accept(self.websocket.subprotocol) + logger.info(f"Client accepted with subprotocol: {self.websocket.subprotocol}") + + self.consumer_task = asyncio.create_task(self.consume_from_upstream()) + + async def disconnect(self, close_code): + logger.info(f"Disconnecting with code {close_code}") + if hasattr(self, "consumer_task"): + self.consumer_task.cancel() + try: + await self.consumer_task + except asyncio.CancelledError: + pass + if hasattr(self, "websocket"): + await self.websocket.close() + + async def receive(self, text_data=None, bytes_data=None): + try: + await self.websocket.send(bytes_data or text_data) + except Exception as e: + logger.error(f"Error forwarding to upstream: {e}") + await self.close() + + async def consume_from_upstream(self): + try: + async for message in self.websocket: + if isinstance(message, bytes): + await self.send(bytes_data=message) + else: + await self.send(text_data=message) + except asyncio.CancelledError: + pass + except Exception as e: + logger.error(f"Error consuming from upstream: {e}") + await self.close() diff --git a/apps/common/routing.py b/apps/common/routing.py new file mode 100644 index 00000000..84eee935 --- /dev/null +++ b/apps/common/routing.py @@ -0,0 +1,8 @@ +from django.urls import re_path + +from common.consumers import DagsterWebSocketProxyConsumer + +websocket_urlpatterns = [ + # Route WebSocket connections for Dagster proxy + re_path(r"^pipelines/(?P.*)$", DagsterWebSocketProxyConsumer.as_asgi()), +] diff --git a/docker/app/run_django.sh b/docker/app/run_django.sh index 73112fa1..7fb81e47 100755 --- a/docker/app/run_django.sh +++ b/docker/app/run_django.sh @@ -40,7 +40,8 @@ touch log/django_sql.log chown -R django:django log/* echo Starting Gunicorn with DJANGO_SETTINGS_MODULE=${DJANGO_SETTINGS_MODULE} -gosu django gunicorn ${APP}.wsgi:application \ +gosu django gunicorn ${APP}.asgi:application \ --name ${APP}${ENV} \ + --worker-class uvicorn.workers.UvicornWorker \ --config $(dirname $(readlink -f "$0"))/gunicorn_config.py \ - $* 2>&1 + $* 2>&1 \ No newline at end of file diff --git a/hea/asgi.py b/hea/asgi.py index 38778e6c..80ec0886 100644 --- a/hea/asgi.py +++ b/hea/asgi.py @@ -9,8 +9,25 @@ import os +from channels.auth import AuthMiddlewareStack +from channels.routing import ProtocolTypeRouter, URLRouter +from channels.security.websocket import AllowedHostsOriginValidator from django.core.asgi import get_asgi_application os.environ.setdefault("DJANGO_SETTINGS_MODULE", "hea.settings") -application = get_asgi_application() +django_asgi_app = get_asgi_application() + + +django_asgi_app = get_asgi_application() + +# Import routing after Django setup +from common.routing import websocket_urlpatterns # noqa: E402 + +application = ProtocolTypeRouter( + { + "http": django_asgi_app, + # WebSocket requests handled by Channels consumers + "websocket": AllowedHostsOriginValidator(AuthMiddlewareStack(URLRouter(websocket_urlpatterns))), + } +) diff --git a/hea/settings/base.py b/hea/settings/base.py index 630b8b1d..b983c8b7 100644 --- a/hea/settings/base.py +++ b/hea/settings/base.py @@ -109,6 +109,7 @@ "rest_framework_gis", "revproxy", "corsheaders", + "channels", ] PROJECT_APPS = ["common", "metadata", "baseline"] INSTALLED_APPS = EXTERNAL_APPS + PROJECT_APPS @@ -155,6 +156,9 @@ "SEARCH_PARAM": "search", } +ASGI_APPLICATION = "hea.asgi.application" +CHANNEL_LAYERS = {"default": {"BACKEND": "channels.layers.InMemoryChannelLayer"}} +WEBSOCKET_ACCEPT_ALL = False # Require authentication ########## CORS CONFIGURATION # See: https://github.com/ottoyiu/django-cors-headers diff --git a/requirements/base.txt b/requirements/base.txt index 712e6b86..7a5a82f1 100644 --- a/requirements/base.txt +++ b/requirements/base.txt @@ -1,4 +1,5 @@ # Do not pip freeze into this file. Add only the packages you need so pip can align dependencies. +channels==4.3.1 dagster==1.11.7 dagster_aws==0.27.7 dagster-pipes==1.11.7 @@ -46,6 +47,7 @@ sqlparse==0.5.0 tabulate==0.9.0 # Need universal-pathlib > 0.2.0 for gdrivefs support universal-pathlib==0.2.1 +uvicorn==0.37.0 whitenoise==6.4.0 xlrd==2.0.1 xlutils==2.0.0 From dffc109f3e567f514e6476cafc7f2037800b6e80 Mon Sep 17 00:00:00 2001 From: Girum Bizuayehu Date: Mon, 13 Oct 2025 17:12:40 +0300 Subject: [PATCH 2/6] Remove non required settings see HEA-752 --- hea/settings/base.py | 1 - 1 file changed, 1 deletion(-) diff --git a/hea/settings/base.py b/hea/settings/base.py index b983c8b7..70248164 100644 --- a/hea/settings/base.py +++ b/hea/settings/base.py @@ -158,7 +158,6 @@ ASGI_APPLICATION = "hea.asgi.application" CHANNEL_LAYERS = {"default": {"BACKEND": "channels.layers.InMemoryChannelLayer"}} -WEBSOCKET_ACCEPT_ALL = False # Require authentication ########## CORS CONFIGURATION # See: https://github.com/ottoyiu/django-cors-headers From bd69551440b50e14bd8be1b84fad1c2a73d264e7 Mon Sep 17 00:00:00 2001 From: Girum Bizuayehu Date: Tue, 14 Oct 2025 11:31:17 +0300 Subject: [PATCH 3/6] Add log supress entry for removing chatty websocket ping, pong logs see HEA-752 --- hea/asgi.py | 3 --- hea/settings/base.py | 4 ++-- 2 files changed, 2 insertions(+), 5 deletions(-) diff --git a/hea/asgi.py b/hea/asgi.py index 80ec0886..dd48af32 100644 --- a/hea/asgi.py +++ b/hea/asgi.py @@ -16,9 +16,6 @@ os.environ.setdefault("DJANGO_SETTINGS_MODULE", "hea.settings") -django_asgi_app = get_asgi_application() - - django_asgi_app = get_asgi_application() # Import routing after Django setup diff --git a/hea/settings/base.py b/hea/settings/base.py index 70248164..227d2c07 100644 --- a/hea/settings/base.py +++ b/hea/settings/base.py @@ -283,11 +283,11 @@ "django.security": {"handlers": ["console", "logfile"], "level": "ERROR", "propagate": False}, "factory": {"handlers": ["console", "logfile"], "level": "INFO"}, "faker": {"handlers": ["console", "logfile"], "level": "INFO"}, - "luigi": {"level": "INFO"}, - "luigi-interface": {"level": "INFO"}, "urllib3": {"handlers": ["console", "logfile"], "level": "INFO", "propagate": False}, "common.models": {"handlers": ["console", "logfile"], "level": "INFO", "propagate": False}, "common.signals": {"handlers": ["console", "logfile"], "level": "INFO", "propagate": False}, + "uvicorn.error": {"handlers": ["console"], "level": "WARNING", "propagate": False}, + "uvicorn.access": {"handlers": ["console"], "level": "WARNING", "propagate": False}, }, # Keep root at DEBUG and use the `level` on the handler to control logging output, # so that additional handlers can be used to get additional detail, e.g. `common.resources.LoggingResourceMixin` From 466a6035e075e6f26738a7363fa9a1cb123e2be9 Mon Sep 17 00:00:00 2001 From: Girum Bizuayehu Date: Tue, 14 Oct 2025 11:34:02 +0300 Subject: [PATCH 4/6] Override common app ready to patch urllib3 pool see HEA-752 --- apps/common/apps.py | 19 +++++++++++++++++++ 1 file changed, 19 insertions(+) diff --git a/apps/common/apps.py b/apps/common/apps.py index d2529c29..ef2e8f99 100644 --- a/apps/common/apps.py +++ b/apps/common/apps.py @@ -6,3 +6,22 @@ class CommonConfig(AppConfig): default_auto_field = "django.db.models.BigAutoField" name = "common" verbose_name = _("common") + + def ready(self): + """Patch urllib3 connection pool size on startup""" + from urllib3 import HTTPConnectionPool + + original_init = HTTPConnectionPool.__init__ + + def patched_init(self, *args, **kwargs): + # Force larger pool size + kwargs["maxsize"] = kwargs.get("maxsize", 50) + if kwargs["maxsize"] < 50: + kwargs["maxsize"] = 50 + + original_init(self, *args, **kwargs) + + # Apply patch + HTTPConnectionPool.__init__ = patched_init + + print("urllib3 connection pool patched: maxsize=50") From e5cef4c9a4d444659c437d547e404c42818e7ebf Mon Sep 17 00:00:00 2001 From: Girum Bizuayehu Date: Thu, 16 Oct 2025 00:51:27 +0300 Subject: [PATCH 5/6] Add logging filters, apply the filters and address PR feedback see HEA-752 --- apps/common/consumers.py | 2 +- apps/common/logging_filters.py | 43 ++++++++++++++++++++++++++++++++++ hea/settings/base.py | 43 ++++++++++++++++++++++++++++++++-- 3 files changed, 85 insertions(+), 3 deletions(-) create mode 100644 apps/common/logging_filters.py diff --git a/apps/common/consumers.py b/apps/common/consumers.py index 26f978ed..5ac78e1c 100644 --- a/apps/common/consumers.py +++ b/apps/common/consumers.py @@ -58,7 +58,7 @@ async def connect(self): try: self.websocket = await websockets.connect( target_url, - max_size=2097152, + max_size=10485760, ping_interval=20, subprotocols=subprotocols if subprotocols else None, ) diff --git a/apps/common/logging_filters.py b/apps/common/logging_filters.py new file mode 100644 index 00000000..66ce1540 --- /dev/null +++ b/apps/common/logging_filters.py @@ -0,0 +1,43 @@ +import logging + + +class SuppressWebSocketPings(logging.Filter): + + def filter(self, record): + suppress_phrases = [ + "sending keepalive ping", + "received keepalive pong", + "> PING", + "< PONG", + "% sending keepalive", + "% received keepalive", + "ASGI 'lifespan' protocol appears unsupported.", + ] + + message = record.getMessage() + + for phrase in suppress_phrases: + if phrase in message: + return False # Don't log this message + + return True + + +class SuppressRevProxyNoise(logging.Filter): + + def filter(self, record): + # Suppress these RevProxy messages + suppress_phrases = [ + "ProxyView created", + "Normalizing response headers", + "Checking for invalid cookies", + "Starting streaming HTTP Response", + ] + + message = record.getMessage() + + for phrase in suppress_phrases: + if phrase in message: + return False + + return True diff --git a/hea/settings/base.py b/hea/settings/base.py index 265afb53..8d87e96d 100644 --- a/hea/settings/base.py +++ b/hea/settings/base.py @@ -253,6 +253,12 @@ }, "filters": { "require_debug_false": {"()": "django.utils.log.RequireDebugFalse"}, + "suppress_ws_pings": { + "()": "common.logging_filters.SuppressWebSocketPings", + }, + "suppress_revproxy_noise": { + "()": "common.logging_filters.SuppressRevProxyNoise", + }, }, "handlers": { "logfile": { @@ -269,6 +275,7 @@ "stream": sys.stdout, "class": "logging.StreamHandler", "formatter": env.str("LOG_FORMATTER", "standard"), + "filters": ["suppress_ws_pings", "suppress_revproxy_noise"], }, "mail_admins": { "level": "ERROR", @@ -286,8 +293,40 @@ "urllib3": {"handlers": ["console", "logfile"], "level": "INFO", "propagate": False}, "common.models": {"handlers": ["console", "logfile"], "level": "INFO", "propagate": False}, "common.signals": {"handlers": ["console", "logfile"], "level": "INFO", "propagate": False}, - "uvicorn.error": {"handlers": ["console"], "level": "WARNING", "propagate": False}, - "uvicorn.access": {"handlers": ["console"], "level": "WARNING", "propagate": False}, + "uvicorn": { + "handlers": ["console"], + "level": "INFO", + "propagate": False, + }, + "uvicorn.error": { + "handlers": ["console"], + "level": "DEBUG", + "propagate": False, + "filters": ["suppress_ws_pings"], + }, + "uvicorn.access": { + "handlers": ["console"], + "level": "INFO", + "propagate": False, + }, + "revproxy": { + "handlers": ["console"], + "level": "INFO", + "propagate": False, + "filters": ["suppress_revproxy_noise"], + }, + "revproxy.view": { + "handlers": ["console"], + "level": "INFO", + "propagate": False, + "filters": ["suppress_revproxy_noise"], + }, + "revproxy.response": { + "handlers": ["console"], + "level": "INFO", + "propagate": False, + "filters": ["suppress_revproxy_noise"], + }, }, # Keep root at DEBUG and use the `level` on the handler to control logging output, # so that additional handlers can be used to get additional detail, e.g. `common.resources.LoggingResourceMixin` From a7c4022d5ec1165d0012a83a51860a603427352c Mon Sep 17 00:00:00 2001 From: Girum Bizuayehu Date: Fri, 17 Oct 2025 13:51:44 +0300 Subject: [PATCH 6/6] Address PR feedback see HEA-752 --- apps/common/apps.py | 19 ------------------- 1 file changed, 19 deletions(-) diff --git a/apps/common/apps.py b/apps/common/apps.py index ef2e8f99..d2529c29 100644 --- a/apps/common/apps.py +++ b/apps/common/apps.py @@ -6,22 +6,3 @@ class CommonConfig(AppConfig): default_auto_field = "django.db.models.BigAutoField" name = "common" verbose_name = _("common") - - def ready(self): - """Patch urllib3 connection pool size on startup""" - from urllib3 import HTTPConnectionPool - - original_init = HTTPConnectionPool.__init__ - - def patched_init(self, *args, **kwargs): - # Force larger pool size - kwargs["maxsize"] = kwargs.get("maxsize", 50) - if kwargs["maxsize"] < 50: - kwargs["maxsize"] = 50 - - original_init(self, *args, **kwargs) - - # Apply patch - HTTPConnectionPool.__init__ = patched_init - - print("urllib3 connection pool patched: maxsize=50")