Skip to content

Commit

Permalink
ns
Browse files Browse the repository at this point in the history
  • Loading branch information
belaban committed Oct 24, 2016
1 parent 2f2641d commit 3173bc7
Show file tree
Hide file tree
Showing 3 changed files with 56 additions and 44 deletions.
85 changes: 48 additions & 37 deletions src/org/jgroups/protocols/TP.java
Expand Up @@ -236,10 +236,11 @@ public String getBundlerClass() {
return bundler != null? bundler.getClass().getName() : "null"; return bundler != null? bundler.getClass().getName() : "null";
} }


public void setMaxBundleSize(int size) { public TP setMaxBundleSize(int size) {
if(size <= 0) if(size <= 0)
throw new IllegalArgumentException("max_bundle_size (" + size + ") is <= 0"); throw new IllegalArgumentException("max_bundle_size (" + size + ") is <= 0");
max_bundle_size=size; max_bundle_size=size;
return this;
} }
public final int getMaxBundleSize() {return max_bundle_size;} public final int getMaxBundleSize() {return max_bundle_size;}
public int getBundlerCapacity() {return bundler_capacity;} public int getBundlerCapacity() {return bundler_capacity;}
Expand All @@ -263,13 +264,14 @@ public String bundlerWaitStrategy() {


@ManagedAttribute(description="Sets the wait strategy in the RingBufferBundler. Allowed values are \"spin\", " + @ManagedAttribute(description="Sets the wait strategy in the RingBufferBundler. Allowed values are \"spin\", " +
"\"yield\", \"park\", \"spin-park\" and \"spin-yield\" or a fully qualified classname") "\"yield\", \"park\", \"spin-park\" and \"spin-yield\" or a fully qualified classname")
public void bundlerWaitStrategy(String strategy) { public TP bundlerWaitStrategy(String strategy) {
if(bundler instanceof RingBufferBundler) { if(bundler instanceof RingBufferBundler) {
((RingBufferBundler)bundler).waitStrategy(strategy); ((RingBufferBundler)bundler).waitStrategy(strategy);
this.bundler_wait_strategy=strategy; this.bundler_wait_strategy=strategy;
} }
else else
this.bundler_wait_strategy=strategy; this.bundler_wait_strategy=strategy;
return this;
} }


@ManagedAttribute(description="Number of spins before a real lock is acquired") @ManagedAttribute(description="Number of spins before a real lock is acquired")
Expand All @@ -278,10 +280,11 @@ public int bundlerNumSpins() {
} }


@ManagedAttribute(description="Sets the number of times a thread spins until a real lock is acquired") @ManagedAttribute(description="Sets the number of times a thread spins until a real lock is acquired")
public void bundlerNumSpins(int spins) { public TP bundlerNumSpins(int spins) {
this.bundler_num_spins=spins; this.bundler_num_spins=spins;
if(bundler instanceof RingBufferBundler) if(bundler instanceof RingBufferBundler)
((RingBufferBundler)bundler).numSpins(spins); ((RingBufferBundler)bundler).numSpins(spins);
return this;
} }


@ManagedAttribute(description="Is the logical_addr_cache reaper task running") @ManagedAttribute(description="Is the logical_addr_cache reaper task running")
Expand All @@ -295,28 +298,31 @@ public double getAvgBatchSize() {
} }




public void setThreadPoolMinThreads(int size) { public TP setThreadPoolMinThreads(int size) {
thread_pool_min_threads=size; thread_pool_min_threads=size;
if(thread_pool instanceof ThreadPoolExecutor) if(thread_pool instanceof ThreadPoolExecutor)
((ThreadPoolExecutor)thread_pool).setCorePoolSize(size); ((ThreadPoolExecutor)thread_pool).setCorePoolSize(size);
return this;
} }


public int getThreadPoolMinThreads() {return thread_pool_min_threads;} public int getThreadPoolMinThreads() {return thread_pool_min_threads;}




public void setThreadPoolMaxThreads(int size) { public TP setThreadPoolMaxThreads(int size) {
thread_pool_max_threads=size; thread_pool_max_threads=size;
if(thread_pool instanceof ThreadPoolExecutor) if(thread_pool instanceof ThreadPoolExecutor)
((ThreadPoolExecutor)thread_pool).setMaximumPoolSize(size); ((ThreadPoolExecutor)thread_pool).setMaximumPoolSize(size);
return this;
} }


public int getThreadPoolMaxThreads() {return thread_pool_max_threads;} public int getThreadPoolMaxThreads() {return thread_pool_max_threads;}




