Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix: creating gsi under on-demand mode #85

Merged
merged 5 commits into from
Nov 14, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
34 changes: 16 additions & 18 deletions pkg/resource/table/hooks_global_secondary_indexes.go
Original file line number Diff line number Diff line change
Expand Up @@ -233,21 +233,21 @@ func (rm *resourceManager) newUpdateTableGlobalSecondaryIndexUpdatesPayload(

// newSDKProvisionedThroughput builds a new *svcsdk.ProvisionedThroughput
func newSDKProvisionedThroughput(pt *v1alpha1.ProvisionedThroughput) *svcsdk.ProvisionedThroughput {
provisionedThroughput := &svcsdk.ProvisionedThroughput{}
if pt != nil {
if pt.ReadCapacityUnits != nil {
provisionedThroughput.ReadCapacityUnits = aws.Int64(*pt.ReadCapacityUnits)
} else {
provisionedThroughput.ReadCapacityUnits = aws.Int64(0)
}
if pt.WriteCapacityUnits != nil {
provisionedThroughput.WriteCapacityUnits = aws.Int64(*pt.WriteCapacityUnits)
} else {
provisionedThroughput.WriteCapacityUnits = aws.Int64(0)
}
} else {
provisionedThroughput.ReadCapacityUnits = aws.Int64(0)
provisionedThroughput.WriteCapacityUnits = aws.Int64(0)
if pt == nil {
return nil
}
Comment on lines +236 to +238
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Nice catch sir!

provisionedThroughput := &svcsdk.ProvisionedThroughput{
// ref: https://docs.aws.amazon.com/amazondynamodb/latest/APIReference/API_ProvisionedThroughput.html
// Minimum capacity units is 1 when using provisioned capacity mode
ReadCapacityUnits: aws.Int64(1),
WriteCapacityUnits: aws.Int64(1),
}
if pt.ReadCapacityUnits != nil {
provisionedThroughput.ReadCapacityUnits = aws.Int64(*pt.ReadCapacityUnits)
}

if pt.WriteCapacityUnits != nil {
provisionedThroughput.WriteCapacityUnits = aws.Int64(*pt.WriteCapacityUnits)
Comment on lines +239 to +250
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thoughts: What if we're not using PROVISIONED capacity mode? Would 0's be allowed? If that's the case would it be worth to evaluate the capacity mode in this function?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think you mean the PAY_PER_REQUEST mode.
this is tricky part. IIRC, we don't allow to set 0 capacity units for PAY_PER_REQUEST , please see the following message

   conditions:
    - message: |
        InvalidParameter: 3 validation error(s) found.
        - minimum field size of 1, UpdateTableInput.GlobalSecondaryIndexUpdates[0].Create.Projection.NonKeyAttributes.
        - minimum field value of 1, UpdateTableInput.GlobalSecondaryIndexUpdates[0].Create.ProvisionedThroughput.ReadCapacityUnits.
        - minimum field value of 1, UpdateTableInput.GlobalSecondaryIndexUpdates[0].Create.ProvisionedThroughput.WriteCapacityUnits.
      status: "True"
      type: ACK.Terminal

but aws DDB api will set capacity units as 0 when PAY_PER_REQUEST

If you set BillingMode as PROVISIONED, you must specify this property. If you set BillingMode as PAY_PER_REQUEST, you cannot specify this property.

https://docs.aws.amazon.com/AWSCloudFormation/latest/UserGuide/aws-resource-dynamodb-table.html#cfn-dynamodb-table-provisionedthroughput

If read/write capacity mode is PAY_PER_REQUEST the value is set to 0.

https://docs.aws.amazon.com/amazondynamodb/latest/APIReference/API_ProvisionedThroughput.html

}
return provisionedThroughput
}
Expand All @@ -263,12 +263,10 @@ func newSDKProjection(p *v1alpha1.Projection) *svcsdk.Projection {
}
if p.NonKeyAttributes != nil {
projection.NonKeyAttributes = p.NonKeyAttributes
} else {
projection.NonKeyAttributes = []*string{}
}
} else {
projection.ProjectionType = aws.String("")
projection.NonKeyAttributes = []*string{}
projection.NonKeyAttributes = nil
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Another nice catch!

}
return projection
}
Expand Down
76 changes: 76 additions & 0 deletions pkg/resource/table/hooks_global_secondary_indexes_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,76 @@
package table

