diff --git a/flink-yarn/src/main/java/org/apache/flink/yarn/Utils.java b/flink-yarn/src/main/java/org/apache/flink/yarn/Utils.java
index 20e02e12fbaad..d70202ced75c1 100644
--- a/flink-yarn/src/main/java/org/apache/flink/yarn/Utils.java
+++ b/flink-yarn/src/main/java/org/apache/flink/yarn/Utils.java
@@ -51,6 +51,7 @@
 import org.slf4j.LoggerFactory;
 
 import java.io.File;
+import java.io.FileNotFoundException;
 import java.io.IOException;
 import java.lang.reflect.InvocationTargetException;
 import java.lang.reflect.Method;
@@ -61,6 +62,7 @@
 import java.util.List;
 import java.util.Map;
 import java.util.UUID;
+import java.util.concurrent.TimeUnit;
 
 import static org.apache.flink.yarn.YarnConfigKeys.ENV_FLINK_CLASSPATH;
 
@@ -80,6 +82,12 @@ public final class Utils {
 	/** Yarn site xml file name populated in YARN container for secure IT run. */
 	public static final String YARN_SITE_FILE_NAME = "yarn-site.xml";
 
+	/** Number of retries when fetching the status of an uploaded remote resource fails with a FileNotFoundException. */
+	public static final int REMOTE_RESOURCES_FETCH_NUM_RETRY = 3;
+
+	/** Time to wait, in milliseconds, between two attempts to fetch the status of an uploaded remote resource. */
+	public static final int REMOTE_RESOURCES_FETCH_WAIT_IN_MILLI = 100;
+
 	/**
 	 * See documentation.
 	 */
@@ -161,15 +169,49 @@ static Tuple2 setupLocalResource(
 
 		fs.copyFromLocalFile(false, true, localSrcPath, dst);
 
-		// Note: If we used registerLocalResource(FileSystem, Path) here, we would access the remote
+		// Note: If we directly used registerLocalResource(FileSystem, Path) here, we would access the remote
 		// file once again which has problems with eventually consistent read-after-write file
-		// systems. Instead, we decide to preserve the modification time at the remote
-		// location because this and the size of the resource will be checked by YARN based on
-		// the values we provide to #registerLocalResource() below.
-		fs.setTimes(dst, localFile.lastModified(), -1);
+		// systems. Instead, we poll (with a bounded number of retries) until the remote file is
+		// visible and register the resource with the modification time reported by the remote
+		// file system, because YARN checks this value and the size of the resource.
 
-		// now create the resource instance
-		LocalResource resource = registerLocalResource(dst, localFile.length(), localFile.lastModified());
+		FileStatus[] fss = null;
+		final int maxAttempts = REMOTE_RESOURCES_FETCH_NUM_RETRY + 1;
+		for (int attempt = 1; attempt <= maxAttempts; attempt++) {
+			try {
+				fss = fs.listStatus(dst);
+				break;
+			} catch (FileNotFoundException e) {
+				LOG.debug("Got FileNotFoundException while fetching uploaded remote resources at retry num {}", attempt);
+				if (attempt == maxAttempts) {
+					// no attempts left, so there is no point in waiting any longer
+					break;
+				}
+				try {
+					LOG.debug("Sleeping for {}ms", REMOTE_RESOURCES_FETCH_WAIT_IN_MILLI);
+					TimeUnit.MILLISECONDS.sleep(REMOTE_RESOURCES_FETCH_WAIT_IN_MILLI);
+				} catch (InterruptedException ie) {
+					LOG.warn("Failed to sleep for {}ms at retry num {} while fetching uploaded remote resources",
+						REMOTE_RESOURCES_FETCH_WAIT_IN_MILLI, attempt, ie);
+					// restore the interrupt flag for the caller and stop polling; otherwise every
+					// further sleep would fail immediately and the interruption would be lost
+					Thread.currentThread().interrupt();
+					break;
+				}
+			}
+		}
+
+		final long dstModificationTime;
+		if (fss != null && fss.length > 0) {
+			dstModificationTime = fss[0].getModificationTime();
+			LOG.debug("Got modification time {} from remote path {}", dstModificationTime, dst);
+		} else {
+			dstModificationTime = localFile.lastModified();
+			LOG.debug("Failed to fetch remote modification time from {}, using local timestamp {}", dst, dstModificationTime);
+		}
+
+		// now create the resource instance
+		LocalResource resource = registerLocalResource(dst, localFile.length(), dstModificationTime);
 
 		return Tuple2.of(dst, resource);
 	}