From 41ecf36cd7c2da0399d03a37fee5916a9bfa87e7 Mon Sep 17 00:00:00 2001 From: Michael Miklavcic Date: Tue, 12 Jun 2018 19:48:41 -0600 Subject: [PATCH 1/8] Add metron-job project. Update pcap to be Statusable. --- metron-platform/metron-job/pom.xml | 39 ++++ .../java/org/apache/metron/job/JobStatus.java | 67 +++++++ .../org/apache/metron/job/Statusable.java | 37 ++++ .../org/apache/metron/pcap/PcapJobTest.java | 180 ++++++++++++++++++ metron-platform/metron-pcap/pom.xml | 7 +- .../org/apache/metron/pcap/mr/PcapJob.java | 164 ++++++++++++---- metron-platform/pom.xml | 1 + 7 files changed, 454 insertions(+), 41 deletions(-) create mode 100644 metron-platform/metron-job/pom.xml create mode 100644 metron-platform/metron-job/src/main/java/org/apache/metron/job/JobStatus.java create mode 100644 metron-platform/metron-job/src/main/java/org/apache/metron/job/Statusable.java diff --git a/metron-platform/metron-job/pom.xml b/metron-platform/metron-job/pom.xml new file mode 100644 index 0000000000..fa36ff7125 --- /dev/null +++ b/metron-platform/metron-job/pom.xml @@ -0,0 +1,39 @@ + + + + + 4.0.0 + + + org.apache.metron + metron-platform + 0.5.0 + + + metron-job + metron-job + Metron Job + https://metron.apache.org/ + + + + org.apache.metron + metron-common + ${project.parent.version} + + + + diff --git a/metron-platform/metron-job/src/main/java/org/apache/metron/job/JobStatus.java b/metron-platform/metron-job/src/main/java/org/apache/metron/job/JobStatus.java new file mode 100644 index 0000000000..656c43ba51 --- /dev/null +++ b/metron-platform/metron-job/src/main/java/org/apache/metron/job/JobStatus.java @@ -0,0 +1,67 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.metron.job; + +import org.apache.hadoop.fs.Path; + +/** + * Capture metadata about a batch job + */ +public class JobStatus { + + public enum STATE { + NOT_RUNNING, + RUNNING, + SUCCEEDED, + FAILED, + KILLED + } + + private STATE state = STATE.NOT_RUNNING; + private double percentComplete = 0.0; + private Path resultPath; + + public JobStatus withState(STATE state) { + this.state = state; + return this; + } + + public JobStatus withPercentComplete(double percentComplete) { + this.percentComplete = percentComplete; + return this; + } + + public JobStatus withResultPath(Path resultPath) { + this.resultPath = resultPath; + return this; + } + + public STATE getState() { + return state; + } + + public double getPercentComplete() { + return percentComplete; + } + + public Path getResultPath() { + return resultPath; + } + +} diff --git a/metron-platform/metron-job/src/main/java/org/apache/metron/job/Statusable.java b/metron-platform/metron-job/src/main/java/org/apache/metron/job/Statusable.java new file mode 100644 index 0000000000..dbe52be63c --- /dev/null +++ b/metron-platform/metron-job/src/main/java/org/apache/metron/job/Statusable.java @@ -0,0 +1,37 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.metron.job; + +public interface Statusable { + + /** + * Current job status. + * + * @return status + */ + JobStatus getStatus(); + + /** + * Completion flag. + * + * @return true if job is completed, whether KILLED, FAILED, SUCCEEDED. False otherwise. + */ + boolean isDone(); + +} diff --git a/metron-platform/metron-pcap-backend/src/test/java/org/apache/metron/pcap/PcapJobTest.java b/metron-platform/metron-pcap-backend/src/test/java/org/apache/metron/pcap/PcapJobTest.java index 3536a7e113..9bb3d6758c 100644 --- a/metron-platform/metron-pcap-backend/src/test/java/org/apache/metron/pcap/PcapJobTest.java +++ b/metron-platform/metron-pcap-backend/src/test/java/org/apache/metron/pcap/PcapJobTest.java @@ -19,18 +19,64 @@ package org.apache.metron.pcap; import static java.lang.Long.toUnsignedString; +import static java.lang.String.format; import static org.hamcrest.CoreMatchers.equalTo; +import static org.hamcrest.CoreMatchers.notNullValue; +import static org.hamcrest.CoreMatchers.startsWith; +import static org.mockito.Mockito.times; +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.when; +import java.io.IOException; +import java.util.HashMap; +import java.util.Map; import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; import org.apache.hadoop.io.BytesWritable; import org.apache.hadoop.io.LongWritable; +import org.apache.hadoop.mapreduce.Job; +import org.apache.hadoop.mapreduce.JobStatus.State; import org.apache.metron.common.utils.timestamp.TimestampConverters; +import org.apache.metron.job.JobStatus; +import org.apache.metron.job.JobStatus.STATE; +import org.apache.metron.job.Statusable; +import org.apache.metron.pcap.filter.PcapFilterConfigurator; +import org.apache.metron.pcap.filter.fixed.FixedPcapFilter; import org.apache.metron.pcap.mr.PcapJob; import org.junit.Assert; +import org.junit.Before; import org.junit.Test; +import org.mockito.Mock; +import org.mockito.MockitoAnnotations; public class PcapJobTest { + @Mock + private Job job; + @Mock + private org.apache.hadoop.mapreduce.JobStatus mrStatus; + private Path basePath; + private Path baseOutPath; + private long startTime; + private long endTime; + private int numReducers; + private Map fixedFields; + private Configuration hadoopConfig; + + @Before + public void setup() { + MockitoAnnotations.initMocks(this); + basePath = new Path("basepath"); + baseOutPath = new Path("outpath"); + startTime = 100; + endTime = 200; + numReducers = 5; + fixedFields = new HashMap<>(); + fixedFields.put("ip_src_addr", "192.168.1.1"); + hadoopConfig = new Configuration(); + } + @Test public void partition_gives_value_in_range() throws Exception { long start = 1473897600000000000L; @@ -46,4 +92,138 @@ public void partition_gives_value_in_range() throws Exception { equalTo(8)); } + private class TestJob extends PcapJob { + + @Override + public Job createJob(Path basePath, Path outputPath, long beginNS, long endNS, + int numReducers, T fields, Configuration conf, FileSystem fs, + PcapFilterConfigurator filterImpl) throws IOException { + return job; + } + } + + @Test + public void job_succeeds_synchronously() throws Exception { + when(job.isComplete()).thenReturn(true); + when(mrStatus.getState()).thenReturn(State.SUCCEEDED); + when(job.getStatus()).thenReturn(mrStatus); + TestJob testJob = new TestJob(); + Statusable statusable = testJob.query(basePath, + baseOutPath, + startTime, + endTime, + numReducers, + fixedFields, + hadoopConfig, + FileSystem.get(hadoopConfig), + new FixedPcapFilter.Configurator(), + true); + verify(job, times(1)).waitForCompletion(true); + JobStatus status = statusable.getStatus(); + Assert.assertThat(status.getState(), equalTo(STATE.SUCCEEDED)); + Assert.assertThat(status.getPercentComplete(), equalTo(100.0)); + String expectedOutPath = new Path(baseOutPath, format("%s_%s_%s", startTime, endTime, "192.168.1.1")).toString(); + Assert.assertThat(status.getResultPath(), notNullValue()); + Assert.assertThat(status.getResultPath().toString(), startsWith(expectedOutPath)); + } + + @Test + public void job_fails_synchronously() throws Exception { + when(job.isComplete()).thenReturn(true); + when(mrStatus.getState()).thenReturn(State.FAILED); + when(job.getStatus()).thenReturn(mrStatus); + TestJob testJob = new TestJob(); + Statusable statusable = testJob.query(basePath, + baseOutPath, + startTime, + endTime, + numReducers, + fixedFields, + hadoopConfig, + FileSystem.get(hadoopConfig), + new FixedPcapFilter.Configurator(), + true); + JobStatus status = statusable.getStatus(); + Assert.assertThat(status.getState(), equalTo(STATE.FAILED)); + Assert.assertThat(status.getPercentComplete(), equalTo(100.0)); + String expectedOutPath = new Path(baseOutPath, format("%s_%s_%s", startTime, endTime, "192.168.1.1")).toString(); + Assert.assertThat(status.getResultPath(), notNullValue()); + Assert.assertThat(status.getResultPath().toString(), startsWith(expectedOutPath)); + } + + @Test + public void job_fails_with_killed_status_synchronously() throws Exception { + when(job.isComplete()).thenReturn(true); + when(mrStatus.getState()).thenReturn(State.KILLED); + when(job.getStatus()).thenReturn(mrStatus); + TestJob testJob = new TestJob(); + Statusable statusable = testJob.query(basePath, + baseOutPath, + startTime, + endTime, + numReducers, + fixedFields, + hadoopConfig, + FileSystem.get(hadoopConfig), + new FixedPcapFilter.Configurator(), + true); + JobStatus status = statusable.getStatus(); + Assert.assertThat(status.getState(), equalTo(STATE.KILLED)); + Assert.assertThat(status.getPercentComplete(), equalTo(100.0)); + String expectedOutPath = new Path(baseOutPath, format("%s_%s_%s", startTime, endTime, "192.168.1.1")).toString(); + Assert.assertThat(status.getResultPath(), notNullValue()); + Assert.assertThat(status.getResultPath().toString(), startsWith(expectedOutPath)); + } + + @Test + public void job_succeeds_asynchronously() throws Exception { + when(job.isComplete()).thenReturn(true); + when(mrStatus.getState()).thenReturn(State.SUCCEEDED); + when(job.getStatus()).thenReturn(mrStatus); + TestJob testJob = new TestJob(); + Statusable statusable = testJob.query(basePath, + baseOutPath, + startTime, + endTime, + numReducers, + fixedFields, + hadoopConfig, + FileSystem.get(hadoopConfig), + new FixedPcapFilter.Configurator(), + false); + JobStatus status = statusable.getStatus(); + Assert.assertThat(status.getState(), equalTo(STATE.SUCCEEDED)); + Assert.assertThat(status.getPercentComplete(), equalTo(100.0)); + String expectedOutPath = new Path(baseOutPath, format("%s_%s_%s", startTime, endTime, "192.168.1.1")).toString(); + Assert.assertThat(status.getResultPath(), notNullValue()); + Assert.assertThat(status.getResultPath().toString(), startsWith(expectedOutPath)); + } + + @Test + public void job_reports_percent_complete() throws Exception { + when(job.isComplete()).thenReturn(false); + when(mrStatus.getState()).thenReturn(State.RUNNING); + when(job.getStatus()).thenReturn(mrStatus); + TestJob testJob = new TestJob(); + Statusable statusable = testJob.query(basePath, + baseOutPath, + startTime, + endTime, + numReducers, + fixedFields, + hadoopConfig, + FileSystem.get(hadoopConfig), + new FixedPcapFilter.Configurator(), + false); + when(job.mapProgress()).thenReturn(0.5f); + when(job.reduceProgress()).thenReturn(0f); + JobStatus status = statusable.getStatus(); + Assert.assertThat(status.getState(), equalTo(STATE.RUNNING)); + Assert.assertThat(status.getPercentComplete(), equalTo(25.0)); + when(job.mapProgress()).thenReturn(1.0f); + when(job.reduceProgress()).thenReturn(0.5f); + status = statusable.getStatus(); + Assert.assertThat(status.getPercentComplete(), equalTo(75.0)); + } + } diff --git a/metron-platform/metron-pcap/pom.xml b/metron-platform/metron-pcap/pom.xml index c9f873ea20..640cdc6f9f 100644 --- a/metron-platform/metron-pcap/pom.xml +++ b/metron-platform/metron-pcap/pom.xml @@ -39,9 +39,14 @@ + + org.apache.metron + metron-hbase + ${project.parent.version} + org.apache.metron - metron-hbase + metron-job ${project.parent.version} diff --git a/metron-platform/metron-pcap/src/main/java/org/apache/metron/pcap/mr/PcapJob.java b/metron-platform/metron-pcap/src/main/java/org/apache/metron/pcap/mr/PcapJob.java index 14ea3cb753..99bcb9a303 100644 --- a/metron-platform/metron-pcap/src/main/java/org/apache/metron/pcap/mr/PcapJob.java +++ b/metron-platform/metron-pcap/src/main/java/org/apache/metron/pcap/mr/PcapJob.java @@ -48,6 +48,9 @@ import org.apache.hadoop.mapreduce.lib.input.SequenceFileInputFormat; import org.apache.hadoop.mapreduce.lib.output.SequenceFileOutputFormat; import org.apache.metron.common.hadoop.SequenceFileIterable; +import org.apache.metron.job.JobStatus; +import org.apache.metron.job.JobStatus.STATE; +import org.apache.metron.job.Statusable; import org.apache.metron.pcap.PacketInfo; import org.apache.metron.pcap.PcapHelper; import org.apache.metron.pcap.filter.PcapFilter; @@ -57,12 +60,14 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; -public class PcapJob { +public class PcapJob implements Statusable { private static final Logger LOG = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass()); public static final String START_TS_CONF = "start_ts"; public static final String END_TS_CONF = "end_ts"; public static final String WIDTH_CONF = "width"; + private Job job; // store a running MR job reference for async status check + private Path outputPath; public static enum PCAP_COUNTER { MALFORMED_PACKET_COUNT @@ -156,23 +161,8 @@ protected void reduce(LongWritable key, Iterable values, Context } /** - * Returns a lazily-read Iterable over a set of sequence files + * Run query synchronously. */ - private SequenceFileIterable readResults(Path outputPath, Configuration config, FileSystem fs) throws IOException { - List files = new ArrayList<>(); - for (RemoteIterator it = fs.listFiles(outputPath, false); it.hasNext(); ) { - Path p = it.next().getPath(); - if (p.getName().equals("_SUCCESS")) { - fs.delete(p, false); - continue; - } - files.add(p); - } - LOG.debug("Output path={}", outputPath); - Collections.sort(files, (o1,o2) -> o1.getName().compareTo(o2.getName())); - return new SequenceFileIterable(files, config); - } - public SequenceFileIterable query(Path basePath , Path baseOutputPath , long beginNS @@ -183,42 +173,87 @@ public SequenceFileIterable query(Path basePath , FileSystem fs , PcapFilterConfigurator filterImpl ) throws IOException, ClassNotFoundException, InterruptedException { + Statusable statusable = query(basePath, baseOutputPath, beginNS, endNS, numReducers, fields, + conf, + fs, filterImpl, true); + JobStatus jobStatus = statusable.getStatus(); + if (jobStatus.getState() == STATE.SUCCEEDED) { + Path resultPath = jobStatus.getResultPath(); + return readResults(resultPath, conf, fs); + } else { + throw new RuntimeException( + "Unable to complete query due to errors. Please check logs for full errors."); + } + } + + /** + * Run query sync OR async based on flag. Async mode allows the client to check the returned + * statusable object for status details. + */ + public Statusable query(Path basePath, + Path baseOutputPath, + long beginNS, + long endNS, + int numReducers, + T fields, + Configuration conf, + FileSystem fs, + PcapFilterConfigurator filterImpl, + boolean sync) + throws IOException, ClassNotFoundException, InterruptedException { String fileName = Joiner.on("_").join(beginNS, endNS, filterImpl.queryToString(fields), UUID.randomUUID().toString()); if(LOG.isDebugEnabled()) { DateFormat format = SimpleDateFormat.getDateTimeInstance( SimpleDateFormat.LONG - , SimpleDateFormat.LONG - ); + , SimpleDateFormat.LONG + ); String from = format.format(new Date(Long.divideUnsigned(beginNS, 1000000))); String to = format.format(new Date(Long.divideUnsigned(endNS, 1000000))); LOG.debug("Executing query {} on timerange from {} to {}", filterImpl.queryToString(fields), from, to); } - Path outputPath = new Path(baseOutputPath, fileName); - Job job = createJob( basePath - , outputPath - , beginNS - , endNS - , numReducers - , fields - , conf - , fs - , filterImpl - ); - if (job == null) { - LOG.info("No files to process with specified date range."); - return new SequenceFileIterable(new ArrayList<>(), conf); - } - boolean completed = job.waitForCompletion(true); - if(completed) { - return readResults(outputPath, conf, fs); + outputPath = new Path(baseOutputPath, fileName); + job = createJob( basePath + , outputPath + , beginNS + , endNS + , numReducers + , fields + , conf + , fs + , filterImpl + ); + if (sync) { + job.waitForCompletion(true); } else { - throw new RuntimeException("Unable to complete query due to errors. Please check logs for full errors."); + job.submit(); } + return this; } - public static long findWidth(long start, long end, int numReducers) { - return Long.divideUnsigned(end - start, numReducers) + 1; + /** + * Returns a lazily-read Iterable over a set of sequence files + */ + private SequenceFileIterable readResults(Path outputPath, Configuration config, FileSystem fs) throws IOException { + List files = new ArrayList<>(); + for (RemoteIterator it = fs.listFiles(outputPath, false); it.hasNext(); ) { + Path p = it.next().getPath(); + if (p.getName().equals("_SUCCESS")) { + fs.delete(p, false); + continue; + } + files.add(p); + } + if (files.size() == 0) { + LOG.info("No files to process with specified date range."); + } else { + LOG.debug("Output path={}", outputPath); + Collections.sort(files, (o1, o2) -> o1.getName().compareTo(o2.getName())); + } + return new SequenceFileIterable(files, config); } + /** + * Creates, but does not submit the job. + */ public Job createJob( Path basePath , Path outputPath , long beginNS @@ -256,6 +291,10 @@ public Job createJob( Path basePath return job; } + public static long findWidth(long start, long end, int numReducers) { + return Long.divideUnsigned(end - start, numReducers) + 1; + } + protected Iterable listFiles(FileSystem fs, Path basePath) throws IOException { List ret = new ArrayList<>(); RemoteIterator filesIt = fs.listFiles(basePath, true); @@ -265,4 +304,49 @@ protected Iterable listFiles(FileSystem fs, Path basePath) throws IOExcept return ret; } + @Override + public JobStatus getStatus() { + // Note: this method is only reading state from the underlying job, so locking not needed + JobStatus status = new JobStatus().withResultPath(outputPath); + if (job == null) { + status.withPercentComplete(100).withState(STATE.SUCCEEDED); + } else { + try { + if (job.isComplete()) { + status.withPercentComplete(100); + switch (job.getStatus().getState()) { + case SUCCEEDED: + status.withState(STATE.SUCCEEDED); + break; + case FAILED: + status.withState(STATE.FAILED); + break; + case KILLED: + status.withState(STATE.KILLED); + break; + } + } else { + float mapProg = job.mapProgress(); + float reduceProg = job.reduceProgress(); + float totalProgress = ((mapProg / 2) + (reduceProg / 2)) * 100; + status.withPercentComplete(totalProgress).withState(STATE.RUNNING); + } + } catch (Exception e) { + throw new RuntimeException("Error occurred while attempting to retrieve job status.", e); + } + } + return status; + } + + @Override + public boolean isDone() { + // Note: this method is only reading state from the underlying job, so locking not needed + try { + return job.isComplete(); + } catch (Exception e) { + throw new RuntimeException("Error occurred while attempting to retrieve job status.", e); + } + } + + } diff --git a/metron-platform/pom.xml b/metron-platform/pom.xml index a99dbc7cc7..6866386330 100644 --- a/metron-platform/pom.xml +++ b/metron-platform/pom.xml @@ -46,6 +46,7 @@ metron-enrichment metron-solr metron-parsers + metron-job metron-pcap-backend metron-data-management metron-pcap From a83b472778e8e40cd81132f86402873ab4b8b2b1 Mon Sep 17 00:00:00 2001 From: Michael Miklavcic Date: Sat, 16 Jun 2018 17:10:49 -0600 Subject: [PATCH 2/8] Save progress on Pageable implementation --- .../apache/metron/common/utils/HDFSUtils.java | 18 +++++++ .../metron/common/utils/HDFSUtilsTest.java | 48 +++++++++++++++++++ .../java/org/apache/metron/job/JobStatus.java | 12 ++++- .../java/org/apache/metron/job/Pageable.java | 38 +++++++++++++++ .../org/apache/metron/job/Statusable.java | 19 ++++++++ .../org/apache/metron/pcap/query/PcapCli.java | 27 +++-------- .../org/apache/metron/pcap/PcapJobTest.java | 3 ++ .../apache/metron/pcap/query/PcapCliTest.java | 1 + .../org/apache/metron/pcap/mr/PcapJob.java | 43 ++++++++++++++++- .../metron/pcap/writer}/ResultsWriter.java | 31 ++++++++---- 10 files changed, 207 insertions(+), 33 deletions(-) create mode 100644 metron-platform/metron-common/src/test/java/org/apache/metron/common/utils/HDFSUtilsTest.java create mode 100644 metron-platform/metron-job/src/main/java/org/apache/metron/job/Pageable.java rename metron-platform/{metron-pcap-backend/src/main/java/org/apache/metron/pcap/query => metron-pcap/src/main/java/org/apache/metron/pcap/writer}/ResultsWriter.java (59%) diff --git a/metron-platform/metron-common/src/main/java/org/apache/metron/common/utils/HDFSUtils.java b/metron-platform/metron-common/src/main/java/org/apache/metron/common/utils/HDFSUtils.java index ee00b7e6f8..1d03a6d5a0 100644 --- a/metron-platform/metron-common/src/main/java/org/apache/metron/common/utils/HDFSUtils.java +++ b/metron-platform/metron-common/src/main/java/org/apache/metron/common/utils/HDFSUtils.java @@ -23,6 +23,7 @@ import org.apache.commons.io.IOUtils; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FSDataInputStream; +import org.apache.hadoop.fs.FSDataOutputStream; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; @@ -56,4 +57,21 @@ public static List readFile(Configuration config, String path) throws IO return IOUtils.readLines(inputStream, "UTF-8"); } + /** + * Write file to HDFS. Writes to local FS if file:// used as protocol. + * + * @param config filesystem configuration + * @param bytes bytes to write + * @param path output path + * @throws IOException This is the generic Hadoop "everything is an IOException." + */ + public static void write(Configuration config, byte[] bytes, String path) throws IOException { + FileSystem fs = FileSystem.newInstance(config); + Path outPath = new Path(path); + fs.mkdirs(outPath.getParent()); + try (FSDataOutputStream outputStream = fs.create(outPath)) { + outputStream.write(bytes); + outputStream.flush(); + } + } } diff --git a/metron-platform/metron-common/src/test/java/org/apache/metron/common/utils/HDFSUtilsTest.java b/metron-platform/metron-common/src/test/java/org/apache/metron/common/utils/HDFSUtilsTest.java new file mode 100644 index 0000000000..53b6f299c2 --- /dev/null +++ b/metron-platform/metron-common/src/test/java/org/apache/metron/common/utils/HDFSUtilsTest.java @@ -0,0 +1,48 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.metron.common.utils; + +import static org.hamcrest.CoreMatchers.equalTo; +import static org.junit.Assert.assertThat; + +import java.io.File; +import java.nio.charset.StandardCharsets; +import org.apache.hadoop.conf.Configuration; +import org.apache.metron.integration.utils.TestUtils; +import org.junit.Rule; +import org.junit.Test; +import org.junit.rules.TemporaryFolder; + +public class HDFSUtilsTest { + + @Rule + public TemporaryFolder tempDir = new TemporaryFolder(); + + @Test + public void writes_file_to_local_fs() throws Exception { + String outText = "small brown bike and casket lottery"; + String outFile = tempDir.getRoot().getAbsolutePath() + "outfile.txt"; + Configuration config = new Configuration(); + config.set("fs.default.name", "file:///"); + HDFSUtils.write(config, outText.getBytes(StandardCharsets.UTF_8), outFile); + String actual = TestUtils.read(new File(outFile)); + assertThat("Text should match", actual, equalTo(outText)); + } + +} diff --git a/metron-platform/metron-job/src/main/java/org/apache/metron/job/JobStatus.java b/metron-platform/metron-job/src/main/java/org/apache/metron/job/JobStatus.java index 656c43ba51..d3cda65e54 100644 --- a/metron-platform/metron-job/src/main/java/org/apache/metron/job/JobStatus.java +++ b/metron-platform/metron-job/src/main/java/org/apache/metron/job/JobStatus.java @@ -23,7 +23,7 @@ /** * Capture metadata about a batch job */ -public class JobStatus { +public class JobStatus { public enum STATE { NOT_RUNNING, @@ -36,6 +36,7 @@ public enum STATE { private STATE state = STATE.NOT_RUNNING; private double percentComplete = 0.0; private Path resultPath; + private Pageable pagedResults; public JobStatus withState(STATE state) { this.state = state; @@ -52,6 +53,11 @@ public JobStatus withResultPath(Path resultPath) { return this; } + public JobStatus withPagedResults(Pageable pagedResults) { + this.pagedResults = pagedResults; + return this; + } + public STATE getState() { return state; } @@ -64,4 +70,8 @@ public Path getResultPath() { return resultPath; } + public Pageable getPagedResults() { + return pagedResults; + } + } diff --git a/metron-platform/metron-job/src/main/java/org/apache/metron/job/Pageable.java b/metron-platform/metron-job/src/main/java/org/apache/metron/job/Pageable.java new file mode 100644 index 0000000000..1038ab8e3c --- /dev/null +++ b/metron-platform/metron-job/src/main/java/org/apache/metron/job/Pageable.java @@ -0,0 +1,38 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.metron.job; + +public interface Pageable { + + /** + * Transform into an Iterable. + * + * @return Iterable version of this Pageable. + */ + Iterable asIterable(); + + /** + * Provides access to a specific page of results in the result set. + * + * @param num page number to access. + * @return value at the specified page. + */ + T getPage(int num); + +} diff --git a/metron-platform/metron-job/src/main/java/org/apache/metron/job/Statusable.java b/metron-platform/metron-job/src/main/java/org/apache/metron/job/Statusable.java index dbe52be63c..7a8fc02cef 100644 --- a/metron-platform/metron-job/src/main/java/org/apache/metron/job/Statusable.java +++ b/metron-platform/metron-job/src/main/java/org/apache/metron/job/Statusable.java @@ -18,6 +18,12 @@ package org.apache.metron.job; +import java.io.IOException; +import java.util.Map; + +/** + * Abstraction for getting status on running jobs. Also provides options for killing and validating. + */ public interface Statusable { /** @@ -34,4 +40,17 @@ public interface Statusable { */ boolean isDone(); + /** + * Kill job. + */ + void kill() throws IOException; + + /** + * Validate job after submitted. + * + * @param configuration config for validating the job. + * @return true if job is valid based on passed configuration, false if invalid. + */ + boolean validate(Map configuration); + } diff --git a/metron-platform/metron-pcap-backend/src/main/java/org/apache/metron/pcap/query/PcapCli.java b/metron-platform/metron-pcap-backend/src/main/java/org/apache/metron/pcap/query/PcapCli.java index b50d488b7c..06545b6894 100644 --- a/metron-platform/metron-pcap-backend/src/main/java/org/apache/metron/pcap/query/PcapCli.java +++ b/metron-platform/metron-pcap-backend/src/main/java/org/apache/metron/pcap/query/PcapCli.java @@ -17,11 +17,9 @@ */ package org.apache.metron.pcap.query; -import com.google.common.collect.Iterables; import java.io.IOException; import java.lang.invoke.MethodHandles; import java.util.Arrays; -import java.util.List; import java.util.UUID; import org.apache.commons.cli.ParseException; import org.apache.commons.lang3.tuple.Pair; @@ -35,6 +33,7 @@ import org.apache.metron.pcap.filter.fixed.FixedPcapFilter; import org.apache.metron.pcap.filter.query.QueryPcapFilter; import org.apache.metron.pcap.mr.PcapJob; +import org.apache.metron.pcap.writer.ResultsWriter; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -156,30 +155,16 @@ public int run(String[] args) { printBasicHelp(); return -1; } - try { - Iterable> partitions = Iterables.partition(results, commonConfig.getNumRecordsPerFile()); - int part = 1; - if (partitions.iterator().hasNext()) { - for (List data : partitions) { - String outFileName = String.format("pcap-data-%s+%04d.pcap", commonConfig.getPrefix(), part++); - if(data.size() > 0) { - resultsWriter.write(data, outFileName); - } - } - } else { - System.out.println("No results returned."); - } + try { + jobRunner.writeResults(results, resultsWriter, new Path("file://."), + commonConfig.getNumRecordsPerFile(), + commonConfig.getPrefix()); } catch (IOException e) { LOGGER.error("Unable to write file", e); return -1; - } finally { - try { - results.cleanup(); - } catch(IOException e) { - LOGGER.warn("Unable to cleanup files in HDFS", e); - } } + return 0; } diff --git a/metron-platform/metron-pcap-backend/src/test/java/org/apache/metron/pcap/PcapJobTest.java b/metron-platform/metron-pcap-backend/src/test/java/org/apache/metron/pcap/PcapJobTest.java index 9bb3d6758c..e24b43f316 100644 --- a/metron-platform/metron-pcap-backend/src/test/java/org/apache/metron/pcap/PcapJobTest.java +++ b/metron-platform/metron-pcap-backend/src/test/java/org/apache/metron/pcap/PcapJobTest.java @@ -20,6 +20,7 @@ import static java.lang.Long.toUnsignedString; import static java.lang.String.format; +import static org.hamcrest.CoreMatchers.endsWith; import static org.hamcrest.CoreMatchers.equalTo; import static org.hamcrest.CoreMatchers.notNullValue; import static org.hamcrest.CoreMatchers.startsWith; @@ -125,6 +126,8 @@ public void job_succeeds_synchronously() throws Exception { String expectedOutPath = new Path(baseOutPath, format("%s_%s_%s", startTime, endTime, "192.168.1.1")).toString(); Assert.assertThat(status.getResultPath(), notNullValue()); Assert.assertThat(status.getResultPath().toString(), startsWith(expectedOutPath)); + Assert.assertThat(status.getPagedResults(), notNullValue()); + Assert.assertThat(((Path)status.getPagedResults().getPage(0)).getName(), endsWith("pcap-data-1234+0001.pcap")); } @Test diff --git a/metron-platform/metron-pcap-backend/src/test/java/org/apache/metron/pcap/query/PcapCliTest.java b/metron-platform/metron-pcap-backend/src/test/java/org/apache/metron/pcap/query/PcapCliTest.java index 7202819f6c..0d16651adb 100644 --- a/metron-platform/metron-pcap-backend/src/test/java/org/apache/metron/pcap/query/PcapCliTest.java +++ b/metron-platform/metron-pcap-backend/src/test/java/org/apache/metron/pcap/query/PcapCliTest.java @@ -28,6 +28,7 @@ import org.apache.metron.pcap.filter.fixed.FixedPcapFilter; import org.apache.metron.pcap.filter.query.QueryPcapFilter; import org.apache.metron.pcap.mr.PcapJob; +import org.apache.metron.pcap.writer.ResultsWriter; import org.junit.Before; import org.junit.Test; import org.mockito.Mock; diff --git a/metron-platform/metron-pcap/src/main/java/org/apache/metron/pcap/mr/PcapJob.java b/metron-platform/metron-pcap/src/main/java/org/apache/metron/pcap/mr/PcapJob.java index 99bcb9a303..deee1834fb 100644 --- a/metron-platform/metron-pcap/src/main/java/org/apache/metron/pcap/mr/PcapJob.java +++ b/metron-platform/metron-pcap/src/main/java/org/apache/metron/pcap/mr/PcapJob.java @@ -22,6 +22,7 @@ import static org.apache.metron.pcap.PcapHelper.lessThanOrEqualTo; import com.google.common.base.Joiner; +import com.google.common.collect.Iterables; import java.io.IOException; import java.lang.invoke.MethodHandles; import java.text.DateFormat; @@ -30,6 +31,7 @@ import java.util.Collections; import java.util.Date; import java.util.List; +import java.util.Map; import java.util.UUID; import java.util.stream.Stream; import org.apache.commons.lang3.StringUtils; @@ -57,6 +59,7 @@ import org.apache.metron.pcap.filter.PcapFilterConfigurator; import org.apache.metron.pcap.filter.PcapFilters; import org.apache.metron.pcap.utils.FileFilterUtil; +import org.apache.metron.pcap.writer.ResultsWriter; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -173,7 +176,7 @@ public SequenceFileIterable query(Path basePath , FileSystem fs , PcapFilterConfigurator filterImpl ) throws IOException, ClassNotFoundException, InterruptedException { - Statusable statusable = query(basePath, baseOutputPath, beginNS, endNS, numReducers, fields, + Statusable statusable = query(basePath, baseOutputPath, finalBaseOutputPath, beginNS, endNS, numReducers, fields, conf, fs, filterImpl, true); JobStatus jobStatus = statusable.getStatus(); @@ -192,6 +195,7 @@ public SequenceFileIterable query(Path basePath */ public Statusable query(Path basePath, Path baseOutputPath, + Path finalBaseOutputPath, long beginNS, long endNS, int numReducers, @@ -251,6 +255,33 @@ private SequenceFileIterable readResults(Path outputPath, Configuration config, return new SequenceFileIterable(files, config); } + public List writeResults(SequenceFileIterable results, ResultsWriter resultsWriter, + Path outPath, int recPerFile, String prefix) throws IOException { + List outFiles = new ArrayList<>(); + try { + Iterable> partitions = Iterables.partition(results, recPerFile); + int part = 1; + if (partitions.iterator().hasNext()) { + for (List data : partitions) { + String outFileName = String.format("%s/pcap-data-%s+%04d.pcap", outPath, prefix, part++); + if (data.size() > 0) { + resultsWriter.write(new Configuration(), data, outFileName); + outFiles.add(new Path(outFileName)); + } + } + } else { + LOG.info("No results returned."); + } + } finally { + try { + results.cleanup(); + } catch (IOException e) { + LOG.warn("Unable to cleanup files in HDFS", e); + } + } + return outFiles; + } + /** * Creates, but does not submit the job. */ @@ -348,5 +379,15 @@ public boolean isDone() { } } + @Override + public void kill() throws IOException { + job.killJob(); + } + + @Override + public boolean validate(Map configuration) { + // default implementation placeholder + return true; + } } diff --git a/metron-platform/metron-pcap-backend/src/main/java/org/apache/metron/pcap/query/ResultsWriter.java b/metron-platform/metron-pcap/src/main/java/org/apache/metron/pcap/writer/ResultsWriter.java similarity index 59% rename from metron-platform/metron-pcap-backend/src/main/java/org/apache/metron/pcap/query/ResultsWriter.java rename to metron-platform/metron-pcap/src/main/java/org/apache/metron/pcap/writer/ResultsWriter.java index ab11770b01..3934aca885 100644 --- a/metron-platform/metron-pcap-backend/src/main/java/org/apache/metron/pcap/query/ResultsWriter.java +++ b/metron-platform/metron-pcap/src/main/java/org/apache/metron/pcap/writer/ResultsWriter.java @@ -15,25 +15,36 @@ * See the License for the specific language governing permissions and * limitations under the License. */ -package org.apache.metron.pcap.query; - -import org.apache.metron.pcap.PcapMerger; +package org.apache.metron.pcap.writer; import java.io.ByteArrayOutputStream; -import java.io.File; -import java.io.FileOutputStream; import java.io.IOException; import java.util.List; +import org.apache.hadoop.conf.Configuration; +import org.apache.metron.common.utils.HDFSUtils; +import org.apache.metron.pcap.PcapMerger; public class ResultsWriter { - public void write(List pcaps, String outPath) throws IOException { - File out = new File(outPath); - try (FileOutputStream fos = new FileOutputStream(out)) { - fos.write(mergePcaps(pcaps)); - } + /** + * Write out pcaps. Configuration offers ability to configure for HDFS or local FS, if desired. + * + * @param config Standard hadoop filesystem config. + * @param pcaps pcap data to write. Pre-merged format as a list of pcaps as byte arrays. + * @param outPath where to write the pcap data to. + * @throws IOException I/O issue encountered. + */ + public void write(Configuration config, List pcaps, String outPath) throws IOException { + HDFSUtils.write(config, mergePcaps(pcaps), outPath); } + /** + * Creates a pcap file with proper global header from individual pcaps. + * + * @param pcaps pcap records to merge into a pcap file with header. + * @return merged result. + * @throws IOException I/O issue encountered. + */ public byte[] mergePcaps(List pcaps) throws IOException { if (pcaps == null) { return new byte[]{}; From 976e1bcdf9619cc0672de4bcd7dd0d865a01cc1b Mon Sep 17 00:00:00 2001 From: Michael Miklavcic Date: Mon, 25 Jun 2018 13:00:03 -0600 Subject: [PATCH 3/8] Rev metron-job to 0.5.1 after merge with master --- metron-platform/metron-job/pom.xml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/metron-platform/metron-job/pom.xml b/metron-platform/metron-job/pom.xml index fa36ff7125..49cc3cb8bc 100644 --- a/metron-platform/metron-job/pom.xml +++ b/metron-platform/metron-job/pom.xml @@ -20,7 +20,7 @@ org.apache.metron metron-platform - 0.5.0 + 0.5.1 metron-job From f9fc106f4a33f323dc06260d861fc543726e2518 Mon Sep 17 00:00:00 2001 From: Michael Miklavcic Date: Tue, 26 Jun 2018 16:03:21 -0600 Subject: [PATCH 4/8] Move result writing code to pcapjob. Get pcapclitest working again. --- .../org/apache/metron/pcap/query/PcapCli.java | 1 + .../apache/metron/pcap/query/PcapCliTest.java | 51 +++++++++++-------- .../org/apache/metron/pcap/mr/PcapJob.java | 12 ++--- 3 files changed, 38 insertions(+), 26 deletions(-) diff --git a/metron-platform/metron-pcap-backend/src/main/java/org/apache/metron/pcap/query/PcapCli.java b/metron-platform/metron-pcap-backend/src/main/java/org/apache/metron/pcap/query/PcapCli.java index 06545b6894..3328bb4416 100644 --- a/metron-platform/metron-pcap-backend/src/main/java/org/apache/metron/pcap/query/PcapCli.java +++ b/metron-platform/metron-pcap-backend/src/main/java/org/apache/metron/pcap/query/PcapCli.java @@ -157,6 +157,7 @@ public int run(String[] args) { } try { + // write to local FS in the executing directory jobRunner.writeResults(results, resultsWriter, new Path("file://."), commonConfig.getNumRecordsPerFile(), commonConfig.getPrefix()); diff --git a/metron-platform/metron-pcap-backend/src/test/java/org/apache/metron/pcap/query/PcapCliTest.java b/metron-platform/metron-pcap-backend/src/test/java/org/apache/metron/pcap/query/PcapCliTest.java index 0d16651adb..fcb4d1995c 100644 --- a/metron-platform/metron-pcap-backend/src/test/java/org/apache/metron/pcap/query/PcapCliTest.java +++ b/metron-platform/metron-pcap-backend/src/test/java/org/apache/metron/pcap/query/PcapCliTest.java @@ -17,6 +17,30 @@ */ package org.apache.metron.pcap.query; +import static org.hamcrest.CoreMatchers.equalTo; +import static org.junit.Assert.assertThat; +import static org.mockito.Matchers.anyInt; +import static org.mockito.Matchers.anyLong; +import static org.mockito.Matchers.anyObject; +import static org.mockito.Matchers.eq; +import static org.mockito.Matchers.isA; +import static org.mockito.Mockito.doCallRealMethod; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.when; + +import java.io.BufferedOutputStream; +import java.io.ByteArrayOutputStream; +import java.io.IOException; +import java.io.PrintStream; +import java.nio.charset.StandardCharsets; +import java.text.ParseException; +import java.text.SimpleDateFormat; +import java.util.Arrays; +import java.util.Date; +import java.util.HashMap; +import java.util.Iterator; +import java.util.List; +import java.util.Map; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; @@ -35,20 +59,6 @@ import org.mockito.Mockito; import org.mockito.MockitoAnnotations; -import java.io.BufferedOutputStream; -import java.io.ByteArrayOutputStream; -import java.io.PrintStream; -import java.nio.charset.StandardCharsets; -import java.text.ParseException; -import java.text.SimpleDateFormat; -import java.util.*; - -import static org.hamcrest.CoreMatchers.equalTo; -import static org.junit.Assert.assertThat; -import static org.mockito.Matchers.*; -import static org.mockito.Mockito.mock; -import static org.mockito.Mockito.when; - public class PcapCliTest { @Mock @@ -59,8 +69,9 @@ public class PcapCliTest { private Clock clock; @Before - public void setup() { + public void setup() throws IOException { MockitoAnnotations.initMocks(this); + doCallRealMethod().when(jobRunner).writeResults(anyObject(), anyObject(), anyObject(), anyInt(), anyObject()); } @Test @@ -96,7 +107,7 @@ public void runs_fixed_pcap_filter_job_with_default_argument_list() throws Excep PcapCli cli = new PcapCli(jobRunner, resultsWriter, clock -> "random_prefix"); assertThat("Expect no errors on run", cli.run(args), equalTo(0)); - Mockito.verify(resultsWriter).write(pcaps, "pcap-data-random_prefix+0001.pcap"); + Mockito.verify(resultsWriter).write(isA(Configuration.class), eq(pcaps), eq("file://./pcap-data-random_prefix+0001.pcap")); } @Test @@ -136,7 +147,7 @@ public void runs_fixed_pcap_filter_job_with_full_argument_list_and_default_datef PcapCli cli = new PcapCli(jobRunner, resultsWriter, clock -> "random_prefix"); assertThat("Expect no errors on run", cli.run(args), equalTo(0)); - Mockito.verify(resultsWriter).write(pcaps, "pcap-data-random_prefix+0001.pcap"); + Mockito.verify(resultsWriter).write(isA(Configuration.class), eq(pcaps), eq("file://./pcap-data-random_prefix+0001.pcap")); } @Test @@ -179,7 +190,7 @@ public void runs_fixed_pcap_filter_job_with_full_argument_list() throws Exceptio PcapCli cli = new PcapCli(jobRunner, resultsWriter, clock -> "random_prefix"); assertThat("Expect no errors on run", cli.run(args), equalTo(0)); - Mockito.verify(resultsWriter).write(pcaps, "pcap-data-random_prefix+0001.pcap"); + Mockito.verify(resultsWriter).write(isA(Configuration.class), eq(pcaps), eq("file://./pcap-data-random_prefix+0001.pcap")); } private long asNanos(String inDate, String format) throws ParseException { @@ -212,7 +223,7 @@ public void runs_query_pcap_filter_job_with_default_argument_list() throws Excep PcapCli cli = new PcapCli(jobRunner, resultsWriter, clock -> "random_prefix"); assertThat("Expect no errors on run", cli.run(args), equalTo(0)); - Mockito.verify(resultsWriter).write(pcaps, "pcap-data-random_prefix+0001.pcap"); + Mockito.verify(resultsWriter).write(isA(Configuration.class), eq(pcaps), eq("file://./pcap-data-random_prefix+0001.pcap")); } @Test @@ -240,7 +251,7 @@ public void runs_query_pcap_filter_job_with_full_argument_list() throws Exceptio PcapCli cli = new PcapCli(jobRunner, resultsWriter, clock -> "random_prefix"); assertThat("Expect no errors on run", cli.run(args), equalTo(0)); - Mockito.verify(resultsWriter).write(pcaps, "pcap-data-random_prefix+0001.pcap"); + Mockito.verify(resultsWriter).write(isA(Configuration.class), eq(pcaps), eq("file://./pcap-data-random_prefix+0001.pcap")); } // INVALID OPTION CHECKS diff --git a/metron-platform/metron-pcap/src/main/java/org/apache/metron/pcap/mr/PcapJob.java b/metron-platform/metron-pcap/src/main/java/org/apache/metron/pcap/mr/PcapJob.java index deee1834fb..1992b6b1b2 100644 --- a/metron-platform/metron-pcap/src/main/java/org/apache/metron/pcap/mr/PcapJob.java +++ b/metron-platform/metron-pcap/src/main/java/org/apache/metron/pcap/mr/PcapJob.java @@ -83,12 +83,12 @@ public static class PcapPartitioner extends Partitioner numPartitions) { + if (ret > numPartitions) { throw new IllegalArgumentException(String.format("Bad partition: key=%s, width=%d, partition=%d, numPartitions=%d" , Long.toUnsignedString(x), width, ret, numPartitions) ); @@ -112,6 +112,7 @@ public Configuration getConf() { return configuration; } } + public static class PcapMapper extends Mapper { PcapFilter filter; @@ -137,7 +138,7 @@ protected void map(LongWritable key, BytesWritable value, Context context) throw List packetInfos; try { packetInfos = PcapHelper.toPacketInfo(value.copyBytes()); - } catch(Exception e) { + } catch (Exception e) { // toPacketInfo is throwing RuntimeExceptions. Attempt to catch and count errors with malformed packets context.getCounter(PCAP_COUNTER.MALFORMED_PACKET_COUNT).increment(1); return; @@ -157,7 +158,7 @@ private Stream filteredPacketInfo(List packetInfos) thro public static class PcapReducer extends Reducer { @Override protected void reduce(LongWritable key, Iterable values, Context context) throws IOException, InterruptedException { - for(BytesWritable value : values) { + for (BytesWritable value : values) { context.write(key, value); } } @@ -176,7 +177,7 @@ public SequenceFileIterable query(Path basePath , FileSystem fs , PcapFilterConfigurator filterImpl ) throws IOException, ClassNotFoundException, InterruptedException { - Statusable statusable = query(basePath, baseOutputPath, finalBaseOutputPath, beginNS, endNS, numReducers, fields, + Statusable statusable = query(basePath, baseOutputPath, beginNS, endNS, numReducers, fields, conf, fs, filterImpl, true); JobStatus jobStatus = statusable.getStatus(); @@ -195,7 +196,6 @@ public SequenceFileIterable query(Path basePath */ public Statusable query(Path basePath, Path baseOutputPath, - Path finalBaseOutputPath, long beginNS, long endNS, int numReducers, From 8b4ef9cdc37235c333f3cbd294f20ae15a7438aa Mon Sep 17 00:00:00 2001 From: Michael Miklavcic Date: Tue, 26 Jun 2018 23:31:21 -0600 Subject: [PATCH 5/8] Add Pageable results. --- .../java/org/apache/metron/job/JobStatus.java | 22 +++------- .../org/apache/metron/pcap/query/PcapCli.java | 10 +---- .../org/apache/metron/pcap/PcapJobTest.java | 26 +++++------- .../org/apache/metron/pcap/PcapFiles.java | 42 +++++++++++++++++++ .../org/apache/metron/pcap/mr/PcapJob.java | 24 ++++++----- 5 files changed, 74 insertions(+), 50 deletions(-) create mode 100644 metron-platform/metron-pcap/src/main/java/org/apache/metron/pcap/PcapFiles.java diff --git a/metron-platform/metron-job/src/main/java/org/apache/metron/job/JobStatus.java b/metron-platform/metron-job/src/main/java/org/apache/metron/job/JobStatus.java index d3cda65e54..8b67487706 100644 --- a/metron-platform/metron-job/src/main/java/org/apache/metron/job/JobStatus.java +++ b/metron-platform/metron-job/src/main/java/org/apache/metron/job/JobStatus.java @@ -21,11 +21,11 @@ import org.apache.hadoop.fs.Path; /** - * Capture metadata about a batch job + * Capture metadata about a batch job. */ -public class JobStatus { +public class JobStatus { - public enum STATE { + public enum State { NOT_RUNNING, RUNNING, SUCCEEDED, @@ -33,12 +33,11 @@ public enum STATE { KILLED } - private STATE state = STATE.NOT_RUNNING; + private State state = State.NOT_RUNNING; private double percentComplete = 0.0; private Path resultPath; - private Pageable pagedResults; - public JobStatus withState(STATE state) { + public JobStatus withState(State state) { this.state = state; return this; } @@ -53,12 +52,7 @@ public JobStatus withResultPath(Path resultPath) { return this; } - public JobStatus withPagedResults(Pageable pagedResults) { - this.pagedResults = pagedResults; - return this; - } - - public STATE getState() { + public State getState() { return state; } @@ -70,8 +64,4 @@ public Path getResultPath() { return resultPath; } - public Pageable getPagedResults() { - return pagedResults; - } - } diff --git a/metron-platform/metron-pcap-backend/src/main/java/org/apache/metron/pcap/query/PcapCli.java b/metron-platform/metron-pcap-backend/src/main/java/org/apache/metron/pcap/query/PcapCli.java index 3328bb4416..61f83c61fd 100644 --- a/metron-platform/metron-pcap-backend/src/main/java/org/apache/metron/pcap/query/PcapCli.java +++ b/metron-platform/metron-pcap-backend/src/main/java/org/apache/metron/pcap/query/PcapCli.java @@ -107,10 +107,7 @@ public int run(String[] args) { hadoopConf, FileSystem.get(hadoopConf), new FixedPcapFilter.Configurator()); - } catch (IOException | ClassNotFoundException e) { - LOGGER.error("Failed to execute fixed filter job: {}", e.getMessage(), e); - return -1; - } catch (InterruptedException e) { + } catch (IOException | ClassNotFoundException | InterruptedException e) { LOGGER.error("Failed to execute fixed filter job: {}", e.getMessage(), e); return -1; } @@ -144,10 +141,7 @@ public int run(String[] args) { hadoopConf, FileSystem.get(hadoopConf), new QueryPcapFilter.Configurator()); - } catch (IOException | ClassNotFoundException e) { - LOGGER.error("Failed to execute query filter job: {}", e.getMessage(), e); - return -1; - } catch (InterruptedException e) { + } catch (IOException | ClassNotFoundException | InterruptedException e) { LOGGER.error("Failed to execute query filter job: {}", e.getMessage(), e); return -1; } diff --git a/metron-platform/metron-pcap-backend/src/test/java/org/apache/metron/pcap/PcapJobTest.java b/metron-platform/metron-pcap-backend/src/test/java/org/apache/metron/pcap/PcapJobTest.java index e24b43f316..af1448fe3e 100644 --- a/metron-platform/metron-pcap-backend/src/test/java/org/apache/metron/pcap/PcapJobTest.java +++ b/metron-platform/metron-pcap-backend/src/test/java/org/apache/metron/pcap/PcapJobTest.java @@ -20,7 +20,6 @@ import static java.lang.Long.toUnsignedString; import static java.lang.String.format; -import static org.hamcrest.CoreMatchers.endsWith; import static org.hamcrest.CoreMatchers.equalTo; import static org.hamcrest.CoreMatchers.notNullValue; import static org.hamcrest.CoreMatchers.startsWith; @@ -37,10 +36,9 @@ import org.apache.hadoop.io.BytesWritable; import org.apache.hadoop.io.LongWritable; import org.apache.hadoop.mapreduce.Job; -import org.apache.hadoop.mapreduce.JobStatus.State; import org.apache.metron.common.utils.timestamp.TimestampConverters; import org.apache.metron.job.JobStatus; -import org.apache.metron.job.JobStatus.STATE; +import org.apache.metron.job.JobStatus.State; import org.apache.metron.job.Statusable; import org.apache.metron.pcap.filter.PcapFilterConfigurator; import org.apache.metron.pcap.filter.fixed.FixedPcapFilter; @@ -106,7 +104,7 @@ public Job createJob(Path basePath, Path outputPath, long beginNS, long endN @Test public void job_succeeds_synchronously() throws Exception { when(job.isComplete()).thenReturn(true); - when(mrStatus.getState()).thenReturn(State.SUCCEEDED); + when(mrStatus.getState()).thenReturn(org.apache.hadoop.mapreduce.JobStatus.State.SUCCEEDED); when(job.getStatus()).thenReturn(mrStatus); TestJob testJob = new TestJob(); Statusable statusable = testJob.query(basePath, @@ -121,19 +119,17 @@ public void job_succeeds_synchronously() throws Exception { true); verify(job, times(1)).waitForCompletion(true); JobStatus status = statusable.getStatus(); - Assert.assertThat(status.getState(), equalTo(STATE.SUCCEEDED)); + Assert.assertThat(status.getState(), equalTo(State.SUCCEEDED)); Assert.assertThat(status.getPercentComplete(), equalTo(100.0)); String expectedOutPath = new Path(baseOutPath, format("%s_%s_%s", startTime, endTime, "192.168.1.1")).toString(); Assert.assertThat(status.getResultPath(), notNullValue()); Assert.assertThat(status.getResultPath().toString(), startsWith(expectedOutPath)); - Assert.assertThat(status.getPagedResults(), notNullValue()); - Assert.assertThat(((Path)status.getPagedResults().getPage(0)).getName(), endsWith("pcap-data-1234+0001.pcap")); } @Test public void job_fails_synchronously() throws Exception { when(job.isComplete()).thenReturn(true); - when(mrStatus.getState()).thenReturn(State.FAILED); + when(mrStatus.getState()).thenReturn(org.apache.hadoop.mapreduce.JobStatus.State.FAILED); when(job.getStatus()).thenReturn(mrStatus); TestJob testJob = new TestJob(); Statusable statusable = testJob.query(basePath, @@ -147,7 +143,7 @@ public void job_fails_synchronously() throws Exception { new FixedPcapFilter.Configurator(), true); JobStatus status = statusable.getStatus(); - Assert.assertThat(status.getState(), equalTo(STATE.FAILED)); + Assert.assertThat(status.getState(), equalTo(State.FAILED)); Assert.assertThat(status.getPercentComplete(), equalTo(100.0)); String expectedOutPath = new Path(baseOutPath, format("%s_%s_%s", startTime, endTime, "192.168.1.1")).toString(); Assert.assertThat(status.getResultPath(), notNullValue()); @@ -157,7 +153,7 @@ public void job_fails_synchronously() throws Exception { @Test public void job_fails_with_killed_status_synchronously() throws Exception { when(job.isComplete()).thenReturn(true); - when(mrStatus.getState()).thenReturn(State.KILLED); + when(mrStatus.getState()).thenReturn(org.apache.hadoop.mapreduce.JobStatus.State.KILLED); when(job.getStatus()).thenReturn(mrStatus); TestJob testJob = new TestJob(); Statusable statusable = testJob.query(basePath, @@ -171,7 +167,7 @@ public void job_fails_with_killed_status_synchronously() throws Exception { new FixedPcapFilter.Configurator(), true); JobStatus status = statusable.getStatus(); - Assert.assertThat(status.getState(), equalTo(STATE.KILLED)); + Assert.assertThat(status.getState(), equalTo(State.KILLED)); Assert.assertThat(status.getPercentComplete(), equalTo(100.0)); String expectedOutPath = new Path(baseOutPath, format("%s_%s_%s", startTime, endTime, "192.168.1.1")).toString(); Assert.assertThat(status.getResultPath(), notNullValue()); @@ -181,7 +177,7 @@ public void job_fails_with_killed_status_synchronously() throws Exception { @Test public void job_succeeds_asynchronously() throws Exception { when(job.isComplete()).thenReturn(true); - when(mrStatus.getState()).thenReturn(State.SUCCEEDED); + when(mrStatus.getState()).thenReturn(org.apache.hadoop.mapreduce.JobStatus.State.SUCCEEDED); when(job.getStatus()).thenReturn(mrStatus); TestJob testJob = new TestJob(); Statusable statusable = testJob.query(basePath, @@ -195,7 +191,7 @@ public void job_succeeds_asynchronously() throws Exception { new FixedPcapFilter.Configurator(), false); JobStatus status = statusable.getStatus(); - Assert.assertThat(status.getState(), equalTo(STATE.SUCCEEDED)); + Assert.assertThat(status.getState(), equalTo(State.SUCCEEDED)); Assert.assertThat(status.getPercentComplete(), equalTo(100.0)); String expectedOutPath = new Path(baseOutPath, format("%s_%s_%s", startTime, endTime, "192.168.1.1")).toString(); Assert.assertThat(status.getResultPath(), notNullValue()); @@ -205,7 +201,7 @@ public void job_succeeds_asynchronously() throws Exception { @Test public void job_reports_percent_complete() throws Exception { when(job.isComplete()).thenReturn(false); - when(mrStatus.getState()).thenReturn(State.RUNNING); + when(mrStatus.getState()).thenReturn(org.apache.hadoop.mapreduce.JobStatus.State.RUNNING); when(job.getStatus()).thenReturn(mrStatus); TestJob testJob = new TestJob(); Statusable statusable = testJob.query(basePath, @@ -221,7 +217,7 @@ public void job_reports_percent_complete() throws Exception { when(job.mapProgress()).thenReturn(0.5f); when(job.reduceProgress()).thenReturn(0f); JobStatus status = statusable.getStatus(); - Assert.assertThat(status.getState(), equalTo(STATE.RUNNING)); + Assert.assertThat(status.getState(), equalTo(State.RUNNING)); Assert.assertThat(status.getPercentComplete(), equalTo(25.0)); when(job.mapProgress()).thenReturn(1.0f); when(job.reduceProgress()).thenReturn(0.5f); diff --git a/metron-platform/metron-pcap/src/main/java/org/apache/metron/pcap/PcapFiles.java b/metron-platform/metron-pcap/src/main/java/org/apache/metron/pcap/PcapFiles.java new file mode 100644 index 0000000000..997c5f79d8 --- /dev/null +++ b/metron-platform/metron-pcap/src/main/java/org/apache/metron/pcap/PcapFiles.java @@ -0,0 +1,42 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.metron.pcap; + +import java.util.List; +import org.apache.hadoop.fs.Path; +import org.apache.metron.job.Pageable; + +public class PcapFiles implements Pageable { + + private List files; + + public PcapFiles(List files) { + this.files = files; + } + + @Override + public Iterable asIterable() { + return files; + } + + @Override + public Path getPage(int num) { + return files.get(num); + } +} diff --git a/metron-platform/metron-pcap/src/main/java/org/apache/metron/pcap/mr/PcapJob.java b/metron-platform/metron-pcap/src/main/java/org/apache/metron/pcap/mr/PcapJob.java index 1992b6b1b2..b7a0edcd0e 100644 --- a/metron-platform/metron-pcap/src/main/java/org/apache/metron/pcap/mr/PcapJob.java +++ b/metron-platform/metron-pcap/src/main/java/org/apache/metron/pcap/mr/PcapJob.java @@ -51,9 +51,11 @@ import org.apache.hadoop.mapreduce.lib.output.SequenceFileOutputFormat; import org.apache.metron.common.hadoop.SequenceFileIterable; import org.apache.metron.job.JobStatus; -import org.apache.metron.job.JobStatus.STATE; +import org.apache.metron.job.JobStatus.State; +import org.apache.metron.job.Pageable; import org.apache.metron.job.Statusable; import org.apache.metron.pcap.PacketInfo; +import org.apache.metron.pcap.PcapFiles; import org.apache.metron.pcap.PcapHelper; import org.apache.metron.pcap.filter.PcapFilter; import org.apache.metron.pcap.filter.PcapFilterConfigurator; @@ -181,7 +183,7 @@ public SequenceFileIterable query(Path basePath conf, fs, filterImpl, true); JobStatus jobStatus = statusable.getStatus(); - if (jobStatus.getState() == STATE.SUCCEEDED) { + if (jobStatus.getState() == State.SUCCEEDED) { Path resultPath = jobStatus.getResultPath(); return readResults(resultPath, conf, fs); } else { @@ -205,7 +207,7 @@ public Statusable query(Path basePath, PcapFilterConfigurator filterImpl, boolean sync) throws IOException, ClassNotFoundException, InterruptedException { - String fileName = Joiner.on("_").join(beginNS, endNS, filterImpl.queryToString(fields), UUID.randomUUID().toString()); + String outputDirName = Joiner.on("_").join(beginNS, endNS, filterImpl.queryToString(fields), UUID.randomUUID().toString()); if(LOG.isDebugEnabled()) { DateFormat format = SimpleDateFormat.getDateTimeInstance( SimpleDateFormat.LONG , SimpleDateFormat.LONG @@ -214,7 +216,7 @@ public Statusable query(Path basePath, String to = format.format(new Date(Long.divideUnsigned(endNS, 1000000))); LOG.debug("Executing query {} on timerange from {} to {}", filterImpl.queryToString(fields), from, to); } - outputPath = new Path(baseOutputPath, fileName); + outputPath = new Path(baseOutputPath, outputDirName); job = createJob( basePath , outputPath , beginNS @@ -255,7 +257,7 @@ private SequenceFileIterable readResults(Path outputPath, Configuration config, return new SequenceFileIterable(files, config); } - public List writeResults(SequenceFileIterable results, ResultsWriter resultsWriter, + public Pageable writeResults(SequenceFileIterable results, ResultsWriter resultsWriter, Path outPath, int recPerFile, String prefix) throws IOException { List outFiles = new ArrayList<>(); try { @@ -279,7 +281,7 @@ public List writeResults(SequenceFileIterable results, ResultsWriter resul LOG.warn("Unable to cleanup files in HDFS", e); } } - return outFiles; + return new PcapFiles(outFiles); } /** @@ -340,27 +342,27 @@ public JobStatus getStatus() { // Note: this method is only reading state from the underlying job, so locking not needed JobStatus status = new JobStatus().withResultPath(outputPath); if (job == null) { - status.withPercentComplete(100).withState(STATE.SUCCEEDED); + status.withPercentComplete(100).withState(State.SUCCEEDED); } else { try { if (job.isComplete()) { status.withPercentComplete(100); switch (job.getStatus().getState()) { case SUCCEEDED: - status.withState(STATE.SUCCEEDED); + status.withState(State.SUCCEEDED); break; case FAILED: - status.withState(STATE.FAILED); + status.withState(State.FAILED); break; case KILLED: - status.withState(STATE.KILLED); + status.withState(State.KILLED); break; } } else { float mapProg = job.mapProgress(); float reduceProg = job.reduceProgress(); float totalProgress = ((mapProg / 2) + (reduceProg / 2)) * 100; - status.withPercentComplete(totalProgress).withState(STATE.RUNNING); + status.withPercentComplete(totalProgress).withState(State.RUNNING); } } catch (Exception e) { throw new RuntimeException("Error occurred while attempting to retrieve job status.", e); From cd302a989320a6a8970b18a94df8fbf4f11576b1 Mon Sep 17 00:00:00 2001 From: Michael Miklavcic Date: Wed, 27 Jun 2018 09:12:42 -0600 Subject: [PATCH 6/8] change pom change with spaces to tabs --- metron-platform/pom.xml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/metron-platform/pom.xml b/metron-platform/pom.xml index 6866386330..cb64f9efa3 100644 --- a/metron-platform/pom.xml +++ b/metron-platform/pom.xml @@ -46,7 +46,7 @@ metron-enrichment metron-solr metron-parsers - metron-job + metron-job metron-pcap-backend metron-data-management metron-pcap From 7c1d4a0bedb23e86e323b0daec21c73113065360 Mon Sep 17 00:00:00 2001 From: Michael Miklavcic Date: Wed, 27 Jun 2018 17:20:16 -0600 Subject: [PATCH 7/8] Address review comments. Fix local FS write path problem. --- .../apache/metron/common/utils/HDFSUtils.java | 25 +++++++---- .../metron/common/utils/HDFSUtilsTest.java | 13 +++++- .../java/org/apache/metron/job/JobStatus.java | 16 ++++++++ .../org/apache/metron/pcap/query/PcapCli.java | 8 ++-- .../org/apache/metron/pcap/PcapJobTest.java | 31 +++++++++++--- .../PcapTopologyIntegrationTest.java | 41 ++++++++++++------- .../apache/metron/pcap/query/PcapCliTest.java | 23 ++++++----- .../org/apache/metron/pcap/mr/PcapJob.java | 23 +++++++---- 8 files changed, 130 insertions(+), 50 deletions(-) diff --git a/metron-platform/metron-common/src/main/java/org/apache/metron/common/utils/HDFSUtils.java b/metron-platform/metron-common/src/main/java/org/apache/metron/common/utils/HDFSUtils.java index 1d03a6d5a0..ae09edf881 100644 --- a/metron-platform/metron-common/src/main/java/org/apache/metron/common/utils/HDFSUtils.java +++ b/metron-platform/metron-common/src/main/java/org/apache/metron/common/utils/HDFSUtils.java @@ -30,8 +30,11 @@ public class HDFSUtils { /** - * Reads full HDFS FS file contents into a List of Strings. Initializes file system with default - * configuration. Opens and closes the file system on each call. Never null. + * Reads full file contents into a List of Strings. Reads from local FS if file:/// used as the + * scheme. Initializes file system with default configuration. + * Automatically handles file system translation for the provided path's scheme. + * Opens and closes the file system on each call. Never null. + * Null/empty scheme defaults to default configured FS. * * @param path path to file * @return file contents as a String @@ -42,8 +45,10 @@ public static List readFile(String path) throws IOException { } /** - * Reads full HDFS FS file contents into a String. Opens and closes the file system on each call. - * Never null. + * Reads full file contents into a String. Reads from local FS if file:/// used as the scheme. + * Opens and closes the file system on each call. + * Never null. Automatically handles file system translation for the provided path's scheme. + * Null/empty scheme defaults to default configured FS. * * @param config Hadoop configuration * @param path path to file @@ -51,14 +56,16 @@ public static List readFile(String path) throws IOException { * @throws IOException */ public static List readFile(Configuration config, String path) throws IOException { - FileSystem fs = FileSystem.newInstance(config); - Path hdfsPath = new Path(path); - FSDataInputStream inputStream = fs.open(hdfsPath); + Path inPath = new Path(path); + FileSystem fs = FileSystem.newInstance(inPath.toUri(), config); + FSDataInputStream inputStream = fs.open(inPath); return IOUtils.readLines(inputStream, "UTF-8"); } /** - * Write file to HDFS. Writes to local FS if file:// used as protocol. + * Write file to HDFS. Writes to local FS if file:/// used as the scheme. + * Automatically handles file system translation for the provided path's scheme. + * Null/empty scheme defaults to default configured FS. * * @param config filesystem configuration * @param bytes bytes to write @@ -66,8 +73,8 @@ public static List readFile(Configuration config, String path) throws IO * @throws IOException This is the generic Hadoop "everything is an IOException." */ public static void write(Configuration config, byte[] bytes, String path) throws IOException { - FileSystem fs = FileSystem.newInstance(config); Path outPath = new Path(path); + FileSystem fs = FileSystem.newInstance(outPath.toUri(), config); fs.mkdirs(outPath.getParent()); try (FSDataOutputStream outputStream = fs.create(outPath)) { outputStream.write(bytes); diff --git a/metron-platform/metron-common/src/test/java/org/apache/metron/common/utils/HDFSUtilsTest.java b/metron-platform/metron-common/src/test/java/org/apache/metron/common/utils/HDFSUtilsTest.java index 53b6f299c2..a572e24408 100644 --- a/metron-platform/metron-common/src/test/java/org/apache/metron/common/utils/HDFSUtilsTest.java +++ b/metron-platform/metron-common/src/test/java/org/apache/metron/common/utils/HDFSUtilsTest.java @@ -37,7 +37,7 @@ public class HDFSUtilsTest { @Test public void writes_file_to_local_fs() throws Exception { String outText = "small brown bike and casket lottery"; - String outFile = tempDir.getRoot().getAbsolutePath() + "outfile.txt"; + String outFile = tempDir.getRoot().getAbsolutePath() + "/outfile.txt"; Configuration config = new Configuration(); config.set("fs.default.name", "file:///"); HDFSUtils.write(config, outText.getBytes(StandardCharsets.UTF_8), outFile); @@ -45,4 +45,15 @@ public void writes_file_to_local_fs() throws Exception { assertThat("Text should match", actual, equalTo(outText)); } + @Test + public void writes_file_to_local_fs_with_scheme_defined_only_in_uri() throws Exception { + String outText = "small brown bike and casket lottery"; + String outFile = tempDir.getRoot().getAbsolutePath() + "/outfile.txt"; + Configuration config = new Configuration(); + HDFSUtils.write(config, outText.getBytes(StandardCharsets.UTF_8), "file:///" + outFile); + + String actual = TestUtils.read(new File(outFile)); + assertThat("Text should match", actual, equalTo(outText)); + } + } diff --git a/metron-platform/metron-job/src/main/java/org/apache/metron/job/JobStatus.java b/metron-platform/metron-job/src/main/java/org/apache/metron/job/JobStatus.java index 8b67487706..ec006fb2c7 100644 --- a/metron-platform/metron-job/src/main/java/org/apache/metron/job/JobStatus.java +++ b/metron-platform/metron-job/src/main/java/org/apache/metron/job/JobStatus.java @@ -33,10 +33,17 @@ public enum State { KILLED } + private String jobId; private State state = State.NOT_RUNNING; private double percentComplete = 0.0; + private String description; private Path resultPath; + public JobStatus withJobId(String jobId) { + this.jobId = jobId; + return this; + } + public JobStatus withState(State state) { this.state = state; return this; @@ -47,6 +54,11 @@ public JobStatus withPercentComplete(double percentComplete) { return this; } + public JobStatus withDescription(String description) { + this.description = description; + return this; + } + public JobStatus withResultPath(Path resultPath) { this.resultPath = resultPath; return this; @@ -60,6 +72,10 @@ public double getPercentComplete() { return percentComplete; } + public String getDescription() { + return description; + } + public Path getResultPath() { return resultPath; } diff --git a/metron-platform/metron-pcap-backend/src/main/java/org/apache/metron/pcap/query/PcapCli.java b/metron-platform/metron-pcap-backend/src/main/java/org/apache/metron/pcap/query/PcapCli.java index 61f83c61fd..e28b7f42c0 100644 --- a/metron-platform/metron-pcap-backend/src/main/java/org/apache/metron/pcap/query/PcapCli.java +++ b/metron-platform/metron-pcap-backend/src/main/java/org/apache/metron/pcap/query/PcapCli.java @@ -20,6 +20,7 @@ import java.io.IOException; import java.lang.invoke.MethodHandles; import java.util.Arrays; +import java.util.Optional; import java.util.UUID; import org.apache.commons.cli.ParseException; import org.apache.commons.lang3.tuple.Pair; @@ -97,7 +98,7 @@ public int run(String[] args) { long endTime = time.getRight(); try { - results = jobRunner.query( + results = jobRunner.query(Optional.empty(), new Path(config.getBasePath()), new Path(config.getBaseOutputPath()), startTime, @@ -131,7 +132,7 @@ public int run(String[] args) { long endTime = time.getRight(); try { - results = jobRunner.query( + results = jobRunner.query(Optional.empty(), new Path(config.getBasePath()), new Path(config.getBaseOutputPath()), startTime, @@ -152,7 +153,8 @@ public int run(String[] args) { try { // write to local FS in the executing directory - jobRunner.writeResults(results, resultsWriter, new Path("file://."), + String execDir = System.getProperty("user.dir"); + jobRunner.writeResults(results, resultsWriter, new Path("file:///" + execDir), commonConfig.getNumRecordsPerFile(), commonConfig.getPrefix()); } catch (IOException e) { diff --git a/metron-platform/metron-pcap-backend/src/test/java/org/apache/metron/pcap/PcapJobTest.java b/metron-platform/metron-pcap-backend/src/test/java/org/apache/metron/pcap/PcapJobTest.java index af1448fe3e..5a5d4063e6 100644 --- a/metron-platform/metron-pcap-backend/src/test/java/org/apache/metron/pcap/PcapJobTest.java +++ b/metron-platform/metron-pcap-backend/src/test/java/org/apache/metron/pcap/PcapJobTest.java @@ -30,12 +30,14 @@ import java.io.IOException; import java.util.HashMap; import java.util.Map; +import java.util.Optional; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; import org.apache.hadoop.io.BytesWritable; import org.apache.hadoop.io.LongWritable; import org.apache.hadoop.mapreduce.Job; +import org.apache.hadoop.mapreduce.JobID; import org.apache.metron.common.utils.timestamp.TimestampConverters; import org.apache.metron.job.JobStatus; import org.apache.metron.job.JobStatus.State; @@ -55,6 +57,9 @@ public class PcapJobTest { private Job job; @Mock private org.apache.hadoop.mapreduce.JobStatus mrStatus; + @Mock + private JobID jobId; + private static final String JOB_ID_VAL = "job_abc_123"; private Path basePath; private Path baseOutPath; private long startTime; @@ -74,6 +79,8 @@ public void setup() { fixedFields = new HashMap<>(); fixedFields.put("ip_src_addr", "192.168.1.1"); hadoopConfig = new Configuration(); + when(jobId.toString()).thenReturn(JOB_ID_VAL); + when(mrStatus.getJobID()).thenReturn(jobId); } @Test @@ -94,7 +101,7 @@ public void partition_gives_value_in_range() throws Exception { private class TestJob extends PcapJob { @Override - public Job createJob(Path basePath, Path outputPath, long beginNS, long endNS, + public Job createJob(Optional jobName, Path basePath, Path outputPath, long beginNS, long endNS, int numReducers, T fields, Configuration conf, FileSystem fs, PcapFilterConfigurator filterImpl) throws IOException { return job; @@ -107,7 +114,9 @@ public void job_succeeds_synchronously() throws Exception { when(mrStatus.getState()).thenReturn(org.apache.hadoop.mapreduce.JobStatus.State.SUCCEEDED); when(job.getStatus()).thenReturn(mrStatus); TestJob testJob = new TestJob(); - Statusable statusable = testJob.query(basePath, + Statusable statusable = testJob.query( + Optional.empty(), + basePath, baseOutPath, startTime, endTime, @@ -132,7 +141,9 @@ public void job_fails_synchronously() throws Exception { when(mrStatus.getState()).thenReturn(org.apache.hadoop.mapreduce.JobStatus.State.FAILED); when(job.getStatus()).thenReturn(mrStatus); TestJob testJob = new TestJob(); - Statusable statusable = testJob.query(basePath, + Statusable statusable = testJob.query( + Optional.empty(), + basePath, baseOutPath, startTime, endTime, @@ -156,7 +167,9 @@ public void job_fails_with_killed_status_synchronously() throws Exception { when(mrStatus.getState()).thenReturn(org.apache.hadoop.mapreduce.JobStatus.State.KILLED); when(job.getStatus()).thenReturn(mrStatus); TestJob testJob = new TestJob(); - Statusable statusable = testJob.query(basePath, + Statusable statusable = testJob.query( + Optional.empty(), + basePath, baseOutPath, startTime, endTime, @@ -180,7 +193,9 @@ public void job_succeeds_asynchronously() throws Exception { when(mrStatus.getState()).thenReturn(org.apache.hadoop.mapreduce.JobStatus.State.SUCCEEDED); when(job.getStatus()).thenReturn(mrStatus); TestJob testJob = new TestJob(); - Statusable statusable = testJob.query(basePath, + Statusable statusable = testJob.query( + Optional.empty(), + basePath, baseOutPath, startTime, endTime, @@ -204,7 +219,9 @@ public void job_reports_percent_complete() throws Exception { when(mrStatus.getState()).thenReturn(org.apache.hadoop.mapreduce.JobStatus.State.RUNNING); when(job.getStatus()).thenReturn(mrStatus); TestJob testJob = new TestJob(); - Statusable statusable = testJob.query(basePath, + Statusable statusable = testJob.query( + Optional.empty(), + basePath, baseOutPath, startTime, endTime, @@ -218,11 +235,13 @@ public void job_reports_percent_complete() throws Exception { when(job.reduceProgress()).thenReturn(0f); JobStatus status = statusable.getStatus(); Assert.assertThat(status.getState(), equalTo(State.RUNNING)); + Assert.assertThat(status.getDescription(), equalTo("map: 50.0%, reduce: 0.0%")); Assert.assertThat(status.getPercentComplete(), equalTo(25.0)); when(job.mapProgress()).thenReturn(1.0f); when(job.reduceProgress()).thenReturn(0.5f); status = statusable.getStatus(); Assert.assertThat(status.getPercentComplete(), equalTo(75.0)); + Assert.assertThat(status.getDescription(), equalTo("map: 100.0%, reduce: 50.0%")); } } diff --git a/metron-platform/metron-pcap-backend/src/test/java/org/apache/metron/pcap/integration/PcapTopologyIntegrationTest.java b/metron-platform/metron-pcap-backend/src/test/java/org/apache/metron/pcap/integration/PcapTopologyIntegrationTest.java index 29c68d0606..b32fa1808f 100644 --- a/metron-platform/metron-pcap-backend/src/test/java/org/apache/metron/pcap/integration/PcapTopologyIntegrationTest.java +++ b/metron-platform/metron-pcap-backend/src/test/java/org/apache/metron/pcap/integration/PcapTopologyIntegrationTest.java @@ -29,11 +29,11 @@ import java.io.IOException; import java.util.AbstractMap; import java.util.ArrayList; -import java.util.Arrays; import java.util.Collections; import java.util.HashMap; import java.util.List; import java.util.Map; +import java.util.Optional; import java.util.Properties; import javax.annotation.Nullable; import kafka.consumer.ConsumerIterator; @@ -238,7 +238,8 @@ public ProcessorResult getResult() { { //Ensure that only two pcaps are returned when we look at 4 and 5 Iterable results = - job.query(new Path(outDir.getAbsolutePath()) + job.query(Optional.empty() + , new Path(outDir.getAbsolutePath()) , new Path(queryDir.getAbsolutePath()) , getTimestamp(4, pcapEntries) , getTimestamp(5, pcapEntries) @@ -255,7 +256,8 @@ public ProcessorResult getResult() { // Ensure that only two pcaps are returned when we look at 4 and 5 // test with empty query filter Iterable results = - job.query(new Path(outDir.getAbsolutePath()) + job.query(Optional.empty() + , new Path(outDir.getAbsolutePath()) , new Path(queryDir.getAbsolutePath()) , getTimestamp(4, pcapEntries) , getTimestamp(5, pcapEntries) @@ -271,7 +273,8 @@ public ProcessorResult getResult() { { //ensure that none get returned since that destination IP address isn't in the dataset Iterable results = - job.query(new Path(outDir.getAbsolutePath()) + job.query(Optional.empty() + , new Path(outDir.getAbsolutePath()) , new Path(queryDir.getAbsolutePath()) , getTimestamp(0, pcapEntries) , getTimestamp(1, pcapEntries) @@ -290,7 +293,8 @@ public ProcessorResult getResult() { // ensure that none get returned since that destination IP address isn't in the dataset // test with query filter Iterable results = - job.query(new Path(outDir.getAbsolutePath()) + job.query(Optional.empty() + , new Path(outDir.getAbsolutePath()) , new Path(queryDir.getAbsolutePath()) , getTimestamp(0, pcapEntries) , getTimestamp(1, pcapEntries) @@ -306,7 +310,8 @@ public ProcessorResult getResult() { { //same with protocol as before with the destination addr Iterable results = - job.query(new Path(outDir.getAbsolutePath()) + job.query(Optional.empty() + , new Path(outDir.getAbsolutePath()) , new Path(queryDir.getAbsolutePath()) , getTimestamp(0, pcapEntries) , getTimestamp(1, pcapEntries) @@ -325,7 +330,8 @@ public ProcessorResult getResult() { //same with protocol as before with the destination addr //test with query filter Iterable results = - job.query(new Path(outDir.getAbsolutePath()) + job.query(Optional.empty() + , new Path(outDir.getAbsolutePath()) , new Path(queryDir.getAbsolutePath()) , getTimestamp(0, pcapEntries) , getTimestamp(1, pcapEntries) @@ -341,7 +347,8 @@ public ProcessorResult getResult() { { //make sure I get them all. Iterable results = - job.query(new Path(outDir.getAbsolutePath()) + job.query(Optional.empty() + , new Path(outDir.getAbsolutePath()) , new Path(queryDir.getAbsolutePath()) , getTimestamp(0, pcapEntries) , getTimestamp(pcapEntries.size()-1, pcapEntries) + 1 @@ -358,7 +365,8 @@ public ProcessorResult getResult() { //make sure I get them all. //with query filter Iterable results = - job.query(new Path(outDir.getAbsolutePath()) + job.query(Optional.empty() + , new Path(outDir.getAbsolutePath()) , new Path(queryDir.getAbsolutePath()) , getTimestamp(0, pcapEntries) , getTimestamp(pcapEntries.size()-1, pcapEntries) + 1 @@ -373,7 +381,8 @@ public ProcessorResult getResult() { } { Iterable results = - job.query(new Path(outDir.getAbsolutePath()) + job.query(Optional.empty() + , new Path(outDir.getAbsolutePath()) , new Path(queryDir.getAbsolutePath()) , getTimestamp(0, pcapEntries) , getTimestamp(pcapEntries.size()-1, pcapEntries) + 1 @@ -404,7 +413,8 @@ public boolean apply(@Nullable JSONObject input) { { //test with query filter and byte array matching Iterable results = - job.query(new Path(outDir.getAbsolutePath()) + job.query(Optional.empty() + , new Path(outDir.getAbsolutePath()) , new Path(queryDir.getAbsolutePath()) , getTimestamp(0, pcapEntries) , getTimestamp(pcapEntries.size()-1, pcapEntries) + 1 @@ -423,7 +433,8 @@ public boolean apply(@Nullable JSONObject input) { { //test with query filter Iterable results = - job.query(new Path(outDir.getAbsolutePath()) + job.query(Optional.empty() + , new Path(outDir.getAbsolutePath()) , new Path(queryDir.getAbsolutePath()) , getTimestamp(0, pcapEntries) , getTimestamp(pcapEntries.size()-1, pcapEntries) + 1 @@ -453,7 +464,8 @@ public boolean apply(@Nullable JSONObject input) { { //test with query filter Iterable results = - job.query(new Path(outDir.getAbsolutePath()) + job.query(Optional.empty() + , new Path(outDir.getAbsolutePath()) , new Path(queryDir.getAbsolutePath()) , getTimestamp(0, pcapEntries) , getTimestamp(pcapEntries.size()-1, pcapEntries) + 1 @@ -483,7 +495,8 @@ public boolean apply(@Nullable JSONObject input) { { //test with query filter Iterable results = - job.query(new Path(outDir.getAbsolutePath()) + job.query(Optional.empty() + , new Path(outDir.getAbsolutePath()) , new Path(queryDir.getAbsolutePath()) , getTimestamp(0, pcapEntries) , getTimestamp(pcapEntries.size()-1, pcapEntries) + 1 diff --git a/metron-platform/metron-pcap-backend/src/test/java/org/apache/metron/pcap/query/PcapCliTest.java b/metron-platform/metron-pcap-backend/src/test/java/org/apache/metron/pcap/query/PcapCliTest.java index fcb4d1995c..dc2434cd72 100644 --- a/metron-platform/metron-pcap-backend/src/test/java/org/apache/metron/pcap/query/PcapCliTest.java +++ b/metron-platform/metron-pcap-backend/src/test/java/org/apache/metron/pcap/query/PcapCliTest.java @@ -41,6 +41,7 @@ import java.util.Iterator; import java.util.List; import java.util.Map; +import java.util.Optional; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; @@ -67,11 +68,13 @@ public class PcapCliTest { private ResultsWriter resultsWriter; @Mock private Clock clock; + private String execDir; @Before public void setup() throws IOException { MockitoAnnotations.initMocks(this); doCallRealMethod().when(jobRunner).writeResults(anyObject(), anyObject(), anyObject(), anyInt(), anyObject()); + execDir = System.getProperty("user.dir"); } @Test @@ -103,11 +106,11 @@ public void runs_fixed_pcap_filter_job_with_default_argument_list() throws Excep put(PcapHelper.PacketFields.PACKET_FILTER.getName(), "`casey`"); }}; - when(jobRunner.query(eq(base_path), eq(base_output_path), anyLong(), anyLong(), anyInt(), eq(query), isA(Configuration.class), isA(FileSystem.class), isA(FixedPcapFilter.Configurator.class))).thenReturn(iterable); + when(jobRunner.query(eq(Optional.empty()), eq(base_path), eq(base_output_path), anyLong(), anyLong(), anyInt(), eq(query), isA(Configuration.class), isA(FileSystem.class), isA(FixedPcapFilter.Configurator.class))).thenReturn(iterable); PcapCli cli = new PcapCli(jobRunner, resultsWriter, clock -> "random_prefix"); assertThat("Expect no errors on run", cli.run(args), equalTo(0)); - Mockito.verify(resultsWriter).write(isA(Configuration.class), eq(pcaps), eq("file://./pcap-data-random_prefix+0001.pcap")); + Mockito.verify(resultsWriter).write(isA(Configuration.class), eq(pcaps), eq("file:" + execDir + "/pcap-data-random_prefix+0001.pcap")); } @Test @@ -143,11 +146,11 @@ public void runs_fixed_pcap_filter_job_with_full_argument_list_and_default_datef put(Constants.Fields.INCLUDES_REVERSE_TRAFFIC.getName(), "true"); }}; - when(jobRunner.query(eq(base_path), eq(base_output_path), anyLong(), anyLong(), anyInt(), eq(query), isA(Configuration.class), isA(FileSystem.class), isA(FixedPcapFilter.Configurator.class))).thenReturn(iterable); + when(jobRunner.query(eq(Optional.empty()), eq(base_path), eq(base_output_path), anyLong(), anyLong(), anyInt(), eq(query), isA(Configuration.class), isA(FileSystem.class), isA(FixedPcapFilter.Configurator.class))).thenReturn(iterable); PcapCli cli = new PcapCli(jobRunner, resultsWriter, clock -> "random_prefix"); assertThat("Expect no errors on run", cli.run(args), equalTo(0)); - Mockito.verify(resultsWriter).write(isA(Configuration.class), eq(pcaps), eq("file://./pcap-data-random_prefix+0001.pcap")); + Mockito.verify(resultsWriter).write(isA(Configuration.class), eq(pcaps), eq("file:" + execDir + "/pcap-data-random_prefix+0001.pcap")); } @Test @@ -186,11 +189,11 @@ public void runs_fixed_pcap_filter_job_with_full_argument_list() throws Exceptio long startAsNanos = asNanos("2016-06-13-18:35.00", "yyyy-MM-dd-HH:mm.ss"); long endAsNanos = asNanos("2016-06-15-18:35.00", "yyyy-MM-dd-HH:mm.ss"); - when(jobRunner.query(eq(base_path), eq(base_output_path), eq(startAsNanos), eq(endAsNanos), anyInt(), eq(query), isA(Configuration.class), isA(FileSystem.class), isA(FixedPcapFilter.Configurator.class))).thenReturn(iterable); + when(jobRunner.query(eq(Optional.empty()), eq(base_path), eq(base_output_path), eq(startAsNanos), eq(endAsNanos), anyInt(), eq(query), isA(Configuration.class), isA(FileSystem.class), isA(FixedPcapFilter.Configurator.class))).thenReturn(iterable); PcapCli cli = new PcapCli(jobRunner, resultsWriter, clock -> "random_prefix"); assertThat("Expect no errors on run", cli.run(args), equalTo(0)); - Mockito.verify(resultsWriter).write(isA(Configuration.class), eq(pcaps), eq("file://./pcap-data-random_prefix+0001.pcap")); + Mockito.verify(resultsWriter).write(isA(Configuration.class), eq(pcaps), eq("file:" + execDir + "/pcap-data-random_prefix+0001.pcap")); } private long asNanos(String inDate, String format) throws ParseException { @@ -219,11 +222,11 @@ public void runs_query_pcap_filter_job_with_default_argument_list() throws Excep Path base_output_path = new Path(CliParser.BASE_OUTPUT_PATH_DEFAULT); String query = "some query string"; - when(jobRunner.query(eq(base_path), eq(base_output_path), anyLong(), anyLong(), anyInt(), eq(query), isA(Configuration.class), isA(FileSystem.class), isA(QueryPcapFilter.Configurator.class))).thenReturn(iterable); + when(jobRunner.query(eq(Optional.empty()), eq(base_path), eq(base_output_path), anyLong(), anyLong(), anyInt(), eq(query), isA(Configuration.class), isA(FileSystem.class), isA(QueryPcapFilter.Configurator.class))).thenReturn(iterable); PcapCli cli = new PcapCli(jobRunner, resultsWriter, clock -> "random_prefix"); assertThat("Expect no errors on run", cli.run(args), equalTo(0)); - Mockito.verify(resultsWriter).write(isA(Configuration.class), eq(pcaps), eq("file://./pcap-data-random_prefix+0001.pcap")); + Mockito.verify(resultsWriter).write(isA(Configuration.class), eq(pcaps), eq("file:" + execDir + "/pcap-data-random_prefix+0001.pcap")); } @Test @@ -247,11 +250,11 @@ public void runs_query_pcap_filter_job_with_full_argument_list() throws Exceptio Path base_output_path = new Path("/base/output/path"); String query = "some query string"; - when(jobRunner.query(eq(base_path), eq(base_output_path), anyLong(), anyLong(), anyInt(), eq(query), isA(Configuration.class), isA(FileSystem.class), isA(QueryPcapFilter.Configurator.class))).thenReturn(iterable); + when(jobRunner.query(eq(Optional.empty()), eq(base_path), eq(base_output_path), anyLong(), anyLong(), anyInt(), eq(query), isA(Configuration.class), isA(FileSystem.class), isA(QueryPcapFilter.Configurator.class))).thenReturn(iterable); PcapCli cli = new PcapCli(jobRunner, resultsWriter, clock -> "random_prefix"); assertThat("Expect no errors on run", cli.run(args), equalTo(0)); - Mockito.verify(resultsWriter).write(isA(Configuration.class), eq(pcaps), eq("file://./pcap-data-random_prefix+0001.pcap")); + Mockito.verify(resultsWriter).write(isA(Configuration.class), eq(pcaps), eq("file:" + execDir + "/pcap-data-random_prefix+0001.pcap")); } // INVALID OPTION CHECKS diff --git a/metron-platform/metron-pcap/src/main/java/org/apache/metron/pcap/mr/PcapJob.java b/metron-platform/metron-pcap/src/main/java/org/apache/metron/pcap/mr/PcapJob.java index b7a0edcd0e..02fa8aa0da 100644 --- a/metron-platform/metron-pcap/src/main/java/org/apache/metron/pcap/mr/PcapJob.java +++ b/metron-platform/metron-pcap/src/main/java/org/apache/metron/pcap/mr/PcapJob.java @@ -32,6 +32,7 @@ import java.util.Date; import java.util.List; import java.util.Map; +import java.util.Optional; import java.util.UUID; import java.util.stream.Stream; import org.apache.commons.lang3.StringUtils; @@ -169,7 +170,8 @@ protected void reduce(LongWritable key, Iterable values, Context /** * Run query synchronously. */ - public SequenceFileIterable query(Path basePath + public SequenceFileIterable query(Optional jobName + ,Path basePath , Path baseOutputPath , long beginNS , long endNS @@ -179,7 +181,7 @@ public SequenceFileIterable query(Path basePath , FileSystem fs , PcapFilterConfigurator filterImpl ) throws IOException, ClassNotFoundException, InterruptedException { - Statusable statusable = query(basePath, baseOutputPath, beginNS, endNS, numReducers, fields, + Statusable statusable = query(jobName, basePath, baseOutputPath, beginNS, endNS, numReducers, fields, conf, fs, filterImpl, true); JobStatus jobStatus = statusable.getStatus(); @@ -196,7 +198,8 @@ public SequenceFileIterable query(Path basePath * Run query sync OR async based on flag. Async mode allows the client to check the returned * statusable object for status details. */ - public Statusable query(Path basePath, + public Statusable query(Optional jobName, + Path basePath, Path baseOutputPath, long beginNS, long endNS, @@ -217,7 +220,8 @@ public Statusable query(Path basePath, LOG.debug("Executing query {} on timerange from {} to {}", filterImpl.queryToString(fields), from, to); } outputPath = new Path(baseOutputPath, outputDirName); - job = createJob( basePath + job = createJob(jobName + , basePath , outputPath , beginNS , endNS @@ -287,7 +291,8 @@ public Pageable writeResults(SequenceFileIterable results, ResultsWriter r /** * Creates, but does not submit the job. */ - public Job createJob( Path basePath + public Job createJob(Optional jobName + ,Path basePath , Path outputPath , long beginNS , long endNS @@ -303,6 +308,7 @@ public Job createJob( Path basePath conf.set(WIDTH_CONF, "" + findWidth(beginNS, endNS, numReducers)); filterImpl.addToConfig(fields, conf); Job job = Job.getInstance(conf); + jobName.ifPresent(job::setJobName); job.setJarByClass(PcapJob.class); job.setMapperClass(PcapJob.PcapMapper.class); job.setMapOutputKeyClass(LongWritable.class); @@ -345,11 +351,12 @@ public JobStatus getStatus() { status.withPercentComplete(100).withState(State.SUCCEEDED); } else { try { + status.withJobId(job.getStatus().getJobID().toString()); if (job.isComplete()) { status.withPercentComplete(100); switch (job.getStatus().getState()) { case SUCCEEDED: - status.withState(State.SUCCEEDED); + status.withState(State.SUCCEEDED).withDescription(State.SUCCEEDED.toString()); break; case FAILED: status.withState(State.FAILED); @@ -362,7 +369,9 @@ public JobStatus getStatus() { float mapProg = job.mapProgress(); float reduceProg = job.reduceProgress(); float totalProgress = ((mapProg / 2) + (reduceProg / 2)) * 100; - status.withPercentComplete(totalProgress).withState(State.RUNNING); + String description = String.format("map: %s%%, reduce: %s%%", mapProg * 100, reduceProg * 100); + status.withPercentComplete(totalProgress).withState(State.RUNNING) + .withDescription(description); } } catch (Exception e) { throw new RuntimeException("Error occurred while attempting to retrieve job status.", e); From a00e300d8bd812589fd09970ce4db1f25dc29165 Mon Sep 17 00:00:00 2001 From: Michael Miklavcic Date: Wed, 27 Jun 2018 21:31:02 -0600 Subject: [PATCH 8/8] Let's try this again. --- .../rest/service/impl/PcapServiceImpl.java | 11 +++-- .../apache/metron/rest/mock/MockPcapJob.java | 15 +++---- .../org/apache/metron/pcap/query/PcapCli.java | 5 +-- .../PcapTopologyIntegrationTest.java | 40 ++++++------------- .../apache/metron/pcap/query/PcapCliTest.java | 11 +++-- .../org/apache/metron/pcap/mr/PcapJob.java | 5 +-- 6 files changed, 33 insertions(+), 54 deletions(-) diff --git a/metron-interface/metron-rest/src/main/java/org/apache/metron/rest/service/impl/PcapServiceImpl.java b/metron-interface/metron-rest/src/main/java/org/apache/metron/rest/service/impl/PcapServiceImpl.java index 4dae1e5ec6..dd4af5cb03 100644 --- a/metron-interface/metron-rest/src/main/java/org/apache/metron/rest/service/impl/PcapServiceImpl.java +++ b/metron-interface/metron-rest/src/main/java/org/apache/metron/rest/service/impl/PcapServiceImpl.java @@ -17,6 +17,11 @@ */ package org.apache.metron.rest.service.impl; +import java.io.IOException; +import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; +import java.util.Map; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; @@ -35,12 +40,6 @@ import org.springframework.core.env.Environment; import org.springframework.stereotype.Service; -import java.io.IOException; -import java.util.ArrayList; -import java.util.HashMap; -import java.util.List; -import java.util.Map; - @Service public class PcapServiceImpl implements PcapService { diff --git a/metron-interface/metron-rest/src/test/java/org/apache/metron/rest/mock/MockPcapJob.java b/metron-interface/metron-rest/src/test/java/org/apache/metron/rest/mock/MockPcapJob.java index 3aa9ce37ed..a7eca319b4 100644 --- a/metron-interface/metron-rest/src/test/java/org/apache/metron/rest/mock/MockPcapJob.java +++ b/metron-interface/metron-rest/src/test/java/org/apache/metron/rest/mock/MockPcapJob.java @@ -17,6 +17,12 @@ */ package org.apache.metron.rest.mock; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.when; + +import java.io.IOException; +import java.util.List; +import java.util.Map; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; @@ -24,15 +30,6 @@ import org.apache.metron.pcap.filter.PcapFilterConfigurator; import org.apache.metron.pcap.mr.PcapJob; -import java.io.IOException; -import java.util.Arrays; -import java.util.List; -import java.util.Map; - -import static org.mockito.Matchers.anyList; -import static org.mockito.Mockito.mock; -import static org.mockito.Mockito.when; - public class MockPcapJob extends PcapJob { private String basePath; diff --git a/metron-platform/metron-pcap-backend/src/main/java/org/apache/metron/pcap/query/PcapCli.java b/metron-platform/metron-pcap-backend/src/main/java/org/apache/metron/pcap/query/PcapCli.java index e28b7f42c0..0fda8011bf 100644 --- a/metron-platform/metron-pcap-backend/src/main/java/org/apache/metron/pcap/query/PcapCli.java +++ b/metron-platform/metron-pcap-backend/src/main/java/org/apache/metron/pcap/query/PcapCli.java @@ -20,7 +20,6 @@ import java.io.IOException; import java.lang.invoke.MethodHandles; import java.util.Arrays; -import java.util.Optional; import java.util.UUID; import org.apache.commons.cli.ParseException; import org.apache.commons.lang3.tuple.Pair; @@ -98,7 +97,7 @@ public int run(String[] args) { long endTime = time.getRight(); try { - results = jobRunner.query(Optional.empty(), + results = jobRunner.query( new Path(config.getBasePath()), new Path(config.getBaseOutputPath()), startTime, @@ -132,7 +131,7 @@ public int run(String[] args) { long endTime = time.getRight(); try { - results = jobRunner.query(Optional.empty(), + results = jobRunner.query( new Path(config.getBasePath()), new Path(config.getBaseOutputPath()), startTime, diff --git a/metron-platform/metron-pcap-backend/src/test/java/org/apache/metron/pcap/integration/PcapTopologyIntegrationTest.java b/metron-platform/metron-pcap-backend/src/test/java/org/apache/metron/pcap/integration/PcapTopologyIntegrationTest.java index b32fa1808f..c7292abc08 100644 --- a/metron-platform/metron-pcap-backend/src/test/java/org/apache/metron/pcap/integration/PcapTopologyIntegrationTest.java +++ b/metron-platform/metron-pcap-backend/src/test/java/org/apache/metron/pcap/integration/PcapTopologyIntegrationTest.java @@ -33,7 +33,6 @@ import java.util.HashMap; import java.util.List; import java.util.Map; -import java.util.Optional; import java.util.Properties; import javax.annotation.Nullable; import kafka.consumer.ConsumerIterator; @@ -238,8 +237,7 @@ public ProcessorResult getResult() { { //Ensure that only two pcaps are returned when we look at 4 and 5 Iterable results = - job.query(Optional.empty() - , new Path(outDir.getAbsolutePath()) + job.query(new Path(outDir.getAbsolutePath()) , new Path(queryDir.getAbsolutePath()) , getTimestamp(4, pcapEntries) , getTimestamp(5, pcapEntries) @@ -256,8 +254,7 @@ public ProcessorResult getResult() { // Ensure that only two pcaps are returned when we look at 4 and 5 // test with empty query filter Iterable results = - job.query(Optional.empty() - , new Path(outDir.getAbsolutePath()) + job.query(new Path(outDir.getAbsolutePath()) , new Path(queryDir.getAbsolutePath()) , getTimestamp(4, pcapEntries) , getTimestamp(5, pcapEntries) @@ -273,8 +270,7 @@ public ProcessorResult getResult() { { //ensure that none get returned since that destination IP address isn't in the dataset Iterable results = - job.query(Optional.empty() - , new Path(outDir.getAbsolutePath()) + job.query(new Path(outDir.getAbsolutePath()) , new Path(queryDir.getAbsolutePath()) , getTimestamp(0, pcapEntries) , getTimestamp(1, pcapEntries) @@ -293,8 +289,7 @@ public ProcessorResult getResult() { // ensure that none get returned since that destination IP address isn't in the dataset // test with query filter Iterable results = - job.query(Optional.empty() - , new Path(outDir.getAbsolutePath()) + job.query(new Path(outDir.getAbsolutePath()) , new Path(queryDir.getAbsolutePath()) , getTimestamp(0, pcapEntries) , getTimestamp(1, pcapEntries) @@ -310,8 +305,7 @@ public ProcessorResult getResult() { { //same with protocol as before with the destination addr Iterable results = - job.query(Optional.empty() - , new Path(outDir.getAbsolutePath()) + job.query(new Path(outDir.getAbsolutePath()) , new Path(queryDir.getAbsolutePath()) , getTimestamp(0, pcapEntries) , getTimestamp(1, pcapEntries) @@ -330,8 +324,7 @@ public ProcessorResult getResult() { //same with protocol as before with the destination addr //test with query filter Iterable results = - job.query(Optional.empty() - , new Path(outDir.getAbsolutePath()) + job.query(new Path(outDir.getAbsolutePath()) , new Path(queryDir.getAbsolutePath()) , getTimestamp(0, pcapEntries) , getTimestamp(1, pcapEntries) @@ -347,8 +340,7 @@ public ProcessorResult getResult() { { //make sure I get them all. Iterable results = - job.query(Optional.empty() - , new Path(outDir.getAbsolutePath()) + job.query(new Path(outDir.getAbsolutePath()) , new Path(queryDir.getAbsolutePath()) , getTimestamp(0, pcapEntries) , getTimestamp(pcapEntries.size()-1, pcapEntries) + 1 @@ -365,8 +357,7 @@ public ProcessorResult getResult() { //make sure I get them all. //with query filter Iterable results = - job.query(Optional.empty() - , new Path(outDir.getAbsolutePath()) + job.query(new Path(outDir.getAbsolutePath()) , new Path(queryDir.getAbsolutePath()) , getTimestamp(0, pcapEntries) , getTimestamp(pcapEntries.size()-1, pcapEntries) + 1 @@ -381,8 +372,7 @@ public ProcessorResult getResult() { } { Iterable results = - job.query(Optional.empty() - , new Path(outDir.getAbsolutePath()) + job.query(new Path(outDir.getAbsolutePath()) , new Path(queryDir.getAbsolutePath()) , getTimestamp(0, pcapEntries) , getTimestamp(pcapEntries.size()-1, pcapEntries) + 1 @@ -413,8 +403,7 @@ public boolean apply(@Nullable JSONObject input) { { //test with query filter and byte array matching Iterable results = - job.query(Optional.empty() - , new Path(outDir.getAbsolutePath()) + job.query(new Path(outDir.getAbsolutePath()) , new Path(queryDir.getAbsolutePath()) , getTimestamp(0, pcapEntries) , getTimestamp(pcapEntries.size()-1, pcapEntries) + 1 @@ -433,8 +422,7 @@ public boolean apply(@Nullable JSONObject input) { { //test with query filter Iterable results = - job.query(Optional.empty() - , new Path(outDir.getAbsolutePath()) + job.query(new Path(outDir.getAbsolutePath()) , new Path(queryDir.getAbsolutePath()) , getTimestamp(0, pcapEntries) , getTimestamp(pcapEntries.size()-1, pcapEntries) + 1 @@ -464,8 +452,7 @@ public boolean apply(@Nullable JSONObject input) { { //test with query filter Iterable results = - job.query(Optional.empty() - , new Path(outDir.getAbsolutePath()) + job.query(new Path(outDir.getAbsolutePath()) , new Path(queryDir.getAbsolutePath()) , getTimestamp(0, pcapEntries) , getTimestamp(pcapEntries.size()-1, pcapEntries) + 1 @@ -495,8 +482,7 @@ public boolean apply(@Nullable JSONObject input) { { //test with query filter Iterable results = - job.query(Optional.empty() - , new Path(outDir.getAbsolutePath()) + job.query(new Path(outDir.getAbsolutePath()) , new Path(queryDir.getAbsolutePath()) , getTimestamp(0, pcapEntries) , getTimestamp(pcapEntries.size()-1, pcapEntries) + 1 diff --git a/metron-platform/metron-pcap-backend/src/test/java/org/apache/metron/pcap/query/PcapCliTest.java b/metron-platform/metron-pcap-backend/src/test/java/org/apache/metron/pcap/query/PcapCliTest.java index dc2434cd72..3468a7c658 100644 --- a/metron-platform/metron-pcap-backend/src/test/java/org/apache/metron/pcap/query/PcapCliTest.java +++ b/metron-platform/metron-pcap-backend/src/test/java/org/apache/metron/pcap/query/PcapCliTest.java @@ -41,7 +41,6 @@ import java.util.Iterator; import java.util.List; import java.util.Map; -import java.util.Optional; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; @@ -106,7 +105,7 @@ public void runs_fixed_pcap_filter_job_with_default_argument_list() throws Excep put(PcapHelper.PacketFields.PACKET_FILTER.getName(), "`casey`"); }}; - when(jobRunner.query(eq(Optional.empty()), eq(base_path), eq(base_output_path), anyLong(), anyLong(), anyInt(), eq(query), isA(Configuration.class), isA(FileSystem.class), isA(FixedPcapFilter.Configurator.class))).thenReturn(iterable); + when(jobRunner.query(eq(base_path), eq(base_output_path), anyLong(), anyLong(), anyInt(), eq(query), isA(Configuration.class), isA(FileSystem.class), isA(FixedPcapFilter.Configurator.class))).thenReturn(iterable); PcapCli cli = new PcapCli(jobRunner, resultsWriter, clock -> "random_prefix"); assertThat("Expect no errors on run", cli.run(args), equalTo(0)); @@ -146,7 +145,7 @@ public void runs_fixed_pcap_filter_job_with_full_argument_list_and_default_datef put(Constants.Fields.INCLUDES_REVERSE_TRAFFIC.getName(), "true"); }}; - when(jobRunner.query(eq(Optional.empty()), eq(base_path), eq(base_output_path), anyLong(), anyLong(), anyInt(), eq(query), isA(Configuration.class), isA(FileSystem.class), isA(FixedPcapFilter.Configurator.class))).thenReturn(iterable); + when(jobRunner.query(eq(base_path), eq(base_output_path), anyLong(), anyLong(), anyInt(), eq(query), isA(Configuration.class), isA(FileSystem.class), isA(FixedPcapFilter.Configurator.class))).thenReturn(iterable); PcapCli cli = new PcapCli(jobRunner, resultsWriter, clock -> "random_prefix"); assertThat("Expect no errors on run", cli.run(args), equalTo(0)); @@ -189,7 +188,7 @@ public void runs_fixed_pcap_filter_job_with_full_argument_list() throws Exceptio long startAsNanos = asNanos("2016-06-13-18:35.00", "yyyy-MM-dd-HH:mm.ss"); long endAsNanos = asNanos("2016-06-15-18:35.00", "yyyy-MM-dd-HH:mm.ss"); - when(jobRunner.query(eq(Optional.empty()), eq(base_path), eq(base_output_path), eq(startAsNanos), eq(endAsNanos), anyInt(), eq(query), isA(Configuration.class), isA(FileSystem.class), isA(FixedPcapFilter.Configurator.class))).thenReturn(iterable); + when(jobRunner.query(eq(base_path), eq(base_output_path), eq(startAsNanos), eq(endAsNanos), anyInt(), eq(query), isA(Configuration.class), isA(FileSystem.class), isA(FixedPcapFilter.Configurator.class))).thenReturn(iterable); PcapCli cli = new PcapCli(jobRunner, resultsWriter, clock -> "random_prefix"); assertThat("Expect no errors on run", cli.run(args), equalTo(0)); @@ -222,7 +221,7 @@ public void runs_query_pcap_filter_job_with_default_argument_list() throws Excep Path base_output_path = new Path(CliParser.BASE_OUTPUT_PATH_DEFAULT); String query = "some query string"; - when(jobRunner.query(eq(Optional.empty()), eq(base_path), eq(base_output_path), anyLong(), anyLong(), anyInt(), eq(query), isA(Configuration.class), isA(FileSystem.class), isA(QueryPcapFilter.Configurator.class))).thenReturn(iterable); + when(jobRunner.query(eq(base_path), eq(base_output_path), anyLong(), anyLong(), anyInt(), eq(query), isA(Configuration.class), isA(FileSystem.class), isA(QueryPcapFilter.Configurator.class))).thenReturn(iterable); PcapCli cli = new PcapCli(jobRunner, resultsWriter, clock -> "random_prefix"); assertThat("Expect no errors on run", cli.run(args), equalTo(0)); @@ -250,7 +249,7 @@ public void runs_query_pcap_filter_job_with_full_argument_list() throws Exceptio Path base_output_path = new Path("/base/output/path"); String query = "some query string"; - when(jobRunner.query(eq(Optional.empty()), eq(base_path), eq(base_output_path), anyLong(), anyLong(), anyInt(), eq(query), isA(Configuration.class), isA(FileSystem.class), isA(QueryPcapFilter.Configurator.class))).thenReturn(iterable); + when(jobRunner.query(eq(base_path), eq(base_output_path), anyLong(), anyLong(), anyInt(), eq(query), isA(Configuration.class), isA(FileSystem.class), isA(QueryPcapFilter.Configurator.class))).thenReturn(iterable); PcapCli cli = new PcapCli(jobRunner, resultsWriter, clock -> "random_prefix"); assertThat("Expect no errors on run", cli.run(args), equalTo(0)); diff --git a/metron-platform/metron-pcap/src/main/java/org/apache/metron/pcap/mr/PcapJob.java b/metron-platform/metron-pcap/src/main/java/org/apache/metron/pcap/mr/PcapJob.java index 02fa8aa0da..269f69b098 100644 --- a/metron-platform/metron-pcap/src/main/java/org/apache/metron/pcap/mr/PcapJob.java +++ b/metron-platform/metron-pcap/src/main/java/org/apache/metron/pcap/mr/PcapJob.java @@ -170,8 +170,7 @@ protected void reduce(LongWritable key, Iterable values, Context /** * Run query synchronously. */ - public SequenceFileIterable query(Optional jobName - ,Path basePath + public SequenceFileIterable query(Path basePath , Path baseOutputPath , long beginNS , long endNS @@ -181,7 +180,7 @@ public SequenceFileIterable query(Optional jobName , FileSystem fs , PcapFilterConfigurator filterImpl ) throws IOException, ClassNotFoundException, InterruptedException { - Statusable statusable = query(jobName, basePath, baseOutputPath, beginNS, endNS, numReducers, fields, + Statusable statusable = query(Optional.empty(), basePath, baseOutputPath, beginNS, endNS, numReducers, fields, conf, fs, filterImpl, true); JobStatus jobStatus = statusable.getStatus();