diff --git a/airbyte-config/init/src/main/resources/seed/source_definitions.yaml b/airbyte-config/init/src/main/resources/seed/source_definitions.yaml index 797dc3cc0b8ef..d4bda578a5ded 100644 --- a/airbyte-config/init/src/main/resources/seed/source_definitions.yaml +++ b/airbyte-config/init/src/main/resources/seed/source_definitions.yaml @@ -791,7 +791,7 @@ - name: Kafka sourceDefinitionId: d917a47b-8537-4d0d-8c10-36a9928d4265 dockerRepository: airbyte/source-kafka - dockerImageTag: 0.2.2 + dockerImageTag: 0.2.3 documentationUrl: https://docs.airbyte.com/integrations/sources/kafka icon: kafka.svg sourceType: database diff --git a/airbyte-config/init/src/main/resources/seed/source_specs.yaml b/airbyte-config/init/src/main/resources/seed/source_specs.yaml index 471a370b5e61f..ebbb941253ef9 100644 --- a/airbyte-config/init/src/main/resources/seed/source_specs.yaml +++ b/airbyte-config/init/src/main/resources/seed/source_specs.yaml @@ -6733,7 +6733,7 @@ supportsNormalization: false supportsDBT: false supported_destination_sync_modes: [] -- dockerImage: "airbyte/source-kafka:0.2.2" +- dockerImage: "airbyte/source-kafka:0.2.3" spec: documentationUrl: "https://docs.airbyte.com/integrations/sources/kafka" connectionSpecification: @@ -6744,7 +6744,7 @@ - "bootstrap_servers" - "subscription" - "protocol" - additionalProperties: false + additionalProperties: true properties: MessageFormat: title: "MessageFormat" @@ -6755,16 +6755,11 @@ properties: deserialization_type: type: "string" - enum: - - "JSON" - default: "JSON" + const: "JSON" - title: "AVRO" properties: deserialization_type: - type: "string" - enum: - - "AVRO" - default: "AVRO" + const: "AVRO" deserialization_strategy: type: "string" enum: @@ -6815,9 +6810,6 @@ \ list of topic partitions is empty, it is treated the same as unsubscribe()." type: "string" const: "assign" - enum: - - "assign" - default: "assign" topic_partitions: title: "List of topic:partition Pairs" type: "string" @@ -6832,9 +6824,6 @@ description: "The Topic pattern from which the records will be read." type: "string" const: "subscribe" - enum: - - "subscribe" - default: "subscribe" topic_pattern: title: "Topic Pattern" type: "string" @@ -6876,9 +6865,7 @@ properties: security_protocol: type: "string" - enum: - - "PLAINTEXT" - default: "PLAINTEXT" + const: "PLAINTEXT" - title: "SASL PLAINTEXT" required: - "security_protocol" @@ -6887,17 +6874,13 @@ properties: security_protocol: type: "string" - enum: - - "SASL_PLAINTEXT" - default: "SASL_PLAINTEXT" + const: "SASL_PLAINTEXT" sasl_mechanism: title: "SASL Mechanism" description: "The SASL mechanism used for client connections. This\ \ may be any mechanism for which a security provider is available." type: "string" - default: "PLAIN" - enum: - - "PLAIN" + const: "PLAIN" sasl_jaas_config: title: "SASL JAAS Config" description: "The JAAS login context parameters for SASL connections\ @@ -6913,9 +6896,7 @@ properties: security_protocol: type: "string" - enum: - - "SASL_SSL" - default: "SASL_SSL" + const: "SASL_SSL" sasl_mechanism: title: "SASL Mechanism" description: "The SASL mechanism used for client connections. This\ diff --git a/airbyte-integrations/connectors/source-kafka/Dockerfile b/airbyte-integrations/connectors/source-kafka/Dockerfile index 6b1e8104bae87..434bec9868ff6 100644 --- a/airbyte-integrations/connectors/source-kafka/Dockerfile +++ b/airbyte-integrations/connectors/source-kafka/Dockerfile @@ -16,5 +16,5 @@ ENV APPLICATION source-kafka COPY --from=build /airbyte /airbyte -LABEL io.airbyte.version=0.2.2 +LABEL io.airbyte.version=0.2.3 LABEL io.airbyte.name=airbyte/source-kafka diff --git a/airbyte-integrations/connectors/source-kafka/src/main/java/io/airbyte/integrations/source/kafka/format/AvroFormat.java b/airbyte-integrations/connectors/source-kafka/src/main/java/io/airbyte/integrations/source/kafka/format/AvroFormat.java index 2b593278c848a..0f7e10db5df49 100644 --- a/airbyte-integrations/connectors/source-kafka/src/main/java/io/airbyte/integrations/source/kafka/format/AvroFormat.java +++ b/airbyte-integrations/connectors/source-kafka/src/main/java/io/airbyte/integrations/source/kafka/format/AvroFormat.java @@ -145,6 +145,12 @@ public AutoCloseableIterator read() { getTopicsToSubscribe().forEach(topic -> poll_lookup.put(topic, 0)); while (true) { final ConsumerRecords consumerRecords = consumer.poll(Duration.of(polling_time, ChronoUnit.MILLIS)); + consumerRecords.forEach(record -> { + record_count.getAndIncrement(); + recordsList.add(record); + }); + consumer.commitAsync(); + if (consumerRecords.count() == 0) { consumer.assignment().stream().map(record -> record.topic()).distinct().forEach( topic -> { @@ -160,12 +166,6 @@ public AutoCloseableIterator read() { LOGGER.info("Max record count is reached !!"); break; } - - consumerRecords.forEach(record -> { - record_count.getAndIncrement(); - recordsList.add(record); - }); - consumer.commitAsync(); } consumer.close(); final Iterator> iterator = recordsList.iterator(); diff --git a/airbyte-integrations/connectors/source-kafka/src/main/java/io/airbyte/integrations/source/kafka/format/JsonFormat.java b/airbyte-integrations/connectors/source-kafka/src/main/java/io/airbyte/integrations/source/kafka/format/JsonFormat.java index e5670b96bd691..3acf0e7e46a7e 100644 --- a/airbyte-integrations/connectors/source-kafka/src/main/java/io/airbyte/integrations/source/kafka/format/JsonFormat.java +++ b/airbyte-integrations/connectors/source-kafka/src/main/java/io/airbyte/integrations/source/kafka/format/JsonFormat.java @@ -110,6 +110,12 @@ public AutoCloseableIterator read() { getTopicsToSubscribe().forEach(topic -> poll_lookup.put(topic, 0)); while (true) { final ConsumerRecords consumerRecords = consumer.poll(Duration.of(polling_time, ChronoUnit.MILLIS)); + consumerRecords.forEach(record -> { + record_count.getAndIncrement(); + recordsList.add(record); + }); + consumer.commitAsync(); + if (consumerRecords.count() == 0) { consumer.assignment().stream().map(record -> record.topic()).distinct().forEach( topic -> { @@ -125,12 +131,6 @@ public AutoCloseableIterator read() { LOGGER.info("Max record count is reached !!"); break; } - - consumerRecords.forEach(record -> { - record_count.getAndIncrement(); - recordsList.add(record); - }); - consumer.commitAsync(); } consumer.close(); final Iterator> iterator = recordsList.iterator(); diff --git a/docs/integrations/sources/kafka.md b/docs/integrations/sources/kafka.md index c0029ceb91a70..4e963345dfdd2 100644 --- a/docs/integrations/sources/kafka.md +++ b/docs/integrations/sources/kafka.md @@ -50,6 +50,7 @@ The Kafka source connector supports the following [sync modes](https://docs.airb | Version | Date | Pull Request | Subject | | :------ | :-------- | :------------------------------------------------------| :---------------------------------------- | +| 0.2.3 | 2022-12-06 | [19587](https://github.com/airbytehq/airbyte/pull/19587) | Fix missing data before consumer is closed | | 0.2.2 | 2022-11-04 | [18648](https://github.com/airbytehq/airbyte/pull/18648) | Add missing record_count increment for JSON| | 0.2.1 | 2022-11-04 | This version was the same as 0.2.0 and was committed so using 0.2.2 next to keep versions in order| | 0.2.0 | 2022-08-22 | [13864](https://github.com/airbytehq/airbyte/pull/13864) | Added AVRO format support and Support for maximum records to process|