Skip to content
Permalink
Browse files
Added a PgEvent Source connector example
  • Loading branch information
oscerd committed Nov 3, 2020
1 parent 989c9f6 commit ac730e6cfe6a6edf96cc2ade8d1a44b0055d5c90
Show file tree
Hide file tree
Showing 2 changed files with 227 additions and 0 deletions.
@@ -0,0 +1,196 @@
= Camel-Kafka-connector PGEvent Source

This is an example for Camel-Kafka-connector PGEvent Source

== Standalone

=== What is needed

- An AWS Account
- A running postgresql instance through docker

=== Running Kafka

[source]
----
$KAFKA_HOME/bin/zookeeper-server-start.sh $KAFKA_HOME/config/zookeeper.properties
$KAFKA_HOME/bin/kafka-server-start.sh $KAFKA_HOME/config/server.properties
$KAFKA_HOME/bin/kafka-topics.sh --create --bootstrap-server localhost:9092 --replication-factor 1 --partitions 1 --topic mytopic
----

=== Download the connector package

Download the connector package zip and extract the content to a directory. In this example we'll use `/home/oscerd/connectors/`

[source]
----
> cd /home/oscerd/connectors/
> wget https://repo1.maven.org/maven2/org/apache/camel/kafkaconnector/camel-pgevent-kafka-connector/0.6.0/camel-pgevent-kafka-connector-0.6.0-package.zip
> unzip camel-pgevent-kafka-connector-0.6.0-package.zip
----

=== Configuring Kafka Connect

You'll need to set up the `plugin.path` property in your kafka

Open the `$KAFKA_HOME/config/connect-standalone.properties` and set the `plugin.path` property to your choosen location:

[source]
----
...
plugin.path=/home/oscerd/connectors
...
----

=== Setup the docker image

We'll need a full running Postgresql instance.

First step is running it:

[source]
----
> docker run --name some-postgres -e POSTGRES_PASSWORD=mysecretpassword -d postgres
6cd4ba4696f2e8872f3787faaa8d03d1dae5cb5f22986648adf132823f3690eb
----

Take note of the container id.
We need now to create the table we'll use: the table is the following

[source]
----
CREATE TABLE accounts (
user_id serial PRIMARY KEY,
username VARCHAR ( 50 ) UNIQUE NOT NULL,
city VARCHAR ( 50 ) NOT NULL
);
----

We are now ready to create the table

[source]
----
> docker exec -it 6cd4ba4696f2e8872f3787faaa8d03d1dae5cb5f22986648adf132823f3690eb psql -U postgres
psql (13.0 (Debian 13.0-1.pgdg100+1))
Type "help" for help.
postgres=# CREATE TABLE accounts (
postgres(# user_id serial PRIMARY KEY,
postgres(# username VARCHAR ( 50 ) UNIQUE NOT NULL,
postgres(# city VARCHAR ( 50 ) NOT NULL
postgres(# );
----

Now we need to create a trigger with a notify command for the channel we'll choose. In our case we'll call the channel like the table we created, so 'accounts'

The Trigger is the following

[source]
----
CREATE OR REPLACE FUNCTION row_inserted()
RETURNS trigger AS $$
DECLARE
BEGIN
PERFORM pg_notify(
'accounts',
row_to_json(NEW)::text);
RETURN NEW;
END;
$$ LANGUAGE plpgsql;
CREATE TRIGGER row_inserted
AFTER INSERT ON accounts
FOR EACH ROW
EXECUTE PROCEDURE row_inserted();
----

We need to execute it in our psql session

[source]
----
> docker exec -it 6cd4ba4696f2e8872f3787faaa8d03d1dae5cb5f22986648adf132823f3690eb psql -U postgres
psql (13.0 (Debian 13.0-1.pgdg100+1))
Type "help" for help.
postgres=# CREATE OR REPLACE FUNCTION row_inserted()
postgres-# RETURNS trigger AS $$
postgres$# DECLARE
postgres$# BEGIN
postgres$# PERFORM pg_notify(
postgres$# 'accounts',
postgres$# row_to_json(NEW)::text);
postgres$# RETURN NEW;
postgres$# END;
postgres$# $$ LANGUAGE plpgsql;
CREATE FUNCTION
postgres=#
postgres=# CREATE TRIGGER row_inserted
postgres-# AFTER INSERT ON accounts
postgres-# FOR EACH ROW
postgres-# EXECUTE PROCEDURE row_inserted();
CREATE TRIGGER
----

We need to take note also of the container ip

----
> docker inspect -f '{{range.NetworkSettings.Networks}}{{.IPAddress}}{{end}}' 6cd4ba4696f2e8872f3787faaa8d03d1dae5cb5f22986648adf132823f3690eb
172.17.0.2
----

=== Setup the connectors

Open the PGEvent configuration file at `$EXAMPLES/pgevent/pgevent-source/config/CamelPgeventSourceConnector.properties`

[source]
----
name=CamelPgeventSourceConnector
connector.class=org.apache.camel.kafkaconnector.pgevent.CamelPgeventSourceConnector
key.converter=org.apache.kafka.connect.storage.StringConverter
value.converter=org.apache.kafka.connect.storage.StringConverter
topics=mytopic
camel.source.endpoint.user=postgres
camel.source.endpoint.pass=mysecretpassword
camel.source.path.port=5432
camel.source.path.host=172.17.0.2
camel.source.path.channel=accounts
camel.source.path.database=postgres
----

and add the correct IP for the container.

=== Running the example

Run the kafka connect with the SQL Source connector:

[source]
----
$KAFKA_HOME/bin/connect-standalone.sh $KAFKA_HOME/config/connect-standalone.properties $EXAMPLES/pgevent/pgevent-source/config/CamelPgeventSourceConnector.properties
----

Now we need to create run some insert in our database

[source]
----
> docker exec -it 6cd4ba4696f2e8872f3787faaa8d03d1dae5cb5f22986648adf132823f3690eb psql -U postgres
postgres=# insert into accounts(username,city) values('andrea','Roma');
INSERT 0 1
Asynchronous notification "accounts" with payload "{"user_id":1,"username":"andrea","city":"Roma"}" received from server process with PID 152.
postgres=# insert into accounts(username,city) values('John','New York');
INSERT 0 1
Asynchronous notification "accounts" with payload "{"user_id":2,"username":"John","city":"New York"}" received from server process with PID 152.
----

On a different terminal run the kafkacat consumer

[source]
----
> ./kafkacat -b localhost:9092 -t mytopic
{"user_id":1,"username":"andrea","city":"Roma"}
{"user_id":2,"username":"John","city":"New York"}
% Reached end of topic dbtest6 [0] at offset 2
----

@@ -0,0 +1,31 @@
#
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#

name=CamelPgeventSourceConnector
connector.class=org.apache.camel.kafkaconnector.pgevent.CamelPgeventSourceConnector
key.converter=org.apache.kafka.connect.storage.StringConverter
value.converter=org.apache.kafka.connect.storage.StringConverter

topics=mytopic

camel.source.endpoint.user=postgres
camel.source.endpoint.pass=mysecretpassword
camel.source.path.port=5432
camel.source.path.host=172.17.0.2

camel.source.path.channel=accounts
camel.source.path.database=postgres

0 comments on commit ac730e6

Please sign in to comment.