-
Notifications
You must be signed in to change notification settings - Fork 23
/
CdsUploadSkyMatch.java
335 lines (314 loc) · 14.7 KB
/
CdsUploadSkyMatch.java
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
package uk.ac.starlink.ttools.task;
import java.io.IOException;
import java.net.URL;
import java.util.ArrayList;
import java.util.List;
import java.util.logging.Level;
import java.util.logging.Logger;
import uk.ac.starlink.table.JoinFixAction;
import uk.ac.starlink.table.StarTable;
import uk.ac.starlink.table.StoragePolicy;
import uk.ac.starlink.table.Tables;
import uk.ac.starlink.task.BooleanParameter;
import uk.ac.starlink.task.ChoiceParameter;
import uk.ac.starlink.task.DoubleParameter;
import uk.ac.starlink.task.Environment;
import uk.ac.starlink.task.IntegerParameter;
import uk.ac.starlink.task.Parameter;
import uk.ac.starlink.task.ParameterValueException;
import uk.ac.starlink.task.TaskException;
import uk.ac.starlink.task.URLParameter;
import uk.ac.starlink.ttools.cone.BlockUploader;
import uk.ac.starlink.ttools.cone.CdsUploadMatcher;
import uk.ac.starlink.ttools.cone.Coverage;
import uk.ac.starlink.ttools.cone.CoverageQuerySequenceFactory;
import uk.ac.starlink.ttools.cone.HealpixSortedQuerySequenceFactory;
import uk.ac.starlink.ttools.cone.JELQuerySequenceFactory;
import uk.ac.starlink.ttools.cone.QuerySequenceFactory;
import uk.ac.starlink.ttools.cone.ServiceFindMode;
import uk.ac.starlink.ttools.cone.UploadMatcher;
import uk.ac.starlink.ttools.cone.UrlMocCoverage;
/**
 * Upload matcher that uses CDS's Xmatch service.
 *
 * <p>The task reads sky positions (RA, Dec) from a local input table,
 * uploads them in blocks to the CDS Xmatch service, and produces a table
 * of the matches against a nominated VizieR/SIMBAD table.
 * Optionally the input rows are pre-filtered using a MOC coverage map
 * and/or pre-sorted by HEALPix index before upload.
 *
 * @author   Mark Taylor
 * @since    14 May 2014
 */
public class CdsUploadSkyMatch extends SingleMapperTask {

    /** Expression giving Right Ascension in the input table. */
    private final Parameter raParam_;

    /** Expression giving Declination in the input table. */
    private final Parameter decParam_;

    /** Search radius in arcseconds. */
    private final DoubleParameter srParam_;

    /** Identifier of the remote (VizieR/SIMBAD) table. */
    private final Parameter cdstableParam_;

    /** Which pair matches are included in the result. */
    private final ChoiceParameter<UserFindMode> findParam_;

    /** Maximum number of rows uploaded per request. */
    private final IntegerParameter chunkParam_;

    /** Maximum number of output rows; negative means no limit. */
    private final IntegerParameter maxrecParam_;

    /** URL of the Xmatch service. */
    private final URLParameter urlParam_;

    /** Whether to pre-filter input rows using a MOC coverage map. */
    private final BooleanParameter usemocParam_;

    /** Whether to sort input rows by HEALPix index before upload. */
    private final BooleanParameter presortParam_;

    /** Policy for renaming duplicated column names. */
    private final JoinFixActionParameter fixcolsParam_;

    /** Suffix applied to renamed input table columns. */
    private final Parameter insuffixParam_;

    /** Suffix applied to renamed remote table columns. */
    private final Parameter cdssuffixParam_;

    private static final Logger logger_ =
        Logger.getLogger( "uk.ac.starlink.ttools.task" );

