## Kafka Streams

Надстройка (клиент), который позволяет разрабатывать приложения, которые удобно манипулируют данными в топиках `Kafka`. Есть `JVM API`  и специальный `SQL`-подобный язык, который работает с сервером `ksqlDB`.

Воспользуемся `ksqlDB` клиентом для `Python`. Он плохо поддерживается, поэтому несовместим без определенных изменений с последними версиями `CPython`.

In [None]:
# Необходимые патчи, чтобы вернуть работоспособность библиотеке 
import sys
import json

if sys.version_info.minor > 9:
    import collections
    collections.Iterable = collections.abc.Iterable
    collections.Mapping = collections.abc.Mapping
    collections.MutableSet = collections.abc.MutableSet
    collections.MutableMapping = collections.abc.MutableMapping

from ksql import KSQLAPI

# https://github.com/bryanyang0528/ksql-python/issues/80
class KsqlApiCustom(KSQLAPI):
    def __init__(self, url, max_retries=3, check_version=True, **kwargs):
        super().__init__(url, max_retries=max_retries,
                         check_version=check_version, **kwargs)
        self.sa._raise_for_status = self._raise_for_status

    @staticmethod
    def _raise_for_status(r, response):
        r_json = json.loads(response)
        if r.getcode() != 200:
            if r_json.get("@type") == "statement_error" or r_json.get("@type") == "generic_error":
                error_message = r_json["message"]
                error_code = r_json["error_code"]
                stackTrace = r_json["stack_trace"]
                raise KSQLError(error_message, error_code, stackTrace)
            else:
                raise KSQLError("Unknown Error: {}".format(r.content))
        else:
            # seems to be the old API behavior, so some errors have status 200, bug??
            if r_json and r_json[0]["@type"] == "currentStatus" and r_json[0]["commandStatus"]["status"] == "ERROR":
                error_message = r_json[0]["commandStatus"]["message"]
                error_code = None
                stackTrace = None
                raise KSQLError(error_message, error_code, stackTrace)
            return True


In [None]:
client = KsqlApiCustom('http://127.0.0.1:8788')

Создадим `Stream` - непрерывный, неограниченный поток данных. Каждый элемент потока - пара ключ/значение. Представляет собой обертку над топиками kafka.

In [None]:
client.ksql(
    """
    CREATE STREAM rider_locations (profileId VARCHAR, latitude DOUBLE, longitude DOUBLE)
    WITH (kafka_topic='locations', value_format='json', partitions=1)
    """
)

Создадим две "таблицы", которые представляет из себя слепок состояния `Stream`'а, определяемым по некоторым правилам

In [None]:
client.ksql(
    """
    CREATE TABLE current_location AS
        SELECT profileId,
                LATEST_BY_OFFSET(latitude) AS la,
                LATEST_BY_OFFSET(longitude) AS lo
        FROM rider_locations
        GROUP BY profileId
    """
)

client.ksql(
    """
    CREATE TABLE riders_near_mountain_view AS
        SELECT ROUND(GEO_DISTANCE(la, lo, 37.4133, -122.1162), -1) AS distanceInMiles,
                COLLECT_LIST(profileId) AS riders,
                COUNT(*) AS count
        FROM current_location
        GROUP BY ROUND(GEO_DISTANCE(la, lo, 37.4133, -122.1162), -1)
    """
)      

Запишем в `Stream` данные с помощью функции `INSERT` (физически данные добавятся в топик `Kafka`)

In [None]:
import threading
import time
    

def insert_in_thread():
    time.sleep(10)
    client.ksql(
        """
        INSERT INTO rider_locations (profileId, latitude, longitude) VALUES ('c2309eec', 37.7877, -122.4205);
        INSERT INTO rider_locations (profileId, latitude, longitude) VALUES ('18f4ea86', 37.3903, -122.0643);
        INSERT INTO rider_locations (profileId, latitude, longitude) VALUES ('4ab5cbad', 37.3952, -122.0813);
        INSERT INTO rider_locations (profileId, latitude, longitude) VALUES ('8b6eae59', 37.3944, -122.0813);
        INSERT INTO rider_locations (profileId, latitude, longitude) VALUES ('4a7c7b41', 37.4049, -122.0822);
        INSERT INTO rider_locations (profileId, latitude, longitude) VALUES ('4ddad000', 37.7857, -122.4011)
        """
    )

thread = threading.Thread(target=insert_in_thread)    
thread.start()

Запрос типа `PUSH` (c `EMIT CHANGES`) работает бесконечно, генерируя обновления таблицы

In [None]:
res = client.query(
    """
    SELECT * FROM rider_locations
        WHERE GEO_DISTANCE(latitude, longitude, 37.4133, -122.1162) <= 5 
        EMIT CHANGES
    """, return_objects=True
)

print(next(res))
print(next(res))
print(next(res))

Запрос вида `PULL` возвращает текущее состояние таблицы. 

In [None]:
res = client.query(
    """
    SELECT * FROM riders_near_mountain_view WHERE distanceInMiles <= 10
    """, return_objects=True, use_http2=True
)

next(res)
for x in res:
    print(x)