/
BdioWriter.java
276 lines (239 loc) · 8.92 KB
/
BdioWriter.java
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
/*
* Copyright 2016 Black Duck Software, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.blackducksoftware.bdio2;
import static com.google.common.base.Preconditions.checkState;
import static java.nio.charset.StandardCharsets.UTF_8;
import java.io.BufferedOutputStream;
import java.io.Closeable;
import java.io.IOException;
import java.io.OutputStream;
import java.io.OutputStreamWriter;
import java.io.Writer;
import java.util.Map;
import java.util.Objects;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.zip.ZipEntry;
import java.util.zip.ZipOutputStream;
import com.blackducksoftware.common.io.ExtraIO;
import com.github.jsonldjava.core.JsonLdConsts;
import com.github.jsonldjava.utils.JsonUtils;
/**
 * A writer for serializing BDIO nodes to a byte stream. This writer always produces the Zip form of BDIO data,
 * generally this is used to write data to disk. For writing BDIO data to the network, it is likely better to
 * break up the data into multiple requests prior to serialization.
 * <p>
 * Call {@link #start()} exactly once, then {@link #next(Map)} for each node, and finally {@link #close()}. The first
 * entry holds only the pretty-printed metadata named graph; every later entry is a JSON-LD named graph (sharing the
 * metadata {@code @id}) whose nodes are appended until the entry would exceed {@code Bdio.MAX_ENTRY_SIZE}
 * uncompressed bytes, at which point a new entry is opened.
 * <p>
 * The current entry stream is held in a plain field without synchronization, so callers should not assume this
 * writer is safe for concurrent use.
 *
 * @author jgustie
 */
public class BdioWriter implements Closeable {

    // TODO What about signing?

    /**
     * Supplier of output streams for writing BDIO data.
     */
    @FunctionalInterface
    public interface StreamSupplier extends Closeable {
        /**
         * Returns a new output stream for writing the next BDIO entry. The caller is responsible for closing the
         * supplied stream.
         */
        OutputStream newStream() throws IOException;

        /**
         * Called once to release all resources associated with this stream supplier. It is implementation specific
         * as to how existing unclosed streams will be handled.
         */
        @Override
        default void close() throws IOException {
            // By default, do nothing on close
        }
    }

    /**
     * A stream supplier for writing BDIO out to a file.
     */
    public static class BdioFile implements StreamSupplier {
        /**
         * The output stream used for constructing the Zip file.
         */
        private final ZipOutputStream out;

        /**
         * A buffered view of {@link #out} returned for every entry. Closing it only flushes, so the Zip file stays
         * open across entries.
         */
        private final OutputStream bufferedOut;

        /**
         * The number of entries written to the BDIO document. Starts at -1 to account for an initial "header" entry
         * that includes an empty named graph solely for expressing graph metadata.
         */
        private final AtomicInteger entryCount = new AtomicInteger(-1);

        // TODO Take a java.nio.file.Path/File instead? Also take a String?
        public BdioFile(OutputStream outputStream) {
            out = new ZipOutputStream(ExtraIO.buffer(outputStream));
            bufferedOut = new BufferedOutputStream(out) {
                @Override
                public void close() throws IOException {
                    // Do not close the Zip file for each entry, just flush it to disk
                    super.flush();
                }
            };
        }

        /**
         * Starts the next Zip entry (named from the running entry index) and returns the shared buffered stream.
         */
        @Override
        public OutputStream newStream() throws IOException {
            // Push any bytes still buffered from an unclosed previous stream into the entry they belong to
            bufferedOut.flush();
            out.putNextEntry(new ZipEntry(Bdio.dataEntryName(entryCount.getAndIncrement())));
            return bufferedOut;
        }

        /**
         * Flushes any data still buffered for the current entry and then closes the Zip file.
         */
        @Override
        public void close() throws IOException {
            try {
                // The entry stream's close() only flushes, so data from an unclosed entry may still be buffered
                bufferedOut.flush();
            } finally {
                out.close();
            }
        }
    }

    /**
     * Closed state.
     */
    private final AtomicBoolean closed = new AtomicBoolean();

    /**
     * Started state.
     */
    private final AtomicBoolean started = new AtomicBoolean();

    /**
     * Matching header/footer state.
     */
    private final AtomicBoolean needsFooter = new AtomicBoolean();

    /**
     * The number of bytes remaining before the current entry is full. Entries have a fixed size limit that must not be
     * exceeded (or parsing will fail). This value is established once the entry is opened and updated each time data is
     * written out.
     * <p>
     * NOTE: This is the <em>uncompressed</em> entry size.
     */
    private final AtomicInteger remaining = new AtomicInteger();

    /**
     * The BDIO metadata.
     */
    private final BdioMetadata metadata;

    /**
     * The source of output streams for each entry.
     */
    private final StreamSupplier entryStreams;

    /**
     * The leading bytes used to start each entry in the Zip file.
     */
    private final byte[] header;

    /**
     * The bytes used to delimit each JSON-LD node within each entry.
     */
    private final byte[] delimiter;

    /**
     * The trailing bytes used to finish each entry in the Zip file.
     */
    private final byte[] footer;

    /**
     * The output stream for the current entry, or {@code null} when no entry stream is open.
     */
    private OutputStream out;

