##Тестовое задание на позицию Data Engineer.

Задание включает в себя 3 небольших задачи. В каждой задаче **рекомендуется** оставлять комментарии, код должен быть оформлен согласно **PEP8**. Задания необходимо выполнить без использования Pandas и готовых библиотек для API Яндекс.Погоды.

**Перед выполнением тестового задания, необходимо скопировать notebook к себе на диск, и выполнять тестовое в своей копии**.

---
####1. Выгрузка данных из API Яндекс.Погоды и преобразование их в csv

Используя API Яндекс.Погоды, необходимо выгрузить прогнозные данные за 7 дней для Москвы, Казани, Санкт-Петербурга, Тулы и Новосибирска. В случае, если API отдает пустые значения за день, то их необходимо удалить.

Информация должна быть представлена по часам с расширенным набором полей по осадкам.

Полученный json необходимо преобразовать в csv, формат:

\begin{array}{ccc}
\text{city}, \text{date}, \text{hour}, \text{temperature_c}, \text{pressure_mm}, \text{is_rainy} \\
Moscow, 19.08.2023, 12, 27, 750, 0 \\
Moscow, 19.08.2023, 13, 27, 750, 0 \\
... \\
Kazan, 19.08.2023, 12, 20, 770, 1 \\
Kazan, 19.08.2023, 13, 21, 770, 0 \\
\end{array}

**Описание полей:**

city - Город

date - Дата события

hour - Часы

temperature_c - Температура в Цельсиях

pressure_mm - Давление в мм ртутного столба

is_rainy - Флаг наличия дождя в конкретный день и час (см. документацию по API - описание полей).

Полученный csv необходимо выгрузить на облачный диск и в конце решения предоставить ссылку.

**Ссылка на получение ключа:** https://yandex.ru/dev/weather/doc/dg/concepts/about.html#about__onboarding


**Дополнительно ответьте на вопросы:** какие существуют возможные пути ускорения получения данных по API и их преобразования? Возможно ли эти способы использовать в Airflow?

In [None]:
from google.colab import drive
drive.mount('/content/drive')

Mounted at /content/drive


In [None]:
import requests
from datetime import datetime
from dataclasses import dataclass
import csv
from typing import Optional

access_key: str = '5f76604b-8180-4b9f-8d99-bca210a4fbae'
api_url: str = 'https://api.weather.yandex.ru/v2'
path: str = 'weekly_forecast.csv'

def get_city_coords(city_name: str,
                    country_name: str = 'Russia'):
  response = requests.get('https://api.api-ninjas.com/v1/geocoding',
                         params={'city': city_name,
                                 'country': country_name})
  if response.status_code == 200:
      return response.json()

  else:
    return None

cities_geodata: dict = {
    'Moscow': {
        'name': 'Moscow',
        'latitude': 55.75222,
        'longitude': 37.61556},
    'Kazan': {
      'name': 'Kazan',
      'latitude': '55.78874',
      'longitude': '49.12214'
          },
    'Petersburg': {
      'name': 'Petersburg',
      'latitude': '59.937500',
      'longitude': '30.308611'
    },
    'Tula': {
      'name': 'Tula',
      'latitude': '54.204838',
      'longitude': '37.618492'
    },
    'Novosibirsk': {
      'name': 'Novosibirsk',
      'latitude': '55.018803',
      'longitude': '82.933952'
    }
}


def get_weather_data(api_url: str, headers: dict,
                     params: dict) -> Optional[dict]:

  response = requests.get(f"{api_url}/forecast",
                        headers=headers,
                        params=params
                          )
  if response.status_code == 200:
      return response.json()

  else:
    return None


def is_rainy(condition: str) -> int:
  if 'rain' in condition:
    return 1
  else:
    return 0


def format_date(date: str):
  date = datetime.strptime(date, '%Y-%m-%d')
  formatted_date = date.strftime('%d.%m.%Y')
  return formatted_date

def get_hourly_city_forecast(api_url: str,
                             api_key: str,
                             city_info: dict) -> list:
  processed_data: list = []

  params = {'lat': city_info.get('latitude', None),
            'lon': city_info.get('longitude', None),
            'limit':7, 'extra': True, 'hours': True}
  headers = {
    'X-Yandex-Weather-Key': api_key
  }

  forecast_data: dict = get_weather_data(api_url=api_url,
                                         params=params, headers=headers)

  for day in forecast_data.get('forecasts'):
    if not day.get('hours'):
      pass
    for hour in day.get('hours'):
      necessary_data: dict = {'name': city_info.get('name'),
                              'date': format_date(day.get('date')),
                              'hour': hour.get('hour'), 'temp': hour.get('temp'),
                              'pressure_mm': hour.get('pressure_mm'),
                              'is_rainy': is_rainy(hour.get('condition'))}
      processed_data.append(necessary_data)

  return processed_data


def save_forecasts(path: str, data: list) -> bool:
    field_names = data[0].keys()
    print(field_names)
    print(type(field_names))
    try:
      with open(path, 'w') as file:
        writer = csv.DictWriter(file, fieldnames=field_names)
        writer.writeheader()
        writer.writerows(data)
    except Exception as e:
      print(e)
      return False
    else:
      return True

def get_cities_forecast(api_url: str,
                        api_key: str,
                        cities: dict):
  all_cities_forecasts: list = []

  for city in cities.values():
    city_forecast: list = get_hourly_city_forecast(api_url=api_url,
                                                   api_key=api_key,
                                                   city_info=city)g
    all_cities_forecasts.extend(city_forecast)
  return all_cities_forecasts

