In [5]:
import warnings
warnings.filterwarnings('ignore')
import pandas as pd; import sys
sys.path.append('..')
import numpy as np
from IPython.display import display
pd.options.display.max_columns = None
pd.options.display.max_rows = 1000
pd.options.display.max_colwidth = 100
from IPython.core.display import display, HTML
import re
import ast
display(HTML("<style>.container { width:100% !important; }</style>"))
from tqdm import tqdm 
import traceback
import imp

###  SQL синтаксис, заметки

<u>Оконная функция</u>  
SELECT window_func(column, args) 
OVER(PARTITION BY window_columns ORDER BY sort_columns ROWS|RANGE BETWEEN lower_bound AND upper_bound)  
  
window_func - оконная функция  
Основные виды оконных функций:  
- агрегирующие = SUM, MIN, MAX, COUNT, AVG 
- ранжирующие = ROW_NUMBER, RANK, DENSE_RANK
- позиционные=LEAD, LAG, FIRST_VALUE, LAST_VALUE
- функции накопления и скользящие

PARTITION BY window_columns - задает группировку, по каждому значению window_columns формируется окно  
ORDER BY sort_columns - сортировка таблицы перед применением оконной функции в рамках сформированного окна  
ROWS|RANGE BETWEEN - фрейминг окна, когда нужен размер меньше, чем задаваемый конкретным значением window_columns  
  
Варианты задания lower_bound, upper_bound для фрейминга окна через ROWS BETWEEN:
- UNBOUNDED PRECEDING / UNBOUNDED FOLLOWING  = все значения после текущего/до текущего в окне  
- n PRECEDING / n FOLLOWING = n строк до текущей / после текущей  
- CURRENT ROW = текущая строка  
  
RANGE BETWEEN - задает диапазон по <u>значению</u> строки указанной в ORDER BY относительно текущей.  
Пример:  
- ROWS BETWEEN 3 PRECEDING AND 3 FOLLOWING = окно размера (3 строки до текущей, текущая, 3 строки после)  
- RANGE BETWEEN 3 PRECEDING AND 3 FOLLOWING = все строки удовлетворяющие условию  
current_val - 3 <= sort_columns <= current_val + 3 
- RANGE BETWEEN INTERVAL '3' DAY PRECEDING AND CURRENT ROW = все строки в интервале (now - 3 day, now)  
для колонки sort_columns со значением времени 
  
По умолчанию во всех оконках, если не указан фрейминг стоит куммуляция от начала окна до текущей строки, то есть  
ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW

---
<u>Подзапросы</u>  
Это использование запросов внутри тела других запросов. Выделяют несколько видов.  
- Скалярный. SELECT * FROM table WHERE user_id = (SELECT MAX(user_id) FROM table2)
- Списки. SELECT * FROM table WHERE user_id in (SELECT user_id FROM table2)
- Табличный. SELECT * FROM (SELECT user_id, sum(orders) cnt FROM table2 GROUP BY 1)
- Коррелированный - когда подзапрос знает про запрос извне (наоборот никогда)  
SELECT * FROM table1 WHERE cost < (select max(cost) from table2 t2 where t2.user_id = t1.user_id)  

PS. Коррелированный подзапрос вычисляется каждый раз для каждой строки внешнего запроса, поэтому  
чаще быстрее использовать CTE + вспомогательный JOIN.  
Однако, если проверка вхождений идет по большой таблице table2 - иногда эффективнее не джойнить  
большие таблицы, а использовать EXISTS.

---
<u>EXISTS</u>  
Применяется к подзапросам и возвращает TRUE если подзапрос содержит хотя бы одну строку.  
Например, ... WHERE EXISTS (SELECT col FROM table)  
Иногда нужно фильтровать запросы через INNER JOIN:  
  
with flt as (select user_id from tbl)  
select
t.*
from tbl2 t 
join flt f on f.user_id = t.user_id
  
Но быстрее реализовать это так:  
select t.*
from tbl2 t2 where
exists (select 1 from tbl where t.user_id = t2.user_id)

---
<u>JOINS</u>  
LEFT, RIGHT = соединение таблиц по ключу, где оставляются в качестве NULL строки правой/левой  
таблиц при условии что нет матчинга с другой табличкой.  
  
FULL OUTER JOIN = FULL JOIN = LEFT + RIGHT - соединение при котором остаются обе таблицы, если нет матча то NULL  
  
CROSS JOIN = декартово произведение таблиц = матчинг всех строк таблицы 1 со всеми строками таблицы 2  
Для CROSS JOIN не указывается ON инструкция, сокращенно можно писать так SELECT ... FROM table1, table2  
  
LEFT/INNER LATERAL JOIN - вид джойна с коррелированным подзапросом, то есть когда  
правая таблица вычисляется в зависимости от значений в левой.

---
<u>UNNEST</u>  
Превращает массив в набор элементов по строкам.  
Например для таблицы select id, val_list from tbl, где val_list = (a1, a2, a3, ...) получаем:  
select id, unnest(val_list) as val from tbl -> табличку с распакованными массивом вида id1, a1, id1, a2, ...
  
Более сложный пример, tbl: id, val_list = ((a11, a12, a13), (a21, a22, a23), ...)  
select t.*, vl.* from tbl t cross join unnest(val_list) as vl(name1, name2, name3)  
Здесь unnest распаковывает массив в таблицу с колонками с name1, ...; cross join unnest  
мапит каждую строку vl к **соответствующей** строке t. То есть это не полный декарт.  
В большинстве СУБД: cross join unnest = cross join lateral unnest.  
  
WITH ORDINALITY - доп. конструкция добавляющая порядковый номер к формируемой таблице  
Например, ... FROM UNNEST(val_list) WITH ORDINALITY AS vl(value, ord_number)  
Появляется колонка ord_number


