Skip to content

Commit

Permalink
#1506 Experiment #3. Use single-thread thread pools for Dispatcher to…
Browse files Browse the repository at this point in the history
… support batch & process to overcome context switching consistency issue in Windows.
  • Loading branch information
Dennis Sheirer committed Mar 26, 2023
1 parent 2782953 commit 70338db
Show file tree
Hide file tree
Showing 2 changed files with 109 additions and 67 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -21,9 +21,7 @@

import io.github.dsheirer.sample.Listener;
import io.github.dsheirer.source.heartbeat.HeartbeatManager;
import io.github.dsheirer.util.ThreadPool;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

Expand Down Expand Up @@ -66,8 +64,8 @@ public void start()
{
if(mHeartbeatTimerFuture == null)
{
mHeartbeatTimerFuture = ThreadPool.SCHEDULED.scheduleAtFixedRate(() -> fireOptionalHeartbeat(),
mHeartbeatInterval, mHeartbeatInterval, TimeUnit.MILLISECONDS);
// mHeartbeatTimerFuture = ThreadPool.SCHEDULED.scheduleAtFixedRate(() -> fireOptionalHeartbeat(),
// mHeartbeatInterval, mHeartbeatInterval, TimeUnit.MILLISECONDS);
}
}

Expand Down
170 changes: 107 additions & 63 deletions src/main/java/io/github/dsheirer/util/Dispatcher.java
Original file line number Diff line number Diff line change
Expand Up @@ -18,8 +18,14 @@
*/
package io.github.dsheirer.util;

import io.github.dsheirer.controller.NamingThreadFactory;
import io.github.dsheirer.sample.Listener;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedTransferQueue;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import org.slf4j.Logger;
Expand All @@ -33,11 +39,12 @@ public class Dispatcher<E> implements Listener<E>
{
private final static Logger mLog = LoggerFactory.getLogger(Dispatcher.class);
private static final long OVERFLOW_LOG_EVENT_WAIT_PERIOD = TimeUnit.SECONDS.toMillis(10);
private LinkedBlockingQueue<E> mQueue;
private LinkedTransferQueue<E> mQueue;
private Listener<E> mListener;
private AtomicBoolean mRunning = new AtomicBoolean();
private String mThreadName;
private Thread mThread;
private ScheduledExecutorService mExecutorService;
private ScheduledFuture<?> mScheduledFuture;
private E mPoisonPill;
private long mLastOverflowLogEvent;

Expand All @@ -53,7 +60,7 @@ public Dispatcher(int maxSize, String threadName, E poisonPill)
{
throw new IllegalArgumentException("Poison pill must be non-null");
}
mQueue = new LinkedBlockingQueue<>(maxSize);
mQueue = new LinkedTransferQueue<>();
mThreadName = threadName;
mPoisonPill = poisonPill;
}
Expand Down Expand Up @@ -86,16 +93,16 @@ public void receive(E e)
{
if(mRunning.get())
{
if(!mQueue.offer(e))
{
if(System.currentTimeMillis() > (mLastOverflowLogEvent + OVERFLOW_LOG_EVENT_WAIT_PERIOD))
{
mLastOverflowLogEvent = System.currentTimeMillis();
mLog.warn("Dispatcher - temporary buffer overflow for thread [" + mThreadName + "] - throwing away samples - " +
" processor flag:" + (mRunning.get() ? "running" : "stopped") +
" thread:" + (mThread != null ? (mThread.isAlive() ? mThread.getState() : "dead") : "null" ));
}
}
mQueue.add(e);
// if(!mQueue.add(e))
// {
// if(System.currentTimeMillis() > (mLastOverflowLogEvent + OVERFLOW_LOG_EVENT_WAIT_PERIOD))
// {
// mLastOverflowLogEvent = System.currentTimeMillis();
// mLog.warn("Dispatcher - temporary buffer overflow for thread [" + mThreadName + "] - throwing away samples - " +
// " processor flag:" + (mRunning.get() ? "running" : "stopped"));
// }
// }
}
}

Expand All @@ -106,10 +113,24 @@ public void start()
{
if(mRunning.compareAndSet(false, true))
{
mThread = new Thread(new Processor());
mThread.setName(mThreadName);
mThread.setPriority(Thread.MAX_PRIORITY);
mThread.start();
if(mScheduledFuture != null)
{
mScheduledFuture.cancel(true);
}

if(mExecutorService != null)
{
mExecutorService.shutdown();
mExecutorService = null;
}

mQueue.clear();
mExecutorService = Executors.newSingleThreadScheduledExecutor(new NamingThreadFactory(mThreadName));
mScheduledFuture = mExecutorService.scheduleAtFixedRate(new Processor(), 0, 25, TimeUnit.MILLISECONDS);
// mThread = new Thread(new Processor());
// mThread.setName(mThreadName);
// mThread.setPriority(Thread.MAX_PRIORITY);
// mThread.start();
}
}

Expand All @@ -120,18 +141,30 @@ public void stop()
{
if(mRunning.compareAndSet(true, false))
{
mQueue.offer(mPoisonPill);
// mQueue.offer(mPoisonPill);

try
if(mScheduledFuture != null)
{
mThread.interrupt();
mThread.join();
mThread = null;
mScheduledFuture.cancel(true);
mScheduledFuture = null;
mQueue.clear();
}
catch(Exception e)

if(mExecutorService != null)
{
mLog.error("Timeout while waiting to join terminating buffer processor thread");
mExecutorService.shutdown();
mExecutorService = null;
}
// try
// {
// mThread.interrupt();
// mThread.join();
// mThread = null;
// }
// catch(Exception e)
// {
// mLog.error("Timeout while waiting to join terminating buffer processor thread");
// }
}
}

Expand Down Expand Up @@ -159,49 +192,60 @@ class Processor implements Runnable
@Override
public void run()
{
try
{
mQueue.clear();
List<E> elements = new ArrayList<>();

E element;
mQueue.drainTo(elements, 25);

while(mRunning.get())
for(E element: elements)
{
if(mRunning.get() && mListener != null)
{
try
{
element = mQueue.take();

if(mPoisonPill == element)
{
mRunning.set(false);
}
else if(element != null)
{
if(mListener == null)
{
throw new IllegalStateException("Listener for [" + mThreadName + "] is null");
}
mListener.receive(element);
}
}
catch(InterruptedException e)
{
//Normal shutdown is by interrupt
mRunning.set(false);
}
catch(Exception e)
{
mLog.error("Error while processing element", e);
}
mListener.receive(element);
}

//Shutting down - clear the queue
mQueue.clear();
}
catch(Throwable t)
{
mLog.error("Unexpected error thrown from the Dispatcher thread", t);
}
// try
// {
// mQueue.clear();
//
// E element;
//
// while(mRunning.get())
// {
// try
// {
// element = mQueue.take();
//
// if(mPoisonPill == element)
// {
// mRunning.set(false);
// }
// else if(element != null)
// {
// if(mListener == null)
// {
// throw new IllegalStateException("Listener for [" + mThreadName + "] is null");
// }
// mListener.receive(element);
// }
// }
// catch(InterruptedException e)
// {
// //Normal shutdown is by interrupt
// mRunning.set(false);
// }
// catch(Exception e)
// {
// mLog.error("Error while processing element", e);
// }
// }
//
// //Shutting down - clear the queue
// mQueue.clear();
// }
// catch(Throwable t)
// {
// mLog.error("Unexpected error thrown from the Dispatcher thread", t);
// }
}
}
}

0 comments on commit 70338db

Please sign in to comment.