From b3fabb77d61397d9a034949a3d0a54e4846813a1 Mon Sep 17 00:00:00 2001 From: Pawan Dubey Date: Wed, 15 Dec 2021 15:02:06 -0500 Subject: [PATCH] Add barebones DDL events integration test Co-authored-by: Shiv Nagarajan --- test/integration/ddl_events_test.rb | 14 +++++ test/lib/go/ddl_ghostferry/main.go | 87 ++++++++++++++++++++++++++ test/lib/go/minimal_ghostferry/main.go | 62 +----------------- 3 files changed, 103 insertions(+), 60 deletions(-) create mode 100644 test/integration/ddl_events_test.rb create mode 100644 test/lib/go/ddl_ghostferry/main.go diff --git a/test/integration/ddl_events_test.rb b/test/integration/ddl_events_test.rb new file mode 100644 index 00000000..265202af --- /dev/null +++ b/test/integration/ddl_events_test.rb @@ -0,0 +1,14 @@ +require "test_helper" + +class DdlEventsTest < GhostferryTestCase + # DDL_FERRY = "ddl_ghostferry" + + def test_default_event_handler + seed_simple_database_with_single_table + + ghostferry = new_ghostferry(MINIMAL_GHOSTFERRY) + ghostferry.run + + assert_ghostferry_completed(ghostferry, times: 1) + end +end diff --git a/test/lib/go/ddl_ghostferry/main.go b/test/lib/go/ddl_ghostferry/main.go new file mode 100644 index 00000000..c78850b1 --- /dev/null +++ b/test/lib/go/ddl_ghostferry/main.go @@ -0,0 +1,87 @@ +package main + +import ( + "sync" + + tf "github.com/Shopify/ghostferry/test/lib/go/integrationferry" +) + +// =========================================== +// Code to handle an almost standard Ferry run +// =========================================== +func Main(f *tf.IntegrationFerry) error { + var err error + + err = f.SendStatusAndWaitUntilContinue(tf.StatusReady) + if err != nil { + return err + } + + err = f.Initialize() + if err != nil { + return err + } + + // TODO: add handler here + + err = f.Start() + if err != nil { + return err + } + + defer f.StopTargetVerifier() + + err = f.SendStatusAndWaitUntilContinue(tf.StatusBinlogStreamingStarted) + if err != nil { + return err + } + + wg := &sync.WaitGroup{} + wg.Add(1) + + go func() { + defer wg.Done() + f.Run() + }() + + f.WaitUntilRowCopyIsComplete() + err = f.SendStatusAndWaitUntilContinue(tf.StatusRowCopyCompleted) + if err != nil { + return err + } + + // TODO: this method should return errors rather than calling + // the error handler to panic directly. + f.FlushBinlogAndStopStreaming() + wg.Wait() + + if f.Verifier != nil { + err := f.SendStatusAndWaitUntilContinue(tf.StatusVerifyDuringCutover) + if err != nil { + return err + } + + result, err := f.Verifier.VerifyDuringCutover() + if err != nil { + return err + } + + // We now send the results back to the integration server as each verifier + // might log them differently, making it difficult to assert that the + // incorrect table was caught from the logs + err = f.SendStatusAndWaitUntilContinue(tf.StatusVerified, result.IncorrectTables...) + if err != nil { + return err + } + } + + return f.SendStatusAndWaitUntilContinue(tf.StatusDone) +} + +func main() { + f := tf.Setup() + err := Main(f) + if err != nil { + panic(err) + } +} diff --git a/test/lib/go/minimal_ghostferry/main.go b/test/lib/go/minimal_ghostferry/main.go index bf24c842..f8d14d63 100644 --- a/test/lib/go/minimal_ghostferry/main.go +++ b/test/lib/go/minimal_ghostferry/main.go @@ -1,14 +1,9 @@ package main import ( - "fmt" - "os" "sync" tf "github.com/Shopify/ghostferry/test/lib/go/integrationferry" - - "github.com/Shopify/ghostferry" - "github.com/sirupsen/logrus" ) // =========================================== @@ -82,61 +77,8 @@ func Main(f *tf.IntegrationFerry) error { } func main() { - logrus.SetFormatter(&logrus.JSONFormatter{}) - logrus.SetLevel(logrus.DebugLevel) - if os.Getenv("CI") == "true" { - logrus.SetLevel(logrus.ErrorLevel) - } - - config, err := tf.NewStandardConfig() - if err != nil { - panic(err) - } - - // This is currently a hack to customize the Ghostferry configuration. - // TODO: allow Ghostferry config to be specified by the ruby test directly. - compressedDataColumn := os.Getenv("GHOSTFERRY_DATA_COLUMN_SNAPPY") - if compressedDataColumn != "" { - config.CompressedColumnsForVerification = map[string]map[string]map[string]string{ - "gftest": map[string]map[string]string{ - "test_table_1": map[string]string{ - "data": "SNAPPY", - }, - }, - } - } - - ignoredColumn := os.Getenv("GHOSTFERRY_IGNORED_COLUMN") - if ignoredColumn != "" { - config.IgnoredColumnsForVerification = map[string]map[string]map[string]struct{}{ - "gftest": map[string]map[string]struct{}{ - "test_table_1": map[string]struct{}{ - ignoredColumn: struct{}{}, - }, - }, - } - } - - f := &tf.IntegrationFerry{ - Ferry: &ghostferry.Ferry{ - Config: config, - }, - } - - integrationPort := os.Getenv(tf.PortEnvName) - if integrationPort == "" { - panic(fmt.Sprintf("environment variable %s must be specified", tf.PortEnvName)) - } - - f.ErrorHandler = &ghostferry.PanicErrorHandler{ - Ferry: f.Ferry, - ErrorCallback: ghostferry.HTTPCallback{ - URI: fmt.Sprintf("http://localhost:%s/callbacks/error", integrationPort), - }, - DumpStateToStdoutOnError: true, - } - - err = Main(f) + f := tf.Setup() + err := Main(f) if err != nil { panic(err) }