diff --git a/mc2mc/internal/client/client.go b/mc2mc/internal/client/client.go index 6c50ac4..e17c28e 100644 --- a/mc2mc/internal/client/client.go +++ b/mc2mc/internal/client/client.go @@ -13,6 +13,7 @@ type OdpsClient interface { ExecSQL(ctx context.Context, query string) error SetDefaultProject(project string) SetLogViewRetentionInDays(days int) + SetAdditionalHints(hints map[string]string) } type Client struct { diff --git a/mc2mc/internal/client/odps.go b/mc2mc/internal/client/odps.go index c7b8e42..1fc4d39 100644 --- a/mc2mc/internal/client/odps.go +++ b/mc2mc/internal/client/odps.go @@ -16,6 +16,7 @@ type odpsClient struct { client *odps.Odps logViewRetentionInDays int + additionalHints map[string]string } // NewODPSClient creates a new odpsClient instance @@ -31,7 +32,7 @@ func NewODPSClient(logger *slog.Logger, client *odps.Odps) *odpsClient { // with capability to do graceful shutdown by terminating task instance // when context is cancelled. func (c *odpsClient) ExecSQL(ctx context.Context, query string) error { - hints := addHints(query) + hints := addHints(c.additionalHints, query) taskIns, err := c.client.ExecSQlWithHints(query, hints) if err != nil { return errors.WithStack(err) @@ -57,6 +58,11 @@ func (c *odpsClient) ExecSQL(ctx context.Context, query string) error { } } +// SetAdditionalHints sets the additional hints for the odps client +func (c *odpsClient) SetAdditionalHints(hints map[string]string) { + c.additionalHints = hints +} + // SetLogViewRetentionInDays sets the log view retention in days func (c *odpsClient) SetLogViewRetentionInDays(days int) { c.logViewRetentionInDays = days @@ -108,15 +114,17 @@ func wait(taskIns *odps.Instance) <-chan error { return errChan } -func addHints(query string) map[string]string { +func addHints(additionalHints map[string]string, query string) map[string]string { + hints := make(map[string]string) + for k, v := range additionalHints { + hints[k] = v + } multisql := strings.Contains(query, ";") if multisql { - return map[string]string{ - "odps.sql.submit.mode": "script", - } + hints["odps.sql.submit.mode"] = "script" } - return nil + return hints } // getTable returns the table with the given tableID diff --git a/mc2mc/internal/client/setup.go b/mc2mc/internal/client/setup.go index b88f281..278a458 100644 --- a/mc2mc/internal/client/setup.go +++ b/mc2mc/internal/client/setup.go @@ -10,6 +10,18 @@ import ( type SetupFn func(c *Client) error +func SetupAdditionalHints(hints map[string]string) SetupFn { + return func(c *Client) error { + if c.OdpsClient == nil { + return errors.New("odps client is required") + } + if hints != nil { + c.OdpsClient.SetAdditionalHints(hints) + } + return nil + } +} + func SetUpLogViewRetentionInDays(days int) SetupFn { return func(c *Client) error { if c.OdpsClient == nil { diff --git a/mc2mc/internal/config/config.go b/mc2mc/internal/config/config.go index a79608c..4011e73 100644 --- a/mc2mc/internal/config/config.go +++ b/mc2mc/internal/config/config.go @@ -9,19 +9,20 @@ import ( // ConfigEnv is a mc configuration for the component. type ConfigEnv struct { - LogLevel string `env:"LOG_LEVEL" envDefault:"INFO"` - OtelCollectorGRPCEndpoint string `env:"OTEL_COLLECTOR_GRPC_ENDPOINT"` - OtelAttributes string `env:"OTEL_ATTRIBUTES"` - MCServiceAccount string `env:"MC_SERVICE_ACCOUNT"` - LoadMethod string `env:"LOAD_METHOD" envDefault:"APPEND"` - QueryFilePath string `env:"QUERY_FILE_PATH" envDefault:"/data/in/query.sql"` - DestinationTableID string `env:"DESTINATION_TABLE_ID"` - DStart string `env:"DSTART"` - DEnd string `env:"DEND"` - ExecutionProject string `env:"EXECUTION_PROJECT"` - Concurrency int `env:"CONCURRENCY" envDefault:"7"` - LogViewRetentionInDays int `env:"LOG_VIEW_RETENTION_IN_DAYS" envDefault:"2"` - DisableMultiQueryGeneration bool `env:"DISABLE_MULTI_QUERY_GENERATION" envDefault:"false"` + LogLevel string `env:"LOG_LEVEL" envDefault:"INFO"` + OtelCollectorGRPCEndpoint string `env:"OTEL_COLLECTOR_GRPC_ENDPOINT"` + OtelAttributes string `env:"OTEL_ATTRIBUTES"` + MCServiceAccount string `env:"MC_SERVICE_ACCOUNT"` + LoadMethod string `env:"LOAD_METHOD" envDefault:"APPEND"` + QueryFilePath string `env:"QUERY_FILE_PATH" envDefault:"/data/in/query.sql"` + DestinationTableID string `env:"DESTINATION_TABLE_ID"` + DStart string `env:"DSTART"` + DEnd string `env:"DEND"` + ExecutionProject string `env:"EXECUTION_PROJECT"` + Concurrency int `env:"CONCURRENCY" envDefault:"7"` + AdditionalHints map[string]string `env:"ADDITIONAL_HINTS" envKeyValSeparator:"=" envSeparator:","` + LogViewRetentionInDays int `env:"LOG_VIEW_RETENTION_IN_DAYS" envDefault:"2"` + DisableMultiQueryGeneration bool `env:"DISABLE_MULTI_QUERY_GENERATION" envDefault:"false"` // TODO: delete this DevEnablePartitionValue string `env:"DEV__ENABLE_PARTITION_VALUE" envDefault:"false"` DevEnableAutoPartition string `env:"DEV__ENABLE_AUTO_PARTITION" envDefault:"false"` diff --git a/mc2mc/mc2mc.go b/mc2mc/mc2mc.go index 1c9766f..3d5467f 100644 --- a/mc2mc/mc2mc.go +++ b/mc2mc/mc2mc.go @@ -44,6 +44,7 @@ func mc2mc(envs []string) error { client.SetupODPSClient(cfg.GenOdps()), client.SetupDefaultProject(cfg.ExecutionProject), client.SetUpLogViewRetentionInDays(cfg.LogViewRetentionInDays), + client.SetupAdditionalHints(cfg.AdditionalHints), ) if err != nil { return errors.WithStack(err)