Skip to content

Commit

Permalink
Instead of defining path as props they are now in ports (#239)
Browse files Browse the repository at this point in the history
This PR improves upon #238 where the JSON paths where configured through properties - which led to unparseable port references in the resulting bundle with the message Message: "unknown port: bpi: jsonpath)bpi.USD.rate. That is because the properties would get translated into ports with names like bpi.USD.rate which confuses the parser.

I changed the operator to now accept the paths as inputs and map them to the outports via names given in the properties.

//cc @td5r
-- related to #237
  • Loading branch information
kairichard committed Dec 3, 2019
1 parent 5085124 commit 2b6c249
Show file tree
Hide file tree
Showing 2 changed files with 39 additions and 26 deletions.
34 changes: 22 additions & 12 deletions pkg/elem/encoding_json_path.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,15 +20,23 @@ var encodingJSONPathCfg = &builtinConfig{
ServiceDefs: map[string]*core.ServiceDef{
core.MAIN_SERVICE: {
In: core.TypeDef{
Type: "binary",
Type: "map",
Map: map[string]*core.TypeDef{
"document": {
Type: "binary",
},
"{path_names}": {
Type: "string",
},
},
},
Out: core.TypeDef{
Type: "map",
Map: map[string]*core.TypeDef{
"valid": {
Type: "boolean",
},
"{paths}": {
"{path_names}": {
Type: "primitive",
},
},
Expand All @@ -37,7 +45,7 @@ var encodingJSONPathCfg = &builtinConfig{
},
DelegateDefs: map[string]*core.DelegateDef{},
PropertyDefs: map[string]*core.TypeDef{
"paths": {
"path_names": {
Type: "stream",
Stream: &core.TypeDef{
Type: "string",
Expand All @@ -48,7 +56,7 @@ var encodingJSONPathCfg = &builtinConfig{
opFunc: func(op *core.Operator) {
in := op.Main().In()
out := op.Main().Out()
paths := op.Property("paths").([]interface{})
path_names := op.Property("path_names").([]interface{})
for !op.CheckStop() {
in := in.Pull()
valid := true
Expand All @@ -57,22 +65,24 @@ var encodingJSONPathCfg = &builtinConfig{
continue
}

jsonDoc := []byte(in.(core.Binary))
data := in.(map[string]interface{})
jsonDoc := []byte(data["document"].(core.Binary))

if !gjson.ValidBytes(jsonDoc) {
for _, v := range paths {
out.Map(v.(string)).Push(nil)
for _, path_name := range path_names {
out.Map(path_name.(string)).Push(nil)
}
valid = false
} else {
for _, v := range paths {
res := gjson.GetBytes(jsonDoc, v.(string))
for _, path_name := range path_names {
res := gjson.GetBytes(jsonDoc, data[path_name.(string)].(string))
if !res.Exists() {
out.Map(v.(string)).Push(nil)
out.Map(path_name.(string)).Push(nil)
}
if res.IsArray() || res.IsObject() {
out.Map(v.(string)).Push(res.Raw)
out.Map(path_name.(string)).Push(res.Raw)
}
out.Map(v.(string)).Push(res.Value())
out.Map(path_name.(string)).Push(res.Value())
}
}
out.Map("valid").Push(valid)
Expand Down
31 changes: 17 additions & 14 deletions pkg/elem/encoding_json_path_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,17 +36,17 @@ func Test_JsonPath__String(t *testing.T) {
core.InstanceDef{
Operator: encodingJSONPathId,
Properties: map[string]interface{}{
"paths": []interface{}{"name.last"},
"path_names": []interface{}{"last_name"},
},
},
)
require.NoError(t, err)

o.Main().Out().Bufferize()
o.Start()

o.Main().In().Push(core.Binary(jsonDoc))
a.PortPushes("Anderson", o.Main().Out().Map("name.last"))
data := map[string]interface{}{"last_name": "name.last", "document": core.Binary(jsonDoc)}
o.Main().In().Push(data)
a.PortPushes("Anderson", o.Main().Out().Map("last_name"))
a.PortPushes(true, o.Main().Out().Map("valid"))
}

Expand All @@ -57,16 +57,17 @@ func Test_JsonPath__Invalid_Document(t *testing.T) {
core.InstanceDef{
Operator: encodingJSONPathId,
Properties: map[string]interface{}{
"paths": []interface{}{"name.last"},
"path_names": []interface{}{"last_name"},
},
},
)
require.NoError(t, err)

o.Main().Out().Bufferize()
o.Start()
o.Main().In().Push(core.Binary(`{"test"`))
a.PortPushes(nil, o.Main().Out().Map("name.last"))
data := map[string]interface{}{"last_name": "name.last", "document": core.Binary(`{"test"`)}
o.Main().In().Push(data)
a.PortPushes(nil, o.Main().Out().Map("last_name"))
a.PortPushes(false, o.Main().Out().Map("valid"))
}

Expand All @@ -77,17 +78,18 @@ func Test_JsonPath__NonExistent_Path(t *testing.T) {
core.InstanceDef{
Operator: encodingJSONPathId,
Properties: map[string]interface{}{
"paths": []interface{}{"name.missing", "name.last"},
"path_names": []interface{}{"name_missing", "name_last"},
},
},
)
require.NoError(t, err)

o.Main().Out().Bufferize()
o.Start()
o.Main().In().Push(core.Binary(jsonDoc))
a.PortPushes("Anderson", o.Main().Out().Map("name.last"))
a.PortPushes(nil, o.Main().Out().Map("name.missing"))
data := map[string]interface{}{"name_last": "name.last", "name_missing": "name.missing", "document": core.Binary(jsonDoc)}
o.Main().In().Push(data)
a.PortPushes("Anderson", o.Main().Out().Map("name_last"))
a.PortPushes(nil, o.Main().Out().Map("name_missing"))
a.PortPushes(true, o.Main().Out().Map("valid"))
}

Expand All @@ -98,15 +100,16 @@ func Test_JsonPath__Non_Primitive_Return(t *testing.T) {
core.InstanceDef{
Operator: encodingJSONPathId,
Properties: map[string]interface{}{
"paths": []interface{}{"friends.#.last"},
"path_names": []interface{}{"friends_last"},
},
},
)
require.NoError(t, err)

o.Main().Out().Bufferize()
o.Start()
o.Main().In().Push(core.Binary(jsonDoc))
a.PortPushes(`["Murphy","Craig","Fonder"]`, o.Main().Out().Map("friends.#.last"))
data := map[string]interface{}{"friends_last": "friends.#.last", "document": core.Binary(jsonDoc)}
o.Main().In().Push(data)
a.PortPushes(`["Murphy","Craig","Fonder"]`, o.Main().Out().Map("friends_last"))
a.PortPushes(true, o.Main().Out().Map("valid"))
}

0 comments on commit 2b6c249

Please sign in to comment.