Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

RD-WATCH/Smartflow Integration #412

Merged
merged 22 commits into from
May 31, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
22 commits
Select commit Hold shift + click to select a range
f02f82b
Add airflow dev container to docker-compose
mvandenburgh May 3, 2024
820e869
Install `apache-airflow-client`
mvandenburgh May 3, 2024
688ed5f
Add settings value for smartflow url
mvandenburgh Apr 30, 2024
dc8d779
Add smartflow healthcheck to server status endpoint
mvandenburgh May 3, 2024
27286fb
Fix smartflow auth, move to its own app
mvandenburgh May 29, 2024
5bb13cd
Add endpoint for starting a DAG run
mvandenburgh May 29, 2024
1bb462e
Add get/list endpoints for DAG runs
mvandenburgh May 29, 2024
9bf5665
wip smartflow UI
mvandenburgh May 21, 2024
d72aaef
Generate openapi client from new smartflow endpoints
mvandenburgh May 29, 2024
d741347
[wip] hook up smartflow UI to backend
mvandenburgh May 29, 2024
813dc74
Add missing parameter to dag run endpoint
mvandenburgh May 29, 2024
cbb9c47
Add running a new dag to UI
mvandenburgh May 29, 2024
a62bfa8
Fix typescript errors
mvandenburgh May 29, 2024
ed60b73
Add smartflow init file to dockerfile
mvandenburgh May 29, 2024
23c82a9
Add sample DAG to dev environment
mvandenburgh May 29, 2024
e5a1b0b
Require all smartflow env vars to be set to install app
mvandenburgh May 29, 2024
4f56a85
Fix status endpoint
mvandenburgh May 29, 2024
b2de3b7
Fix smartflow url in docker .env file
mvandenburgh May 29, 2024
0748e28
Put script section first in sfc
mvandenburgh May 31, 2024
a84ff45
Add default value for dag run conf
mvandenburgh May 31, 2024
e87305b
Fix healthcheck
mvandenburgh May 31, 2024
e51b451
Pin airflow image
mvandenburgh May 31, 2024
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions Dockerfile
Original file line number Diff line number Diff line change
Expand Up @@ -74,6 +74,8 @@ RUN mkdir /app/django/src \
&& touch /app/django/src/rdwatch/__init__.py \
&& mkdir /app/django/src/rdwatch_scoring \
&& touch /app/django/src/rdwatch_scoring/__init__.py \
&& mkdir /app/django/src/rdwatch_smartflow \
&& touch /app/django/src/rdwatch_smartflow/__init__.py \
&& touch /app/django/README.md \
&& poetry install --only main

Expand All @@ -88,6 +90,8 @@ RUN mkdir /app/django/src \
&& touch /app/django/src/rdwatch/__init__.py \
&& mkdir /app/django/src/rdwatch_scoring \
&& touch /app/django/src/rdwatch_scoring/__init__.py \
&& mkdir /app/django/src/rdwatch_smartflow \
&& touch /app/django/src/rdwatch_smartflow/__init__.py \
&& touch /app/django/README.md \
&& poetry install --with dev
# Copy git metadata to enable display of version information
Expand Down
3 changes: 3 additions & 0 deletions dev/.env.docker-compose
Original file line number Diff line number Diff line change
Expand Up @@ -8,3 +8,6 @@ RDWATCH_MINIO_STORAGE_SECRET_KEY=secretkey
RDWATCH_STORAGE_BUCKET_NAME=rdwatch
AWS_REQUEST_PAYER=requester
RDWATCH_POSTGRESQL_SCORING_URI="postgresql://scoring:secretkey@scoredb:5432/scoring"
RDWATCH_SMARTFLOW_USERNAME="rdwatch"
RDWATCH_SMARTFLOW_PASSWORD="rdwatch"
RDWATCH_SMARTFLOW_URL="http://airflow:8080"
3 changes: 3 additions & 0 deletions dev/.env.docker-compose-native
Original file line number Diff line number Diff line change
Expand Up @@ -11,3 +11,6 @@ RDWATCH_POSTGRESQL_URI="postgresql://rdwatch:secretkey@localhost:5432/rdwatch"
RDWATCH_REDIS_URI="redis://localhost:6379"
RDWATCH_CELERY_BROKER_URL="redis://localhost:6379"
RDWATCH_POSTGRESQL_SCORING_URI="postgresql://scoring:secretkey@localhost:5433/scoring"
RDWATCH_SMARTFLOW_USERNAME="rdwatch"
RDWATCH_SMARTFLOW_PASSWORD="rdwatch"
RDWATCH_SMARTFLOW_URL="http://localhost:8002"
11 changes: 11 additions & 0 deletions dev/airflow.Dockerfile
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
FROM apache/airflow:slim-latest-python3.12