---
<u>TRANSFORM</u>  
Инструкция принимающая массив и применяющая к нему поэлементное преобразование (заданное).  
Т.е вида transfrom(val_list, func). Пример: transform(val_list, x -> x.id * 2) - вытащили id, удвоили, итд

### Базы данных и СУБД

#### Виды и способы организации памяти

Любая задача, связанная с обработкой данных - это какой то объем вычислений + процедур записи/считывания из памяти.  
Примеры вычислений: агрегация значений, хеширование и сравнения значений при поиске по таблицам, вычисление адресов ячеек для записи итд.  
Этим занимается процессор (CPU = central processor unit), который также может иметь много ядер для параллельных вычислений.  
На скорость обработки данных влияет кол-во ядер, тактовая частота процессора, другие детали его архитектуры.  
  
В процессе выполнения запроса процессор записывает промежуточные или конечные данные в разную память.  
Выделяют два ее вида:  
1. Энергозависимая - на конденсаторах, работает только когда сервер включен, ограниченная в объеме, но довольно быстрая.  
Есть совсем небольшая память регистров процессора, с которой он постоянно работает и считывает напрямую (через нее пишутся команды).  
Чуть дольше, но тоже быстрая - оперативная память RAM (random memory access) - тоже на конденсаторах или подобных структурах,  
в нее процессор пишет промежуточные данные которые надо быстро записать/считать (чтобы были "под рукой").  
  
  
2. Энергонезависимая или физическая память. Память в которой данные хранятся при выключенном питании (по сути жесткие диски).  
Она дешевле, ее больше по объему - процессор пишет в нее конечные данные или промежуточные которые не влезли в RAM.  
Оттуда медленнее писать/считывать, поэтому оптимизация часто строится на том чтобы эффективнее использовать оперативку.  
Виды физической памяти - HDD и SSD.  
  
<u>HDD</u> (Hard-disk drive) - старый тип памяти, более медленный чем SSD. Работает по принципу похожему на виниловые пластинки -  
данные пишутся на магнитные ленты, ленты вращаются с тактовой частотой, по ним бегает головка которая своим магнитным полем  
может перезаписывать информацию. Единица хранения информации (бит) - это магнитная ячейка. HDD менее долговечны, так как есть  
вращающиеся элементы, механика.  
  
<u>SSD</u> (Solid-State Drive) - память, использующая свойства полупроводников для чтения/записи (она же используется во флешках).  
Полупроводники - вещества, в которых электроны слабо-связаны с кристаллической решеткой и при определенных условиях могут  
открепляться - то есть полупроводники можно превратить в проводники (через нагрев, хим. реакции, электрическое поле).  
Вот схема полупроводникового транзистора:  
<img src="images/transistor.png" width="250" align="left">  
  
Затвор - это металлический электрод, который может содержать (1) или не содержать (0) электрический заряд.  
Затвор окружен диэлектриком (плавающий), то есть отделен от подложки - полупроводниковой пластины  
Когда в затворе есть заряд - его электрическое поле влияет на подложку (индуцирует p-канал), она становится более  
проводящей - в результате при подаваемом пороговом напряжении между полюсами "сток"-"исток" возникает ток.  
Когда в затворе заряда нет, то ток не возникает. В итоге, подавая напряжение на полюса, можно понять записан ли 0/1 в ячейку.  
А подавая/забирая заряд затвора, можно производить запись информации.  
  
---
SSD-накопители массово стали появляться, когда были разработаны эффективные контроллеры, позволяющие быстро работать с  
памятью подобного типа.

#### CAP - теорема

CAP - это акроним из трех условий:  
1. Consistency (согласованность) - каждая транзакция в БД переводит таблицу из одного валидного состояния в другое.  
Валидное состояние - это когда ограничения на полноту записи, проверку доп. ограничений таблицы выполнены.  
То есть таблица не существует в промежуточном состоянии. Пример: клиент 1 записал в таблицу строку, но из за того что  
база данных распределенная - данные не успели разойтись по всем нодам (см hdfs) и клиент 2 после этого считал устаревшую таблицу.  
По сути пока данные "растекутся" - таблица доступна в промежуточном = невалидном состоянии.  
  
  
2. Availability (доступность) - любой запрос к таблицам возвращает ответ. То есть нет такого, что клиент 2 запросил SELECT из  
таблички, а в ответ получил - "таблица в невалидном состоянии = failed". Если сервер включен - он возвращает данные.  


3. Partiton tolerance (устойчивость к разделению) - при потере доступа к одной части системы, система может продолжать работать.  
По сути, то что дают распределенные СУБД.

СУБД - система управления базой данных.  
CAP-теорема: в любой СУБД невозможно одновременно достичь всех трех условий CAP (только два максимум).  

В нераспределенных СУБД достигаются первые два условия (структуры вроде partition, foreign keys задают проверки на валидность  
таблиц после каждых обновлений, если один клиент что то записал, то другой клиент после этого считает только актуальную версию)  
При этом сервер будет отвечать на любой запрос (пусть и с задержкой на проверку валидности состояния при записи).  
Однако не будет выполняться P-условие по определению - база хранится на одном сервере, у системы нет частей.  
  
Для распределенных СУБД по определению выполняется P-условие, а дальше либо C либо A. 

#### Нераспределенные СУБД

Например, Postgre, Transact-sql. Система управления данными хостится на одной машине (сервере).  
Для подобных баз данных справедлив <u>ACID</u> фреймворк.  
1. Atomic. Транзакция выполняется полностью или не выполняется. Если я делаю запрос drop table  
но на половине его останавливаю - происходит откат (roll back) к изначальному состоянию. Аналогично select  
либо возвращает полный результат, либо не возвращает ничего при остановке транзакции


