Skip to content
This repository has been archived by the owner on Aug 28, 2021. It is now read-only.

Commit

Permalink
Add AWS backend for NBS (#2914)
Browse files Browse the repository at this point in the history
Add a new backend for NBS that stores tables in S3 and
manifests in DynamoDB.

Fixes #2877
  • Loading branch information
cmasone-attic committed Dec 6, 2016
1 parent dc28924 commit 0750459
Show file tree
Hide file tree
Showing 266 changed files with 83,782 additions and 3,459 deletions.
55 changes: 43 additions & 12 deletions go/nbs/benchmarks/main.go
Expand Up @@ -16,6 +16,9 @@ import (
"github.com/attic-labs/noms/go/nbs"
"github.com/attic-labs/noms/go/util/profile"
"github.com/attic-labs/testify/assert"
"github.com/aws/aws-sdk-go/aws"
"github.com/aws/aws-sdk-go/aws/session"
"github.com/aws/aws-sdk-go/service/dynamodb"
"github.com/dustin/go-humanize"
flag "github.com/juju/gnuflag"
)
Expand All @@ -26,9 +29,14 @@ var (
mtMiB = flag.Uint64("mem", 64, "Size in MiB of memTable")
useNBS = flag.String("useNBS", "", "Existing Database to use for not-WriteNovel benchmarks")
toNBS = flag.String("toNBS", "", "Write to an NBS store in the given directory")
useAWS = flag.String("useAWS", "", "Name of existing Database to use for not-WriteNovel benchmarks")
toAWS = flag.String("toAWS", "", "Write to an NBS store in AWS")
toFile = flag.String("toFile", "", "Write to a file in the given directory")
)

// Names of the AWS resources used by the AWS-backed benchmark stores: the S3
// bucket and the DynamoDB table handed to nbs.NewAWSStore.
const (
	s3Bucket    = "attic-nbs"
	dynamoTable = "attic-nbs"
)

// panickingBencher carries a single int counter, n.
// NOTE(review): its methods are not visible in this view; presumably it stands
// in for a testing.B-like object whose assertion failures panic — confirm
// against the rest of the file.
type panickingBencher struct {
n int
}
Expand Down Expand Up @@ -66,32 +74,53 @@ func main() {
wrote := false
var writeDB func()
var refresh func() blockStore
if *toNBS != "" || *toFile != "" {
var dir string
if *toNBS != "" || *toFile != "" || *toAWS != "" {
var reset func()
if *toNBS != "" {
dir = makeTempDir(*toNBS, pb)
open = func() blockStore {
return nbs.NewLocalStore(dir, bufSize)
}
dir := makeTempDir(*toNBS, pb)
defer os.RemoveAll(dir)
open = func() blockStore { return nbs.NewLocalStore(dir, bufSize) }
reset = func() { os.RemoveAll(dir); os.MkdirAll(dir, 0777) }

} else if *toFile != "" {
dir = makeTempDir(*toFile, pb)
dir := makeTempDir(*toFile, pb)
defer os.RemoveAll(dir)
open = func() blockStore {
f, err := ioutil.TempFile(dir, "")
d.Chk.NoError(err)
return newFileBlockStore(f)
}
reset = func() { os.RemoveAll(dir); os.MkdirAll(dir, 0777) }

} else if *toAWS != "" {
sess := session.New(aws.NewConfig().WithRegion("us-west-2"))
open = func() blockStore {
return nbs.NewAWSStore(dynamoTable, *toAWS, s3Bucket, sess, bufSize)
}
reset = func() {
ddb := dynamodb.New(sess)
_, err := ddb.DeleteItem(&dynamodb.DeleteItemInput{
TableName: aws.String(dynamoTable),
Key: map[string]*dynamodb.AttributeValue{
"db": {S: toAWS},
},
})
d.PanicIfError(err)
}
}
defer os.RemoveAll(dir)

writeDB = func() { wrote = ensureNovelWrite(wrote, open, src, pb) }
refresh = func() blockStore {
os.RemoveAll(dir)
os.MkdirAll(dir, 0777)
reset()
return open()
}
} else {
if *useNBS != "" {
open = func() blockStore { return nbs.NewLocalStore(*useNBS, bufSize) }
} else if *useAWS != "" {
sess := session.New(aws.NewConfig().WithRegion("us-west-2"))
open = func() blockStore {
return nbs.NewLocalStore(*useNBS, bufSize)
return nbs.NewAWSStore(dynamoTable, *useAWS, s3Bucket, sess, bufSize)
}
}
writeDB = func() {}
Expand All @@ -105,7 +134,9 @@ func main() {
}{
{"WriteNovel", func() {}, func() { wrote = benchmarkNovelWrite(refresh, src, pb) }},
{"WriteDuplicate", writeDB, func() { benchmarkNoRefreshWrite(open, src, pb) }},
{"ReadSequential", writeDB, func() { benchmarkRead(open, src.GetHashes(), src, pb) }},
{"ReadSequential", writeDB, func() {
benchmarkRead(open, src.GetHashes(), src, pb)
}},
{"ReadManySequential", writeDB, func() { benchmarkReadMany(open, src.GetHashes(), src, 1<<8, 6, pb) }},
{"ReadHashOrder", writeDB, func() {
ordered := src.GetHashes()
Expand Down
119 changes: 119 additions & 0 deletions go/nbs/dynamo_manifest.go
@@ -0,0 +1,119 @@
// Copyright 2016 Attic Labs, Inc. All rights reserved.
// Licensed under the Apache License, version 2.0:
// http://www.apache.org/licenses/LICENSE-2.0

package nbs

import (
"fmt"
"strings"

"github.com/attic-labs/noms/go/constants"
"github.com/attic-labs/noms/go/d"
"github.com/attic-labs/noms/go/hash"
"github.com/aws/aws-sdk-go/aws"
"github.com/aws/aws-sdk-go/aws/awserr"
"github.com/aws/aws-sdk-go/service/dynamodb"
)

// Names of the attributes that make up a manifest item in DynamoDB.
// NOTE(review): tableName is not referenced by the code visible in this file;
// dynamoManifest uses the table name it was constructed with.
const (
	tableName      = "attic-nbs"
	dbAttr         = "db"
	rootAttr       = "root"
	versAttr       = "vers"
	nbsVersAttr    = "nbsVers"
	tableSpecsAttr = "specs"
)

// valueNotExistsExpression is the put condition used when no root is expected
// to be stored yet.
const valueNotExistsExpression = "attribute_not_exists(" + rootAttr + ")"

// valueEqualsExpression is the put condition used when an existing root is
// being replaced: both the stored root and the stored Noms version must match
// the :prev and :vers expression values.
var valueEqualsExpression = fmt.Sprintf(
	"(%s = :prev) and (%s = :vers)",
	rootAttr,
	versAttr,
)

// ddbsvc is the slice of the DynamoDB client API that dynamoManifest relies
// on: reading a single item and writing a single item. Keeping it this small
// lets tests substitute an in-memory fake for the real service.
type ddbsvc interface {
	// GetItem fetches the item identified by in.Key from in.TableName.
	GetItem(in *dynamodb.GetItemInput) (*dynamodb.GetItemOutput, error)
	// PutItem writes in.Item, subject to in.ConditionExpression if set.
	PutItem(in *dynamodb.PutItemInput) (*dynamodb.PutItemOutput, error)
}

// dynamoManifest stores an NBS manifest as a single item in DynamoDB.
// It assumes the existence of a DynamoDB table whose primary partition key is
// in String format and named `db`.
type dynamoManifest struct {
	// table is the name of the DynamoDB table holding manifests.
	table string
	// db is the partition-key value identifying this store's manifest item.
	db string
	// ddbsvc is the client used for all reads and writes.
	ddbsvc ddbsvc
}

// newDynamoManifest returns a manifest whose item lives in the given DynamoDB
// table under the key |namespace|, using |ddb| for all service calls.
func newDynamoManifest(table, namespace string, ddb ddbsvc) *dynamoManifest {
	dm := new(dynamoManifest)
	dm.table = table
	dm.db = namespace
	dm.ddbsvc = ddb
	return dm
}

// ParseIfExists reads this store's manifest item from DynamoDB.
//
// If no item is stored under dm.db, it returns exists == false and zero values
// for everything else. Otherwise it returns exists == true along with the Noms
// version, root hash and table specs recorded in the item.
//
// The read is strongly consistent. Update() re-reads the manifest through this
// method right after losing a conditional put, and an eventually consistent
// read (DynamoDB's default) could hand back the stale, pre-update item.
//
// It panics if the GetItem call fails or if the stored item does not pass
// validateManifest. |readHook| is accepted for interface compatibility but is
// not invoked by this implementation.
func (dm dynamoManifest) ParseIfExists(readHook func()) (exists bool, vers string, root hash.Hash, tableSpecs []tableSpec) {
	result, err := dm.ddbsvc.GetItem(&dynamodb.GetItemInput{
		ConsistentRead: aws.Bool(true), // Costs more than the default, but avoids stale manifests.
		TableName:      aws.String(dm.table),
		Key: map[string]*dynamodb.AttributeValue{
			dbAttr: {S: aws.String(dm.db)},
		},
	})
	d.PanicIfError(err)

	// !exists(dbAttr) => uninitialized store
	if len(result.Item) > 0 {
		if !validateManifest(result.Item) {
			d.Panic("Malformed manifest for %s: %+v", dm.db, result.Item)
		}
		exists = true
		vers = *result.Item[versAttr].S
		root = hash.New(result.Item[rootAttr].B)
		// Specs are stored as one colon-separated string.
		tableSpecs = parseSpecs(strings.Split(*result.Item[tableSpecsAttr].S, ":"))
	}

	return
}

// validateManifest reports whether |item| looks like a well-formed manifest:
// exactly five attributes, with nbsVers, vers and specs present as strings,
// root present as binary, and nbsVers equal to StorageVersion.
func validateManifest(item map[string]*dynamodb.AttributeValue) bool {
	if len(item) != 5 {
		return false
	}

	nbsVers := item[nbsVersAttr]
	if nbsVers == nil || nbsVers.S == nil || StorageVersion != *nbsVers.S {
		return false
	}

	if vers := item[versAttr]; vers == nil || vers.S == nil {
		return false
	}

	if root := item[rootAttr]; root == nil || root.B == nil {
		return false
	}

	specs := item[tableSpecsAttr]
	return specs != nil && specs.S != nil
}

// Update tries to replace the stored manifest with one naming |newRoot| and
// |specs|, using a conditional put as an optimistic lock.
//
// When |root| is empty the put only succeeds if no root is stored yet;
// otherwise it only succeeds if the stored root equals |root| and the stored
// Noms version equals constants.NomsVersion.
//
// On success it returns |newRoot| and |specs|. If the condition fails, the
// manifest is re-read and the root and specs currently stored are returned
// instead, after asserting that the item exists and carries the current Noms
// version. Any other PutItem error is checked with d.Chk.NoError.
//
// |writeHook| is accepted for interface compatibility but is not invoked by
// this implementation.
// NOTE(review): with empty |specs| the specs attribute is written as an empty
// string; confirm the DynamoDB service accepts that.
func (dm dynamoManifest) Update(specs []tableSpec, root, newRoot hash.Hash, writeHook func()) (actual hash.Hash, tableSpecs []tableSpec) {
	tableSpecs = specs

	// Flatten the specs into strings, later joined with ":" for storage.
	tableInfo := make([]string, 2*len(tableSpecs))
	formatSpecs(tableSpecs, tableInfo)

	item := map[string]*dynamodb.AttributeValue{
		dbAttr:         {S: aws.String(dm.db)},
		nbsVersAttr:    {S: aws.String(StorageVersion)},
		versAttr:       {S: aws.String(constants.NomsVersion)},
		rootAttr:       {B: newRoot[:]},
		tableSpecsAttr: {S: aws.String(strings.Join(tableInfo, ":"))},
	}
	putArgs := dynamodb.PutItemInput{
		TableName: aws.String(dm.table),
		Item:      item,
	}

	if root.IsEmpty() {
		putArgs.ConditionExpression = aws.String(valueNotExistsExpression)
	} else {
		putArgs.ConditionExpression = aws.String(valueEqualsExpression)
		putArgs.ExpressionAttributeValues = map[string]*dynamodb.AttributeValue{
			":prev": {B: root[:]},
			":vers": {S: aws.String(constants.NomsVersion)},
		}
	}

	_, err := dm.ddbsvc.PutItem(&putArgs)
	if err == nil {
		return newRoot, tableSpecs
	}

	if awsErr, ok := err.(awserr.Error); ok && awsErr.Code() == "ConditionalCheckFailedException" {
		// Lost the race: report whatever is stored now.
		exists, vers, upstreamRoot, upstreamSpecs := dm.ParseIfExists(nil)
		d.Chk.True(exists)
		d.Chk.True(vers == constants.NomsVersion)
		return upstreamRoot, upstreamSpecs
	} // TODO handle other aws errors?

	d.Chk.NoError(err)
	return newRoot, tableSpecs
}
162 changes: 162 additions & 0 deletions go/nbs/dynamo_manifest_test.go
@@ -0,0 +1,162 @@
// Copyright 2016 Attic Labs, Inc. All rights reserved.
// Licensed under the Apache License, version 2.0:
// http://www.apache.org/licenses/LICENSE-2.0

package nbs

import (
"bytes"
"testing"

"github.com/attic-labs/noms/go/constants"
"github.com/attic-labs/noms/go/hash"
"github.com/attic-labs/testify/assert"
"github.com/aws/aws-sdk-go/aws"
"github.com/aws/aws-sdk-go/service/dynamodb"
)

// Table name and database (partition-key) name handed to newDynamoManifest by
// makeDynamoManifestFake and used as the key when seeding the fake directly.
const (
table = "testTable"
db = "testDB"
)

// makeDynamoManifestFake builds a dynamoManifest backed by a fresh in-memory
// fakeDDB and returns both, so tests can drive the manifest and inspect or
// seed the fake store directly.
func makeDynamoManifestFake(t *testing.T) (mm manifest, ddb *fakeDDB) {
	fake := makeFakeDDB(assert.New(t))
	return newDynamoManifest(table, db, fake), fake
}

// TestDynamoManifestParseIfExists checks that ParseIfExists reports a missing
// manifest on an empty store, and returns exactly what another writer stored
// once a manifest item is present.
func TestDynamoManifestParseIfExists(t *testing.T) {
	assert := assert.New(t)
	mm, ddb := makeDynamoManifestFake(t)

	// Nothing has been written yet, so no manifest should be found.
	exists, _, _, _ := mm.ParseIfExists(nil)
	assert.False(exists)

	// Simulate another process writing a manifest (with an old Noms version).
	storedRoot := hash.Of([]byte("new root"))
	tableHash := hash.Of([]byte("table1"))
	ddb.put(db, storedRoot[:], "0", tableHash.String()+":"+"0")

	// ParseIfExists should now reflect the manifest written above.
	exists, vers, root, specs := mm.ParseIfExists(nil)
	assert.True(exists)
	assert.Equal("0", vers)
	assert.Equal(storedRoot, root)
	if assert.Len(specs, 1) {
		only := specs[0]
		assert.Equal(tableHash.String(), only.name.String())
		assert.Equal(uint32(0), only.chunkCount)
	}
}

func TestDynamoManifestUpdateWontClobberOldVersion(t *testing.T) {
assert := assert.New(t)
mm, ddb := makeDynamoManifestFake(t)

// Simulate another process having already put old Noms data in dir/.
badRoot := hash.Of([]byte("bad root"))
ddb.put(db, badRoot[:], "0", "")

assert.Panics(func() { mm.Update(nil, badRoot, hash.Hash{}, nil) })
}

// TestDynamoManifestUpdate exercises the optimistic-locking behaviour of
// Update against the in-memory fake: a first write into an empty store wins,
// a write based on a stale root loses and reports what is actually stored, and
// a retry based on the reported root then succeeds.
func TestDynamoManifestUpdate(t *testing.T) {
	assert := assert.New(t)
	mm, _ := makeDynamoManifestFake(t)

	// First, test winning the race against another process.
	newRoot := hash.Of([]byte("new root"))
	specs := []tableSpec{{computeAddr([]byte("a")), 3}}
	actual, tableSpecs := mm.Update(specs, hash.Hash{}, newRoot, nil)
	assert.Equal(newRoot, actual)
	assert.Equal(specs, tableSpecs)

	// Now, test the case where the optimistic lock fails, and someone else updated the root since last we checked.
	newRoot2 := hash.Of([]byte("new root 2"))
	actual, tableSpecs = mm.Update(nil, hash.Hash{}, newRoot2, nil)
	assert.Equal(newRoot, actual)
	assert.Equal(specs, tableSpecs)

	// Retrying on top of the root reported above must succeed and install the
	// new root together with the (empty) specs that were passed in.
	actual, tableSpecs = mm.Update(nil, actual, newRoot2, nil)
	assert.Equal(newRoot2, actual)
	assert.Empty(tableSpecs)
}

// fakeDDB is an in-memory stand-in for the DynamoDB service used by the
// dynamoManifest tests. It implements GetItem and PutItem over a map.
type fakeDDB struct {
// data maps a db (partition-key) name to the manifest record stored for it.
data map[string]record
// assert is used to sanity-check the requests the fake receives.
assert *assert.Assertions
// numPuts counts the PutItem calls that actually stored a record.
numPuts int
}

// record is the manifest data fakeDDB keeps for one db: the raw root hash
// bytes, the Noms version string, and the colon-separated table specs.
type record struct {
	root  []byte
	vers  string
	specs string
}

// makeFakeDDB returns an empty fakeDDB that reports request sanity-check
// failures through |a|.
func makeFakeDDB(a *assert.Assertions) *fakeDDB {
	fake := &fakeDDB{assert: a}
	fake.data = make(map[string]record)
	return fake
}

// GetItem looks up the record stored under the string key input.Key[dbAttr].
// If a record with a non-nil root exists, it is returned as a five-attribute
// manifest item (nbsVers is always reported as StorageVersion); otherwise the
// returned item is empty. The error is always nil.
func (m *fakeDDB) GetItem(input *dynamodb.GetItemInput) (*dynamodb.GetItemOutput, error) {
	key := input.Key[dbAttr].S
	m.assert.NotNil(key, "key should have been a String: %+v", input.Key[dbAttr])

	root, vers, specs := m.get(*key)
	if root == nil {
		// Nothing stored for this key: answer with an empty item.
		return &dynamodb.GetItemOutput{Item: map[string]*dynamodb.AttributeValue{}}, nil
	}

	item := map[string]*dynamodb.AttributeValue{
		dbAttr:         {S: key},
		nbsVersAttr:    {S: aws.String(StorageVersion)},
		versAttr:       {S: aws.String(vers)},
		rootAttr:       {B: root},
		tableSpecsAttr: {S: aws.String(specs)},
	}
	return &dynamodb.GetItemOutput{Item: item}, nil
}

// get returns the root, version and specs stored under |k|, or zero values if
// nothing is stored there.
func (m *fakeDDB) get(k string) ([]byte, string, string) {
	rec := m.data[k]
	return rec.root, rec.vers, rec.specs
}

// put unconditionally stores root |r|, version |v| and specs |s| under |k|,
// replacing any existing record.
func (m *fakeDDB) put(k string, r []byte, v string, s string) {
	m.data[k] = record{root: r, vers: v, specs: s}
}

// PutItem emulates a conditional DynamoDB put of a manifest item.
//
// It first sanity-checks that the item carries db, nbsVers, vers, root and
// specs attributes of the expected kinds, with nbsVers equal to StorageVersion
// and vers equal to constants.NomsVersion. It then evaluates the condition:
//   - valueNotExistsExpression: fails if a record already exists for the key;
//   - anything else: fails unless checkCondition accepts the stored record
//     against input.ExpressionAttributeValues.
//
// A failed condition yields a mock "ConditionalCheckFailedException" error.
// Otherwise the record is stored and numPuts is incremented.
//
// input.ConditionExpression must be non-nil; it is dereferenced unguarded.
func (m *fakeDDB) PutItem(input *dynamodb.PutItemInput) (*dynamodb.PutItemOutput, error) {
	m.assert.NotNil(input.Item[dbAttr], "%s should have been present", dbAttr)
	m.assert.NotNil(input.Item[dbAttr].S, "key should have been a String: %+v", input.Item[dbAttr])
	key := *input.Item[dbAttr].S

	m.assert.NotNil(input.Item[nbsVersAttr], "%s should have been present", nbsVersAttr)
	m.assert.NotNil(input.Item[nbsVersAttr].S, "nbsVers should have been a String: %+v", input.Item[nbsVersAttr])
	m.assert.Equal(StorageVersion, *input.Item[nbsVersAttr].S)

	m.assert.NotNil(input.Item[versAttr], "%s should have been present", versAttr)
	m.assert.NotNil(input.Item[versAttr].S, "vers should have been a String: %+v", input.Item[versAttr])
	m.assert.Equal(constants.NomsVersion, *input.Item[versAttr].S)

	m.assert.NotNil(input.Item[rootAttr], "%s should have been present", rootAttr)
	m.assert.NotNil(input.Item[rootAttr].B, "root should have been a blob: %+v", input.Item[rootAttr])
	root := input.Item[rootAttr].B

	m.assert.NotNil(input.Item[tableSpecsAttr], "%s should have been present", tableSpecsAttr)
	m.assert.NotNil(input.Item[tableSpecsAttr].S, "specs should have been a String: %+v", input.Item[tableSpecsAttr])
	specs := *input.Item[tableSpecsAttr].S

	mustNotExist := *(input.ConditionExpression) == valueNotExistsExpression
	current, present := m.data[key]

	if mustNotExist && present {
		return nil, mockAWSError("ConditionalCheckFailedException")
	}
	if !mustNotExist && !checkCondition(current, input.ExpressionAttributeValues) {
		return nil, mockAWSError("ConditionalCheckFailedException")
	}

	m.put(key, root, constants.NomsVersion, specs)
	m.numPuts++

	return &dynamodb.PutItemOutput{}, nil
}

// checkCondition emulates valueEqualsExpression: it reports whether the stored
// record's version equals the ":vers" expression value and its root equals the
// ":prev" expression value.
func checkCondition(current record, expressionAttrVals map[string]*dynamodb.AttributeValue) bool {
	if current.vers != *expressionAttrVals[":vers"].S {
		return false
	}
	return bytes.Equal(current.root, expressionAttrVals[":prev"].B)
}

0 comments on commit 0750459

Please sign in to comment.