Skip to content

Commit

Permalink
Merge #42742
Browse files Browse the repository at this point in the history
42742: sql: delete temporary tables when session exits gracefully r=arulajmani a=arulajmani

Previously, temporary tables and the temporary schema created by a
session were not cleaned up when the session ended. As these objects
are session scoped, this needs to change.

This PR:

-  Addresses the cleanup of temporary objects when the session
exits gracefully.

- Adds a SchemaID field  to table descriptors. If this field is not
set, it is assumed to be 29, which is the schemaID of `public` schema.

Release note (sql change): When a session that has created temporary tables
exits gracefully, the tables and temporary schema are deleted
automatically.

Co-authored-by: Arul Ajmani <arulajmani@gmail.com>
  • Loading branch information
craig[bot] and arulajmani committed Dec 18, 2019
2 parents 2f1e342 + 6cd0e02 commit 2b45791
Show file tree
Hide file tree
Showing 33 changed files with 727 additions and 478 deletions.
2 changes: 1 addition & 1 deletion pkg/ccl/backupccl/targets_test.go
Expand Up @@ -108,7 +108,7 @@ func TestDescriptorsMatchingTargets(t *testing.T) {
{"", `TABLE system.offline`, []string{"system", "foo"}, nil, `table "system.public.offline" does not exist`},
{"system", `TABLE *`, []string{"system", "foo", "bar"}, nil, ``},
}
searchPath := sessiondata.MakeSearchPath([]string{"public", "pg_catalog"}, sessiondata.DefaultTemporarySchemaName)
searchPath := sessiondata.MakeSearchPath([]string{"public", "pg_catalog"})
for i, test := range tests {
t.Run(fmt.Sprintf("%d/%s/%s", i, test.sessionDatabase, test.pattern), func(t *testing.T) {
sql := fmt.Sprintf(`GRANT ALL ON %s TO ignored`, test.pattern)
Expand Down
1 change: 1 addition & 0 deletions pkg/ccl/importccl/import_table_creation.go
Expand Up @@ -152,6 +152,7 @@ func MakeSimpleTableDescriptor(
st,
create,
parentID,
keys.PublicSchemaID,
tableID,
hlc.Timestamp{WallTime: walltime},
sqlbase.NewDefaultPrivilegeDescriptor(),
Expand Down
4 changes: 2 additions & 2 deletions pkg/ccl/importccl/load.go
Expand Up @@ -166,8 +166,8 @@ func Load(
var txn *client.Txn
// At this point the CREATE statements in the loaded SQL do not
// use the SERIAL type so we need not process SERIAL types here.
desc, err := sql.MakeTableDesc(ctx, txn, nil /* vt */, st, s, dbDesc.ID, 0, /* table ID */
ts, privs, affected, nil, evalCtx, false /* temporary */)
desc, err := sql.MakeTableDesc(ctx, txn, nil /* vt */, st, s, dbDesc.ID, keys.PublicSchemaID,
0 /* table ID */, ts, privs, affected, nil, evalCtx, false /* temporary */)
if err != nil {
return backupccl.BackupDescriptor{}, errors.Wrap(err, "make table desc")
}
Expand Down
12 changes: 5 additions & 7 deletions pkg/sql/alter_table.go
Expand Up @@ -324,7 +324,7 @@ func (n *alterTableNode) startExec(params runParams) error {

// If the dropped column uses a sequence, remove references to it from that sequence.
if len(col.UsesSequenceIds) > 0 {
if err := removeSequenceDependencies(n.tableDesc, col, params); err != nil {
if err := params.p.removeSequenceDependencies(params.ctx, n.tableDesc, col); err != nil {
return err
}
}
Expand All @@ -334,11 +334,9 @@ func (n *alterTableNode) startExec(params runParams) error {
if err := params.p.canRemoveAllColumnOwnedSequences(params.ctx, n.tableDesc, col, t.DropBehavior); err != nil {
return err
}
// If the dropped column owns a sequence, drop the sequence as well.
if len(col.OwnsSequenceIds) > 0 {
if err := dropSequencesOwnedByCol(col, params); err != nil {
return err
}

if err := params.p.dropSequencesOwnedByCol(params.ctx, col); err != nil {
return err
}

// You can't drop a column depended on by a view unless CASCADE was
Expand Down Expand Up @@ -819,7 +817,7 @@ func applyColumnMutation(

case *tree.AlterTableSetDefault:
if len(col.UsesSequenceIds) > 0 {
if err := removeSequenceDependencies(tableDesc, col, params); err != nil {
if err := params.p.removeSequenceDependencies(params.ctx, tableDesc, col); err != nil {
return err
}
}
Expand Down
3 changes: 2 additions & 1 deletion pkg/sql/backfill.go
Expand Up @@ -1312,7 +1312,8 @@ func runSchemaChangesInTxn(
// Reclaim all the old names. Leave the data and descriptor
// cleanup for later.
for _, drain := range tableDesc.DrainingNames {
err := sqlbase.RemovePublicTableNamespaceEntry(ctx, planner.Txn(), drain.ParentID, drain.Name)
err := sqlbase.RemoveObjectNamespaceEntry(ctx, planner.Txn(), drain.ParentID,
drain.ParentSchemaID, drain.Name, false /* KVTrace */)
if err != nil {
return err
}
Expand Down
29 changes: 26 additions & 3 deletions pkg/sql/conn_executor.go
Expand Up @@ -49,6 +49,7 @@ import (
"github.com/cockroachdb/cockroach/pkg/util/tracing"
"github.com/cockroachdb/cockroach/pkg/util/uuid"
"github.com/cockroachdb/errors"
"github.com/cockroachdb/logtags"
"golang.org/x/net/trace"
)

Expand Down Expand Up @@ -539,8 +540,9 @@ func (s *Server) newConnExecutor(

// ctxHolder will be reset at the start of run(). We only define
// it here so that an early call to close() doesn't panic.
ctxHolder: ctxHolder{connCtx: ctx},
executorType: executorTypeExec,
ctxHolder: ctxHolder{connCtx: ctx},
executorType: executorTypeExec,
hasCreatedTemporarySchema: false,
}

ex.state.txnAbortCount = ex.metrics.EngineMetrics.TxnAbortCount
Expand All @@ -549,6 +551,9 @@ func (s *Server) newConnExecutor(
sdMutator.setCurTxnReadOnly = func(val bool) {
ex.state.readOnly = val
}
sdMutator.onTempSchemaCreation = func() {
ex.hasCreatedTemporarySchema = true
}
sdMutator.RegisterOnSessionDataChange("application_name", func(newName string) {
ex.appStats = ex.server.sqlStats.getStatsForApplication(newName)
ex.applicationName.Store(newName)
Expand Down Expand Up @@ -737,12 +742,26 @@ func (ex *connExecutor) closeWrapper(ctx context.Context, recovered interface{})
// panic or safeErr. I'm propagating safeErr to be on the safe side.
panic(safeErr)
}
ex.close(ctx, normalClose)
// Closing is not cancelable.
closeCtx := logtags.WithTags(context.Background(), logtags.FromContext(ctx))
ex.close(closeCtx, normalClose)
}

func (ex *connExecutor) close(ctx context.Context, closeType closeType) {
ex.sessionEventf(ctx, "finishing connExecutor")

if ex.hasCreatedTemporarySchema {
err := cleanupSessionTempObjects(ctx, ex.server, ex.sessionID)
if err != nil {
log.Errorf(
ctx,
"error deleting temporary objects at session close, "+
"the temp tables deletion job will retry periodically: %s",
err,
)
}
}

ev := noEvent
if _, noTxn := ex.machine.CurState().(stateNoTxn); !noTxn {
ev = txnAborted
Expand Down Expand Up @@ -965,6 +984,10 @@ type connExecutor struct {
// executorType is set to whether this executor is an ordinary executor which
// responds to user queries or an internal one.
executorType executorType

// hasCreatedTemporarySchema is set if the executor has created a
// temporary schema, which requires special cleanup on close.
hasCreatedTemporarySchema bool
}

// ctxHolder contains a connection's context and, while session tracing is
Expand Down
3 changes: 2 additions & 1 deletion pkg/sql/create_sequence.go
Expand Up @@ -144,7 +144,8 @@ func MakeSequenceTableDesc(
privileges *sqlbase.PrivilegeDescriptor,
params *runParams,
) (sqlbase.MutableTableDescriptor, error) {
desc := InitTableDescriptor(id, parentID, sequenceName, creationTime, privileges, false /* temporary */)
desc := InitTableDescriptor(id, parentID, keys.PublicSchemaID, sequenceName, creationTime,
privileges, false /* temporary */)

// Mimic a table with one column, "value".
desc.Columns = []sqlbase.ColumnDescriptor{
Expand Down
47 changes: 28 additions & 19 deletions pkg/sql/create_table.go
Expand Up @@ -63,6 +63,7 @@ type createTableRun struct {

func (n *createTableNode) startExec(params runParams) error {
isTemporary := n.n.Temporary
schemaID := sqlbase.ID(keys.PublicSchemaID)

tKey := sqlbase.MakePublicTableNameKey(params.ctx,
params.ExecCfg().Settings, n.dbDesc.ID, n.n.Table.Table())
Expand All @@ -77,12 +78,12 @@ func (n *createTableNode) startExec(params runParams) error {

tempSchemaName := params.p.TemporarySchemaName()
sKey := sqlbase.NewSchemaKey(n.dbDesc.ID, tempSchemaName)
schemaID, err := getDescriptorID(params.ctx, params.p.txn, sKey)
var err error
schemaID, err = getDescriptorID(params.ctx, params.p.txn, sKey)
if err != nil {
return err
} else if schemaID == sqlbase.InvalidID {
// The temporary schema has not been created yet.
// TODO(arul): Add a job that does deletion for this session(temp schema)
if schemaID, err = createTempSchema(params, sKey); err != nil {
return err
}
Expand Down Expand Up @@ -139,7 +140,7 @@ func (n *createTableNode) startExec(params runParams) error {
}

desc, err = makeTableDescIfAs(params,
n.n, n.dbDesc.ID, id, creationTime, asCols, privs, params.p.EvalContext(), isTemporary)
n.n, n.dbDesc.ID, schemaID, id, creationTime, asCols, privs, params.p.EvalContext(), isTemporary)
if err != nil {
return err
}
Expand All @@ -151,7 +152,7 @@ func (n *createTableNode) startExec(params runParams) error {
}
} else {
affected = make(map[sqlbase.ID]*sqlbase.MutableTableDescriptor)
desc, err = makeTableDesc(params, n.n, n.dbDesc.ID, id, creationTime, privs, affected, isTemporary)
desc, err = makeTableDesc(params, n.n, n.dbDesc.ID, schemaID, id, creationTime, privs, affected, isTemporary)
if err != nil {
return err
}
Expand Down Expand Up @@ -851,22 +852,23 @@ var CreatePartitioningCCL = func(

// InitTableDescriptor returns a blank TableDescriptor.
func InitTableDescriptor(
id, parentID sqlbase.ID,
id, parentID, parentSchemaID sqlbase.ID,
name string,
creationTime hlc.Timestamp,
privileges *sqlbase.PrivilegeDescriptor,
temporary bool,
) sqlbase.MutableTableDescriptor {
return *sqlbase.NewMutableCreatedTableDescriptor(sqlbase.TableDescriptor{
ID: id,
Name: name,
ParentID: parentID,
FormatVersion: sqlbase.InterleavedFormatVersion,
Version: 1,
ModificationTime: creationTime,
Privileges: privileges,
CreateAsOfTime: creationTime,
Temporary: temporary,
ID: id,
Name: name,
ParentID: parentID,
UnexposedParentSchemaID: parentSchemaID,
FormatVersion: sqlbase.InterleavedFormatVersion,
Version: 1,
ModificationTime: creationTime,
Privileges: privileges,
CreateAsOfTime: creationTime,
Temporary: temporary,
})
}

Expand Down Expand Up @@ -914,7 +916,7 @@ func getFinalSourceQuery(source *tree.Select, evalCtx *tree.EvalContext) string
func makeTableDescIfAs(
params runParams,
p *tree.CreateTable,
parentID, id sqlbase.ID,
parentID, parentSchemaID, id sqlbase.ID,
creationTime hlc.Timestamp,
resultColumns []sqlbase.ResultColumn,
privileges *sqlbase.PrivilegeDescriptor,
Expand Down Expand Up @@ -952,7 +954,7 @@ func makeTableDescIfAs(
desc, err = makeTableDesc(
params,
p,
parentID, id,
parentID, parentSchemaID, id,
creationTime,
privileges,
nil, /* affected */
Expand Down Expand Up @@ -995,6 +997,10 @@ func dequalifyColumnRefs(
// to other tables (e.g. foreign keys or interleaving). This is useful at
// bootstrap when creating descriptors for virtual tables.
//
// parentID refers to the databaseID under which the descriptor is being
// created,and parentSchemaID refers to the schemaID of the schema under which
// the descriptor is being created.
//
// evalCtx can be nil if the table to be created has no default expression for
// any of the columns and no partitioning expression.
//
Expand All @@ -1016,7 +1022,7 @@ func MakeTableDesc(
vt SchemaResolver,
st *cluster.Settings,
n *tree.CreateTable,
parentID, id sqlbase.ID,
parentID, parentSchemaID, id sqlbase.ID,
creationTime hlc.Timestamp,
privileges *sqlbase.PrivilegeDescriptor,
affected map[sqlbase.ID]*sqlbase.MutableTableDescriptor,
Expand All @@ -1028,7 +1034,9 @@ func MakeTableDesc(
// been populated.
columnDefaultExprs := make([]tree.TypedExpr, len(n.Defs))

desc := InitTableDescriptor(id, parentID, n.Table.Table(), creationTime, privileges, temporary)
desc := InitTableDescriptor(
id, parentID, parentSchemaID, n.Table.Table(), creationTime, privileges, temporary,
)

// If all nodes in the cluster know how to handle secondary indexes with column families,
// write the new version into new index descriptors.
Expand Down Expand Up @@ -1299,7 +1307,7 @@ func MakeTableDesc(
func makeTableDesc(
params runParams,
n *tree.CreateTable,
parentID, id sqlbase.ID,
parentID, parentSchemaID, id sqlbase.ID,
creationTime hlc.Timestamp,
privileges *sqlbase.PrivilegeDescriptor,
affected map[sqlbase.ID]*sqlbase.MutableTableDescriptor,
Expand Down Expand Up @@ -1347,6 +1355,7 @@ func makeTableDesc(
params.p.ExecCfg().Settings,
n,
parentID,
parentSchemaID,
id,
creationTime,
privileges,
Expand Down
5 changes: 4 additions & 1 deletion pkg/sql/create_view.go
Expand Up @@ -13,6 +13,7 @@ package sql
import (
"context"

"github.com/cockroachdb/cockroach/pkg/keys"
"github.com/cockroachdb/cockroach/pkg/sql/sem/tree"
"github.com/cockroachdb/cockroach/pkg/sql/sqlbase"
"github.com/cockroachdb/cockroach/pkg/util/errorutil/unimplemented"
Expand Down Expand Up @@ -156,7 +157,9 @@ func makeViewTableDesc(
privileges *sqlbase.PrivilegeDescriptor,
semaCtx *tree.SemaContext,
) (sqlbase.MutableTableDescriptor, error) {
desc := InitTableDescriptor(id, parentID, viewName, creationTime, privileges, false /* temporary */)
desc := InitTableDescriptor(
id, parentID, keys.PublicSchemaID, viewName, creationTime, privileges, false, /* temporary */
)
desc.ViewQuery = viewQuery
for _, colRes := range resultColumns {
columnTableDef := tree.ColumnTableDef{Name: tree.Name(colRes.Name), Type: colRes.Typ}
Expand Down
2 changes: 1 addition & 1 deletion pkg/sql/distsql/server.go
Expand Up @@ -268,7 +268,7 @@ func (ds *ServerImpl) setupFlow(
ApplicationName: req.EvalContext.ApplicationName,
Database: req.EvalContext.Database,
User: req.EvalContext.User,
SearchPath: sessiondata.MakeSearchPath(req.EvalContext.SearchPath, req.EvalContext.TemporarySchemaName),
SearchPath: sessiondata.MakeSearchPath(req.EvalContext.SearchPath).WithTemporarySchemaName(req.EvalContext.TemporarySchemaName),
SequenceState: sessiondata.NewSequenceState(),
DataConversion: sessiondata.DataConversionConfig{
Location: location,
Expand Down
23 changes: 7 additions & 16 deletions pkg/sql/drop_database.go
Expand Up @@ -62,7 +62,9 @@ func (p *planner) DropDatabase(ctx context.Context, n *tree.DropDatabase) (planN
return nil, err
}

tbNames, err := GetObjectNames(ctx, p.txn, p, dbDesc, tree.PublicSchema, true /*explicitPrefix*/)
tbNames, err := GetObjectNames(
ctx, p.txn, p, dbDesc, tree.PublicSchema, true, /*explicitPrefix*/
)
if err != nil {
return nil, err
}
Expand Down Expand Up @@ -139,22 +141,11 @@ func (n *dropDatabaseNode) startExec(params runParams) error {
}

for _, toDel := range n.td {
tbDesc := toDel.desc
if tbDesc.IsView() {
cascadedViews, err := p.dropViewImpl(ctx, tbDesc, tree.DropCascade)
if err != nil {
return err
}
// TODO(knz): dependent dropped views should be qualified here.
tbNameStrings = append(tbNameStrings, cascadedViews...)
} else {
cascadedViews, err := p.dropTableImpl(params, tbDesc)
if err != nil {
return err
}
// TODO(knz): dependent dropped table names should be qualified here.
tbNameStrings = append(tbNameStrings, cascadedViews...)
cascadedObjects, err := p.dropObject(ctx, toDel.desc, tree.DropCascade)
if err != nil {
return err
}
tbNameStrings = append(tbNameStrings, cascadedObjects...)
tbNameStrings = append(tbNameStrings, toDel.tn.FQString())
}

Expand Down

0 comments on commit 2b45791

Please sign in to comment.