Skip to content

Commit

Permalink
FLUME-499: Names of incomplete files being written to hdfs should ind…
Browse files Browse the repository at this point in the history
…icate they are incomplete
  • Loading branch information
esammer authored and Jonathan Hsieh committed Jul 1, 2011
1 parent c266dd8 commit 1f5c610
Show file tree
Hide file tree
Showing 4 changed files with 382 additions and 7 deletions.
7 changes: 7 additions & 0 deletions RELEASENOTES
Original file line number Diff line number Diff line change
@@ -1,3 +1,10 @@
Flume 0.9.5
===========

Files written by a collector now have a '.tmp' suffix that indicates
that the file is still being written. When the file is completed,
this suffix is removed.

Flume 0.9.4
===========

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,6 @@
import org.slf4j.LoggerFactory;

import com.cloudera.flume.conf.Context;
import com.cloudera.flume.conf.FlumeBuilder.FunctionSpec;
import com.cloudera.flume.conf.FlumeBuilder;
import com.cloudera.flume.conf.FlumeConfiguration;
import com.cloudera.flume.conf.FlumeSpecException;
Expand All @@ -44,6 +43,7 @@
import com.cloudera.flume.handlers.text.FormatFactory;
import com.cloudera.flume.handlers.text.output.OutputFormat;
import com.cloudera.flume.reporter.ReportEvent;
import com.cloudera.util.PathManager;
import com.google.common.base.Preconditions;