2. Consistency. Любая транзакция переводит таблицу из одного валидного состояния в другое.  
Про это говорили выше, доп. условия валидности задаются ключами - например, что нельзя в таблице    
создать две строчки с одинаковым primary key, если он определен.  


3. Isolation. Параллельные транзакции не влияют друг на друга. Если два пользователя одновременно  
общаются с таблицей - то тот чей запрос прилетит в стек позже, увидет уже измененную первым запросом таблицу итд.  
  
  
4. Durability. Надежность - после совершения транзакции, эти изменения фиксируются в физической памяти.  
То есть они не пропадут даже если произойдет сбой сервера, все детали транзакции зафиксированы в журналах.

---
<u>Ключи в таблицах</u>  
Это по сути правила, помогающие определить, что такое валидная таблица для обеспечения Consistency.  
PK = primary key - уникальный ключ таблицы, не содержит NULL, уникальный идентификатор сущности.  
Например, в таблице с клиентами это может быть user_id. Нельзя в таблицу завести дубль user_id или  
добавить строчку без этого значения. Данные транзакции будут отклонены.  
  
FK = foreign key (внешний ключ). Нужен для однозначной связки child-таблицы с ее parent. Например,  
если это таблица заказов, то в ней может быть FK(user_id) указывающий на PK(user_id) родительской  
таблицы. Нельзя добавлять сущность, у которой нет parent_user_id. Таких ключей в одной таблице может  
быть несколько, кроме того допускаются NULL значения.  
  
<u>Индексы</u>  
Это дополнительные структуры с указателями на данные, позволяющие быстрее осуществлять поиск по таблице.  
Если к примеру есть большая таблица и в ней поле user_id, то INDEX(user_id) позволяет быстро  
находить конкретные строки таблицы по индексу без сканирования всей таблицы.  

---
Типы индексов:  
1) hash. Только точный поиск. Колонка user_id -> каждому ее значению сопоставляется hash,  
или ключ - по которому записаны указатели на нужную строку таблицы (указатель = адрес в памяти диска).   
Коллизия - когда два разных user_id имеют один хеш. В этом случае пишется список:  
{hash1 : (user_1_info, user_2_info)} - проходим по нему, смотрим там нужный user_id.  

2) B-tree. Точечный и интервальный поиск - основной тип индексов в postgre.  
Это сбалансированное отсортированное дерево ключей. Пусть ищем строки для user_id between X and Y.  
Спускаемся по дереву - на каждой развилке меньшие значения ключей в левой ветке, большие в правой.  
В каждой node дерева есть информация о min/max значениях того что под ней.  
В итоговых листьях - отсортированные списки ключей колонки.  
Смотрим node1, проверяем пересекается ли (node1_min, node1_max) с (X, Y). Если нет, идем к следующей  
ноде дерева итд. Если есть, то спускаемся ниже - на уровень node11, node12, ..., такие же проверки.  
В итоге выбираем только нужные листья - считываем из них нужные адреса строк - идем читать данные.  
  
---
Способы организации данных таблицы:  
1) Clustered - данные в физической памяти отсортированы по ключам индекса. Когда напр. через дерево  
получен нужный диапазон ключей - можно последовательно считывать данные с нужного места диска, это быстро.  
Но любая update-команда таблицы вызывает ее сортировку, что медленно.  
 
2) Non clustered - данные разбросаны как то по диску памяти, так их быстрее записывать, ключи лишь  
дают адрес разных строк. Но считывание по этим адресам идет медленнее.  

3) Partitioned - разные части таблицы сгруппированы по конкретным колонкам в разные файлы в памяти.  
Например один день day = 1 в таблице - это один файл. Внутри такой партиции поиск может идти уже по  
описанным выше индексам или без них.  

<u>Шардирование</u>  
Это опциональная возможность горизонтального масштабирования в нераспределенных СУБД.  
Некоторая таблица table разбивается на фрагменты: table1, table2, ...  
Каждый фрагмент пишется на свой сервер (шард) и когда к таблице идут запросы, обрабатывается отдельно  
и параллельно. Затем результаты совместно агрегируются на управляющем сервере.  
По сути это имитация работы распределенной СУБД

#### Распределенные СУБД

Например, hadoop + hive, presto, spark. Подходит для работы с большими данными.  
Выделяется две компоненты подобной системы:
1. распределенная файловая система (например, hdfs = hadoop file system) хранения/записи данных  
2. СУБД для обработки данных (встроенная в хадуп либо надстройки вроде hive, presto ...)  
  
---
<u>Файловая система</u>  
Рассмотрим на примере hdfs. Выделяют множество нод - независимых узлов (серверов) с физической памятью  
для хранения данных и процессорами. Некоторая таблица table разбивается на фрагменты - они пишутся в разные  
ноды. Фрагменты также реплицируются (один фрагмент может иметь 2-3 копии, пишутся на разные ноды).  
Репликация позволяет не потерять данные если одна из нод сломается, а также ускоряет работу:  
когда на центральную ноду идет запрос поработать с фрагментом таблицы - нода выбирает ближайшие в сети  
ноды, содержащие нужные фрагменты. Ноды **сами** проводят обработку фрагментов, чтоб не гонять их по сети  
и отдают часто уже агрегаты, итоговые агрегации происходят в центральной ноде
  
По CAP-теореме здесь поддерживается P, но ломается либо C либо A.  
C = консистентность = таблица существует только в валидных состояниях  
A = доступность = на любую транзакцию сервер возвращает ответ.  
Здесь нужно либо подождать чтобы после изменения таблица стала валидной (проверки разошлись по всем нодам)  
либо получать ответы, но не всегда актуальные.  
  
Ключи (primary, foreign),а также индексы в распрделенных системах не работают.  
Так как поиск по таблицам распределенный и слишком дорого поддерживать единые структуры контроля  
вроде индексов или ключей. Зато норм работают партиции - куски таблиц физически пишутся в отдельные  
файлы или инкапсулируются на отдельных нодах для упрощенной обработки.  
  
