# Пайплайн для добавления данных в базу данных public с информацией о матчах Дота2

# Импорт библиотек

In [1]:
#импорт pandas
import pandas as pd

import numpy as np

#импорт библиотеки requests
import requests

#импорт библиотеки json
import json

#импорт библиотеки pprint
import pprint

import time, os
from pathlib import Path

# Имопрт библиотеки sqlalchemy
from sqlalchemy import create_engine
from sqlalchemy import text
from sqlalchemy.dialects.postgresql import insert
from sqlalchemy import Table, MetaData

from dotenv import load_dotenv

from datetime import datetime

## Создание подключения к базе данных

In [2]:
# Загружаем переменные из supabase.env
load_dotenv(dotenv_path="supabase.env")

USER = os.getenv("user")
PASSWORD = os.getenv("password")
HOST = os.getenv("host")
PORT = os.getenv("port")
DBNAME = os.getenv("dbname")

In [3]:
# Создаем подключение
DATABASE_URL = f"postgresql+psycopg2://{USER}:{PASSWORD}@{HOST}:{PORT}/{DBNAME}?sslmode=require"

#Создаем engine
engine = create_engine(DATABASE_URL)

# Проверяем соединение
try:
    with engine.connect() as conn:
        print("Соединение успешное!")
except Exception as e:
    print(f"Соединение не установлено: {e}")

Соединение успешное!


## Скачивание данных с opendota

In [4]:
# Задаем переменную c url API 
API = "https://api.opendota.com/api"

In [5]:
# Создадим функцию для отправки запросов get
def get_json(url, params=None):
    """функция get_json:
    - принимает url
    - отправляет запрос get с использованием библиотеки requests
    - Возвращает от API json файл 
    - преобразует json в объект языка Python (словарь или список)"""
    r = requests.get(
        url, 
        params=params or {}, 
        timeout=30, 
        headers={"User-Agent": "opendota-ds"})
    r.raise_for_status()
    return r.json()

In [6]:
# Создадим функции для чтения и загрузки данных, а также сохранения памяти прогресса
# функции нужны для того,чтобы возобновлять процесс там, где остановлись и не запрашивать лишнее у API
def load_json_list(path):
    """
    Функция проверяет, существует ли файл 
    Если да: открывает JSON и возвращает список матчей
    Если нет: возвращает пустой список
    """
    if Path(path).exists():
        with open(path, "r", encoding="utf-8") as f:
            return json.load(f)
    return []

def save_json_list(path, data_list):
    """" Открывает файл на запись и сохраняет туда список (match_ids)"""
    with open(path, "w", encoding="utf-8") as f:
        json.dump(list(data_list), f, ensure_ascii=False, indent=2)

Загрузка id открытых матчей:

In [7]:
# Зададим текущую дату и название файла, куда будем сохранять сегодняшние матчи
today = datetime.today().strftime("%Y-%m-%d")
filename = f"match_ids_parced_{today}.json"

# Загрузим сохраненные id матчей, если сегодня матчи уже сохранялись в файл match_ids_parced_{today}.json
if Path(filename).exists():
    match_ids_parced = load_json_list(filename)
else:
    match_ids_parced = []
    
# зарузка id матчей с использованием метода  GET /publicMatches и учетом пагинации (1 стр - 100 матчей)
# сколько страниц хотим загрузить (не более 20 из-за лимита API)
pages_public = 1
# курсор пагинации, на первой итерации равен None
less = None

#множество матчей из базы данных
matches_in_db = set(pd.read_sql_query("SELECT match_id FROM matches", engine)["match_id"])

# проходимся по странице, передаем id матчей. Основной цикл: листаем страницы и собираем ID
# less_than_match_id - курсор для перемещения по страницам при пагинации, на первой итерации запрос без параметров, на следующих - просим более старые
# batch - одна страница результатов, список из 100 матчей
for _ in range(pages_public):
    params = {"less_than_match_id": less} if less else None
    batch = get_json(f"{API}/publicMatches", params=params)
    if not batch:
        break
# достаем из ответа только match_id из каждого словаря страницы
    ids = [m["match_id"] for m in batch if "match_id" in m]
    # проверяем match_id в matches_in_db, новый добавляем в match_ids_parced
    for m_id in ids:
        if m_id not in matches_in_db and m_id not in match_ids_parced: #тут множество заменила на список
            match_ids_parced.append(m_id)
