Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
55 changes: 55 additions & 0 deletions example/config_avoid_create_0byte_file.yml
Original file line number Diff line number Diff line change
@@ -0,0 +1,55 @@
hdfs_example: &hdfs_example
  config_files:
    - /etc/hadoop/conf/core-site.xml
    - /etc/hadoop/conf/hdfs-site.xml
  config:
    fs.defaultFS: 'hdfs://hadoop-nn1:8020'
    fs.hdfs.impl: 'org.apache.hadoop.hdfs.DistributedFileSystem'
    fs.file.impl: 'org.apache.hadoop.fs.LocalFileSystem'

local_fs_example: &local_fs_example
  config:
    fs.defaultFS: 'file:///'
    fs.hdfs.impl: 'org.apache.hadoop.fs.RawLocalFileSystem'
    fs.file.impl: 'org.apache.hadoop.fs.RawLocalFileSystem'
    io.compression.codecs: 'org.apache.hadoop.io.compress.GzipCodec,org.apache.hadoop.io.compress.DefaultCodec,org.apache.hadoop.io.compress.BZip2Codec'

exec:
  min_output_tasks: 10

in:
  type: file
  path_prefix: example/data
  parser:
    charset: UTF-8
    newline: CRLF
    type: csv
    delimiter: ','
    quote: '"'
    header_line: true
    stop_on_invalid_record: true
    columns:
      - {name: id, type: long}
      - {name: account, type: long}
      - {name: time, type: timestamp, format: '%Y-%m-%d %H:%M:%S'}
      - {name: purchase, type: timestamp, format: '%Y%m%d'}
      - {name: comment, type: string}


out:
  type: hdfs
  <<: *local_fs_example
  path_prefix: /tmp/embulk-output-hdfs_example/file_
  file_ext: csv
  delete_in_advance: FILE_ONLY
  formatter:
    type: csv
    newline: CRLF
    newline_in_field: LF
    header_line: false
    charset: UTF-8
    quote_policy: NONE
    quote: '"'
    escape: '\'
    null_string: ''
    default_timezone: UTC
20 changes: 9 additions & 11 deletions src/main/java/org/embulk/output/hdfs/HdfsFileOutputPlugin.java
Original file line number Diff line number Diff line change
Expand Up @@ -122,30 +122,28 @@ public TransactionalFileOutput open(TaskSource taskSource, final int taskIndex)
{
private final List<String> hdfsFileNames = new ArrayList<>();
private int fileIndex = 0;
private Path currentPath = null;
private OutputStream output = null;

@Override
public void nextFile()
{
// Flush/close whatever stream the previous file left open before advancing.
closeCurrentStream();
// NOTE(review): this excerpt is a merged PR diff with the +/- markers stripped,
// so the removed (eager-create) and added (lazy-create) variants of this method
// appear interleaved below as if both were live code. Confirm against the real
// HdfsFileOutputPlugin.java before treating this span as compilable source.
Path path = new Path(pathPrefix + String.format(sequenceFormat, taskIndex, fileIndex) + pathSuffix);
try {
FileSystem fs = getFs(task);
// Pre-change behavior: the file is created on HDFS immediately, which produces
// a 0-byte file whenever this task never receives any data.
output = fs.create(path, task.getOverwrite());
logger.info("Uploading '{}'", path);
}
catch (IOException e) {
// NOTE(review): logging e.getMessage() then rethrowing wrapped in
// RuntimeException duplicates the report; the wrap at least preserves the cause.
logger.error(e.getMessage());
throw new RuntimeException(e);
}
hdfsFileNames.add(path.toString());
// Post-change behavior: only remember the target path here; actual file creation
// is deferred to add(Buffer), so an empty output never creates a file on HDFS.
currentPath = new Path(pathPrefix + String.format(sequenceFormat, taskIndex, fileIndex) + pathSuffix);
fileIndex++;
}

@Override
public void add(Buffer buffer)
{
try {
// this implementation is for creating file when there is data.
if (output == null) {
FileSystem fs = getFs(task);
output = fs.create(currentPath, task.getOverwrite());
logger.info("Uploading '{}'", currentPath);
hdfsFileNames.add(currentPath.toString());
}
output.write(buffer.array(), buffer.offset(), buffer.limit());
}
catch (IOException e) {
Expand Down