/**
Expand All @@ -61,6 +61,7 @@ public class CustomDfsSink extends EventSink.Base {
AtomicLong count = new AtomicLong();
String path;
Path dstPath;
PathManager pathManager;

public CustomDfsSink(String path, OutputFormat format) {
Preconditions.checkArgument(path != null);
Expand All @@ -83,10 +84,11 @@ public void append(Event e) throws IOException, InterruptedException {

@Override
public void close() throws IOException {
LOG.info("Closing HDFS file: " + dstPath);
LOG.info("Closing HDFS file: " + pathManager.getOpenPath());
writer.flush();
LOG.info("done writing raw file to hdfs");
writer.close();
pathManager.close();
writer = null;
}

Expand All @@ -103,22 +105,23 @@ public void close() throws IOException {
public void open() throws IOException {
FlumeConfiguration conf = FlumeConfiguration.get();
FileSystem hdfs;

String codecName = conf.getCollectorDfsCompressCodec();
CompressionCodec codec = getCodec(conf, codecName);

if (codec == null) {
dstPath = new Path(path);
hdfs = dstPath.getFileSystem(conf);
writer = hdfs.create(dstPath);
LOG.info("Creating HDFS file: " + dstPath.toString());
pathManager = new PathManager(hdfs, dstPath.getParent(), dstPath.getName());
writer = pathManager.open();
LOG.info("Creating HDFS file: " + pathManager.getOpenPath());
return;
}

Compressor cmp = codec.createCompressor();
dstPath = new Path(path + codec.getDefaultExtension());
hdfs = dstPath.getFileSystem(conf);
writer = hdfs.create(dstPath);
pathManager = new PathManager(hdfs, dstPath.getParent(), dstPath.getName());
writer = pathManager.open();
try {
writer = codec.createOutputStream(writer, cmp);
} catch (NullPointerException npe) {
Expand All @@ -130,7 +133,7 @@ public void open() throws IOException {
throw new IOException("Unable to load compression codec " + codec);
}
LOG.info("Creating " + codec + " compressed HDFS file: "
+ dstPath.toString());
+ pathManager.getOpenPath());
}

private static boolean codecMatches(Class<? extends CompressionCodec> cls,
Expand Down
199 changes: 199 additions & 0 deletions flume-core/src/main/java/com/cloudera/util/PathManager.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,199 @@
/**
* Licensed to Cloudera, Inc. under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. Cloudera, Inc. licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.cloudera.util;

import java.io.IOException;
import java.io.OutputStream;

import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
* <p>
* A class that manages state transitions for files on a {@link FileSystem}.
* </p>
* <p>
* Primarily this is meant to manage file renaming through various states. There
* are four distinct states; NEW, OPEN, CLOSED, and INVALID (represented by
* {@link State}) and are defined as follows.
* <table>
* <tr>
* <th>State</th>
* <th>Description</th>
* </tr>
* <tr>
* <td>NEW</td>
* <td>The initial state. The file has not yet been {@link #open()}ed. The only
* valid action is to open the file.</td>
* </tr>
* <tr>
* <td>OPEN</td>
* <td>The file is open (i.e. {@link #open()} has been called) and an
* {@link OutputStream} is outstanding. The only valid action is to close the
* file.</td>
* </tr>
* <tr>
* <td>CLOSED</td>
* <td>The file has been closed (i.e. {@link #close()} has been called). No
* further action can be performed on this instance of {@link PathManager}. A
* future improvement would be to support append here.</td>
* </tr>
* <tr>
* <td>INVALID</td>
* <td>An error has occurred and no further action is possible. Any state can
* transition to INVALID. A future improvement would be to support recovery from
* this state. Currently this is not possible.</td>
* </tr>
* </table>
* </p>
* <p>
* When in the NEW state, no file exists. The user is expected to call
* {@link #open()}. On open, the file is created with an
* <q>open file</q> path name. This is
* {@code baseDirectory + File.separator + fileName + PathManager.openExtension}
* and should indicate to of the file system that this file is currently in use
* and should be avoided (if they desire consistency). When the developer is
* done writing data to the file's {@link OutputStream}, they should call
* {@link #close()}. This will transition to the CLOSED state and commit the
* file by renaming it (i.e. removing the {@link PathManager} .openExtension).
* </p>
* <p>
* It is possible to understand what state the file is in by calling
* {@link #getState()} and what the current {@link Path} is by using either
* {@link #getOpenPath()} or {@link #getClosedPath()}, respectively.
* </p>
*/
public class PathManager {

private static final Logger logger = LoggerFactory
.getLogger(PathManager.class);
private static final String openExtension = ".tmp";

private FileSystem fileSystem;
private Path baseDirectory;
private String fileName;
private State state;

private Path openPath;
private Path closedPath;

/**
* Create a new instance that will manage {@code fileName} in the directory
* specified by {@code baseDirectory}. The initial state is NEW.
*
* @param baseDirectory
* A directory in which we can write files.
* @param fileName
* The file name local part (e.g. foo.txt).
*/
public PathManager(FileSystem fileSystem, Path baseDirectory, String fileName) {
this.fileSystem = fileSystem;
this.baseDirectory = baseDirectory;
this.fileName = fileName;
this.state = State.NEW;

this.openPath = new Path(baseDirectory, fileName + openExtension);
this.closedPath = new Path(baseDirectory, fileName);
}

/**
* Opens a file for write.
*
* @return An {@link OutputStream} for writing data.
* @throws IOException
*/
public synchronized OutputStream open() throws IOException {

logger.debug("attempting to transition from " + state + " -> OPEN for "
+ this);

switch (state) {
case NEW:
state = State.OPEN;

return fileSystem.create(openPath);

default:
state = State.INVALID;

throw new IllegalStateException("Illegal state transition from " + state
+ " -> OPEN");
}
}

/**
* Transition a file from open to closed, renaming it appropriately. Note that
* this method doesn't close or flush the {@link OutputStream} returned by
* {@link #open()}.
*
* @return true upon successful rename, false otherwise.
* @throws IOException
*/
public synchronized boolean close() throws IOException {

logger.debug("attempting to transition from " + state + " -> CLOSED for "
+ this);

switch (state) {
case OPEN:
state = State.CLOSED;

return fileSystem.rename(openPath, closedPath);

default:
state = State.INVALID;

throw new IllegalStateException("Illegal state transition from " + state
+ " -> CLOSED");
}
}

@Override
public String toString() {
return "{ fileName:" + fileName + " state:" + state + " baseDirectory:"
+ baseDirectory + " openPath:" + openPath + " closedPath:" + closedPath
+ " fileSystem:" + fileSystem + " }";
}

public Path getBaseDirectory() {
return baseDirectory;
}

public String getFileName() {
return fileName;
}

public State getState() {
return state;
}

public Path getOpenPath() {
return openPath;
}

public Path getClosedPath() {
return closedPath;
}

public static enum State {
NEW, OPEN, CLOSED, INVALID,
}

}
Loading

0 comments on commit 1f5c610

Please sign in to comment.