7
7
from collections import defaultdict
8
8
from collections import Counter
9
9
from collections import Sequence
10
+ from datetime import datetime
11
+ from datetime import timedelta
12
+ from functools import wraps
10
13
import logging
11
14
import re
12
15
import socket
16
+ import sys
13
17
from time import sleep
14
18
15
19
from cachetools import LRUCache
18
22
from rdflib .term import Identifier
19
23
import SPARQLWrapper
20
24
from SPARQLWrapper .SPARQLExceptions import EndPointNotFound
25
+ from SPARQLWrapper .SPARQLExceptions import QueryBadFormed
21
26
from SPARQLWrapper .SPARQLExceptions import SPARQLWrapperException
22
27
from xml .sax .expatreader import SAXParseException
23
28
# noinspection PyUnresolvedReferences
40
45
logger = logging .getLogger (__name__ )
41
46
42
47
48
class QueryException(Exception):
    """Generic error type for query related failures.

    NOTE(review): no raise or except site for this class is visible in this
    part of the file; confirm its intended use against the callers.
    """
43
50
44
51
45
- class EvalException (Exception ):
52
class IncompleteQueryException(Exception):
    """Raised when a query is attempted on an unusable graph pattern.

    _get_vars_values_mapping raises this if the graph pattern contains
    neither the source nor the target variable.
    """
47
54
48
55
49
- class QueryException ( EvalException ):
56
class MultiQueryException(Exception):
    """Raised when _multi_query cannot deliver a result.

    Used when _multi_query gives up on a query after its retries, and when
    _multi_query is temporarily closed for the current worker because of a
    previous exception.
    """
