Add schema-registry ITs
sudeshwasnik committed Mar 14, 2023
1 parent 7671439 commit 4756a11
Showing 7 changed files with 256 additions and 36 deletions.
33 changes: 32 additions & 1 deletion pom.xml
@@ -202,6 +202,37 @@
<version>${snakeyaml.version}</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>io.confluent</groupId>
<artifactId>kafka-connect-avro-converter</artifactId>
<version>${confluent.version}</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>io.confluent</groupId>
<artifactId>kafka-connect-protobuf-converter</artifactId>
<version>${confluent.version}</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>io.confluent</groupId>
<artifactId>kafka-connect-json-schema-converter</artifactId>
<version>${confluent.version}</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>io.confluent</groupId>
<artifactId>kafka-schema-registry</artifactId>
<version>${confluent.version}</version>
<type>test-jar</type>
<scope>test</scope>
</dependency>
<dependency>
<groupId>io.confluent</groupId>
<artifactId>kafka-schema-registry</artifactId>
<version>${confluent.version}</version>
<scope>test</scope>
</dependency>
</dependencies>

<build>
@@ -497,4 +528,4 @@
</build>
</profile>
</profiles>
</project>
</project>
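
The converter and schema-registry test dependencies added above support the new integration tests in this commit: the kafka-schema-registry test-jar provides RestApp (the embedded schema registry used below), and the three converter artifacts are exercised against it. As a minimal sketch of the assumed usage, a converter from one of these artifacts is configured with the registry URL (the helper class below is illustrative and not part of the commit):

import io.confluent.connect.avro.AvroConverter;
import java.util.Collections;

// Illustrative helper (not in the commit): build a value-side AvroConverter
// configured against a schema registry endpoint.
public class AvroConverterSketch {
  public static AvroConverter valueConverter(String schemaRegistryUrl) {
    AvroConverter converter = new AvroConverter();
    // isKey = false: configure this instance as a value converter.
    converter.configure(
        Collections.singletonMap("schema.registry.url", schemaRegistryUrl),
        false);
    return converter;
  }
}
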
BaseConnectorIT.java
@@ -18,6 +18,7 @@
import java.util.Optional;
import java.util.concurrent.TimeUnit;

import io.confluent.kafka.schemaregistry.RestApp;
import org.apache.kafka.connect.runtime.AbstractStatus;
import org.apache.kafka.connect.runtime.rest.entities.ConnectorStateInfo;
import org.apache.kafka.connect.util.clusters.EmbeddedConnectCluster;
@@ -36,6 +37,7 @@ public abstract class BaseConnectorIT {
protected static final long CONNECTOR_STARTUP_DURATION_MS = TimeUnit.MINUTES.toMillis(60);

protected EmbeddedConnectCluster connect;
protected RestApp restApp;

protected void startConnect() {
connect = new EmbeddedConnectCluster.Builder()
ElasticsearchConnectorBaseIT.java
@@ -15,6 +15,23 @@

package io.confluent.connect.elasticsearch.integration;

import com.google.gson.JsonArray;
import com.google.gson.JsonObject;
import io.confluent.connect.elasticsearch.ElasticsearchClient;
import io.confluent.connect.elasticsearch.ElasticsearchSinkConnector;
import io.confluent.connect.elasticsearch.jest.JestElasticsearchClient;
import org.apache.kafka.connect.json.JsonConverter;
import org.apache.kafka.connect.storage.StringConverter;
import org.apache.kafka.test.TestUtils;
import org.junit.After;
import org.junit.AfterClass;
import org.junit.Before;
import org.junit.BeforeClass;

import java.io.IOException;
import java.util.HashMap;
import java.util.Map;

import static io.confluent.connect.elasticsearch.ElasticsearchSinkConnectorConfig.CONNECTION_URL_CONFIG;
import static io.confluent.connect.elasticsearch.ElasticsearchSinkConnectorConfig.IGNORE_KEY_CONFIG;
import static io.confluent.connect.elasticsearch.ElasticsearchSinkConnectorConfig.IGNORE_SCHEMA_CONFIG;
@@ -28,22 +45,7 @@
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;

import com.google.gson.JsonArray;
import com.google.gson.JsonObject;
import io.confluent.connect.elasticsearch.ElasticsearchClient;
import io.confluent.connect.elasticsearch.ElasticsearchSinkConnector;
import io.confluent.connect.elasticsearch.jest.JestElasticsearchClient;
import java.io.IOException;
import java.util.HashMap;
import java.util.Map;
import org.apache.kafka.connect.json.JsonConverter;
import org.apache.kafka.connect.storage.StringConverter;
import org.apache.kafka.test.TestUtils;
import org.junit.After;
import org.junit.AfterClass;
import org.junit.Before;

public class ElasticsearchConnectorBaseIT extends BaseConnectorIT {
public abstract class ElasticsearchConnectorBaseIT extends BaseConnectorIT {

protected static final int NUM_RECORDS = 5;
protected static final int TASKS_MAX = 1;
@@ -56,13 +58,19 @@ public class ElasticsearchConnectorBaseIT extends BaseConnectorIT {
protected ElasticsearchClient client;
protected Map<String, String> props;

@BeforeClass
public static void setupBeforeAll() {
container = ElasticsearchContainer.fromSystemProperties();
container.start();
}

@AfterClass
public static void cleanupAfterAll() {
container.close();
}

@Before
public void setup() {
public void setup() throws Exception {
startConnect();
connect.kafka().createTopic(TOPIC);

@@ -71,7 +79,7 @@ public void setup() {
}

@After
public void cleanup() throws IOException {
public void cleanup() throws Exception {
stopConnect();
client.deleteAll();
client.close();
@@ -85,7 +93,7 @@ protected Map<String, String> createProps() {
Map<String, String> props = new HashMap<>();

// generic configs
props.put(CONNECTOR_CLASS_CONFIG, ElasticsearchSinkConnector.class.getName());
props.put(CONNECTOR_CLASS_CONFIG, ElasticsearchSinkConnector.class.getSimpleName());
props.put(TOPICS_CONFIG, TOPIC);
props.put(TASKS_MAX_CONFIG, Integer.toString(TASKS_MAX));
props.put(KEY_CONVERTER_CLASS_CONFIG, StringConverter.class.getName());
@@ -107,7 +115,6 @@ protected void runSimpleTest(Map<String, String> props) throws Exception {

// wait for tasks to spin up
waitForConnectorToStart(CONNECTOR_NAME, TASKS_MAX);

writeRecords(NUM_RECORDS);

verifySearchResults(NUM_RECORDS);
@@ -118,7 +125,7 @@ protected void writeRecords(int numRecords) {
}

protected void writeRecordsFromIndex(int start, int numRecords) {
for (int i = start; i < start + numRecords; i++) {
for (int i = start; i < start + numRecords; i++) {
connect.kafka().produce(TOPIC, String.valueOf(i), String.format("{\"doc_num\":%d}", i));
}
}
@@ -156,4 +163,5 @@ protected void waitForRecords(int numRecords) throws InterruptedException {
"Sufficient amount of document were not found in ES on time."
);
}

}
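
ElasticsearchConnectorBaseIT is now abstract, so each concrete IT supplies its own scenario and drives runSimpleTest. A hypothetical minimal subclass is sketched below (the class and test names are illustrative, not part of the commit):

import org.junit.Test;

// Hypothetical example subclass: setup() in the base class prepares props,
// client, and the embedded Connect cluster; runSimpleTest configures the
// connector, writes NUM_RECORDS, and verifies them in Elasticsearch.
public class ElasticsearchConnectorSmokeIT extends ElasticsearchConnectorBaseIT {

  @Test
  public void writesRecordsEndToEnd() throws Exception {
    runSimpleTest(props);
  }
}
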
ElasticsearchConnectorDataFormatIT.java
@@ -0,0 +1,189 @@
/*
* Copyright 2020 Confluent Inc.
*
* Licensed under the Confluent Community License (the "License"); you may not use
* this file except in compliance with the License. You may obtain a copy of the
* License at
*
* http://www.confluent.io/confluent-community-license
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OF ANY KIND, either express or implied. See the License for the
* specific language governing permissions and limitations under the License.
*/

package io.confluent.connect.elasticsearch.integration;

import io.confluent.connect.avro.AvroConverter;
import io.confluent.connect.json.JsonSchemaConverter;
import io.confluent.connect.protobuf.ProtobufConverter;
import io.confluent.kafka.schemaregistry.CompatibilityLevel;
import io.confluent.kafka.schemaregistry.RestApp;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.KafkaException;
import org.apache.kafka.common.serialization.ByteArraySerializer;
import org.apache.kafka.connect.data.Schema;
import org.apache.kafka.connect.data.SchemaAndValue;
import org.apache.kafka.connect.data.SchemaBuilder;
import org.apache.kafka.connect.data.Struct;
import org.apache.kafka.connect.storage.Converter;
import org.apache.kafka.test.TestUtils;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;
import org.junit.runners.Parameterized.Parameters;

import java.io.IOException;
import java.net.ServerSocket;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.concurrent.TimeUnit;

import static io.confluent.kafka.schemaregistry.ClusterTestHarness.KAFKASTORE_TOPIC;

@RunWith(Parameterized.class)
public class ElasticsearchConnectorDataFormatIT extends ElasticsearchConnectorBaseIT {

protected void startSchemaRegistry() throws Exception {
int port = findAvailableOpenPort();
restApp = new RestApp(port, null, connect.kafka().bootstrapServers(),
KAFKASTORE_TOPIC, CompatibilityLevel.NONE.name, true, new Properties());
restApp.start();
waitForSchemaRegistryToStart();
}

protected void stopSchemaRegistry() throws Exception {
restApp.stop();
}

protected void waitForSchemaRegistryToStart() throws InterruptedException {
TestUtils.waitForCondition(
() -> restApp.restServer.isRunning(),
CONNECTOR_STARTUP_DURATION_MS,
"Schema-registry server did not start in time."
);
}

private Converter converter;
private Class<? extends Converter> converterClass;

@Override
public void setup() throws Exception {
startConnect();
startSchemaRegistry();
connect.kafka().createTopic(TOPIC);

props = createProps();
client = createClient();
}

@Override
public void cleanup() throws Exception {
stopConnect();
stopSchemaRegistry();
client.deleteAll();
client.close();
}

@Parameters
public static List<Class<? extends Converter>> data() {
return Arrays.asList(JsonSchemaConverter.class, ProtobufConverter.class, AvroConverter.class);
}


public ElasticsearchConnectorDataFormatIT(Class<? extends Converter> converter) throws Exception {
this.converterClass = converter;
this.converter = converterClass.getConstructor().newInstance();
}

@Test
public void testHappyPathDataFormat() throws Exception {
// configure configs and converter with schema-registry addr
props.put("value.converter", converterClass.getSimpleName());
props.put("value.converter.schema.registry.url", restApp.restServer.getURI().toString());
props.put("value.converter.scrub.invalid.names", "true");
converter.configure(Collections.singletonMap(
"schema.registry.url", restApp.restServer.getURI().toString()
), false
);

// wait for schema-registry to spin up
waitForSchemaRegistryToStart();

// run test
writeRecords(NUM_RECORDS);
}

@Override
protected void writeRecords(int numRecords) {
writeRecordsFromIndex(0, numRecords, converter);
}

protected void writeRecordsFromIndex(int start, int numRecords, Converter converter) {
// get defined schema for the test
Schema schema = getRecordSchema();

// configure producer with default properties
KafkaProducer<byte[], byte[]> producer = configureProducer();

List<SchemaAndValue> recordsList = getRecords(schema, start, numRecords);

// produce records into topic
produceRecords(producer, converter, recordsList, TOPIC);
}

private Integer findAvailableOpenPort() throws IOException {
try (ServerSocket socket = new ServerSocket(0)) {
return socket.getLocalPort();
}
}

protected List<SchemaAndValue> getRecords(Schema schema, int start, int numRecords) {
List<SchemaAndValue> recordList = new ArrayList<>();
for (int i = start; i < start + numRecords; i++) {
Struct struct = new Struct(schema);
struct.put("doc_num", i);
SchemaAndValue schemaAndValue = new SchemaAndValue(schema, struct);
recordList.add(schemaAndValue);
}
return recordList;
}

protected void produceRecords(
KafkaProducer<byte[], byte[]> producer,
Converter converter,
List<SchemaAndValue> recordsList,
String topic
) {
for (int i = 0; i < recordsList.size(); i++) {
SchemaAndValue schemaAndValue = recordsList.get(i);
byte[] convertedStruct = converter.fromConnectData(topic, schemaAndValue.schema(), schemaAndValue.value());
ProducerRecord<byte[], byte[]> msg = new ProducerRecord<>(topic, 0, String.valueOf(i).getBytes(), convertedStruct);
try {
producer.send(msg).get(TimeUnit.SECONDS.toMillis(120), TimeUnit.MILLISECONDS);
} catch (Exception e) {
throw new KafkaException("Could not produce message: " + msg, e);
}
}
}

protected KafkaProducer<byte[], byte[]> configureProducer() {
Map<String, Object> producerProps = new HashMap<>();
producerProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, connect.kafka().bootstrapServers());
return new KafkaProducer<>(producerProps, new ByteArraySerializer(), new ByteArraySerializer());
}

protected Schema getRecordSchema() {
SchemaBuilder schemaBuilder = SchemaBuilder.struct();
schemaBuilder.field("doc_num", Schema.INT32_SCHEMA);
return schemaBuilder.build();
}
}
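
Each parameterized run layers converter-specific properties on top of the base connector config before producing pre-serialized records. A sketch of those overrides for the AvroConverter case is shown below (the helper class and its name are illustrative; the real test takes the URL from restApp.restServer.getURI()):

import java.util.HashMap;
import java.util.Map;

// Illustrative only: the converter overrides added by the data-format IT,
// shown for the AvroConverter parameter with a caller-supplied registry URL.
public class DataFormatOverridesSketch {
  public static Map<String, String> overrides(String schemaRegistryUrl) {
    Map<String, String> props = new HashMap<>();
    props.put("value.converter", "AvroConverter");
    props.put("value.converter.schema.registry.url", schemaRegistryUrl);
    props.put("value.converter.scrub.invalid.names", "true");
    return props;
  }
}
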