RUN airflow db migrate

RUN airflow users create \
--username rdwatch \
--password rdwatch \
Comment on lines +6 to +7
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

this is hardcoded so in the .env file should we make a note of this? It isn't like other containers where the username/password is parameterized. Or maybe we should use the airflow environment variables?

https://airflow.apache.org/docs/apache-airflow/stable/howto/docker-compose/index.html#environment-variables-supported-by-docker-compose

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I saw this, but I couldn't get it to work so I just gave up and did that command in the docker file. Does it work for you?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

took a little searching but I think you can remove the local airflow DockerFile. These _ based commands are only meant for local development, but I assume your infrastructure code handles all this.
I wouldn't worry about implementation now, just logging this comment for the future.

  airflow:
    image: apache/airflow:slim-latest-python3.12
    volumes:
      - airflow-data:/opt/airflow
      - type: bind
        source: ${PWD}/dev/airflow_sample_dag.py
        target: /opt/airflow/dags/airflow_sample_dag.py
    environment:
      AIRFLOW_HOME: "/opt/airflow/"
      _AIRFLOW_DB_MIGRATE: True
      _AIRFLOW_WWW_USER_CREATE: True
      _AIRFLOW_WWW_USER_USERNAME: "rdwatch"
      _AIRFLOW_WWW_USER_PASSWORD: "rdwatch"
    command: ["airflow", "standalone"]
    ports:
      - 8002:8080
    healthcheck:
      test: ["CMD", "curl", "--fail", "http://localhost:8080/health"]
      interval: 30s
      timeout: 10s
      retries: 5
      start_period: 30s

--firstname rdwatch \
--lastname admin \
--role Admin \
--email admin@localhost
34 changes: 34 additions & 0 deletions dev/airflow_sample_dag.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,34 @@
from datetime import datetime

from airflow import DAG
from airflow.decorators import task
from airflow.models import Param

with DAG(
dag_id="RD-WATCH-AIRFLOW-DEMO-DAG",
description="Test DAG",
params={
"region_id": Param(
default="BR_R002", type="string", pattern=r"^[A-Z]{2}_[RCST]\d{3}$"
),
"model_run_title": Param(default="test_run", type="string"),
},
start_date=datetime(2022, 3, 1),
catchup=False,
schedule_interval=None,
max_active_runs=1,
default_view="grid",
tags=["demo", "watch", "ta2", "rgd", "random"],
) as dag1:

@task
def print_region(**context):
region_id = context["params"]["region_id"]
print(f"region_id: {region_id}")

@task
def print_model_run_title(**context):
model_run_title = context["params"]["model_run_title"]
print(f"model_run_title: {model_run_title}")

print_region() >> print_model_run_title()
49 changes: 48 additions & 1 deletion django/poetry.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

