Skip to content

Commit

Permalink
maintner: refactor gcslog and API into separate packages
Browse files Browse the repository at this point in the history
Fixes golang/go#24786.

Change-Id: I008810a0394c75e7c790165308ff9ef872c77fdc
Reviewed-on: https://go-review.googlesource.com/105935
Reviewed-by: Brad Fitzpatrick <bradfitz@golang.org>
  • Loading branch information
broady committed Jun 5, 2018
1 parent 01f5214 commit 3b34821
Show file tree
Hide file tree
Showing 5 changed files with 57 additions and 36 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -2,9 +2,8 @@
// Use of this source code is governed by a BSD-style
// license that can be found in the LICENSE file.

// Implementation of maintner.MutationSource and Logger for Google Cloud Storage.

package main
// Package gcslog is an implementation of maintner.MutationSource and Logger for Google Cloud Storage.
package gcslog

import (
"bytes"
Expand Down Expand Up @@ -39,7 +38,12 @@ const targetObjectSize = 16 << 20

const flushInterval = 10 * time.Minute

type gcsLog struct {
// GCSLog implements MutationLogger and MutationSource.
var _ maintner.MutationLogger = &GCSLog{}
var _ maintner.MutationSource = &GCSLog{}

// GCSLog logs mutations to GCS.
type GCSLog struct {
sc *storage.Client
bucketName string
bucket *storage.BucketHandle
Expand Down Expand Up @@ -70,15 +74,16 @@ func (s gcsLogSegment) String() string {

// newGCSLogBase returns a new gcsLog instance without any association
// with Google Cloud Storage.
func newGCSLogBase() *gcsLog {
gl := &gcsLog{
func newGCSLogBase() *GCSLog {
gl := &GCSLog{
seg: map[int]gcsLogSegment{},
}
gl.cond = sync.NewCond(&gl.mu)
return gl
}

func newGCSLog(ctx context.Context, bucketName string) (*gcsLog, error) {
// NewGCSLog creates a GCSLog that logs mutations to a given GCS bucket.
func NewGCSLog(ctx context.Context, bucketName string) (*GCSLog, error) {
sc, err := storage.NewClient(ctx)
if err != nil {
return nil, fmt.Errorf("storage.NewClient: %v", err)
Expand All @@ -95,7 +100,7 @@ func newGCSLog(ctx context.Context, bucketName string) (*gcsLog, error) {

var objnameRx = regexp.MustCompile(`^(\d{4})\.([0-9a-f]{56})\.mutlog$`)

func (gl *gcsLog) initLoad(ctx context.Context) error {
func (gl *GCSLog) initLoad(ctx context.Context) error {
it := gl.bucket.Objects(ctx, nil)
maxNum := 0
for {
Expand Down Expand Up @@ -162,7 +167,7 @@ func (gl *gcsLog) initLoad(ctx context.Context) error {
return nil
}

func (gl *gcsLog) serveLogFile(w http.ResponseWriter, r *http.Request) {
func (gl *GCSLog) serveLogFile(w http.ResponseWriter, r *http.Request) {
if r.Method != "GET" && r.Method != "HEAD" {
http.Error(w, "bad method", http.StatusBadRequest)
return
Expand Down Expand Up @@ -193,7 +198,7 @@ func (gl *gcsLog) serveLogFile(w http.ResponseWriter, r *http.Request) {
http.ServeContent(w, r, "", time.Time{}, strings.NewReader(content))
}

func (gl *gcsLog) serveJSONLogsIndex(w http.ResponseWriter, r *http.Request) {
func (gl *GCSLog) serveJSONLogsIndex(w http.ResponseWriter, r *http.Request) {
if r.Method != "GET" && r.Method != "HEAD" {
http.Error(w, "bad method", http.StatusBadRequest)
return
Expand Down Expand Up @@ -244,9 +249,9 @@ func sumSegmentSizes(segs []maintner.LogSegmentJSON) (sum int64) {
return sum
}

// waitSizeNot blocks until the sum of gcsLog is not v, or the context expires.
// waitSizeNot blocks until the sum of GCSLog is not v, or the context expires.
// It reports whether the size changed.
func (gl *gcsLog) waitSizeNot(ctx context.Context, v int64) (changed bool) {
func (gl *GCSLog) waitSizeNot(ctx context.Context, v int64) (changed bool) {
returned := make(chan struct{})
defer close(returned)
go gl.waitSizeNotAwaitContextOrChange(ctx, returned)
Expand All @@ -270,7 +275,7 @@ func (gl *gcsLog) waitSizeNot(ctx context.Context, v int64) (changed bool) {
// It's a goroutine that selects on two channels and calls
// sync.Cond.Broadcast to wake up the waitSizeNot waiter if the
// context expires.
func (gl *gcsLog) waitSizeNotAwaitContextOrChange(ctx context.Context, returned <-chan struct{}) {
func (gl *GCSLog) waitSizeNotAwaitContextOrChange(ctx context.Context, returned <-chan struct{}) {
select {
case <-ctx.Done():
gl.cond.Broadcast()
Expand All @@ -279,7 +284,7 @@ func (gl *gcsLog) waitSizeNotAwaitContextOrChange(ctx context.Context, returned
}
}

func (gl *gcsLog) sumSizeLocked() int64 {
func (gl *GCSLog) sumSizeLocked() int64 {
var sum int64
for n, seg := range gl.seg {
if n != gl.curNum {
Expand All @@ -290,7 +295,7 @@ func (gl *gcsLog) sumSizeLocked() int64 {
return sum
}

func (gl *gcsLog) getJSONLogs(startSeg int) (segs []maintner.LogSegmentJSON) {
func (gl *GCSLog) getJSONLogs(startSeg int) (segs []maintner.LogSegmentJSON) {
gl.mu.Lock()
defer gl.mu.Unlock()
if startSeg > gl.curNum || startSeg < 0 {
Expand All @@ -317,9 +322,9 @@ func (gl *gcsLog) getJSONLogs(startSeg int) (segs []maintner.LogSegmentJSON) {
return
}

// gcsLogWriter is the io.Writer used to write to gcsLog.logBuf. It
// gcsLogWriter is the io.Writer used to write to GCSLog.logBuf. It
// keeps the sha224 in sync. Caller must hold gl.mu.
type gcsLogWriter struct{ gl *gcsLog }
type gcsLogWriter struct{ gl *GCSLog }

func (w gcsLogWriter) Write(p []byte) (n int, err error) {
gl := w.gl
Expand All @@ -337,7 +342,8 @@ func (w gcsLogWriter) Write(p []byte) (n int, err error) {
return len(p), nil
}

func (gl *gcsLog) Log(m *maintpb.Mutation) error {
// Log writes m to GCS after the buffer is full or after a periodic flush.
func (gl *GCSLog) Log(m *maintpb.Mutation) error {
data, err := proto.Marshal(m)
if err != nil {
return err
Expand Down Expand Up @@ -372,14 +378,14 @@ func (gl *gcsLog) Log(m *maintpb.Mutation) error {
return nil
}

func (gl *gcsLog) onFlushTimer() {
func (gl *GCSLog) onFlushTimer() {
log.Printf("flush timer fired.")
ctx, cancel := context.WithTimeout(context.Background(), 5*time.Minute)
defer cancel()
gl.flush(ctx)
}

func (gl *gcsLog) flush(ctx context.Context) error {
func (gl *GCSLog) flush(ctx context.Context) error {
gl.mu.Lock()

defer gl.mu.Unlock()
Expand All @@ -397,7 +403,7 @@ func (gl *gcsLog) flush(ctx context.Context) error {
return nil
}

func (gl *gcsLog) flushLocked(ctx context.Context) error {
func (gl *GCSLog) flushLocked(ctx context.Context) error {
buf := gl.logBuf.Bytes()
if len(buf) == 0 {
return nil
Expand Down Expand Up @@ -443,7 +449,7 @@ func (gl *gcsLog) flushLocked(ctx context.Context) error {
return nil
}

func (gl *gcsLog) deleteOldSegment(ctx context.Context, objName string) {
func (gl *GCSLog) deleteOldSegment(ctx context.Context, objName string) {
err := gl.bucket.Object(objName).Delete(ctx)
if err != nil {
// Can ignore, though. Probably emphemeral, and not critical.
Expand All @@ -454,7 +460,7 @@ func (gl *gcsLog) deleteOldSegment(ctx context.Context, objName string) {
}
}

func (gl *gcsLog) objectNames() (names []string) {
func (gl *GCSLog) objectNames() (names []string) {
gl.mu.Lock()
defer gl.mu.Unlock()
for _, seg := range gl.seg {
Expand All @@ -464,7 +470,7 @@ func (gl *gcsLog) objectNames() (names []string) {
return
}

func (gl *gcsLog) foreachSegmentReader(ctx context.Context, fn func(r io.Reader) error) error {
func (gl *GCSLog) foreachSegmentReader(ctx context.Context, fn func(r io.Reader) error) error {
objs := gl.objectNames()
for i, obj := range objs {
log.Printf("Reading %d/%d: %s ...", i+1, len(objs), obj)
Expand All @@ -481,7 +487,10 @@ func (gl *gcsLog) foreachSegmentReader(ctx context.Context, fn func(r io.Reader)
return nil
}

func (gl *gcsLog) GetMutations(ctx context.Context) <-chan maintner.MutationStreamEvent {
// GetMutations returns a channel of mutations or related events.
// The channel will never be closed.
// All sends on the returned channel should select on the provided context.
func (gl *GCSLog) GetMutations(ctx context.Context) <-chan maintner.MutationStreamEvent {
ch := make(chan maintner.MutationStreamEvent, 50) // buffered: overlap gunzip/unmarshal with loading
go func() {
err := gl.foreachSegmentReader(ctx, func(r io.Reader) error {
Expand Down Expand Up @@ -524,9 +533,8 @@ func try(tries int, firstDelay time.Duration, fn func() error) error {
return err
}

// copyFrom is only used for the one-time migrate from disk-to-GCS
// code path.
func (gl *gcsLog) copyFrom(src maintner.MutationSource) error {
// CopyFrom is only used for the one-time migrate from disk-to-GCS code path.
func (gl *GCSLog) CopyFrom(src maintner.MutationSource) error {
gl.curNum = 0
ctx := context.Background()
for e := range src.GetMutations(ctx) {
Expand All @@ -546,3 +554,9 @@ func (gl *gcsLog) copyFrom(src maintner.MutationSource) error {
}
panic("unexpected channel close")
}

// RegisterHandlers adds handlers for the default paths (/logs and /logs/).
func (gl *GCSLog) RegisterHandlers(mux *http.ServeMux) {
mux.HandleFunc("/logs", gl.serveJSONLogsIndex)
mux.HandleFunc("/logs/", gl.serveLogFile)
}
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@
// Use of this source code is governed by a BSD-style
// license that can be found in the LICENSE file.

package main
package gcslog

import (
"context"
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,8 @@
// Use of this source code is governed by a BSD-style
// license that can be found in the LICENSE file.

package main
// Package maintapi exposes a gRPC maintner service for a given corpus.
package maintapi

import (
"context"
Expand All @@ -18,6 +19,11 @@ import (
"golang.org/x/build/maintner/maintnerd/apipb"
)

// NewAPIService creates a gRPC Server that serves the Maintner API for the given corpus.
func NewAPIService(corpus *maintner.Corpus) apipb.MaintnerServiceServer {
return apiService{corpus}
}

// apiService implements apipb.MaintnerServiceServer using the Corpus c.
type apiService struct {
c *maintner.Corpus
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@
// Use of this source code is governed by a BSD-style
// license that can be found in the LICENSE file.

package main
package maintapi

import (
"context"
Expand Down
11 changes: 6 additions & 5 deletions maintner/maintnerd/maintnerd.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,8 @@ import (
"golang.org/x/build/maintner"
"golang.org/x/build/maintner/godata"
"golang.org/x/build/maintner/maintnerd/apipb"
"golang.org/x/build/maintner/maintnerd/gcslog"
"golang.org/x/build/maintner/maintnerd/maintapi"
"golang.org/x/crypto/acme/autocert"
"golang.org/x/net/http2"
"golang.org/x/time/rate"
Expand Down Expand Up @@ -155,15 +157,14 @@ func main() {
if *genMut {
if *bucket != "" {
ctx := context.Background()
gl, err := newGCSLog(ctx, *bucket)
gl, err := gcslog.NewGCSLog(ctx, *bucket)
if err != nil {
log.Fatalf("newGCSLog: %v", err)
}
http.HandleFunc("/logs", gl.serveJSONLogsIndex)
http.HandleFunc("/logs/", gl.serveLogFile)
gl.RegisterHandlers(http.DefaultServeMux)
if *migrateGCSFlag {
diskLog := maintner.NewDiskMutationLogger(*dataDir)
if err := gl.copyFrom(diskLog); err != nil {
if err := gl.CopyFrom(diskLog); err != nil {
log.Fatalf("migrate: %v", err)
}
log.Printf("Success.")
Expand Down Expand Up @@ -252,7 +253,7 @@ func main() {
}

grpcServer := grpc.NewServer()
apipb.RegisterMaintnerServiceServer(grpcServer, apiService{corpus})
apipb.RegisterMaintnerServiceServer(grpcServer, maintapi.NewAPIService(corpus))
http.Handle("/apipb.MaintnerService/", grpcServer)

http.HandleFunc("/", func(w http.ResponseWriter, r *http.Request) {
Expand Down

0 comments on commit 3b34821

Please sign in to comment.