
Commit

Merge pull request #23 from civitaspo/v0.2.0
V0.2.0
civitaspo committed Feb 16, 2016
2 parents 837120e + faafe4b commit cb126e2
Showing 14 changed files with 568 additions and 319 deletions.
3 changes: 3 additions & 0 deletions CHENGELOG.md
@@ -0,0 +1,3 @@
+0.2.0 (2016-02-xx)
+==================
+- [Add] `decompression` option
33 changes: 18 additions & 15 deletions README.md
@@ -14,11 +14,12 @@ Read files on Hdfs.

- **config_files** list of paths to Hadoop's configuration files (array of strings, default: `[]`)
- **config** overwrites configuration parameters (hash, default: `{}`)
-- **path** file path on Hdfs. you can use glob and Date format like `%Y%m%d/%s`.
-- **rewind_seconds** When you use Date format in input_path property, the format is executed by using the time which is Now minus this property.
-- **partition** when this is true, partition input files and increase task count. (default: `true`)
-- **num_partitions** number of partitions. (default: `Runtime.getRuntime().availableProcessors()`)
-- **skip_header_lines** Skip this number of lines first. Set 1 if the file has header line. (default: `0`)
+- **path** file path on Hdfs. you can use glob and Date format like `%Y%m%d/%s` (string, required).
+- **rewind_seconds** When you use Date format in input_path property, the format is executed by using the time which is Now minus this property. (long, default: `0`)
+- **partition** when this is true, partition input files and increase task count. (boolean, default: `true`)
+- **num_partitions** number of partitions. (long, default: `Runtime.getRuntime().availableProcessors()`)
+- **skip_header_lines** Skip this number of lines first. Set 1 if the file has header line. (long, default: `0`)
+- **decompression** Decompress compressed files by hadoop compression codec api. (boolean. default: `false`)
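For context on the new `decompression` option above: it is described as relying on Hadoop's compression codec API. The following is a minimal sketch of how that API is typically used, not the plugin's actual code; the `CompressionCodecFactory` and `CompressionCodec` calls are standard Hadoop, while the surrounding `DecompressionSketch` class and `openPossiblyCompressed` helper are hypothetical names for illustration.

```java
import java.io.IOException;
import java.io.InputStream;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.compress.CompressionCodec;
import org.apache.hadoop.io.compress.CompressionCodecFactory;

public class DecompressionSketch
{
    // Hypothetical helper: open a file on HDFS, wrapping it in a decompressing
    // stream when a codec is registered for its extension (.gz, .bz2, .lzo, ...).
    public static InputStream openPossiblyCompressed(Configuration conf, Path path)
            throws IOException
    {
        FileSystem fs = path.getFileSystem(conf);
        InputStream raw = fs.open(path);
        CompressionCodec codec = new CompressionCodecFactory(conf).getCodec(path);
        if (codec == null) {
            return raw;                       // no codec matched: read the bytes as-is
        }
        return codec.createInputStream(raw);  // codec matched: return a decompressed view
    }
}
```

Presumably, with `decompression: false` the plugin keeps handing the raw bytes downstream as in 0.1.x, leaving any decoding to later Embulk stages.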

## Example

@@ -77,18 +78,20 @@ int partitionSizeByOneTask = totalFileLength / approximateNumPartitions;
...
*/
-int numPartitions;
-if (path.toString().endsWith(".gz") || path.toString().endsWith(".bz2") || path.toString().endsWith(".lzo")) {
-    // if the file is compressed, skip partitioning.
-    numPartitions = 1;
+long numPartitions;
+if (task.getPartition()) {
+    if (file.canDecompress()) {
+        numPartitions = ((fileLength - 1) / partitionSizeByOneTask) + 1;
+    }
+    else if (file.getCodec() != null) { // if not null, the file is compressed.
+        numPartitions = 1;
+    }
+    else {
+        numPartitions = ((fileLength - 1) / partitionSizeByOneTask) + 1;
+    }
 }
-else if (!task.getPartition()) {
-    // if no partition mode, skip partitioning.
-    numPartitions = 1;
-}
 else {
-    // equalize the file size per task as much as possible.
-    numPartitions = ((fileLength - 1) / partitionSizeByOneTask) + 1;
+    numPartitions = 1;
 }
/*
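The interleaved diff above can be hard to read, so here is a condensed sketch of the new partitioning rule as a standalone method. The method and parameter names are invented for illustration; only the branch conditions and the ceiling-style arithmetic come from the snippet above.

```java
public class PartitionCountSketch
{
    // Condensed restatement of the rule in the README diff above: split a file
    // only when partitioning is enabled and the file is either uncompressed or
    // can be decompressed by the plugin; otherwise read it as a single task.
    static long numPartitions(boolean partitionEnabled, boolean canDecompress,
                              boolean compressed, long fileLength, long partitionSizeByOneTask)
    {
        if (!partitionEnabled) {
            return 1;                                            // partitioning disabled by config
        }
        if (compressed && !canDecompress) {
            return 1;                                            // compressed stream: cannot split safely
        }
        return ((fileLength - 1) / partitionSizeByOneTask) + 1;  // ceil(fileLength / partitionSizeByOneTask)
    }

    public static void main(String[] args)
    {
        // A 1,000-byte splittable file with a 300-byte per-task target -> 4 partitions.
        System.out.println(numPartitions(true, false, false, 1000L, 300L)); // prints 4
        // A gzip file that is not decompressed by the plugin -> 1 partition.
        System.out.println(numPartitions(true, false, true, 1000L, 300L));  // prints 1
    }
}
```

Returning 1 for a compressed file that cannot be decompressed here matches the intent of the old comment ("if the file is compressed, skip partitioning"): a stream that cannot be read from an arbitrary offset should not be split across tasks.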
2 changes: 1 addition & 1 deletion build.gradle
@@ -15,7 +15,7 @@ configurations {
provided
}

version = "0.1.9"
version = "0.2.0"

sourceCompatibility = 1.7
targetCompatibility = 1.7