    /**
     * Creates a new writer using the supplied metadata and source of output streams.
     *
     * @param metadata the document metadata; its id is repeated in the header of every data entry
     * @param entryStreams the source of one output stream per Zip entry
     * @throws NullPointerException if either argument is {@code null}
     */
    public BdioWriter(BdioMetadata metadata, StreamSupplier entryStreams) {
        this.metadata = Objects.requireNonNull(metadata);
        this.entryStreams = Objects.requireNonNull(entryStreams);

        // Generate these fixed byte arrays used to serialize each graph
        header = new StringBuilder()
                .append("{\n ")
                .append('"').append(JsonLdConsts.ID).append('"')
                .append(" : ")
                // The id is caller supplied, so escape it rather than splicing it raw into the JSON text
                .append(jsonString(String.valueOf(metadata.id())))
                .append(",\n ")
                .append('"').append(JsonLdConsts.GRAPH).append('"')
                .append(" : [ ")
                .toString().getBytes(UTF_8);
        delimiter = ", ".getBytes(UTF_8);
        footer = " ]\n}\n".getBytes(UTF_8);
    }

    /**
     * Starts writing the BDIO document. This must be called exactly once, <em>before</em> starting to call {@code next}
     * to write individual nodes.
     *
     * @throws IllegalStateException if this writer was already started
     * @throws IOException if the metadata entry cannot be written
     */
    public void start() throws IOException {
        checkState(started.compareAndSet(false, true), "already started");

        // Write just the full metadata as the first entry
        out = entryStreams.newStream();
        Writer writer = new OutputStreamWriter(out, UTF_8);
        JsonUtils.writePrettyPrint(writer, metadata.asNamedGraph());
        // Flush but do not close: closing the writer would close the entry stream out from under closeStream()
        writer.flush();
    }

    /**
     * Writes an individual node to the BDIO document. The node is appended to the current entry if it fits;
     * otherwise the current entry is finished and the node becomes the first node of a new entry.
     *
     * @param node the JSON-LD node to serialize
     * @throws IllegalStateException if {@link #start()} has not been called or this writer has been closed
     * @throws EntrySizeViolationException if the node does not fit even in an empty entry
     * @throws IOException if the node cannot be serialized or written
     */
    public void next(Map<String, Object> node) throws IOException {
        checkState(started.get(), "not started");
        // After close() the entry stream is finished; with BdioFile a small write would just sit in a buffer that is
        // never flushed, so fail fast instead of silently dropping the node
        checkState(!closed.get(), "closed");

        // TODO Have a reusable pool of ByteArrayOutputStream wrapped writers?
        byte[] serializedNode = JsonUtils.toPrettyString(node).replace("\n", "\n ").getBytes(UTF_8);

        // A remaining size of exactly zero means the entry is exactly full, which is still within the limit
        if (remaining.addAndGet(-(delimiter.length + serializedNode.length)) >= 0) {
            // It fits, write it out
            out.write(delimiter);
            out.write(serializedNode);
        } else {
            // It didn't fit, create a new entry and try again (the first node of an entry has no delimiter)
            nextEntry();
            if (remaining.addAndGet(-serializedNode.length) >= 0) {
                out.write(serializedNode);
            } else {
                // remaining is negative here, so this is the size the entry would have had including header/footer
                throw new EntrySizeViolationException(null, Math.abs(remaining.get()) + Bdio.MAX_ENTRY_SIZE);
            }
        }
    }

    /**
     * Closes this BDIO document, finishing all output and releasing any resources. Subsequent calls do nothing.
     *
     * @throws IOException if finishing the current entry or closing the stream supplier fails
     */
    @Override
    public void close() throws IOException {
        if (closed.compareAndSet(false, true)) {
            try {
                // Write the footer for the current entry
                closeStream();

                // TODO Write out signature files here? Or are we signing each file?
            } finally {
                // Release the source of entry streams
                entryStreams.close();
            }
        }
    }

    /**
     * Force close the current entry before reaching the maximum file size.
     */
    public void closeEntry() {
        // Really just drain the remaining size so the next write will open a new entry
        remaining.set(0);
    }

    /**
     * Increments the entry being written out.
     */
    private void nextEntry() throws IOException {
        // Create a new entry and reset the remaining size
        closeStream();
        out = entryStreams.newStream();
        out.write(header);
        needsFooter.set(true);
        remaining.set(Bdio.MAX_ENTRY_SIZE - header.length - footer.length);
    }

    /**
     * Closes the current entry stream (writing the footer first if the entry was opened with a header). Does nothing
     * when no entry stream is open.
     */
    private void closeStream() throws IOException {
        if (out == null) {
            return;
        }
        // Detach before writing so a failure here can never cause the same stream to be finished or closed twice,
        // and let try-with-resources close it even if writing the footer fails
        try (OutputStream current = out) {
            out = null;
            if (needsFooter.get()) {
                current.write(footer);
            }
        }
    }

    /**
     * Returns {@code value} as a quoted JSON string literal, escaping quotes, backslashes and control characters as
     * required by RFC 8259.
     */
    private static String jsonString(String value) {
        StringBuilder quoted = new StringBuilder(value.length() + 2).append('"');
        for (int i = 0; i < value.length(); ++i) {
            char c = value.charAt(i);
            if (c == '"' || c == '\\') {
                quoted.append('\\').append(c);
            } else if (c < 0x20) {
                quoted.append(String.format("\\u%04x", (int) c));
            } else {
                quoted.append(c);
            }
        }
        return quoted.append('"').toString();
    }
}