Skip to content

Commit

Permalink
STORM-2896: Add tool to help users migrate offsets from storm-kafka t…
Browse files Browse the repository at this point in the history
…o storm-kafka-client
  • Loading branch information
srdo committed Feb 24, 2018
1 parent d6d4ff1 commit 1236115
Show file tree
Hide file tree
Showing 8 changed files with 646 additions and 0 deletions.
8 changes: 8 additions & 0 deletions external/storm-kafka-migration/README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@
## Setup
Put the storm-kafka-migration jar in a directory alongside the version of org.apache.kafka:kafka-clients that matches your broker version. You will also need to write a configuration file; an example file can be found in src/main/conf. Put the configuration file in the same directory as the two jars. You should also stop the topology you want to migrate offsets for.

## Migrating non-Trident storm-kafka offsets to storm-kafka-client
Run `java -cp "*" org.apache.storm.kafka.migration.KafkaSpoutMigration your-config-file.yaml`. The tool will print the migrated offsets to console. The offsets will be migrated into Kafka, belonging to the consumer group you set in the configuration. You need to set the same consumer group when setting up your storm-kafka-client spout.

## Migrating Trident storm-kafka offsets to storm-kafka-client
Run `java -cp "*" org.apache.storm.kafka.migration.KafkaTridentSpoutMigration your-config-file.yaml`. The tool will print the migrated offsets to console. The offsets will be migrated to the Zookeeper path you specify in the configuration. When you configure your storm-kafka-client topology, you need to use the same Zookeeper path, and the TridentDataSource.newStream txid must be the same as the "new.topology.txid" setting you specified in the configuration.
121 changes: 121 additions & 0 deletions external/storm-kafka-migration/pom.xml
Original file line number Diff line number Diff line change
@@ -0,0 +1,121 @@
<?xml version="1.0" encoding="UTF-8"?>
<!--/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/-->
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>

<parent>
<artifactId>storm</artifactId>
<groupId>org.apache.storm</groupId>
<version>2.0.0-SNAPSHOT</version>
<relativePath>../../pom.xml</relativePath>
</parent>

<artifactId>storm-kafka-migration</artifactId>
<name>storm-kafka-migration</name>

<packaging>jar</packaging>

<dependencies>
<dependency>
<groupId>org.apache.storm</groupId>
<artifactId>storm-client</artifactId>
<version>${project.version}</version>
</dependency>

<dependency>
<!-- Provided scope: the README asks users to place a kafka-clients jar matching their broker version next to the
migration jar, so the Kafka client is supplied on the classpath at runtime instead of being packaged with this module. -->
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-clients</artifactId>
<version>${storm.kafka.client.version}</version>
<scope>provided</scope>
</dependency>

<dependency>
<groupId>com.fasterxml.jackson.core</groupId>
<artifactId>jackson-databind</artifactId>
</dependency>

<dependency>
<groupId>org.apache.curator</groupId>
<artifactId>curator-framework</artifactId>
</dependency>

<dependency>
<groupId>org.apache.logging.log4j</groupId>
<artifactId>log4j-core</artifactId>
</dependency>

<dependency>
<groupId>org.apache.logging.log4j</groupId>
<artifactId>log4j-slf4j-impl</artifactId>
</dependency>
</dependencies>