# обновляем курсор, берем минимальный матч на этой страницы
    less = min(ids) if ids else less
    time.sleep(1.2)  # условие, чтобы не превысить лимит в 60 запросов на бесплатном доступе

# сохраняем список скачанных id матчей в json или обновляем существующий
save_json_list(filename, match_ids_parced)
# печатаем итоговое количество
print(f"Всего скачано матчей: {len(match_ids_parced)} \n Сохранено в {filename}")

Всего скачано матчей: 200 
 Сохранено в match_ids_parced_2025-10-29.json


Загрузка данных о матчах

In [8]:
# скачаем полные карточки матчей для подмножества match_ids_parced

# Добавим количество матчей, которые мы ходим скачать за запуск скрипта (200-500)
match_count = 100  

# открываем файл id скачанных файлов (если нужно начать с этой ячейки)
#filename = f"match_ids_parced_{today}.json"
#match_ids_parced = load_json_list(filename)

# Присвоим переменной jsonl_path название файла с основным хранилищем сырых матчей в формате JSONL (каждый матч — отдельная строка JSON)
jsonl_path = f"matches_raw_{today}.jsonl" 
path_obj = Path(jsonl_path)

# cоздаем файл, если он не существует
path_obj.touch(exist_ok=True) 

# Cоздаем файл, в который будем загружать скачанные данные
downloaded_file = f"downloaded_ids_{today}.json"

if Path(downloaded_file).exists():
    downloaded_ids = set(load_json_list(downloaded_file))
else:
    downloaded_ids = set()

# Загружаем файлы, которые не сохранены
to_fetch = [m_id for m_id in match_ids_parced if m_id not in downloaded_ids][:match_count]
print(f"Добавляем {len(to_fetch)} матчей...")

# скачиваем /matches/{id} и сразу пишем на диск
with open(jsonl_path, "a", encoding="utf-8") as f:
    for m_id in to_fetch:
        try:
            url = f"{API}/matches/{m_id}"
            data = get_json(url)
           # записываем сразу построчно информацию о матчах, если что-то упадёт, прогресс не будет утерян
            f.write(json.dumps(data, ensure_ascii=False) + "\n")
            downloaded_ids.add(m_id)
        except requests.HTTPError as e:
            # При ошибке 429 (Too Many Requests) повторяем
            if e.response is not None and e.response.status_code == 429:
                time.sleep(2.0)
                try:
                    data= get_json(url)
                    f.write(json.dumps(data, ensure_ascii=False) + "\n")
                    downloaded_ids.add(m_id)
                except Exception as ex:
                    print(f"Ошибка загрузки матча {m_id}: {ex}. Пропускаем.")
                    continue
        time.sleep(1.2)           
# сохраняем актуальный список уже скачанных ID
save_json_list(downloaded_file, list(downloaded_ids))
print(f"Всего скачано сегодня: {len(downloaded_ids)}")

Добавляем 100 матчей...
Всего скачано сегодня: 200


## Подготовка датафреймов

#### Загрузка данных в датафреймы

In [9]:
# делаем еще раз проверку того, какие матчи уже есть в базе данных
matches_in_db = set(pd.read_sql_query("SELECT match_id FROM matches", engine)["match_id"])
matches_in_db = set(int(x) for x in matches_in_db if x is not None)
print(f"{len(matches_in_db)} матчей в базе данных")

20925 матчей в базе данных


In [10]:
# готовим хранилище для raw-данных
raw_matches = []

# Загружаем матчи из JSONL
seen = set()

with open(jsonl_path, "r", encoding="utf-8") as f:
    for line in f:
        if not line.strip():
            continue
        try:
            obj = json.loads(line)
            m_id = obj.get("match_id")
            if m_id is None:
                continue
            m_id_int = int(m_id)
        except Exception:
            continue
        if (m_id_int in matches_in_db) or m_id_int in seen:
            continue
        seen.add(m_id_int)
        raw_matches.append(obj)

print(f"Загружено {len(raw_matches)} матчей из JSONL")

