# **Решение аналитических задач. БД PostgreSQL и ClickHouse**

#### Импорт библиотек

In [1]:
import pandas as pd
from sqlalchemy import create_engine
import yaml
import os

#### Константы проекта

In [2]:
PATH_CONFIG = '/content/drive/MyDrive/settings.yaml'
TABLE_TEST = 'test'
TABLE_CASE1 = 'case1'
TABLE_CASE2 = 'case2'
TABLE_CASE3 = 'case3'
TABLE_CASE4 = 'case4'

#### Установка БД. PostgreSQL

In [3]:
%%capture
%%bash
# Install postgresql server
sudo apt-get -y -qq update
sudo apt-get -y -qq install postgresql
sudo service postgresql start

# Setup a password `postgres` for username `postgres`
sudo -u postgres psql -U postgres -c "ALTER USER postgres PASSWORD 'postgres';"

# Setup a database with name `tfio_demo` to be used
sudo -u postgres psql -U postgres -c 'DROP DATABASE IF EXISTS db;'
sudo -u postgres psql -U postgres -c 'CREATE DATABASE db;'

In [4]:
with open(PATH_CONFIG, encoding='utf8') as f:
    settings = yaml.safe_load(f)

In [5]:
user_pg = settings['DB_PG']['USER']
password_pg = settings['DB_PG']['PASSWORD']
host_pg = settings['DB_PG']['HOST']
port_pg = settings['DB_PG']['PORT']
name_pg = settings['DB_PG']['NAME']

In [6]:
point_connect_pg = 'postgresql://{}:{}@{}:{}/{}'.format(user_pg, 
                                                        password_pg, 
                                                        host_pg, 
                                                        port_pg, 
                                                        name_pg)
con_pg = create_engine(point_connect_pg)

In [7]:
data_test = {'val1':[1,2,3],'val2':[10,20,30]}
df_test = pd.DataFrame(data=data_test)
df_test.head()

Unnamed: 0,val1,val2
0,1,10
1,2,20
2,3,30


In [8]:
df_test.to_sql(TABLE_TEST, con_pg, index=False, if_exists='replace')

3

In [9]:
def select_postgresql(sql):
    return pd.read_sql(sql,con_pg)

In [10]:
sql_pg = '''select tbl.* from test as tbl;'''

In [11]:
select_postgresql(sql_pg)

Unnamed: 0,val1,val2
0,1,10
1,2,20
2,3,30


#### Установка БД. ClickHouse

In [12]:
%%capture
!sudo apt-get install -y apt-transport-https ca-certificates dirmngr
!sudo apt-key adv --keyserver hkp://keyserver.ubuntu.com:80 --recv 8919F6BD2B48D754

!echo "deb https://packages.clickhouse.com/deb stable main" | sudo tee \
    /etc/apt/sources.list.d/clickhouse.list
!sudo apt-get -y -qq update

!sudo apt-get install -y -qq clickhouse-server clickhouse-client

!sudo service clickhouse-server start

In [13]:
%%capture
!pip install clickhouse-connect

In [14]:
user_ch = settings['DB_CH']['USER']
password_ch = settings['DB_CH']['PASSWORD']
host_ch = settings['DB_CH']['HOST']
port_ch = settings['DB_CH']['PORT']
name_ch = settings['DB_CH']['NAME']

In [15]:
import clickhouse_connect
client = clickhouse_connect.get_client(host=host_ch, user=user_ch, port=port_ch)   

In [16]:
client.command(f'drop database if exists {name_ch};')
client.command(f'create database {name_ch};')
client.command('show databases;')

'INFORMATION_SCHEMA\ndb\ndefault\ninformation_schema\nsystem'

In [17]:
client = clickhouse_connect.get_client(host=host_ch, 
                                       user=user_ch, 
                                       port=port_ch, 
                                       database = name_ch)

In [18]:
client.database

'db'

In [19]:
client.command(f'drop table if exists {TABLE_TEST};')
client.command(f'create table {TABLE_TEST} (val1 Int8, val2 Int8) ENGINE Memory;')

