Skip to content

Commit

Permalink
Merge pull request #19 from knocknote/feature/fix-update-index-key-by…
Browse files Browse the repository at this point in the history
…-primary-key

Fix handling of updating cache key of index type
  • Loading branch information
goccy committed Nov 29, 2019
2 parents 9affe17 + 3ffd6e5 commit 28184e9
Show file tree
Hide file tree
Showing 2 changed files with 243 additions and 15 deletions.
80 changes: 65 additions & 15 deletions second_level_cache.go
Original file line number Diff line number Diff line change
Expand Up @@ -382,10 +382,10 @@ func (c *SecondLevelCache) update(tx *Tx, key server.CacheKey, value []byte, log
casID = tx.stash.casIDs[key.String()]
}
if err := c.cacheServer.Set(&server.CacheStoreRequest{
Key: key,
Value: value,
Key: key,
Value: value,
Expiration: c.opt.Expiration(),
CasID: casID,
CasID: casID,
}); err != nil {
return xerrors.Errorf("failed to update cache: %w", err)
}
Expand Down Expand Up @@ -841,6 +841,58 @@ func (c *SecondLevelCache) FindByQueryBuilder(ctx context.Context, tx *Tx, build
return nil
}

func (c *SecondLevelCache) deleteCacheKeyByOldValue(tx *Tx, column string, value *StructValue) error {
for _, index := range c.indexes {
if !index.HasColumn(column) {
continue
}

cacheKey, err := index.CacheKey(value)
if err != nil {
return xerrors.Errorf("failed to get cache key: %w", err)
}
if err := c.deleteUniqueKeyOrOldKey(tx, cacheKey); err != nil {
return xerrors.Errorf("failed to delete unique key or old key: %w", err)
}
}
return nil
}

func (c *SecondLevelCache) updateOrDeleteCacheKeyByNewValue(tx *Tx, column string, value *StructValue) error {
for _, index := range c.indexes {
if index.Type == IndexTypePrimaryKey {
continue
}
if !index.HasColumn(column) {
continue
}

switch index.Type {
case IndexTypeUniqueKey:
primaryKey, err := c.primaryKey.CacheKey(value)
if err != nil {
return xerrors.Errorf("failed to get cache key: %w", err)
}
cacheKey, err := index.CacheKey(value)
if err != nil {
return xerrors.Errorf("failed to get cache key: %w", err)
}
if err := c.setUniqueKey(tx, cacheKey, primaryKey); err != nil {
return xerrors.Errorf("failed to set unique key: %w", err)
}
case IndexTypeKey:
cacheKey, err := index.CacheKey(value)
if err != nil {
return xerrors.Errorf("failed to get cache key: %w", err)
}
if err := c.deleteOldKey(tx, cacheKey); err != nil {
return xerrors.Errorf("failed to delete old key: %w", err)
}
}
}
return nil
}

