Skip to content

Commit

Permalink
test for generic record in pipeline api
Browse files Browse the repository at this point in the history
  • Loading branch information
shultseva committed Dec 6, 2023
1 parent 814d580 commit 5c28b1c
Showing 1 changed file with 32 additions and 165 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -25,21 +25,14 @@
import com.hazelcast.jet.kafka.KafkaSources;
import com.hazelcast.jet.pipeline.Pipeline;
import com.hazelcast.jet.pipeline.Sinks;
import com.hazelcast.jet.pipeline.Sources;
import com.hazelcast.test.annotation.ParallelJVMTest;
import com.hazelcast.test.annotation.QuickTest;
import org.apache.avro.Schema;
import org.apache.avro.SchemaBuilder;
import org.apache.avro.generic.GenericData;
import org.apache.avro.specific.SpecificRecord;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.common.serialization.IntegerDeserializer;
import org.apache.kafka.common.serialization.IntegerSerializer;
import org.junit.AfterClass;
import org.junit.Before;
import org.junit.BeforeClass;
import org.junit.Ignore;
import org.junit.Test;
import org.junit.experimental.categories.Category;

Expand All @@ -49,20 +42,32 @@
import java.util.concurrent.ExecutionException;

import static com.hazelcast.jet.Util.entry;
import static com.hazelcast.jet.kafka.impl.AbstractHazelcastAvroSerde.OPTION_KEY_AVRO_SCHEMA;
import static com.hazelcast.jet.kafka.impl.AbstractHazelcastAvroSerde.OPTION_VALUE_AVRO_SCHEMA;
import static com.hazelcast.jet.pipeline.test.TestSources.items;
import static org.apache.kafka.clients.consumer.ConsumerConfig.AUTO_OFFSET_RESET_CONFIG;
import static org.apache.kafka.clients.consumer.ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG;
import static org.apache.kafka.clients.consumer.ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG;
import static org.apache.kafka.clients.producer.ProducerConfig.BOOTSTRAP_SERVERS_CONFIG;
import static org.apache.kafka.clients.producer.ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG;
import static org.apache.kafka.clients.producer.ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG;
import static org.assertj.core.api.Assertions.assertThat;

