From cfa16c2f68a740cc241dbf84951cd279278d7384 Mon Sep 17 00:00:00 2001 From: Yeonsu Han Date: Tue, 31 Mar 2015 03:41:53 +0900 Subject: [PATCH 1/4] TAJO-1480: Kafka Consumer for kafka strage. --- tajo-storage/tajo-storage-kafka/pom.xml | 197 +++++++++++ .../storage/kafka/SimpleConsumerManager.java | 314 ++++++++++++++++++ .../tajo/storage/kafka/TestConstants.java | 25 ++ .../kafka/TestSimpleConsumerManager.java | 105 ++++++ .../storage/kafka/testUtil/EmbeddedKafka.java | 131 ++++++++ .../kafka/testUtil/EmbeddedZookeeper.java | 90 +++++ 6 files changed, 862 insertions(+) create mode 100644 tajo-storage/tajo-storage-kafka/pom.xml create mode 100644 tajo-storage/tajo-storage-kafka/src/main/java/org/apache/tajo/storage/kafka/SimpleConsumerManager.java create mode 100644 tajo-storage/tajo-storage-kafka/src/test/java/org/apache/tajo/storage/kafka/TestConstants.java create mode 100644 tajo-storage/tajo-storage-kafka/src/test/java/org/apache/tajo/storage/kafka/TestSimpleConsumerManager.java create mode 100644 tajo-storage/tajo-storage-kafka/src/test/java/org/apache/tajo/storage/kafka/testUtil/EmbeddedKafka.java create mode 100644 tajo-storage/tajo-storage-kafka/src/test/java/org/apache/tajo/storage/kafka/testUtil/EmbeddedZookeeper.java diff --git a/tajo-storage/tajo-storage-kafka/pom.xml b/tajo-storage/tajo-storage-kafka/pom.xml new file mode 100644 index 0000000000..067d4ba3a2 --- /dev/null +++ b/tajo-storage/tajo-storage-kafka/pom.xml @@ -0,0 +1,197 @@ + + + + + + tajo-project + org.apache.tajo + 0.10.0-SNAPSHOT + ../../tajo-project + + 4.0.0 + + tajo-storage-kafka + jar + Tajo Kafka Storage + + UTF-8 + UTF-8 + + + + + repository.jboss.org + https://repository.jboss.org/nexus/content/repositories/releases/ + + + false + + + + + + + + org.apache.maven.plugins + maven-compiler-plugin + + 1.6 + 1.6 + ${project.build.sourceEncoding} + + + + org.apache.rat + apache-rat-plugin + + + verify + + check + + + + + + org.apache.maven.plugins + maven-surefire-plugin + + + TRUE + + -Xms512m -Xmx1024m -XX:MaxPermSize=128m -Dfile.encoding=UTF-8 + + + + org.apache.maven.plugins + maven-jar-plugin + 2.4 + + + + test-jar + + + + + + org.apache.maven.plugins + maven-surefire-report-plugin + + + + + + + + org.apache.tajo + tajo-common + provided + + + org.apache.tajo + tajo-catalog-common + provided + + + org.apache.tajo + tajo-plan + provided + + + org.apache.tajo + tajo-storage-common + provided + + + org.apache.tajo + tajo-storage-hdfs + provided + + + org.apache.kafka + kafka_2.10 + 0.8.2.0 + provided + + + junit + junit + test + + + org.apache.zookeeper + zookeeper + 3.4.6 + test + + + io.airlift + testing + 0.88 + test + + + com.101tec + zkclient + test + 0.4 + + + + + + docs + + false + + + + + org.apache.maven.plugins + maven-javadoc-plugin + + + + module-javadocs + package + + jar + + + ${project.build.directory} + + + + + + + + + + + + + org.apache.maven.plugins + maven-surefire-report-plugin + 2.15 + + + + diff --git a/tajo-storage/tajo-storage-kafka/src/main/java/org/apache/tajo/storage/kafka/SimpleConsumerManager.java b/tajo-storage/tajo-storage-kafka/src/main/java/org/apache/tajo/storage/kafka/SimpleConsumerManager.java new file mode 100644 index 0000000000..0295d0075c --- /dev/null +++ b/tajo-storage/tajo-storage-kafka/src/main/java/org/apache/tajo/storage/kafka/SimpleConsumerManager.java @@ -0,0 +1,314 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.tajo.storage.kafka; + +import java.io.IOException; +import java.net.InetSocketAddress; +import java.util.ArrayList; +import java.util.HashMap; +import java.util.HashSet; +import java.util.Iterator; +import java.util.List; +import java.util.Map; +import java.util.Random; +import java.util.Set; + +import kafka.api.FetchRequest; +import kafka.api.FetchRequestBuilder; +import kafka.api.PartitionOffsetRequestInfo; +import kafka.cluster.Broker; +import kafka.common.ErrorMapping; +import kafka.common.TopicAndPartition; +import kafka.javaapi.FetchResponse; +import kafka.javaapi.OffsetResponse; +import kafka.javaapi.PartitionMetadata; +import kafka.javaapi.TopicMetadata; +import kafka.javaapi.TopicMetadataRequest; +import kafka.javaapi.consumer.SimpleConsumer; +import kafka.message.MessageAndOffset; + +import org.apache.commons.collections.IteratorUtils; +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.tajo.util.NetUtils; + +// SimpleConsumerManager is kafka client for KafkaScanner. +// It's one per partition. Each partition instantiate this class. +public class SimpleConsumerManager { + private static final Log LOG = LogFactory.getLog(SimpleConsumerManager.class); + // TODO: configurable setting. + static final int CONSUMER_TIMEOUT = 30000; + static final int CONSUMER_BUFFER_SIZE = 64 * 1024; + static final int CONSUMER_FETCH_SIZE = 300 * 1024; + static final int FETCH_TRY_NUM = 3; + + private SimpleConsumer consumer = null; + private List brokers = new ArrayList(); + private String topic; + private int partition; + private String clientId; + // leader of this partition. + private Broker leader; + + public SimpleConsumerManager(String seedBrokers, String topic, int partition) throws IOException { + this.topic = topic; + this.partition = partition; + // Identifier of simpleConsumer. + this.clientId = SimpleConsumerManager.getIdentifier(); + this.brokers = SimpleConsumerManager.getBrokerList(seedBrokers); + this.leader = findLeader(topic, partition); + // consumer creation fail. + if (null == leader) { + throw new IOException("consumer creation fail"); + } else { + consumer = new SimpleConsumer(leader.host(), leader.port(), CONSUMER_TIMEOUT, CONSUMER_BUFFER_SIZE, clientId); + } + } + + /** + * Create SimpleConsumer instance. seedBrokers is connection info of kafka + * brokers. ex) localhost:9092,localhost:9091 topic is topic name. partition + * is partition id. + * + * @param seedBrokers + * @param topic + * @param partition + * @return + * @throws IOException + */ + static public SimpleConsumerManager getSimpleConsumerManager(String seedBrokers, String topic, int partition) + throws IOException { + return new SimpleConsumerManager(seedBrokers, topic, partition); + } + + /** + * Return partition ID list of specific topic. Check for seedBrokers. + * seedBrokers is kafka brokers. + * + * @param seedBrokers + * @param topic + * @return + * @throws IOException + */ + static public Set getPartitions(String seedBrokers, String topic) throws IOException { + Set partitions = new HashSet(); + for (InetSocketAddress seed : SimpleConsumerManager.getBrokerList(seedBrokers)) { + SimpleConsumer consumer = null; + try { + consumer = new SimpleConsumer(seed.getHostName(), seed.getPort(), CONSUMER_TIMEOUT, CONSUMER_BUFFER_SIZE, + SimpleConsumerManager.getIdentifier() + "partitionLookup"); + List topics = new ArrayList(); + topics.add(topic); + TopicMetadataRequest req = new TopicMetadataRequest(topics); + kafka.javaapi.TopicMetadataResponse resp = consumer.send(req); + // call to topicsMetadata() asks the Broker you are connected to for all + // the details about the topic we are interested in + List metaData = resp.topicsMetadata(); + // loop on partitionsMetadata iterates through all the partitions until + // we find the one we want. + for (TopicMetadata item : metaData) { + for (PartitionMetadata part : item.partitionsMetadata()) { + partitions.add(part.partitionId()); + } + } + } catch (Exception e) { + LOG.error(e.getMessage(), e); + throw new IOException(e); + } finally { + if (consumer != null) + consumer.close(); + } + } + return partitions; + } + + static private List getBrokerList(String brokers) { + List brokerList = new ArrayList(); + for (String broker : brokers.split(",")) { + brokerList.add(NetUtils.createUnresolved(broker)); + } + return brokerList; + } + + /** + * Create identifier for SimpleConsumer. The SimpleConsumer connects at kafka + * using this identifier. + * + * @return + */ + static private String getIdentifier() { + Random r = new Random(); + return r.nextLong() + "_" + System.currentTimeMillis(); + } + + synchronized public void close() { + if (null != consumer) { + consumer.close(); + } + consumer = null; + } + + /** + * Fetch data from kafka, as much as 'CONSUMER_FETCH_SIZE' size from offset. + * + * @param offset + * @return + */ + @SuppressWarnings("unchecked") + public List fetch(long offset) { + List returnData = null; + FetchRequest req = new FetchRequestBuilder().clientId(clientId) + .addFetch(topic, partition, offset, CONSUMER_FETCH_SIZE).build(); + if (null != consumer) { + FetchResponse fetchResponse = null; + // If that fails, find new leader of partition and try again. + for (int i = 0; i < FETCH_TRY_NUM; i++) { + fetchResponse = consumer.fetch(req); + if (fetchResponse.hasError()) { + short code = fetchResponse.errorCode(topic, partition); + LOG.error("Error fetching data from the Broker:" + leader + " Reason: " + code + " Try: " + i); + if (ErrorMapping.LeaderNotAvailableCode() == code || ErrorMapping.NotLeaderForPartitionCode() == code + || ErrorMapping.RequestTimedOutCode() == code || ErrorMapping.BrokerNotAvailableCode() == code + || ErrorMapping.ReplicaNotAvailableCode() == code) { + Broker newLeader = findNewLeader(); + if (null != newLeader) { + synchronized (consumer) { + this.leader = newLeader; + consumer.close(); + consumer = new SimpleConsumer(leader.host(), leader.port(), CONSUMER_TIMEOUT, CONSUMER_BUFFER_SIZE, + clientId); + } + } + } + } else { + break; + } + fetchResponse = null; + } + if (null != fetchResponse) { + Iterator messages = fetchResponse.messageSet(topic, partition).iterator(); + returnData = IteratorUtils.toList(messages); + } + } + return returnData; + } + + public long getReadOffset(long whichTime) throws IOException { + TopicAndPartition topicAndPartition = new TopicAndPartition(topic, partition); + Map requestInfo = new HashMap(); + requestInfo.put(topicAndPartition, new PartitionOffsetRequestInfo(whichTime, 1)); + kafka.javaapi.OffsetRequest request = new kafka.javaapi.OffsetRequest(requestInfo, + kafka.api.OffsetRequest.CurrentVersion(), clientId); + OffsetResponse response = consumer.getOffsetsBefore(request); + + if (response.hasError()) { + LOG.error("Error fetching data Offset Data the Broker. Reason: " + response.errorCode(topic, partition)); + throw new IOException("Error fetching data Offset Data the Broker"); + } + long[] offsets = response.offsets(topic, partition); + return offsets[0]; + } + + /** + * find new leader of partition, if old leader fail. + * + * @return + */ + synchronized private Broker findNewLeader() { + // retry for 3 time. + for (int i = 0; i < 3; i++) { + boolean goToSleep = false; + Broker newLeader = findLeader(topic, partition); + if (leader == null) { + goToSleep = true; + } else if (leader.host().equalsIgnoreCase(newLeader.host()) && leader.port() == newLeader.port() && i == 0) { + // first time through if the leader hasn't changed give ZooKeeper a + // second to recover + // second time, assume the broker did recover before failover, or it was + // a non-Broker issue + goToSleep = true; + } else { + return newLeader; + } + if (goToSleep) { + try { + Thread.sleep(1000); + } catch (InterruptedException ie) { + } + } + } + // Unable to find new leader after Broker failure. + return null; + } + + /** + * Find leader broker of specific topic and partition + * + * @param topic + * @param partition + * @return + */ + synchronized private Broker findLeader(String topic, int partition) { + PartitionMetadata returnMetaData = null; + for (InetSocketAddress broker : brokers) { + SimpleConsumer consumer = null; + try { + consumer = new SimpleConsumer(broker.getHostName(), broker.getPort(), CONSUMER_TIMEOUT, CONSUMER_BUFFER_SIZE, + clientId + "_leaderLookup"); + List topics = new ArrayList(); + topics.add(topic); + TopicMetadataRequest req = new TopicMetadataRequest(topics); + kafka.javaapi.TopicMetadataResponse resp = consumer.send(req); + // call to topicsMetadata() asks the Broker you are connected to for all + // the details about the topic we are interested in + List metaData = resp.topicsMetadata(); + // loop on partitionsMetadata iterates through all the partitions until + // we find the one we want. + for (TopicMetadata item : metaData) { + for (PartitionMetadata part : item.partitionsMetadata()) { + if (part.partitionId() == partition) { + returnMetaData = part; + break; + } + } + } + } catch (Exception e) { + LOG.error("Error communicating with Broker [" + broker + "] to find Leader for [" + topic + ", " + partition + + "] Reason: " + e); + } finally { + if (consumer != null) + consumer.close(); + } + } + // Can't find metadata for Topic and Partition. + if (returnMetaData == null) { + return null; + } else { + // add replica broker info to replicaBrokers + if (returnMetaData != null) { + brokers.clear(); + for (kafka.cluster.Broker replica : returnMetaData.replicas()) { + brokers.add(NetUtils.createSocketAddr(replica.host(), replica.port())); + } + } + } + return returnMetaData.leader(); + } + +} diff --git a/tajo-storage/tajo-storage-kafka/src/test/java/org/apache/tajo/storage/kafka/TestConstants.java b/tajo-storage/tajo-storage-kafka/src/test/java/org/apache/tajo/storage/kafka/TestConstants.java new file mode 100644 index 0000000000..ac8ec6bcf9 --- /dev/null +++ b/tajo-storage/tajo-storage-kafka/src/test/java/org/apache/tajo/storage/kafka/TestConstants.java @@ -0,0 +1,25 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.tajo.storage.kafka; + +public class TestConstants { + final static int kafka_partition_num = 3; + final static String test_topic = "test-topic"; + final static String[] test_data = { "1|abc|0.2", "2|def|0.4", "3|ghi|0.6", "4|jkl|0.8", "5|mno|1.0" }; +} diff --git a/tajo-storage/tajo-storage-kafka/src/test/java/org/apache/tajo/storage/kafka/TestSimpleConsumerManager.java b/tajo-storage/tajo-storage-kafka/src/test/java/org/apache/tajo/storage/kafka/TestSimpleConsumerManager.java new file mode 100644 index 0000000000..1eef94fd6b --- /dev/null +++ b/tajo-storage/tajo-storage-kafka/src/test/java/org/apache/tajo/storage/kafka/TestSimpleConsumerManager.java @@ -0,0 +1,105 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.tajo.storage.kafka; + +import static org.junit.Assert.assertTrue; + +import java.nio.ByteBuffer; +import java.util.ArrayList; +import java.util.List; + +import kafka.message.MessageAndOffset; +import kafka.producer.KeyedMessage; + +import org.apache.tajo.storage.kafka.testUtil.EmbeddedKafka; +import org.junit.AfterClass; +import org.junit.BeforeClass; +import org.junit.Test; + +public class TestSimpleConsumerManager { + static EmbeddedKafka em_kafka; + + // Start up EmbeddedKafka and Generate test data. + @BeforeClass + public static void setUpBeforeClass() throws Exception { + em_kafka = EmbeddedKafka.createEmbeddedKafka(2181, 9092); + em_kafka.start(); + em_kafka.createTopic(TestConstants.kafka_partition_num, 1, TestConstants.test_topic); + genDataForTest(); + } + + // Close EmbeddedKafka. + @AfterClass + public static void tearDownAfterClass() throws Exception { + em_kafka.close(); + } + + // Test for getting topic partitions. + @Test + public void testGetPartitions() throws Exception { + int prtition_num = SimpleConsumerManager.getPartitions(em_kafka.getConnectString(), TestConstants.test_topic) + .size(); + assertTrue(prtition_num == TestConstants.kafka_partition_num); + } + + //Test for to fetch data from kafka. + @Test + public void testFetchData() throws Exception { + StringBuilder fatchData = new StringBuilder(); + for (Integer partitionId : SimpleConsumerManager.getPartitions(em_kafka.getConnectString(), + TestConstants.test_topic)) { + SimpleConsumerManager cm = SimpleConsumerManager.getSimpleConsumerManager(em_kafka.getConnectString(), + TestConstants.test_topic, partitionId); + long startOffset = cm.getReadOffset(kafka.api.OffsetRequest.EarliestTime()); + long lastOffset = cm.getReadOffset(kafka.api.OffsetRequest.LatestTime()); + if (startOffset < lastOffset) { + for (MessageAndOffset message : cm.fetch(startOffset)) { + ByteBuffer payload = message.message().payload(); + byte[] data = new byte[payload.limit()]; + payload.get(data); + fatchData.append(new String(data, "UTF-8")); + } + } + } + StringBuilder testData = new StringBuilder(); + for(String td : TestConstants.test_data){ + testData.append(td); + } + assertTrue(fatchData.toString().equals(testData.toString())); + } + + @SuppressWarnings({ "unchecked", "rawtypes" }) + private static void genDataForTest() throws Exception { + kafka.javaapi.producer.Producer producer = null; + try { + producer = em_kafka.createProducer(em_kafka.getConnectString()); + List> messageList = new ArrayList>(); + messageList.add(new KeyedMessage(TestConstants.test_topic, TestConstants.test_data[0])); + messageList.add(new KeyedMessage(TestConstants.test_topic, TestConstants.test_data[1])); + messageList.add(new KeyedMessage(TestConstants.test_topic, TestConstants.test_data[2])); + messageList.add(new KeyedMessage(TestConstants.test_topic, TestConstants.test_data[3])); + messageList.add(new KeyedMessage(TestConstants.test_topic, TestConstants.test_data[4])); + producer.send(messageList); + } finally { + if (null != producer) { + producer.close(); + } + } + } +} diff --git a/tajo-storage/tajo-storage-kafka/src/test/java/org/apache/tajo/storage/kafka/testUtil/EmbeddedKafka.java b/tajo-storage/tajo-storage-kafka/src/test/java/org/apache/tajo/storage/kafka/testUtil/EmbeddedKafka.java new file mode 100644 index 0000000000..a52546e34b --- /dev/null +++ b/tajo-storage/tajo-storage-kafka/src/test/java/org/apache/tajo/storage/kafka/testUtil/EmbeddedKafka.java @@ -0,0 +1,131 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.tajo.storage.kafka.testUtil; + +import static com.google.common.base.Preconditions.checkNotNull; +import static com.google.common.base.Preconditions.checkState; +import static io.airlift.testing.FileUtils.deleteRecursively; + +import java.io.Closeable; +import java.io.File; +import java.io.IOException; +import java.util.Properties; +import java.util.concurrent.atomic.AtomicBoolean; + +import kafka.admin.AdminUtils; +import kafka.producer.ProducerConfig; +import kafka.server.KafkaConfig; +import kafka.server.KafkaServerStartable; +import kafka.utils.ZKStringSerializer$; + +import org.I0Itec.zkclient.ZkClient; + +import com.google.common.io.Files; + +public class EmbeddedKafka implements Closeable { + private final EmbeddedZookeeper zookeeper; + private final int port; + private final File kafkaDataDir; + private final KafkaServerStartable kafka; + + private final AtomicBoolean started = new AtomicBoolean(); + private final AtomicBoolean stopped = new AtomicBoolean(); + + public static EmbeddedKafka createEmbeddedKafka(int zookeeperPort, int kafkaPort) throws IOException { + return new EmbeddedKafka(new EmbeddedZookeeper(zookeeperPort), kafkaPort); + } + + EmbeddedKafka(EmbeddedZookeeper zookeeper, int kafkaPort) throws IOException { + this.zookeeper = checkNotNull(zookeeper, "zookeeper is null"); + + this.port = kafkaPort; + this.kafkaDataDir = Files.createTempDir(); + + Properties properties = new Properties(); + properties.setProperty("broker.id", "0"); + properties.setProperty("host.name", "localhost"); + properties.setProperty("num.partitions", "2"); + properties.setProperty("log.flush.interval.messages", "10000"); + properties.setProperty("log.flush.interval.ms", "1000"); + properties.setProperty("log.retention.minutes", "60"); + properties.setProperty("log.segment.bytes", "1048576"); + properties.setProperty("auto.create.topics.enable", "false"); + properties.setProperty("zookeeper.connection.timeout.ms", "1000000"); + properties.setProperty("port", Integer.toString(port)); + properties.setProperty("log.dirs", kafkaDataDir.getAbsolutePath()); + properties.setProperty("zookeeper.connect", zookeeper.getConnectString()); + + KafkaConfig config = new KafkaConfig(properties); + this.kafka = new KafkaServerStartable(config); + } + + public void start() throws InterruptedException, IOException { + if (!started.getAndSet(true)) { + zookeeper.start(); + kafka.startup(); + } + } + + @Override + public void close() { + if (started.get() && !stopped.getAndSet(true)) { + kafka.shutdown(); + kafka.awaitShutdown(); + zookeeper.close(); + deleteRecursively(kafkaDataDir); + } + } + + public int getZookeeperPort() { + return zookeeper.getPort(); + } + + public int getPort() { + return port; + } + + public String getConnectString() { + return "localhost:" + Integer.toString(port); + } + + public String getZookeeperConnectString() { + return zookeeper.getConnectString(); + } + + public void createTopic(int partitions, int replication, String topic) { + checkState(started.get() && !stopped.get(), "not started!"); + + ZkClient zkClient = new ZkClient(getZookeeperConnectString(), 30000, 30000, ZKStringSerializer$.MODULE$); + try { + AdminUtils.createTopic(zkClient, topic, partitions, replication, new Properties()); + } finally { + zkClient.close(); + } + } + + @SuppressWarnings("rawtypes") + public kafka.javaapi.producer.Producer createProducer(String connecting) { + Properties properties = new Properties(); + properties.put("serializer.class", "kafka.serializer.StringEncoder"); + properties.put("metadata.broker.list", connecting); + kafka.javaapi.producer.Producer producer; + producer = new kafka.javaapi.producer.Producer(new ProducerConfig(properties)); + return producer; + } +} diff --git a/tajo-storage/tajo-storage-kafka/src/test/java/org/apache/tajo/storage/kafka/testUtil/EmbeddedZookeeper.java b/tajo-storage/tajo-storage-kafka/src/test/java/org/apache/tajo/storage/kafka/testUtil/EmbeddedZookeeper.java new file mode 100644 index 0000000000..a6f2d73dfb --- /dev/null +++ b/tajo-storage/tajo-storage-kafka/src/test/java/org/apache/tajo/storage/kafka/testUtil/EmbeddedZookeeper.java @@ -0,0 +1,90 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.tajo.storage.kafka.testUtil; + +import static io.airlift.testing.FileUtils.deleteRecursively; + +import java.io.Closeable; +import java.io.File; +import java.io.IOException; +import java.net.InetSocketAddress; +import java.util.concurrent.atomic.AtomicBoolean; + +import org.apache.zookeeper.server.NIOServerCnxnFactory; +import org.apache.zookeeper.server.ZooKeeperServer; +import org.apache.zookeeper.server.persistence.FileTxnSnapLog; + +import com.google.common.io.Files; + +public class EmbeddedZookeeper implements Closeable { + private final int port; + private final File zkDataDir; + private final ZooKeeperServer zkServer; + private final NIOServerCnxnFactory cnxnFactory; + + private final AtomicBoolean started = new AtomicBoolean(); + private final AtomicBoolean stopped = new AtomicBoolean(); + + public EmbeddedZookeeper() throws IOException { + this(2181); + } + + public EmbeddedZookeeper(int port) throws IOException { + this.port = port; + zkDataDir = Files.createTempDir(); + zkServer = new ZooKeeperServer(); + + FileTxnSnapLog ftxn = new FileTxnSnapLog(zkDataDir, zkDataDir); + zkServer.setTxnLogFactory(ftxn); + + cnxnFactory = new NIOServerCnxnFactory(); + cnxnFactory.configure(new InetSocketAddress(port), 0); + } + + public void start() throws InterruptedException, IOException { + if (!started.getAndSet(true)) { + cnxnFactory.startup(zkServer); + } + } + + @Override + public void close() { + if (started.get() && !stopped.getAndSet(true)) { + cnxnFactory.shutdown(); + try { + cnxnFactory.join(); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + } + + if (zkServer.isRunning()) { + zkServer.shutdown(); + } + deleteRecursively(zkDataDir); + } + } + + public String getConnectString() { + return "127.0.0.1:" + Integer.toString(port); + } + + public int getPort() { + return port; + } +} From 96fa6b02b6dbf72867488252dfb94a1972756ecd Mon Sep 17 00:00:00 2001 From: Yeonsu Han Date: Tue, 31 Mar 2015 11:29:08 +0900 Subject: [PATCH 2/4] TAJO-1480: Kafka Consumer for kafka strage. --- tajo-storage/tajo-storage-kafka/pom.xml | 50 ++++++++++++------------- 1 file changed, 24 insertions(+), 26 deletions(-) diff --git a/tajo-storage/tajo-storage-kafka/pom.xml b/tajo-storage/tajo-storage-kafka/pom.xml index 067d4ba3a2..ced6dfd570 100644 --- a/tajo-storage/tajo-storage-kafka/pom.xml +++ b/tajo-storage/tajo-storage-kafka/pom.xml @@ -96,8 +96,6 @@ limitations under the License. - - org.apache.tajo @@ -125,34 +123,34 @@ limitations under the License. provided - org.apache.kafka - kafka_2.10 - 0.8.2.0 - provided - - + org.apache.kafka + kafka_2.10 + 0.8.2.0 + provided + + junit junit test - - org.apache.zookeeper - zookeeper - 3.4.6 - test - - - io.airlift - testing - 0.88 - test - - - com.101tec - zkclient - test - 0.4 - + + org.apache.zookeeper + zookeeper + 3.4.6 + test + + + io.airlift + testing + 0.88 + test + + + com.101tec + zkclient + test + 0.4 + From cb2ae01af5b3e4347a31e23c406bb23a614d4566 Mon Sep 17 00:00:00 2001 From: Yeonsu Han Date: Sun, 5 Apr 2015 22:04:11 +0900 Subject: [PATCH 3/4] TAJO-1480: Kafka Consumer for kafka strage.(modify version) --- tajo-storage/tajo-storage-kafka/pom.xml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/tajo-storage/tajo-storage-kafka/pom.xml b/tajo-storage/tajo-storage-kafka/pom.xml index ced6dfd570..3eff8344b7 100644 --- a/tajo-storage/tajo-storage-kafka/pom.xml +++ b/tajo-storage/tajo-storage-kafka/pom.xml @@ -21,7 +21,7 @@ limitations under the License. tajo-project org.apache.tajo - 0.10.0-SNAPSHOT + 0.11.0-SNAPSHOT ../../tajo-project 4.0.0 From 614657fb2548721ff8f17322c8763d0717ff5b9d Mon Sep 17 00:00:00 2001 From: Yeonsu Han Date: Mon, 6 Apr 2015 01:36:54 +0900 Subject: [PATCH 4/4] TAJO-1480: Kafka Consumer for kafka strage.(for build) --- tajo-project/pom.xml | 7 +++++++ tajo-storage/pom.xml | 1 + tajo-storage/tajo-storage-kafka/pom.xml | 4 ++-- 3 files changed, 10 insertions(+), 2 deletions(-) diff --git a/tajo-project/pom.xml b/tajo-project/pom.xml index 7ad4ae0164..572302c8d6 100644 --- a/tajo-project/pom.xml +++ b/tajo-project/pom.xml @@ -37,6 +37,8 @@ 2.5.0 0.11.0-SNAPSHOT 0.98.7-hadoop2 + 0.8.2.0 + kafka_2.10 4.0.25.Final 2.6 ${project.parent.relativePath}/.. @@ -763,6 +765,11 @@ tajo-storage-hbase ${tajo.version} + + org.apache.tajo + tajo-storage-kafka + ${tajo.version} + org.apache.tajo tajo-pullserver diff --git a/tajo-storage/pom.xml b/tajo-storage/pom.xml index e74c744864..4c13944900 100644 --- a/tajo-storage/pom.xml +++ b/tajo-storage/pom.xml @@ -35,6 +35,7 @@ tajo-storage-common tajo-storage-hdfs tajo-storage-hbase + tajo-storage-kafka diff --git a/tajo-storage/tajo-storage-kafka/pom.xml b/tajo-storage/tajo-storage-kafka/pom.xml index 3eff8344b7..c48f7f5cd7 100644 --- a/tajo-storage/tajo-storage-kafka/pom.xml +++ b/tajo-storage/tajo-storage-kafka/pom.xml @@ -124,8 +124,8 @@ limitations under the License. org.apache.kafka - kafka_2.10 - 0.8.2.0 + ${kafka.artifactId} + ${kafka.version} provided