diff --git a/README.md b/README.md index fa274524..e401ff4b 100644 --- a/README.md +++ b/README.md @@ -41,4 +41,6 @@ Another option is to output the data in compressed form. All files will get the java -jar restructurehdfs-0.3.3-all.jar --compression gzip --hdfs-uri --output-directory [ ...] ``` -Finally, by default, files records are not deduplicated after writing. To enable this behaviour, specify the option `--deduplicate` or `-d`. This set to false by default because of an issue with Biovotion data. Please see - [issue #16](https://github.com/RADAR-base/Restructure-HDFS-topic/issues/16) before enabling it. +By default, file records are not deduplicated after writing. To enable this behaviour, specify the option `--deduplicate` or `-d`. This is set to false by default because of an issue with Biovotion data. Please see [issue #16](https://github.com/RADAR-base/Restructure-HDFS-topic/issues/16) before enabling it. + +Finally, while processing, files are staged to a temporary directory and moved to the output directory afterwards. This reduces the chance of data corruption, but it may result in slower performance. Disable staging using the `--no-stage` option. 
\ No newline at end of file diff --git a/build.gradle b/build.gradle index c3ecc1ad..a5d12733 100644 --- a/build.gradle +++ b/build.gradle @@ -2,7 +2,7 @@ apply plugin: 'java' apply plugin: 'application' group 'org.radarcns.restructurehdfs' -version '0.3.3-SNAPSHOT' +version '0.4.0-SNAPSHOT' mainClassName = 'org.radarcns.RestructureAvroRecords' run { diff --git a/src/main/java/org/radarcns/RestructureAvroRecords.java b/src/main/java/org/radarcns/RestructureAvroRecords.java index ea502c92..24f25168 100644 --- a/src/main/java/org/radarcns/RestructureAvroRecords.java +++ b/src/main/java/org/radarcns/RestructureAvroRecords.java @@ -28,11 +28,11 @@ import org.apache.hadoop.fs.LocatedFileStatus; import org.apache.hadoop.fs.Path; import org.apache.hadoop.fs.RemoteIterator; -import org.radarcns.util.CsvAvroConverter; -import org.radarcns.util.FileCacheStore; -import org.radarcns.util.JsonAvroConverter; +import org.radarcns.data.CsvAvroConverter; +import org.radarcns.data.FileCacheStore; +import org.radarcns.data.JsonAvroConverter; +import org.radarcns.data.RecordConverterFactory; import org.radarcns.util.ProgressBar; -import org.radarcns.util.RecordConverterFactory; import org.radarcns.util.commandline.CommandLineArgs; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -48,6 +48,7 @@ import java.util.List; import java.util.Map; import java.util.TimeZone; +import java.util.regex.Pattern; public class RestructureAvroRecords { private static final Logger logger = LoggerFactory.getLogger(RestructureAvroRecords.class); @@ -57,12 +58,14 @@ public class RestructureAvroRecords { private static final java.nio.file.Path BINS_FILE_NAME = Paths.get("bins.csv"); private static final java.nio.file.Path SCHEMA_OUTPUT_FILE_NAME = Paths.get("schema.json"); private static final SimpleDateFormat FILE_DATE_FORMAT = new SimpleDateFormat("yyyyMMdd_HH"); + private static final Pattern ILLEGAL_CHARACTER_PATTERN = Pattern.compile("[^a-zA-Z0-9_-]+"); static { 
FILE_DATE_FORMAT.setTimeZone(TimeZone.getTimeZone("UTC")); } private final RecordConverterFactory converterFactory; + private final boolean doStage; private java.nio.file.Path outputPath; private java.nio.file.Path offsetsPath; @@ -97,6 +100,7 @@ public static void main(String [] args) { commandLineArgs.outputDirectory) .useGzip("gzip".equalsIgnoreCase(commandLineArgs.compression)) .doDeduplicate(commandLineArgs.deduplicate).format(commandLineArgs.format) + .doStage(!commandLineArgs.noStage) .build(); try { @@ -119,6 +123,7 @@ private RestructureAvroRecords(RestructureAvroRecords.Builder builder) { this.useGzip = builder.useGzip; this.doDeduplicate = builder.doDeduplicate; + this.doStage = builder.doStage; logger.info("Deduplicate set to {}", doDeduplicate); String extension; @@ -197,7 +202,7 @@ public void start(String directoryName) throws IOException { // Actually process the files for (Map.Entry> entry : topicPaths.entrySet()) { - try (FileCacheStore cache = new FileCacheStore(converterFactory, 100, useGzip, doDeduplicate)) { + try (FileCacheStore cache = new FileCacheStore(converterFactory, 100, useGzip, doDeduplicate, doStage)) { for (Path filePath : entry.getValue()) { // If JsonMappingException occurs, log the error and continue with other files try { @@ -284,17 +289,8 @@ private void writeRecord(GenericRecord record, String topicName, FileCacheStore Date time = getDate(keyField, valueField); java.nio.file.Path outputFileName = createFilename(time, suffix); - String projectId; - - if(keyField.get("projectId") == null) { - projectId = "unknown-project"; - } else { - // Clean Project id for use in final pathname - projectId = keyField.get("projectId").toString().replaceAll("[^a-zA-Z0-9_-]+", ""); - } - - // Clean user id and create final output pathname - String userId = keyField.get("userId").toString().replaceAll("[^a-zA-Z0-9_-]+", ""); + String projectId = sanitizeId(keyField.get("projectId"), "unknown-project"); + String userId = 
sanitizeId(keyField.get("userId"), "unknown-user"); java.nio.file.Path projectDir = this.outputPath.resolve(projectId); java.nio.file.Path userDir = projectDir.resolve(userId); @@ -302,9 +298,9 @@ private void writeRecord(GenericRecord record, String topicName, FileCacheStore java.nio.file.Path outputPath = userTopicDir.resolve(outputFileName); // Write data - int response = cache.writeRecord(outputPath, record); + FileCacheStore.WriteResponse response = cache.writeRecord(outputPath, record); - if (response == FileCacheStore.CACHE_AND_NO_WRITE || response == FileCacheStore.NO_CACHE_AND_NO_WRITE) { + if (!response.isSuccessful()) { // Write was unsuccessful due to different number of columns, // try again with new file name writeRecord(record, topicName, cache, ++suffix); @@ -317,8 +313,9 @@ private void writeRecord(GenericRecord record, String topicName, FileCacheStore } } + String sourceId = sanitizeId(keyField.get("sourceId"), "unknown-source"); // Count data (binned and total) - bins.add(topicName, keyField.get("sourceId").toString(), time); + bins.add(topicName, sourceId, time); processedRecordsCount++; } } @@ -366,12 +363,25 @@ public static Date getDate(GenericRecord keyField, GenericRecord valueField) { return new Date(time); } + private static String sanitizeId(Object id, String defaultValue) { + if (id == null) { + return defaultValue; + } + String idString = ILLEGAL_CHARACTER_PATTERN.matcher(id.toString()).replaceAll(""); + if (idString.isEmpty()) { + return defaultValue; + } else { + return idString; + } + } + public static class Builder { private boolean useGzip; private boolean doDeduplicate; private String hdfsUri; private String outputPath; private String format; + private boolean doStage; public Builder(final String uri, final String outputPath) { this.hdfsUri = uri; @@ -397,5 +407,9 @@ public RestructureAvroRecords build() { return new RestructureAvroRecords(this); } + public Builder doStage(boolean stage) { + this.doStage = stage; + return this; + 
} } } diff --git a/src/main/java/org/radarcns/util/CsvAvroConverter.java b/src/main/java/org/radarcns/data/CsvAvroConverter.java similarity index 98% rename from src/main/java/org/radarcns/util/CsvAvroConverter.java rename to src/main/java/org/radarcns/data/CsvAvroConverter.java index 60a028b9..a954b808 100644 --- a/src/main/java/org/radarcns/util/CsvAvroConverter.java +++ b/src/main/java/org/radarcns/data/CsvAvroConverter.java @@ -14,9 +14,8 @@ * limitations under the License. */ -package org.radarcns.util; +package org.radarcns.data; -import com.fasterxml.jackson.databind.JsonMappingException; import com.fasterxml.jackson.databind.MappingIterator; import com.fasterxml.jackson.databind.ObjectReader; import com.fasterxml.jackson.databind.ObjectWriter; @@ -34,7 +33,10 @@ import java.io.Reader; import java.io.Writer; import java.nio.ByteBuffer; -import java.util.*; +import java.util.Iterator; +import java.util.LinkedHashMap; +import java.util.List; +import java.util.Map; /** * Converts deep hierarchical Avro records into flat CSV format. It uses a simple dot syntax in the diff --git a/src/main/java/org/radarcns/data/FileCache.java b/src/main/java/org/radarcns/data/FileCache.java new file mode 100644 index 00000000..ceef5ce6 --- /dev/null +++ b/src/main/java/org/radarcns/data/FileCache.java @@ -0,0 +1,200 @@ +/* + * Copyright 2017 The Hyve + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.radarcns.data; + +import org.apache.avro.generic.GenericRecord; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import javax.annotation.Nonnull; +import java.io.BufferedInputStream; +import java.io.BufferedOutputStream; +import java.io.ByteArrayInputStream; +import java.io.Closeable; +import java.io.Flushable; +import java.io.IOException; +import java.io.InputStream; +import java.io.InputStreamReader; +import java.io.OutputStream; +import java.io.OutputStreamWriter; +import java.io.Reader; +import java.io.Writer; +import java.nio.file.Files; +import java.nio.file.Path; +import java.nio.file.StandardOpenOption; +import java.util.zip.GZIPInputStream; +import java.util.zip.GZIPOutputStream; +import java.util.zip.ZipException; + +import static java.nio.file.StandardCopyOption.REPLACE_EXISTING; + +/** Keeps path handles of a path. */ +public class FileCache implements Closeable, Flushable, Comparable { + private static final Logger logger = LoggerFactory.getLogger(FileCache.class); + private static final int BUFFER_SIZE = 8192; + + private final Writer writer; + private final RecordConverter recordConverter; + private final Path path; + private final Path tmpPath; + private long lastUse; + + /** + * File cache of given path, using given converter factory. + * @param converterFactory converter factory to create a converter to write files with. + * @param path path to cache. + * @param record example record to create converter from, this is not written to path. + * @param gzip whether to gzip the records + * @throws IOException if the file and/or temporary files cannot be correctly read or written to. 
+ */ + public FileCache(RecordConverterFactory converterFactory, Path path, + GenericRecord record, boolean gzip, Path tmpDir) throws IOException { + this.path = path; + boolean fileIsNew = !Files.exists(path) || Files.size(path) == 0; + OutputStream outFile; + if (tmpDir == null) { + this.tmpPath = null; + outFile = Files.newOutputStream(path, StandardOpenOption.APPEND, StandardOpenOption.CREATE); + } else { + this.tmpPath = Files.createTempFile(tmpDir, path.getFileName().toString(), + gzip ? ".tmp.gz" : ".tmp"); + outFile = Files.newOutputStream(tmpPath); + } + + OutputStream bufOut = new BufferedOutputStream(outFile); + if (gzip) { + bufOut = new GZIPOutputStream(bufOut); + } + + InputStream inputStream; + if (fileIsNew) { + inputStream = new ByteArrayInputStream(new byte[0]); + } else { + inputStream = inputStream(new BufferedInputStream(Files.newInputStream(path)), gzip); + + if (tmpPath != null) { + try { + copy(path, bufOut, gzip); + } catch (ZipException ex) { + // restart output buffer + bufOut.close(); + // clear output file + outFile = Files.newOutputStream(tmpPath); + bufOut = new GZIPOutputStream(new BufferedOutputStream(outFile)); + } + } + } + + this.writer = new OutputStreamWriter(bufOut); + + try (Reader reader = new InputStreamReader(inputStream)) { + this.recordConverter = converterFactory.converterFor(writer, record, fileIsNew, reader); + } catch (IOException ex) { + try { + writer.close(); + } catch (IOException exClose) { + logger.error("Failed to close writer for {}", path, ex); + } + throw ex; + } + } + + /** + * Write a record to the cache. + * @param record AVRO record + * @return true or false based on {@link RecordConverter} write result + * @throws IOException if the record cannot be used. 
+ */ + public boolean writeRecord(GenericRecord record) throws IOException { + boolean result = this.recordConverter.writeRecord(record); + lastUse = System.nanoTime(); + return result; + } + + @Override + public void close() throws IOException { + recordConverter.close(); + writer.close(); + if (tmpPath != null) { + Files.move(tmpPath, path, REPLACE_EXISTING); + } + } + + @Override + public void flush() throws IOException { + recordConverter.flush(); + } + + /** + * Compares time that the filecaches were last used. If equal, it lexicographically compares + * the absolute path of the path. + * @param other FileCache to compare with. + */ + @Override + public int compareTo(@Nonnull FileCache other) { + int result = Long.compare(lastUse, other.lastUse); + if (result != 0) { + return result; + } + return path.compareTo(other.path); + } + + /** File that the cache is maintaining. */ + public Path getPath() { + return path; + } + + private static void copy(Path source, OutputStream sink, boolean gzip) throws IOException { + try (InputStream copyStream = inputStream(Files.newInputStream(source), gzip)) { + copy(copyStream, sink); + } catch (ZipException ex) { + Path corruptPath = null; + String suffix = ""; + for (int i = 0; corruptPath == null && i < 100; i++) { + Path path = source.resolveSibling(source.getFileName() + ".corrupted" + suffix); + if (!Files.exists(path)) { + corruptPath = path; + } + suffix = "-" + i; + } + if (corruptPath != null) { + logger.error("Original file {} was corrupted: {}." + + " Moved to {}.", source, ex, corruptPath); + Files.move(source, corruptPath); + } else { + logger.error("Original file {} was corrupted: {}." + + " Too many corrupt backups stored, removing file.", source, ex); + } + throw ex; + } + } + + private static InputStream inputStream(InputStream in, boolean gzip) throws IOException { + return gzip ? new GZIPInputStream(in) : in; + } + + /** + * Reads all bytes from an input stream and writes them to an output stream. 
+ */ + private static void copy(InputStream source, OutputStream sink) throws IOException { + byte[] buf = new byte[BUFFER_SIZE]; + int n; + while ((n = source.read(buf)) > 0) { + sink.write(buf, 0, n); + } + } +} diff --git a/src/main/java/org/radarcns/util/FileCacheStore.java b/src/main/java/org/radarcns/data/FileCacheStore.java similarity index 64% rename from src/main/java/org/radarcns/util/FileCacheStore.java rename to src/main/java/org/radarcns/data/FileCacheStore.java index f64ac1b8..11eb96de 100644 --- a/src/main/java/org/radarcns/util/FileCacheStore.java +++ b/src/main/java/org/radarcns/data/FileCacheStore.java @@ -14,9 +14,11 @@ * limitations under the License. */ -package org.radarcns.util; +package org.radarcns.data; import org.apache.avro.generic.GenericRecord; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; import java.io.Closeable; import java.io.Flushable; @@ -25,34 +27,34 @@ import java.nio.file.Path; import java.util.ArrayList; import java.util.Collections; +import java.util.Comparator; import java.util.HashMap; import java.util.Map; +import static org.radarcns.util.ThrowingConsumer.tryCatch; + /** * Caches open file handles. If more than the limit is cached, the half of the files that were used * the longest ago cache are evicted from cache. 
*/ public class FileCacheStore implements Flushable, Closeable { + private static final Logger logger = LoggerFactory.getLogger(FileCacheStore.class); + private final boolean gzip; private final boolean deduplicate; + private final Path tmpDir; private RecordConverterFactory converterFactory; private final int maxFiles; private final Map caches; - // Response codes for each write record case - public static final int CACHE_AND_WRITE = 1; //used cache and write successful - public static final int NO_CACHE_AND_WRITE= 2; - public static final int CACHE_AND_NO_WRITE =3; - public static final int NO_CACHE_AND_NO_WRITE =4; - - - public FileCacheStore(RecordConverterFactory converterFactory, int maxFiles, boolean gzip, boolean deduplicate) { + public FileCacheStore(RecordConverterFactory converterFactory, int maxFiles, boolean gzip, boolean deduplicate, boolean stage) throws IOException { this.converterFactory = converterFactory; this.maxFiles = maxFiles; this.caches = new HashMap<>(maxFiles * 4 / 3 + 1); this.gzip = gzip; this.deduplicate = deduplicate; + this.tmpDir = stage ? Files.createTempDirectory("restructurehdfs") : null; } /** @@ -64,15 +66,15 @@ public FileCacheStore(RecordConverterFactory converterFactory, int maxFiles, boo * @return Integer value according to one of the response codes. * @throws IOException when failing to open a file or writing to it. 
*/ - public int writeRecord(Path path, GenericRecord record) throws IOException { + public WriteResponse writeRecord(Path path, GenericRecord record) throws IOException { FileCache cache = caches.get(path); if (cache != null) { if(cache.writeRecord(record)){ - return CACHE_AND_WRITE; + return WriteResponse.CACHE_AND_WRITE; } else { // This is the case when cache is used but write is unsuccessful // because of different number columns in same topic - return CACHE_AND_NO_WRITE; + return WriteResponse.CACHE_AND_NO_WRITE; } } else { ensureCapacity(); @@ -80,14 +82,14 @@ public int writeRecord(Path path, GenericRecord record) throws IOException { Path dir = path.getParent(); Files.createDirectories(dir); - cache = new FileCache(converterFactory, path, record, gzip); + cache = new FileCache(converterFactory, path, record, gzip, tmpDir); caches.put(path, cache); - if(cache.writeRecord(record)) { - return NO_CACHE_AND_WRITE; + if (cache.writeRecord(record)) { + return WriteResponse.NO_CACHE_AND_WRITE; } else { // The file path was not in cache but the file exists and this write is // unsuccessful because of different number of columns - return NO_CACHE_AND_NO_WRITE; + return WriteResponse.NO_CACHE_AND_NO_WRITE; } } @@ -127,9 +129,47 @@ public void close() throws IOException { converterFactory.sortUnique(cache.getPath()); } } + if (tmpDir != null) { + Files.walk(tmpDir) + .sorted(Comparator.reverseOrder()) + .forEach(tryCatch(Files::delete, (p, ex) -> logger.warn( + "Failed to remove temporary file {}: {}", p, ex))); + } } finally { caches.clear(); } } + // Response codes for each write record case + public enum WriteResponse { + /** Cache hit and write was successful. */ + CACHE_AND_WRITE(true, true), + /** Cache hit and write was unsuccessful because of a mismatch in number of columns. */ + CACHE_AND_NO_WRITE(true, false), + /** Cache miss and write was successful. 
*/ + NO_CACHE_AND_WRITE(false, true), + /** Cache miss and write was unsuccessful because of a mismatch in number of columns. */ + NO_CACHE_AND_NO_WRITE(false, false); + + private final boolean successful; + private final boolean cacheHit; + + /** + * Write status. + * @param cacheHit whether the cache was used to write. + * @param successful whether the write was successful. + */ + WriteResponse(boolean cacheHit, boolean successful) { + this.cacheHit = cacheHit; + this.successful = successful; + } + + public boolean isSuccessful() { + return successful; + } + + public boolean isCacheHit() { + return cacheHit; + } + } } diff --git a/src/main/java/org/radarcns/util/JsonAvroConverter.java b/src/main/java/org/radarcns/data/JsonAvroConverter.java similarity index 99% rename from src/main/java/org/radarcns/util/JsonAvroConverter.java rename to src/main/java/org/radarcns/data/JsonAvroConverter.java index b84ad570..05a03252 100644 --- a/src/main/java/org/radarcns/util/JsonAvroConverter.java +++ b/src/main/java/org/radarcns/data/JsonAvroConverter.java @@ -14,7 +14,7 @@ * limitations under the License. */ -package org.radarcns.util; +package org.radarcns.data; import com.fasterxml.jackson.core.JsonFactory; import com.fasterxml.jackson.core.JsonGenerator; diff --git a/src/main/java/org/radarcns/util/RecordConverter.java b/src/main/java/org/radarcns/data/RecordConverter.java similarity index 97% rename from src/main/java/org/radarcns/util/RecordConverter.java rename to src/main/java/org/radarcns/data/RecordConverter.java index 810dc7ea..b1518cf7 100644 --- a/src/main/java/org/radarcns/util/RecordConverter.java +++ b/src/main/java/org/radarcns/data/RecordConverter.java @@ -14,13 +14,14 @@ * limitations under the License. 
*/ -package org.radarcns.util; +package org.radarcns.data; + +import org.apache.avro.generic.GenericRecord; import java.io.Closeable; import java.io.Flushable; import java.io.IOException; import java.util.Map; -import org.apache.avro.generic.GenericRecord; /** Converts a GenericRecord to Java primitives or writes it to file. */ public interface RecordConverter extends Flushable, Closeable { diff --git a/src/main/java/org/radarcns/util/RecordConverterFactory.java b/src/main/java/org/radarcns/data/RecordConverterFactory.java similarity index 99% rename from src/main/java/org/radarcns/util/RecordConverterFactory.java rename to src/main/java/org/radarcns/data/RecordConverterFactory.java index 993a97c5..2b5a8a2f 100644 --- a/src/main/java/org/radarcns/util/RecordConverterFactory.java +++ b/src/main/java/org/radarcns/data/RecordConverterFactory.java @@ -14,7 +14,7 @@ * limitations under the License. */ -package org.radarcns.util; +package org.radarcns.data; import org.apache.avro.generic.GenericRecord; diff --git a/src/main/java/org/radarcns/util/FileCache.java b/src/main/java/org/radarcns/util/FileCache.java deleted file mode 100644 index 0122f8cf..00000000 --- a/src/main/java/org/radarcns/util/FileCache.java +++ /dev/null @@ -1,118 +0,0 @@ -/* - * Copyright 2017 The Hyve - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ - -package org.radarcns.util; - -import java.io.*; -import java.nio.file.Files; -import java.nio.file.Path; -import java.nio.file.StandardOpenOption; -import java.util.zip.GZIPInputStream; -import java.util.zip.GZIPOutputStream; -import javax.annotation.Nonnull; -import org.apache.avro.generic.GenericRecord; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -/** Keeps path handles of a path. */ -public class FileCache implements Closeable, Flushable, Comparable { - private static final Logger logger = LoggerFactory.getLogger(FileCache.class); - - private final Writer writer; - private final RecordConverter recordConverter; - private final Path path; - private long lastUse; - - /** - * File cache of given path, using given converter factory. - * @param converterFactory converter factory to create a converter to write files with. - * @param path path to cache. - * @param record example record to create converter from, this is not written to path. - * @param gzip whether to gzip the records - * @throws IOException - */ - public FileCache(RecordConverterFactory converterFactory, Path path, - GenericRecord record, boolean gzip) throws IOException { - this.path = path; - boolean fileIsNew = !Files.exists(path) || Files.size(path) == 0; - - OutputStream outFile = Files.newOutputStream(path, - StandardOpenOption.APPEND, StandardOpenOption.CREATE); - InputStream inputStream = new BufferedInputStream(Files.newInputStream(path)); - OutputStream bufOut = new BufferedOutputStream(outFile); - if (gzip) { - bufOut = new GZIPOutputStream(bufOut); - if (!fileIsNew) { - inputStream = new GZIPInputStream(inputStream); - } - } - - this.writer = new OutputStreamWriter(bufOut); - - try (Reader reader = new InputStreamReader(inputStream)) { - this.recordConverter = converterFactory.converterFor(writer, record, fileIsNew, reader); - } catch (IOException ex) { - try { - writer.close(); - } catch (IOException exClose) { - logger.error("Failed to close writer for {}", path, 
ex); - } - throw ex; - } - } - - /** - * Write a record to the cache. - * @param record AVRO record - * @return true or false based on {@link RecordConverter} write result - * @throws IOException - */ - public boolean writeRecord(GenericRecord record) throws IOException { - boolean result = this.recordConverter.writeRecord(record); - lastUse = System.nanoTime(); - return result; - } - - @Override - public void close() throws IOException { - recordConverter.close(); - writer.close(); - } - - @Override - public void flush() throws IOException { - recordConverter.flush(); - } - - /** - * Compares time that the filecaches were last used. If equal, it lexicographically compares - * the absolute path of the path. - * @param other FileCache to compare with. - */ - @Override - public int compareTo(@Nonnull FileCache other) { - int result = Long.compare(lastUse, other.lastUse); - if (result != 0) { - return result; - } - return path.compareTo(other.path); - } - - /** File that the cache is maintaining. 
*/ - public Path getPath() { - return path; - } -} diff --git a/src/main/java/org/radarcns/util/ThrowingConsumer.java b/src/main/java/org/radarcns/util/ThrowingConsumer.java new file mode 100644 index 00000000..7670f86b --- /dev/null +++ b/src/main/java/org/radarcns/util/ThrowingConsumer.java @@ -0,0 +1,20 @@ +package org.radarcns.util; + +import java.io.IOException; +import java.util.function.BiConsumer; +import java.util.function.Consumer; + +@FunctionalInterface +public interface ThrowingConsumer { + void accept(T t) throws IOException; + + static Consumer tryCatch(ThrowingConsumer consumer, BiConsumer catchClause) { + return t -> { + try { + consumer.accept(t); + } catch (IOException ex) { + catchClause.accept(t, ex); + } + }; + } +} diff --git a/src/main/java/org/radarcns/util/commandline/CommandLineArgs.java b/src/main/java/org/radarcns/util/commandline/CommandLineArgs.java index 63a89a5e..effc68bf 100644 --- a/src/main/java/org/radarcns/util/commandline/CommandLineArgs.java +++ b/src/main/java/org/radarcns/util/commandline/CommandLineArgs.java @@ -28,4 +28,7 @@ public class CommandLineArgs { @Parameter(names = { "-h", "--help"}, help = true, description = "Display the usage of the program with available options.") public boolean help; + + @Parameter(names = { "--no-stage"}, description = "Do not stage output files into a temporary directory before moving them to the data directory. 
This increases performance but may leave corrupted data files.") + public boolean noStage = false; } \ No newline at end of file diff --git a/src/main/java/org/radarcns/util/commandline/HdfsUriValidator.java b/src/main/java/org/radarcns/util/commandline/HdfsUriValidator.java index 3a625a5a..6680b42d 100644 --- a/src/main/java/org/radarcns/util/commandline/HdfsUriValidator.java +++ b/src/main/java/org/radarcns/util/commandline/HdfsUriValidator.java @@ -1,8 +1,8 @@ package org.radarcns.util.commandline; -import com.beust.jcommander.ParameterException; import com.beust.jcommander.IParameterValidator; +import com.beust.jcommander.ParameterException; public class HdfsUriValidator implements IParameterValidator{ @Override diff --git a/src/main/java/org/radarcns/util/commandline/PathValidator.java b/src/main/java/org/radarcns/util/commandline/PathValidator.java index d3d3935d..ae25ea67 100644 --- a/src/main/java/org/radarcns/util/commandline/PathValidator.java +++ b/src/main/java/org/radarcns/util/commandline/PathValidator.java @@ -1,7 +1,7 @@ package org.radarcns.util.commandline; -import com.beust.jcommander.ParameterException; import com.beust.jcommander.IParameterValidator; +import com.beust.jcommander.ParameterException; public class PathValidator implements IParameterValidator{ @Override diff --git a/src/test/java/org/radarcns/OffsetRangeSetTest.java b/src/test/java/org/radarcns/OffsetRangeSetTest.java index 82488afe..4bf29416 100644 --- a/src/test/java/org/radarcns/OffsetRangeSetTest.java +++ b/src/test/java/org/radarcns/OffsetRangeSetTest.java @@ -2,7 +2,9 @@ import org.junit.Test; -import static org.junit.Assert.*; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertTrue; public class OffsetRangeSetTest { @Test diff --git a/src/test/java/org/radarcns/RestructureAvroRecordsTest.java b/src/test/java/org/radarcns/RestructureAvroRecordsTest.java index 3c51d660..b48d7f1a 100644 --- 
a/src/test/java/org/radarcns/RestructureAvroRecordsTest.java +++ b/src/test/java/org/radarcns/RestructureAvroRecordsTest.java @@ -16,19 +16,16 @@ package org.radarcns; -import static org.junit.Assert.*; - -import java.text.SimpleDateFormat; -import java.util.Calendar; -import java.util.Date; -import java.util.TimeZone; import org.apache.avro.Schema; -import org.apache.avro.Schema.Field; import org.apache.avro.SchemaBuilder; import org.apache.avro.generic.GenericRecord; import org.apache.avro.generic.GenericRecordBuilder; import org.junit.Test; +import java.util.Date; + +import static org.junit.Assert.assertEquals; + public class RestructureAvroRecordsTest { @Test public void createHourTimestamp() throws Exception { diff --git a/src/test/java/org/radarcns/util/CsvAvroConverterTest.java b/src/test/java/org/radarcns/data/CsvAvroConverterTest.java similarity index 96% rename from src/test/java/org/radarcns/util/CsvAvroConverterTest.java rename to src/test/java/org/radarcns/data/CsvAvroConverterTest.java index 0ac8ce43..992c8338 100644 --- a/src/test/java/org/radarcns/util/CsvAvroConverterTest.java +++ b/src/test/java/org/radarcns/data/CsvAvroConverterTest.java @@ -14,27 +14,7 @@ * limitations under the License. 
*/ -package org.radarcns.util; - -import static org.junit.Assert.assertArrayEquals; -import static org.junit.Assert.assertEquals; -import static org.junit.Assert.assertFalse; -import static org.junit.Assert.assertNull; -import static org.junit.Assert.assertTrue; - -import com.fasterxml.jackson.databind.JsonMappingException; - -import java.io.*; -import java.nio.file.Files; -import java.nio.file.Path; -import java.util.Arrays; -import java.util.Iterator; -import java.util.LinkedHashSet; -import java.util.List; -import java.util.Map; -import java.util.Set; -import java.util.zip.GZIPInputStream; -import java.util.zip.GZIPOutputStream; +package org.radarcns.data; import org.apache.avro.Schema; import org.apache.avro.Schema.Parser; @@ -50,6 +30,34 @@ import org.junit.rules.ExpectedException; import org.junit.rules.TemporaryFolder; +import java.io.BufferedReader; +import java.io.BufferedWriter; +import java.io.IOException; +import java.io.InputStream; +import java.io.InputStreamReader; +import java.io.OutputStream; +import java.io.OutputStreamWriter; +import java.io.Reader; +import java.io.StringReader; +import java.io.StringWriter; +import java.io.Writer; +import java.nio.file.Files; +import java.nio.file.Path; +import java.util.Arrays; +import java.util.Iterator; +import java.util.LinkedHashSet; +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.zip.GZIPInputStream; +import java.util.zip.GZIPOutputStream; + +import static org.junit.Assert.assertArrayEquals; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertNull; +import static org.junit.Assert.assertTrue; + public class CsvAvroConverterTest { @Rule public ExpectedException exception = ExpectedException.none(); diff --git a/src/test/java/org/radarcns/util/FileCacheStoreTest.java b/src/test/java/org/radarcns/data/FileCacheStoreTest.java similarity index 82% rename from 
src/test/java/org/radarcns/util/FileCacheStoreTest.java rename to src/test/java/org/radarcns/data/FileCacheStoreTest.java index a2447f1b..5aeb2c6c 100644 --- a/src/test/java/org/radarcns/util/FileCacheStoreTest.java +++ b/src/test/java/org/radarcns/data/FileCacheStoreTest.java @@ -14,13 +14,7 @@ * limitations under the License. */ -package org.radarcns.util; - -import static org.junit.Assert.assertEquals; - -import java.io.IOException; -import java.nio.file.Files; -import java.nio.file.Path; +package org.radarcns.data; import org.apache.avro.Schema; import org.apache.avro.SchemaBuilder; @@ -29,8 +23,28 @@ import org.junit.Rule; import org.junit.Test; import org.junit.rules.TemporaryFolder; +import org.junit.runner.RunWith; +import org.junit.runners.Parameterized; + +import java.io.IOException; +import java.nio.file.Files; +import java.nio.file.Path; +import java.util.Arrays; +import java.util.Collection; + +import static org.junit.Assert.assertEquals; +@RunWith(Parameterized.class) public class FileCacheStoreTest { + + @Parameterized.Parameters + public static Collection doStage() { + return Arrays.asList(Boolean.TRUE, Boolean.FALSE); + } + + @Parameterized.Parameter + public Boolean doStage; + @Rule public TemporaryFolder folder = new TemporaryFolder(); @@ -58,32 +72,32 @@ public void appendLine() throws IOException { GenericRecord record; - try (FileCacheStore cache = new FileCacheStore(csvFactory, 2, false, false)) { + try (FileCacheStore cache = new FileCacheStore(csvFactory, 2, false, false, doStage)) { record = new GenericRecordBuilder(simpleSchema).set("a", "something").build(); - assertEquals(cache.writeRecord(f1, record), FileCacheStore.NO_CACHE_AND_WRITE); + assertEquals(cache.writeRecord(f1, record), FileCacheStore.WriteResponse.NO_CACHE_AND_WRITE); record = new GenericRecordBuilder(simpleSchema).set("a", "somethingElse").build(); - assertEquals(cache.writeRecord(f1, record), FileCacheStore.CACHE_AND_WRITE); + assertEquals(cache.writeRecord(f1, record), 
FileCacheStore.WriteResponse.CACHE_AND_WRITE); record = new GenericRecordBuilder(simpleSchema).set("a", "something").build(); - assertEquals(cache.writeRecord(f2, record), FileCacheStore.NO_CACHE_AND_WRITE); + assertEquals(cache.writeRecord(f2, record), FileCacheStore.WriteResponse.NO_CACHE_AND_WRITE); record = new GenericRecordBuilder(simpleSchema).set("a", "third").build(); - assertEquals(cache.writeRecord(f1, record), FileCacheStore.CACHE_AND_WRITE); + assertEquals(cache.writeRecord(f1, record), FileCacheStore.WriteResponse.CACHE_AND_WRITE); record = new GenericRecordBuilder(simpleSchema).set("a", "f3").build(); - assertEquals(cache.writeRecord(f3, record), FileCacheStore.NO_CACHE_AND_WRITE); + assertEquals(cache.writeRecord(f3, record), FileCacheStore.WriteResponse.NO_CACHE_AND_WRITE); record = new GenericRecordBuilder(simpleSchema).set("a", "f2").build(); - assertEquals(cache.writeRecord(f2, record), FileCacheStore.NO_CACHE_AND_WRITE); + assertEquals(cache.writeRecord(f2, record), FileCacheStore.WriteResponse.NO_CACHE_AND_WRITE); record = new GenericRecordBuilder(simpleSchema).set("a", "f3").build(); - assertEquals(cache.writeRecord(f3, record), FileCacheStore.CACHE_AND_WRITE); + assertEquals(cache.writeRecord(f3, record), FileCacheStore.WriteResponse.CACHE_AND_WRITE); record = new GenericRecordBuilder(simpleSchema).set("a", "f4").build(); - assertEquals(cache.writeRecord(f4, record), FileCacheStore.NO_CACHE_AND_WRITE); + assertEquals(cache.writeRecord(f4, record), FileCacheStore.WriteResponse.NO_CACHE_AND_WRITE); record = new GenericRecordBuilder(simpleSchema).set("a", "f3").build(); - assertEquals(cache.writeRecord(f3, record), FileCacheStore.CACHE_AND_WRITE); + assertEquals(cache.writeRecord(f3, record), FileCacheStore.WriteResponse.CACHE_AND_WRITE); record = new GenericRecordBuilder(conflictSchema).set("a", "f3"). 
set("b", "conflict").build(); - assertEquals(cache.writeRecord(f3, record), FileCacheStore.CACHE_AND_NO_WRITE); + assertEquals(cache.writeRecord(f3, record), FileCacheStore.WriteResponse.CACHE_AND_NO_WRITE); record = new GenericRecordBuilder(conflictSchema).set("a", "f1"). set("b", "conflict").build(); // Cannot write to file even though the file is not in cache since schema is different - assertEquals(cache.writeRecord(f1, record), FileCacheStore.NO_CACHE_AND_NO_WRITE); + assertEquals(cache.writeRecord(f1, record), FileCacheStore.WriteResponse.NO_CACHE_AND_NO_WRITE); // Can write the same record to a new file - assertEquals(cache.writeRecord(newFile, record), FileCacheStore.NO_CACHE_AND_WRITE); + assertEquals(cache.writeRecord(newFile, record), FileCacheStore.WriteResponse.NO_CACHE_AND_WRITE); } assertEquals("a\nsomething\nsomethingElse\nthird\n", new String(Files.readAllBytes(f1))); diff --git a/src/test/java/org/radarcns/util/FileCacheTest.java b/src/test/java/org/radarcns/data/FileCacheTest.java similarity index 86% rename from src/test/java/org/radarcns/util/FileCacheTest.java rename to src/test/java/org/radarcns/data/FileCacheTest.java index 9481d72c..9779284c 100644 --- a/src/test/java/org/radarcns/util/FileCacheTest.java +++ b/src/test/java/org/radarcns/data/FileCacheTest.java @@ -14,7 +14,7 @@ * limitations under the License. 
*/ -package org.radarcns.util; +package org.radarcns.data; import org.apache.avro.Schema; import org.apache.avro.SchemaBuilder; @@ -24,6 +24,8 @@ import org.junit.Rule; import org.junit.Test; import org.junit.rules.TemporaryFolder; +import org.junit.runner.RunWith; +import org.junit.runners.Parameterized; import java.io.BufferedReader; import java.io.IOException; @@ -32,6 +34,8 @@ import java.io.Reader; import java.nio.file.Files; import java.nio.file.Path; +import java.util.Arrays; +import java.util.Collection; import java.util.zip.GZIPInputStream; import static org.junit.Assert.assertEquals; @@ -40,16 +44,28 @@ /** * Created by joris on 03/07/2017. */ +@RunWith(Parameterized.class) public class FileCacheTest { + @Parameterized.Parameters + public static Collection useTmpDir() { + return Arrays.asList(Boolean.TRUE, Boolean.FALSE); + } + @Rule public TemporaryFolder folder = new TemporaryFolder(); private Path path; private RecordConverterFactory csvFactory; private Record exampleRecord; + private Path tmpDir; + + @Parameterized.Parameter + public Boolean useTmpDir; @Before public void setUp() throws IOException { this.path = folder.newFile("f").toPath(); + this.tmpDir = useTmpDir ? 
folder.newFolder().toPath() : null; + this.csvFactory = CsvAvroConverter.getFactory(); Schema schema = SchemaBuilder.record("simple").fields() .name("a").type("string").noDefault() @@ -59,7 +75,7 @@ public void setUp() throws IOException { @Test public void testGzip() throws IOException { - try (FileCache cache = new FileCache(csvFactory, path, exampleRecord, true)) { + try (FileCache cache = new FileCache(csvFactory, path, exampleRecord, true, tmpDir)) { cache.writeRecord(exampleRecord); } @@ -77,11 +93,11 @@ public void testGzip() throws IOException { @Test public void testGzipAppend() throws IOException { - try (FileCache cache = new FileCache(csvFactory, path, exampleRecord, true)) { + try (FileCache cache = new FileCache(csvFactory, path, exampleRecord, true, tmpDir)) { cache.writeRecord(exampleRecord); } - try (FileCache cache = new FileCache(csvFactory, path, exampleRecord, true)) { + try (FileCache cache = new FileCache(csvFactory, path, exampleRecord, true, tmpDir)) { cache.writeRecord(exampleRecord); } @@ -101,7 +117,7 @@ public void testGzipAppend() throws IOException { @Test public void testPlain() throws IOException { - try (FileCache cache = new FileCache(csvFactory, path, exampleRecord, false)) { + try (FileCache cache = new FileCache(csvFactory, path, exampleRecord, false, tmpDir)) { cache.writeRecord(exampleRecord); } @@ -116,11 +132,12 @@ public void testPlain() throws IOException { @Test public void testPlainAppend() throws IOException { - try (FileCache cache = new FileCache(csvFactory, path, exampleRecord, false)) { + + try (FileCache cache = new FileCache(csvFactory, path, exampleRecord, false, tmpDir)) { cache.writeRecord(exampleRecord); } - try (FileCache cache = new FileCache(csvFactory, path, exampleRecord, false)) { + try (FileCache cache = new FileCache(csvFactory, path, exampleRecord, false, tmpDir)) { cache.writeRecord(exampleRecord); } @@ -137,10 +154,11 @@ public void testPlainAppend() throws IOException { @Test public void 
compareTo() throws IOException { Path file3 = folder.newFile("g").toPath(); + Path tmpDir = folder.newFolder().toPath(); - try (FileCache cache1 = new FileCache(csvFactory, path, exampleRecord, false); - FileCache cache2 = new FileCache(csvFactory, path, exampleRecord, false); - FileCache cache3 = new FileCache(csvFactory, file3, exampleRecord, false)) { + try (FileCache cache1 = new FileCache(csvFactory, path, exampleRecord, false, tmpDir); + FileCache cache2 = new FileCache(csvFactory, path, exampleRecord, false, tmpDir); + FileCache cache3 = new FileCache(csvFactory, file3, exampleRecord, false, tmpDir)) { assertEquals(0, cache1.compareTo(cache2)); // filenames are not equal assertEquals(-1, cache1.compareTo(cache3)); diff --git a/src/test/java/org/radarcns/util/JsonAvroConverterTest.java b/src/test/java/org/radarcns/data/JsonAvroConverterTest.java similarity index 92% rename from src/test/java/org/radarcns/util/JsonAvroConverterTest.java rename to src/test/java/org/radarcns/data/JsonAvroConverterTest.java index 9b5d4eb8..524ce380 100644 --- a/src/test/java/org/radarcns/util/JsonAvroConverterTest.java +++ b/src/test/java/org/radarcns/data/JsonAvroConverterTest.java @@ -14,22 +14,11 @@ * limitations under the License. 
*/ -package org.radarcns.util; - -import static org.junit.Assert.assertEquals; -import static org.radarcns.util.CsvAvroConverterTest.writeTestNumbers; +package org.radarcns.data; import com.fasterxml.jackson.databind.ObjectMapper; import com.fasterxml.jackson.databind.ObjectWriter; import com.fasterxml.jackson.databind.SerializationFeature; - -import java.io.*; -import java.nio.file.Files; -import java.nio.file.Path; -import java.util.Arrays; -import java.util.List; -import java.util.Map; -import java.util.stream.Collectors; import org.apache.avro.Schema; import org.apache.avro.Schema.Parser; import org.apache.avro.generic.GenericDatumReader; @@ -40,6 +29,22 @@ import org.junit.Test; import org.junit.rules.TemporaryFolder; +import java.io.BufferedReader; +import java.io.BufferedWriter; +import java.io.IOException; +import java.io.InputStreamReader; +import java.io.StringReader; +import java.io.StringWriter; +import java.nio.file.Files; +import java.nio.file.Path; +import java.util.Arrays; +import java.util.List; +import java.util.Map; +import java.util.stream.Collectors; + +import static org.junit.Assert.assertEquals; +import static org.radarcns.data.CsvAvroConverterTest.writeTestNumbers; + public class JsonAvroConverterTest { @Rule public TemporaryFolder folder = new TemporaryFolder(); diff --git a/src/test/resources/org/radarcns/util/full.avsc b/src/test/resources/org/radarcns/data/full.avsc similarity index 100% rename from src/test/resources/org/radarcns/util/full.avsc rename to src/test/resources/org/radarcns/data/full.avsc diff --git a/src/test/resources/org/radarcns/util/full.json b/src/test/resources/org/radarcns/data/full.json similarity index 100% rename from src/test/resources/org/radarcns/util/full.json rename to src/test/resources/org/radarcns/data/full.json