From 52a1e81b9e2f248023e76f03d019c20651931971 Mon Sep 17 00:00:00 2001 From: yaojie Date: Fri, 22 Mar 2019 15:28:58 -0700 Subject: [PATCH] CDAP-15105 do not create directory for preview --- kafka-plugins-0.10/docs/KAFKABATCHSOURCE.md | 2 +- .../cask/hydrator/plugin/batch/source/KafkaBatchSource.java | 5 ++++- kafka-plugins-0.8/docs/KAFKABATCHSOURCE.md | 2 +- .../cask/hydrator/plugin/batch/source/KafkaBatchSource.java | 5 ++++- 4 files changed, 10 insertions(+), 4 deletions(-) diff --git a/kafka-plugins-0.10/docs/KAFKABATCHSOURCE.md b/kafka-plugins-0.10/docs/KAFKABATCHSOURCE.md index 5d0683a..acf8068 100644 --- a/kafka-plugins-0.10/docs/KAFKABATCHSOURCE.md +++ b/kafka-plugins-0.10/docs/KAFKABATCHSOURCE.md @@ -21,7 +21,7 @@ Plugin Configuration | :------------ | :------: | :----- | :---------- | | **Kafka Brokers** | **Y** | N/A | List of Kafka brokers specified in host1:port1,host2:port2 form. | | **Kafka Topic** | **Y** | N/A | The Kafka topic to read from. | -| **Offset Table Name** | **Y** | N/A | Optional table name to track the latest offset we read from kafka. It is recommended to name it same as the pipeline name to avoid conflict on table names. By default it will be the topic name. +| **Offset Directory** | **Y** | N/A | A directory path to store the latest Kafka offsets. A file named with the pipeline name will be created under the given directory. | **Topic Partition** | **N** | N/A | List of topic partitions to read from. If not specified, all partitions will be read. | | **Initial Partition Offsets** | **N** | N/A | The initial offset for each topic partition. If this is not specified, earliest offset will be used. This offset will only be used for the first run of the pipeline. Any subsequent run will read from the latest offset from previous run. Offsets are inclusive. If an offset of 5 is used, the message at offset 5 will be read. | | **Key Field** | **N** | N/A | Optional name of the field containing the message key. If this is not set, no key field will be added to output records. If set, this field must be present in the schema property and must be bytes. | diff --git a/kafka-plugins-0.10/src/main/java/co/cask/hydrator/plugin/batch/source/KafkaBatchSource.java b/kafka-plugins-0.10/src/main/java/co/cask/hydrator/plugin/batch/source/KafkaBatchSource.java index 010cafe..a32777d 100644 --- a/kafka-plugins-0.10/src/main/java/co/cask/hydrator/plugin/batch/source/KafkaBatchSource.java +++ b/kafka-plugins-0.10/src/main/java/co/cask/hydrator/plugin/batch/source/KafkaBatchSource.java @@ -146,7 +146,7 @@ public void prepareRun(BatchSourceContext context) throws Exception { KafkaPartitionOffsets partitionOffsets = config.getInitialPartitionOffsets(); // If the offset directory is provided, try to load the file - if (config.getOffsetDir() != null) { + if (!context.isPreviewEnabled() && config.getOffsetDir() != null) { Path offsetDir = new Path(URI.create(config.getOffsetDir())); fileContext = FileContext.getFileContext(offsetDir.toUri(), conf); try { @@ -184,6 +184,9 @@ public void prepareRun(BatchSourceContext context) throws Exception { @Override public void onRunFinish(boolean succeeded, BatchSourceContext context) { + if (context.isPreviewEnabled()) { + return; + } if (succeeded && kafkaRequests != null && fileContext != null && offsetsFile != null) { KafkaPartitionOffsets partitionOffsets = new KafkaPartitionOffsets( kafkaRequests.stream().collect(Collectors.toMap(KafkaRequest::getPartition, KafkaRequest::getEndOffset))); diff --git a/kafka-plugins-0.8/docs/KAFKABATCHSOURCE.md b/kafka-plugins-0.8/docs/KAFKABATCHSOURCE.md index 297ed4d..c8414d3 100644 --- a/kafka-plugins-0.8/docs/KAFKABATCHSOURCE.md +++ b/kafka-plugins-0.8/docs/KAFKABATCHSOURCE.md @@ -21,7 +21,7 @@ Plugin Configuration | :------------ | :------: | :----- | :---------- | | **Kafka Brokers** | **Y** | N/A | List of Kafka brokers specified in host1:port1,host2:port2 form. | | **Kafka Topic** | **Y** | N/A | The Kafka topic to read from. | -| **Offset Table Name** | **Y** | N/A | Optional table name to track the latest offset we read from kafka. It is recommended to name it same as the pipeline name to avoid conflict on table names. By default it will be the topic name. +| **Offset Directory** | **Y** | N/A | A directory path to store the latest Kafka offsets. A file named with the pipeline name will be created under the given directory. | **Topic Partition** | **N** | N/A | List of topic partitions to read from. If not specified, all partitions will be read. | | **Initial Partition Offsets** | **N** | N/A | The initial offset for each topic partition. If this is not specified, earliest offset will be used. This offset will only be used for the first run of the pipeline. Any subsequent run will read from the latest offset from previous run. Offsets are inclusive. If an offset of 5 is used, the message at offset 5 will be read. | | **Key Field** | **N** | N/A | Optional name of the field containing the message key. If this is not set, no key field will be added to output records. If set, this field must be present in the schema property and must be bytes. | diff --git a/kafka-plugins-0.8/src/main/java/co/cask/hydrator/plugin/batch/source/KafkaBatchSource.java b/kafka-plugins-0.8/src/main/java/co/cask/hydrator/plugin/batch/source/KafkaBatchSource.java index 74fe08c..448eab7 100644 --- a/kafka-plugins-0.8/src/main/java/co/cask/hydrator/plugin/batch/source/KafkaBatchSource.java +++ b/kafka-plugins-0.8/src/main/java/co/cask/hydrator/plugin/batch/source/KafkaBatchSource.java @@ -85,7 +85,7 @@ public void prepareRun(BatchSourceContext context) throws Exception { KafkaPartitionOffsets partitionOffsets = config.getInitialPartitionOffsets(); // If the offset directory is provided, try to load the file - if (config.getOffsetDir() != null) { + if (!context.isPreviewEnabled() && config.getOffsetDir() != null) { Path offsetDir = new Path(URI.create(config.getOffsetDir())); fileContext = FileContext.getFileContext(offsetDir.toUri(), conf); try { @@ -117,6 +117,9 @@ public void prepareRun(BatchSourceContext context) throws Exception { @Override public void onRunFinish(boolean succeeded, BatchSourceContext context) { + if (context.isPreviewEnabled()) { + return; + } if (succeeded && kafkaRequests != null && fileContext != null && offsetsFile != null) { KafkaPartitionOffsets partitionOffsets = new KafkaPartitionOffsets( kafkaRequests.stream().collect(Collectors.toMap(KafkaRequest::getPartition, KafkaRequest::getEndOffset)));