Skip to content

Commit

Permalink
Add json_get and json_set awk functions
Browse files Browse the repository at this point in the history
  • Loading branch information
Jeffail committed Dec 28, 2018
1 parent dcc11c1 commit 310bc89
Show file tree
Hide file tree
Showing 4 changed files with 116 additions and 13 deletions.
6 changes: 6 additions & 0 deletions CHANGELOG.md
Expand Up @@ -5,6 +5,12 @@ All notable changes to this project will be documented in this file.

## Unreleased

### 0.42.3 - 2018-12-28

### Added

- Functions `json_get` and `json_set` added to `awk` processor.

### 0.42.1 - 2018-12-20

### Added
Expand Down
12 changes: 12 additions & 0 deletions docs/processors/awk_functions.md
Expand Up @@ -12,6 +12,18 @@ this document. These functions can be overridden by functions in the program.

## JSON Functions

### `json_get(path)`

Attempts to find a JSON value in the input message payload by a dot-separated
path and returns it as a string. This function is always available even when the
`json` codec is not used.

### `json_set(path, value)`

Attempts to set a JSON value in the input message payload identified by a
dot-separated path. This function is always available even when the `json` codec
is not used.

### `create_json_object(key1, val1, key2, val2, ...)`

Generates a valid JSON object from key/value pair arguments. The arguments are
Expand Down
66 changes: 53 additions & 13 deletions lib/processor/awk.go
Expand Up @@ -31,6 +31,7 @@ import (
"github.com/Jeffail/benthos/lib/log"
"github.com/Jeffail/benthos/lib/metrics"
"github.com/Jeffail/benthos/lib/types"
"github.com/Jeffail/gabs"
"github.com/benhoyt/goawk/interp"
"github.com/benhoyt/goawk/parser"
)
Expand Down Expand Up @@ -58,8 +59,8 @@ also automatically be extracted from the input based on a codec:
### ` + "`none`" + `
No variables are extracted. The full contents of the message are fed into the
program.
Only metadata variables are extracted and the full contents of the message are
fed into the program.
### ` + "`json`" + `
Expand Down Expand Up @@ -254,10 +255,16 @@ var awkFunctionsMap = map[string]interface{}{
// Do nothing, this is a placeholder for compilation.
return ""
},
"metadata_set": func(key, value string) string {
"metadata_set": func(key, value string) {
// Do nothing, this is a placeholder for compilation.
},
"json_get": func(path string) string {
// Do nothing, this is a placeholder for compilation.
return ""
},
"json_set": func(path, value string) {
// Do nothing, this is a placeholder for compilation.
},
"create_json_object": func(vals ...string) string {
pairs := map[string]string{}
for i := 0; i < len(vals)-1; i += 2 {
Expand Down Expand Up @@ -318,12 +325,45 @@ func (a *AWK) ProcessMessage(msg types.Message) ([]types.Message, types.Response
proc := func(index int) {
var outBuf, errBuf bytes.Buffer

part := newMsg.Get(index)

// Function overrides
a.functions["metadata_get"] = func(k string) string {
return newMsg.Get(index).Metadata().Get(k)
return part.Metadata().Get(k)
}
a.functions["metadata_set"] = func(k, v string) {
newMsg.Get(index).Metadata().Set(k, v)
part.Metadata().Set(k, v)
}
a.functions["json_get"] = func(path string) string {
var gPart *gabs.Container
jsonPart, err := part.JSON()
if err == nil {
gPart, err = gabs.Consume(jsonPart)
}
if err != nil {
a.log.Warnf("Failed to parse part into json: %v\n", err)
FlagFail(part)
return "null"
}
gTarget := gPart.Path(path)
if gTarget.Data() == nil {
return "null"
}
return gPart.Path(path).String()
}
a.functions["json_set"] = func(path, v string) {
var gPart *gabs.Container
jsonPart, err := part.JSON()
if err == nil {
gPart, err = gabs.Consume(jsonPart)
}
if err != nil {
a.log.Warnf("Failed to parse part into json: %v\n", err)
FlagFail(part)
return
}
gPart.SetP(v, path)
part.SetJSON(gPart.Data())
}

config := &interp.Config{
Expand All @@ -333,11 +373,11 @@ func (a *AWK) ProcessMessage(msg types.Message) ([]types.Message, types.Response
}

if a.conf.Codec == "json" {
jsonPart, err := newMsg.Get(index).JSON()
jsonPart, err := part.JSON()
if err != nil {
a.mErr.Incr(1)
a.log.Errorf("Failed to parse part into json: %v\n", err)
FlagFail(newMsg.Get(index))
FlagFail(part)
return
}

Expand All @@ -346,18 +386,18 @@ func (a *AWK) ProcessMessage(msg types.Message) ([]types.Message, types.Response
}
config.Stdin = bytes.NewReader([]byte(" "))
} else {
config.Stdin = bytes.NewReader(newMsg.Get(index).Get())
config.Stdin = bytes.NewReader(part.Get())
}

newMsg.Get(index).Metadata().Iter(func(k, v string) error {
part.Metadata().Iter(func(k, v string) error {
config.Vars = append(config.Vars, varInvalidRegexp.ReplaceAllString(k, "_"), v)
return nil
})

if _, err := interp.ExecProgram(a.program, config); err != nil {
a.mErr.Incr(1)
a.log.Errorf("Non-fatal execution error: %v\n", err)
FlagFail(newMsg.Get(index))
FlagFail(part)
return
}

Expand All @@ -366,22 +406,22 @@ func (a *AWK) ProcessMessage(msg types.Message) ([]types.Message, types.Response
} else if len(errMsg) > 0 {
a.mErr.Incr(1)
a.log.Errorf("Execution error: %s\n", errMsg)
FlagFail(newMsg.Get(index))
FlagFail(part)
}

resMsg, err := ioutil.ReadAll(&outBuf)
if err != nil {
a.mErr.Incr(1)
a.log.Errorf("Read output error: %v\n", err)
FlagFail(newMsg.Get(index))
FlagFail(part)
}

if len(resMsg) > 0 {
// Remove trailing line break
if resMsg[len(resMsg)-1] == '\n' {
resMsg = resMsg[:len(resMsg)-1]
}
newMsg.Get(index).Set(resMsg)
part.Set(resMsg)
}
}

Expand Down
45 changes: 45 additions & 0 deletions lib/processor/awk_test.go
Expand Up @@ -166,6 +166,51 @@ func TestAWK(t *testing.T) {
input: `hello world`,
output: `hello world`,
},
{
name: "json get 1",
codec: "none",
program: `{ print json_get("obj.foo") }`,
input: `{"obj":{"foo":12}}`,
output: `12`,
},
{
name: "json get 2",
codec: "none",
program: `{ print json_get("obj.bar") }`,
input: `{"obj":{"foo":12}}`,
output: `null`,
},
{
name: "json get 3",
codec: "none",
program: `{ print json_get("obj.bar") }`,
input: `not json content`,
output: `null`,
},
{
name: "json set 1",
codec: "none",
program: `{ json_set("obj.foo", "hello world") }`,
input: `{}`,
output: `{"obj":{"foo":"hello world"}}`,
},
{
name: "json set 2",
codec: "none",
program: `{ json_set("obj.foo", "hello world") }`,
input: `not json content`,
output: `not json content`,
},
{
name: "metadata get 2",
metadata: map[string]string{
"meta.foo": "12",
},
codec: "none",
program: `{ print metadata_get("meta.bar") }`,
input: `hello world`,
output: ``,
},
{
name: "json 1",
codec: "json",
Expand Down

0 comments on commit 310bc89

Please sign in to comment.