22 changes: 16 additions & 6 deletions README.md
@@ -15,20 +15,30 @@ Build jar from source with
```shell
./gradlew build
```
-and find the output JAR file as `build/libs/restructurehdfs-0.3.1-all.jar`. Then run with:
+and find the output JAR file as `build/libs/restructurehdfs-0.3.3-all.jar`. Then run with:

```shell
-java -jar restructurehdfs-0.3.1-all.jar <webhdfs_url> <hdfs_topic_path> <output_folder>
+java -jar restructurehdfs-0.3.3-all.jar --hdfs-uri <webhdfs_url> --output-directory <output_folder> <input_path_1> [<input_path_2> ...]
```
+or use the short options instead:
+```shell
+java -jar restructurehdfs-0.3.3-all.jar -u <webhdfs_url> -o <output_folder> <input_path_1> [<input_path_2> ...]
+```

-By default, this will output the data in CSV format. If JSON format is preferred, use the following instead:
-```shell
-java -Dorg.radarcns.format=json -jar restructurehdfs-0.3.1-all.jar <webhdfs_url> <hdfs_topic_path> <output_folder>
-```
+To display the usage and all available options, use the help option:
+```shell
+java -jar restructurehdfs-0.3.3-all.jar --help
+```
+Note that the options marked with `*` in the help output are required to run the app. Multiple input paths may be given, e.g. `/topicAndroidNew/topic1 /topicAndroidNew/topic2 ...`; at least one input path is required.
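For instance (the host and port here are hypothetical), restructuring the two example topics could look like:
```shell
java -jar restructurehdfs-0.3.3-all.jar -u webhdfs://localhost:50070 -o ./output /topicAndroidNew/topic1 /topicAndroidNew/topic2
```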

+By default, this will output the data in CSV format. If JSON format is preferred, use the following instead:
+```shell
+java -jar restructurehdfs-0.3.3-all.jar --format json --hdfs-uri <webhdfs_url> --output-directory <output_folder> <input_path_1> [<input_path_2> ...]
+```

Another option is to output the data in compressed form. All files will get the `gz` suffix, and can be decompressed with a GZIP decoder. Note that for a very small number of records, this may actually increase the file size.
```
-java -Dorg.radarcns.compression=gzip -jar restructurehdfs-0.3.1-all.jar <webhdfs_url> <hdfs_topic_path> <output_folder>
+java -jar restructurehdfs-0.3.3-all.jar --compression gzip --hdfs-uri <webhdfs_url> --output-directory <output_folder> <input_path_1> [<input_path_2> ...]
```
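A compressed output file (the path here is hypothetical) can be inspected with any standard GZIP tool, for example:
```shell
gunzip -c <output_folder>/path/to/file.csv.gz | head
```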

-Finally, by default, files records are not deduplicated after writing. To enable this behaviour, specify the option `-Dorg.radarcns.deduplicate=true`. This set to false by default because of an issue with Biovotion data. Please see - [issue #16](https://github.com/RADAR-base/Restructure-HDFS-topic/issues/16) before enabling it.
+Finally, by default, file records are not deduplicated after writing. To enable this behaviour, specify the option `--deduplicate` or `-d`. Deduplication is disabled by default because of an issue with Biovotion data; please see [issue #16](https://github.com/RADAR-base/Restructure-HDFS-topic/issues/16) before enabling it.
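For instance, a deduplicated CSV run with the same placeholders as above could look like:
```shell
java -jar restructurehdfs-0.3.3-all.jar --deduplicate --hdfs-uri <webhdfs_url> --output-directory <output_folder> <input_path_1>
```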
3 changes: 3 additions & 0 deletions build.gradle
@@ -15,6 +15,7 @@ targetCompatibility = '1.8'
ext.avroVersion = '1.8.2'
ext.jacksonVersion = '2.8.9'
ext.hadoopVersion = '2.7.3'
ext.jCommanderVersion = '1.72'

repositories {
jcenter()
@@ -27,6 +28,8 @@ dependencies {
compile group: 'com.fasterxml.jackson.core' , name: 'jackson-databind', version: jacksonVersion
compile group: 'com.fasterxml.jackson.dataformat' , name: 'jackson-dataformat-csv', version: jacksonVersion

compile group: 'com.beust', name: 'jcommander', version: jCommanderVersion

runtime group: 'org.apache.hadoop', name: 'hadoop-hdfs', version: hadoopVersion

testCompile group: 'junit', name: 'junit', version: '4.12'
85 changes: 68 additions & 17 deletions src/main/java/org/radarcns/RestructureAvroRecords.java
@@ -16,6 +16,7 @@

package org.radarcns;

import com.beust.jcommander.JCommander;
import com.fasterxml.jackson.databind.JsonMappingException;
import org.apache.avro.Schema.Field;
import org.apache.avro.file.DataFileReader;
@@ -32,6 +33,7 @@
import org.radarcns.util.JsonAvroConverter;
import org.radarcns.util.ProgressBar;
import org.radarcns.util.RecordConverterFactory;
import org.radarcns.util.commandline.CommandLineArgs;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@@ -70,27 +72,39 @@ public class RestructureAvroRecords {

private long processedFileCount;
private long processedRecordsCount;
-private static final boolean USE_GZIP = "gzip".equalsIgnoreCase(System.getProperty("org.radarcns.compression"));
-
-// Default set to false because causes loss of records from Biovotion data. https://github.com/RADAR-base/Restructure-HDFS-topic/issues/16
-private static final boolean DO_DEDUPLICATE = "true".equalsIgnoreCase(System.getProperty("org.radarcns.deduplicate", "false"));
+private final boolean useGzip;
+private final boolean doDeduplicate;

public static void main(String [] args) throws Exception {
-if (args.length != 3) {
-System.out.println("Usage: hadoop jar restructurehdfs-all-0.2.jar <webhdfs_url> <hdfs_root_directory> <output_folder>");
-System.exit(1);
-}
+final CommandLineArgs commandLineArgs = new CommandLineArgs();
+final JCommander parser = JCommander.newBuilder().addObject(commandLineArgs).build();
+
+parser.setProgramName("hadoop jar restructurehdfs-0.3.3-all.jar");
+parser.parse(args);
+
+if (commandLineArgs.help) {
+parser.usage();
+System.exit(0);
+}

logger.info(new SimpleDateFormat("yyyy-MM-dd HH:mm:ss").format(new Date()));
logger.info("Starting...");
logger.info("In: " + args[0] + args[1]);
logger.info("Out: " + args[2]);

long time1 = System.currentTimeMillis();

-RestructureAvroRecords restr = new RestructureAvroRecords(args[0], args[2]);
+RestructureAvroRecords restr = new RestructureAvroRecords.Builder(commandLineArgs.hdfsUri,
+commandLineArgs.outputDirectory)
+.useGzip("gzip".equalsIgnoreCase(commandLineArgs.compression))
+.doDeduplicate(commandLineArgs.deduplicate)
+.format(commandLineArgs.format)
+.build();

try {
-restr.start(args[1]);
+for (String input : commandLineArgs.inputPaths) {
+logger.info("In: " + commandLineArgs.hdfsUri + input);
+logger.info("Out: " + commandLineArgs.outputDirectory);
+restr.start(input);
+}
} catch (IOException ex) {
logger.error("Processing failed", ex);
}
@@ -99,12 +113,16 @@ public static void main(String [] args) throws Exception {
logger.info("Time taken: {} seconds", (System.currentTimeMillis() - time1)/1000d);
}

-public RestructureAvroRecords(String inputPath, String outputPath) {
-this.setInputWebHdfsURL(inputPath);
-this.setOutputPath(outputPath);
+private RestructureAvroRecords(RestructureAvroRecords.Builder builder) {
+this.setInputWebHdfsURL(builder.hdfsUri);
+this.setOutputPath(builder.outputPath);
+
+this.useGzip = builder.useGzip;
+this.doDeduplicate = builder.doDeduplicate;
+logger.info("Deduplicate set to {}", doDeduplicate);

String extension;
if (System.getProperty("org.radarcns.format", "csv").equalsIgnoreCase("json")) {
if (builder.format.equalsIgnoreCase("json")) {
logger.info("Writing output files in JSON format");
converterFactory = JsonAvroConverter.getFactory();
extension = "json";
@@ -113,7 +131,7 @@ public RestructureAvroRecords(String inputPath, String outputPath) {
converterFactory = CsvAvroConverter.getFactory();
extension = "csv";
}
-if (USE_GZIP) {
+if (this.useGzip) {
logger.info("Compressing output files in GZIP format");
extension += ".gz";
}
@@ -179,7 +197,7 @@ public void start(String directoryName) throws IOException {

// Actually process the files
for (Map.Entry<String, List<Path>> entry : topicPaths.entrySet()) {
-try (FileCacheStore cache = new FileCacheStore(converterFactory, 100, USE_GZIP, DO_DEDUPLICATE)) {
+try (FileCacheStore cache = new FileCacheStore(converterFactory, 100, useGzip, doDeduplicate)) {
for (Path filePath : entry.getValue()) {
// If JsonMappingException occurs, log the error and continue with other files
try {
@@ -347,4 +365,37 @@ public static Date getDate(GenericRecord keyField, GenericRecord valueField) {
long time = (Long) keyField.get("start");
return new Date(time);
}

public static class Builder {
private boolean useGzip;
private boolean doDeduplicate;
private String hdfsUri;
private String outputPath;
private String format;

public Builder(final String uri, final String outputPath) {
this.hdfsUri = uri;
this.outputPath = outputPath;
}

public Builder useGzip(final boolean gzip) {
this.useGzip = gzip;
return this;
}

public Builder doDeduplicate(final boolean dedup) {
this.doDeduplicate = dedup;
return this;
}

public Builder format(final String format) {
this.format = format;
return this;
}

public RestructureAvroRecords build() {
return new RestructureAvroRecords(this);
}

}
}
31 changes: 31 additions & 0 deletions src/main/java/org/radarcns/util/commandline/CommandLineArgs.java
@@ -0,0 +1,31 @@
package org.radarcns.util.commandline;

import com.beust.jcommander.Parameter;

import java.util.ArrayList;
import java.util.List;

public class CommandLineArgs {

@Parameter(description = "<input_path_1> [<input_path_2> ...]", variableArity = true, required = true)
public List<String> inputPaths = new ArrayList<>();

@Parameter(names = { "-f", "--format" }, description = "Format to use when converting the files. JSON and CSV is available.")
public String format = "csv";

@Parameter(names = { "-c", "--compression" }, description = "Compression to use when converting the files. Gzip is available.")
public String compression = "none";

// Defaults to false because deduplication causes loss of records from Biovotion data. https://github.com/RADAR-base/Restructure-HDFS-topic/issues/16
@Parameter(names = { "-d", "--deduplicate" }, description = "Whether to deduplicate the records in the output files.")
public boolean deduplicate;

@Parameter(names = { "-u", "--hdfs-uri" }, description = "The HDFS uri to connect to. Eg - 'hdfs://<HOST>:<RPC_PORT>/<PATH>'.", required = true, validateWith = { HdfsUriValidator.class, PathValidator.class })
public String hdfsUri;

@Parameter(names = { "-o", "--output-directory"}, description = "The output folder where the files are to be extracted.", required = true, validateWith = PathValidator.class)
public String outputDirectory;

@Parameter(names = { "-h", "--help"}, help = true, description = "Display the usage of the program with available options.")
public boolean help;
}
16 changes: 16 additions & 0 deletions src/main/java/org/radarcns/util/commandline/HdfsUriValidator.java
@@ -0,0 +1,16 @@
package org.radarcns.util.commandline;


import com.beust.jcommander.ParameterException;
import com.beust.jcommander.IParameterValidator;

public class HdfsUriValidator implements IParameterValidator {
@Override
public void validate(String name, String value) throws ParameterException {
if (!value.matches("((hdfs)|(webhdfs)):(/?/?)[^\\s]+")) {
throw new ParameterException("Parameter " + name + " should be a valid HDFS or WebHDFS URI, "
+ "e.g. hdfs://<HOST>:<RPC_PORT>/<PATH> (found " + value
+ "). Please run with --help or -h for more information.");
}
}
}
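Since `(/?/?)` makes the slashes optional, URIs like `hdfs:host/path` also pass validation. A minimal sketch of what the validator accepts and rejects (a hypothetical demo class, not part of this PR):

```java
package org.radarcns.util.commandline;

import com.beust.jcommander.ParameterException;

// Hypothetical demo, not part of this PR: exercises the validator's regex.
public class HdfsUriValidatorDemo {
    public static void main(String[] args) {
        HdfsUriValidator validator = new HdfsUriValidator();
        validator.validate("--hdfs-uri", "hdfs://namenode:8020/topics");     // accepted
        validator.validate("--hdfs-uri", "webhdfs://namenode:50070/topics"); // accepted
        validator.validate("--hdfs-uri", "hdfs:namenode/topics");            // also accepted: slashes are optional
        try {
            validator.validate("--hdfs-uri", "http://namenode:50070/topics"); // rejected: scheme must be hdfs or webhdfs
        } catch (ParameterException e) {
            System.out.println(e.getMessage());
        }
    }
}
```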
15 changes: 15 additions & 0 deletions src/main/java/org/radarcns/util/commandline/PathValidator.java
@@ -0,0 +1,15 @@
package org.radarcns.util.commandline;

import com.beust.jcommander.ParameterException;
import com.beust.jcommander.IParameterValidator;

public class PathValidator implements IParameterValidator {
@Override
public void validate(String name, String value) throws ParameterException {
if (value == null || value.isEmpty()) {
throw new ParameterException("Parameter " + name + " should be supplied. "
+ "It cannot be empty or null. (found " + value +")."
+ "Please run with --help or -h for more information.");
}
}
}