Skip to content

Commit

Permalink
[feat:lindb#21][storage:memdb]: add flusher
Browse files Browse the repository at this point in the history
  • Loading branch information
CodingCrush committed Jul 14, 2019
1 parent c9a5921 commit ffdaa68
Show file tree
Hide file tree
Showing 19 changed files with 222 additions and 247 deletions.
3 changes: 3 additions & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,9 @@ bin/
*.so
*.dylib

# mock files
*mock.go

web/node_modules/
web/build/*

Expand Down
9 changes: 6 additions & 3 deletions Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -22,16 +22,19 @@ build-all: build-frontend build ## Build executable files with front-end files

GOLANGCI_LINT_VERSION ?= "latest"

generate: ## go generate
pre-test: ## go generate mock file.
if [ ! -e ${GOPATH}/bin/mockgen ]; then \
go get github.com/golang/mock/mockgen; \
fi
go list ./... | grep -v '/vendor/' | xargs go generate

test: ## Run test cases. (Args: GOLANGCI_LINT_VERSION=latest)
if [ ! -e ./bin/golangci-lint ]; then \
curl -sfL https://install.goreleaser.com/github.com/golangci/golangci-lint.sh | sh -s $(GOLANGCI_LINT_VERSION); \
fi
./bin/golangci-lint run
GO111MODULE=on go test -v -race -coverprofile=coverage.out -covermode=atomic ./...

test: pre-test ## Run test cases. (Args: GOLANGCI_LINT_VERSION=latest)
GO111MODULE=on go test -v -race -coverprofile=coverage.out -covermode=atomic ./...

deps: ## Update vendor.
go mod verify
Expand Down
2 changes: 1 addition & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ LinDB("Lin" stands for wisdom in Shanghainese) is a open-source Time Series Data
## State of this project

The current develop branch is unstable and is not recommended for production use. LinDB 0.1(what will be the first release version) is currently in the development stage.
Additional features will arrive during next month, we will translate the JAVA version of LinDB currently used under the production environment to Golang as soon as possible.
Additional features will arrive during July, we will translate the JAVA version of LinDB currently used under the production environment to Golang as soon as possible.

Once we implement the final feature and replace the LinDB under production environment with the Golang version, LinDB 0.1.0 will be released. At that point, we will move into the stable phase, our intention is to avoid breaking changes to the API and storage file format.

Expand Down
1 change: 1 addition & 0 deletions kv/table/builder.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,7 @@ type storeBuilder struct {
writer bufioutil.BufioWriter
offset *encoding.DeltaBitPackingEncoder

// see paper of roaring bitmap: https://arxiv.org/pdf/1603.06549.pdf
keys *roaring.Bitmap
minKey uint32
maxKey uint32
Expand Down
128 changes: 0 additions & 128 deletions models/point_mock.go

This file was deleted.

4 changes: 2 additions & 2 deletions pkg/field/agg_func.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,9 +28,9 @@ func GetAggFunc(funcType AggType) AggFunc {

// AggFunc represents field's aggregator function for int64 or float64 value
type AggFunc interface {
// AggregateInt aggregates two int64 values, return aggregate result
// AggregateInt aggregates two int64 values into one
AggregateInt(a, b int64) int64
// AggregateInt aggregates two float64 values, return aggregate result
// AggregateInt aggregates two float64 values into one
AggregateFloat(a, b float64) float64
}

Expand Down
2 changes: 1 addition & 1 deletion pkg/interval/interval.go
Original file line number Diff line number Diff line change
Expand Up @@ -85,7 +85,7 @@ type Calculator interface {
CalSegmentTime(timestamp int64) int64
// CalFamily calculates family base time based on given timestamp
CalFamily(timestamp int64, segmentTime int64) int
// CalFamilyStartTime calculates famliy start time based on segment time and family
// CalFamilyStartTime calculates family start time based on segment time and family
CalFamilyStartTime(segmentTime int64, family int) int64
// CalSlot calculates field store slot index based on given timestamp and base time
CalSlot(timestamp, baseTime, interval int64) int
Expand Down
2 changes: 2 additions & 0 deletions tsdb/engine.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,8 @@ import (
const options = "OPTIONS"
const shardPath = "shard"

//go:generate mockgen -source ./engine.go -destination=./engine_mock.go -package tsdb

// Engine represents a time series storage engine
type Engine interface {
// Name returns tsdb engine's name, engine's name is database's name for user
Expand Down
2 changes: 2 additions & 0 deletions tsdb/index.go
Original file line number Diff line number Diff line change
@@ -1,4 +1,6 @@
package tsdb

//go:generate mockgen -source ./index.go -destination=./index_mock.go -package tsdb

type Index interface {
}
50 changes: 26 additions & 24 deletions tsdb/memdb/block_store.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,9 +9,10 @@ import (
"github.com/eleme/lindb/pkg/field"
)

// the longest length of basic-variable on x64 platform
const maxTimeWindow = 64

// blockStore represents a pool of block for reuse
// blockStore carries a pool of block for reuse
type blockStore struct {
timeWindow int
intBlockPool sync.Pool
Expand Down Expand Up @@ -88,20 +89,21 @@ type block interface {
bytes() []byte
}

// container marks if has value in block based on index
// container(bit array) is a mapping from 64 value to uint64 in big-endian,
// it is a temporary data-structure for compressing data.
type container struct {
container uint64
startTime int

compress []byte
}

// hasValue returns if has value with pos, if has value return true
// hasValue returns whether value is absent or present at pos, if present return true
func (c *container) hasValue(pos int) bool {
return c.container&(1<<uint64(maxTimeWindow-pos-1)) != 0
}

// setValue marks pos has value
// setValue marks pos is present
func (c *container) setValue(pos int) {
c.container |= 1 << uint64(maxTimeWindow-pos-1)
}
Expand All @@ -119,7 +121,7 @@ func (c *container) getStartTime() int {

// getEndTime returns end time slot
func (c *container) getEndTime() int {
// get trainilng zeros for container
// get trailing zeros for container
trailing := bits.TrailingZeros64(c.container)
return c.startTime + (maxTimeWindow - trailing) - 1
}
Expand Down Expand Up @@ -156,11 +158,11 @@ func (b *intBlock) updateValue(pos int, value int64) {
// compact compress block data
func (b *intBlock) compact(aggFunc field.AggFunc) error {
//TODO handle error
merge := newMerge(b, b.values, b.compress, aggFunc)
merger := newMerger(b, b.values, b.compress, aggFunc)
// do merge logic
merge.merge()
merger.merge()

buf, err := merge.tsd.Bytes()
buf, err := merger.tsd.Bytes()
if err != nil {
return err
}
Expand Down Expand Up @@ -203,10 +205,10 @@ func (b *floatBlock) compact(aggFunc field.AggFunc) error {
return nil
}

// merge is merge operation which provides compress block data.
// merger is merge operation which provides compress block data.
// 1) compress data not exist, just compress current block values
// 2) compress data exist, merge compress data and block values
type merge struct {
type merger struct {
block block
values []int64
compressData []byte
Expand All @@ -220,9 +222,9 @@ type merge struct {
aggFunc field.AggFunc
}

// newMerge creates merge operation with given agg func based on block data and exist compress data
func newMerge(block block, values []int64, compressData []byte, aggFunc field.AggFunc) *merge {
m := &merge{
// newMerger creates merge operation with given agg func based on block data and exist compress data
func newMerger(block block, values []int64, compressData []byte, aggFunc field.AggFunc) *merger {
m := &merger{
block: block,
values: values,
compressData: compressData,
Expand All @@ -233,7 +235,7 @@ func newMerge(block block, values []int64, compressData []byte, aggFunc field.Ag
}

// init initializes merge context, such time range, tsd decoder if has compress data
func (m *merge) init() {
func (m *merger) init() {
curStartTime := m.block.getStartTime()
curEndTime := m.block.getEndTime()
if len(m.compressData) == 0 {
Expand All @@ -260,9 +262,9 @@ func (m *merge) init() {
}

// merge does merge logic
func (m *merge) merge() {
func (m *merger) merge() {
if m.oldData == nil {
// compress data not eixst, just compress block data
// compress data not exist, just compress block data
m.compress()
} else {
// has old compress data, need merge block data
Expand Down Expand Up @@ -296,18 +298,18 @@ func (m *merge) merge() {
}

// isInRange return slot if in range, yes return true
func (m *merge) isInRange(slot, start, end int) bool {
func (m *merger) isInRange(slot, start, end int) bool {
return slot >= start && slot <= end
}

// mergeData merges current block values and compress data
func (m *merge) mergeData(newPos, oldPos int) {
func (m *merger) mergeData(newPos, oldPos int) {
b := m.block
hasValue := b.hasValue(newPos)
hasOldValue := m.oldData.HasValueWithSlot(oldPos)
switch {
case hasValue && hasOldValue:
// has value both in current and old, do rullup operation with agg func
// has value both in current and old, do rollup operation with agg func
val := m.aggFunc.AggregateInt(m.values[newPos], encoding.ZigZagDecode(m.oldData.Value()))
m.appendValue(encoding.ZigZagEncode(val))
case hasValue:
Expand All @@ -323,14 +325,14 @@ func (m *merge) mergeData(newPos, oldPos int) {
}

// compress compress current block values
func (m *merge) compress() {
func (m *merger) compress() {
for i := m.startTime; i <= m.endTime; i++ {
m.appendNewData(i - m.startTime)
}
}

// appendNewData appends current block value with pos
func (m *merge) appendNewData(pos int) {
func (m *merger) appendNewData(pos int) {
if m.block.hasValue(pos) {
m.appendValue(encoding.ZigZagEncode(m.values[pos]))
} else {
Expand All @@ -339,7 +341,7 @@ func (m *merge) appendNewData(pos int) {
}

// appendOldData reads compress data then appends it with new pos
func (m *merge) appendOldData(pos int) {
func (m *merger) appendOldData(pos int) {
if m.oldData.HasValueWithSlot(pos) {
m.appendValue(m.oldData.Value())
} else {
Expand All @@ -348,12 +350,12 @@ func (m *merge) appendOldData(pos int) {
}

// appendValue appends value with new pos
func (m *merge) appendValue(val uint64) {
func (m *merger) appendValue(val uint64) {
m.tsd.AppendTime(bit.One)
m.tsd.AppendValue(val)
}

// appendEmptyValue appends time slot only
func (m *merge) appendEmptyValue() {
func (m *merger) appendEmptyValue() {
m.tsd.AppendTime(bit.Zero)
}
2 changes: 1 addition & 1 deletion tsdb/memdb/constants.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ const (
// mask for calculating sharding-index by AND
shardingCountMask = shardingCountOfMStores - 1
// use this limit of metric-store when maxTagsLimit is not set
defaultMaxTagsLimit = 10000
defaultMaxTagsLimit = 100000
)

// use var for mocking
Expand Down
Loading

0 comments on commit ffdaa68

Please sign in to comment.