# Capturing Data Change of MySQL from Binlog and passing to Kafka

### importing necessary libraries

In [1]:
import pymysql
from pymysqlreplication import BinLogStreamReader
from pymysqlreplication.row_event import DeleteRowsEvent,UpdateRowsEvent,WriteRowsEvent
from pymysqlreplication.event import QueryEvent,RotateEvent

from confluent_kafka import Producer
from confluent_kafka.admin import AdminClient,NewTopic

import json
import logging
import re
import os
import time
import copy
import subprocess
import socket
import threading
import smtplib
from email.mime.text import MIMEText
from typing import Tuple, Optional,List
from datetime import datetime,date,timedelta,time as dt_time

### configurations used in our code
- Change these to match your environment: `bootstrap.servers`, `zookeeper_cnf`, `mysql_cnf`, and the file paths.

In [2]:
# Kafka client configuration for a 3-node cluster.
# This same dict is handed to both Producer(**kafka_cnf) and AdminClient(kafka_cnf) further down.
kafka_cnf = {
    'bootstrap.servers': 'localhost:9093,localhost:9094,localhost:9095',  # Kafka broker addresses
    'acks': 'all',                                    # Wait for all in-sync replicas to acknowledge the record, ensuring durability
    'enable.idempotence': True,                       # Idempotent producer: retried sends should not create duplicate records
    'retries': 3,                                     # Number of retries used to ride out transient failures
    'retry.backoff.ms': 50,                           # Short backoff between retries to keep latency low on transient failures
    'delivery.timeout.ms': 2000,                      # Maximum time to wait for a message delivery, including retries
    'request.timeout.ms': 1000,                       # Short request timeout so unresponsive brokers fail fast
    'max.in.flight.requests.per.connection': 5,       # Allows multiple requests to be in-flight to maximize throughput
    'batch.size': 65536,                              # Larger batches accumulate more messages per request, reducing overhead
    'linger.ms': 1,                                   # Time to wait before sending a batch; gives the batch a chance to fill
    # 'buffer.memory': 67108864,                      # Disabled. NOTE(review): looks like a Java-client setting name - confirm the client supports it before enabling
    'compression.type': 'gzip',                       # Compress data sent over the network; 'snappy' is a faster alternative
}



# Zookeeper configuration for 3-node cluster (one dict per node, consumed by check_zookeeper_cluster_health)
zookeeper_cnf = [
    {'host': 'localhost', 'port': 2181},          # Zookeeper server host/port; see your zookeeper.properties file
    {'host': 'localhost', 'port': 2182},
    {'host': 'localhost', 'port': 2183}
]

# MySQL connection settings (used for the binlog stream, column lookups and health checks)
# NOTE(review): the password is hard-coded here; consider reading it from an environment
# variable, as is already done for the email credentials below.
mysql_cnf={
    'host':'localhost',       # MySQL server host, port number, user name and user password
    'port':3308,
    'user':'root',
    'passwd':'Root@123'
}


# server_id passed to BinLogStreamReader; change it to yours
mysql_server_id=2

# binlog file to start reading from when no checkpoint is available; change as per your requirements
binlog_starting_file='binlog.000001'  

# log file path
log_file_path='/media/susan/F4707AD3707A9BD4/MysqlCdc_Python_kafka/3_node_kafka_and_3_node_zookeeper/node1/cdc.log'
# checkpoint file path (stores the last processed binlog file and position as JSON)
checkpoint_file = '/media/susan/F4707AD3707A9BD4/MysqlCdc_Python_kafka/3_node_kafka_and_3_node_zookeeper/node1/checkpoint.json'

# path of the Kafka CLI tool intended for checking the status of Kafka
# NOTE(review): check_kafka_cluster_health currently invokes the bare command name
# 'kafka-broker-api-versions' and does not read this variable - confirm which is intended.
kafka_broker_api_versions='/media/susan/F4707AD3707A9BD4/MysqlCdc_Python_kafka/single_node_kafka_and_zookeeper/bin/kafka-broker-api-versions'

# for email alerting, change it to yours
sender_email = os.getenv('EMAIL_USER')    # sender address, read from the environment
receiver_email = os.getenv('EMAIL_USER')  # receiver address; here the same environment variable as the sender
password = os.getenv('EMAIL_PASSWORD')    # sender's password; an app password is recommended

### initializing logging configuration

In [3]:
# setting up custom logger
logger = logging.getLogger('my_logger')     # you can give any name as per your requirement
logger.setLevel(logging.INFO)               # minimum level: INFO and anything more severe (WARNING, ERROR, CRITICAL) is logged; DEBUG is dropped

# setting custom File handler that writes log messages to log_file_path
file_handler = logging.FileHandler(log_file_path)
file_handler.setLevel(logging.INFO)         # minimum level accepted by the file handler

