4 changes: 3 additions & 1 deletion README.md
@@ -41,4 +41,6 @@ Another option is to output the data in compressed form. All files will get the
java -jar restructurehdfs-0.3.3-all.jar --compression gzip --hdfs-uri <webhdfs_url> --output-directory <output_folder> <input_path_1> [<input_path_2> ...]
```

Finally, by default, records are not deduplicated after they are written. To enable this behaviour, specify the option `--deduplicate` or `-d`. This is set to false by default because of an issue with Biovotion data. Please see [issue #16](https://github.com/RADAR-base/Restructure-HDFS-topic/issues/16) before enabling it.
By default, records are not deduplicated after they are written. To enable this behaviour, specify the option `--deduplicate` or `-d`. This is set to false by default because of an issue with Biovotion data. Please see [issue #16](https://github.com/RADAR-base/Restructure-HDFS-topic/issues/16) before enabling it.
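For example, deduplication can be enabled with the same invocation as shown earlier in this README (the jar name and placeholders follow that example):

```
java -jar restructurehdfs-0.3.3-all.jar --deduplicate --hdfs-uri <webhdfs_url> --output-directory <output_folder> <input_path_1> [<input_path_2> ...]
```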

Finally, while processing, files are staged to a temporary directory and moved to the output directory afterwards. This reduces the chance of data corruption but may result in slower performance. Disable staging with the `--no-stage` option.
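Staging can be disabled in the same way, for example:

```
java -jar restructurehdfs-0.3.3-all.jar --no-stage --hdfs-uri <webhdfs_url> --output-directory <output_folder> <input_path_1> [<input_path_2> ...]
```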
2 changes: 1 addition & 1 deletion build.gradle
@@ -2,7 +2,7 @@ apply plugin: 'java'
apply plugin: 'application'

group 'org.radarcns.restructurehdfs'
version '0.3.3-SNAPSHOT'
version '0.4.0-SNAPSHOT'
mainClassName = 'org.radarcns.RestructureAvroRecords'

run {
52 changes: 33 additions & 19 deletions src/main/java/org/radarcns/RestructureAvroRecords.java
@@ -28,11 +28,11 @@
import org.apache.hadoop.fs.LocatedFileStatus;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.RemoteIterator;
import org.radarcns.util.CsvAvroConverter;
import org.radarcns.util.FileCacheStore;
import org.radarcns.util.JsonAvroConverter;
import org.radarcns.data.CsvAvroConverter;
import org.radarcns.data.FileCacheStore;
import org.radarcns.data.JsonAvroConverter;
import org.radarcns.data.RecordConverterFactory;
import org.radarcns.util.ProgressBar;
import org.radarcns.util.RecordConverterFactory;
import org.radarcns.util.commandline.CommandLineArgs;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -48,6 +48,7 @@
import java.util.List;
import java.util.Map;
import java.util.TimeZone;
import java.util.regex.Pattern;

public class RestructureAvroRecords {
private static final Logger logger = LoggerFactory.getLogger(RestructureAvroRecords.class);
Expand All @@ -57,12 +58,14 @@ public class RestructureAvroRecords {
private static final java.nio.file.Path BINS_FILE_NAME = Paths.get("bins.csv");
private static final java.nio.file.Path SCHEMA_OUTPUT_FILE_NAME = Paths.get("schema.json");
private static final SimpleDateFormat FILE_DATE_FORMAT = new SimpleDateFormat("yyyyMMdd_HH");
private static final Pattern ILLEGAL_CHARACTER_PATTERN = Pattern.compile("[^a-zA-Z0-9_-]+");

static {
FILE_DATE_FORMAT.setTimeZone(TimeZone.getTimeZone("UTC"));
}

private final RecordConverterFactory converterFactory;
private final boolean doStage;

private java.nio.file.Path outputPath;
private java.nio.file.Path offsetsPath;
@@ -97,6 +100,7 @@ public static void main(String [] args) {
commandLineArgs.outputDirectory)
.useGzip("gzip".equalsIgnoreCase(commandLineArgs.compression))
.doDeduplicate(commandLineArgs.deduplicate).format(commandLineArgs.format)
.doStage(!commandLineArgs.noStage)
.build();

try {
@@ -119,6 +123,7 @@ private RestructureAvroRecords(RestructureAvroRecords.Builder builder) {

this.useGzip = builder.useGzip;
this.doDeduplicate = builder.doDeduplicate;
this.doStage = builder.doStage;
logger.info("Deduplicate set to {}", doDeduplicate);

String extension;
@@ -197,7 +202,7 @@ public void start(String directoryName) throws IOException {

// Actually process the files
for (Map.Entry<String, List<Path>> entry : topicPaths.entrySet()) {
try (FileCacheStore cache = new FileCacheStore(converterFactory, 100, useGzip, doDeduplicate)) {
try (FileCacheStore cache = new FileCacheStore(converterFactory, 100, useGzip, doDeduplicate, doStage)) {
for (Path filePath : entry.getValue()) {
// If JsonMappingException occurs, log the error and continue with other files
try {
@@ -284,27 +289,18 @@ private void writeRecord(GenericRecord record, String topicName, FileCacheStore
Date time = getDate(keyField, valueField);
java.nio.file.Path outputFileName = createFilename(time, suffix);

String projectId;

if(keyField.get("projectId") == null) {
projectId = "unknown-project";
} else {
// Clean Project id for use in final pathname
projectId = keyField.get("projectId").toString().replaceAll("[^a-zA-Z0-9_-]+", "");
}

// Clean user id and create final output pathname
String userId = keyField.get("userId").toString().replaceAll("[^a-zA-Z0-9_-]+", "");
String projectId = sanitizeId(keyField.get("projectId"), "unknown-project");
String userId = sanitizeId(keyField.get("userId"), "unknown-user");

java.nio.file.Path projectDir = this.outputPath.resolve(projectId);
java.nio.file.Path userDir = projectDir.resolve(userId);
java.nio.file.Path userTopicDir = userDir.resolve(topicName);
java.nio.file.Path outputPath = userTopicDir.resolve(outputFileName);

// Write data
int response = cache.writeRecord(outputPath, record);
FileCacheStore.WriteResponse response = cache.writeRecord(outputPath, record);

if (response == FileCacheStore.CACHE_AND_NO_WRITE || response == FileCacheStore.NO_CACHE_AND_NO_WRITE) {
if (!response.isSuccessful()) {
// Write was unsuccessful due to different number of columns,
// try again with new file name
writeRecord(record, topicName, cache, ++suffix);
@@ -317,8 +313,9 @@ }
}
}

String sourceId = sanitizeId(keyField.get("sourceId"), "unknown-source");
// Count data (binned and total)
bins.add(topicName, keyField.get("sourceId").toString(), time);
bins.add(topicName, sourceId, time);
processedRecordsCount++;
}
}
@@ -366,12 +363,25 @@ public static Date getDate(GenericRecord keyField, GenericRecord valueField) {
return new Date(time);
}

private static String sanitizeId(Object id, String defaultValue) {
if (id == null) {
return defaultValue;
}
String idString = ILLEGAL_CHARACTER_PATTERN.matcher(id.toString()).replaceAll("");
if (idString.isEmpty()) {
return defaultValue;
} else {
return idString;
}
}
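To make the behaviour of the new `sanitizeId` helper concrete, the following comments illustrate it on a few hypothetical inputs (these values are not part of the diff); they follow the `[^a-zA-Z0-9_-]+` pattern declared above:

```java
// Hypothetical inputs, shown only to illustrate sanitizeId's behaviour:
// sanitizeId("user 1 (test)", "unknown-user")  -> "user1test"        (illegal characters stripped)
// sanitizeId(null, "unknown-project")          -> "unknown-project"  (missing id falls back to the default)
// sanitizeId("@#!", "unknown-source")          -> "unknown-source"   (nothing left after stripping)
```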

public static class Builder {
private boolean useGzip;
private boolean doDeduplicate;
private String hdfsUri;
private String outputPath;
private String format;
private boolean doStage;

public Builder(final String uri, final String outputPath) {
this.hdfsUri = uri;
@@ -397,5 +407,9 @@ public RestructureAvroRecords build() {
return new RestructureAvroRecords(this);
}

public Builder doStage(boolean stage) {
this.doStage = stage;
return this;
}
}
}
@@ -14,9 +14,8 @@
* limitations under the License.
*/

package org.radarcns.util;
package org.radarcns.data;

import com.fasterxml.jackson.databind.JsonMappingException;
import com.fasterxml.jackson.databind.MappingIterator;
import com.fasterxml.jackson.databind.ObjectReader;
import com.fasterxml.jackson.databind.ObjectWriter;
@@ -34,7 +33,10 @@
import java.io.Reader;
import java.io.Writer;
import java.nio.ByteBuffer;
import java.util.*;
import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

/**
* Converts deep hierarchical Avro records into flat CSV format. It uses a simple dot syntax in the
200 changes: 200 additions & 0 deletions src/main/java/org/radarcns/data/FileCache.java
@@ -0,0 +1,200 @@
/*
* Copyright 2017 The Hyve
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.radarcns.data;

import org.apache.avro.generic.GenericRecord;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import javax.annotation.Nonnull;
import java.io.BufferedInputStream;
import java.io.BufferedOutputStream;
import java.io.ByteArrayInputStream;
import java.io.Closeable;
import java.io.Flushable;
import java.io.IOException;
import java.io.InputStream;
import java.io.InputStreamReader;
import java.io.OutputStream;
import java.io.OutputStreamWriter;
import java.io.Reader;
import java.io.Writer;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardOpenOption;
import java.util.zip.GZIPInputStream;
import java.util.zip.GZIPOutputStream;
import java.util.zip.ZipException;

import static java.nio.file.StandardCopyOption.REPLACE_EXISTING;

/** Keeps an open file handle and record converter for a single output path. */
public class FileCache implements Closeable, Flushable, Comparable<FileCache> {
private static final Logger logger = LoggerFactory.getLogger(FileCache.class);
private static final int BUFFER_SIZE = 8192;

private final Writer writer;
private final RecordConverter recordConverter;
private final Path path;
private final Path tmpPath;
private long lastUse;

/**
* File cache of given path, using given converter factory.
* @param converterFactory converter factory to create a converter to write files with.
* @param path path to cache.
* @param record example record to create the converter from; this record is not written to the path.
* @param gzip whether to gzip the records.
* @param tmpDir temporary directory to stage the file in, or {@code null} to append to {@code path} directly.
* @throws IOException if the file and/or temporary files cannot be correctly read or written to.
*/
public FileCache(RecordConverterFactory converterFactory, Path path,
GenericRecord record, boolean gzip, Path tmpDir) throws IOException {
this.path = path;
boolean fileIsNew = !Files.exists(path) || Files.size(path) == 0;
OutputStream outFile;
if (tmpDir == null) {
this.tmpPath = null;
outFile = Files.newOutputStream(path, StandardOpenOption.APPEND, StandardOpenOption.CREATE);
} else {
this.tmpPath = Files.createTempFile(tmpDir, path.getFileName().toString(),
gzip ? ".tmp.gz" : ".tmp");
outFile = Files.newOutputStream(tmpPath);
}

OutputStream bufOut = new BufferedOutputStream(outFile);
if (gzip) {
bufOut = new GZIPOutputStream(bufOut);
}

InputStream inputStream;
if (fileIsNew) {
inputStream = new ByteArrayInputStream(new byte[0]);
} else {
inputStream = inputStream(new BufferedInputStream(Files.newInputStream(path)), gzip);

if (tmpPath != null) {
try {
copy(path, bufOut, gzip);
} catch (ZipException ex) {
// restart output buffer
bufOut.close();
// clear output file
outFile = Files.newOutputStream(tmpPath);
bufOut = new GZIPOutputStream(new BufferedOutputStream(outFile));
}
}
}

this.writer = new OutputStreamWriter(bufOut);

try (Reader reader = new InputStreamReader(inputStream)) {
this.recordConverter = converterFactory.converterFor(writer, record, fileIsNew, reader);
} catch (IOException ex) {
try {
writer.close();
} catch (IOException exClose) {
logger.error("Failed to close writer for {}", path, ex);
}
throw ex;
}
}

/**
* Write a record to the cache.
* @param record AVRO record
* @return {@code true} if the record was written, {@code false} if the {@link RecordConverter} could not write it.
* @throws IOException if the record cannot be used.
*/
public boolean writeRecord(GenericRecord record) throws IOException {
boolean result = this.recordConverter.writeRecord(record);
lastUse = System.nanoTime();
return result;
}

@Override
public void close() throws IOException {
recordConverter.close();
writer.close();
if (tmpPath != null) {
Files.move(tmpPath, path, REPLACE_EXISTING);
}
}

@Override
public void flush() throws IOException {
recordConverter.flush();
}

/**
* Compares the time that the file caches were last used. If equal, the file paths are
* compared lexicographically.
* @param other FileCache to compare with.
*/
@Override
public int compareTo(@Nonnull FileCache other) {
int result = Long.compare(lastUse, other.lastUse);
if (result != 0) {
return result;
}
return path.compareTo(other.path);
}

/** File that the cache is maintaining. */
public Path getPath() {
return path;
}

private static void copy(Path source, OutputStream sink, boolean gzip) throws IOException {
try (InputStream copyStream = inputStream(Files.newInputStream(source), gzip)) {
copy(copyStream, sink);
} catch (ZipException ex) {
Path corruptPath = null;
String suffix = "";
for (int i = 0; corruptPath == null && i < 100; i++) {
Path path = source.resolveSibling(source.getFileName() + ".corrupted" + suffix);
if (!Files.exists(path)) {
corruptPath = path;
}
suffix = "-" + i;
}
if (corruptPath != null) {
logger.error("Original file {} was corrupted: {}."
+ " Moved to {}.", source, ex, corruptPath);
Files.move(source, corruptPath);
} else {
logger.error("Original file {} was corrupted: {}."
+ " Too many corrupt backups stored, removing file.", source, ex);
}
throw ex;
}
}

private static InputStream inputStream(InputStream in, boolean gzip) throws IOException {
return gzip ? new GZIPInputStream(in) : in;
}

/**
* Reads all bytes from an input stream and writes them to an output stream.
*/
private static void copy(InputStream source, OutputStream sink) throws IOException {
byte[] buf = new byte[BUFFER_SIZE];
int n;
while ((n = source.read(buf)) > 0) {
sink.write(buf, 0, n);
}
}
}
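The new class can be exercised roughly as follows. This is a minimal sketch based only on the constructor and methods shown above; the wrapper class and helper method are hypothetical and not part of this PR:

```java
import org.apache.avro.generic.GenericRecord;
import org.radarcns.data.FileCache;
import org.radarcns.data.RecordConverterFactory;

import java.io.IOException;
import java.nio.file.Path;

// Hypothetical usage sketch, not part of this PR.
class FileCacheUsageSketch {
    // Appends a single record to the given output file through FileCache. When tmpDir is
    // non-null the data is staged there and moved into place on close(); when it is null
    // the record is appended to outputPath directly.
    static void appendRecord(RecordConverterFactory converterFactory, Path outputPath,
            GenericRecord record, boolean gzip, Path tmpDir) throws IOException {
        try (FileCache cache = new FileCache(converterFactory, outputPath, record, gzip, tmpDir)) {
            cache.writeRecord(record);
        }
    }
}
```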