Add deferrable mode to AWS glue operators (Job & Crawl) (#30948)
* add deferrable mode to glue operator

* convert wait for crawler to waiter

* deferrable glue crawler

* add status logging to glue crawler trigger + fix some bugs

Co-authored-by: Niko Oliveira <onikolas@amazon.com>

---------

Co-authored-by: Niko Oliveira <onikolas@amazon.com>
vandonr-amz and o-nikolas committed May 31, 2023
1 parent 025119e commit 635f94c
Showing 12 changed files with 471 additions and 112 deletions.
3 changes: 2 additions & 1 deletion airflow/providers/amazon/aws/hooks/base_aws.py
@@ -836,7 +836,8 @@ def get_waiter(
corresponding value. If a custom waiter has such keys to be expanded, they need to be provided
here.
:param deferrable: If True, the waiter will be an async custom waiter.
An async client must be provided in that case.
:param client: The client to use for the waiter's operations.
"""
from airflow.providers.amazon.aws.waiters.base_waiter import BaseBotoWaiter

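To make the new parameters concrete, here is a minimal sketch of requesting an async waiter, assuming the `crawler_ready` waiter and `GlueCrawlerHook` shown later in this diff (the exact call site may differ):

import asyncio

from airflow.providers.amazon.aws.hooks.glue_crawler import GlueCrawlerHook

async def wait_until_ready(crawler_name: str) -> None:
    hook = GlueCrawlerHook(aws_conn_id="aws_default")
    async with hook.async_conn as client:
        # deferrable=True builds the custom waiter on top of the async client
        waiter = hook.get_waiter("crawler_ready", deferrable=True, client=client)
        await waiter.wait(Name=crawler_name)

asyncio.run(wait_until_ready("my-crawler"))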
84 changes: 63 additions & 21 deletions airflow/providers/amazon/aws/hooks/glue.py
@@ -17,6 +17,7 @@
# under the License.
from __future__ import annotations

import asyncio
import time

import boto3
@@ -194,6 +195,12 @@ def get_job_state(self, job_name: str, run_id: str) -> str:
job_run = self.conn.get_job_run(JobName=job_name, RunId=run_id, PredecessorsIncluded=True)
return job_run["JobRun"]["JobRunState"]

async def async_get_job_state(self, job_name: str, run_id: str) -> str:
"""The async version of get_job_state."""
async with self.async_conn as client:
job_run = await client.get_job_run(JobName=job_name, RunId=run_id)
return job_run["JobRun"]["JobRunState"]
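A quick, illustrative way to exercise the new coroutine outside a trigger (job name and run id are placeholders; requires aiobotocore):

import asyncio

from airflow.providers.amazon.aws.hooks.glue import GlueJobHook

hook = GlueJobHook(aws_conn_id="aws_default")
state = asyncio.run(hook.async_get_job_state("my_job", "jr_abc123"))
print(state)  # e.g. "RUNNING", "SUCCEEDED", or "FAILED"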

def print_job_logs(
self,
job_name: str,
@@ -264,33 +271,68 @@ def job_completion(self, job_name: str, run_id: str, verbose: bool = False) -> dict[str, str]:
:param verbose: If True, more Glue Job Run logs show in the Airflow Task Logs. (default: False)
:return: Dict of JobRunState and JobRunId
"""
failed_states = ["FAILED", "TIMEOUT"]
finished_states = ["SUCCEEDED", "STOPPED"]
next_log_tokens = self.LogContinuationTokens()
while True:
if verbose:
self.print_job_logs(
job_name=job_name,
run_id=run_id,
continuation_tokens=next_log_tokens,
)

job_run_state = self.get_job_state(job_name, run_id)
if job_run_state in finished_states:
self.log.info("Exiting Job %s Run State: %s", run_id, job_run_state)
return {"JobRunState": job_run_state, "JobRunId": run_id}
if job_run_state in failed_states:
job_error_message = f"Exiting Job {run_id} Run State: {job_run_state}"
self.log.info(job_error_message)
raise AirflowException(job_error_message)
ret = self._handle_state(job_run_state, job_name, run_id, verbose, next_log_tokens)
if ret:
return ret
else:
self.log.info(
"Polling for AWS Glue Job %s current run state with status %s",
job_name,
job_run_state,
)
time.sleep(self.JOB_POLL_INTERVAL)

async def async_job_completion(self, job_name: str, run_id: str, verbose: bool = False) -> dict[str, str]:
"""
Waits until the Glue job with job_name completes or fails and returns the final state.
Raises AirflowException if the job fails.

:param job_name: unique job name per AWS account
:param run_id: The job-run ID of the predecessor job run
:param verbose: If True, more Glue Job Run logs show in the Airflow Task Logs. (default: False)
:return: Dict of JobRunState and JobRunId
"""
next_log_tokens = self.LogContinuationTokens()
while True:
job_run_state = await self.async_get_job_state(job_name, run_id)
ret = self._handle_state(job_run_state, job_name, run_id, verbose, next_log_tokens)
if ret:
return ret
else:
await asyncio.sleep(self.JOB_POLL_INTERVAL)

def _handle_state(
self,
state: str,
job_name: str,
run_id: str,
verbose: bool,
next_log_tokens: GlueJobHook.LogContinuationTokens,
) -> dict | None:
"""Helper function to process Glue Job state while polling. Used by both sync and async methods."""
failed_states = ["FAILED", "TIMEOUT"]
finished_states = ["SUCCEEDED", "STOPPED"]

if verbose:
self.print_job_logs(
job_name=job_name,
run_id=run_id,
continuation_tokens=next_log_tokens,
)

if state in finished_states:
self.log.info("Exiting Job %s Run State: %s", run_id, state)
return {"JobRunState": state, "JobRunId": run_id}
if state in failed_states:
job_error_message = f"Exiting Job {run_id} Run State: {state}"
self.log.info(job_error_message)
raise AirflowException(job_error_message)
else:
self.log.info(
"Polling for AWS Glue Job %s current run state with status %s",
job_name,
state,
)
return None
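The `_handle_state` extraction lets the blocking and async loops share one decision function; the shape of that pattern, as a generic hedged sketch (names illustrative):

import asyncio
import time

def poll(get_state, handle_state, interval: int):
    # Blocking loop: defer each observed state to the shared handler.
    while True:
        result = handle_state(get_state())
        if result is not None:
            return result
        time.sleep(interval)

async def async_poll(get_state, handle_state, interval: int):
    # Async twin: same logic, but the state fetch and the sleep are awaited.
    while True:
        result = handle_state(await get_state())
        if result is not None:
            return result
        await asyncio.sleep(interval)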

def has_job(self, job_name) -> bool:
"""
Checks if the job already exists.
57 changes: 17 additions & 40 deletions airflow/providers/amazon/aws/hooks/glue_crawler.py
@@ -18,9 +18,7 @@
from __future__ import annotations

from functools import cached_property
from time import sleep

from airflow.exceptions import AirflowException
from airflow.providers.amazon.aws.hooks.base_aws import AwsBaseHook
from airflow.providers.amazon.aws.hooks.sts import StsHook

@@ -179,41 +177,20 @@ def wait_for_crawler_completion(self, crawler_name: str, poll_interval: int = 5)
:param poll_interval: Time (in seconds) to wait between two consecutive calls to check crawler status
:return: Crawler's status
"""
failed_status = ["FAILED", "CANCELLED"]

while True:
crawler = self.get_crawler(crawler_name)
crawler_state = crawler["State"]
if crawler_state == "READY":
self.log.info("State: %s", crawler_state)
self.log.info("crawler_config: %s", crawler)
crawler_status = crawler["LastCrawl"]["Status"]
if crawler_status in failed_status:
raise AirflowException(f"Status: {crawler_status}")
metrics = self.glue_client.get_crawler_metrics(CrawlerNameList=[crawler_name])[
"CrawlerMetricsList"
][0]
self.log.info("Status: %s", crawler_status)
self.log.info("Last Runtime Duration (seconds): %s", metrics["LastRuntimeSeconds"])
self.log.info("Median Runtime Duration (seconds): %s", metrics["MedianRuntimeSeconds"])
self.log.info("Tables Created: %s", metrics["TablesCreated"])
self.log.info("Tables Updated: %s", metrics["TablesUpdated"])
self.log.info("Tables Deleted: %s", metrics["TablesDeleted"])

return crawler_status

else:
self.log.info("Polling for AWS Glue crawler: %s ", crawler_name)
self.log.info("State: %s", crawler_state)

metrics = self.glue_client.get_crawler_metrics(CrawlerNameList=[crawler_name])[
"CrawlerMetricsList"
][0]
time_left = int(metrics["TimeLeftSeconds"])

if time_left > 0:
self.log.info("Estimated Time Left (seconds): %s", time_left)
else:
self.log.info("Crawler should finish soon")

sleep(poll_interval)
self.get_waiter("crawler_ready").wait(Name=crawler_name, WaiterConfig={"Delay": poll_interval})

# query one extra time to log some info
crawler = self.get_crawler(crawler_name)
self.log.info("crawler_config: %s", crawler)
crawler_status = crawler["LastCrawl"]["Status"]

metrics_response = self.glue_client.get_crawler_metrics(CrawlerNameList=[crawler_name])
metrics = metrics_response["CrawlerMetricsList"][0]
self.log.info("Status: %s", crawler_status)
self.log.info("Last Runtime Duration (seconds): %s", metrics["LastRuntimeSeconds"])
self.log.info("Median Runtime Duration (seconds): %s", metrics["MedianRuntimeSeconds"])
self.log.info("Tables Created: %s", metrics["TablesCreated"])
self.log.info("Tables Updated: %s", metrics["TablesUpdated"])
self.log.info("Tables Deleted: %s", metrics["TablesDeleted"])

return crawler_status
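For reference, `WaiterConfig` accepts the standard botocore keys, so a caller can also bound the total number of attempts; a small illustrative sketch:

hook = GlueCrawlerHook(aws_conn_id="aws_default")
hook.get_waiter("crawler_ready").wait(
    Name="my-crawler",
    WaiterConfig={"Delay": 5, "MaxAttempts": 75},  # poll every 5 s, give up after 75 checks
)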
27 changes: 25 additions & 2 deletions airflow/providers/amazon/aws/operators/glue.py
@@ -21,10 +21,12 @@
import urllib.parse
from typing import TYPE_CHECKING, Sequence

from airflow import AirflowException
from airflow.models import BaseOperator
from airflow.providers.amazon.aws.hooks.glue import GlueJobHook
from airflow.providers.amazon.aws.hooks.s3 import S3Hook
from airflow.providers.amazon.aws.links.glue import GlueJobRunDetailsLink
from airflow.providers.amazon.aws.triggers.glue import GlueJobCompleteTrigger

if TYPE_CHECKING:
from airflow.utils.context import Context
@@ -52,7 +54,10 @@ class GlueJobOperator(BaseOperator):
:param iam_role_name: AWS IAM Role for Glue Job Execution
:param create_job_kwargs: Extra arguments for Glue Job Creation
:param run_job_kwargs: Extra arguments for Glue Job Run
:param wait_for_completion: Whether or not wait for job run completion. (default: True)
:param wait_for_completion: Whether to wait for job run completion. (default: True)
:param deferrable: If True, the operator will wait asynchronously for the job to complete.
This implies waiting for completion. This mode requires aiobotocore module to be installed.
(default: False)
:param verbose: If True, Glue Job Run logs show in the Airflow Task Logs. (default: False)
:param update_config: If True, Operator will update job configuration. (default: False)
"""
@@ -91,6 +96,7 @@ def __init__(
create_job_kwargs: dict | None = None,
run_job_kwargs: dict | None = None,
wait_for_completion: bool = True,
deferrable: bool = False,
verbose: bool = False,
update_config: bool = False,
**kwargs,
@@ -114,6 +120,7 @@ def __init__(
self.wait_for_completion = wait_for_completion
self.verbose = verbose
self.update_config = update_config
self.deferrable = deferrable

def execute(self, context: Context):
"""
@@ -167,7 +174,18 @@ def execute(self, context: Context):
job_run_id=glue_job_run["JobRunId"],
)
self.log.info("You can monitor this Glue Job run at: %s", glue_job_run_url)
if self.wait_for_completion:

if self.deferrable:
self.defer(
trigger=GlueJobCompleteTrigger(
job_name=self.job_name,
run_id=glue_job_run["JobRunId"],
verbose=self.verbose,
aws_conn_id=self.aws_conn_id,
),
method_name="execute_complete",
)
elif self.wait_for_completion:
glue_job_run = glue_job.job_completion(self.job_name, glue_job_run["JobRunId"], self.verbose)
self.log.info(
"AWS Glue Job: %s status: %s. Run Id: %s",
@@ -178,3 +196,8 @@
else:
self.log.info("AWS Glue Job: %s. Run Id: %s", self.job_name, glue_job_run["JobRunId"])
return glue_job_run["JobRunId"]

def execute_complete(self, context, event=None):
if event["status"] != "success":
raise AirflowException(f"Error in glue job: {event}")
return
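A hedged usage sketch of the new mode (DAG id, job name, script location, and role name are placeholders):

import pendulum

from airflow import DAG
from airflow.providers.amazon.aws.operators.glue import GlueJobOperator

with DAG("example_glue_deferrable", start_date=pendulum.datetime(2023, 1, 1), schedule=None):
    run_job = GlueJobOperator(
        task_id="run_glue_job",
        job_name="my_glue_job",
        script_location="s3://my-bucket/scripts/etl.py",
        iam_role_name="MyGlueRole",
        deferrable=True,  # wait in the triggerer instead of blocking a worker slot
    )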
28 changes: 25 additions & 3 deletions airflow/providers/amazon/aws/operators/glue_crawler.py
@@ -20,6 +20,9 @@
from functools import cached_property
from typing import TYPE_CHECKING, Sequence

from airflow import AirflowException
from airflow.providers.amazon.aws.triggers.glue_crawler import GlueCrawlerCompleteTrigger

if TYPE_CHECKING:
from airflow.utils.context import Context

@@ -40,7 +43,10 @@ class GlueCrawlerOperator(BaseOperator):
:param config: Configurations for the AWS Glue crawler
:param aws_conn_id: aws connection to use
:param poll_interval: Time (in seconds) to wait between two consecutive calls to check crawler status
:param wait_for_completion: Whether or not wait for crawl execution completion. (default: True)
:param wait_for_completion: Whether to wait for crawl execution completion. (default: True)
:param deferrable: If True, the operator will wait asynchronously for the crawl to complete.
This implies waiting for completion. This mode requires aiobotocore module to be installed.
(default: False)
"""

template_fields: Sequence[str] = ("config",)
@@ -53,18 +59,20 @@ def __init__(
region_name: str | None = None,
poll_interval: int = 5,
wait_for_completion: bool = True,
deferrable: bool = False,
**kwargs,
):
super().__init__(**kwargs)
self.aws_conn_id = aws_conn_id
self.poll_interval = poll_interval
self.wait_for_completion = wait_for_completion
self.deferrable = deferrable
self.region_name = region_name
self.config = config

@cached_property
def hook(self) -> GlueCrawlerHook:
"""Create and return an GlueCrawlerHook."""
"""Create and return a GlueCrawlerHook."""
return GlueCrawlerHook(self.aws_conn_id, region_name=self.region_name)

def execute(self, context: Context):
@@ -81,8 +89,22 @@ def execute(self, context: Context):

self.log.info("Triggering AWS Glue Crawler")
self.hook.start_crawler(crawler_name)
if self.wait_for_completion:
if self.deferrable:
self.defer(
trigger=GlueCrawlerCompleteTrigger(
crawler_name=crawler_name,
poll_interval=self.poll_interval,
aws_conn_id=self.aws_conn_id,
),
method_name="execute_complete",
)
elif self.wait_for_completion:
self.log.info("Waiting for AWS Glue Crawler")
self.hook.wait_for_crawler_completion(crawler_name=crawler_name, poll_interval=self.poll_interval)

return crawler_name

def execute_complete(self, context, event=None):
if event["status"] != "success":
raise AirflowException(f"Error in glue crawl: {event}")
return
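And the crawler counterpart, equally hedged (crawler name is a placeholder; `config` needs at least the crawler `Name`):

from airflow.providers.amazon.aws.operators.glue_crawler import GlueCrawlerOperator

crawl = GlueCrawlerOperator(
    task_id="crawl_s3_data",
    config={"Name": "my_crawler"},
    poll_interval=10,  # forwarded to GlueCrawlerCompleteTrigger as its polling cadence
    deferrable=True,
)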
63 changes: 63 additions & 0 deletions airflow/providers/amazon/aws/triggers/glue.py
@@ -0,0 +1,63 @@
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.

from __future__ import annotations

from typing import Any, AsyncIterator

from airflow.providers.amazon.aws.hooks.glue import GlueJobHook
from airflow.triggers.base import BaseTrigger, TriggerEvent


class GlueJobCompleteTrigger(BaseTrigger):
"""
Watches for a Glue job run and triggers when it finishes.

:param job_name: glue job name
:param run_id: the ID of the specific run to watch for that job
:param verbose: whether to print the job's logs to the Airflow task logs
:param aws_conn_id: The Airflow connection used for AWS credentials.
"""

def __init__(
self,
job_name: str,
run_id: str,
verbose: bool,
aws_conn_id: str,
):
super().__init__()
self.job_name = job_name
self.run_id = run_id
self.verbose = verbose
self.aws_conn_id = aws_conn_id

def serialize(self) -> tuple[str, dict[str, Any]]:
return (
# dynamically generate the fully qualified name of the class
self.__class__.__module__ + "." + self.__class__.__qualname__,
{
"job_name": self.job_name,
"run_id": self.run_id,
"verbose": str(self.verbose),
"aws_conn_id": self.aws_conn_id,
},
)

async def run(self) -> AsyncIterator[TriggerEvent]:
hook = GlueJobHook(aws_conn_id=self.aws_conn_id)
await hook.async_job_completion(self.job_name, self.run_id, self.verbose)
yield TriggerEvent({"status": "success", "message": "Job done"})
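Since `serialize()` returns the classpath and keyword arguments the triggerer uses to rebuild the trigger on its side, a quick illustrative round-trip:

from airflow.providers.amazon.aws.triggers.glue import GlueJobCompleteTrigger

trigger = GlueJobCompleteTrigger(
    job_name="my_job", run_id="jr_abc123", verbose=False, aws_conn_id="aws_default"
)
classpath, kwargs = trigger.serialize()
# classpath == "airflow.providers.amazon.aws.triggers.glue.GlueJobCompleteTrigger"
rebuilt = GlueJobCompleteTrigger(**kwargs)  # effectively what the triggerer does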