# setting custom Formatter to define the log message format
formatter = logging.Formatter('%(asctime)s - %(levelname)s - %(message)s')  # format: time_of_logging - level_of_log - actual_message

# adding that custom formatter to file handler
file_handler.setFormatter(formatter)

logger.addHandler(file_handler)

# setting propagate to False so records are not passed on to the root logger,
# which may otherwise also print them to the console
logger.propagate = False


### this function is responsible for reading binlog from mysql

In [4]:
def read_binlog(log_file: str, log_pos: int, retries: int = 3, backoff: int = 3):

    """
    Function to open a MySQL binlog stream starting from a specific file and position, with retry logic.

    Args:
    - log_file (str): The name of the binlog file to read from.
    - log_pos (int): The position in the binlog file to start reading from.
    - retries (int): Number of attempts before giving up (default: 3).
    - backoff (int): Time (in seconds) to wait between attempts (default: 3 seconds).

    Returns:
    - BinLogStreamReader object if successful.
    - None if every attempt failed (the failure is logged); callers must handle this case.
    """

    for attempt in range(1, retries + 1):
        try:
            # initializing a BinLogStreamReader to read binlog events
            stream = BinLogStreamReader(
                connection_settings=mysql_cnf,
                server_id=mysql_server_id,
                blocking=True,
                resume_stream=True,
                only_events=[WriteRowsEvent, UpdateRowsEvent, DeleteRowsEvent, QueryEvent, RotateEvent],
                log_file=log_file,
                log_pos=log_pos
            )

            logger.info(f"Successfully read binlog from file '{log_file}' at position '{log_pos}'.")
            return stream
        except Exception as e:
            # Record the underlying cause; without it a failed start cannot be diagnosed from the log.
            logger.error(f"Failed to read binlog from '{log_file}' at position '{log_pos}': {e}")

            if attempt < retries:
                logger.info(f"Retrying to read binlog from '{log_file}' at position '{log_pos}'... (Attempt {attempt}/{retries})")
                time.sleep(backoff)
            else:
                logger.error(f"Max retries reached. Binlog reading from '{log_file}' at position '{log_pos}' failed.")

    # All attempts failed (or retries <= 0): make the "no stream" result explicit.
    return None


### following two functions are responsible for checkpoint mechanism
- load_checkpoint() is responsible for loading last saved binlog file and position
- save_checkpoint() is responsible for saving last processed binlog file and position

In [5]:
def load_checkpoint() -> Tuple[Optional[str], Optional[int]]:

    """
    Function to load the last saved binlog position (checkpoint) from a file (checkpoint_file).

    Returns:
    - A tuple (log_file, log_pos) if a valid checkpoint is found.
    - (None, None) if the checkpoint file is missing, unreadable, empty, or corrupted
      (invalid JSON, missing keys, or values of the wrong type).
    """

    # Read the file directly instead of checking for existence first: this avoids a
    # race between the check and the open.
    try:
        with open(checkpoint_file, 'r') as f:
            raw = f.read()
    except FileNotFoundError:
        logger.info("No valid checkpoint found. Starting from the beginning.")
        return None, None
    except OSError as e:
        logger.error(f"Checkpoint file could not be read: {e}. Starting from the beginning.")
        return None, None

    if not raw:
        # The file exists but is empty
        logger.info("No valid checkpoint found. Starting from the beginning.")
        return None, None

    try:
        checkpoint = json.loads(raw)
        log_file = checkpoint['log_file']
        log_pos = checkpoint['log_pos']
    except (json.JSONDecodeError, KeyError, TypeError):
        # Invalid JSON, a JSON value that is not an object, or an object lacking the expected keys
        logger.error("Checkpoint file is corrupted. Starting from the beginning.")
        return None, None

    if not isinstance(log_file, str) or not isinstance(log_pos, int):
        # Structurally valid JSON whose values cannot be used as a binlog coordinate
        logger.error("Checkpoint file is corrupted. Starting from the beginning.")
        return None, None

    return log_file, log_pos
        

def save_checkpoint(log_file: str, log_pos: int) -> None:

    """
    Function to save the current binlog file and position to a checkpoint file.

    The checkpoint is written to a temporary file next to checkpoint_file and then
    moved into place with os.replace, so an interruption during the write cannot
    leave a truncated or empty checkpoint behind.

    Args:
    - log_file (str): The current binlog file name.
    - log_pos (int): The current position in the binlog file.
    """

    # creating a dictionary to store the log file and position
    checkpoint = {
        'log_file': log_file,
        'log_pos': log_pos
    }

    # writing the checkpoint data to a temporary file in JSON format
    tmp_path = f"{checkpoint_file}.tmp"
    with open(tmp_path, 'w') as f:
        json.dump(checkpoint, f)   # serializing the dictionary into json format and writing into file
        f.flush()
        os.fsync(f.fileno())       # make sure the bytes are on disk before the rename

    # swapping the finished file into place in a single step
    os.replace(tmp_path, checkpoint_file)
    logger.info(f"Checkpoint saved: log_file='{log_file}', log_pos={log_pos}")


