Skip to content
This repository has been archived by the owner on May 12, 2021. It is now read-only.

METRON-2301 Building Against Wrong Storm Flux Version #1544

Closed
Show file tree
Hide file tree
Changes from 4 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
Expand Up @@ -72,7 +72,7 @@
<dependency>
<groupId>org.apache.storm</groupId>
<artifactId>flux-core</artifactId>
<version>${global_flux_version}</version>
<version>${global_storm_version}</version>
</dependency>
<dependency>
<groupId>org.apache.zookeeper</groupId>
Expand Down
Expand Up @@ -62,7 +62,7 @@
<dependency>
<groupId>org.apache.storm</groupId>
<artifactId>flux-core</artifactId>
<version>${global_flux_version}</version>
<version>${global_storm_version}</version>
</dependency>
<dependency>
<groupId>org.apache.storm</groupId>
Expand Down
2 changes: 1 addition & 1 deletion metron-platform/metron-integration-test/pom.xml
Expand Up @@ -45,7 +45,7 @@
<dependency>
<groupId>org.apache.storm</groupId>
<artifactId>flux-core</artifactId>
<version>${global_flux_version}</version>
<version>${global_storm_version}</version>
</dependency>
<dependency>
<groupId>org.apache.curator</groupId>
Expand Down
Expand Up @@ -17,22 +17,6 @@
*/
package org.apache.metron.integration.components;

import java.io.File;
import java.io.FileOutputStream;
import java.io.FileWriter;
import java.io.IOException;
import java.io.OutputStreamWriter;
import java.io.Writer;
import java.lang.invoke.MethodHandles;
import java.lang.reflect.InvocationTargetException;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.util.Comparator;
import java.util.Map;
import java.util.Properties;
import java.util.regex.Pattern;
import org.apache.commons.io.FileUtils;
import org.apache.curator.RetryPolicy;
import org.apache.curator.framework.CuratorFramework;
Expand All @@ -55,6 +39,19 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.File;
import java.io.IOException;
import java.io.StringReader;
import java.lang.invoke.MethodHandles;
import java.lang.reflect.InvocationTargetException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.util.Comparator;
import java.util.Map;
import java.util.Properties;
import java.util.regex.Pattern;

public class FluxTopologyComponent implements InMemoryComponent {

protected static final Logger LOG = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
Expand Down Expand Up @@ -249,7 +246,7 @@ public void submitTopology() throws NoSuchMethodException, IOException, Instanti
}

private void startTopology(String topologyName, File topologyLoc, File templateFile, Properties properties) throws IOException, ClassNotFoundException, NoSuchMethodException, InvocationTargetException, InstantiationException, IllegalAccessException, TException, NoSuchFieldException{
TopologyDef topologyDef = loadYaml(topologyName, topologyLoc, templateFile, properties);
TopologyDef topologyDef = loadYaml(topologyLoc, templateFile, properties);
Config conf = FluxBuilder.buildConfig(topologyDef);
ExecutionContext context = new ExecutionContext(topologyDef, conf);
StormTopology topology = FluxBuilder.buildTopology(context);
Expand All @@ -267,26 +264,30 @@ private void startTopology(String topologyName, File topologyLoc, File templateF
}
}

