ElasticKit is a lightweight, opinionated Python library built on top of the official Elasticsearch client (elasticsearch + elastic-transport) and the QueryBuilder. It provides a clean, async-first API for managing Elasticsearch indices, mappings, searches (including PIT), and complex queries.
Perfect for production-grade async applications that need fast iteration and readable code.
- Async-first: Full
async/awaitsupport with context manager - Point-in-Time (PIT): Renew or create PIT with one method
- Query Builder: Type-safe, fluent
QueryBuilderwith modern Elasticsearch features - Mappings & Fields: Easy extraction of
keyword/textfields and nested structures - CRUD + Bulk: Simple and powerful document operations
- Index Management: Create, refresh, check existence, update mappings
- Aggregations: Ready-made helpers for terms, cardinality, nested, histogram, etc.
- Composable queries: Combine queries with
and/orin one line
from elastickit import ElasticsearchClient, QueryBuilder, combine_queries
async def main():
async with ElasticsearchClient(
host="localhost",
port=9200,
index="my-products",
verify_certs=False,
) as client:
# Create index with mappings
await client.create_index(
mappings={
"properties": {
"name": {"type": "text"},
"price": {"type": "float"},
"tags": {"type": "keyword"},
"nested": {
"type": "nested",
"properties": {"sub_price": {"type": "float"}}
}
}
}
)
# Simple search
results = await client.search(
QueryBuilder.match("name", "laptop")
)
# Complex query
query = combine_queries(
[
QueryBuilder.match("name", "laptop"),
QueryBuilder.range("price", gte=500),
],
operator="and"
)
results = await client.search(query)
# Bulk operations
await client.bulk([
{"index": {"_id": "1"}}, {"name": "MacBook", "price": 1200},
{"index": {"_id": "2"}}, {"name": "iPad", "price": 800},
])
# Extract all searchable fields
fields = client.get_mappings()
metadata = ExtractMappings(fields).extract()
print(metadata)ElasticsearchClient(host="localhost", port=9200, index="", verify_certs=True)async with ElasticsearchClient(...) as client:| Method | Purpose | Notes |
|---|---|---|
get_mappings(refresh=False) |
Get current mappings | Caches locally |
put_mappings(mappings) |
Update mappings (type changes not allowed) | Refreshes cache |
create_index(mappings=None, settings=None) |
Create index | Auto-loads mappings |
index(_id, doc, **kwargs) |
Index / update document | — |
bulk(operations) |
Bulk API | — |
search(query, **kwargs) |
Execute search | Supports PIT |
get(_id) |
Get single document | — |
mget(ids) |
Multi-get | — |
refresh() |
Refresh index | — |
index_exists() |
Check if index exists | — |
open_point_in_time(...) |
Create / renew PIT | Returns ID or error dict |
QueryBuilder.match("name", "laptop")
QueryBuilder.match_phrase("description", "fast delivery")
QueryBuilder.term("is_active", True)
QueryBuilder.terms("tags", ["electronics", "gadgets"])
QueryBuilder.range("price", gte=100, lte=1000)
QueryBuilder.nested("author", QueryBuilder.match("name", "John"))
QueryBuilder.exists("price")
QueryBuilder.multi_match("best laptop", ["name", "description"], type_="best_fields")
QueryBuilder.aggs("category", size=10)
QueryBuilder.histogram("created_at", "price")combine_queries(
[QueryBuilder.match("a", 1), QueryBuilder.match("b", 2)],
operator="or"
)fields = ExtractMappings(mappings_dict).extract()
for path, meta in fields.items():
print(f"{path}: {meta.field_type} ({meta.field_path})")Automatically extracts:
keywordandrawfields- Nested structure paths
- Full field paths for aggregation / filtering
async with client:
pit_id = await client.open_point_in_time()
results = await client.search(
query,
pit={"id": pit_id, "keep_alive": "5m"}
)await client.create_index(
settings={"number_of_shards": 3, "number_of_replicas": 1},
mappings={...}
)All methods return either:
- The real response object
- Or
{"error": "...", "status": 404/500}
pip install elastickitMIT