Skip to content

Commit

Permalink
Fix: creating gsi under on-demand mode (#85)
Browse files Browse the repository at this point in the history
Issue #, if available:

Description of changes:

- fix empty NonKeyAttributes validation issue
- fix GSI creation issue when using `pay-per-request` mode
```shell
2023-04-19T17:31:29.932+0200    ERROR   Reconciler error        {"controller": "table", "controllerGroup": "dynamodb.services.k8s.aws", "controllerKind": "Table", "Table": {"name":"ack-demo-table-provisioned-global-secondary-indexes","namespace":"ack-system"}, "namespace": "ack-system", "name": "ack-demo-table-provisioned-global-secondary-indexes", "reconcileID": "c7354a77-56c4-4c56-b715-f16b59d1926e", "error": "ValidationException: One or more parameter values were invalid: Neither ReadCapacityUnits nor WriteCapacityUnits can be specified when BillingMode is PAY_PER_REQUEST\n\tstatus code: 400, request id: R901I15N84RUVS24D4HVRP6L3JVV4KQNSO5AEMVJF66Q9ASUAAJG"}

2023-04-19T18:54:54.887+0200    ERROR   Reconciler error        {"controller": "table", "controllerGroup": "dynamodb.services.k8s.aws", "controllerKind": "Table", "Table": {"name":"ack-demo-table-provisioned-global-secondary-indexes","namespace":"ack-system"}, "namespace": "ack-system", "name": "ack-demo-table-provisioned-global-secondary-indexes", "reconcileID": "223f9f39-f2cc-460c-ad78-56fce55b4bf3", "error": "ValidationException: One or more parameter values were invalid: ProvisionedThroughput should not be specified for index: id-index when BillingMode is PAY_PER_REQUEST\n\tstatus code: 400, request id: JO5OGQR8IN74JJ2C62O43ESU43VV4KQNSO5AEMVJF66Q9ASUAAJG"}
sigs.k8s.io/controller-runtime/pkg/internal/controller.(*Controller).reconcileHandler
```
```yaml
    conditions:
    - message: |
        InvalidParameter: 3 validation error(s) found.
        - minimum field size of 1, UpdateTableInput.GlobalSecondaryIndexUpdates[0].Create.Projection.NonKeyAttributes.
        - minimum field value of 1, UpdateTableInput.GlobalSecondaryIndexUpdates[0].Create.ProvisionedThroughput.ReadCapacityUnits.
        - minimum field value of 1, UpdateTableInput.GlobalSecondaryIndexUpdates[0].Create.ProvisionedThroughput.WriteCapacityUnits.
      status: "True"
      type: ACK.Terminal
```

By submitting this pull request, I confirm that my contribution is made under the terms of the Apache 2.0 license.
  • Loading branch information
Julian-Chu committed Nov 14, 2023
1 parent ccb4c56 commit b52e89f
Show file tree
Hide file tree
Showing 7 changed files with 205 additions and 22 deletions.
34 changes: 16 additions & 18 deletions pkg/resource/table/hooks_global_secondary_indexes.go
Original file line number Diff line number Diff line change
Expand Up @@ -233,21 +233,21 @@ func (rm *resourceManager) newUpdateTableGlobalSecondaryIndexUpdatesPayload(

// newSDKProvisionedThroughput builds a new *svcsdk.ProvisionedThroughput
func newSDKProvisionedThroughput(pt *v1alpha1.ProvisionedThroughput) *svcsdk.ProvisionedThroughput {
provisionedThroughput := &svcsdk.ProvisionedThroughput{}
if pt != nil {
if pt.ReadCapacityUnits != nil {
provisionedThroughput.ReadCapacityUnits = aws.Int64(*pt.ReadCapacityUnits)
} else {
provisionedThroughput.ReadCapacityUnits = aws.Int64(0)
}
if pt.WriteCapacityUnits != nil {
provisionedThroughput.WriteCapacityUnits = aws.Int64(*pt.WriteCapacityUnits)
} else {
provisionedThroughput.WriteCapacityUnits = aws.Int64(0)
}
} else {
provisionedThroughput.ReadCapacityUnits = aws.Int64(0)
provisionedThroughput.WriteCapacityUnits = aws.Int64(0)
if pt == nil {
return nil
}
provisionedThroughput := &svcsdk.ProvisionedThroughput{
// ref: https://docs.aws.amazon.com/amazondynamodb/latest/APIReference/API_ProvisionedThroughput.html
// Minimum capacity units is 1 when using provisioned capacity mode
ReadCapacityUnits: aws.Int64(1),
WriteCapacityUnits: aws.Int64(1),
}
if pt.ReadCapacityUnits != nil {
provisionedThroughput.ReadCapacityUnits = aws.Int64(*pt.ReadCapacityUnits)
}

if pt.WriteCapacityUnits != nil {
provisionedThroughput.WriteCapacityUnits = aws.Int64(*pt.WriteCapacityUnits)
}
return provisionedThroughput
}
Expand All @@ -263,12 +263,10 @@ func newSDKProjection(p *v1alpha1.Projection) *svcsdk.Projection {
}
if p.NonKeyAttributes != nil {
projection.NonKeyAttributes = p.NonKeyAttributes
} else {
projection.NonKeyAttributes = []*string{}
}
} else {
projection.ProjectionType = aws.String("")
projection.NonKeyAttributes = []*string{}
projection.NonKeyAttributes = nil
}
return projection
}
Expand Down
76 changes: 76 additions & 0 deletions pkg/resource/table/hooks_global_secondary_indexes_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,76 @@
package table

import (
"reflect"
"testing"

"github.com/aws/aws-sdk-go/aws"
svcsdk "github.com/aws/aws-sdk-go/service/dynamodb"

"github.com/aws-controllers-k8s/dynamodb-controller/apis/v1alpha1"
)

func Test_newSDKProvisionedThroughput(t *testing.T) {
type args struct {
pt *v1alpha1.ProvisionedThroughput
}
tests := []struct {
name string
args args
want *svcsdk.ProvisionedThroughput
}{
{
name: "provisioned throughput is nil",
args: args{
pt: nil,
},
want: nil,
},
{
name: "provisioned throughput is not nil, read capacity units is nil",
args: args{
pt: &v1alpha1.ProvisionedThroughput{
ReadCapacityUnits: nil,
WriteCapacityUnits: aws.Int64(10),
},
},
want: &svcsdk.ProvisionedThroughput{
ReadCapacityUnits: aws.Int64(1),
WriteCapacityUnits: aws.Int64(10),
},
},
{
name: "provisioned throughput is not nil, write capacity units is nil",
args: args{
pt: &v1alpha1.ProvisionedThroughput{
ReadCapacityUnits: aws.Int64(10),
WriteCapacityUnits: nil,
},
},
want: &svcsdk.ProvisionedThroughput{
ReadCapacityUnits: aws.Int64(10),
WriteCapacityUnits: aws.Int64(1),
},
},
{
name: "provisioned throughput is not nil, write and read capacity units are not nil",
args: args{
pt: &v1alpha1.ProvisionedThroughput{
ReadCapacityUnits: aws.Int64(5),
WriteCapacityUnits: aws.Int64(5),
},
},
want: &svcsdk.ProvisionedThroughput{
ReadCapacityUnits: aws.Int64(5),
WriteCapacityUnits: aws.Int64(5),
},
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
if got := newSDKProvisionedThroughput(tt.args.pt); !reflect.DeepEqual(got, tt.want) {
t.Errorf("newSDKProvisionedThroughput() = %v, want %v", got, tt.want)
}
})
}
}
3 changes: 3 additions & 0 deletions pkg/resource/table/sdk.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