private static TopologyDef loadYaml(String topologyName, File yamlFile, File templateFile, Properties properties) throws IOException {
File tmpFile = File.createTempFile(topologyName, "props");
tmpFile.deleteOnExit();
/**
* Creates a Storm topology.
* @param yamlFile The Flux file defining the topology.
* @param templateFile The template file used by the Mpack to create the topology's properties. For example, 'enrichment.properties.j2'.
* @param properties The topology properties.
* @return The Storm topology.
* @throws IOException
*/
private static TopologyDef loadYaml(File yamlFile, File templateFile, Properties properties) throws IOException {
Properties topologyProperties;
if (templateFile != null) {
try (Writer propWriter = new OutputStreamWriter(new FileOutputStream(tmpFile), StandardCharsets.UTF_8)){
String templateContents = FileUtils.readFileToString(templateFile);
for(Map.Entry prop: properties.entrySet()) {
String replacePattern = String.format("{{%s}}", prop.getKey());
templateContents = templateContents.replaceAll(Pattern.quote(replacePattern), (String) prop.getValue());
}
propWriter.write(templateContents);
propWriter.flush();
return FluxParser.parseFile(yamlFile.getAbsolutePath(), false, true, tmpFile.getAbsolutePath(), false);
// use the MPack template file (like 'enrichment.properties.j2') to generate the topology properties
String templateContents = FileUtils.readFileToString(templateFile);
for(Map.Entry prop: properties.entrySet()) {
String replacePattern = String.format("{{%s}}", prop.getKey());
templateContents = templateContents.replaceAll(Pattern.quote(replacePattern), (String) prop.getValue());
}
topologyProperties = new Properties();
topologyProperties.load(new StringReader(templateContents));
return FluxParser.parseFile(yamlFile.getAbsolutePath(), false, true, topologyProperties, false);

} else {
try (Writer propWriter = new OutputStreamWriter(new FileOutputStream(tmpFile), StandardCharsets.UTF_8)){
properties.store(propWriter, topologyName + " properties");
return FluxParser.parseFile(yamlFile.getAbsolutePath(), false, true, tmpFile.getAbsolutePath(), false);
}
// otherwise, just use the properties directly
return FluxParser.parseFile(yamlFile.getAbsolutePath(), false, true, properties, false);
}

}
}
2 changes: 1 addition & 1 deletion metron-platform/metron-pcap-backend/pom.xml
Expand Up @@ -56,7 +56,7 @@
<dependency>
<groupId>org.apache.storm</groupId>
<artifactId>flux-core</artifactId>
<version>${global_flux_version}</version>
<version>${global_storm_version}</version>
</dependency>
<dependency>
<groupId>org.apache.metron</groupId>
Expand Down
2 changes: 1 addition & 1 deletion metron-platform/metron-solr/metron-solr-storm/pom.xml
Expand Up @@ -114,7 +114,7 @@
<dependency>
<groupId>org.apache.storm</groupId>
<artifactId>flux-core</artifactId>
<version>${global_flux_version}</version>
<version>${global_storm_version}</version>
</dependency>
<dependency>
<groupId>org.apache.storm</groupId>
Expand Down
4 changes: 1 addition & 3 deletions pom.xml
Expand Up @@ -87,8 +87,6 @@
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
<project.reporting.outputEncoding>UTF-8</project.reporting.outputEncoding>
<!-- base project versions -->
<base_storm_version>1.0.1</base_storm_version>
<base_flux_version>1.0.1</base_flux_version>
<base_kafka_version>0.10.0</base_kafka_version>
<base_hadoop_version>2.7.1</base_hadoop_version>
<base_flume_version>1.5.2</base_flume_version>
Expand All @@ -102,7 +100,6 @@
<global_hbase_guava_version>12.0</global_hbase_guava_version>
<global_storm_version>1.0.3</global_storm_version>
<global_storm_kafka_version>1.2.2</global_storm_kafka_version>
<global_flux_version>${base_flux_version}</global_flux_version>
<global_pcap_version>1.7.1</global_pcap_version>
<global_hadoop_version>${base_hadoop_version}</global_hadoop_version>
<global_flume_version>${base_flume_version}</global_flume_version>
Expand Down Expand Up @@ -165,6 +162,7 @@
<global_hbase_version>1.1.1</global_hbase_version>
<global_hbase_guava_version>12.0</global_hbase_guava_version>
<global_storm_kafka_version>1.2.2</global_storm_kafka_version>
<base_storm_version>1.0.1</base_storm_version>
<global_storm_version>${base_storm_version}.${hdp_version}-${build_number}</global_storm_version>
<global_kafka_version>${base_kafka_version}.${hdp_version}-${build_number}</global_kafka_version>
<global_zeppelin_version>0.7.3</global_zeppelin_version>
Expand Down