Skip to content

Commit

Permalink
impelmenting MID-3447
Browse files Browse the repository at this point in the history
  • Loading branch information
katkav committed Nov 22, 2016
1 parent 23ecca1 commit 76ab933
Show file tree
Hide file tree
Showing 3 changed files with 245 additions and 52 deletions.
Expand Up @@ -12132,6 +12132,7 @@
</xsd:annotation>
<xsd:sequence>
<xsd:element name="maxAge" type="xsd:duration" minOccurs="0"/>
<xsd:element name="maxRecords" type="xsd:int" minOccurs="0"/>
</xsd:sequence>
</xsd:complexType>

Expand Down
Expand Up @@ -33,11 +33,13 @@
import org.springframework.test.annotation.DirtiesContext;
import org.springframework.test.context.ContextConfiguration;
import org.testng.AssertJUnit;
import org.testng.annotations.AfterMethod;
import org.testng.annotations.Test;

import javax.xml.datatype.DatatypeFactory;
import javax.xml.datatype.Duration;

import java.lang.reflect.Method;
import java.util.Calendar;
import java.util.Date;
import java.util.List;
Expand All @@ -63,7 +65,7 @@ private Calendar create_2013_07_12_12_00_Calendar() {

return calendar;
}

private Duration createDuration(Calendar when, long now) throws Exception {
long seconds = (now - when.getTimeInMillis()) / 1000;
return DatatypeFactory.newInstance().newDuration("PT" + seconds + "S").negate();
Expand All @@ -77,37 +79,43 @@ private CleanupPolicyType createPolicy(Calendar when, long now) throws Exception

return policy;
}

private CleanupPolicyType createPolicy(int maxRecords) throws Exception {
CleanupPolicyType policy = new CleanupPolicyType();

policy.setMaxRecords(Integer.valueOf(maxRecords));

return policy;
}

@AfterMethod
public void cleaup() {
Session session = getFactory().openSession();
try {
session.beginTransaction();
Query query = session.createQuery("delete from RObjectDeltaOperation");
query.executeUpdate();

query = session.createQuery("delete from RAuditEventRecord");
query.executeUpdate();

query = session.createQuery("select count(*) from " + RAuditEventRecord.class.getSimpleName());
Long count = (Long) query.uniqueResult();

AssertJUnit.assertEquals(0L, (long) count);
session.getTransaction().commit();
} finally {
session.close();
}
}

