Skip to content
This repository has been archived by the owner on Jan 20, 2020. It is now read-only.

Commit

Permalink
github: fix sync, improve channel usage
Browse files Browse the repository at this point in the history
  • Loading branch information
bobheadxi committed Mar 5, 2019
1 parent 14a51d3 commit 8d62a53
Show file tree
Hide file tree
Showing 3 changed files with 102 additions and 52 deletions.
6 changes: 5 additions & 1 deletion github/github.go
Original file line number Diff line number Diff line change
Expand Up @@ -98,7 +98,11 @@ func (c *Client) GetIssues(
wait *sync.WaitGroup,
) error {
wait.Add(1)
defer wait.Done()
defer func() {
close(issuesC)
close(fetchDetailsC)
wait.Done()
}()

var (
l = c.l.With("user", user, "repo", repo)
Expand Down
22 changes: 15 additions & 7 deletions github/integration_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import (
"os"
"sync"
"testing"
"time"

"github.com/google/go-github/github"
"github.com/joho/godotenv"
Expand Down Expand Up @@ -96,7 +97,6 @@ func TestSyncer(t *testing.T) {
}

var wg = &sync.WaitGroup{}
var itemsC = make(chan *Item, 500)
var s = NewSyncer(l.Named("syncer"), ic, SyncOptions{
Repo: Repo{
Owner: "bobheadxi",
Expand All @@ -106,13 +106,21 @@ func TestSyncer(t *testing.T) {
State: IssueStateAll,
},
DetailsFetchWorkers: 3,
IndexC: itemsC,
OutputBufferSize: 500,
})

go assert.NoError(t, s.Sync(ctx, wg))
for i := range itemsC {
t.Logf("%+v", i)
}

outC, errC := s.Sync(ctx, wg)
wg.Wait()
select {
case err := <-errC:
t.Errorf("sync errored: %s", err.Error())
default:
break
}
go func() {
for i := range outC {
t.Logf("received #%d", i.Number)
}
}()
time.Sleep(10 * time.Second)
}
126 changes: 82 additions & 44 deletions github/sync.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,26 +2,38 @@ package github

import (
"context"
"strconv"
"errors"
"sync"

"github.com/google/go-github/github"
"go.uber.org/zap"
)

// Item is an item due for indexing
// ItemType denotes supported GitHub item types
type ItemType string

const (
// ItemTypeIssue is a GitHub issue
ItemTypeIssue ItemType = "issue"
// ItemTypePR is a GitHub pull request
ItemTypePR ItemType = "pull-request"
)

// Item is a GitHub item due for indexing
// TODO: this needs to be better
type Item struct {
ID string
Type string
Data interface{}
ID int
Number int
Type ItemType
Data interface{}
}

// SyncOptions denotes options for a syncer
type SyncOptions struct {
Repo Repo
Filter ItemFilter
DetailsFetchWorkers int
IndexC chan *Item
OutputBufferSize int
}

// Repo denotes a repository to sync
Expand All @@ -39,9 +51,15 @@ type Syncer struct {

issuesC chan *github.Issue
fetchDetailsC chan *github.Issue

outC chan *Item

used bool
}

// NewSyncer instantiates a new GitHub Syncer
// NewSyncer instantiates a new GitHub Syncer. It Syncer::Sync() can only be
// used once.
// TODO: should it be reusable?
func NewSyncer(
l *zap.SugaredLogger,
client *Client,
Expand All @@ -54,11 +72,40 @@ func NewSyncer(

issuesC: make(chan *github.Issue, opts.DetailsFetchWorkers),
fetchDetailsC: make(chan *github.Issue, opts.DetailsFetchWorkers),

outC: make(chan *Item, opts.OutputBufferSize),
}
}

// Sync pulls all issues from its configured repository and blocks until done
func (s *Syncer) Sync(ctx context.Context, wg *sync.WaitGroup) error {
// Sync pulls all issues from its configured repository and blocks until done.
// It can only be called once.
func (s *Syncer) Sync(ctx context.Context, wg *sync.WaitGroup) (<-chan *Item, <-chan error) {
// guard against reuse
if s.used {
var errC = make(chan error, 1)
errC <- errors.New("syncer cannot be reused")
close(errC)
return nil, errC
}
s.used = true

// execute sync
var errC = s.sync(ctx, wg)
go func() {
wg.Wait()
close(s.outC)
}()
return s.outC, errC
}

func (s *Syncer) sync(ctx context.Context, wg *sync.WaitGroup) <-chan error {
// spin up workers
for i := 0; i < s.opts.DetailsFetchWorkers; i++ {
go s.fetchDetails(ctx, wg)
}
go s.handleIssues(ctx, wg)

// start sync
var errC = make(chan error)
go func() {
if err := s.c.GetIssues(
Expand All @@ -71,49 +118,40 @@ func (s *Syncer) Sync(ctx context.Context, wg *sync.WaitGroup) error {
wg); err != nil {
errC <- err
}
wg.Wait()
close(errC)
}()

cancellableCtx, cancel := context.WithCancel(ctx)
for i := 0; i < s.opts.DetailsFetchWorkers; i++ {
go s.fetchDetails(cancellableCtx, wg)
}
return errC
}

wg.Wait()
cancel()
select {
case err := <-errC:
return err
default:
return nil
func (s *Syncer) handleIssues(ctx context.Context, wg *sync.WaitGroup) {
wg.Add(1)
for i := range s.issuesC {
s.outC <- &Item{
ID: int(i.GetID()),
Number: int(i.GetNumber()),
Type: ItemTypeIssue,
Data: i,
}
}
wg.Done()
}

func (s *Syncer) fetchDetails(ctx context.Context, wg *sync.WaitGroup) {
for {
select {
case <-ctx.Done():
return
case i := <-s.issuesC:
wg.Add(1)
s.opts.IndexC <- &Item{
ID: strconv.Itoa(int(i.GetID())),
Type: "issue",
Data: i,
}
wg.Done()
case i := <-s.fetchDetailsC:
wg.Add(1)
if pr, err := s.c.GetPullRequest(ctx, i); err != nil {
s.l.Errorw("failed to get pull request",
"issue", i.GetNumber())
} else {
s.opts.IndexC <- &Item{
ID: strconv.Itoa(int(pr.GetID())),
Type: "pull-request",
Data: pr,
}
wg.Add(1)
for i := range s.fetchDetailsC {
if pr, err := s.c.GetPullRequest(ctx, i); err != nil {
s.l.Errorw("failed to get pull request",
"issue", i.GetNumber())
} else {
s.outC <- &Item{
ID: int(pr.GetID()),
Number: int(pr.GetNumber()),
Type: ItemTypePR,
Data: pr,
}
wg.Done()
}
}
wg.Done()
}

0 comments on commit 8d62a53

Please sign in to comment.