Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions pkg/sql/conn_executor.go
Original file line number Diff line number Diff line change
Expand Up @@ -3825,6 +3825,7 @@ func (ex *connExecutor) initEvalCtx(ctx context.Context, evalCtx *extendedEvalCo
ConsistencyChecker: p.execCfg.ConsistencyChecker,
RangeProber: p.execCfg.RangeProber,
StmtDiagnosticsRequestInserter: ex.server.cfg.StmtDiagnosticsRecorder.InsertRequest,
TxnDiagnosticsRequestInserter: ex.server.cfg.TxnDiagnosticsRecorder.InsertTxnRequest,
CatalogBuiltins: &p.evalCatalogBuiltins,
QueryCancelKey: ex.queryCancelKey,
DescIDGenerator: ex.getDescIDGenerator(),
Expand Down
1 change: 1 addition & 0 deletions pkg/sql/planner.go
Original file line number Diff line number Diff line change
Expand Up @@ -556,6 +556,7 @@ func internalExtendedEvalCtx(
IndexUsageStatsController: indexUsageStatsController,
ConsistencyChecker: execCfg.ConsistencyChecker,
StmtDiagnosticsRequestInserter: execCfg.StmtDiagnosticsRecorder.InsertRequest,
TxnDiagnosticsRequestInserter: execCfg.TxnDiagnosticsRecorder.InsertTxnRequest,
RangeStatsFetcher: execCfg.RangeStatsFetcher,
},
Tracing: &SessionTracing{},
Expand Down
1 change: 1 addition & 0 deletions pkg/sql/sem/builtins/fixed_oids.go
Original file line number Diff line number Diff line change
Expand Up @@ -2859,6 +2859,7 @@ var builtinOidsArray = []string{
2904: `lca(ltree[]: ltree[]) -> ltree`,
2905: `levenshtein_less_equal(source: string, target: string, max_d: int) -> int`,
2906: `levenshtein_less_equal(source: string, target: string, ins_cost: int, del_cost: int, sub_cost: int, max_d: int) -> int`,
2907: `crdb_internal.request_transaction_bundle(transaction_fingerprint_id: string, sampling_probability: float, min_execution_latency: interval, expires_after: interval, redacted: bool) -> tuple{int AS request_id, bool AS created}`,
}

var builtinOidsBySignature map[string]oid.Oid
Expand Down
151 changes: 151 additions & 0 deletions pkg/sql/sem/builtins/generator_builtins.go
Original file line number Diff line number Diff line change
Expand Up @@ -838,6 +838,25 @@ The last argument is a JSONB object containing the following optional fields:
makeInternallyExecutedQueryGeneratorOverload(false /* withSessionBound */, true /* withOverrides */, true /* withTxn */),
makeInternallyExecutedQueryGeneratorOverload(true /* withSessionBound */, true /* withOverrides */, true /* withTxn */),
),
"crdb_internal.request_transaction_bundle": makeBuiltin(
tree.FunctionProperties{
Category: builtinconstants.CategorySystemInfo,
DistsqlBlocklist: true,
},
makeGeneratorOverload(
tree.ParamTypes{
{Name: "transaction_fingerprint_id", Typ: types.String},
{Name: "sampling_probability", Typ: types.Float},
{Name: "min_execution_latency", Typ: types.Interval},
{Name: "expires_after", Typ: types.Interval},
{Name: "redacted", Typ: types.Bool},
},
txnDiagnosticsRequestGeneratorType,
makeTxnDiagnosticsRequestGenerator,
"Starts a transaction diagnostic for the transaction fingerprint id",
volatility.Volatile,
),
),
}

