Camel-Kafka-connector Infinispan Sink

This is an example of the Camel-Kafka-connector Infinispan Sink.

Standalone

What is needed

  • An Infinispan instance

Setting up Infinispan

As a first step, download Infinispan Server version 11.0.3.Final.

Infinispan 11.x is secured by default. For the purpose of this example we’ll remove the security check, so we won’t need any authentication.

If you have Infinispan unzipped in $INFINISPAN_HOME, you’ll need to:

  • Edit $INFINISPAN_HOME/server/conf/infinispan.xml

so that the file content is:

<infinispan
        xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
        xsi:schemaLocation="urn:infinispan:config:11.0 https://infinispan.org/schemas/infinispan-config-11.0.xsd
                            urn:infinispan:server:11.0 https://infinispan.org/schemas/infinispan-server-11.0.xsd"
        xmlns="urn:infinispan:config:11.0"
        xmlns:server="urn:infinispan:server:11.0">

   <cache-container name="default" statistics="true">
      <transport cluster="${infinispan.cluster.name}" stack="${infinispan.cluster.stack:tcp}" node-name="${infinispan.node.name:}"/>
   </cache-container>

   <server xmlns="urn:infinispan:server:11.0">
      <interfaces>
         <interface name="public">
            <inet-address value="${infinispan.bind.address:127.0.0.1}"/>
         </interface>
      </interfaces>

      <socket-bindings default-interface="public" port-offset="${infinispan.socket.binding.port-offset:0}">
         <socket-binding name="default" port="${infinispan.bind.port:11222}"/>
         <socket-binding name="memcached" port="11221"/>
      </socket-bindings>

      <endpoints socket-binding="default">
         <hotrod-connector name="hotrod"/>
         <rest-connector name="rest"/>
      </endpoints>
   </server>
</infinispan>

Infinispan 11 no longer provides a default cache out of the box, so we need to create one.

First, start the server:

> $INFINISPAN_HOME/bin/server.sh
bin/server.sh
06:57:51,378 INFO  (main) [BOOT] JVM OpenJDK 64-Bit Server VM AdoptOpenJDK 25.252-b09
06:57:51,395 INFO  (main) [BOOT] JVM arguments = [-Xms64m, -Xmx512m, -XX:MetaspaceSize=64M, -Djava.net.preferIPv4Stack=true, -Djava.awt.headless=true, -Dvisualvm.display.name=infinispan-server, -Djava.util.logging.manager=org.apache.logging.log4j.jul.LogManager, -Dinfinispan.server.home.path=/home/oscerd/playground/infinispan-server-11.0.3.Final]
06:57:51,396 INFO  (main) [BOOT] PID = 9678
06:57:51,441 INFO  (main) [org.infinispan.SERVER] ISPN080000: Infinispan Server starting
06:57:51,441 INFO  (main) [org.infinispan.SERVER] ISPN080017: Server configuration: /home/oscerd/playground/infinispan-server-11.0.3.Final/server/conf/infinispan.xml
06:57:51,441 INFO  (main) [org.infinispan.SERVER] ISPN080032: Logging configuration: /home/oscerd/playground/infinispan-server-11.0.3.Final/server/conf/log4j2.xml
06:57:51,959 INFO  (main) [org.infinispan.SERVER] ISPN080027: Loaded extension 'query-dsl-filter-converter-factory'
06:57:51,960 INFO  (main) [org.infinispan.SERVER] ISPN080027: Loaded extension 'continuous-query-filter-converter-factory'
06:57:51,961 INFO  (main) [org.infinispan.SERVER] ISPN080027: Loaded extension 'iteration-filter-converter-factory'
06:57:51,961 INFO  (main) [org.infinispan.SERVER] ISPN080027: Loaded extension 'jdk.nashorn.api.scripting.NashornScriptEngineFactory'
06:57:52,367 WARN  (main) [org.infinispan.PERSISTENCE] ISPN000554: jboss-marshalling is deprecated and planned for removal
06:57:52,919 INFO  (main) [org.infinispan.CONTAINER] ISPN000128: Infinispan version: Infinispan 'Corona Extra' 11.0.3.Final
06:57:52,921 INFO  (main) [org.infinispan.CONTAINER] ISPN000389: Loaded global state, version=11.0.3.Final timestamp=2020-09-30T21:04:46.511Z
06:57:53,046 INFO  (main) [org.infinispan.CLUSTER] ISPN000078: Starting JGroups channel cluster with stack tcp
06:57:55,138 INFO  (main) [org.jgroups.protocols.pbcast.GMS] ghost-35169: no members discovered after 2001 ms: creating cluster as coordinator
06:57:55,150 INFO  (main) [org.infinispan.CLUSTER] ISPN000094: Received new cluster view for channel cluster: [ghost-35169|0] (1) [ghost-35169]
06:57:55,156 INFO  (main) [org.infinispan.CLUSTER] ISPN000079: Channel cluster local address is ghost-35169, physical addresses are [192.168.1.15:7800]
06:57:55,810 INFO  (main) [org.infinispan.CONTAINER] ISPN000104: Using EmbeddedTransactionManager

Then, in another terminal, open the Infinispan CLI, connect to the server, and create the cache:

> $INFINISPAN_HOME/bin/cli.sh
[disconnected]> connect
[ghost-35169@cluster//containers/default]> create cache --template=org.infinispan.DIST_SYNC mycache
[ghost-35169@cluster//containers/default]> describe caches/mycache
{
  "distributed-cache" : {
    "mode" : "SYNC",
    "remote-timeout" : 17500,
    "state-transfer" : {
      "timeout" : 60000
    },
    "transaction" : {
      "mode" : "NONE"
    },
    "locking" : {
      "concurrency-level" : 1000,
      "acquire-timeout" : 15000,
      "striping" : false
    },
    "statistics" : true
  }
}

The Infinispan server and the mycache cache are now ready.

Running Kafka

$KAFKA_HOME/bin/zookeeper-server-start.sh $KAFKA_HOME/config/zookeeper.properties
$KAFKA_HOME/bin/kafka-server-start.sh $KAFKA_HOME/config/server.properties
$KAFKA_HOME/bin/kafka-topics.sh --create --bootstrap-server localhost:9092 --replication-factor 1 --partitions 1 --topic mytopic

Setting up the needed bits and running the example

You’ll need to set the plugin.path property in your Kafka installation.

Open $KAFKA_HOME/config/connect-standalone.properties

and set the plugin.path property to your chosen location.

In this example we’ll use /home/oscerd/connectors/

> cd /home/oscerd/connectors/
> wget https://repo1.maven.org/maven2/org/apache/camel/kafkaconnector/camel-infinispan-kafka-connector/0.7.1/camel-infinispan-kafka-connector-0.7.1-package.zip
> unzip camel-infinispan-kafka-connector-0.7.1-package.zip
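The plugin.path edit described above can also be scripted. The following is a minimal sketch, not part of Kafka or of this connector: set_plugin_path is a hypothetical helper that replaces an existing plugin.path= line (or a commented-out #plugin.path= line) in a properties file, and appends one if none is found. It assumes the directory name contains no | or & characters, since it is substituted into a sed expression.

```shell
# Hypothetical helper (an assumption for illustration, not a Kafka tool):
# set plugin.path in a Kafka Connect properties file.
#   $1 = path to the properties file
#   $2 = directory that holds the unpacked connectors
set_plugin_path() {
  spp_file="$1"
  spp_dir="$2"
  if grep -Eq '^#?plugin\.path=' "$spp_file"; then
    # An entry exists (active or commented out): rewrite it via a temp file.
    sed -E "s|^#?plugin\.path=.*|plugin.path=${spp_dir}|" "$spp_file" > "${spp_file}.tmp" &&
      mv "${spp_file}.tmp" "$spp_file"
  else
    # No entry yet: append one at the end of the file.
    printf 'plugin.path=%s\n' "$spp_dir" >> "$spp_file"
  fi
}

# Usage with the paths from this example:
# set_plugin_path "$KAFKA_HOME/config/connect-standalone.properties" /home/oscerd/connectors/
```

Keeping a backup copy of connect-standalone.properties before running such a helper is a sensible precaution.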

Now it’s time to set up the connector.

Open the Infinispan sink configuration file, config/CamelInfinispanSinkConnector.properties. It should contain:

name=CamelInfinispanSinkConnector
connector.class=org.apache.camel.kafkaconnector.infinispan.CamelInfinispanSinkConnector
key.converter=org.apache.kafka.connect.storage.StringConverter
value.converter=org.apache.kafka.connect.storage.StringConverter

topics=mytopic

camel.sink.endpoint.hosts=localhost
camel.sink.path.cacheName=mycache
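If the configuration file does not exist yet, it can be generated from the shell. The sketch below writes exactly the properties listed above and then checks that the keys this example relies on are present. Both write_sink_config and check_sink_config are hypothetical helper names introduced for illustration; they are not provided by Kafka or by the connector.

```shell
# Hypothetical helper: write the sink connector configuration shown above.
#   $1 = target file, e.g. config/CamelInfinispanSinkConnector.properties
write_sink_config() {
  wsc_file="$1"
  mkdir -p "$(dirname "$wsc_file")"
  cat > "$wsc_file" <<'EOF'
name=CamelInfinispanSinkConnector
connector.class=org.apache.camel.kafkaconnector.infinispan.CamelInfinispanSinkConnector
key.converter=org.apache.kafka.connect.storage.StringConverter
value.converter=org.apache.kafka.connect.storage.StringConverter

topics=mytopic

camel.sink.endpoint.hosts=localhost
camel.sink.path.cacheName=mycache
EOF
}

# Hypothetical helper: return non-zero if a key used in this example is missing.
#   $1 = properties file to check
check_sink_config() {
  for csc_key in name connector.class topics \
                 camel.sink.endpoint.hosts camel.sink.path.cacheName; do
    grep -q "^${csc_key}=" "$1" || { echo "missing: ${csc_key}" >&2; return 1; }
  done
}

# Usage:
# write_sink_config config/CamelInfinispanSinkConnector.properties
# check_sink_config config/CamelInfinispanSinkConnector.properties && echo "config looks complete"
```

The check only confirms that the keys exist. It does not validate that the cache or the topic is actually reachable.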

Now you can run the example

$KAFKA_HOME/bin/connect-standalone.sh $KAFKA_HOME/config/connect-standalone.properties config/CamelInfinispanSinkConnector.properties

In a different terminal, run the Kafka console producer and send messages to your Kafka broker.

$KAFKA_HOME/bin/kafka-console-producer.sh --bootstrap-server localhost:9092 --topic mytopic
Kafka to Infinispan message 1
Kafka to Infinispan message 2

You should see the cache statistics change. You can check this by running:

> $INFINISPAN_HOME/bin/cli.sh
[disconnected]> connect
[ghost-35169@cluster//containers/default]> cache mycache
[ghost-35169@cluster//containers/default/caches/mycache]> stats
{
  "total_number_of_entries" : 0,
  "off_heap_memory_used" : 0,
  "time_since_start" : 350,
  "time_since_reset" : 350,
  "stores" : 0,
  "current_number_of_entries" : 0,
  "data_memory_used" : 0,
  "misses" : 0,
  "remove_hits" : 0,
  "remove_misses" : 0,
  "evictions" : 0,
  "average_read_time" : 0,
  "average_read_time_nanos" : 0,
  "average_write_time" : 0,
  "average_write_time_nanos" : 0,
  "average_remove_time" : 0,
  "average_remove_time_nanos" : 0,
  "required_minimum_number_of_nodes" : 1,
  "current_number_of_entries_in_memory" : 0,
  "retrievals" : 0,
  "hits" : 0
}
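To compare statistics before and after sending messages, a single counter can be pulled out of the stats output. The sketch below assumes the output has been saved to a file and keeps the one-field-per-line layout shown above. stat_value is a hypothetical helper, and the file names in the usage comment are placeholders.

```shell
# Hypothetical helper: print the numeric value of one field from the JSON
# that the CLI stats command prints. Reads the JSON from standard input.
#   $1 = field name, e.g. stores
stat_value() {
  # Match a line such as:   "stores" : 0,   and print only the number.
  sed -n "s/^[[:space:]]*\"$1\"[[:space:]]*:[[:space:]]*\([0-9][0-9]*\).*/\1/p"
}

# Usage (stats-before.json and stats-after.json are placeholder file names
# holding saved copies of the stats output):
# before=$(stat_value stores < stats-before.json)
# after=$(stat_value stores < stats-after.json)
# [ "$after" -gt "$before" ] && echo "new entries were stored"
```

Because the field name is matched together with its closing quote, current_number_of_entries does not also match current_number_of_entries_in_memory.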