From 03e59855a06dd679e4a854dd3c364377787bf386 Mon Sep 17 00:00:00 2001 From: Tsuyoshi Ushio Date: Thu, 18 Apr 2019 17:09:45 -0700 Subject: [PATCH] initial on goging java implementation for share --- .gitignore | 11 ++ binding-library/java/pom.xml | 119 ++++++++++++++++++ .../kafka/BrokerAuthenticationMode.java | 25 ++++ .../azure/functions/kafka/BrokerProtocol.java | 19 +++ .../azure/functions/kafka/KafkaEventData.java | 33 +++++ .../functions/kafka/annotation/Kafka.java | 115 +++++++++++++++++ .../kafka/annotation/KafkaTrigger.java | 96 ++++++++++++++ 7 files changed, 418 insertions(+) create mode 100644 binding-library/java/pom.xml create mode 100644 binding-library/java/src/main/com/microsoft/azure/functions/kafka/BrokerAuthenticationMode.java create mode 100644 binding-library/java/src/main/com/microsoft/azure/functions/kafka/BrokerProtocol.java create mode 100644 binding-library/java/src/main/com/microsoft/azure/functions/kafka/KafkaEventData.java create mode 100644 binding-library/java/src/main/com/microsoft/azure/functions/kafka/annotation/Kafka.java create mode 100644 binding-library/java/src/main/com/microsoft/azure/functions/kafka/annotation/KafkaTrigger.java diff --git a/.gitignore b/.gitignore index e9f55229..6935aaa5 100644 --- a/.gitignore +++ b/.gitignore @@ -269,3 +269,14 @@ __pycache__/ local.settings.json local.appsettings.tests.json + +# Java output +**/target/* + +# Java IDE +.idea/ +*.iml +.classpath +.project +.settings/ +.checkstyle diff --git a/binding-library/java/pom.xml b/binding-library/java/pom.xml new file mode 100644 index 00000000..0fc3bcbb --- /dev/null +++ b/binding-library/java/pom.xml @@ -0,0 +1,119 @@ + + + 4.0.0 + + com.microsoft.azure.functions + azure-functions-java-library-kafka + 1.0.0 + jar + + Microsoft Azure Functions Java Kafka Types + This package contains all Java interfaces and annotations to interact with Microsoft Azure functions runtime for Kafka binding extensions. + https://github.com/Azure/azure-functions-kafka-extension + + Microsoft Azure + https://azure.microsoft.com + + + + UTF-8 + 3.8.0 + 3.0.1 + 3.0.1 + 1.8 + 1.8 + 1.3.0 + + + + + MIT License + https://opensource.org/licenses/MIT + repo + + + + + scm:git:https://github.com/Azure/azure-functions-kafka-extension + scm:git:git@github.com:Azure/azure-functions-kafka-extension + https://github.com/Azure/azure-functions-kafka-extension + HEAD + + + + + ossrh + Sonatype Snapshots + https://oss.sonatype.org/content/repositories/snapshots/ + true + default + + + + + + TsuyoshiUshio + Tsuyoshi Ushio + tsushi@microsoft.com + + + + + + maven.snapshots + Maven Central Snapshot Repository + https://oss.sonatype.org/content/repositories/snapshots/ + + false + + + true + + + + + + + com.microsoft.azure.functions + azure-functions-java-library + ${azure.functions.java.library.version} + + + + + + + maven-compiler-plugin + ${maven-compiler.version} + + + org.apache.maven.plugins + maven-source-plugin + ${maven-source.version} + + + attach-sources + + jar + + + + + + org.apache.maven.plugins + maven-javadoc-plugin + ${maven-javadoc.version} + + + attach-javadocs + + jar + + + + + + + \ No newline at end of file diff --git a/binding-library/java/src/main/com/microsoft/azure/functions/kafka/BrokerAuthenticationMode.java b/binding-library/java/src/main/com/microsoft/azure/functions/kafka/BrokerAuthenticationMode.java new file mode 100644 index 00000000..27b99460 --- /dev/null +++ b/binding-library/java/src/main/com/microsoft/azure/functions/kafka/BrokerAuthenticationMode.java @@ -0,0 +1,25 @@ +/** + * Copyright (c) Microsoft Corporation. All rights reserved. + * Licensed under the MIT License. See License.txt in the project root for + * license information. + */ +package main.com.microsoft.azure.functions.kafka; + +/** + * Defines the broker authentication modes + */ +public enum BrokerAuthenticationMode { + NOTSET(-1), + GSSAPI(0), + PLAIN(1), + SCRAMSHA256(2), + SCRAMSHA512(3); + + private final int value; + + BrokerAuthenticationMode(final int value) { + this.value = value; + } + + public int getValue() { return value; } +} diff --git a/binding-library/java/src/main/com/microsoft/azure/functions/kafka/BrokerProtocol.java b/binding-library/java/src/main/com/microsoft/azure/functions/kafka/BrokerProtocol.java new file mode 100644 index 00000000..a3ec01b0 --- /dev/null +++ b/binding-library/java/src/main/com/microsoft/azure/functions/kafka/BrokerProtocol.java @@ -0,0 +1,19 @@ +/** + * Copyright (c) Microsoft Corporation. All rights reserved. + * Licensed under the MIT License. See License.txt in the project root for + * license information. + */ +package main.com.microsoft.azure.functions.kafka; + +public enum BrokerProtocol { + NOTSET(-1), + PLAINTEXT(0), + SSL(1), + SASLPLAINTEXT(2), + SASLSSL(3); + + private int value; + BrokerProtocol(final int value) { + this.value = value; + } +} diff --git a/binding-library/java/src/main/com/microsoft/azure/functions/kafka/KafkaEventData.java b/binding-library/java/src/main/com/microsoft/azure/functions/kafka/KafkaEventData.java new file mode 100644 index 00000000..e070d448 --- /dev/null +++ b/binding-library/java/src/main/com/microsoft/azure/functions/kafka/KafkaEventData.java @@ -0,0 +1,33 @@ +/** + * Copyright (c) Microsoft Corporation. All rights reserved. + * Licensed under the MIT License. See License.txt in the project root for + * license information. + */ +package main.com.microsoft.azure.functions.kafka; + +/** + * Kafka Event Data to use with Kafka binding + */ +public class KafkaEventData { + /** + * Constructor + */ + public KafkaEventData() { + + } + + // .NET has other constructor which has an interface called IConsumerResultData. + + public Object key; + + public long offset; + + public int partition; + + public String topic; + + public String timestamp; // TODO this is DateTime in C#. Can we use Date for this? or String? + + public Object value; + +} diff --git a/binding-library/java/src/main/com/microsoft/azure/functions/kafka/annotation/Kafka.java b/binding-library/java/src/main/com/microsoft/azure/functions/kafka/annotation/Kafka.java new file mode 100644 index 00000000..350ce077 --- /dev/null +++ b/binding-library/java/src/main/com/microsoft/azure/functions/kafka/annotation/Kafka.java @@ -0,0 +1,115 @@ +/** + * Copyright (c) Microsoft Corporation. All rights reserved. + * Licensed under the MIT License. See License.txt in the project root for + * license information. + */ +package main.com.microsoft.azure.functions.kafka.annotation; + +import com.microsoft.azure.functions.annotation.CustomBinding; +import main.com.microsoft.azure.functions.kafka.BrokerAuthenticationMode; +import main.com.microsoft.azure.functions.kafka.BrokerProtocol; + +import java.lang.annotation.ElementType; +import java.lang.annotation.Retention; +import java.lang.annotation.RetentionPolicy; +import java.lang.annotation.Target; + +@Target(ElementType.PARAMETER) +@Retention(RetentionPolicy.RUNTIME) +@CustomBinding(direction = "in", name = "", type = "Kafka") +public @interface Kafka { // TODO Should I name it as KafkaOutput? + /** + * Gets the Topic. + * @return + */ + String topic(); + + /** + * Gets or sets the BrokerList. + */ + String brokerList(); + + /** + * Gets or sets the KeyType + * This method is used internally. Don't pass the value to this method. + */ + String keyType(); // TODO Originally Type type. Should I pass the serialized value for them? + + /** + * Gets or sets the ValueType + * This method is used internally. Don't pass the value to this method. + */ + String valueType(); // TODO Originally Type type. Should I pass the serialized value for them? + + /** + * Gets or sets the Avro schema. + * Json format + * Default: Plain* + */ + String AvroScema(); + + /** + * Gets or sets the Maximum transmit message size. Default: 1MB + */ + int maxMessageBytes(); + + /** + * Maximum number of messages batched in one MessageSet. default: 10000 + */ + int batchSize(); + + /** + * When set to `true`, the producer will ensure that messages are successfully produced exactly once and in the original produce order. default: false + */ + boolean enableIdempotence(); + + /** + * Local message timeout. This value is only enforced locally and limits the time a produced message waits for successful delivery. A time of 0 is infinite. This is the maximum time used to deliver a message (including retries). Delivery error occurs when either the retry count or the message timeout are exceeded. default: 300000 + */ + int messageTimeoutMs(); + + /** + * The ack timeout of the producer request in milliseconds. default: 5000 + */ + int requestTimeoutMs(); + + /** + * How many times to retry sending a failing Message. **Note:** default: 2 + * Retrying may cause reordering unless EnableIdempotence is set to true. + * @see #enableIdempotence() + */ + int maxRetries(); + + /** + * SASL mechanism to use for authentication. + * Default: PLAIN + */ + BrokerAuthenticationMode authenticationMode(); // TODO double check if it is OK + + /** + * SASL username with the PLAIN and SASL-SCRAM-.. mechanisms + * Default: "" + */ + String username(); + + /** + * SASL password with the PLAIN and SASL-SCRAM-.. mechanisms + * Default is plaintext + * + * security.protocol in librdkafka + */ + String password(); + + /** + * Gets or sets the security protocol used to communicate with brokers + * default is PLAINTEXT + */ + BrokerProtocol protocol(); + + /** + * Path to client's private key (PEM) used for authentication. + * Default "" + * ssl.key.location in librdkafka + */ + String sslKeyLocation(); +} diff --git a/binding-library/java/src/main/com/microsoft/azure/functions/kafka/annotation/KafkaTrigger.java b/binding-library/java/src/main/com/microsoft/azure/functions/kafka/annotation/KafkaTrigger.java new file mode 100644 index 00000000..f509a7d6 --- /dev/null +++ b/binding-library/java/src/main/com/microsoft/azure/functions/kafka/annotation/KafkaTrigger.java @@ -0,0 +1,96 @@ +/** + * Copyright (c) Microsoft Corporation. All rights reserved. + * Licensed under the MIT License. See License.txt in the project root for + * license information. + */ +package main.com.microsoft.azure.functions.kafka.annotation; + +import com.microsoft.azure.functions.annotation.CustomBinding; +import main.com.microsoft.azure.functions.kafka.BrokerAuthenticationMode; +import main.com.microsoft.azure.functions.kafka.BrokerProtocol; + +import java.lang.annotation.Retention; +import java.lang.annotation.Target; +import java.lang.annotation.RetentionPolicy; +import java.lang.annotation.ElementType; + + +/** + *

