Skip to content

Commit

Permalink
Merge pull request #102 from aalness/streams_api
Browse files Browse the repository at this point in the history
dynamodb: add support for streams API
  • Loading branch information
boyand committed Oct 7, 2015
2 parents fda19cf + ecdf08c commit d769f53
Show file tree
Hide file tree
Showing 8 changed files with 614 additions and 22 deletions.
39 changes: 20 additions & 19 deletions aws/aws.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,25 +41,26 @@ type ServiceInfo struct {
//
// See http://goo.gl/d8BP1 for more details.
type Region struct {
Name string // the canonical name of this region.
EC2Endpoint string
S3Endpoint string
S3BucketEndpoint string // Not needed by AWS S3. Use ${bucket} for bucket name.
S3LocationConstraint bool // true if this region requires a LocationConstraint declaration.
S3LowercaseBucket bool // true if the region requires bucket names to be lower case.
SDBEndpoint string
SESEndpoint string
SNSEndpoint string
SQSEndpoint string
IAMEndpoint string
ELBEndpoint string
DynamoDBEndpoint string
CloudWatchServicepoint ServiceInfo
AutoScalingEndpoint string
RDSEndpoint ServiceInfo
STSEndpoint string
CloudFormationEndpoint string
ECSEndpoint string
Name string // the canonical name of this region.
EC2Endpoint string
S3Endpoint string
S3BucketEndpoint string // Not needed by AWS S3. Use ${bucket} for bucket name.
S3LocationConstraint bool // true if this region requires a LocationConstraint declaration.
S3LowercaseBucket bool // true if the region requires bucket names to be lower case.
SDBEndpoint string
SESEndpoint string
SNSEndpoint string
SQSEndpoint string
IAMEndpoint string
ELBEndpoint string
DynamoDBEndpoint string
CloudWatchServicepoint ServiceInfo
AutoScalingEndpoint string
RDSEndpoint ServiceInfo
STSEndpoint string
CloudFormationEndpoint string
ECSEndpoint string
DynamoDBStreamsEndpoint string
}

var Regions = map[string]Region{
Expand Down
11 changes: 11 additions & 0 deletions aws/regions.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ var USGovWest = Region{
"https://sts.amazonaws.com",
"https://cloudformation.us-gov-west-1.amazonaws.com",
"https://ecs.us-gov-west-1.amazonaws.com",
"https://streams.dynamodb.us-gov-west-1.amazonaws.com",
}

var USEast = Region{
Expand All @@ -42,6 +43,7 @@ var USEast = Region{
"https://sts.amazonaws.com",
"https://cloudformation.us-east-1.amazonaws.com",
"https://ecs.us-east-1.amazonaws.com",
"https://streams.dynamodb.us-east-1.amazonaws.com",
}

var USWest = Region{
Expand All @@ -64,6 +66,7 @@ var USWest = Region{
"https://sts.amazonaws.com",
"https://cloudformation.us-west-1.amazonaws.com",
"https://ecs.us-west-1.amazonaws.com",
"https://streams.dynamodb.us-west-1.amazonaws.com",
}

var USWest2 = Region{
Expand All @@ -86,6 +89,7 @@ var USWest2 = Region{
"https://sts.amazonaws.com",
"https://cloudformation.us-west-2.amazonaws.com",
"https://ecs.us-west-2.amazonaws.com",
"https://streams.dynamodb.us-west-2.amazonaws.com",
}

var EUWest = Region{
Expand All @@ -108,6 +112,7 @@ var EUWest = Region{
"https://sts.amazonaws.com",
"https://cloudformation.eu-west-1.amazonaws.com",
"https://ecs.eu-west-1.amazonaws.com",
"https://streams.dynamodb.eu-west-1.amazonaws.com",
}

var EUCentral = Region{
Expand All @@ -130,6 +135,7 @@ var EUCentral = Region{
"https://sts.amazonaws.com",
"https://cloudformation.eu-central-1.amazonaws.com",
"https://ecs.eu-central-1.amazonaws.com",
"https://streams.dynamodb.eu-central-1.amazonaws.com",
}

var APSoutheast = Region{
Expand All @@ -152,6 +158,7 @@ var APSoutheast = Region{
"https://sts.amazonaws.com",
"https://cloudformation.ap-southeast-1.amazonaws.com",
"https://ecs.ap-southeast-1.amazonaws.com",
"https://streams.dynamodb.ap-southeast-1.amazonaws.com",
}

var APSoutheast2 = Region{
Expand All @@ -174,6 +181,7 @@ var APSoutheast2 = Region{
"https://sts.amazonaws.com",
"https://cloudformation.ap-southeast-2.amazonaws.com",
"https://ecs.ap-southeast-2.amazonaws.com",
"https://streams.dynamodb.ap-southeast-2.amazonaws.com",
}

var APNortheast = Region{
Expand All @@ -196,6 +204,7 @@ var APNortheast = Region{
"https://sts.amazonaws.com",
"https://cloudformation.ap-northeast-1.amazonaws.com",
"https://ecs.ap-northeast-1.amazonaws.com",
"https://streams.dynamodb.ap-northeast-1.amazonaws.com",
}

var SAEast = Region{
Expand All @@ -218,6 +227,7 @@ var SAEast = Region{
"https://sts.amazonaws.com",
"https://cloudformation.sa-east-1.amazonaws.com",
"https://ecs.sa-east-1.amazonaws.com",
"https://streams.dynamodb.sa-east-1.amazonaws.com",
}

var CNNorth = Region{
Expand All @@ -240,4 +250,5 @@ var CNNorth = Region{
"https://sts.cn-north-1.amazonaws.com.cn",
"https://cloudformation.cn-north-1.amazonaws.com.cn",
"https://ecs.cn-north-1.amazonaws.com.cn",
"https://streams.dynamodb.cn-north-1.amazonaws.com.cn",
}
25 changes: 23 additions & 2 deletions dynamodb/dynamodb.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,13 @@ func NewQuery(queryParts []string) *Query {
}
*/

const (
// DynamoDBAPIPrefix is the versioned prefix for DynamoDB API commands.
DynamoDBAPIPrefix = "DynamoDB_20120810."
// DynamoDBStreamsAPIPrefix is the versioned prefix for DynamoDB Streams API commands.
DynamoDBStreamsAPIPrefix = "DynamoDBStreams_20120810."
)

// Specific error constants
var ErrNotFound = errors.New("Item not found")

Expand Down Expand Up @@ -74,7 +81,13 @@ func buildError(r *http.Response, jsonBody []byte) error {

func (s *Server) queryServer(target string, query *Query) ([]byte, error) {
data := strings.NewReader(query.String())
hreq, err := http.NewRequest("POST", s.Region.DynamoDBEndpoint+"/", data)
var endpoint string
if isStreamsTarget(target) {
endpoint = s.Region.DynamoDBStreamsEndpoint
} else {
endpoint = s.Region.DynamoDBEndpoint
}
hreq, err := http.NewRequest("POST", endpoint+"/", data)
if err != nil {
return nil, err
}
Expand Down Expand Up @@ -117,5 +130,13 @@ func (s *Server) queryServer(target string, query *Query) ([]byte, error) {
}

func target(name string) string {
return "DynamoDB_20120810." + name
return DynamoDBAPIPrefix + name
}

func streamsTarget(name string) string {
return DynamoDBStreamsAPIPrefix + name
}

func isStreamsTarget(target string) bool {
return strings.HasPrefix(target, DynamoDBStreamsAPIPrefix)
}
5 changes: 4 additions & 1 deletion dynamodb/dynamodb_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -136,7 +136,10 @@ func setUpAuth(c *C) {
}
if *local {
c.Log("Using local server")
dynamodb_region = aws.Region{DynamoDBEndpoint: "http://127.0.0.1:8000"}
dynamodb_region = aws.Region{
DynamoDBEndpoint: "http://127.0.0.1:8000",
DynamoDBStreamsEndpoint: "http://127.0.0.1:8000",
}
dynamodb_auth = aws.Auth{AccessKey: "DUMMY_KEY", SecretKey: "DUMMY_SECRET"}
} else {
c.Log("Using REAL AMAZON SERVER")
Expand Down
35 changes: 35 additions & 0 deletions dynamodb/query_builder.go
Original file line number Diff line number Diff line change
Expand Up @@ -122,6 +122,13 @@ func (q *Query) AddCreateRequestTable(description TableDescriptionT) {
"WriteCapacityUnits": int(description.ProvisionedThroughput.WriteCapacityUnits),
}

if description.StreamSpecification.StreamEnabled {
b["StreamSpecification"] = msi{
"StreamEnabled": "true",
"StreamViewType": description.StreamSpecification.StreamViewType,
}
}

localSecondaryIndexes := []interface{}{}

for _, ind := range description.LocalSecondaryIndexes {
Expand Down Expand Up @@ -304,6 +311,34 @@ func (q *Query) AddExpressionAttributes(attributes []Attribute) {
}
}

func (q *Query) AddExclusiveStartStreamArn(arn string) {
q.buffer["ExclusiveStartStreamArn"] = arn
}

func (q *Query) AddStreamArn(arn string) {
q.buffer["StreamArn"] = arn
}

func (q *Query) AddExclusiveStartShardId(shardId string) {
q.buffer["ExclusiveStartShardId"] = shardId
}

func (q *Query) AddShardId(shardId string) {
q.buffer["ShardId"] = shardId
}

func (q *Query) AddShardIteratorType(shardIteratorType string) {
q.buffer["ShardIteratorType"] = shardIteratorType
}

func (q *Query) AddSequenceNumber(sequenceNumber string) {
q.buffer["SequenceNumber"] = sequenceNumber
}

func (q *Query) AddShardIterator(shardIterator string) {
q.buffer["ShardIterator"] = shardIterator
}

func attributeList(attributes []Attribute) msi {
b := msi{}
for _, a := range attributes {
Expand Down
Loading

0 comments on commit d769f53

Please sign in to comment.