From d8afdaacfa0c9f0df7fa5d1f3b449c2dc9273ea9 Mon Sep 17 00:00:00 2001
From: Aljoscha Krettek
Date: Fri, 6 Nov 2015 16:15:06 +0100
Subject: [PATCH] [FLINK-2979] Fix RollingSink truncate for Hadoop 2.7

The problem was that truncate is asynchronous and the RollingSink was not
taking this into account. Now it has a loop after the truncate call that
waits until the file is actually truncated.

This also changes the Hadoop 2.6 Travis build to 2.7.
---
 .travis.yml                                   |  4 +-
 .../streaming/connectors/fs/RollingSink.java  | 94 ++++++++++++++++---
 2 files changed, 84 insertions(+), 14 deletions(-)

diff --git a/.travis.yml b/.travis.yml
index 94766991d5c37..d6b7ee43175f9 100644
--- a/.travis.yml
+++ b/.travis.yml
@@ -17,7 +17,7 @@ language: java
 matrix:
   include:
     - jdk: "oraclejdk8"
-      env: PROFILE="-Dhadoop.version=2.6.0 -Dscala-2.11 -Pinclude-tez -Pinclude-yarn-tests"
+      env: PROFILE="-Dhadoop.version=2.7.0 -Dscala-2.11 -Pinclude-tez -Pinclude-yarn-tests"
     - jdk: "oraclejdk8"
       env: PROFILE="-Dhadoop.version=2.5.0 -Pinclude-yarn-tests"
     - jdk: "openjdk7"
@@ -41,7 +41,7 @@ notifications:
     secure: iYjxJn8OkCRslJ30/PcE+EbMiqfKwsvUJiVUEQAEXqCEwZg+wYDsN0ilPQQT0zU16mYWKoMTx71zrOZpjirGq7ww0XZ0wAfXDjgmTxX/DaEdp87uNgTRdQzLV7mQouMKZni28eoa08Rb2NIoLLQ39q7uCu0W/p7vAD2e9xHlBBE=
 
 env:
-  global: 
+  global:
     # Global variable to avoid hanging travis builds when downloading cache archives.
     - MALLOC_ARENA_MAX=2
   # username and password for Apache Nexus (maven deploy)
diff --git a/flink-streaming-connectors/flink-connector-filesystem/src/main/java/org/apache/flink/streaming/connectors/fs/RollingSink.java b/flink-streaming-connectors/flink-connector-filesystem/src/main/java/org/apache/flink/streaming/connectors/fs/RollingSink.java
index c705767613a6e..2112b28d22888 100644
--- a/flink-streaming-connectors/flink-connector-filesystem/src/main/java/org/apache/flink/streaming/connectors/fs/RollingSink.java
+++ b/flink-streaming-connectors/flink-connector-filesystem/src/main/java/org/apache/flink/streaming/connectors/fs/RollingSink.java
@@ -17,9 +17,7 @@
  */
 package org.apache.flink.streaming.connectors.fs;
 
-import com.google.common.collect.Lists;
-import com.google.common.collect.Maps;
-import com.google.common.collect.Sets;
+import org.apache.commons.lang3.time.StopWatch;
 import org.apache.flink.api.common.ExecutionConfig;
 import org.apache.flink.api.common.typeinfo.TypeInformation;
 import org.apache.flink.api.java.typeutils.InputTypeConfigurable;
@@ -32,6 +30,7 @@
 import org.apache.hadoop.fs.LocatedFileStatus;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.fs.RemoteIterator;
+import org.apache.hadoop.hdfs.DistributedFileSystem;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -39,6 +38,9 @@
 import java.io.Serializable;
 import java.lang.reflect.InvocationTargetException;
 import java.lang.reflect.Method;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
@@ -174,6 +176,13 @@ public class RollingSink<T> extends RichSinkFunction<T> implements InputTypeConf
 	 */
 	private final String DEFAULT_PART_REFIX = "part";
 
+	/**
+	 * The default timeout for asynchronous operations such as recoverLease and truncate. In
+	 * milliseconds.
+	 */
+	private final long DEFAULT_ASYNC_TIMEOUT_MS = 60 * 1000;
+
+
 	/**
 	 * The base {@code Path} that stored all rolling bucket directories.
 	 */
@@ -222,6 +231,17 @@ public class RollingSink<T> extends RichSinkFunction<T> implements InputTypeConf
 
 	private String partPrefix = DEFAULT_PART_REFIX;
 
+	/**
+	 * The timeout for asynchronous operations such as recoverLease and truncate. In
+	 * milliseconds.
+	 */
+	private long asyncTimeout = DEFAULT_ASYNC_TIMEOUT_MS;
+
+	// --------------------------------------------------------------------------------------------
+	// Internal fields (not configurable by user)
+	// --------------------------------------------------------------------------------------------
+
+
 	/**
 	 * The part file that we are currently writing to.
 	 */
@@ -232,10 +252,6 @@ public class RollingSink<T> extends RichSinkFunction<T> implements InputTypeConf
 	 */
 	private transient Path currentBucketDirectory;
 
-	// --------------------------------------------------------------------------------------------
-	// Internal fields (not configurable by user)
-	// --------------------------------------------------------------------------------------------
-
 	/**
 	 * The {@code FSDataOutputStream} for the current part file.
 	 */
@@ -587,7 +603,7 @@ private Method reflectTruncate(FileSystem fs) {
 	public void notifyCheckpointComplete(long checkpointId) throws Exception {
 		synchronized (bucketState.pendingFilesPerCheckpoint) {
 			Set<Long> pastCheckpointIds = bucketState.pendingFilesPerCheckpoint.keySet();
-			Set<Long> checkpointsToRemove = Sets.newHashSet();
+			Set<Long> checkpointsToRemove = new HashSet<>();
 			for (Long pastCheckpointId : pastCheckpointIds) {
 				if (pastCheckpointId <= checkpointId) {
 					LOG.debug("Moving pending files to final location for checkpoint {}", pastCheckpointId);
@@ -628,7 +644,7 @@ public BucketState snapshotState(long checkpointId, long checkpointTimestamp) th
 		synchronized (bucketState.pendingFilesPerCheckpoint) {
 			bucketState.pendingFilesPerCheckpoint.put(checkpointId, bucketState.pendingFiles);
 		}
-		bucketState.pendingFiles = Lists.newArrayList();
+		bucketState.pendingFiles = new ArrayList<>();
 
 		return bucketState;
 	}
@@ -675,7 +691,51 @@ public void restoreState(BucketState state) {
 			// truncate it or write a ".valid-length" file to specify up to which point it is valid
 			if (refTruncate != null) {
 				LOG.debug("Truncating {} to valid length {}", partPath, bucketState.currentFileValidLength);
-				refTruncate.invoke(fs, partPath, bucketState.currentFileValidLength);
+				// someone else might still hold the lease from a previous try, we are
+				// recovering, after all ...
+				if (fs instanceof DistributedFileSystem) {
+					DistributedFileSystem dfs = (DistributedFileSystem) fs;
+					LOG.debug("Trying to recover file lease {}", partPath);
+					dfs.recoverLease(partPath);
+					boolean isclosed = dfs.isFileClosed(partPath);
+					StopWatch sw = new StopWatch();
+					sw.start();
+					while (!isclosed) {
+						if (sw.getTime() > asyncTimeout) {
+							break;
+						}
+						try {
+							Thread.sleep(500);
+						} catch (InterruptedException e1) {
+							// ignore it
+						}
+						isclosed = dfs.isFileClosed(partPath);
+					}
+				}
+				Boolean truncated = (Boolean) refTruncate.invoke(fs, partPath, bucketState.currentFileValidLength);
+				if (!truncated) {
+					LOG.debug("Truncate did not immediately complete for {}, waiting...", partPath);
+
+					// we must wait for the asynchronous truncate operation to complete
+					StopWatch sw = new StopWatch();
+					sw.start();
+					long newLen = fs.getFileStatus(partPath).getLen();
+					while (newLen != bucketState.currentFileValidLength) {
+						if (sw.getTime() > asyncTimeout) {
+							break;
+						}
+						try {
+							Thread.sleep(500);
+						} catch (InterruptedException e1) {
+							// ignore it
+						}
+						newLen = fs.getFileStatus(partPath).getLen();
+					}
+					if (newLen != bucketState.currentFileValidLength) {
+						throw new RuntimeException("Truncate did not truncate to the right length. Should be " + bucketState.currentFileValidLength + " but is " + newLen + ".");
+					}
+				}
+
 			} else {
 				LOG.debug("Writing valid-length file for {} to specify valid length {}", partPath, bucketState.currentFileValidLength);
 				Path validLengthFilePath = new Path(partPath.getParent(), validLengthPrefix + partPath.getName()).suffix(validLengthSuffix);
@@ -864,6 +924,16 @@ public RollingSink<T> disableCleanupOnOpen() {
 		return this;
 	}
 
+	/**
+	 * Sets the timeout for asynchronous operations such as recoverLease and truncate.
+	 *
+	 * @param timeout The timeout, in milliseconds.
+	 */
+	public RollingSink<T> setAsyncTimeout(long timeout) {
+		this.asyncTimeout = timeout;
+		return this;
+	}
+
 	// --------------------------------------------------------------------------------------------
 	// Internal Classes
 	// --------------------------------------------------------------------------------------------
@@ -888,13 +958,13 @@ static final class BucketState implements Serializable {
 		/**
 		 * Pending files that accumulated since the last checkpoint.
 		 */
-		List<String> pendingFiles = Lists.newArrayList();
+		List<String> pendingFiles = new ArrayList<>();
 
 		/**
 		 * When doing a checkpoint we move the pending files since the last checkpoint to this map
 		 * with the id of the checkpoint. When we get the checkpoint-complete notification we move
 		 * pending files of completed checkpoints to their final location.
 		 */
-		final Map<Long, List<String>> pendingFilesPerCheckpoint = Maps.newHashMap();
+		final Map<Long, List<String>> pendingFilesPerCheckpoint = new HashMap<>();
 	}
 }
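
For reference, a minimal usage sketch of the new setAsyncTimeout setter. The
surrounding class, the attachSink method, the DataStream variable and the
bucket base path below are illustrative assumptions, not part of this patch;
only RollingSink and setAsyncTimeout come from the change above.

    import org.apache.flink.streaming.api.datastream.DataStream;
    import org.apache.flink.streaming.connectors.fs.RollingSink;

    public class RollingSinkTimeoutExample {

        // Attach a RollingSink to an existing stream and bound how long state
        // restore may wait for the asynchronous recoverLease/truncate calls.
        public static void attachSink(DataStream<String> input) {
            RollingSink<String> sink = new RollingSink<String>("/base/path")
                    // illustrative value: 30 seconds instead of the 60 second default
                    .setAsyncTimeout(30 * 1000);

            input.addSink(sink);
        }
    }

Like disableCleanupOnOpen, the setter returns this, so it chains with the
existing builder-style configuration methods of RollingSink.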