Skip to content

Commit

Permalink
topcat: ensure that ConeMatcher can be cancelled properly.
Browse files Browse the repository at this point in the history
Modify the ConeMatcher class returns an object that can be run and
interrupted rather than returning a TableProducer with an
unreferenced thread that the caller cannot access.  This should fix
it so that hitting Stop in the multi-cone window really does shut
down all threads associated with the multi-cone sequence.
However, the STILTS coneskymatch could still in principle leave
threads running if the task is interrupted but the JVM is not shut down.
  • Loading branch information
mbtaylor committed May 11, 2013
1 parent d7efbd7 commit 4c24c57
Show file tree
Hide file tree
Showing 5 changed files with 122 additions and 58 deletions.
2 changes: 1 addition & 1 deletion table/src/main/uk/ac/starlink/table/OnceRowPipe.java
Expand Up @@ -188,7 +188,7 @@ public synchronized void close() {
/**
* Throws an IOException if there is one pending.
*/
private void checkError() throws IOException {
private synchronized void checkError() throws IOException {
if ( error_ != null ) {
String msg = error_.getMessage();
if ( msg == null || msg.length() == 0 ) {
Expand Down
42 changes: 28 additions & 14 deletions topcat/src/main/uk/ac/starlink/topcat/join/DalMultiPanel.java
Expand Up @@ -11,7 +11,6 @@
import java.awt.event.ItemEvent;
import java.awt.event.ItemListener;
import java.io.IOException;
import java.io.InterruptedIOException;
import java.net.URL;
import java.net.MalformedURLException;
import java.util.ArrayList;
Expand Down Expand Up @@ -549,7 +548,7 @@ private boolean isActive( MatchWorker worker ) {
*/
private void setActive( MatchWorker worker ) {
if ( matchWorker_ != null && ! matchWorker_.done_ ) {
matchWorker_.interrupt();
matchWorker_.cancel();
}
matchWorker_ = worker;
updateState();
Expand Down Expand Up @@ -820,14 +819,12 @@ public ConeQueryRowSequence createQuerySequence( StarTable table )
long irow_ = -1;

public boolean next() throws IOException {
if ( matchWorker_ != null &&
matchWorker_.isInterrupted() ) {
throw new InterruptedIOException();
if ( matchWorker_ != null && matchWorker_.cancelled_ ) {
throw new IOException( "Cancelled" );
}
boolean retval = rseq.next();
if ( matchWorker_ != null &&
matchWorker_.isInterrupted() ) {
throw new InterruptedIOException();
if ( matchWorker_ != null && matchWorker_.cancelled_ ) {
throw new IOException( "Cancelled" );
}
if ( retval ) {
irow_++;
Expand Down Expand Up @@ -891,7 +888,9 @@ private class MatchWorker extends Thread {
private final StarTable inTable_;
private int inRow_;
private int outRow_;
private boolean done_;
private volatile Thread coneThread_;
private volatile boolean done_;
private volatile boolean cancelled_;

/**
* Constructor.
Expand All @@ -911,6 +910,17 @@ private class MatchWorker extends Thread {
inTable_ = inTable;
}

/**
* Terminates any activity (computations and queries)
* associated with this worker.
*/
public void cancel() {
cancelled_ = true;
if ( coneThread_ != null ) {
coneThread_.interrupt();
}
}

public void run() {

/* Initialise progress GUI. */
Expand All @@ -927,19 +937,23 @@ public void run() {
* when each row arrives the progress GUI is updated. */
matcher_.setStreamOutput( true );
try {
StarTable streamTable = matcher_.getTable();
ConeMatcher.ConeWorker coneWorker = matcher_.createConeWorker();
coneThread_ = new Thread( coneWorker, "Cone worker" );
coneThread_.setDaemon( true );
coneThread_.start();
StarTable streamTable = coneWorker.getTable();
StarTable progressTable = new WrapperStarTable( streamTable ) {
public RowSequence getRowSequence() throws IOException {
return new WrapperRowSequence( super
.getRowSequence() ) {
long irow_ = -1;
public boolean next() throws IOException {
if ( isInterrupted() ) {
throw new InterruptedIOException();
if ( cancelled_ ) {
throw new IOException( "Cancelled" );
}
boolean retval = super.next();
if ( isInterrupted() ) {
throw new InterruptedIOException();
if ( cancelled_ ) {
throw new IOException( "Cancelled" );
}
if ( retval ) {
irow_++;
Expand Down
95 changes: 63 additions & 32 deletions ttools/src/main/uk/ac/starlink/ttools/cone/ConeMatcher.java
Expand Up @@ -39,7 +39,7 @@
* @author Mark Taylor
* @since 31 Aug 2007
*/
public class ConeMatcher implements TableProducer {
public class ConeMatcher {

private final ConeSearcher coneSearcher_;
private final ConeErrorPolicy errAct_;
Expand Down Expand Up @@ -138,7 +138,7 @@ public ConeMatcher( ConeSearcher coneSearcher, ConeErrorPolicy errAct,
}

/**
* Determines whether this object's {@link #getTable} method will
* Determines whether this object's {@link #createConeWorker} method will
* produce a one-read-only table or not. If set true, then the output
* table is good for only a single read (<code>getRowSequence</code>
* may be called only once).
Expand All @@ -151,16 +151,19 @@ public void setStreamOutput( boolean streamOutput ) {
}

/**
* Returns the result, which is a join between the input table and
* Returns an object which can compute the multi-cone result.
* The result is a join between the input table and
* the table on which the cone searches are defined.
* See the <code>ConeWorker</code> documentation for how to use
* the returned object.
*
* <p><strong>Note</strong></p>: if the streamOut attribute of this
* class has been set the result will be a one-read-only table,
* designed for streaming.
* <p><strong>Note</strong></p>: if the <code>streamOut</code>
* attribute of this ConeMatcher has been set the table produced by
* the returned worker will be one-read-only, designed for streaming.
*
* @return joined table
* @return cone worker which can produce the result table
*/
public StarTable getTable() throws IOException, TaskException {
public ConeWorker createConeWorker() throws IOException, TaskException {
StarTable inTable = inProd_.getTable();
ConeQueryRowSequence querySeq = qsFact_.createQuerySequence( inTable );
if ( coverage_ != null ) {
Expand Down Expand Up @@ -204,18 +207,11 @@ public void close() throws IOException {
? new int[ 0 ]
: new ColumnIdentifier( inTable )
.getColumnIndices( copyColIdList_ );
RowPipe rowPipe = new OnceRowPipe();
Thread coneWorker =
new ConeWorker( rowPipe, inTable, resultSeq, iCopyCols,
includeBlanks_,
( distanceCol_ != null &&
distanceCol_.trim().length() > 0 ) ? 1 : 0,
inFixAct_, coneFixAct_, JoinFixAction.NO_ACTION );
coneWorker.setDaemon( true );
coneWorker.start();
StarTable streamTable = rowPipe.waitForStarTable();
return streamOutput_ ? streamTable
: Tables.randomTable( streamTable );
return new ConeWorker( inTable, resultSeq, iCopyCols, includeBlanks_,
( distanceCol_ != null &&
distanceCol_.trim().length() > 0 ) ? 1 : 0,
inFixAct_, coneFixAct_, JoinFixAction.NO_ACTION,
streamOutput_ );
}

/**
Expand Down Expand Up @@ -451,11 +447,24 @@ else if ( unitString.toLowerCase().startsWith( "rad" ) ) {
}

/**
* Thread which performs the individual cone search and writes the
* results down a pipe from which it will be read for output.
* Object which produces the result table.
* It performs the individual cone searches and writes the results
* down a pipe from which it will be read asynchronously for output.
*
* <p>To use an instance of this class, it is necessary to
* call its <code>run</code> method in a separate thread.
* The <code>getTable</code> method may be called before
* the <code>run</code> method has completed (or even started),
* and will return a table whose rows may be streamed.
*
* <p>The run method checks for interruptions, so interrupting the
* thread in which it runs will cause it to stop consuming resources.
*
* <p>This code was originally written for J2SE1.4.
* There may be less baroque ways of achieving the same effect using
* the J2SE5 java.util.concurrent classes (<code>BlockingQueue</code>).
*/
private static class ConeWorker extends Thread {
private final RowPipe rowPipe_;
public static class ConeWorker implements Runnable, TableProducer {
private final StarTable inTable_;
private final ConeResultRowSequence resultSeq_;
private final int[] iCopyCols_;
Expand All @@ -464,11 +473,12 @@ private static class ConeWorker extends Thread {
private final JoinFixAction inFixAct_;
private final JoinFixAction coneFixAct_;
private final JoinFixAction extrasFixAct_;
private final boolean strmOut_;
private final RowPipe rowPipe_;

/**
* Constructor.
*
* @param rowPipe row data pipe
* @param inTable input table
* @param resultSeq cone search result row sequence, positioned at
* the start of the data
Expand All @@ -484,14 +494,15 @@ private static class ConeWorker extends Thread {
* of cone searches
* @param extrasFixAct column name deduplication action for
* extra columns
* @param strmOut whether output is streamed
*/
ConeWorker( RowPipe rowPipe, StarTable inTable,
ConeResultRowSequence resultSeq, int[] iCopyCols,
boolean includeBlanks,
int extraCols, JoinFixAction inFixAct,
JoinFixAction coneFixAct, JoinFixAction extrasFixAct ) {
super( "Cone searcher" );
rowPipe_ = rowPipe;
private ConeWorker( StarTable inTable,
ConeResultRowSequence resultSeq, int[] iCopyCols,
boolean includeBlanks,
int extraCols, JoinFixAction inFixAct,
JoinFixAction coneFixAct,
JoinFixAction extrasFixAct,
boolean strmOut ) {
inTable_ = inTable;
resultSeq_ = resultSeq;
iCopyCols_ = iCopyCols;
Expand All @@ -500,8 +511,28 @@ private static class ConeWorker extends Thread {
inFixAct_ = inFixAct;
coneFixAct_ = coneFixAct;
extrasFixAct_ = extrasFixAct;
strmOut_ = strmOut;
rowPipe_ = new OnceRowPipe();
}

/**
* Returns the result table. It will block until <code>run</code>
* has at least started, but not necessarily until it has completed.
*
* @return result table
*/
public StarTable getTable() throws IOException {
StarTable streamTable = rowPipe_.waitForStarTable();
return strmOut_ ? streamTable
: Tables.randomTable( streamTable );
}

/**
* Does the work of feeding the rows to the result table.
* This method checks regularly for interruptions and will
* stop running if the thread is interrupted, causing a
* read error at the other end of the pipe.
*/
public void run() {
try {
multiCone();
Expand Down
14 changes: 12 additions & 2 deletions ttools/src/main/uk/ac/starlink/ttools/cone/SkyConeMatch2.java
@@ -1,11 +1,13 @@
package uk.ac.starlink.ttools.cone;

import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.logging.Logger;
import uk.ac.starlink.table.ColumnInfo;
import uk.ac.starlink.table.JoinFixAction;
import uk.ac.starlink.table.StarTable;
import uk.ac.starlink.table.join.PairMode;
import uk.ac.starlink.task.BooleanParameter;
import uk.ac.starlink.task.ChoiceParameter;
Expand Down Expand Up @@ -392,11 +394,19 @@ else if ( mode.toLowerCase().equals( "each" ) ) {
new JELQuerySequenceFactory( raString, decString, srString );

/* Return a table producer using these values. */
ConeMatcher coneMatcher =
final ConeMatcher coneMatcher =
new ConeMatcher( coneSearcher, erract, inProd, qsFact, bestOnly,
footprint, includeBlanks, distFilter, parallelism,
copyColIdList, distanceCol, inFixAct, coneFixAct );
coneMatcher.setStreamOutput( ostream );
return coneMatcher;
return new TableProducer() {
public StarTable getTable() throws IOException, TaskException {
ConeMatcher.ConeWorker worker = coneMatcher.createConeWorker();
Thread thread = new Thread( worker, "Cone Matcher" );
thread.setDaemon( true );
thread.start();
return worker.getTable();
}
};
}
}
Expand Up @@ -9,6 +9,7 @@
import uk.ac.starlink.table.StarTable;
import uk.ac.starlink.table.StoragePolicy;
import uk.ac.starlink.table.Tables;
import uk.ac.starlink.task.TaskException;
import uk.ac.starlink.ttools.TableTestCase;
import uk.ac.starlink.ttools.cone.Coverage;
import uk.ac.starlink.ttools.task.TableProducer;
Expand Down Expand Up @@ -65,7 +66,7 @@ public StarTable getTable() {
new JELQuerySequenceFactory( "RA + 0", "DEC", "0.5" ), true,
null, false, true, parallelism, "*", scoreCol,
JoinFixAction.NO_ACTION, JoinFixAction.NO_ACTION );
StarTable bestResult = Tables.randomTable( bestMatcher.getTable() );
StarTable bestResult = Tables.randomTable( getTable( bestMatcher ) );

assertEquals( messier.getRowCount(), bestResult.getRowCount() );
assertEquals( messier.getColumnCount() + 3 + ( addScore ? 1 : 0 ),
Expand All @@ -76,7 +77,7 @@ public StarTable getTable() {
new JELQuerySequenceFactory( "RA + 0", "DEC", "0.5" ), true,
null, true, true, parallelism, "*", scoreCol,
JoinFixAction.NO_ACTION, JoinFixAction.NO_ACTION );
StarTable eachResult = Tables.randomTable( eachMatcher.getTable() );
StarTable eachResult = Tables.randomTable( getTable( eachMatcher ) );

assertSameData( bestResult, eachResult );
assertEquals( messier.getRowCount(), eachResult.getRowCount() );
Expand All @@ -88,7 +89,7 @@ public StarTable getTable() {
false, null, false, true, parallelism, "RA DEC", scoreCol,
JoinFixAction.makeRenameDuplicatesAction( "_A" ),
JoinFixAction.makeRenameDuplicatesAction( "_B" ) );
StarTable allResult = Tables.randomTable( allMatcher.getTable() );
StarTable allResult = Tables.randomTable( getTable( allMatcher ) );

assertEquals( messier.getRowCount() * nIn, allResult.getRowCount() );
assertEquals( 2 + 3 + ( addScore ? 1 : 0 ),
Expand All @@ -106,8 +107,8 @@ public StarTable getTable() {
new JELQuerySequenceFactory( "RA + 0", "DEC", "0.5" ), true,
footSouth, false, true, parallelism, "*", scoreCol,
JoinFixAction.NO_ACTION, JoinFixAction.NO_ACTION );
StarTable footResultN = Tables.randomTable( footMatcherN.getTable() );
StarTable footResultS = Tables.randomTable( footMatcherS.getTable() );
StarTable footResultN = Tables.randomTable( getTable( footMatcherN ) );
StarTable footResultS = Tables.randomTable( getTable( footMatcherS ) );
long nrN = footResultN.getRowCount();
long nrS = footResultS.getRowCount();
assertTrue( nrN > 10 );
Expand All @@ -129,7 +130,7 @@ protected boolean isGap( int irow ) {
null, false, true, parallelism, "*", scoreCol,
JoinFixAction.NO_ACTION, JoinFixAction.NO_ACTION );
StarTable bestResult2 =
Tables.randomTable( bestMatcher2.getTable() );
Tables.randomTable( getTable( bestMatcher2 ) );
assertEquals( messier.getRowCount() / 2,
bestResult2.getRowCount() );

Expand All @@ -139,7 +140,7 @@ protected boolean isGap( int irow ) {
null, true, true, parallelism, "*", scoreCol,
JoinFixAction.NO_ACTION, JoinFixAction.NO_ACTION );
StarTable eachResult2 =
Tables.randomTable( eachMatcher2.getTable() );
Tables.randomTable( getTable( eachMatcher2 ) );
assertEquals( messier.getRowCount(), eachResult2.getRowCount() );

ConeMatcher allMatcher2 = new ConeMatcher(
Expand All @@ -150,7 +151,8 @@ protected boolean isGap( int irow ) {
false, null, false, true, parallelism, "RA DEC", scoreCol,
JoinFixAction.makeRenameDuplicatesAction( "_A" ),
JoinFixAction.makeRenameDuplicatesAction( "_B" ) );
StarTable allResult2 = Tables.randomTable( allMatcher2.getTable() );
StarTable allResult2 =
Tables.randomTable( getTable( allMatcher2 ) );
assertEquals( messier.getRowCount() * nIn / 2,
allResult2.getRowCount() );
}
Expand Down Expand Up @@ -192,7 +194,7 @@ public ConeQueryRowSequence createQuerySequence( StarTable table )
searcher, errAct, inProd, qsFact3, true, null, false, true,
parallelism, "", scoreCol,
JoinFixAction.NO_ACTION, JoinFixAction.NO_ACTION );
StarTable result3 = Tables.randomTable( matcher3.getTable() );
StarTable result3 = Tables.randomTable( getTable( matcher3 ) );
assertEquals( 3 + ( addScore ? 1 : 0 ), result3.getColumnCount() );
assertEquals( "ID", result3.getColumnInfo( 0 ).getName() );
assertEquals( "RA", result3.getColumnInfo( 1 ).getName() );
Expand All @@ -213,6 +215,13 @@ public ConeQueryRowSequence createQuerySequence( StarTable table )
return allResult;
}

private static StarTable getTable( ConeMatcher coneMatcher )
throws IOException, TaskException {
ConeMatcher.ConeWorker worker = coneMatcher.createConeWorker();
new Thread( worker ).run();
return worker.getTable();
}

/**
* Test coverage that covers a hemisphere at a time.
*/
Expand Down

0 comments on commit 4c24c57

Please sign in to comment.