### following functions:
- sanitize_topic_name() is responsible for producing a valid Kafka topic name
- create_topic_if_not_exists() is responsible for creating Kafka topics dynamically/automatically
- datetime_to_str() is responsible for converting date/time-related values into strings, because JSON serialization does not support date/time types

In [6]:
def sanitize_topic_name(topic_name: str, max_length: int = 249) -> str:

    """
    Function to sanitize Kafka topic names by replacing invalid characters.

    Args:
    - topic_name (str): The original topic name.
    - max_length (int): Maximum length of the returned name. Defaults to 249,
      the longest topic name Kafka accepts.

    Returns:
    - sanitized_name (str): The sanitized topic name, truncated to max_length characters.
    """

    # replacing all characters that are not allowed in Kafka topic names (anything except letters, digits, '_', '-', or '.') with underscores.
    sanitized_name = re.sub(r'[^a-zA-Z0-9._-]', '_', topic_name)
    return sanitized_name[:max_length]



def create_topic_if_not_exists(database: str, table: str, num_partitions: int = 1, replication_factor: int = 2) -> None:

    """
    Function to create a Kafka topic if it does not already exist.

    Args:
    - database (str): The name of the database.
    - table (str): The name of the table.
    - num_partitions (int): Number of partitions for a newly created topic (default: 1).
    - replication_factor (int): Replication factor for a newly created topic (default: 2).

    The topic name is "<database>.<table>" passed through sanitize_topic_name().
    If the topic is not in the cluster's topic list it is created, and the outcome
    is logged. A failure to create the topic is logged and not re-raised; a failure
    to list the existing topics propagates to the caller.
    """

    topic_name = sanitize_topic_name(f"{database}.{table}")  # ensuring topic names are sanitized
    admin_client = AdminClient(kafka_cnf)                    # initializing AdminClient to manage kafka topics

    # retrieving the existing kafka topics (mapping of topic name -> metadata)
    existing_topics = admin_client.list_topics(timeout=10).topics

    # nothing to do when the topic already exists
    if topic_name in existing_topics:
        logger.info(f"Topic '{topic_name}' already exists.")
        return

    try:
        # creating a new kafka topic
        new_topic = admin_client.create_topics(
            [NewTopic(topic_name, num_partitions=num_partitions, replication_factor=replication_factor)]
        )
        # create_topics() returns one future per topic; result() raises if the creation failed
        new_topic[topic_name].result()
        logger.info(f"Topic '{topic_name}' is created.")
    except Exception as e:
        logger.error(f"Error creating topic '{topic_name}': {e}")
     


def datetime_to_str(dt):

    """
    Function to convert various datetime-related objects to a string format.

    Args:
    - dt: The datetime-related object to convert.

    Returns:
    - datetime / date: ISO 8601 string (via isoformat()).
    - time: 'HH:MM:SS'.
    - timedelta: '[-]HH:MM:SS' using whole seconds; the hours field is not capped
      at 24, and negative durations get a leading '-'.
    - anything else: returned unchanged.
    """

    if isinstance(dt, (datetime, date)):  # full datetime, or date without time
        return dt.isoformat()             # returns ISO 8601 string format
    if isinstance(dt, dt_time):           # Time only (hours, minutes, seconds)
        return dt.strftime('%H:%M:%S')    # Format time as string
    if isinstance(dt, timedelta):         # Handle timedelta
        total_seconds = int(dt.total_seconds())
        # Format the magnitude and add the sign separately: divmod() on a negative
        # number floors towards -infinity and would yield e.g. '-1:59:59' for -1 second.
        sign = '-' if total_seconds < 0 else ''
        hours, remainder = divmod(abs(total_seconds), 3600)
        minutes, seconds = divmod(remainder, 60)
        return f"{sign}{hours:02}:{minutes:02}:{seconds:02}"  # Formatting as [-]HH:MM:SS
    return dt  # Return unchanged if not a datetime, date, time, or timedelta object

### following functions:
- initialize_producer() is responsible for initializing kafka producer
- produce_message() is responsible for producing messages to kafka topics

In [7]:
producer = None  # module-level producer handle; stays None until main() assigns it


