Skip to content
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@
import org.openmetadata.schema.utils.JsonUtils;
import org.openmetadata.service.Entity;
import org.openmetadata.service.limits.Limits;
import org.openmetadata.service.search.vector.OpenSearchVectorService;
import org.openmetadata.service.search.vector.VectorIndexService;
import org.openmetadata.service.search.vector.utils.DTOs.VectorSearchResponse;
import org.openmetadata.service.security.Authorizer;
import org.openmetadata.service.security.auth.CatalogSecurityContext;
Expand Down Expand Up @@ -42,7 +42,12 @@ public Map<String, Object> execute(
"Semantic search is not enabled. Configure vector embeddings in the OpenMetadata server settings.");
}

OpenSearchVectorService vectorService = OpenSearchVectorService.getInstance();
// Resolve via the SearchRepository abstraction so this tool works on both backends.
// Direct OpenSearchVectorService.getInstance() returned null when the active vector
// service was ElasticSearchVectorService (ES backend), causing every MCP
// semantic_search call to fail with "Vector search service is not initialized" even
// though the REST endpoint /api/v1/search/vector/query worked.
VectorIndexService vectorService = Entity.getSearchRepository().getVectorIndexService();
if (vectorService == null) {
return errorResponse("Vector search service is not initialized");
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@
import lombok.extern.slf4j.Slf4j;
import org.openmetadata.service.Entity;
import org.openmetadata.service.resources.Collection;
import org.openmetadata.service.search.vector.OpenSearchVectorService;
import org.openmetadata.service.search.vector.VectorIndexService;
import org.openmetadata.service.search.vector.utils.DTOs.FingerprintResponse;
import org.openmetadata.service.search.vector.utils.DTOs.VectorSearchRequest;
import org.openmetadata.service.search.vector.utils.DTOs.VectorSearchResponse;
Expand Down Expand Up @@ -75,7 +75,7 @@ public Response vectorSearchPost(
.build();
}

OpenSearchVectorService vectorService = OpenSearchVectorService.getInstance();
VectorIndexService vectorService = Entity.getSearchRepository().getVectorIndexService();
if (vectorService == null) {
return Response.status(Response.Status.SERVICE_UNAVAILABLE)
.entity("{\"error\":\"Vector search service is not initialized\"}")
Expand Down Expand Up @@ -119,7 +119,7 @@ public Response getFingerprint(
.build();
}

OpenSearchVectorService vectorService = OpenSearchVectorService.getInstance();
VectorIndexService vectorService = Entity.getSearchRepository().getVectorIndexService();
if (vectorService == null) {
return Response.status(Response.Status.SERVICE_UNAVAILABLE)
.entity("{\"error\":\"Vector search service is not initialized\"}")
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,226 @@
package org.openmetadata.service.search;

import com.fasterxml.jackson.databind.ObjectMapper;
import es.co.elastic.clients.transport.rest5_client.low_level.Request;
import es.co.elastic.clients.transport.rest5_client.low_level.Response;
import es.co.elastic.clients.transport.rest5_client.low_level.ResponseException;
import es.co.elastic.clients.transport.rest5_client.low_level.Rest5Client;
import java.io.InputStream;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
import lombok.extern.slf4j.Slf4j;
import org.openmetadata.service.apps.bundles.searchIndex.stats.StageStatsTracker;
import org.openmetadata.service.apps.bundles.searchIndex.stats.StatsResult;

/**
* Elasticsearch counterpart to {@link VectorBulkProcessor}. Speaks the bulk NDJSON protocol
* directly via {@link Rest5Client} so we don't depend on the high-level ElasticsearchClient's
* typed bulk API.
*/
@Slf4j
public class ElasticSearchVectorBulkProcessor implements AutoCloseable {
private static final ObjectMapper MAPPER = new ObjectMapper();
private static final int DEFAULT_MAX_BULK_ACTIONS = 500;
private static final long DEFAULT_MAX_PAYLOAD_BYTES = 50L * 1024 * 1024;
private static final long FLUSH_INTERVAL_MS = 1000;

private final Rest5Client restClient;
private final String targetIndex;
private final int maxBulkActions;
private final long maxPayloadBytes;
private final ScheduledExecutorService scheduler;

private final List<VectorChunk> buffer = new ArrayList<>();
private final AtomicLong payloadSize = new AtomicLong(0);
private final AtomicLong totalSuccess = new AtomicLong(0);
private final AtomicLong totalFailed = new AtomicLong(0);
private volatile boolean closed = false;
private StageStatsTracker statsTracker;

public record VectorChunk(String chunkId, Map<String, Object> document, long estimatedSize) {}

public ElasticSearchVectorBulkProcessor(Rest5Client restClient, String targetIndex) {
this(restClient, targetIndex, DEFAULT_MAX_BULK_ACTIONS, DEFAULT_MAX_PAYLOAD_BYTES);
}

public ElasticSearchVectorBulkProcessor(
Rest5Client restClient, String targetIndex, int maxBulkActions, long maxPayloadBytes) {
this.restClient = restClient;
this.targetIndex = targetIndex;
this.maxBulkActions = maxBulkActions;
this.maxPayloadBytes = maxPayloadBytes;
this.scheduler = Executors.newScheduledThreadPool(1);
scheduler.scheduleAtFixedRate(
this::flushIfNeeded, FLUSH_INTERVAL_MS, FLUSH_INTERVAL_MS, TimeUnit.MILLISECONDS);
}

public String getTargetIndex() {
return targetIndex;
}

public void setStatsTracker(StageStatsTracker tracker) {
this.statsTracker = tracker;
}

public synchronized void addChunk(String chunkId, Map<String, Object> chunkDoc) {
long estimated = estimateChunkSize(chunkDoc);
if (shouldFlush(estimated)) {
flush();
}
buffer.add(new VectorChunk(chunkId, chunkDoc, estimated));
payloadSize.addAndGet(estimated);
}

public synchronized void flush() {
if (buffer.isEmpty()) {
return;
}

List<VectorChunk> toFlush = new ArrayList<>(buffer);
buffer.clear();
payloadSize.set(0);

try {
// Build NDJSON: action header line + doc line, repeated.
StringBuilder body = new StringBuilder();
for (VectorChunk chunk : toFlush) {
body.append("{\"index\":{\"_index\":\"")
.append(targetIndex)
.append("\",\"_id\":\"")
.append(chunk.chunkId().replace("\"", "\\\""))
.append("\"}}\n");
body.append(MAPPER.writeValueAsString(chunk.document())).append('\n');
}

Request request = new Request("POST", "/_bulk");
request.setJsonEntity(body.toString());

int success = 0;
int failed = 0;
try {
Response response = restClient.performRequest(request);
var responseJson = parseBulkResponse(response);
success = responseJson[0];
failed = responseJson[1];
} catch (ResponseException e) {
// The whole bulk request was rejected (e.g. 4xx/5xx on the endpoint itself).
failed = toFlush.size();
LOG.error(
"ES vector bulk flush rejected for index {}: status {}",
targetIndex,
e.getResponse().getStatusCode());
}

totalSuccess.addAndGet(success);
totalFailed.addAndGet(failed);

if (statsTracker != null) {
for (int i = 0; i < success; i++) {
statsTracker.recordVector(StatsResult.SUCCESS);
}
for (int i = 0; i < failed; i++) {
statsTracker.recordVector(StatsResult.FAILED);
}
}

if (failed > 0) {
LOG.warn(
"ES vector bulk flush: {} success, {} failed out of {} in {}",
success,
failed,
toFlush.size(),
targetIndex);
} else {
LOG.debug("ES vector bulk flush: {} documents indexed in {}", success, targetIndex);
}
} catch (Exception e) {
totalFailed.addAndGet(toFlush.size());
if (statsTracker != null) {
for (int i = 0; i < toFlush.size(); i++) {
statsTracker.recordVector(StatsResult.FAILED);
}
}
LOG.error(
"ES vector bulk flush failed for {} documents in {}: {}",
toFlush.size(),
targetIndex,
e.getMessage(),
e);
}
}

/** Returns {success, failed}. Parses the {@code items[]} of an ES bulk response. */
private int[] parseBulkResponse(Response response) throws java.io.IOException {
int success = 0;
int failed = 0;
if (response.getEntity() != null) {
try (InputStream is = response.getEntity().getContent()) {
String body = new String(is.readAllBytes(), StandardCharsets.UTF_8);
var root = MAPPER.readTree(body);
var items = root.path("items");
if (items.isArray()) {
for (var item : items) {
// Each item is e.g. {"index":{"_id":..., "status":201}} — error key present on
// failure.
var indexNode = item.path("index");
if (!indexNode.path("error").isMissingNode()) {
failed++;
} else {
success++;
}
}
}
}
}
return new int[] {success, failed};
}

private synchronized void flushIfNeeded() {
if (!buffer.isEmpty() && !closed) {
flush();
}
}

private boolean shouldFlush(long additionalSize) {
return buffer.size() >= maxBulkActions || payloadSize.get() + additionalSize > maxPayloadBytes;
}

private long estimateChunkSize(Map<String, Object> doc) {
long size = 0;
Object embedding = doc.get("embedding");
if (embedding instanceof float[] arr) {
size += (long) arr.length * 4;
}
for (Map.Entry<String, Object> entry : doc.entrySet()) {
if ("embedding".equals(entry.getKey())) continue;
Object value = entry.getValue();
if (value instanceof String s) {
size += s.length() * 2L;
} else if (value instanceof List<?> list) {
size += list.size() * 50L;
}
}
return (long) (size * 1.2);
}

public long getTotalSuccess() {
return totalSuccess.get();
}

public long getTotalFailed() {
return totalFailed.get();
}

@Override
public void close() {
closed = true;
scheduler.shutdown();
flush();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -137,6 +137,7 @@
import org.openmetadata.service.search.nlq.NLQService;
import org.openmetadata.service.search.nlq.NLQServiceFactory;
import org.openmetadata.service.search.opensearch.OpenSearchClient;
import org.openmetadata.service.search.vector.ElasticSearchVectorService;
import org.openmetadata.service.search.vector.OpenSearchVectorService;
import org.openmetadata.service.search.vector.VectorEmbeddingHandler;
import org.openmetadata.service.search.vector.VectorIndexService;
Expand Down Expand Up @@ -322,7 +323,16 @@ public void updateIndexes() {
public void createMissingIndexes() {
LOG.info("Checking for missing search indexes...");
int created = 0;
boolean isEs = getSearchType() == ElasticSearchConfiguration.SearchType.ELASTICSEARCH;
for (Map.Entry<String, IndexMapping> entry : entityIndexMap.entrySet()) {
// On Elasticsearch, defer vectorEmbedding index creation to ElasticSearchVectorService:
// it knows the active embedding model dimension and creates dense_vector with the
// correct dims. Creating it here would bake the placeholder dimension into the index
// (embeddingClient is not initialized yet at this boot phase) and dense_vector.dims
// is immutable on an existing ES index.
if (isEs && VectorIndexService.VECTOR_INDEX_KEY.equals(entry.getKey())) {
continue;
}
try {
if (!indexExists(entry.getValue())) {
createIndex(entry.getValue());
Expand All @@ -347,7 +357,14 @@ public void createOrUpdateIndexTemplates() {
LOG.info("Creating/updating index templates for all entities...");
int success = 0;
int failed = 0;
boolean isEs = getSearchType() == ElasticSearchConfiguration.SearchType.ELASTICSEARCH;
for (Map.Entry<String, IndexMapping> entry : entityIndexMap.entrySet()) {
// See createMissingIndexes: the vectorEmbedding index template would bake the
// placeholder dimension on Elasticsearch. ElasticSearchVectorService creates the
// actual index later with the correct dense_vector dims.
if (isEs && VectorIndexService.VECTOR_INDEX_KEY.equals(entry.getKey())) {
continue;
}
try {
IndexMapping indexMapping = entry.getValue();
String indexName = indexMapping.getIndexName(clusterAlias);
Expand Down Expand Up @@ -413,9 +430,10 @@ public synchronized void initializeVectorSearchService() {
OpenSearchVectorService.init(osClient, embeddingClient, language);
this.vectorIndexService = OpenSearchVectorService.getInstance();
} else {
LOG.warn(
"Vector embedding is only supported with OpenSearch. Elasticsearch support is planned.");
return;
es.co.elastic.clients.elasticsearch.ElasticsearchClient esClient =
((ElasticSearchClient) getSearchClient()).getNewClient();
ElasticSearchVectorService.init(esClient, embeddingClient, language);
this.vectorIndexService = ElasticSearchVectorService.getInstance();
}

this.vectorEmbeddingHandler = new VectorEmbeddingHandler(vectorIndexService);
Expand Down Expand Up @@ -524,10 +542,14 @@ public void deleteIndex(IndexMapping indexMapping) {
}

private String getIndexMapping(IndexMapping indexMapping) {
String mappingFile = indexMapping.getIndexMappingFile();
if (getSearchType() == ElasticSearchConfiguration.SearchType.ELASTICSEARCH
&& mappingFile != null
&& mappingFile.endsWith("/vector_search_index.json")) {
mappingFile = mappingFile.replace("vector_search_index.json", "vector_search_index_es_native.json");
}
try (InputStream in =
getClass()
.getResourceAsStream(
String.format(indexMapping.getIndexMappingFile(), language.toLowerCase()))) {
getClass().getResourceAsStream(String.format(mappingFile, language.toLowerCase()))) {
assert in != null;
return new String(in.readAllBytes());
} catch (Exception e) {
Expand Down Expand Up @@ -2275,6 +2297,8 @@ public void ensureVectorIndexDimension() {
}

private String reformatVectorIndexWithDimension(String mapping, int dimension) {
boolean isEs = getSearchType() == ElasticSearchConfiguration.SearchType.ELASTICSEARCH;
String dimensionField = isEs ? "dims" : "dimension";
try {
com.fasterxml.jackson.databind.ObjectMapper mapper =
new com.fasterxml.jackson.databind.ObjectMapper();
Expand All @@ -2285,7 +2309,7 @@ private String reformatVectorIndexWithDimension(String mapping, int dimension) {
JsonNode properties = mappings.get("properties");
if (properties.has("embedding")) {
((com.fasterxml.jackson.databind.node.ObjectNode) properties.get("embedding"))
.put("dimension", dimension);
.put(dimensionField, dimension);
}
}
JsonNode meta =
Expand All @@ -2301,10 +2325,10 @@ private String reformatVectorIndexWithDimension(String mapping, int dimension) {
LOG.warn(
"Failed to parse mapping JSON for dimension patching, falling back to string replace");
return mapping
.replace("\"dimension\": 768", "\"dimension\": " + dimension)
.replace("\"dimension\":768", "\"dimension\":" + dimension)
.replace("\"dimension\": 512", "\"dimension\": " + dimension)
.replace("\"dimension\":512", "\"dimension\":" + dimension);
.replace("\"" + dimensionField + "\": 768", "\"" + dimensionField + "\": " + dimension)
.replace("\"" + dimensionField + "\":768", "\"" + dimensionField + "\":" + dimension)
.replace("\"" + dimensionField + "\": 512", "\"" + dimensionField + "\": " + dimension)
.replace("\"" + dimensionField + "\":512", "\"" + dimensionField + "\":" + dimension);
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -195,10 +195,11 @@ public void dettachIlmPolicyFromIndexes(String indexPattern) throws IOException

for (String indexName : getIndexResponse.indices().keySet()) {
try {
client
.indices()
.putSettings(
s -> s.index(indexName).settings(idx -> idx.lifecycle(l -> l.name(null))));
// Use the dedicated `POST /<index>/_ilm/remove` endpoint instead of
// `PUT _settings { index.lifecycle.name: null }`. The typed putSettings call
// drops null fields from the JSON body, which makes ES reject the request as
// "no settings to update" and leaves the ILM policy attached.
client.ilm().removePolicy(r -> r.index(indexName));
LOG.info("Detached ILM policy from index: {}", indexName);
} catch (ElasticsearchException e) {
if (e.status() == 404) {
Expand Down
Loading
Loading