Skip to content

Commit

Permalink
fix: Version entity serialization mechanism and fix issue with int64 …
Browse files Browse the repository at this point in the history
…vals (#2944)

* fix: version the entity serialization mechanism to fix issue with int64 vals

Signed-off-by: Achal Shah <achals@gmail.com>

* fix tests

Signed-off-by: Achal Shah <achals@gmail.com>

* Add a test

Signed-off-by: Achal Shah <achals@gmail.com>

* Add a test

Signed-off-by: Achal Shah <achals@gmail.com>

* fix test

Signed-off-by: Achal Shah <achals@gmail.com>

* fix test

Signed-off-by: Achal Shah <achals@gmail.com>

* fix test

Signed-off-by: Achal Shah <achals@gmail.com>

* simplify

Signed-off-by: Achal Shah <achals@gmail.com>

* simplify

Signed-off-by: Achal Shah <achals@gmail.com>

* feature_store.yaml

Signed-off-by: Achal Shah <achals@gmail.com>

* fix tests

Signed-off-by: Achal Shah <achals@gmail.com>

* remove protos

Signed-off-by: Achal Shah <achals@gmail.com>

* fix tests

Signed-off-by: Achal Shah <achals@gmail.com>

* update feature_store.yaml templates

Signed-off-by: Achal Shah <achals@gmail.com>

* fix java

Signed-off-by: Achal Shah <achals@gmail.com>

* fix java test

Signed-off-by: Achal Shah <achals@gmail.com>

* docs

Signed-off-by: Achal Shah <achals@gmail.com>

* docs

Signed-off-by: Achal Shah <achals@gmail.com>

* docs

Signed-off-by: Achal Shah <achals@gmail.com>
  • Loading branch information
achals committed Jul 20, 2022
1 parent 92785b8 commit d0d27a3
Show file tree
Hide file tree
Showing 35 changed files with 373 additions and 159 deletions.
2 changes: 1 addition & 1 deletion go/internal/feast/onlinestore/onlinestore.go
Expand Up @@ -61,7 +61,7 @@ func NewOnlineStore(config *registry.RepoConfig) (OnlineStore, error) {
onlineStore, err := NewSqliteOnlineStore(config.Project, config, config.OnlineStore)
return onlineStore, err
} else if onlineStoreType == "redis" {
onlineStore, err := NewRedisOnlineStore(config.Project, config.OnlineStore)
onlineStore, err := NewRedisOnlineStore(config.Project, config, config.OnlineStore)
return onlineStore, err
} else {
return nil, fmt.Errorf("%s online store type is currently not supported; only redis and sqlite are supported", onlineStoreType)
Expand Down
38 changes: 25 additions & 13 deletions go/internal/feast/onlinestore/redisonlinestore.go
Expand Up @@ -6,14 +6,15 @@ import (
"encoding/binary"
"errors"
"fmt"
"github.com/feast-dev/feast/go/internal/feast/registry"
"sort"
"strconv"
"strings"

"github.com/go-redis/redis/v8"
"github.com/golang/protobuf/proto"
"github.com/spaolacci/murmur3"
timestamppb "google.golang.org/protobuf/types/known/timestamppb"
"google.golang.org/protobuf/types/known/timestamppb"

"github.com/feast-dev/feast/go/protos/feast/serving"
"github.com/feast-dev/feast/go/protos/feast/types"
Expand All @@ -37,10 +38,15 @@ type RedisOnlineStore struct {

// Redis client connector
client *redis.Client

config *registry.RepoConfig
}

func NewRedisOnlineStore(project string, onlineStoreConfig map[string]interface{}) (*RedisOnlineStore, error) {
store := RedisOnlineStore{project: project}
func NewRedisOnlineStore(project string, config *registry.RepoConfig, onlineStoreConfig map[string]interface{}) (*RedisOnlineStore, error) {
store := RedisOnlineStore{
project: project,
config: config,
}

var address []string
var password string
Expand Down Expand Up @@ -161,7 +167,7 @@ func (r *RedisOnlineStore) OnlineRead(ctx context.Context, entityKeys []*types.E
redisKeyToEntityIndex := make(map[string]int)
for i := 0; i < len(entityKeys); i++ {

var key, err = buildRedisKey(r.project, entityKeys[i])
var key, err = buildRedisKey(r.project, entityKeys[i], r.config.EntityKeySerializationVersion)
if err != nil {
return nil, err
}
Expand Down Expand Up @@ -270,16 +276,16 @@ func (r *RedisOnlineStore) Destruct() {

}

func buildRedisKey(project string, entityKey *types.EntityKey) (*[]byte, error) {
serKey, err := serializeEntityKey(entityKey)
func buildRedisKey(project string, entityKey *types.EntityKey, entityKeySerializationVersion int64) (*[]byte, error) {
serKey, err := serializeEntityKey(entityKey, entityKeySerializationVersion)
if err != nil {
return nil, err
}
fullKey := append(*serKey, []byte(project)...)
return &fullKey, nil
}

func serializeEntityKey(entityKey *types.EntityKey) (*[]byte, error) {
func serializeEntityKey(entityKey *types.EntityKey, entityKeySerializationVersion int64) (*[]byte, error) {
// Serialize entity key to a bytestring so that it can be used as a lookup key in a hash table.

// Ensure that we have the right amount of join keys and entity values
Expand Down Expand Up @@ -316,7 +322,7 @@ func serializeEntityKey(entityKey *types.EntityKey) (*[]byte, error) {
offset := (2 * len(keys)) + (i * 3)
value := m[keys[i]].GetVal()

valueBytes, valueTypeBytes, err := serializeValue(value)
valueBytes, valueTypeBytes, err := serializeValue(value, entityKeySerializationVersion)
if err != nil {
return valueBytes, err
}
Expand All @@ -341,7 +347,7 @@ func serializeEntityKey(entityKey *types.EntityKey) (*[]byte, error) {
return &entityKeyBuffer, nil
}

func serializeValue(value interface{}) (*[]byte, types.ValueType_Enum, error) {
func serializeValue(value interface{}, entityKeySerializationVersion int64) (*[]byte, types.ValueType_Enum, error) {
// TODO: Implement support for other types (at least the major types like ints, strings, bytes)
switch x := (value).(type) {
case *types.Value_StringVal:
Expand All @@ -354,10 +360,16 @@ func serializeValue(value interface{}) (*[]byte, types.ValueType_Enum, error) {
binary.LittleEndian.PutUint32(valueBuffer, uint32(x.Int32Val))
return &valueBuffer, types.ValueType_INT32, nil
case *types.Value_Int64Val:
// TODO (woop): We unfortunately have to use 32 bit here for backward compatibility :(
valueBuffer := make([]byte, 4)
binary.LittleEndian.PutUint32(valueBuffer, uint32(x.Int64Val))
return &valueBuffer, types.ValueType_INT64, nil
if entityKeySerializationVersion <= 1 {
// We unfortunately have to use 32 bit here for backward compatibility :(
valueBuffer := make([]byte, 4)
binary.LittleEndian.PutUint32(valueBuffer, uint32(x.Int64Val))
return &valueBuffer, types.ValueType_INT64, nil
} else {
valueBuffer := make([]byte, 8)
binary.LittleEndian.PutUint64(valueBuffer, uint64(x.Int64Val))
return &valueBuffer, types.ValueType_INT64, nil
}
case nil:
return nil, types.ValueType_INVALID, fmt.Errorf("could not detect type for %v", x)
default:
Expand Down
25 changes: 21 additions & 4 deletions go/internal/feast/onlinestore/redisonlinestore_test.go
@@ -1,6 +1,7 @@
package onlinestore

import (
"github.com/feast-dev/feast/go/internal/feast/registry"
"testing"

"github.com/stretchr/testify/assert"
Expand All @@ -10,7 +11,11 @@ func TestNewRedisOnlineStore(t *testing.T) {
var config = map[string]interface{}{
"connection_string": "redis://localhost:6379",
}
store, err := NewRedisOnlineStore("test", config)
rc := &registry.RepoConfig{
OnlineStore: config,
EntityKeySerializationVersion: 2,
}
store, err := NewRedisOnlineStore("test", rc, config)
assert.Nil(t, err)
var opts = store.client.Options()
assert.Equal(t, opts.Addr, "redis://localhost:6379")
Expand All @@ -23,7 +28,11 @@ func TestNewRedisOnlineStoreWithPassword(t *testing.T) {
var config = map[string]interface{}{
"connection_string": "redis://localhost:6379,password=secret",
}
store, err := NewRedisOnlineStore("test", config)
rc := &registry.RepoConfig{
OnlineStore: config,
EntityKeySerializationVersion: 2,
}
store, err := NewRedisOnlineStore("test", rc, config)
assert.Nil(t, err)
var opts = store.client.Options()
assert.Equal(t, opts.Addr, "redis://localhost:6379")
Expand All @@ -34,7 +43,11 @@ func TestNewRedisOnlineStoreWithDB(t *testing.T) {
var config = map[string]interface{}{
"connection_string": "redis://localhost:6379,db=1",
}
store, err := NewRedisOnlineStore("test", config)
rc := &registry.RepoConfig{
OnlineStore: config,
EntityKeySerializationVersion: 2,
}
store, err := NewRedisOnlineStore("test", rc, config)
assert.Nil(t, err)
var opts = store.client.Options()
assert.Equal(t, opts.Addr, "redis://localhost:6379")
Expand All @@ -45,7 +58,11 @@ func TestNewRedisOnlineStoreWithSsl(t *testing.T) {
var config = map[string]interface{}{
"connection_string": "redis://localhost:6379,ssl=true",
}
store, err := NewRedisOnlineStore("test", config)
rc := &registry.RepoConfig{
OnlineStore: config,
EntityKeySerializationVersion: 2,
}
store, err := NewRedisOnlineStore("test", rc, config)
assert.Nil(t, err)
var opts = store.client.Options()
assert.Equal(t, opts.Addr, "redis://localhost:6379")
Expand Down
15 changes: 8 additions & 7 deletions go/internal/feast/onlinestore/sqliteonlinestore.go
Expand Up @@ -16,23 +16,24 @@ import (

_ "github.com/mattn/go-sqlite3"
"google.golang.org/protobuf/proto"
timestamppb "google.golang.org/protobuf/types/known/timestamppb"
"google.golang.org/protobuf/types/known/timestamppb"

"github.com/feast-dev/feast/go/protos/feast/serving"
"github.com/feast-dev/feast/go/protos/feast/types"
)

type SqliteOnlineStore struct {
// Feast project name
project string
path string
db *sql.DB
db_mu sync.Mutex
project string
path string
db *sql.DB
db_mu sync.Mutex
repoConfig *registry.RepoConfig
}

// Creates a new sqlite online store object. onlineStoreConfig should have relative path of database file with respect to repoConfig.repoPath.
func NewSqliteOnlineStore(project string, repoConfig *registry.RepoConfig, onlineStoreConfig map[string]interface{}) (*SqliteOnlineStore, error) {
store := SqliteOnlineStore{project: project}
store := SqliteOnlineStore{project: project, repoConfig: repoConfig}
if db_path, ok := onlineStoreConfig["path"]; !ok {
return nil, fmt.Errorf("cannot find sqlite path %s", db_path)
} else {
Expand Down Expand Up @@ -69,7 +70,7 @@ func (s *SqliteOnlineStore) OnlineRead(ctx context.Context, entityKeys []*types.
in_query := make([]string, len(entityKeys))
serialized_entities := make([]interface{}, len(entityKeys))
for i := 0; i < len(entityKeys); i++ {
serKey, err := serializeEntityKey(entityKeys[i])
serKey, err := serializeEntityKey(entityKeys[i], s.repoConfig.EntityKeySerializationVersion)
if err != nil {
return nil, err
}
Expand Down
2 changes: 2 additions & 0 deletions go/internal/feast/registry/repoconfig.go
Expand Up @@ -30,6 +30,8 @@ type RepoConfig struct {
Flags map[string]interface{} `json:"flags"`
// RepoPath
RepoPath string `json:"repo_path"`
// EntityKeySerializationVersion
EntityKeySerializationVersion int64 `json:"entity_key_serialization_version"`
}

type RegistryConfig struct {
Expand Down

0 comments on commit d0d27a3

Please sign in to comment.