@Test
public void testAuditCleanup() throws Exception {
public void testAuditCleanupMaxAge() throws Exception {
//GIVEN
Calendar calendar = create_2013_07_12_12_00_Calendar();
for (int i = 0; i < 3; i++) {
long timestamp = calendar.getTimeInMillis();
AuditEventRecord record = new AuditEventRecord();
record.addDelta(createObjectDeltaOperation(i));
record.setTimestamp(timestamp);
LOGGER.info("Adding audit record with timestamp {}", new Object[]{new Date(timestamp)});

auditService.audit(record, new SimpleTaskAdapter());
calendar.add(Calendar.HOUR_OF_DAY, 1);
}

Session session = getFactory().openSession();
try {
session.beginTransaction();

Query query = session.createQuery("select count(*) from " + RAuditEventRecord.class.getSimpleName());
Long count = (Long) query.uniqueResult();

AssertJUnit.assertEquals(3L, (long) count);
session.getTransaction().commit();
} finally {
session.close();
}
prepareAuditEventRecords();

//WHEN
calendar = create_2013_07_12_12_00_Calendar();
Calendar calendar = create_2013_07_12_12_00_Calendar();
calendar.add(Calendar.HOUR_OF_DAY, 1);
calendar.add(Calendar.MINUTE, 1);

Expand All @@ -119,33 +127,87 @@ public void testAuditCleanup() throws Exception {
result.recomputeStatus();

//THEN
AssertJUnit.assertTrue(result.isSuccess());
RAuditEventRecord record = assertAndReturnAuditEventRecord(result);

session = getFactory().openSession();
try {
session.beginTransaction();
Date finished = new Date(record.getTimestamp().getTime());

Query query = session.createQuery("from " + RAuditEventRecord.class.getSimpleName());
List<RAuditEventRecord> records = query.list();
Date mark = new Date(NOW);
Duration duration = policy.getMaxAge();
duration.addTo(mark);

AssertJUnit.assertTrue("finished: " + finished + ", mark: " + mark, finished.after(mark));
}

@Test
public void testAuditCleanupMaxRecords() throws Exception {
//GIVEN
prepareAuditEventRecords();

AssertJUnit.assertEquals(1, records.size());
RAuditEventRecord record = records.get(0);
//WHEN
Calendar calendar = create_2013_07_12_12_00_Calendar();
calendar.add(Calendar.HOUR_OF_DAY, 1);
calendar.add(Calendar.MINUTE, 1);

Date finished = new Date(record.getTimestamp().getTime());
final long NOW = System.currentTimeMillis();
CleanupPolicyType policy = createPolicy(1);

Date mark = new Date(NOW);
Duration duration = policy.getMaxAge();
duration.addTo(mark);
OperationResult result = new OperationResult("Cleanup audit");
auditService.cleanupAudit(policy, result);
result.recomputeStatus();

AssertJUnit.assertTrue("finished: " + finished + ", mark: " + mark, finished.after(mark));
//THEN
RAuditEventRecord record = assertAndReturnAuditEventRecord(result);

session.getTransaction().commit();
} finally {
session.close();
}
}

private ObjectDeltaOperation createObjectDeltaOperation(int i) throws Exception {
private RAuditEventRecord assertAndReturnAuditEventRecord(OperationResult result) {
AssertJUnit.assertTrue(result.isSuccess());

Session session = getFactory().openSession();
try {
session.beginTransaction();

Query query = session.createQuery("from " + RAuditEventRecord.class.getSimpleName());
List<RAuditEventRecord> records = query.list();

AssertJUnit.assertEquals(1, records.size());
session.getTransaction().commit();
return records.get(0);
} finally {
session.close();
}

}

private void prepareAuditEventRecords() throws Exception {
Calendar calendar = create_2013_07_12_12_00_Calendar();
for (int i = 0; i < 3; i++) {
long timestamp = calendar.getTimeInMillis();
AuditEventRecord record = new AuditEventRecord();
record.addDelta(createObjectDeltaOperation(i));
record.setTimestamp(timestamp);
LOGGER.info("Adding audit record with timestamp {}", new Object[]{new Date(timestamp)});

auditService.audit(record, new SimpleTaskAdapter());
calendar.add(Calendar.HOUR_OF_DAY, 1);
}

Session session = getFactory().openSession();
try {
session.beginTransaction();

Query query = session.createQuery("select count(*) from " + RAuditEventRecord.class.getSimpleName());
Long count = (Long) query.uniqueResult();

AssertJUnit.assertEquals(3L, (long) count);
session.getTransaction().commit();
} finally {
session.close();
}

}

private ObjectDeltaOperation createObjectDeltaOperation(int i) throws Exception {
ObjectDeltaOperation delta = new ObjectDeltaOperation();
delta.setExecutionResult(new OperationResult("asdf"));
UserType user = new UserType();
Expand Down
Expand Up @@ -48,6 +48,8 @@
import org.hibernate.Query;
import org.hibernate.SQLQuery;
import org.hibernate.Session;
import org.hibernate.criterion.Criterion;
import org.hibernate.criterion.Order;
import org.hibernate.dialect.Dialect;
import org.hibernate.dialect.pagination.LimitHandler;
import org.hibernate.engine.spi.RowSelection;
Expand All @@ -56,13 +58,16 @@
import javax.xml.datatype.Duration;
import javax.xml.datatype.XMLGregorianCalendar;

import java.math.BigInteger;
import java.sql.*;
import java.util.ArrayList;
import java.util.Date;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
import java.util.Set;

import org.hibernate.Criteria;
import org.hibernate.FlushMode;
import org.springframework.beans.factory.annotation.Autowired;

Expand Down Expand Up @@ -237,13 +242,19 @@ public void cleanupAudit(CleanupPolicyType policy, OperationResult parentResult)
Validate.notNull(policy, "Cleanup policy must not be null.");
Validate.notNull(parentResult, "Operation result must not be null.");

final String operation = "deleting";
int attempt = 1;

SqlPerformanceMonitor pm = getPerformanceMonitor();
long opHandle = pm.registerOperationStart("cleanupAudit");

if (policy.getMaxAge() == null) {
cleanupAuditMaxRecords(policy, parentResult);
cleanupAuditMaxAge(policy, parentResult);
}

private void cleanupAuditMaxAge(CleanupPolicyType policy, OperationResult parentResult) {

final String operation = "deletingMaxAge";

SqlPerformanceMonitor pm = getPerformanceMonitor();
long opHandle = pm.registerOperationStart("cleanupAuditMaxAge");
int attempt = 1;

if (policy.getMaxAge() == null) {
return;
}

Expand Down Expand Up @@ -289,6 +300,56 @@ public void cleanupAudit(CleanupPolicyType policy, OperationResult parentResult)
totalCountHolder.getValue(), (System.currentTimeMillis() - start)/1000L);
}
}

private void cleanupAuditMaxRecords(CleanupPolicyType policy, OperationResult parentResult) {

final String operation = "deletingMaxRecords";

SqlPerformanceMonitor pm = getPerformanceMonitor();
long opHandle = pm.registerOperationStart("cleanupAuditMaxRecords");
int attempt = 1;

if (policy.getMaxRecords() == null) {
return;
}

Integer records = policy.getMaxRecords();

// factored out because it produces INFO-level message
Dialect dialect = Dialect.getDialect(baseHelper.getSessionFactoryBean().getHibernateProperties());
if (!dialect.supportsTemporaryTables()) {
LOGGER.error("Dialect {} doesn't support temporary tables, couldn't cleanup audit logs.", dialect);
throw new SystemException("Dialect " + dialect + " doesn't support temporary tables, couldn't cleanup audit logs.");
}

long start = System.currentTimeMillis();
boolean first = true;
Holder<Integer> totalCountHolder = new Holder<>(0);
try {
while (true) {
try {
LOGGER.info("{} audit cleanup, deleting up to {} (duration '{}'), batch size {}{}.",
first ? "Starting" : "Restarting",
records, CLEANUP_AUDIT_BATCH_SIZE,
first ? "" : ", up to now deleted " + totalCountHolder.getValue() + " entries");
first = false;
int count;
do {
// the following method may restart due to concurrency (or any other) problem - in any iteration
count = cleanupAuditAttempt(records, totalCountHolder, dialect, parentResult);
} while (count > 0);
return;
} catch (RuntimeException ex) {
attempt = baseHelper.logOperationAttempt(null, operation, attempt, ex, parentResult);
pm.registerOperationNewTrial(opHandle, attempt);
}
}
} finally {
pm.registerOperationFinish(opHandle, attempt);
LOGGER.info("Audit cleanup finished; deleted {} entries in {} seconds.",
totalCountHolder.getValue(), (System.currentTimeMillis() - start)/1000L);
}
}

private int cleanupAuditAttempt(Date minValue, Duration duration, Holder<Integer> totalCountHolder, Dialect dialect, OperationResult subResult) {

Expand All @@ -315,6 +376,32 @@ private int cleanupAuditAttempt(Date minValue, Duration duration, Holder<Integer
baseHelper.cleanupSessionAndResult(session, subResult);
}
}

private int cleanupAuditAttempt(Integer recordsToKeep, Holder<Integer> totalCountHolder, Dialect dialect, OperationResult subResult) {

long start = System.currentTimeMillis();
LOGGER.debug("Starting audit cleanup batch, deleting up to {} (duration '{}'), batch size {}, up to now deleted {} entries.",
recordsToKeep, CLEANUP_AUDIT_BATCH_SIZE, totalCountHolder.getValue());

Session session = null;
try {
session = baseHelper.beginTransaction();

int count = cleanupAuditAttempt(recordsToKeep, session, dialect);

session.getTransaction().commit();
int totalCount = totalCountHolder.getValue() + count;
totalCountHolder.setValue(totalCount);
LOGGER.debug("Audit cleanup batch finishing successfully in {} milliseconds; total count = {}", System.currentTimeMillis()-start, totalCount);
return count;
} catch (RuntimeException ex) {
LOGGER.debug("Audit cleanup batch finishing with exception in {} milliseconds; exception = {}", System.currentTimeMillis()-start, ex.getMessage());
baseHelper.handleGeneralRuntimeException(ex, session, subResult);
throw new AssertionError("We shouldn't get here."); // just because of the need to return a value
} finally {
baseHelper.cleanupSessionAndResult(session, subResult);
}
}

protected int cleanupAuditAttempt(Date minValue, Session session, Dialect dialect) {
//create temporary table
Expand Down Expand Up @@ -365,6 +452,49 @@ protected int cleanupAuditAttempt(Date minValue, Session session, Dialect dialec

return insertCount;
}

protected int cleanupAuditAttempt(Integer recordsToKeep, Session session, Dialect dialect) {
//create temporary table
final String tempTable = dialect.generateTemporaryTableName(RAuditEventRecord.TABLE_NAME);
createTemporaryTable(session, dialect, tempTable);
StringBuilder selectSB = new StringBuilder();
selectSB.append("select a.id as id from ").append(RAuditEventRecord.TABLE_NAME).append(" a");
selectSB.append(" order by a.").append(RAuditEventRecord.COLUMN_TIMESTAMP).append(" desc");
String selectString = selectSB.toString();

// batch size
RowSelection rowSelection = new RowSelection();
rowSelection.setFirstRow(recordsToKeep);
rowSelection.setMaxRows(CLEANUP_AUDIT_BATCH_SIZE);
LimitHandler limitHandler = dialect.buildLimitHandler(selectString, rowSelection);
selectString = limitHandler.getProcessedSql();

String queryString = "insert into " + tempTable + " " + selectString;
LOGGER.trace("Query string = {}", queryString);
SQLQuery query = session.createSQLQuery(queryString);
query.setParameter(0, CLEANUP_AUDIT_BATCH_SIZE);
query.setParameter(1, recordsToKeep);

int insertCount = query.executeUpdate();
LOGGER.trace("Inserted {} audit record ids ready for deleting.", new Object[]{insertCount});

//drop records from m_audit_event, m_audit_delta
session.createSQLQuery(createDeleteQuery(RObjectDeltaOperation.TABLE_NAME, tempTable,
RObjectDeltaOperation.COLUMN_RECORD_ID)).executeUpdate();
session.createSQLQuery(createDeleteQuery(RAuditEventRecord.TABLE_NAME, tempTable, "id")).executeUpdate();

//drop temporary table
if (dialect.dropTemporaryTableAfterUse()) {
LOGGER.debug("Dropping temporary table.");
StringBuilder sb = new StringBuilder();
sb.append(dialect.getDropTemporaryTableString());
sb.append(' ').append(tempTable);

session.createSQLQuery(sb.toString()).executeUpdate();
}

return insertCount;
}

/**
* This method creates temporary table for cleanup audit method.
Expand Down

0 comments on commit 76ab933

Please sign in to comment.