In [None]:
# | default_exp _example

In [None]:
# | export

from typing import *

import os
import json
import yaml
from pathlib import Path
from copy import deepcopy
from datetime import datetime
from os import environ
from enum import Enum
import httpx

from confluent_kafka import Producer, Consumer
from fastapi import status, Depends, HTTPException, Request, Response
from fastapi.openapi.docs import get_swagger_ui_html, get_redoc_html
from fastapi.openapi.utils import get_openapi
from fastapi.responses import FileResponse, RedirectResponse
from fastapi.security import OAuth2PasswordBearer, OAuth2PasswordRequestForm
from fastapi.staticfiles import StaticFiles
from pydantic import validator, BaseModel, Field, HttpUrl, EmailStr, NonNegativeInt

from fast_kafka_api.application import FastKafkaAPI, KafkaErrorMsg
from fast_kafka_api.asyncapi import KafkaMessage
from fast_kafka_api.logger import get_logger

[INFO] fast_kafka_api.asyncapi: ok


In [None]:
# | include: false

import tempfile
import asyncio
from datetime import timedelta

import nest_asyncio
import uvicorn
from fastapi.testclient import TestClient
from starlette.datastructures import Headers

In [None]:
# | export

# Module-level logger, created through fast_kafka_api's `get_logger` helper
# and named after this module.
logger = get_logger(__name__)

In [None]:
# Smoke check (not exported): emits one INFO record through the logger.
logger.info("check")

[INFO] __main__: check


In [None]:
# | export


# Closed set of model kinds used by the request and message schemas in this
# module. The `str` mixin makes every member compare equal to its plain string
# value, so "churn" and ModelType.churn are interchangeable in comparisons.
class ModelType(str, Enum):
    churn = "churn"
    propensity_to_buy = "propensity_to_buy"


# Request body accepted by the `/from_kafka_start` route: identifies the
# account, the kind of model, and how many records are about to be ingested.
class ModelTrainingRequest(BaseModel):
    AccountId: NonNegativeInt = Field(
        ..., example=202020, description="ID of an account"
    )
    # The description previously repeated "ID of an account" (copy-paste slip);
    # it now describes the field, matching the wording used for `model_type`
    # in the other schemas of this module.
    ModelName: ModelType = Field(
        ...,
        example="churn",
        description="Name of the model to be trained (churn, propensity to buy)",
    )
    total_no_of_records: NonNegativeInt = Field(
        ...,
        example=1_000_000,
        description="total number of records (rows) to be ingested",
    )

class EventData(KafkaMessage):
    """
    A sequence of events for a fixed account_id
    """

    # --- who/where the event belongs to ---
    AccountId: NonNegativeInt = Field(
        ...,
        description="ID of an account",
        example=202020,
    )
    # Optional: defaults to None when the message carries no application name.
    Application: Optional[str] = Field(
        None,
        description="Name of the application in case there is more than one for the AccountId",
        example="DriverApp",
    )

    # --- what happened ---
    # `min_length=1` rejects an empty event name.
    DefinitionId: str = Field(
        ...,
        min_length=1,
        description="name of the event",
        example="appLaunch",
    )

    # --- when it happened (two representations of the same local time) ---
    OccurredTime: datetime = Field(
        ...,
        description="local time of the event",
        example="2021-03-28T00:34:08",
    )
    OccurredTimeTicks: NonNegativeInt = Field(
        ...,
        description="local time of the event as the number of ticks",
        example=1616891648496,
    )

    # --- who triggered it ---
    PersonId: NonNegativeInt = Field(
        ...,
        description="ID of a person",
        example=12345678,
    )


# Message wrapping a single EventData together with a flag that asks for a
# prediction to be published.
class RealtimeData(KafkaMessage):
    # The nested event; the example mirrors the per-field examples of EventData.
    event_data: EventData = Field(
        ...,
        description="realtime event data",
        example={
            "AccountId": 202020,
            "Application": "DriverApp",
            "DefinitionId": "appLaunch",
            "OccurredTime": "2021-03-28T00:34:08",
            "OccurredTimeTicks": 1616891648496,
            "PersonId": 12345678,
        },
    )
    make_prediction: bool = Field(
        ...,
        description="trigger prediction message in prediction topic",
        example=True,
    )


# Progress message for data ingestion: how many records have arrived so far
# out of the announced total, for one account.
class TrainingDataStatus(KafkaMessage):
    AccountId: NonNegativeInt = Field(
        ...,
        description="ID of an account",
        example=202020,
    )
    # Running count of records received.
    no_of_records: NonNegativeInt = Field(
        ...,
        description="number of records (rows) ingested",
        example=12_345,
    )
    # Expected final count.
    total_no_of_records: NonNegativeInt = Field(
        ...,
        description="total number of records (rows) to be ingested",
        example=1_000_000,
    )


