<a href="https://colab.research.google.com/github/PavelNikishin/Airflow-for-analyst/blob/main/Final_tasks.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

### Настройка окружения

Для начала вам необходимо выполнить ряд команд чтобы настроить окружение для дальнейшей работы, это позволит первое время не заниматься долгим деплоем, а сразу начать писать код и работать с airflow.

In [None]:
# Установка Airflow
!pip install apache-airflow==2.5.3 # 2.1.4 
!pip install pyngrok

!airflow db init

# Создадим необходимые папки
!mkdir /root/airflow/dags
!touch /root/airflow/dags/dag.py

In [None]:
# Включим веб-сервер
!airflow webserver -p 18273 -D

# Создадим пользователя Airflow
!airflow users create \
          --username admin \
          --firstname admin \
          --lastname admin \
          --role Admin \
          --email admin@example.org \
          -p 12345

# Запуск шедулера
!airflow scheduler -D

#ngrok authtoken <YOUR TOKEN> # найти его можно https://dashboard.ngrok.com/get-started/setup 

!nohup ngrok http 18273 > /dev/null &

# 6.1 BashOperator 

Используя BashOperator написать команду которая будет чистить папку с логами Airflow. Подсказка, хватит простого rm. В ответе прислать полный код дага.

In [None]:
from airflow import DAG
from airflow.operators.bash import BashOperator
from airflow.utils.dates import days_ago

with DAG('dag', schedule_interval='@daily', start_date=days_ago(1)) as dag: 

    cli_command = BashOperator(
        task_id="bash_command",
        bash_command="rm -r /root/airflow/logs/ ")

# 6.2 SimpleHttpOperator 

Используя SimpleHttpOperator обратиться по адресу

https://www.random.org/integers/?num=1&min=1&max=5&col=1&base=2&format=plain 

и записать результат в xcom. Подсказка, нужно прописать адрес в http_conn_id, а путь подключения в поле endpoint. В ответе прислать полный код дага.

In [None]:
!airflow connections add 'random' \
    --conn-type 'HTTP' \
    --conn-host 'https://www.random.org/'

In [None]:
# used airflow==2.5.3
from airflow import DAG
from airflow.providers.http.operators.http import SimpleHttpOperator
from pendulum import today

with DAG('dag', schedule='@daily', start_date=today('UTC').add(days=-1)) as dag: 

    task_get_op = SimpleHttpOperator(
        task_id="get_op",
        method="GET",
        http_conn_id='random',
        endpoint="integers/?num=1&min=1&max=5&col=1&base=2&format=plain",
        dag=dag)

# 6.3 Пуш в Xcom  

Ваше задание переопределить стандартный оператор Dummy так чтобы он пушил в Xcom случайное число от 0 до 9.

In [None]:
from airflow import DAG
from datetime import timedelta
from airflow.utils.dates import days_ago
from airflow.models import BaseOperator
from random import randint

class DummyOperator(BaseOperator):

    ui_color = '#e8f7e4'
    #inherits_from_dummy_operator = True

    def __init__(self, **kwargs) -> None:
        super().__init__(**kwargs)

    def execute(self, context):
        # ВАШ КОД
        return randint(0,9)

dag = DAG('dag',schedule_interval='@daily', start_date=days_ago(1))
t1 = DummyOperator(task_id='task_1', dag=dag)
t2 = DummyOperator(task_id='task_2',dag=dag)

t1 >> t2

# 6.4 Connections и Variables

In [None]:
!airflow connections add 'custom_conn_id' \
    --conn-type 'HTTP' \
    --conn-host 'google.com' \
    --conn-login  'user' \
    --conn-password  '12345'

In [None]:
# used airflow==2.5.3
from airflow import DAG
from airflow.decorators import task
from airflow.hooks.base import BaseHook
from airflow.models import Variable
from pendulum import today

with DAG('dag', schedule='@daily', start_date=today('UTC').add(days=-1)) as dag: 

    @task.python(task_id='conn_to_variables')
    def conn_to_variables(conn_id: str):
        c = BaseHook.get_connection(conn_id)
        Variable.set(key='host', value=c.host, serialize_json=True)
        Variable.set(key='login', value=c.login, serialize_json=True)
        Variable.set(key='password', value=c.password, serialize_json=True)

    conn_to_variables('custom_conn_id')    

In [None]:
!airflow variables get password

"12345"


# 6.5 HttpSensor 

Используя HttpSensor и код ниже обратиться по адресу

https://www.random.org/integers/?num=1&min=1&max=5&col=1&base=10&format=plain

Если ответ будет равен 5 то вернуть True чтобы сенсор завершился, также добавить параметр окончания действия сенсора 1 минутой

In [None]:
!airflow connections delete 'http_default' 
!airflow connections add 'http_default' \
    --conn-type 'HTTP' \
    --conn-host 'https://www.random.org/'

In [None]:
import airflow
from airflow import DAG
from airflow.sensors.http_sensor import HttpSensor

dag = DAG('dag',schedule_interval='@daily', start_date=airflow.utils.dates.days_ago(1),)

def response_check(response, task_instance):
    #print(response.status_code, response.text)
    if response.status_code == 200:
        if int(response.text) == 5:
            return True
        else:
            return False
    else:
        return False
    
sensor = HttpSensor(
    task_id='http_sensor',
    http_conn_id='http_default',
    endpoint='integers/?num=1&min=1&max=5&col=1&base=10&format=plain',
    response_check=response_check,
    poke_interval=10,
    timeout=60,
    dag=dag)

# 6.6 Сгенерировать 5 DAG

Сгенерировать 5 DAG таким образом чтобы в каждом DAG генерировалось по 10 Task идущих параллельно. Использовать DummyOperator для задач. Имена DAG выбрать по такому шаблону dag_number.

In [None]:
# used airflow==2.5.3
from airflow import DAG
from airflow.decorators import task
from airflow.operators.dummy_operator import DummyOperator
from pendulum import today

def create_dag(dag_id, default_args):
  
    with DAG(dag_id, default_args=default_args) as dag: 

        for n in range(1, 11):

            DummyOperator(task_id=f'task_{n}')

    return dag

# build a dag for each number in range
for n in range(1, 6):
    dag_id = f'dag_{n}'

    default_args = {'owner': 'airflow',
                    'start_date': today('UTC').add(days=-1),
                    'schedule': '@daily'}

    globals()[dag_id] = create_dag(dag_id, default_args)