Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

sql: add support for CREATE OR REPLACE VIEW #47051

Merged
merged 1 commit into from Apr 11, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
2 changes: 2 additions & 0 deletions docs/generated/sql/bnf/create_view_stmt.bnf
@@ -1,5 +1,7 @@
create_view_stmt ::=
'CREATE' opt_temp 'VIEW' view_name '(' name_list ')' 'AS' select_stmt
| 'CREATE' opt_temp 'VIEW' view_name 'AS' select_stmt
| 'CREATE' 'OR' 'REPLACE' opt_temp 'VIEW' view_name '(' name_list ')' 'AS' select_stmt
| 'CREATE' 'OR' 'REPLACE' opt_temp 'VIEW' view_name 'AS' select_stmt
| 'CREATE' opt_temp 'VIEW' 'IF' 'NOT' 'EXISTS' view_name '(' name_list ')' 'AS' select_stmt
| 'CREATE' opt_temp 'VIEW' 'IF' 'NOT' 'EXISTS' view_name 'AS' select_stmt
1 change: 1 addition & 0 deletions docs/generated/sql/bnf/stmt_block.bnf
Expand Up @@ -1059,6 +1059,7 @@ create_table_as_stmt ::=

create_view_stmt ::=
'CREATE' opt_temp 'VIEW' view_name opt_column_list 'AS' select_stmt
| 'CREATE' 'OR' 'REPLACE' opt_temp 'VIEW' view_name opt_column_list 'AS' select_stmt
| 'CREATE' opt_temp 'VIEW' 'IF' 'NOT' 'EXISTS' view_name opt_column_list 'AS' select_stmt

create_sequence_stmt ::=
Expand Down
6 changes: 4 additions & 2 deletions pkg/sql/create_table.go
Expand Up @@ -133,7 +133,8 @@ func isTypeSupportedInVersion(v clusterversion.ClusterVersion, t *types.T) bool
func (n *createTableNode) ReadingOwnWrites() {}

