KREST-4467 First part of reworking produce tests (#1157)

AndrewJSchofield committed May 2, 2023
1 parent ceb03db · commit dcc9a56

Showing 4 changed files with 1,337 additions and 39 deletions.

ClusterTestHarness.java

@@ -22,7 +22,8 @@
 import static org.junit.jupiter.api.Assertions.assertTrue;
 import static org.junit.jupiter.api.Assertions.fail;
 
-import io.confluent.kafka.schemaregistry.avro.AvroCompatibilityLevel;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import io.confluent.kafka.schemaregistry.CompatibilityLevel;
 import io.confluent.kafka.schemaregistry.rest.SchemaRegistryConfig;
 import io.confluent.kafka.schemaregistry.rest.SchemaRegistryRestApplication;
 import io.confluent.kafka.serializers.KafkaAvroSerializer;
@@ -35,9 +36,11 @@
 import java.net.ServerSocket;
 import java.net.URI;
 import java.net.URISyntaxException;
+import java.time.Duration;
 import java.util.ArrayList;
 import java.util.Collections;
 import java.util.HashMap;
+import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
 import java.util.Optional;
@@ -66,6 +69,10 @@
 import org.apache.kafka.clients.admin.ListTopicsResult;
 import org.apache.kafka.clients.admin.NewPartitionReassignment;
 import org.apache.kafka.clients.admin.NewTopic;
+import org.apache.kafka.clients.consumer.ConsumerConfig;
+import org.apache.kafka.clients.consumer.ConsumerRecord;
+import org.apache.kafka.clients.consumer.ConsumerRecords;
+import org.apache.kafka.clients.consumer.KafkaConsumer;
 import org.apache.kafka.clients.producer.KafkaProducer;
 import org.apache.kafka.clients.producer.ProducerConfig;
 import org.apache.kafka.clients.producer.ProducerRecord;
@@ -75,6 +82,7 @@
 import org.apache.kafka.common.config.ConfigResource;
 import org.apache.kafka.common.security.auth.SecurityProtocol;
 import org.apache.kafka.common.serialization.ByteArraySerializer;
+import org.apache.kafka.common.serialization.Deserializer;
 import org.eclipse.jetty.server.Server;
 import org.glassfish.jersey.apache.connector.ApacheConnectorProvider;
 import org.glassfish.jersey.client.ClientConfig;
@@ -139,7 +147,7 @@ public static int choosePort() {
   protected String plaintextBrokerList = null;
 
   // Schema registry config
-  protected String schemaRegCompatibility = AvroCompatibilityLevel.NONE.name;
+  protected String schemaRegCompatibility = CompatibilityLevel.NONE.name;
   protected Properties schemaRegProperties = null;
   protected String schemaRegConnect = null;
   protected SchemaRegistryRestApplication schemaRegApp = null;
@@ -201,12 +209,13 @@ private void setupMethod() throws Exception {
     if (withSchemaRegistry) {
       int schemaRegPort = choosePort();
       schemaRegProperties.put(
-          SchemaRegistryConfig.PORT_CONFIG, ((Integer) schemaRegPort).toString());
-      schemaRegProperties.put(SchemaRegistryConfig.KAFKASTORE_CONNECTION_URL_CONFIG, zkConnect);
+          SchemaRegistryConfig.LISTENERS_CONFIG,
+          String.format("http://127.0.0.1:%d", schemaRegPort));
       schemaRegProperties.put(
           SchemaRegistryConfig.KAFKASTORE_TOPIC_CONFIG,
           SchemaRegistryConfig.DEFAULT_KAFKASTORE_TOPIC);
-      schemaRegProperties.put(SchemaRegistryConfig.COMPATIBILITY_CONFIG, schemaRegCompatibility);
+      schemaRegProperties.put(
+          SchemaRegistryConfig.SCHEMA_COMPATIBILITY_CONFIG, schemaRegCompatibility);
       String broker =
           SecurityProtocol.PLAINTEXT.name
               + "://"
@@ -237,6 +246,8 @@ private void setupMethod() throws Exception {
     // Reduce the metadata fetch timeout so requests for topics that don't exist timeout much
     // faster than the default
     restProperties.put("producer." + ProducerConfig.MAX_BLOCK_MS_CONFIG, "5000");
+    restProperties.put(
+        "producer." + ProducerConfig.MAX_REQUEST_SIZE_CONFIG, String.valueOf((2 << 20) * 10));
 
     restConfig = new KafkaRestConfig(restProperties);
 
@@ -340,6 +351,7 @@ protected Properties getBrokerProperties(int i) {
         (short) 1,
         false);
     props.setProperty("auto.create.topics.enable", "false");
+    props.setProperty("message.max.bytes", String.valueOf((2 << 20) * 10));
     // We *must* override this to use the port we allocated (Kafka currently allocates one port
     // that it always uses for ZK
     props.setProperty("zookeeper.connect", this.zkConnect);
@@ -717,6 +729,63 @@ private <T> void doProduce(
         });
   }
 
+  protected final <K, V> ConsumerRecord<K, V> getMessage(
+      String topic,
+      int partition,
+      long offset,
+      Deserializer<K> keyDeserializer,
+      Deserializer<V> valueDeserializer) {
+
+    Properties props = new Properties();
+    props.setProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, brokerList);
+
+    KafkaConsumer<K, V> consumer = new KafkaConsumer<>(props, keyDeserializer, valueDeserializer);
+    TopicPartition tp = new TopicPartition(topic, partition);
+    consumer.assign(Collections.singleton(tp));
+    consumer.seek(tp, offset);
+
+    ConsumerRecords<K, V> records = consumer.poll(Duration.ofSeconds(60));
+    consumer.close();
+
+    return records.isEmpty() ? null : records.records(tp).get(0);
+  }
+
+  protected final <K, V> ConsumerRecords<K, V> getMessages(
+      String topic,
+      Deserializer<K> keyDeserializer,
+      Deserializer<V> valueDeserializer,
+      int messageCount) {
+
+    List<ConsumerRecord<K, V>> accumulator = new ArrayList<>(messageCount);
+    int numMessages = 0;
+
+    Properties props = new Properties();
+    props.setProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, brokerList);
+
+    KafkaConsumer<K, V> consumer = new KafkaConsumer<>(props, keyDeserializer, valueDeserializer);
+    TopicPartition tp = new TopicPartition(topic, 0);
+    consumer.assign(Collections.singleton(tp));
+    consumer.seekToBeginning(Collections.singleton(tp));
+
+    ConsumerRecords<K, V> records;
+    while (numMessages < messageCount) {
+      records = consumer.poll(Duration.ofSeconds(60));
+      Iterator<ConsumerRecord<K, V>> it = records.iterator();
+      while (it.hasNext() && (numMessages < messageCount)) {
+        ConsumerRecord<K, V> rec = it.next();
+        accumulator.add(rec);
+        numMessages++;
+      }
+    }
+    consumer.close();
+
+    return new ConsumerRecords<>(Collections.singletonMap(tp, accumulator));
+  }
+
+  protected ObjectMapper getObjectMapper() {
+    return restApp.getJsonMapper();
+  }
+
   protected Map<Integer, List<Integer>> createAssignment(
       List<Integer> replicaIds, int numReplicas) {
     Map<Integer, List<Integer>> replicaAssignments = new HashMap<>();
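
A note on the new size settings: (2 << 20) * 10 evaluates to 20,971,520 bytes (20 MiB), so the producer's max.request.size and the broker's message.max.bytes are raised in step and oversized payloads in the reworked produce tests can round-trip intact.

The getMessage and getMessages helpers added above consume straight from Kafka with a KafkaConsumer that is assigned its partition explicitly (no group management), seeks to the desired position, and polls with a generous timeout. A minimal sketch of how a test subclass might use them; the class name, topic, and payload are hypothetical, not part of this commit:

import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertNotNull;

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.common.serialization.ByteArrayDeserializer;
import org.junit.jupiter.api.Test;

public class ExampleProduceTest extends ClusterTestHarness {

  public ExampleProduceTest() {
    // One broker, no schema registry; mirrors the constructor pattern added in this commit.
    super(/* numBrokers= */ 1, /* withSchemaRegistry= */ false);
  }

  @Test
  public void producedRecordIsReadableAtOffsetZero() throws Exception {
    createTopic("example-topic", /* numPartitions= */ 1, /* replicationFactor= */ (short) 1);

    // ... produce one record with value "hello" to example-topic via the REST API under test ...

    // Read the record back directly from Kafka rather than through the REST consumer API.
    ConsumerRecord<byte[], byte[]> record =
        getMessage(
            "example-topic",
            /* partition= */ 0,
            /* offset= */ 0L,
            new ByteArrayDeserializer(),
            new ByteArrayDeserializer());

    assertNotNull(record);
    assertEquals("hello", new String(record.value()));
  }
}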

SchemaProduceConsumeTest.java

@@ -31,7 +31,7 @@
 import io.confluent.kafkarest.entities.v2.SchemaConsumerRecord;
 import io.confluent.kafkarest.entities.v2.SchemaTopicProduceRequest;
 import io.confluent.kafkarest.entities.v2.SchemaTopicProduceRequest.SchemaTopicProduceRecord;
-import io.confluent.kafkarest.testing.DefaultKafkaRestTestEnvironment;
+import io.confluent.kafkarest.integration.ClusterTestHarness;
 import java.util.HashMap;
 import java.util.HashSet;
 import java.util.List;
@@ -44,22 +44,18 @@
 import javax.ws.rs.core.Response.Status;
 import org.junit.jupiter.api.Tag;
 import org.junit.jupiter.api.Test;
-import org.junit.jupiter.api.extension.RegisterExtension;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 @Tag("IntegrationTest")
-public abstract class SchemaProduceConsumeTest {
+public abstract class SchemaProduceConsumeTest extends ClusterTestHarness {
 
   private static final String TOPIC = "topic-1";
 
   private static final String CONSUMER_GROUP = "group-1";
 
   private static final Logger log = LoggerFactory.getLogger(SchemaProduceConsumeTest.class);
 
-  @RegisterExtension
-  public final DefaultKafkaRestTestEnvironment testEnv = new DefaultKafkaRestTestEnvironment();
-
   protected abstract EmbeddedFormat getFormat();
 
   protected abstract String getContentType();
@@ -70,18 +66,16 @@ public abstract class SchemaProduceConsumeTest {
 
   protected abstract List<SchemaTopicProduceRecord> getProduceRecords();
 
+  public SchemaProduceConsumeTest() {
+    super(/* numBrokers= */ 1, /* withSchemaRegistry= */ true);
+  }
+
   @Test
   public void produceThenConsume_returnsExactlyProduced() throws Exception {
-    testEnv
-        .kafkaCluster()
-        .createTopic(TOPIC, /* numPartitions= */ 1, /* replicationFactor= */ (short) 3);
+    createTopic(TOPIC, /* numPartitions= */ 1, /* replicationFactor= */ (short) 1);
 
     Response createConsumerInstanceResponse =
-        testEnv
-            .kafkaRest()
-            .target()
-            .path(String.format("/consumers/%s", CONSUMER_GROUP))
-            .request()
+        request(String.format("/consumers/%s", CONSUMER_GROUP))
             .post(
                 Entity.entity(
                     new CreateConsumerInstanceRequest(
@@ -100,14 +94,10 @@ public void produceThenConsume_returnsExactlyProduced() throws Exception {
         createConsumerInstanceResponse.readEntity(CreateConsumerInstanceResponse.class);
 
     Response subscribeResponse =
-        testEnv
-            .kafkaRest()
-            .target()
-            .path(
+        request(
                 String.format(
                     "/consumers/%s/instances/%s/subscription",
                     CONSUMER_GROUP, createConsumerInstance.getInstanceId()))
-            .request()
             .post(
                 Entity.entity(
                     new ConsumerSubscriptionRecord(singletonList(TOPIC), null),
@@ -116,14 +106,10 @@ public void produceThenConsume_returnsExactlyProduced() throws Exception {
     assertEquals(Status.NO_CONTENT.getStatusCode(), subscribeResponse.getStatus());
 
     // Needs to consume empty once before producing.
-    testEnv
-        .kafkaRest()
-        .target()
-        .path(
+    request(
             String.format(
                 "/consumers/%s/instances/%s/records",
                 CONSUMER_GROUP, createConsumerInstance.getInstanceId()))
-        .request()
         .accept(getContentType())
         .get();
 
@@ -136,11 +122,7 @@ public void produceThenConsume_returnsExactlyProduced() throws Exception {
             null);
 
     Response genericResponse =
-        testEnv
-            .kafkaRest()
-            .target()
-            .path(String.format("/topics/%s", TOPIC))
-            .request()
+        request(String.format("/topics/%s", TOPIC))
             .post(Entity.entity(produceRequest, getContentType()));
 
     ProduceResponse produceResponse;
@@ -162,14 +144,10 @@ public void produceThenConsume_returnsExactlyProduced() throws Exception {
     assertEquals(Status.OK, produceResponse.getRequestStatus());
 
     Response readRecordsResponse =
-        testEnv
-            .kafkaRest()
-            .target()
-            .path(
+        request(
             String.format(
                 "/consumers/%s/instances/%s/records",
                 CONSUMER_GROUP, createConsumerInstance.getInstanceId()))
-        .request()
         .accept(getContentType())
         .get();
 
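
The request(String path) helper called throughout this test is inherited from ClusterTestHarness; its definition sits outside this diff. Presumably it builds a JAX-RS request against the embedded REST proxy, along these lines (a sketch assuming a restConnect base-URL field, not the actual implementation):

import javax.ws.rs.client.ClientBuilder;
import javax.ws.rs.client.Invocation;

// Hypothetical sketch: point a JAX-RS client at the embedded REST proxy's
// base URL and open a request builder for the given path.
protected Invocation.Builder request(String path) {
  return ClientBuilder.newClient().target(restConnect).path(path).request();
}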