@Category({QuickTest.class, ParallelJVMTest.class})
public class StreamKafkaAvroTest extends SimpleTestInClusterSupport {

// NOTE(review): presumably the partition count the test topic is created with —
// the usage is outside this view, confirm against the setup code.
private static final int INITIAL_PARTITION_COUNT = 4;

// Avro record schema "schema.key" with a single optional int field named "key".
private static final Schema KEY_SCHEMA = SchemaBuilder.record("schema.key").fields()
.optionalInt("key")
.endRecord();
// Avro record schema "schema.value" with a single optional string field named "value".
private static final Schema VALUE_SCHEMA = SchemaBuilder.record("schema.value").fields()
.optionalString("value")
.endRecord();

// Schema options handed to the Avro serializer/deserializer: each option key maps to
// the string form of the corresponding schema.
// NOTE(review): this listing is a rendered diff without +/- markers; the two
// OPTION_VALUE_AVRO_SCHEMA lines look like the before and after form of the same
// entry — confirm against the actual file.
private static final Map<String, String> AVRO_SCHEMA_PROPERTIES = ImmutableMap.of(
OPTION_VALUE_AVRO_SCHEMA, VALUE_SCHEMA.toString()
OPTION_VALUE_AVRO_SCHEMA, VALUE_SCHEMA.toString(),
OPTION_KEY_AVRO_SCHEMA, KEY_SCHEMA.toString()
);

private static KafkaTestSupport kafkaTestSupport;
Expand Down Expand Up @@ -96,134 +101,38 @@ public static void afterClass() {
// Checks that a record produced to the Kafka topic is read by a pipeline and
// ends up in the "output" IList.
// NOTE(review): this listing is a rendered diff without +/- markers, so the removed
// and the added variant of several statements appear back to back — confirm against
// the actual file before compiling.
public void readGenericRecord() {
// List the pipeline writes into; the assertions below poll it.
IList<Object> sinkList = instance().getList("output");
Pipeline p = Pipeline.create();
// Variant that passes the Avro serializer/deserializer classes explicitly.
p.readFrom(
KafkaSources.kafka(
createProperties(HazelcastKafkaAvroSerializer.class, HazelcastKafkaAvroDeserializer.class),
TOPIC_NAME
)
)
// Variant that uses the no-argument createProperties().
p.readFrom(KafkaSources.kafka(createProperties(), TOPIC_NAME))
.withoutTimestamps()
.writeTo(Sinks.list(sinkList));

// The job is submitted without joining; the record is produced afterwards.
instance().getJet().newJob(p);

// Variant with a plain integer key and a generic-record value.
produceSafe(TOPIC_NAME, 1, toGenericRecord("read_value", VALUE_SCHEMA));
// Variant with generic records for both key and value.
produceSafe(TOPIC_NAME, toGenericRecord(1, KEY_SCHEMA), toGenericRecord("value", VALUE_SCHEMA));

// Retry until the sink contains the produced entry (integer-key variant).
assertTrueEventually(() -> assertThat(sinkList).contains(entry(1, toGenericRecord("read_value", VALUE_SCHEMA))));
// Retry until the sink contains the produced entry (generic-record-key variant).
assertTrueEventually(
() -> assertThat(sinkList).contains(
entry(toGenericRecord(1, KEY_SCHEMA), toGenericRecord("value", VALUE_SCHEMA))
)
);
}

// Checks that a pipeline writing entries through the Kafka sink leaves the expected
// key/value pair in the topic.
// NOTE(review): this listing is a rendered diff without +/- markers, so the removed
// and the added variant of several statements appear back to back — confirm against
// the actual file before compiling.
@Test
public void writeGenericRecord() {
// Variant that seeds an "input" IList with the single item 1.
var list = instance().getList("input");
list.add(1);

Pipeline p = Pipeline.create();
// Variant reading from the IList: the item is the key, the value is a generic record.
p.readFrom(Sources.list(list))
.map(v -> entry(v, toGenericRecord("write_value", VALUE_SCHEMA)))
.writeTo(
KafkaSinks.kafka(
createProperties(
HazelcastKafkaAvroSerializer.class,
HazelcastKafkaAvroDeserializer.class
),
TOPIC_NAME
)
);
// Variant reading from the test source items(1): key and value are both generic records.
p.readFrom(items(1))
.map(v -> entry(toGenericRecord(v, KEY_SCHEMA), toGenericRecord("value", VALUE_SCHEMA)))
.writeTo(KafkaSinks.kafka(createProperties(), TOPIC_NAME));
// Wait for the job to finish before inspecting the topic.
instance().getJet().newJob(p).join();

// Expected topic contents, followed by the key and value deserializer classes and
// the Avro schema options. Both the integer-key and the generic-record-key variants
// of the arguments are listed here.
kafkaTestSupport.assertTopicContentsEventually(
TOPIC_NAME,
Map.of(1, toGenericRecord("write_value", VALUE_SCHEMA)),
IntegerDeserializer.class,
Map.of(toGenericRecord(1, KEY_SCHEMA), toGenericRecord("value", VALUE_SCHEMA)),
HazelcastKafkaAvroDeserializer.class,
HazelcastKafkaAvroDeserializer.class,
AVRO_SCHEMA_PROPERTIES
);
}


/**
 * Produces a {@code TestSpecificRecord} value to the topic and expects a pipeline
 * reading from Kafka to deliver it into the "actual" list.
 * Ignored: a serializer/deserializer for specific records is still to be written.
 */
@Test
@Ignore
public void readSpecificRecord() {
    // TODO: new serializer/deserializer for specific records
    IList<Object> actual = instance().getList("actual");

    Pipeline pipeline = Pipeline.create();
    pipeline.readFrom(KafkaSources.kafka(createProperties(null, null), TOPIC_NAME))
            .withoutTimestamps()
            .writeTo(Sinks.list(actual));
    instance().getJet().newJob(pipeline);

    produceSafe(TOPIC_NAME, 1, toTestSpecificRecord("value"));

    assertTrueEventually(() -> {
        assertThat(actual).contains(entry(1, toTestSpecificRecord("value")));
    });
}

/**
 * Writes an entry whose value is a {@code TestSpecificRecord} through the Kafka sink
 * and expects the topic to contain it.
 * Ignored: a serializer/deserializer for specific records is still to be written.
 */
@Test
@Ignore
public void writeSpecificRecord() {
    // TODO: new serializer/deserializer for specific records
    var input = instance().getList("input");
    input.add(1);

    Pipeline pipeline = Pipeline.create();
    pipeline.readFrom(Sources.list(input))
            .map(item -> Map.entry(1, toTestSpecificRecord("specific_value")))
            .writeTo(KafkaSinks.kafka(createProperties(null, null), TOPIC_NAME));
    instance().getJet().newJob(pipeline).join();

    kafkaTestSupport.assertTopicContentsEventually(
            TOPIC_NAME,
            Map.of(1, toTestSpecificRecord("specific_value")),
            IntegerDeserializer.class,
            null, // TODO: new deserializer
            AVRO_SCHEMA_PROPERTIES
    );
}

/**
 * Produces a {@code TestRecord} value to the topic and expects a pipeline reading
 * from Kafka to deliver it into the "actual" list.
 * Ignored: a reflection-based serializer/deserializer is still to be written.
 */
@Test
@Ignore
public void readReflection() {
    // TODO: new serializer/deserializer for reflection
    IList<Object> actual = instance().getList("actual");

    Pipeline pipeline = Pipeline.create();
    pipeline.readFrom(KafkaSources.kafka(createProperties(null, null), TOPIC_NAME))
            .withoutTimestamps()
            .writeTo(Sinks.list(actual));
    instance().getJet().newJob(pipeline);

    produceSafe(TOPIC_NAME, 1, new TestRecord("value"));

    assertTrueEventually(() -> {
        assertThat(actual).contains(entry(1, new TestRecord("value")));
    });
}

/**
 * Writes an entry whose value is a {@code TestRecord} through the Kafka sink and
 * expects the topic to contain it.
 * Ignored: a reflection-based serializer/deserializer is still to be written.
 */
@Test
@Ignore
public void writeReflection() {
    // TODO: new serializer/deserializer for reflection
    var input = instance().getList("input");
    input.add(1);

    Pipeline pipeline = Pipeline.create();
    pipeline.readFrom(Sources.list(input))
            .map(item -> Map.entry(1, new TestRecord("value")))
            // TODO: the class name has to be supplied somewhere, maybe in the properties?
            .writeTo(KafkaSinks.kafka(createProperties(null, null), TOPIC_NAME));
    instance().getJet().newJob(pipeline).join();

    kafkaTestSupport.assertTopicContentsEventually(
            TOPIC_NAME,
            Map.of(1, new TestRecord("value")),
            IntegerDeserializer.class,
            null, // TODO: new deserializer
            AVRO_SCHEMA_PROPERTIES
    );
}


private void produceSafe(String topic, Object key, Object value) {
try {
kafkaTestSupport.produce(topic, key, value).get();
Expand All @@ -232,14 +141,14 @@ private void produceSafe(String topic, Object key, Object value) {
}
}

private Properties createProperties(Class<?> serializer, Class<?> deserializer) {
private Properties createProperties() {
Properties properties = new Properties();
properties.setProperty("bootstrap.servers", kafkaTestSupport.getBrokerConnectionString());
properties.setProperty(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, IntegerDeserializer.class.getCanonicalName());
properties.setProperty(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, deserializer.getCanonicalName());
properties.setProperty(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, IntegerSerializer.class.getCanonicalName());
properties.setProperty(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, serializer.getCanonicalName());
properties.setProperty("auto.offset.reset", "earliest");
properties.setProperty(BOOTSTRAP_SERVERS_CONFIG, kafkaTestSupport.getBrokerConnectionString());
properties.setProperty(KEY_DESERIALIZER_CLASS_CONFIG, HazelcastKafkaAvroDeserializer.class.getCanonicalName());
properties.setProperty(VALUE_DESERIALIZER_CLASS_CONFIG, HazelcastKafkaAvroDeserializer.class.getCanonicalName());
properties.setProperty(KEY_SERIALIZER_CLASS_CONFIG, HazelcastKafkaAvroSerializer.class.getCanonicalName());
properties.setProperty(VALUE_SERIALIZER_CLASS_CONFIG, HazelcastKafkaAvroSerializer.class.getCanonicalName());
properties.setProperty(AUTO_OFFSET_RESET_CONFIG, "earliest");
properties.putAll(AVRO_SCHEMA_PROPERTIES);

return properties;
Expand All @@ -250,46 +159,4 @@ var record = new GenericData.Record(schema);
record.put(0, value);
return record;
}

/** Wraps {@code value} in a new {@code TestSpecificRecord}. */
private static SpecificRecord toTestSpecificRecord(String value) {
    SpecificRecord wrapped = new TestSpecificRecord(value);
    return wrapped;
}


/**
 * Hand-written {@code SpecificRecord} for tests, backed by {@code VALUE_SCHEMA}
 * (a record with one optional string field).
 *
 * <p>The field index passed to {@link #put(int, Object)} and {@link #get(int)} is
 * ignored because the record holds a single field.
 *
 * <p>Instances compare by their {@code value}: the tests build a fresh expected
 * record and look it up with {@code contains(...)} / {@code Map.of(...)}, which
 * cannot match under identity equality.
 */
public static class TestSpecificRecord implements SpecificRecord {

    private final Schema schema = VALUE_SCHEMA;

    private String value;

    public TestSpecificRecord(String value) {
        this.value = value;
    }

    /**
     * Stores {@code v} as the record's value.
     *
     * @param i field index, ignored
     * @param v new value; any {@link CharSequence} is accepted and converted to a
     *          {@code String}, since Avro may supply strings as a non-{@code String}
     *          {@code CharSequence}; {@code null} clears the value
     * @throws ClassCastException if {@code v} is neither {@code null} nor a {@code CharSequence}
     */
    @Override
    public void put(int i, Object v) {
        this.value = v instanceof CharSequence ? v.toString() : (String) v;
    }

    /**
     * @param i field index, ignored
     * @return the current value, possibly {@code null}
     */
    @Override
    public Object get(int i) {
        return value;
    }

    @Override
    public Schema getSchema() {
        return schema;
    }

    @Override
    public boolean equals(Object o) {
        if (this == o) {
            return true;
        }
        if (o == null || getClass() != o.getClass()) {
            return false;
        }
        TestSpecificRecord that = (TestSpecificRecord) o;
        return java.util.Objects.equals(value, that.value);
    }

    @Override
    public int hashCode() {
        return java.util.Objects.hashCode(value);
    }

    @Override
    public String toString() {
        return "TestSpecificRecord{value=" + value + '}';
    }
}


/**
 * Plain holder with a single public string field, used as the value type in the
 * reflection tests.
 *
 * <p>Instances compare by their {@code value}: the tests build a fresh expected
 * {@code TestRecord} and look it up with {@code contains(...)} / {@code Map.of(...)},
 * which cannot match under identity equality.
 */
public static class TestRecord {

    public String value;

    public TestRecord(String value) {
        this.value = value;
    }

    @Override
    public boolean equals(Object o) {
        if (this == o) {
            return true;
        }
        if (o == null || getClass() != o.getClass()) {
            return false;
        }
        TestRecord that = (TestRecord) o;
        return java.util.Objects.equals(value, that.value);
    }

    @Override
    public int hashCode() {
        return java.util.Objects.hashCode(value);
    }

    @Override
    public String toString() {
        return "TestRecord{value=" + value + '}';
    }
}
}

0 comments on commit 5c28b1c

Please sign in to comment.