Skip to content
This repository has been archived by the owner on May 12, 2021. It is now read-only.

METRON-672: SolrIndexingIntegrationTest fails intermittently #424

Closed
wants to merge 4 commits into from
Closed
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@
import java.util.Map;
import java.util.Optional;
import java.util.Properties;
import java.util.function.Function;

import static org.apache.metron.common.configuration.ConfigurationsUtils.*;

Expand All @@ -38,6 +39,7 @@ public class ConfigUploadComponent implements InMemoryComponent {
private String enrichmentConfigsPath;
private String indexingConfigsPath;
private String profilerConfigPath;
private Optional<Function<ConfigUploadComponent, Void>> postStartCallback = Optional.empty();
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Could this just use Consumer instead of Function? Since the second type parameter is Void, it seems like the Function is just being a Consumer anyway

private Optional<String> globalConfig = Optional.empty();
private Map<String, SensorParserConfig> parserSensorConfigs = new HashMap<>();
public ConfigUploadComponent withTopologyProperties(Properties topologyProperties) {
Expand Down Expand Up @@ -78,6 +80,47 @@ public ConfigUploadComponent withGlobalConfig(String globalConfig) {
return this;
}

public ConfigUploadComponent withPostStartCallback(Function<ConfigUploadComponent, Void> f) {
this.postStartCallback = Optional.ofNullable(f);
return this;
}

public Properties getTopologyProperties() {
return topologyProperties;
}

public String getGlobalConfigPath() {
return globalConfigPath;
}

public String getParserConfigsPath() {
return parserConfigsPath;
}

public String getEnrichmentConfigsPath() {
return enrichmentConfigsPath;
}

public String getIndexingConfigsPath() {
return indexingConfigsPath;
}

public String getProfilerConfigPath() {
return profilerConfigPath;
}

public Optional<Function<ConfigUploadComponent, Void>> getPostStartCallback() {
return postStartCallback;
}

public Optional<String> getGlobalConfig() {
return globalConfig;
}

public Map<String, SensorParserConfig> getParserSensorConfigs() {
return parserSensorConfigs;
}

@Override
public void start() throws UnableToStartException {
try {
Expand All @@ -99,6 +142,9 @@ public void start() throws UnableToStartException {
if(globalConfig.isPresent()) {
writeGlobalConfigToZookeeper(globalConfig.get().getBytes(), zookeeperUrl);
}
if(postStartCallback.isPresent()) {
postStartCallback.get().apply(this);
}

} catch (Exception e) {
throw new UnableToStartException(e.getMessage(), e);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,8 +21,10 @@
import com.fasterxml.jackson.core.type.TypeReference;
import com.google.common.base.Function;
import com.google.common.collect.Iterables;
import org.apache.curator.framework.CuratorFramework;
import org.apache.metron.TestConstants;
import org.apache.metron.common.Constants;
import org.apache.metron.common.configuration.ConfigurationsUtils;
import org.apache.metron.common.interfaces.FieldNameConverter;
import org.apache.metron.common.spout.kafka.SpoutConfig;
import org.apache.metron.common.utils.JSONUtils;
Expand All @@ -37,13 +39,18 @@
import org.apache.metron.integration.components.ZKServerComponent;
import org.apache.metron.integration.utils.TestUtils;
import org.apache.storm.hdfs.bolt.rotation.TimedRotationPolicy;
import org.apache.zookeeper.KeeperException;
import org.junit.Assert;
import org.junit.Test;

import javax.annotation.Nullable;
import java.io.ByteArrayInputStream;
import java.io.File;
import java.io.IOException;
import java.util.*;
import java.util.concurrent.atomic.AtomicBoolean;

import static org.apache.metron.common.configuration.ConfigurationsUtils.getClient;

public abstract class IndexingIntegrationTest extends BaseIntegrationTest {
protected String hdfsDir = "target/indexingIntegrationTest/hdfs";
Expand Down Expand Up @@ -139,11 +146,22 @@ public void test() throws Exception {
inputDocs.add(m);

}
final AtomicBoolean isLoaded = new AtomicBoolean(false);
ConfigUploadComponent configUploadComponent = new ConfigUploadComponent()
.withTopologyProperties(topologyProperties)
.withGlobalConfigsPath(TestConstants.SAMPLE_CONFIG_PATH)
.withEnrichmentConfigsPath(TestConstants.SAMPLE_CONFIG_PATH)
.withIndexingConfigsPath(TestConstants.SAMPLE_CONFIG_PATH)
.withPostStartCallback(component -> {
try {
waitForIndex(component.getTopologyProperties().getProperty(ZKServerComponent.ZOOKEEPER_PROPERTY));
} catch (Exception e) {
e.printStackTrace();
}
isLoaded.set(true);
return null;
}
);
;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can you kill the extra semicolon?

FluxTopologyComponent fluxComponent = new FluxTopologyComponent.Builder()
.withTopologyLocation(new File(fluxPath))
Expand All @@ -166,10 +184,11 @@ public void test() throws Exception {
runner.start();

try {
while(!isLoaded.get()) {
Thread.sleep(100);
}
fluxComponent.submitTopology();

kafkaComponent.writeMessages(Constants.INDEXING_TOPIC, inputMessages);
StringBuffer buffer = new StringBuffer();
List<Map<String, Object>> docs = cleanDocs(runner.process(getProcessor(inputMessages)));
Assert.assertEquals(docs.size(), inputMessages.size());
//assert that our input docs are equivalent to the output docs, converting the input docs keys based
Expand All @@ -184,6 +203,26 @@ public void test() throws Exception {
}
}

private void waitForIndex(String zookeeperQuorum) throws Exception {
try(CuratorFramework client = getClient(zookeeperQuorum)) {
client.start();
byte[] bytes = null;
do {
try {
bytes = ConfigurationsUtils.readSensorIndexingConfigBytesFromZookeeper(testSensorType, client);
Thread.sleep(1000);
}
catch(KeeperException.NoNodeException nne) {
//kindly ignore because the path might not exist just yet.
}
}
while(bytes == null || bytes.length == 0);
return;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Drop the return, since it's a void method.

}


}

public List<Map<String, Object>> cleanDocs(ProcessorResult<List<Map<String, Object>>> result) {
List<Map<String,Object>> docs = result.getResult();
StringBuffer buffer = new StringBuffer();
Expand Down