ACID фреймворк также здесь не работает, нарушается Consistency (наблюдаются временные несогласованности, пока обновление таблицы достигнет всех нод сети) или Atomic (иногда слишком дорого делать rollback, но зависит от    настройки СУБД). В высоконагруженных базах может страдать и Isolation  
  
Фреймворк ACID в распределенных системах заменяется на BASE (basic available, soft state, eventual consistency)  
Ради работы с высоконагруженными данными снижаются издержки на валидность, доступность чтения итд.  
Можно выбирать разные уровни строгости в зависимости от ситуации (трейд офф скорости - согласованности)

---
<u>Распределенные СУБД</u>  
Исторически первая связка была hadoop = hdfs + hive. Hive - sql-подобный движок у которого под капотом  
обработка данных по фреймворк MapReduce.  
  
Описание фреймворка распределенных вычислений <u>MapReduce</u>.  
Пусть имеем таблицу table и хотим произвести над ней обработку.  
1) split. Формально разбиваем таблицу на части. В реальности она уже так распределена,  
нужные ноды получают инструкцию по обработке своей части таблицы.  
2) map. Каждый фрагмент таблицы (строка) получает свой hash - по которому таблицы будут сгруппированы на ноды.  
hash в каждой задаче выбирается так, чтобы похожие по типу обработки строки отправлялись на одну ноду.  
3) shuffle. Происходит группировка однотипных данных на схожие ноды для удобной агрегации.  
Некоторые данные уже есть на этих нодах, некоторые пересылаются из ближайших.  
4) reduce. ноды-воркеры на которые пришли данные начинают их агрегацию. Затем информация группируется  
на центральной ноде, которая проводит итоговую агрегацию и выдает ответ.  
  
Пример:  
Надо посчитать число вхождений разных слов в текст: "лодка плывет, лодка качается, качается на воде"  
Текст разбивается на куски, каждая нода хеширует каждое слово функцией предлагаемой центральной нодой.  
Допустим {лодка : hash1, качается : hash2, ...}. Дальше слова с одинаковым хешем передаются на одну ноду,  
то есть будет нода в которой информация: (лодка, лодка). Так как у нее вся информация по этому слову, ей  
не надо координироваться с другими нодами - можно просто посчитать число слов: {лодка : 2}.  
Дальше все воркеры передают свои агрегаты на центральную ноду и получаем ответ:  
{лодка : 2, плывет : 1, качается : 2, ...}

<u>Spark, Presto</u>  
https://datareview.info/article/znakomstvo-s-apache-spark/  
Минус MapReduce - что схема не очень гибкая (конкретная последовательность шагов), не идеально под все флоу.  
Также mapreduce больше ориентирован на чтение/запись данных в физическую память (SSD), что долго.  
Spark - альтернативный фреймворк в котором составляется более гибкий DAG (graph) обработки данных -  
основная цель, максимизировать использование имеющейся на нодах RAM для обработки - увеличить скорость.  
Presto - аналогично имеет свой пайплайн и даже аналог hdfs хранилища для лучшей оптимизации своего флоу.   
Эти фреймворки требуют более дорогих ресурсов (RAM), но при ограниченном объеме вычислений делают это быстрее.  
Hive подоходит для больших запросов (физической памяти много), но делает это медленнее.  

#### NoSQL СУБД

Реляционные базы данных рассмотренные выше - это БД с таблицами с фиксированными схемами и правилами  
соотношения колонок. Например данные вида (произвольный json) хранить и обрабатывать в колонке реляционной  
базы данных не очень удобно, к тому же ломаются все правила валидации таблиц.  
  
При масштабировании объемов записи в базы данных удобно отказаться от реляционного формата и накладных  
расходов по его поддержке в пользу гибкости и меньшей валидации состояний.  
NoSql базы поддерживают гибкие форматы записи данных (вроде картинок, произвольной структуры документов и пр). 
  
Пример, MongoDB - документоориентированная база. Допустим, документ - это произвольный json.  
У нас есть колонка data -> нам нужно по ней агрегировать, параллельно делая парсинг json по нужным полям.  
В MongoDB архитектура позволяет это делать значительно быстрее чем например в Postgre. Более того,  
фильтрация во вложенных структурах - основной функционал Mongo.  
  
Технические фишки Mongo:  
json хранится в формате binary-json + поддерживается индексация по вложенным полям.  
Для парсинга json (например data{0}{'val'}) не считывается весь объект из базы, а только конкретные поля итд.  

NoSql СУБД могут быть распределенными или локальными, на одном сервере.  

#### Sql explain analyzer

Оценка перфоманса sql запроса с декомпозицией по шагам  
Рассмотрено на примере presto - так как там действует DAG схема, то  
есть ряд операторов которые в том числе распараллеливают вычисления и.т.д  
Список возможных операторов приведен ниже, новые также можно гуглить

