In [6]:
#install dependencies

!pip3 install apache-airflow==2.3.3  #get airflow version fron Environment configuration for the airflow service in GCP.
!pip3 install apache-airflow-providers-google   #plugins related to gcp services

Collecting apache-airflow==2.3.3
  Downloading apache_airflow-2.3.3-py3-none-any.whl.metadata (112 kB)
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m112.2/112.2 kB[0m [31m252.1 kB/s[0m eta [36m0:00:00[0ma [36m0:00:01[0m
Collecting attrs<21.0,>=20.0 (from apache-airflow==2.3.3)
  Downloading attrs-20.3.0-py2.py3-none-any.whl.metadata (10 kB)
Collecting cattrs!=1.7.*,~=1.1 (from apache-airflow==2.3.3)
  Downloading cattrs-1.10.0-py3-none-any.whl.metadata (8.9 kB)
Collecting flask-appbuilder==4.1.2 (from apache-airflow==2.3.3)
  Downloading Flask_AppBuilder-4.1.2-py3-none-any.whl.metadata (8.7 kB)
Collecting graphviz>=0.12 (from apache-airflow==2.3.3)
  Downloading graphviz-0.20.3-py3-none-any.whl.metadata (12 kB)
Collecting markdown>=3.0 (from apache-airflow==2.3.3)
  Downloading Markdown-3.6-py3-none-any.whl.metadata (7.0 kB)
Collecting pathspec~=0.9.0 (from apache-airflow==2.3.3)
  Downloading pathspec-0.9.0-py2.py3-none-any.whl.metadata (12 kB)
Collecting apispec

In [None]:
#update gcloud
!gcloud components update  #update gcloud 


In [None]:
#Interact with Cloud Composer in GCP from the CLI using gcloud commands.
#This invokes the `gcloud composer` command group with no subcommand
#(presumably to see which subcommands are available - confirm the intent).

!gcloud composer 

In [None]:
#getting airflow version for my current GCP environment:

!gcloud composer environments run --location=us-central1 --project=project_id airflow_env_name version

In [None]:
#list dags in airflow env:
 
!gcloud composer environments run --location=us-central1 --project=project_id airflow_env_name dags list

In [None]:
#list all the commands under dags:

!gcloud composer environments run --location=us-central1 --project=project_id airflow_env_name dags

In [None]:
#Tutorial DAG: two Bash tasks that run one after the other.

from datetime import datetime, timedelta
#models provides the DAG class used to instantiate a DAG
from airflow import models

#Operators: each operator instance becomes a task in the DAG
from airflow.operators.bash import BashOperator

with models.DAG(
    'tutorial',
    #These args will get passed on to each operator
    #You can override them on a per-task basis during operator initialization
    default_args={
        'depends_on_past': False,
        'email': ['airflow@example.com'],
        'email_on_failure': False,
        'email_on_retry': False,
        'retries': 1,
        'retry_delay': timedelta(minutes=5),
    },
    description='A simple tutorial DAG',
    schedule_interval=timedelta(days=1),
    start_date=datetime(2021, 1, 1),
    catchup=False,
    tags=['example'],
) as dag:

    #t1 and t2 are examples of tasks created by instantiating operators.
    #They must be created inside the `with` block so they are attached to this DAG.
    t1 = BashOperator(
        task_id='print_date',
        bash_command='date',
    )

    t2 = BashOperator(
        task_id='sleep',
        depends_on_past=False,
        bash_command='sleep 5',
        retries=3,  #overrides the default of 1 retry for this task only
    )

    #t2 runs only after t1 has finished
    t1 >> t2

In [None]:
#Instantiating a Dataproc workflow template using Airflow:
from airflow.providers.google.cloud.operators.dataproc import (DataprocInstantiateWorkflowTemplateOperator,)
from airflow import models
import datetime
from airflow.utils.dates import days_ago

#Placeholder: replace with the real GCP project id
project_id = "project_id"
#GCP region names have no hyphen before the trailing digit (us-central1),
#matching the location used in the gcloud commands of this notebook
region = 'us-central1'

#Passed to every operator in the DAG, so the Dataproc operator receives
#project_id and region without repeating them per task
default_args = {
    "project_id": project_id,
    "region": region,
    "start_date": days_ago(1),
}

with models.DAG(
    "daily_product_revenue",
    default_args=default_args,
    schedule_interval=datetime.timedelta(days=1),
) as dag:

    #Single task: instantiate the existing Dataproc workflow template.
    #"dataproc_wf_id" is a placeholder for the real template id.
    start_template_job = DataprocInstantiateWorkflowTemplateOperator(
        task_id="daily_product_revenue_wf",
        template_id="dataproc_wf_id",
    )




In [None]:
#Running complex data pipelines using Airflow with Dataproc jobs:

import datetime
from airflow import models
#Variable is the class exposing .get(); lowercase `variable` is only the submodule
from airflow.models import Variable
from airflow.providers.google.cloud.operators.dataproc import (DataprocSubmitSparkSqlJobOperator,
                                                               DataprocSubmitPySparkJobOperator)
from airflow.utils.dates import days_ago

#Environment-specific settings are read from Airflow Variables, which must
#exist in the Airflow environment before this DAG is parsed
project_id = Variable.get('project_id')
region = Variable.get('region')
bucket_name = Variable.get('bucket_name')
cluster_name = Variable.get('cluster_name')

#Passed to every operator in the DAG. cluster_name is included so the jobs are
#submitted to the configured cluster rather than the operators' default name.
default_args = {
    "project_id": project_id,
    "region": region,
    "cluster_name": cluster_name,
    "start_date": days_ago(1),
}

with models.DAG(
    "daily_product_revenue_vars_wf_dag",
    default_args=default_args,
    schedule_interval=datetime.timedelta(days=1),
) as dag:

    #'script_path' / 'file_path' below are placeholders for object paths in the bucket

    #Step 1: Spark SQL cleanup script
    task_cleanup = DataprocSubmitSparkSqlJobOperator(
        task_id='run_db_cleanup',
        query_uri=f'gs://{bucket_name}/script_path',
    )

    #Step 2a: Spark SQL conversion script for the orders table
    task_convert_orders = DataprocSubmitSparkSqlJobOperator(
        task_id='run_convert_orders',
        query_uri=f'gs://{bucket_name}/script_path',
        variables={
            'bucket_name': f'gs://{bucket_name}',
            'table_name': 'orders',
        },
    )

    #Step 2b: Spark SQL conversion script for the order_items table
    task_convert_order_items = DataprocSubmitSparkSqlJobOperator(
        task_id='run_convert_order_items',
        query_uri=f'gs://{bucket_name}/script_path',
        variables={
            'bucket_name': f'gs://{bucket_name}',
            'table_name': 'order_items',
        },
    )

    #Step 3: Spark SQL script computing daily product revenue
    task_compute_daily_product_revenue = DataprocSubmitSparkSqlJobOperator(
        task_id='run_compute_daily_product_revenue',
        query_uri=f'gs://{bucket_name}/script_path',
        variables={
            'bucket_name': f'gs://{bucket_name}',
        },
    )

    #Step 4: PySpark job; its settings are handed to the application master
    #as environment variables through the spark.yarn.appMaster.Env.* properties
    task_load_dpr_bq = DataprocSubmitPySparkJobOperator(
        task_id='run_load_dpr_bq',
        main=f'gs://{bucket_name}/script_path',
        dataproc_jars=['jar1', 'jar2', 'etc'],  #placeholders for the required jar URIs
        dataproc_properties={
            'spark.app.name': 'Bigquery Loader - Daily Product Revenue',
            'spark.submit.deployMode': 'cluster',
            'spark.yarn.appMaster.Env.DATA_URI': f'gs://{bucket_name}/file_path',
            'spark.yarn.appMaster.Env.PROJECT_ID': project_id,
            'spark.yarn.appMaster.Env.DATASET_NAME': 'retail',
            'spark.yarn.appMaster.Env.GCS_TEMP_BUCKET': bucket_name,
        },
    )

    #Cleanup fans out to both conversion tasks
    task_cleanup >> task_convert_orders
    task_cleanup >> task_convert_order_items

    #Revenue computation waits for both conversions
    task_convert_orders >> task_compute_daily_product_revenue
    task_convert_order_items >> task_compute_daily_product_revenue

    #The final load runs last
    task_compute_daily_product_revenue >> task_load_dpr_bq