# forktex-core

Shared Python substrate for ForkTex services — nine independent modules, one install.

    pip install forktex-core            # db · cache · flow · log
    pip install forktex-core[vault]     # + Fernet encryption
    pip install forktex-core[storage]   # + S3/MinIO
    pip install forktex-core[queue]     # + arq background jobs
    pip install forktex-core[vector]    # + Qdrant vector search
    pip install forktex-core[all]       # everything

| Module | Purpose | Reference |
|---|---|---|
| db | Async Postgres — engine, session, ORM base classes, CRUD, advisory locks, migration runner | docs/db.md |
| cache | Async Redis — @cached, stale-while-revalidate, namespaced keys | docs/cache.md |
| flow | Durable execution — pipelines, graphs, state machines, AI agent loops | docs/flow.md |
| vault | Encryption at rest — Vault, EncryptedJSON column type, KEK rotation | docs/vault.md |
| storage | Object storage — S3/MinIO connector, multi-bucket, presigned URLs | docs/storage.md |
| queue | Background jobs — @task, enqueue, worker, inspect/cancel | docs/queue.md |
| vector | Vector search — Qdrant, dense/hybrid/multimodal, cross-collection | docs/vector.md |
| data | Virtual schemas — tenant-defined entities, JSONB rows, DataQuery | docs/data.md |
| log | Structured logging — JSON/Loki, trace_id contextvar, FastAPI middleware | docs/log.md |
## log

    from forktex_core.log import setup_logging, get_logger, TraceIDMiddleware

    setup_logging(service="my-service")  # JSON to stdout, INFO
    # setup_logging(service="my-service", debug=True)  # human-readable, DEBUG

    log = get_logger(__name__)
    log.info("starting up")

    # FastAPI: add middleware so every request gets a trace_id automatically
    app.add_middleware(TraceIDMiddleware)

## db

    from forktex_core.db import init_engine, get_session, BaseDBModel, OrgScopedMixin, AuditMixin
    import sqlalchemy as sa
    from sqlalchemy.orm import Mapped, mapped_column
    import uuid

    init_engine("postgresql+asyncpg://user:pass@host/db", pool_size=10)

    class Invoice(BaseDBModel, OrgScopedMixin, AuditMixin):
        __tablename__ = "invoice"
        id: Mapped[uuid.UUID] = mapped_column(primary_key=True, default=uuid.uuid4)
        amount: Mapped[int] = mapped_column(sa.Integer)

    async with get_session() as session:  # auto-commit / rollback
        session.add(Invoice(org_id=org_id, amount=100))

## cache

    from forktex_core.cache import init, cached
    await init("redis://localhost:6379/0")

    @cached(ttl=300)
    async def get_org(org_id: str) -> dict: ...

    # Structured log context (also works without cache)
    from forktex_core.log import async_log_context

    async with async_log_context(org_id=str(org_id)):
        log.info("processing")  # → {..."org_id": "org-xyz"}

## flow

    from forktex_core.flow import Flow, step
    flow = Flow(database_url="postgresql+asyncpg://...")
    await flow.init()

    @step
    async def send_welcome(ctx, state): ...

    @flow.pipeline("onboarding.user", version=1)
    class UserOnboarding:
        steps = [send_welcome, create_workspace]

    instance = await flow.run("onboarding.user", state={"email": "x@y.com"})
    await instance.wait(timeout=60)

## vault

    from forktex_core.vault import Vault, EncryptedJSON
    import os

    vault = Vault(kek=os.environ["FTX_KEK"])

    class Provider(BaseDBModel):
        __tablename__ = "provider"
        credentials: Mapped[bytes] = mapped_column(EncryptedJSON(vault))

    provider.credentials = {"api_key": "sk-..."}  # transparent encrypt/decrypt