def initialize_producer(retries: int = 5, base_backoff: int = 2) -> Producer:

    """
    Build the Kafka producer from kafka_cnf, retrying with exponential backoff.

    Args:
        retries: Maximum number of construction attempts.
        base_backoff: Base wait in seconds; the wait doubles after each failed attempt.

    Returns:
        Producer: The constructed Kafka producer.

    Raises:
        Exception: The error from the final attempt, re-raised after a critical
        log entry and an alert email.
    """

    for attempt in range(1, retries + 1):
        try:
            kafka_producer = Producer(**kafka_cnf)  # constructing the kafka producer
            logger.info("Kafka producer initialized successfully.")
            return kafka_producer
        except Exception as e:
            logger.error(f"Failed to initialize producer: {e}")

            if attempt >= retries:
                # out of attempts: alert and hand the error to the caller
                logger.critical("Max retries reached for producer initialization.")
                send_email_alert('Failed Producer initialization Alert','Failed to initialize producer and max retires reached to initialization')
                raise

            # exponential backoff: base, 2*base, 4*base, ...
            backoff_time = base_backoff * (2 ** (attempt - 1))
            logger.info(f"Retrying to initialize producer in {backoff_time} seconds... ({attempt}/{retries})")
            time.sleep(backoff_time)


def produce_message(topic: Optional[str], message: Optional[str], retries: int = 3, backoff: int = 1) -> None:

    """
    Produce a message to a specified Kafka topic with retry mechanism.

    produce() only enqueues the message; the broker's verdict arrives later through
    the delivery callback, which is served while flush() runs. This function
    therefore registers a callback and treats a reported delivery error like any
    other failure, instead of assuming success once flush() returns.

    Args:
        topic: The Kafka topic where the message will be sent.
        message: The message content to be produced.
        retries: Number of attempts before giving up.
        backoff: Wait time in seconds before each retry.

    Returns:
        None. When every attempt fails, a critical entry is logged and an alert
        email is sent; no exception is raised.
    """

    global producer  # Ensuring to use the global producer
    if producer is None:
        logger.error("Producer not initialized. Cannot produce message.")
        return  # Exit if producer is not initialized

    delivery_errors = []

    def _on_delivery(err, _msg):
        # Called by the client with err=None on success, or an error object on failure.
        if err is not None:
            delivery_errors.append(err)

    for attempt in range(1, retries + 1):
        delivery_errors.clear()
        try:
            # producing messages to the kafka topic
            producer.produce(topic, value=message, on_delivery=_on_delivery)
            producer.flush()  # waits for outstanding deliveries and serves the callback
            if delivery_errors:
                raise RuntimeError(f"Delivery failed: {delivery_errors[0]}")
            logger.info(f"Message: '{message}' is successfully produced to topic '{topic}'.")
            return
        except Exception as e:
            logger.error(f"Failed to produce message to topic '{topic}':\n Message content: '{message} \n error:{e}'")

            # Check if the error is related to PID acquisition
            if "Failed to acquire idempotence PID" in str(e):
                logger.info("Encountered PID acquisition issue; retrying...")

            if attempt < retries:
                logger.info(f"Retrying to produce message to topic '{topic}'... Attempt {attempt} of {retries}.")
                time.sleep(backoff)
            else:
                # Every kind of failure ends here once the attempts are used up, so the
                # message is never dropped without a critical log entry and an alert.
                logger.critical("Max retries reached. Message production failed.")
                send_email_alert('Message Production Failure Alert',
                        f"Failed to produce message to topic '{topic}'.\n"
                        f"Message Content: '{message}'.\n"
                        f"Error: {e}")


### following function is responsible for extracting database and table name, we need it for extracting columns name

In [8]:
def extract_table_and_schema(query):

    """
    Extract the schema and table names from a given SQL query.

    Recognised statements: ALTER TABLE, CREATE TABLE, DROP TABLE, INSERT INTO,
    UPDATE and DELETE FROM (case-insensitive). An optional "IF EXISTS" /
    "IF NOT EXISTS" clause after the statement keyword is skipped, so it is not
    mistaken for the table name.

    Args:
        query (str): The SQL query string from which to extract schema and table names.

    Returns:
        tuple: A tuple containing the schema name and table name.
               The schema is 'unknown_schema' when the query does not qualify the table.
               If no statement is recognised, it returns ('unknown_schema', 'unknown_table').
    """

    # regex pattern to capture schema and table names from various SQL commands
    match = re.search(
        r"\b(?:ALTER\s+TABLE|CREATE\s+TABLE|DROP\s+TABLE|INSERT\s+INTO|UPDATE|DELETE\s+FROM)\s+"
        r"(?:IF\s+(?:NOT\s+)?EXISTS\s+)?"                  # optional IF [NOT] EXISTS clause
        r"(?:`?(?P<schema>\w+)`?\.)?`?(?P<table>\w+)`?",   # [schema.]table, optionally backtick-quoted
        query, re.IGNORECASE)

    # If match isn't found, return default values
    if match is None:
        return 'unknown_schema', 'unknown_table'

    # the schema name if provided, otherwise 'unknown_schema'
    schema = match.group('schema') or 'unknown_schema'
    return schema, match.group('table')


