diff --git a/airflow/providers/amazon/aws/example_dags/example_dms.py b/airflow/providers/amazon/aws/example_dags/example_dms.py
new file mode 100644
index 0000000000000..e2ce8b852b255
--- /dev/null
+++ b/airflow/providers/amazon/aws/example_dags/example_dms.py
@@ -0,0 +1,347 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+"""
+Note: DMS requires you to configure specific IAM roles/permissions. For more information, see
+https://docs.aws.amazon.com/dms/latest/userguide/CHAP_Security.html#CHAP_Security.APIRole
+"""
+
+import json
+import os
+from datetime import datetime
+
+import boto3
+from sqlalchemy import Column, MetaData, String, Table, create_engine
+
+from airflow import DAG
+from airflow.decorators import task
+from airflow.models.baseoperator import chain
+from airflow.operators.python import get_current_context
+from airflow.providers.amazon.aws.operators.dms import (
+    DmsCreateTaskOperator,
+    DmsDeleteTaskOperator,
+    DmsDescribeTasksOperator,
+    DmsStartTaskOperator,
+    DmsStopTaskOperator,
+)
+from airflow.providers.amazon.aws.sensors.dms import DmsTaskBaseSensor, DmsTaskCompletedSensor
+
+S3_BUCKET = os.getenv('S3_BUCKET', 's3_bucket_name')
+ROLE_ARN = os.getenv('ROLE_ARN', 'arn:aws:iam::1234567890:role/s3_target_endpoint_role')
+
+# The project name will be used as a prefix for various entity names.
+# Use either PascalCase or camelCase. While some names require kebab-case
+# and others require snake_case, they all accept mixedCase strings.
+PROJECT_NAME = 'DmsDemo'
+
+# Config values for setting up the "Source" database.
+RDS_ENGINE = 'postgres'
+RDS_PROTOCOL = 'postgresql'
+RDS_USERNAME = 'username'
+# NEVER store your production password in plaintext in a DAG like this.
+# Use Airflow Secrets or a secret manager for this in production.
+RDS_PASSWORD = 'rds_password'
+
+# Config values for RDS.
+RDS_INSTANCE_NAME = f'{PROJECT_NAME}-instance'
+RDS_DB_NAME = f'{PROJECT_NAME}_source_database'
+
+# Config values for DMS.
+DMS_REPLICATION_INSTANCE_NAME = f'{PROJECT_NAME}-replication-instance'
+DMS_REPLICATION_TASK_ID = f'{PROJECT_NAME}-replication-task'
+SOURCE_ENDPOINT_IDENTIFIER = f'{PROJECT_NAME}-source-endpoint'
+TARGET_ENDPOINT_IDENTIFIER = f'{PROJECT_NAME}-target-endpoint'
+
+# Sample data.
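+# The table below is created in the RDS source database and then replicated
+# to S3 by the DMS replication task defined in the DAG.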
+TABLE_NAME = f'{PROJECT_NAME}-table'
+TABLE_HEADERS = ['apache_project', 'release_year']
+SAMPLE_DATA = [
+    ('Airflow', '2015'),
+    ('OpenOffice', '2012'),
+    ('Subversion', '2000'),
+    ('NiFi', '2006'),
+]
+TABLE_DEFINITION = {
+    'TableCount': '1',
+    'Tables': [
+        {
+            'TableName': TABLE_NAME,
+            'TableColumns': [
+                {
+                    'ColumnName': TABLE_HEADERS[0],
+                    'ColumnType': 'STRING',
+                    'ColumnNullable': 'false',
+                    'ColumnIsPk': 'true',
+                },
+                {'ColumnName': TABLE_HEADERS[1], 'ColumnType': 'STRING', 'ColumnLength': '4'},
+            ],
+            'TableColumnsTotal': '2',
+        }
+    ],
+}
+TABLE_MAPPINGS = {
+    'rules': [
+        {
+            'rule-type': 'selection',
+            'rule-id': '1',
+            'rule-name': '1',
+            'object-locator': {
+                'schema-name': 'public',
+                'table-name': TABLE_NAME,
+            },
+            'rule-action': 'include',
+        }
+    ]
+}
+
+
+def _create_rds_instance():
+    print('Creating RDS Instance.')
+
+    rds_client = boto3.client('rds')
+    rds_client.create_db_instance(
+        DBName=RDS_DB_NAME,
+        DBInstanceIdentifier=RDS_INSTANCE_NAME,
+        AllocatedStorage=20,
+        DBInstanceClass='db.t3.micro',
+        Engine=RDS_ENGINE,
+        MasterUsername=RDS_USERNAME,
+        MasterUserPassword=RDS_PASSWORD,
+    )
+
+    rds_client.get_waiter('db_instance_available').wait(DBInstanceIdentifier=RDS_INSTANCE_NAME)
+
+    response = rds_client.describe_db_instances(DBInstanceIdentifier=RDS_INSTANCE_NAME)
+    return response['DBInstances'][0]['Endpoint']
+
+
+def _create_rds_table(rds_endpoint):
+    print('Creating table.')
+
+    hostname = rds_endpoint['Address']
+    port = rds_endpoint['Port']
+    rds_url = f'{RDS_PROTOCOL}://{RDS_USERNAME}:{RDS_PASSWORD}@{hostname}:{port}/{RDS_DB_NAME}'
+    engine = create_engine(rds_url)
+
+    table = Table(
+        TABLE_NAME,
+        MetaData(engine),
+        Column(TABLE_HEADERS[0], String, primary_key=True),
+        Column(TABLE_HEADERS[1], String),
+    )
+
+    with engine.connect() as connection:
+        # Create the Table.
+        table.create()
+        load_data = table.insert().values(SAMPLE_DATA)
+        connection.execute(load_data)
+
+        # Read the data back to verify everything is working.
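+        # (If this raises, the set_up task fails before any DMS assets are created.)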
+        connection.execute(table.select())
+
+
+def _create_dms_replication_instance(ti, dms_client):
+    print('Creating replication instance.')
+    instance_arn = dms_client.create_replication_instance(
+        ReplicationInstanceIdentifier=DMS_REPLICATION_INSTANCE_NAME,
+        ReplicationInstanceClass='dms.t3.micro',
+    )['ReplicationInstance']['ReplicationInstanceArn']
+
+    ti.xcom_push(key='replication_instance_arn', value=instance_arn)
+    return instance_arn
+
+
+def _create_dms_endpoints(ti, dms_client, rds_instance_endpoint):
+    print('Creating DMS source endpoint.')
+    source_endpoint_arn = dms_client.create_endpoint(
+        EndpointIdentifier=SOURCE_ENDPOINT_IDENTIFIER,
+        EndpointType='source',
+        EngineName=RDS_ENGINE,
+        Username=RDS_USERNAME,
+        Password=RDS_PASSWORD,
+        ServerName=rds_instance_endpoint['Address'],
+        Port=rds_instance_endpoint['Port'],
+        DatabaseName=RDS_DB_NAME,
+    )['Endpoint']['EndpointArn']
+
+    print('Creating DMS target endpoint.')
+    target_endpoint_arn = dms_client.create_endpoint(
+        EndpointIdentifier=TARGET_ENDPOINT_IDENTIFIER,
+        EndpointType='target',
+        EngineName='s3',
+        S3Settings={
+            'BucketName': S3_BUCKET,
+            'BucketFolder': PROJECT_NAME,
+            'ServiceAccessRoleArn': ROLE_ARN,
+            'ExternalTableDefinition': json.dumps(TABLE_DEFINITION),
+        },
+    )['Endpoint']['EndpointArn']
+
+    ti.xcom_push(key='source_endpoint_arn', value=source_endpoint_arn)
+    ti.xcom_push(key='target_endpoint_arn', value=target_endpoint_arn)
+
+
+def _await_setup_assets(dms_client, instance_arn):
+    print('Awaiting asset provisioning.')
+    dms_client.get_waiter('replication_instance_available').wait(
+        Filters=[{'Name': 'replication-instance-arn', 'Values': [instance_arn]}]
+    )
+
+
+def _delete_rds_instance():
+    print('Deleting RDS Instance.')
+
+    rds_client = boto3.client('rds')
+    rds_client.delete_db_instance(
+        DBInstanceIdentifier=RDS_INSTANCE_NAME,
+        SkipFinalSnapshot=True,
+    )
+
+    rds_client.get_waiter('db_instance_deleted').wait(DBInstanceIdentifier=RDS_INSTANCE_NAME)
+
+
+def _delete_dms_assets(dms_client):
+    ti = get_current_context()['ti']
+    replication_instance_arn = ti.xcom_pull(key='replication_instance_arn')
+    source_arn = ti.xcom_pull(key='source_endpoint_arn')
+    target_arn = ti.xcom_pull(key='target_endpoint_arn')
+
+    print('Deleting DMS assets.')
+    dms_client.delete_replication_instance(ReplicationInstanceArn=replication_instance_arn)
+    dms_client.delete_endpoint(EndpointArn=source_arn)
+    dms_client.delete_endpoint(EndpointArn=target_arn)
+
+
+def _await_all_teardowns(dms_client):
+    print('Awaiting tear-down.')
+    dms_client.get_waiter('replication_instance_deleted').wait(
+        Filters=[{'Name': 'replication-instance-id', 'Values': [DMS_REPLICATION_INSTANCE_NAME]}]
+    )
+
+    dms_client.get_waiter('endpoint_deleted').wait(
+        Filters=[
+            {
+                'Name': 'endpoint-id',
+                'Values': [SOURCE_ENDPOINT_IDENTIFIER, TARGET_ENDPOINT_IDENTIFIER],
+            }
+        ]
+    )
+
+
+@task
+def set_up():
+    ti = get_current_context()['ti']
+    dms_client = boto3.client('dms')
+
+    rds_instance_endpoint = _create_rds_instance()
+    _create_rds_table(rds_instance_endpoint)
+    instance_arn = _create_dms_replication_instance(ti, dms_client)
+    _create_dms_endpoints(ti, dms_client, rds_instance_endpoint)
+    _await_setup_assets(dms_client, instance_arn)
+
+
+@task(trigger_rule='all_done')
+def clean_up():
+    dms_client = boto3.client('dms')
+
+    _delete_rds_instance()
+    _delete_dms_assets(dms_client)
+    _await_all_teardowns(dms_client)
+
+
+with DAG(
+    dag_id='example_dms',
+    schedule_interval=None,
+    start_date=datetime(2021, 1, 1),
+    tags=['example'],
+    catchup=False,
+) as dag:
+
+    # [START howto_operator_dms_create_task]
+    create_task = DmsCreateTaskOperator(
+        task_id='create_task',
+        replication_task_id=DMS_REPLICATION_TASK_ID,
+        source_endpoint_arn='{{ ti.xcom_pull(key="source_endpoint_arn") }}',
+        target_endpoint_arn='{{ ti.xcom_pull(key="target_endpoint_arn") }}',
+        replication_instance_arn='{{ ti.xcom_pull(key="replication_instance_arn") }}',
+        table_mappings=TABLE_MAPPINGS,
+    )
+    # [END howto_operator_dms_create_task]
+
+    # [START howto_operator_dms_start_task]
+    start_task = DmsStartTaskOperator(
+        task_id='start_task',
+        replication_task_arn=create_task.output,
+    )
+    # [END howto_operator_dms_start_task]
+
+    # [START howto_operator_dms_describe_tasks]
+    describe_tasks = DmsDescribeTasksOperator(
+        task_id='describe_tasks',
+        describe_tasks_kwargs={
+            'Filters': [
+                {
+                    'Name': 'replication-instance-arn',
+                    'Values': ['{{ ti.xcom_pull(key="replication_instance_arn") }}'],
+                }
+            ]
+        },
+        do_xcom_push=False,
+    )
+    # [END howto_operator_dms_describe_tasks]
+
+    await_task_start = DmsTaskBaseSensor(
+        task_id='await_task_start',
+        replication_task_arn=create_task.output,
+        target_statuses=['running'],
+        termination_statuses=['stopped', 'deleting', 'failed'],
+    )
+
+    # [START howto_operator_dms_stop_task]
+    stop_task = DmsStopTaskOperator(
+        task_id='stop_task',
+        replication_task_arn=create_task.output,
+    )
+    # [END howto_operator_dms_stop_task]
+
+    # DmsTaskCompletedSensor actually waits until the task reaches the "Stopped" state,
+    # so it will work here.
+    # [START howto_operator_dms_task_completed_sensor]
+    await_task_stop = DmsTaskCompletedSensor(
+        task_id='await_task_stop',
+        replication_task_arn=create_task.output,
+    )
+    # [END howto_operator_dms_task_completed_sensor]
+
+    # [START howto_operator_dms_delete_task]
+    delete_task = DmsDeleteTaskOperator(
+        task_id='delete_task',
+        replication_task_arn=create_task.output,
+        trigger_rule='all_done',
+    )
+    # [END howto_operator_dms_delete_task]
+
+    chain(
+        set_up(),
+        create_task,
+        start_task,
+        describe_tasks,
+        await_task_start,
+        stop_task,
+        await_task_stop,
+        delete_task,
+        clean_up(),
+    )
diff --git a/airflow/providers/amazon/aws/example_dags/example_dms_full_load_task.py b/airflow/providers/amazon/aws/example_dags/example_dms_full_load_task.py
deleted file mode 100644
index 939ca36dfd3b2..0000000000000
--- a/airflow/providers/amazon/aws/example_dags/example_dms_full_load_task.py
+++ /dev/null
@@ -1,92 +0,0 @@
-#
-# Licensed to the Apache Software Foundation (ASF) under one
-# or more contributor license agreements. See the NOTICE file
-# distributed with this work for additional information
-# regarding copyright ownership. The ASF licenses this file
-# to you under the Apache License, Version 2.0 (the
-# "License"); you may not use this file except in compliance
-# with the License. You may obtain a copy of the License at
-#
-#   http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing,
-# software distributed under the License is distributed on an
-# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-# KIND, either express or implied. See the License for the
-# specific language governing permissions and limitations
-# under the License.
-"""
-This is an example dag for running full load DMS replication task.
-""" -from datetime import datetime, timedelta -from os import getenv - -from airflow import DAG -from airflow.providers.amazon.aws.operators.dms import ( - DmsCreateTaskOperator, - DmsDeleteTaskOperator, - DmsStartTaskOperator, -) -from airflow.providers.amazon.aws.sensors.dms import DmsTaskCompletedSensor - -REPLICATION_TASK_ID = getenv('REPLICATION_TASK_ID', 'full-load-test-export') -SOURCE_ENDPOINT_ARN = getenv('SOURCE_ENDPOINT_ARN', 'source_endpoint_arn') -TARGET_ENDPOINT_ARN = getenv('TARGET_ENDPOINT_ARN', 'target_endpoint_arn') -REPLICATION_INSTANCE_ARN = getenv('REPLICATION_INSTANCE_ARN', 'replication_instance_arn') -TABLE_MAPPINGS = { - 'rules': [ - { - 'rule-type': 'selection', - 'rule-id': '1', - 'rule-name': '1', - 'object-locator': { - 'schema-name': 'test', - 'table-name': '%', - }, - 'rule-action': 'include', - } - ] -} - - -with DAG( - dag_id='dms_full_load_task_run_dag', - dagrun_timeout=timedelta(hours=2), - start_date=datetime(2021, 1, 1), - schedule_interval='0 3 * * *', - catchup=False, - tags=['example'], -) as dag: - - # [START howto_dms_operators] - create_task = DmsCreateTaskOperator( - task_id='create_task', - replication_task_id=REPLICATION_TASK_ID, - source_endpoint_arn=SOURCE_ENDPOINT_ARN, - target_endpoint_arn=TARGET_ENDPOINT_ARN, - replication_instance_arn=REPLICATION_INSTANCE_ARN, - table_mappings=TABLE_MAPPINGS, - ) - - start_task = DmsStartTaskOperator( - task_id='start_task', - replication_task_arn=create_task.output, - ) - - wait_for_completion = DmsTaskCompletedSensor( - task_id='wait_for_completion', - replication_task_arn=create_task.output, - ) - - delete_task = DmsDeleteTaskOperator( - task_id='delete_task', - replication_task_arn=create_task.output, - ) - # [END howto_dms_operators] - - start_task >> wait_for_completion >> delete_task - - # Task dependencies created via `XComArgs`: - # create_task >> start_task - # create_task >> wait_for_completion - # create_task >> delete_task diff --git a/airflow/providers/amazon/aws/operators/dms.py b/airflow/providers/amazon/aws/operators/dms.py index aad16155525e8..aca515cfed3a9 100644 --- a/airflow/providers/amazon/aws/operators/dms.py +++ b/airflow/providers/amazon/aws/operators/dms.py @@ -15,8 +15,6 @@ # KIND, either express or implied. See the License for the # specific language governing permissions and limitations # under the License. - - from typing import TYPE_CHECKING, Dict, Optional, Sequence from airflow.models import BaseOperator @@ -154,6 +152,10 @@ class DmsDescribeTasksOperator(BaseOperator): """ Describes AWS DMS replication tasks. + .. seealso:: + For more information on how to use this operator, take a look at the guide: + :ref:`howto/operator:DmsDescribeTasksOperator` + :param describe_tasks_kwargs: Describe tasks command arguments :param aws_conn_id: The Airflow connection used for AWS credentials. If this is None or empty then the default boto3 behaviour is used. If @@ -250,6 +252,10 @@ class DmsStopTaskOperator(BaseOperator): """ Stops AWS DMS replication task. + .. seealso:: + For more information on how to use this operator, take a look at the guide: + :ref:`howto/operator:DmsStopTaskOperator` + :param replication_task_arn: Replication task ARN :param aws_conn_id: The Airflow connection used for AWS credentials. If this is None or empty then the default boto3 behaviour is used. 
         If this is None or empty then the default boto3 behaviour is used. If
diff --git a/docs/apache-airflow-providers-amazon/operators/dms.rst b/docs/apache-airflow-providers-amazon/operators/dms.rst
index 5e184426718c6..8ca092b6fc573 100644
--- a/docs/apache-airflow-providers-amazon/operators/dms.rst
+++ b/docs/apache-airflow-providers-amazon/operators/dms.rst
@@ -16,56 +16,115 @@ under the License.
 
-AWS Database Migration Service Operators
-========================================
+==============================================
+AWS Database Migration Service (DMS) Operators
+==============================================
+
+`AWS Database Migration Service (AWS DMS) <https://aws.amazon.com/dms/>`__
+is a web service you can use to migrate data from your database that is
+on-premises, on an Amazon Relational Database Service (Amazon RDS) DB instance,
+or in a database on an Amazon Elastic Compute Cloud (Amazon EC2) instance to a
+database on an AWS service. These services can include a database on Amazon RDS
+or a database on an Amazon EC2 instance. You can also migrate a database from an
+AWS service to an on-premises database. You can migrate between source and target
+endpoints that use the same database engine, such as from an Oracle database to an
+Oracle database. You can also migrate between source and target endpoints that use
+different database engines, such as from an Oracle database to a PostgreSQL database.
 
 Prerequisite Tasks
 ------------------
 
 .. include:: _partials/prerequisite_tasks.rst
 
-Overview
---------
+.. _howto/operator:DmsCreateTaskOperator:
 
-Airflow to AWS Database Migration Service (DMS) integration provides several operators to create and interact with
-DMS replication tasks.
+Operators
+---------
 
- - :class:`~airflow.providers.amazon.aws.sensors.dms.DmsTaskBaseSensor`
- - :class:`~airflow.providers.amazon.aws.sensors.dms.DmsTaskCompletedSensor`
- - :class:`~airflow.providers.amazon.aws.operators.dms.DmsCreateTaskOperator`
- - :class:`~airflow.providers.amazon.aws.operators.dms.DmsDeleteTaskOperator`
- - :class:`~airflow.providers.amazon.aws.operators.dms.DmsDescribeTasksOperator`
- - :class:`~airflow.providers.amazon.aws.operators.dms.DmsStartTaskOperator`
- - :class:`~airflow.providers.amazon.aws.operators.dms.DmsStopTaskOperator`
+Create a replication task
+=========================
 
-One example_dag is provided which showcases some of these operators in action.
+To create a replication task you can use
+:class:`~airflow.providers.amazon.aws.operators.dms.DmsCreateTaskOperator`.
 
- - example_dms_full_load_task.py
+.. exampleinclude:: /../../airflow/providers/amazon/aws/example_dags/example_dms.py
+    :language: python
+    :dedent: 4
+    :start-after: [START howto_operator_dms_create_task]
+    :end-before: [END howto_operator_dms_create_task]
 
-.. _howto/operator:DmsCreateTaskOperator:
-.. _howto/operator:DmsDeleteTaskOperator:
 .. _howto/operator:DmsStartTaskOperator:
-.. _howto/sensor:DmsTaskCompletedSensor:
 
-Create replication task, wait for it completion and delete it.
---------------------------------------------------------------
+Start a replication task
+========================
+
+To start a replication task you can use
+:class:`~airflow.providers.amazon.aws.operators.dms.DmsStartTaskOperator`.
+
+.. exampleinclude:: /../../airflow/providers/amazon/aws/example_dags/example_dms.py
+    :language: python
+    :dedent: 4
+    :start-after: [START howto_operator_dms_start_task]
+    :end-before: [END howto_operator_dms_start_task]
+
+.. _howto/operator:DmsDescribeTasksOperator:
+
+Get details of replication tasks
+================================
+
+To retrieve the details for a list of replication tasks you can use
+:class:`~airflow.providers.amazon.aws.operators.dms.DmsDescribeTasksOperator`.
+
+.. exampleinclude:: /../../airflow/providers/amazon/aws/example_dags/example_dms.py
+    :language: python
+    :dedent: 4
+    :start-after: [START howto_operator_dms_describe_tasks]
+    :end-before: [END howto_operator_dms_describe_tasks]
+
+.. _howto/operator:DmsStopTaskOperator:
+
+Stop a replication task
+=======================
+
+To stop a replication task you can use
+:class:`~airflow.providers.amazon.aws.operators.dms.DmsStopTaskOperator`.
 
-Purpose
-"""""""
+.. exampleinclude:: /../../airflow/providers/amazon/aws/example_dags/example_dms.py
+    :language: python
+    :dedent: 4
+    :start-after: [START howto_operator_dms_stop_task]
+    :end-before: [END howto_operator_dms_stop_task]
+
+.. _howto/operator:DmsDeleteTaskOperator:
 
-This example DAG ``example_dms_full_load_task.py`` uses ``DmsCreateTaskOperator``, ``DmsStartTaskOperator``,
-``DmsTaskCompletedSensor``, ``DmsDeleteTaskOperator`` to create replication task, start it, wait for it
-to be completed, and then delete it.
+Delete a replication task
+=========================
+
+To delete a replication task you can use
+:class:`~airflow.providers.amazon.aws.operators.dms.DmsDeleteTaskOperator`.
+
+.. exampleinclude:: /../../airflow/providers/amazon/aws/example_dags/example_dms.py
+    :language: python
+    :dedent: 4
+    :start-after: [START howto_operator_dms_delete_task]
+    :end-before: [END howto_operator_dms_delete_task]
+
+Sensors
+-------
+
+.. _howto/sensor:DmsTaskCompletedSensor:
 
-Defining tasks
-""""""""""""""
+Wait for a replication task to complete
+=======================================
 
-In the following code we create a new replication task, start it, wait for it to be completed and then delete it.
+To check the state of a replication task until it is completed, you can use
+:class:`~airflow.providers.amazon.aws.sensors.dms.DmsTaskCompletedSensor`.
 
-.. exampleinclude:: /../../airflow/providers/amazon/aws/example_dags/example_dms_full_load_task.py
+.. exampleinclude:: /../../airflow/providers/amazon/aws/example_dags/example_dms.py
     :language: python
-    :start-after: [START howto_dms_operators]
-    :end-before: [END howto_dms_operators]
+    :dedent: 4
+    :start-after: [START howto_operator_dms_task_completed_sensor]
+    :end-before: [END howto_operator_dms_task_completed_sensor]
 
 Reference