From a6de702768959b6bb16d5e62c367040e46fb9d07 Mon Sep 17 00:00:00 2001 From: Amogh-Bharadwaj Date: Thu, 12 Oct 2023 23:57:40 +0530 Subject: [PATCH 1/5] v1 of cdc to s3 and gcs --- .github/workflows/flow.yml | 16 ++ flow/connectors/core.go | 2 + flow/connectors/s3/metadata.go | 234 +++++++++++++++++++++++ flow/connectors/s3/s3.go | 291 +++++++++++++++++++++++++++-- flow/e2e/congen.go | 6 + flow/e2e/s3/cdc_s3_test.go | 183 ++++++++++++++++++ flow/e2e/s3/qrep_flow_s3_test.go | 10 +- flow/e2e/s3/s3_helper.go | 54 +++++- flow/generated/protos/peers.pb.go | 199 +++++++++++--------- nexus/analyzer/src/lib.rs | 6 + nexus/pt/src/peerdb_peers.rs | 2 + nexus/pt/src/peerdb_peers.serde.rs | 18 ++ protos/peers.proto | 1 + ui/grpc_generated/peers.ts | 19 ++ 14 files changed, 917 insertions(+), 124 deletions(-) create mode 100644 flow/connectors/s3/metadata.go create mode 100644 flow/e2e/s3/cdc_s3_test.go diff --git a/.github/workflows/flow.yml b/.github/workflows/flow.yml index 04698de4d..5af87c8bc 100644 --- a/.github/workflows/flow.yml +++ b/.github/workflows/flow.yml @@ -59,6 +59,20 @@ jobs: with: name: "snowflake_creds.json" json: ${{ secrets.SNOWFLAKE_GH_CI_PKEY }} + + - name: setup S3 credentials + id: s3-credentials + uses: jsdaniell/create-json@v1.2.2 + with: + name: "s3_creds.json" + json: ${{ secrets.S3_CREDS }} + + - name: setup GCS credentials + id: gcs-credentials + uses: jsdaniell/create-json@v1.2.2 + with: + name: "gcs_creds.json" + json: ${{ secrets.GCS_CREDS }} - name: create hstore extension and increase logical replication limits run: | @@ -82,6 +96,8 @@ jobs: AWS_REGION: ${{ secrets.AWS_REGION }} TEST_BQ_CREDS: ${{ github.workspace }}/bq_service_account.json TEST_SF_CREDS: ${{ github.workspace }}/snowflake_creds.json + TEST_S3_CREDS: ${{ secrets.S3_CREDS }}/s3_creds.json + TEST_GCS_CREDS: ${{ secrets.S3_CREDS }}/gcs_creds.json AZURE_TENANT_ID: ${{ secrets.AZURE_TENANT_ID }} AZURE_CLIENT_ID: ${{ secrets.AZURE_CLIENT_ID }} AZURE_CLIENT_SECRET: ${{ secrets.AZURE_CLIENT_SECRET }} diff --git a/flow/connectors/core.go b/flow/connectors/core.go index a67f9e346..3d1c22673 100644 --- a/flow/connectors/core.go +++ b/flow/connectors/core.go @@ -148,6 +148,8 @@ func GetCDCSyncConnector(ctx context.Context, config *protos.Peer) (CDCSyncConne return nil, fmt.Errorf("use eventhub group config instead") case *protos.Peer_EventhubGroupConfig: return conneventhub.NewEventHubConnector(ctx, config.GetEventhubGroupConfig()) + case *protos.Peer_S3Config: + return conns3.NewS3Connector(ctx, config.GetS3Config()) default: return nil, ErrUnsupportedFunctionality } diff --git a/flow/connectors/s3/metadata.go b/flow/connectors/s3/metadata.go new file mode 100644 index 000000000..fb0a24841 --- /dev/null +++ b/flow/connectors/s3/metadata.go @@ -0,0 +1,234 @@ +package conns3 + +import ( + "context" + + "github.com/PeerDB-io/peer-flow/connectors/utils" + "github.com/PeerDB-io/peer-flow/generated/protos" + "github.com/jackc/pgx/v5/pgxpool" + log "github.com/sirupsen/logrus" +) + +const ( + metadataSchema = "peerdb_s3_metadata" + lastSyncStateTableName = "last_sync_state" +) + +type PostgresMetadataStore struct { + config *protos.PostgresConfig + pool *pgxpool.Pool +} + +func NewPostgresMetadataStore(ctx context.Context, pgConfig *protos.PostgresConfig) (*PostgresMetadataStore, error) { + connectionString := utils.GetPGConnectionString(pgConfig) + + pool, err := pgxpool.New(ctx, connectionString) + if err != nil { + log.Errorf("failed to create connection pool: %v", err) + return nil, err + } + log.Info("created 
connection pool for s3 metadata store") + + return &PostgresMetadataStore{ + config: pgConfig, + pool: pool, + }, nil +} + +func (p *PostgresMetadataStore) Close() error { + if p.pool != nil { + p.pool.Close() + } + + return nil +} + +func (c *S3Connector) NeedsSetupMetadataTables() bool { + ms := c.pgMetadata + + // check if schema exists + rows := ms.pool.QueryRow(c.ctx, "SELECT count(*) FROM pg_catalog.pg_namespace WHERE nspname = $1", metadataSchema) + + var exists int64 + err := rows.Scan(&exists) + if err != nil { + log.Errorf("failed to check if schema exists: %v", err) + return false + } + + if exists > 0 { + return true + } + + return true +} + +func (c *S3Connector) SetupMetadataTables() error { + ms := c.pgMetadata + + // start a transaction + tx, err := ms.pool.Begin(c.ctx) + if err != nil { + log.Errorf("failed to start transaction: %v", err) + return err + } + + // create the schema + _, err = tx.Exec(c.ctx, "CREATE SCHEMA IF NOT EXISTS "+metadataSchema) + if err != nil { + log.Errorf("failed to create schema: %v", err) + return err + } + + // create the last sync state table + _, err = tx.Exec(c.ctx, ` + CREATE TABLE IF NOT EXISTS `+metadataSchema+`.`+lastSyncStateTableName+` ( + job_name TEXT PRIMARY KEY NOT NULL, + last_offset BIGINT NOT NULL, + updated_at TIMESTAMP NOT NULL DEFAULT NOW(), + sync_batch_id BIGINT NOT NULL + ) + `) + if err != nil { + log.Errorf("failed to create last sync state table: %v", err) + return err + } + + // commit the transaction + err = tx.Commit(c.ctx) + if err != nil { + log.Errorf("failed to commit transaction: %v", err) + return err + } + + return nil +} + +func (c *S3Connector) GetLastOffset(jobName string) (*protos.LastSyncState, error) { + ms := c.pgMetadata + + rows := ms.pool.QueryRow(c.ctx, ` + SELECT last_offset + FROM `+metadataSchema+`.`+lastSyncStateTableName+` + WHERE job_name = $1 + `, jobName) + + var offset int64 + err := rows.Scan(&offset) + if err != nil { + // if the job doesn't exist, return 0 + if err.Error() == "no rows in result set" { + return &protos.LastSyncState{ + Checkpoint: 0, + }, nil + } + + log.WithFields(log.Fields{ + "flowName": jobName, + }).Errorf("failed to get last offset: %v", err) + return nil, err + } + + log.Infof("got last offset for job `%s`: %d", jobName, offset) + + return &protos.LastSyncState{ + Checkpoint: offset, + }, nil +} + +func (c *S3Connector) GetLastSyncBatchID(jobName string) (int64, error) { + ms := c.pgMetadata + + rows := ms.pool.QueryRow(c.ctx, ` + SELECT sync_batch_id + FROM `+metadataSchema+`.`+lastSyncStateTableName+` + WHERE job_name = $1 + `, jobName) + + var syncBatchID int64 + err := rows.Scan(&syncBatchID) + if err != nil { + // if the job doesn't exist, return 0 + if err.Error() == "no rows in result set" { + return 0, nil + } + + log.WithFields(log.Fields{ + "flowName": jobName, + }).Errorf("failed to get last offset: %v", err) + return 0, err + } + + log.Infof("got last sync batch ID for job `%s`: %d", jobName, syncBatchID) + + return syncBatchID, nil +} + +// update offset for a job +func (c *S3Connector) updateLastOffset(jobName string, offset int64) error { + ms := c.pgMetadata + + // start a transaction + tx, err := ms.pool.Begin(c.ctx) + if err != nil { + log.Errorf("failed to start transaction: %v", err) + return err + } + + // update the last offset + log.WithFields(log.Fields{ + "flowName": jobName, + }).Infof("updating last offset for job `%s` to `%d`", jobName, offset) + _, err = tx.Exec(c.ctx, ` + INSERT INTO `+metadataSchema+`.`+lastSyncStateTableName+` 
(job_name, last_offset, sync_batch_id) + VALUES ($1, $2, $3) + ON CONFLICT (job_name) + DO UPDATE SET last_offset = $2, updated_at = NOW() + `, jobName, offset, 0) + + if err != nil { + log.WithFields(log.Fields{ + "flowName": jobName, + }).Errorf("failed to update last offset: %v", err) + return err + } + + // commit the transaction + err = tx.Commit(c.ctx) + if err != nil { + log.Errorf("failed to commit transaction: %v", err) + return err + } + + return nil +} + +// update offset for a job +func (c *S3Connector) incrementSyncBatchID(jobName string) error { + ms := c.pgMetadata + + log.WithFields(log.Fields{ + "flowName": jobName, + }).Infof("incrementing sync batch id for job `%s`", jobName) + _, err := ms.pool.Exec(c.ctx, ` + UPDATE `+metadataSchema+`.`+lastSyncStateTableName+` + SET sync_batch_id=sync_batch_id+1 WHERE job_name=$1 + `, jobName) + + if err != nil { + log.WithFields(log.Fields{ + "flowName": jobName, + }).Errorf("failed to increment sync batch id: %v", err) + return err + } + + return nil +} + +func (c *S3Connector) SyncFlowCleanup(jobName string) error { + _, err := c.pgMetadata.pool.Exec(c.ctx, ` + DELETE FROM `+metadataSchema+`.`+lastSyncStateTableName+` + WHERE job_name = $1 + `, jobName) + return err +} diff --git a/flow/connectors/s3/s3.go b/flow/connectors/s3/s3.go index 9ef8fb528..9a78757ea 100644 --- a/flow/connectors/s3/s3.go +++ b/flow/connectors/s3/s3.go @@ -3,41 +3,47 @@ package conns3 import ( "context" "fmt" + "time" "github.com/PeerDB-io/peer-flow/connectors/utils" + "github.com/PeerDB-io/peer-flow/connectors/utils/metrics" "github.com/PeerDB-io/peer-flow/generated/protos" + "github.com/PeerDB-io/peer-flow/model" + "github.com/PeerDB-io/peer-flow/model/qvalue" "github.com/aws/aws-sdk-go/service/s3" + "github.com/google/uuid" log "github.com/sirupsen/logrus" ) type S3Connector struct { - ctx context.Context - url string - client s3.S3 - creds utils.S3PeerCredentials + ctx context.Context + url string + pgMetadata *PostgresMetadataStore + client s3.S3 + creds utils.S3PeerCredentials } func NewS3Connector(ctx context.Context, - s3ProtoConfig *protos.S3Config) (*S3Connector, error) { + config *protos.S3Config) (*S3Connector, error) { keyID := "" - if s3ProtoConfig.AccessKeyId != nil { - keyID = *s3ProtoConfig.AccessKeyId + if config.AccessKeyId != nil { + keyID = *config.AccessKeyId } secretKey := "" - if s3ProtoConfig.SecretAccessKey != nil { - secretKey = *s3ProtoConfig.SecretAccessKey + if config.SecretAccessKey != nil { + secretKey = *config.SecretAccessKey } roleArn := "" - if s3ProtoConfig.RoleArn != nil { - roleArn = *s3ProtoConfig.RoleArn + if config.RoleArn != nil { + roleArn = *config.RoleArn } region := "" - if s3ProtoConfig.Region != nil { - region = *s3ProtoConfig.Region + if config.Region != nil { + region = *config.Region } endpoint := "" - if s3ProtoConfig.Endpoint != nil { - endpoint = *s3ProtoConfig.Endpoint + if config.Endpoint != nil { + endpoint = *config.Endpoint } s3PeerCreds := utils.S3PeerCredentials{ AccessKeyID: keyID, @@ -50,11 +56,18 @@ func NewS3Connector(ctx context.Context, if err != nil { return nil, fmt.Errorf("failed to create S3 client: %w", err) } + pgMetadata, err := NewPostgresMetadataStore(ctx, config.GetMetadataDb()) + if err != nil { + log.Errorf("failed to create postgres metadata store: %v", err) + return nil, err + } + return &S3Connector{ - ctx: ctx, - url: s3ProtoConfig.Url, - client: *s3Client, - creds: s3PeerCreds, + ctx: ctx, + url: config.Url, + pgMetadata: pgMetadata, + client: *s3Client, + creds: 
s3PeerCreds, }, nil } @@ -63,6 +76,246 @@ func (c *S3Connector) Close() error { return nil } +func (c *S3Connector) SyncRecords(req *model.SyncRecordsRequest) (*model.SyncResponse, error) { + if len(req.Records.Records) == 0 { + return &model.SyncResponse{ + FirstSyncedCheckPointID: 0, + LastSyncedCheckPointID: 0, + NumRecordsSynced: 0, + }, nil + } + + syncBatchID, err := c.GetLastSyncBatchID(req.FlowJobName) + if err != nil { + return nil, fmt.Errorf("failed to get previous syncBatchID: %w", err) + } + syncBatchID = syncBatchID + 1 + lastCP := req.Records.LastCheckPointID + recordStream := model.NewQRecordStream(len(req.Records.Records)) + + err = recordStream.SetSchema(&model.QRecordSchema{ + Fields: []*model.QField{ + { + Name: "_peerdb_uid", + Type: qvalue.QValueKindString, + Nullable: false, + }, + { + Name: "_peerdb_timestamp", + Type: qvalue.QValueKindInt64, + Nullable: false, + }, + { + Name: "_peerdb_destination_table_name", + Type: qvalue.QValueKindString, + Nullable: false, + }, + { + Name: "_peerdb_data", + Type: qvalue.QValueKindString, + Nullable: false, + }, + { + Name: "_peerdb_record_type", + Type: qvalue.QValueKindInt64, + Nullable: true, + }, + { + Name: "_peerdb_match_data", + Type: qvalue.QValueKindString, + Nullable: true, + }, + { + Name: "_peerdb_batch_id", + Type: qvalue.QValueKindInt64, + Nullable: true, + }, + { + Name: "_peerdb_unchanged_toast_columns", + Type: qvalue.QValueKindString, + Nullable: true, + }, + }, + }) + if err != nil { + return nil, err + } + + first := true + var firstCP int64 = 0 + tableNameRowsMapping := make(map[string]uint32) + + for _, record := range req.Records.Records { + var entries [8]qvalue.QValue + switch typedRecord := record.(type) { + case *model.InsertRecord: + // json.Marshal converts bytes in Hex automatically to BASE64 string. 
+ itemsJSON, err := typedRecord.Items.ToJSON() + if err != nil { + return nil, fmt.Errorf("failed to serialize insert record items to JSON: %w", err) + } + + // add insert record to the raw table + entries[2] = qvalue.QValue{ + Kind: qvalue.QValueKindString, + Value: typedRecord.DestinationTableName, + } + entries[3] = qvalue.QValue{ + Kind: qvalue.QValueKindString, + Value: itemsJSON, + } + entries[4] = qvalue.QValue{ + Kind: qvalue.QValueKindInt64, + Value: 0, + } + entries[5] = qvalue.QValue{ + Kind: qvalue.QValueKindString, + Value: "", + } + entries[7] = qvalue.QValue{ + Kind: qvalue.QValueKindString, + Value: utils.KeysToString(typedRecord.UnchangedToastColumns), + } + tableNameRowsMapping[typedRecord.DestinationTableName] += 1 + case *model.UpdateRecord: + newItemsJSON, err := typedRecord.NewItems.ToJSON() + if err != nil { + return nil, fmt.Errorf("failed to serialize update record new items to JSON: %w", err) + } + oldItemsJSON, err := typedRecord.OldItems.ToJSON() + if err != nil { + return nil, fmt.Errorf("failed to serialize update record old items to JSON: %w", err) + } + + entries[2] = qvalue.QValue{ + Kind: qvalue.QValueKindString, + Value: typedRecord.DestinationTableName, + } + entries[3] = qvalue.QValue{ + Kind: qvalue.QValueKindString, + Value: newItemsJSON, + } + entries[4] = qvalue.QValue{ + Kind: qvalue.QValueKindInt64, + Value: 1, + } + entries[5] = qvalue.QValue{ + Kind: qvalue.QValueKindString, + Value: oldItemsJSON, + } + entries[7] = qvalue.QValue{ + Kind: qvalue.QValueKindString, + Value: utils.KeysToString(typedRecord.UnchangedToastColumns), + } + tableNameRowsMapping[typedRecord.DestinationTableName] += 1 + case *model.DeleteRecord: + itemsJSON, err := typedRecord.Items.ToJSON() + if err != nil { + return nil, fmt.Errorf("failed to serialize delete record items to JSON: %w", err) + } + + // append delete record to the raw table + entries[2] = qvalue.QValue{ + Kind: qvalue.QValueKindString, + Value: typedRecord.DestinationTableName, + } + entries[3] = qvalue.QValue{ + Kind: qvalue.QValueKindString, + Value: itemsJSON, + } + entries[4] = qvalue.QValue{ + Kind: qvalue.QValueKindInt64, + Value: 2, + } + entries[5] = qvalue.QValue{ + Kind: qvalue.QValueKindString, + Value: itemsJSON, + } + entries[7] = qvalue.QValue{ + Kind: qvalue.QValueKindString, + Value: utils.KeysToString(typedRecord.UnchangedToastColumns), + } + tableNameRowsMapping[typedRecord.DestinationTableName] += 1 + default: + return nil, fmt.Errorf("record type %T not supported in Snowflake flow connector", typedRecord) + } + + if first { + firstCP = record.GetCheckPointID() + first = false + } + + entries[0] = qvalue.QValue{ + Kind: qvalue.QValueKindString, + Value: uuid.New().String(), + } + entries[1] = qvalue.QValue{ + Kind: qvalue.QValueKindInt64, + Value: time.Now().UnixNano(), + } + entries[6] = qvalue.QValue{ + Kind: qvalue.QValueKindInt64, + Value: syncBatchID, + } + + recordStream.Records <- &model.QRecordOrError{ + Record: &model.QRecord{ + NumEntries: 8, + Entries: entries[:], + }, + } + } + + qrepConfig := &protos.QRepConfig{ + FlowJobName: req.FlowJobName, + DestinationTableIdentifier: fmt.Sprintf("raw_table_%s", req.FlowJobName), + } + partition := &protos.QRepPartition{ + PartitionId: fmt.Sprint(syncBatchID), + } + startTime := time.Now() + close(recordStream.Records) + numRecords, err := c.SyncQRepRecords(qrepConfig, partition, recordStream) + if err != nil { + return nil, err + } + + err = c.updateLastOffset(req.FlowJobName, lastCP) + if err != nil { + log.Errorf("failed to update 
last offset for s3 cdc: %v", err) + return nil, err + } + err = c.incrementSyncBatchID(req.FlowJobName) + if err != nil { + log.Errorf("%v", err) + return nil, err + } + metrics.LogSyncMetrics(c.ctx, req.FlowJobName, int64(numRecords), time.Since(startTime)) + return &model.SyncResponse{ + FirstSyncedCheckPointID: firstCP, + LastSyncedCheckPointID: lastCP, + NumRecordsSynced: int64(numRecords), + TableNameRowsMapping: tableNameRowsMapping, + }, nil +} + +func (c *S3Connector) CreateRawTable(req *protos.CreateRawTableInput) (*protos.CreateRawTableOutput, error) { + log.Infof("CreateRawTable for S3 is a no-op") + return nil, nil +} + +func (c *S3Connector) InitializeTableSchema(req map[string]*protos.TableSchema) error { + log.Infof("InitializeTableSchema for S3 is a no-op") + return nil +} + +func (c *S3Connector) SetupNormalizedTables(req *protos.SetupNormalizedTableBatchInput) ( + *protos.SetupNormalizedTableBatchOutput, + error) { + log.Infof("SetupNormalizedTables for S3 is a no-op") + return nil, nil +} + func (c *S3Connector) ConnectionActive() bool { _, err := c.client.ListBuckets(nil) return err == nil diff --git a/flow/e2e/congen.go b/flow/e2e/congen.go index 80903eb55..02f5b2194 100644 --- a/flow/e2e/congen.go +++ b/flow/e2e/congen.go @@ -34,6 +34,12 @@ func cleanPostgres(pool *pgxpool.Pool, suffix string) error { return fmt.Errorf("failed to drop e2e_test schema: %w", err) } + // drop the S3 metadata database if it exists + _, err = pool.Exec(context.Background(), "DROP SCHEMA IF EXISTS peerdb_s3_metadata CASCADE") + if err != nil { + return fmt.Errorf("failed to drop metadata schema: %w", err) + } + // drop all open slots with the given suffix _, err = pool.Exec( context.Background(), diff --git a/flow/e2e/s3/cdc_s3_test.go b/flow/e2e/s3/cdc_s3_test.go new file mode 100644 index 000000000..0ec424ea8 --- /dev/null +++ b/flow/e2e/s3/cdc_s3_test.go @@ -0,0 +1,183 @@ +package e2e_s3 + +import ( + "context" + "fmt" + "time" + + "github.com/PeerDB-io/peer-flow/e2e" + peerflow "github.com/PeerDB-io/peer-flow/workflows" + "github.com/stretchr/testify/require" +) + +func (s *PeerFlowE2ETestSuiteS3) attachSchemaSuffix(tableName string) string { + return fmt.Sprintf("e2e_test_%s.%s", s3Suffix, tableName) +} + +func (s *PeerFlowE2ETestSuiteS3) attachSuffix(input string) string { + return fmt.Sprintf("%s_%s", input, s3Suffix) +} + +func (s *PeerFlowE2ETestSuiteS3) Test_Complete_Simple_Flow_S3() { + env := s.NewTestWorkflowEnvironment() + e2e.RegisterWorkflowsAndActivities(env) + + srcTableName := s.attachSchemaSuffix("test_simple_flow_s3") + dstTableName := fmt.Sprintf("%s.%s", "peerdb_test_s3", "test_simple_flow_s3") + flowJobName := s.attachSuffix("test_simple_flow") + _, err := s.pool.Exec(context.Background(), fmt.Sprintf(` + CREATE TABLE %s ( + id SERIAL PRIMARY KEY, + key TEXT NOT NULL, + value TEXT NOT NULL + ); + `, srcTableName)) + s.NoError(err) + connectionGen := e2e.FlowConnectionGenerationConfig{ + FlowJobName: flowJobName, + TableNameMapping: map[string]string{srcTableName: dstTableName}, + PostgresPort: e2e.PostgresPort, + Destination: s.s3Helper.GetPeer(), + } + + flowConnConfig, err := connectionGen.GenerateFlowConnectionConfigs() + s.NoError(err) + + limits := peerflow.CDCFlowLimits{ + TotalSyncFlows: 9, + MaxBatchSize: 20, + } + + // Insert 100 rows into postgres, update 20 rows, and delete 20 rows + go func() { + e2e.SetupCDCFlowStatusQuery(env, connectionGen) + s.NoError(err) + //insert 100 + for i := 0; i < 100; i++ { + testKey := fmt.Sprintf("test_key_%d", i) + 
testValue := fmt.Sprintf("test_value_%d", i) + _, err = s.pool.Exec(context.Background(), fmt.Sprintf(` + INSERT INTO %s (key, value) VALUES ($1, $2) + `, srcTableName), testKey, testValue) + s.NoError(err) + } + //update 20 + for i := 0; i < 20; i++ { + _, err = s.pool.Exec(context.Background(), fmt.Sprintf(` + UPDATE %s SET value=$1 where id=$2 + `, srcTableName), "updated_value", i) + s.NoError(err) + } + //delete 20 + _, err = s.pool.Exec(context.Background(), fmt.Sprintf(` + DELETE FROM %s where id < 20 + `, srcTableName)) + s.NoError(err) + fmt.Println("Inserted 100 rows into the source table") + }() + + env.ExecuteWorkflow(peerflow.CDCFlowWorkflowWithConfig, flowConnConfig, &limits, nil) + + // Verify workflow completes without error + s.True(env.IsWorkflowCompleted()) + err = env.GetWorkflowError() + + // allow only continue as new error + s.Error(err) + s.Contains(err.Error(), "continue as new") + + ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second) + defer cancel() + fmt.Println("JobName: ", flowJobName) + files, err := s.s3Helper.ListAllFiles(ctx, flowJobName) + fmt.Println("Files in Test_Complete_Simple_Flow_S3: ", len(files)) + require.NoError(s.T(), err) + + require.Equal(s.T(), 8, len(files)) + + env.AssertExpectations(s.T()) +} + +func (s *PeerFlowE2ETestSuiteS3) Test_Complete_Simple_Flow_GCS_Interop() { + env := s.NewTestWorkflowEnvironment() + e2e.RegisterWorkflowsAndActivities(env) + setupErr := s.setupS3("gcs") + if setupErr != nil { + s.Fail("failed to setup S3", setupErr) + } + + srcTableName := s.attachSchemaSuffix("test_simple_flow_gcs_interop") + dstTableName := fmt.Sprintf("%s.%s", "peerdb_test_gcs_interop", "test_simple_flow_gcs_interop") + flowJobName := s.attachSuffix("test_simple_flow") + _, err := s.pool.Exec(context.Background(), fmt.Sprintf(` + CREATE TABLE %s ( + id SERIAL PRIMARY KEY, + key TEXT NOT NULL, + value TEXT NOT NULL + ); + `, srcTableName)) + s.NoError(err) + connectionGen := e2e.FlowConnectionGenerationConfig{ + FlowJobName: flowJobName, + TableNameMapping: map[string]string{srcTableName: dstTableName}, + PostgresPort: e2e.PostgresPort, + Destination: s.s3Helper.GetPeer(), + } + + flowConnConfig, err := connectionGen.GenerateFlowConnectionConfigs() + s.NoError(err) + + limits := peerflow.CDCFlowLimits{ + TotalSyncFlows: 9, + MaxBatchSize: 20, + } + + // Insert 100 rows into postgres, update 20 rows, and delete 20 rows + go func() { + e2e.SetupCDCFlowStatusQuery(env, connectionGen) + s.NoError(err) + //insert 100 + for i := 0; i < 100; i++ { + testKey := fmt.Sprintf("test_key_%d", i) + testValue := fmt.Sprintf("test_value_%d", i) + _, err = s.pool.Exec(context.Background(), fmt.Sprintf(` + INSERT INTO %s (key, value) VALUES ($1, $2) + `, srcTableName), testKey, testValue) + s.NoError(err) + } + //update 20 + for i := 0; i < 20; i++ { + _, err = s.pool.Exec(context.Background(), fmt.Sprintf(` + UPDATE %s SET value=$1 where id=$2 + `, srcTableName), "updated_value", i) + s.NoError(err) + } + //delete 20 + _, err = s.pool.Exec(context.Background(), fmt.Sprintf(` + DELETE FROM %s where id < 20 + `, srcTableName)) + s.NoError(err) + fmt.Println("Inserted 100 rows into the source table") + }() + + env.ExecuteWorkflow(peerflow.CDCFlowWorkflowWithConfig, flowConnConfig, &limits, nil) + + // Verify workflow completes without error + s.True(env.IsWorkflowCompleted()) + err = env.GetWorkflowError() + + // allow only continue as new error + s.Error(err) + s.Contains(err.Error(), "continue as new") + + ctx, cancel := 
context.WithTimeout(context.Background(), 10*time.Second) + defer cancel() + fmt.Println("JobName: ", flowJobName) + files, err := s.s3Helper.ListAllFiles(ctx, flowJobName) + fmt.Println("Files in Test_Complete_Simple_Flow_S3: ", len(files)) + require.NoError(s.T(), err) + + require.Equal(s.T(), 8, len(files)) + + env.AssertExpectations(s.T()) +} diff --git a/flow/e2e/s3/qrep_flow_s3_test.go b/flow/e2e/s3/qrep_flow_s3_test.go index e47d23592..2fca18a70 100644 --- a/flow/e2e/s3/qrep_flow_s3_test.go +++ b/flow/e2e/s3/qrep_flow_s3_test.go @@ -37,8 +37,12 @@ func (s *PeerFlowE2ETestSuiteS3) setupSourceTable(tableName string, rowCount int s.NoError(err) } -func (s *PeerFlowE2ETestSuiteS3) setupS3() error { - helper, err := NewS3TestHelper() +func (s *PeerFlowE2ETestSuiteS3) setupS3(mode string) error { + switchToGCS := false + if mode == "gcs" { + switchToGCS = true + } + helper, err := NewS3TestHelper(switchToGCS) if err != nil { return err } @@ -63,7 +67,7 @@ func (s *PeerFlowE2ETestSuiteS3) SetupSuite() { } s.pool = pool - err = s.setupS3() + err = s.setupS3("s3") if err != nil { s.Fail("failed to setup S3", err) } diff --git a/flow/e2e/s3/s3_helper.go b/flow/e2e/s3/s3_helper.go index b02752b02..5d40d9a0e 100644 --- a/flow/e2e/s3/s3_helper.go +++ b/flow/e2e/s3/s3_helper.go @@ -2,9 +2,12 @@ package e2e_s3 import ( "context" + "encoding/json" "fmt" + "os" "github.com/PeerDB-io/peer-flow/connectors/utils" + "github.com/PeerDB-io/peer-flow/e2e" "github.com/PeerDB-io/peer-flow/generated/protos" "github.com/aws/aws-sdk-go/aws" "github.com/aws/aws-sdk-go/service/s3" @@ -13,26 +16,58 @@ import ( const ( peerName string = "test_s3_peer" - bucketName string = "peerdb-test-bucket" prefixName string = "test-s3" ) type S3TestHelper struct { - client *s3.S3 - s3Config *protos.S3Config + client *s3.S3 + s3Config *protos.S3Config + bucketName string } -func NewS3TestHelper() (*S3TestHelper, error) { - client, err := utils.CreateS3Client(utils.S3PeerCredentials{}) +func NewS3TestHelper(switchToGCS bool) (*S3TestHelper, error) { + credsPath := os.Getenv("TEST_S3_CREDS") + bucketName := "peerdb-test-bucket" + if switchToGCS { + credsPath = os.Getenv("TEST_GCS_CREDS") + bucketName = "peerdb_staging" + } + + content, err := e2e.ReadFileToBytes(credsPath) + if err != nil { + return nil, fmt.Errorf("failed to read file: %w", err) + } + + var config utils.S3PeerCredentials + err = json.Unmarshal(content, &config) + if err != nil { + return nil, fmt.Errorf("failed to unmarshal json: %w", err) + } + endpoint := "" + if switchToGCS { + endpoint = "https://storage.googleapis.com" + } + client, err := utils.CreateS3Client(config) if err != nil { return nil, err } - log.Infof("S3 client obtained") return &S3TestHelper{ client, &protos.S3Config{ - Url: fmt.Sprintf("s3://%s/%s", bucketName, prefixName), + Url: fmt.Sprintf("s3://%s/%s", bucketName, prefixName), + AccessKeyId: &config.AccessKeyID, + SecretAccessKey: &config.SecretAccessKey, + Region: &config.Region, + Endpoint: &endpoint, + MetadataDb: &protos.PostgresConfig{ + Host: "localhost", + Port: 7132, + Password: "postgres", + User: "postgres", + Database: "postgres", + }, }, + bucketName, }, nil } @@ -52,7 +87,8 @@ func (h *S3TestHelper) ListAllFiles( ctx context.Context, jobName string, ) ([]*s3.Object, error) { - Bucket := bucketName + + Bucket := h.bucketName Prefix := fmt.Sprintf("%s/%s/", prefixName, jobName) files, err := h.client.ListObjects(&s3.ListObjectsInput{ Bucket: &Bucket, @@ -68,7 +104,7 @@ func (h *S3TestHelper) ListAllFiles( // Delete all generated 
objects during the test func (h *S3TestHelper) CleanUp() error { - Bucket := bucketName + Bucket := h.bucketName Prefix := prefixName files, err := h.client.ListObjects(&s3.ListObjectsInput{ Bucket: &Bucket, diff --git a/flow/generated/protos/peers.pb.go b/flow/generated/protos/peers.pb.go index 2a8377ea7..7f8bb4a37 100644 --- a/flow/generated/protos/peers.pb.go +++ b/flow/generated/protos/peers.pb.go @@ -656,12 +656,13 @@ type S3Config struct { sizeCache protoimpl.SizeCache unknownFields protoimpl.UnknownFields - Url string `protobuf:"bytes,1,opt,name=url,proto3" json:"url,omitempty"` - AccessKeyId *string `protobuf:"bytes,2,opt,name=access_key_id,json=accessKeyId,proto3,oneof" json:"access_key_id,omitempty"` - SecretAccessKey *string `protobuf:"bytes,3,opt,name=secret_access_key,json=secretAccessKey,proto3,oneof" json:"secret_access_key,omitempty"` - RoleArn *string `protobuf:"bytes,4,opt,name=role_arn,json=roleArn,proto3,oneof" json:"role_arn,omitempty"` - Region *string `protobuf:"bytes,5,opt,name=region,proto3,oneof" json:"region,omitempty"` - Endpoint *string `protobuf:"bytes,6,opt,name=endpoint,proto3,oneof" json:"endpoint,omitempty"` + Url string `protobuf:"bytes,1,opt,name=url,proto3" json:"url,omitempty"` + AccessKeyId *string `protobuf:"bytes,2,opt,name=access_key_id,json=accessKeyId,proto3,oneof" json:"access_key_id,omitempty"` + SecretAccessKey *string `protobuf:"bytes,3,opt,name=secret_access_key,json=secretAccessKey,proto3,oneof" json:"secret_access_key,omitempty"` + RoleArn *string `protobuf:"bytes,4,opt,name=role_arn,json=roleArn,proto3,oneof" json:"role_arn,omitempty"` + Region *string `protobuf:"bytes,5,opt,name=region,proto3,oneof" json:"region,omitempty"` + Endpoint *string `protobuf:"bytes,6,opt,name=endpoint,proto3,oneof" json:"endpoint,omitempty"` + MetadataDb *PostgresConfig `protobuf:"bytes,7,opt,name=metadata_db,json=metadataDb,proto3" json:"metadata_db,omitempty"` } func (x *S3Config) Reset() { @@ -738,6 +739,13 @@ func (x *S3Config) GetEndpoint() string { return "" } +func (x *S3Config) GetMetadataDb() *PostgresConfig { + if x != nil { + return x.MetadataDb + } + return nil +} + type SqlServerConfig struct { state protoimpl.MessageState sizeCache protoimpl.SizeCache @@ -1108,7 +1116,7 @@ var file_peers_proto_rawDesc = []byte{ 0x79, 0x12, 0x32, 0x0a, 0x05, 0x76, 0x61, 0x6c, 0x75, 0x65, 0x18, 0x02, 0x20, 0x01, 0x28, 0x0b, 0x32, 0x1c, 0x2e, 0x70, 0x65, 0x65, 0x72, 0x64, 0x62, 0x5f, 0x70, 0x65, 0x65, 0x72, 0x73, 0x2e, 0x45, 0x76, 0x65, 0x6e, 0x74, 0x48, 0x75, 0x62, 0x43, 0x6f, 0x6e, 0x66, 0x69, 0x67, 0x52, 0x05, - 0x76, 0x61, 0x6c, 0x75, 0x65, 0x3a, 0x02, 0x38, 0x01, 0x22, 0xa1, 0x02, 0x0a, 0x08, 0x53, 0x33, + 0x76, 0x61, 0x6c, 0x75, 0x65, 0x3a, 0x02, 0x38, 0x01, 0x22, 0xe0, 0x02, 0x0a, 0x08, 0x53, 0x33, 0x43, 0x6f, 0x6e, 0x66, 0x69, 0x67, 0x12, 0x10, 0x0a, 0x03, 0x75, 0x72, 0x6c, 0x18, 0x01, 0x20, 0x01, 0x28, 0x09, 0x52, 0x03, 0x75, 0x72, 0x6c, 0x12, 0x27, 0x0a, 0x0d, 0x61, 0x63, 0x63, 0x65, 0x73, 0x73, 0x5f, 0x6b, 0x65, 0x79, 0x5f, 0x69, 0x64, 0x18, 0x02, 0x20, 0x01, 0x28, 0x09, 0x48, @@ -1122,77 +1130,81 @@ var file_peers_proto_rawDesc = []byte{ 0x28, 0x09, 0x48, 0x03, 0x52, 0x06, 0x72, 0x65, 0x67, 0x69, 0x6f, 0x6e, 0x88, 0x01, 0x01, 0x12, 0x1f, 0x0a, 0x08, 0x65, 0x6e, 0x64, 0x70, 0x6f, 0x69, 0x6e, 0x74, 0x18, 0x06, 0x20, 0x01, 0x28, 0x09, 0x48, 0x04, 0x52, 0x08, 0x65, 0x6e, 0x64, 0x70, 0x6f, 0x69, 0x6e, 0x74, 0x88, 0x01, 0x01, - 0x42, 0x10, 0x0a, 0x0e, 0x5f, 0x61, 0x63, 0x63, 0x65, 0x73, 0x73, 0x5f, 0x6b, 0x65, 0x79, 0x5f, - 0x69, 0x64, 0x42, 0x14, 0x0a, 0x12, 
0x5f, 0x73, 0x65, 0x63, 0x72, 0x65, 0x74, 0x5f, 0x61, 0x63, - 0x63, 0x65, 0x73, 0x73, 0x5f, 0x6b, 0x65, 0x79, 0x42, 0x0b, 0x0a, 0x09, 0x5f, 0x72, 0x6f, 0x6c, - 0x65, 0x5f, 0x61, 0x72, 0x6e, 0x42, 0x09, 0x0a, 0x07, 0x5f, 0x72, 0x65, 0x67, 0x69, 0x6f, 0x6e, - 0x42, 0x0b, 0x0a, 0x09, 0x5f, 0x65, 0x6e, 0x64, 0x70, 0x6f, 0x69, 0x6e, 0x74, 0x22, 0x89, 0x01, - 0x0a, 0x0f, 0x53, 0x71, 0x6c, 0x53, 0x65, 0x72, 0x76, 0x65, 0x72, 0x43, 0x6f, 0x6e, 0x66, 0x69, - 0x67, 0x12, 0x16, 0x0a, 0x06, 0x73, 0x65, 0x72, 0x76, 0x65, 0x72, 0x18, 0x01, 0x20, 0x01, 0x28, - 0x09, 0x52, 0x06, 0x73, 0x65, 0x72, 0x76, 0x65, 0x72, 0x12, 0x12, 0x0a, 0x04, 0x70, 0x6f, 0x72, - 0x74, 0x18, 0x02, 0x20, 0x01, 0x28, 0x0d, 0x52, 0x04, 0x70, 0x6f, 0x72, 0x74, 0x12, 0x12, 0x0a, - 0x04, 0x75, 0x73, 0x65, 0x72, 0x18, 0x03, 0x20, 0x01, 0x28, 0x09, 0x52, 0x04, 0x75, 0x73, 0x65, - 0x72, 0x12, 0x1a, 0x0a, 0x08, 0x70, 0x61, 0x73, 0x73, 0x77, 0x6f, 0x72, 0x64, 0x18, 0x04, 0x20, - 0x01, 0x28, 0x09, 0x52, 0x08, 0x70, 0x61, 0x73, 0x73, 0x77, 0x6f, 0x72, 0x64, 0x12, 0x1a, 0x0a, - 0x08, 0x64, 0x61, 0x74, 0x61, 0x62, 0x61, 0x73, 0x65, 0x18, 0x05, 0x20, 0x01, 0x28, 0x09, 0x52, - 0x08, 0x64, 0x61, 0x74, 0x61, 0x62, 0x61, 0x73, 0x65, 0x22, 0x91, 0x05, 0x0a, 0x04, 0x50, 0x65, - 0x65, 0x72, 0x12, 0x12, 0x0a, 0x04, 0x6e, 0x61, 0x6d, 0x65, 0x18, 0x01, 0x20, 0x01, 0x28, 0x09, - 0x52, 0x04, 0x6e, 0x61, 0x6d, 0x65, 0x12, 0x28, 0x0a, 0x04, 0x74, 0x79, 0x70, 0x65, 0x18, 0x02, - 0x20, 0x01, 0x28, 0x0e, 0x32, 0x14, 0x2e, 0x70, 0x65, 0x65, 0x72, 0x64, 0x62, 0x5f, 0x70, 0x65, - 0x65, 0x72, 0x73, 0x2e, 0x44, 0x42, 0x54, 0x79, 0x70, 0x65, 0x52, 0x04, 0x74, 0x79, 0x70, 0x65, - 0x12, 0x4a, 0x0a, 0x10, 0x73, 0x6e, 0x6f, 0x77, 0x66, 0x6c, 0x61, 0x6b, 0x65, 0x5f, 0x63, 0x6f, - 0x6e, 0x66, 0x69, 0x67, 0x18, 0x03, 0x20, 0x01, 0x28, 0x0b, 0x32, 0x1d, 0x2e, 0x70, 0x65, 0x65, - 0x72, 0x64, 0x62, 0x5f, 0x70, 0x65, 0x65, 0x72, 0x73, 0x2e, 0x53, 0x6e, 0x6f, 0x77, 0x66, 0x6c, - 0x61, 0x6b, 0x65, 0x43, 0x6f, 0x6e, 0x66, 0x69, 0x67, 0x48, 0x00, 0x52, 0x0f, 0x73, 0x6e, 0x6f, - 0x77, 0x66, 0x6c, 0x61, 0x6b, 0x65, 0x43, 0x6f, 0x6e, 0x66, 0x69, 0x67, 0x12, 0x47, 0x0a, 0x0f, - 0x62, 0x69, 0x67, 0x71, 0x75, 0x65, 0x72, 0x79, 0x5f, 0x63, 0x6f, 0x6e, 0x66, 0x69, 0x67, 0x18, - 0x04, 0x20, 0x01, 0x28, 0x0b, 0x32, 0x1c, 0x2e, 0x70, 0x65, 0x65, 0x72, 0x64, 0x62, 0x5f, 0x70, - 0x65, 0x65, 0x72, 0x73, 0x2e, 0x42, 0x69, 0x67, 0x71, 0x75, 0x65, 0x72, 0x79, 0x43, 0x6f, 0x6e, - 0x66, 0x69, 0x67, 0x48, 0x00, 0x52, 0x0e, 0x62, 0x69, 0x67, 0x71, 0x75, 0x65, 0x72, 0x79, 0x43, - 0x6f, 0x6e, 0x66, 0x69, 0x67, 0x12, 0x3e, 0x0a, 0x0c, 0x6d, 0x6f, 0x6e, 0x67, 0x6f, 0x5f, 0x63, - 0x6f, 0x6e, 0x66, 0x69, 0x67, 0x18, 0x05, 0x20, 0x01, 0x28, 0x0b, 0x32, 0x19, 0x2e, 0x70, 0x65, - 0x65, 0x72, 0x64, 0x62, 0x5f, 0x70, 0x65, 0x65, 0x72, 0x73, 0x2e, 0x4d, 0x6f, 0x6e, 0x67, 0x6f, - 0x43, 0x6f, 0x6e, 0x66, 0x69, 0x67, 0x48, 0x00, 0x52, 0x0b, 0x6d, 0x6f, 0x6e, 0x67, 0x6f, 0x43, - 0x6f, 0x6e, 0x66, 0x69, 0x67, 0x12, 0x47, 0x0a, 0x0f, 0x70, 0x6f, 0x73, 0x74, 0x67, 0x72, 0x65, - 0x73, 0x5f, 0x63, 0x6f, 0x6e, 0x66, 0x69, 0x67, 0x18, 0x06, 0x20, 0x01, 0x28, 0x0b, 0x32, 0x1c, - 0x2e, 0x70, 0x65, 0x65, 0x72, 0x64, 0x62, 0x5f, 0x70, 0x65, 0x65, 0x72, 0x73, 0x2e, 0x50, 0x6f, - 0x73, 0x74, 0x67, 0x72, 0x65, 0x73, 0x43, 0x6f, 0x6e, 0x66, 0x69, 0x67, 0x48, 0x00, 0x52, 0x0e, - 0x70, 0x6f, 0x73, 0x74, 0x67, 0x72, 0x65, 0x73, 0x43, 0x6f, 0x6e, 0x66, 0x69, 0x67, 0x12, 0x47, - 0x0a, 0x0f, 0x65, 0x76, 0x65, 0x6e, 0x74, 0x68, 0x75, 0x62, 0x5f, 0x63, 0x6f, 0x6e, 0x66, 0x69, - 0x67, 0x18, 0x07, 0x20, 0x01, 0x28, 0x0b, 0x32, 0x1c, 0x2e, 
0x70, 0x65, 0x65, 0x72, 0x64, 0x62, - 0x5f, 0x70, 0x65, 0x65, 0x72, 0x73, 0x2e, 0x45, 0x76, 0x65, 0x6e, 0x74, 0x48, 0x75, 0x62, 0x43, - 0x6f, 0x6e, 0x66, 0x69, 0x67, 0x48, 0x00, 0x52, 0x0e, 0x65, 0x76, 0x65, 0x6e, 0x74, 0x68, 0x75, - 0x62, 0x43, 0x6f, 0x6e, 0x66, 0x69, 0x67, 0x12, 0x35, 0x0a, 0x09, 0x73, 0x33, 0x5f, 0x63, 0x6f, - 0x6e, 0x66, 0x69, 0x67, 0x18, 0x08, 0x20, 0x01, 0x28, 0x0b, 0x32, 0x16, 0x2e, 0x70, 0x65, 0x65, - 0x72, 0x64, 0x62, 0x5f, 0x70, 0x65, 0x65, 0x72, 0x73, 0x2e, 0x53, 0x33, 0x43, 0x6f, 0x6e, 0x66, - 0x69, 0x67, 0x48, 0x00, 0x52, 0x08, 0x73, 0x33, 0x43, 0x6f, 0x6e, 0x66, 0x69, 0x67, 0x12, 0x4a, - 0x0a, 0x10, 0x73, 0x71, 0x6c, 0x73, 0x65, 0x72, 0x76, 0x65, 0x72, 0x5f, 0x63, 0x6f, 0x6e, 0x66, - 0x69, 0x67, 0x18, 0x09, 0x20, 0x01, 0x28, 0x0b, 0x32, 0x1d, 0x2e, 0x70, 0x65, 0x65, 0x72, 0x64, - 0x62, 0x5f, 0x70, 0x65, 0x65, 0x72, 0x73, 0x2e, 0x53, 0x71, 0x6c, 0x53, 0x65, 0x72, 0x76, 0x65, - 0x72, 0x43, 0x6f, 0x6e, 0x66, 0x69, 0x67, 0x48, 0x00, 0x52, 0x0f, 0x73, 0x71, 0x6c, 0x73, 0x65, - 0x72, 0x76, 0x65, 0x72, 0x43, 0x6f, 0x6e, 0x66, 0x69, 0x67, 0x12, 0x57, 0x0a, 0x15, 0x65, 0x76, - 0x65, 0x6e, 0x74, 0x68, 0x75, 0x62, 0x5f, 0x67, 0x72, 0x6f, 0x75, 0x70, 0x5f, 0x63, 0x6f, 0x6e, - 0x66, 0x69, 0x67, 0x18, 0x0a, 0x20, 0x01, 0x28, 0x0b, 0x32, 0x21, 0x2e, 0x70, 0x65, 0x65, 0x72, - 0x64, 0x62, 0x5f, 0x70, 0x65, 0x65, 0x72, 0x73, 0x2e, 0x45, 0x76, 0x65, 0x6e, 0x74, 0x48, 0x75, - 0x62, 0x47, 0x72, 0x6f, 0x75, 0x70, 0x43, 0x6f, 0x6e, 0x66, 0x69, 0x67, 0x48, 0x00, 0x52, 0x13, - 0x65, 0x76, 0x65, 0x6e, 0x74, 0x68, 0x75, 0x62, 0x47, 0x72, 0x6f, 0x75, 0x70, 0x43, 0x6f, 0x6e, - 0x66, 0x69, 0x67, 0x42, 0x08, 0x0a, 0x06, 0x63, 0x6f, 0x6e, 0x66, 0x69, 0x67, 0x2a, 0x77, 0x0a, - 0x06, 0x44, 0x42, 0x54, 0x79, 0x70, 0x65, 0x12, 0x0c, 0x0a, 0x08, 0x42, 0x49, 0x47, 0x51, 0x55, - 0x45, 0x52, 0x59, 0x10, 0x00, 0x12, 0x0d, 0x0a, 0x09, 0x53, 0x4e, 0x4f, 0x57, 0x46, 0x4c, 0x41, - 0x4b, 0x45, 0x10, 0x01, 0x12, 0x09, 0x0a, 0x05, 0x4d, 0x4f, 0x4e, 0x47, 0x4f, 0x10, 0x02, 0x12, - 0x0c, 0x0a, 0x08, 0x50, 0x4f, 0x53, 0x54, 0x47, 0x52, 0x45, 0x53, 0x10, 0x03, 0x12, 0x0c, 0x0a, - 0x08, 0x45, 0x56, 0x45, 0x4e, 0x54, 0x48, 0x55, 0x42, 0x10, 0x04, 0x12, 0x06, 0x0a, 0x02, 0x53, - 0x33, 0x10, 0x05, 0x12, 0x0d, 0x0a, 0x09, 0x53, 0x51, 0x4c, 0x53, 0x45, 0x52, 0x56, 0x45, 0x52, - 0x10, 0x06, 0x12, 0x12, 0x0a, 0x0e, 0x45, 0x56, 0x45, 0x4e, 0x54, 0x48, 0x55, 0x42, 0x5f, 0x47, - 0x52, 0x4f, 0x55, 0x50, 0x10, 0x07, 0x42, 0x7c, 0x0a, 0x10, 0x63, 0x6f, 0x6d, 0x2e, 0x70, 0x65, - 0x65, 0x72, 0x64, 0x62, 0x5f, 0x70, 0x65, 0x65, 0x72, 0x73, 0x42, 0x0a, 0x50, 0x65, 0x65, 0x72, - 0x73, 0x50, 0x72, 0x6f, 0x74, 0x6f, 0x50, 0x01, 0x5a, 0x10, 0x67, 0x65, 0x6e, 0x65, 0x72, 0x61, - 0x74, 0x65, 0x64, 0x2f, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x73, 0xa2, 0x02, 0x03, 0x50, 0x58, 0x58, - 0xaa, 0x02, 0x0b, 0x50, 0x65, 0x65, 0x72, 0x64, 0x62, 0x50, 0x65, 0x65, 0x72, 0x73, 0xca, 0x02, - 0x0b, 0x50, 0x65, 0x65, 0x72, 0x64, 0x62, 0x50, 0x65, 0x65, 0x72, 0x73, 0xe2, 0x02, 0x17, 0x50, - 0x65, 0x65, 0x72, 0x64, 0x62, 0x50, 0x65, 0x65, 0x72, 0x73, 0x5c, 0x47, 0x50, 0x42, 0x4d, 0x65, - 0x74, 0x61, 0x64, 0x61, 0x74, 0x61, 0xea, 0x02, 0x0b, 0x50, 0x65, 0x65, 0x72, 0x64, 0x62, 0x50, - 0x65, 0x65, 0x72, 0x73, 0x62, 0x06, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x33, + 0x12, 0x3d, 0x0a, 0x0b, 0x6d, 0x65, 0x74, 0x61, 0x64, 0x61, 0x74, 0x61, 0x5f, 0x64, 0x62, 0x18, + 0x07, 0x20, 0x01, 0x28, 0x0b, 0x32, 0x1c, 0x2e, 0x70, 0x65, 0x65, 0x72, 0x64, 0x62, 0x5f, 0x70, + 0x65, 0x65, 0x72, 0x73, 0x2e, 0x50, 0x6f, 0x73, 0x74, 0x67, 0x72, 0x65, 0x73, 0x43, 0x6f, 0x6e, + 0x66, 0x69, 
0x67, 0x52, 0x0a, 0x6d, 0x65, 0x74, 0x61, 0x64, 0x61, 0x74, 0x61, 0x44, 0x62, 0x42, + 0x10, 0x0a, 0x0e, 0x5f, 0x61, 0x63, 0x63, 0x65, 0x73, 0x73, 0x5f, 0x6b, 0x65, 0x79, 0x5f, 0x69, + 0x64, 0x42, 0x14, 0x0a, 0x12, 0x5f, 0x73, 0x65, 0x63, 0x72, 0x65, 0x74, 0x5f, 0x61, 0x63, 0x63, + 0x65, 0x73, 0x73, 0x5f, 0x6b, 0x65, 0x79, 0x42, 0x0b, 0x0a, 0x09, 0x5f, 0x72, 0x6f, 0x6c, 0x65, + 0x5f, 0x61, 0x72, 0x6e, 0x42, 0x09, 0x0a, 0x07, 0x5f, 0x72, 0x65, 0x67, 0x69, 0x6f, 0x6e, 0x42, + 0x0b, 0x0a, 0x09, 0x5f, 0x65, 0x6e, 0x64, 0x70, 0x6f, 0x69, 0x6e, 0x74, 0x22, 0x89, 0x01, 0x0a, + 0x0f, 0x53, 0x71, 0x6c, 0x53, 0x65, 0x72, 0x76, 0x65, 0x72, 0x43, 0x6f, 0x6e, 0x66, 0x69, 0x67, + 0x12, 0x16, 0x0a, 0x06, 0x73, 0x65, 0x72, 0x76, 0x65, 0x72, 0x18, 0x01, 0x20, 0x01, 0x28, 0x09, + 0x52, 0x06, 0x73, 0x65, 0x72, 0x76, 0x65, 0x72, 0x12, 0x12, 0x0a, 0x04, 0x70, 0x6f, 0x72, 0x74, + 0x18, 0x02, 0x20, 0x01, 0x28, 0x0d, 0x52, 0x04, 0x70, 0x6f, 0x72, 0x74, 0x12, 0x12, 0x0a, 0x04, + 0x75, 0x73, 0x65, 0x72, 0x18, 0x03, 0x20, 0x01, 0x28, 0x09, 0x52, 0x04, 0x75, 0x73, 0x65, 0x72, + 0x12, 0x1a, 0x0a, 0x08, 0x70, 0x61, 0x73, 0x73, 0x77, 0x6f, 0x72, 0x64, 0x18, 0x04, 0x20, 0x01, + 0x28, 0x09, 0x52, 0x08, 0x70, 0x61, 0x73, 0x73, 0x77, 0x6f, 0x72, 0x64, 0x12, 0x1a, 0x0a, 0x08, + 0x64, 0x61, 0x74, 0x61, 0x62, 0x61, 0x73, 0x65, 0x18, 0x05, 0x20, 0x01, 0x28, 0x09, 0x52, 0x08, + 0x64, 0x61, 0x74, 0x61, 0x62, 0x61, 0x73, 0x65, 0x22, 0x91, 0x05, 0x0a, 0x04, 0x50, 0x65, 0x65, + 0x72, 0x12, 0x12, 0x0a, 0x04, 0x6e, 0x61, 0x6d, 0x65, 0x18, 0x01, 0x20, 0x01, 0x28, 0x09, 0x52, + 0x04, 0x6e, 0x61, 0x6d, 0x65, 0x12, 0x28, 0x0a, 0x04, 0x74, 0x79, 0x70, 0x65, 0x18, 0x02, 0x20, + 0x01, 0x28, 0x0e, 0x32, 0x14, 0x2e, 0x70, 0x65, 0x65, 0x72, 0x64, 0x62, 0x5f, 0x70, 0x65, 0x65, + 0x72, 0x73, 0x2e, 0x44, 0x42, 0x54, 0x79, 0x70, 0x65, 0x52, 0x04, 0x74, 0x79, 0x70, 0x65, 0x12, + 0x4a, 0x0a, 0x10, 0x73, 0x6e, 0x6f, 0x77, 0x66, 0x6c, 0x61, 0x6b, 0x65, 0x5f, 0x63, 0x6f, 0x6e, + 0x66, 0x69, 0x67, 0x18, 0x03, 0x20, 0x01, 0x28, 0x0b, 0x32, 0x1d, 0x2e, 0x70, 0x65, 0x65, 0x72, + 0x64, 0x62, 0x5f, 0x70, 0x65, 0x65, 0x72, 0x73, 0x2e, 0x53, 0x6e, 0x6f, 0x77, 0x66, 0x6c, 0x61, + 0x6b, 0x65, 0x43, 0x6f, 0x6e, 0x66, 0x69, 0x67, 0x48, 0x00, 0x52, 0x0f, 0x73, 0x6e, 0x6f, 0x77, + 0x66, 0x6c, 0x61, 0x6b, 0x65, 0x43, 0x6f, 0x6e, 0x66, 0x69, 0x67, 0x12, 0x47, 0x0a, 0x0f, 0x62, + 0x69, 0x67, 0x71, 0x75, 0x65, 0x72, 0x79, 0x5f, 0x63, 0x6f, 0x6e, 0x66, 0x69, 0x67, 0x18, 0x04, + 0x20, 0x01, 0x28, 0x0b, 0x32, 0x1c, 0x2e, 0x70, 0x65, 0x65, 0x72, 0x64, 0x62, 0x5f, 0x70, 0x65, + 0x65, 0x72, 0x73, 0x2e, 0x42, 0x69, 0x67, 0x71, 0x75, 0x65, 0x72, 0x79, 0x43, 0x6f, 0x6e, 0x66, + 0x69, 0x67, 0x48, 0x00, 0x52, 0x0e, 0x62, 0x69, 0x67, 0x71, 0x75, 0x65, 0x72, 0x79, 0x43, 0x6f, + 0x6e, 0x66, 0x69, 0x67, 0x12, 0x3e, 0x0a, 0x0c, 0x6d, 0x6f, 0x6e, 0x67, 0x6f, 0x5f, 0x63, 0x6f, + 0x6e, 0x66, 0x69, 0x67, 0x18, 0x05, 0x20, 0x01, 0x28, 0x0b, 0x32, 0x19, 0x2e, 0x70, 0x65, 0x65, + 0x72, 0x64, 0x62, 0x5f, 0x70, 0x65, 0x65, 0x72, 0x73, 0x2e, 0x4d, 0x6f, 0x6e, 0x67, 0x6f, 0x43, + 0x6f, 0x6e, 0x66, 0x69, 0x67, 0x48, 0x00, 0x52, 0x0b, 0x6d, 0x6f, 0x6e, 0x67, 0x6f, 0x43, 0x6f, + 0x6e, 0x66, 0x69, 0x67, 0x12, 0x47, 0x0a, 0x0f, 0x70, 0x6f, 0x73, 0x74, 0x67, 0x72, 0x65, 0x73, + 0x5f, 0x63, 0x6f, 0x6e, 0x66, 0x69, 0x67, 0x18, 0x06, 0x20, 0x01, 0x28, 0x0b, 0x32, 0x1c, 0x2e, + 0x70, 0x65, 0x65, 0x72, 0x64, 0x62, 0x5f, 0x70, 0x65, 0x65, 0x72, 0x73, 0x2e, 0x50, 0x6f, 0x73, + 0x74, 0x67, 0x72, 0x65, 0x73, 0x43, 0x6f, 0x6e, 0x66, 0x69, 0x67, 0x48, 0x00, 0x52, 0x0e, 0x70, + 0x6f, 0x73, 0x74, 0x67, 0x72, 0x65, 
0x73, 0x43, 0x6f, 0x6e, 0x66, 0x69, 0x67, 0x12, 0x47, 0x0a, + 0x0f, 0x65, 0x76, 0x65, 0x6e, 0x74, 0x68, 0x75, 0x62, 0x5f, 0x63, 0x6f, 0x6e, 0x66, 0x69, 0x67, + 0x18, 0x07, 0x20, 0x01, 0x28, 0x0b, 0x32, 0x1c, 0x2e, 0x70, 0x65, 0x65, 0x72, 0x64, 0x62, 0x5f, + 0x70, 0x65, 0x65, 0x72, 0x73, 0x2e, 0x45, 0x76, 0x65, 0x6e, 0x74, 0x48, 0x75, 0x62, 0x43, 0x6f, + 0x6e, 0x66, 0x69, 0x67, 0x48, 0x00, 0x52, 0x0e, 0x65, 0x76, 0x65, 0x6e, 0x74, 0x68, 0x75, 0x62, + 0x43, 0x6f, 0x6e, 0x66, 0x69, 0x67, 0x12, 0x35, 0x0a, 0x09, 0x73, 0x33, 0x5f, 0x63, 0x6f, 0x6e, + 0x66, 0x69, 0x67, 0x18, 0x08, 0x20, 0x01, 0x28, 0x0b, 0x32, 0x16, 0x2e, 0x70, 0x65, 0x65, 0x72, + 0x64, 0x62, 0x5f, 0x70, 0x65, 0x65, 0x72, 0x73, 0x2e, 0x53, 0x33, 0x43, 0x6f, 0x6e, 0x66, 0x69, + 0x67, 0x48, 0x00, 0x52, 0x08, 0x73, 0x33, 0x43, 0x6f, 0x6e, 0x66, 0x69, 0x67, 0x12, 0x4a, 0x0a, + 0x10, 0x73, 0x71, 0x6c, 0x73, 0x65, 0x72, 0x76, 0x65, 0x72, 0x5f, 0x63, 0x6f, 0x6e, 0x66, 0x69, + 0x67, 0x18, 0x09, 0x20, 0x01, 0x28, 0x0b, 0x32, 0x1d, 0x2e, 0x70, 0x65, 0x65, 0x72, 0x64, 0x62, + 0x5f, 0x70, 0x65, 0x65, 0x72, 0x73, 0x2e, 0x53, 0x71, 0x6c, 0x53, 0x65, 0x72, 0x76, 0x65, 0x72, + 0x43, 0x6f, 0x6e, 0x66, 0x69, 0x67, 0x48, 0x00, 0x52, 0x0f, 0x73, 0x71, 0x6c, 0x73, 0x65, 0x72, + 0x76, 0x65, 0x72, 0x43, 0x6f, 0x6e, 0x66, 0x69, 0x67, 0x12, 0x57, 0x0a, 0x15, 0x65, 0x76, 0x65, + 0x6e, 0x74, 0x68, 0x75, 0x62, 0x5f, 0x67, 0x72, 0x6f, 0x75, 0x70, 0x5f, 0x63, 0x6f, 0x6e, 0x66, + 0x69, 0x67, 0x18, 0x0a, 0x20, 0x01, 0x28, 0x0b, 0x32, 0x21, 0x2e, 0x70, 0x65, 0x65, 0x72, 0x64, + 0x62, 0x5f, 0x70, 0x65, 0x65, 0x72, 0x73, 0x2e, 0x45, 0x76, 0x65, 0x6e, 0x74, 0x48, 0x75, 0x62, + 0x47, 0x72, 0x6f, 0x75, 0x70, 0x43, 0x6f, 0x6e, 0x66, 0x69, 0x67, 0x48, 0x00, 0x52, 0x13, 0x65, + 0x76, 0x65, 0x6e, 0x74, 0x68, 0x75, 0x62, 0x47, 0x72, 0x6f, 0x75, 0x70, 0x43, 0x6f, 0x6e, 0x66, + 0x69, 0x67, 0x42, 0x08, 0x0a, 0x06, 0x63, 0x6f, 0x6e, 0x66, 0x69, 0x67, 0x2a, 0x77, 0x0a, 0x06, + 0x44, 0x42, 0x54, 0x79, 0x70, 0x65, 0x12, 0x0c, 0x0a, 0x08, 0x42, 0x49, 0x47, 0x51, 0x55, 0x45, + 0x52, 0x59, 0x10, 0x00, 0x12, 0x0d, 0x0a, 0x09, 0x53, 0x4e, 0x4f, 0x57, 0x46, 0x4c, 0x41, 0x4b, + 0x45, 0x10, 0x01, 0x12, 0x09, 0x0a, 0x05, 0x4d, 0x4f, 0x4e, 0x47, 0x4f, 0x10, 0x02, 0x12, 0x0c, + 0x0a, 0x08, 0x50, 0x4f, 0x53, 0x54, 0x47, 0x52, 0x45, 0x53, 0x10, 0x03, 0x12, 0x0c, 0x0a, 0x08, + 0x45, 0x56, 0x45, 0x4e, 0x54, 0x48, 0x55, 0x42, 0x10, 0x04, 0x12, 0x06, 0x0a, 0x02, 0x53, 0x33, + 0x10, 0x05, 0x12, 0x0d, 0x0a, 0x09, 0x53, 0x51, 0x4c, 0x53, 0x45, 0x52, 0x56, 0x45, 0x52, 0x10, + 0x06, 0x12, 0x12, 0x0a, 0x0e, 0x45, 0x56, 0x45, 0x4e, 0x54, 0x48, 0x55, 0x42, 0x5f, 0x47, 0x52, + 0x4f, 0x55, 0x50, 0x10, 0x07, 0x42, 0x7c, 0x0a, 0x10, 0x63, 0x6f, 0x6d, 0x2e, 0x70, 0x65, 0x65, + 0x72, 0x64, 0x62, 0x5f, 0x70, 0x65, 0x65, 0x72, 0x73, 0x42, 0x0a, 0x50, 0x65, 0x65, 0x72, 0x73, + 0x50, 0x72, 0x6f, 0x74, 0x6f, 0x50, 0x01, 0x5a, 0x10, 0x67, 0x65, 0x6e, 0x65, 0x72, 0x61, 0x74, + 0x65, 0x64, 0x2f, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x73, 0xa2, 0x02, 0x03, 0x50, 0x58, 0x58, 0xaa, + 0x02, 0x0b, 0x50, 0x65, 0x65, 0x72, 0x64, 0x62, 0x50, 0x65, 0x65, 0x72, 0x73, 0xca, 0x02, 0x0b, + 0x50, 0x65, 0x65, 0x72, 0x64, 0x62, 0x50, 0x65, 0x65, 0x72, 0x73, 0xe2, 0x02, 0x17, 0x50, 0x65, + 0x65, 0x72, 0x64, 0x62, 0x50, 0x65, 0x65, 0x72, 0x73, 0x5c, 0x47, 0x50, 0x42, 0x4d, 0x65, 0x74, + 0x61, 0x64, 0x61, 0x74, 0x61, 0xea, 0x02, 0x0b, 0x50, 0x65, 0x65, 0x72, 0x64, 0x62, 0x50, 0x65, + 0x65, 0x72, 0x73, 0x62, 0x06, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x33, } var ( @@ -1226,21 +1238,22 @@ var file_peers_proto_depIdxs = []int32{ 4, // 0: 
peerdb_peers.EventHubConfig.metadata_db:type_name -> peerdb_peers.PostgresConfig 10, // 1: peerdb_peers.EventHubGroupConfig.eventhubs:type_name -> peerdb_peers.EventHubGroupConfig.EventhubsEntry 4, // 2: peerdb_peers.EventHubGroupConfig.metadata_db:type_name -> peerdb_peers.PostgresConfig - 0, // 3: peerdb_peers.Peer.type:type_name -> peerdb_peers.DBType - 1, // 4: peerdb_peers.Peer.snowflake_config:type_name -> peerdb_peers.SnowflakeConfig - 2, // 5: peerdb_peers.Peer.bigquery_config:type_name -> peerdb_peers.BigqueryConfig - 3, // 6: peerdb_peers.Peer.mongo_config:type_name -> peerdb_peers.MongoConfig - 4, // 7: peerdb_peers.Peer.postgres_config:type_name -> peerdb_peers.PostgresConfig - 5, // 8: peerdb_peers.Peer.eventhub_config:type_name -> peerdb_peers.EventHubConfig - 7, // 9: peerdb_peers.Peer.s3_config:type_name -> peerdb_peers.S3Config - 8, // 10: peerdb_peers.Peer.sqlserver_config:type_name -> peerdb_peers.SqlServerConfig - 6, // 11: peerdb_peers.Peer.eventhub_group_config:type_name -> peerdb_peers.EventHubGroupConfig - 5, // 12: peerdb_peers.EventHubGroupConfig.EventhubsEntry.value:type_name -> peerdb_peers.EventHubConfig - 13, // [13:13] is the sub-list for method output_type - 13, // [13:13] is the sub-list for method input_type - 13, // [13:13] is the sub-list for extension type_name - 13, // [13:13] is the sub-list for extension extendee - 0, // [0:13] is the sub-list for field type_name + 4, // 3: peerdb_peers.S3Config.metadata_db:type_name -> peerdb_peers.PostgresConfig + 0, // 4: peerdb_peers.Peer.type:type_name -> peerdb_peers.DBType + 1, // 5: peerdb_peers.Peer.snowflake_config:type_name -> peerdb_peers.SnowflakeConfig + 2, // 6: peerdb_peers.Peer.bigquery_config:type_name -> peerdb_peers.BigqueryConfig + 3, // 7: peerdb_peers.Peer.mongo_config:type_name -> peerdb_peers.MongoConfig + 4, // 8: peerdb_peers.Peer.postgres_config:type_name -> peerdb_peers.PostgresConfig + 5, // 9: peerdb_peers.Peer.eventhub_config:type_name -> peerdb_peers.EventHubConfig + 7, // 10: peerdb_peers.Peer.s3_config:type_name -> peerdb_peers.S3Config + 8, // 11: peerdb_peers.Peer.sqlserver_config:type_name -> peerdb_peers.SqlServerConfig + 6, // 12: peerdb_peers.Peer.eventhub_group_config:type_name -> peerdb_peers.EventHubGroupConfig + 5, // 13: peerdb_peers.EventHubGroupConfig.EventhubsEntry.value:type_name -> peerdb_peers.EventHubConfig + 14, // [14:14] is the sub-list for method output_type + 14, // [14:14] is the sub-list for method input_type + 14, // [14:14] is the sub-list for extension type_name + 14, // [14:14] is the sub-list for extension extendee + 0, // [0:14] is the sub-list for field type_name } func init() { file_peers_proto_init() } diff --git a/nexus/analyzer/src/lib.rs b/nexus/analyzer/src/lib.rs index 4da17cd4d..6a727d19f 100644 --- a/nexus/analyzer/src/lib.rs +++ b/nexus/analyzer/src/lib.rs @@ -619,6 +619,11 @@ fn parse_db_options( Some(config) } DbType::S3 => { + let s3_conn_str: String = opts + .get("metadata_db") + .map(|s| s.to_string()) + .unwrap_or_default(); + let metadata_db = parse_metadata_db_info(&s3_conn_str)?; let s3_config = S3Config { url: opts .get("url") @@ -629,6 +634,7 @@ fn parse_db_options( region: opts.get("region").map(|s| s.to_string()), role_arn: opts.get("role_arn").map(|s| s.to_string()), endpoint: opts.get("endpoint").map(|s| s.to_string()), + metadata_db, }; let config = Config::S3Config(s3_config); Some(config) diff --git a/nexus/pt/src/peerdb_peers.rs b/nexus/pt/src/peerdb_peers.rs index 562c29d7b..f8a304b14 100644 --- 
a/nexus/pt/src/peerdb_peers.rs +++ b/nexus/pt/src/peerdb_peers.rs @@ -125,6 +125,8 @@ pub struct S3Config { pub region: ::core::option::Option<::prost::alloc::string::String>, #[prost(string, optional, tag="6")] pub endpoint: ::core::option::Option<::prost::alloc::string::String>, + #[prost(message, optional, tag="7")] + pub metadata_db: ::core::option::Option, } #[allow(clippy::derive_partial_eq_without_eq)] #[derive(Clone, PartialEq, ::prost::Message)] diff --git a/nexus/pt/src/peerdb_peers.serde.rs b/nexus/pt/src/peerdb_peers.serde.rs index cde121a45..e03dc494e 100644 --- a/nexus/pt/src/peerdb_peers.serde.rs +++ b/nexus/pt/src/peerdb_peers.serde.rs @@ -1314,6 +1314,9 @@ impl serde::Serialize for S3Config { if self.endpoint.is_some() { len += 1; } + if self.metadata_db.is_some() { + len += 1; + } let mut struct_ser = serializer.serialize_struct("peerdb_peers.S3Config", len)?; if !self.url.is_empty() { struct_ser.serialize_field("url", &self.url)?; @@ -1333,6 +1336,9 @@ impl serde::Serialize for S3Config { if let Some(v) = self.endpoint.as_ref() { struct_ser.serialize_field("endpoint", v)?; } + if let Some(v) = self.metadata_db.as_ref() { + struct_ser.serialize_field("metadataDb", v)?; + } struct_ser.end() } } @@ -1352,6 +1358,8 @@ impl<'de> serde::Deserialize<'de> for S3Config { "roleArn", "region", "endpoint", + "metadata_db", + "metadataDb", ]; #[allow(clippy::enum_variant_names)] @@ -1362,6 +1370,7 @@ impl<'de> serde::Deserialize<'de> for S3Config { RoleArn, Region, Endpoint, + MetadataDb, __SkipField__, } impl<'de> serde::Deserialize<'de> for GeneratedField { @@ -1390,6 +1399,7 @@ impl<'de> serde::Deserialize<'de> for S3Config { "roleArn" | "role_arn" => Ok(GeneratedField::RoleArn), "region" => Ok(GeneratedField::Region), "endpoint" => Ok(GeneratedField::Endpoint), + "metadataDb" | "metadata_db" => Ok(GeneratedField::MetadataDb), _ => Ok(GeneratedField::__SkipField__), } } @@ -1415,6 +1425,7 @@ impl<'de> serde::Deserialize<'de> for S3Config { let mut role_arn__ = None; let mut region__ = None; let mut endpoint__ = None; + let mut metadata_db__ = None; while let Some(k) = map.next_key()? 
{ match k { GeneratedField::Url => { @@ -1453,6 +1464,12 @@ impl<'de> serde::Deserialize<'de> for S3Config { } endpoint__ = map.next_value()?; } + GeneratedField::MetadataDb => { + if metadata_db__.is_some() { + return Err(serde::de::Error::duplicate_field("metadataDb")); + } + metadata_db__ = map.next_value()?; + } GeneratedField::__SkipField__ => { let _ = map.next_value::()?; } @@ -1465,6 +1482,7 @@ impl<'de> serde::Deserialize<'de> for S3Config { role_arn: role_arn__, region: region__, endpoint: endpoint__, + metadata_db: metadata_db__, }) } } diff --git a/protos/peers.proto b/protos/peers.proto index b15d09dc4..162a63f3a 100644 --- a/protos/peers.proto +++ b/protos/peers.proto @@ -74,6 +74,7 @@ message S3Config { optional string role_arn = 4; optional string region = 5; optional string endpoint = 6; + PostgresConfig metadata_db = 7; } message SqlServerConfig { diff --git a/ui/grpc_generated/peers.ts b/ui/grpc_generated/peers.ts index a5acf3217..6d9bb4b30 100644 --- a/ui/grpc_generated/peers.ts +++ b/ui/grpc_generated/peers.ts @@ -151,6 +151,7 @@ export interface S3Config { roleArn?: string | undefined; region?: string | undefined; endpoint?: string | undefined; + metadataDb: PostgresConfig | undefined; } export interface SqlServerConfig { @@ -1197,6 +1198,7 @@ function createBaseS3Config(): S3Config { roleArn: undefined, region: undefined, endpoint: undefined, + metadataDb: undefined, }; } @@ -1220,6 +1222,9 @@ export const S3Config = { if (message.endpoint !== undefined) { writer.uint32(50).string(message.endpoint); } + if (message.metadataDb !== undefined) { + PostgresConfig.encode(message.metadataDb, writer.uint32(58).fork()).ldelim(); + } return writer; }, @@ -1272,6 +1277,13 @@ export const S3Config = { message.endpoint = reader.string(); continue; + case 7: + if (tag !== 58) { + break; + } + + message.metadataDb = PostgresConfig.decode(reader, reader.uint32()); + continue; } if ((tag & 7) === 4 || tag === 0) { break; @@ -1289,6 +1301,7 @@ export const S3Config = { roleArn: isSet(object.roleArn) ? String(object.roleArn) : undefined, region: isSet(object.region) ? String(object.region) : undefined, endpoint: isSet(object.endpoint) ? String(object.endpoint) : undefined, + metadataDb: isSet(object.metadataDb) ? PostgresConfig.fromJSON(object.metadataDb) : undefined, }; }, @@ -1312,6 +1325,9 @@ export const S3Config = { if (message.endpoint !== undefined) { obj.endpoint = message.endpoint; } + if (message.metadataDb !== undefined) { + obj.metadataDb = PostgresConfig.toJSON(message.metadataDb); + } return obj; }, @@ -1326,6 +1342,9 @@ export const S3Config = { message.roleArn = object.roleArn ?? undefined; message.region = object.region ?? undefined; message.endpoint = object.endpoint ?? undefined; + message.metadataDb = (object.metadataDb !== undefined && object.metadataDb !== null) + ? 
PostgresConfig.fromPartial(object.metadataDb) + : undefined; return message; }, }; From 3a7853a96e5f77538e0376f60b91aeff4a0ff73f Mon Sep 17 00:00:00 2001 From: Amogh-Bharadwaj Date: Fri, 13 Oct 2023 02:17:08 +0530 Subject: [PATCH 2/5] ci: creds path fix --- .github/workflows/flow.yml | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/.github/workflows/flow.yml b/.github/workflows/flow.yml index 5af87c8bc..ffceae7c5 100644 --- a/.github/workflows/flow.yml +++ b/.github/workflows/flow.yml @@ -96,8 +96,8 @@ jobs: AWS_REGION: ${{ secrets.AWS_REGION }} TEST_BQ_CREDS: ${{ github.workspace }}/bq_service_account.json TEST_SF_CREDS: ${{ github.workspace }}/snowflake_creds.json - TEST_S3_CREDS: ${{ secrets.S3_CREDS }}/s3_creds.json - TEST_GCS_CREDS: ${{ secrets.S3_CREDS }}/gcs_creds.json + TEST_S3_CREDS: ${{ github.workspace }}/s3_creds.json + TEST_GCS_CREDS: ${{ github.workspace }}/gcs_creds.json AZURE_TENANT_ID: ${{ secrets.AZURE_TENANT_ID }} AZURE_CLIENT_ID: ${{ secrets.AZURE_CLIENT_ID }} AZURE_CLIENT_SECRET: ${{ secrets.AZURE_CLIENT_SECRET }} From 9ff768bb44c9e9fc5ac8b3950bedb7179d436732 Mon Sep 17 00:00:00 2001 From: Amogh-Bharadwaj Date: Sat, 14 Oct 2023 00:50:56 +0530 Subject: [PATCH 3/5] refactor metadata and stream --- flow/connectors/eventhub/eventhub.go | 59 +++- flow/connectors/eventhub/metadata.go | 236 --------------- .../store.go} | 82 +++--- flow/connectors/s3/s3.go | 268 ++++++------------ flow/connectors/snowflake/snowflake.go | 184 +----------- flow/connectors/utils/stream.go | 189 ++++++++++++ flow/e2e/s3/cdc_s3_test.go | 48 +--- flow/model/qrecord_stream.go | 12 + 8 files changed, 395 insertions(+), 683 deletions(-) delete mode 100644 flow/connectors/eventhub/metadata.go rename flow/connectors/{s3/metadata.go => external_metadata/store.go} (66%) create mode 100644 flow/connectors/utils/stream.go diff --git a/flow/connectors/eventhub/eventhub.go b/flow/connectors/eventhub/eventhub.go index b614ec329..3c03ede0e 100644 --- a/flow/connectors/eventhub/eventhub.go +++ b/flow/connectors/eventhub/eventhub.go @@ -10,6 +10,7 @@ import ( "github.com/Azure/azure-sdk-for-go/sdk/azidentity" azeventhubs "github.com/Azure/azure-sdk-for-go/sdk/messaging/azeventhubs" + metadataStore "github.com/PeerDB-io/peer-flow/connectors/external_metadata" "github.com/PeerDB-io/peer-flow/connectors/utils" "github.com/PeerDB-io/peer-flow/connectors/utils/metrics" "github.com/PeerDB-io/peer-flow/generated/protos" @@ -22,7 +23,7 @@ import ( type EventHubConnector struct { ctx context.Context config *protos.EventHubGroupConfig - pgMetadata *PostgresMetadataStore + pgMetadata *metadataStore.PostgresMetadataStore tableSchemas map[string]*protos.TableSchema creds *azidentity.DefaultAzureCredential hubManager *EventHubManager @@ -40,7 +41,9 @@ func NewEventHubConnector( } hubManager := NewEventHubManager(ctx, defaultAzureCreds, config) - pgMetadata, err := NewPostgresMetadataStore(ctx, config.GetMetadataDb()) + metadataSchemaName := "peerdb_eventhub_metadata" + pgMetadata, err := metadataStore.NewPostgresMetadataStore(ctx, config.GetMetadataDb(), + metadataSchemaName) if err != nil { log.Errorf("failed to create postgres metadata store: %v", err) return nil, err @@ -84,6 +87,48 @@ func (c *EventHubConnector) InitializeTableSchema(req map[string]*protos.TableSc return nil } +func (c *EventHubConnector) NeedsSetupMetadataTables() bool { + return c.pgMetadata.NeedsSetupMetadata() +} + +func (c *EventHubConnector) SetupMetadataTables() error { + err := c.pgMetadata.SetupMetadata() + if err != nil { + 
log.Errorf("failed to setup metadata tables: %v", err) + return err + } + + return nil +} + +func (c *EventHubConnector) GetLastSyncBatchID(jobName string) (int64, error) { + syncBatchID, err := c.pgMetadata.GetLastBatchID(jobName) + if err != nil { + return 0, err + } + + return syncBatchID, nil +} + +func (c *EventHubConnector) GetLastOffset(jobName string) (*protos.LastSyncState, error) { + res, err := c.pgMetadata.FetchLastOffset(jobName) + if err != nil { + return nil, err + } + + return res, nil +} + +func (c *EventHubConnector) updateLastOffset(jobName string, offset int64) error { + err := c.pgMetadata.UpdateLastOffset(jobName, offset) + if err != nil { + log.Errorf("failed to update last offset: %v", err) + return err + } + + return nil +} + func (c *EventHubConnector) SyncRecords(req *model.SyncRecordsRequest) (*model.SyncResponse, error) { shutdown := utils.HeartbeatRoutine(c.ctx, 10*time.Second, func() string { return fmt.Sprintf("syncing records to eventhub with"+ @@ -177,7 +222,7 @@ func (c *EventHubConnector) SyncRecords(req *model.SyncRecordsRequest) (*model.S log.Errorf("failed to update last offset: %v", err) return nil, err } - err = c.incrementSyncBatchID(req.FlowJobName) + err = c.pgMetadata.IncrementID(req.FlowJobName) if err != nil { log.Errorf("%v", err) return nil, err @@ -311,3 +356,11 @@ func (c *EventHubConnector) SetupNormalizedTables( TableExistsMapping: nil, }, nil } + +func (c *EventHubConnector) SyncFlowCleanup(jobName string) error { + err := c.pgMetadata.DropMetadata(jobName) + if err != nil { + return err + } + return nil +} diff --git a/flow/connectors/eventhub/metadata.go b/flow/connectors/eventhub/metadata.go deleted file mode 100644 index 178ebbb07..000000000 --- a/flow/connectors/eventhub/metadata.go +++ /dev/null @@ -1,236 +0,0 @@ -package conneventhub - -import ( - "context" - - "github.com/PeerDB-io/peer-flow/connectors/utils" - "github.com/PeerDB-io/peer-flow/generated/protos" - "github.com/jackc/pgx/v5/pgxpool" - log "github.com/sirupsen/logrus" -) - -const ( - // schema for the peerdb metadata - metadataSchema = "peerdb_eventhub_metadata" - // The name of the table that stores the last sync state. 
- lastSyncStateTableName = "last_sync_state" -) - -type PostgresMetadataStore struct { - config *protos.PostgresConfig - pool *pgxpool.Pool -} - -func NewPostgresMetadataStore(ctx context.Context, pgConfig *protos.PostgresConfig) (*PostgresMetadataStore, error) { - connectionString := utils.GetPGConnectionString(pgConfig) - - pool, err := pgxpool.New(ctx, connectionString) - if err != nil { - log.Errorf("failed to create connection pool: %v", err) - return nil, err - } - log.Info("created connection pool for eventhub metadata store") - - return &PostgresMetadataStore{ - config: pgConfig, - pool: pool, - }, nil -} - -func (p *PostgresMetadataStore) Close() error { - if p.pool != nil { - p.pool.Close() - } - - return nil -} - -func (c *EventHubConnector) NeedsSetupMetadataTables() bool { - ms := c.pgMetadata - - // check if schema exists - rows := ms.pool.QueryRow(c.ctx, "SELECT count(*) FROM pg_catalog.pg_namespace WHERE nspname = $1", metadataSchema) - - var exists int64 - err := rows.Scan(&exists) - if err != nil { - log.Errorf("failed to check if schema exists: %v", err) - return false - } - - if exists > 0 { - return true - } - - return true -} - -func (c *EventHubConnector) SetupMetadataTables() error { - ms := c.pgMetadata - - // start a transaction - tx, err := ms.pool.Begin(c.ctx) - if err != nil { - log.Errorf("failed to start transaction: %v", err) - return err - } - - // create the schema - _, err = tx.Exec(c.ctx, "CREATE SCHEMA IF NOT EXISTS "+metadataSchema) - if err != nil { - log.Errorf("failed to create schema: %v", err) - return err - } - - // create the last sync state table - _, err = tx.Exec(c.ctx, ` - CREATE TABLE IF NOT EXISTS `+metadataSchema+`.`+lastSyncStateTableName+` ( - job_name TEXT PRIMARY KEY NOT NULL, - last_offset BIGINT NOT NULL, - updated_at TIMESTAMP NOT NULL DEFAULT NOW(), - sync_batch_id BIGINT NOT NULL - ) - `) - if err != nil { - log.Errorf("failed to create last sync state table: %v", err) - return err - } - - // commit the transaction - err = tx.Commit(c.ctx) - if err != nil { - log.Errorf("failed to commit transaction: %v", err) - return err - } - - return nil -} - -func (c *EventHubConnector) GetLastOffset(jobName string) (*protos.LastSyncState, error) { - ms := c.pgMetadata - - rows := ms.pool.QueryRow(c.ctx, ` - SELECT last_offset - FROM `+metadataSchema+`.`+lastSyncStateTableName+` - WHERE job_name = $1 - `, jobName) - - var offset int64 - err := rows.Scan(&offset) - if err != nil { - // if the job doesn't exist, return 0 - if err.Error() == "no rows in result set" { - return &protos.LastSyncState{ - Checkpoint: 0, - }, nil - } - - log.WithFields(log.Fields{ - "flowName": jobName, - }).Errorf("failed to get last offset: %v", err) - return nil, err - } - - log.Infof("got last offset for job `%s`: %d", jobName, offset) - - return &protos.LastSyncState{ - Checkpoint: offset, - }, nil -} - -func (c *EventHubConnector) GetLastSyncBatchID(jobName string) (int64, error) { - ms := c.pgMetadata - - rows := ms.pool.QueryRow(c.ctx, ` - SELECT sync_batch_id - FROM `+metadataSchema+`.`+lastSyncStateTableName+` - WHERE job_name = $1 - `, jobName) - - var syncBatchID int64 - err := rows.Scan(&syncBatchID) - if err != nil { - // if the job doesn't exist, return 0 - if err.Error() == "no rows in result set" { - return 0, nil - } - - log.WithFields(log.Fields{ - "flowName": jobName, - }).Errorf("failed to get last offset: %v", err) - return 0, err - } - - log.Infof("got last sync batch ID for job `%s`: %d", jobName, syncBatchID) - - return syncBatchID, nil -} - 
-// update offset for a job -func (c *EventHubConnector) updateLastOffset(jobName string, offset int64) error { - ms := c.pgMetadata - - // start a transaction - tx, err := ms.pool.Begin(c.ctx) - if err != nil { - log.Errorf("failed to start transaction: %v", err) - return err - } - - // update the last offset - log.WithFields(log.Fields{ - "flowName": jobName, - }).Infof("updating last offset for job `%s` to `%d`", jobName, offset) - _, err = tx.Exec(c.ctx, ` - INSERT INTO `+metadataSchema+`.`+lastSyncStateTableName+` (job_name, last_offset, sync_batch_id) - VALUES ($1, $2, $3) - ON CONFLICT (job_name) - DO UPDATE SET last_offset = $2, updated_at = NOW() - `, jobName, offset, 0) - - if err != nil { - log.WithFields(log.Fields{ - "flowName": jobName, - }).Errorf("failed to update last offset: %v", err) - return err - } - - // commit the transaction - err = tx.Commit(c.ctx) - if err != nil { - log.Errorf("failed to commit transaction: %v", err) - return err - } - - return nil -} - -// update offset for a job -func (c *EventHubConnector) incrementSyncBatchID(jobName string) error { - ms := c.pgMetadata - - log.WithFields(log.Fields{ - "flowName": jobName, - }).Infof("incrementing sync batch id for job `%s`", jobName) - _, err := ms.pool.Exec(c.ctx, ` - UPDATE `+metadataSchema+`.`+lastSyncStateTableName+` - SET sync_batch_id=sync_batch_id+1 WHERE job_name=$1 - `, jobName) - - if err != nil { - log.WithFields(log.Fields{ - "flowName": jobName, - }).Errorf("failed to increment sync batch id: %v", err) - return err - } - - return nil -} - -func (c *EventHubConnector) SyncFlowCleanup(jobName string) error { - _, err := c.pgMetadata.pool.Exec(c.ctx, ` - DELETE FROM `+metadataSchema+`.`+lastSyncStateTableName+` - WHERE job_name = $1 - `, jobName) - return err -} diff --git a/flow/connectors/s3/metadata.go b/flow/connectors/external_metadata/store.go similarity index 66% rename from flow/connectors/s3/metadata.go rename to flow/connectors/external_metadata/store.go index fb0a24841..68ff6a068 100644 --- a/flow/connectors/s3/metadata.go +++ b/flow/connectors/external_metadata/store.go @@ -1,4 +1,4 @@ -package conns3 +package connmetadata import ( "context" @@ -10,16 +10,18 @@ import ( ) const ( - metadataSchema = "peerdb_s3_metadata" lastSyncStateTableName = "last_sync_state" ) type PostgresMetadataStore struct { - config *protos.PostgresConfig - pool *pgxpool.Pool + ctx context.Context + config *protos.PostgresConfig + pool *pgxpool.Pool + schemaName string } -func NewPostgresMetadataStore(ctx context.Context, pgConfig *protos.PostgresConfig) (*PostgresMetadataStore, error) { +func NewPostgresMetadataStore(ctx context.Context, pgConfig *protos.PostgresConfig, + schemaName string) (*PostgresMetadataStore, error) { connectionString := utils.GetPGConnectionString(pgConfig) pool, err := pgxpool.New(ctx, connectionString) @@ -27,11 +29,13 @@ func NewPostgresMetadataStore(ctx context.Context, pgConfig *protos.PostgresConf log.Errorf("failed to create connection pool: %v", err) return nil, err } - log.Info("created connection pool for s3 metadata store") + log.Info("created connection pool for metadata store") return &PostgresMetadataStore{ - config: pgConfig, - pool: pool, + ctx: ctx, + config: pgConfig, + pool: pool, + schemaName: schemaName, }, nil } @@ -43,11 +47,9 @@ func (p *PostgresMetadataStore) Close() error { return nil } -func (c *S3Connector) NeedsSetupMetadataTables() bool { - ms := c.pgMetadata - +func (p *PostgresMetadataStore) NeedsSetupMetadata() bool { // check if schema exists - rows := 
ms.pool.QueryRow(c.ctx, "SELECT count(*) FROM pg_catalog.pg_namespace WHERE nspname = $1", metadataSchema) + rows := p.pool.QueryRow(p.ctx, "SELECT count(*) FROM pg_catalog.pg_namespace WHERE nspname = $1", p.schemaName) var exists int64 err := rows.Scan(&exists) @@ -63,26 +65,24 @@ func (c *S3Connector) NeedsSetupMetadataTables() bool { return true } -func (c *S3Connector) SetupMetadataTables() error { - ms := c.pgMetadata - +func (p *PostgresMetadataStore) SetupMetadata() error { // start a transaction - tx, err := ms.pool.Begin(c.ctx) + tx, err := p.pool.Begin(p.ctx) if err != nil { log.Errorf("failed to start transaction: %v", err) return err } // create the schema - _, err = tx.Exec(c.ctx, "CREATE SCHEMA IF NOT EXISTS "+metadataSchema) + _, err = tx.Exec(p.ctx, "CREATE SCHEMA IF NOT EXISTS "+p.schemaName) if err != nil { log.Errorf("failed to create schema: %v", err) return err } // create the last sync state table - _, err = tx.Exec(c.ctx, ` - CREATE TABLE IF NOT EXISTS `+metadataSchema+`.`+lastSyncStateTableName+` ( + _, err = tx.Exec(p.ctx, ` + CREATE TABLE IF NOT EXISTS `+p.schemaName+`.`+lastSyncStateTableName+` ( job_name TEXT PRIMARY KEY NOT NULL, last_offset BIGINT NOT NULL, updated_at TIMESTAMP NOT NULL DEFAULT NOW(), @@ -93,9 +93,10 @@ func (c *S3Connector) SetupMetadataTables() error { log.Errorf("failed to create last sync state table: %v", err) return err } + log.Infof("created external metadata table %s.%s", p.schemaName, lastSyncStateTableName) // commit the transaction - err = tx.Commit(c.ctx) + err = tx.Commit(p.ctx) if err != nil { log.Errorf("failed to commit transaction: %v", err) return err @@ -104,15 +105,12 @@ func (c *S3Connector) SetupMetadataTables() error { return nil } -func (c *S3Connector) GetLastOffset(jobName string) (*protos.LastSyncState, error) { - ms := c.pgMetadata - - rows := ms.pool.QueryRow(c.ctx, ` +func (p *PostgresMetadataStore) FetchLastOffset(jobName string) (*protos.LastSyncState, error) { + rows := p.pool.QueryRow(p.ctx, ` SELECT last_offset - FROM `+metadataSchema+`.`+lastSyncStateTableName+` + FROM `+p.schemaName+`.`+lastSyncStateTableName+` WHERE job_name = $1 `, jobName) - var offset int64 err := rows.Scan(&offset) if err != nil { @@ -136,12 +134,10 @@ func (c *S3Connector) GetLastOffset(jobName string) (*protos.LastSyncState, erro }, nil } -func (c *S3Connector) GetLastSyncBatchID(jobName string) (int64, error) { - ms := c.pgMetadata - - rows := ms.pool.QueryRow(c.ctx, ` +func (p *PostgresMetadataStore) GetLastBatchID(jobName string) (int64, error) { + rows := p.pool.QueryRow(p.ctx, ` SELECT sync_batch_id - FROM `+metadataSchema+`.`+lastSyncStateTableName+` + FROM `+p.schemaName+`.`+lastSyncStateTableName+` WHERE job_name = $1 `, jobName) @@ -165,11 +161,10 @@ func (c *S3Connector) GetLastSyncBatchID(jobName string) (int64, error) { } // update offset for a job -func (c *S3Connector) updateLastOffset(jobName string, offset int64) error { - ms := c.pgMetadata +func (p *PostgresMetadataStore) UpdateLastOffset(jobName string, offset int64) error { // start a transaction - tx, err := ms.pool.Begin(c.ctx) + tx, err := p.pool.Begin(p.ctx) if err != nil { log.Errorf("failed to start transaction: %v", err) return err @@ -179,8 +174,8 @@ func (c *S3Connector) updateLastOffset(jobName string, offset int64) error { log.WithFields(log.Fields{ "flowName": jobName, }).Infof("updating last offset for job `%s` to `%d`", jobName, offset) - _, err = tx.Exec(c.ctx, ` - INSERT INTO `+metadataSchema+`.`+lastSyncStateTableName+` (job_name, last_offset, 
sync_batch_id) + _, err = tx.Exec(p.ctx, ` + INSERT INTO `+p.schemaName+`.`+lastSyncStateTableName+` (job_name, last_offset, sync_batch_id) VALUES ($1, $2, $3) ON CONFLICT (job_name) DO UPDATE SET last_offset = $2, updated_at = NOW() @@ -194,7 +189,7 @@ func (c *S3Connector) updateLastOffset(jobName string, offset int64) error { } // commit the transaction - err = tx.Commit(c.ctx) + err = tx.Commit(p.ctx) if err != nil { log.Errorf("failed to commit transaction: %v", err) return err @@ -204,14 +199,13 @@ func (c *S3Connector) updateLastOffset(jobName string, offset int64) error { } // update offset for a job -func (c *S3Connector) incrementSyncBatchID(jobName string) error { - ms := c.pgMetadata +func (p *PostgresMetadataStore) IncrementID(jobName string) error { log.WithFields(log.Fields{ "flowName": jobName, }).Infof("incrementing sync batch id for job `%s`", jobName) - _, err := ms.pool.Exec(c.ctx, ` - UPDATE `+metadataSchema+`.`+lastSyncStateTableName+` + _, err := p.pool.Exec(p.ctx, ` + UPDATE `+p.schemaName+`.`+lastSyncStateTableName+` SET sync_batch_id=sync_batch_id+1 WHERE job_name=$1 `, jobName) @@ -225,9 +219,9 @@ func (c *S3Connector) incrementSyncBatchID(jobName string) error { return nil } -func (c *S3Connector) SyncFlowCleanup(jobName string) error { - _, err := c.pgMetadata.pool.Exec(c.ctx, ` - DELETE FROM `+metadataSchema+`.`+lastSyncStateTableName+` +func (p *PostgresMetadataStore) DropMetadata(jobName string) error { + _, err := p.pool.Exec(p.ctx, ` + DELETE FROM `+p.schemaName+`.`+lastSyncStateTableName+` WHERE job_name = $1 `, jobName) return err diff --git a/flow/connectors/s3/s3.go b/flow/connectors/s3/s3.go index 9a78757ea..a96dcb481 100644 --- a/flow/connectors/s3/s3.go +++ b/flow/connectors/s3/s3.go @@ -5,20 +5,19 @@ import ( "fmt" "time" + metadataStore "github.com/PeerDB-io/peer-flow/connectors/external_metadata" "github.com/PeerDB-io/peer-flow/connectors/utils" "github.com/PeerDB-io/peer-flow/connectors/utils/metrics" "github.com/PeerDB-io/peer-flow/generated/protos" "github.com/PeerDB-io/peer-flow/model" - "github.com/PeerDB-io/peer-flow/model/qvalue" "github.com/aws/aws-sdk-go/service/s3" - "github.com/google/uuid" log "github.com/sirupsen/logrus" ) type S3Connector struct { ctx context.Context url string - pgMetadata *PostgresMetadataStore + pgMetadata *metadataStore.PostgresMetadataStore client s3.S3 creds utils.S3PeerCredentials } @@ -56,7 +55,9 @@ func NewS3Connector(ctx context.Context, if err != nil { return nil, fmt.Errorf("failed to create S3 client: %w", err) } - pgMetadata, err := NewPostgresMetadataStore(ctx, config.GetMetadataDb()) + metadataSchemaName := "peerdb_s3_metadata" + pgMetadata, err := metadataStore.NewPostgresMetadataStore(ctx, + config.GetMetadataDb(), metadataSchemaName) if err != nil { log.Errorf("failed to create postgres metadata store: %v", err) return nil, err @@ -71,11 +72,69 @@ func NewS3Connector(ctx context.Context, }, nil } +func (c *S3Connector) CreateRawTable(req *protos.CreateRawTableInput) (*protos.CreateRawTableOutput, error) { + log.Infof("CreateRawTable for S3 is a no-op") + return nil, nil +} + +func (c *S3Connector) InitializeTableSchema(req map[string]*protos.TableSchema) error { + log.Infof("InitializeTableSchema for S3 is a no-op") + return nil +} + func (c *S3Connector) Close() error { log.Debugf("Closing s3 connector is a noop") return nil } +func (c *S3Connector) ConnectionActive() bool { + _, err := c.client.ListBuckets(nil) + return err == nil +} + +func (c *S3Connector) NeedsSetupMetadataTables() bool { + 
return c.pgMetadata.NeedsSetupMetadata() +} + +func (c *S3Connector) SetupMetadataTables() error { + err := c.pgMetadata.SetupMetadata() + if err != nil { + log.Errorf("failed to setup metadata tables: %v", err) + return err + } + + return nil +} + +func (c *S3Connector) GetLastSyncBatchID(jobName string) (int64, error) { + syncBatchID, err := c.pgMetadata.GetLastBatchID(jobName) + if err != nil { + return 0, err + } + + return syncBatchID, nil +} + +func (c *S3Connector) GetLastOffset(jobName string) (*protos.LastSyncState, error) { + res, err := c.pgMetadata.FetchLastOffset(jobName) + if err != nil { + return nil, err + } + + return res, nil +} + +// update offset for a job +func (c *S3Connector) updateLastOffset(jobName string, offset int64) error { + err := c.pgMetadata.UpdateLastOffset(jobName, offset) + if err != nil { + log.Errorf("failed to update last offset: %v", err) + return err + } + + return nil +} + func (c *S3Connector) SyncRecords(req *model.SyncRecordsRequest) (*model.SyncResponse, error) { if len(req.Records.Records) == 0 { return &model.SyncResponse{ @@ -91,181 +150,19 @@ func (c *S3Connector) SyncRecords(req *model.SyncRecordsRequest) (*model.SyncRes } syncBatchID = syncBatchID + 1 lastCP := req.Records.LastCheckPointID - recordStream := model.NewQRecordStream(len(req.Records.Records)) - err = recordStream.SetSchema(&model.QRecordSchema{ - Fields: []*model.QField{ - { - Name: "_peerdb_uid", - Type: qvalue.QValueKindString, - Nullable: false, - }, - { - Name: "_peerdb_timestamp", - Type: qvalue.QValueKindInt64, - Nullable: false, - }, - { - Name: "_peerdb_destination_table_name", - Type: qvalue.QValueKindString, - Nullable: false, - }, - { - Name: "_peerdb_data", - Type: qvalue.QValueKindString, - Nullable: false, - }, - { - Name: "_peerdb_record_type", - Type: qvalue.QValueKindInt64, - Nullable: true, - }, - { - Name: "_peerdb_match_data", - Type: qvalue.QValueKindString, - Nullable: true, - }, - { - Name: "_peerdb_batch_id", - Type: qvalue.QValueKindInt64, - Nullable: true, - }, - { - Name: "_peerdb_unchanged_toast_columns", - Type: qvalue.QValueKindString, - Nullable: true, - }, - }, + tableNameRowsMapping := make(map[string]uint32) + streamRes, err := utils.RecordsToRawTableStream(model.RecordsToStreamRequest{ + Records: req.Records.Records, + TableMapping: tableNameRowsMapping, + CP: 0, + BatchID: syncBatchID, }) if err != nil { - return nil, err - } - - first := true - var firstCP int64 = 0 - tableNameRowsMapping := make(map[string]uint32) - - for _, record := range req.Records.Records { - var entries [8]qvalue.QValue - switch typedRecord := record.(type) { - case *model.InsertRecord: - // json.Marshal converts bytes in Hex automatically to BASE64 string. 
- itemsJSON, err := typedRecord.Items.ToJSON() - if err != nil { - return nil, fmt.Errorf("failed to serialize insert record items to JSON: %w", err) - } - - // add insert record to the raw table - entries[2] = qvalue.QValue{ - Kind: qvalue.QValueKindString, - Value: typedRecord.DestinationTableName, - } - entries[3] = qvalue.QValue{ - Kind: qvalue.QValueKindString, - Value: itemsJSON, - } - entries[4] = qvalue.QValue{ - Kind: qvalue.QValueKindInt64, - Value: 0, - } - entries[5] = qvalue.QValue{ - Kind: qvalue.QValueKindString, - Value: "", - } - entries[7] = qvalue.QValue{ - Kind: qvalue.QValueKindString, - Value: utils.KeysToString(typedRecord.UnchangedToastColumns), - } - tableNameRowsMapping[typedRecord.DestinationTableName] += 1 - case *model.UpdateRecord: - newItemsJSON, err := typedRecord.NewItems.ToJSON() - if err != nil { - return nil, fmt.Errorf("failed to serialize update record new items to JSON: %w", err) - } - oldItemsJSON, err := typedRecord.OldItems.ToJSON() - if err != nil { - return nil, fmt.Errorf("failed to serialize update record old items to JSON: %w", err) - } - - entries[2] = qvalue.QValue{ - Kind: qvalue.QValueKindString, - Value: typedRecord.DestinationTableName, - } - entries[3] = qvalue.QValue{ - Kind: qvalue.QValueKindString, - Value: newItemsJSON, - } - entries[4] = qvalue.QValue{ - Kind: qvalue.QValueKindInt64, - Value: 1, - } - entries[5] = qvalue.QValue{ - Kind: qvalue.QValueKindString, - Value: oldItemsJSON, - } - entries[7] = qvalue.QValue{ - Kind: qvalue.QValueKindString, - Value: utils.KeysToString(typedRecord.UnchangedToastColumns), - } - tableNameRowsMapping[typedRecord.DestinationTableName] += 1 - case *model.DeleteRecord: - itemsJSON, err := typedRecord.Items.ToJSON() - if err != nil { - return nil, fmt.Errorf("failed to serialize delete record items to JSON: %w", err) - } - - // append delete record to the raw table - entries[2] = qvalue.QValue{ - Kind: qvalue.QValueKindString, - Value: typedRecord.DestinationTableName, - } - entries[3] = qvalue.QValue{ - Kind: qvalue.QValueKindString, - Value: itemsJSON, - } - entries[4] = qvalue.QValue{ - Kind: qvalue.QValueKindInt64, - Value: 2, - } - entries[5] = qvalue.QValue{ - Kind: qvalue.QValueKindString, - Value: itemsJSON, - } - entries[7] = qvalue.QValue{ - Kind: qvalue.QValueKindString, - Value: utils.KeysToString(typedRecord.UnchangedToastColumns), - } - tableNameRowsMapping[typedRecord.DestinationTableName] += 1 - default: - return nil, fmt.Errorf("record type %T not supported in Snowflake flow connector", typedRecord) - } - - if first { - firstCP = record.GetCheckPointID() - first = false - } - - entries[0] = qvalue.QValue{ - Kind: qvalue.QValueKindString, - Value: uuid.New().String(), - } - entries[1] = qvalue.QValue{ - Kind: qvalue.QValueKindInt64, - Value: time.Now().UnixNano(), - } - entries[6] = qvalue.QValue{ - Kind: qvalue.QValueKindInt64, - Value: syncBatchID, - } - - recordStream.Records <- &model.QRecordOrError{ - Record: &model.QRecord{ - NumEntries: 8, - Entries: entries[:], - }, - } + return nil, fmt.Errorf("failed to convert records to raw table stream: %w", err) } - + firstCP := streamRes.CP + recordStream := streamRes.Stream qrepConfig := &protos.QRepConfig{ FlowJobName: req.FlowJobName, DestinationTableIdentifier: fmt.Sprintf("raw_table_%s", req.FlowJobName), @@ -285,7 +182,7 @@ func (c *S3Connector) SyncRecords(req *model.SyncRecordsRequest) (*model.SyncRes log.Errorf("failed to update last offset for s3 cdc: %v", err) return nil, err } - err = 
c.incrementSyncBatchID(req.FlowJobName) + err = c.pgMetadata.IncrementID(req.FlowJobName) if err != nil { log.Errorf("%v", err) return nil, err @@ -299,16 +196,6 @@ func (c *S3Connector) SyncRecords(req *model.SyncRecordsRequest) (*model.SyncRes }, nil } -func (c *S3Connector) CreateRawTable(req *protos.CreateRawTableInput) (*protos.CreateRawTableOutput, error) { - log.Infof("CreateRawTable for S3 is a no-op") - return nil, nil -} - -func (c *S3Connector) InitializeTableSchema(req map[string]*protos.TableSchema) error { - log.Infof("InitializeTableSchema for S3 is a no-op") - return nil -} - func (c *S3Connector) SetupNormalizedTables(req *protos.SetupNormalizedTableBatchInput) ( *protos.SetupNormalizedTableBatchOutput, error) { @@ -316,7 +203,10 @@ func (c *S3Connector) SetupNormalizedTables(req *protos.SetupNormalizedTableBatc return nil, nil } -func (c *S3Connector) ConnectionActive() bool { - _, err := c.client.ListBuckets(nil) - return err == nil +func (c *S3Connector) SyncFlowCleanup(jobName string) error { + err := c.pgMetadata.DropMetadata(jobName) + if err != nil { + return err + } + return nil } diff --git a/flow/connectors/snowflake/snowflake.go b/flow/connectors/snowflake/snowflake.go index 4335ee85e..7046746ec 100644 --- a/flow/connectors/snowflake/snowflake.go +++ b/flow/connectors/snowflake/snowflake.go @@ -645,183 +645,20 @@ func (c *SnowflakeConnector) syncRecordsViaSQL(req *model.SyncRecordsRequest, ra func (c *SnowflakeConnector) syncRecordsViaAvro(req *model.SyncRecordsRequest, rawTableIdentifier string, syncBatchID int64) (*model.SyncResponse, error) { - recordStream := model.NewQRecordStream(len(req.Records.Records)) - - err := recordStream.SetSchema(&model.QRecordSchema{ - Fields: []*model.QField{ - { - Name: "_peerdb_uid", - Type: qvalue.QValueKindString, - Nullable: false, - }, - { - Name: "_peerdb_timestamp", - Type: qvalue.QValueKindInt64, - Nullable: false, - }, - { - Name: "_peerdb_destination_table_name", - Type: qvalue.QValueKindString, - Nullable: false, - }, - { - Name: "_peerdb_data", - Type: qvalue.QValueKindString, - Nullable: false, - }, - { - Name: "_peerdb_record_type", - Type: qvalue.QValueKindInt64, - Nullable: true, - }, - { - Name: "_peerdb_match_data", - Type: qvalue.QValueKindString, - Nullable: true, - }, - { - Name: "_peerdb_batch_id", - Type: qvalue.QValueKindInt64, - Nullable: true, - }, - { - Name: "_peerdb_unchanged_toast_columns", - Type: qvalue.QValueKindString, - Nullable: true, - }, - }, - }) - if err != nil { - return nil, err - } - first := true - var firstCP int64 = 0 lastCP := req.Records.LastCheckPointID tableNameRowsMapping := make(map[string]uint32) - - for _, record := range req.Records.Records { - var entries [8]qvalue.QValue - switch typedRecord := record.(type) { - case *model.InsertRecord: - // json.Marshal converts bytes in Hex automatically to BASE64 string. 
- itemsJSON, err := typedRecord.Items.ToJSON() - if err != nil { - return nil, fmt.Errorf("failed to serialize insert record items to JSON: %w", err) - } - - // add insert record to the raw table - entries[2] = qvalue.QValue{ - Kind: qvalue.QValueKindString, - Value: typedRecord.DestinationTableName, - } - entries[3] = qvalue.QValue{ - Kind: qvalue.QValueKindString, - Value: itemsJSON, - } - entries[4] = qvalue.QValue{ - Kind: qvalue.QValueKindInt64, - Value: 0, - } - entries[5] = qvalue.QValue{ - Kind: qvalue.QValueKindString, - Value: "", - } - entries[7] = qvalue.QValue{ - Kind: qvalue.QValueKindString, - Value: utils.KeysToString(typedRecord.UnchangedToastColumns), - } - tableNameRowsMapping[typedRecord.DestinationTableName] += 1 - case *model.UpdateRecord: - newItemsJSON, err := typedRecord.NewItems.ToJSON() - if err != nil { - return nil, fmt.Errorf("failed to serialize update record new items to JSON: %w", err) - } - oldItemsJSON, err := typedRecord.OldItems.ToJSON() - if err != nil { - return nil, fmt.Errorf("failed to serialize update record old items to JSON: %w", err) - } - - // add update record to the raw table - entries[2] = qvalue.QValue{ - Kind: qvalue.QValueKindString, - Value: typedRecord.DestinationTableName, - } - entries[3] = qvalue.QValue{ - Kind: qvalue.QValueKindString, - Value: newItemsJSON, - } - entries[4] = qvalue.QValue{ - Kind: qvalue.QValueKindInt64, - Value: 1, - } - entries[5] = qvalue.QValue{ - Kind: qvalue.QValueKindString, - Value: oldItemsJSON, - } - entries[7] = qvalue.QValue{ - Kind: qvalue.QValueKindString, - Value: utils.KeysToString(typedRecord.UnchangedToastColumns), - } - tableNameRowsMapping[typedRecord.DestinationTableName] += 1 - case *model.DeleteRecord: - itemsJSON, err := typedRecord.Items.ToJSON() - if err != nil { - return nil, fmt.Errorf("failed to serialize delete record items to JSON: %w", err) - } - - // append delete record to the raw table - entries[2] = qvalue.QValue{ - Kind: qvalue.QValueKindString, - Value: typedRecord.DestinationTableName, - } - entries[3] = qvalue.QValue{ - Kind: qvalue.QValueKindString, - Value: itemsJSON, - } - entries[4] = qvalue.QValue{ - Kind: qvalue.QValueKindInt64, - Value: 2, - } - entries[5] = qvalue.QValue{ - Kind: qvalue.QValueKindString, - Value: itemsJSON, - } - entries[7] = qvalue.QValue{ - Kind: qvalue.QValueKindString, - Value: utils.KeysToString(typedRecord.UnchangedToastColumns), - } - tableNameRowsMapping[typedRecord.DestinationTableName] += 1 - default: - return nil, fmt.Errorf("record type %T not supported in Snowflake flow connector", typedRecord) - } - - if first { - firstCP = record.GetCheckPointID() - first = false - } - - entries[0] = qvalue.QValue{ - Kind: qvalue.QValueKindString, - Value: uuid.New().String(), - } - entries[1] = qvalue.QValue{ - Kind: qvalue.QValueKindInt64, - Value: time.Now().UnixNano(), - } - entries[6] = qvalue.QValue{ - Kind: qvalue.QValueKindInt64, - Value: syncBatchID, - } - - recordStream.Records <- &model.QRecordOrError{ - Record: &model.QRecord{ - NumEntries: 8, - Entries: entries[:], - }, - } + streamRes, err := utils.RecordsToRawTableStream(model.RecordsToStreamRequest{ + Records: req.Records.Records, + TableMapping: tableNameRowsMapping, + CP: 0, + BatchID: syncBatchID, + }) + if err != nil { + return nil, fmt.Errorf("failed to convert records to raw table stream: %w", err) } - + firstCP := streamRes.CP + recordStream := streamRes.Stream qrepConfig := &protos.QRepConfig{ StagingPath: "", FlowJobName: req.FlowJobName, @@ -841,7 +678,6 @@ func (c 
*SnowflakeConnector) syncRecordsViaAvro(req *model.SyncRecordsRequest, r return nil, err } metrics.LogSyncMetrics(c.ctx, req.FlowJobName, int64(numRecords), time.Since(startTime)) - return &model.SyncResponse{ FirstSyncedCheckPointID: firstCP, LastSyncedCheckPointID: lastCP, diff --git a/flow/connectors/utils/stream.go b/flow/connectors/utils/stream.go new file mode 100644 index 000000000..2d8ed15b9 --- /dev/null +++ b/flow/connectors/utils/stream.go @@ -0,0 +1,189 @@ +package utils + +import ( + "fmt" + "time" + + "github.com/PeerDB-io/peer-flow/model" + "github.com/PeerDB-io/peer-flow/model/qvalue" + "github.com/google/uuid" +) + +func RecordsToRawTableStream(req model.RecordsToStreamRequest) (*model.RecordsToStreamResponse, error) { + recordStream := model.NewQRecordStream(len(req.Records)) + err := recordStream.SetSchema(&model.QRecordSchema{ + Fields: []*model.QField{ + { + Name: "_peerdb_uid", + Type: qvalue.QValueKindString, + Nullable: false, + }, + { + Name: "_peerdb_timestamp", + Type: qvalue.QValueKindInt64, + Nullable: false, + }, + { + Name: "_peerdb_destination_table_name", + Type: qvalue.QValueKindString, + Nullable: false, + }, + { + Name: "_peerdb_data", + Type: qvalue.QValueKindString, + Nullable: false, + }, + { + Name: "_peerdb_record_type", + Type: qvalue.QValueKindInt64, + Nullable: true, + }, + { + Name: "_peerdb_match_data", + Type: qvalue.QValueKindString, + Nullable: true, + }, + { + Name: "_peerdb_batch_id", + Type: qvalue.QValueKindInt64, + Nullable: true, + }, + { + Name: "_peerdb_unchanged_toast_columns", + Type: qvalue.QValueKindString, + Nullable: true, + }, + }, + }) + if err != nil { + return nil, err + } + + first := true + var firstCP int64 = req.CP + for _, record := range req.Records { + var entries [8]qvalue.QValue + switch typedRecord := record.(type) { + case *model.InsertRecord: + // json.Marshal converts bytes in Hex automatically to BASE64 string. 
+ itemsJSON, err := typedRecord.Items.ToJSON() + if err != nil { + return nil, fmt.Errorf("failed to serialize insert record items to JSON: %w", err) + } + + // add insert record to the raw table + entries[2] = qvalue.QValue{ + Kind: qvalue.QValueKindString, + Value: typedRecord.DestinationTableName, + } + entries[3] = qvalue.QValue{ + Kind: qvalue.QValueKindString, + Value: itemsJSON, + } + entries[4] = qvalue.QValue{ + Kind: qvalue.QValueKindInt64, + Value: 0, + } + entries[5] = qvalue.QValue{ + Kind: qvalue.QValueKindString, + Value: "", + } + entries[7] = qvalue.QValue{ + Kind: qvalue.QValueKindString, + Value: KeysToString(typedRecord.UnchangedToastColumns), + } + req.TableMapping[typedRecord.DestinationTableName] += 1 + case *model.UpdateRecord: + newItemsJSON, err := typedRecord.NewItems.ToJSON() + if err != nil { + return nil, fmt.Errorf("failed to serialize update record new items to JSON: %w", err) + } + oldItemsJSON, err := typedRecord.OldItems.ToJSON() + if err != nil { + return nil, fmt.Errorf("failed to serialize update record old items to JSON: %w", err) + } + + entries[2] = qvalue.QValue{ + Kind: qvalue.QValueKindString, + Value: typedRecord.DestinationTableName, + } + entries[3] = qvalue.QValue{ + Kind: qvalue.QValueKindString, + Value: newItemsJSON, + } + entries[4] = qvalue.QValue{ + Kind: qvalue.QValueKindInt64, + Value: 1, + } + entries[5] = qvalue.QValue{ + Kind: qvalue.QValueKindString, + Value: oldItemsJSON, + } + entries[7] = qvalue.QValue{ + Kind: qvalue.QValueKindString, + Value: KeysToString(typedRecord.UnchangedToastColumns), + } + req.TableMapping[typedRecord.DestinationTableName] += 1 + case *model.DeleteRecord: + itemsJSON, err := typedRecord.Items.ToJSON() + if err != nil { + return nil, fmt.Errorf("failed to serialize delete record items to JSON: %w", err) + } + + // append delete record to the raw table + entries[2] = qvalue.QValue{ + Kind: qvalue.QValueKindString, + Value: typedRecord.DestinationTableName, + } + entries[3] = qvalue.QValue{ + Kind: qvalue.QValueKindString, + Value: itemsJSON, + } + entries[4] = qvalue.QValue{ + Kind: qvalue.QValueKindInt64, + Value: 2, + } + entries[5] = qvalue.QValue{ + Kind: qvalue.QValueKindString, + Value: itemsJSON, + } + entries[7] = qvalue.QValue{ + Kind: qvalue.QValueKindString, + Value: KeysToString(typedRecord.UnchangedToastColumns), + } + req.TableMapping[typedRecord.DestinationTableName] += 1 + default: + return nil, fmt.Errorf("record type %T not supported in Snowflake flow connector", typedRecord) + } + + if first { + firstCP = record.GetCheckPointID() + first = false + } + + entries[0] = qvalue.QValue{ + Kind: qvalue.QValueKindString, + Value: uuid.New().String(), + } + entries[1] = qvalue.QValue{ + Kind: qvalue.QValueKindInt64, + Value: time.Now().UnixNano(), + } + entries[6] = qvalue.QValue{ + Kind: qvalue.QValueKindInt64, + Value: req.BatchID, + } + + recordStream.Records <- &model.QRecordOrError{ + Record: &model.QRecord{ + NumEntries: 8, + Entries: entries[:], + }, + } + } + + return &model.RecordsToStreamResponse{ + Stream: recordStream, + CP: firstCP, + }, nil +} diff --git a/flow/e2e/s3/cdc_s3_test.go b/flow/e2e/s3/cdc_s3_test.go index 0ec424ea8..aaca2a125 100644 --- a/flow/e2e/s3/cdc_s3_test.go +++ b/flow/e2e/s3/cdc_s3_test.go @@ -44,16 +44,15 @@ func (s *PeerFlowE2ETestSuiteS3) Test_Complete_Simple_Flow_S3() { s.NoError(err) limits := peerflow.CDCFlowLimits{ - TotalSyncFlows: 9, - MaxBatchSize: 20, + TotalSyncFlows: 5, + MaxBatchSize: 5, } - // Insert 100 rows into postgres, update 20 rows, and 
delete 20 rows go func() { e2e.SetupCDCFlowStatusQuery(env, connectionGen) s.NoError(err) - //insert 100 - for i := 0; i < 100; i++ { + //insert 20 rows + for i := 1; i <= 20; i++ { testKey := fmt.Sprintf("test_key_%d", i) testValue := fmt.Sprintf("test_value_%d", i) _, err = s.pool.Exec(context.Background(), fmt.Sprintf(` @@ -61,19 +60,7 @@ func (s *PeerFlowE2ETestSuiteS3) Test_Complete_Simple_Flow_S3() { `, srcTableName), testKey, testValue) s.NoError(err) } - //update 20 - for i := 0; i < 20; i++ { - _, err = s.pool.Exec(context.Background(), fmt.Sprintf(` - UPDATE %s SET value=$1 where id=$2 - `, srcTableName), "updated_value", i) - s.NoError(err) - } - //delete 20 - _, err = s.pool.Exec(context.Background(), fmt.Sprintf(` - DELETE FROM %s where id < 20 - `, srcTableName)) s.NoError(err) - fmt.Println("Inserted 100 rows into the source table") }() env.ExecuteWorkflow(peerflow.CDCFlowWorkflowWithConfig, flowConnConfig, &limits, nil) @@ -93,7 +80,7 @@ func (s *PeerFlowE2ETestSuiteS3) Test_Complete_Simple_Flow_S3() { fmt.Println("Files in Test_Complete_Simple_Flow_S3: ", len(files)) require.NoError(s.T(), err) - require.Equal(s.T(), 8, len(files)) + require.Equal(s.T(), 4, len(files)) env.AssertExpectations(s.T()) } @@ -128,16 +115,15 @@ func (s *PeerFlowE2ETestSuiteS3) Test_Complete_Simple_Flow_GCS_Interop() { s.NoError(err) limits := peerflow.CDCFlowLimits{ - TotalSyncFlows: 9, - MaxBatchSize: 20, + TotalSyncFlows: 5, + MaxBatchSize: 5, } - // Insert 100 rows into postgres, update 20 rows, and delete 20 rows go func() { e2e.SetupCDCFlowStatusQuery(env, connectionGen) s.NoError(err) - //insert 100 - for i := 0; i < 100; i++ { + //insert 20 rows + for i := 1; i <= 20; i++ { testKey := fmt.Sprintf("test_key_%d", i) testValue := fmt.Sprintf("test_value_%d", i) _, err = s.pool.Exec(context.Background(), fmt.Sprintf(` @@ -145,19 +131,7 @@ func (s *PeerFlowE2ETestSuiteS3) Test_Complete_Simple_Flow_GCS_Interop() { `, srcTableName), testKey, testValue) s.NoError(err) } - //update 20 - for i := 0; i < 20; i++ { - _, err = s.pool.Exec(context.Background(), fmt.Sprintf(` - UPDATE %s SET value=$1 where id=$2 - `, srcTableName), "updated_value", i) - s.NoError(err) - } - //delete 20 - _, err = s.pool.Exec(context.Background(), fmt.Sprintf(` - DELETE FROM %s where id < 20 - `, srcTableName)) s.NoError(err) - fmt.Println("Inserted 100 rows into the source table") }() env.ExecuteWorkflow(peerflow.CDCFlowWorkflowWithConfig, flowConnConfig, &limits, nil) @@ -174,10 +148,10 @@ func (s *PeerFlowE2ETestSuiteS3) Test_Complete_Simple_Flow_GCS_Interop() { defer cancel() fmt.Println("JobName: ", flowJobName) files, err := s.s3Helper.ListAllFiles(ctx, flowJobName) - fmt.Println("Files in Test_Complete_Simple_Flow_S3: ", len(files)) + fmt.Println("Files in Test_Complete_Simple_Flow_GCS: ", len(files)) require.NoError(s.T(), err) - require.Equal(s.T(), 8, len(files)) + require.Equal(s.T(), 4, len(files)) env.AssertExpectations(s.T()) } diff --git a/flow/model/qrecord_stream.go b/flow/model/qrecord_stream.go index 1fb22826e..a338ace4f 100644 --- a/flow/model/qrecord_stream.go +++ b/flow/model/qrecord_stream.go @@ -19,6 +19,18 @@ type QRecordStream struct { schemaCache *QRecordSchema } +type RecordsToStreamRequest struct { + Records []Record + TableMapping map[string]uint32 + CP int64 + BatchID int64 +} + +type RecordsToStreamResponse struct { + Stream *QRecordStream + CP int64 +} + func NewQRecordStream(buffer int) *QRecordStream { return &QRecordStream{ schema: make(chan *QRecordSchemaOrError, 1), From 
6b37a36a55b6fa059314e4b7bc7f48650634a1d2 Mon Sep 17 00:00:00 2001 From: Amogh-Bharadwaj Date: Sat, 14 Oct 2023 01:33:15 +0530 Subject: [PATCH 4/5] lint --- flow/connectors/external_metadata/store.go | 2 -- flow/connectors/utils/stream.go | 2 +- 2 files changed, 1 insertion(+), 3 deletions(-) diff --git a/flow/connectors/external_metadata/store.go b/flow/connectors/external_metadata/store.go index 68ff6a068..6ec7c0714 100644 --- a/flow/connectors/external_metadata/store.go +++ b/flow/connectors/external_metadata/store.go @@ -162,7 +162,6 @@ func (p *PostgresMetadataStore) GetLastBatchID(jobName string) (int64, error) { // update offset for a job func (p *PostgresMetadataStore) UpdateLastOffset(jobName string, offset int64) error { - // start a transaction tx, err := p.pool.Begin(p.ctx) if err != nil { @@ -200,7 +199,6 @@ func (p *PostgresMetadataStore) UpdateLastOffset(jobName string, offset int64) e // update offset for a job func (p *PostgresMetadataStore) IncrementID(jobName string) error { - log.WithFields(log.Fields{ "flowName": jobName, }).Infof("incrementing sync batch id for job `%s`", jobName) diff --git a/flow/connectors/utils/stream.go b/flow/connectors/utils/stream.go index 2d8ed15b9..422106ea5 100644 --- a/flow/connectors/utils/stream.go +++ b/flow/connectors/utils/stream.go @@ -60,7 +60,7 @@ func RecordsToRawTableStream(req model.RecordsToStreamRequest) (*model.RecordsTo } first := true - var firstCP int64 = req.CP + firstCP := req.CP for _, record := range req.Records { var entries [8]qvalue.QValue switch typedRecord := record.(type) { From 6e4b5e0cf4222e4faa7625623341966c77473b5e Mon Sep 17 00:00:00 2001 From: Amogh-Bharadwaj Date: Sat, 14 Oct 2023 02:02:59 +0530 Subject: [PATCH 5/5] lint fix --- flow/connectors/eventhub/eventhub.go | 2 +- flow/connectors/s3/s3.go | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/flow/connectors/eventhub/eventhub.go b/flow/connectors/eventhub/eventhub.go index 3c03ede0e..51dba78b7 100644 --- a/flow/connectors/eventhub/eventhub.go +++ b/flow/connectors/eventhub/eventhub.go @@ -41,7 +41,7 @@ func NewEventHubConnector( } hubManager := NewEventHubManager(ctx, defaultAzureCreds, config) - metadataSchemaName := "peerdb_eventhub_metadata" + metadataSchemaName := "peerdb_eventhub_metadata" // #nosec G101 pgMetadata, err := metadataStore.NewPostgresMetadataStore(ctx, config.GetMetadataDb(), metadataSchemaName) if err != nil { diff --git a/flow/connectors/s3/s3.go b/flow/connectors/s3/s3.go index a96dcb481..51e163f13 100644 --- a/flow/connectors/s3/s3.go +++ b/flow/connectors/s3/s3.go @@ -55,7 +55,7 @@ func NewS3Connector(ctx context.Context, if err != nil { return nil, fmt.Errorf("failed to create S3 client: %w", err) } - metadataSchemaName := "peerdb_s3_metadata" + metadataSchemaName := "peerdb_s3_metadata" // #nosec G101 pgMetadata, err := metadataStore.NewPostgresMetadataStore(ctx, config.GetMetadataDb(), metadataSchemaName) if err != nil {
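
Note on the refactor in patch 3 (illustrative sketch, not part of any patch above): after the move to connectors/external_metadata and utils.RecordsToRawTableStream, a destination connector's sync path is expected to read the last batch ID from the metadata store, convert the CDC records into the shared 8-column raw-table stream, write that stream, then persist the checkpoint and bump the batch counter. The sketch below assumes only APIs visible in the diffs; syncViaSharedHelpers is a hypothetical name, the raw-table write step is connector-specific and left as a comment, and SyncResponse fields not shown in the diffs are omitted.

func syncViaSharedHelpers(c *S3Connector, req *model.SyncRecordsRequest) (*model.SyncResponse, error) {
	// batch IDs and offsets live in the external Postgres metadata store
	syncBatchID, err := c.pgMetadata.GetLastBatchID(req.FlowJobName)
	if err != nil {
		return nil, err
	}
	syncBatchID += 1

	// convert CDC records into the shared raw-table stream
	tableNameRowsMapping := make(map[string]uint32)
	streamRes, err := utils.RecordsToRawTableStream(model.RecordsToStreamRequest{
		Records:      req.Records.Records,
		TableMapping: tableNameRowsMapping,
		CP:           0,
		BatchID:      syncBatchID,
	})
	if err != nil {
		return nil, fmt.Errorf("failed to convert records to raw table stream: %w", err)
	}

	// connector-specific: write streamRes.Stream to the destination raw table here

	// persist the checkpoint and bump the batch counter
	if err := c.pgMetadata.UpdateLastOffset(req.FlowJobName, req.Records.LastCheckPointID); err != nil {
		return nil, err
	}
	if err := c.pgMetadata.IncrementID(req.FlowJobName); err != nil {
		return nil, err
	}

	return &model.SyncResponse{
		FirstSyncedCheckPointID: streamRes.CP,
		LastSyncedCheckPointID:  req.Records.LastCheckPointID,
	}, nil
}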