From f1c243477fa16194d84a2e21e6350ac8e1f0dd0b Mon Sep 17 00:00:00 2001 From: "Ahmad N. F." Date: Tue, 15 Apr 2025 15:13:59 +0700 Subject: [PATCH 1/5] feat: add query identifier and pass it as hints fix comment --- mc2mc/internal/client/client.go | 23 +++++++++++++++-------- mc2mc/internal/client/odps.go | 11 ++++++++++- mc2mc/mc2mc.go | 10 ++++++---- 3 files changed, 31 insertions(+), 13 deletions(-) diff --git a/mc2mc/internal/client/client.go b/mc2mc/internal/client/client.go index 6756e8f..ec80c22 100644 --- a/mc2mc/internal/client/client.go +++ b/mc2mc/internal/client/client.go @@ -10,7 +10,7 @@ import ( ) type OdpsClient interface { - ExecSQL(ctx context.Context, query string) error + ExecSQL(ctx context.Context, query string, hints ...map[string]string) error SetDefaultProject(project string) SetLogViewRetentionInDays(days int) SetAdditionalHints(hints map[string]string) @@ -47,13 +47,20 @@ func (c *Client) Close() error { return errors.WithStack(err) } -func (c *Client) Execute(ctx context.Context, query string) error { - // execute query with odps client - c.logger.Info(fmt.Sprintf("query to execute:\n%s", query)) - if err := c.OdpsClient.ExecSQL(ctx, query); err != nil { - return errors.WithStack(err) +func (c *Client) ExecuteFnWithQueryID(id int) func(context.Context, string) error { + idStr := fmt.Sprintf("%d", id) + additionalHints := map[string]string{ + "goto.sql.script.sequence": idStr, } - c.logger.Info("execution done") - return nil + return func(ctx context.Context, query string) error { + // execute query with odps client + c.logger.Info(fmt.Sprintf("query to execute:\n%s", query)) + if err := c.OdpsClient.ExecSQL(ctx, query, additionalHints); err != nil { + return errors.WithStack(err) + } + + c.logger.Info(fmt.Sprintf("execution done for id: %d", id)) + return nil + } } diff --git a/mc2mc/internal/client/odps.go b/mc2mc/internal/client/odps.go index 387431a..e5f3ae6 100644 --- a/mc2mc/internal/client/odps.go +++ b/mc2mc/internal/client/odps.go @@ -33,12 +33,21 @@ func NewODPSClient(logger *slog.Logger, client *odps.Odps) *odpsClient { // ExecSQL executes the given query in syncronous mode (blocking) // with capability to do graceful shutdown by terminating task instance // when context is cancelled. -func (c *odpsClient) ExecSQL(ctx context.Context, query string) error { +func (c *odpsClient) ExecSQL(ctx context.Context, query string, queryHints ...map[string]string) error { if c.isDryRun { c.logger.Info("dry run mode, skipping execution") return nil } + hints := addHints(c.additionalHints, query) + + // add job-specific hints, which takes priority over the global hints + if len(queryHints) > 0 { + for k, v := range queryHints[0] { + hints[k] = v + } + } + taskIns, err := c.client.ExecSQlWithHints(query, hints) if err != nil { return errors.WithStack(err) diff --git a/mc2mc/mc2mc.go b/mc2mc/mc2mc.go index b4b9ae0..28fbc3d 100644 --- a/mc2mc/mc2mc.go +++ b/mc2mc/mc2mc.go @@ -175,10 +175,11 @@ func executeConcurrently(ctx context.Context, c *client.Client, concurrency int, wg.Add(len(queriesToExecute)) errChan := make(chan error, len(queriesToExecute)) - for _, queryToExecute := range queriesToExecute { + for i, queryToExecute := range queriesToExecute { sem <- 0 + executeFn := c.ExecuteFnWithQueryID(i + 1) go func(queryToExecute string, errChan chan error) { - err := c.Execute(ctx, queryToExecute) + err := executeFn(ctx, queryToExecute) if err != nil { errChan <- errors.WithStack(err) } @@ -201,8 +202,9 @@ func executeConcurrently(ctx context.Context, c *client.Client, concurrency int, } func execute(ctx context.Context, c *client.Client, queriesToExecute []string) error { - for _, queryToExecute := range queriesToExecute { - err := c.Execute(ctx, queryToExecute) + for i, queryToExecute := range queriesToExecute { + executeFn := c.ExecuteFnWithQueryID(i + 1) + err := executeFn(ctx, queryToExecute) if err != nil { return errors.WithStack(err) } From b922ed3ae3285ffab67f2df063ac49aef260ee10 Mon Sep 17 00:00:00 2001 From: "Ahmad N. F." Date: Tue, 15 Apr 2025 15:25:35 +0700 Subject: [PATCH 2/5] fix call terminate function --- mc2mc/internal/client/odps.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/mc2mc/internal/client/odps.go b/mc2mc/internal/client/odps.go index e5f3ae6..a07a04b 100644 --- a/mc2mc/internal/client/odps.go +++ b/mc2mc/internal/client/odps.go @@ -65,7 +65,7 @@ func (c *odpsClient) ExecSQL(ctx context.Context, query string, queryHints ...ma select { case <-ctx.Done(): c.logger.Info("context cancelled, terminating task instance") - err := taskIns.Terminate() + err := c.terminate(taskIns) return e.Join(ctx.Err(), err) case err := <-c.wait(taskIns): return errors.WithStack(err) From d92a225032abd00ce4b03b3b70fc5646bc446705 Mon Sep 17 00:00:00 2001 From: "Ahmad N. F." Date: Tue, 15 Apr 2025 15:43:27 +0700 Subject: [PATCH 3/5] log job hints --- mc2mc/internal/client/client.go | 11 +++++++---- mc2mc/internal/client/odps.go | 4 +++- 2 files changed, 10 insertions(+), 5 deletions(-) diff --git a/mc2mc/internal/client/client.go b/mc2mc/internal/client/client.go index ec80c22..d682e1a 100644 --- a/mc2mc/internal/client/client.go +++ b/mc2mc/internal/client/client.go @@ -9,6 +9,10 @@ import ( "github.com/pkg/errors" ) +const ( + SqlScriptSequenceHint = "goto.sql.script.sequence" +) + type OdpsClient interface { ExecSQL(ctx context.Context, query string, hints ...map[string]string) error SetDefaultProject(project string) @@ -48,19 +52,18 @@ func (c *Client) Close() error { } func (c *Client) ExecuteFnWithQueryID(id int) func(context.Context, string) error { - idStr := fmt.Sprintf("%d", id) additionalHints := map[string]string{ - "goto.sql.script.sequence": idStr, + SqlScriptSequenceHint: fmt.Sprintf("%d", id), } return func(ctx context.Context, query string) error { // execute query with odps client - c.logger.Info(fmt.Sprintf("query to execute:\n%s", query)) + c.logger.Info(fmt.Sprintf("[sequence: %d] query to execute:\n%s", id, query)) if err := c.OdpsClient.ExecSQL(ctx, query, additionalHints); err != nil { return errors.WithStack(err) } - c.logger.Info(fmt.Sprintf("execution done for id: %d", id)) + c.logger.Info(fmt.Sprintf("[sequence: %d] execution done", id)) return nil } } diff --git a/mc2mc/internal/client/odps.go b/mc2mc/internal/client/odps.go index a07a04b..a976871 100644 --- a/mc2mc/internal/client/odps.go +++ b/mc2mc/internal/client/odps.go @@ -42,9 +42,11 @@ func (c *odpsClient) ExecSQL(ctx context.Context, query string, queryHints ...ma hints := addHints(c.additionalHints, query) // add job-specific hints, which takes priority over the global hints + logHints := []string{} if len(queryHints) > 0 { for k, v := range queryHints[0] { hints[k] = v + logHints = append(logHints, fmt.Sprintf("%s: %s", k, v)) } } @@ -59,7 +61,7 @@ func (c *odpsClient) ExecSQL(ctx context.Context, query string, queryHints ...ma err = e.Join(err, taskIns.Terminate()) return errors.WithStack(err) } - c.logger.Info(fmt.Sprintf("taskId: %s, log view: %s", taskIns.Id(), url)) + c.logger.Info(fmt.Sprintf("taskId: %s, log view: %s, hints: (%s)", taskIns.Id(), url, strings.Join(logHints, ", "))) // wait execution success select { From 0b825ad9d3503dc5d9259d02a7088eaf037481e8 Mon Sep 17 00:00:00 2001 From: "Ahmad N. F." Date: Tue, 15 Apr 2025 16:29:43 +0700 Subject: [PATCH 4/5] fix: provide additionalHints in query execution instead --- mc2mc/internal/client/client.go | 17 +++++++++-------- mc2mc/internal/client/odps.go | 21 ++++++--------------- mc2mc/internal/client/setup.go | 12 ------------ mc2mc/mc2mc.go | 17 ++++++++--------- 4 files changed, 23 insertions(+), 44 deletions(-) diff --git a/mc2mc/internal/client/client.go b/mc2mc/internal/client/client.go index d682e1a..e71ea4b 100644 --- a/mc2mc/internal/client/client.go +++ b/mc2mc/internal/client/client.go @@ -14,10 +14,9 @@ const ( ) type OdpsClient interface { - ExecSQL(ctx context.Context, query string, hints ...map[string]string) error + ExecSQL(ctx context.Context, query string, hints map[string]string) error SetDefaultProject(project string) SetLogViewRetentionInDays(days int) - SetAdditionalHints(hints map[string]string) SetDryRun(dryRun bool) } @@ -51,14 +50,16 @@ func (c *Client) Close() error { return errors.WithStack(err) } -func (c *Client) ExecuteFnWithQueryID(id int) func(context.Context, string) error { - additionalHints := map[string]string{ - SqlScriptSequenceHint: fmt.Sprintf("%d", id), - } - - return func(ctx context.Context, query string) error { +func (c *Client) ExecuteFn(id int) func(context.Context, string, map[string]string) error { + return func(ctx context.Context, query string, additionalHints map[string]string) error { // execute query with odps client c.logger.Info(fmt.Sprintf("[sequence: %d] query to execute:\n%s", id, query)) + // Merge additionalHints with the id + if additionalHints == nil { + additionalHints = make(map[string]string) + } + additionalHints[SqlScriptSequenceHint] = fmt.Sprintf("%d", id) + if err := c.OdpsClient.ExecSQL(ctx, query, additionalHints); err != nil { return errors.WithStack(err) } diff --git a/mc2mc/internal/client/odps.go b/mc2mc/internal/client/odps.go index a976871..6143856 100644 --- a/mc2mc/internal/client/odps.go +++ b/mc2mc/internal/client/odps.go @@ -17,7 +17,6 @@ type odpsClient struct { client *odps.Odps logViewRetentionInDays int - additionalHints map[string]string isDryRun bool } @@ -33,21 +32,18 @@ func NewODPSClient(logger *slog.Logger, client *odps.Odps) *odpsClient { // ExecSQL executes the given query in syncronous mode (blocking) // with capability to do graceful shutdown by terminating task instance // when context is cancelled. -func (c *odpsClient) ExecSQL(ctx context.Context, query string, queryHints ...map[string]string) error { +func (c *odpsClient) ExecSQL(ctx context.Context, query string, additionalHints map[string]string) error { if c.isDryRun { c.logger.Info("dry run mode, skipping execution") return nil } - hints := addHints(c.additionalHints, query) - // add job-specific hints, which takes priority over the global hints - logHints := []string{} - if len(queryHints) > 0 { - for k, v := range queryHints[0] { - hints[k] = v - logHints = append(logHints, fmt.Sprintf("%s: %s", k, v)) - } + hints := addHints(additionalHints, query) + + logHints := make([]string, 0) + for k, v := range hints { + logHints = append(logHints, fmt.Sprintf("%s: %s", k, v)) } taskIns, err := c.client.ExecSQlWithHints(query, hints) @@ -74,11 +70,6 @@ func (c *odpsClient) ExecSQL(ctx context.Context, query string, queryHints ...ma } } -// SetAdditionalHints sets the additional hints for the odps client -func (c *odpsClient) SetAdditionalHints(hints map[string]string) { - c.additionalHints = hints -} - // SetLogViewRetentionInDays sets the log view retention in days func (c *odpsClient) SetLogViewRetentionInDays(days int) { c.logViewRetentionInDays = days diff --git a/mc2mc/internal/client/setup.go b/mc2mc/internal/client/setup.go index b60e841..0c0dfaa 100644 --- a/mc2mc/internal/client/setup.go +++ b/mc2mc/internal/client/setup.go @@ -20,18 +20,6 @@ func SetupDryRun(dryRun bool) SetupFn { } } -func SetupAdditionalHints(hints map[string]string) SetupFn { - return func(c *Client) error { - if c.OdpsClient == nil { - return errors.New("odps client is required") - } - if hints != nil { - c.OdpsClient.SetAdditionalHints(hints) - } - return nil - } -} - func SetUpLogViewRetentionInDays(days int) SetupFn { return func(c *Client) error { if c.OdpsClient == nil { diff --git a/mc2mc/mc2mc.go b/mc2mc/mc2mc.go index 28fbc3d..6e41877 100644 --- a/mc2mc/mc2mc.go +++ b/mc2mc/mc2mc.go @@ -44,7 +44,6 @@ func mc2mc(envs []string) error { client.SetupODPSClient(cfg.GenOdps()), client.SetupDefaultProject(cfg.ExecutionProject), client.SetUpLogViewRetentionInDays(cfg.LogViewRetentionInDays), - client.SetupAdditionalHints(cfg.AdditionalHints), client.SetupDryRun(cfg.DryRun), ) if err != nil { @@ -162,13 +161,13 @@ func mc2mc(envs []string) error { // only support concurrent execution for REPLACE method if cfg.LoadMethod == "REPLACE" { - return executeConcurrently(ctx, c, cfg.Concurrency, queriesToExecute) + return executeConcurrently(ctx, c, cfg.Concurrency, queriesToExecute, cfg.AdditionalHints) } // otherwise execute sequentially - return execute(ctx, c, queriesToExecute) + return execute(ctx, c, queriesToExecute, cfg.AdditionalHints) } -func executeConcurrently(ctx context.Context, c *client.Client, concurrency int, queriesToExecute []string) error { +func executeConcurrently(ctx context.Context, c *client.Client, concurrency int, queriesToExecute []string, additionalHints map[string]string) error { // execute query concurrently sem := make(chan uint8, concurrency) wg := sync.WaitGroup{} @@ -177,9 +176,9 @@ func executeConcurrently(ctx context.Context, c *client.Client, concurrency int, for i, queryToExecute := range queriesToExecute { sem <- 0 - executeFn := c.ExecuteFnWithQueryID(i + 1) + executeFn := c.ExecuteFn(i + 1) go func(queryToExecute string, errChan chan error) { - err := executeFn(ctx, queryToExecute) + err := executeFn(ctx, queryToExecute, additionalHints) if err != nil { errChan <- errors.WithStack(err) } @@ -201,10 +200,10 @@ func executeConcurrently(ctx context.Context, c *client.Client, concurrency int, return errs } -func execute(ctx context.Context, c *client.Client, queriesToExecute []string) error { +func execute(ctx context.Context, c *client.Client, queriesToExecute []string, additionalHints map[string]string) error { for i, queryToExecute := range queriesToExecute { - executeFn := c.ExecuteFnWithQueryID(i + 1) - err := executeFn(ctx, queryToExecute) + executeFn := c.ExecuteFn(i + 1) + err := executeFn(ctx, queryToExecute, additionalHints) if err != nil { return errors.WithStack(err) } From b13bf4cbdc15d6e8a230545434f4f8227b3bccbf Mon Sep 17 00:00:00 2001 From: "Ahmad N. F." Date: Tue, 15 Apr 2025 16:35:07 +0700 Subject: [PATCH 5/5] move hints to string function --- mc2mc/internal/client/odps.go | 19 ++++++++++++------- 1 file changed, 12 insertions(+), 7 deletions(-) diff --git a/mc2mc/internal/client/odps.go b/mc2mc/internal/client/odps.go index 6143856..a483f1b 100644 --- a/mc2mc/internal/client/odps.go +++ b/mc2mc/internal/client/odps.go @@ -38,14 +38,8 @@ func (c *odpsClient) ExecSQL(ctx context.Context, query string, additionalHints return nil } - // add job-specific hints, which takes priority over the global hints hints := addHints(additionalHints, query) - logHints := make([]string, 0) - for k, v := range hints { - logHints = append(logHints, fmt.Sprintf("%s: %s", k, v)) - } - taskIns, err := c.client.ExecSQlWithHints(query, hints) if err != nil { return errors.WithStack(err) @@ -57,7 +51,7 @@ func (c *odpsClient) ExecSQL(ctx context.Context, query string, additionalHints err = e.Join(err, taskIns.Terminate()) return errors.WithStack(err) } - c.logger.Info(fmt.Sprintf("taskId: %s, log view: %s, hints: (%s)", taskIns.Id(), url, strings.Join(logHints, ", "))) + c.logger.Info(fmt.Sprintf("taskId: %s, log view: %s, hints: (%s)", taskIns.Id(), url, getHintsString(hints))) // wait execution success select { @@ -219,3 +213,14 @@ func retry(l *slog.Logger, retryMax int, retryBackoffMs int64, f func() error) e return err } + +func getHintsString(hints map[string]string) string { + if hints == nil { + return "" + } + var hintsStr []string + for k, v := range hints { + hintsStr = append(hintsStr, fmt.Sprintf("%s: %s", k, v)) + } + return strings.Join(hintsStr, ", ") +}