/
BackgroundGraphResult.java
148 lines (126 loc) · 3.92 KB
/
BackgroundGraphResult.java
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
/*******************************************************************************
* Copyright (c) 2015 Eclipse RDF4J contributors, Aduna, and others.
* All rights reserved. This program and the accompanying materials
* are made available under the terms of the Eclipse Distribution License v1.0
* which accompanies this distribution, and is available at
* http://www.eclipse.org/org/documents/edl-v10.php.
*******************************************************************************/
package org.eclipse.rdf4j.query.impl;
import java.io.InputStream;
import java.io.InputStreamReader;
import java.nio.charset.Charset;
import java.util.Collections;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CountDownLatch;
import org.eclipse.rdf4j.common.iteration.IterationWrapper;
import org.eclipse.rdf4j.model.Statement;
import org.eclipse.rdf4j.query.GraphQueryResult;
import org.eclipse.rdf4j.query.QueryEvaluationException;
import org.eclipse.rdf4j.rio.RDFHandler;
import org.eclipse.rdf4j.rio.RDFHandlerException;
import org.eclipse.rdf4j.rio.RDFParser;
/**
 * Provides concurrent access to statements as they are being parsed when instances of this class are run as Threads.
 * <p>
 * {@link #run()} registers this object as the parser's {@link RDFHandler} and parses the input; every statement
 * reported by the parser is placed on a {@link QueueCursor}, which is also the iteration wrapped by this result, so a
 * consumer can iterate over statements while parsing is still in progress. Namespace declarations reported by the
 * parser are collected and exposed through {@link #getNamespaces()}.
 *
 * @author James Leigh
 */
