In [37]:
# Import the Yandex Weather API_KEY and the PostgreSQL credentials (USER, PASSW, PORT, HOST, DBNAME)
from creds import *

In [38]:
import requests
from json import loads
from datetime import datetime

In [39]:
# Request headers for the Yandex Weather API call: the key goes in a custom header.
HEADERS = {
    "X-Yandex-API-Key": API_KEY,
}

In [40]:
# Cities to fetch: GraphQL alias -> (latitude, longitude).
# The alias becomes a GraphQL field alias and a key in the response's "data"
# object, so it must be a valid GraphQL name (letters, digits, underscore).
CITIES = {
    "Moscow": (55.7522, 37.6156),
    "Kazan": (55.7887, 49.1221),
    "SaintPetersburg": (59.9386, 30.3141),
    "Tula": (54.1961, 37.6182),
}
# Number of forecast days requested per city.
FORECAST_DAYS = 7

# One aliased weatherByPoint selection per city; all share the fragment below.
_city_fields = "\n".join(
    f"  {alias}: weatherByPoint(request: {{ lat: {lat}, lon: {lon} }}) {{\n"
    "    ...WeatherData\n"
    "  }"
    for alias, (lat, lon) in CITIES.items()
)

# Hourly time, temperature, pressure and precipitation type for each day.
query = f"""{{
{_city_fields}
}}

fragment WeatherData on Weather {{
  forecast {{
    days(limit: {FORECAST_DAYS}) {{
      hours {{
        time
        temperature
        pressure
        precType
      }}
    }}
  }}
}}"""

In [41]:
# Send the GraphQL query to the Yandex Weather endpoint (API key in HEADERS).
response = requests.post(
    'https://api.weather.yandex.ru/graphql/query',
    headers=HEADERS,
    json={'query': query},
    timeout=30,  # seconds; without a timeout requests may wait indefinitely
)
# Stop on non-2xx responses instead of trying to parse an error body as data.
response.raise_for_status()
data_dict = response.json()
# GraphQL servers usually report query errors in the body of a 200 response.
# Fail here rather than build the CSV from missing or partial data.
if data_dict.get('errors'):
    raise RuntimeError(f"Yandex Weather GraphQL errors: {data_dict['errors']}")

In [43]:
# Flatten the API response into CSV text: one row per city and forecast hour.
# The load cell runs COPY without a column list, so the CSV column order must
# follow dds.weather's column order.
def _csv_field(value) -> str:
    """Render a value as a CSV field; None becomes an empty field (NULL for COPY ... CSV)."""
    return '' if value is None else str(value)


csv_lines = ['city,date,hour,temperature_c,pressure_mm,is_rainy\n']

for city, city_data in data_dict['data'].items():
    for day in city_data['forecast']['days']:
        for hour_data in day['hours']:
            # 'time' is an ISO-8601 timestamp with a UTC offset (e.g. ...+03:00).
            # fromisoformat accepts any offset and keeps the local date and hour.
            local_time = datetime.fromisoformat(hour_data['time'])
            # Only 'RAIN' counts as rainy. Rain with snow ('SLEET') is left out
            # on purpose, because the task asks about rain only.
            is_rainy = 1 if hour_data.get('precType') == 'RAIN' else 0
            csv_lines.append(
                f"{city},{local_time.strftime('%d.%m.%Y,%H')},"
                f"{_csv_field(hour_data.get('temperature'))},"
                f"{_csv_field(hour_data.get('pressure'))},{is_rainy}\n"
            )

to_file = ''.join(csv_lines)

In [45]:
# Write the CSV as UTF-8. The encoding is explicit because open() otherwise
# uses the platform's locale encoding. newline='' writes the '\n' line endings
# unchanged instead of translating them on Windows.
with open('../weather.csv', 'w', encoding='utf-8', newline='') as f:
    f.write(to_file)

In [2]:
from psycopg2 import connect

In [3]:
def create_connection() -> "psycopg2.extensions.connection":
    """Open a new PostgreSQL connection using the imported credentials.

    Uses USER, PASSW, PORT, HOST and DBNAME, which come from the ``creds`` star import.

    Returns:
        A new psycopg2 connection. Using it in a ``with`` block commits or rolls
        back the current transaction but does not close the connection, so the
        caller is responsible for calling ``close()``.
    """
    return connect(user=USER, password=PASSW, port=PORT, host=HOST, dbname=DBNAME)

