Skip to content
Permalink
Browse files
Added Infinispan Source connector example
  • Loading branch information
oscerd committed Oct 9, 2020
1 parent 3a7a70f commit db770a1ecf2df3c0b25adabe582d6423ff0aecc5
Show file tree
Hide file tree
Showing 2 changed files with 234 additions and 0 deletions.
@@ -0,0 +1,206 @@
# Camel-Kafka-connector Infinispan Source

This is an example for Camel-Kafka-connector Infinispan Source

## Standalone

### What is needed

- An Infinispan instance
### Setting up Infinispan

As first step you need to download the Infinispan Server with version 11.0.3.Final.

Infinispan 11.x is secured by default. For the purpose of this example we'll remove the security check, so we won't need any authentication.

If you have Infinispan unzipped in $INFINISPAN_HOME, you'll need to:

- Edit $INFINISPAN_HOME/server/config/infinispan.xml
and the file content should be

```
<infinispan
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="urn:infinispan:config:11.0 https://infinispan.org/schemas/infinispan-config-11.0.xsd
urn:infinispan:server:11.0 https://infinispan.org/schemas/infinispan-server-11.0.xsd"
xmlns="urn:infinispan:config:11.0"
xmlns:server="urn:infinispan:server:11.0">

<cache-container name="default" statistics="true">
<transport cluster="${infinispan.cluster.name}" stack="${infinispan.cluster.stack:tcp}" node-name="${infinispan.node.name:}"/>
</cache-container>

<server xmlns="urn:infinispan:server:11.0">
<interfaces>
<interface name="public">
<inet-address value="${infinispan.bind.address:127.0.0.1}"/>
</interface>
</interfaces>

<socket-bindings default-interface="public" port-offset="${infinispan.socket.binding.port-offset:0}">
<socket-binding name="default" port="${infinispan.bind.port:11222}"/>
<socket-binding name="memcached" port="11221"/>
</socket-bindings>

<endpoints socket-binding="default">
<hotrod-connector name="hotrod"/>
<rest-connector name="rest"/>
</endpoints>
</server>
</infinispan>
```

Now we need to create a cache, since the default cache is not available anymore out of the box with Infinispan 11.

You can now start your server

```
> $INFINISPAN_HOME/bin/server.sh
bin/server.sh
06:57:51,378 INFO (main) [BOOT] JVM OpenJDK 64-Bit Server VM AdoptOpenJDK 25.252-b09
06:57:51,395 INFO (main) [BOOT] JVM arguments = [-Xms64m, -Xmx512m, -XX:MetaspaceSize=64M, -Djava.net.preferIPv4Stack=true, -Djava.awt.headless=true, -Dvisualvm.display.name=infinispan-server, -Djava.util.logging.manager=org.apache.logging.log4j.jul.LogManager, -Dinfinispan.server.home.path=/home/oscerd/playground/infinispan-server-11.0.3.Final]
06:57:51,396 INFO (main) [BOOT] PID = 9678
06:57:51,441 INFO (main) [org.infinispan.SERVER] ISPN080000: Infinispan Server starting
06:57:51,441 INFO (main) [org.infinispan.SERVER] ISPN080017: Server configuration: /home/oscerd/playground/infinispan-server-11.0.3.Final/server/conf/infinispan.xml
06:57:51,441 INFO (main) [org.infinispan.SERVER] ISPN080032: Logging configuration: /home/oscerd/playground/infinispan-server-11.0.3.Final/server/conf/log4j2.xml
06:57:51,959 INFO (main) [org.infinispan.SERVER] ISPN080027: Loaded extension 'query-dsl-filter-converter-factory'
06:57:51,960 INFO (main) [org.infinispan.SERVER] ISPN080027: Loaded extension 'continuous-query-filter-converter-factory'
06:57:51,961 INFO (main) [org.infinispan.SERVER] ISPN080027: Loaded extension 'iteration-filter-converter-factory'
06:57:51,961 INFO (main) [org.infinispan.SERVER] ISPN080027: Loaded extension 'jdk.nashorn.api.scripting.NashornScriptEngineFactory'
06:57:52,367 WARN (main) [org.infinispan.PERSISTENCE] ISPN000554: jboss-marshalling is deprecated and planned for removal
06:57:52,919 INFO (main) [org.infinispan.CONTAINER] ISPN000128: Infinispan version: Infinispan 'Corona Extra' 11.0.3.Final
06:57:52,921 INFO (main) [org.infinispan.CONTAINER] ISPN000389: Loaded global state, version=11.0.3.Final timestamp=2020-09-30T21:04:46.511Z
06:57:53,046 INFO (main) [org.infinispan.CLUSTER] ISPN000078: Starting JGroups channel cluster with stack tcp
06:57:55,138 INFO (main) [org.jgroups.protocols.pbcast.GMS] ghost-35169: no members discovered after 2001 ms: creating cluster as coordinator
06:57:55,150 INFO (main) [org.infinispan.CLUSTER] ISPN000094: Received new cluster view for channel cluster: [ghost-35169|0] (1) [ghost-35169]
06:57:55,156 INFO (main) [org.infinispan.CLUSTER] ISPN000079: Channel cluster local address is ghost-35169, physical addresses are [192.168.1.15:7800]
06:57:55,810 INFO (main) [org.infinispan.CONTAINER] ISPN000104: Using EmbeddedTransactionManager
```

