High-performance distributed JSON document system with atomic writes, RFC-standard merging, and intelligent caching.

- **Atomic Writes** - Two-phase commit with pending state prevents data loss in concurrent scenarios
- **RFC Standards** - Full RFC 7396 (JSON Merge Patch) and RFC 6902 (JSON Patch) support
- **High Performance** - Up to 999,999 writes/sec per catalog, parallel I/O, worker-pool optimization
- **Smart Caching** - Redis-based cache with namespace isolation (~90% hit ratio)
- **Intelligent Encoding** - MD5-based sharding, case-insensitive safe paths
- **Built-in Tracing** - Context-based performance monitoring (zero overhead when disabled)
- **AES Encryption** - Optional AES-GCM encryption with minimal overhead (<0.05ms)
- **Snapshot System** - Time-range snapshots for efficient incremental reads
go get github.com/hkloudou/lake/v2@latest

package main
import (
"context"
"fmt"
"github.com/hkloudou/lake/v2"
)
func main() {
// Create client (config loaded lazily)
client := lake.NewLake("redis://localhost:6379")
ctx := context.Background()
// Write data
err := client.Write(ctx, lake.WriteRequest{
Catalog: "users",
Path: "/profile", // Path format: starts with /
Body: []byte(`{"name":"Alice","age":30}`),
MergeType: lake.MergeTypeReplace,
})
// List catalog entries
list := client.List(ctx, "users")
// Read merged data (ReadString is the most common helper)
jsonStr, _ := lake.ReadString(ctx, list)
fmt.Printf("Data: %s\n", jsonStr)
}

This section provides precise API signatures and file locations for AI assistants and automation tools.
import "github.com/hkloudou/lake/v2"

| Function | File | Description |
|---|---|---|
| `NewLake(metaUrl string, opts ...func(*option)) *Client` | lake.go:48 | Create client with Redis URL |
| `WithStorage(storage storage.Storage) func(*option)` | lake.go:133 | Use custom storage |
| `WithSnapCacheMetaURL(metaUrl string, ttl time.Duration) func(*option)` | lake.go:108 | Use separate Redis for snapshot cache |
| `WithDeltaCacheMetaURL(metaUrl string, ttl time.Duration) func(*option)` | lake.go:124 | Use separate Redis for delta cache |
| Function | File | Description |
|---|---|---|
| `(*Client) Write(ctx, WriteRequest) error` | write.go:53 | Write JSON data with merge strategy |
| `(*Client) WriteFile(ctx, WriteFileRequest) error` | file.go:21 | Write binary file to catalog |
WriteRequest struct (write.go:14):
type WriteRequest struct {
Catalog string // Document namespace (e.g., "users", "orders")
Path string // JSON path starting with "/" (e.g., "/profile", "/settings/theme")
Body []byte // JSON data as raw bytes
MergeType MergeType // lake.MergeTypeReplace, lake.MergeTypeRFC7396, or lake.MergeTypeRFC6902
Meta []byte // Optional metadata
}

MergeType constants (internal/index/encoding.go:13):

lake.MergeTypeReplace // = 1: Simple field replacement
lake.MergeTypeRFC7396 // = 2: RFC 7396 JSON Merge Patch (null removes field)
lake.MergeTypeRFC6902 // = 3: RFC 6902 JSON Patch (operations array)

| Function | File | Description |
|---|---|---|
| `(*Client) List(ctx, catalog string) *ListResult` | list.go:101 | Get catalog metadata and delta list |
| `ReadBytes(ctx, *ListResult) ([]byte, error)` | helpers.go:11 | Read as raw bytes |
| `ReadString(ctx, *ListResult) (string, error)` | helpers.go:15 | Read as JSON string (most common) |
| `ReadMap(ctx, *ListResult) (map[string]any, error)` | helpers.go:24 | Read as map |
| `Read[T any](ctx, *ListResult) (*T, error)` | helpers.go:38 | Read with generic type |
Common Read Pattern (most important):
// Recommended: Read as string (most common)
list := client.List(ctx, "users")
if list.Err != nil {
return list.Err
}
jsonStr, err := lake.ReadString(ctx, list)
// Alternative: Read with type inference
list := client.List(ctx, "users")
user, err := lake.Read[User](ctx, list)
// Alternative: Read as map
list := client.List(ctx, "users")
data, err := lake.ReadMap(ctx, list)

