This repository has been archived by the owner on May 12, 2021. It is now read-only.

METRON-682: Unify and Improve the Flat File Loader #432

Closed
wants to merge 22 commits into from

Conversation

cestella
Member

@cestella cestella commented Feb 1, 2017

Currently the flat file loader is deficient in a couple ways:

  • It only supports importing local data, even though a separate, poorly named application (threat_intel_loader.sh) supports importing enrichment data via MapReduce
  • It does not support local imports from HDFS
  • It does not support local imports from URLs
  • It does not support importing zipped archives locally
  • You cannot import more than one file at once

This JIRA will:

  • Unify the MapReduce and local imports into one program and allow the user to specify the import mode with a CLI flag
  • Support local imports from HDFS and URLs
  • Support local imports from zipped files
  • Support importing more than one file at once

The testing plan is in the comments below.

@cestella
Member Author

cestella commented Feb 1, 2017

I know it seems like a lot of code changed, but much of it is reorganization rather than new code: the flat file loader class was restructured, old code was reformatted to conform to the current standard spacing, and the logic was split into separate reusable components.

@cestella
Member Author

cestella commented Feb 1, 2017

Testing Plan

Preliminaries

  • Download the alexa 1m dataset:
wget http://s3.amazonaws.com/alexa-static/top-1m.csv.zip
unzip top-1m.csv.zip
  • Stage import files
head -n 10000 top-1m.csv > top-10k.csv
hadoop fs -put top-10k.csv /tmp
head -n 10000 top-1m.csv | gzip - > top-10k.csv.gz
zip top-10k.csv.zip top-10k.csv
  • Create an extractor.json for the CSV data by editing extractor.json and pasting in these contents:
{
  "config" : {
    "columns" : {
      "domain" : 1,
      "rank" : 0
    },
    "indicator_column" : "domain",
    "type" : "alexa",
    "separator" : ","
  },
  "extractor" : "CSV"
}
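Before pointing the loader at the staged files, the archives and the extractor config can be sanity-checked locally. A minimal, self-contained sketch (it assumes `python3`, `gzip`, and optionally `zip`/`unzip` are on the path, and uses a three-row stand-in for the Alexa data so it runs anywhere):

```shell
#!/bin/sh
set -e
cd "$(mktemp -d)"

# Three-row stand-in for the Alexa data: rank,domain (the real file has 1M rows).
printf '1,google.com\n2,youtube.com\n3,facebook.com\n' > top-3.csv
gzip -c top-3.csv > top-3.csv.gz

# The same extractor.json as above.
cat > extractor.json <<'EOF'
{
  "config" : {
    "columns" : { "domain" : 1, "rank" : 0 },
    "indicator_column" : "domain",
    "type" : "alexa",
    "separator" : ","
  },
  "extractor" : "CSV"
}
EOF

# The gzipped copy should decompress to the same row count as the plain CSV...
gzip -dc top-3.csv.gz | wc -l
# ...and (if zip is installed) so should the zipped copy...
if command -v zip >/dev/null 2>&1; then
  zip -q top-3.csv.zip top-3.csv
  unzip -p top-3.csv.zip | wc -l
fi
# ...and the extractor config should at least parse as JSON.
python3 -m json.tool < extractor.json > /dev/null && echo "extractor.json OK"
```

The same checks apply to the real top-10k files; a mismatch in row counts or a JSON parse failure is cheaper to catch here than after truncating the HBase table.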

Import from URL

# truncate hbase
echo "truncate 'enrichment'" | hbase shell
# import data into hbase from URL.  This should take approximately 5 or 6 minutes
/usr/metron/0.3.0/bin/flatfile_loader.sh -i http://s3.amazonaws.com/alexa-static/top-1m.csv.zip -t enrichment -c t -e ./extractor.json -p 5 -b 128
# count data written and verify it's 1M
echo "count 'enrichment'" | hbase shell

Import from local file (non-zipped)

# truncate hbase
echo "truncate 'enrichment'" | hbase shell
# import data into hbase 
/usr/metron/0.3.0/bin/flatfile_loader.sh -i ./top-10k.csv -t enrichment -c t -e ./extractor.json -p 5 -b 128
# count data written and verify it's 10k
echo "count 'enrichment'" | hbase shell

Import from local file (gzipped)

# truncate hbase
echo "truncate 'enrichment'" | hbase shell
# import data into hbase 
/usr/metron/0.3.0/bin/flatfile_loader.sh -i ./top-10k.csv.gz -t enrichment -c t -e ./extractor.json -p 5 -b 128
# count data written and verify it's 10k
echo "count 'enrichment'" | hbase shell

Import from local file (zipped)

# truncate hbase
echo "truncate 'enrichment'" | hbase shell
# import data into hbase 
/usr/metron/0.3.0/bin/flatfile_loader.sh -i ./top-10k.csv.zip -t enrichment -c t -e ./extractor.json -p 5 -b 128
# count data written and verify it's 10k
echo "count 'enrichment'" | hbase shell

Import from HDFS via MR

# truncate hbase
echo "truncate 'enrichment'" | hbase shell
# import data into hbase 
/usr/metron/0.3.0/bin/flatfile_loader.sh -i /tmp/top-10k.csv -t enrichment -c t -e ./extractor.json -m MR
# count data written and verify it's 10k
echo "count 'enrichment'" | hbase shell
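Each scenario above repeats the same truncate/load/count cycle, so a small wrapper can cut the copy-paste. A sketch only: `load_and_count` is a hypothetical helper, and the `DRY_RUN` switch (which just prints the commands instead of running them) is a device for this sketch, since the real cycle needs a live HBase and the installed loader:

```shell
#!/bin/sh
# Hypothetical helper for the repeated truncate/load/count cycle.
# With DRY_RUN set, commands are printed rather than executed.
run() { if [ -n "$DRY_RUN" ]; then echo "+ $*"; else "$@"; fi; }

load_and_count() {
  input="$1"; shift
  run sh -c "echo \"truncate 'enrichment'\" | hbase shell"
  run /usr/metron/0.3.0/bin/flatfile_loader.sh -i "$input" \
      -t enrichment -c t -e ./extractor.json "$@"
  run sh -c "echo \"count 'enrichment'\" | hbase shell"
}

DRY_RUN=1
load_and_count ./top-10k.csv -p 5 -b 128   # local, plain CSV
load_and_count /tmp/top-10k.csv -m MR      # HDFS via MapReduce
```

Dropping `DRY_RUN=1` would execute the real cycle for each input listed.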

Contributor

@mmiklavc mmiklavc left a comment


This is a good PR. I actually pulled this into a branch to test out top-domain (Alexa) enrichments. The flexible file loading is helpful, and the enhancements to the integration tests are nice. No real issues on inspection. The architectural changes seem reasonable to me and fit with the idioms employed in other parts of the system.

import java.util.Map;

public interface InputFormatHandler {
  void set(Job job, Path input, Map<String, Object> config) throws IOException;
  void set(Job job, List<Path> input, Map<String, Object> config) throws IOException;
Contributor


I like


import java.util.Optional;

public abstract class OptionHandler implements Function<String, Option>
Contributor


Would this be useful in metron-common? Doesn't have to be this PR, but maybe "LoadOptions" becomes a parameterized type OPT_TYPE.

Job job = Job.getInstance(hadoopConfig);
List<String> inputs = (List<String>) config.get(LoadOptions.INPUT).get();
job.setJobName("MapReduceImporter: " + inputs.stream().collect(Collectors.joining(",")) + " => " + table + ":" + cf);
System.out.println("Configuring " + job.getJobName());
Contributor


Why system out instead of a logger?

Member Author


good catch

@ottobackwards
Contributor

I am trying to test this out - my VM is not doing well at the moment, however. Hopefully I can get it straightened out.

@ottobackwards
Contributor

[vagrant@node1 tmp]$ /usr/metron/0.3.0/bin/flatfile_loader.sh -i http://s3.amazonaws.com/alexa-static/top-1m.csv.zip -t enrichment -c t -e ./extractor.json -p 5 -b 128
Exception in thread "main" org.codehaus.jackson.map.JsonMappingException: Can not instantiate value of type [map type; class java.util.LinkedHashMap, [simple type, class java.lang.String] -> [simple type, class java.lang.Object]] from JSON String; no single-String constructor/factory method (through reference chain: org.apache.metron.dataloads.extractor.ExtractorHandler["config"])
at org.codehaus.jackson.map.deser.std.StdValueInstantiator._createFromStringFallbacks(StdValueInstantiator.java:379)
at org.codehaus.jackson.map.deser.std.StdValueInstantiator.createFromString(StdValueInstantiator.java:268)
at org.codehaus.jackson.map.deser.std.MapDeserializer.deserialize(MapDeserializer.java:244)
at org.codehaus.jackson.map.deser.std.MapDeserializer.deserialize(MapDeserializer.java:33)
at org.codehaus.jackson.map.deser.SettableBeanProperty.deserialize(SettableBeanProperty.java:299)
at org.codehaus.jackson.map.deser.SettableBeanProperty$MethodProperty.deserializeAndSet(SettableBeanProperty.java:414)
at org.codehaus.jackson.map.deser.BeanDeserializer.deserializeFromObject(BeanDeserializer.java:697)
at org.codehaus.jackson.map.deser.BeanDeserializer.deserialize(BeanDeserializer.java:580)
at org.codehaus.jackson.map.ObjectMapper._readMapAndClose(ObjectMapper.java:2732)
at org.codehaus.jackson.map.ObjectMapper.readValue(ObjectMapper.java:1909)
at org.apache.metron.dataloads.extractor.ExtractorHandler.load(ExtractorHandler.java:70)
at org.apache.metron.dataloads.extractor.ExtractorHandler.load(ExtractorHandler.java:75)
at org.apache.metron.dataloads.extractor.ExtractorHandler.load(ExtractorHandler.java:78)
at org.apache.metron.dataloads.nonbulk.flatfile.SimpleEnrichmentFlatFileLoader.main(SimpleEnrichmentFlatFileLoader.java:49)
at org.apache.metron.dataloads.nonbulk.flatfile.SimpleEnrichmentFlatFileLoader.main(SimpleEnrichmentFlatFileLoader.java:40)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at org.apache.hadoop.util.RunJar.run(RunJar.java:233)
at org.apache.hadoop.util.RunJar.main(RunJar.java:148)
[vagrant@node1 tmp]$

with extractor.json of:

{
  "config" :
    "columns" : {
      "domain" : 1,
      "rank" : 0
      }
    ,"indicator_column" : "domain"
    ,"type" : "alexa"
    ,"separator" : ","
    },
  "extractor" : "CSV"
}               

@cestella
Member Author

cestella commented Feb 3, 2017

@ottobackwards your config field is missing a { on line 2.

@cestella
Member Author

cestella commented Feb 3, 2017

Also, I'll point out that you can make your life easier by killing pretty much everything on your vagrant VM before doing this. The only dependencies are HBase and MapReduce.

I would suggest killing:

  • monit via service monit stop
  • all the storm topologies via storm kill bro && storm kill snort && storm kill enrichment && storm kill indexing
  • tcpreplay via for i in $(ps -ef | grep [t]cpreplay | awk '{print $2}'); do kill -9 $i; done (the bracketed pattern keeps the grep from matching itself)

@cestella
Member Author

cestella commented Feb 3, 2017

I'll point out as well that it'd be nice to have a decent exception there, kinda like what you'd get from jsonlint.com:

Error: Parse error on line 2:
...	"config": "columns": {		"domain": 1,	
-----------------------^
Expecting 'EOF', '}', ',', ']', got ':'

That might be worth a JIRA, honestly.
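For comparison, a stock JSON parser already produces that kind of line/column message. A sketch (assuming `python3` is available) that runs the broken config from the comment above through python's parser:

```shell
#!/bin/sh
cd "$(mktemp -d)"

# The broken config from the comment above: "config" is missing its "{".
cat > broken-extractor.json <<'EOF'
{
  "config" :
    "columns" : {
      "domain" : 1,
      "rank" : 0
    }
}
EOF

# A stock parser pinpoints the problem with a line/column position.
python3 -m json.tool < broken-extractor.json \
  || echo "rejected, with the offending line and column in the message"
```

Wiring an off-the-shelf parse step like that in front of ExtractorHandler's Jackson load would give the friendlier error without writing a new parser.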

@ottobackwards
Contributor

Works like a champ. Nice job, +1

@cestella cestella closed this Feb 6, 2017
@cestella cestella reopened this Feb 6, 2017
@cestella cestella closed this Feb 6, 2017
@cestella cestella reopened this Feb 6, 2017