# Progress message for model training: which step is running, how far that
# step has progressed, and how many steps there are in total, for one account.
class TrainingModelStatus(KafkaMessage):
    AccountId: NonNegativeInt = Field(
        ..., example=202020, description="ID of an account"
    )
    # The description previously read "number of records (rows) ingested",
    # copied from TrainingDataStatus; it now describes the step counter.
    current_step: NonNegativeInt = Field(
        ...,
        example=0,
        description="number of the current step in training the model",
    )
    current_step_percentage: float = Field(
        ...,
        example=0.21,
        description="the percentage of the current step completed",
    )
    total_no_of_steps: NonNegativeInt = Field(
        ...,
        example=1_000_000,
        description="total number of steps for training the model",
    )


class ModelMetrics(KafkaMessage):
    """The standard metrics for classification models.

    The most important metric is AUC for unbalanced classes such as churn. Metrics such as
    accuracy are not very useful since they are easily maximized by outputting the most common
    class all the time.
    """

    AccountId: NonNegativeInt = Field(
        ..., example=202020, description="ID of an account"
    )
    Application: Optional[str] = Field(
        None,
        example="DriverApp",
        description="Name of the application in case there is more than one for the AccountId",
    )
    timestamp: datetime = Field(
        ...,
        example="2021-03-28T00:34:08",
        description="UTC time when the model was trained",
    )
    model_type: ModelType = Field(
        ...,
        example="churn",
        description="Name of the model used (churn, propensity to buy)",
    )

    # Every metric below is constrained to the closed interval [0.0, 1.0].
    auc: float = Field(
        ..., example=0.91, description="Area under ROC curve", ge=0.0, le=1.0
    )
    f1: float = Field(..., example=0.89, description="F-1 score", ge=0.0, le=1.0)
    # NOTE: the field name keeps its historical misspelling because it is the
    # key used in the serialized message; renaming it would break existing
    # producers and consumers. Only the human-readable description is corrected.
    precission: float = Field(
        ..., example=0.84, description="precision", ge=0.0, le=1.0
    )
    recall: float = Field(..., example=0.82, description="recall", ge=0.0, le=1.0)
    accuracy: float = Field(..., example=0.82, description="accuracy", ge=0.0, le=1.0)


# Message carrying one prediction score for one person of one account.
class Prediction(KafkaMessage):
    # --- subject of the prediction ---
    AccountId: NonNegativeInt = Field(
        ...,
        description="ID of an account",
        example=202020,
    )
    # Optional: defaults to None when no application name is given.
    Application: Optional[str] = Field(
        None,
        description="Name of the application in case there is more than one for the AccountId",
        example="DriverApp",
    )
    PersonId: NonNegativeInt = Field(
        ...,
        description="ID of a person",
        example=12345678,
    )

    # --- the prediction itself ---
    prediction_time: datetime = Field(
        ...,
        description="UTC time of prediction",
        example="2021-03-28T00:34:08",
    )
    model_type: ModelType = Field(
        ...,
        description="Name of the model used (churn, propensity to buy)",
        example="churn",
    )
    # Constrained to the closed interval [0.0, 1.0].
    score: float = Field(
        ...,
        ge=0.0,
        le=1.0,
        description="Prediction score (e.g. the probability of churn in the next 28 days)",
        example=0.4321,
    )

In [None]:
# | export

# Module-level ingestion counters shared by the handlers registered in
# `create_ws_server`:
#   _total_no_of_records    - set by the `/from_kafka_start` route from the request.
#   _no_of_records_received - reset by `/from_kafka_start`, incremented by
#                             `on_training_data` for every consumed message.
# NOTE(review): plain globals, so the counts are per-process and not keyed by
# account — confirm this is acceptable for the example.
_total_no_of_records = 0
_no_of_records_received = 0