var decodePlanGistGeneratorType = types.String
Expand Down Expand Up @@ -4199,3 +4218,135 @@ func (qi *internallyExecutedQueryIterator) Close(context.Context) {
func (qi *internallyExecutedQueryIterator) ResolvedType() *types.T {
return internallyExecutedQueryGeneratorType
}

func makeTxnDiagnosticsRequestGenerator(
ctx context.Context, evalCtx *eval.Context, args tree.Datums,
) (eval.ValueGenerator, error) {
hasPriv, shouldRedact, err := evalCtx.SessionAccessor.HasViewActivityOrViewActivityRedactedRole(ctx)
if err != nil {
return nil, err
}

txnFingerprintIdStr := string(tree.MustBeDString(args[0]))
samplingProbability := float64(tree.MustBeDFloat(args[1]))
minExecutionLatency := tree.MustBeDInterval(args[2])
expiresAfter := tree.MustBeDInterval(args[3])
redacted := bool(tree.MustBeDBool(args[4]))
txnFingerprintId, err := strconv.ParseUint(txnFingerprintIdStr, 16, 64)
if err != nil {
return nil, errors.New("invalid transaction fingerprint id: must be a hex encoded representation of a transaction fingerprint id")
}
query := `
SELECT jsonb_array_elements_text(metadata->'stmtFingerprintIDs') as stmt_fingerprint_id
FROM (
SELECT metadata from crdb_internal.transaction_statistics
WHERE fingerprint_id = decode($1, 'hex')
LIMIT 1
) t
`
it, err := evalCtx.Planner.QueryIteratorEx(ctx, "get_txn_fingerprint", sessiondata.NoSessionDataOverride, query, txnFingerprintIdStr)
if err != nil {
return &txnDiagnosticsRequestGenerator{requestId: 0, created: false}, err
}

var stmtFPs []uint64
var ok, found bool
for ok, err = it.Next(ctx); ok; ok, err = it.Next(ctx) {
if err != nil {
return nil, err
}
found = true
id := string(tree.MustBeDString(it.Cur()[0]))
value, err := strconv.ParseUint(id, 16, 64)
if err != nil {
return nil, err
}
stmtFPs = append(stmtFPs, value)
}

if shouldRedact {
if !redacted {
return nil, pgerror.Newf(
pgcode.InsufficientPrivilege,
"users with VIEWACTIVITYREDACTED privilege can only request redacted statement bundles",
)
}
} else if !hasPriv {
return nil, pgerror.Newf(
pgcode.InsufficientPrivilege,
"requesting statement bundle requires VIEWACTIVITY privilege",
)
}

var username string
if sd := evalCtx.SessionData(); sd != nil {
username = sd.User().Normalized()
}
reqId, err := evalCtx.TxnDiagnosticsRequestInserter(
ctx,
txnFingerprintId,
stmtFPs,
username,
samplingProbability,
time.Duration(minExecutionLatency.Duration.Nanos()),
time.Duration(expiresAfter.Duration.Nanos()),
redacted)
if err != nil {
return nil, err
}

if !found {
return &txnDiagnosticsRequestGenerator{created: false}, nil
}
return &txnDiagnosticsRequestGenerator{requestId: reqId, created: true}, nil
}

// txnDiagnosticsRequestGenerator supports the execution of request_transaction_bundle.
var txnDiagnosticsRequestGeneratorType = types.MakeLabeledTuple(
[]*types.T{types.Int, types.Bool},
[]string{"request_id", "created"},
)

type txnDiagnosticsRequestGenerator struct {
requestId int
created bool
called bool
}

// ResolvedType implements the eval.ValueGenerator interface.
func (g *txnDiagnosticsRequestGenerator) ResolvedType() *types.T {
return txnDiagnosticsRequestGeneratorType
}

// Start implements the eval.ValueGenerator interface.
func (g *txnDiagnosticsRequestGenerator) Start(ctx context.Context, txn *kv.Txn) error {
return nil
}

// Next implements the eval.ValueGenerator interface.
func (g *txnDiagnosticsRequestGenerator) Next(ctx context.Context) (bool, error) {
if g.called {
return false, nil
}
g.called = true
return true, nil
}

// Values implements the eval.ValueGenerator interface.
func (g *txnDiagnosticsRequestGenerator) Values() (tree.Datums, error) {
var requestIdDatum tree.Datum
if g.created {
requestIdDatum = tree.NewDInt(tree.DInt(g.requestId))
} else {
requestIdDatum = tree.DNull
}

return tree.Datums{
requestIdDatum,
tree.MakeDBool(tree.DBool(g.created)),
}, nil
}

// Close implements the eval.ValueGenerator interface.
func (g *txnDiagnosticsRequestGenerator) Close(ctx context.Context) {
}
5 changes: 5 additions & 0 deletions pkg/sql/sem/eval/context.go
Original file line number Diff line number Diff line change
Expand Up @@ -246,6 +246,11 @@ type Context struct {
// bundle request.
StmtDiagnosticsRequestInserter StmtDiagnosticsRequestInsertFunc

// TxnDiagnosticsRequestInsertFunc is used by the
// crdb_internal.request_transaction_bundle builtin to insert a transaction
// bundle request.
TxnDiagnosticsRequestInserter TxnDiagnosticsRequestInsertFunc

// CatalogBuiltins is used by various builtins which depend on looking up
// catalog information. Unlike the Planner, it is available in DistSQL.
CatalogBuiltins CatalogBuiltins
Expand Down
14 changes: 14 additions & 0 deletions pkg/sql/sem/eval/deps.go
Original file line number Diff line number Diff line change
Expand Up @@ -765,6 +765,20 @@ type StmtDiagnosticsRequestInsertFunc func(
username string,
) error

// TxnDiagnosticsRequestInsertFunc is an interface embedded in EvalCtx that can
// be used by the builtins to insert a transaction diagnostics request. This
// interface is introduced to avoid circular dependency.
type TxnDiagnosticsRequestInsertFunc func(
ctx context.Context,
txnFingerprintId uint64,
stmtFingerprintIds []uint64,
username string,
samplingProbability float64,
minExecutionLatency time.Duration,
expiresAfter time.Duration,
redacted bool,
) (int, error)

// AsOfSystemTime represents the result from the evaluation of AS OF SYSTEM TIME
// clause.
type AsOfSystemTime struct {
Expand Down
Loading
Loading