Skip to content
Permalink
Browse files
[NO ISSUE][OTH] Txn Logs Replication Trace Logs
- user model changes: no
- storage format changes: no
- interface changes: yes

Details:

- Add debug logs for txn logs replication.

Change-Id: Id4a98e30763f9a86952e1dc1c226af89dddc2b0a
Reviewed-on: https://asterix-gerrit.ics.uci.edu/c/asterixdb/+/15865
Reviewed-by: Ali Alsuliman <ali.al.solaiman@gmail.com>
Integration-Tests: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
Tested-by: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
  • Loading branch information
mhubail authored and AliSolaiman committed Mar 25, 2022
1 parent 06f7c18 commit 16acb99937eb2b6a0d639615109aac79e0ef8301
Showing 8 changed files with 32 additions and 1 deletion.
@@ -102,6 +102,7 @@ public synchronized void flushed(long componentSequence, long lsn, long componen
public synchronized void masterFlush(long masterLsn, long localLsn) throws HyracksDataException {
final IndexCheckpoint latest = getLatest();
latest.getMasterNodeFlushMap().put(masterLsn, localLsn);
LOGGER.debug("index {} master flush {} -> {}", indexPath, masterLsn, localLsn);
final IndexCheckpoint next = IndexCheckpoint.next(latest, latest.getLowWatermark(),
latest.getValidComponentSequence(), latest.getLastComponentId(), null);
persist(next);
@@ -35,4 +35,9 @@ public interface IReplicationWorker extends Runnable {
* @return the reusable buffer
*/
ByteBuffer getReusableBuffer();

/**
* @return The remote address of the sender
*/
String getRemoteAddress();
}
@@ -47,7 +47,8 @@ public RemoteLogsProcessor(INcApplicationContext appCtx) {
public void process(ByteBuffer logsBatch, RemoteLogRecord reusableLog, IReplicationWorker worker) {
while (logsBatch.hasRemaining()) {
// get rid of log size
logsBatch.getInt();
int batchSize = logsBatch.getInt();
LOGGER.debug("received logs batch size {} from {}", batchSize, worker.getRemoteAddress());
reusableLog.readRemoteLog(logsBatch);
reusableLog.setLogSource(LogSource.REMOTE);
switch (reusableLog.getLogType()) {
@@ -74,6 +75,8 @@ public void process(ByteBuffer logsBatch, RemoteLogRecord reusableLog, IReplicat
flushLog.setRequester(this);
flushLog.setLogSource(LogSource.REMOTE);
flushLog.setMasterLsn(reusableLog.getLSN());
LOGGER.debug("received master LSN {} for partition {}", reusableLog.getLSN(),
reusableLog.getResourcePartition());
logManager.log(flushLog);
break;
default:
@@ -23,9 +23,12 @@

import org.apache.asterix.common.transactions.ILogRecord;
import org.apache.asterix.replication.management.LogReplicationManager;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;

public class ReplicationLogBuffer {

private static final Logger LOGGER = LogManager.getLogger();
private final int logBufferSize;
private final AtomicBoolean full;
private int appendOffset;
@@ -117,6 +120,7 @@ private void internalFlush(int beginOffset, int endOffset) {
private void transferBuffer(ByteBuffer buffer) {
if (buffer.remaining() <= batchSize) {
//the current batch can be sent as it is
LOGGER.debug("sending txn logs batch size {}", buffer.remaining());
replicationManager.transferBatch(buffer);
return;
}
@@ -142,6 +146,7 @@ private void transferBuffer(ByteBuffer buffer) {
//return to the beginning of the batch position
buffer.reset();
}
LOGGER.debug("sending logs slice size {}", buffer.remaining());
replicationManager.transferBatch(buffer);
//return the original limit to check the new remaining size
buffer.limit(totalTransferLimit);
@@ -131,6 +131,9 @@ public void replicate(ILogRecord logRecord) throws InterruptedException {
ackTracker.track(logRecord, new HashSet<>(destinations.keySet()));
}
}
if (logRecord.getLogType() == LogType.FLUSH) {
LOGGER.debug("appending flush lsn {} to replication queue", logRecord.getLSN());
}
appendToLogBuffer(logRecord);
}

@@ -114,6 +114,7 @@ public static void transferBufferToChannel(ISocketChannel socketChannel, ByteBuf
while (requestBuffer.hasRemaining()) {
socketChannel.write(requestBuffer);
}
socketChannel.getSocketChannel().socket().getOutputStream().flush();
}

//unused
@@ -151,6 +151,15 @@ public ByteBuffer getReusableBuffer() {
return outBuffer;
}

@Override
public String getRemoteAddress() {
try {
return socketChannel.getSocketChannel().getRemoteAddress().toString();
} catch (Exception e) {
return "unknown";
}
}

private void handle(ReplicationRequestType requestType) throws HyracksDataException {
final IReplicaTask task =
(IReplicaTask) ReplicationProtocol.readMessage(requestType, socketChannel, inBuffer);
@@ -34,12 +34,15 @@
import org.apache.asterix.replication.management.ReplicationChannel;
import org.apache.hyracks.api.exceptions.HyracksDataException;
import org.apache.hyracks.api.network.ISocketChannel;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;

/**
* A task to replicate transaction logs from master replica
*/
public class ReplicateLogsTask implements IReplicaTask {

private static final Logger LOGGER = LogManager.getLogger();
public static final int END_REPLICATION_LOG_SIZE = 1;
private final String nodeId;

@@ -61,6 +64,7 @@ public void perform(INcApplicationContext appCtx, IReplicationWorker worker) {
logsBuffer = ReplicationProtocol.readRequest(channel, logsBuffer);
// check if it is end of handshake
if (logsBuffer.remaining() == END_REPLICATION_LOG_SIZE) {
LOGGER.info("ending log replication with {}", worker.getRemoteAddress());
break;
}
logsProcessor.process(logsBuffer, reusableLog, worker);

0 comments on commit 16acb99

Please sign in to comment.