Skip to content

Commit

Permalink
Microsoft SQL Server implementation.
Browse files Browse the repository at this point in the history
  • Loading branch information
mederly committed Oct 6, 2014
1 parent bd5f645 commit 6fbf1ce
Show file tree
Hide file tree
Showing 3 changed files with 110 additions and 46 deletions.
Expand Up @@ -42,8 +42,11 @@
import com.evolveum.midpoint.xml.ns._public.common.common_3.UserType;
import com.evolveum.prism.xml.ns._public.types_3.PolyStringType;
import org.apache.commons.lang.StringUtils;
import org.hibernate.Hibernate;
import org.hibernate.Query;
import org.hibernate.Session;
import org.hibernate.type.LongType;
import org.hibernate.type.StringType;
import org.jgrapht.alg.TransitiveClosure;
import org.jgrapht.graph.DefaultEdge;
import org.jgrapht.graph.SimpleDirectedGraph;
Expand Down Expand Up @@ -170,7 +173,10 @@ protected void checkClosureMatrix(Session session) {
}
LOGGER.info("TC matrix computed in {} ms", System.currentTimeMillis() - start);

Query q = session.createSQLQuery("select descendant_oid, ancestor_oid, val from m_org_closure");
Query q = session.createSQLQuery("select descendant_oid, ancestor_oid, val from m_org_closure")
.addScalar("descendant_oid", StringType.INSTANCE)
.addScalar("ancestor_oid", StringType.INSTANCE)
.addScalar("val", LongType.INSTANCE);
List<Object[]> list = q.list();
LOGGER.info("OrgClosure has {} rows", list.size());

