More refactoring
sudeshwasnik committed Mar 13, 2023
1 parent 46b8cc1 commit de4b5e5
Showing 4 changed files with 125 additions and 143 deletions.
59 changes: 13 additions & 46 deletions pom.xml
@@ -1,6 +1,5 @@
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>

<parent>
@@ -203,7 +202,6 @@
<version>${snakeyaml.version}</version>
<scope>test</scope>
</dependency>

<dependency>
<groupId>io.confluent</groupId>
<artifactId>kafka-connect-avro-converter</artifactId>
@@ -235,8 +233,6 @@
<version>${confluent.version}</version>
<scope>test</scope>
</dependency>


</dependencies>

<build>
@@ -257,35 +253,18 @@
</goals>
<configuration>
<title>Kafka Connect Elasticsearch</title>
<documentationUrl>https://docs.confluent.io/kafka-connect-elasticsearch/current/index.html
</documentationUrl>
<documentationUrl>https://docs.confluent.io/kafka-connect-elasticsearch/current/index.html</documentationUrl>
<description>
The Elasticsearch connector allows moving data from Kafka to Elasticsearch 2.x, 5.x,
6.x, and 7.x. It writes data from a topic in Kafka to an index in Elasticsearch and all
data for a topic have the same type.
The Elasticsearch connector allows moving data from Kafka to Elasticsearch 2.x, 5.x, 6.x, and 7.x. It writes data from a topic in Kafka to an index in Elasticsearch and all data for a topic have the same type.

Elasticsearch is often used for text queries, analytics and as a key-value store (use
cases). The connector covers both the analytics and key-value store use cases. For the
analytics use case, each message in Kafka is treated as an event and the connector
uses topic+partition+offset as a unique identifier for events, which are then converted
to unique documents in Elasticsearch. For the key-value store use case, it supports using
keys from Kafka messages as document ids in Elasticsearch and provides configurations
ensuring that updates to a key are written to Elasticsearch in order. For both use
cases, Elasticsearch’s idempotent write semantics guarantee exactly-once delivery.
Elasticsearch is often used for text queries, analytics and as a key-value store (use cases). The connector covers both the analytics and key-value store use cases. For the analytics use case, each message in Kafka is treated as an event and the connector uses topic+partition+offset as a unique identifier for events, which are then converted to unique documents in Elasticsearch. For the key-value store use case, it supports using keys from Kafka messages as document ids in Elasticsearch and provides configurations ensuring that updates to a key are written to Elasticsearch in order. For both use cases, Elasticsearch’s idempotent write semantics guarantee exactly-once delivery.

Mapping is the process of defining how a document, and the fields it contains, are
stored and indexed. Users can explicitly define mappings for types in indices. When a
mapping is not explicitly defined, Elasticsearch can determine field names and types
from the data; however, some types, such as timestamp and decimal, may not be correctly
inferred. To ensure that the types are correctly inferred, the connector provides a
feature to infer a mapping from the schemas of Kafka messages.
Mapping is the process of defining how a document, and the fields it contains, are stored and indexed. Users can explicitly define mappings for types in indices. When a mapping is not explicitly defined, Elasticsearch can determine field names and types from the data; however, some types, such as timestamp and decimal, may not be correctly inferred. To ensure that the types are correctly inferred, the connector provides a feature to infer a mapping from the schemas of Kafka messages.
</description>
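
The document-id and mapping behaviour described in the connector description above is normally selected through the connector's key.ignore and schema.ignore options. A minimal sketch in Java, assuming those documented option names; the class name, endpoint, and topic values are placeholders and nothing below comes from this commit:

import java.util.HashMap;
import java.util.Map;

public class ElasticsearchSinkConfigSketch {
    // Builds an illustrative sink configuration; every value below is a placeholder.
    public static Map<String, String> exampleProps() {
        Map<String, String> props = new HashMap<>();
        props.put("connector.class", "io.confluent.connect.elasticsearch.ElasticsearchSinkConnector");
        props.put("connection.url", "http://localhost:9200"); // placeholder Elasticsearch endpoint
        props.put("topics", "example-topic");                 // placeholder Kafka topic
        props.put("tasks.max", "1");
        // Analytics use case: ignore record keys so topic+partition+offset becomes the document id.
        // For the key-value store use case, set this to "false" so the Kafka record key is the id.
        props.put("key.ignore", "true");
        // Infer the Elasticsearch mapping from each record's Connect schema, so types such as
        // timestamp and decimal are mapped explicitly; "true" would leave inference to Elasticsearch.
        props.put("schema.ignore", "false");
        return props;
    }
}

A map of this shape is what the integration tests further down configure the connector with via connect.configureConnector(CONNECTOR_NAME, props).
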
<logo>logos/elasticsearch.jpg</logo>

<supportProviderName>Confluent, Inc.</supportProviderName>
<supportSummary>Confluent supports the Elasticsearch sink connector alongside community
members as part of its Confluent Platform offering.
</supportSummary>
<supportSummary>Confluent supports the Elasticsearch sink connector alongside community members as part of its Confluent Platform offering.</supportSummary>
<supportUrl>https://docs.confluent.io/home/overview.html</supportUrl>
<supportLogo>logos/confluent.png</supportLogo>

@@ -354,9 +333,7 @@
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-surefire-plugin</artifactId>
<configuration>
<argLine>@{argLine} -Djava.awt.headless=true -Dtests.security.manager=false
-Dtests.jarhell.check=false
</argLine>
<argLine>@{argLine} -Djava.awt.headless=true -Dtests.security.manager=false -Dtests.jarhell.check=false</argLine>
</configuration>
</plugin>
<plugin>
@@ -449,20 +426,12 @@
<mainClass>io.confluent.licenses.LicenseFinder</mainClass>
<arguments>
<!-- Note use of development instead of package so we pick up all dependencies. -->
<argument>-i
${project.build.directory}/${project.build.finalName}-package/share/java/kafka-connect-elasticsearch
</argument>
<argument>-i ${project.build.directory}/${project.build.finalName}-package/share/java/kafka-connect-elasticsearch</argument>
<argument>-o ${project.basedir}/licenses</argument>
<argument>-f</argument>
<argument>-h
${project.build.directory}/${project.build.finalName}-package/share/doc/kafka-connect-elasticsearch/licenses.html
</argument>
<argument>-l
${project.build.directory}/${project.build.finalName}-package/share/doc/kafka-connect-elasticsearch/licenses
</argument>
<argument>-n
${project.build.directory}/${project.build.finalName}-package/share/doc/kafka-connect-elasticsearch/notices
</argument>
<argument>-h ${project.build.directory}/${project.build.finalName}-package/share/doc/kafka-connect-elasticsearch/licenses.html</argument>
<argument>-l ${project.build.directory}/${project.build.finalName}-package/share/doc/kafka-connect-elasticsearch/licenses</argument>
<argument>-n ${project.build.directory}/${project.build.finalName}-package/share/doc/kafka-connect-elasticsearch/notices</argument>
<argument>-t ${project.name}</argument>
<argument>-x licenses-${confluent.version}.jar</argument>
</arguments>
@@ -507,9 +476,7 @@
<mainClass>io.confluent.licenses.LicenseFinder</mainClass>
<arguments>
<!-- Note use of development instead of package so we pick up all dependencies. -->
<argument>-i
${project.build.directory}/${project.build.finalName}-development/share/java/kafka-connect-elasticsearch
</argument>
<argument>-i ${project.build.directory}/${project.build.finalName}-development/share/java/kafka-connect-elasticsearch</argument>
<argument>-o ${project.basedir}/licenses</argument>
<argument>-f</argument>
<argument>-h ${project.basedir}/licenses.html</argument>
@@ -561,4 +528,4 @@
</build>
</profile>
</profiles>
</project>
</project>
@@ -77,26 +77,6 @@ protected long waitForConnectorToStart(String name, int numTasks) throws Interru
return System.currentTimeMillis();
}

protected void startSchemaRegistry() throws Exception {
int port = findAvailableOpenPort();
restApp = new RestApp(port, null, connect.kafka().bootstrapServers(),
KAFKASTORE_TOPIC, CompatibilityLevel.NONE.name, true, new Properties());
restApp.start();
waitForSchemaRegistryToStart();
}

protected void stopSchemaRegistry() throws Exception {
restApp.stop();
}

protected void waitForSchemaRegistryToStart() throws InterruptedException {
TestUtils.waitForCondition(
() -> restApp.restServer.isRunning(),
CONNECTOR_STARTUP_DURATION_MS,
"Connector tasks did not start in time."
);
}

/**
* Confirm that a connector with an exact number of tasks is running.
*
@@ -117,10 +97,4 @@ protected Optional<Boolean> assertConnectorAndTasksRunning(String connectorName,
return Optional.empty();
}
}

private Integer findAvailableOpenPort() throws IOException {
try (ServerSocket socket = new ServerSocket(0)) {
return socket.getLocalPort();
}
}
}
@@ -57,7 +57,7 @@
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;

public class ElasticsearchConnectorBaseIT extends BaseConnectorIT {
public abstract class ElasticsearchConnectorBaseIT extends BaseConnectorIT {

protected static final int NUM_RECORDS = 5;
protected static final int TASKS_MAX = 1;
@@ -132,18 +132,6 @@ protected void runSimpleTest(Map<String, String> props) throws Exception {
verifySearchResults(NUM_RECORDS);
}

protected void runSimpleSRTest(Map<String, String> props, Converter converter) throws Exception {
// start the connector
connect.configureConnector(CONNECTOR_NAME, props);

// wait for tasks to spin up
waitForConnectorToStart(CONNECTOR_NAME, TASKS_MAX);

writeRecordsFromIndexSR(0, NUM_RECORDS, converter);

verifySearchResults(NUM_RECORDS);
}

protected void writeRecords(int numRecords) {
writeRecordsFromIndex(0, numRecords);
}
@@ -154,59 +142,6 @@ protected void writeRecordsFromIndex(int start, int numRecords) {
}
}

protected void writeRecordsFromIndexSR(int start, int numRecords, Converter converter) {
// get defined schema for the test
Schema schema = getRecordSchema();

// configure producer with default properties
KafkaProducer<byte[], byte[]> producer = configureProducer();

List<Struct> recordsList = getRecords(schema, start, numRecords);

// produce records into topic
produceRecords(producer, converter, schema, recordsList, TOPIC);
}

private List<Struct> getRecords(Schema schema, int start, int numRecords) {
List<Struct> recordList = new ArrayList<>();
for (int i = start; i < start + numRecords; i++) {
Struct struct = new Struct(schema);
struct.put("doc_num", i);
recordList.add(struct);
}
return recordList;
}

private void produceRecords(
KafkaProducer<byte[], byte[]> producer,
Converter converter,
Schema schema,
List<Struct> recordsList,
String topic
) {
for (int i = 0; i < recordsList.size(); i++) {
byte[] convertedStruct = converter.fromConnectData(topic, schema, recordsList.get(i));
ProducerRecord<byte[], byte[]> msg = new ProducerRecord<>(topic, 0, String.valueOf(i).getBytes(), convertedStruct);
try {
producer.send(msg).get(TimeUnit.SECONDS.toMillis(120), TimeUnit.MILLISECONDS);
} catch (Exception e) {
throw new KafkaException("Could not produce message: " + msg, e);
}
}
}

private KafkaProducer<byte[], byte[]> configureProducer() {
Map<String, Object> producerProps = new HashMap<>();
producerProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, connect.kafka().bootstrapServers());
return new KafkaProducer<>(producerProps, new ByteArraySerializer(), new ByteArraySerializer());
}

private Schema getRecordSchema() {
SchemaBuilder schemaBuilder = SchemaBuilder.struct();
schemaBuilder.field("doc_num", Schema.INT32_SCHEMA);
return schemaBuilder.build();
}

protected void verifySearchResults(int numRecords) throws Exception {
waitForRecords(numRecords);

@@ -240,4 +175,5 @@ protected void waitForRecords(int numRecords) throws InterruptedException {
"Sufficient amount of document were not found in ES on time."
);
}

}