51
58
52
59
@@ -221,7 +228,7 @@ def _get_vars_values_mapping(graph_pattern, source_target_pairs):
221
228
_values = [(t ,) for t in sorted (set (targets ))]
222
229
_val_idx = 1
223
230
else :
224
- raise QueryException (
231
+ raise IncompleteQueryException (
225
232
"tried to run a query on a graph pattern without "
226
233
"%s and %s vars:\n %s" % (SOURCE_VAR , TARGET_VAR , graph_pattern )
227
234
)
@@ -276,7 +283,42 @@ def _ask_chunk_result_extractor(q_res, _vars, _ret_val_mapping):
276
283
return chunk_res
277
284
278
285
279
- # noinspection PyBroadException
286
def _exception_closes_worker_guard(func):
    """Decorator: temporarily close the wrapped function after it raised.

    Workaround for SCOOP's map, which may already have dispatched further
    work to this worker although the exception of a previous _multi_query
    has not been handled in origin yet.

    An exception escaping _multi_query normally makes origin back off for
    config.ERROR_WAIT and retry. While the guard is closed, every call fails
    immediately with MultiQueryException instead of invoking the wrapped
    function, so all remaining work "quick fails" within that time frame.

    :param func: the function to guard (applied to _multi_query).
    :return: wrapper with the same call signature as func.
    :raises MultiQueryException: from the wrapper, while it is closed.
    """
    # Holds the time of the last failure; empty means "open". A list is used
    # so the wrapper below can mutate state shared through the closure.
    closed_since = []
    # Stay closed a bit shorter than origin's back-off, rather than too long.
    cool_down = timedelta(seconds=config.ERROR_WAIT * .75)

    @wraps(func)
    def _multi_query_wrapper(*args, **kwds):
        if closed_since:
            still_closed = datetime.utcnow() - closed_since[0] < cool_down
            if still_closed:
                logger.warning(
                    '_multi_query temporarily closed for worker due to '
                    'previous exception'
                )
                raise MultiQueryException('closed for worker')
            # cool-down has passed: re-open before trying again
            closed_since.pop()
        try:
            result = func(*args, **kwds)
        except:
            # Deliberately catches everything: whatever escapes the wrapped
            # call closes the guard, and is always re-raised unchanged.
            closed_since.append(datetime.utcnow())
            raise
        else:
            return result

    return _multi_query_wrapper
319
+
320
+
321
+ @_exception_closes_worker_guard
280
322
def _multi_query (
281
323
sparql , timeout , graph_pattern , source_target_pairs ,
282
324
batch_size ,
@@ -303,15 +345,16 @@ def _multi_query(
303
345
t , q_res = _query (sparql , timeout , q , ** kwds )
304
346
chunk_res = _chunk_res (
305
347
q_res , _vars , _ret_val_mapping , ** kwds )
306
- except EndPointNotFound :
348
+ except EndPointNotFound as e :
307
349
# happens if the endpoint reports a 404...
308
350
# as virtuoso in rare cases seems to report a 404 let's
309
351
# retry after some time but then cancel
310
352
if retry :
311
353
logger .info (
312
- 'SPARQL endpoint reports a 404, will retry once in 10s'
354
+ 'SPARQL endpoint reports a 404, will retry in %ds' ,
355
+ config .ERROR_WAIT
313
356
)
314
- sleep (10 )
357
+ sleep (config . ERROR_WAIT )
315
358
continue
316
359
else :
317
360
logger .exception (
@@ -320,7 +363,7 @@ def _multi_query(
320
363
'could not perform query:\n %s for %s\n Exception:' ,
321
364
q , val_chunk ,
322
365
)
323
- raise
366
+ six . reraise ( MultiQueryException , e , sys . exc_info ()[ 2 ])
324
367
except (SPARQLWrapperException , SAXParseException , URLError ) as e :
325
368
if (isinstance (e , SPARQLWrapperException ) and
326
369
re .search (
@@ -346,45 +389,47 @@ def _multi_query(
346
389
# error. It is very likely that the endpoint is dead...
347
390
if retry :
348
391
logger .warning (
349
- 'could not perform query, retry in 10s:\n '
392
+ 'URLError, seems we cannot reach SPARQL endpoint, '
393
+ 'retry in %ds. Tried to perform query:\n '
350
394
'%s for %s\n Exception:' ,
351
- q , val_chunk ,
395
+ config . ERROR_WAIT , q , val_chunk ,
352
396
exc_info = 1 , # appends exception to message
353
397
)
354
- sleep (10 )
398
+ sleep (config . ERROR_WAIT )
355
399
continue
356
400
else :
357
401
logger .exception (
358
- 'could not perform query:\n %s for %s\n Exception:' ,
402
+ 'URLError, seems we cannot reach SPARQL endpoint, '
403
+ 'giving up after 3 retries. Tried to perform query:'
404
+ '\n %s for %s\n Exception:' ,
359
405
q , val_chunk ,
360
- exc_info = 1 , # appends exception to message
361
406
)
362
- raise
407
+ six . reraise ( MultiQueryException , e , sys . exc_info ()[ 2 ])
363
408
else :
364
409
logger .warning (
365
- 'could not perform query:\n %s for %s\n Exception:' ,
410
+ 'could not perform query, replacing with 0 result:\n '
411
+ '%s for %s\n Exception:' ,
366
412
q , val_chunk ,
367
413
exc_info = 1 , # appends exception to message
368
414
)
369
415
t , chunk_res = timer () - _start_time , {}
370
- except Exception :
416
+ except Exception as e :
371
417
if retry :
372
418
logger .warning (
373
- 'unhandled exception, retry in 10s :\n '
419
+ 'unhandled exception, retry in %ds :\n '
374
420
'Query:\n %s\n Chunk:%r\n Exception:' ,
375
- q , val_chunk ,
421
+ config . ERROR_WAIT , q , val_chunk ,
376
422
exc_info = 1 , # appends exception to message
377
423
)
378
- sleep (10 )
424
+ sleep (config . ERROR_WAIT )
379
425
continue
380
426
else :
381
427
logger .exception (
382
- 'unhandled exception:\n '
428
+ 'unhandled exception, giving up after 3 retries :\n '
383
429
'Query:\n %s\n Chunk:%r\n Exception:' ,
384
430
q , val_chunk ,
385
- exc_info = 1 , # appends exception to message
386
431
)
387
- t , chunk_res = timer () - _start_time , {}
432
+ six . reraise ( MultiQueryException , e , sys . exc_info ()[ 2 ])
388
433
break
389
434
_res_update (res , chunk_res , ** kwds )
390
435
total_time += t
0 commit comments