In [None]:
# | default_exp downloading

# Downloading
> FastKafka app for downloading Infobip training data from ClickHouse

In [None]:
# | export
import asyncio
import random
import re
import tempfile
from contextlib import contextmanager
from datetime import datetime, timedelta
from os import environ
from pathlib import Path
from typing import Union, Optional, Tuple, Dict, Any
from urllib.parse import quote_plus as urlquote
from urllib.parse import unquote_plus as urlunquote

import dask.dataframe as dd
import pandas as pd
from sqlalchemy.engine import Connection, create_engine
import dask.dataframe as dd

In [None]:
# | export

from infobip_kafka_service.logger import get_logger
# Module-level logger used by the functions in this notebook/module
# (created through the project's `get_logger` helper).
logger = get_logger(__name__)

In [None]:
# export


def _create_clickhouse_connection_string(
    username: str,
    password: str,
    host: str,
    port: int,
    database: str,
    protocol: str,
) -> str:
    # Double quoting is needed to fix a problem with special character '?' in password
    quoted_password = urlquote(urlquote(password))
    conn_str = (
        f"clickhouse+{protocol}://{username}:{quoted_password}@{host}:{port}/{database}"
    )

    return conn_str

In [None]:
# Each case overrides the common arguments (user "default", host "localhost",
# database "infobip") and lists the connection string that must be produced.
for overrides, expected_conn_str in [
    (
        dict(password="123456", port=8123, protocol="http"),
        "clickhouse+http://default:123456@localhost:8123/infobip",
    ),
    (
        dict(password="123456", port=9000, protocol="native"),
        "clickhouse+native://default:123456@localhost:9000/infobip",
    ),
    # special characters in the password end up URL-quoted twice
    (
        dict(password="123?456@", port=9000, protocol="native"),
        "clickhouse+native://default:123%253F456%2540@localhost:9000/infobip",
    ),
]:
    actual = _create_clickhouse_connection_string(
        username="default",
        host="localhost",
        database="infobip",
        **overrides,
    )
    assert actual == expected_conn_str, actual

In [None]:
# export


def create_db_uri_for_clickhouse_datablob(
    username: str,
    password: str,
    host: str,
    port: int,
    table: str,
    database: str,
    protocol: str,
) -> str:
    """Create uri for clickhouse datablob based on connection params

    The result is the ClickHouse connection string (see
    `_create_clickhouse_connection_string`) with ``/<table>`` appended.

    Args:
        username: Username of clickhouse database
        password: Password of clickhouse database (URL-quoted twice in the result)
        host: Host of clickhouse database
        port: Port of clickhouse database
        table: Table of clickhouse database, appended as the last path segment
        database: Database to use
        protocol: Protocol to connect to clickhouse (native/http)

    Returns:
        An uri for the clickhouse datablob
    """
    base_uri = _create_clickhouse_connection_string(
        username=username,
        password=password,
        host=host,
        port=port,
        database=database,
        protocol=protocol,
    )
    return "/".join([base_uri, table])

In [None]:
db_test_cases = [
    {
        "username": "default",
        "password": "123456",
        "host": "localhost",
        "port": 9000,
        "database": "infobip",
        "table": "events",
        "protocol": "native",
        "db_uri": "clickhouse+native://default:123456@localhost:9000/infobip/events",
    }
]

# Every key except the expected "db_uri" is an argument of the function under test.
uri_arg_names = ("username", "password", "host", "port", "table", "database", "protocol")

for case in db_test_cases:
    actual_db_uri = create_db_uri_for_clickhouse_datablob(
        **{name: case[name] for name in uri_arg_names}
    )
    print(f"{actual_db_uri=}")
    assert actual_db_uri == case["db_uri"]

In [None]:
# export


