### Настройка Airflow

Для начала вам необходимо выполнить ряд команд чтобы настроить окружение для дальнейшей работы, это позволит первое время не заниматься настройкой среды исполнения, а сразу начать писать код и работать с Airflow.

In [None]:
# Установка Airflow
!pip install apache-airflow==2.1.4

# Инициализация базы данных
!airflow db init

In [None]:
# Создадим необходимые папки
!mkdir /root/airflow/dags
!touch /root/airflow/dags/dag.py

In [None]:
# Включим веб-сервер
!airflow webserver -p 18273 -D

  ____________       _____________
 ____    |__( )_________  __/__  /________      __
____  /| |_  /__  ___/_  /_ __  /_  __ \_ | /| / /
___  ___ |  / _  /   _  __/ _  / / /_/ /_ |/ |/ /
 _/_/  |_/_/  /_/    /_/    /_/  \____/____/|__/
[[34m2023-04-19 09:12:35,385[0m] {[34mdagbag.py:[0m496} INFO[0m - Filling up the DagBag from [01m/dev/null[22m[0m
Running the Gunicorn Server with:
Workers: 4 sync
Host: 0.0.0.0:18273
Timeout: 120
Logfiles: - -
Access Logformat: 


In [None]:
# Создадим пользователя Airflow
!airflow users create \
          --username admin \
          --firstname admin \
          --lastname admin \
          --role Admin \
          --email admin@example.org \
          -p 12345

Admin user admin created


Поместите в dag.py следующий код.

```python
from airflow import DAG
from datetime import timedelta
from airflow.utils.dates import days_ago
from airflow.operators.dummy_operator import DummyOperator

dag = DAG('dag',schedule_interval=timedelta(days=1), start_date=days_ago(1))
t1 = DummyOperator(task_id='task_1', dag=dag)
t2 = DummyOperator(task_id='task_2',dag=dag)
t3 = DummyOperator(task_id='task_3',dag=dag)
t4 = DummyOperator(task_id='task_4',dag=dag)
t5 = DummyOperator(task_id='task_5',dag=dag)
t6 = DummyOperator(task_id='task_6',dag=dag)
t7 = DummyOperator(task_id='task_7',dag=dag)

[t1, t2]>>t5
t3>>t6
[t5,t6] >>  t7
t4
```

In [None]:
# Запуск шедулера
!airflow scheduler -D

  ____________       _____________
 ____    |__( )_________  __/__  /________      __
____  /| |_  /__  ___/_  /_ __  /_  __ \_ | /| / /
___  ___ |  / _  /   _  __/ _  / / /_/ /_ |/ |/ /
 _/_/  |_/_/  /_/    /_/    /_/  \____/____/|__/


In [None]:
# Последующие команды не имеют отношения к Airflow
# Они нужни только для корректной работы веб морды
# в среде Google Colab

!pip install pyngrok
!ngrok authtoken '' # найти его можно https://dashboard.ngrok.com/get-started/setup 

# Эта команда просто отображет веб морду на другой адрес
# Его вы можете найти https://dashboard.ngrok.com/cloud-edge/status
# При каждом отключении ссылка будет меняться


Looking in indexes: https://pypi.org/simple, https://us-python.pkg.dev/colab-wheels/public/simple/
Authtoken saved to configuration file: /root/.ngrok2/ngrok.yml


In [None]:
!nohup ngrok http 18273 > /dev/null &

nohup: redirecting stderr to stdout


После запуска команды выше, перейдите по адресу в ngrok и подождите  пока появится DAG с именем dag

### Задача на разработку


На прошло шаге мы написали простой пайплайн и обернули его в Airflow. Теперь давайте расширим его возможности с помощью макросов и Xcom. Во всех задачах вам необходимо использовать макрос {{ execution_date }} чтобы данные выгружались за определенный день. Для DAG вам нужно указать даты начала и конца исполнения задач с 2021-01-01 по 2021-01-04.

Вам необходимо обернуть ваш код в PythonOperator



*   Скачайте валюту за {{ ds }} и положите в Xcom, но не все а только значение
*   Скачайте данные за {{ ds }} и положите в БД sqlite (использовать PythonOperator чтобы скачать данные, можно использовать pandas)
​



In [None]:
import pandas as pd
import sqlite3
from datetime import datetime


CON = sqlite3.connect('currency.db', isolation_level=None)


from airflow import DAG
from airflow.utils.dates import days_ago
from airflow.operators.email_operator import EmailOperator
from airflow.operators.python_operator import PythonOperator


def extract_data(date, url, tmp_file):
    url = f'{url}{date}.csv'
    data = pd.read_csv(url).to_csv(tmp_file)
    return data

def sql_query(sql, conn=CON):
  df = None
  cursor = conn.cursor()
  query = cursor.execute(sql)
  if query.description:
    df = pd.DataFrame.from_records(data = query.fetchall(), columns = [column[0] for column in query.description])
  cursor.close()
  return df

def load_data(tmp_file, table_name, conn=CON, **context) -> None:
    """ Load to DB
    """
    data = pd.read_csv(tmp_file)# Изменение read_csv
    data["insert_time"] = pd.to_datetime("now")
    data.to_sql(table_name, conn, if_exists='replace', index=False)
    count = sql_query(f"select count(*) from '{table_name}'")
    print(f"Records in table '{table_name}':", count.values)

def extract_currency(date, tmp_file):
  url = 'https://api.exchangerate.host/timeseries?start_date=' + str(date) + '&end_date=' + str(date) + "&base='EUR'" + "&symbols=USD" + '&format=csv'
  data = pd.read_csv(url).to_csv(tmp_file)
  return data

def xcom_data(tmp_file, **kwargs) -> None:
    """ Load to xcom
    """
    data = pd.read_csv(tmp_file)
    rate = data["rate"].values
    print(f"Load to x-com:", rate)
    # kwargs['ti'].xcom_push(key='key', value=rate[0])
    return rate[0]
    

with DAG(dag_id='dag',
         default_args={'owner': 'airflow'},
         schedule_interval='@daily',
         start_date= datetime(2021, 1, 1),
         end_date=datetime(2021, 1, 4)
    ) as dag:

    extract_data = PythonOperator(
        task_id='extract_data',
        python_callable=extract_data,
        op_kwargs={'date': '{{ ds }}' ,
            'url': 'https://raw.githubusercontent.com/dm-novikov/stepik_airflow_course/main/data_new/',
            'tmp_file': '/tmp/file.csv'},
        dag=dag
    )

    load_data = PythonOperator(
        task_id='load_data',
        python_callable=load_data,
        dag=dag,
        op_kwargs={
            'tmp_file': '/tmp/file.csv',
            'table_name': 'currency'
        }
    )

    extract_currency = PythonOperator(
        task_id='extract_currency',
        python_callable=extract_currency,
        op_kwargs={'date': '{{ ds }}' ,
            'tmp_file': '/tmp/exchangerate.csv'},
        dag=dag
    )

    xcom_data = PythonOperator(
        task_id='xcom_data',
        python_callable=xcom_data,
        dag=dag,
        op_kwargs={
            'tmp_file': '/tmp/exchangerate.csv'
        }
    )


    extract_data >> load_data
    extract_currency >> xcom_data

In [None]:
# чтобы првоерить решение можете обратиться к вашей базе данных таким образом
%load_ext sql
%config SqlMagic.feedback=False 
%config SqlMagic.autopandas=True
%sql sqlite:////<ПУТЬ ДО БАЗЫ>
%sql select * from <ТАБЛИЦА>

(sqlite3.OperationalError) near "ДО": syntax error
[SQL: ДО БАЗЫ>]
(Background on this error at: https://sqlalche.me/e/14/e3q8)
 * sqlite:////<ПУТЬ
(sqlite3.OperationalError) near "<": syntax error
[SQL: select * from <ТАБЛИЦА>]
(Background on this error at: https://sqlalche.me/e/14/e3q8)