### following function is responsible for extracting columns name 

In [9]:
def get_column_names_from_schema(schema: str, table: str) -> List[str]:

    """
    Look up the column names of a table from INFORMATION_SCHEMA.COLUMNS.

    Args:
        schema (str): Database (schema) the table lives in.
        table (str): Table whose columns are wanted.

    Returns:
        List[str]: Column names ordered by ORDINAL_POSITION, or an empty list
        when a pymysql.MySQLError occurs (the error is logged).
    """

    # work on a private copy so the shared mysql_cnf is never modified
    connection_settings = copy.deepcopy(mysql_cnf)
    connection_settings['database'] = schema

    try:
        # opening a connection to the mysql server for this lookup
        with pymysql.connect(**connection_settings) as conn:
            with conn.cursor() as cur:
                column_query = """
                    SELECT COLUMN_NAME 
                    FROM INFORMATION_SCHEMA.COLUMNS 
                    WHERE TABLE_SCHEMA = %s AND TABLE_NAME = %s
                    ORDER BY ORDINAL_POSITION  -- Ensures the columns are in the defined order
                """
                cur.execute(column_query, (schema, table))
                # each fetched row holds the column name as its first element
                return [record[0] for record in cur.fetchall()]

    except pymysql.MySQLError as e:
        logger.error(f"Error retrieving column names from {schema}.{table}: {e}")
        return []
    

### following functions:
- send_email_alert() is responsible for sending alert emails
- check_mysql_health() is responsible for checking the status of the MySQL server (whether it is running or not)
- check_kafka_cluster_health() is responsible for checking the status of the Kafka brokers
- check_zookeeper_cluster_health() is responsible for checking the status of the Zookeeper servers

In [10]:

def send_email_alert(subject, message):

    """
    Send an alert email over SMTP with STARTTLS.

    Args:
        subject (str): The subject line of the email.
        message (str): The content of the email message.

    The sender address, receiver address and password come from the module-level
    sender_email, receiver_email and password values. Any failure while sending
    is logged and not re-raised.
    """

    # assembling the email: body first, then the headers
    email = MIMEText(message)
    email['Subject'] = subject
    email['From'] = sender_email
    email['To'] = receiver_email

    try:
        # sending alert email
        with smtplib.SMTP('smtp.gmail.com', 587) as smtp:  # Update with your SMTP server
            smtp.starttls()
            smtp.login(sender_email, password)
            smtp.sendmail(sender_email, receiver_email, email.as_string())
            logger.info("Alert email sent successfully.")
    except Exception as e:
        logger.error(f"Failed to send email: {str(e)}")



def check_mysql_health(mysql_cnf, retries=5, delay=10):
    """
    Ping the MySQL server with `mysqladmin ping`, retrying on failure.

    Args:
        mysql_cnf (dict): MySQL connection parameters; the keys 'user', 'passwd',
            'host' and 'port' are read.
        retries (int): Maximum number of ping attempts.
        delay (int): Seconds to wait between attempts.

    Returns:
        True as soon as one ping succeeds; False once every attempt has failed
        (after an alert email and a critical log entry).
    """

    endpoint = f"{mysql_cnf['host']}:{mysql_cnf['port']}"

    # mysqladmin invocation that asks the server whether it is alive
    # NOTE(review): the password is passed as a command-line argument.
    ping_command = [
        'mysqladmin',
        '-u', mysql_cnf['user'],
        '-p' + mysql_cnf['passwd'],
        '-h', mysql_cnf['host'],
        '-P', str(mysql_cnf['port']),
        'ping',
    ]

    for attempt in range(1, retries + 1):
        try:
            # capturing stdout/stderr so the failure reason can be logged
            outcome = subprocess.run(ping_command, stdout=subprocess.PIPE, stderr=subprocess.PIPE)

            # a non-zero exit code is reported through the same path as any other failure
            if outcome.returncode != 0:
                raise Exception(outcome.stderr.decode())

            logger.info(f"MySQL server {endpoint} is healthy.")
            return True

        except Exception as e:
            logger.error(f"Attempt {attempt}: MySQL check failed for server {endpoint}: {str(e)}")

            if attempt != retries:
                logger.warning(f"MySQL server {endpoint} is still down. Retrying in {delay} seconds... (Attempt {attempt+1}/{retries})")
                time.sleep(delay)
                continue

            # last attempt failed: alert and report the server as down
            send_email_alert("MySQL Health Alert", f"MySQL server {endpoint} is down after {retries} attempts.")
            logger.critical(f"MySQL server {endpoint} is down after {retries} retries.")
            return False