In [1]:
# пример запроса выгруженного из trino
query_plan = """Trino version: 409-dirty\nQueued: 103.92us, Analysis: 172.36ms, Planning: 52.39ms, Execution: 3.34s\nFragment 1 [SINGLE]\n    CPU: 150.37ms, Scheduled: 625.24ms, Blocked 3.07m (Input: 1.50m, Output: 0.00ns), Input: 3040 rows (53.44kB); per task: avg.: 3040.00 std.dev.: 0.00, Output: 1 row (23B)\n    Output layout: [count_6, expr_7, count]\n    Output partitioning: SINGLE []\n    Project[]\n    │   Layout: [count_6:bigint, expr_7:integer, count:bigint]\n    │   Estimates: {rows: 1 (23B), cpu: 23, memory: 0B, network: 0B}\n    │   CPU: 1.00ms (0.01%), Scheduled: 1.00ms (0.00%), Blocked: 0.00ns (0.00%), Output: 1 row (23B)\n    │   Input avg.: 0.03 rows, Input std.dev.: 556.78%\n    │   expr_7 := 2\n    └─ LocalExchange[partitioning = ROUND_ROBIN]\n       │   Layout: [count_6:bigint, count:bigint]\n       │   Estimates: {rows: 1 (18B), cpu: 18, memory: 0B, network: 0B}\n       │   CPU: 0.00ns (0.00%), Scheduled: 0.00ns (0.00%), Blocked: 48.45s (0.19%), Output: 1 row (18B)\n       │   Input avg.: 1.00 rows, Input std.dev.: 0.00%\n       └─ Aggregate[type = FINAL]\n          │   Layout: [count_6:bigint, count:bigint]\n          │   Estimates: {rows: 1 (18B), cpu: ?, memory: 18B, network: 0B}\n          │   CPU: 17.00ms (0.12%), Scheduled: 17.00ms (0.03%), Blocked: 0.00ns (0.00%), Output: 1 row (18B)\n          │   Input avg.: 3040.00 rows, Input std.dev.: 0.00%\n          │   count_6 := count("count_8")\n          │   count := count("count_9")\n          └─ LocalExchange[partitioning = SINGLE]\n             │   Layout: [count_9:bigint, count_8:bigint]\n             │   Estimates: {rows: ? (?), cpu: 0, memory: 0B, network: 0B}\n             │   CPU: 23.00ms (0.16%), Scheduled: 22.00ms (0.03%), Blocked: 2.80s (0.01%), Output: 3040 rows (53.44kB)\n             │   Input avg.: 95.00 rows, Input std.dev.: 32.16%\n             └─ RemoteSource[sourceFragmentIds = [2]]\n                    Layout: [count_9:bigint, count_8:bigint]\n                    CPU: 91.00ms (0.64%), Scheduled: 562.00ms (0.84%), Blocked: 1.50m (0.35%), Output: 3040 rows (53.44kB)\n                    Input avg.: 95.00 rows, Input std.dev.: 32.16%\n\nFragment 2 [HASH]\n    CPU: 1.85s, Scheduled: 13.04s, Blocked 3.53h (Input: 1.73h, Output: 0.00ns), Input: 263673 rows (4.76MB); per task: avg.: 2775.51 std.dev.: 11600.81, Output: 3040 rows (53.44kB)\n    Amount of input data processed by the workers for this stage might be skewed\n    Output layout: [count_9, count_8]\n    Output partitioning: SINGLE []\n    Aggregate[type = PARTIAL]\n    │   Layout: [count_9:bigint, count_8:bigint]\n    │   CPU: 397.00ms (2.78%), Scheduled: 1.41s (2.10%), Blocked: 0.00ns (0.00%), Output: 3040 rows (53.44kB)\n    │   Input avg.: 86.73 rows, Input std.dev.: 2377.07%\n    │   count_9 := count("employer_id") (mask = employer_id$distinct)\n    │   count_8 := count("expr") (mask = expr$distinct)\n    └─ MarkDistinct[distinct = [expr:integer], marker = expr$distinct, hash = [$hashvalue]]\n       │   Layout: [expr:integer, employer_id:integer, employer_id$distinct:boolean, $hashvalue:bigint, expr$distinct:boolean]\n       │   CPU: 193.00ms (1.35%), Scheduled: 1.07s (1.59%), Blocked: 0.00ns (0.00%), Output: 263673 rows (4.77MB)\n       │   Input avg.: 86.73 rows, Input std.dev.: 2377.07%\n       └─ LocalExchange[partitioning = HASH, hashColumn = [$hashvalue], arguments = ["expr"]]\n          │   Layout: [expr:integer, employer_id:integer, employer_id$distinct:boolean, $hashvalue:bigint]\n          │   Estimates: {rows: ? (?), cpu: ?, memory: 0B, network: 0B}\n          │   CPU: 230.00ms (1.61%), Scheduled: 3.99s (5.93%), Blocked: 1.57h (22.20%), Output: 263673 rows (4.76MB)\n          │   Input avg.: 86.73 rows, Input std.dev.: 440.62%\n          └─ RemoteSource[sourceFragmentIds = [3]]\n                 Layout: [expr:integer, employer_id:integer, employer_id$distinct:boolean, $hashvalue_10:bigint]\n                 CPU: 290.00ms (2.03%), Scheduled: 1.85s (2.76%), Blocked: 1.73h (24.47%), Output: 263673 rows (4.76MB)\n                 Input avg.: 86.73 rows, Input std.dev.: 440.62%\n\nFragment 3 [HASH]\n    CPU: 9.86s, Scheduled: 48.14s, Blocked 2.17h (Input: 1.08h, Output: 0.00ns), Input: 263673 rows (7.04MB); per task: avg.: 2775.51 std.dev.: 35.88, Output: 263673 rows (4.76MB)\n    Output layout: [expr, employer_id, employer_id$distinct, $hashvalue_11]\n    Output partitioning: HASH [expr][$hashvalue_11]\n    Project[]\n    │   Layout: [expr:integer, employer_id:integer, $hashvalue_11:bigint, employer_id$distinct:boolean]\n    │   Estimates: {rows: ? (?), cpu: ?, memory: 0B, network: 0B}\n    │   CPU: 3.50s (24.54%), Scheduled: 8.34s (12.42%), Blocked: 0.00ns (0.00%), Output: 263673 rows (4.76MB)\n    │   Input avg.: 86.73 rows, Input std.dev.: 10.50%\n    └─ MarkDistinct[distinct = [employer_id:integer], marker = employer_id$distinct, hash = [$hashvalue_12]]\n       │   Layout: [expr:integer, employer_id:integer, $hashvalue_11:bigint, $hashvalue_12:bigint, employer_id$distinct:boolean]\n       │   CPU: 2.05s (14.36%), Scheduled: 4.72s (7.03%), Blocked: 0.00ns (0.00%), Output: 263673 rows (7.31MB)\n       │   Input avg.: 86.73 rows, Input std.dev.: 10.50%\n       └─ LocalExchange[partitioning = HASH, hashColumn = [$hashvalue_12], arguments = ["employer_id"]]\n          │   Layout: [expr:integer, employer_id:integer, $hashvalue_11:bigint, $hashvalue_12:bigint]\n          │   Estimates: {rows: ? (?), cpu: ?, memory: 0B, network: 0B}\n          │   CPU: 2.09s (14.66%), Scheduled: 6.80s (10.12%), Blocked: 1.04h (14.69%), Output: 263673 rows (7.04MB)\n          │   Input avg.: 86.73 rows, Input std.dev.: 152.26%\n          └─ RemoteSource[sourceFragmentIds = [4]]\n                 Layout: [expr:integer, employer_id:integer, $hashvalue_13:bigint, $hashvalue_14:bigint]\n                 CPU: 1.11s (7.80%), Scheduled: 24.95s (37.14%), Blocked: 1.08h (15.29%), Output: 263673 rows (7.04MB)\n                 Input avg.: 86.73 rows, Input std.dev.: 152.26%\n\nFragment 4 [HASH]\n    CPU: 2.87s, Scheduled: 14.17s, Blocked 1.80h (Input: 59.47m, Output: 0.00ns), Input: 327832 rows (7.42MB); per task: avg.: 3450.86 std.dev.: 7621.27, Output: 263673 rows (7.04MB)\n    Amount of input data processed by the workers for this stage might be skewed\n    Output layout: [expr, employer_id, $hashvalue_21, $hashvalue_20]\n    Output partitioning: HASH [employer_id][$hashvalue_20]\n    Project[]\n    │   Layout: [expr:integer, employer_id:integer, $hashvalue_20:bigint, $hashvalue_21:bigint]\n    │   Estimates: {rows: ? (?), cpu: ?, memory: 0B, network: 0B}\n    │   CPU: 810.00ms (5.68%), Scheduled: 5.51s (8.20%), Blocked: 0.00ns (0.00%), Output: 263673 rows (7.04MB)\n    │   Input avg.: 86.73 rows, Input std.dev.: 638.70%\n    │   $hashvalue_21 := combine_hash(bigint \'0\', COALESCE("$operator$hash_code"("expr"), 0))\n    └─ Project[]\n       │   Layout: [expr:integer, employer_id:integer, $hashvalue_20:bigint]\n       │   Estimates: {rows: ? (?), cpu: ?, memory: 0B, network: 0B}\n       │   CPU: 314.00ms (2.20%), Scheduled: 593.00ms (0.88%), Blocked: 0.00ns (0.00%), Output: 263673 rows (5.90MB)\n       │   Input avg.: 86.73 rows, Input std.dev.: 638.70%\n       │   expr := (CASE WHEN ("discard_status" = 0) THEN "employer_id" END)\n       │   $hashvalue_20 := combine_hash(bigint \'0\', COALESCE("$operator$hash_code"("employer_id"), 0))\n       └─ InnerJoin[criteria = ("area_id" = "area_id_1"), hash = [$hashvalue_15, $hashvalue_17], distribution = PARTITIONED]\n          │   Layout: [employer_id:integer, discard_status:integer]\n          │   Estimates: {rows: ? (?), cpu: ?, memory: ?, network: 0B}\n          │   CPU: 829.00ms (5.81%), Scheduled: 2.84s (4.23%), Blocked: 22.76m (5.37%), Output: 263673 rows (4.40MB)\n          │   Left (probe) Input avg.: 104.95 rows, Input std.dev.: 555.73%\n          │   Right (build) Input avg.: 2.89 rows, Input std.dev.: 57.45%\n          │   Distribution: PARTITIONED\n          │   dynamicFilterAssignments = {area_id_1 -> #df_743}\n          ├─ RemoteSource[sourceFragmentIds = [5]]\n          │      Layout: [employer_id:integer, area_id:integer, discard_status:integer, $hashvalue_15:bigint]\n          │      CPU: 3.00ms (0.02%), Scheduled: 19.00ms (0.03%), Blocked: 45.15m (10.64%), Output: 319057 rows (7.30MB)\n          │      Input avg.: 104.95 rows, Input std.dev.: 555.73%\n          └─ LocalExchange[partitioning = HASH, hashColumn = [$hashvalue_17], arguments = ["area_id_1"]]\n             │   Layout: [area_id_1:integer, $hashvalue_17:bigint]\n             │   Estimates: {rows: ? (?), cpu: ?, memory: 0B, network: 0B}\n             │   CPU: 108.00ms (0.76%), Scheduled: 513.00ms (0.76%), Blocked: 14.45m (3.41%), Output: 8775 rows (119.97kB)\n             │   Input avg.: 2.89 rows, Input std.dev.: 558.74%\n             └─ RemoteSource[sourceFragmentIds = [6]]\n                    Layout: [area_id_1:integer, $hashvalue_18:bigint]\n                    CPU: 0.00ns (0.00%), Scheduled: 28.00ms (0.04%), Blocked: 14.32m (3.38%), Output: 8775 rows (119.97kB)\n                    Input avg.: 2.89 rows, Input std.dev.: 558.74%\n\nFragment 5 [SOURCE]\n    CPU: 2.20s, Scheduled: 3.91s, Blocked 0.00ns (Input: 0.00ns, Output: 0.00ns), Input: 10789427 rows (239.22MB); per task: avg.: 1078942.70 std.dev.: 720610.15, Output: 319057 rows (7.30MB)\n    Output layout: [employer_id, area_id, discard_status, $hashvalue_16]\n    Output partitioning: HASH [area_id][$hashvalue_16]\n    ScanFilterProject[table = hive:snapshot2:employer, filterPredicate = ("creation_time" > TIMESTAMP \'2025-01-01 00:00:00.000000\'), dynamicFilters = {"area_id" = #df_743}]\n        Layout: [employer_id:integer, area_id:integer, discard_status:integer, $hashvalue_16:bigint]\n        Estimates: {rows: ? (?), cpu: ?, memory: 0B, network: 0B}/{rows: ? (?), cpu: ?, memory: 0B, network: 0B}/{rows: ? (?), cpu: ?, memory: 0B, network: 0B}\n        CPU: 2.20s (15.41%), Scheduled: 3.91s (5.82%), Blocked: 0.00ns (0.00%), Output: 319057 rows (7.30MB)\n        Input avg.: 719295.13 rows, Input std.dev.: 95.74%\n        $hashvalue_16 := combine_hash(bigint \'0\', COALESCE("$operator$hash_code"("area_id"), 0))\n        creation_time := creation_time:timestamp:REGULAR\n        area_id := area_id:int:REGULAR\n        discard_status := discard_status:int:REGULAR\n        employer_id := employer_id:int:REGULAR\n        Input: 10789427 rows (239.22MB), Filtered: 97.04%, Physical input: 99.30MB, Physical input time: 1062.00ms\n        Dynamic filters: \n            - df_743, [ SortedRangeSet[type=integer, ranges=8775, {[1], ..., [11876]}] ], collection time=1.11s\n\nFragment 6 [SOURCE]\n    CPU: 9.15ms, Scheduled: 26.39ms, Blocked 0.00ns (Input: 0.00ns, Output: 0.00ns), Input: 10157 rows (93.81kB); per task: avg.: 10157.00 std.dev.: 0.00, Output: 8775 rows (119.97kB)\n    Output layout: [area_id_1, $hashvalue_19]\n    Output partitioning: HASH [area_id_1][$hashvalue_19]\n    ScanFilterProject[table = hive:snapshot2:area, filterPredicate = ("country_name" = VARCHAR U&\'\\0420\\043E\\0441\\0441\\0438\\044F\')]\n        Layout: [area_id_1:integer, $hashvalue_19:bigint]\n        Estimates: {rows: ? (?), cpu: ?, memory: 0B, network: 0B}/{rows: ? (?), cpu: ?, memory: 0B, network: 0B}/{rows: ? (?), cpu: ?, memory: 0B, network: 0B}\n        CPU: 9.00ms (0.06%), Scheduled: 27.00ms (0.04%), Blocked: 0.00ns (0.00%), Output: 8775 rows (119.97kB)\n        Input avg.: 10157.00 rows, Input std.dev.: 0.00%\n        $hashvalue_19 := combine_hash(bigint \'0\', COALESCE("$operator$hash_code"("area_id_1"), 0))\n        area_id_1 := area_id:int:REGULAR\n        country_name := country_name:string:REGULAR\n        Input: 10157 rows (93.81kB), Filtered: 13.61%, Physical input: 269.29kB, Physical input time: 5950000.00ns\n\n"""

