From 1fee540969b2f192e58805cdca1ce0a666759af6 Mon Sep 17 00:00:00 2001 From: Michael Miklavcic Date: Tue, 7 Aug 2018 13:29:41 -0600 Subject: [PATCH 1/2] Handle null values in config backend more gracefully --- .../common/configuration/ConfigOption.java | 32 +++-- .../configuration/ConfigOptionTest.java | 112 ++++++++++++++++++ .../apache/metron/pcap/query/CliParser.java | 25 ++-- .../org/apache/metron/pcap/PcapJobTest.java | 23 ++++ .../apache/metron/pcap/query/PcapCliTest.java | 10 +- .../pcap/config/PcapGlobalDefaults.java | 28 +++++ .../metron/pcap/finalizer/PcapFinalizer.java | 8 +- .../pcap/finalizer/PcapRestFinalizer.java | 11 +- .../org/apache/metron/pcap/mr/PcapJob.java | 16 ++- 9 files changed, 232 insertions(+), 33 deletions(-) create mode 100644 metron-platform/metron-common/src/test/java/org/apache/metron/common/configuration/ConfigOptionTest.java create mode 100644 metron-platform/metron-pcap/src/main/java/org/apache/metron/pcap/config/PcapGlobalDefaults.java diff --git a/metron-platform/metron-common/src/main/java/org/apache/metron/common/configuration/ConfigOption.java b/metron-platform/metron-common/src/main/java/org/apache/metron/common/configuration/ConfigOption.java index 8e4211bf1d..6308f0a185 100644 --- a/metron-platform/metron-common/src/main/java/org/apache/metron/common/configuration/ConfigOption.java +++ b/metron-platform/metron-common/src/main/java/org/apache/metron/common/configuration/ConfigOption.java @@ -18,36 +18,54 @@ package org.apache.metron.common.configuration; -import org.apache.metron.stellar.common.utils.ConversionUtils; - import java.util.Map; import java.util.function.BiFunction; +import org.apache.metron.stellar.common.utils.ConversionUtils; public interface ConfigOption { + String getKey(); + default BiFunction transform() { - return (s,o) -> o; + return (s, o) -> o; } default void put(Map map, Object value) { map.put(getKey(), value); } + default T getOrDefault(Map map, Class clazz, T defaultValue) { + T val; + return ((val = get(map, clazz)) == null ? defaultValue : val); + } + default T get(Map map, Class clazz) { Object obj = map.get(getKey()); - if(clazz.isInstance(obj)) { + if (clazz.isInstance(obj)) { return clazz.cast(obj); - } - else { + } else { return ConversionUtils.convert(obj, clazz); } } - default T get(Map map, BiFunction transform, Class clazz) { + default T getOrDefault(Map map, BiFunction transform, + Class clazz, T defaultValue) { + T val; + return ((val = get(map, transform, clazz)) == null ? defaultValue : val); + } + + default T get(Map map, BiFunction transform, + Class clazz) { return clazz.cast(transform.apply(getKey(), map.get(getKey()))); } + default T getTransformedOrDefault(Map map, Class clazz, T defaultValue) { + T val; + return ((val = getTransformed(map, clazz)) == null ? defaultValue : val); + } + default T getTransformed(Map map, Class clazz) { return clazz.cast(transform().apply(getKey(), map.get(getKey()))); } + } diff --git a/metron-platform/metron-common/src/test/java/org/apache/metron/common/configuration/ConfigOptionTest.java b/metron-platform/metron-common/src/test/java/org/apache/metron/common/configuration/ConfigOptionTest.java new file mode 100644 index 0000000000..95db080f86 --- /dev/null +++ b/metron-platform/metron-common/src/test/java/org/apache/metron/common/configuration/ConfigOptionTest.java @@ -0,0 +1,112 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.metron.common.configuration; + +import static org.hamcrest.CoreMatchers.equalTo; +import static org.junit.Assert.assertThat; + +import java.util.HashMap; +import java.util.Map; +import java.util.function.BiFunction; +import org.junit.Before; +import org.junit.Test; + +/** + * Test the default interface methods + */ +public class ConfigOptionTest { + + @Before + public void setup() { + } + + @Test + public void gets_value_of_specified_type() { + ConfigOption option = newOption("foo"); + Map config = new HashMap<>(); + option.put(config, 25L); + assertThat(option.get(config, Long.class), equalTo(25L)); + assertThat(option.get(mapWith("foo", 25L), Long.class), equalTo(25L)); + } + + @Test + public void gets_value_of_specified_type_with_transform() { + ConfigOption option = newOption("foo"); + Map config = new HashMap<>(); + option.put(config, "25"); + BiFunction transform = (s, o) -> o == null ? null + : new Long(o.toString()); + assertThat(option.get(config, transform, Long.class), equalTo(25L)); + assertThat(option.get(mapWith("foo", "25"), transform, Long.class), equalTo(25L)); + } + + @Test + public void gets_default_value_of_specified_type_with_transform() { + ConfigOption option = newOption("foo"); + Map config = new HashMap<>(); + option.put(config, null); + BiFunction transform = (s, o) -> o == null ? null + : new Long(o.toString()); + assertThat(option.getOrDefault(config, transform, Long.class, 25L), equalTo(25L)); + assertThat(option.getOrDefault(mapWith("foo", null), transform, Long.class, 25L), equalTo(25L)); + } + + @Test + public void gets_default_when_null_value() { + ConfigOption option = newOption("foo"); + Map config = new HashMap<>(); + option.put(config, null); + assertThat(option.getOrDefault(config, Long.class, 0L), equalTo(0L)); + assertThat(option.getOrDefault(mapWith("foo", null), Long.class, 0L), equalTo(0L)); + } + + @Test + public void gets_object_transformed_by_class_cast() { + ConfigOption option = newOption("foo"); + Map config = new HashMap<>(); + option.put(config, (Object) 25L); + assertThat(option.getTransformed(config, Long.class), equalTo(25L)); + assertThat(option.getTransformed(mapWith("foo", (Object) 25L), Long.class), equalTo(25L)); + } + + @Test + public void gets_default_null_with_cast_when_null() { + ConfigOption option = newOption("foo"); + Map config = new HashMap<>(); + option.put(config, null); + assertThat(option.getTransformedOrDefault(config, Long.class, 25L), equalTo(25L)); + assertThat(option.getTransformedOrDefault(mapWith("foo", null), Long.class, 25L), equalTo(25L)); + } + + private Map mapWith(K key, V val) { + Map map = new HashMap<>(); + map.put(key, val); + return map; + } + + private ConfigOption newOption(final String key) { + return new ConfigOption() { + @Override + public String getKey() { + return key; + } + }; + } + +} diff --git a/metron-platform/metron-pcap-backend/src/main/java/org/apache/metron/pcap/query/CliParser.java b/metron-platform/metron-pcap-backend/src/main/java/org/apache/metron/pcap/query/CliParser.java index 4ad6ffa9c3..2d15e8b3ce 100644 --- a/metron-platform/metron-pcap-backend/src/main/java/org/apache/metron/pcap/query/CliParser.java +++ b/metron-platform/metron-pcap-backend/src/main/java/org/apache/metron/pcap/query/CliParser.java @@ -18,17 +18,23 @@ package org.apache.metron.pcap.query; -import org.apache.commons.cli.*; +import static org.apache.metron.pcap.config.PcapGlobalDefaults.BASE_INTERIM_RESULT_PATH_DEFAULT; +import static org.apache.metron.pcap.config.PcapGlobalDefaults.BASE_INPUT_PATH_DEFAULT; +import static org.apache.metron.pcap.config.PcapGlobalDefaults.NUM_RECORDS_PER_FILE_DEFAULT; +import static org.apache.metron.pcap.config.PcapGlobalDefaults.NUM_REDUCERS_DEFAULT; + +import org.apache.commons.cli.CommandLine; +import org.apache.commons.cli.CommandLineParser; +import org.apache.commons.cli.HelpFormatter; +import org.apache.commons.cli.Option; +import org.apache.commons.cli.Options; +import org.apache.commons.cli.PosixParser; import org.apache.metron.pcap.config.PcapConfig; /** * Provides commmon required fields for the PCAP filter jobs */ public class CliParser { - public static final String BASE_PATH_DEFAULT = "/apps/metron/pcap/input"; - public static final String BASE_INTERIM_OUTPUT_PATH_DEFAULT = "/tmp"; - public static final int NUM_REDUCERS_DEFAULT = 10; - public static final int NUM_RECORDS_PER_FILE_DEFAULT = 10000; private CommandLineParser parser; protected PcapConfig.PrefixStrategy prefixStrategy; @@ -40,9 +46,10 @@ public CliParser(PcapConfig.PrefixStrategy prefixStrategy) { public Options buildOptions() { Options options = new Options(); options.addOption(newOption("h", "help", false, "Display help")); - options.addOption(newOption("bp", "base_path", true, String.format("Base PCAP data path. Default is '%s'", BASE_PATH_DEFAULT))); + options.addOption(newOption("bp", "base_path", true, String.format("Base PCAP data path. Default is '%s'", + BASE_INPUT_PATH_DEFAULT))); options.addOption(newOption("bop", "base_output_path", true, String.format("Query result output path. Default is '%s'", - BASE_INTERIM_OUTPUT_PATH_DEFAULT))); + BASE_INTERIM_RESULT_PATH_DEFAULT))); options.addOption(newOption("st", "start_time", true, "(required) Packet start time range.", true)); options.addOption(newOption("nr", "num_reducers", true, String.format("Number of reducers to use (defaults to %s)", NUM_REDUCERS_DEFAULT))); options.addOption(newOption("rpf", "records_per_file", true, String.format("Number of records to include in each output pcap file (defaults to %s)", NUM_RECORDS_PER_FILE_DEFAULT))); @@ -71,12 +78,12 @@ public void parse(CommandLine commandLine, PcapConfig config) throws java.text.P if (commandLine.hasOption("base_path")) { config.setBasePath(commandLine.getOptionValue("base_path")); } else { - config.setBasePath(BASE_PATH_DEFAULT); + config.setBasePath(BASE_INPUT_PATH_DEFAULT); } if (commandLine.hasOption("base_output_path")) { config.setBaseInterimResultPath(commandLine.getOptionValue("base_output_path")); } else { - config.setBaseInterimResultPath(BASE_INTERIM_OUTPUT_PATH_DEFAULT); + config.setBaseInterimResultPath(BASE_INTERIM_RESULT_PATH_DEFAULT); } if (commandLine.hasOption("start_time")) { try { diff --git a/metron-platform/metron-pcap-backend/src/test/java/org/apache/metron/pcap/PcapJobTest.java b/metron-platform/metron-pcap-backend/src/test/java/org/apache/metron/pcap/PcapJobTest.java index 14963fdd16..796c8a580b 100644 --- a/metron-platform/metron-pcap-backend/src/test/java/org/apache/metron/pcap/PcapJobTest.java +++ b/metron-platform/metron-pcap-backend/src/test/java/org/apache/metron/pcap/PcapJobTest.java @@ -264,4 +264,27 @@ public void killing_job_causes_status_to_return_KILLED_state() throws Exception Assert.assertThat(status.getState(), equalTo(State.KILLED)); } + @Test + public void handles_null_values_with_defaults() throws Exception { + PcapOptions.START_TIME_NS.put(config, null); + PcapOptions.END_TIME_NS.put(config, null); + PcapOptions.NUM_REDUCERS.put(config, null); + PcapOptions.NUM_RECORDS_PER_FILE.put(config, null); + + pageableResult = new PcapPages( + Arrays.asList(new Path("1.txt"), new Path("2.txt"), new Path("3.txt"))); + when(finalizer.finalizeJob(any())).thenReturn(pageableResult); + when(mrJob.isComplete()).thenReturn(true); + when(mrStatus.getState()).thenReturn(org.apache.hadoop.mapreduce.JobStatus.State.SUCCEEDED); + when(mrJob.getStatus()).thenReturn(mrStatus); + Statusable statusable = testJob.submit(finalizer, config); + timer.updateJobStatus(); + Pageable results = statusable.get(); + Assert.assertThat(results.getSize(), equalTo(3)); + JobStatus status = statusable.getStatus(); + Assert.assertThat(status.getState(), equalTo(State.SUCCEEDED)); + Assert.assertThat(status.getPercentComplete(), equalTo(100.0)); + Assert.assertThat(status.getJobId(), equalTo(jobIdVal)); + } + } diff --git a/metron-platform/metron-pcap-backend/src/test/java/org/apache/metron/pcap/query/PcapCliTest.java b/metron-platform/metron-pcap-backend/src/test/java/org/apache/metron/pcap/query/PcapCliTest.java index c7d6fdf525..96ca354dd9 100644 --- a/metron-platform/metron-pcap-backend/src/test/java/org/apache/metron/pcap/query/PcapCliTest.java +++ b/metron-platform/metron-pcap-backend/src/test/java/org/apache/metron/pcap/query/PcapCliTest.java @@ -17,6 +17,8 @@ */ package org.apache.metron.pcap.query; +import static org.apache.metron.pcap.config.PcapGlobalDefaults.BASE_INTERIM_RESULT_PATH_DEFAULT; +import static org.apache.metron.pcap.config.PcapGlobalDefaults.BASE_INPUT_PATH_DEFAULT; import static org.hamcrest.CoreMatchers.equalTo; import static org.junit.Assert.assertThat; import static org.mockito.Matchers.argThat; @@ -91,8 +93,8 @@ public void runs_fixed_pcap_filter_job_with_default_argument_list() throws Excep put(PcapHelper.PacketFields.PACKET_FILTER.getName(), "`casey`"); }}; FixedPcapConfig config = new FixedPcapConfig(prefixStrategy); - PcapOptions.BASE_PATH.put(config, CliParser.BASE_PATH_DEFAULT); - PcapOptions.BASE_INTERIM_RESULT_PATH.put(config, CliParser.BASE_INTERIM_OUTPUT_PATH_DEFAULT); + PcapOptions.BASE_PATH.put(config, BASE_INPUT_PATH_DEFAULT); + PcapOptions.BASE_INTERIM_RESULT_PATH.put(config, BASE_INTERIM_RESULT_PATH_DEFAULT); PcapOptions.FIELDS.put(config, query); PcapOptions.NUM_REDUCERS.put(config, 10); PcapOptions.START_TIME_MS.put(config, 500L); @@ -237,8 +239,8 @@ public void runs_query_pcap_filter_job_with_default_argument_list() throws Excep String query = "some query string"; FixedPcapConfig config = new FixedPcapConfig(prefixStrategy); - PcapOptions.BASE_PATH.put(config, CliParser.BASE_PATH_DEFAULT); - PcapOptions.BASE_INTERIM_RESULT_PATH.put(config, CliParser.BASE_INTERIM_OUTPUT_PATH_DEFAULT); + PcapOptions.BASE_PATH.put(config, BASE_INPUT_PATH_DEFAULT); + PcapOptions.BASE_INTERIM_RESULT_PATH.put(config, BASE_INTERIM_RESULT_PATH_DEFAULT); PcapOptions.FIELDS.put(config, query); PcapOptions.NUM_REDUCERS.put(config, 10); PcapOptions.START_TIME_MS.put(config, 500L); diff --git a/metron-platform/metron-pcap/src/main/java/org/apache/metron/pcap/config/PcapGlobalDefaults.java b/metron-platform/metron-pcap/src/main/java/org/apache/metron/pcap/config/PcapGlobalDefaults.java new file mode 100644 index 0000000000..b8c674c97f --- /dev/null +++ b/metron-platform/metron-pcap/src/main/java/org/apache/metron/pcap/config/PcapGlobalDefaults.java @@ -0,0 +1,28 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.metron.pcap.config; + +public class PcapGlobalDefaults { + public static final String BASE_PCAP_PATH_DEFAULT = "/apps/metron/pcap"; + public static final String BASE_INPUT_PATH_DEFAULT = BASE_PCAP_PATH_DEFAULT + "/input"; + public static final String BASE_INTERIM_RESULT_PATH_DEFAULT = BASE_PCAP_PATH_DEFAULT + "/interim"; + public static final String FINAL_OUTPUT_PATH_DEFAULT = BASE_PCAP_PATH_DEFAULT + "/output"; + public static final int NUM_REDUCERS_DEFAULT = 10; + public static final int NUM_RECORDS_PER_FILE_DEFAULT = 10000; +} diff --git a/metron-platform/metron-pcap/src/main/java/org/apache/metron/pcap/finalizer/PcapFinalizer.java b/metron-platform/metron-pcap/src/main/java/org/apache/metron/pcap/finalizer/PcapFinalizer.java index 8dcc4013ab..5a61f9beb9 100644 --- a/metron-platform/metron-pcap/src/main/java/org/apache/metron/pcap/finalizer/PcapFinalizer.java +++ b/metron-platform/metron-pcap/src/main/java/org/apache/metron/pcap/finalizer/PcapFinalizer.java @@ -18,6 +18,8 @@ package org.apache.metron.pcap.finalizer; +import static org.apache.metron.pcap.config.PcapGlobalDefaults.NUM_RECORDS_PER_FILE_DEFAULT; + import com.google.common.collect.Iterables; import java.io.IOException; import java.lang.invoke.MethodHandles; @@ -62,9 +64,9 @@ protected PcapResultsWriter getResultsWriter() { @Override public Pageable finalizeJob(Map config) throws JobException { Configuration hadoopConfig = PcapOptions.HADOOP_CONF.get(config, Configuration.class); - int recPerFile = PcapOptions.NUM_RECORDS_PER_FILE.get(config, Integer.class); - Path interimResultPath = PcapOptions.INTERIM_RESULT_PATH - .get(config, PcapOptions.STRING_TO_PATH, Path.class); + int recPerFile = PcapOptions.NUM_RECORDS_PER_FILE + .getOrDefault(config, Integer.class, NUM_RECORDS_PER_FILE_DEFAULT); + Path interimResultPath = PcapOptions.INTERIM_RESULT_PATH.get(config, PcapOptions.STRING_TO_PATH, Path.class); FileSystem fs = PcapOptions.FILESYSTEM.get(config, FileSystem.class); SequenceFileIterable interimResults = null; diff --git a/metron-platform/metron-pcap/src/main/java/org/apache/metron/pcap/finalizer/PcapRestFinalizer.java b/metron-platform/metron-pcap/src/main/java/org/apache/metron/pcap/finalizer/PcapRestFinalizer.java index 93a32220ac..13fa795d2c 100644 --- a/metron-platform/metron-pcap/src/main/java/org/apache/metron/pcap/finalizer/PcapRestFinalizer.java +++ b/metron-platform/metron-pcap/src/main/java/org/apache/metron/pcap/finalizer/PcapRestFinalizer.java @@ -18,14 +18,15 @@ package org.apache.metron.pcap.finalizer; +import static org.apache.metron.pcap.config.PcapGlobalDefaults.FINAL_OUTPUT_PATH_DEFAULT; + import java.io.IOException; import java.util.List; +import java.util.Map; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.Path; import org.apache.metron.job.Statusable; import org.apache.metron.pcap.config.PcapOptions; - -import java.util.Map; import org.apache.metron.pcap.writer.PcapResultsWriter; /** @@ -45,10 +46,12 @@ protected void write(PcapResultsWriter resultsWriter, Configuration hadoopConfig @Override protected Path getOutputPath(Map config, int partition) { - String finalOutputPath = PcapOptions.FINAL_OUTPUT_PATH.get(config, String.class); + String finalOutputPath = PcapOptions.FINAL_OUTPUT_PATH + .getOrDefault(config, String.class, FINAL_OUTPUT_PATH_DEFAULT); String user = PcapOptions.USERNAME.get(config, String.class); String jobId = PcapOptions.JOB_ID.get(config, String.class); - return new Path(String.format(PCAP_REST_FILEPATH_FORMAT, finalOutputPath, user, jobType, jobId, partition)); + return new Path( + String.format(PCAP_REST_FILEPATH_FORMAT, finalOutputPath, user, jobType, jobId, partition)); } } diff --git a/metron-platform/metron-pcap/src/main/java/org/apache/metron/pcap/mr/PcapJob.java b/metron-platform/metron-pcap/src/main/java/org/apache/metron/pcap/mr/PcapJob.java index ea2aa29092..e0789d5896 100644 --- a/metron-platform/metron-pcap/src/main/java/org/apache/metron/pcap/mr/PcapJob.java +++ b/metron-platform/metron-pcap/src/main/java/org/apache/metron/pcap/mr/PcapJob.java @@ -20,6 +20,7 @@ import static org.apache.metron.pcap.PcapHelper.greaterThanOrEqualTo; import static org.apache.metron.pcap.PcapHelper.lessThanOrEqualTo; +import static org.apache.metron.pcap.config.PcapGlobalDefaults.NUM_REDUCERS_DEFAULT; import com.google.common.base.Joiner; import java.io.IOException; @@ -60,6 +61,7 @@ import org.apache.metron.pcap.PacketInfo; import org.apache.metron.pcap.PcapHelper; import org.apache.metron.pcap.PcapPages; +import org.apache.metron.pcap.config.PcapGlobalDefaults; import org.apache.metron.pcap.config.PcapOptions; import org.apache.metron.pcap.filter.PcapFilter; import org.apache.metron.pcap.filter.PcapFilterConfigurator; @@ -216,20 +218,22 @@ public Statusable submit(Finalizer finalizer, Map co Configuration hadoopConf = PcapOptions.HADOOP_CONF.get(configuration, Configuration.class); FileSystem fileSystem = PcapOptions.FILESYSTEM.get(configuration, FileSystem.class); Path basePath = PcapOptions.BASE_PATH.getTransformed(configuration, Path.class); - Path baseInterimResultPath = PcapOptions.BASE_INTERIM_RESULT_PATH.getTransformed(configuration, Path.class); + Path baseInterimResultPath = PcapOptions.BASE_INTERIM_RESULT_PATH + .getTransformedOrDefault(configuration, Path.class, + new Path(PcapGlobalDefaults.BASE_INTERIM_RESULT_PATH_DEFAULT)); long startTime; if (configuration.containsKey(PcapOptions.START_TIME_NS.getKey())) { - startTime = PcapOptions.START_TIME_NS.get(configuration, Long.class); + startTime = PcapOptions.START_TIME_NS.getOrDefault(configuration, Long.class, 0L); } else { - startTime = PcapOptions.START_TIME_MS.get(configuration, Long.class) * 1000000; + startTime = PcapOptions.START_TIME_MS.getOrDefault(configuration, Long.class, 0L) * 1000000; } long endTime; if (configuration.containsKey(PcapOptions.END_TIME_NS.getKey())) { - endTime = PcapOptions.END_TIME_NS.get(configuration, Long.class); + endTime = PcapOptions.END_TIME_NS.getOrDefault(configuration, Long.class, System.nanoTime()); } else { - endTime = PcapOptions.END_TIME_MS.get(configuration, Long.class) * 1000000; + endTime = PcapOptions.END_TIME_MS.getOrDefault(configuration, Long.class, System.currentTimeMillis()) * 1000000; } - int numReducers = PcapOptions.NUM_REDUCERS.get(configuration, Integer.class); + int numReducers = PcapOptions.NUM_REDUCERS.getOrDefault(configuration, Integer.class, NUM_REDUCERS_DEFAULT); T fields = (T) PcapOptions.FIELDS.get(configuration, Object.class); PcapFilterConfigurator filterImpl = PcapOptions.FILTER_IMPL.get(configuration, PcapFilterConfigurator.class); From 341e3f93b4f3f9e68db1520105d91c49be5699a1 Mon Sep 17 00:00:00 2001 From: Michael Miklavcic Date: Wed, 8 Aug 2018 15:23:03 -0600 Subject: [PATCH 2/2] Fix nanos/millis --- .../java/org/apache/metron/pcap/mr/PcapJob.java | 17 +++++++++-------- 1 file changed, 9 insertions(+), 8 deletions(-) diff --git a/metron-platform/metron-pcap/src/main/java/org/apache/metron/pcap/mr/PcapJob.java b/metron-platform/metron-pcap/src/main/java/org/apache/metron/pcap/mr/PcapJob.java index e0789d5896..23bd51088e 100644 --- a/metron-platform/metron-pcap/src/main/java/org/apache/metron/pcap/mr/PcapJob.java +++ b/metron-platform/metron-pcap/src/main/java/org/apache/metron/pcap/mr/PcapJob.java @@ -52,6 +52,7 @@ import org.apache.hadoop.mapreduce.Reducer; import org.apache.hadoop.mapreduce.lib.input.SequenceFileInputFormat; import org.apache.hadoop.mapreduce.lib.output.SequenceFileOutputFormat; +import org.apache.metron.common.utils.timestamp.TimestampConverters; import org.apache.metron.job.Finalizer; import org.apache.metron.job.JobException; import org.apache.metron.job.JobStatus; @@ -221,17 +222,17 @@ public Statusable submit(Finalizer finalizer, Map co Path baseInterimResultPath = PcapOptions.BASE_INTERIM_RESULT_PATH .getTransformedOrDefault(configuration, Path.class, new Path(PcapGlobalDefaults.BASE_INTERIM_RESULT_PATH_DEFAULT)); - long startTime; + long startTimeNs; if (configuration.containsKey(PcapOptions.START_TIME_NS.getKey())) { - startTime = PcapOptions.START_TIME_NS.getOrDefault(configuration, Long.class, 0L); + startTimeNs = PcapOptions.START_TIME_NS.getOrDefault(configuration, Long.class, 0L); } else { - startTime = PcapOptions.START_TIME_MS.getOrDefault(configuration, Long.class, 0L) * 1000000; + startTimeNs = TimestampConverters.MILLISECONDS.toNanoseconds(PcapOptions.START_TIME_MS.getOrDefault(configuration, Long.class, 0L)); } - long endTime; + long endTimeNs; if (configuration.containsKey(PcapOptions.END_TIME_NS.getKey())) { - endTime = PcapOptions.END_TIME_NS.getOrDefault(configuration, Long.class, System.nanoTime()); + endTimeNs = PcapOptions.END_TIME_NS.getOrDefault(configuration, Long.class, TimestampConverters.MILLISECONDS.toNanoseconds(System.currentTimeMillis())); } else { - endTime = PcapOptions.END_TIME_MS.getOrDefault(configuration, Long.class, System.currentTimeMillis()) * 1000000; + endTimeNs = TimestampConverters.MILLISECONDS.toNanoseconds(PcapOptions.END_TIME_MS.getOrDefault(configuration, Long.class, System.currentTimeMillis())); } int numReducers = PcapOptions.NUM_REDUCERS.getOrDefault(configuration, Integer.class, NUM_REDUCERS_DEFAULT); T fields = (T) PcapOptions.FIELDS.get(configuration, Object.class); @@ -241,8 +242,8 @@ public Statusable submit(Finalizer finalizer, Map co Statusable statusable = query(jobName, basePath, baseInterimResultPath, - startTime, - endTime, + startTimeNs, + endTimeNs, numReducers, fields, // create a new copy for each job, bad things happen when hadoop config is reused