This repository has been archived by the owner on Apr 11, 2024. It is now read-only.

Add universal transfer operator interfaces with working example (#1492)
# Description
## What is the current behavior?
The earlier PR astronomer/astro-sdk#1267 has been closed; this fresh PR continues that change.

closes: #1139 
closes: #1544 
closes: #1545
closes: #1546
closes: #1551


## What is the new behavior?
- Define interfaces
    - Use the Airflow 2.4 Dataset concept to build more types of Datasets:
        - Table
        - File
        - Dataframe
        - API
    - Define an interface for the universal transfer operator.
    - Add the `TransferParameters` class to pass transfer configurations.
    - Use a context manager from `DataProvider` for clean-up.
    - Introduce three transfer modes, `native`, `non-native`, and `third-party` (sketched below).
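A minimal sketch of how the transfer modes and `TransferParameters` could look; only `TransferMode.THIRDPARTY` is exercised by the example DAG in this commit, so the other member names and the string values are assumptions.

```python
from dataclasses import dataclass
from enum import Enum


class TransferMode(Enum):
    # THIRDPARTY appears in the example DAG below; the other members are assumed here.
    NATIVE = "native"
    NONNATIVE = "non-native"
    THIRDPARTY = "third-party"


@dataclass
class TransferParameters:
    """Base container for transfer configuration; integrations such as
    FiveTranOptions extend it with service-specific fields."""
```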
    
- `DataProviders`
    - Add an interface for `DataProvider`.
    - Add an interface for `BaseFilesystemProviders`.
    - Add `read` and `write` methods in `DataProviders`, using the context manager (see the sketch below).
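A hedged sketch of the `DataProvider` read/write contract described above, with `read` as a context manager that cleans up after itself; the helper names `load_data` and `cleanup` are illustrative, not the committed API.

```python
from abc import ABC, abstractmethod
from contextlib import contextmanager


class DataProvider(ABC):
    """Reads from and writes to a single Dataset (File, Table, ...)."""

    def __init__(self, dataset):
        self.dataset = dataset

    @contextmanager
    def read(self):
        """Yield the staged source data and clean up temporary artifacts afterwards."""
        try:
            yield self.load_data()
        finally:
            self.cleanup()

    @abstractmethod
    def load_data(self):
        """Fetch the dataset contents (illustrative helper)."""

    @abstractmethod
    def write(self, source_data) -> None:
        """Persist data produced by a source provider into this dataset."""

    def cleanup(self) -> None:
        """Remove temporary files created while staging the transfer."""
```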
    
- `TransferIntegrations` and third-party transfers
    - Add an interface for `TransferIntegrations` and introduce the third-party transfer approach (sketched below).
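Conceptually, a third-party integration only has to trigger the external service rather than move bytes itself; a short sketch under that assumption (the method name here is hypothetical):

```python
from abc import ABC, abstractmethod


class TransferIntegration(ABC):
    """A third-party service (e.g. Fivetran) that performs the transfer itself,
    instead of the data being streamed through the Airflow worker."""

    def __init__(self, transfer_params):
        self.transfer_params = transfer_params

    @abstractmethod
    def transfer_job(self, source_dataset, destination_dataset):
        """Create or trigger the transfer on the external service and return a job reference."""
```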
 
- Non-native transfers
    - Add a `DataProvider` for S3 and GCS.
    - Add a transfer workflow for S3 to GCS using the non-native approach (see the flow sketch below).
    - Add a transfer workflow for GCS to S3 using the non-native approach.
    - Add an example DAG for the S3 to GCS implementation.
    - Add an example DAG for the GCS to S3 implementation.
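Assuming the `DataProvider` contract sketched earlier, the non-native path essentially chains the two providers; this is a sketch of the idea, not the committed implementation.

```python
def run_non_native_transfer(source_provider, destination_provider):
    """Stage data through the Airflow worker: read from the source provider,
    then hand the result to the destination provider."""
    with source_provider.read() as source_data:
        destination_provider.write(source_data)
```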

- Third-party transfers
    - Add the `FivetranTransferIntegration` class for all transfers that use Fivetran.
    - Implement `FiveTranOptions`, which inherits from the `TransferParameters` class, to pass transfer configurations (see the sketch below).
    - Implement a POC for the Fivetran integration.
    - Add an example DAG for the Fivetran implementation.
    - The Fivetran POC ships with a working DAG for a transfer example (S3 to Snowflake) when `connector_id` is passed.
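The fields of `FiveTranOptions` can be inferred from what the example DAG below passes in; the defaults and types in this sketch are assumptions.

```python
from dataclasses import dataclass
from typing import Any, Optional


@dataclass
class TransferParameters:  # base class, sketched earlier
    ...


@dataclass
class FiveTranOptions(TransferParameters):
    conn_id: Optional[str] = None
    connector_id: Optional[str] = None
    group: Optional[Any] = None        # Group(name=...) in the example DAG
    connector: Optional[Any] = None    # Connector(service=..., config=...) in the example DAG
    destination: Optional[Any] = None  # Destination(service=..., config=...) in the example DAG
```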

- Document the APIs for Fivetran transfers in Notion:
  https://www.notion.so/astronomerio/Fivetran-3bd9ecfbdcae411faa49cb38595a4571

- Add a Makefile and Dockerfile, along with a docker-compose.yaml, to build the project both locally and in a container.


## Does this introduce a breaking change?
No

### Checklist
- [x] Example DAG
- [x] Created tests which fail without the change (if possible)
- [x] Extended the README/documentation, if necessary

Co-authored-by: pre-commit-ci[bot] <66853113+pre-commit-ci[bot]@users.noreply.github.com>
Co-authored-by: Kaxil Naik <kaxilnaik@gmail.com>
Co-authored-by: Felix Uellendall <feluelle@users.noreply.github.com>
4 people committed Jan 13, 2023 · commit 62c895a
Showing 30 changed files with 2,006 additions and 0 deletions.
18 changes: 18 additions & 0 deletions Makefile
@@ -0,0 +1,18 @@
.PHONY: help container local

.DEFAULT_GOAL := help

# Default the sub-make target to `help`; `make container target=build-run` overrides it.
target ?= help

container: ## Set up Airflow in container
@$(MAKE) -C mk -f container.mk $(target)

local: ## Set up local dev env
@$(MAKE) -C mk -f local.mk $(target)

help:
@grep -E '^[a-zA-Z_-]+:.*?## .*$$' $(MAKEFILE_LIST) | sort | awk 'BEGIN {FS = ":.*?## "}; {printf "\033[36m%-41s\033[0m %s\n", $$1, $$2}'
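With this Makefile, `make container target=build-run` builds the image and starts the docker-compose stack, `make container target=logs` tails the container logs, and `make container target=stop` shuts it down (the full target list lives in `mk/container.mk` below); `make local target=<target>` dispatches to `mk/local.mk`, which is not shown in this diff.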
4 changes: 4 additions & 0 deletions README.md
@@ -0,0 +1,4 @@
# Universal Transfer Operator

To build locally: `make container target=build-run`
18 changes: 18 additions & 0 deletions dev/Dockerfile
@@ -0,0 +1,18 @@
FROM quay.io/astronomer/astro-runtime:7.1.0-base

USER root
RUN apt-get update -y && apt-get install -y git
RUN apt-get install -y --no-install-recommends \
build-essential \
libsasl2-2 \
libsasl2-dev \
libsasl2-modules
ENV SETUPTOOLS_USE_DISTUTILS=stdlib

COPY ../pyproject.toml ${AIRFLOW_HOME}/universal_transfer_operator/
# The following files are needed because they are referenced from pyproject.toml
COPY ../README.md ${AIRFLOW_HOME}/universal_transfer_operator/
COPY ../src/universal_transfer_operator/__init__.py ${AIRFLOW_HOME}/universal_transfer_operator/src/universal_transfer_operator/__init__.py

RUN pip install -e "${AIRFLOW_HOME}/universal_transfer_operator[all]"
USER astro
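This image is what docker-compose builds below (build context `..` with `dockerfile: dev/Dockerfile`) and tags as `astro-sdk-dev` for all of the Airflow services.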
187 changes: 187 additions & 0 deletions dev/docker-compose.yaml
@@ -0,0 +1,187 @@
---
version: '3'
x-airflow-common:
&airflow-common
image: astro-sdk-dev
build:
context: ..
dockerfile: dev/Dockerfile
environment:
&airflow-common-env
DB_BACKEND: postgres
AIRFLOW__CORE__EXECUTOR: CeleryExecutor
AIRFLOW__DATABASE__SQL_ALCHEMY_CONN: postgresql+psycopg2://airflow:airflow@postgres:5432/airflow
AIRFLOW__CELERY__RESULT_BACKEND: db+postgresql://airflow:airflow@postgres:5432/airflow
AIRFLOW__CELERY__BROKER_URL: redis://:@redis:6379/0
AIRFLOW__CORE__FERNET_KEY: ''
AIRFLOW__CORE__LOAD_EXAMPLES: "False"
AIRFLOW__WEBSERVER__EXPOSE_CONFIG: "True"
AIRFLOW__SCHEDULER__DAG_DIR_LIST_INTERVAL: "5"
ASTRONOMER_ENVIRONMENT: local
AIRFLOW__CORE__ALLOWED_DESERIALIZATION_CLASSES: airflow.* astro.*
AIRFLOW__LINEAGE__BACKEND: openlineage.lineage_backend.OpenLineageBackend
OPENLINEAGE_URL: http://host.docker.internal:5050/
OPENLINEAGE_NAMESPACE: "astro"
volumes:
- ./dags:/usr/local/airflow/dags
- ./logs:/usr/local/airflow/logs
- ./plugins:/usr/local/airflow/plugins
- ../../universal_transfer_operator:/usr/local/airflow/universal_transfer_operator
depends_on:
&airflow-common-depends-on
redis:
condition: service_healthy
postgres:
condition: service_healthy

services:
postgres:
image: postgres:13
environment:
POSTGRES_USER: airflow
POSTGRES_PASSWORD: airflow
POSTGRES_DB: airflow
command: postgres -c 'idle_in_transaction_session_timeout=60000' # 1 minute timeout
volumes:
- postgres-db-volume:/var/lib/postgresql/data
ports:
- "5432:5432"
healthcheck:
test: ["CMD", "pg_isready", "-U", "airflow"]
interval: 5s
retries: 5
restart: always

redis:
image: redis:latest
expose:
- 6379
healthcheck:
test: ["CMD", "redis-cli", "ping"]
interval: 5s
timeout: 30s
retries: 50
restart: always

airflow-webserver:
<<: *airflow-common
command: airflow webserver
ports:
- 8080:8080
healthcheck:
test: ["CMD", "curl", "--fail", "http://localhost:8080/health"]
interval: 10s
timeout: 10s
retries: 5
restart: always
depends_on:
<<: *airflow-common-depends-on
airflow-init:
condition: service_completed_successfully

airflow-scheduler:
<<: *airflow-common
command: airflow scheduler
healthcheck:
test: ["CMD-SHELL", 'airflow jobs check --job-type SchedulerJob --hostname "$${HOSTNAME}"']
interval: 10s
timeout: 10s
retries: 5
restart: always
depends_on:
<<: *airflow-common-depends-on
airflow-init:
condition: service_completed_successfully

airflow-worker:
<<: *airflow-common
command: airflow celery worker
healthcheck:
test:
- "CMD-SHELL"
- 'celery --app airflow.executors.celery_executor.app inspect ping -d "celery@$${HOSTNAME}"'
interval: 10s
timeout: 10s
retries: 5
environment:
<<: *airflow-common-env
restart: always
depends_on:
<<: *airflow-common-depends-on
airflow-init:
condition: service_completed_successfully

airflow-triggerer:
<<: *airflow-common
command: airflow triggerer
healthcheck:
test: ["CMD-SHELL", 'airflow jobs check --job-type TriggererJob --hostname "$${HOSTNAME}"']
interval: 10s
timeout: 10s
retries: 5
restart: always
depends_on:
<<: *airflow-common-depends-on
airflow-init:
condition: service_completed_successfully

airflow-init:
<<: *airflow-common
entrypoint: /bin/bash
# yamllint disable rule:line-length
command:
- -c
- |
one_meg=1048576
mem_available=$$(($$(getconf _PHYS_PAGES) * $$(getconf PAGE_SIZE) / one_meg))
cpus_available=$$(grep -cE 'cpu[0-9]+' /proc/stat)
disk_available=$$(df / | tail -1 | awk '{print $$4}')
warning_resources="false"
if (( mem_available < 4000 )) ; then
echo
echo -e "\033[1;33mWARNING!!!: Not enough memory available for Docker.\e[0m"
echo "At least 4GB of memory required. You have $$(numfmt --to iec $$((mem_available * one_meg)))"
echo
warning_resources="true"
fi
if (( cpus_available < 2 )); then
echo
echo -e "\033[1;33mWARNING!!!: Not enough CPUS available for Docker.\e[0m"
echo "At least 2 CPUs recommended. You have $${cpus_available}"
echo
fi
if (( disk_available < one_meg * 10 )); then
echo
echo -e "\033[1;33mWARNING!!!: Not enough Disk space available for Docker.\e[0m"
echo "At least 10 GBs recommended. You have $$(numfmt --to iec $$((disk_available * 1024 )))"
echo
fi
exec /entrypoint bash -c "
airflow db upgrade && \
airflow users create -r Admin -u admin -e admin -f admin -l admin -p admin && \
airflow connections import /usr/local/airflow/universal_transfer_operator/dev/connections.yaml || true && \
airflow version"
# yamllint enable rule:line-length
environment:
<<: *airflow-common-env

flower:
<<: *airflow-common
command: airflow celery flower
ports:
- 5555:5555
healthcheck:
test: ["CMD", "curl", "--fail", "http://localhost:5555/"]
interval: 10s
timeout: 10s
retries: 5
environment:
<<: *airflow-common-env
restart: always
depends_on:
<<: *airflow-common-depends-on
airflow-init:
condition: service_completed_successfully

volumes:
postgres-db-volume:
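Once the stack is up (for example via `make container target=build-run`), the Airflow webserver is available at http://localhost:8080 and Flower at http://localhost:5555; the `airflow-init` service upgrades the metadata DB, creates an `admin`/`admin` user, and imports connections from `dev/connections.yaml` if that file exists.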
93 changes: 93 additions & 0 deletions example_dags/example_universal_transfer_operator.py
@@ -0,0 +1,93 @@
import os
from datetime import datetime

from airflow import DAG

from universal_transfer_operator.constants import TransferMode
from universal_transfer_operator.datasets.file import File
from universal_transfer_operator.datasets.table import Metadata, Table
from universal_transfer_operator.integrations.fivetran import Connector, Destination, FiveTranOptions, Group
from universal_transfer_operator.universal_transfer_operator import UniversalTransferOperator

with DAG(
"example_universal_transfer_operator",
schedule_interval=None,
start_date=datetime(2022, 1, 1),
catchup=False,
) as dag:
transfer_non_native_gs_to_s3 = UniversalTransferOperator(
task_id="transfer_non_native_gs_to_s3",
source_dataset=File(path="gs://uto-test/uto/", conn_id="google_cloud_default"),
destination_dataset=File(path="s3://astro-sdk-test/uto/", conn_id="aws_default"),
)

transfer_non_native_s3_to_gs = UniversalTransferOperator(
task_id="transfer_non_native_s3_to_gs",
source_dataset=File(path="s3://astro-sdk-test/uto/", conn_id="aws_default"),
destination_dataset=File(
path="gs://uto-test/uto/",
conn_id="google_cloud_default",
),
)

transfer_fivetran_with_connector_id = UniversalTransferOperator(
task_id="transfer_fivetran_with_connector_id",
source_dataset=File(path="s3://astro-sdk-test/uto/", conn_id="aws_default"),
destination_dataset=Table(name="fivetran_test", conn_id="snowflake_default"),
transfer_mode=TransferMode.THIRDPARTY,
transfer_params=FiveTranOptions(conn_id="fivetran_default", connector_id="filing_muppet"),
)

transfer_fivetran_without_connector_id = UniversalTransferOperator(
task_id="transfer_fivetran_without_connector_id",
source_dataset=File(path="s3://astro-sdk-test/uto/", conn_id="aws_default"),
destination_dataset=Table(
name="fivetran_test",
conn_id="snowflake_default",
metadata=Metadata(
database=os.environ["SNOWFLAKE_DATABASE"], schema=os.environ["SNOWFLAKE_SCHEMA"]
),
),
transfer_mode=TransferMode.THIRDPARTY,
transfer_params=FiveTranOptions(
conn_id="fivetran_default",
connector_id="filing_muppet",
group=Group(name="test_group"),
connector=Connector(
service="s3",
config={
"schema": "s3",
"append_file_option": "upsert_file",
"prefix": "folder_path",
"pattern": "file_pattern",
"escape_char": "",
"skip_after": 0,
"list_strategy": "complete_listing",
"bucket": "astro-sdk-test",
"empty_header": True,
"skip_before": 0,
"role_arn": "arn::your_role_arn",
"file_type": "csv",
"delimiter": "",
"is_public": False,
"on_error": "fail",
"compression": "bz2",
"table": "fivetran_test",
"archive_pattern": "regex_pattern",
},
),
destination=Destination(
service="snowflake",
time_zone_offset="-5",
region="GCP_US_EAST4",
config={
"host": "your-account.snowflakecomputing.com",
"port": 443,
"database": "fivetran",
"auth": "PASSWORD",
"user": "fivetran_user",
"password": "123456",
},
),
),
)
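Running this DAG assumes Airflow connections named `aws_default`, `google_cloud_default`, `snowflake_default`, and `fivetran_default` (imported by `airflow-init` from `dev/connections.yaml`), and the last task also reads the `SNOWFLAKE_DATABASE` and `SNOWFLAKE_SCHEMA` environment variables; the bucket, connector, and credential values shown are placeholders.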
33 changes: 33 additions & 0 deletions mk/container.mk
@@ -0,0 +1,33 @@
.PHONY: build-run clean docs logs stop shell restart restart-all help

.DEFAULT_GOAL:= help

logs: ## View logs of all the containers
docker compose -f ../dev/docker-compose.yaml logs --follow

stop: ## Stop all the containers
docker compose -f ../dev/docker-compose.yaml down

clean: ## Remove all the containers along with volumes
docker compose -f ../dev/docker-compose.yaml down --volumes --remove-orphans
rm -rf dev/logs

build-run: ## Build the Docker Image & then run the containers
docker compose -f ../dev/docker-compose.yaml up --build -d

docs: ## Build the docs using Sphinx
docker compose -f ../dev/docker-compose.yaml build
docker compose -f ../dev/docker-compose.yaml run --entrypoint /bin/bash airflow-init -c "cd universal_transfer_operator/docs && make clean html"
@echo "Documentation built in $(shell cd .. && pwd)/docs/_build/html/index.html"

restart: ## Restart Triggerer, Scheduler and Worker containers
docker compose -f ../dev/docker-compose.yaml restart airflow-triggerer airflow-scheduler airflow-worker

restart-all: ## Restart all the containers
docker compose -f ../dev/docker-compose.yaml restart

shell: ## Run a shell within a container (allows an interactive session)
docker compose -f ../dev/docker-compose.yaml run --rm airflow-scheduler bash

help: ## Prints this message
@grep -E '^[a-zA-Z_-]+:.*?## .*$$' $(MAKEFILE_LIST) | sort | awk 'BEGIN {FS = ":.*?## "}; {printf "\033[36m%-41s\033[0m %s\n", $$1, $$2}'
