Skip to content
Permalink
Browse files
Added an Infinispan Sink Example
  • Loading branch information
oscerd committed Oct 1, 2020
1 parent bff2dd8 commit 846d5761ada795314f0422e56abad2c2793d9b27
Show file tree
Hide file tree
Showing 2 changed files with 221 additions and 0 deletions.
@@ -0,0 +1,195 @@
# Camel-Kafka-connector Infinispan Sink

This is an example for Camel-Kafka-connector Infinispan Sink

## Standalone

### What is needed

- An Infinispan instance
### Setting up Infinispan

As first step you need to download the Infinispan Server with version 11.0.3.Final.

Infinispan 11.x is secured by default. For the purpose of this example we'll remove the security check, so we won't need any authentication.

If you have Infinispan unzipped in $INFINISPAN_HOME, you'll need to:

- Edit $INFINISPAN_HOME/server/config/infinispan.xml
and the file content should be

```
<infinispan
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="urn:infinispan:config:11.0 https://infinispan.org/schemas/infinispan-config-11.0.xsd
urn:infinispan:server:11.0 https://infinispan.org/schemas/infinispan-server-11.0.xsd"
xmlns="urn:infinispan:config:11.0"
xmlns:server="urn:infinispan:server:11.0">

<cache-container name="default" statistics="true">
<transport cluster="${infinispan.cluster.name}" stack="${infinispan.cluster.stack:tcp}" node-name="${infinispan.node.name:}"/>
</cache-container>

<server xmlns="urn:infinispan:server:11.0">
<interfaces>
<interface name="public">
<inet-address value="${infinispan.bind.address:127.0.0.1}"/>
</interface>
</interfaces>

<socket-bindings default-interface="public" port-offset="${infinispan.socket.binding.port-offset:0}">
<socket-binding name="default" port="${infinispan.bind.port:11222}"/>
<socket-binding name="memcached" port="11221"/>
</socket-bindings>

<endpoints socket-binding="default">
<hotrod-connector name="hotrod"/>
<rest-connector name="rest"/>
</endpoints>
</server>
</infinispan>
```

Now we need to create a cache, since the default cache is not available anymore out of the box with Infinispan 11.

You can now start your server

```
> $INFINISPAN_HOME/bin/server.sh
bin/server.sh
06:57:51,378 INFO (main) [BOOT] JVM OpenJDK 64-Bit Server VM AdoptOpenJDK 25.252-b09
06:57:51,395 INFO (main) [BOOT] JVM arguments = [-Xms64m, -Xmx512m, -XX:MetaspaceSize=64M, -Djava.net.preferIPv4Stack=true, -Djava.awt.headless=true, -Dvisualvm.display.name=infinispan-server, -Djava.util.logging.manager=org.apache.logging.log4j.jul.LogManager, -Dinfinispan.server.home.path=/home/oscerd/playground/infinispan-server-11.0.3.Final]
06:57:51,396 INFO (main) [BOOT] PID = 9678
06:57:51,441 INFO (main) [org.infinispan.SERVER] ISPN080000: Infinispan Server starting
06:57:51,441 INFO (main) [org.infinispan.SERVER] ISPN080017: Server configuration: /home/oscerd/playground/infinispan-server-11.0.3.Final/server/conf/infinispan.xml
06:57:51,441 INFO (main) [org.infinispan.SERVER] ISPN080032: Logging configuration: /home/oscerd/playground/infinispan-server-11.0.3.Final/server/conf/log4j2.xml
06:57:51,959 INFO (main) [org.infinispan.SERVER] ISPN080027: Loaded extension 'query-dsl-filter-converter-factory'
06:57:51,960 INFO (main) [org.infinispan.SERVER] ISPN080027: Loaded extension 'continuous-query-filter-converter-factory'
06:57:51,961 INFO (main) [org.infinispan.SERVER] ISPN080027: Loaded extension 'iteration-filter-converter-factory'
06:57:51,961 INFO (main) [org.infinispan.SERVER] ISPN080027: Loaded extension 'jdk.nashorn.api.scripting.NashornScriptEngineFactory'
06:57:52,367 WARN (main) [org.infinispan.PERSISTENCE] ISPN000554: jboss-marshalling is deprecated and planned for removal
06:57:52,919 INFO (main) [org.infinispan.CONTAINER] ISPN000128: Infinispan version: Infinispan 'Corona Extra' 11.0.3.Final
06:57:52,921 INFO (main) [org.infinispan.CONTAINER] ISPN000389: Loaded global state, version=11.0.3.Final timestamp=2020-09-30T21:04:46.511Z
06:57:53,046 INFO (main) [org.infinispan.CLUSTER] ISPN000078: Starting JGroups channel cluster with stack tcp
06:57:55,138 INFO (main) [org.jgroups.protocols.pbcast.GMS] ghost-35169: no members discovered after 2001 ms: creating cluster as coordinator
06:57:55,150 INFO (main) [org.infinispan.CLUSTER] ISPN000094: Received new cluster view for channel cluster: [ghost-35169|0] (1) [ghost-35169]
06:57:55,156 INFO (main) [org.infinispan.CLUSTER] ISPN000079: Channel cluster local address is ghost-35169, physical addresses are [192.168.1.15:7800]
06:57:55,810 INFO (main) [org.infinispan.CONTAINER] ISPN000104: Using EmbeddedTransactionManager
```

