Skip to content

Commit

Permalink
HIVE-24424: Use PreparedStatements in DbNotificationListener getNextN…
Browse files Browse the repository at this point in the history
…LId (David Mollitor reviewed by Bodor Laszlo, Miklos Gergely)
  • Loading branch information
belugabehr committed Nov 30, 2020
1 parent d180445 commit f0814f0
Showing 1 changed file with 38 additions and 20 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@
import java.util.Iterator;
import java.util.List;
import java.util.Objects;
import java.util.Optional;
import java.util.concurrent.TimeUnit;

import com.google.common.annotations.VisibleForTesting;
Expand Down Expand Up @@ -151,6 +152,10 @@
public class DbNotificationListener extends TransactionalMetaStoreEventListener {

private static final Logger LOG = LoggerFactory.getLogger(DbNotificationListener.class.getName());

private static final String NL_SEL_SQL = "select \"NEXT_VAL\" from \"SEQUENCE_TABLE\" where \"SEQUENCE_NAME\" = ?";
private static final String NL_UPD_SQL = "update \"SEQUENCE_TABLE\" set \"NEXT_VAL\" = ? where \"SEQUENCE_NAME\" = ?";

private static CleanerThread cleaner = null;

private Configuration conf;
Expand Down Expand Up @@ -972,28 +977,41 @@ private static void close(ResultSet rs) {
}
}

private long getNextNLId(Statement stmt, SQLGenerator sqlGenerator, String sequence)
/**
* Get the next notification log ID.
*
* @return The next ID to use for a notification log message
* @throws SQLException if a database access error occurs or this method is
* called on a closed connection
* @throws MetaException if the sequence table is not properly initialized
*/
private long getNextNLId(Connection con, SQLGenerator sqlGenerator, String sequence)
throws SQLException, MetaException {
String s = sqlGenerator.addForUpdateClause("select \"NEXT_VAL\" from " +
"\"SEQUENCE_TABLE\" where \"SEQUENCE_NAME\" = " + quoteString(sequence));
LOG.debug("Going to execute query <" + s + ">");
ResultSet rs = null;
try {
rs = stmt.executeQuery(s);
if (!rs.next()) {
throw new MetaException("Transaction database not properly configured, can't find next NL id.");

final String sfuSql = sqlGenerator.addForUpdateClause(NL_SEL_SQL);
Optional<Long> nextSequenceValue = Optional.empty();

LOG.debug("Going to execute query [{}][1={}]", sfuSql, sequence);
try (PreparedStatement stmt = con.prepareStatement(sfuSql)) {
stmt.setString(1, sequence);
ResultSet rs = stmt.executeQuery();
if (rs.next()) {
nextSequenceValue = Optional.of(rs.getLong(1));
}
}

long nextNLId = rs.getLong(1);
long updatedNLId = nextNLId + 1;
s = "update \"SEQUENCE_TABLE\" set \"NEXT_VAL\" = " + updatedNLId + " where \"SEQUENCE_NAME\" = " +
quoteString(sequence);
LOG.debug("Going to execute update <" + s + ">");
stmt.executeUpdate(s);
return nextNLId;
}finally {
close(rs);
final long updatedNLId = 1L + nextSequenceValue.orElseThrow(
() -> new MetaException("Transaction database not properly configured, failed to determine next NL ID"));

LOG.debug("Going to execute query [{}][1={}][2={}]", NL_UPD_SQL, updatedNLId, sequence);
try (PreparedStatement stmt = con.prepareStatement(NL_UPD_SQL)) {
stmt.setLong(1, updatedNLId);
stmt.setString(2, sequence);
final int rowCount = stmt.executeUpdate();
LOG.debug("Updated {} rows for sequnce {}", rowCount, sequence);
}

return nextSequenceValue.get();
}

private void addWriteNotificationLog(NotificationEvent event, AcidWriteEvent acidWriteEvent, Connection dbConn,
Expand Down Expand Up @@ -1030,7 +1048,7 @@ private void addWriteNotificationLog(NotificationEvent event, AcidWriteEvent aci
rs = pst.executeQuery();
if (!rs.next()) {
// if rs is empty then no lock is taken and thus it can not cause deadlock.
long nextNLId = getNextNLId(stmt, sqlGenerator,
long nextNLId = getNextNLId(dbConn, sqlGenerator,
"org.apache.hadoop.hive.metastore.model.MTxnWriteNotificationLog");
s = "insert into \"TXN_WRITE_NOTIFICATION_LOG\" " +
"(\"WNL_ID\", \"WNL_TXNID\", \"WNL_WRITEID\", \"WNL_DATABASE\", \"WNL_TABLE\", " +
Expand Down Expand Up @@ -1135,7 +1153,7 @@ private void addNotificationLog(NotificationEvent event, ListenerEvent listenerE
LOG.debug("Going to execute update <" + s + ">");
stmt.executeUpdate(s);

long nextNLId = getNextNLId(stmt, sqlGenerator,
long nextNLId = getNextNLId(dbConn, sqlGenerator,
"org.apache.hadoop.hive.metastore.model.MNotificationLog");

String insertVal;
Expand Down

0 comments on commit f0814f0

Please sign in to comment.