Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Support DROP MIRROR for Query Replication #481

Merged
merged 2 commits into from
Oct 4, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
65 changes: 56 additions & 9 deletions flow/cmd/handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -110,7 +110,9 @@ func (h *FlowRequestHandler) CreateQRepFlow(
}

func (h *FlowRequestHandler) ShutdownFlow(
ctx context.Context, req *protos.ShutdownRequest) (*protos.ShutdownResponse, error) {
ctx context.Context,
req *protos.ShutdownRequest,
) (*protos.ShutdownResponse, error) {
err := h.temporalClient.SignalWorkflow(
ctx,
req.WorkflowId,
Expand Down Expand Up @@ -142,8 +144,24 @@ func (h *FlowRequestHandler) ShutdownFlow(
return nil, fmt.Errorf("unable to start DropFlow workflow: %w", err)
}

if err = dropFlowHandle.Get(ctx, nil); err != nil {
return nil, fmt.Errorf("DropFlow workflow did not execute successfully: %w", err)
cancelCtx, cancel := context.WithTimeout(ctx, 2*time.Minute)
defer cancel()

errChan := make(chan error, 1)
go func() {
errChan <- dropFlowHandle.Get(cancelCtx, nil)
}()

select {
case err := <-errChan:
if err != nil {
return nil, fmt.Errorf("DropFlow workflow did not execute successfully: %w", err)
}
case <-time.After(1 * time.Minute):
err := h.handleWorkflowNotClosed(ctx, workflowID, "")
if err != nil {
return nil, fmt.Errorf("unable to wait for DropFlow workflow to close: %w", err)
}
}

return &protos.ShutdownResponse{
Expand All @@ -153,9 +171,9 @@ func (h *FlowRequestHandler) ShutdownFlow(

func (h *FlowRequestHandler) waitForWorkflowClose(ctx context.Context, workflowID string) error {
expBackoff := backoff.NewExponentialBackOff()
expBackoff.InitialInterval = 5 * time.Second
expBackoff.MaxInterval = 30 * time.Second
expBackoff.MaxElapsedTime = 5 * time.Minute
expBackoff.InitialInterval = 3 * time.Second
expBackoff.MaxInterval = 10 * time.Second
expBackoff.MaxElapsedTime = 1 * time.Minute

// empty will terminate the latest run
runID := ""
Expand All @@ -176,10 +194,39 @@ func (h *FlowRequestHandler) waitForWorkflowClose(ctx context.Context, workflowI

err := backoff.Retry(operation, expBackoff)
if err != nil {
// terminate workflow if it is still running
reason := "PeerFlow workflow did not close in time"
err = h.temporalClient.TerminateWorkflow(ctx, workflowID, runID, reason)
return h.handleWorkflowNotClosed(ctx, workflowID, runID)
}

return nil
}

func (h *FlowRequestHandler) handleWorkflowNotClosed(ctx context.Context, workflowID, runID string) error {
errChan := make(chan error, 1)

// Create a new context with timeout for CancelWorkflow
ctxWithTimeout, cancel := context.WithTimeout(ctx, 2*time.Minute)
defer cancel()

// Call CancelWorkflow in a goroutine
go func() {
err := h.temporalClient.CancelWorkflow(ctxWithTimeout, workflowID, runID)
errChan <- err
}()

select {
case err := <-errChan:
if err != nil {
log.Errorf("unable to cancel PeerFlow workflow: %s. Attempting to terminate.", err.Error())
terminationReason := fmt.Sprintf("workflow %s did not cancel in time.", workflowID)
if err = h.temporalClient.TerminateWorkflow(ctx, workflowID, runID, terminationReason); err != nil {
return fmt.Errorf("unable to terminate PeerFlow workflow: %w", err)
}
}
case <-time.After(1 * time.Minute):
// If 1 minute has passed and we haven't received an error, terminate the workflow
log.Errorf("Timeout reached while trying to cancel PeerFlow workflow. Attempting to terminate.")
terminationReason := fmt.Sprintf("workflow %s did not cancel in time.", workflowID)
if err := h.temporalClient.TerminateWorkflow(ctx, workflowID, runID, terminationReason); err != nil {
return fmt.Errorf("unable to terminate PeerFlow workflow: %w", err)
}
}
Expand Down
25 changes: 18 additions & 7 deletions flow/connectors/snowflake/snowflake.go
Original file line number Diff line number Diff line change
Expand Up @@ -74,6 +74,7 @@ const (
dropTableIfExistsSQL = "DROP TABLE IF EXISTS %s.%s"
deleteJobMetadataSQL = "DELETE FROM %s.%s WHERE MIRROR_JOB_NAME=?"
isDeletedColumnName = "_PEERDB_IS_DELETED"
checkSchemaExistsSQL = "SELECT TO_BOOLEAN(COUNT(1)) FROM INFORMATION_SCHEMA.SCHEMATA WHERE SCHEMA_NAME=?"

syncRecordsChunkSize = 1024
)
Expand Down Expand Up @@ -936,16 +937,26 @@ func (c *SnowflakeConnector) SyncFlowCleanup(jobName string) error {
}
}()

_, err = syncFlowCleanupTx.ExecContext(c.ctx, fmt.Sprintf(dropTableIfExistsSQL, peerDBInternalSchema,
getRawTableIdentifier(jobName)))
row := syncFlowCleanupTx.QueryRowContext(c.ctx, checkSchemaExistsSQL, peerDBInternalSchema)
var schemaExists bool
err = row.Scan(&schemaExists)
if err != nil {
return fmt.Errorf("unable to drop raw table: %w", err)
return fmt.Errorf("unable to check if internal schema exists: %w", err)
}
_, err = syncFlowCleanupTx.ExecContext(c.ctx,
fmt.Sprintf(deleteJobMetadataSQL, peerDBInternalSchema, mirrorJobsTableIdentifier), jobName)
if err != nil {
return fmt.Errorf("unable to delete job metadata: %w", err)

if schemaExists {
_, err = syncFlowCleanupTx.ExecContext(c.ctx, fmt.Sprintf(dropTableIfExistsSQL, peerDBInternalSchema,
getRawTableIdentifier(jobName)))
if err != nil {
return fmt.Errorf("unable to drop raw table: %w", err)
}
_, err = syncFlowCleanupTx.ExecContext(c.ctx,
fmt.Sprintf(deleteJobMetadataSQL, peerDBInternalSchema, mirrorJobsTableIdentifier), jobName)
if err != nil {
return fmt.Errorf("unable to delete job metadata: %w", err)
}
}

err = syncFlowCleanupTx.Commit()
if err != nil {
return fmt.Errorf("unable to commit transaction for sync flow cleanup: %w", err)
Expand Down
14 changes: 9 additions & 5 deletions flow/workflows/qrep_flow.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ import (
"time"

"github.com/PeerDB-io/peer-flow/generated/protos"
"github.com/PeerDB-io/peer-flow/shared"
"github.com/google/uuid"
"go.temporal.io/api/enums/v1"
"go.temporal.io/sdk/log"
Expand Down Expand Up @@ -231,14 +232,17 @@ func QRepFlowWorkflow(

// register a signal handler to terminate the workflow
terminateWorkflow := false
signalChan := workflow.GetSignalChannel(ctx, "terminate")
signalChan := workflow.GetSignalChannel(ctx, shared.CDCFlowSignalName)

s := workflow.NewSelector(ctx)
s.AddReceive(signalChan, func(c workflow.ReceiveChannel, _ bool) {
var signal string
c.Receive(ctx, &signal)
logger.Info("Received signal to terminate workflow", "Signal", signal)
terminateWorkflow = true
var signalVal shared.CDCFlowSignal
c.Receive(ctx, &signalVal)
logger.Info("received signal", "signal", signalVal)
if signalVal == shared.ShutdownSignal {
logger.Info("received shutdown signal")
terminateWorkflow = true
}
})

// register a query to get the number of partitions processed
Expand Down
Loading