<a href="https://colab.research.google.com/github/annvorosh/GB/blob/ETL/ETL_L07.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

# ETL: автоматизация подготовки данных (семинары)
## Урок 7. Построение пайплайнов и визуализация потоков данных в Airflow
— Зарегистрируйтесь в ОрепWeatherApi (https://openweathermap.org/api)
— Создайте ETL, который получает температуру в заданной вами локации, и
дальше делает ветвление:

• В случае, если температура больше 15 градусов цельсия — идёт на ветку, в которой есть оператор, выводящий на
экран «тепло»;
• В случае, если температура ниже 15 градусов, идёт на ветку с оператором, который выводит в консоль «холодно».

Оператор ветвления должен выводить в консоль полученную от АРI температуру.

— Приложите скриншот графа и логов работы оператора ветвленния.

In [1]:
!pip install apache-airflow

Collecting apache-airflow
  Downloading apache_airflow-2.8.1-py3-none-any.whl (13.3 MB)
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m13.3/13.3 MB[0m [31m23.5 MB/s[0m eta [36m0:00:00[0m
[?25hCollecting alembic<2.0,>=1.6.3 (from apache-airflow)
  Downloading alembic-1.13.1-py3-none-any.whl (233 kB)
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m233.4/233.4 kB[0m [31m13.3 MB/s[0m eta [36m0:00:00[0m
[?25hCollecting argcomplete>=1.10 (from apache-airflow)
  Downloading argcomplete-3.2.1-py3-none-any.whl (42 kB)
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m42.3/42.3 kB[0m [31m4.1 MB/s[0m eta [36m0:00:00[0m
[?25hCollecting asgiref (from apache-airflow)
  Downloading asgiref-3.7.2-py3-none-any.whl (24 kB)
Collecting colorlog<5.0,>=4.0.2 (from apache-airflow)
  Downloading colorlog-4.8.0-py2.py3-none-any.whl (10 kB)
Collecting configupdater>=3.1.1 (from apache-airflow)
  Downloading ConfigUpdater-3.2-py2.py3-none-any.whl (

In [2]:
!pip install requests



In [3]:
from airflow import DAG
from airflow.operators.python_operator import PythonOperator
from datetime import datetime, timedelta
import requests

# Определение даты начала выполнения DAG
start_date = datetime(2024, 1, 1)

# Определение интервала выполнения DAG
schedule_interval = timedelta(days=1)

# Создание объекта DAG
dag = DAG(
    'weather_etl',
    start_date=start_date,
    schedule_interval=schedule_interval,
)

# Функция для получения температуры по API
def get_temperature(location):
    response = requests.get(f'https://api.openweathermap.org/data/2.5/weather?q={location}&appid=c5e2524ab03a711ea93b95ef76fead96')
    data = response.json()
    temperature = data['main']['temp']
    return temperature

# Функция для вывода сообщения в консоль в зависимости от температуры
def print_temperature_message(**kwargs):
    location = kwargs.get('location', 'Vyborg')
    temperature = kwargs['ti'].xcom_pull(task_ids='get_temperature')

    if temperature > 15:
        print(f"Weather in {location}: {temperature}°C - warm")
    else:
        print(f"Weather in {location}: {temperature}°C - cold")

# Оператор для получения температуры
get_temperature_operator = PythonOperator(
    task_id='get_temperature',
    python_callable=get_temperature,
    op_kwargs={'location': 'Vyborg'},
    provide_context=True,
    dag=dag,
)

# Оператор ветвления
branch_operator = PythonOperator(
    task_id='branch_operator',
    python_callable=print_temperature_message,
    provide_context=True,
    dag=dag,
)

# Установка зависимостей между операторами
get_temperature_operator >> branch_operator


<Task(PythonOperator): branch_operator>