##Тестовое задание на позицию Data Engineer.

Задание включает в себя 3 небольших задачи. В каждой задаче **рекомендуется** оставлять комментарии, код должен быть оформлен согласно **PEP8**. Задания необходимо выполнить без использования Pandas и готовых библиотек для API Яндекс.Погоды.

**Перед выполнением тестового задания, необходимо скопировать notebook к себе на диск, и выполнять тестовое в своей копии**.

In [None]:
from google.colab import drive
drive.mount('/content/drive')

---
####1. Выгрузка данных из API Яндекс.Погоды и преобразование их в csv

Используя API Яндекс.Погоды, необходимо выгрузить прогнозные данные за 7 дней для Москвы, Казани, Санкт-Петербурга, Тулы и Новосибирска. В случае, если API отдает пустые значения за день, то их необходимо удалить.

Информация должна быть представлена по часам с расширенным набором полей по осадкам.

Полученный json необходимо преобразовать в csv, формат:

\begin{array}{ccc}
\text{city}, \text{date}, \text{hour}, \text{temperature_c}, \text{pressure_mm}, \text{is_rainy} \\
Moscow, 19.08.2023, 12, 27, 750, 0 \\
Moscow, 19.08.2023, 13, 27, 750, 0 \\
... \\
Kazan, 19.08.2023, 12, 20, 770, 1 \\
Kazan, 19.08.2023, 13, 21, 770, 0 \\
\end{array}

**Описание полей:**

city - Город

date - Дата события

hour - Часы

temperature_c - Температура в Цельсиях

pressure_mm - Давление в мм ртутного столба

is_rainy - Флаг наличия дождя в конкретный день и час (см. документацию по API - описание полей).

Полученный csv необходимо выгрузить на облачный диск и в конце решения предоставить ссылку.

**Ссылка на получение ключа:** https://yandex.ru/dev/weather/doc/dg/concepts/about.html#about__onboarding


**Дополнительно ответьте на вопросы:** какие существуют возможные пути ускорения получения данных по API и их преобразования? Возможно ли эти способы использовать в Airflow?

In [None]:
from typing import Any

import requests
from requests import Response


def get_weather_data(lat: str, lon: str) -> Any:
    """
        Функция получает данные о погоде для заданных координат.

        Args:
            lat (str): Широта.
            lon (str): Долгота.

        Returns:
            dict: Словарь с данными о погоде
            или
            int: код состояния ответа HTTP.
    """
    # Ключ для подключения к API
    access_key: str = 'f427b830-9b36-40da-951e-db33ed3b4ef8'
    # Адрес для отправки GET-запросов
    url: str = 'https://api.weather.yandex.ru/v2/forecast'
    # Устанавливаем параметры запроса
    # широта, долгота, язык ответа, срок прогноза в днях, наличие почасового прогноза, подробный прогноз осадков
    params: dict[str, int | bool | str] = {
        'lat': lat,
        'lon': lon,
        'lang': 'en_EN',
        'limit': 7,
        'hours': True,
        'extra': True
    }
    try:
        response: Response = requests.get(url, params=params, headers={'X-Yandex-API-Key': access_key})
        # Проверяем статус ответа
        response.raise_for_status()
        return response.json()
    except requests.exceptions.HTTPError as err:
        # Возвращаем код состояния ответа HTTP
        return err.response.status_code
    except requests.exceptions.ConnectionError as err:
        # Если возникает ошибка подключения, возвращаем соответствующее сообщение
        return err


if __name__ == '__main__':
    # Создаем словарь для хранения координат населенных пунктов
    cities: dict[str, tuple[str, str]] = {
        'Moscow': ('55.75396', '37.620393'),
        'Kazan': ('55.018803', '82.933952'),
        'Saint Petersburg': ('59.937500', '30.308611'),
        'Tula': ('54.204838', '37.618492'),
        'Novosibirsk': ('55.018803', '82.933952')
    }
    # Используем контекстный менеджер, чтобы автоматически закрыть файл после завершения операций
    with open('cities_weather.csv', mode='w') as file:
        for city, (lat, lon) in cities.items():
            weather_data: Any = get_weather_data(lat, lon)
            if isinstance(weather_data, dict):
                for forecast in weather_data['forecasts']:
                    for hour in forecast['hours']:
                        weather: tuple[Any, Any, Any, Any, Any, Any] = (
                            weather_data['geo_object']['locality']['name'],
                            forecast['date'],
                            hour['hour'],
                            hour['temp'],
                            hour['pressure_mm'],
                            hour['prec_type']
                        )
                        # Преобразуем каждый элемент кортежа в строку и объединяем их через запятую
                        line: str = ','.join(str(item) for item in weather)
                        file.write(line + '\n')
            else:
                file.write(f'Ошибка при получении данных. Код состояния HTTP: {weather_data}')


In [None]:
# Для ускорения получения данных по API можно воспозьзоваться следующими вариантами.
# 1. Использовать класс session из библиотеки requests для поддержки пула соединений. Это позволит повторно использовать существующие соединения.
# 2. Переписать последовательные синхронные запросы на асинхронные. Например, при помощи asyncio или aiohttp.
# 3. Использовать кэширование. Это позволит сохранять результаты предыдущих запросов и использовать их при повторных запросах.
# 4. Использование NumPy и Pandas. Вероятно они помогут ускорить код при работе с API.
#
# В Airflow можно использовать PythonOperator для этих целей.

