http://www.bigdataschool.ru

<img src="logo.png" align="right">

# Trino и работа с данными

<hr style="border:2px solid #294A70"> </hr>

<h3 style="color: #294A70">Практические материалы</h3>

<div style="color:red; text-align: right"> Версия 1.02 от 10 января 2025 </div>

In [18]:
from trsql_h import _psql, _sql

In [19]:
_psql("show catalogs")

## PostgreSQL

In [20]:
_psql("show tables in postgres.public")

In [21]:
_sql("select * from postgres.public.countries limit 3")

Unnamed: 0,country,region,population,area,popdensity,coastline,netmigration,infantmortality,gdp,literacy,phones,arable,crops,other,climate,birthrate,deathrate,agriculture,industry,service
0,Afghanistan,ASIA (EX. NEAR EAST),31056997,647500,480,0,2306,16307,700,360,32,1213,22,8765,1,466,2034,38,24,38
1,Albania,EASTERN EUROPE,3581655,28748,1246,126,-493,2152,4500,865,712,2109,442,7449,3,1511,522,232,188,579
2,Algeria,NORTHERN AFRICA,32930091,2381740,138,4,-39,31,6000,700,781,322,25,9653,1,1714,461,101,6,298


In [None]:
_psql("create table postgres.public.testtab ( id int )")

In [None]:
_psql("insert into postgres.public.testtab values ( 0 )")

In [22]:
_psql("select * from postgres.public.testtab")

## Hive (S3)

In [None]:
_psql("show catalogs")

Эта схема создала директорию s3test.db в ~/warehouse

In [None]:
_psql("create schema hive.s3test")

После корректной настройки прав эта схема оказалась в S3

In [None]:
_psql("create schema hive.s3new with (location = 's3a://trino-bds/trino_test/')")

In [None]:
_psql("create table hive.s3new.testtab ( id int )")

In [24]:
_psql("insert into hive.s3new.testtab values ( 1 )")

In [25]:
_psql("select * from hive.s3new.testtab")

Можно поработать с готовыми данными, которые создал spark

In [None]:
_psql("""
create table hive.s3new.range ( id int ) 
with ( format='TEXTFILE', textfile_field_separator=',', external_location = 's3a://trino-bds/range/' )
""")

In [26]:
_psql("select * from hive.s3new.range")

### Partitions

Создадим партиционированную таблицу:

* партиционируем по году (целочисленная колонка)
* каждая партиция - "директория" в S3 (единственно возможный способ для Hive таблиц)
* партиционирование только по последним колонкам (в порядке их следования)

In [None]:
_psql("""
create table hive.s3new.quotes_part (
    secid varchar,
    mn int,
    dt int,
    bid decimal(7,2),
    ask decimal(7,2),
    yr int
)
with (
    format='TEXTFILE', 
    textfile_field_separator=',',
    partitioned_by = ARRAY['yr']
)
""")

In [None]:
_psql("""
insert into hive.s3new.quotes_part
values
    ( 'LKOH', 1, 1, 100.1, 110.1, 2025 ),
    ( 'LKOH', 1, 2, 100.2, 110.2, 2025 )
""")    

In [None]:
_psql("""
insert into hive.s3new.quotes_part
values
    ( 'LKOH', 12, 30, 90.9, 100.9, 2024 )
""")    

In [27]:
_sql("""select * from hive.s3new.quotes_part""")

Unnamed: 0,secid,mn,dt,bid,ask,yr
0,LKOH,1,1,100.1,110.1,2025
1,LKOH,1,2,100.2,110.2,2025
2,LKOH,12,30,90.9,100.9,2024


## Iceberg S3

In [None]:
_psql("create schema ice.s3 with (location = 's3a://trino-bds/trino_ice/')")

In [None]:
_psql("create table ice.s3.testtab ( id int )")

In [None]:
_psql("insert into ice.s3.testtab values ( 0 )")

In [28]:
_psql("select * from ice.s3.testtab")

### Partitions в Iceberg

Создадим партиционированную таблицу:

* партиционируем по году (функция) поля типа DATE
    * не обязательно по колонке
* каждая партиция - "директория" в S3 (но это от нас скрыто)

По умолчанию используется формат PARQUET

In [30]:
_psql("""
create table ice.s3.quotes_part (
    secid varchar,
    dt date,
    bid decimal(7,2),
    ask decimal(7,2)
)
with (
    partitioning = ARRAY['year(dt)']
)
""")