def _get_clickhouse_connection_params_from_db_uri(
    db_uri: str,
) -> Tuple[str, str, str, int, str, str, str, str]:
    """
    Function to get clickhouse connection params from db_uri of the db datablob

    Args:
        db_uri: DB uri of db datablob
    Returns:
        The username, password, host, port, table, database, protocol, database_server of the db datablob as a tuple
    """
    result = re.search("(.*)\+(.*):\/\/(.*):(.*)@(.*):(.*)\/(.*)\/(.*)", db_uri)
    database_server = result.group(1)  # type: ignore
    protocol = result.group(2)  # type: ignore
    username = result.group(3)  # type: ignore
    password = urlunquote(urlunquote(result.group(4)))  # type: ignore
    host = result.group(5)  # type: ignore
    port = int(result.group(6))  # type: ignore
    database = result.group(7)  # type: ignore
    table = result.group(8)  # type: ignore
    return username, password, host, port, table, database, protocol, database_server

In [None]:
for case in db_test_cases:
    parsed_params = _get_clickhouse_connection_params_from_db_uri(db_uri=case["db_uri"])
    (
        actual_username,
        actual_password,
        actual_host,
        actual_port,
        actual_table,
        actual_database,
        actual_protocol,
        actual_database_server,
    ) = parsed_params
    display(
        f"{actual_username=}",
        f"{actual_password=}",
        f"{actual_host=}",
        f"{actual_port=}",
        f"{actual_table=}",
        f"{actual_database=}",
        f"{actual_protocol=}",
        f"{actual_database_server=}",
    )

    # Same order as the tuple returned by the parser.
    expected_params = (
        case["username"],
        case["password"],
        case["host"],
        case["port"],
        case["table"],
        case["database"],
        case["protocol"],
        "clickhouse",
    )
    for actual_value, expected_value in zip(parsed_params, expected_params):
        assert actual_value == expected_value

In [None]:
#| exporti

def get_clickhouse_params_from_env_vars() -> Dict[str, Union[str, int]]:
    """Read ClickHouse connection parameters from ``KAFKA_CH_*`` environment variables.

    Returns:
        A dict with the keys username, password, host, database, port, protocol
        and table. ``port`` is converted to ``int``; all other values are the raw
        environment strings.

    Raises:
        KeyError: If one of the required environment variables is not set.
        ValueError: If ``KAFKA_CH_PORT`` is not an integer.
    """
    env_var_for_param = (
        ("username", "KAFKA_CH_USERNAME"),
        ("password", "KAFKA_CH_PASSWORD"),
        ("host", "KAFKA_CH_HOST"),
        ("database", "KAFKA_CH_DATABASE"),
        ("port", "KAFKA_CH_PORT"),
        ("protocol", "KAFKA_CH_PROTOCOL"),
        ("table", "KAFKA_CH_TABLE"),
    )
    return {
        param: int(environ[var_name]) if param == "port" else environ[var_name]
        for param, var_name in env_var_for_param
    }

In [None]:
expected_param_names = {"database", "host", "password", "port", "protocol", "table", "username"}
assert get_clickhouse_params_from_env_vars().keys() == expected_param_names

In [None]:
# export


@contextmanager  # type: ignore
def get_clickhouse_connection(  # type: ignore
    *,
    username: str,
    password: str,
    host: str,
    port: int,
    database: str,
    table: str,
    protocol: str,
) -> Connection:
    """Context manager yielding a SQLAlchemy connection to ClickHouse.

    Only the ``native`` protocol is supported. The engine created here is
    disposed when the context exits, so its connection pool does not outlive
    the ``with`` block.

    Args:
        username: ClickHouse user name.
        password: ClickHouse password.
        host: ClickHouse host.
        port: ClickHouse port.
        database: Database to connect to.
        table: Accepted so the dict from `get_clickhouse_params_from_env_vars`
            can be splatted in directly; not used here.
        protocol: Must be ``"native"``.

    Yields:
        An open SQLAlchemy `Connection`.

    Raises:
        ValueError: If ``protocol`` is not ``"native"``.
    """
    if protocol != "native":
        raise ValueError(
            f"Unsupported clickhouse protocol {protocol!r}; only 'native' is supported"
        )
    conn_str = _create_clickhouse_connection_string(
        username=username,
        password=password,
        host=host,
        port=port,
        database=database,
        protocol=protocol,
    )

    db_engine = create_engine(conn_str)
    try:
        with db_engine.connect() as connection:
            logger.info(f"Connected to database using {db_engine}")
            yield connection
    finally:
        # Each call creates a throwaway engine; release its pooled connections.
        db_engine.dispose()

