From c793f61438cf03822dcf0a4146958e5b4749878f Mon Sep 17 00:00:00 2001 From: Will Roden Date: Wed, 21 Oct 2020 18:21:48 -0500 Subject: [PATCH] enable concurrency --- README.md | 10 +- bytereader.go | 28 +---- cmd/gharchive/main.go | 57 ++++++++-- concurrent.go | 154 ++++++++++++++++++++++++++ concurrent_test.go | 58 ++++++++++ gharchive.go | 246 +++++++++++------------------------------- gharchive_test.go | 92 +++++++++++----- go.mod | 2 + go.sum | 3 + linescanner.go | 34 ++++++ linescanner_test.go | 39 +++++++ script/gharchive | 2 +- script/test | 1 + single.go | 202 ++++++++++++++++++++++++++++++++++ single_test.go | 90 ++++++++++++++++ 15 files changed, 766 insertions(+), 252 deletions(-) create mode 100644 concurrent.go create mode 100644 concurrent_test.go create mode 100644 linescanner.go create mode 100644 linescanner_test.go create mode 100644 single.go create mode 100644 single_test.go diff --git a/README.md b/README.md index a08f309..6d425fc 100644 --- a/README.md +++ b/README.md @@ -13,12 +13,11 @@ Download binaries from [the latest release](https://github.com/WillAbides/gharch ## Command line usage ``` -$ gharchive --help Usage: gharchive [] Arguments: start time formatted as YYYY-MM-DD, or as an RFC3339 date - [] end time formatted as YYYY-MM-DD, or as an RFC3339 date. default is a day past start + [] end time formatted as YYYY-MM-DD, or as an RFC3339 date. default is an hour past start Flags: -h, --help Show context-sensitive help. @@ -27,9 +26,12 @@ Flags: --strict-created-at only output events with a created_at between start and end --no-empty-lines skip empty lines --only-valid-json skip lines that aren not valid json objects + --preserve-order ensure that events are output in the same order they exist on data.gharchive.org + --concurrency=INT max number of concurrent downloads to run. Ignored if --preserve-order is set. Default is the number of cpus available. + --debug output debug logs ``` ## Performance -I can iterate about 45k events per second from a MacBook Pro with a cable modem. -The bottleneck is decompressing files. +I can iterate about 200k events per second from an 8 core MacBook Pro with a +cable modem. On an 80 core server in a data center that increases to about 450k. diff --git a/bytereader.go b/bytereader.go index 8a13c08..d81d8af 100644 --- a/bytereader.go +++ b/bytereader.go @@ -2,33 +2,7 @@ package gharchive // This file is mostly copied from https://github.com/pkg/json/blob/6ff99391461697279ad9e8620e35ff567d49287c/reader.go -import ( - "bytes" - "io" -) - -type lineScanner struct { - br byteReader - pos int -} - -func (s *lineScanner) next() []byte { - s.br.release(s.pos) - for { - idx := bytes.IndexByte(s.br.window(), '\n') - if idx >= 0 { - s.pos = idx + 1 - return s.br.window()[:s.pos] - } - if s.br.extend() == 0 { - return s.br.window() - } - } -} - -func (s *lineScanner) error() error { - return s.br.err -} +import "io" // A byteReader implements a sliding window over an io.Reader. type byteReader struct { diff --git a/cmd/gharchive/main.go b/cmd/gharchive/main.go index 32dec9b..35436d6 100644 --- a/cmd/gharchive/main.go +++ b/cmd/gharchive/main.go @@ -4,22 +4,31 @@ import ( "context" "fmt" "io" + "io/ioutil" + "log" + "os" + "runtime" "strings" "time" "github.com/alecthomas/kong" jsoniter "github.com/json-iterator/go" "github.com/willabides/gharchive-client" + "golang.org/x/text/language" + "golang.org/x/text/message" ) var cli struct { Start string `kong:"arg,help='start time formatted as YYYY-MM-DD, or as an RFC3339 date'"` - End string `kong:"arg,optional,help='end time formatted as YYYY-MM-DD, or as an RFC3339 date. default is a day past start'"` + End string `kong:"arg,optional,help='end time formatted as YYYY-MM-DD, or as an RFC3339 date. default is an hour past start'"` IncludeType []string `kong:"name=type,help='include only these event types'"` ExcludeType []string `kong:"name=not-type,help='exclude these event types'"` StrictCreatedAt bool `kong:"help='only output events with a created_at between start and end'"` NoEmptyLines bool `kong:"help='skip empty lines'"` OnlyValidJSON bool `kong:"help='skip lines that aren not valid json objects'"` + PreserveOrder bool `kong:"help='ensure that events are output in the same order they exist on data.gharchive.org'"` + Concurrency int `kong:"help='max number of concurrent downloads to run. Ignored if --preserve-order is set. Default is the number of cpus available.'"` + Debug bool `kong:"help='output debug logs'"` } func parseTimeString(st string) (tm time.Time, err error) { @@ -38,10 +47,14 @@ func main() { k := kong.Parse(&cli) start, err := parseTimeString(cli.Start) k.FatalIfErrorf(err, "invalid start time") + debugLog := log.New(ioutil.Discard, "DEBUG ", log.LstdFlags) + if cli.Debug { + debugLog.SetOutput(os.Stderr) + } var end time.Time if cli.End != "" { end, err = parseTimeString(cli.End) - k.FatalIfErrorf(err, "invalid end time") + k.FatalIfErrorf(err, "invalid end time. must be either 'YYYY-MM-DD' or 'YYYY-MM-DDThh:mm:ssZ' (RFC 3339") } if end.IsZero() { end = start.AddDate(0, 0, 1) @@ -112,17 +125,41 @@ func main() { if len(fieldValidators) > 0 { validators = append(validators, gharchive.ValidateJSONFields(fieldValidators)) } - sc, err := gharchive.New(ctx, start, end, &gharchive.Options{Validators: validators}) + if cli.Concurrency == 0 { + cli.Concurrency = runtime.NumCPU() + } + if cli.PreserveOrder { + cli.Concurrency = 1 + } + debugLog.Printf("concurrency=%d", cli.Concurrency) + debugLog.Printf("start=%s", start.Format(time.RFC3339)) + debugLog.Printf("end=%s", end.Format(time.RFC3339)) + sc, err := gharchive.New(ctx, start, &gharchive.Options{ + Validators: validators, + Concurrency: cli.Concurrency, + PreserveOrder: cli.PreserveOrder, + EndTime: end, + }) k.FatalIfErrorf(err, "error creating scanner") defer func() { _ = sc.Close() //nolint:errcheck // nothing to do with this error }() - for { - line, err := sc.Next(ctx) - if err == io.EOF || err == context.Canceled { - break - } - k.FatalIfErrorf(err, "error streaming from gharchive") - fmt.Print(string(line)) + var lineCount int + scanStartTime := time.Now() + for sc.Scan(ctx) { + lineCount++ + fmt.Print(string(sc.Bytes())) + } + scanDuration := time.Since(scanStartTime) + linesPerSecond := int64(float64(lineCount) / scanDuration.Seconds()) + debugLog.Println("done") + debugLog.Printf("output %s lines", message.NewPrinter(language.English).Sprintf("%d", lineCount)) + debugLog.Printf("took %0.2f seconds", scanDuration.Seconds()) + debugLog.Printf("output %s lines per second", message.NewPrinter(language.English).Sprintf("%d", linesPerSecond)) + + err = sc.Err() + if err == io.EOF || err == context.Canceled { + err = nil } + k.FatalIfErrorf(err, "error streaming from gharchive") } diff --git a/concurrent.go b/concurrent.go new file mode 100644 index 0000000..c1a40e5 --- /dev/null +++ b/concurrent.go @@ -0,0 +1,154 @@ +package gharchive + +import ( + "bytes" + "context" + "io" + "sync" + "time" + + "github.com/killa-beez/gopkgs/pool" +) + +type concurrentScanner struct { + scanners []*singleScanner + scannerErrs []error + lines chan []byte + cancel func() + bytes []byte + + errLock sync.RWMutex + err error + + doneLock sync.Mutex + doneChan chan struct{} + done bool +} + +func newConcurrentScanner(ctx context.Context, startTime time.Time, opts *Options) (*concurrentScanner, error) { + if opts == nil { + opts = new(Options) + } + endTime := opts.EndTime + if endTime.IsZero() { + endTime = startTime.Add(time.Hour) + } + opts.SingleHour = true + startTime = startTime.UTC() + hour := time.Date(startTime.Year(), startTime.Month(), startTime.Day(), startTime.Hour(), 0, 0, 0, time.UTC) + var scanners []*singleScanner + for hour.Before(endTime) { + scanner, err := newSingleScanner(ctx, hour, opts) + if err != nil { + return nil, err + } + scanners = append(scanners, scanner) + hour = hour.Add(time.Hour) + } + m := &concurrentScanner{ + scanners: scanners, + scannerErrs: make([]error, len(scanners)), + lines: make(chan []byte, opts.Concurrency*100_000), + doneChan: make(chan struct{}), + } + ctx, m.cancel = context.WithCancel(ctx) + + p := pool.New(len(scanners), opts.Concurrency) + for i := range scanners { + i := i + scanner := scanners[i] + p.Add(pool.NewWorkUnit(func(ctx2 context.Context) { + scannerErr := runScanner(ctx2, scanner, m.lines) + if scannerErr == io.EOF { + scannerErr = nil + } + m.scannerErrs[i] = scannerErr + })) + } + p.Start(ctx) + go func() { + p.Wait() + m.beDone() + }() + return m, nil +} + +func (m *concurrentScanner) beDone() { + m.doneLock.Lock() + defer m.doneLock.Unlock() + if m.done { + return + } + close(m.doneChan) + m.done = true +} + +var bufPool sync.Pool + +func runScanner(ctx context.Context, scanner *singleScanner, lines chan<- []byte) error { + buf, ok := bufPool.Get().(*bytes.Buffer) + if !ok { + buf = bytes.NewBuffer(make([]byte, 0, 8192)) + } + defer bufPool.Put(buf) + for scanner.Scan(ctx) { + buf.Reset() + _, err := buf.ReadFrom(bytes.NewReader(scanner.Bytes())) + if err != nil { + return err + } + select { + case <-ctx.Done(): + return ctx.Err() + case lines <- buf.Bytes(): + } + } + return scanner.Err() +} + +func (m *concurrentScanner) Close() error { + m.cancel() + var err error + for _, scanner := range m.scanners { + closeErr := scanner.Close() + if err == nil { + err = closeErr + } + } + m.beDone() + return err +} + +func (m *concurrentScanner) Err() error { + m.errLock.RLock() + err := m.err + m.errLock.RUnlock() + return err +} + +func (m *concurrentScanner) Scan(_ context.Context) bool { + select { + case m.bytes = <-m.lines: + return true + default: + } + + select { + case m.bytes = <-m.lines: + return true + case <-m.doneChan: + m.errLock.Lock() + for _, err := range m.scannerErrs { + if err != nil { + m.err = err + break + } + } + m.errLock.Unlock() + return false + } +} + +func (m *concurrentScanner) Bytes() []byte { + return m.bytes +} diff --git a/concurrent_test.go b/concurrent_test.go new file mode 100644 index 0000000..c48ba72 --- /dev/null +++ b/concurrent_test.go @@ -0,0 +1,58 @@ +package gharchive + +import ( + "context" + "testing" + "time" + + "github.com/stretchr/testify/require" +) + +func Test_concurrentScanner(t *testing.T) { + t.Run("short", func(t *testing.T) { + ctx := context.Background() + client := setupShortTestServer(ctx, t) + start := time.Date(2020, 10, 10, 8, 6, 0, 0, time.UTC) + opts := &Options{ + StorageClient: client, + Concurrency: 3, + EndTime: start.Add(150 * time.Minute), + } + scanner, err := newConcurrentScanner(ctx, start, opts) + require.NoError(t, err) + t.Cleanup(func() { + require.NoError(t, scanner.Close()) + }) + var count int + for scanner.Scan(ctx) { + count++ + } + require.NoError(t, scanner.Err()) + require.Equal(t, 33, count) + }) + + t.Run("regular", func(t *testing.T) { + if testing.Short() { + t.SkipNow() + } + ctx := context.Background() + client := setupTestServer(ctx, t) + start := time.Date(2020, 10, 10, 8, 6, 0, 0, time.UTC) + opts := &Options{ + StorageClient: client, + Concurrency: 3, + EndTime: start.Add(159 * time.Minute), + } + scanner, err := newConcurrentScanner(ctx, start, opts) + require.NoError(t, err) + t.Cleanup(func() { + require.NoError(t, scanner.Close()) + }) + var count int + for scanner.Scan(ctx) { + count++ + } + require.NoError(t, scanner.Err()) + require.Equal(t, 280628, count) + }) +} diff --git a/gharchive.go b/gharchive.go index 7c1f469..4a4ede5 100644 --- a/gharchive.go +++ b/gharchive.go @@ -3,220 +3,102 @@ package gharchive import ( "context" "io" - "strings" "time" "cloud.google.com/go/storage" - "github.com/klauspost/compress/gzip" "google.golang.org/api/option" ) -// Validator is a function that returns true when a line passes validation -type Validator func(line []byte) bool - -// Options are options for a Scanner -type Options struct { - StorageClient *storage.Client - Bucket string - Validators []Validator -} - -func (o *Options) withDefaults(ctx context.Context) (*Options, error) { - if o == nil { - o = new(Options) - } - if o.StorageClient != nil && o.Bucket != "" { - return o, nil - } - out := &Options{ - StorageClient: o.StorageClient, - Bucket: o.Bucket, - Validators: o.Validators, - } - var err error - if out.StorageClient == nil { - out.StorageClient, err = storage.NewClient(ctx, option.WithoutAuthentication()) - if err != nil { - return nil, err - } - } - if out.Bucket == "" { - out.Bucket = "data.gharchive.org" - } - return out, nil -} - -// New returns a new Scanner -func New(ctx context.Context, startTime, endTime time.Time, opts *Options) (*Scanner, error) { - var err error - opts, err = opts.withDefaults(ctx) - if err != nil { - return nil, err - } - - return &Scanner{ - opts: opts, - bucket: opts.Bucket, - client: opts.StorageClient, - startTime: startTime.UTC(), - endTime: endTime.UTC(), - }, nil +type iface interface { + io.Closer + Scan(ctx context.Context) bool + Bytes() []byte + Err() error } // Scanner scans lines from gharchive type Scanner struct { - opts *Options - client *storage.Client - bucket string - startTime time.Time - endTime time.Time - curHour time.Time - lineScanner *lineScanner - hourReader *objReader - brBuffer []byte + scanner iface } -func (s *Scanner) iterateCurHour() { - if s.curHour.IsZero() { - s.curHour = s.startTime.Truncate(time.Hour) - return - } - s.curHour = s.curHour.Add(time.Hour) +// Scan advances the scanner to the next token, which will then be available through +// the Bytes method. It returns false when the scan stops by reaching the end of the output. +// After Scan returns false, the Err method will return any error that occurred during scanning. +func (s *Scanner) Scan(ctx context.Context) bool { + return s.scanner.Scan(ctx) } -// Close closes the scanner -func (s *Scanner) Close() error { - var err error - if s.client != nil { - err = s.client.Close() - } - if s.hourReader != nil { - hrErr := s.hourReader.Close() - if err == nil { - err = hrErr - } - } - return err +// Bytes returns the most recent token generated by a call to Scan. +// The underlying array may point to data that will be overwritten +// by a subsequent call to Scan. +func (s *Scanner) Bytes() []byte { + return s.scanner.Bytes() } -func (s *Scanner) validateLine(line []byte) bool { - for _, validator := range s.opts.Validators { - ok := validator(line) - if !ok { - return false - } - } - return true +// Err returns the first non-EOF error that was encountered by the Scanner. +func (s *Scanner) Err() error { + return s.scanner.Err() } -func (s *Scanner) prepLineScanner(ctx context.Context) error { - if ctx.Err() != nil { - return ctx.Err() - } - if len(s.brBuffer) == 0 { - s.brBuffer = make([]byte, 8192) - } - if s.lineScanner != nil { - err := s.lineScanner.error() - if err == nil || err != io.EOF { - return err - } - } - - // starting here we know either s.lineScanner == nil or s.lineScanner.error() == io.EOF - // either way we need to do the same thing. +// Close closes the scanner. +func (s *Scanner) Close() error { + return s.scanner.Close() +} - if s.hourReader == nil { - s.hourReader = new(objReader) - } - s.iterateCurHour() - if s.curHour.After(s.endTime) { - return io.EOF - } - err := s.hourReader.newObj(ctx, s.curHour, s.opts) +// New returns a new Scanner +func New(ctx context.Context, startTime time.Time, opts *Options) (*Scanner, error) { + var err error + opts, err = opts.withDefaults(ctx) if err != nil { - return err + return nil, err } - s.lineScanner = &lineScanner{ - br: byteReader{ - data: s.brBuffer, - r: s.hourReader, - }, + scanner := new(Scanner) + if opts.SingleHour || opts.Concurrency == 1 || opts.PreserveOrder { + scanner.scanner, err = newSingleScanner(ctx, startTime, opts) + } else { + scanner.scanner, err = newConcurrentScanner(ctx, startTime, opts) } - return nil -} - -// Next returns the next line of output. error is io.EOF at the end. -func (s *Scanner) Next(ctx context.Context) ([]byte, error) { - for { - if ctx.Err() != nil { - return nil, ctx.Err() - } - err := s.prepLineScanner(ctx) - if err != nil { - return nil, err - } - line := s.lineScanner.next() - if s.validateLine(line) { - return line, nil - } + if err != nil { + return nil, err } + return scanner, nil } -type objReader struct { - rdr io.Reader - gzRdr *gzip.Reader -} +// Validator is a function that returns true when a line passes validation +type Validator func(line []byte) bool -func (z *objReader) Read(p []byte) (n int, err error) { - return z.gzRdr.Read(p) +// Options are options for a Scanner +type Options struct { + Validators []Validator // list of validators to check each line + SingleHour bool // ignore end time and just scan the file containing the hour in which start occurs. + EndTime time.Time // end of the timespan to scan. events up to the second before EndTime will be scanned. ignored when SingleHour is set. default: start time + 1 hour + PreserveOrder bool // run a single process so that the output order is preserved + Concurrency int // number of concurrent downloads to run. ignored when PreserveOrder is set. default: 1 + Bucket string // the GCP bucket for gharchive. default: data.gharchive.org + StorageClient *storage.Client // a client to use instead of the default. } -func (z *objReader) Close() error { - var err error - if z.gzRdr != nil { - err = z.gzRdr.Close() +func (o *Options) withDefaults(ctx context.Context) (*Options, error) { + if o == nil { + o = new(Options) } - if z.rdr == nil { - return err + if o.StorageClient != nil && o.Bucket != "" { + return o, nil } - if rdr, ok := z.rdr.(io.Closer); ok { - rdrErr := rdr.Close() - if rdrErr != nil { - return rdrErr + out := new(Options) + *out = *o + var err error + if out.StorageClient == nil { + out.StorageClient, err = storage.NewClient(ctx, option.WithoutAuthentication()) + if err != nil { + return nil, err } } - return err -} - -func (z *objReader) Reset(r io.Reader) error { - err := z.Close() - if err != nil { - return err - } - z.rdr = r - if z.gzRdr == nil { - z.gzRdr, err = gzip.NewReader(r) - return err - } - return z.gzRdr.Reset(r) -} - -func (z *objReader) newObj(ctx context.Context, hour time.Time, opts *Options) error { - var err error - opts, err = opts.withDefaults(ctx) - if err != nil { - return err + if out.Bucket == "" { + out.Bucket = "data.gharchive.org" } - tm := hour.UTC() - - // this hack is required to get a single-digit hour in the object name - obj := tm.UTC().Format("2006-01-02-") - obj += strings.TrimPrefix(tm.UTC().Format("15.json.gz"), "0") - - rdr, err := opts.StorageClient.Bucket(opts.Bucket).Object(obj).NewReader(ctx) - if err != nil { - return err + if out.Concurrency == 0 { + out.Concurrency = 1 } - return z.Reset(rdr) + return out, nil } diff --git a/gharchive_test.go b/gharchive_test.go index bac9711..c048171 100644 --- a/gharchive_test.go +++ b/gharchive_test.go @@ -1,8 +1,8 @@ package gharchive import ( + "bytes" "context" - "fmt" "io" "net/http" "net/http/httptest" @@ -10,9 +10,9 @@ import ( "path" "path/filepath" "testing" - "time" "cloud.google.com/go/storage" + "github.com/klauspost/compress/gzip" "github.com/stretchr/testify/require" "google.golang.org/api/option" ) @@ -27,7 +27,8 @@ func downloadTestFiles(t testing.TB) { t.Helper() dir := filepath.FromSlash("./tmp/testfiles") - err := os.MkdirAll(dir, 0o700) + shortDir := filepath.Join(dir, "short") + err := os.MkdirAll(shortDir, 0o700) require.NoError(t, err) for _, filename := range testfiles { _, err = os.Stat(filepath.Join(dir, filename)) @@ -38,18 +39,54 @@ func downloadTestFiles(t testing.TB) { err = nil } require.NoError(t, err) - resp, err := http.Get("https://data.gharchive.org/" + filename) + var resp *http.Response + resp, err = http.Get("https://data.gharchive.org/" + filename) require.NoError(t, err) require.Equal(t, 200, resp.StatusCode) - file, err := os.Create(filepath.Join(dir, filename)) + var file *os.File + file, err = os.Create(filepath.Join(dir, filename)) require.NoError(t, err) - t.Cleanup(func() { - require.NoError(t, file.Close()) - require.NoError(t, resp.Body.Close()) - }) _, err = io.Copy(file, resp.Body) + require.NoError(t, file.Close()) + require.NoError(t, resp.Body.Close()) require.NoError(t, err) } + for _, filename := range testfiles { + _, err = os.Stat(filepath.Join(shortDir, filename)) + if err == nil { + continue + } + if os.IsNotExist(err) { + err = nil + } + require.NoError(t, err) + infile, err := os.Open(filepath.Join(dir, filename)) + require.NoError(t, err) + gzr, err := gzip.NewReader(infile) + require.NoError(t, err) + outfile, err := os.Create(filepath.Join(shortDir, filename)) + require.NoError(t, err) + gzw := gzip.NewWriter(outfile) + ls := lineScanner{ + br: byteReader{ + r: gzr, + }, + } + for i := 0; i < 10; i++ { + ls.scan() + b := ls.bytes() + if i == 9 { + b = bytes.TrimSpace(b) + } + _, err = gzw.Write(b) + require.NoError(t, err) + require.NoError(t, ls.error()) + } + require.NoError(t, gzw.Close()) + require.NoError(t, gzr.Close()) + require.NoError(t, infile.Close()) + require.NoError(t, outfile.Close()) + } } func setupTestServer(ctx context.Context, t *testing.T) *storage.Client { @@ -73,24 +110,23 @@ func setupTestServer(ctx context.Context, t *testing.T) *storage.Client { return client } -func TestScanner(t *testing.T) { - ctx := context.Background() - client := setupTestServer(ctx, t) - start := time.Date(2020, 10, 10, 8, 6, 0, 0, time.UTC) - end := start.Add(159 * time.Minute) - opts := &Options{ - StorageClient: client, - } - scanner, err := New(ctx, start, end, opts) +func setupShortTestServer(ctx context.Context, t *testing.T) *storage.Client { + t.Helper() + downloadTestFiles(t) + server := httptest.NewTLSServer(http.HandlerFunc(func(w http.ResponseWriter, req *http.Request) { + http.ServeFile(w, req, filepath.Join("tmp", "testfiles", "short", path.Base(req.URL.Path))) + })) + t.Cleanup(func() { + server.Close() + }) + client, err := storage.NewClient(ctx, + option.WithoutAuthentication(), + option.WithEndpoint(server.URL), + option.WithHTTPClient(server.Client()), + ) require.NoError(t, err) - var count int - for { - _, err = scanner.Next(ctx) - if err != nil { - break - } - count++ - } - require.EqualError(t, err, io.EOF.Error()) - fmt.Println(count) + t.Cleanup(func() { + require.NoError(t, client.Close()) + }) + return client } diff --git a/go.mod b/go.mod index a827e8f..8891072 100644 --- a/go.mod +++ b/go.mod @@ -6,7 +6,9 @@ require ( cloud.google.com/go/storage v1.12.0 github.com/alecthomas/kong v0.2.11 github.com/json-iterator/go v1.1.10 + github.com/killa-beez/gopkgs/pool v0.0.0-20191206232703-3018f97f77a9 github.com/klauspost/compress v1.11.1 github.com/stretchr/testify v1.5.1 + golang.org/x/text v0.3.3 google.golang.org/api v0.33.0 ) diff --git a/go.sum b/go.sum index 8576ecb..a0b598d 100644 --- a/go.sum +++ b/go.sum @@ -118,6 +118,9 @@ github.com/json-iterator/go v1.1.10/go.mod h1:KdQUCv79m/52Kvf8AW2vK1V8akMuk1QjK/ github.com/jstemmer/go-junit-report v0.0.0-20190106144839-af01ea7f8024/go.mod h1:6v2b51hI/fHJwM22ozAgKL4VKDeJcHhJFhtBdhmNjmU= github.com/jstemmer/go-junit-report v0.9.1 h1:6QPYqodiu3GuPL+7mfx+NwDdp2eTkp9IfEUpgAwUN0o= github.com/jstemmer/go-junit-report v0.9.1/go.mod h1:Brl9GWCQeLvo8nXZwPNNblvFj/XSXhF0NWZEnDohbsk= +github.com/killa-beez/gopkgs v0.0.0-20191206232703-3018f97f77a9 h1:PUY4WPJbFGVQ0LWEkzEYfQHP0GbpQYTcgF17U15p/Qo= +github.com/killa-beez/gopkgs/pool v0.0.0-20191206232703-3018f97f77a9 h1:Y2cyTfdPSKF70zdikKlU2+Q5naUebDY9IOYZlO2QuBs= +github.com/killa-beez/gopkgs/pool v0.0.0-20191206232703-3018f97f77a9/go.mod h1:sdnu79EGO/CBZMDU1J69GCUNTUHlo610bhWoz1+xAuo= github.com/kisielk/gotool v1.0.0/go.mod h1:XhKaO+MFFWcvkIS/tQcRk01m1F5IRFswLeQ+oQHNcck= github.com/klauspost/compress v1.11.1 h1:bPb7nMRdOZYDrpPMTA3EInUQrdgoBinqUuSwlGdKDdE= github.com/klauspost/compress v1.11.1/go.mod h1:aoV0uJVorq1K+umq18yTdKaF57EivdYsUV+/s2qKfXs= diff --git a/linescanner.go b/linescanner.go new file mode 100644 index 0000000..73ecbe6 --- /dev/null +++ b/linescanner.go @@ -0,0 +1,34 @@ +package gharchive + +import "bytes" + +type lineScanner struct { + br byteReader + pos int +} + +func (s *lineScanner) scan() bool { + s.br.release(s.pos) + for { + idx := bytes.IndexByte(s.br.window(), '\n') + if idx >= 0 { + s.pos = idx + 1 + return true + } + if s.br.extend() == 0 { + s.pos = len(s.br.window()) + return s.pos > 0 + } + } +} + +func (s *lineScanner) bytes() []byte { + return s.br.window()[:s.pos] +} + +func (s *lineScanner) error() error { + if len(s.br.window()) > 0 { + return nil + } + return s.br.err +} diff --git a/linescanner_test.go b/linescanner_test.go new file mode 100644 index 0000000..3f77ad5 --- /dev/null +++ b/linescanner_test.go @@ -0,0 +1,39 @@ +package gharchive + +import ( + "encoding/json" + "os" + "path/filepath" + "testing" + + "github.com/klauspost/compress/gzip" + "github.com/stretchr/testify/require" +) + +func Test_lineScanner(t *testing.T) { + downloadTestFiles(t) + file, err := os.Open(filepath.FromSlash("tmp/testfiles/short/2020-10-10-10.json.gz")) + require.NoError(t, err) + gzr, err := gzip.NewReader(file) + require.NoError(t, err) + t.Cleanup(func() { + require.NoError(t, gzr.Close()) + require.NoError(t, file.Close()) + }) + ls := &lineScanner{ + br: byteReader{ + r: gzr, + data: make([]byte, 0, newBufferSize), + }, + } + var count int + for ls.scan() { + if ls.error() != nil { + break + } + require.True(t, json.Valid(ls.bytes())) + count++ + } + require.EqualError(t, ls.error(), "EOF") + require.Equal(t, 10, count) +} diff --git a/script/gharchive b/script/gharchive index d2e5496..e348928 100755 --- a/script/gharchive +++ b/script/gharchive @@ -4,5 +4,5 @@ set -e CDPATH="" cd -- "$(dirname -- "$(dirname -- "$0")")" -make -s bin/gharchive +go build -o bin/gharchive ./cmd/gharchive exec bin/gharchive "$@" diff --git a/script/test b/script/test index fbea6b0..d5db174 100755 --- a/script/test +++ b/script/test @@ -5,3 +5,4 @@ set -e CDPATH="" cd -- "$(dirname -- "$(dirname -- "$0")")" go test -covermode=atomic ./... +go test -race -short ./... diff --git a/single.go b/single.go new file mode 100644 index 0000000..a681604 --- /dev/null +++ b/single.go @@ -0,0 +1,202 @@ +package gharchive + +import ( + "context" + "io" + "strings" + "time" + + "cloud.google.com/go/storage" + "github.com/klauspost/compress/gzip" +) + +// singleScanner scans lines from gharchive +type singleScanner struct { + opts *Options + client *storage.Client + bucket string + startTime time.Time + endTime time.Time + curHour time.Time + lineScanner *lineScanner + hourReader *objReader + brBuffer []byte + err error +} + +func newSingleScanner(ctx context.Context, startTime time.Time, opts *Options) (*singleScanner, error) { + var err error + opts, err = opts.withDefaults(ctx) + if err != nil { + return nil, err + } + + endTime := opts.EndTime + if endTime.IsZero() { + endTime = startTime.Add(time.Hour) + } + return &singleScanner{ + opts: opts, + bucket: opts.Bucket, + client: opts.StorageClient, + startTime: startTime.UTC(), + endTime: endTime.UTC(), + }, nil +} + +func (s *singleScanner) iterateCurHour() { + if s.curHour.IsZero() { + s.curHour = s.startTime.Truncate(time.Hour) + return + } + s.curHour = s.curHour.Add(time.Hour) +} + +// Close closes the scanner +func (s *singleScanner) Close() error { + var err error + if s.client != nil { + err = s.client.Close() + } + if s.hourReader != nil { + hrErr := s.hourReader.Close() + if err == nil { + err = hrErr + } + } + return err +} + +func (s *singleScanner) validateLine(line []byte) bool { + for _, validator := range s.opts.Validators { + ok := validator(line) + if !ok { + return false + } + } + return true +} + +func (s *singleScanner) prepLineScanner(ctx context.Context) error { + if ctx.Err() != nil { + return ctx.Err() + } + if len(s.brBuffer) == 0 { + s.brBuffer = make([]byte, 8192) + } + if s.lineScanner != nil { + err := s.lineScanner.error() + if s.opts.SingleHour || err != io.EOF { + return err + } + } + + // starting here we know either s.lineScanner == nil or s.lineScanner.error() == io.EOF + // either way we need to do the same thing. + + if s.hourReader == nil { + s.hourReader = new(objReader) + } + s.iterateCurHour() + if s.curHour.After(s.endTime) { + return io.EOF + } + err := s.hourReader.newObj(ctx, s.curHour, s.opts) + if err != nil { + return err + } + s.lineScanner = &lineScanner{ + br: byteReader{ + data: s.brBuffer, + r: s.hourReader, + }, + } + return nil +} + +// Bytes returns the current line +func (s *singleScanner) Bytes() []byte { + return s.lineScanner.bytes() +} + +// Err returns the scanner's error +func (s *singleScanner) Err() error { + err := s.err + if err == io.EOF { + err = nil + } + return err +} + +// Scan advances to the next line +func (s *singleScanner) Scan(ctx context.Context) bool { + for { + if ctx.Err() != nil { + s.err = ctx.Err() + return false + } + err := s.prepLineScanner(ctx) + if err != nil { + s.err = err + return false + } + s.lineScanner.scan() + if s.validateLine(s.lineScanner.bytes()) { + return true + } + } +} + +type objReader struct { + rdr io.Reader + gzRdr *gzip.Reader +} + +func (z *objReader) Read(p []byte) (n int, err error) { + return z.gzRdr.Read(p) +} + +func (z *objReader) Close() error { + var err error + if z.gzRdr != nil { + err = z.gzRdr.Close() + } + if z.rdr == nil { + return err + } + if rdr, ok := z.rdr.(io.Closer); ok { + rdrErr := rdr.Close() + if rdrErr != nil { + return rdrErr + } + } + return err +} + +func (z *objReader) Reset(r io.Reader) error { + err := z.Close() + if err != nil { + return err + } + z.rdr = r + if z.gzRdr == nil { + z.gzRdr, err = gzip.NewReader(r) + return err + } + return z.gzRdr.Reset(r) +} + +func (z *objReader) newObj(ctx context.Context, hour time.Time, opts *Options) error { + var err error + tm := hour.UTC() + + // this hack is required to get a single-digit hour in the object name + obj := tm.UTC().Format("2006-01-02-") + obj += strings.TrimPrefix(tm.UTC().Format("15.json.gz"), "0") + + rdr, err := opts.StorageClient.Bucket(opts.Bucket).Object(obj).NewReader(ctx) + if err != nil { + return err + } + return z.Reset(rdr) +} diff --git a/single_test.go b/single_test.go new file mode 100644 index 0000000..dd7e698 --- /dev/null +++ b/single_test.go @@ -0,0 +1,90 @@ +package gharchive + +import ( + "context" + "testing" + "time" + + "github.com/stretchr/testify/require" +) + +func Test_singleScanner(t *testing.T) { + t.Run("short", func(t *testing.T) { + t.Run("multi-hour", func(t *testing.T) { + ctx := context.Background() + client := setupShortTestServer(ctx, t) + start := time.Date(2020, 10, 10, 8, 6, 0, 0, time.UTC) + opts := &Options{ + StorageClient: client, + EndTime: start.Add(159 * time.Minute), + } + scanner, err := New(ctx, start, opts) + require.NoError(t, err) + var count int + for scanner.Scan(ctx) { + count++ + } + require.NoError(t, scanner.Err()) + require.Equal(t, 33, count) + }) + + t.Run("single hour", func(t *testing.T) { + ctx := context.Background() + client := setupShortTestServer(ctx, t) + start := time.Date(2020, 10, 10, 10, 6, 0, 0, time.UTC) + opts := &Options{ + StorageClient: client, + SingleHour: true, + } + scanner, err := New(ctx, start, opts) + require.NoError(t, err) + var got [][]byte + for scanner.Scan(ctx) { + got = append(got, scanner.Bytes()) + } + require.NoError(t, scanner.Err()) + require.Len(t, got, 11) + }) + }) + + t.Run("regular", func(t *testing.T) { + if testing.Short() { + t.SkipNow() + } + t.Run("multi-hour", func(t *testing.T) { + ctx := context.Background() + client := setupTestServer(ctx, t) + start := time.Date(2020, 10, 10, 8, 6, 0, 0, time.UTC) + opts := &Options{ + StorageClient: client, + EndTime: start.Add(159 * time.Minute), + } + scanner, err := New(ctx, start, opts) + require.NoError(t, err) + var count int + for scanner.Scan(ctx) { + count++ + } + require.NoError(t, scanner.Err()) + require.Equal(t, 280628, count) + }) + + t.Run("single hour", func(t *testing.T) { + ctx := context.Background() + client := setupTestServer(ctx, t) + start := time.Date(2020, 10, 10, 8, 6, 0, 0, time.UTC) + opts := &Options{ + StorageClient: client, + SingleHour: true, + } + scanner, err := New(ctx, start, opts) + require.NoError(t, err) + var count int + for scanner.Scan(ctx) { + count++ + } + require.NoError(t, scanner.Err()) + require.Equal(t, 92993, count) + }) + }) +}