Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

test: partial conversion testing #768

Closed
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions conversion/conversion.go
Original file line number Diff line number Diff line change
Expand Up @@ -72,7 +72,7 @@ func (ci *ConvImpl) SchemaConv(sourceProfile profiles.SourceProfile, targetProfi
case constants.POSTGRES, constants.MYSQL, constants.DYNAMODB, constants.SQLSERVER, constants.ORACLE:
return schemaFromSource.schemaFromDatabase(sourceProfile, targetProfile, &GetInfoImpl{}, &common.ProcessSchemaImpl{})
case constants.PGDUMP, constants.MYSQLDUMP:
return schemaFromSource.SchemaFromDump(sourceProfile.Driver, targetProfile.Conn.Sp.Dialect, ioHelper, &ProcessDumpByDialectImpl{})
return schemaFromSource.SchemaFromDump(sourceProfile.Driver, targetProfile.Conn.Sp.Dialect, ioHelper, &ProcessDumpByDialectImpl{}, &SeekableImpl{})
default:
return nil, fmt.Errorf("schema conversion for driver %s not supported", sourceProfile.Driver)
}
Expand All @@ -94,7 +94,7 @@ func (ci *ConvImpl) DataConv(ctx context.Context, sourceProfile profiles.SourceP
if conv.SpSchema.CheckInterleaved() {
return nil, fmt.Errorf("spanner migration tool does not currently support data conversion from dump files\nif the schema contains interleaved tables. Suggest using direct access to source database\ni.e. using drivers postgres and mysql")
}
return dataFromSource.dataFromDump(sourceProfile.Driver, config, ioHelper, client, conv, dataOnly, &ProcessDumpByDialectImpl{}, &PopulateDataConvImpl{})
return dataFromSource.dataFromDump(sourceProfile.Driver, config, ioHelper, client, conv, dataOnly, &ProcessDumpByDialectImpl{}, &PopulateDataConvImpl{}, &SeekableImpl{})
case constants.CSV:
return dataFromSource.dataFromCSV(ctx, sourceProfile, targetProfile, config, conv, client, &PopulateDataConvImpl{}, &csv.CsvImpl{})
default:
Expand Down
15 changes: 7 additions & 8 deletions conversion/conversion_from_source.go
Original file line number Diff line number Diff line change
Expand Up @@ -47,14 +47,14 @@ import (

type SchemaFromSourceInterface interface {
schemaFromDatabase(sourceProfile profiles.SourceProfile, targetProfile profiles.TargetProfile, getInfo GetInfoInterface, processSchema common.ProcessSchemaInterface) (*internal.Conv, error)
SchemaFromDump(driver string, spDialect string, ioHelper *utils.IOStreams, processDump ProcessDumpByDialectInterface) (*internal.Conv, error)
SchemaFromDump(driver string, spDialect string, ioHelper *utils.IOStreams, processDump ProcessDumpByDialectInterface, seekable SeekableInterface) (*internal.Conv, error)
}

type SchemaFromSourceImpl struct{}

type DataFromSourceInterface interface {
dataFromDatabase(ctx context.Context, sourceProfile profiles.SourceProfile, targetProfile profiles.TargetProfile, config writer.BatchWriterConfig, conv *internal.Conv, client *sp.Client, getInfo GetInfoInterface, dataFromDb DataFromDatabaseInterface, snapshotMigration SnapshotMigrationInterface) (*writer.BatchWriter, error)
dataFromDump(driver string, config writer.BatchWriterConfig, ioHelper *utils.IOStreams, client *sp.Client, conv *internal.Conv, dataOnly bool, processDump ProcessDumpByDialectInterface, populateDataConv PopulateDataConvInterface) (*writer.BatchWriter, error)
dataFromDump(driver string, config writer.BatchWriterConfig, ioHelper *utils.IOStreams, client *sp.Client, conv *internal.Conv, dataOnly bool, processDump ProcessDumpByDialectInterface, populateDataConv PopulateDataConvInterface, seekable SeekableInterface) (*writer.BatchWriter, error)
dataFromCSV(ctx context.Context, sourceProfile profiles.SourceProfile, targetProfile profiles.TargetProfile, config writer.BatchWriterConfig, conv *internal.Conv, client *sp.Client, populateDataConv PopulateDataConvInterface, csv csv.CsvInterface) (*writer.BatchWriter, error)
}

Expand Down Expand Up @@ -108,8 +108,8 @@ func (sads *SchemaFromSourceImpl) schemaFromDatabase(sourceProfile profiles.Sour
return conv, processSchema.ProcessSchema(conv, infoSchema, common.DefaultWorkers, additionalSchemaAttributes, &common.SchemaToSpannerImpl{}, &common.UtilsOrderImpl{}, &common.InfoSchemaImpl{})
}

func (sads *SchemaFromSourceImpl) SchemaFromDump(driver string, spDialect string, ioHelper *utils.IOStreams, processDump ProcessDumpByDialectInterface) (*internal.Conv, error) {
f, n, err := getSeekable(ioHelper.In)
func (sads *SchemaFromSourceImpl) SchemaFromDump(driver string, spDialect string, ioHelper *utils.IOStreams, processDump ProcessDumpByDialectInterface, seekable SeekableInterface) (*internal.Conv, error) {
f, n, err := seekable.getSeekable(ioHelper.In)
if err != nil {
utils.PrintSeekError(driver, err, ioHelper.Out)
return nil, fmt.Errorf("can't get seekable input file")
Expand All @@ -132,19 +132,18 @@ func (sads *SchemaFromSourceImpl) SchemaFromDump(driver string, spDialect string
}


func (sads *DataFromSourceImpl) dataFromDump(driver string, config writer.BatchWriterConfig, ioHelper *utils.IOStreams, client *sp.Client, conv *internal.Conv, dataOnly bool, processDump ProcessDumpByDialectInterface, populateDataConv PopulateDataConvInterface) (*writer.BatchWriter, error) {
func (sads *DataFromSourceImpl) dataFromDump(driver string, config writer.BatchWriterConfig, ioHelper *utils.IOStreams, client *sp.Client, conv *internal.Conv, dataOnly bool, processDump ProcessDumpByDialectInterface, populateDataConv PopulateDataConvInterface, seekable SeekableInterface) (*writer.BatchWriter, error) {
// TODO: refactor of the way we handle getSeekable
// to avoid the code duplication here
if !dataOnly {
_, err := ioHelper.SeekableIn.Seek(0, 0)
_, err := seekable.seek(ioHelper, 0, 0)
if err != nil {
fmt.Printf("\nCan't seek to start of file (preparation for second pass): %v\n", err)
return nil, fmt.Errorf("can't seek to start of file")
}
} else {
// Note: input file is kept seekable to plan for future
// changes in showing progress for data migration.
f, n, err := getSeekable(ioHelper.In)
f, n, err := seekable.getSeekable(ioHelper.In)
if err != nil {
utils.PrintSeekError(driver, err, ioHelper.Out)
return nil, fmt.Errorf("can't get seekable input file")
Expand Down
110 changes: 110 additions & 0 deletions conversion/conversion_from_source_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,11 +25,17 @@ package conversion

import (
"fmt"
"os"
"testing"

sp "cloud.google.com/go/spanner"
"github.com/GoogleCloudPlatform/spanner-migration-tool/common/utils"
"github.com/GoogleCloudPlatform/spanner-migration-tool/internal"
"github.com/GoogleCloudPlatform/spanner-migration-tool/logger"
"github.com/GoogleCloudPlatform/spanner-migration-tool/profiles"
"github.com/GoogleCloudPlatform/spanner-migration-tool/sources/common"
"github.com/GoogleCloudPlatform/spanner-migration-tool/sources/mysql"
"github.com/GoogleCloudPlatform/spanner-migration-tool/spanner/writer"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/mock"
)
Expand Down Expand Up @@ -172,4 +178,108 @@ func TestSchemaFromDatabase(t *testing.T) {
_, err := s.schemaFromDatabase(tc.sourceProfile, targetProfile, &gim, &ps)
assert.Equal(t, tc.errorExpected, err != nil, tc.name)
}
}


func TestSchemaFromDump(t *testing.T) {
ioStream := &utils.IOStreams{In: os.Stdin, Out: os.Stdout}
// Avoid getting/setting env variables in the unit tests.
testCases := []struct {
name string
processDumpError error
getSeekableError error
errorExpected bool
}{
{
name: "successful schema from dump",
processDumpError: nil,
getSeekableError: nil,
errorExpected: false,
},
{
name: "schema from dump getSeekable error",
processDumpError: nil,
getSeekableError: fmt.Errorf("error"),
errorExpected: true,
},
{
name: "schema from dump process dump error",
processDumpError: fmt.Errorf("error"),
getSeekableError: nil,
errorExpected: true,
},
}

for _, tc := range testCases {
pd := MockProcessDumpByDialect{}
se := MockSeekable{}
logger.InitializeLogger("DEBUG")

pd.On("ProcessDump", mock.Anything, mock.Anything, mock.Anything).Return(tc.processDumpError)
se.On("getSeekable", mock.Anything). Return(&os.File{}, int64(0), tc.getSeekableError)
s := SchemaFromSourceImpl{}
_, err := s.SchemaFromDump("", "google_standard_sql", ioStream, &pd, &se)
assert.Equal(t, tc.errorExpected, err != nil, tc.name)
}
}


func TestDataFromDump(t *testing.T) {
ioStream := &utils.IOStreams{In: os.Stdin, Out: os.Stdout}
// Avoid getting/setting env variables in the unit tests.
testCases := []struct {
name string
dataOnly bool
ioHelper *utils.IOStreams
processDumpError error
getSeekableError error
seekError error
errorExpected bool
}{
{
name: "successful data from dump data only true",
dataOnly: true,
processDumpError: nil,
getSeekableError: nil,
errorExpected: false,
},
{
name: "data from dump get seekable error",
dataOnly: true,
processDumpError: nil,
getSeekableError: fmt.Errorf("error"),
errorExpected: true,
},
{
name: "successful data from dump data only false",
dataOnly: false,
processDumpError: nil,
getSeekableError: nil,
seekError: nil,
errorExpected: false,
},
{
name: "successful data from dump data only false seek error",
dataOnly: false,
processDumpError: nil,
getSeekableError: nil,
seekError: fmt.Errorf("error"),
errorExpected: true,
},
}

for _, tc := range testCases {
pdc := MockPopulateDataConv{}
pd := MockProcessDumpByDialect{}
se := MockSeekable{}
logger.InitializeLogger("DEBUG")

pd.On("ProcessDump", mock.Anything, mock.Anything, mock.Anything).Return(tc.processDumpError)
se.On("getSeekable", mock.Anything).Return(&os.File{}, int64(0), tc.getSeekableError)
se.On("seek", mock.Anything, mock.Anything, mock.Anything). Return(int64(0), tc.seekError)
pdc.On("populateDataConv", mock.Anything, mock.Anything, mock.Anything).Return(writer.NewBatchWriter(writer.BatchWriterConfig{}))
d := DataFromSourceImpl{}
_, err := d.dataFromDump("",writer.BatchWriterConfig{}, ioStream, &sp.Client{}, internal.MakeConv(), tc.dataOnly, &pd, &pdc, &se)
assert.Equal(t, tc.errorExpected, err != nil, tc.name)
}
}
19 changes: 18 additions & 1 deletion conversion/conversion_helper.go
Original file line number Diff line number Diff line change
Expand Up @@ -62,8 +62,25 @@
}

type PopulateDataConvImpl struct{}

type SeekableInterface interface {
getSeekable(f *os.File) (*os.File, int64, error)
seek(ioHelper *utils.IOStreams, offset int64, whence int) (int64, error)
}

type SeekableImpl struct{}

func (si *SeekableImpl) seek(ioHelper *utils.IOStreams, offset int64, whence int) (int64, error) {
val, err := ioHelper.SeekableIn.Seek(offset, whence)
if err != nil {
fmt.Printf("\nCan't seek to start of file (preparation for second pass): %v\n", err)
return val, fmt.Errorf("can't seek to start of file")
}
return val, err

Check warning on line 79 in conversion/conversion_helper.go

View check run for this annotation

Codecov / codecov/patch

conversion/conversion_helper.go#L73-L79

Added lines #L73 - L79 were not covered by tests
}

// getSeekable returns a seekable file (with same content as f) and the size of the content (in bytes).
func getSeekable(f *os.File) (*os.File, int64, error) {
func (si *SeekableImpl) getSeekable(f *os.File) (*os.File, int64, error) {

Check warning on line 83 in conversion/conversion_helper.go

View check run for this annotation

Codecov / codecov/patch

conversion/conversion_helper.go#L83

Added line #L83 was not covered by tests
_, err := f.Seek(0, 0)
if err == nil { // Stdin is seekable, let's just use that. This happens when you run 'cmd < file'.
n, err := utils.GetFileSize(f)
Expand Down
33 changes: 31 additions & 2 deletions conversion/mocks.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ package conversion

import (
"context"
"os"

sp "cloud.google.com/go/spanner"
"github.com/GoogleCloudPlatform/spanner-migration-tool/common/utils"
Expand Down Expand Up @@ -61,7 +62,7 @@ func (msads *MockSchemaFromSource) schemaFromDatabase(sourceProfile profiles.Sou
args := msads.Called(sourceProfile, targetProfile, getInfo, processSchema)
return args.Get(0).(*internal.Conv), args.Error(1)
}
func (msads *MockSchemaFromSource) SchemaFromDump(driver string, spDialect string, ioHelper *utils.IOStreams, processDump ProcessDumpByDialectInterface) (*internal.Conv, error) {
func (msads *MockSchemaFromSource) SchemaFromDump(driver string, spDialect string, ioHelper *utils.IOStreams, processDump ProcessDumpByDialectInterface, seekable SeekableInterface) (*internal.Conv, error) {
args := msads.Called(driver, spDialect, ioHelper, processDump)
return args.Get(0).(*internal.Conv), args.Error(1)
}
Expand All @@ -73,11 +74,39 @@ func (msads *MockDataFromSource) dataFromDatabase(ctx context.Context, sourcePro
args := msads.Called(ctx, sourceProfile, targetProfile, config, conv, client, getInfo, dataFromDb, snapshotMigration)
return args.Get(0).(*writer.BatchWriter), args.Error(1)
}
func (msads *MockDataFromSource) dataFromDump(driver string, config writer.BatchWriterConfig, ioHelper *utils.IOStreams, client *sp.Client, conv *internal.Conv, dataOnly bool, processDump ProcessDumpByDialectInterface, populateDataConv PopulateDataConvInterface) (*writer.BatchWriter, error) {
func (msads *MockDataFromSource) dataFromDump(driver string, config writer.BatchWriterConfig, ioHelper *utils.IOStreams, client *sp.Client, conv *internal.Conv, dataOnly bool, processDump ProcessDumpByDialectInterface, populateDataConv PopulateDataConvInterface, seekable SeekableInterface) (*writer.BatchWriter, error) {
args := msads.Called(driver, config, ioHelper, client, conv, dataOnly, processDump, populateDataConv)
return args.Get(0).(*writer.BatchWriter), args.Error(1)
}
func (msads *MockDataFromSource) dataFromCSV(ctx context.Context, sourceProfile profiles.SourceProfile, targetProfile profiles.TargetProfile, config writer.BatchWriterConfig, conv *internal.Conv, client *sp.Client, pdc PopulateDataConvInterface, csv csv.CsvInterface) (*writer.BatchWriter, error) {
args := msads.Called(ctx, sourceProfile, targetProfile, config, conv, client, pdc, csv)
return args.Get(0).(*writer.BatchWriter), args.Error(1)
}

type MockProcessDumpByDialect struct {
mock.Mock
}
func (mpdd *MockProcessDumpByDialect) ProcessDump(driver string, conv *internal.Conv, r *internal.Reader) error {
args:= mpdd.Called(driver, conv, r)
return args.Error(0)
}

type MockSeekable struct {
mock.Mock
}
func (ms *MockSeekable) getSeekable(f *os.File) (*os.File, int64, error) {
args:= ms.Called(f)
return args.Get(0).(*os.File), args.Get(1).(int64), args.Error(2)
}
func (ms *MockSeekable) seek(ioHelper *utils.IOStreams, offset int64, whence int) (int64, error) {
args:= ms.Called(ioHelper, offset, whence)
return args.Get(0).(int64), args.Error(1)
}

type MockPopulateDataConv struct {
mock.Mock
}
func (mpdc *MockPopulateDataConv) populateDataConv(conv *internal.Conv, config writer.BatchWriterConfig, client *sp.Client) *writer.BatchWriter {
args:= mpdc.Called(conv, config, client)
return args.Get(0).(*writer.BatchWriter)
}
2 changes: 1 addition & 1 deletion webv2/web.go
Original file line number Diff line number Diff line change
Expand Up @@ -600,7 +600,7 @@
sourceProfile, _ := profiles.NewSourceProfile("", dc.Config.Driver, &n)
sourceProfile.Driver = dc.Config.Driver
schemaFromSource := conversion.SchemaFromSourceImpl{}
conv, err := schemaFromSource.SchemaFromDump(sourceProfile.Driver, dc.SpannerDetails.Dialect, &utils.IOStreams{In: f, Out: os.Stdout}, &conversion.ProcessDumpByDialectImpl{})
conv, err := schemaFromSource.SchemaFromDump(sourceProfile.Driver, dc.SpannerDetails.Dialect, &utils.IOStreams{In: f, Out: os.Stdout}, &conversion.ProcessDumpByDialectImpl{}, &conversion.SeekableImpl{})

Check warning on line 603 in webv2/web.go

View check run for this annotation

Codecov / codecov/patch

webv2/web.go#L603

Added line #L603 was not covered by tests
if err != nil {
http.Error(w, fmt.Sprintf("Schema Conversion Error : %v", err), http.StatusNotFound)
return
Expand Down
Loading