In [None]:
# rename events to events_distributed
# If a table named `events` exists in the configured database, rename it to
# `events_distributed`.
# WARNING: executing this cell changes the schema of the configured ClickHouse database.

db_params = get_clickhouse_params_from_env_vars()
database = db_params["database"]

with get_clickhouse_connection(
    **db_params,
) as connection:
    assert isinstance(connection, Connection)

    query = "SELECT database, name from system.tables"
    df = pd.read_sql(sql=query, con=connection)
    display(df)

    events_table = df.loc[(df.database == database) & (df.name == "events")]
    if events_table.shape[0] > 0:
        query = f"RENAME TABLE {database}.events TO {database}.events_distributed"
        # NOTE(review): RENAME TABLE returns no result set; confirm pd.read_sql
        # tolerates this with the ClickHouse dialect in use.
        ys = pd.read_sql(sql=query, con=connection)
        display(ys)

In [None]:
# export

def fillna(s: Optional[Any]) -> str:
    """Return ``s`` converted with ``str()`` and wrapped in single quotes.

    ``None`` becomes the empty quoted string ``''``. Quote characters inside
    ``s`` are not escaped.
    """
    text = "" if s is None else str(s)
    return "'" + text + "'"

In [None]:
for raw_value, expected_literal in [("", "''"), ("Davor", "'Davor'"), (None, "''")]:
    assert fillna(raw_value) == expected_literal

In [None]:
def create_duplicated_test_ddf():
    """Build a small dask DataFrame in which every row appears three times.

    The base frame holds six events of account 12345 (PersonIds 1, 2, 2, 3, 3, 3)
    with distinct sub-second timestamps. It is concatenated three times, sorted by
    all columns and split into two dask partitions.
    """
    occurred = [
        datetime.fromisoformat(
            f"2023-07-10T13:27:{i:02d}.{123456*(i+1) % 1_000_000:06d}"
        )
        for i in range(6)
    ]
    base = pd.DataFrame(
        {
            "AccountId": 12345,
            "PersonId": [1, 2, 2, 3, 3, 3],
            "OccurredTime": occurred,
            "DefinitionId": ["one"] * 3 + ["two"] * 2 + ["three"],
            "ApplicationId": None,
        }
    )
    # Integer timestamp divided by 1000 (nanoseconds -> microseconds, assuming
    # OccurredTime is stored as datetime64[ns]).
    base["OccurredTimeTicks"] = base["OccurredTime"].astype(int) // 1000

    tripled = pd.concat([base, base, base])
    tripled = tripled.sort_values(list(tripled.columns))
    return dd.from_pandas(tripled, npartitions=2)


ddf = create_duplicated_test_ddf()
ddf

In [None]:
# export

def _pandas2dask_map(df: pd.DataFrame, *, history_size: Optional[int] = None) -> pd.DataFrame:
    df = df.reset_index()
    df = df.sort_values(["PersonId", "OccurredTime", "OccurredTimeTicks"])
    df = df.drop_duplicates()
    df = df.set_index("PersonId")
    return df


In [None]:
ddf = create_duplicated_test_ddf()
df = ddf.compute().set_index("PersonId")

# Each of the six distinct events must survive exactly once, ordered by PersonId.
expected = pd.DataFrame(
    {
        "AccountId": [12345] * 6,
        "OccurredTime": [
            "2023-07-10 13:27:00.123456",
            "2023-07-10 13:27:01.246912",
            "2023-07-10 13:27:02.370368",
            "2023-07-10 13:27:03.493824",
            "2023-07-10 13:27:04.617280",
            "2023-07-10 13:27:05.740736",
        ],
        "DefinitionId": ["one"] * 3 + ["two"] * 2 + ["three"],
        "ApplicationId": [None] * 6,
        "OccurredTimeTicks": [
            1688995620123456,
            1688995621246912,
            1688995622370368,
            1688995623493824,
            1688995624617280,
            1688995625740736,
        ],
    },
    index=pd.Index([1, 2, 2, 3, 3, 3], name="PersonId"),
).assign(
    OccurredTime=lambda frame: pd.to_datetime(frame["OccurredTime"]),
    DefinitionId=lambda frame: frame["DefinitionId"].astype("string[pyarrow]"),
    ApplicationId=lambda frame: frame["ApplicationId"].astype("string[pyarrow]"),
)