ListResult struct (list.go:13):
type ListResult struct {
Err error // Non-nil if pending writes detected or other error
HasPending bool // True if write in progress (< 120s)
}
// Key methods:
func (m ListResult) Exist() bool // Returns true if data exists
func (m ListResult) LastUpdated() float64 // Returns timestamp of last update

| Function | File | Description |
|---|---|---|
| `(*Client) Meta(ctx, catalog string) (string, error)` | meta.go:7 | Get catalog metadata |
| `(*Client) BatchMeta(ctx, catalogs []string) (map[string]string, error)` | meta.go:14 | Get metadata for multiple catalogs |
| Function | File | Description |
|---|---|---|
| `(*Client) WriteFile(ctx, WriteFileRequest) error` | file.go:21 | Write binary file |
| `(*Client) FileExists(ctx, catalog, path string) (bool, error)` | file.go:63 | Check if file exists |
| `(*Client) FilesAndMeta(ctx, catalog string) (string, error)` | file.go:75 | Get all files and metadata |
| Function | File | Description |
|---|---|---|
| `(*Client) ClearHistory(ctx, catalog string) error` | clear.go:10 | Clear all history, keep latest snapshot |
| `(*Client) ClearHistoryWithRetention(ctx, catalog string, keepSnaps int) error` | clear.go:26 | Clear history, keep N snapshots |
| Function | File | Description |
|---|---|---|
| `(*Client) MotionSample(ctx, catalog, indicator string, motionCatalogs []string, shouldUpdated func, callback func) (float64, error)` | sample.go:20 | Incremental sampling with change detection |
Basic Write and Read:
client := lake.NewLake("redis://localhost:6379")
ctx := context.Background()
// Write
err := client.Write(ctx, lake.WriteRequest{
Catalog: "users",
Path: "/profile",
Body: []byte(`{"name":"Alice","age":30}`),
MergeType: lake.MergeTypeReplace,
})
// Read (most common pattern)
list := client.List(ctx, "users")
if list.Err != nil {
log.Fatal(list.Err)
}
jsonStr, err := lake.ReadString(ctx, list)

RFC 7396 Merge Patch (partial update, null removes field):
err := client.Write(ctx, lake.WriteRequest{
Catalog: "users",
Path: "/profile",
Body: []byte(`{"age":31,"city":"NYC","oldField":null}`),
MergeType: lake.MergeTypeRFC7396,
})

RFC 6902 JSON Patch (operations):
err := client.Write(ctx, lake.WriteRequest{
Catalog: "users",
Path: "/",
Body: []byte(`[{"op":"add","path":"/tags","value":["vip"]}]`),
MergeType: lake.MergeTypeRFC6902,
})

Read with Type:
type UserProfile struct {
Name string `json:"name"`
Age int `json:"age"`
}
list := client.List(ctx, "users")
profile, err := lake.Read[UserProfile](ctx, list)
if err != nil {
log.Fatal(err)
}
fmt.Printf("Name: %s, Age: %d\n", profile.Name, profile.Age)

Handle Pending Writes:
list := client.List(ctx, "users")
if list.HasPending {
// Write in progress, retry after delay
time.Sleep(100 * time.Millisecond)
list = client.List(ctx, "users")
}
if list.Err != nil {
return list.Err
}
data, err := lake.ReadString(ctx, list)

lake/
├── lake.go     # Client creation, options
├── write.go    # Write(), WriteRequest
├── read.go     # Internal read implementation
├── list.go     # List(), ListResult
├── helpers.go  # ReadBytes, ReadString, ReadMap, Read[T]
├── file.go     # WriteFile, FileExists, FilesAndMeta
├── clear.go    # ClearHistory, ClearHistoryWithRetention
├── meta.go     # Meta, BatchMeta
├── sample.go   # MotionSample
├── snapshot.go # Internal snapshot management
└── internal/
    ├── index/   # Redis index operations
    ├── storage/ # OSS, File, Memory storage
    ├── merge/   # RFC 7396, RFC 6902 merge
    ├── cache/   # Redis, Memory cache
    └── config/  # Configuration management
import "time"
client := lake.NewLake(
"redis://localhost:6379",
lake.WithSnapCacheMetaURL("redis://localhost:6379", 5*time.Minute),
)

