Skip to content

Commit

Permalink
Merge pull request #545 from rgooch/master
Browse files Browse the repository at this point in the history
Imageserver fixes: single-thread object fetching and block replication clients during startup.
  • Loading branch information
rgooch committed Dec 28, 2018
2 parents 276a623 + b61806e commit edadd94
Show file tree
Hide file tree
Showing 5 changed files with 49 additions and 22 deletions.
8 changes: 6 additions & 2 deletions imageserver/rpcd/api.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ var (

type srpcType struct {
imageDataBase *scanner.ImageDataBase
finishedReplication <-chan struct{} // Closed when finished.
replicationMaster string
imageserverResource *srpc.ClientResource
objSrv objectserver.FullObjectServer
Expand All @@ -47,8 +48,10 @@ func Setup(imdb *scanner.ImageDataBase, replicationMaster string,
if *archiveMode && replicationMaster == "" {
return nil, errors.New("replication master required in archive mode")
}
finishedReplication := make(chan struct{})
srpcObj := &srpcType{
imageDataBase: imdb,
finishedReplication: finishedReplication,
replicationMaster: replicationMaster,
imageserverResource: srpc.NewClientResource("tcp", replicationMaster),
objSrv: objSrv,
Expand All @@ -66,8 +69,9 @@ func Setup(imdb *scanner.ImageDataBase, replicationMaster string,
"ListImages",
}})
if replicationMaster != "" {
go srpcObj.replicator()
go srpcObj.replicator(finishedReplication)
} else {
close(finishedReplication)
}

return (*htmlWriter)(srpcObj), nil
}
10 changes: 10 additions & 0 deletions imageserver/rpcd/getImageUpdates.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,16 @@ func (t *srpcType) GetImageUpdates(conn *srpc.Conn, decoder srpc.Decoder,
defer conn.Flush()
t.logger.Printf("New image replication client connected from: %s\n",
conn.RemoteAddr())
select {
case <-t.finishedReplication:
default:
t.logger.Println(
"Blocking replication client until I've finished replicating")
<-t.finishedReplication
t.logger.Printf(
"Replication finished, unblocking replication client: %s\n",
conn.RemoteAddr())
}
t.incrementNumReplicationClients(true)
defer t.incrementNumReplicationClients(false)
addChannel := t.imageDataBase.RegisterAddNotifier()
Expand Down
11 changes: 8 additions & 3 deletions imageserver/rpcd/replicator.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ import (
"github.com/Symantec/Dominator/proto/imageserver"
)

func (t *srpcType) replicator() {
func (t *srpcType) replicator(finishedReplication chan<- struct{}) {
initialTimeout := time.Second * 15
timeout := initialTimeout
var nextSleepStopTime time.Time
Expand All @@ -31,7 +31,7 @@ func (t *srpcType) replicator() {
"ImageServer.GetImageUpdates"); err != nil {
t.logger.Println(err)
} else {
if err := t.getUpdates(conn); err != nil {
if err := t.getUpdates(conn, &finishedReplication); err != nil {
if err == io.EOF {
t.logger.Println(
"Connection to image replicator closed")
Expand All @@ -53,7 +53,8 @@ func (t *srpcType) replicator() {
}
}

func (t *srpcType) getUpdates(conn *srpc.Conn) error {
func (t *srpcType) getUpdates(conn *srpc.Conn,
finishedReplication *chan<- struct{}) error {
t.logger.Printf("Image replicator: connected to: %s\n", t.replicationMaster)
replicationStartTime := time.Now()
decoder := gob.NewDecoder(conn)
Expand All @@ -76,6 +77,10 @@ func (t *srpcType) getUpdates(conn *srpc.Conn) error {
t.deleteMissingImages(initialImages)
initialImages = nil
}
if *finishedReplication != nil {
close(*finishedReplication)
*finishedReplication = nil
}
t.logger.Printf("Replicated all current images in %s\n",
format.Duration(time.Since(replicationStartTime)))
continue
Expand Down
1 change: 1 addition & 0 deletions imageserver/scanner/api.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@ type ImageDataBase struct {
deduperLock sync.Mutex
deduper *stringutil.StringDeduplicator
pendingImageLock sync.Mutex
objectFetchLock sync.Mutex
// Unprotected by any lock.
objectServer objectserver.FullObjectServer
replicationMaster string
Expand Down
41 changes: 24 additions & 17 deletions imageserver/scanner/load.go
Original file line number Diff line number Diff line change
Expand Up @@ -162,8 +162,8 @@ func (imdb *ImageDataBase) loadFile(filename string,
defer file.Close()
reader := fsutil.NewChecksumReader(file)
decoder := gob.NewDecoder(reader)
var image image.Image
if err := decoder.Decode(&image); err != nil {
var img image.Image
if err := decoder.Decode(&img); err != nil {
return err
}
if err := reader.VerifyChecksum(); err != nil {
Expand All @@ -175,38 +175,45 @@ func (imdb *ImageDataBase) loadFile(filename string,
return err
}
}
if imageIsExpired(&image) {
if imageIsExpired(&img) {
imdb.logger.Printf("Deleting already expired image: %s\n", filename)
return os.Remove(pathname)
}
if err := image.VerifyObjects(imdb.objectServer); err != nil {
if err := img.VerifyObjects(imdb.objectServer); err != nil {
if imdb.replicationMaster == "" ||
!strings.Contains(err.Error(), "not available") {
return fmt.Errorf("error verifying: %s: %s", filename, err)
}
client, err := srpc.DialHTTP("tcp", imdb.replicationMaster, time.Minute)
if err != nil {
return err
}
defer client.Close()
objClient := objectclient.AttachObjectClient(client)
defer objClient.Close()
err = image.GetMissingObjects(imdb.objectServer, objClient,
err = imdb.fetchMissingObjects(&img,
prefixlogger.New(filename+": ", logger))
if err != nil {
return err
}
}
image.FileSystem.RebuildInodePointers()
img.FileSystem.RebuildInodePointers()
imdb.deduperLock.Lock()
image.ReplaceStrings(imdb.deduper.DeDuplicate)
img.ReplaceStrings(imdb.deduper.DeDuplicate)
imdb.deduperLock.Unlock()
if err := image.Verify(); err != nil {
if err := img.Verify(); err != nil {
return err
}
imdb.scheduleExpiration(&image, filename)
imdb.scheduleExpiration(&img, filename)
imdb.Lock()
defer imdb.Unlock()
imdb.imageMap[filename] = &image
imdb.imageMap[filename] = &img
return nil
}

// fetchMissingObjects calls img.GetMissingObjects, passing imdb.objectServer
// and an object client attached to a freshly dialled connection to
// imdb.replicationMaster.
//
// imdb.objectFetchLock is held for the whole call, so concurrent callers
// perform their fetches one at a time. A dial error, or the result of
// GetMissingObjects, is returned to the caller unchanged.
func (imdb *ImageDataBase) fetchMissingObjects(img *image.Image,
	logger log.DebugLogger) error {
	imdb.objectFetchLock.Lock()
	defer imdb.objectFetchLock.Unlock()

	// NOTE(review): time.Minute is presumably the dial timeout - confirm
	// against srpc.DialHTTP.
	masterConn, dialErr := srpc.DialHTTP("tcp", imdb.replicationMaster,
		time.Minute)
	if dialErr != nil {
		return dialErr
	}
	defer masterConn.Close()

	// Deferred calls run last-in-first-out: the object client is closed
	// first, then the connection, and finally the lock is released.
	fetcher := objectclient.AttachObjectClient(masterConn)
	defer fetcher.Close()

	fetchErr := img.GetMissingObjects(imdb.objectServer, fetcher, logger)
	return fetchErr
}

0 comments on commit edadd94

Please sign in to comment.