Загружено 100 матчей из JSONL


In [11]:
# Создаем датафрейм df_matches

df_matches = pd.DataFrame([{
    "match_id": d.get("match_id"),
    "duration": d.get("duration"),
    "first_blood_time": d.get("first_blood_time"),
    "radiant_win": d.get("radiant_win"),
    "region": d.get("region"),
} for d in raw_matches if isinstance(d, dict)])

In [12]:
# Создаем датафрейм df_players
df_players = pd.DataFrame([
    {
        "match_id": d.get("match_id"),
        "player_slot": p.get("player_slot"),
        "steam_id": p.get("account_id"),
        "personaname": p.get("personaname"),
        "assists": p.get("assists"),
        "deaths": p.get("deaths"),
        "denies": p.get("denies"),
        "gold_per_min": p.get("gold_per_min"),
        "hero_id": p.get("hero_id"),
        "item_0": p.get("item_0"),
        "item_1": p.get("item_1"),
        "item_2": p.get("item_2"),
        "item_3": p.get("item_3"),
        "item_4": p.get("item_4"),
        "item_5": p.get("item_5"),
        "kills": p.get("kills"),
        "xp_per_min": p.get("xp_per_min"),
        "is_radiant": p.get("isRadiant"),
        "win": p.get("win"),
        "last_hits": p.get("last_hits"),
        "hero_damage": p.get("hero_damage"),
        "hero_healing": p.get("hero_healing"),
        "tower_damage": p.get("tower_damage"),
    }
    for d in raw_matches if isinstance(d, dict)
    for p in d.get("players", [])
])


#### Подготовка датафреймов

##### Предобработка

In [13]:
# создадим функцию для нормализации строковых типов данных object
def normalize_object(df):
    """ Принимает df, находит колонки 'object', нормализует их
    убирает заглавные буквы и пробелы в начале и в конце строк
    """
    for col in df.select_dtypes(include=["object"]):
        df[col] = df[col].astype(str).str.strip().str.lower()
    return df

In [14]:
# Нормализуем строковые типы данных с использованием функции normalize_object(df)
df_players = normalize_object(df_players)

In [15]:
# Преобразуем необходимые колонки в Int54, bool
df_matches['region'] = df_matches['region'].astype('Int64')
df_players['steam_id'] = df_players['steam_id'].astype('Int64')
df_players['win'] = df_players['win'].astype('bool')

In [16]:
# удалим дубликаты, если они есть
df_matches = df_matches.drop_duplicates(subset=None, keep='first')
df_players = df_players.drop_duplicates(subset=["match_id", "player_slot"], keep='first').reset_index(drop=True)

##### Создание таблиц из df_players

In [17]:
# создадим df_player_matches
df_players_matches = df_players[['match_id', 'player_slot', 
                                'steam_id', 'personaname', 'hero_id', 'is_radiant', 'win']]

In [18]:
# создадим df_players_stat
df_players_stat = df_players[['match_id', 'player_slot', 'kills', 'deaths', 
                             'assists', 'denies', 'last_hits', 'gold_per_min', 'xp_per_min', 
                             'hero_damage', 'hero_healing', 'tower_damage']]

In [19]:
# создадим df_players_items 
df_players_items = df_players[['match_id', 'player_slot','item_0', 'item_1', 'item_2', 'item_3', 'item_4', 'item_5']]

# из широкой таблицы сделаем плоскую: slot_col - название столбца, item_id - id предмета
df_players_items = df_players_items.melt(
                            id_vars = ['match_id', 'player_slot'], 
                            value_vars = ['item_0', 'item_1','item_2', 'item_3', 'item_4', 'item_5'], 
                            var_name = 'slot_col', 
                            value_name='item_id')

# создадим столбец, взяв последний элемент строки столбца slot_col 
df_players_items['item_slot'] = df_players_items['slot_col'].str.slice(-1)

# удалим колонку slot_col 
df_players_items = df_players_items.drop(columns='slot_col')

In [20]:
# встречается ноль - это остутствующее значение. Необходимо заменить его на nan, чтобы не было ошибки при стыковке таблиц
df_players_items['item_id'] = df_players_items['item_id'].replace(0, pd.NA)