In [None]:
_psql("""
drop table ice.s3.quotes_part
""")    

In [31]:
_psql("""
insert into ice.s3.quotes_part
values
    ( 'LKOH', date '2025-01-01', 100.1, 110.1 ),
    ( 'LKOH', date '2025-01-02', 100.2, 110.2 )
""")    

In [32]:
_psql("""
insert into ice.s3.quotes_part
values
    ( 'LKOH', date '2024-12-30', 90.9, 100.9 )
""")    

In [33]:
_sql("""select * from ice.s3.quotes_part""")

Unnamed: 0,secid,dt,bid,ask
0,LKOH,2024-12-30,90.9,100.9
1,LKOH,2025-01-01,100.1,110.1
2,LKOH,2025-01-02,100.2,110.2


Что случилось с данными для нас не важно (в плане их физического размещения) - это Iceberg "берет на себя"

#### Системные таблицы (история=транзакции, партиции)

In [None]:
_psql("""select * from ice.s3."quotes_part$history" """)

In [None]:
_psql("""select * from ice.s3."quotes_part$partitions" """)

#### Time travel

Для идентификации могут быть использованы ID снапшотов или таймстампы (что нам удобнее)

In [None]:
_sql("""select * from ice.s3.quotes_part for version as of 5473778666853397737""")

In [None]:
_sql("""select * from ice.s3.quotes_part for version as of 5588679541636484959""")

Откатим последнюю "транзакцию"

In [None]:
_psql("CALL ice.system.rollback_to_snapshot('s3', 'quotes_part', 5588679541636484959)")

In [None]:
_sql("""select * from ice.s3.quotes_part""")

Отличия снапшотов

In [None]:
_psql("""
select * from TABLE(
    ice.system.table_changes(
      schema_name => 's3',
      table_name => 'quotes_part',
      start_snapshot_id => 5473778666853397737,
      end_snapshot_id => 5588679541636484959
    )
  )
""")

#### Update

In [None]:
_psql("""update ice.s3.quotes_part set bid=100.77 where dt=date'2025-01-01'""")

In [None]:
_psql("""select * from ice.s3."quotes_part$history" """)

К сожалению, нельзя посмотреть отличия версий - пока поддерживаются только insert или delete 

In [None]:
_psql("""
select * from TABLE(
    ice.system.table_changes(
      schema_name => 's3',
      table_name => 'quotes_part',
      start_snapshot_id => 5588679541636484959,
      end_snapshot_id => 2790047832203232095
    )
  )
""")

#### Merge

Пример "федеративного" запроса:

* создадим в PostgreSQL таблицу с изменными котировками
* обновим (merge) Iceberg таблицу с использованием данных из этой таблицы

In [None]:
_psql("""
create table postgres.public.quotes_changes (
    secid varchar,
    dt date,
    bid decimal(7,2),
    ask decimal(7,2)
)
""")

In [None]:
_psql("""
insert into postgres.public.quotes_changes
values
    ( 'LKOH', date '2025-01-01', 111.1, 121.1 ),
    ( 'GAZP', date '2025-01-01', 10.1, 11.1 )
""")    

In [None]:
_sql("""select * from ice.s3.quotes_part""")

In [None]:
_psql("""
merge into ice.s3.quotes_part q using postgres.public.quotes_changes c
    on (q.secid = c.secid and q.dt = c.dt)
    when MATCHED
        then UPDATE set bid=c.bid, ask=c.ask
    when NOT MATCHED
        then INSERT values (c.secid,c.dt,c.bid,c.ask)
""")

In [None]:
_sql("""select * from ice.s3.quotes_part""")

#### Schema evolution

Изменим логику партиционирования - сделаем ее теперь по месяцу

In [None]:
_psql("""
alter table ice.s3.quotes_part 
set properties partitioning = ARRAY['month(dt)']
""")

In [None]:
_psql("""
insert into ice.s3.quotes_part
values
    ( 'LKOH', date '2025-01-07', 120.1, 130.1 )
""")    

In [None]:
_psql("""select * from ice.s3."quotes_part$partitions" """)

<hr style="border:2px solid #294A70"> </hr>

<div style="text-align: center"> © ООО «Учебный центр «Коммерсант», 2024 </div>

<div style="text-align: center"> info@bigdataschool.ru, +7(495) 41-41-121 </div>