# Демонстрация инструмента последовательного снятия снимков агрегированных данных из потоковых данных

Импортируем необходимые библиотеки.

In [1]:
import subprocess
import time

import requests
import IPython.display

import constants
from _development.tests.test_utilities import print_response, print_tables_side_by_side, terminate_shell_subprocess

Запустим генератор потоковых данных.

Потоковые данные содержат 3 поля:
- a: int
- b: int
- c: float

Генератор потоковых данных работает по следующему алгоритму:
1. Добавление 10 новых объектов, где:
    - a – случайное целое число от 0 до 10
    - b – случайное целое число от 0 до 100
    - с – случайное число от 0 до 10
2. Обновление 1 случайного объекта:
    - a задается равным 1000
    - b и c умножаются на 100
3. Удаление 1 случайного объекта

Данные изменения применяются к таблице базы данных PostgreSQL. Оттуда данные выгружаются в Apache Kafka инструментом CDC Debezium. Далее данные поступают в разработанный инструмент.

In [2]:
stream_generator = subprocess.Popen('python stream_generator.py', shell=True)

Запустим разработанный инструмент.

In [3]:
main = subprocess.Popen('python ../../main.py', shell=True)

Зададим имя топика Apache Kafka, откуда инструмент будет получать данные.

In [4]:
topic_name = 'postgres_source.public.demo_table'

Подготовим JSON запроса подключения к источнику данных CREATE SOURCE.

In [5]:
create_source = {'query_type': 'CREATE SOURCE',
                 'name': 'demo_source', 'type': 'DebeziumSource',
                 'parameters': {'kafka_topic_name': topic_name, 'group_id': None, 'auto_offset_reset': 'earliest',
                                'bootstrap_servers': ['kafka:9092'], 'consumer_timeout_ms': 1000}}

Подготовим JSON запроса создания материализованного представления CREATE MATERIALIZED VIEW.



In [6]:
create_view = {'query_type': 'CREATE MATERIALIZED VIEW',
               'name': 'demo_view', 'view_source_name': 'demo_source', 'groupby_columns': ['a'],
               'parameters': {'extrapolation': True},
               'aggregates': [{'function': 'Sum', 'column': 'b'}, {'function': 'Avg', 'column': 'c'}, {'function': 'CountDistinct', 'column': 'b'},
                              {'function': 'CountDistinctCBF', 'column': 'b', 'parameters': {'expected_element_count': 100, 'false_positive_probability': 0.01}}]}

Подготовим JSON запроса получения данных SELECT.

In [7]:
select = {'query_type': 'SELECT', 'name': 'demo_view', 'orderby': [['a', 'DESC']], 'format': 'tabulate'}

In [8]:
select_extrapolated = {'query_type': 'SELECT EXTRAPOLATED', 'name': 'demo_view', 'orderby': [['a', 'DESC']], 'format': 'tabulate'}

In [12]:
print_response(requests.post(f'http://localhost:{constants.SERVER_PORT}', json=create_source))

ConnectionError: HTTPConnectionPool(host='localhost', port=8000): Max retries exceeded with url: / (Caused by NewConnectionError('<urllib3.connection.HTTPConnection object at 0x000002D4D5CD1C10>: Failed to establish a new connection: [WinError 10061] Подключение не установлено, т.к. конечный компьютер отверг запрос на подключение'))

In [13]:
print_response(requests.post(f'http://localhost:{constants.SERVER_PORT}', json=create_view))

ConnectionError: HTTPConnectionPool(host='localhost', port=8000): Max retries exceeded with url: / (Caused by NewConnectionError('<urllib3.connection.HTTPConnection object at 0x000002D4D650ADD0>: Failed to establish a new connection: [WinError 10061] Подключение не установлено, т.к. конечный компьютер отверг запрос на подключение'))

In [11]:
while True:
    select_response = requests.post(f'http://localhost:{constants.SERVER_PORT}', json=select).content.decode()
    select_extrapolated_response = requests.post(f'http://localhost:{constants.SERVER_PORT}', json=select_extrapolated).content.decode()
    print_tables_side_by_side(select_response, select_extrapolated_response, 'SELECT', 'SELECT EXTRAPOLATED')
    time.sleep(5)
    IPython.display.clear_output()

SELECT                                                                                 SELECT EXTRAPOLATED                                                                        
╭──────┬──────────┬────────────┬─────────────────────┬───────────────────────────╮     ╭──────┬───────────────────┬────────────┬─────────────────────┬───────────────────────────╮
│    a │   sum(b) │     avg(c) │   count_distinct(b) │   count_distinct_c_b_f(b) │     │    a │            sum(b) │     avg(c) │   count_distinct(b) │   count_distinct_c_b_f(b) │
├──────┼──────────┼────────────┼─────────────────────┼───────────────────────────┤     ├──────┼───────────────────┼────────────┼─────────────────────┼───────────────────────────┤
│ 1000 │  8491700 │ 3922.78    │                  44 │                   31.1433 │     │    6 │   56835.7         │   -3.5197  │                  70 │                   49.13   │
│   10 │    16593 │    4.32829 │                  47 │                   32.4043 │     │    1 │  114277  

KeyboardInterrupt: 

In [11]:
terminate_shell_subprocess(main)

In [13]:
terminate_shell_subprocess(stream_generator)

In [14]:
import kafka_utilities

kafka_utilities.delete_topic(topic_name)