Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
feat(bigquery): add migration quickstart (#2249)
* feat(bigquery): add migration quickstart
- Loading branch information
Showing
12 changed files
with
302 additions
and
48 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,146 @@ | ||
// Copyright 2021 Google LLC | ||
// | ||
// Licensed under the Apache License, Version 2.0 (the "License"); | ||
// you may not use this file except in compliance with the License. | ||
// You may obtain a copy of the License at | ||
// | ||
// https://www.apache.org/licenses/LICENSE-2.0 | ||
// | ||
// Unless required by applicable law or agreed to in writing, software | ||
// distributed under the License is distributed on an "AS IS" BASIS, | ||
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | ||
// See the License for the specific language governing permissions and | ||
// limitations under the License. | ||
|
||
// [START bigquerymigration_quickstart] | ||
|
||
// The bigquery_migration_quickstart application demonstrates basic usage of the | ||
// BigQuery migration API by executing a workflow that performs an offline SQL | ||
// translation task. | ||
package main | ||
|
||
import ( | ||
"context" | ||
"flag" | ||
"fmt" | ||
"log" | ||
"time" | ||
|
||
migration "cloud.google.com/go/bigquery/migration/apiv2alpha" | ||
translationtaskpb "google.golang.org/genproto/googleapis/cloud/bigquery/migration/tasks/translation/v2alpha" | ||
migrationpb "google.golang.org/genproto/googleapis/cloud/bigquery/migration/v2alpha" | ||
"google.golang.org/protobuf/types/known/anypb" | ||
) | ||
|
||
func main() { | ||
// Define command line flags for controlling the behavior of this quickstart. | ||
projectID := flag.String("project_id", "", "Cloud Project ID.") | ||
location := flag.String("location", "us", "BigQuery Migration location used for interactions.") | ||
outputPath := flag.String("output", "", "Cloud Storage path for translated resources.") | ||
// Parse flags and do some minimal validation. | ||
flag.Parse() | ||
if *projectID == "" { | ||
log.Fatal("empty --project_id specified, please provide a valid project ID") | ||
} | ||
if *location == "" { | ||
log.Fatal("empty --location specified, please provide a valid location") | ||
} | ||
if *outputPath == "" { | ||
log.Fatalf("empty --output specified, please provide a valid cloud storage path") | ||
} | ||
|
||
ctx := context.Background() | ||
migClient, err := migration.NewClient(ctx) | ||
if err != nil { | ||
log.Fatalf("migration.NewClient: %v", err) | ||
} | ||
defer migClient.Close() | ||
|
||
workflow, err := executeTranslationWorkflow(ctx, migClient, *projectID, *location, *outputPath) | ||
if err != nil { | ||
log.Fatalf("workflow execution failed: %v", err) | ||
} | ||
|
||
reportWorkflowStatus(workflow) | ||
} | ||
|
||
// executeTranslationWorkflow constructs a migration workflow that performs some offline SQL translation. | ||
func executeTranslationWorkflow(ctx context.Context, client *migration.Client, projectID, location, outPath string) (*migrationpb.MigrationWorkflow, error) { | ||
|
||
// Tasks are extensible; the translation task is defined by the BigQuery Migration API, and so we construct the appropriate | ||
// details for the task. | ||
detailsTranslation := &translationtaskpb.TranslationTaskDetails{ | ||
// The path to objects in cloud storage containing queries to be translated. This is a prefix to some input text files. | ||
InputPath: "gs://cloud-samples-data/bigquery/migration/translation/input/", | ||
// The path to objects in cloud storage containing DDL create statements. This is a prefix to some input DDL text files. | ||
SchemaPath: "gs://cloud-samples-data/bigquery/migration/translation/schema/", | ||
// This is the cloud storage path where results will be written. In this case it will contain translated queries, | ||
// and possibly error files. | ||
OutputPath: outPath, | ||
} | ||
|
||
// We then convert the task details for translation into the suitable protobuf `Any` representation needed | ||
// to define the workflow. | ||
detailsAny, err := anypb.New(detailsTranslation) | ||
if err != nil { | ||
return nil, err | ||
} | ||
|
||
// Finally, construct the workflow creation request. | ||
req := &migrationpb.CreateMigrationWorkflowRequest{ | ||
Parent: fmt.Sprintf("projects/%s/locations/%s", projectID, location), | ||
MigrationWorkflow: &migrationpb.MigrationWorkflow{ | ||
DisplayName: "example SQL conversion", | ||
Tasks: map[string]*migrationpb.MigrationTask{ | ||
"example_conversion": { | ||
Type: "Translation_Teradata", | ||
Details: detailsAny, | ||
}, | ||
}, | ||
}, | ||
} | ||
|
||
// Create the workflow using the request. | ||
workflow, err := client.CreateMigrationWorkflow(ctx, req) | ||
if err != nil { | ||
return nil, fmt.Errorf("CreateMigrationWorkflow: %v", err) | ||
} | ||
|
||
// This is an asyncronous process, so we now poll the workflow | ||
// until completion or a suitable timeout has elapsed. | ||
timeoutCtx, cancel := context.WithTimeout(ctx, 5*time.Minute) | ||
defer cancel() | ||
for { | ||
select { | ||
case <-timeoutCtx.Done(): | ||
return nil, fmt.Errorf("task %s didn't complete due to context expiring", workflow.GetName()) | ||
default: | ||
polledWorkflow, err := client.GetMigrationWorkflow(timeoutCtx, &migrationpb.GetMigrationWorkflowRequest{ | ||
Name: workflow.GetName(), | ||
}) | ||
if err != nil { | ||
return nil, fmt.Errorf("polling ended in error: %v", err) | ||
} | ||
if polledWorkflow.GetState() == migrationpb.MigrationWorkflow_COMPLETED { | ||
// polledWorkflow contains the most recent metadata about the workflow, so we return that. | ||
return polledWorkflow, nil | ||
} | ||
// workflow still isn't complete, so sleep briefly before polling again. | ||
time.Sleep(5 * time.Second) | ||
} | ||
} | ||
} | ||
|
||
// reportWorkflowStatus prints information about the workflow execution in a more human readable format. | ||
func reportWorkflowStatus(workflow *migrationpb.MigrationWorkflow) { | ||
fmt.Printf("Migration workflow %s ended in state %s.\n", workflow.GetName(), workflow.GetState().String()) | ||
for k, task := range workflow.GetTasks() { | ||
fmt.Printf(" - Task %s had id %s", k, task.GetId()) | ||
if task.GetProcessingError() != nil { | ||
fmt.Printf(" with processing error: %s", task.GetProcessingError().GetReason()) | ||
} | ||
fmt.Println() | ||
} | ||
} | ||
|
||
// [END bigquerymigration_quickstart] |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,84 @@ | ||
// Copyright 2021 Google LLC | ||
// | ||
// Licensed under the Apache License, Version 2.0 (the "License"); | ||
// you may not use this file except in compliance with the License. | ||
// You may obtain a copy of the License at | ||
// | ||
// https://www.apache.org/licenses/LICENSE-2.0 | ||
// | ||
// Unless required by applicable law or agreed to in writing, software | ||
// distributed under the License is distributed on an "AS IS" BASIS, | ||
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | ||
// See the License for the specific language governing permissions and | ||
// limitations under the License. | ||
|
||
package main | ||
|
||
import ( | ||
"context" | ||
"fmt" | ||
"strings" | ||
"testing" | ||
"time" | ||
|
||
"cloud.google.com/go/storage" | ||
"github.com/GoogleCloudPlatform/golang-samples/bigquery/snippets/bqtestutil" | ||
"github.com/GoogleCloudPlatform/golang-samples/internal/testutil" | ||
) | ||
|
||
func TestApp(t *testing.T) { | ||
tc := testutil.SystemTest(t) | ||
m := testutil.BuildMain(t) | ||
defer m.Cleanup() | ||
|
||
if !m.Built() { | ||
t.Errorf("failed to build app") | ||
} | ||
|
||
// Setup an output bucket. | ||
bucket, cleanup, err := setupStorage(tc.ProjectID) | ||
if err != nil { | ||
t.Fatalf("error setting up storage: %v", err) | ||
} | ||
defer cleanup() | ||
|
||
stdOut, stdErr, err := m.Run(nil, 30*time.Second, fmt.Sprintf("--project_id=%s", tc.ProjectID), fmt.Sprintf("--output=%s", bucket)) | ||
if err != nil { | ||
t.Errorf("execution failed: %v", err) | ||
} | ||
|
||
// Look for a known substring in the output | ||
if !strings.Contains(string(stdOut), " ended in state COMPLETED") { | ||
t.Errorf("Did not find expected output. Stdout: %s", string(stdOut)) | ||
} | ||
|
||
if strings.Contains(string(stdOut), " with processing error") { | ||
t.Errorf("Workflow indicated it had processing errors. Stdout: %s", string(stdOut)) | ||
} | ||
|
||
if len(stdErr) > 0 { | ||
t.Errorf("did not expect stderr output, got %d bytes: %s", len(stdErr), string(stdErr)) | ||
} | ||
} | ||
|
||
// setupStorage is responsible for setting up a temporary bucket to hold artifacts from the quickstart. | ||
func setupStorage(projectID string) (string, func(), error) { | ||
ctx := context.Background() | ||
storageClient, err := storage.NewClient(ctx) | ||
if err != nil { | ||
return "", nil, err | ||
} | ||
bucket, err := bqtestutil.UniqueBucketName("golang-migration", "") | ||
if err != nil { | ||
storageClient.Close() | ||
return "", nil, fmt.Errorf("couldn't construct unique bucket name: %v", err) | ||
} | ||
if err := storageClient.Bucket(bucket).Create(ctx, projectID, nil); err != nil { | ||
storageClient.Close() | ||
return "", nil, fmt.Errorf("error creating output bucket: %v", err) | ||
} | ||
return bucket, func() { | ||
storageClient.Bucket(bucket).Delete(ctx) | ||
storageClient.Close() | ||
}, nil | ||
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Oops, something went wrong.