Annotation for Kafka bindings

+ */ +@Target(ElementType.PARAMETER) +@Retention(RetentionPolicy.RUNTIME) +@CustomBinding(direction = "in", name = "", type = "KafkaTrigger") +public @interface KafkaTrigger { + /** + * Gets the Topic. + */ + String topic(); + + /** + * Gets or sets the BrokerList. + */ + String brokerList(); + + /** + * Gets or sets the EventHub connection string when using Kafka protocol header feature of Azure EventHubs. + */ + String eventHubConnectionString(); + + /** + * Gets or sets the consumer group. + */ + String consumerGroup(); + + /** + * Gets or sets the KeyType + * This method is used internally. Don't pass the value to this method. + */ + String keyType(); // TODO Originally Type type. Should I pass the serialized value for them? + + /** + * Gets or sets the ValueType + * This method is used internally. Don't pass the value to this method. + */ + String valueType(); // TODO Originally Type type. Should I pass the serialized value for them? + + /** + * Gets or sets the Avro schema. + * Json format + * Default: Plain* + */ + String avroScema(); + + /** + * SASL mechanism to use for authentication. + * Default: PLAIN + */ + BrokerAuthenticationMode authenticationMode(); // TODO double check if it is OK + + /** + * SASL username with the PLAIN and SASL-SCRAM-.. mechanisms + * Default: "" + */ + String username(); + + /** + * SASL password with the PLAIN and SASL-SCRAM-.. mechanisms + * Default is plaintext + * + * security.protocol in librdkafka + */ + String password(); + + /** + * Gets or sets the security protocol used to communicate with brokers + * default is PLAINTEXT + */ + BrokerProtocol protocol(); + + /** + * Path to client's private key (PEM) used for authentication. + * Default "" + * ssl.key.location in librdkafka + */ + String sslKeyLocation(); +}