Commit 3295cad

Merge branch 'main' into pgvalue-qrep

serprex committed May 27, 2024
2 parents d9dec77 + 9cbaf17
Showing 5 changed files with 44 additions and 62 deletions.
8 changes: 1 addition & 7 deletions flow/activities/flowable.go
@@ -547,13 +547,7 @@ func (a *FlowableActivity) SendWALHeartbeat(ctx context.Context) error {
 		return err
 	}
 
-	command := `
-	BEGIN;
-	DROP AGGREGATE IF EXISTS PEERDB_EPHEMERAL_HEARTBEAT(float4);
-	CREATE AGGREGATE PEERDB_EPHEMERAL_HEARTBEAT(float4) (SFUNC = float4pl, STYPE = float4);
-	DROP AGGREGATE PEERDB_EPHEMERAL_HEARTBEAT(float4);
-	END;
-	`
+	command := peerdbenv.PeerDBWALHeartbeatQuery()
 	// run above command for each Postgres peer
 	for _, pgPeer := range pgPeers {
 		activity.RecordHeartbeat(ctx, pgPeer.Name)
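Note on this hunk: the heartbeat SQL is no longer hard-coded; SendWALHeartbeat now pulls it from peerdbenv.PeerDBWALHeartbeatQuery() (added in flow/peerdbenv/config.go below). As a rough standalone sketch of what each Postgres peer ends up executing — the DATABASE_URL connection string and the main wrapper are placeholders for illustration, not PeerDB's peer handling — using pgx:

package main

import (
	"context"
	"os"

	"github.com/jackc/pgx/v5"
)

func main() {
	// Same default statement as the old hard-coded command: creating and
	// immediately dropping a throwaway aggregate writes a little WAL, which
	// lets replication slots on otherwise idle databases keep advancing.
	heartbeat := os.Getenv("PEERDB_WAL_HEARTBEAT_QUERY")
	if heartbeat == "" {
		heartbeat = `BEGIN;
DROP AGGREGATE IF EXISTS PEERDB_EPHEMERAL_HEARTBEAT(float4);
CREATE AGGREGATE PEERDB_EPHEMERAL_HEARTBEAT(float4) (SFUNC = float4pl, STYPE = float4);
DROP AGGREGATE PEERDB_EPHEMERAL_HEARTBEAT(float4);
END;`
	}

	ctx := context.Background()
	conn, err := pgx.Connect(ctx, os.Getenv("DATABASE_URL")) // placeholder connection string
	if err != nil {
		panic(err)
	}
	defer conn.Close(ctx)

	// With no bind parameters, pgx can run this multi-statement block in one Exec.
	if _, err := conn.Exec(ctx, heartbeat); err != nil {
		panic(err)
	}
}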
26 changes: 7 additions & 19 deletions flow/activities/flowable_core.go
@@ -11,6 +11,7 @@ import (
 
 	"github.com/jackc/pgx/v5"
 	"github.com/jackc/pgx/v5/pgxpool"
+	"github.com/yuin/gopher-lua"
 	"go.temporal.io/sdk/activity"
 	"go.temporal.io/sdk/log"
 	"go.temporal.io/sdk/temporal"
@@ -77,25 +78,12 @@ func syncCore[TPull connectors.CDCPullConnectorCore, TSync connectors.CDCSyncCon
 		tblNameMapping[v.SourceTableIdentifier] = model.NewNameAndExclude(v.DestinationTableIdentifier, v.Exclude)
 	}
 
-	var srcConn TPull
-	if sessionID == "" {
-		srcConn, err = connectors.GetAs[TPull](ctx, config.Source)
-		if err != nil {
-			return nil, err
-		}
-		defer connectors.CloseConnector(ctx, srcConn)
-
-		if err := srcConn.SetupReplConn(ctx); err != nil {
-			return nil, err
-		}
-	} else {
-		srcConn, err = waitForCdcCache[TPull](ctx, a, sessionID)
-		if err != nil {
-			return nil, err
-		}
-		if err := srcConn.ConnectionActive(ctx); err != nil {
-			return nil, temporal.NewNonRetryableApplicationError("connection to source down", "disconnect", nil)
-		}
+	srcConn, err := waitForCdcCache[TPull](ctx, a, sessionID)
+	if err != nil {
+		return nil, err
+	}
+	if err := srcConn.ConnectionActive(ctx); err != nil {
+		return nil, temporal.NewNonRetryableApplicationError("connection to source down", "disconnect", nil)
 	}
 
 	shutdown := utils.HeartbeatRoutine(ctx, func() string {
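With the standalone-connector branch deleted, syncCore now always takes its source connector from the cache populated by the maintain activity for this session, and fails with a non-retryable error if that connection has gone down. A hedged sketch of what a waitForCdcCache-style lookup could look like — the generic helper, sync.Map storage, and one-second poll are assumptions for illustration, not PeerDB's implementation:

import (
	"context"
	"fmt"
	"sync"
	"time"
)

// waitForCachedConn is an illustrative stand-in for a waitForCdcCache-style
// helper: poll a session-keyed cache until the maintain activity has stored
// the connector, or give up when the activity context is cancelled.
func waitForCachedConn[T any](ctx context.Context, cache *sync.Map, sessionID string) (T, error) {
	var zero T
	ticker := time.NewTicker(time.Second)
	defer ticker.Stop()
	for {
		if v, ok := cache.Load(sessionID); ok {
			conn, ok := v.(T)
			if !ok {
				return zero, fmt.Errorf("cache entry for %s has unexpected type %T", sessionID, v)
			}
			return conn, nil
		}
		select {
		case <-ctx.Done():
			return zero, ctx.Err()
		case <-ticker.C:
		}
	}
}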
6 changes: 5 additions & 1 deletion flow/connectors/utils/avro/avro_writer.go
@@ -197,7 +197,11 @@ func (p *peerDBOCFWriter) WriteRecordsToS3(
 		numRows, writeOcfError = p.WriteOCF(ctx, w)
 	}()
 
-	_, err = manager.NewUploader(s3svc).Upload(ctx, &s3.PutObjectInput{
+	uploader := manager.NewUploader(s3svc, func(u *manager.Uploader) {
+		u.PartSize = 4 * 1024 * 1024 * 1024
+	})
+
+	_, err = uploader.Upload(ctx, &s3.PutObjectInput{
 		Bucket: aws.String(bucketName),
 		Key:    aws.String(key),
 		Body:   r,
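Why the uploader change matters: S3 multipart uploads are limited to 10,000 parts, and the SDK's default 5 MiB part size caps a streamed upload of unknown length at roughly 48 GiB; 4 GiB parts keep large OCF files under the part limit (S3's 5 TiB object limit then becomes the ceiling), at the cost of more memory per buffered part. A self-contained sketch of the same aws-sdk-go-v2 pattern, with placeholder bucket, key, and concurrency values that are not from this commit:

import (
	"context"
	"io"

	"github.com/aws/aws-sdk-go-v2/aws"
	"github.com/aws/aws-sdk-go-v2/feature/s3/manager"
	"github.com/aws/aws-sdk-go-v2/service/s3"
)

// uploadStream streams body to S3 in large multipart chunks so very large
// objects stay under the 10,000-part limit.
func uploadStream(ctx context.Context, client *s3.Client, bucket, key string, body io.Reader) error {
	uploader := manager.NewUploader(client, func(u *manager.Uploader) {
		u.PartSize = 4 * 1024 * 1024 * 1024 // 4 GiB per part
		u.Concurrency = 1                   // placeholder tuning, not part of this commit
	})
	_, err := uploader.Upload(ctx, &s3.PutObjectInput{
		Bucket: aws.String(bucket),
		Key:    aws.String(key),
		Body:   body,
	})
	return err
}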
14 changes: 9 additions & 5 deletions flow/peerdbenv/config.go
@@ -66,11 +66,6 @@ func PeerDBCDCDiskSpillMemPercentThreshold() int {
 	return getEnvInt("PEERDB_CDC_DISK_SPILL_MEM_PERCENT_THRESHOLD", -1)
 }
 
-// PEERDB_DISABLE_ONE_SYNC
-func PeerDBDisableOneSync() bool {
-	return getEnvBool("PEERDB_DISABLE_ONE_SYNC", false)
-}
-
 // GOMEMLIMIT is a variable internal to Golang itself, we use this for internal targets, 0 means no maximum
 func PeerDBFlowWorkerMaxMemBytes() uint64 {
 	return getEnvUint[uint64]("GOMEMLIMIT", 0)
@@ -106,6 +101,15 @@ func PeerDBEnableWALHeartbeat() bool {
 	return getEnvBool("PEERDB_ENABLE_WAL_HEARTBEAT", false)
 }
 
+// PEERDB_WAL_HEARTBEAT_QUERY
+func PeerDBWALHeartbeatQuery() string {
+	return GetEnvString("PEERDB_WAL_HEARTBEAT_QUERY", `BEGIN;
+DROP AGGREGATE IF EXISTS PEERDB_EPHEMERAL_HEARTBEAT(float4);
+CREATE AGGREGATE PEERDB_EPHEMERAL_HEARTBEAT(float4) (SFUNC = float4pl, STYPE = float4);
+DROP AGGREGATE PEERDB_EPHEMERAL_HEARTBEAT(float4);
+END;`)
+}
+
 // PEERDB_ENABLE_PARALLEL_SYNC_NORMALIZE
 func PeerDBEnableParallelSyncNormalize() bool {
 	return getEnvBool("PEERDB_ENABLE_PARALLEL_SYNC_NORMALIZE", false)
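PEERDB_DISABLE_ONE_SYNC is removed outright (the sync workflow below now always runs with a session), and the heartbeat statement moves behind a new PEERDB_WAL_HEARTBEAT_QUERY variable whose default is the old SQL. A sketch of the lookup-with-default pattern these getters follow — the helper name below is illustrative; the actual GetEnvString in peerdbenv may differ:

import "os"

// getEnvStringOrDefault mirrors the env-with-default pattern of the peerdbenv
// getters: return the variable's value when it is set, otherwise the fallback.
func getEnvStringOrDefault(name, def string) string {
	if val, ok := os.LookupEnv(name); ok {
		return val
	}
	return def
}

Operators can then override PEERDB_WAL_HEARTBEAT_QUERY with whatever statement suits their permissions.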
52 changes: 22 additions & 30 deletions flow/workflows/sync_flow.go
@@ -30,38 +30,30 @@ func SyncFlowWorkflow(
 	parent := workflow.GetInfo(ctx).ParentWorkflowExecution
 	logger := log.With(workflow.GetLogger(ctx), slog.String(string(shared.FlowNameKey), config.FlowJobName))
 
-	enableOneSync := GetSideEffect(ctx, func(_ workflow.Context) bool {
-		return !peerdbenv.PeerDBDisableOneSync()
-	})
-	var fMaintain workflow.Future
-	var sessionID string
-	syncSessionCtx := ctx
-	if enableOneSync {
-		sessionOptions := &workflow.SessionOptions{
-			CreationTimeout: 5 * time.Minute,
-			ExecutionTimeout: 144 * time.Hour,
-			HeartbeatTimeout: time.Minute,
-		}
-		var err error
-		syncSessionCtx, err = workflow.CreateSession(ctx, sessionOptions)
-		if err != nil {
-			return err
-		}
-		defer workflow.CompleteSession(syncSessionCtx)
-		sessionID = workflow.GetSessionInfo(syncSessionCtx).SessionID
+	sessionOptions := &workflow.SessionOptions{
+		CreationTimeout: 5 * time.Minute,
+		ExecutionTimeout: 14 * 24 * time.Hour,
+		HeartbeatTimeout: time.Minute,
+	}
 
-		maintainCtx := workflow.WithActivityOptions(syncSessionCtx, workflow.ActivityOptions{
-			StartToCloseTimeout: 14 * 24 * time.Hour,
-			HeartbeatTimeout: time.Minute,
-			WaitForCancellation: true,
-		})
-		fMaintain = workflow.ExecuteActivity(
-			maintainCtx,
-			flowable.MaintainPull,
-			config,
-			sessionID,
-		)
+	syncSessionCtx, err := workflow.CreateSession(ctx, sessionOptions)
+	if err != nil {
+		return err
 	}
+	defer workflow.CompleteSession(syncSessionCtx)
+
+	sessionID := workflow.GetSessionInfo(syncSessionCtx).SessionID
+	maintainCtx := workflow.WithActivityOptions(syncSessionCtx, workflow.ActivityOptions{
+		StartToCloseTimeout: 14 * 24 * time.Hour,
+		HeartbeatTimeout: time.Minute,
+		WaitForCancellation: true,
+	})
+	fMaintain := workflow.ExecuteActivity(
+		maintainCtx,
+		flowable.MaintainPull,
+		config,
+		sessionID,
+	)
 
 	var stop, syncErr bool
 	currentSyncFlowNum := 0
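The workflow now unconditionally creates a Temporal session before starting MaintainPull, which is what pins the maintain activity and the subsequent sync activities to the same worker so they can share the connector cached under the session ID. A compressed sketch of that session lifecycle with the Temporal Go SDK — the activity name and the shorter timeouts here are illustrative, not exactly PeerDB's:

import (
	"time"

	"go.temporal.io/sdk/workflow"
)

// runWithSession shows the create/complete session pattern: every activity
// scheduled on sessionCtx is routed to the single worker hosting the session.
func runWithSession(ctx workflow.Context) error {
	sessionCtx, err := workflow.CreateSession(ctx, &workflow.SessionOptions{
		CreationTimeout:  5 * time.Minute,
		ExecutionTimeout: 14 * 24 * time.Hour,
		HeartbeatTimeout: time.Minute,
	})
	if err != nil {
		return err
	}
	defer workflow.CompleteSession(sessionCtx)

	sessionID := workflow.GetSessionInfo(sessionCtx).SessionID
	actCtx := workflow.WithActivityOptions(sessionCtx, workflow.ActivityOptions{
		StartToCloseTimeout: time.Hour,
		HeartbeatTimeout:    time.Minute,
	})
	// Illustrative activity call; PeerDB passes the session ID so the activity
	// can register its connector for later activities in the same session.
	return workflow.ExecuteActivity(actCtx, "MaintainSomething", sessionID).Get(actCtx, nil)
}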