<build>
<plugins>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-checkstyle-plugin</artifactId>
<!--Note - the version would be inherited-->
<configuration>
<maxAllowedViolations>0</maxAllowedViolations>
</configuration>
</plugin>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-shade-plugin</artifactId>
<configuration>
<createDependencyReducedPom>true</createDependencyReducedPom>
<filters>
<filter>
<artifact>*:*</artifact>
<excludes>
<exclude>META-INF/*.SF</exclude>
<exclude>META-INF/*.sf</exclude>
<exclude>META-INF/*.DSA</exclude>
<exclude>META-INF/*.dsa</exclude>
<exclude>META-INF/*.RSA</exclude>
<exclude>META-INF/*.rsa</exclude>
<exclude>META-INF/*.EC</exclude>
<exclude>META-INF/*.ec</exclude>
<exclude>META-INF/MSFTSIG.SF</exclude>
<exclude>META-INF/MSFTSIG.RSA</exclude>
</excludes>
</filter>
</filters>
</configuration>
<executions>
<execution>
<phase>package</phase>
<goals>
<goal>shade</goal>
</goals>
<configuration>
<transformers>
<transformer implementation="org.apache.maven.plugins.shade.resource.ServicesResourceTransformer" />
<transformer implementation="org.apache.maven.plugins.shade.resource.ManifestResourceTransformer">
</transformer>
</transformers>
</configuration>
</execution>
</executions>
</plugin>
</plugins>
</build>
</project>
Original file line number Diff line number Diff line change
@@ -0,0 +1,37 @@
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

## Values you almost certainly want to change
#The Zookeeper servers holding your storm-kafka offsets
zookeeper.servers: "localhost:2181" #Comma separated
#The root path in Zookeeper holding your storm-kafka offsets. Your storm-kafka spout will have set this in the SpoutConfig.
zookeeper.root: "/the/root"
#The ID you set in the storm-kafka spout's SpoutConfig.
spout.id: "spoutId"
#The topic name or pattern to migrate. Note that you must use the same name or pattern as used by storm-kafka. Migrating only part of a subscription, e.g. a single topic from a wildcard subscription in storm-kafka, is not supported.
topic: "test-topic-*"
#True if the topic is a pattern. This has to match the 'kafka.topic.wildcard.match' setting from storm-kafka (false by default).
is.wildcard.topic: true
#The Kafka bootstrap servers for the KafkaConsumer to use to commit offsets to Kafka
kafka.bootstrap.servers: "localhost:9092"
#The name of the consumer group you will use for the storm-kafka-client spout. The offsets will be migrated to this consumer group.
new.spout.consumer.group: "storm-kafka-client-consumers"

## Less important defaults
zookeeper.session.timeout.ms: 20000
zookeeper.connection.timeout.ms: 15000
zookeeper.retry.times: 5
zookeeper.retry.interval.ms: 1000
Original file line number Diff line number Diff line change
@@ -0,0 +1,35 @@
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

## Values you almost certainly want to change
#The Zookeeper servers holding your storm-kafka offsets
zookeeper.servers: "localhost:2181" #Comma separated
#The root path in Zookeeper holding your storm-kafka offsets. This is the value of the transactional.zookeeper.root setting in your storm-kafka topology.
zookeeper.root: "/transactional"
#The ID you set when declaring the Trident stream with TridentDataSource.newStream in your storm-kafka topology.
txid: "streamId"
#The topic name or pattern to migrate. Note that you must use the same name or pattern as used by storm-kafka. Migrating only part of a subscription, e.g. a single topic from a wildcard subscription in storm-kafka, is not supported.
topic: "test-topic-*"
#True if the topic is a pattern. This has to match the 'kafka.topic.wildcard.match' setting from storm-kafka (false by default).
is.wildcard.topic: true
#The ID you will set when declaring the Trident stream with TridentDataSource.newStream in your storm-kafka-client topology. Please set this to a different value than txid, since otherwise the migration tool will write to the same Zookeeper paths it is reading from. In case you hit errors, it is better if the original data is left untouched.
new.topology.txid: "storm-kafka-client-streamId"

## Less important defaults
zookeeper.session.timeout.ms: 20000
zookeeper.connection.timeout.ms: 15000
zookeeper.retry.times: 5
zookeeper.retry.interval.ms: 1000
Original file line number Diff line number Diff line change
@@ -0,0 +1,162 @@
/*
* Copyright 2018 The Apache Software Foundation.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.storm.kafka.migration;

import com.fasterxml.jackson.core.type.TypeReference;
import com.fasterxml.jackson.databind.ObjectMapper;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.regex.Pattern;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.retry.RetryNTimes;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.serialization.ByteArrayDeserializer;
import org.apache.storm.utils.Utils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Command line tool that reads the offsets stored in Zookeeper by the storm-kafka non-Trident spout and commits them to Kafka
 * under the consumer group configured for the storm-kafka-client non-Trident spout.
 */
public class KafkaSpoutMigration {

    private static final Logger LOG = LoggerFactory.getLogger(KafkaSpoutMigration.class);

    /** Jackson target type for the JSON document read from each partition node. */
    private static final TypeReference<Map<String, Object>> OFFSET_DATA_TYPE = new TypeReference<Map<String, Object>>() {
    };

