Skip to content

Commit

Permalink
plpgsql: add support for COMMIT and ROLLBACK
Browse files Browse the repository at this point in the history
This commit adds support for the PL/pgSQL COMMIT and ROLLBACK statements,
which allow a stored procedure to finish the current transaction and
resume execution from the new transaction. This behavior is implemented
through the `TxnControl` expression from the previous commit, which
passes the necessary information (commit or rollback, continuation) to
the connExecutor before exiting. Note that these statements can only
be used from a stored procedure.

Fixes #115294

Release note (sql change): Added support for the PL/pgSQL COMMIT and
ROLLBACK statements.
  • Loading branch information
DrewKimball committed Mar 12, 2024
1 parent d3877b0 commit b32711b
Show file tree
Hide file tree
Showing 28 changed files with 1,100 additions and 74 deletions.
609 changes: 609 additions & 0 deletions pkg/ccl/logictestccl/testdata/logic_test/plpgsql_txn

Large diffs are not rendered by default.

7 changes: 7 additions & 0 deletions pkg/ccl/logictestccl/tests/3node-tenant/generated_test.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion pkg/ccl/logictestccl/tests/fakedist-disk/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ go_test(
"//build/toolchains:is_heavy": {"test.Pool": "heavy"},
"//conditions:default": {"test.Pool": "large"},
}),
shard_count = 19,
shard_count = 20,
tags = [
"ccl_test",
"cpu:2",
Expand Down
7 changes: 7 additions & 0 deletions pkg/ccl/logictestccl/tests/fakedist-disk/generated_test.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion pkg/ccl/logictestccl/tests/fakedist-vec-off/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ go_test(
"//build/toolchains:is_heavy": {"test.Pool": "heavy"},
"//conditions:default": {"test.Pool": "large"},
}),
shard_count = 19,
shard_count = 20,
tags = [
"ccl_test",
"cpu:2",
Expand Down
7 changes: 7 additions & 0 deletions pkg/ccl/logictestccl/tests/fakedist-vec-off/generated_test.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion pkg/ccl/logictestccl/tests/fakedist/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ go_test(
"//build/toolchains:is_heavy": {"test.Pool": "heavy"},
"//conditions:default": {"test.Pool": "large"},
}),
shard_count = 20,
shard_count = 21,
tags = [
"ccl_test",
"cpu:2",
Expand Down
7 changes: 7 additions & 0 deletions pkg/ccl/logictestccl/tests/fakedist/generated_test.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ go_test(
"//pkg/ccl/logictestccl:testdata", # keep
],
exec_properties = {"test.Pool": "large"},
shard_count = 19,
shard_count = 20,
tags = [
"ccl_test",
"cpu:1",
Expand Down

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion pkg/ccl/logictestccl/tests/local-mixed-23.2/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ go_test(
"//pkg/ccl/logictestccl:testdata", # keep
],
exec_properties = {"test.Pool": "large"},
shard_count = 18,
shard_count = 19,
tags = [
"ccl_test",
"cpu:1",
Expand Down
7 changes: 7 additions & 0 deletions pkg/ccl/logictestccl/tests/local-mixed-23.2/generated_test.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ go_test(
"//pkg/ccl/logictestccl:testdata", # keep
],
exec_properties = {"test.Pool": "large"},
shard_count = 19,
shard_count = 20,
tags = [
"ccl_test",
"cpu:1",
Expand Down

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion pkg/ccl/logictestccl/tests/local-vec-off/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ go_test(
"//pkg/ccl/logictestccl:testdata", # keep
],
exec_properties = {"test.Pool": "large"},
shard_count = 19,
shard_count = 20,
tags = [
"ccl_test",
"cpu:1",
Expand Down
7 changes: 7 additions & 0 deletions pkg/ccl/logictestccl/tests/local-vec-off/generated_test.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion pkg/ccl/logictestccl/tests/local/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ go_test(
"//pkg/ccl/logictestccl:testdata", # keep
],
exec_properties = {"test.Pool": "large"},
shard_count = 34,
shard_count = 35,
tags = [
"ccl_test",
"cpu:1",
Expand Down
7 changes: 7 additions & 0 deletions pkg/ccl/logictestccl/tests/local/generated_test.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

86 changes: 74 additions & 12 deletions pkg/sql/opt/exec/execbuilder/scalar.go
Original file line number Diff line number Diff line change
Expand Up @@ -81,8 +81,9 @@ func init() {
opt.ExistsOp: (*Builder).buildExistsSubquery,
opt.SubqueryOp: (*Builder).buildSubquery,

// User-defined functions.
opt.UDFCallOp: (*Builder).buildUDF,
// Routines.
opt.UDFCallOp: (*Builder).buildUDF,
opt.TxnControlOp: (*Builder).buildTxnControl,
}

for _, op := range opt.BoolOperators {
Expand Down Expand Up @@ -934,16 +935,9 @@ func (b *Builder) buildUDF(ctx *buildScalarCtx, scalar opt.ScalarExpr) (tree.Typ
}

// Build the argument expressions.
var err error
var args tree.TypedExprs
if len(udf.Args) > 0 {
args = make(tree.TypedExprs, len(udf.Args))
for i := range udf.Args {
args[i], err = b.buildScalar(ctx, udf.Args[i])
if err != nil {
return nil, err
}
}
args, err := b.buildRoutineArgs(ctx, udf.Args)
if err != nil {
return nil, err
}

for _, s := range udf.Def.Body {
Expand Down Expand Up @@ -991,6 +985,22 @@ func (b *Builder) buildUDF(ctx *buildScalarCtx, scalar opt.ScalarExpr) (tree.Typ
), nil
}

func (b *Builder) buildRoutineArgs(
ctx *buildScalarCtx, routineArgs memo.ScalarListExpr,
) (args tree.TypedExprs, err error) {
if len(routineArgs) == 0 {
return nil, nil
}
args = make(tree.TypedExprs, len(routineArgs))
for i := range routineArgs {
args[i], err = b.buildScalar(ctx, routineArgs[i])
if err != nil {
return nil, err
}
}
return args, nil
}

// initRoutineExceptionHandler initializes the exception handler (if any) for
// the shared BlockState of a group of sub-routines within a PLpgSQL block.
func (b *Builder) initRoutineExceptionHandler(
Expand Down Expand Up @@ -1202,3 +1212,55 @@ func (b *Builder) buildRoutinePlanGenerator(
func expectedLazyRoutineError(typ string) error {
return errors.AssertionFailedf("expected %s to be lazily planned as a routine", typ)
}

// buildTxnControl builds a TxnControlExpr into a typed expression that can be
// evaluated.
func (b *Builder) buildTxnControl(
ctx *buildScalarCtx, scalar opt.ScalarExpr,
) (tree.TypedExpr, error) {
txnExpr := scalar.(*memo.TxnControlExpr)
// Build the argument expressions.
args, err := b.buildRoutineArgs(ctx, txnExpr.Args)
if err != nil {
return nil, err
}
gen := func(
ctx context.Context, evalArgs tree.Datums,
) (con tree.StoredProcContinuation, err error) {
defer func() {
if r := recover(); r != nil {
// This code allows us to propagate internal errors without
// having to add error checks everywhere throughout the code.
// This is only possible because the code does not update shared
// state and does not manipulate locks.
//
// This is the same panic-catching logic that exists in
// o.Optimize() below. It's required here because it's possible
// for factory functions to panic below, like
// CopyAndReplaceDefault.
if ok, e := errorutil.ShouldCatch(r); ok {
err = e
log.VEventf(ctx, 1, "%v", err)
} else {
// Other panic objects can't be considered "safe" and thus
// are propagated as crashes that terminate the session.
panic(r)
}
}
}()
// Build the plan for the "continuation" procedure that will resume
// execution of the parent stored procedure in a new transaction.
var f norm.Factory
f.Init(ctx, b.evalCtx, b.catalog)
rootExpr := b.mem.RootExpr().(memo.RelExpr)
f.CopyAndReplace(rootExpr, txnExpr.Props, f.CopyWithoutAssigningPlaceholders)
memoArgs := make(memo.ScalarListExpr, len(evalArgs))
for i := range evalArgs {
memoArgs[i] = f.ConstructConstVal(evalArgs[i], evalArgs[i].ResolvedType())
}
continuationProc := f.ConstructUDFCall(memoArgs, &memo.UDFCallPrivate{Def: txnExpr.Def})
f.Memo().SetRoot(f.ConstructCall(continuationProc), f.Memo().RootProps())
return f.DetachMemo(), nil
}
return tree.NewTxnControlExpr(txnExpr.TxnOp, args, gen, txnExpr.Def.Name, txnExpr.Def.Typ), nil
}

0 comments on commit b32711b

Please sign in to comment.