Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Support specified naming UDP push port for client #5439

Merged
merged 2 commits into from
Apr 22, 2021
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
54 changes: 28 additions & 26 deletions api/src/main/java/com/alibaba/nacos/api/PropertyKeyConst.java
Original file line number Diff line number Diff line change
Expand Up @@ -22,63 +22,65 @@
* @author Nacos
*/
public class PropertyKeyConst {

public static final String IS_USE_CLOUD_NAMESPACE_PARSING = "isUseCloudNamespaceParsing";

public static final String IS_USE_ENDPOINT_PARSING_RULE = "isUseEndpointParsingRule";

public static final String ENDPOINT = "endpoint";

public static final String ENDPOINT_PORT = "endpointPort";

public static final String NAMESPACE = "namespace";

public static final String USERNAME = "username";

public static final String PASSWORD = "password";

public static final String ACCESS_KEY = "accessKey";

public static final String SECRET_KEY = "secretKey";

public static final String RAM_ROLE_NAME = "ramRoleName";

public static final String SERVER_ADDR = "serverAddr";

public static final String CONTEXT_PATH = "contextPath";

public static final String CLUSTER_NAME = "clusterName";

public static final String ENCODE = "encode";

public static final String CONFIG_LONG_POLL_TIMEOUT = "configLongPollTimeout";

public static final String CONFIG_RETRY_TIME = "configRetryTime";

public static final String MAX_RETRY = "maxRetry";

public static final String ENABLE_REMOTE_SYNC_CONFIG = "enableRemoteSyncConfig";

public static final String NAMING_LOAD_CACHE_AT_START = "namingLoadCacheAtStart";

public static final String NAMING_CLIENT_BEAT_THREAD_COUNT = "namingClientBeatThreadCount";

public static final String NAMING_POLLING_THREAD_COUNT = "namingPollingThreadCount";

public static final String NAMING_REQUEST_DOMAIN_RETRY_COUNT = "namingRequestDomainMaxRetryCount";

public static final String NAMING_PUSH_EMPTY_PROTECTION = "namingPushEmptyProtection";

public static final String PUSH_RECEIVER_UDP_PORT = "push.receiver.udp.port";

/**
* Get the key value of some variable value from the system property.
*/
public static class SystemEnv {

public static final String ALIBABA_ALIWARE_ENDPOINT_PORT = "ALIBABA_ALIWARE_ENDPOINT_PORT";

public static final String ALIBABA_ALIWARE_NAMESPACE = "ALIBABA_ALIWARE_NAMESPACE";

public static final String ALIBABA_ALIWARE_ENDPOINT_URL = "ALIBABA_ALIWARE_ENDPOINT_URL";
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@

package com.alibaba.nacos.client.naming.core;

import com.alibaba.nacos.api.PropertyKeyConst;
import com.alibaba.nacos.api.exception.NacosException;
import com.alibaba.nacos.common.lifecycle.Closeable;
import com.alibaba.nacos.common.utils.IoUtils;
Expand All @@ -25,6 +26,7 @@

import java.net.DatagramPacket;
import java.net.DatagramSocket;
import java.net.InetSocketAddress;
import java.nio.charset.Charset;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledThreadPoolExecutor;
Expand All @@ -38,23 +40,32 @@
* @author xuanyin
*/
public class PushReceiver implements Runnable, Closeable {

private static final Charset UTF_8 = Charset.forName("UTF-8");

private static final int UDP_MSS = 64 * 1024;

private ScheduledExecutorService executorService;

private DatagramSocket udpSocket;

private HostReactor hostReactor;

private volatile boolean closed = false;


public static String getPushReceiverUdpPort() {
return System.getenv(PropertyKeyConst.PUSH_RECEIVER_UDP_PORT);
}

public PushReceiver(HostReactor hostReactor) {
try {
this.hostReactor = hostReactor;
this.udpSocket = new DatagramSocket();
String udpPort = getPushReceiverUdpPort();
if (StringUtils.isEmpty(udpPort)) {
this.udpSocket = new DatagramSocket();
} else {
this.udpSocket = new DatagramSocket(new InetSocketAddress(Integer.parseInt(udpPort)));
}
this.executorService = new ScheduledThreadPoolExecutor(1, new ThreadFactory() {
@Override
public Thread newThread(Runnable r) {
Expand All @@ -64,32 +75,32 @@ public Thread newThread(Runnable r) {
return thread;
}
});

this.executorService.execute(this);
} catch (Exception e) {
NAMING_LOGGER.error("[NA] init udp socket failed", e);
}
}

@Override
public void run() {
while (!closed) {
try {

// byte[] is initialized with 0 full filled by default
byte[] buffer = new byte[UDP_MSS];
DatagramPacket packet = new DatagramPacket(buffer, buffer.length);

udpSocket.receive(packet);

String json = new String(IoUtils.tryDecompress(packet.getData()), UTF_8).trim();
NAMING_LOGGER.info("received push data: " + json + " from " + packet.getAddress().toString());

PushPacket pushPacket = JacksonUtils.toObj(json, PushPacket.class);
String ack;
if ("dom".equals(pushPacket.type) || "service".equals(pushPacket.type)) {
hostReactor.processServiceJson(pushPacket.data);

// send ack to server
ack = "{\"type\": \"push-ack\"" + ", \"lastRefTime\":\"" + pushPacket.lastRefTime + "\", \"data\":"
+ "\"\"}";
Expand All @@ -103,7 +114,7 @@ public void run() {
ack = "{\"type\": \"unknown-ack\"" + ", \"lastRefTime\":\"" + pushPacket.lastRefTime
+ "\", \"data\":" + "\"\"}";
}

udpSocket.send(new DatagramPacket(ack.getBytes(UTF_8), ack.getBytes(UTF_8).length,
packet.getSocketAddress()));
} catch (Exception e) {
Expand All @@ -114,7 +125,7 @@ public void run() {
}
}
}

@Override
public void shutdown() throws NacosException {
String className = this.getClass().getName();
Expand All @@ -124,16 +135,16 @@ public void shutdown() throws NacosException {
udpSocket.close();
NAMING_LOGGER.info("{} do shutdown stop", className);
}

public static class PushPacket {

public String type;

public long lastRefTime;

public String data;
}

public int getUdpPort() {
return this.udpSocket.getLocalPort();
}
Expand Down