<a href="https://colab.research.google.com/github/bryancev/Bootcamp_DE/blob/main/SQL_%D0%B7%D0%B0%D0%B3%D1%80%D1%83%D0%B7%D0%BA%D0%B0_%D0%B4%D0%B0%D0%BD%D0%BD%D1%8B%D1%85_%D0%B2_%D0%91%D0%94.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

# Настройка подключения к БД

In [47]:
import pandas as pd
import psycopg2
from psycopg2.extras import RealDictCursor
pd.set_option('display.max_columns', None)

In [48]:
# Параметры подключения к БД

conn_info = {
    'dbname': 'dev',
    'user': 'bootcamp',
    'password': '1q2w3e4r',
    'host': 'ru.tuna.am',
    'port': 35663,
    'options': '-c client_encoding=utf8', # Принудительная установка кодировки
    'client_encoding': 'utf8'
}

In [49]:
class Postgres:
    # Простая обёртка для работы с PostgreSQL через psycopg2
    def __init__(self, config):
        """
        Инициализация подключения к базе.
        :param config: словарь с параметрами подключения
        """
        try:
            self._conn = psycopg2.connect(**config)
            self._conn.set_client_encoding('UTF8')  # кодировка UTF-8
        except Exception as e:
            print("Ошибка подключения:", e)
            raise

    def execute(self, query, params=None):
        """
        Выполнить запрос без возврата данных (INSERT/UPDATE/DELETE).
        :param query: SQL-запрос
        :param params: кортеж или список параметров
        """
        with self._conn.cursor() as cur:
            cur.execute(query, params)
        self._conn.commit()  # зафиксировать изменения

    def fetchall(self, query, params=None):
        """
        Выполнить SELECT и вернуть все строки.
        :param query: SQL-запрос
        :param params: кортеж или список параметров
        :return: список словарей (RealDictCursor)
        """
        with self._conn.cursor(cursor_factory=RealDictCursor) as cur:
            cur.execute(query, params)
            return cur.fetchall()

    def close(self):
        """
        Закрыть подключение к базе.
        """
        self._conn.close()

In [50]:
def SELECT(sql, config, params=None):
    """
    Выполнить SELECT-запрос и вернуть результат в виде DataFrame.
    :param sql: SQL-запрос
    :param config: словарь с параметрами подключения
    :param params: кортеж или список параметров (по умолчанию None)
    :return: pandas.DataFrame с данными
    """
    conn = Postgres(config)
    try:
        rows = conn.fetchall(sql, params)
        df = pd.DataFrame(rows)
    finally:
        conn.close()
    return df

In [51]:
def EXECUTE(sql, config, params=None, autocommit=False):
    """
    Выполняет SQL-запросы с поддержкой многострочных команд (например, процедур).

    :param sql: SQL-запрос или набор запросов
    :param config: Конфигурация подключения к БД
    :param params: Параметры для запроса (опционально)
    :param autocommit: Если True, выполняет команды вне транзакции (для DDL-команд)
    """
    conn = Postgres(config)
    conn._conn.autocommit = autocommit

    try:
        if '$$' in sql:
            conn.execute(sql, params)
            print("Успешно выполнена процедура/функция.")
        else:
            statements = [stmt.strip() for stmt in sql.split(';') if stmt.strip()]
            for idx, statement in enumerate(statements, start=1):
                conn.execute(statement, params)
                print(f"Успешно выполнена команда {idx}: {statement[:60]}...")
    except Exception as e:
        print(f"Ошибка выполнения SQL: {e}")
        raise
    finally:
        conn.close()


# Создание схем и таблиц

In [52]:
# Создание схемы внутри хранилища

create_schema = """
    DROP SCHEMA IF EXISTS bryantsev CASCADE;
    CREATE SCHEMA bryantsev;
"""
# выполнение запроса
EXECUTE(create_schema, conn_info)

In [53]:
# Создание таблицы в схеме