actual = _pandas2dask_map(df)

pd.testing.assert_frame_equal(actual, expected)

In [None]:
actual.head(n=5)

In [None]:
# export

def _pandas2dask(downloaded_path: Path, output_path: Path, *, history_size: Optional[int] = None) -> None:
    """Index downloaded parquet data by PersonId, deduplicate it and write it out.

    Steps:
      1. read the parquet files under ``downloaded_path`` (``blocksize=None``),
         cast ``AccountId`` to int64 and set ``PersonId`` as the index;
      2. stage that result as parquet in a temporary directory;
      3. read the staged data back, apply `_pandas2dask_map` (sort + drop
         duplicates) to every partition and write it to ``output_path``.

    ``history_size`` is accepted but not used.
    """
    with tempfile.TemporaryDirectory() as staging_dir_name:
        staging_dir = Path(staging_dir_name)

        raw = dd.read_parquet(downloaded_path, blocksize=None)
        raw["AccountId"] = raw["AccountId"].astype("int64")
        raw.set_index("PersonId").to_parquet(staging_dir, engine="pyarrow")

        # Deduplication runs per partition, so it relies on rows with equal
        # PersonId sharing a partition after set_index above.
        deduplicated = dd.read_parquet(staging_dir).map_partitions(_pandas2dask_map)
        deduplicated.to_parquet(output_path)

In [None]:
with tempfile.TemporaryDirectory() as tmp_name:
    tmp_root = Path(tmp_name)
    source_dir = tmp_root / "duplicated"
    result_dir = tmp_root / "result"

    # write each dask partition of the duplicated fixture as its own parquet file
    source_dir.mkdir()
    for i, partition in enumerate(create_duplicated_test_ddf().partitions):
        partition.compute().to_parquet(source_dir / f"part_{i:06d}.parquet")

    _pandas2dask(source_dir, result_dir)

    ddf = dd.read_parquet(result_dir)

    display(ddf)
    display(ddf.compute())

    expected = pd.DataFrame(
        {
            "AccountId": [12345] * 6,
            "OccurredTime": [
                "2023-07-10 13:27:00.123456",
                "2023-07-10 13:27:01.246912",
                "2023-07-10 13:27:02.370368",
                "2023-07-10 13:27:03.493824",
                "2023-07-10 13:27:04.617280",
                "2023-07-10 13:27:05.740736",
            ],
            "DefinitionId": ["one"] * 3 + ["two"] * 2 + ["three"],
            "ApplicationId": [None] * 6,
            "OccurredTimeTicks": [
                1688995620123456,
                1688995621246912,
                1688995622370368,
                1688995623493824,
                1688995624617280,
                1688995625740736,
            ],
        },
        index=pd.Index([1, 2, 2, 3, 3, 3], name="PersonId"),
    ).assign(
        OccurredTime=lambda frame: pd.to_datetime(frame["OccurredTime"]),
        DefinitionId=lambda frame: frame["DefinitionId"].astype("string[pyarrow]"),
        ApplicationId=lambda frame: frame["ApplicationId"].astype("string[pyarrow]"),
    )

    pd.testing.assert_frame_equal(ddf.compute(), expected)

In [None]:
# export

def _clickhouse_string_literal(value: str) -> str:
    """Quote ``value`` as a single-quoted ClickHouse string literal (backslash-escaping ``\\`` and ``'``)."""
    escaped = value.replace("\\", "\\\\").replace("'", "\\'")
    return f"'{escaped}'"


def _build_account_id_query(
    *,
    table: str,
    account_id: Union[int, str],
    application_id: Optional[str],
    history_size: Optional[int],
) -> str:
    """Build the SELECT that downloads the rows of one account.

    ``account_id`` is coerced with ``int()`` and ``application_id`` is escaped,
    so neither can inject SQL. ``table`` is inserted verbatim (in this module it
    comes from `get_clickhouse_params_from_env_vars`).
    """
    query = f"SELECT DISTINCT * FROM {table} WHERE AccountId={int(account_id)}"  # nosec B608
    # None and "" both mean "no ApplicationId filter".
    if application_id is not None and application_id != "":
        query += f" AND ApplicationId={_clickhouse_string_literal(application_id)}"
    query += " ORDER BY PersonId ASC, OccurredTimeTicks DESC"
    if history_size:
        query += f" LIMIT {int(history_size)} BY PersonId"
    return query


