## Работа с инфроструктурой

### 1.Останавливаем все контейнеры, если были запущены и заново их запускаем

In [1]:
!docker-compose down
!docker rm $(docker ps -aq )
!docker-compose up -d

Removing control-center           ... 
Removing rest-proxy               ... 
Removing kafka-connect            ... 
Removing schema-registry          ... 
Removing broker                   ... 
Removing clickhouse-node1         ... 
Removing clickhouse-node3         ... 
Removing clickhouse-node4         ... 
Removing clickhouse-node2         ... 
Removing zookeeper_kafka          ... 
Removing zookeeper                ... 
Removing kafka_clickhouse_redis_1 ... 
[12BRemoving network kafka_clickhouse_defaulte[0m[8A[2K
"docker rm" requires at least 1 argument.
See 'docker rm --help'.

Usage:  docker rm [OPTIONS] CONTAINER [CONTAINER...]

Remove one or more containers
Creating network "kafka_clickhouse_default" with the default driver
Creating kafka_clickhouse_redis_1 ... 
Creating zookeeper                ... 
Creating zookeeper_kafka          ... 
[2BCreating clickhouse-node2         ... mdone[0m
Creating clickhouse-node1         ... 
Creating clickhouse-

### 2. Создаем таблицы в  clickhouse

In [2]:
from clickhouse_driver import Client

client = Client(host='localhost')

Создаем целевую таблицу, используя  Engine MergeTree. В MergeTree таблице будут храниться входные данные из кафки. Ожидаемый ответ []

In [3]:
create_MergeTree = """
CREATE TABLE IF NOT EXISTS readings (
    readings_id Int32 Codec(DoubleDelta, LZ4),
    time DateTime Codec(DoubleDelta, LZ4),
    date ALIAS toDate(time),
    timestamp Int32 Codec(DoubleDelta, LZ4)
) Engine = MergeTree
PARTITION BY toYYYYMM(time)
ORDER BY (readings_id, time);
"""
client.execute(create_MergeTree)

[]

Далее нам нужно создать таблицу с помощью движка Kafka для подключения к топику и чтения данных. Движок будет считывать данные с брокера на хосте kafka, из топика «readings», имя группы потребителей  - «readings consumer_group1». Входной формат — CSV. Обратите внимание, что столбец «дата» опущен. Это псевдоним в целевой таблице, который будет автоматически заполняться из столбца «время». Ожидаемый ответ []

In [4]:
create_kafka_engine = """
CREATE TABLE readings_queue (
    readings_id Int32,
    time DateTime,
    timestamp Int32
)
ENGINE = Kafka
SETTINGS kafka_broker_list = 'broker:29092',
       kafka_topic_list = 'readings',
       kafka_group_name = 'readings_consumer_group1',
       kafka_format = 'CSV',
       kafka_max_block_size = 1048576;
"""
client.execute(create_kafka_engine)

[]

Материализованное представление соединит две ранее созданные таблицы, считывая данные из механизма таблиц Kafka и вставляя их в целевую таблицу дерева слияния. Мы можем сделать ряд преобразований данных. Мы сделаем простое чтение и вставку. Использование * предполагает, что имена столбцов идентичны (с учетом регистра). Ожидаемый ответ []

In [5]:
create_materialized_view = """
    CREATE MATERIALIZED VIEW readings_queue_mv TO readings AS
    SELECT readings_id, time, timestamp
    FROM readings_queue;
"""
client.execute(create_materialized_view)

[]

Проверяем доступные таблицы. Ожидаемый ответ [('readings',), ('readings_queue',), ('readings_queue_mv',)]

In [9]:
client.execute('SHOW TABLES FROM default')

[('readings',), ('readings_queue',), ('readings_queue_mv',)]

Проверяем что сейчас данных нет. Ожидаемый ответ []

In [10]:
get_data_from_clickhouse = """
SELECT * FROM readings
"""

client.execute(get_data_from_clickhouse)

[(6, datetime.datetime(2023, 5, 13, 20, 15, 7), 1683964369),
 (1, datetime.datetime(2023, 5, 13, 20, 14, 51), 1683964364),
 (2, datetime.datetime(2023, 5, 13, 20, 14, 54), 1683964365),
 (3, datetime.datetime(2023, 5, 13, 20, 14, 58), 1683964366),
 (4, datetime.datetime(2023, 5, 13, 20, 15, 1), 1683964367),
 (5, datetime.datetime(2023, 5, 13, 20, 15, 4), 1683964368)]

### 3. Создаем коннектор kafka -> redis

Отправляем файл конфигурации на коннектор (kafka-connect в docker-compose). Ожидаемый ответ: {"name":"RedisSinkConnector1","config":{"connector.class":"com.github.jcustenborder.kafka.connect.redis.RedisSinkConnector","tasks.max":"1","topics":"readings","redis.hosts":"redis:6379","key.converter":"org.apache.kafka.connect.storage.StringConverter","value.converter":"org.apache.kafka.connect.storage.StringConverter","name":"RedisSinkConnector1"},"tasks":[],"type":"sink"}

In [8]:
!curl -X POST -H 'Content-Type: application/json' --data @connector.json http://localhost:8083/connectors

{"name":"RedisSinkConnector1","config":{"connector.class":"com.github.jcustenborder.kafka.connect.redis.RedisSinkConnector","tasks.max":"1","topics":"readings","redis.hosts":"redis:6379","key.converter":"org.apache.kafka.connect.storage.StringConverter","value.converter":"org.apache.kafka.connect.storage.StringConverter","name":"RedisSinkConnector1"},"tasks":[],"type":"sink"}

# Запускаем файл kafka_producer_redis_consumer.py
# Смотрим lighthouse