// getTableCreateParams returns the table key needed for the new table,
// as well as the schema id.
// as well as the schema id. It returns valid data in the case that
// the desired object exists.
func getTableCreateParams(
params runParams, dbID sqlbase.ID, isTemporary bool, tableName string,
) (sqlbase.DescriptorKey, sqlbase.ID, error) {
Expand Down Expand Up @@ -176,7 +177,8 @@ func getTableCreateParams(

exists, _, err := sqlbase.LookupObjectID(params.ctx, params.p.txn, dbID, schemaID, tableName)
if err == nil && exists {
return nil, 0, sqlbase.NewRelationAlreadyExistsError(tableName)
// Still return data in this case.
return tKey, schemaID, sqlbase.NewRelationAlreadyExistsError(tableName)
} else if err != nil {
return nil, 0, err
}
Expand Down
263 changes: 222 additions & 41 deletions pkg/sql/create_view.go
Expand Up @@ -15,7 +15,9 @@ import (
"fmt"

"github.com/cockroachdb/cockroach/pkg/server/telemetry"
"github.com/cockroachdb/cockroach/pkg/sql/pgwire/pgcode"
"github.com/cockroachdb/cockroach/pkg/sql/pgwire/pgerror"
"github.com/cockroachdb/cockroach/pkg/sql/privilege"
"github.com/cockroachdb/cockroach/pkg/sql/sem/tree"
"github.com/cockroachdb/cockroach/pkg/sql/sqlbase"
"github.com/cockroachdb/cockroach/pkg/sql/sqltelemetry"
Expand All @@ -30,6 +32,7 @@ type createViewNode struct {
// qualified.
viewQuery string
ifNotExists bool
replace bool
temporary bool
dbDesc *sqlbase.DatabaseDescriptor
columns sqlbase.ResultColumns
Expand Down Expand Up @@ -70,12 +73,36 @@ func (n *createViewNode) startExec(params runParams) error {
backRefMutables[id] = backRefMutable
}

var replacingDesc *sqlbase.MutableTableDescriptor

tKey, schemaID, err := getTableCreateParams(params, n.dbDesc.ID, isTemporary, viewName)
if err != nil {
if sqlbase.IsRelationAlreadyExistsError(err) && n.ifNotExists {
switch {
case !sqlbase.IsRelationAlreadyExistsError(err):
return err
case n.ifNotExists:
return nil
case n.replace:
// If we are replacing an existing view see if what we are
// replacing is actually a view.
id, err := getDescriptorID(params.ctx, params.p.txn, tKey)
if err != nil {
return err
}
desc, err := params.p.Tables().getMutableTableVersionByID(params.ctx, id, params.p.txn)
if err != nil {
return err
}
if err := params.p.CheckPrivilege(params.ctx, desc, privilege.DROP); err != nil {
return err
}
if !desc.IsView() {
return pgerror.Newf(pgcode.WrongObjectType, `%q is not a view`, viewName)
}
replacingDesc = desc
default:
return err
}
return err
}

schemaName := tree.PublicSchemaName
Expand All @@ -84,66 +111,89 @@ func (n *createViewNode) startExec(params runParams) error {
schemaName = tree.Name(params.p.TemporarySchemaName())
}

id, err := GenerateUniqueDescID(params.ctx, params.extendedEvalCtx.ExecCfg.DB)
if err != nil {
return err
}

// Inherit permissions from the database descriptor.
privs := n.dbDesc.GetPrivileges()

desc, err := makeViewTableDesc(
viewName,
n.viewQuery,
n.dbDesc.ID,
schemaID,
id,
n.columns,
params.creationTimeForNewTableDescriptor(),
privs,
&params.p.semaCtx,
params.EvalContext(),
isTemporary,
)
if err != nil {
return err
}
var newDesc *sqlbase.MutableTableDescriptor

// Collect all the tables/views this view depends on.
for backrefID := range n.planDeps {
desc.DependsOn = append(desc.DependsOn, backrefID)
}
// If replacingDesc != nil, we found an existing view while resolving
// the name for our view. So instead of creating a new view, replace
// the existing one.
if replacingDesc != nil {
newDesc, err = params.p.replaceViewDesc(params.ctx, n, replacingDesc, backRefMutables)
if err != nil {
return err
}
} else {
// If we aren't replacing anything, make a new table descriptor.
id, err := GenerateUniqueDescID(params.ctx, params.extendedEvalCtx.ExecCfg.DB)
if err != nil {
return err
}
desc, err := makeViewTableDesc(
viewName,
n.viewQuery,
n.dbDesc.ID,
schemaID,
id,
n.columns,
params.creationTimeForNewTableDescriptor(),
privs,
&params.p.semaCtx,
params.p.EvalContext(),
isTemporary,
)
if err != nil {
return err
}

// TODO (lucy): I think this needs a NodeFormatter implementation. For now,
// do some basic string formatting (not accurate in the general case).
if err = params.p.createDescriptorWithID(
params.ctx, tKey.Key(), id, &desc, params.EvalContext().Settings,
fmt.Sprintf("CREATE VIEW %q AS %q", n.viewName, n.viewQuery),
); err != nil {
return err
// Collect all the tables/views this view depends on.
for backrefID := range n.planDeps {
desc.DependsOn = append(desc.DependsOn, backrefID)
}

// TODO (lucy): I think this needs a NodeFormatter implementation. For now,
// do some basic string formatting (not accurate in the general case).
if err = params.p.createDescriptorWithID(
params.ctx, tKey.Key(), id, &desc, params.EvalContext().Settings,
fmt.Sprintf("CREATE VIEW %q AS %q", n.viewName, n.viewQuery),
); err != nil {
return err
}
newDesc = &desc
}

// Persist the back-references in all referenced table descriptors.
for id, updated := range n.planDeps {
backRefMutable := backRefMutables[id]
// In case that we are replacing a view that already depends on
// this table, remove all existing references so that we don't leave
// any out of date references. Then, add the new references.
backRefMutable.DependedOnBy = removeMatchingReferences(
backRefMutable.DependedOnBy,
newDesc.ID,
)
for _, dep := range updated.deps {
// The logical plan constructor merely registered the dependencies.
// It did not populate the "ID" field of TableDescriptor_Reference,
// because the ID of the newly created view descriptor was not
// yet known.
// We need to do it here.
dep.ID = desc.ID
dep.ID = newDesc.ID
backRefMutable.DependedOnBy = append(backRefMutable.DependedOnBy, dep)
}
// TODO (lucy): Have more consistent/informative names for dependent jobs.
if err := params.p.writeSchemaChange(
params.ctx, backRefMutable, sqlbase.InvalidMutationID, "updating view reference",
params.ctx,
backRefMutable,
sqlbase.InvalidMutationID,
fmt.Sprintf("updating view reference %q", n.viewName),
); err != nil {
return err
}
}

if err := desc.Validate(params.ctx, params.p.txn); err != nil {
if err := newDesc.Validate(params.ctx, params.p.txn); err != nil {
return err
}

Expand All @@ -154,7 +204,7 @@ func (n *createViewNode) startExec(params runParams) error {
params.ctx,
params.p.txn,
EventLogCreateView,
int32(desc.ID),
int32(newDesc.ID),
int32(params.extendedEvalCtx.NodeID),
struct {
ViewName string
Expand Down Expand Up @@ -202,20 +252,151 @@ func makeViewTableDesc(
temporary,
)
desc.ViewQuery = viewQuery
if err := addResultColumns(semaCtx, evalCtx, &desc, resultColumns); err != nil {
return sqlbase.MutableTableDescriptor{}, err
}
return desc, nil
}

// replaceViewDesc modifies and returns the input view descriptor changed
// to hold the new view represented by n. Note that back references from
// tables that the new view depends on still need to be added. This function
// will additionally drop backreferences from tables the old view depended
// on that the new view no longer depends on.
func (p *planner) replaceViewDesc(
ctx context.Context,
n *createViewNode,
toReplace *sqlbase.MutableTableDescriptor,
backRefMutables map[sqlbase.ID]*sqlbase.MutableTableDescriptor,
) (*sqlbase.MutableTableDescriptor, error) {
// Set the query to the new query.
toReplace.ViewQuery = n.viewQuery
// Reset the columns to add the new result columns onto.
toReplace.Columns = make([]sqlbase.ColumnDescriptor, 0, len(n.columns))
toReplace.NextColumnID = 0
if err := addResultColumns(&p.semaCtx, p.EvalContext(), toReplace, n.columns); err != nil {
return nil, err
}

// Compare toReplace against its ClusterVersion to verify if
// its new set of columns is valid for a replacement view.
if err := verifyReplacingViewColumns(
toReplace.ClusterVersion.Columns,
toReplace.Columns,
); err != nil {
return nil, err
}

// Remove the back reference from all tables that the view depended on.
for _, id := range toReplace.DependsOn {
desc, ok := backRefMutables[id]
if !ok {
var err error
desc, err = p.Tables().getMutableTableVersionByID(ctx, id, p.txn)
if err != nil {
return nil, err
}
backRefMutables[id] = desc
}

// If n.planDeps doesn't contain id, then the new view definition doesn't
// reference this table anymore, so we can remove all existing references.
if _, ok := n.planDeps[id]; !ok {
desc.DependedOnBy = removeMatchingReferences(desc.DependedOnBy, toReplace.ID)
if err := p.writeSchemaChange(
ctx,
desc,
sqlbase.InvalidMutationID,
fmt.Sprintf("updating view reference %q", n.viewName),
); err != nil {
return nil, err
}
}
}

// Since the view query has been replaced, the dependencies that this
// table descriptor had are gone.
toReplace.DependsOn = make([]sqlbase.ID, 0, len(n.planDeps))
for backrefID := range n.planDeps {
toReplace.DependsOn = append(toReplace.DependsOn, backrefID)
}

// Since we are replacing an existing view here, we need to write the new
// descriptor into place.
if err := p.writeSchemaChange(ctx, toReplace, sqlbase.InvalidMutationID,
fmt.Sprintf("CREATE OR REPLACE VIEW %q AS %q", n.viewName, n.viewQuery),
); err != nil {
return nil, err
}
return toReplace, nil
}

// addResultColumns adds the resultColumns as actual column
// descriptors onto desc.
func addResultColumns(
semaCtx *tree.SemaContext,
evalCtx *tree.EvalContext,
desc *sqlbase.MutableTableDescriptor,
resultColumns sqlbase.ResultColumns,
) error {
for _, colRes := range resultColumns {
columnTableDef := tree.ColumnTableDef{Name: tree.Name(colRes.Name), Type: colRes.Typ}
// The new types in the CREATE VIEW column specs never use
// SERIAL so we need not process SERIAL types here.
col, _, _, err := sqlbase.MakeColumnDefDescs(&columnTableDef, semaCtx, evalCtx)
if err != nil {
return desc, err
return err
}
desc.AddColumn(col)
}
if err := desc.AllocateIDs(); err != nil {
return sqlbase.MutableTableDescriptor{}, err
return err
}
return desc, nil
return nil
}

// verifyReplacingViewColumns ensures that the new set of view columns must
// have at least the same prefix of columns as the old view. We attempt to
// match the postgres error message in each of the error cases below.
func verifyReplacingViewColumns(oldColumns, newColumns []sqlbase.ColumnDescriptor) error {
if len(newColumns) < len(oldColumns) {
return pgerror.Newf(pgcode.InvalidTableDefinition, "cannot drop columns from view")
}
for i := range oldColumns {
oldCol, newCol := &oldColumns[i], &newColumns[i]
if oldCol.Name != newCol.Name {
return pgerror.Newf(
pgcode.InvalidTableDefinition,
`cannot change name of view column %q to %q`,
oldCol.Name,
newCol.Name,
)
}
if !newCol.Type.Equal(oldCol.Type) {
return pgerror.Newf(
pgcode.InvalidTableDefinition,
`cannot change type of view column %q from %s to %s`,
oldCol.Name,
oldCol.Type.String(),
newCol.Type.String(),
)
}
if newCol.Hidden != oldCol.Hidden {
return pgerror.Newf(
pgcode.InvalidTableDefinition,
`cannot change visibility of view column %q`,
oldCol.Name,
)
}
if newCol.Nullable != oldCol.Nullable {
return pgerror.Newf(
pgcode.InvalidTableDefinition,
`cannot change nullability of view column %q`,
oldCol.Name,
)
}
}
return nil
}

func overrideColumnNames(cols sqlbase.ResultColumns, newNames tree.NameList) sqlbase.ResultColumns {
Expand Down