    /**
     * Constructor.
     * Creates all the task parameters and appends them to this task's
     * parameter list.
     */
    public CdsUploadSkyMatch() {
        super( "Crossmatches table on sky position against VizieR/SIMBAD table",
               new ChoiceMode(), true, true );
        List<Parameter> paramList = new ArrayList<Parameter>();

        /* Sky position parameters. */
        String system = "ICRS";
        String inDescrip = "the input table";
        raParam_ =
            SkyCoordParameter.createRaParameter( "ra", system, inDescrip );
        paramList.add( raParam_ );
        decParam_ =
            SkyCoordParameter.createDecParameter( "dec", system, inDescrip );
        paramList.add( decParam_ );

        /* Search radius.  The documented range matches the limits
         * set by setMinimum/setMaximum below. */
        srParam_ = new DoubleParameter( "sr" );
        srParam_.setPrompt( "Search radius value in arcsec (0-180)" );
        srParam_.setDescription( new String[] {
            "<p>Maximum distance from the local table (ra,dec) position",
            "at which counterparts from the remote table will be identified.",
            "This is a fixed value given in arcseconds,",
            "and must be in the range 0&lt;=<code>sr</code>&lt;=180",
            "(this limit is currently enforced by the CDS Xmatch service).",
            "</p>",
        } );
        srParam_.setMinimum( 0, true );
        srParam_.setMaximum( 180, true );
        paramList.add( srParam_ );

        /* Remote table identifier. */
        cdstableParam_ = new Parameter( "cdstable" );
        cdstableParam_.setPrompt( "Identifier for remote table" );
        cdstableParam_.setDescription( new String[] {
            "<p>Identifier of the table from the CDS crossmatch service",
            "that is to be matched against the local table.",
            "This identifier may be the standard VizieR identifier",
            "(e.g. \"<code>II/246/out</code>\"",
            "for the 2MASS Point Source Catalogue)",
            "or \"<code>simbad</code>\" to indicate SIMBAD data.",
            "</p>",
            "<p>See for instance",
            "<code>http://vizier.u-strasbg.fr/viz-bin/VizieR</code>",
            "to find VizieR catalogue identifiers.",
            "</p>",
        } );
        paramList.add( cdstableParam_ );

        /* Block size.  This is created before the find parameter,
         * whose description refers to its name, but it is added to the
         * parameter list after it. */
        chunkParam_ = new IntegerParameter( "blocksize" );
        chunkParam_.setPrompt( "Maximum number of rows per request" );
        chunkParam_.setDescription( new String[] {
            "<p>The CDS Xmatch service operates limits on",
            "the maximum number of rows that can be uploaded and",
            "the maximum number of rows that is returned as a result",
            "from a single query.",
            "In the case of large input tables,",
            "they are broken down into smaller blocks,",
            "and one request is sent to the external service for each block.",
            "This parameter controls the number of rows in each block.",
            "For an input table with fewer rows than this value,",
            "the whole thing is done as a single request.",
            "</p>",
            "<p>At time of writing, the maximum upload size is 100Mb",
            "(about 3Mrow; this does not depend on the width of your table),",
            "and the maximum return size is 2Mrow.",
            "</p>",
        } );
        chunkParam_.setMinimum( 1 );
        chunkParam_.setDefault( Integer.toString( 10 * 1000 ) );

        /* Find mode. */
        findParam_ =
            new ChoiceParameter<UserFindMode>( "find", UserFindMode.class,
                                               UserFindMode.getInstances() );
        findParam_.setPrompt( "Which pair matches to include" );
        StringBuilder optBuf = new StringBuilder();
        for ( UserFindMode findMode : findParam_.getOptionValueList() ) {
            optBuf.append( "<li>" )
                  .append( "<code>" )
                  .append( findMode.getName() )
                  .append( "</code>: " )
                  .append( findMode.getSummary() )
                  .append( "</li>" )
                  .append( '\n' );
        }
        findParam_.setDescription( new String[] {
            "<p>Determines which pair matches are included in the result.",
            "<ul>",
            optBuf.toString(),
            "</ul>",
            "Note only the <code>" + UserFindMode.ALL + "</code> mode",
            "is symmetric between the two tables.",
            "</p>",
            "<p><strong>Note also that there is a bug</strong> in",
            "<code>" + UserFindMode.BEST_REMOTE + "</code>",
            "matching.",
            "If the match is done in multiple blocks,",
            "it's possible for a remote table row to appear matched against",
            "one local table row per uploaded block,",
            "rather than just once for the whole result.",
            "If you're worried about that, set",
            "<code>" + chunkParam_.getName() + ">=</code>",
            "<em>rowCount</em>.",
            "This may be fixed in a future release.",
            "</p>",
        } );
        findParam_.setDefaultOption( UserFindMode.ALL );
        paramList.add( findParam_ );
        paramList.add( chunkParam_ );

        /* Output row limit. */
        maxrecParam_ = new IntegerParameter( "maxrec" );
        maxrecParam_.setPrompt( "Maximum number of output rows" );
        maxrecParam_.setDescription( new String[] {
            "<p>Limit to the number of rows resulting from this operation.",
            "If the value is negative (the default) no limit is imposed.",
            "Note however that there can be truncation of the result",
            "if the number of records returned from a single chunk",
            "exceeds the service hard limit",
            "(2,000,000 at time of writing).",
            "</p>",
        } );
        maxrecParam_.setDefault( "-1" );
        paramList.add( maxrecParam_ );

        /* Service URL. */
        urlParam_ = new URLParameter( "serviceurl" );
        urlParam_.setPrompt( "URL for CDS Xmatch service" );
        urlParam_.setDescription( new String[] {
            "<p>The URL at which the CDS Xmatch service can be found.",
            "Normally this should not be altered from the default,",
            "but if other implementations of the same service are known,",
            "this parameter can be used to access them.",
            "</p>",
        } );
        urlParam_.setDefault( CdsUploadMatcher.XMATCH_URL );
        paramList.add( urlParam_ );

        /* MOC pre-filtering. */
        usemocParam_ = new BooleanParameter( "usemoc" );
        usemocParam_.setPrompt( "Use VizieR MOC footprint?" );
        usemocParam_.setDescription( new String[] {
            "<p>If true, first acquire a MOC coverage map from CDS,",
            "and use that to pre-filter rows before uploading them",
            "for matching.",
            "This should improve efficiency, but have no effect on the result.",
            "</p>",
        } );
        usemocParam_.setDefault( Boolean.TRUE.toString() );
        paramList.add( usemocParam_ );

        /* HEALPix pre-sorting. */
        presortParam_ = new BooleanParameter( "presort" );
        presortParam_.setPrompt( "Pre-sort rows before uploading?" );
        presortParam_.setDescription( new String[] {
            "<p>If true, the rows are sorted by HEALPix index before",
            "they are uploaded to the CDS X-Match service.",
            "If the match is done in multiple blocks,",
            "this may improve efficiency,",
            "since when matching against a large remote catalogue",
            "the X-Match service likes to process requests",
            "in which sources are grouped into a small region",
            "rather than scattered all over the sky.",
            "</p>",
            "<p>Note this will have a couple of other side effects that may",
            "be undesirable:",
            "it will read all the input rows into the task at once,",
            "which may make it harder to assess progress,",
            "and it will affect the order of the rows in the output table.",
            "</p>",
            "<p>It is <em>probably</em> only worth setting true for rather",
            "large (multi-million-row?) multi-block matches,",
            "where both local and remote catalogues are spread over",
            "a significant fraction of the sky.",
            "But feel free to experiment.",
            "</p>",
        } );
        presortParam_.setDefault( Boolean.FALSE.toString() );
        paramList.add( presortParam_ );

        /* Column renaming parameters. */
        fixcolsParam_ = new JoinFixActionParameter( "fixcols" );
        insuffixParam_ =
            fixcolsParam_.createSuffixParameter( "suffixin",
                                                 "the input table", "_in" );
        cdssuffixParam_ =
            fixcolsParam_.createSuffixParameter( "suffixremote",
                                                 "the CDS result table",
                                                 "_cds" );
        paramList.add( fixcolsParam_ );
        paramList.add( insuffixParam_ );
        paramList.add( cdssuffixParam_ );

        getParameterList().addAll( paramList );
    }