import (
"reflect"
"testing"

"github.com/aws/aws-sdk-go/aws"
svcsdk "github.com/aws/aws-sdk-go/service/dynamodb"

"github.com/aws-controllers-k8s/dynamodb-controller/apis/v1alpha1"
)

func Test_newSDKProvisionedThroughput(t *testing.T) {
type args struct {
pt *v1alpha1.ProvisionedThroughput
}
tests := []struct {
name string
args args
want *svcsdk.ProvisionedThroughput
}{
{
name: "provisioned throughput is nil",
args: args{
pt: nil,
},
want: nil,
},
{
name: "provisioned throughput is not nil, read capacity units is nil",
args: args{
pt: &v1alpha1.ProvisionedThroughput{
ReadCapacityUnits: nil,
WriteCapacityUnits: aws.Int64(10),
},
},
want: &svcsdk.ProvisionedThroughput{
ReadCapacityUnits: aws.Int64(1),
WriteCapacityUnits: aws.Int64(10),
},
},
{
name: "provisioned throughput is not nil, write capacity units is nil",
args: args{
pt: &v1alpha1.ProvisionedThroughput{
ReadCapacityUnits: aws.Int64(10),
WriteCapacityUnits: nil,
},
},
want: &svcsdk.ProvisionedThroughput{
ReadCapacityUnits: aws.Int64(10),
WriteCapacityUnits: aws.Int64(1),
},
},
{
name: "provisioned throughput is not nil, write and read capacity units are not nil",
args: args{
pt: &v1alpha1.ProvisionedThroughput{
ReadCapacityUnits: aws.Int64(5),
WriteCapacityUnits: aws.Int64(5),
},
},
want: &svcsdk.ProvisionedThroughput{
ReadCapacityUnits: aws.Int64(5),
WriteCapacityUnits: aws.Int64(5),
},
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
if got := newSDKProvisionedThroughput(tt.args.pt); !reflect.DeepEqual(got, tt.want) {
t.Errorf("newSDKProvisionedThroughput() = %v, want %v", got, tt.want)
}
})
}
}
3 changes: 3 additions & 0 deletions pkg/resource/table/sdk.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

3 changes: 3 additions & 0 deletions templates/hooks/table/sdk_read_one_post_set_output.go.tpl
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,9 @@
if isTableUpdating(&resource{ko}) {
return &resource{ko}, requeueWaitWhileUpdating
}
if !canUpdateTableGSIs(&resource{ko}) {
return &resource{ko}, requeueWaitGSIReady
}
if err := rm.setResourceAdditionalFields(ctx, ko); err != nil {
return nil, err
}
19 changes: 19 additions & 0 deletions test/e2e/resources/table_basic_pay_per_request.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
# Table used to test GSI creation under on-demand billing mode
apiVersion: dynamodb.services.k8s.aws/v1alpha1
kind: Table
metadata:
name: $TABLE_NAME
spec:
tableName: $TABLE_NAME
billingMode: PAY_PER_REQUEST
tableClass: STANDARD
attributeDefinitions:
- attributeName: Bill
attributeType: S
- attributeName: Total
attributeType: S
keySchema:
- attributeName: Bill
keyType: HASH
- attributeName: Total
keyType: RANGE
11 changes: 8 additions & 3 deletions test/e2e/table.py
Original file line number Diff line number Diff line change
Expand Up @@ -166,11 +166,16 @@ def __init__(self, gsis):
self.match_on = gsis

