Skip to content

Commit

Permalink
Addressing comments and correcting LaunchDataflowJob method
Browse files Browse the repository at this point in the history
  • Loading branch information
darshan-sj committed Apr 4, 2024
1 parent 8820898 commit f52849c
Show file tree
Hide file tree
Showing 11 changed files with 27 additions and 25 deletions.
2 changes: 1 addition & 1 deletion conversion/conversion_from_source.go
Original file line number Diff line number Diff line change
Expand Up @@ -252,7 +252,7 @@ func (sads *DataFromSourceImpl) dataFromDatabase(ctx context.Context, migrationP
if err != nil {
return nil, err
}
dfOutput, err := infoSchema.StartStreamingMigration(ctx, client, conv, streamInfo)
dfOutput, err := infoSchema.StartStreamingMigration(ctx, migrationProjectId, client, conv, streamInfo)

Check warning on line 255 in conversion/conversion_from_source.go

View check run for this annotation

Codecov / codecov/patch

conversion/conversion_from_source.go#L255

Added line #L255 was not covered by tests
if err != nil {
return nil, err
}
Expand Down
3 changes: 2 additions & 1 deletion conversion/data_from_database.go
Original file line number Diff line number Diff line change
Expand Up @@ -146,7 +146,7 @@ func (dd *DataFromDatabaseImpl) dataFromDatabaseForDataflowMigration(migrationPr
return common.TaskResult[*profiles.DataShard]{Result: p, Err: err}
}
streamingCfg.DataflowCfg.DbNameToShardIdMap = dbNameToShardIdMap
dfOutput, err := streaming.StartDataflow(ctx, targetProfile, streamingCfg, conv)
dfOutput, err := streaming.StartDataflow(ctx, migrationProjectId, targetProfile, streamingCfg, conv)

Check warning on line 149 in conversion/data_from_database.go

View check run for this annotation

Codecov / codecov/patch

conversion/data_from_database.go#L149

Added line #L149 was not covered by tests
if err != nil {
return common.TaskResult[*profiles.DataShard]{Result: p, Err: err}
}
Expand Down Expand Up @@ -218,6 +218,7 @@ func (dd *DataFromDatabaseImpl) dataFromDatabaseForDataflowMigration(migrationPr

// create monitoring aggregated dashboard for sharded migration
aggMonitoringResources := metrics.MonitoringMetricsResources{
MigrationProjectId: migrationProjectId,
SpannerProjectId: targetProfile.Conn.Sp.Project,
SpannerInstanceId: targetProfile.Conn.Sp.Instance,
SpannerDatabaseId: targetProfile.Conn.Sp.Dbname,
Expand Down
2 changes: 1 addition & 1 deletion sources/common/infoschema.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@ type InfoSchema interface {
GetIndexes(conv *internal.Conv, table SchemaAndName, colNameIdMp map[string]string) ([]schema.Index, error)
ProcessData(conv *internal.Conv, tableId string, srcSchema schema.Table, spCols []string, spSchema ddl.CreateTable, additionalAttributes internal.AdditionalDataAttributes) error
StartChangeDataCapture(ctx context.Context, conv *internal.Conv) (map[string]interface{}, error)
StartStreamingMigration(ctx context.Context, client *sp.Client, conv *internal.Conv, streamInfo map[string]interface{}) (internal.DataflowOutput, error)
StartStreamingMigration(ctx context.Context, migrationProjectId string, client *sp.Client, conv *internal.Conv, streamInfo map[string]interface{}) (internal.DataflowOutput, error)
}

// SchemaAndName contains the schema and name for a table
Expand Down
2 changes: 1 addition & 1 deletion sources/dynamodb/schema.go
Original file line number Diff line number Diff line change
Expand Up @@ -219,7 +219,7 @@ func (isi InfoSchemaImpl) StartChangeDataCapture(ctx context.Context, conv *inte
// StartStreamingMigration starts the streaming migration process by creating a seperate
// worker thread/goroutine for each table's DynamoDB Stream. It catches Ctrl+C signal if
// customer wants to stop the process.
func (isi InfoSchemaImpl) StartStreamingMigration(ctx context.Context, client *sp.Client, conv *internal.Conv, latestStreamArn map[string]interface{}) (internal.DataflowOutput, error) {
func (isi InfoSchemaImpl) StartStreamingMigration(ctx context.Context, migrationProjectId string, client *sp.Client, conv *internal.Conv, latestStreamArn map[string]interface{}) (internal.DataflowOutput, error) {

Check warning on line 222 in sources/dynamodb/schema.go

View check run for this annotation

Codecov / codecov/patch

sources/dynamodb/schema.go#L222

Added line #L222 was not covered by tests
fmt.Println("Processing of DynamoDB Streams started...")
fmt.Println("Use Ctrl+C to stop the process.")

Expand Down
4 changes: 2 additions & 2 deletions sources/mysql/infoschema.go
Original file line number Diff line number Diff line change
Expand Up @@ -389,10 +389,10 @@ func (isi InfoSchemaImpl) StartChangeDataCapture(ctx context.Context, conv *inte

// StartStreamingMigration is used for automatic triggering of Dataflow job when
// performing a streaming migration.
func (isi InfoSchemaImpl) StartStreamingMigration(ctx context.Context, client *sp.Client, conv *internal.Conv, streamingInfo map[string]interface{}) (internal.DataflowOutput, error) {
func (isi InfoSchemaImpl) StartStreamingMigration(ctx context.Context, migrationProjectId string, client *sp.Client, conv *internal.Conv, streamingInfo map[string]interface{}) (internal.DataflowOutput, error) {

Check warning on line 392 in sources/mysql/infoschema.go

View check run for this annotation

Codecov / codecov/patch

sources/mysql/infoschema.go#L392

Added line #L392 was not covered by tests
streamingCfg, _ := streamingInfo["streamingCfg"].(streaming.StreamingCfg)

dfOutput, err := streaming.StartDataflow(ctx, isi.TargetProfile, streamingCfg, conv)
dfOutput, err := streaming.StartDataflow(ctx, migrationProjectId, isi.TargetProfile, streamingCfg, conv)

Check warning on line 395 in sources/mysql/infoschema.go

View check run for this annotation

Codecov / codecov/patch

sources/mysql/infoschema.go#L395

Added line #L395 was not covered by tests
if err != nil {
err = fmt.Errorf("error starting dataflow: %v", err)
return internal.DataflowOutput{}, err
Expand Down
4 changes: 2 additions & 2 deletions sources/oracle/infoschema.go
Original file line number Diff line number Diff line change
Expand Up @@ -454,9 +454,9 @@ func (isi InfoSchemaImpl) StartChangeDataCapture(ctx context.Context, conv *inte

// StartStreamingMigration is used for automatic triggering of Dataflow job when
// performing a streaming migration.
func (isi InfoSchemaImpl) StartStreamingMigration(ctx context.Context, client *sp.Client, conv *internal.Conv, streamingInfo map[string]interface{}) (internal.DataflowOutput, error) {
func (isi InfoSchemaImpl) StartStreamingMigration(ctx context.Context, migrationProjectId string, client *sp.Client, conv *internal.Conv, streamingInfo map[string]interface{}) (internal.DataflowOutput, error) {

Check warning on line 457 in sources/oracle/infoschema.go

View check run for this annotation

Codecov / codecov/patch

sources/oracle/infoschema.go#L457

Added line #L457 was not covered by tests
streamingCfg, _ := streamingInfo["streamingCfg"].(streaming.StreamingCfg)
dfOutput, err := streaming.StartDataflow(ctx, isi.TargetProfile, streamingCfg, conv)
dfOutput, err := streaming.StartDataflow(ctx, migrationProjectId, isi.TargetProfile, streamingCfg, conv)

Check warning on line 459 in sources/oracle/infoschema.go

View check run for this annotation

Codecov / codecov/patch

sources/oracle/infoschema.go#L459

Added line #L459 was not covered by tests
if err != nil {
return internal.DataflowOutput{}, err
}
Expand Down
4 changes: 2 additions & 2 deletions sources/postgres/infoschema.go
Original file line number Diff line number Diff line change
Expand Up @@ -91,10 +91,10 @@ func (isi InfoSchemaImpl) StartChangeDataCapture(ctx context.Context, conv *inte

// StartStreamingMigration is used for automatic triggering of Dataflow job when
// performing a streaming migration.
func (isi InfoSchemaImpl) StartStreamingMigration(ctx context.Context, client *sp.Client, conv *internal.Conv, streamingInfo map[string]interface{}) (internal.DataflowOutput, error) {
func (isi InfoSchemaImpl) StartStreamingMigration(ctx context.Context, migrationProjectId string, client *sp.Client, conv *internal.Conv, streamingInfo map[string]interface{}) (internal.DataflowOutput, error) {

Check warning on line 94 in sources/postgres/infoschema.go

View check run for this annotation

Codecov / codecov/patch

sources/postgres/infoschema.go#L94

Added line #L94 was not covered by tests
streamingCfg, _ := streamingInfo["streamingCfg"].(streaming.StreamingCfg)

dfOutput, err := streaming.StartDataflow(ctx, isi.TargetProfile, streamingCfg, conv)
dfOutput, err := streaming.StartDataflow(ctx, migrationProjectId, isi.TargetProfile, streamingCfg, conv)

Check warning on line 97 in sources/postgres/infoschema.go

View check run for this annotation

Codecov / codecov/patch

sources/postgres/infoschema.go#L97

Added line #L97 was not covered by tests
if err != nil {
err = fmt.Errorf("error starting dataflow: %v", err)
return internal.DataflowOutput{}, err
Expand Down
2 changes: 1 addition & 1 deletion sources/spanner/infoschema.go
Original file line number Diff line number Diff line change
Expand Up @@ -79,7 +79,7 @@ func (isi InfoSchemaImpl) StartChangeDataCapture(ctx context.Context, conv *inte
return nil, nil
}

func (isi InfoSchemaImpl) StartStreamingMigration(ctx context.Context, client *spanner.Client, conv *internal.Conv, streamingInfo map[string]interface{}) (internal.DataflowOutput, error) {
func (isi InfoSchemaImpl) StartStreamingMigration(ctx context.Context, migrationProjectId string, client *spanner.Client, conv *internal.Conv, streamingInfo map[string]interface{}) (internal.DataflowOutput, error) {

Check warning on line 82 in sources/spanner/infoschema.go

View check run for this annotation

Codecov / codecov/patch

sources/spanner/infoschema.go#L82

Added line #L82 was not covered by tests
return internal.DataflowOutput{}, nil
}

Expand Down
2 changes: 1 addition & 1 deletion sources/sqlserver/infoschema.go
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,7 @@ func (isi InfoSchemaImpl) StartChangeDataCapture(ctx context.Context, conv *inte
return nil, nil
}

func (isi InfoSchemaImpl) StartStreamingMigration(ctx context.Context, client *sp.Client, conv *internal.Conv, streamingInfo map[string]interface{}) (internal.DataflowOutput, error) {
func (isi InfoSchemaImpl) StartStreamingMigration(ctx context.Context, migrationProjectId string, client *sp.Client, conv *internal.Conv, streamingInfo map[string]interface{}) (internal.DataflowOutput, error) {

Check warning on line 61 in sources/sqlserver/infoschema.go

View check run for this annotation

Codecov / codecov/patch

sources/sqlserver/infoschema.go#L61

Added line #L61 was not covered by tests
return internal.DataflowOutput{}, nil
}

Expand Down
2 changes: 1 addition & 1 deletion streaming/cleanup.go
Original file line number Diff line number Diff line change
Expand Up @@ -79,7 +79,7 @@ func InitiateJobCleanup(ctx context.Context, migrationJobId string, dataShardIds
if err != nil {
logger.Log.Debug("Unable to read Datastream metadata for deletion\n")
} else {
cleanupDatastream(ctx, datastreamResources, migrationJobId)
cleanupDatastream(ctx, datastreamResources, migrationProjectId)

Check warning on line 82 in streaming/cleanup.go

View check run for this annotation

Codecov / codecov/patch

streaming/cleanup.go#L82

Added line #L82 was not covered by tests
}
}
}
Expand Down
25 changes: 13 additions & 12 deletions streaming/streaming.go
Original file line number Diff line number Diff line change
Expand Up @@ -602,15 +602,15 @@ func LaunchStream(ctx context.Context, sourceProfile profiles.SourceProfile, dbL
}

// LaunchDataflowJob populates the parameters from the streaming config and triggers a Dataflow job.
func LaunchDataflowJob(ctx context.Context, targetProfile profiles.TargetProfile, streamingCfg StreamingCfg, conv *internal.Conv) (internal.DataflowOutput, error) {
project, instance, dbName, _ := targetProfile.GetResourceIds(ctx, time.Now(), "", nil, &utils.GetUtilInfoImpl{})
func LaunchDataflowJob(ctx context.Context, migrationProjectId string, targetProfile profiles.TargetProfile, streamingCfg StreamingCfg, conv *internal.Conv) (internal.DataflowOutput, error) {
spannerProjectId, instance, dbName, _ := targetProfile.GetResourceIds(ctx, time.Now(), "", nil, &utils.GetUtilInfoImpl{})

Check warning on line 606 in streaming/streaming.go

View check run for this annotation

Codecov / codecov/patch

streaming/streaming.go#L605-L606

Added lines #L605 - L606 were not covered by tests
dataflowCfg := streamingCfg.DataflowCfg
datastreamCfg := streamingCfg.DatastreamCfg

// Rate limit this function to match DataFlow createJob Quota.
DATA_FLOW_RL.Take()

fmt.Println("Launching dataflow job ", dataflowCfg.JobName, " in ", project, "-", dataflowCfg.Location)
fmt.Println("Launching dataflow job ", dataflowCfg.JobName, " in ", migrationProjectId, "-", dataflowCfg.Location)

Check warning on line 613 in streaming/streaming.go

View check run for this annotation

Codecov / codecov/patch

streaming/streaming.go#L613

Added line #L613 was not covered by tests

c, err := dataflow.NewFlexTemplatesClient(ctx)
if err != nil {
Expand All @@ -627,7 +627,7 @@ func LaunchDataflowJob(ctx context.Context, targetProfile profiles.TargetProfile
defer dsClient.Close()

// Fetch the GCS path from the destination connection profile.
dstProf := fmt.Sprintf("projects/%s/locations/%s/connectionProfiles/%s", project, datastreamCfg.DestinationConnectionConfig.Location, datastreamCfg.DestinationConnectionConfig.Name)
dstProf := fmt.Sprintf("projects/%s/locations/%s/connectionProfiles/%s", migrationProjectId, datastreamCfg.DestinationConnectionConfig.Location, datastreamCfg.DestinationConnectionConfig.Name)

Check warning on line 630 in streaming/streaming.go

View check run for this annotation

Codecov / codecov/patch

streaming/streaming.go#L630

Added line #L630 was not covered by tests
res, err := dsClient.GetConnectionProfile(ctx, &datastreampb.GetConnectionProfileRequest{Name: dstProf})
if err != nil {
return internal.DataflowOutput{}, fmt.Errorf("could not get connection profiles: %v", err)
Expand All @@ -641,19 +641,19 @@ func LaunchDataflowJob(ctx context.Context, targetProfile profiles.TargetProfile

// Initiate runtime environment flags and overrides.
var (
dataflowProjectId = project
dataflowVpcHostProjectId = project
dataflowProjectId = migrationProjectId
dataflowVpcHostProjectId = migrationProjectId

Check warning on line 645 in streaming/streaming.go

View check run for this annotation

Codecov / codecov/patch

streaming/streaming.go#L644-L645

Added lines #L644 - L645 were not covered by tests
gcsTemplatePath = DEFAULT_TEMPLATE_PATH
dataflowSubnetwork = ""
workerIpAddressConfig = dataflowpb.WorkerIPAddressConfiguration_WORKER_IP_PUBLIC
dataflowUserLabels = make(map[string]string)
machineType = "n1-standard-2"
)
// If project override present, use that otherwise default to Spanner project. Useful when customers want to run Dataflow in separate project.
// If project override present, use that otherwise default to Migration project. Useful when customers want to run Dataflow in separate project.

Check warning on line 652 in streaming/streaming.go

View check run for this annotation

Codecov / codecov/patch

streaming/streaming.go#L652

Added line #L652 was not covered by tests
if dataflowCfg.ProjectId != "" {
dataflowProjectId = dataflowCfg.ProjectId
}
// If VPC Host project override present, use that otherwise default to Spanner project.
// If VPC Host project override present, use that otherwise default to Migration project.
if dataflowCfg.VpcHostProjectId != "" {
dataflowVpcHostProjectId = dataflowCfg.VpcHostProjectId
}
Expand Down Expand Up @@ -706,13 +706,14 @@ func LaunchDataflowJob(ctx context.Context, targetProfile profiles.TargetProfile
Template: &dataflowpb.LaunchFlexTemplateParameter_ContainerSpecGcsPath{ContainerSpecGcsPath: gcsTemplatePath},
Parameters: map[string]string{
"inputFilePattern": utils.ConcatDirectoryPath(inputFilePattern, "data"),
"streamName": fmt.Sprintf("projects/%s/locations/%s/streams/%s", project, datastreamCfg.StreamLocation, datastreamCfg.StreamId),
"streamName": fmt.Sprintf("projects/%s/locations/%s/streams/%s", migrationProjectId, datastreamCfg.StreamLocation, datastreamCfg.StreamId),
"projectId": spannerProjectId,

Check warning on line 710 in streaming/streaming.go

View check run for this annotation

Codecov / codecov/patch

streaming/streaming.go#L709-L710

Added lines #L709 - L710 were not covered by tests
"instanceId": instance,
"databaseId": dbName,
"sessionFilePath": streamingCfg.TmpDir + "session.json",
"deadLetterQueueDirectory": inputFilePattern + "dlq",
"transformationContextFilePath": streamingCfg.TmpDir + "transformationContext.json",
"gcsPubSubSubscription": fmt.Sprintf("projects/%s/subscriptions/%s", project, streamingCfg.PubsubCfg.SubscriptionId),
"gcsPubSubSubscription": fmt.Sprintf("projects/%s/subscriptions/%s", migrationProjectId, streamingCfg.PubsubCfg.SubscriptionId),

Check warning on line 716 in streaming/streaming.go

View check run for this annotation

Codecov / codecov/patch

streaming/streaming.go#L716

Added line #L716 was not covered by tests
},
Environment: &dataflowpb.FlexTemplateRuntimeEnvironment{
MaxWorkers: maxWorkers,
Expand Down Expand Up @@ -881,7 +882,7 @@ func StartDatastream(ctx context.Context, migrationProjectId string, streamingCf
return streamingCfg, nil
}

func StartDataflow(ctx context.Context, targetProfile profiles.TargetProfile, streamingCfg StreamingCfg, conv *internal.Conv) (internal.DataflowOutput, error) {
func StartDataflow(ctx context.Context, migrationProjectId string, targetProfile profiles.TargetProfile, streamingCfg StreamingCfg, conv *internal.Conv) (internal.DataflowOutput, error) {

Check warning on line 885 in streaming/streaming.go

View check run for this annotation

Codecov / codecov/patch

streaming/streaming.go#L885

Added line #L885 was not covered by tests
sc, err := storageclient.NewStorageClientImpl(ctx)
if err != nil {
return internal.DataflowOutput{}, err
Expand All @@ -906,7 +907,7 @@ func StartDataflow(ctx context.Context, targetProfile profiles.TargetProfile, st
if err != nil {
return internal.DataflowOutput{}, fmt.Errorf("error while writing to GCS: %v", err)
}
dfOutput, err := LaunchDataflowJob(ctx, targetProfile, streamingCfg, conv)
dfOutput, err := LaunchDataflowJob(ctx, migrationProjectId, targetProfile, streamingCfg, conv)

Check warning on line 910 in streaming/streaming.go

View check run for this annotation

Codecov / codecov/patch

streaming/streaming.go#L910

Added line #L910 was not covered by tests
if err != nil {
return internal.DataflowOutput{}, fmt.Errorf("error launching dataflow: %v", err)
}
Expand Down

0 comments on commit f52849c

Please sign in to comment.