Skip to content

Commit

Permalink
Datum performance improvements
Browse files Browse the repository at this point in the history
  • Loading branch information
serejja committed Feb 1, 2016
1 parent 82176dd commit 2f447d1
Show file tree
Hide file tree
Showing 5 changed files with 57 additions and 6 deletions.
48 changes: 44 additions & 4 deletions datum_reader.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import (
"errors"
"fmt"
"reflect"
"sync"
)

// Reader is an interface that may be implemented to avoid using runtime reflection during deserialization.
Expand All @@ -24,6 +25,9 @@ type DatumReader interface {
SetSchema(Schema)
}

// Package-level cache of per-enum symbol lookup tables, used by the mapEnum methods of the datum readers.
var (
	// enumSymbolsToIndexCache maps an enum schema's full name to that enum's symbol -> index table.
	enumSymbolsToIndexCache = make(map[string]map[string]int32)

	// enumSymbolsToIndexCacheLock is held while a missing table is built and stored in enumSymbolsToIndexCache.
	// NOTE(review): the initial nil check in mapEnum reads the map without holding this lock;
	// confirm that is acceptable when several readers decode concurrently.
	enumSymbolsToIndexCacheLock sync.Mutex
)

// Generic Avro enum representation. This is still subject to change and may be rethought.
type GenericEnum struct {
// Avro enum symbols.
Expand Down Expand Up @@ -265,8 +269,26 @@ func (this *SpecificDatumReader) mapEnum(field Schema, dec Decoder) (reflect.Val
if enumIndex, err := dec.ReadEnum(); err != nil {
return reflect.ValueOf(enumIndex), err
} else {
enum := NewGenericEnum(field.(*EnumSchema).Symbols)
enum.SetIndex(enumIndex)
schema := field.(*EnumSchema)
fullName := schema.FullName()

if enumSymbolsToIndexCache[fullName] == nil {
enumSymbolsToIndexCacheLock.Lock()
if enumSymbolsToIndexCache[fullName] == nil {
symbolsToIndex := make(map[string]int32)
for index, symbol := range schema.Symbols {
symbolsToIndex[symbol] = int32(index)
}
enumSymbolsToIndexCache[fullName] = symbolsToIndex
}
enumSymbolsToIndexCacheLock.Unlock()
}

enum := &GenericEnum{
Symbols: schema.Symbols,
symbolsToIndex: enumSymbolsToIndexCache[fullName],
index: enumIndex,
}
return reflect.ValueOf(enum), nil
}
}
Expand Down Expand Up @@ -449,8 +471,26 @@ func (this *GenericDatumReader) mapEnum(field Schema, dec Decoder) (*GenericEnum
if enumIndex, err := dec.ReadEnum(); err != nil {
return nil, err
} else {
enum := NewGenericEnum(field.(*EnumSchema).Symbols)
enum.SetIndex(enumIndex)
schema := field.(*EnumSchema)
fullName := schema.FullName()

if enumSymbolsToIndexCache[fullName] == nil {
enumSymbolsToIndexCacheLock.Lock()
if enumSymbolsToIndexCache[fullName] == nil {
symbolsToIndex := make(map[string]int32)
for index, symbol := range schema.Symbols {
symbolsToIndex[symbol] = int32(index)
}
enumSymbolsToIndexCache[fullName] = symbolsToIndex
}
enumSymbolsToIndexCacheLock.Unlock()
}

enum := &GenericEnum{
Symbols: schema.Symbols,
symbolsToIndex: enumSymbolsToIndexCache[fullName],
index: enumIndex,
}
return enum, nil
}
}
Expand Down
1 change: 1 addition & 0 deletions datum_reader_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -359,6 +359,7 @@ func specificReaderComplexVal() (Schema, []byte) {
}

func specificReaderBenchComplex(b *testing.B, dest interface{}) {
b.ReportAllocs()
schema, buf := specificReaderComplexVal()
datumReader := NewSpecificDatumReader()
datumReader.SetSchema(schema)
Expand Down
3 changes: 1 addition & 2 deletions datum_utils.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,6 @@ limitations under the License. */
package avro

import (
"fmt"
"reflect"
"strings"
)
Expand All @@ -33,7 +32,7 @@ func findField(where reflect.Value, name string) (reflect.Value, error) {
if rf, ok := rm.names[name]; ok {
return where.FieldByIndex(rf), nil
}
return reflect.Value{}, fmt.Errorf("Field %s does not exist in %s", name, t.Name())
return reflect.Value{}, FieldDoesNotExist
}

func reflectBuildRi(t reflect.Type) *reflectInfo {
Expand Down
3 changes: 3 additions & 0 deletions errors.go
Original file line number Diff line number Diff line change
Expand Up @@ -49,3 +49,6 @@ var InvalidSchema = errors.New("Invalid schema")

// Happens when a datum reader has no set schema.
var SchemaNotSet = errors.New("Schema not set")

// FieldDoesNotExist happens when a struct does not have a necessary field.
// findField returns it when the requested name has no entry in the struct's field-name lookup.
var FieldDoesNotExist = errors.New("Field does not exist")
8 changes: 8 additions & 0 deletions schema.go
Original file line number Diff line number Diff line change
Expand Up @@ -597,6 +597,14 @@ func (this *EnumSchema) GetName() string {
return this.Name
}

// FullName returns the name of this enum schema qualified by its namespace,
// in the form "namespace.name". When the namespace is empty the bare name
// (as reported by GetName) is returned.
func (this *EnumSchema) FullName() string {
	if this.Namespace == "" {
		return this.GetName()
	}

	// The datum readers call FullName for every enum value they decode, so
	// build the string with plain concatenation: it produces the same result
	// as fmt.Sprintf("%s.%s", ...) without the argument boxing and formatting
	// machinery.
	return this.Namespace + "." + this.Name
}

// Gets a custom non-reserved string property from this schema and a bool representing if it exists.
func (this *EnumSchema) Prop(key string) (string, bool) {
if this.Properties != nil {
Expand Down

0 comments on commit 2f447d1

Please sign in to comment.