
Commit

Merge pull request #123 from NerdWallet/feature/simple_data_writer
Feature/simple data writer
zliu41 committed Jun 11, 2015
2 parents 45c5b85 + 65b39f5 commit 56f8635
Showing 8 changed files with 662 additions and 52 deletions.
gobblin/configuration/ConfigurationKeys.java

@@ -198,6 +198,8 @@ public class ConfigurationKeys {
public static final String WRITER_FILE_SYSTEM_URI = WRITER_PREFIX + ".fs.uri";
public static final String WRITER_STAGING_DIR = WRITER_PREFIX + ".staging.dir";
public static final String WRITER_OUTPUT_DIR = WRITER_PREFIX + ".output.dir";
+ // WRITER_FINAL_OUTPUT_PATH is used for internal purposes only to pass the absolute writer path to the publisher
+ public static final String WRITER_FINAL_OUTPUT_PATH = WRITER_PREFIX + ".final.output.path";
public static final String WRITER_BUILDER_CLASS = WRITER_PREFIX + ".builder.class";
public static final String DEFAULT_WRITER_BUILDER_CLASS = "gobblin.writer.AvroDataWriterBuilder";
public static final String WRITER_FILE_NAME = WRITER_PREFIX + ".file.name";
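
Note: the new WRITER_FINAL_OUTPUT_PATH key is written by the FsDataWriter base class introduced below and read back by the publisher. A minimal sketch of the read side, assuming a hypothetical helper (FinalOutputPathExample is not part of this commit; only the setProp call in FsDataWriter is):

import gobblin.configuration.ConfigurationKeys;
import gobblin.configuration.State;
import gobblin.util.ForkOperatorUtils;

import org.apache.hadoop.fs.Path;

// Hypothetical helper showing how a publisher might recover the absolute
// writer output path that FsDataWriter stores under WRITER_FINAL_OUTPUT_PATH.
public class FinalOutputPathExample {
  public static Path finalOutputPath(State state, int branchId) {
    // FsDataWriter sets the key with the branch-only overload of
    // getPropertyNameForBranch, so the same overload reads it back.
    String key = ForkOperatorUtils.getPropertyNameForBranch(
        ConfigurationKeys.WRITER_FINAL_OUTPUT_PATH, branchId);
    return new Path(state.getProp(key));
  }
}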
@@ -223,6 +225,9 @@ public class ConfigurationKeys {
public static final String DEFAULT_WRITER_PARTITION_TIMEZONE = "America/Los_Angeles";
public static final String DEFAULT_WRITER_FILE_PATH_TYPE = "default";

+ public static final String SIMPLE_WRITER_DELIMITER = "simple.writer.delimiter";
+ public static final String SIMPLE_WRITER_PREPEND_SIZE = "simple.writer.prepend.size";

/**
* Configuration properties used by the quality checker.
*/
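
Note: the SimpleDataWriter that consumes the two simple.writer.* keys is among the eight changed files but is not shown in this excerpt. A rough sketch of how a byte-oriented writer might read them; the class name, defaults, and semantics below are guesses, not the commit's actual implementation:

import gobblin.configuration.ConfigurationKeys;
import gobblin.configuration.State;

// Illustrative only: the real SimpleDataWriter is not shown in this excerpt.
public class SimpleWriterConfigExample {
  // simple.writer.delimiter: the character written between records
  // (assumed default: newline).
  public static char delimiter(State props) {
    return props.getProp(ConfigurationKeys.SIMPLE_WRITER_DELIMITER, "\n").charAt(0);
  }

  // simple.writer.prepend.size: whether to write each record's length
  // ahead of the record itself (assumed default: false).
  public static boolean prependSize(State props) {
    return props.getPropAsBoolean(ConfigurationKeys.SIMPLE_WRITER_PREPEND_SIZE, false);
  }
}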
WriterOutputFormat.java

@@ -20,6 +20,7 @@
public enum WriterOutputFormat {
AVRO("avro"),
PARQUET("parquet"),
+ PROTOBUF("protobuf"),
CSV("csv");

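The remainder of the enum is collapsed in the diff view. Judging from the constructor arguments, each constant presumably carries a file-extension string, roughly as sketched here (the field and accessor names are guesses, not the file's actual code):

public enum WriterOutputFormat {
  AVRO("avro"),
  PARQUET("parquet"),
  PROTOBUF("protobuf"),
  CSV("csv");

  // Guessed remainder: the collapsed portion presumably stores the
  // extension string passed to each constant above.
  private final String extension;

  WriterOutputFormat(String extension) {
    this.extension = extension;
  }

  public String getExtension() {
    return this.extension;
  }
}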
58 changes: 6 additions & 52 deletions gobblin-core/src/main/java/gobblin/writer/AvroHdfsDataWriter.java

@@ -13,7 +13,6 @@
package gobblin.writer;

import java.io.IOException;
- import java.net.URI;
import java.util.concurrent.atomic.AtomicLong;

import org.apache.avro.Schema;
@@ -22,11 +21,8 @@
import org.apache.avro.generic.GenericDatumWriter;
import org.apache.avro.generic.GenericRecord;
import org.apache.avro.io.DatumWriter;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@@ -35,22 +31,17 @@
import gobblin.configuration.ConfigurationKeys;
import gobblin.configuration.State;
import gobblin.util.ForkOperatorUtils;
- import gobblin.util.WriterUtils;

+ import gobblin.util.HadoopUtils;

/**
* An implementation of {@link DataWriter} that writes directly to HDFS in Avro format.
*
* @author ynli
*/
- class AvroHdfsDataWriter implements DataWriter<GenericRecord> {
+ class AvroHdfsDataWriter extends FsDataWriter<GenericRecord> {

private static final Logger LOG = LoggerFactory.getLogger(AvroHdfsDataWriter.class);

- private final State properties;
- private final FileSystem fs;
- private final Path stagingFile;
- private final Path outputFile;
private final DatumWriter<GenericRecord> datumWriter;
private final DataFileWriter<GenericRecord> writer;

@@ -70,14 +61,7 @@ public enum CodecType {

public AvroHdfsDataWriter(State properties, String fileName, Schema schema, int numBranches, int branchId)
throws IOException {

- String uri = properties.getProp(
-     ForkOperatorUtils.getPropertyNameForBranch(ConfigurationKeys.WRITER_FILE_SYSTEM_URI, numBranches, branchId),
-     ConfigurationKeys.LOCAL_FS_URI);
-
- Path stagingDir = WriterUtils.getWriterStagingDir(properties, numBranches, branchId);
-
- Path outputDir = WriterUtils.getWriterOutputDir(properties, numBranches, branchId);
+ super(properties, fileName, numBranches, branchId);

String codecType = properties
.getProp(ForkOperatorUtils.getPropertyNameForBranch(ConfigurationKeys.WRITER_CODEC_TYPE, numBranches, branchId),
@@ -93,30 +77,6 @@ public AvroHdfsDataWriter(State properties, String fileName, Schema schema, int

this.schema = schema;

- Configuration conf = new Configuration();
- // Add all job configuration properties so they are picked up by Hadoop
- for (String key : properties.getPropertyNames()) {
-   conf.set(key, properties.getProp(key));
- }
- this.fs = FileSystem.get(URI.create(uri), conf);
- this.properties = properties;
- this.stagingFile = new Path(stagingDir, fileName);
-
- // Deleting the staging file if it already exists, which can happen if the
- // task failed and the staging file didn't get cleaned up for some reason.
- // Deleting the staging file prevents the task retry from being blocked.
- if (this.fs.exists(this.stagingFile)) {
-   LOG.warn(String.format("Task staging file %s already exists, deleting it", this.stagingFile));
-   this.fs.delete(this.stagingFile, false);
- }
-
- this.outputFile = new Path(outputDir, fileName);
-
- // Create the parent directory of the output file if it does not exist
- if (!this.fs.exists(this.outputFile.getParent())) {
-   this.fs.mkdirs(this.outputFile.getParent());
- }

this.datumWriter = new GenericDatumWriter<GenericRecord>();
this.writer = createDatumWriter(this.stagingFile, bufferSize, CodecType.valueOf(codecType), deflateLevel);
}
@@ -160,23 +120,17 @@ public void commit()
// the output file if it already exists prevents task retry from being blocked.
if (this.fs.exists(this.outputFile)) {
LOG.warn(String.format("Task output file %s already exists", this.outputFile));
- this.fs.delete(this.outputFile, false);
- }
-
- if (!this.fs.rename(this.stagingFile, this.outputFile)) {
-   throw new IOException("Failed to commit data from " + this.stagingFile + " to " + this.outputFile);
+ HadoopUtils.deletePath(this.fs, this.outputFile, false);
}
-
- // Setting the same HDFS properties as the original file
- WriterUtils.setFileAttributesFromState(properties, fs, outputFile);
+ HadoopUtils.renamePath(this.fs, this.stagingFile, this.outputFile);
}

@Override
public void cleanup()
throws IOException {
// Delete the staging file
if (this.fs.exists(this.stagingFile)) {
- this.fs.delete(this.stagingFile, false);
+ HadoopUtils.deletePath(this.fs, this.stagingFile, false);
}
}

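HadoopUtils itself is among the changed files not shown in this excerpt. From the call sites above, deletePath and renamePath presumably wrap FileSystem.delete and FileSystem.rename and turn their boolean failure returns into IOExceptions, mirroring the throw that the old inline rename performed. A sketch under that assumption (HadoopUtilsSketch is a stand-in name, not the real class):

import java.io.IOException;

import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

// Assumed shape of the gobblin.util.HadoopUtils helpers used above.
public final class HadoopUtilsSketch {
  private HadoopUtilsSketch() {
  }

  // Assumed: a delete that throws instead of returning false.
  public static void deletePath(FileSystem fs, Path path, boolean recursive) throws IOException {
    if (!fs.delete(path, recursive)) {
      throw new IOException("Failed to delete " + path);
    }
  }

  // Assumed: a rename that throws instead of returning false.
  public static void renamePath(FileSystem fs, Path from, Path to) throws IOException {
    if (!fs.rename(from, to)) {
      throw new IOException(String.format("Failed to rename %s to %s", from, to));
    }
  }
}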
76 changes: 76 additions & 0 deletions gobblin-core/src/main/java/gobblin/writer/FsDataWriter.java
@@ -0,0 +1,76 @@
/* (c) 2015 NerdWallet All rights reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use
* this file except in compliance with the License. You may obtain a copy of the
* License at http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software distributed
* under the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR
* CONDITIONS OF ANY KIND, either express or implied.
*/

package gobblin.writer;

import java.io.IOException;
import java.net.URI;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import gobblin.configuration.ConfigurationKeys;
import gobblin.configuration.State;
import gobblin.util.ForkOperatorUtils;
import gobblin.util.HadoopUtils;
import gobblin.util.WriterUtils;

/**
* An implementation of {@link DataWriter} that does the work of setting the staging/output dirs
* and creating the FileSystem instance.
*
* @author akshay@nerdwallet.com
*/
public abstract class FsDataWriter<D> implements DataWriter<D> {
private static final Logger LOG = LoggerFactory.getLogger(FsDataWriter.class);

protected final FileSystem fs;
protected final Path stagingFile;
protected final Path outputFile;
protected final State properties;

public FsDataWriter(State properties, String fileName, int numBranches, int branchId) throws IOException {
this.properties = properties;
// initialize file system
String uri = properties.getProp(
ForkOperatorUtils.getPropertyNameForBranch(ConfigurationKeys.WRITER_FILE_SYSTEM_URI, numBranches, branchId),
ConfigurationKeys.LOCAL_FS_URI
);
Configuration conf = new Configuration();
// Add all job configuration properties so they are picked up by Hadoop
for (String key : properties.getPropertyNames()) {
conf.set(key, properties.getProp(key));
}
this.fs = FileSystem.get(URI.create(uri), conf);

// initialize staging/output dir
this.stagingFile = new Path(WriterUtils.getWriterStagingDir(properties, numBranches, branchId), fileName);
this.outputFile = new Path(WriterUtils.getWriterOutputDir(properties, numBranches, branchId), fileName);
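// Record the absolute final output path for the publisher; note this uses
// the branch-only overload of getPropertyNameForBranch.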
properties.setProp(ForkOperatorUtils.getPropertyNameForBranch(ConfigurationKeys.WRITER_FINAL_OUTPUT_PATH, branchId),
this.outputFile.toString());

// Deleting the staging file if it already exists, which can happen if the
// task failed and the staging file didn't get cleaned up for some reason.
// Deleting the staging file prevents the task retry from being blocked.
if (this.fs.exists(this.stagingFile)) {
LOG.warn(String.format("Task staging file %s already exists, deleting it", this.stagingFile));
HadoopUtils.deletePath(this.fs, this.stagingFile, false);
}

// Create the parent directory of the output file if it does not exist
if (!this.fs.exists(this.outputFile.getParent())) {
this.fs.mkdirs(this.outputFile.getParent());
}
}
}
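
As the AvroHdfsDataWriter diff above shows, a concrete writer now just chains to this constructor and inherits fs, stagingFile, and outputFile. A minimal subclass skeleton (illustrative only, not part of the commit; record handling elided):

import java.io.IOException;

import gobblin.configuration.State;

// Illustrative skeleton. Kept abstract because the remaining DataWriter
// methods (write, commit, cleanup, ...) are elided here.
public abstract class SkeletonFsDataWriter extends FsDataWriter<byte[]> {
  public SkeletonFsDataWriter(State properties, String fileName, int numBranches, int branchId)
      throws IOException {
    // The parent resolves the FileSystem, staging/output paths, and records
    // WRITER_FINAL_OUTPUT_PATH for the publisher.
    super(properties, fileName, numBranches, branchId);
  }
}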
