Permalink
Browse files

Ensured all connections are properly closed and code style fixes

  • Loading branch information...
abuijze committed Apr 12, 2014
1 parent d27e1e7 commit 9d5416ef74891aa0a2d38ce1c6c6d8bac03dc023
Showing with 65 additions and 45 deletions.
  1. +65 −45 core/src/main/java/org/axonframework/eventstore/jdbc/DefaultEventEntryStore.java
@@ -37,7 +37,6 @@
import java.util.Iterator;
import java.util.LinkedList;
import java.util.List;
-
import javax.sql.DataSource;
import static org.axonframework.common.jdbc.JdbcUtils.closeQuietly;
@@ -56,11 +55,11 @@
private final ConnectionProvider connectionProvider;
- private final EventSqlSchema sqldef;
+ private final EventSqlSchema sqlSchema;
/**
* Initialize the EventEntryStore, fetching connections from the given <code>dataSource</code> and executing SQL
- * statements using given <code>sqldef</code>.
+ * statements using given <code>sqlSchema</code>.
*
* @param dataSource The data source used to create connections
* @param sqlSchema The SQL Definitions
@@ -73,14 +72,14 @@ public DefaultEventEntryStore(DataSource dataSource, EventSqlSchema sqlSchema) {
* Initialize the EventEntryStore, fetching connections from the given <code>connectionProvider</code> and
* executing
* SQL
- * statements using given <code>sqldef</code>.
+ * statements using given <code>sqlSchema</code>.
*
* @param connectionProvider Used to obtain connections
* @param sqlSchema The SQL Definitions
*/
public DefaultEventEntryStore(ConnectionProvider connectionProvider, EventSqlSchema sqlSchema) {
this.connectionProvider = connectionProvider;
- this.sqldef = sqlSchema;
+ this.sqlSchema = sqlSchema;
}
/**
@@ -96,36 +95,37 @@ public DefaultEventEntryStore(ConnectionProvider connectionProvider) {
@Override
public SerializedDomainEventData loadLastSnapshotEvent(String aggregateType, Object identifier) {
ResultSet result = null;
+ Connection connection = null;
try {
- result = sqldef.sql_loadLastSnapshot(connectionProvider.getConnection(), identifier, aggregateType)
+ connection = connectionProvider.getConnection();
+ result = sqlSchema.sql_loadLastSnapshot(connection, identifier, aggregateType)
.executeQuery();
if (result.next()) {
- return sqldef.createSerializedDomainEventData(result);
+ return sqlSchema.createSerializedDomainEventData(result);
}
return null;
} catch (SQLException e) {
throw new EventStoreException("Exception while attempting to load last snapshot event of "
- + aggregateType + "/" + identifier, e);
+ + aggregateType + "/" + identifier, e);
} finally {
closeQuietly(result);
+ closeQuietly(connection);
}
}
@Override
public Iterator<SerializedDomainEventData> fetchFiltered(String whereClause, List<Object> parameters,
int batchSize) {
- Connection connection = null;
- PreparedStatement statement = null;
try {
- connection = connectionProvider.getConnection();
+ Connection connection = connectionProvider.getConnection();
return new ConnectionResourceManagingIterator(
- new FilteredBatchingIterator(whereClause, parameters, batchSize, sqldef, connection),
+ new FilteredBatchingIterator(whereClause, parameters, batchSize, sqlSchema, connection),
connection);
} catch (SQLException e) {
- closeQuietly(connection);
- closeQuietly(statement);
throw new EventStoreException("Exception while attempting to read from the Event Store database", e);
}
+ // we don't want to close the connection here. The ConnectionResourceManagingIterator will close the connection
+ // when it finishes iterating the results.
}
@@ -134,8 +134,10 @@ public void persistSnapshot(String aggregateType, DomainEventMessage snapshotEve
SerializedObject serializedPayload, SerializedObject serializedMetaData) {
byte[] data = (byte[]) serializedMetaData.getData();
PreparedStatement preparedStatement = null;
+ Connection connection = null;
try {
- preparedStatement = sqldef.sql_insertSnapshotEventEntry(connectionProvider.getConnection(),
+ connection = connectionProvider.getConnection();
+ preparedStatement = sqlSchema.sql_insertSnapshotEventEntry(connection,
snapshotEvent.getIdentifier(),
snapshotEvent.getAggregateIdentifier()
.toString(),
@@ -152,6 +154,7 @@ public void persistSnapshot(String aggregateType, DomainEventMessage snapshotEve
throw new EventStoreException("Exception while attempting to persist a snapshot", e);
} finally {
closeQuietly(preparedStatement);
+ closeQuietly(connection);
}
}
@@ -160,9 +163,11 @@ public void persistEvent(String aggregateType, DomainEventMessage event, Seriali
SerializedObject serializedMetaData) {
PreparedStatement preparedStatement = null;
+ Connection connection = null;
try {
byte[] data = (byte[]) serializedMetaData.getData();
- preparedStatement = sqldef.sql_insertDomainEventEntry(connectionProvider.getConnection(),
+ connection = connectionProvider.getConnection();
+ preparedStatement = sqlSchema.sql_insertDomainEventEntry(connection,
event.getIdentifier(),
event.getAggregateIdentifier().toString(),
event.getSequenceNumber(),
@@ -178,6 +183,7 @@ public void persistEvent(String aggregateType, DomainEventMessage event, Seriali
throw new EventStoreException("Exception occurred while attempting to persist an event", e);
} finally {
closeQuietly(preparedStatement);
+ closeQuietly(connection);
}
}
@@ -188,13 +194,17 @@ public void pruneSnapshots(String type, DomainEventMessage mostRecentSnapshotEve
maxSnapshotsArchived);
if (redundantSnapshots.hasNext()) {
long sequenceOfFirstSnapshotToPrune = redundantSnapshots.next();
+ Connection connection = null;
try {
- executeUpdate(sqldef.sql_pruneSnapshots(connectionProvider.getConnection(),
+ connection = connectionProvider.getConnection();
+ executeUpdate(sqlSchema.sql_pruneSnapshots(connection,
type,
mostRecentSnapshotEvent.getAggregateIdentifier(),
sequenceOfFirstSnapshotToPrune), "prune snapshots");
} catch (SQLException e) {
throw new EventStoreException("An exception occurred while attempting to prune snapshots", e);
+ } finally {
+ closeQuietly(connection);
}
}
}
@@ -214,7 +224,7 @@ public void pruneSnapshots(String type, DomainEventMessage mostRecentSnapshotEve
Connection connection = null;
try {
connection = connectionProvider.getConnection();
- statement = sqldef.sql_findSnapshotSequenceNumbers(connection,
+ statement = sqlSchema.sql_findSnapshotSequenceNumbers(connection,
type,
snapshotEvent.getAggregateIdentifier());
resultSet = statement.executeQuery();
@@ -244,11 +254,11 @@ public void pruneSnapshots(String type, DomainEventMessage mostRecentSnapshotEve
PreparedStatement statement = null;
try {
connection = connectionProvider.getConnection();
- statement = sqldef.sql_fetchFromSequenceNumber(connection, aggregateType, identifier,
+ statement = sqlSchema.sql_fetchFromSequenceNumber(connection, aggregateType, identifier,
firstSequenceNumber);
statement.setFetchSize(fetchSize);
return new ConnectionResourceManagingIterator(
- new PreparedStatementIterator(statement, sqldef),
+ new PreparedStatementIterator(statement, sqlSchema),
connection);
} catch (SQLException e) {
closeQuietly(connection);
@@ -258,6 +268,7 @@ public void pruneSnapshots(String type, DomainEventMessage mostRecentSnapshotEve
}
private static class ConnectionResourceManagingIterator implements Iterator<SerializedDomainEventData>, Closeable {
+
private final Iterator<SerializedDomainEventData> inner;
private final Connection connection;
@@ -292,26 +303,27 @@ public void close() throws IOException {
* This class will NOT close the connection pass into its constructor.
*/
private static class FilteredBatchingIterator implements Iterator<SerializedDomainEventData>, Closeable {
+
private final Connection connection;
private PreparedStatementIterator currentBatch;
private SerializedDomainEventData next;
private SerializedDomainEventData lastItem;
private final String whereClause;
private final List<Object> parameters;
private final int batchSize;
- private final EventSqlSchema sqldef;
+ private final EventSqlSchema sqlSchema;
public FilteredBatchingIterator(
String whereClause,
List<Object> parameters,
int batchSize,
- EventSqlSchema sqldef,
+ EventSqlSchema sqlSchema,
Connection connection) {
this.whereClause = whereClause;
this.parameters = parameters;
this.batchSize = batchSize;
this.connection = connection;
- this.sqldef = sqldef;
+ this.sqlSchema = sqlSchema;
this.currentBatch = fetchBatch();
if (currentBatch.hasNext()) {
@@ -323,12 +335,12 @@ private PreparedStatementIterator fetchBatch() {
LinkedList<Object> params = new LinkedList<Object>(parameters);
String batchWhereClause = buildWhereClause(params);
try {
- final PreparedStatement sql = sqldef.sql_getFetchAll(
+ final PreparedStatement sql = sqlSchema.sql_getFetchAll(
connection,
batchWhereClause,
params.toArray());
sql.setMaxRows(batchSize);
- return new PreparedStatementIterator(sql, sqldef);
+ return new PreparedStatementIterator(sql, sqlSchema);
} catch (SQLException e) {
throw new EventStoreException("Exception occurred while attempting to execute prepared statement", e);
}
@@ -341,13 +353,13 @@ private String buildWhereClause(LinkedList<Object> params) {
StringBuilder sb = new StringBuilder();
if (lastItem != null) {
sb.append("(")
- .append("(e.timeStamp > ?)")
- .append(" OR ")
- .append("(e.timeStamp = ? AND e.sequenceNumber > ?)")
- .append(" OR ")
- .append("(e.timeStamp = ? AND e.sequenceNumber = ? AND e.aggregateIdentifier > ?)")
- .append(")");
- Object dateTimeSql = sqldef.sql_dateTime(lastItem.getTimestamp());
+ .append("(e.timeStamp > ?)")
+ .append(" OR ")
+ .append("(e.timeStamp = ? AND e.sequenceNumber > ?)")
+ .append(" OR ")
+ .append("(e.timeStamp = ? AND e.sequenceNumber = ? AND e.aggregateIdentifier > ?)")
+ .append(")");
+ Object dateTimeSql = sqlSchema.sql_dateTime(lastItem.getTimestamp());
params.add(0, dateTimeSql);
params.add(1, dateTimeSql);
@@ -401,16 +413,15 @@ public void close() throws IOException {
}
private static class PreparedStatementIterator implements Iterator<SerializedDomainEventData>, Closeable {
+
private final PreparedStatement statement;
- private final EventSqlSchema sqldef;
private final ResultSetIterator rsIterator;
- public PreparedStatementIterator(PreparedStatement statement, EventSqlSchema sqldef) {
+ public PreparedStatementIterator(PreparedStatement statement, EventSqlSchema sqlSchema) {
this.statement = statement;
- this.sqldef = sqldef;
try {
ResultSet resultSet = statement.executeQuery();
- rsIterator = new ResultSetIterator(resultSet, sqldef);
+ rsIterator = new ResultSetIterator(resultSet, sqlSchema);
} catch (SQLException e) {
throw new EventStoreException("Exception occurred while attempting to execute query on statement", e);
}
@@ -443,15 +454,16 @@ public void close() throws IOException {
}
private static class ResultSetIterator implements Iterator<SerializedDomainEventData>, Closeable {
+
private final ResultSet rs;
- private final EventSqlSchema sqldef;
+ private final EventSqlSchema sqlSchema;
boolean hasCalledNext = false;
boolean hasNext;
private int counter = 0;
- public ResultSetIterator(ResultSet resultSet, EventSqlSchema sqldef) {
+ public ResultSetIterator(ResultSet resultSet, EventSqlSchema sqlSchema) {
this.rs = resultSet;
- this.sqldef = sqldef;
+ this.sqlSchema = sqlSchema;
}
@Override
@@ -478,8 +490,7 @@ public SerializedDomainEventData next() {
if (hasNext) {
counter++;
}
- SerializedDomainEventData eventData = sqldef.createSerializedDomainEventData(rs);
- return eventData;
+ return sqlSchema.createSerializedDomainEventData(rs);
} catch (SQLException e) {
throw new EventStoreException("Exception occurred while attempting to read next event from ResultSet",
e);
@@ -513,10 +524,19 @@ private int executeUpdate(PreparedStatement preparedStatement, String descriptio
}
}
+ /**
+ * Performs the DDL queries to create the schema necessary for this EventEntryStore implementation.
+ *
+ * @throws SQLException when an error occurs executing SQL statements
+ */
public void createSchema() throws SQLException {
- executeUpdate(sqldef.sql_createDomainEventEntryTable(connectionProvider.getConnection()),
- "create domain event entry table");
- executeUpdate(sqldef.sql_createSnapshotEventEntryTable(connectionProvider.getConnection()),
- "create snapshot entry table");
+ Connection connection = null;
+ try {
+ connection = connectionProvider.getConnection();
+ executeUpdate(sqlSchema.sql_createDomainEventEntryTable(connection), "create domain event entry table");
+ executeUpdate(sqlSchema.sql_createSnapshotEventEntryTable(connection), "create snapshot entry table");
+ } finally {
+ closeQuietly(connection);
+ }
}
}

0 comments on commit 9d5416e

Please sign in to comment.