def __call__(self, record: dict) -> bool:
gsi_key = "GlobalSecondaryIndexes"
if len(self.match_on) == 0:
return (not 'GlobalSecondaryIndexes' in record) or len(record["GlobalSecondaryIndexes"] == 0)
return (gsi_key not in record) or len(record[gsi_key]) == 0

awsGSIs = record["GlobalSecondaryIndexes"]
if len(self.match_on) != len(record["GlobalSecondaryIndexes"]):
# if GSI is still in creating status , it will not be present in the record
if gsi_key not in record:
return False

awsGSIs = record[gsi_key]
if len(self.match_on) != len(record[gsi_key]):
return False

for awsGSI in awsGSIs:
Expand Down
81 changes: 80 additions & 1 deletion test/e2e/tests/test_table.py
Original file line number Diff line number Diff line change
Expand Up @@ -140,6 +140,18 @@ def table_basic():
except:
pass

@pytest.fixture(scope="module")
def table_basic_pay_per_request():
resource_name = random_suffix_name("table-basic-pay-per-request", 32)
(ref, cr) = create_table(resource_name, "table_basic_pay_per_request")

yield ref, cr
try:
_, deleted = k8s.delete_custom_resource(ref, wait_periods=3, period_length=10)
assert deleted
except:
pass

@service_marker
@pytest.mark.canary
class TestTable:
Expand Down Expand Up @@ -726,7 +738,7 @@ def test_multi_updates(self, table_gsi):
"readCapacityUnits": 10,
"writeCapacityUnits": 10
}
cr["spec"][ "sseSpecification"] = {
cr["spec"]["sseSpecification"] = {
"enabled": True,
"sseType": "KMS"
}
Expand All @@ -750,7 +762,11 @@ def test_multi_updates(self, table_gsi):
# GSI is the last element to get update in the code path... so we just wait for it
# to know that all the fields got updated.

# encounter an issue when running E2E test locally, sometimes the gsi is updated,
# but SSEDescription is still being updated, add 2mins to wait (Julian Chu)
time.sleep(120)
latestTable = table.get(table_name)
logging.info("latestTable: %s", latestTable)
assert latestTable["StreamSpecification"] is not None
assert latestTable["StreamSpecification"]["StreamEnabled"]

Expand All @@ -760,3 +776,66 @@ def test_multi_updates(self, table_gsi):
assert latestTable["ProvisionedThroughput"] is not None
assert latestTable["ProvisionedThroughput"]["ReadCapacityUnits"] == 10
assert latestTable["ProvisionedThroughput"]["WriteCapacityUnits"] == 10

def test_create_gsi_pay_per_request(self, table_basic_pay_per_request):
(ref, res) = table_basic_pay_per_request

table_name = res["spec"]["tableName"]

# Check DynamoDB Table exists
assert self.table_exists(table_name)

# Get CR latest revision
cr = k8s.wait_resource_consumed_by_controller(ref)

# Creating two more GSIs
cr["spec"]["attributeDefinitions"] = [
{
"attributeName": "Bill",
"attributeType": "S"
},
{
"attributeName": "Total",
"attributeType": "S"
},
{
"attributeName": "User",
"attributeType": "S"
},
]

gsi = {
"indexName": "bill-per-user",
"keySchema": [
{
"attributeName": "User",
"keyType": "HASH",
},
{
"attributeName": "Bill",
"keyType": "RANGE",
}
],
"projection": {
"projectionType": "ALL",
}
}

cr["spec"]['globalSecondaryIndexes'] = [
gsi,
]

# Patch k8s resource
k8s.patch_custom_resource(ref, cr)
k8s.wait_resource_consumed_by_controller(ref)
table.wait_until(
table_name,
table.gsi_matches(
[
gsi
],
),
timeout_seconds=MODIFY_WAIT_AFTER_SECONDS*40,
interval_seconds=15,
)