Skip to content
Permalink
Browse files
[NO ISSUE][OTH] Logging Fixes
- user model changes: no
- storage format changes: no
- interface changes: no

Details:

- Log exception when closing sockets quietly at trace level.
- Fix replication logging levels.
- Fix RemoteLogsNotifier thread name.

Change-Id: I210900a410a18144c22fd5af928151b7e7c4bfbd
Reviewed-on: https://asterix-gerrit.ics.uci.edu/c/asterixdb/+/15983
Reviewed-by: Murtadha Hubail <mhubail@apache.org>
Reviewed-by: Michael Blow <mblow@apache.org>
Integration-Tests: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
Tested-by: Murtadha Hubail <mhubail@apache.org>
  • Loading branch information
mhubail committed Apr 2, 2022
1 parent 0afd4ee commit 4dfd3b0a8f497e4d5aa95dacaef7a24065e10451
Showing 9 changed files with 28 additions and 23 deletions.
@@ -57,7 +57,7 @@ public RemoteLogsNotifier(INcApplicationContext appCtx, LinkedBlockingQueue<Remo
@Override
public void run() {
final String nodeId = appCtx.getServiceContext().getNodeId();
Thread.currentThread().setName(nodeId + RemoteLogsNotifier.class.getSimpleName());
Thread.currentThread().setName(RemoteLogsNotifier.class.getSimpleName() + ":" + nodeId);
while (!Thread.currentThread().isInterrupted()) {
try {
final RemoteLogRecord logRecord = remoteLogsQ.take();
@@ -38,7 +38,6 @@
import org.apache.hyracks.api.exceptions.HyracksDataException;
import org.apache.hyracks.api.replication.IReplicationJob;
import org.apache.hyracks.storage.am.lsm.common.api.ILSMIndexReplicationJob;
import org.apache.logging.log4j.Level;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;

@@ -153,7 +152,7 @@ private void closeChannels() {
if (!replicationJobsQ.isEmpty()) {
return;
}
LOGGER.log(Level.INFO, "No pending replication jobs. Closing connections to replicas");
LOGGER.trace("no pending replication jobs; closing connections to replicas");
for (ReplicationDestination dest : destinations) {
dest.getReplicas().stream().map(PartitionReplica.class::cast).forEach(PartitionReplica::close);
}
@@ -129,21 +129,21 @@ public void run() {
+ getRemoteAddress() + ")");
try {
if (socketChannel.requiresHandshake() && !socketChannel.handshake()) {
LOGGER.warn("failed to complete handshake");
LOGGER.warn("failed to complete handshake with {}", this::getRemoteAddress);
return;
}
socketChannel.getSocketChannel().configureBlocking(true);
LOGGER.debug("reading replication worker initial request");
LOGGER.trace("reading replication worker initial request");
ReplicationRequestType requestType = ReplicationProtocol.getRequestType(socketChannel, inBuffer);
LOGGER.debug("got request type: {}", requestType);
LOGGER.trace("got request type: {}", requestType);
while (requestType != ReplicationRequestType.GOODBYE) {
handle(requestType);
LOGGER.debug("handled request type: {}", requestType);
LOGGER.trace("handled request type: {}", requestType);
requestType = ReplicationProtocol.getRequestType(socketChannel, inBuffer);
LOGGER.debug("got request type: {}", requestType);
LOGGER.trace("got request type: {}", requestType);
}
} catch (Exception e) {
LOGGER.warn("Unexpected error during replication.", e);
LOGGER.warn("unexpected error during replication.", e);
} finally {
NetworkUtil.closeQuietly(socketChannel);
}
@@ -60,7 +60,7 @@ public void perform(INcApplicationContext appCtx, IReplicationWorker worker) {
((PersistentLocalResourceRepository) appCtx.getLocalResourceRepository())
.invalidateResource(replicaRes.getRelativePath().toString());
}
LOGGER.info(() -> "Deleted file: " + localFile.getAbsolutePath());
LOGGER.debug(() -> "Deleted file: " + localFile.getAbsolutePath());
} else {
LOGGER.warn(() -> "Requested to delete a non-existing file: " + localFile.getAbsolutePath());
}
@@ -85,6 +85,11 @@ public void serialize(OutputStream out) throws HyracksDataException {
}
}

/**
 * Renders this task for log output.
 *
 * @return a string of the form {@code DeleteFileTask{file='<file>'}}
 */
@Override
public String toString() {
    return "DeleteFileTask{file='" + file + "'}";
}

/**
 * Builds a {@code DeleteFileTask} from its serialized form.
 *
 * @param input the source from which a single UTF string (the file) is read
 * @return a new task for the file that was read
 * @throws IOException if reading from {@code input} fails
 */
public static DeleteFileTask create(DataInput input) throws IOException {
    final String serializedFile = input.readUTF();
    return new DeleteFileTask(serializedFile);
}
@@ -67,7 +67,6 @@ public void perform(INcApplicationContext appCtx, IReplicationWorker worker) thr
LOGGER.debug("got partition {} files ({})", partition, partitionFiles.size());
final PartitionResourcesListResponse response = new PartitionResourcesListResponse(partition,
partitionReplicatedResources, partitionFiles, appCtx.getReplicaManager().isPartitionOrigin(partition));
LOGGER.debug("partition {} files list to requester", partition);
ReplicationProtocol.sendTo(worker.getChannel(), response, worker.getReusableBuffer());
LOGGER.debug("sent partition {} files list to requester", partition);
}
@@ -58,14 +58,15 @@ public void replicate(String file, boolean metadata) {
String masterNode = appCtx.getReplicaManager().isPartitionOrigin(replica.getIdentifier().getPartition())
? appCtx.getServiceContext().getNodeId() : null;
ReplicateFileTask task = new ReplicateFileTask(file, filePath.getFile().length(), metadata, masterNode);
LOGGER.debug("attempting to replicate {} to replica {}", task, replica);
LOGGER.trace("attempting {} to replica {}", task, replica);
ReplicationProtocol.sendTo(replica, task);
// send the file itself
try (RandomAccessFile fromFile = new RandomAccessFile(filePath.getFile(), "r");
FileChannel fileChannel = fromFile.getChannel()) {
NetworkingUtil.sendFile(fileChannel, channel);
}
ReplicationProtocol.waitForAck(replica);
LOGGER.debug("completed {} to replica {}", task, replica);
} catch (IOException e) {
throw new ReplicationException(e);
}
@@ -74,8 +75,10 @@ public void replicate(String file, boolean metadata) {
public void delete(String file) {
try {
final DeleteFileTask task = new DeleteFileTask(file);
LOGGER.trace("attempting {} from replica {}", task, replica);
ReplicationProtocol.sendTo(replica, task);
ReplicationProtocol.waitForAck(replica);
LOGGER.debug("completed {} from replica {}", task, replica);
} catch (IOException e) {
throw new ReplicationException(e);
}
@@ -70,9 +70,9 @@ public void sync() throws IOException {
if (!deltaRecovery) {
deletePartitionFromReplica(partition);
}
LOGGER.debug("getting replica files");
LOGGER.trace("getting replica files");
PartitionResourcesListResponse replicaResourceResponse = getReplicaFiles(partition);
LOGGER.debug("got replica files");
LOGGER.trace("got replica files");
Map<ResourceReference, Long> resourceReferenceLongMap = getValidReplicaResources(
replicaResourceResponse.getPartitionReplicatedResources(), replicaResourceResponse.isOrigin());
// clean up files for invalid resources (deleted or recreated while the replica was down)
@@ -81,11 +81,10 @@ public void sync() throws IOException {
final PersistentLocalResourceRepository localResourceRepository =
(PersistentLocalResourceRepository) appCtx.getLocalResourceRepository();
final IReplicationStrategy replicationStrategy = appCtx.getReplicationManager().getReplicationStrategy();
LOGGER.debug("clean up replica invalid files");
final Set<String> masterFiles =
localResourceRepository.getPartitionReplicatedFiles(partition, replicationStrategy).stream()
.map(StoragePathUtil::getFileRelativePath).collect(Collectors.toSet());
LOGGER.debug("got master partition files");
LOGGER.trace("got master partition files");
// exclude from the replica files the list of invalid deleted files
final Set<String> replicaFiles = new HashSet<>(replicaResourceResponse.getFiles());
replicaFiles.removeAll(deletedReplicaFiles);
@@ -131,13 +130,12 @@ private void replicateMissingFiles(List<String> files) {
}

/**
 * Deletes the given files through a {@link FileSynchronizer} created for
 * {@code appCtx} and {@code replica}, one {@code delete} call per file.
 * <p>
 * The passed list is reordered in place (sorted, then reversed) before the
 * deletions are issued, so callers should pass a list they do not need to
 * keep in its original order.
 *
 * @param files the files to delete; must be a mutable list
 */
private void deleteInvalidFiles(List<String> files) {
LOGGER.debug("deleting replica invalid files");
final FileSynchronizer sync = new FileSynchronizer(appCtx, replica);
// sort files to ensure index metadata files starting with "." are deleted last
files.sort(String::compareTo);
// descending order: deletions are issued from the end of the natural ordering backwards
Collections.reverse(files);
LOGGER.info("deleting {}", files);
files.forEach(sync::delete);
LOGGER.debug("completed invalid files deletion");
}

private long getResourceMasterValidSeq(ResourceReference rr) throws HyracksDataException {
@@ -174,7 +172,7 @@ private Set<String> cleanupReplicaInvalidResources(PartitionResourcesListRespons
}
}
if (!invalidFiles.isEmpty()) {
LOGGER.info("will delete the following files from replica {}", invalidFiles);
LOGGER.debug("will delete the following files from replica {}", invalidFiles);
deleteInvalidFiles(new ArrayList<>(invalidFiles));
}
return invalidFiles;
@@ -50,7 +50,7 @@ public void sync(boolean register, boolean deltaRecovery) throws IOException {
LOGGER.debug("starting replica sync process for replica {}", replica);
Object partitionLock = appCtx.getReplicaManager().getPartitionSyncLock(replica.getIdentifier().getPartition());
synchronized (partitionLock) {
LOGGER.debug("acquired partition replica lock");
LOGGER.trace("acquired partition replica lock");
final ICheckpointManager checkpointManager = appCtx.getTransactionSubsystem().getCheckpointManager();
try {
// suspend checkpointing datasets to prevent async IO operations while sync'ing replicas
@@ -102,7 +102,7 @@ public synchronized int read(ByteBuffer buffer) throws IOException {
inAppData.limit(0);
}
} else if (bytesRead < 0) {
LOGGER.debug("received EOF; transferredBytes Bytes: {}", transferredBytes);
LOGGER.trace("received EOF; transferred bytes: {}", transferredBytes);
handleEndOfStreamQuietly();
return -1;
}
@@ -195,7 +195,7 @@ public synchronized void close() throws IOException {
new SslHandshake(this).handshake();
} catch (Exception e) {
// ignore exceptions on best effort graceful close handshake
LOGGER.debug("ssl socket close handshake failed", e);
LOGGER.trace("ssl socket close handshake failed", e);
} finally {
socketChannel.close();
}
@@ -243,7 +243,8 @@ private void handleEndOfStreamQuietly() {
close();
}
} catch (Exception e) {
LOGGER.warn("failed to close socket gracefully", e);
// ignore close exception since we are closing quietly
LOGGER.trace("failed to close socket gracefully", e);
}
}

0 comments on commit 4dfd3b0

Please sign in to comment.