3 changes: 3 additions & 0 deletions templates/hooks/table/sdk_read_one_post_set_output.go.tpl
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,9 @@
if isTableUpdating(&resource{ko}) {
return &resource{ko}, requeueWaitWhileUpdating
}
if !canUpdateTableGSIs(&resource{ko}) {
return &resource{ko}, requeueWaitGSIReady
}
if err := rm.setResourceAdditionalFields(ctx, ko); err != nil {
return nil, err
}
19 changes: 19 additions & 0 deletions test/e2e/resources/table_basic_pay_per_request.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
# Table used to test GSI creation under on-demand billing mode
apiVersion: dynamodb.services.k8s.aws/v1alpha1
kind: Table
metadata:
name: $TABLE_NAME
spec:
tableName: $TABLE_NAME
billingMode: PAY_PER_REQUEST
tableClass: STANDARD
attributeDefinitions:
- attributeName: Bill
attributeType: S
- attributeName: Total
attributeType: S
keySchema:
- attributeName: Bill
keyType: HASH
- attributeName: Total
keyType: RANGE
11 changes: 8 additions & 3 deletions test/e2e/table.py
Original file line number Diff line number Diff line change
Expand Up @@ -166,11 +166,16 @@ def __init__(self, gsis):
self.match_on = gsis

