diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/blob/AbstractBlobCache.java b/flink-runtime/src/main/java/org/apache/flink/runtime/blob/AbstractBlobCache.java index dc031e0c0c63c..729ac9ab9fb0c 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/blob/AbstractBlobCache.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/blob/AbstractBlobCache.java @@ -159,8 +159,13 @@ protected File getFileInternal(@Nullable JobID jobId, BlobKey blobKey) throws IO try { if (blobView.get(jobId, blobKey, incomingFile)) { // now move the temp file to our local cache atomically - BlobUtils.moveTempFileToStore( - incomingFile, jobId, blobKey, localFile, readWriteLock.writeLock(), log, null); + readWriteLock.writeLock().lock(); + try { + BlobUtils.moveTempFileToStore( + incomingFile, jobId, blobKey, localFile, log, null); + } finally { + readWriteLock.writeLock().unlock(); + } return localFile; } @@ -172,8 +177,13 @@ protected File getFileInternal(@Nullable JobID jobId, BlobKey blobKey) throws IO BlobClient.downloadFromBlobServer( jobId, blobKey, incomingFile, serverAddress, blobClientConfig, numFetchRetries); - BlobUtils.moveTempFileToStore( - incomingFile, jobId, blobKey, localFile, readWriteLock.writeLock(), log, null); + readWriteLock.writeLock().lock(); + try { + BlobUtils.moveTempFileToStore( + incomingFile, jobId, blobKey, localFile, log, null); + } finally { + readWriteLock.writeLock().unlock(); + } return localFile; } finally { diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobClient.java b/flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobClient.java index 3154f69d567d9..fbcce58d246ce 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobClient.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobClient.java @@ -47,9 +47,11 @@ import java.net.Socket; import java.security.MessageDigest; import java.util.ArrayList; +import java.util.Arrays; import java.util.Collections; import java.util.List; +import static org.apache.flink.runtime.blob.BlobKey.BlobType.PERMANENT_BLOB; import static org.apache.flink.runtime.blob.BlobServerProtocol.BUFFER_SIZE; import static org.apache.flink.runtime.blob.BlobServerProtocol.GET_OPERATION; import static org.apache.flink.runtime.blob.BlobServerProtocol.JOB_RELATED_CONTENT; @@ -57,7 +59,6 @@ import static org.apache.flink.runtime.blob.BlobServerProtocol.PUT_OPERATION; import static org.apache.flink.runtime.blob.BlobServerProtocol.RETURN_ERROR; import static org.apache.flink.runtime.blob.BlobServerProtocol.RETURN_OKAY; -import static org.apache.flink.runtime.blob.BlobKey.BlobType.PERMANENT_BLOB; import static org.apache.flink.runtime.blob.BlobUtils.readFully; import static org.apache.flink.runtime.blob.BlobUtils.readLength; import static org.apache.flink.runtime.blob.BlobUtils.writeLength; @@ -496,13 +497,16 @@ private static BlobKey receiveAndCheckPutResponse( else if (response == RETURN_OKAY) { BlobKey remoteKey = BlobKey.readFromInputStream(is); - BlobKey localKey = BlobKey.createKey(blobType, md.digest()); + byte[] localHash = md.digest(); - if (!localKey.equals(remoteKey)) { + if (blobType != remoteKey.getType()) { + throw new IOException("Detected data corruption during transfer"); + } + if (!Arrays.equals(localHash, remoteKey.getHash())) { throw new IOException("Detected data corruption during transfer"); } - return localKey; + return remoteKey; } else if (response == RETURN_ERROR) { Throwable cause = readExceptionFromStream(is); diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobKey.java b/flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobKey.java index 0aa45e169bed7..ef2d64db3c0f1 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobKey.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobKey.java @@ -19,6 +19,7 @@ package org.apache.flink.runtime.blob; import org.apache.flink.annotation.VisibleForTesting; +import org.apache.flink.util.AbstractID; import org.apache.flink.util.StringUtils; import java.io.EOFException; @@ -41,7 +42,7 @@ abstract class BlobKey implements Serializable, Comparable<BlobKey> { private static final long serialVersionUID = 3847117712521785209L; /** Size of the internal BLOB key in bytes. */ - private static final int SIZE = 20; + public static final int SIZE = 20; /** The byte buffer storing the actual key data. */ private final byte[] key; @@ -67,6 +68,11 @@ enum BlobType { TRANSIENT_BLOB } + /** + * Random component of the key. + */ + private final AbstractID random; + /** * Constructs a new BLOB key. * @@ -76,6 +82,7 @@ protected BlobKey(BlobType type) { this.type = checkNotNull(type); this.key = new byte[SIZE]; + this.random = new AbstractID(); } /** @@ -87,13 +94,33 @@ protected BlobKey(BlobType type) { * the actual key data */ protected BlobKey(BlobType type, byte[] key) { + if (key == null || key.length != SIZE) { + throw new IllegalArgumentException("BLOB key must have a size of " + SIZE + " bytes"); + } + this.type = checkNotNull(type); + this.key = key; + this.random = new AbstractID(); + } + /** + * Constructs a new BLOB key from the given byte array. + * + * @param type + * whether the referenced BLOB is permanent or transient + * @param key + * the actual key data + * @param random + * the random component of the key + */ + protected BlobKey(BlobType type, byte[] key, byte[] random) { if (key == null || key.length != SIZE) { throw new IllegalArgumentException("BLOB key must have a size of " + SIZE + " bytes"); } + this.type = checkNotNull(type); this.key = key; + this.random = new AbstractID(random); } /** @@ -107,10 +134,10 @@ protected BlobKey(BlobType type, byte[] key) { @VisibleForTesting static BlobKey createKey(BlobType type) { if (type == PERMANENT_BLOB) { - return new PermanentBlobKey(); - } else { + return new PermanentBlobKey(); + } else { return new TransientBlobKey(); - } + } } /** @@ -125,10 +152,30 @@ static BlobKey createKey(BlobType type) { */ static BlobKey createKey(BlobType type, byte[] key) { if (type == PERMANENT_BLOB) { - return new PermanentBlobKey(key); - } else { + return new PermanentBlobKey(key); + } else { return new TransientBlobKey(key); - } + } + } + + /** + * Returns the right {@link BlobKey} subclass for the given parameters. + * + * @param type + * whether the referenced BLOB is permanent or transient + * @param key + * the actual key data + * @param random + * the random component of the key + * + * @return BlobKey subclass + */ + static BlobKey createKey(BlobType type, byte[] key, byte[] random) { + if (type == PERMANENT_BLOB) { + return new PermanentBlobKey(key, random); + } else { + return new TransientBlobKey(key, random); + } } /** @@ -140,6 +187,15 @@ byte[] getHash() { return key; } + /** + * Returns the (internal) BLOB type which is reflected by the inheriting sub-class. + * + * @return BLOB type, i.e. permanent or transient + */ + BlobType getType() { + return type; + } + /** * Adds the BLOB key to the given {@link MessageDigest}. 
* @@ -159,13 +215,16 @@ public boolean equals(final Object obj) { final BlobKey bk = (BlobKey) obj; - return Arrays.equals(this.key, bk.key) && this.type == bk.type; + return Arrays.equals(this.key, bk.key) && + this.type == bk.type && + this.random.equals(bk.random); } @Override public int hashCode() { int result = Arrays.hashCode(this.key); result = 37 * result + this.type.hashCode(); + result = 37 * result + this.random.hashCode(); return result; } @@ -183,7 +242,7 @@ public String toString() { // this actually never happens! throw new IllegalStateException("Invalid BLOB type"); } - return typeString + StringUtils.byteToHexString(this.key); + return typeString + StringUtils.byteToHexString(this.key) + "-" + random.toString(); } @Override @@ -203,7 +262,13 @@ public int compareTo(BlobKey o) { if (aarr.length == barr.length) { // same hash contents - compare the BLOB types - return this.type.compareTo(o.type); + int typeCompare = this.type.compareTo(o.type); + if (typeCompare == 0) { + // same type - compare random components + return this.random.compareTo(o.random); + } else { + return typeCompare; + } } else { return aarr.length - barr.length; } @@ -223,6 +288,7 @@ public int compareTo(BlobKey o) { static BlobKey readFromInputStream(InputStream inputStream) throws IOException { final byte[] key = new byte[BlobKey.SIZE]; + final byte[] random = new byte[AbstractID.SIZE]; int bytesRead = 0; // read key @@ -233,6 +299,7 @@ static BlobKey readFromInputStream(InputStream inputStream) throws IOException { } bytesRead += read; } + // read BLOB type final BlobType blobType; { @@ -248,7 +315,17 @@ static BlobKey readFromInputStream(InputStream inputStream) throws IOException { } } - return createKey(blobType, key); + // read random component + bytesRead = 0; + while (bytesRead < AbstractID.SIZE) { + final int read = inputStream.read(random, bytesRead, AbstractID.SIZE - bytesRead); + if (read < 0) { + throw new EOFException("Read an incomplete BLOB key"); + } + bytesRead += read; + } + + return createKey(blobType, key, random); } /** @@ -262,5 +339,6 @@ static BlobKey readFromInputStream(InputStream inputStream) throws IOException { void writeToOutputStream(final OutputStream outputStream) throws IOException { outputStream.write(this.key); outputStream.write(this.type.ordinal()); + outputStream.write(this.random.getBytes()); } } diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobServer.java b/flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobServer.java index 7804dfd73e34c..bc61ef76c9f80 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobServer.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobServer.java @@ -474,8 +474,13 @@ void getFileInternal(@Nullable JobID jobId, BlobKey blobKey, File localFile) thr incomingFile = createTemporaryFilename(); blobStore.get(jobId, blobKey, incomingFile); - BlobUtils.moveTempFileToStore( - incomingFile, jobId, blobKey, localFile, readWriteLock.writeLock(), LOG, null); + readWriteLock.writeLock().lock(); + try { + BlobUtils.moveTempFileToStore( + incomingFile, jobId, blobKey, localFile, LOG, null); + } finally { + readWriteLock.writeLock().unlock(); + } return; } finally { @@ -586,10 +591,8 @@ private BlobKey putBuffer(@Nullable JobID jobId, byte[] value, BlobKey.BlobType md.update(value); fos.write(value); - blobKey = BlobKey.createKey(blobType, md.digest()); - // persist file - moveTempFileToStore(incomingFile, jobId, blobKey); + blobKey = moveTempFileToStore(incomingFile, 
jobId, md.digest(), blobType); return blobKey; } finally { @@ -642,10 +645,8 @@ private BlobKey putInputStream( md.update(buf, 0, bytesRead); } - blobKey = BlobKey.createKey(blobType, md.digest()); - // persist file - moveTempFileToStore(incomingFile, jobId, blobKey); + blobKey = moveTempFileToStore(incomingFile, jobId, md.digest(), blobType); return blobKey; } finally { @@ -665,20 +666,53 @@ private BlobKey putInputStream( * temporary file created during transfer * @param jobId * ID of the job this blob belongs to or null if job-unrelated - * @param blobKey - * BLOB key identifying the file + * @param digest + * BLOB content digest, i.e. hash + * @param blobType + * whether this file is a permanent or transient BLOB + * + * @return unique BLOB key that identifies the BLOB on the server * * @throws IOException * thrown if an I/O error occurs while moving the file or uploading it to the HA store */ - void moveTempFileToStore( - File incomingFile, @Nullable JobID jobId, BlobKey blobKey) throws IOException { + BlobKey moveTempFileToStore( + File incomingFile, @Nullable JobID jobId, byte[] digest, BlobKey.BlobType blobType) + throws IOException { + + int retries = 10; - File storageFile = BlobUtils.getStorageLocation(storageDir, jobId, blobKey); + int attempt = 0; + while (true) { + // add unique component independent of the BLOB content + BlobKey blobKey = BlobKey.createKey(blobType, digest); + File storageFile = BlobUtils.getStorageLocation(storageDir, jobId, blobKey); - BlobUtils.moveTempFileToStore( - incomingFile, jobId, blobKey, storageFile, readWriteLock.writeLock(), LOG, - blobKey instanceof PermanentBlobKey ? blobStore : null); + // try again until the key is unique (put the existence check into the lock!) + readWriteLock.writeLock().lock(); + try { + if (!storageFile.exists()) { + BlobUtils.moveTempFileToStore( + incomingFile, jobId, blobKey, storageFile, LOG, + blobKey instanceof PermanentBlobKey ? 
blobStore : null); + return blobKey; + } + } finally { + readWriteLock.writeLock().unlock(); + } + + ++attempt; + if (attempt >= retries) { + String message = "Failed to find a unique key for BLOB of job " + jobId + " (last tried " + storageFile.getAbsolutePath() + ")."; + LOG.error(message + " No retries left."); + throw new IOException(message); + } else { + if (LOG.isDebugEnabled()) { + LOG.debug("Trying to find a unique key for BLOB of job {} (retry {}, last tried {})", + jobId, attempt, storageFile.getAbsolutePath()); + } + } + } } /** diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobServerConnection.java b/flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobServerConnection.java index be625811da547..fa8427e1fd660 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobServerConnection.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobServerConnection.java @@ -37,6 +37,8 @@ import java.util.concurrent.locks.Lock; import java.util.concurrent.locks.ReadWriteLock; +import static org.apache.flink.runtime.blob.BlobKey.BlobType.PERMANENT_BLOB; +import static org.apache.flink.runtime.blob.BlobKey.BlobType.TRANSIENT_BLOB; import static org.apache.flink.runtime.blob.BlobServerProtocol.BUFFER_SIZE; import static org.apache.flink.runtime.blob.BlobServerProtocol.GET_OPERATION; import static org.apache.flink.runtime.blob.BlobServerProtocol.JOB_RELATED_CONTENT; @@ -44,8 +46,6 @@ import static org.apache.flink.runtime.blob.BlobServerProtocol.PUT_OPERATION; import static org.apache.flink.runtime.blob.BlobServerProtocol.RETURN_ERROR; import static org.apache.flink.runtime.blob.BlobServerProtocol.RETURN_OKAY; -import static org.apache.flink.runtime.blob.BlobKey.BlobType.PERMANENT_BLOB; -import static org.apache.flink.runtime.blob.BlobKey.BlobType.TRANSIENT_BLOB; import static org.apache.flink.runtime.blob.BlobUtils.closeSilently; import static org.apache.flink.runtime.blob.BlobUtils.readFully; import static org.apache.flink.runtime.blob.BlobUtils.readLength; @@ -346,9 +346,9 @@ private void put(InputStream inputStream, OutputStream outputStream, byte[] buf) } incomingFile = blobServer.createTemporaryFilename(); - BlobKey blobKey = readFileFully(inputStream, incomingFile, buf, blobType); + byte[] digest = readFileFully(inputStream, incomingFile, buf); - blobServer.moveTempFileToStore(incomingFile, jobId, blobKey); + BlobKey blobKey = blobServer.moveTempFileToStore(incomingFile, jobId, digest, blobType); // Return computed key to client for validation outputStream.write(RETURN_OKAY); @@ -387,16 +387,14 @@ private void put(InputStream inputStream, OutputStream outputStream, byte[] buf) * file to write to * @param buf * An auxiliary buffer for data serialization/deserialization - * @param blobType - * whether to make the data permanent or transient * - * @return the received file's content hash as a BLOB key + * @return the received file's content hash * * @throws IOException * thrown if an I/O error occurs while reading/writing data from/to the respective streams */ - private static BlobKey readFileFully( - final InputStream inputStream, final File incomingFile, final byte[] buf, BlobKey.BlobType blobType) + private static byte[] readFileFully( + final InputStream inputStream, final File incomingFile, final byte[] buf) throws IOException { MessageDigest md = BlobUtils.createMessageDigest(); @@ -417,7 +415,7 @@ private static BlobKey readFileFully( md.update(buf, 0, bytesExpected); } - return BlobKey.createKey(blobType, 
md.digest()); + return md.digest(); } } diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobUtils.java b/flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobUtils.java index d8223c8b77680..04f2cdb7a3e7b 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobUtils.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobUtils.java @@ -30,6 +30,7 @@ import org.slf4j.Logger; import javax.annotation.Nullable; + import java.io.Closeable; import java.io.EOFException; import java.io.File; @@ -42,7 +43,6 @@ import java.security.MessageDigest; import java.security.NoSuchAlgorithmException; import java.util.UUID; -import java.util.concurrent.locks.Lock; import static org.apache.flink.util.Preconditions.checkNotNull; import static org.apache.flink.util.StringUtils.isNullOrWhitespaceOnly; @@ -422,7 +422,7 @@ private BlobUtils() { /** * Moves the temporary incomingFile to its permanent location where it is available for - * use. + * use (not thread-safe!). * * @param incomingFile * temporary file created during transfer @@ -432,8 +432,6 @@ private BlobUtils() { * BLOB key identifying the file * @param storageFile * (local) file where the blob is/should be stored - * @param writeLock - * lock to acquire before doing the move * @param log * logger for debug information * @param blobStore @@ -444,9 +442,7 @@ private BlobUtils() { */ static void moveTempFileToStore( File incomingFile, @Nullable JobID jobId, BlobKey blobKey, File storageFile, - Lock writeLock, Logger log, @Nullable BlobStore blobStore) throws IOException { - - writeLock.lock(); + Logger log, @Nullable BlobStore blobStore) throws IOException { try { // first check whether the file already exists @@ -483,8 +479,6 @@ static void moveTempFileToStore( if (incomingFile != null && !incomingFile.delete() && incomingFile.exists()) { log.warn("Could not delete the staging file {} for blob key {} and job {}.", incomingFile, blobKey, jobId); } - - writeLock.unlock(); } } } diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/blob/PermanentBlobKey.java b/flink-runtime/src/main/java/org/apache/flink/runtime/blob/PermanentBlobKey.java index 2ad8f7268412a..40732fad51474 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/blob/PermanentBlobKey.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/blob/PermanentBlobKey.java @@ -42,4 +42,16 @@ public PermanentBlobKey() { PermanentBlobKey(byte[] key) { super(BlobType.PERMANENT_BLOB, key); } + + /** + * Constructs a new BLOB key from the given byte array. + * + * @param key + * the actual key data + * @param random + * the random component of the key + */ + PermanentBlobKey(byte[] key, byte[] random) { + super(BlobType.PERMANENT_BLOB, key, random); + } } diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/blob/TransientBlobKey.java b/flink-runtime/src/main/java/org/apache/flink/runtime/blob/TransientBlobKey.java index 43e0f5f50b29d..15a9637244153 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/blob/TransientBlobKey.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/blob/TransientBlobKey.java @@ -42,4 +42,16 @@ public TransientBlobKey() { TransientBlobKey(byte[] key) { super(BlobType.TRANSIENT_BLOB, key); } + + /** + * Constructs a new BLOB key from the given byte array. 
+ * + * @param key + * the actual key data + * @param random + * the random component of the key + */ + TransientBlobKey(byte[] key, byte[] random) { + super(BlobType.TRANSIENT_BLOB, key, random); + } } diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/TaskManagerLogHandler.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/TaskManagerLogHandler.java index 54725e13e6c92..cf5bfcb83b3a0 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/TaskManagerLogHandler.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/TaskManagerLogHandler.java @@ -190,6 +190,9 @@ protected void respondAsLeader(final ChannelHandlerContext ctx, final Routed rou //delete previous log file, if it is different than the current one HashMap lastSubmittedFile = fileMode == FileMode.LOG ? lastSubmittedLog : lastSubmittedStdout; if (lastSubmittedFile.containsKey(taskManagerID)) { + // the BlobKey will almost certainly be different but the old file + // may not exist anymore so we cannot rely on it and need to + // download the new file anyway, even if the hashes match if (!Objects.equals(blobKey, lastSubmittedFile.get(taskManagerID))) { if (!blobCache.deleteFromCache(lastSubmittedFile.get(taskManagerID))) { throw new CompletionException(new FlinkException("Could not delete file for " + taskManagerID + '.')); diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobCacheDeleteTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobCacheDeleteTest.java index 8ea2a5ea4331a..37a5128f561e1 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobCacheDeleteTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobCacheDeleteTest.java @@ -50,7 +50,6 @@ import static org.apache.flink.runtime.blob.BlobServerGetTest.verifyDeleted; import static org.apache.flink.runtime.blob.BlobServerPutTest.put; import static org.apache.flink.runtime.blob.BlobServerPutTest.verifyContents; -import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertFalse; import static org.junit.Assert.assertNotNull; import static org.junit.Assert.assertTrue; @@ -86,6 +85,12 @@ public void testDeleteTransient4() throws IOException { testDelete(new JobID(), new JobID()); } + @Test + public void testDeleteTransient5() throws IOException { + JobID jobId = new JobID(); + testDelete(jobId, jobId); + } + /** * Uploads a (different) byte array for each of the given jobs and verifies that deleting one of * them (via the {@link BlobCacheService}) does not influence the other. 
@@ -97,7 +102,6 @@ public void testDeleteTransient4() throws IOException { */ private void testDelete(@Nullable JobID jobId1, @Nullable JobID jobId2) throws IOException { - final boolean sameJobId = (jobId1 == jobId2) || (jobId1 != null && jobId1.equals(jobId2)); final Configuration config = new Configuration(); config.setString(BlobServerOptions.STORAGE_DIRECTORY, temporaryFolder.newFolder().getAbsolutePath()); @@ -121,9 +125,10 @@ config, new VoidBlobStore())) { // put two more BLOBs (same key, other key) for another job ID TransientBlobKey key2a = (TransientBlobKey) put(server, jobId2, data, TRANSIENT_BLOB); assertNotNull(key2a); - assertEquals(key1, key2a); + BlobKeyTest.verifyKeyDifferentHashEquals(key1, key2a); TransientBlobKey key2b = (TransientBlobKey) put(server, jobId2, data2, TRANSIENT_BLOB); assertNotNull(key2b); + BlobKeyTest.verifyKeyDifferentHashDifferent(key1, key2b); // issue a DELETE request assertTrue(delete(cache, jobId1, key1)); @@ -133,16 +138,15 @@ config, new VoidBlobStore())) { // delete on server so that the cache cannot re-download assertTrue(server.deleteInternal(jobId1, key1)); verifyDeleted(cache, jobId1, key1); - // deleting a one BLOB should not affect another BLOB, even with the same key if job IDs are different - if (!sameJobId) { - verifyContents(server, jobId2, key2a, data); - } + // deleting one BLOB should not affect another BLOB with a different key + // (and keys are always different now) + verifyContents(server, jobId2, key2a, data); verifyContents(server, jobId2, key2b, data2); // delete first file of second job assertTrue(delete(cache, jobId2, key2a)); - // delete only works on local cache (unless already deleted - key1 == key2a)! - assertTrue(sameJobId || server.getStorageLocation(jobId2, key2a).exists()); + // delete only works on local cache + assertTrue(server.getStorageLocation(jobId2, key2a).exists()); // delete on server so that the cache cannot re-download assertTrue(server.deleteInternal(jobId2, key2a)); verifyDeleted(cache, jobId2, key2a); @@ -150,7 +154,7 @@ config, new VoidBlobStore())) { // delete second file of second job assertTrue(delete(cache, jobId2, key2b)); - // delete only works on local cache (unless already deleted - key1 == key2a)! 
+ // delete only works on local cache assertTrue(server.getStorageLocation(jobId2, key2b).exists()); // delete on server so that the cache cannot re-download assertTrue(server.deleteInternal(jobId2, key2b)); diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobCacheGetTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobCacheGetTest.java index bed27d8b9f705..c760d04cbd188 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobCacheGetTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobCacheGetTest.java @@ -40,7 +40,6 @@ import java.io.IOException; import java.net.InetSocketAddress; import java.nio.file.AccessDeniedException; -import java.security.MessageDigest; import java.util.ArrayList; import java.util.Arrays; import java.util.Collection; @@ -57,6 +56,7 @@ import static org.apache.flink.runtime.blob.BlobClientTest.validateGetAndClose; import static org.apache.flink.runtime.blob.BlobKey.BlobType.PERMANENT_BLOB; import static org.apache.flink.runtime.blob.BlobKey.BlobType.TRANSIENT_BLOB; +import static org.apache.flink.runtime.blob.BlobKeyTest.verifyKeyDifferentHashEquals; import static org.apache.flink.runtime.blob.BlobKeyTest.verifyType; import static org.apache.flink.runtime.blob.BlobServerDeleteTest.delete; import static org.apache.flink.runtime.blob.BlobServerGetTest.get; @@ -152,37 +152,37 @@ config, new VoidBlobStore())) { // add the same data under a second jobId BlobKey key2 = put(server, jobId2, data, blobType); - assertNotNull(key); - assertEquals(key, key2); + assertNotNull(key2); + verifyKeyDifferentHashEquals(key, key2); // request for jobId2 should succeed - get(cache, jobId2, key); + get(cache, jobId2, key2); // request for jobId1 should still fail verifyDeleted(cache, jobId1, key); if (blobType == PERMANENT_BLOB) { // still existing on server - assertTrue(server.getStorageLocation(jobId2, key).exists()); + assertTrue(server.getStorageLocation(jobId2, key2).exists()); // delete jobId2 on cache - blobFile = cache.getPermanentBlobService().getStorageLocation(jobId2, key); + blobFile = cache.getPermanentBlobService().getStorageLocation(jobId2, key2); assertTrue(blobFile.delete()); // try to retrieve again - get(cache, jobId2, key); + get(cache, jobId2, key2); // delete on cache and server, verify that it is not accessible anymore - blobFile = cache.getPermanentBlobService().getStorageLocation(jobId2, key); + blobFile = cache.getPermanentBlobService().getStorageLocation(jobId2, key2); assertTrue(blobFile.delete()); - blobFile = server.getStorageLocation(jobId2, key); + blobFile = server.getStorageLocation(jobId2, key2); assertTrue(blobFile.delete()); - verifyDeleted(cache, jobId2, key); + verifyDeleted(cache, jobId2, key2); } else { // deleted eventually on the server by the GET request above - verifyDeletedEventually(server, jobId2, key); + verifyDeletedEventually(server, jobId2, key2); // delete jobId2 on cache - blobFile = cache.getTransientBlobService().getStorageLocation(jobId2, key); + blobFile = cache.getTransientBlobService().getStorageLocation(jobId2, key2); assertTrue(blobFile.delete()); // verify that it is not accessible anymore - verifyDeleted(cache, jobId2, key); + verifyDeleted(cache, jobId2, key2); } } } @@ -548,11 +548,6 @@ private void testConcurrentGetOperations(final JobID jobId, final BlobKey.BlobTy final byte[] data = {1, 2, 3, 4, 99, 42}; - MessageDigest md = BlobUtils.createMessageDigest(); - - // create the correct blob key by hashing our input data - final BlobKey 
blobKey = BlobKey.createKey(blobType, md.digest(data)); - final ExecutorService executor = Executors.newFixedThreadPool(numberConcurrentGetOperations); try ( @@ -563,7 +558,7 @@ private void testConcurrentGetOperations(final JobID jobId, final BlobKey.BlobTy server.start(); // upload data first - assertEquals(blobKey, put(server, jobId, data, blobType)); + final BlobKey blobKey = put(server, jobId, data, blobType); // now try accessing it concurrently (only HA mode will be able to retrieve it from HA store!) for (int i = 0; i < numberConcurrentGetOperations; i++) { diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobCachePutTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobCachePutTest.java index aa23c8095ca48..56258c30c9cd7 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobCachePutTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobCachePutTest.java @@ -58,6 +58,7 @@ import static org.apache.flink.runtime.blob.BlobCacheCleanupTest.checkFilesExist; import static org.apache.flink.runtime.blob.BlobKey.BlobType.PERMANENT_BLOB; import static org.apache.flink.runtime.blob.BlobKey.BlobType.TRANSIENT_BLOB; +import static org.apache.flink.runtime.blob.BlobKeyTest.verifyKeyDifferentHashEquals; import static org.apache.flink.runtime.blob.BlobKeyTest.verifyType; import static org.apache.flink.runtime.blob.BlobServerGetTest.verifyDeleted; import static org.apache.flink.runtime.blob.BlobServerPutTest.BlockingInputStream; @@ -275,6 +276,11 @@ config, new VoidBlobStore())) { BlobKey key1a = put(cache, jobId1, data, blobType); assertNotNull(key1a); verifyType(blobType, key1a); + // second upload of same data should yield a different BlobKey + BlobKey key1a2 = put(cache, jobId1, data, blobType); + assertNotNull(key1a2); + verifyType(blobType, key1a2); + verifyKeyDifferentHashEquals(key1a, key1a2); BlobKey key1b = put(cache, jobId1, data2, blobType); assertNotNull(key1b); @@ -282,19 +288,23 @@ config, new VoidBlobStore())) { // files should be available on the server verifyContents(server, jobId1, key1a, data); + verifyContents(server, jobId1, key1a2, data); verifyContents(server, jobId1, key1b, data2); // now put data for jobId2 and verify that both are ok BlobKey key2a = put(cache, jobId2, data, blobType); assertNotNull(key2a); - assertEquals(key1a, key2a); + verifyType(blobType, key2a); + verifyKeyDifferentHashEquals(key1a, key2a); BlobKey key2b = put(cache, jobId2, data2, blobType); assertNotNull(key2b); - assertEquals(key1b, key2b); + verifyType(blobType, key2b); + verifyKeyDifferentHashEquals(key1b, key2b); // verify the accessibility and the BLOB contents verifyContents(server, jobId1, key1a, data); + verifyContents(server, jobId1, key1a2, data); verifyContents(server, jobId1, key1b, data2); verifyContents(server, jobId2, key2a, data); verifyContents(server, jobId2, key2b, data2); @@ -382,26 +392,32 @@ config, new VoidBlobStore())) { TransientBlobKey key1a = (TransientBlobKey) put(cache, jobId1, new ByteArrayInputStream(data), TRANSIENT_BLOB); assertNotNull(key1a); + // second upload of same data should yield a different BlobKey + BlobKey key1a2 = put(cache, jobId1, new ByteArrayInputStream(data), TRANSIENT_BLOB); + assertNotNull(key1a2); + verifyKeyDifferentHashEquals(key1a, key1a2); TransientBlobKey key1b = (TransientBlobKey) put(cache, jobId1, new ByteArrayInputStream(data2), TRANSIENT_BLOB); assertNotNull(key1b); // files should be available on the server verifyContents(server, jobId1, key1a, data); 
+ verifyContents(server, jobId1, key1a2, data); verifyContents(server, jobId1, key1b, data2); // now put data for jobId2 and verify that both are ok TransientBlobKey key2a = (TransientBlobKey) put(cache, jobId2, new ByteArrayInputStream(data), TRANSIENT_BLOB); assertNotNull(key2a); - assertEquals(key1a, key2a); + verifyKeyDifferentHashEquals(key1a, key2a); TransientBlobKey key2b = (TransientBlobKey) put(cache, jobId2, new ByteArrayInputStream(data2), TRANSIENT_BLOB); assertNotNull(key2b); - assertEquals(key1b, key2b); + verifyKeyDifferentHashEquals(key1b, key2b); // verify the accessibility and the BLOB contents verifyContents(server, jobId1, key1a, data); + verifyContents(server, jobId1, key1a2, data); verifyContents(server, jobId1, key1b, data2); verifyContents(server, jobId2, key2a, data); verifyContents(server, jobId2, key2b, data2); @@ -486,6 +502,10 @@ config, new VoidBlobStore())) { TransientBlobKey key1a = (TransientBlobKey) put(cache, jobId1, new ChunkedInputStream(data, 19), TRANSIENT_BLOB); assertNotNull(key1a); + // second upload of same data should yield a different BlobKey + BlobKey key1a2 = put(cache, jobId1, new ChunkedInputStream(data, 19), TRANSIENT_BLOB); + assertNotNull(key1a2); + verifyKeyDifferentHashEquals(key1a, key1a2); TransientBlobKey key1b = (TransientBlobKey) put(cache, jobId1, new ChunkedInputStream(data2, 19), TRANSIENT_BLOB); @@ -493,21 +513,23 @@ config, new VoidBlobStore())) { // files should be available on the server verifyContents(server, jobId1, key1a, data); + verifyContents(server, jobId1, key1a2, data); verifyContents(server, jobId1, key1b, data2); // now put data for jobId2 and verify that both are ok TransientBlobKey key2a = (TransientBlobKey) put(cache, jobId2, new ChunkedInputStream(data, 19), TRANSIENT_BLOB); assertNotNull(key2a); - assertEquals(key1a, key2a); + verifyKeyDifferentHashEquals(key1a, key2a); TransientBlobKey key2b = (TransientBlobKey) put(cache, jobId2, new ChunkedInputStream(data2, 19), TRANSIENT_BLOB); assertNotNull(key2b); - assertEquals(key1b, key2b); + verifyKeyDifferentHashEquals(key1b, key2b); // verify the accessibility and the BLOB contents verifyContents(server, jobId1, key1a, data); + verifyContents(server, jobId1, key1a2, data); verifyContents(server, jobId1, key1b, data2); verifyContents(server, jobId2, key2a, data); verifyContents(server, jobId2, key2b, data2); @@ -858,7 +880,8 @@ private void testConcurrentPutOperations( // make sure that all blob keys are the same while (blobKeyIterator.hasNext()) { - assertEquals(blobKey, blobKeyIterator.next()); + // check for unique BlobKey, but should have same hash + verifyKeyDifferentHashEquals(blobKey, blobKeyIterator.next()); } // check the uploaded file's contents diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobCacheRecoveryTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobCacheRecoveryTest.java index 1a3f161725f90..e275949db1656 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobCacheRecoveryTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobCacheRecoveryTest.java @@ -38,13 +38,11 @@ import static org.apache.flink.runtime.blob.BlobKey.BlobType.PERMANENT_BLOB; import static org.apache.flink.runtime.blob.BlobKey.BlobType.TRANSIENT_BLOB; +import static org.apache.flink.runtime.blob.BlobKeyTest.verifyKeyDifferentHashDifferent; +import static org.apache.flink.runtime.blob.BlobKeyTest.verifyKeyDifferentHashEquals; import static 
org.apache.flink.runtime.blob.BlobServerGetTest.verifyDeleted; import static org.apache.flink.runtime.blob.BlobServerPutTest.put; import static org.apache.flink.runtime.blob.BlobServerPutTest.verifyContents; -import static org.hamcrest.MatcherAssert.assertThat; -import static org.hamcrest.Matchers.equalTo; -import static org.hamcrest.Matchers.not; -import static org.junit.Assert.assertNotEquals; import static org.junit.Assert.assertTrue; /** @@ -131,10 +129,8 @@ public static void testBlobCacheRecovery( // put non-HA data nonHAKey = put(cache0, jobId[0], expected2, TRANSIENT_BLOB); - assertNotEquals(keys[0], nonHAKey); - assertThat(keys[0].getHash(), not(equalTo(nonHAKey.getHash()))); - assertNotEquals(keys[1], nonHAKey); - assertThat(keys[1].getHash(), equalTo(nonHAKey.getHash())); + verifyKeyDifferentHashDifferent(keys[0], nonHAKey); + verifyKeyDifferentHashEquals(keys[1], nonHAKey); // check that the storage directory exists final Path blobServerPath = new Path(storagePath, "blob"); diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobClientTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobClientTest.java index 0e97604425a8d..9e4f4b71f8069 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobClientTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobClientTest.java @@ -47,6 +47,7 @@ import static org.apache.flink.runtime.blob.BlobCachePutTest.verifyDeletedEventually; import static org.apache.flink.runtime.blob.BlobKey.BlobType.PERMANENT_BLOB; import static org.apache.flink.runtime.blob.BlobKey.BlobType.TRANSIENT_BLOB; +import static org.apache.flink.runtime.blob.BlobKeyTest.verifyKeyDifferentHashEquals; import static org.junit.Assert.assertArrayEquals; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertNotNull; @@ -113,15 +114,13 @@ private static byte[] createTestBuffer() { * * @param file * the file to prepare for the unit tests - * @param blobType - * whether the BLOB should become permanent or transient * * @return the BLOB key of the prepared file * * @throws IOException * thrown if an I/O error occurs while writing to the test file */ - private static BlobKey prepareTestFile(File file, BlobKey.BlobType blobType) throws IOException { + private static byte[] prepareTestFile(File file) throws IOException { MessageDigest md = BlobUtils.createMessageDigest(); @@ -145,7 +144,7 @@ private static BlobKey prepareTestFile(File file, BlobKey.BlobType blobType) thr } } - return BlobKey.createKey(blobType, md.digest()); + return md.digest(); } /** @@ -254,35 +253,38 @@ private void testContentAddressableBuffer(BlobKey.BlobType blobType) byte[] testBuffer = createTestBuffer(); MessageDigest md = BlobUtils.createMessageDigest(); md.update(testBuffer); - BlobKey origKey = BlobKey.createKey(blobType, md.digest()); + byte[] digest = md.digest(); InetSocketAddress serverAddress = new InetSocketAddress("localhost", getBlobServer().getPort()); client = new BlobClient(serverAddress, getBlobClientConfig()); JobID jobId = new JobID(); - BlobKey receivedKey; // Store the data (job-unrelated) + BlobKey receivedKey1 = null; if (blobType == TRANSIENT_BLOB) { - receivedKey = client.putBuffer(null, testBuffer, 0, testBuffer.length, blobType); - assertEquals(origKey, receivedKey); + receivedKey1 = client.putBuffer(null, testBuffer, 0, testBuffer.length, blobType); + assertArrayEquals(digest, receivedKey1.getHash()); } // try again with a job-related BLOB: - receivedKey = 
client.putBuffer(jobId, testBuffer, 0, testBuffer.length, blobType); - assertEquals(origKey, receivedKey); + BlobKey receivedKey2 = client.putBuffer(jobId, testBuffer, 0, testBuffer.length, blobType); + assertArrayEquals(digest, receivedKey2.getHash()); + if (blobType == TRANSIENT_BLOB) { + verifyKeyDifferentHashEquals(receivedKey1, receivedKey2); + } // Retrieve the data (job-unrelated) if (blobType == TRANSIENT_BLOB) { - validateGetAndClose(client.getInternal(null, receivedKey), testBuffer); + validateGetAndClose(client.getInternal(null, receivedKey1), testBuffer); // transient BLOBs should be deleted from the server, eventually - verifyDeletedEventually(getBlobServer(), null, receivedKey); + verifyDeletedEventually(getBlobServer(), null, receivedKey1); } // job-related - validateGetAndClose(client.getInternal(jobId, receivedKey), testBuffer); + validateGetAndClose(client.getInternal(jobId, receivedKey2), testBuffer); if (blobType == TRANSIENT_BLOB) { // transient BLOBs should be deleted from the server, eventually - verifyDeletedEventually(getBlobServer(), jobId, receivedKey); + verifyDeletedEventually(getBlobServer(), jobId, receivedKey2); } // Check reaction to invalid keys for job-unrelated blobs @@ -342,41 +344,42 @@ private void testContentAddressableStream(BlobKey.BlobType blobType) throws IOException, InterruptedException { File testFile = temporaryFolder.newFile(); - BlobKey origKey = prepareTestFile(testFile, blobType); + byte[] digest = prepareTestFile(testFile); InputStream is = null; try (BlobClient client = new BlobClient(new InetSocketAddress("localhost", getBlobServer().getPort()), getBlobClientConfig())) { JobID jobId = new JobID(); - BlobKey receivedKey; + BlobKey receivedKey1 = null; // Store the data (job-unrelated) if (blobType == TRANSIENT_BLOB) { is = new FileInputStream(testFile); - receivedKey = client.putInputStream(null, is, blobType); - assertEquals(origKey, receivedKey); + receivedKey1 = client.putInputStream(null, is, blobType); + assertArrayEquals(digest, receivedKey1.getHash()); } // try again with a job-related BLOB: is = new FileInputStream(testFile); - receivedKey = client.putInputStream(jobId, is, blobType); - assertEquals(origKey, receivedKey); + BlobKey receivedKey2 = client.putInputStream(jobId, is, blobType); is.close(); is = null; // Retrieve the data (job-unrelated) if (blobType == TRANSIENT_BLOB) { - validateGetAndClose(client.getInternal(null, receivedKey), testFile); + verifyKeyDifferentHashEquals(receivedKey1, receivedKey2); + + validateGetAndClose(client.getInternal(null, receivedKey1), testFile); // transient BLOBs should be deleted from the server, eventually - verifyDeletedEventually(getBlobServer(), null, receivedKey); + verifyDeletedEventually(getBlobServer(), null, receivedKey1); } // job-related - validateGetAndClose(client.getInternal(jobId, receivedKey), testFile); + validateGetAndClose(client.getInternal(jobId, receivedKey2), testFile); if (blobType == TRANSIENT_BLOB) { // transient BLOBs should be deleted from the server, eventually - verifyDeletedEventually(getBlobServer(), jobId, receivedKey); + verifyDeletedEventually(getBlobServer(), jobId, receivedKey2); } } finally { if (is != null) { @@ -463,7 +466,7 @@ public void testUploadJarFilesHelper() throws Exception { static void uploadJarFile(BlobServer blobServer, Configuration blobClientConfig) throws Exception { final File testFile = File.createTempFile("testfile", ".dat"); testFile.deleteOnExit(); - prepareTestFile(testFile, PERMANENT_BLOB); + prepareTestFile(testFile); 
InetSocketAddress serverAddress = new InetSocketAddress("localhost", blobServer.getPort()); diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobKeyTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobKeyTest.java index 49f4fc2b457c3..ae538aa30181e 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobKeyTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobKeyTest.java @@ -19,6 +19,7 @@ package org.apache.flink.runtime.blob; import org.apache.flink.core.testutils.CommonTestUtils; +import org.apache.flink.util.AbstractID; import org.apache.flink.util.TestLogger; import org.junit.Test; @@ -29,14 +30,17 @@ import static org.apache.flink.runtime.blob.BlobKey.BlobType.PERMANENT_BLOB; import static org.apache.flink.runtime.blob.BlobKey.BlobType.TRANSIENT_BLOB; +import static org.hamcrest.CoreMatchers.equalTo; import static org.hamcrest.MatcherAssert.assertThat; import static org.hamcrest.Matchers.greaterThan; import static org.hamcrest.Matchers.instanceOf; import static org.hamcrest.Matchers.is; import static org.hamcrest.Matchers.lessThan; +import static org.hamcrest.core.IsNot.not; import static org.junit.Assert.assertArrayEquals; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertNotEquals; import static org.junit.Assert.assertTrue; /** @@ -46,21 +50,35 @@ public final class BlobKeyTest extends TestLogger { /** * The first key array to be used during the unit tests. */ - private static final byte[] KEY_ARRAY_1 = new byte[20]; + private static final byte[] KEY_ARRAY_1 = new byte[BlobKey.SIZE]; /** * The second key array to be used during the unit tests. */ - private static final byte[] KEY_ARRAY_2 = new byte[20]; + private static final byte[] KEY_ARRAY_2 = new byte[BlobKey.SIZE]; + + /** + * First byte array to use for the random component of a {@link BlobKey}. + */ + private static final byte[] RANDOM_ARRAY_1 = new byte[AbstractID.SIZE]; + + /** + * Second byte array to use for the random component of a {@link BlobKey}. + */ + private static final byte[] RANDOM_ARRAY_2 = new byte[AbstractID.SIZE]; /* - * Initialize the key array. + * Initialize the key and random arrays. */ static { for (int i = 0; i < KEY_ARRAY_1.length; ++i) { KEY_ARRAY_1[i] = (byte) i; KEY_ARRAY_2[i] = (byte) (i + 1); } + for (int i = 0; i < RANDOM_ARRAY_1.length; ++i) { + RANDOM_ARRAY_1[i] = (byte) i; + RANDOM_ARRAY_2[i] = (byte) (i + 1); + } } @Test @@ -89,7 +107,7 @@ public void testSerializationPermanent() throws Exception { * Tests the serialization/deserialization of BLOB keys. */ private void testSerialization(BlobKey.BlobType blobType) throws Exception { - final BlobKey k1 = BlobKey.createKey(blobType, KEY_ARRAY_1); + final BlobKey k1 = BlobKey.createKey(blobType, KEY_ARRAY_1, RANDOM_ARRAY_1); final BlobKey k2 = CommonTestUtils.createCopySerializable(k1); assertEquals(k1, k2); assertEquals(k1.hashCode(), k2.hashCode()); @@ -107,16 +125,25 @@ public void testEqualsPermanent() { } /** - * Tests the equals method. + * Tests the {@link BlobKey#equals(Object)} and {@link BlobKey#hashCode()} methods. 
*/ private void testEquals(BlobKey.BlobType blobType) { - final BlobKey k1 = BlobKey.createKey(blobType, KEY_ARRAY_1); - final BlobKey k2 = BlobKey.createKey(blobType, KEY_ARRAY_1); - final BlobKey k3 = BlobKey.createKey(blobType, KEY_ARRAY_2); + final BlobKey k1 = BlobKey.createKey(blobType, KEY_ARRAY_1, RANDOM_ARRAY_1); + final BlobKey k2 = BlobKey.createKey(blobType, KEY_ARRAY_1, RANDOM_ARRAY_1); + final BlobKey k3 = BlobKey.createKey(blobType, KEY_ARRAY_2, RANDOM_ARRAY_1); + final BlobKey k4 = BlobKey.createKey(blobType, KEY_ARRAY_1, RANDOM_ARRAY_2); assertTrue(k1.equals(k2)); assertTrue(k2.equals(k1)); + assertEquals(k1.hashCode(), k2.hashCode()); assertFalse(k1.equals(k3)); assertFalse(k3.equals(k1)); + assertFalse(k1.equals(k4)); + assertFalse(k4.equals(k1)); + + //noinspection ObjectEqualsNull + assertFalse(k1.equals(null)); + //noinspection EqualsBetweenInconvertibleTypes + assertFalse(k1.equals(this)); } /** @@ -124,8 +151,8 @@ private void testEquals(BlobKey.BlobType blobType) { */ @Test public void testEqualsDifferentBlobType() { - final BlobKey k1 = BlobKey.createKey(TRANSIENT_BLOB, KEY_ARRAY_1); - final BlobKey k2 = BlobKey.createKey(PERMANENT_BLOB, KEY_ARRAY_1); + final BlobKey k1 = BlobKey.createKey(TRANSIENT_BLOB, KEY_ARRAY_1, RANDOM_ARRAY_1); + final BlobKey k2 = BlobKey.createKey(PERMANENT_BLOB, KEY_ARRAY_1, RANDOM_ARRAY_1); assertFalse(k1.equals(k2)); assertFalse(k2.equals(k1)); } @@ -144,19 +171,22 @@ public void testComparesPermanent() { * Tests the compares method. */ private void testCompares(BlobKey.BlobType blobType) { - final BlobKey k1 = BlobKey.createKey(blobType, KEY_ARRAY_1); - final BlobKey k2 = BlobKey.createKey(blobType, KEY_ARRAY_1); - final BlobKey k3 = BlobKey.createKey(blobType, KEY_ARRAY_2); + final BlobKey k1 = BlobKey.createKey(blobType, KEY_ARRAY_1, RANDOM_ARRAY_1); + final BlobKey k2 = BlobKey.createKey(blobType, KEY_ARRAY_1, RANDOM_ARRAY_1); + final BlobKey k3 = BlobKey.createKey(blobType, KEY_ARRAY_2, RANDOM_ARRAY_1); + final BlobKey k4 = BlobKey.createKey(blobType, KEY_ARRAY_1, RANDOM_ARRAY_2); assertThat(k1.compareTo(k2), is(0)); assertThat(k2.compareTo(k1), is(0)); assertThat(k1.compareTo(k3), lessThan(0)); + assertThat(k1.compareTo(k4), lessThan(0)); assertThat(k3.compareTo(k1), greaterThan(0)); + assertThat(k4.compareTo(k1), greaterThan(0)); } @Test public void testComparesDifferentBlobType() { - final BlobKey k1 = BlobKey.createKey(TRANSIENT_BLOB, KEY_ARRAY_1); - final BlobKey k2 = BlobKey.createKey(PERMANENT_BLOB, KEY_ARRAY_1); + final BlobKey k1 = BlobKey.createKey(TRANSIENT_BLOB, KEY_ARRAY_1, RANDOM_ARRAY_1); + final BlobKey k2 = BlobKey.createKey(PERMANENT_BLOB, KEY_ARRAY_1, RANDOM_ARRAY_1); assertThat(k1.compareTo(k2), greaterThan(0)); assertThat(k2.compareTo(k1), lessThan(0)); } @@ -175,7 +205,7 @@ public void testStreamsPermanent() throws Exception { * Test the serialization/deserialization using input/output streams. */ private void testStreams(BlobKey.BlobType blobType) throws IOException { - final BlobKey k1 = BlobKey.createKey(blobType, KEY_ARRAY_1); + final BlobKey k1 = BlobKey.createKey(blobType, KEY_ARRAY_1, RANDOM_ARRAY_1); final ByteArrayOutputStream baos = new ByteArrayOutputStream(20); k1.writeToOutputStream(baos); @@ -187,6 +217,28 @@ private void testStreams(BlobKey.BlobType blobType) throws IOException { assertEquals(k1, k2); } + /** + * Verifies that the two given keys differ overall but share the same hash. 
+ * + * @param key1 first blob key + * @param key2 second blob key + */ + static void verifyKeyDifferentHashEquals(BlobKey key1, BlobKey key2) { + assertNotEquals(key1, key2); + assertThat(key1.getHash(), equalTo(key2.getHash())); + } + + /** + * Verifies that the two given keys differ overall and also have different hashes. + * + * @param key1 first blob key + * @param key2 second blob key + */ + static void verifyKeyDifferentHashDifferent(BlobKey key1, BlobKey key2) { + assertNotEquals(key1, key2); + assertThat(key1.getHash(), not(equalTo(key2.getHash()))); + } + /** * Verifies that the given key is of an expected type. * diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobServerDeleteTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobServerDeleteTest.java index a110d4a80d803..fde21ba36d75a 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobServerDeleteTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobServerDeleteTest.java @@ -47,10 +47,11 @@ import static org.apache.flink.runtime.blob.BlobCacheCleanupTest.checkFileCountForJob; import static org.apache.flink.runtime.blob.BlobKey.BlobType.PERMANENT_BLOB; import static org.apache.flink.runtime.blob.BlobKey.BlobType.TRANSIENT_BLOB; +import static org.apache.flink.runtime.blob.BlobKeyTest.verifyKeyDifferentHashEquals; import static org.apache.flink.runtime.blob.BlobServerGetTest.verifyDeleted; import static org.apache.flink.runtime.blob.BlobServerPutTest.put; import static org.apache.flink.runtime.blob.BlobServerPutTest.verifyContents; -import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertArrayEquals; import static org.junit.Assert.assertFalse; import static org.junit.Assert.assertNotNull; import static org.junit.Assert.assertTrue; @@ -86,6 +87,12 @@ public void testDeleteTransient4() throws IOException { testDeleteTransient(new JobID(), new JobID()); } + @Test + public void testDeleteTransient5() throws IOException { + JobID jobId = new JobID(); + testDeleteTransient(jobId, jobId); + } + /** * Uploads a (different) byte array for each of the given jobs and verifies that deleting one of * them (via the {@link BlobServer}) does not influence the other. 
@@ -97,7 +104,6 @@ public void testDeleteTransient4() throws IOException { */ private void testDeleteTransient(@Nullable JobID jobId1, @Nullable JobID jobId2) throws IOException { - final boolean sameJobId = (jobId1 == jobId2) || (jobId1 != null && jobId1.equals(jobId2)); final Configuration config = new Configuration(); config.setString(BlobServerOptions.STORAGE_DIRECTORY, temporaryFolder.newFolder().getAbsolutePath()); @@ -118,7 +124,7 @@ private void testDeleteTransient(@Nullable JobID jobId1, @Nullable JobID jobId2) // put two more BLOBs (same key, other key) for another job ID TransientBlobKey key2a = (TransientBlobKey) put(server, jobId2, data, TRANSIENT_BLOB); assertNotNull(key2a); - assertEquals(key1, key2a); + verifyKeyDifferentHashEquals(key1, key2a); TransientBlobKey key2b = (TransientBlobKey) put(server, jobId2, data2, TRANSIENT_BLOB); assertNotNull(key2b); @@ -126,10 +132,9 @@ private void testDeleteTransient(@Nullable JobID jobId1, @Nullable JobID jobId2) assertTrue(delete(server, jobId1, key1)); verifyDeleted(server, jobId1, key1); - // deleting a one BLOB should not affect another BLOB, even with the same key if job IDs are different - if (!sameJobId) { - verifyContents(server, jobId2, key2a, data); - } + // deleting one BLOB should not affect another BLOB with a different key + // (and keys are always different now) + verifyContents(server, jobId2, key2a, data); verifyContents(server, jobId2, key2b, data2); // delete first file of second job @@ -284,7 +289,7 @@ private void testJobCleanup(BlobKey.BlobType blobType) throws IOException { BlobKey key1a = put(server, jobId1, data, blobType); BlobKey key2 = put(server, jobId2, data, blobType); - assertEquals(key1a, key2); + assertArrayEquals(key1a.getHash(), key2.getHash()); BlobKey key1b = put(server, jobId1, data2, blobType); diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobServerGetTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobServerGetTest.java index 492727935624c..e3b53096e2c90 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobServerGetTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobServerGetTest.java @@ -42,7 +42,6 @@ import java.io.IOException; import java.nio.file.AccessDeniedException; import java.nio.file.NoSuchFileException; -import java.security.MessageDigest; import java.util.ArrayList; import java.util.Arrays; import java.util.Collection; @@ -58,6 +57,7 @@ import static org.apache.flink.runtime.blob.BlobClientTest.validateGetAndClose; import static org.apache.flink.runtime.blob.BlobKey.BlobType.PERMANENT_BLOB; import static org.apache.flink.runtime.blob.BlobKey.BlobType.TRANSIENT_BLOB; +import static org.apache.flink.runtime.blob.BlobKeyTest.verifyKeyDifferentHashEquals; import static org.apache.flink.runtime.blob.BlobServerPutTest.put; import static org.apache.flink.runtime.blob.BlobUtils.JOB_DIR_PREFIX; import static org.junit.Assert.assertArrayEquals; @@ -141,18 +141,18 @@ private void testGetFailsDuringLookup( // add the same data under a second jobId BlobKey key2 = put(server, jobId2, data, blobType); - assertNotNull(key); - assertEquals(key, key2); + assertNotNull(key2); + verifyKeyDifferentHashEquals(key, key2); // request for jobId2 should succeed - get(server, jobId2, key); + get(server, jobId2, key2); // request for jobId1 should still fail verifyDeleted(server, jobId1, key); // same checks as for jobId1 but for jobId2 should also work: - blobFile = server.getStorageLocation(jobId2, key); + 
blobFile = server.getStorageLocation(jobId2, key2); assertTrue(blobFile.delete()); - verifyDeleted(server, jobId2, key); + verifyDeleted(server, jobId2, key2); } } @@ -373,11 +373,6 @@ private void testConcurrentGetOperations( final byte[] data = {1, 2, 3, 4, 99, 42}; - MessageDigest md = BlobUtils.createMessageDigest(); - - // create the correct blob key by hashing our input data - final BlobKey blobKey = BlobKey.createKey(blobType, md.digest(data)); - doAnswer( new Answer() { @Override @@ -398,7 +393,7 @@ public Object answer(InvocationOnMock invocation) throws Throwable { server.start(); // upload data first - assertEquals(blobKey, put(server, jobId, data, blobType)); + final BlobKey blobKey = put(server, jobId, data, blobType); // now try accessing it concurrently (only HA mode will be able to retrieve it from HA store!) if (blobType == PERMANENT_BLOB) { diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobServerPutTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobServerPutTest.java index aefd0a302e417..dcf49a5ca697e 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobServerPutTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobServerPutTest.java @@ -62,6 +62,7 @@ import static org.apache.flink.runtime.blob.BlobClientTest.validateGetAndClose; import static org.apache.flink.runtime.blob.BlobKey.BlobType.PERMANENT_BLOB; import static org.apache.flink.runtime.blob.BlobKey.BlobType.TRANSIENT_BLOB; +import static org.apache.flink.runtime.blob.BlobKeyTest.verifyKeyDifferentHashEquals; import static org.apache.flink.runtime.blob.BlobServerGetTest.get; import static org.junit.Assert.assertArrayEquals; import static org.junit.Assert.assertEquals; @@ -226,21 +227,26 @@ private void testPutBufferSuccessfulGet( // put data for jobId1 and verify BlobKey key1a = put(server, jobId1, data, blobType); assertNotNull(key1a); + // second upload of same data should yield a different BlobKey + BlobKey key1a2 = put(server, jobId1, data, blobType); + assertNotNull(key1a2); + verifyKeyDifferentHashEquals(key1a, key1a2); BlobKey key1b = put(server, jobId1, data2, blobType); assertNotNull(key1b); verifyContents(server, jobId1, key1a, data); + verifyContents(server, jobId1, key1a2, data); verifyContents(server, jobId1, key1b, data2); // now put data for jobId2 and verify that both are ok BlobKey key2a = put(server, jobId2, data, blobType); assertNotNull(key2a); - assertEquals(key1a, key2a); + verifyKeyDifferentHashEquals(key1a, key2a); BlobKey key2b = put(server, jobId2, data2, blobType); assertNotNull(key2b); - assertEquals(key1b, key2b); + verifyKeyDifferentHashEquals(key1b, key2b); // verify the accessibility and the BLOB contents verifyContents(server, jobId2, key2a, data); @@ -249,6 +255,7 @@ private void testPutBufferSuccessfulGet( // verify the accessibility and the BLOB contents one more time (transient BLOBs should // not be deleted here) verifyContents(server, jobId1, key1a, data); + verifyContents(server, jobId1, key1a2, data); verifyContents(server, jobId1, key1b, data2); verifyContents(server, jobId2, key2a, data); verifyContents(server, jobId2, key2b, data2); @@ -311,21 +318,26 @@ private void testPutStreamSuccessfulGet( // put data for jobId1 and verify BlobKey key1a = put(server, jobId1, new ByteArrayInputStream(data), blobType); assertNotNull(key1a); + // second upload of same data should yield a different BlobKey + BlobKey key1a2 = put(server, jobId1, new ByteArrayInputStream(data), blobType); + 
assertNotNull(key1a2); + verifyKeyDifferentHashEquals(key1a, key1a2); BlobKey key1b = put(server, jobId1, new ByteArrayInputStream(data2), blobType); assertNotNull(key1b); verifyContents(server, jobId1, key1a, data); + verifyContents(server, jobId1, key1a2, data); verifyContents(server, jobId1, key1b, data2); // now put data for jobId2 and verify that both are ok BlobKey key2a = put(server, jobId2, new ByteArrayInputStream(data), blobType); assertNotNull(key2a); - assertEquals(key1a, key2a); + verifyKeyDifferentHashEquals(key1a, key2a); BlobKey key2b = put(server, jobId2, new ByteArrayInputStream(data2), blobType); assertNotNull(key2b); - assertEquals(key1b, key2b); + verifyKeyDifferentHashEquals(key1b, key2b); // verify the accessibility and the BLOB contents verifyContents(server, jobId2, key2a, data); @@ -334,6 +346,7 @@ private void testPutStreamSuccessfulGet( // verify the accessibility and the BLOB contents one more time (transient BLOBs should // not be deleted here) verifyContents(server, jobId1, key1a, data); + verifyContents(server, jobId1, key1a2, data); verifyContents(server, jobId1, key1b, data2); verifyContents(server, jobId2, key2a, data); verifyContents(server, jobId2, key2b, data2); @@ -396,21 +409,26 @@ private void testPutChunkedStreamSuccessfulGet( // put data for jobId1 and verify BlobKey key1a = put(server, jobId1, new ChunkedInputStream(data, 19), blobType); assertNotNull(key1a); + // second upload of same data should yield a different BlobKey + BlobKey key1a2 = put(server, jobId1, new ChunkedInputStream(data, 19), blobType); + assertNotNull(key1a2); + verifyKeyDifferentHashEquals(key1a, key1a2); BlobKey key1b = put(server, jobId1, new ChunkedInputStream(data2, 19), blobType); assertNotNull(key1b); verifyContents(server, jobId1, key1a, data); + verifyContents(server, jobId1, key1a2, data); verifyContents(server, jobId1, key1b, data2); // now put data for jobId2 and verify that both are ok BlobKey key2a = put(server, jobId2, new ChunkedInputStream(data, 19), blobType); assertNotNull(key2a); - assertEquals(key1a, key2a); + verifyKeyDifferentHashEquals(key1a, key2a); BlobKey key2b = put(server, jobId2, new ChunkedInputStream(data2, 19), blobType); assertNotNull(key2b); - assertEquals(key1b, key2b); + verifyKeyDifferentHashEquals(key1b, key2b); // verify the accessibility and the BLOB contents verifyContents(server, jobId2, key2a, data); @@ -419,6 +437,7 @@ private void testPutChunkedStreamSuccessfulGet( // verify the accessibility and the BLOB contents one more time (transient BLOBs should // not be deleted here) verifyContents(server, jobId1, key1a, data); + verifyContents(server, jobId1, key1a2, data); verifyContents(server, jobId1, key1b, data2); verifyContents(server, jobId2, key2a, data); verifyContents(server, jobId2, key2b, data2); @@ -700,7 +719,7 @@ private void testConcurrentPutOperations( // make sure that all blob keys are the same while (blobKeyIterator.hasNext()) { - assertEquals(blobKey, blobKeyIterator.next()); + verifyKeyDifferentHashEquals(blobKey, blobKeyIterator.next()); } // check the uploaded file's contents diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobServerRecoveryTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobServerRecoveryTest.java index 35575b5f20155..5b8d0e9e3531f 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobServerRecoveryTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobServerRecoveryTest.java @@ -39,12 +39,10 @@ import static 
org.apache.flink.runtime.blob.BlobKey.BlobType.PERMANENT_BLOB; import static org.apache.flink.runtime.blob.BlobKey.BlobType.TRANSIENT_BLOB; +import static org.apache.flink.runtime.blob.BlobKeyTest.verifyKeyDifferentHashEquals; import static org.apache.flink.runtime.blob.BlobServerGetTest.verifyDeleted; import static org.apache.flink.runtime.blob.BlobServerPutTest.put; import static org.apache.flink.runtime.blob.BlobServerPutTest.verifyContents; -import static org.hamcrest.MatcherAssert.assertThat; -import static org.hamcrest.Matchers.equalTo; -import static org.junit.Assert.assertNotEquals; import static org.junit.Assert.assertTrue; import static org.junit.Assert.fail; @@ -126,8 +124,7 @@ public static void testBlobServerRecovery(final Configuration config, final Blob // put non-HA data nonHAKey = put(server0, jobId[0], expected2, TRANSIENT_BLOB); - assertNotEquals(keys[1], nonHAKey); - assertThat(keys[1].getHash(), equalTo(nonHAKey.getHash())); + verifyKeyDifferentHashEquals(keys[1], nonHAKey); // check that the storage directory exists final Path blobServerPath = new Path(storagePath, "blob");
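Note on the scheme introduced by this patch (illustration, not part of the diff): a BlobKey is now the pair of content hash and a per-upload random component (an AbstractID), so two uploads of identical data yield distinct keys that still share the same 20-byte hash, and BlobServer#moveTempFileToStore simply re-draws the random part (up to 10 attempts, as in the retry loop above) if the resulting storage path is already taken. Below is a minimal, self-contained Java sketch of that key scheme; the names SketchBlobKey and newUniqueKey are hypothetical and not Flink API.

import java.security.MessageDigest;
import java.security.SecureRandom;
import java.util.Arrays;

final class SketchBlobKey {
    final byte[] hash;    // content digest (SHA-1, 20 bytes; identical for equal content)
    final byte[] random;  // random component drawn per upload (16 bytes, like an AbstractID)

    SketchBlobKey(byte[] hash, byte[] random) {
        this.hash = hash;
        this.random = random;
    }

    // equality covers the hash AND the random part, mirroring the new BlobKey#equals above
    @Override
    public boolean equals(Object o) {
        if (!(o instanceof SketchBlobKey)) {
            return false;
        }
        SketchBlobKey k = (SketchBlobKey) o;
        return Arrays.equals(hash, k.hash) && Arrays.equals(random, k.random);
    }

    @Override
    public int hashCode() {
        return 31 * Arrays.hashCode(hash) + Arrays.hashCode(random);
    }

    // one key per upload: same content digest, fresh random component every time
    static SketchBlobKey newUniqueKey(byte[] content) throws Exception {
        byte[] digest = MessageDigest.getInstance("SHA-1").digest(content);
        byte[] random = new byte[16];
        new SecureRandom().nextBytes(random);
        return new SketchBlobKey(digest, random);
    }

    public static void main(String[] args) throws Exception {
        byte[] data = {1, 2, 3, 4, 99, 42};
        SketchBlobKey k1 = newUniqueKey(data);
        SketchBlobKey k2 = newUniqueKey(data);
        System.out.println("same hash: " + Arrays.equals(k1.hash, k2.hash)); // true
        System.out.println("same key:  " + k1.equals(k2));                  // false
    }
}

This is also why the tests in this patch replace assertEquals(key1, key2) with verifyKeyDifferentHashEquals: uploads of equal data now compare equal on the hash only, never on the full key.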