7 changes: 5 additions & 2 deletions django/pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ classifiers = [
packages = [
{ include = "rdwatch", from = "src" },
{ include = "rdwatch_scoring", from = "src"},
{ include = "rdwatch_smartflow", from = "src"},
]

[tool.poetry-dynamic-versioning]
Expand All @@ -30,7 +31,7 @@ vcs = "git"
metadata = true

[tool.poetry-dynamic-versioning.substitution]
files = ["src/rdwatch/__init__.py", "src/rdwatch_scoring/__init__.py"]
files = ["src/rdwatch/__init__.py", "src/rdwatch_scoring/__init__.py", "src/rdwatch_smartflow/__init__.py"]

[tool.poetry.dependencies]
python = ">=3.11.8,<4"
Expand All @@ -57,6 +58,8 @@ pystac-client = "^0.7.6"
more-itertools = "^10.2.0"
torch = {version = "^2.2.2+cpu", source = "pytorch"}
torchvision = {version = "^0.17.2+cpu", source = "pytorch"}
apache-airflow-client = "^2.9.0"
beautifulsoup4 = "^4.12.3"

[tool.poetry.group.dev.dependencies]
django-stubs = "^4.2.7"
Expand Down Expand Up @@ -91,7 +94,7 @@ skip-string-normalization = true

[tool.isort]
profile = "black"
known_first_party = ["rdwatch", "rdwatch_scoring"]
known_first_party = ["rdwatch", "rdwatch_scoring", "rdwatch_smartflow"]
known_django=["django"]
sections=["FUTURE", "STDLIB", "THIRDPARTY", "DJANGO", "FIRSTPARTY", "LOCALFOLDER"]

Expand Down
26 changes: 26 additions & 0 deletions django/src/rdwatch/server/settings.py
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,15 @@ def INSTALLED_APPS(self):
]
if 'RDWATCH_POSTGRESQL_SCORING_URI' in os.environ:
base_applications.append('rdwatch_scoring')
if all(
os.environ.get(key)
for key in [
'RDWATCH_SMARTFLOW_URL',
'RDWATCH_SMARTFLOW_USERNAME',
'RDWATCH_SMARTFLOW_PASSWORD',
]
):
base_applications.append('rdwatch_smartflow')
return base_applications

MIDDLEWARE = [
Expand Down Expand Up @@ -132,6 +141,23 @@ def DATABASES(self):
environ_required=True, environ_prefix=_ENVIRON_PREFIX
)

SMARTFLOW_URL = values.Value(
None,
environ_required=False,
environ_prefix=_ENVIRON_PREFIX,
)

SMARTFLOW_USERNAME = values.Value(
None,
environ_required=False,
environ_prefix=_ENVIRON_PREFIX,
)
SMARTFLOW_PASSWORD = values.Value(
None,
environ_required=False,
environ_prefix=_ENVIRON_PREFIX,
)

# django-celery-results configuration
CELERY_RESULT_BACKEND = 'django-db'
CELERY_CACHE_BACKEND = 'django-cache'
Expand Down
2 changes: 2 additions & 0 deletions django/src/rdwatch/server/urls.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,8 @@
# Conditionally add the scoring URLs if the scoring app is installed
if 'rdwatch_scoring' in apps.app_configs.keys():
urlpatterns.append(path('api/scoring/', include('rdwatch_scoring.urls')))
if 'rdwatch_smartflow' in apps.app_configs.keys():
urlpatterns.append(path('api/smartflow/', include('rdwatch_smartflow.urls')))

if settings.DEBUG:
try:
Expand Down
16 changes: 15 additions & 1 deletion django/src/rdwatch/views/server_status.py
Original file line number Diff line number Diff line change
@@ -1,10 +1,13 @@
import datetime
import socket
from typing import Any

from ninja import Field, Router, Schema

from django.http import HttpRequest

from rdwatch_smartflow.utils import SmartFlowClient

router = Router()

SERVER_INSTANCE_EPOCH = datetime.datetime.now()
Expand All @@ -26,6 +29,7 @@ class ServerStatusSchema(Schema):
hostname: str
ip: str
rdwatch_version: str
smartflow: dict[str, Any] | None


