Skip to content

Commit

Permalink
Fix imageserver to block replication to clients until updated from ma…
Browse files Browse the repository at this point in the history
…ster.
  • Loading branch information
rgooch committed Dec 28, 2018
1 parent c282a68 commit b61806e
Show file tree
Hide file tree
Showing 3 changed files with 24 additions and 5 deletions.
8 changes: 6 additions & 2 deletions imageserver/rpcd/api.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ var (

type srpcType struct {
imageDataBase *scanner.ImageDataBase
finishedReplication <-chan struct{} // Closed when finished.
replicationMaster string
imageserverResource *srpc.ClientResource
objSrv objectserver.FullObjectServer
Expand All @@ -47,8 +48,10 @@ func Setup(imdb *scanner.ImageDataBase, replicationMaster string,
if *archiveMode && replicationMaster == "" {
return nil, errors.New("replication master required in archive mode")
}
finishedReplication := make(chan struct{})
srpcObj := &srpcType{
imageDataBase: imdb,
finishedReplication: finishedReplication,
replicationMaster: replicationMaster,
imageserverResource: srpc.NewClientResource("tcp", replicationMaster),
objSrv: objSrv,
Expand All @@ -66,8 +69,9 @@ func Setup(imdb *scanner.ImageDataBase, replicationMaster string,
"ListImages",
}})
if replicationMaster != "" {
go srpcObj.replicator()
go srpcObj.replicator(finishedReplication)
} else {
close(finishedReplication)
}

return (*htmlWriter)(srpcObj), nil
}
10 changes: 10 additions & 0 deletions imageserver/rpcd/getImageUpdates.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,16 @@ func (t *srpcType) GetImageUpdates(conn *srpc.Conn, decoder srpc.Decoder,
defer conn.Flush()
t.logger.Printf("New image replication client connected from: %s\n",
conn.RemoteAddr())
select {
case <-t.finishedReplication:
default:
t.logger.Println(
"Blocking replication client until I've finished replicating")
<-t.finishedReplication
t.logger.Printf(
"Replication finished, unblocking replication client: %s\n",
conn.RemoteAddr())
}
t.incrementNumReplicationClients(true)
defer t.incrementNumReplicationClients(false)
addChannel := t.imageDataBase.RegisterAddNotifier()
Expand Down
11 changes: 8 additions & 3 deletions imageserver/rpcd/replicator.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ import (
"github.com/Symantec/Dominator/proto/imageserver"
)

func (t *srpcType) replicator() {
func (t *srpcType) replicator(finishedReplication chan<- struct{}) {
initialTimeout := time.Second * 15
timeout := initialTimeout
var nextSleepStopTime time.Time
Expand All @@ -31,7 +31,7 @@ func (t *srpcType) replicator() {
"ImageServer.GetImageUpdates"); err != nil {
t.logger.Println(err)
} else {
if err := t.getUpdates(conn); err != nil {
if err := t.getUpdates(conn, &finishedReplication); err != nil {
if err == io.EOF {
t.logger.Println(
"Connection to image replicator closed")
Expand All @@ -53,7 +53,8 @@ func (t *srpcType) replicator() {
}
}

func (t *srpcType) getUpdates(conn *srpc.Conn) error {
func (t *srpcType) getUpdates(conn *srpc.Conn,
finishedReplication *chan<- struct{}) error {
t.logger.Printf("Image replicator: connected to: %s\n", t.replicationMaster)
replicationStartTime := time.Now()
decoder := gob.NewDecoder(conn)
Expand All @@ -76,6 +77,10 @@ func (t *srpcType) getUpdates(conn *srpc.Conn) error {
t.deleteMissingImages(initialImages)
initialImages = nil
}
if *finishedReplication != nil {
close(*finishedReplication)
*finishedReplication = nil
}
t.logger.Printf("Replicated all current images in %s\n",
format.Duration(time.Since(replicationStartTime)))
continue
Expand Down

0 comments on commit b61806e

Please sign in to comment.