## storage

    from forktex_core.storage import register, get_client
    # Register once at startup (supports multiple buckets)
    register("docs", url="http://minio:9000", bucket="documents",
             access_key=KEY, secret_key=SECRET)

    client = get_client("docs")
    await client.upload("invoices/abc.pdf", pdf_bytes, content_type="application/pdf")
    url = await client.presign("invoices/abc.pdf", expires_in=3600)

    # Actor uploads directly to MinIO — no auth header needed, signature is in the URL
    put_url = await client.presign("uploads/photo.jpg", method="put_object",
                                   content_type="image/jpeg", expires_in=900)

## queue

    from forktex_core.queue import task, init, enqueue, make_worker, JobCtx
    await init("redis://localhost:6379/1")

    @task(retries=2, timeout=120)
    async def process_document(ctx: JobCtx, doc_id: str) -> None:
        ...

    job_id = await enqueue(process_document, str(doc.id))

    # Worker entrypoint (run separately)
    WorkerSettings = make_worker("redis://localhost:6379/1")

## vector

    from forktex_core.vector import Vector, VectorPoint, SearchQuery
    vector = Vector(qdrant_url="http://qdrant:6333")
    coll = vector.collection(f"org-{org_id}--knowledge")  # use -- not : as separator

    await coll.create(dim=1536)
    await coll.upsert([VectorPoint(id=1, vector=embed(text), payload={"text": text})])

    hits = await coll.search(SearchQuery(vector=embed(query)).limit(10).using("hybrid"))
    for h in hits:
        print(h.score, h.payload["text"])

## data

    from forktex_core.data import VirtualEntity, VirtualField, EntityMode, FieldDataType
    async with get_session() as session:
        entity = VirtualEntity(namespace=str(org_id), slug="contacts",
                               label="Contacts", mode=EntityMode.VIRTUAL)
        session.add(entity)
        await session.flush()
        session.add(VirtualField(entity_id=entity.id, namespace=str(org_id),
                                 key="email", label="Email",
                                 data_type=FieldDataType.TEXT, is_required=True))

## FastAPI app wiring

    from contextlib import asynccontextmanager
    from fastapi import FastAPI
    from forktex_core.db import init_engine, close_engine
    from forktex_core.cache import init as cache_init, close as cache_close
    from forktex_core.log import setup_logging, TraceIDMiddleware

    setup_logging(service="my-api")  # call before app creation

    @asynccontextmanager
    async def lifespan(app: FastAPI):
        init_engine(settings.db_url, pool_size=20)
        await cache_init(settings.redis_url)
        yield
        await close_engine()
        await cache_close()

    app = FastAPI(lifespan=lifespan)
    app.add_middleware(TraceIDMiddleware)

## Library schemas

Library schemas (forktex_flow, forktex_data) are isolated from your alembic by default. If your Postgres host doesn't allow CREATE SCHEMA, route them to public:
    init_engine(url, schema_translate_map={
        "forktex_flow": None,  # None = public schema
        "forktex_data": None,
    })

## Documentation

Read the per-module reference first. Each docs/MODULE.md has: complete API signatures, canonical usage patterns, anti-patterns, edge case table, error catalogue, and integration map.
Critical things to get right:

| Rule | Why |
|---|---|
| Qdrant collection names: use `--` not `:` | Qdrant rejects `:` with 422 |
| Qdrant point IDs: int or `str(uuid.uuid4())` only | Arbitrary strings → 400 |
| `schema_translate_map`: use `None` key for default-schema tables | `"public"` key doesn't remap `schema=None` tables |
| `AuditMixin` requires `BaseDBModel` | Raises TypeError on class definition |
| `cache.init()` raises on failure | Doesn't silently degrade — handle at startup |
| `data.DataQuery.fetch()` in 0.6 | <!-- TODO: this row was truncated in the source; restore the rule and rationale --> |
| `queue.make_worker()` returns `arq.Worker` | Not `arq.worker.WorkerSettings` |
Module reference: db · cache · flow · vault · storage · queue · vector · data · log