def check_kafka_cluster_health(kafka_cnf, retries=5, delay=10):
    """
    Probe every Kafka broker listed in 'bootstrap.servers', retrying each one.

    Each broker is probed by running the `kafka-broker-api-versions` command
    against it; exit code 0 counts as healthy.

    Args:
        kafka_cnf (dict): Kafka configuration with 'bootstrap.servers' (comma-separated).
        retries (int): Maximum number of probe attempts per broker.
        delay (int): Seconds to wait between attempts.

    Returns:
        bool: True if at least one broker is healthy, False if all are down.
    """
    broker_results = []  # one True/False entry per broker

    for broker in kafka_cnf['bootstrap.servers'].split(','):
        for attempt in range(1, retries + 1):
            try:
                probe = subprocess.run(
                    ['kafka-broker-api-versions', '--bootstrap-server', broker],
                    stdout=subprocess.PIPE, stderr=subprocess.PIPE
                )
                # a non-zero exit code goes through the shared failure handling below
                if probe.returncode != 0:
                    raise Exception(probe.stderr.decode())

                logger.info(f"Kafka broker {broker} is healthy.")
                broker_results.append(True)
                break  # this broker is done; move on to the next one
            except Exception as e:
                logger.error(f"Attempt {attempt}: Kafka check failed for broker {broker}: {str(e)}")

                if attempt != retries:
                    logger.warning(f"Retrying Kafka broker {broker} in {delay} seconds... (Attempt {attempt+1}/{retries})")
                    time.sleep(delay)
                    continue

                # final attempt failed for this broker
                send_email_alert(f"Kafka Health Alert", f"Kafka broker {broker} is down after {retries} attempts.")
                logger.critical(f"Kafka broker {broker} is down after {retries} retries.")
                broker_results.append(False)

    # healthy as long as any single broker responded
    return any(broker_results)

                    

def check_zookeeper_cluster_health(zookeeper_cnf, retries=5, delay=10):
    """
    Probe every Zookeeper node with the 'ruok' command, retrying each one.

    A node counts as healthy when it answers 'imok' (compared case-insensitively).

    Args:
        zookeeper_cnf (list): Dictionaries with the 'host' and 'port' of each node.
        retries (int): Maximum number of probe attempts per node.
        delay (int): Seconds to wait between attempts.

    Returns:
        bool: True if at least one Zookeeper node is healthy, False if all are down.
    """
    node_results = []  # one True/False entry per node

    for entry in zookeeper_cnf:
        host, port = entry['host'], entry['port']

        for attempt in range(1, retries + 1):
            try:
                with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as conn:
                    conn.settimeout(5)
                    conn.connect((host, port))
                    conn.sendall(b'ruok\n')  # 'ruok' = "are you okay?"
                    reply = conn.recv(1024).decode().strip()

                # anything other than 'imok' goes through the shared failure handling below
                if reply.lower() != 'imok':
                    raise Exception(f"Unexpected Zookeeper response: {reply}")

                logger.info(f"Zookeeper server {host}:{port} is healthy.")
                node_results.append(True)
                break  # this node is done; move on to the next one
            except Exception as e:
                logger.error(f"Attempt {attempt}: Zookeeper check failed for server {host}:{port}: {str(e)}")

                if attempt != retries:
                    logger.warning(f"Retrying Zookeeper server {host}:{port} in {delay} seconds... (Attempt {attempt+1}/{retries})")
                    time.sleep(delay)
                    continue

                # final attempt failed for this node
                send_email_alert(
                    "Zookeeper Health Alert",
                    f"Zookeeper server {host}:{port} is down after {retries} attempts."
                )
                logger.critical(f"Zookeeper server {host}:{port} is down after {retries} retries.")
                node_results.append(False)

    # healthy as long as any single node responded
    return any(node_results)


### following function monitor_system_continuously() is responsible for checking the status of the servers (MySQL, Kafka, Zookeeper) every hour
- if the system is found down in 3 consecutive iterations, the checking stops; the alert emails are triggered in each of those hourly iterations (i.e. after the 3rd alert the checking is stopped)

In [11]:

def monitor_system_continuously(interval: int = 3600, max_consecutive_failures: int = 3):

    """
    Continuously monitor the system at specified intervals and stop after repeated failures.

    Args:
        interval (int): The time (in seconds) to wait between checks. Default is 3600 seconds (1 hour).
        max_consecutive_failures (int): Number of failed checks in a row after which
            monitoring stops. Default is 3.

    Each round runs the MySQL, Kafka and Zookeeper health checks (which send their
    own alert emails). A round fails when any of the three checks reports unhealthy
    or raises. A successful round resets the failure counter, so only consecutive
    failures count towards stopping.
    """

    consecutive_failures = 0

    while consecutive_failures < max_consecutive_failures:
        try:
            # checking the health of MySQL, Kafka, and Zookeeper
            mysql_health = check_mysql_health(mysql_cnf)
            kafka_health = check_kafka_cluster_health(kafka_cnf)
            zookeeper_health = check_zookeeper_cluster_health(zookeeper_cnf)

            if not (mysql_health and kafka_health and zookeeper_health):
                raise Exception("Mysql server or all 3 Zookeepers servers or all 3 Kafka servers are down.")

            consecutive_failures = 0  # a healthy round clears the failure streak
            logger.info("System's health check successful.")
        except Exception as e:
            # Only failed rounds advance the counter; counting successful rounds as
            # well would stop monitoring one failure early after any healthy round.
            consecutive_failures += 1
            logger.error(f"System monitoring failed: {e}")

        time.sleep(interval)  # Wait for the specified interval before checking again

    logger.info(f"Monitoring stopped after {consecutive_failures} consecutive failed health checks.")

# Start the monitoring thread when the program starts (runs as a side effect of executing this cell/module)
monitor_thread = threading.Thread(target=monitor_system_continuously, args=(3600,))  # interval of 3600 seconds, i.e. one check per hour
monitor_thread.daemon = True  # Ensures the thread will exit when the main program exits
monitor_thread.start()

### Main function of program

In [13]:

def _build_row_message(binlog_event, row, columns, schema, table):
    """Build the JSON payload for one changed row, or return None if there is nothing to send.

    Args:
        binlog_event: A WriteRowsEvent, UpdateRowsEvent or DeleteRowsEvent.
        row: One entry of binlog_event.rows ('values' for insert/delete,
            'before_values'/'after_values' for update).
        columns: Column names, matched to the row values by position.
        schema: Database name placed in the message.
        table: Table name placed in the message.

    Returns:
        The JSON string to publish, or None when the row carries no values.

    Raises:
        IndexError: If the row has more values than there are column names.
    """
    if isinstance(binlog_event, WriteRowsEvent):
        inserted = {columns[i]: datetime_to_str(v) for i, v in enumerate(row['values'].values())}
        if not inserted:
            return None
        return json.dumps({
            'type': 'INSERT',
            'database': schema,
            'table': table,
            "data": inserted
        })

    if isinstance(binlog_event, UpdateRowsEvent):
        before_data = {columns[i]: datetime_to_str(v) for i, v in enumerate(row['before_values'].values())}
        after_data = {columns[i]: datetime_to_str(v) for i, v in enumerate(row['after_values'].values())}
        if not after_data:
            return None
        return json.dumps({
            "type": "UPDATE",
            "database": schema,
            "table": table,
            "before": before_data,
            "after": after_data
        })

    if isinstance(binlog_event, DeleteRowsEvent):
        deleted = {columns[i]: datetime_to_str(v) for i, v in enumerate(row['values'].values())}
        if not deleted:
            return None
        return json.dumps({
            "type": "DELETE",
            "database": schema,
            "table": table,
            "data": deleted
        })

    return None