Recommended: Use a dedicated Redis instance for snapshot caching.
Reasons:
- Data Isolation - Snapshots are pure cache (can be rebuilt), separate from critical index data
- Memory Management - Enable LRU eviction on cache Redis without affecting index data
- Performance - No persistence overhead (AOF/RDB disabled) for faster access
- Cost Optimization - Use cheaper ephemeral storage for cache Redis
- Independent Scaling - Scale cache and index Redis independently based on workload
Recommended Setup: OCI Bitnami Redis
# Install using Helm
helm install lake-cache oci://registry-1.docker.io/bitnamicharts/redis -f cache-redis-values.yaml

cache-redis-values.yaml:
architecture: standalone # Standalone mode (single node)
replica:
replicaCount: 1 # Single node, no replicas needed
master:
kind: Deployment
persistence:
enabled: false # Disable persistence (ephemeral storage)
resources:
limits:
memory: "4096Mi" # Maximum memory: 4GB
requests:
memory: "256Mi" # Initial memory request: 256MB
configuration: |
# Disable AOF persistence
appendonly no
# Disable RDB auto-save
save ""
# Enable LRU cache with 4GB memory limit
maxmemory 4096mb
# Set eviction policy to allkeys-lru (evict least recently used keys)
maxmemory-policy allkeys-lru
auth:
enabled: false # Disable authentication (internal use)
usePassword: false
cluster:
enabled: false # Disable cluster mode
persistence:
existingClaim: false # Don't use existing PVC
enabled: false # Disable persistence
sysctlImage:
enabled: true
repository: busybox
tag: v1.35.0
command:
- /bin/sh
- '-c'
- |
mount -o remount rw /proc/sys
sysctl -w net.core.somaxconn=65535
sysctl -w net.ipv4.ip_local_port_range="1024 65535"

Why This Configuration?
- ✅ No Persistence - Cache Redis stores snapshot content that can be rebuilt from OSS, so nothing needs to persist
- ✅ LRU Eviction - Automatically evicts cached snapshot data when memory is full (only affects cache, not index data)
- ✅ High Performance - No disk I/O overhead from AOF/RDB
- ✅ Memory Efficient - Uses 4GB max, starts with 256MB
- ✅ Optimized Networking - Increased connection limits for high throughput
Important:
- Cache Redis (this section): Only caches snapshot data content, can use LRU eviction
- Main Redis (below): Stores index data (snap/delta/pending members), permanently saved unless manually deleted, MUST have persistence enabled
Critical: Main Redis stores index data (delta/pending/snapshot members) and MUST have persistence enabled.
Recommended Configuration for Minimum Data Loss:
master:
persistence:
enabled: true
path: /data
configuration: |
dir /data
# Enable AOF persistence (Append-Only File)
appendonly yes
# Configure RDB auto-save policies (multiple time points for redundancy)
save 900 1 # Save after 900 seconds (15 min) if at least 1 modification
save 300 10 # Save after 300 seconds (5 min) if at least 10 modifications
save 60 100 # Save after 60 seconds (1 min) if at least 100 modifications
# AOF sync policy (sync every second - balance between performance and durability)
appendfsync everysec
# Disable dangerous commands (prevent accidental data loss)
rename-command FLUSHDB ""
rename-command FLUSHALL ""

Why This is the Lowest Data Loss Configuration:
1. Dual Persistence (AOF + RDB)
   - AOF logs every write operation and can recover up to the last second
   - RDB creates point-in-time snapshots at multiple intervals
   - If the AOF corrupts, RDB provides backup recovery
2. Multi-Level RDB Snapshots
   - High-frequency writes: RDB every 1 minute (60s / 100 changes)
   - Medium-frequency: RDB every 5 minutes (300s / 10 changes)
   - Low-frequency: RDB every 15 minutes (900s / 1 change)
   - Ensures data is saved regardless of write pattern
3. AOF everysec (Best Balance)
   - At most 1 second of data loss in the worst case
   - Better performance than `appendfsync always`, more durable than `appendfsync no`
4. Command Protection
   - FLUSHDB/FLUSHALL disabled to prevent accidental deletion
   - Critical for production environments
