clientv3 watch retry may watch a compacted revision. #8668

Closed

penlyzhang opened this issue Oct 9, 2017 · 7 comments

Comments

@penlyzhang

func (w *watchGrpcStream) run() {
  // ...
  // watch client failed on Recv; spawn another if possible
  case err := <-w.errc:
    if isHaltErr(w.ctx, err) || toErr(w.ctx, err) == v3rpc.ErrNoLeader {
      closeErr = err
      return
    }
    if wc, closeErr = w.newWatchClient(); closeErr != nil {
      return
    }
  // ...
}

If the watch client receives an error response here, all watcherStreams begin resuming. A compaction on the server side can make that resume fail, because watchStream.watchRequest.rev may be less than the compacted revision.
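
To illustrate how this surfaces to an application using clientv3.Watch: when a watch (or a resumed watch) asks for a revision that has already been compacted, the client delivers a canceled WatchResponse carrying the compact revision and then closes the channel. A minimal sketch of detecting that case (the endpoint, key, and starting revision are placeholders; import paths match the etcd tree of this era):

package main

import (
  "context"
  "fmt"
  "log"

  "github.com/coreos/etcd/clientv3"
  "github.com/coreos/etcd/etcdserver/api/v3rpc/rpctypes"
)

func main() {
  cli, err := clientv3.New(clientv3.Config{Endpoints: []string{"127.0.0.1:2379"}})
  if err != nil {
    log.Fatal(err)
  }
  defer cli.Close()

  // Ask to watch from an old revision. If that revision has been compacted
  // away (or gets compacted before a retried watch is re-established), the
  // response is canceled with ErrCompacted and the channel is closed.
  for resp := range cli.Watch(context.Background(), "a", clientv3.WithRev(1)) {
    if resp.Canceled && resp.Err() == rpctypes.ErrCompacted {
      fmt.Printf("watch canceled: %v (oldest kept revision: %d)\n",
        resp.Err(), resp.CompactRevision)
      return
    }
    for _, ev := range resp.Events {
      fmt.Printf("event: %s %q -> %q\n", ev.Type, ev.Kv.Key, ev.Kv.Value)
    }
  }
}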

@xiang90
Contributor

xiang90 commented Oct 9, 2017

A compaction error should not be auto-resumed, and in theory it cannot be.

If you believe that is not the case, can you write a test to demonstrate it?

@penlyzhang
Author

@xiang90

Add print statements to the clientv3 watch code:

func (w *watchGrpcStream) run() {
  // ...
  case err := <-w.errc:
    if isHaltErr(w.ctx, err) || toErr(w.ctx, err) == v3rpc.ErrNoLeader {
      closeErr = err
      return
    }
    if wc, closeErr = w.newWatchClient(); closeErr != nil {
      return
    }
    if ws := w.nextResume(); ws != nil {
      ///////////////////////// add /////////////////////////
      fmt.Printf("====== please exec compact before wc.Send =====\n")
      time.Sleep(5 * time.Second)
      ///////////////////////// end /////////////////////////
      wc.Send(ws.initReq.toPB())
    }
  // ...
}

func (w *watchGrpcStream) serveSubstream(ws *watcherStream, resumec chan struct{}) {
  // ...
  select {
  case outc <- *curWr:
    if ws.buf[0].Err() != nil {
      ///////////////////////// add /////////////////////////
      fmt.Printf("receive a error response,error: %s\n", ws.buf[0].Err())
      ///////////////////////// end /////////////////////////
      return
    }
  // ...
}

test case: test.go

package main

import (
  "context"
  "fmt"
  "log"
  "os/exec"
  "time"

  "github.com/coreos/etcd/clientv3"
)

var (
  dialTimeout    = 5 * time.Second
  requestTimeout = 3 * time.Second
  endpoints      = []string{"127.0.0.1:2379", "127.0.0.1:2381", "127.0.0.1:2383"}
)

func PrintEvent(respChan clientv3.WatchChan) {
  for {
    ch, ok := <-respChan
    if !ok {
      fmt.Printf("watch chan closed\n")
      return
    }
    for _, event := range ch.Events {
      fmt.Printf("event, key[%s], value[%s]\n", event.Kv.Key, event.Kv.Value)
    }
  }
}

func main() {
  cli, err := clientv3.New(clientv3.Config{
    Endpoints:            endpoints,
    DialTimeout:          dialTimeout,
    DialKeepAliveTime:    2 * time.Second,
    DialKeepAliveTimeout: 2 * time.Second,
  })
  if err != nil {
    log.Fatal(err)
  }
  defer cli.Close()

  var opts []clientv3.OpOption
  respChan := cli.Watch(context.TODO(), "a", opts...)
  go PrintEvent(respChan)

  _, err = cli.Put(context.TODO(), "a", "1")
  if err != nil {
    log.Fatal(err)
  }

  // close_port.sh: partition the client from two of the three members.
  // sudo iptables -A INPUT -p tcp --dport 2379 -j DROP
  // sudo iptables -A INPUT -p tcp --dport 2381 -j DROP
  cmd := exec.Command("sh", "close_port.sh")
  _, err = cmd.Output()
  if err != nil {
    log.Fatal(err)
  }

  // Block so the watch goroutine stays alive while the compaction is
  // performed manually against the remaining member.
  select {}
}

Test Procedure:

  • go run test.go
  • when the program prints ====== please exec compact before wc.Send =====, put some keys and run a compaction:
./etcdctl --endpoints=127.0.0.1:2383 put a 3
./etcdctl --endpoints=127.0.0.1:2383 put a 4
./etcdctl --endpoints=127.0.0.1:2383 compact 4

result:

event, key[a], value[1]
====== please exec compact before wc.Send =====
receive a error response,error: etcdserver: mvcc: required revision has been compacted
watch chan closed
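
For reference, the puts and the compaction in the procedure above could also be issued through the clientv3 API instead of etcdctl; a minimal sketch against the still-reachable member (the revision passed to Compact is taken from the last put rather than hard-coding 4):

package main

import (
  "context"
  "log"

  "github.com/coreos/etcd/clientv3"
)

func main() {
  cli, err := clientv3.New(clientv3.Config{Endpoints: []string{"127.0.0.1:2383"}})
  if err != nil {
    log.Fatal(err)
  }
  defer cli.Close()

  // Same writes as the etcdctl steps above.
  if _, err = cli.Put(context.TODO(), "a", "3"); err != nil {
    log.Fatal(err)
  }
  resp, err := cli.Put(context.TODO(), "a", "4")
  if err != nil {
    log.Fatal(err)
  }

  // Compact at the revision of the last put; "etcdctl compact 4" in the
  // procedure above assumes that put landed at revision 4.
  if _, err = cli.Compact(context.TODO(), resp.Header.Revision); err != nil {
    log.Fatal(err)
  }
}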

@xiang90
Contributor

xiang90 commented Oct 10, 2017

This is exactly what should happen. A watcher must not silently miss an event.

What you did is partition the watcher, put some new keys, and compact at the head. When the watcher recovered, it had no way to transparently catch up from where it was, so the error is returned.

This is expected. Reopen if I missed anything.

xiang90 closed this as completed Oct 10, 2017
@xiaoyulei
Contributor

@xiang90 If a compaction happens during the re-watch, how do we avoid the watch failing? After the compaction, the next watch revision is behind the compact revision, so the watch must fail.

@xiang90
Contributor

xiang90 commented Oct 11, 2017

@YuleiXiao the watcher is canceled. You need to start again from the head or from any non-compacted revision.
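
In code, that recovery could look roughly like the sketch below. watchFrom is a hypothetical helper for illustration, not the library's own retry logic: when the watch is canceled with ErrCompacted, it re-opens the watch from the compact revision (accepting that events between the last seen revision and the compact revision are gone), and otherwise resumes after the last event it delivered.

package main

import (
  "context"
  "fmt"
  "log"

  "github.com/coreos/etcd/clientv3"
  "github.com/coreos/etcd/etcdserver/api/v3rpc/rpctypes"
)

// watchFrom is a hypothetical helper: it keeps a watch open on key starting
// at rev, and when the requested revision has been compacted it jumps
// forward to the server's compact revision instead of giving up.
func watchFrom(ctx context.Context, cli *clientv3.Client, key string, rev int64) {
  for ctx.Err() == nil {
    for resp := range cli.Watch(ctx, key, clientv3.WithRev(rev)) {
      if resp.Canceled && resp.Err() == rpctypes.ErrCompacted {
        // Events in (rev, resp.CompactRevision) are unrecoverable; the
        // application has to tolerate that gap.
        rev = resp.CompactRevision
        break
      }
      for _, ev := range resp.Events {
        fmt.Printf("event: %s %q -> %q\n", ev.Type, ev.Kv.Key, ev.Kv.Value)
        rev = ev.Kv.ModRevision + 1 // resume after the last delivered event
      }
    }
  }
}

func main() {
  cli, err := clientv3.New(clientv3.Config{Endpoints: []string{"127.0.0.1:2379"}})
  if err != nil {
    log.Fatal(err)
  }
  defer cli.Close()
  watchFrom(context.Background(), cli, "a", 1)
}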

@xiaoyulei
Contributor

@xiang90 If we watch from the head, we miss the events between the last watched revision and the head. The application can only get the latest revision.

@xiang90
Contributor

xiang90 commented Oct 11, 2017

@YuleiXiao

Yeah... if you do not want to miss anything, do not compact so fast... When you delete a file, you won't be able to find it anymore. Compaction works just like that.
