diff --git a/src/main/java/com/feedzai/commons/sql/abstraction/engine/impl/PostgreSqlEngine.java b/src/main/java/com/feedzai/commons/sql/abstraction/engine/impl/PostgreSqlEngine.java index e74f4bd7..67e72092 100644 --- a/src/main/java/com/feedzai/commons/sql/abstraction/engine/impl/PostgreSqlEngine.java +++ b/src/main/java/com/feedzai/commons/sql/abstraction/engine/impl/PostgreSqlEngine.java @@ -37,7 +37,17 @@ import com.feedzai.commons.sql.abstraction.engine.impl.postgresql.PostgresSqlQueryExceptionHandler; import com.feedzai.commons.sql.abstraction.util.StringUtils; +import java.util.Collections; +import java.util.concurrent.Callable; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicReference; import java.util.stream.Collectors; + +import com.google.common.util.concurrent.Uninterruptibles; import org.postgresql.Driver; import org.postgresql.PGProperty; import org.postgresql.util.PGobject; @@ -113,6 +123,13 @@ public class PostgreSqlEngine extends AbstractDatabaseEngine { */ public static final QueryExceptionHandler PG_QUERY_EXCEPTION_HANDLER = new PostgresSqlQueryExceptionHandler(); + /** + * The {@link ExecutorService} which will execute Postgres {@link Statement queries}. The queries are executed + * asynchronously with another thread so the main thread can log from time to time in order to easily identify cases + * where the query might be stuck due to the database being locked because of another external query. + */ + private final ExecutorService executor = Executors.newFixedThreadPool(1); + /** * Creates a new PostgreSql connection. * @@ -236,10 +253,10 @@ protected void createTable(final DbEntity entity) throws DatabaseEngineException logger.trace(createTableStatement); - Statement s = null; + final AtomicReference queryStatement = new AtomicReference<>(); try { - s = conn.createStatement(); - s.executeUpdate(createTableStatement); + queryStatement.set(conn.createStatement()); + executeQuery(() -> queryStatement.get().executeUpdate(createTableStatement), createTableStatement); } catch (final SQLException ex) { if (ex.getSQLState().startsWith(NAME_ALREADY_EXISTS)) { logger.debug(dev, "'{}' is already defined", entity.getName()); @@ -249,8 +266,8 @@ protected void createTable(final DbEntity entity) throws DatabaseEngineException } } finally { try { - if (s != null) { - s.close(); + if (queryStatement.get() != null) { + queryStatement.get().close(); } } catch (final Exception e) { logger.trace("Error closing statement.", e); @@ -283,10 +300,10 @@ protected void addPrimaryKey(final DbEntity entity) throws DatabaseEngineExcepti logger.trace(addPrimaryKey); - Statement s = null; + final AtomicReference queryStatement = new AtomicReference<>(); try { - s = conn.createStatement(); - s.executeUpdate(addPrimaryKey); + queryStatement.set(conn.createStatement()); + executeQuery(() -> queryStatement.get().executeUpdate(addPrimaryKey), addPrimaryKey); } catch (final SQLException ex) { if (ex.getSQLState().startsWith(TABLE_CAN_ONLY_HAVE_ONE_PRIMARY_KEY)) { logger.debug(dev, "'{}' already has a primary key", entity.getName()); @@ -296,8 +313,8 @@ protected void addPrimaryKey(final DbEntity entity) throws DatabaseEngineExcepti } } finally { try { - if (s != null) { - s.close(); + if (queryStatement.get() != null) { + queryStatement.get().close(); } } catch (final Exception e) { logger.trace("Error closing statement.", e); @@ -334,10 +351,10 @@ protected void addIndexes(final DbEntity entity) throws DatabaseEngineException logger.trace(statement); - Statement s = null; + final AtomicReference queryStatement = new AtomicReference<>(); try { - s = conn.createStatement(); - s.executeUpdate(statement); + queryStatement.set(conn.createStatement()); + executeQuery(() -> queryStatement.get().executeUpdate(statement), statement); } catch (final SQLException ex) { if (ex.getSQLState().startsWith(NAME_ALREADY_EXISTS)) { logger.debug(dev, "'{}' is already defined", idxName); @@ -347,8 +364,8 @@ protected void addIndexes(final DbEntity entity) throws DatabaseEngineException } } finally { try { - if (s != null) { - s.close(); + if (queryStatement.get() != null) { + queryStatement.get().close(); } } catch (final Exception e) { logger.trace("Error closing statement.", e); @@ -495,12 +512,13 @@ protected void dropSequences(DbEntity entity) throws DatabaseEngineException { @Override protected void dropTable(DbEntity entity) throws DatabaseEngineException { - Statement drop = null; + final AtomicReference queryStatement = new AtomicReference<>(); try { - drop = conn.createStatement(); + queryStatement.set(conn.createStatement()); + final String query = format("DROP TABLE %s CASCADE", quotize(entity.getName())); logger.trace(query); - drop.executeUpdate(query); + executeQuery(() -> queryStatement.get().executeUpdate(query), query); } catch (final SQLException ex) { if (ex.getSQLState().startsWith(TABLE_OR_VIEW_DOES_NOT_EXIST)) { logger.debug(dev, "Table '{}' does not exist", entity.getName()); @@ -510,8 +528,8 @@ protected void dropTable(DbEntity entity) throws DatabaseEngineException { } } finally { try { - if (drop != null) { - drop.close(); + if (queryStatement.get() != null) { + queryStatement.get().close(); } } catch (final Exception e) { logger.trace("Error closing statement.", e); @@ -521,8 +539,6 @@ protected void dropTable(DbEntity entity) throws DatabaseEngineException { @Override protected void dropColumn(DbEntity entity, String... columns) throws DatabaseEngineException { - Statement drop = null; - List removeColumns = new ArrayList<>(); removeColumns.add("ALTER TABLE"); removeColumns.add(quotize(entity.getName())); @@ -532,11 +548,12 @@ protected void dropColumn(DbEntity entity, String... columns) throws DatabaseEng } removeColumns.add(join(cols, ",")); + final AtomicReference queryStatement = new AtomicReference<>(); try { - drop = conn.createStatement(); + queryStatement.set(conn.createStatement()); final String query = join(removeColumns, " "); logger.trace(query); - drop.executeUpdate(query); + executeQuery(() -> queryStatement.get().executeUpdate(query), query); } catch (final SQLException ex) { if (ex.getMessage().startsWith(TABLE_OR_VIEW_DOES_NOT_EXIST)) { logger.debug(dev, "Table '{}' does not exist", entity.getName()); @@ -545,8 +562,8 @@ protected void dropColumn(DbEntity entity, String... columns) throws DatabaseEng } } finally { try { - if (drop != null) { - drop.close(); + if (queryStatement.get() != null) { + queryStatement.get().close(); } } catch (final Exception e) { logger.trace("Error closing statement.", e); @@ -582,16 +599,16 @@ protected void addColumn(DbEntity entity, DbColumn... columns) throws DatabaseEn final String addColumnsStatement = join(addColumns, " "); logger.trace(addColumnsStatement); - Statement s = null; + final AtomicReference queryStatement = new AtomicReference<>(); try { - s = conn.createStatement(); - s.executeUpdate(addColumnsStatement); - } catch (final SQLException ex) { + queryStatement.set(conn.createStatement()); + executeQuery(() -> queryStatement.get().executeUpdate(addColumnsStatement), addColumnsStatement); + } catch (final Throwable ex) { throw new DatabaseEngineException("Something went wrong handling statement", ex); } finally { try { - if (s != null) { - s.close(); + if (queryStatement.get() != null) { + queryStatement.get().close(); } } catch (final Exception e) { logger.trace("Error closing statement.", e); @@ -678,11 +695,11 @@ protected void addFks(final DbEntity entity, Set fks) throws DatabaseEngin quotizedForeignColumnsString ); - Statement alterTableStmt = null; + final AtomicReference queryStatement = new AtomicReference<>(); try { - alterTableStmt = conn.createStatement(); + queryStatement.set(conn.createStatement()); logger.trace(alterTable); - alterTableStmt.executeUpdate(alterTable); + executeQuery(() -> queryStatement.get().executeUpdate(alterTable), alterTable); } catch (final SQLException ex) { if (ex.getSQLState().equals(CONSTRAINT_NAME_ALREADY_EXISTS)) { logger.debug(dev, "Foreign key for table '{}' already exists. Error code: {}.", entity.getName(), ex.getSQLState()); @@ -695,8 +712,8 @@ protected void addFks(final DbEntity entity, Set fks) throws DatabaseEngin } } finally { try { - if (alterTableStmt != null) { - alterTableStmt.close(); + if (queryStatement.get() != null) { + queryStatement.get().close(); } } catch (final Exception e) { logger.trace("Error closing statement.", e); @@ -832,4 +849,59 @@ protected ResultIterator createResultIterator(PreparedStatement ps) throws Datab protected QueryExceptionHandler getQueryExceptionHandler() { return PG_QUERY_EXCEPTION_HANDLER; } + + /** + * Helper method that logs the time while the Postgres {@link Statement query} is being executed. + *
+ * This will help identify cases via logs where Postgres database is already locked due to an external query being + * executed and the current one is waiting for that one to finish first. + * + * @param function The {@link Callable function} that will executes Postgres {@link Statement query}. + * @param queryLog The query in {@link String} format which is being executed. Will be used in the logs. + * @throws SQLException If the {@link Statement query} fails during its execution. + * @implNote It prints a log every minute (after 4 min delay) so the user is aware that the query is still being executed. + */ + private void executeQuery(final Callable function, final String queryLog) throws SQLException { + final AtomicBoolean querySucceeded = new AtomicBoolean(true); + final AtomicReference queryException = new AtomicReference<>(); + + final CountDownLatch countDownLatch = new CountDownLatch(1); + final Callable threadFunction = () -> { + try { + return function.call(); + } catch (final SQLException ex) { + queryException.set(ex); + } catch (final Throwable ex) { + queryException.set(new SQLException(ex)); + } finally { + countDownLatch.countDown(); + } + querySucceeded.set(false); + return null; + }; + + final long startMilliSeconds = System.currentTimeMillis(); + try { + this.executor.invokeAll(Collections.singleton(threadFunction), 3L, TimeUnit.MINUTES); + } catch (final InterruptedException ex) { + throw new RuntimeException(ex); + } + + // While thread is still executing the query. + while (countDownLatch.getCount() != 0) { + final boolean queryTerminated = Uninterruptibles.awaitUninterruptibly(countDownLatch, 1L, TimeUnit.MINUTES); + if (!queryTerminated) { + final long secondsPassed = (System.currentTimeMillis() - startMilliSeconds) / 1000; + logger.warn("Postgres is still executing for '{}s' a query.\n" + + "Are you sure Postgres is not blocked due to another query being executed?", + secondsPassed + ); + this.logger.trace("The query still being executed for '{}s' is: '{}'.\n", secondsPassed, queryLog); + } + } + + if (!querySucceeded.get()) { + throw queryException.get(); + } + } }