From 7907ccff459a8c742fe993ff41031be6ce2e6bc7 Mon Sep 17 00:00:00 2001 From: Dippatel98 Date: Tue, 14 May 2024 21:08:49 +0000 Subject: [PATCH] Refactor KafkaToBigQueryFlexOptions --- .../options/KafkaToBigQueryFlexOptions.java | 34 +++++++++++++------ .../v2/transforms/AvroDynamicTransform.java | 2 +- .../BigQueryDynamicDestination.java | 2 +- 3 files changed, 26 insertions(+), 12 deletions(-) diff --git a/v2/kafka-to-bigquery/src/main/java/com/google/cloud/teleport/v2/options/KafkaToBigQueryFlexOptions.java b/v2/kafka-to-bigquery/src/main/java/com/google/cloud/teleport/v2/options/KafkaToBigQueryFlexOptions.java index 18644118b0..4aa70cfeca 100644 --- a/v2/kafka-to-bigquery/src/main/java/com/google/cloud/teleport/v2/options/KafkaToBigQueryFlexOptions.java +++ b/v2/kafka-to-bigquery/src/main/java/com/google/cloud/teleport/v2/options/KafkaToBigQueryFlexOptions.java @@ -82,13 +82,14 @@ public interface KafkaToBigQueryFlexOptions @TemplateParameter.Enum( order = 6, + name = "messageFormat", enumOptions = { @TemplateParameter.TemplateEnumOption("AVRO"), @TemplateParameter.TemplateEnumOption("JSON") }, optional = true, - description = "The message format", - helpText = "The message format. Can be AVRO or JSON.") + description = "The Kafka message format", + helpText = "The Kafka message format. Can be AVRO or JSON.") @Default.String("AVRO") String getMessageFormat(); @@ -97,14 +98,17 @@ public interface KafkaToBigQueryFlexOptions // TODO: Sync the enum options with all the Kafka Templates. @TemplateParameter.Enum( order = 7, + name = "avroFormat", + parentName = "messageFormat", + parentTriggerValues = {"AVRO"}, enumOptions = { @TemplateParameter.TemplateEnumOption("CONFLUENT_WIRE_FORMAT"), @TemplateParameter.TemplateEnumOption("NON_WIRE_FORMAT") }, optional = true, - description = "Use the confluent wire format for avro messages.", + description = "The format to use for avro messages.", helpText = - "This parameter is used to indicate if the avro messages use confluent wire format. Default is true (Confluent Wire Format)") + "This parameter is used to indicate what format to use for the avro messages. Default is CONFLUENT_WIRE_FORMAT.") @Default.String("CONFLUENT_WIRE_FORMAT") String getAvroFormat(); @@ -112,6 +116,8 @@ public interface KafkaToBigQueryFlexOptions @TemplateParameter.GcsReadFile( order = 8, + parentName = "messageFormat", + parentTriggerValues = {"AVRO"}, optional = true, description = "Cloud Storage path to the Avro schema file", helpText = "Cloud Storage path to Avro schema file. For example, gs://MyBucket/file.avsc.") @@ -121,6 +127,8 @@ public interface KafkaToBigQueryFlexOptions @TemplateParameter.Text( order = 9, + parentName = "avroFormat", + parentTriggerValues = {"CONFLUENT_WIRE_FORMAT"}, optional = true, description = "Schema Registry Connection URL.", helpText = @@ -131,25 +139,31 @@ public interface KafkaToBigQueryFlexOptions @TemplateParameter.Text( order = 11, + parentName = "avroFormat", + parentTriggerValues = {"CONFLUENT_WIRE_FORMAT"}, optional = true, description = "BigQuery output dataset", helpText = - "BigQuery output dataset to write the output to." - + "Tables will be created dynamically in the dataset.") + "BigQuery output dataset to write the output to. Tables will be created dynamically in the dataset." + + " If the tables are created beforehand, the table names should follow the specified naming convention." + + " The name should be `bqTableNamePrefix + Avro Schema FullName` {@link org.apache.avro.Schema.getFullName}," + + " each word will be seperated by a hyphen '-'.") String getOutputDataset(); void setOutputDataset(String value); @TemplateParameter.Text( order = 12, + parentName = "avroFormat", + parentTriggerValues = {"CONFLUENT_WIRE_FORMAT"}, optional = true, - description = "Name prefix to be used while creating BigQuery output tables.", + description = "Naming prefix to be used while creating BigQuery output tables.", helpText = - "Name prefix to be used while creating BigQuery output tables. Only applicable when using schema registry.") + "Naming prefix to be used while creating BigQuery output tables. Only applicable when using schema registry.") @Default.String("") - String getBQTableNamePrefix(); + String getBqTableNamePrefix(); - void setBQTableNamePrefix(String value); + void setBqTableNamePrefix(String value); @TemplateParameter.Boolean( order = 13, diff --git a/v2/kafka-to-bigquery/src/main/java/com/google/cloud/teleport/v2/transforms/AvroDynamicTransform.java b/v2/kafka-to-bigquery/src/main/java/com/google/cloud/teleport/v2/transforms/AvroDynamicTransform.java index 074084ee94..ec9ec4e73e 100644 --- a/v2/kafka-to-bigquery/src/main/java/com/google/cloud/teleport/v2/transforms/AvroDynamicTransform.java +++ b/v2/kafka-to-bigquery/src/main/java/com/google/cloud/teleport/v2/transforms/AvroDynamicTransform.java @@ -84,7 +84,7 @@ public WriteResult expand(PCollection> kafkaRecords) BigQueryDynamicDestination.of( options.getProject(), options.getOutputDataset(), - options.getBQTableNamePrefix())) + options.getBqTableNamePrefix())) .withWriteDisposition(WriteDisposition.valueOf(options.getWriteDisposition())) .withCreateDisposition(CreateDisposition.valueOf(options.getCreateDisposition())) .withFailedInsertRetryPolicy(InsertRetryPolicy.retryTransientErrors()) diff --git a/v2/kafka-to-bigquery/src/main/java/com/google/cloud/teleport/v2/transforms/BigQueryDynamicDestination.java b/v2/kafka-to-bigquery/src/main/java/com/google/cloud/teleport/v2/transforms/BigQueryDynamicDestination.java index 8f43357d00..b02c862abb 100644 --- a/v2/kafka-to-bigquery/src/main/java/com/google/cloud/teleport/v2/transforms/BigQueryDynamicDestination.java +++ b/v2/kafka-to-bigquery/src/main/java/com/google/cloud/teleport/v2/transforms/BigQueryDynamicDestination.java @@ -65,7 +65,7 @@ public TableDestination getTable(GenericRecord element) { @Override public TableSchema getSchema(GenericRecord element) { - // TODO: Test if sending null can work here, might be mroe efficient. + // TODO: Test if sending null can work here, might be more efficient. return BigQueryUtils.toTableSchema(AvroUtils.toBeamSchema(element.getSchema())); }