From 1d34d0e688f608c1ad5470207b7946298a061837 Mon Sep 17 00:00:00 2001 From: an2x <52892974+an2x@users.noreply.github.com> Date: Thu, 9 May 2024 10:42:43 -0400 Subject: [PATCH 1/2] Add support for KAFKA_TOPIC template parameters. --- .../teleport/metadata/TemplateParameter.java | 40 +++++++++++++++++++ .../teleport/metadata/util/MetadataUtils.java | 1 + .../plugin/model/ImageSpecParameter.java | 17 ++++++++ .../plugin/model/ImageSpecParameterType.java | 5 ++- .../plugin/model/TemplateDefinitionsTest.java | 3 ++ .../cloud/teleport/plugin/sample/AtoBOk.java | 10 ++++- 6 files changed, 74 insertions(+), 2 deletions(-) diff --git a/metadata/src/main/java/com/google/cloud/teleport/metadata/TemplateParameter.java b/metadata/src/main/java/com/google/cloud/teleport/metadata/TemplateParameter.java index 24e1e67891..15c5d78efb 100644 --- a/metadata/src/main/java/com/google/cloud/teleport/metadata/TemplateParameter.java +++ b/metadata/src/main/java/com/google/cloud/teleport/metadata/TemplateParameter.java @@ -701,4 +701,44 @@ public final class TemplateParameter { /** Parameter visibility in the UI. */ boolean hiddenUi() default false; } + + /** + * Template parameter containing a Kafka Topic. + * + *

The parameter specifies the fully-qualified name of an Apache Kafka topic. This can be + * either a Google Managed Kafka topic or a non-managed Kafka topic. + */ + @Retention(RetentionPolicy.RUNTIME) + @Target({ElementType.FIELD, ElementType.METHOD}) + public @interface KafkaTopic { + /** Order of appearance. */ + int order() default 999; + + /** Name of the parameter. */ + String name() default ""; + + /** Group Name of the parameter. */ + String groupName() default ""; + + /** Parent Name of the parameter. */ + String parentName() default ""; + + /** List of parent trigger values. */ + String[] parentTriggerValues() default ""; + + /** If parameter is optional. */ + boolean optional() default false; + + /** Description of the parameter. */ + String description(); + + /** Help text of the parameter. */ + String helpText(); + + /** Example of the parameter. */ + String example() default ""; + + /** Parameter visibility in the UI. */ + boolean hiddenUi() default false; + } } diff --git a/metadata/src/main/java/com/google/cloud/teleport/metadata/util/MetadataUtils.java b/metadata/src/main/java/com/google/cloud/teleport/metadata/util/MetadataUtils.java index 9eaf664589..da1ced4fb5 100644 --- a/metadata/src/main/java/com/google/cloud/teleport/metadata/util/MetadataUtils.java +++ b/metadata/src/main/java/com/google/cloud/teleport/metadata/util/MetadataUtils.java @@ -42,6 +42,7 @@ public final class MetadataUtils { TemplateParameter.GcsWriteFile.class, TemplateParameter.GcsWriteFolder.class, TemplateParameter.Integer.class, + TemplateParameter.KafkaTopic.class, TemplateParameter.KmsEncryptionKey.class, TemplateParameter.Long.class, TemplateParameter.Password.class, diff --git a/plugins/core-plugin/src/main/java/com/google/cloud/teleport/plugin/model/ImageSpecParameter.java b/plugins/core-plugin/src/main/java/com/google/cloud/teleport/plugin/model/ImageSpecParameter.java index 1774e215f8..4bc677dccf 100644 --- a/plugins/core-plugin/src/main/java/com/google/cloud/teleport/plugin/model/ImageSpecParameter.java +++ b/plugins/core-plugin/src/main/java/com/google/cloud/teleport/plugin/model/ImageSpecParameter.java @@ -468,6 +468,23 @@ public void processParamType(Annotation parameterAnnotation) { this.setHiddenUi(durationParam.hiddenUi()); this.setParamType(ImageSpecParameterType.TEXT); break; + case "KafkaTopic": + TemplateParameter.KafkaTopic kafkaTopic = + (TemplateParameter.KafkaTopic) parameterAnnotation; + if (!kafkaTopic.name().isEmpty()) { + this.setName(kafkaTopic.name()); + } + processDescriptions( + kafkaTopic.groupName(), + kafkaTopic.description(), + kafkaTopic.helpText(), + kafkaTopic.example()); + this.setParentName(kafkaTopic.parentName()); + this.setParentTriggerValues(kafkaTopic.parentTriggerValues()); + this.setOptional(kafkaTopic.optional()); + this.setHiddenUi(kafkaTopic.hiddenUi()); + this.setParamType(ImageSpecParameterType.KAFKA_TOPIC); + break; default: throw new IllegalArgumentException("Invalid type " + parameterAnnotation); } diff --git a/plugins/core-plugin/src/main/java/com/google/cloud/teleport/plugin/model/ImageSpecParameterType.java b/plugins/core-plugin/src/main/java/com/google/cloud/teleport/plugin/model/ImageSpecParameterType.java index 7f12cc959e..0d1fd972d6 100644 --- a/plugins/core-plugin/src/main/java/com/google/cloud/teleport/plugin/model/ImageSpecParameterType.java +++ b/plugins/core-plugin/src/main/java/com/google/cloud/teleport/plugin/model/ImageSpecParameterType.java @@ -49,5 +49,8 @@ public enum ImageSpecParameterType { ENUM, /** Number parameter. */ - NUMBER; + NUMBER, + + /** Kafka Topic parameter. */ + KAFKA_TOPIC; } diff --git a/plugins/core-plugin/src/test/java/com/google/cloud/teleport/plugin/model/TemplateDefinitionsTest.java b/plugins/core-plugin/src/test/java/com/google/cloud/teleport/plugin/model/TemplateDefinitionsTest.java index 6864e74012..181ff10366 100644 --- a/plugins/core-plugin/src/test/java/com/google/cloud/teleport/plugin/model/TemplateDefinitionsTest.java +++ b/plugins/core-plugin/src/test/java/com/google/cloud/teleport/plugin/model/TemplateDefinitionsTest.java @@ -63,6 +63,9 @@ public void testSampleAtoBOk() { ImageSpecParameter to = metadata.getParameter("to").get(); assertEquals(ImageSpecParameterType.BIGQUERY_TABLE, to.getParamType()); + ImageSpecParameter inputKafkaTopic = metadata.getParameter("inputKafkaTopic").get(); + assertEquals(ImageSpecParameterType.KAFKA_TOPIC, inputKafkaTopic.getParamType()); + ImageSpecParameter logical = metadata.getParameter("logical").get(); assertEquals(ImageSpecParameterType.BOOLEAN, logical.getParamType()); assertEquals("^(true|false)$", logical.getRegexes().get(0)); diff --git a/plugins/core-plugin/src/test/java/com/google/cloud/teleport/plugin/sample/AtoBOk.java b/plugins/core-plugin/src/test/java/com/google/cloud/teleport/plugin/sample/AtoBOk.java index 491d09f776..f0ea357288 100644 --- a/plugins/core-plugin/src/test/java/com/google/cloud/teleport/plugin/sample/AtoBOk.java +++ b/plugins/core-plugin/src/test/java/com/google/cloud/teleport/plugin/sample/AtoBOk.java @@ -99,12 +99,20 @@ public interface AtoBOptions { Boolean getParamWithGroupName(); @TemplateParameter.Text( - order = 8, + order = 9, parentName = "paramWithGroupName", parentTriggerValues = {"true"}, description = "N/A", helpText = "Text that has parent name and parent trigger value") @Default.Boolean(false) Boolean getParamWithParentName(); + + @TemplateParameter.KafkaTopic( + order = 10, + description = "Kafka input topic", + helpText = "Kafka topic to trad from", + example = + "projects/project-foo/locations/us-central1/clusters/cluster-bar/topics/topic-baz") + String getInputKafkaTopic(); } } From 5c92cc829a8b91fb112e3edfb2aed896cffd8611 Mon Sep 17 00:00:00 2001 From: Nick Anikin <52892974+an2x@users.noreply.github.com> Date: Fri, 10 May 2024 10:30:13 -0700 Subject: [PATCH 2/2] Fix typo. Co-authored-by: Anand Inguva <34158215+AnandInguva@users.noreply.github.com> --- .../java/com/google/cloud/teleport/plugin/sample/AtoBOk.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/plugins/core-plugin/src/test/java/com/google/cloud/teleport/plugin/sample/AtoBOk.java b/plugins/core-plugin/src/test/java/com/google/cloud/teleport/plugin/sample/AtoBOk.java index f0ea357288..48d3d56cae 100644 --- a/plugins/core-plugin/src/test/java/com/google/cloud/teleport/plugin/sample/AtoBOk.java +++ b/plugins/core-plugin/src/test/java/com/google/cloud/teleport/plugin/sample/AtoBOk.java @@ -110,7 +110,7 @@ public interface AtoBOptions { @TemplateParameter.KafkaTopic( order = 10, description = "Kafka input topic", - helpText = "Kafka topic to trad from", + helpText = "Kafka topic to read from", example = "projects/project-foo/locations/us-central1/clusters/cluster-bar/topics/topic-baz") String getInputKafkaTopic();