kafka-plugins-0.10/docs/KAFKABATCHSOURCE.md (2 changes: 1 addition & 1 deletion)
@@ -21,7 +21,7 @@ Plugin Configuration
 | :------------ | :------: | :----- | :---------- |
 | **Kafka Brokers** | **Y** | N/A | List of Kafka brokers specified in host1:port1,host2:port2 form. |
 | **Kafka Topic** | **Y** | N/A | The Kafka topic to read from. |
-| **Offset Table Name** | **Y** | N/A | Optional table name to track the latest offset we read from kafka. It is recommended to name it same as the pipeline name to avoid conflict on table names. By default it will be the topic name.
+| **Offset Directory** | **Y** | N/A | A directory path to store the latest Kafka offsets. A file named with the pipeline name will be created under the given directory.
 | **Topic Partition** | **N** | N/A | List of topic partitions to read from. If not specified, all partitions will be read. |
 | **Initial Partition Offsets** | **N** | N/A | The initial offset for each topic partition. If this is not specified, earliest offset will be used. This offset will only be used for the first run of the pipeline. Any subsequent run will read from the latest offset from previous run. Offsets are inclusive. If an offset of 5 is used, the message at offset 5 will be read. |
 | **Key Field** | **N** | N/A | Optional name of the field containing the message key. If this is not set, no key field will be added to output records. If set, this field must be present in the schema property and must be bytes. |
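The Initial Partition Offsets row above calls out that offsets are inclusive. Below is a minimal sketch of what that means when seeding the plugin's partition-to-offset map, assuming `KafkaPartitionOffsets` wraps a `Map<Integer, Long>` as the `onRunFinish` diff further down suggests; the partition numbers and offset values here are hypothetical.

```java
import java.util.HashMap;
import java.util.Map;

// Sketch only: KafkaPartitionOffsets is the plugin's own class; the
// partitions and offsets below are made-up illustration values.
KafkaPartitionOffsets buildInitialOffsets() {
  Map<Integer, Long> offsets = new HashMap<>();
  offsets.put(0, 5L); // inclusive: the message at offset 5 itself is read
  offsets.put(1, 0L); // start partition 1 from the earliest offset
  return new KafkaPartitionOffsets(offsets);
}
```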
@@ -146,7 +146,7 @@ public void prepareRun(BatchSourceContext context) throws Exception {
     KafkaPartitionOffsets partitionOffsets = config.getInitialPartitionOffsets();
 
     // If the offset directory is provided, try to load the file
-    if (config.getOffsetDir() != null) {
+    if (!context.isPreviewEnabled() && config.getOffsetDir() != null) {
       Path offsetDir = new Path(URI.create(config.getOffsetDir()));
       fileContext = FileContext.getFileContext(offsetDir.toUri(), conf);
       try {
@@ -184,6 +184,9 @@ public void prepareRun(BatchSourceContext context) throws Exception {

   @Override
   public void onRunFinish(boolean succeeded, BatchSourceContext context) {
+    if (context.isPreviewEnabled()) {
+      return;
+    }
     if (succeeded && kafkaRequests != null && fileContext != null && offsetsFile != null) {
       KafkaPartitionOffsets partitionOffsets = new KafkaPartitionOffsets(
         kafkaRequests.stream().collect(Collectors.toMap(KafkaRequest::getPartition, KafkaRequest::getEndOffset)));
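The rest of onRunFinish is collapsed in this diff, but the purpose of the new guard is clear: a preview run must never persist offsets. Here is a sketch of what the save path below that guard might look like, assuming the offsets are written to the offsets file as JSON via Gson through a saveOffsets helper; neither the serialization format nor the helper is shown in this diff.

```java
import com.google.gson.Gson;
import java.nio.charset.StandardCharsets;
import java.util.EnumSet;
import org.apache.hadoop.fs.CreateFlag;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileContext;
import org.apache.hadoop.fs.Path;

// Sketch: persist end offsets only for real runs; preview runs return
// early above, so they never overwrite the state of the last real run.
private void saveOffsets(FileContext fileContext, Path offsetsFile,
                         KafkaPartitionOffsets partitionOffsets) throws Exception {
  try (FSDataOutputStream os = fileContext.create(
      offsetsFile, EnumSet.of(CreateFlag.CREATE, CreateFlag.OVERWRITE))) {
    // The JSON encoding is an assumption; the diff cuts off before
    // showing how the plugin actually writes the file.
    os.write(new Gson().toJson(partitionOffsets).getBytes(StandardCharsets.UTF_8));
  }
}
```

Because a preview run returns before this point, it cannot advance the stored offsets, so the next real run still resumes from where the previous real run ended.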
kafka-plugins-0.8/docs/KAFKABATCHSOURCE.md (2 changes: 1 addition & 1 deletion)
@@ -21,7 +21,7 @@ Plugin Configuration
 | :------------ | :------: | :----- | :---------- |
 | **Kafka Brokers** | **Y** | N/A | List of Kafka brokers specified in host1:port1,host2:port2 form. |
 | **Kafka Topic** | **Y** | N/A | The Kafka topic to read from. |
-| **Offset Table Name** | **Y** | N/A | Optional table name to track the latest offset we read from kafka. It is recommended to name it same as the pipeline name to avoid conflict on table names. By default it will be the topic name.
+| **Offset Directory** | **Y** | N/A | A directory path to store the latest Kafka offsets. A file named with the pipeline name will be created under the given directory.
 | **Topic Partition** | **N** | N/A | List of topic partitions to read from. If not specified, all partitions will be read. |
 | **Initial Partition Offsets** | **N** | N/A | The initial offset for each topic partition. If this is not specified, earliest offset will be used. This offset will only be used for the first run of the pipeline. Any subsequent run will read from the latest offset from previous run. Offsets are inclusive. If an offset of 5 is used, the message at offset 5 will be read. |
 | **Key Field** | **N** | N/A | Optional name of the field containing the message key. If this is not set, no key field will be added to output records. If set, this field must be present in the schema property and must be bytes. |
@@ -85,7 +85,7 @@ public void prepareRun(BatchSourceContext context) throws Exception {
     KafkaPartitionOffsets partitionOffsets = config.getInitialPartitionOffsets();
 
     // If the offset directory is provided, try to load the file
-    if (config.getOffsetDir() != null) {
+    if (!context.isPreviewEnabled() && config.getOffsetDir() != null) {
**Member:** should we return before getting partitionOffsets?

**Contributor Author:** There are still subsequent steps that need to run in order to generate the Kafka requests.

(A condensed sketch of this control flow follows the diff below.)

       Path offsetDir = new Path(URI.create(config.getOffsetDir()));
       fileContext = FileContext.getFileContext(offsetDir.toUri(), conf);
       try {
@@ -117,6 +117,9 @@ public void prepareRun(BatchSourceContext context) throws Exception {

   @Override
   public void onRunFinish(boolean succeeded, BatchSourceContext context) {
+    if (context.isPreviewEnabled()) {
+      return;
+    }
     if (succeeded && kafkaRequests != null && fileContext != null && offsetsFile != null) {
       KafkaPartitionOffsets partitionOffsets = new KafkaPartitionOffsets(
         kafkaRequests.stream().collect(Collectors.toMap(KafkaRequest::getPartition, KafkaRequest::getEndOffset)));
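To make the reviewer exchange above concrete, here is a condensed sketch of the prepareRun control flow after this change. Only the names visible in the diff (config, kafkaRequests, getInitialPartitionOffsets, getOffsetDir, isPreviewEnabled) are real; the helpers are assumptions. It shows why prepareRun cannot simply return early in preview mode: the Kafka requests still have to be generated, so only the offset-file lookup is skipped.

```java
public void prepareRun(BatchSourceContext context) throws Exception {
  // Always start from the configured initial offsets.
  KafkaPartitionOffsets partitionOffsets = config.getInitialPartitionOffsets();

  // Only a real run consults the persisted offsets file; a preview run
  // must neither read nor later advance the stored state.
  if (!context.isPreviewEnabled() && config.getOffsetDir() != null) {
    partitionOffsets = loadOffsets(config.getOffsetDir()); // assumed helper
  }

  // These steps cannot be skipped in preview: without the Kafka requests
  // there would be nothing for the preview run to read.
  kafkaRequests = createKafkaRequests(partitionOffsets); // assumed helper
  setUpInput(context, kafkaRequests);                    // assumed helper
}
```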