Skip to content

Commit

Permalink
- Returning list of PingData elements when return_entire_cache is true (
Browse files Browse the repository at this point in the history
  • Loading branch information
belaban committed Jan 31, 2024
1 parent f4a70ce commit 40a615e
Show file tree
Hide file tree
Showing 5 changed files with 264 additions and 47 deletions.
118 changes: 90 additions & 28 deletions src/org/jgroups/protocols/Discovery.java
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;
import java.util.stream.Collectors;


/**
Expand Down Expand Up @@ -158,6 +159,8 @@ public void discoveryRequestReceived(Address sender, String logical_name, Physic
public boolean useDiskCache() {return use_disk_cache;}
public <T extends Discovery> T useDiskCache(boolean flag) {use_disk_cache=flag; return (T)this;}
public <T extends Discovery> T discoveryRspExpiryTime(long t) {this.discovery_rsp_expiry_time=t; return (T)this;}
public boolean sendCacheOnJoin() {return send_cache_on_join;}
public <T extends Discovery> T sendCacheOnJoin(boolean b) {this.send_cache_on_join=b; return (T)this;}



Expand Down Expand Up @@ -191,7 +194,6 @@ public void resetStats() {
num_discovery_requests=0;
}


public void addResponse(Responses rsp) {
synchronized(ping_responses) {
ping_responses.put(System.nanoTime(), rsp);
Expand Down Expand Up @@ -315,11 +317,8 @@ public void up(MessageBatch batch) {
}

protected Object handle(PingHeader hdr, Message msg) {
PingData data=readPingData(msg.getArray(), msg.getOffset(), msg.getLength());
Address logical_addr=data != null? data.getAddress() : msg.getSrc();

List<PingData> data=readPingData(msg.getArray(), msg.getOffset(), msg.getLength());
switch(hdr.type) {

case PingHeader.GET_MBRS_REQ: // return Rsp(local_addr, coord)
if(cluster_name == null || hdr.cluster_name == null) {
log.warn("cluster_name (%s) or cluster_name of header (%s) is null; passing up discovery " +
Expand All @@ -328,48 +327,53 @@ protected Object handle(PingHeader hdr, Message msg) {
else if(!cluster_name.equals(hdr.cluster_name)) {
log.warn("%s: discarding discovery request for cluster '%s' from %s; " +
"our cluster name is '%s'. Please separate your clusters properly",
logical_addr, hdr.cluster_name, msg.getSrc(), cluster_name);
msg.src(), hdr.cluster_name, msg.getSrc(), cluster_name);
return null;
}

// add physical address and logical name of the discovery sender (if available) to the cache
if(data != null) { // null if use_ip_addrs is true
addDiscoveryResponseToCaches(logical_addr, data.getLogicalName(), data.getPhysicalAddr());
discoveryRequestReceived(msg.getSrc(), data.getLogicalName(), data.getPhysicalAddr());
addResponse(data, false);
for(PingData pd: data) {
Address logical_addr=pd != null? pd.getAddress() : msg.getSrc();
addDiscoveryResponseToCaches(logical_addr, pd.getLogicalName(), pd.getPhysicalAddr());
discoveryRequestReceived(msg.getSrc(), pd.getLogicalName(), pd.getPhysicalAddr());
addResponse(pd, false);
}
}

if(return_entire_cache) {
Map<Address,PhysicalAddress> cache=(Map<Address,PhysicalAddress>)down(new Event(Event.GET_LOGICAL_PHYSICAL_MAPPINGS));
if(cache != null) {
List<PingData> rsps=new ArrayList<>();
for(Map.Entry<Address,PhysicalAddress> entry: cache.entrySet()) {
Address addr=entry.getKey();
// JGRP-1492: only return our own address, and addresses in view.
if(addr.equals(local_addr) || (view != null && view.containsMember(addr))) {
PhysicalAddress physical_addr=entry.getValue();
sendDiscoveryResponse(addr, physical_addr, NameCache.get(addr), msg.getSrc(), isCoord(addr));
String name=NameCache.get(addr);
rsps.add(new PingData(addr, is_server, name, physical_addr).coord(isCoord(addr)));
}
}
sendDiscoveryResponse(rsps, msg.src());
}
return null;
}

// Only send a response if hdr.mbrs is not empty and contains myself. Otherwise always send my info
Collection<? extends Address> mbrs=data != null? data.mbrs() : null;
boolean drop_because_of_rank=max_rank_to_reply > 0 && hdr.initialDiscovery() && Util.getRank(view, local_addr) > max_rank_to_reply;
if(drop_because_of_rank || (mbrs != null && !mbrs.contains(local_addr)))
if(max_rank_to_reply > 0 && hdr.initialDiscovery() && Util.getRank(view, local_addr) > max_rank_to_reply)
return null;
// Only send a response if hdr.mbrs is not empty and contains myself. Otherwise, always send my info
if(data.stream().anyMatch(pd -> pd.mbrs() != null && pd.mbrs().contains(local_addr)))
return null;
PhysicalAddress physical_addr=(PhysicalAddress)down(new Event(Event.GET_PHYSICAL_ADDRESS, local_addr));
sendDiscoveryResponse(local_addr, physical_addr, NameCache.get(local_addr), msg.getSrc(), is_coord);
PingData p=new PingData(local_addr, is_server, NameCache.get(local_addr), physical_addr).coord(is_coord);
sendDiscoveryResponse(List.of(p), msg.src());
return null;

case PingHeader.GET_MBRS_RSP:
// add physical address (if available) to transport's cache
if(data != null) {
// find the top discovery prot and deliver the response: https://issues.redhat.com/browse/JGRP-2230
Discovery d=findTopmostDiscoveryProtocol();
log.trace("%s: received GET_MBRS_RSP from %s: %s%s",
local_addr, msg.getSrc(), data,
log.trace("%s: received GET_MBRS_RSP from %s: %s%s", local_addr, msg.getSrc(), print(data),
d != this? ", delivering it to " + d.getClass().getSimpleName() : "");
d.handleDiscoveryResponse(data, msg.getSrc());
}
Expand Down Expand Up @@ -400,8 +404,18 @@ protected Discovery findTopmostDiscoveryProtocol() {
return ret;
}

protected void handleDiscoveryResponse(List<PingData> data, Address sender) {
// add physical address (if available) to transport's cache
if(data != null) {
for(PingData pd: data)
handleDiscoveryResponse(pd, sender);
}
}

protected void handleDiscoveryResponse(PingData data, Address sender) {
// add physical address (if available) to transport's cache
if(data == null)
return;
Address logical_addr=data.getAddress() != null? data.getAddress() : sender;
addDiscoveryResponseToCaches(logical_addr, data.getLogicalName(), data.getPhysicalAddr());
boolean overwrite=Objects.equals(logical_addr, sender);
Expand Down Expand Up @@ -596,33 +610,55 @@ protected synchronized void startCacheDissemination(List<Address> curr_mbrs, Lis
* @param data the PingData instance to serialize.
* @return
*/
protected byte[] serializeWithoutView(PingData data) {
final PingData clone = new PingData(data.getAddress(), data.isServer(), data.getLogicalName(), data.getPhysicalAddr()).coord(data.isCoord());
protected ByteArray serializeWithoutView(PingData data) {
final PingData clone=new PingData(data.getAddress(), data.isServer(), data.getLogicalName(), data.getPhysicalAddr())
.coord(data.isCoord());
try {
return Util.streamableToByteBuffer(clone);
return marshal(clone);
}
catch(Exception e) {
log.error(Util.getMessage("ErrorSerializingPingData"), e);
return null;
}
}

protected static PingData deserialize(final byte[] data) throws Exception {
return Util.streamableFromByteBuffer(PingData::new, data);
protected static List<PingData> deserialize(final byte[] data, int offset, int length) throws Exception {
ByteArrayDataInputStream in=new ByteArrayDataInputStream(data, offset, length);
int num=in.readInt();
if(num == 0)
return null;
List<PingData> list=new ArrayList<>(num);
for(int i=0; i < num; i++) {
PingData pd=new PingData();
pd.readFrom(in);
list.add(pd);
}
return list;
}

public static ByteArray marshal(PingData data) {
public static ByteArray marshal(PingData ... list) {
return marshal(List.of(list));
}

public static ByteArray marshal(List<PingData> list) {
try {
return Util.streamableToBuffer(data);
int size=Integer.BYTES + list.stream().mapToInt(PingData::serializedSize).sum();
ByteArrayDataOutputStream out=new ByteArrayDataOutputStream(size);
out.writeInt(list.size());
for(PingData data: list)
data.writeTo(out);
return out.getBuffer();
}
catch(Exception e) {
return null;
}
}

protected PingData readPingData(byte[] buffer, int offset, int length) {
protected List<PingData> readPingData(byte[] buffer, int offset, int length) {
try {
return buffer != null? Util.streamableFromBuffer(PingData::new, buffer, offset, length) : null;
if(buffer == null)
return null;
return deserialize(buffer, offset, length);
}
catch(Exception ex) {
log.error("%s: failed reading PingData from message: %s", local_addr, ex);
Expand Down Expand Up @@ -652,6 +688,32 @@ protected void sendDiscoveryResponse(Address logical_addr, PhysicalAddress physi
down_prot.down(rsp_msg);
}

protected void sendDiscoveryResponse(List<PingData> list, final Address sender) {
Message rsp_msg=new BytesMessage(sender).setFlag(Message.Flag.OOB, Message.Flag.DONT_BUNDLE)
.putHeader(this.id, new PingHeader(PingHeader.GET_MBRS_RSP)).setArray(marshal(list));

if(stagger_timeout > 0) {
int view_size=view != null? view.size() : 10;
int rank=Util.getRank(view, local_addr); // returns 0 if view or local_addr are null
long sleep_time=rank == 0? Util.random(stagger_timeout)
: stagger_timeout * rank / view_size - (stagger_timeout / view_size);
timer.schedule(() -> {
log.trace("%s: received GET_MBRS_REQ from %s, sending staggered response %s", local_addr, sender, print(list));
down_prot.down(rsp_msg);
}, sleep_time, TimeUnit.MILLISECONDS, sends_can_block);
return;
}

log.trace("%s: received GET_MBRS_REQ from %s, sending response %s", local_addr, sender, print(list));
down_prot.down(rsp_msg);
}

protected static String print(List<PingData> list) {
if(list == null)
return "null";
return list.stream().map(PingData::toString).collect(Collectors.joining("\n"));
}

protected static String addressAsString(Address address) {
if(address == null)
return "";
Expand Down Expand Up @@ -689,7 +751,7 @@ protected void disseminateDiscoveryInformation(List<Address> current_mbrs, List<

// 2. Send information about new_mbrs to <everyone - self - left_mbrs - new_mbrs>
Set<Address> targets=new HashSet<>(current_mbrs);
targets.removeAll(new_mbrs);
new_mbrs.forEach(targets::remove);

if(!targets.isEmpty()) {
for(Address addr : new_mbrs) {
Expand Down
31 changes: 20 additions & 11 deletions src/org/jgroups/protocols/JDBC_PING.java
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@

import org.jgroups.Address;
import org.jgroups.annotations.Property;
import org.jgroups.util.ByteArray;
import org.jgroups.util.Responses;
import org.jgroups.util.Util;

Expand Down Expand Up @@ -218,13 +219,17 @@ protected void readAll(Connection connection, List<Address> members, String clus
while(resultSet.next()) {
byte[] bytes=resultSet.getBytes(1);
try {
PingData data=deserialize(bytes);
reads++;
if(data == null || (members != null && !members.contains(data.getAddress())))
continue;
rsps.addResponse(data, false);
if(local_addr != null && !local_addr.equals(data.getAddress()))
addDiscoveryResponseToCaches(data.getAddress(), data.getLogicalName(), data.getPhysicalAddr());
List<PingData> list=readPingData(bytes, 0, bytes.length);
if(list != null) {
for(PingData data: list) {
reads++;
if(data == null || (members != null && !members.contains(data.getAddress())))
continue;
rsps.addResponse(data, false);
if(local_addr != null && !local_addr.equals(data.getAddress()))
addDiscoveryResponseToCaches(data.getAddress(), data.getLogicalName(), data.getPhysicalAddr());
}
}
}
catch(Exception e) {
int row=resultSet.getRow();
Expand Down Expand Up @@ -305,11 +310,11 @@ protected Connection getConnection() {


protected synchronized void insert(Connection connection, PingData data, String clustername, String address) throws SQLException {
final byte[] serializedPingData = serializeWithoutView(data);
final ByteArray serializedPingData = serializeWithoutView(data);
try (PreparedStatement ps=connection.prepareStatement(insert_single_sql)) {
ps.setString(1, address);
ps.setString(2, clustername);
ps.setBytes(3, serializedPingData);
ps.setBytes(3, serializedPingData.getBytes());
if(log.isTraceEnabled())
log.trace("%s: SQL for insertion: %s", local_addr, ps);
ps.executeUpdate();
Expand Down Expand Up @@ -492,8 +497,12 @@ public static void main(String[] args) throws ClassNotFoundException {
while(resultSet.next()) {
byte[] bytes=resultSet.getBytes(1);
try {
PingData data=deserialize(bytes);
System.out.printf("%d %s\n", index++, data);
List<PingData> list=deserialize(bytes, 0, bytes.length);
if(list != null) {
for(PingData data: list) {
System.out.printf("%d %s\n", index++, data);
}
}
}
catch(Exception e) {
}
Expand Down
10 changes: 3 additions & 7 deletions src/org/jgroups/protocols/PingData.java
Original file line number Diff line number Diff line change
Expand Up @@ -78,13 +78,9 @@ public boolean isServer() {
return Util.isFlagSet(flags, is_server) || Util.isFlagSet(flags, is_coord); // a coord is always a server
}

public Address getAddress() {
return sender;
}

public String getLogicalName() {
return logical_name;
}
public Address getSender() {return sender;}
public Address getAddress() {return sender;}
public String getLogicalName() {return logical_name;}

public PhysicalAddress getPhysicalAddr() {return physical_addr;}
public PingData mbrs(Collection<? extends Address> mbrs) {this.mbrs=mbrs; return this;}
Expand Down
2 changes: 1 addition & 1 deletion src/org/jgroups/protocols/RED.java
Original file line number Diff line number Diff line change
Expand Up @@ -94,7 +94,7 @@ public void resetStats() {
}

public Object down(Message msg) {
if(enabled) {
if(enabled && bundler != null) {
int current_queue_size=bundler.getQueueSize();
double avg;
lock.lock();
Expand Down

0 comments on commit 40a615e

Please sign in to comment.