
merge from 0.8 and resolve conflicts

2 parents 82b11aa + 92ecebe · commit 4f2742d60d16f5ba468aa66d2c3ed7aa37479dce · @junrao committed on Mar 4, 2013
Showing with 1,200 additions and 922 deletions.
  1. +24 −9 bin/windows/kafka-run-class.bat
  2. +2 −1 config/log4j.properties
  3. +33 −86 contrib/hadoop-producer/README.md
  4. +6 −8 contrib/hadoop-producer/src/main/java/kafka/bridge/examples/TextPublisher.java
  5. +43 −60 contrib/hadoop-producer/src/main/java/kafka/bridge/hadoop/KafkaOutputFormat.java
  6. +22 −13 contrib/hadoop-producer/src/main/java/kafka/bridge/hadoop/KafkaRecordWriter.java
  7. +3 −5 contrib/hadoop-producer/src/main/java/kafka/bridge/pig/AvroKafkaStorage.java
  8. +36 −41 core/src/main/scala/kafka/admin/AdminUtils.scala
  9. +1 −1 core/src/main/scala/kafka/admin/CheckReassignmentStatus.scala
  10. +5 −5 core/src/main/scala/kafka/admin/CreateTopicCommand.scala
  11. +4 −4 core/src/main/scala/kafka/admin/PreferredReplicaLeaderElectionCommand.scala
  12. +2 −3 core/src/main/scala/kafka/admin/ReassignPartitionsCommand.scala
  13. +20 −16 core/src/main/scala/kafka/admin/ShutdownBroker.scala
  14. +6 −2 core/src/main/scala/kafka/api/FetchRequest.scala
  15. +2 −2 core/src/main/scala/kafka/api/FetchResponse.scala
  16. +1 −1 core/src/main/scala/kafka/api/LeaderAndIsrRequest.scala
  17. +9 −4 core/src/main/scala/kafka/api/ProducerRequest.scala
  18. +0 −2 core/src/main/scala/kafka/api/ProducerResponse.scala
  19. +1 −1 core/src/main/scala/kafka/api/StopReplicaRequest.scala
  20. +1 −1 core/src/main/scala/kafka/api/TopicMetadataRequest.scala
  21. +15 −3 core/src/main/scala/kafka/cluster/Broker.scala
  22. +3 −2 core/src/main/scala/kafka/cluster/Partition.scala
  23. +4 −3 core/src/main/scala/kafka/common/LeaderNotAvailableException.scala
  24. +2 −1 core/src/main/scala/kafka/consumer/ConsumerFetcherThread.scala
  25. +4 −4 core/src/main/scala/kafka/consumer/ConsumerTopicStats.scala
  26. +4 −4 core/src/main/scala/kafka/consumer/FetchRequestAndResponseStats.scala
  27. +54 −73 core/src/main/scala/kafka/consumer/TopicCount.scala
  28. +5 −4 core/src/main/scala/kafka/consumer/ZookeeperConsumerConnector.scala
  29. +2 −3 core/src/main/scala/kafka/controller/KafkaController.scala
  30. +3 −2 core/src/main/scala/kafka/controller/PartitionStateMachine.scala
  31. +13 −3 core/src/main/scala/kafka/log/FileMessageSet.scala
  32. +24 −6 core/src/main/scala/kafka/log/Log.scala
  33. +24 −12 core/src/main/scala/kafka/log/OffsetIndex.scala
  34. +15 −11 core/src/main/scala/kafka/message/ByteBufferMessageSet.scala
  35. +8 −4 core/src/main/scala/kafka/network/RequestChannel.scala
  36. +25 −8 core/src/main/scala/kafka/network/SocketServer.scala
  37. +17 −5 core/src/main/scala/kafka/network/Transmission.scala
  38. +3 −2 core/src/main/scala/kafka/producer/BrokerPartitionInfo.scala
  39. +8 −2 core/src/main/scala/kafka/producer/ConsoleProducer.scala
  40. +9 −4 core/src/main/scala/kafka/producer/KafkaLog4jAppender.scala
  41. +5 −6 core/src/main/scala/kafka/producer/Producer.scala
  42. +4 −4 core/src/main/scala/kafka/producer/ProducerRequestStats.scala
  43. +0 −1 core/src/main/scala/kafka/producer/ProducerStats.scala
  44. +6 −5 core/src/main/scala/kafka/producer/ProducerTopicStats.scala
  45. +12 −5 core/src/main/scala/kafka/producer/SyncProducer.scala
  46. +27 −25 core/src/main/scala/kafka/producer/async/DefaultEventHandler.scala
  47. +1 −1 core/src/main/scala/kafka/producer/async/ProducerSendThread.scala
  48. +16 −11 core/src/main/scala/kafka/server/AbstractFetcherThread.scala
  49. +21 −16 core/src/main/scala/kafka/server/KafkaApis.scala
  50. +1 −2 core/src/main/scala/kafka/server/KafkaConfig.scala
  51. +3 −3 core/src/main/scala/kafka/server/KafkaRequestHandler.scala
  52. +3 −3 core/src/main/scala/kafka/server/KafkaServer.scala
  53. +1 −1 core/src/main/scala/kafka/server/KafkaServerStartable.scala
  54. +25 −18 core/src/main/scala/kafka/server/ReplicaFetcherThread.scala
  55. +13 −0 core/src/main/scala/kafka/server/ReplicaManager.scala
  56. +13 −19 core/src/main/scala/kafka/server/RequestPurgatory.scala
  57. +20 −12 core/src/main/scala/kafka/tools/ConsumerOffsetChecker.scala
  58. +7 −3 core/src/main/scala/kafka/tools/ExportZkOffsets.scala
  59. +236 −125 core/src/main/scala/kafka/tools/KafkaMigrationTool.java
  60. +4 −1 core/src/main/scala/kafka/tools/VerifyConsumerRebalance.scala
  61. +3 −2 core/src/main/scala/kafka/utils/ShutdownableThread.scala
  62. +46 −22 core/src/main/scala/kafka/utils/Utils.scala
  63. +69 −82 core/src/main/scala/kafka/utils/ZkUtils.scala
  64. +39 −38 core/src/test/scala/unit/kafka/admin/AdminTest.scala
  65. +1 −1 core/src/test/scala/unit/kafka/api/RequestResponseSerializationTest.scala
  66. +44 −11 core/src/test/scala/unit/kafka/integration/PrimitiveApiTest.scala
  67. +1 −13 core/src/test/scala/unit/kafka/integration/ProducerConsumerTestHarness.scala
  68. +5 −3 core/src/test/scala/unit/kafka/log/OffsetIndexTest.scala
  69. +4 −3 core/src/test/scala/unit/kafka/log4j/KafkaLog4jAppenderTest.scala
  70. +39 −5 core/src/test/scala/unit/kafka/network/SocketServerTest.scala
  71. +6 −7 core/src/test/scala/unit/kafka/producer/AsyncProducerTest.scala
  72. +2 −1 core/src/test/scala/unit/kafka/producer/ProducerTest.scala
  73. +34 −36 core/src/test/scala/unit/kafka/producer/SyncProducerTest.scala
  74. +1 −1 core/src/test/scala/unit/kafka/server/LogRecoveryTest.scala
  75. +1 −1 core/src/test/scala/unit/kafka/server/ServerShutdownTest.scala
  76. +17 −8 core/src/test/scala/unit/kafka/utils/TestUtils.scala
  77. +3 −2 core/src/test/scala/unit/kafka/utils/UtilsTest.scala
  78. +1 −1 perf/src/main/scala/kafka/perf/ProducerPerformance.scala
  79. +1 −1 project/build/KafkaProject.scala
  80. +1 −1 system_test/mirror_maker_testsuite/config/mirror_consumer.properties
  81. +1 −1 system_test/mirror_maker_testsuite/testcase_5003/testcase_5003_properties.json
bin/windows/kafka-run-class.bat
@@ -25,23 +25,38 @@ set BASE_DIR=%CD%\..
set CLASSPATH=
echo %BASE_DIR%
-for %%i in (%BASE_DIR%\project\boot\scala-2.8.0\lib\*.jar) do (
- call :concat %%i
-)
+set ivyPath=%USERPROFILE%\.ivy2\cache
-for %%i in (%BASE_DIR%\core\target\scala_2.8.0\*.jar) do (
- call :concat %%i
-)
+set snappy=%ivyPath%/org.xerial.snappy/snappy-java/bundles/snappy-java-1.0.4.1.jar
+ call :concat %snappy%
-for %%i in (%BASE_DIR%\core\lib\*.jar) do (
+set library=%ivyPath%/org.scala-lang/scala-library/jars/scala-library-2.8.0.jar
+ call :concat %library%
+
+set compiler=%ivyPath%/org.scala-lang/scala-compiler/jars/scala-compiler-2.8.0.jar
+ call :concat %compiler%
+
+set log4j=%ivyPath%/log4j/log4j/jars/log4j-1.2.15.jar
+ call :concat %log4j%
+
+set slf=%ivyPath%/org.slf4j/slf4j-api/jars/slf4j-api-1.6.4.jar
+ call :concat %slf%
+
+set zookeeper=%ivyPath%/org.apache.zookeeper/zookeeper/jars/zookeeper-3.3.4.jar
+ call :concat %zookeeper%
+
+set jopt=%ivyPath%/net.sf.jopt-simple/jopt-simple/jars/jopt-simple-3.2.jar
+ call :concat %jopt%
+
+for %%i in (%BASE_DIR%\core\target\scala-2.8.0\*.jar) do (
call :concat %%i
)
-for %%i in (%BASE_DIR%\perf\target\scala_2.8.0/kafka*.jar) do (
+for %%i in (%BASE_DIR%\core\lib\*.jar) do (
call :concat %%i
)
-for %%i in (%BASE_DIR%\core\lib_managed\scala_2.8.0\compile\*.jar) do (
+for %%i in (%BASE_DIR%\perf\target\scala-2.8.0/kafka*.jar) do (
call :concat %%i
)
config/log4j.properties
@@ -48,11 +48,12 @@ log4j.appender.cleanerAppender.layout.ConversionPattern=[%d] %p %m (%c)%n
#log4j.logger.kafka.perf=DEBUG, kafkaAppender
#log4j.logger.kafka.perf.ProducerPerformance$ProducerThread=DEBUG, kafkaAppender
#log4j.logger.org.I0Itec.zkclient.ZkClient=DEBUG
-log4j.logger.kafka=INFO
+log4j.logger.kafka=INFO, kafkaAppender
log4j.logger.kafka.network.RequestChannel$=WARN, requestAppender
log4j.additivity.kafka.network.RequestChannel$=false
+#log4j.logger.kafka.network.Processor=TRACE, requestAppender
#log4j.logger.kafka.server.KafkaApis=TRACE, requestAppender
#log4j.additivity.kafka.server.KafkaApis=false
log4j.logger.kafka.request.logger=WARN, requestAppender
contrib/hadoop-producer/README.md
@@ -4,16 +4,20 @@ Hadoop to Kafka Bridge
What's new?
-----------
-* Now supports Kafka's software load balancer (Kafka URIs are specified with
- kafka+zk as the scheme, as described below)
-* Supports Kafka 0.7. Now uses the new Producer API, rather than the legacy
- SyncProducer.
+* Kafka 0.8 support
+ * No more ZK-based load balancing (backwards incompatible change)
+* Semantic partitioning is now supported in KafkaOutputFormat. Just specify a
+ key in the output committer of your job. The Pig StoreFunc doesn't support
+ semantic partitioning.
+* Config parameters are now the same as the Kafka producer, just prepended with
+ kafka.output (e.g., kafka.output.max.message.size). This is a backwards
+ incompatible change.
What is it?
-----------
The Hadoop to Kafka bridge is a way to publish data from Hadoop to Kafka. There
-are two possible mechanisms, varying from easy to difficult: writing a Pig
+are two possible mechanisms, varying from easy to difficult: writing a Pig
script and writing messages in Avro format, or rolling your own job using the
Kafka `OutputFormat`.
@@ -25,10 +29,8 @@ multiple times in the same push.
How do I use it?
----------------
-With this bridge, Kafka topics are URIs and are specified in one of two
-formats: `kafka+zk://<zk-path>#<kafka-topic>`, which uses the software load
-balancer, or the legacy `kafka://<kafka-server>/<kafka-topic>` to connect to a
-specific Kafka broker.
+With this bridge, Kafka topics are URIs and are specified as URIs of the form
+`kafka://<kafka-server>/<kafka-topic>` to connect to a specific Kafka broker.
### Pig ###
@@ -37,19 +39,17 @@ row. To push data via Kafka, store to the Kafka URI using `AvroKafkaStorage`
with the Avro schema as its first argument. You'll need to register the
appropriate Kafka JARs. Here is what an example Pig script looks like:
- REGISTER hadoop-producer_2.8.0-0.7.0.jar;
+ REGISTER hadoop-producer_2.8.0-0.8.0.jar;
REGISTER avro-1.4.0.jar;
REGISTER piggybank.jar;
- REGISTER kafka-0.7.0.jar;
+ REGISTER kafka-0.8.0.jar;
REGISTER jackson-core-asl-1.5.5.jar;
REGISTER jackson-mapper-asl-1.5.5.jar;
- REGISTER zkclient-20110412.jar;
- REGISTER zookeeper-3.3.4.jar;
REGISTER scala-library.jar;
- member_info = LOAD 'member_info.tsv' as (member_id : int, name : chararray);
+ member_info = LOAD 'member_info.tsv' AS (member_id : int, name : chararray);
names = FOREACH member_info GENERATE name;
- STORE member_info INTO 'kafka+zk://my-zookeeper:2181/kafka#member_info' USING kafka.bridge.AvroKafkaStorage('"string"');
+ STORE member_info INTO 'kafka://my-kafka:9092/member_info' USING kafka.bridge.AvroKafkaStorage('"string"');
That's it! The Pig StoreFunc makes use of AvroStorage in Piggybank to convert
from Pig's data model to the specified Avro schema.
@@ -58,8 +58,8 @@ Further, multi-store is possible with KafkaStorage, so you can easily write to
multiple topics and brokers in the same job:
SPLIT member_info INTO early_adopters IF member_id < 1000, others IF member_id >= 1000;
- STORE early_adopters INTO 'kafka+zk://my-zookeeper:2181/kafka#early_adopters' USING AvroKafkaStorage('$schema');
- STORE others INTO 'kafka://my-broker:9092,my-broker2:9092/others' USING AvroKafkaStorage('$schema');
+ STORE early_adopters INTO 'kafka://my-broker:9092/early_adopters' USING AvroKafkaStorage('$schema');
+ STORE others INTO 'kafka://my-broker2:9092/others' USING AvroKafkaStorage('$schema');
### KafkaOutputFormat ###
@@ -68,80 +68,27 @@ uses the newer 0.20 mapreduce APIs and simply pushes bytes (i.e.,
BytesWritable). This is a lower-level method of publishing data, as it allows
you to precisely control output.
-Here is an example that publishes some input text. With KafkaOutputFormat, the
-key is a NullWritable and is ignored; only values are published. Speculative
-execution is turned off by the OutputFormat.
-
- import kafka.bridge.hadoop.KafkaOutputFormat;
-
- import org.apache.hadoop.fs.Path;
- import org.apache.hadoop.io.BytesWritable;
- import org.apache.hadoop.io.NullWritable;
- import org.apache.hadoop.io.Text;
- import org.apache.hadoop.mapreduce.Job;
- import org.apache.hadoop.mapreduce.Mapper;
- import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
- import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
-
- import java.io.IOException;
-
- public class TextPublisher
- {
- public static void main(String[] args) throws Exception
- {
- if (args.length != 2) {
- System.err.println("usage: <input path> <kafka output url>");
- return;
- }
-
- Job job = new Job();
-
- job.setJarByClass(TextPublisher.class);
- job.setOutputKeyClass(NullWritable.class);
- job.setOutputValueClass(BytesWritable.class);
- job.setInputFormatClass(TextInputFormat.class);
- job.setOutputFormatClass(KafkaOutputFormat.class);
-
- job.setMapperClass(TheMapper.class);
- job.setNumReduceTasks(0);
-
- FileInputFormat.addInputPath(job, new Path(args[0]));
- KafkaOutputFormat.setOutputPath(job, new Path(args[1]));
-
- if (!job.waitForCompletion(true)) {
- throw new RuntimeException("Job failed!");
- }
- }
-
- public static class TheMapper extends Mapper<Object, Object, NullWritable, BytesWritable>
- {
- @Override
- protected void map(Object key, Object value, Context context) throws IOException, InterruptedException
- {
- context.write(NullWritable.get(), new BytesWritable(((Text) value).getBytes()));
- }
- }
- }
+Included is an example that publishes some input text line-by-line to a topic.
+With KafkaOutputFormat, the key can be a null, where it is ignored by the
+producer (random partitioning), or any object for semantic partitioning of the
+stream (with an appropriate Kafka partitioner set). Speculative execution is
+turned off by the OutputFormat.
What can I tune?
----------------
-Normally, you needn't change any of these parameters:
-
-* kafka.output.queue_size: Bytes to queue in memory before pushing to the Kafka
+* kafka.output.queue.size: Bytes to queue in memory before pushing to the Kafka
producer (i.e., the batch size). Default is 10*1024*1024 (10MB).
-* kafka.output.connect_timeout: Connection timeout in milliseconds (see Kafka
- producer docs). Default is 30*1000 (30s).
-* kafka.output.reconnect_timeout: Milliseconds to wait until attempting
- reconnection (see Kafka producer docs). Default is 1000 (1s).
-* kafka.output.bufsize: Producer buffer size in bytes (see Kafka producer
- docs). Default is 64*1024 (64KB).
-* kafka.output.max_msgsize: Maximum message size in bytes (see Kafka producer
- docs). Default is 1024*1024 (1MB).
-* kafka.output.compression_codec: The compression codec to use (see Kafka producer
- docs). Default is 0 (no compression).
+
+Any of Kafka's producer parameters can be changed by prefixing them with
+"kafka.output" in one's job configuration. For example, to change the
+compression codec, one would add the "kafka.output.compression.codec" parameter
+(e.g., "SET kafka.output.compression.codec 0" in one's Pig script for no
+compression).
For easier debugging, the above values as well as the Kafka broker information
-(either kafka.zk.connect or kafka.broker.list), the topic (kafka.output.topic),
-and the schema (kafka.output.schema) are injected into the job's configuration.
+(kafka.broker.list), the topic (kafka.output.topic), and the schema
+(kafka.output.schema) are injected into the job's configuration. By default,
+the Hadoop producer uses Kafka's sync producer as asynchronous operation
+doesn't make sense in the batch Hadoop case.
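
The README section above moves the tuning knobs onto the standard Kafka producer property names under a `kafka.output` prefix. A minimal sketch of how a job driver might forward such settings (not part of this commit; the setup mirrors the TextPublisher example and the property values are only illustrative):

    import kafka.bridge.hadoop.KafkaOutputFormat;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.mapreduce.Job;

    // Sketch only, inside a job driver's main(): forward Kafka producer settings
    // through the Hadoop job configuration using the "kafka.output" prefix.
    Job job = new Job();
    job.getConfiguration().set("kafka.output.compression.codec", "0");          // no compression
    job.getConfiguration().setInt("kafka.output.queue.size", 10 * 1024 * 1024); // 10MB producer batch
    job.setOutputFormatClass(KafkaOutputFormat.class);
    KafkaOutputFormat.setOutputPath(job, new Path("kafka://my-broker:9092/my_topic"));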
contrib/hadoop-producer/src/main/java/kafka/bridge/examples/TextPublisher.java
@@ -16,18 +16,18 @@
*/
package kafka.bridge.examples;
-
import java.io.IOException;
import kafka.bridge.hadoop.KafkaOutputFormat;
import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.io.BytesWritable;
-import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
+/**
+ * Publish a text file line by line to a Kafka topic
+ */
public class TextPublisher
{
public static void main(String[] args) throws Exception
@@ -40,8 +40,6 @@ public static void main(String[] args) throws Exception
Job job = new Job();
job.setJarByClass(TextPublisher.class);
- job.setOutputKeyClass(NullWritable.class);
- job.setOutputValueClass(BytesWritable.class);
job.setInputFormatClass(TextInputFormat.class);
job.setOutputFormatClass(KafkaOutputFormat.class);
@@ -56,12 +54,12 @@ public static void main(String[] args) throws Exception
}
}
- public static class TheMapper extends Mapper<Object, Object, NullWritable, BytesWritable>
+ public static class TheMapper extends Mapper<Object, Text, Object, Object>
{
@Override
- protected void map(Object key, Object value, Context context) throws IOException, InterruptedException
+ protected void map(Object key, Text value, Context context) throws IOException, InterruptedException
{
- context.write(NullWritable.get(), new BytesWritable(((Text) value).getBytes()));
+ context.write(null, value.getBytes());
}
}
}
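
The reworked TheMapper above writes a null key, which the producer treats as a request for random partitioning. As the README notes, any object can be used as the key for semantic partitioning instead; a minimal sketch of such a variant (hypothetical, not part of this commit; the KeyedMapper name and the tab-separated key extraction are assumptions):

    // Sketch only: emit a non-null key so the producer can partition the stream
    // semantically (an appropriate Kafka partitioner must be configured).
    // Writing null as the key, as TheMapper does, yields random partitioning.
    public static class KeyedMapper extends Mapper<Object, Text, Object, Object>
    {
      @Override
      protected void map(Object key, Text value, Context context) throws IOException, InterruptedException
      {
        String line = value.toString();
        String partitionKey = line.split("\t", 2)[0]; // assumption: first tab-separated field keys the record
        context.write(partitionKey, line.getBytes());
      }
    }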