From a138e421cd40651f5577c7fb445722d8a36ce7f4 Mon Sep 17 00:00:00 2001 From: Owen Diehl Date: Wed, 23 Jun 2021 12:16:31 -0400 Subject: [PATCH 01/16] speccing out unordered head block --- go.mod | 1 + go.sum | 6 + pkg/chunkenc/unordered.go | 168 +++++++++++ .../Workiva/go-datastructures/LICENSE | 202 +++++++++++++ .../go-datastructures/rangetree/entries.go | 45 +++ .../go-datastructures/rangetree/error.go | 40 +++ .../go-datastructures/rangetree/immutable.go | 275 ++++++++++++++++++ .../go-datastructures/rangetree/interface.go | 82 ++++++ .../go-datastructures/rangetree/node.go | 37 +++ .../go-datastructures/rangetree/ordered.go | 241 +++++++++++++++ .../rangetree/orderedtree.go | 263 +++++++++++++++++ .../Workiva/go-datastructures/slice/int64.go | 91 ++++++ vendor/modules.txt | 4 + 13 files changed, 1455 insertions(+) create mode 100644 pkg/chunkenc/unordered.go create mode 100644 vendor/github.com/Workiva/go-datastructures/LICENSE create mode 100644 vendor/github.com/Workiva/go-datastructures/rangetree/entries.go create mode 100644 vendor/github.com/Workiva/go-datastructures/rangetree/error.go create mode 100644 vendor/github.com/Workiva/go-datastructures/rangetree/immutable.go create mode 100644 vendor/github.com/Workiva/go-datastructures/rangetree/interface.go create mode 100644 vendor/github.com/Workiva/go-datastructures/rangetree/node.go create mode 100644 vendor/github.com/Workiva/go-datastructures/rangetree/ordered.go create mode 100644 vendor/github.com/Workiva/go-datastructures/rangetree/orderedtree.go create mode 100644 vendor/github.com/Workiva/go-datastructures/slice/int64.go diff --git a/go.mod b/go.mod index 86c6a0dd2385..ffa5693228f1 100644 --- a/go.mod +++ b/go.mod @@ -6,6 +6,7 @@ require ( cloud.google.com/go/pubsub v1.3.1 github.com/Masterminds/sprig/v3 v3.2.2 github.com/NYTimes/gziphandler v1.1.1 + github.com/Workiva/go-datastructures v1.0.53 github.com/aws/aws-lambda-go v1.17.0 github.com/bmatcuk/doublestar v1.2.2 github.com/c2h5oh/datasize 
v0.0.0-20200112174442-28bbd4740fee diff --git a/go.sum b/go.sum index 4d2d0f95490b..d3a2e85c16ca 100644 --- a/go.sum +++ b/go.sum @@ -172,6 +172,8 @@ github.com/Shopify/sarama v1.27.1/go.mod h1:g5s5osgELxgM+Md9Qni9rzo7Rbt+vvFQI4bt github.com/Shopify/toxiproxy v2.1.4+incompatible/go.mod h1:OXgGpZ6Cli1/URJOF1DMxUHB2q5Ap20/P/eIdh4G0pI= github.com/StackExchange/wmi v0.0.0-20180116203802-5d049714c4a6/go.mod h1:3eOhrUMpNV+6aFIbp5/iudMxNCF27Vw2OZgy4xEx0Fg= github.com/VividCortex/gohistogram v1.0.0/go.mod h1:Pf5mBqqDxYaXu3hDrrU+w6nw50o/4+TcAqDqk/vUH7g= +github.com/Workiva/go-datastructures v1.0.53 h1:J6Y/52yX10Xc5JjXmGtWoSSxs3mZnGSaq37xZZh7Yig= +github.com/Workiva/go-datastructures v1.0.53/go.mod h1:1yZL+zfsztete+ePzZz/Zb1/t5BnDuE2Ya2MMGhzP6A= github.com/abdullin/seq v0.0.0-20160510034733-d5467c17e7af/go.mod h1:5Jv4cbFiHJMsVxt52+i0Ha45fjshj6wxYr1r19tB9bw= github.com/aerospike/aerospike-client-go v1.27.0/go.mod h1:zj8LBEnWBDOVEIJt8LvaRvDG5ARAoa5dBeHaB472NRc= github.com/afex/hystrix-go v0.0.0-20180502004556-fa1af6a1f4f5/go.mod h1:SkGFH1ia65gfNATL8TAiHDNxPzPdmEL5uirI2Uyuz6c= @@ -1344,6 +1346,7 @@ github.com/performancecopilot/speed v3.0.0+incompatible/go.mod h1:/CLtqpZ5gBg1M9 github.com/peterbourgon/diskv v2.0.1+incompatible/go.mod h1:uqqh8zWWbv1HBMNONnaR/tNboyR3/BZd58JJSHlUSCU= github.com/peterh/liner v1.0.1-0.20180619022028-8c1271fcf47f/go.mod h1:xIteQHvHuaLYG9IFj6mSxM0fCKrs34IrEQUhOYuGPHc= github.com/philhofer/fwd v1.0.0/go.mod h1:gk3iGcWd9+svBvR0sR+KPcfE+RNWozjowpeBVG3ZVNU= +github.com/philhofer/fwd v1.1.1/go.mod h1:gk3iGcWd9+svBvR0sR+KPcfE+RNWozjowpeBVG3ZVNU= github.com/pierrec/lz4 v1.0.2-0.20190131084431-473cd7ce01a1/go.mod h1:3/3N9NVKO0jef7pBehbT1qWhCMrIgbYNnFAZCqQ5LRc= github.com/pierrec/lz4 v2.0.5+incompatible/go.mod h1:pdkljMzZIN41W+lC3N2tnIh5sFi+IEE17M5jbnwPHcY= github.com/pierrec/lz4 v2.5.2+incompatible h1:WCjObylUIOlKy/+7Abdn34TLIkXiA4UWUMhxq9m9ZXI= @@ -1608,12 +1611,14 @@ github.com/tidwall/pretty v0.0.0-20180105212114-65a9db5fad51/go.mod h1:XNkn88O1C 
github.com/tidwall/pretty v1.0.0 h1:HsD+QiTn7sK6flMKIvNmpqz1qrpP3Ps6jOKIKMooyg4= github.com/tidwall/pretty v1.0.0/go.mod h1:XNkn88O1ChpSDQmQeStsy+sBenx6DDtFZJxhVysOjyk= github.com/tinylib/msgp v1.0.2/go.mod h1:+d+yLhGm8mzTaHzB+wgMYrodPfmZrzkirds8fDWklFE= +github.com/tinylib/msgp v1.1.5/go.mod h1:eQsjooMTnV42mHu917E26IogZ2930nFyBQdofk10Udg= github.com/tmc/grpc-websocket-proxy v0.0.0-20170815181823-89b8d40f7ca8/go.mod h1:ncp9v5uamzpCO7NfCPTXjqaC+bZgJeR0sMTm6dMHP7U= github.com/tmc/grpc-websocket-proxy v0.0.0-20190109142713-0ad062ec5ee5/go.mod h1:ncp9v5uamzpCO7NfCPTXjqaC+bZgJeR0sMTm6dMHP7U= github.com/tmc/grpc-websocket-proxy v0.0.0-20200427203606-3cfed13b9966 h1:j6JEOq5QWFker+d7mFQYOhjTZonQ7YkLTHm56dbn+yM= github.com/tmc/grpc-websocket-proxy v0.0.0-20200427203606-3cfed13b9966/go.mod h1:ncp9v5uamzpCO7NfCPTXjqaC+bZgJeR0sMTm6dMHP7U= github.com/tonistiigi/fifo v0.0.0-20190226154929-a9fb20d87448 h1:hbyjqt5UnyKeOT3rFVxLxi7iTI6XqR2p4TkwEAQdUiw= github.com/tonistiigi/fifo v0.0.0-20190226154929-a9fb20d87448/go.mod h1:Q5IRRDY+cjIaiOjTAnXN5LKQV5MPqVx5ofQn85Jy5Yw= +github.com/ttacon/chalk v0.0.0-20160626202418-22c06c80ed31/go.mod h1:onvgF043R+lC5RZ8IT9rBXDaEDnpnw/Cl+HFiw+v/7Q= github.com/tv42/httpunix v0.0.0-20150427012821-b75d8614f926/go.mod h1:9ESjWnEqriFuLhtthL60Sar/7RFoluCcXsuvEwTV5KM= github.com/uber/jaeger-client-go v2.15.0+incompatible/go.mod h1:WVhlPFC8FDjOFMMWRy2pZqQJSXxYSwNYOkTr/Z6d3Kk= github.com/uber/jaeger-client-go v2.20.1+incompatible/go.mod h1:WVhlPFC8FDjOFMMWRy2pZqQJSXxYSwNYOkTr/Z6d3Kk= @@ -2147,6 +2152,7 @@ golang.org/x/tools v0.0.0-20200825202427-b303f430e36d/go.mod h1:njjCfa9FT2d7l9Bc golang.org/x/tools v0.0.0-20200904185747-39188db58858/go.mod h1:Cj7w3i3Rnn0Xh82ur9kSqwfTHTeVxaDqrfMjpcNT6bE= golang.org/x/tools v0.0.0-20201014170642-d1624618ad65/go.mod h1:z6u4i615ZeAfBE4XtMziQW1fSVJXACjjbWkB/mvPzlU= golang.org/x/tools v0.0.0-20201020161133-226fd2f889ca/go.mod h1:z6u4i615ZeAfBE4XtMziQW1fSVJXACjjbWkB/mvPzlU= +golang.org/x/tools 
v0.0.0-20201022035929-9cf592e881e9/go.mod h1:emZCQorbCU4vsT4fOWvOPXz4eW1wZW4PmDk9uLelYpA= golang.org/x/tools v0.0.0-20201110124207-079ba7bd75cd/go.mod h1:emZCQorbCU4vsT4fOWvOPXz4eW1wZW4PmDk9uLelYpA= golang.org/x/tools v0.0.0-20201119054027-25dc3e1ccc3c/go.mod h1:emZCQorbCU4vsT4fOWvOPXz4eW1wZW4PmDk9uLelYpA= golang.org/x/tools v0.0.0-20201201161351-ac6f37ff4c2a/go.mod h1:emZCQorbCU4vsT4fOWvOPXz4eW1wZW4PmDk9uLelYpA= diff --git a/pkg/chunkenc/unordered.go b/pkg/chunkenc/unordered.go new file mode 100644 index 000000000000..d8ada16ab91a --- /dev/null +++ b/pkg/chunkenc/unordered.go @@ -0,0 +1,168 @@ +package chunkenc + +import ( + "context" + "time" + + "github.com/Workiva/go-datastructures/rangetree" + "github.com/grafana/loki/pkg/iter" + "github.com/grafana/loki/pkg/logproto" + "github.com/grafana/loki/pkg/logql/log" + "github.com/grafana/loki/pkg/logqlmodel/stats" +) + +type unorderedHeadBlock struct { + // Opted for range tree over skiplist for space reduction. + // Inserts: O(log(n)) + // Scans: (O(k+log(n))) where k=num_scanned_entries & n=total_entries + rt rangetree.RangeTree + + size int // size of uncompressed bytes. + mint, maxt int64 // upper and lower bounds +} + +func newUnorderedHeadBlock() *unorderedHeadBlock { + return &unorderedHeadBlock{ + rt: rangetree.New(1), + } +} + +func (hb *unorderedHeadBlock) isEmpty() bool { + return hb.size == 0 +} + +// collection of entries belonging to the same nanosecond +type nsEntries struct { + ts int64 + entries []string +} + +func (e *nsEntries) ValueAtDimension(_ uint64) int64 { + return e.ts +} + +func (hb *unorderedHeadBlock) append(ts int64, line string) { + // This is an allocation hack. The rangetree lib does not + // support the ability to pass a "mutate" function during an insert + // and instead will displace any existing entry at the specified timestamp. + // Since Loki supports multiple lines per timestamp, + // we insert an entry without any log lines, + // which is ordered by timestamp alone. 
+ // Then, we detect if we've displaced any existing entries, and + // append the new one to the existing, preallocated slice. + // If not, we create a slice with one entry. + e := &nsEntries{ + ts: ts, + } + displaced := hb.rt.Add(e) + if len(displaced) > 0 { + e.entries = append(displaced[0].(*nsEntries).entries, line) + } else { + e.entries = []string{line} + } + + // Update hb metadata + if hb.size == 0 || hb.mint > ts { + hb.mint = ts + } + + if hb.maxt < ts { + hb.maxt = ts + } + + hb.size += len(line) + +} + +type interval struct { + mint, maxt int64 +} + +func (i interval) LowAtDimension(_ uint64) int64 { return i.mint } + +// rangetree library treats this as inclusive, but we want exclusivity +func (i interval) HighAtDimension(_ uint64) int64 { return i.maxt - 1 } + +func (hb *unorderedHeadBlock) iterator( + ctx context.Context, + direction logproto.Direction, + mint, + maxt int64, + pipeline log.StreamPipeline, +) iter.EntryIterator { + if hb.isEmpty() || (maxt < hb.mint || hb.maxt < mint) { + return iter.NoopIterator + } + + entries := hb.rt.Query(interval{ + mint: mint, + maxt: maxt, + }) + + chunkStats := stats.GetChunkData(ctx) + + // We are doing a copy every time, this is because b.entries could change completely, + // the alternate would be that we allocate a new b.entries every time we cut a block, + // but the tradeoff is that queries to near-realtime data would be much lower than + // cutting of blocks. 
+ streams := map[uint64]*logproto.Stream{} + + process := func(es *nsEntries) { + chunkStats.HeadChunkLines += int64(len(es.entries)) + + // preserve write ordering of entries with the same ts + var i int + if direction == logproto.BACKWARD { + i = len(es.entries) - 1 + } + next := func() { + if direction == logproto.FORWARD { + i++ + } else { + i-- + } + } + + for ; i < len(es.entries) && i >= 0; next() { + line := es.entries[i] + chunkStats.HeadChunkBytes += int64(len(line)) + newLine, parsedLbs, ok := pipeline.ProcessString(line) + if !ok { + return + } + var stream *logproto.Stream + lhash := parsedLbs.Hash() + if stream, ok = streams[lhash]; !ok { + stream = &logproto.Stream{ + Labels: parsedLbs.String(), + } + streams[lhash] = stream + } + + stream.Entries = append(stream.Entries, logproto.Entry{ + Timestamp: time.Unix(0, es.ts), + Line: newLine, + }) + } + + } + + if direction == logproto.FORWARD { + for _, e := range entries { + process(e.(*nsEntries)) + } + } else { + for i := len(entries) - 1; i >= 0; i-- { + process(entries[i].(*nsEntries)) + } + } + + if len(streams) == 0 { + return iter.NoopIterator + } + streamsResult := make([]logproto.Stream, 0, len(streams)) + for _, stream := range streams { + streamsResult = append(streamsResult, *stream) + } + return iter.NewStreamsIterator(ctx, streamsResult, direction) +} diff --git a/vendor/github.com/Workiva/go-datastructures/LICENSE b/vendor/github.com/Workiva/go-datastructures/LICENSE new file mode 100644 index 000000000000..7a4a3ea2424c --- /dev/null +++ b/vendor/github.com/Workiva/go-datastructures/LICENSE @@ -0,0 +1,202 @@ + + Apache License + Version 2.0, January 2004 + http://www.apache.org/licenses/ + + TERMS AND CONDITIONS FOR USE, REPRODUCTION, AND DISTRIBUTION + + 1. Definitions. + + "License" shall mean the terms and conditions for use, reproduction, + and distribution as defined by Sections 1 through 9 of this document. 
+ + "Licensor" shall mean the copyright owner or entity authorized by + the copyright owner that is granting the License. + + "Legal Entity" shall mean the union of the acting entity and all + other entities that control, are controlled by, or are under common + control with that entity. For the purposes of this definition, + "control" means (i) the power, direct or indirect, to cause the + direction or management of such entity, whether by contract or + otherwise, or (ii) ownership of fifty percent (50%) or more of the + outstanding shares, or (iii) beneficial ownership of such entity. + + "You" (or "Your") shall mean an individual or Legal Entity + exercising permissions granted by this License. + + "Source" form shall mean the preferred form for making modifications, + including but not limited to software source code, documentation + source, and configuration files. + + "Object" form shall mean any form resulting from mechanical + transformation or translation of a Source form, including but + not limited to compiled object code, generated documentation, + and conversions to other media types. + + "Work" shall mean the work of authorship, whether in Source or + Object form, made available under the License, as indicated by a + copyright notice that is included in or attached to the work + (an example is provided in the Appendix below). + + "Derivative Works" shall mean any work, whether in Source or Object + form, that is based on (or derived from) the Work and for which the + editorial revisions, annotations, elaborations, or other modifications + represent, as a whole, an original work of authorship. For the purposes + of this License, Derivative Works shall not include works that remain + separable from, or merely link (or bind by name) to the interfaces of, + the Work and Derivative Works thereof. 
+ + "Contribution" shall mean any work of authorship, including + the original version of the Work and any modifications or additions + to that Work or Derivative Works thereof, that is intentionally + submitted to Licensor for inclusion in the Work by the copyright owner + or by an individual or Legal Entity authorized to submit on behalf of + the copyright owner. For the purposes of this definition, "submitted" + means any form of electronic, verbal, or written communication sent + to the Licensor or its representatives, including but not limited to + communication on electronic mailing lists, source code control systems, + and issue tracking systems that are managed by, or on behalf of, the + Licensor for the purpose of discussing and improving the Work, but + excluding communication that is conspicuously marked or otherwise + designated in writing by the copyright owner as "Not a Contribution." + + "Contributor" shall mean Licensor and any individual or Legal Entity + on behalf of whom a Contribution has been received by Licensor and + subsequently incorporated within the Work. + + 2. Grant of Copyright License. Subject to the terms and conditions of + this License, each Contributor hereby grants to You a perpetual, + worldwide, non-exclusive, no-charge, royalty-free, irrevocable + copyright license to reproduce, prepare Derivative Works of, + publicly display, publicly perform, sublicense, and distribute the + Work and such Derivative Works in Source or Object form. + + 3. Grant of Patent License. 
Subject to the terms and conditions of + this License, each Contributor hereby grants to You a perpetual, + worldwide, non-exclusive, no-charge, royalty-free, irrevocable + (except as stated in this section) patent license to make, have made, + use, offer to sell, sell, import, and otherwise transfer the Work, + where such license applies only to those patent claims licensable + by such Contributor that are necessarily infringed by their + Contribution(s) alone or by combination of their Contribution(s) + with the Work to which such Contribution(s) was submitted. If You + institute patent litigation against any entity (including a + cross-claim or counterclaim in a lawsuit) alleging that the Work + or a Contribution incorporated within the Work constitutes direct + or contributory patent infringement, then any patent licenses + granted to You under this License for that Work shall terminate + as of the date such litigation is filed. + + 4. Redistribution. You may reproduce and distribute copies of the + Work or Derivative Works thereof in any medium, with or without + modifications, and in Source or Object form, provided that You + meet the following conditions: + + (a) You must give any other recipients of the Work or + Derivative Works a copy of this License; and + + (b) You must cause any modified files to carry prominent notices + stating that You changed the files; and + + (c) You must retain, in the Source form of any Derivative Works + that You distribute, all copyright, patent, trademark, and + attribution notices from the Source form of the Work, + excluding those notices that do not pertain to any part of + the Derivative Works; and + + (d) If the Work includes a "NOTICE" text file as part of its + distribution, then any Derivative Works that You distribute must + include a readable copy of the attribution notices contained + within such NOTICE file, excluding those notices that do not + pertain to any part of the Derivative Works, in at least one + of 
the following places: within a NOTICE text file distributed + as part of the Derivative Works; within the Source form or + documentation, if provided along with the Derivative Works; or, + within a display generated by the Derivative Works, if and + wherever such third-party notices normally appear. The contents + of the NOTICE file are for informational purposes only and + do not modify the License. You may add Your own attribution + notices within Derivative Works that You distribute, alongside + or as an addendum to the NOTICE text from the Work, provided + that such additional attribution notices cannot be construed + as modifying the License. + + You may add Your own copyright statement to Your modifications and + may provide additional or different license terms and conditions + for use, reproduction, or distribution of Your modifications, or + for any such Derivative Works as a whole, provided Your use, + reproduction, and distribution of the Work otherwise complies with + the conditions stated in this License. + + 5. Submission of Contributions. Unless You explicitly state otherwise, + any Contribution intentionally submitted for inclusion in the Work + by You to the Licensor shall be under the terms and conditions of + this License, without any additional terms or conditions. + Notwithstanding the above, nothing herein shall supersede or modify + the terms of any separate license agreement you may have executed + with Licensor regarding such Contributions. + + 6. Trademarks. This License does not grant permission to use the trade + names, trademarks, service marks, or product names of the Licensor, + except as required for reasonable and customary use in describing the + origin of the Work and reproducing the content of the NOTICE file. + + 7. Disclaimer of Warranty. 
Unless required by applicable law or + agreed to in writing, Licensor provides the Work (and each + Contributor provides its Contributions) on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or + implied, including, without limitation, any warranties or conditions + of TITLE, NON-INFRINGEMENT, MERCHANTABILITY, or FITNESS FOR A + PARTICULAR PURPOSE. You are solely responsible for determining the + appropriateness of using or redistributing the Work and assume any + risks associated with Your exercise of permissions under this License. + + 8. Limitation of Liability. In no event and under no legal theory, + whether in tort (including negligence), contract, or otherwise, + unless required by applicable law (such as deliberate and grossly + negligent acts) or agreed to in writing, shall any Contributor be + liable to You for damages, including any direct, indirect, special, + incidental, or consequential damages of any character arising as a + result of this License or out of the use or inability to use the + Work (including but not limited to damages for loss of goodwill, + work stoppage, computer failure or malfunction, or any and all + other commercial damages or losses), even if such Contributor + has been advised of the possibility of such damages. + + 9. Accepting Warranty or Additional Liability. While redistributing + the Work or Derivative Works thereof, You may choose to offer, + and charge a fee for, acceptance of support, warranty, indemnity, + or other liability obligations and/or rights consistent with this + License. However, in accepting such obligations, You may act only + on Your own behalf and on Your sole responsibility, not on behalf + of any other Contributor, and only if You agree to indemnify, + defend, and hold each Contributor harmless for any liability + incurred by, or claims asserted against, such Contributor by reason + of your accepting any such warranty or additional liability. 
+ + END OF TERMS AND CONDITIONS + + APPENDIX: How to apply the Apache License to your work. + + To apply the Apache License to your work, attach the following + boilerplate notice, with the fields enclosed by brackets "[]" + replaced with your own identifying information. (Don't include + the brackets!) The text should be enclosed in the appropriate + comment syntax for the file format. We also recommend that a + file or class name and description of purpose be included on the + same "printed page" as the copyright notice for easier + identification within third-party archives. + + Copyright [yyyy] [name of copyright owner] + + Licensed under the Apache License, Version 2.0 (the "License"); + you may not use this file except in compliance with the License. + You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + See the License for the specific language governing permissions and + limitations under the License. \ No newline at end of file diff --git a/vendor/github.com/Workiva/go-datastructures/rangetree/entries.go b/vendor/github.com/Workiva/go-datastructures/rangetree/entries.go new file mode 100644 index 000000000000..0206fd8754e1 --- /dev/null +++ b/vendor/github.com/Workiva/go-datastructures/rangetree/entries.go @@ -0,0 +1,45 @@ +/* +Copyright 2014 Workiva, LLC + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+See the License for the specific language governing permissions and +limitations under the License. +*/ + +package rangetree + +import "sync" + +var entriesPool = sync.Pool{ + New: func() interface{} { + return make(Entries, 0, 10) + }, +} + +// Entries is a typed list of Entry that can be reused if Dispose +// is called. +type Entries []Entry + +// Dispose will free the resources consumed by this list and +// allow the list to be reused. +func (entries *Entries) Dispose() { + for i := 0; i < len(*entries); i++ { + (*entries)[i] = nil + } + + *entries = (*entries)[:0] + entriesPool.Put(*entries) +} + +// NewEntries will return a reused list of entries. +func NewEntries() Entries { + return entriesPool.Get().(Entries) +} diff --git a/vendor/github.com/Workiva/go-datastructures/rangetree/error.go b/vendor/github.com/Workiva/go-datastructures/rangetree/error.go new file mode 100644 index 000000000000..3166c2cbc8d3 --- /dev/null +++ b/vendor/github.com/Workiva/go-datastructures/rangetree/error.go @@ -0,0 +1,40 @@ +/* +Copyright 2014 Workiva, LLC + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package rangetree + +import "fmt" + +// NoEntriesError is returned from an operation that requires +// existing entries when none are found. +type NoEntriesError struct{} + +func (nee NoEntriesError) Error() string { + return `No entries in this tree.` +} + +// OutOfDimensionError is returned when a requested operation +// doesn't meet dimensional requirements. 
+type OutOfDimensionError struct { + provided, max uint64 +} + +func (oode OutOfDimensionError) Error() string { + return fmt.Sprintf(`Provided dimension: %d is + greater than max dimension: %d`, + oode.provided, oode.max, + ) +} diff --git a/vendor/github.com/Workiva/go-datastructures/rangetree/immutable.go b/vendor/github.com/Workiva/go-datastructures/rangetree/immutable.go new file mode 100644 index 000000000000..89b42fa11d64 --- /dev/null +++ b/vendor/github.com/Workiva/go-datastructures/rangetree/immutable.go @@ -0,0 +1,275 @@ +/* +Copyright 2014 Workiva, LLC + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. 
+*/ + +package rangetree + +import "github.com/Workiva/go-datastructures/slice" + +type immutableRangeTree struct { + number uint64 + top orderedNodes + dimensions uint64 +} + +func newCache(dimensions uint64) []slice.Int64Slice { + cache := make([]slice.Int64Slice, 0, dimensions-1) + for i := uint64(0); i < dimensions; i++ { + cache = append(cache, slice.Int64Slice{}) + } + return cache +} + +func (irt *immutableRangeTree) needNextDimension() bool { + return irt.dimensions > 1 +} + +func (irt *immutableRangeTree) add(nodes *orderedNodes, cache []slice.Int64Slice, entry Entry, added *uint64) { + var node *node + list := nodes + + for i := uint64(1); i <= irt.dimensions; i++ { + if isLastDimension(irt.dimensions, i) { + if i != 1 && !cache[i-1].Exists(node.value) { + nodes := make(orderedNodes, len(*list)) + copy(nodes, *list) + list = &nodes + cache[i-1].Insert(node.value) + } + + newNode := newNode(entry.ValueAtDimension(i), entry, false) + overwritten := list.add(newNode) + if overwritten == nil { + *added++ + } + if node != nil { + node.orderedNodes = *list + } + break + } + + if i != 1 && !cache[i-1].Exists(node.value) { + nodes := make(orderedNodes, len(*list)) + copy(nodes, *list) + list = &nodes + cache[i-1].Insert(node.value) + node.orderedNodes = *list + } + + node, _ = list.getOrAdd(entry, i, irt.dimensions) + list = &node.orderedNodes + } +} + +// Add will add the provided entries into the tree and return +// a new tree with those entries added. 
+func (irt *immutableRangeTree) Add(entries ...Entry) *immutableRangeTree { + if len(entries) == 0 { + return irt + } + + cache := newCache(irt.dimensions) + top := make(orderedNodes, len(irt.top)) + copy(top, irt.top) + added := uint64(0) + for _, entry := range entries { + irt.add(&top, cache, entry, &added) + } + + tree := newImmutableRangeTree(irt.dimensions) + tree.top = top + tree.number = irt.number + added + return tree +} + +// InsertAtDimension will increment items at and above the given index +// by the number provided. Provide a negative number to to decrement. +// Returned are two lists and the modified tree. The first list is a +// list of entries that were moved. The second is a list entries that +// were deleted. These lists are exclusive. +func (irt *immutableRangeTree) InsertAtDimension(dimension uint64, + index, number int64) (*immutableRangeTree, Entries, Entries) { + + if dimension > irt.dimensions || number == 0 { + return irt, nil, nil + } + + modified, deleted := make(Entries, 0, 100), make(Entries, 0, 100) + + tree := newImmutableRangeTree(irt.dimensions) + tree.top = irt.top.immutableInsert( + dimension, 1, irt.dimensions, + index, number, + &modified, &deleted, + ) + tree.number = irt.number - uint64(len(deleted)) + + return tree, modified, deleted +} + +type immutableNodeBundle struct { + list *orderedNodes + index int + previousNode *node + newNode *node +} + +func (irt *immutableRangeTree) Delete(entries ...Entry) *immutableRangeTree { + cache := newCache(irt.dimensions) + top := make(orderedNodes, len(irt.top)) + copy(top, irt.top) + deleted := uint64(0) + for _, entry := range entries { + irt.delete(&top, cache, entry, &deleted) + } + + tree := newImmutableRangeTree(irt.dimensions) + tree.top = top + tree.number = irt.number - deleted + return tree +} + +func (irt *immutableRangeTree) delete(top *orderedNodes, + cache []slice.Int64Slice, entry Entry, deleted *uint64) { + + path := make([]*immutableNodeBundle, 0, 5) + var index int + 
var n *node + var local *node + list := top + + for i := uint64(1); i <= irt.dimensions; i++ { + value := entry.ValueAtDimension(i) + local, index = list.get(value) + if local == nil { // there's nothing to delete + return + } + + nb := &immutableNodeBundle{ + list: list, + index: index, + previousNode: n, + } + path = append(path, nb) + n = local + list = &n.orderedNodes + } + + *deleted++ + + for i := len(path) - 1; i >= 0; i-- { + nb := path[i] + if nb.previousNode != nil { + nodes := make(orderedNodes, len(*nb.list)) + copy(nodes, *nb.list) + nb.list = &nodes + if len(*nb.list) == 1 { + continue + } + nn := newNode( + nb.previousNode.value, + nb.previousNode.entry, + !isLastDimension(irt.dimensions, uint64(i)+1), + ) + nn.orderedNodes = nodes + path[i-1].newNode = nn + } + } + + for _, nb := range path { + if nb.newNode == nil { + nb.list.deleteAt(nb.index) + } else { + (*nb.list)[nb.index] = nb.newNode + } + } +} + +func (irt *immutableRangeTree) apply(list orderedNodes, interval Interval, + dimension uint64, fn func(*node) bool) bool { + + low, high := interval.LowAtDimension(dimension), interval.HighAtDimension(dimension) + + if isLastDimension(irt.dimensions, dimension) { + if !list.apply(low, high, fn) { + return false + } + } else { + if !list.apply(low, high, func(n *node) bool { + if !irt.apply(n.orderedNodes, interval, dimension+1, fn) { + return false + } + return true + }) { + return false + } + return true + } + + return true +} + +// Query will return an ordered list of results in the given +// interval. 
+func (irt *immutableRangeTree) Query(interval Interval) Entries { + entries := NewEntries() + + irt.apply(irt.top, interval, 1, func(n *node) bool { + entries = append(entries, n.entry) + return true + }) + + return entries +} + +func (irt *immutableRangeTree) get(entry Entry) Entry { + on := irt.top + for i := uint64(1); i <= irt.dimensions; i++ { + n, _ := on.get(entry.ValueAtDimension(i)) + if n == nil { + return nil + } + if i == irt.dimensions { + return n.entry + } + on = n.orderedNodes + } + + return nil +} + +// Get returns any entries that exist at the addresses provided by the +// given entries. Entries are returned in the order in which they are +// received. If an entry cannot be found, a nil is returned in its +// place. +func (irt *immutableRangeTree) Get(entries ...Entry) Entries { + result := make(Entries, 0, len(entries)) + for _, entry := range entries { + result = append(result, irt.get(entry)) + } + + return result +} + +// Len returns the number of items in this tree. +func (irt *immutableRangeTree) Len() uint64 { + return irt.number +} + +func newImmutableRangeTree(dimensions uint64) *immutableRangeTree { + return &immutableRangeTree{ + dimensions: dimensions, + } +} diff --git a/vendor/github.com/Workiva/go-datastructures/rangetree/interface.go b/vendor/github.com/Workiva/go-datastructures/rangetree/interface.go new file mode 100644 index 000000000000..17f1e46a358c --- /dev/null +++ b/vendor/github.com/Workiva/go-datastructures/rangetree/interface.go @@ -0,0 +1,82 @@ +/* +Copyright 2014 Workiva, LLC + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+See the License for the specific language governing permissions and +limitations under the License. +*/ + +/* +Package rangetree is designed to store n-dimensional data in an easy-to-query +way. Given this package's primary use as representing cartesian data, this +information is represented by int64s at n-dimensions. This implementation +is not actually a tree but a sparse n-dimensional list. This package also +includes two implementations of this sparse list, one mutable (and not threadsafe) +and another that is immutable copy-on-write which is threadsafe. The mutable +version is obviously faster but will likely have write contention for any +consumer that needs a threadsafe rangetree. + +TODO: unify both implementations with the same interface. +*/ +package rangetree + +// Entry defines items that can be added to the rangetree. +type Entry interface { + // ValueAtDimension returns the value of this entry + // at the specified dimension. + ValueAtDimension(dimension uint64) int64 +} + +// Interval describes the methods required to query the rangetree. Note that +// all ranges are inclusive. +type Interval interface { + // LowAtDimension returns an integer representing the lower bound + // at the requested dimension. + LowAtDimension(dimension uint64) int64 + // HighAtDimension returns an integer representing the higher bound + // at the request dimension. + HighAtDimension(dimension uint64) int64 +} + +// RangeTree describes the methods available to the rangetree. +type RangeTree interface { + // Add will add the provided entries to the tree. Any entries that + // were overwritten will be returned in the order in which they + // were overwritten. If an entry's addition does not overwrite, a nil + // is returned for that entry's index in the provided cells. + Add(entries ...Entry) Entries + // Len returns the number of entries in the tree. + Len() uint64 + // Delete will remove the provided entries from the tree. 
+ // Any entries that were deleted will be returned in the order in + // which they were deleted. If an entry does not exist to be deleted, + // a nil is returned for that entry's index in the provided cells. + Delete(entries ...Entry) Entries + // Query will return a list of entries that fall within + // the provided interval. The values at dimensions are inclusive. + Query(interval Interval) Entries + // Apply will call the provided function with each entry that exists + // within the provided range, in order. Return false at any time to + // cancel iteration. Altering the entry in such a way that its location + // changes will result in undefined behavior. + Apply(interval Interval, fn func(Entry) bool) + // Get returns any entries that exist at the addresses provided by the + // given entries. Entries are returned in the order in which they are + // received. If an entry cannot be found, a nil is returned in its + // place. + Get(entries ...Entry) Entries + // InsertAtDimension will increment items at and above the given index + // by the number provided. Provide a negative number to to decrement. + // Returned are two lists. The first list is a list of entries that + // were moved. The second is a list entries that were deleted. These + // lists are exclusive. + InsertAtDimension(dimension uint64, index, number int64) (Entries, Entries) +} diff --git a/vendor/github.com/Workiva/go-datastructures/rangetree/node.go b/vendor/github.com/Workiva/go-datastructures/rangetree/node.go new file mode 100644 index 000000000000..83c933d70e4b --- /dev/null +++ b/vendor/github.com/Workiva/go-datastructures/rangetree/node.go @@ -0,0 +1,37 @@ +/* +Copyright 2014 Workiva, LLC + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. 
+You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package rangetree + +type nodes []*node + +type node struct { + value int64 + entry Entry + orderedNodes orderedNodes +} + +func newNode(value int64, entry Entry, needNextDimension bool) *node { + n := &node{} + n.value = value + if needNextDimension { + n.orderedNodes = make(orderedNodes, 0, 10) + } else { + n.entry = entry + } + + return n +} diff --git a/vendor/github.com/Workiva/go-datastructures/rangetree/ordered.go b/vendor/github.com/Workiva/go-datastructures/rangetree/ordered.go new file mode 100644 index 000000000000..6ec1b82f20f7 --- /dev/null +++ b/vendor/github.com/Workiva/go-datastructures/rangetree/ordered.go @@ -0,0 +1,241 @@ +/* +Copyright 2014 Workiva, LLC + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package rangetree + +import "sort" + +// orderedNodes represents an ordered list of points living +// at the last dimension. No duplicates can be inserted here. 
+type orderedNodes nodes + +func (nodes orderedNodes) search(value int64) int { + return sort.Search( + len(nodes), + func(i int) bool { return nodes[i].value >= value }, + ) +} + +// addAt will add the provided node at the provided index. Returns +// a node if one was overwritten. +func (nodes *orderedNodes) addAt(i int, node *node) *node { + if i == len(*nodes) { + *nodes = append(*nodes, node) + return nil + } + + if (*nodes)[i].value == node.value { + overwritten := (*nodes)[i] + // this is a duplicate, there can't be a duplicate + // point in the last dimension + (*nodes)[i] = node + return overwritten + } + + *nodes = append(*nodes, nil) + copy((*nodes)[i+1:], (*nodes)[i:]) + (*nodes)[i] = node + return nil +} + +func (nodes *orderedNodes) add(node *node) *node { + i := nodes.search(node.value) + return nodes.addAt(i, node) +} + +func (nodes *orderedNodes) deleteAt(i int) *node { + if i >= len(*nodes) { // no matching found + return nil + } + + deleted := (*nodes)[i] + copy((*nodes)[i:], (*nodes)[i+1:]) + (*nodes)[len(*nodes)-1] = nil + *nodes = (*nodes)[:len(*nodes)-1] + return deleted +} + +func (nodes *orderedNodes) delete(value int64) *node { + i := nodes.search(value) + + if (*nodes)[i].value != value || i == len(*nodes) { + return nil + } + + return nodes.deleteAt(i) +} + +func (nodes orderedNodes) apply(low, high int64, fn func(*node) bool) bool { + index := nodes.search(low) + if index == len(nodes) { + return true + } + + for ; index < len(nodes); index++ { + if nodes[index].value > high { + break + } + + if !fn(nodes[index]) { + return false + } + } + + return true +} + +func (nodes orderedNodes) get(value int64) (*node, int) { + i := nodes.search(value) + if i == len(nodes) { + return nil, i + } + + if nodes[i].value == value { + return nodes[i], i + } + + return nil, i +} + +func (nodes *orderedNodes) getOrAdd(entry Entry, + dimension, lastDimension uint64) (*node, bool) { + + isLastDimension := isLastDimension(lastDimension, dimension) + value := 
entry.ValueAtDimension(dimension) + + i := nodes.search(value) + if i == len(*nodes) { + node := newNode(value, entry, !isLastDimension) + *nodes = append(*nodes, node) + return node, true + } + + if (*nodes)[i].value == value { + return (*nodes)[i], false + } + + node := newNode(value, entry, !isLastDimension) + *nodes = append(*nodes, nil) + copy((*nodes)[i+1:], (*nodes)[i:]) + (*nodes)[i] = node + return node, true +} + +func (nodes orderedNodes) flatten(entries *Entries) { + for _, node := range nodes { + if node.orderedNodes != nil { + node.orderedNodes.flatten(entries) + } else { + *entries = append(*entries, node.entry) + } + } +} + +func (nodes *orderedNodes) insert(insertDimension, dimension, maxDimension uint64, + index, number int64, modified, deleted *Entries) { + + lastDimension := isLastDimension(maxDimension, dimension) + + if insertDimension == dimension { + i := nodes.search(index) + var toDelete []int + + for j := i; j < len(*nodes); j++ { + (*nodes)[j].value += number + if (*nodes)[j].value < index { + toDelete = append(toDelete, j) + if lastDimension { + *deleted = append(*deleted, (*nodes)[j].entry) + } else { + (*nodes)[j].orderedNodes.flatten(deleted) + } + continue + } + if lastDimension { + *modified = append(*modified, (*nodes)[j].entry) + } else { + (*nodes)[j].orderedNodes.flatten(modified) + } + } + + for i, index := range toDelete { + nodes.deleteAt(index - i) + } + + return + } + + for _, node := range *nodes { + node.orderedNodes.insert( + insertDimension, dimension+1, maxDimension, + index, number, modified, deleted, + ) + } +} + +func (nodes orderedNodes) immutableInsert(insertDimension, dimension, maxDimension uint64, + index, number int64, modified, deleted *Entries) orderedNodes { + + lastDimension := isLastDimension(maxDimension, dimension) + + cp := make(orderedNodes, len(nodes)) + copy(cp, nodes) + + if insertDimension == dimension { + i := cp.search(index) + var toDelete []int + + for j := i; j < len(cp); j++ { + nn := 
newNode(cp[j].value+number, cp[j].entry, !lastDimension) + nn.orderedNodes = cp[j].orderedNodes + cp[j] = nn + if cp[j].value < index { + toDelete = append(toDelete, j) + if lastDimension { + *deleted = append(*deleted, cp[j].entry) + } else { + cp[j].orderedNodes.flatten(deleted) + } + continue + } + if lastDimension { + *modified = append(*modified, cp[j].entry) + } else { + cp[j].orderedNodes.flatten(modified) + } + } + + for _, index := range toDelete { + cp.deleteAt(index) + } + + return cp + } + + for i := 0; i < len(cp); i++ { + oldNode := nodes[i] + nn := newNode(oldNode.value, oldNode.entry, !lastDimension) + nn.orderedNodes = oldNode.orderedNodes.immutableInsert( + insertDimension, dimension+1, + maxDimension, + index, number, + modified, deleted, + ) + cp[i] = nn + } + + return cp +} diff --git a/vendor/github.com/Workiva/go-datastructures/rangetree/orderedtree.go b/vendor/github.com/Workiva/go-datastructures/rangetree/orderedtree.go new file mode 100644 index 000000000000..09a47b4484a9 --- /dev/null +++ b/vendor/github.com/Workiva/go-datastructures/rangetree/orderedtree.go @@ -0,0 +1,263 @@ +/* +Copyright 2014 Workiva, LLC + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. 
+*/ +package rangetree + +func isLastDimension(value, test uint64) bool { + return test >= value +} + +type nodeBundle struct { + list *orderedNodes + index int +} + +type orderedTree struct { + top orderedNodes + number uint64 + dimensions uint64 + path []*nodeBundle +} + +func (ot *orderedTree) resetPath() { + ot.path = ot.path[:0] +} + +func (ot *orderedTree) needNextDimension() bool { + return ot.dimensions > 1 +} + +// add will add the provided entry to the rangetree and return an +// entry if one was overwritten. +func (ot *orderedTree) add(entry Entry) *node { + var node *node + list := &ot.top + + for i := uint64(1); i <= ot.dimensions; i++ { + if isLastDimension(ot.dimensions, i) { + overwritten := list.add( + newNode(entry.ValueAtDimension(i), entry, false), + ) + if overwritten == nil { + ot.number++ + } + return overwritten + } + node, _ = list.getOrAdd(entry, i, ot.dimensions) + list = &node.orderedNodes + } + + return nil +} + +// Add will add the provided entries to the tree. This method +// returns a list of entries that were overwritten in the order +// in which entries were received. If an entry doesn't overwrite +// anything, a nil will be returned for that entry in the returned +// slice. 
+func (ot *orderedTree) Add(entries ...Entry) Entries { + if len(entries) == 0 { + return nil + } + + overwrittens := make(Entries, len(entries)) + for i, entry := range entries { + if entry == nil { + continue + } + + overwritten := ot.add(entry) + if overwritten != nil { + overwrittens[i] = overwritten.entry + } + } + + return overwrittens +} + +func (ot *orderedTree) delete(entry Entry) *node { + ot.resetPath() + var index int + var node *node + list := &ot.top + + for i := uint64(1); i <= ot.dimensions; i++ { + value := entry.ValueAtDimension(i) + node, index = list.get(value) + if node == nil { // there's nothing to delete + return nil + } + + nb := &nodeBundle{list: list, index: index} + ot.path = append(ot.path, nb) + + list = &node.orderedNodes + } + + ot.number-- + + for i := len(ot.path) - 1; i >= 0; i-- { + nb := ot.path[i] + nb.list.deleteAt(nb.index) + if len(*nb.list) > 0 { + break + } + } + + return node +} + +func (ot *orderedTree) get(entry Entry) Entry { + on := ot.top + for i := uint64(1); i <= ot.dimensions; i++ { + n, _ := on.get(entry.ValueAtDimension(i)) + if n == nil { + return nil + } + if i == ot.dimensions { + return n.entry + } + on = n.orderedNodes + } + + return nil +} + +// Get returns any entries that exist at the addresses provided by the +// given entries. Entries are returned in the order in which they are +// received. If an entry cannot be found, a nil is returned in its +// place. +func (ot *orderedTree) Get(entries ...Entry) Entries { + result := make(Entries, 0, len(entries)) + for _, entry := range entries { + result = append(result, ot.get(entry)) + } + + return result +} + +// Delete will remove the provided entries from the tree. +// Any entries that were deleted will be returned in the order in +// which they were deleted. If an entry does not exist to be deleted, +// a nil is returned for that entry's index in the provided cells. 
+func (ot *orderedTree) Delete(entries ...Entry) Entries { + if len(entries) == 0 { + return nil + } + + deletedEntries := make(Entries, len(entries)) + for i, entry := range entries { + if entry == nil { + continue + } + + deleted := ot.delete(entry) + if deleted != nil { + deletedEntries[i] = deleted.entry + } + } + + return deletedEntries +} + +// Len returns the number of items in the tree. +func (ot *orderedTree) Len() uint64 { + return ot.number +} + +func (ot *orderedTree) apply(list orderedNodes, interval Interval, + dimension uint64, fn func(*node) bool) bool { + + low, high := interval.LowAtDimension(dimension), interval.HighAtDimension(dimension) + + if isLastDimension(ot.dimensions, dimension) { + if !list.apply(low, high, fn) { + return false + } + } else { + if !list.apply(low, high, func(n *node) bool { + if !ot.apply(n.orderedNodes, interval, dimension+1, fn) { + return false + } + return true + }) { + return false + } + return true + } + + return true +} + +// Apply will call (in order) the provided function to every +// entry that falls within the provided interval. Any alteration +// the the entry that would result in different answers to the +// interface methods results in undefined behavior. +func (ot *orderedTree) Apply(interval Interval, fn func(Entry) bool) { + ot.apply(ot.top, interval, 1, func(n *node) bool { + return fn(n.entry) + }) +} + +// Query will return an ordered list of results in the given +// interval. +func (ot *orderedTree) Query(interval Interval) Entries { + entries := NewEntries() + + ot.apply(ot.top, interval, 1, func(n *node) bool { + entries = append(entries, n.entry) + return true + }) + + return entries +} + +// InsertAtDimension will increment items at and above the given index +// by the number provided. Provide a negative number to to decrement. +// Returned are two lists. The first list is a list of entries that +// were moved. The second is a list entries that were deleted. These +// lists are exclusive. 
+func (ot *orderedTree) InsertAtDimension(dimension uint64, + index, number int64) (Entries, Entries) { + + // TODO: perhaps return an error here? + if dimension > ot.dimensions || number == 0 { + return nil, nil + } + + modified := make(Entries, 0, 100) + deleted := make(Entries, 0, 100) + + ot.top.insert(dimension, 1, ot.dimensions, + index, number, &modified, &deleted, + ) + + ot.number -= uint64(len(deleted)) + + return modified, deleted +} + +func newOrderedTree(dimensions uint64) *orderedTree { + return &orderedTree{ + dimensions: dimensions, + path: make([]*nodeBundle, 0, dimensions), + } +} + +// New is the constructor to create a new rangetree with +// the provided number of dimensions. +func New(dimensions uint64) RangeTree { + return newOrderedTree(dimensions) +} diff --git a/vendor/github.com/Workiva/go-datastructures/slice/int64.go b/vendor/github.com/Workiva/go-datastructures/slice/int64.go new file mode 100644 index 000000000000..b6526db82c48 --- /dev/null +++ b/vendor/github.com/Workiva/go-datastructures/slice/int64.go @@ -0,0 +1,91 @@ +/* +Copyright 2014 Workiva, LLC + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +/* +Package Int64 simply adds an Int64-typed version of the standard library's +sort/IntSlice implementation. + +Also added is an Insert method. +*/ +package slice + +import "sort" + +// Int64Slice is a slice that fulfills the sort.Interface interface. +type Int64Slice []int64 + +// Len returns the len of this slice. Required by sort.Interface. 
+func (s Int64Slice) Len() int { + return len(s) +} + +// Less returns a bool indicating if the value at position i +// is less than at position j. Required by sort.Interface. +func (s Int64Slice) Less(i, j int) bool { + return s[i] < s[j] +} + +// Search will search this slice and return an index that corresponds +// to the lowest position of that value. You'll need to check +// separately if the value at that position is equal to x. The +// behavior of this method is undefinited if the slice is not sorted. +func (s Int64Slice) Search(x int64) int { + return sort.Search(len(s), func(i int) bool { + return s[i] >= x + }) +} + +// Sort will in-place sort this list of int64s. +func (s Int64Slice) Sort() { + sort.Sort(s) +} + +// Swap will swap the elements at positions i and j. This is required +// by sort.Interface. +func (s Int64Slice) Swap(i, j int) { + s[i], s[j] = s[j], s[i] +} + +// Exists returns a bool indicating if the provided value exists +// in this list. This has undefined behavior if the list is not +// sorted. +func (s Int64Slice) Exists(x int64) bool { + i := s.Search(x) + if i == len(s) { + return false + } + + return s[i] == x +} + +// Insert will insert x into the sorted position in this list +// and return a list with the value added. If this slice has not +// been sorted Insert's behavior is undefined. 
+func (s Int64Slice) Insert(x int64) Int64Slice { + i := s.Search(x) + if i == len(s) { + return append(s, x) + } + + if s[i] == x { + return s + } + + s = append(s, 0) + copy(s[i+1:], s[i:]) + s[i] = x + return s +} diff --git a/vendor/modules.txt b/vendor/modules.txt index 748861ff7cad..9122692cde4a 100644 --- a/vendor/modules.txt +++ b/vendor/modules.txt @@ -68,6 +68,10 @@ github.com/NYTimes/gziphandler github.com/PuerkitoBio/purell # github.com/PuerkitoBio/urlesc v0.0.0-20170810143723-de5bf2ad4578 github.com/PuerkitoBio/urlesc +# github.com/Workiva/go-datastructures v1.0.53 +## explicit +github.com/Workiva/go-datastructures/rangetree +github.com/Workiva/go-datastructures/slice # github.com/alecthomas/template v0.0.0-20190718012654-fb15b899a751 github.com/alecthomas/template github.com/alecthomas/template/parse From c79a599bde5cf4cde3e7a9ff500e2b99509260a0 Mon Sep 17 00:00:00 2001 From: Owen Diehl Date: Wed, 23 Jun 2021 13:27:41 -0400 Subject: [PATCH 02/16] testware & unordered serialise --- pkg/chunkenc/memchunk_test.go | 7 +- pkg/chunkenc/unordered.go | 56 ++++++++++- pkg/chunkenc/unordered_test.go | 169 +++++++++++++++++++++++++++++++++ 3 files changed, 227 insertions(+), 5 deletions(-) create mode 100644 pkg/chunkenc/unordered_test.go diff --git a/pkg/chunkenc/memchunk_test.go b/pkg/chunkenc/memchunk_test.go index b5a254d68fba..63edb14c6f5d 100644 --- a/pkg/chunkenc/memchunk_test.go +++ b/pkg/chunkenc/memchunk_test.go @@ -40,10 +40,9 @@ var testEncoding = []Encoding{ } var ( - testBlockSize = 256 * 1024 - testTargetSize = 1500 * 1024 - noopStreamPipeline = log.NewNoopPipeline().ForStream(labels.Labels{}) - countExtractor = func() log.StreamSampleExtractor { + testBlockSize = 256 * 1024 + testTargetSize = 1500 * 1024 + countExtractor = func() log.StreamSampleExtractor { ex, err := log.NewLineSampleExtractor(log.CountExtractor, nil, nil, false, false) if err != nil { panic(err) diff --git a/pkg/chunkenc/unordered.go b/pkg/chunkenc/unordered.go index 
d8ada16ab91a..6dec0f5b9bf1 100644 --- a/pkg/chunkenc/unordered.go +++ b/pkg/chunkenc/unordered.go @@ -1,7 +1,10 @@ package chunkenc import ( + "bytes" "context" + "encoding/binary" + "math" "time" "github.com/Workiva/go-datastructures/rangetree" @@ -9,6 +12,12 @@ import ( "github.com/grafana/loki/pkg/logproto" "github.com/grafana/loki/pkg/logql/log" "github.com/grafana/loki/pkg/logqlmodel/stats" + "github.com/pkg/errors" + "github.com/prometheus/prometheus/pkg/labels" +) + +var ( + noopStreamPipeline = log.NewNoopPipeline().ForStream(labels.Labels{}) ) type unorderedHeadBlock struct { @@ -17,6 +26,7 @@ type unorderedHeadBlock struct { // Scans: (O(k+log(n))) where k=num_scanned_entries & n=total_entries rt rangetree.RangeTree + lines int // number of entries size int // size of uncompressed bytes. mint, maxt int64 // upper and lower bounds } @@ -55,7 +65,7 @@ func (hb *unorderedHeadBlock) append(ts int64, line string) { ts: ts, } displaced := hb.rt.Add(e) - if len(displaced) > 0 { + if displaced[0] != nil { e.entries = append(displaced[0].(*nsEntries).entries, line) } else { e.entries = []string{line} @@ -71,6 +81,7 @@ func (hb *unorderedHeadBlock) append(ts int64, line string) { } hb.size += len(line) + hb.lines++ } @@ -166,3 +177,46 @@ func (hb *unorderedHeadBlock) iterator( } return iter.NewStreamsIterator(ctx, streamsResult, direction) } + +func (hb *unorderedHeadBlock) serialise(pool WriterPool) ([]byte, error) { + inBuf := serializeBytesBufferPool.Get().(*bytes.Buffer) + defer func() { + inBuf.Reset() + serializeBytesBufferPool.Put(inBuf) + }() + outBuf := &bytes.Buffer{} + + encBuf := make([]byte, binary.MaxVarintLen64) + compressedWriter := pool.GetWriter(outBuf) + defer pool.PutWriter(compressedWriter) + + itr := hb.iterator( + context.Background(), + logproto.FORWARD, + 0, + math.MaxInt64, + noopStreamPipeline, + ) + + // TODO(owen-d): we don't have to reuse the iterator implementation here + // and could avoid allocations due to re-casting the underlying 
types. + for itr.Next() { + e := itr.Entry() + n := binary.PutVarint(encBuf, e.Timestamp.UnixNano()) + inBuf.Write(encBuf[:n]) + + n = binary.PutUvarint(encBuf, uint64(len(e.Line))) + inBuf.Write(encBuf[:n]) + + inBuf.WriteString(e.Line) + } + + if _, err := compressedWriter.Write(inBuf.Bytes()); err != nil { + return nil, errors.Wrap(err, "appending entry") + } + if err := compressedWriter.Close(); err != nil { + return nil, errors.Wrap(err, "flushing pending compress buffer") + } + + return outBuf.Bytes(), nil +} diff --git a/pkg/chunkenc/unordered_test.go b/pkg/chunkenc/unordered_test.go new file mode 100644 index 000000000000..e3f26d919683 --- /dev/null +++ b/pkg/chunkenc/unordered_test.go @@ -0,0 +1,169 @@ +package chunkenc + +import ( + "context" + "math" + "testing" + "time" + + "github.com/grafana/loki/pkg/iter" + "github.com/grafana/loki/pkg/logproto" + "github.com/stretchr/testify/require" +) + +func iterEq(t *testing.T, exp []entry, got iter.EntryIterator, dir logproto.Direction) { + var i int + for got.Next() { + require.Equal(t, logproto.Entry{ + Timestamp: time.Unix(0, exp[i].t), + Line: exp[i].s, + }, got.Entry()) + i++ + } + require.Equal(t, i, len(exp)) +} + +func Test_Unordered_InsertRetrieval(t *testing.T) { + for _, tc := range []struct { + desc string + input, exp []entry + dir logproto.Direction + }{ + { + desc: "simple forward", + input: []entry{ + {0, "a"}, {1, "b"}, {2, "c"}, + }, + exp: []entry{ + {0, "a"}, {1, "b"}, {2, "c"}, + }, + }, + { + desc: "simple backward", + input: []entry{ + {0, "a"}, {1, "b"}, {2, "c"}, + }, + exp: []entry{ + {2, "c"}, {1, "b"}, {0, "a"}, + }, + dir: logproto.BACKWARD, + }, + { + desc: "unordered forward", + input: []entry{ + {1, "b"}, {0, "a"}, {2, "c"}, + }, + exp: []entry{ + {0, "a"}, {1, "b"}, {2, "c"}, + }, + }, + { + desc: "unordered backward", + input: []entry{ + {1, "b"}, {0, "a"}, {2, "c"}, + }, + exp: []entry{ + {2, "c"}, {1, "b"}, {0, "a"}, + }, + dir: logproto.BACKWARD, + }, + { + desc: "ts 
collision forward", + input: []entry{ + {0, "a"}, {0, "b"}, {1, "c"}, + }, + exp: []entry{ + {0, "a"}, {0, "b"}, {1, "c"}, + }, + }, + { + desc: "ts collision backward", + input: []entry{ + {0, "a"}, {0, "b"}, {1, "c"}, + }, + exp: []entry{ + {1, "c"}, {0, "b"}, {0, "a"}, + }, + dir: logproto.BACKWARD, + }, + } { + t.Run(tc.desc, func(t *testing.T) { + hb := newUnorderedHeadBlock() + for _, e := range tc.input { + hb.append(e.t, e.s) + } + + itr := hb.iterator( + context.Background(), + tc.dir, + 0, + math.MaxInt64, + noopStreamPipeline, + ) + + iterEq(t, tc.exp, itr, tc.dir) + }) + } +} + +func Test_UnorderedBoundedIter(t *testing.T) { + for _, tc := range []struct { + desc string + mint, maxt int64 + dir logproto.Direction + input []entry + exp []entry + }{ + { + desc: "simple", + mint: 1, + maxt: 4, + input: []entry{ + {0, "a"}, {1, "b"}, {2, "c"}, {3, "d"}, {4, "e"}, + }, + exp: []entry{ + {1, "b"}, {2, "c"}, {3, "d"}, + }, + }, + { + desc: "simple backward", + mint: 1, + maxt: 4, + input: []entry{ + {0, "a"}, {1, "b"}, {2, "c"}, {3, "d"}, {4, "e"}, + }, + exp: []entry{ + {3, "d"}, {2, "c"}, {1, "b"}, + }, + dir: logproto.BACKWARD, + }, + { + desc: "unordered", + mint: 1, + maxt: 4, + input: []entry{ + {0, "a"}, {2, "c"}, {1, "b"}, {4, "e"}, {3, "d"}, + }, + exp: []entry{ + {1, "b"}, {2, "c"}, {3, "d"}, + }, + }, + } { + t.Run(tc.desc, func(t *testing.T) { + hb := newUnorderedHeadBlock() + for _, e := range tc.input { + hb.append(e.t, e.s) + } + + itr := hb.iterator( + context.Background(), + tc.dir, + tc.mint, + tc.maxt, + noopStreamPipeline, + ) + + iterEq(t, tc.exp, itr, tc.dir) + }) + } +} From 46820141884c3ab08e5a8f3796e980f8452c1ceb Mon Sep 17 00:00:00 2001 From: Owen Diehl Date: Wed, 23 Jun 2021 17:21:55 -0400 Subject: [PATCH 03/16] common utils for iter & sampleIter --- pkg/chunkenc/unordered.go | 140 ++++++++++++++++++++++++++++++-------- 1 file changed, 110 insertions(+), 30 deletions(-) diff --git a/pkg/chunkenc/unordered.go 
b/pkg/chunkenc/unordered.go index 6dec0f5b9bf1..e986875ced9b 100644 --- a/pkg/chunkenc/unordered.go +++ b/pkg/chunkenc/unordered.go @@ -5,9 +5,11 @@ import ( "context" "encoding/binary" "math" + "sort" "time" "github.com/Workiva/go-datastructures/rangetree" + "github.com/cespare/xxhash/v2" "github.com/grafana/loki/pkg/iter" "github.com/grafana/loki/pkg/logproto" "github.com/grafana/loki/pkg/logql/log" @@ -94,13 +96,15 @@ func (i interval) LowAtDimension(_ uint64) int64 { return i.mint } // rangetree library treats this as inclusive, but we want exclusivity func (i interval) HighAtDimension(_ uint64) int64 { return i.maxt - 1 } -func (hb *unorderedHeadBlock) iterator( +// helper for base logic across {Entry,Sample}Iterator +func (hb *unorderedHeadBlock) buildIter( ctx context.Context, direction logproto.Direction, mint, maxt int64, - pipeline log.StreamPipeline, -) iter.EntryIterator { + entryFn func(int64, string), + finalizer func() interface{}, +) interface{} { if hb.isEmpty() || (maxt < hb.mint || hb.maxt < mint) { return iter.NoopIterator } @@ -111,13 +115,6 @@ func (hb *unorderedHeadBlock) iterator( }) chunkStats := stats.GetChunkData(ctx) - - // We are doing a copy everytime, this is because b.entries could change completely, - // the alternate would be that we allocate a new b.entries everytime we cut a block, - // but the tradeoff is that queries to near-realtime data would be much lower than - // cutting of blocks. 
- streams := map[uint64]*logproto.Stream{} - process := func(es *nsEntries) { chunkStats.HeadChunkLines += int64(len(es.entries)) @@ -137,10 +134,49 @@ func (hb *unorderedHeadBlock) iterator( for ; i < len(es.entries) && i >= 0; next() { line := es.entries[i] chunkStats.HeadChunkBytes += int64(len(line)) + entryFn(es.ts, line) + + } + } + + if direction == logproto.FORWARD { + for _, e := range entries { + process(e.(*nsEntries)) + } + } else { + for i := len(entries) - 1; i >= 0; i-- { + process(entries[i].(*nsEntries)) + } + } + + return finalizer() +} + +func (hb *unorderedHeadBlock) iterator( + ctx context.Context, + direction logproto.Direction, + mint, + maxt int64, + pipeline log.StreamPipeline, +) iter.EntryIterator { + + // We are doing a copy everytime, this is because b.entries could change completely, + // the alternate would be that we allocate a new b.entries everytime we cut a block, + // but the tradeoff is that queries to near-realtime data would be much lower than + // cutting of blocks. 
+ streams := map[uint64]*logproto.Stream{} + + return hb.buildIter( + ctx, + direction, + mint, + maxt, + func(ts int64, line string) { newLine, parsedLbs, ok := pipeline.ProcessString(line) if !ok { return } + var stream *logproto.Stream lhash := parsedLbs.Hash() if stream, ok = streams[lhash]; !ok { @@ -151,31 +187,75 @@ func (hb *unorderedHeadBlock) iterator( } stream.Entries = append(stream.Entries, logproto.Entry{ - Timestamp: time.Unix(0, es.ts), + Timestamp: time.Unix(0, ts), Line: newLine, }) - } + }, + func() interface{} { + if len(streams) == 0 { + return iter.NoopIterator + } + streamsResult := make([]logproto.Stream, 0, len(streams)) + for _, stream := range streams { + streamsResult = append(streamsResult, *stream) + } + return iter.NewStreamsIterator(ctx, streamsResult, direction) + }, + ).(iter.EntryIterator) +} - } +func (hb *unorderedHeadBlock) sampleIterator( + ctx context.Context, + mint, + maxt int64, + extractor log.StreamSampleExtractor, +) iter.SampleIterator { - if direction == logproto.FORWARD { - for _, e := range entries { - process(e.(*nsEntries)) - } - } else { - for i := len(entries) - 1; i >= 0; i-- { - process(entries[i].(*nsEntries)) - } - } + series := map[uint64]*logproto.Series{} - if len(streams) == 0 { - return iter.NoopIterator - } - streamsResult := make([]logproto.Stream, 0, len(streams)) - for _, stream := range streams { - streamsResult = append(streamsResult, *stream) - } - return iter.NewStreamsIterator(ctx, streamsResult, direction) + return hb.buildIter( + ctx, + logproto.FORWARD, + mint, + maxt, + func(ts int64, line string) { + value, parsedLabels, ok := extractor.ProcessString(line) + if !ok { + return + } + var found bool + var s *logproto.Series + lhash := parsedLabels.Hash() + if s, found = series[lhash]; !found { + s = &logproto.Series{ + Labels: parsedLabels.String(), + } + series[lhash] = s + } + + // []byte here doesn't create allocation because Sum64 has go:noescape directive + // It specifies that the 
function does not allow any of the pointers passed as arguments + // to escape into the heap or into the values returned from the function. + h := xxhash.Sum64([]byte(line)) + s.Samples = append(s.Samples, logproto.Sample{ + Timestamp: ts, + Value: value, + Hash: h, + }) + }, + func() interface{} { + if len(series) == 0 { + return iter.NoopIterator + } + seriesRes := make([]logproto.Series, 0, len(series)) + for _, s := range series { + // todo(ctovena) not sure we need this sort. + sort.Sort(s) + seriesRes = append(seriesRes, *s) + } + return iter.NewMultiSeriesIterator(ctx, seriesRes) + }, + ).(iter.SampleIterator) } func (hb *unorderedHeadBlock) serialise(pool WriterPool) ([]byte, error) { From 2941c448df5ac34c85367f092f3bc3383f632dbb Mon Sep 17 00:00:00 2001 From: Owen Diehl Date: Thu, 24 Jun 2021 10:42:34 -0400 Subject: [PATCH 04/16] more generic forEntries --- pkg/chunkenc/unordered.go | 66 +++++++++++++++++++++------------------ 1 file changed, 35 insertions(+), 31 deletions(-) diff --git a/pkg/chunkenc/unordered.go b/pkg/chunkenc/unordered.go index e986875ced9b..519f702ee837 100644 --- a/pkg/chunkenc/unordered.go +++ b/pkg/chunkenc/unordered.go @@ -97,16 +97,21 @@ func (i interval) LowAtDimension(_ uint64) int64 { return i.mint } func (i interval) HighAtDimension(_ uint64) int64 { return i.maxt - 1 } // helper for base logic across {Entry,Sample}Iterator -func (hb *unorderedHeadBlock) buildIter( +func (hb *unorderedHeadBlock) forEntries( ctx context.Context, direction logproto.Direction, mint, maxt int64, + initializer func(numEntries int), entryFn func(int64, string), - finalizer func() interface{}, -) interface{} { +) { if hb.isEmpty() || (maxt < hb.mint || hb.maxt < mint) { - return iter.NoopIterator + + if initializer != nil { + initializer(0) + } + + return } entries := hb.rt.Query(interval{ @@ -149,7 +154,6 @@ func (hb *unorderedHeadBlock) buildIter( } } - return finalizer() } func (hb *unorderedHeadBlock) iterator( @@ -166,11 +170,12 @@ func (hb 
*unorderedHeadBlock) iterator( // cutting of blocks. streams := map[uint64]*logproto.Stream{} - return hb.buildIter( + hb.forEntries( ctx, direction, mint, maxt, + nil, func(ts int64, line string) { newLine, parsedLbs, ok := pipeline.ProcessString(line) if !ok { @@ -191,17 +196,16 @@ func (hb *unorderedHeadBlock) iterator( Line: newLine, }) }, - func() interface{} { - if len(streams) == 0 { - return iter.NoopIterator - } - streamsResult := make([]logproto.Stream, 0, len(streams)) - for _, stream := range streams { - streamsResult = append(streamsResult, *stream) - } - return iter.NewStreamsIterator(ctx, streamsResult, direction) - }, - ).(iter.EntryIterator) + ) + + if len(streams) == 0 { + return iter.NoopIterator + } + streamsResult := make([]logproto.Stream, 0, len(streams)) + for _, stream := range streams { + streamsResult = append(streamsResult, *stream) + } + return iter.NewStreamsIterator(ctx, streamsResult, direction) } func (hb *unorderedHeadBlock) sampleIterator( @@ -213,11 +217,12 @@ func (hb *unorderedHeadBlock) sampleIterator( series := map[uint64]*logproto.Series{} - return hb.buildIter( + hb.forEntries( ctx, logproto.FORWARD, mint, maxt, + nil, func(ts int64, line string) { value, parsedLabels, ok := extractor.ProcessString(line) if !ok { @@ -243,19 +248,18 @@ func (hb *unorderedHeadBlock) sampleIterator( Hash: h, }) }, - func() interface{} { - if len(series) == 0 { - return iter.NoopIterator - } - seriesRes := make([]logproto.Series, 0, len(series)) - for _, s := range series { - // todo(ctovena) not sure we need this sort. - sort.Sort(s) - seriesRes = append(seriesRes, *s) - } - return iter.NewMultiSeriesIterator(ctx, seriesRes) - }, - ).(iter.SampleIterator) + ) + + if len(series) == 0 { + return iter.NoopIterator + } + seriesRes := make([]logproto.Series, 0, len(series)) + for _, s := range series { + // todo(ctovena) not sure we need this sort. 
+ sort.Sort(s) + seriesRes = append(seriesRes, *s) + } + return iter.NewMultiSeriesIterator(ctx, seriesRes) } func (hb *unorderedHeadBlock) serialise(pool WriterPool) ([]byte, error) { From cf1c9099910f169f6487f93c610b890233b37c50 Mon Sep 17 00:00:00 2001 From: Owen Diehl Date: Thu, 24 Jun 2021 10:45:51 -0400 Subject: [PATCH 05/16] more efficient unorderedHeadBlock serialise (no internal re-casting) --- pkg/chunkenc/unordered.go | 24 ++++++++++-------------- 1 file changed, 10 insertions(+), 14 deletions(-) diff --git a/pkg/chunkenc/unordered.go b/pkg/chunkenc/unordered.go index 519f702ee837..b892e0736ecc 100644 --- a/pkg/chunkenc/unordered.go +++ b/pkg/chunkenc/unordered.go @@ -274,26 +274,22 @@ func (hb *unorderedHeadBlock) serialise(pool WriterPool) ([]byte, error) { compressedWriter := pool.GetWriter(outBuf) defer pool.PutWriter(compressedWriter) - itr := hb.iterator( + hb.forEntries( context.Background(), logproto.FORWARD, 0, math.MaxInt64, - noopStreamPipeline, - ) - - // TODO(owen-d): we don't have to reuse the iterator implementation here - // and could avoid allocations due to re-casting the underlying types.
- for itr.Next() { - e := itr.Entry() - n := binary.PutVarint(encBuf, e.Timestamp.UnixNano()) - inBuf.Write(encBuf[:n]) + nil, + func(ts int64, line string) { + n := binary.PutVarint(encBuf, ts) + inBuf.Write(encBuf[:n]) - n = binary.PutUvarint(encBuf, uint64(len(e.Line))) - inBuf.Write(encBuf[:n]) + n = binary.PutUvarint(encBuf, uint64(len(line))) + inBuf.Write(encBuf[:n]) - inBuf.WriteString(e.Line) - } + inBuf.WriteString(line) + }, + ) if _, err := compressedWriter.Write(inBuf.Bytes()); err != nil { return nil, errors.Wrap(err, "appending entry") From 118f638d82f7b52319b030fa9856c526f70aba91 Mon Sep 17 00:00:00 2001 From: Owen Diehl Date: Thu, 24 Jun 2021 11:45:28 -0400 Subject: [PATCH 06/16] roundtripping unordered head block, exit headchunk iteration early, add constant for current default chunk version --- pkg/chunkenc/memchunk.go | 4 +- pkg/chunkenc/unordered.go | 145 ++++++++++++++++++++++++++++++--- pkg/chunkenc/unordered_test.go | 75 +++++++++++++++++ 3 files changed, 211 insertions(+), 13 deletions(-) diff --git a/pkg/chunkenc/memchunk.go b/pkg/chunkenc/memchunk.go index 1c6ac2ee0ba0..7bfe65658140 100644 --- a/pkg/chunkenc/memchunk.go +++ b/pkg/chunkenc/memchunk.go @@ -32,6 +32,8 @@ const ( chunkFormatV2 chunkFormatV3 + DefaultChunkFormat = chunkFormatV3 // the currently used chunk format + blocksPerChunk = 10 maxLineLength = 1024 * 1024 * 1024 @@ -277,7 +279,7 @@ func NewMemChunk(enc Encoding, blockSize, targetSize int) *MemChunk { blocks: []block{}, head: &headBlock{}, - format: chunkFormatV3, + format: DefaultChunkFormat, encoding: enc, } diff --git a/pkg/chunkenc/unordered.go b/pkg/chunkenc/unordered.go index b892e0736ecc..a9fe8be292ed 100644 --- a/pkg/chunkenc/unordered.go +++ b/pkg/chunkenc/unordered.go @@ -4,6 +4,7 @@ import ( "bytes" "context" "encoding/binary" + "io" "math" "sort" "time" @@ -87,13 +88,15 @@ func (hb *unorderedHeadBlock) append(ts int64, line string) { } +// Implements rangetree.Interval type interval struct { mint, maxt int64 } 
func (i interval) LowAtDimension(_ uint64) int64 { return i.mint } -// rangetree library treats this as inclusive, but we want exclusivity +// rangetree library treats this as inclusive, but we want exclusivity, +// or [from, through) in nanoseconds func (i interval) HighAtDimension(_ uint64) int64 { return i.maxt - 1 } // helper for base logic across {Entry,Sample}Iterator @@ -103,8 +106,8 @@ func (hb *unorderedHeadBlock) forEntries( mint, maxt int64, initializer func(numEntries int), - entryFn func(int64, string), -) { + entryFn func(int64, string) error, // returning an error exits early +) (err error) { if hb.isEmpty() || (maxt < hb.mint || hb.maxt < mint) { if initializer != nil { @@ -139,7 +142,7 @@ func (hb *unorderedHeadBlock) forEntries( for ; i < len(es.entries) && i >= 0; next() { line := es.entries[i] chunkStats.HeadChunkBytes += int64(len(line)) - entryFn(es.ts, line) + err = entryFn(es.ts, line) } } @@ -147,13 +150,20 @@ func (hb *unorderedHeadBlock) forEntries( if direction == logproto.FORWARD { for _, e := range entries { process(e.(*nsEntries)) + if err != nil { + return err + } } } else { for i := len(entries) - 1; i >= 0; i-- { process(entries[i].(*nsEntries)) + if err != nil { + return err + } } } + return nil } func (hb *unorderedHeadBlock) iterator( @@ -170,16 +180,16 @@ func (hb *unorderedHeadBlock) iterator( // cutting of blocks. 
streams := map[uint64]*logproto.Stream{} - hb.forEntries( + _ = hb.forEntries( ctx, direction, mint, maxt, nil, - func(ts int64, line string) { + func(ts int64, line string) error { newLine, parsedLbs, ok := pipeline.ProcessString(line) if !ok { - return + return nil } var stream *logproto.Stream @@ -195,6 +205,7 @@ func (hb *unorderedHeadBlock) iterator( Timestamp: time.Unix(0, ts), Line: newLine, }) + return nil }, ) @@ -217,16 +228,16 @@ func (hb *unorderedHeadBlock) sampleIterator( series := map[uint64]*logproto.Series{} - hb.forEntries( + _ = hb.forEntries( ctx, logproto.FORWARD, mint, maxt, nil, - func(ts int64, line string) { + func(ts int64, line string) error { value, parsedLabels, ok := extractor.ProcessString(line) if !ok { - return + return nil } var found bool var s *logproto.Series @@ -247,6 +258,7 @@ func (hb *unorderedHeadBlock) sampleIterator( Value: value, Hash: h, }) + return nil }, ) @@ -262,6 +274,7 @@ func (hb *unorderedHeadBlock) sampleIterator( return iter.NewMultiSeriesIterator(ctx, seriesRes) } +// serialise is used in creating an ordered, compressed block from an unorderedHeadBlock func (hb *unorderedHeadBlock) serialise(pool WriterPool) ([]byte, error) { inBuf := serializeBytesBufferPool.Get().(*bytes.Buffer) defer func() { @@ -274,13 +287,13 @@ func (hb *unorderedHeadBlock) serialise(pool WriterPool) ([]byte, error) { compressedWriter := pool.GetWriter(outBuf) defer pool.PutWriter(compressedWriter) - hb.forEntries( + _ = hb.forEntries( context.Background(), logproto.FORWARD, 0, math.MaxInt64, nil, - func(ts int64, line string) { + func(ts int64, line string) error { n := binary.PutVarint(encBuf, ts) inBuf.Write(encBuf[:n]) @@ -288,6 +301,7 @@ func (hb *unorderedHeadBlock) serialise(pool WriterPool) ([]byte, error) { inBuf.Write(encBuf[:n]) inBuf.WriteString(line) + return nil }, ) @@ -300,3 +314,110 @@ func (hb *unorderedHeadBlock) serialise(pool WriterPool) ([]byte, error) { return outBuf.Bytes(), nil } + +// CheckpointSize returns the 
estimated size of the headblock checkpoint. +func (hb *unorderedHeadBlock) CheckpointSize(version byte) int { + size := 1 // version + size += binary.MaxVarintLen32 * 2 // total entries + total size + size += binary.MaxVarintLen64 * 2 // mint,maxt + size += (binary.MaxVarintLen64 + binary.MaxVarintLen32) * hb.lines // ts + len of log line. + size += hb.size // uncompressed bytes of lines + return size +} + +// CheckpointBytes serializes a headblock to []byte. This is used by the WAL checkpointing, +// which does not want to mutate a chunk by cutting it (otherwise risking content address changes), but +// needs to serialize/deserialize the data to disk to ensure data durability. +func (hb *unorderedHeadBlock) CheckpointBytes(version byte, b []byte) ([]byte, error) { + buf := bytes.NewBuffer(b[:0]) + err := hb.CheckpointTo(version, buf) + return buf.Bytes(), err +} + +// CheckpointTo serializes a headblock to a `io.Writer`. see `CheckpointBytes`. +func (hb *unorderedHeadBlock) CheckpointTo(version byte, w io.Writer) error { + eb := EncodeBufferPool.Get().(*encbuf) + defer EncodeBufferPool.Put(eb) + + eb.reset() + + eb.putByte(version) + _, err := w.Write(eb.get()) + if err != nil { + return errors.Wrap(err, "write headBlock version") + } + eb.reset() + + eb.putUvarint(hb.lines) + + _, err = w.Write(eb.get()) + if err != nil { + return errors.Wrap(err, "write headBlock metas") + } + eb.reset() + + err = hb.forEntries( + context.Background(), + logproto.FORWARD, + 0, + math.MaxInt64, + nil, + func(ts int64, line string) error { + eb.putVarint64(ts) + eb.putUvarint(len(line)) + _, err = w.Write(eb.get()) + if err != nil { + return errors.Wrap(err, "write headBlock entry ts") + } + eb.reset() + + _, err := io.WriteString(w, line) + if err != nil { + return errors.Wrap(err, "write headblock entry line") + } + return nil + }, + ) + + return nil +} + +func (hb *unorderedHeadBlock) FromCheckpoint(b []byte) error { + // ensure it's empty + *hb = *newUnorderedHeadBlock() + + 
if len(b) < 1 { + return nil + } + + db := decbuf{b: b} + + version := db.byte() + if db.err() != nil { + return errors.Wrap(db.err(), "verifying headblock header") + } + switch version { + case chunkFormatV1, chunkFormatV2, chunkFormatV3: + default: + return errors.Errorf("incompatible headBlock version (%v), only V1,V2,V3 is currently supported", version) + } + + n := db.uvarint() + + if err := db.err(); err != nil { + return errors.Wrap(err, "verifying headblock metadata") + } + + for i := 0; i < n && db.err() == nil; i++ { + ts := db.varint64() + lineLn := db.uvarint() + line := string(db.bytes(lineLn)) + hb.append(ts, line) + } + + if err := db.err(); err != nil { + return errors.Wrap(err, "decoding entries") + } + + return nil +} diff --git a/pkg/chunkenc/unordered_test.go b/pkg/chunkenc/unordered_test.go index e3f26d919683..ba6dfd6d268b 100644 --- a/pkg/chunkenc/unordered_test.go +++ b/pkg/chunkenc/unordered_test.go @@ -2,6 +2,8 @@ package chunkenc import ( "context" + "errors" + "fmt" "math" "testing" "time" @@ -23,6 +25,57 @@ func iterEq(t *testing.T, exp []entry, got iter.EntryIterator, dir logproto.Dire require.Equal(t, i, len(exp)) } +func Test_forEntriesEarlyReturn(t *testing.T) { + hb := newUnorderedHeadBlock() + for i := 0; i < 10; i++ { + hb.append(int64(i), fmt.Sprint(i)) + } + + // forward + var forwardCt int + var forwardStop int64 + err := hb.forEntries( + context.Background(), + logproto.FORWARD, + 0, + math.MaxInt64, + nil, + func(ts int64, line string) error { + forwardCt++ + forwardStop = ts + if ts == 5 { + return errors.New("err") + } + return nil + }, + ) + require.Error(t, err) + require.Equal(t, int64(5), forwardStop) + require.Equal(t, 6, forwardCt) + + // backward + var backwardCt int + var backwardStop int64 + err = hb.forEntries( + context.Background(), + logproto.BACKWARD, + 0, + math.MaxInt64, + nil, + func(ts int64, line string) error { + backwardCt++ + backwardStop = ts + if ts == 5 { + return errors.New("err") + } + return nil 
+ }, + ) + require.Error(t, err) + require.Equal(t, int64(5), backwardStop) + require.Equal(t, 5, backwardCt) +} + func Test_Unordered_InsertRetrieval(t *testing.T) { for _, tc := range []struct { desc string @@ -167,3 +220,25 @@ func Test_UnorderedBoundedIter(t *testing.T) { }) } } + +func Test_UnorderedHeadBlockCheckpointRoundtrip(t *testing.T) { + hb := newUnorderedHeadBlock() + + for i := 0; i < 100; i++ { + hb.append(int64(i), fmt.Sprint(i)) + } + + // turn to bytes + b, err := hb.CheckpointBytes(DefaultChunkFormat, nil) + require.Nil(t, err) + + // restore a copy from bytes + cpy := newUnorderedHeadBlock() + require.Nil(t, cpy.FromCheckpoint(b)) + + // ensure copy's bytes match original + cpyBytes, err := cpy.CheckpointBytes(DefaultChunkFormat, nil) + require.Nil(t, err) + require.Equal(t, b, cpyBytes) + +} From f248b47677e4baf5b9c70783a85fd7305f9b71fd Mon Sep 17 00:00:00 2001 From: Owen Diehl Date: Thu, 24 Jun 2021 13:09:31 -0400 Subject: [PATCH 07/16] adds head block write benchmarks for ordered & unordered writes --- pkg/chunkenc/unordered_test.go | 71 ++++++++++++++++++++++++++++++++++ 1 file changed, 71 insertions(+) diff --git a/pkg/chunkenc/unordered_test.go b/pkg/chunkenc/unordered_test.go index ba6dfd6d268b..318465b5e236 100644 --- a/pkg/chunkenc/unordered_test.go +++ b/pkg/chunkenc/unordered_test.go @@ -5,6 +5,7 @@ import ( "errors" "fmt" "math" + "math/rand" "testing" "time" @@ -242,3 +243,73 @@ func Test_UnorderedHeadBlockCheckpointRoundtrip(t *testing.T) { require.Equal(t, b, cpyBytes) } + +func BenchmarkHeadBlockWrites(b *testing.B) { + // ordered, ordered + // unordered, ordered + // unordered, unordered + + // current default block size of 256kb with 50b avg log lines =~ 5.2k lines/block + var nWrites = (256 << 10) / 50 + + headBlockFn := func() func(int64, string) { + hb := &headBlock{} + return func(ts int64, line string) { + _ = hb.append(ts, line) + } + } + + unorderedHeadBlockFn := func() func(int64, string) { + hb :=
newUnorderedHeadBlock() + return func(ts int64, line string) { + hb.append(ts, line) + } + } + + for _, tc := range []struct { + desc string + fn func(int64, string) + unorderedWrites bool + }{ + { + desc: "ordered headblock ordered writes", + fn: headBlockFn(), + }, + { + desc: "unordered headblock ordered writes", + fn: unorderedHeadBlockFn(), + }, + { + desc: "unordered headblock unordered writes", + fn: unorderedHeadBlockFn(), + unorderedWrites: true, + }, + } { + // build writes before we start benchmarking so random number generation, etc, + // isn't included in our timing info + writes := make([]entry, 0, nWrites) + rnd := rand.NewSource(0) + for i := 0; i < nWrites; i++ { + if tc.unorderedWrites { + ts := rnd.Int63() + writes = append(writes, entry{ + t: ts, + s: fmt.Sprint("line:", ts), + }) + } else { + writes = append(writes, entry{ + t: int64(i), + s: fmt.Sprint("line:", i), + }) + } + } + + b.Run(tc.desc, func(b *testing.B) { + for n := 0; n < b.N; n++ { + for _, w := range writes { + tc.fn(w.t, w.s) + } + } + }) + } +} From 7f1c1d5436b6feda65498639e772798ea12657e2 Mon Sep 17 00:00:00 2001 From: Owen Diehl Date: Thu, 24 Jun 2021 17:08:05 -0400 Subject: [PATCH 08/16] fixes bench --- pkg/chunkenc/unordered_test.go | 11 ++++++----- 1 file changed, 6 insertions(+), 5 deletions(-) diff --git a/pkg/chunkenc/unordered_test.go b/pkg/chunkenc/unordered_test.go index 318465b5e236..d646dd838e53 100644 --- a/pkg/chunkenc/unordered_test.go +++ b/pkg/chunkenc/unordered_test.go @@ -268,20 +268,20 @@ func BenchmarkHeadBlockWrites(b *testing.B) { for _, tc := range []struct { desc string - fn func(int64, string) + fn func() func(int64, string) unorderedWrites bool }{ { desc: "ordered headblock ordered writes", - fn: headBlockFn(), + fn: headBlockFn, }, { desc: "unordered headblock ordered writes", - fn: unorderedHeadBlockFn(), + fn: unorderedHeadBlockFn, }, { desc: "unordered headblock unordered writes", - fn: unorderedHeadBlockFn(), + fn: unorderedHeadBlockFn, 
unorderedWrites: true, }, } { @@ -306,8 +306,9 @@ func BenchmarkHeadBlockWrites(b *testing.B) { b.Run(tc.desc, func(b *testing.B) { for n := 0; n < b.N; n++ { + writeFn := tc.fn() for _, w := range writes { - tc.fn(w.t, w.s) + writeFn(w.t, w.s) } } }) From 224425b16734b1d64c64640ffaa7087212ad54b0 Mon Sep 17 00:00:00 2001 From: Owen Diehl Date: Wed, 7 Jul 2021 08:53:14 -0400 Subject: [PATCH 09/16] removes unused initializer --- pkg/chunkenc/unordered.go | 10 ---------- pkg/chunkenc/unordered_test.go | 2 -- 2 files changed, 12 deletions(-) diff --git a/pkg/chunkenc/unordered.go b/pkg/chunkenc/unordered.go index a9fe8be292ed..24e867b4cf45 100644 --- a/pkg/chunkenc/unordered.go +++ b/pkg/chunkenc/unordered.go @@ -105,15 +105,9 @@ func (hb *unorderedHeadBlock) forEntries( direction logproto.Direction, mint, maxt int64, - initializer func(numEntries int), entryFn func(int64, string) error, // returning an error exits early ) (err error) { if hb.isEmpty() || (maxt < hb.mint || hb.maxt < mint) { - - if initializer != nil { - initializer(0) - } - return } @@ -185,7 +179,6 @@ func (hb *unorderedHeadBlock) iterator( direction, mint, maxt, - nil, func(ts int64, line string) error { newLine, parsedLbs, ok := pipeline.ProcessString(line) if !ok { @@ -233,7 +226,6 @@ func (hb *unorderedHeadBlock) sampleIterator( logproto.FORWARD, mint, maxt, - nil, func(ts int64, line string) error { value, parsedLabels, ok := extractor.ProcessString(line) if !ok { @@ -292,7 +284,6 @@ func (hb *unorderedHeadBlock) serialise(pool WriterPool) ([]byte, error) { logproto.FORWARD, 0, math.MaxInt64, - nil, func(ts int64, line string) error { n := binary.PutVarint(encBuf, ts) inBuf.Write(encBuf[:n]) @@ -361,7 +352,6 @@ func (hb *unorderedHeadBlock) CheckpointTo(version byte, w io.Writer) error { logproto.FORWARD, 0, math.MaxInt64, - nil, func(ts int64, line string) error { eb.putVarint64(ts) eb.putUvarint(len(line)) diff --git a/pkg/chunkenc/unordered_test.go b/pkg/chunkenc/unordered_test.go index 
d646dd838e53..f96fbd88a656 100644 --- a/pkg/chunkenc/unordered_test.go +++ b/pkg/chunkenc/unordered_test.go @@ -40,7 +40,6 @@ func Test_forEntriesEarlyReturn(t *testing.T) { logproto.FORWARD, 0, math.MaxInt64, - nil, func(ts int64, line string) error { forwardCt++ forwardStop = ts @@ -62,7 +61,6 @@ func Test_forEntriesEarlyReturn(t *testing.T) { logproto.BACKWARD, 0, math.MaxInt64, - nil, func(ts int64, line string) error { backwardCt++ backwardStop = ts From 79d2ab7013a2e5e96f4ad17ef1f9364c688934c8 Mon Sep 17 00:00:00 2001 From: Owen Diehl Date: Wed, 7 Jul 2021 09:56:06 -0400 Subject: [PATCH 10/16] gofmt --- pkg/chunkenc/unordered.go | 5 +++-- pkg/chunkenc/unordered_test.go | 3 ++- 2 files changed, 5 insertions(+), 3 deletions(-) diff --git a/pkg/chunkenc/unordered.go b/pkg/chunkenc/unordered.go index 24e867b4cf45..4db500f1fac9 100644 --- a/pkg/chunkenc/unordered.go +++ b/pkg/chunkenc/unordered.go @@ -11,12 +11,13 @@ import ( "github.com/Workiva/go-datastructures/rangetree" "github.com/cespare/xxhash/v2" + "github.com/pkg/errors" + "github.com/prometheus/prometheus/pkg/labels" + "github.com/grafana/loki/pkg/iter" "github.com/grafana/loki/pkg/logproto" "github.com/grafana/loki/pkg/logql/log" "github.com/grafana/loki/pkg/logqlmodel/stats" - "github.com/pkg/errors" - "github.com/prometheus/prometheus/pkg/labels" ) var ( diff --git a/pkg/chunkenc/unordered_test.go b/pkg/chunkenc/unordered_test.go index f96fbd88a656..8f0a8affacb0 100644 --- a/pkg/chunkenc/unordered_test.go +++ b/pkg/chunkenc/unordered_test.go @@ -9,9 +9,10 @@ import ( "testing" "time" + "github.com/stretchr/testify/require" + "github.com/grafana/loki/pkg/iter" "github.com/grafana/loki/pkg/logproto" - "github.com/stretchr/testify/require" ) func iterEq(t *testing.T, exp []entry, got iter.EntryIterator, dir logproto.Direction) { From c7d3d78633dfa87a44bca6565f4399ae004dbd5b Mon Sep 17 00:00:00 2001 From: Owen Diehl Date: Wed, 7 Jul 2021 10:12:21 -0400 Subject: [PATCH 11/16] linting --- 
pkg/chunkenc/memchunk.go | 2 +- pkg/chunkenc/unordered.go | 2 ++ pkg/chunkenc/unordered_test.go | 6 +++--- 3 files changed, 6 insertions(+), 4 deletions(-) diff --git a/pkg/chunkenc/memchunk.go b/pkg/chunkenc/memchunk.go index 7bfe65658140..7eaf01d5656c 100644 --- a/pkg/chunkenc/memchunk.go +++ b/pkg/chunkenc/memchunk.go @@ -352,7 +352,7 @@ func NewByteChunk(b []byte, blockSize, targetSize int) (*MemChunk, error) { // Verify checksums. expCRC := binary.BigEndian.Uint32(b[blk.offset+l:]) if expCRC != crc32.Checksum(blk.b, castagnoliTable) { - level.Error(util_log.Logger).Log("msg", "Checksum does not match for a block in chunk, this block will be skipped", "err", ErrInvalidChecksum) + _ = level.Error(util_log.Logger).Log("msg", "Checksum does not match for a block in chunk, this block will be skipped", "err", ErrInvalidChecksum) continue } diff --git a/pkg/chunkenc/unordered.go b/pkg/chunkenc/unordered.go index 4db500f1fac9..0732a07273fa 100644 --- a/pkg/chunkenc/unordered.go +++ b/pkg/chunkenc/unordered.go @@ -213,6 +213,7 @@ func (hb *unorderedHeadBlock) iterator( return iter.NewStreamsIterator(ctx, streamsResult, direction) } +// nolint:unused func (hb *unorderedHeadBlock) sampleIterator( ctx context.Context, mint, @@ -267,6 +268,7 @@ func (hb *unorderedHeadBlock) sampleIterator( return iter.NewMultiSeriesIterator(ctx, seriesRes) } +// nolint:unused // serialise is used in creating an ordered, compressed block from an unorderedHeadBlock func (hb *unorderedHeadBlock) serialise(pool WriterPool) ([]byte, error) { inBuf := serializeBytesBufferPool.Get().(*bytes.Buffer) diff --git a/pkg/chunkenc/unordered_test.go b/pkg/chunkenc/unordered_test.go index 8f0a8affacb0..210c9320851d 100644 --- a/pkg/chunkenc/unordered_test.go +++ b/pkg/chunkenc/unordered_test.go @@ -15,7 +15,7 @@ import ( "github.com/grafana/loki/pkg/logproto" ) -func iterEq(t *testing.T, exp []entry, got iter.EntryIterator, dir logproto.Direction) { +func iterEq(t *testing.T, exp []entry, got 
iter.EntryIterator) { var i int for got.Next() { require.Equal(t, logproto.Entry{ @@ -154,7 +154,7 @@ func Test_Unordered_InsertRetrieval(t *testing.T) { noopStreamPipeline, ) - iterEq(t, tc.exp, itr, tc.dir) + iterEq(t, tc.exp, itr) }) } } @@ -216,7 +216,7 @@ func Test_UnorderedBoundedIter(t *testing.T) { noopStreamPipeline, ) - iterEq(t, tc.exp, itr, tc.dir) + iterEq(t, tc.exp, itr) }) } } From 60b6fe523ddab31aad16952b5e83683e47b3f6bf Mon Sep 17 00:00:00 2001 From: Owen Diehl Date: Wed, 7 Jul 2021 16:39:11 -0400 Subject: [PATCH 12/16] introduces an entry counter into streams & wal records --- pkg/ingester/checkpoint.go | 1 + pkg/ingester/checkpoint.pb.go | 109 +++++++++++++++++-------- pkg/ingester/checkpoint.proto | 4 + pkg/ingester/encoding.go | 37 ++++++--- pkg/ingester/encoding_test.go | 148 +++++++++++++++++++++++++++------- pkg/ingester/recovery_test.go | 2 +- pkg/ingester/stream.go | 10 ++- pkg/ingester/wal.go | 2 +- 8 files changed, 240 insertions(+), 73 deletions(-) diff --git a/pkg/ingester/checkpoint.go b/pkg/ingester/checkpoint.go index 05a3b8e34ca3..a0cbbda85097 100644 --- a/pkg/ingester/checkpoint.go +++ b/pkg/ingester/checkpoint.go @@ -272,6 +272,7 @@ func (s *streamIterator) Next() bool { s.current.To = stream.lastLine.ts s.current.LastLine = stream.lastLine.content + s.current.EntryCt = stream.entryCt return true } diff --git a/pkg/ingester/checkpoint.pb.go b/pkg/ingester/checkpoint.pb.go index 4d8c8f7ac5b6..bcb0871cee87 100644 --- a/pkg/ingester/checkpoint.pb.go +++ b/pkg/ingester/checkpoint.pb.go @@ -144,6 +144,9 @@ type Series struct { // Last timestamp of the last chunk. To time.Time `protobuf:"bytes,5,opt,name=to,proto3,stdtime" json:"to"` LastLine string `protobuf:"bytes,6,opt,name=lastLine,proto3" json:"lastLine,omitempty"` + // highest counter value for pushes to this stream. + // Used to skip already applied entries during WAL replay. 
+ EntryCt int64 `protobuf:"varint,7,opt,name=entryCt,proto3" json:"entryCt,omitempty"` } func (m *Series) Reset() { *m = Series{} } @@ -213,6 +216,13 @@ func (m *Series) GetLastLine() string { return "" } +func (m *Series) GetEntryCt() int64 { + if m != nil { + return m.EntryCt + } + return 0 +} + func init() { proto.RegisterType((*Chunk)(nil), "loki_ingester.Chunk") proto.RegisterType((*Series)(nil), "loki_ingester.Series") @@ -221,38 +231,39 @@ func init() { func init() { proto.RegisterFile("pkg/ingester/checkpoint.proto", fileDescriptor_00f4b7152db9bdb5) } var fileDescriptor_00f4b7152db9bdb5 = []byte{ - // 492 bytes of a gzipped FileDescriptorProto - 0x1f, 0x8b, 0x08, 0x00, 0x00, 0x00, 0x00, 0x00, 0x02, 0xff, 0x94, 0x52, 0xbd, 0x8e, 0xd3, 0x4c, - 0x14, 0xf5, 0x24, 0x8e, 0x3f, 0x67, 0xf2, 0xd1, 0x0c, 0x08, 0x8d, 0x22, 0x31, 0xb1, 0xb6, 0x4a, - 0x83, 0x2d, 0x05, 0x0a, 0x68, 0x90, 0x62, 0x10, 0x12, 0xd2, 0x16, 0xc8, 0x40, 0x43, 0x83, 0xfc, - 0x33, 0xb1, 0x4d, 0x1c, 0x8f, 0x35, 0x33, 0x96, 0xa0, 0xe3, 0x11, 0xf6, 0x31, 0x78, 0x04, 0x1e, - 0x61, 0xcb, 0x94, 0x2b, 0x90, 0x16, 0xe2, 0x34, 0x94, 0xfb, 0x08, 0x68, 0xc6, 0x36, 0x1b, 0x4a, - 0x77, 0xf7, 0x9c, 0x7b, 0x8f, 0xcf, 0xf5, 0x9d, 0x03, 0x1f, 0x54, 0xdb, 0xd4, 0xcb, 0xcb, 0x94, - 0x0a, 0x49, 0xb9, 0x17, 0x67, 0x34, 0xde, 0x56, 0x2c, 0x2f, 0xa5, 0x5b, 0x71, 0x26, 0x19, 0xba, - 0x53, 0xb0, 0x6d, 0xfe, 0xa1, 0xef, 0xcf, 0x17, 0x29, 0x63, 0x69, 0x41, 0x3d, 0xdd, 0x8c, 0xea, - 0x8d, 0x27, 0xf3, 0x1d, 0x15, 0x32, 0xdc, 0x55, 0xed, 0xfc, 0xfc, 0x61, 0x9a, 0xcb, 0xac, 0x8e, - 0xdc, 0x98, 0xed, 0xbc, 0x94, 0xa5, 0xec, 0x76, 0x52, 0x21, 0x0d, 0x74, 0xd5, 0x8d, 0x3f, 0x3d, - 0x19, 0x8f, 0x19, 0x97, 0xf4, 0x53, 0xc5, 0xd9, 0x47, 0x1a, 0xcb, 0x0e, 0x79, 0x6a, 0xbb, 0xae, - 0x11, 0x75, 0x45, 0x2b, 0x3d, 0xfb, 0x31, 0x82, 0x93, 0xe7, 0x59, 0x5d, 0x6e, 0xd1, 0x13, 0x68, - 0x6e, 0x38, 0xdb, 0x61, 0xe0, 0x80, 0xe5, 0x6c, 0x35, 0x77, 0xdb, 0x1d, 0xdd, 0xde, 0xd9, 0x7d, - 0xdb, 0xef, 0xe8, 0xdb, 0x97, 0xd7, 0x0b, 0xe3, 0xe2, 0xe7, 
0x02, 0x04, 0x5a, 0x81, 0x1e, 0xc3, - 0x91, 0x64, 0x78, 0x34, 0x40, 0x37, 0x92, 0x0c, 0xf9, 0x70, 0xba, 0x29, 0x6a, 0x91, 0xd1, 0x64, - 0x2d, 0xf1, 0x78, 0x80, 0xf8, 0x56, 0x86, 0x5e, 0xc2, 0x59, 0x11, 0x0a, 0xf9, 0xae, 0x4a, 0x42, - 0x49, 0x13, 0x6c, 0x0e, 0xf8, 0xca, 0xa9, 0x10, 0xdd, 0x87, 0x56, 0x5c, 0x30, 0x41, 0x13, 0x3c, - 0x71, 0xc0, 0xd2, 0x0e, 0x3a, 0xa4, 0x78, 0xf1, 0xb9, 0x8c, 0x69, 0x82, 0xad, 0x96, 0x6f, 0x11, - 0x42, 0xd0, 0x4c, 0x42, 0x19, 0xe2, 0xff, 0x1c, 0xb0, 0xfc, 0x3f, 0xd0, 0xb5, 0xe2, 0x32, 0x1a, - 0x26, 0xd8, 0x6e, 0x39, 0x55, 0x9f, 0x7d, 0x1b, 0x41, 0xeb, 0x0d, 0xe5, 0x39, 0x15, 0xea, 0x53, - 0xb5, 0xa0, 0xfc, 0xd5, 0x0b, 0x7d, 0xe0, 0x69, 0xd0, 0x21, 0xe4, 0xc0, 0xd9, 0x46, 0x05, 0x83, - 0x57, 0x3c, 0x2f, 0xa5, 0xbe, 0xa2, 0x19, 0x9c, 0x52, 0xa8, 0x84, 0x56, 0x11, 0x46, 0xb4, 0x10, - 0x78, 0xec, 0x8c, 0x97, 0xb3, 0xd5, 0x5d, 0xb7, 0x7f, 0x4a, 0xf7, 0x5c, 0xf1, 0xaf, 0xc3, 0x9c, - 0xfb, 0x6b, 0xf5, 0x63, 0xdf, 0xaf, 0x17, 0x83, 0xa2, 0xd0, 0xea, 0xd7, 0x49, 0x58, 0x49, 0xca, - 0x83, 0xce, 0x05, 0xad, 0xa0, 0x15, 0xab, 0x44, 0x08, 0x6c, 0x6a, 0xbf, 0x7b, 0xee, 0x3f, 0xe9, - 0x75, 0x75, 0x5c, 0x7c, 0x53, 0x19, 0x06, 0xdd, 0x64, 0x17, 0x81, 0xc9, 0xc0, 0x08, 0xcc, 0xa1, - 0xad, 0x5e, 0xe1, 0x3c, 0x2f, 0xa9, 0x3e, 0xf0, 0x34, 0xf8, 0x8b, 0xfd, 0x67, 0xfb, 0x03, 0x31, - 0xae, 0x0e, 0xc4, 0xb8, 0x39, 0x10, 0xf0, 0xa5, 0x21, 0xe0, 0x6b, 0x43, 0xc0, 0x65, 0x43, 0xc0, - 0xbe, 0x21, 0xe0, 0x57, 0x43, 0xc0, 0xef, 0x86, 0x18, 0x37, 0x0d, 0x01, 0x17, 0x47, 0x62, 0xec, - 0x8f, 0xc4, 0xb8, 0x3a, 0x12, 0xe3, 0xbd, 0xdd, 0x6f, 0x19, 0x59, 0xda, 0xfd, 0xd1, 0x9f, 0x00, - 0x00, 0x00, 0xff, 0xff, 0xae, 0x13, 0x93, 0xc4, 0x9a, 0x03, 0x00, 0x00, + // 503 bytes of a gzipped FileDescriptorProto + 0x1f, 0x8b, 0x08, 0x00, 0x00, 0x00, 0x00, 0x00, 0x02, 0xff, 0x94, 0x52, 0x3d, 0x8f, 0xd3, 0x40, + 0x10, 0xf5, 0x26, 0x8e, 0x2f, 0xd9, 0x40, 0xb3, 0x20, 0xb4, 0x8a, 0xc4, 0xc6, 0xba, 0x2a, 0x0d, + 0xb6, 0x14, 0x28, 0xa0, 0x41, 0x8a, 0x0f, 0x21, 0x21, 0x5d, 0x81, 0x0c, 
0x34, 0x34, 0xc8, 0x1f, + 0x1b, 0xdb, 0xc4, 0xf1, 0x5a, 0xbb, 0x6b, 0x89, 0xeb, 0xf8, 0x09, 0xf7, 0x33, 0xf8, 0x29, 0x57, + 0x46, 0x54, 0x27, 0x90, 0x0e, 0xe2, 0x34, 0x94, 0xf7, 0x13, 0xd0, 0xae, 0x6d, 0x2e, 0x94, 0xee, + 0xe6, 0xbd, 0x99, 0xe7, 0x79, 0x9e, 0x7d, 0xf0, 0x71, 0xb9, 0x49, 0xdc, 0xac, 0x48, 0xa8, 0x90, + 0x94, 0xbb, 0x51, 0x4a, 0xa3, 0x4d, 0xc9, 0xb2, 0x42, 0x3a, 0x25, 0x67, 0x92, 0xa1, 0xfb, 0x39, + 0xdb, 0x64, 0x9f, 0xba, 0xfe, 0x6c, 0x9e, 0x30, 0x96, 0xe4, 0xd4, 0xd5, 0xcd, 0xb0, 0x5a, 0xbb, + 0x32, 0xdb, 0x52, 0x21, 0x83, 0x6d, 0xd9, 0xcc, 0xcf, 0x9e, 0x24, 0x99, 0x4c, 0xab, 0xd0, 0x89, + 0xd8, 0xd6, 0x4d, 0x58, 0xc2, 0xee, 0x26, 0x15, 0xd2, 0x40, 0x57, 0xed, 0xf8, 0x8b, 0xa3, 0xf1, + 0x88, 0x71, 0x49, 0xbf, 0x94, 0x9c, 0x7d, 0xa6, 0x91, 0x6c, 0x91, 0xab, 0xdc, 0xb5, 0x8d, 0xb0, + 0x2d, 0x1a, 0xe9, 0xe9, 0xcf, 0x01, 0x1c, 0x9d, 0xa5, 0x55, 0xb1, 0x41, 0xcf, 0xa1, 0xb9, 0xe6, + 0x6c, 0x8b, 0x81, 0x0d, 0x16, 0xd3, 0xe5, 0xcc, 0x69, 0x3c, 0x3a, 0xdd, 0x66, 0xe7, 0x7d, 0xe7, + 0xd1, 0x1b, 0x5f, 0xdd, 0xcc, 0x8d, 0xcb, 0x5f, 0x73, 0xe0, 0x6b, 0x05, 0x7a, 0x06, 0x07, 0x92, + 0xe1, 0x41, 0x0f, 0xdd, 0x40, 0x32, 0xe4, 0xc1, 0xc9, 0x3a, 0xaf, 0x44, 0x4a, 0xe3, 0x95, 0xc4, + 0xc3, 0x1e, 0xe2, 0x3b, 0x19, 0x7a, 0x0d, 0xa7, 0x79, 0x20, 0xe4, 0x87, 0x32, 0x0e, 0x24, 0x8d, + 0xb1, 0xd9, 0xe3, 0x2b, 0xc7, 0x42, 0xf4, 0x08, 0x5a, 0x51, 0xce, 0x04, 0x8d, 0xf1, 0xc8, 0x06, + 0x8b, 0xb1, 0xdf, 0x22, 0xc5, 0x8b, 0x8b, 0x22, 0xa2, 0x31, 0xb6, 0x1a, 0xbe, 0x41, 0x08, 0x41, + 0x33, 0x0e, 0x64, 0x80, 0x4f, 0x6c, 0xb0, 0xb8, 0xe7, 0xeb, 0x5a, 0x71, 0x29, 0x0d, 0x62, 0x3c, + 0x6e, 0x38, 0x55, 0x9f, 0x7e, 0x1f, 0x40, 0xeb, 0x1d, 0xe5, 0x19, 0x15, 0xea, 0x53, 0x95, 0xa0, + 0xfc, 0xcd, 0x2b, 0x7d, 0xe0, 0x89, 0xdf, 0x22, 0x64, 0xc3, 0xe9, 0x5a, 0x05, 0x83, 0x97, 0x3c, + 0x2b, 0xa4, 0xbe, 0xa2, 0xe9, 0x1f, 0x53, 0xa8, 0x80, 0x56, 0x1e, 0x84, 0x34, 0x17, 0x78, 0x68, + 0x0f, 0x17, 0xd3, 0xe5, 0x03, 0xa7, 0x7b, 0x4a, 0xe7, 0x5c, 0xf1, 0x6f, 0x83, 0x8c, 0x7b, 0x2b, + 0xf5, 0x63, 
0x3f, 0x6e, 0xe6, 0xbd, 0xa2, 0xd0, 0xe8, 0x57, 0x71, 0x50, 0x4a, 0xca, 0xfd, 0x76, + 0x0b, 0x5a, 0x42, 0x2b, 0x52, 0x89, 0x10, 0xd8, 0xd4, 0xfb, 0x1e, 0x3a, 0xff, 0xa5, 0xd7, 0xd1, + 0x71, 0xf1, 0x4c, 0xb5, 0xd0, 0x6f, 0x27, 0xdb, 0x08, 0x8c, 0x7a, 0x46, 0x60, 0x06, 0xc7, 0xea, + 0x15, 0xce, 0xb3, 0x82, 0xea, 0x03, 0x4f, 0xfc, 0x7f, 0x18, 0x61, 0x78, 0x42, 0x0b, 0xc9, 0x2f, + 0xce, 0xa4, 0xbe, 0xf2, 0xd0, 0xef, 0xa0, 0xf7, 0x72, 0xb7, 0x27, 0xc6, 0xf5, 0x9e, 0x18, 0xb7, + 0x7b, 0x02, 0xbe, 0xd6, 0x04, 0x7c, 0xab, 0x09, 0xb8, 0xaa, 0x09, 0xd8, 0xd5, 0x04, 0xfc, 0xae, + 0x09, 0xf8, 0x53, 0x13, 0xe3, 0xb6, 0x26, 0xe0, 0xf2, 0x40, 0x8c, 0xdd, 0x81, 0x18, 0xd7, 0x07, + 0x62, 0x7c, 0x1c, 0x77, 0xfe, 0x43, 0x4b, 0xfb, 0x7a, 0xfa, 0x37, 0x00, 0x00, 0xff, 0xff, 0x70, + 0x87, 0xe1, 0x9b, 0xb4, 0x03, 0x00, 0x00, } func (this *Chunk) Equal(that interface{}) bool { @@ -347,6 +358,9 @@ func (this *Series) Equal(that interface{}) bool { if this.LastLine != that1.LastLine { return false } + if this.EntryCt != that1.EntryCt { + return false + } return true } func (this *Chunk) GoString() string { @@ -370,7 +384,7 @@ func (this *Series) GoString() string { if this == nil { return "nil" } - s := make([]string, 0, 10) + s := make([]string, 0, 11) s = append(s, "&ingester.Series{") s = append(s, "UserID: "+fmt.Sprintf("%#v", this.UserID)+",\n") s = append(s, "Fingerprint: "+fmt.Sprintf("%#v", this.Fingerprint)+",\n") @@ -384,6 +398,7 @@ func (this *Series) GoString() string { } s = append(s, "To: "+fmt.Sprintf("%#v", this.To)+",\n") s = append(s, "LastLine: "+fmt.Sprintf("%#v", this.LastLine)+",\n") + s = append(s, "EntryCt: "+fmt.Sprintf("%#v", this.EntryCt)+",\n") s = append(s, "}") return strings.Join(s, "") } @@ -541,6 +556,11 @@ func (m *Series) MarshalTo(dAtA []byte) (int, error) { i = encodeVarintCheckpoint(dAtA, i, uint64(len(m.LastLine))) i += copy(dAtA[i:], m.LastLine) } + if m.EntryCt != 0 { + dAtA[i] = 0x38 + i++ + i = encodeVarintCheckpoint(dAtA, i, uint64(m.EntryCt)) + } 
return i, nil } @@ -615,6 +635,9 @@ func (m *Series) Size() (n int) { if l > 0 { n += 1 + l + sovCheckpoint(uint64(l)) } + if m.EntryCt != 0 { + n += 1 + sovCheckpoint(uint64(m.EntryCt)) + } return n } @@ -659,6 +682,7 @@ func (this *Series) String() string { `Chunks:` + strings.Replace(strings.Replace(fmt.Sprintf("%v", this.Chunks), "Chunk", "Chunk", 1), `&`, ``, 1) + `,`, `To:` + strings.Replace(strings.Replace(this.To.String(), "Timestamp", "types.Timestamp", 1), `&`, ``, 1) + `,`, `LastLine:` + fmt.Sprintf("%v", this.LastLine) + `,`, + `EntryCt:` + fmt.Sprintf("%v", this.EntryCt) + `,`, `}`, }, "") return s @@ -1177,6 +1201,25 @@ func (m *Series) Unmarshal(dAtA []byte) error { } m.LastLine = string(dAtA[iNdEx:postIndex]) iNdEx = postIndex + case 7: + if wireType != 0 { + return fmt.Errorf("proto: wrong wireType = %d for field EntryCt", wireType) + } + m.EntryCt = 0 + for shift := uint(0); ; shift += 7 { + if shift >= 64 { + return ErrIntOverflowCheckpoint + } + if iNdEx >= l { + return io.ErrUnexpectedEOF + } + b := dAtA[iNdEx] + iNdEx++ + m.EntryCt |= int64(b&0x7F) << shift + if b < 0x80 { + break + } + } default: iNdEx = preIndex skippy, err := skipCheckpoint(dAtA[iNdEx:]) diff --git a/pkg/ingester/checkpoint.proto b/pkg/ingester/checkpoint.proto index 60a9403428fd..5e3e9104ea3f 100644 --- a/pkg/ingester/checkpoint.proto +++ b/pkg/ingester/checkpoint.proto @@ -33,4 +33,8 @@ message Series { // Last timestamp of the last chunk. google.protobuf.Timestamp to = 5 [(gogoproto.stdtime) = true, (gogoproto.nullable) = false]; string lastLine = 6; + // highest counter value for pushes to this stream. + // Used to skip already applied entries during WAL replay. + int64 entryCt = 7; + } diff --git a/pkg/ingester/encoding.go b/pkg/ingester/encoding.go index 9c3e4b991b68..cbbbeb1dbd8a 100644 --- a/pkg/ingester/encoding.go +++ b/pkg/ingester/encoding.go @@ -17,12 +17,19 @@ const ( _ = iota // ignore first value so the zero value doesn't look like a record type. 
// WALRecordSeries is the type for the WAL record for series. WALRecordSeries RecordType = iota - // WALRecordSamples is the type for the WAL record for samples. - WALRecordEntries + // WALRecordEntriesV1 is the type for the WAL record for samples. + WALRecordEntriesV1 // CheckpointRecord is the type for the Checkpoint record based on protos. CheckpointRecord + // WALRecordEntriesV2 is the type for the WAL record for samples with an + // additional counter value for use in replaying without the ordering constraint. + WALRecordEntriesV2 ) +// The current type of Entries that this distribution writes. +// Loki can read in a backwards compatible manner, but will write the newest variant. +const CurrentEntriesRec RecordType = WALRecordEntriesV2 + // WALRecord is a struct combining the series and samples record. type WALRecord struct { UserID string @@ -52,20 +59,23 @@ func (r *WALRecord) Reset() { r.entryIndexMap = make(map[uint64]int) } -func (r *WALRecord) AddEntries(fp uint64, entries ...logproto.Entry) { +func (r *WALRecord) AddEntries(fp uint64, counter int64, entries ...logproto.Entry) { if idx, ok := r.entryIndexMap[fp]; ok { r.RefEntries[idx].Entries = append(r.RefEntries[idx].Entries, entries...) + r.RefEntries[idx].Counter = counter return } r.entryIndexMap[fp] = len(r.RefEntries) r.RefEntries = append(r.RefEntries, RefEntries{ + Counter: counter, Ref: fp, Entries: entries, }) } type RefEntries struct { + Counter int64 Ref uint64 Entries []logproto.Entry } @@ -84,9 +94,9 @@ func (r *WALRecord) encodeSeries(b []byte) []byte { return encoded } -func (r *WALRecord) encodeEntries(b []byte) []byte { +func (r *WALRecord) encodeEntries(version RecordType, b []byte) []byte { buf := EncWith(b) - buf.PutByte(byte(WALRecordEntries)) + buf.PutByte(byte(version)) buf.PutUvarintStr(r.UserID) // Placeholder for the first timestamp of any sample encountered. 
@@ -108,7 +118,12 @@ outer: if len(ref.Entries) < 1 { continue } - buf.PutBE64(ref.Ref) // write fingerprint + buf.PutBE64(ref.Ref) // write fingerprint + + if version >= WALRecordEntriesV2 { + buf.PutBE64int64(ref.Counter) // write highest counter value + } + buf.PutUvarint(len(ref.Entries)) // write number of entries for _, s := range ref.Entries { @@ -120,7 +135,7 @@ outer: return buf.Get() } -func decodeEntries(b []byte, rec *WALRecord) error { +func decodeEntries(b []byte, version RecordType, rec *WALRecord) error { if len(b) == 0 { return nil } @@ -133,6 +148,10 @@ func decodeEntries(b []byte, rec *WALRecord) error { Ref: dec.Be64(), } + if version >= WALRecordEntriesV2 { + refEntries.Counter = dec.Be64int64() + } + nEntries := dec.Uvarint() refEntries.Entries = make([]logproto.Entry, 0, nEntries) rem := nEntries @@ -178,9 +197,9 @@ func decodeWALRecord(b []byte, walRec *WALRecord) (err error) { case WALRecordSeries: userID = decbuf.UvarintStr() rSeries, err = dec.Series(decbuf.B, walRec.Series) - case WALRecordEntries: + case WALRecordEntriesV1, WALRecordEntriesV2: userID = decbuf.UvarintStr() - err = decodeEntries(decbuf.B, walRec) + err = decodeEntries(decbuf.B, t, walRec) default: return errors.New("unknown record type") } diff --git a/pkg/ingester/encoding_test.go b/pkg/ingester/encoding_test.go index ad17a63aeb37..84beb5fe8ebf 100644 --- a/pkg/ingester/encoding_test.go +++ b/pkg/ingester/encoding_test.go @@ -50,47 +50,139 @@ func Test_Encoding_Series(t *testing.T) { require.Equal(t, record, decoded) } +// func Test_Encoding_Entries(t *testing.T) { +// record := &WALRecord{ +// entryIndexMap: make(map[uint64]int), +// UserID: "123", +// RefEntries: []RefEntries{ +// { +// Ref: 456, +// Counter: 1, +// Entries: []logproto.Entry{ +// { +// Timestamp: time.Unix(1000, 0), +// Line: "first", +// }, +// { +// Timestamp: time.Unix(2000, 0), +// Line: "second", +// }, +// }, +// }, +// { +// Ref: 789, +// Counter: 2, +// Entries: []logproto.Entry{ +// { +// 
Timestamp: time.Unix(3000, 0), +// Line: "third", +// }, +// { +// Timestamp: time.Unix(4000, 0), +// Line: "fourth", +// }, +// }, +// }, +// }, +// } + +// buf := record.encodeEntries(nil) + +// decoded := recordPool.GetRecord() + +// err := decodeWALRecord(buf, decoded) +// require.Nil(t, err) +// require.Equal(t, record, decoded) +// } + func Test_Encoding_Entries(t *testing.T) { - record := &WALRecord{ - entryIndexMap: make(map[uint64]int), - UserID: "123", - RefEntries: []RefEntries{ - { - Ref: 456, - Entries: []logproto.Entry{ + for _, tc := range []struct { + desc string + rec *WALRecord + version RecordType + }{ + { + desc: "v1", + rec: &WALRecord{ + entryIndexMap: make(map[uint64]int), + UserID: "123", + RefEntries: []RefEntries{ { - Timestamp: time.Unix(1000, 0), - Line: "first", + Ref: 456, + Entries: []logproto.Entry{ + { + Timestamp: time.Unix(1000, 0), + Line: "first", + }, + { + Timestamp: time.Unix(2000, 0), + Line: "second", + }, + }, }, { - Timestamp: time.Unix(2000, 0), - Line: "second", + Ref: 789, + Entries: []logproto.Entry{ + { + Timestamp: time.Unix(3000, 0), + Line: "third", + }, + { + Timestamp: time.Unix(4000, 0), + Line: "fourth", + }, + }, }, }, }, - { - Ref: 789, - Entries: []logproto.Entry{ + version: WALRecordEntriesV1, + }, + { + desc: "v2", + rec: &WALRecord{ + entryIndexMap: make(map[uint64]int), + UserID: "123", + RefEntries: []RefEntries{ { - Timestamp: time.Unix(3000, 0), - Line: "third", + Ref: 456, + Counter: 1, // v2 uses counter for WAL replay + Entries: []logproto.Entry{ + { + Timestamp: time.Unix(1000, 0), + Line: "first", + }, + { + Timestamp: time.Unix(2000, 0), + Line: "second", + }, + }, }, { - Timestamp: time.Unix(4000, 0), - Line: "fourth", + Ref: 789, + Counter: 2, // v2 uses counter for WAL replay + Entries: []logproto.Entry{ + { + Timestamp: time.Unix(3000, 0), + Line: "third", + }, + { + Timestamp: time.Unix(4000, 0), + Line: "fourth", + }, + }, }, }, }, + version: WALRecordEntriesV2, }, - } - - buf := 
record.encodeEntries(nil) - - decoded := recordPool.GetRecord() + } { + decoded := recordPool.GetRecord() + buf := tc.rec.encodeEntries(tc.version, nil) + err := decodeWALRecord(buf, decoded) + require.Nil(t, err) + require.Equal(t, tc.rec, decoded) - err := decodeWALRecord(buf, decoded) - require.Nil(t, err) - require.Equal(t, record, decoded) + } } func Benchmark_EncodeEntries(b *testing.B) { @@ -121,7 +213,7 @@ func Benchmark_EncodeEntries(b *testing.B) { defer recordPool.PutBytes(buf) for n := 0; n < b.N; n++ { - record.encodeEntries(buf) + record.encodeEntries(CurrentEntriesRec, buf) } } @@ -148,7 +240,7 @@ func Benchmark_DecodeWAL(b *testing.B) { }, } - buf := record.encodeEntries(nil) + buf := record.encodeEntries(CurrentEntriesRec, nil) rec := recordPool.GetRecord() b.ReportAllocs() b.ResetTimer() diff --git a/pkg/ingester/recovery_test.go b/pkg/ingester/recovery_test.go index 8448294c5684..b93050851aa5 100644 --- a/pkg/ingester/recovery_test.go +++ b/pkg/ingester/recovery_test.go @@ -90,7 +90,7 @@ func buildMemoryReader(users, totalStreams, entriesPerStream int) (*MemoryWALRea } if len(rec.RefEntries) > 0 { - reader.xs = append(reader.xs, rec.encodeEntries(nil)) + reader.xs = append(reader.xs, rec.encodeEntries(CurrentEntriesRec, nil)) } } diff --git a/pkg/ingester/stream.go b/pkg/ingester/stream.go index 12d7e01cc71b..65cf9aa75409 100644 --- a/pkg/ingester/stream.go +++ b/pkg/ingester/stream.go @@ -72,6 +72,13 @@ type stream struct { tailers map[uint32]*tailer tailerMtx sync.RWMutex + + // entryCt is a counter which is incremented on each accepted entry. + // This allows us to discard WAL entries during replays which were + // already recovered via checkpoints. Historically out of order + // errors were used to detect this, but this counter has been + // introduced to facilitate removing the ordering constraint. 
+ entryCt int64 } type chunkDesc struct { @@ -201,6 +208,7 @@ func (s *stream) Push( lastChunkTimestamp = entries[i].Timestamp s.lastLine.ts = lastChunkTimestamp s.lastLine.content = entries[i].Line + s.entryCt++ // length of string plus bytesAdded += len(entries[i].Line) @@ -211,7 +219,7 @@ func (s *stream) Push( if len(storedEntries) != 0 { // record will be nil when replaying the wal (we don't want to rewrite wal entries as we replay them). if record != nil { - record.AddEntries(uint64(s.fp), storedEntries...) + record.AddEntries(uint64(s.fp), s.entryCt, storedEntries...) } else { // If record is nil, this is a WAL recovery. s.metrics.recoveredEntriesTotal.Add(float64(len(storedEntries))) diff --git a/pkg/ingester/wal.go b/pkg/ingester/wal.go index b1d0ada37678..86a7fa92abdc 100644 --- a/pkg/ingester/wal.go +++ b/pkg/ingester/wal.go @@ -126,7 +126,7 @@ func (w *walWrapper) Log(record *WALRecord) error { buf = buf[:0] } if len(record.RefEntries) > 0 { - buf = record.encodeEntries(buf) + buf = record.encodeEntries(CurrentEntriesRec, buf) if err := w.wal.Log(buf); err != nil { return err } From 376ec1eff39e3dab0ea94baba64905d2908e9a43 Mon Sep 17 00:00:00 2001 From: Owen Diehl Date: Wed, 7 Jul 2021 17:16:32 -0400 Subject: [PATCH 13/16] sets & resets entrycount on checkpoint recovery and after wal replay finishes --- pkg/ingester/recovery.go | 7 +++++++ pkg/ingester/stream.go | 4 ++++ 2 files changed, 11 insertions(+) diff --git a/pkg/ingester/recovery.go b/pkg/ingester/recovery.go index e81dbafae8bd..9987a900e910 100644 --- a/pkg/ingester/recovery.go +++ b/pkg/ingester/recovery.go @@ -125,6 +125,7 @@ func (r *ingesterRecoverer) Series(series *Series) error { bytesAdded, entriesAdded, err := stream.setChunks(series.Chunks) stream.lastLine.ts = series.To stream.lastLine.content = series.LastLine + stream.entryCt = series.EntryCt if err != nil { return err @@ -197,6 +198,12 @@ func (r *ingesterRecoverer) Push(userID string, entries RefEntries) error { } func (r 
*ingesterRecoverer) Close() { + // reset all the incrementing stream counters after a successful WAL replay. + for _, inst := range r.ing.getInstances() { + inst.forAllStreams(context.Background(), func(s *stream) error { + s.resetCounter() + }) + } close(r.done) } diff --git a/pkg/ingester/stream.go b/pkg/ingester/stream.go index 65cf9aa75409..8db660e6eb99 100644 --- a/pkg/ingester/stream.go +++ b/pkg/ingester/stream.go @@ -376,3 +376,7 @@ func (s *stream) addTailer(t *tailer) { s.tailers[t.getID()] = t } + +func (s *stream) resetCounter() { + s.entryCt = 0 +} From 5e724831c7aa68b6b9febaf3504171c8a50874a7 Mon Sep 17 00:00:00 2001 From: Owen Diehl Date: Wed, 7 Jul 2021 18:12:13 -0400 Subject: [PATCH 14/16] pushing to streams checks counter --- pkg/ingester/instance.go | 2 +- pkg/ingester/metrics.go | 5 +++++ pkg/ingester/recovery.go | 6 +++++- pkg/ingester/stream.go | 16 ++++++++++++++++ pkg/ingester/stream_test.go | 8 ++++---- 5 files changed, 31 insertions(+), 6 deletions(-) diff --git a/pkg/ingester/instance.go b/pkg/ingester/instance.go index 1d3f3851f31f..cb45a851c2f0 100644 --- a/pkg/ingester/instance.go +++ b/pkg/ingester/instance.go @@ -164,7 +164,7 @@ func (i *instance) Push(ctx context.Context, req *logproto.PushRequest) error { continue } - if _, err := stream.Push(ctx, s.Entries, record); err != nil { + if _, err := stream.Push(ctx, s.Entries, record, 0); err != nil { appendErr = err continue } diff --git a/pkg/ingester/metrics.go b/pkg/ingester/metrics.go index d685d02dacc9..95164c1b7ada 100644 --- a/pkg/ingester/metrics.go +++ b/pkg/ingester/metrics.go @@ -22,6 +22,7 @@ type ingesterMetrics struct { recoveredStreamsTotal prometheus.Counter recoveredChunksTotal prometheus.Counter recoveredEntriesTotal prometheus.Counter + duplicateEntriesTotal prometheus.Counter recoveredBytesTotal prometheus.Counter recoveryBytesInUse prometheus.Gauge recoveryIsFlushing prometheus.Gauge @@ -100,6 +101,10 @@ func newIngesterMetrics(r prometheus.Registerer) 
*ingesterMetrics { Name: "loki_ingester_wal_recovered_entries_total", Help: "Total number of entries recovered from the WAL.", }), + duplicateEntriesTotal: promauto.With(r).NewCounter(prometheus.CounterOpts{ + Name: "loki_ingester_wal_duplicate_entries_total", + Help: "Entries discarded during WAL replay due to existing in checkpoints.", + }), recoveredBytesTotal: promauto.With(r).NewCounter(prometheus.CounterOpts{ Name: "loki_ingester_wal_recovered_bytes_total", Help: "Total number of bytes recovered from the WAL.", diff --git a/pkg/ingester/recovery.go b/pkg/ingester/recovery.go index 9987a900e910..908bffb312f0 100644 --- a/pkg/ingester/recovery.go +++ b/pkg/ingester/recovery.go @@ -191,8 +191,11 @@ func (r *ingesterRecoverer) Push(userID string, entries RefEntries) error { } // ignore out of order errors here (it's possible for a checkpoint to already have data from the wal segments) - bytesAdded, _ := s.(*stream).Push(context.Background(), entries.Entries, nil) + bytesAdded, err := s.(*stream).Push(context.Background(), entries.Entries, nil, entries.Counter) r.ing.replayController.Add(int64(bytesAdded)) + if err != nil && err == ErrEntriesExist { + r.ing.metrics.duplicateEntriesTotal.Add(float64(len(entries.Entries))) + } return nil }) } @@ -202,6 +205,7 @@ func (r *ingesterRecoverer) Close() { for _, inst := range r.ing.getInstances() { inst.forAllStreams(context.Background(), func(s *stream) error { s.resetCounter() + return nil }) } close(r.done) diff --git a/pkg/ingester/stream.go b/pkg/ingester/stream.go index 8db660e6eb99..c00dac70babf 100644 --- a/pkg/ingester/stream.go +++ b/pkg/ingester/stream.go @@ -10,6 +10,7 @@ import ( util_log "github.com/cortexproject/cortex/pkg/util/log" "github.com/go-kit/kit/log/level" + "github.com/pkg/errors" "github.com/prometheus/client_golang/prometheus" "github.com/prometheus/common/model" "github.com/prometheus/prometheus/pkg/labels" @@ -46,6 +47,10 @@ var ( }) ) +var ( + ErrEntriesExist = errors.New("duplicate push - 
entries already exist") +) + func init() { prometheus.MustRegister(chunksCreatedTotal) prometheus.MustRegister(samplesPerChunk) @@ -146,10 +151,21 @@ func (s *stream) NewChunk() *chunkenc.MemChunk { func (s *stream) Push( ctx context.Context, entries []logproto.Entry, + // WAL record to add push contents to. + // May be nil to disable this functionality. record *WALRecord, + // Counter used in WAL replay to avoid duplicates. + // If this is non-zero, the stream will reject entries + // with a counter value less than or equal to it's own. + counter int64, ) (int, error) { s.chunkMtx.Lock() defer s.chunkMtx.Unlock() + + if counter > 0 && counter <= s.entryCt { + return 0, ErrEntriesExist + } + var bytesAdded int prevNumChunks := len(s.chunks) var lastChunkTimestamp time.Time diff --git a/pkg/ingester/stream_test.go b/pkg/ingester/stream_test.go index 113855ce55f9..991a29a8a2c9 100644 --- a/pkg/ingester/stream_test.go +++ b/pkg/ingester/stream_test.go @@ -46,7 +46,7 @@ func TestMaxReturnedStreamsErrors(t *testing.T) { _, err := s.Push(context.Background(), []logproto.Entry{ {Timestamp: time.Unix(int64(numLogs), 0), Line: "log"}, - }, recordPool.GetRecord()) + }, recordPool.GetRecord(), 0) require.NoError(t, err) newLines := make([]logproto.Entry, numLogs) @@ -65,7 +65,7 @@ func TestMaxReturnedStreamsErrors(t *testing.T) { fmt.Fprintf(&expected, "total ignored: %d out of %d", numLogs, numLogs) expectErr := httpgrpc.Errorf(http.StatusBadRequest, expected.String()) - _, err = s.Push(context.Background(), newLines, recordPool.GetRecord()) + _, err = s.Push(context.Background(), newLines, recordPool.GetRecord(), 0) require.Error(t, err) require.Equal(t, expectErr.Error(), err.Error()) }) @@ -86,7 +86,7 @@ func TestPushDeduplication(t *testing.T) { {Timestamp: time.Unix(1, 0), Line: "test"}, {Timestamp: time.Unix(1, 0), Line: "test"}, {Timestamp: time.Unix(1, 0), Line: "newer, better test"}, - }, recordPool.GetRecord()) + }, recordPool.GetRecord(), 0) require.NoError(t, 
err) require.Len(t, s.chunks, 1) require.Equal(t, s.chunks[0].chunk.Size(), 2, @@ -164,7 +164,7 @@ func Benchmark_PushStream(b *testing.B) { for n := 0; n < b.N; n++ { rec := recordPool.GetRecord() - _, err := s.Push(ctx, e, rec) + _, err := s.Push(ctx, e, rec, 0) require.NoError(b, err) recordPool.PutRecord(rec) } From b96fb6255119c462b10f7d13bed947ccffec6690 Mon Sep 17 00:00:00 2001 From: Owen Diehl Date: Thu, 8 Jul 2021 14:31:21 -0400 Subject: [PATCH 15/16] reject pre-applied entries test --- pkg/ingester/stream.go | 1 + pkg/ingester/stream_test.go | 35 +++++++++++++++++++++++++++++++++++ 2 files changed, 36 insertions(+) diff --git a/pkg/ingester/stream.go b/pkg/ingester/stream.go index c00dac70babf..657708416d2b 100644 --- a/pkg/ingester/stream.go +++ b/pkg/ingester/stream.go @@ -157,6 +157,7 @@ func (s *stream) Push( // Counter used in WAL replay to avoid duplicates. // If this is non-zero, the stream will reject entries // with a counter value less than or equal to it's own. + // It is set to zero and thus bypassed outside of WAL replays. 
counter int64, ) (int, error) { s.chunkMtx.Lock() diff --git a/pkg/ingester/stream_test.go b/pkg/ingester/stream_test.go index 991a29a8a2c9..f845f6b7f9d5 100644 --- a/pkg/ingester/stream_test.go +++ b/pkg/ingester/stream_test.go @@ -94,6 +94,41 @@ func TestPushDeduplication(t *testing.T) { require.Equal(t, len("test"+"newer, better test"), written) } +func TestPushRejectOldCounter(t *testing.T) { + s := newStream( + defaultConfig(), + model.Fingerprint(0), + labels.Labels{ + {Name: "foo", Value: "bar"}, + }, + NilMetrics, + ) + + // counter should be 2 now since the first line will be deduped + _, err := s.Push(context.Background(), []logproto.Entry{ + {Timestamp: time.Unix(1, 0), Line: "test"}, + {Timestamp: time.Unix(1, 0), Line: "test"}, + {Timestamp: time.Unix(1, 0), Line: "newer, better test"}, + }, recordPool.GetRecord(), 0) + require.NoError(t, err) + require.Len(t, s.chunks, 1) + require.Equal(t, s.chunks[0].chunk.Size(), 2, + "expected exact duplicate to be dropped and newer content with same timestamp to be appended") + + // fail to push with a counter <= the streams internal counter + _, err = s.Push(context.Background(), []logproto.Entry{ + {Timestamp: time.Unix(1, 0), Line: "test"}, + }, recordPool.GetRecord(), 2) + require.Equal(t, ErrEntriesExist, err) + + // succeed with a greater counter + _, err = s.Push(context.Background(), []logproto.Entry{ + {Timestamp: time.Unix(1, 0), Line: "test"}, + }, recordPool.GetRecord(), 3) + require.Nil(t, err) + +} + func TestStreamIterator(t *testing.T) { const chunks = 3 const entries = 100 From 5847ad3887c5d4b72040b51a8d8f92a2e90bc108 Mon Sep 17 00:00:00 2001 From: Owen Diehl Date: Thu, 8 Jul 2021 15:42:40 -0400 Subject: [PATCH 16/16] removes commented test --- pkg/ingester/encoding_test.go | 45 ----------------------------------- 1 file changed, 45 deletions(-) diff --git a/pkg/ingester/encoding_test.go b/pkg/ingester/encoding_test.go index 84beb5fe8ebf..accddddd63b9 100644 --- a/pkg/ingester/encoding_test.go 
+++ b/pkg/ingester/encoding_test.go @@ -50,51 +50,6 @@ func Test_Encoding_Series(t *testing.T) { require.Equal(t, record, decoded) } -// func Test_Encoding_Entries(t *testing.T) { -// record := &WALRecord{ -// entryIndexMap: make(map[uint64]int), -// UserID: "123", -// RefEntries: []RefEntries{ -// { -// Ref: 456, -// Counter: 1, -// Entries: []logproto.Entry{ -// { -// Timestamp: time.Unix(1000, 0), -// Line: "first", -// }, -// { -// Timestamp: time.Unix(2000, 0), -// Line: "second", -// }, -// }, -// }, -// { -// Ref: 789, -// Counter: 2, -// Entries: []logproto.Entry{ -// { -// Timestamp: time.Unix(3000, 0), -// Line: "third", -// }, -// { -// Timestamp: time.Unix(4000, 0), -// Line: "fourth", -// }, -// }, -// }, -// }, -// } - -// buf := record.encodeEntries(nil) - -// decoded := recordPool.GetRecord() - -// err := decodeWALRecord(buf, decoded) -// require.Nil(t, err) -// require.Equal(t, record, decoded) -// } - func Test_Encoding_Entries(t *testing.T) { for _, tc := range []struct { desc string