Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

support for binary marshaler/unmarshaler #17

Merged
merged 7 commits into from
Dec 24, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
18 changes: 9 additions & 9 deletions consumer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,17 +13,17 @@ import (

func TestConsumer_SimpleSync(t *testing.T) {
ms, rdb := startMiniredis(t)
cs := NewConsumer[City](context.TODO(), rdb, StreamIDs{"s1": "0-0"}, StreamConsumerConfig{
cs := NewConsumer[city](context.TODO(), rdb, StreamIDs{"s1": "0-0"}, StreamConsumerConfig{
Block: 0,
Count: 0,
BufferSize: 0,
})
var sent int32 = 0 // number of items sent
var confim int32 = 0 // confirm status of receiver

cities := make([]City, 100)
cities := make([]city, 100)
for i := 0; i < len(cities); i++ {
cities[i] = City{Name: fmt.Sprintf("City %v", i), Size: i}
cities[i] = city{Name: fmt.Sprintf("City %v", i), Size: i}
}

go func() {
Expand Down Expand Up @@ -56,7 +56,7 @@ func TestConsumer_SimpleSync(t *testing.T) {

func TestConsumer_ClientError(t *testing.T) {
ms, rdb := startMiniredis(t)
cs := NewConsumer[City](context.TODO(), rdb, StreamIDs{"s1": "0-0"}, StreamConsumerConfig{
cs := NewConsumer[city](context.TODO(), rdb, StreamIDs{"s1": "0-0"}, StreamConsumerConfig{
Block: 0,
Count: 0,
BufferSize: 0,
Expand All @@ -71,7 +71,7 @@ func TestConsumer_ClientError(t *testing.T) {

func TestConsumer_ParserError(t *testing.T) {
ms, rdb := startMiniredis(t)
cs := NewConsumer[NonParsable](context.TODO(), rdb, StreamIDs{"s1": "0-0"}, StreamConsumerConfig{
cs := NewConsumer[nonParsable](context.TODO(), rdb, StreamIDs{"s1": "0-0"}, StreamConsumerConfig{
Block: 0,
Count: 0,
BufferSize: 0,
Expand Down Expand Up @@ -112,7 +112,7 @@ func TestConsuer_FieldParseError(t *testing.T) {

func TestConsumer_Close(t *testing.T) {
_, rdb := startMiniredis(t)
cs := NewConsumer[NonParsable](context.TODO(), rdb, StreamIDs{"s1": "0-0"}, StreamConsumerConfig{
cs := NewConsumer[nonParsable](context.TODO(), rdb, StreamIDs{"s1": "0-0"}, StreamConsumerConfig{
Block: 0,
Count: 0,
BufferSize: 0,
Expand All @@ -129,7 +129,7 @@ func TestConsumer_CloseGetSeenIDs(t *testing.T) {
var consumeCount = 75

ms, rdb := startMiniredis(t)
cs := NewConsumer[City](context.TODO(), rdb, StreamIDs{"s1": "0-0"}, StreamConsumerConfig{
cs := NewConsumer[city](context.TODO(), rdb, StreamIDs{"s1": "0-0"}, StreamConsumerConfig{
Block: 0,
Count: 0,
BufferSize: 0,
Expand All @@ -154,7 +154,7 @@ func TestConsumer_CancelContext(t *testing.T) {
ms, rdb := startMiniredis(t)
ctx, cancelFunc := context.WithCancel(context.TODO())
defer cancelFunc()
cs := NewConsumer[City](ctx, rdb, StreamIDs{"s1": "0-0"}, StreamConsumerConfig{
cs := NewConsumer[city](ctx, rdb, StreamIDs{"s1": "0-0"}, StreamConsumerConfig{
Block: 0,
Count: 0,
BufferSize: 0,
Expand Down Expand Up @@ -209,7 +209,7 @@ func BenchmarkConsumer(b *testing.B) {
for _, size := range []uint{0, 10, 100, 1000, 10000} {
b.Run(fmt.Sprintf("s-%v", size), func(b *testing.B) {
mock := benchmarkClientMock{msgs: msgbuf}
cs := NewConsumer[Empty](context.TODO(), mock, StreamIDs{"s1": "0-0"}, StreamConsumerConfig{
cs := NewConsumer[empty](context.TODO(), mock, StreamIDs{"s1": "0-0"}, StreamConsumerConfig{
Block: 0,
Count: int64(len(msgbuf)),
BufferSize: size,
Expand Down
26 changes: 13 additions & 13 deletions group_consumer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,7 @@ func (sc *simpleSyncMock) XAck(ctx context.Context, stream, group string, ids ..

func TestGroupConsumer_SimpleSync(t *testing.T) {
rdb := simpleSyncMock{}
cs := NewGroupMultiStreamConsumer[City](context.TODO(), &rdb, "g1", "c1", map[string]string{"s1": ">"})
cs := NewGroupMultiStreamConsumer[city](context.TODO(), &rdb, "g1", "c1", map[string]string{"s1": ">"})

var i int64 = 0
var readCount int64 = 100
Expand Down Expand Up @@ -105,7 +105,7 @@ func TestGroupConsumer_SwitchToNew(t *testing.T) {
var readCount = 100
var maxHistory = 50
rdb := switchToNewMock{maxHandout: maxHistory}
cs := NewGroupMultiStreamConsumer[City](context.TODO(), &rdb, "g1", "c1", map[string]string{"s1": "0-0"})
cs := NewGroupMultiStreamConsumer[city](context.TODO(), &rdb, "g1", "c1", map[string]string{"s1": "0-0"})

var i = 0
for msg := range cs.Chan() {
Expand Down Expand Up @@ -154,12 +154,12 @@ func TestGroupConsumer_RemainingAck(t *testing.T) {
var ackCount = 100

rdb := remainingAckMock{}
cs := NewGroupMultiStreamConsumer[City](context.TODO(), &rdb, "g1", "c1", map[string]string{"s1": "0-0"}, GroupConsumerConfig{
cs := NewGroupMultiStreamConsumer[city](context.TODO(), &rdb, "g1", "c1", map[string]string{"s1": "0-0"}, GroupConsumerConfig{
AckBufferSize: uint(ackCount) + 1,
})

for i := 1; i <= ackCount; i += 1 {
cs.Ack(Message[City]{ID: fmt.Sprintf("0-%v", i)})
cs.Ack(Message[city]{ID: fmt.Sprintf("0-%v", i)})
}

time.Sleep(50 * time.Millisecond)
Expand Down Expand Up @@ -195,7 +195,7 @@ func TestGroupConsumer_AckErrors(t *testing.T) {
var readCount = 5_000

rdb := ackErrorMock{}
cs := NewGroupMultiStreamConsumer[City](context.TODO(), &rdb, "g1", "c1", map[string]string{"s1": "0-0"})
cs := NewGroupMultiStreamConsumer[city](context.TODO(), &rdb, "g1", "c1", map[string]string{"s1": "0-0"})

var ackErrors = 0
var seen = 0
Expand Down Expand Up @@ -224,9 +224,9 @@ func TestGroupConsumer_AckErrorCancel(t *testing.T) {

rdb := ackErrorMock{}
ctx, cancelFunc := context.WithCancel(context.TODO())
cs := NewGroupMultiStreamConsumer[City](ctx, &rdb, "g1", "c1", map[string]string{"s1": "0-0"})
cs := NewGroupMultiStreamConsumer[city](ctx, &rdb, "g1", "c1", map[string]string{"s1": "0-0"})

var msgs []Message[City]
var msgs []Message[city]
for msg := range cs.Chan() {
assert.Nil(t, msg.Err)
msgs = append(msgs, msg)
Expand Down Expand Up @@ -257,7 +257,7 @@ func (fcm failCreateMock) XGroupCreateMkStream(ctx context.Context, stream, grou

func TestGroupConsumer_CreateError(t *testing.T) {
rdb := failCreateMock{}
cs := NewGroupMultiStreamConsumer[City](context.TODO(), &rdb, "g1", "c1", map[string]string{"s1": "0-0"})
cs := NewGroupMultiStreamConsumer[city](context.TODO(), &rdb, "g1", "c1", map[string]string{"s1": "0-0"})

msg := <-cs.Chan()
assert.NotNil(t, msg.Err)
Expand All @@ -278,7 +278,7 @@ func (rem readErrorMock) XReadGroup(ctx context.Context, a *redis.XReadGroupArgs

func TestGroupConsumer_ReadError(t *testing.T) {
rdb := readErrorMock{}
cs := NewGroupMultiStreamConsumer[City](context.TODO(), &rdb, "g1", "c1", map[string]string{"s1": "0-0"})
cs := NewGroupMultiStreamConsumer[city](context.TODO(), &rdb, "g1", "c1", map[string]string{"s1": "0-0"})

msg := <-cs.Chan()
assert.NotNil(t, msg.Err)
Expand Down Expand Up @@ -362,11 +362,11 @@ func (rem *readConcurentErrorMock) XAck(ctx context.Context, stream, group strin
func TestGroupConsumer_ConcurrentRead(t *testing.T) {
rdb := readConcurentErrorMock{}

cs := NewGroupMultiStreamConsumer[City](context.TODO(), &rdb, "g1", "c1", map[string]string{"s1": "0-0", "s2": "0-0", "s3": "0-0"})
cs := NewGroupMultiStreamConsumer[city](context.TODO(), &rdb, "g1", "c1", map[string]string{"s1": "0-0", "s2": "0-0", "s3": "0-0"})

msg := make([]Message[City], 0, 21)
msgError := make([]Message[City], 0, 21)
msgList := make([]Message[City], 0, 121)
msg := make([]Message[city], 0, 21)
msgError := make([]Message[city], 0, 21)
msgList := make([]Message[city], 0, 121)

for end := true; end; {
select {
Expand Down
141 changes: 141 additions & 0 deletions gtrsconvert/convert.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,141 @@
package gtrsconvert

import (
"encoding"
"reflect"
"strconv"
"time"
)

// ConvertibleTo is implemented by types that can convert themselves to a map.
type ConvertibleTo interface {
ToMap() (map[string]any, error)
}

// ConvertibleFrom is implemented by types that can load themselves from a map.
type ConvertibleFrom interface {
FromMap(map[string]any) error
}

// structToMap convert a struct to a map.
func StructToMap(st any) (map[string]any, error) {
if c, ok := st.(ConvertibleTo); ok {
return c.ToMap()
}

rv := reflect.ValueOf(st)
rt := reflect.TypeOf(st)
out := make(map[string]interface{}, rv.NumField())

for i := 0; i < rv.NumField(); i++ {
fieldValue := rv.Field(i)
fieldType := rt.Field(i)
fieldName := getFieldNameFromType(fieldType)
switch v := fieldValue.Interface().(type) {
case time.Duration:
out[fieldName] = v.String()
case encoding.BinaryMarshaler:
txt, err := v.MarshalBinary()
if err != nil {
return nil, SerializeError{
Field: fieldType.Name,
Value: fieldValue.Interface(),
Err: err,
}
}
out[fieldName] = string(txt)
default:
out[fieldName] = fieldValue.Interface()
}
}
return out, nil
}

// mapToStruct tries to convert a map to a struct.
func MapToStruct(st any, data map[string]any) error {
rv := reflect.ValueOf(st).Elem()
rt := reflect.TypeOf(st)

if rt.Implements(typeOf[ConvertibleFrom]()) {
if c, ok := st.(ConvertibleFrom); ok {
return c.FromMap(data)
}
}

rt = rt.Elem()

for i := 0; i < rt.NumField(); i += 1 {
fieldRv := rv.Field(i)
fieldRt := rt.Field(i)

v, ok := data[getFieldNameFromType(fieldRt)]
if !ok {
continue
}

// The redis client always sends strings.
stval, ok := v.(string)
if !ok {
continue
}

val, err := valueFromString(fieldRv, stval)
if err != nil {
return FieldParseError{Field: fieldRt.Name, Value: v, Err: err}
} else {
fieldRv.Set(reflect.ValueOf(val))
}
}
return nil
}

// Parse value from string
// TODO: find a better solution. Maybe there is a library for this.
func valueFromString(val reflect.Value, st string) (any, error) {
iface := val.Interface()

switch cast := iface.(type) {
case string:
return st, nil
case bool:
return strconv.ParseBool(st)
case int:
v, err := strconv.ParseInt(st, 10, 0)
return int(v), err
case uint:
v, err := strconv.ParseUint(st, 10, 0)
return uint(v), err
case int32:
v, err := strconv.ParseInt(st, 10, 32)
return int32(v), err
case uint32:
v, err := strconv.ParseUint(st, 10, 32)
return uint32(v), err
case int64:
return strconv.ParseInt(st, 10, 64)
case uint64:
return strconv.ParseUint(st, 10, 64)
case float32:
v, err := strconv.ParseFloat(st, 32)
return float32(v), err
case float64:
return strconv.ParseFloat(st, 64)
case time.Duration:
return time.ParseDuration(st)
case encoding.BinaryUnmarshaler:
return cast, cast.UnmarshalBinary([]byte(st))
default:
ifaceptr := val.Addr().Interface()
unMarshaler, ok := ifaceptr.(encoding.BinaryUnmarshaler)
if ok {
err := unMarshaler.UnmarshalBinary([]byte(st))
if err != nil {
return nil, err
}
res := reflect.ValueOf(unMarshaler).Elem().Interface()
return res, nil
}
}

return nil, ErrUnsupportedFieldType
}
Loading