    /**
     * Reads the parameter values from the environment and returns
     * an object that will perform the match when its table is requested.
     *
     * @param   env  execution environment
     * @return  producer for the match result table
     * @throws  ParameterValueException  if the remote table identifier
     *          cannot be turned into a CDS identifier
     * @throws  TaskException  if parameter values cannot be acquired
     */
    public TableProducer createProducer( Environment env )
            throws TaskException {

        /* Interrogate environment for parameter values. */
        String raString = raParam_.stringValue( env );
        String decString = decParam_.stringValue( env );
        double sr = srParam_.doubleValue( env );
        String cdsName = cdstableParam_.stringValue( env );
        String cdsId = CdsUploadMatcher.toCdsId( cdsName );
        if ( cdsId == null ) {
            throw new ParameterValueException( cdstableParam_,
                                               "Bad value " + cdsName );
        }

        /* The query sequence is given the radius divided by 3600
         * (arcsec to degrees); the matcher below gets the value as
         * supplied by the user. */
        double srDeg = sr / 3600.;
        final QuerySequenceFactory qsFact0 =
            new JELQuerySequenceFactory( raString, decString,
                                         Double.toString( srDeg ) );
        UserFindMode userMode = findParam_.objectValue( env );
        ServiceFindMode serviceMode = userMode.getServiceMode();
        boolean oneToOne = userMode.isOneToOne();
        int blocksize = chunkParam_.intValue( env );
        long maxrec = maxrecParam_.intValue( env );
        URL url = urlParam_.urlValue( env );
        final Coverage coverage = usemocParam_.booleanValue( env )
                                ? UrlMocCoverage.getVizierMoc( cdsName, -1 )
                                : null;
        final boolean presort = presortParam_.booleanValue( env );
        UploadMatcher umatcher =
            new CdsUploadMatcher( url, cdsId, sr, serviceMode );
        String tableName = "xmatch(" + cdsIdToTableName( cdsId ) + ")";
        JoinFixAction inFixAct =
            fixcolsParam_.getJoinFixAction( env, insuffixParam_ );
        JoinFixAction cdsFixAct =
            fixcolsParam_.getJoinFixAction( env, cdssuffixParam_ );
        final TableProducer inProd = createInputProducer( env );
        final StoragePolicy storage =
            LineTableEnvironment.getStoragePolicy( env );
        boolean uploadEmpty = CdsUploadMatcher.UPLOAD_EMPTY;
        final BlockUploader blocker =
            new BlockUploader( umatcher, blocksize, maxrec, tableName,
                               inFixAct, cdsFixAct, serviceMode, oneToOne,
                               uploadEmpty );

        /* Create and return an object which will produce the result. */
        return new TableProducer() {
            public StarTable getTable() throws IOException, TaskException {
                StarTable inTable = Tables.randomTable( inProd.getTable() );

                /* Try to initialise the coverage if one was requested.
                 * Failure is logged and the match proceeds without
                 * coverage filtering. */
                Coverage cov = null;
                if ( coverage != null ) {
                    try {
                        coverage.initCoverage();
                        cov = coverage;
                    }
                    catch ( IOException e ) {
                        logger_.log( Level.WARNING,
                                     "Failed to read coverage", e );
                    }
                }

                /* Decorate the basic query sequence factory as required:
                 * first coverage filtering, then HEALPix sorting. */
                QuerySequenceFactory qsFact1 = qsFact0;
                if ( cov != null ) {
                    qsFact1 = new CoverageQuerySequenceFactory( qsFact1, cov );
                }
                if ( presort ) {
                    qsFact1 = new HealpixSortedQuerySequenceFactory( qsFact1 );
                }
                return blocker.runMatch( inTable, qsFact1, storage );
            }
        };
    }

    /**
     * Turns a CDS Xmatch table identifier into something suitable for
     * use in a table name.  This is a bit of a hack due to the fact
     * that topcat doesn't like table names containing "/" characters.
     * A leading "vizier:" prefix is removed, and every "/" is replaced
     * by "_".
     *
     * @param  cdsId  identifier for a table in the CDS Xmatch service
     * @return  string suitable for use in a stilts output table
     */
    private static String cdsIdToTableName( String cdsId ) {
        return cdsId.replaceFirst( "^vizier:", "" )
                    .replace( '/', '_' );
    }
}