In [52]:
# DDL: stg / dds / cdm schemas plus the hourly weather table in dds,
# range-partitioned by day. Every statement is guarded with IF NOT EXISTS,
# so the cell can be re-run without errors.
sql = """CREATE SCHEMA IF NOT EXISTS stg;
CREATE SCHEMA IF NOT EXISTS dds;
CREATE SCHEMA IF NOT EXISTS cdm;

-- One row per city and forecast hour (loaded from weather.csv).
-- NOTE: 'SaintPetersburg' already uses all 15 characters of city.
CREATE TABLE IF NOT EXISTS dds.weather (
    city VARCHAR(15) NOT NULL,
    "date" DATE,
    "hour" SMALLINT,
    temperature_c SMALLINT,
    pressure_mm SMALLINT,
    is_rainy BOOLEAN
)
PARTITION BY RANGE ("date");

CREATE INDEX IF NOT EXISTS IX_weather_date
ON dds.weather("date");

-- One partition per day of the loaded forecast week.
CREATE TABLE IF NOT EXISTS dds.weather_18 PARTITION OF dds.weather
    FOR VALUES FROM ('2024-03-18') TO ('2024-03-19');

CREATE TABLE IF NOT EXISTS dds.weather_19 PARTITION OF dds.weather
    FOR VALUES FROM ('2024-03-19') TO ('2024-03-20');

CREATE TABLE IF NOT EXISTS dds.weather_20 PARTITION OF dds.weather
    FOR VALUES FROM ('2024-03-20') TO ('2024-03-21');

CREATE TABLE IF NOT EXISTS dds.weather_21 PARTITION OF dds.weather
    FOR VALUES FROM ('2024-03-21') TO ('2024-03-22');

CREATE TABLE IF NOT EXISTS dds.weather_22 PARTITION OF dds.weather
    FOR VALUES FROM ('2024-03-22') TO ('2024-03-23');

CREATE TABLE IF NOT EXISTS dds.weather_23 PARTITION OF dds.weather
    FOR VALUES FROM ('2024-03-23') TO ('2024-03-24');

CREATE TABLE IF NOT EXISTS dds.weather_24 PARTITION OF dds.weather
    FOR VALUES FROM ('2024-03-24') TO ('2024-03-25');

-- Catch-all for dates outside the daily partitions above, so loading a forecast
-- fetched on another day does not fail for lack of a matching partition.
-- NOTE: a new daily partition cannot be attached later while this default
-- partition still holds rows for that day.
CREATE TABLE IF NOT EXISTS dds.weather_default PARTITION OF dds.weather DEFAULT;
"""

In [53]:
# Create the tables (DDL from the DDL cell, held in `sql`) and bulk-load the CSV
# in a single transaction.
# NOTE(review): this relies on `sql` still holding the DDL; the mart cells below
# rebind `sql` to SELECT queries. Re-running this cell also appends the CSV
# again, because dds.weather has no unique key on (city, date, hour).
conn = create_connection()
try:
    # `with conn` commits on success and rolls back on an exception, but it does
    # not close the connection, so it is closed explicitly in `finally`.
    with conn, conn.cursor() as cur:
        # The CSV stores dates day-first (dd.mm.yyyy). Pin the session DateStyle
        # so COPY parses them that way whatever the server default is.
        cur.execute("SET DateStyle TO 'ISO, DMY'")
        cur.execute(sql)

        with open('../weather.csv', 'r', encoding='utf-8') as f:
            cur.copy_expert("COPY dds.weather FROM STDIN WITH CSV HEADER DELIMITER ','", f)
finally:
    conn.close()

In [None]:
# Using the raw-data table, build a mart that gives, for each city and day,
# the hour at which rain starts.
# Assumption (from the task): rain can start at most once per day in any city,
# so the start hour is simply the earliest rainy hour of that day.
# GROUP BY gives exactly one row per city and day.

sql = """SELECT city, "date", MIN("hour") AS rain_start_hour
FROM dds.weather
WHERE is_rainy
GROUP BY city, "date"
ORDER BY city, "date";"""

In [None]:
# Build a mart with a moving average of temperature and pressure for every
# city, date and hour.
# The window is centred on the current hour and covers MOVING_AVG_RADIUS rows
# on each side. It is partitioned by city only, so it continues across day
# boundaries. At either end of a city's series the frame has fewer rows, and
# the average is taken over what is available.
MOVING_AVG_RADIUS = 1

sql = f"""SELECT city, "date", "hour",
    AVG(temperature_c) OVER w AS moving_temp,
    AVG(pressure_mm) OVER w AS moving_press
FROM dds.weather
WINDOW w AS (
    PARTITION BY city
    ORDER BY "date", "hour"
    ROWS BETWEEN {MOVING_AVG_RADIUS} PRECEDING AND {MOVING_AVG_RADIUS} FOLLOWING
)
ORDER BY city, "date", "hour";"""