diff --git a/examples/deploy_cloud_run/Dockerfile b/examples/deploy_cloud_run/Dockerfile new file mode 100644 index 0000000000000..7fd3d0ec007a4 --- /dev/null +++ b/examples/deploy_cloud_run/Dockerfile @@ -0,0 +1,16 @@ +FROM python:3.10-slim + +ARG PIPELINE_DIR + +RUN mkdir -p /opt/dagster/app + +WORKDIR /opt/dagster/app + +COPY ${PIPELINE_DIR} . + +RUN pip install \ + dagster \ + dagster-postgres \ + -e . + +ENV DAGSTER_HOME=/opt/dagster/app/ \ No newline at end of file diff --git a/examples/deploy_cloud_run/cloud_run_pipeline/dagster.yaml b/examples/deploy_cloud_run/cloud_run_pipeline/dagster.yaml new file mode 100644 index 0000000000000..3e043ffdd3c87 --- /dev/null +++ b/examples/deploy_cloud_run/cloud_run_pipeline/dagster.yaml @@ -0,0 +1,68 @@ +scheduler: + module: dagster.core.scheduler + class: DagsterDaemonScheduler + +run_coordinator: + module: dagster.core.run_coordinator + class: QueuedRunCoordinator + +run_launcher: + module: dagster_gcp.cloud_run + class: CloudRunRunLauncher + config: + # PROJECT + project: my-gcp-project + # REGION + region: my-region + job_name_by_code_location: + pipeline1: pipeline1-job + pipeline2: pipeline2-job + +run_storage: + module: dagster_postgres.run_storage + class: PostgresRunStorage + config: + postgres_db: + hostname: + env: DAGSTER_PG_HOST + username: + env: DAGSTER_PG_USERNAME + password: + env: DAGSTER_PG_PASSWORD + db_name: + env: DAGSTER_PG_DB + port: 5432 + +schedule_storage: + module: dagster_postgres.schedule_storage + class: PostgresScheduleStorage + config: + postgres_db: + hostname: + env: DAGSTER_PG_HOST + username: + env: DAGSTER_PG_USERNAME + password: + env: DAGSTER_PG_PASSWORD + db_name: + env: DAGSTER_PG_DB + port: 5432 + +event_log_storage: + module: dagster_postgres.event_log + class: PostgresEventLogStorage + config: + postgres_db: + hostname: + env: DAGSTER_PG_HOST + username: + env: DAGSTER_PG_USERNAME + password: + env: DAGSTER_PG_PASSWORD + db_name: + env: DAGSTER_PG_DB + port: 5432 + +run_monitoring: + enabled: true + poll_interval_seconds: 10 \ No newline at end of file diff --git a/examples/deploy_cloud_run/cloud_run_pipeline/pipeline1/README.md b/examples/deploy_cloud_run/cloud_run_pipeline/pipeline1/README.md new file mode 100644 index 0000000000000..5baf382b42568 --- /dev/null +++ b/examples/deploy_cloud_run/cloud_run_pipeline/pipeline1/README.md @@ -0,0 +1,47 @@ +# pipeline1 + +This is a [Dagster](https://dagster.io/) project scaffolded with [`dagster project scaffold`](https://docs.dagster.io/getting-started/create-new-project). + +## Getting started + +First, install your Dagster code location as a Python package. By using the --editable flag, pip will install your Python package in ["editable mode"](https://pip.pypa.io/en/latest/topics/local-project-installs/#editable-installs) so that as you develop, local code changes will automatically apply. + +```bash +pip install -e ".[dev]" +``` + +Then, start the Dagster UI web server: + +```bash +dagster dev +``` + +Open http://localhost:3000 with your browser to see the project. + +You can start writing assets in `pipeline1/assets.py`. The assets are automatically loaded into the Dagster code location as you define them. + +## Development + +### Adding new Python dependencies + +You can specify new Python dependencies in `setup.py`. + +### Unit testing + +Tests are in the `pipeline1_tests` directory and you can run tests using `pytest`: + +```bash +pytest pipeline1_tests +``` + +### Schedules and sensors + +If you want to enable Dagster [Schedules](https://docs.dagster.io/concepts/partitions-schedules-sensors/schedules) or [Sensors](https://docs.dagster.io/concepts/partitions-schedules-sensors/sensors) for your jobs, the [Dagster Daemon](https://docs.dagster.io/deployment/dagster-daemon) process must be running. This is done automatically when you run `dagster dev`. + +Once your Dagster Daemon is running, you can start turning on schedules and sensors for your jobs. + +## Deploy on Dagster Cloud + +The easiest way to deploy your Dagster project is to use Dagster Cloud. + +Check out the [Dagster Cloud Documentation](https://docs.dagster.cloud) to learn more. diff --git a/examples/deploy_cloud_run/cloud_run_pipeline/pipeline1/pipeline1/__init__.py b/examples/deploy_cloud_run/cloud_run_pipeline/pipeline1/pipeline1/__init__.py new file mode 100644 index 0000000000000..a36556aa3da5d --- /dev/null +++ b/examples/deploy_cloud_run/cloud_run_pipeline/pipeline1/pipeline1/__init__.py @@ -0,0 +1,9 @@ +from dagster import Definitions, load_assets_from_modules + +from . import assets + +all_assets = load_assets_from_modules([assets]) + +defs = Definitions( + assets=all_assets, +) diff --git a/examples/deploy_cloud_run/cloud_run_pipeline/pipeline1/pipeline1/assets.py b/examples/deploy_cloud_run/cloud_run_pipeline/pipeline1/pipeline1/assets.py new file mode 100644 index 0000000000000..5bd987fba7159 --- /dev/null +++ b/examples/deploy_cloud_run/cloud_run_pipeline/pipeline1/pipeline1/assets.py @@ -0,0 +1,5 @@ +from dagster import asset + +@asset +def asset_1(): + pass \ No newline at end of file diff --git a/examples/deploy_cloud_run/cloud_run_pipeline/pipeline1/pipeline1_tests/__init__.py b/examples/deploy_cloud_run/cloud_run_pipeline/pipeline1/pipeline1_tests/__init__.py new file mode 100644 index 0000000000000..8b137891791fe --- /dev/null +++ b/examples/deploy_cloud_run/cloud_run_pipeline/pipeline1/pipeline1_tests/__init__.py @@ -0,0 +1 @@ + diff --git a/examples/deploy_cloud_run/cloud_run_pipeline/pipeline1/pipeline1_tests/test_assets.py b/examples/deploy_cloud_run/cloud_run_pipeline/pipeline1/pipeline1_tests/test_assets.py new file mode 100644 index 0000000000000..8b137891791fe --- /dev/null +++ b/examples/deploy_cloud_run/cloud_run_pipeline/pipeline1/pipeline1_tests/test_assets.py @@ -0,0 +1 @@ + diff --git a/examples/deploy_cloud_run/cloud_run_pipeline/pipeline1/pyproject.toml b/examples/deploy_cloud_run/cloud_run_pipeline/pipeline1/pyproject.toml new file mode 100644 index 0000000000000..0f706bd48796d --- /dev/null +++ b/examples/deploy_cloud_run/cloud_run_pipeline/pipeline1/pyproject.toml @@ -0,0 +1,6 @@ +[build-system] +requires = ["setuptools"] +build-backend = "setuptools.build_meta" + +[tool.dagster] +module_name = "pipeline1" diff --git a/examples/deploy_cloud_run/cloud_run_pipeline/pipeline1/setup.cfg b/examples/deploy_cloud_run/cloud_run_pipeline/pipeline1/setup.cfg new file mode 100644 index 0000000000000..cf8cac354d56a --- /dev/null +++ b/examples/deploy_cloud_run/cloud_run_pipeline/pipeline1/setup.cfg @@ -0,0 +1,2 @@ +[metadata] +name = pipeline1 diff --git a/examples/deploy_cloud_run/cloud_run_pipeline/pipeline1/setup.py b/examples/deploy_cloud_run/cloud_run_pipeline/pipeline1/setup.py new file mode 100644 index 0000000000000..2d7913395a3f0 --- /dev/null +++ b/examples/deploy_cloud_run/cloud_run_pipeline/pipeline1/setup.py @@ -0,0 +1,11 @@ +from setuptools import find_packages, setup + +setup( + name="pipeline1", + packages=find_packages(exclude=["pipeline1_tests"]), + install_requires=[ + "dagster", + "dagster-cloud" + ], + extras_require={"dev": ["dagster-webserver", "pytest"]}, +) diff --git a/examples/deploy_cloud_run/cloud_run_pipeline/pipeline2/README.md b/examples/deploy_cloud_run/cloud_run_pipeline/pipeline2/README.md new file mode 100644 index 0000000000000..80acda89f2c4d --- /dev/null +++ b/examples/deploy_cloud_run/cloud_run_pipeline/pipeline2/README.md @@ -0,0 +1,47 @@ +# pipeline2 + +This is a [Dagster](https://dagster.io/) project scaffolded with [`dagster project scaffold`](https://docs.dagster.io/getting-started/create-new-project). + +## Getting started + +First, install your Dagster code location as a Python package. By using the --editable flag, pip will install your Python package in ["editable mode"](https://pip.pypa.io/en/latest/topics/local-project-installs/#editable-installs) so that as you develop, local code changes will automatically apply. + +```bash +pip install -e ".[dev]" +``` + +Then, start the Dagster UI web server: + +```bash +dagster dev +``` + +Open http://localhost:3000 with your browser to see the project. + +You can start writing assets in `pipeline2/assets.py`. The assets are automatically loaded into the Dagster code location as you define them. + +## Development + +### Adding new Python dependencies + +You can specify new Python dependencies in `setup.py`. + +### Unit testing + +Tests are in the `pipeline2_tests` directory and you can run tests using `pytest`: + +```bash +pytest pipeline2_tests +``` + +### Schedules and sensors + +If you want to enable Dagster [Schedules](https://docs.dagster.io/concepts/partitions-schedules-sensors/schedules) or [Sensors](https://docs.dagster.io/concepts/partitions-schedules-sensors/sensors) for your jobs, the [Dagster Daemon](https://docs.dagster.io/deployment/dagster-daemon) process must be running. This is done automatically when you run `dagster dev`. + +Once your Dagster Daemon is running, you can start turning on schedules and sensors for your jobs. + +## Deploy on Dagster Cloud + +The easiest way to deploy your Dagster project is to use Dagster Cloud. + +Check out the [Dagster Cloud Documentation](https://docs.dagster.cloud) to learn more. diff --git a/examples/deploy_cloud_run/cloud_run_pipeline/pipeline2/pipeline2/__init__.py b/examples/deploy_cloud_run/cloud_run_pipeline/pipeline2/pipeline2/__init__.py new file mode 100644 index 0000000000000..a36556aa3da5d --- /dev/null +++ b/examples/deploy_cloud_run/cloud_run_pipeline/pipeline2/pipeline2/__init__.py @@ -0,0 +1,9 @@ +from dagster import Definitions, load_assets_from_modules + +from . import assets + +all_assets = load_assets_from_modules([assets]) + +defs = Definitions( + assets=all_assets, +) diff --git a/examples/deploy_cloud_run/cloud_run_pipeline/pipeline2/pipeline2/assets.py b/examples/deploy_cloud_run/cloud_run_pipeline/pipeline2/pipeline2/assets.py new file mode 100644 index 0000000000000..51c99579fff1a --- /dev/null +++ b/examples/deploy_cloud_run/cloud_run_pipeline/pipeline2/pipeline2/assets.py @@ -0,0 +1,6 @@ + +from dagster import asset + +@asset +def asset_2(): + pass \ No newline at end of file diff --git a/examples/deploy_cloud_run/cloud_run_pipeline/pipeline2/pipeline2_tests/__init__.py b/examples/deploy_cloud_run/cloud_run_pipeline/pipeline2/pipeline2_tests/__init__.py new file mode 100644 index 0000000000000..8b137891791fe --- /dev/null +++ b/examples/deploy_cloud_run/cloud_run_pipeline/pipeline2/pipeline2_tests/__init__.py @@ -0,0 +1 @@ + diff --git a/examples/deploy_cloud_run/cloud_run_pipeline/pipeline2/pipeline2_tests/test_assets.py b/examples/deploy_cloud_run/cloud_run_pipeline/pipeline2/pipeline2_tests/test_assets.py new file mode 100644 index 0000000000000..8b137891791fe --- /dev/null +++ b/examples/deploy_cloud_run/cloud_run_pipeline/pipeline2/pipeline2_tests/test_assets.py @@ -0,0 +1 @@ + diff --git a/examples/deploy_cloud_run/cloud_run_pipeline/pipeline2/pyproject.toml b/examples/deploy_cloud_run/cloud_run_pipeline/pipeline2/pyproject.toml new file mode 100644 index 0000000000000..f22a7bb2865b7 --- /dev/null +++ b/examples/deploy_cloud_run/cloud_run_pipeline/pipeline2/pyproject.toml @@ -0,0 +1,6 @@ +[build-system] +requires = ["setuptools"] +build-backend = "setuptools.build_meta" + +[tool.dagster] +module_name = "pipeline2" diff --git a/examples/deploy_cloud_run/cloud_run_pipeline/pipeline2/setup.cfg b/examples/deploy_cloud_run/cloud_run_pipeline/pipeline2/setup.cfg new file mode 100644 index 0000000000000..c548041a50520 --- /dev/null +++ b/examples/deploy_cloud_run/cloud_run_pipeline/pipeline2/setup.cfg @@ -0,0 +1,2 @@ +[metadata] +name = pipeline2 diff --git a/examples/deploy_cloud_run/cloud_run_pipeline/pipeline2/setup.py b/examples/deploy_cloud_run/cloud_run_pipeline/pipeline2/setup.py new file mode 100644 index 0000000000000..4c1aaa71f8900 --- /dev/null +++ b/examples/deploy_cloud_run/cloud_run_pipeline/pipeline2/setup.py @@ -0,0 +1,11 @@ +from setuptools import find_packages, setup + +setup( + name="pipeline2", + packages=find_packages(exclude=["pipeline2_tests"]), + install_requires=[ + "dagster", + "dagster-cloud" + ], + extras_require={"dev": ["dagster-webserver", "pytest"]}, +) diff --git a/examples/deploy_cloud_run/cloud_run_pipeline/workspace.yaml b/examples/deploy_cloud_run/cloud_run_pipeline/workspace.yaml new file mode 100644 index 0000000000000..45848f0fadc9e --- /dev/null +++ b/examples/deploy_cloud_run/cloud_run_pipeline/workspace.yaml @@ -0,0 +1,7 @@ +load_from: + - python_module: + module_name: pipeline1 + working_directory: pipeline1 + - python_module: + module_name: pipeline2 + working_directory: pipeline2 \ No newline at end of file diff --git a/examples/deploy_cloud_run/deploy_cloud_run_job.sh b/examples/deploy_cloud_run/deploy_cloud_run_job.sh new file mode 100644 index 0000000000000..178a44eb41958 --- /dev/null +++ b/examples/deploy_cloud_run/deploy_cloud_run_job.sh @@ -0,0 +1,32 @@ +#!/bin/bash + +PROJECT_ID="" # add your project id +FOLDER_NAME="dagster" +REGION="" # add your region +SERVICE_ACCOUNT_EMAIL="" # add your service account +PIPELINE_BASE_DIR="./cloud_run_pipeline" + +for PIPELINE_DIR in $PIPELINE_BASE_DIR/*; do + if [ -d "$PIPELINE_DIR" ]; then + PIPELINE_NAME=$(basename "$PIPELINE_DIR") + IMAGE_NAME="dagster-$PIPELINE_NAME" + JOB_NAME="${PIPELINE_NAME}-job" + + echo "Building Docker image for $PIPELINE_NAME..." + docker build -t $IMAGE_NAME -f Dockerfile --build-arg PIPELINE_DIR=$PIPELINE_DIR --platform=linux/amd64 . + + echo "Tagging Docker image for $PIPELINE_NAME..." + docker tag $IMAGE_NAME $REGION-docker.pkg.dev/$PROJECT_ID/$FOLDER_NAME/$IMAGE_NAME:latest + + echo "Pushing Docker image for $PIPELINE_NAME to Google Artifact Registry..." + docker push $REGION-docker.pkg.dev/$PROJECT_ID/$FOLDER_NAME/$IMAGE_NAME:latest + + echo "Creating Cloud Run job for $PIPELINE_NAME..." + gcloud beta run jobs create $JOB_NAME \ + --image=$REGION-docker.pkg.dev/$PROJECT_ID/$FOLDER_NAME/$IMAGE_NAME:latest \ + --region=$REGION \ + --service-account=$SERVICE_ACCOUNT_EMAIL \ + --project=$PROJECT_ID \ + --set-secrets="DAGSTER_PG_HOST=DAGSTER_PG_HOST:latest,DAGSTER_PG_USERNAME=DAGSTER_PG_USERNAME:latest,DAGSTER_PG_PASSWORD=DAGSTER_PG_PASSWORD:latest,DAGSTER_PG_DB=DAGSTER_PG_DB:latest" + fi +done diff --git a/examples/deploy_cloud_run/deploy_vm.sh b/examples/deploy_cloud_run/deploy_vm.sh new file mode 100644 index 0000000000000..a2aed2b476d57 --- /dev/null +++ b/examples/deploy_cloud_run/deploy_vm.sh @@ -0,0 +1,73 @@ +# !/bin/bash + +VM_NAME="dagster-vm" +ZONE="" # add your zone +PROJECT_ID="" # add your project id +LOCAL_FILE_PATH="./cloud_run_pipeline/*" +DAGSTER_GCP_PATH="../../python_modules/libraries/dagster-gcp/*" +REMOTE_DAGSTER_GCP_PATH="/opt/dagster/app/python_modules/libraries/dagster_gcp" +REMOTE_DIR="/opt/dagster/app" +SERVICE_ACCOUNT_EMAIL="" # service account used must have the right to launch a cloud run job and access secrets from secret manager +SCOPES="cloud-platform" + +gcloud compute instances create $VM_NAME \ + --zone=$ZONE \ + --machine-type=e2-micro \ + --image-family=ubuntu-2204-lts \ + --image-project=ubuntu-os-cloud \ + --project=$PROJECT_ID \ + --service-account=$SERVICE_ACCOUNT_EMAIL \ + --scopes=$SCOPES + +echo "waiting for VM to be created..." +sleep 40 + +gcloud compute ssh $VM_NAME --zone=$ZONE --command=" + sudo mkdir -p $REMOTE_DIR $REMOTE_DAGSTER_GCP_PATH + sudo chown -R $USER $REMOTE_DIR $REMOTE_DAGSTER_GCP_PATH +" --project=$PROJECT_ID + +gcloud compute scp --recurse $LOCAL_FILE_PATH ${VM_NAME}:$REMOTE_DIR \ + --zone=$ZONE \ + --project=$PROJECT_ID + +gcloud compute scp --recurse $DAGSTER_GCP_PATH ${VM_NAME}:$REMOTE_DAGSTER_GCP_PATH \ + --zone=$ZONE \ + --project=$PROJECT_ID + +gcloud compute ssh $VM_NAME --zone=$ZONE --command=" + # Update package list and install prerequisites + sudo apt-get update + sudo apt-get install -y python3.10 python3-venv python3-pip python3.10-distutils + + python3 -m pip install --upgrade pip + + # Create and activate a virtual environment with Python 3.10 + python3 -m venv .venv + source .venv/bin/activate + + # fetch secrets for postgres db from secret manager + export DAGSTER_PG_HOST=\$(gcloud secrets versions access latest --secret='DAGSTER_PG_HOST' --project=$PROJECT_ID) + export DAGSTER_PG_USERNAME=\$(gcloud secrets versions access latest --secret='DAGSTER_PG_USERNAME' --project=$PROJECT_ID) + export DAGSTER_PG_PASSWORD=\$(gcloud secrets versions access latest --secret='DAGSTER_PG_PASSWORD' --project=$PROJECT_ID) + export DAGSTER_PG_DB=\$(gcloud secrets versions access latest --secret='DAGSTER_PG_DB' --project=$PROJECT_ID) + + # Install Dagster + pip install dagster-postgres dagster-webserver + # install dagster-gcp + pip install $REMOTE_DAGSTER_GCP_PATH + + echo 'installed dagster' + + export DAGSTER_HOME=$REMOTE_DIR + cd $REMOTE_DIR + + echo 'starting dagster daemon' + # Start the Dagster daemon + nohup dagster-daemon run & + + echo 'starting dagster webserver' + # Start the dagster webserver + nohup dagster-webserver -h 0.0.0.0 -p 3000 + +" --project=$PROJECT_ID \ No newline at end of file diff --git a/pyright/master/requirements-pinned.txt b/pyright/master/requirements-pinned.txt index 790be83ca6351..a2d12c3561f5c 100644 --- a/pyright/master/requirements-pinned.txt +++ b/pyright/master/requirements-pinned.txt @@ -238,6 +238,7 @@ google-auth-httplib2==0.2.0 google-auth-oauthlib==1.2.0 google-cloud-bigquery==3.21.0 google-cloud-core==2.4.1 +google-cloud-run==0.10.5 google-cloud-storage==2.16.0 google-crc32c==1.5.0 google-re2==1.1.20240501 diff --git a/python_modules/libraries/dagster-gcp/dagster_gcp/cloud_run/__init__.py b/python_modules/libraries/dagster-gcp/dagster_gcp/cloud_run/__init__.py new file mode 100644 index 0000000000000..879449f5ee8e6 --- /dev/null +++ b/python_modules/libraries/dagster-gcp/dagster_gcp/cloud_run/__init__.py @@ -0,0 +1 @@ +from .run_launcher import CloudRunRunLauncher \ No newline at end of file diff --git a/python_modules/libraries/dagster-gcp/dagster_gcp/cloud_run/run_launcher.py b/python_modules/libraries/dagster-gcp/dagster_gcp/cloud_run/run_launcher.py new file mode 100644 index 0000000000000..fd36ff5dad2c4 --- /dev/null +++ b/python_modules/libraries/dagster-gcp/dagster_gcp/cloud_run/run_launcher.py @@ -0,0 +1,253 @@ +import traceback +from typing import TYPE_CHECKING, Any, Mapping, Optional, Sequence + +import tenacity +from dagster import ( + DagsterInstance, + Field, + Permissive, + StringSource, + _check as check, +) +from dagster._core.events import EngineEventData +from dagster._core.launcher.base import ( + CheckRunHealthResult, + DagsterRun, + LaunchRunContext, + RunLauncher, + WorkerStatus, +) +from dagster._grpc.types import ExecuteRunArgs +from dagster._serdes import ConfigurableClass, ConfigurableClassData +from google.api_core.exceptions import Conflict, ResourceExhausted, ServerError +from google.api_core.operation import Operation +from google.cloud import run_v2 +from google.cloud.run_v2 import RunJobRequest +from google.cloud.run_v2.types import k8s_min +from typing_extensions import Self + +if TYPE_CHECKING: + from dagster._config.config_schema import UserConfigSchema + + +class CloudRunRunLauncher(RunLauncher, ConfigurableClass): + """Run launcher for launching runs as a Google Cloud Run job.""" + + def __init__( + self, + project: str, + region: str, + job_name_by_code_location: "dict[str, str]", + run_job_retry: "dict[str, int]", + inst_data: Optional[ConfigurableClassData] = None, + ): + self._inst_data = inst_data + self.project = project + self.region = region + self.job_name_by_code_location = job_name_by_code_location + + self.run_job_retry_wait = run_job_retry["wait"] + self.run_job_retry_timeout = run_job_retry["timeout"] + + self.jobs_client = run_v2.JobsClient() + self.executions_client = run_v2.ExecutionsClient() + + def launch_run(self, context: LaunchRunContext) -> None: + external_job_origin = check.not_none(context.dagster_run.external_job_origin) + current_code_location = external_job_origin.location_name + + job_origin = check.not_none(context.job_code_origin) + repository_origin = job_origin.repository_origin + + stripped_repository_origin = repository_origin._replace(container_context={}) + stripped_job_origin = job_origin._replace(repository_origin=stripped_repository_origin) + + args = ExecuteRunArgs( + job_origin=stripped_job_origin, + run_id=context.dagster_run.run_id, + instance_ref=self._instance.get_ref(), + ) + + command_args = args.get_command_args() + + operation = self.create_execution(current_code_location, command_args) + execution_id = operation.metadata.name.split("/")[-1] + + instance: DagsterInstance = self._instance + instance.report_engine_event( + message="Launched run in Cloud Run execution", + dagster_run=context.dagster_run, + engine_event_data=EngineEventData({"Execution ID": execution_id}), + cls=self.__class__, + ) + instance.add_run_tags( + context.dagster_run.run_id, {"cloud_run_job_execution_id": execution_id} + ) + + def fully_qualified_job_name(self, code_location_name: str) -> str: + try: + job_name = self.job_name_by_code_location[code_location_name] + except KeyError: + raise Exception(f"No run launcher defined for code location: {code_location_name}") + return f"projects/{self.project}/locations/{self.region}/jobs/{job_name}" + + def create_execution(self, code_location_name: str, args: Sequence[str]): + job_name = self.fully_qualified_job_name(code_location_name) + return self.execute_job(job_name, args=args) + + def execute_job( + self, + fully_qualified_job_name: str, + timeout: str = "3600s", + args: Optional[Sequence[str]] = None, + env: Optional["dict[str, str]"] = None, + ) -> Operation: + request = RunJobRequest(name=fully_qualified_job_name) + + overrides = {} + if args: + overrides["args"] = args + if env: + overrides["env"] = [ + k8s_min.EnvVar(name=name, value=value) for name, value in env.items() + ] + + container_overrides = [RunJobRequest.Overrides.ContainerOverride(**overrides)] + + request.overrides.container_overrides.extend(container_overrides) + request.overrides.timeout = timeout + + @tenacity.retry( + wait=tenacity.wait_fixed(self.run_job_retry_wait), + stop=tenacity.stop_after_delay(self.run_job_retry_timeout), + retry=tenacity.retry_if_exception_type(ResourceExhausted), + ) + def run_job_with_retries_when_quota_exceeded(request: RunJobRequest): + operation = self.jobs_client.run_job(request) + return operation + + operation = run_job_with_retries_when_quota_exceeded(request) + return operation + + def terminate(self, run_id: str) -> bool: + instance: DagsterInstance = self._instance + run = check.not_none(instance.get_run_by_id(run_id)) + execution_id = run.tags.get("cloud_run_job_execution_id") + + if not execution_id: + self._instance.report_engine_event( + message="Unable to identify Cloud Run execution ID for termination", + dagster_run=run, + cls=self.__class__, + ) + return False + + instance.report_run_canceling(run) + external_job_origin = check.not_none(run.external_job_origin) + try: + fully_qualified_execution_name = ( + f"{self.fully_qualified_job_name(external_job_origin.location_name)}" + f"/executions/{execution_id}" + ) + request = run_v2.CancelExecutionRequest( + name=fully_qualified_execution_name, + ) + self.executions_client.cancel_execution(request=request) + except (ServerError, Conflict): + self._instance.report_engine_event( + message=f"Failed to terminate Cloud Run execution: {execution_id}. Error:\n{traceback.format_exc()}", + dagster_run=run, + cls=self.__class__, + ) + return False + + instance.report_run_canceled(run) + return True + + @property + def inst_data(self) -> Optional[ConfigurableClassData]: + return self._inst_data + + @classmethod + def config_type(cls) -> "UserConfigSchema": + return { + "project": Field( + StringSource, + is_required=True, + description="Google Cloud Platform project ID", + ), + "region": Field( + StringSource, + is_required=True, + description="Google Cloud Platform region for the Cloud Run jobs", + ), + "job_name_by_code_location": Field( + Permissive({}), + is_required=True, + description=( + "Job name for each code location. Each item in this map should be a key-value" + " pair where the key is the code location name and the value is the job name." + ), + ), + "run_job_retry": Field( + { + "wait": Field( + int, + is_required=False, + default_value=10, + description="Number of seconds to wait between retries", + ), + "timeout": Field( + int, + is_required=False, + default_value=300, + description="Number of seconds to wait before timing out", + ), + }, + is_required=False, + default_value={"wait": 10, "timeout": 300}, + description=( + "Retry configuration for run job requests. Note that the default Cloud Run " + "Admin API quota is quite low, which makes retries more likely." + ), + ), + } + + @classmethod + def from_config_value( + cls, inst_data: ConfigurableClassData, config_value: Mapping[str, Any] + ) -> Self: + return cls(inst_data=inst_data, **config_value) + + @property + def supports_check_run_worker_health(self): + return True + + def check_run_worker_health(self, run: DagsterRun) -> CheckRunHealthResult: + execution_id = run.tags.get("cloud_run_job_execution_id") + + if not execution_id: + return CheckRunHealthResult(WorkerStatus.UNKNOWN) + + external_job_origin = check.not_none(run.external_job_origin) + try: + fully_qualified_execution_name = ( + f"{self.fully_qualified_job_name(external_job_origin.location_name)}" + f"/executions/{execution_id}" + ) + request = run_v2.GetExecutionRequest(name=fully_qualified_execution_name) + execution = self.executions_client.get_execution(request=request) + if execution.reconciling: + return CheckRunHealthResult(WorkerStatus.RUNNING) + elif execution.failed_count > 0 or execution.cancelled_count > 0: + return CheckRunHealthResult(WorkerStatus.FAILED) + elif execution.succeeded_count > 0: + return CheckRunHealthResult(WorkerStatus.SUCCESS) + else: + return CheckRunHealthResult( + WorkerStatus.UNKNOWN, msg="Unable to determine execution status" + ) + except (ServerError, Conflict): + return CheckRunHealthResult( + WorkerStatus.UNKNOWN, msg="Unable to fetch execution status" + ) diff --git a/python_modules/libraries/dagster-gcp/dagster_gcp_tests/cloud_run_tests/__init__.py b/python_modules/libraries/dagster-gcp/dagster_gcp_tests/cloud_run_tests/__init__.py new file mode 100644 index 0000000000000..e69de29bb2d1d diff --git a/python_modules/libraries/dagster-gcp/dagster_gcp_tests/cloud_run_tests/conftest.py b/python_modules/libraries/dagster-gcp/dagster_gcp_tests/cloud_run_tests/conftest.py new file mode 100644 index 0000000000000..e982cc707653a --- /dev/null +++ b/python_modules/libraries/dagster-gcp/dagster_gcp_tests/cloud_run_tests/conftest.py @@ -0,0 +1,121 @@ +from contextlib import contextmanager +from typing import Callable, ContextManager, Iterator +from unittest.mock import Mock, patch + +import pytest +from dagster._core.definitions.job_definition import JobDefinition +from dagster._core.instance import DagsterInstance +from dagster._core.remote_representation.external import ExternalJob +from dagster._core.storage.dagster_run import DagsterRun +from dagster._core.test_utils import in_process_test_workspace, instance_for_test +from dagster._core.types.loadable_target_origin import LoadableTargetOrigin +from dagster._core.workspace.context import WorkspaceRequestContext + +from dagster_gcp_tests.cloud_run_tests import repo + +IN_PROCESS_NAME = "<>" + + +@pytest.fixture +def instance_cm() -> Callable[..., ContextManager[DagsterInstance]]: + @contextmanager + def cm(config=None): + overrides = { + "run_launcher": { + "module": "dagster_gcp.cloud_run.run_launcher", + "class": "CloudRunRunLauncher", + "config": config or {}, + } + } + with instance_for_test(overrides) as dagster_instance: + yield dagster_instance + + return cm + + +@pytest.fixture +def instance( + instance_cm: Callable[..., ContextManager[DagsterInstance]], +) -> Iterator[DagsterInstance]: + with instance_cm( + { + "project": "test_project", + "region": "test_region", + "job_name_by_code_location": {IN_PROCESS_NAME: "test_job_name"}, + "run_job_retry": {"wait": 1, "timeout": 60}, + } + ) as dagster_instance: + yield dagster_instance + + +@pytest.fixture +def workspace(instance: DagsterInstance) -> Iterator[WorkspaceRequestContext]: + with in_process_test_workspace( + instance, + loadable_target_origin=LoadableTargetOrigin( + python_file=repo.__file__, + attribute=repo.repository.name, + ), + container_image="dagster:latest", + ) as workspace: + yield workspace + + +@pytest.fixture +def job() -> JobDefinition: + return repo.job + + +@pytest.fixture +def external_job(workspace: WorkspaceRequestContext) -> ExternalJob: + location = workspace.get_code_location(workspace.code_location_names[0]) + return location.get_repository(repo.repository.name).get_full_external_job(repo.job.name) + + +@pytest.fixture +def run(instance: DagsterInstance, job: JobDefinition, external_job: ExternalJob) -> DagsterRun: + return instance.create_run_for_job( + job, + external_job_origin=external_job.get_external_origin(), + job_code_origin=external_job.get_python_origin(), + ) + + +@pytest.fixture +def executions(): + return {} + + +@pytest.fixture +def mock_jobs_client(executions): + with patch("google.cloud.run_v2.JobsClient") as MockJobsClient: + mock_jobs_client = MockJobsClient.return_value + operation = Mock() + operation.metadata.name = "projects/test_project/locations/test_region/jobs/test_job_name/executions/test_execution_id" + mock_jobs_client.run_job.return_value = operation + + mock_execution = Mock( + reconciling=True, succeeded_count=0, failed_count=0, cancelled_count=0 + ) + executions["test_execution_id"] = mock_execution + yield mock_jobs_client + + +@pytest.fixture +def mock_executions_client(executions): + with patch("google.cloud.run_v2.ExecutionsClient") as MockExecutionsClient: + mock_executions_client = MockExecutionsClient.return_value + + def cancel_execution(request): + execution_id = request.name.split("/")[-1] + executions[execution_id].reconciling = False + executions[execution_id].cancelled_count = 1 + + def get_execution(request): + execution_id = request.name.split("/")[-1] + return executions[execution_id] + + mock_executions_client.cancel_execution.side_effect = cancel_execution + mock_executions_client.get_execution.side_effect = get_execution + + yield mock_executions_client diff --git a/python_modules/libraries/dagster-gcp/dagster_gcp_tests/cloud_run_tests/repo.py b/python_modules/libraries/dagster-gcp/dagster_gcp_tests/cloud_run_tests/repo.py new file mode 100644 index 0000000000000..0a04980cd82db --- /dev/null +++ b/python_modules/libraries/dagster-gcp/dagster_gcp_tests/cloud_run_tests/repo.py @@ -0,0 +1,16 @@ +import dagster + + +@dagster.op +def node(_): + pass + + +@dagster.job +def job(): + node() + + +@dagster.repository +def repository(): + return {"jobs": {"job": job}} diff --git a/python_modules/libraries/dagster-gcp/dagster_gcp_tests/cloud_run_tests/test_run_launcher.py b/python_modules/libraries/dagster-gcp/dagster_gcp_tests/cloud_run_tests/test_run_launcher.py new file mode 100644 index 0000000000000..d380130bedc0f --- /dev/null +++ b/python_modules/libraries/dagster-gcp/dagster_gcp_tests/cloud_run_tests/test_run_launcher.py @@ -0,0 +1,83 @@ +from dagster import ( + DagsterInstance, + DagsterRun, + DagsterRunStatus, + _check as check, +) +from dagster._core.launcher import WorkerStatus +from dagster._core.workspace.workspace import IWorkspace +from google.cloud.run_v2 import CancelExecutionRequest, GetExecutionRequest, RunJobRequest + + +def test_launch_run( + instance: DagsterInstance, run: DagsterRun, workspace: IWorkspace, mock_jobs_client +): + instance.launch_run(run.run_id, workspace) + run = check.not_none(instance.get_run_by_id(run.run_id)) + + # Assert the correct tag is set + assert run.tags["cloud_run_job_execution_id"] == "test_execution_id" + + # Check that mock was called with correct args + mock_jobs_client.run_job.assert_called_once() + args, _ = mock_jobs_client.run_job.call_args + assert isinstance(args[0], RunJobRequest) + assert args[0].name == "projects/test_project/locations/test_region/jobs/test_job_name" + + +def test_terminate( + instance: DagsterInstance, + run: DagsterRun, + workspace: IWorkspace, + mock_jobs_client, + mock_executions_client, +): + instance.launch_run(run.run_id, workspace) + assert instance.run_launcher.terminate(run.run_id) + + # Check that a cancellation engine event was emitted + run = check.not_none(instance.get_run_by_id(run.run_id)) + assert run.status == DagsterRunStatus.CANCELED + + # Check that the execution client was called with the correct args + assert mock_executions_client.cancel_execution.call_count == 1 + _, kwargs = mock_executions_client.cancel_execution.call_args + request = kwargs["request"] + assert isinstance(request, CancelExecutionRequest) + assert ( + request.name + == "projects/test_project/locations/test_region/jobs/test_job_name/executions/test_execution_id" + ) + + +def test_check_run_worker_health( + instance: DagsterInstance, + run: DagsterRun, + workspace: IWorkspace, + mock_jobs_client, + mock_executions_client, +): + instance.launch_run(run.run_id, workspace) + run = check.not_none(instance.get_run_by_id(run.run_id)) + result = instance.run_launcher.check_run_worker_health(run) + assert result.status == WorkerStatus.RUNNING + assert mock_executions_client.get_execution.call_count == 1 + _, kwargs = mock_executions_client.get_execution.call_args + request = kwargs["request"] + assert isinstance(request, GetExecutionRequest) + assert ( + request.name + == "projects/test_project/locations/test_region/jobs/test_job_name/executions/test_execution_id" + ) + + instance.run_launcher.terminate(run.run_id) + result = instance.run_launcher.check_run_worker_health(run) + assert result.status == WorkerStatus.FAILED + assert mock_executions_client.get_execution.call_count == 2 + _, kwargs = mock_executions_client.get_execution.call_args + request = kwargs["request"] + assert isinstance(request, GetExecutionRequest) + assert ( + request.name + == "projects/test_project/locations/test_region/jobs/test_job_name/executions/test_execution_id" + ) diff --git a/python_modules/libraries/dagster-gcp/setup.py b/python_modules/libraries/dagster-gcp/setup.py index d643632b31136..eab3ba83e20b7 100644 --- a/python_modules/libraries/dagster-gcp/setup.py +++ b/python_modules/libraries/dagster-gcp/setup.py @@ -41,6 +41,7 @@ def get_version() -> str: "google-api-python-client", "google-cloud-bigquery", "google-cloud-storage", + "google-cloud-run", "oauth2client", ], # we need `pyarrow` for testing read/write parquet files.