In [3]:
# ФУНКЦИИ ДЛЯ ПАРСИНГА
def get_cpu_of_fragments(query_plan):
    def to_seconds(s):
        if 'ms' in s or 'us' in s:
            return 0        
        num = float(s[:-1])
        unit = s[-1]
        return int(num * 3600 if unit == 'h' else num * 60)
    parse = query_plan.split('Fragment ')
    frag_num_list = []
    frag_list = []
    cpu_list = []
    cpu_in_sec_list = []
    for idx, frag in enumerate(parse):
        if idx == 0:
            continue
        cpu = frag.split('CPU: ')[1].split('Input')[0][:-2]
        frag_num_list.append(idx)
        cpu_list.append(cpu)
        cpu_in_sec_list.append(to_seconds(cpu.split(', Scheduled: ')[0]))
        frag_list.append(('Fragment ' + frag).strip())

    return pd.DataFrame({'frag_num' : frag_num_list,
                         'CPU & Scheduled' : cpu_list,
                         'CPU_in_sec' : cpu_in_sec_list, 'frag' : frag_list}).sort_values(by='CPU_in_sec', ascending=False)

def remove_combine_hash(text):
    pattern = re.compile(r'combine_hash\(')
    while True:
        start = text.find('combine_hash(')
        if start == -1:
            break
        # найти соответствующую закрывающую скобку
        count = 0
        for i in range(start, len(text)):
            if text[i] == '(':
                count += 1
            elif text[i] == ')':
                count -= 1
                if count == 0:
                    end = i
                    break
        # simplifier combine_hash(...)
        text = text[:start] + 'combine_hash[...]' + text[end+1:]
    return text

