sqlake

The edge-native data lakehouse for Cloudflare.

sqlake bridges the gap between real-time transactional data in Durable Objects and analytics-ready data in a Parquet-based data lake. Define your schema once with Drizzle, and sqlake handles the rest: SQLite migrations, CDC streaming, Parquet files, Iceberg catalogs, and unified querying.

import { SQLake, sql, text, integer } from 'sqlake'

// Define your schema once
const db = SQLake({
  users: {
    id: text('id').primaryKey(),
    email: text('email').notNull(),
    plan: text('plan'),
  },
  shard: { users: 'id' },
})

// Query single user (strong consistency - routes to Durable Object)
const user = await sql`SELECT * FROM users WHERE id = ${userId}`.first()

// Query ALL users (eventual consistency - routes to DuckDB on R2)
const stats = await sql.analytics`
  SELECT plan, COUNT(*) as count
  FROM users
  GROUP BY plan
`

// Time travel query (query historical data)
const yesterday = new Date(Date.now() - 24 * 60 * 60 * 1000)
const oldData = await sql.asOf(yesterday)`SELECT * FROM users`

Overview

The Problem

Cloudflare Durable Objects give you globally distributed SQLite databases with zero configuration. But then you hit the wall:

  • Cross-shard queries: Each DO is isolated - you cannot query across all users
  • Analytics: Building aggregations requires custom data pipelines
  • Historical data: No built-in time travel or audit trails
  • Schema evolution: Coordinating migrations across thousands of DOs is complex

The Solution

sqlake automatically streams every write from your Durable Objects to a unified data lake on R2:

  • Unified schema: Define once with Drizzle, works everywhere
  • Automatic CDC: SQLite triggers capture every change
  • Parquet output: Columnar format optimized for analytics
  • Iceberg catalog: Time travel, schema evolution, snapshot isolation
  • Query routing: Automatic routing to DO (strong) or DuckDB (analytics)

Key Benefits

| Feature | Benefit |
| --- | --- |
| Edge-native | Runs entirely on Cloudflare's edge infrastructure |
| SQLite + Iceberg | Best of OLTP and OLAP in one system |
| Real-time CDC | Changes stream to the data lake in near real-time |
| Time travel | Query data at any point in history |
| Zero infrastructure | No external databases or data warehouses needed |

Architecture

+------------------------------------------------------------------+
|                        Your Application                           |
+------------------------------------------------------------------+
|                                                                  |
|   sql`SELECT * FROM users WHERE id = ${id}`                     |
|                      |                                           |
|                      v                                           |
|              +---------------+                                   |
|              | Query Router  |                                   |
|              +-------+-------+                                   |
|        Strong |             | Eventual                          |
|               v             v                                    |
|     +-------------+   +----------------+                        |
|     | Durable     |   | DuckDB/Parquet |                        |
|     | Object      |   | on R2          |                        |
|     | (SQLite)    |   |                |                        |
|     +------+------+   +----------------+                        |
|            |                  ^                                  |
|            | CDC Triggers     |                                  |
|            v                  |                                  |
|     +---------------+         |                                  |
|     | CDC Buffer    +---------+                                  |
|     | (Parquet)     | Flush                                     |
|     +---------------+                                           |
|                                                                  |
+------------------------------------------------------------------+

Components

  1. Durable Objects (SQLakeDO)

    • Per-user/per-tenant SQLite databases
    • Strong consistency for transactional workloads
    • CDC triggers automatically installed
  2. CDC (Change Data Capture)

    • SQLite triggers capture INSERT/UPDATE/DELETE
    • Events buffered in _cdc_buffer table
    • Background flush to R2 as Parquet files
  3. R2 Storage

    • Parquet files with Hive-style partitioning (_shard=xxx/)
    • Iceberg metadata for table management
    • Zone maps for efficient file pruning
  4. Iceberg Tables

    • Apache Iceberg v2 compatible metadata
    • Snapshot-based time travel
    • Schema evolution support
    • Manifest files with column statistics
  5. DuckDB Analytics

    • WASM or native DuckDB for query execution
    • Reads Parquet files directly from R2
    • Predicate pushdown using zone maps

Quick Start

Installation

npm install sqlake drizzle-orm

Define Your Schema

// schema.ts
import { SQLake, text, integer, real } from 'sqlake'

export const db = SQLake({
  users: {
    id: text('id').primaryKey(),
    email: text('email').notNull(),
    name: text('name'),
    plan: text('plan'),
    createdAt: integer('created_at', { mode: 'timestamp' }),
  },
  orders: {
    id: text('id').primaryKey(),
    userId: text('user_id').notNull(),
    amount: real('amount').notNull(),
    status: text('status'),
  },
  // Configure shard keys
  shard: {
    users: 'id',
    orders: 'userId',
  },
  // Storage mode for Parquet output
  storage: 'hybrid',  // 'columns' | 'variant' | 'hybrid'
  // R2 bucket binding name
  r2Bucket: 'SQLAKE_BUCKET',
})

// Type inference
export type User = typeof db.tables.users.$inferSelect
export type Order = typeof db.tables.orders.$inferSelect

Create Your Durable Object

// worker.ts
import { SQLakeDO } from 'sqlake/do'
import { db } from './schema'

export class UserDO extends SQLakeDO {
  schema = db

  async getUser(id: string) {
    return this.sql`SELECT * FROM users WHERE id = ${id}`.first()
  }

  async createUser(id: string, email: string) {
    await this.sql`INSERT INTO users (id, email) VALUES (${id}, ${email})`.run()
    return { id, email }
  }

  async updatePlan(id: string, plan: string) {
    await this.sql`UPDATE users SET plan = ${plan} WHERE id = ${id}`.run()
  }
}

export default {
  async fetch(request: Request, env: Env) {
    const url = new URL(request.url)
    const userId = url.searchParams.get('userId')

    if (!userId) {
      return new Response('Missing userId', { status: 400 })
    }

    const doId = env.USER_DO.idFromName(userId)
    const stub = env.USER_DO.get(doId)

    // Route to the user's Durable Object
    return stub.fetch(request)
  }
}

Configure Wrangler

# wrangler.toml
name = "my-app"
main = "src/worker.ts"

[[durable_objects.bindings]]
name = "USER_DO"
class_name = "UserDO"

[[migrations]]
tag = "v1"
new_sqlite_classes = ["UserDO"]  # SQLite-backed DO storage

[[r2_buckets]]
binding = "SQLAKE_BUCKET"
bucket_name = "my-app-data"

Generate and Deploy

# Generate SQLite migrations and Parquet schema
npx sqlake generate

# Push migrations to R2 for DO consumption
npx sqlake push

# Deploy to Cloudflare
npx wrangler deploy

Core Concepts

Schema Definition

sqlake uses Drizzle ORM's column types to define your schema:

import { SQLake, text, integer, real, blob } from 'sqlake'

const db = SQLake({
  // Table definitions (Drizzle column types)
  products: {
    id: text('id').primaryKey(),
    name: text('name').notNull(),
    price: real('price').notNull(),
    stock: integer('stock').default(0),
    metadata: blob('metadata', { mode: 'json' }),
  },

  // Shard key configuration
  shard: {
    products: 'id',  // Routes queries by product ID
  },

  // Storage mode for Parquet
  storage: 'hybrid',  // columns + _row variant

  // Enable CDC (default: true)
  cdc: true,

  // R2 bucket binding name
  r2Bucket: 'SQLAKE_BUCKET',

  // Variant column name (default: '_row')
  variantColumn: '_row',
})

Storage Modes

| Mode | Description | Use Case |
| --- | --- | --- |
| columns | First-class columns only | Pure analytics workloads |
| variant | _row VARIANT column only | Document retrieval |
| hybrid | Both columns and _row | Mixed workloads (recommended) |
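
To make the modes concrete, here is a rough sketch of what a hybrid-mode record for the users table from the opening example could look like; the exact Parquet column layout is an assumption for illustration.

// Illustrative only: the precise column set sqlake writes is not shown here.
// In hybrid mode a record carries both first-class columns (good for
// predicate pushdown) and a variant column (default name '_row') holding
// the full row as a document.
interface HybridUserRecord {
  id: string
  email: string
  plan: string | null
  _row: { id: string; email: string; plan: string | null }
}

const example: HybridUserRecord = {
  id: 'user-123',
  email: 'ada@example.com',
  plan: 'pro',
  _row: { id: 'user-123', email: 'ada@example.com', plan: 'pro' },
}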

SQLakeDO (Durable Object Base Class)

SQLakeDO is the base class for your Durable Objects that provides:

  • Automatic migration on cold start
  • CDC trigger installation
  • Background CDC flush to R2
  • DO-local SQL query interface

import { SQLakeDO, type SQLakeWorkerEnv } from 'sqlake/do'
import { db } from './schema'

interface MyEnv extends SQLakeWorkerEnv {
  API_KEY: string
}

export class MyDO extends SQLakeDO {
  // Required: link your schema
  schema = db

  // Optional: customize CDC settings
  protected cdcFlushThreshold = 1000      // Flush every N records
  protected cdcFlushInterval = 60_000     // Flush every N ms

  // Use this.sql for DO-local queries
  async getData() {
    return this.sql`SELECT * FROM products`.all()
  }

  async insertData(id: string, name: string) {
    await this.sql`INSERT INTO products (id, name) VALUES (${id}, ${name})`.run()
  }

  // Migration status
  getStatus() {
    return {
      migrations: this.getMigrationStatus(),
      cdc: this.getCDCFlushMetrics(),
    }
  }
}

Migration Methods

// `userDO` is your SQLakeDO instance (e.g. `this` inside the class);
// note that `do` is a reserved word and cannot be used as a variable name.

// Get current migration status
const status = userDO.getMigrationStatus()
// { manifestVersion, appliedCount, pendingCount, applied: [...], pending: [...] }

// Get last migration result
const result = userDO.getLastMigrationResult()
// { success, applied, migrations: [...], durationMs, error? }

// Dry-run migrations
const preview = await userDO.dryRunMigrations()

// Rollback to a specific migration
await userDO.rollbackMigrations('002_add_column')

CDC Error Handling

// `userDO` is your SQLakeDO instance, as above

// Get flush metrics
const metrics = userDO.getCDCFlushMetrics()
// { consecutiveFailures, totalFailures, lastError, lastSuccess }

// Get events that failed to flush
const deadLetters = userDO.getCDCDeadLetterEvents(100)

// Retry dead-letter events
await userDO.retryCDCDeadLetterEvents({ maxRetries: 3, batchSize: 100 })

// Purge old dead-letter events (default: 7 days)
const purged = userDO.purgeCDCDeadLetterEvents(7 * 24 * 60 * 60 * 1000)

CDC (Change Data Capture)

sqlake automatically captures all changes via SQLite triggers:

-- Automatically created trigger (example)
CREATE TRIGGER _cdc_users_insert
AFTER INSERT ON users
BEGIN
  INSERT INTO _cdc_buffer (op, table_name, row_id, after_json)
  SELECT 'c', 'users', NEW.id, json_object('id', NEW.id, 'email', NEW.email);
END

CDC Event Structure

interface CDCEvent<T = unknown> {
  _op: 'c' | 'u' | 'd'    // create, update, delete
  _table: string           // Table name
  _shard: string           // Shard ID
  _seq: bigint             // Sequence number
  _ts: number              // Timestamp (ms since epoch)
  _before: T | null        // State before (null for inserts)
  _after: T | null         // State after (null for deletes)
}
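
For illustration, an update that changes a user's plan might produce an event like this (the values are hypothetical):

import type { CDCEvent } from 'sqlake'

// Hypothetical event for: UPDATE users SET plan = 'pro' WHERE id = 'user-123'
const event: CDCEvent<{ id: string; email: string; plan: string | null }> = {
  _op: 'u',                 // update
  _table: 'users',
  _shard: 'user-123',
  _seq: 42n,
  _ts: 1706788800000,
  _before: { id: 'user-123', email: 'ada@example.com', plan: 'free' },
  _after: { id: 'user-123', email: 'ada@example.com', plan: 'pro' },
}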

Parquet Output

CDC events are written to R2 as Parquet files with Hive-style partitioning:

data/
  users/
    _shard=user-123/
      cdc_1706788800000_1.parquet
      cdc_1706788900000_2.parquet
    _shard=user-456/
      cdc_1706788850000_1.parquet
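
As a rough sketch of how those object keys relate to the CDC metadata, a key could be derived from the table name, shard, flush timestamp, and sequence number; the helper below is illustrative, not part of the sqlake API.

// Illustrative helper (not part of sqlake): build an R2 object key
// matching the layout shown above.
function cdcObjectKey(table: string, shard: string, flushedAt: number, seq: number): string {
  return `data/${table}/_shard=${shard}/cdc_${flushedAt}_${seq}.parquet`
}

cdcObjectKey('users', 'user-123', 1706788800000, 1)
// => 'data/users/_shard=user-123/cdc_1706788800000_1.parquet'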

Time Travel Queries

Query historical data using Iceberg snapshots:

// Query as of a specific timestamp
const yesterday = new Date(Date.now() - 24 * 60 * 60 * 1000)
const oldUsers = await sql.asOf(yesterday)`SELECT * FROM users`

// Query as of an ISO timestamp
const snapshot = await sql.asOf('2024-01-15T00:00:00Z')`SELECT * FROM users`

// Query a specific Iceberg snapshot by ID
const exact = await sql.snapshot(1234567890123456)`SELECT * FROM users`

// Query changes between two timestamps (incremental processing)
const changes = await sql.between(startTime, endTime)`SELECT * FROM events`

Analytics with DuckDB

Cross-shard analytics queries are routed to DuckDB:

// Force analytics mode (eventual consistency)
const stats = await sql.analytics`
  SELECT
    plan,
    COUNT(*) as user_count,
    AVG(orders.amount) as avg_order
  FROM users
  LEFT JOIN orders ON users.id = orders.user_id
  GROUP BY plan
`

// Predicate pushdown for efficient queries
import { P, filterDataFiles } from 'sqlake/analytics'

const predicate = P.and(
  P.eq('plan', 'pro'),
  P.gte('created_at', startDate)
)

// Filter Parquet files using zone maps
const relevantFiles = filterDataFiles(allFiles, predicate)

Consistency Models

| Query Type | Consistency | Routing | Use Case |
| --- | --- | --- | --- |
| Single shard | Strong | Durable Object | User data, transactions |
| Analytics | Eventual | DuckDB on R2 | Reports, aggregations |
| Causal | Causal | R2 with bookmark wait | Read-your-writes |

// Strong consistency (automatic when shard key present)
const user = await sql`SELECT * FROM users WHERE id = ${id}`.first()

// Eventual consistency (explicit analytics mode)
const count = await sql.analytics`SELECT COUNT(*) FROM users`.first()

// Causal consistency (wait for your writes to replicate)
const bookmark: Bookmark = { shard: 'user-123', seq: 42n }
const orders = await sql.causal(bookmark)`SELECT * FROM orders`

CLI Commands

sqlake generate

Generate SQLite migrations and Parquet schema from your Drizzle schema.

sqlake generate

This command:

  1. Runs drizzle-kit generate for SQLite migrations
  2. Parses your schema files to extract table definitions
  3. Generates Parquet schema in .sqlake/parquet-schema.json
  4. Generates TypeScript CDC types in .sqlake/types.ts
  5. Updates the manifest in .sqlake/manifest.json
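
As a sketch of the generated artifacts, the CDC types emitted to .sqlake/types.ts might look roughly like this for the users table from the opening example (the actual generated output may differ):

// .sqlake/types.ts (illustrative shape, not the literal generated file)
import type { CDCEvent } from 'sqlake'

export interface User {
  id: string
  email: string
  plan: string | null
}

export type UserCDCEvent = CDCEvent<User>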

sqlake push

Push migrations and schema to R2 for Durable Object consumption.

sqlake push

This command:

  1. Uploads migration manifest to R2
  2. Updates Iceberg table metadata
  3. Handles schema evolution

sqlake dev

Start a local development server with SQLite and CDC.

sqlake dev [options]

Options:
  --port, -p <n>    HTTP port (default: 4983)
  --db, -d <path>   SQLite path (default: .sqlake/local.db)
  --verbose, -v     Enable debug logging

The server provides:

  • Local SQLite database
  • CDC triggers for change tracking
  • Parquet file output in .sqlake/data/
  • HTTP query endpoint at POST /query
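
As a rough sketch, querying the local endpoint from a script might look like the following; the request body and response shape are assumptions, not a documented contract.

// Illustrative only: field names in the request body are assumptions.
const res = await fetch('http://localhost:4983/query', {
  method: 'POST',
  headers: { 'Content-Type': 'application/json' },
  body: JSON.stringify({
    sql: 'SELECT * FROM users WHERE id = ?',
    params: ['user-1'],
  }),
})

console.log(await res.json())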

sqlake studio

Start Drizzle Studio with sqlake's analytics proxy.

sqlake studio [options]

Options:
  --port, -p <n>    Proxy port (default: 4984)
  --db, -d <path>   SQLite path (default: .sqlake/local.db)

sqlake query

Execute ad-hoc SQL queries.

sqlake query <sql> [options]

Options:
  --db, -d <path>       SQLite path (default: .sqlake/local.db)
  --analytics, -a       Use DuckDB instead of SQLite
  --format, -f <fmt>    Output format: json, table (default: json)
  --params, -p <json>   Query parameters as JSON array

Examples:
  sqlake query "SELECT * FROM users"
  sqlake query "SELECT * FROM users WHERE id = ?" --params '["user-1"]'
  sqlake query "SELECT COUNT(*) FROM users" --analytics --format table

sqlake migrate

Apply or rollback migrations locally.

sqlake migrate [options]

Options:
  --status, -s          Show migration status
  --dry-run, -n         Preview without applying
  --rollback, -r        Rollback instead of forward migration
  --target, -t <name>   Migrate to specific version
  --skip-checksum       Skip checksum verification

Examples:
  sqlake migrate                         Apply all pending
  sqlake migrate --status                Show status
  sqlake migrate --dry-run               Preview changes
  sqlake migrate --rollback              Rollback last migration
  sqlake migrate --target 002_add_email  Migrate to specific version

API Reference

Main Module (sqlake)

// Schema and configuration
export { SQLake } from 'sqlake'
export type { SQLakeConfig, SQLakeDatabase, ShardConfig, StorageMode } from 'sqlake'

// SQL query interface
export { sql, setQueryRouter, getQueryRouter } from 'sqlake'

// Column types (re-exported from Drizzle)
export { text, integer, real, blob } from 'sqlake'

// Query classes
export { SQLQuery, PipelinedQuery, MappedQuery } from 'sqlake'
export { LocalRouter, WorkerRouter, createInMemoryRouter } from 'sqlake'
export { parseSQL, determineRoute, computeShardId } from 'sqlake'

// Types
export type { CDCEvent, Bookmark, TableRef, InferSelect, InferInsert } from 'sqlake'
export type { QueryResult, RunResult, QueryOptions, QueryRouter } from 'sqlake'

// Observability
export { createObservability, ConsoleLogger, InMemoryMetrics } from 'sqlake'
export type { Logger, Metrics, Span } from 'sqlake'

DO Module (sqlake/do)

// Base class
export { SQLakeDO } from 'sqlake/do'
export type { SQLakeDOOptions, SQLakeWorkerEnv, CDCBufferRecord } from 'sqlake/do'

// Migrations
export { MigrationRunner, MigrationError, ChecksumMismatchError } from 'sqlake/do'
export { fetchMigrationManifest, createMigrationManifest, computeChecksum } from 'sqlake/do'
export type { Migration, MigrationManifest, MigrationResult } from 'sqlake/do'

// CDC flush
export { flushCDCToR2, flushCDCToR2WithIceberg, CDCFlushErrorTracker } from 'sqlake/do'
export type { CDCFlushOptions, CDCFlushResult, CDCFlushMetrics } from 'sqlake/do'

Analytics Module (sqlake/analytics)

// DuckDB integration
export { DuckDBLoader, createDuckDBProvider, detectEnvironment } from 'sqlake/analytics'
export type { DuckDBLoaderConfig, DuckDBProvider } from 'sqlake/analytics'

// Predicate pushdown
export { P, PredicateBuilder, filterDataFiles, filterManifestEntries } from 'sqlake/analytics'
export type { Predicate, ColumnPredicate, PruningResult, ZoneMapStats } from 'sqlake/analytics'

// Time travel
export { TimeTravelQuery, resolveTimeTravel, resolveTimeRange } from 'sqlake/analytics'
export { SnapshotNotFoundError } from 'sqlake/analytics'
export type { TimeTravelOptions, TimeTravelResolution } from 'sqlake/analytics'

// Service binding (Workers RPC)
export { createDuckDBServiceClient, DuckDBServiceError } from 'sqlake/analytics'
export type { DuckDBService, DuckDBQueryRequest, DuckDBQueryResponse } from 'sqlake/analytics'

Iceberg Module (sqlake/iceberg)

// Table metadata
export { createTableMetadata, addSnapshot, getCurrentSnapshot } from 'sqlake/iceberg'
export { getSnapshotById, getSnapshotAsOf, getSnapshotsInRange } from 'sqlake/iceberg'
export type { IcebergTableMetadata, IcebergSchema, IcebergSnapshot } from 'sqlake/iceberg'

// Manifest management
export { createManifest, createManifestEntry, createDataFile } from 'sqlake/iceberg'
export type { IcebergManifest, IcebergManifestEntry, IcebergDataFile } from 'sqlake/iceberg'

// Registry (high-level API)
export { IcebergRegistry, createIcebergRegistry } from 'sqlake/iceberg'
export type { RegisterDataFileOptions, RegisterDataFileResult } from 'sqlake/iceberg'

// Commit (atomic operations)
export { IcebergCommitter, createIcebergCommitter, CommitConflictError } from 'sqlake/iceberg'
export type { SnapshotCommitOptions, SnapshotCommitResult } from 'sqlake/iceberg'

// Schema evolution
export { SchemaEvolutionBuilder, evolveSchema, isTypePromotionAllowed } from 'sqlake/iceberg'
export type { SchemaEvolutionOperation, AddColumnOperation } from 'sqlake/iceberg'

// Statistics
export { CDCStatsToIcebergConverter, buildDataFileStats } from 'sqlake/iceberg'

Columns Module (sqlake/columns)

// Drizzle column types
export { text, integer, real, blob, numeric } from 'sqlake/columns'
export { sqliteTable, primaryKey, foreignKey, unique, index, check } from 'sqlake/columns'

Examples

User Service with Orders

// schema.ts
import { SQLake, text, integer, real } from 'sqlake'

export const db = SQLake({
  users: {
    id: text('id').primaryKey(),
    email: text('email').notNull(),
    name: text('name'),
    plan: text('plan').default('free'),
  },
  orders: {
    id: text('id').primaryKey(),
    userId: text('user_id').notNull(),
    amount: real('amount').notNull(),
    status: text('status').default('pending'),
    createdAt: integer('created_at'),
  },
  shard: {
    users: 'id',
    orders: 'userId',
  },
})
// user-do.ts
import { SQLakeDO } from 'sqlake/do'
import { db } from './schema'

export class UserDO extends SQLakeDO {
  schema = db

  async createOrder(orderId: string, amount: number) {
    const userId = this.ctx.id.name
    await this.sql`
      INSERT INTO orders (id, user_id, amount, created_at)
      VALUES (${orderId}, ${userId}, ${amount}, ${Date.now()})
    `.run()
  }

  async getOrders() {
    const userId = this.ctx.id.name
    return this.sql`
      SELECT * FROM orders
      WHERE user_id = ${userId}
      ORDER BY created_at DESC
    `.all()
  }
}

Analytics Dashboard

// analytics.ts
import { sql } from 'sqlake'

export async function getDashboardStats() {
  // All these queries use eventual consistency (DuckDB)

  const usersByPlan = await sql.analytics`
    SELECT plan, COUNT(*) as count
    FROM users
    GROUP BY plan
  `

  const revenueByDay = await sql.analytics`
    SELECT
      DATE(created_at / 1000, 'unixepoch') as date,
      SUM(amount) as revenue,
      COUNT(*) as orders
    FROM orders
    WHERE status = 'completed'
    GROUP BY date
    ORDER BY date DESC
    LIMIT 30
  `

  const topCustomers = await sql.analytics`
    SELECT
      users.id,
      users.email,
      COUNT(orders.id) as order_count,
      SUM(orders.amount) as total_spent
    FROM users
    JOIN orders ON users.id = orders.user_id
    WHERE orders.status = 'completed'
    GROUP BY users.id, users.email
    ORDER BY total_spent DESC
    LIMIT 10
  `

  return { usersByPlan, revenueByDay, topCustomers }
}

Query with N+1 Prevention

import { sql } from 'sqlake'

// Using .map() for efficient dependent queries
const usersWithOrders = await sql.analytics`
  SELECT * FROM users WHERE plan = 'pro'
`.map(async user => ({
  ...user,
  recentOrders: await sql.analytics`
    SELECT * FROM orders
    WHERE user_id = ${user.id}
    ORDER BY created_at DESC
    LIMIT 5
  `,
  orderStats: await sql.analytics`
    SELECT COUNT(*) as count, SUM(amount) as total
    FROM orders
    WHERE user_id = ${user.id}
  `.first(),
}))

Incremental Processing with Time Travel

import { sql } from 'sqlake'

async function processNewEvents(lastProcessedTime: number) {
  const now = Date.now()

  // Get all changes since last processing
  const changes = await sql.between(lastProcessedTime, now)`
    SELECT * FROM events
    WHERE _op IN ('c', 'u')
  `

  for (const event of changes) {
    await processEvent(event)
  }

  return now // New watermark
}

Requirements

  • Runtime: Cloudflare Workers with Durable Objects
  • Storage: Cloudflare R2 bucket
  • CLI: Node.js 18+
  • Dependencies:
    • drizzle-orm (peer dependency)
    • hyparquet / hyparquet-writer (included)

License

MIT


Stop building data pipelines. Start building products.

npm install sqlake
