### Настройка окружения

Для начала вам необходимо выполнить ряд команд чтобы настроить окружение для дальнейшей работы, это позволит первое время не заниматься долгим деплоем, а сразу начать писать код и работать с airflow.

In [1]:
import sys
sys.path.append("/root/airflow/plugins")
sys.path.append("/root/airflow/dags")

In [None]:
# Установка Airflow
!pip install python-telegram-bot
!pip install apache-airflow==2.1.4
!pip install apache-airflow-providers-http
!pip install apache-airflow-providers-telegram

!airflow db init

In [3]:
# Создадим папку dags
# В этой папке лежат скрипты для создания дагов
# Это стандартное имя для  данной папки
!mkdir /root/airflow/dags
!mkdir /root/airflow/plugins
!mkdir /root/airflow/plugins/custom_plugin
!mkdir /root/airflow/plugins/custom_plugin/operators
!mkdir /root/airflow/plugins/custom_plugin/hooks
!mkdir /root/airflow/plugins/custom_plugin/sensors
!touch /root/airflow/dags/dag.py
!touch /root/airflow/dags/telegram.py
!touch /root/airflow/plugins/custom_plugin/__init__.py
!touch /root/airflow/plugins/custom_plugin/operators/__init__.py
!touch /root/airflow/plugins/custom_plugin/operators/custom_operator.py
!touch /root/airflow/plugins/custom_plugin/hooks/__init__.py
!touch /root/airflow/plugins/custom_plugin/sensors/__init__.py

In [None]:
# Последующие команды не имеют отношения к Airflow
# Они нужни только для корректоной работы веб приложения
# в среде Google Colab

!pip install pyngrok

!ngrok authtoken 1yMOS9zzS85dgM5tk81bQkkL7Lq_6YMxq7UbkBK6LMWPmuPNd # найти его можно https://dashboard.ngrok.com/get-started/setup 

In [None]:
!airflow users create \
          --username admin \
          --firstname admin \
          --lastname admin \
          --role Admin \
          --email admin@example.org \
          -p 12345

### Практика

#### Первый плагин

Поместите в /root/airflow/plugins/custom_operator/operators/custom_operator.py следующий код

```python
from airflow.models.baseoperator import BaseOperator

class HelloOperator(BaseOperator):

    def __init__(
            self,
            name: str,
            **kwargs) -> None:
        super().__init__(**kwargs)
        self.name = name

    def execute(self, context):
        message = "Hello {}".format(self.name)
        print(message)
        return message
```



Поместите в dag.py следующий код

**Задание #1**

Сообщение в логе будет ответом к задаче.

```python
from custom_plugin.operators.custom_operator import HelloOperator
from airflow import DAG
from datetime import timedelta, datetime
from airflow.utils.dates import days_ago
 
# Создадим объект класса DAG
dag = DAG(
    'dag',
    start_date=datetime(2015, 12, 1),
    schedule_interval='@daily')
hello_task = HelloOperator(task_id='sample-task', name='foo_bar', dag=dag)
```

#### Нотификация

Вставьте данный код в dag.py

**Задание #2**

Сообщение в телеграм будет ответом.

```python
from airflow import DAG
from datetime import timedelta, datetime
from airflow.utils.dates import days_ago
from custom_plugin.operators.custom_operator import HelloOperator
from airflow.providers.telegram.operators.telegram import TelegramOperator

 
def on_success_callback(context):
    send_message = TelegramOperator(
        task_id='send_message_telegram',
        telegram_conn_id='telegram_id',
        chat_id='-1001525736146',
        text='Hello from Airflow!',
        dag=dag)
    return send_message.execute(context=context)

# Создадим объект класса DAG
dag = DAG(
    'dag',
    start_date=datetime(2015, 12, 1),
    on_success_callback=on_success_callback,
    schedule_interval='@daily')
hello_task = HelloOperator(task_id='sample-task', name='foo_bar', dag=dag)
```

Создайте чат и бота в телеграмме, добавьте бота в чат выдам админские права, создайте **connection** (с именем telegram_id) со следующими параметрами


- Password: API Token который можно получить через https://telegram.me/BotFather
- Host: ID вашего чата, можно получить добавив в чат @raw_data_bot и написать /my_id
- Connection Type: HTTP


**Можете воспользоваться моим каналом:**

- https://t.me/stepiktelegramtest
- TOKEN=1726511562:AAFAfbPk_9gbGjq4VlOg3jbhaXHCxaNRTFA
- ID вашего чата: -1001525736146


In [None]:
# Перезапустим шедулер
# Это должно ускорить поиск нашего дага
# Нужно подождать пару минут пока даг появится
!airflow scheduler -D

In [None]:
# Запуск веб сервера
!airflow webserver -p 18273 -D

In [19]:
# Эта команда просто отображет веб морду на другой адрес
# Его вы можете найти https://dashboard.ngrok.com/endpoints/status
# После запуска посмотрите работает ли интерйфейс, потом выключите данную строку
!nohup ngrok http -log=stdout 18273 > /dev/null &

nohup: redirecting stderr to stdout
