Skip to content

Commit

Permalink
Merge pull request #81 from epics-base/cas_nameserver
Browse files Browse the repository at this point in the history
CA server (and thus name server demo) now support searches via TCP
  • Loading branch information
shroffk authored Jul 3, 2024
2 parents bca12fd + 3733444 commit 4b58624
Show file tree
Hide file tree
Showing 8 changed files with 215 additions and 43 deletions.
68 changes: 54 additions & 14 deletions src/core/com/cosylab/epics/caj/cas/handlers/SearchResponse.java
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
import java.util.logging.Level;

import com.cosylab.epics.caj.cas.CAJServerContext;
import com.cosylab.epics.caj.cas.CASTransport;
import com.cosylab.epics.caj.cas.requests.SearchFailedRequest;
import com.cosylab.epics.caj.cas.requests.SearchRequest;
import com.cosylab.epics.caj.impl.CAConstants;
Expand Down Expand Up @@ -49,20 +50,37 @@ protected void internalHandleResponse(
Transport transport,
ByteBuffer[] response) {

ByteBuffer headerBuffer = response[0];
final int start = headerBuffer.position();
// For a UDP client, BroadcastTransport.processRead will call this with response=[buffer].
// That one buffer holds version info, search header and search payload, positioned at start of payload:
//
// 00 00 00 00 00 01 00 0D 00 00 00 01 00 00 00 00 .... .... .... ....
// 00 06 00 08 00 05 00 0D 00 00 00 01 00 00 00 01 .... .... .... ....
// 54 45 53 54 00 00 00 00 TEST ....
// ^^
//
// For a TCP client, CASTransport.processRead will call this with a response=[headerBuffer, payloadBuffer].
// headerBuffer is positioned at the end, and a payloadBuffer positioned at the start:
//
// 00 06 00 08 00 05 00 0D 00 00 00 01 00 00 00 01 .... .... .... ....
// ^^
// 54 45 53 54 00 00 00 00 TEST ....
// ^^
// We need the payload, which holds the zero-padded channel name
ByteBuffer buffer = response.length == 2 ? response[1] : response[0];

final int start = buffer.position();
final int bufferEnd = start + payloadSize;

// to support multiple messages in one UDP packet
headerBuffer.position(bufferEnd);
buffer.position(bufferEnd);

// check channel name size
if (payloadSize <= 1) {
context.getLogger().fine("Empty channel name search request from: " + responseFrom);
return;
}

String channelName = extractString(headerBuffer, start, payloadSize, false);
String channelName = extractString(buffer, start, payloadSize, false);

// empty name check
if (channelName.length() == 0) {
Expand Down Expand Up @@ -92,8 +110,9 @@ protected void internalHandleResponse(
if (completion == ProcessVariableExistanceCompletion.EXISTS_HERE ||
completion == ProcessVariableExistanceCompletion.DOES_NOT_EXIST_HERE ||
completion.doesExistElsewhere())
{
searchResponse(responseFrom, dataType, dataCount, parameter1, completion);
{ // Reply via TCP?
CASTransport tcp = transport instanceof CASTransport ? (CASTransport) transport : null;
searchResponse(responseFrom, tcp, dataType, dataCount, parameter1, completion);
}
// in case of ProcessVariableExistanceCompletion.ASYNC_COMPLETION callback will call searchResponse method
// in case of null, do nothing
Expand All @@ -102,12 +121,13 @@ protected void internalHandleResponse(
/**
* Respond to search response.
* @param responseFrom
* @param tcp Use TCP CASTransport?
* @param dataType
* @param dataCount
* @param cid
* @param completion
*/
private void searchResponse(InetSocketAddress responseFrom, short dataType, int dataCount, int cid, ProcessVariableExistanceCompletion completion) {
private void searchResponse(InetSocketAddress responseFrom, CASTransport tcp, short dataType, int dataCount, int cid, ProcessVariableExistanceCompletion completion) {

//
// ... respond
Expand All @@ -118,20 +138,40 @@ private void searchResponse(InetSocketAddress responseFrom, short dataType, int
try
{
// TODO prepend version header (if context.getLastReceivedSequenceNumber() != 0)
SearchRequest searchRequest = new SearchRequest(context.getBroadcastTransport(), (short)dataCount, cid);
context.getBroadcastTransport().send(searchRequest, responseFrom);
// UDP includes payload (version) in reply, TCP has no payload
if (tcp == null)
{
SearchRequest searchRequest = new SearchRequest(context.getBroadcastTransport(), null, true, (short)dataCount, cid);
context.getLogger().log(Level.FINE, "UDP EXISTS_HERE search reply");
context.getBroadcastTransport().send(searchRequest, responseFrom);
}
else
{
SearchRequest searchRequest = new SearchRequest(context.getBroadcastTransport(), null, false, (short)dataCount, cid);
context.getLogger().log(Level.FINE, "TCP EXISTS_HERE search reply");
tcp.send(searchRequest.getRequestMessage());
}
} catch (Throwable th) {
context.getLogger().log(Level.WARNING, "Failed to send back search response to: " + responseFrom, th);
}
}
else if (completion.doesExistElsewhere())
{
// send back
// Same comments as for EXISTS_HERE
try
{
// TODO prepend version header (if context.getLastReceivedSequenceNumber() != 0)
SearchRequest searchRequest = new SearchRequest(context.getBroadcastTransport(), completion.getOtherAddress(), (short)dataCount, cid);
context.getBroadcastTransport().send(searchRequest, responseFrom);
if (tcp == null)
{
SearchRequest searchRequest = new SearchRequest(context.getBroadcastTransport(), completion.getOtherAddress(), true, (short)dataCount, cid);
context.getLogger().log(Level.FINE, "UDP EXISTS_ELSEWHERE search reply: " + completion.getOtherAddress());
context.getBroadcastTransport().send(searchRequest, responseFrom);
}
else
{
SearchRequest searchRequest = new SearchRequest(context.getBroadcastTransport(), completion.getOtherAddress(), false, (short)dataCount, cid);
context.getLogger().log(Level.FINE, "TCP EXISTS_ELSEWHERE search reply: " + completion.getOtherAddress());
tcp.send(searchRequest.getRequestMessage());
}
} catch (Throwable th) {
context.getLogger().log(Level.WARNING, "Failed to send back search response to: " + responseFrom, th);
}
Expand Down Expand Up @@ -197,7 +237,7 @@ public ProcessVariableExistanceCallbackImpl(InetSocketAddress responseFrom,
* @see gov.aps.jca.cas.ProcessVariableExistanceCallback#processVariableExistanceTestCompleted(gov.aps.jca.cas.ProcessVariableExistanceCompletion)
*/
public void processVariableExistanceTestCompleted(ProcessVariableExistanceCompletion completion) {
searchResponse(responseFrom, dataType, dataCount, cid, completion);
searchResponse(responseFrom, null /* not TCP */, dataType, dataCount, cid, completion);
}

/* (non-Javadoc)
Expand Down
23 changes: 23 additions & 0 deletions src/core/com/cosylab/epics/caj/cas/handlers/VersionResponse.java
Original file line number Diff line number Diff line change
Expand Up @@ -14,12 +14,15 @@

package com.cosylab.epics.caj.cas.handlers;

import java.util.logging.Level;
import java.util.logging.Logger;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;

import com.cosylab.epics.caj.cas.CAJServerContext;
import com.cosylab.epics.caj.cas.CASTransport;
import com.cosylab.epics.caj.impl.Transport;
import com.cosylab.epics.caj.impl.CAConstants;

/**
* Version (request) handler.
Expand Down Expand Up @@ -60,6 +63,26 @@ protected void internalHandleResponse(
{
// TCP only
((CASTransport)transport).setPriority(dataType);

// By responding to client with version info we indicate support for TCP name search
// https://docs.epics-controls.org/en/latest/internal/ca_protocol.html#tcp-search
// "CA_PROTO_SEARCH messages MUST NOT be sent on a Circuit unless a CA_PROTO_VERSION message has been received indicating >= CA_V412."

ByteBuffer my_response = ByteBuffer.allocate(16);
my_response.putShort((short)0); // Command: Version
my_response.putShort((short)0); // Payload Size: nothing
my_response.putShort((short)0); // Priority
my_response.putShort(CAConstants.CA_MINOR_PROTOCOL_REVISION);
my_response.putInt(0); // Reserved parameter 1
my_response.putInt(0); // Reserved parameter 2
try
{
((CASTransport)transport).send(my_response);
}
catch (Exception ex)
{
Logger.global.log(Level.WARNING, "Server cannot send version response", ex);
}
}
}

Expand Down
30 changes: 12 additions & 18 deletions src/core/com/cosylab/epics/caj/cas/requests/SearchRequest.java
Original file line number Diff line number Diff line change
Expand Up @@ -36,23 +36,14 @@ public class SearchRequest extends AbstractCARequest {

private static final Logger logger = Logger.getLogger(SearchRequest.class.getName());

/**
* @param transport Transport to use
* @param clientMinorVersion
* @param cid Channel ID
*/
public SearchRequest(Transport transport, short clientMinorVersion, int cid)
{
this(transport, null, clientMinorVersion, cid);
}

/**
* @param transport Transport to use
* @param other_address Optional other address to report, null to use this transport
* @param with_version Include minor version in payload (for UDP), or no payload (for TCP)?
* @param clientMinorVersion
* @param cid Channel ID
*/
public SearchRequest(Transport transport, InetSocketAddress other_address, short clientMinorVersion, int cid) {
public SearchRequest(Transport transport, InetSocketAddress other_address, boolean with_version, short clientMinorVersion, int cid) {
super(transport);

// add minor version payload (aligned by 8)
Expand All @@ -72,17 +63,20 @@ public SearchRequest(Transport transport, InetSocketAddress other_address, short
: other_address.getAddress();
if (serverAddress != null && !serverAddress.isAnyLocalAddress())
serverIP = InetAddressUtil.ipv4AddressToInt(serverAddress);
logger.log(Level.FINE, "Replying to search with " + serverAddress + ":" + port);
}

requestMessage = insertCAHeader(transport, requestMessage,
(short)6, (short)8, (short)port, 0,
(short)6, (short)(with_version ? 8 : 0), (short)port, 0,
serverIP, cid);

requestMessage.putShort(CAConstants.CAS_MINOR_PROTOCOL_REVISION);
// clear rest of the message (security)
requestMessage.putShort((short)0);
requestMessage.putInt(0);
if (with_version)
{
requestMessage.putShort(CAConstants.CAS_MINOR_PROTOCOL_REVISION);
// clear rest of the message (security)
requestMessage.putShort((short)0);
requestMessage.putInt(0);
logger.log(Level.FINE, "Replying to search with " + serverIP + ":" + port + ", minor " + CAConstants.CAS_MINOR_PROTOCOL_REVISION);
}
logger.log(Level.FINE, "Replying to search with " + serverIP + ":" + port + ", no payload");
}

/**
Expand Down
3 changes: 3 additions & 0 deletions src/core/com/cosylab/epics/caj/impl/BroadcastTransport.java
Original file line number Diff line number Diff line change
Expand Up @@ -237,6 +237,9 @@ protected void processWrite() {
*/
protected void send(ByteBuffer buffer)
{
// TCP-only search?
if (broadcastAddresses == null)
return;
for (int i = 0; i < broadcastAddresses.length; i++)
{
try
Expand Down
55 changes: 52 additions & 3 deletions src/core/com/cosylab/epics/caj/impl/handlers/SearchResponse.java
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
import com.cosylab.epics.caj.impl.CATransport;
import com.cosylab.epics.caj.impl.Transport;
import com.cosylab.epics.caj.util.InetAddressUtil;
import com.cosylab.epics.caj.util.HexDump;

/**
* @author <a href="mailto:matej.sekoranjaATcosylab.com">Matej Sekoranja</a>
Expand Down Expand Up @@ -53,21 +54,69 @@ protected void internalHandleResponse(
//

short minorVersion = CAConstants.CA_UNKNOWN_MINOR_PROTOCOL_REVISION;

// Search response via UDP: Only response[0]
// Payload size 8, with only a 'short minor' in payload
//
// Hexdump [response[0] @ 16] size = 24
// 00 06 00 08 13 C8 00 00 FF FF FF FF 00 00 00 01 .... .... .... ....
// 00 0B 00 00 00 00 00 00
// or
// Hexdump [response[0] @ 32] size = 40
// 00 00 00 00 00 01 00 0D 00 00 00 01 00 00 00 00 .... .... .... ....
// 00 06 00 08 13 C8 00 00 FF FF FF FF 00 00 00 01 .... .... .... ....
// 00 0D 00 00 00 00 00 00 .... ....

// Search response via TCP: Two buffers
// No payload, second buffer is empty
//
// Hexdump [response[0] @ 16] size = 16
// 00 06 00 00 13 C8 00 00 FF FF FF FF 00 00 00 01 .... .... .... ....
// Hexdump [response[1] @ 0] size = 0

// Earlier CAJ server server added the 8 byte payload to TCP response,
// which caused client to crash.
// Now handle that as well in client.
// Hexdump [response[0] @ 16] size = 16
// 00 06 00 08 13 C8 00 00 FF FF FF FF 00 00 00 01 .... .... .... ....
// Hexdump [response[1] @ 0] size = 8
// 00 0B 00 00 00 00 00 00

// System.out.println("Client received search response");
// System.out.println(response[0]);
// if (response.length > 1)
// System.out.println(response[1]);
//
// byte[] data = new byte[response[0].limit()];
// for (int i=0; i<response[0].limit(); ++i)
// data[i] = response[0].get(i);
// HexDump.hexDump("response[0] @ " + response[0].position(), data, 0, response[0].limit());
// if (response.length > 1)
// {
// data = new byte[response[1].limit()];
// for (int i=0; i<response[1].limit(); ++i)
// data[i] = response[1].get(i);
// HexDump.hexDump("response[1] @ " + response[1].position(), data, 0, response[1].limit());
// }

// Starting with CA V4.1 the minor version number is
// appended to the end of each search reply.
int payloadStart = response[0].position();
if (payloadSize >= 2 /* short size = 2 bytes */)
{
// UDP response (all in buffer 0)
minorVersion = response[0].getShort();
// UDP response (all in buffer 0), or TCP (payload in buffer 1)?
if (response.length == 1)
minorVersion = response[0].getShort();
else
minorVersion = response[1].getShort();
} else if(transport instanceof CATransport) {
// for TCP transport use already provided version
minorVersion = transport.getMinorRevision();
}

// read rest of the playload (needed for UDP)
response[0].position(payloadStart + payloadSize);
if (response.length == 1)
response[0].position(payloadStart + payloadSize);

// signed short conversion -> signed int
int port = dataType & 0xFFFF;
Expand Down
25 changes: 17 additions & 8 deletions test/com/cosylab/epics/caj/cas/test/CANameServer.java
Original file line number Diff line number Diff line change
Expand Up @@ -32,18 +32,27 @@
*
* Example usage:
*
* In one terminal,
* export EPICS_CA_SERVER_PORT=9876
* then run an IOC database with a record named "ramp".
* In one terminal, run an IOC database with a record named "ramp"
* under a non-default UDP and TCP port:
* export EPICS_CA_SERVER_PORT=9876
* softIoc -d test/resources/ramp.db
*
* In another terminal,
* caget ramp
* will not be able to connect.
* In another terminal, check that
* caget ramp
* will NOT be able to connect because it searches by default
* via UDP port 5064, while the IOC runs on 9876 (UDP and TCP).
*
* Now run this CANameServer on the same host,
* java -cp target/classes:target/test-classes -DCAJ_DEBUG=true com.cosylab.epics.caj.cas.test.CANameServer
* and try `caget ramp` again.
* It will reach the name server, which replies with 127.0.0.1, port 9876
* to the seach request and client can then reach the IOC.
* The client will reach the name server via UDP 5064.
* The name server replies with 127.0.0.1, port 9876
* to the search request and client can then reach the IOC.
*
* The CANameServer also supports searches via TCP:
* export EPICS_CA_NAME_SERVERS=127.0.0.1
* export EPICS_CA_AUTO_ADDR_LIST=no
* caget ramp
*/
public class CANameServer
{
Expand Down
Loading

0 comments on commit 4b58624

Please sign in to comment.