# Kafka Connect MySQL to MongoDB

Slightly modified from Confluent's example.

- [confluent platform docker](https://github.com/confluentinc/cp-docker-images/blob/5.3.0-post/examples/cp-all-in-one/docker-compose.yml) version 5.4.0
- mysql version 8.0.19

## Prerequisites

- Docker
- Docker Compose
- curl
- **Docker-Machine** (recommended on macOS)  
  Docker for Mac does not run containers natively the way Linux does, and unexpected network behavior may occur.  
  Using docker-machine avoids this.

## (Mac) Start Docker-Machine

```bash
# If docker-machine is installed, run everything inside a VirtualBox VM
if command -v docker-machine >/dev/null 2>&1
then
    # "create" only succeeds the first time; later runs report that the machine already exists
    docker-machine create --driver virtualbox --virtualbox-memory 6000 confluent
    docker-machine start confluent
    # Point the local docker CLI at the VM's Docker daemon
    eval "$(docker-machine env confluent)"
    echo "Docker machine started"
else
    echo "Docker is native on your system"
fi
```

```
Docker machine "confluent" already exists
Starting "confluent"...
Machine "confluent" is already running.
Docker machine started
```


## Download MySQL JDBC Driver

It is important to make sure the **version of MySQL-JDBC** (Connector/J) matches **the version of MySQL**.

```bash
# Stop and remove containers left over from a previous run
docker-compose stop
echo
docker-compose rm -f
```

```
No stopped containers
```


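```bash
# Print the MySQL version shipped by the image.
# "mysql" without a tag means mysql:latest; use the same tag as the MySQL service in docker-compose.yml.
docker run --rm mysql --version
```

```
/usr/sbin/mysqld  Ver 8.0.19 for Linux on x86_64 (MySQL Community Server - GPL)
```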


```bash
# Download Connector/J 8.0.19 and extract only the driver JAR into /tmp/quickstart/jars
if command -v docker-machine >/dev/null 2>&1
then
    # With docker-machine, bind-mounted host paths live inside the VM, so download there
    docker-machine ssh confluent -- \
    """
    sudo mkdir -p /tmp/quickstart/jars;
    sudo curl -k \
        -s \
        -SL \
        \"http://dev.mysql.com/get/Downloads/Connector-J/mysql-connector-java-8.0.19.tar.gz\" |
        sudo tar -xzf - -C \
            /tmp/quickstart/jars \
            --strip-components=1 \
            mysql-connector-java-8.0.19/mysql-connector-java-8.0.19.jar;
    ls -la /tmp/quickstart/jars
    """
else
    sudo mkdir -p /tmp/quickstart/jars
    sudo curl -k \
        -s \
        -SL \
        "http://dev.mysql.com/get/Downloads/Connector-J/mysql-connector-java-8.0.19.tar.gz" |
        sudo tar -xzf - -C \
            /tmp/quickstart/jars \
            --strip-components=1 \
            mysql-connector-java-8.0.19/mysql-connector-java-8.0.19.jar
    ls -la /tmp/quickstart/jars
fi
```

```
total 2312
drwxr-xr-x    2 root     root          4096 Mar  3 07:00 .
drwxr-xr-x    4 root     root          4096 Mar  2 22:19 ..
-rw-r--r--    1 root     root       2356711 Dec  4 11:44 mysql-connector-java-8.0.19.jar
```

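To make the version check explicit, the sketch below compares the server version reported by `mysqld --version` with the version embedded in the Connector/J JAR file name, mirroring the same-version advice above. `server_version`, `jar_version` and `versions_match` are hypothetical helpers, and the parsing assumes the output and file-name formats shown above.

```bash
# Hypothetical helpers: compare the MySQL server version with the Connector/J JAR version.

# Extract "8.0.19" from a banner like
# "/usr/sbin/mysqld  Ver 8.0.19 for Linux on x86_64 (MySQL Community Server - GPL)"
server_version() {
    printf '%s\n' "$1" | sed -n 's/.* Ver \([0-9][0-9.]*\).*/\1/p'
}

# Extract "8.0.19" from a file name like "mysql-connector-java-8.0.19.jar"
jar_version() {
    basename "$1" | sed -n 's/^mysql-connector-java-\([0-9][0-9.]*\)\.jar$/\1/p'
}

# Succeed only when both versions were found and are identical
versions_match() {
    local server jar
    server="$(server_version "$1")"
    jar="$(jar_version "$2")"
    [ -n "$server" ] && [ "$server" = "$jar" ]
}
```

For example: `versions_match "$(docker run --rm mysql --version)" mysql-connector-java-8.0.19.jar && echo "versions match"`.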

### MySQL-JDBC

- [MySQL Connector/J downloads](https://dev.mysql.com/downloads/connector/j/)
- [mvnrepository](https://mvnrepository.com/artifact/mysql/mysql-connector-java/8.0.19)

## Mongo Connector

```bash
# Download the MongoDB Kafka connector JAR into the same host directory as the JDBC driver
# (the directory mounted at /etc/kafka-connect/jars in the connect container)
if command -v docker-machine >/dev/null 2>&1
then
    docker-machine ssh confluent -- \
    """
    sudo mkdir -p /tmp/quickstart/jars;
    sudo curl -s -SL \
        'https://search.maven.org/remotecontent?filepath=org/mongodb/kafka/mongo-kafka-connect/1.0.1/mongo-kafka-connect-1.0.1.jar' \
        -o /tmp/quickstart/jars/mongo-kafka-connect-1.0.1.jar;
    ls -la /tmp/quickstart/jars
    """
else
    sudo mkdir -p /tmp/quickstart/jars
    sudo curl -s -SL \
        'https://search.maven.org/remotecontent?filepath=org/mongodb/kafka/mongo-kafka-connect/1.0.1/mongo-kafka-connect-1.0.1.jar' \
        -o /tmp/quickstart/jars/mongo-kafka-connect-1.0.1.jar
    ls -la /tmp/quickstart/jars
fi
```

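Without `-f`, curl saves whatever body the server returns and still exits successfully, so an HTTP error page can silently end up saved under a `.jar` name. Because a JAR is a ZIP archive, a simple sanity check is to look for the ZIP signature `PK\x03\x04` at the start of the file. A minimal sketch; `looks_like_jar` is a hypothetical helper.

```bash
# Hypothetical helper: succeed if the file starts with the ZIP local-file-header
# signature "PK\x03\x04" that every JAR (a ZIP archive) begins with.
looks_like_jar() {
    local file="$1"
    [ -s "$file" ] || return 1   # missing or empty file
    # Dump the first 4 bytes as hex and compare with 50 4b 03 04 ("PK\x03\x04")
    [ "$(head -c 4 "$file" | od -An -tx1 | tr -d ' \n')" = "504b0304" ]
}
```

For example, `looks_like_jar /tmp/quickstart/jars/mongo-kafka-connect-1.0.1.jar || echo "download failed"` (run inside the VM when using docker-machine). Adding `-f` to the curl call would instead make curl itself fail on HTTP errors.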

## Start Docker Compose

```bash
docker-compose stop ; docker-compose up -d
```

```
Creating zookeeper ... error
ERROR: for zookeeper  Cannot create container for service zookeeper: Conflict. The container name "/zookeeper" is already in use by container "40779a9064b63109fdef80105ab787339c562e2b4a65c6fa2b4f014497fcac86". You have to remove (or rename) that container to be able to reuse that name.
ERROR: Encountered errors while bringing up the project.
```

This run failed because a container named `zookeeper` already existed (for example, one left over from another Compose project). As the error says, remove (or rename) the conflicting container, for example with `docker rm -f zookeeper`, and run the cell again.

## Insert data into MySQL

The MySQL container may take a while before it accepts connections.

```bash
# Crude readiness check: retry a trivial query until MySQL accepts connections
while ! docker exec quickstart-mysql mysql --user=confluent --password=confluent -e "SELECT 1" >/dev/null 2>&1; do
    sleep 1
done
```

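The loop above retries forever if MySQL never becomes ready (for example, after a typo in the credentials). A sketch of a bounded variant, assuming a fixed retry budget is acceptable; `wait_until` is a hypothetical helper that reruns any command once per second until it succeeds or the attempts run out.

```bash
# Hypothetical helper: run "$@" up to $1 times, one second apart.
# Returns 0 as soon as the command succeeds, 1 if every attempt failed.
wait_until() {
    local attempts="$1"
    shift
    local i
    for ((i = 1; i <= attempts; i++)); do
        if "$@" >/dev/null 2>&1; then
            return 0
        fi
        # Do not sleep after the final failed attempt
        if [ "$i" -lt "$attempts" ]; then
            sleep 1
        fi
    done
    echo "gave up after ${attempts} attempts: $*" >&2
    return 1
}
```

For example: `wait_until 120 docker exec quickstart-mysql mysql --user=confluent --password=confluent -e "SELECT 1"`.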
```bash
# Create the test table, insert ten sample rows, then print them
docker exec -i quickstart-mysql mysql -u confluent -pconfluent <<< """
CREATE DATABASE IF NOT EXISTS connect_test;
USE connect_test;

DROP TABLE IF EXISTS test;

CREATE TABLE IF NOT EXISTS test (
  id serial NOT NULL PRIMARY KEY,
  name varchar(100),
  email varchar(200),
  department varchar(200),
  modified timestamp default CURRENT_TIMESTAMP NOT NULL,
  INDEX \`modified_index\` (\`modified\`)
);

INSERT INTO test (name, email, department) VALUES ('alice', 'alice@abc.com', 'engineering');
INSERT INTO test (name, email, department) VALUES ('bob', 'bob@abc.com', 'sales');
INSERT INTO test (name, email, department) VALUES ('bob', 'bob@abc.com', 'sales');
INSERT INTO test (name, email, department) VALUES ('bob', 'bob@abc.com', 'sales');
INSERT INTO test (name, email, department) VALUES ('bob', 'bob@abc.com', 'sales');
INSERT INTO test (name, email, department) VALUES ('bob', 'bob@abc.com', 'sales');
INSERT INTO test (name, email, department) VALUES ('bob', 'bob@abc.com', 'sales');
INSERT INTO test (name, email, department) VALUES ('bob', 'bob@abc.com', 'sales');
INSERT INTO test (name, email, department) VALUES ('bob', 'bob@abc.com', 'sales');
INSERT INTO test (name, email, department) VALUES ('bob', 'bob@abc.com', 'sales');
SELECT * FROM test;
"""
```

## Source Connector

### Add source connector

Either the REST API or the Confluent Platform web UI can be used to add the connector.

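```bash
# Network created by Compose; the curl and Kafka containers below join it to reach services by name
export CONNECT_NET="kafka-connect-mysql_default"
```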

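`kafka-connect-mysql_default` follows Docker Compose's naming for the default network, `<project>_default`, where the project name defaults to the name of the directory holding docker-compose.yml unless `COMPOSE_PROJECT_NAME` is set. The sketch below derives the name instead of hard-coding it. It assumes the normalization used by recent docker-compose 1.x releases (lowercase, keeping only `a-z`, `0-9`, `_` and `-`), and `compose_default_network` is a hypothetical helper.

```bash
# Hypothetical helper: print the default Compose network name for a project directory
# (defaults to the current directory). Assumes docker-compose 1.x naming rules.
compose_default_network() {
    local dir="${1:-$PWD}"
    # COMPOSE_PROJECT_NAME, if set, overrides the directory name
    local project="${COMPOSE_PROJECT_NAME:-$(basename "$dir")}"
    # Lowercase, then drop every character other than a-z, 0-9, "_" and "-"
    project="$(printf '%s' "$project" | tr '[:upper:]' '[:lower:]' | tr -cd 'a-z0-9_-')"
    printf '%s_default\n' "$project"
}
```

For example, run from the directory containing docker-compose.yml: `export CONNECT_NET="$(compose_default_network)"`.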
Kafka Connect has to finish starting up before connectors can be added.

Startup scans every directory in **CONNECT_PLUGIN_PATH** for plugins, so removing unneeded directories from it in docker-compose.yml speeds up the process:

```yaml
      #CONNECT_PLUGIN_PATH: "/usr/share/java,/usr/share/confluent-hub-components"
      CONNECT_PLUGIN_PATH: "\
        /usr/share/java/kafka,\
        /usr/share/confluent-hub-components,\
        /usr/share/java/kafka-connect-jdbc,\
        /etc/kafka-connect/jars"
```

```bash
# Block until the Connect worker logs that it has started
while ! docker logs connect 2>&1 | grep -i "INFO Kafka Connect started" ; do
    sleep 1
done
```

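Grepping the log works, but another option is to poll the Connect REST API itself. The sketch below reuses the `curlimages/curl:7.68.0` image and `CONNECT_NET` from above and treats an HTTP 200 from `GET /connectors` as a sign that the worker is serving requests; `connect_ready` and its default budget of 120 attempts are assumptions.

```bash
# Hypothetical helper: poll the Connect REST API from a throwaway curl container
# until GET /connectors answers with HTTP 200, or give up after N attempts.
connect_ready() {
    local attempts="${1:-120}"
    local i code
    for ((i = 1; i <= attempts; i++)); do
        # curl prints only the status code ("000" if the connection is refused)
        code="$(docker run --net="${CONNECT_NET}" --rm curlimages/curl:7.68.0 \
            -s -o /dev/null -w '%{http_code}' http://connect:8083/connectors)"
        if [ "$code" = "200" ]; then
            echo "Kafka Connect REST API is ready"
            return 0
        fi
        sleep 1
    done
    echo "Kafka Connect REST API not ready after ${attempts} attempts" >&2
    return 1
}
```

For example: `connect_ready 180 || docker logs connect | tail -n 50`.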
```bash
# Create the JDBC source connector via the Connect REST API, using a throwaway
# curl container on the Compose network so that "connect" resolves by name.
# Note: timestamp.column.name only takes effect in the "timestamp" and
# "timestamp+incrementing" modes; with "mode": "incrementing" it is ignored.
docker run \
    --net="${CONNECT_NET}" \
    --rm curlimages/curl:7.68.0 \
    -X POST \
    -s \
    -H "Content-Type: application/json" \
    --data '{ "name": "quickstart-jdbc-source", "config": { "connector.class": "io.confluent.connect.jdbc.JdbcSourceConnector", "tasks.max": 1, "connection.url": "jdbc:mysql://quickstart-mysql:3306/connect_test?user=root&password=confluent", "mode": "incrementing", "incrementing.column.name": "id", "timestamp.column.name": "modified", "topic.prefix": "quickstart-jdbc-", "poll.interval.ms": 1000 } }' \
    http://connect:8083/connectors
```

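Re-running the POST above fails once the connector exists, because `POST /connectors` answers 409 Conflict for a name that is already taken. The Connect REST API also offers `PUT /connectors/<name>/config`, which creates the connector if it is missing and updates its configuration otherwise; its body is the bare `config` object, without the `name`/`config` wrapper. `put_connector_config` is a hypothetical wrapper around that call, reusing the curl image and `CONNECT_NET` from above.

```bash
# Hypothetical wrapper: create or update a connector via PUT /connectors/<name>/config.
# $1 = connector name, $2 = JSON object with the connector's config (no "name"/"config" wrapper)
put_connector_config() {
    local name="$1" config_json="$2"
    docker run \
        --net="${CONNECT_NET}" \
        --rm curlimages/curl:7.68.0 \
        -s -X PUT \
        -H "Content-Type: application/json" \
        --data "${config_json}" \
        "http://connect:8083/connectors/${name}/config"
}
```

For example, `put_connector_config quickstart-jdbc-source "$SOURCE_CONFIG"`, where `SOURCE_CONFIG` is a hypothetical variable holding the `config` object from the POST above. Running it twice is harmless, which makes re-running the notebook easier.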
### If an error message is received

Make sure the MySQL JDBC driver JAR is correctly mounted into the container and that its directory is on the plugin path:

```bash
docker exec connect env | grep -e "^CONNECT_PLUGIN_PATH"
```

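`CONNECT_PLUGIN_PATH` is a comma-separated list, so one thing worth ruling out is a listed directory that does not exist inside the container. A sketch that splits the value and checks each entry with `test -d` inside the `connect` container; `plugin_path_entries` and `check_plugin_dirs` are hypothetical helpers.

```bash
# Hypothetical helper: print each entry of a comma-separated plugin path (read on stdin)
# on its own line, with surrounding whitespace trimmed and empty entries dropped.
plugin_path_entries() {
    tr ',' '\n' | sed -e 's/^[[:space:]]*//' -e 's/[[:space:]]*$//' | grep -v '^$'
}

# Hypothetical helper: report whether each plugin path entry exists in the connect container.
# Returns 1 if any entry is missing.
check_plugin_dirs() {
    local dir status=0
    while IFS= read -r dir; do
        # </dev/null keeps docker from consuming the loop's input
        if docker exec connect test -d "$dir" </dev/null; then
            echo "ok       $dir"
        else
            echo "MISSING  $dir"
            status=1
        fi
    done < <(docker exec connect printenv CONNECT_PLUGIN_PATH | plugin_path_entries)
    return "$status"
}
```

Running `check_plugin_dirs` prints one `ok` or `MISSING` line per directory.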
```bash
docker exec connect ls -la /etc/kafka-connect/jars
```

If the error persists, raise the log level in docker-compose.yml (uncomment the line below) and run again:

```yaml
    #CONNECT_LOG4J_ROOT_LOGLEVEL: "DEBUG"
```

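Changing docker-compose.yml means recreating the worker. Apache Kafka 2.4 (the version Confluent Platform 5.4 is based on) added an `/admin/loggers` REST endpoint (KIP-495) that changes a logger's level at runtime. A sketch, assuming that endpoint is available in this image; `set_connect_log_level` is a hypothetical helper, and `io.confluent.connect.jdbc` is used as an example logger name (the JDBC connector's package).

```bash
# Hypothetical helper: change a logger's level at runtime through the Connect REST API
# (PUT /admin/loggers/<logger>, assumed available from Apache Kafka 2.4 / KIP-495).
# The change is not persisted and is lost when the worker restarts.
set_connect_log_level() {
    local logger="$1" level="$2"
    docker run \
        --net="${CONNECT_NET}" \
        --rm curlimages/curl:7.68.0 \
        -s -X PUT \
        -H "Content-Type: application/json" \
        --data "{\"level\": \"${level}\"}" \
        "http://connect:8083/admin/loggers/${logger}"
}
```

For example, `set_connect_log_level io.confluent.connect.jdbc DEBUG`, then re-run the POST and inspect `docker logs connect`.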
### Status Check

```bash
# Check that the source connector created the topic
docker run \
    --net="${CONNECT_NET}" \
    --rm \
    confluentinc/cp-kafka:5.4.0 \
    kafka-topics --describe \
    --zookeeper zookeeper:2181 \
    --topic quickstart-jdbc-test
```

```bash
# Read up to 10 Avro records; the keys are null, so only lines starting with "null" are records
docker exec schema-registry \
    kafka-avro-console-consumer \
    --bootstrap-server broker:29092 \
    --topic quickstart-jdbc-test \
    --from-beginning \
    --property print.key=true \
    --max-messages 10 | \
    grep -e "^null"
```

## Sink Connector

### Add sink connector

```bash
# Create a FileStreamSinkConnector that writes the topic's records to a file inside the connect container
docker run \
    --net="${CONNECT_NET}" \
    --rm \
    curlimages/curl:7.68.0 \
    -s -X POST \
    -H "Content-Type: application/json" \
    --data '{"name": "quickstart-avro-file-sink", "config": {"connector.class":"org.apache.kafka.connect.file.FileStreamSinkConnector", "tasks.max":"1", "topics":"quickstart-jdbc-test", "file": "/tmp/quickstart/jdbc-output.txt"}}' \
    http://connect:8083/connectors
```

### Status check (Sink)


```bash
# Check connector status through the API
docker run \
    --net="${CONNECT_NET}" \
    --rm \
    curlimages/curl:7.68.0 \
    -s -X GET http://connect:8083/connectors/quickstart-avro-file-sink/status
```

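The status response contains one `state` for the connector and one per task, and all of them should read `RUNNING`; a `FAILED` task also carries a `trace` field with the stack trace. A minimal sketch for checking this without `jq`, assuming the compact JSON the Connect REST API returns (no spaces around `:`); `all_running` is a hypothetical helper, and text matching like this is not a real JSON parser.

```bash
# Hypothetical helper: read a connector status JSON document on stdin and succeed
# only if it contains at least one "state" and every "state" is RUNNING.
all_running() {
    local states
    # Extract the value of every "state":"..." pair, one per line
    states="$(grep -o '"state":"[A-Z_]*"' | cut -d'"' -f4)"
    [ -n "$states" ] || return 1
    # Fail if any state other than RUNNING is present
    ! printf '%s\n' "$states" | grep -qv '^RUNNING$'
}
```

For example, pipe the output of the status call above into `all_running && echo "sink is running"`.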
## Results

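```bash
# The sink writes /tmp/quickstart/jdbc-output.txt inside the connect container;
# this assumes docker-compose.yml mounts the host's /tmp/quickstart/file at /tmp/quickstart.
if command -v docker-machine >/dev/null 2>&1
then
    docker-machine ssh confluent -- cat /tmp/quickstart/file/jdbc-output.txt
else
    cat /tmp/quickstart/file/jdbc-output.txt
fi
```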

```bash
docker-compose stop
```

## References
- <https://docs.confluent.io/5.0.0/installation/docker/docs/installation/connect-avro-jdbc.html>
- <https://www.confluent.io/blog/kafka-connect-deep-dive-jdbc-source-connector/#no-suitable-driver-found>
- <https://rmoff.net/post/kafka-connect-change-log-level-and-write-log-to-file/>
- <https://stackoverflow.com/questions/25503412/how-do-i-know-when-my-docker-mysql-container-is-up-and-mysql-is-ready-for-taking>