create_table = """
DROP TABLE IF EXISTS bryantsev.open_meteo;
CREATE TABLE bryantsev.open_meteo(
    obs_ts TIMESTAMP(0) NOT NULL,
    timezone VARCHAR(50) NOT NULL,
    location VARCHAR(50) NOT NULL,
    temperature DOUBLE PRECISION,
    rain DOUBLE PRECISION,
    pg_updated_at DATE DEFAULT CURRENT_DATE,
    data_source VARCHAR(50) DEFAULT 'api.open-meteo.com',
    api_updated_at TIMESTAMP,

    -- Первичный ключ по паре (obs_ts, timezone, location)
    CONSTRAINT pk_open_meteo PRIMARY KEY (obs_ts, timezone, location),

    -- Ограничения
    CONSTRAINT valid_temperature_range CHECK (temperature >= -100 AND temperature <= 100),
    CONSTRAINT non_negative_rain CHECK (rain >= 0)
);

-- Создание индекса
CREATE UNIQUE INDEX uidx_weather_obs_ts_timezone_location
    ON bryantsev.open_meteo (obs_ts, timezone, location);
"""
# выполнение запросов
EXECUTE(create_table, conn_info)

# Загрузка данных

In [54]:
# Импорт библиотек
import pandas as pd
import requests
from datetime import datetime
import json

In [55]:
# Настройки API
API_URL = "https://api.open-meteo.com/v1/forecast"

# Список локаций с координатами (широта, долгота)
CITIES = {
    "Москва": (55.7558, 37.6173),
    "Санкт-Петербург": (59.9343, 30.3351),
    "Екатеринбург": (56.8389, 60.6057),
    "Казань": (55.7963, 49.1084),
}

# Диапазон дат для получения данных
START_DATE = "2025-07-01"
END_DATE = "2025-07-01"

# Таймзона
timezone = "Asia/Yekaterinburg"

In [56]:
# ====== 1. Функция получения данных ======
def fetch_weather_data():
    """Retrieve weather data from API and transform into database-ready format"""
    weather_data = []

    for location, (lat, lon) in CITIES.items():
        params = {
                "latitude": lat,
                "longitude": lon,
                "hourly": ["temperature_2m", "rain"],
                "start_date": START_DATE,
                "end_date": END_DATE,
                "timezone": timezone
            }

        try:
            response = requests.get(API_URL, params=params, timeout=10)
            response.raise_for_status()
            data = response.json()

            for i, ts in enumerate(data["hourly"]["time"]):
                weather_data.append({
                    "obs_ts": datetime.strptime(ts, '%Y-%m-%dT%H:%M'),
                    "timezone": timezone,
                    "location": location,
                    "temperature": data["hourly"]["temperature_2m"][i],
                    "rain": data["hourly"]["rain"][i],
                    "api_updated_at": datetime.now()
                })

        except requests.exceptions.RequestException as e:
            print(f"Ошибка для города {location}: {e}")
            continue

    return weather_data

In [57]:
# ====== 2. Функция загрузки в PostgreSQL ======
def upload_to_postgres(data):
    """Загрузка данных в PostgreSQL с обработкой конфликтов"""
    conn = Postgres(conn_info)
    try:
        with conn._conn.cursor() as cur:
            for record in data:
                cur.execute("""
                    INSERT INTO bryantsev.open_meteo (obs_ts, timezone, location, temperature, rain, api_updated_at)
                    VALUES (%(obs_ts)s, %(timezone)s, %(location)s, %(temperature)s, %(rain)s, %(api_updated_at)s)
                    ON CONFLICT (obs_ts, timezone, location) DO UPDATE SET
                        temperature = EXCLUDED.temperature,
                        rain = EXCLUDED.rain,
                        api_updated_at = EXCLUDED.api_updated_at
                """, record)
        conn._conn.commit()
        print(f"Успешно загружено {len(data)} записей")

    except Exception as e:
        print(f"Ошибка загрузки: {e}")
        conn._conn.rollback()
    finally:
        conn.close()

In [58]:
# ====== 3. Запуск процесса ======
# Получаем данные
weather_data = fetch_weather_data()

# Загружаем в БД
if weather_data:
    upload_to_postgres(weather_data)

Успешно загружено 96 записей