Data Loss Scenarios:
| Scenario | Max Data Loss | Recovery Method |
|---|---|---|
| Graceful shutdown | 0 seconds | AOF + RDB intact |
| Power failure | 1 second | AOF recovery |
| AOF corruption | Up to RDB interval | RDB snapshot recovery |
| Both corrupted | Manual recovery | Rebuild from OSS deltas |
Cache vs Main Redis Comparison:
| Feature | Cache Redis | Main Redis |
|---|---|---|
| Persistence | ❌ Disabled | ✅ AOF + RDB |
| Eviction | ✅ LRU enabled | ❌ No eviction |
| Data Importance | Low (rebuildable) | Critical (index) |
| Disk I/O | None | Moderate |
| Max Data Loss | All (acceptable) | 1 second |
| Recovery Time | Fast (rebuild) | Instant (AOF) |
import "github.com/hkloudou/lake/v2/trace"
ctx := trace.WithTrace(context.Background(), "Write")
client.Write(ctx, req)
tr := trace.FromContext(ctx)
fmt.Println(tr.Dump())
// Output:
// === Trace [Write]: Total 248ms ===
// [1] Init: 14.84ms
// [2] PreCommit: 2.14ms {tsSeq:..., seqID:1}
// [3] StoragePut: 203.48ms {key:..., size:5}
// [4] Commit: 2.57ms

Path follows a strict format for network-safe transmission:
- Must start with `/` - like URL paths
- Must not end with `/` - no trailing slashes
- Segments follow JavaScript naming - start with a letter, `_`, or `$`, followed by letters, digits, `_`, `$`, or `.`
- Root document: use `"/"` for whole-document operations
Valid Examples:

/             → Root document
/user         → Single field
/user/profile → Nested field (user.profile in JSON)
/user.info    → Field with a dot in the name (user\.info in gjson)
/$config      → Dollar-sign prefix allowed

Invalid Examples:

user       → No leading /
/user/     → Trailing /
/123       → Starts with a number
/user-name → Contains a hyphen
Lake V2 supports three merge strategies:
client.Write(ctx, lake.WriteRequest{
Path: "/user/name", // Path format
Body: []byte(`"Alice"`),
MergeType: lake.MergeTypeReplace,
})

RFC 7396 - Declarative merging with null deletion:
// Merge patch (adds city, removes age with null)
client.Write(ctx, lake.WriteRequest{
Path: "/user",
Body: []byte(`{"city":"NYC","age":null}`),
MergeType: lake.MergeTypeRFC7396,
})

RFC 6902 - Imperative operations (add, remove, replace, move, copy):
client.Write(ctx, lake.WriteRequest{
Path: "/", // Root document
Body: []byte(`[
{"op":"add","path":"/a/b/c","value":42},
{"op":"move","from":"/a/b/c","path":"/x/y/z"}
]`),
MergeType: lake.MergeTypeRFC6902,
})

Lake V2 uses a two-phase commit protocol to prevent data loss:
- Pre-Commit - Generate TimeSeqID and mark as pending in Redis (atomic via Lua)
- Storage Write - Write to OSS/S3 (may be slow)
- Commit - Remove pending, add committed (atomic via Lua)
This ensures no writes are lost even if concurrent reads create snapshots during slow OSS operations.
Catalogs are intelligently encoded for optimal performance:
- Pure lowercase (`users`): `(` prefix → `9bc6/(users`
- Pure uppercase (`USERS`): `)` prefix → `4020/)USERS`
- Mixed/unsafe (`Users`, `中文`): base32 → `f9aa/kvzwk4tt`
- MD5 sharding: 65,536 directories for balanced distribution
Store configuration in Redis at key lake.setting:
{
"Name": "my-lake",
"Storage": "oss",
"Bucket": "my-bucket",
"Endpoint": "oss-cn-hangzhou",
"AccessKey": "your-access-key",
"SecretKey": "your-secret-key",
"AESPwd": "optional-encryption-key"
}

import "github.com/hkloudou/lake/v2/internal/storage"
client := lake.NewLake(
"redis://localhost:6379",
lake.WithStorage(storage.NewMemoryStorage()),
)

