# Instructions
# In this exercise, we'll consolidate repeated code into operator plugins.
# 1 - Move the data quality check logic into a custom operator
#     (a sketch of HasRowsOperator appears at the end of this file)
# 2 - Replace the data quality check PythonOperators with our new custom operator
# 3 - Consolidate both of the S3-to-Redshift functions into a custom operator
#     (a sketch of S3ToRedshiftOperator appears at the end of this file)
# 4 - Replace the S3-to-Redshift PythonOperators with our new custom operator
# 5 - Execute the DAG
import datetime

from airflow import DAG
# HasRowsOperator and S3ToRedshiftOperator are the custom plugin operators
# built in this exercise; the hooks and logging that the original
# PythonOperator callables needed now live inside those plugin
# implementations, so they are no longer imported here.
from airflow.operators import (
    HasRowsOperator,
    PostgresOperator,
    S3ToRedshiftOperator
)

import sql_statements
dag = DAG(
    "lesson3.exercise1",
    start_date=datetime.datetime(2018, 1, 1, 0, 0, 0, 0),
    end_date=datetime.datetime(2018, 12, 1, 0, 0, 0, 0),
    schedule_interval="@monthly",
    max_active_runs=1  # run the monthly backfill one DAG run at a time
)
create_trips_table = PostgresOperator(
    task_id="create_trips_table",
    dag=dag,
    postgres_conn_id="redshift",
    sql=sql_statements.CREATE_TRIPS_TABLE_SQL
)
copy_trips_task = S3ToRedshiftOperator(
    task_id="load_trips_from_s3_to_redshift",
    dag=dag,
    table="trips",
    redshift_conn_id="redshift",
    aws_credentials_id="aws_credentials",
    s3_bucket="udac-data-pipelines",
    s3_key="divvy/partitioned/{execution_date.year}/{execution_date.month}/divvy_trips.csv"
)
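# Note: the s3_key above uses str.format placeholders (single braces), not
# Jinja templating. The S3ToRedshiftOperator is expected to render it against
# the task context at run time, so the 2018-02-01 run would read
# "divvy/partitioned/2018/2/divvy_trips.csv"; see the sketch at the end of
# this file, where that rendering call is an assumption about the plugin.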
#
# Data quality check on the trips table, using the custom HasRowsOperator
# in place of the original PythonOperator
#
check_trips = HasRowsOperator(
    task_id="check_trips_data",
    dag=dag,
    redshift_conn_id="redshift",
    table="trips"
)
create_stations_table = PostgresOperator(
    task_id="create_stations_table",
    dag=dag,
    postgres_conn_id="redshift",
    sql=sql_statements.CREATE_STATIONS_TABLE_SQL
)
copy_stations_task = S3ToRedshiftOperator(
    task_id="load_stations_from_s3_to_redshift",
    dag=dag,
    table="stations",
    redshift_conn_id="redshift",
    aws_credentials_id="aws_credentials",
    s3_bucket="udac-data-pipelines",
    s3_key="divvy/unpartitioned/divvy_stations_2017.csv"
)
#
# Data quality check on the stations table, also using the custom
# HasRowsOperator
#
check_stations = HasRowsOperator(
    task_id="check_stations_data",
    dag=dag,
    redshift_conn_id="redshift",
    table="stations"
)
# Wire each pipeline: create the table, load it from S3, then validate it
create_trips_table >> copy_trips_task
copy_trips_task >> check_trips
create_stations_table >> copy_stations_task
copy_stations_task >> check_stations
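# ---------------------------------------------------------------------------
# For reference, a minimal sketch of what the HasRowsOperator plugin might
# look like. The real implementation lives in the Airflow plugins directory
# (it is what the `from airflow.operators import ...` above resolves to);
# the class below only illustrates the technique, and carries a "Sketch"
# suffix so it does not shadow the imported plugin class.
# ---------------------------------------------------------------------------
import logging

from airflow.hooks.postgres_hook import PostgresHook
from airflow.models import BaseOperator
from airflow.utils.decorators import apply_defaults


class HasRowsOperatorSketch(BaseOperator):
    """Fail the task if the given table returns no rows."""

    @apply_defaults
    def __init__(self, redshift_conn_id="", table="", *args, **kwargs):
        super(HasRowsOperatorSketch, self).__init__(*args, **kwargs)
        self.redshift_conn_id = redshift_conn_id
        self.table = table

    def execute(self, context):
        redshift = PostgresHook(postgres_conn_id=self.redshift_conn_id)
        records = redshift.get_records(
            "SELECT COUNT(*) FROM {}".format(self.table))
        # A well-formed result is one row containing one count column
        if len(records) < 1 or len(records[0]) < 1:
            raise ValueError(
                "Data quality check failed: {} returned no results".format(self.table))
        if records[0][0] < 1:
            raise ValueError(
                "Data quality check failed: {} contains 0 rows".format(self.table))
        logging.info("Data quality check on %s passed with %s records",
                     self.table, records[0][0])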
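# ---------------------------------------------------------------------------
# Likewise, a minimal sketch of the S3ToRedshiftOperator plugin: render the
# str.format-templated s3_key with the task context, then issue a Redshift
# COPY from the resulting S3 path. Again the "Sketch" suffix avoids shadowing
# the imported plugin class, and the COPY options shown (comma-delimited CSV
# with a header row) are assumptions rather than the canonical plugin code.
# It reuses BaseOperator, apply_defaults, and PostgresHook imported above.
# ---------------------------------------------------------------------------
from airflow.contrib.hooks.aws_hook import AwsHook


class S3ToRedshiftOperatorSketch(BaseOperator):
    """Copy a CSV file from S3 into a Redshift table."""

    COPY_SQL = """
        COPY {table}
        FROM '{s3_path}'
        ACCESS_KEY_ID '{access_key}'
        SECRET_ACCESS_KEY '{secret_key}'
        IGNOREHEADER 1
        DELIMITER ','
    """

    @apply_defaults
    def __init__(self, redshift_conn_id="", aws_credentials_id="",
                 table="", s3_bucket="", s3_key="", *args, **kwargs):
        super(S3ToRedshiftOperatorSketch, self).__init__(*args, **kwargs)
        self.redshift_conn_id = redshift_conn_id
        self.aws_credentials_id = aws_credentials_id
        self.table = table
        self.s3_bucket = s3_bucket
        self.s3_key = s3_key

    def execute(self, context):
        # Pull AWS credentials from the Airflow connection
        credentials = AwsHook(self.aws_credentials_id).get_credentials()
        redshift = PostgresHook(postgres_conn_id=self.redshift_conn_id)
        # Render placeholders such as {execution_date.year} from the context
        rendered_key = self.s3_key.format(**context)
        s3_path = "s3://{}/{}".format(self.s3_bucket, rendered_key)
        redshift.run(self.COPY_SQL.format(
            table=self.table,
            s3_path=s3_path,
            access_key=credentials.access_key,
            secret_key=credentials.secret_key,
        ))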