def create_ws_server(assets_path: Path = Path("./assets")) -> FastKafkaAPI:
    """Build the example FastKafkaAPI application.

    Reads the Kafka connection settings from environment variables, creates
    the application with the built-in docs routes disabled, and registers:

    * `/docs` and `/redoc` pages that use a custom favicon URL,
    * the `/from_kafka_start` and `/from_kafka_end` HTTP routes,
    * the Kafka consumer callbacks (`on_training_data`, `on_realitime_data`),
    * the Kafka producer callbacks and the error callback.

    Args:
        assets_path: Directory with static assets. Currently not used inside
            this function; kept so existing callers keep working.

    Returns:
        The configured application instance.

    Raises:
        KeyError: If `KAFKA_HOSTNAME` or `KAFKA_PORT` is not set, or if
            `KAFKA_API_KEY` is set while `KAFKA_API_SECRET` is not.
    """
    title = "Example for FastKafkaAPI"
    description = "A simple example on how to use FastKafkaAPI"
    version = "0.0.1"
    openapi_url = "/openapi.json"
    favicon_url = "/assets/images/favicon.ico"

    contact = dict(name="airt.ai", url="https://airt.ai", email="info@airt.ai")

    # Broker descriptions handed to FastKafkaAPI as `kafka_brokers`; the actual
    # client connection is configured separately through `kafka_config` below.
    kafka_brokers = {
        "localhost": {
            "url": "kafka",
            "description": "local development kafka",
            "port": 9092,
        },
        "staging": {
            "url": "kafka.staging.acme.com",
            "description": "staging kafka",
            "port": 9092,
            "protocol": "kafka-secure",
            "security": {"type": "plain"},
        },
        "production": {
            "url": "kafka.infobip.acme.com",
            "description": "production kafka",
            "port": 9092,
            "protocol": "kafka-secure",
            "security": {"type": "plain"},
        },
    }

    # Both variables are mandatory: a missing one raises KeyError right here.
    kafka_server_url = environ["KAFKA_HOSTNAME"]
    kafka_server_port = environ["KAFKA_PORT"]
    kafka_config = {
        "bootstrap.servers": f"{kafka_server_url}:{kafka_server_port}",
        "group.id": f"{kafka_server_url}:{kafka_server_port}_group",
        "auto.offset.reset": "earliest",
    }
    # SASL credentials are added only when an API key is present in the environment.
    if "KAFKA_API_KEY" in environ:
        kafka_config.update(
            {
                "security.protocol": "SASL_SSL",
                "sasl.mechanisms": "PLAIN",
                "sasl.username": environ["KAFKA_API_KEY"],
                "sasl.password": environ["KAFKA_API_SECRET"],
            }
        )

    # docs_url/redoc_url are disabled so the two routes below can replace them.
    app = FastKafkaAPI(
        title=title,
        contact=contact,
        kafka_brokers=kafka_brokers,
        kafka_config=kafka_config,
        description=description,
        version=version,
        docs_url=None,
        redoc_url=None,
    )

    @app.get("/docs", include_in_schema=False)
    def overridden_swagger():
        return get_swagger_ui_html(
            openapi_url=openapi_url,
            title=title,
            swagger_favicon_url=favicon_url,
        )

    @app.get("/redoc", include_in_schema=False)
    def overridden_redoc():
        return get_redoc_html(
            openapi_url=openapi_url,
            title=title,
            redoc_favicon_url=favicon_url,
        )

    @app.post("/from_kafka_start")
    async def from_kafka_start(training_request: ModelTrainingRequest):
        # Start a new ingestion run: remember the announced total and reset
        # the received counter.
        global _total_no_of_records
        global _no_of_records_received

        _total_no_of_records = training_request.total_no_of_records
        _no_of_records_received = 0

    @app.get("/from_kafka_end")
    async def from_kafka_end():
        # Placeholder route; intentionally does nothing.
        pass

    @app.consumes  # type: ignore
    async def on_training_data(msg: EventData):
        global _total_no_of_records
        global _no_of_records_received

        # ToDo: this is not showing up in logs
        logger.debug(f"msg={msg}")
        _no_of_records_received += 1

        # Publish a progress message once per 100 received records.
        if _no_of_records_received % 100 == 0:
            training_data_status = TrainingDataStatus(
                # Fix: take the account from the received message. The code
                # used to read `EventData.AccountId`, i.e. the model class
                # rather than the message instance.
                AccountId=msg.AccountId,
                no_of_records=_no_of_records_received,
                total_no_of_records=_total_no_of_records,
            )
            # NOTE(review): `produce` is called without `await` — confirm it
            # is not a coroutine in FastKafkaAPI.
            app.produce("training_data_status", training_data_status)

    # The function name (including its spelling) is left untouched: the topic
    # list in the captured startup log contains 'realitime_data', so the name
    # appears to determine the topic — TODO confirm before renaming.
    @app.consumes  # type: ignore
    async def on_realitime_data(msg: RealtimeData):
        pass

    @app.produces  # type: ignore
    def on_training_data_status(msg: TrainingDataStatus, kafka_msg: Any):
        logger.debug(f"on_training_data_status(msg={msg}, kafka_msg={kafka_msg})")

    @app.produces  # type: ignore
    def on_training_model_status(msg: TrainingModelStatus, kafka_msg: Any):
        logger.debug(f"on_training_model_status(msg={msg}, kafka_msg={kafka_msg})")

    @app.produces  # type: ignore
    def on_model_metrics(msg: ModelMetrics, kafka_msg: Any):
        # Fix: the log line used to be labelled `on_training_model_status`.
        logger.debug(f"on_model_metrics(msg={msg}, kafka_msg={kafka_msg})")

    @app.produces  # type: ignore
    def on_prediction(msg: Prediction, kafka_msg: Any):
        # Fix: the log line used to be labelled `on_realtime_data_status` and
        # contained a doubled comma.
        logger.debug(f"on_prediction(msg={msg}, kafka_msg={kafka_msg})")

    @app.produces_on_error  # type: ignore
    def on_error(kafka_error_msg: KafkaErrorMsg, kafka_err: Any):
        logger.warning(f"on_error(kafka_error_msg={kafka_error_msg}, kafka_err={kafka_err},)")

    return app