- Write Throughput: 999,999 operations/sec per catalog
- Read Performance: 2x faster with async snapshot save (v2.2.0)
- Delta Loading: 10x faster with worker pool (10 concurrent)
- Cache Hit Ratio: ~90% typical workload
- Atomic Overhead: <2% (4ms for Redis operations)
Write Operation:
Init: 14ms (first write only, config loading)
PreCommit: 2ms (Redis Lua: generate ID + mark pending)
StoragePut: 180ms (OSS write - main bottleneck)
Commit: 2ms (Redis Lua: finalize)
───────────────────
Total: 198ms (OSS-dominated, atomic overhead minimal)
Read Operation (v2.2.0 - Async Snapshot):
Before v2.2.0 (sync snapshot):
LoadData: 180ms
Merge: 10ms
SnapSave: 200ms β Blocking!
──────────────
Total: 390ms
After v2.2.0 (async snapshot):
LoadData: 180ms
Merge: 10ms
SnapSave: async β Non-blocking!
──────────────
Total: 190ms → 2x faster!
- Async Snapshot Save - Snapshot generation no longer blocks Read response
- Redis-Based Lock Detection - Uses Redis TIME to detect pending write timeouts (120s, clock-skew resistant)
- SingleFlight - Prevents duplicate concurrent snapshot saves
- Parallel I/O - Snapshot and deltas load concurrently
- Worker Pool - 10 concurrent delta loads
- Smart Caching - Redis cache with ~90% hit ratio
- Optimized Storage - Simplified member format saves ~30% Redis space
Redis Index:
{prefix}:delta:base64(catalog) -> ZADD
score: timestamp.seqid (float: timestamp + seqid/1000000.0)
Must have exactly 6 decimal places, seqid > 0
Valid: 1700000000.000001 to 1700000000.999999
Delta member format:
delta|{mergeType}|{field}|{tsSeq}
Example: delta|1|/user/name|1700000000_1
Pending member format (uncommitted writes):
pending|delta|{mergeType}|{field}|{tsSeq}
Example: pending|delta|1|/user/name|1700000000_1
Snapshot member format:
snap|{startTsSeq}|{stopTsSeq}
Example: snap|1700000000_1|1700000100_500
OSS Storage:
{md5[0:4]}/{encoded}/delta/{ts}_{seqid}_{type}.json
{md5[0:4]}/{encoded}/snap/{start}~{stop}.snap
Write (Atomic Two-Phase Commit):
1. Lua: GetTimeSeqID + ZADD pending|... (atomic)
2. OSS: PUT data file
3. Lua: ZREM pending + ZADD delta|... (atomic)
Read (Parallel + Async):
1. List: Get snapshot info + delta index
- Check pending writes using Redis TIME (< 120s = error, > 120s = ignore) ✨ v2.2.0
2. Parallel Load:
- Thread 1: Cache/OSS load snapshot data
- Thread 2: Worker pool load delta bodies (10 concurrent)
3. Merge: CPU-bound merge operation
4. Async: Save new snapshot (background, non-blocking) ✨ v2.2.0
- Async Snapshot Save: Read operations no longer wait for snapshot saves (2x faster!)
- Improved Pending Detection: Uses Redis TIME for accurate lock expiry (120s timeout, prevents clock skew)
- Unified Merge Interface: Single `Merger` interface for all merge strategies (Replace, RFC7396, RFC6902)
- Path Validation: Strict path format with `/` prefix, network-safe for HTTP transmission
- Enhanced Score Parsing: Supports multiple formats (underscore/decimal/float64) with 6-decimal precision validation
- Optimized Storage: Simplified delta member format, ~30% space saving in Redis
- File Structure: Code organized into write.go, read.go, snapshot.go, helpers.go
- SingleFlight Snapshots: Prevents duplicate concurrent snapshot generation
- Simplified Architecture: Removed snapMgr dependency, cleaner code
# Run all tests
go test ./...
# Run with trace
go test -v -run TestWriteWithTrace
# Specific package
go test -v ./internal/merge

Problem: Concurrent writes with slow OSS may cause data loss during snapshots.
Solution: Two-phase commit with pending state
- Phase 1: Mark as `pending|` in Redis (atomic)
- Phase 2: Write to OSS
- Phase 3: Commit to `delta|` (atomic)
Read Behavior (v2.2.0 - Optimized):
- Uses Redis TIME for accurate age calculation (avoids server clock skew)
- Pending < 120s: Error returned (write in progress, client should retry)
- Pending > 120s: Ignored (abandoned write, auto-cleaned)
- Error stored in `ListResult.Err` (non-fatal, can be checked before Read)
- Background updater syncs Redis time every 5s (minimal overhead)
list := client.List(ctx, catalog)
if list.Err != nil {
// Pending writes detected (age < 120s), retry later
time.Sleep(100 * time.Millisecond)
list = client.List(ctx, catalog)
}
data, _ := lake.ReadMap(ctx, list)