So, you'll need to run

```
> cd $INFINISPAN_HOME/bin/cli.sh
[disconnected]> connect
[ghost-35169@cluster//containers/default]> create cache --template=org.infinispan.DIST_SYNC mycache
[ghost-35169@cluster//containers/default]> describe caches/mycache
{
"distributed-cache" : {
"mode" : "SYNC",
"remote-timeout" : 17500,
"state-transfer" : {
"timeout" : 60000
},
"transaction" : {
"mode" : "NONE"
},
"locking" : {
"concurrency-level" : 1000,
"acquire-timeout" : 15000,
"striping" : false
},
"statistics" : true
}
}
```
Now we should be ready to work on this.

### Running Kafka

```
$KAFKA_HOME/bin/zookeeper-server-start.sh config/zookeeper.properties
$KAFKA_HOME/bin/kafka-server-start.sh config/server.properties
$KAFKA_HOME/bin/kafka-topics.sh --create --bootstrap-server localhost:9092 --replication-factor 1 --partitions 1 --topic mytopic
```

## Setting up the needed bits and running the example

You'll need to setup the plugin.path property in your kafka

Open the `$KAFKA_HOME/config/connect-standalone.properties`

and set the `plugin.path` property to your choosen location

In this example we'll use `/home/oscerd/connectors/`

```
> cd /home/oscerd/connectors/
> wget https://repo1.maven.org/maven2/org/apache/camel/kafkaconnector/camel-infinispan-kafka-connector/0.5.0/camel-infinispan-kafka-connector-0.5.0-package.zip
> unzip camel-infinispan-kafka-connector-0.5.0-package.zip
```

Now it's time to setup the connectors

Open the AWS2 SNS configuration file

```
name=CamelInfinispanSinkConnector
connector.class=org.apache.camel.kafkaconnector.infinispan.CamelInfinispanSinkConnector
key.converter=org.apache.kafka.connect.storage.StringConverter
value.converter=org.apache.kafka.connect.storage.StringConverter

topics=mytopic

camel.sink.endpoint.hosts=localhost
camel.sink.path.cacheName=mycache
```

Now you can run the example

```
$KAFKA_HOME/bin/connect-standalone.sh $KAFKA_HOME/config/connect-standalone.properties config/CamelInfinispanSinkConnector.properties
```

On a different terminal run the kafka-producer and send messages to your Kafka Broker.

```
bin/kafka-console-producer.sh --bootstrap-server localhost:9092 --topic mytopic
Kafka to Infinispan message 1
Kafka to Infinispan message 2
```

You should see the stats of cache changing. You can check this by running

```
> $INFINISPAN_HOME/bin/cli.sh
[disconnected]> connect
[ghost-35169@cluster//containers/default]> cache mycache
[ghost-35169@cluster//containers/default/caches/mycache]> stats
{
"total_number_of_entries" : 0,
"off_heap_memory_used" : 0,
"time_since_start" : 350,
"time_since_reset" : 350,
"stores" : 0,
"current_number_of_entries" : 0,
"data_memory_used" : 0,
"misses" : 0,
"remove_hits" : 0,
"remove_misses" : 0,
"evictions" : 0,
"average_read_time" : 0,
"average_read_time_nanos" : 0,
"average_write_time" : 0,
"average_write_time_nanos" : 0,
"average_remove_time" : 0,
"average_remove_time_nanos" : 0,
"required_minimum_number_of_nodes" : 1,
"current_number_of_entries_in_memory" : 0,
"retrievals" : 0,
"hits" : 0
}
```

@@ -0,0 +1,26 @@
#
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#

name=CamelInfinispanSinkConnector
connector.class=org.apache.camel.kafkaconnector.infinispan.CamelInfinispanSinkConnector
key.converter=org.apache.kafka.connect.storage.StringConverter
value.converter=org.apache.kafka.connect.storage.StringConverter

topics=mytopic

camel.sink.endpoint.hosts=localhost
camel.sink.path.cacheName=mycache

0 comments on commit 846d576

Please sign in to comment.