Skip to content

Commit

Permalink
1. fixes on the buffer to send back info in accepting conn
Browse files Browse the repository at this point in the history
2. fix for UcxConnectionTest.testEstablishConnection, it's now working
to test the UcxConnection establishment logics
  • Loading branch information
lucyge2022 committed Nov 3, 2023
1 parent 653da91 commit 131441d
Show file tree
Hide file tree
Showing 2 changed files with 18 additions and 3 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -244,6 +244,7 @@ public static UcxConnection initNewConnection(InetSocketAddress remoteAddr, UcpW
.setSocketAddress(remoteAddr));

UcxConnection newConnection = new UcxConnection();
newConnection.setRemoteAddress(remoteAddr);
newConnection.setTagToReceive(mTagGenerator.incrementAndGet());
// generate tag to recv from remote, build up connectionEstablishBuf

Expand Down Expand Up @@ -320,6 +321,7 @@ public static UcxConnection acceptIncomingConnection(
newConnection.setEndpoint(remoteEp);

// build establishConnBuf
establishConnBuf.clear();
AlluxioUcxUtils.writeConnectionMetadata(newConnection.getTagToSend(),
newConnection.getTagToReceive(),establishConnBuf, worker);
establishConnBuf.clear();
Expand Down
Original file line number Diff line number Diff line change
@@ -1,7 +1,11 @@
package alluxio.worker.ucx;

import alluxio.concurrent.jsr.CompletableFuture;
import alluxio.conf.PropertyKey;
import alluxio.util.CommonUtils;
import alluxio.util.WaitForOptions;

import org.apache.log4j.PropertyConfigurator;
import org.junit.BeforeClass;
import org.junit.Test;
import org.openucx.jucx.ucp.UcpConnectionRequest;
Expand All @@ -14,16 +18,25 @@
import org.openucx.jucx.ucp.UcpParams;
import org.openucx.jucx.ucp.UcpWorker;
import org.openucx.jucx.ucp.UcpWorkerParams;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.util.Properties;

public class UcxConnectionTest {
public static UcpContext sGlobalContext;
private static Logger LOG = LoggerFactory.getLogger(UcxConnectionTest.class);


@BeforeClass
public static void initContext() {
System.out.println("start initContext...");
// PropertyConfigurator.configure("/root/github/alluxio/conf/log4j.properties");
// Properties props = new Properties();
// props.setProperty(PropertyKey.LOGGER_TYPE.toString(), "Console");
// System.out.println("start initContext...");

sGlobalContext = new UcpContext(new UcpParams()
.requestStreamFeature()
.requestTagFeature()
Expand All @@ -41,13 +54,14 @@ public void testEstablishConnection() throws Exception {
.setConnectionHandler(new UcpListenerConnectionHandler() {
@Override
public void onConnectionRequest(UcpConnectionRequest connectionRequest) {
System.out.println("Got incoming req...");
incomingConn.complete(connectionRequest);
// mConnectionRequests.offer(connectionRequest);
}
});
InetSocketAddress remoteAddr = new InetSocketAddress(localAddr, serverPort);
UcpListener ucpListener = serverWorker.newListener(
listenerParams.setSockAddr(remoteAddr));
System.out.println("Bound UcpListener on address:" + ucpListener.getAddress());
Thread serverThread = new Thread(() -> {
try {
while (serverWorker.progress() == 0) {
Expand Down Expand Up @@ -77,5 +91,4 @@ public void onConnectionRequest(UcpConnectionRequest connectionRequest) {
System.out.println("Conn established to server:" + connToServer.toString());
serverThread.join();
}

}

0 comments on commit 131441d

Please sign in to comment.