Skip to content

Commit

Permalink
Fix prefix field in each service
Browse files Browse the repository at this point in the history
  • Loading branch information
evalphobia committed Oct 26, 2016
1 parent 19012db commit 6241acb
Show file tree
Hide file tree
Showing 9 changed files with 81 additions and 70 deletions.
50 changes: 26 additions & 24 deletions dynamodb/table.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,9 +10,10 @@ import (

// Table is a wapper struct for DynamoDB table
type Table struct {
service *DynamoDB
name string
design *TableDesign
service *DynamoDB
name string
nameWithPrefix string
design *TableDesign

putSpool []*SDK.PutItemInput
errorItems []*SDK.PutItemInput
Expand All @@ -35,19 +36,20 @@ func NewTable(svc *DynamoDB, name string) (*Table, error) {

design := newTableDesignFromDescription(req.Table)
return &Table{
service: svc,
name: name,
design: design,
service: svc,
name: name,
nameWithPrefix: tableName,
design: design,
}, nil
}

// Design returns table design.
func (t *Table) Design() (*TableDesign, error) {
req, err := t.service.client.DescribeTable(&SDK.DescribeTableInput{
TableName: pointers.String(t.name),
TableName: pointers.String(t.nameWithPrefix),
})
if err != nil {
t.service.Errorf("error on `DescribeTable` operation; table=%s; error=%s", t.name, err.Error())
t.service.Errorf("error on `DescribeTable` operation; table=%s; error=%s", t.nameWithPrefix, err.Error())
return nil, err
}

Expand Down Expand Up @@ -76,14 +78,14 @@ func (t *Table) UpdateReadThroughput(r int64) error {
// updateThroughput updates dynamodb table provisioned throughput
func (t *Table) updateThroughput() error {
_, err := t.service.client.UpdateTable(&SDK.UpdateTableInput{
TableName: pointers.String(t.name),
TableName: pointers.String(t.nameWithPrefix),
ProvisionedThroughput: &SDK.ProvisionedThroughput{
ReadCapacityUnits: pointers.Long64(t.design.readCapacity),
WriteCapacityUnits: pointers.Long64(t.design.writeCapacity),
},
})
if err != nil {
t.service.Errorf("error on `UpdateTable` operation; table=%s; error=%s", t.name, err.Error())
t.service.Errorf("error on `UpdateTable` operation; table=%s; error=%s", t.nameWithPrefix, err.Error())
return err
}

Expand All @@ -103,13 +105,13 @@ func (t *Table) updateThroughput() error {
// AddItem adds an item to the write-waiting list (writeItem)
func (t *Table) AddItem(item *PutItem) {
w := &SDK.PutItemInput{
TableName: pointers.String(t.name),
TableName: pointers.String(t.nameWithPrefix),
ReturnConsumedCapacity: pointers.String("TOTAL"),
Item: item.data,
Expected: item.conditions,
}
t.putSpool = append(t.putSpool, w)
t.service.addWriteTable(t.name)
t.service.addWriteTable(t.nameWithPrefix)
}

// Put excecutes put operation from the write-waiting list (writeItem)
Expand All @@ -132,7 +134,7 @@ func (t *Table) Put() error {

t.putSpool = nil
if errList.HasError() {
t.service.Errorf("errors on `Put` operations; table=%s; errors=[%s];", t.name, errList.Error())
t.service.Errorf("errors on `Put` operations; table=%s; errors=[%s];", t.nameWithPrefix, errList.Error())
return errList
}
return nil
Expand All @@ -143,7 +145,7 @@ func (t *Table) validatePutItem(item *SDK.PutItemInput) error {
hashKey := t.design.GetHashKeyName()
itemAttrs := item.Item
if _, ok := itemAttrs[hashKey]; !ok {
return fmt.Errorf("error on `validatePutItem`; No such hash key; table=%s; hashkey=%s", t.name, hashKey)
return fmt.Errorf("error on `validatePutItem`; No such hash key; table=%s; hashkey=%s", t.nameWithPrefix, hashKey)
}

rangeKey := t.design.GetRangeKeyName()
Expand All @@ -152,7 +154,7 @@ func (t *Table) validatePutItem(item *SDK.PutItemInput) error {
}

if _, ok := itemAttrs[rangeKey]; !ok {
return fmt.Errorf("error on `validatePutItem`; No such range key; table=%s; rangekey=%s", t.name, rangeKey)
return fmt.Errorf("error on `validatePutItem`; No such range key; table=%s; rangekey=%s", t.nameWithPrefix, rangeKey)
}
return nil
}
Expand Down Expand Up @@ -192,10 +194,10 @@ func (t *Table) scan(cond *ConditionList, in *SDK.ScanInput) (*QueryResult, erro
}

in.ExclusiveStartKey = cond.startKey
in.TableName = pointers.String(t.name)
in.TableName = pointers.String(t.nameWithPrefix)
req, err := t.service.client.Scan(in)
if err != nil {
t.service.Errorf("error on `Scan` operation; table=%s; error=%s;", t.name, err.Error())
t.service.Errorf("error on `Scan` operation; table=%s; error=%s;", t.nameWithPrefix, err.Error())
return nil, err
}

Expand Down Expand Up @@ -227,7 +229,7 @@ func (t *Table) Count(cond *ConditionList) (*QueryResult, error) {
func (t *Table) query(cond *ConditionList, in *SDK.QueryInput) (*QueryResult, error) {
if !cond.HasCondition() {
err := fmt.Errorf("condition is missing, you must specify at least one condition")
t.service.Errorf("error on `query`; table=%s; error=%s", t.name, err.Error())
t.service.Errorf("error on `query`; table=%s; error=%s", t.nameWithPrefix, err.Error())
return nil, err
}

Expand All @@ -246,10 +248,10 @@ func (t *Table) query(cond *ConditionList, in *SDK.QueryInput) (*QueryResult, er
in.ConsistentRead = pointers.Bool(cond.isConsistent)
}

in.TableName = pointers.String(t.name)
in.TableName = pointers.String(t.nameWithPrefix)
req, err := t.service.client.Query(in)
if err != nil {
t.service.Errorf("error on `Query` operation; table=%s; error=%s", t.name, err.Error())
t.service.Errorf("error on `Query` operation; table=%s; error=%s", t.nameWithPrefix, err.Error())
return nil, err
}

Expand All @@ -274,13 +276,13 @@ func (t *Table) NewConditionList() *ConditionList {
// GetOne retrieves a single item by GetOne(HashKey [, RangeKey])
func (t *Table) GetOne(hashValue interface{}, rangeValue ...interface{}) (map[string]interface{}, error) {
in := &SDK.GetItemInput{
TableName: pointers.String(t.name),
TableName: pointers.String(t.nameWithPrefix),
Key: t.design.keyAttributeValue(hashValue, rangeValue...),
}
req, err := t.service.client.GetItem(in)
switch {
case err != nil:
t.service.Errorf("error on `GetItem` operation; table=%s; error=%s", t.name, err.Error())
t.service.Errorf("error on `GetItem` operation; table=%s; error=%s", t.nameWithPrefix, err.Error())
return nil, err
case req.Item == nil:
return nil, nil
Expand All @@ -296,14 +298,14 @@ func (t *Table) GetOne(hashValue interface{}, rangeValue ...interface{}) (map[st
// Delete deletes the item.
func (t *Table) Delete(hashValue interface{}, rangeValue ...interface{}) error {
in := &SDK.DeleteItemInput{
TableName: pointers.String(t.name),
TableName: pointers.String(t.nameWithPrefix),
Key: t.design.keyAttributeValue(hashValue, rangeValue...),
}

fmt.Printf("hashValue: %v, rangeValue: %v, Delete: %+v\n", hashValue, rangeValue, in)
_, err := t.service.client.DeleteItem(in)
if err != nil {
t.service.Errorf("error on `DeleteItem` operation; table=%s; error=%s", t.name, err.Error())
t.service.Errorf("error on `DeleteItem` operation; table=%s; error=%s", t.nameWithPrefix, err.Error())
return err
}
return nil
Expand Down
33 changes: 18 additions & 15 deletions s3/bucket.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,21 +28,24 @@ const (
type Bucket struct {
service *S3

name string
endpoint string
expireSecond int
name string
nameWithPrefix string
endpoint string
expireSecond int

putSpoolMu sync.Mutex
putSpool []*SDK.PutObjectInput
}

// NewBucket returns initialized *Bucket.
func NewBucket(name string, service *S3) *Bucket {
func NewBucket(svc *S3, name string) *Bucket {
bucketName := svc.prefix + name
return &Bucket{
service: service,
name: name,
endpoint: service.endpoint,
expireSecond: defaultExpireSecond,
service: svc,
name: name,
nameWithPrefix: bucketName,
endpoint: svc.endpoint,
expireSecond: defaultExpireSecond,
}
}

Expand All @@ -69,7 +72,7 @@ func (b *Bucket) addObject(obj *PutObject, path, acl string) {
size := obj.Size()
req := &SDK.PutObjectInput{
ACL: &acl,
Bucket: &b.name,
Bucket: &b.nameWithPrefix,
Body: obj.data,
ContentLength: &size,
ContentType: pointers.String(obj.FileType()),
Expand All @@ -88,7 +91,7 @@ func (b *Bucket) PutAll() error {
for _, obj := range b.putSpool {
_, err := cli.PutObject(obj)
if err != nil {
b.service.Errorf("error on `PutObject` operation; bucket=%s; error=%s;", b.name, err.Error())
b.service.Errorf("error on `PutObject` operation; bucket=%s; error=%s;", b.nameWithPrefix, err.Error())
errList.Add(err)
}
}
Expand All @@ -105,7 +108,7 @@ func (b *Bucket) PutOne(obj *PutObject, path, acl string) error {
size := obj.Size()
req := &SDK.PutObjectInput{
ACL: &acl,
Bucket: &b.name,
Bucket: &b.nameWithPrefix,
Body: obj.data,
ContentLength: &size,
ContentType: pointers.String(obj.FileType()),
Expand All @@ -114,7 +117,7 @@ func (b *Bucket) PutOne(obj *PutObject, path, acl string) error {

_, err := b.service.client.PutObject(req)
if err != nil {
b.service.Errorf("error on `PutObject` operation; bucket=%s; error=%s;", b.name, err.Error())
b.service.Errorf("error on `PutObject` operation; bucket=%s; error=%s;", b.nameWithPrefix, err.Error())
}
return err
}
Expand Down Expand Up @@ -157,7 +160,7 @@ func (b *Bucket) GetSecretURL(path string) (string, error) {
// ** this isn't work **
func (b *Bucket) GetSecretURLWithExpire(path string, expire int) (string, error) {
req, _ := b.service.client.GetObjectRequest(&SDK.GetObjectInput{
Bucket: pointers.String(b.name),
Bucket: pointers.String(b.nameWithPrefix),
Key: pointers.String(path),
})
return req.Presign(time.Duration(expire) * time.Second)
Expand All @@ -166,11 +169,11 @@ func (b *Bucket) GetSecretURLWithExpire(path string, expire int) (string, error)
// DeleteObject deletees the object of target path.
func (b *Bucket) DeleteObject(path string) error {
_, err := b.service.client.DeleteObject(&SDK.DeleteObjectInput{
Bucket: pointers.String(b.name),
Bucket: pointers.String(b.nameWithPrefix),
Key: pointers.String(path),
})
if err != nil {
b.service.Errorf("error on `GetObject` operation; bucket=%s; error=%s;", b.name, err.Error())
b.service.Errorf("error on `GetObject` operation; bucket=%s; error=%s;", b.nameWithPrefix, err.Error())
}
return err
}
2 changes: 1 addition & 1 deletion s3/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -72,7 +72,7 @@ func (svc *S3) GetBucket(bucket string) (*Bucket, error) {
return nil, err
}

b = NewBucket(bucketName, svc)
b = NewBucket(svc, bucket)
svc.bucketsMu.Lock()
svc.buckets[bucketName] = b
svc.bucketsMu.Unlock()
Expand Down
2 changes: 1 addition & 1 deletion s3/client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -102,7 +102,7 @@ func TestGetBucket(t *testing.T) {

b3, err := svc2.GetBucket(testEmptyBucketName)
assert.NoError(err)
assert.Equal(testPrefix+testEmptyBucketName, b3.name)
assert.Equal(testPrefix+testEmptyBucketName, b3.nameWithPrefix)
}

func TestIsExistBucket(t *testing.T) {
Expand Down
2 changes: 1 addition & 1 deletion sns/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -128,7 +128,7 @@ func (svc *SNS) CreateTopic(name string) (*Topic, error) {
return nil, err
}

topic := NewTopic(arn, name, svc)
topic := NewTopic(svc, arn, name)
return topic, nil
}

Expand Down
25 changes: 14 additions & 11 deletions sns/topic.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,19 +8,22 @@ import (

// Topic is struct for Topic.
type Topic struct {
svc *SNS
name string
arn string
sound string
svc *SNS
name string
nameWithPrefix string
arn string
sound string
}

// NewTopic returns initialized *Topic.
func NewTopic(arn, name string, svc *SNS) *Topic {
func NewTopic(svc *SNS, arn, name string) *Topic {
topicName := svc.prefix + name
return &Topic{
arn: arn,
name: name,
sound: "default",
svc: svc,
svc: svc,
arn: arn,
name: name,
nameWithPrefix: topicName,
sound: "default",
}
}

Expand All @@ -32,7 +35,7 @@ func (t *Topic) Subscribe(endpointARN, protocol string) (subscriptionARN string,
TopicArn: pointers.String(t.arn),
})
if err != nil {
t.svc.Errorf("error on `Subscribe` operation; name=%s; error=%s;", t.name, err.Error())
t.svc.Errorf("error on `Subscribe` operation; name=%s; error=%s;", t.nameWithPrefix, err.Error())
return "", err
}
return *resp.SubscriptionArn, nil
Expand All @@ -49,7 +52,7 @@ func (t *Topic) Delete() error {
TopicArn: pointers.String(t.arn),
})
if err != nil {
t.svc.Errorf("error on `DeleteTopic` operation; name=%s; error=%s;", t.name, err.Error())
t.svc.Errorf("error on `DeleteTopic` operation; name=%s; error=%s;", t.nameWithPrefix, err.Error())
}
return err
}
2 changes: 1 addition & 1 deletion sns/topic_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ func TestNewTopic(t *testing.T) {
assert := assert.New(t)
svc := getTestClient(t)

tp := NewTopic("arn", "name", svc)
tp := NewTopic(svc, "arn", "name")
assert.NotNil(tp)
assert.Equal("arn", tp.arn)
assert.Equal("name", tp.name)
Expand Down
2 changes: 1 addition & 1 deletion sqs/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -72,7 +72,7 @@ func (svc *SQS) GetQueue(name string) (*Queue, error) {
return nil, err
}

q = NewQueue(queueName, *url.QueueUrl, svc)
q = NewQueue(svc, name, *url.QueueUrl)
svc.queuesMu.Lock()
svc.queues[queueName] = q
svc.queuesMu.Unlock()
Expand Down
Loading

0 comments on commit 6241acb

Please sign in to comment.