Skip to content

Commit

Permalink
sql: limit virtual tables to current database
Browse files Browse the repository at this point in the history
In postgres, the contents of tables in the `pg_catalog` and
`information_schema` databases are limited to entries relating to the
current database and system databases. This commit limits our
implementation of these tables in the same way, unless the current user
is root.
  • Loading branch information
jordanlewis committed Dec 6, 2016
1 parent 39b36cb commit a63b8bf
Show file tree
Hide file tree
Showing 7 changed files with 170 additions and 17 deletions.
26 changes: 22 additions & 4 deletions pkg/sql/information_schema.go
Expand Up @@ -27,6 +27,7 @@ import (

const (
informationSchemaName = "information_schema"
pgCatalogName = "pg_catalog"
)

var informationSchema = virtualSchema{
Expand Down Expand Up @@ -523,10 +524,11 @@ func forEachDatabaseDesc(p *planner, fn func(*sqlbase.DatabaseDescriptor) error)
return nil
}

// forEachTableDesc retrieves all table descriptors and iterates through them in
// lexicographical order with respect primarily to database name and secondarily
// to table name. For each table, the function will call fn with its respective
// database and table descriptor.
// forEachTableDesc retrieves all table descriptors from the current database
// and all system databases and iterates through them in lexicographical order
// with respect primarily to database name and secondarily to table name. For
// each table, the function will call fn with its respective database and table
// descriptor.
func forEachTableDesc(
p *planner, fn func(*sqlbase.DatabaseDescriptor, *sqlbase.TableDescriptor) error,
) error {
Expand All @@ -551,6 +553,19 @@ func (fn tableLookupFn) tableOrErr(id sqlbase.ID) (*sqlbase.TableDescriptor, err
return nil, errors.Errorf("could not find referenced table with ID %v", id)
}

// isSystemDatabaseName returns true if the input name is not a user db name.
func isSystemDatabaseName(name string) bool {
switch name {
case informationSchemaName:
return true
case pgCatalogName:
return true
case sqlbase.SystemDB.Name:
return true
}
return false
}

// forEachTableDescWithTableLookup acts like forEachTableDesc, except it also provides a
// tableLookupFn when calling fn to allow callers to lookup fetched table descriptors
// on demand. This is important for callers dealing with objects like foreign keys, where
Expand Down Expand Up @@ -630,6 +645,9 @@ func forEachTableDescWithTableLookup(
}
sort.Strings(dbNames)
for _, dbName := range dbNames {
if !p.isDatabaseVisible(dbName) {
continue
}
db := databases[dbName]
dbTableNames := make([]string, 0, len(db.tables))
for tableName := range db.tables {
Expand Down
1 change: 0 additions & 1 deletion pkg/sql/pg_catalog.go
Expand Up @@ -40,7 +40,6 @@ var (
)

const (
pgCatalogName = "pg_catalog"
cockroachIndexEncoding = "prefix"
)

Expand Down
14 changes: 14 additions & 0 deletions pkg/sql/planner.go
Expand Up @@ -428,3 +428,17 @@ func (p *planner) fillFKTableMap(m tableLookupsByID) error {
}
return nil
}

// isDatabaseVisible returns true if the given database is visible to the
// current user. Only the current database and system databases are available
// to ordinary users; everything is available to root.
func (p *planner) isDatabaseVisible(dbName string) bool {
if p.session.User == security.RootUser {
return true
} else if dbName == p.evalCtx.Database {
return true
} else if isSystemDatabaseName(dbName) {
return true
}
return false
}
24 changes: 21 additions & 3 deletions pkg/sql/show.go
Expand Up @@ -173,6 +173,12 @@ func (p *planner) ShowColumns(n *parser.ShowColumns) (planNode, error) {
}
}
}
// Temporarily set the current database to get visibility into
// information_schema if the current user isn't root.
origDatabase := p.evalCtx.Database
p.evalCtx.Database = tn.Database()
defer func() { p.evalCtx.Database = origDatabase }()

// Get columns of table from information_schema.columns.
rows, err := p.queryRows(getColumns, tn.Database(), tn.Table())
if err != nil {
Expand Down Expand Up @@ -768,16 +774,28 @@ func (p *planner) ShowTables(n *parser.ShowTables) (planNode, error) {
return nil, sqlbase.NewUndefinedDatabaseError(name)
}
}
// Temporarily set the current database to get visibility into
// information_schema if the current user isn't root.
origDatabase := p.evalCtx.Database
p.evalCtx.Database = name
defer func() { p.evalCtx.Database = origDatabase }()

// Get the tables of database from information_schema.tables.
const getTables = `SELECT TABLE_NAME FROM information_schema.tables
WHERE tables.TABLE_SCHEMA=$1 ORDER BY tables.TABLE_NAME`
stmt, err := parser.ParseOneTraditional(getTables)
rows, err := p.queryRows(getTables, name)
if err != nil {
return nil, err
}
golangFillQueryArguments(p.semaCtx.Placeholders, []interface{}{name})
return p.newPlan(stmt, nil, false)

v := p.newContainerValuesNode(columns, 0)
for _, r := range rows {
if _, err := v.rows.AddRow(r); err != nil {
v.rows.Close()
return nil, err
}
}
return v, nil
},
}, nil
}
Expand Down
4 changes: 1 addition & 3 deletions pkg/sql/testdata/explain
Expand Up @@ -133,9 +133,7 @@ query ITT
EXPLAIN SHOW TABLES
----
0 virtual table SHOW TABLES FROM test
1 sort +TABLE_NAME
2 virtual table information_schema.tables
3 values 5 columns, 36 rows
1 values 1 column, 1 row

