This repository has been archived by the owner on May 12, 2021. It is now read-only.

METRON-706: Add Stellar transformations and filters to enrichment and threat intel loaders #445

Closed
Wants to merge 39 commits

Conversation


@mmiklavc mmiklavc commented Feb 8, 2017

This PR completes work in https://issues.apache.org/jira/browse/METRON-706

(Note: there are commits from @cestella that I had merged in the process of working on this. They are squashed in master but show up here. They only show in the commit history, not the diff)

The motivation for this PR is to expand where we expose Stellar capabilities. This work enables transformations and filtering in the enrichment and threat intel extractors. The user can now specify transformation expressions over the column values and separately filter records with a Stellar predicate. The same can be done independently for the indicator value used as part of the HBase key. In addition, a new configuration property lets the user specify a ZooKeeper quorum and reference properties from the global config.

See the updated README for documentation details on the new properties.

Testing

Testing closely follows the methods defined in #432

  • Download the Alexa top 1m data set
wget http://s3.amazonaws.com/alexa-static/top-1m.csv.zip
unzip top-1m.csv.zip
  • Stage import file
head -n 10000 top-1m.csv > top-10k.csv
head -n 10 top-1m.csv > top-10.csv
  • Create an extractor.json for the CSV data with the following contents. (Set zk_quorum to your own value if it differs from the default Vagrant quick-dev environment.)
{
  "config" : {
    "zk_quorum" : "node1:2181",
    "columns" : {
       "rank" : 0,
       "domain" : 1
    },
    "value_transform" : {
       "domain" : "DOMAIN_REMOVE_TLD(domain)",
       "port" : "es.port"
    },
    "value_filter" : "LENGTH(domain) > 0",
    "indicator_column" : "domain",
    "indicator_transform" : {
       "indicator" : "DOMAIN_REMOVE_TLD(indicator)"
    },
    "indicator_filter" : "LENGTH(indicator) > 0",
    "type" : "top_domains",
    "separator" : ","
  },
  "extractor" : "CSV"
}

The "port" value transform here references the "es.port" property from the global config.
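Conceptually, the transform resolves variable names against the row's columns first and then falls back to the global config, which is how "es.port" becomes available as "port". A simplified sketch (hypothetical helper, not the loader's actual classes):

```java
import java.util.HashMap;
import java.util.Map;

public class TransformSketch {
    // Hypothetical resolution order: row columns shadow global config entries.
    static Object resolve(String var, Map<String, Object> row, Map<String, Object> global) {
        if (row.containsKey(var)) {
            return row.get(var);
        }
        return global.get(var);
    }

    public static void main(String[] args) {
        Map<String, Object> row = new HashMap<>();
        row.put("rank", "5");
        row.put("domain", "yahoo.com");
        Map<String, Object> global = new HashMap<>();
        global.put("es.port", "9300");

        // "port" : "es.port" pulls the value through from the global config
        System.out.println(resolve("es.port", row, global)); // 9300
        System.out.println(resolve("domain", row, global));  // yahoo.com
    }
}
```

This matches the HBase output further down, where every value carries "port":"9300" alongside the per-row domain and rank.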

  • Run the import (parallelism of 5, batch size of 128)
echo "truncate 'enrichment'" | hbase shell && /usr/metron/0.3.0/bin/flatfile_loader.sh -i ./top-10k.csv -t enrichment -c t -e ./extractor.json -p 5 -b 128 && echo "count 'enrichment'" | hbase shell

You should see 9275 records in HBase, fewer than the 10,000 you might expect.

  • Now run it again on the top-10 set.
echo "truncate 'enrichment'" | hbase shell && /usr/metron/0.3.0/bin/flatfile_loader.sh -i ./top-10.csv -t enrichment -c t -e ./extractor.json -p 5 -b 128 && echo "count 'enrichment'" | hbase shell

You should get 9 rows, as below:

scan 'enrichment'
ROW                                                                     COLUMN+CELL
 \x09\x00\x0F,\x10\xE5\xD1\xDE_\xBF\x9E\xA7d\xF2\xA8\x94\x00\x0Btop_dom column=t:v, timestamp=1486513090953, value={"port":"9300","domain":"yahoo","rank":"5"}
 ains\x00\x05yahoo
 \x11\xCA\xCF\x01\xB4\xC5\x11@\x0C\xA1A,\xE9j~O\x00\x0Btop_domains\x00\ column=t:v, timestamp=1486513090979, value={"port":"9300","domain":"tmall","rank":"10"}
 x05tmall
 \x13)`\xFC\xF2\xBF\xF9\xC1a\xC8a\xF1h\x0E\xB5\x11\x00\x0Btop_domains\x column=t:v, timestamp=1486513090930, value={"port":"9300","domain":"youtube","rank":"2"}
 00\x07youtube
 1\xC2I\x05k\xEA\x0EY\xE1\xAD\xA0$U\xA9kc\x00\x0Btop_domains\x00\x06goo column=t:v, timestamp=1486513090964, value={"port":"9300","domain":"google","rank":"7"}
 gle
 =\xDD\xDFH\x95\xC0\xB9\xD9\xBAKX\x8B\x9B2T\x9F\x00\x0Btop_domains\x00\ column=t:v, timestamp=1486513090942, value={"port":"9300","domain":"facebook","rank":"3"}
 x08facebook
 D\xDE\x1C\x9A\xCF\x07S\x9A\xDEB\xDB\x87D\x1F\x1D\xF4\x00\x0Btop_domain column=t:v, timestamp=1486513090974, value={"port":"9300","domain":"qq","rank":"9"}
 s\x00\x02qq
 u\xBC\xFC\xC9\x09\x9Af\xE1\xC8\xA5\x9A\x93\xCB0c\x01\x00\x0Btop_domain column=t:v, timestamp=1486513090970, value={"port":"9300","domain":"amazon","rank":"8"}
 s\x00\x06amazon
 \xC7\xA5.l\xC21\xFAQ8\x1E\x5C\x99p\x93_\x9A\x00\x0Btop_domains\x00\x09 column=t:v, timestamp=1486513090958, value={"port":"9300","domain":"wikipedia","rank":"6"}
 wikipedia
 \xCC\xCA\xBF;\x92\xA1\xA0k\xE4\x83i\xBD\xC3\xA8\xE8p\x00\x0Btop_domain column=t:v, timestamp=1486513090948, value={"port":"9300","domain":"baidu","rank":"4"}
 s\x00\x05baidu

Once again, we get fewer rows than the input dataset size. This is because multiple records map to the same resulting key in HBase.
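To see why rows collapse, note that the HBase key is built from the indicator after indicator_transform runs, so distinct domains that share a name after the TLD is stripped produce the same key. A toy illustration, using a naive first-label stand-in for Stellar's DOMAIN_REMOVE_TLD (the real function consults the public suffix list):

```java
import java.util.Arrays;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

public class IndicatorCollision {
    // Naive stand-in for DOMAIN_REMOVE_TLD: keep everything before the first dot.
    // The real Stellar function strips the registered public suffix instead.
    static String removeTld(String domain) {
        int dot = domain.indexOf('.');
        return dot < 0 ? domain : domain.substring(0, dot);
    }

    public static void main(String[] args) {
        // Three distinct input rows...
        List<String> domains = Arrays.asList("google.com", "google.de", "google.co.uk");
        Set<String> indicators = new HashSet<>();
        for (String d : domains) {
            indicators.add(removeTld(d)); // ...collapse to the single indicator "google"
        }
        System.out.println(domains.size() + " rows -> " + indicators.size() + " HBase key(s)");
    }
}
```

The last write for a given key wins, which is why the top-10 run above keeps rank 2 for "youtube" but only one of the colliding "google" rows.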

mmiklavc and others added 30 commits January 25, 2017 14:38

@cestella cestella left a comment


I like this; ultimately, very clear and clean, @mmiklavc

A few minor nits, but I'm +1 shortly thereafter.

@@ -35,6 +35,14 @@ protected ConvertUtilsBean initialValue() {
}
};

public static <T> T convertOrFail(Object o, Class<T> clazz) {
if (clazz.isInstance(o)) {
Member

Any reason why this isn't just clazz.cast(o) and called cast?

Contributor Author

Good point. I'm removing this entirely. I'll just use casting in place, e.g.

Example 1
String val = (String) map.get("foo"); // throws class cast exception on failure, which is what we want

Example 2

Map<Object, Object> a = new HashMap<Object, Object>() {{
    put("hello", "world");
    put(1, 2);
}};

Map<String, Object> b = new HashMap<String, Object>() {{
    put("a", a);
}};
Map<Object, Object> c = (Map<Object, Object>) b.get("a"); // throws ClassCastException if not a Map
Map<String, String> d = new HashMap<>();
for (Map.Entry<Object, Object> entry : c.entrySet()) {
    d.put((String) entry.getKey(), (String) entry.getValue()); // throws ClassCastException, also what we want
}

Member

Agreed

Context.Builder builder = new Context.Builder();
if (zkClient.isPresent()) {
builder.with(Context.Capabilities.ZOOKEEPER_CLIENT, zkClient::get)
.with(Context.Capabilities.GLOBAL_CONFIG, () -> globalConfig);
Member

You might want an empty global config even if the zkClient isn't present. Not sure, just a thought.

Contributor Author

By empty, do you mean null or "{}"? Does Stellar handle that differently from choosing not to add the capability at all?

Member

I mean {}
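A sketch of that suggestion (hypothetical helper names, not Metron's actual Context API): always register a global-config supplier, defaulting to an empty map when nothing was loaded, so a Stellar expression that looks up a global sees {} rather than a missing capability:

```java
import java.util.Collections;
import java.util.Map;
import java.util.Optional;
import java.util.function.Supplier;

public class GlobalConfigDefault {
    // Hypothetical helper: wrap a possibly-absent loaded config so consumers
    // always get a map ("{}" when empty) instead of null.
    static Supplier<Map<String, Object>> globalConfigSupplier(Optional<Map<String, Object>> loaded) {
        Map<String, Object> config = loaded.orElse(Collections.emptyMap());
        return () -> config;
    }

    public static void main(String[] args) {
        Supplier<Map<String, Object>> supplier = globalConfigSupplier(Optional.empty());
        System.out.println(supplier.get()); // {}
    }
}
```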

@@ -200,6 +295,7 @@ public static void teardown() throws Exception {
multilineZipFile.delete();
lineByLineExtractorConfigFile.delete();
wholeFileExtractorConfigFile.delete();
stellarExtractorConfigFile.delete();
Member

Did you forget customLineByLineExtractorConfigFile.delete();?

Contributor Author

Good catch, fixing


mmiklavc commented Feb 8, 2017

Note: per the recent Ansible issue in master, I tested the following as well

  • Create threat_ip.csv
#
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements.  See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License.  You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
#Add single column of ip address to alert
#Public lists are available on the internet
# example:
23.113.113.105
24.107.205.249
24.108.62.255
24.224.153.71
27.4.1.212
27.131.149.102
31.24.30.31
31.131.251.33
31.186.99.250
31.192.209.119
31.192.209.150
31.200.244.17
37.34.52.185
37.58.112.101
37.99.146.27
37.128.132.96
37.140.195.177
37.140.199.100
  • Upload threat_ip.csv to HDFS:
hdfs dfs -put -f threat_ip.csv
  • Create extractor.json
{
  "config": {
    "columns": {
      "ip": 0
    },
    "indicator_column": "ip",
    "type" : "malicious_ip",
    "separator": ","
  },
  "extractor": "CSV"
}
  • Run as the root user from /root
echo "truncate 'threatintel'" | hbase shell && /usr/metron/0.3.0/bin/flatfile_loader.sh -c t -t threatintel -e extractor.json -i /user/root -m MR
  • Verify the records are there
echo "scan 'threatintel'" | hbase shell
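For reference, the extractor config above maps each CSV line to a value map and an indicator roughly like this (a simplified sketch of the CSV extractor's behavior, not its actual code):

```java
import java.util.HashMap;
import java.util.Map;

public class CsvExtractorSketch {
    // Simplified sketch: apply a column mapping such as {"ip": 0} with the
    // configured separator, then the indicator_column ("ip") is read out of
    // the result to build the HBase row key for type "malicious_ip".
    static Map<String, String> extract(String line, Map<String, Integer> columns, String separator) {
        String[] parts = line.split(separator, -1); // separator is treated as a regex; "," is safe
        Map<String, String> value = new HashMap<>();
        for (Map.Entry<String, Integer> col : columns.entrySet()) {
            value.put(col.getKey(), parts[col.getValue()]);
        }
        return value;
    }

    public static void main(String[] args) {
        Map<String, Integer> columns = new HashMap<>();
        columns.put("ip", 0);
        Map<String, String> value = extract("23.113.113.105", columns, ",");
        System.out.println(value.get("ip")); // 23.113.113.105
    }
}
```

With a single-column file there is no transform or filter to apply, so each non-comment line becomes one threatintel row.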


cestella commented Feb 9, 2017

+1 by inspection

@asfgit asfgit closed this in c5bbf5a Feb 9, 2017

mmiklavc commented Feb 9, 2017

Adding these timing notes about the import for reference:

No filter, local load, multiple threads (5), batch 128
real 10m22.127s
user 11m11.873s
sys 4m23.912s
903392 rows

With filters, multiple threads (5), batch 128 (1 record fewer)
real 10m58.210s
user 11m10.592s
sys 4m16.585s
903391 rows

MapReduce mode
real 9m20.566s
user 0m26.853s
sys 0m10.334s
903391 rows

@cestella cestella mentioned this pull request Dec 21, 2017