Merge pull request #1560 from Dippatel98:gmk-templates
PiperOrigin-RevId: 634016034
cloud-teleport committed May 15, 2024
2 parents 3e25241 + 7907ccf commit c420cf4
Showing 3 changed files with 26 additions and 12 deletions.
@@ -82,13 +82,14 @@ public interface KafkaToBigQueryFlexOptions

@TemplateParameter.Enum(
order = 6,
name = "messageFormat",
enumOptions = {
@TemplateParameter.TemplateEnumOption("AVRO"),
@TemplateParameter.TemplateEnumOption("JSON")
},
optional = true,
description = "The message format",
helpText = "The message format. Can be AVRO or JSON.")
description = "The Kafka message format",
helpText = "The Kafka message format. Can be AVRO or JSON.")
@Default.String("AVRO")
String getMessageFormat();

@@ -97,21 +98,26 @@ public interface KafkaToBigQueryFlexOptions
// TODO: Sync the enum options with all the Kafka Templates.
@TemplateParameter.Enum(
order = 7,
name = "avroFormat",
parentName = "messageFormat",
parentTriggerValues = {"AVRO"},
enumOptions = {
@TemplateParameter.TemplateEnumOption("CONFLUENT_WIRE_FORMAT"),
@TemplateParameter.TemplateEnumOption("NON_WIRE_FORMAT")
},
optional = true,
description = "Use the confluent wire format for avro messages.",
description = "The format to use for avro messages.",
helpText =
"This parameter is used to indicate if the avro messages use confluent wire format. Default is true (Confluent Wire Format)")
"This parameter is used to indicate what format to use for the avro messages. Default is CONFLUENT_WIRE_FORMAT.")
@Default.String("CONFLUENT_WIRE_FORMAT")
String getAvroFormat();

void setAvroFormat(String value);

@TemplateParameter.GcsReadFile(
order = 8,
parentName = "messageFormat",
parentTriggerValues = {"AVRO"},
optional = true,
description = "Cloud Storage path to the Avro schema file",
helpText = "Cloud Storage path to Avro schema file. For example, gs://MyBucket/file.avsc.")
@@ -121,6 +127,8 @@ public interface KafkaToBigQueryFlexOptions

@TemplateParameter.Text(
order = 9,
parentName = "avroFormat",
parentTriggerValues = {"CONFLUENT_WIRE_FORMAT"},
optional = true,
description = "Schema Registry Connection URL.",
helpText =
@@ -131,25 +139,31 @@ public interface KafkaToBigQueryFlexOptions

@TemplateParameter.Text(
order = 11,
parentName = "avroFormat",
parentTriggerValues = {"CONFLUENT_WIRE_FORMAT"},
optional = true,
description = "BigQuery output dataset",
helpText =
"BigQuery output dataset to write the output to."
+ "Tables will be created dynamically in the dataset.")
"BigQuery output dataset to write the output to. Tables will be created dynamically in the dataset."
+ " If the tables are created beforehand, the table names should follow the specified naming convention."
+ " The name should be `bqTableNamePrefix + Avro Schema FullName` {@link org.apache.avro.Schema.getFullName},"
+ " each word will be seperated by a hyphen '-'.")
String getOutputDataset();

void setOutputDataset(String value);

@TemplateParameter.Text(
order = 12,
parentName = "avroFormat",
parentTriggerValues = {"CONFLUENT_WIRE_FORMAT"},
optional = true,
description = "Name prefix to be used while creating BigQuery output tables.",
description = "Naming prefix to be used while creating BigQuery output tables.",
helpText =
"Name prefix to be used while creating BigQuery output tables. Only applicable when using schema registry.")
"Naming prefix to be used while creating BigQuery output tables. Only applicable when using schema registry.")
@Default.String("")
- String getBQTableNamePrefix();
+ String getBqTableNamePrefix();

- void setBQTableNamePrefix(String value);
+ void setBqTableNamePrefix(String value);

@TemplateParameter.Boolean(
order = 13,
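Note: the new outputDataset helpText above describes a table naming convention of `bqTableNamePrefix + Avro Schema FullName` with hyphen separators. The snippet below is a minimal, hypothetical illustration of that convention only, assuming the dot-separated segments of the schema's full name are joined with hyphens; it is not the template's actual naming code.

import org.apache.avro.Schema;
import org.apache.avro.SchemaBuilder;

public class TableNameSketch {

  // e.g. prefix "kafka" + schema full name "com.example.Order" -> "kafka-com-example-Order"
  static String tableNameFor(String bqTableNamePrefix, Schema avroSchema) {
    String suffix = avroSchema.getFullName().replace('.', '-');
    return bqTableNamePrefix.isEmpty() ? suffix : bqTableNamePrefix + "-" + suffix;
  }

  public static void main(String[] args) {
    Schema schema =
        SchemaBuilder.record("Order").namespace("com.example").fields().endRecord();
    System.out.println(tableNameFor("kafka", schema)); // kafka-com-example-Order
  }
}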
@@ -84,7 +84,7 @@ public WriteResult expand(PCollection<KafkaRecord<byte[], byte[]>> kafkaRecords)
BigQueryDynamicDestination.of(
options.getProject(),
options.getOutputDataset(),
- options.getBQTableNamePrefix()))
+ options.getBqTableNamePrefix()))
.withWriteDisposition(WriteDisposition.valueOf(options.getWriteDisposition()))
.withCreateDisposition(CreateDisposition.valueOf(options.getCreateDisposition()))
.withFailedInsertRetryPolicy(InsertRetryPolicy.retryTransientErrors())
@@ -65,7 +65,7 @@ public TableDestination getTable(GenericRecord element) {

@Override
public TableSchema getSchema(GenericRecord element) {
- // TODO: Test if sending null can work here, might be mroe efficient.
+ // TODO: Test if sending null can work here, might be more efficient.
return BigQueryUtils.toTableSchema(AvroUtils.toBeamSchema(element.getSchema()));
}

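Note: taken together, the last two files show a BigQueryIO dynamic-destinations setup: BigQueryDynamicDestination.of(project, dataset, prefix) routes each record to its own table, and getSchema() converts the record's Avro schema to a BigQuery TableSchema. The sketch below outlines that pattern with Beam's DynamicDestinations API under the naming assumption above; class and field names are illustrative and the template's real BigQueryDynamicDestination may differ.

import com.google.api.services.bigquery.model.TableSchema;
import org.apache.avro.generic.GenericRecord;
import org.apache.beam.sdk.io.gcp.bigquery.BigQueryUtils;
import org.apache.beam.sdk.io.gcp.bigquery.DynamicDestinations;
import org.apache.beam.sdk.io.gcp.bigquery.TableDestination;
// AvroUtils lives under org.apache.beam.sdk.extensions.avro.schemas.utils in newer Beam releases.
import org.apache.beam.sdk.schemas.utils.AvroUtils;
import org.apache.beam.sdk.values.ValueInSingleWindow;

class DynamicDestinationSketch extends DynamicDestinations<GenericRecord, GenericRecord> {

  private final String project;
  private final String dataset;
  private final String tablePrefix;

  DynamicDestinationSketch(String project, String dataset, String tablePrefix) {
    this.project = project;
    this.dataset = dataset;
    this.tablePrefix = tablePrefix;
  }

  @Override
  public GenericRecord getDestination(ValueInSingleWindow<GenericRecord> element) {
    // Use the record itself as the destination key; its Avro schema decides the table.
    return element.getValue();
  }

  @Override
  public TableDestination getTable(GenericRecord element) {
    // e.g. my-project:my_dataset.kafka-com-example-Order
    String table = tablePrefix + "-" + element.getSchema().getFullName().replace('.', '-');
    return new TableDestination(project + ":" + dataset + "." + table, null);
  }

  @Override
  public TableSchema getSchema(GenericRecord element) {
    // Same conversion as in the diff: Avro schema -> Beam schema -> BigQuery schema.
    return BigQueryUtils.toTableSchema(AvroUtils.toBeamSchema(element.getSchema()));
  }
}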
