Skip to content
This repository has been archived by the owner on May 12, 2021. It is now read-only.

METRON-1533 Create KAFKA_FIND Stellar function #1025

Closed
wants to merge 10 commits into from

Conversation

nickwallen
Copy link
Contributor

@nickwallen nickwallen commented May 22, 2018

This PR is built on #1024 and should not be merged until #1024 is addressed.

Changes

I created a KAFKA_FIND function that allows you to provide a filter expression so that only messages satisfying a condition are returned. For example...

  • Find a message that has been enriched with geolocation data.

    KAFKA_FIND('indexing', m -> MAP_EXISTS('geo.city', m))
    
  • Find a Bro message.

    KAFKA_FIND('indexing', m -> MAP_GET('source.type', m) == 'bro')
    

The message is presented to the filter lambda expression as a map of field values. This makes creating the filter expression a bit simpler.

Like the other KAFKA_* functions, this is not intended to be highly performant. This is only intended to make the process of creating and modifying enrichments simpler in the REPL. See the Use Case section for more details on how I see this being used.

Future

If we were in the future to provide map literals in Stellar, this would become a fair bit simpler.

KAFKA_FIND('indexing', m -> m['source.type'] == 'bro')

Use Case

When creating enrichments, I often find that I want to validate that the enrichment I just created was successful on the live, incoming stream of telemetry. My workflow looks something like this.

  1. Create and test the enrichment that I want to create.

    [Stellar]>>> ip_src_addr := "72.34.49.86"
    72.34.49.86
    
    [Stellar]>>> geo := GEO_GET(ip_src_addr)
    {country=US, dmaCode=803, city=Los Angeles, postalCode=90014, latitude=34.0438, location_point=34.0438,-118.2512, locID=5368361, longitude=-118.2512}
    
  2. That looks good to me. Now let's add that to my Bro telemetry.

    [Stellar]>>> conf := SHELL_EDIT(conf)
    {
      "enrichment" : {
        "fieldMap": {
          "stellar": {
            "config": [
               "geo := GEO_GET(ip_src_addr)"
            ]
          }
        }
      },
      "threatIntel": {
      }
    }
    
    [Stellar]>>> CONFIG_PUT("ENRICHMENTS", conf, "bro")
    
  3. It looks like that worked, but did that really work?

    At this point, I would run KAFKA_GET as many times as it takes to retrieve a Bro message. You would just have to get lucky and hope that the enrichment worked and secondly that you would pull down a Bro message (as opposed to a different sensor).

    I would rather have a function that lets me only pull back the messages that I care about. In this case I could either retrieve only Bro messages.

    KAFKA_FIND('indexing', m -> MAP_GET('source.type', m) == 'bro')
    

    Or I could look for messages that contain geolocation data.

    KAFKA_FIND('indexing', m -> MAP_EXISTS('geo.city', m))
    

Changes

  • Created the KAFKA_FIND function along with unit tests.

  • Updated all KAFKA_* functions to use a standard getArg function so that argument handling is all done the same way.

Pull Request Checklist

  • Is there a JIRA ticket associated with this PR? If not one needs to be created at Metron Jira.
  • Does your PR title start with METRON-XXXX where XXXX is the JIRA number you are trying to resolve? Pay particular attention to the hyphen "-" character.
  • Has your PR been rebased against the latest commit within the target branch (typically master)?
  • Have you included steps to reproduce the behavior or problem that is being changed or addressed?
  • Have you included steps or a guide to how the change may be verified and tested manually?
  • Have you ensured that the full suite of tests and checks have been executed in the root metron folder via:
  • Have you written or updated unit tests and or integration tests to verify your changes?
  • If adding new dependencies to the code, are these dependencies licensed in a way that is compatible for inclusion under ASF 2.0?
  • Have you verified the basic functionality of the build by building and running locally with Vagrant full-dev environment or the equivalent?

@james-sirota
Copy link

+1 There are minor errors in your test instructions. CONFIG_PUT("ENRICHMENTS", e, "bro") should be CONFIG_PUT("ENRICHMENT", conf, "bro")

@asfgit asfgit closed this in 8202cd2 Jun 8, 2018
@nickwallen nickwallen deleted the METRON-1533-NEW branch September 17, 2018 19:34
Sign up for free to subscribe to this conversation on GitHub. Already have an account? Sign in.
Labels
None yet
Projects
None yet
2 participants