public void setThreadPoolKeepAliveTime(long time) { public TP setThreadPoolKeepAliveTime(long time) {
thread_pool_keep_alive_time=time; thread_pool_keep_alive_time=time;
if(thread_pool instanceof ThreadPoolExecutor) if(thread_pool instanceof ThreadPoolExecutor)
((ThreadPoolExecutor)thread_pool).setKeepAliveTime(time, TimeUnit.MILLISECONDS); ((ThreadPoolExecutor)thread_pool).setKeepAliveTime(time, TimeUnit.MILLISECONDS);
return this;
} }


public long getThreadPoolKeepAliveTime() {return thread_pool_keep_alive_time;} public long getThreadPoolKeepAliveTime() {return thread_pool_keep_alive_time;}
Expand Down Expand Up @@ -365,15 +371,17 @@ public int getDifferentVersionMessages() {
} }


@ManagedOperation(description="Clears the cache for messages from different clusters") @ManagedOperation(description="Clears the cache for messages from different clusters")
public void clearDifferentClusterCache() { public TP clearDifferentClusterCache() {
if(suppress_log_different_cluster != null) if(suppress_log_different_cluster != null)
suppress_log_different_cluster.getCache().clear(); suppress_log_different_cluster.getCache().clear();
return this;
} }


@ManagedOperation(description="Clears the cache for messages from members with different versions") @ManagedOperation(description="Clears the cache for messages from members with different versions")
public void clearDifferentVersionCache() { public TP clearDifferentVersionCache() {
if(suppress_log_different_version != null) if(suppress_log_different_version != null)
suppress_log_different_version.getCache().clear(); suppress_log_different_version.getCache().clear();
return this;
} }




Expand All @@ -384,11 +392,12 @@ public void clearDifferentVersionCache() {
@ManagedOperation(description="If enabled, the timer will run non-blocking tasks on its own (runner) thread, and " + @ManagedOperation(description="If enabled, the timer will run non-blocking tasks on its own (runner) thread, and " +
"not submit them to the thread pool. Otherwise, all tasks are submitted to the thread pool. This attribute is " + "not submit them to the thread pool. Otherwise, all tasks are submitted to the thread pool. This attribute is " +
"experimental and may be removed without notice.") "experimental and may be removed without notice.")
public void enableBlockingTimerTasks(boolean flag) { public TP enableBlockingTimerTasks(boolean flag) {
if(flag != this.timer_handle_non_blocking_tasks) { if(flag != this.timer_handle_non_blocking_tasks) {
this.timer_handle_non_blocking_tasks=flag; this.timer_handle_non_blocking_tasks=flag;
timer.setNonBlockingTaskHandling(flag); timer.setNonBlockingTaskHandling(flag);
} }
return this;
} }




Expand Down Expand Up @@ -512,62 +521,68 @@ public void resetStats() {
avg_batch_size.clear(); avg_batch_size.clear();
} }


public void registerProbeHandler(DiagnosticsHandler.ProbeHandler handler) { public TP registerProbeHandler(DiagnosticsHandler.ProbeHandler handler) {
if(diag_handler != null) if(diag_handler != null)
diag_handler.registerProbeHandler(handler); diag_handler.registerProbeHandler(handler);
else { else {
synchronized(preregistered_probe_handlers) { synchronized(preregistered_probe_handlers) {
preregistered_probe_handlers.add(handler); preregistered_probe_handlers.add(handler);
} }
} }
return this;
} }


public void unregisterProbeHandler(DiagnosticsHandler.ProbeHandler handler) { public TP unregisterProbeHandler(DiagnosticsHandler.ProbeHandler handler) {
if(diag_handler != null) if(diag_handler != null)
diag_handler.unregisterProbeHandler(handler); diag_handler.unregisterProbeHandler(handler);
return this;
} }


/** /**
* Sets a {@link DiagnosticsHandler}. Should be set before the stack is started * Sets a {@link DiagnosticsHandler}. Should be set before the stack is started
* @param handler * @param handler
*/ */
public void setDiagnosticsHandler(DiagnosticsHandler handler) { public TP setDiagnosticsHandler(DiagnosticsHandler handler) {
if(handler != null) { if(handler != null) {
if(diag_handler != null) if(diag_handler != null)
diag_handler.stop(); diag_handler.stop();
diag_handler=handler; diag_handler=handler;
} }
return this;
} }


public Bundler getBundler() {return bundler;} public Bundler getBundler() {return bundler;}


/** Installs a bundler. Needs to be done before the channel is connected */ /** Installs a bundler. Needs to be done before the channel is connected */
public void setBundler(Bundler bundler) { public TP setBundler(Bundler bundler) {
if(bundler != null) if(bundler != null)
this.bundler=bundler; this.bundler=bundler;
return this;
} }




