diff --git a/mc2mc/internal/client/client.go b/mc2mc/internal/client/client.go index 855ab52..a2f0ad8 100644 --- a/mc2mc/internal/client/client.go +++ b/mc2mc/internal/client/client.go @@ -28,6 +28,9 @@ type Client struct { appCtx context.Context logger *slog.Logger shutdownFns []func() error + + // TODO: remove this temporary capability after 15 nov + enablePartitionValue bool } func NewClient(ctx context.Context, setupFns ...SetupFn) (*Client, error) { @@ -59,6 +62,9 @@ func (c *Client) Execute(ctx context.Context, tableID, queryFilePath string) err if err != nil { return errors.WithStack(err) } + if c.enablePartitionValue { + queryRaw = addPartitionValueColumn(queryRaw) + } // check if table is partitioned partitionNames, err := c.OdpsClient.GetPartitionNames(ctx, tableID) @@ -82,3 +88,9 @@ func (c *Client) Execute(ctx context.Context, tableID, queryFilePath string) err c.logger.Info("execution done") return errors.WithStack(err) } + +// TODO: remove this temporary support after 15 nov +func addPartitionValueColumn(rawQuery []byte) []byte { + sanitizeQuery := strings.TrimSuffix(string(rawQuery), ";") + return []byte(fmt.Sprintf("SELECT *, STRING(CURRENT_DATE()) as __partitionvalue FROM (%s)", sanitizeQuery)) +} diff --git a/mc2mc/internal/client/setup.go b/mc2mc/internal/client/setup.go index 8d2120d..358a76c 100644 --- a/mc2mc/internal/client/setup.go +++ b/mc2mc/internal/client/setup.go @@ -51,3 +51,10 @@ func SetupLoader(loadMethod string) SetupFn { return nil } } + +func EnablePartitionValue(enabled bool) SetupFn { + return func(c *Client) error { + c.enablePartitionValue = enabled + return nil + } +} diff --git a/mc2mc/internal/config/config.go b/mc2mc/internal/config/config.go index d709b69..e9acdae 100644 --- a/mc2mc/internal/config/config.go +++ b/mc2mc/internal/config/config.go @@ -16,6 +16,8 @@ type Config struct { OtelCollectorGRPCEndpoint string JobName string ScheduledTime string + // TODO: remove this temporary support after 15 nov 2024 + DevEnablePartitionValue bool } type maxComputeCredentials struct { @@ -37,6 +39,8 @@ func NewConfig() (*Config, error) { OtelCollectorGRPCEndpoint: getEnv("OTEL_COLLECTOR_GRPC_ENDPOINT", ""), JobName: getJobName(), ScheduledTime: getEnv("SCHEDULED_TIME", ""), + // TODO: delete this after 15 nov + DevEnablePartitionValue: getEnv("DEV__ENABLE_PARTITION_VALUE", "false") == "true", } // ali-odps-go-sdk related config scvAcc := getEnv("SERVICE_ACCOUNT", "") diff --git a/mc2mc/mc2mc.go b/mc2mc/mc2mc.go index 42f4084..1d3bb6c 100644 --- a/mc2mc/mc2mc.go +++ b/mc2mc/mc2mc.go @@ -29,6 +29,7 @@ func mc2mc() error { client.SetupOTelSDK(cfg.OtelCollectorGRPCEndpoint, cfg.JobName, cfg.ScheduledTime), client.SetupODPSClient(cfg.GenOdps()), client.SetupLoader(cfg.LoadMethod), + client.EnablePartitionValue(cfg.DevEnablePartitionValue), ) if err != nil { return errors.WithStack(err)