Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 10 additions & 2 deletions .travis.yml
Original file line number Diff line number Diff line change
Expand Up @@ -337,6 +337,14 @@ jobs:
script: *run_integration_test
after_failure: *integration_test_diags

- &integration_kafka_format_tests
name: "(Compile=openjdk8, Run=openjdk8) Kafka index integration test with various formats"
jdk: openjdk8
services: *integration_test_services
env: TESTNG_GROUPS='-Dgroups=kafka-data-format' JVM_RUNTIME='-Djvm.runtime=8'
script: *run_integration_test
after_failure: *integration_test_diags

- &integration_query
name: "(Compile=openjdk8, Run=openjdk8) query integration test"
jdk: openjdk8
Expand Down Expand Up @@ -365,7 +373,7 @@ jobs:
name: "(Compile=openjdk8, Run=openjdk8) other integration test"
jdk: openjdk8
services: *integration_test_services
env: TESTNG_GROUPS='-DexcludedGroups=batch-index,perfect-rollup-parallel-batch-index,kafka-index,query,realtime-index,security,s3-deep-storage,gcs-deep-storage,azure-deep-storage,hdfs-deep-storage,s3-ingestion,kinesis-index,kafka-transactional-index,kafka-index-slow,kafka-transactional-index-slow' JVM_RUNTIME='-Djvm.runtime=8'
env: TESTNG_GROUPS='-DexcludedGroups=batch-index,perfect-rollup-parallel-batch-index,kafka-index,query,realtime-index,security,s3-deep-storage,gcs-deep-storage,azure-deep-storage,hdfs-deep-storage,s3-ingestion,kinesis-index,kinesis-data-format,kafka-transactional-index,kafka-index-slow,kafka-transactional-index-slow,kafka-data-format' JVM_RUNTIME='-Djvm.runtime=8'
script: *run_integration_test
after_failure: *integration_test_diags
# END - Integration tests for Compile with Java 8 and Run with Java 8
Expand Down Expand Up @@ -399,7 +407,7 @@ jobs:
- <<: *integration_tests
name: "(Compile=openjdk8, Run=openjdk11) other integration test"
jdk: openjdk8
env: TESTNG_GROUPS='-DexcludedGroups=batch-index,perfect-rollup-parallel-batch-index,kafka-index,query,realtime-index,security,s3-deep-storage,gcs-deep-storage,azure-deep-storage,hdfs-deep-storage,s3-ingestion,kinesis-index,kafka-transactional-index,kafka-index-slow,kafka-transactional-index-slow' JVM_RUNTIME='-Djvm.runtime=11'
env: TESTNG_GROUPS='-DexcludedGroups=batch-index,perfect-rollup-parallel-batch-index,kafka-index,query,realtime-index,security,s3-deep-storage,gcs-deep-storage,azure-deep-storage,hdfs-deep-storage,s3-ingestion,kinesis-index,kinesis-data-format,kafka-transactional-index,kafka-index-slow,kafka-transactional-index-slow,kafka-data-format' JVM_RUNTIME='-Djvm.runtime=11'
# END - Integration tests for Compile with Java 8 and Run with Java 11