def _download_account_id_rows_as_parquet(
    *,
    account_id: Union[int, str],
    application_id: Optional[str],
    history_size: Optional[int] = None,
    host: str,
    port: int,
    username: str,
    password: str,
    database: str,
    protocol: str,
    table: str,
    chunksize: Optional[int] = 1_000_000,
    index_column: str = "PersonId",
    output_path: Path,
) -> None:
    """Download one account's rows from ClickHouse into a deduplicated parquet dataset.

    The query result is written chunk by chunk to temporary parquet files, which
    are then re-indexed by PersonId and deduplicated by `_pandas2dask` into
    ``output_path``.

    Args:
        account_id: AccountId to download; must be convertible with ``int()``.
        application_id: Optional ApplicationId filter; ``None`` or ``""`` disables it.
        history_size: If truthy, keep at most this many rows per PersonId, namely
            those with the largest OccurredTimeTicks (``LIMIT n BY PersonId``).
        host: ClickHouse host.
        port: ClickHouse port.
        username: ClickHouse user name.
        password: ClickHouse password.
        database: ClickHouse database.
        protocol: ClickHouse protocol (only "native" is accepted by the connection helper).
        table: Table to read from.
        chunksize: Rows fetched per chunk; ``None`` fetches the whole result at once.
        index_column: Accepted for compatibility; not used.
        output_path: Directory the final parquet dataset is written to.

    Raises:
        ValueError: If ``account_id`` (or ``history_size``) is not an integer.
    """
    with get_clickhouse_connection(  # type: ignore
        username=username,
        password=password,
        host=host,
        port=port,
        database=database,
        table=table,
        protocol=protocol,
    ) as connection, tempfile.TemporaryDirectory() as td:
        download_dir = Path(td) / "downloaded"

        query = _build_account_id_query(
            table=table,
            account_id=account_id,
            application_id=application_id,
            history_size=history_size,
        )
        logger.info(f"_download_account_id_rows_as_parquet(): {query=}")

        download_dir.mkdir(parents=True, exist_ok=True)
        chunks = pd.read_sql(sql=query, con=connection, chunksize=chunksize)
        if chunksize is None:
            # Without chunksize pandas returns a single DataFrame, not an iterator;
            # iterating over it directly would yield column names.
            chunks = [chunks]
        for i, df in enumerate(chunks):
            fname = download_dir / f"clickhouse_data_{i:09d}.parquet"
            logger.info(
                f"_download_account_id_rows_as_parquet() Writing data retrieved from the database to temporary file: {fname}"
            )
            df.to_parquet(fname, engine="pyarrow")  # type: ignore

        logger.info(
            f"_download_account_id_rows_as_parquet() Rewriting temporary parquet files from {download_dir / 'clickhouse_data_*.parquet'} to output directory {output_path}"
        )
        _pandas2dask(download_dir, output_path)

        # sanity check: the written dataset can be read back
        dd.read_parquet(output_path).head()
           
           
def download_account_id_rows_as_parquet(
    *,
    account_id: Union[int, str],
    application_id: Optional[str],
    history_size: Optional[int] = None,
    chunksize: Optional[int] = 1_000_000,
    index_column: str = "PersonId",
    output_path: Path,
) -> None:
    """Download an account's rows using ClickHouse settings from the environment.

    Thin wrapper around `_download_account_id_rows_as_parquet`. It fills the
    connection parameters (username, password, host, database, port, protocol,
    table) from `get_clickhouse_params_from_env_vars` and forwards every other
    argument unchanged.
    """
    connection_kwargs = get_clickhouse_params_from_env_vars()
    _download_account_id_rows_as_parquet(
        account_id=account_id,
        application_id=application_id,
        history_size=history_size,
        chunksize=chunksize,
        index_column=index_column,
        output_path=output_path,
        **connection_kwargs,  # type: ignore
    )

