IGNITE-2730: Ignite Events Source Streaming to Kafka. - Fixes #560.
Signed-off-by: shtykh_roman <rshtykh@yahoo.com>
shroman committed Mar 30, 2016
1 parent a1a6bf2 commit 12c707c
Showing 16 changed files with 1,156 additions and 31 deletions.
81 changes: 79 additions & 2 deletions modules/kafka/README.txt
@@ -33,7 +33,7 @@ interested in):
</project>




-## Streaming Data via Kafka Connect
+## Streaming Data to Ignite via Kafka Connect


Sink Connector will help you export data from Kafka to Ignite cache. It polls data from Kafka topics and writes it to the user-specified cache.
For more information on Kafka Connect, see [Kafka Documentation](http://kafka.apache.org/documentation.html#connect).
@@ -46,8 +46,8 @@ as described in the following subsection.
1. Put the following jar files on Kafka's classpath
- ignite-kafka-connect-x.x.x-SNAPSHOT.jar
- ignite-core-x.x.x-SNAPSHOT.jar
- ignite-spring-x.x.x-SNAPSHOT.jar
- cache-api-1.0.0.jar
- ignite-spring-1.5.0-SNAPSHOT.jar
- spring-aop-4.1.0.RELEASE.jar
- spring-beans-4.1.0.RELEASE.jar
- spring-context-4.1.0.RELEASE.jar
@@ -127,3 +127,80 @@ k1,v1
```
http://node1:8080/ignite?cmd=size&cacheName=cache1
```

## Streaming Cache Event Data to Kafka via Kafka Connect

The source connector listens to Ignite cache events and, after filtering, streams them to Kafka.

The connector can be found in 'optional/ignite-kafka'. It and its dependencies have to be on the classpath of a running Kafka instance,
as described in the following subsection.

### Setting up and Running

1. Put the following jar files on Kafka's classpath
- ignite-kafka-connect-x.x.x-SNAPSHOT.jar
- ignite-core-x.x.x-SNAPSHOT.jar
- cache-api-1.0.0.jar
- ignite-spring-1.5.0-SNAPSHOT.jar
- spring-aop-4.1.0.RELEASE.jar
- spring-beans-4.1.0.RELEASE.jar
- spring-context-4.1.0.RELEASE.jar
- spring-core-4.1.0.RELEASE.jar
- spring-expression-4.1.0.RELEASE.jar
- commons-logging-1.1.1.jar

2. Prepare worker configurations, e.g.,
```
bootstrap.servers=localhost:9092

key.converter=org.apache.kafka.connect.storage.StringConverter
value.converter=org.apache.ignite.stream.kafka.connect.serialization.CacheEventConverter
key.converter.schemas.enable=false
value.converter.schemas.enable=false

internal.key.converter=org.apache.kafka.connect.storage.StringConverter
internal.value.converter=org.apache.kafka.connect.storage.StringConverter
internal.key.converter.schemas.enable=false
internal.value.converter.schemas.enable=false

offset.storage.file.filename=/tmp/connect.offsets
offset.flush.interval.ms=10000
```

Note that the current implementation ignores the key and schema of Kafka Connect and stores marshalled cache events
using org.apache.ignite.stream.kafka.connect.serialization.CacheEventConverter.

3. Prepare connector configurations, e.g.,
```
# connector
name=ignite-src-connector
connector.class=IgniteSourceConnector
tasks.max=2

# cache
topicNames=testTopic1,testTopic2
cacheEvts=put,removed
## if you decide to filter remotely (recommended)
cacheFilterCls=MyFilter
cacheName=cache1
igniteCfg=/some-path/ignite.xml
```
where 'cacheName' is the name of the cache specified in '/some-path/ignite.xml'; events from this cache
will be streamed to the Kafka topics 'testTopic1' and 'testTopic2'. Also consider using 'evtBufferSize' and 'evtBatchSize' to tune the internal queue
used to safely transfer data from the Ignite cache to Kafka.
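
To make the roles of 'evtBufferSize' and 'evtBatchSize' more concrete, here is a minimal sketch of a bounded buffer that is drained in batches. It is not the connector's implementation: the class `EventBufferSketch`, its method names, and the choice to drop events when the buffer is full are assumptions made for illustration only.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

/** Hypothetical illustration of a bounded event buffer drained in batches. */
final class EventBufferSketch {
    /** Bounded queue; its capacity plays the role of 'evtBufferSize'. */
    private final BlockingQueue<String> buf;

    /** Maximum number of events handed over per poll; plays the role of 'evtBatchSize'. */
    private final int batchSize;

    EventBufferSketch(int bufSize, int batchSize) {
        this.buf = new LinkedBlockingQueue<>(bufSize);
        this.batchSize = batchSize;
    }

    /** Listener side: returns false when the buffer is full (assumption: the event is dropped). */
    boolean onEvent(String evt) {
        return buf.offer(evt);
    }

    /** Polling side: removes and returns at most batchSize buffered events, in arrival order. */
    List<String> poll() {
        List<String> batch = new ArrayList<>(batchSize);

        buf.drainTo(batch, batchSize);

        return batch;
    }

    public static void main(String[] args) {
        EventBufferSketch sketch = new EventBufferSketch(3, 2);

        for (int i = 1; i <= 4; i++)
            System.out.println("evt" + i + " accepted: " + sketch.onEvent("evt" + i));

        System.out.println(sketch.poll());
        System.out.println(sketch.poll());
    }
}
```

In this sketch a larger buffer absorbs longer bursts of events, while a larger batch reduces the number of polls needed to empty it. Whether the real connector drops, blocks, or fails on overflow is not stated here and should be checked in IgniteSourceTask.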

The following cache events can be specified in the connector configurations:
- CREATED
- DESTROYED
- PUT
- READ
- REMOVED
- LOCKED
- UNLOCKED
- SWAPPED
- UNSWAPPED
- EXPIRED
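
As an illustration of how a comma-separated 'cacheEvts' value could be mapped onto the names listed above, consider the sketch below. The enum and parser are stand-ins written for this example; the case-insensitive matching and whitespace trimming are assumptions, not documented behavior of the connector.

```java
import java.util.EnumSet;
import java.util.Locale;
import java.util.Set;

/** Hypothetical parser for the 'cacheEvts' connector property. */
final class CacheEvtParserSketch {
    /** Mirrors the event names listed in the README; not the connector's own type. */
    enum CacheEvt { CREATED, DESTROYED, PUT, READ, REMOVED, LOCKED, UNLOCKED, SWAPPED, UNSWAPPED, EXPIRED }

    /** Parses a comma-separated list of event names, ignoring case and surrounding whitespace. */
    static Set<CacheEvt> parse(String prop) {
        Set<CacheEvt> evts = EnumSet.noneOf(CacheEvt.class);

        for (String name : prop.split(",")) {
            String trimmed = name.trim();

            if (trimmed.isEmpty())
                continue;

            // valueOf throws IllegalArgumentException for any name outside the list above.
            evts.add(CacheEvt.valueOf(trimmed.toUpperCase(Locale.ROOT)));
        }

        return evts;
    }

    public static void main(String[] args) {
        System.out.println(parse("put, removed"));
    }
}
```

Failing fast on an unknown name, as this sketch does, surfaces configuration typos at startup rather than silently listening to fewer events than intended.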

For a simple cache configuration file example, see example-ignite.xml in tests.

4. Start the connector, as described in [Kafka Documentation](http://kafka.apache.org/documentation.html#connect).
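
Before starting the connector, it may help to check the connector properties for the four keys that IgniteSourceConnector.start() requires ('cacheName', 'igniteCfg', 'cacheEvts', 'topicNames'); a missing or empty value makes the connector fail with a ConnectException. The sketch below is a standalone pre-flight check using only the JDK. The class `SourceConfigCheckSketch` and its methods are hypothetical and not part of the module.

```java
import java.io.IOException;
import java.io.StringReader;
import java.io.UncheckedIOException;
import java.util.ArrayList;
import java.util.List;
import java.util.Properties;

/** Hypothetical pre-flight check for source connector properties. */
final class SourceConfigCheckSketch {
    /** Keys that IgniteSourceConnector.start() validates as non-null and non-empty. */
    private static final String[] REQUIRED = {"cacheName", "igniteCfg", "cacheEvts", "topicNames"};

    /** Returns the required keys whose values are absent or blank, in the order checked. */
    static List<String> missingKeys(Properties props) {
        List<String> missing = new ArrayList<>();

        for (String key : REQUIRED) {
            String val = props.getProperty(key);

            if (val == null || val.trim().isEmpty())
                missing.add(key);
        }

        return missing;
    }

    /** Parses properties-file text; a real check would read the connector properties file instead. */
    static Properties load(String text) {
        Properties props = new Properties();

        try {
            props.load(new StringReader(text));
        }
        catch (IOException e) {
            throw new UncheckedIOException(e);
        }

        return props;
    }

    public static void main(String[] args) {
        Properties props = load("name=ignite-src-connector\ncacheName=cache1\ntopicNames=testTopic1,testTopic2\n");

        System.out.println("Missing: " + missingKeys(props));
    }
}
```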
@@ -0,0 +1,81 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.ignite.stream.kafka.connect;

import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import org.apache.ignite.internal.util.typedef.internal.A;
import org.apache.kafka.common.utils.AppInfoParser;
import org.apache.kafka.connect.connector.Task;
import org.apache.kafka.connect.errors.ConnectException;
import org.apache.kafka.connect.source.SourceConnector;

/**
* Source connector that manages source tasks, which listen to registered Ignite grid events and forward them to Kafka.
*
* Note that only cache events are enabled for streaming.
*/
public class IgniteSourceConnector extends SourceConnector {
    /** Source properties. */
    private Map<String, String> configProps;

    /** {@inheritDoc} */
    @Override public String version() {
        return AppInfoParser.getVersion();
    }

    /** {@inheritDoc} */
    @Override public void start(Map<String, String> props) {
        try {
            A.notNullOrEmpty(props.get(IgniteSourceConstants.CACHE_NAME), "cache name");
            A.notNullOrEmpty(props.get(IgniteSourceConstants.CACHE_CFG_PATH), "path to cache config file");
            A.notNullOrEmpty(props.get(IgniteSourceConstants.CACHE_EVENTS), "Registered cache events");
            A.notNullOrEmpty(props.get(IgniteSourceConstants.TOPIC_NAMES), "Kafka topics");
        }
        catch (IllegalArgumentException e) {
            throw new ConnectException("Cannot start IgniteSourceConnector due to configuration error", e);
        }

        configProps = props;
    }

    /** {@inheritDoc} */
    @Override public Class<? extends Task> taskClass() {
        return IgniteSourceTask.class;
    }

    /** {@inheritDoc} */
    @Override public List<Map<String, String>> taskConfigs(int maxTasks) {
        List<Map<String, String>> taskConfigs = new ArrayList<>();
        Map<String, String> taskProps = new HashMap<>();

        taskProps.putAll(configProps);

        for (int i = 0; i < maxTasks; i++)
            taskConfigs.add(taskProps);

        return taskConfigs;
    }

    /** {@inheritDoc} */
    @Override public void stop() {
        // No-op.
    }
}
@@ -0,0 +1,44 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.ignite.stream.kafka.connect;

/**
* Source configuration strings.
*/
public class IgniteSourceConstants {
    /** Ignite configuration file path. */
    public static final String CACHE_CFG_PATH = "igniteCfg";

    /** Cache name. */
    public static final String CACHE_NAME = "cacheName";

    /** Events to be listened to. Names correspond to {@link IgniteSourceTask.CacheEvt}. */
    public static final String CACHE_EVENTS = "cacheEvts";

    /** Internal buffer size. */
    public static final String INTL_BUF_SIZE = "evtBufferSize";

    /** Size of one chunk drained from the internal buffer. */
    public static final String INTL_BATCH_SIZE = "evtBatchSize";

    /** User-defined filter class. */
    public static final String CACHE_FILTER_CLASS = "cacheFilterCls";

    /** Kafka topics. */
    public static final String TOPIC_NAMES = "topicNames";
}
