Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[FEAT] N2N TxSubmission #63

Merged
merged 31 commits into from
Apr 13, 2024
Merged
Show file tree
Hide file tree
Changes from 29 commits
Commits
Show all changes
31 commits
Select commit Hold shift + click to select a range
8ccd4c1
chore: add some logging
nemo83 Mar 30, 2024
eaefd9e
chore: intermediate work
nemo83 Mar 30, 2024
604bcbb
chore: test 1
nemo83 Mar 30, 2024
d4b07f6
chore: stuff
nemo83 Mar 30, 2024
f6f68e8
chore: adding tx logic
nemo83 Mar 30, 2024
c433f13
chore: different serialization
nemo83 Mar 30, 2024
2345231
chore: logs
nemo83 Apr 1, 2024
11f24a9
chore: submit tx still not working
nemo83 Apr 1, 2024
74c0e8a
chore: it works
nemo83 Apr 2, 2024
eca480e
chore: cleaned up a bit the code
nemo83 Apr 2, 2024
293ae10
chore: less logging
nemo83 Apr 2, 2024
b3490b9
chore: initial comit
nemo83 Apr 2, 2024
f6f9971
chore: cleaning
nemo83 Apr 2, 2024
13361ea
chore: less logs
nemo83 Apr 2, 2024
397dac0
chore: vector
nemo83 Apr 2, 2024
68d8805
chore: concurrent
nemo83 Apr 2, 2024
aa8b1b5
chore: pendingTx should be used instead of map to check agent state
nemo83 Apr 2, 2024
8391916
chore: fix comodification
nemo83 Apr 2, 2024
e2733a9
chore: clear data structures on reset
nemo83 Apr 2, 2024
8ee076c
chore: handshake
nemo83 Apr 2, 2024
507d91d
chore: added keep alive
nemo83 Apr 2, 2024
74162c0
chore: switched from Map of Tx to Vector to keep order of tx
nemo83 Apr 5, 2024
36edfc5
chore: removed unused commented code
nemo83 Apr 5, 2024
efc9e7b
chore: rolled back unwanted changes
nemo83 Apr 5, 2024
94c0a78
chore: rolled back unwanted changes
nemo83 Apr 5, 2024
1960988
chore: cleaned up code
nemo83 Apr 5, 2024
e6de692
chore: replaced vector w/ concurredlinkedqueue
nemo83 Apr 8, 2024
6cb8bf6
chore: replaced TransactionUtil w/ TxUtil for tx hash derivation
nemo83 Apr 8, 2024
77ffa99
chore: added era management
nemo83 Apr 9, 2024
483cd69
chore: added listener and integration test
nemo83 Apr 10, 2024
9ed1083
chore: removed temporary integration test
nemo83 Apr 12, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -14,9 +14,6 @@
import lombok.extern.slf4j.Slf4j;

import java.net.SocketAddress;
import java.util.Arrays;
import java.util.List;
import java.util.stream.Collectors;

