Move fivetran sync provider code to this repo #33
Merged
Commits:
- c863ed8 Move fivetran sync provider code to this repo (pankajastro)
- 6958972 [pre-commit.ci] auto fixes from pre-commit.com hooks (pre-commit-ci[bot])
- b4bfe79 Fix static check (pankajastro)
- 3e25ad1 Copy tests (pankajastro)
- e771ee5 Temp restrict pydantic version (pankajastro)
- aeed44a Revert pydantic version (pankajastro)
- 047b47c App review suggestion (pankajastro)
New file (34 additions):
```python
from datetime import datetime, timedelta

from airflow import DAG

from fivetran_provider_async.operators import FivetranOperator
from fivetran_provider_async.sensors import FivetranSensor

default_args = {
    "owner": "Airflow",
    "start_date": datetime(2021, 4, 6),
}

dag = DAG(
    dag_id="example_fivetran",
    default_args=default_args,
    schedule_interval=timedelta(days=1),
    catchup=False,
)

with dag:
    fivetran_sync_start = FivetranOperator(
        task_id="fivetran-task",
        fivetran_conn_id="fivetran_default",
        connector_id="{{ var.value.connector_id }}",
    )

    fivetran_sync_wait = FivetranSensor(
        task_id="fivetran-sensor",
        fivetran_conn_id="fivetran_default",
        connector_id="{{ var.value.connector_id }}",
        poke_interval=5,
    )

    fivetran_sync_start >> fivetran_sync_wait
```
`fivetran_provider_async/example_dags/example_fivetran_async.py` (3 changes: 1 addition & 2 deletions)
`fivetran_provider_async/example_dags/example_fivetran_bigquery.py` (88 additions):
```python
from airflow import DAG
from airflow.operators.empty import EmptyOperator
from airflow.providers.google.cloud.operators.bigquery import BigQueryValueCheckOperator
from airflow.providers.google.cloud.sensors.bigquery import BigQueryTableExistenceSensor
from airflow.utils.dates import datetime
from fivetran_provider.operators.fivetran import FivetranOperator

from fivetran_provider_async.sensors import FivetranSensor

TABLE = "forestfires"
DATASET = "google_sheets"
# These args will get passed on to each operator
# You can override them on a per-task basis during operator initialization
default_args = {
    "owner": "astronomer",
    "depends_on_past": False,
    "start_date": datetime(2021, 7, 7),
    "email": ["noreply@astronomer.io"],
    "email_on_failure": False,
}

with DAG(
    "example_fivetran_bigquery",
    default_args=default_args,
    description="",
    schedule_interval=None,
    catchup=False,
) as dag:
    """
    ### Simple EL Pipeline with Data Integrity and Quality Checks
    Before running the DAG, set the following Airflow or environment variables:
    - key: gcp_project_id
      value: [gcp_project_id]
    - key: connector_id
      value: [connector_id]
    Fully replacing [gcp_project_id] & [connector_id] with the actual IDs.
    What makes this a simple data quality case is:
    1. Absolute ground truth: the local CSV file is considered perfect and immutable.
    2. No transformations or business logic.
    3. Exact values of data to quality check are known.
    """

    """
    #### FivetranOperator & FivetranSensor
    Calling Fivetran to begin data movement from Google Sheets to BigQuery.
    The FivetranSensor monitors the status of the Fivetran data sync.
    """
    fivetran_sync_start = FivetranOperator(
        task_id="fivetran-task",
        fivetran_conn_id="fivetran_default",
        connector_id="{{ var.value.connector_id }}",
    )

    fivetran_sync_wait = FivetranSensor(
        task_id="fivetran-sensor",
        fivetran_conn_id="fivetran_default",
        connector_id="{{ var.value.connector_id }}",
        poke_interval=5,
    )

    """
    #### BigQuery row validation task
    Ensure that data was copied to BigQuery correctly, i.e. the table and dataset
    exist.
    """
    validate_bigquery = BigQueryTableExistenceSensor(
        task_id="validate_bigquery",
        project_id="{{ var.value.gcp_project_id }}",
        dataset_id=DATASET,
        table_id="forestfires",
    )

    """
    #### Row-level data quality check
    Run a data quality check on a few rows, ensuring that the data in BigQuery
    matches the ground truth in the corresponding JSON file.
    """
    check_bq_row_count = BigQueryValueCheckOperator(
        task_id="check_row_count",
        sql=f"SELECT COUNT(*) FROM {DATASET}.{TABLE}",
        pass_value=516,
        use_legacy_sql=False,
    )

    done = EmptyOperator(task_id="done")

    fivetran_sync_start >> fivetran_sync_wait >> validate_bigquery
    validate_bigquery >> check_bq_row_count >> done
```
`fivetran_provider_async/example_dags/example_fivetran_bqml.py` (127 additions):
```python
from datetime import datetime, timedelta

from airflow import DAG
from airflow.operators.python_operator import BranchPythonOperator
from airflow.providers.google.cloud.operators.bigquery import (
    BigQueryExecuteQueryOperator,
    BigQueryGetDataOperator,
)
from airflow.providers.ssh.operators.ssh import SSHOperator

from fivetran_provider_async.operators import FivetranOperator
from fivetran_provider_async.sensors import FivetranSensor

# EDIT WITH YOUR PROJECT ID & DATASET NAME
PROJECT_ID = "YOUR PROJECT ID"
DATASET_NAME = "bqml"
DESTINATION_TABLE = "dbt_ads_bqml_preds"

TRAINING_QUERY = (
    "CREATE OR REPLACE MODEL bqml.dbt_ads_airflow_model "
    "OPTIONS "
    "(model_type = 'ARIMA_PLUS', "
    "time_series_timestamp_col = 'parsed_date', "
    "time_series_data_col = 'daily_impressions', "
    "auto_arima = TRUE, "
    "data_frequency = 'AUTO_FREQUENCY', "
    "decompose_time_series = TRUE "
    ") AS "
    "SELECT "
    "timestamp(date_day) as parsed_date, "
    "SUM(impressions) as daily_impressions "
    "FROM `" + PROJECT_ID + ".bqml.ad_reporting` "
    "GROUP BY date_day;"
)

SERVING_QUERY = (
    "SELECT string(forecast_timestamp) as forecast_timestamp, "
    "forecast_value, "
    "standard_error, "
    "confidence_level, "
    "prediction_interval_lower_bound, "
    "prediction_interval_upper_bound, "
    "confidence_interval_lower_bound, "
    "confidence_interval_upper_bound "
    "FROM ML.FORECAST(MODEL `"
    + PROJECT_ID
    + ".bqml.dbt_ads_airflow_model`,STRUCT(30 AS horizon, 0.8 AS confidence_level));"
)


def ml_branch(ds, **kwargs):
    if "train" in kwargs["params"] and kwargs["params"]["train"]:
        return "train_model"
    else:
        return "get_predictions"


default_args = {
    "owner": "Airflow",
    "start_date": datetime(2021, 4, 6),
}

dag = DAG(
    dag_id="example_fivetran_bqml",
    default_args=default_args,
    schedule_interval=timedelta(days=1),
    catchup=False,
)

with dag:
    linkedin_sync = FivetranOperator(
        task_id="linkedin-sync",
        fivetran_conn_id="fivetran_default",
        connector_id="{{ var.value.linkedin_connector_id }}",
    )

    linkedin_sensor = FivetranSensor(
        task_id="linkedin-sensor",
        fivetran_conn_id="fivetran_default",
        connector_id="{{ var.value.linkedin_connector_id }}",
        poke_interval=5,
    )

    twitter_sync = FivetranOperator(
        task_id="twitter-sync",
        fivetran_conn_id="fivetran_default",
        connector_id="{{ var.value.twitter_connector_id }}",
    )

    twitter_sensor = FivetranSensor(
        task_id="twitter-sensor",
        fivetran_conn_id="fivetran_default",
        connector_id="{{ var.value.twitter_connector_id }}",
        poke_interval=5,
    )

    dbt_run = SSHOperator(
        task_id="dbt_ad_reporting",
        command="cd dbt_ad_reporting ; ~/.local/bin/dbt run -m +ad_reporting",
        ssh_conn_id="dbtvm",
    )

    ml_branch = BranchPythonOperator(task_id="ml_branch", python_callable=ml_branch, provide_context=True)

    train_model = BigQueryExecuteQueryOperator(
        task_id="train_model", sql=TRAINING_QUERY, use_legacy_sql=False
    )

    get_preds = BigQueryExecuteQueryOperator(
        task_id="get_predictions",
        sql=SERVING_QUERY,
        use_legacy_sql=False,
        destination_dataset_table=DATASET_NAME + "." + DESTINATION_TABLE,
        write_disposition="WRITE_APPEND",
    )

    print_preds = BigQueryGetDataOperator(
        task_id="print_predictions", dataset_id=DATASET_NAME, table_id=DESTINATION_TABLE
    )

    linkedin_sync >> linkedin_sensor
    twitter_sync >> twitter_sensor

    [linkedin_sensor, twitter_sensor] >> dbt_run

    dbt_run >> ml_branch >> [train_model, get_preds]
    get_preds >> print_preds
```
`fivetran_provider_async/example_dags/example_fivetran_dbt.py` (50 additions):
```python
from datetime import datetime, timedelta

from airflow import DAG
from airflow.providers.ssh.operators.ssh import SSHOperator

from fivetran_provider_async.operators import FivetranOperator
from fivetran_provider_async.sensors import FivetranSensor

default_args = {
    "owner": "Airflow",
    "start_date": datetime(2021, 4, 6),
}

with DAG(
    dag_id="ad_reporting_dag",
    default_args=default_args,
    schedule_interval=timedelta(days=1),
    catchup=False,
) as dag:
    linkedin_sync = FivetranOperator(
        task_id="linkedin-ads-sync",
        connector_id="{{ var.value.linkedin_connector_id }}",
    )

    linkedin_sensor = FivetranSensor(
        task_id="linkedin-sensor",
        connector_id="{{ var.value.linkedin_connector_id }}",
        poke_interval=600,
    )

    twitter_sync = FivetranOperator(
        task_id="twitter-ads-sync",
        connector_id="{{ var.value.twitter_connector_id }}",
    )

    twitter_sensor = FivetranSensor(
        task_id="twitter-sensor",
        connector_id="{{ var.value.twitter_connector_id }}",
        poke_interval=600,
    )

    dbt_run = SSHOperator(
        task_id="dbt_ad_reporting",
        command="cd dbt_ad_reporting ; ~/.local/bin/dbt run -m +ad_reporting",
        ssh_conn_id="dbtvm",
    )

    linkedin_sync >> linkedin_sensor
    twitter_sync >> twitter_sensor
    [linkedin_sensor, twitter_sensor] >> dbt_run
```
`fivetran_provider_async/example_dags/example_fivetran_xcom.py` (40 additions):
```python
import time
from datetime import datetime, timedelta

from airflow import DAG
from airflow.operators.python import PythonOperator

from fivetran_provider_async.operators import FivetranOperator
from fivetran_provider_async.sensors import FivetranSensor

default_args = {
    "owner": "Airflow",
    "start_date": datetime(2021, 4, 6),
    "provide_context": True,
}

dag = DAG(
    dag_id="example_fivetran_xcom",
    default_args=default_args,
    schedule_interval=timedelta(days=1),
    catchup=False,
)

with dag:
    fivetran_operator = FivetranOperator(
        task_id="fivetran-operator",
        fivetran_conn_id="fivetran_default",
        connector_id="{{ var.value.connector_id }}",
    )

    delay_task = PythonOperator(task_id="delay_python_task", python_callable=lambda: time.sleep(60))

    fivetran_sensor = FivetranSensor(
        task_id="fivetran-sensor",
        fivetran_conn_id="fivetran_default",
        connector_id="{{ var.value.connector_id }}",
        poke_interval=5,
        xcom="{{ task_instance.xcom_pull('fivetran-operator', key='return_value') }}",
    )

    fivetran_operator >> delay_task >> fivetran_sensor
```
Review discussion:

> Are we testing the DAGs in CI or on Astro Cloud?

> Manual testing is done on Astro Cloud.

> We haven't automated it.

> Yes, we have manual testing. I have attached the DAG run screenshot.
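Since the discussion above notes that DAG testing is not yet automated, here is a minimal sketch of one way a CI job could at least catch syntax errors in the example DAG files. It uses only the standard library so it runs without an Airflow install; a fuller check would load the files through Airflow's `DagBag` and inspect its import errors. The function names here are illustrative, not part of the repo.

```python
import ast
from pathlib import Path


def dag_file_parses(path: Path) -> bool:
    """Return True if the file is syntactically valid Python."""
    try:
        ast.parse(path.read_text())
        return True
    except SyntaxError:
        return False


def broken_example_dags(directory: str) -> list[str]:
    """List the .py files in `directory` that fail to parse."""
    return sorted(str(p) for p in Path(directory).glob("*.py") if not dag_file_parses(p))
```

Wired into pytest, `assert not broken_example_dags("fivetran_provider_async/example_dags")` would fail the build on any unparsable DAG; it does not replace the manual Astro Cloud runs, which exercise real connections.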