Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions mc2mc/internal/client/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ type OdpsClient interface {
ExecSQL(ctx context.Context, query string) error
SetDefaultProject(project string)
SetLogViewRetentionInDays(days int)
SetAdditionalHints(hints map[string]string)
}

type Client struct {
Expand Down
20 changes: 14 additions & 6 deletions mc2mc/internal/client/odps.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ type odpsClient struct {
client *odps.Odps

logViewRetentionInDays int
additionalHints map[string]string
}

// NewODPSClient creates a new odpsClient instance
Expand All @@ -31,7 +32,7 @@ func NewODPSClient(logger *slog.Logger, client *odps.Odps) *odpsClient {
// with capability to do graceful shutdown by terminating task instance
// when context is cancelled.
func (c *odpsClient) ExecSQL(ctx context.Context, query string) error {
hints := addHints(query)
hints := addHints(c.additionalHints, query)
taskIns, err := c.client.ExecSQlWithHints(query, hints)
if err != nil {
return errors.WithStack(err)
Expand All @@ -57,6 +58,11 @@ func (c *odpsClient) ExecSQL(ctx context.Context, query string) error {
}
}

// SetAdditionalHints sets the additional hints for the odps client
func (c *odpsClient) SetAdditionalHints(hints map[string]string) {
c.additionalHints = hints
}

// SetLogViewRetentionInDays sets the log view retention in days
func (c *odpsClient) SetLogViewRetentionInDays(days int) {
c.logViewRetentionInDays = days
Expand Down Expand Up @@ -108,15 +114,17 @@ func wait(taskIns *odps.Instance) <-chan error {
return errChan
}

func addHints(query string) map[string]string {
func addHints(additionalHints map[string]string, query string) map[string]string {
hints := make(map[string]string)
for k, v := range additionalHints {
hints[k] = v
}
multisql := strings.Contains(query, ";")
if multisql {
return map[string]string{
"odps.sql.submit.mode": "script",
}
hints["odps.sql.submit.mode"] = "script"
}

return nil
return hints
}

// getTable returns the table with the given tableID
Expand Down
12 changes: 12 additions & 0 deletions mc2mc/internal/client/setup.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,18 @@ import (

type SetupFn func(c *Client) error

func SetupAdditionalHints(hints map[string]string) SetupFn {
return func(c *Client) error {
if c.OdpsClient == nil {
return errors.New("odps client is required")
}
if hints != nil {
c.OdpsClient.SetAdditionalHints(hints)
}
return nil
}
}

func SetUpLogViewRetentionInDays(days int) SetupFn {
return func(c *Client) error {
if c.OdpsClient == nil {
Expand Down
27 changes: 14 additions & 13 deletions mc2mc/internal/config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,19 +9,20 @@ import (

// ConfigEnv is a mc configuration for the component.
type ConfigEnv struct {
LogLevel string `env:"LOG_LEVEL" envDefault:"INFO"`
OtelCollectorGRPCEndpoint string `env:"OTEL_COLLECTOR_GRPC_ENDPOINT"`
OtelAttributes string `env:"OTEL_ATTRIBUTES"`
MCServiceAccount string `env:"MC_SERVICE_ACCOUNT"`
LoadMethod string `env:"LOAD_METHOD" envDefault:"APPEND"`
QueryFilePath string `env:"QUERY_FILE_PATH" envDefault:"/data/in/query.sql"`
DestinationTableID string `env:"DESTINATION_TABLE_ID"`
DStart string `env:"DSTART"`
DEnd string `env:"DEND"`
ExecutionProject string `env:"EXECUTION_PROJECT"`
Concurrency int `env:"CONCURRENCY" envDefault:"7"`
LogViewRetentionInDays int `env:"LOG_VIEW_RETENTION_IN_DAYS" envDefault:"2"`
DisableMultiQueryGeneration bool `env:"DISABLE_MULTI_QUERY_GENERATION" envDefault:"false"`
LogLevel string `env:"LOG_LEVEL" envDefault:"INFO"`
OtelCollectorGRPCEndpoint string `env:"OTEL_COLLECTOR_GRPC_ENDPOINT"`
OtelAttributes string `env:"OTEL_ATTRIBUTES"`
MCServiceAccount string `env:"MC_SERVICE_ACCOUNT"`
LoadMethod string `env:"LOAD_METHOD" envDefault:"APPEND"`
QueryFilePath string `env:"QUERY_FILE_PATH" envDefault:"/data/in/query.sql"`
DestinationTableID string `env:"DESTINATION_TABLE_ID"`
DStart string `env:"DSTART"`
DEnd string `env:"DEND"`
ExecutionProject string `env:"EXECUTION_PROJECT"`
Concurrency int `env:"CONCURRENCY" envDefault:"7"`
AdditionalHints map[string]string `env:"ADDITIONAL_HINTS" envKeyValSeparator:"=" envSeparator:","`
LogViewRetentionInDays int `env:"LOG_VIEW_RETENTION_IN_DAYS" envDefault:"2"`
DisableMultiQueryGeneration bool `env:"DISABLE_MULTI_QUERY_GENERATION" envDefault:"false"`
// TODO: delete this
DevEnablePartitionValue string `env:"DEV__ENABLE_PARTITION_VALUE" envDefault:"false"`
DevEnableAutoPartition string `env:"DEV__ENABLE_AUTO_PARTITION" envDefault:"false"`
Expand Down
1 change: 1 addition & 0 deletions mc2mc/mc2mc.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,7 @@ func mc2mc(envs []string) error {
client.SetupODPSClient(cfg.GenOdps()),
client.SetupDefaultProject(cfg.ExecutionProject),
client.SetUpLogViewRetentionInDays(cfg.LogViewRetentionInDays),
client.SetupAdditionalHints(cfg.AdditionalHints),
)
if err != nil {
return errors.WithStack(err)
Expand Down
Loading