''

In [20]:
data_test = [[1, 10], [2, 20], [3, 30]]
client.insert(TABLE_TEST, data_test, column_names=['val1', 'val2'])

In [21]:
def select_clickhouse(sql):
  return client.query_df(sql)

In [22]:
sql_ch = '''select tbl.* from test as tbl'''

In [23]:
select_clickhouse(sql_ch)

Unnamed: 0,val1,val2
0,1,10
1,2,20
2,3,30


## **Задача 1. Расчет среднего, моды и медианы**

#### **PostgreSQL**

In [24]:
data_case1 = {'department':['dept1','dept1','dept1','dept1','dept1','dept2','dept2','dept2','dept2','dept2','dept2'],
              'employee':['emp1','emp2','emp3','emp4','emp5','emp6','emp7','emp8','emp9','emp10','emp11'],
              'salary':[2000,2000,3000,4000,5000,3000,3000,4000,5000,6000,7000]}
df_case1 = pd.DataFrame(data=data_case1)
df_case1.to_sql(TABLE_CASE1, con_pg, index=False, if_exists='replace')

11

In [25]:
sql_pg = '''select tbl.* from case1 as tbl;'''

In [26]:
select_postgresql(sql_pg)

Unnamed: 0,department,employee,salary
0,dept1,emp1,2000
1,dept1,emp2,2000
2,dept1,emp3,3000
3,dept1,emp4,4000
4,dept1,emp5,5000
5,dept2,emp6,3000
6,dept2,emp7,3000
7,dept2,emp8,4000
8,dept2,emp9,5000
9,dept2,emp10,6000


In [27]:
sql_pg = '''select tbl.department,
                   round(avg(tbl.salary),1) as avg,
                   mode() within group (order by tbl.salary) as mode,
                   percentile_cont(0.5) within group (order by tbl.salary) as median 
            from case1 as tbl
            group by tbl.department
            order by tbl.department;'''

In [28]:
select_postgresql(sql_pg)

Unnamed: 0,department,avg,mode,median
0,dept1,3200.0,2000,3000.0
1,dept2,4666.7,3000,4500.0


#### **ClickHouse**

In [29]:
client.command(f'drop table if exists {TABLE_CASE1};')
client.command(f"create table if not exists {TABLE_CASE1} (department	String, employee String, salary Int16) \
ENGINE = PostgreSQL('{host_pg}:{port_pg}', '{name_pg}', '{TABLE_CASE1}', '{user_pg}', '{password_pg}');")

''

In [30]:
sql_ch = '''select tbl.* from case1 as tbl'''

In [31]:
select_clickhouse(sql_ch)

Unnamed: 0,department,employee,salary
0,dept1,emp1,2000
1,dept1,emp2,2000
2,dept1,emp3,3000
3,dept1,emp4,4000
4,dept1,emp5,5000
5,dept2,emp6,3000
6,dept2,emp7,3000
7,dept2,emp8,4000
8,dept2,emp9,5000
9,dept2,emp10,6000


In [32]:
sql_ch = '''select tbl.department, 
                   round(avg(tbl.salary),1) as avg,
                   arrayElement(topK(1)(tbl.salary),1) as mode,
                   quantile(tbl.salary) as median
            from case1 as tbl
            group by tbl.department
            order by tbl.department'''

In [33]:
select_clickhouse(sql_ch)

Unnamed: 0,department,avg,mode,median
0,dept1,3200.0,2000,3000.0
1,dept2,4666.7,3000,4500.0


## **Задача 2. Построение сводной таблицы**

#### **PostgreSQL**

In [34]:
data_case2 = {'department':['dept1','dept1','dept1','dept1','dept1','dept2','dept2','dept2','dept3','dept3','dept3'],
              'employee':['emp1','emp2','emp3','emp4','emp5','emp6','emp7','emp8','emp9','emp10','emp11'],
              'role':['manager','developer','developer','developer','manager','developer','developer','developer','manager','developer','developer']}