In [21]:
# поменяем колонки местами для соответствия базе данных
df_players_items = df_players_items[["match_id", "player_slot", "item_slot", "item_id"]]

##### Проверим совпадение внешних ключей

In [22]:
# проверим совпадение внешних ключей между таблицами df_matches, df_player_matches
s1 = set(df_matches['match_id'])
s2 = set(df_players_matches["match_id"])
matches_and_players = s1 - s2
display(len(matches_and_players), len(s1), len(s2))

0

100

100

In [23]:
# добавим удаление лишних матчей, если они существуют
df_matches = df_matches[df_matches["match_id"].isin(df_players_matches["match_id"])]

##### Финально преобразуем типы данных для совместимости с загрузкой в базу данных

In [24]:
# Создадим функцию
def convert_df_types(df):
    df = df.copy()
    for col in df.select_dtypes(include=["int64", "Int64", "float64", "bool"]):
        df[col] = df[col].apply(
            lambda x: None if x is None else x.item() if isinstance(x, (np.integer, np.floating, np.bool_)) else x
        )
    return df

In [25]:
# Сконвертируем датафреймы
df_matches = convert_df_types(df_matches)
df_players_matches = convert_df_types(df_players_matches)
df_players_stat = convert_df_types(df_players_stat)
df_players_items = convert_df_types(df_players_items)

## Загрузка данных в базу данных

### Загрузка данных из датафреймов

In [26]:
#records = df.where(pd.notnull(df), None).to_dict(orient="records")

In [27]:
def insert_df_to_db(engine, df, table_name, schema="public"):
    """ Отправляет данные из датафрейма напрямую в БД
        Параметры:
            engine - создан в начале ноутбука
            df - датафрейм со скачанными данными
            table_name -  название таблицы в БД
            schema - название схемы, по умолчанию "public"
    """
    # Заменяем pd.NA и numpy.nan на None
    df_clean = df.where(pd.notnull(df), None).copy()
    records = df_clean.to_dict(orient="records")
    metadata = MetaData()
    table = Table(table_name, metadata, autoload_with=engine, schema=schema)

    stmt = insert(table).values(records)
    stmt = stmt.on_conflict_do_nothing()
    
    with engine.begin() as conn:
        result = conn.execute(stmt)
        inserted = result.rowcount
        
    print(f"Добавлено {inserted} строк в {schema}.{table_name}")
    

In [28]:
# Загружаем данные
insert_df_to_db(engine, df_matches, "matches")
insert_df_to_db(engine, df_players_matches, "players")
insert_df_to_db(engine, df_players_stat, "players_stat")
insert_df_to_db(engine, df_players_items, "players_items")

Добавлено 100 строк в public.matches
Добавлено 1000 строк в public.players
Добавлено 1000 строк в public.players_stat
Добавлено 6000 строк в public.players_items


### Обновление материализованных представлений

In [29]:
# Создаем функцию для обновления MV
def refresh_mv(engine, view_name, schema_name):
    """
    Обновить материализованное представление
    Параметры:
    engine: движок БД
    view_name: имя материализованного представления
    schema_name: название схемы
    
    """
    query_mv = f"REFRESH MATERIALIZED VIEW {schema_name}.{view_name};"
    with engine.begin() as conn:
        conn.execute(text(query_mv))    
    print(f"Материализованное представление {schema_name}.{view_name} обновлено")

In [30]:
# Вызываем перечень материализованных представлений БД
query_mv_check = """
SELECT schemaname, matviewname
FROM pg_matviews;
"""

mv = pd.read_sql_query(query_mv_check, engine)
mv_list = mv[['schemaname', 'matviewname']].to_dict(orient='records')

# Обновим MV
for ele in mv_list:
    schema = ele['schemaname']
    view = ele['matviewname']
    try:
        refresh_mv(engine, view, schema)
    except Exception as e:
        print(f"Ошибка при обновлении {schema}.{view}: {e}")
        

Материализованное представление public.radiant_win_by_duration обновлено
Материализованное представление public.win_factors обновлено
Материализованное представление public.gmp_winrate обновлено
Материализованное представление public.regions_stat обновлено
Материализованное представление public.unknown_players обновлено
Материализованное представление public.heroes_stat обновлено