- name: "security vulnerabilities"
Expand Down
5 changes: 4 additions & 1 deletion integration-tests/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -319,7 +319,10 @@ Refer ITIndexerTest as an example on how to use dependency Injection
By default, test methods in a test class will be run in sequential order one at a time. Test methods for a given test
class can be set to run in parallel (multiple test methods of each class running at the same time) by excluding
the given class/package from the "AllSerializedTests" test tag section and including it in the "AllParallelizedTests"
test tag section in integration-tests/src/test/resources/testng.xml
test tag section in integration-tests/src/test/resources/testng.xml. TestNG uses two parameters, i.e.,
`thread-count` and `data-provider-thread-count`, for parallel test execution, which are set to 2 for Druid integration tests.
You may want to modify those values for faster execution.
See https://testng.org/doc/documentation-main.html#parallel-running and https://testng.org/doc/documentation-main.html#parameters-dataproviders for details.
Please be mindful when adding tests to the "AllParallelizedTests" test tag that the tests can run in parallel with
other tests from the same class at the same time. i.e. test does not modify/restart/stop the druid cluster or other dependency containers,
test does not use excessive memory starving other concurrent tasks, test does not modify and/or use other tasks,
Expand Down
14 changes: 14 additions & 0 deletions integration-tests/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,14 @@
<artifactId>amazon-kinesis-producer</artifactId>
<version>0.13.1</version>
</dependency>
<dependency>
<groupId>org.apache.avro</groupId>
<artifactId>avro</artifactId>
</dependency>
<dependency>
<groupId>com.opencsv</groupId>
<artifactId>opencsv</artifactId>
</dependency>
<dependency>
<groupId>com.amazonaws</groupId>
<artifactId>aws-java-sdk-kinesis</artifactId>
Expand Down Expand Up @@ -81,6 +89,12 @@
<version>${project.parent.version}</version>
<scope>runtime</scope>
</dependency>
<dependency>
<groupId>org.apache.druid.extensions</groupId>
<artifactId>druid-avro-extensions</artifactId>
<version>${project.parent.version}</version>
<scope>runtime</scope>
</dependency>
<dependency>
<groupId>org.apache.druid.extensions</groupId>
<artifactId>druid-s3-extensions</artifactId>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -253,7 +253,7 @@ public String submitSupervisor(String spec)
).get();
if (!response.getStatus().equals(HttpResponseStatus.OK)) {
throw new ISE(
"Error while submitting supervisor to overlord, response [%s %s]",
"Error while submitting supervisor to overlord, response [%s: %s]",
response.getStatus(),
response.getContent()
);
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,131 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

package org.apache.druid.testing.utils;

import com.google.common.collect.BiMap;
import com.google.common.collect.HashBiMap;
import org.apache.avro.Schema;
import org.apache.avro.SchemaBuilder;
import org.apache.avro.generic.GenericDatumWriter;
import org.apache.avro.generic.GenericRecord;
import org.apache.avro.io.BinaryEncoder;
import org.apache.avro.io.DatumWriter;
import org.apache.avro.io.EncoderFactory;
import org.apache.druid.java.util.common.Pair;

import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/**
 * {@link EventSerializer} that encodes a generated test event as an Avro binary record
 * using a fixed "wikipedia" schema. Used by stream ingestion integration tests to feed
 * Avro-formatted data to Kafka/Kinesis.
 */
public class AvroEventSerializer implements EventSerializer
{
  public static final String TYPE = "avro";

  // Fixed schema matching the wikipedia sample dataset used by the stream ingestion tests.
  // All string dimensions are required, followed by three required int metrics.
  private static final Schema SCHEMA = SchemaBuilder
      .record("wikipedia")
      .namespace("org.apache.druid")
      .fields()
      .requiredString("timestamp")
      .requiredString("page")
      .requiredString("language")
      .requiredString("user")
      .requiredString("unpatrolled")
      .requiredString("newPage")
      .requiredString("robot")
      .requiredString("anonymous")
      .requiredString("namespace")
      .requiredString("continent")
      .requiredString("country")
      .requiredString("region")
      .requiredString("city")
      .requiredInt("added")
      .requiredInt("deleted")
      .requiredInt("delta")
      .endRecord();

  // GenericDatumWriter is reusable across serialize() calls; it is bound to SCHEMA once.
  private final DatumWriter<Object> writer = new GenericDatumWriter<>(SCHEMA);

  /**
   * Serializes the given (field name, value) pairs into Avro binary form.
   *
   * @param event ordered list of field name/value pairs for one record
   * @return the Avro binary encoding of the record
   * @throws IOException if the Avro encoder fails to write
   */
  @Override
  public byte[] serialize(List<Pair<String, Object>> event) throws IOException
  {
    // Copy the pairs into a GenericRecord, then run it through the binary encoder.
    final WikipediaRecord record = new WikipediaRecord();
    event.forEach(pair -> record.put(pair.lhs, pair.rhs));
    final ByteArrayOutputStream out = new ByteArrayOutputStream();
    final BinaryEncoder encoder = EncoderFactory.get().binaryEncoder(out, null);
    writer.write(record, encoder);
    encoder.flush();
    out.close(); // no-op for ByteArrayOutputStream; kept for symmetry
    return out.toByteArray();
  }

  @Override
  public void close()
  {
    // Nothing to release; the datum writer holds no external resources.
  }

  /**
   * A {@link GenericRecord} backed by a name-to-value map. Positional indexes are
   * assigned lazily in the order fields are first put by name.
   *
   * NOTE(review): get(int)/put(int, ...) resolve positions via insertion order of
   * put(String, ...) calls, not via SCHEMA field positions — this assumes events are
   * always supplied in schema field order. TODO confirm with the event generator.
   */
  private static class WikipediaRecord implements GenericRecord
  {
    // Field name -> value for the current record.
    private final Map<String, Object> event = new HashMap<>();
    // Bidirectional position <-> field name mapping, built in insertion order.
    private final BiMap<Integer, String> indexes = HashBiMap.create(SCHEMA.getFields().size());

    // Next positional index to hand out for a previously unseen field name.
    private int nextIndex = 0;

    @Override
    public void put(String key, Object v)
    {
      event.put(key, v);
      // Assign a stable index the first time a field name is seen.
      indexes.inverse().computeIfAbsent(key, k -> nextIndex++);
    }

    @Override
    public Object get(String key)
    {
      return event.get(key);
    }

    @Override
    public void put(int i, Object v)
    {
      final String key = indexes.get(i);
      if (key == null) {
        throw new IndexOutOfBoundsException();
      }
      put(key, v);
    }

    @Override
    public Object get(int i)
    {
      final String key = indexes.get(i);
      if (key == null) {
        throw new IndexOutOfBoundsException();
      }
      return get(key);
    }

    @Override
    public Schema getSchema()
    {
      return SCHEMA;
    }
  }
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,57 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

package org.apache.druid.testing.utils;

import com.opencsv.CSVWriter;
import org.apache.druid.java.util.common.Pair;

import java.io.BufferedWriter;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.OutputStreamWriter;
import java.nio.charset.StandardCharsets;
import java.util.List;

/**
 * {@link EventSerializer} that renders a generated test event as a single CSV row.
 * The underlying stream and writer are reused across calls; the byte buffer is
 * drained and reset after each serialized row.
 */
public class CsvEventSerializer implements EventSerializer
{
  public static final String TYPE = "csv";

  // Reusable in-memory sink; reset after every row so each call yields one row's bytes.
  private final ByteArrayOutputStream buffer = new ByteArrayOutputStream();
  private final CSVWriter csvWriter = new CSVWriter(
      new BufferedWriter(new OutputStreamWriter(buffer, StandardCharsets.UTF_8))
  );

  /**
   * Serializes the event values (field names are dropped) as one CSV row.
   *
   * @param event ordered list of field name/value pairs
   * @return UTF-8 bytes of the CSV row, including the row terminator
   * @throws IOException if flushing the writer fails
   */
  @Override
  public byte[] serialize(List<Pair<String, Object>> event) throws IOException
  {
    final String[] columns = new String[event.size()];
    for (int i = 0; i < columns.length; i++) {
      //noinspection ConstantConditions
      columns[i] = event.get(i).rhs.toString();
    }
    csvWriter.writeNext(columns);
    csvWriter.flush();
    final byte[] row = buffer.toByteArray();
    buffer.reset();
    return row;
  }

  @Override
  public void close() throws IOException
  {
    csvWriter.close();
  }
}
Original file line number Diff line number Diff line change
Expand Up @@ -19,37 +19,25 @@

package org.apache.druid.testing.utils;

import org.apache.druid.java.util.common.DateTimes;
import org.joda.time.DateTime;
import org.apache.druid.java.util.common.Pair;
import org.apache.druid.java.util.common.StringUtils;

import java.util.UUID;
import java.util.List;
import java.util.stream.Collectors;

public class StreamVerifierEventGenerator extends SyntheticStreamGenerator
public class DelimitedEventSerializer implements EventSerializer
{
public StreamVerifierEventGenerator(int eventsPerSeconds, long cyclePaddingMs)
{
super(eventsPerSeconds, cyclePaddingMs);
}
public static final String TYPE = "tsv";

@Override
Object getEvent(int i, DateTime timestamp)
public byte[] serialize(List<Pair<String, Object>> event)
{
return StreamVerifierSyntheticEvent.of(
UUID.randomUUID().toString(),
timestamp.getMillis(),
DateTimes.nowUtc().getMillis(),
i,
i == getEventsPerSecond() ? getSumOfEventSequence(getEventsPerSecond()) : null,
i == 1
);
//noinspection ConstantConditions
return StringUtils.toUtf8(event.stream().map(pair -> pair.rhs.toString()).collect(Collectors.joining("\t")));
}


/**
* Assumes the first number in the sequence is 1, incrementing by 1, until numEvents.
*/
private long getSumOfEventSequence(int numEvents)
@Override
public void close()
{
return (numEvents * (1 + numEvents)) / 2;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,50 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

package org.apache.druid.testing.utils;

import com.fasterxml.jackson.annotation.JsonSubTypes;
import com.fasterxml.jackson.annotation.JsonSubTypes.Type;
import com.fasterxml.jackson.annotation.JsonTypeInfo;
import com.fasterxml.jackson.annotation.JsonTypeInfo.Id;
import org.apache.druid.java.util.common.Pair;

import java.io.Closeable;
import java.io.IOException;
import java.util.List;

/**
* EventSerializer is for serializing an event into a byte array.
* This interface is used to write generated events on stream processing systems such as Kafka or Kinesis
* in integration tests.
*
* @see SyntheticStreamGenerator
* @see StreamEventWriter
*/
@JsonTypeInfo(use = Id.NAME, property = "type")
@JsonSubTypes(value = {
    @Type(name = JsonEventSerializer.TYPE, value = JsonEventSerializer.class),
    @Type(name = CsvEventSerializer.TYPE, value = CsvEventSerializer.class),
    @Type(name = DelimitedEventSerializer.TYPE, value = DelimitedEventSerializer.class),
    @Type(name = AvroEventSerializer.TYPE, value = AvroEventSerializer.class)
})
public interface EventSerializer extends Closeable
{
  /**
   * Serializes one event, represented as an ordered list of (field name, value) pairs,
   * into a byte array suitable for writing to a stream.
   *
   * @param event ordered field name/value pairs for a single record
   * @return the serialized record bytes
   * @throws IOException if encoding fails
   */
  byte[] serialize(List<Pair<String, Object>> event) throws IOException;
}
Loading