In [None]:
from clickhouse_driver import Client

In [None]:
client_1 = Client(host='clickhouse-node1')
client_3 = Client(host='clickhouse-node3')

In [None]:
client_1.execute("CREATE DATABASE IF NOT EXISTS shard")
client_1.execute("CREATE DATABASE IF NOT EXISTS replica")
client_1.execute("CREATE TABLE IF NOT EXISTS shard.rating_kafka(film_id UUID, user_id UUID, rating Float32) ENGINE=Kafka('kafka:29092', 'rating', 'group-id', 'JSONEachRow')")
client_1.execute("CREATE TABLE IF NOT EXISTS shard.rating(film_id UUID, user_id UUID, rating Float32) Engine=ReplicatedMergeTree('/clickhouse/tables/{shard}/rating', 'rating_replica_1') ORDER BY rating")
client_1.execute("CREATE TABLE IF NOT EXISTS replica.rating(film_id UUID, user_id UUID, rating Float32) Engine=ReplicatedMergeTree('/clickhouse/tables/{shard}/rating', 'rating_replica_2') ORDER BY rating")
client_1.execute("CREATE TABLE IF NOT EXISTS default.rating(film_id UUID, user_id UUID, rating Float32) ENGINE = Distributed('company_cluster', '', rating, rand())")

## Подключаем поток материализованного представления для Node 1

In [None]:
client_1.execute("CREATE MATERIALIZED VIEW IF NOT EXISTS shard.mv_rating TO default.rating AS SELECT * FROM shard.rating_kafka")

## Подключаем поток материализованного представления для Node 3

In [None]:
client_3.execute("CREATE DATABASE IF NOT EXISTS shard")
client_3.execute("CREATE DATABASE IF NOT EXISTS replica")
client_3.execute("CREATE TABLE IF NOT EXISTS shard.rating_kafka(film_id UUID, user_id UUID, rating Float32) ENGINE=Kafka('kafka:29092', 'rating', 'group-id', 'JSONEachRow')")
client_3.execute("CREATE TABLE IF NOT EXISTS shard.rating(film_id UUID, user_id UUID, rating Float32) Engine=ReplicatedMergeTree('/clickhouse/tables/{shard}/rating', 'rating_replica_1') ORDER BY rating")
client_3.execute("CREATE TABLE IF NOT EXISTS replica.rating(film_id UUID, user_id UUID, rating Float32) Engine=ReplicatedMergeTree('/clickhouse/tables/{shard}/rating', 'rating_replica_2') ORDER BY rating")
client_3.execute("CREATE TABLE IF NOT EXISTS default.rating(film_id UUID, user_id UUID, rating Float32) ENGINE = Distributed('company_cluster', '', rating, rand())")

In [None]:
client_3.execute("CREATE MATERIALIZED VIEW IF NOT EXISTS shard.mv_rating TO default.rating AS SELECT * FROM shard.rating_kafka")

## Проверяем данные в дистрибутивной таблице

In [None]:
client_1.execute("SELECT * FROM rating")

## Отправляем POST запрос по адресу:

> 127.0.0.1:8000/api/v1/events/rating/rating_event

## Тело запроса:

```
{
    "user_id": "561b7be6-3494-42e4-93b0-2091343184e0",
    "film_id": "1113f275-6b45-4137-b9a2-7793103d4b10",
    "rating": 4.2
}
```

## Снова проверяем данные в дистрибутивной таблице

In [None]:
client_1.execute("SELECT * FROM rating")

----

## Посылаем данные в Kafka (без API)

In [None]:
from kafka import KafkaProducer
from time import sleep


producer = KafkaProducer(bootstrap_servers=['kafka:29092'])

producer.send(
    topic='rating',
    value=b'{"film_id":"73ffe7e1-dffb-4cff-9c47-0c3fb409bccc" , "user_id":"19f7c3e9-de95-40ee-bb00-700b4cd9ca95" , "rating":"3.9"}',
    key=b'500271+tt0120338',
)

sleep(1)

## Повторная проверка данных в дистрибутивной таблице

In [None]:
client_1.execute("SELECT * FROM rating")