support for binary marshaler/unmarshaler (#17)
Finally fix (de)serialization
elee1766 committed Dec 24, 2023
1 parent 10c9750 commit d8a7916
Showing 12 changed files with 598 additions and 434 deletions.
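The new gtrsconvert package (added below) serializes struct fields through the standard library encoding interfaces: StructToMap calls MarshalBinary on any field value that implements encoding.BinaryMarshaler and stores the result as a string, and MapToStruct restores such fields through encoding.BinaryUnmarshaler, taking the field's address when the method has a pointer receiver. The following is a minimal sketch of a field type that exercises both paths; the coords type is hypothetical and not part of this repository.

package main

import (
	"encoding"
	"fmt"
)

// coords is a hypothetical field type that round-trips itself through a
// compact "lat,lon" byte representation.
type coords struct {
	Lat, Lon float64
}

// MarshalBinary implements encoding.BinaryMarshaler. The value receiver means
// a plain coords field satisfies the interface in StructToMap's type switch.
func (c coords) MarshalBinary() ([]byte, error) {
	return []byte(fmt.Sprintf("%g,%g", c.Lat, c.Lon)), nil
}

// UnmarshalBinary implements encoding.BinaryUnmarshaler. The pointer receiver
// means MapToStruct has to reach it through the field's address.
func (c *coords) UnmarshalBinary(data []byte) error {
	_, err := fmt.Sscanf(string(data), "%g,%g", &c.Lat, &c.Lon)
	return err
}

// Compile-time checks that both interfaces are satisfied.
var (
	_ encoding.BinaryMarshaler   = coords{}
	_ encoding.BinaryUnmarshaler = (*coords)(nil)
)

func main() {
	b, _ := coords{Lat: 52.52, Lon: 13.405}.MarshalBinary()
	fmt.Println(string(b)) // 52.52,13.405
}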
18 changes: 9 additions & 9 deletions consumer_test.go
@@ -13,17 +13,17 @@ import (

func TestConsumer_SimpleSync(t *testing.T) {
ms, rdb := startMiniredis(t)
- cs := NewConsumer[City](context.TODO(), rdb, StreamIDs{"s1": "0-0"}, StreamConsumerConfig{
+ cs := NewConsumer[city](context.TODO(), rdb, StreamIDs{"s1": "0-0"}, StreamConsumerConfig{
Block: 0,
Count: 0,
BufferSize: 0,
})
var sent int32 = 0 // number of items sent
var confim int32 = 0 // confirm status of receiver

- cities := make([]City, 100)
+ cities := make([]city, 100)
for i := 0; i < len(cities); i++ {
- cities[i] = City{Name: fmt.Sprintf("City %v", i), Size: i}
+ cities[i] = city{Name: fmt.Sprintf("City %v", i), Size: i}
}

go func() {
@@ -56,7 +56,7 @@ func TestConsumer_SimpleSync(t *testing.T) {

func TestConsumer_ClientError(t *testing.T) {
ms, rdb := startMiniredis(t)
- cs := NewConsumer[City](context.TODO(), rdb, StreamIDs{"s1": "0-0"}, StreamConsumerConfig{
+ cs := NewConsumer[city](context.TODO(), rdb, StreamIDs{"s1": "0-0"}, StreamConsumerConfig{
Block: 0,
Count: 0,
BufferSize: 0,
@@ -71,7 +71,7 @@ func TestConsumer_ClientError(t *testing.T) {

func TestConsumer_ParserError(t *testing.T) {
ms, rdb := startMiniredis(t)
- cs := NewConsumer[NonParsable](context.TODO(), rdb, StreamIDs{"s1": "0-0"}, StreamConsumerConfig{
+ cs := NewConsumer[nonParsable](context.TODO(), rdb, StreamIDs{"s1": "0-0"}, StreamConsumerConfig{
Block: 0,
Count: 0,
BufferSize: 0,
@@ -112,7 +112,7 @@ func TestConsuer_FieldParseError(t *testing.T) {

func TestConsumer_Close(t *testing.T) {
_, rdb := startMiniredis(t)
- cs := NewConsumer[NonParsable](context.TODO(), rdb, StreamIDs{"s1": "0-0"}, StreamConsumerConfig{
+ cs := NewConsumer[nonParsable](context.TODO(), rdb, StreamIDs{"s1": "0-0"}, StreamConsumerConfig{
Block: 0,
Count: 0,
BufferSize: 0,
@@ -129,7 +129,7 @@ func TestConsumer_CloseGetSeenIDs(t *testing.T) {
var consumeCount = 75

ms, rdb := startMiniredis(t)
- cs := NewConsumer[City](context.TODO(), rdb, StreamIDs{"s1": "0-0"}, StreamConsumerConfig{
+ cs := NewConsumer[city](context.TODO(), rdb, StreamIDs{"s1": "0-0"}, StreamConsumerConfig{
Block: 0,
Count: 0,
BufferSize: 0,
@@ -154,7 +154,7 @@ func TestConsumer_CancelContext(t *testing.T) {
ms, rdb := startMiniredis(t)
ctx, cancelFunc := context.WithCancel(context.TODO())
defer cancelFunc()
- cs := NewConsumer[City](ctx, rdb, StreamIDs{"s1": "0-0"}, StreamConsumerConfig{
+ cs := NewConsumer[city](ctx, rdb, StreamIDs{"s1": "0-0"}, StreamConsumerConfig{
Block: 0,
Count: 0,
BufferSize: 0,
@@ -209,7 +209,7 @@ func BenchmarkConsumer(b *testing.B) {
for _, size := range []uint{0, 10, 100, 1000, 10000} {
b.Run(fmt.Sprintf("s-%v", size), func(b *testing.B) {
mock := benchmarkClientMock{msgs: msgbuf}
- cs := NewConsumer[Empty](context.TODO(), mock, StreamIDs{"s1": "0-0"}, StreamConsumerConfig{
+ cs := NewConsumer[empty](context.TODO(), mock, StreamIDs{"s1": "0-0"}, StreamConsumerConfig{
Block: 0,
Count: int64(len(msgbuf)),
BufferSize: size,
26 changes: 13 additions & 13 deletions group_consumer_test.go
@@ -46,7 +46,7 @@ func (sc *simpleSyncMock) XAck(ctx context.Context, stream, group string, ids ..

func TestGroupConsumer_SimpleSync(t *testing.T) {
rdb := simpleSyncMock{}
- cs := NewGroupMultiStreamConsumer[City](context.TODO(), &rdb, "g1", "c1", map[string]string{"s1": ">"})
+ cs := NewGroupMultiStreamConsumer[city](context.TODO(), &rdb, "g1", "c1", map[string]string{"s1": ">"})

var i int64 = 0
var readCount int64 = 100
@@ -105,7 +105,7 @@ func TestGroupConsumer_SwitchToNew(t *testing.T) {
var readCount = 100
var maxHistory = 50
rdb := switchToNewMock{maxHandout: maxHistory}
- cs := NewGroupMultiStreamConsumer[City](context.TODO(), &rdb, "g1", "c1", map[string]string{"s1": "0-0"})
+ cs := NewGroupMultiStreamConsumer[city](context.TODO(), &rdb, "g1", "c1", map[string]string{"s1": "0-0"})

var i = 0
for msg := range cs.Chan() {
@@ -154,12 +154,12 @@ func TestGroupConsumer_RemainingAck(t *testing.T) {
var ackCount = 100

rdb := remainingAckMock{}
- cs := NewGroupMultiStreamConsumer[City](context.TODO(), &rdb, "g1", "c1", map[string]string{"s1": "0-0"}, GroupConsumerConfig{
+ cs := NewGroupMultiStreamConsumer[city](context.TODO(), &rdb, "g1", "c1", map[string]string{"s1": "0-0"}, GroupConsumerConfig{
AckBufferSize: uint(ackCount) + 1,
})

for i := 1; i <= ackCount; i += 1 {
- cs.Ack(Message[City]{ID: fmt.Sprintf("0-%v", i)})
+ cs.Ack(Message[city]{ID: fmt.Sprintf("0-%v", i)})
}

time.Sleep(50 * time.Millisecond)
@@ -195,7 +195,7 @@ func TestGroupConsumer_AckErrors(t *testing.T) {
var readCount = 5_000

rdb := ackErrorMock{}
- cs := NewGroupMultiStreamConsumer[City](context.TODO(), &rdb, "g1", "c1", map[string]string{"s1": "0-0"})
+ cs := NewGroupMultiStreamConsumer[city](context.TODO(), &rdb, "g1", "c1", map[string]string{"s1": "0-0"})

var ackErrors = 0
var seen = 0
@@ -224,9 +224,9 @@ func TestGroupConsumer_AckErrorCancel(t *testing.T) {

rdb := ackErrorMock{}
ctx, cancelFunc := context.WithCancel(context.TODO())
- cs := NewGroupMultiStreamConsumer[City](ctx, &rdb, "g1", "c1", map[string]string{"s1": "0-0"})
+ cs := NewGroupMultiStreamConsumer[city](ctx, &rdb, "g1", "c1", map[string]string{"s1": "0-0"})

- var msgs []Message[City]
+ var msgs []Message[city]
for msg := range cs.Chan() {
assert.Nil(t, msg.Err)
msgs = append(msgs, msg)
@@ -257,7 +257,7 @@ func (fcm failCreateMock) XGroupCreateMkStream(ctx context.Context, stream, grou

func TestGroupConsumer_CreateError(t *testing.T) {
rdb := failCreateMock{}
- cs := NewGroupMultiStreamConsumer[City](context.TODO(), &rdb, "g1", "c1", map[string]string{"s1": "0-0"})
+ cs := NewGroupMultiStreamConsumer[city](context.TODO(), &rdb, "g1", "c1", map[string]string{"s1": "0-0"})

msg := <-cs.Chan()
assert.NotNil(t, msg.Err)
@@ -278,7 +278,7 @@ func (rem readErrorMock) XReadGroup(ctx context.Context, a *redis.XReadGroupArgs

func TestGroupConsumer_ReadError(t *testing.T) {
rdb := readErrorMock{}
- cs := NewGroupMultiStreamConsumer[City](context.TODO(), &rdb, "g1", "c1", map[string]string{"s1": "0-0"})
+ cs := NewGroupMultiStreamConsumer[city](context.TODO(), &rdb, "g1", "c1", map[string]string{"s1": "0-0"})

msg := <-cs.Chan()
assert.NotNil(t, msg.Err)
@@ -362,11 +362,11 @@ func (rem *readConcurentErrorMock) XAck(ctx context.Context, stream, group strin
func TestGroupConsumer_ConcurrentRead(t *testing.T) {
rdb := readConcurentErrorMock{}

- cs := NewGroupMultiStreamConsumer[City](context.TODO(), &rdb, "g1", "c1", map[string]string{"s1": "0-0", "s2": "0-0", "s3": "0-0"})
+ cs := NewGroupMultiStreamConsumer[city](context.TODO(), &rdb, "g1", "c1", map[string]string{"s1": "0-0", "s2": "0-0", "s3": "0-0"})

- msg := make([]Message[City], 0, 21)
- msgError := make([]Message[City], 0, 21)
- msgList := make([]Message[City], 0, 121)
+ msg := make([]Message[city], 0, 21)
+ msgError := make([]Message[city], 0, 21)
+ msgList := make([]Message[city], 0, 121)

for end := true; end; {
select {
141 changes: 141 additions & 0 deletions gtrsconvert/convert.go
@@ -0,0 +1,141 @@
package gtrsconvert

import (
"encoding"
"reflect"
"strconv"
"time"
)

// ConvertibleTo is implemented by types that can convert themselves to a map.
type ConvertibleTo interface {
ToMap() (map[string]any, error)
}

// ConvertibleFrom is implemented by types that can load themselves from a map.
type ConvertibleFrom interface {
FromMap(map[string]any) error
}

// StructToMap converts a struct to a map.
func StructToMap(st any) (map[string]any, error) {
if c, ok := st.(ConvertibleTo); ok {
return c.ToMap()
}

rv := reflect.ValueOf(st)
rt := reflect.TypeOf(st)
out := make(map[string]interface{}, rv.NumField())

for i := 0; i < rv.NumField(); i++ {
fieldValue := rv.Field(i)
fieldType := rt.Field(i)
fieldName := getFieldNameFromType(fieldType)
switch v := fieldValue.Interface().(type) {
case time.Duration:
out[fieldName] = v.String()
case encoding.BinaryMarshaler:
txt, err := v.MarshalBinary()
if err != nil {
return nil, SerializeError{
Field: fieldType.Name,
Value: fieldValue.Interface(),
Err: err,
}
}
out[fieldName] = string(txt)
default:
out[fieldName] = fieldValue.Interface()
}
}
return out, nil
}

// MapToStruct tries to convert a map to a struct.
func MapToStruct(st any, data map[string]any) error {
rv := reflect.ValueOf(st).Elem()
rt := reflect.TypeOf(st)

if rt.Implements(typeOf[ConvertibleFrom]()) {
if c, ok := st.(ConvertibleFrom); ok {
return c.FromMap(data)
}
}

rt = rt.Elem()

for i := 0; i < rt.NumField(); i += 1 {
fieldRv := rv.Field(i)
fieldRt := rt.Field(i)

v, ok := data[getFieldNameFromType(fieldRt)]
if !ok {
continue
}

// The redis client always sends strings.
stval, ok := v.(string)
if !ok {
continue
}

val, err := valueFromString(fieldRv, stval)
if err != nil {
return FieldParseError{Field: fieldRt.Name, Value: v, Err: err}
} else {
fieldRv.Set(reflect.ValueOf(val))
}
}
return nil
}

// valueFromString parses a value of the field's type from its string representation.
// TODO: find a better solution. Maybe there is a library for this.
func valueFromString(val reflect.Value, st string) (any, error) {
iface := val.Interface()

switch cast := iface.(type) {
case string:
return st, nil
case bool:
return strconv.ParseBool(st)
case int:
v, err := strconv.ParseInt(st, 10, 0)
return int(v), err
case uint:
v, err := strconv.ParseUint(st, 10, 0)
return uint(v), err
case int32:
v, err := strconv.ParseInt(st, 10, 32)
return int32(v), err
case uint32:
v, err := strconv.ParseUint(st, 10, 32)
return uint32(v), err
case int64:
return strconv.ParseInt(st, 10, 64)
case uint64:
return strconv.ParseUint(st, 10, 64)
case float32:
v, err := strconv.ParseFloat(st, 32)
return float32(v), err
case float64:
return strconv.ParseFloat(st, 64)
case time.Duration:
return time.ParseDuration(st)
case encoding.BinaryUnmarshaler:
return cast, cast.UnmarshalBinary([]byte(st))
default:
ifaceptr := val.Addr().Interface()
unMarshaler, ok := ifaceptr.(encoding.BinaryUnmarshaler)
if ok {
err := unMarshaler.UnmarshalBinary([]byte(st))
if err != nil {
return nil, err
}
res := reflect.ValueOf(unMarshaler).Elem().Interface()
return res, nil
}
}

return nil, ErrUnsupportedFieldType
}
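For reference, a minimal round trip through the new helpers. This is a sketch, not code from the commit: the import path is assumed, event is a made-up message type, and time.Time is used only because it already implements encoding.BinaryMarshaler (value receiver) and encoding.BinaryUnmarshaler (pointer receiver), so it exercises both of the new code paths above.

package main

import (
	"fmt"
	"time"

	// Assumed import path; adjust to the repository's actual module path.
	"github.com/dranikpg/gtrs/gtrsconvert"
)

// event is a made-up message type for this sketch.
type event struct {
	Name string
	At   time.Time
}

func main() {
	in := event{Name: "deploy", At: time.Now().Round(0)} // Round(0) drops the monotonic reading

	// Struct -> map: the At field is serialized to a string via MarshalBinary.
	m, err := gtrsconvert.StructToMap(in)
	if err != nil {
		panic(err)
	}

	// Map -> struct: the string is fed back through (*time.Time).UnmarshalBinary.
	var out event
	if err := gtrsconvert.MapToStruct(&out, m); err != nil {
		panic(err)
	}

	fmt.Println(out.Name, out.At.Equal(in.At)) // deploy true
}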
