Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

HIVE-24424: Use PreparedStatements in DbNotificationListener getNextNLId #1704

Merged
merged 3 commits into from
Nov 30, 2020
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@
import java.util.Iterator;
import java.util.List;
import java.util.Objects;
import java.util.Optional;
import java.util.concurrent.TimeUnit;

import com.google.common.annotations.VisibleForTesting;
Expand Down Expand Up @@ -151,6 +152,10 @@
public class DbNotificationListener extends TransactionalMetaStoreEventListener {

private static final Logger LOG = LoggerFactory.getLogger(DbNotificationListener.class.getName());

private static final String NL_SEL_SQL = "select \"NEXT_VAL\" from \"SEQUENCE_TABLE\" where \"SEQUENCE_NAME\" = ?";
private static final String NL_UPD_SQL = "update \"SEQUENCE_TABLE\" set \"NEXT_VAL\" = ? where \"SEQUENCE_NAME\" = ?";

private static CleanerThread cleaner = null;

private Configuration conf;
Expand Down Expand Up @@ -972,28 +977,41 @@ private static void close(ResultSet rs) {
}
}

private long getNextNLId(Statement stmt, SQLGenerator sqlGenerator, String sequence)
/**
* Get the next notification log ID.
*
* @return The next ID to use for a notification log message
* @throws SQLException if a database access error occurs or this method is
* called on a closed connection
* @throws MetaException if the sequence table is not properly initialized
*/
private long getNextNLId(Connection con, SQLGenerator sqlGenerator, String sequence)
throws SQLException, MetaException {
String s = sqlGenerator.addForUpdateClause("select \"NEXT_VAL\" from " +
"\"SEQUENCE_TABLE\" where \"SEQUENCE_NAME\" = " + quoteString(sequence));
LOG.debug("Going to execute query <" + s + ">");
ResultSet rs = null;
try {
rs = stmt.executeQuery(s);
if (!rs.next()) {
throw new MetaException("Transaction database not properly configured, can't find next NL id.");

final String sfuSql = sqlGenerator.addForUpdateClause(NL_SEL_SQL);
Optional<Long> nextSequenceValue = Optional.empty();

LOG.debug("Going to execute query [{}][1={}]", sfuSql, sequence);
try (PreparedStatement stmt = con.prepareStatement(sfuSql)) {
stmt.setString(1, sequence);
ResultSet rs = stmt.executeQuery();
if (rs.next()) {
nextSequenceValue = Optional.of(rs.getLong(1));
}
}

long nextNLId = rs.getLong(1);
long updatedNLId = nextNLId + 1;
s = "update \"SEQUENCE_TABLE\" set \"NEXT_VAL\" = " + updatedNLId + " where \"SEQUENCE_NAME\" = " +
quoteString(sequence);
LOG.debug("Going to execute update <" + s + ">");
stmt.executeUpdate(s);
return nextNLId;
}finally {
close(rs);
final long updatedNLId = 1L + nextSequenceValue.orElseThrow(
() -> new MetaException("Transaction database not properly configured, failed to determine next NL ID"));

LOG.debug("Going to execute query [{}][1={}][2={}]", NL_UPD_SQL, updatedNLId, sequence);
try (PreparedStatement stmt = con.prepareStatement(NL_UPD_SQL)) {
stmt.setLong(1, updatedNLId);
stmt.setString(2, sequence);
final int rowCount = stmt.executeUpdate();
LOG.debug("Updated {} rows for sequnce {}", rowCount, sequence);
}

return nextSequenceValue.get();
}

private void addWriteNotificationLog(NotificationEvent event, AcidWriteEvent acidWriteEvent, Connection dbConn,
Expand Down Expand Up @@ -1030,7 +1048,7 @@ private void addWriteNotificationLog(NotificationEvent event, AcidWriteEvent aci
rs = pst.executeQuery();
if (!rs.next()) {
// if rs is empty then no lock is taken and thus it can not cause deadlock.
long nextNLId = getNextNLId(stmt, sqlGenerator,
long nextNLId = getNextNLId(dbConn, sqlGenerator,
"org.apache.hadoop.hive.metastore.model.MTxnWriteNotificationLog");
s = "insert into \"TXN_WRITE_NOTIFICATION_LOG\" " +
"(\"WNL_ID\", \"WNL_TXNID\", \"WNL_WRITEID\", \"WNL_DATABASE\", \"WNL_TABLE\", " +
Expand Down Expand Up @@ -1135,7 +1153,7 @@ private void addNotificationLog(NotificationEvent event, ListenerEvent listenerE
LOG.debug("Going to execute update <" + s + ">");
stmt.executeUpdate(s);

long nextNLId = getNextNLId(stmt, sqlGenerator,
long nextNLId = getNextNLId(dbConn, sqlGenerator,
"org.apache.hadoop.hive.metastore.model.MNotificationLog");

String insertVal;
Expand Down