Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[FLINK-8801][yarn/s3] Fix jars downloading issues due to inconsistent timestamp in S3 Filesystem #8215

Closed
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
47 changes: 40 additions & 7 deletions flink-yarn/src/main/java/org/apache/flink/yarn/Utils.java
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,7 @@
import org.slf4j.LoggerFactory;

import java.io.File;
import java.io.FileNotFoundException;
import java.io.IOException;
import java.lang.reflect.InvocationTargetException;
import java.lang.reflect.Method;
Expand All @@ -61,6 +62,7 @@
import java.util.List;
import java.util.Map;
import java.util.UUID;
import java.util.concurrent.TimeUnit;

import static org.apache.flink.yarn.YarnConfigKeys.ENV_FLINK_CLASSPATH;

Expand All @@ -80,6 +82,12 @@ public final class Utils {
/** Yarn site xml file name populated in YARN container for secure IT run. */
public static final String YARN_SITE_FILE_NAME = "yarn-site.xml";

/** Maximum number of retries when fetching the remote resources after upload, in case of a FileNotFoundException. */
public static final int REMOTE_RESOURCES_FETCH_NUM_RETRY = 3;

/** Time to wait, in milliseconds, between successive attempts to fetch the remote resources after a FileNotFoundException. */
public static final int REMOTE_RESOURCES_FETCH_WAIT_IN_MILLI = 100;

/**
* See documentation.
*/
Expand Down Expand Up @@ -161,15 +169,40 @@ static Tuple2<Path, LocalResource> setupLocalResource(

fs.copyFromLocalFile(false, true, localSrcPath, dst);

// Note: If we used registerLocalResource(FileSystem, Path) here, we would access the remote
// Note: If we directly used registerLocalResource(FileSystem, Path) here, we would access the remote
// file once again which has problems with eventually consistent read-after-write file
// systems. Instead, we decide to preserve the modification time at the remote
// location because this and the size of the resource will be checked by YARN based on
// the values we provide to #registerLocalResource() below.
fs.setTimes(dst, localFile.lastModified(), -1);
// now create the resource instance
LocalResource resource = registerLocalResource(dst, localFile.length(), localFile.lastModified());
tillrohrmann marked this conversation as resolved.
Show resolved Hide resolved
// systems. Instead, we wait until the remote file is available.

FileStatus[] fss = null;
int iter = 1;
while (iter <= REMOTE_RESOURCES_FETCH_NUM_RETRY + 1) {
try {
fss = fs.listStatus(dst);
break;
} catch (FileNotFoundException e) {
LOG.debug("Got FileNotFoundException while fetching uploaded remote resources at retry num {}", iter);
try {
LOG.debug("Sleeping for {}ms", REMOTE_RESOURCES_FETCH_WAIT_IN_MILLI);
TimeUnit.MILLISECONDS.sleep(REMOTE_RESOURCES_FETCH_WAIT_IN_MILLI);
} catch (InterruptedException ie) {
LOG.warn("Failed to sleep for {}ms at retry num {} while fetching uploaded remote resources",
REMOTE_RESOURCES_FETCH_WAIT_IN_MILLI, iter, ie);
}
iter++;
}
}

final long dstModificationTime;
if (fss != null && fss.length > 0) {
dstModificationTime = fss[0].getModificationTime();
LOG.debug("Got modification time {} from remote path {}", dstModificationTime, dst);
} else {
dstModificationTime = localFile.lastModified();
LOG.debug("Failed to fetch remote modification time from {}, using local timestamp {}", dst, dstModificationTime);
}

// now create the resource instance
LocalResource resource = registerLocalResource(dst, localFile.length(), dstModificationTime);
return Tuple2.of(dst, resource);
}

Expand Down