public Executor getThreadPool() { public Executor getThreadPool() {
return thread_pool; return thread_pool;
} }


public void setThreadPool(Executor thread_pool) { public TP setThreadPool(Executor thread_pool) {
if(this.thread_pool != null) if(this.thread_pool != null)
shutdownThreadPool(this.thread_pool); shutdownThreadPool(this.thread_pool);
this.thread_pool=thread_pool; this.thread_pool=thread_pool;
if(timer instanceof TimeScheduler3) if(timer instanceof TimeScheduler3)
((TimeScheduler3)timer).setThreadPool(thread_pool); ((TimeScheduler3)timer).setThreadPool(thread_pool);
return this;
} }


public ThreadFactory getThreadPoolThreadFactory() { public ThreadFactory getThreadPoolThreadFactory() {
return thread_factory; return thread_factory;
} }


public void setThreadPoolThreadFactory(ThreadFactory factory) { public TP setThreadPoolThreadFactory(ThreadFactory factory) {
thread_factory=factory; thread_factory=factory;
if(thread_pool instanceof ThreadPoolExecutor) if(thread_pool instanceof ThreadPoolExecutor)
((ThreadPoolExecutor)thread_pool).setThreadFactory(factory); ((ThreadPoolExecutor)thread_pool).setThreadFactory(factory);
return this;
} }


