Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

sql: permit creation and refreshing of materialized views as of system time #56167

Open
wants to merge 2 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
2 changes: 1 addition & 1 deletion docs/generated/sql/bnf/refresh_materialized_views.bnf
Original file line number Diff line number Diff line change
@@ -1,2 +1,2 @@
refresh_stmt ::=
'REFRESH' 'MATERIALIZED' 'VIEW' opt_concurrently view_name opt_clear_data
'REFRESH' 'MATERIALIZED' 'VIEW' opt_concurrently view_name opt_clear_data opt_as_of_clause
16 changes: 8 additions & 8 deletions docs/generated/sql/bnf/stmt_block.bnf
Original file line number Diff line number Diff line change
Expand Up @@ -101,7 +101,7 @@ release_stmt ::=
'RELEASE' savepoint_name

refresh_stmt ::=
'REFRESH' 'MATERIALIZED' 'VIEW' opt_concurrently view_name opt_clear_data
'REFRESH' 'MATERIALIZED' 'VIEW' opt_concurrently view_name opt_clear_data opt_as_of_clause

nonpreparable_set_stmt ::=
set_transaction_stmt
Expand Down Expand Up @@ -343,6 +343,10 @@ opt_clear_data ::=
| 'WITH' 'NO' 'DATA'
|

opt_as_of_clause ::=
as_of_clause
|

set_transaction_stmt ::=
'SET' 'TRANSACTION' transaction_mode_list
| 'SET' 'SESSION' 'TRANSACTION' transaction_mode_list
Expand Down Expand Up @@ -388,10 +392,6 @@ string_or_placeholder_opt_list ::=
string_or_placeholder
| '(' string_or_placeholder_list ')'

opt_as_of_clause ::=
as_of_clause
|

opt_with_backup_options ::=
'WITH' backup_options_list
| 'WITH' 'OPTIONS' '(' backup_options_list ')'
Expand Down Expand Up @@ -1192,6 +1192,9 @@ username_or_sconst ::=
non_reserved_word
| 'SCONST'

as_of_clause ::=
'AS' 'OF' 'SYSTEM' 'TIME' a_expr

transaction_mode_list ::=
( transaction_mode ) ( ( opt_comma transaction_mode ) )*

Expand Down Expand Up @@ -1270,9 +1273,6 @@ opt_role_options ::=
opt_with role_options
|

as_of_clause ::=
'AS' 'OF' 'SYSTEM' 'TIME' a_expr

backup_options_list ::=
( backup_options ) ( ( ',' backup_options ) )*

