Skip to content

Commit

Permalink
Add some logs while Postgres queries are being executed
Browse files Browse the repository at this point in the history
This commit adds some extra logs to be shown while Postgres queries are being
executed. The goal is to easily identify cases where a query is not being
executed because it's waiting for the Database table to be unlocked first (due
to another query being already executed for example). Since it's not easy to
identify a query timeout that will work for all cases of queries that can be
executed with PDB, at least being able to see in the logs that PDB is still
executing a query helps to identify such scenarios.
  • Loading branch information
Tiago Silva committed Aug 23, 2023
1 parent 46f29b2 commit bed8274
Showing 1 changed file with 109 additions and 37 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,17 @@
import com.feedzai.commons.sql.abstraction.engine.impl.postgresql.PostgresSqlQueryExceptionHandler;
import com.feedzai.commons.sql.abstraction.util.StringUtils;

import java.util.Collections;
import java.util.concurrent.Callable;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicReference;
import java.util.stream.Collectors;

import com.google.common.util.concurrent.Uninterruptibles;
import org.postgresql.Driver;
import org.postgresql.PGProperty;
import org.postgresql.util.PGobject;
Expand Down Expand Up @@ -113,6 +123,13 @@ public class PostgreSqlEngine extends AbstractDatabaseEngine {
*/
public static final QueryExceptionHandler PG_QUERY_EXCEPTION_HANDLER = new PostgresSqlQueryExceptionHandler();

/**
* The {@link ExecutorService} which will execute Postgres {@link Statement queries}. The queries are executed
* asynchronously with another thread so the main thread can log from time to time in order to easily identify cases
* where the query might be stuck due to the database being locked because of another external query.
*/
private final ExecutorService executor = Executors.newFixedThreadPool(1);

/**
* Creates a new PostgreSql connection.
*
Expand Down Expand Up @@ -236,10 +253,10 @@ protected void createTable(final DbEntity entity) throws DatabaseEngineException

logger.trace(createTableStatement);

Statement s = null;
final AtomicReference<Statement> queryStatement = new AtomicReference<>();
try {
s = conn.createStatement();
s.executeUpdate(createTableStatement);
queryStatement.set(conn.createStatement());
executeQuery(() -> queryStatement.get().executeUpdate(createTableStatement), createTableStatement);
} catch (final SQLException ex) {
if (ex.getSQLState().startsWith(NAME_ALREADY_EXISTS)) {
logger.debug(dev, "'{}' is already defined", entity.getName());
Expand All @@ -249,8 +266,8 @@ protected void createTable(final DbEntity entity) throws DatabaseEngineException
}
} finally {
try {
if (s != null) {
s.close();
if (queryStatement.get() != null) {
queryStatement.get().close();
}
} catch (final Exception e) {
logger.trace("Error closing statement.", e);
Expand Down Expand Up @@ -283,10 +300,10 @@ protected void addPrimaryKey(final DbEntity entity) throws DatabaseEngineExcepti

logger.trace(addPrimaryKey);

Statement s = null;
final AtomicReference<Statement> queryStatement = new AtomicReference<>();
try {
s = conn.createStatement();
s.executeUpdate(addPrimaryKey);
queryStatement.set(conn.createStatement());
executeQuery(() -> queryStatement.get().executeUpdate(addPrimaryKey), addPrimaryKey);
} catch (final SQLException ex) {
if (ex.getSQLState().startsWith(TABLE_CAN_ONLY_HAVE_ONE_PRIMARY_KEY)) {
logger.debug(dev, "'{}' already has a primary key", entity.getName());
Expand All @@ -296,8 +313,8 @@ protected void addPrimaryKey(final DbEntity entity) throws DatabaseEngineExcepti
}
} finally {
try {
if (s != null) {
s.close();
if (queryStatement.get() != null) {
queryStatement.get().close();
}
} catch (final Exception e) {
logger.trace("Error closing statement.", e);
Expand Down Expand Up @@ -334,10 +351,10 @@ protected void addIndexes(final DbEntity entity) throws DatabaseEngineException

logger.trace(statement);

Statement s = null;
final AtomicReference<Statement> queryStatement = new AtomicReference<>();
try {
s = conn.createStatement();
s.executeUpdate(statement);
queryStatement.set(conn.createStatement());
executeQuery(() -> queryStatement.get().executeUpdate(statement), statement);
} catch (final SQLException ex) {
if (ex.getSQLState().startsWith(NAME_ALREADY_EXISTS)) {
logger.debug(dev, "'{}' is already defined", idxName);
Expand All @@ -347,8 +364,8 @@ protected void addIndexes(final DbEntity entity) throws DatabaseEngineException
}
} finally {
try {
if (s != null) {
s.close();
if (queryStatement.get() != null) {
queryStatement.get().close();
}
} catch (final Exception e) {
logger.trace("Error closing statement.", e);
Expand Down Expand Up @@ -495,12 +512,13 @@ protected void dropSequences(DbEntity entity) throws DatabaseEngineException {

@Override
protected void dropTable(DbEntity entity) throws DatabaseEngineException {
Statement drop = null;
final AtomicReference<Statement> queryStatement = new AtomicReference<>();
try {
drop = conn.createStatement();
queryStatement.set(conn.createStatement());

final String query = format("DROP TABLE %s CASCADE", quotize(entity.getName()));
logger.trace(query);
drop.executeUpdate(query);
executeQuery(() -> queryStatement.get().executeUpdate(query), query);
} catch (final SQLException ex) {
if (ex.getSQLState().startsWith(TABLE_OR_VIEW_DOES_NOT_EXIST)) {
logger.debug(dev, "Table '{}' does not exist", entity.getName());
Expand All @@ -510,8 +528,8 @@ protected void dropTable(DbEntity entity) throws DatabaseEngineException {
}
} finally {
try {
if (drop != null) {
drop.close();
if (queryStatement.get() != null) {
queryStatement.get().close();
}
} catch (final Exception e) {
logger.trace("Error closing statement.", e);
Expand All @@ -521,8 +539,6 @@ protected void dropTable(DbEntity entity) throws DatabaseEngineException {

@Override
protected void dropColumn(DbEntity entity, String... columns) throws DatabaseEngineException {
Statement drop = null;

List<String> removeColumns = new ArrayList<>();
removeColumns.add("ALTER TABLE");
removeColumns.add(quotize(entity.getName()));
Expand All @@ -532,11 +548,12 @@ protected void dropColumn(DbEntity entity, String... columns) throws DatabaseEng
}
removeColumns.add(join(cols, ","));

final AtomicReference<Statement> queryStatement = new AtomicReference<>();
try {
drop = conn.createStatement();
queryStatement.set(conn.createStatement());
final String query = join(removeColumns, " ");
logger.trace(query);
drop.executeUpdate(query);
executeQuery(() -> queryStatement.get().executeUpdate(query), query);
} catch (final SQLException ex) {
if (ex.getMessage().startsWith(TABLE_OR_VIEW_DOES_NOT_EXIST)) {
logger.debug(dev, "Table '{}' does not exist", entity.getName());
Expand All @@ -545,8 +562,8 @@ protected void dropColumn(DbEntity entity, String... columns) throws DatabaseEng
}
} finally {
try {
if (drop != null) {
drop.close();
if (queryStatement.get() != null) {
queryStatement.get().close();
}
} catch (final Exception e) {
logger.trace("Error closing statement.", e);
Expand Down Expand Up @@ -582,16 +599,16 @@ protected void addColumn(DbEntity entity, DbColumn... columns) throws DatabaseEn
final String addColumnsStatement = join(addColumns, " ");
logger.trace(addColumnsStatement);

Statement s = null;
final AtomicReference<Statement> queryStatement = new AtomicReference<>();
try {
s = conn.createStatement();
s.executeUpdate(addColumnsStatement);
} catch (final SQLException ex) {
queryStatement.set(conn.createStatement());
executeQuery(() -> queryStatement.get().executeUpdate(addColumnsStatement), addColumnsStatement);
} catch (final Throwable ex) {
throw new DatabaseEngineException("Something went wrong handling statement", ex);
} finally {
try {
if (s != null) {
s.close();
if (queryStatement.get() != null) {
queryStatement.get().close();
}
} catch (final Exception e) {
logger.trace("Error closing statement.", e);
Expand Down Expand Up @@ -678,11 +695,11 @@ protected void addFks(final DbEntity entity, Set<DbFk> fks) throws DatabaseEngin
quotizedForeignColumnsString
);

Statement alterTableStmt = null;
final AtomicReference<Statement> queryStatement = new AtomicReference<>();
try {
alterTableStmt = conn.createStatement();
queryStatement.set(conn.createStatement());
logger.trace(alterTable);
alterTableStmt.executeUpdate(alterTable);
executeQuery(() -> queryStatement.get().executeUpdate(alterTable), alterTable);
} catch (final SQLException ex) {
if (ex.getSQLState().equals(CONSTRAINT_NAME_ALREADY_EXISTS)) {
logger.debug(dev, "Foreign key for table '{}' already exists. Error code: {}.", entity.getName(), ex.getSQLState());
Expand All @@ -695,8 +712,8 @@ protected void addFks(final DbEntity entity, Set<DbFk> fks) throws DatabaseEngin
}
} finally {
try {
if (alterTableStmt != null) {
alterTableStmt.close();
if (queryStatement.get() != null) {
queryStatement.get().close();
}
} catch (final Exception e) {
logger.trace("Error closing statement.", e);
Expand Down Expand Up @@ -832,4 +849,59 @@ protected ResultIterator createResultIterator(PreparedStatement ps) throws Datab
protected QueryExceptionHandler getQueryExceptionHandler() {
return PG_QUERY_EXCEPTION_HANDLER;
}

/**
* Helper method that logs the time while the Postgres {@link Statement query} is being executed.
* <br>
* This will help identify cases via logs where Postgres database is already locked due to an external query being
* executed and the current one is waiting for that one to finish first.
*
* @param function The {@link Runnable function} that will executes Postgres {@link Statement query}.
* @param queryLog The query in {@link String} format which is being executed. Will be used in the logs.
* @throws SQLException If the {@link Statement query} fails during its execution.
* @implNote It prints a log every 3 seconds so the user is aware that the query is still being executed.
*/
private<T> void executeQuery(final Callable<T> function, final String queryLog) throws SQLException {
final AtomicBoolean querySucceeded = new AtomicBoolean(true);
final AtomicReference<SQLException> queryException = new AtomicReference<>();

final CountDownLatch countDownLatch = new CountDownLatch(1);
final Callable<T> threadFunction = () -> {
try {
return function.call();
} catch (final SQLException ex) {
queryException.set(ex);
} catch (final Throwable ex) {
queryException.set(new SQLException(ex));
} finally {
countDownLatch.countDown();
}
querySucceeded.set(false);
return null;
};

final long startMilliSeconds = System.currentTimeMillis();
try {
this.executor.invokeAll(Collections.singleton(threadFunction), 3L, TimeUnit.MINUTES);
} catch (final InterruptedException ex) {
throw new RuntimeException(ex);
}

// While thread is still executing the query.
while (countDownLatch.getCount() != 0) {
final boolean queryTerminated = Uninterruptibles.awaitUninterruptibly(countDownLatch, 1L, TimeUnit.MINUTES);
if (!queryTerminated) {
final long secondsPassed = (System.currentTimeMillis() - startMilliSeconds) / 1000;
logger.warn("Postgres is still executing for '{}s' a query.\n" +
"Are you sure Postgres is not blocked due to another query being executed?",
secondsPassed
);
this.logger.trace("The query still being executed for '{}s' is: '{}'.\n", secondsPassed, queryLog);
}
}

if (!querySucceeded.get()) {
throw queryException.get();
}
}
}

0 comments on commit bed8274

Please sign in to comment.