In [None]:
# skip

AccountId=12344
ModelId=20062

# download the account once per ApplicationId variant (no filter, empty filter, a concrete id)
application_ids = [None, "", "A1F7EDD6E6BA23EBCD167C9C986ACFCB"]
for ApplicationId in application_ids:
    print("*" * 120, "", f"{ApplicationId=}", "", sep="\n")

    output_path = Path(tempfile.mkdtemp(prefix="clickhouse_download_"))
    output_path.mkdir(exist_ok=True, parents=True)

    download_account_id_rows_as_parquet(
        account_id=AccountId,
        application_id=ApplicationId,
        output_path=output_path,
    )

    ddf = dd.read_parquet(output_path)
    display(ddf.head())
    print(f"{ddf.shape[0].compute()=:,d}")

In [None]:
# skip

AccountId=12344
# ModelId=10037

# same as above, but keep only the 30 rows with the largest OccurredTimeTicks per PersonId
application_ids = [None, "", "A1F7EDD6E6BA23EBCD167C9C986ACFCB"]
for ApplicationId in application_ids:
    print("*" * 120, "", f"{ApplicationId=}", "", sep="\n")

    output_path = Path(tempfile.mkdtemp(prefix="clickhouse_download_"))
    output_path.mkdir(exist_ok=True, parents=True)

    download_account_id_rows_as_parquet(
        account_id=AccountId,
        application_id=ApplicationId,
        output_path=output_path,
        history_size=30,
    )

    ddf = dd.read_parquet(output_path)
    display(ddf.head(31))
    display(ddf.tail(31))
    print(f"{ddf.shape[0].compute()=:,d}")

print("ok")

In [None]:
# skip

# NOTE(review): hard-coded output directories, presumably left over from an earlier
# run of the download cells above (same "clickhouse_download_" prefix); update
# them before re-running.
person_id = 14438

ddf = dd.read_parquet("/tmp/clickhouse_download_72car5_j")
person_rows = ddf.loc[person_id].compute()
display(person_rows.head(30))
display(person_rows.tail(30))

ddf = dd.read_parquet("/tmp/clickhouse_download_gyrb53v5")
display(ddf.loc[person_id].compute())

## Kafka

In [None]:
# export


def add_download_training_data(
    app: FastKafka,
    *,
    root_path: Path,
    username: str = "infobip",
) -> None:
    """Register the training-data download handlers on ``app``.

    Registers:

    * ``to_training_model_status``: producer on topic
      ``f"{username}_training_model_status"``;
    * ``on_training_model_start``: consumer on topic
      ``f"{username}_training_model_start"``. For each message it sends a status
      with ``current_step_percentage=0.0``. It then downloads the account's rows
      into ``root_path/AccountId-<id>/ApplicationId-<id>/ModelId-<id>/<today>``,
      unless that directory already exists. Finally it sends a status with
      ``current_step_percentage=1.0``. A failed download removes the directory
      again so that a retry does not skip it.

    Args:
        app: Application the handlers are registered on.
        root_path: Root directory for downloaded data; created if missing.
        username: Prefix used to build the topic names.
    """
    import shutil

    root_path.mkdir(exist_ok=True, parents=True)

    @app.produces(topic=f"{username}_training_model_status")  # type: ignore
    async def to_training_model_status(
        training_model_status: TrainingModelStatus,
    ) -> TrainingModelStatus:
        print(f"to_training_model_status({training_model_status})")
        return training_model_status

    @app.consumes(topic=f"{username}_training_model_start")  # type: ignore
    async def on_training_model_start(
        msg: TrainingModelStart, app: FastKafka = app
    ) -> None:
        try:
            print(f"on_training_model_start({msg}) started")

            AccountId = msg.AccountId
            ApplicationId = msg.ApplicationId
            ModelId = msg.ModelId

            dt = datetime.now().date().isoformat()
            path = (
                root_path
                / f"AccountId-{AccountId}"
                / f"ApplicationId-{ApplicationId}"
                / f"ModelId-{ModelId}"
                / dt
            )

            training_model_status = TrainingModelStatus(
                AccountId=AccountId,
                ApplicationId=ApplicationId,
                ModelId=ModelId,
                current_step=0,
                current_step_percentage=0.0,
                total_no_of_steps=3,
            )
            await app.to_training_model_status(training_model_status)

            if path.exists():
                print(
                    f"on_training_model_start({msg}): path '{path}' exists, moving on..."
                )
            else:
                # no data for today yet, download it from clickhouse
                path.mkdir(parents=True, exist_ok=True)

                print(f"on_training_model_start({msg}): downloading data to '{path}'...")
                # NOTE(review): this is a synchronous call inside an async handler,
                # so it blocks the event loop for the whole download.
                try:
                    with using_cluster("cpu"):
                        download_account_id_rows_as_parquet(
                            account_id=AccountId,
                            application_id=ApplicationId,
                            output_path=path,
                        )
                except BaseException:
                    # An existing `path` is treated as "already downloaded" above,
                    # so a partially written directory must not survive a failure.
                    shutil.rmtree(path, ignore_errors=True)
                    raise

                print(f"on_training_model_start({msg}): data downloaded to '{path}'...")

            training_model_status = TrainingModelStatus(
                AccountId=AccountId,
                ApplicationId=ApplicationId,
                ModelId=ModelId,
                current_step=0,
                current_step_percentage=1.0,
                total_no_of_steps=3,
            )
            await app.to_training_model_status(training_model_status)

        finally:
            print(f"on_training_model_start({msg}) finished.")

