Skip to content

Commit

Permalink
Add some logs while Postgres queries are being executed
Browse files Browse the repository at this point in the history
This commit adds some extra logs to be shown while Postgres queries are being
executed. The goal is to easily identify cases where a query is not being
executed because it's waiting for the Database table to be unlocked first (due
to another query being already executed for example). Since it's not easy to
identify a query timeout that will work for all cases of queries that can be
executed with PDB, at least being able to see in the logs that PDB is still
executing a query helps to identify such scenarios.
  • Loading branch information
Tiago Silva committed Aug 26, 2023
1 parent 46f29b2 commit daeb1dd
Show file tree
Hide file tree
Showing 2 changed files with 91 additions and 37 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,15 @@
import com.feedzai.commons.sql.abstraction.engine.impl.postgresql.PostgresSqlQueryExceptionHandler;
import com.feedzai.commons.sql.abstraction.util.StringUtils;

import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;
import java.util.stream.Collectors;

import org.postgresql.Driver;
import org.postgresql.PGProperty;
import org.postgresql.util.PGobject;
Expand Down Expand Up @@ -113,6 +121,13 @@ public class PostgreSqlEngine extends AbstractDatabaseEngine {
*/
public static final QueryExceptionHandler PG_QUERY_EXCEPTION_HANDLER = new PostgresSqlQueryExceptionHandler();

/**
* The {@link ExecutorService} which will print some logs while Postgres {@link Statement update queries} are
* being executed. This will print a log from time to time in order to easily identify cases where the query might
* be stuck due to the database being locked because of another external query.
*/
private final ScheduledExecutorService executor = Executors.newSingleThreadScheduledExecutor();

/**
* Creates a new PostgreSql connection.
*
Expand Down Expand Up @@ -236,10 +251,10 @@ protected void createTable(final DbEntity entity) throws DatabaseEngineException

logger.trace(createTableStatement);

Statement s = null;
final AtomicReference<Statement> queryStatement = new AtomicReference<>();
try {
s = conn.createStatement();
s.executeUpdate(createTableStatement);
queryStatement.set(conn.createStatement());
executeUpdateQuery(() -> queryStatement.get().executeUpdate(createTableStatement), createTableStatement);
} catch (final SQLException ex) {
if (ex.getSQLState().startsWith(NAME_ALREADY_EXISTS)) {
logger.debug(dev, "'{}' is already defined", entity.getName());
Expand All @@ -249,8 +264,8 @@ protected void createTable(final DbEntity entity) throws DatabaseEngineException
}
} finally {
try {
if (s != null) {
s.close();
if (queryStatement.get() != null) {
queryStatement.get().close();
}
} catch (final Exception e) {
logger.trace("Error closing statement.", e);
Expand Down Expand Up @@ -283,10 +298,10 @@ protected void addPrimaryKey(final DbEntity entity) throws DatabaseEngineExcepti

logger.trace(addPrimaryKey);

Statement s = null;
final AtomicReference<Statement> queryStatement = new AtomicReference<>();
try {
s = conn.createStatement();
s.executeUpdate(addPrimaryKey);
queryStatement.set(conn.createStatement());
executeUpdateQuery(() -> queryStatement.get().executeUpdate(addPrimaryKey), addPrimaryKey);
} catch (final SQLException ex) {
if (ex.getSQLState().startsWith(TABLE_CAN_ONLY_HAVE_ONE_PRIMARY_KEY)) {
logger.debug(dev, "'{}' already has a primary key", entity.getName());
Expand All @@ -296,8 +311,8 @@ protected void addPrimaryKey(final DbEntity entity) throws DatabaseEngineExcepti
}
} finally {
try {
if (s != null) {
s.close();
if (queryStatement.get() != null) {
queryStatement.get().close();
}
} catch (final Exception e) {
logger.trace("Error closing statement.", e);
Expand Down Expand Up @@ -334,10 +349,10 @@ protected void addIndexes(final DbEntity entity) throws DatabaseEngineException

logger.trace(statement);

Statement s = null;
final AtomicReference<Statement> queryStatement = new AtomicReference<>();
try {
s = conn.createStatement();
s.executeUpdate(statement);
queryStatement.set(conn.createStatement());
executeUpdateQuery(() -> queryStatement.get().executeUpdate(statement), statement);
} catch (final SQLException ex) {
if (ex.getSQLState().startsWith(NAME_ALREADY_EXISTS)) {
logger.debug(dev, "'{}' is already defined", idxName);
Expand All @@ -347,8 +362,8 @@ protected void addIndexes(final DbEntity entity) throws DatabaseEngineException
}
} finally {
try {
if (s != null) {
s.close();
if (queryStatement.get() != null) {
queryStatement.get().close();
}
} catch (final Exception e) {
logger.trace("Error closing statement.", e);
Expand Down Expand Up @@ -495,12 +510,13 @@ protected void dropSequences(DbEntity entity) throws DatabaseEngineException {

@Override
protected void dropTable(DbEntity entity) throws DatabaseEngineException {
Statement drop = null;
final AtomicReference<Statement> queryStatement = new AtomicReference<>();
try {
drop = conn.createStatement();
queryStatement.set(conn.createStatement());

final String query = format("DROP TABLE %s CASCADE", quotize(entity.getName()));
logger.trace(query);
drop.executeUpdate(query);
executeUpdateQuery(() -> queryStatement.get().executeUpdate(query), query);
} catch (final SQLException ex) {
if (ex.getSQLState().startsWith(TABLE_OR_VIEW_DOES_NOT_EXIST)) {
logger.debug(dev, "Table '{}' does not exist", entity.getName());
Expand All @@ -510,8 +526,8 @@ protected void dropTable(DbEntity entity) throws DatabaseEngineException {
}
} finally {
try {
if (drop != null) {
drop.close();
if (queryStatement.get() != null) {
queryStatement.get().close();
}
} catch (final Exception e) {
logger.trace("Error closing statement.", e);
Expand All @@ -521,8 +537,6 @@ protected void dropTable(DbEntity entity) throws DatabaseEngineException {

@Override
protected void dropColumn(DbEntity entity, String... columns) throws DatabaseEngineException {
Statement drop = null;

List<String> removeColumns = new ArrayList<>();
removeColumns.add("ALTER TABLE");
removeColumns.add(quotize(entity.getName()));
Expand All @@ -532,11 +546,12 @@ protected void dropColumn(DbEntity entity, String... columns) throws DatabaseEng
}
removeColumns.add(join(cols, ","));

final AtomicReference<Statement> queryStatement = new AtomicReference<>();
try {
drop = conn.createStatement();
queryStatement.set(conn.createStatement());
final String query = join(removeColumns, " ");
logger.trace(query);
drop.executeUpdate(query);
executeUpdateQuery(() -> queryStatement.get().executeUpdate(query), query);
} catch (final SQLException ex) {
if (ex.getMessage().startsWith(TABLE_OR_VIEW_DOES_NOT_EXIST)) {
logger.debug(dev, "Table '{}' does not exist", entity.getName());
Expand All @@ -545,8 +560,8 @@ protected void dropColumn(DbEntity entity, String... columns) throws DatabaseEng
}
} finally {
try {
if (drop != null) {
drop.close();
if (queryStatement.get() != null) {
queryStatement.get().close();
}
} catch (final Exception e) {
logger.trace("Error closing statement.", e);
Expand Down Expand Up @@ -582,16 +597,16 @@ protected void addColumn(DbEntity entity, DbColumn... columns) throws DatabaseEn
final String addColumnsStatement = join(addColumns, " ");
logger.trace(addColumnsStatement);

Statement s = null;
final AtomicReference<Statement> queryStatement = new AtomicReference<>();
try {
s = conn.createStatement();
s.executeUpdate(addColumnsStatement);
} catch (final SQLException ex) {
queryStatement.set(conn.createStatement());
executeUpdateQuery(() -> queryStatement.get().executeUpdate(addColumnsStatement), addColumnsStatement);
} catch (final Throwable ex) {
throw new DatabaseEngineException("Something went wrong handling statement", ex);
} finally {
try {
if (s != null) {
s.close();
if (queryStatement.get() != null) {
queryStatement.get().close();
}
} catch (final Exception e) {
logger.trace("Error closing statement.", e);
Expand Down Expand Up @@ -678,11 +693,11 @@ protected void addFks(final DbEntity entity, Set<DbFk> fks) throws DatabaseEngin
quotizedForeignColumnsString
);

Statement alterTableStmt = null;
final AtomicReference<Statement> queryStatement = new AtomicReference<>();
try {
alterTableStmt = conn.createStatement();
queryStatement.set(conn.createStatement());
logger.trace(alterTable);
alterTableStmt.executeUpdate(alterTable);
executeUpdateQuery(() -> queryStatement.get().executeUpdate(alterTable), alterTable);
} catch (final SQLException ex) {
if (ex.getSQLState().equals(CONSTRAINT_NAME_ALREADY_EXISTS)) {
logger.debug(dev, "Foreign key for table '{}' already exists. Error code: {}.", entity.getName(), ex.getSQLState());
Expand All @@ -695,8 +710,8 @@ protected void addFks(final DbEntity entity, Set<DbFk> fks) throws DatabaseEngin
}
} finally {
try {
if (alterTableStmt != null) {
alterTableStmt.close();
if (queryStatement.get() != null) {
queryStatement.get().close();
}
} catch (final Exception e) {
logger.trace("Error closing statement.", e);
Expand Down Expand Up @@ -832,4 +847,38 @@ protected ResultIterator createResultIterator(PreparedStatement ps) throws Datab
protected QueryExceptionHandler getQueryExceptionHandler() {
return PG_QUERY_EXCEPTION_HANDLER;
}

/**
* Helper method that logs the time while the Postgres {@link Statement update query} is being executed.
* <br>
* This will help identify cases via logs where Postgres database is already locked due to an external query being
* executed and the current one is waiting for that one to finish first.
*
* @param function The {@link Callable function} that will executes Postgres {@link Statement query}.
* @param queryLog The query in {@link String} format which is being executed. Will be used in the logs.
* @throws SQLException If the {@link Statement query} fails during its execution.
* @implNote It prints a log every 5 minutes so the user is aware that the query is still being executed.
*/
private<T> T executeUpdateQuery(final Callable<T> function, final String queryLog) throws SQLException {
final long startMilliSeconds = System.currentTimeMillis();
final Runnable printLog = () -> {
final long secondsPassed = (System.currentTimeMillis() - startMilliSeconds) / 1000;
this.logger.info("Database is still executing for '{}s' a query.\n" +
"Are you sure the Database is not blocked due to another query being executed?",
secondsPassed
);
this.logger.trace("The query still being executed for '{}s' is: '{}'.\n", secondsPassed, queryLog);
};
final ScheduledFuture<?> scheduledFutureLog = this.executor.scheduleAtFixedRate(printLog, 5L, 5L, TimeUnit.MINUTES);

try {
return function.call();
} catch (final SQLException ex) {
throw ex;
} catch (final Exception ex) {
throw new RuntimeException(ex);
} finally {
scheduledFutureLog.cancel(true);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -562,6 +562,11 @@ protected DatabaseEngine[] prepareEngineForDeadlockRecoveryTest() throws Excepti
engines[0] = DatabaseFactory.getConnection(properties);
final EntityEntry entry = entry().set("COL1", 1).set("COL2", 0).build();

if (this.engineDriver.equals(DatabaseEngineDriver.ORACLE)) {
// Fix ORA-00604: error occurred at recursive SQL level 1ORA-08177: can't serialize access for this transaction
engines[0].getConnection().setTransactionIsolation(Connection.TRANSACTION_READ_COMMITTED);
}

for (int i = 0; i < 2; i++) {
final DbEntity entity0 = entityBuilder.name("TEST" + i).build();
engines[0].dropEntity(entity0);
Expand Down

0 comments on commit daeb1dd

Please sign in to comment.