Skip to content

Commit

Permalink
repo-sqale: app controlled lazy refresh of org-closure before query
Browse files Browse the repository at this point in the history
  • Loading branch information
virgo47 committed May 17, 2021
1 parent 4915bfb commit f7c6583
Show file tree
Hide file tree
Showing 8 changed files with 171 additions and 58 deletions.
33 changes: 12 additions & 21 deletions repo/repo-sqale/sql/pg-org-experiments.sql
Expand Up @@ -194,31 +194,22 @@ select * from m_global_metadata

refresh materialized view m_org_closure;

WITH RECURSIVE org_h(ancestor_oid, descendant_oid) AS (
SELECT r.targetoid,
r.owner_oid
FROM m_ref_object_parent_org r
WHERE r.owner_type = 'ORG'::objecttype
UNION
SELECT par.targetoid,
chi.descendant_oid
FROM m_ref_object_parent_org par,
org_h chi
WHERE par.owner_oid = chi.ancestor_oid
)
SELECT count(*) FROM org_h;

select * from m_org;
select count(*) from m_org;
select count(*) from m_user;
select count(*) from m_org_closure;
select count(*) from m_user;

select * from m_org_closure;
-- Perf test adding orgs:
-- trigger with refresh: orgs/closure: 15125/72892 29m31s, ~8.5 orgs/s (most late addObject took ~220ms)
-- empty trigger: orgs/closure: 14052/67735 (after manual refresh taking ~230 ms) 31s, ~450 orgs/s
-- trigger with mark: orgs/closure: 14573/70225 (after m_refresh_org_closure ~300 ms) 32s, ~455 orgs/s
-- trigger with mark: orgs/closure: 59711/291099 (after refresh ~1.9s), 2m13s, ~450 orgs/s

select * from m_object where oid = '62d6f1db-7b97-40de-bfbd-d325020597a0'
;
select * from m_org o
where not exists (select 1 from m_ref_object_parent_org po where po.owner_oid = o.oid);

select * from m_ref_object_parent_org;
select * from m_org_closure;

select * FROM m_global_metadata;

truncate m_ref_object_parent_org;
select * from m_org_closure_internal;
CALL m_refresh_org_closure(true);
73 changes: 53 additions & 20 deletions repo/repo-sqale/sql/pgnew-repo.sql
Expand Up @@ -121,10 +121,10 @@ CREATE TYPE TimeIntervalStatusType AS ENUM ('BEFORE', 'IN', 'AFTER');
-- select * from pg_available_extensions order by name;
DO $$
BEGIN
perform pg_get_functiondef('gen_random_uuid()'::regprocedure);
raise notice 'gen_random_uuid already exists, skipping create EXTENSION pgcrypto';
PERFORM pg_get_functiondef('gen_random_uuid()'::regprocedure);
RAISE NOTICE 'gen_random_uuid already exists, skipping create EXTENSION pgcrypto';
EXCEPTION WHEN undefined_function THEN
create EXTENSION pgcrypto;
CREATE EXTENSION pgcrypto;
END
$$;

Expand All @@ -142,9 +142,9 @@ CREATE OR REPLACE FUNCTION insert_object_oid()
AS $$
BEGIN
IF NEW.oid IS NOT NULL THEN
insert into m_object_oid values (NEW.oid);
INSERT INTO m_object_oid VALUES (NEW.oid);
ELSE
insert into m_object_oid DEFAULT VALUES RETURNING oid INTO NEW.oid;
INSERT INTO m_object_oid DEFAULT VALUES RETURNING oid INTO NEW.oid;
END IF;
-- before trigger must return NEW row to do something
RETURN NEW;
Expand Down Expand Up @@ -183,7 +183,13 @@ END
$$;
-- endregion

-- region Enumeration/code tables
-- region Enumeration/code/management tables
-- Key -> value config table for internal use.
CREATE TABLE m_global_metadata (
name TEXT PRIMARY KEY,
value TEXT
);