func (c *SecondLevelCache) updateValue(tx *Tx, target *StructValue, updateMap map[string]interface{}) error {
for k, v := range updateMap {
field, exists := target.fields[k]
Expand All @@ -862,20 +914,18 @@ func (c *SecondLevelCache) updateValue(tx *Tx, target *StructValue, updateMap ma
target.fields[k] = value
continue
}
for _, index := range c.indexes {
if !index.HasColumn(k) {
continue
}

cacheKey, err := index.CacheKey(target)
if err != nil {
return xerrors.Errorf("failed to get cache key: %w", err)
}
if err := c.deleteUniqueKeyOrOldKey(tx, cacheKey); err != nil {
return xerrors.Errorf("failed to delete unique key or old key: %w", err)
}
// remove cache key by old unique key or old key
if err := c.deleteCacheKeyByOldValue(tx, k, target); err != nil {
return xerrors.Errorf("failed to delete cache key by value before updating")
}

target.fields[k] = value // update indexed value

// remove cache key by new key
if err := c.updateOrDeleteCacheKeyByNewValue(tx, k, target); err != nil {
return xerrors.Errorf("failed to delete cache key by value after updating")
}
target.fields[k] = value
}
return nil
}
Expand Down
178 changes: 178 additions & 0 deletions second_level_cache_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -958,6 +958,184 @@ func testUpdateKeyColumn(t *testing.T, typ CacheServerType) {
NoError(t, tx.Commit())
}

func TestUniqueIndexColumnUpdateByPrimaryKey(t *testing.T) {
NoError(t, initUserLoginTable(conn))
NoError(t, initCache(conn, CacheServerTypeMemcached))
slc := NewSecondLevelCache(userLoginType(), cache.cacheServer, TableOption{})
NoError(t, slc.cacheServer.Flush())
NoError(t, slc.WarmUp(conn))

// set unique index cache
{
txConn, err := conn.Begin()
NoError(t, err)
tx, err := cache.Begin(txConn)
NoError(t, err)

builder := NewQueryBuilder("user_logins").
Eq("user_id", uint64(1)).
Eq("user_session_id", uint64(1))

// create for positive cache
var userLogins UserLogins
NoError(t, slc.FindByQueryBuilder(context.Background(), tx, builder, &userLogins))
if len(userLogins) != 1 {
t.Fatal("failed to get value by index key")
}

builder = NewQueryBuilder("user_logins").
Eq("user_id", uint64(1)).
Eq("user_session_id", uint64(2))

// create for negative cache
var newUserLogins UserLogins
NoError(t, slc.FindByQueryBuilder(context.Background(), tx, builder, &newUserLogins))
if len(newUserLogins) != 0 {
t.Fatal("failed to get negative cache")
}
NoError(t, tx.Commit())
}

// update unique index cache by primary key
{
txConn, err := conn.Begin()
NoError(t, err)
tx, err := cache.Begin(txConn)
NoError(t, err)
updateParam := map[string]interface{}{
"user_session_id": uint64(2),
}
builder := NewQueryBuilder("user_logins").Eq("id", uint64(1))
NoError(t, slc.UpdateByQueryBuilder(context.Background(), tx, builder, updateParam))
NoError(t, tx.Commit())
}

// get value by old param
{
txConn, err := conn.Begin()
NoError(t, err)
tx, err := cache.Begin(txConn)
NoError(t, err)
builder := NewQueryBuilder("user_logins").
Eq("user_id", uint64(1)).
Eq("user_session_id", uint64(1))

var userLogins UserLogins
NoError(t, slc.FindByQueryBuilder(context.Background(), tx, builder, &userLogins))
if len(userLogins) != 0 {
t.Fatal("failed to get to updated value")
}
NoError(t, tx.Commit())
}

// get value by new param
{
txConn, err := conn.Begin()
NoError(t, err)
tx, err := cache.Begin(txConn)
NoError(t, err)
builder := NewQueryBuilder("user_logins").
Eq("user_id", uint64(1)).
Eq("user_session_id", uint64(2))

var userLogins UserLogins
NoError(t, slc.FindByQueryBuilder(context.Background(), tx, builder, &userLogins))
if len(userLogins) != 1 {
t.Fatal("failed to get to updated value")
}
NoError(t, tx.Commit())
}
}

func TestIndexColumnUpdateByPrimaryKey(t *testing.T) {
NoError(t, initUserLoginTable(conn))
NoError(t, initCache(conn, CacheServerTypeMemcached))
slc := NewSecondLevelCache(userLoginType(), cache.cacheServer, TableOption{})
NoError(t, slc.cacheServer.Flush())
NoError(t, slc.WarmUp(conn))

// set index cache
{
txConn, err := conn.Begin()
NoError(t, err)
tx, err := cache.Begin(txConn)
NoError(t, err)

builder := NewQueryBuilder("user_logins").
Eq("user_id", uint64(1)).
Eq("login_param_id", uint64(1))

// create for positive cache
var userLogins UserLogins
NoError(t, slc.FindByQueryBuilder(context.Background(), tx, builder, &userLogins))
if len(userLogins) != 1 {
t.Fatal("failed to get value by index key")
}

builder = NewQueryBuilder("user_logins").
Eq("user_id", uint64(1)).
Eq("login_param_id", uint64(2))

// create for negative cache
var newUserLogins UserLogins
NoError(t, slc.FindByQueryBuilder(context.Background(), tx, builder, &newUserLogins))
if len(newUserLogins) != 0 {
t.Fatal("failed to get negative cache")
}
NoError(t, tx.Commit())
}

// update index cache by primary key
{
txConn, err := conn.Begin()
NoError(t, err)
tx, err := cache.Begin(txConn)
NoError(t, err)
updateParam := map[string]interface{}{
"login_param_id": uint64(2),
}
builder := NewQueryBuilder("user_logins").Eq("id", uint64(1))
NoError(t, slc.UpdateByQueryBuilder(context.Background(), tx, builder, updateParam))
NoError(t, tx.Commit())
}

// get value by old param
{
txConn, err := conn.Begin()
NoError(t, err)
tx, err := cache.Begin(txConn)
NoError(t, err)
builder := NewQueryBuilder("user_logins").
Eq("user_id", uint64(1)).
Eq("login_param_id", uint64(1))

var userLogins UserLogins
NoError(t, slc.FindByQueryBuilder(context.Background(), tx, builder, &userLogins))
if len(userLogins) != 0 {
t.Fatal("failed to get to updated value")
}
NoError(t, tx.Commit())
}

// get value by new param
{
txConn, err := conn.Begin()
NoError(t, err)
tx, err := cache.Begin(txConn)
NoError(t, err)
builder := NewQueryBuilder("user_logins").
Eq("user_id", uint64(1)).
Eq("login_param_id", uint64(2))

var userLogins UserLogins
NoError(t, slc.FindByQueryBuilder(context.Background(), tx, builder, &userLogins))
if len(userLogins) != 1 {
t.Fatal("failed to get to updated value")
}
NoError(t, tx.Commit())
}
}

func TestDeleteByQueryBuilder(t *testing.T) {
for cacheServerType := range []CacheServerType{CacheServerTypeMemcached, CacheServerTypeRedis} {
testDeleteByQueryBuilder(t, CacheServerType(cacheServerType))
Expand Down

0 comments on commit 28184e9

Please sign in to comment.