Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Support for specifying an array of metadata objects to use for the outgoing requests #234

Open
wants to merge 7 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from 6 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -84,7 +84,7 @@ Flags:
-D, --data-file= File path for call data JSON file. Examples: /home/user/file.json or ./file.json.
-b, --binary The call data comes as serialized binary message or multiple count-prefixed messages read from stdin.
-B, --binary-file= File path for the call data as serialized binary message or multiple count-prefixed messages.
-m, --metadata= Request metadata as stringified JSON.
-m, --metadata= Request metadata as stringified JSON. Either as an object or an array of objects.
-M, --metadata-file= File path for call metadata JSON file. Examples: /home/user/metadata.json or ./metadata.json.
--stream-interval=0 Interval for stream requests between message sends.
--reflect-metadata= Reflect metadata as stringified JSON used only for reflection request.
Expand Down
33 changes: 28 additions & 5 deletions cmd/ghz/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -123,7 +123,7 @@ var (
Short('B').PlaceHolder(" ").IsSetByUser(&isBinDataPathSet).String()

isMDSet = false
md = kingpin.Flag("metadata", "Request metadata as stringified JSON.").
md = kingpin.Flag("metadata", "Request metadata as stringified JSON. Either as an object or an array of objects.").
Short('m').PlaceHolder(" ").IsSetByUser(&isMDSet).String()

isMDPathSet = false
Expand Down Expand Up @@ -315,11 +315,34 @@ func createConfigFromArgs(cfg *runner.Config) error {
binaryData = b
}

var metadata map[string]string
var metadataArray []map[string]string
var metadataMap map[string]string

*md = strings.TrimSpace(*md)
if *md != "" {
if err := json.Unmarshal([]byte(*md), &metadata); err != nil {
return fmt.Errorf("Error unmarshaling metadata '%v': %v", *md, err.Error())
// For backward compatibility reasons we support both approaches - specifying an array
// with multiple object items and specifying a single object

// 1. First try de-serializing it into an object
if err := json.Unmarshal([]byte(*md), &metadataMap); err != nil {
if !strings.Contains(err.Error(), "cannot unmarshal array into Go value of type map") {
// Some other fatal error which we should immediately propagate instead of try to
// de-serializing input into an array of maps (e.g. unexpected end of JSON input,
//etc.)
return fmt.Errorf("Error unmarshaling metadata '%v': %v", *md, err.Error())
}

// 2. If that fails, try to de-serialize it into an array
// NOTE: We could also simply check if string begins with [ or {, but that approach is
// not 100% robust

if err := json.Unmarshal([]byte(*md), &metadataArray); err != nil {
return fmt.Errorf("Error unmarshaling metadata '%v': %v", *md, err.Error())
}
}
Copy link
Owner

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

With respect to determining the underlying type we are trying to Unmarshal, instead of relying on strings.Contains() of the error message, I think we can take advantage of the actual UnmarshalTypeError. The approach may be a little bit more reliable. Example of what I mean:

md := `["foo", "bar"]`
	var metadataMap map[string]string
	if err := json.Unmarshal([]byte(md), &metadataMap); err != nil {
		if e, ok := err.(*json.UnmarshalTypeError); ok && e.Value == "array" {
			fmt.Println("trying to Unmarshal array into map", e)
		}
	} else {
		fmt.Println(metadataMap)
	}


if metadataMap != nil {
metadataArray = append(metadataArray, metadataMap)
}
}

Expand Down Expand Up @@ -369,7 +392,7 @@ func createConfigFromArgs(cfg *runner.Config) error {
cfg.DataPath = *dataPath
cfg.BinData = binaryData
cfg.BinDataPath = *binPath
cfg.Metadata = metadata
cfg.Metadata = metadataArray
cfg.MetadataPath = *mdPath
cfg.SI = runner.Duration(*si)
cfg.Output = *output
Expand Down
40 changes: 35 additions & 5 deletions internal/helloworld/greeter_server.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ import (
"time"

context "golang.org/x/net/context"
"google.golang.org/grpc/metadata"
"google.golang.org/grpc/stats"
)

Expand Down Expand Up @@ -36,6 +37,7 @@ type Greeter struct {
mutex *sync.RWMutex
callCounts map[CallType]int
calls map[CallType][][]*HelloRequest
metadata map[CallType][][]metadata.MD
}

func randomSleep() {
Expand All @@ -49,22 +51,32 @@ func (s *Greeter) recordCall(ct CallType) int {

s.callCounts[ct]++
var messages []*HelloRequest
var metadataItems []metadata.MD
s.calls[ct] = append(s.calls[ct], messages)
s.metadata[ct] = append(s.metadata[ct], metadataItems)

return len(s.calls[ct]) - 1
}

func (s *Greeter) recordMessage(ct CallType, callIdx int, msg *HelloRequest) {
// recordMessageAndMetadata appends msg to the messages recorded for the call
// identified by ct and callIdx, and appends one metadata entry alongside it.
//
// When ctx is non-nil the entry is whatever metadata.FromIncomingContext
// returns for it (the "found" flag is intentionally ignored); when ctx is nil
// a nil metadata.MD is recorded so both slices grow in step.
func (s *Greeter) recordMessageAndMetadata(ct CallType, callIdx int, msg *HelloRequest, ctx context.Context) {
	// Resolve the metadata before taking the lock; it only reads from ctx.
	var incoming metadata.MD
	if ctx != nil {
		incoming, _ = metadata.FromIncomingContext(ctx)
	}

	s.mutex.Lock()
	defer s.mutex.Unlock()

	s.calls[ct][callIdx] = append(s.calls[ct][callIdx], msg)
	s.metadata[ct][callIdx] = append(s.metadata[ct][callIdx], incoming)
}

// SayHello implements helloworld.GreeterServer
func (s *Greeter) SayHello(ctx context.Context, in *HelloRequest) (*HelloReply, error) {
callIdx := s.recordCall(Unary)
s.recordMessage(Unary, callIdx, in)
s.recordMessageAndMetadata(Unary, callIdx, in, ctx)

randomSleep()

Expand All @@ -74,7 +86,7 @@ func (s *Greeter) SayHello(ctx context.Context, in *HelloRequest) (*HelloReply,
// SayHellos lists all hellos
func (s *Greeter) SayHellos(req *HelloRequest, stream Greeter_SayHellosServer) error {
callIdx := s.recordCall(ServerStream)
s.recordMessage(ServerStream, callIdx, req)
s.recordMessageAndMetadata(ServerStream, callIdx, req, nil)

randomSleep()

Expand Down Expand Up @@ -104,7 +116,7 @@ func (s *Greeter) SayHelloCS(stream Greeter_SayHelloCSServer) error {
if err != nil {
return err
}
s.recordMessage(ClientStream, callIdx, in)
s.recordMessageAndMetadata(ClientStream, callIdx, in, nil)
msgCount++
}
}
Expand All @@ -124,7 +136,7 @@ func (s *Greeter) SayHelloBidi(stream Greeter_SayHelloBidiServer) error {
return err
}

s.recordMessage(Bidi, callIdx, in)
s.recordMessageAndMetadata(Bidi, callIdx, in, nil)
msg := "Hello " + in.Name
if err := stream.Send(&HelloReply{Message: msg}); err != nil {
return err
Expand All @@ -148,6 +160,12 @@ func (s *Greeter) ResetCounters() {
s.calls[ClientStream] = make([][]*HelloRequest, 0)
s.calls[Bidi] = make([][]*HelloRequest, 0)

s.metadata = make(map[CallType][][]metadata.MD)
s.metadata[Unary] = make([][]metadata.MD, 0)
s.metadata[ServerStream] = make([][]metadata.MD, 0)
s.metadata[ClientStream] = make([][]metadata.MD, 0)
s.metadata[Bidi] = make([][]metadata.MD, 0)

s.mutex.Unlock()

if s.Stats != nil {
Expand Down Expand Up @@ -180,6 +198,18 @@ func (s *Greeter) GetCalls(key CallType) [][]*HelloRequest {
return nil
}

// GetMetadata returns the metadata recorded for the given call type, or nil
// when nothing is stored under that key. The lookup is done while holding the
// mutex.
func (s *Greeter) GetMetadata(key CallType) [][]metadata.MD {
	s.mutex.Lock()
	defer s.mutex.Unlock()

	if recorded, found := s.metadata[key]; found {
		return recorded
	}
	return nil
}

// GetConnectionCount gets the connection count
func (s *Greeter) GetConnectionCount() int {
return s.Stats.GetConnectionCount()
Expand Down
34 changes: 31 additions & 3 deletions runner/call_template_data.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import (
"bytes"
"encoding/json"
"math/rand"
"strings"
"text/template"
"time"

Expand Down Expand Up @@ -102,7 +103,7 @@ func (td *callTemplateData) executeData(data string) ([]byte, error) {
}

func (td *callTemplateData) executeMetadata(metadata string) (map[string]string, error) {
var mdMap map[string]string
var md map[string]string

if len(metadata) > 0 {
input := []byte(metadata)
Expand All @@ -111,13 +112,40 @@ func (td *callTemplateData) executeMetadata(metadata string) (map[string]string,
input = tpl.Bytes()
}

err = json.Unmarshal(input, &mdMap)
err = json.Unmarshal(input, &md)
if err != nil {
return nil, err
}
}

return mdMap, nil
return md, nil
}

// executeMetadataArray is the same as executeMetadata, but it ensures that the input metadata
// JSON string is always an array. If the input is a single object rather than an array, it is
// wrapped in a one-element array before being parsed.
func (td *callTemplateData) executeMetadataArray(metadata string) ([]map[string]string, error) {
var mdArray []map[string]string
var metadataSanitized = strings.TrimSpace(metadata)
Copy link
Owner

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Maybe I am missing something, but can we ensure that the metadata string is sanitized / trimmed earlier, wherever we set it as w.config.metadata? That way we do not have to trim on every invocation of executeMetadataArray().


// If the input is an object and not an array, we ensure we always work with an array
if !strings.HasPrefix(metadataSanitized, "[") && !strings.HasSuffix(metadataSanitized, "]") {
Copy link
Owner

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Probably not a big deal, but should this go inside the `if len(metadata) > 0 {` check?

metadata = "[" + metadataSanitized + "]"
}

if len(metadata) > 0 {
input := []byte(metadata)
tpl, err := td.execute(metadata)
Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@bojand I started looking at and profiling this code again and I think it would be good to add another optimization here so we don't re-render / evaluate the template and unmarshal the string for every single request.

That's especially important when using large metadata JSON objects like in my case.

Basically, I'm looking at adding a new flag (e.g. --plaintext-metadata, open to better naming). When this flag is used, we don't evaluate the metadata as a template; instead we cache it on the worker object and re-use it for subsequent requests, similar to what we do with w.cachedMessages.

This should substantially speed things up and reduce memory usage (I'm just testing this change to confirm that).

WDYT?

Copy link
Author

@Kami Kami Jan 6, 2021

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Here we go, here is a quick WIP version - d586c35.

I confirmed that using --plaintext-metadata flag is much more efficient when working with large metadata JSON objects / arrays (like in my case) and results in lower worker CPU usage.

Which is kinda expected, because trying to render the template + parsing JSON for every single request will never be efficient when working with large metadata objects.

if err == nil {
input = tpl.Bytes()
}

err = json.Unmarshal(input, &mdArray)
if err != nil {
return nil, err
}
}

return mdArray, nil
}

func newUUID() string {
Expand Down
84 changes: 83 additions & 1 deletion runner/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package runner
import (
"errors"
"path"
"reflect"
"strings"
"time"

Expand Down Expand Up @@ -79,7 +80,7 @@ type Config struct {
DataPath string `json:"data-file" toml:"data-file" yaml:"data-file"`
BinData []byte `json:"-" toml:"-" yaml:"-"`
BinDataPath string `json:"binary-file" toml:"binary-file" yaml:"binary-file"`
Metadata map[string]string `json:"metadata,omitempty" toml:"metadata,omitempty" yaml:"metadata,omitempty"`
Metadata interface{} `json:"metadata,omitempty" toml:"metadata,omitempty" yaml:"metadata,omitempty"`
MetadataPath string `json:"metadata-file" toml:"metadata-file" yaml:"metadata-file"`
SI Duration `json:"stream-interval" toml:"stream-interval" yaml:"stream-interval"`
Output string `json:"output" toml:"output" yaml:"output"`
Expand All @@ -96,6 +97,7 @@ type Config struct {
EnableCompression bool `json:"enable-compression,omitempty" toml:"enable-compression,omitempty" yaml:"enable-compression,omitempty"`
}

// Ensure that the data field value is either a map or an array of map items
func checkData(data interface{}) error {
_, isObjData := data.(map[string]interface{})
if !isObjData {
Expand All @@ -118,13 +120,37 @@ func checkData(data interface{}) error {
return nil
}

// checkMetadata ensures that the metadata field value is either a map or a
// non-empty array of map items.
//
// Accepted shapes:
//   - map[string]interface{}: a single metadata object.
//   - []map[string]interface{}: an array of metadata objects.
//   - []interface{} whose elements are all maps: the generic form that
//     encoding/json produces for a JSON array decoded into an interface{}
//     (presumably what non-YAML config files yield; verify against the
//     loader in use).
//
// It returns an error when the value has any other type, when the array is
// empty, or when a generic array contains a non-map element (including nil).
func checkMetadata(metadata interface{}) error {
	switch md := metadata.(type) {
	case map[string]interface{}:
		return nil
	case []map[string]interface{}:
		// Every element is a map by construction, so only emptiness matters.
		if len(md) == 0 {
			return errors.New("Metadata array must not be empty")
		}
		return nil
	case []interface{}:
		if len(md) == 0 {
			return errors.New("Metadata array must not be empty")
		}
		for _, elem := range md {
			// A nil element reports reflect.Invalid and is rejected here too.
			if reflect.ValueOf(elem).Kind() != reflect.Map {
				return errors.New("Metadata array contains unsupported type")
			}
		}
		return nil
	default:
		return errors.New("Unsupported type for Metadata")
	}
}

// LoadConfig loads the config from a file
func LoadConfig(p string, c *Config) error {
err := configor.Load(c, p)
if err != nil {
return err
}

// Process data field - we support two notations for this field - either an object or
// an array of objects so we do the conversion here
if c.Data != nil {
ext := path.Ext(p)
if strings.EqualFold(ext, ".yaml") || strings.EqualFold(ext, ".yml") {
Expand All @@ -151,6 +177,62 @@ func LoadConfig(p string, c *Config) error {
}
}

// Process metadata field - we support two notations for this field - either an object or
// an array of objects so we do the conversion here
if c.Metadata != nil {
ext := path.Ext(p)
if strings.EqualFold(ext, ".yaml") || strings.EqualFold(ext, ".yml") {
// Ensure that keys are of a string type and cast them
objData, isObjData2 := c.Metadata.(map[interface{}]interface{})
if isObjData2 {
nd := make(map[string]interface{})
for k, v := range objData {
sk, isString := k.(string)
if !isString {
return errors.New("Data key must string")
}
if len(sk) > 0 {
nd[sk] = v
}
}

c.Metadata = nd
} else {
// TODO: Refactor this into utility function
arrData, isArray := c.Metadata.([]interface{})

if isArray {
var array []map[string]interface{}
for _, item := range arrData {
objData3, isObjData3 := item.(map[interface{}]interface{})
newItem := make(map[string]interface{})
Copy link
Owner

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I believe a valid metadata object has to be map[string]string, so here we should be asserting that both the key and the value in the internal representation of the objects in the array are strings, and then converting those to that type?


if isObjData3 {
for k, v := range objData3 {
sk, isString := k.(string)
if !isString {
return errors.New("Data key must string")
Copy link
Owner

@bojand bojand Oct 22, 2020

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think the errors here should reference "Metadata key..."?

}
if len(sk) > 0 {
newItem[sk] = v
}
}

array = append(array, newItem)
}
}

c.Metadata = array
}
}
}

err := checkMetadata(c.Metadata)
if err != nil {
return err
}
}

c.ZStop = strings.ToLower(c.ZStop)
if c.ZStop != "close" && c.ZStop != "ignore" && c.ZStop != "wait" {
c.ZStop = "close"
Expand Down
28 changes: 28 additions & 0 deletions runner/config_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,34 @@ func TestConfig_Load(t *testing.T) {
Data: map[string]interface{}{
"f_strings": []interface{}{"123", "456"},
},
Metadata: map[string]interface{}{
"key_one": "value 1",
},
Format: "summary",
DialTimeout: Duration(10 * time.Second),
},
true,
},
{
"valid metadata is array",
&Config{
Insecure: true,
ImportPaths: []string{"/home/user/pb/grpcbin"},
Proto: "grpcbin.proto",
Call: "grpcbin.GRPCBin.DummyUnary",
Host: "127.0.0.1:9000",
Z: Duration(20 * time.Second),
X: Duration(60 * time.Second),
SI: Duration(25 * time.Second),
Timeout: Duration(30 * time.Second),
N: 200,
C: 50,
Connections: 1,
ZStop: "close",
Data: map[string]interface{}{
"f_strings": []interface{}{"123", "456"},
},
Metadata: []map[string]interface{}{{"key_one": "value 1"}, {"key_two": "value 2"}},
Format: "summary",
DialTimeout: Duration(10 * time.Second),
},
Expand Down
Loading