In [None]:
# with monkeypatch_clickhouse_params_from_env_vars():

kafka_brokers = dict(
    localhost={
        "url": "localhost",
        "port": 9092,
    },
    staging={
        "url": environ["KAFKA_HOSTNAME"],
        "port": environ["KAFKA_PORT"],
        "description": "Staging Kafka broker",
        "protocol": "kafka-secure",
        "security": {"type": "scramSha256"},
    }
)
app = FastKafka(kafka_brokers=kafka_brokers)

with TemporaryDirectory(prefix="infobip_downloader_") as d:
    root_path = Path(d)

    add_download_training_data(app, root_path=root_path)

    tester = Tester(app)

    async with tester:
        AccountId = 317238
        ModelId = "10051"
        ApplicationId = "MobileApp"

        training_model_start = TrainingModelStart(
            AccountId=AccountId,
            ApplicationId=ApplicationId,
            ModelId=ModelId,
            no_of_records=1_000,
            task_type="churn",
        )

        heading(f"tester.to_infobip_training_model_start({training_model_start})")

        await tester.to_infobip_training_model_start(training_model_start)

        heading(
            f"tester.awaited_mocks.on_infobip_training_model_status.assert_called()"
        )

        await tester.awaited_mocks.on_infobip_training_model_status.assert_called(
            timeout=30
        )

        dt = datetime.now().date().isoformat()
        data_path = (
            root_path
            / f"AccountId-{AccountId}"
            / f"ApplicationId-{ApplicationId}"
            / f"ModelId-{ModelId}"
            / dt
            / "part.0.parquet"
        )
        assert data_path.exists(), data_path


print("ok")

In [None]:
# export


def create_app(*, root_path: Optional[Path]=None, group_id: Optional[str] = None) -> FastKafka:
    """Create a FastKafka application with the training-data downloader attached.

    Args:
        root_path: Directory for downloaded data; defaults to ``./<group_id>``.
        group_id: Kafka consumer group id; defaults to
            ``infobip-downloader-XXX-XXX-XXX`` built from a random 9-digit number.

    Returns:
        The application, with logging and the download handlers registered.
    """
    if group_id is None:
        # e.g. 123456789 -> "123,456,789" -> "123-456-789"
        suffix = f"{random.randint(100_000_000, 999_999_999):0,d}".replace(",", "-")  # nosec: B311:blacklist
        group_id = f"infobip-downloader-{suffix}"

    print(f"{group_id=}")

    app = create_fastkafka_application(group_id=group_id)

    add_logging(app)
    add_download_training_data(
        app,
        root_path=Path(".") / group_id if root_path is None else root_path,
    )

    return app

In [None]:
# skip


app = create_app(group_id=downloading_group_id)

app.set_kafka_broker("staging")

async with app:
    while(True):
        await asyncio.sleep(1)