Expand Down Expand Up @@ -555,7 +561,7 @@ protected void removeUsersFromOrg(String nodeOid, OperationResult result) throws
protected void randomRemoveOrgStructure(OperationResult result) throws Exception {
openSessionIfNeeded();

int CHECK_EACH = 15;
int CHECK_EACH = 1;

int count = 0;
long totalTime = 0;
Expand Down
Expand Up @@ -91,7 +91,7 @@ public class OrgClosureMultiParentTest extends AbstractOrgClosureTest {
private static final String TEST_19x_CHILD_OID = "o00.....-....-....-....-............";
private static final String TEST_19x_PARENT_OID = "o0......-....-....-....-............";

private static boolean CHECK_CHILDREN_SETS = false;
private static boolean CHECK_CHILDREN_SETS = true;
private static boolean CHECK_CLOSURE_MATRIX = true;
private long closureSize;

Expand Down
Expand Up @@ -220,7 +220,7 @@ private void addIndependentEdges(List<Edge> edges, Session session) {
try {
int count;

if (isMySQL() || isOracle()) {
if (isMySQL() || isOracle() || isSQLServer()) {

long startUpsert = System.currentTimeMillis();
String upsertQueryText;
Expand All @@ -229,7 +229,15 @@ private void addIndependentEdges(List<Edge> edges, Session session) {
upsertQueryText = "insert into " + closureTableName + " (descendant_oid, ancestor_oid, val) " +
"select descendant_oid, ancestor_oid, val from " + deltaTempTableName + " delta " +
"on duplicate key update " + closureTableName + ".val = " + closureTableName + ".val + values(val)";
} else {
} else if (isSQLServer()) {
// TODO try if this one (without prefixes in INSERT clause does not work for Oracle)
upsertQueryText = "merge into " + closureTableName + " closure " +
"using (select descendant_oid, ancestor_oid, val from " + deltaTempTableName + ") delta " +
"on (closure.descendant_oid = delta.descendant_oid and closure.ancestor_oid = delta.ancestor_oid) " +
"when matched then update set closure.val = closure.val + delta.val " +
"when not matched then insert (descendant_oid, ancestor_oid, val) " +
"values (delta.descendant_oid, delta.ancestor_oid, delta.val);";
} else { // Oracle
upsertQueryText = "merge into " + closureTableName + " closure " +
"using (select descendant_oid, ancestor_oid, val from " + deltaTempTableName + ") delta " +
"on (closure.descendant_oid = delta.descendant_oid and closure.ancestor_oid = delta.ancestor_oid) " +
Expand Down Expand Up @@ -304,6 +312,15 @@ private void dropDeltaTableIfNecessary(Session session, String deltaTempTableNam
if (isMySQL()) {
Query dropQuery = session.createSQLQuery("drop temporary table " + deltaTempTableName);
dropQuery.executeUpdate();
} else if (isSQLServer()) {
// TODO drop temporary if using SQL Server
Query dropQuery = session.createSQLQuery(
"if (exists (" +
"select * " +
"from sys.tables " +
"where name like '"+deltaTempTableName+"%'))\n" +
"drop table " + deltaTempTableName + ";");
dropQuery.executeUpdate();
}
}

Expand Down Expand Up @@ -377,18 +394,31 @@ private void removeIndependentEdges(List<Edge> edges, Session session) {
"set val = val - (select val from " + deltaTempTableName + " td " +
"where td.descendant_oid=" + closureTableName + ".descendant_oid and td.ancestor_oid=" + closureTableName + ".ancestor_oid) " +
"where (descendant_oid, ancestor_oid) in (select descendant_oid, ancestor_oid from " + deltaTempTableName + ")";
} else if (isSQLServer()) {
// delete is the same as for MySQL
deleteFromClosureQueryText = "delete " + closureTableName + " from " + closureTableName + " " +
"inner join " + deltaTempTableName + " td on " +
"td.descendant_oid = "+closureTableName+".descendant_oid and td.ancestor_oid = "+closureTableName+".ancestor_oid and "+
"td.val = "+closureTableName+".val";
// update is also done via inner join (as in MySQL), but using slightly different syntax
updateInClosureQueryText = "update " + closureTableName + " " +
"set "+closureTableName+".val = "+closureTableName+".val - td.val " +
"from "+closureTableName + " " +
"inner join " + deltaTempTableName + " td " +
"on td.descendant_oid=" + closureTableName + ".descendant_oid and " +
"td.ancestor_oid=" + closureTableName + ".ancestor_oid";
} else if (isMySQL()) {
// http://stackoverflow.com/questions/652770/delete-with-join-in-mysql
// TODO consider this for postgresql/h2 as well
// TODO consider this for other databases as well
deleteFromClosureQueryText = "delete " + closureTableName + " from " + closureTableName + " " +
"inner join " + deltaTempTableName + " td on " +
"td.descendant_oid = "+closureTableName+".descendant_oid and td.ancestor_oid = "+closureTableName+".ancestor_oid and "+
"td.val = "+closureTableName+".val";
"td.val = "+closureTableName+".val";
// it is not possible to use temporary table twice in a query
// TODO consider using this in postgresql and h2 as well...
updateInClosureQueryText = "update " + closureTableName +
" join " + deltaTempTableName + " td " +
"on td.descendant_oid=" + closureTableName + ".descendant_oid and td.ancestor_oid=" + closureTableName + ".ancestor_oid " +
"on td.descendant_oid=" + closureTableName + ".descendant_oid and td.ancestor_oid=" + closureTableName + ".ancestor_oid " +
"set "+closureTableName+".val = "+closureTableName+".val - td.val";
} else {
throw new UnsupportedOperationException("implement other databases");
Expand Down Expand Up @@ -424,6 +454,9 @@ private boolean isOracle() {
return repoConfiguration.isUsingOracle();
}

private boolean isSQLServer() {
return repoConfiguration.isUsingSQLServer();
}

private boolean isH2() {
return repoConfiguration.isUsingH2();
Expand Down Expand Up @@ -494,10 +527,12 @@ private String computeDeltaTable(List<Edge> edges, Session session) {

String deltaTempTableName;

if (!isOracle()) {
deltaTempTableName = "m_org_closure_delta_" + System.currentTimeMillis() + "_" + ((int) (Math.random() * 10000000.0));
} else {
if (isOracle()) {
deltaTempTableName = "m_org_closure_temp_delta"; // table definition is global
} else {
deltaTempTableName =
(isSQLServer()?"##":"") +
"m_org_closure_delta_" + System.currentTimeMillis() + "_" + ((int) (Math.random() * 10000000.0));
}

if (COUNT_CLOSURE_RECORDS && LOGGER.isTraceEnabled()) {
Expand All @@ -506,44 +541,51 @@ private String computeDeltaTable(List<Edge> edges, Session session) {
LOGGER.trace("OrgClosure has {} rows", list.toString());
}

long start = System.currentTimeMillis();
String createTablePrefix;
if (isH2()) {
createTablePrefix = "create cached local temporary " + deltaTempTableName + " on commit drop as";
} else if (isPostgreSQL()) {
createTablePrefix = "create temporary table " + deltaTempTableName + " as ";
} else if (isMySQL()) {
createTablePrefix = "create temporary table " + deltaTempTableName + " engine=memory as ";
} else if (isOracle()) {
// todo skip if this is first in this transaction
Query q = session.createSQLQuery("delete from " + deltaTempTableName);
int count = q.executeUpdate();
LOGGER.trace("Deleted {} rows from temporary table {}", count, deltaTempTableName);
createTablePrefix = "insert into " + deltaTempTableName;
long start;
int count;

String selectClause = "select t1.descendant_oid as descendant_oid, t2.ancestor_oid as ancestor_oid, " +
"sum(t1.val*t2.val) as val " +
"from " + closureTableName + " t1, " + closureTableName + " t2 " +
"where " + getWhereClause(edges) + " " +
"group by t1.descendant_oid, t2.ancestor_oid";

if (isSQLServer()) {
// we create the table manually, because we want to have an index on it, and
// with serializable transactions it is not possible to create index within the transaction (after inserting data)
start = System.currentTimeMillis();
Query createTableQuery = session.createSQLQuery("create table " + deltaTempTableName + " (" +
"descendant_oid NVARCHAR(36), " +
"ancestor_oid NVARCHAR(36), " +
"val INT, " +
"PRIMARY KEY (descendant_oid, ancestor_oid))");
createTableQuery.executeUpdate();
if (LOGGER.isTraceEnabled()) LOGGER.trace("Empty delta table created in {} ms", System.currentTimeMillis() - start);

Query insertQuery = session.createSQLQuery("insert into " + deltaTempTableName + " " + selectClause);
start = System.currentTimeMillis();
count = insertQuery.executeUpdate();
} else {
throw new UnsupportedOperationException("define other databases");
}
// t1.ancestor_oid = :tail and t2.descendant_oid = :head
StringBuilder whereClause = new StringBuilder();
boolean first = true;
for (Edge edge : edges) {
if (first) {
first = false;
String createTablePrefix;
if (isH2()) {
createTablePrefix = "create cached local temporary " + deltaTempTableName + " on commit drop as ";
} else if (isPostgreSQL()) {
createTablePrefix = "create temporary table " + deltaTempTableName + " as ";
} else if (isMySQL()) {
createTablePrefix = "create temporary table " + deltaTempTableName + " engine=memory as ";
} else if (isOracle()) {
// todo skip if this is first in this transaction
Query q = session.createSQLQuery("delete from " + deltaTempTableName);
int c = q.executeUpdate();
LOGGER.trace("Deleted {} rows from temporary table {}", c, deltaTempTableName);
createTablePrefix = "insert into " + deltaTempTableName + " ";
} else {
whereClause.append(" or ");
throw new UnsupportedOperationException("define other databases");
}
whereClause.append("(t1.ancestor_oid = '").append(edge.getTail()).append("'");
whereClause.append("and t2.descendant_oid = '").append(edge.getHead()).append("')");
Query query1 = session.createSQLQuery(createTablePrefix + selectClause);
start = System.currentTimeMillis();
count = query1.executeUpdate();
}
Query query1 = session.createSQLQuery(
createTablePrefix + " " +
"select t1.descendant_oid as descendant_oid, t2.ancestor_oid as ancestor_oid, " +
"sum(t1.val*t2.val) as val " +
"from "+closureTableName+" t1, "+closureTableName+" t2 " +
"where " + whereClause.toString() + " " +
"group by t1.descendant_oid, t2.ancestor_oid");
int count = query1.executeUpdate();

if (LOGGER.isTraceEnabled()) LOGGER.trace("Added {} records to temporary delta table {} ({} ms).",
new Object[] {count, deltaTempTableName, System.currentTimeMillis()-start});

Expand All @@ -556,7 +598,8 @@ private String computeDeltaTable(List<Edge> edges, Session session) {
qIndex.executeUpdate();
if (LOGGER.isTraceEnabled()) LOGGER.trace("Index created in {} ms", System.currentTimeMillis()-start);
}
// TODO index for MySQL!!!

// TODO index for MySQL !!!

if (DUMP_TABLES) dumpOrgClosureTypeTable(session, closureTableName);
if (DUMP_TABLES) dumpOrgClosureTypeTable(session, deltaTempTableName);
Expand All @@ -566,6 +609,21 @@ private String computeDeltaTable(List<Edge> edges, Session session) {
return deltaTempTableName;
}

private String getWhereClause(List<Edge> edges) {
StringBuilder whereClause = new StringBuilder();
boolean first = true;
for (Edge edge : edges) {
if (first) {
first = false;
} else {
whereClause.append(" or ");
}
whereClause.append("(t1.ancestor_oid = '").append(edge.getTail()).append("'");
whereClause.append("and t2.descendant_oid = '").append(edge.getHead()).append("')");
}
return whereClause.toString();
}

private void dumpOrgClosureTypeTable(Session session, String tableName) {
Query q = session.createSQLQuery("select descendant_oid, ancestor_oid, val from " + tableName);
List<Object[]> list = q.list();
Expand Down

0 comments on commit 6fbf1ce

Please sign in to comment.