So, you'll need to run

```
> cd $INFINISPAN_HOME/bin/cli.sh
[disconnected]> connect
[ghost-35169@cluster//containers/default]> create cache --template=org.infinispan.DIST_SYNC mycache
[ghost-35169@cluster//containers/default]> describe caches/mycache
{
"distributed-cache" : {
"mode" : "SYNC",
"remote-timeout" : 17500,
"state-transfer" : {
"timeout" : 60000
},
"transaction" : {
"mode" : "NONE"
},
"locking" : {
"concurrency-level" : 1000,
"acquire-timeout" : 15000,
"striping" : false
},
"statistics" : true
}
}
```

It's important to add encoding to your cache configuration, otherwise consuming events won't work.
You'll need to:

- Edit $INFINISPAN_HOME/server/data/caches.xml

and add the encoding section in the configuration

```
<?xml version="1.0" ?>

<infinispan>
<cache-container>
<distributed-cache mode="SYNC" remote-timeout="17500" name="mycache" statistics="true">
<encoding>
<key media-type="text/plain; charset=UTF-8"/>
<value media-type="text/plain; charset=UTF-8"/>
</encoding>
<locking concurrency-level="1000" acquire-timeout="15000" striping="false"/>
<transaction mode="NONE"/>
<state-transfer timeout="60000"/>
</distributed-cache>
<distributed-cache mode="SYNC" remote-timeout="17500" name="cachecache" statistics="true">
<locking concurrency-level="1000" acquire-timeout="15000" striping="false"/>
<transaction mode="NONE"/>
<state-transfer timeout="60000"/>
</distributed-cache>
</cache-container></infinispan>
```

Now you should be able to run this example.

### Running Kafka

```
$KAFKA_HOME/bin/zookeeper-server-start.sh config/zookeeper.properties
$KAFKA_HOME/bin/kafka-server-start.sh config/server.properties
$KAFKA_HOME/bin/kafka-topics.sh --create --bootstrap-server localhost:9092 --replication-factor 1 --partitions 1 --topic mytopic
```

## Setting up the needed bits and running the example

You'll need to setup the plugin.path property in your kafka

Open the `$KAFKA_HOME/config/connect-standalone.properties`

and set the `plugin.path` property to your choosen location

In this example we'll use `/home/oscerd/connectors/`

```
> cd /home/oscerd/connectors/
> wget https://repo1.maven.org/maven2/org/apache/camel/kafkaconnector/camel-infinispan-kafka-connector/0.5.0/camel-infinispan-kafka-connector-0.5.0-package.zip
> unzip camel-infinispan-kafka-connector-0.5.0-package.zip
```

Now it's time to setup the connectors

Open the Infinispan source configuration file

```
name=CamelInfinispanSourceConnector
connector.class=org.apache.camel.kafkaconnector.infinispan.CamelInfinispanSourceConnector
key.converter=org.apache.kafka.connect.storage.StringConverter
value.converter=org.apache.kafka.connect.converters.ByteArrayConverter

topics=mytopic

camel.source.endpoint.hosts=localhost
camel.source.path.cacheName=cachecache
camel.source.endpoint.eventTypes=CLIENT_CACHE_ENTRY_CREATED
camel.source.endpoint.sync=false
```

Now you can run the example

```
$KAFKA_HOME/bin/connect-standalone.sh $KAFKA_HOME/config/connect-standalone.properties config/CamelInfinispanSourceConnector.properties
```

On a different terminal run your cli.sh from the Infinispan server

```
> bin/cli.sh
[disconnected]> connect
[ghost-43981@cluster//containers/default]> cache mycache
[ghost-43981@cluster//containers/default/caches/mycache]> put test test
```

In another terminal, using kafkacta, you should be able to see the headers.

```
> kafkacat -b localhost:9092 -t mytopic -C -f 'Headers: %h\n'

Headers: CamelHeader.CamelInfinispanCacheName=mycache,CamelHeader.CamelInfinispanEventType=CLIENT_CACHE_ENTRY_CREATED,CamelHeader.CamelInfinispanIsPre=false,CamelHeader.CamelInfinispanKey=test,CamelProperty.CamelToEndpoint=direct://end?pollingConsumerBlockTimeout=0&pollingConsumerBlockWhenFull=true&pollingConsumerQueueSize=1000
% Reached end of topic test290 [0] at offset 1

```

@@ -0,0 +1,28 @@
#
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#

name=CamelInfinispanSourceConnector
connector.class=org.apache.camel.kafkaconnector.infinispan.CamelInfinispanSourceConnector
key.converter=org.apache.kafka.connect.storage.StringConverter
value.converter=org.apache.kafka.connect.converters.ByteArrayConverter

topics=mytopic

camel.source.endpoint.hosts=localhost
camel.source.path.cacheName=cachecache
camel.source.endpoint.eventTypes=CLIENT_CACHE_ENTRY_CREATED
camel.source.endpoint.sync=false

0 comments on commit db770a1

Please sign in to comment.