From c282a685a2c7dbcf93990a00b5a63fa1fa9d9d8e Mon Sep 17 00:00:00 2001 From: Richard Gooch Date: Thu, 27 Dec 2018 09:07:22 -0800 Subject: [PATCH 1/2] Fix imageserver startup to single-thread fetching missing objects. --- imageserver/scanner/api.go | 1 + imageserver/scanner/load.go | 41 ++++++++++++++++++++++--------------- 2 files changed, 25 insertions(+), 17 deletions(-) diff --git a/imageserver/scanner/api.go b/imageserver/scanner/api.go index 94187cfb..21d723f2 100644 --- a/imageserver/scanner/api.go +++ b/imageserver/scanner/api.go @@ -42,6 +42,7 @@ type ImageDataBase struct { deduperLock sync.Mutex deduper *stringutil.StringDeduplicator pendingImageLock sync.Mutex + objectFetchLock sync.Mutex // Unprotected by any lock. objectServer objectserver.FullObjectServer replicationMaster string diff --git a/imageserver/scanner/load.go b/imageserver/scanner/load.go index 8a54b3dd..d83e2882 100644 --- a/imageserver/scanner/load.go +++ b/imageserver/scanner/load.go @@ -162,8 +162,8 @@ func (imdb *ImageDataBase) loadFile(filename string, defer file.Close() reader := fsutil.NewChecksumReader(file) decoder := gob.NewDecoder(reader) - var image image.Image - if err := decoder.Decode(&image); err != nil { + var img image.Image + if err := decoder.Decode(&img); err != nil { return err } if err := reader.VerifyChecksum(); err != nil { @@ -175,38 +175,45 @@ func (imdb *ImageDataBase) loadFile(filename string, return err } } - if imageIsExpired(&image) { + if imageIsExpired(&img) { imdb.logger.Printf("Deleting already expired image: %s\n", filename) return os.Remove(pathname) } - if err := image.VerifyObjects(imdb.objectServer); err != nil { + if err := img.VerifyObjects(imdb.objectServer); err != nil { if imdb.replicationMaster == "" || !strings.Contains(err.Error(), "not available") { return fmt.Errorf("error verifying: %s: %s", filename, err) } - client, err := srpc.DialHTTP("tcp", imdb.replicationMaster, time.Minute) - if err != nil { - return err - } - defer client.Close() - objClient := objectclient.AttachObjectClient(client) - defer objClient.Close() - err = image.GetMissingObjects(imdb.objectServer, objClient, + err = imdb.fetchMissingObjects(&img, prefixlogger.New(filename+": ", logger)) if err != nil { return err } } - image.FileSystem.RebuildInodePointers() + img.FileSystem.RebuildInodePointers() imdb.deduperLock.Lock() - image.ReplaceStrings(imdb.deduper.DeDuplicate) + img.ReplaceStrings(imdb.deduper.DeDuplicate) imdb.deduperLock.Unlock() - if err := image.Verify(); err != nil { + if err := img.Verify(); err != nil { return err } - imdb.scheduleExpiration(&image, filename) + imdb.scheduleExpiration(&img, filename) imdb.Lock() defer imdb.Unlock() - imdb.imageMap[filename] = &image + imdb.imageMap[filename] = &img return nil } + +func (imdb *ImageDataBase) fetchMissingObjects(img *image.Image, + logger log.DebugLogger) error { + imdb.objectFetchLock.Lock() + defer imdb.objectFetchLock.Unlock() + client, err := srpc.DialHTTP("tcp", imdb.replicationMaster, time.Minute) + if err != nil { + return err + } + defer client.Close() + objClient := objectclient.AttachObjectClient(client) + defer objClient.Close() + return img.GetMissingObjects(imdb.objectServer, objClient, logger) +} From b61806ec5ff79b27b3e052fefb093989ddcaf222 Mon Sep 17 00:00:00 2001 From: Richard Gooch Date: Fri, 28 Dec 2018 08:31:42 -0800 Subject: [PATCH 2/2] Fix imageserver to block replication to clients until updated from master. --- imageserver/rpcd/api.go | 8 ++++++-- imageserver/rpcd/getImageUpdates.go | 10 ++++++++++ imageserver/rpcd/replicator.go | 11 ++++++++--- 3 files changed, 24 insertions(+), 5 deletions(-) diff --git a/imageserver/rpcd/api.go b/imageserver/rpcd/api.go index e13010f7..1ed82de9 100644 --- a/imageserver/rpcd/api.go +++ b/imageserver/rpcd/api.go @@ -21,6 +21,7 @@ var ( type srpcType struct { imageDataBase *scanner.ImageDataBase + finishedReplication <-chan struct{} // Closed when finished. replicationMaster string imageserverResource *srpc.ClientResource objSrv objectserver.FullObjectServer @@ -47,8 +48,10 @@ func Setup(imdb *scanner.ImageDataBase, replicationMaster string, if *archiveMode && replicationMaster == "" { return nil, errors.New("replication master required in archive mode") } + finishedReplication := make(chan struct{}) srpcObj := &srpcType{ imageDataBase: imdb, + finishedReplication: finishedReplication, replicationMaster: replicationMaster, imageserverResource: srpc.NewClientResource("tcp", replicationMaster), objSrv: objSrv, @@ -66,8 +69,9 @@ func Setup(imdb *scanner.ImageDataBase, replicationMaster string, "ListImages", }}) if replicationMaster != "" { - go srpcObj.replicator() + go srpcObj.replicator(finishedReplication) + } else { + close(finishedReplication) } - return (*htmlWriter)(srpcObj), nil } diff --git a/imageserver/rpcd/getImageUpdates.go b/imageserver/rpcd/getImageUpdates.go index 9e7970d7..3e172c34 100644 --- a/imageserver/rpcd/getImageUpdates.go +++ b/imageserver/rpcd/getImageUpdates.go @@ -11,6 +11,16 @@ func (t *srpcType) GetImageUpdates(conn *srpc.Conn, decoder srpc.Decoder, defer conn.Flush() t.logger.Printf("New image replication client connected from: %s\n", conn.RemoteAddr()) + select { + case <-t.finishedReplication: + default: + t.logger.Println( + "Blocking replication client until I've finished replicating") + <-t.finishedReplication + t.logger.Printf( + "Replication finished, unblocking replication client: %s\n", + conn.RemoteAddr()) + } t.incrementNumReplicationClients(true) defer t.incrementNumReplicationClients(false) addChannel := t.imageDataBase.RegisterAddNotifier() diff --git a/imageserver/rpcd/replicator.go b/imageserver/rpcd/replicator.go index 7294a87e..7e267e69 100644 --- a/imageserver/rpcd/replicator.go +++ b/imageserver/rpcd/replicator.go @@ -17,7 +17,7 @@ import ( "github.com/Symantec/Dominator/proto/imageserver" ) -func (t *srpcType) replicator() { +func (t *srpcType) replicator(finishedReplication chan<- struct{}) { initialTimeout := time.Second * 15 timeout := initialTimeout var nextSleepStopTime time.Time @@ -31,7 +31,7 @@ func (t *srpcType) replicator() { "ImageServer.GetImageUpdates"); err != nil { t.logger.Println(err) } else { - if err := t.getUpdates(conn); err != nil { + if err := t.getUpdates(conn, &finishedReplication); err != nil { if err == io.EOF { t.logger.Println( "Connection to image replicator closed") @@ -53,7 +53,8 @@ func (t *srpcType) replicator() { } } -func (t *srpcType) getUpdates(conn *srpc.Conn) error { +func (t *srpcType) getUpdates(conn *srpc.Conn, + finishedReplication *chan<- struct{}) error { t.logger.Printf("Image replicator: connected to: %s\n", t.replicationMaster) replicationStartTime := time.Now() decoder := gob.NewDecoder(conn) @@ -76,6 +77,10 @@ func (t *srpcType) getUpdates(conn *srpc.Conn) error { t.deleteMissingImages(initialImages) initialImages = nil } + if *finishedReplication != nil { + close(*finishedReplication) + *finishedReplication = nil + } t.logger.Printf("Replicated all current images in %s\n", format.Duration(time.Since(replicationStartTime))) continue