"""
дополнительные вопросы:
  можно сделать код асинхронным, например через aiohttp а запросы обрабатывать в разных процессах
"""

In [None]:
def main(csv_path: str):
  forecasts = get_cities_forecast(api_url=api_url, api_key=access_key,
                             cities=cities_geodata)
  save_forecasts(path='/content/drive/MyDrive/weekly_forecast.csv', data=forecasts)
main(path)

# csv here: https://drive.google.com/file/d/1dT781F0Yg0WRP2G7Hh5lnMSU3nQdrz7L/view?usp=sharing

dict_keys(['name', 'date', 'hour', 'temp', 'pressure_mm', 'is_rainy'])
<class 'dict_keys'>


---
####2. Загрузка данных в БД (PostgreSQL).

Используя полученный csv файл, необходимо загрузить данных в PostgreSQL. Предварительно в БД необходимо создать схемы: для приемки сырых данных и для будущих агрегирующих таблиц.

При создании таблиц приветствуется использование партицирования и индексирования (по возможности и необходимости).

В решении необходимо показать код загрузки данных, скрипты создания схем и таблиц для пункта 2 и 2.1.

Подсказка: для решения задачи нужно развернуть БД, мы рекомендуем это сделать локально с помощью докера.

In [None]:
import psycopg2
csv_load_path = '/content/drive/MyDrive/weekly_forecast.csv'
csv_load_query: str = "COPY forecast_raw.forecasts_hourly_raw FROM STDIN DELIMITER ',' CSV HEADER"

def load_csv( sql_query: str, file_path: str):
  conn = psycopg2.connect(host='cornelius.db.elephantsql.com',
                      port='5432', dbname='fzpvuodf', user='fzpvuodf',
                          password='l5njBN5YHeq8lK93Xb1oRl7YgSAnjDM-')

  cur = conn.cursor()
  with open(path, 'r') as file:
    cur.copy_expert(sql_query, file)

  conn.commit()
  conn.close()

load_csv(path=csv_load_path, sql_query=csv_load_query)

In [None]:
"""
DROP SCHEMA IF EXISTS forecast_raw;
DROP SCHEMA IF EXISTS forecast_agg;

CREATE SCHEMA IF NOT EXISTS forecast_raw;
CREATE SCHEMA IF NOT EXISTS forecast_agg;

DROP TABLE IF EXISTS forecast_raw.forecasts_hourly_raw;
CREATE TABLE IF NOT EXISTS forecast_raw.forecasts_hourly_raw (
	city TEXT,
	forecast_date TEXT,
	forecast_hour SMALLINT,
	temperature_c SMALLINT,
	pressure_mm SMALLINT,
	is_rainy Boolean
)

"""

####2.1 Формирование витрин (PostgreSQL).

1. Используя таблицу с сырыми данными, необходимо собрать витрину, где для каждого города и дня будут указаны часы начала дождя. Условимся, что дождь может начаться только 1 раз за день в любом из городов.

2. Необходимо создать витрину, где для каждого города, дня и часа будет рассчитано скользящее среднее по температуре и по давлению.


Полученные запросы необходимо вставить в google colab, а результаты - выгрузить в формате csv/xlsx и выложить в виде ссылки в google colab.

Подсказка: если в исходном файле не было факта начала дождя, то необходимо расставить рандомно значения факта дождя в таблице с сырыми данными.


In [None]:
"""
CREATE TABLE forecast_agg.rain_start_hour AS
SELECT city, forecast_date, MIN(forecast_hour) AS rain_start
FROM forecast_raw.forecasts_hourly_raw
WHERE is_rainy = TRUE
GROUP BY city, forecast_date
ORDER BY city, forecast_date;

CREATE TABLE forecast_agg.moving_avg_temp_and_pressure AS
SELECT city, forecast_date, forecast_hour, temperature_c,
AVG(temperature_c) OVER (PARTITION BY city, forecast_date ORDER BY forecast_hour
								ROWS BETWEEN 23 PRECEDING AND CURRENT ROW) AS moving_avg_temp,
AVG(pressure_mm) OVER (PARTITION BY city, forecast_date ORDER BY forecast_hour
								ROWS BETWEEN 23 PRECEDING AND CURRENT ROW) AS moving_avg_pressure
FROM forecast_raw.forecasts_hourly_raw
ORDER BY city, forecast_date, forecast_hour;
"""
import psycopg2
def download_csv(sql_query: str, file_path: str):
  conn = psycopg2.connect(host='cornelius.db.elephantsql.com',
                      port='5432', dbname='fzpvuodf', user='fzpvuodf',
                          password='l5njBN5YHeq8lK93Xb1oRl7YgSAnjDM-')

  cur = conn.cursor()
  with open(file_path, 'w') as file:
    cur.copy_expert(sql_query, file)

  conn.commit()
  conn.close()

download_csv(sql_query='copy (select * from forecast_agg.rain_start_hour ) TO STDOUT WITH CSV HEADER',
             file_path='/content/drive/MyDrive/rain_start_hour.csv')
download_csv(sql_query='copy (select * from forecast_agg.moving_avg_temp_and_pressure ) TO STDOUT WITH CSV HEADER',
             file_path='/content/drive/MyDrive/moving_avg_temp_and_pressure.csv')

# result rain start hour: https://drive.google.com/file/d/1jgdb-BH4rAb27IKxRbLimhStK3kFMtOQ/view?usp=sharing
# result moving_avg_temo_and_pressure https://drive.google.com/file/d/1jgdb-BH4rAb27IKxRbLimhStK3kFMtOQ/view?usp=sharing