Skip to content

Commit

Permalink
Implemented and tested closure for MySQL (performance is similar to P…
Browse files Browse the repository at this point in the history
…ostgreSQL).
  • Loading branch information
mederly committed Oct 2, 2014
1 parent f32e66e commit 1c33bf8
Show file tree
Hide file tree
Showing 2 changed files with 145 additions and 84 deletions.
Expand Up @@ -61,13 +61,13 @@ public class OrgClosureMultiParentTest extends AbstractOrgClosureTest {
private static final int[] ORG_CHILDREN_IN_LEVEL = { 1, 5, 3, 3, 5, 4, 0};
private static final int[] USER_CHILDREN_IN_LEVEL = { 0, 3, 4, 5, 6, 7, 10};
private static final int[] PARENTS_IN_LEVEL = { 0, 1, 2, 2, 2, 2, 2};
// private static final int[] LINK_ROUNDS_FOR_LEVELS = { 0, 10, 2 ,2 ,2 ,2 , 0 };
// private static final int[] NODE_ROUNDS_FOR_LEVELS = { 0, 0, 0 ,0 ,0 ,0 , 0 };
// private static final int[] USER_ROUNDS_FOR_LEVELS = { 0, 5 ,5 ,5 ,5 ,5 , 5 };

private static final int[] LINK_ROUNDS_FOR_LEVELS = { 0, 10, 15,15,15,15, 0 };
private static final int[] NODE_ROUNDS_FOR_LEVELS = { 5, 10, 15,15,15,15, 0 };
private static final int[] USER_ROUNDS_FOR_LEVELS = { 0, 10,10,20,20,20, 20};

// private static final int[] LINK_ROUNDS_FOR_LEVELS = { 0, 10, 2 ,2 ,2 ,2 , 0 };
// private static final int[] NODE_ROUNDS_FOR_LEVELS = { 0, 0, 0 ,0 ,0 ,0 , 0 };
// private static final int[] USER_ROUNDS_FOR_LEVELS = { 0, 5 ,5 ,5 ,5 ,5 , 5 };
// private static final int[] LINK_ROUNDS_FOR_LEVELS = { 0, 2, 0 ,0 ,0 ,0 , 0 };
// private static final int[] NODE_ROUNDS_FOR_LEVELS = { 0, 0, 0 ,0 ,0 ,0 , 0 };
// private static final int[] USER_ROUNDS_FOR_LEVELS = { 0, 0 ,0 ,0 ,0 ,0 , 0 };
Expand Down Expand Up @@ -95,8 +95,8 @@ public class OrgClosureMultiParentTest extends AbstractOrgClosureTest {
private static boolean CHECK_CLOSURE_MATRIX = false;
private long closureSize;

@Test(enabled = false) public void test100LoadOrgStructure() throws Exception { _test100LoadOrgStructure(); }
@Test(enabled = true) public void test110ScanOrgStructure() throws Exception { _test110ScanOrgStructure() ; }
@Test(enabled = true) public void test100LoadOrgStructure() throws Exception { _test100LoadOrgStructure(); }
@Test(enabled = false) public void test110ScanOrgStructure() throws Exception { _test110ScanOrgStructure() ; }
@Test(enabled = true) public void test150CheckClosure() throws Exception { _test150CheckClosure(); }
@Test(enabled = false) public void test180RemoveAddCycle() throws Exception { _test180RemoveAddCycle(); }
@Test(enabled = false) public void test190AddLink() throws Exception { _test190AddLink(); }
Expand Down
Expand Up @@ -217,49 +217,68 @@ private void addIndependentEdges(List<Edge> edges, Session session) {

if (!edges.isEmpty()) {
String deltaTempTableName = computeDeltaTable(edges, session);
int count;

long startUpdate = System.currentTimeMillis();
String updateInClosureQueryText;
if (repoConfiguration.isUsingH2()) {
updateInClosureQueryText = "update " + closureTableName + " " +
"set val = val + (select val from " + deltaTempTableName + " td " +
"where td.descendant_oid=" + closureTableName + ".descendant_oid and td.ancestor_oid=" + closureTableName + ".ancestor_oid) " +
"where (descendant_oid, ancestor_oid) in (select (descendant_oid, ancestor_oid) from " + deltaTempTableName + ")";
} else if (repoConfiguration.isUsingPostgreSQL()) {
updateInClosureQueryText = "update " + closureTableName + " " +
"set val = val + (select val from " + deltaTempTableName + " td " +
"where td.descendant_oid=" + closureTableName + ".descendant_oid and td.ancestor_oid=" + closureTableName + ".ancestor_oid) " +
"where (descendant_oid, ancestor_oid) in (select descendant_oid, ancestor_oid from " + deltaTempTableName + ")";
} else {
throw new UnsupportedOperationException("implement other databases");
}
Query updateInClosureQuery = session.createSQLQuery(updateInClosureQueryText);
int countUpdate = updateInClosureQuery.executeUpdate();
if (LOGGER.isTraceEnabled())
LOGGER.trace("Updated {} records to closure table ({} ms)", countUpdate, System.currentTimeMillis() - startUpdate);

if (DUMP_TABLES) dumpOrgClosureTypeTable(session, closureTableName);

long startAdd = System.currentTimeMillis();
String addQuery =
"insert into " + closureTableName + " (descendant_oid, ancestor_oid, val) " +
"select descendant_oid, ancestor_oid, val from " + deltaTempTableName + " delta ";
if (countUpdate > 0) {
if (repoConfiguration.isUsingH2()) {
addQuery += " where (descendant_oid, ancestor_oid) not in (select (descendant_oid, ancestor_oid) from " + closureTableName + ")";
} else if (repoConfiguration.isUsingPostgreSQL()) {
addQuery += " where not exists (select 1 from " + closureTableName + " cl where cl.descendant_oid=delta.descendant_oid and cl.ancestor_oid=delta.ancestor_oid)";
} else {
throw new UnsupportedOperationException("implement other databases");
try {
int count;

if (isMySQL()) {

long startUpsert = System.currentTimeMillis();
String upsertQueryText = "insert into " + closureTableName + " (descendant_oid, ancestor_oid, val) " +
"select descendant_oid, ancestor_oid, val from " + deltaTempTableName + " delta " +
"on duplicate key update "+closureTableName+".val = "+closureTableName+".val + values(val)";
Query upsertQuery = session.createSQLQuery(upsertQueryText);
int countUpsert = upsertQuery.executeUpdate();
if (LOGGER.isTraceEnabled())
LOGGER.trace("Added/updated {} records to closure table ({} ms)", countUpsert, System.currentTimeMillis() - startUpsert);
if (DUMP_TABLES) dumpOrgClosureTypeTable(session, closureTableName);

} else { // separate update and insert

long startUpdate = System.currentTimeMillis();
String updateInClosureQueryText;
if (isH2()) {
updateInClosureQueryText = "update " + closureTableName + " " +
"set val = val + (select val from " + deltaTempTableName + " td " +
"where td.descendant_oid=" + closureTableName + ".descendant_oid and td.ancestor_oid=" + closureTableName + ".ancestor_oid) " +
"where (descendant_oid, ancestor_oid) in (select (descendant_oid, ancestor_oid) from " + deltaTempTableName + ")";
} else if (isPostgreSQL()) {
updateInClosureQueryText = "update " + closureTableName + " " +
"set val = val + (select val from " + deltaTempTableName + " td " +
"where td.descendant_oid=" + closureTableName + ".descendant_oid and td.ancestor_oid=" + closureTableName + ".ancestor_oid) " +
"where (descendant_oid, ancestor_oid) in (select descendant_oid, ancestor_oid from " + deltaTempTableName + ")";
} else {
throw new UnsupportedOperationException("implement other databases");
}
Query updateInClosureQuery = session.createSQLQuery(updateInClosureQueryText);
int countUpdate = updateInClosureQuery.executeUpdate();
if (LOGGER.isTraceEnabled())
LOGGER.trace("Updated {} records to closure table ({} ms)", countUpdate, System.currentTimeMillis() - startUpdate);

if (DUMP_TABLES) dumpOrgClosureTypeTable(session, closureTableName);

long startAdd = System.currentTimeMillis();
String addQuery =
"insert into " + closureTableName + " (descendant_oid, ancestor_oid, val) " +
"select descendant_oid, ancestor_oid, val from " + deltaTempTableName + " delta ";
if (countUpdate > 0) {
if (isH2()) {
addQuery += " where (descendant_oid, ancestor_oid) not in (select (descendant_oid, ancestor_oid) from " + closureTableName + ")";
} else if (isPostgreSQL()) {
addQuery += " where not exists (select 1 from " + closureTableName + " cl where cl.descendant_oid=delta.descendant_oid and cl.ancestor_oid=delta.ancestor_oid)";
} else {
throw new UnsupportedOperationException("implement other databases");
}
}
Query addToClosureQuery = session.createSQLQuery(addQuery);
count = addToClosureQuery.executeUpdate();
if (LOGGER.isTraceEnabled())
LOGGER.trace("Added {} records to closure table ({} ms)", count, System.currentTimeMillis() - startAdd);

if (DUMP_TABLES) dumpOrgClosureTypeTable(session, closureTableName);
}
} finally {
dropDeltaTableIfNecessary(session, deltaTempTableName);
}
Query addToClosureQuery = session.createSQLQuery(addQuery);
count = addToClosureQuery.executeUpdate();
if (LOGGER.isTraceEnabled())
LOGGER.trace("Added {} records to closure table ({} ms)", count, System.currentTimeMillis() - startAdd);

if (DUMP_TABLES) dumpOrgClosureTypeTable(session, closureTableName);
}

session.flush();
Expand All @@ -268,6 +287,15 @@ private void addIndependentEdges(List<Edge> edges, Session session) {
LOGGER.trace("--------------------- DONE ADD EDGES: {} ({} ms) ----------------", edges, System.currentTimeMillis()-start);
}

private void dropDeltaTableIfNecessary(Session session, String deltaTempTableName) {
// postgresql deletes the table automatically on commit
// TODO what in case of H2?
if (isMySQL()) {
Query dropQuery = session.createSQLQuery("drop temporary table " + deltaTempTableName);
dropQuery.executeUpdate();
}
}

//endregion

//region Handling DELETE operation
Expand Down Expand Up @@ -318,47 +346,76 @@ private void removeIndependentEdges(List<Edge> edges, Session session) {

if (!edges.isEmpty()) {
String deltaTempTableName = computeDeltaTable(edges, session);
int count;

String deleteFromClosureQueryText, updateInClosureQueryText;
if (repoConfiguration.isUsingH2()) {
deleteFromClosureQueryText = "delete from " + closureTableName + " " +
"where (descendant_oid, ancestor_oid, val) in " +
"(select (descendant_oid, ancestor_oid, val) from " + deltaTempTableName + ")";
updateInClosureQueryText = "update " + closureTableName + " " +
"set val = val - (select val from " + deltaTempTableName + " td " +
"where td.descendant_oid=" + closureTableName + ".descendant_oid and td.ancestor_oid=" + closureTableName + ".ancestor_oid) " +
"where (descendant_oid, ancestor_oid) in (select (descendant_oid, ancestor_oid) from " + deltaTempTableName + ")";
} else if (repoConfiguration.isUsingPostgreSQL()) {
deleteFromClosureQueryText = "delete from " + closureTableName + " " +
"where (descendant_oid, ancestor_oid, val) in " +
"(select descendant_oid, ancestor_oid, val from " + deltaTempTableName + ")";
updateInClosureQueryText = "update " + closureTableName + " " +
"set val = val - (select val from " + deltaTempTableName + " td " +
"where td.descendant_oid=" + closureTableName + ".descendant_oid and td.ancestor_oid=" + closureTableName + ".ancestor_oid) " +
"where (descendant_oid, ancestor_oid) in (select descendant_oid, ancestor_oid from " + deltaTempTableName + ")";
} else {
throw new UnsupportedOperationException("implement other databases");
try {
int count;

String deleteFromClosureQueryText, updateInClosureQueryText;
if (isH2()) {
deleteFromClosureQueryText = "delete from " + closureTableName + " " +
"where (descendant_oid, ancestor_oid, val) in " +
"(select (descendant_oid, ancestor_oid, val) from " + deltaTempTableName + ")";
updateInClosureQueryText = "update " + closureTableName + " " +
"set val = val - (select val from " + deltaTempTableName + " td " +
"where td.descendant_oid=" + closureTableName + ".descendant_oid and td.ancestor_oid=" + closureTableName + ".ancestor_oid) " +
"where (descendant_oid, ancestor_oid) in (select (descendant_oid, ancestor_oid) from " + deltaTempTableName + ")";
} else if (isPostgreSQL()) {
deleteFromClosureQueryText = "delete from " + closureTableName + " " +
"where (descendant_oid, ancestor_oid, val) in " +
"(select descendant_oid, ancestor_oid, val from " + deltaTempTableName + ")";
updateInClosureQueryText = "update " + closureTableName + " " +
"set val = val - (select val from " + deltaTempTableName + " td " +
"where td.descendant_oid=" + closureTableName + ".descendant_oid and td.ancestor_oid=" + closureTableName + ".ancestor_oid) " +
"where (descendant_oid, ancestor_oid) in (select descendant_oid, ancestor_oid from " + deltaTempTableName + ")";
} else if (isMySQL()) {
// http://stackoverflow.com/questions/652770/delete-with-join-in-mysql
// TODO consider this for postgresql/h2 as well
deleteFromClosureQueryText = "delete " + closureTableName + " from " + closureTableName + " " +
"inner join " + deltaTempTableName + " td on " +
"td.descendant_oid = "+closureTableName+".descendant_oid and td.ancestor_oid = "+closureTableName+".ancestor_oid and "+
"td.val = "+closureTableName+".val";
// it is not possible to use temporary table twice in a query
// TODO consider using this in postgresql and h2 as well...
updateInClosureQueryText = "update " + closureTableName +
" join " + deltaTempTableName + " td " +
"on td.descendant_oid=" + closureTableName + ".descendant_oid and td.ancestor_oid=" + closureTableName + ".ancestor_oid " +
"set "+closureTableName+".val = "+closureTableName+".val - td.val";
} else {
throw new UnsupportedOperationException("implement other databases");
}
long startDelete = System.currentTimeMillis();
Query deleteFromClosureQuery = session.createSQLQuery(deleteFromClosureQueryText);
count = deleteFromClosureQuery.executeUpdate();
if (LOGGER.isTraceEnabled())
LOGGER.trace("Deleted {} records from closure table in {} ms", count, System.currentTimeMillis() - startDelete);
if (DUMP_TABLES) dumpOrgClosureTypeTable(session, closureTableName);

long startUpdate = System.currentTimeMillis();
Query updateInClosureQuery = session.createSQLQuery(updateInClosureQueryText);
count = updateInClosureQuery.executeUpdate();
if (LOGGER.isTraceEnabled())
LOGGER.trace("Updated {} records in closure table in {} ms", count, System.currentTimeMillis() - startUpdate);
if (DUMP_TABLES) dumpOrgClosureTypeTable(session, closureTableName);
} finally {
dropDeltaTableIfNecessary(session, deltaTempTableName);
}
long startDelete = System.currentTimeMillis();
Query deleteFromClosureQuery = session.createSQLQuery(deleteFromClosureQueryText);
count = deleteFromClosureQuery.executeUpdate();
if (LOGGER.isTraceEnabled())
LOGGER.trace("Deleted {} records from closure table in {} ms", count, System.currentTimeMillis() - startDelete);
if (DUMP_TABLES) dumpOrgClosureTypeTable(session, closureTableName);

long startUpdate = System.currentTimeMillis();
Query updateInClosureQuery = session.createSQLQuery(updateInClosureQueryText);
count = updateInClosureQuery.executeUpdate();
if (LOGGER.isTraceEnabled())
LOGGER.trace("Updated {} records in closure table in {} ms", count, System.currentTimeMillis() - startUpdate);
if (DUMP_TABLES) dumpOrgClosureTypeTable(session, closureTableName);
}
session.flush();
session.clear();

LOGGER.trace("--------------------- DONE REMOVE EDGES: {} ({} ms) ----------------", edges, System.currentTimeMillis()-start);
}

private boolean isMySQL() {
return repoConfiguration.isUsingMySQL();
}

private boolean isH2() {
return repoConfiguration.isUsingH2();
}

private boolean isPostgreSQL() {
return repoConfiguration.isUsingPostgreSQL();
}
//endregion

//region Handling MODIFY
Expand Down Expand Up @@ -429,10 +486,12 @@ private String computeDeltaTable(List<Edge> edges, Session session) {

long start = System.currentTimeMillis();
String createTablePrefix;
if (repoConfiguration.isUsingH2()) {
if (isH2()) {
createTablePrefix = "create cached local temporary " + deltaTempTableName + " on commit drop";
} else if (repoConfiguration.isUsingPostgreSQL()) {
} else if (isPostgreSQL()) {
createTablePrefix = "create temporary table " + deltaTempTableName;
} else if (isMySQL()) {
createTablePrefix = "create temporary table " + deltaTempTableName + " engine=memory";
} else {
throw new UnsupportedOperationException("define other databases");
}
Expand Down Expand Up @@ -461,7 +520,7 @@ private String computeDeltaTable(List<Edge> edges, Session session) {
if (LOGGER.isTraceEnabled()) LOGGER.trace("Added {} records to temporary delta table {} ({} ms).",
new Object[] {count, deltaTempTableName, System.currentTimeMillis()-start});

if (repoConfiguration.isUsingPostgreSQL()) {
if (isPostgreSQL()) {
start = System.currentTimeMillis();
Query qIndex = session.createSQLQuery("CREATE INDEX " + deltaTempTableName + "_idx " +
" ON " + deltaTempTableName +
Expand All @@ -474,6 +533,8 @@ private String computeDeltaTable(List<Edge> edges, Session session) {
if (DUMP_TABLES) dumpOrgClosureTypeTable(session, closureTableName);
if (DUMP_TABLES) dumpOrgClosureTypeTable(session, deltaTempTableName);

// TODO drop delta table in case of exception

return deltaTempTableName;
}

Expand Down

0 comments on commit 1c33bf8

Please sign in to comment.