Skip to content

Commit

Permalink
ARTEMIS-3679 Brokers shutdown after daylight saving fall back
Browse files Browse the repository at this point in the history
  • Loading branch information
franz1981 authored and brusdev committed Feb 23, 2022
1 parent 3be4dd6 commit 101c0d2
Show file tree
Hide file tree
Showing 7 changed files with 130 additions and 41 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -258,6 +258,11 @@ public String currentTimestampSQL() {
return format(sql("current-timestamp"), tableName);
}

@Override
public String currentTimestampTimeZoneId() {
return sql("current-timestamp-timezone-id");
}

@Override
public String writeStateSQL() {
return format(sql("write-state"), tableName, STATE_ROW_ID);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -92,6 +92,8 @@ enum DatabaseStoreType {

String currentTimestampSQL();

String currentTimestampTimeZoneId();

String writeStateSQL();

String readStateSQL();
Expand Down
23 changes: 16 additions & 7 deletions artemis-jdbc-store/src/main/resources/journal-sql.properties
Original file line number Diff line number Diff line change
Expand Up @@ -37,12 +37,12 @@ delete-journal-record=DELETE FROM %s WHERE id = ?
delete-journal-tx-record=DELETE FROM %s WHERE txId=?
count-journal-record=SELECT COUNT(*) FROM %s

create-node-manager-store-table=CREATE TABLE %s (ID INT NOT NULL, HOLDER_ID VARCHAR(128), HOLDER_EXPIRATION_TIME TIMESTAMP, NODE_ID CHAR(36),STATE CHAR(1), PRIMARY KEY(ID))
create-node-manager-store-table=CREATE TABLE %s (ID INT NOT NULL, HOLDER_ID VARCHAR(128), HOLDER_EXPIRATION_TIME BIGINT, NODE_ID CHAR(36),STATE CHAR(1), PRIMARY KEY(ID))
create-state=INSERT INTO %s (ID) VALUES (%s)
try-acquire-lock=UPDATE %s SET HOLDER_ID = ?, HOLDER_EXPIRATION_TIME = ? WHERE (HOLDER_EXPIRATION_TIME IS NULL OR (HOLDER_EXPIRATION_TIME < CURRENT_TIMESTAMP AND ? > CURRENT_TIMESTAMP)) AND ID = %s
try-acquire-lock=UPDATE %s SET HOLDER_ID = ?, HOLDER_EXPIRATION_TIME = ? WHERE (HOLDER_EXPIRATION_TIME IS NULL OR HOLDER_EXPIRATION_TIME < ?) AND ID = %s
try-release-lock=UPDATE %s SET HOLDER_ID = NULL, HOLDER_EXPIRATION_TIME = NULL WHERE HOLDER_ID = ? AND ID = %s
is-locked=SELECT HOLDER_ID, HOLDER_EXPIRATION_TIME, CURRENT_TIMESTAMP FROM %s WHERE ID = %s
renew-lock=UPDATE %s SET HOLDER_EXPIRATION_TIME = ? WHERE HOLDER_ID = ? AND HOLDER_EXPIRATION_TIME IS NOT NULL AND ? >= HOLDER_EXPIRATION_TIME AND ? > CURRENT_TIMESTAMP AND ID = %s
is-locked=SELECT HOLDER_ID, HOLDER_EXPIRATION_TIME FROM %s WHERE ID = %s
renew-lock=UPDATE %s SET HOLDER_EXPIRATION_TIME = ? WHERE HOLDER_ID = ? AND HOLDER_EXPIRATION_TIME IS NOT NULL AND ? >= HOLDER_EXPIRATION_TIME AND ID = %s
current-timestamp=SELECT CURRENT_TIMESTAMP FROM %s
write-state=UPDATE %s SET STATE = ? WHERE ID = %s
read-state=SELECT STATE FROM %s WHERE ID = %s
Expand All @@ -65,6 +65,8 @@ table-names-case.derby=upper
# PostgreSQL SQL statements
create-file-table.postgresql=CREATE TABLE %s (ID BIGSERIAL, FILENAME VARCHAR(255), EXTENSION VARCHAR(10), DATA OID, PRIMARY KEY(ID))
create-journal-table.postgresql=CREATE TABLE %s(id BIGINT,recordType SMALLINT,compactCount SMALLINT,txId BIGINT,userRecordType SMALLINT,variableSize INTEGER,record BYTEA,txDataSize INTEGER,txData BYTEA,txCheckNoRecords INTEGER,seq BIGINT)
current-timestamp.postgresql=SELECT NOW() AT TIME ZONE 'UTC'
current-timestamp-timezone-id.postgresql=UTC

# 1 GiB
max-blob-size.postgresql=1073741824
Expand All @@ -75,14 +77,18 @@ create-file-table.mysql=CREATE TABLE %s(ID BIGINT NOT NULL AUTO_INCREMENT, FILEN
append-to-file.mysql=SELECT DATA, ID FROM %s WHERE ID=? FOR UPDATE
create-journal-table.mysql=CREATE TABLE %s(id BIGINT,recordType SMALLINT,compactCount SMALLINT,txId BIGINT,userRecordType SMALLINT,variableSize INTEGER,record LONGBLOB,txDataSize INTEGER,txData LONGBLOB,txCheckNoRecords INTEGER,seq BIGINT) ENGINE=InnoDB
copy-file-record-by-id.mysql=UPDATE %1$s, (SELECT DATA AS FROM_DATA FROM %1$s WHERE id=?) SELECT_COPY SET DATA=FROM_DATA WHERE id=?
current-timestamp.mysql=SELECT UTC_TIMESTAMP(6)
current-timestamp-timezone-id.mysql=UTC
# 4 GiB
max-blob-size.mysql=4294967296
table-names-case.mysql=lower

# Oracle SQL statements
create-file-table.oracle=CREATE TABLE %s(ID NUMBER(19) GENERATED BY DEFAULT ON NULL AS IDENTITY, FILENAME VARCHAR(255), EXTENSION VARCHAR(10), DATA BLOB, PRIMARY KEY(ID))
create-journal-table.oracle=CREATE TABLE %s(id NUMBER(19),recordType NUMBER(5),compactCount NUMBER(5),txId NUMBER(19),userRecordType NUMBER(5),variableSize NUMBER(10),record BLOB,txDataSize NUMBER(10),txData BLOB,txCheckNoRecords NUMBER(10),seq NUMBER(19))

create-node-manager-store-table.oracle=CREATE TABLE %s (ID INT NOT NULL, HOLDER_ID VARCHAR(128), HOLDER_EXPIRATION_TIME NUMBER(19), NODE_ID CHAR(36),STATE CHAR(1), PRIMARY KEY(ID))
current-timestamp.oracle=SELECT SYSTIMESTAMP AT TIME ZONE 'UTC' FROM DUAL
current-timestamp-timezone-id.oracle=UTC
# 4 GiB
max-blob-size.oracle=4294967296
table-names-case.oracle=upper
Expand All @@ -94,10 +100,13 @@ create-file-table.db2=CREATE TABLE %s (ID BIGINT GENERATED ALWAYS AS IDENTITY (S
replace-file.db2=UPDATE %s SET DATA=? WHERE ID=?
append-to-file.db2=UPDATE %s SET DATA = (DATA || ?) WHERE ID=?
table-names-case.db2=upper
current-timestamp.db2=SELECT CURRENT TIMESTAMP - CURRENT TIMEZONE FROM SYSIBM.SYSDUMMY1
current-timestamp-timezone-id.db2=UTC

# MSSQL SQL statements
create-file-table.mssql=CREATE TABLE %s (ID BIGINT NOT NULL IDENTITY, FILENAME VARCHAR(255), EXTENSION VARCHAR(10), DATA VARBINARY(max), PRIMARY KEY(ID))
create-journal-table.mssql=CREATE TABLE %s(id BIGINT,recordType SMALLINT,compactCount SMALLINT,txId BIGINT,userRecordType SMALLINT,variableSize INTEGER,record VARBINARY(max),txDataSize INTEGER,txData VARBINARY(max),txCheckNoRecords INTEGER,seq BIGINT NOT NULL, PRIMARY KEY(seq))
create-node-manager-store-table.mssql=CREATE TABLE %s (ID INT NOT NULL, HOLDER_ID VARCHAR(128), HOLDER_EXPIRATION_TIME DATETIME, NODE_ID CHAR(36),STATE CHAR(1), PRIMARY KEY(ID))
current-timestamp.mssql=SELECT CURRENT_TIMESTAMP
create-node-manager-store-table.mssql=CREATE TABLE %s (ID INT NOT NULL, HOLDER_ID VARCHAR(128), HOLDER_EXPIRATION_TIME BIGINT, NODE_ID CHAR(36),STATE CHAR(1), PRIMARY KEY(ID))
current-timestamp.mssql=SELECT SYSUTCDATETIME()
current-timestamp-timezone-id.mssql=UTC
max-blob-size.mssql=2147483647
Original file line number Diff line number Diff line change
Expand Up @@ -21,8 +21,9 @@
import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.sql.Timestamp;
import java.util.Calendar;
import java.util.Objects;
import java.util.TimeZone;
import java.util.concurrent.TimeUnit;
import java.util.function.Predicate;

Expand All @@ -43,6 +44,7 @@ final class JdbcLeaseLock implements LeaseLock {
private final String renewLock;
private final String isLocked;
private final String currentDateTime;
private final TimeZone currentDateTimeTimeZone;
private final long expirationMillis;
private final int queryTimeout;
private boolean maybeAcquired;
Expand All @@ -60,6 +62,7 @@ final class JdbcLeaseLock implements LeaseLock {
String renewLock,
String isLocked,
String currentDateTime,
String currentDateTimeTimeZoneId,
long expirationMIllis,
long queryTimeoutMillis,
String lockName) {
Expand All @@ -72,6 +75,7 @@ final class JdbcLeaseLock implements LeaseLock {
this.renewLock = renewLock;
this.isLocked = isLocked;
this.currentDateTime = currentDateTime;
this.currentDateTimeTimeZone = currentDateTimeTimeZoneId == null ? null : TimeZone.getTimeZone(currentDateTimeTimeZoneId);
this.expirationMillis = expirationMIllis;
this.maybeAcquired = false;
this.connectionProvider = connectionProvider;
Expand All @@ -86,7 +90,6 @@ final class JdbcLeaseLock implements LeaseLock {
}
}
this.queryTimeout = expectedTimeout;

}

public String holderId() {
Expand Down Expand Up @@ -114,14 +117,14 @@ private String readableLockStatus() {
final boolean autoCommit = connection.getAutoCommit();
connection.setAutoCommit(false);
try (PreparedStatement preparedStatement = connection.prepareStatement(this.isLocked)) {
final long currentTimestamp = dbCurrentTimeMillis(connection);
final String lockStatus;
try (ResultSet resultSet = preparedStatement.executeQuery()) {
if (!resultSet.next()) {
lockStatus = null;
} else {
final String currentHolderId = resultSet.getString(1);
final Timestamp expirationTime = resultSet.getTimestamp(2);
final Timestamp currentTimestamp = resultSet.getTimestamp(3);
final long expirationTime = resultSet.getLong(2);
lockStatus = "holderId = " + currentHolderId + " expirationTime = " + expirationTime + " currentTimestamp = " + currentTimestamp;
}
}
Expand All @@ -138,23 +141,57 @@ private String readableLockStatus() {
}
}

public long dbCurrentTimeMillis() {
SQLException suppressed = null;
try (Connection connection = connectionProvider.getConnection()) {
connection.setTransactionIsolation(Connection.TRANSACTION_READ_COMMITTED);
final boolean autoCommit = connection.getAutoCommit();
connection.setAutoCommit(false);
try {
return dbCurrentTimeMillis(connection);
} catch (SQLException e) {
suppressed = e;
connection.rollback();
suppressed = null;
throw new IllegalStateException(e);
} finally {
connection.setAutoCommit(autoCommit);
}
} catch (SQLException e) {
final RuntimeException stateEx = new IllegalStateException(e);
if (suppressed != null) {
stateEx.addSuppressed(suppressed);
}
throw stateEx;
}
}

private long dbCurrentTimeMillis(Connection connection) throws SQLException {
try (PreparedStatement currentDateTime = connection.prepareStatement(this.currentDateTime)) {
return dbCurrentTimeMillis(connection, queryTimeout, currentDateTime, currentDateTimeTimeZone);
}

public static long dbCurrentTimeMillis(final Connection connection,
final int queryTimeout,
final String currentDateTimeSql,
final TimeZone currentDateTimeTimeZone) throws SQLException {
try (PreparedStatement currentDateTime = connection.prepareStatement(currentDateTimeSql)) {
if (queryTimeout >= 0) {
currentDateTime.setQueryTimeout(queryTimeout);
}
final long startTime = stripMilliseconds(System.currentTimeMillis());
try (ResultSet resultSet = currentDateTime.executeQuery()) {
resultSet.next();
final long endTime = stripMilliseconds(System.currentTimeMillis());
final Timestamp currentTimestamp = resultSet.getTimestamp(1);
final long currentTime = currentTimestamp.getTime();
final long currentTimeMillis = stripMilliseconds(currentTime);
if (currentTimeMillis < startTime) {
LOGGER.warnf("[%s] %s query currentTimestamp = %s on database should happen AFTER %s on broker", lockName, holderId, currentTimestamp, new Timestamp(startTime));

final long currentTime = (currentDateTimeTimeZone == null ?
resultSet.getTimestamp(1) :
resultSet.getTimestamp(1, Calendar.getInstance(currentDateTimeTimeZone))).getTime();
final long currentTimeNoMillis = stripMilliseconds(currentTime);
if (currentTimeNoMillis < startTime) {
LOGGER.warnf("currentTimestamp = %d on database should happen AFTER %d on broker", currentTimeNoMillis, startTime);
}
if (currentTimeMillis > endTime) {
LOGGER.warnf("[%s] %s query currentTimestamp = %s on database should happen BEFORE %s on broker", lockName, holderId, currentTimestamp, new Timestamp(endTime));
if (currentTimeNoMillis > endTime) {
LOGGER.warnf("currentTimestamp = %d on database should happen BEFORE %d on broker", currentTimeNoMillis, endTime);
}
return currentTime;
}
Expand All @@ -170,15 +207,13 @@ public boolean renew() {
try (PreparedStatement preparedStatement = connection.prepareStatement(this.renewLock)) {
final long now = dbCurrentTimeMillis(connection);
final long localExpirationTime = now + expirationMillis;
final Timestamp expirationTime = new Timestamp(localExpirationTime);
if (LOGGER.isDebugEnabled()) {
LOGGER.debugf("[%s] %s is renewing lock with expirationTime = %s",
lockName, holderId, expirationTime);
lockName, holderId, localExpirationTime);
}
preparedStatement.setTimestamp(1, expirationTime);
preparedStatement.setLong(1, localExpirationTime);
preparedStatement.setString(2, holderId);
preparedStatement.setTimestamp(3, expirationTime);
preparedStatement.setTimestamp(4, expirationTime);
preparedStatement.setLong(3, localExpirationTime);
final int updatedRows = preparedStatement.executeUpdate();
final boolean renewed = updatedRows == 1;
connection.commit();
Expand Down Expand Up @@ -218,11 +253,10 @@ public boolean tryAcquire() {
final long now = dbCurrentTimeMillis(connection);
preparedStatement.setString(1, holderId);
final long localExpirationTime = now + expirationMillis;
final Timestamp expirationTime = new Timestamp(localExpirationTime);
preparedStatement.setTimestamp(2, expirationTime);
preparedStatement.setTimestamp(3, expirationTime);
LOGGER.debugf("[%s] %s is trying to acquire lock with expirationTime %s",
lockName, holderId, expirationTime);
preparedStatement.setLong(2, localExpirationTime);
preparedStatement.setLong(3, now);
LOGGER.debugf("[%s] %s is trying to acquire lock with expirationTime %l",
lockName, holderId, localExpirationTime);
final boolean acquired = preparedStatement.executeUpdate() == 1;
connection.commit();
if (acquired) {
Expand Down Expand Up @@ -263,19 +297,17 @@ private boolean checkValidHolderId(Predicate<? super String> holderIdFilter) {
final boolean autoCommit = connection.getAutoCommit();
connection.setAutoCommit(false);
try (PreparedStatement preparedStatement = connection.prepareStatement(this.isLocked)) {
final long currentTimestampMillis = dbCurrentTimeMillis(connection);
boolean result;
try (ResultSet resultSet = preparedStatement.executeQuery()) {
if (!resultSet.next()) {
result = false;
} else {
final String currentHolderId = resultSet.getString(1);
result = holderIdFilter.test(currentHolderId);
final Timestamp expirationTime = resultSet.getTimestamp(2);
final Timestamp currentTimestamp = resultSet.getTimestamp(3);
final long currentTimestampMillis = currentTimestamp.getTime();
final long lockExpirationTime = resultSet.getLong(2);
boolean zombie = false;
if (expirationTime != null) {
final long lockExpirationTime = expirationTime.getTime();
if (lockExpirationTime > 0) {
final long expiredBy = currentTimestampMillis - lockExpirationTime;
if (expiredBy > 0) {
result = false;
Expand All @@ -285,7 +317,7 @@ private boolean checkValidHolderId(Predicate<? super String> holderIdFilter) {
if (LOGGER.isDebugEnabled()) {
LOGGER.debugf("[%s] %s has found %s with holderId = %s expirationTime = %s currentTimestamp = %s",
lockName, holderId, zombie ? "zombie lock" : "lock",
currentHolderId, expirationTime, currentTimestamp);
currentHolderId, lockExpirationTime, currentTimestampMillis);
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -97,7 +97,7 @@ static JdbcLeaseLock createLiveLock(String holderId,
return new JdbcLeaseLock(holderId, connectionProvider, sqlProvider.tryAcquireLiveLockSQL(),
sqlProvider.tryReleaseLiveLockSQL(), sqlProvider.renewLiveLockSQL(),
sqlProvider.isLiveLockedSQL(), sqlProvider.currentTimestampSQL(),
expirationMillis, queryTimeoutMillis, "LIVE");
sqlProvider.currentTimestampTimeZoneId(), expirationMillis, queryTimeoutMillis, "LIVE");
}

static JdbcLeaseLock createBackupLock(String holderId,
Expand All @@ -108,7 +108,7 @@ static JdbcLeaseLock createBackupLock(String holderId,
return new JdbcLeaseLock(holderId, connectionProvider, sqlProvider.tryAcquireBackupLockSQL(),
sqlProvider.tryReleaseBackupLockSQL(), sqlProvider.renewBackupLockSQL(),
sqlProvider.isBackupLockedSQL(), sqlProvider.currentTimestampSQL(),
expirationMillis, queryTimeoutMillis, "BACKUP");
sqlProvider.currentTimestampTimeZoneId(), expirationMillis, queryTimeoutMillis, "BACKUP");
}

@Override
Expand Down

0 comments on commit 101c0d2

Please sign in to comment.