def get_frag(df_frag, frag_num, simplify = True):
    def get_fat(frag, expr):
        return frag.replace(expr, f"\033[4m{expr}\033[0m""")
    # получаем frag
    frag = df_frag[df_frag.frag_num == frag_num].frag.iloc[0]
    if simplify:
        cleaned = re.sub(r'Layout:\s*\[.*?\]', 'Layout:[...]', frag)
        cleaned = re.sub(r'Output layout:\s*\[.*?\]', 'Output Layout:[...]', cleaned)
        cleaned = re.sub(r'\n.*Estimates.*\n', "\n", cleaned)
        cleaned = re.sub(r'\n.*Input avg.:.*\n', "\n", cleaned)
        frag = remove_combine_hash(cleaned)
    # frag = re.sub(r'(CPU:\s*\d+\.\d+\w)', r'\033[1;4m\1\033[0m', frag)
    frag = re.sub(r'(CPU:\s*\d+\.\d+\w(?: \(\d+\.\d+%\))?)', r'\033[1;4m\1\033[0m', frag)
    expr_list = ['Aggregate', 'ScanFilterProject', 'InnerJoin', 'LeftJoin',
                 'RemoteSource', 'LocalExchange', 'FilterProject', 'Project', 'Window', 'ScanProject', 'RemoteExchange', 'MarkDistinct']
    for expr in expr_list:
        frag = get_fat(frag, expr)
    
    print(frag)

