Skip to content

Commit

Permalink
Merge pull request #28838 from remilapeyre/consul-size-limit
Browse files Browse the repository at this point in the history
Fix handling large states in the Consul backend
  • Loading branch information
jbardin committed Jul 30, 2021
2 parents aa414f3 + 5f444a6 commit 97a2694
Show file tree
Hide file tree
Showing 2 changed files with 85 additions and 57 deletions.
124 changes: 72 additions & 52 deletions internal/backend/remote-state/consul/client.go
Expand Up @@ -198,72 +198,92 @@ func (c *RemoteClient) Put(data []byte) error {
verb = consulapi.KVSet
}

// If the payload is too large we first write the chunks and replace it
// 524288 is the default value, we just hope the user did not set a smaller
// one but there is really no reason for them to do so, if they changed it
// it is certainly to set a larger value.
limit := 524288
if len(payload) > limit {
md5 := md5.Sum(data)
chunks := split(payload, limit)
chunkPaths := make([]string, 0)

// First we write the new chunks
for i, p := range chunks {
path := strings.TrimRight(c.Path, "/") + fmt.Sprintf("/tfstate.%x/%d", md5, i)
chunkPaths = append(chunkPaths, path)
_, err := kv.Put(&consulapi.KVPair{
Key: path,
Value: p,
}, nil)

if err != nil {
return err
}
// The payload may be too large to store in a single KV entry in Consul. We
// could try to determine whether it will fit or not before sending the
// request but since we are using the Transaction API and not the KV API,
// it grows by about a 1/3 when it is base64 encoded plus the overhead of
// the fields specific to the Transaction API.
// Rather than trying to calculate the overhead (which could change from
// one version of Consul to another, and between Consul Community Edition
// and Consul Enterprise), we try to send the whole state in one request, if
// it fails because it is too big we then split it in chunks and send each
// chunk separately.
// When splitting in chunks, we make each chunk 524288 bits, which is the
// default max size for raft. If the user changed it, we still may send
// chunks too big and fail but this is not a setting that should be fiddled
// with anyway.

store := func(payload []byte) error {
// KV.Put doesn't return the new index, so we use a single operation
// transaction to get the new index with a single request.
txOps := consulapi.KVTxnOps{
&consulapi.KVTxnOp{
Verb: verb,
Key: c.Path,
Value: payload,
Index: c.modifyIndex,
},
}

// We update the link to point to the new chunks
payload, err = json.Marshal(map[string]interface{}{
"current-hash": fmt.Sprintf("%x", md5),
"chunks": chunkPaths,
})
ok, resp, _, err := kv.Txn(txOps, nil)
if err != nil {
return err
}
}
// transaction was rolled back
if !ok {
return fmt.Errorf("consul CAS failed with transaction errors: %v", resp.Errors)
}

var txOps consulapi.KVTxnOps
// KV.Put doesn't return the new index, so we use a single operation
// transaction to get the new index with a single request.
txOps = consulapi.KVTxnOps{
&consulapi.KVTxnOp{
Verb: verb,
Key: c.Path,
Value: payload,
Index: c.modifyIndex,
},
if len(resp.Results) != 1 {
// this probably shouldn't happen
return fmt.Errorf("expected on 1 response value, got: %d", len(resp.Results))
}

c.modifyIndex = resp.Results[0].ModifyIndex

// We remove all the old chunks
cleanupOldChunks()

return nil
}

ok, resp, _, err := kv.Txn(txOps, nil)
if err != nil {
if err = store(payload); err == nil {
// The payload was small enough to be stored
return nil
} else if !strings.Contains(err.Error(), "too large") {
// We failed for some other reason, report this to the user
return err
}
// transaction was rolled back
if !ok {
return fmt.Errorf("consul CAS failed with transaction errors: %v", resp.Errors)
}

if len(resp.Results) != 1 {
// this probably shouldn't happen
return fmt.Errorf("expected on 1 response value, got: %d", len(resp.Results))
}
// The payload was too large so we split it in multiple chunks

c.modifyIndex = resp.Results[0].ModifyIndex
md5 := md5.Sum(data)
chunks := split(payload, 524288)
chunkPaths := make([]string, 0)

// We remove all the old chunks
cleanupOldChunks()
// First we write the new chunks
for i, p := range chunks {
path := strings.TrimRight(c.Path, "/") + fmt.Sprintf("/tfstate.%x/%d", md5, i)
chunkPaths = append(chunkPaths, path)
_, err := kv.Put(&consulapi.KVPair{
Key: path,
Value: p,
}, nil)

return nil
if err != nil {
return err
}
}

// Then we update the link to point to the new chunks
payload, err = json.Marshal(map[string]interface{}{
"current-hash": fmt.Sprintf("%x", md5),
"chunks": chunkPaths,
})
if err != nil {
return err
}
return store(payload)
}

func (c *RemoteClient) Delete() error {
Expand Down
18 changes: 13 additions & 5 deletions internal/backend/remote-state/consul/client_test.go
Expand Up @@ -142,11 +142,6 @@ func TestConsul_largeState(t *testing.T) {
t.Fatal(err)
}

// md5 := md5.Sum(payload)
// if !bytes.Equal(md5[:], remote.MD5) {
// t.Fatal("the md5 sums do not match")
// }

if !bytes.Equal(payload, remote.Data) {
t.Fatal("the data do not match")
}
Expand All @@ -167,6 +162,19 @@ func TestConsul_largeState(t *testing.T) {
},
)

// This payload is just short enough to be stored but will be bigger when
// going through the Transaction API as it will be base64 encoded
testPayload(
t,
map[string]string{
"foo": strings.Repeat("a", 524288-10),
},
[]string{
"tf-unit/test-large-state",
"tf-unit/test-large-state/tfstate.4f407ace136a86521fd0d366972fe5c7/0",
},
)

// We try to replace the payload with a small one, the old chunks should be removed
testPayload(
t,
Expand Down

0 comments on commit 97a2694

Please sign in to comment.