public class BackgroundGraphResult extends IterationWrapper<Statement, QueryEvaluationException>
        implements GraphQueryResult, Runnable, RDFHandler {

    private final RDFParser parser;

    /** Character set used to decode the input, or {@code null} to hand the raw byte stream to the parser. */
    private final Charset charset;

    private final InputStream in;

    private final String baseURI;

    /** Released on the first statement, on {@link #endRDF()}, or when {@link #run()} terminates. */
    private final CountDownLatch namespacesReady = new CountDownLatch(1);

    /** Released only when {@link #run()} terminates, normally or exceptionally. */
    private final CountDownLatch finishedParsing = new CountDownLatch(1);

    /** Written by the parsing thread, read by consumers through an unmodifiable view. */
    private final Map<String, String> namespaces = new ConcurrentHashMap<>();

    /** Hand-off point between the parsing thread (producer) and the consumer of this result. */
    private final QueueCursor<Statement> queue;

    /**
     * Creates a result backed by a newly created {@link QueueCursor}.
     *
     * @param parser  the parser that will read {@code in}
     * @param in      the stream to parse; it is closed when {@link #run()} terminates
     * @param charset the character set used to decode {@code in}, or {@code null} to let the parser read the bytes
     *                directly
     * @param baseURI the base URI passed to the parser
     */
    public BackgroundGraphResult(RDFParser parser, InputStream in, Charset charset, String baseURI) {
        // NOTE(review): 10 is presumably the queue capacity - confirm against QueueCursor.
        this(new QueueCursor<Statement>(10), parser, in, charset, baseURI);
    }

    /**
     * Creates a result that hands statements over through the supplied queue.
     *
     * @param queue   the queue that receives parsed statements and that this result iterates over
     * @param parser  the parser that will read {@code in}
     * @param in      the stream to parse; it is closed when {@link #run()} terminates
     * @param charset the character set used to decode {@code in}, or {@code null} to let the parser read the bytes
     *                directly
     * @param baseURI the base URI passed to the parser
     */
    public BackgroundGraphResult(QueueCursor<Statement> queue, RDFParser parser, InputStream in, Charset charset,
            String baseURI) {
        super(queue);
        this.queue = queue;
        this.parser = parser;
        this.in = in;
        this.charset = charset;
        this.baseURI = baseURI;
    }

    /**
     * Closes the wrapped iteration, marks the queue as done and then waits until {@link #run()} has terminated.
     * <p>
     * NOTE(review): the wait is released only by {@link #run()}; if this object is closed without {@code run()} ever
     * being executed, this method waits until the calling thread is interrupted.
     *
     * @throws QueryEvaluationException declared by the overridden method; presumably also how
     *                                  {@code queue.checkException()} reports a failure recorded by the parsing
     *                                  thread - confirm against QueueCursor
     */
    @Override
    protected void handleClose() throws QueryEvaluationException {
        try {
            super.handleClose();
        } finally {
            queue.done();
        }
        try {
            finishedParsing.await();
        } catch (InterruptedException e) {
            // Preserve the interrupt for the caller instead of swallowing it.
            Thread.currentThread().interrupt();
        } finally {
            queue.checkException();
        }
    }

    /**
     * Parses the input, feeding statements and namespaces back into this object, and always closes the input stream.
     * <p>
     * Any exception raised while parsing or closing is handed to the queue via {@code toss} rather than thrown.
     * Whatever the outcome, the queue is marked done and both latches are released so that no caller of
     * {@link #getNamespaces()} or {@link #close()} keeps waiting on this thread.
     */
    @Override
    public void run() {
        // try-with-resources keeps a parse failure as the primary exception; a failure while closing the stream is
        // attached as suppressed instead of replacing it (which a close() call in a plain finally block would do).
        try (InputStream stream = in) {
            parser.setRDFHandler(this);
            if (charset == null) {
                parser.parse(stream, baseURI);
            } else {
                parser.parse(new InputStreamReader(stream, charset), baseURI);
            }
        } catch (Exception e) {
            queue.toss(e);
        } finally {
            queue.done();
            namespacesReady.countDown();
            finishedParsing.countDown();
        }
    }

    @Override
    public void startRDF() throws RDFHandlerException {
        // no-op
    }

    /**
     * Returns the namespaces collected so far, after waiting until the first statement was reported, the parser
     * signalled the end of the document, or {@link #run()} terminated.
     *
     * @return an unmodifiable view of the collected prefix-to-namespace mappings, or an empty map if the calling
     *         thread was interrupted while waiting (the interrupt status is restored in that case)
     */
    @Override
    public Map<String, String> getNamespaces() {
        try {
            namespacesReady.await();
            // Show the user an unmodifiable view on the map but we can still change it here
            return Collections.unmodifiableMap(namespaces);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return Collections.emptyMap();
        } finally {
            // Runs on both return paths; presumably surfaces a failure recorded by the parsing thread.
            queue.checkException();
        }
    }

    @Override
    public void handleComment(String comment) throws RDFHandlerException {
        // ignore
    }

    /**
     * Records a namespace declaration reported by the parser.
     *
     * @param prefix the namespace prefix
     * @param uri    the namespace name the prefix maps to
     */
    @Override
    public void handleNamespace(String prefix, String uri) throws RDFHandlerException {
        namespaces.put(prefix, uri);
    }

    /**
     * Releases callers waiting in {@link #getNamespaces()} and hands the statement over to the queue.
     * <p>
     * If the parsing thread is interrupted while handing over, the interrupt status is restored, the interruption is
     * recorded on the queue and the queue is marked done.
     * NOTE(review): the parser is not told to stop in that case, so it keeps reporting statements - confirm whether
     * aborting the parse here would be preferable.
     *
     * @param st the statement reported by the parser
     */
    @Override
    public void handleStatement(Statement st) throws RDFHandlerException {
        namespacesReady.countDown();
        try {
            queue.put(st);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            queue.toss(e);
            queue.done();
        }
    }

    /**
     * Releases callers waiting in {@link #getNamespaces()}; this covers documents that contain no statements.
     */
    @Override
    public void endRDF() throws RDFHandlerException {
        namespacesReady.countDown();
    }
}