Skip to content

Commit

Permalink
✨ feat: smart concurrency policy
Browse files Browse the repository at this point in the history
  • Loading branch information
0xE8551CCB committed Oct 11, 2019
1 parent 5ffb1b1 commit e373252
Show file tree
Hide file tree
Showing 7 changed files with 94 additions and 25 deletions.
1 change: 0 additions & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -169,7 +169,6 @@ func main() {
// data: [{"id":"0","title":"Task #1","user":{"name":"user:100"}},{"id":"1","title":"Task #2","user":{"name":"user:101"}}]
data, _ := json.Marshal(taskSchema)
}

```

To learn more about [portal](https://github.com/iFaceless/portal), please read the [User Guide](./USERGUIDE.md)~
Expand Down
4 changes: 2 additions & 2 deletions README_CN.md
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@

总体而言,它可以用来将应用层的数据模型对象(数据源可以是数据库、缓存、RPC 等)序列化成指定的 API Schema 结构体。然后用户可选择将序列化后的结构转换成 JSON 或者其它的格式供 HTTP API 返回。

*需要注意的是,[marshmallow](https://github.com/marshmallow-code/marshmallow) 框架除了提供对象序列化的功能外,还支持非常灵活的表单字段校验。但是 [portal](https://github.com/iFaceless/portal/) 只关注核心的序列化功能,对于结构体字段校验,可以使用[go-playground/validator](https://github.com/go-playground/validator) 或者 [asaskevich/govalidator](https://github.com/asaskevich/govalidator) 框架。*
*需要注意的是,[marshmallow](https://github.com/marshmallow-code/marshmallow) 框架除了提供对象序列化的功能外,还支持非常灵活的表单字段校验。但是 [portal](https://github.com/iFaceless/portal/) 只关注核心的序列化功能,对于结构体字段校验,可以使用 [go-playground/validator](https://github.com/go-playground/validator) 或者 [asaskevich/govalidator](https://github.com/asaskevich/govalidator) 框架。*

**生产环境使用请谨慎,可能还有潜藏的 Bug 等待修复~**

Expand All @@ -20,7 +20,7 @@
1. 自动尝试完成来源数据类型到目标数据类型的转换,无需更多的样板代码;
1. 简洁易用的 API。

# Install
# 安装

```
get get -u github.com/ifaceless/portal
Expand Down
48 changes: 42 additions & 6 deletions chell.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ import (
type Chell struct {
// json, yaml etc.
fieldAliasMapTagName string
disableConcurrency bool
onlyFieldFilters map[int][]*filterNode
excludeFieldFilters map[int][]*filterNode
}
Expand Down Expand Up @@ -124,7 +125,7 @@ func (c *Chell) dump(ctx context.Context, dst *schema, src interface{}) error {
}

func (c *Chell) dumpSyncFields(ctx context.Context, dst *schema, src interface{}) error {
syncFields := dst.syncFields()
syncFields := dst.syncFields(c.disableConcurrency)
if len(syncFields) == 0 {
return nil
}
Expand All @@ -147,7 +148,7 @@ func (c *Chell) dumpSyncFields(ctx context.Context, dst *schema, src interface{}
}

func (c *Chell) dumpAsyncFields(ctx context.Context, dst *schema, src interface{}) error {
asyncFields := dst.asyncFields()
asyncFields := dst.asyncFields(c.disableConcurrency)
if len(asyncFields) == 0 {
return nil
}
Expand Down Expand Up @@ -283,13 +284,48 @@ func (c *Chell) dumpMany(ctx context.Context, dst, src interface{}, onlyFields,
schemaSlice.Set(reflect.MakeSlice(schemaSlice.Type(), rv.Len(), rv.Cap()))
schemaType := indirectStructTypeP(schemaSlice.Type())

if c.disableConcurrency || !hasAsyncFields(schemaType, onlyFields, excludeFields) {
return c.dumpManySynchronously(ctx, schemaType, schemaSlice, rv, onlyFields, excludeFields)
}

return c.dumpManyConcurrently(ctx, schemaType, schemaSlice, rv, onlyFields, excludeFields)
}

func (c *Chell) dumpManySynchronously(ctx context.Context, schemaType reflect.Type, dst, src reflect.Value, onlyFields, excludeFields []string) error {
logger.Debugf("[portal.dumpManySynchronously] '%s' -> '%s'", src, dst)
for i := 0; i < src.Len(); i++ {
schemaPtr := reflect.New(schemaType)
toSchema := newSchema(schemaPtr.Interface()).withFieldAliasMapTagName(c.fieldAliasMapTagName)
toSchema.setOnlyFields(onlyFields...)
toSchema.setExcludeFields(excludeFields...)
val := src.Index(i).Interface()
err := c.dump(incrDumpDepthContext(ctx), toSchema, val)
if err != nil {
return errors.WithStack(err)
}

elem := dst.Index(i)
switch elem.Kind() {
case reflect.Struct:
elem.Set(reflect.Indirect(schemaPtr))
case reflect.Ptr:
elem.Set(schemaPtr)
default:
return errors.Errorf("unsupported schema field type '%s', expected a struct or a pointer to struct", elem.Type().Kind())
}
}
return nil
}

func (c *Chell) dumpManyConcurrently(ctx context.Context, schemaType reflect.Type, dst, src reflect.Value, onlyFields, excludeFields []string) error {
logger.Debugf("[portal.dumpManyConcurrently] '%s' -> '%s'", src, dst)
type Result struct {
index int
schemaPtr reflect.Value
}

payloads := make([]interface{}, 0, rv.Len())
for i := 0; i < rv.Len(); i++ {
payloads := make([]interface{}, 0, src.Len())
for i := 0; i < src.Len(); i++ {
payloads = append(payloads, i)
}

Expand All @@ -301,7 +337,7 @@ func (c *Chell) dumpMany(ctx context.Context, dst, src interface{}, onlyFields,
toSchema := newSchema(schemaPtr.Interface()).withFieldAliasMapTagName(c.fieldAliasMapTagName)
toSchema.setOnlyFields(onlyFields...)
toSchema.setExcludeFields(excludeFields...)
val := rv.Index(index).Interface()
val := src.Index(index).Interface()
err := c.dump(incrDumpDepthContext(ctx), toSchema, val)
return &Result{index: index, schemaPtr: schemaPtr}, err
},
Expand All @@ -316,7 +352,7 @@ func (c *Chell) dumpMany(ctx context.Context, dst, src interface{}, onlyFields,
}

r := jobResult.Data.(*Result)
elem := schemaSlice.Index(r.index)
elem := dst.Index(r.index)
switch elem.Kind() {
case reflect.Struct:
elem.Set(reflect.Indirect(r.schemaPtr))
Expand Down
20 changes: 10 additions & 10 deletions examples/todo/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,17 +19,17 @@ func main() {
portal.SetMaxPoolSize(1024)
portal.SetDebug(true)

task := model.TaskModel{
ID: 1,
UserID: 1,
Title: "Finish your jobs.",
}
//task := model.TaskModel{
// ID: 1,
// UserID: 1,
// Title: "Finish your jobs.",
//}

printFullFields(&task)
printWithOnlyFields(&task, "Description")
printWithOnlyFields(&task, "ID", "User[id,Notifications[ID],AnotherNotifications[Title]]", "simple_user[id]")
//printFullFields(&task)
//printWithOnlyFields(&task, "Description")
//printWithOnlyFields(&task, "ID", "User[id,Notifications[ID],AnotherNotifications[Title]]", "simple_user[id]")
printMany()
printWithExcludeFields(&task, "Description", "ID", "User[Name,Notifications[ID,Content],AnotherNotifications], SimpleUser")
//printWithExcludeFields(&task, "Description", "ID", "User[Name,Notifications[ID,Content],AnotherNotifications], SimpleUser")
fmt.Printf("elapsed: %.1f ms\n", time.Since(start).Seconds()*1000)
}

Expand Down Expand Up @@ -78,7 +78,7 @@ func printMany() {
})
}

err := portal.Dump(&taskSchemas, &tasks, portal.Only("ID", "Title", "User[Name]", "Description"))
err := portal.Dump(&taskSchemas, &tasks, portal.Only("ID", "Title", "User[Name]", "Description"), portal.DisableConcurrency())
if err != nil {
panic(err)
}
Expand Down
7 changes: 7 additions & 0 deletions option.go
Original file line number Diff line number Diff line change
Expand Up @@ -53,3 +53,10 @@ func FieldAliasMapTagName(tag string) Option {
return nil
}
}

func DisableConcurrency() Option {
return func(c *Chell) error {
c.disableConcurrency = true
return nil
}
}
20 changes: 17 additions & 3 deletions schema.go
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,14 @@ func newSchema(v interface{}) *schema {
return sch
}

func hasAsyncFields(schemaType reflect.Type, onlyFields, excludeFields []string) bool {
// TODO: try to cache the result
schema := newSchema(reflect.New(schemaType).Interface())
schema.setOnlyFields(onlyFields...)
schema.setExcludeFields(excludeFields...)
return len(schema.asyncFields(false)) > 0
}

func (s *schema) withFieldAliasMapTagName(t string) *schema {
s.fieldAliasMapTagName = t
return s
Expand Down Expand Up @@ -83,16 +91,22 @@ func (s *schema) availableFields() []*schemaField {
return fields
}

func (s *schema) syncFields() (fields []*schemaField) {
func (s *schema) syncFields(disableConcurrency bool) (fields []*schemaField) {
for _, f := range s.availableFields() {
if !f.async() {
if disableConcurrency {
fields = append(fields, f)
} else if !f.async() {
fields = append(fields, f)
}
}
return
}

func (s *schema) asyncFields() (fields []*schemaField) {
func (s *schema) asyncFields(disableConcurrency bool) (fields []*schemaField) {
if disableConcurrency {
return
}

for _, f := range s.availableFields() {
if f.async() {
fields = append(fields, f)
Expand Down
19 changes: 16 additions & 3 deletions schema_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package portal

import (
"context"
"reflect"
"testing"

"github.com/stretchr/testify/assert"
Expand All @@ -21,7 +22,7 @@ type UserSchema2 struct {
PersonSchema
Name string `portal:"meth:GetName"`
School *SchoolSchema `portal:"nested"`
Async int `portal:"async"`
Async int `json:"async" portal:"async"`
}

func (u *UserSchema2) GetName(user interface{}) interface{} {
Expand Down Expand Up @@ -54,12 +55,24 @@ func TestSchema(t *testing.T) {
assert.Equal(t, &SchoolSchema{Name: "test school", Addr: ""}, val)
}

func Test_hasAsyncFields(t *testing.T) {
assert.False(t, hasAsyncFields(reflect.TypeOf(&PersonSchema{}), nil, nil))
assert.False(t, hasAsyncFields(reflect.TypeOf(PersonSchema{}), nil, nil))
assert.True(t, hasAsyncFields(reflect.TypeOf(UserSchema2{}), nil, nil))
assert.True(t, hasAsyncFields(reflect.TypeOf(&UserSchema2{}), nil, nil))
assert.True(t, hasAsyncFields(reflect.TypeOf(&UserSchema2{}), []string{"async"}, nil))
assert.True(t, hasAsyncFields(reflect.TypeOf(&UserSchema2{}), []string{"Async"}, nil))
assert.False(t, hasAsyncFields(reflect.TypeOf(&UserSchema2{}), []string{"Name"}, nil))
assert.False(t, hasAsyncFields(reflect.TypeOf(&UserSchema2{}), []string{}, []string{"Async"}))
assert.False(t, hasAsyncFields(reflect.TypeOf(&UserSchema2{}), []string{}, []string{"async"}))
}

func TestSchema_GetFields(t *testing.T) {
schema := newSchema(&UserSchema2{}).withFieldAliasMapTagName("json")
assert.ElementsMatch(t, []string{"Age", "ID", "Name", "School", "Async"}, filedNames(schema.availableFields()))

assert.ElementsMatch(t, []string{"Age", "ID", "Name", "School"}, filedNames(schema.syncFields()))
assert.ElementsMatch(t, []string{"Async"}, filedNames(schema.asyncFields()))
assert.ElementsMatch(t, []string{"Age", "ID", "Name", "School"}, filedNames(schema.syncFields(false)))
assert.ElementsMatch(t, []string{"Async"}, filedNames(schema.asyncFields(false)))

schema.setOnlyFields("ID")
assert.ElementsMatch(t, []string{"ID"}, filedNames(schema.availableFields()))
Expand Down

0 comments on commit e373252

Please sign in to comment.