def main():
    """Stream MySQL binlog events and publish them to Kafka, checkpointing progress.

    Flow:
      1. Resume from the saved checkpoint, or from binlog_starting_file at
         position 4 when no usable checkpoint exists.
      2. Initialise the global Kafka producer; return if that fails.
      3. Loop forever: open a binlog stream at the current position and handle
         rotate events, row events (INSERT/UPDATE/DELETE) and selected query
         events, saving a checkpoint after each fully handled event.

    The function returns when Kafka is found unhealthy after a topic/column
    setup error, or after 5 consecutive stream-level errors (an alert email is
    sent in that case).
    """
    global producer  # Ensuring to access the global producer

    max_errors = 5
    query_keywords = ("ALTER TABLE", "CREATE TABLE", "DROP TABLE", "INSERT INTO", "UPDATE", "DELETE FROM")

    try:
        log_file, log_pos = load_checkpoint()
    except Exception as e:
        # Do not hide why the checkpoint was unusable; fall back to the start.
        logger.warning(f"Could not load checkpoint, falling back to {binlog_starting_file}: {e}")
        log_file, log_pos = None, None

    if log_file is None or log_pos is None:
        log_file = binlog_starting_file
        log_pos = 4

    error_count = 0

    # Initializing Kafka Producer
    try:
        producer = initialize_producer()
        time.sleep(7)
    except Exception as e:
        logger.critical(f"Failed to initialize producer: {e}")
        return  # Exit if producer initialization fails

    while True:
        # Reset per attempt so `finally` never touches an unbound or stale stream
        # when read_binlog() itself raises.
        stream = None
        try:
            stream = read_binlog(log_file, log_pos)

            for binlog_event in stream:
                if isinstance(binlog_event, RotateEvent):
                    next_log_file = binlog_event.next_binlog

                    # Only a rotation to a different file moves the position;
                    # a rotate event naming the current file changes nothing.
                    if next_log_file != log_file:
                        log_file = next_log_file
                        log_pos = 4
                        save_checkpoint(log_file, log_pos)

                elif isinstance(binlog_event, (WriteRowsEvent, UpdateRowsEvent, DeleteRowsEvent)):
                    table = binlog_event.table
                    schema = binlog_event.schema

                    # Attempt to create topic and resolve the column names
                    try:
                        create_topic_if_not_exists(schema, table)
                        columns = get_column_names_from_schema(schema, table)
                    except Exception as e:
                        logger.error(f"Topic creation error: {e}")
                        # Checking Kafka health
                        if not check_kafka_cluster_health(kafka_cnf):
                            logger.critical("Kafka health check failed. Exiting...")
                            return
                        # Without column names the rows cannot be mapped; surface the
                        # real error to the retry handler instead of a later IndexError.
                        raise

                    topic = sanitize_topic_name(f"{schema}.{table}")

                    for row in binlog_event.rows:
                        message = _build_row_message(binlog_event, row, columns, schema, table)
                        if message is not None:
                            produce_message(topic, message)

                    # Checkpoint only after every row of the event was handed to the
                    # producer: log_pos is the end of the whole event, so saving it
                    # per row could skip the remaining rows after a crash.
                    log_pos = binlog_event.packet.log_pos
                    save_checkpoint(log_file, log_pos)

                elif isinstance(binlog_event, QueryEvent):
                    query = binlog_event.query
                    schema = binlog_event.schema.decode() if isinstance(binlog_event.schema, bytes) else binlog_event.schema

                    upper_query = query.upper()
                    if any(keyword in upper_query for keyword in query_keywords):
                        extracted_schema, extracted_table = extract_table_and_schema(query)
                        schema = extracted_schema if extracted_schema != 'unknown_schema' else schema

                        try:
                            create_topic_if_not_exists(schema, extracted_table)
                        except Exception as e:
                            logger.error(f"Topic creation error: {e}")
                            # Checking Kafka health
                            if not check_kafka_cluster_health(kafka_cnf):
                                logger.critical("Kafka health check failed. Exiting...")
                                return

                        message = json.dumps({
                            'type': 'QUERY',
                            'database': schema,
                            'table': extracted_table,
                            'query': query
                        })

                        topic = sanitize_topic_name(f"{schema}.{extracted_table}")

                        produce_message(topic, message)

                        log_pos = binlog_event.packet.log_pos
                        save_checkpoint(log_file, log_pos)

                # Any event handled without an exception ends the error streak
                error_count = 0

        except Exception as e:
            error_count += 1
            logger.error(f"An error occurred: {e}")

            if error_count >= max_errors:
                detailed_error_message = f"Error occurred: {e}, Log file: {log_file}, Log position: {log_pos}"
                send_email_alert("Error occurred in processing", detailed_error_message)
                return

        finally:
            if stream is not None:
                stream.close()

# Run main() only when this module is executed directly, not when it is imported.
if __name__ == '__main__':
    main()


%3|1727697933.801|FAIL|rdkafka#producer-5| [thrd:localhost:9094/bootstrap]: localhost:9094/bootstrap: Connect to ipv4#127.0.0.1:9094 failed: Connection refused (after 0ms in state CONNECT)
%3|1727697934.045|FAIL|rdkafka#producer-4| [thrd:localhost:9093/bootstrap]: localhost:9093/bootstrap: Connect to ipv4#127.0.0.1:9093 failed: Connection refused (after 0ms in state CONNECT, 21 identical error(s) suppressed)
%3|1727697934.301|FAIL|rdkafka#producer-5| [thrd:localhost:9093/bootstrap]: localhost:9093/bootstrap: Connect to ipv4#127.0.0.1:9093 failed: Connection refused (after 0ms in state CONNECT)
%3|1727697934.802|FAIL|rdkafka#producer-5| [thrd:localhost:9095/bootstrap]: localhost:9095/bootstrap: Connect to ipv4#127.0.0.1:9095 failed: Connection refused (after 0ms in state CONNECT)
%3|1727697935.302|FAIL|rdkafka#producer-5| [thrd:localhost:9094/bootstrap]: localhost:9094/bootstrap: Connect to ipv4#127.0.0.1:9094 failed: Connection refused (after 0ms in state CONNECT, 1 identical error(s) 