Skip to content

Commit

Permalink
Merge pull request #18214 from serathius/robustness-separate-persisted
Browse files Browse the repository at this point in the history
Separate persisted responses without knowing their revision to prevent duplicating state during linearization
  • Loading branch information
serathius committed Jun 25, 2024
2 parents a897676 + 1870222 commit 917ded9
Show file tree
Hide file tree
Showing 8 changed files with 145 additions and 45 deletions.
7 changes: 5 additions & 2 deletions tests/robustness/model/describe.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,8 +28,11 @@ func describeEtcdResponse(request EtcdRequest, response MaybeEtcdResponse) strin
if response.ClientError != "" {
return fmt.Sprintf("err: %q", response.ClientError)
}
if response.PartialResponse {
return fmt.Sprintf("unknown, rev: %d", response.Revision)
if response.Persisted {
if response.PersistedRevision != 0 {
return fmt.Sprintf("unknown, rev: %d", response.PersistedRevision)
}
return fmt.Sprintf("unknown")
}
switch request.Type {
case Range:
Expand Down
28 changes: 19 additions & 9 deletions tests/robustness/model/deterministic.go
Original file line number Diff line number Diff line change
Expand Up @@ -123,7 +123,7 @@ func (s EtcdState) Step(request EtcdRequest) (EtcdState, MaybeEtcdResponse) {
if request.Range.Revision < newState.CompactRevision {
return newState, MaybeEtcdResponse{EtcdResponse: EtcdResponse{ClientError: mvcc.ErrCompacted.Error()}}
}
return newState, MaybeEtcdResponse{PartialResponse: true, EtcdResponse: EtcdResponse{Revision: newState.Revision}}
return newState, MaybeEtcdResponse{Persisted: true, PersistedRevision: newState.Revision}
case Txn:
failure := false
for _, cond := range request.Txn.Conditions {
Expand Down Expand Up @@ -351,15 +351,17 @@ type LeaseRevokeRequest struct {
}
type DefragmentRequest struct{}

// MaybeEtcdResponse extends EtcdResponse to represent partial or failed responses.
// Possible states:
// * Normal response. Only EtcdResponse is set.
// * Partial response. The EtcdResponse.Revision and PartialResponse are set.
// * Failed response. Only Err is set.
// MaybeEtcdResponse extends EtcdResponse to include partial information about responses to a request.
// Possible response state information:
// * Normal response. Client observed response. Only EtcdResponse is set.
// * Persisted. Client didn't observe response, but we know it was persisted by etcd. Only Persisted is set
// * Persisted with Revision. Client didn't observe response, but we know that it was persisted, and it's revision. Both Persisted and PersistedRevision is set.
// * Error response. Client observed error, but we don't know if it was persisted. Only Error is set.
type MaybeEtcdResponse struct {
EtcdResponse
PartialResponse bool
Error string
Persisted bool
PersistedRevision int64
Error string
}

var ErrEtcdFutureRev = errors.New("future rev")
Expand All @@ -376,7 +378,15 @@ type EtcdResponse struct {
}

func Match(r1, r2 MaybeEtcdResponse) bool {
return ((r1.PartialResponse || r2.PartialResponse) && (r1.Revision == r2.Revision)) || reflect.DeepEqual(r1, r2)
r1Revision := r1.Revision
if r1.Persisted {
r1Revision = r1.PersistedRevision
}
r2Revision := r2.Revision
if r2.Persisted {
r2Revision = r2.PersistedRevision
}
return (r1.Persisted && r1.PersistedRevision == 0) || (r2.Persisted && r2.PersistedRevision == 0) || ((r1.Persisted || r2.Persisted) && (r1.Error != "" || r2.Error != "" || r1Revision == r2Revision)) || reflect.DeepEqual(r1, r2)
}

type TxnResponse struct {
Expand Down
2 changes: 1 addition & 1 deletion tests/robustness/model/history.go
Original file line number Diff line number Diff line change
Expand Up @@ -365,7 +365,7 @@ func failedResponse(err error) MaybeEtcdResponse {
}

func partialResponse(revision int64) MaybeEtcdResponse {
return MaybeEtcdResponse{PartialResponse: true, EtcdResponse: EtcdResponse{Revision: revision}}
return MaybeEtcdResponse{Persisted: true, PersistedRevision: revision}
}

func putRequest(key, value string) EtcdRequest {
Expand Down
32 changes: 22 additions & 10 deletions tests/robustness/model/non_deterministic.go
Original file line number Diff line number Diff line change
Expand Up @@ -58,17 +58,19 @@ func (states nonDeterministicState) apply(request EtcdRequest, response MaybeEtc
var newStates nonDeterministicState
switch {
case response.Error != "":
newStates = states.stepFailedResponse(request)
case response.PartialResponse:
newStates = states.applyResponseRevision(request, response.EtcdResponse.Revision)
newStates = states.applyFailedRequest(request)
case response.Persisted && response.PersistedRevision == 0:
newStates = states.applyPersistedRequest(request)
case response.Persisted && response.PersistedRevision != 0:
newStates = states.applyPersistedRequestWithRevision(request, response.PersistedRevision)
default:
newStates = states.applySuccessfulResponse(request, response.EtcdResponse)
newStates = states.applyRequestWithResponse(request, response.EtcdResponse)
}
return len(newStates) > 0, newStates
}

// stepFailedResponse duplicates number of states by considering both cases, request was persisted and request was lost.
func (states nonDeterministicState) stepFailedResponse(request EtcdRequest) nonDeterministicState {
// applyFailedRequest returns both the original states and states with applied request. It considers both cases, request was persisted and request was lost.
func (states nonDeterministicState) applyFailedRequest(request EtcdRequest) nonDeterministicState {
newStates := make(nonDeterministicState, 0, len(states)*2)
for _, s := range states {
newStates = append(newStates, s)
Expand All @@ -80,8 +82,18 @@ func (states nonDeterministicState) stepFailedResponse(request EtcdRequest) nonD
return newStates
}

// applyResponseRevision filters possible states by leaving ony states that would return proper revision.
func (states nonDeterministicState) applyResponseRevision(request EtcdRequest, responseRevision int64) nonDeterministicState {
// applyPersistedRequest applies request to all possible states.
func (states nonDeterministicState) applyPersistedRequest(request EtcdRequest) nonDeterministicState {
newStates := make(nonDeterministicState, 0, len(states))
for _, s := range states {
newState, _ := s.Step(request)
newStates = append(newStates, newState)
}
return newStates
}

// applyPersistedRequestWithRevision applies request to all possible states, but leaves only states that would return proper revision.
func (states nonDeterministicState) applyPersistedRequestWithRevision(request EtcdRequest, responseRevision int64) nonDeterministicState {
newStates := make(nonDeterministicState, 0, len(states))
for _, s := range states {
newState, modelResponse := s.Step(request)
Expand All @@ -92,8 +104,8 @@ func (states nonDeterministicState) applyResponseRevision(request EtcdRequest, r
return newStates
}

// applySuccessfulResponse filters possible states by leaving ony states that would respond correctly.
func (states nonDeterministicState) applySuccessfulResponse(request EtcdRequest, response EtcdResponse) nonDeterministicState {
// applyRequestWithResponse applies request to all possible states, but leaves only state that would return proper response.
func (states nonDeterministicState) applyRequestWithResponse(request EtcdRequest, response EtcdResponse) nonDeterministicState {
newStates := make(nonDeterministicState, 0, len(states))
for _, s := range states {
newState, modelResponse := s.Step(request)
Expand Down
80 changes: 75 additions & 5 deletions tests/robustness/model/non_deterministic_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -391,7 +391,7 @@ func TestModelResponseMatch(t *testing.T) {
},
{
resp1: getResponse("key", "a", 1, 1),
resp2: partialResponse(0),
resp2: partialResponse(2),
expectMatch: false,
},
{
Expand All @@ -416,9 +416,14 @@ func TestModelResponseMatch(t *testing.T) {
},
{
resp1: putResponse(3),
resp2: partialResponse(0),
resp2: partialResponse(1),
expectMatch: false,
},
{
resp1: putResponse(3),
resp2: partialResponse(0),
expectMatch: true,
},
{
resp1: deleteResponse(1, 5),
resp2: deleteResponse(1, 5),
Expand Down Expand Up @@ -446,13 +451,18 @@ func TestModelResponseMatch(t *testing.T) {
},
{
resp1: deleteResponse(0, 5),
resp2: partialResponse(0),
resp2: partialResponse(4),
expectMatch: false,
},
{
resp1: deleteResponse(0, 5),
resp2: partialResponse(0),
expectMatch: true,
},
{
resp1: deleteResponse(1, 5),
resp2: partialResponse(0),
expectMatch: false,
expectMatch: true,
},
{
resp1: deleteResponse(0, 5),
Expand Down Expand Up @@ -491,12 +501,72 @@ func TestModelResponseMatch(t *testing.T) {
},
{
resp1: compareRevisionAndPutResponse(true, 7),
resp2: partialResponse(0),
resp2: partialResponse(4),
expectMatch: false,
},
{
resp1: compareRevisionAndPutResponse(false, 7),
resp2: partialResponse(3),
expectMatch: false,
},
{
resp1: compareRevisionAndPutResponse(false, 7),
resp2: partialResponse(0),
expectMatch: true,
},
{
resp1: MaybeEtcdResponse{EtcdResponse: EtcdResponse{Revision: 1, Txn: &TxnResponse{Failure: false, Results: []EtcdOperationResult{{Deleted: 1}}}}},
resp2: failedResponse(errors.New("failed request")),
expectMatch: false,
},
{
resp1: failedResponse(errors.New("failed request 1")),
resp2: failedResponse(errors.New("failed request 2")),
expectMatch: false,
},
{
resp1: failedResponse(errors.New("failed request")),
resp2: failedResponse(errors.New("failed request")),
expectMatch: true,
},
{
resp1: putResponse(2),
resp2: MaybeEtcdResponse{Persisted: true},
expectMatch: true,
},
{
resp1: putResponse(2),
resp2: MaybeEtcdResponse{Persisted: true, PersistedRevision: 2},
expectMatch: true,
},
{
resp1: putResponse(2),
resp2: MaybeEtcdResponse{Persisted: true, PersistedRevision: 3},
expectMatch: false,
},
{
resp1: failedResponse(errors.New("failed request")),
resp2: MaybeEtcdResponse{Persisted: true},
expectMatch: true,
},
{
resp1: failedResponse(errors.New("failed request")),
resp2: MaybeEtcdResponse{Persisted: true, PersistedRevision: 2},
expectMatch: true,
},
{
resp1: MaybeEtcdResponse{Persisted: true},
resp2: MaybeEtcdResponse{Persisted: true, PersistedRevision: 2},
expectMatch: true,
},
{
resp1: MaybeEtcdResponse{Persisted: true, PersistedRevision: 2},
resp2: MaybeEtcdResponse{Persisted: true, PersistedRevision: 2},
expectMatch: true,
},
{
resp1: MaybeEtcdResponse{Persisted: true, PersistedRevision: 1},
resp2: MaybeEtcdResponse{Persisted: true, PersistedRevision: 2},
expectMatch: false,
},
}
Expand Down
2 changes: 1 addition & 1 deletion tests/robustness/validate/operations.go
Original file line number Diff line number Diff line change
Expand Up @@ -83,7 +83,7 @@ func filterSerializableOperations(clients []report.ClientReport) []porcupine.Ope
}

func validateSerializableRead(lg *zap.Logger, replay *model.EtcdReplay, request model.EtcdRequest, response model.MaybeEtcdResponse) error {
if response.PartialResponse || response.Error != "" {
if response.Persisted || response.Error != "" {
return nil
}
state, err := replay.StateForRevision(request.Range.Revision)
Expand Down
25 changes: 15 additions & 10 deletions tests/robustness/validate/patch_history.go
Original file line number Diff line number Diff line change
Expand Up @@ -76,6 +76,7 @@ func patchOperations(operations []porcupine.Operation, watchEvents map[model.Eve
newOperations = append(newOperations, op)
continue
}
var resourceVersion int64
if op.Call <= lastObservedOperation.Call {
matchingEvent := matchWatchEvent(request.Txn, watchEvents)
if matchingEvent != nil {
Expand All @@ -84,7 +85,7 @@ func patchOperations(operations []porcupine.Operation, watchEvents map[model.Eve
if eventTime < op.Return {
op.Return = eventTime
}
op.Output = model.MaybeEtcdResponse{PartialResponse: true, EtcdResponse: model.EtcdResponse{Revision: matchingEvent.Revision}}
resourceVersion = matchingEvent.Revision
}
}
persistedReturnTime := matchReturnTime(request, persistedOperations)
Expand All @@ -94,9 +95,17 @@ func patchOperations(operations []porcupine.Operation, watchEvents map[model.Eve
op.Return = *persistedReturnTime
}
}
if persistedReturnTime == nil && canBeDiscarded(request.Txn) {
// Remove non persisted operations
continue
if isUniqueTxn(request.Txn) {
if persistedReturnTime == nil {
// Remove non persisted operations
continue
} else {
if resourceVersion != 0 {
op.Output = model.MaybeEtcdResponse{Persisted: true, PersistedRevision: resourceVersion}
} else {
op.Output = model.MaybeEtcdResponse{Persisted: true}
}
}
}
// Leave operation as it is as we cannot discard it.
newOperations = append(newOperations, op)
Expand Down Expand Up @@ -137,12 +146,8 @@ func matchWatchEvent(request *model.TxnRequest, watchEvents map[model.Event]clie
return nil
}

func canBeDiscarded(request *model.TxnRequest) bool {
return operationsCanBeDiscarded(request.OperationsOnSuccess) && operationsCanBeDiscarded(request.OperationsOnFailure)
}

func operationsCanBeDiscarded(ops []model.EtcdOperation) bool {
return hasUniqueWriteOperation(ops) || !hasWriteOperation(ops)
func isUniqueTxn(request *model.TxnRequest) bool {
return (hasUniqueWriteOperation(request.OperationsOnSuccess) || !hasWriteOperation(request.OperationsOnSuccess)) && (hasUniqueWriteOperation(request.OperationsOnFailure) || !hasWriteOperation(request.OperationsOnFailure))
}

func hasWriteOperation(ops []model.EtcdOperation) bool {
Expand Down
14 changes: 7 additions & 7 deletions tests/robustness/validate/patch_history_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -67,7 +67,7 @@ func TestPatchHistory(t *testing.T) {
putRequest("key", "value"),
},
expectedRemainingOperations: []porcupine.Operation{
{Return: 1000000000, Output: model.MaybeEtcdResponse{Error: "failed"}},
{Return: 1000000000, Output: model.MaybeEtcdResponse{Persisted: true}},
},
},
{
Expand All @@ -81,7 +81,7 @@ func TestPatchHistory(t *testing.T) {
putRequest("key2", "value"),
},
expectedRemainingOperations: []porcupine.Operation{
{Return: 3, Output: model.MaybeEtcdResponse{Error: "failed"}},
{Return: 3, Output: model.MaybeEtcdResponse{Persisted: true}},
{Return: 4, Output: putResponse(model.EtcdOperationResult{})},
},
},
Expand All @@ -95,7 +95,7 @@ func TestPatchHistory(t *testing.T) {
},
watchOperations: watchPutEvent("key", "value", 2, 3),
expectedRemainingOperations: []porcupine.Operation{
{Return: 3, Output: model.MaybeEtcdResponse{PartialResponse: true, EtcdResponse: model.EtcdResponse{Revision: 2}}},
{Return: 3, Output: model.MaybeEtcdResponse{Persisted: true, PersistedRevision: 2}},
},
},
{
Expand Down Expand Up @@ -133,7 +133,7 @@ func TestPatchHistory(t *testing.T) {
putRequestWithLease("key", "value", 123),
},
expectedRemainingOperations: []porcupine.Operation{
{Return: 1000000000, Output: model.MaybeEtcdResponse{Error: "failed"}},
{Return: 1000000000, Output: model.MaybeEtcdResponse{Persisted: true}},
},
},
{
Expand All @@ -147,7 +147,7 @@ func TestPatchHistory(t *testing.T) {
putRequestWithLease("key2", "value", 234),
},
expectedRemainingOperations: []porcupine.Operation{
{Return: 3, Output: model.MaybeEtcdResponse{Error: "failed"}},
{Return: 3, Output: model.MaybeEtcdResponse{Persisted: true}},
{Return: 4, Output: putResponse(model.EtcdOperationResult{})},
},
},
Expand Down Expand Up @@ -212,7 +212,7 @@ func TestPatchHistory(t *testing.T) {
putRequest("key", "value"),
},
expectedRemainingOperations: []porcupine.Operation{
{Return: 1000000000, Output: model.MaybeEtcdResponse{Error: "failed"}},
{Return: 1000000000, Output: model.MaybeEtcdResponse{Persisted: true}},
},
},
{
Expand Down Expand Up @@ -267,7 +267,7 @@ func TestPatchHistory(t *testing.T) {
putRequest("key", "value"),
},
expectedRemainingOperations: []porcupine.Operation{
{Return: 1000000000, Output: model.MaybeEtcdResponse{Error: "failed"}},
{Return: 1000000000, Output: model.MaybeEtcdResponse{Persisted: true}},
},
},
{
Expand Down

0 comments on commit 917ded9

Please sign in to comment.