# How to Install Confluent Apache Kafka?
- https://nijanthanravi.wordpress.com/2019/07/14/setup-confluent-kafka-on-windows/
- https://medium.com/@praveenkumarsingh/confluent-kafka-on-windows-how-to-fix-classpath-is-empty-cf7c31d9c787
- https://github.com/confluentinc/confluent-kafka-python

### Python Dependencies:
- pip install confluent-kafka
- pip install avro-python3
- pip install kafka-python (needed for the `kafka` import in the JSON Converter section)

### MySQL Script:
```
CREATE TABLE `testdb`.`test_table` (
  `int_field` INT NOT NULL AUTO_INCREMENT,
  `str_field` VARCHAR(45) NOT NULL,
  `double_field` DOUBLE NULL,
  `decimal_field` DECIMAL(6,2) NULL,
  `date_field` DATE NULL,
  `datetime_field` DATETIME NULL,
  PRIMARY KEY (`int_field`)
);

INSERT INTO testdb.test_table (str_field, double_field, decimal_field, date_field, datetime_field)
VALUES ('string value', 5.5, 3.25, '2019-02-10', '2019-02-10 12:34:56');

UPDATE testdb.test_table
SET decimal_field=1234.56,
    date_field='2020-01-23',
    datetime_field='2020-01-23 01:23:45';

DELETE FROM testdb.test_table;
```

### How to Start Confluent Apache Kafka?
1. Open a command prompt in the Confluent installation folder


2. Start Zookeeper<br>
   2.1 Execute ***bin\windows\zookeeper-server-start.bat etc\kafka\zookeeper.properties***<br>
   2.2 The default port is ```2181```; change it in ***etc\kafka\zookeeper.properties*** if necessary<br>
   2.3 If you encounter a ```Classpath``` error:
   - Open ***bin\windows\kafka-run-class.bat***
   - Search for ```rem Classpath addition for core``` in the bat file
   - Add the following lines above that line:
   ```
   rem classpath addition for LSB style path
   if exist %BASE_DIR%\share\java\kafka\* (
     call:concat %BASE_DIR%\share\java\kafka\*
   )
   ```
   - Re-run ZooKeeper
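
Before moving on to the broker, it can help to confirm that ZooKeeper is actually listening. The sketch below is one way to check this from Python with the standard library. `is_port_open` is a hypothetical helper name, the host `localhost` is an assumption, and port `2181` is the default from step 2.2. The same check can be pointed at the other service ports used later in this guide.

```python
import socket

def is_port_open(host: str, port: int, timeout: float = 2.0) -> bool:
    """Return True if a TCP connection to host:port succeeds within the timeout."""
    try:
        with socket.create_connection((host, port), timeout=timeout):
            return True
    except OSError:
        # Covers refused connections, timeouts, and unresolved host names.
        return False
```

Calling `is_port_open("localhost", 2181)` should return `True` once ZooKeeper has started.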


3. Start Kafka Broker<br>
   3.1 Execute ***bin\windows\kafka-server-start.bat etc\kafka\server.properties***<br>
   3.2 If you encounter ```ERROR Shutdown broker because all log dirs in tmp\kafka-logs have failed (kafka.log.LogManager)```, delete the ***tmp\kafka-logs*** and ***tmp\zookeeper*** folders, then start ZooKeeper and the broker again


4. Start Schema Registry<br>
   4.1 Download the missing schema-registry files into the ***bin\windows*** folder:
   - https://github.com/confluentinc/schema-registry/tree/master/bin/windows
   4.2 Execute ***bin\windows\schema-registry-start.bat etc\schema-registry\schema-registry.properties***


5. Debezium MySQL CDC Connector<br>
   5.1 Download:
   - https://www.confluent.io/hub/debezium/debezium-connector-mysql
   5.2 Create the ***share\java\kafka\plugins*** folder, and move the downloaded jar files into it
   - Check the official Debezium website for a newer version of the plugin
   5.3 Ensure ```plugin.path=share/java``` is set in the ***etc\kafka\connect-distributed.properties*** file (and in ***etc\schema-registry\connect-avro-distributed.properties*** if you start Connect with the Avro converter)
   5.4 Configure MySQL binlog:
   - Reference:
     - https://documentation.commvault.com/commvault/v11/article?p=34667.htm
     - https://debezium.io/documentation/reference/0.9/connectors/mysql.html
   - Comment out the default ```log-bin``` value in the ***my.ini*** file, add the following, then restart the MySQL service:
   ```
    log_bin=mysql-bin
    binlog_format=row
    binlog_row_image=full
    expire_logs_days=1
    gtid_mode=on
    enforce_gtid_consistency=on
    binlog_rows_query_log_events=on
   ```
   - Check MySQL variables:
   ```
   SHOW VARIABLES WHERE variable_name IN ('server_id','log_bin','binlog_format','binlog_row_image', 'expire_logs_days');
   SHOW VARIABLES WHERE variable_name IN ('gtid_mode', 'enforce_gtid_consistency', 'binlog_rows_query_log_events');
   ```
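
The output of the two `SHOW VARIABLES` queries can be compared against the values set in ***my.ini***. The following is a minimal sketch, assuming the rows have already been fetched with any MySQL client and placed in a dict of variable name to value. `EXPECTED` and `check_binlog_settings` are hypothetical names introduced for illustration. MySQL typically reports these values in upper case, so the comparison is case-insensitive.

```python
# Expected values, mirroring the my.ini settings above.
# Note: MySQL reports `log_bin` as ON/OFF, not as the configured base name.
EXPECTED = {
    "log_bin": "ON",
    "binlog_format": "ROW",
    "binlog_row_image": "FULL",
    "expire_logs_days": "1",
    "gtid_mode": "ON",
    "enforce_gtid_consistency": "ON",
    "binlog_rows_query_log_events": "ON",
}

def check_binlog_settings(variables: dict) -> dict:
    """Return {name: (expected, actual)} for every setting that does not match."""
    mismatches = {}
    for name, expected in EXPECTED.items():
        actual = variables.get(name)
        if actual is None or str(actual).upper() != expected:
            mismatches[name] = (expected, actual)
    return mismatches
```

An empty result means every listed setting matches; any entry in the result shows the expected value next to what the server reported.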


6. Start Connector<br>
   6.1 Execute one of the following:<br>
   - JSON Converter: ***bin\windows\connect-distributed.bat etc\kafka\connect-distributed.properties***
   - Avro Converter: ***bin\windows\connect-distributed.bat etc\schema-registry\connect-avro-distributed.properties***
   6.2 If you encounter a ```FileNotFoundException```:
   - Open ***bin\windows\connect-distributed.bat***
   - Search for ```rem Log4j settings``` in the bat file
   - Replace ```config/tools-log4j.properties``` with ```etc/kafka/tools-log4j.properties```
   - Re-run the connector
   6.3 Check that http://localhost:8083/ responds


7. Start Debezium Source Connector (Producer)<br>
   7.1 Run the following in a bash prompt, or send the same POST request with Postman:
   ```
    curl -X POST -H "Content-Type: application/json" http://localhost:8083/connectors -d '{
        "name": "debezium-mysql-source-connector",
        "config": {
            "connector.class": "io.debezium.connector.mysql.MySqlConnector",
            "tasks.max": 1,
            "database.hostname": "localhost",
            "database.port": "3306",
            "database.user": "root",
            "database.password": "root",
            "database.server.id": "184054",
            "database.server.name": "test_server",
            "database.whitelist": "testdb",
            "table.whitelist": "testdb.test_table",
            "database.history.kafka.bootstrap.servers": "localhost:9092",
            "database.history.kafka.topic": "TEST_TOPIC",
            "snapshot.mode": "schema_only"
        }
    }'
   ```
   7.2 If you encounter an unrecognized server time zone error:
   - Download the SQL script (POSIX standard) that populates the time zone tables from: https://dev.mysql.com/downloads/timezones.html
   - Add ```USE mysql;``` at the beginning of the script, then run it
   - Once it completes, set the time zone, for example: ```SET GLOBAL time_zone='Asia/Kuala_Lumpur'```
   - Re-submit the POST request
   7.3 Ensure the connector and its tasks are in the "RUNNING" state:
   - http://localhost:8083/connectors/debezium-mysql-source-connector/status
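
The curl call in 7.1 and the status check in 7.3 can also be scripted. The sketch below uses only the Python standard library and assumes Kafka Connect is reachable at `http://localhost:8083`, as in step 6.3. `build_register_request`, `all_running`, and `fetch_status` are hypothetical helper names. The status layout (a `connector` object and a `tasks` list, each with a `state`) is what the Kafka Connect REST API returns for `/connectors/<name>/status`.

```python
import json
import urllib.request

CONNECT_URL = "http://localhost:8083"  # Kafka Connect REST endpoint from step 6.3

def build_register_request(name: str, config: dict) -> urllib.request.Request:
    """Build (but do not send) the POST request that registers a connector."""
    body = json.dumps({"name": name, "config": config}).encode("utf-8")
    return urllib.request.Request(
        f"{CONNECT_URL}/connectors",
        data=body,
        headers={"Content-Type": "application/json"},
        method="POST",
    )

def all_running(status: dict) -> bool:
    """True if the connector and all of its tasks (at least one) report RUNNING."""
    connector_state = status.get("connector", {}).get("state")
    task_states = [task.get("state") for task in status.get("tasks", [])]
    return (
        connector_state == "RUNNING"
        and bool(task_states)
        and all(state == "RUNNING" for state in task_states)
    )

def fetch_status(name: str) -> dict:
    """GET /connectors/<name>/status and decode the JSON body."""
    url = f"{CONNECT_URL}/connectors/{name}/status"
    with urllib.request.urlopen(url, timeout=10) as resp:
        return json.load(resp)
```

To send the request, pass the result of `build_register_request("debezium-mysql-source-connector", config)` to `urllib.request.urlopen`, where `config` is the same dict as the `"config"` object in 7.1. Afterwards, `all_running(fetch_status("debezium-mysql-source-connector"))` should return `True`.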


8. List topics (Optional)<br>
   8.1 Execute ***bin\windows\kafka-topics.bat --list --bootstrap-server localhost:9092***<br>
   8.2 <font color='red'>**NOTE**</font>: The topic to listen on follows the naming convention ```<serverName>.<databaseName>.<tableName>```, for example ```test_server.testdb.test_table```
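
As a small illustration of the naming convention, the sketch below builds the topic name from the values used in the connector config in step 7. `debezium_topic` is a hypothetical helper, not part of any library.

```python
def debezium_topic(server_name: str, database: str, table: str) -> str:
    """Build the change-event topic name: <serverName>.<databaseName>.<tableName>."""
    return ".".join([server_name, database, table])

# Values taken from the connector config in step 7.
topic = debezium_topic("test_server", "testdb", "test_table")
print(topic)  # test_server.testdb.test_table
```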

# Avro Converter

In [None]:
from confluent_kafka import avro
from confluent_kafka.avro import AvroConsumer
from confluent_kafka.avro.serializer import SerializerError
from datetime import date, timedelta, datetime
import struct
import json

In [None]:
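# Reference: https://github.com/abrarsheikh/avro/blob/9407b60e03002e3e04ac31a81d54821377e051e3/lang/py/src/avro/io.py
def date_from_int(days_since_epoch, return_format=None):
    # Avro `date`: days since 1970-01-01. Nullable columns arrive as None.
    if days_since_epoch is None:
        return None
    dt = date(1970, 1, 1) + timedelta(days=days_since_epoch)
    return dt if return_format is None else dt.strftime(return_format)

def timestamp_from_long(ts, return_format=None):
    # Avro `timestamp-millis`: milliseconds since the Unix epoch.
    if ts is None:
        return None
    dt = datetime.utcfromtimestamp(ts / 1000.0)
    return dt if return_format is None else dt.strftime(return_format)

def decimal_from_byte(b, scale=2):
    # Avro `decimal`: big-endian two's-complement unscaled integer.
    # scale=2 matches the DECIMAL(6,2) column; signed=True keeps negative values correct.
    if b is None:
        return None
    return int.from_bytes(b, byteorder='big', signed=True) / 10 ** scale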
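
To make the decimal decoding concrete: Avro's `decimal` logical type stores the unscaled value as a big-endian two's-complement integer, so `1234.56` with scale 2 is the integer `123456`, which is the three bytes `01 e2 40`. The sketch below decodes such bytes into a `decimal.Decimal` to avoid float rounding. `decimal_from_bytes_exact` is a hypothetical name, and the default scale of 2 is an assumption taken from the `DECIMAL(6,2)` column.

```python
from decimal import Decimal

def decimal_from_bytes_exact(raw: bytes, scale: int = 2) -> Decimal:
    """Decode an Avro decimal (big-endian two's complement) without float rounding."""
    unscaled = int.from_bytes(raw, byteorder="big", signed=True)
    return Decimal(unscaled).scaleb(-scale)

print(decimal_from_bytes_exact(b"\x01\xe2\x40"))  # 1234.56
print(decimal_from_bytes_exact(b"\xfe\xbb"))      # -3.25
```

`json.dumps` cannot serialize `Decimal` directly, so convert the result with `str()` (or `float()`, accepting the rounding) before printing it as JSON.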

In [None]:
record_schema = avro.loads('''
{
   "type": "record",
   "name": "Envelope",
   "namespace": "test_server.testdb.test_table",
   "fields": [
      {
         "type": [
            "null",
            {
               "type": "record",
               "name": "Value",
               "namespace": "test_server.testdb.test_table",
               "fields": [
                  {
                     "type": "int",
                     "name": "int_field"
                  },
                  {
                     "type": "string",
                     "name": "str_field"
                  },
                  {
                     "type": [
                        "null",
                        "double"
                     ],
                     "name": "double_field",
                     "default": null
                  },
                  {
                     "type": [
                        "null",
                        {
                           "type": "bytes",
                           "logicalType": "decimal"
                        }
                     ],
                     "name": "decimal_field",
                     "default": null
                  },
                  {
                     "type": [
                        "null",
                        {
                           "type": "int",
                           "logicalType": "date"
                        }
                     ],
                     "name": "date_field",
                     "default": null
                  },
                  {
                     "type": [
                        "null",
                        {
                           "type": "long",
                           "logicalType": "timestamp-millis"
                        }
                     ],
                     "name": "datetime_field",
                     "default": null
                  }
               ]
            }
         ],
         "name": "before",
         "default": null
      },
      {
         "type": [
            "null",
            "test_server.testdb.test_table.Value"
         ],
         "name": "after",
         "default": null
      },
      {
         "type": "string",
         "name": "op"
      }
   ]
}
''')

In [None]:
c = AvroConsumer({
    'bootstrap.servers': 'localhost:9092',
    'group.id': 'group_id',
    'schema.registry.url': 'http://127.0.0.1:8081',
    'enable.auto.commit': 'False',
    'auto.offset.reset': 'earliest'
}, reader_value_schema=record_schema)
c.subscribe(['test_server.testdb.test_table'])

In [None]:
try:
    while True:
        try:
            message = c.poll(10)
        except SerializerError as e:
            # The exception text describes the message that failed to decode.
            print(f'[ERROR:SerializerError] - {e}')
            break

        if message is None:
            continue

        if message.error():
            print(f'[ERROR:MessageError] - {message.error()}')
            continue

        value = message.value()
        for status in ['before', 'after']:
            if value and value[status]:
                value[status]['date_field']     = date_from_int(value[status]['date_field'], return_format='%Y-%m-%d')
                value[status]['datetime_field'] = timestamp_from_long(value[status]['datetime_field'], return_format='%Y-%m-%d %H:%M:%S')
                value[status]['decimal_field']  = decimal_from_byte(value[status]['decimal_field'])

        print('[CONSUMED]:')
        print(json.dumps(value, indent=2))
        print()
except KeyboardInterrupt:
    # Interrupting the kernel is the expected way to stop the loop.
    pass
finally:
    # Always release the consumer, even after an interrupt or error.
    c.close()
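
Each Debezium event carries an `op` field next to `before` and `after`: `c` for an insert, `u` for an update, `d` for a delete, and `r` for a row read during a snapshot. The sketch below shows one way a consumer might branch on it. `OP_NAMES` and `describe_change` are hypothetical names, and the sample events are hand-written illustrations shaped like the Envelope schema above, not captured output.

```python
OP_NAMES = {"c": "insert", "u": "update", "d": "delete", "r": "snapshot read"}

def describe_change(value: dict) -> str:
    """Summarise a decoded Debezium envelope (the dict returned by message.value())."""
    op = value.get("op")
    before, after = value.get("before"), value.get("after")
    # Deletes only carry `before`; inserts and snapshot reads only carry `after`.
    row = after if after is not None else before
    key = row.get("int_field") if row else None
    return f"{OP_NAMES.get(op, 'unknown')} on int_field={key}"

# Hand-written sample events, for illustration only.
print(describe_change({"op": "c", "before": None, "after": {"int_field": 1}}))
print(describe_change({"op": "d", "before": {"int_field": 1}, "after": None}))
```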

# JSON Converter

In [None]:
from kafka import KafkaConsumer
import json

In [None]:
consumer = KafkaConsumer(bootstrap_servers=['localhost:9092'], auto_offset_reset='earliest')
consumer.topics()

In [None]:
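consumer.subscribe(['test_server.testdb.test_table'])
for message in consumer:
    # A delete is followed by a tombstone whose value is None; treat it as an empty event.
    value = json.loads('{}' if message.value is None else message.value)
    # Keep only the change data; `or {}` guards against a missing or null payload.
    payload = value.get('payload') or {}
    value = {k: v for k, v in payload.items() if k in ['before', 'after', 'op']}

    print('[CONSUMED]:')
    print(json.dumps(value, indent=2))
    print()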
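
With the JSON converter, the temporal and decimal columns arrive in raw form. Assuming the connector's default handling modes, `date_field` is the number of days since 1970-01-01, `datetime_field` is milliseconds since the Unix epoch, and `decimal_field` is a base64 string holding the unscaled big-endian bytes. The sketch below converts one `before`/`after` row into readable values. `decode_row` is a hypothetical helper, the scale of 2 is assumed from the `DECIMAL(6,2)` column, and the sample row is hand-written rather than captured output.

```python
import base64
from datetime import date, datetime, timedelta, timezone

def decode_row(row: dict, scale: int = 2) -> dict:
    """Convert raw JSON-converter values of one `before`/`after` row to readable form."""
    out = dict(row)
    if row.get("date_field") is not None:
        # Days since 1970-01-01.
        out["date_field"] = (date(1970, 1, 1) + timedelta(days=row["date_field"])).isoformat()
    if row.get("datetime_field") is not None:
        # Milliseconds since the Unix epoch, rendered in UTC.
        dt = datetime.fromtimestamp(row["datetime_field"] / 1000.0, tz=timezone.utc)
        out["datetime_field"] = dt.strftime("%Y-%m-%d %H:%M:%S")
    if row.get("decimal_field") is not None:
        # Base64 text -> big-endian two's-complement unscaled integer.
        raw = base64.b64decode(row["decimal_field"])
        out["decimal_field"] = int.from_bytes(raw, byteorder="big", signed=True) / 10 ** scale
    return out

# Hand-written sample row, for illustration only.
sample = {"int_field": 1, "str_field": "string value", "double_field": 5.5,
          "decimal_field": "AeJA", "date_field": 17937, "datetime_field": 1549802096000}
print(decode_row(sample))
```

In the consumer loop, this could be applied to `value['before']` and `value['after']` whenever they are not `None`.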