Skip to content

Commit

Permalink
Merge pull request #1 from WillAbides/concurrent
Browse files Browse the repository at this point in the history
enable concurrency
  • Loading branch information
WillAbides committed Oct 21, 2020
2 parents 6f3a468 + c793f61 commit c39728e
Show file tree
Hide file tree
Showing 15 changed files with 766 additions and 252 deletions.
10 changes: 6 additions & 4 deletions README.md
Expand Up @@ -13,12 +13,11 @@ Download binaries from [the latest release](https://github.com/WillAbides/gharch
## Command line usage

```
$ gharchive --help
Usage: gharchive <start> [<end>]
Arguments:
<start> start time formatted as YYYY-MM-DD, or as an RFC3339 date
[<end>] end time formatted as YYYY-MM-DD, or as an RFC3339 date. default is a day past start
[<end>] end time formatted as YYYY-MM-DD, or as an RFC3339 date. default is an hour past start
Flags:
-h, --help Show context-sensitive help.
Expand All @@ -27,9 +26,12 @@ Flags:
--strict-created-at only output events with a created_at between start and end
--no-empty-lines skip empty lines
--only-valid-json skip lines that are not valid json objects
--preserve-order ensure that events are output in the same order they exist on data.gharchive.org
--concurrency=INT max number of concurrent downloads to run. Ignored if --preserve-order is set. Default is the number of cpus available.
--debug output debug logs
```

## Performance

I can iterate about 45k events per second from a MacBook Pro with a cable modem.
The bottleneck is decompressing files.
I can iterate about 200k events per second from an 8 core MacBook Pro with a
cable modem. On an 80 core server in a data center that increases to about 450k.
28 changes: 1 addition & 27 deletions bytereader.go
Expand Up @@ -2,33 +2,7 @@ package gharchive

// This file is mostly copied from https://github.com/pkg/json/blob/6ff99391461697279ad9e8620e35ff567d49287c/reader.go

import (
"bytes"
"io"
)

type lineScanner struct {
br byteReader
pos int
}

func (s *lineScanner) next() []byte {
s.br.release(s.pos)
for {
idx := bytes.IndexByte(s.br.window(), '\n')
if idx >= 0 {
s.pos = idx + 1
return s.br.window()[:s.pos]
}
if s.br.extend() == 0 {
return s.br.window()
}
}
}

func (s *lineScanner) error() error {
return s.br.err
}
import "io"

// A byteReader implements a sliding window over an io.Reader.
type byteReader struct {
Expand Down
57 changes: 47 additions & 10 deletions cmd/gharchive/main.go
Expand Up @@ -4,22 +4,31 @@ import (
"context"
"fmt"
"io"
"io/ioutil"
"log"
"os"
"runtime"
"strings"
"time"

"github.com/alecthomas/kong"
jsoniter "github.com/json-iterator/go"
"github.com/willabides/gharchive-client"
"golang.org/x/text/language"
"golang.org/x/text/message"
)

var cli struct {
Start string `kong:"arg,help='start time formatted as YYYY-MM-DD, or as an RFC3339 date'"`
End string `kong:"arg,optional,help='end time formatted as YYYY-MM-DD, or as an RFC3339 date. default is a day past start'"`
End string `kong:"arg,optional,help='end time formatted as YYYY-MM-DD, or as an RFC3339 date. default is an hour past start'"`
IncludeType []string `kong:"name=type,help='include only these event types'"`
ExcludeType []string `kong:"name=not-type,help='exclude these event types'"`
StrictCreatedAt bool `kong:"help='only output events with a created_at between start and end'"`
NoEmptyLines bool `kong:"help='skip empty lines'"`
OnlyValidJSON bool `kong:"help='skip lines that aren not valid json objects'"`
PreserveOrder bool `kong:"help='ensure that events are output in the same order they exist on data.gharchive.org'"`
Concurrency int `kong:"help='max number of concurrent downloads to run. Ignored if --preserve-order is set. Default is the number of cpus available.'"`
Debug bool `kong:"help='output debug logs'"`
}

func parseTimeString(st string) (tm time.Time, err error) {
Expand All @@ -38,10 +47,14 @@ func main() {
k := kong.Parse(&cli)
start, err := parseTimeString(cli.Start)
k.FatalIfErrorf(err, "invalid start time")
debugLog := log.New(ioutil.Discard, "DEBUG ", log.LstdFlags)
if cli.Debug {
debugLog.SetOutput(os.Stderr)
}
var end time.Time
if cli.End != "" {
end, err = parseTimeString(cli.End)
k.FatalIfErrorf(err, "invalid end time")
k.FatalIfErrorf(err, "invalid end time. must be either 'YYYY-MM-DD' or 'YYYY-MM-DDThh:mm:ssZ' (RFC 3339")
}
if end.IsZero() {
end = start.AddDate(0, 0, 1)
Expand Down Expand Up @@ -112,17 +125,41 @@ func main() {
if len(fieldValidators) > 0 {
validators = append(validators, gharchive.ValidateJSONFields(fieldValidators))
}
sc, err := gharchive.New(ctx, start, end, &gharchive.Options{Validators: validators})
if cli.Concurrency == 0 {
cli.Concurrency = runtime.NumCPU()
}
if cli.PreserveOrder {
cli.Concurrency = 1
}
debugLog.Printf("concurrency=%d", cli.Concurrency)
debugLog.Printf("start=%s", start.Format(time.RFC3339))
debugLog.Printf("end=%s", end.Format(time.RFC3339))
sc, err := gharchive.New(ctx, start, &gharchive.Options{
Validators: validators,
Concurrency: cli.Concurrency,
PreserveOrder: cli.PreserveOrder,
EndTime: end,
})
k.FatalIfErrorf(err, "error creating scanner")
defer func() {
_ = sc.Close() //nolint:errcheck // nothing to do with this error
}()
for {
line, err := sc.Next(ctx)
if err == io.EOF || err == context.Canceled {
break
}
k.FatalIfErrorf(err, "error streaming from gharchive")
fmt.Print(string(line))
var lineCount int
scanStartTime := time.Now()
for sc.Scan(ctx) {
lineCount++
fmt.Print(string(sc.Bytes()))
}
scanDuration := time.Since(scanStartTime)
linesPerSecond := int64(float64(lineCount) / scanDuration.Seconds())
debugLog.Println("done")
debugLog.Printf("output %s lines", message.NewPrinter(language.English).Sprintf("%d", lineCount))
debugLog.Printf("took %0.2f seconds", scanDuration.Seconds())
debugLog.Printf("output %s lines per second", message.NewPrinter(language.English).Sprintf("%d", linesPerSecond))

err = sc.Err()
if err == io.EOF || err == context.Canceled {
err = nil
}
k.FatalIfErrorf(err, "error streaming from gharchive")
}
154 changes: 154 additions & 0 deletions concurrent.go
@@ -0,0 +1,154 @@
package gharchive

import (
"bytes"
"context"
"io"
"sync"
"time"

"github.com/killa-beez/gopkgs/pool"
)

// concurrentScanner fans scanning out over several singleScanners — one per
// archive hour — and funnels the lines they produce through a single channel.
type concurrentScanner struct {
	scanners    []*singleScanner // one scanner per hour in the requested range
	scannerErrs []error          // scannerErrs[i] is the terminal error of scanners[i]; each worker writes only its own slot
	lines       chan []byte      // lines produced by the workers, consumed by Scan
	cancel      func()           // cancels the context the workers run under
	bytes       []byte           // most recent line returned by Scan, exposed via Bytes

	errLock sync.RWMutex // guards err
	err     error        // first non-nil worker error, recorded when scanning finishes

	doneLock sync.Mutex    // guards done and the close of doneChan
	doneChan chan struct{} // closed once all workers have finished
	done     bool          // true once doneChan has been closed
}

// newConcurrentScanner creates a concurrentScanner that covers every archive
// hour from startTime up to (but not including) opts.EndTime, downloading up
// to opts.Concurrency hours in parallel.
//
// When opts is nil or opts.EndTime is zero, the end defaults to one hour past
// startTime. When opts.Concurrency is less than 1 it defaults to 1.
func newConcurrentScanner(ctx context.Context, startTime time.Time, opts *Options) (*concurrentScanner, error) {
	if opts == nil {
		opts = new(Options)
	}
	// Work on a copy so the adjustments below are not visible to the caller's
	// Options value.
	optsCopy := *opts
	opts = &optsCopy
	endTime := opts.EndTime
	if endTime.IsZero() {
		endTime = startTime.Add(time.Hour)
	}
	concurrency := opts.Concurrency
	if concurrency < 1 {
		// A zero value would mean an unbuffered lines channel and a pool with
		// no workers, which could never make progress.
		concurrency = 1
	}
	opts.Concurrency = concurrency
	// Each singleScanner is responsible for exactly one hour's archive file.
	opts.SingleHour = true
	startTime = startTime.UTC()
	// Truncate to the top of the hour: archive files are keyed by whole hours.
	hour := time.Date(startTime.Year(), startTime.Month(), startTime.Day(), startTime.Hour(), 0, 0, 0, time.UTC)
	var scanners []*singleScanner
	for hour.Before(endTime) {
		scanner, err := newSingleScanner(ctx, hour, opts)
		if err != nil {
			return nil, err
		}
		scanners = append(scanners, scanner)
		hour = hour.Add(time.Hour)
	}
	m := &concurrentScanner{
		scanners:    scanners,
		scannerErrs: make([]error, len(scanners)),
		lines:       make(chan []byte, concurrency*100_000),
		doneChan:    make(chan struct{}),
	}
	ctx, m.cancel = context.WithCancel(ctx)

	p := pool.New(len(scanners), concurrency)
	for i := range scanners {
		i := i // capture per-iteration values for the closure
		scanner := scanners[i]
		p.Add(pool.NewWorkUnit(func(ctx2 context.Context) {
			scannerErr := runScanner(ctx2, scanner, m.lines)
			if scannerErr == io.EOF {
				scannerErr = nil
			}
			// Each worker writes only its own slot, so no locking is needed.
			m.scannerErrs[i] = scannerErr
		}))
	}
	p.Start(ctx)
	go func() {
		// Signal Scan once every worker has returned.
		p.Wait()
		m.beDone()
	}()
	return m, nil
}

// beDone closes doneChan exactly once. It is safe to call any number of times
// and from multiple goroutines.
func (m *concurrentScanner) beDone() {
	m.doneLock.Lock()
	defer m.doneLock.Unlock()
	if !m.done {
		m.done = true
		close(m.doneChan)
	}
}

var bufPool sync.Pool

func runScanner(ctx context.Context, scanner *singleScanner, lines chan<- []byte) error {
buf, ok := bufPool.Get().(*bytes.Buffer)
if !ok {
buf = bytes.NewBuffer(make([]byte, 0, 8192))
}
defer bufPool.Put(buf)
for scanner.Scan(ctx) {
buf.Reset()
_, err := buf.ReadFrom(bytes.NewReader(scanner.Bytes()))
if err != nil {
return err
}
select {
case <-ctx.Done():
return ctx.Err()
case lines <- buf.Bytes():
}
}
return scanner.Err()
}

// Close cancels all in-flight work, closes every underlying scanner, and
// returns the first close error encountered (if any).
func (m *concurrentScanner) Close() error {
	m.cancel()
	var firstErr error
	for _, s := range m.scanners {
		// Close every scanner even after a failure; keep only the first error.
		if closeErr := s.Close(); firstErr == nil {
			firstErr = closeErr
		}
	}
	m.beDone()
	return firstErr
}

// Err returns the first error recorded from the underlying scanners, or nil
// if none has been recorded yet.
func (m *concurrentScanner) Err() error {
	m.errLock.RLock()
	defer m.errLock.RUnlock()
	return m.err
}

// Scan advances to the next available line, returning true when a line was
// read (retrieve it with Bytes) and false when all workers have finished.
// After Scan returns false, Err reports the first worker error, if any.
//
// The context parameter is unused; cancellation is controlled by the context
// passed to newConcurrentScanner.
func (m *concurrentScanner) Scan(_ context.Context) bool {
	// Fast path: a line is already buffered.
	select {
	case m.bytes = <-m.lines:
		return true
	default:
	}

	select {
	case m.bytes = <-m.lines:
		return true
	case <-m.doneChan:
		// doneChan can become ready while lines are still buffered, and a
		// select chooses randomly among ready cases — so drain the channel
		// before declaring the stream finished, or buffered lines would be
		// silently dropped. All sends happen before doneChan is closed, so a
		// single non-blocking receive per call is sufficient.
		select {
		case m.bytes = <-m.lines:
			return true
		default:
		}
		m.errLock.Lock()
		// Record the first worker error for Err.
		for _, err := range m.scannerErrs {
			if err != nil {
				m.err = err
				break
			}
		}
		m.errLock.Unlock()
		return false
	}
}

// Bytes returns the line most recently read by a successful call to Scan.
func (m *concurrentScanner) Bytes() []byte {
	return m.bytes
}
58 changes: 58 additions & 0 deletions concurrent_test.go
@@ -0,0 +1,58 @@
package gharchive

import (
"context"
"testing"
"time"

"github.com/stretchr/testify/require"
)

// Test_concurrentScanner checks that the concurrent scanner yields the
// expected number of lines from the local test servers.
func Test_concurrentScanner(t *testing.T) {
	t.Run("short", func(t *testing.T) {
		ctx := context.Background()
		storageClient := setupShortTestServer(ctx, t)
		startTime := time.Date(2020, 10, 10, 8, 6, 0, 0, time.UTC)
		scanner, err := newConcurrentScanner(ctx, startTime, &Options{
			StorageClient: storageClient,
			Concurrency:   3,
			EndTime:       startTime.Add(150 * time.Minute),
		})
		require.NoError(t, err)
		t.Cleanup(func() {
			require.NoError(t, scanner.Close())
		})
		lineCount := 0
		for scanner.Scan(ctx) {
			lineCount++
		}
		require.NoError(t, scanner.Err())
		require.Equal(t, 33, lineCount)
	})

	t.Run("regular", func(t *testing.T) {
		if testing.Short() {
			t.SkipNow()
		}
		ctx := context.Background()
		storageClient := setupTestServer(ctx, t)
		startTime := time.Date(2020, 10, 10, 8, 6, 0, 0, time.UTC)
		scanner, err := newConcurrentScanner(ctx, startTime, &Options{
			StorageClient: storageClient,
			Concurrency:   3,
			EndTime:       startTime.Add(159 * time.Minute),
		})
		require.NoError(t, err)
		t.Cleanup(func() {
			require.NoError(t, scanner.Close())
		})
		lineCount := 0
		for scanner.Scan(ctx) {
			lineCount++
		}
		require.NoError(t, scanner.Err())
		require.Equal(t, 280628, lineCount)
	})
}

0 comments on commit c39728e

Please sign in to comment.