diff --git a/.travis.yml b/.travis.yml index 93ddf47e5be0..770c8fcc7ab5 100644 --- a/.travis.yml +++ b/.travis.yml @@ -337,6 +337,14 @@ jobs: script: *run_integration_test after_failure: *integration_test_diags + - &integration_kafka_format_tests + name: "(Compile=openjdk8, Run=openjdk8) Kafka index integration test with various formats" + jdk: openjdk8 + services: *integration_test_services + env: TESTNG_GROUPS='-Dgroups=kafka-data-format' JVM_RUNTIME='-Djvm.runtime=8' + script: *run_integration_test + after_failure: *integration_test_diags + - &integration_query name: "(Compile=openjdk8, Run=openjdk8) query integration test" jdk: openjdk8 @@ -365,7 +373,7 @@ jobs: name: "(Compile=openjdk8, Run=openjdk8) other integration test" jdk: openjdk8 services: *integration_test_services - env: TESTNG_GROUPS='-DexcludedGroups=batch-index,perfect-rollup-parallel-batch-index,kafka-index,query,realtime-index,security,s3-deep-storage,gcs-deep-storage,azure-deep-storage,hdfs-deep-storage,s3-ingestion,kinesis-index,kafka-transactional-index,kafka-index-slow,kafka-transactional-index-slow' JVM_RUNTIME='-Djvm.runtime=8' + env: TESTNG_GROUPS='-DexcludedGroups=batch-index,perfect-rollup-parallel-batch-index,kafka-index,query,realtime-index,security,s3-deep-storage,gcs-deep-storage,azure-deep-storage,hdfs-deep-storage,s3-ingestion,kinesis-index,kinesis-data-format,kafka-transactional-index,kafka-index-slow,kafka-transactional-index-slow,kafka-data-format' JVM_RUNTIME='-Djvm.runtime=8' script: *run_integration_test after_failure: *integration_test_diags # END - Integration tests for Compile with Java 8 and Run with Java 8 @@ -399,7 +407,7 @@ jobs: - <<: *integration_tests name: "(Compile=openjdk8, Run=openjdk11) other integration test" jdk: openjdk8 - env: TESTNG_GROUPS='-DexcludedGroups=batch-index,perfect-rollup-parallel-batch-index,kafka-index,query,realtime-index,security,s3-deep-storage,gcs-deep-storage,azure-deep-storage,hdfs-deep-storage,s3-ingestion,kinesis-index,kafka-transactional-index,kafka-index-slow,kafka-transactional-index-slow' JVM_RUNTIME='-Djvm.runtime=11' + env: TESTNG_GROUPS='-DexcludedGroups=batch-index,perfect-rollup-parallel-batch-index,kafka-index,query,realtime-index,security,s3-deep-storage,gcs-deep-storage,azure-deep-storage,hdfs-deep-storage,s3-ingestion,kinesis-index,kinesis-data-format,kafka-transactional-index,kafka-index-slow,kafka-transactional-index-slow,kafka-data-format' JVM_RUNTIME='-Djvm.runtime=11' # END - Integration tests for Compile with Java 8 and Run with Java 11 - name: "security vulnerabilities" diff --git a/integration-tests/README.md b/integration-tests/README.md index f390e8586806..897c8890a38d 100644 --- a/integration-tests/README.md +++ b/integration-tests/README.md @@ -319,7 +319,10 @@ Refer ITIndexerTest as an example on how to use dependency Injection By default, test methods in a test class will be run in sequential order one at a time. Test methods for a given test class can be set to run in parallel (multiple test methods of each class running at the same time) by excluding the given class/package from the "AllSerializedTests" test tag section and including it in the "AllParallelizedTests" -test tag section in integration-tests/src/test/resources/testng.xml +test tag section in integration-tests/src/test/resources/testng.xml. TestNG uses two parameters, i.e., +`thread-count` and `data-provider-thread-count`, for parallel test execution, which are set to 2 for Druid integration tests. +You may want to modify those values for faster execution. +See https://testng.org/doc/documentation-main.html#parallel-running and https://testng.org/doc/documentation-main.html#parameters-dataproviders for details. Please be mindful when adding tests to the "AllParallelizedTests" test tag that the tests can run in parallel with other tests from the same class at the same time. i.e. test does not modify/restart/stop the druid cluster or other dependency containers, test does not use excessive memory starving other concurent task, test does not modify and/or use other task, diff --git a/integration-tests/pom.xml b/integration-tests/pom.xml index 9c4c429f7384..a4f32604c671 100644 --- a/integration-tests/pom.xml +++ b/integration-tests/pom.xml @@ -37,6 +37,14 @@ amazon-kinesis-producer 0.13.1 + + org.apache.avro + avro + + + com.opencsv + opencsv + com.amazonaws aws-java-sdk-kinesis @@ -81,6 +89,12 @@ ${project.parent.version} runtime + + org.apache.druid.extensions + druid-avro-extensions + ${project.parent.version} + runtime + org.apache.druid.extensions druid-s3-extensions diff --git a/integration-tests/src/main/java/org/apache/druid/testing/clients/OverlordResourceTestClient.java b/integration-tests/src/main/java/org/apache/druid/testing/clients/OverlordResourceTestClient.java index 7995595bb2be..2b06ba5e06f7 100644 --- a/integration-tests/src/main/java/org/apache/druid/testing/clients/OverlordResourceTestClient.java +++ b/integration-tests/src/main/java/org/apache/druid/testing/clients/OverlordResourceTestClient.java @@ -253,7 +253,7 @@ public String submitSupervisor(String spec) ).get(); if (!response.getStatus().equals(HttpResponseStatus.OK)) { throw new ISE( - "Error while submitting supervisor to overlord, response [%s %s]", + "Error while submitting supervisor to overlord, response [%s: %s]", response.getStatus(), response.getContent() ); diff --git a/integration-tests/src/main/java/org/apache/druid/testing/utils/AvroEventSerializer.java b/integration-tests/src/main/java/org/apache/druid/testing/utils/AvroEventSerializer.java new file mode 100644 index 000000000000..284fd098fc0e --- /dev/null +++ b/integration-tests/src/main/java/org/apache/druid/testing/utils/AvroEventSerializer.java @@ -0,0 +1,131 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.druid.testing.utils; + +import com.google.common.collect.BiMap; +import com.google.common.collect.HashBiMap; +import org.apache.avro.Schema; +import org.apache.avro.SchemaBuilder; +import org.apache.avro.generic.GenericDatumWriter; +import org.apache.avro.generic.GenericRecord; +import org.apache.avro.io.BinaryEncoder; +import org.apache.avro.io.DatumWriter; +import org.apache.avro.io.EncoderFactory; +import org.apache.druid.java.util.common.Pair; + +import java.io.ByteArrayOutputStream; +import java.io.IOException; +import java.util.HashMap; +import java.util.List; +import java.util.Map; + +public class AvroEventSerializer implements EventSerializer +{ + public static final String TYPE = "avro"; + + private static final Schema SCHEMA = SchemaBuilder + .record("wikipedia") + .namespace("org.apache.druid") + .fields() + .requiredString("timestamp") + .requiredString("page") + .requiredString("language") + .requiredString("user") + .requiredString("unpatrolled") + .requiredString("newPage") + .requiredString("robot") + .requiredString("anonymous") + .requiredString("namespace") + .requiredString("continent") + .requiredString("country") + .requiredString("region") + .requiredString("city") + .requiredInt("added") + .requiredInt("deleted") + .requiredInt("delta") + .endRecord(); + + private final DatumWriter writer = new GenericDatumWriter<>(SCHEMA); + + @Override + public byte[] serialize(List> event) throws IOException + { + final WikipediaRecord record = new WikipediaRecord(); + event.forEach(pair -> record.put(pair.lhs, pair.rhs)); + final ByteArrayOutputStream out = new ByteArrayOutputStream(); + final BinaryEncoder encoder = EncoderFactory.get().binaryEncoder(out, null); + writer.write(record, encoder); + encoder.flush(); + out.close(); + return out.toByteArray(); + } + + @Override + public void close() + { + } + + private static class WikipediaRecord implements GenericRecord + { + private final Map event = new HashMap<>(); + private final BiMap indexes = HashBiMap.create(SCHEMA.getFields().size()); + + private int nextIndex = 0; + + @Override + public void put(String key, Object v) + { + event.put(key, v); + indexes.inverse().computeIfAbsent(key, k -> nextIndex++); + } + + @Override + public Object get(String key) + { + return event.get(key); + } + + @Override + public void put(int i, Object v) + { + final String key = indexes.get(i); + if (key == null) { + throw new IndexOutOfBoundsException(); + } + put(key, v); + } + + @Override + public Object get(int i) + { + final String key = indexes.get(i); + if (key == null) { + throw new IndexOutOfBoundsException(); + } + return get(key); + } + + @Override + public Schema getSchema() + { + return SCHEMA; + } + } +} diff --git a/integration-tests/src/main/java/org/apache/druid/testing/utils/CsvEventSerializer.java b/integration-tests/src/main/java/org/apache/druid/testing/utils/CsvEventSerializer.java new file mode 100644 index 000000000000..77d54553b037 --- /dev/null +++ b/integration-tests/src/main/java/org/apache/druid/testing/utils/CsvEventSerializer.java @@ -0,0 +1,57 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.druid.testing.utils; + +import com.opencsv.CSVWriter; +import org.apache.druid.java.util.common.Pair; + +import java.io.BufferedWriter; +import java.io.ByteArrayOutputStream; +import java.io.IOException; +import java.io.OutputStreamWriter; +import java.nio.charset.StandardCharsets; +import java.util.List; + +public class CsvEventSerializer implements EventSerializer +{ + public static final String TYPE = "csv"; + + private final ByteArrayOutputStream bos = new ByteArrayOutputStream(); + private final CSVWriter writer = new CSVWriter( + new BufferedWriter(new OutputStreamWriter(bos, StandardCharsets.UTF_8)) + ); + + @Override + public byte[] serialize(List> event) throws IOException + { + //noinspection ConstantConditions + writer.writeNext(event.stream().map(pair -> pair.rhs.toString()).toArray(String[]::new)); + writer.flush(); + final byte[] serialized = bos.toByteArray(); + bos.reset(); + return serialized; + } + + @Override + public void close() throws IOException + { + writer.close(); + } +} diff --git a/integration-tests/src/main/java/org/apache/druid/testing/utils/StreamVerifierEventGenerator.java b/integration-tests/src/main/java/org/apache/druid/testing/utils/DelimitedEventSerializer.java similarity index 51% rename from integration-tests/src/main/java/org/apache/druid/testing/utils/StreamVerifierEventGenerator.java rename to integration-tests/src/main/java/org/apache/druid/testing/utils/DelimitedEventSerializer.java index bb56c794a152..d30e21d90768 100644 --- a/integration-tests/src/main/java/org/apache/druid/testing/utils/StreamVerifierEventGenerator.java +++ b/integration-tests/src/main/java/org/apache/druid/testing/utils/DelimitedEventSerializer.java @@ -19,37 +19,25 @@ package org.apache.druid.testing.utils; -import org.apache.druid.java.util.common.DateTimes; -import org.joda.time.DateTime; +import org.apache.druid.java.util.common.Pair; +import org.apache.druid.java.util.common.StringUtils; -import java.util.UUID; +import java.util.List; +import java.util.stream.Collectors; -public class StreamVerifierEventGenerator extends SyntheticStreamGenerator +public class DelimitedEventSerializer implements EventSerializer { - public StreamVerifierEventGenerator(int eventsPerSeconds, long cyclePaddingMs) - { - super(eventsPerSeconds, cyclePaddingMs); - } + public static final String TYPE = "tsv"; @Override - Object getEvent(int i, DateTime timestamp) + public byte[] serialize(List> event) { - return StreamVerifierSyntheticEvent.of( - UUID.randomUUID().toString(), - timestamp.getMillis(), - DateTimes.nowUtc().getMillis(), - i, - i == getEventsPerSecond() ? getSumOfEventSequence(getEventsPerSecond()) : null, - i == 1 - ); + //noinspection ConstantConditions + return StringUtils.toUtf8(event.stream().map(pair -> pair.rhs.toString()).collect(Collectors.joining("\t"))); } - - /** - * Assumes the first number in the sequence is 1, incrementing by 1, until numEvents. - */ - private long getSumOfEventSequence(int numEvents) + @Override + public void close() { - return (numEvents * (1 + numEvents)) / 2; } } diff --git a/integration-tests/src/main/java/org/apache/druid/testing/utils/EventSerializer.java b/integration-tests/src/main/java/org/apache/druid/testing/utils/EventSerializer.java new file mode 100644 index 000000000000..014d8c80e66c --- /dev/null +++ b/integration-tests/src/main/java/org/apache/druid/testing/utils/EventSerializer.java @@ -0,0 +1,50 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.druid.testing.utils; + +import com.fasterxml.jackson.annotation.JsonSubTypes; +import com.fasterxml.jackson.annotation.JsonSubTypes.Type; +import com.fasterxml.jackson.annotation.JsonTypeInfo; +import com.fasterxml.jackson.annotation.JsonTypeInfo.Id; +import org.apache.druid.java.util.common.Pair; + +import java.io.Closeable; +import java.io.IOException; +import java.util.List; + +/** + * EventSerializer is for serializing an event into a byte array. + * This interface is used to write generated events on stream processing systems such as Kafka or Kinesis + * in integration tests. + * + * @see SyntheticStreamGenerator + * @see StreamEventWriter + */ +@JsonTypeInfo(use = Id.NAME, property = "type") +@JsonSubTypes(value = { + @Type(name = JsonEventSerializer.TYPE, value = JsonEventSerializer.class), + @Type(name = CsvEventSerializer.TYPE, value = CsvEventSerializer.class), + @Type(name = DelimitedEventSerializer.TYPE, value = DelimitedEventSerializer.class), + @Type(name = AvroEventSerializer.TYPE, value = AvroEventSerializer.class) +}) +public interface EventSerializer extends Closeable +{ + byte[] serialize(List> event) throws IOException; +} diff --git a/integration-tests/src/main/java/org/apache/druid/testing/utils/JsonEventSerializer.java b/integration-tests/src/main/java/org/apache/druid/testing/utils/JsonEventSerializer.java new file mode 100644 index 000000000000..2d276602f1df --- /dev/null +++ b/integration-tests/src/main/java/org/apache/druid/testing/utils/JsonEventSerializer.java @@ -0,0 +1,57 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.druid.testing.utils; + +import com.fasterxml.jackson.annotation.JacksonInject; +import com.fasterxml.jackson.annotation.JsonCreator; +import com.fasterxml.jackson.core.JsonProcessingException; +import com.fasterxml.jackson.databind.ObjectMapper; +import com.google.common.collect.Maps; +import org.apache.druid.guice.annotations.Json; +import org.apache.druid.java.util.common.Pair; + +import java.util.List; +import java.util.Map; + +public class JsonEventSerializer implements EventSerializer +{ + public static final String TYPE = "json"; + + private final ObjectMapper jsonMapper; + + @JsonCreator + public JsonEventSerializer(@JacksonInject @Json ObjectMapper jsonMapper) + { + this.jsonMapper = jsonMapper; + } + + @Override + public byte[] serialize(List> event) throws JsonProcessingException + { + Map map = Maps.newHashMapWithExpectedSize(event.size()); + event.forEach(pair -> map.put(pair.lhs, pair.rhs)); + return jsonMapper.writeValueAsBytes(map); + } + + @Override + public void close() + { + } +} diff --git a/integration-tests/src/main/java/org/apache/druid/testing/utils/KafkaEventWriter.java b/integration-tests/src/main/java/org/apache/druid/testing/utils/KafkaEventWriter.java index f7ec75507b9a..14b57141dd56 100644 --- a/integration-tests/src/main/java/org/apache/druid/testing/utils/KafkaEventWriter.java +++ b/integration-tests/src/main/java/org/apache/druid/testing/utils/KafkaEventWriter.java @@ -31,12 +31,13 @@ import java.util.List; import java.util.Map; import java.util.Properties; +import java.util.concurrent.ExecutionException; import java.util.concurrent.Future; public class KafkaEventWriter implements StreamEventWriter { private static final String TEST_PROPERTY_PREFIX = "kafka.test.property."; - private final KafkaProducer producer; + private final KafkaProducer producer; private final boolean txnEnabled; private final List> pendingWriteRecords = new ArrayList<>(); @@ -57,7 +58,7 @@ public KafkaEventWriter(IntegrationTestingConfig config, boolean txnEnabled) this.producer = new KafkaProducer<>( properties, new StringSerializer(), - new StringSerializer() + new ByteArraySerializer() ); if (txnEnabled) { producer.initTransactions(); @@ -91,25 +92,42 @@ public void commitTransaction() } @Override - public void write(String topic, String event) + public void write(String topic, byte[] event) { Future future = producer.send(new ProducerRecord<>(topic, event)); pendingWriteRecords.add(future); } @Override - public void shutdown() + public void close() { + flush(); producer.close(); } @Override - public void flush() throws Exception + public void flush() { + Exception e = null; for (Future future : pendingWriteRecords) { - future.get(); + try { + future.get(); + } + catch (InterruptedException | ExecutionException ex) { + if (ex instanceof InterruptedException) { + Thread.currentThread().interrupt(); + } + if (e == null) { + e = ex; + } else { + e.addSuppressed(ex); + } + } } pendingWriteRecords.clear(); + if (e != null) { + throw new RuntimeException(e); + } } private void addFilteredProperties(IntegrationTestingConfig config, Properties properties) diff --git a/integration-tests/src/main/java/org/apache/druid/testing/utils/KinesisEventWriter.java b/integration-tests/src/main/java/org/apache/druid/testing/utils/KinesisEventWriter.java index 0377e9e42e0f..39b700ce093d 100644 --- a/integration-tests/src/main/java/org/apache/druid/testing/utils/KinesisEventWriter.java +++ b/integration-tests/src/main/java/org/apache/druid/testing/utils/KinesisEventWriter.java @@ -25,17 +25,13 @@ import com.amazonaws.services.kinesis.producer.KinesisProducerConfiguration; import com.amazonaws.util.AwsHostNameUtils; import org.apache.commons.codec.digest.DigestUtils; -import org.apache.druid.java.util.common.logger.Logger; import java.io.FileInputStream; import java.nio.ByteBuffer; -import java.nio.charset.StandardCharsets; import java.util.Properties; public class KinesisEventWriter implements StreamEventWriter { - private static final Logger LOG = new Logger(KinesisEventWriter.class); - private final KinesisProducer kinesisProducer; public KinesisEventWriter(String endpoint, boolean aggregate) throws Exception @@ -82,20 +78,19 @@ public void commitTransaction() } @Override - public void write(String streamName, String event) + public void write(String streamName, byte[] event) { kinesisProducer.addUserRecord( streamName, DigestUtils.sha1Hex(event), - ByteBuffer.wrap(event.getBytes(StandardCharsets.UTF_8)) + ByteBuffer.wrap(event) ); } @Override - public void shutdown() + public void close() { - LOG.info("Shutting down Kinesis client"); - kinesisProducer.flushSync(); + flush(); } @Override diff --git a/integration-tests/src/main/java/org/apache/druid/testing/utils/LoggerListener.java b/integration-tests/src/main/java/org/apache/druid/testing/utils/LoggerListener.java index 7282396b933a..e74dc2f2cf06 100644 --- a/integration-tests/src/main/java/org/apache/druid/testing/utils/LoggerListener.java +++ b/integration-tests/src/main/java/org/apache/druid/testing/utils/LoggerListener.java @@ -19,10 +19,13 @@ package org.apache.druid.testing.utils; +import org.apache.druid.java.util.common.StringUtils; import org.apache.druid.java.util.common.logger.Logger; import org.testng.ITestResult; import org.testng.TestListenerAdapter; +import java.util.Arrays; + public class LoggerListener extends TestListenerAdapter { private static final Logger LOG = new Logger(LoggerListener.class); @@ -30,25 +33,38 @@ public class LoggerListener extends TestListenerAdapter @Override public void onTestFailure(ITestResult tr) { - LOG.info("[%s] -- Test method failed", tr.getName()); + LOG.error(tr.getThrowable(), "Failed %s", formatTestName(tr)); } @Override public void onTestSkipped(ITestResult tr) { - LOG.info("[%s] -- Test method skipped", tr.getName()); + LOG.warn("Skipped %s", formatTestName(tr)); } @Override public void onTestSuccess(ITestResult tr) { - LOG.info("[%s] -- Test method passed", tr.getName()); + LOG.info("Passed %s", formatTestName(tr)); } @Override public void onTestStart(ITestResult tr) { - LOG.info("[%s] -- TEST START", tr.getName()); + LOG.info("Starting %s", formatTestName(tr)); } + private static String formatTestName(ITestResult tr) + { + if (tr.getParameters().length == 0) { + return StringUtils.format("[%s.%s]", tr.getTestClass().getName(), tr.getName()); + } else { + return StringUtils.format( + "[%s.%s] with parameters %s", + tr.getTestClass().getName(), + tr.getName(), + Arrays.toString(tr.getParameters()) + ); + } + } } diff --git a/integration-tests/src/main/java/org/apache/druid/testing/utils/StreamEventWriter.java b/integration-tests/src/main/java/org/apache/druid/testing/utils/StreamEventWriter.java index 5d25916b6f62..747cbd8c2ac9 100644 --- a/integration-tests/src/main/java/org/apache/druid/testing/utils/StreamEventWriter.java +++ b/integration-tests/src/main/java/org/apache/druid/testing/utils/StreamEventWriter.java @@ -20,21 +20,32 @@ package org.apache.druid.testing.utils; +import java.io.Closeable; + /** * This interface is use to write test event data to the underlying stream (such as Kafka, Kinesis) * This can also be use with {@link StreamGenerator} to write particular set of test data */ -public interface StreamEventWriter +public interface StreamEventWriter extends Closeable { - void write(String topic, String event); - - void shutdown(); - - void flush() throws Exception; - boolean isTransactionEnabled(); void initTransaction(); void commitTransaction(); + + void write(String topic, byte[] event); + + /** + * Flush pending writes on the underlying stream. This method is synchronous and waits until the flush completes. + * Note that this method is not interruptible + */ + void flush(); + + /** + * Close this writer. Any resource should be cleaned up when this method is called. + * Implementations must call {@link #flush()} before closing the writer. + */ + @Override + void close(); } diff --git a/integration-tests/src/main/java/org/apache/druid/testing/utils/StreamGenerator.java b/integration-tests/src/main/java/org/apache/druid/testing/utils/StreamGenerator.java index f2d1f489d886..015cc91a0405 100644 --- a/integration-tests/src/main/java/org/apache/druid/testing/utils/StreamGenerator.java +++ b/integration-tests/src/main/java/org/apache/druid/testing/utils/StreamGenerator.java @@ -26,6 +26,4 @@ public interface StreamGenerator void run(String streamTopic, StreamEventWriter streamEventWriter, int totalNumberOfSeconds); void run(String streamTopic, StreamEventWriter streamEventWriter, int totalNumberOfSeconds, DateTime overrrideFirstEventTime); - - void shutdown(); } diff --git a/integration-tests/src/main/java/org/apache/druid/testing/utils/StreamVerifierSyntheticEvent.java b/integration-tests/src/main/java/org/apache/druid/testing/utils/StreamVerifierSyntheticEvent.java deleted file mode 100644 index e8c314a6b4bd..000000000000 --- a/integration-tests/src/main/java/org/apache/druid/testing/utils/StreamVerifierSyntheticEvent.java +++ /dev/null @@ -1,104 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -package org.apache.druid.testing.utils; - -import com.fasterxml.jackson.annotation.JsonProperty; - -public class StreamVerifierSyntheticEvent -{ - private String id; - private long groupingTimestamp; - private long insertionTimestamp; - private long sequenceNumber; - private Long expectedSequenceNumberSum; - private boolean firstEvent; - - public StreamVerifierSyntheticEvent( - String id, - long groupingTimestamp, - long insertionTimestamp, - long sequenceNumber, - Long expectedSequenceNumberSum, - boolean firstEvent - ) - { - this.id = id; - this.groupingTimestamp = groupingTimestamp; - this.insertionTimestamp = insertionTimestamp; - this.sequenceNumber = sequenceNumber; - this.expectedSequenceNumberSum = expectedSequenceNumberSum; - this.firstEvent = firstEvent; - } - - @JsonProperty - public String getId() - { - return id; - } - - @JsonProperty - public long getGroupingTimestamp() - { - return groupingTimestamp; - } - - @JsonProperty - public long getInsertionTimestamp() - { - return insertionTimestamp; - } - - @JsonProperty - public long getSequenceNumber() - { - return sequenceNumber; - } - - @JsonProperty - public Long getExpectedSequenceNumberSum() - { - return expectedSequenceNumberSum; - } - - @JsonProperty - public Integer getFirstEventFlag() - { - return firstEvent ? 1 : null; - } - - public static StreamVerifierSyntheticEvent of( - String id, - long groupingTimestamp, - long insertionTimestamp, - long sequenceNumber, - Long expectedSequenceNumberSum, - boolean firstEvent - ) - { - return new StreamVerifierSyntheticEvent( - id, - groupingTimestamp, - insertionTimestamp, - sequenceNumber, - expectedSequenceNumberSum, - firstEvent - ); - } -} diff --git a/integration-tests/src/main/java/org/apache/druid/testing/utils/SyntheticStreamGenerator.java b/integration-tests/src/main/java/org/apache/druid/testing/utils/SyntheticStreamGenerator.java index f2bfde857c61..c68db10fb043 100644 --- a/integration-tests/src/main/java/org/apache/druid/testing/utils/SyntheticStreamGenerator.java +++ b/integration-tests/src/main/java/org/apache/druid/testing/utils/SyntheticStreamGenerator.java @@ -19,32 +19,18 @@ package org.apache.druid.testing.utils; -import com.fasterxml.jackson.annotation.JsonInclude; -import com.fasterxml.jackson.databind.InjectableValues; -import com.fasterxml.jackson.databind.ObjectMapper; -import org.apache.druid.jackson.DefaultObjectMapper; import org.apache.druid.java.util.common.DateTimes; +import org.apache.druid.java.util.common.Pair; import org.apache.druid.java.util.common.logger.Logger; import org.joda.time.DateTime; +import java.util.List; + public abstract class SyntheticStreamGenerator implements StreamGenerator { - private static final Logger log = new Logger(SyntheticStreamGenerator.class); - static final ObjectMapper MAPPER = new DefaultObjectMapper(); - - static { - MAPPER.setInjectableValues( - new InjectableValues.Std() - .addValue(ObjectMapper.class.getName(), MAPPER) - ); - MAPPER.setSerializationInclusion(JsonInclude.Include.NON_NULL); - } - - public int getEventsPerSecond() - { - return eventsPerSecond; - } + private static final Logger LOG = new Logger(SyntheticStreamGenerator.class); + private final EventSerializer serializer; private final int eventsPerSecond; // When calculating rates, leave this buffer to minimize overruns where we're still writing messages from the previous @@ -52,13 +38,14 @@ public int getEventsPerSecond() // second to begin. private final long cyclePaddingMs; - public SyntheticStreamGenerator(int eventsPerSecond, long cyclePaddingMs) + public SyntheticStreamGenerator(EventSerializer serializer, int eventsPerSecond, long cyclePaddingMs) { + this.serializer = serializer; this.eventsPerSecond = eventsPerSecond; this.cyclePaddingMs = cyclePaddingMs; } - abstract Object getEvent(int row, DateTime timestamp); + abstract List> newEvent(int row, DateTime timestamp); @Override public void run(String streamTopic, StreamEventWriter streamEventWriter, int totalNumberOfSeconds) @@ -83,12 +70,12 @@ public void run(String streamTopic, StreamEventWriter streamEventWriter, int tot try { long sleepMillis = nowCeilingToSecond.getMillis() - DateTimes.nowUtc().getMillis(); if (sleepMillis > 0) { - log.info("Waiting %s ms for next run cycle (at %s)", sleepMillis, nowCeilingToSecond); + LOG.info("Waiting %s ms for next run cycle (at %s)", sleepMillis, nowCeilingToSecond); Thread.sleep(sleepMillis); continue; } - log.info( + LOG.info( "Beginning run cycle with %s events, target completion time: %s", eventsPerSecond, nowCeilingToSecond.plusSeconds(1).minus(cyclePaddingMs) @@ -99,11 +86,11 @@ public void run(String streamTopic, StreamEventWriter streamEventWriter, int tot } for (int i = 1; i <= eventsPerSecond; i++) { - streamEventWriter.write(streamTopic, MAPPER.writeValueAsString(getEvent(i, eventTimestamp))); + streamEventWriter.write(streamTopic, serializer.serialize(newEvent(i, eventTimestamp))); long sleepTime = calculateSleepTimeMs(eventsPerSecond - i, nowCeilingToSecond); if ((i <= 100 && i % 10 == 0) || i % 100 == 0) { - log.info("Event: %s/%s, sleep time: %s ms", i, eventsPerSecond, sleepTime); + LOG.info("Event: %s/%s, sleep time: %s ms", i, eventsPerSecond, sleepTime); } if (sleepTime > 0) { @@ -119,7 +106,7 @@ public void run(String streamTopic, StreamEventWriter streamEventWriter, int tot eventTimestamp = eventTimestamp.plusSeconds(1); seconds++; - log.info( + LOG.info( "Finished writing %s events, current time: %s - updating next timestamp to: %s", eventsPerSecond, DateTimes.nowUtc(), @@ -128,7 +115,7 @@ public void run(String streamTopic, StreamEventWriter streamEventWriter, int tot if (seconds >= totalNumberOfSeconds) { streamEventWriter.flush(); - log.info( + LOG.info( "Finished writing %s seconds", seconds ); @@ -141,11 +128,6 @@ public void run(String streamTopic, StreamEventWriter streamEventWriter, int tot } } - @Override - public void shutdown() - { - } - /** * Dynamically adjust delay between messages to spread them out over the remaining time left in the second. */ diff --git a/integration-tests/src/main/java/org/apache/druid/testing/utils/WikipediaStreamEventStreamGenerator.java b/integration-tests/src/main/java/org/apache/druid/testing/utils/WikipediaStreamEventStreamGenerator.java index 4fea67d7be57..075e77bd6821 100644 --- a/integration-tests/src/main/java/org/apache/druid/testing/utils/WikipediaStreamEventStreamGenerator.java +++ b/integration-tests/src/main/java/org/apache/druid/testing/utils/WikipediaStreamEventStreamGenerator.java @@ -19,42 +19,44 @@ package org.apache.druid.testing.utils; +import org.apache.druid.java.util.common.Pair; import org.joda.time.DateTime; import org.joda.time.format.DateTimeFormat; import org.joda.time.format.DateTimeFormatter; -import java.util.HashMap; -import java.util.Map; +import java.util.ArrayList; +import java.util.Collections; +import java.util.List; public class WikipediaStreamEventStreamGenerator extends SyntheticStreamGenerator { private static final DateTimeFormatter DATE_TIME_FORMATTER = DateTimeFormat.forPattern("yyyy-MM-dd'T'HH:mm:ss'Z'"); - public WikipediaStreamEventStreamGenerator(int eventsPerSeconds, long cyclePaddingMs) + public WikipediaStreamEventStreamGenerator(EventSerializer serializer, int eventsPerSeconds, long cyclePaddingMs) { - super(eventsPerSeconds, cyclePaddingMs); + super(serializer, eventsPerSeconds, cyclePaddingMs); } @Override - Object getEvent(int i, DateTime timestamp) + List> newEvent(int i, DateTime timestamp) { - Map event = new HashMap<>(); - event.put("page", "Gypsy Danger"); - event.put("language", "en"); - event.put("user", "nuclear"); - event.put("unpatrolled", "true"); - event.put("newPage", "true"); - event.put("robot", "false"); - event.put("anonymous", "false"); - event.put("namespace", "article"); - event.put("continent", "North America"); - event.put("country", "United States"); - event.put("region", "Bay Area"); - event.put("city", "San Francisco"); - event.put("timestamp", DATE_TIME_FORMATTER.print(timestamp)); - event.put("added", i); - event.put("deleted", 0); - event.put("delta", i); - return event; + List> event = new ArrayList<>(); + event.add(Pair.of("timestamp", DATE_TIME_FORMATTER.print(timestamp))); + event.add(Pair.of("page", "Gypsy Danger")); + event.add(Pair.of("language", "en")); + event.add(Pair.of("user", "nuclear")); + event.add(Pair.of("unpatrolled", "true")); + event.add(Pair.of("newPage", "true")); + event.add(Pair.of("robot", "false")); + event.add(Pair.of("anonymous", "false")); + event.add(Pair.of("namespace", "article")); + event.add(Pair.of("continent", "North America")); + event.add(Pair.of("country", "United States")); + event.add(Pair.of("region", "Bay Area")); + event.add(Pair.of("city", "San Francisco")); + event.add(Pair.of("added", i)); + event.add(Pair.of("deleted", 0)); + event.add(Pair.of("delta", i)); + return Collections.unmodifiableList(event); } } diff --git a/integration-tests/src/test/java/org/apache/druid/tests/TestNGGroup.java b/integration-tests/src/test/java/org/apache/druid/tests/TestNGGroup.java index 9d8b7fe0b4ea..c4c68644fa88 100644 --- a/integration-tests/src/test/java/org/apache/druid/tests/TestNGGroup.java +++ b/integration-tests/src/test/java/org/apache/druid/tests/TestNGGroup.java @@ -37,6 +37,8 @@ public class TestNGGroup public static final String TRANSACTIONAL_KAFKA_INDEX_SLOW = "kafka-transactional-index-slow"; + public static final String KAFKA_DATA_FORMAT = "kafka-data-format"; + public static final String OTHER_INDEX = "other-index"; public static final String PERFECT_ROLLUP_PARALLEL_BATCH_INDEX = "perfect-rollup-parallel-batch-index"; @@ -110,4 +112,12 @@ public class TestNGGroup * Kinesis stream endpoint for a region must also be pass to mvn with -Ddruid.test.config.streamEndpoint= */ public static final String KINESIS_INDEX = "kinesis-index"; + + /** + * This group is not part of CI. To run this group, AWS kinesis configs/credentials for your AWS kinesis must be + * provided in a file. The path of the file must then be pass to mvn with -Doverride.config.path= + * See integration-tests/docker/environment-configs/override-examples/kinesis for env vars to provide. + * Kinesis stream endpoint for a region must also be pass to mvn with -Ddruid.test.config.streamEndpoint= + */ + public static final String KINESIS_DATA_FORMAT = "kinesis-data-format"; } diff --git a/integration-tests/src/test/java/org/apache/druid/tests/indexer/AbstractIndexerTest.java b/integration-tests/src/test/java/org/apache/druid/tests/indexer/AbstractIndexerTest.java index 03df90eb99d6..1bbced460320 100644 --- a/integration-tests/src/test/java/org/apache/druid/tests/indexer/AbstractIndexerTest.java +++ b/integration-tests/src/test/java/org/apache/druid/tests/indexer/AbstractIndexerTest.java @@ -25,6 +25,7 @@ import org.apache.druid.guice.annotations.Json; import org.apache.druid.guice.annotations.Smile; import org.apache.druid.java.util.common.Intervals; +import org.apache.druid.java.util.common.StringUtils; import org.apache.druid.testing.IntegrationTestingConfig; import org.apache.druid.testing.clients.CoordinatorResourceTestClient; import org.apache.druid.testing.clients.OverlordResourceTestClient; @@ -32,17 +33,19 @@ import org.apache.druid.testing.utils.TestQueryHelper; import org.joda.time.Interval; +import java.io.BufferedReader; import java.io.Closeable; import java.io.IOException; import java.io.InputStream; +import java.io.InputStreamReader; import java.nio.charset.StandardCharsets; +import java.util.ArrayList; import java.util.Collections; import java.util.List; import java.util.concurrent.Callable; public abstract class AbstractIndexerTest { - @Inject protected CoordinatorResourceTestClient coordinator; @Inject @@ -113,15 +116,33 @@ protected void waitForAllTasksToComplete() ); } - protected String getResourceAsString(String file) throws IOException + public static String getResourceAsString(String file) throws IOException { - final InputStream inputStream = ITRealtimeIndexTaskTest.class.getResourceAsStream(file); - try { + try (final InputStream inputStream = getResourceAsStream(file)) { return IOUtils.toString(inputStream, StandardCharsets.UTF_8); } - finally { - IOUtils.closeQuietly(inputStream); - } } + public static InputStream getResourceAsStream(String resource) + { + return ITRealtimeIndexTaskTest.class.getResourceAsStream(resource); + } + + public static List listResources(String dir) throws IOException + { + List resources = new ArrayList<>(); + + try ( + InputStream in = getResourceAsStream(dir); + BufferedReader br = new BufferedReader(new InputStreamReader(in, StringUtils.UTF8_STRING)) + ) { + String resource; + + while ((resource = br.readLine()) != null) { + resources.add(resource); + } + } + + return resources; + } } diff --git a/integration-tests/src/test/java/org/apache/druid/tests/indexer/AbstractKafkaIndexingServiceTest.java b/integration-tests/src/test/java/org/apache/druid/tests/indexer/AbstractKafkaIndexingServiceTest.java index ce769bfb59ba..3bb9693dcbdf 100644 --- a/integration-tests/src/test/java/org/apache/druid/tests/indexer/AbstractKafkaIndexingServiceTest.java +++ b/integration-tests/src/test/java/org/apache/druid/tests/indexer/AbstractKafkaIndexingServiceTest.java @@ -33,8 +33,6 @@ public abstract class AbstractKafkaIndexingServiceTest extends AbstractStreamIndexingTest { - protected abstract boolean isKafkaWriterTransactionalEnabled(); - @Override StreamAdminClient createStreamAdminClient(IntegrationTestingConfig config) { @@ -42,15 +40,19 @@ StreamAdminClient createStreamAdminClient(IntegrationTestingConfig config) } @Override - public StreamEventWriter createStreamEventWriter(IntegrationTestingConfig config) + public StreamEventWriter createStreamEventWriter(IntegrationTestingConfig config, boolean transactionEnabled) { - return new KafkaEventWriter(config, isKafkaWriterTransactionalEnabled()); + return new KafkaEventWriter(config, transactionEnabled); } @Override - Function generateStreamIngestionPropsTransform(String streamName, - String fullDatasourceName, - IntegrationTestingConfig config) + Function generateStreamIngestionPropsTransform( + String streamName, + String fullDatasourceName, + String parserType, + String parserOrInputFormat, + IntegrationTestingConfig config + ) { final Map consumerConfigs = KafkaConsumerConfigs.getConsumerProperties(); final Properties consumerProperties = new Properties(); @@ -78,6 +80,29 @@ Function generateStreamIngestionPropsTransform(String streamName "%%TOPIC_VALUE%%", streamName ); + if (AbstractStreamIndexingTest.INPUT_FORMAT.equals(parserType)) { + spec = StringUtils.replace( + spec, + "%%INPUT_FORMAT%%", + parserOrInputFormat + ); + spec = StringUtils.replace( + spec, + "%%PARSER%%", + "null" + ); + } else if (AbstractStreamIndexingTest.INPUT_ROW_PARSER.equals(parserType)) { + spec = StringUtils.replace( + spec, + "%%PARSER%%", + parserOrInputFormat + ); + spec = StringUtils.replace( + spec, + "%%INPUT_FORMAT%%", + "null" + ); + } spec = StringUtils.replace( spec, "%%USE_EARLIEST_KEY%%", diff --git a/integration-tests/src/test/java/org/apache/druid/tests/indexer/AbstractKinesisIndexingServiceTest.java b/integration-tests/src/test/java/org/apache/druid/tests/indexer/AbstractKinesisIndexingServiceTest.java index 14c9cac8ac2a..b8095a310b44 100644 --- a/integration-tests/src/test/java/org/apache/druid/tests/indexer/AbstractKinesisIndexingServiceTest.java +++ b/integration-tests/src/test/java/org/apache/druid/tests/indexer/AbstractKinesisIndexingServiceTest.java @@ -37,15 +37,20 @@ StreamAdminClient createStreamAdminClient(IntegrationTestingConfig config) throw } @Override - StreamEventWriter createStreamEventWriter(IntegrationTestingConfig config) throws Exception + StreamEventWriter createStreamEventWriter(IntegrationTestingConfig config, boolean transactionEnabled) + throws Exception { return new KinesisEventWriter(config.getStreamEndpoint(), false); } @Override - Function generateStreamIngestionPropsTransform(String streamName, - String fullDatasourceName, - IntegrationTestingConfig config) + Function generateStreamIngestionPropsTransform( + String streamName, + String fullDatasourceName, + String parserType, + String parserOrInputFormat, + IntegrationTestingConfig config + ) { return spec -> { try { @@ -69,6 +74,29 @@ Function generateStreamIngestionPropsTransform(String streamName "%%TOPIC_VALUE%%", streamName ); + if (AbstractStreamIndexingTest.INPUT_FORMAT.equals(parserType)) { + spec = StringUtils.replace( + spec, + "%%INPUT_FORMAT%%", + parserOrInputFormat + ); + spec = StringUtils.replace( + spec, + "%%PARSER%%", + "null" + ); + } else if (AbstractStreamIndexingTest.INPUT_ROW_PARSER.equals(parserType)) { + spec = StringUtils.replace( + spec, + "%%PARSER%%", + parserOrInputFormat + ); + spec = StringUtils.replace( + spec, + "%%INPUT_FORMAT%%", + "null" + ); + } spec = StringUtils.replace( spec, "%%USE_EARLIEST_KEY%%", diff --git a/integration-tests/src/test/java/org/apache/druid/tests/indexer/AbstractStreamIndexingTest.java b/integration-tests/src/test/java/org/apache/druid/tests/indexer/AbstractStreamIndexingTest.java index 2f0c65afaebc..506e80fe1ec2 100644 --- a/integration-tests/src/test/java/org/apache/druid/tests/indexer/AbstractStreamIndexingTest.java +++ b/integration-tests/src/test/java/org/apache/druid/tests/indexer/AbstractStreamIndexingTest.java @@ -20,24 +20,34 @@ package org.apache.druid.tests.indexer; import com.google.common.collect.ImmutableMap; +import com.google.common.collect.Iterables; import com.google.inject.Inject; import org.apache.druid.indexing.overlord.supervisor.SupervisorStateManager; import org.apache.druid.java.util.common.DateTimes; +import org.apache.druid.java.util.common.IAE; +import org.apache.druid.java.util.common.io.Closer; import org.apache.druid.java.util.common.logger.Logger; import org.apache.druid.testing.IntegrationTestingConfig; import org.apache.druid.testing.utils.DruidClusterAdminClient; +import org.apache.druid.testing.utils.EventSerializer; import org.apache.druid.testing.utils.ITRetryUtil; +import org.apache.druid.testing.utils.JsonEventSerializer; import org.apache.druid.testing.utils.StreamAdminClient; import org.apache.druid.testing.utils.StreamEventWriter; +import org.apache.druid.testing.utils.StreamGenerator; import org.apache.druid.testing.utils.WikipediaStreamEventStreamGenerator; import org.joda.time.DateTime; import org.joda.time.format.DateTimeFormat; import org.joda.time.format.DateTimeFormatter; import java.io.Closeable; +import java.io.IOException; +import java.util.HashMap; +import java.util.List; import java.util.Map; import java.util.UUID; import java.util.function.Function; +import java.util.stream.Collectors; public abstract class AbstractStreamIndexingTest extends AbstractIndexerTest { @@ -48,17 +58,32 @@ public abstract class AbstractStreamIndexingTest extends AbstractIndexerTest static final DateTimeFormatter TIMESTAMP_FMT = DateTimeFormat.forPattern("yyyy-MM-dd'T'HH:mm:ss'.000Z'"); static final int EVENTS_PER_SECOND = 6; static final int TOTAL_NUMBER_OF_SECOND = 10; + + private static final Logger LOG = new Logger(AbstractStreamIndexingTest.class); // Since this integration test can terminates or be killed un-expectedly, this tag is added to all streams created // to help make stream clean up easier. (Normally, streams should be cleanup automattically by the teardown method) // The value to this tag is a timestamp that can be used by a lambda function to remove unused stream. private static final String STREAM_EXPIRE_TAG = "druid-ci-expire-after"; private static final int STREAM_SHARD_COUNT = 2; private static final long WAIT_TIME_MILLIS = 3 * 60 * 1000L; - private static final String INDEXER_FILE_LEGACY_PARSER = "/indexer/stream_supervisor_spec_legacy_parser.json"; - private static final String INDEXER_FILE_INPUT_FORMAT = "/indexer/stream_supervisor_spec_input_format.json"; - private static final String QUERIES_FILE = "/indexer/stream_index_queries.json"; private static final long CYCLE_PADDING_MS = 100; - private static final Logger LOG = new Logger(AbstractStreamIndexingTest.class); + + private static final String QUERIES_FILE = "/stream/queries/stream_index_queries.json"; + private static final String SUPERVISOR_SPEC_TEMPLATE_FILE = "supervisor_spec_template.json"; + + protected static final String DATA_RESOURCE_ROOT = "/stream/data"; + protected static final String SUPERVISOR_SPEC_TEMPLATE_PATH = + String.join("/", DATA_RESOURCE_ROOT, SUPERVISOR_SPEC_TEMPLATE_FILE); + protected static final String SERIALIZER_SPEC_DIR = "serializer"; + protected static final String INPUT_FORMAT_SPEC_DIR = "input_format"; + protected static final String INPUT_ROW_PARSER_SPEC_DIR = "parser"; + + protected static final String SERIALIZER = "serializer"; + protected static final String INPUT_FORMAT = "inputFormat"; + protected static final String INPUT_ROW_PARSER = "parser"; + + private static final String JSON_INPUT_FORMAT_PATH = + String.join("/", DATA_RESOURCE_ROOT, "json", INPUT_FORMAT_SPEC_DIR, "input_format.json"); @Inject private DruidClusterAdminClient druidClusterAdminClient; @@ -67,92 +92,147 @@ public abstract class AbstractStreamIndexingTest extends AbstractIndexerTest private IntegrationTestingConfig config; private StreamAdminClient streamAdminClient; - private WikipediaStreamEventStreamGenerator wikipediaStreamEventGenerator; abstract StreamAdminClient createStreamAdminClient(IntegrationTestingConfig config) throws Exception; - abstract StreamEventWriter createStreamEventWriter(IntegrationTestingConfig config) throws Exception; - abstract Function generateStreamIngestionPropsTransform(String streamName, - String fullDatasourceName, - IntegrationTestingConfig config); + + abstract StreamEventWriter createStreamEventWriter(IntegrationTestingConfig config, boolean transactionEnabled) + throws Exception; + + abstract Function generateStreamIngestionPropsTransform( + String streamName, + String fullDatasourceName, + String parserType, + String parserOrInputFormat, + IntegrationTestingConfig config + ); + abstract Function generateStreamQueryPropsTransform(String streamName, String fullDatasourceName); + public abstract String getTestNamePrefix(); protected void doBeforeClass() throws Exception { streamAdminClient = createStreamAdminClient(config); - wikipediaStreamEventGenerator = new WikipediaStreamEventStreamGenerator(EVENTS_PER_SECOND, CYCLE_PADDING_MS); } - protected void doClassTeardown() + private static String getOnlyResourcePath(String resourceRoot) throws IOException { - wikipediaStreamEventGenerator.shutdown(); + return String.join("/", resourceRoot, Iterables.getOnlyElement(listResources(resourceRoot))); } - protected void doTestIndexDataWithLegacyParserStableState() throws Exception + protected static List listDataFormatResources() throws IOException { - StreamEventWriter streamEventWriter = createStreamEventWriter(config); - final GeneratedTestConfig generatedTestConfig = new GeneratedTestConfig(); - try ( - final Closeable ignored1 = unloader(generatedTestConfig.getFullDatasourceName()) - ) { - final String taskSpec = generatedTestConfig.getStreamIngestionPropsTransform().apply(getResourceAsString(INDEXER_FILE_LEGACY_PARSER)); - LOG.info("supervisorSpec: [%s]\n", taskSpec); - // Start supervisor - generatedTestConfig.setSupervisorId(indexer.submitSupervisor(taskSpec)); - LOG.info("Submitted supervisor"); - // Start data generator - wikipediaStreamEventGenerator.run(generatedTestConfig.getStreamName(), streamEventWriter, TOTAL_NUMBER_OF_SECOND, FIRST_EVENT_TIME); - verifyIngestedData(generatedTestConfig); + return listResources(DATA_RESOURCE_ROOT) + .stream() + .filter(resource -> !SUPERVISOR_SPEC_TEMPLATE_FILE.equals(resource)) + .collect(Collectors.toList()); + } + + /** + * Returns a map of key to path to spec. The returned map contains at least 2 specs and one of them + * should be a {@link #SERIALIZER} spec. + */ + protected static Map findTestSpecs(String resourceRoot) throws IOException + { + final List specDirs = listResources(resourceRoot); + final Map map = new HashMap<>(); + for (String eachSpec : specDirs) { + if (SERIALIZER_SPEC_DIR.equals(eachSpec)) { + map.put(SERIALIZER, getOnlyResourcePath(String.join("/", resourceRoot, SERIALIZER_SPEC_DIR))); + } else if (INPUT_ROW_PARSER_SPEC_DIR.equals(eachSpec)) { + map.put(INPUT_ROW_PARSER, getOnlyResourcePath(String.join("/", resourceRoot, INPUT_ROW_PARSER_SPEC_DIR))); + } else if (INPUT_FORMAT_SPEC_DIR.equals(eachSpec)) { + map.put(INPUT_FORMAT, getOnlyResourcePath(String.join("/", resourceRoot, INPUT_FORMAT_SPEC_DIR))); + } } - finally { - doMethodTeardown(generatedTestConfig, streamEventWriter); + if (!map.containsKey(SERIALIZER_SPEC_DIR)) { + throw new IAE("Failed to find serializer spec under [%s]. Found resources are %s", resourceRoot, map); } + if (map.size() == 1) { + throw new IAE("Failed to find input format or parser spec under [%s]. Found resources are %s", resourceRoot, map); + } + return map; + } + + private Closeable createResourceCloser(GeneratedTestConfig generatedTestConfig) + { + return Closer.create().register(() -> doMethodTeardown(generatedTestConfig)); } - protected void doTestIndexDataWithInputFormatStableState() throws Exception + protected void doTestIndexDataStableState( + boolean transactionEnabled, + String serializerPath, + String parserType, + String specPath + ) throws Exception { - StreamEventWriter streamEventWriter = createStreamEventWriter(config); - final GeneratedTestConfig generatedTestConfig = new GeneratedTestConfig(); + final EventSerializer serializer = jsonMapper.readValue(getResourceAsStream(serializerPath), EventSerializer.class); + final StreamGenerator streamGenerator = new WikipediaStreamEventStreamGenerator( + serializer, + EVENTS_PER_SECOND, + CYCLE_PADDING_MS + ); + final GeneratedTestConfig generatedTestConfig = new GeneratedTestConfig(parserType, getResourceAsString(specPath)); try ( - final Closeable ignored1 = unloader(generatedTestConfig.getFullDatasourceName()) + final Closeable closer = createResourceCloser(generatedTestConfig); + final StreamEventWriter streamEventWriter = createStreamEventWriter(config, transactionEnabled) ) { - final String taskSpec = generatedTestConfig.getStreamIngestionPropsTransform().apply(getResourceAsString(INDEXER_FILE_INPUT_FORMAT)); + final String taskSpec = generatedTestConfig.getStreamIngestionPropsTransform() + .apply(getResourceAsString(SUPERVISOR_SPEC_TEMPLATE_PATH)); LOG.info("supervisorSpec: [%s]\n", taskSpec); // Start supervisor generatedTestConfig.setSupervisorId(indexer.submitSupervisor(taskSpec)); LOG.info("Submitted supervisor"); // Start data generator - wikipediaStreamEventGenerator.run(generatedTestConfig.getStreamName(), streamEventWriter, TOTAL_NUMBER_OF_SECOND, FIRST_EVENT_TIME); + streamGenerator.run( + generatedTestConfig.getStreamName(), + streamEventWriter, + TOTAL_NUMBER_OF_SECOND, + FIRST_EVENT_TIME + ); verifyIngestedData(generatedTestConfig); } - finally { - doMethodTeardown(generatedTestConfig, streamEventWriter); - } } - void doTestIndexDataWithLosingCoordinator() throws Exception + void doTestIndexDataWithLosingCoordinator(boolean transactionEnabled) throws Exception { - testIndexWithLosingNodeHelper(() -> druidClusterAdminClient.restartCoordinatorContainer(), () -> druidClusterAdminClient.waitUntilCoordinatorReady()); + testIndexWithLosingNodeHelper( + () -> druidClusterAdminClient.restartCoordinatorContainer(), + () -> druidClusterAdminClient.waitUntilCoordinatorReady(), + transactionEnabled + ); } - void doTestIndexDataWithLosingOverlord() throws Exception + void doTestIndexDataWithLosingOverlord(boolean transactionEnabled) throws Exception { - testIndexWithLosingNodeHelper(() -> druidClusterAdminClient.restartIndexerContainer(), () -> druidClusterAdminClient.waitUntilIndexerReady()); + testIndexWithLosingNodeHelper( + () -> druidClusterAdminClient.restartIndexerContainer(), + () -> druidClusterAdminClient.waitUntilIndexerReady(), + transactionEnabled + ); } - void doTestIndexDataWithLosingHistorical() throws Exception + void doTestIndexDataWithLosingHistorical(boolean transactionEnabled) throws Exception { - testIndexWithLosingNodeHelper(() -> druidClusterAdminClient.restartHistoricalContainer(), () -> druidClusterAdminClient.waitUntilHistoricalReady()); + testIndexWithLosingNodeHelper( + () -> druidClusterAdminClient.restartHistoricalContainer(), + () -> druidClusterAdminClient.waitUntilHistoricalReady(), + transactionEnabled + ); } - protected void doTestIndexDataWithStartStopSupervisor() throws Exception + protected void doTestIndexDataWithStartStopSupervisor(boolean transactionEnabled) throws Exception { - StreamEventWriter streamEventWriter = createStreamEventWriter(config); - final GeneratedTestConfig generatedTestConfig = new GeneratedTestConfig(); + final GeneratedTestConfig generatedTestConfig = new GeneratedTestConfig( + INPUT_FORMAT, + getResourceAsString(JSON_INPUT_FORMAT_PATH) + ); try ( - final Closeable ignored1 = unloader(generatedTestConfig.getFullDatasourceName()) + final Closeable closer = createResourceCloser(generatedTestConfig); + final StreamEventWriter streamEventWriter = createStreamEventWriter(config, transactionEnabled) ) { - final String taskSpec = generatedTestConfig.getStreamIngestionPropsTransform().apply(getResourceAsString(INDEXER_FILE_INPUT_FORMAT)); + final String taskSpec = generatedTestConfig.getStreamIngestionPropsTransform() + .apply(getResourceAsString(SUPERVISOR_SPEC_TEMPLATE_PATH)); LOG.info("supervisorSpec: [%s]\n", taskSpec); // Start supervisor generatedTestConfig.setSupervisorId(indexer.submitSupervisor(taskSpec)); @@ -161,7 +241,17 @@ protected void doTestIndexDataWithStartStopSupervisor() throws Exception int secondsToGenerateRemaining = TOTAL_NUMBER_OF_SECOND; int secondsToGenerateFirstRound = TOTAL_NUMBER_OF_SECOND / 2; secondsToGenerateRemaining = secondsToGenerateRemaining - secondsToGenerateFirstRound; - wikipediaStreamEventGenerator.run(generatedTestConfig.getStreamName(), streamEventWriter, secondsToGenerateFirstRound, FIRST_EVENT_TIME); + final StreamGenerator streamGenerator = new WikipediaStreamEventStreamGenerator( + new JsonEventSerializer(jsonMapper), + EVENTS_PER_SECOND, + CYCLE_PADDING_MS + ); + streamGenerator.run( + generatedTestConfig.getStreamName(), + streamEventWriter, + secondsToGenerateFirstRound, + FIRST_EVENT_TIME + ); // Verify supervisor is healthy before suspension ITRetryUtil.retryUntil( () -> SupervisorStateManager.BasicState.RUNNING.equals(indexer.getSupervisorStatus(generatedTestConfig.getSupervisorId())), @@ -173,7 +263,12 @@ protected void doTestIndexDataWithStartStopSupervisor() throws Exception // Suspend the supervisor indexer.suspendSupervisor(generatedTestConfig.getSupervisorId()); // Start generating remainning half of the data - wikipediaStreamEventGenerator.run(generatedTestConfig.getStreamName(), streamEventWriter, secondsToGenerateRemaining, FIRST_EVENT_TIME.plusSeconds(secondsToGenerateFirstRound)); + streamGenerator.run( + generatedTestConfig.getStreamName(), + streamEventWriter, + secondsToGenerateRemaining, + FIRST_EVENT_TIME.plusSeconds(secondsToGenerateFirstRound) + ); // Resume the supervisor indexer.resumeSupervisor(generatedTestConfig.getSupervisorId()); // Verify supervisor is healthy after suspension @@ -187,31 +282,36 @@ protected void doTestIndexDataWithStartStopSupervisor() throws Exception // Verify that supervisor can catch up with the stream verifyIngestedData(generatedTestConfig); } - finally { - doMethodTeardown(generatedTestConfig, streamEventWriter); - } } - protected void doTestIndexDataWithStreamReshardSplit() throws Exception + protected void doTestIndexDataWithStreamReshardSplit(boolean transactionEnabled) throws Exception { // Reshard the stream from STREAM_SHARD_COUNT to STREAM_SHARD_COUNT * 2 - testIndexWithStreamReshardHelper(STREAM_SHARD_COUNT * 2); + testIndexWithStreamReshardHelper(transactionEnabled, STREAM_SHARD_COUNT * 2); } - protected void doTestIndexDataWithStreamReshardMerge() throws Exception + protected void doTestIndexDataWithStreamReshardMerge(boolean transactionEnabled) throws Exception { // Reshard the stream from STREAM_SHARD_COUNT to STREAM_SHARD_COUNT / 2 - testIndexWithStreamReshardHelper(STREAM_SHARD_COUNT / 2); + testIndexWithStreamReshardHelper(transactionEnabled, STREAM_SHARD_COUNT / 2); } - private void testIndexWithLosingNodeHelper(Runnable restartRunnable, Runnable waitForReadyRunnable) throws Exception + private void testIndexWithLosingNodeHelper( + Runnable restartRunnable, + Runnable waitForReadyRunnable, + boolean transactionEnabled + ) throws Exception { - StreamEventWriter streamEventWriter = createStreamEventWriter(config); - final GeneratedTestConfig generatedTestConfig = new GeneratedTestConfig(); + final GeneratedTestConfig generatedTestConfig = new GeneratedTestConfig( + INPUT_FORMAT, + getResourceAsString(JSON_INPUT_FORMAT_PATH) + ); try ( - final Closeable ignored1 = unloader(generatedTestConfig.getFullDatasourceName()) + final Closeable closer = createResourceCloser(generatedTestConfig); + final StreamEventWriter streamEventWriter = createStreamEventWriter(config, transactionEnabled) ) { - final String taskSpec = generatedTestConfig.getStreamIngestionPropsTransform().apply(getResourceAsString(INDEXER_FILE_INPUT_FORMAT)); + final String taskSpec = generatedTestConfig.getStreamIngestionPropsTransform() + .apply(getResourceAsString(SUPERVISOR_SPEC_TEMPLATE_PATH)); LOG.info("supervisorSpec: [%s]\n", taskSpec); // Start supervisor generatedTestConfig.setSupervisorId(indexer.submitSupervisor(taskSpec)); @@ -220,7 +320,17 @@ private void testIndexWithLosingNodeHelper(Runnable restartRunnable, Runnable wa int secondsToGenerateRemaining = TOTAL_NUMBER_OF_SECOND; int secondsToGenerateFirstRound = TOTAL_NUMBER_OF_SECOND / 3; secondsToGenerateRemaining = secondsToGenerateRemaining - secondsToGenerateFirstRound; - wikipediaStreamEventGenerator.run(generatedTestConfig.getStreamName(), streamEventWriter, secondsToGenerateFirstRound, FIRST_EVENT_TIME); + final StreamGenerator streamGenerator = new WikipediaStreamEventStreamGenerator( + new JsonEventSerializer(jsonMapper), + EVENTS_PER_SECOND, + CYCLE_PADDING_MS + ); + streamGenerator.run( + generatedTestConfig.getStreamName(), + streamEventWriter, + secondsToGenerateFirstRound, + FIRST_EVENT_TIME + ); // Verify supervisor is healthy before restart ITRetryUtil.retryUntil( () -> SupervisorStateManager.BasicState.RUNNING.equals(indexer.getSupervisorStatus(generatedTestConfig.getSupervisorId())), @@ -236,13 +346,23 @@ private void testIndexWithLosingNodeHelper(Runnable restartRunnable, Runnable wa // Start generating one third of the data (while restarting) int secondsToGenerateSecondRound = TOTAL_NUMBER_OF_SECOND / 3; secondsToGenerateRemaining = secondsToGenerateRemaining - secondsToGenerateSecondRound; - wikipediaStreamEventGenerator.run(generatedTestConfig.getStreamName(), streamEventWriter, secondsToGenerateSecondRound, FIRST_EVENT_TIME.plusSeconds(secondsToGenerateFirstRound)); + streamGenerator.run( + generatedTestConfig.getStreamName(), + streamEventWriter, + secondsToGenerateSecondRound, + FIRST_EVENT_TIME.plusSeconds(secondsToGenerateFirstRound) + ); // Wait for Druid process to be available LOG.info("Waiting for Druid process to be available"); waitForReadyRunnable.run(); LOG.info("Druid process is now available"); // Start generating remaining data (after restarting) - wikipediaStreamEventGenerator.run(generatedTestConfig.getStreamName(), streamEventWriter, secondsToGenerateRemaining, FIRST_EVENT_TIME.plusSeconds(secondsToGenerateFirstRound + secondsToGenerateSecondRound)); + streamGenerator.run( + generatedTestConfig.getStreamName(), + streamEventWriter, + secondsToGenerateRemaining, + FIRST_EVENT_TIME.plusSeconds(secondsToGenerateFirstRound + secondsToGenerateSecondRound) + ); // Verify supervisor is healthy ITRetryUtil.retryUntil( () -> SupervisorStateManager.BasicState.RUNNING.equals(indexer.getSupervisorStatus(generatedTestConfig.getSupervisorId())), @@ -254,19 +374,20 @@ private void testIndexWithLosingNodeHelper(Runnable restartRunnable, Runnable wa // Verify that supervisor ingested all data verifyIngestedData(generatedTestConfig); } - finally { - doMethodTeardown(generatedTestConfig, streamEventWriter); - } } - private void testIndexWithStreamReshardHelper(int newShardCount) throws Exception + private void testIndexWithStreamReshardHelper(boolean transactionEnabled, int newShardCount) throws Exception { - StreamEventWriter streamEventWriter = createStreamEventWriter(config); - final GeneratedTestConfig generatedTestConfig = new GeneratedTestConfig(); + final GeneratedTestConfig generatedTestConfig = new GeneratedTestConfig( + INPUT_FORMAT, + getResourceAsString(JSON_INPUT_FORMAT_PATH) + ); try ( - final Closeable ignored1 = unloader(generatedTestConfig.getFullDatasourceName()) + final Closeable closer = createResourceCloser(generatedTestConfig); + final StreamEventWriter streamEventWriter = createStreamEventWriter(config, transactionEnabled) ) { - final String taskSpec = generatedTestConfig.getStreamIngestionPropsTransform().apply(getResourceAsString(INDEXER_FILE_INPUT_FORMAT)); + final String taskSpec = generatedTestConfig.getStreamIngestionPropsTransform() + .apply(getResourceAsString(SUPERVISOR_SPEC_TEMPLATE_PATH)); LOG.info("supervisorSpec: [%s]\n", taskSpec); // Start supervisor generatedTestConfig.setSupervisorId(indexer.submitSupervisor(taskSpec)); @@ -275,7 +396,17 @@ private void testIndexWithStreamReshardHelper(int newShardCount) throws Exceptio int secondsToGenerateRemaining = TOTAL_NUMBER_OF_SECOND; int secondsToGenerateFirstRound = TOTAL_NUMBER_OF_SECOND / 3; secondsToGenerateRemaining = secondsToGenerateRemaining - secondsToGenerateFirstRound; - wikipediaStreamEventGenerator.run(generatedTestConfig.getStreamName(), streamEventWriter, secondsToGenerateFirstRound, FIRST_EVENT_TIME); + final StreamGenerator streamGenerator = new WikipediaStreamEventStreamGenerator( + new JsonEventSerializer(jsonMapper), + EVENTS_PER_SECOND, + CYCLE_PADDING_MS + ); + streamGenerator.run( + generatedTestConfig.getStreamName(), + streamEventWriter, + secondsToGenerateFirstRound, + FIRST_EVENT_TIME + ); // Verify supervisor is healthy before resahrding ITRetryUtil.retryUntil( () -> SupervisorStateManager.BasicState.RUNNING.equals(indexer.getSupervisorStatus(generatedTestConfig.getSupervisorId())), @@ -289,7 +420,12 @@ private void testIndexWithStreamReshardHelper(int newShardCount) throws Exceptio // Start generating one third of the data (while resharding) int secondsToGenerateSecondRound = TOTAL_NUMBER_OF_SECOND / 3; secondsToGenerateRemaining = secondsToGenerateRemaining - secondsToGenerateSecondRound; - wikipediaStreamEventGenerator.run(generatedTestConfig.getStreamName(), streamEventWriter, secondsToGenerateSecondRound, FIRST_EVENT_TIME.plusSeconds(secondsToGenerateFirstRound)); + streamGenerator.run( + generatedTestConfig.getStreamName(), + streamEventWriter, + secondsToGenerateSecondRound, + FIRST_EVENT_TIME.plusSeconds(secondsToGenerateFirstRound) + ); // Wait for stream to finish resharding ITRetryUtil.retryUntil( () -> streamAdminClient.isStreamActive(generatedTestConfig.getStreamName()), @@ -299,14 +435,23 @@ private void testIndexWithStreamReshardHelper(int newShardCount) throws Exceptio "Waiting for stream to finish resharding" ); ITRetryUtil.retryUntil( - () -> streamAdminClient.verfiyPartitionCountUpdated(generatedTestConfig.getStreamName(), STREAM_SHARD_COUNT, newShardCount), + () -> streamAdminClient.verfiyPartitionCountUpdated( + generatedTestConfig.getStreamName(), + STREAM_SHARD_COUNT, + newShardCount + ), true, 10000, 30, "Waiting for stream to finish resharding" ); // Start generating remaining data (after resharding) - wikipediaStreamEventGenerator.run(generatedTestConfig.getStreamName(), streamEventWriter, secondsToGenerateRemaining, FIRST_EVENT_TIME.plusSeconds(secondsToGenerateFirstRound + secondsToGenerateSecondRound)); + streamGenerator.run( + generatedTestConfig.getStreamName(), + streamEventWriter, + secondsToGenerateRemaining, + FIRST_EVENT_TIME.plusSeconds(secondsToGenerateFirstRound + secondsToGenerateSecondRound) + ); // Verify supervisor is healthy after resahrding ITRetryUtil.retryUntil( () -> SupervisorStateManager.BasicState.RUNNING.equals(indexer.getSupervisorStatus(generatedTestConfig.getSupervisorId())), @@ -318,9 +463,6 @@ private void testIndexWithStreamReshardHelper(int newShardCount) throws Exceptio // Verify that supervisor can catch up with the stream verifyIngestedData(generatedTestConfig); } - finally { - doMethodTeardown(generatedTestConfig, streamEventWriter); - } } private void verifyIngestedData(GeneratedTestConfig generatedTestConfig) throws Exception @@ -329,11 +471,14 @@ private void verifyIngestedData(GeneratedTestConfig generatedTestConfig) throws LOG.info("Waiting for [%s] millis for stream indexing tasks to consume events", WAIT_TIME_MILLIS); Thread.sleep(WAIT_TIME_MILLIS); // Query data - final String querySpec = generatedTestConfig.getStreamQueryPropsTransform().apply(getResourceAsString(QUERIES_FILE)); + final String querySpec = generatedTestConfig.getStreamQueryPropsTransform() + .apply(getResourceAsString(QUERIES_FILE)); // this query will probably be answered from the indexing tasks but possibly from 2 historical segments / 2 indexing this.queryHelper.testQueriesFromString(querySpec, 2); LOG.info("Shutting down supervisor"); indexer.shutdownSupervisor(generatedTestConfig.getSupervisorId()); + // Clear supervisor ID to not shutdown again. + generatedTestConfig.setSupervisorId(null); // wait for all indexing tasks to finish LOG.info("Waiting for all indexing tasks to finish"); ITRetryUtil.retryUntilTrue( @@ -358,22 +503,16 @@ long getSumOfEventSequence(int numEvents) return (numEvents * (1 + numEvents)) / 2; } - private void doMethodTeardown(GeneratedTestConfig generatedTestConfig, StreamEventWriter streamEventWriter) + private void doMethodTeardown(GeneratedTestConfig generatedTestConfig) { - try { - streamEventWriter.flush(); - streamEventWriter.shutdown(); - } - catch (Exception e) { - // Best effort cleanup as the writer may have already been cleanup - LOG.warn(e, "Failed to cleanup writer. This might be expected depending on the test method"); - } - try { - indexer.shutdownSupervisor(generatedTestConfig.getSupervisorId()); - } - catch (Exception e) { - // Best effort cleanup as the supervisor may have already been cleanup - LOG.warn(e, "Failed to cleanup supervisor. This might be expected depending on the test method"); + if (generatedTestConfig.getSupervisorId() != null) { + try { + indexer.shutdownSupervisor(generatedTestConfig.getSupervisorId()); + } + catch (Exception e) { + // Best effort cleanup as the supervisor may have already been cleanup + LOG.warn(e, "Failed to cleanup supervisor. This might be expected depending on the test method"); + } } try { unloader(generatedTestConfig.getFullDatasourceName()); @@ -393,17 +532,20 @@ private void doMethodTeardown(GeneratedTestConfig generatedTestConfig, StreamEve private class GeneratedTestConfig { - private String streamName; - private String fullDatasourceName; + private final String streamName; + private final String fullDatasourceName; private String supervisorId; private Function streamIngestionPropsTransform; private Function streamQueryPropsTransform; - GeneratedTestConfig() throws Exception + GeneratedTestConfig(String parserType, String parserOrInputFormat) throws Exception { streamName = getTestNamePrefix() + "_index_test_" + UUID.randomUUID(); String datasource = getTestNamePrefix() + "_indexing_service_test_" + UUID.randomUUID(); - Map tags = ImmutableMap.of(STREAM_EXPIRE_TAG, Long.toString(DateTimes.nowUtc().plusMinutes(30).getMillis())); + Map tags = ImmutableMap.of( + STREAM_EXPIRE_TAG, + Long.toString(DateTimes.nowUtc().plusMinutes(30).getMillis()) + ); streamAdminClient.createStream(streamName, STREAM_SHARD_COUNT, tags); ITRetryUtil.retryUntil( () -> streamAdminClient.isStreamActive(streamName), @@ -413,7 +555,13 @@ private class GeneratedTestConfig "Wait for stream active" ); fullDatasourceName = datasource + config.getExtraDatasourceNameSuffix(); - streamIngestionPropsTransform = generateStreamIngestionPropsTransform(streamName, fullDatasourceName, config); + streamIngestionPropsTransform = generateStreamIngestionPropsTransform( + streamName, + fullDatasourceName, + parserType, + parserOrInputFormat, + config + ); streamQueryPropsTransform = generateStreamQueryPropsTransform(streamName, fullDatasourceName); } diff --git a/integration-tests/src/test/java/org/apache/druid/tests/indexer/ITKafkaIndexingServiceNonTransactionalSerializedTest.java b/integration-tests/src/test/java/org/apache/druid/tests/indexer/ITKafkaIndexingServiceNonTransactionalSerializedTest.java index 99713a729847..a3b845b87004 100644 --- a/integration-tests/src/test/java/org/apache/druid/tests/indexer/ITKafkaIndexingServiceNonTransactionalSerializedTest.java +++ b/integration-tests/src/test/java/org/apache/druid/tests/indexer/ITKafkaIndexingServiceNonTransactionalSerializedTest.java @@ -21,7 +21,6 @@ import org.apache.druid.testing.guice.DruidTestModuleFactory; import org.apache.druid.tests.TestNGGroup; -import org.testng.annotations.AfterClass; import org.testng.annotations.BeforeClass; import org.testng.annotations.Guice; import org.testng.annotations.Test; @@ -30,12 +29,6 @@ @Guice(moduleFactory = DruidTestModuleFactory.class) public class ITKafkaIndexingServiceNonTransactionalSerializedTest extends AbstractKafkaIndexingServiceTest { - @Override - protected boolean isKafkaWriterTransactionalEnabled() - { - return false; - } - @Override public String getTestNamePrefix() { @@ -48,19 +41,13 @@ public void beforeClass() throws Exception doBeforeClass(); } - @AfterClass - public void tearDown() - { - doClassTeardown(); - } - /** * This test must be run individually since the test affect and modify the state of the Druid cluster */ @Test public void testKafkaIndexDataWithLosingCoordinator() throws Exception { - doTestIndexDataWithLosingCoordinator(); + doTestIndexDataWithLosingCoordinator(false); } /** @@ -69,7 +56,7 @@ public void testKafkaIndexDataWithLosingCoordinator() throws Exception @Test public void testKafkaIndexDataWithLosingOverlord() throws Exception { - doTestIndexDataWithLosingOverlord(); + doTestIndexDataWithLosingOverlord(false); } /** @@ -78,6 +65,6 @@ public void testKafkaIndexDataWithLosingOverlord() throws Exception @Test public void testKafkaIndexDataWithLosingHistorical() throws Exception { - doTestIndexDataWithLosingHistorical(); + doTestIndexDataWithLosingHistorical(false); } } diff --git a/integration-tests/src/test/java/org/apache/druid/tests/indexer/ITKafkaIndexingServiceTransactionalSerializedTest.java b/integration-tests/src/test/java/org/apache/druid/tests/indexer/ITKafkaIndexingServiceTransactionalSerializedTest.java index 06bcf050d678..fdd06ff4f883 100644 --- a/integration-tests/src/test/java/org/apache/druid/tests/indexer/ITKafkaIndexingServiceTransactionalSerializedTest.java +++ b/integration-tests/src/test/java/org/apache/druid/tests/indexer/ITKafkaIndexingServiceTransactionalSerializedTest.java @@ -21,7 +21,6 @@ import org.apache.druid.testing.guice.DruidTestModuleFactory; import org.apache.druid.tests.TestNGGroup; -import org.testng.annotations.AfterClass; import org.testng.annotations.BeforeClass; import org.testng.annotations.Guice; import org.testng.annotations.Test; @@ -30,12 +29,6 @@ @Guice(moduleFactory = DruidTestModuleFactory.class) public class ITKafkaIndexingServiceTransactionalSerializedTest extends AbstractKafkaIndexingServiceTest { - @Override - protected boolean isKafkaWriterTransactionalEnabled() - { - return true; - } - @Override public String getTestNamePrefix() { @@ -48,19 +41,13 @@ public void beforeClass() throws Exception doBeforeClass(); } - @AfterClass - public void tearDown() - { - doClassTeardown(); - } - /** * This test must be run individually since the test affect and modify the state of the Druid cluster */ @Test public void testKafkaIndexDataWithLosingCoordinator() throws Exception { - doTestIndexDataWithLosingCoordinator(); + doTestIndexDataWithLosingCoordinator(true); } /** @@ -69,7 +56,7 @@ public void testKafkaIndexDataWithLosingCoordinator() throws Exception @Test public void testKafkaIndexDataWithLosingOverlord() throws Exception { - doTestIndexDataWithLosingOverlord(); + doTestIndexDataWithLosingOverlord(true); } /** @@ -78,6 +65,6 @@ public void testKafkaIndexDataWithLosingOverlord() throws Exception @Test public void testKafkaIndexDataWithLosingHistorical() throws Exception { - doTestIndexDataWithLosingHistorical(); + doTestIndexDataWithLosingHistorical(true); } } diff --git a/integration-tests/src/test/java/org/apache/druid/tests/indexer/ITKinesisIndexingServiceSerializedTest.java b/integration-tests/src/test/java/org/apache/druid/tests/indexer/ITKinesisIndexingServiceSerializedTest.java index 8e64abb65560..fed13619dc06 100644 --- a/integration-tests/src/test/java/org/apache/druid/tests/indexer/ITKinesisIndexingServiceSerializedTest.java +++ b/integration-tests/src/test/java/org/apache/druid/tests/indexer/ITKinesisIndexingServiceSerializedTest.java @@ -21,7 +21,6 @@ import org.apache.druid.testing.guice.DruidTestModuleFactory; import org.apache.druid.tests.TestNGGroup; -import org.testng.annotations.AfterClass; import org.testng.annotations.BeforeClass; import org.testng.annotations.Guice; import org.testng.annotations.Test; @@ -42,19 +41,13 @@ public void beforeClass() throws Exception doBeforeClass(); } - @AfterClass - public void tearDown() - { - doClassTeardown(); - } - /** * This test must be run individually since the test affect and modify the state of the Druid cluster */ @Test public void testKinesisIndexDataWithLosingCoordinator() throws Exception { - doTestIndexDataWithLosingCoordinator(); + doTestIndexDataWithLosingCoordinator(false); } /** @@ -63,7 +56,7 @@ public void testKinesisIndexDataWithLosingCoordinator() throws Exception @Test public void testKinesisIndexDataWithLosingOverlord() throws Exception { - doTestIndexDataWithLosingOverlord(); + doTestIndexDataWithLosingOverlord(false); } /** @@ -72,6 +65,6 @@ public void testKinesisIndexDataWithLosingOverlord() throws Exception @Test public void testKinesisIndexDataWithLosingHistorical() throws Exception { - doTestIndexDataWithLosingHistorical(); + doTestIndexDataWithLosingHistorical(false); } } diff --git a/integration-tests/src/test/java/org/apache/druid/tests/parallelized/ITKafkaIndexingServiceDataFormatTest.java b/integration-tests/src/test/java/org/apache/druid/tests/parallelized/ITKafkaIndexingServiceDataFormatTest.java new file mode 100644 index 000000000000..9143d9b38b76 --- /dev/null +++ b/integration-tests/src/test/java/org/apache/druid/tests/parallelized/ITKafkaIndexingServiceDataFormatTest.java @@ -0,0 +1,99 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.druid.tests.parallelized; + +import com.fasterxml.jackson.databind.ObjectMapper; +import com.google.inject.Inject; +import org.apache.druid.guice.annotations.Json; +import org.apache.druid.testing.guice.DruidTestModuleFactory; +import org.apache.druid.tests.TestNGGroup; +import org.apache.druid.tests.indexer.AbstractKafkaIndexingServiceTest; +import org.apache.druid.tests.indexer.AbstractStreamIndexingTest; +import org.testng.annotations.BeforeClass; +import org.testng.annotations.DataProvider; +import org.testng.annotations.Guice; +import org.testng.annotations.Test; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.List; +import java.util.Map; + +@Test(groups = TestNGGroup.KAFKA_DATA_FORMAT) +@Guice(moduleFactory = DruidTestModuleFactory.class) +public class ITKafkaIndexingServiceDataFormatTest extends AbstractKafkaIndexingServiceTest +{ + private static final boolean TRANSACTION_DISABLED = false; + private static final boolean TRANSACTION_ENABLED = true; + + /** + * Generates test parameters based on the given resources. The resources should be structured as + * + *
{@code
+   * {RESOURCES_ROOT}/stream/data/{DATA_FORMAT}/serializer
+   *                                           /input_format
+   *                                           /parser
+   * }
+ * + * The {@code serializer} directory contains the spec of {@link org.apache.druid.testing.utils.EventSerializer} and + * must be present. Either {@code input_format} or {@code parser} directory should be present if {@code serializer} + * is present. + */ + @DataProvider(parallel = true) + public static Object[][] resources() throws IOException + { + final List resources = new ArrayList<>(); + final List dataFormats = listDataFormatResources(); + for (String eachFormat : dataFormats) { + final Map spec = findTestSpecs(String.join("/", DATA_RESOURCE_ROOT, eachFormat)); + final String serializerPath = spec.get(AbstractStreamIndexingTest.SERIALIZER); + spec.forEach((k, path) -> { + if (!AbstractStreamIndexingTest.SERIALIZER.equals(k)) { + resources.add(new Object[]{TRANSACTION_DISABLED, serializerPath, k, path}); + resources.add(new Object[]{TRANSACTION_ENABLED, serializerPath, k, path}); + } + }); + } + + return resources.toArray(new Object[0][]); + } + + @Inject + private @Json ObjectMapper jsonMapper; + + @BeforeClass + public void beforeClass() throws Exception + { + doBeforeClass(); + } + + @Test(dataProvider = "resources") + public void testIndexData(boolean transactionEnabled, String serializerPath, String parserType, String specPath) + throws Exception + { + doTestIndexDataStableState(transactionEnabled, serializerPath, parserType, specPath); + } + + @Override + public String getTestNamePrefix() + { + return "kafka_data_format"; + } +} diff --git a/integration-tests/src/test/java/org/apache/druid/tests/parallelized/ITKafkaIndexingServiceNonTransactionalParallelizedTest.java b/integration-tests/src/test/java/org/apache/druid/tests/parallelized/ITKafkaIndexingServiceNonTransactionalParallelizedTest.java index 199530e0a32a..2c648ea630b2 100644 --- a/integration-tests/src/test/java/org/apache/druid/tests/parallelized/ITKafkaIndexingServiceNonTransactionalParallelizedTest.java +++ b/integration-tests/src/test/java/org/apache/druid/tests/parallelized/ITKafkaIndexingServiceNonTransactionalParallelizedTest.java @@ -22,7 +22,6 @@ import org.apache.druid.testing.guice.DruidTestModuleFactory; import org.apache.druid.tests.TestNGGroup; import org.apache.druid.tests.indexer.AbstractKafkaIndexingServiceTest; -import org.testng.annotations.AfterClass; import org.testng.annotations.BeforeClass; import org.testng.annotations.Guice; import org.testng.annotations.Test; @@ -31,12 +30,6 @@ @Guice(moduleFactory = DruidTestModuleFactory.class) public class ITKafkaIndexingServiceNonTransactionalParallelizedTest extends AbstractKafkaIndexingServiceTest { - @Override - protected boolean isKafkaWriterTransactionalEnabled() - { - return false; - } - @Override public String getTestNamePrefix() { @@ -49,32 +42,6 @@ public void beforeClass() throws Exception doBeforeClass(); } - @AfterClass - public void tearDown() - { - doClassTeardown(); - } - - /** - * This test can be run concurrently with other tests as it creates/modifies/teardowns a unique datasource - * and supervisor maintained and scoped within this test only - */ - @Test - public void testKafkaIndexDataWithLegacyParserStableState() throws Exception - { - doTestIndexDataWithLegacyParserStableState(); - } - - /** - * This test can be run concurrently with other tests as it creates/modifies/teardowns a unique datasource - * and supervisor maintained and scoped within this test only - */ - @Test - public void testKafkaIndexDataWithInputFormatStableState() throws Exception - { - doTestIndexDataWithInputFormatStableState(); - } - /** * This test can be run concurrently with other tests as it creates/modifies/teardowns a unique datasource * and supervisor maintained and scoped within this test only @@ -82,7 +49,7 @@ public void testKafkaIndexDataWithInputFormatStableState() throws Exception @Test public void testKafkaIndexDataWithStartStopSupervisor() throws Exception { - doTestIndexDataWithStartStopSupervisor(); + doTestIndexDataWithStartStopSupervisor(false); } /** @@ -92,6 +59,6 @@ public void testKafkaIndexDataWithStartStopSupervisor() throws Exception @Test public void testKafkaIndexDataWithKafkaReshardSplit() throws Exception { - doTestIndexDataWithStreamReshardSplit(); + doTestIndexDataWithStreamReshardSplit(false); } } diff --git a/integration-tests/src/test/java/org/apache/druid/tests/parallelized/ITKafkaIndexingServiceTransactionalParallelizedTest.java b/integration-tests/src/test/java/org/apache/druid/tests/parallelized/ITKafkaIndexingServiceTransactionalParallelizedTest.java index 7db3a7fd832c..d61e977a7081 100644 --- a/integration-tests/src/test/java/org/apache/druid/tests/parallelized/ITKafkaIndexingServiceTransactionalParallelizedTest.java +++ b/integration-tests/src/test/java/org/apache/druid/tests/parallelized/ITKafkaIndexingServiceTransactionalParallelizedTest.java @@ -22,7 +22,6 @@ import org.apache.druid.testing.guice.DruidTestModuleFactory; import org.apache.druid.tests.TestNGGroup; import org.apache.druid.tests.indexer.AbstractKafkaIndexingServiceTest; -import org.testng.annotations.AfterClass; import org.testng.annotations.BeforeClass; import org.testng.annotations.Guice; import org.testng.annotations.Test; @@ -31,12 +30,6 @@ @Guice(moduleFactory = DruidTestModuleFactory.class) public class ITKafkaIndexingServiceTransactionalParallelizedTest extends AbstractKafkaIndexingServiceTest { - @Override - protected boolean isKafkaWriterTransactionalEnabled() - { - return true; - } - @Override public String getTestNamePrefix() { @@ -49,32 +42,6 @@ public void beforeClass() throws Exception doBeforeClass(); } - @AfterClass - public void tearDown() - { - doClassTeardown(); - } - - /** - * This test can be run concurrently with other tests as it creates/modifies/teardowns a unique datasource - * and supervisor maintained and scoped within this test only - */ - @Test - public void testKafkaIndexDataWithLegacyParserStableState() throws Exception - { - doTestIndexDataWithLegacyParserStableState(); - } - - /** - * This test can be run concurrently with other tests as it creates/modifies/teardowns a unique datasource - * and supervisor maintained and scoped within this test only - */ - @Test - public void testKafkaIndexDataWithInputFormatStableState() throws Exception - { - doTestIndexDataWithInputFormatStableState(); - } - /** * This test can be run concurrently with other tests as it creates/modifies/teardowns a unique datasource * and supervisor maintained and scoped within this test only @@ -82,7 +49,7 @@ public void testKafkaIndexDataWithInputFormatStableState() throws Exception @Test public void testKafkaIndexDataWithStartStopSupervisor() throws Exception { - doTestIndexDataWithStartStopSupervisor(); + doTestIndexDataWithStartStopSupervisor(true); } /** @@ -92,6 +59,6 @@ public void testKafkaIndexDataWithStartStopSupervisor() throws Exception @Test public void testKafkaIndexDataWithKafkaReshardSplit() throws Exception { - doTestIndexDataWithStreamReshardSplit(); + doTestIndexDataWithStreamReshardSplit(true); } } diff --git a/integration-tests/src/test/java/org/apache/druid/tests/parallelized/ITKinesisIndexingServiceDataFormatTest.java b/integration-tests/src/test/java/org/apache/druid/tests/parallelized/ITKinesisIndexingServiceDataFormatTest.java new file mode 100644 index 000000000000..c302cd1a9224 --- /dev/null +++ b/integration-tests/src/test/java/org/apache/druid/tests/parallelized/ITKinesisIndexingServiceDataFormatTest.java @@ -0,0 +1,96 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.druid.tests.parallelized; + +import com.fasterxml.jackson.databind.ObjectMapper; +import com.google.inject.Inject; +import org.apache.druid.guice.annotations.Json; +import org.apache.druid.testing.guice.DruidTestModuleFactory; +import org.apache.druid.tests.TestNGGroup; +import org.apache.druid.tests.indexer.AbstractKinesisIndexingServiceTest; +import org.apache.druid.tests.indexer.AbstractStreamIndexingTest; +import org.testng.annotations.BeforeClass; +import org.testng.annotations.DataProvider; +import org.testng.annotations.Guice; +import org.testng.annotations.Test; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.List; +import java.util.Map; + +@Test(groups = TestNGGroup.KINESIS_DATA_FORMAT) +@Guice(moduleFactory = DruidTestModuleFactory.class) +public class ITKinesisIndexingServiceDataFormatTest extends AbstractKinesisIndexingServiceTest +{ + /** + * Generates test parameters based on the given resources. The resources should be structured as + * + *
{@code
+   * {RESOURCES_ROOT}/stream/data/{DATA_FORMAT}/serializer
+   *                                           /input_format
+   *                                           /parser
+   * }
+ * + * The {@code serializer} directory contains the spec of {@link org.apache.druid.testing.utils.EventSerializer} and + * must be present. Either {@code input_format} or {@code parser} directory should be present if {@code serializer} + * is present. + */ + @DataProvider(parallel = true) + public static Object[][] resources() throws IOException + { + final List resources = new ArrayList<>(); + final List dataFormats = listDataFormatResources(); + for (String eachFormat : dataFormats) { + final Map spec = findTestSpecs(String.join("/", DATA_RESOURCE_ROOT, eachFormat)); + final String serializerPath = spec.get(AbstractStreamIndexingTest.SERIALIZER); + spec.forEach((k, path) -> { + if (!AbstractStreamIndexingTest.SERIALIZER.equals(k)) { + resources.add(new Object[]{serializerPath, k, path}); + } + }); + } + + return resources.toArray(new Object[0][]); + } + + @Inject + private @Json + ObjectMapper jsonMapper; + + @BeforeClass + public void beforeClass() throws Exception + { + doBeforeClass(); + } + + @Test(dataProvider = "resources") + public void testIndexData(String serializerPath, String parserType, String specPath) + throws Exception + { + doTestIndexDataStableState(false, serializerPath, parserType, specPath); + } + + @Override + public String getTestNamePrefix() + { + return "kinesis_data_format"; + } +} diff --git a/integration-tests/src/test/java/org/apache/druid/tests/parallelized/ITKinesisIndexingServiceParallelizedTest.java b/integration-tests/src/test/java/org/apache/druid/tests/parallelized/ITKinesisIndexingServiceParallelizedTest.java index 38816dc1328f..efd107fa1aa2 100644 --- a/integration-tests/src/test/java/org/apache/druid/tests/parallelized/ITKinesisIndexingServiceParallelizedTest.java +++ b/integration-tests/src/test/java/org/apache/druid/tests/parallelized/ITKinesisIndexingServiceParallelizedTest.java @@ -22,7 +22,6 @@ import org.apache.druid.testing.guice.DruidTestModuleFactory; import org.apache.druid.tests.TestNGGroup; import org.apache.druid.tests.indexer.AbstractKinesisIndexingServiceTest; -import org.testng.annotations.AfterClass; import org.testng.annotations.BeforeClass; import org.testng.annotations.Guice; import org.testng.annotations.Test; @@ -43,32 +42,6 @@ public void beforeClass() throws Exception doBeforeClass(); } - @AfterClass - public void tearDown() - { - doClassTeardown(); - } - - /** - * This test can be run concurrently with other tests as it creates/modifies/teardowns a unique datasource - * and supervisor maintained and scoped within this test only - */ - @Test - public void testKinesisIndexDataWithLegacyParserStableState() throws Exception - { - doTestIndexDataWithLegacyParserStableState(); - } - - /** - * This test can be run concurrently with other tests as it creates/modifies/teardowns a unique datasource - * and supervisor maintained and scoped within this test only - */ - @Test - public void testKinesisIndexDataWithInputFormatStableState() throws Exception - { - doTestIndexDataWithInputFormatStableState(); - } - /** * This test can be run concurrently with other tests as it creates/modifies/teardowns a unique datasource * and supervisor maintained and scoped within this test only @@ -76,7 +49,7 @@ public void testKinesisIndexDataWithInputFormatStableState() throws Exception @Test public void testKinesisIndexDataWithStartStopSupervisor() throws Exception { - doTestIndexDataWithStartStopSupervisor(); + doTestIndexDataWithStartStopSupervisor(false); } /** @@ -86,7 +59,7 @@ public void testKinesisIndexDataWithStartStopSupervisor() throws Exception @Test public void testKinesisIndexDataWithKinesisReshardSplit() throws Exception { - doTestIndexDataWithStreamReshardSplit(); + doTestIndexDataWithStreamReshardSplit(false); } /** @@ -96,6 +69,6 @@ public void testKinesisIndexDataWithKinesisReshardSplit() throws Exception @Test public void testKinesisIndexDataWithKinesisReshardMerge() throws Exception { - doTestIndexDataWithStreamReshardMerge(); + doTestIndexDataWithStreamReshardMerge(false); } } diff --git a/integration-tests/src/test/resources/indexer/stream_supervisor_spec_legacy_parser.json b/integration-tests/src/test/resources/indexer/stream_supervisor_spec_legacy_parser.json deleted file mode 100644 index 623aadf6583b..000000000000 --- a/integration-tests/src/test/resources/indexer/stream_supervisor_spec_legacy_parser.json +++ /dev/null @@ -1,61 +0,0 @@ -{ - "type": "%%STREAM_TYPE%%", - "dataSchema": { - "dataSource": "%%DATASOURCE%%", - "parser": { - "type": "string", - "parseSpec": { - "format": "json", - "timestampSpec": { - "column": "timestamp", - "format": "auto" - }, - "dimensionsSpec": { - "dimensions": ["page", "language", "user", "unpatrolled", "newPage", "robot", "anonymous", "namespace", "continent", "country", "region", "city"], - "dimensionExclusions": [], - "spatialDimensions": [] - } - } - }, - "metricsSpec": [ - { - "type": "count", - "name": "count" - }, - { - "type": "doubleSum", - "name": "added", - "fieldName": "added" - }, - { - "type": "doubleSum", - "name": "deleted", - "fieldName": "deleted" - }, - { - "type": "doubleSum", - "name": "delta", - "fieldName": "delta" - } - ], - "granularitySpec": { - "type": "uniform", - "segmentGranularity": "MINUTE", - "queryGranularity": "NONE" - } - }, - "tuningConfig": { - "type": "%%STREAM_TYPE%%", - "intermediatePersistPeriod": "PT30S", - "maxRowsPerSegment": 5000000, - "maxRowsInMemory": 500000 - }, - "ioConfig": { - "%%TOPIC_KEY%%": "%%TOPIC_VALUE%%", - "%%STREAM_PROPERTIES_KEY%%": %%STREAM_PROPERTIES_VALUE%%, - "taskCount": 2, - "replicas": 1, - "taskDuration": "PT5M", - "%%USE_EARLIEST_KEY%%": true - } -} diff --git a/integration-tests/src/test/resources/stream/data/avro/parser/input_row_parser.json b/integration-tests/src/test/resources/stream/data/avro/parser/input_row_parser.json new file mode 100644 index 000000000000..4feea54b39f9 --- /dev/null +++ b/integration-tests/src/test/resources/stream/data/avro/parser/input_row_parser.json @@ -0,0 +1,39 @@ +{ + "type": "avro_stream", + "avroBytesDecoder" : { + "type": "schema_inline", + "schema": { + "namespace": "org.apache.druid", + "name": "wikipedia", + "type": "record", + "fields": [ + { "name": "timestamp", "type": "string" }, + { "name": "page", "type": "string" }, + { "name": "language", "type": "string" }, + { "name": "user", "type": "string" }, + { "name": "unpatrolled", "type": "string" }, + { "name": "newPage", "type": "string" }, + { "name": "robot", "type": "string" }, + { "name": "anonymous", "type": "string" }, + { "name": "namespace", "type": "string" }, + { "name": "continent", "type": "string" }, + { "name": "country", "type": "string" }, + { "name": "region", "type": "string" }, + { "name": "city", "type": "string" }, + { "name": "added", "type": "long" }, + { "name": "deleted", "type": "long" }, + { "name": "delta", "type": "long" } + ] + } + }, + "parseSpec": { + "format": "avro", + "timestampSpec": { + "column": "timestamp", + "format": "auto" + }, + "dimensionsSpec": { + "dimensions": ["page", "language", "user", "unpatrolled", "newPage", "robot", "anonymous", "namespace", "continent", "country", "region", "city"] + } + } +} \ No newline at end of file diff --git a/integration-tests/src/test/resources/stream/data/avro/serializer/serializer.json b/integration-tests/src/test/resources/stream/data/avro/serializer/serializer.json new file mode 100644 index 000000000000..20b3462c2747 --- /dev/null +++ b/integration-tests/src/test/resources/stream/data/avro/serializer/serializer.json @@ -0,0 +1,3 @@ +{ + "type": "avro" +} \ No newline at end of file diff --git a/integration-tests/src/test/resources/stream/data/csv/input_format/input_format.json b/integration-tests/src/test/resources/stream/data/csv/input_format/input_format.json new file mode 100644 index 000000000000..914306976e33 --- /dev/null +++ b/integration-tests/src/test/resources/stream/data/csv/input_format/input_format.json @@ -0,0 +1,4 @@ +{ + "type" : "csv", + "columns": ["timestamp","page","language","user","unpatrolled","newPage","robot","anonymous","namespace","continent","country","region","city","added","deleted","delta"] +} \ No newline at end of file diff --git a/integration-tests/src/test/resources/stream/data/csv/parser/input_row_parser.json b/integration-tests/src/test/resources/stream/data/csv/parser/input_row_parser.json new file mode 100644 index 000000000000..11fa718a9346 --- /dev/null +++ b/integration-tests/src/test/resources/stream/data/csv/parser/input_row_parser.json @@ -0,0 +1,16 @@ +{ + "type": "string", + "parseSpec": { + "format": "csv", + "timestampSpec": { + "column": "timestamp", + "format": "auto" + }, + "columns" : ["timestamp","page","language","user","unpatrolled","newPage","robot","anonymous","namespace","continent","country","region","city","added","deleted","delta"], + "dimensionsSpec": { + "dimensions": ["page", "language", "user", "unpatrolled", "newPage", "robot", "anonymous", "namespace", "continent", "country", "region", "city"], + "dimensionExclusions": [], + "spatialDimensions": [] + } + } +} \ No newline at end of file diff --git a/integration-tests/src/test/resources/stream/data/csv/serializer/serializer.json b/integration-tests/src/test/resources/stream/data/csv/serializer/serializer.json new file mode 100644 index 000000000000..b7dd0d8a0785 --- /dev/null +++ b/integration-tests/src/test/resources/stream/data/csv/serializer/serializer.json @@ -0,0 +1,3 @@ +{ + "type": "csv" +} \ No newline at end of file diff --git a/integration-tests/src/test/resources/stream/data/json/input_format/input_format.json b/integration-tests/src/test/resources/stream/data/json/input_format/input_format.json new file mode 100644 index 000000000000..cfb17e2dd200 --- /dev/null +++ b/integration-tests/src/test/resources/stream/data/json/input_format/input_format.json @@ -0,0 +1,3 @@ +{ + "type" : "json" +} \ No newline at end of file diff --git a/integration-tests/src/test/resources/stream/data/json/parser/input_row_parser.json b/integration-tests/src/test/resources/stream/data/json/parser/input_row_parser.json new file mode 100644 index 000000000000..b5fa892dee2e --- /dev/null +++ b/integration-tests/src/test/resources/stream/data/json/parser/input_row_parser.json @@ -0,0 +1,15 @@ +{ + "type": "string", + "parseSpec": { + "format": "json", + "timestampSpec": { + "column": "timestamp", + "format": "auto" + }, + "dimensionsSpec": { + "dimensions": ["page", "language", "user", "unpatrolled", "newPage", "robot", "anonymous", "namespace", "continent", "country", "region", "city"], + "dimensionExclusions": [], + "spatialDimensions": [] + } + } +} \ No newline at end of file diff --git a/integration-tests/src/test/resources/stream/data/json/serializer/serializer.json b/integration-tests/src/test/resources/stream/data/json/serializer/serializer.json new file mode 100644 index 000000000000..126d77baca55 --- /dev/null +++ b/integration-tests/src/test/resources/stream/data/json/serializer/serializer.json @@ -0,0 +1,3 @@ +{ + "type": "json" +} \ No newline at end of file diff --git a/integration-tests/src/test/resources/indexer/stream_supervisor_spec_input_format.json b/integration-tests/src/test/resources/stream/data/supervisor_spec_template.json similarity index 95% rename from integration-tests/src/test/resources/indexer/stream_supervisor_spec_input_format.json rename to integration-tests/src/test/resources/stream/data/supervisor_spec_template.json index ce9bedc84431..9943431a05a4 100644 --- a/integration-tests/src/test/resources/indexer/stream_supervisor_spec_input_format.json +++ b/integration-tests/src/test/resources/stream/data/supervisor_spec_template.json @@ -2,6 +2,7 @@ "type": "%%STREAM_TYPE%%", "dataSchema": { "dataSource": "%%DATASOURCE%%", + "parser": %%PARSER%%, "timestampSpec": { "column": "timestamp", "format": "auto" @@ -51,8 +52,6 @@ "replicas": 1, "taskDuration": "PT5M", "%%USE_EARLIEST_KEY%%": true, - "inputFormat" : { - "type" : "json" - } + "inputFormat" : %%INPUT_FORMAT%% } } diff --git a/integration-tests/src/test/resources/stream/data/tsv/input_format/input_format.json b/integration-tests/src/test/resources/stream/data/tsv/input_format/input_format.json new file mode 100644 index 000000000000..ece2968001ca --- /dev/null +++ b/integration-tests/src/test/resources/stream/data/tsv/input_format/input_format.json @@ -0,0 +1,4 @@ +{ + "type" : "tsv", + "columns": ["timestamp","page","language","user","unpatrolled","newPage","robot","anonymous","namespace","continent","country","region","city","added","deleted","delta"] +} \ No newline at end of file diff --git a/integration-tests/src/test/resources/stream/data/tsv/parser/input_row_parser.json b/integration-tests/src/test/resources/stream/data/tsv/parser/input_row_parser.json new file mode 100644 index 000000000000..c60a0c162b19 --- /dev/null +++ b/integration-tests/src/test/resources/stream/data/tsv/parser/input_row_parser.json @@ -0,0 +1,16 @@ +{ + "type": "string", + "parseSpec": { + "format": "tsv", + "timestampSpec": { + "column": "timestamp", + "format": "auto" + }, + "columns" : ["timestamp","page","language","user","unpatrolled","newPage","robot","anonymous","namespace","continent","country","region","city","added","deleted","delta"], + "dimensionsSpec": { + "dimensions": ["page", "language", "user", "unpatrolled", "newPage", "robot", "anonymous", "namespace", "continent", "country", "region", "city"], + "dimensionExclusions": [], + "spatialDimensions": [] + } + } +} \ No newline at end of file diff --git a/integration-tests/src/test/resources/stream/data/tsv/serializer/serializer.json b/integration-tests/src/test/resources/stream/data/tsv/serializer/serializer.json new file mode 100644 index 000000000000..731ad412a028 --- /dev/null +++ b/integration-tests/src/test/resources/stream/data/tsv/serializer/serializer.json @@ -0,0 +1,3 @@ +{ + "type": "tsv" +} \ No newline at end of file diff --git a/integration-tests/src/test/resources/indexer/stream_index_queries.json b/integration-tests/src/test/resources/stream/queries/stream_index_queries.json similarity index 100% rename from integration-tests/src/test/resources/indexer/stream_index_queries.json rename to integration-tests/src/test/resources/stream/queries/stream_index_queries.json diff --git a/integration-tests/src/test/resources/testng.xml b/integration-tests/src/test/resources/testng.xml index 5a0735a05782..88c64158978b 100644 --- a/integration-tests/src/test/resources/testng.xml +++ b/integration-tests/src/test/resources/testng.xml @@ -20,7 +20,7 @@ - +