@Slf4j
public abstract class NodeClient {
Expand Down Expand Up @@ -69,10 +66,7 @@ public void start() {

configureChannel(b);

List<Agent> allAgents = Arrays.stream(agents).collect(Collectors.toList());
satran004 marked this conversation as resolved.
Show resolved Hide resolved
allAgents.add(0, handshakeAgent);

b.handler(new ChannelInitializer<Channel>() {
b.handler(new ChannelInitializer<>() {

@Override
public void initChannel(Channel ch)
Expand All @@ -84,8 +78,7 @@ public void initChannel(Channel ch)
}
});

SocketAddress socketAddress = null;
socketAddress = createSocketAddress();
SocketAddress socketAddress = createSocketAddress();

session = new Session(socketAddress, b, handshakeAgent, agents);
session.setSessionListener(sessionListener);
Expand Down
Original file line number Diff line number Diff line change
@@ -1,26 +1,33 @@
package com.bloxbean.cardano.yaci.core.protocol.txsubmission;

import com.bloxbean.cardano.yaci.core.common.TxBodyType;
import com.bloxbean.cardano.yaci.core.protocol.Agent;
import com.bloxbean.cardano.yaci.core.protocol.Message;
import com.bloxbean.cardano.yaci.core.protocol.txsubmission.messges.*;
import com.bloxbean.cardano.yaci.core.protocol.txsubmission.model.TxSubmissionRequest;
import lombok.extern.slf4j.Slf4j;

import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.ConcurrentLinkedQueue;

@Slf4j
public class TxSubmissionAgent extends Agent {
private final Map<String, byte[]> txs;
private final List<String> reqTxIds;
private final List<String> reqNonBlockingTxIds;
public class TxSubmissionAgent extends Agent<TxSubmissionListener> {
// txs should be stored in a thread-safe, ordered (tx dependency/chaining) data structure.
private final ConcurrentLinkedQueue<TxSubmissionRequest> txs;
/**
* Is the queue of TX received from client
*/
private final ConcurrentLinkedQueue<String> pendingTxIds;
/**
* It's the temporary list of TX ids requested from Server
*/
private final ConcurrentLinkedQueue<String> requestedTxIds;

public TxSubmissionAgent() {
this.currenState = TxSubmissionState.Init;
txs = new HashMap<>();
reqTxIds = new ArrayList<>();
reqNonBlockingTxIds = new ArrayList<>();
this.txs = new ConcurrentLinkedQueue<>();
this.pendingTxIds = new ConcurrentLinkedQueue<>();
this.requestedTxIds = new ConcurrentLinkedQueue<>();
}

@Override
Expand All @@ -34,8 +41,7 @@ public Message buildNextMessage() {
case Init:
return new Init();
case TxIdsNonBlocking:
return getReplyTxIds();
case TxIdsBlocking:
case TxIdsBlocking:
return getReplyTxIds();
case Txs:
return getReplyTxs();
Expand All @@ -44,54 +50,139 @@ public Message buildNextMessage() {
}
}

private Optional<TxSubmissionRequest> findTxIdAndHash(String id) {
return txs.stream().filter(txSubmissionRequest -> txSubmissionRequest.getTxHash().equals(id)).findAny();
}

private Optional<TxSubmissionRequest> removeTxIdAndHash(String id) {
var txIdAndHashOpt = txs.stream().filter(txSubmissionRequest -> txSubmissionRequest.getTxHash().equals(id)).findAny();
txIdAndHashOpt.ifPresent(txs::remove);
return txIdAndHashOpt;
}

private ReplyTxIds getReplyTxIds() {
if (txs != null && txs.size() > 0) {
if (!pendingTxIds.isEmpty()) {
ReplyTxIds replyTxIds = new ReplyTxIds();
txs.forEach((id, txBytes) -> {
replyTxIds.addTxId(id, txBytes.length);
});
// Not limiting how many txs to add, as pendingTxIds should be already capped to num of req txs
pendingTxIds
.stream()
.flatMap(id -> findTxIdAndHash(id).stream())
.forEach(txSubmissionRequest -> replyTxIds.addTxId(txSubmissionRequest.getTxHash(), txSubmissionRequest.getTxnBytes().length));
if (log.isDebugEnabled())
log.debug("TxIds: {}", replyTxIds.getTxIdAndSizeMap().size());
return replyTxIds;
} else
return new ReplyTxIds();
}
return new ReplyTxIds();
}

private ReplyTxs getReplyTxs() {
if (reqTxIds.isEmpty())
if (requestedTxIds.isEmpty())
return new ReplyTxs();

ReplyTxs replyTxs = new ReplyTxs();
for (String txId: reqTxIds) {
byte[] tx = txs.get(txId);
replyTxs.addTx(tx);
}
requestedTxIds.forEach(txId -> removeTxIdAndHash(txId).ifPresent(txSubmissionRequest -> replyTxs.addTx(txSubmissionRequest.getTxnBytes())));

// Ids of requested TXs don't seem to be acked from server.
// Removing them right away now.
requestedTxIds.forEach(pendingTxIds::remove);
if (log.isDebugEnabled())
log.debug("Txs: {}", replyTxs.getTxns().size());
return replyTxs;
}

@Override
public void processResponse(Message message) {
if (message == null) return;
if (message instanceof RequestTxIds) {
if (((RequestTxIds) message).isBlocking()) {
handleRequestTxIdsBlocking((RequestTxIds) message);

if (message instanceof Init) {
log.warn("init");
} else if (message instanceof RequestTxIds) {
var requestTxIds = (RequestTxIds) message;
if (requestTxIds.isBlocking()) {
if (log.isDebugEnabled())
log.debug("RequestTxIds - Blocking, ack: {}, req: {}", requestTxIds.getAckTxIds(), requestTxIds.getReqTxIds());
handleRequestTxIdsBlocking(requestTxIds);
} else {
handleRequestTxIdsNonBlocking((RequestTxIds) message);
if (log.isDebugEnabled())
log.debug("RequestTxIds - NonBlocking, ack: {}, req: {}", requestTxIds.getAckTxIds(), requestTxIds.getReqTxIds());
handleRequestTxIdsNonBlocking(requestTxIds);
}
} else if (message instanceof RequestTxs) {
handleRequestTxs((RequestTxs) message);
}
}

private void handleRequestTxs(RequestTxs requestTxs) {
log.info("RequestTxs >>" + requestTxs);
requestedTxIds.clear();
requestedTxIds.addAll(requestTxs.getTxIds());
getAgentListeners().forEach(listener -> listener.handleRequestTxs(requestTxs));
}

private void handleRequestTxIdsNonBlocking(RequestTxIds requestTxIds) {
log.info("RequestTxIdsNonBlocking >> " + requestTxIds);
// process ack
removeAcknowledgedTxs(requestTxIds.getAckTxIds());
addTxToQueue(requestTxIds.getReqTxIds());
getAgentListeners().forEach(listener -> listener.handleRequestTxIdsNonBlocking(requestTxIds));
}

private void handleRequestTxIdsBlocking(RequestTxIds requestTxIds) {
log.info("RequestTxIdsBlocking >> " + requestTxIds);
// process ack
removeAcknowledgedTxs(requestTxIds.getAckTxIds());
addTxToQueue(requestTxIds.getReqTxIds());
getAgentListeners().forEach(listener -> listener.handleRequestTxIdsBlocking(requestTxIds));
}

private void addTxToQueue(int numTxToAdd) {
// pendingTxIds size can't exceed numTxToAdd
var txToAdd = numTxToAdd - pendingTxIds.size();
if (!txs.isEmpty()) {
txs.stream()
.map(TxSubmissionRequest::getTxHash)
.filter(txHash -> !pendingTxIds.contains(txHash))
.limit(txToAdd)
.forEach(pendingTxIds::add);
} else {
if (log.isDebugEnabled())
log.debug("Nothing to do, txs is empty");
}
}

private void addTxToQueue(String txHash) {
if (!pendingTxIds.contains(txHash)) {
pendingTxIds.add(txHash);
}
}

private void removeAcknowledgedTxs(int numAcknowledgedTransactions) {
if (numAcknowledgedTransactions > 0) {
var numTxToRemove = Math.min(numAcknowledgedTransactions, pendingTxIds.size());
for (int i = 0; i < numTxToRemove; i++) {
var txHash = pendingTxIds.poll();
if (txHash != null) {
// remove from map
removeTxIdAndHash(txHash);
// removed from queue
pendingTxIds.remove(txHash);
}
}

}

}

public void enqueueTransaction(String txHash, byte[] txBytes, TxBodyType txBodyType) {
if (txs.stream().anyMatch(txSubmissionRequest -> txSubmissionRequest.getTxHash().equals(txHash))) {
return;
}
txs.add(TxSubmissionRequest.builder().txHash(txHash).txnBytes(txBytes).txBodyType(txBodyType).build());
if (TxSubmissionState.TxIdsBlocking.equals(currenState)) {
addTxToQueue(txHash);
this.sendNextMessage();
}
}

public boolean hasPendingTx() {
return !pendingTxIds.isEmpty();
}

@Override
Expand All @@ -101,6 +192,9 @@ public boolean isDone() {

@Override
public void reset() {
txs.clear();
pendingTxIds.clear();
requestedTxIds.clear();
this.currenState = TxSubmissionState.Init;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
package com.bloxbean.cardano.yaci.core.protocol.txsubmission;

import com.bloxbean.cardano.yaci.core.protocol.AgentListener;
import com.bloxbean.cardano.yaci.core.protocol.txsubmission.messges.RequestTxIds;
import com.bloxbean.cardano.yaci.core.protocol.txsubmission.messges.RequestTxs;

public interface TxSubmissionListener extends AgentListener {
void handleRequestTxs(RequestTxs requestTxs);

void handleRequestTxIdsNonBlocking(RequestTxIds requestTxIds);

void handleRequestTxIdsBlocking(RequestTxIds requestTxIds);
}
Original file line number Diff line number Diff line change
@@ -1,22 +1,26 @@
package com.bloxbean.cardano.yaci.core.protocol.txsubmission.messges;

import com.bloxbean.cardano.yaci.core.protocol.Message;
import com.bloxbean.cardano.yaci.core.protocol.localstate.api.Era;
import com.bloxbean.cardano.yaci.core.protocol.txsubmission.serializers.TxSubmissionMessagesSerializers;
import lombok.AllArgsConstructor;
import lombok.Builder;
import lombok.Getter;
import lombok.NoArgsConstructor;
import lombok.RequiredArgsConstructor;

import java.util.HashMap;
import java.util.Map;

@Getter
@AllArgsConstructor
@NoArgsConstructor
@Builder
@RequiredArgsConstructor
public class ReplyTxIds implements Message {

private final Era era;

private Map<String, Integer> txIdAndSizeMap;

public ReplyTxIds() {
this(Era.Babbage);
}

public void addTxId(String id, int size) {
if (txIdAndSizeMap == null)
txIdAndSizeMap = new HashMap<>();
Expand Down
Original file line number Diff line number Diff line change
@@ -1,22 +1,26 @@
package com.bloxbean.cardano.yaci.core.protocol.txsubmission.messges;

import com.bloxbean.cardano.yaci.core.protocol.Message;
import com.bloxbean.cardano.yaci.core.protocol.localstate.api.Era;
import com.bloxbean.cardano.yaci.core.protocol.txsubmission.serializers.TxSubmissionMessagesSerializers;
import lombok.AllArgsConstructor;
import lombok.Builder;
import lombok.Getter;
import lombok.NoArgsConstructor;
import lombok.RequiredArgsConstructor;

import java.util.ArrayList;
import java.util.List;

@Getter
@AllArgsConstructor
@NoArgsConstructor
@Builder
@RequiredArgsConstructor
public class ReplyTxs implements Message {

private final Era era;

private List<byte[]> txns;

public ReplyTxs() {
this(Era.Babbage);
}

public void addTx(byte[] tx) {
if (txns == null)
txns = new ArrayList<>();
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,40 @@
package com.bloxbean.cardano.yaci.core.protocol.txsubmission.model;

import com.bloxbean.cardano.client.transaction.util.TransactionUtil;
import com.bloxbean.cardano.yaci.core.common.TxBodyType;
import com.bloxbean.cardano.yaci.core.util.HexUtil;
import lombok.AllArgsConstructor;
import lombok.Builder;
import lombok.Getter;

@AllArgsConstructor
@Getter
@Builder
public class TxSubmissionRequest {

private final TxBodyType txBodyType;
private final byte[] txnBytes;
private final String txHash;

public TxSubmissionRequest(byte[] txnBytes) {
this(TxBodyType.BABBAGE, txnBytes);
}

public TxSubmissionRequest(TxBodyType txBodyType, byte[] txnBytes) {
if (txnBytes == null)
throw new RuntimeException("TxBytes can't be null");

this.txBodyType = txBodyType;
this.txnBytes = txnBytes;
this.txHash = TransactionUtil.getTxHash(txnBytes);
}

@Override
public String toString() {
return "TxSubmissionRequest{" +
"txBodyType=" + txBodyType +
", txnBytes=" + (txnBytes != null? HexUtil.encodeHexString(txnBytes) : "") +
", txnHash=" + txHash +
'}';
}
}
Loading
Loading