Skip to content

Commit

Permalink
Merge pull request #24047 from fermezz/fix-start-replication-task
Browse files Browse the repository at this point in the history
Stop replication task before applying changes that require the task to be stopped.
  • Loading branch information
johnsonaj committed Apr 26, 2022
2 parents 60e22dd + 824cb0c commit d22470a
Show file tree
Hide file tree
Showing 4 changed files with 100 additions and 26 deletions.
3 changes: 3 additions & 0 deletions .changelog/24047.txt
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
```release-note:bug
resource/aws_dms_replication_task: Fix to stop the task before updating, if required
```
23 changes: 18 additions & 5 deletions internal/service/dms/replication_task.go
Original file line number Diff line number Diff line change
Expand Up @@ -253,8 +253,15 @@ func resourceReplicationTaskUpdate(d *schema.ResourceData, meta interface{}) err
input.TableMappings = aws.String(d.Get("table_mappings").(string))
}

log.Println("[DEBUG] DMS update replication task:", input)
status := d.Get("status").(string)
if status == replicationTaskStatusRunning {
log.Println("[DEBUG] stopping DMS replication task:", input)
if err := stopReplicationTask(d.Id(), conn); err != nil {
return err
}
}

log.Println("[DEBUG] updating DMS replication task:", input)
_, err := conn.ModifyReplicationTask(input)
if err != nil {
return fmt.Errorf("error updating DMS Replication Task (%s): %w", d.Id(), err)
Expand All @@ -265,7 +272,8 @@ func resourceReplicationTaskUpdate(d *schema.ResourceData, meta interface{}) err
}

if d.Get("start_replication_task").(bool) {
if err := startReplicationTask(d.Id(), conn); err != nil {
err := startReplicationTask(d.Id(), conn)
if err != nil {
return err
}
}
Expand Down Expand Up @@ -373,9 +381,14 @@ func startReplicationTask(id string, conn *dms.DatabaseMigrationService) error {
return fmt.Errorf("error reading DMS Replication Task (%s): empty output", id)
}

startReplicationTaskType := dms.StartReplicationTaskTypeValueStartReplication
if aws.StringValue(task.Status) != replicationTaskStatusReady {
startReplicationTaskType = dms.StartReplicationTaskTypeValueResumeProcessing
}

_, err = conn.StartReplicationTask(&dms.StartReplicationTaskInput{
ReplicationTaskArn: task.ReplicationTaskArn,
StartReplicationTaskType: aws.String(dms.StartReplicationTaskTypeValueStartReplication),
StartReplicationTaskType: aws.String(startReplicationTaskType),
})

if err != nil {
Expand All @@ -384,7 +397,7 @@ func startReplicationTask(id string, conn *dms.DatabaseMigrationService) error {

err = waitReplicationTaskRunning(conn, id)
if err != nil {
return fmt.Errorf("error wating for DMS Replication Task (%s) start: %w", id, err)
return fmt.Errorf("error waiting for DMS Replication Task (%s) start: %w", id, err)
}

return nil
Expand Down Expand Up @@ -412,7 +425,7 @@ func stopReplicationTask(id string, conn *dms.DatabaseMigrationService) error {

err = waitReplicationTaskStopped(conn, id)
if err != nil {
return fmt.Errorf("error wating for DMS Replication Task (%s) stop: %w", id, err)
return fmt.Errorf("error waiting for DMS Replication Task (%s) stop: %w", id, err)
}

return nil
Expand Down
Loading

0 comments on commit d22470a

Please sign in to comment.