df_case2 = pd.DataFrame(data=data_case2)
df_case2.to_sql(TABLE_CASE2, con_pg, index=False, if_exists='replace')

11

In [35]:
sql_pg = '''select tbl.* from case2 as tbl;'''

In [36]:
select_postgresql(sql_pg)

Unnamed: 0,department,employee,role
0,dept1,emp1,manager
1,dept1,emp2,developer
2,dept1,emp3,developer
3,dept1,emp4,developer
4,dept1,emp5,manager
5,dept2,emp6,developer
6,dept2,emp7,developer
7,dept2,emp8,developer
8,dept3,emp9,manager
9,dept3,emp10,developer


In [37]:
sql_pg = '''select tbl.role,
                   sum(case tbl.department when 'dept1' then 1 else 0 end) as "d1",
                   sum(case tbl.department when 'dept2' then 1 else 0 end) as "d2",
                   sum(case tbl.department when 'dept3' then 1 else 0 end) as "d3"
            from case2 as tbl
            group by tbl.role
            order by tbl.role desc;'''

In [38]:
select_postgresql(sql_pg)

Unnamed: 0,role,d1,d2,d3
0,manager,2,0,1
1,developer,3,3,2


In [39]:
sql_pg = '''select tbl.role,
                   count(tbl.employee) filter (where tbl.department = 'dept1') as "d1",
                   count(tbl.employee) filter (where tbl.department = 'dept2') as "d2",
                   count(tbl.employee) filter (where tbl.department = 'dept3') as "d3"
            from case2 as tbl
            group by tbl.role
            order by tbl.role desc;'''

In [40]:
select_postgresql(sql_pg)

Unnamed: 0,role,d1,d2,d3
0,manager,2,0,1
1,developer,3,3,2


#### **ClickHouse**

In [41]:
client.command(f'drop table if exists {TABLE_CASE2};')
client.command(f"create table if not exists {TABLE_CASE2} (department	String, employee String, role String) \
ENGINE = PostgreSQL('{host_pg}:{port_pg}', '{name_pg}', '{TABLE_CASE2}', '{user_pg}', '{password_pg}');")

''

In [42]:
sql_ch = '''select tbl.* from case2 as tbl'''

In [43]:
select_clickhouse(sql_ch)

Unnamed: 0,department,employee,role
0,dept1,emp1,manager
1,dept1,emp2,developer
2,dept1,emp3,developer
3,dept1,emp4,developer
4,dept1,emp5,manager
5,dept2,emp6,developer
6,dept2,emp7,developer
7,dept2,emp8,developer
8,dept3,emp9,manager
9,dept3,emp10,developer


In [44]:
sql_ch = '''select tbl.role,
                   countIf(tbl.department='dept1') as "d1",
                   countIf(tbl.department='dept2') as "d2",
                   countIf(tbl.department='dept3') as "d3"
            from case2 as tbl
            group by tbl.role
            order by tbl.role desc'''

In [45]:
select_clickhouse(sql_ch)

Unnamed: 0,role,d1,d2,d3
0,manager,2,0,1
1,developer,3,3,2


In [46]:
sql_ch = '''select tbl.role,
                   groupArray((tbl.department, tbl.count_emp)) as group_array_dept
            from (select tbl.role,
                         tbl.department,
                         count(tbl.employee) as count_emp
                  from case2 as tbl
                  group by tbl.role, tbl.department
                  order by tbl.department) as tbl
            group by tbl.role
            order by tbl.role desc'''

In [47]:
select_clickhouse(sql_ch)

Unnamed: 0,role,group_array_dept
0,manager,"[(dept1, 2), (dept3, 1)]"
1,developer,"[(dept1, 3), (dept2, 3), (dept3, 2)]"


## **Задача 3. Проверка условия**

#### **PostgreSQL**

