Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Update APIs to obtain the schema registry client as a separate parameter #5

Merged
merged 7 commits into from
Apr 11, 2024
Merged
Show file tree
Hide file tree
Changes from 5 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
28 changes: 14 additions & 14 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -26,25 +26,17 @@ import ballerinax/confluent.cavroserdes;
### Step 2: Instantiate a new connector

```ballerina
configurable string baseUrl = ?;
configurable int identityMapCapacity = ?;
configurable map<anydata> originals = ?;
configurable map<string> headers = ?;

cavroserdes:Client avroSerDes = check new ({
baseUrl,
identityMapCapacity,
originals,
headers
});
cavroserdes:Client avroSerDes = new;
```

### Step 3: Invoke the connector operation

You can now utilize the operations available within the connector.
You can now utilize the operations available within the connector. To instantiate a `cregistry:Client` instance refer to the guidelines [here](https://central.ballerina.io/ballerinax/confluent.cregistry/latest).

```ballerina
public function main() returns error? {
cregistry:Client registry = ; // instantiate a schema registry client

string schema = string `
{
"type": "int",
Expand All @@ -53,8 +45,8 @@ public function main() returns error? {
}`;

int value = 1;
byte[] bytes = check avroSerDes->serialize(schema, value, "subject");
int number = check avroSerDes->deserialize(bytes);
byte[] bytes = check avroSerDes->serialize(registry, schema, value, "subject");
Nuvindu marked this conversation as resolved.
Show resolved Hide resolved
int number = check avroSerDes->deserialize(registry, bytes);
}
```

Expand All @@ -66,6 +58,14 @@ Use the following command to compile and run the Ballerina program.
bal run
```

## Examples

The Ballerina Avro Serializer/Deserializer connector for Confluent Schema Registry provides practical examples illustrating usage in various scenarios. Explore these [examples](https://github.com/ballerina-platform/module-ballerinax-confluent.cavroserdes/tree/main/examples).

1. [Kafka Avro producer](https://github.com/ballerina-platform/module-ballerinax-confluent.cavroserdes/tree/main/examples/kafka-avro-producer) - This example demonstrates how to publish Avro serialized data to a Kafka topic.

2. [Kafka Avro consumer](https://github.com/ballerina-platform/module-ballerinax-confluent.cavroserdes/tree/main/examples/kafka-avro-consumer) - This guide demonstrates how to consume data in the correct format according to the Avro schema from a Kafka topic.

## Issues and projects

The **Issues** and **Projects** tabs are disabled for this repository as this is part of the Ballerina library. To report bugs, request new features, start new discussions, view project boards, etc., visit the Ballerina library [parent repository](https://github.com/ballerina-platform/ballerina-library).
Expand Down
28 changes: 14 additions & 14 deletions ballerina/Module.md
Original file line number Diff line number Diff line change
Expand Up @@ -19,25 +19,17 @@ import ballerinax/confluent.cavroserdes;
### Step 2: Instantiate a new connector

```ballerina
configurable string baseUrl = ?;
configurable int identityMapCapacity = ?;
configurable map<anydata> originals = ?;
configurable map<string> headers = ?;

cavroserdes:Client avroSerDes = check new ({
baseUrl,
identityMapCapacity,
originals,
headers
});
cavroserdes:Client avroSerDes = new;
```

### Step 3: Invoke the connector operation

You can now utilize the operations available within the connector.
You can now utilize the operations available within the connector. To instantiate a `cregistry:Client` instance refer to the guidelines [here](https://central.ballerina.io/ballerinax/confluent.cregistry/latest).

```ballerina
public function main() returns error? {
cregistry:Client registry = ; // instantiate a schema registry client

string schema = string `
{
"type": "int",
Expand All @@ -46,8 +38,8 @@ public function main() returns error? {
}`;

int value = 1;
byte[] bytes = check avroSerDes->serialize(schema, value, "subject");
int number = check avroSerDes->deserialize(bytes);
byte[] bytes = check avroSerDes->serialize(registry, schema, value, "subject");
int number = check avroSerDes->deserialize(registry, bytes);
Nuvindu marked this conversation as resolved.
Show resolved Hide resolved
}
```

Expand All @@ -58,3 +50,11 @@ Use the following command to compile and run the Ballerina program.
```bash
bal run
```

## Examples

The Ballerina Avro Serializer/Deserializer connector for Confluent Schema Registry provides practical examples illustrating usage in various scenarios. Explore these [examples](https://github.com/ballerina-platform/module-ballerinax-confluent.cavroserdes/tree/main/examples).

1. [Kafka Avro producer](https://github.com/ballerina-platform/module-ballerinax-confluent.cavroserdes/tree/main/examples/kafka-avro-producer) - This example demonstrates how to publish Avro serialized data to a Kafka topic.

2. [Kafka Avro consumer](https://github.com/ballerina-platform/module-ballerinax-confluent.cavroserdes/tree/main/examples/kafka-avro-consumer) - This guide demonstrates how to consume data in the correct format according to the Avro schema from a Kafka topic.
28 changes: 14 additions & 14 deletions ballerina/Package.md
Original file line number Diff line number Diff line change
Expand Up @@ -19,25 +19,17 @@ import ballerinax/confluent.cavroserdes;
### Step 2: Instantiate a new connector

```ballerina
configurable string baseUrl = ?;
configurable int identityMapCapacity = ?;
configurable map<anydata> originals = ?;
configurable map<string> headers = ?;

cavroserdes:Client avroSerDes = check new ({
baseUrl,
identityMapCapacity,
originals,
headers
});
cavroserdes:Client avroSerDes = new;
```

### Step 3: Invoke the connector operation

You can now utilize the operations available within the connector.
You can now utilize the operations available within the connector. To instantiate a `cregistry:Client` instance refer to the guidelines [here](https://central.ballerina.io/ballerinax/confluent.cregistry/latest).

```ballerina
public function main() returns error? {
cregistry:Client registry = ; // instantiate a schema registry client

string schema = string `
{
"type": "int",
Expand All @@ -46,8 +38,8 @@ public function main() returns error? {
}`;

int value = 1;
byte[] bytes = check avroSerDes->serialize(schema, value, "subject");
int number = check avroSerDes->deserialize(bytes);
byte[] bytes = check avroSerDes->serialize(registry, schema, value, "subject");
int number = check avroSerDes->deserialize(registry, bytes);
}
```

Expand All @@ -58,3 +50,11 @@ Use the following command to compile and run the Ballerina program.
```bash
bal run
```

## Examples

The Ballerina Avro Serializer/Deserializer connector for Confluent Schema Registry provides practical examples illustrating usage in various scenarios. Explore these [examples](https://github.com/ballerina-platform/module-ballerinax-confluent.cavroserdes/tree/main/examples).

1. [Kafka Avro producer](https://github.com/ballerina-platform/module-ballerinax-confluent.cavroserdes/tree/main/examples/kafka-avro-producer) - This example demonstrates how to publish Avro serialized data to a Kafka topic.

2. [Kafka Avro consumer](https://github.com/ballerina-platform/module-ballerinax-confluent.cavroserdes/tree/main/examples/kafka-avro-consumer) - This guide demonstrates how to consume data in the correct format according to the Avro schema from a Kafka topic.
22 changes: 8 additions & 14 deletions ballerina/client.bal
Original file line number Diff line number Diff line change
Expand Up @@ -20,25 +20,18 @@ import ballerinax/confluent.cregistry;

# Consists of APIs to integrate with Avro Serializer/Deserializer for Confluent Schema Registry.
public isolated client class Client {
private final cregistry:Client schemaClient;

public isolated function init(*cregistry:ConnectionConfig config) returns Error? {
cregistry:Client|error schemaClient = new (config);
if schemaClient is error {
return error Error("Client invocation error", schemaClient);
}
self.schemaClient = schemaClient;
}

# Serializes the given data according to the Avro format and registers the schema into the schema registry.
#
# + registry - The schema registry client
# + schema - The Avro schema
# + data - The data to be serialized according to the schema
# + subject - The subject under which the schema should be registered
# + return - A `byte` array of the serialized data or else an `cavroserdes:Error`
remote isolated function serialize(string schema, anydata data, string subject) returns byte[]|Error {
remote isolated function serialize(cregistry:Client registry, string schema, anydata data,
string subject) returns byte[]|Error {
do {
int id = check self.schemaClient->register(subject, schema);
int id = check registry->register(subject, schema);
byte[] encodedId = check toBytes(id);
avro:Schema avroClient = check new (schema);
byte[] serializedData = check avroClient.toAvro(data);
Expand All @@ -51,18 +44,19 @@ public isolated client class Client {
# Deserializes the given Avro serialized message to the given data type by retrieving the schema
# from the schema registry.
#
# + registry - The schema registry client
# + data - Avro serialized data which includes the schema id
# + targetType - Default parameter use to infer the user specified type
# + return - A deserialized data with the given type or else an `cavroserdes:Error`
remote isolated function deserialize(byte[] data, typedesc<anydata> targetType = <>)
remote isolated function deserialize(cregistry:Client registry, byte[] data, typedesc<anydata> targetType = <>)
returns targetType|Error = @java:Method {
'class: "io.ballerina.lib.confluent.avro.serdes.AvroSerializer"
} external;

isolated function deserializeData(byte[] data) returns anydata|Error {
isolated function deserializeData(cregistry:Client registry, byte[] data) returns anydata|Error {
do {
int schemaId = getId(data.slice(1, 5));
string retrievedSchema = check self.schemaClient->getSchemaById(schemaId);
string retrievedSchema = check registry->getSchemaById(schemaId);
avro:Schema avroClient = check new (retrievedSchema);
anydata deserializedData = check avroClient.fromAvro(data.slice(5, data.length()));
return deserializedData;
Expand Down
22 changes: 12 additions & 10 deletions ballerina/tests/tests.bal
Original file line number Diff line number Diff line change
Expand Up @@ -15,13 +15,16 @@
// under the License.

import ballerina/test;
import ballerinax/confluent.cregistry;

configurable string baseUrl = ?;
configurable int identityMapCapacity = ?;
configurable map<anydata> originals = ?;
configurable map<string> headers = ?;

Client avroSerDes = check new ({
Client avroSerDes = new;

cregistry:Client regsitry = check new ({
baseUrl,
identityMapCapacity,
originals,
Expand All @@ -45,9 +48,8 @@ public function testSerDes() returns error? {
name: "Red",
colors: ["maroon", "dark red", "light red"]
};

byte[] bytes = check avroSerDes->serialize(schema, colors, "subject-0");
Color getColors = check avroSerDes->deserialize(bytes);
byte[] bytes = check avroSerDes->serialize(regsitry, schema, colors, "subject-0");
Color getColors = check avroSerDes->deserialize(regsitry, bytes);
test:assertEquals(getColors, colors);
}

Expand All @@ -69,8 +71,8 @@ public function testWithRecords() returns error? {
subject: "Math"
};

byte[] bytes = check avroSerDes->serialize(schema, student, "subject-1");
Student getStudent = check avroSerDes->deserialize(bytes);
byte[] bytes = check avroSerDes->serialize(regsitry, schema, student, "subject-1");
Student getStudent = check avroSerDes->deserialize(regsitry, bytes);
test:assertEquals(getStudent, student);
}

Expand All @@ -85,8 +87,8 @@ public function testSerDesWithInteger() returns error? {

int value = 1;

byte[] bytes = check avroSerDes->serialize(schema, value, "subject-5");
int getValue = check avroSerDes->deserialize(bytes);
byte[] bytes = check avroSerDes->serialize(regsitry, schema, value, "subject-5");
int getValue = check avroSerDes->deserialize(regsitry, bytes);
test:assertEquals(getValue, value);
}

Expand All @@ -108,8 +110,8 @@ public function testSerDesWithCourse() returns error? {
credits: 3
};

byte[] bytes = check avroSerDes->serialize(schema, course, "subject-3");
Course getCourse = check avroSerDes->deserialize(bytes);
byte[] bytes = check avroSerDes->serialize(regsitry, schema, course, "subject-3");
Course getCourse = check avroSerDes->deserialize(regsitry, bytes);
test:assertEquals(getCourse, course);
}

Expand Down
9 changes: 5 additions & 4 deletions examples/kafka-avro-consumer/main.bal
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@

import ballerina/io;
import ballerinax/confluent.cavroserdes;
import ballerinax/confluent.cregistry;
import ballerinax/kafka;

type Order readonly & record {
Expand All @@ -34,17 +35,17 @@ public function main() returns error? {
topics: "test-topic"
});

cavroserdes:Client registry = check new ({
cregistry:Client registry = check new ({
baseUrl,
identityMapCapacity,
originals,
headers
});

cavroserdes:Client avroSerDes = new;
while true {
kafka:AnydataConsumerRecord[] getValues = check orderConsumer->poll(60);
byte[] orderData = <byte[]>getValues[0].value;
Order getOrder = check registry->deserialize(orderData);
byte[][] getValues = check orderConsumer->pollPayload(60);
Order getOrder = check avroSerDes->deserialize(registry, getValues[0]);
io:println("Order : ", getOrder);
}
}
6 changes: 4 additions & 2 deletions examples/kafka-avro-producer/main.bal
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@

import ballerina/http;
import ballerinax/confluent.cavroserdes;
import ballerinax/confluent.cregistry;
import ballerinax/kafka;

configurable string baseUrl = ?;
Expand All @@ -29,7 +30,7 @@ type Order readonly & record {

service / on new http:Listener(9090) {
private final kafka:Producer orderProducer;
private final cavroserdes:Client registry;
private final cregistry:Client registry;

function init() returns error? {
self.orderProducer = check new (kafka:DEFAULT_URL);
Expand All @@ -51,7 +52,8 @@ service / on new http:Listener(9090) {
{"name": "productName", "type": "string"}
]
}`;
byte[] byteValue = check self.registry->serialize(schema, newOrder, "new-subject");
cavroserdes:Client avroSerDes = new;
byte[] byteValue = check avroSerDes->serialize(self.registry, schema, newOrder, "new-subject");
check self.orderProducer->send({
topic: "test-topic",
value: byteValue
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -45,11 +45,11 @@ public class AvroSerializer {
getModule().getMajorVersion(),
DESERIALIZE_FUNCTION);

public static Object deserialize(Environment env, BObject kafkaSerDes, BArray data,
public static Object deserialize(Environment env, BObject kafkaSerDes, BObject registry, BArray data,
BTypedesc typeDesc) {
Future future = env.markAsync();
ExecutionCallback executionCallback = new ExecutionCallback(future, typeDesc);
Object[] arguments = new Object[]{data, true};
Object[] arguments = new Object[]{registry, true, data, true};
UnionType typeUnion = TypeCreator.createUnionType(PredefinedTypes.TYPE_ANYDATA_ARRAY,
PredefinedTypes.TYPE_ERROR);
env.getRuntime()
Expand Down
Loading