def get_total_perfomance(query_plan):
    print(query_plan.split('Fragment')[0])

In [7]:
# перфоманс всего запроса
get_total_perfomance(query_plan)

Trino version: 409-dirty
Queued: 103.92us, Analysis: 172.36ms, Planning: 52.39ms, Execution: 3.34s



In [9]:
# получаем список Fragment с CPU, отсортированных по тяжести
df = get_cpu_of_fragments(query_plan)
df

Unnamed: 0,frag_num,CPU & Scheduled,CPU_in_sec,frag
2,3,"9.86s, Scheduled: 48.14s, Blocked 2.17h",591,"Fragment 3 [HASH]\n CPU: 9.86s, Scheduled: 48.14s, Blocked 2.17h (Input: 1.08h, Output: 0.00n..."
3,4,"2.87s, Scheduled: 14.17s, Blocked 1.80h",172,"Fragment 4 [HASH]\n CPU: 2.87s, Scheduled: 14.17s, Blocked 1.80h (Input: 59.47m, Output: 0.00..."
4,5,"2.20s, Scheduled: 3.91s, Blocked 0.00ns",132,"Fragment 5 [SOURCE]\n CPU: 2.20s, Scheduled: 3.91s, Blocked 0.00ns (Input: 0.00ns, Output: 0...."
1,2,"1.85s, Scheduled: 13.04s, Blocked 3.53h",111,"Fragment 2 [HASH]\n CPU: 1.85s, Scheduled: 13.04s, Blocked 3.53h (Input: 1.73h, Output: 0.00n..."
0,1,"150.37ms, Scheduled: 625.24ms, Blocked 3.07m",0,"Fragment 1 [SINGLE]\n CPU: 150.37ms, Scheduled: 625.24ms, Blocked 3.07m (Input: 1.50m, Output..."
5,6,"9.15ms, Scheduled: 26.39ms, Blocked 0.00ns",0,"Fragment 6 [SOURCE]\n CPU: 9.15ms, Scheduled: 26.39ms, Blocked 0.00ns (Input: 0.00ns, Output:..."


In [10]:
# simplify - свернуть в фрагменте большие куски метаинформации (layout с набором колонок , combinehash итд)
#  Fragment разбивается на дерево операторов которые работают с данными и занимают cpu - его значения и процент от тотал cpu подчеркнуты
# также подчеркнуты названия операторов, список их значений ниже (можно пополнять)

# при оценке перфоманса
# CPU: 16.56s, Scheduled: 1.18m, Blocked 0.00ns
# cpu_time = полное время выполнения запроса на всех ядрах (если их много то время превышает реальное время выполнения запроса из за параллельности)
# scheduled = полное время ожидания ресурса cpu на всех ядрах (зависит от других запросов в кластере итд - шумная штука)
# blocked = полное время ожидания из за блокировок/чтения-записи с диска/доступа к буферам итд
# оптимизировать свой запрос оптимальнее всего следя за тем как снижается cpu_time

get_frag(df, frag_num = 5, simplify=True)

Fragment 5 [SOURCE]
    [1;4mCPU: 2.20s[0m, Scheduled: 3.91s, Blocked 0.00ns (Input: 0.00ns, Output: 0.00ns), Input: 10789427 rows (239.22MB); per task: avg.: 1078942.70 std.dev.: 720610.15, Output: 319057 rows (7.30MB)
    Output Layout:[...]
    Output partitioning: HASH [area_id][$hashvalue_16]
    [4mScan[4mFilter[4mProject[0m[0m[0m[table = hive:snapshot2:employer, filterPredicate = ("creation_time" > TIMESTAMP '2025-01-01 00:00:00.000000'), dynamicFilters = {"area_id" = #df_743}]
        Layout:[...]
        [1;4mCPU: 2.20s (15.41%)[0m, Scheduled: 3.91s (5.82%), Blocked: 0.00ns (0.00%), Output: 319057 rows (7.30MB)
        $hashvalue_16 := combine_hash[...]
        creation_time := creation_time:timestamp:REGULAR
        area_id := area_id:int:REGULAR
        discard_status := discard_status:int:REGULAR
        employer_id := employer_id:int:REGULAR
        Input: 10789427 rows (239.22MB), Filtered: 97.04%, Physical input: 99.30MB, Physical input time: 1062.00ms
        

In [None]:
# разные виды операторов, список можно дополнять

'Aggregate' # агрегация с совершением операций вроде max, sum ...

'InnerJoin' #  джойны
'LeftJoin'

'MarkDistinct' # помечает уникальные строки (для операций типа count distinct ...)

'RemoteSource'  # получение данных из другого узла (например заливаем таблицу на ноду для ее обработки)

'LocalExchange' # перегрупировка данных перед совершением операции на одной local ноде
'RemoteExchange' # перераспределение данных между нодами для параллельного выполнения операции


'Window' # вычисление оконки

'FilterProject' # фильтрация по типу where итд
'Project' # оператор типа calc_field когда надо получить колонку из др колонок (например Y = X * 2) - проекция
'ScanProject' # сканирует таблицу + выбирает нужные колонки (проекция)
'ScanFilterProject' # отбор данных вместе с фильтрацией