-- Catalog of often used URIs, typically channels and relation Q-names.
-- Never update values of "uri" manually to change URI for some objects
-- (unless you really want to migrate old URI to a new one).
Expand Down Expand Up @@ -614,10 +620,12 @@ WITH RECURSIVE org_h (
-- depth -- possible later, but cycle detected must be added to the recursive term
) AS (
-- non-recursive term:
-- Gather all organization oids and initialize identity lines (o => o).
-- Gather all organization oids from parent-org refs and initialize identity lines (o => o).
-- We don't want the orgs not in org hierarchy, that would require org triggers too.
SELECT o.oid, o.oid FROM m_org o
-- It's possible to exclude orgs not in parent-org-refs (either owner or target!),
-- but it's not a big deal and makes things simple for JOINs (no outer needed).
WHERE EXISTS(
SELECT 1 FROM m_ref_object_parent_org r
WHERE r.targetOid = o.oid OR r.owner_oid = o.oid)
UNION
-- recursive (iterative) term:
-- Generate their parents (anc => desc, that is target => owner), => means "is parent of".
Expand All @@ -634,25 +642,55 @@ CREATE INDEX m_org_closure_desc_asc_idx
ON m_org_closure (descendant_oid, ancestor_oid);

-- The trigger for m_ref_object_parent_org that flags the view for refresh.
CREATE OR REPLACE FUNCTION m_org_closure_refresh()
CREATE OR REPLACE FUNCTION mark_org_closure_for_refresh()
RETURNS trigger
LANGUAGE plpgsql
AS $$
BEGIN
IF TG_OP = 'TRUNCATE' OR OLD.owner_type = 'ORG' OR NEW.owner_type = 'ORG' THEN
REFRESH MATERIALIZED VIEW m_org_closure;
INSERT INTO m_global_metadata VALUES ('orgClosureRefreshNeeded', 'true')
ON CONFLICT (name) DO UPDATE SET value = 'true';
END IF;

-- after trigger returns null
RETURN NULL;
END $$;

CREATE TRIGGER m_ref_object_parent_org_refresh_tr
CREATE TRIGGER m_ref_object_parent_mark_refresh_tr
AFTER INSERT OR UPDATE OR DELETE ON m_ref_object_parent_org
FOR EACH ROW EXECUTE PROCEDURE m_org_closure_refresh();
CREATE TRIGGER m_ref_object_parent_org_trunc_refresh_tr
FOR EACH ROW EXECUTE PROCEDURE mark_org_closure_for_refresh();
CREATE TRIGGER m_ref_object_parent_mark_refresh_trunc_tr
AFTER TRUNCATE ON m_ref_object_parent_org
FOR EACH STATEMENT EXECUTE PROCEDURE m_org_closure_refresh();
FOR EACH STATEMENT EXECUTE PROCEDURE mark_org_closure_for_refresh();

-- This procedure for conditional refresh when needed is called from the application code.
-- The refresh can be forced, e.g. after many changes with triggers off (or just to be sure).
CREATE OR REPLACE PROCEDURE m_refresh_org_closure(force boolean = false)
LANGUAGE plpgsql
AS $$
DECLARE
flag_val text;
BEGIN
SELECT value INTO flag_val FROM m_global_metadata WHERE name = 'orgClosureRefreshNeeded';
IF flag_val = 'true' OR force THEN
-- We use advisory session lock only for the check + refresh, then release it immediately.
-- This can still dead-lock two transactions in a single thread on the select/delete combo,
-- (I mean, who would do that?!) but works fine for parallel transactions.
PERFORM pg_advisory_lock(47);
BEGIN
SELECT value INTO flag_val FROM m_global_metadata WHERE name = 'orgClosureRefreshNeeded';
IF flag_val = 'true' OR force THEN
REFRESH MATERIALIZED VIEW m_org_closure;
DELETE FROM m_global_metadata WHERE name = 'orgClosureRefreshNeeded';
END IF;
PERFORM pg_advisory_unlock(47);
EXCEPTION WHEN OTHERS THEN
-- Whatever happens we definitely want to release the lock.
PERFORM pg_advisory_unlock(47);
RAISE;
END;
END IF;
END; $$;
-- endregion
-- endregion

Expand Down Expand Up @@ -2067,11 +2105,6 @@ create index idx_qrtz_ft_tg on qrtz_fired_triggers(SCHED_NAME,TRIGGER_GROUP);
*/

-- region Schema versioning and upgrading
CREATE TABLE m_global_metadata (
name TEXT PRIMARY KEY,
value TEXT
);

/*
Procedure applying a DB schema/data change. Use sequential change numbers to identify the changes.
This protects re-execution of the same change on the same database instance.
Expand Down
Expand Up @@ -16,6 +16,7 @@
import com.evolveum.midpoint.repo.sqale.filtering.InOidFilterProcessor;
import com.evolveum.midpoint.repo.sqale.filtering.OrgFilterProcessor;
import com.evolveum.midpoint.repo.sqale.qmodel.SqaleTableMapping;
import com.evolveum.midpoint.repo.sqlbase.JdbcSession;
import com.evolveum.midpoint.repo.sqlbase.SqlQueryContext;
import com.evolveum.midpoint.repo.sqlbase.filtering.FilterProcessor;
import com.evolveum.midpoint.repo.sqlbase.mapping.QueryTableMapping;
Expand All @@ -24,6 +25,8 @@
public class SqaleQueryContext<S, Q extends FlexibleRelationalPathBase<R>, R>
extends SqlQueryContext<S, Q, R> {

private boolean containsOrgFilter = false;

public static <S, Q extends FlexibleRelationalPathBase<R>, R> SqaleQueryContext<S, Q, R> from(
Class<S> schemaType,
SqaleRepoContext sqlRepoContext) {
Expand All @@ -47,6 +50,13 @@ private SqaleQueryContext(
super(entityPath, mapping, sqlRepoContext, query);
}

private SqaleQueryContext(
Q entityPath,
SqaleTableMapping<S, Q, R> mapping,
SqaleQueryContext<?, ?, ?> parentContext) {
super(entityPath, mapping, parentContext);
}

@Override
public SqaleRepoContext repositoryContext() {
return (SqaleRepoContext) super.repositoryContext();
Expand All @@ -66,6 +76,14 @@ public FilterProcessor<OrgFilter> createOrgFilter() {
return repositoryContext().searchCachedRelationId(qName);
}

public void markContainsOrgFilter() {
containsOrgFilter = true;
SqaleQueryContext<?, ?, ?> parentContext = parentContext();
if (parentContext != null) {
parentContext.markContainsOrgFilter();
}
}

/**
* Returns derived {@link SqaleQueryContext} for join or subquery.
*/
Expand All @@ -75,7 +93,21 @@ public FilterProcessor<OrgFilter> createOrgFilter() {
return new SqaleQueryContext<>(
newPath,
(SqaleTableMapping<TS, TQ, TR>) newMapping,
repositoryContext(),
sqlQuery);
this);
}

