Skip to content

Commit

Permalink
retry when native metadata client fails (lakesoul-io#313)
Browse files Browse the repository at this point in the history
Signed-off-by: zenghua <huazeng@dmetasoul.com>
Co-authored-by: zenghua <huazeng@dmetasoul.com>
  • Loading branch information
Ceng23333 and zenghua committed Sep 1, 2023
1 parent ae7fc43 commit 20b5d71
Show file tree
Hide file tree
Showing 2 changed files with 183 additions and 113 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ public class NativeMetadataJavaClient implements AutoCloseable {

private static final Logger LOG = LoggerFactory.getLogger(NativeMetadataJavaClient.class);

private final long timeout;
private long timeout;
private long bufferSize;

private long largeBufferSize;
Expand Down Expand Up @@ -229,44 +229,64 @@ private void initialize() {


public JniWrapper executeQuery(Integer queryType, List<String> params) {
getReadLock();
final CompletableFuture<Integer> future = new CompletableFuture<>();
try {
getLibLakeSoulMetaData().execute_query(
new ReferencedIntegerCallback((result, msg) -> {
if (msg.isEmpty()) {
future.complete(result);
} else {
future.completeExceptionally(new SQLException(msg));
}
}, getIntegerCallbackObjectReferenceManager()),
tokioRuntime,
tokioPostgresClient,
preparedStatement,
queryType,
String.join(PARAM_DELIM, params),
queryType < DAO_TYPE_QUERY_LIST_OFFSET ? sharedBuffer.address() : largeSharedBuffer.address()
);
Integer len = future.get(timeout, TimeUnit.MILLISECONDS);
if (len < 0) return null;
byte[] bytes = new byte[len];
if (queryType < DAO_TYPE_QUERY_LIST_OFFSET)
sharedBuffer.get(0, bytes, 0, len);
else
largeSharedBuffer.get(0, bytes, 0, len);
try {
return JniWrapper.parseFrom(bytes);
} catch (InvalidProtocolBufferException e) {
throw new RuntimeException(e);
getReadLock();
int retryCounter = NATIVE_METADATA_MAX_RETRY_ATTEMPTS;
while (retryCounter >= 0) {
try {
final CompletableFuture<Integer> future = new CompletableFuture<>();
getLibLakeSoulMetaData().execute_query(
new ReferencedIntegerCallback((result, msg) -> {
if (msg.isEmpty()) {
future.complete(result);
} else {
future.completeExceptionally(new SQLException(msg));
}
}, getIntegerCallbackObjectReferenceManager()),
tokioRuntime,
tokioPostgresClient,
preparedStatement,
queryType,
String.join(PARAM_DELIM, params),
queryType < DAO_TYPE_QUERY_LIST_OFFSET ? sharedBuffer.address() : largeSharedBuffer.address()
);
Integer len = future.get(timeout, TimeUnit.MILLISECONDS);
if (len < 0) return null;
byte[] bytes = new byte[len];
if (queryType < DAO_TYPE_QUERY_LIST_OFFSET)
sharedBuffer.get(0, bytes, 0, len);
else
largeSharedBuffer.get(0, bytes, 0, len);
return JniWrapper.parseFrom(bytes);
} catch (InvalidProtocolBufferException | InterruptedException | ExecutionException e) {
if (retryCounter == 0) {
throw new RuntimeException(e);
} else {
enlargeBufferAndTimeout();
retryCounter--;
}
} catch (TimeoutException e) {
if (retryCounter == 0) {
LOG.error("Execute Query {} with {} timeout", queryType, params);
throw new RuntimeException(e);
} else {
enlargeBufferAndTimeout();
retryCounter--;
}
}
}
} catch (InterruptedException | ExecutionException e) {
throw new RuntimeException(e);
} catch (TimeoutException e) {
LOG.error("Execute Query {} with {} timeout", queryType, params);
throw new RuntimeException(e);
} finally {
unlockReadLock();
}
return null;
}

private void enlargeBufferAndTimeout() {
bufferSize *= 2;
largeBufferSize *= 2;
sharedBuffer = Runtime.getRuntime(libLakeSoulMetaData).getMemoryManager().allocateDirect(bufferSize);
largeSharedBuffer = Runtime.getRuntime(libLakeSoulMetaData).getMemoryManager().allocateDirect(largeBufferSize);
timeout += 5000L;
}

private void getReadLock() {
Expand All @@ -287,102 +307,150 @@ private void unlockWriteLock() {


public Integer executeInsert(Integer insertType, JniWrapper jniWrapper) {
getWriteLock();
try {
final CompletableFuture<Integer> future = new CompletableFuture<>();

byte[] bytes = jniWrapper.toByteArray();
if (insertType < DAO_TYPE_TRANSACTION_INSERT_LIST_OFFSET)
sharedBuffer.put(0, bytes, 0, bytes.length);
else
largeSharedBuffer.put(0, bytes, 0, bytes.length);

getLibLakeSoulMetaData().execute_insert(
new ReferencedIntegerCallback((result, msg) -> {
if (msg.isEmpty()) {
future.complete(result);
} else {
future.completeExceptionally(new SQLException(msg));
}
}, getIntegerCallbackObjectReferenceManager()),
tokioRuntime,
tokioPostgresClient,
preparedStatement,
insertType,
insertType < DAO_TYPE_TRANSACTION_INSERT_LIST_OFFSET ? sharedBuffer.address() : largeSharedBuffer.address(),
bytes.length
);
return future.get(timeout, TimeUnit.MILLISECONDS);
} catch (InterruptedException | ExecutionException e) {
throw new RuntimeException(e);
} catch (TimeoutException e) {
LOG.error("Execute Insert {} with {} timeout", insertType, jniWrapper);
throw new RuntimeException(e);
getWriteLock();
int retryCounter = NATIVE_METADATA_MAX_RETRY_ATTEMPTS;
while (retryCounter >= 0) {
try {
final CompletableFuture<Integer> future = new CompletableFuture<>();

byte[] bytes = jniWrapper.toByteArray();
if (insertType < DAO_TYPE_TRANSACTION_INSERT_LIST_OFFSET)
sharedBuffer.put(0, bytes, 0, bytes.length);
else
largeSharedBuffer.put(0, bytes, 0, bytes.length);

getLibLakeSoulMetaData().execute_insert(
new ReferencedIntegerCallback((result, msg) -> {
if (msg.isEmpty()) {
future.complete(result);
} else {
future.completeExceptionally(new SQLException(msg));
}
}, getIntegerCallbackObjectReferenceManager()),
tokioRuntime,
tokioPostgresClient,
preparedStatement,
insertType,
insertType < DAO_TYPE_TRANSACTION_INSERT_LIST_OFFSET ? sharedBuffer.address() : largeSharedBuffer.address(),
bytes.length
);
return future.get(timeout, TimeUnit.MILLISECONDS);
} catch (InterruptedException | ExecutionException e) {
if (retryCounter == 0) {
throw new RuntimeException(e);
} else {
enlargeBufferAndTimeout();
retryCounter--;
}
} catch (TimeoutException e) {
if (retryCounter == 0) {
LOG.error("Execute Insert {} with {} timeout", insertType, jniWrapper);
throw new RuntimeException(e);
} else {
enlargeBufferAndTimeout();
retryCounter--;
}
}
}
} finally {
unlockWriteLock();
}
return -1;
}

public Integer executeUpdate(Integer updateType, List<String> params) {
getWriteLock();
try {
final CompletableFuture<Integer> future = new CompletableFuture<>();

getLibLakeSoulMetaData().execute_update(
new ReferencedIntegerCallback((result, msg) -> {
if (msg.isEmpty()) {
future.complete(result);
} else {
future.completeExceptionally(new SQLException(msg));
}
}, getIntegerCallbackObjectReferenceManager()),
tokioRuntime,
tokioPostgresClient,
preparedStatement,
updateType,
String.join(PARAM_DELIM, params)
);
return future.get(timeout, TimeUnit.MILLISECONDS);
} catch (InterruptedException | ExecutionException e) {
throw new RuntimeException(e);
} catch (TimeoutException e) {
LOG.error("Execute Update {} with {} timeout", updateType, params);
throw new RuntimeException(e);
getWriteLock();
int retryCounter = NATIVE_METADATA_MAX_RETRY_ATTEMPTS;
while (retryCounter >= 0) {
try {
final CompletableFuture<Integer> future = new CompletableFuture<>();

getLibLakeSoulMetaData().execute_update(
new ReferencedIntegerCallback((result, msg) -> {
if (msg.isEmpty()) {
future.complete(result);
} else {
future.completeExceptionally(new SQLException(msg));
}
}, getIntegerCallbackObjectReferenceManager()),
tokioRuntime,
tokioPostgresClient,
preparedStatement,
updateType,
String.join(PARAM_DELIM, params)
);
return future.get(timeout, TimeUnit.MILLISECONDS);
} catch (InterruptedException | ExecutionException e) {
if (retryCounter == 0) {
throw new RuntimeException(e);
} else {
enlargeBufferAndTimeout();
retryCounter--;
}
} catch (TimeoutException e) {
if (retryCounter == 0) {
LOG.error("Execute Update {} with {} timeout", updateType, params);
throw new RuntimeException(e);
} else {
enlargeBufferAndTimeout();
retryCounter--;
}
}
}
} finally {
unlockWriteLock();
}
return -1;
}

public List<String> executeQueryScalar(Integer updateType, List<String> params) {
getReadLock();
public List<String> executeQueryScalar(Integer queryScalarType, List<String> params) {
try {
final CompletableFuture<String> future = new CompletableFuture<>();

getLibLakeSoulMetaData().execute_query_scalar(
new ReferencedStringCallback((result, msg) -> {
if (msg.isEmpty()) {
future.complete(result);
} else {
future.completeExceptionally(new SQLException(msg));
}
}, getStringCallbackObjectReferenceManager()),
tokioRuntime,
tokioPostgresClient,
preparedStatement,
updateType,
String.join(PARAM_DELIM, params)
);
String result = future.get(timeout, TimeUnit.MILLISECONDS);
if (result.isEmpty()) return Collections.emptyList();
return Arrays.stream(result.split(PARAM_DELIM)).collect(Collectors.toList());
} catch (InterruptedException | ExecutionException e) {
throw new RuntimeException(e);
} catch (TimeoutException e) {
LOG.error("Execute Update {} with {} timeout", updateType, params);
throw new RuntimeException(e);
getReadLock();
int retryCounter = NATIVE_METADATA_MAX_RETRY_ATTEMPTS;
while (retryCounter >= 0) {
try {
final CompletableFuture<String> future = new CompletableFuture<>();

getLibLakeSoulMetaData().execute_query_scalar(
new ReferencedStringCallback((result, msg) -> {
if (msg.isEmpty()) {
future.complete(result);
} else {
future.completeExceptionally(new SQLException(msg));
}
}, getStringCallbackObjectReferenceManager()),
tokioRuntime,
tokioPostgresClient,
preparedStatement,
queryScalarType,
String.join(PARAM_DELIM, params)
);
String result = future.get(timeout, TimeUnit.MILLISECONDS);
if (result.isEmpty()) return Collections.emptyList();
return Arrays.stream(result.split(PARAM_DELIM)).collect(Collectors.toList());
} catch (InterruptedException | ExecutionException e) {
if (retryCounter == 0) {
throw new RuntimeException(e);
} else {
enlargeBufferAndTimeout();
retryCounter--;
}
} catch (TimeoutException e) {
if (retryCounter == 0) {
LOG.error("Execute QueryScalar {} with {} timeout", queryScalarType, params);
throw new RuntimeException(e);
} else {
enlargeBufferAndTimeout();
retryCounter--;
}
}
}
} finally {
unlockReadLock();
}
return Collections.emptyList();
}

public static Integer insert(NativeUtils.CodedDaoType insertType, JniWrapper jniWrapper) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,8 @@ public class NativeUtils {

public static boolean NATIVE_METADATA_UPDATE_ENABLED = true;

public static int NATIVE_METADATA_MAX_RETRY_ATTEMPTS = 3;

public static int DAO_TYPE_QUERY_ONE_OFFSET = 0;
public static int DAO_TYPE_QUERY_LIST_OFFSET = 100;
public static int DAO_TYPE_INSERT_ONE_OFFSET = 200;
Expand Down

0 comments on commit 20b5d71

Please sign in to comment.