Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

enable concurrency #1

Merged
merged 1 commit into from
Oct 21, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
10 changes: 6 additions & 4 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -13,12 +13,11 @@ Download binaries from [the latest release](https://github.com/WillAbides/gharch
## Command line usage

```
$ gharchive --help
Usage: gharchive <start> [<end>]

Arguments:
<start> start time formatted as YYYY-MM-DD, or as an RFC3339 date
[<end>] end time formatted as YYYY-MM-DD, or as an RFC3339 date. default is a day past start
[<end>] end time formatted as YYYY-MM-DD, or as an RFC3339 date. default is an hour past start

Flags:
-h, --help Show context-sensitive help.
Expand All @@ -27,9 +26,12 @@ Flags:
--strict-created-at only output events with a created_at between start and end
--no-empty-lines skip empty lines
--only-valid-json          skip lines that are not valid json objects
--preserve-order ensure that events are output in the same order they exist on data.gharchive.org
--concurrency=INT max number of concurrent downloads to run. Ignored if --preserve-order is set. Default is the number of cpus available.
--debug output debug logs
```

## Performance

I can iterate about 45k events per second from a MacBook Pro with a cable modem.
The bottleneck is decompressing files.
I can iterate about 200k events per second from an 8 core MacBook Pro with a
cable modem. On an 80 core server in a data center that increases to about 450k.
28 changes: 1 addition & 27 deletions bytereader.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,33 +2,7 @@ package gharchive

// This file is mostly copied from https://github.com/pkg/json/blob/6ff99391461697279ad9e8620e35ff567d49287c/reader.go

import (
"bytes"
"io"
)

type lineScanner struct {
br byteReader
pos int
}

func (s *lineScanner) next() []byte {
s.br.release(s.pos)
for {
idx := bytes.IndexByte(s.br.window(), '\n')
if idx >= 0 {
s.pos = idx + 1
return s.br.window()[:s.pos]
}
if s.br.extend() == 0 {
return s.br.window()
}
}
}

func (s *lineScanner) error() error {
return s.br.err
}
import "io"

// A byteReader implements a sliding window over an io.Reader.
type byteReader struct {
Expand Down
57 changes: 47 additions & 10 deletions cmd/gharchive/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,22 +4,31 @@ import (
"context"
"fmt"
"io"
"io/ioutil"
"log"
"os"
"runtime"
"strings"
"time"

"github.com/alecthomas/kong"
jsoniter "github.com/json-iterator/go"
"github.com/willabides/gharchive-client"
"golang.org/x/text/language"
"golang.org/x/text/message"
)

var cli struct {
Start string `kong:"arg,help='start time formatted as YYYY-MM-DD, or as an RFC3339 date'"`
End string `kong:"arg,optional,help='end time formatted as YYYY-MM-DD, or as an RFC3339 date. default is a day past start'"`
End string `kong:"arg,optional,help='end time formatted as YYYY-MM-DD, or as an RFC3339 date. default is an hour past start'"`
IncludeType []string `kong:"name=type,help='include only these event types'"`
ExcludeType []string `kong:"name=not-type,help='exclude these event types'"`
StrictCreatedAt bool `kong:"help='only output events with a created_at between start and end'"`
NoEmptyLines bool `kong:"help='skip empty lines'"`
OnlyValidJSON bool `kong:"help='skip lines that are not valid json objects'"`
PreserveOrder bool `kong:"help='ensure that events are output in the same order they exist on data.gharchive.org'"`
Concurrency int `kong:"help='max number of concurrent downloads to run. Ignored if --preserve-order is set. Default is the number of cpus available.'"`
Debug bool `kong:"help='output debug logs'"`
}

func parseTimeString(st string) (tm time.Time, err error) {
Expand All @@ -38,10 +47,14 @@ func main() {
k := kong.Parse(&cli)
start, err := parseTimeString(cli.Start)
k.FatalIfErrorf(err, "invalid start time")
debugLog := log.New(ioutil.Discard, "DEBUG ", log.LstdFlags)
if cli.Debug {
debugLog.SetOutput(os.Stderr)
}
var end time.Time
if cli.End != "" {
end, err = parseTimeString(cli.End)
k.FatalIfErrorf(err, "invalid end time")
k.FatalIfErrorf(err, "invalid end time. must be either 'YYYY-MM-DD' or 'YYYY-MM-DDThh:mm:ssZ' (RFC 3339)")
}
if end.IsZero() {
end = start.AddDate(0, 0, 1)
Expand Down Expand Up @@ -112,17 +125,41 @@ func main() {
if len(fieldValidators) > 0 {
validators = append(validators, gharchive.ValidateJSONFields(fieldValidators))
}
sc, err := gharchive.New(ctx, start, end, &gharchive.Options{Validators: validators})
if cli.Concurrency == 0 {
cli.Concurrency = runtime.NumCPU()
}
if cli.PreserveOrder {
cli.Concurrency = 1
}
debugLog.Printf("concurrency=%d", cli.Concurrency)
debugLog.Printf("start=%s", start.Format(time.RFC3339))
debugLog.Printf("end=%s", end.Format(time.RFC3339))
sc, err := gharchive.New(ctx, start, &gharchive.Options{
Validators: validators,
Concurrency: cli.Concurrency,
PreserveOrder: cli.PreserveOrder,
EndTime: end,
})
k.FatalIfErrorf(err, "error creating scanner")
defer func() {
_ = sc.Close() //nolint:errcheck // nothing to do with this error
}()
for {
line, err := sc.Next(ctx)
if err == io.EOF || err == context.Canceled {
break
}
k.FatalIfErrorf(err, "error streaming from gharchive")
fmt.Print(string(line))
var lineCount int
scanStartTime := time.Now()
for sc.Scan(ctx) {
lineCount++
fmt.Print(string(sc.Bytes()))
}
scanDuration := time.Since(scanStartTime)
linesPerSecond := int64(float64(lineCount) / scanDuration.Seconds())
debugLog.Println("done")
debugLog.Printf("output %s lines", message.NewPrinter(language.English).Sprintf("%d", lineCount))
debugLog.Printf("took %0.2f seconds", scanDuration.Seconds())
debugLog.Printf("output %s lines per second", message.NewPrinter(language.English).Sprintf("%d", linesPerSecond))

err = sc.Err()
if err == io.EOF || err == context.Canceled {
err = nil
}
k.FatalIfErrorf(err, "error streaming from gharchive")
}
154 changes: 154 additions & 0 deletions concurrent.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,154 @@
package gharchive

import (
	"bytes"
	"context"
	"io"
	"runtime"
	"sync"
	"time"

	"github.com/killa-beez/gopkgs/pool"
)

// concurrentScanner scans multiple hourly gharchive files concurrently,
// funneling the lines produced by all workers through a single channel
// that Scan consumes.
type concurrentScanner struct {
	scanners    []*singleScanner // one scanner per hour in the requested range
	scannerErrs []error          // scannerErrs[i] holds the terminal error of scanners[i]; each worker writes only its own index
	lines       chan []byte      // lines produced by workers, consumed by Scan
	cancel      func()           // cancels the workers' context; invoked by Close
	bytes       []byte           // the most recent line returned by Scan, exposed via Bytes

	errLock sync.RWMutex // guards err
	err     error        // first non-nil worker error, recorded once scanning finishes

	doneLock sync.Mutex    // guards done and the close of doneChan
	doneChan chan struct{} // closed (once) when all workers have finished
	done     bool          // true after doneChan has been closed
}

// newConcurrentScanner builds a concurrentScanner covering each whole hour in
// [startTime, opts.EndTime) — or a single hour when opts.EndTime is zero —
// and starts up to opts.Concurrency workers downloading and scanning hourly
// archive files. The caller's opts value is never mutated. It returns an
// error if any hourly scanner fails to initialize.
func newConcurrentScanner(ctx context.Context, startTime time.Time, opts *Options) (*concurrentScanner, error) {
	if opts == nil {
		opts = new(Options)
	}
	// Work on a copy so the caller's Options are not mutated by the
	// SingleHour/Concurrency adjustments below.
	optsCopy := *opts
	opts = &optsCopy
	if opts.Concurrency <= 0 {
		// A zero-worker pool would never run the scanners; default to one
		// worker per CPU.
		opts.Concurrency = runtime.NumCPU()
	}
	endTime := opts.EndTime
	if endTime.IsZero() {
		endTime = startTime.Add(time.Hour)
	}
	opts.SingleHour = true
	startTime = startTime.UTC()
	// Truncate to the top of the hour: gharchive files are hourly.
	hour := time.Date(startTime.Year(), startTime.Month(), startTime.Day(), startTime.Hour(), 0, 0, 0, time.UTC)
	var scanners []*singleScanner
	for hour.Before(endTime) {
		scanner, err := newSingleScanner(ctx, hour, opts)
		if err != nil {
			return nil, err
		}
		scanners = append(scanners, scanner)
		hour = hour.Add(time.Hour)
	}
	m := &concurrentScanner{
		scanners:    scanners,
		scannerErrs: make([]error, len(scanners)),
		// Large buffer so workers rarely block waiting on the consumer.
		lines:    make(chan []byte, opts.Concurrency*100_000),
		doneChan: make(chan struct{}),
	}
	ctx, m.cancel = context.WithCancel(ctx)

	p := pool.New(len(scanners), opts.Concurrency)
	for i := range scanners {
		i := i // capture per-iteration values (pre-Go 1.22 loop semantics)
		scanner := scanners[i]
		p.Add(pool.NewWorkUnit(func(ctx2 context.Context) {
			scannerErr := runScanner(ctx2, scanner, m.lines)
			if scannerErr == io.EOF {
				scannerErr = nil
			}
			// Each worker writes only its own slot, so no lock is needed.
			m.scannerErrs[i] = scannerErr
		}))
	}
	p.Start(ctx)
	go func() {
		// Signal completion once every worker has returned so Scan can
		// stop blocking and report the final state.
		p.Wait()
		m.beDone()
	}()
	return m, nil
}

// beDone marks scanning as finished by closing doneChan exactly once.
// It is safe to call repeatedly and from multiple goroutines.
func (m *concurrentScanner) beDone() {
	m.doneLock.Lock()
	defer m.doneLock.Unlock()
	if !m.done {
		m.done = true
		close(m.doneChan)
	}
}

// bufPool is a pool of reusable byte buffers. NOTE(review): runScanner no
// longer recycles line buffers through it — a line handed to the channel
// consumer must own its memory — so the pool is retained only as scratch.
var bufPool = sync.Pool{
	New: func() interface{} { return bytes.NewBuffer(make([]byte, 0, 8192)) },
}

// runScanner drains scanner, sending a copy of each line on lines. It
// returns the scanner's terminal error (io.EOF on normal completion), or
// the context error if ctx is canceled first.
func runScanner(ctx context.Context, scanner *singleScanner, lines chan<- []byte) error {
	for scanner.Scan(ctx) {
		// scanner.Bytes() is only valid until the next Scan call, and the
		// consumer may not receive the line until much later, so each line
		// needs its own copy. The previous implementation reused a single
		// pooled buffer for every line, which let subsequent reads overwrite
		// lines still queued in the channel (and returned the buffer to the
		// pool while consumers could still hold slices into it).
		src := scanner.Bytes()
		line := make([]byte, len(src))
		copy(line, src)
		select {
		case <-ctx.Done():
			return ctx.Err()
		case lines <- line:
		}
	}
	return scanner.Err()
}

// Close cancels the workers and closes every underlying hourly scanner.
// It returns the first close error encountered, if any.
func (m *concurrentScanner) Close() error {
	m.cancel()
	var firstErr error
	for _, s := range m.scanners {
		if closeErr := s.Close(); closeErr != nil && firstErr == nil {
			firstErr = closeErr
		}
	}
	m.beDone()
	return firstErr
}

// Err returns the first error reported by any worker, or nil. Its result is
// only meaningful after Scan has returned false.
func (m *concurrentScanner) Err() error {
	m.errLock.RLock()
	defer m.errLock.RUnlock()
	return m.err
}

// Scan blocks until the next line is available and reports whether one was
// read; the line is then available from Bytes. It returns false once all
// workers have finished and every buffered line has been delivered, after
// which Err reports the first worker error (if any).
func (m *concurrentScanner) Scan(_ context.Context) bool {
	// Fast path: deliver a buffered line if one is immediately available,
	// so completed workers' output is drained before checking doneChan.
	select {
	case m.bytes = <-m.lines:
		return true
	default:
	}

	select {
	case m.bytes = <-m.lines:
		return true
	case <-m.doneChan:
		// doneChan and lines can become ready at nearly the same moment, and
		// select picks randomly among ready cases — without this drain a line
		// buffered just before the workers finished could be dropped.
		select {
		case m.bytes = <-m.lines:
			return true
		default:
		}
		// All workers are done and the channel is empty: record the first
		// worker error for Err.
		m.errLock.Lock()
		for _, scanErr := range m.scannerErrs {
			if scanErr != nil {
				m.err = scanErr
				break
			}
		}
		m.errLock.Unlock()
		return false
	}
}

// Bytes returns the most recent line read by a successful call to Scan.
func (m *concurrentScanner) Bytes() []byte {
	return m.bytes
}
58 changes: 58 additions & 0 deletions concurrent_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,58 @@
package gharchive

import (
"context"
"testing"
"time"

"github.com/stretchr/testify/require"
)

func Test_concurrentScanner(t *testing.T) {
t.Run("short", func(t *testing.T) {
ctx := context.Background()
client := setupShortTestServer(ctx, t)
start := time.Date(2020, 10, 10, 8, 6, 0, 0, time.UTC)
opts := &Options{
StorageClient: client,
Concurrency: 3,
EndTime: start.Add(150 * time.Minute),
}
scanner, err := newConcurrentScanner(ctx, start, opts)
require.NoError(t, err)
t.Cleanup(func() {
require.NoError(t, scanner.Close())
})
var count int
for scanner.Scan(ctx) {
count++
}
require.NoError(t, scanner.Err())
require.Equal(t, 33, count)
})

t.Run("regular", func(t *testing.T) {
if testing.Short() {
t.SkipNow()
}
ctx := context.Background()
client := setupTestServer(ctx, t)
start := time.Date(2020, 10, 10, 8, 6, 0, 0, time.UTC)
opts := &Options{
StorageClient: client,
Concurrency: 3,
EndTime: start.Add(159 * time.Minute),
}
scanner, err := newConcurrentScanner(ctx, start, opts)
require.NoError(t, err)
t.Cleanup(func() {
require.NoError(t, scanner.Close())
})
var count int
for scanner.Scan(ctx) {
count++
}
require.NoError(t, scanner.Err())
require.Equal(t, 280628, count)
})
}