From 808f303e9cdb3145ed8b267b7046f6647c7fd5c4 Mon Sep 17 00:00:00 2001 From: cstella Date: Thu, 23 Jun 2016 15:38:43 -0400 Subject: [PATCH 1/2] Make the PCap values returned ordered by timestamp --- .../src/main/java/org/apache/metron/pcap/mr/PcapJob.java | 1 + 1 file changed, 1 insertion(+) diff --git a/metron-platform/metron-pcap/src/main/java/org/apache/metron/pcap/mr/PcapJob.java b/metron-platform/metron-pcap/src/main/java/org/apache/metron/pcap/mr/PcapJob.java index 3543b1d81d..a181637f3a 100644 --- a/metron-platform/metron-pcap/src/main/java/org/apache/metron/pcap/mr/PcapJob.java +++ b/metron-platform/metron-pcap/src/main/java/org/apache/metron/pcap/mr/PcapJob.java @@ -224,6 +224,7 @@ public Job createJob( Path basePath job.setMapperClass(PcapJob.PcapMapper.class); job.setMapOutputKeyClass(LongWritable.class); job.setMapOutputValueClass(BytesWritable.class); + job.setNumReduceTasks(1); job.setReducerClass(PcapReducer.class); job.setOutputKeyClass(LongWritable.class); job.setOutputValueClass(BytesWritable.class); From f403cdb5d0cfc4f755072b976bea48949b079929 Mon Sep 17 00:00:00 2001 From: cstella Date: Fri, 24 Jun 2016 11:37:06 -0400 Subject: [PATCH 2/2] Added better integration test. --- .../PcapTopologyIntegrationTest.java | 23 +++++++++++++++++++ 1 file changed, 23 insertions(+) diff --git a/metron-platform/metron-pcap-backend/src/test/java/org/apache/metron/pcap/integration/PcapTopologyIntegrationTest.java b/metron-platform/metron-pcap-backend/src/test/java/org/apache/metron/pcap/integration/PcapTopologyIntegrationTest.java index 03e3639367..8fcdeedb64 100644 --- a/metron-platform/metron-pcap-backend/src/test/java/org/apache/metron/pcap/integration/PcapTopologyIntegrationTest.java +++ b/metron-platform/metron-pcap-backend/src/test/java/org/apache/metron/pcap/integration/PcapTopologyIntegrationTest.java @@ -281,6 +281,7 @@ public Void getResult() { , FileSystem.get(new Configuration()) , new FixedPcapFilter.Configurator() ); + assertInOrder(results); Assert.assertEquals(results.size(), 2); } { @@ -296,6 +297,7 @@ public Void getResult() { , FileSystem.get(new Configuration()) , new QueryPcapFilter.Configurator() ); + assertInOrder(results); Assert.assertEquals(results.size(), 2); } { @@ -312,6 +314,7 @@ public Void getResult() { , FileSystem.get(new Configuration()) , new FixedPcapFilter.Configurator() ); + assertInOrder(results); Assert.assertEquals(results.size(), 0); } { @@ -327,6 +330,7 @@ public Void getResult() { , FileSystem.get(new Configuration()) , new QueryPcapFilter.Configurator() ); + assertInOrder(results); Assert.assertEquals(results.size(), 0); } { @@ -343,6 +347,7 @@ public Void getResult() { , FileSystem.get(new Configuration()) , new FixedPcapFilter.Configurator() ); + assertInOrder(results); Assert.assertEquals(results.size(), 0); } { @@ -358,6 +363,7 @@ public Void getResult() { , FileSystem.get(new Configuration()) , new QueryPcapFilter.Configurator() ); + assertInOrder(results); Assert.assertEquals(results.size(), 0); } { @@ -372,6 +378,7 @@ public Void getResult() { , FileSystem.get(new Configuration()) , new FixedPcapFilter.Configurator() ); + assertInOrder(results); Assert.assertEquals(results.size(), pcapEntries.size()); } { @@ -387,6 +394,7 @@ public Void getResult() { , FileSystem.get(new Configuration()) , new QueryPcapFilter.Configurator() ); + assertInOrder(results); Assert.assertEquals(results.size(), pcapEntries.size()); } { @@ -402,6 +410,7 @@ public Void getResult() { , FileSystem.get(new Configuration()) , new FixedPcapFilter.Configurator() ); + assertInOrder(results); Assert.assertTrue(results.size() > 0); Assert.assertEquals(results.size() , Iterables.size(filterPcaps(pcapEntries, new Predicate() { @@ -429,6 +438,7 @@ public boolean apply(@Nullable JSONObject input) { , FileSystem.get(new Configuration()) , new QueryPcapFilter.Configurator() ); + assertInOrder(results); Assert.assertTrue(results.size() > 0); Assert.assertEquals(results.size() , Iterables.size(filterPcaps(pcapEntries, new Predicate() { @@ -440,6 +450,7 @@ public boolean apply(@Nullable JSONObject input) { }, withHeaders) ) ); + assertInOrder(results); ByteArrayOutputStream baos = new ByteArrayOutputStream(); PcapMerger.merge(baos, results); Assert.assertTrue(baos.toByteArray().length > 0); @@ -452,6 +463,18 @@ public boolean apply(@Nullable JSONObject input) { } } + public static void assertInOrder(Iterable packets) { + long previous = 0; + for(byte[] packet : packets) { + for(JSONObject json : TO_JSONS.apply(packet)) { + Long current = Long.parseLong(json.get("ts_micro").toString()); + Assert.assertNotNull(current); + Assert.assertTrue(Long.compareUnsigned(current, previous) >= 0); + previous = current; + } + } + } + public static Function> TO_JSONS = new Function>() { @Nullable @Override