diff --git a/doc/src/sgml/ref/create_view.sgml b/doc/src/sgml/ref/create_view.sgml index 57bfae3084964..a13f1cbde35de 100644 --- a/doc/src/sgml/ref/create_view.sgml +++ b/doc/src/sgml/ref/create_view.sgml @@ -323,12 +323,6 @@ CREATE VIEW vista AS SELECT text 'Hello World' AS hello; or set-returning functions. - - - - The view must not have the security_barrier property. - - @@ -361,6 +355,19 @@ CREATE VIEW vista AS SELECT text 'Hello World' AS hello; such rows that are not visible through the view. + + If an automatically updatable view is marked with the + security_barrier property then all the view's WHERE + conditions (and any conditions using operators which are marked as LEAKPROOF) + will always be evaluated before any conditions that a user of the view has + added. See for full details. Note that, + due to this, rows which are not ultimately returned (because they do not + pass the user's WHERE conditions) may still end up being locked. + EXPLAIN can be used to see which conditions are + applied at the relation level (and therefore do not lock rows) and which are + not. + + A more complex view that does not satisfy all these conditions is read-only by default: the system will not allow an insert, update, or diff --git a/src/backend/commands/tablecmds.c b/src/backend/commands/tablecmds.c index 8f9e5e56079b9..f5ae98ff80e99 100644 --- a/src/backend/commands/tablecmds.c +++ b/src/backend/commands/tablecmds.c @@ -8910,7 +8910,6 @@ ATExecSetRelOptions(Relation rel, List *defList, AlterTableType operation, List *view_options = untransformRelOptions(newOptions); ListCell *cell; bool check_option = false; - bool security_barrier = false; foreach(cell, view_options) { @@ -8918,8 +8917,6 @@ ATExecSetRelOptions(Relation rel, List *defList, AlterTableType operation, if (pg_strcasecmp(defel->defname, "check_option") == 0) check_option = true; - if (pg_strcasecmp(defel->defname, "security_barrier") == 0) - security_barrier = defGetBoolean(defel); } /* @@ -8929,8 +8926,7 @@ ATExecSetRelOptions(Relation rel, List *defList, AlterTableType operation, if (check_option) { const char *view_updatable_error = - view_query_is_auto_updatable(view_query, - security_barrier, true); + view_query_is_auto_updatable(view_query, true); if (view_updatable_error) ereport(ERROR, diff --git a/src/backend/commands/view.c b/src/backend/commands/view.c index 173576217a733..bc085666fbdd3 100644 --- a/src/backend/commands/view.c +++ b/src/backend/commands/view.c @@ -396,7 +396,6 @@ DefineView(ViewStmt *stmt, const char *queryString) RangeVar *view; ListCell *cell; bool check_option; - bool security_barrier; /* * Run parse analysis to convert the raw parse tree to a Query. Note this @@ -451,7 +450,6 @@ DefineView(ViewStmt *stmt, const char *queryString) * specified. */ check_option = false; - security_barrier = false; foreach(cell, stmt->options) { @@ -459,8 +457,6 @@ DefineView(ViewStmt *stmt, const char *queryString) if (pg_strcasecmp(defel->defname, "check_option") == 0) check_option = true; - if (pg_strcasecmp(defel->defname, "security_barrier") == 0) - security_barrier = defGetBoolean(defel); } /* @@ -470,7 +466,7 @@ DefineView(ViewStmt *stmt, const char *queryString) if (check_option) { const char *view_updatable_error = - view_query_is_auto_updatable(viewParse, security_barrier, true); + view_query_is_auto_updatable(viewParse, true); if (view_updatable_error) ereport(ERROR, diff --git a/src/backend/nodes/copyfuncs.c b/src/backend/nodes/copyfuncs.c index c89d8083c67a5..98ad91078ed4b 100644 --- a/src/backend/nodes/copyfuncs.c +++ b/src/backend/nodes/copyfuncs.c @@ -1998,6 +1998,7 @@ _copyRangeTblEntry(const RangeTblEntry *from) COPY_SCALAR_FIELD(checkAsUser); COPY_BITMAPSET_FIELD(selectedCols); COPY_BITMAPSET_FIELD(modifiedCols); + COPY_NODE_FIELD(securityQuals); return newnode; } diff --git a/src/backend/nodes/equalfuncs.c b/src/backend/nodes/equalfuncs.c index 9793cf52a8c2b..9901d231cdb80 100644 --- a/src/backend/nodes/equalfuncs.c +++ b/src/backend/nodes/equalfuncs.c @@ -2296,6 +2296,7 @@ _equalRangeTblEntry(const RangeTblEntry *a, const RangeTblEntry *b) COMPARE_SCALAR_FIELD(checkAsUser); COMPARE_BITMAPSET_FIELD(selectedCols); COMPARE_BITMAPSET_FIELD(modifiedCols); + COMPARE_NODE_FIELD(securityQuals); return true; } diff --git a/src/backend/nodes/nodeFuncs.c b/src/backend/nodes/nodeFuncs.c index 123f2a6a3c7cb..1e48a7f8890e9 100644 --- a/src/backend/nodes/nodeFuncs.c +++ b/src/backend/nodes/nodeFuncs.c @@ -2020,6 +2020,9 @@ range_table_walker(List *rtable, return true; break; } + + if (walker(rte->securityQuals, context)) + return true; } return false; } @@ -2755,6 +2758,7 @@ range_table_mutator(List *rtable, MUTATE(newrte->values_lists, rte->values_lists, List *); break; } + MUTATE(newrte->securityQuals, rte->securityQuals, List *); newrt = lappend(newrt, newrte); } return newrt; diff --git a/src/backend/nodes/outfuncs.c b/src/backend/nodes/outfuncs.c index bfb4b9fb03d16..10e81391b1361 100644 --- a/src/backend/nodes/outfuncs.c +++ b/src/backend/nodes/outfuncs.c @@ -2409,6 +2409,7 @@ _outRangeTblEntry(StringInfo str, const RangeTblEntry *node) WRITE_OID_FIELD(checkAsUser); WRITE_BITMAPSET_FIELD(selectedCols); WRITE_BITMAPSET_FIELD(modifiedCols); + WRITE_NODE_FIELD(securityQuals); } static void diff --git a/src/backend/nodes/readfuncs.c b/src/backend/nodes/readfuncs.c index 216d75e8d1a45..ef1eae91bf710 100644 --- a/src/backend/nodes/readfuncs.c +++ b/src/backend/nodes/readfuncs.c @@ -1252,6 +1252,7 @@ _readRangeTblEntry(void) READ_OID_FIELD(checkAsUser); READ_BITMAPSET_FIELD(selectedCols); READ_BITMAPSET_FIELD(modifiedCols); + READ_NODE_FIELD(securityQuals); READ_DONE(); } diff --git a/src/backend/optimizer/plan/planner.c b/src/backend/optimizer/plan/planner.c index 35bda67b1f1c3..0508d16902be6 100644 --- a/src/backend/optimizer/plan/planner.c +++ b/src/backend/optimizer/plan/planner.c @@ -915,6 +915,12 @@ inheritance_planner(PlannerInfo *root) /* Generate plan */ subplan = grouping_planner(&subroot, 0.0 /* retrieve all tuples */ ); + /* + * Planning may have modified the query result relation (if there + * were security barrier quals on the result RTE). + */ + appinfo->child_relid = subroot.parse->resultRelation; + /* * If this child rel was excluded by constraint exclusion, exclude it * from the result plan. @@ -932,9 +938,40 @@ inheritance_planner(PlannerInfo *root) if (final_rtable == NIL) final_rtable = subroot.parse->rtable; else - final_rtable = list_concat(final_rtable, + { + List *tmp_rtable = NIL; + ListCell *cell1, *cell2; + + /* + * Check to see if any of the original RTEs were turned into + * subqueries during planning. Currently, this should only ever + * happen due to securityQuals being involved which push a + * relation down under a subquery, to ensure that the security + * barrier quals are evaluated first. + * + * When this happens, we want to use the new subqueries in the + * final rtable. + */ + forboth(cell1, final_rtable, cell2, subroot.parse->rtable) + { + RangeTblEntry *rte1 = (RangeTblEntry *) lfirst(cell1); + RangeTblEntry *rte2 = (RangeTblEntry *) lfirst(cell2); + + if (rte1->rtekind == RTE_RELATION && + rte2->rtekind == RTE_SUBQUERY) + { + /* Should only be when there are securityQuals today */ + Assert(rte1->securityQuals != NIL); + tmp_rtable = lappend(tmp_rtable, rte2); + } + else + tmp_rtable = lappend(tmp_rtable, rte1); + } + + final_rtable = list_concat(tmp_rtable, list_copy_tail(subroot.parse->rtable, list_length(final_rtable))); + } /* * We need to collect all the RelOptInfos from all child plans into @@ -1162,6 +1199,12 @@ grouping_planner(PlannerInfo *root, double tuple_fraction) /* Preprocess targetlist */ tlist = preprocess_targetlist(root, tlist); + /* + * Expand any rangetable entries that have security barrier quals. + * This may add new security barrier subquery RTEs to the rangetable. + */ + expand_security_quals(root, tlist); + /* * Locate any window functions in the tlist. (We don't need to look * anywhere else, since expressions used in ORDER BY will be in there diff --git a/src/backend/optimizer/prep/Makefile b/src/backend/optimizer/prep/Makefile index 86301bfbd32a7..5195d9b0ba7aa 100644 --- a/src/backend/optimizer/prep/Makefile +++ b/src/backend/optimizer/prep/Makefile @@ -12,6 +12,6 @@ subdir = src/backend/optimizer/prep top_builddir = ../../../.. include $(top_builddir)/src/Makefile.global -OBJS = prepjointree.o prepqual.o preptlist.o prepunion.o +OBJS = prepjointree.o prepqual.o prepsecurity.o preptlist.o prepunion.o include $(top_srcdir)/src/backend/common.mk diff --git a/src/backend/optimizer/prep/prepsecurity.c b/src/backend/optimizer/prep/prepsecurity.c new file mode 100644 index 0000000000000..7daaa3349ed50 --- /dev/null +++ b/src/backend/optimizer/prep/prepsecurity.c @@ -0,0 +1,466 @@ +/*------------------------------------------------------------------------- + * + * prepsecurity.c + * Routines for preprocessing security barrier quals. + * + * Portions Copyright (c) 1996-2013, PostgreSQL Global Development Group + * Portions Copyright (c) 1994, Regents of the University of California + * + * + * IDENTIFICATION + * src/backend/optimizer/prep/prepsecurity.c + * + *------------------------------------------------------------------------- + */ +#include "postgres.h" + +#include "access/heapam.h" +#include "access/sysattr.h" +#include "catalog/heap.h" +#include "nodes/makefuncs.h" +#include "nodes/nodeFuncs.h" +#include "optimizer/prep.h" +#include "parser/analyze.h" +#include "parser/parsetree.h" +#include "rewrite/rewriteManip.h" +#include "utils/rel.h" + + +typedef struct +{ + int rt_index; /* Index of security barrier RTE */ + int sublevels_up; /* Current nesting depth */ + Relation rel; /* RTE relation at rt_index */ + List *targetlist; /* Targetlist for new subquery RTE */ + List *colnames; /* Column names in subquery RTE */ + List *vars_processed; /* List of Vars already processed */ +} security_barrier_replace_vars_context; + +static void expand_security_qual(PlannerInfo *root, List *tlist, int rt_index, + RangeTblEntry *rte, Node *qual); + +static void security_barrier_replace_vars(Node *node, + security_barrier_replace_vars_context *context); + +static bool security_barrier_replace_vars_walker(Node *node, + security_barrier_replace_vars_context *context); + + +/* + * expand_security_quals - + * expands any security barrier quals on RTEs in the query rtable, turning + * them into security barrier subqueries. + * + * Any given RTE may have multiple security barrier quals in a list, from which + * we create a set of nested subqueries to isolate each security barrier from + * the others, providing protection against malicious user-defined security + * barriers. The first security barrier qual in the list will be used in the + * innermost subquery. + */ +void +expand_security_quals(PlannerInfo *root, List *tlist) +{ + Query *parse = root->parse; + int rt_index; + ListCell *cell; + + /* + * Process each RTE in the rtable list. + * + * We only ever modify entries in place and append to the rtable, so it is + * safe to use a foreach loop here. + */ + rt_index = 0; + foreach(cell, parse->rtable) + { + RangeTblEntry *rte = (RangeTblEntry *) lfirst(cell); + + rt_index++; + + if (rte->securityQuals == NIL) + continue; + + /* + * Ignore any RTEs that aren't used in the query (such RTEs may be + * present for permissions checks). + */ + if (rt_index != parse->resultRelation && + !rangeTableEntry_used((Node *) parse, rt_index, 0)) + continue; + + /* + * If this RTE is the target then we need to make a copy of it before + * expanding it. The unexpanded copy will become the new target, and + * the original RTE will be expanded to become the source of rows to + * update/delete. + */ + if (rt_index == parse->resultRelation) + { + RangeTblEntry *newrte = copyObject(rte); + parse->rtable = lappend(parse->rtable, newrte); + parse->resultRelation = list_length(parse->rtable); + + /* + * Wipe out any copied security barrier quals on the new target to + * prevent infinite recursion. + */ + newrte->securityQuals = NIL; + + /* + * There's no need to do permissions checks twice, so wipe out the + * permissions info for the original RTE (we prefer to keep the + * bits set on the result RTE). + */ + rte->requiredPerms = 0; + rte->checkAsUser = InvalidOid; + rte->selectedCols = NULL; + rte->modifiedCols = NULL; + + /* + * For the most part, Vars referencing the original relation should + * remain as they are, meaning that they pull OLD values from the + * expanded RTE. But in the RETURNING list and in any WITH CHECK + * OPTION quals, we want such Vars to represent NEW values, so + * change them to reference the new RTE. + */ + ChangeVarNodes((Node *) parse->returningList, rt_index, + parse->resultRelation, 0); + + ChangeVarNodes((Node *) parse->withCheckOptions, rt_index, + parse->resultRelation, 0); + } + + /* + * Process each security barrier qual in turn, starting with the + * innermost one (the first in the list) and working outwards. + * + * We remove each qual from the list before processing it, so that its + * variables aren't modified by expand_security_qual. Also we don't + * necessarily want the attributes referred to by the qual to be + * exposed by the newly built subquery. + */ + while (rte->securityQuals != NIL) + { + Node *qual = (Node *) linitial(rte->securityQuals); + rte->securityQuals = list_delete_first(rte->securityQuals); + + ChangeVarNodes(qual, rt_index, 1, 0); + expand_security_qual(root, tlist, rt_index, rte, qual); + } + } +} + + +/* + * expand_security_qual - + * expand the specified security barrier qual on a query RTE, turning the + * RTE into a security barrier subquery. + */ +static void +expand_security_qual(PlannerInfo *root, List *tlist, int rt_index, + RangeTblEntry *rte, Node *qual) +{ + Query *parse = root->parse; + Oid relid = rte->relid; + Query *subquery; + RangeTblEntry *subrte; + RangeTblRef *subrtr; + PlanRowMark *rc; + security_barrier_replace_vars_context context; + ListCell *cell; + + /* + * There should only be 2 possible cases: + * + * 1. A relation RTE, which we turn into a subquery RTE containing all + * referenced columns. + * + * 2. A subquery RTE (either from a prior call to this function or from an + * expanded view). In this case we build a new subquery on top of it to + * isolate this security barrier qual from any other quals. + */ + switch (rte->rtekind) + { + case RTE_RELATION: + /* + * Turn the relation RTE into a security barrier subquery RTE, + * moving all permissions checks down into the subquery. + */ + subquery = makeNode(Query); + subquery->commandType = CMD_SELECT; + subquery->querySource = QSRC_INSTEAD_RULE; + + subrte = copyObject(rte); + subrte->inFromCl = true; + subrte->securityQuals = NIL; + subquery->rtable = list_make1(subrte); + + subrtr = makeNode(RangeTblRef); + subrtr->rtindex = 1; + subquery->jointree = makeFromExpr(list_make1(subrtr), qual); + subquery->hasSubLinks = checkExprHasSubLink(qual); + + rte->rtekind = RTE_SUBQUERY; + rte->relid = InvalidOid; + rte->subquery = subquery; + rte->security_barrier = true; + rte->inh = false; /* must not be set for a subquery */ + + /* the permissions checks have now been moved down */ + rte->requiredPerms = 0; + rte->checkAsUser = InvalidOid; + rte->selectedCols = NULL; + rte->modifiedCols = NULL; + + /* + * Now deal with any PlanRowMark on this RTE by requesting a lock + * of the same strength on the RTE copied down to the subquery. + * + * Note that we can't push the user-defined quals down since they + * may included untrusted functions and that means that we will + * end up locking all rows which pass the securityQuals, even if + * those rows don't pass the user-defined quals. This is currently + * documented behavior, but it'd be nice to come up with a better + * solution some day. + */ + rc = get_plan_rowmark(root->rowMarks, rt_index); + if (rc != NULL) + { + switch (rc->markType) + { + case ROW_MARK_EXCLUSIVE: + applyLockingClause(subquery, 1, LCS_FORUPDATE, + rc->noWait, false); + break; + case ROW_MARK_NOKEYEXCLUSIVE: + applyLockingClause(subquery, 1, LCS_FORNOKEYUPDATE, + rc->noWait, false); + break; + case ROW_MARK_SHARE: + applyLockingClause(subquery, 1, LCS_FORSHARE, + rc->noWait, false); + break; + case ROW_MARK_KEYSHARE: + applyLockingClause(subquery, 1, LCS_FORKEYSHARE, + rc->noWait, false); + break; + case ROW_MARK_REFERENCE: + case ROW_MARK_COPY: + /* No locking needed */ + break; + } + root->rowMarks = list_delete(root->rowMarks, rc); + } + + /* + * Replace any variables in the outer query that refer to the + * original relation RTE with references to columns that we will + * expose in the new subquery, building the subquery's targetlist + * as we go. + */ + context.rt_index = rt_index; + context.sublevels_up = 0; + context.rel = heap_open(relid, NoLock); + context.targetlist = NIL; + context.colnames = NIL; + context.vars_processed = NIL; + + security_barrier_replace_vars((Node *) parse, &context); + security_barrier_replace_vars((Node *) tlist, &context); + + heap_close(context.rel, NoLock); + + /* Now we know what columns the subquery needs to expose */ + rte->subquery->targetList = context.targetlist; + rte->eref = makeAlias(rte->eref->aliasname, context.colnames); + + break; + + case RTE_SUBQUERY: + /* + * Build a new subquery that includes all the same columns as the + * original subquery. + */ + subquery = makeNode(Query); + subquery->commandType = CMD_SELECT; + subquery->querySource = QSRC_INSTEAD_RULE; + subquery->targetList = NIL; + + foreach(cell, rte->subquery->targetList) + { + TargetEntry *tle; + Var *var; + + tle = (TargetEntry *) lfirst(cell); + var = makeVarFromTargetEntry(1, tle); + + tle = makeTargetEntry((Expr *) var, + list_length(subquery->targetList) + 1, + pstrdup(tle->resname), + tle->resjunk); + subquery->targetList = lappend(subquery->targetList, tle); + } + + subrte = makeNode(RangeTblEntry); + subrte->rtekind = RTE_SUBQUERY; + subrte->subquery = rte->subquery; + subrte->security_barrier = rte->security_barrier; + subrte->eref = copyObject(rte->eref); + subrte->inFromCl = true; + subquery->rtable = list_make1(subrte); + + subrtr = makeNode(RangeTblRef); + subrtr->rtindex = 1; + subquery->jointree = makeFromExpr(list_make1(subrtr), qual); + subquery->hasSubLinks = checkExprHasSubLink(qual); + + rte->subquery = subquery; + rte->security_barrier = true; + + break; + + default: + elog(ERROR, "invalid range table entry for security barrier qual"); + } +} + + +/* + * security_barrier_replace_vars - + * Apply security barrier variable replacement to an expression tree. + * + * This also builds/updates a targetlist with entries for each replacement + * variable that needs to be exposed by the security barrier subquery RTE. + * + * NOTE: although this has the form of a walker, we cheat and modify the + * nodes in-place. The given expression tree should have been copied + * earlier to ensure that no unwanted side-effects occur! + */ +static void +security_barrier_replace_vars(Node *node, + security_barrier_replace_vars_context *context) +{ + /* + * Must be prepared to start with a Query or a bare expression tree; if + * it's a Query, go straight to query_tree_walker to make sure that + * sublevels_up doesn't get incremented prematurely. + */ + if (node && IsA(node, Query)) + query_tree_walker((Query *) node, + security_barrier_replace_vars_walker, + (void *) context, 0); + else + security_barrier_replace_vars_walker(node, context); +} + +static bool +security_barrier_replace_vars_walker(Node *node, + security_barrier_replace_vars_context *context) +{ + if (node == NULL) + return false; + + if (IsA(node, Var)) + { + Var *var = (Var *) node; + + /* + * Note that the same Var may be present in different lists, so we + * need to take care not to process it multiple times. + */ + if (var->varno == context->rt_index && + var->varlevelsup == context->sublevels_up && + !list_member_ptr(context->vars_processed, var)) + { + /* + * Found a matching variable. Make sure that it is in the subquery + * targetlist and map its attno accordingly. + */ + AttrNumber attno; + ListCell *l; + TargetEntry *tle; + char *attname; + Var *newvar; + + /* Search for the base attribute in the subquery targetlist */ + attno = InvalidAttrNumber; + foreach(l, context->targetlist) + { + tle = (TargetEntry *) lfirst(l); + attno++; + + Assert(IsA(tle->expr, Var)); + if (((Var *) tle->expr)->varattno == var->varattno && + ((Var *) tle->expr)->varcollid == var->varcollid) + { + /* Map the variable onto this subquery targetlist entry */ + var->varattno = attno; + return false; + } + } + + /* Not in the subquery targetlist, so add it. Get its name. */ + if (var->varattno < 0) + { + Form_pg_attribute att_tup; + + att_tup = SystemAttributeDefinition(var->varattno, + context->rel->rd_rel->relhasoids); + attname = NameStr(att_tup->attname); + } + else if (var->varattno == InvalidAttrNumber) + { + attname = "wholerow"; + } + else if (var->varattno <= context->rel->rd_att->natts) + { + Form_pg_attribute att_tup; + + att_tup = context->rel->rd_att->attrs[var->varattno - 1]; + attname = NameStr(att_tup->attname); + } + else + { + elog(ERROR, "invalid attribute number %d in security_barrier_replace_vars", var->varattno); + } + + /* New variable for subquery targetlist */ + newvar = copyObject(var); + newvar->varno = 1; + + attno = list_length(context->targetlist) + 1; + tle = makeTargetEntry((Expr *) newvar, + attno, + pstrdup(attname), + false); + + context->targetlist = lappend(context->targetlist, tle); + + context->colnames = lappend(context->colnames, + makeString(pstrdup(attname))); + + /* Update the outer query's variable */ + var->varattno = attno; + + /* Remember this Var so that we don't process it again */ + context->vars_processed = lappend(context->vars_processed, var); + } + return false; + } + + if (IsA(node, Query)) + { + /* Recurse into subselects */ + bool result; + + context->sublevels_up++; + result = query_tree_walker((Query *) node, + security_barrier_replace_vars_walker, + (void *) context, 0); + context->sublevels_up--; + return result; + } + + return expression_tree_walker(node, security_barrier_replace_vars_walker, + (void *) context); +} diff --git a/src/backend/optimizer/prep/prepunion.c b/src/backend/optimizer/prep/prepunion.c index 52dcc720de7cc..cdf541d34d5f5 100644 --- a/src/backend/optimizer/prep/prepunion.c +++ b/src/backend/optimizer/prep/prepunion.c @@ -55,6 +55,7 @@ typedef struct { PlannerInfo *root; AppendRelInfo *appinfo; + int sublevels_up; } adjust_appendrel_attrs_context; static Plan *recurse_set_operations(Node *setOp, PlannerInfo *root, @@ -1580,8 +1581,9 @@ translate_col_privs(const Bitmapset *parent_privs, * child rel instead. We also update rtindexes appearing outside Vars, * such as resultRelation and jointree relids. * - * Note: this is only applied after conversion of sublinks to subplans, - * so we don't need to cope with recursion into sub-queries. + * Note: this is applied after conversion of sublinks to subplans in the + * query jointree, but there may still be sublinks in the security barrier + * quals of RTEs, so we do need to cope with recursion into sub-queries. * * Note: this is not hugely different from what pullup_replace_vars() does; * maybe we should try to fold the two routines together. @@ -1594,9 +1596,12 @@ adjust_appendrel_attrs(PlannerInfo *root, Node *node, AppendRelInfo *appinfo) context.root = root; context.appinfo = appinfo; + context.sublevels_up = 0; /* - * Must be prepared to start with a Query or a bare expression tree. + * Must be prepared to start with a Query or a bare expression tree; if + * it's a Query, go straight to query_tree_walker to make sure that + * sublevels_up doesn't get incremented prematurely. */ if (node && IsA(node, Query)) { @@ -1635,7 +1640,7 @@ adjust_appendrel_attrs_mutator(Node *node, { Var *var = (Var *) copyObject(node); - if (var->varlevelsup == 0 && + if (var->varlevelsup == context->sublevels_up && var->varno == appinfo->parent_relid) { var->varno = appinfo->child_relid; @@ -1652,6 +1657,7 @@ adjust_appendrel_attrs_mutator(Node *node, if (newnode == NULL) elog(ERROR, "attribute %d of relation \"%s\" does not exist", var->varattno, get_rel_name(appinfo->parent_reloid)); + ((Var *) newnode)->varlevelsup += context->sublevels_up; return newnode; } else if (var->varattno == 0) @@ -1694,10 +1700,16 @@ adjust_appendrel_attrs_mutator(Node *node, RowExpr *rowexpr; List *fields; RangeTblEntry *rte; + ListCell *lc; rte = rt_fetch(appinfo->parent_relid, context->root->parse->rtable); fields = (List *) copyObject(appinfo->translated_vars); + foreach(lc, fields) + { + Var *field = (Var *) lfirst(lc); + field->varlevelsup += context->sublevels_up; + } rowexpr = makeNode(RowExpr); rowexpr->args = fields; rowexpr->row_typeid = var->vartype; @@ -1716,7 +1728,8 @@ adjust_appendrel_attrs_mutator(Node *node, { CurrentOfExpr *cexpr = (CurrentOfExpr *) copyObject(node); - if (cexpr->cvarno == appinfo->parent_relid) + if (context->sublevels_up == 0 && + cexpr->cvarno == appinfo->parent_relid) cexpr->cvarno = appinfo->child_relid; return (Node *) cexpr; } @@ -1724,7 +1737,8 @@ adjust_appendrel_attrs_mutator(Node *node, { RangeTblRef *rtr = (RangeTblRef *) copyObject(node); - if (rtr->rtindex == appinfo->parent_relid) + if (context->sublevels_up == 0 && + rtr->rtindex == appinfo->parent_relid) rtr->rtindex = appinfo->child_relid; return (Node *) rtr; } @@ -1737,7 +1751,8 @@ adjust_appendrel_attrs_mutator(Node *node, adjust_appendrel_attrs_mutator, (void *) context); /* now fix JoinExpr's rtindex (probably never happens) */ - if (j->rtindex == appinfo->parent_relid) + if (context->sublevels_up == 0 && + j->rtindex == appinfo->parent_relid) j->rtindex = appinfo->child_relid; return (Node *) j; } @@ -1750,7 +1765,7 @@ adjust_appendrel_attrs_mutator(Node *node, adjust_appendrel_attrs_mutator, (void *) context); /* now fix PlaceHolderVar's relid sets */ - if (phv->phlevelsup == 0) + if (phv->phlevelsup == context->sublevels_up) phv->phrels = adjust_relid_set(phv->phrels, appinfo->parent_relid, appinfo->child_relid); @@ -1822,12 +1837,29 @@ adjust_appendrel_attrs_mutator(Node *node, return (Node *) newinfo; } - /* - * NOTE: we do not need to recurse into sublinks, because they should - * already have been converted to subplans before we see them. - */ - Assert(!IsA(node, SubLink)); - Assert(!IsA(node, Query)); + if (IsA(node, Query)) + { + /* + * Recurse into sublink subqueries. This should only be possible in + * security barrier quals of top-level RTEs. All other sublinks should + * have already been converted to subplans during expression + * preprocessing, but this doesn't happen for security barrier quals, + * since they are destined to become quals of a subquery RTE, which + * will be recursively planned, and so should not be preprocessed at + * this stage. + * + * We don't explicitly Assert() for securityQuals here simply because + * it's not trivial to do so. + */ + Query *newnode; + + context->sublevels_up++; + newnode = query_tree_mutator((Query *) node, + adjust_appendrel_attrs_mutator, + (void *) context, 0); + context->sublevels_up--; + return (Node *) newnode; + } return expression_tree_mutator(node, adjust_appendrel_attrs_mutator, (void *) context); diff --git a/src/backend/rewrite/rewriteHandler.c b/src/backend/rewrite/rewriteHandler.c index 5dbcce3e550b0..caed8caee6b22 100644 --- a/src/backend/rewrite/rewriteHandler.c +++ b/src/backend/rewrite/rewriteHandler.c @@ -2023,8 +2023,7 @@ view_col_is_auto_updatable(RangeTblRef *rtr, TargetEntry *tle) * updatable. */ const char * -view_query_is_auto_updatable(Query *viewquery, bool security_barrier, - bool check_cols) +view_query_is_auto_updatable(Query *viewquery, bool check_cols) { RangeTblRef *rtr; RangeTblEntry *base_rte; @@ -2097,14 +2096,6 @@ view_query_is_auto_updatable(Query *viewquery, bool security_barrier, if (expression_returns_set((Node *) viewquery->targetList)) return gettext_noop("Views that return set-returning functions are not automatically updatable."); - /* - * For now, we also don't support security-barrier views, because of the - * difficulty of keeping upper-level qual expressions away from - * lower-level data. This might get relaxed in the future. - */ - if (security_barrier) - return gettext_noop("Security-barrier views are not automatically updatable."); - /* * The view query should select from a single base relation, which must be * a table or another view. @@ -2353,9 +2344,7 @@ relation_is_updatable(Oid reloid, { Query *viewquery = get_view_query(rel); - if (view_query_is_auto_updatable(viewquery, - RelationIsSecurityView(rel), - false) == NULL) + if (view_query_is_auto_updatable(viewquery, false) == NULL) { Bitmapset *updatable_cols; int auto_events; @@ -2510,7 +2499,6 @@ rewriteTargetView(Query *parsetree, Relation view) auto_update_detail = view_query_is_auto_updatable(viewquery, - RelationIsSecurityView(view), parsetree->commandType != CMD_DELETE); if (auto_update_detail) @@ -2713,6 +2701,14 @@ rewriteTargetView(Query *parsetree, Relation view) new_rte->modifiedCols = adjust_view_column_set(view_rte->modifiedCols, view_targetlist); + /* + * Move any security barrier quals from the view RTE onto the new target + * RTE. Any such quals should now apply to the new target RTE and will not + * reference the original view RTE in the rewritten query. + */ + new_rte->securityQuals = view_rte->securityQuals; + view_rte->securityQuals = NIL; + /* * For UPDATE/DELETE, rewriteTargetListUD will have added a wholerow junk * TLE for the view to the end of the targetlist, which we no longer need. @@ -2793,6 +2789,10 @@ rewriteTargetView(Query *parsetree, Relation view) * only adjust their varnos to reference the new target (just the same as * we did with the view targetlist). * + * Note that there is special-case handling for the quals of a security + * barrier view, since they need to be kept separate from any user-supplied + * quals, so these quals are kept on the new target RTE. + * * For INSERT, the view's quals can be ignored in the main query. */ if (parsetree->commandType != CMD_INSERT && @@ -2801,7 +2801,25 @@ rewriteTargetView(Query *parsetree, Relation view) Node *viewqual = (Node *) copyObject(viewquery->jointree->quals); ChangeVarNodes(viewqual, base_rt_index, new_rt_index, 0); - AddQual(parsetree, (Node *) viewqual); + + if (RelationIsSecurityView(view)) + { + /* + * Note: the parsetree has been mutated, so the new_rte pointer is + * stale and needs to be re-computed. + */ + new_rte = rt_fetch(new_rt_index, parsetree->rtable); + new_rte->securityQuals = lcons(viewqual, new_rte->securityQuals); + + /* + * Make sure that the query is marked correctly if the added qual + * has sublinks. + */ + if (!parsetree->hasSubLinks) + parsetree->hasSubLinks = checkExprHasSubLink(viewqual); + } + else + AddQual(parsetree, (Node *) viewqual); } /* @@ -2863,9 +2881,8 @@ rewriteTargetView(Query *parsetree, Relation view) * Make sure that the query is marked correctly if the added * qual has sublinks. We can skip this check if the query is * already marked, or if the command is an UPDATE, in which - * case the same qual will have already been added to the - * query's WHERE clause, and AddQual will have already done - * this check. + * case the same qual will have already been added, and this + * check will already have been done. */ if (!parsetree->hasSubLinks && parsetree->commandType != CMD_UPDATE) diff --git a/src/include/nodes/parsenodes.h b/src/include/nodes/parsenodes.h index b5011af3e7fae..18d499100889b 100644 --- a/src/include/nodes/parsenodes.h +++ b/src/include/nodes/parsenodes.h @@ -801,6 +801,7 @@ typedef struct RangeTblEntry Oid checkAsUser; /* if valid, check access as this role */ Bitmapset *selectedCols; /* columns needing SELECT permission */ Bitmapset *modifiedCols; /* columns needing INSERT/UPDATE permission */ + List *securityQuals; /* any security barrier quals to apply */ } RangeTblEntry; /* diff --git a/src/include/optimizer/prep.h b/src/include/optimizer/prep.h index 0f5a7d36ec1f4..f5fc7e8e71351 100644 --- a/src/include/optimizer/prep.h +++ b/src/include/optimizer/prep.h @@ -35,6 +35,11 @@ extern Relids get_relids_for_join(PlannerInfo *root, int joinrelid); extern Node *negate_clause(Node *node); extern Expr *canonicalize_qual(Expr *qual); +/* + * prototypes for prepsecurity.c + */ +extern void expand_security_quals(PlannerInfo *root, List *tlist); + /* * prototypes for preptlist.c */ diff --git a/src/include/rewrite/rewriteHandler.h b/src/include/rewrite/rewriteHandler.h index 22551ece3ec7c..1b5121314d349 100644 --- a/src/include/rewrite/rewriteHandler.h +++ b/src/include/rewrite/rewriteHandler.h @@ -25,7 +25,6 @@ extern void AcquireRewriteLocks(Query *parsetree, extern Node *build_column_default(Relation rel, int attrno); extern Query *get_view_query(Relation view); extern const char *view_query_is_auto_updatable(Query *viewquery, - bool security_barrier, bool check_cols); extern int relation_is_updatable(Oid reloid, bool include_triggers, diff --git a/src/test/regress/expected/create_view.out b/src/test/regress/expected/create_view.out index 91d163961060f..f6db582afda50 100644 --- a/src/test/regress/expected/create_view.out +++ b/src/test/regress/expected/create_view.out @@ -252,7 +252,7 @@ CREATE VIEW mysecview4 WITH (security_barrier) AS SELECT * FROM tbl1 WHERE a <> 0; CREATE VIEW mysecview5 WITH (security_barrier=100) -- Error AS SELECT * FROM tbl1 WHERE a > 100; -ERROR: security_barrier requires a Boolean value +ERROR: invalid value for boolean option "security_barrier": 100 CREATE VIEW mysecview6 WITH (invalid_option) -- Error AS SELECT * FROM tbl1 WHERE a < 100; ERROR: unrecognized parameter "invalid_option" diff --git a/src/test/regress/expected/updatable_views.out b/src/test/regress/expected/updatable_views.out index 99c9165a95fbd..83a33772cd6d7 100644 --- a/src/test/regress/expected/updatable_views.out +++ b/src/test/regress/expected/updatable_views.out @@ -22,12 +22,10 @@ CREATE VIEW rw_view14 AS SELECT ctid, a, b FROM base_tbl; -- System columns may CREATE VIEW rw_view15 AS SELECT a, upper(b) FROM base_tbl; -- Expression/function may be part of an updatable view CREATE VIEW rw_view16 AS SELECT a, b, a AS aa FROM base_tbl; -- Repeated column may be part of an updatable view CREATE VIEW ro_view17 AS SELECT * FROM ro_view1; -- Base relation not updatable -CREATE VIEW ro_view18 WITH (security_barrier = true) - AS SELECT * FROM base_tbl; -- Security barrier views not updatable -CREATE VIEW ro_view19 AS SELECT * FROM (VALUES(1)) AS tmp(a); -- VALUES in rangetable +CREATE VIEW ro_view18 AS SELECT * FROM (VALUES(1)) AS tmp(a); -- VALUES in rangetable CREATE SEQUENCE seq; -CREATE VIEW ro_view20 AS SELECT * FROM seq; -- View based on a sequence -CREATE VIEW ro_view21 AS SELECT a, b, generate_series(1, a) g FROM base_tbl; -- SRF in targetlist not supported +CREATE VIEW ro_view19 AS SELECT * FROM seq; -- View based on a sequence +CREATE VIEW ro_view20 AS SELECT a, b, generate_series(1, a) g FROM base_tbl; -- SRF in targetlist not supported SELECT table_name, is_insertable_into FROM information_schema.tables WHERE table_name LIKE E'r_\\_view%' @@ -44,7 +42,6 @@ SELECT table_name, is_insertable_into ro_view19 | NO ro_view2 | NO ro_view20 | NO - ro_view21 | NO ro_view3 | NO ro_view4 | NO ro_view5 | NO @@ -55,7 +52,7 @@ SELECT table_name, is_insertable_into rw_view14 | YES rw_view15 | YES rw_view16 | YES -(21 rows) +(20 rows) SELECT table_name, is_updatable, is_insertable_into FROM information_schema.views @@ -73,7 +70,6 @@ SELECT table_name, is_updatable, is_insertable_into ro_view19 | NO | NO ro_view2 | NO | NO ro_view20 | NO | NO - ro_view21 | NO | NO ro_view3 | NO | NO ro_view4 | NO | NO ro_view5 | NO | NO @@ -84,7 +80,7 @@ SELECT table_name, is_updatable, is_insertable_into rw_view14 | YES | YES rw_view15 | YES | YES rw_view16 | YES | YES -(21 rows) +(20 rows) SELECT table_name, column_name, is_updatable FROM information_schema.columns @@ -103,23 +99,21 @@ SELECT table_name, column_name, is_updatable ro_view17 | a | NO ro_view17 | b | NO ro_view18 | a | NO - ro_view18 | b | NO - ro_view19 | a | NO + ro_view19 | sequence_name | NO + ro_view19 | last_value | NO + ro_view19 | start_value | NO + ro_view19 | increment_by | NO + ro_view19 | max_value | NO + ro_view19 | min_value | NO + ro_view19 | cache_value | NO + ro_view19 | log_cnt | NO + ro_view19 | is_cycled | NO + ro_view19 | is_called | NO ro_view2 | a | NO ro_view2 | b | NO - ro_view20 | sequence_name | NO - ro_view20 | last_value | NO - ro_view20 | start_value | NO - ro_view20 | increment_by | NO - ro_view20 | max_value | NO - ro_view20 | min_value | NO - ro_view20 | cache_value | NO - ro_view20 | log_cnt | NO - ro_view20 | is_cycled | NO - ro_view20 | is_called | NO - ro_view21 | a | NO - ro_view21 | b | NO - ro_view21 | g | NO + ro_view20 | a | NO + ro_view20 | b | NO + ro_view20 | g | NO ro_view3 | ?column? | NO ro_view4 | count | NO ro_view5 | a | NO @@ -140,7 +134,7 @@ SELECT table_name, column_name, is_updatable rw_view16 | a | YES rw_view16 | b | YES rw_view16 | aa | YES -(48 rows) +(46 rows) -- Read-only views DELETE FROM ro_view1; @@ -268,24 +262,20 @@ INSERT INTO ro_view17 VALUES (3, 'ROW 3'); ERROR: cannot insert into view "ro_view1" DETAIL: Views containing DISTINCT are not automatically updatable. HINT: To enable inserting into the view, provide an INSTEAD OF INSERT trigger or an unconditional ON INSERT DO INSTEAD rule. -INSERT INTO ro_view18 VALUES (3, 'ROW 3'); -ERROR: cannot insert into view "ro_view18" -DETAIL: Security-barrier views are not automatically updatable. -HINT: To enable inserting into the view, provide an INSTEAD OF INSERT trigger or an unconditional ON INSERT DO INSTEAD rule. -DELETE FROM ro_view19; -ERROR: cannot delete from view "ro_view19" +DELETE FROM ro_view18; +ERROR: cannot delete from view "ro_view18" DETAIL: Views that do not select from a single table or view are not automatically updatable. HINT: To enable deleting from the view, provide an INSTEAD OF DELETE trigger or an unconditional ON DELETE DO INSTEAD rule. -UPDATE ro_view20 SET max_value=1000; -ERROR: cannot update view "ro_view20" +UPDATE ro_view19 SET max_value=1000; +ERROR: cannot update view "ro_view19" DETAIL: Views that do not select from a single table or view are not automatically updatable. HINT: To enable updating the view, provide an INSTEAD OF UPDATE trigger or an unconditional ON UPDATE DO INSTEAD rule. -UPDATE ro_view21 SET b=upper(b); -ERROR: cannot update view "ro_view21" +UPDATE ro_view20 SET b=upper(b); +ERROR: cannot update view "ro_view20" DETAIL: Views that return set-returning functions are not automatically updatable. HINT: To enable updating the view, provide an INSTEAD OF UPDATE trigger or an unconditional ON UPDATE DO INSTEAD rule. DROP TABLE base_tbl CASCADE; -NOTICE: drop cascades to 17 other objects +NOTICE: drop cascades to 16 other objects DETAIL: drop cascades to view ro_view1 drop cascades to view ro_view17 drop cascades to view ro_view2 @@ -299,13 +289,12 @@ drop cascades to view ro_view11 drop cascades to view ro_view13 drop cascades to view rw_view15 drop cascades to view rw_view16 -drop cascades to view ro_view18 -drop cascades to view ro_view21 +drop cascades to view ro_view20 drop cascades to view ro_view4 drop cascades to view rw_view14 -DROP VIEW ro_view10, ro_view12, ro_view19; +DROP VIEW ro_view10, ro_view12, ro_view18; DROP SEQUENCE seq CASCADE; -NOTICE: drop cascades to view ro_view20 +NOTICE: drop cascades to view ro_view19 -- simple updatable view CREATE TABLE base_tbl (a int PRIMARY KEY, b text DEFAULT 'Unspecified'); INSERT INTO base_tbl SELECT i, 'Row ' || i FROM generate_series(-2, 2) g(i); @@ -1740,3 +1729,554 @@ DROP TABLE base_tbl CASCADE; NOTICE: drop cascades to 2 other objects DETAIL: drop cascades to view rw_view1 drop cascades to view rw_view2 +-- security barrier view +CREATE TABLE base_tbl (person text, visibility text); +INSERT INTO base_tbl VALUES ('Tom', 'public'), + ('Dick', 'private'), + ('Harry', 'public'); +CREATE VIEW rw_view1 AS + SELECT person FROM base_tbl WHERE visibility = 'public'; +CREATE FUNCTION snoop(anyelement) +RETURNS boolean AS +$$ +BEGIN + RAISE NOTICE 'snooped value: %', $1; + RETURN true; +END; +$$ +LANGUAGE plpgsql COST 0.000001; +CREATE OR REPLACE FUNCTION leakproof(anyelement) +RETURNS boolean AS +$$ +BEGIN + RETURN true; +END; +$$ +LANGUAGE plpgsql STRICT IMMUTABLE LEAKPROOF; +SELECT * FROM rw_view1 WHERE snoop(person); +NOTICE: snooped value: Tom +NOTICE: snooped value: Dick +NOTICE: snooped value: Harry + person +-------- + Tom + Harry +(2 rows) + +UPDATE rw_view1 SET person=person WHERE snoop(person); +NOTICE: snooped value: Tom +NOTICE: snooped value: Dick +NOTICE: snooped value: Harry +DELETE FROM rw_view1 WHERE NOT snoop(person); +NOTICE: snooped value: Dick +NOTICE: snooped value: Tom +NOTICE: snooped value: Harry +ALTER VIEW rw_view1 SET (security_barrier = true); +SELECT table_name, is_insertable_into + FROM information_schema.tables + WHERE table_name = 'rw_view1'; + table_name | is_insertable_into +------------+-------------------- + rw_view1 | YES +(1 row) + +SELECT table_name, is_updatable, is_insertable_into + FROM information_schema.views + WHERE table_name = 'rw_view1'; + table_name | is_updatable | is_insertable_into +------------+--------------+-------------------- + rw_view1 | YES | YES +(1 row) + +SELECT table_name, column_name, is_updatable + FROM information_schema.columns + WHERE table_name = 'rw_view1' + ORDER BY ordinal_position; + table_name | column_name | is_updatable +------------+-------------+-------------- + rw_view1 | person | YES +(1 row) + +SELECT * FROM rw_view1 WHERE snoop(person); +NOTICE: snooped value: Tom +NOTICE: snooped value: Harry + person +-------- + Tom + Harry +(2 rows) + +UPDATE rw_view1 SET person=person WHERE snoop(person); +NOTICE: snooped value: Tom +NOTICE: snooped value: Harry +DELETE FROM rw_view1 WHERE NOT snoop(person); +NOTICE: snooped value: Tom +NOTICE: snooped value: Harry +EXPLAIN (costs off) SELECT * FROM rw_view1 WHERE snoop(person); + QUERY PLAN +----------------------------------------------- + Subquery Scan on rw_view1 + Filter: snoop(rw_view1.person) + -> Seq Scan on base_tbl + Filter: (visibility = 'public'::text) +(4 rows) + +EXPLAIN (costs off) UPDATE rw_view1 SET person=person WHERE snoop(person); + QUERY PLAN +----------------------------------------------------- + Update on base_tbl base_tbl_1 + -> Subquery Scan on base_tbl + Filter: snoop(base_tbl.person) + -> Seq Scan on base_tbl base_tbl_2 + Filter: (visibility = 'public'::text) +(5 rows) + +EXPLAIN (costs off) DELETE FROM rw_view1 WHERE NOT snoop(person); + QUERY PLAN +----------------------------------------------------- + Delete on base_tbl base_tbl_1 + -> Subquery Scan on base_tbl + Filter: (NOT snoop(base_tbl.person)) + -> Seq Scan on base_tbl base_tbl_2 + Filter: (visibility = 'public'::text) +(5 rows) + +-- security barrier view on top of security barrier view +CREATE VIEW rw_view2 WITH (security_barrier = true) AS + SELECT * FROM rw_view1 WHERE snoop(person); +SELECT table_name, is_insertable_into + FROM information_schema.tables + WHERE table_name = 'rw_view2'; + table_name | is_insertable_into +------------+-------------------- + rw_view2 | YES +(1 row) + +SELECT table_name, is_updatable, is_insertable_into + FROM information_schema.views + WHERE table_name = 'rw_view2'; + table_name | is_updatable | is_insertable_into +------------+--------------+-------------------- + rw_view2 | YES | YES +(1 row) + +SELECT table_name, column_name, is_updatable + FROM information_schema.columns + WHERE table_name = 'rw_view2' + ORDER BY ordinal_position; + table_name | column_name | is_updatable +------------+-------------+-------------- + rw_view2 | person | YES +(1 row) + +SELECT * FROM rw_view2 WHERE snoop(person); +NOTICE: snooped value: Tom +NOTICE: snooped value: Tom +NOTICE: snooped value: Harry +NOTICE: snooped value: Harry + person +-------- + Tom + Harry +(2 rows) + +UPDATE rw_view2 SET person=person WHERE snoop(person); +NOTICE: snooped value: Tom +NOTICE: snooped value: Tom +NOTICE: snooped value: Harry +NOTICE: snooped value: Harry +DELETE FROM rw_view2 WHERE NOT snoop(person); +NOTICE: snooped value: Tom +NOTICE: snooped value: Tom +NOTICE: snooped value: Harry +NOTICE: snooped value: Harry +EXPLAIN (costs off) SELECT * FROM rw_view2 WHERE snoop(person); + QUERY PLAN +----------------------------------------------------- + Subquery Scan on rw_view2 + Filter: snoop(rw_view2.person) + -> Subquery Scan on rw_view1 + Filter: snoop(rw_view1.person) + -> Seq Scan on base_tbl + Filter: (visibility = 'public'::text) +(6 rows) + +EXPLAIN (costs off) UPDATE rw_view2 SET person=person WHERE snoop(person); + QUERY PLAN +----------------------------------------------------------- + Update on base_tbl base_tbl_1 + -> Subquery Scan on base_tbl + Filter: snoop(base_tbl.person) + -> Subquery Scan on base_tbl_2 + Filter: snoop(base_tbl_2.person) + -> Seq Scan on base_tbl base_tbl_3 + Filter: (visibility = 'public'::text) +(7 rows) + +EXPLAIN (costs off) DELETE FROM rw_view2 WHERE NOT snoop(person); + QUERY PLAN +----------------------------------------------------------- + Delete on base_tbl base_tbl_1 + -> Subquery Scan on base_tbl + Filter: (NOT snoop(base_tbl.person)) + -> Subquery Scan on base_tbl_2 + Filter: snoop(base_tbl_2.person) + -> Seq Scan on base_tbl base_tbl_3 + Filter: (visibility = 'public'::text) +(7 rows) + +DROP TABLE base_tbl CASCADE; +NOTICE: drop cascades to 2 other objects +DETAIL: drop cascades to view rw_view1 +drop cascades to view rw_view2 +-- security barrier view on top of table with rules +CREATE TABLE base_tbl(id int PRIMARY KEY, data text, deleted boolean); +INSERT INTO base_tbl VALUES (1, 'Row 1', false), (2, 'Row 2', true); +CREATE RULE base_tbl_ins_rule AS ON INSERT TO base_tbl + WHERE EXISTS (SELECT 1 FROM base_tbl t WHERE t.id = new.id) + DO INSTEAD + UPDATE base_tbl SET data = new.data, deleted = false WHERE id = new.id; +CREATE RULE base_tbl_del_rule AS ON DELETE TO base_tbl + DO INSTEAD + UPDATE base_tbl SET deleted = true WHERE id = old.id; +CREATE VIEW rw_view1 WITH (security_barrier=true) AS + SELECT id, data FROM base_tbl WHERE NOT deleted; +SELECT * FROM rw_view1; + id | data +----+------- + 1 | Row 1 +(1 row) + +EXPLAIN (costs off) DELETE FROM rw_view1 WHERE id = 1 AND snoop(data); + QUERY PLAN +------------------------------------------------------------------------- + Update on base_tbl base_tbl_1 + -> Nested Loop + -> Index Scan using base_tbl_pkey on base_tbl base_tbl_1 + Index Cond: (id = 1) + -> Subquery Scan on base_tbl + Filter: snoop(base_tbl.data) + -> Index Scan using base_tbl_pkey on base_tbl base_tbl_2 + Index Cond: (id = 1) + Filter: (NOT deleted) +(9 rows) + +DELETE FROM rw_view1 WHERE id = 1 AND snoop(data); +NOTICE: snooped value: Row 1 +EXPLAIN (costs off) INSERT INTO rw_view1 VALUES (2, 'New row 2'); + QUERY PLAN +----------------------------------------------------------- + Insert on base_tbl + InitPlan 1 (returns $0) + -> Index Only Scan using base_tbl_pkey on base_tbl t + Index Cond: (id = 2) + -> Result + One-Time Filter: ($0 IS NOT TRUE) + + Update on base_tbl + InitPlan 1 (returns $0) + -> Index Only Scan using base_tbl_pkey on base_tbl t + Index Cond: (id = 2) + -> Result + One-Time Filter: $0 + -> Index Scan using base_tbl_pkey on base_tbl + Index Cond: (id = 2) +(15 rows) + +INSERT INTO rw_view1 VALUES (2, 'New row 2'); +SELECT * FROM base_tbl; + id | data | deleted +----+-----------+--------- + 1 | Row 1 | t + 2 | New row 2 | f +(2 rows) + +DROP TABLE base_tbl CASCADE; +NOTICE: drop cascades to view rw_view1 +-- security barrier view based on inheiritance set +CREATE TABLE t1 (a int, b float, c text); +CREATE INDEX t1_a_idx ON t1(a); +INSERT INTO t1 +SELECT i,i,'t1' FROM generate_series(1,10) g(i); +CREATE TABLE t11 (d text) INHERITS (t1); +CREATE INDEX t11_a_idx ON t11(a); +INSERT INTO t11 +SELECT i,i,'t11','t11d' FROM generate_series(1,10) g(i); +CREATE TABLE t12 (e int[]) INHERITS (t1); +CREATE INDEX t12_a_idx ON t12(a); +INSERT INTO t12 +SELECT i,i,'t12','{1,2}'::int[] FROM generate_series(1,10) g(i); +CREATE TABLE t111 () INHERITS (t11, t12); +NOTICE: merging multiple inherited definitions of column "a" +NOTICE: merging multiple inherited definitions of column "b" +NOTICE: merging multiple inherited definitions of column "c" +CREATE INDEX t111_a_idx ON t111(a); +INSERT INTO t111 +SELECT i,i,'t111','t111d','{1,1,1}'::int[] FROM generate_series(1,10) g(i); +CREATE VIEW v1 WITH (security_barrier=true) AS +SELECT *, (SELECT d FROM t11 WHERE t11.a = t1.a LIMIT 1) AS d +FROM t1 +WHERE a > 5 AND EXISTS(SELECT 1 FROM t12 WHERE t12.a = t1.a); +SELECT * FROM v1 WHERE a=3; -- should not see anything + a | b | c | d +---+---+---+--- +(0 rows) + +SELECT * FROM v1 WHERE a=8; + a | b | c | d +---+---+------+------ + 8 | 8 | t1 | t11d + 8 | 8 | t11 | t11d + 8 | 8 | t12 | t11d + 8 | 8 | t111 | t11d +(4 rows) + +EXPLAIN (VERBOSE, COSTS OFF) +UPDATE v1 SET a=100 WHERE snoop(a) AND leakproof(a) AND a = 3; + QUERY PLAN +------------------------------------------------------------------------------------------------- + Update on public.t1 t1_4 + -> Subquery Scan on t1 + Output: 100, t1.b, t1.c, t1.ctid + Filter: snoop(t1.a) + -> Hash Join + Output: t1_5.ctid, t1_5.a, t1_5.b, t1_5.c + Hash Cond: (t12.a = t1_5.a) + -> HashAggregate + Output: t12.a + Group Key: t12.a + -> Append + -> Seq Scan on public.t12 + Output: t12.a + -> Seq Scan on public.t111 + Output: t111.a + -> Hash + Output: t1_5.ctid, t1_5.a, t1_5.b, t1_5.c + -> Index Scan using t1_a_idx on public.t1 t1_5 + Output: t1_5.ctid, t1_5.a, t1_5.b, t1_5.c + Index Cond: ((t1_5.a > 5) AND (t1_5.a = 3)) + Filter: leakproof(t1_5.a) + -> Subquery Scan on t1_1 + Output: 100, t1_1.b, t1_1.c, t1_1.d, t1_1.ctid + Filter: snoop(t1_1.a) + -> Hash Join + Output: t11.ctid, t11.a, t11.b, t11.c, t11.d + Hash Cond: (t12_1.a = t11.a) + -> HashAggregate + Output: t12_1.a + Group Key: t12_1.a + -> Append + -> Seq Scan on public.t12 t12_1 + Output: t12_1.a + -> Seq Scan on public.t111 t111_1 + Output: t111_1.a + -> Hash + Output: t11.ctid, t11.a, t11.b, t11.c, t11.d + -> Index Scan using t11_a_idx on public.t11 + Output: t11.ctid, t11.a, t11.b, t11.c, t11.d + Index Cond: ((t11.a > 5) AND (t11.a = 3)) + Filter: leakproof(t11.a) + -> Subquery Scan on t1_2 + Output: 100, t1_2.b, t1_2.c, t1_2.e, t1_2.ctid + Filter: snoop(t1_2.a) + -> Hash Join + Output: t12_2.ctid, t12_2.a, t12_2.b, t12_2.c, t12_2.e + Hash Cond: (t12_3.a = t12_2.a) + -> HashAggregate + Output: t12_3.a + Group Key: t12_3.a + -> Append + -> Seq Scan on public.t12 t12_3 + Output: t12_3.a + -> Seq Scan on public.t111 t111_2 + Output: t111_2.a + -> Hash + Output: t12_2.ctid, t12_2.a, t12_2.b, t12_2.c, t12_2.e + -> Index Scan using t12_a_idx on public.t12 t12_2 + Output: t12_2.ctid, t12_2.a, t12_2.b, t12_2.c, t12_2.e + Index Cond: ((t12_2.a > 5) AND (t12_2.a = 3)) + Filter: leakproof(t12_2.a) + -> Subquery Scan on t1_3 + Output: 100, t1_3.b, t1_3.c, t1_3.d, t1_3.e, t1_3.ctid + Filter: snoop(t1_3.a) + -> Hash Join + Output: t111_3.ctid, t111_3.a, t111_3.b, t111_3.c, t111_3.d, t111_3.e + Hash Cond: (t12_4.a = t111_3.a) + -> HashAggregate + Output: t12_4.a + Group Key: t12_4.a + -> Append + -> Seq Scan on public.t12 t12_4 + Output: t12_4.a + -> Seq Scan on public.t111 t111_4 + Output: t111_4.a + -> Hash + Output: t111_3.ctid, t111_3.a, t111_3.b, t111_3.c, t111_3.d, t111_3.e + -> Index Scan using t111_a_idx on public.t111 t111_3 + Output: t111_3.ctid, t111_3.a, t111_3.b, t111_3.c, t111_3.d, t111_3.e + Index Cond: ((t111_3.a > 5) AND (t111_3.a = 3)) + Filter: leakproof(t111_3.a) +(81 rows) + +UPDATE v1 SET a=100 WHERE snoop(a) AND leakproof(a) AND a = 3; +SELECT * FROM v1 WHERE a=100; -- Nothing should have been changed to 100 + a | b | c | d +---+---+---+--- +(0 rows) + +SELECT * FROM t1 WHERE a=100; -- Nothing should have been changed to 100 + a | b | c +---+---+--- +(0 rows) + +EXPLAIN (VERBOSE, COSTS OFF) +UPDATE v1 SET a=a+1 WHERE snoop(a) AND leakproof(a) AND a = 8; + QUERY PLAN +------------------------------------------------------------------------------------------------- + Update on public.t1 t1_4 + -> Subquery Scan on t1 + Output: (t1.a + 1), t1.b, t1.c, t1.ctid + Filter: snoop(t1.a) + -> Hash Join + Output: t1_5.a, t1_5.ctid, t1_5.b, t1_5.c + Hash Cond: (t12.a = t1_5.a) + -> HashAggregate + Output: t12.a + Group Key: t12.a + -> Append + -> Seq Scan on public.t12 + Output: t12.a + -> Seq Scan on public.t111 + Output: t111.a + -> Hash + Output: t1_5.a, t1_5.ctid, t1_5.b, t1_5.c + -> Index Scan using t1_a_idx on public.t1 t1_5 + Output: t1_5.a, t1_5.ctid, t1_5.b, t1_5.c + Index Cond: ((t1_5.a > 5) AND (t1_5.a = 8)) + Filter: leakproof(t1_5.a) + -> Subquery Scan on t1_1 + Output: (t1_1.a + 1), t1_1.b, t1_1.c, t1_1.d, t1_1.ctid + Filter: snoop(t1_1.a) + -> Hash Join + Output: t11.a, t11.ctid, t11.b, t11.c, t11.d + Hash Cond: (t12_1.a = t11.a) + -> HashAggregate + Output: t12_1.a + Group Key: t12_1.a + -> Append + -> Seq Scan on public.t12 t12_1 + Output: t12_1.a + -> Seq Scan on public.t111 t111_1 + Output: t111_1.a + -> Hash + Output: t11.a, t11.ctid, t11.b, t11.c, t11.d + -> Index Scan using t11_a_idx on public.t11 + Output: t11.a, t11.ctid, t11.b, t11.c, t11.d + Index Cond: ((t11.a > 5) AND (t11.a = 8)) + Filter: leakproof(t11.a) + -> Subquery Scan on t1_2 + Output: (t1_2.a + 1), t1_2.b, t1_2.c, t1_2.e, t1_2.ctid + Filter: snoop(t1_2.a) + -> Hash Join + Output: t12_2.a, t12_2.ctid, t12_2.b, t12_2.c, t12_2.e + Hash Cond: (t12_3.a = t12_2.a) + -> HashAggregate + Output: t12_3.a + Group Key: t12_3.a + -> Append + -> Seq Scan on public.t12 t12_3 + Output: t12_3.a + -> Seq Scan on public.t111 t111_2 + Output: t111_2.a + -> Hash + Output: t12_2.a, t12_2.ctid, t12_2.b, t12_2.c, t12_2.e + -> Index Scan using t12_a_idx on public.t12 t12_2 + Output: t12_2.a, t12_2.ctid, t12_2.b, t12_2.c, t12_2.e + Index Cond: ((t12_2.a > 5) AND (t12_2.a = 8)) + Filter: leakproof(t12_2.a) + -> Subquery Scan on t1_3 + Output: (t1_3.a + 1), t1_3.b, t1_3.c, t1_3.d, t1_3.e, t1_3.ctid + Filter: snoop(t1_3.a) + -> Hash Join + Output: t111_3.a, t111_3.ctid, t111_3.b, t111_3.c, t111_3.d, t111_3.e + Hash Cond: (t12_4.a = t111_3.a) + -> HashAggregate + Output: t12_4.a + Group Key: t12_4.a + -> Append + -> Seq Scan on public.t12 t12_4 + Output: t12_4.a + -> Seq Scan on public.t111 t111_4 + Output: t111_4.a + -> Hash + Output: t111_3.a, t111_3.ctid, t111_3.b, t111_3.c, t111_3.d, t111_3.e + -> Index Scan using t111_a_idx on public.t111 t111_3 + Output: t111_3.a, t111_3.ctid, t111_3.b, t111_3.c, t111_3.d, t111_3.e + Index Cond: ((t111_3.a > 5) AND (t111_3.a = 8)) + Filter: leakproof(t111_3.a) +(81 rows) + +UPDATE v1 SET a=a+1 WHERE snoop(a) AND leakproof(a) AND a = 8; +NOTICE: snooped value: 8 +NOTICE: snooped value: 8 +NOTICE: snooped value: 8 +NOTICE: snooped value: 8 +SELECT * FROM v1 WHERE b=8; + a | b | c | d +---+---+------+------ + 9 | 8 | t111 | t11d + 9 | 8 | t12 | t11d + 9 | 8 | t11 | t11d + 9 | 8 | t1 | t11d +(4 rows) + +DELETE FROM v1 WHERE snoop(a) AND leakproof(a); -- should not delete everything, just where a>5 +NOTICE: snooped value: 10 +NOTICE: snooped value: 9 +NOTICE: snooped value: 9 +NOTICE: snooped value: 6 +NOTICE: snooped value: 7 +NOTICE: snooped value: 10 +NOTICE: snooped value: 9 +NOTICE: snooped value: 9 +NOTICE: snooped value: 6 +NOTICE: snooped value: 7 +NOTICE: snooped value: 10 +NOTICE: snooped value: 9 +NOTICE: snooped value: 9 +NOTICE: snooped value: 6 +NOTICE: snooped value: 7 +NOTICE: snooped value: 6 +NOTICE: snooped value: 7 +NOTICE: snooped value: 9 +NOTICE: snooped value: 10 +NOTICE: snooped value: 9 +TABLE t1; -- verify all a<=5 are intact + a | b | c +---+---+------ + 1 | 1 | t1 + 2 | 2 | t1 + 3 | 3 | t1 + 4 | 4 | t1 + 5 | 5 | t1 + 1 | 1 | t11 + 2 | 2 | t11 + 3 | 3 | t11 + 4 | 4 | t11 + 5 | 5 | t11 + 1 | 1 | t12 + 2 | 2 | t12 + 3 | 3 | t12 + 4 | 4 | t12 + 5 | 5 | t12 + 1 | 1 | t111 + 2 | 2 | t111 + 3 | 3 | t111 + 4 | 4 | t111 + 5 | 5 | t111 +(20 rows) + +DROP TABLE t1, t11, t12, t111 CASCADE; +NOTICE: drop cascades to view v1 +DROP FUNCTION snoop(anyelement); +DROP FUNCTION leakproof(anyelement); diff --git a/src/test/regress/sql/updatable_views.sql b/src/test/regress/sql/updatable_views.sql index a77cf197582e5..eb7b17979ed0d 100644 --- a/src/test/regress/sql/updatable_views.sql +++ b/src/test/regress/sql/updatable_views.sql @@ -25,12 +25,10 @@ CREATE VIEW rw_view14 AS SELECT ctid, a, b FROM base_tbl; -- System columns may CREATE VIEW rw_view15 AS SELECT a, upper(b) FROM base_tbl; -- Expression/function may be part of an updatable view CREATE VIEW rw_view16 AS SELECT a, b, a AS aa FROM base_tbl; -- Repeated column may be part of an updatable view CREATE VIEW ro_view17 AS SELECT * FROM ro_view1; -- Base relation not updatable -CREATE VIEW ro_view18 WITH (security_barrier = true) - AS SELECT * FROM base_tbl; -- Security barrier views not updatable -CREATE VIEW ro_view19 AS SELECT * FROM (VALUES(1)) AS tmp(a); -- VALUES in rangetable +CREATE VIEW ro_view18 AS SELECT * FROM (VALUES(1)) AS tmp(a); -- VALUES in rangetable CREATE SEQUENCE seq; -CREATE VIEW ro_view20 AS SELECT * FROM seq; -- View based on a sequence -CREATE VIEW ro_view21 AS SELECT a, b, generate_series(1, a) g FROM base_tbl; -- SRF in targetlist not supported +CREATE VIEW ro_view19 AS SELECT * FROM seq; -- View based on a sequence +CREATE VIEW ro_view20 AS SELECT a, b, generate_series(1, a) g FROM base_tbl; -- SRF in targetlist not supported SELECT table_name, is_insertable_into FROM information_schema.tables @@ -87,13 +85,12 @@ SELECT * FROM base_tbl; DELETE FROM rw_view16 WHERE a=-3; -- should be OK -- Read-only views INSERT INTO ro_view17 VALUES (3, 'ROW 3'); -INSERT INTO ro_view18 VALUES (3, 'ROW 3'); -DELETE FROM ro_view19; -UPDATE ro_view20 SET max_value=1000; -UPDATE ro_view21 SET b=upper(b); +DELETE FROM ro_view18; +UPDATE ro_view19 SET max_value=1000; +UPDATE ro_view20 SET b=upper(b); DROP TABLE base_tbl CASCADE; -DROP VIEW ro_view10, ro_view12, ro_view19; +DROP VIEW ro_view10, ro_view12, ro_view18; DROP SEQUENCE seq CASCADE; -- simple updatable view @@ -828,3 +825,166 @@ CREATE VIEW rw_view2 AS SELECT * FROM rw_view1 WHERE a > b WITH LOCAL CHECK OPTION; INSERT INTO rw_view2 VALUES (2,3); -- ok, but not in view (doesn't fail rw_view2's check) DROP TABLE base_tbl CASCADE; + +-- security barrier view + +CREATE TABLE base_tbl (person text, visibility text); +INSERT INTO base_tbl VALUES ('Tom', 'public'), + ('Dick', 'private'), + ('Harry', 'public'); + +CREATE VIEW rw_view1 AS + SELECT person FROM base_tbl WHERE visibility = 'public'; + +CREATE FUNCTION snoop(anyelement) +RETURNS boolean AS +$$ +BEGIN + RAISE NOTICE 'snooped value: %', $1; + RETURN true; +END; +$$ +LANGUAGE plpgsql COST 0.000001; + +CREATE OR REPLACE FUNCTION leakproof(anyelement) +RETURNS boolean AS +$$ +BEGIN + RETURN true; +END; +$$ +LANGUAGE plpgsql STRICT IMMUTABLE LEAKPROOF; + +SELECT * FROM rw_view1 WHERE snoop(person); +UPDATE rw_view1 SET person=person WHERE snoop(person); +DELETE FROM rw_view1 WHERE NOT snoop(person); + +ALTER VIEW rw_view1 SET (security_barrier = true); + +SELECT table_name, is_insertable_into + FROM information_schema.tables + WHERE table_name = 'rw_view1'; + +SELECT table_name, is_updatable, is_insertable_into + FROM information_schema.views + WHERE table_name = 'rw_view1'; + +SELECT table_name, column_name, is_updatable + FROM information_schema.columns + WHERE table_name = 'rw_view1' + ORDER BY ordinal_position; + +SELECT * FROM rw_view1 WHERE snoop(person); +UPDATE rw_view1 SET person=person WHERE snoop(person); +DELETE FROM rw_view1 WHERE NOT snoop(person); + +EXPLAIN (costs off) SELECT * FROM rw_view1 WHERE snoop(person); +EXPLAIN (costs off) UPDATE rw_view1 SET person=person WHERE snoop(person); +EXPLAIN (costs off) DELETE FROM rw_view1 WHERE NOT snoop(person); + +-- security barrier view on top of security barrier view + +CREATE VIEW rw_view2 WITH (security_barrier = true) AS + SELECT * FROM rw_view1 WHERE snoop(person); + +SELECT table_name, is_insertable_into + FROM information_schema.tables + WHERE table_name = 'rw_view2'; + +SELECT table_name, is_updatable, is_insertable_into + FROM information_schema.views + WHERE table_name = 'rw_view2'; + +SELECT table_name, column_name, is_updatable + FROM information_schema.columns + WHERE table_name = 'rw_view2' + ORDER BY ordinal_position; + +SELECT * FROM rw_view2 WHERE snoop(person); +UPDATE rw_view2 SET person=person WHERE snoop(person); +DELETE FROM rw_view2 WHERE NOT snoop(person); + +EXPLAIN (costs off) SELECT * FROM rw_view2 WHERE snoop(person); +EXPLAIN (costs off) UPDATE rw_view2 SET person=person WHERE snoop(person); +EXPLAIN (costs off) DELETE FROM rw_view2 WHERE NOT snoop(person); + +DROP TABLE base_tbl CASCADE; + +-- security barrier view on top of table with rules + +CREATE TABLE base_tbl(id int PRIMARY KEY, data text, deleted boolean); +INSERT INTO base_tbl VALUES (1, 'Row 1', false), (2, 'Row 2', true); + +CREATE RULE base_tbl_ins_rule AS ON INSERT TO base_tbl + WHERE EXISTS (SELECT 1 FROM base_tbl t WHERE t.id = new.id) + DO INSTEAD + UPDATE base_tbl SET data = new.data, deleted = false WHERE id = new.id; + +CREATE RULE base_tbl_del_rule AS ON DELETE TO base_tbl + DO INSTEAD + UPDATE base_tbl SET deleted = true WHERE id = old.id; + +CREATE VIEW rw_view1 WITH (security_barrier=true) AS + SELECT id, data FROM base_tbl WHERE NOT deleted; + +SELECT * FROM rw_view1; + +EXPLAIN (costs off) DELETE FROM rw_view1 WHERE id = 1 AND snoop(data); +DELETE FROM rw_view1 WHERE id = 1 AND snoop(data); + +EXPLAIN (costs off) INSERT INTO rw_view1 VALUES (2, 'New row 2'); +INSERT INTO rw_view1 VALUES (2, 'New row 2'); + +SELECT * FROM base_tbl; + +DROP TABLE base_tbl CASCADE; + +-- security barrier view based on inheiritance set +CREATE TABLE t1 (a int, b float, c text); +CREATE INDEX t1_a_idx ON t1(a); +INSERT INTO t1 +SELECT i,i,'t1' FROM generate_series(1,10) g(i); + +CREATE TABLE t11 (d text) INHERITS (t1); +CREATE INDEX t11_a_idx ON t11(a); +INSERT INTO t11 +SELECT i,i,'t11','t11d' FROM generate_series(1,10) g(i); + +CREATE TABLE t12 (e int[]) INHERITS (t1); +CREATE INDEX t12_a_idx ON t12(a); +INSERT INTO t12 +SELECT i,i,'t12','{1,2}'::int[] FROM generate_series(1,10) g(i); + +CREATE TABLE t111 () INHERITS (t11, t12); +CREATE INDEX t111_a_idx ON t111(a); +INSERT INTO t111 +SELECT i,i,'t111','t111d','{1,1,1}'::int[] FROM generate_series(1,10) g(i); + +CREATE VIEW v1 WITH (security_barrier=true) AS +SELECT *, (SELECT d FROM t11 WHERE t11.a = t1.a LIMIT 1) AS d +FROM t1 +WHERE a > 5 AND EXISTS(SELECT 1 FROM t12 WHERE t12.a = t1.a); + +SELECT * FROM v1 WHERE a=3; -- should not see anything +SELECT * FROM v1 WHERE a=8; + +EXPLAIN (VERBOSE, COSTS OFF) +UPDATE v1 SET a=100 WHERE snoop(a) AND leakproof(a) AND a = 3; +UPDATE v1 SET a=100 WHERE snoop(a) AND leakproof(a) AND a = 3; + +SELECT * FROM v1 WHERE a=100; -- Nothing should have been changed to 100 +SELECT * FROM t1 WHERE a=100; -- Nothing should have been changed to 100 + +EXPLAIN (VERBOSE, COSTS OFF) +UPDATE v1 SET a=a+1 WHERE snoop(a) AND leakproof(a) AND a = 8; +UPDATE v1 SET a=a+1 WHERE snoop(a) AND leakproof(a) AND a = 8; + +SELECT * FROM v1 WHERE b=8; + +DELETE FROM v1 WHERE snoop(a) AND leakproof(a); -- should not delete everything, just where a>5 + +TABLE t1; -- verify all a<=5 are intact + +DROP TABLE t1, t11, t12, t111 CASCADE; +DROP FUNCTION snoop(anyelement); +DROP FUNCTION leakproof(anyelement);