<a href="https://colab.research.google.com/github/DA04/airflow_training/blob/main/%D0%94%D0%B5%D0%BD%D0%B8%D1%81_%D0%90%D1%84%D0%B0%D0%BD%D0%B0%D1%81%D1%8C%D0%B5%D0%B2_%22%D0%9F%D1%80%D0%BE%D0%B5%D0%BA%D1%82%D0%BD%D0%B0%D1%8F_%D1%80%D0%B0%D0%B1%D0%BE%D1%82%D0%B0_3_ipynb%22.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

### Настройка Airflow

Для начала вам необходимо выполнить ряд команд чтобы настроить окружение для дальнейшей работы, это позволит первое время не заниматься настройкой среды исполнения, а сразу начать писать код и работать с Airflow.

In [None]:
# Установка Airflow
!pip install apache-airflow==2.1.4

# Инициализация базы данных
!airflow db init

In [None]:
# Создадим необходимые папки
!mkdir /root/airflow/dags
!touch /root/airflow/dags/dag.py

In [None]:
# Включим веб-сервер
!airflow webserver -p 18273 -D

In [None]:
# Создадим пользователя Airflow
!airflow users create \
          --username admin \
          --firstname admin \
          --lastname admin \
          --role Admin \
          --email admin@example.org \
          -p 12345

Admin user admin created


Поместите в dag.py следующий код.

```python
from airflow import DAG
from datetime import timedelta
from airflow.utils.dates import days_ago
from airflow.operators.dummy_operator import DummyOperator

dag = DAG('dag',schedule_interval=timedelta(days=1), start_date=days_ago(1))
t1 = DummyOperator(task_id='task_1', dag=dag)
t2 = DummyOperator(task_id='task_2',dag=dag)
t3 = DummyOperator(task_id='task_3',dag=dag)
t4 = DummyOperator(task_id='task_4',dag=dag)
t5 = DummyOperator(task_id='task_5',dag=dag)
t6 = DummyOperator(task_id='task_6',dag=dag)
t7 = DummyOperator(task_id='task_7',dag=dag)
```

In [None]:
# Запуск шедулера
!airflow scheduler -D

  ____________       _____________
 ____    |__( )_________  __/__  /________      __
____  /| |_  /__  ___/_  /_ __  /_  __ \_ | /| / /
___  ___ |  / _  /   _  __/ _  / / /_/ /_ |/ |/ /
 _/_/  |_/_/  /_/    /_/    /_/  \____/____/|__/


In [None]:
# Последующие команды не имеют отношения к Airflow
# Они нужни только для корректной работы веб морды
# в среде Google Colab

!pip install pyngrok
!ngrok authtoken <YOUR TOKEN> # найти его можно https://dashboard.ngrok.com/get-started/setup 

# Эта команда просто отображет веб морду на другой адрес
# Его вы можете найти https://dashboard.ngrok.com/cloud-edge/status
# При каждом отключении ссылка будет меняться
!nohup ngrok http -log=stdout 18273 > /dev/null &

nohup: redirecting stderr to stdout


После запуска команды выше, перейдите по адресу в ngrok и подождите  пока появится DAG с именем dag

In [None]:
# Решение на разработку
import pandas as pd
from airflow import DAG
from datetime import timedelta, datetime
from airflow.utils.dates import days_ago
from airflow.operators.python_operator import PythonOperator
from airflow.providers.sqlite.hooks.sqlite import SqliteHook
from airflow.providers.sqlite.operators.sqlite import SqliteOperator


dag = DAG(
    dag_id='dag',
    schedule_interval='@daily',
    start_date=datetime(2022, 1, 26),
)

# Задача для создания таблицы в sqlite базе данных
create_table_data = SqliteOperator(
    task_id='create_table_data',
    sql="""
    CREATE TABLE if not exists data (
        currency TEXT,
        value INT,
        date DATE
    );
    """,
    dag=dag,
)
# Задача для создания таблицы в sqlite базе данных
create_table_currency = SqliteOperator(
    task_id='create_table_currency',
    sql="""
    CREATE TABLE if not exists currency (
        date DATE,
        code TEXT,
        rate TEXT,
        base TEXT,
        start_date DATE,
        end_date DATE
    );
    """,
    dag=dag,
)


def insert_sqlite_hook(url, table_name):
    sqlite_hook = SqliteHook()
    # Скачиваем данные
    data = pd.read_csv(url)
    # Вставляем данные
    sqlite_hook.insert_rows(table=table_name, rows=data.to_records(index=False), target_fields=list(data.columns))

# Задача для добавления данных из pandas DataFrame
insert_sqlite_data = PythonOperator(
    task_id='insert_sqlite_data',
    python_callable=insert_sqlite_hook,
    op_kwargs={'url': 'https://raw.githubusercontent.com/dm-novikov/stepik_airflow_course/main/data_new/2021-01-01.csv', 'table_name': 'data'},
    dag=dag,
)
# Задача для добавления данных из pandas DataFrame
insert_sqlite_currency = PythonOperator(
    task_id='insert_sqlite_currency',
    python_callable=insert_sqlite_hook,
    op_kwargs={'url': 'https://api.exchangerate.host/timeseries?start_date=2021-01-01&end_date=2021-01-01&base=EUR&format=csv&symbols=USD', 'table_name': 'currency'},
    dag=dag,
)

# Ваше задание

# Создать таблицу через SQLiteOperator
create_table_join = SqliteOperator(
    task_id='create_table_join',
    sql="""create table if not exists join_data (
    currency TEXT,
    value INT,
    date DATE,
    code TEXT,
    rate TEXT,
    base TEXT,
    start_date DATE,
    end_date DATE
    );
    """,
    dag=dag,
)

# Объедините данные через SQLiteOperator
join_data = SqliteOperator(
    task_id='join_data',
    sql="""insert into join_data 
    (currency, value, date, code, rate, base, start_date, end_date) 
    select data.currency, data.value, data.date, currency.code,
    currency.rate, currency.base, currency.start_date, currency.end_date
    from data inner join currency on data.date = currency.date""",
    dag=dag,
)

[create_table_data, create_table_currency, create_table_join] >> insert_sqlite_data >> insert_sqlite_currency >> join_data

In [None]:
# чтобы првоерить решение можете обратиться к вашей базе данных таким образом
%load_ext sql
%config SqlMagic.feedback=False 
%config SqlMagic.autopandas=True
%sql sqlite://content/example.db
%sql select * from join_data

Даг нужно написать в файл /root/airflow/dags/dag.py. Проверку можно сделать в веб интерфейсе. Прежде чем даг появится, может пройти ~ 2-3 минут.