query ITT
EXPLAIN SHOW DATABASE
Expand Down
33 changes: 27 additions & 6 deletions pkg/sql/testdata/information_schema
Expand Up @@ -500,6 +500,9 @@ GRANT SELECT ON other_db.xyz TO testuser

user testuser

statement ok
SET DATABASE=other_db

query TTTTI colnames
SELECT * FROM information_schema.tables
----
Expand Down Expand Up @@ -540,7 +543,7 @@ DROP DATABASE other_db


## information_schema.table_constraints
## This test case will be moved up into the correct order when #8497 is fixed.
## TODO(nvanbenschoten) Move this test case up into the correct order now that #8497 is fixed.

query TTTTTT colnames
SELECT *
Expand Down Expand Up @@ -631,9 +634,6 @@ def constraint_column fk def constrain
def constraint_column fk def constraint_column t3 b 2 2
def constraint_column primary def constraint_column t3 rowid 1 NULL

#statement ok
#DROP DATABASE constraint_column


## information_schema.schema_privileges
statement ok
Expand Down Expand Up @@ -666,6 +666,8 @@ root def system SELECT NULL
root def test ALL NULL

## information_schema.table_privileges

# root can see everything
query TTTTTTTT colnames
SELECT * FROM information_schema.table_privileges
----
Expand Down Expand Up @@ -697,6 +699,9 @@ NULL root def system zones UPDATE
statement ok
DROP DATABASE constraint_db

statement ok
DROP DATABASE constraint_column

statement ok
CREATE TABLE other_db.xyz (i INT)

Expand Down Expand Up @@ -726,6 +731,24 @@ NULL root def other_db xyz ALL NULL
NULL testuser def other_db xyz SELECT NULL NULL
NULL testuser def other_db xyz UPDATE NULL NULL

# testuser can read permissions as well
user testuser

statement ok
SET DATABASE=other_db

query TTTTTTTT colnames
SELECT * FROM information_schema.table_privileges WHERE TABLE_SCHEMA = 'other_db'
----
GRANTOR GRANTEE TABLE_CATALOG TABLE_SCHEMA TABLE_NAME PRIVILEGE_TYPE IS_GRANTABLE WITH_HIERARCHY
NULL root def other_db abc ALL NULL NULL
NULL testuser def other_db abc SELECT NULL NULL
NULL root def other_db xyz ALL NULL NULL
NULL testuser def other_db xyz SELECT NULL NULL
NULL testuser def other_db xyz UPDATE NULL NULL

user root

## information_schema.statistics
statement ok
CREATE TABLE other_db.teststatis(id INT PRIMARY KEY, c INT, d INT, e STRING, INDEX idx_c(c), UNIQUE INDEX idx_cd(c,d))
Expand Down Expand Up @@ -762,5 +785,3 @@ NULL NULL NULL NULL NU

statement ok
DROP DATABASE other_db


85 changes: 85 additions & 0 deletions pkg/sql/testdata/pg_catalog
Expand Up @@ -907,3 +907,88 @@ WHERE n.nspname = 'public'
----
2737381215 t1 12
2989572986 t3 'FOO'

# Verify that a set database shows tables from that database for a non-root
# user, when that user has permissions.

statement ok
GRANT ALL ON constraint_db.* TO testuser

user testuser

statement ok
SET DATABASE = 'constraint_db'

query I
SELECT COUNT(*) FROM pg_catalog.pg_tables WHERE schemaname='constraint_db';
----
3

# Verify that an unset database shows only system tables in pg_catalog for a
# non-root user.

statement ok
SET DATABASE = ''

query I
SELECT COUNT(*) FROM pg_catalog.pg_tables WHERE schemaname='constraint_db';
----
0

query I
SELECT COUNT(*) FROM pg_catalog.pg_views WHERE schemaname='constraint_db';
----
0

query I
SELECT COUNT(*) FROM pg_catalog.pg_class c
JOIN pg_catalog.pg_namespace n ON c.relnamespace = n.oid
WHERE nspname='public'
----
0

query I
SELECT COUNT(*) FROM pg_catalog.pg_constraint con
JOIN pg_catalog.pg_namespace n ON con.connamespace = n.oid
WHERE n.nspname = 'public'
----
0

query I
SELECT COUNT(*) FROM pg_catalog.pg_depend
----
0

# Verify that an unset database shows everything in pg_catalog for the root
# user.

user root

query I
SELECT COUNT(*) FROM pg_catalog.pg_tables WHERE schemaname='constraint_db';
----
3

query I
SELECT COUNT(*) FROM pg_catalog.pg_views WHERE schemaname='constraint_db';
----
1

query I
SELECT COUNT(*) FROM pg_catalog.pg_class c
JOIN pg_catalog.pg_namespace n ON c.relnamespace = n.oid
WHERE nspname='public'
----
11

query I
SELECT COUNT(*) FROM pg_catalog.pg_constraint con
JOIN pg_catalog.pg_namespace n ON con.connamespace = n.oid
WHERE n.nspname = 'public'
----
8

query I
SELECT COUNT(*) FROM pg_catalog.pg_depend
----
2

0 comments on commit a63b8bf

Please sign in to comment.