In [48]:
data_case3 = {'employee':['emp1','emp2','emp3','emp4','emp5','emp6','emp7','emp8','emp9','emp10'],
              'salary':[2500,3500,5000,2500,4500,6500,7000,8500,2000,3000]}
df_case3 = pd.DataFrame(data=data_case3)
df_case3.to_sql(TABLE_CASE3, con_pg, index=False, if_exists='replace')

10

In [49]:
sql_pg = '''select tbl.* from case3 as tbl;'''

In [50]:
select_postgresql(sql_pg)

Unnamed: 0,employee,salary
0,emp1,2500
1,emp2,3500
2,emp3,5000
3,emp4,2500
4,emp5,4500
5,emp6,6500
6,emp7,7000
7,emp8,8500
8,emp9,2000
9,emp10,3000


In [51]:
sql_pg = '''select tbl.employee,
                   tbl.salary,
                   case
                       when tbl.salary < 4000 then 'A'
                       when tbl.salary < 6000 then 'B'
                       when tbl.salary < 8000 then 'C'
                       else 'D'
                  end as category
            from case3 as tbl
            order by tbl.salary asc'''

In [52]:
select_postgresql(sql_pg)

Unnamed: 0,employee,salary,category
0,emp9,2000,A
1,emp4,2500,A
2,emp1,2500,A
3,emp10,3000,A
4,emp2,3500,A
5,emp5,4500,B
6,emp3,5000,B
7,emp6,6500,C
8,emp7,7000,C
9,emp8,8500,D


#### **ClickHouse**

In [53]:
client.command(f'drop table if exists {TABLE_CASE3};')
client.command(f"create table if not exists {TABLE_CASE3} (employee String, salary Int16) \
ENGINE = PostgreSQL('{host_pg}:{port_pg}', '{name_pg}', '{TABLE_CASE3}', '{user_pg}', '{password_pg}');")

''

In [54]:
sql_ch = '''select tbl.* from case3 as tbl'''

In [55]:
select_clickhouse(sql_ch)

Unnamed: 0,employee,salary
0,emp1,2500
1,emp2,3500
2,emp3,5000
3,emp4,2500
4,emp5,4500
5,emp6,6500
6,emp7,7000
7,emp8,8500
8,emp9,2000
9,emp10,3000


In [56]:
sql_ch = '''select tbl.employee,
                   tbl.salary,
                   multiIf(tbl.salary < 4000, 'A', 
                           tbl.salary < 6000, 'B', 
                           tbl.salary < 8000, 'C',
                                              'D') as category
            from case3 as tbl
            order by tbl.salary asc'''

In [57]:
select_clickhouse(sql_ch)

Unnamed: 0,employee,salary,category
0,emp9,2000,A
1,emp1,2500,A
2,emp4,2500,A
3,emp10,3000,A
4,emp2,3500,A
5,emp5,4500,B
6,emp3,5000,B
7,emp6,6500,C
8,emp7,7000,C
9,emp8,8500,D


## **Задача 4. "Поиск сессий"**

#### **PostgreSQL**

In [58]:
data_case4 = {'dt':['2023-01-01','2023-02-05','2023-03-10',
                    '2023-01-02','2023-02-02','2023-03-03',
                    '2023-01-01','2023-02-07','2023-03-09',
                    '2023-01-15'],
              'employee':['emp1','emp1','emp1',
                          'emp2','emp2','emp2',
                          'emp3','emp3','emp3',
                          'emp5'],
              'salary':[2500,3500,5000,
                        2500,4500,6500,
                        7000,8500,2000,
                        3000]}
df_case4 = pd.DataFrame(data=data_case4)
df_case4['dt'] = pd.to_datetime(df_case4['dt'], format="%Y-%m-%d")
df_case4.to_sql(TABLE_CASE4, con_pg, index=False, if_exists='replace')

10

In [59]:
sql_pg = '''select tbl.* from case4 as tbl;'''

In [60]:
select_postgresql(sql_pg)