Philosophy: Snapshot is an optimization, not critical data.
Behavior:
- Snapshot save failure does not fail Read operation
- Error recorded in trace for debugging
- Next read will regenerate snapshot
- Data consistency maintained (snapshots can be rebuilt)
Lake V2's two-phase commit ensures zero data loss even in failure scenarios. Here's how each failure case is handled:
| Failure Point | State After Failure | Cleanup Method |
|---|---|---|
| PreCommit fails | Clean - no dirty data | None needed |
| StoragePut fails | `pending` in Redis, no file | Rollback removes pending ✅ |
| Commit fails | `pending` in Redis + file in storage | ClearHistory handles it ✅ |
| Process crash (after StoragePut, before Commit) | `pending` in Redis + file in storage | ClearHistory handles it ✅ |
When Commit fails, the system intentionally does not immediately delete the orphan file. This design is safer:
- Commit failure may be transient - Network glitch, Redis failover, etc.
- Pending acts as a protection - Read operations detect `pending` and return an error, preventing inconsistent reads
- ClearHistory is the correct cleanup point - The user explicitly calls cleanup when historical data is no longer needed, ensuring sufficient time has passed
The ClearHistory API handles all cleanup scenarios:
// ClearHistory removes old deltas and snapshots
// This also cleans up any orphan files from failed commits
client.ClearHistory(ctx, "users")

Internal flow:
1. `ReadSafeRemoveRange` returns all delta members (including `pending` members)
2. For each delta/pending member, derive the storage path using `MakeDeltaKey(catalog, tsSeq, mergeType)`
3. Delete storage files in parallel (10 workers)
4. Batch-delete Redis members via `ZREM`
Key insight: The pending member contains all information needed to reconstruct the storage file path:
pending|delta|{mergeType}|{path}|{timestamp}_{seqid}
              ↑                  ↑
              mergeType          tsSeq
Combined with catalog (from the ZSet key), we can call:
storageDeltaKey := storage.MakeDeltaKey(catalog, tsSeq, mergeType)
storage.Delete(ctx, storageDeltaKey)

Unlike traditional two-phase commit systems, Lake V2 does not require a separate fragment/orphan tracking table because:
- Pending member IS the fragment tracker - Contains all info to locate orphan files
- ClearHistory is comprehensive - Cleans both committed deltas and uncommitted pending entries
- Simpler architecture - No additional Redis keys or background cleanup tasks
When a pending member exists:
- Age < 120 seconds: Read returns an error (write in progress, client should retry)
- Age > 120 seconds: `pending` is ignored (considered abandoned; will be cleaned by `ClearHistory`)
list := client.List(ctx, "users")
if list.HasPending {
// Active write in progress, retry later
return fmt.Errorf("pending write detected: %w", list.Err)
}

- Call `ClearHistory` periodically - This is the unified cleanup mechanism
- Use `ClearHistoryWithRetention` - Keep recent snapshots for performance
- Don't worry about orphan files - They will be cleaned when you call `ClearHistory`
- Trust the pending mechanism - It prevents inconsistent reads during failures
Panic Locations (defensive programming):

- `WithSnapCacheMetaURL()` - Invalid Redis URL at initialization
- `makeCatalogKey()` - Prefix not set (internal invariant violation)
Rationale: These represent programming errors, not runtime errors. Fail-fast to catch bugs early.
- Basic Examples - Write, Read, RFC patches
- Trace Examples - Performance monitoring
- Cache Examples - Redis caching setup
Contributions are welcome! Please ensure:
- All tests pass (`go test ./...`)
- Code is formatted (`go fmt ./...`)
- Commits are descriptive
MIT License - see LICENSE
- GitHub: https://github.com/hkloudou/lake
- Issues: https://github.com/hkloudou/lake/issues
- Releases: https://github.com/hkloudou/lake/releases
Previous Version: For v1 (legacy), see the v1 branch.