### Настройка Airflow

Для начала вам необходимо выполнить ряд команд чтобы настроить окружение для дальнейшей работы, это позволит первое время не заниматься настройкой среды исполнения, а сразу начать писать код и работать с Airflow.

In [1]:
# Установка Airflow
!pip install apache-airflow==2.1.4

# Инициализация базы данных
!airflow db init

Collecting apache-airflow==2.1.4
  Downloading apache_airflow-2.1.4-py3-none-any.whl (5.3 MB)
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m5.3/5.3 MB[0m [31m51.0 MB/s[0m eta [36m0:00:00[0m
[?25hCollecting alembic<2.0,>=1.2 (from apache-airflow==2.1.4)
  Downloading alembic-1.12.0-py3-none-any.whl (226 kB)
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m226.0/226.0 kB[0m [31m32.6 MB/s[0m eta [36m0:00:00[0m
[?25hCollecting argcomplete~=1.10 (from apache-airflow==2.1.4)
  Downloading argcomplete-1.12.3-py2.py3-none-any.whl (38 kB)
Collecting attrs<21.0,>=20.0 (from apache-airflow==2.1.4)
  Downloading attrs-20.3.0-py2.py3-none-any.whl (49 kB)
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m49.3/49.3 kB[0m [31m6.6 MB/s[0m eta [36m0:00:00[0m
Collecting clickclick>=1.2 (from apache-airflow==2.1.4)
  Downloading clickclick-20.10.2-py2.py3-none-any.whl (7.4 kB)
Collecting colorlog<6.0,>=4.0.2 (from apache-airflow==2.1.4)
  Dow

In [2]:
# Создадим необходимые папки
!mkdir /root/airflow/dags
!touch /root/airflow/dags/dag.py

In [3]:
# Включим веб-сервер
!airflow webserver -p 18273 -D

  ____________       _____________
 ____    |__( )_________  __/__  /________      __
____  /| |_  /__  ___/_  /_ __  /_  __ \_ | /| / /
___  ___ |  / _  /   _  __/ _  / / /_/ /_ |/ |/ /
 _/_/  |_/_/  /_/    /_/    /_/  \____/____/|__/
[[34m2023-10-24 09:39:03,568[0m] {[34mdagbag.py:[0m496} INFO[0m - Filling up the DagBag from [01m/dev/null[22m[0m
Running the Gunicorn Server with:
Workers: 4 sync
Host: 0.0.0.0:18273
Timeout: 120
Logfiles: - -
Access Logformat: 


In [4]:
# Создадим пользователя Airflow
!airflow users create \
          --username admin \
          --firstname admin \
          --lastname admin \
          --role Admin \
          --email admin@example.org \
          -p 12345

Admin user admin created


#Поместите в dag.py следующий код.

from airflow import DAG


from datetime import timedelta
from airflow.utils.dates import days_ago
from airflow.operators.python_operator import PythonOperator

dag = DAG('dag',schedule_interval=timedelta(days=1), start_date=days_ago(1))

def print_context(**context):
    context['ti'].xcom_push(key='context_len', value=str(context))

run_this = PythonOperator(
    task_id='print_the_context',
    python_callable=print_context,
    dag=dag,
)

In [5]:
#Поместите в dag.py следующий код.

from airflow import DAG


from datetime import timedelta
from airflow.utils.dates import days_ago
from airflow.operators.python_operator import PythonOperator

dag = DAG('dag',schedule_interval=timedelta(days=1), start_date=days_ago(1))

def print_context(**context):
    context['ti'].xcom_push(key='context_len', value=str(context))

run_this = PythonOperator(
    task_id='print_the_context',
    python_callable=print_context,
    dag=dag,
)

In [6]:
# Запуск шедулера
!airflow scheduler -D

  ____________       _____________
 ____    |__( )_________  __/__  /________      __
____  /| |_  /__  ___/_  /_ __  /_  __ \_ | /| / /
___  ___ |  / _  /   _  __/ _  / / /_/ /_ |/ |/ /
 _/_/  |_/_/  /_/    /_/    /_/  \____/____/|__/


In [8]:
# Последующие команды не имеют отношения к Airflow
# Они нужни только для корректной работы веб морды
# в среде Google Colab

!pip install pyngrok
!ngrok authtoken 2WzHNlq2B5lQPxYzm2veE1riXTd_3fmsV44g6dEWoC2eKkqWA # найти его можно https://dashboard.ngrok.com/get-started/setup

# Эта команда просто отображет веб морду на другой адрес
# Его вы можете найти https://dashboard.ngrok.com/cloud-edge/status
# При каждом отключении ссылка будет меняться
!nohup ngrok http 18273 > /dev/null &

Authtoken saved to configuration file: /root/.ngrok2/ngrok.yml
nohup: redirecting stderr to stdout


После запуска команды выше, перейдите по адресу в ngrok и подождите  пока появится DAG с именем dag

### Задача на разработку

В [прошлой проектной работе](https://stepik.org/lesson/556651/step/14?unit=550660) вы реализовали ETL скрипт который выгружает данные из сторонних источников. Теперь я предлагаю вам взять небольшую его часть и переписать с помощью Airflow. Использовать только 1 дату 2021-01-01 можно прописать в функции напрямую, захардкодить.

Вам необходимо обернуть ваш код в PythonOperator

*   Скачайте валюту за 2021-01-01 и положите в БД sqlite (использовать
PythonOperator чтобы скачать данные, можно использовать pandas)
*   Скачайте данные за 2021-01-01 и положите в БД sqlite (использовать PythonOperator чтобы скачать данные, можно использовать pandas)


Даг нужно написать в файл /root/airflow/dags/dag.py. Проверку можно сделать в веб интерфейсе. Прежде чем даг появится, может пройти ~ 2-3 минут.

In [None]:
from airflow import DAG
from airflow.operators.python import PythonOperator
import pandas as pd
import sqlite3
from datetime import datetime, timedelta
from airflow.utils.dates import days_ago



#currency
def extract_currency(date):
    currency = pd.read_csv(f'https://raw.githubusercontent.com/datanlnja/airflow_course/main/excangerate/{date}.csv')
    conn = sqlite3.connect('sqlite3.db')
    currency.to_sql( name= 'currency', con = conn, if_exists='append')


#data
def extract_data(date):
    data = pd.read_csv(f'https://raw.githubusercontent.com/datanlnja/airflow_course/main/data/{date}.csv')
    conn = sqlite3.connect('sqlite3.db')
    data.to_sql( name= 'data', con = conn, if_exists='append')


#dag
dag =  DAG('dag',
           schedule_interval=timedelta(days=1),
           start_date=days_ago(1))

#python operator currency
extract_c = PythonOperator(
    task_id='extract_currency', # Имя задачи внутри Dag
    python_callable=extract_currency, # Запускаемая Python функция, описана выше

    # Чтобы передать аргументы в нашу функцию
    # их следует передавать через следующий код
    op_kwargs={'date': '2021-01-01'},
    dag=dag
    )


#python operator data
extract_d = PythonOperator(
    task_id='extract_data', # Имя задачи внутри Dag
    python_callable=extract_data, # Запускаемая Python функция, описана выше

    # Чтобы передать аргументы в нашу функцию
    # их следует передавать через следующий код
    op_kwargs={'date': '2021-01-01'},
    dag=dag
    )

extract_c >> extract_d

<Task(PythonOperator): extract_data>

In [None]:
# чтобы првоерить решение можете обратиться к вашей базе данных таким образом

import sqlite3

# Создаем подключение к базе данных
conn = sqlite3.connect('<ПУТЬ ДО БАЗЫ>')

# Создаем курсор для выполнения SQL-запросов
cursor = conn.cursor()

# Выполняем запрос к таблице
cursor.execute("SELECT * FROM JOIN_DATA")

# Извлекаем все строки из результата запроса
rows = cursor.fetchall()

# Выводим результаты
for row in rows:
    print(row)

# Закрываем соединение
conn.close()