Skip to content
This repository has been archived by the owner on May 12, 2021. It is now read-only.

METRON-672: SolrIndexingIntegrationTest fails intermittently #424

Closed
wants to merge 4 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,8 @@
import java.util.Map;
import java.util.Optional;
import java.util.Properties;
import java.util.function.Consumer;
import java.util.function.Function;

import static org.apache.metron.common.configuration.ConfigurationsUtils.*;

Expand All @@ -38,6 +40,7 @@ public class ConfigUploadComponent implements InMemoryComponent {
private String enrichmentConfigsPath;
private String indexingConfigsPath;
private String profilerConfigPath;
private Optional<Consumer<ConfigUploadComponent>> postStartCallback = Optional.empty();
private Optional<String> globalConfig = Optional.empty();
private Map<String, SensorParserConfig> parserSensorConfigs = new HashMap<>();
public ConfigUploadComponent withTopologyProperties(Properties topologyProperties) {
Expand Down Expand Up @@ -78,6 +81,47 @@ public ConfigUploadComponent withGlobalConfig(String globalConfig) {
return this;
}

public ConfigUploadComponent withPostStartCallback(Consumer<ConfigUploadComponent> f) {
this.postStartCallback = Optional.ofNullable(f);
return this;
}

public Properties getTopologyProperties() {
return topologyProperties;
}

public String getGlobalConfigPath() {
return globalConfigPath;
}

public String getParserConfigsPath() {
return parserConfigsPath;
}

public String getEnrichmentConfigsPath() {
return enrichmentConfigsPath;
}

public String getIndexingConfigsPath() {
return indexingConfigsPath;
}

public String getProfilerConfigPath() {
return profilerConfigPath;
}

public Optional<Consumer<ConfigUploadComponent>> getPostStartCallback() {
return postStartCallback;
}

public Optional<String> getGlobalConfig() {
return globalConfig;
}

public Map<String, SensorParserConfig> getParserSensorConfigs() {
return parserSensorConfigs;
}

@Override
public void start() throws UnableToStartException {
try {
Expand All @@ -99,6 +143,9 @@ public void start() throws UnableToStartException {
if(globalConfig.isPresent()) {
writeGlobalConfigToZookeeper(globalConfig.get().getBytes(), zookeeperUrl);
}
if(postStartCallback.isPresent()) {
postStartCallback.get().accept(this);
}

} catch (Exception e) {
throw new UnableToStartException(e.getMessage(), e);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,8 +21,10 @@
import com.fasterxml.jackson.core.type.TypeReference;
import com.google.common.base.Function;
import com.google.common.collect.Iterables;
import org.apache.curator.framework.CuratorFramework;
import org.apache.metron.TestConstants;
import org.apache.metron.common.Constants;
import org.apache.metron.common.configuration.ConfigurationsUtils;
import org.apache.metron.common.interfaces.FieldNameConverter;
import org.apache.metron.common.spout.kafka.SpoutConfig;
import org.apache.metron.common.utils.JSONUtils;
Expand All @@ -37,13 +39,18 @@
import org.apache.metron.integration.components.ZKServerComponent;
import org.apache.metron.integration.utils.TestUtils;
import org.apache.storm.hdfs.bolt.rotation.TimedRotationPolicy;
import org.apache.zookeeper.KeeperException;
import org.junit.Assert;
import org.junit.Test;

import javax.annotation.Nullable;
import java.io.ByteArrayInputStream;
import java.io.File;
import java.io.IOException;
import java.util.*;
import java.util.concurrent.atomic.AtomicBoolean;

import static org.apache.metron.common.configuration.ConfigurationsUtils.getClient;

public abstract class IndexingIntegrationTest extends BaseIntegrationTest {
protected String hdfsDir = "target/indexingIntegrationTest/hdfs";
Expand Down Expand Up @@ -139,12 +146,22 @@ public void test() throws Exception {
inputDocs.add(m);

}
final AtomicBoolean isLoaded = new AtomicBoolean(false);
ConfigUploadComponent configUploadComponent = new ConfigUploadComponent()
.withTopologyProperties(topologyProperties)
.withGlobalConfigsPath(TestConstants.SAMPLE_CONFIG_PATH)
.withEnrichmentConfigsPath(TestConstants.SAMPLE_CONFIG_PATH)
.withIndexingConfigsPath(TestConstants.SAMPLE_CONFIG_PATH)
;
.withPostStartCallback(component -> {
try {
waitForIndex(component.getTopologyProperties().getProperty(ZKServerComponent.ZOOKEEPER_PROPERTY));
} catch (Exception e) {
e.printStackTrace();
}
isLoaded.set(true);
}
);

FluxTopologyComponent fluxComponent = new FluxTopologyComponent.Builder()
.withTopologyLocation(new File(fluxPath))
.withTopologyName("test")
Expand All @@ -166,10 +183,11 @@ public void test() throws Exception {
runner.start();

try {
while(!isLoaded.get()) {
Thread.sleep(100);
}
fluxComponent.submitTopology();

kafkaComponent.writeMessages(Constants.INDEXING_TOPIC, inputMessages);
StringBuffer buffer = new StringBuffer();
List<Map<String, Object>> docs = cleanDocs(runner.process(getProcessor(inputMessages)));
Assert.assertEquals(docs.size(), inputMessages.size());
//assert that our input docs are equivalent to the output docs, converting the input docs keys based
Expand All @@ -184,6 +202,23 @@ public void test() throws Exception {
}
}

private void waitForIndex(String zookeeperQuorum) throws Exception {
try(CuratorFramework client = getClient(zookeeperQuorum)) {
client.start();
byte[] bytes = null;
do {
try {
bytes = ConfigurationsUtils.readSensorIndexingConfigBytesFromZookeeper(testSensorType, client);
Thread.sleep(1000);
}
catch(KeeperException.NoNodeException nne) {
//kindly ignore because the path might not exist just yet.
}
}
while(bytes == null || bytes.length == 0);
}
}

public List<Map<String, Object>> cleanDocs(ProcessorResult<List<Map<String, Object>>> result) {
List<Map<String,Object>> docs = result.getResult();
StringBuffer buffer = new StringBuffer();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,13 +33,7 @@ protected static KafkaComponent getKafkaComponent(final Properties topologyPrope

protected static ZKServerComponent getZKServerComponent(final Properties topologyProperties) {
return new ZKServerComponent()
.withPostStartCallback(new Function<ZKServerComponent, Void>() {
@Nullable
@Override
public Void apply(@Nullable ZKServerComponent zkComponent) {
topologyProperties.setProperty(ZKServerComponent.ZOOKEEPER_PROPERTY, zkComponent.getConnectionString());
return null;
}
});
.withPostStartCallback((zkComponent) -> topologyProperties.setProperty(ZKServerComponent.ZOOKEEPER_PROPERTY, zkComponent.getConnectionString())
);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -23,40 +23,43 @@
import org.apache.metron.integration.UnableToStartException;
import org.apache.curator.test.TestingServer;
import java.util.Map;
public class ZKServerComponent implements InMemoryComponent{
public static final String ZOOKEEPER_PROPERTY = "kafka.zk";
private TestingServer testZkServer;
private String zookeeperUrl = null;
private Map<String,String> properties = null;
private Function<ZKServerComponent, Void> postStartCallback;
public String getConnectionString()
{
return this.zookeeperUrl;
}
public ZKServerComponent withPostStartCallback(Function<ZKServerComponent, Void> f) {
postStartCallback = f;
return this;
}
import java.util.Optional;
import java.util.function.Consumer;

@Override
public void start() throws UnableToStartException {
try {
testZkServer = new TestingServer(true);
zookeeperUrl = testZkServer.getConnectString();
if(postStartCallback != null) {
postStartCallback.apply(this);
}
}catch(Exception e){
throw new UnableToStartException("Unable to start TestingServer",e);
}
}
public class ZKServerComponent implements InMemoryComponent {
public static final String ZOOKEEPER_PROPERTY = "kafka.zk";
private TestingServer testZkServer;
private String zookeeperUrl = null;
private Map<String,String> properties = null;
private Optional<Consumer<ZKServerComponent>> postStartCallback = Optional.empty();
public String getConnectionString()
{
return this.zookeeperUrl;
}
public ZKServerComponent withPostStartCallback(Consumer<ZKServerComponent> f) {
postStartCallback = Optional.ofNullable(f);
return this;
}

@Override
public void stop() {
try {
if (testZkServer != null) {
testZkServer.close();
}
}catch(Exception e){}
@Override
public void start() throws UnableToStartException {
try {
testZkServer = new TestingServer(true);
zookeeperUrl = testZkServer.getConnectString();
if(postStartCallback.isPresent()) {
postStartCallback.get().accept(this);
}
}catch(Exception e){
throw new UnableToStartException("Unable to start TestingServer",e);
}
}

@Override
public void stop() {
try {
if (testZkServer != null) {
testZkServer.close();
}
}catch(Exception e){}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -215,14 +215,9 @@ public void testTopology(Function<Properties, Void> updatePropertiesCallback
}};
updatePropertiesCallback.apply(topologyProperties);

final ZKServerComponent zkServerComponent = new ZKServerComponent().withPostStartCallback(new Function<ZKServerComponent, Void>() {
@Nullable
@Override
public Void apply(@Nullable ZKServerComponent zkComponent) {
topologyProperties.setProperty(ZKServerComponent.ZOOKEEPER_PROPERTY, zkComponent.getConnectionString());
return null;
}
});
final ZKServerComponent zkServerComponent = new ZKServerComponent().withPostStartCallback(
(zkComponent) -> topologyProperties.setProperty(ZKServerComponent.ZOOKEEPER_PROPERTY, zkComponent.getConnectionString())
);
final KafkaComponent kafkaComponent = new KafkaComponent().withTopics(new ArrayList<KafkaComponent.Topic>() {{
add(new KafkaComponent.Topic(KAFKA_TOPIC, 1));
}}).withTopologyProperties(topologyProperties);
Expand Down