In [1]:
import json

from pymysqlreplication import BinLogStreamReader
from pymysqlreplication.row_event import (
  DeleteRowsEvent,
  UpdateRowsEvent,
  WriteRowsEvent,
)

from utils import concat_sql_from_binlog_event
import pymysql
import os
import sys
import logging

In [None]:
# Logging
logging.basicConfig(
    stream=sys.stdout,
    level=logging.INFO,
    format="%(levelname)s %(message)s")

def main(mysqlConfigs):
  conn = pymysql.connect(**mysqlConfigs)  
  cursor = conn.cursor()
  stream = BinLogStreamReader(
    connection_settings = mysqlConfigs,
    server_id=100,
    blocking=True,
    resume_stream=True,
    only_events=[DeleteRowsEvent, WriteRowsEvent, UpdateRowsEvent])

  for binlogevent in stream:
    e_start_pos, last_pos = stream.log_pos, stream.log_pos
    for row in binlogevent.rows:
      event = {"schema": binlogevent.schema,
      "table": binlogevent.table,
      "type": type(binlogevent).__name__,
      "row": row
      }
      #if isinstance(binlog_event, QueryEvent) and binlog_event.query == 'BEGIN':
      #  e_start_pos = last_pos
      print("/*", json.dumps(event), "*/")
      print(concat_sql_from_binlog_event(cursor=cursor, binlog_event=binlogevent, row=row, e_start_pos=e_start_pos))
      print()


if __name__ == "__main__":
  mysqlConfigs = {
      "host": "mysql",
      "port": 3306,
      "user": "root",
      "passwd": "Password0987",
      'db': "my_database",
  }
  main(mysqlConfigs)

/* {"schema": "my_database", "table": "test", "type": "WriteRowsEvent", "row": {"values": {"id": 1}}} */
INSERT INTO `my_database`.`test`(`id`) VALUES (1);

/* {"schema": "my_database", "table": "test", "type": "WriteRowsEvent", "row": {"values": {"id": 2}}} */
INSERT INTO `my_database`.`test`(`id`) VALUES (2);

/* {"schema": "my_database", "table": "test", "type": "WriteRowsEvent", "row": {"values": {"id": 3}}} */
INSERT INTO `my_database`.`test`(`id`) VALUES (3);

/* {"schema": "my_database", "table": "test", "type": "UpdateRowsEvent", "row": {"before_values": {"id": 3}, "after_values": {"id": 4}}} */
UPDATE `my_database`.`test` SET `id`=4 WHERE `id`=3 LIMIT 1;

/* {"schema": "my_database", "table": "test", "type": "DeleteRowsEvent", "row": {"values": {"id": 1}}} */
DELETE FROM `my_database`.`test` WHERE `id`=1 LIMIT 1;