def __call__(self, record: dict) -> bool:
gsi_key = "GlobalSecondaryIndexes"
if len(self.match_on) == 0:
return (not 'GlobalSecondaryIndexes' in record) or len(record["GlobalSecondaryIndexes"] == 0)
return (gsi_key not in record) or len(record[gsi_key]) == 0

awsGSIs = record["GlobalSecondaryIndexes"]
if len(self.match_on) != len(record["GlobalSecondaryIndexes"]):
# if GSI is still in creating status , it will not be present in the record
if gsi_key not in record:
return False

awsGSIs = record[gsi_key]
if len(self.match_on) != len(record[gsi_key]):
return False

for awsGSI in awsGSIs:
Expand Down
81 changes: 80 additions & 1 deletion test/e2e/tests/test_table.py
Original file line number Diff line number Diff line change
Expand Up @@ -140,6 +140,18 @@ def table_basic():
except:
pass

@pytest.fixture(scope="module")
def table_basic_pay_per_request():
resource_name = random_suffix_name("table-basic-pay-per-request", 32)
(ref, cr) = create_table(resource_name, "table_basic_pay_per_request")

yield ref, cr
try:
_, deleted = k8s.delete_custom_resource(ref, wait_periods=3, period_length=10)
assert deleted
except:
pass

@service_marker
@pytest.mark.canary
class TestTable:
Expand Down Expand Up @@ -726,7 +738,7 @@ def test_multi_updates(self, table_gsi):
"readCapacityUnits": 10,
"writeCapacityUnits": 10
}
cr["spec"][ "sseSpecification"] = {
cr["spec"]["sseSpecification"] = {
"enabled": True,
"sseType": "KMS"
}
Expand All @@ -750,7 +762,11 @@ def test_multi_updates(self, table_gsi):
# GSI is the last element to get update in the code path... so we just wait for it
# to know that all the fields got updated.

# encounter an issue when running E2E test locally, sometimes the gsi is updated,
# but SSEDescription is still being updated, add 2mins to wait (Julian Chu)
time.sleep(120)
latestTable = table.get(table_name)
logging.info("latestTable: %s", latestTable)
assert latestTable["StreamSpecification"] is not None
assert latestTable["StreamSpecification"]["StreamEnabled"]

Expand All @@ -760,3 +776,66 @@ def test_multi_updates(self, table_gsi):
assert latestTable["ProvisionedThroughput"] is not None
assert latestTable["ProvisionedThroughput"]["ReadCapacityUnits"] == 10
assert latestTable["ProvisionedThroughput"]["WriteCapacityUnits"] == 10

def test_create_gsi_pay_per_request(self, table_basic_pay_per_request):
(ref, res) = table_basic_pay_per_request

table_name = res["spec"]["tableName"]

# Check DynamoDB Table exists
assert self.table_exists(table_name)

# Get CR latest revision
cr = k8s.wait_resource_consumed_by_controller(ref)

# Creating two more GSIs
cr["spec"]["attributeDefinitions"] = [
{
"attributeName": "Bill",
"attributeType": "S"
},
{
"attributeName": "Total",
"attributeType": "S"
},
{
"attributeName": "User",
"attributeType": "S"
},
]

gsi = {
"indexName": "bill-per-user",
"keySchema": [
{
"attributeName": "User",
"keyType": "HASH",
},
{
"attributeName": "Bill",
"keyType": "RANGE",
}
],
"projection": {
"projectionType": "ALL",
}
}

cr["spec"]['globalSecondaryIndexes'] = [
gsi,
]

# Patch k8s resource
k8s.patch_custom_resource(ref, cr)
k8s.wait_resource_consumed_by_controller(ref)
table.wait_until(
table_name,
table.gsi_matches(
[
gsi
],
),
timeout_seconds=MODIFY_WAIT_AFTER_SECONDS*40,
interval_seconds=15,
)

0 comments on commit b52e89f

Please sign in to comment.