### Настройка Airflow

Для начала вам необходимо выполнить ряд команд чтобы настроить окружение для дальнейшей работы, это позволит первое время не заниматься настройкой среды исполнения, а сразу начать писать код и работать с Airflow.

In [1]:
# Установка Airflow
!pip install "apache-airflow[telegram]==2.1.4" --constraint "https://raw.githubusercontent.com/apache/airflow/constraints-2.1.4/constraints-3.7.txt"

# Инициализация базы данных
!airflow db init

Looking in indexes: https://pypi.org/simple, https://us-python.pkg.dev/colab-wheels/public/simple/
DB: sqlite:////root/airflow/airflow.db
[[34m2023-02-26 20:56:31,390[0m] {[34mdb.py:[0m702} INFO[0m - Creating tables[0m
INFO  [alembic.runtime.migration] Context impl SQLiteImpl.
INFO  [alembic.runtime.migration] Will assume non-transactional DDL.
WARNI [airflow.models.crypto] empty cryptography key - values will not be stored encrypted.
WARNI [unusual_prefix_29f86b268ee2fc845f05f8c9939fae016b0c47cc_example_kubernetes_executor_config] Could not import DAGs in example_kubernetes_executor_config.py: No module named 'kubernetes'
WARNI [unusual_prefix_29f86b268ee2fc845f05f8c9939fae016b0c47cc_example_kubernetes_executor_config] Install kubernetes dependencies with: pip install apache-airflow['cncf.kubernetes']
Initialization done


In [2]:
# Создадим необходимые папки
!mkdir /root/airflow/dags
!touch /root/airflow/dags/dag.py

mkdir: cannot create directory ‘/root/airflow/dags’: File exists


In [None]:
# Включим веб-сервер
!airflow webserver -p 18273 -D

In [4]:
# Создадим пользователя Airflow
!airflow users create \
          --username admin \
          --firstname admin \
          --lastname admin \
          --role Admin \
          --email admin@example.org \
          -p 12345

admin already exist in the db


In [5]:
# Запуск шедулера
!airflow scheduler -D

  ____________       _____________
 ____    |__( )_________  __/__  /________      __
____  /| |_  /__  ___/_  /_ __  /_  __ \_ | /| / /
___  ___ |  / _  /   _  __/ _  / / /_/ /_ |/ |/ /
 _/_/  |_/_/  /_/    /_/    /_/  \____/____/|__/


In [6]:
# Последующие команды не имеют отношения к Airflow
# Они нужни только для корректной работы веб морды
# в среде Google Colab

!pip install pyngrok
!ngrok authtoken 2MBRgJ066swk4ThgvdLuWtytQAI_3WLNSDpTtkwHYi33C27nr # найти его можно https://dashboard.ngrok.com/get-started/setup 

# Эта команда просто отображет веб морду на другой адрес
# Его вы можете найти https://dashboard.ngrok.com/cloud-edge/status
# При каждом отключении ссылка будет меняться
!nohup ngrok http -log=stdout 18273 > /dev/null &

Looking in indexes: https://pypi.org/simple, https://us-python.pkg.dev/colab-wheels/public/simple/
Authtoken saved to configuration file: /root/.ngrok2/ngrok.yml
nohup: redirecting stderr to stdout


После запуска команды выше, перейдите по адресу в ngrok и подождите  пока появится DAG с именем dag

In [7]:
from airflow import DAG
from datetime import timedelta
from airflow.utils.dates import days_ago
from airflow.operators.python import PythonOperator
from airflow.providers.sqlite.operators.sqlite import SqliteOperator
from datetime import datetime

import pandas as pd
import sqlite3
import requests
import io

CONN = sqlite3.connect('/content/example.db')


def extract_currency(date, **context):
   url = f'https://api.exchangerate.host/timeseries?start_date={date}&end_date={date}&base=EUR&symbols=USD&format=csv'
   data = pd.read_csv(url)
   context['ti'].xcom_push(key='return_value', value=data['rate'].values[0])

def extract_data(date, tmp_file, **context):
  url = f'https://raw.githubusercontent.com/dm-novikov/stepik_airflow_course/main/data_new/{date}.csv'
  pd.read_csv(url).to_csv(tmp_file, index=None)

def insert_to_db(tmp_file, table_name, conn=CONN, **context):
  data = pd.read_csv(tmp_file)
  data.to_sql(table_name, conn, if_exists='append', index=False)

# Создаем даг который создаст таблицу, запускаем один раз
dag_create_table= DAG(
    dag_id='dag_create_table',
    schedule_interval='@once',
    start_date=datetime(2022, 1, 26),
)

# Задача для создания таблицы в sqlite базе данных
create_table_data = SqliteOperator(
    task_id='create_table_data',
    sql="""CREATE TABLE join_data(
      currency TEXT, 
      value INTEGER, 
      date DATE, 
      rate FLOAT
      )
    """,
    dag=dag_create_table,
)


with DAG(dag_id='dag',
         start_date=datetime(2021, 1, 1),
         end_date=datetime(2021, 1, 4), 
         max_active_runs=1
    ) as dag:

  extract_currency = PythonOperator(
      task_id='extract_currency',
      python_callable=extract_currency,
      op_kwargs={
          'date': '{{ ds }}'
          }
  )

  extract_data = PythonOperator(
      task_id='extract_data',
      python_callable=extract_data,
      op_kwargs={
          'date': '{{ ds }}',
          'tmp_file': '/tmp/extract_data.csv'}
  )



  insert_data_to_db = PythonOperator(
      task_id='insert_data_to_db',
      python_callable=insert_to_db,
      op_kwargs={
          'table_name': 'data',
          'tmp_file': '/tmp/extract_data.csv'}
  )


  join_data= SqliteOperator(
    task_id='join_data',
    sql="""
    INSERT INTO join_data 
    SELECT date, currency, '{{ ti.xcom_pull(key='return_value') }}' as rate, value
    FROM data
    JOIN (select max(date) max_date from data) mx on data.date = mx.max_date
    WHERE currency = 'EUR'
    """
  )

   
  extract_data >> insert_data_to_db
  [extract_currency, insert_data_to_db] >> join_data

[[34m2023-02-26 20:57:13,710[0m] {[34mutils.py:[0m160} INFO[0m - NumExpr defaulting to 2 threads.[0m


In [15]:
# чтобы првоерить решение можете обратиться к вашей базе данных таким образом
%load_ext sql
%config SqlMagic.feedback=False 
%config SqlMagic.autopandas=True
%sql sqlite:////content/example.db
%sql select * from join_data

The sql extension is already loaded. To reload it, use:
  %reload_ext sql
 * sqlite:////content/example.db
(sqlite3.OperationalError) no such table: join_data
[SQL: select * from join_data]
(Background on this error at: http://sqlalche.me/e/13/e3q8)


Даг нужно написать в файл /root/airflow/dags/dag.py. Проверку можно сделать в веб интерфейсе. Прежде чем даг появится, может пройти ~ 2-3 минут.