Expand Down
48 changes: 43 additions & 5 deletions pkg/sql/conn_executor_exec.go
Original file line number Diff line number Diff line change
Expand Up @@ -546,25 +546,37 @@ func (ex *connExecutor) execStmtInOpenState(
// don't return any event unless an error happens.

if os.ImplicitTxn.Get() {
asOfTs, err := p.isAsOf(ctx, stmt.AST)
asOfTs, timestampType, err := p.isAsOf(ctx, stmt.AST)
if err != nil {
return makeErrEvent(err)
}
if asOfTs != nil {
p.semaCtx.AsOfTimestamp = asOfTs
p.extendedEvalCtx.SetTxnTimestamp(asOfTs.GoTime())
ex.state.setHistoricalTimestamp(ctx, *asOfTs)
switch timestampType {
case transactionTimestamp:
p.semaCtx.AsOfTimestamp = asOfTs
p.extendedEvalCtx.SetTxnTimestamp(asOfTs.GoTime())
ex.state.setHistoricalTimestamp(ctx, *asOfTs)
case backfillTimestamp:
p.semaCtx.AsOfTimestampForBackfill = asOfTs
}
}
} else {
// If we're in an explicit txn, we allow AOST but only if it matches with
// the transaction's timestamp. This is useful for running AOST statements
// using the InternalExecutor inside an external transaction; one might want
// to do that to force p.avoidCachedDescriptors to be set below.
ts, err := p.isAsOf(ctx, stmt.AST)
ts, timestampType, err := p.isAsOf(ctx, stmt.AST)
if err != nil {
return makeErrEvent(err)
}
if ts != nil {
if timestampType == backfillTimestamp {
// Can't handle this: we don't know how to do a CTAS with a historical
// read timestamp and a present write timestamp.
err = unimplemented.NewWithIssueDetailf(35712, "historical ctas in explicit txn",
"historical CREATE TABLE AS unsupported in explicit transaction")
return makeErrEvent(err)
}
if readTs := ex.state.getReadTimestamp(); *ts != readTs {
err = pgerror.Newf(pgcode.Syntax,
"inconsistent AS OF SYSTEM TIME timestamp; expected: %s", readTs)
Expand Down Expand Up @@ -635,6 +647,8 @@ func (ex *connExecutor) execStmtInOpenState(
p.stmt = &stmt
p.cancelChecker = cancelchecker.NewCancelChecker(ctx)
p.autoCommit = os.ImplicitTxn.Get() && !ex.server.cfg.TestingKnobs.DisableAutoCommit

// Now actually execute the statement!
if err := ex.dispatchToExecutionEngine(ctx, p, res); err != nil {
return nil, nil, err
}
Expand Down Expand Up @@ -775,13 +789,35 @@ func (ex *connExecutor) dispatchToExecutionEngine(
ex.sessionTracing.TracePlanStart(ctx, stmt.AST.StatementTag())
ex.statsCollector.phaseTimes[plannerStartLogicalPlan] = timeutil.Now()

var originalTxn *kv.Txn
if planner.semaCtx.AsOfTimestampForBackfill != nil {
// If we've been tasked with backfilling a schema change operation at a
// particular system time, it's important that we do planning for the
// operation at the timestamp that we're expecting to perform the backfill
// at, in case the schema of the objects that we read have changed in
// between the present transaction timestamp and the user-defined backfill
// timestamp.
//
// Set the planner's transaction to a new historical transaction pinned at
// that timestamp. We'll restore it after planning.
historicalTxn := kv.NewTxnWithSteppingEnabled(ctx, ex.transitionCtx.db, ex.transitionCtx.nodeIDOrZero)
historicalTxn.SetFixedTimestamp(ctx, *planner.semaCtx.AsOfTimestampForBackfill)
originalTxn = planner.txn
planner.txn = historicalTxn
}
// Prepare the plan. Note, the error is processed below. Everything
// between here and there needs to happen even if there's an error.
err := ex.makeExecPlan(ctx, planner)
// We'll be closing the plan manually below after execution; this
// defer is a catch-all in case some other return path is taken.
defer planner.curPlan.close(ctx)

if originalTxn != nil {
// Reset the planner's transaction to the current-timestamp, original
// transaction.
planner.txn = originalTxn
}

if planner.autoCommit {
planner.curPlan.flags.Set(planFlagImplicitTxn)
}
Expand Down Expand Up @@ -868,7 +904,9 @@ func (ex *connExecutor) dispatchToExecutionEngine(
default:
planner.curPlan.flags.Set(planFlagNotDistributed)
}

ex.sessionTracing.TraceExecStart(ctx, "distributed")
// Dispatch the query to the execution engine.
stats, err := ex.execWithDistSQLEngine(
ctx, planner, stmt.AST.StatementType(), res, distributePlan.WillDistribute(), progAtomic,
)
Expand Down
9 changes: 8 additions & 1 deletion pkg/sql/conn_executor_prepare.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ import (
"github.com/cockroachdb/cockroach/pkg/sql/querycache"
"github.com/cockroachdb/cockroach/pkg/sql/sem/tree"
"github.com/cockroachdb/cockroach/pkg/sql/types"
"github.com/cockroachdb/cockroach/pkg/util/errorutil/unimplemented"
"github.com/cockroachdb/cockroach/pkg/util/fsm"
"github.com/cockroachdb/cockroach/pkg/util/log"
"github.com/cockroachdb/cockroach/pkg/util/timeutil"
Expand Down Expand Up @@ -232,11 +233,17 @@ func (ex *connExecutor) populatePrepared(
}
p.extendedEvalCtx.PrepareOnly = true

protoTS, err := p.isAsOf(ctx, stmt.AST)
protoTS, timestampType, err := p.isAsOf(ctx, stmt.AST)
if err != nil {
return 0, err
}
if protoTS != nil {
if timestampType != transactionTimestamp {
// Can't handle this: we don't know how to do a CTAS with a historical
// read timestamp and a present write timestamp.
return 0, unimplemented.NewWithIssueDetailf(35712, "historical prepared backfill",
"historical CREATE TABLE AS unsupported in explicit transaction")
}
p.semaCtx.AsOfTimestamp = protoTS
txn.SetFixedTimestamp(ctx, *protoTS)
}
Expand Down
15 changes: 14 additions & 1 deletion pkg/sql/create_table.go
Original file line number Diff line number Diff line change
Expand Up @@ -284,6 +284,11 @@ func (n *createTableNode) startExec(params runParams) error {
// newTableDescIfAs does it automatically).
asCols = asCols[:len(asCols)-1]
}
// Set creationTime to the AS OF SYSTEM TIME that was stored in our As clause
// if there was one.
if params.p.semaCtx.AsOfTimestampForBackfill != nil {
creationTime = *params.p.semaCtx.AsOfTimestampForBackfill
}

desc, err = newTableDescIfAs(params,
n.n, n.dbDesc.GetID(), schemaID, id, creationTime, asCols, privs, params.p.EvalContext())
Expand Down Expand Up @@ -380,6 +385,14 @@ func (n *createTableNode) startExec(params runParams) error {
// If we are in an explicit txn or the source has placeholders, we execute the
// CTAS query synchronously.
if n.n.As() && !params.p.ExtendedEvalContext().TxnImplicit {
// If we're doing an explicit transaction, we can't do a historical CTAS
// so we should bail out.
if params.p.semaCtx.AsOfTimestampForBackfill != nil {
// We shouldn't get here in normal operation, but we'll check just in
// case.
return errors.AssertionFailedf("CTAS AS OF timestamp set in explicit txn")
}

err = func() error {
// The data fill portion of CREATE AS must operate on a read snapshot,
// so that it doesn't end up observing its own writes.
Expand Down Expand Up @@ -1098,7 +1111,7 @@ func getFinalSourceQuery(source *tree.Select, evalCtx *tree.EvalContext) string
f.Close()

// Substitute placeholders with their values.
ctx := tree.NewFmtCtx(tree.FmtSerializable)
ctx := tree.NewFmtCtx(tree.FmtSerializable | tree.FmtSkipAsOfSystemTimeClauses)
ctx.SetPlaceholderFormat(func(ctx *tree.FmtCtx, placeholder *tree.Placeholder) {
d, err := placeholder.Eval(evalCtx)
if err != nil {
Expand Down
10 changes: 9 additions & 1 deletion pkg/sql/create_view.go
Original file line number Diff line number Diff line change
Expand Up @@ -192,7 +192,15 @@ func (n *createViewNode) startExec(params runParams) error {
// * use AllocateIDs to give the view descriptor a primary key
desc.IsMaterializedView = true
desc.State = descpb.DescriptorState_ADD
desc.CreateAsOfTime = params.p.Txn().ReadTimestamp()

// If we're performing a CREATE MATERIALIZED VIEW ... AS OF SYSTEM TIME,
// set the CreateAsOfTime to be the user-specified timestamp. This will
// cause the backfill to be performed at that timestamp.
if params.p.semaCtx.AsOfTimestampForBackfill != nil {
desc.CreateAsOfTime = *params.p.semaCtx.AsOfTimestampForBackfill
} else {
desc.CreateAsOfTime = params.p.Txn().ReadTimestamp()
}
if err := desc.AllocateIDs(params.ctx); err != nil {
return err
}
Expand Down
45 changes: 38 additions & 7 deletions pkg/sql/exec_util.go
Original file line number Diff line number Diff line change
Expand Up @@ -1124,13 +1124,23 @@ func ParseHLC(s string) (hlc.Timestamp, error) {
return tree.DecimalToHLC(dec)
}

type asOfTimestampType int

const (
transactionTimestamp asOfTimestampType = iota + 1
backfillTimestamp
)

// isAsOf analyzes a statement to bypass the logic in newPlan(), since
// that requires the transaction to be started already. If the returned
// timestamp is not nil, it is the timestamp to which a transaction
// should be set. The statements that will be checked are Select,
// ShowTrace (of a Select statement), Scrub, Export, and CreateStats.
func (p *planner) isAsOf(ctx context.Context, stmt tree.Statement) (*hlc.Timestamp, error) {
func (p *planner) isAsOf(
ctx context.Context, stmt tree.Statement,
) (*hlc.Timestamp, asOfTimestampType, error) {
var asOf tree.AsOfClause
timestampType := transactionTimestamp
switch s := stmt.(type) {
case *tree.Select:
selStmt := s.Select
Expand All @@ -1142,32 +1152,53 @@ func (p *planner) isAsOf(ctx context.Context, stmt tree.Statement) (*hlc.Timesta

sc, ok := selStmt.(*tree.SelectClause)
if !ok {
return nil, nil
return nil, 0, nil
}
if sc.From.AsOf.Expr == nil {
return nil, nil
return nil, 0, nil
}

asOf = sc.From.AsOf
case *tree.Scrub:
if s.AsOf.Expr == nil {
return nil, nil
return nil, 0, nil
}
asOf = s.AsOf
case *tree.Export:
return p.isAsOf(ctx, s.Query)
case *tree.CreateStats:
if s.Options.AsOf.Expr == nil {
return nil, nil
return nil, 0, nil
}
asOf = s.Options.AsOf
case *tree.Explain:
return p.isAsOf(ctx, s.Statement)
case *tree.CreateTable:
if !s.As() {
return nil, 0, nil
}
ts, _, err := p.isAsOf(ctx, s.AsSource)
return ts, backfillTimestamp, err
case *tree.CreateView:
if !s.Materialized {
return nil, 0, nil
}
// N.B.: If the AS OF SYSTEM TIME value here is older than the most recent
// schema change to any of the tables that the view depends on, we should
// reject this update.
ts, _, err := p.isAsOf(ctx, s.AsSource)
return ts, backfillTimestamp, err
case *tree.RefreshMaterializedView:
if s.AsOf.Expr == nil {
return nil, 0, nil
}
asOf = s.AsOf
timestampType = backfillTimestamp
default:
return nil, nil
return nil, 0, nil
}
ts, err := p.EvalAsOfTimestamp(ctx, asOf)
return &ts, err
return &ts, timestampType, err
}

// isSavepoint returns true if stmt is a SAVEPOINT statement.
Expand Down