---
####2. Загрузка данных в БД (PostgreSQL).

Используя полученный csv файл, необходимо загрузить данных в PostgreSQL. Предварительно в БД необходимо создать схемы: для приемки сырых данных и для будущих агрегирующих таблиц.

При создании таблиц приветствуется использование партицирования и индексирования (по возможности и необходимости).

В решении необходимо показать код загрузки данных, скрипты создания схем и таблиц для пункта 2 и 2.1.

Подсказка: для решения задачи нужно развернуть БД, мы рекомендуем это сделать локально с помощью докера.

In [None]:
import psycopg2

# Креды для подключения к БД. Используем схему raw_data
conn = psycopg2.connect(
    host="localhost",
    dbname="postgres",
    user="postgres",
    password="12345",
    options=f'-c search_path=raw_data',
)

# Создаем курсор
cur = conn.cursor()
# Загружаем данные используя контекстный менеджер, чтобы автоматически закрыть файл после завершения операций
with open('cities_weather.csv', 'r') as file:
    cur.copy_from(file, 'cities_weather', sep=',')
# Сохраняем изменения
conn.commit()
# Закрываем соединение
cur.close()
conn.close()


In [None]:
# Создаем схему для сырых данных
# CREATE SCHEMA raw_data;

# Ddl таблицы cities_weather в БД
# CREATE TABLE raw_data.cities_weather (
# 	city varchar(50) NULL,
# 	"date" varchar(10) NULL,
# 	"hour" int2 NULL,
# 	temperature_c int2 NULL,
# 	pressure_mm int2 NULL,
# 	is_rainy int2 NULL
# );
#
# Оптимизация.
# Исходя из самых распространенных запросов к таблицам можно было бы создать индексы (например, btree) по тем полям, которые участвуют в этих запросах. И периодически смотреть какие индексы у нас используются, убирать редко используемые и добавлять нужные.
# Так же можно использовать секционирование. Например, при использовании Airflow по dag_run_id. Оптимальным будет секционирование по столбцам (или по набору столбцов), которые чаще всего присутствуют в предложении where в запросах, обращающихся к секционируемой таблице.

####2.1 Формирование витрин (PostgreSQL).

1. Используя таблицу с сырыми данными, необходимо собрать витрину, где для каждого города и дня будут указаны часы начала дождя. Условимся, что дождь может начаться только 1 раз за день в любом из городов.

2. Необходимо создать витрину, где для каждого города, дня и часа будет рассчитано скользящее среднее по температуре и по давлению.


Полученные запросы необходимо вставить в google colab, а результаты - выгрузить в формате csv/xlsx и выложить в виде ссылки в google colab.

Подсказка: если в исходном файле не было факта начала дождя, то необходимо расставить рандомно значения факта дождя в таблице с сырыми данными.


In [None]:
# Создаем схему для витрин
# CREATE SCHEMA data_mart;

# Создаем представление. В нем используем запрос, в котором выбираем название города, дату и минимальный час (час начала дождя) для каждого дня, когда ожидается дождь (значения 1 или 2).
# В документации указано, что поле "prec_type" (тип осадков) может принимать следующие значения: 0 — без осадков, 1 — дождь, 2 — дождь со снегом, 3 — снег, 4 — град.
# https://yandex.ru/dev/weather/doc/ru/concepts/forecast-rest

# CREATE OR REPLACE VIEW data_mart.start_rain
# AS SELECT city,
#     date,
#     min(hour) AS start_rain_hour
#    FROM raw_data.cities_weather
#   WHERE is_rainy in (1,2)
#   GROUP BY city, date;

In [None]:
# Принцип расчета методом скользящей средней.
# Определяется размер окна (количество последовательных данных), которое будет использоваться для вычисления среднего значения. Для каждой точки данных вычисляется среднее значение, используя #
# окно. Исходные значения заменяются полученными средними значениями.
# Для каждого города выберем трехдневный период. В этом запросе функция AVG используется с оконным выражением. Используем сортировку по городу, дате и часу.
# Определяем окно, включающее строки от 1 предыдущей до 1 следующей.

# SELECT
#     city,
#     date,
#     hour,
#     temperature_c,
#     pressure_mm,
#     AVG(temperature_c) OVER (
#         ORDER BY city,date, hour
#         ROWS between 1 preceding and 1 following
#     ) AS temp_avg,
#     AVG(pressure_mm) OVER (
#         ORDER BY city,date, hour
#         ROWS between 1 preceding and 1 following
#     ) AS press_avg
# FROM raw_data.cities_weather
# GROUP BY city,date, hour, temperature_c, pressure_mm
# order by city,date,hour;

---
####3. Задача на проектирование БД на данных Яндекс.Метрики

В функционал Яндекс.Метрики входит возможность выкачивания сырых данных с помощью API: отдельными запросами выкачиваются просмотры и визиты. Для этого процесса необходимо спроектировать базу данных, предусмотрев несколько слоев данных и "хотелки" заказчиков: в 90% случаев заказчикам необходимы агрегаты данных (например, построить воронку по визитам на страницах и вводу номеров телефонов в разрезе дат, страниц, utm меток, или построить флоу пользователей в разрезе устройств, ОС, и т.д.).

Результат необходимо предоставить в виде схемы с описанием.

Ссылки на структуру таблиц:

https://yandex.ru/dev/metrika/doc/api2/logs/fields/hits.html

https://yandex.ru/dev/metrika/doc/api2/logs/fields/visits.html