    /** Settings read from the configuration file given on the command line. */
    private static class Configuration {

        private String zkHosts;
        private String zkRoot;
        private String spoutId;
        private String topic;
        private boolean isWildcardTopic;
        private String kafkaBootstrapServers;
        private String newSpoutConsumerGroup;
        private int zkSessionTimeoutMs;
        private int zkConnectionTimeoutMs;
        private int zkRetryTimes;
        private int zkRetryIntervalMs;
    }

    /**
     * Migrates offsets from the Zookeeper store used by the storm-kafka non-Trident spouts, to Kafka's offset store used by the
     * storm-kafka-client non-Trident spout.
     *
     * @param args a single element, the name of the configuration file
     * @throws Exception if the configuration cannot be read, the offsets cannot be read from Zookeeper, or the commit to Kafka fails
     */
    public static void main(String[] args) throws Exception {
        if (args.length != 1) {
            System.err.println("Args: confFile");
            System.exit(1);
        }

        Map<String, Object> conf = Utils.findAndReadConfigFile(args[0]);
        Configuration configuration = new Configuration();

        configuration.zkHosts = MapUtil.getOrError(conf, "zookeeper.servers");
        configuration.zkRoot = MapUtil.getOrError(conf, "zookeeper.root");
        configuration.spoutId = MapUtil.getOrError(conf, "spout.id");
        configuration.topic = MapUtil.getOrError(conf, "topic");
        configuration.isWildcardTopic = MapUtil.getOrError(conf, "is.wildcard.topic");
        configuration.kafkaBootstrapServers = MapUtil.getOrError(conf, "kafka.bootstrap.servers");
        configuration.newSpoutConsumerGroup = MapUtil.getOrError(conf, "new.spout.consumer.group");
        configuration.zkSessionTimeoutMs = MapUtil.getOrError(conf, "zookeeper.session.timeout.ms");
        configuration.zkConnectionTimeoutMs = MapUtil.getOrError(conf, "zookeeper.connection.timeout.ms");
        configuration.zkRetryTimes = MapUtil.getOrError(conf, "zookeeper.retry.times");
        configuration.zkRetryIntervalMs = MapUtil.getOrError(conf, "zookeeper.retry.interval.ms");

        Map<TopicPartition, OffsetAndMetadata> offsetsToCommit = getOffsetsToCommit(configuration);

        if (offsetsToCommit.isEmpty()) {
            // Committing an empty map does nothing, so don't report a successful migration when no offsets were found.
            LOG.warn("Found no offsets for spout id {} under Zookeeper root {}, nothing was migrated to consumer group {}",
                configuration.spoutId, configuration.zkRoot, configuration.newSpoutConsumerGroup);
            return;
        }

        LOG.info("Migrating offsets {}", offsetsToCommit);

        Properties props = new Properties();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, configuration.kafkaBootstrapServers);
        props.put(ConsumerConfig.GROUP_ID_CONFIG, configuration.newSpoutConsumerGroup);
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, ByteArrayDeserializer.class);
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, ByteArrayDeserializer.class);

        try (KafkaConsumer<?, ?> consumer = new KafkaConsumer<>(props)) {
            consumer.assign(offsetsToCommit.keySet());
            consumer.commitSync(offsetsToCommit);
        }

        LOG.info("Migrated offsets {} to consumer group {}", offsetsToCommit, configuration.newSpoutConsumerGroup);
    }

    /**
     * Reads the offset of every partition node directly below the given Zookeeper path.
     *
     * <p>Each child node is parsed as a JSON object, and its "topic", "partition" and "offset" fields are used to build the result.
     *
     * @param curator a started Curator client
     * @param objectMapper the mapper used to parse the node data
     * @param partitionsRoot the Zookeeper path whose children hold the offset data
     * @return the offsets found below the path, keyed by topic partition
     * @throws Exception if the path does not exist, or a child node cannot be read or does not contain the expected fields
     */
    private static Map<TopicPartition, OffsetAndMetadata> getOffsetsAtPath(
        CuratorFramework curator, ObjectMapper objectMapper, String partitionsRoot) throws Exception {
        Map<TopicPartition, OffsetAndMetadata> offsets = new HashMap<>();

        if (curator.checkExists().forPath(partitionsRoot) == null) {
            throw new RuntimeException("No such path " + partitionsRoot);
        }
        List<String> partitionPaths = curator.getChildren().forPath(partitionsRoot);
        for (String partitionPath : partitionPaths) {
            String absPartitionPath = partitionsRoot + "/" + partitionPath;
            LOG.info("Reading offset data from path {}", absPartitionPath);
            byte[] partitionBytes = curator.getData().forPath(absPartitionPath);
            Map<String, Object> partitionMetadata = objectMapper.readValue(partitionBytes, OFFSET_DATA_TYPE);
            if (partitionMetadata == null) {
                throw new RuntimeException("No offset data at path " + absPartitionPath);
            }
            Object topic = partitionMetadata.get("topic");
            Object partition = partitionMetadata.get("partition");
            Object offset = partitionMetadata.get("offset");
            // Fail with the offending path and content instead of a bare NullPointerException or ClassCastException.
            if (!(topic instanceof String) || !(partition instanceof Number) || !(offset instanceof Number)) {
                throw new RuntimeException("Unexpected offset data at path " + absPartitionPath + ": " + partitionMetadata);
            }
            offsets.put(new TopicPartition((String) topic, ((Number) partition).intValue()),
                new OffsetAndMetadata(((Number) offset).longValue()));
        }
        return offsets;
    }

    /**
     * Collects the offsets to migrate from Zookeeper.
     *
     * <p>The spout root is the configured Zookeeper root followed by the spout id. For a wildcard topic, every child of the spout
     * root whose name fully matches the topic pattern is scanned for partition nodes. Otherwise the partition nodes are read
     * directly from the spout root.
     *
     * @param configuration the migration settings
     * @return the offsets found in Zookeeper, keyed by topic partition
     * @throws Exception if an expected path is missing or the offset data cannot be read
     */
    private static Map<TopicPartition, OffsetAndMetadata> getOffsetsToCommit(Configuration configuration) throws Exception {
        Map<TopicPartition, OffsetAndMetadata> offsetsToCommit = new HashMap<>();

        try (CuratorFramework curator = newCurator(configuration)) {
            curator.start();
            ObjectMapper objectMapper = new ObjectMapper();

            String spoutRoot = configuration.zkRoot + "/" + configuration.spoutId;
            if (curator.checkExists().forPath(spoutRoot) == null) {
                throw new RuntimeException("No such path " + spoutRoot);
            }

            if (configuration.isWildcardTopic) {
                // Compile once rather than once per child. This also rejects an invalid pattern before any node is read.
                Pattern topicPattern = Pattern.compile(configuration.topic);
                LOG.info("Expecting wildcard topics, looking for topics in {}", spoutRoot);
                List<String> topicPaths = curator.getChildren().forPath(spoutRoot);
                for (String topicPath : topicPaths) {
                    if (!topicPattern.matcher(topicPath).matches()) {
                        LOG.info("Skipping directory {} because it doesn't match the topic pattern {}", topicPath, configuration.topic);
                    } else {
                        String absTopicPath = spoutRoot + "/" + topicPath;
                        LOG.info("Looking for partitions in {}", absTopicPath);
                        offsetsToCommit.putAll(getOffsetsAtPath(curator, objectMapper, absTopicPath));
                    }
                }
            } else {
                LOG.info("Expecting exact topic match, looking for offsets in {}", spoutRoot);
                offsetsToCommit.putAll(getOffsetsAtPath(curator, objectMapper, spoutRoot));
            }

        }
        return offsetsToCommit;
    }

    /**
     * Creates a Curator client from the Zookeeper settings in the configuration. The returned client is not started.
     *
     * @param config the migration settings
     * @return a new, unstarted Curator client
     */
    private static CuratorFramework newCurator(Configuration config) throws Exception {
        return CuratorFrameworkFactory.newClient(config.zkHosts,
            config.zkSessionTimeoutMs,
            config.zkConnectionTimeoutMs,
            new RetryNTimes(config.zkRetryTimes,
                config.zkRetryIntervalMs));
    }

}
Loading

0 comments on commit 1236115

Please sign in to comment.