## ClickHouse Event Monitoring and GET Requests

> Developed by [@edyatl](https://github.com/edyatl) January 2024 <edyatl@yandex.ru>

**Update 2024-04-16:**
- Incoming events are handled in batches of about 1000 items.
- Consecutive batches may partially overlap; only events not already seen in previous batches (matched by `af_sub1` and `event_name` in the SQLite cache) are processed further.
- Each batch is filtered down to its new events before transmission, so every event is sent only once.
- The filtered events are forwarded to the external system with GET requests.
- The dedicated cancelled-trials handling has been permanently removed; *trial_renewal_cancelled* events are now forwarded like all other events.
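A minimal in-memory sketch of the batch filtering described above, assuming events are plain dicts and that an event is identified by the pair (`af_sub1`, `event_name`), the same key the notebook's SQLite cache checks. The function name `filter_new_events` is hypothetical; the notebook itself persists the seen keys in SQLite rather than in a Python set.

```python
from __future__ import annotations

from typing import Iterable


def filter_new_events(batch: Iterable[dict], seen: set[tuple[str, str]]) -> list[dict]:
    """Return only events whose (af_sub1, event_name) key was not seen before.

    `seen` is updated in place, so it carries state from one batch to the next.
    """
    new_events = []
    for event in batch:
        key = (event["af_sub1"], event["event_name"])
        if key in seen:
            continue  # already handled in an earlier batch or earlier in this one
        seen.add(key)
        new_events.append(event)
    return new_events
```

Passing the same `seen` set to consecutive calls means an overlapping event from a later batch is dropped, while a different event type for the same `af_sub1` still passes.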

**Update 2024-04-09:**
- Added a bypass for cancelled-trials processing to keep event tracking accurate.
- The secret path of the base URL can now be set in the configuration settings for better security.
- Added the `FINAL` modifier to the `FROM` clause of the SQL query, so that ClickHouse merges rows at query time and returns them without duplicates.
- These updates enhance the reliability, security, and performance of the clickhouse_event_checker tool.
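One way to keep the secret path out of the code is to store the host and the path as separate settings and join them when the configuration is loaded. The sketch below assumes that split; `build_base_url` is a hypothetical helper, not part of the notebook's `config` module.

```python
from urllib.parse import urljoin


def build_base_url(host: str, secret_path: str) -> str:
    """Join a host URL and a secret path (both read from settings, by assumption)."""
    # A trailing slash on the host makes urljoin append the path
    # instead of replacing the host's last path segment
    return urljoin(host.rstrip("/") + "/", secret_path.strip("/"))
```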

**Update 2024-02-22:** Added GET requests for the *trial_renewal_cancelled* event.

**Update 2024-02-14:** When an *af_start_trial* event arrives, we wait 1 hour from its *event_time*. If a *trial_renewal_cancelled* event arrives for the same id (*af_sub1*) within that hour, we do nothing; otherwise we send the GET request as usual.
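The 2024-02-14 rule (later removed) can be expressed as a small filter that runs on each pass. This sketch assumes trials are given as a mapping from `af_sub1` to the *event_time* of the *af_start_trial* event, and that every cancellation seen so far is collected in `cancelled_ids`; the function name `trials_to_report` is hypothetical.

```python
from __future__ import annotations

from datetime import datetime, timedelta

HOLD = timedelta(hours=1)  # waiting window from the 2024-02-14 update


def trials_to_report(
    trials: dict[str, datetime], cancelled_ids: set[str], now: datetime
) -> list[str]:
    """Return af_sub1 ids of trials whose hold window passed without a cancellation."""
    ready = []
    for af_sub1, event_time in trials.items():
        if now - event_time < HOLD:
            continue  # still inside the 1-hour window; decide on a later pass
        if af_sub1 in cancelled_ids:
            continue  # a cancellation arrived for this id: do nothing
        ready.append(af_sub1)
    return ready
```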

In [1]:
# Load the Jupyter extension that auto-formats cells with Black
%load_ext nb_black

In [2]:
# Install a pip package in the current Jupyter kernel
# import sys

# !{sys.executable} -m pip install -U python-dotenv

In [3]:
import os
import time

from datetime import datetime, timedelta
import json
import sqlite3 as sql
import requests
import pandas as pd

import clickhouse_connect

from config import Configuration as cfg

In [4]:
cfg.DEBUG = True
from logger import get_cls_logger

In [5]:
# !cat clickhouse_event_checker.py

In [6]:
class ClickHouseConnector:
    """Class to connect ClickHouse DWH and fetch events."""

    logger = get_cls_logger(__qualname__)
    json_file_path = cfg.JSON_FILE

    def __init__(self, **kwargs):
        """
        Constructor func, gets credentials and makes an instance.
        """
        self.host = kwargs.get("host") or ""
        self.user = kwargs.get("user") or ""
        self.password = kwargs.get("password") or ""
        self.port = kwargs.get("port") or ""

        self.client = clickhouse_connect.get_client(
            host=self.host, user=self.user, password=self.password, port=self.port
        )

        self.query_str = """SELECT created, event_time, event_name, af_sub1
            FROM analytics.appsflyer_export FINAL
            WHERE media_source = 'Popunder' 
                AND created > {prev_last_created:datetime}
                AND event_name IN ('install', 'af_start_trial', 'af_subscribe', 'trial_renewal_cancelled')
            ORDER BY event_time DESC"""

        # Restore the last processed "created" value from the previous run;
        # fall back to one week ago on the first run or if the key is missing
        self.prev_last_created = datetime.now() - timedelta(weeks=1)
        if os.path.exists(self.json_file_path):
            with open(self.json_file_path, "r", encoding="utf-8") as file:
                stored_values = json.load(file)
            if stored_values.get("prev_last_created"):
                self.prev_last_created = datetime.fromisoformat(
                    stored_values["prev_last_created"]
                )
        self.logger.debug("Make an instance of %s class", self.__class__.__name__)

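    def __del__(self):
        """
        Destructor func, closes connection.
        """
        # The client may be missing if get_client() raised in __init__
        if getattr(self, "client", None) is not None:
            self.client.close()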

    def fetch_new_events(self) -> pd.DataFrame:
        """
        Fetches new events from ClickHouse DWH.
        """
        parameters = {"prev_last_created": self.prev_last_created}
        result = self.client.query(self.query_str, parameters=parameters)
        df = pd.DataFrame(result.result_rows, columns=result.column_names)
        if df.empty:
            return pd.DataFrame()
        with open(self.json_file_path, "w", encoding="utf-8") as file:
            json.dump({"prev_last_created": str(df["created"].max())}, file)
        return df

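`ClickHouseConnector` keeps a high-water mark of the `created` column in a JSON file so that each run fetches only rows newer than the previous one. Below is a sketch of that pattern as two standalone helpers with hypothetical names. Unlike the notebook, which writes the JSON file in place, it writes to a temporary file first and then renames it with `os.replace`, so an interrupted write cannot leave a half-written state file.

```python
import json
import os
from datetime import datetime


def load_watermark(path: str, default: datetime) -> datetime:
    """Read the last processed `created` value, or return `default`."""
    if not os.path.exists(path):
        return default
    with open(path, "r", encoding="utf-8") as file:
        value = json.load(file).get("prev_last_created")
    return datetime.fromisoformat(value) if value else default


def save_watermark(path: str, value: datetime) -> None:
    """Write the watermark to a temp file, then swap it in with one rename."""
    tmp_path = path + ".tmp"
    with open(tmp_path, "w", encoding="utf-8") as file:
        json.dump({"prev_last_created": value.isoformat()}, file)
    os.replace(tmp_path, path)  # replaces any existing state file
```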

In [7]:
dwh = ClickHouseConnector(
    host=cfg.CLICKHOUSE_HOST,
    user=cfg.CLICKHOUSE_USER,
    password=cfg.CLICKHOUSE_PASS,
    port=cfg.CLICKHOUSE_PORT,
)

df = dwh.fetch_new_events()

del dwh

In [8]:
# df[df["event_name"].str.contains("start_trial")]
# df[df["af_sub1"] == "483bfgxtwj652wj656"]
# df["event_time"] = pd.to_datetime(df["event_time"])
# df["event_time"] = df["event_time"].dt.strftime("%Y-%m-%d %H:%M:%S")
df[:]
# df["created"].drop_duplicates()
# df["created"].max()
# filtered_df = df[df["event_time"].dt.date == pd.to_datetime("2024-04-10").date()]
# filtered_df[filtered_df["af_sub1"] == "e24cdgx9l1zci3yeb5"]
# filtered_df = df.drop(columns=["created", "event_name", "af_sub1"])
# duplicates = filtered_df[filtered_df.duplicated()]
# duplicates
# filtered_df.drop_duplicates()

|      | created                   | event_time          | event_name              | af_sub1            |
|------|---------------------------|---------------------|-------------------------|--------------------|
| 0    | 2024-04-16 16:30:01+00:00 | 2024-04-16 22:04:53 | af_subscribe            | be623gxj6hoqebl330 |
| 1    | 2024-04-16 16:30:01+00:00 | 2024-04-16 21:53:47 | af_subscribe            | 5682bgxtw3zibwj2e0 |
| 2    | 2024-04-16 16:30:01+00:00 | 2024-04-16 21:28:23 | af_subscribe            | b2072gxj6yd3zi421a |
| 3    | 2024-04-16 16:30:01+00:00 | 2024-04-16 19:21:29 | af_subscribe            | 2ed62gxj6a6ghblaf4 |
| 4    | 2024-04-16 16:30:01+00:00 | 2024-04-16 18:47:49 | af_subscribe            | 0f9c8gxdvslk28nf20 |
| ...  | ...                       | ...                 | ...                     | ...                |
| 1458 | 2024-04-16 16:30:01+00:00 | 2024-04-15 00:00:51 | af_start_trial          | 87a45gxp2h9a108e5  |
| 1459 | 2024-04-16 16:30:01+00:00 | 2024-04-15 00:00:41 | install                 | 66eecgxp2h9gxvrec4 |
| 1460 | 2024-04-16 16:30:01+00:00 | 2024-04-15 00:00:34 | trial_renewal_cancelled | f67f2gxp2a8p28n45b |
| 1461 | 2024-04-16 16:30:01+00:00 | 2024-04-15 00:00:26 | install                 | 65fecgxp217fv8ne52 |

In [9]:
CURRENT_DATE = datetime.now().strftime("%Y-%m-%d")
csv_file = CURRENT_DATE + "-all.csv"
# filtered_df.to_csv(csv_file)

In [10]:
# !cat 2024-04-09-trials.csv

In [11]:
class EventProcessor:
    """Class for processing events."""

    BASE_URL = cfg.BASE_URL
    logger = get_cls_logger(__qualname__)

    def __init__(self, **kwargs):
        """
        Constructor func, gets events DataFrame and makes an instance.
        Trying to connect db or creating it if not exists.
        """
        self.events_df = kwargs.get("events")
        self.install = self.events_df[self.events_df["event_name"] == "install"]
        self.trial = self.events_df[self.events_df["event_name"] == "af_start_trial"]
        self.trial_cancelled = self.events_df[
            self.events_df["event_name"] == "trial_renewal_cancelled"
        ]
        self.activation = self.events_df[self.events_df["event_name"] == "af_subscribe"]
        self.logger.debug("Make an instance of %s class", self.__class__.__name__)

        self._create_db_if_not_exists()

    def _create_db_if_not_exists(self):
        """Create SQLite database if not exists."""
        with sql.connect(cfg.DB_FILE, timeout=10) as con:
            db = con.cursor()
            try:
                self.logger.debug("Try to connect sqlite db")
                db.execute("SELECT id FROM cachetab")
            except sql.OperationalError:
                self.logger.debug("Sqlite db does not exist, creating it from schema")
                # Close the schema file after reading it
                with open(cfg.SCHEMA_FILE, "rt", encoding="utf-8") as schema:
                    db.executescript(schema.read())

    def _save_event_to_db(
        self, event_time: datetime, event_name: str, af_sub1: str
    ) -> bool:
        """
        Save event to cache db.
        """
        payload = {
            "date": datetime.now(),
            "event_time": event_time,
            "event_name": event_name,
            "af_sub1": af_sub1,
        }

        with sql.connect(cfg.DB_FILE, timeout=10) as con:
            db = con.cursor()
            db.execute(
                "SELECT id FROM cachetab WHERE af_sub1=:af_sub1 AND event_name=:event_name",
                payload,
            )

            if len(db.fetchall()) == 0:
                db.execute(
                    "INSERT INTO cachetab (date, event_time, event_name, af_sub1)"
                    "values (:date, :event_time, :event_name, :af_sub1)",
                    payload,
                )
                try:
                    con.commit()
                    self.logger.debug(
                        "New record (%s) inserted in db", payload["af_sub1"]
                    )
                    return True
                except sql.OperationalError as err:
                    self.logger.error("OOps: Operational Error: %s", err)
                    return False
            else:
                self.logger.debug("Record has already in db, skipping")
                return False

    def remove_event_from_db(self, af_sub1: str) -> None:
        """
        Remove event from cache db.
        """
        with sql.connect(cfg.DB_FILE, timeout=10) as con:
            db = con.cursor()
            db.execute(
                "DELETE FROM cachetab WHERE af_sub1=:af_sub1", {"af_sub1": af_sub1}
            )
            try:
                con.commit()
                self.logger.debug("Record (%s) removed from db", af_sub1)
            except sql.OperationalError as err:
                self.logger.error("OOps: Operational Error: %s", err)
                return

    def _process_new_events(self, events_df: pd.DataFrame) -> pd.DataFrame:
        """
        Save events to cache db and return only those not cached before.
        """
        # _save_event_to_db returns True only when the row was newly inserted
        is_new = [
            self._save_event_to_db(
                row["event_time"], row["event_name"], row["af_sub1"]
            )
            for _, row in events_df.iterrows()
        ]
        return events_df.loc[is_new].reset_index(drop=True)

    def remove_old_events(self):
        """
        Remove old events from cache db.
        """
        week_ago = datetime.now() - timedelta(weeks=1)
        with sql.connect(cfg.DB_FILE, timeout=10) as con:
            db = con.cursor()
            db.execute(
                "DELETE FROM cachetab WHERE event_time < :week_ago",
                {"week_ago": week_ago},
            )
            try:
                con.commit()
                self.logger.debug("Old records removed from db")
            except sql.OperationalError as err:
                self.logger.error("OOps: Operational Error: %s", err)
                return

    def process_install_events(self):
        """
        Process install events.
        """
        self.install = self._process_new_events(self.install)
        self.install_requests()

    def process_trial_events(self):
        """
        Process trial events.
        """
        self.trial = self._process_new_events(self.trial)
        self.trial_requests()

    def process_cancelled_trial_events(self):
        """
        Process cancelled trial events.
        """
        self.trial_cancelled = self._process_new_events(self.trial_cancelled)
        self.cancel_trial_requests()

    def process_activation_events(self):
        """
        Process activation events.
        """
        self.activation = self._process_new_events(self.activation)
        self.activation_requests()

    def install_requests(self):
        """
        Send requests for install events.
        """
        self.send_event_requests("install", "install", 1)

    def activation_requests(self):
        """
        Send requests for activation events.
        """
        self.send_event_requests("activation", "trial_converted", 4)

    def trial_requests(self):
        """
        Send requests for trial events.
        """
        self.send_event_requests("trial", "trial_started", 2)

    def cancel_trial_requests(self):
        """
        Send requests for cancelled trial events.
        """
        self.send_event_requests("trial_cancelled", "trial_renewal_cancelled", 6)

    def send_event_requests(self, event_name, event_status, event_number):
        """
        Send a GET request for every event of the given type.

        :param event_name: attribute holding the events DataFrame, e.g. ``install``.
        :param event_status: value of the ``cnv_status`` query parameter.
        :param event_number: N in the ``eventN=1`` query parameter.
        """
        events_df = getattr(self, event_name)
        if events_df.empty:
            return
        for af_sub1 in events_df["af_sub1"]:
            params = {
                "cnv_id": af_sub1,
                "cnv_status": event_status,
                f"event{event_number}": 1,
            }
            _, error = self._requests_call("GET", url=self.BASE_URL, params=params)
            if error is not None:
                self.logger.error(
                    "Error while transmitting event (%s): %s", af_sub1, error
                )

    def _requests_call(self, verb: str, url: str, params=None) -> tuple:
        """
        Wrapper around ``requests.request`` with retries and error handling.

        :param verb:
            str HTTP method, e.g. ``GET`` or ``POST``.
        :param url:
            str URL to connect to.
        :param params:
            Query-string parameters passed on to ``requests``.
        :return:
            A ``(response, error)`` tuple. On success ``error`` is None;
            if every retry fails, ``error`` holds the last exception.
        """
        r = None
        error = None
        retries: int = cfg.RETRIES  # default 10
        delay: int = cfg.DELAY  # default 6

        for retry in range(retries):
            try:
                self.logger.debug("Try %s request %s", verb, url)
                r = requests.request(verb, url, params=params)
                r.raise_for_status()
                self.logger.debug(
                    "Got answer with status code: %s %s", r.status_code, r.reason
                )
                # Return None explicitly: an error from an earlier failed
                # attempt must not be reported after a successful retry
                return r, None
            except requests.exceptions.RequestException as err:
                # HTTPError, ConnectionError and Timeout all subclass
                # RequestException, so one handler covers every case
                self.logger.error("%s: %s", type(err).__name__, err)
                error = err
                self.logger.debug(
                    "Don't give up! Trying to reconnect, retry %s of %s",
                    retry + 1,
                    retries,
                )
                time.sleep(delay)

        return r, error
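`_save_event_to_db` runs a `SELECT` and then an `INSERT`, which leaves a gap in which two concurrent runs could both insert the same event. A sketch of an alternative that lets SQLite enforce uniqueness with `INSERT OR IGNORE`, assuming a `UNIQUE (af_sub1, event_name)` constraint; the `SCHEMA` below is hypothetical and may differ from the notebook's actual schema file.

```python
import sqlite3

# Hypothetical schema: the UNIQUE constraint is what makes duplicates impossible
SCHEMA = """
CREATE TABLE IF NOT EXISTS cachetab (
    id INTEGER PRIMARY KEY,
    date TEXT,
    event_time TEXT,
    event_name TEXT NOT NULL,
    af_sub1 TEXT NOT NULL,
    UNIQUE (af_sub1, event_name)
)
"""


def save_if_new(
    con: sqlite3.Connection, event_time: str, event_name: str, af_sub1: str
) -> bool:
    """Insert the event unless (af_sub1, event_name) is already cached.

    Returns True only when a new row was written.
    """
    cur = con.execute(
        "INSERT OR IGNORE INTO cachetab (date, event_time, event_name, af_sub1) "
        "VALUES (datetime('now'), ?, ?, ?)",
        (event_time, event_name, af_sub1),
    )
    con.commit()
    return cur.rowcount == 1  # 0 when the UNIQUE constraint skipped the row
```

Because the existence check and the write are one statement, the separate `SELECT` is no longer needed.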

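`_requests_call` waits a fixed `cfg.DELAY` between attempts. A common alternative is exponential backoff, sketched below as a generic helper; `call_with_backoff` and its default values are assumptions, not settings taken from the notebook's configuration.

```python
import time
from typing import Callable, TypeVar

T = TypeVar("T")


def call_with_backoff(
    func: Callable[[], T],
    retries: int = 10,
    base_delay: float = 1.0,
    max_delay: float = 60.0,
    retry_on: tuple = (Exception,),
    sleep: Callable[[float], None] = time.sleep,
) -> T:
    """Call func(), retrying on `retry_on` with exponentially growing delays."""
    for attempt in range(retries):
        try:
            return func()
        except retry_on:
            if attempt == retries - 1:
                raise  # out of retries: surface the last error
            # 1 s, 2 s, 4 s, ... capped at max_delay; no sleep after the last attempt
            sleep(min(base_delay * 2**attempt, max_delay))
    raise ValueError("retries must be at least 1")
```

With `requests`, `func` could be a small function that calls `requests.request(...)`, then `raise_for_status()`, and returns the response, combined with `retry_on=(requests.exceptions.RequestException,)`. Since `requests` does not time out unless a `timeout=` is passed, a `Timeout` error can only occur when that argument is set.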

In [12]:
if not df.empty:
    evs = EventProcessor(events=df)

    evs.process_install_events()
    evs.process_activation_events()
    evs.process_trial_events()
    evs.process_cancelled_trial_events()

    evs.remove_old_events()

#     evs.install_requests()
#     evs.trial_requests()
#     evs.activation_requests()
#     evs.process_new_trials()
#     evs.process_cancelled_trials()
#     evs.cancel_trial_requests()

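The four `process_*` calls differ only in which events they select and which `cnv_status`/`eventN` parameters they send. Below is a table-driven sketch of the resulting GET URL, with the mapping taken from the `*_requests` methods of `EventProcessor`. `build_postback_url` is hypothetical and assumes `BASE_URL` carries no query string of its own.

```python
from urllib.parse import urlencode

# event_name -> (cnv_status, event number), as in EventProcessor's *_requests methods
EVENT_MAP = {
    "install": ("install", 1),
    "af_start_trial": ("trial_started", 2),
    "af_subscribe": ("trial_converted", 4),
    "trial_renewal_cancelled": ("trial_renewal_cancelled", 6),
}


def build_postback_url(base_url: str, event_name: str, af_sub1: str) -> str:
    """Build the GET URL that send_event_requests would request for one event."""
    cnv_status, number = EVENT_MAP[event_name]
    query = urlencode(
        {"cnv_id": af_sub1, "cnv_status": cnv_status, f"event{number}": 1}
    )
    return f"{base_url}?{query}"
```

`requests` builds the same query string from the `params` mapping, so a helper like this is mainly useful for logging or testing the exact URL being sent.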