Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

CDC to S3/GCS #507

Merged
merged 5 commits into from
Oct 13, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
16 changes: 16 additions & 0 deletions .github/workflows/flow.yml
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,20 @@ jobs:
with:
name: "snowflake_creds.json"
json: ${{ secrets.SNOWFLAKE_GH_CI_PKEY }}

- name: setup S3 credentials
id: s3-credentials
uses: jsdaniell/create-json@v1.2.2
with:
name: "s3_creds.json"
json: ${{ secrets.S3_CREDS }}

- name: setup GCS credentials
id: gcs-credentials
uses: jsdaniell/create-json@v1.2.2
with:
name: "gcs_creds.json"
json: ${{ secrets.GCS_CREDS }}

- name: create hstore extension and increase logical replication limits
run: |
Expand All @@ -82,6 +96,8 @@ jobs:
AWS_REGION: ${{ secrets.AWS_REGION }}
TEST_BQ_CREDS: ${{ github.workspace }}/bq_service_account.json
TEST_SF_CREDS: ${{ github.workspace }}/snowflake_creds.json
TEST_S3_CREDS: ${{ github.workspace }}/s3_creds.json
TEST_GCS_CREDS: ${{ github.workspace }}/gcs_creds.json
AZURE_TENANT_ID: ${{ secrets.AZURE_TENANT_ID }}
AZURE_CLIENT_ID: ${{ secrets.AZURE_CLIENT_ID }}
AZURE_CLIENT_SECRET: ${{ secrets.AZURE_CLIENT_SECRET }}
Expand Down
2 changes: 2 additions & 0 deletions flow/connectors/core.go
Original file line number Diff line number Diff line change
Expand Up @@ -148,6 +148,8 @@ func GetCDCSyncConnector(ctx context.Context, config *protos.Peer) (CDCSyncConne
return nil, fmt.Errorf("use eventhub group config instead")
case *protos.Peer_EventhubGroupConfig:
return conneventhub.NewEventHubConnector(ctx, config.GetEventhubGroupConfig())
case *protos.Peer_S3Config:
return conns3.NewS3Connector(ctx, config.GetS3Config())
default:
return nil, ErrUnsupportedFunctionality
}
Expand Down
59 changes: 56 additions & 3 deletions flow/connectors/eventhub/eventhub.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ import (

"github.com/Azure/azure-sdk-for-go/sdk/azidentity"
azeventhubs "github.com/Azure/azure-sdk-for-go/sdk/messaging/azeventhubs"
metadataStore "github.com/PeerDB-io/peer-flow/connectors/external_metadata"
"github.com/PeerDB-io/peer-flow/connectors/utils"
"github.com/PeerDB-io/peer-flow/connectors/utils/metrics"
"github.com/PeerDB-io/peer-flow/generated/protos"
Expand All @@ -22,7 +23,7 @@ import (
type EventHubConnector struct {
ctx context.Context
config *protos.EventHubGroupConfig
pgMetadata *PostgresMetadataStore
pgMetadata *metadataStore.PostgresMetadataStore
tableSchemas map[string]*protos.TableSchema
creds *azidentity.DefaultAzureCredential
hubManager *EventHubManager
Expand All @@ -40,7 +41,9 @@ func NewEventHubConnector(
}

hubManager := NewEventHubManager(ctx, defaultAzureCreds, config)
pgMetadata, err := NewPostgresMetadataStore(ctx, config.GetMetadataDb())
metadataSchemaName := "peerdb_eventhub_metadata" // #nosec G101
pgMetadata, err := metadataStore.NewPostgresMetadataStore(ctx, config.GetMetadataDb(),
metadataSchemaName)
if err != nil {
log.Errorf("failed to create postgres metadata store: %v", err)
return nil, err
Expand Down Expand Up @@ -84,6 +87,48 @@ func (c *EventHubConnector) InitializeTableSchema(req map[string]*protos.TableSc
return nil
}

// NeedsSetupMetadataTables reports whether the connector's Postgres metadata
// store says its metadata still has to be set up. The decision is delegated
// entirely to the store.
func (c *EventHubConnector) NeedsSetupMetadataTables() bool {
	needsSetup := c.pgMetadata.NeedsSetupMetadata()
	return needsSetup
}

// SetupMetadataTables asks the Postgres metadata store to set up its metadata.
// A failure is logged and then returned to the caller unchanged; nil is
// returned on success.
func (c *EventHubConnector) SetupMetadataTables() error {
	if setupErr := c.pgMetadata.SetupMetadata(); setupErr != nil {
		log.Errorf("failed to setup metadata tables: %v", setupErr)
		return setupErr
	}
	return nil
}

// GetLastSyncBatchID looks up the last sync batch ID recorded for jobName in
// the Postgres metadata store. If the lookup fails, it returns 0 together with
// the store's error.
func (c *EventHubConnector) GetLastSyncBatchID(jobName string) (int64, error) {
	lastBatchID, lookupErr := c.pgMetadata.GetLastBatchID(jobName)
	if lookupErr != nil {
		// Always hand back a zero ID on failure, whatever the store returned.
		return 0, lookupErr
	}
	return lastBatchID, nil
}

// GetLastOffset fetches the last sync state recorded for jobName from the
// Postgres metadata store. If the fetch fails, it returns a nil state together
// with the store's error.
func (c *EventHubConnector) GetLastOffset(jobName string) (*protos.LastSyncState, error) {
	lastState, fetchErr := c.pgMetadata.FetchLastOffset(jobName)
	if fetchErr != nil {
		// Never expose a partially populated state alongside an error.
		return nil, fetchErr
	}
	return lastState, nil
}

// updateLastOffset records offset as the last offset for jobName in the
// Postgres metadata store. A failure is logged and then returned to the caller
// unchanged; nil is returned on success.
func (c *EventHubConnector) updateLastOffset(jobName string, offset int64) error {
	if updateErr := c.pgMetadata.UpdateLastOffset(jobName, offset); updateErr != nil {
		log.Errorf("failed to update last offset: %v", updateErr)
		return updateErr
	}
	return nil
}

func (c *EventHubConnector) SyncRecords(req *model.SyncRecordsRequest) (*model.SyncResponse, error) {
shutdown := utils.HeartbeatRoutine(c.ctx, 10*time.Second, func() string {
return fmt.Sprintf("syncing records to eventhub with"+
Expand Down Expand Up @@ -177,7 +222,7 @@ func (c *EventHubConnector) SyncRecords(req *model.SyncRecordsRequest) (*model.S
log.Errorf("failed to update last offset: %v", err)
return nil, err
}
err = c.incrementSyncBatchID(req.FlowJobName)
err = c.pgMetadata.IncrementID(req.FlowJobName)
if err != nil {
log.Errorf("%v", err)
return nil, err
Expand Down Expand Up @@ -311,3 +356,11 @@ func (c *EventHubConnector) SetupNormalizedTables(
TableExistsMapping: nil,
}, nil
}

// SyncFlowCleanup drops the metadata that the Postgres metadata store holds
// for jobName and returns whatever error the store reports (nil on success).
func (c *EventHubConnector) SyncFlowCleanup(jobName string) error {
	return c.pgMetadata.DropMetadata(jobName)
}
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
package conneventhub
package connmetadata

import (
"context"
Expand All @@ -10,30 +10,32 @@ import (
)

const (
// schema for the peerdb metadata
metadataSchema = "peerdb_eventhub_metadata"
// The name of the table that stores the last sync state.
lastSyncStateTableName = "last_sync_state"
)

type PostgresMetadataStore struct {
config *protos.PostgresConfig
pool *pgxpool.Pool
ctx context.Context
config *protos.PostgresConfig
pool *pgxpool.Pool
schemaName string
}

func NewPostgresMetadataStore(ctx context.Context, pgConfig *protos.PostgresConfig) (*PostgresMetadataStore, error) {
func NewPostgresMetadataStore(ctx context.Context, pgConfig *protos.PostgresConfig,
schemaName string) (*PostgresMetadataStore, error) {
connectionString := utils.GetPGConnectionString(pgConfig)

pool, err := pgxpool.New(ctx, connectionString)
if err != nil {
log.Errorf("failed to create connection pool: %v", err)
return nil, err
}
log.Info("created connection pool for eventhub metadata store")
log.Info("created connection pool for metadata store")

return &PostgresMetadataStore{
config: pgConfig,
pool: pool,
ctx: ctx,
config: pgConfig,
pool: pool,
schemaName: schemaName,
}, nil
}

Expand All @@ -45,11 +47,9 @@ func (p *PostgresMetadataStore) Close() error {
return nil
}

func (c *EventHubConnector) NeedsSetupMetadataTables() bool {
ms := c.pgMetadata

func (p *PostgresMetadataStore) NeedsSetupMetadata() bool {
// check if schema exists
rows := ms.pool.QueryRow(c.ctx, "SELECT count(*) FROM pg_catalog.pg_namespace WHERE nspname = $1", metadataSchema)
rows := p.pool.QueryRow(p.ctx, "SELECT count(*) FROM pg_catalog.pg_namespace WHERE nspname = $1", p.schemaName)

var exists int64
err := rows.Scan(&exists)
Expand All @@ -65,26 +65,24 @@ func (c *EventHubConnector) NeedsSetupMetadataTables() bool {
return true
}

func (c *EventHubConnector) SetupMetadataTables() error {
ms := c.pgMetadata

func (p *PostgresMetadataStore) SetupMetadata() error {
// start a transaction
tx, err := ms.pool.Begin(c.ctx)
tx, err := p.pool.Begin(p.ctx)
if err != nil {
log.Errorf("failed to start transaction: %v", err)
return err
}

// create the schema
_, err = tx.Exec(c.ctx, "CREATE SCHEMA IF NOT EXISTS "+metadataSchema)
_, err = tx.Exec(p.ctx, "CREATE SCHEMA IF NOT EXISTS "+p.schemaName)
if err != nil {
log.Errorf("failed to create schema: %v", err)
return err
}

// create the last sync state table
_, err = tx.Exec(c.ctx, `
CREATE TABLE IF NOT EXISTS `+metadataSchema+`.`+lastSyncStateTableName+` (
_, err = tx.Exec(p.ctx, `
CREATE TABLE IF NOT EXISTS `+p.schemaName+`.`+lastSyncStateTableName+` (
job_name TEXT PRIMARY KEY NOT NULL,
last_offset BIGINT NOT NULL,
updated_at TIMESTAMP NOT NULL DEFAULT NOW(),
Expand All @@ -95,9 +93,10 @@ func (c *EventHubConnector) SetupMetadataTables() error {
log.Errorf("failed to create last sync state table: %v", err)
return err
}
log.Infof("created external metadata table %s.%s", p.schemaName, lastSyncStateTableName)

// commit the transaction
err = tx.Commit(c.ctx)
err = tx.Commit(p.ctx)
if err != nil {
log.Errorf("failed to commit transaction: %v", err)
return err
Expand All @@ -106,15 +105,12 @@ func (c *EventHubConnector) SetupMetadataTables() error {
return nil
}

func (c *EventHubConnector) GetLastOffset(jobName string) (*protos.LastSyncState, error) {
ms := c.pgMetadata

rows := ms.pool.QueryRow(c.ctx, `
func (p *PostgresMetadataStore) FetchLastOffset(jobName string) (*protos.LastSyncState, error) {
rows := p.pool.QueryRow(p.ctx, `
SELECT last_offset
FROM `+metadataSchema+`.`+lastSyncStateTableName+`
FROM `+p.schemaName+`.`+lastSyncStateTableName+`
WHERE job_name = $1
`, jobName)

var offset int64
err := rows.Scan(&offset)
if err != nil {
Expand All @@ -138,12 +134,10 @@ func (c *EventHubConnector) GetLastOffset(jobName string) (*protos.LastSyncState
}, nil
}

func (c *EventHubConnector) GetLastSyncBatchID(jobName string) (int64, error) {
ms := c.pgMetadata

rows := ms.pool.QueryRow(c.ctx, `
func (p *PostgresMetadataStore) GetLastBatchID(jobName string) (int64, error) {
rows := p.pool.QueryRow(p.ctx, `
SELECT sync_batch_id
FROM `+metadataSchema+`.`+lastSyncStateTableName+`
FROM `+p.schemaName+`.`+lastSyncStateTableName+`
WHERE job_name = $1
`, jobName)

Expand All @@ -167,11 +161,9 @@ func (c *EventHubConnector) GetLastSyncBatchID(jobName string) (int64, error) {
}

// update offset for a job
func (c *EventHubConnector) updateLastOffset(jobName string, offset int64) error {
ms := c.pgMetadata

func (p *PostgresMetadataStore) UpdateLastOffset(jobName string, offset int64) error {
// start a transaction

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

🚫 [golangci] reported by reviewdog 🐶
unnecessary leading newline (whitespace)

tx, err := ms.pool.Begin(c.ctx)
tx, err := p.pool.Begin(p.ctx)
if err != nil {
log.Errorf("failed to start transaction: %v", err)
return err
Expand All @@ -181,8 +173,8 @@ func (c *EventHubConnector) updateLastOffset(jobName string, offset int64) error
log.WithFields(log.Fields{
"flowName": jobName,
}).Infof("updating last offset for job `%s` to `%d`", jobName, offset)
_, err = tx.Exec(c.ctx, `
INSERT INTO `+metadataSchema+`.`+lastSyncStateTableName+` (job_name, last_offset, sync_batch_id)
_, err = tx.Exec(p.ctx, `
INSERT INTO `+p.schemaName+`.`+lastSyncStateTableName+` (job_name, last_offset, sync_batch_id)
VALUES ($1, $2, $3)
ON CONFLICT (job_name)
DO UPDATE SET last_offset = $2, updated_at = NOW()
Expand All @@ -196,7 +188,7 @@ func (c *EventHubConnector) updateLastOffset(jobName string, offset int64) error
}

// commit the transaction
err = tx.Commit(c.ctx)
err = tx.Commit(p.ctx)
if err != nil {
log.Errorf("failed to commit transaction: %v", err)
return err
Expand All @@ -206,14 +198,12 @@ func (c *EventHubConnector) updateLastOffset(jobName string, offset int64) error
}

// update offset for a job
func (c *EventHubConnector) incrementSyncBatchID(jobName string) error {
ms := c.pgMetadata

func (p *PostgresMetadataStore) IncrementID(jobName string) error {
log.WithFields(log.Fields{

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

🚫 [golangci] reported by reviewdog 🐶
unnecessary leading newline (whitespace)

"flowName": jobName,
}).Infof("incrementing sync batch id for job `%s`", jobName)
_, err := ms.pool.Exec(c.ctx, `
UPDATE `+metadataSchema+`.`+lastSyncStateTableName+`
_, err := p.pool.Exec(p.ctx, `
UPDATE `+p.schemaName+`.`+lastSyncStateTableName+`
SET sync_batch_id=sync_batch_id+1 WHERE job_name=$1
`, jobName)

Expand All @@ -227,9 +217,9 @@ func (c *EventHubConnector) incrementSyncBatchID(jobName string) error {
return nil
}

func (c *EventHubConnector) SyncFlowCleanup(jobName string) error {
_, err := c.pgMetadata.pool.Exec(c.ctx, `
DELETE FROM `+metadataSchema+`.`+lastSyncStateTableName+`
func (p *PostgresMetadataStore) DropMetadata(jobName string) error {
_, err := p.pool.Exec(p.ctx, `
DELETE FROM `+p.schemaName+`.`+lastSyncStateTableName+`
WHERE job_name = $1
`, jobName)
return err
Expand Down
Loading
Loading