Skip to content

Commit

Permalink
AddInt -> IncrBy, add IncrBy atomic op
Browse files Browse the repository at this point in the history
  • Loading branch information
ccbrown committed Jul 24, 2019
1 parent 45620d1 commit bb467ae
Show file tree
Hide file tree
Showing 12 changed files with 83 additions and 15 deletions.
3 changes: 1 addition & 2 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -183,8 +183,7 @@ If you're using CloudFormation, you can just copy/paste that into your template.
Then you can connect to it like so:
```go
awsConfig := defaults.Get().Config.WithMaxRetries(5)
session := session.Must(session.NewSession(awsConfig))
session := session.Must(session.NewSession())
backend := &KeyValueStore{
backend: &dynamodbstore.Backend{
Client: &dynamodbstore.AWSBackendClient{
Expand Down
6 changes: 5 additions & 1 deletion atomic_write_operation.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ type AtomicWriteResult interface {
ConditionalFailed() bool
}

// DynamoDB can't do more than 25 operations in an atomic write. So all stores should enforce this
// DynamoDB can't do more than 25 operations in an atomic write so all backends should enforce this
// limit.
const MaxAtomicWriteOperations = 25

Expand All @@ -23,6 +23,10 @@ type AtomicWriteOperation interface {
// Deletes a key. No conditionals are applied.
Delete(key string) AtomicWriteResult

// Increments the given key by some number. If the key doesn't exist, it's set to the given
// number instead. No conditionals are applied.
IncrBy(key string, n int64) AtomicWriteResult

// Adds a member to a sorted set. No conditionals are applied.
ZAdd(key string, member interface{}, score float64) AtomicWriteResult

Expand Down
5 changes: 3 additions & 2 deletions backend.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,8 +14,9 @@ type Backend interface {
Get(key string) (*string, error)
Set(key string, value interface{}) error

// Add an integer to an integer value. Or set if the key doesn't exist.
AddInt(key string, n int64) (int64, error)
// Increments the given key by some number. If the key doesn't exist, it's set to the given
// number instead.
IncrBy(key string, n int64) (int64, error)

// Set if the key already exists.
SetXX(key string, value interface{}) (bool, error)
Expand Down
13 changes: 13 additions & 0 deletions dynamodbstore/atomic_write_operation.go
Original file line number Diff line number Diff line change
Expand Up @@ -82,6 +82,19 @@ func (op *AtomicWriteOperation) Delete(key string) keyvaluestore.AtomicWriteResu
})
}

func (op *AtomicWriteOperation) IncrBy(key string, n int64) keyvaluestore.AtomicWriteResult {
return op.write(dynamodb.TransactWriteItem{
Update: &dynamodb.Update{
Key: compositeKey(key, "_"),
TableName: &op.Backend.TableName,
UpdateExpression: aws.String("ADD v :n"),
ExpressionAttributeValues: map[string]*dynamodb.AttributeValue{
":n": attributeValue(n),
},
},
})
}

func (op *AtomicWriteOperation) ZAdd(key string, member interface{}, score float64) keyvaluestore.AtomicWriteResult {
s := *keyvaluestore.ToString(member)
return op.write(dynamodb.TransactWriteItem{
Expand Down
2 changes: 1 addition & 1 deletion dynamodbstore/backend.go
Original file line number Diff line number Diff line change
Expand Up @@ -76,7 +76,7 @@ func attributeValue(v interface{}) *dynamodb.AttributeValue {
panic(fmt.Sprintf("unsupported value type: %T", v))
}

func (b *Backend) AddInt(key string, n int64) (int64, error) {
func (b *Backend) IncrBy(key string, n int64) (int64, error) {
result, err := b.Client.UpdateItem(&dynamodb.UpdateItemInput{
Key: compositeKey(key, "_"),
TableName: aws.String(b.TableName),
Expand Down
5 changes: 5 additions & 0 deletions keyvaluestorecache/atomic_write_operation.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,11 @@ func (op *readCacheAtomicWriteOperation) Delete(key string) keyvaluestore.Atomic
return op.atomicWrite.Delete(key)
}

func (op *readCacheAtomicWriteOperation) IncrBy(key string, n int64) keyvaluestore.AtomicWriteResult {
op.invalidations = append(op.invalidations, key)
return op.atomicWrite.IncrBy(key, n)
}

func (op *readCacheAtomicWriteOperation) ZAdd(key string, member interface{}, score float64) keyvaluestore.AtomicWriteResult {
op.invalidations = append(op.invalidations, key)
return op.atomicWrite.ZAdd(key, member, score)
Expand Down
4 changes: 2 additions & 2 deletions keyvaluestorecache/read_cache.go
Original file line number Diff line number Diff line change
Expand Up @@ -104,8 +104,8 @@ func (c *ReadCache) Set(key string, value interface{}) error {
return err
}

func (c *ReadCache) AddInt(key string, n int64) (int64, error) {
n, err := c.backend.AddInt(key, n)
func (c *ReadCache) IncrBy(key string, n int64) (int64, error) {
n, err := c.backend.IncrBy(key, n)
c.Invalidate(key)
return n, err
}
Expand Down
35 changes: 32 additions & 3 deletions keyvaluestoretest/backend.go
Original file line number Diff line number Diff line change
Expand Up @@ -102,6 +102,35 @@ func TestBackendAtomicWrite(t *testing.T, newBackend func() keyvaluestore.Backen
assert.Nil(t, got)
})

t.Run("IncrBy", func(t *testing.T) {
assert.NoError(t, b.Set("foo", "bar"))
_, err := b.Delete("notset")
assert.NoError(t, err)

tx := b.AtomicWrite()
defer assertConditionFail(t, tx.SetNX("foo", "bar"))
tx.IncrBy("n", 1)
ok, err := tx.Exec()
assert.NoError(t, err)
assert.False(t, ok)

got, err := b.Get("n")
assert.NoError(t, err)
assert.Nil(t, got)

tx = b.AtomicWrite()
defer assertConditionPass(t, tx.SetNX("notset", "baz"))
tx.IncrBy("n", 1)
ok, err = tx.Exec()
assert.NoError(t, err)
assert.True(t, ok)

got, err = b.Get("n")
assert.NoError(t, err)
require.NotNil(t, got)
assert.Equal(t, "1", *got)
})

t.Run("SetEQ", func(t *testing.T) {
assert.NoError(t, b.Set("foo", 1))
assert.NoError(t, b.Set("deleteme", "bar"))
Expand Down Expand Up @@ -214,11 +243,11 @@ func TestBackend(t *testing.T, newBackend func() keyvaluestore.Backend) {
})
})

t.Run("AddInt", func(t *testing.T) {
t.Run("IncrBy", func(t *testing.T) {
b := newBackend()

t.Run("New", func(t *testing.T) {
n, err := b.AddInt("foo", 2)
n, err := b.IncrBy("foo", 2)
assert.EqualValues(t, 2, n)
assert.NoError(t, err)

Expand All @@ -236,7 +265,7 @@ func TestBackend(t *testing.T, newBackend func() keyvaluestore.Backend) {
assert.NoError(t, err)
assert.Equal(t, "1", *v)

n, err := b.AddInt("foo", 2)
n, err := b.IncrBy("foo", 2)
assert.EqualValues(t, 3, n)
assert.NoError(t, err)

Expand Down
8 changes: 8 additions & 0 deletions memorystore/atomic_write_operation.go
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,14 @@ func (op *AtomicWriteOperation) Delete(key string) keyvaluestore.AtomicWriteResu
})
}

func (op *AtomicWriteOperation) IncrBy(key string, n int64) keyvaluestore.AtomicWriteResult {
return op.write(&atomicWriteOperation{
write: func() {
op.Backend.incrBy(key, n)
},
})
}

func (op *AtomicWriteOperation) ZAdd(key string, member interface{}, score float64) keyvaluestore.AtomicWriteResult {
return op.write(&atomicWriteOperation{
write: func() {
Expand Down
6 changes: 3 additions & 3 deletions memorystore/backend.go
Original file line number Diff line number Diff line change
Expand Up @@ -77,13 +77,13 @@ func (b *Backend) set(key string, value interface{}) {
b.m[key] = value
}

func (b *Backend) AddInt(key string, n int64) (int64, error) {
func (b *Backend) IncrBy(key string, n int64) (int64, error) {
b.mutex.Lock()
defer b.mutex.Unlock()
return b.addInt(key, n)
return b.incrBy(key, n)
}

func (b *Backend) addInt(key string, n int64) (int64, error) {
func (b *Backend) incrBy(key string, n int64) (int64, error) {
if v, ok := b.m[key]; ok {
if s := keyvaluestore.ToString(v); s != nil {
i, err := strconv.ParseInt(*s, 10, 64)
Expand Down
9 changes: 9 additions & 0 deletions redisstore/atomic_write_operation.go
Original file line number Diff line number Diff line change
Expand Up @@ -68,6 +68,15 @@ func (op *AtomicWriteOperation) Delete(key string) keyvaluestore.AtomicWriteResu
})
}

func (op *AtomicWriteOperation) IncrBy(key string, n int64) keyvaluestore.AtomicWriteResult {
return op.write(&atomicWriteOperation{
key: key,
condition: "true",
write: "redis.call('incrby', $@, $0)",
args: []interface{}{n},
})
}

func (op *AtomicWriteOperation) ZAdd(key string, member interface{}, score float64) keyvaluestore.AtomicWriteResult {
return op.write(&atomicWriteOperation{
key: key,
Expand Down
2 changes: 1 addition & 1 deletion redisstore/backend.go
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,7 @@ func (b *Backend) Set(key string, value interface{}) error {
return b.Client.Set(key, value, 0).Err()
}

func (b *Backend) AddInt(key string, n int64) (int64, error) {
func (b *Backend) IncrBy(key string, n int64) (int64, error) {
return b.Client.IncrBy(key, n).Result()
}

Expand Down

0 comments on commit bb467ae

Please sign in to comment.