In [None]:
# | include: false
def create_fastapi_app(assets_path: Path = Path("../assets")) -> FastKafkaAPI:
    """Create the application, handing `create_ws_server` an absolute assets path.

    Args:
        assets_path: Assets directory; it is resolved to an absolute path
            before being passed on.

    Returns:
        The application built by `create_ws_server`.
    """
    return create_ws_server(assets_path=assets_path.resolve())

In [None]:
# | include: false


def start_fastapi_server(
    assets_path: Path = Path("../assets"),
    host: str = "0.0.0.0",
    port: int = 6006,
) -> None:
    """Create the application and serve it with uvicorn.

    Args:
        assets_path: Assets directory forwarded to `create_fastapi_app`.
        host: Interface to bind to.
        port: TCP port to listen on.
    """
    app = create_fastapi_app(assets_path=assets_path)
    # Fix: honour the `host` and `port` arguments. They used to be ignored in
    # favour of hard-coded values identical to the defaults, so default calls
    # behave exactly as before.
    uvicorn.run(app, host=host, port=port)

In [None]:
# | eval: false
# | include: false

# Patch asyncio to allow nested event loops — presumably required so that
# `uvicorn.run` can be called from inside the notebook's already-running loop
# (TODO confirm).
nest_asyncio.apply()

In [None]:
# | eval: false
# | include: false

# Manual run (cell is marked `eval: false`): switch to /tmp first — the
# captured output below shows the generated asyncapi files under
# /tmp/asyncapi — then start the server with its default arguments.
os.chdir("/tmp")
start_fastapi_server()

INFO:     Started server process [10784]
INFO:     Waiting for application startup.


[INFO] fast_kafka_api.asyncapi: Async specifications generated at: 'asyncapi/spec/asyncapi.yml'
[INFO] fast_kafka_api.asyncapi: Async docs generated at 'asyncapi/docs'
[INFO] fast_kafka_api.asyncapi: Output of '$ npx -y -p @asyncapi/generator ag asyncapi/spec/asyncapi.yml @asyncapi/html-template -o asyncapi/docs --force-write'[32m

Done! ✨[0m
[33mCheck out your shiny new generated files at [0m[35m/tmp/asyncapi/docs[0m[33m.[0m


[INFO] fast_kafka_api.application: consumers_async_loop(): Kafka admin created <confluent_kafka.admin.AdminClient object>.
[INFO] fast_kafka_api.application: consumers_async_loop(): Kafka topics ['error', 'model_metrics', 'prediction', 'realitime_data', 'training_data', 'training_data_status', 'training_model_status'] created if needed.
[INFO] fast_kafka_api.application: AIOProducer created.
[INFO] fast_kafka_api.application: consumers_async_loop(topic=training_data, config={'bootstrap.servers': 'tvrtko-kafka:9092', 'group.id': 'tvrtko-kafka:9092_group',

INFO:     Application startup complete.
INFO:     Uvicorn running on http://0.0.0.0:6006 (Press CTRL+C to quit)
INFO:     Shutting down
INFO:     Waiting for application shutdown.


[INFO] fast_kafka_api.application: consumers_async_loop(topic=realitime_data) shutting down...
[INFO] fast_kafka_api.application: consumers_async_loop(topic=realitime_data): Kafka Consumer closed.
[INFO] fast_kafka_api.application: consumers_async_loop(topic=realitime_data) exiting.
[INFO] fast_kafka_api.application: consumers_async_loop(topic=training_data) shutting down...
[INFO] fast_kafka_api.application: consumers_async_loop(topic=training_data): Kafka Consumer closed.
[INFO] fast_kafka_api.application: consumers_async_loop(topic=training_data) exiting.
[INFO] fast_kafka_api.application: AIOProducer closed.


INFO:     Application shutdown complete.
INFO:     Finished server process [10784]