Unnamed: 0,dt,employee,salary
0,2023-01-01,emp1,2500
1,2023-02-05,emp1,3500
2,2023-03-10,emp1,5000
3,2023-01-02,emp2,2500
4,2023-02-02,emp2,4500
5,2023-03-03,emp2,6500
6,2023-01-01,emp3,7000
7,2023-02-07,emp3,8500
8,2023-03-09,emp3,2000
9,2023-01-15,emp5,3000


In [61]:
sql_pg = '''select tbl.dt,
                   tbl.employee,
                   tbl.salary,
                   tbl.lag_dt,
                   extract(day from tbl.dt-tbl.lag_dt) as dt_difference
            from  (select tbl.dt,
                          tbl.employee,
                          tbl.salary,
                          coalesce(lag(tbl.dt) over w, tbl.dt) lag_dt
                   from case4 as tbl
                   window w as (partition by tbl.employee order by tbl.dt asc)) as tbl;'''

In [62]:
select_postgresql(sql_pg)

Unnamed: 0,dt,employee,salary,lag_dt,dt_difference
0,2023-01-01,emp1,2500,2023-01-01,0.0
1,2023-02-05,emp1,3500,2023-01-01,35.0
2,2023-03-10,emp1,5000,2023-02-05,33.0
3,2023-01-02,emp2,2500,2023-01-02,0.0
4,2023-02-02,emp2,4500,2023-01-02,31.0
5,2023-03-03,emp2,6500,2023-02-02,29.0
6,2023-01-01,emp3,7000,2023-01-01,0.0
7,2023-02-07,emp3,8500,2023-01-01,37.0
8,2023-03-09,emp3,2000,2023-02-07,30.0
9,2023-01-15,emp5,3000,2023-01-15,0.0


#### **ClickHouse**

In [63]:
client.command(f'drop table if exists {TABLE_CASE4};')
client.command(f"create table if not exists {TABLE_CASE4} (dt Date, employee String, salary Int16) \
ENGINE = PostgreSQL('{host_pg}:{port_pg}', '{name_pg}', '{TABLE_CASE4}', '{user_pg}', '{password_pg}');")

''

In [64]:
sql_ch = '''select tbl.* from case4 as tbl'''

In [65]:
select_clickhouse(sql_ch)

Unnamed: 0,dt,employee,salary
0,2023-01-01,emp1,2500
1,2023-02-05,emp1,3500
2,2023-03-10,emp1,5000
3,2023-01-02,emp2,2500
4,2023-02-02,emp2,4500
5,2023-03-03,emp2,6500
6,2023-01-01,emp3,7000
7,2023-02-07,emp3,8500
8,2023-03-09,emp3,2000
9,2023-01-15,emp5,3000


In [66]:
sql_ch = '''select tbl.dt,
                   tbl.employee,
                   tbl.salary,
                   tbl.lag_dt,
                   date_diff('day', tbl.lag_dt, tbl.dt) as dt_difference         
            from (select tbl.dt,
                         tbl.employee,
                         tbl.salary,
                         lagInFrame(tbl.dt) over w  as dt_lag,
                         if(dt_lag<>'1970-01-01', dt_lag, tbl.dt) as  lag_dt
                 from case4 as tbl
                 window w as (partition by tbl.employee order by tbl.dt asc)) as tbl'''

In [67]:
select_clickhouse(sql_ch)

Unnamed: 0,dt,employee,salary,lag_dt,dt_difference
0,2023-01-01,emp1,2500,2023-01-01,0
1,2023-02-05,emp1,3500,2023-01-01,35
2,2023-03-10,emp1,5000,2023-02-05,33
3,2023-01-02,emp2,2500,2023-01-02,0
4,2023-02-02,emp2,4500,2023-01-02,31
5,2023-03-03,emp2,6500,2023-02-02,29
6,2023-01-01,emp3,7000,2023-01-01,0
7,2023-02-07,emp3,8500,2023-01-01,37
8,2023-03-09,emp3,2000,2023-02-07,30
9,2023-01-15,emp5,3000,2023-01-15,0
