Concrete

A row-oriented file format for efficient blockchain data storage and querying.

Concrete is a row-oriented, block-compressed file format designed by Agnostic for storing and querying large volumes of blockchain data extracted from EVM RPC APIs. It enables fast, cost-effective reprocessing of historical blockchain data without repeatedly calling slow and expensive RPC endpoints.

Features

  • Row-Oriented Storage: Stores complete documents (blocks, transactions, receipts, etc.) optimized for full-document retrieval during indexing workflows
  • Block Compression: Data is organized into row groups with configurable block sizes, each compressed independently
  • Multiple Compression Codecs: Support for NONE, LZ4, GZIP, and ZSTD compression
  • Skip Indexes: Rich indexing support to avoid scanning unnecessary data:
    • Min-Max Indexes: Fast range queries on numeric fields (e.g., block numbers, timestamps)
    • XOR Filters: Probabilistic membership testing for exact-match queries (e.g., transaction hashes, addresses)
  • Rich Metadata: Protocol Buffers-based footer with customizable metadata for file provenance and schema information
  • Streaming Writes: Efficient sequential writes with configurable row group sizes
  • Fast Random Access: Footer-based index lookup enables efficient random access to specific data ranges

Installation

go get github.com/agnosticeng/concrete

Quick Start

Writing Data

package main

import (
    "os"
    "github.com/agnosticeng/concrete"
    "github.com/agnosticeng/concrete/proto"
)

func main() {
    f, err := os.Create("blocks.concrete")
    if err != nil {
        panic(err)
    }
    defer f.Close()

    w, err := concrete.NewWriter(f, concrete.WriterConfig{
        CompressionCodec: proto.CompressionCodec_LZ4,
        BlockSize:        10 * 1024 * 1024, // 10 MiB blocks
    })
    if err != nil {
        panic(err)
    }

    // Write blockchain data (JSON, protobuf, or any binary format)
    for i := 0; i < 1000; i++ {
        blockData := []byte(`{"number": 12345, "hash": "0x...", "transactions": [...]}`)
        if err := w.WriteRow(blockData); err != nil {
            panic(err)
        }
    }

    // Add metadata
    w.WriteMetadata(map[string]interface{}{
        "chain_id":    1,
        "start_block": 1000,
        "end_block":   2000,
        "created_at":  "2024-01-01T00:00:00Z",
    })

    if err := w.Close(); err != nil {
        panic(err)
    }
}

Writing with Skip Indexes

package main

import (
    "os"
    "github.com/agnosticeng/concrete"
    "github.com/agnosticeng/concrete/proto"
    "github.com/agnosticeng/concrete/skipindex/builder"
)

func main() {
    f, err := os.Create("blocks_indexed.concrete")
    if err != nil {
        panic(err)
    }
    defer f.Close()

    // Define skip indexes
    indices := []builder.Definition{
        builder.NewDefinition("block_number", builder.NewMinMaxUInt64Builder()),
        builder.NewDefinition("tx_hash", builder.MustNewXorFilterBuilder(proto.Hash64Function_XXHASH)),
    }

    w, err := concrete.NewWriterWithSkipIndices(f, indices, concrete.WriterConfig{
        CompressionCodec: proto.CompressionCodec_ZSTD,
        BlockSize:        5 * 1024 * 1024,
    })
    if err != nil {
        panic(err)
    }

    // Write data with skip index values.
    // `blocks` is assumed to be your own slice of values exposing Data, Number, and TxHash.
    for _, block := range blocks {
        if err := w.WriteRow(
            block.Data,
            builder.Values{
                MinMaxUInt64: builder.MinMaxUInt64Values(block.Number),
                XorFilter:    builder.XorFilterValues{block.TxHash},
            },
        ); err != nil {
            panic(err)
        }
    }

    if err := w.Close(); err != nil {
        panic(err)
    }
}

Reading Data

package main

import (
    "io"
    "os"
    "github.com/agnosticeng/concrete"
)

func main() {
    f, err := os.Open("blocks.concrete")
    if err != nil {
        panic(err)
    }
    defer f.Close()

    input, err := concrete.NewFileInput(f)
    if err != nil {
        panic(err)
    }

    r, err := concrete.NewReader(input, concrete.ReaderConfig{})
    if err != nil {
        panic(err)
    }
    defer r.Close()

    // Read all rows
    for {
        row, err := r.ReadRow()
        if err == io.EOF {
            break
        }
        if err != nil {
            panic(err)
        }
        // Process row (block data)
        _ = row
    }
}

Reading with Skip Index Queries

package main

import (
    "io"
    "os"
    "github.com/agnosticeng/concrete"
    "github.com/agnosticeng/concrete/skipindex/query"
    "github.com/samber/mo"
)

func main() {
    f, err := os.Open("blocks_indexed.concrete")
    if err != nil {
        panic(err)
    }
    defer f.Close()

    input, err := concrete.NewFileInput(f)
    if err != nil {
        panic(err)
    }

    // Query for blocks in range [1000, 2000]
    q := query.MinMaxUInt64("block_number", mo.Some(uint64(1000)), mo.Some(uint64(2000)))

    r, err := concrete.NewReaderWithQuery(input, q, concrete.ReaderConfig{})
    if err != nil {
        panic(err)
    }
    defer r.Close()

    // Only matching row groups will be scanned
    for {
        row, err := r.ReadRow()
        if err == io.EOF {
            break
        }
        if err != nil {
            panic(err)
        }
        _ = row
    }
}

Advanced Query Operations

// AND query: combine multiple skip indexes
andQuery := query.And(
    query.MinMaxUInt64("block_number", mo.Some(uint64(1000)), mo.Some(uint64(2000))),
    query.XorFilter("address", [][]byte{[]byte("0x1234...")}),
)

// OR query: timestamps up to 1700000000, or from 1710000000 onwards
// (two overlapping open-ended ranges would match every row group)
orQuery := query.Or(
    query.MinMaxUInt64("timestamp", mo.None[uint64](), mo.Some(uint64(1700000000))),
    query.MinMaxUInt64("timestamp", mo.Some(uint64(1710000000)), mo.None[uint64]()),
)

// NOT query
notQuery := query.Not(
    query.MinMaxUInt64("block_number", mo.Some(uint64(0)), mo.Some(uint64(100))),
)

// Match all (no filtering)
allQuery := query.MatchAll()
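
To make the combinators above more concrete, here is a minimal, standard-library-only sketch of how AND, OR, and NOT could be evaluated against a per-row-group min-max summary. It is not Concrete's implementation: the `Verdict`, `Stats`, `Range`, `And`, `Or`, and `Not` names are hypothetical, and a single numeric column is assumed. The point it illustrates is that NOT cannot simply invert a "may match" answer, so the sketch tracks three outcomes per row group.

```go
package main

import "fmt"

// Verdict is what a skip index can say about one row group (hypothetical type).
type Verdict int

const (
	None  Verdict = iota // no row can match: the group is safe to skip
	Maybe                // some rows may match: the group must be scanned
	All                  // every row matches: the group must be scanned
)

// Stats is the min-max summary of one row group for a single column.
type Stats struct{ Min, Max uint64 }

// Query evaluates a predicate against a row group's summary.
type Query func(Stats) Verdict

// Range matches values in the inclusive range [lo, hi].
func Range(lo, hi uint64) Query {
	return func(s Stats) Verdict {
		switch {
		case s.Max < lo || s.Min > hi:
			return None
		case s.Min >= lo && s.Max <= hi:
			return All
		default:
			return Maybe
		}
	}
}

// And returns the weakest verdict among its operands.
func And(qs ...Query) Query {
	return func(s Stats) Verdict {
		out := All
		for _, q := range qs {
			if v := q(s); v < out {
				out = v
			}
		}
		return out
	}
}

// Or returns the strongest verdict among its operands.
func Or(qs ...Query) Query {
	return func(s Stats) Verdict {
		out := None
		for _, q := range qs {
			if v := q(s); v > out {
				out = v
			}
		}
		return out
	}
}

// Not swaps None and All; Maybe stays Maybe because nothing can be proven.
func Not(q Query) Query {
	return func(s Stats) Verdict { return All - q(s) }
}

func main() {
	group := Stats{Min: 50, Max: 150}
	q := And(Not(Range(0, 100)), Range(120, 400))
	fmt.Println("skip row group:", q(group) == None)
}
```

A row group is skipped only when the combined verdict is `None`; any other verdict means its rows still have to be read and checked individually.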

Direct Row Group Access

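r, err := concrete.NewReader(input, concrete.ReaderConfig{})
if err != nil {
    panic(err)
}
defer r.Close()

// The footer describes the file (row groups, skip indexes, metadata)
footer := r.Footer()
_ = footer

// Access specific row group directly
rg, err := r.RowGroup(0)
if err != nil {
    panic(err)
}
defer rg.Close()

for {
    row, err := rg.ReadRow()
    if err == io.EOF {
        break
    }
    if err != nil {
        panic(err)
    }
    _ = row
}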

File Format

┌─────────────────────────────────────────────────────────────┐
│                      Magic Number ("CC")                     │
├─────────────────────────────────────────────────────────────┤
│                    Row Group 0 (compressed)                  │
│  ┌─────────────────────────────────────────────────────┐    │
│  │  Length-Delimited Row 0                              │    │
│  │  Length-Delimited Row 1                              │    │
│  │  ...                                                 │    │
│  └─────────────────────────────────────────────────────┘    │
├─────────────────────────────────────────────────────────────┤
│                    Row Group 1 (compressed)                  │
├─────────────────────────────────────────────────────────────┤
│                          ...                                 │
├─────────────────────────────────────────────────────────────┤
│              Skip Index Data Section 0                       │
│  ┌─────────────────────────────────────────────────────┐    │
│  │  Marker ("CCSI")                                     │    │
│  │  Compressed Index Data                               │    │
│  │  Marker ("CCSI")                                     │    │
│  └─────────────────────────────────────────────────────┘    │
├─────────────────────────────────────────────────────────────┤
│                          Footer                              │
│  ┌─────────────────────────────────────────────────────┐    │
│  │  Compression Codec                                   │    │
│  │  Row Group Offsets & Lengths                         │    │
│  │  Skip Index Definitions                              │    │
│  │  Metadata (protobuf.Struct)                          │    │
│  └─────────────────────────────────────────────────────┘    │
├─────────────────────────────────────────────────────────────┤
│              Footer Size (8 bytes, little-endian)            │
├─────────────────────────────────────────────────────────────┤
│                      Magic Number ("CC")                     │
└─────────────────────────────────────────────────────────────┘

Architecture

Core Components

  • Writer: Streams data to disk with configurable block sizes and compression
  • Reader: Provides sequential and random access to stored data
  • Row Group: Independent compression unit containing multiple rows
  • Skip Index Builder: Constructs indexes during write operations
  • Skip Index Query: Evaluates queries against indexes to filter row groups

Compression

Concrete supports multiple compression codecs via the CompressionCodec enum:

Codec | Description                         | Use Case
NONE  | No compression                      | Debugging, already-compressed data
LZ4   | Fast compression/decompression      | Low-latency queries
GZIP  | Balanced compression ratio/speed    | General purpose
ZSTD  | High compression ratio, fast decode | Large datasets, archival

Skip Indexes

Min-Max Index

Stores minimum and maximum values for each row group. Ideal for:

  • Block number ranges
  • Timestamp ranges
  • Numeric field filtering

// Query blocks 1000-2000
query.MinMaxUInt64("block_number", mo.Some(uint64(1000)), mo.Some(uint64(2000)))
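
For intuition, a min-max index can be sketched in a few lines: track the smallest and largest value seen per row group while writing, then skip any group whose range cannot overlap the query. The code below is a standalone illustration with hypothetical names (`MinMax`, `Add`, `MayContain`); a nil bound stands in for an open-ended range and bounds are assumed to be inclusive.

```go
package main

import "fmt"

// MinMax summarises the values written to one row group.
type MinMax struct {
	Min, Max uint64
	seen     bool
}

// Add folds one value into the summary.
func (m *MinMax) Add(v uint64) {
	if !m.seen || v < m.Min {
		m.Min = v
	}
	if !m.seen || v > m.Max {
		m.Max = v
	}
	m.seen = true
}

// MayContain reports whether a value in [lo, hi] could be in the group.
// A nil bound means that side of the range is open.
func (m MinMax) MayContain(lo, hi *uint64) bool {
	if !m.seen {
		return false
	}
	if lo != nil && m.Max < *lo {
		return false
	}
	if hi != nil && m.Min > *hi {
		return false
	}
	return true
}

func main() {
	// Three row groups holding 1000 consecutive block numbers each.
	groups := make([]MinMax, 3)
	for n := uint64(0); n < 3000; n++ {
		groups[n/1000].Add(n)
	}

	lo, hi := uint64(1000), uint64(2000)
	for i, g := range groups {
		fmt.Printf("row group %d [%d, %d]: scan=%v\n", i, g.Min, g.Max, g.MayContain(&lo, &hi))
	}
}
```

With these inputs the first group (blocks 0 to 999) is skipped, while the other two are scanned because their ranges touch the query bounds.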

XOR Filter

Probabilistic data structure for membership testing. Ideal for:

  • Transaction hash lookups
  • Address filtering
  • Exact match queries

Note: XOR filters may produce false positives but never false negatives.

// Query specific transaction hashes
query.XorFilter("tx_hash", [][]byte{txHash1, txHash2})
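
The one-sided error described in the note affects how a lookup proceeds: a negative answer lets the reader skip the row group, while a positive answer still requires scanning the rows to confirm. The sketch below illustrates that flow. It deliberately swaps in a much simpler structure, a set of 16-bit FNV fingerprints, which is not an XOR filter but shares the same "false positives possible, false negatives impossible" behaviour. All names (`FingerprintSet`, `Lookup`) are hypothetical.

```go
package main

import (
	"bytes"
	"fmt"
	"hash/fnv"
)

// FingerprintSet keeps only a 16-bit fingerprint of each key.
// It stands in for an XOR filter purely for illustration.
type FingerprintSet map[uint16]struct{}

func fingerprint(key []byte) uint16 {
	h := fnv.New64a()
	h.Write(key)
	return uint16(h.Sum64())
}

// Add records a key's fingerprint.
func (s FingerprintSet) Add(key []byte) { s[fingerprint(key)] = struct{}{} }

// MayContain is true for every added key and, rarely, for keys that collide.
func (s FingerprintSet) MayContain(key []byte) bool {
	_, ok := s[fingerprint(key)]
	return ok
}

// Lookup consults the filter first and scans the rows only on a "maybe".
func Lookup(filter FingerprintSet, rows [][]byte, key []byte) (found, scanned bool) {
	if !filter.MayContain(key) {
		return false, false // definite miss: the row group is skipped
	}
	for _, r := range rows {
		if bytes.Equal(r, key) {
			return true, true
		}
	}
	return false, true // false positive: scanned without finding the key
}

func main() {
	rows := [][]byte{[]byte("0xaaa"), []byte("0xbbb")}
	filter := FingerprintSet{}
	for _, r := range rows {
		filter.Add(r)
	}

	for _, key := range []string{"0xbbb", "0xccc"} {
		found, scanned := Lookup(filter, rows, []byte(key))
		fmt.Printf("%s: found=%v scanned=%v\n", key, found, scanned)
	}
}
```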

Use Cases

Blockchain Indexing

Store extracted blockchain data for later reprocessing, avoiding repeated RPC calls for historical data.

Historical Data Analysis

Query specific block ranges without scanning the entire dataset, and run time-series analysis on block timestamps efficiently.

Transaction Lookup

Look up transaction hashes quickly using XOR filters, avoiding full scans for point queries.

Testing

Run tests:

go test -v ./...

License

[Specify license here]

Contributing

[Specify contribution guidelines here]

Related Projects

  • Blockfile - Larger ecosystem this format is part of
  • Parquet - Inspiration for columnar storage concepts
  • ORC - Similar row-group based file format

Concrete has been developed and maintained by Agnostic since 2024.
