@@ -18,14 +18,15 @@ import (
1818 "strings"
1919 "time"
2020
21- ackrtlog "github.com/aws-controllers-k8s/runtime/pkg/runtime/log"
2221 ackerr "github.com/aws-controllers-k8s/runtime/pkg/errors"
2322 ackrequeue "github.com/aws-controllers-k8s/runtime/pkg/requeue"
23+ ackrtlog "github.com/aws-controllers-k8s/runtime/pkg/runtime/log"
2424 "github.com/aws/aws-sdk-go-v2/aws"
2525 svcsdk "github.com/aws/aws-sdk-go-v2/service/dynamodb"
2626 svcsdktypes "github.com/aws/aws-sdk-go-v2/service/dynamodb/types"
2727
2828 "github.com/aws-controllers-k8s/dynamodb-controller/apis/v1alpha1"
29+ svcapitypes "github.com/aws-controllers-k8s/dynamodb-controller/apis/v1alpha1"
2930)
3031
3132// equalCreateReplicationGroupMemberActions compares two CreateReplicationGroupMemberAction objects
@@ -262,25 +263,6 @@ func deleteReplicaUpdate(regionName string) svcsdktypes.ReplicationGroupUpdate {
262263 }
263264}
264265
265- // canUpdateTableReplicas returns true if it's possible to update table replicas.
266- // We can only modify replicas when they are in ACTIVE state.
267- func canUpdateTableReplicas (r * resource ) bool {
268- // Check if Status or Replicas is nil
269- // needed when called by sdkdelete
270- if r == nil || r .ko == nil || r .ko .Status .Replicas == nil {
271- return true // If no replicas exist, we can proceed with updates
272- }
273- // Check if any replica is not in ACTIVE state
274- for _ , replicaDesc := range r .ko .Status .Replicas {
275- if replicaDesc .RegionName != nil && replicaDesc .ReplicaStatus != nil {
276- if * replicaDesc .ReplicaStatus != string (svcsdktypes .ReplicaStatusActive ) {
277- return false
278- }
279- }
280- }
281- return true
282- }
283-
284266// hasStreamSpecificationWithNewAndOldImages checks if the table has DynamoDB Streams enabled
285267// with the stream containing both the new and the old images of the item.
286268func hasStreamSpecificationWithNewAndOldImages (r * resource ) bool {
@@ -317,8 +299,8 @@ func (rm *resourceManager) syncReplicas(
317299 // Handle specific errors
318300 if awsErr , ok := ackerr .AWSError (err ); ok {
319301 // Handle ValidationException - when replicas are not part of the global table
320- if awsErr .ErrorCode () == "ValidationException" &&
321- strings .Contains (awsErr .ErrorMessage (), "not part of the global table" ) {
302+ if awsErr .ErrorCode () == "ValidationException" &&
303+ strings .Contains (awsErr .ErrorMessage (), "not part of the global table" ) {
322304 // A replica was already deleted
323305 rlog .Debug ("replica already deleted from global table" ,
324306 "table" , * latest .ko .Spec .TableName ,
@@ -328,7 +310,7 @@ func (rm *resourceManager) syncReplicas(
328310 30 * time .Second ,
329311 )
330312 }
331-
313+
332314 // Handle ResourceInUseException - when the table is being updated
333315 if awsErr .ErrorCode () == "ResourceInUseException" {
334316 rlog .Debug ("table is currently in use, will retry" ,
@@ -382,20 +364,29 @@ func (rm *resourceManager) newUpdateTableReplicaUpdatesOneAtATimePayload(
382364
383365 if len (createReplicas ) > 0 {
384366 replica := * createReplicas [0 ]
367+ if checkReplicaStatus (latest .ko .Status .Replicas , * replica .RegionName ) {
368+ return nil , 0 , requeueWaitReplicasActive
369+ }
385370 rlog .Debug ("creating replica in region" , "table" , * desired .ko .Spec .TableName , "region" , * replica .RegionName )
386371 input .ReplicaUpdates = append (input .ReplicaUpdates , createReplicaUpdate (createReplicas [0 ]))
387372 return input , replicasInQueue , nil
388373 }
389374
390375 if len (updateReplicas ) > 0 {
391376 replica := * updateReplicas [0 ]
377+ if checkReplicaStatus (latest .ko .Status .Replicas , * replica .RegionName ) {
378+ return nil , 0 , requeueWaitReplicasActive
379+ }
392380 rlog .Debug ("updating replica in region" , "table" , * desired .ko .Spec .TableName , "region" , * replica .RegionName )
393381 input .ReplicaUpdates = append (input .ReplicaUpdates , updateReplicaUpdate (updateReplicas [0 ]))
394382 return input , replicasInQueue , nil
395383 }
396384
397385 if len (deleteRegions ) > 0 {
398386 replica := deleteRegions [0 ]
387+ if checkReplicaStatus (latest .ko .Status .Replicas , replica ) {
388+ return nil , 0 , requeueWaitReplicasActive
389+ }
399390 rlog .Debug ("deleting replica in region" , "table" , * desired .ko .Spec .TableName , "region" , replica )
400391 input .ReplicaUpdates = append (input .ReplicaUpdates , deleteReplicaUpdate (deleteRegions [0 ]))
401392 return input , replicasInQueue , nil
@@ -451,3 +442,58 @@ func calculateReplicaUpdates(
451442
452443 return createReplicas , updateReplicas , deleteRegions
453444}
445+
446+ func setTableReplicas (ko * svcapitypes.Table , replicas []svcsdktypes.ReplicaDescription ) {
447+ if len (replicas ) > 0 {
448+ tableReplicas := []* v1alpha1.CreateReplicationGroupMemberAction {}
449+ for _ , replica := range replicas {
450+ replicaElem := & v1alpha1.CreateReplicationGroupMemberAction {}
451+ if replica .RegionName != nil {
452+ replicaElem .RegionName = replica .RegionName
453+ }
454+ if replica .KMSMasterKeyId != nil {
455+ replicaElem .KMSMasterKeyID = replica .KMSMasterKeyId
456+ }
457+ if replica .ProvisionedThroughputOverride != nil {
458+ replicaElem .ProvisionedThroughputOverride = & v1alpha1.ProvisionedThroughputOverride {
459+ ReadCapacityUnits : replica .ProvisionedThroughputOverride .ReadCapacityUnits ,
460+ }
461+ }
462+ if replica .GlobalSecondaryIndexes != nil {
463+ gsiList := []* v1alpha1.ReplicaGlobalSecondaryIndex {}
464+ for _ , gsi := range replica .GlobalSecondaryIndexes {
465+ gsiElem := & v1alpha1.ReplicaGlobalSecondaryIndex {
466+ IndexName : gsi .IndexName ,
467+ }
468+ if gsi .ProvisionedThroughputOverride != nil {
469+ gsiElem .ProvisionedThroughputOverride = & v1alpha1.ProvisionedThroughputOverride {
470+ ReadCapacityUnits : gsi .ProvisionedThroughputOverride .ReadCapacityUnits ,
471+ }
472+ }
473+ gsiList = append (gsiList , gsiElem )
474+ }
475+ replicaElem .GlobalSecondaryIndexes = gsiList
476+ }
477+ if replica .ReplicaTableClassSummary != nil && replica .ReplicaTableClassSummary .TableClass != "" {
478+ replicaElem .TableClassOverride = aws .String (string (replica .ReplicaTableClassSummary .TableClass ))
479+ }
480+ tableReplicas = append (tableReplicas , replicaElem )
481+ }
482+ ko .Spec .TableReplicas = tableReplicas
483+ } else {
484+ ko .Spec .TableReplicas = nil
485+ }
486+ }
487+
488+ func checkReplicaStatus (ReplicaDescription []* svcapitypes.ReplicaDescription , regionName string ) bool {
489+ for _ , replica := range ReplicaDescription {
490+ if * replica .RegionName == regionName {
491+ replicaStatus := replica .ReplicaStatus
492+ if * replicaStatus == string (svcsdktypes .ReplicaStatusCreating ) || * replicaStatus == string (svcsdktypes .ReplicaStatusDeleting ) || * replicaStatus == string (svcsdktypes .ReplicaStatusUpdating ) {
493+ return true
494+ }
495+ }
496+ }
497+
498+ return false
499+ }
0 commit comments