Skip to content

Commit

Permalink
lib/storage: correctly use maxBlockSize in various checks
Browse files Browse the repository at this point in the history
Previously `maxBlockSize` has been multiplied by 8 in certain checks. This is unnecessary.
  • Loading branch information
valyala committed Sep 24, 2020
1 parent b8bce34 commit 533bf76
Show file tree
Hide file tree
Showing 5 changed files with 32 additions and 24 deletions.
11 changes: 6 additions & 5 deletions lib/storage/block.go
@@ -1,6 +1,7 @@
package storage

import (
"fmt"
"sync"
"sync/atomic"

Expand All @@ -9,11 +10,11 @@ import (
)

const (
// The maximum size of values in the block.
maxBlockSize = 64 * 1024

// The maximum number of rows per block.
maxRowsPerBlock = 8 * 1024

// The maximum size of values in the block.
maxBlockSize = 8 * maxRowsPerBlock
)

// Block represents a block of time series values for a single TSID.
Expand Down Expand Up @@ -259,7 +260,7 @@ func (b *Block) UnmarshalData() error {
}

if b.bh.RowsCount <= 0 {
logger.Panicf("BUG: RowsCount must be greater than 0; got %d", b.bh.RowsCount)
return fmt.Errorf("RowsCount must be greater than 0; got %d", b.bh.RowsCount)
}

var err error
Expand All @@ -281,7 +282,7 @@ func (b *Block) UnmarshalData() error {
b.valuesData = b.valuesData[:0]

if len(b.timestamps) != len(b.values) {
logger.Panicf("BUG: timestamps and values count mismatch; got %d vs %d", len(b.timestamps), len(b.values))
return fmt.Errorf("timestamps and values count mismatch; got %d vs %d", len(b.timestamps), len(b.values))
}

b.nextIdx = 0
Expand Down
33 changes: 20 additions & 13 deletions lib/storage/block_header.go
Expand Up @@ -150,26 +150,33 @@ func (bh *blockHeader) Unmarshal(src []byte) ([]byte, error) {
bh.PrecisionBits = uint8(src[0])
src = src[1:]

err = bh.validate()
return src, err
}

func (bh *blockHeader) validate() error {
if bh.RowsCount == 0 {
return src, fmt.Errorf("RowsCount in block header cannot be zero")
return fmt.Errorf("RowsCount in block header cannot be zero")
}
if err = encoding.CheckMarshalType(bh.TimestampsMarshalType); err != nil {
return src, fmt.Errorf("unsupported TimestampsMarshalType: %w", err)
if bh.RowsCount > 2*maxRowsPerBlock {
return fmt.Errorf("too big RowsCount; got %d; cannot exceed %d", bh.RowsCount, 2*maxRowsPerBlock)
}
if err = encoding.CheckMarshalType(bh.ValuesMarshalType); err != nil {
return src, fmt.Errorf("unsupported ValuesMarshalType: %w", err)
if err := encoding.CheckMarshalType(bh.TimestampsMarshalType); err != nil {
return fmt.Errorf("unsupported TimestampsMarshalType: %w", err)
}
if err = encoding.CheckPrecisionBits(bh.PrecisionBits); err != nil {
return src, err
if err := encoding.CheckMarshalType(bh.ValuesMarshalType); err != nil {
return fmt.Errorf("unsupported ValuesMarshalType: %w", err)
}
if bh.TimestampsBlockSize > 2*8*maxBlockSize {
return src, fmt.Errorf("too big TimestampsBlockSize; got %d; cannot exceed %d", bh.TimestampsBlockSize, 2*8*maxBlockSize)
if err := encoding.CheckPrecisionBits(bh.PrecisionBits); err != nil {
return err
}
if bh.ValuesBlockSize > 2*8*maxBlockSize {
return src, fmt.Errorf("too big ValuesBlockSize; got %d; cannot exceed %d", bh.ValuesBlockSize, 2*8*maxBlockSize)
if bh.TimestampsBlockSize > 2*maxBlockSize {
return fmt.Errorf("too big TimestampsBlockSize; got %d; cannot exceed %d", bh.TimestampsBlockSize, 2*maxBlockSize)
}

return src, nil
if bh.ValuesBlockSize > 2*maxBlockSize {
return fmt.Errorf("too big ValuesBlockSize; got %d; cannot exceed %d", bh.ValuesBlockSize, 2*maxBlockSize)
}
return nil
}

// unmarshalBlockHeaders unmarshals all the block headers from src,
Expand Down
4 changes: 2 additions & 2 deletions lib/storage/block_header_test.go
Expand Up @@ -25,8 +25,8 @@ func TestBlockHeaderMarshalUnmarshal(t *testing.T) {
bh.MaxTimestamp = int64(i*2e3 + 3)
bh.TimestampsBlockOffset = uint64(i*12345 + 4)
bh.ValuesBlockOffset = uint64(i*3243 + 5)
bh.TimestampsBlockSize = uint32(i*892 + 6)
bh.ValuesBlockSize = uint32(i*894 + 7)
bh.TimestampsBlockSize = uint32((i*892 + 6) % maxBlockSize)
bh.ValuesBlockSize = uint32((i*894 + 7) % maxBlockSize)
bh.RowsCount = uint32(i*3 + 8)
bh.Scale = int16(i - 434 + 9)
bh.TimestampsMarshalType = encoding.MarshalType((i + 10) % 7)
Expand Down
4 changes: 2 additions & 2 deletions lib/storage/metaindex_row.go
Expand Up @@ -120,8 +120,8 @@ func (mr *metaindexRow) Unmarshal(src []byte) ([]byte, error) {
if mr.BlockHeadersCount <= 0 {
return src, fmt.Errorf("BlockHeadersCount must be greater than 0")
}
if mr.IndexBlockSize > 2*8*maxBlockSize {
return src, fmt.Errorf("too big IndexBlockSize; got %d; cannot exceed %d", mr.IndexBlockSize, 2*8*maxBlockSize)
if mr.IndexBlockSize > 2*maxBlockSize {
return src, fmt.Errorf("too big IndexBlockSize; got %d; cannot exceed %d", mr.IndexBlockSize, 2*maxBlockSize)
}

return src, nil
Expand Down
4 changes: 2 additions & 2 deletions lib/storage/metaindex_row_test.go
Expand Up @@ -92,8 +92,8 @@ func initTestMetaindexRow(mr *metaindexRow) {
if mr.BlockHeadersCount == 0 {
mr.BlockHeadersCount = 1
}
if mr.IndexBlockSize > 2*8*maxBlockSize {
mr.IndexBlockSize = 2 * 8 * maxBlockSize
if mr.IndexBlockSize > 2*maxBlockSize {
mr.IndexBlockSize = 2 * maxBlockSize
}
}

Expand Down

0 comments on commit 533bf76

Please sign in to comment.