Skip to content

Commit

Permalink
Add barebones DDL events integration test
Browse files Browse the repository at this point in the history
Co-authored-by: Shiv Nagarajan <shiv.nagarajan@shopify.com>
  • Loading branch information
pawandubey and shivnagarajan committed Dec 15, 2021
1 parent 2b18c9b commit b3fabb7
Show file tree
Hide file tree
Showing 3 changed files with 103 additions and 60 deletions.
14 changes: 14 additions & 0 deletions test/integration/ddl_events_test.rb
Original file line number Diff line number Diff line change
@@ -0,0 +1,14 @@
require "test_helper"

class DdlEventsTest < GhostferryTestCase
# DDL_FERRY = "ddl_ghostferry"

def test_default_event_handler
seed_simple_database_with_single_table

ghostferry = new_ghostferry(MINIMAL_GHOSTFERRY)
ghostferry.run

assert_ghostferry_completed(ghostferry, times: 1)
end
end
87 changes: 87 additions & 0 deletions test/lib/go/ddl_ghostferry/main.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,87 @@
package main

import (
"sync"

tf "github.com/Shopify/ghostferry/test/lib/go/integrationferry"
)

// ===========================================
// Code to handle an almost standard Ferry run
// ===========================================
func Main(f *tf.IntegrationFerry) error {
var err error

err = f.SendStatusAndWaitUntilContinue(tf.StatusReady)
if err != nil {
return err
}

err = f.Initialize()
if err != nil {
return err
}

// TODO: add handler here

err = f.Start()
if err != nil {
return err
}

defer f.StopTargetVerifier()

err = f.SendStatusAndWaitUntilContinue(tf.StatusBinlogStreamingStarted)
if err != nil {
return err
}

wg := &sync.WaitGroup{}
wg.Add(1)

go func() {
defer wg.Done()
f.Run()
}()

f.WaitUntilRowCopyIsComplete()
err = f.SendStatusAndWaitUntilContinue(tf.StatusRowCopyCompleted)
if err != nil {
return err
}

// TODO: this method should return errors rather than calling
// the error handler to panic directly.
f.FlushBinlogAndStopStreaming()
wg.Wait()

if f.Verifier != nil {
err := f.SendStatusAndWaitUntilContinue(tf.StatusVerifyDuringCutover)
if err != nil {
return err
}

result, err := f.Verifier.VerifyDuringCutover()
if err != nil {
return err
}

// We now send the results back to the integration server as each verifier
// might log them differently, making it difficult to assert that the
// incorrect table was caught from the logs
err = f.SendStatusAndWaitUntilContinue(tf.StatusVerified, result.IncorrectTables...)
if err != nil {
return err
}
}

return f.SendStatusAndWaitUntilContinue(tf.StatusDone)
}

func main() {
f := tf.Setup()
err := Main(f)
if err != nil {
panic(err)
}
}
62 changes: 2 additions & 60 deletions test/lib/go/minimal_ghostferry/main.go
Original file line number Diff line number Diff line change
@@ -1,14 +1,9 @@
package main

import (
"fmt"
"os"
"sync"

tf "github.com/Shopify/ghostferry/test/lib/go/integrationferry"

"github.com/Shopify/ghostferry"
"github.com/sirupsen/logrus"
)

// ===========================================
Expand Down Expand Up @@ -82,61 +77,8 @@ func Main(f *tf.IntegrationFerry) error {
}

func main() {
logrus.SetFormatter(&logrus.JSONFormatter{})
logrus.SetLevel(logrus.DebugLevel)
if os.Getenv("CI") == "true" {
logrus.SetLevel(logrus.ErrorLevel)
}

config, err := tf.NewStandardConfig()
if err != nil {
panic(err)
}

// This is currently a hack to customize the Ghostferry configuration.
// TODO: allow Ghostferry config to be specified by the ruby test directly.
compressedDataColumn := os.Getenv("GHOSTFERRY_DATA_COLUMN_SNAPPY")
if compressedDataColumn != "" {
config.CompressedColumnsForVerification = map[string]map[string]map[string]string{
"gftest": map[string]map[string]string{
"test_table_1": map[string]string{
"data": "SNAPPY",
},
},
}
}

ignoredColumn := os.Getenv("GHOSTFERRY_IGNORED_COLUMN")
if ignoredColumn != "" {
config.IgnoredColumnsForVerification = map[string]map[string]map[string]struct{}{
"gftest": map[string]map[string]struct{}{
"test_table_1": map[string]struct{}{
ignoredColumn: struct{}{},
},
},
}
}

f := &tf.IntegrationFerry{
Ferry: &ghostferry.Ferry{
Config: config,
},
}

integrationPort := os.Getenv(tf.PortEnvName)
if integrationPort == "" {
panic(fmt.Sprintf("environment variable %s must be specified", tf.PortEnvName))
}

f.ErrorHandler = &ghostferry.PanicErrorHandler{
Ferry: f.Ferry,
ErrorCallback: ghostferry.HTTPCallback{
URI: fmt.Sprintf("http://localhost:%s/callbacks/error", integrationPort),
},
DumpStateToStdoutOnError: true,
}

err = Main(f)
f := tf.Setup()
err := Main(f)
if err != nil {
panic(err)
}
Expand Down

0 comments on commit b3fabb7

Please sign in to comment.