From f3d1048481a352033f8c0332f4f6593ab4f89e09 Mon Sep 17 00:00:00 2001 From: Gopal Lal Date: Mon, 6 Apr 2026 16:58:44 +0530 Subject: [PATCH 1/3] Add enforceEmbeddedSchemaCorrectness option to TExecuteStatementReq Add a new optional bool field (0xD19) to TExecuteStatementReq in the thrift contract and expose it as an opt-in configuration parameter. When enabled, the server enforces embedded schema correctness during query execution. - Add field to thrift-generated cli_service.go with full Read/Write/Equals support - Add EnforceEmbeddedSchemaCorrectness to UserConfig (default false) - Add WithEnforceEmbeddedSchemaCorrectness connector option - Add DSN parameter support (enforceEmbeddedSchemaCorrectness=true) - Wire config to TExecuteStatementReq in executeStatement ES-1804970 Co-authored-by: Isaac --- connection.go | 5 +++ connector.go | 9 +++++ internal/cli_service/cli_service.go | 52 +++++++++++++++++++++++++++++ internal/config/config.go | 12 +++++-- 4 files changed, 76 insertions(+), 2 deletions(-) diff --git a/connection.go b/connection.go index c297d5bd..0d4af39e 100644 --- a/connection.go +++ b/connection.go @@ -323,6 +323,11 @@ func (c *conn) executeStatement(ctx context.Context, query string, args []driver req.Parameters = parameters } + // Add enforce embedded schema correctness if enabled + if c.cfg.EnforceEmbeddedSchemaCorrectness { + req.EnforceEmbeddedSchemaCorrectness = &c.cfg.EnforceEmbeddedSchemaCorrectness + } + resp, err := c.client.ExecuteStatement(ctx, &req) var log *logger.DBSQLLogger log, ctx = client.LoggerAndContext(ctx, resp) diff --git a/connector.go b/connector.go index 1f77ac3f..7a4fe993 100644 --- a/connector.go +++ b/connector.go @@ -291,6 +291,15 @@ func WithEnableMetricViewMetadata(enable bool) ConnOption { } } +// WithEnforceEmbeddedSchemaCorrectness enables enforcement of embedded schema correctness +// in query execution. When set to true, the server will enforce embedded schema correctness. +// Default is false. +func WithEnforceEmbeddedSchemaCorrectness(enforce bool) ConnOption { + return func(c *config.Config) { + c.EnforceEmbeddedSchemaCorrectness = enforce + } +} + // Setup of Oauth M2m authentication func WithClientCredentials(clientID, clientSecret string) ConnOption { return func(c *config.Config) { diff --git a/internal/cli_service/cli_service.go b/internal/cli_service/cli_service.go index 71952c69..d923bea6 100644 --- a/internal/cli_service/cli_service.go +++ b/internal/cli_service/cli_service.go @@ -11776,6 +11776,7 @@ func (p *TSparkArrowTypes) Validate() error { // - Parameters // - MaxBytesPerBatch // - StatementConf +// - EnforceEmbeddedSchemaCorrectness type TExecuteStatementReq struct { SessionHandle *TSessionHandle `thrift:"sessionHandle,1,required" db:"sessionHandle" json:"sessionHandle"` Statement string `thrift:"statement,2,required" db:"statement" json:"statement"` @@ -11794,6 +11795,8 @@ type TExecuteStatementReq struct { MaxBytesPerBatch *int64 `thrift:"maxBytesPerBatch,1289" db:"maxBytesPerBatch" json:"maxBytesPerBatch,omitempty"` // unused fields # 1290 to 1295 StatementConf *TStatementConf `thrift:"statementConf,1296" db:"statementConf" json:"statementConf,omitempty"` + // unused fields # 1297 to 3352 + EnforceEmbeddedSchemaCorrectness *bool `thrift:"enforceEmbeddedSchemaCorrectness,3353" db:"enforceEmbeddedSchemaCorrectness" json:"enforceEmbeddedSchemaCorrectness,omitempty"` } func NewTExecuteStatementReq() *TExecuteStatementReq { @@ -11894,6 +11897,13 @@ func (p *TExecuteStatementReq) GetStatementConf() *TStatementConf { } return p.StatementConf } +var TExecuteStatementReq_EnforceEmbeddedSchemaCorrectness_DEFAULT bool +func (p *TExecuteStatementReq) GetEnforceEmbeddedSchemaCorrectness() bool { + if !p.IsSetEnforceEmbeddedSchemaCorrectness() { + return TExecuteStatementReq_EnforceEmbeddedSchemaCorrectness_DEFAULT + } +return *p.EnforceEmbeddedSchemaCorrectness +} func (p *TExecuteStatementReq) IsSetSessionHandle() bool { return p.SessionHandle != nil } @@ -11950,6 +11960,10 @@ func (p *TExecuteStatementReq) IsSetStatementConf() bool { return p.StatementConf != nil } +func (p *TExecuteStatementReq) IsSetEnforceEmbeddedSchemaCorrectness() bool { + return p.EnforceEmbeddedSchemaCorrectness != nil +} + func (p *TExecuteStatementReq) Read(ctx context.Context, iprot thrift.TProtocol) error { if _, err := iprot.ReadStructBegin(ctx); err != nil { return thrift.PrependError(fmt.Sprintf("%T read error: ", p), err) @@ -12117,6 +12131,16 @@ func (p *TExecuteStatementReq) Read(ctx context.Context, iprot thrift.TProtocol) return err } } + case 3353: + if fieldTypeId == thrift.BOOL { + if err := p.ReadField3353(ctx, iprot); err != nil { + return err + } + } else { + if err := iprot.Skip(ctx, fieldTypeId); err != nil { + return err + } + } default: if err := iprot.Skip(ctx, fieldTypeId); err != nil { return err @@ -12299,6 +12323,15 @@ func (p *TExecuteStatementReq) ReadField1296(ctx context.Context, iprot thrift. return nil } +func (p *TExecuteStatementReq) ReadField3353(ctx context.Context, iprot thrift.TProtocol) error { + if v, err := iprot.ReadBool(ctx); err != nil { + return thrift.PrependError("error reading field 3353: ", err) +} else { + p.EnforceEmbeddedSchemaCorrectness = &v +} + return nil +} + func (p *TExecuteStatementReq) Write(ctx context.Context, oprot thrift.TProtocol) error { if err := oprot.WriteStructBegin(ctx, "TExecuteStatementReq"); err != nil { return thrift.PrependError(fmt.Sprintf("%T write struct begin error: ", p), err) } @@ -12318,6 +12351,7 @@ func (p *TExecuteStatementReq) Write(ctx context.Context, oprot thrift.TProtocol if err := p.writeField1288(ctx, oprot); err != nil { return err } if err := p.writeField1289(ctx, oprot); err != nil { return err } if err := p.writeField1296(ctx, oprot); err != nil { return err } + if err := p.writeField3353(ctx, oprot); err != nil { return err } } if err := oprot.WriteFieldStop(ctx); err != nil { return thrift.PrependError("write field stop error: ", err) } @@ -12525,6 +12559,18 @@ func (p *TExecuteStatementReq) writeField1296(ctx context.Context, oprot thrift. return err } +func (p *TExecuteStatementReq) writeField3353(ctx context.Context, oprot thrift.TProtocol) (err error) { + if p.IsSetEnforceEmbeddedSchemaCorrectness() { + if err := oprot.WriteFieldBegin(ctx, "enforceEmbeddedSchemaCorrectness", thrift.BOOL, 3353); err != nil { + return thrift.PrependError(fmt.Sprintf("%T write field begin error 3353:enforceEmbeddedSchemaCorrectness: ", p), err) } + if err := oprot.WriteBool(ctx, bool(*p.EnforceEmbeddedSchemaCorrectness)); err != nil { + return thrift.PrependError(fmt.Sprintf("%T.enforceEmbeddedSchemaCorrectness (3353) field write error: ", p), err) } + if err := oprot.WriteFieldEnd(ctx); err != nil { + return thrift.PrependError(fmt.Sprintf("%T write field end error 3353:enforceEmbeddedSchemaCorrectness: ", p), err) } + } + return err +} + func (p *TExecuteStatementReq) Equals(other *TExecuteStatementReq) bool { if p == other { return true @@ -12584,6 +12630,12 @@ func (p *TExecuteStatementReq) Equals(other *TExecuteStatementReq) bool { if (*p.MaxBytesPerBatch) != (*other.MaxBytesPerBatch) { return false } } if !p.StatementConf.Equals(other.StatementConf) { return false } + if p.EnforceEmbeddedSchemaCorrectness != other.EnforceEmbeddedSchemaCorrectness { + if p.EnforceEmbeddedSchemaCorrectness == nil || other.EnforceEmbeddedSchemaCorrectness == nil { + return false + } + if (*p.EnforceEmbeddedSchemaCorrectness) != (*other.EnforceEmbeddedSchemaCorrectness) { return false } + } return true } diff --git a/internal/config/config.go b/internal/config/config.go index e13cb98f..1f008403 100644 --- a/internal/config/config.go +++ b/internal/config/config.go @@ -99,8 +99,9 @@ type UserConfig struct { RetryWaitMax time.Duration RetryMax int Transport http.RoundTripper - UseLz4Compression bool - EnableMetricViewMetadata bool + UseLz4Compression bool + EnableMetricViewMetadata bool + EnforceEmbeddedSchemaCorrectness bool CloudFetchConfig } @@ -282,6 +283,13 @@ func ParseDSN(dsn string) (UserConfig, error) { ucfg.EnableMetricViewMetadata = enableMetricViewMetadata } + if enforceEmbeddedSchemaCorrectness, ok, err := params.extractAsBool("enforceEmbeddedSchemaCorrectness"); ok { + if err != nil { + return UserConfig{}, err + } + ucfg.EnforceEmbeddedSchemaCorrectness = enforceEmbeddedSchemaCorrectness + } + // for timezone we do a case insensitive key match. // We use getNoCase because we want to leave timezone in the params so that it will also // be used as a session param. From 851ea14d3740c3637c9080530fd1a8f61e472822 Mon Sep 17 00:00:00 2001 From: Gopal Lal Date: Mon, 20 Apr 2026 10:11:37 +0530 Subject: [PATCH 2/3] Regenerate cli_service.go from thrift instead of hand-editing Address review feedback: the prior commit added EnforceEmbeddedSchemaCorrectness to internal/cli_service/cli_service.go by hand. Regenerate via the thrift compiler (0.19.0) by adding the field to TCLIService.thrift (0xD19: optional bool enforceEmbeddedSchemaCorrectness = false) and running `thrift -r --gen go TCLIService.thrift`. Because the thrift definition has a default value, the generated field is `bool` (value) rather than `*bool` (pointer). Update connection.go to assign the value directly. Co-authored-by: Isaac --- connection.go | 5 +---- internal/cli_service/cli_service.go | 23 ++++++++--------------- 2 files changed, 9 insertions(+), 19 deletions(-) diff --git a/connection.go b/connection.go index 0d4af39e..2ba323af 100644 --- a/connection.go +++ b/connection.go @@ -323,10 +323,7 @@ func (c *conn) executeStatement(ctx context.Context, query string, args []driver req.Parameters = parameters } - // Add enforce embedded schema correctness if enabled - if c.cfg.EnforceEmbeddedSchemaCorrectness { - req.EnforceEmbeddedSchemaCorrectness = &c.cfg.EnforceEmbeddedSchemaCorrectness - } + req.EnforceEmbeddedSchemaCorrectness = c.cfg.EnforceEmbeddedSchemaCorrectness resp, err := c.client.ExecuteStatement(ctx, &req) var log *logger.DBSQLLogger diff --git a/internal/cli_service/cli_service.go b/internal/cli_service/cli_service.go index d923bea6..a43bcc2d 100644 --- a/internal/cli_service/cli_service.go +++ b/internal/cli_service/cli_service.go @@ -11796,7 +11796,7 @@ type TExecuteStatementReq struct { // unused fields # 1290 to 1295 StatementConf *TStatementConf `thrift:"statementConf,1296" db:"statementConf" json:"statementConf,omitempty"` // unused fields # 1297 to 3352 - EnforceEmbeddedSchemaCorrectness *bool `thrift:"enforceEmbeddedSchemaCorrectness,3353" db:"enforceEmbeddedSchemaCorrectness" json:"enforceEmbeddedSchemaCorrectness,omitempty"` + EnforceEmbeddedSchemaCorrectness bool `thrift:"enforceEmbeddedSchemaCorrectness,3353" db:"enforceEmbeddedSchemaCorrectness" json:"enforceEmbeddedSchemaCorrectness"` } func NewTExecuteStatementReq() *TExecuteStatementReq { @@ -11897,12 +11897,10 @@ func (p *TExecuteStatementReq) GetStatementConf() *TStatementConf { } return p.StatementConf } -var TExecuteStatementReq_EnforceEmbeddedSchemaCorrectness_DEFAULT bool +var TExecuteStatementReq_EnforceEmbeddedSchemaCorrectness_DEFAULT bool = false + func (p *TExecuteStatementReq) GetEnforceEmbeddedSchemaCorrectness() bool { - if !p.IsSetEnforceEmbeddedSchemaCorrectness() { - return TExecuteStatementReq_EnforceEmbeddedSchemaCorrectness_DEFAULT - } -return *p.EnforceEmbeddedSchemaCorrectness + return p.EnforceEmbeddedSchemaCorrectness } func (p *TExecuteStatementReq) IsSetSessionHandle() bool { return p.SessionHandle != nil @@ -11961,7 +11959,7 @@ func (p *TExecuteStatementReq) IsSetStatementConf() bool { } func (p *TExecuteStatementReq) IsSetEnforceEmbeddedSchemaCorrectness() bool { - return p.EnforceEmbeddedSchemaCorrectness != nil + return p.EnforceEmbeddedSchemaCorrectness != TExecuteStatementReq_EnforceEmbeddedSchemaCorrectness_DEFAULT } func (p *TExecuteStatementReq) Read(ctx context.Context, iprot thrift.TProtocol) error { @@ -12327,7 +12325,7 @@ func (p *TExecuteStatementReq) ReadField3353(ctx context.Context, iprot thrift. if v, err := iprot.ReadBool(ctx); err != nil { return thrift.PrependError("error reading field 3353: ", err) } else { - p.EnforceEmbeddedSchemaCorrectness = &v + p.EnforceEmbeddedSchemaCorrectness = v } return nil } @@ -12563,7 +12561,7 @@ func (p *TExecuteStatementReq) writeField3353(ctx context.Context, oprot thrift. if p.IsSetEnforceEmbeddedSchemaCorrectness() { if err := oprot.WriteFieldBegin(ctx, "enforceEmbeddedSchemaCorrectness", thrift.BOOL, 3353); err != nil { return thrift.PrependError(fmt.Sprintf("%T write field begin error 3353:enforceEmbeddedSchemaCorrectness: ", p), err) } - if err := oprot.WriteBool(ctx, bool(*p.EnforceEmbeddedSchemaCorrectness)); err != nil { + if err := oprot.WriteBool(ctx, bool(p.EnforceEmbeddedSchemaCorrectness)); err != nil { return thrift.PrependError(fmt.Sprintf("%T.enforceEmbeddedSchemaCorrectness (3353) field write error: ", p), err) } if err := oprot.WriteFieldEnd(ctx); err != nil { return thrift.PrependError(fmt.Sprintf("%T write field end error 3353:enforceEmbeddedSchemaCorrectness: ", p), err) } @@ -12630,12 +12628,7 @@ func (p *TExecuteStatementReq) Equals(other *TExecuteStatementReq) bool { if (*p.MaxBytesPerBatch) != (*other.MaxBytesPerBatch) { return false } } if !p.StatementConf.Equals(other.StatementConf) { return false } - if p.EnforceEmbeddedSchemaCorrectness != other.EnforceEmbeddedSchemaCorrectness { - if p.EnforceEmbeddedSchemaCorrectness == nil || other.EnforceEmbeddedSchemaCorrectness == nil { - return false - } - if (*p.EnforceEmbeddedSchemaCorrectness) != (*other.EnforceEmbeddedSchemaCorrectness) { return false } - } + if p.EnforceEmbeddedSchemaCorrectness != other.EnforceEmbeddedSchemaCorrectness { return false } return true } From fdfc8a140b2d2b4fcb0352590dd2dacd88de2a2e Mon Sep 17 00:00:00 2001 From: Gopal Lal Date: Mon, 20 Apr 2026 10:31:41 +0530 Subject: [PATCH 3/3] Fix gofmt alignment in config.go struct Adding EnforceEmbeddedSchemaCorrectness to UserConfig widened the struct's alignment column; run `gofmt -s -w` to realign the preceding telemetry fields so CI lint passes. Co-authored-by: Isaac --- internal/config/config.go | 12 ++++++------ 1 file changed, 6 insertions(+), 6 deletions(-) diff --git a/internal/config/config.go b/internal/config/config.go index e90f1987..bdb1a17c 100644 --- a/internal/config/config.go +++ b/internal/config/config.go @@ -101,12 +101,12 @@ type UserConfig struct { // Telemetry configuration // Uses config overlay pattern: client > server > default. // Unset = check server feature flag; explicitly true/false overrides the server. - EnableTelemetry ConfigValue[bool] - TelemetryBatchSize int // 0 = use default (100) - TelemetryFlushInterval time.Duration // 0 = use default (5s) - TelemetryRetryCount int // -1 = use default (3); 0 = disable retries; set via telemetry_retry_count - TelemetryRetryDelay time.Duration // 0 = use default (100ms); set via telemetry_retry_delay - Transport http.RoundTripper + EnableTelemetry ConfigValue[bool] + TelemetryBatchSize int // 0 = use default (100) + TelemetryFlushInterval time.Duration // 0 = use default (5s) + TelemetryRetryCount int // -1 = use default (3); 0 = disable retries; set via telemetry_retry_count + TelemetryRetryDelay time.Duration // 0 = use default (100ms); set via telemetry_retry_delay + Transport http.RoundTripper UseLz4Compression bool EnableMetricViewMetadata bool EnforceEmbeddedSchemaCorrectness bool