@Override
public SqaleQueryContext<?, ?, ?> parentContext() {
return (SqaleQueryContext<?, ?, ?>) super.parentContext();
}

@Override
public void beforeQuery() {
if (containsOrgFilter) {
try (JdbcSession jdbcSession = repositoryContext().newJdbcSession().startTransaction()) {
jdbcSession.executeStatement("CALL m_refresh_org_closure()");
jdbcSession.commit();
}
}
}
}
Expand Up @@ -37,6 +37,8 @@ public OrgFilterProcessor(SqaleQueryContext<?, ?, ?> context) {

@Override
public Predicate process(OrgFilter filter) throws QueryException {
context.markContainsOrgFilter(); // necessary for lazy refresh of org closure

FlexibleRelationalPathBase<?> path = context.root();
if (!(path instanceof QObject)) {
throw new QueryException("Org filter can only be used for objects,"
Expand Down
Expand Up @@ -15,11 +15,13 @@
import org.testng.annotations.BeforeClass;
import org.testng.annotations.Test;

import com.evolveum.midpoint.prism.polystring.PolyString;
import com.evolveum.midpoint.prism.query.ObjectQuery;
import com.evolveum.midpoint.repo.sqale.SqaleRepoBaseTest;
import com.evolveum.midpoint.repo.sqale.qmodel.focus.QUser;
import com.evolveum.midpoint.repo.sqale.qmodel.org.QOrg;
import com.evolveum.midpoint.repo.sqale.qmodel.org.QOrgClosure;
import com.evolveum.midpoint.repo.sqlbase.JdbcSession;
import com.evolveum.midpoint.schema.GetOperationOptions;
import com.evolveum.midpoint.schema.SearchResultList;
import com.evolveum.midpoint.schema.SelectorOptions;
Expand All @@ -43,6 +45,13 @@ public class OrgHierarchyPerfTest extends SqaleRepoBaseTest {
public void initObjects() throws Exception {
OperationResult result = createOperationResult();

try (JdbcSession jdbcSession = sqlRepoContext.newJdbcSession().startTransaction()) {
jdbcSession.executeStatement("CALL m_refresh_org_closure(true)");
jdbcSession.commit();
}
assertThat(count(QOrg.CLASS)).isZero();
assertThat(count(new QOrgClosure())).isZero();

createOrgsFor(null, 5, 6, result);

assertThatOperationResult(result).isSuccess();
Expand All @@ -52,6 +61,7 @@ private void createOrgsFor(
OrgType parent, int levels, int typicalCountPerLevel, OperationResult result)
throws SchemaException, ObjectAlreadyExistsException {
if (levels == 0) {
// typical count is used as max for user generation
createUsersFor(parent, typicalCountPerLevel, result);
return;
}
Expand All @@ -73,9 +83,9 @@ private void createOrgsFor(
}
}

private void createUsersFor(OrgType parent, int typicalCountPerLevel, OperationResult result)
private void createUsersFor(OrgType parent, int maxCountPerLevel, OperationResult result)
throws SchemaException, ObjectAlreadyExistsException {
int users = RANDOM.nextInt(typicalCountPerLevel) + typicalCountPerLevel / 2 + 1;
int users = RANDOM.nextInt(maxCountPerLevel) + 1;
for (int i = 1; i <= users; i++) {
repositoryService.addObject(
new UserType(prismContext).name("user" + parent.getName() + "v" + i)
Expand All @@ -89,19 +99,31 @@ private void createUsersFor(OrgType parent, int typicalCountPerLevel, OperationR
}

@Test
// TODO
public void test100Xxx() throws Exception {
when("...");
// OperationResult operationResult = createOperationResult();
// SearchResultList<ObjectType> result = searchObjects(ObjectType.class,
// prismContext.queryFor(ObjectType.class)
// .build(),
// operationResult);

then("...");
System.out.println("Orgs: " + count(QOrg.CLASS));
System.out.println("Org closure: " + count(new QOrgClosure()));
System.out.println("Users: " + count(QUser.class));
given("there are orgs and users, closure is not yet updated");
OperationResult operationResult = createOperationResult();
display("Orgs: " + count(QOrg.CLASS));
display("Users: " + count(QUser.class));
assertThat(count(new QOrgClosure())).isZero();
OrgType org1x1x1 = searchObjects(OrgType.class,
prismContext.queryFor(OrgType.class)
.item(ObjectType.F_NAME).eq(PolyString.fromOrig("org1x1x1"))
.build(),
operationResult).get(0);

when("search for user under some org is initiated");
SearchResultList<UserType> result = searchObjects(UserType.class,
prismContext.queryFor(UserType.class)
.isChildOf(org1x1x1.getOid())
.build(),
operationResult);

then("non-empty result is returned and org closure has been initialized");
assertThat(result).isNotEmpty();
assertThat(count(new QOrgClosure())).isPositive();
display("Orgs: " + count(QOrg.CLASS));
display("Org closure: " + count(new QOrgClosure()));
display("Users: " + count(QUser.class));
}

// support methods
Expand Down
Expand Up @@ -44,10 +44,17 @@ private AuditSqlQueryContext(
super(entityPath, mapping, sqlRepoContext, query);
}

private AuditSqlQueryContext(
Q entityPath,
QueryTableMapping<S, Q, R> mapping,
AuditSqlQueryContext<?, ?, ?> parentContext) {
super(entityPath, mapping, parentContext);
}

@Override
protected <TS, TQ extends FlexibleRelationalPathBase<TR>, TR>
SqlQueryContext<TS, TQ, TR> deriveNew(TQ newPath, QueryTableMapping<TS, TQ, TR> newMapping) {
return new AuditSqlQueryContext<>(
newPath, newMapping, repositoryContext(), sqlQuery);
newPath, newMapping, this);
}
}

0 comments on commit f7c6583

Please sign in to comment.