Permalink
Browse files

support for basic Queries

  • Loading branch information...
1 parent 3cf4e21 commit 481ed86f448565c6f9523a43c66505e1d2dc9742 @fabiokung committed Sep 7, 2012
Showing with 156 additions and 7 deletions.
  1. +47 −7 dynamodb.go
  2. +36 −0 dynamodb_test.go
  3. +73 −0 json.go
View
@@ -2,6 +2,7 @@ package dynamodb
import (
"bytes"
+ "encoding/json"
"github.com/bmizerany/aws4"
"io/ioutil"
"net/http"
@@ -46,36 +47,75 @@ func (t *Table) PutItem(item interface{}) error {
return err
}
- req, err := http.NewRequest("POST", t.region.url(), ioutil.NopCloser(bytes.NewReader(body)))
+ _, err = t.doDynamoRequest("PutItem", body)
if err != nil {
return err
}
+ return nil
+}
+
+func (t *Table) Query(key interface{}, limit int, consistent bool) ([]map[string]interface{}, error) {
+ body, err := t.queryRequestBody(key, limit, consistent)
+ if err != nil {
+ return nil, err
+ }
+
+ resp, err := t.doDynamoRequest("Query", body)
+ if err != nil {
+ return nil, err
+ }
+
+ data := make(map[string]interface{})
+ err = json.Unmarshal(resp, &data)
+ if err != nil {
+ return nil, err
+ }
+
+ items := data["Items"].([]interface{})
+ parsed := make([]map[string]interface{}, len(items))
+ for i, raw := range items {
+ item := raw.(map[string]interface{})
+ parsed[i], err = dynamoItemToMap(item)
+ if err != nil {
+ return parsed, err
+ }
+ }
+
+ return parsed, nil
+}
+
+func (t *Table) doDynamoRequest(operation string, body []byte) ([]byte, error) {
+ req, err := http.NewRequest("POST", t.region.url(), ioutil.NopCloser(bytes.NewReader(body)))
+ if err != nil {
+ return nil, err
+ }
+
req.ContentLength = int64(len(body))
req.Header.Set("Host", t.region.endpoint)
- req.Header.Set("X-Amz-Target", "DynamoDB_20111205.PutItem")
+ req.Header.Set("X-Amz-Target", "DynamoDB_20111205."+operation)
req.Header.Set("X-Amz-Date", time.Now().UTC().Format(iSO8601BasicFormat))
req.Header.Set("Date", time.Now().UTC().Format(http.TimeFormat))
req.Header.Set("Content-Type", "application/x-amz-json-1.0")
req.Header.Set("Connection", "Keep-Alive")
err = t.service.Sign(t.keys, req)
if err != nil {
- return err
+ return nil, err
}
resp, err := http.DefaultClient.Do(req)
if err != nil {
- return err
+ return nil, err
}
defer resp.Body.Close()
+ body, err = ioutil.ReadAll(resp.Body)
if resp.StatusCode != 200 {
- body, _ := ioutil.ReadAll(resp.Body)
- return RequestError{Status: resp.Status, Message: string(body)}
+ return body, RequestError{Status: resp.Status, Message: string(body)}
}
- return nil
+ return body, err
}
type RequestError struct {
View
@@ -71,3 +71,39 @@ func TestPutRequestFloatSerialization(t *testing.T) {
func TestPutRequestBinarySerialization(t *testing.T) {
t.Error("pending")
}
+
+func TestGetRequestSerialization(t *testing.T) {
+ table := &Table{name: "RDepotTable"}
+ requestBody, err := table.queryRequestBody("myPrimaryKey", 0, false)
+ if err != nil {
+ t.Fatal(err)
+ }
+
+ assert.Equal(t,
+ `{"TableName":"RDepotTable","HashKeyValue":{"S":"myPrimaryKey"}}`,
+ string(requestBody))
+}
+
+func TestGetRequestSerializationWithLimit(t *testing.T) {
+ table := &Table{name: "RDepotTable"}
+ requestBody, err := table.queryRequestBody("myPrimaryKey", 10, false)
+ if err != nil {
+ t.Fatal(err)
+ }
+
+ assert.Equal(t,
+ `{"TableName":"RDepotTable","Limit":10,"HashKeyValue":{"S":"myPrimaryKey"}}`,
+ string(requestBody))
+}
+
+func TestGetRequestSerializationWithConsistentRead(t *testing.T) {
+ table := &Table{name: "RDepotTable"}
+ requestBody, err := table.queryRequestBody("myPrimaryKey", 0, true)
+ if err != nil {
+ t.Fatal(err)
+ }
+
+ assert.Equal(t,
+ `{"TableName":"RDepotTable","ConsistentRead":true,"HashKeyValue":{"S":"myPrimaryKey"}}`,
+ string(requestBody))
+}
View
73 json.go
@@ -3,10 +3,35 @@ package dynamodb
import (
"bytes"
"encoding/json"
+ "fmt"
"reflect"
"strconv"
+ "strings"
)
+type queryRequest struct {
+ TableName string
+ Limit int `json:",omitempty"`
+ ConsistentRead bool `json:",omitempty"`
+ HashKeyValue map[string]string
+}
+
+func (t *Table) queryRequestBody(key interface{}, limit int, consistent bool) ([]byte, error) {
+ v := reflect.ValueOf(key)
+ typeId, value, err := fieldToDynamoString(v)
+ if err != nil {
+ return []byte(""), err
+ }
+
+ request := &queryRequest{
+ TableName: t.name,
+ Limit: limit,
+ ConsistentRead: consistent,
+ HashKeyValue: make(map[string]string)}
+ request.HashKeyValue[typeId] = value
+ return json.Marshal(request)
+}
+
type putItemRequest struct {
TableName string
Item putRequestItem
@@ -55,6 +80,46 @@ func (i putRequestItem) MarshalJSON() ([]byte, error) {
return out.Bytes(), nil
}
+func dynamoItemToMap(item map[string]interface{}) (map[string]interface{}, error) {
+ result := make(map[string]interface{}, len(item))
+ for name, raw := range item {
+ attr := raw.(map[string]interface{})
+
+ var value interface{}
+ if v, ok := attr["S"]; ok {
+ value = v.(string)
+ } else if v, ok := attr["N"]; ok {
+ var err error
+ value, err = parseNumber(v.(string))
+ if err != nil {
+ return result, err
+ }
+ } else if v, ok := attr["B"]; ok {
+ value = []byte(v.(string))
+ } else {
+ var first string
+ for k, _ := range attr {
+ first = k
+ break
+ }
+ return result, &UnsupportedTypeError{TypeId: first}
+ }
+
+ result[name] = value
+ }
+
+ return result, nil
+}
+
+func parseNumber(value string) (number interface{}, err error) {
+ if strings.Contains(value, ".") {
+ number, err = strconv.ParseFloat(value, 64)
+ } else {
+ number, err = strconv.ParseInt(value, 10, 64)
+ }
+ return
+}
+
func fieldToDynamoString(v reflect.Value) (typeId string, value string, err error) {
if v.Kind() == reflect.Interface || v.Kind() == reflect.Ptr {
v = v.Elem()
@@ -96,3 +161,11 @@ func fieldToDynamoString(v reflect.Value) (typeId string, value string, err erro
return "", "", &json.MarshalerError{Type: v.Type()}
}
+
+type UnsupportedTypeError struct {
+ TypeId string
+}
+
+func (err *UnsupportedTypeError) Error() string {
+ return fmt.Sprintf("Dynamo type %s is currently unsupported", err.TypeId)
+}

0 comments on commit 481ed86

Please sign in to comment.