Encrypted blob store repository - take I #46170
Merged: albertzaharovits merged 27 commits into elastic:client-side-encrypted-snapshot-repos from albertzaharovits:encrypted-repo-poc on Oct 10, 2019
Changes from 20 commits

Commits (27, all by albertzaharovits):
1b648a7 POC
e48c568 EncryptedRepository lifecycle
61e4f2a encryptionMetadataBlobPath
5b7f5ea Almost...
23bcd23 Done???
9e50384 Merge branch 'master' into encrypted-repo-poc
f7ac3ed Nit
d892c2c WORKS!
5f8d77b Merge branch 'master' into encrypted-repo-poc
ad6f14a Merge branch 'master' into encrypted-repo-poc
f1a44de Chunk size
7b3eb4d SunJCE mrrrr
a54513c Merge branch 'master' into encrypted-repo-poc
43087e5 Always failIfExists for encryption metadata
c160245 Parameterize for provider and chunk size
85b1803 compile oversight
7345c9b License
5fd4e61 Adjust sizes
24378fc Refactoring in a new plugin WIP
c1649c8 Works!
411f5da Merge branch 'master' into encrypted-repo-poc
ab8d6c6 Changes to move encrypted snapshots code to x-pack module
5e75538 Merge branch 'encrypted-repo-poc' of github.com:albertzaharovits/elas…
de4aeb9 Merge branch 'master' into encrypted-repo-poc
69fc7e5 FIPS libs
b483b57 Straight GCM but with the bc-fips lib
8345c3a Merge branch 'master' into encrypted-repo-poc
@@ -0,0 +1,27 @@
/*
 * Licensed to Elasticsearch under one or more contributor
 * license agreements. See the NOTICE file distributed with
 * this work for additional information regarding copyright
 * ownership. Elasticsearch licenses this file to you under
 * the Apache License, Version 2.0 (the "License"); you may
 * not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *    http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing,
 * software distributed under the License is distributed on an
 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
 * KIND, either express or implied. See the License for the
 * specific language governing permissions and limitations
 * under the License.
 */

esplugin {
    description 'The encrypted repository plugin adds support for client-side AES-GCM encrypted repositories.'
    classname 'org.elasticsearch.repositories.encrypted.EncryptedRepositoryPlugin'
}

dependencies {
    compile 'org.bouncycastle:bcprov-jdk15on:1.62'
}
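Based on the `delegate_type` and `repository.encrypted.<name>.password` settings defined in `EncryptedRepository` below, registering such a repository would presumably look like the following. The repository type name `encrypted`, the repository name, the `fs` delegate, and the filesystem path are all assumptions for illustration; the password goes into the Elasticsearch keystore because it is declared as a `SecureSetting`.

```shell
# store the repository password in the Elasticsearch keystore (it is a SecureSetting)
bin/elasticsearch-keystore add repository.encrypted.my_encrypted_repo.password

# register the repository, delegating actual blob storage to an "fs" repository
curl -X PUT "localhost:9200/_snapshot/my_encrypted_repo" -H 'Content-Type: application/json' -d'
{
  "type": "encrypted",
  "settings": {
    "delegate_type": "fs",
    "location": "/mnt/backups/my_encrypted_repo"
  }
}'
```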
308 changes: 308 additions & 0 deletions
...encrypted/src/main/java/org/elasticsearch/repositories/encrypted/EncryptedRepository.java
@@ -0,0 +1,308 @@
/*
 * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
 * or more contributor license agreements. Licensed under the Elastic License;
 * you may not use this file except in compliance with the Elastic License.
 */

package org.elasticsearch.repositories.encrypted;

import org.bouncycastle.jce.provider.BouncyCastleProvider;
import org.elasticsearch.cluster.metadata.RepositoryMetaData;
import org.elasticsearch.common.Strings;
import org.elasticsearch.common.blobstore.BlobContainer;
import org.elasticsearch.common.blobstore.BlobMetaData;
import org.elasticsearch.common.blobstore.BlobPath;
import org.elasticsearch.common.blobstore.BlobStore;
import org.elasticsearch.common.blobstore.DeleteResult;
import org.elasticsearch.common.bytes.BytesReference;
import org.elasticsearch.common.hash.MessageDigests;
import org.elasticsearch.common.io.Streams;
import org.elasticsearch.common.settings.SecureSetting;
import org.elasticsearch.common.settings.SecureString;
import org.elasticsearch.common.settings.Setting;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.unit.ByteSizeUnit;
import org.elasticsearch.common.unit.ByteSizeValue;
import org.elasticsearch.repositories.Repository;
import org.elasticsearch.repositories.blobstore.BlobStoreRepository;

import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.security.InvalidAlgorithmParameterException;
import java.security.InvalidKeyException;
import java.security.NoSuchAlgorithmException;
import java.security.spec.InvalidKeySpecException;
import java.util.HashMap;
import java.util.Map;
import java.util.function.Function;

import javax.crypto.Cipher;
import javax.crypto.CipherInputStream;
import javax.crypto.IllegalBlockSizeException;
import javax.crypto.KeyGenerator;
import javax.crypto.NoSuchPaddingException;
import javax.crypto.SecretKey;
import javax.crypto.SecretKeyFactory;
import javax.crypto.spec.IvParameterSpec;
import javax.crypto.spec.PBEKeySpec;
import javax.crypto.spec.SecretKeySpec;

public class EncryptedRepository extends BlobStoreRepository {

    static final Setting.AffixSetting<SecureString> ENCRYPTION_PASSWORD_SETTING = Setting.affixKeySetting("repository.encrypted.",
            "password", key -> SecureSetting.secureString(key, null));

    private static final Setting<String> DELEGATE_TYPE = new Setting<>("delegate_type", "", Function.identity());
    private static final int GCM_TAG_BYTES_LENGTH = 16;
    private static final String ENCRYPTION_MODE = "AES/GCM/NoPadding";
    private static final String ENCRYPTION_METADATA_PREFIX = "encryption-metadata-";
    // always the same IV because the key is randomly generated anew (Key-IV pair is never repeated)
    //private static final GCMParameterSpec gcmParameterSpec = new GCMParameterSpec(128, new byte[] {0,1,2,3,4,5,6,7,8,9,10,11 });
    private static final IvParameterSpec ivParameterSpec = new IvParameterSpec(new byte[] { 0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11 });
    // given the mode, the IV and the tag length, the maximum "chunk" size is ~64GB, we set it to 32GB to err on the safe side
    public static final ByteSizeValue MAX_CHUNK_SIZE = new ByteSizeValue(32, ByteSizeUnit.GB);

    private static final BouncyCastleProvider BC_PROV = new BouncyCastleProvider();

    private final BlobStoreRepository delegatedRepository;
    private final SecretKey masterSecretKey;

    protected EncryptedRepository(BlobStoreRepository delegatedRepository, SecretKey masterSecretKey) {
        super(delegatedRepository);
        this.delegatedRepository = delegatedRepository;
        this.masterSecretKey = masterSecretKey;
    }

    @Override
    protected BlobStore createBlobStore() throws Exception {
        return new EncryptedBlobStoreDecorator(this.delegatedRepository.blobStore(), this.masterSecretKey);
    }

    @Override
    protected void doStart() {
        this.delegatedRepository.start();
        super.doStart();
    }

    @Override
    protected void doStop() {
        super.doStop();
        this.delegatedRepository.stop();
    }

    @Override
    protected void doClose() {
        super.doClose();
        this.delegatedRepository.close();
    }

    @Override
    public ByteSizeValue chunkSize() {
        ByteSizeValue delegatedChunkSize = this.delegatedRepository.chunkSize();
        if (delegatedChunkSize == null || delegatedChunkSize.compareTo(MAX_CHUNK_SIZE) > 0) {
            return MAX_CHUNK_SIZE;
        } else {
            return delegatedChunkSize;
        }
    }

    /**
     * Returns a new encrypted repository factory
     */
    public static Repository.Factory newRepositoryFactory(final Settings settings) {
        final Map<String, char[]> cachedRepositoryPasswords = new HashMap<>();
        for (String repositoryName : ENCRYPTION_PASSWORD_SETTING.getNamespaces(settings)) {
            Setting<SecureString> encryptionPasswordSetting = ENCRYPTION_PASSWORD_SETTING
                    .getConcreteSettingForNamespace(repositoryName);
            SecureString encryptionPassword = encryptionPasswordSetting.get(settings);
            cachedRepositoryPasswords.put(repositoryName, encryptionPassword.getChars());
        }
        return new Repository.Factory() {

            @Override
            public Repository create(RepositoryMetaData metadata) {
                throw new UnsupportedOperationException();
            }

            @Override
            public Repository create(RepositoryMetaData metaData, Function<String, Repository.Factory> typeLookup) throws Exception {
                String delegateType = DELEGATE_TYPE.get(metaData.settings());
                if (Strings.hasLength(delegateType) == false) {
                    throw new IllegalArgumentException(DELEGATE_TYPE.getKey() + " must be set");
                }
                if (false == cachedRepositoryPasswords.containsKey(metaData.name())) {
                    throw new IllegalArgumentException(
                            ENCRYPTION_PASSWORD_SETTING.getConcreteSettingForNamespace(metaData.name()).getKey() + " must be set");
                }
                SecretKey secretKey = generateSecretKeyFromPassword(cachedRepositoryPasswords.get(metaData.name()));
                Repository.Factory factory = typeLookup.apply(delegateType);
                Repository delegatedRepository = factory.create(new RepositoryMetaData(metaData.name(),
                        delegateType, metaData.settings()));
                if (false == (delegatedRepository instanceof BlobStoreRepository)) {
                    throw new IllegalArgumentException("Unsupported type " + DELEGATE_TYPE.getKey());
                }
                return new EncryptedRepository((BlobStoreRepository)delegatedRepository, secretKey);
            }
        };
    }

    private static class EncryptedBlobStoreDecorator implements BlobStore {

        private final BlobStore delegatedBlobStore;
        private final SecretKey masterSecretKey;

        EncryptedBlobStoreDecorator(BlobStore blobStore, SecretKey masterSecretKey) {
            this.delegatedBlobStore = blobStore;
            this.masterSecretKey = masterSecretKey;
        }

        @Override
        public void close() throws IOException {
            this.delegatedBlobStore.close();
        }

        @Override
        public BlobContainer blobContainer(BlobPath path) {
            BlobPath encryptionMetadataBlobPath = BlobPath.cleanPath();
            encryptionMetadataBlobPath = encryptionMetadataBlobPath.add(ENCRYPTION_METADATA_PREFIX + keyId(this.masterSecretKey));
            for (String pathComponent : path) {
                encryptionMetadataBlobPath = encryptionMetadataBlobPath.add(pathComponent);
            }
            return new EncryptedBlobContainerDecorator(this.delegatedBlobStore.blobContainer(path),
                    this.delegatedBlobStore.blobContainer(encryptionMetadataBlobPath), this.masterSecretKey);
        }
    }

    private static class EncryptedBlobContainerDecorator implements BlobContainer {

        private final BlobContainer delegatedBlobContainer;
        private final BlobContainer encryptionMetadataBlobContainer;
        private final SecretKey masterSecretKey;

        EncryptedBlobContainerDecorator(BlobContainer delegatedBlobContainer, BlobContainer encryptionMetadataBlobContainer,
                                        SecretKey masterSecretKey) {
            this.delegatedBlobContainer = delegatedBlobContainer;
            this.encryptionMetadataBlobContainer = encryptionMetadataBlobContainer;
            this.masterSecretKey = masterSecretKey;
        }

        @Override
        public BlobPath path() {
            return this.delegatedBlobContainer.path();
        }

        @Override
        public InputStream readBlob(String blobName) throws IOException {
            final BytesReference dataDecryptionKeyBytes = Streams.readFully(this.encryptionMetadataBlobContainer.readBlob(blobName));
            try {
                SecretKey dataDecryptionKey = unwrapKey(BytesReference.toBytes(dataDecryptionKeyBytes), this.masterSecretKey);
                Cipher cipher = Cipher.getInstance(ENCRYPTION_MODE, BC_PROV);
                cipher.init(Cipher.DECRYPT_MODE, dataDecryptionKey, ivParameterSpec);
                return new CipherInputStream(this.delegatedBlobContainer.readBlob(blobName), cipher);
            } catch (InvalidKeyException | NoSuchAlgorithmException | NoSuchPaddingException | InvalidAlgorithmParameterException e) {
                throw new IOException(e);
            }
        }

        @Override
        public void writeBlob(String blobName, InputStream inputStream, long blobSize, boolean failIfAlreadyExists) throws IOException {
            try {
                SecretKey dataEncryptionKey = generateRandomSecretKey();
                byte[] wrappedDataEncryptionKey = wrapKey(dataEncryptionKey, this.masterSecretKey);
                try (InputStream stream = new ByteArrayInputStream(wrappedDataEncryptionKey)) {
                    this.encryptionMetadataBlobContainer.writeBlob(blobName, stream, wrappedDataEncryptionKey.length, failIfAlreadyExists);
                }
                Cipher cipher = Cipher.getInstance(ENCRYPTION_MODE, BC_PROV);
                cipher.init(Cipher.ENCRYPT_MODE, dataEncryptionKey, ivParameterSpec);
                this.delegatedBlobContainer.writeBlob(blobName, new CipherInputStream(inputStream, cipher), blobSize + GCM_TAG_BYTES_LENGTH,
                        failIfAlreadyExists);
            } catch (NoSuchAlgorithmException | InvalidKeyException | NoSuchPaddingException | IllegalBlockSizeException
                    | InvalidAlgorithmParameterException e) {
                throw new IOException(e);
            }
        }

        @Override
        public void writeBlobAtomic(String blobName, InputStream inputStream, long blobSize, boolean failIfAlreadyExists)
                throws IOException {
            // does not support atomic write
            writeBlob(blobName, inputStream, blobSize, failIfAlreadyExists);
        }

        @Override
        public void deleteBlob(String blobName) throws IOException {
            this.delegatedBlobContainer.deleteBlob(blobName);
            this.encryptionMetadataBlobContainer.deleteBlob(blobName);
        }

        @Override
        public DeleteResult delete() throws IOException {
            DeleteResult result = this.delegatedBlobContainer.delete();
            this.encryptionMetadataBlobContainer.delete();
            return result;
        }

        @Override
        public Map<String, BlobMetaData> listBlobs() throws IOException {
            return this.delegatedBlobContainer.listBlobs();
        }

        @Override
        public Map<String, BlobContainer> children() throws IOException {
            return this.delegatedBlobContainer.children();
        }

        @Override
        public Map<String, BlobMetaData> listBlobsByPrefix(String blobNamePrefix) throws IOException {
            Map<String, BlobMetaData> delegatedBlobs = this.delegatedBlobContainer.listBlobsByPrefix(blobNamePrefix);
            Map<String, BlobMetaData> delegatedBlobsWithPlainSize = new HashMap<>(delegatedBlobs.size());
            for (Map.Entry<String, BlobMetaData> entry : delegatedBlobs.entrySet()) {
                delegatedBlobsWithPlainSize.put(entry.getKey(), new BlobMetaData() {

                    @Override
                    public String name() {
                        return entry.getValue().name();
                    }

                    @Override
                    public long length() {
                        return entry.getValue().length() - GCM_TAG_BYTES_LENGTH;
                    }
                });
            }
            return delegatedBlobsWithPlainSize;
        }
    }

    private static SecretKey generateSecretKeyFromPassword(char[] password) throws NoSuchAlgorithmException, InvalidKeySpecException {
        byte[] salt = new byte[] { 0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15 }; // same salt for 1:1 password to key
        PBEKeySpec spec = new PBEKeySpec(password, salt, 65536, 256);
        SecretKey tmp = SecretKeyFactory.getInstance("PBKDF2WithHmacSHA256").generateSecret(spec);
        return new SecretKeySpec(tmp.getEncoded(), "AES");
    }

    private static String keyId(SecretKey secretKey) {
        return MessageDigests.toHexString(MessageDigests.sha256().digest(secretKey.getEncoded()));
    }

    private static SecretKey generateRandomSecretKey() throws NoSuchAlgorithmException {
        KeyGenerator keyGen = KeyGenerator.getInstance("AES");
        keyGen.init(256);
        return keyGen.generateKey();
    }

    private static byte[] wrapKey(SecretKey toWrap, SecretKey keyWrappingKey)
            throws NoSuchAlgorithmException, NoSuchPaddingException, InvalidKeyException, IllegalBlockSizeException {
        Cipher cipher = Cipher.getInstance("AESWrap");
        cipher.init(Cipher.WRAP_MODE, keyWrappingKey);
        return cipher.wrap(toWrap);
    }

    private static SecretKey unwrapKey(byte[] toUnwrap, SecretKey keyEncryptionKey)
            throws NoSuchAlgorithmException, NoSuchPaddingException, InvalidKeyException {
        Cipher cipher = Cipher.getInstance("AESWrap");
        cipher.init(Cipher.UNWRAP_MODE, keyEncryptionKey);
        return (SecretKey) cipher.unwrap(toUnwrap, "AES", Cipher.SECRET_KEY);
    }
}
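The key management in this file is a two-tier scheme: a master key-encryption-key is derived from the repository password with PBKDF2, and each blob gets a random data-encryption-key that is AESWrap-wrapped under the master key and stored in the metadata blob. The round trip can be sketched with plain JDK primitives (the password and class name here are illustrative, not from the PR):

```java
import javax.crypto.Cipher;
import javax.crypto.KeyGenerator;
import javax.crypto.SecretKey;
import javax.crypto.SecretKeyFactory;
import javax.crypto.spec.PBEKeySpec;
import javax.crypto.spec.SecretKeySpec;
import java.util.Arrays;

public class KeyWrapDemo {
    public static void main(String[] args) throws Exception {
        // derive a 256-bit AES key-encryption-key from a password (fixed salt, as in the PR)
        byte[] salt = new byte[16];
        PBEKeySpec spec = new PBEKeySpec("repository-password".toCharArray(), salt, 65536, 256);
        SecretKey derived = SecretKeyFactory.getInstance("PBKDF2WithHmacSHA256").generateSecret(spec);
        SecretKey kek = new SecretKeySpec(derived.getEncoded(), "AES");

        // generate a random per-blob data-encryption-key and wrap it under the KEK
        KeyGenerator keyGen = KeyGenerator.getInstance("AES");
        keyGen.init(256);
        SecretKey dek = keyGen.generateKey();
        Cipher wrap = Cipher.getInstance("AESWrap");
        wrap.init(Cipher.WRAP_MODE, kek);
        byte[] wrapped = wrap.wrap(dek); // RFC 3394: a 32-byte key wraps to a 40-byte blob

        // unwrap and verify the round trip
        Cipher unwrap = Cipher.getInstance("AESWrap");
        unwrap.init(Cipher.UNWRAP_MODE, kek);
        SecretKey restored = (SecretKey) unwrap.unwrap(wrapped, "AES", Cipher.SECRET_KEY);
        System.out.println(wrapped.length); // 40
        System.out.println(Arrays.equals(dek.getEncoded(), restored.getEncoded())); // true
    }
}
```

Because the wrap adds only a fixed 8-byte integrity block, the metadata blob stays tiny regardless of the data blob size.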
CipherInputStream does not support marking (i.e. `CipherInputStream.markSupported()` returns false), which means that failed requests to S3 / GCS can't be auto-retried.
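The limitation is easy to demonstrate with the JDK alone; a minimal sketch, using `AES/GCM/NoPadding` to match the PR (class name is illustrative):

```java
import javax.crypto.Cipher;
import javax.crypto.CipherInputStream;
import javax.crypto.KeyGenerator;
import javax.crypto.SecretKey;
import java.io.ByteArrayInputStream;

public class MarkSupportDemo {
    public static void main(String[] args) throws Exception {
        KeyGenerator keyGen = KeyGenerator.getInstance("AES");
        keyGen.init(256);
        SecretKey key = keyGen.generateKey();
        Cipher cipher = Cipher.getInstance("AES/GCM/NoPadding");
        cipher.init(Cipher.ENCRYPT_MODE, key);
        ByteArrayInputStream plain = new ByteArrayInputStream(new byte[1024]);
        // the plain ByteArrayInputStream supports mark/reset ...
        System.out.println(plain.markSupported()); // true
        // ... but wrapping it in a CipherInputStream loses that capability
        CipherInputStream encrypted = new CipherInputStream(plain, cipher);
        System.out.println(encrypted.markSupported()); // false
    }
}
```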
Urgh, good find ... I guess we could create our own cipher stream that supports it, but then we'd either have to buffer the encrypted data to disk or re-encrypt the whole thing on retry, right? (Maybe there's a better way that I don't see?)
If those repositories (blob stores) only require resetting back to the beginning of the stream (to retry the whole request) then we could probably handle it with re-encryption.

But if we want to `mark` an arbitrary point in the stream and `reset` to it, then re-encryption probably isn't going to be an option, because you can't reset a `Cipher` to an arbitrary point, and that means any checksums will be thrown off because the `Cipher` will think it has processed more data than was actually written to the blob store.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
My thinking was to simply reset back to `0` to get a fresh `Cipher`, but then just dump the bytes up to whatever point `x` we actually want to reset to, and only start producing the bytes starting from `x`, to have a CPU-expensive but IO-cheap way of implementing resetting to an arbitrary point. Wouldn't that work?
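This skip-based "reset" can be sketched as follows. The helper below is hypothetical (not from the PR): it starts a fresh `Cipher` over a freshly reopened plaintext stream and discards the first `x` ciphertext bytes, after which reads continue from offset `x`. Since the PR reuses the same key and IV per blob, the re-encrypted stream is byte-identical to the original.

```java
import javax.crypto.Cipher;
import javax.crypto.CipherInputStream;
import javax.crypto.SecretKey;
import javax.crypto.spec.GCMParameterSpec;
import java.io.InputStream;

public class ReEncryptingReset {
    // Hypothetical helper: "reset" an AES/GCM encryption stream to ciphertext offset x
    // by starting a fresh Cipher at plaintext offset 0 and discarding the first x bytes.
    static InputStream resetTo(InputStream freshPlaintext, SecretKey key, byte[] iv, long x) throws Exception {
        Cipher cipher = Cipher.getInstance("AES/GCM/NoPadding");
        cipher.init(Cipher.ENCRYPT_MODE, key, new GCMParameterSpec(128, iv));
        InputStream ciphertext = new CipherInputStream(freshPlaintext, cipher);
        long remaining = x;
        byte[] scratch = new byte[8192];
        while (remaining > 0) { // CPU-expensive: re-encrypt and throw away the prefix
            int read = ciphertext.read(scratch, 0, (int) Math.min(scratch.length, remaining));
            if (read < 0) {
                break;
            }
            remaining -= read;
        }
        return ciphertext; // next read() yields ciphertext bytes starting at offset x
    }
}
```

The cost is that every retry re-encrypts the full prefix, which is exactly the CPU-for-IO trade described above.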
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Thanks for raising this point @ywelsch!

I understand that this is a problem for upload (hence encryption). Decryption is the `CipherInputStream` wrapping the cloud client's stream, so it should not be affected. Please correct me if I'm mistaken.

I looked over the S3 SDK sources and I believe `AwsChunkedEncodingInputStream` is where an input stream which does not support `mark`/`reset` will be memory-buffered during an upload. This chunking is recommended for buffers larger than 100MB. Note that this chunking is different from the chunking we do at the blob level; this is chunking done inside the SDK during the multipart upload. The chunk buffers are generally small, only 128KB by default. Hence I believe general `mark`/`reset` is required (not only from the beginning).

Assuming that renouncing the chunked-upload-and-retry of the cloud SDK library is a last-resort option, we're left with four other choices:

1. Buffer the data in memory, restricting the upload chunk sizing flexibility.
2. Spool the data to disk.
3. `AES/CTR` with `HmacSha512`. This encryption scheme is able to produce a `mark`/`reset`-able `CipherInputStream` (there is such an implementation in the BC library). This gets us coding the decryption and authentication separately, exposing us to some pitfalls, but I am confident we can straighten them all in reviews.
4. `AES/GCM` can be translated to an `AES/CTR` scheme, therefore we might seek the plain text and redo the encryption upon a `reset`, using the `CTR` cipher specifically configured for the seek position. The code will be nasty, but we can test that re-wound and re-encrypted streams are identical to the original `AES/GCM` ciphertext.

From my pure engineering perspective I would pick option 3. The code implemented with BC should be very neat, and the caveats of doing the MAC yourself are manageable. I don't like buffering, given how much has been invested in developing seek-able streams all over the codebase (hence discounting 1 and 2), and option 4 is a bit too complex. That being said, I think option 4, when it finally works, will be easier to "prove" correct (because we can test ciphertext equality). Also, on-disk spooling, if feasible, would be a great leeway in terms of future configurability of the encryption plugin; maybe spooling is required by other features too?

I am curious what's your thinking on this @original-brownbear @ywelsch @tvernum.

In the meantime I will try to implement option 3 using BouncyCastle to see if there's a noticeable performance impact.
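Options 3 and 4 both lean on CTR mode's random-access property: the keystream for byte offset `x` depends only on the key, the IV, and `x`, so a cipher can be pre-positioned instead of re-processing the whole prefix. A rough illustration with plain `AES/CTR` (the helper is hypothetical, not the GCM-to-CTR translation itself):

```java
import javax.crypto.Cipher;
import javax.crypto.SecretKey;
import javax.crypto.spec.IvParameterSpec;
import java.math.BigInteger;

public class CtrSeek {
    // Build an AES/CTR cipher whose keystream starts at the given byte offset,
    // by advancing the 128-bit big-endian counter and discarding the intra-block part.
    static Cipher ctrCipherAt(SecretKey key, byte[] iv16, long offset) throws Exception {
        BigInteger counter = new BigInteger(1, iv16).add(BigInteger.valueOf(offset / 16));
        byte[] counterBytes = counter.toByteArray();
        byte[] iv = new byte[16];
        int copy = Math.min(16, counterBytes.length);
        System.arraycopy(counterBytes, counterBytes.length - copy, iv, 16 - copy, copy);
        Cipher cipher = Cipher.getInstance("AES/CTR/NoPadding");
        cipher.init(Cipher.ENCRYPT_MODE, key, new IvParameterSpec(iv));
        cipher.update(new byte[(int) (offset % 16)]); // burn the partial block of keystream
        return cipher;
    }
}
```

Since CTR encryption and decryption are the same XOR operation, a cipher positioned this way can resume either direction mid-stream; the authentication tag (GCM's GHASH, or a separate HMAC in option 3) is what cannot be resumed and has to be handled separately.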
I would agree that 3 seems like the cleanest option here.

But as I said above, I think 4 is fine too. The situation of having to reset the stream and retry the upload of a chunk should be a relatively rare occurrence (obviously this depends on the specific networking situation, but even then the time spent on retries should be small relative to the overall snapshot time).

=> Assuming 3 doesn't work, I think it'd be better to spend some CPU here and go with 4 than to introduce the complications (there are a number of open questions this would raise in regard to disk usage, IMO) of spooling to disk (2.) or restricting upload chunk sizing flexibility (1.).