ConTube is a scalable data connector framework that facilitates efficient data transfer between diverse systems.
The challenge of efficiently moving data from one system to another has long been a significant issue in data engineering. ConTube seeks to address this by offering a generic data connector framework capable of handling data from diverse systems.
ConTube provides a universal set of data connection protocols. These protocols define how to extract data from a system, handle data formatting, and write data into other systems. This framework enables us to implement Runtimes for Pulsar Connectors and Kafka Connectors, thereby leveraging the rich data connection ecosystem of Pulsar and Kafka. This enables data migration between a wide variety of systems.
ConTube operates through two main components: Con and Tube. A Con serves as a connection for moving data between tubes. A Tube is a data processing unit that can be connected to other data systems and to other tubes to form a data processing pipeline. A Tube can be either a Source or a Sink. A Source tube reads data from an upstream system and provides it for further processing. A Sink tube receives data from other tubes and writes it to a downstream system.
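To make the pipeline idea concrete, here is a minimal, hypothetical pair of tube configurations in the style used by the example later in this README. The names and classes are placeholders, and each configuration would normally live in its own YAML file:

```yaml
# Hypothetical source tube: reads from an upstream system and forwards its
# records to the tube named "my-sink" through the default in-memory Con.
name: my-source
type: source
class: com.example.MySourceTube    # placeholder implementation class
sinkTubeName: my-sink
config: {}
---
# Hypothetical sink tube: receives records and writes them to a downstream system.
name: my-sink
type: sink
class: com.example.MySinkTube      # placeholder implementation class
config: {}
```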
This repository defines the core interfaces for ConTube. Here's a high-level overview of how these interfaces work together:

- Con Interface: The Con interface serves as a connection for exchanging data between tubes. It provides the send method for dispatching records to other tubes and the register method for processing incoming records.
- Source Tube Interface: The Source interface is implemented by a source tube, which reads records from an upstream system and provides them for further processing.
- Sink Tube Interface: The Sink interface is implemented by a sink tube, which receives records from other tubes and writes them to a downstream system.
- TubeRecord Interface: The TubeRecord interface encapsulates the data transferred between tubes in the system. It provides methods to access the record's value, schema data, and index, and to handle its commit state.
- Context Interface: The Context interface provides an interaction medium between a tube and its runtime environment. It allows the tube to access its name and control its execution state through the getName, stop, and fail methods.
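To make these relationships concrete, here is a minimal Java sketch of what the interfaces might look like. Only the method names called out above (send, register, getName, stop, fail) come from the description; every signature, the open/close lifecycle methods, and the TubeRecord accessor names are illustrative assumptions rather than the actual definitions in this repository.

```java
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.function.Consumer;

// A Con connects tubes and exchanges records between them.
interface Con {
    // Dispatch a record to the connected tube(s).
    CompletableFuture<Void> send(TubeRecord record);

    // Register a handler for processing incoming records.
    void register(Consumer<TubeRecord> handler);
}

// A Source tube reads records from an upstream system. The open/close
// lifecycle methods are assumptions, not part of the documented contract.
interface Source {
    void open(Map<String, Object> config, Context context, Con con) throws Exception;
    void close() throws Exception;
}

// A Sink tube receives records from other tubes and writes them downstream.
interface Sink {
    void open(Map<String, Object> config, Context context, Con con) throws Exception;
    void close() throws Exception;
}

// A TubeRecord encapsulates the data transferred between tubes.
// Exact accessor names are illustrative.
interface TubeRecord {
    byte[] value();                      // the record payload
    Map<String, String> schemaData();    // schema information attached to the record
    long index();                        // the record's position/index
    void commit();                       // mark the record as successfully processed
}

// The Context is the tube's view of its runtime environment.
interface Context {
    String getName();            // the tube's configured name
    void stop();                 // ask the runtime to stop this tube
    void fail(Throwable cause);  // report a fatal error to the runtime
}
```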
- Universal Data Format: ConTube offers a universal data format capable of representing data from virtually any system. This allows us to implement Runtimes for Pulsar Connectors and Kafka Connectors, thereby accessing the rich data connection ecosystem of Pulsar and Kafka. This feature is still under development.
- Data Sourcing and Sinking: ConTube provides a set of interfaces for data sourcing and sinking. This allows us to implement Runtimes for Pulsar Connectors and Kafka Connectors, thereby accessing the rich data connection ecosystem of Pulsar and Kafka. Support for other MQ ecosystems is on the way.
- Lightweight Runtime: In contrast to the Kafka and Pulsar Connect Frameworks, Contube allows for running multiple tubes within a single JVM process or a single Docker container, eliminating the need for running separate MQ clusters.
- Expanded Data Transfer Options: The current implementation of Con is MemoryCon, meaning that data is transferred in memory. We aim to add more Con implementations, such as GrpcCon, which transfers data using gRPC, and MQCon, which transfers data using various message queue protocols.
- Data Consistency Guarantee: A work-in-progress feature; we aim to introduce a data consistency guarantee mechanism to ensure that data is transferred between systems in an exactly-once, at-least-once, or at-most-once manner.
- ... and more.
The repository contube-thirdparty provides several implementations
of Tube
. These include tubes for executing Kafka Connectors and Pulsar IO Connectors, as well as tubes for interacting
with the Pulsar system. We are working on additional implementations.
TL;DR. For a basic overview, please refer to this example.
This example will demonstrate how to:
- Capture data changes from MongoDB using the Kafka Debezium MongoDB connector
- Transfer the data using MemoryCon (the default implementation of the Con interface)
- Sink the data into Elasticsearch using the Pulsar Elasticsearch connector
To begin, download this example from here. You can clone the contube-thirdparty repo using git:
git clone git@github.com:RobertIndie/contube-thirdparty.git
cd contube-thirdparty/examples/mongodb-elasticsearch
This example provides everything needed for setup. If you only want to run and validate this example, you can skip ahead
to the Run and Validate the Example
section.
Before running the example, let's examine the example files.
In the tubes directory, we have defined two tubes that function as data connectors.
The MongoDB source tube uses the Kafka Debezium MongoDB connector to capture data changes from MongoDB. The tube
configuration is located in the mongodb-source.yaml
file:
name: mongo-source
type: source
class: io.github.contube.kafka.connect.source.KafkaConnectSourceTube
sinkTubeName: es-sink
config:
mongodb.hosts: "rs0/mongodb:27017"
mongodb.connection.string: "mongodb://mongodb:27017/?replicaSet=rs0"
mongodb.user: "debezium"
mongodb.password: "dbz"
mongodb.authsource: "admin"
mongodb.name: "debezium"
mongodb.task.id: "1"
task.class: "io.debezium.connector.mongodb.MongoDbConnectorTask"
This tube uses the KafkaConnectSourceTube
class, an implementation for Kafka Connect. The sinkTubeName
is the name of the sink tube to which this source tube will transfer the data. The config
section provides the configuration for the
Kafka Debezium MongoDB connector. More details about the configuration can be found in
the Kafka Debezium MongoDB connector
documentation.
The Elasticsearch sink tube uses the Pulsar Elasticsearch connector to sink the data into Elasticsearch. The tube
configuration is located in the es-sink.yaml
file:
name: es-sink
type: sink
class: io.github.contube.pulsar.connect.sink.PulsarConnectSinkTube
config:
archive: "./tubes/pulsar-io-elastic-search-3.1.1.nar"
connectorConfig:
elasticSearchUrl: "http://elastic-search:9200"
indexName: "my_index"
username: "elastic"
password: "ElasticPasseword"
This tube uses the PulsarConnectSinkTube
class, an implementation for Pulsar IO. The archive is the path to the Pulsar Elasticsearch connector nar file. The connectorConfig
section provides the configuration for the Pulsar Elasticsearch connector. More details about the configuration can be found in
the Pulsar Elasticsearch connector documentation.
In the docker-compose.yaml
file, we illustrate how to use Contube for this example:
contube-test:
image: contube/contube-all
volumes:
- ./tubes:/contube/tubes
depends_on:
- mongodb
- elastic-search
We've mounted the tubes
directory to the /contube/tubes
directory in the container. Therefore, Contube will load the
tube configurations from the tubes
directory and start all tubes therein.
To run this example, first download the Pulsar Elasticsearch connector nar file and place it into the Contube
container. The Kafka Debezium connector jar file is already included in the Contube container. However, if
you wish to use other Kafka or Pulsar connectors, simply place the connector jar or nar file into the /contube/libs
path of the Contube container.
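If you prefer to perform this step manually (the run.sh script mentioned below handles the necessary setup for you), something along these lines should work; the download URL follows the Apache Pulsar connector release layout and may need adjusting for your environment:

```shell
# Fetch the Pulsar Elasticsearch connector nar into the local tubes directory,
# which docker-compose mounts into the container at /contube/tubes.
# The URL is illustrative; adjust the version/path as needed.
curl -L -o tubes/pulsar-io-elastic-search-3.1.1.nar \
  https://archive.apache.org/dist/pulsar/pulsar-3.1.1/connectors/pulsar-io-elastic-search-3.1.1.nar
```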
A script has been prepared for your convenience. You can run ./run.sh
to execute all necessary tasks.
./run.sh
Finally, let's validate this example.
Insert a record into MongoDB:
# Please ensure you are running this command in the `examples/mongodb-elasticsearch` directory.
docker-compose exec mongodb bash -c 'mongo -u debezium -p dbz --authenticationDatabase admin inventory'
db.customers.insert([
{ _id : NumberLong("1005"), first_name : 'Bob', last_name : 'Hopper', email : 'thebob@example.com', unique_id : UUID() }
]);
Check the data in Elasticsearch:
# Refresh the index:
curl -s http://localhost:9200/my_index/_refresh
# Search documents:
curl -s http://localhost:9200/my_index/_search | jq
The search should return data containing the following document:
{
"_index": "my_index",
"_type": "_doc",
"_id": "xnhDMIwBlImub__wU9mI",
"_score": 1.0,
"_source": {
"after": "{\"_id\": {\"$numberLong\": \"1005\"},\"first_name\": \"Bob\",\"last_name\": \"Hopper\",\"email\": \"thebob@example.com\",\"unique_id\": {\"$binary\": \"ZtUqTQYzSCq5xfpaSlM/3w==\",\"$type\": \"04\"}}",
"source": {
"version": "1.9.7.Final",
"connector": "mongodb",
"name": "debezium",
"ts_ms": 1701616760000,
"snapshot": "true",
"db": "inventory",
"rs": "rs0",
"collection": "customers",
"ord": 1
},
"op": "r",
"ts_ms": 1701616764265
}
}
You can also experiment with running other Kafka/Pulsar connectors on Contube. Simply place the connector jar/nar files
into the /contube/libs
path of the Contube container, add the tube configuration into the tubes
directory, and then
run the Contube container.
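As a rough sketch of that workflow (the connector and configuration file names here are placeholders):

```shell
# 1. Copy the connector jar/nar into /contube/libs inside the running container:
docker cp ./my-connector.nar $(docker-compose ps -q contube-test):/contube/libs/
# 2. Add the corresponding tube configuration to the mounted tubes directory:
cp ./my-tube.yaml ./tubes/
# 3. Restart the container so the new tube is picked up:
docker-compose restart contube-test
```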
Note: If you encounter connection issues with the Contube container, try restarting the container by
running docker-compose restart contube-test
. If you encounter other issues, please feel free to open an issue.
To compile Contube, execute the following command:
./gradlew jar
Next, place your tube implementation jar file into the libs
directory and add your tubes configuration files. Start
the runtime by executing the following command:
bin/runtime.sh conf/runtime.yaml path/to/your-tube-config.yaml
We have prepared a straightforward file source and sink example in the examples
directory. You can initiate the
example by running the following command:
bin/runtime.sh conf/runtime.yaml examples/file-source.yaml examples/file-sink.yaml
This example will duplicate the content from examples/source.txt
to examples/test-result-sink.txt
.
This project is currently in the Proof of Concept (POC) stage. We warmly welcome all contributors who are interested in this project.
Here are some of the features we plan to introduce in the future:
- Universal Data Format
- Pluggable Data Schema
- Additional Con implementations: GrpcCon, MQCon, etc.
- Distributed offset store implementation
- Data consistency guarantee mechanism
- Awaiting more ideas...
We enthusiastically welcome contributions from the community. If you discover a bug or have a feature request, please open an issue or submit a pull request.
This project is licensed under the Apache License 2.0.