public TimeScheduler getTimer() {return timer;} public TimeScheduler getTimer() {return timer;}
Expand All @@ -577,28 +592,25 @@ public void setThreadPoolThreadFactory(ThreadFactory factory) {
* running timer with tasks in it can wreak havoc ! * running timer with tasks in it can wreak havoc !
* @param timer * @param timer
*/ */
public void setTimer(TimeScheduler timer) { public TP setTimer(TimeScheduler timer) {this.timer=timer; return this;}
this.timer=timer;
}


public TimeService getTimeService() {return time_service;} public TimeService getTimeService() {return time_service;}


public void setTimeService(TimeService ts) { public TP setTimeService(TimeService ts) {
if(ts == null) if(ts == null)
return; return this;
if(time_service != null) if(time_service != null)
time_service.stop(); time_service.stop();
time_service=ts; time_service=ts;
time_service.start(); time_service.start();
return this;
} }


public ThreadFactory getThreadFactory() { public ThreadFactory getThreadFactory() {
return thread_factory; return thread_factory;
} }


public void setThreadFactory(ThreadFactory factory) { public TP setThreadFactory(ThreadFactory factory) {thread_factory=factory; return this;}
thread_factory=factory;
}


public SocketFactory getSocketFactory() { public SocketFactory getSocketFactory() {
return socket_factory; return socket_factory;
Expand All @@ -618,19 +630,18 @@ public void setSocketFactory(SocketFactory factory) {
public String getThreadNamingPattern() {return thread_naming_pattern;} public String getThreadNamingPattern() {return thread_naming_pattern;}




public long getNumMessagesSent() {return msg_stats.getNumMsgsSent();} public long getNumMessagesSent() {return msg_stats.getNumMsgsSent();}
public void incrBatchesSent(int delta) {if(stats) msg_stats.incrNumBatchesSent(delta);} public TP incrBatchesSent(int delta) {if(stats) msg_stats.incrNumBatchesSent(delta); return this;}
public void incrNumSingleMsgsSent(int d) {if(stats) msg_stats.incrNumSingleMsgsSent(d);} public TP incrNumSingleMsgsSent(int d) {if(stats) msg_stats.incrNumSingleMsgsSent(d); return this;}
public InetAddress getBindAddress() {return bind_addr;} public InetAddress getBindAddress() {return bind_addr;}
public void setBindAddress(InetAddress bind_addr) {this.bind_addr=bind_addr;} public TP setBindAddress(InetAddress bind_addr) {this.bind_addr=bind_addr; return this;}
public int getBindPort() {return bind_port;} public int getBindPort() {return bind_port;}
public void setBindPort(int port) {this.bind_port=port;} public TP setBindPort(int port) {this.bind_port=port; return this;}
public void setBindToAllInterfaces(boolean flag) {this.receive_on_all_interfaces=flag;} public TP setBindToAllInterfaces(boolean flag) {this.receive_on_all_interfaces=flag; return this;}

public boolean isReceiveOnAllInterfaces() {return receive_on_all_interfaces;}
public boolean isReceiveOnAllInterfaces() {return receive_on_all_interfaces;}
public List<NetworkInterface> getReceiveInterfaces() {return receive_interfaces;} public List<NetworkInterface> getReceiveInterfaces() {return receive_interfaces;}
public void setPortRange(int range) {this.port_range=range;} public TP setPortRange(int range) {this.port_range=range; return this;}
public int getPortRange() {return port_range ;} public int getPortRange() {return port_range;}






Expand Down Expand Up @@ -697,9 +708,9 @@ public static int getNumThreads() {
@ManagedAttribute(description="Whether the diagnostics handler is running or not") @ManagedAttribute(description="Whether the diagnostics handler is running or not")
public boolean isDiagnosticsHandlerRunning() {return diag_handler != null && diag_handler.isRunning();} public boolean isDiagnosticsHandlerRunning() {return diag_handler != null && diag_handler.isRunning();}


public void setLogDiscardMessages(boolean flag) {log_discard_msgs=flag;} public TP setLogDiscardMessages(boolean flag) {log_discard_msgs=flag; return this;}
public boolean getLogDiscardMessages() {return log_discard_msgs;} public boolean getLogDiscardMessages() {return log_discard_msgs;}
public void setLogDiscardMessagesVersion(boolean flag) {log_discard_msgs_version=flag;} public TP setLogDiscardMessagesVersion(boolean flag) {log_discard_msgs_version=flag; return this;}
public boolean getLogDiscardMessagesVersion() {return log_discard_msgs_version;} public boolean getLogDiscardMessagesVersion() {return log_discard_msgs_version;}
public boolean getUseIpAddresses() {return use_ip_addrs;} public boolean getUseIpAddresses() {return use_ip_addrs;}


Expand Down
3 changes: 2 additions & 1 deletion src/org/jgroups/protocols/UDP.java
Expand Up @@ -165,11 +165,12 @@ public int getMulticastTTL() {
return ip_ttl; return ip_ttl;
} }


public void setMaxBundleSize(int size) { public UDP setMaxBundleSize(int size) {
super.setMaxBundleSize(size); super.setMaxBundleSize(size);
if(size > Global.MAX_DATAGRAM_PACKET_SIZE) if(size > Global.MAX_DATAGRAM_PACKET_SIZE)
throw new IllegalArgumentException("max_bundle_size (" + size + ") cannot exceed the max datagram " + throw new IllegalArgumentException("max_bundle_size (" + size + ") cannot exceed the max datagram " +
"packet size of " + Global.MAX_DATAGRAM_PACKET_SIZE); "packet size of " + Global.MAX_DATAGRAM_PACKET_SIZE);
return this;
} }


@ManagedAttribute(description="Number of messages dropped when sending because of insufficient buffer space") @ManagedAttribute(description="Number of messages dropped when sending because of insufficient buffer space")
Expand Down
12 changes: 6 additions & 6 deletions src/org/jgroups/protocols/pbcast/NAKACK2.java
Expand Up @@ -242,12 +242,12 @@ public void setResendLastSeqno(boolean flag) {
public boolean isXmitFromRandomMember() {return xmit_from_random_member;} public boolean isXmitFromRandomMember() {return xmit_from_random_member;}
public boolean isDiscardDeliveredMsgs() {return discard_delivered_msgs;} public boolean isDiscardDeliveredMsgs() {return discard_delivered_msgs;}
public boolean getLogDiscardMessages() {return log_discard_msgs;} public boolean getLogDiscardMessages() {return log_discard_msgs;}
public void setUseMcastXmit(boolean use_mcast_xmit) {this.use_mcast_xmit=use_mcast_xmit;} public NAKACK2 setUseMcastXmit(boolean use_mcast_xmit) {this.use_mcast_xmit=use_mcast_xmit; return this;}
public void setUseMcastXmitReq(boolean flag) {this.use_mcast_xmit_req=flag;} public NAKACK2 setUseMcastXmitReq(boolean flag) {this.use_mcast_xmit_req=flag; return this;}
public void setLogDiscardMessages(boolean flag) {log_discard_msgs=flag;} public NAKACK2 setLogDiscardMessages(boolean flag) {log_discard_msgs=flag; return this;}
public void setLogNotFoundMessages(boolean flag) {log_not_found_msgs=flag;} public NAKACK2 setLogNotFoundMessages(boolean flag) {log_not_found_msgs=flag; return this;}
public void setXmitFromRandomMember(boolean xmit_from_random_member) { public NAKACK2 setXmitFromRandomMember(boolean xmit_from_random_member) {
this.xmit_from_random_member=xmit_from_random_member; this.xmit_from_random_member=xmit_from_random_member; return this;
} }
public void setDiscardDeliveredMsgs(boolean discard_delivered_msgs) { public void setDiscardDeliveredMsgs(boolean discard_delivered_msgs) {
this.discard_delivered_msgs=discard_delivered_msgs; this.discard_delivered_msgs=discard_delivered_msgs;
Expand Down

0 comments on commit 3173bc7

Please sign in to comment.