### Настройка Airflow

Для начала вам необходимо выполнить ряд команд чтобы настроить окружение для дальнейшей работы, это позволит первое время не заниматься настройкой среды исполнения, а сразу начать писать код и работать с Airflow.

In [2]:
# Установка Airflow
!pip install apache-airflow==2.1.4

# Инициализация базы данных
!airflow db init

Looking in indexes: https://pypi.org/simple, https://us-python.pkg.dev/colab-wheels/public/simple/
Collecting apache-airflow==2.1.4
  Downloading apache_airflow-2.1.4-py3-none-any.whl (5.3 MB)
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m5.3/5.3 MB[0m [31m59.2 MB/s[0m eta [36m0:00:00[0m
Collecting lockfile>=0.12.2
  Downloading lockfile-0.12.2-py2.py3-none-any.whl (13 kB)
Collecting sqlalchemy-jsonfield~=1.0
  Downloading SQLAlchemy_JSONField-1.0.1.post0-py3-none-any.whl (10 kB)
Collecting importlib-resources~=1.4
  Downloading importlib_resources-1.5.0-py2.py3-none-any.whl (21 kB)
Collecting marshmallow-oneofschema>=2.0.1
  Downloading marshmallow_oneofschema-3.0.1-py2.py3-none-any.whl (5.8 kB)
Collecting apache-airflow-providers-imap
  Downloading apache_airflow_providers_imap-3.1.1-py3-none-any.whl (17 kB)
Collecting croniter<1.1,>=0.3.17
  Downloading croniter-1.0.15-py2.py3-none-any.whl (16 kB)
Collecting apache-airflow-providers-http
  Downloading apache_a

In [3]:
# Создадим необходимые папки
!mkdir /root/airflow/dags
!touch /root/airflow/dags/dag.py

In [4]:
# Включим веб-сервер
!airflow webserver -p 18273 -D

  ____________       _____________
 ____    |__( )_________  __/__  /________      __
____  /| |_  /__  ___/_  /_ __  /_  __ \_ | /| / /
___  ___ |  / _  /   _  __/ _  / / /_/ /_ |/ |/ /
 _/_/  |_/_/  /_/    /_/    /_/  \____/____/|__/
[[34m2023-02-03 11:15:19,008[0m] {[34mdagbag.py:[0m496} INFO[0m - Filling up the DagBag from [01m/dev/null[22m[0m
Running the Gunicorn Server with:
Workers: 4 sync
Host: 0.0.0.0:18273
Timeout: 120
Logfiles: - -
Access Logformat: 


In [6]:
# Создадим пользователя Airflow
!airflow users create \
          --username admin \
          --firstname admin \
          --lastname admin \
          --role Admin \
          --email admin@example.org \
          -p 12345

Admin user admin created


Поместите в dag.py следующий код.

```python
from airflow import DAG
from datetime import timedelta
from airflow.utils.dates import days_ago
from airflow.operators.dummy_operator import DummyOperator

dag = DAG('dag',schedule_interval=timedelta(days=1), start_date=days_ago(1))
t1 = DummyOperator(task_id='task_1', dag=dag)
t2 = DummyOperator(task_id='task_2',dag=dag)
t3 = DummyOperator(task_id='task_3',dag=dag)
t4 = DummyOperator(task_id='task_4',dag=dag)
t5 = DummyOperator(task_id='task_5',dag=dag)
t6 = DummyOperator(task_id='task_6',dag=dag)
t7 = DummyOperator(task_id='task_7',dag=dag)

[t1, t2]>>t5
t3>>t6
[t5,t6] >>  t7
t4
```

In [7]:
# Запуск шедулера
!airflow scheduler -D

  ____________       _____________
 ____    |__( )_________  __/__  /________      __
____  /| |_  /__  ___/_  /_ __  /_  __ \_ | /| / /
___  ___ |  / _  /   _  __/ _  / / /_/ /_ |/ |/ /
 _/_/  |_/_/  /_/    /_/    /_/  \____/____/|__/


In [8]:
# Последующие команды не имеют отношения к Airflow
# Они нужни только для корректной работы веб морды
# в среде Google Colab

!pip install pyngrok
!ngrok authtoken 2L8ZNEgBuKsRo9mOlrAlMTVg6Xr_4QjZzqinZBf5npXekvyDV # найти его можно https://dashboard.ngrok.com/get-started/setup 

# Эта команда просто отображет веб морду на другой адрес
# Его вы можете найти https://dashboard.ngrok.com/cloud-edge/status
# При каждом отключении ссылка будет меняться
!nohup ngrok http -log=stdout 18273 > /dev/null &

Looking in indexes: https://pypi.org/simple, https://us-python.pkg.dev/colab-wheels/public/simple/
Collecting pyngrok
  Downloading pyngrok-5.2.1.tar.gz (761 kB)
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m761.3/761.3 KB[0m [31m25.1 MB/s[0m eta [36m0:00:00[0m
[?25h  Preparing metadata (setup.py) ... [?25l[?25hdone
Building wheels for collected packages: pyngrok
  Building wheel for pyngrok (setup.py) ... [?25l[?25hdone
  Created wheel for pyngrok: filename=pyngrok-5.2.1-py3-none-any.whl size=19792 sha256=d532212e8ae7362c2f206bad8bd4d20f6127fc9e3c0a1f9e457ecfcdb9581c5a
  Stored in directory: /root/.cache/pip/wheels/5d/f2/70/526da675d32f17577ec47ac4c663084efe39d47c826b6c3bb1
Successfully built pyngrok
Installing collected packages: pyngrok
Successfully installed pyngrok-5.2.1
Authtoken saved to configuration file: /root/.ngrok2/ngrok.yml
nohup: redirecting stderr to stdout


После запуска команды выше, перейдите по адресу в ngrok и подождите  пока появится DAG с именем dag

### Задача на разработку

В [прошлой проектной работе](https://stepik.org/lesson/556651/step/14?unit=550660) вы реализовали ETL скрипт который выгружает данные из сторонних источников. Теперь я предлагаю вам взять небольшую его часть и переписать с помощью Airflow. Использовать только 1 дату 2021-01-01 можно прописать в функции напрямую, захардкодить.

Вам необходимо обернуть ваш код в PythonOperator

*   Скачайте валюту за 2021-01-01 и положите в БД sqlite (использовать
PythonOperator чтобы скачать данные, можно использовать pandas)
*   Скачайте данные за 2021-01-01 и положите в БД sqlite (использовать PythonOperator чтобы скачать данные, можно использовать pandas)


Даг нужно написать в файл /root/airflow/dags/dag.py. Проверку можно сделать в веб интерфейсе. Прежде чем даг появится, может пройти ~ 2-3 минут.

In [44]:
from airflow import DAG
from airflow.utils.dates import days_ago
from airflow.operators.python_operator import PythonOperator

import pandas as pd
import sqlite3
import requests

import os

CONN = sqlite3.connect('kursach_1.db')

#    /tmp/tmp_file_currency.csv


def extract_currency(date, tmp_file_currency):
    url = 'https://api.exchangerate.host/timeseries?start_date=' + date + '&end_date=' + date + '&base=EUR&symbols=USD'
    response = requests.get(url)
    data = response.json()
    
    df_exchangerate = pd.DataFrame(columns = ['base', 'start_date', 'end_date','rates_usd'])
    df_exchangerate = df_exchangerate.append({'base':data['base'], 'start_date':data['start_date'], 'end_date':data['end_date'],
                                             'rates_usd':data['rates'][date]['USD']}, ignore_index = True)
    df_exchangerate.to_csv(tmp_file_currency)
    #return df_exchangerate

def extract_git(date, tmp_file_git):
    
    url = 'https://raw.githubusercontent.com/dm-novikov/stepik_airflow_course/main/data_new/' + date + '.csv'
    
    df_git = pd.DataFrame(columns = ['currency', 'value', 'date'])
    df_git = pd.read_csv(url, on_bad_lines='skip')
    df_git.to_csv(tmp_file_git)

    #return df_git

def join_df(tmp_file_currency, tmp_file_git, tmp_file_join):
  df_currency = pd.read_csv(tmp_file_currency)
  df_git = pd.read_csv(tmp_file_git)
  df_join = df_currency.merge(df_git, how='inner', left_on='start_date', right_on='date')
  df_join.to_csv(tmp_file_join)
  os.remove(tmp_file_currency)
  os.remove(tmp_file_git)

def insert_to_db(tmp_file_join, table_name, conn = CONN):
    data = pd.read_csv(tmp_file_join)
    data.to_sql(name=table_name, con=conn, index=False, if_exists='append')
    os.remove(tmp_file_join)

with DAG(dag_id='dag',
         default_args={'owner': 'airflow'},
         schedule_interval='@daily', # Интервал запусков
         start_date=days_ago(1) # Начальная точка запуска
    ) as dag:

    extract_currency = PythonOperator(
        task_id='extract_currency',
        python_callable=extract_currency,
        op_kwargs={
            'date': '2021-01-01', 
            'tmp_file_currency': '/tmp/tmp_file_currency.csv'}
    )

    extract_git = PythonOperator(
        task_id='extract_git',
        python_callable=extract_git,
        op_kwargs={
            'date': '2021-01-01',
            'tmp_file_git': '/tmp/tmp_file_git.csv'}
    )

    transform_df = PythonOperator(
        task_id='transform_df',
        python_callable=join_df,
        op_kwargs={
            'tmp_file_git': '/tmp/tmp_file_git.csv',
            'tmp_file_currency': '/tmp/tmp_file_currency.csv',
            'tmp_file_join': '/tmp/tmp_file_join.csv'}
    )

    insert_to_db = PythonOperator(
        task_id='insert_to_db',
        python_callable=insert_to_db,
        op_kwargs={
            'table_name': 'JOIN_DATA',
            'tmp_file_join': '/tmp/tmp_file_join.csv'}
    ) 

[extract_currency, extract_git] >> transform_df >> insert_to_db



<Task(PythonOperator): insert_to_db>

In [41]:
CONN = sqlite3.connect('kursach_1.db')


def extract_currency(date, tmp_file_currency):
    url = 'https://api.exchangerate.host/timeseries?start_date=' + date + '&end_date=' + date + '&base=EUR&symbols=USD'
    response = requests.get(url)
    data = response.json()
    
    df_exchangerate = pd.DataFrame(columns = ['base', 'start_date', 'end_date','rates_usd'])
    df_exchangerate = df_exchangerate.append({'base':data['base'], 'start_date':data['start_date'], 'end_date':data['end_date'],
                                             'rates_usd':data['rates'][date]['USD']}, ignore_index = True)
    df_exchangerate.to_csv(tmp_file_currency)
    #return df_exchangerate

def extract_git(date, tmp_file_git):
    
    url = 'https://raw.githubusercontent.com/dm-novikov/stepik_airflow_course/main/data_new/' + date + '.csv'
    
    df_git = pd.DataFrame(columns = ['currency', 'value', 'date'])
    df_git = pd.read_csv(url, on_bad_lines='skip')
    df_git.to_csv(tmp_file_git)

    #return df_git

def join_df(tmp_file_currency, tmp_file_git, tmp_file_join):
  df_currency = pd.read_csv(tmp_file_currency)
  df_git = pd.read_csv(tmp_file_git)
  df_join = df_currency.merge(df_git, how='inner', left_on='start_date', right_on='date')
  df_join.to_csv(tmp_file_join)
  os.remove(tmp_file_currency)
  os.remove(tmp_file_git)

def insert_to_db(tmp_file_join, table_name, conn = CONN):
    data = pd.read_csv(tmp_file_join)
    data.to_sql(name=table_name, con=conn, index=False, if_exists='append')
    os.remove(tmp_file_join)



extract_currency(date='2021-01-01', tmp_file_currency='/tmp/tmp_file_currency.csv')
extract_git(date='2021-01-01', tmp_file_git='/tmp/tmp_file_git.csv')
join_df(tmp_file_currency='/tmp/tmp_file_currency.csv', tmp_file_git='/tmp/tmp_file_git.csv', tmp_file_join='/tmp/tmp_file_join.csv')
insert_to_db(tmp_file_join='/tmp/tmp_file_join.csv', table_name='JOIN_DATA', conn = CONN)

In [42]:
CONN = sqlite3.connect('kursach_1.db')
all_table = pd.read_sql_query("SELECT name FROM sqlite_master WHERE type='table'", CONN)
all_table

Unnamed: 0,name
0,JOIN_DATA


In [43]:
CONN = sqlite3.connect('kursach_1.db')
data = pd.read_sql(sql='select * from JOIN_DATA', con = CONN)
data


Unnamed: 0.1,Unnamed: 0,Unnamed: 0_x,base,start_date,end_date,rates_usd,Unnamed: 0_y,currency,value,date
0,0,0,EUR,2021-01-01,2021-01-01,1.217582,0,EUR,38,2021-01-01
1,1,0,EUR,2021-01-01,2021-01-01,1.217582,1,EUR,65,2021-01-01
2,2,0,EUR,2021-01-01,2021-01-01,1.217582,2,EUR,74,2021-01-01
3,3,0,EUR,2021-01-01,2021-01-01,1.217582,3,EUR,42,2021-01-01
4,4,0,EUR,2021-01-01,2021-01-01,1.217582,4,EUR,23,2021-01-01
5,5,0,EUR,2021-01-01,2021-01-01,1.217582,5,EUR,48,2021-01-01
6,6,0,EUR,2021-01-01,2021-01-01,1.217582,6,EUR,86,2021-01-01
7,7,0,EUR,2021-01-01,2021-01-01,1.217582,7,EUR,74,2021-01-01
8,8,0,EUR,2021-01-01,2021-01-01,1.217582,8,EUR,24,2021-01-01


In [None]:
# чтобы првоерить решение можете обратиться к вашей базе данных таким образом
%load_ext sql
%config SqlMagic.feedback=False 
%config SqlMagic.autopandas=True
%sql sqlite:////<ПУТЬ ДО БАЗЫ>
%sql select * from <ТАБЛИЦА>