Skip to content
This repository was archived by the owner on Aug 20, 2025. It is now read-only.
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -192,5 +192,14 @@
<description>The number of pcaps written to a page/file as a result of a pcap query.</description>
<value>10</value>
</property>
<!-- YARN queue for pcap query MapReduce jobs. An empty value is valid
     (empty-value-valid below) and means the cluster's default queue is used. -->
<property>
<name>pcap_yarn_queue</name>
<display-name>Pcap YARN Queue</display-name>
<description>The YARN queue pcap jobs will be submitted to.</description>
<value/>
<value-attributes>
<empty-value-valid>true</empty-value-valid>
</value-attributes>
</property>

</configuration>
Original file line number Diff line number Diff line change
Expand Up @@ -390,6 +390,7 @@
pcap_base_interim_result_path = config['configurations']['metron-rest-env']['pcap_base_interim_result_path']
pcap_final_output_path = config['configurations']['metron-rest-env']['pcap_final_output_path']
pcap_page_size = config['configurations']['metron-rest-env']['pcap_page_size']
pcap_yarn_queue = config['configurations']['metron-rest-env']['pcap_yarn_queue']
pcap_configured_flag_file = status_params.pcap_configured_flag_file

# MapReduce
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -44,3 +44,4 @@ PCAP_BASE_PATH="{{pcap_base_path}}"
PCAP_BASE_INTERIM_RESULT_PATH="{{pcap_base_interim_result_path}}"
PCAP_FINAL_OUTPUT_PATH="{{pcap_final_output_path}}"
PCAP_PAGE_SIZE="{{pcap_page_size}}"
PCAP_YARN_QUEUE="{{pcap_yarn_queue}}"
Original file line number Diff line number Diff line change
Expand Up @@ -823,6 +823,10 @@
"config": "metron-rest-env/pcap_page_size",
"subsection-name": "subsection-rest"
},
{
"config": "metron-rest-env/pcap_yarn_queue",
"subsection-name": "subsection-rest"
},
{
"config": "metron-management-ui-env/metron_management_ui_port",
"subsection-name": "subsection-management-ui"
Expand Down Expand Up @@ -1430,6 +1434,12 @@
"type": "text-field"
}
},
{
"config": "metron-rest-env/pcap_yarn_queue",
"widget": {
"type": "text-field"
}
},
{
"config": "metron-management-ui-env/metron_management_ui_port",
"widget": {
Expand Down
2 changes: 2 additions & 0 deletions metron-interface/metron-rest/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -221,6 +221,8 @@ The REST application uses a Java Process object to call out to the `pcap_to_pdml
Out of the box, it is a simple wrapper around the tshark command that transforms raw pcap data into PDML. However, it can be extended to do additional processing as long as the expected input/output contract is maintained.
REST will supply the script with raw pcap data through standard in and expects PDML data serialized as XML.

Pcap query jobs can be configured for submission to a YARN queue. This setting is exposed as the Spring property `pcap.yarn.queue`. If configured, the REST application will set the `mapreduce.job.queuename` Hadoop property to that value.

## API

Request and Response objects are JSON formatted. The JSON schemas are available in the Swagger UI.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -60,3 +60,4 @@ pcap:
base.interim.result.path: ${PCAP_BASE_INTERIM_RESULT_PATH}
final.output.path: ${PCAP_FINAL_OUTPUT_PATH}
page.size: ${PCAP_PAGE_SIZE}
yarn.queue: ${PCAP_YARN_QUEUE}
Original file line number Diff line number Diff line change
Expand Up @@ -81,4 +81,5 @@ public class MetronRestConstants {
public static final String PCAP_FINAL_OUTPUT_PATH_SPRING_PROPERTY = "pcap.final.output.path";
public static final String PCAP_PAGE_SIZE_SPRING_PROPERTY = "pcap.page.size";
public static final String PCAP_PDML_SCRIPT_PATH_SPRING_PROPERTY = "pcap.pdml.script.path";
public static final String PCAP_YARN_QUEUE_SPRING_PROPERTY = "pcap.yarn.queue";
}
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@ public Statusable<Path> get() {
PcapJob<Path> pcapJob = createPcapJob();
return pcapJob.submit(PcapFinalizerStrategies.REST, pcapRequest);
} catch (JobException e) {
throw new RuntimeJobException(e.getMessage());
throw new RuntimeJobException(e.getMessage(), e);
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.mapreduce.MRJobConfig;
import org.apache.metron.job.JobException;
import org.apache.metron.job.JobNotFoundException;
import org.apache.metron.job.JobStatus;
Expand Down Expand Up @@ -57,6 +58,8 @@
import java.nio.charset.StandardCharsets;
import java.util.Map;

import static org.apache.metron.rest.MetronRestConstants.PCAP_YARN_QUEUE_SPRING_PROPERTY;

@Service
public class PcapServiceImpl implements PcapService {

Expand Down Expand Up @@ -250,7 +253,14 @@ public Map<String, Object> getConfiguration(String username, String jobId) throw
protected void setPcapOptions(String username, PcapRequest pcapRequest) throws IOException {
PcapOptions.JOB_NAME.put(pcapRequest, "jobName");
PcapOptions.USERNAME.put(pcapRequest, username);
PcapOptions.HADOOP_CONF.put(pcapRequest, configuration);
Configuration hadoopConf = new Configuration(configuration);
if (environment.containsProperty(PCAP_YARN_QUEUE_SPRING_PROPERTY)) {
String queue = environment.getProperty(PCAP_YARN_QUEUE_SPRING_PROPERTY);
if (queue != null && !queue.isEmpty()) {
hadoopConf.set(MRJobConfig.QUEUE_NAME, environment.getProperty(PCAP_YARN_QUEUE_SPRING_PROPERTY));
}
}
PcapOptions.HADOOP_CONF.put(pcapRequest, hadoopConf);
PcapOptions.FILESYSTEM.put(pcapRequest, getFileSystem());

if (pcapRequest.getBasePath() == null) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,9 @@
*/
package org.apache.metron.rest.mock;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.mapreduce.MRJobConfig;
import org.apache.metron.job.Finalizer;
import org.apache.metron.job.JobException;
import org.apache.metron.job.JobStatus;
Expand Down Expand Up @@ -45,6 +47,7 @@ public class MockPcapJob extends PcapJob<Path> {
private PcapFilterConfigurator filterImpl;
private int recPerFile;
private String query;
private String yarnQueue;
private Statusable<Path> statusable;

public MockPcapJob() {
Expand All @@ -68,6 +71,7 @@ public Statusable<Path> submit(Finalizer<Path> finalizer, Map<String, Object> co
}
this.filterImpl = PcapOptions.FILTER_IMPL.get(configuration, PcapFilterConfigurator.class);
this.recPerFile = PcapOptions.NUM_RECORDS_PER_FILE.get(configuration, Integer.class);
this.yarnQueue = PcapOptions.HADOOP_CONF.get(configuration, Configuration.class).get(MRJobConfig.QUEUE_NAME);
return statusable;
}

Expand Down Expand Up @@ -144,4 +148,8 @@ public PcapFilterConfigurator getFilterImpl() {
public int getRecPerFile() {
return recPerFile;
}

/**
 * Returns the YARN queue name captured from the most recently submitted job
 * configuration ({@code mapreduce.job.queuename}), or {@code null} if the
 * submitted Hadoop configuration did not set one.
 */
public String getYarnQueue() {
  return this.yarnQueue;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.mapreduce.MRJobConfig;
import org.apache.metron.common.Constants;
import org.apache.metron.common.utils.JSONUtils;
import org.apache.metron.job.JobException;
Expand Down Expand Up @@ -186,7 +187,7 @@ public class PcapServiceImplTest {
@Before
public void setUp() throws Exception {
environment = mock(Environment.class);
configuration = mock(Configuration.class);
configuration = new Configuration();
mockPcapJobSupplier = new MockPcapJobSupplier();
pcapToPdmlScriptWrapper = new PcapToPdmlScriptWrapper();

Expand All @@ -200,6 +201,9 @@ public void setUp() throws Exception {

@Test
public void submitShouldProperlySubmitFixedPcapRequest() throws Exception {
when(environment.containsProperty(MetronRestConstants.PCAP_YARN_QUEUE_SPRING_PROPERTY)).thenReturn(true);
when(environment.getProperty(MetronRestConstants.PCAP_YARN_QUEUE_SPRING_PROPERTY)).thenReturn("pcap");

FixedPcapRequest fixedPcapRequest = new FixedPcapRequest();
fixedPcapRequest.setBasePath("basePath");
fixedPcapRequest.setBaseInterimResultPath("baseOutputPath");
Expand Down Expand Up @@ -250,6 +254,7 @@ public void submitShouldProperlySubmitFixedPcapRequest() throws Exception {
Assert.assertEquals(2000000, mockPcapJob.getEndTimeNs());
Assert.assertEquals(2, mockPcapJob.getNumReducers());
Assert.assertEquals(100, mockPcapJob.getRecPerFile());
Assert.assertEquals("pcap", mockPcapJob.getYarnQueue());
Assert.assertTrue(mockPcapJob.getFilterImpl() instanceof FixedPcapFilter.Configurator);
Map<String, String> actualFixedFields = mockPcapJob.getFixedFields();
Assert.assertEquals("ip_src_addr", actualFixedFields.get(Constants.Fields.SRC_ADDR.getName()));
Expand Down
2 changes: 2 additions & 0 deletions metron-platform/metron-pcap-backend/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -139,6 +139,7 @@ usage: Fixed filter options
-sa,--ip_src_addr <arg> Source IP address
-sp,--ip_src_port <arg> Source port
-st,--start_time <arg> (required) Packet start time range.
-yq,--yarn_queue <arg> Yarn queue this job will be submitted to
```

```
Expand All @@ -158,6 +159,7 @@ usage: Query filter options
-ps,--print_status Print the status of the job as it runs
-q,--query <arg> Query string to use as a filter
-st,--start_time <arg> (required) Packet start time range.
-yq,--yarn_queue <arg> Yarn queue this job will be submitted to
```

The Query filter's `--query` argument specifies the Stellar expression to
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,7 @@ public Options buildOptions() {
options.addOption(newOption("et", "end_time", true, "Packet end time range. Default is current system time."));
options.addOption(newOption("df", "date_format", true, "Date format to use for parsing start_time and end_time. Default is to use time in millis since the epoch."));
options.addOption(newOption("ps", "print_status", false, "Print the status of the job as it runs"));
options.addOption(newOption("yq", "yarn_queue", true, "Yarn queue this job will be submitted to"));
return options;
}

Expand Down Expand Up @@ -129,6 +130,9 @@ public void parse(CommandLine commandLine, PcapConfig config) throws java.text.P
if (commandLine.hasOption("print_status")) {
config.setPrintJobStatus(true);
}
if (commandLine.hasOption("yarn_queue")) {
config.setYarnQueue(commandLine.getOptionValue("yarn_queue"));
}
}

public void printHelp(String msg, Options opts) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.mapreduce.MRJobConfig;
import org.apache.hadoop.util.GenericOptionsParser;
import org.apache.metron.common.utils.timestamp.TimestampConverters;
import org.apache.metron.job.JobException;
Expand Down Expand Up @@ -99,6 +100,7 @@ public int run(String[] args) {
return 0;
}
PcapOptions.FILTER_IMPL.put(commonConfig, new FixedPcapFilter.Configurator());
config.getYarnQueue().ifPresent(s -> hadoopConf.set(MRJobConfig.QUEUE_NAME, s));
PcapOptions.HADOOP_CONF.put(commonConfig, hadoopConf);
try {
PcapOptions.FILESYSTEM.put(commonConfig, FileSystem.get(hadoopConf));
Expand All @@ -124,6 +126,7 @@ public int run(String[] args) {
return 0;
}
PcapOptions.FILTER_IMPL.put(commonConfig, new FixedPcapFilter.Configurator());
config.getYarnQueue().ifPresent(s -> hadoopConf.set(MRJobConfig.QUEUE_NAME, s));
PcapOptions.HADOOP_CONF.put(commonConfig, hadoopConf);
try {
PcapOptions.FILESYSTEM.put(commonConfig, FileSystem.get(hadoopConf));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,10 +33,18 @@
import java.nio.charset.StandardCharsets;
import java.text.ParseException;
import java.text.SimpleDateFormat;
import java.util.Arrays;
import java.util.Collections;
import java.util.Date;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Map.Entry;
import java.util.Objects;
import java.util.Set;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.mapreduce.MRJobConfig;
import org.apache.metron.common.Constants;
import org.apache.metron.common.system.Clock;
import org.apache.metron.common.utils.timestamp.TimestampConverters;
Expand Down Expand Up @@ -114,7 +122,24 @@ private <K, V> Matcher<Map<K, V>> mapContaining(Map<K, V> map) {
return new TypeSafeMatcher<Map<K, V>>() {
@Override
protected boolean matchesSafely(Map<K, V> item) {
  // Verify every expected entry is present in `item`. The HADOOP_CONF entry
  // needs special handling: the expected side is a plain Map of settings while
  // the actual side is a Hadoop Configuration, so compare setting-by-setting.
  for (K key : map.keySet()) {
    if (key.equals(PcapOptions.HADOOP_CONF.getKey())) {
      Configuration itemConfiguration = (Configuration) item.get(PcapOptions.HADOOP_CONF.getKey());
      Map<String, Object> mapConfiguration = (Map<String, Object>) map.get(PcapOptions.HADOOP_CONF.getKey());
      for (String setting : mapConfiguration.keySet()) {
        // Configuration.get(key, "") defaults missing keys to "", so an absent
        // setting only matches an expected empty string.
        if (!mapConfiguration.get(setting).equals(itemConfiguration.get(setting, ""))) {
          return false;
        }
      }
    } else {
      // Null-safe equality; replaces the hand-rolled ternary null check.
      if (!Objects.equals(item.get(key), map.get(key))) {
        return false;
      }
    }
  }
  return true;
}

@Override
Expand Down Expand Up @@ -192,7 +217,8 @@ public void runs_fixed_pcap_filter_job_with_full_argument_list() throws Exceptio
"-include_reverse",
"-num_reducers", "10",
"-records_per_file", "1000",
"-ps"
"-ps",
"-yq", "pcap"
};
Map<String, String> query = new HashMap<String, String>() {{
put(Constants.Fields.SRC_ADDR.getName(), "192.168.1.1");
Expand All @@ -215,6 +241,9 @@ public void runs_fixed_pcap_filter_job_with_full_argument_list() throws Exceptio
PcapOptions.END_TIME_MS.put(config, endAsNanos / 1000000L); // needed bc defaults in config
PcapOptions.NUM_RECORDS_PER_FILE.put(config, 1000);
PcapOptions.PRINT_JOB_STATUS.put(config, true);
PcapOptions.HADOOP_CONF.put(config, new HashMap<String, Object>() {{
put(MRJobConfig.QUEUE_NAME, "pcap");
}});

when(jobRunner.submit(isA(Finalizer.class), argThat(mapContaining(config)))).thenReturn(jobRunner);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,13 +25,15 @@
import java.text.DateFormat;
import java.text.SimpleDateFormat;
import java.util.HashMap;
import java.util.Optional;
import java.util.function.Function;

public class PcapConfig extends AbstractMapDecorator<String, Object>{
public interface PrefixStrategy extends Function<Clock, String>{}

private boolean showHelp;
private DateFormat dateFormat;
private String yarnQueue;

public PcapConfig() {
super(new HashMap<>());
Expand Down Expand Up @@ -137,4 +139,12 @@ public int getNumRecordsPerFile() {
public void setNumRecordsPerFile(int numRecordsPerFile) {
PcapOptions.NUM_RECORDS_PER_FILE.put(this, numRecordsPerFile);
}

/**
 * Sets the YARN queue that pcap query jobs will be submitted to.
 *
 * @param yarnQueue queue name; may be null when no queue was configured
 */
public void setYarnQueue(String yarnQueue) {
  this.yarnQueue = yarnQueue;
}

/**
 * Returns the configured YARN queue, if any.
 *
 * @return an Optional holding the queue name, or empty when unset
 */
public Optional<String> getYarnQueue() {
  return yarnQueue == null ? Optional.empty() : Optional.of(yarnQueue);
}
}