@router.get('/', response=ServerStatusSchema)
Expand All @@ -35,6 +39,16 @@ def get_status(request: HttpRequest):
uptime = datetime.datetime.now() - SERVER_INSTANCE_EPOCH
hostname = socket.gethostname()
ip = socket.gethostbyname(hostname)

try:
smartflow_status = SmartFlowClient().get_health().to_dict()
except Exception:
smartflow_status = None

return ServerStatusSchema(
uptime=uptime, hostname=hostname, ip=ip, rdwatch_version=api.version
uptime=uptime,
hostname=hostname,
ip=ip,
rdwatch_version=api.version,
smartflow=smartflow_status,
)
7 changes: 7 additions & 0 deletions django/src/rdwatch_smartflow/api.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
from ninja import NinjaAPI

from . import views

api = NinjaAPI(urls_namespace='smartflow')

api.add_router('/', views.router)
7 changes: 7 additions & 0 deletions django/src/rdwatch_smartflow/urls.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
from django.urls import path

from .api import api

urlpatterns = [
path('', api.urls),
]
93 changes: 93 additions & 0 deletions django/src/rdwatch_smartflow/utils.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,93 @@
from functools import cached_property
from typing import Any

import airflow_client.client
import requests
from airflow_client.client.api import dag_api, dag_run_api, monitoring_api
from airflow_client.client.models import (
DAGCollection,
DAGRun,
DAGRunCollection,
HealthInfo,
)
from bs4 import BeautifulSoup

from django.conf import settings


class SmartFlowClient:
@cached_property
def _session_token(self) -> str:
session = requests.Session()
smartflow_url: str = settings.SMARTFLOW_URL
username: str = settings.SMARTFLOW_USERNAME
password: str = settings.SMARTFLOW_PASSWORD

res = session.get(smartflow_url)
soup = BeautifulSoup(res.text, 'html.parser')
csrf_token = soup.find('input', {'name': 'csrf_token'})['value']

login_url = f'{smartflow_url}/login/'
login_form_data = {
'csrf_token': csrf_token,
'username': username,
'password': password,
}
login_headers = {'referer': login_url}
res = session.post(
url=login_url,
data=login_form_data,
headers=login_headers,
)
res.raise_for_status()
return session.cookies.get('session')

@property
def _client(self):
host: str = f'{settings.SMARTFLOW_URL}/api/v1'

configuration = airflow_client.client.Configuration(
host=host,
# api_key={'session': self._session_token}
)
with airflow_client.client.ApiClient(
configuration=configuration,
header_name='cookie',
header_value=f'session={self._session_token}',
) as api_client:
return api_client

@property
def _monitoring_api(self):
return monitoring_api.MonitoringApi(self._client)

@property
def _dag_api(self):
return dag_api.DAGApi(self._client)

@property
def _dag_run_api(self):
return dag_run_api.DAGRunApi(self._client)

def get_health(self) -> HealthInfo:
return self._monitoring_api.get_health()

def list_dags(self, **kwargs) -> DAGCollection:
return self._dag_api.get_dags(**kwargs)

def list_dag_runs(self, dag_id: str, **kwargs) -> DAGRunCollection:
return self._dag_run_api.get_dag_runs(dag_id=dag_id, **kwargs)

def create_dag_run(
self, dag_id: str, dag_run_title: str, conf: dict[str, Any]
) -> DAGRun:
"""
Parameters:
dag_id: the id of the DAG to run
dag_run_title: the title to give this new DAG run
"""
dag_run = DAGRun(conf=conf, dag_run_id=dag_run_title)
return self._dag_run_api.post_dag_run(dag_id=dag_id, dag_run=dag_run)

def get_dag_run(self, dag_id: str, dag_run_id: str) -> DAGRun:
return self._dag_run_api.get_dag_run(dag_id=dag_id, dag_run_id=dag_run_id)
Loading
Loading