From 8a0ac1063747afb1850fa2cf2c31c8332ae9369e Mon Sep 17 00:00:00 2001 From: Joao Amaral <7281460+joaopamaral@users.noreply.github.com> Date: Mon, 11 May 2026 16:52:45 -0300 Subject: [PATCH 1/7] feat: backport native Elasticsearch vector search to 1.12.7-release Mirror OpenSearchVectorService for Elasticsearch 8.x/9.x: - ElasticSearchVectorService: 14-method implementation using Rest5Client low-level transport; dense_vector field type; top-level knn query format - ElasticSearchVectorBulkProcessor: Rest5Client-based bulk NDJSON analog to VectorBulkProcessor - vector_search_index_es_native.json (en/jp/ru/zh): ES-native mapping templates with dense_vector{dims, similarity:cosine} - VectorSearchQueryBuilder.buildNativeESQuery: ES top-level knn format with overflow-safe num_candidates clamping - SearchRepository: wire ELASTICSEARCH search type to ElasticSearchVectorService.init() (was a warn+return stub) Co-Authored-By: Claude Opus 4.7 (1M context) --- .../ElasticSearchVectorBulkProcessor.java | 226 +++++++ .../service/search/SearchRepository.java | 8 +- .../vector/ElasticSearchVectorService.java | 550 ++++++++++++++++++ .../vector/VectorSearchQueryBuilder.java | 60 +- .../en/vector_search_index_es_native.json | 294 ++++++++++ .../jp/vector_search_index_es_native.json | 294 ++++++++++ .../ru/vector_search_index_es_native.json | 410 +++++++++++++ .../zh/vector_search_index_es_native.json | 294 ++++++++++ 8 files changed, 2128 insertions(+), 8 deletions(-) create mode 100644 openmetadata-service/src/main/java/org/openmetadata/service/search/ElasticSearchVectorBulkProcessor.java create mode 100644 openmetadata-service/src/main/java/org/openmetadata/service/search/vector/ElasticSearchVectorService.java create mode 100644 openmetadata-spec/src/main/resources/elasticsearch/en/vector_search_index_es_native.json create mode 100644 openmetadata-spec/src/main/resources/elasticsearch/jp/vector_search_index_es_native.json create mode 100644 openmetadata-spec/src/main/resources/elasticsearch/ru/vector_search_index_es_native.json create mode 100644 openmetadata-spec/src/main/resources/elasticsearch/zh/vector_search_index_es_native.json diff --git a/openmetadata-service/src/main/java/org/openmetadata/service/search/ElasticSearchVectorBulkProcessor.java b/openmetadata-service/src/main/java/org/openmetadata/service/search/ElasticSearchVectorBulkProcessor.java new file mode 100644 index 000000000000..00b474576be7 --- /dev/null +++ b/openmetadata-service/src/main/java/org/openmetadata/service/search/ElasticSearchVectorBulkProcessor.java @@ -0,0 +1,226 @@ +package org.openmetadata.service.search; + +import com.fasterxml.jackson.databind.ObjectMapper; +import es.co.elastic.clients.transport.rest5_client.low_level.Request; +import es.co.elastic.clients.transport.rest5_client.low_level.Response; +import es.co.elastic.clients.transport.rest5_client.low_level.ResponseException; +import es.co.elastic.clients.transport.rest5_client.low_level.Rest5Client; +import java.io.InputStream; +import java.nio.charset.StandardCharsets; +import java.util.ArrayList; +import java.util.List; +import java.util.Map; +import java.util.concurrent.Executors; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicLong; +import lombok.extern.slf4j.Slf4j; +import org.openmetadata.service.apps.bundles.searchIndex.stats.StageStatsTracker; +import org.openmetadata.service.apps.bundles.searchIndex.stats.StatsResult; + +/** + * Elasticsearch counterpart to {@link VectorBulkProcessor}. Speaks the bulk NDJSON protocol + * directly via {@link Rest5Client} so we don't depend on the high-level ElasticsearchClient's + * typed bulk API. + */ +@Slf4j +public class ElasticSearchVectorBulkProcessor implements AutoCloseable { + private static final ObjectMapper MAPPER = new ObjectMapper(); + private static final int DEFAULT_MAX_BULK_ACTIONS = 500; + private static final long DEFAULT_MAX_PAYLOAD_BYTES = 50L * 1024 * 1024; + private static final long FLUSH_INTERVAL_MS = 1000; + + private final Rest5Client restClient; + private final String targetIndex; + private final int maxBulkActions; + private final long maxPayloadBytes; + private final ScheduledExecutorService scheduler; + + private final List buffer = new ArrayList<>(); + private final AtomicLong payloadSize = new AtomicLong(0); + private final AtomicLong totalSuccess = new AtomicLong(0); + private final AtomicLong totalFailed = new AtomicLong(0); + private volatile boolean closed = false; + private StageStatsTracker statsTracker; + + public record VectorChunk(String chunkId, Map document, long estimatedSize) {} + + public ElasticSearchVectorBulkProcessor(Rest5Client restClient, String targetIndex) { + this(restClient, targetIndex, DEFAULT_MAX_BULK_ACTIONS, DEFAULT_MAX_PAYLOAD_BYTES); + } + + public ElasticSearchVectorBulkProcessor( + Rest5Client restClient, String targetIndex, int maxBulkActions, long maxPayloadBytes) { + this.restClient = restClient; + this.targetIndex = targetIndex; + this.maxBulkActions = maxBulkActions; + this.maxPayloadBytes = maxPayloadBytes; + this.scheduler = Executors.newScheduledThreadPool(1); + scheduler.scheduleAtFixedRate( + this::flushIfNeeded, FLUSH_INTERVAL_MS, FLUSH_INTERVAL_MS, TimeUnit.MILLISECONDS); + } + + public String getTargetIndex() { + return targetIndex; + } + + public void setStatsTracker(StageStatsTracker tracker) { + this.statsTracker = tracker; + } + + public synchronized void addChunk(String chunkId, Map chunkDoc) { + long estimated = estimateChunkSize(chunkDoc); + if (shouldFlush(estimated)) { + flush(); + } + buffer.add(new VectorChunk(chunkId, chunkDoc, estimated)); + payloadSize.addAndGet(estimated); + } + + public synchronized void flush() { + if (buffer.isEmpty()) { + return; + } + + List toFlush = new ArrayList<>(buffer); + buffer.clear(); + payloadSize.set(0); + + try { + // Build NDJSON: action header line + doc line, repeated. + StringBuilder body = new StringBuilder(); + for (VectorChunk chunk : toFlush) { + body.append("{\"index\":{\"_index\":\"") + .append(targetIndex) + .append("\",\"_id\":\"") + .append(chunk.chunkId().replace("\"", "\\\"")) + .append("\"}}\n"); + body.append(MAPPER.writeValueAsString(chunk.document())).append('\n'); + } + + Request request = new Request("POST", "/_bulk"); + request.setJsonEntity(body.toString()); + + int success = 0; + int failed = 0; + try { + Response response = restClient.performRequest(request); + var responseJson = parseBulkResponse(response); + success = responseJson[0]; + failed = responseJson[1]; + } catch (ResponseException e) { + // The whole bulk request was rejected (e.g. 4xx/5xx on the endpoint itself). + failed = toFlush.size(); + LOG.error( + "ES vector bulk flush rejected for index {}: status {}", + targetIndex, + e.getResponse().getStatusCode()); + } + + totalSuccess.addAndGet(success); + totalFailed.addAndGet(failed); + + if (statsTracker != null) { + for (int i = 0; i < success; i++) { + statsTracker.recordVector(StatsResult.SUCCESS); + } + for (int i = 0; i < failed; i++) { + statsTracker.recordVector(StatsResult.FAILED); + } + } + + if (failed > 0) { + LOG.warn( + "ES vector bulk flush: {} success, {} failed out of {} in {}", + success, + failed, + toFlush.size(), + targetIndex); + } else { + LOG.debug("ES vector bulk flush: {} documents indexed in {}", success, targetIndex); + } + } catch (Exception e) { + totalFailed.addAndGet(toFlush.size()); + if (statsTracker != null) { + for (int i = 0; i < toFlush.size(); i++) { + statsTracker.recordVector(StatsResult.FAILED); + } + } + LOG.error( + "ES vector bulk flush failed for {} documents in {}: {}", + toFlush.size(), + targetIndex, + e.getMessage(), + e); + } + } + + /** Returns {success, failed}. Parses the {@code items[]} of an ES bulk response. */ + private int[] parseBulkResponse(Response response) throws java.io.IOException { + int success = 0; + int failed = 0; + if (response.getEntity() != null) { + try (InputStream is = response.getEntity().getContent()) { + String body = new String(is.readAllBytes(), StandardCharsets.UTF_8); + var root = MAPPER.readTree(body); + var items = root.path("items"); + if (items.isArray()) { + for (var item : items) { + // Each item is e.g. {"index":{"_id":..., "status":201}} — error key present on + // failure. + var indexNode = item.path("index"); + if (!indexNode.path("error").isMissingNode()) { + failed++; + } else { + success++; + } + } + } + } + } + return new int[] {success, failed}; + } + + private synchronized void flushIfNeeded() { + if (!buffer.isEmpty() && !closed) { + flush(); + } + } + + private boolean shouldFlush(long additionalSize) { + return buffer.size() >= maxBulkActions || payloadSize.get() + additionalSize > maxPayloadBytes; + } + + private long estimateChunkSize(Map doc) { + long size = 0; + Object embedding = doc.get("embedding"); + if (embedding instanceof float[] arr) { + size += (long) arr.length * 4; + } + for (Map.Entry entry : doc.entrySet()) { + if ("embedding".equals(entry.getKey())) continue; + Object value = entry.getValue(); + if (value instanceof String s) { + size += s.length() * 2L; + } else if (value instanceof List list) { + size += list.size() * 50L; + } + } + return (long) (size * 1.2); + } + + public long getTotalSuccess() { + return totalSuccess.get(); + } + + public long getTotalFailed() { + return totalFailed.get(); + } + + @Override + public void close() { + closed = true; + scheduler.shutdown(); + flush(); + } +} diff --git a/openmetadata-service/src/main/java/org/openmetadata/service/search/SearchRepository.java b/openmetadata-service/src/main/java/org/openmetadata/service/search/SearchRepository.java index d77b800ee9cc..6b3f3c899cbb 100644 --- a/openmetadata-service/src/main/java/org/openmetadata/service/search/SearchRepository.java +++ b/openmetadata-service/src/main/java/org/openmetadata/service/search/SearchRepository.java @@ -137,6 +137,7 @@ import org.openmetadata.service.search.nlq.NLQService; import org.openmetadata.service.search.nlq.NLQServiceFactory; import org.openmetadata.service.search.opensearch.OpenSearchClient; +import org.openmetadata.service.search.vector.ElasticSearchVectorService; import org.openmetadata.service.search.vector.OpenSearchVectorService; import org.openmetadata.service.search.vector.VectorEmbeddingHandler; import org.openmetadata.service.search.vector.VectorIndexService; @@ -413,9 +414,10 @@ public synchronized void initializeVectorSearchService() { OpenSearchVectorService.init(osClient, embeddingClient, language); this.vectorIndexService = OpenSearchVectorService.getInstance(); } else { - LOG.warn( - "Vector embedding is only supported with OpenSearch. Elasticsearch support is planned."); - return; + es.co.elastic.clients.elasticsearch.ElasticsearchClient esClient = + ((ElasticSearchClient) getSearchClient()).getNewClient(); + ElasticSearchVectorService.init(esClient, embeddingClient, language); + this.vectorIndexService = ElasticSearchVectorService.getInstance(); } this.vectorEmbeddingHandler = new VectorEmbeddingHandler(vectorIndexService); diff --git a/openmetadata-service/src/main/java/org/openmetadata/service/search/vector/ElasticSearchVectorService.java b/openmetadata-service/src/main/java/org/openmetadata/service/search/vector/ElasticSearchVectorService.java new file mode 100644 index 000000000000..48d138843c4c --- /dev/null +++ b/openmetadata-service/src/main/java/org/openmetadata/service/search/vector/ElasticSearchVectorService.java @@ -0,0 +1,550 @@ +package org.openmetadata.service.search.vector; + +import com.fasterxml.jackson.databind.JsonNode; +import com.fasterxml.jackson.databind.ObjectMapper; +import es.co.elastic.clients.elasticsearch.ElasticsearchClient; +import es.co.elastic.clients.transport.rest5_client.Rest5ClientTransport; +import es.co.elastic.clients.transport.rest5_client.low_level.Request; +import es.co.elastic.clients.transport.rest5_client.low_level.Response; +import es.co.elastic.clients.transport.rest5_client.low_level.ResponseException; +import es.co.elastic.clients.transport.rest5_client.low_level.Rest5Client; +import java.io.IOException; +import java.io.InputStream; +import java.nio.charset.StandardCharsets; +import java.util.ArrayList; +import java.util.Collections; +import java.util.HashMap; +import java.util.LinkedHashMap; +import java.util.List; +import java.util.Map; +import lombok.Getter; +import lombok.extern.slf4j.Slf4j; +import org.openmetadata.schema.EntityInterface; +import org.openmetadata.service.events.lifecycle.EntityLifecycleEventDispatcher; +import org.openmetadata.service.search.ElasticSearchVectorBulkProcessor; +import org.openmetadata.service.search.vector.client.EmbeddingClient; +import org.openmetadata.service.search.vector.utils.DTOs.VectorSearchResponse; + +/** + * Elasticsearch 8.x/9.x vector search service. Mirrors {@link OpenSearchVectorService} but uses + * the Elasticsearch {@code dense_vector} field type and the top-level {@code knn} query format, + * and speaks to the cluster via the low-level {@link Rest5Client} so we don't depend on the + * typed ES Java API for ad-hoc requests. + */ +@Slf4j +public class ElasticSearchVectorService implements VectorIndexService { + private static final ObjectMapper MAPPER = new ObjectMapper(); + private static final int OVER_FETCH_MULTIPLIER = 2; + + private static volatile ElasticSearchVectorService instance; + + private final ElasticsearchClient client; + private final Rest5Client restClient; + @Getter private final EmbeddingClient embeddingClient; + private final String language; + private final int knnNumCandidatesMultiplier; + private ElasticSearchVectorBulkProcessor centralBulkProcessor; + private String centralBulkProcessorIndex; + + public ElasticSearchVectorService( + ElasticsearchClient client, + EmbeddingClient embeddingClient, + String language, + int knnNumCandidatesMultiplier) { + this.client = client; + this.restClient = extractRestClient(client); + this.embeddingClient = embeddingClient; + this.language = language != null ? language.toLowerCase(java.util.Locale.ROOT) : "en"; + this.knnNumCandidatesMultiplier = + knnNumCandidatesMultiplier > 0 + ? knnNumCandidatesMultiplier + : VectorSearchQueryBuilder.DEFAULT_KNN_NUM_CANDIDATES_MULTIPLIER; + } + + public ElasticSearchVectorService( + ElasticsearchClient client, EmbeddingClient embeddingClient, String language) { + this( + client, + embeddingClient, + language, + VectorSearchQueryBuilder.DEFAULT_KNN_NUM_CANDIDATES_MULTIPLIER); + } + + public ElasticSearchVectorService(ElasticsearchClient client, EmbeddingClient embeddingClient) { + this(client, embeddingClient, "en"); + } + + private static Rest5Client extractRestClient(ElasticsearchClient client) { + if (!(client._transport() instanceof Rest5ClientTransport rest5)) { + throw new IllegalArgumentException( + "ElasticSearchVectorService requires Rest5ClientTransport, got: " + + client._transport().getClass().getName()); + } + return rest5.restClient(); + } + + public static synchronized void init( + ElasticsearchClient client, EmbeddingClient embeddingClient, String language) { + init( + client, + embeddingClient, + language, + VectorSearchQueryBuilder.DEFAULT_KNN_NUM_CANDIDATES_MULTIPLIER); + } + + public static synchronized void init( + ElasticsearchClient client, + EmbeddingClient embeddingClient, + String language, + int knnNumCandidatesMultiplier) { + if (instance != null) { + LOG.warn("ElasticSearchVectorService already initialized, reinitializing"); + } + instance = + new ElasticSearchVectorService( + client, embeddingClient, language, knnNumCandidatesMultiplier); + instance.registerVectorEmbeddingHandler(); + LOG.info( + "ElasticSearchVectorService initialized with model={}, dimension={}", + embeddingClient.getModelId(), + embeddingClient.getDimension()); + } + + public static ElasticSearchVectorService getInstance() { + return instance; + } + + private void registerVectorEmbeddingHandler() { + try { + VectorEmbeddingHandler handler = new VectorEmbeddingHandler(this); + EntityLifecycleEventDispatcher.getInstance().registerHandler(handler); + LOG.info("Registered VectorEmbeddingHandler for entity lifecycle events"); + } catch (Exception e) { + LOG.error("Failed to register VectorEmbeddingHandler", e); + } + } + + public void close() { + // No-op by design — see OpenSearchVectorService.close() for context. The transport is + // shared with the rest of the application; closing it here would shut down the whole + // HC5 IOReactor. + } + + @Override + @SuppressWarnings("unchecked") + public VectorSearchResponse search( + String query, Map> filters, int size, int k, double threshold) { + long start = System.currentTimeMillis(); + try { + float[] queryVector = embeddingClient.embed(query); + int overFetchSize = size * OVER_FETCH_MULTIPLIER; + + String queryJson = + VectorSearchQueryBuilder.buildNativeESQuery( + queryVector, overFetchSize, k, filters, knnNumCandidatesMultiplier); + String indexName = getClusteredIndexName(); + String responseBody = executeGenericRequest("POST", "/" + indexName + "/_search", queryJson); + + JsonNode root = MAPPER.readTree(responseBody); + JsonNode hitsNode = root.path("hits").path("hits"); + + LinkedHashMap>> byParent = new LinkedHashMap<>(); + for (JsonNode hit : hitsNode) { + double score = hit.path("_score").asDouble(0.0); + if (score < threshold) { + continue; + } + + Map hitMap = MAPPER.convertValue(hit.path("_source"), Map.class); + hitMap.put("_score", score); + + String parentId = (String) hitMap.get("parent_id"); + if (parentId == null) { + parentId = hit.path("_id").asText(); + hitMap.put("parent_id", parentId); + } + byParent.computeIfAbsent(parentId, kVal -> new ArrayList<>()).add(hitMap); + } + + List> results = new ArrayList<>(); + int parentCount = 0; + for (List> chunks : byParent.values()) { + if (parentCount >= size) { + break; + } + results.addAll(chunks); + parentCount++; + } + + long tookMillis = System.currentTimeMillis() - start; + return new VectorSearchResponse(tookMillis, results); + } catch (Exception e) { + LOG.error("Vector search failed: {}", e.getMessage(), e); + throw new RuntimeException("Vector search failed", e); + } + } + + String executeGenericRequest(String method, String endpoint, String body) { + try { + Request request = new Request(method, endpoint); + if (body != null) { + request.setJsonEntity(body); + } + // Rest5Client.performRequest only throws ResponseException on 5xx (its internal + // isCorrectServerResponse is `code < 500`). 4xx responses are returned normally, + // so we still need a manual status-code check below for client errors. + Response response = restClient.performRequest(request); + int statusCode = response.getStatusCode(); + String responseBody = readEntityBody(response.getEntity()); + if (statusCode >= 400) { + throw new RuntimeException( + "Elasticsearch request failed with status " + statusCode + ": " + responseBody); + } + return responseBody; + } catch (ResponseException e) { + int statusCode = e.getResponse().getStatusCode(); + String errorBody = readEntityBody(e.getResponse().getEntity()); + LOG.error("Generic request failed: {} {}", method, endpoint, e); + throw new RuntimeException( + "Elasticsearch request failed with status " + statusCode + ": " + errorBody, e); + } catch (RuntimeException e) { + throw e; + } catch (Exception e) { + LOG.error("Generic request failed: {} {}", method, endpoint, e); + throw new RuntimeException("Elasticsearch generic request failed", e); + } + } + + private static String readEntityBody(org.apache.hc.core5.http.HttpEntity entity) { + if (entity == null) { + return ""; + } + try (InputStream is = entity.getContent()) { + return new String(is.readAllBytes(), StandardCharsets.UTF_8); + } catch (Exception ignored) { + return ""; + } + } + + @Override + public void updateVectorEmbeddings(EntityInterface entity, String targetIndex) { + try { + String parentId = entity.getId().toString(); + String existingFingerprint = getExistingFingerprint(targetIndex, parentId); + String currentFingerprint = VectorDocBuilder.computeFingerprintForEntity(entity); + + if (currentFingerprint.equals(existingFingerprint)) { + LOG.debug("Skipping entity {} - fingerprint unchanged", parentId); + return; + } + + List> docs = VectorDocBuilder.fromEntity(entity, embeddingClient); + deleteByParentId(targetIndex, parentId); + bulkIndex(docs, targetIndex); + } catch (Exception e) { + LOG.error( + "Failed to update vector embeddings for entity {}: {}", + entity.getId(), + e.getMessage(), + e); + } + } + + @Override + public void updateVectorEmbeddingsWithMigration( + EntityInterface entity, String targetIndex, String sourceIndex) { + try { + String parentId = entity.getId().toString(); + String currentFingerprint = VectorDocBuilder.computeFingerprintForEntity(entity); + + if (sourceIndex != null) { + try { + String existingFingerprint = getExistingFingerprint(sourceIndex, parentId); + if (currentFingerprint.equals(existingFingerprint)) { + if (copyExistingVectorDocuments( + sourceIndex, targetIndex, parentId, currentFingerprint)) { + return; + } + } + } catch (Exception ex) { + LOG.warn( + "Migration copy failed for entity {}, falling back to recomputation: {}", + parentId, + ex.getMessage()); + } + } + + List> docs = VectorDocBuilder.fromEntity(entity, embeddingClient); + bulkIndex(docs, targetIndex); + } catch (Exception e) { + LOG.error( + "Failed to update vector embeddings with migration for entity {}: {}", + entity.getId(), + e.getMessage(), + e); + } + } + + @Override + public String getExistingFingerprint(String indexName, String parentId) { + try { + String query = + "{\"size\":1,\"_source\":[\"fingerprint\"]," + + "\"query\":{\"term\":{\"parent_id\":\"" + + VectorSearchQueryBuilder.escape(parentId) + + "\"}}}"; + String response = executeGenericRequest("POST", "/" + indexName + "/_search", query); + JsonNode root = MAPPER.readTree(response); + JsonNode hits = root.path("hits").path("hits"); + if (hits.isArray() && !hits.isEmpty()) { + return hits.get(0).path("_source").path("fingerprint").asText(null); + } + } catch (Exception e) { + LOG.debug( + "Failed to get fingerprint for parent_id={} in index={}: {}", + parentId, + indexName, + e.getMessage()); + } + return null; + } + + @Override + public Map getExistingFingerprintsBatch( + String indexName, List parentIds) { + if (parentIds == null || parentIds.isEmpty()) { + return Collections.emptyMap(); + } + try { + StringBuilder termsArray = new StringBuilder("["); + for (int i = 0; i < parentIds.size(); i++) { + if (i > 0) termsArray.append(','); + termsArray + .append("\"") + .append(VectorSearchQueryBuilder.escape(parentIds.get(i))) + .append("\""); + } + termsArray.append("]"); + + String query = + "{\"size\":" + + parentIds.size() + + ",\"_source\":[\"parent_id\",\"fingerprint\"]" + + ",\"query\":{\"terms\":{\"parent_id\":" + + termsArray + + "}}" + + ",\"collapse\":{\"field\":\"parent_id\"}}"; + + String response = executeGenericRequest("POST", "/" + indexName + "/_search", query); + JsonNode root = MAPPER.readTree(response); + JsonNode hits = root.path("hits").path("hits"); + + Map result = new HashMap<>(); + for (JsonNode hit : hits) { + String pid = hit.path("_source").path("parent_id").asText(); + String fp = hit.path("_source").path("fingerprint").asText(null); + if (pid != null && fp != null) { + result.put(pid, fp); + } + } + return result; + } catch (Exception e) { + LOG.error("Failed to batch get fingerprints in index={}: {}", indexName, e.getMessage(), e); + return Collections.emptyMap(); + } + } + + @Override + @SuppressWarnings("unchecked") + public boolean copyExistingVectorDocuments( + String sourceIndex, String targetIndex, String parentId, String fingerprint) { + try { + String searchQuery = + "{\"size\":1000,\"query\":{\"term\":{\"parent_id\":\"" + + VectorSearchQueryBuilder.escape(parentId) + + "\"}}}"; + String response = executeGenericRequest("POST", "/" + sourceIndex + "/_search", searchQuery); + JsonNode root = MAPPER.readTree(response); + JsonNode hits = root.path("hits").path("hits"); + + if (!hits.isArray() || hits.isEmpty()) { + return false; + } + + List> docs = new ArrayList<>(); + for (JsonNode hit : hits) { + Map source = MAPPER.convertValue(hit.path("_source"), Map.class); + source.put("fingerprint", fingerprint); + docs.add(source); + } + bulkIndex(docs, targetIndex); + return true; + } catch (Exception e) { + LOG.error( + "Failed to copy vector documents from {} to {} for parent_id={}: {}", + sourceIndex, + targetIndex, + parentId, + e.getMessage(), + e); + return false; + } + } + + @Override + public void softDeleteEmbeddings(EntityInterface entity) { + try { + String parentId = entity.getId().toString(); + String indexName = getClusteredIndexName(); + String script = + "{\"script\":{\"source\":\"ctx._source.deleted = true\"}," + + "\"query\":{\"term\":{\"parent_id\":\"" + + VectorSearchQueryBuilder.escape(parentId) + + "\"}}}"; + executeGenericRequest("POST", "/" + indexName + "/_update_by_query", script); + } catch (Exception e) { + LOG.error( + "Failed to soft delete embeddings for entity {}: {}", entity.getId(), e.getMessage(), e); + } + } + + @Override + public void hardDeleteEmbeddings(EntityInterface entity) { + try { + String parentId = entity.getId().toString(); + String indexName = getClusteredIndexName(); + deleteByParentId(indexName, parentId); + } catch (Exception e) { + LOG.error( + "Failed to hard delete embeddings for entity {}: {}", entity.getId(), e.getMessage(), e); + } + } + + @Override + public void restoreEmbeddings(EntityInterface entity) { + try { + String parentId = entity.getId().toString(); + String indexName = getClusteredIndexName(); + String script = + "{\"script\":{\"source\":\"ctx._source.deleted = false\"}," + + "\"query\":{\"term\":{\"parent_id\":\"" + + VectorSearchQueryBuilder.escape(parentId) + + "\"}}}"; + executeGenericRequest("POST", "/" + indexName + "/_update_by_query", script); + } catch (Exception e) { + LOG.error( + "Failed to restore embeddings for entity {}: {}", entity.getId(), e.getMessage(), e); + } + } + + private void deleteByParentId(String indexName, String parentId) { + try { + String query = + "{\"query\":{\"term\":{\"parent_id\":\"" + + VectorSearchQueryBuilder.escape(parentId) + + "\"}}}"; + executeGenericRequest("POST", "/" + indexName + "/_delete_by_query", query); + } catch (Exception e) { + LOG.error( + "Failed to delete by parent_id={} in index={}: {}", + parentId, + indexName, + e.getMessage(), + e); + } + } + + private static String getClusteredIndexName() { + return VectorIndexService.getClusteredIndexName(); + } + + @Override + public void createOrUpdateIndex(int dimension) { + try { + if (indexExists()) { + LOG.info("Vector index {} already exists", VECTOR_INDEX_NAME); + return; + } + + String mappingJson = loadIndexMapping(dimension); + executeGenericRequest("PUT", "/" + getClusteredIndexName(), mappingJson); + LOG.info("Created vector index {} with dimension {}", getClusteredIndexName(), dimension); + } catch (Exception e) { + LOG.error("Failed to create vector index: {}", e.getMessage(), e); + } + } + + @Override + public boolean indexExists() { + try { + Request request = new Request("HEAD", "/" + getClusteredIndexName()); + Response response = restClient.performRequest(request); + return response.getStatusCode() == 200; + } catch (ResponseException e) { + return false; + } catch (Exception e) { + LOG.error("Failed to check if vector index exists: {}", e.getMessage(), e); + return false; + } + } + + @Override + public String getIndexName() { + return getClusteredIndexName(); + } + + @Override + public void bulkIndex(List> documents, String targetIndex) { + if (documents == null || documents.isEmpty()) { + return; + } + + ElasticSearchVectorBulkProcessor processor = getOrCreateBulkProcessor(targetIndex); + for (int i = 0; i < documents.size(); i++) { + Map doc = documents.get(i); + String parentId = (String) doc.get("parent_id"); + int chunkIndex = doc.containsKey("chunk_index") ? (int) doc.get("chunk_index") : i; + String docId = parentId + "-" + chunkIndex; + processor.addChunk(docId, doc); + } + } + + private synchronized ElasticSearchVectorBulkProcessor getOrCreateBulkProcessor( + String targetIndex) { + if (centralBulkProcessor == null || !targetIndex.equals(centralBulkProcessorIndex)) { + if (centralBulkProcessor != null) { + centralBulkProcessor.close(); + } + centralBulkProcessor = new ElasticSearchVectorBulkProcessor(restClient, targetIndex); + centralBulkProcessorIndex = targetIndex; + } + return centralBulkProcessor; + } + + public synchronized void flushBulkProcessor() { + if (centralBulkProcessor != null) { + centralBulkProcessor.close(); + centralBulkProcessor = null; + centralBulkProcessorIndex = null; + } + } + + private String loadIndexMapping(int dimension) { + String resourcePath = "elasticsearch/" + language + "/vector_search_index_es_native.json"; + try (InputStream inputStream = getClass().getClassLoader().getResourceAsStream(resourcePath)) { + if (inputStream == null) { + throw new IllegalStateException("Could not find " + resourcePath + " in classpath"); + } + String template = new String(inputStream.readAllBytes(), StandardCharsets.UTF_8); + // Templates ship with dims:512 as the placeholder; rewrite to the active dimension. + String result = template.replace("\"dims\": 512", "\"dims\": " + dimension); + if (result.equals(template) && dimension != 512) { + throw new IllegalStateException( + "Failed to replace dims placeholder in ES vector index mapping template"); + } + return result; + } catch (IOException e) { + throw new RuntimeException("Failed to load ES vector search index mapping", e); + } + } +} diff --git a/openmetadata-service/src/main/java/org/openmetadata/service/search/vector/VectorSearchQueryBuilder.java b/openmetadata-service/src/main/java/org/openmetadata/service/search/vector/VectorSearchQueryBuilder.java index 7e785b5f941f..3e9b2934156e 100644 --- a/openmetadata-service/src/main/java/org/openmetadata/service/search/vector/VectorSearchQueryBuilder.java +++ b/openmetadata-service/src/main/java/org/openmetadata/service/search/vector/VectorSearchQueryBuilder.java @@ -15,6 +15,52 @@ public class VectorSearchQueryBuilder { private static final String ANY = "__ANY__"; private static final String NONE = "__NONE__"; + public static final int DEFAULT_KNN_NUM_CANDIDATES_MULTIPLIER = 2; + + /** + * Build an Elasticsearch 8.x/9.x native KNN query (top-level {@code knn} block with + * {@code field}, {@code query_vector}, {@code k}, {@code num_candidates}, and an optional + * filter). Use this against indexes that store the embedding field as {@code dense_vector}. + */ + public static String buildNativeESQuery( + float[] vector, int size, int k, Map> filters) { + return buildNativeESQuery(vector, size, k, filters, DEFAULT_KNN_NUM_CANDIDATES_MULTIPLIER); + } + + public static String buildNativeESQuery( + float[] vector, + int size, + int k, + Map> filters, + int numCandidatesMultiplier) { + // Compute in long to avoid int overflow when k * multiplier exceeds Integer.MAX_VALUE; + // clamp to Integer.MAX_VALUE so num_candidates is always positive. + long candidatesLong = (long) k * (long) numCandidatesMultiplier; + int numCandidates = + (int) Math.max(100, Math.min(candidatesLong, (long) Integer.MAX_VALUE)); + + StringBuilder sb = + new StringBuilder(512) + .append("{\"size\":") + .append(size) + .append(",\"_source\":{\"excludes\":[\"embedding\"]}") + .append(",\"knn\":{") + .append("\"field\":\"embedding\"") + .append(",\"query_vector\":") + .append(Arrays.toString(vector)) + .append(",\"k\":") + .append(k) + .append(",\"num_candidates\":") + .append(numCandidates); + + sb.append(",\"filter\":{\"bool\":{\"must\":["); + appendMustClauses(sb, filters); + sb.append("]}}"); // close must array and bool + + sb.append("}}"); // close knn object + return sb.toString(); + } + public static String build(float[] vector, int size, int k, Map> filters) { StringBuilder sb = @@ -31,6 +77,15 @@ public static String build(float[] vector, int size, int k, Map> filters) { // Only include documents where deleted=false sb.append("{\"term\":{\"deleted\":false}}"); @@ -78,11 +133,6 @@ public static String build(float[] vector, int size, int k, Map vals) { diff --git a/openmetadata-spec/src/main/resources/elasticsearch/en/vector_search_index_es_native.json b/openmetadata-spec/src/main/resources/elasticsearch/en/vector_search_index_es_native.json new file mode 100644 index 000000000000..e2cf3e99500b --- /dev/null +++ b/openmetadata-spec/src/main/resources/elasticsearch/en/vector_search_index_es_native.json @@ -0,0 +1,294 @@ +{ + "settings": { + "index": {}, + "analysis": { + "normalizer": { + "lowercase_normalizer": { + "type": "custom", + "filter": [ + "lowercase" + ] + } + }, + "filter": { + "om_stemmer": { + "type": "stemmer", + "name": "english" + }, + "word_delimiter_filter": { + "type": "word_delimiter", + "preserve_original": "true" + } + }, + "analyzer": { + "om_analyzer": { + "tokenizer": "standard", + "filter": [ + "lowercase", + "word_delimiter_filter", + "om_stemmer" + ] + } + } + } + }, + "mappings": { + "properties": { + "embedding": { + "type": "dense_vector", + "dims": 512, + "index": true, + "similarity": "cosine" + }, + "text_to_embed": { + "type": "text" + }, + "name": { + "type": "text", + "fields": { + "keyword": { + "type": "keyword" + } + } + }, + "fullyQualifiedName": { + "type": "keyword" + }, + "entityType": { + "type": "keyword", + "fields": { + "keyword": { + "type": "keyword", + "normalizer": "lowercase_normalizer", + "ignore_above": 256 + } + } + }, + "serviceType": { + "type": "keyword", + "normalizer": "lowercase_normalizer" + }, + "parent_id": { + "type": "keyword" + }, + "chunk_index": { + "type": "integer" + }, + "chunk_count": { + "type": "integer" + }, + "tags": { + "type": "nested", + "properties": { + "tagFQN": { + "type": "keyword" + }, + "name": { + "type": "keyword" + }, + "labelType": { + "type": "keyword" + }, + "description": { + "type": "text", + "analyzer": "om_analyzer" + }, + "source": { + "type": "keyword" + }, + "state": { + "type": "keyword" + } + } + }, + "tier": { + "type": "object", + "properties": { + "tagFQN": { + "type": "keyword" + }, + "name": { + "type": "keyword" + }, + "labelType": { + "type": "keyword" + }, + "description": { + "type": "text", + "analyzer": "om_analyzer" + }, + "source": { + "type": "keyword" + }, + "state": { + "type": "keyword" + } + } + }, + "certification": { + "type": "object", + "properties": { + "tagFQN": { + "type": "keyword" + }, + "name": { + "type": "keyword" + }, + "labelType": { + "type": "keyword" + }, + "description": { + "type": "text", + "analyzer": "om_analyzer" + }, + "source": { + "type": "keyword" + }, + "state": { + "type": "keyword" + } + } + }, + "domains": { + "type": "object", + "properties": { + "id": { + "type": "keyword" + }, + "name": { + "type": "keyword" + }, + "displayName": { + "type": "text" + } + } + }, + "owners": { + "type": "nested", + "properties": { + "id": { + "type": "keyword" + }, + "name": { + "type": "keyword" + }, + "type": { + "type": "keyword" + }, + "displayName": { + "type": "text" + } + } + }, + "customProperties": { + "type": "object" + }, + "sourceId": { + "type": "keyword" + }, + "deleted": { + "type": "boolean" + }, + "fingerprint": { + "type": "keyword" + }, + "upVotes": { + "type": "integer" + }, + "downVotes": { + "type": "integer" + }, + "totalVotes": { + "type": "integer" + }, + "followersCount": { + "type": "integer" + }, + "usageSummary": { + "type": "object", + "properties": { + "dailyStats": { + "type": "object", + "properties": { + "count": { + "type": "integer" + } + } + }, + "weeklyStats": { + "type": "object", + "properties": { + "count": { + "type": "integer" + }, + "percentileRank": { + "type": "double" + } + } + }, + "monthlyStats": { + "type": "object", + "properties": { + "count": { + "type": "integer" + }, + "percentileRank": { + "type": "double" + } + } + } + } + }, + "synonyms": { + "type": "keyword" + }, + "relatedTerms": { + "type": "nested", + "properties": { + "id": { + "type": "keyword" + }, + "name": { + "type": "keyword" + }, + "type": { + "type": "keyword" + }, + "displayName": { + "type": "text" + }, + "fullyQualifiedName": { + "type": "keyword" + } + } + }, + "metricExpression": { + "type": "object", + "properties": { + "language": { + "type": "keyword" + }, + "code": { + "type": "text", + "analyzer": "om_analyzer" + } + } + }, + "metricType": { + "type": "keyword" + }, + "unitOfMeasurement": { + "type": "keyword" + }, + "customUnitOfMeasurement": { + "type": "keyword" + }, + "granularity": { + "type": "keyword" + }, + "relatedMetrics": { + "type": "keyword" + } + } + } +} diff --git a/openmetadata-spec/src/main/resources/elasticsearch/jp/vector_search_index_es_native.json b/openmetadata-spec/src/main/resources/elasticsearch/jp/vector_search_index_es_native.json new file mode 100644 index 000000000000..1d42210083e8 --- /dev/null +++ b/openmetadata-spec/src/main/resources/elasticsearch/jp/vector_search_index_es_native.json @@ -0,0 +1,294 @@ +{ + "settings": { + "index": {}, + "analysis": { + "normalizer": { + "lowercase_normalizer": { + "type": "custom", + "filter": [ + "lowercase" + ] + } + }, + "filter": { + "om_stemmer": { + "type": "stemmer", + "name": "english" + }, + "word_delimiter_filter": { + "type": "word_delimiter", + "preserve_original": "true" + } + }, + "analyzer": { + "om_analyzer": { + "tokenizer": "standard", + "filter": [ + "lowercase", + "word_delimiter_filter", + "om_stemmer" + ] + } + } + } + }, + "mappings": { + "properties": { + "embedding": { + "type": "dense_vector", + "dims": 512, + "index": true, + "similarity": "cosine" + }, + "text_to_embed": { + "type": "text" + }, + "name": { + "type": "text", + "fields": { + "keyword": { + "type": "keyword" + } + } + }, + "fullyQualifiedName": { + "type": "keyword" + }, + "entityType": { + "type": "keyword", + "fields": { + "keyword": { + "type": "keyword", + "normalizer": "lowercase_normalizer", + "ignore_above": 256 + } + } + }, + "serviceType": { + "type": "keyword", + "normalizer": "lowercase_normalizer" + }, + "parent_id": { + "type": "keyword" + }, + "chunk_index": { + "type": "integer" + }, + "chunk_count": { + "type": "integer" + }, + "tags": { + "type": "nested", + "properties": { + "tagFQN": { + "type": "keyword" + }, + "name": { + "type": "keyword" + }, + "labelType": { + "type": "keyword" + }, + "description": { + "type": "text", + "analyzer": "om_analyzer" + }, + "source": { + "type": "keyword" + }, + "state": { + "type": "keyword" + } + } + }, + "tier": { + "type": "object", + "properties": { + "tagFQN": { + "type": "keyword" + }, + "name": { + "type": "keyword" + }, + "labelType": { + "type": "keyword" + }, + "description": { + "type": "text", + "analyzer": "om_analyzer" + }, + "source": { + "type": "keyword" + }, + "state": { + "type": "keyword" + } + } + }, + "certification": { + "type": "object", + "properties": { + "tagFQN": { + "type": "keyword" + }, + "name": { + "type": "keyword" + }, + "labelType": { + "type": "keyword" + }, + "description": { + "type": "text", + "analyzer": "om_analyzer" + }, + "source": { + "type": "keyword" + }, + "state": { + "type": "keyword" + } + } + }, + "domains": { + "type": "object", + "properties": { + "id": { + "type": "keyword" + }, + "name": { + "type": "keyword" + }, + "displayName": { + "type": "text" + } + } + }, + "owners": { + "type": "nested", + "properties": { + "id": { + "type": "keyword" + }, + "name": { + "type": "keyword" + }, + "type": { + "type": "keyword" + }, + "displayName": { + "type": "text" + } + } + }, + "customProperties": { + "type": "object" + }, + "sourceId": { + "type": "keyword" + }, + "deleted": { + "type": "boolean" + }, + "fingerprint": { + "type": "keyword" + }, + "upVotes": { + "type": "integer" + }, + "downVotes": { + "type": "integer" + }, + "totalVotes": { + "type": "integer" + }, + "followersCount": { + "type": "integer" + }, + "synonyms": { + "type": "keyword" + }, + "relatedTerms": { + "type": "nested", + "properties": { + "id": { + "type": "keyword" + }, + "name": { + "type": "keyword" + }, + "type": { + "type": "keyword" + }, + "displayName": { + "type": "text" + }, + "fullyQualifiedName": { + "type": "keyword" + } + } + }, + "usageSummary": { + "type": "object", + "properties": { + "dailyStats": { + "type": "object", + "properties": { + "count": { + "type": "integer" + } + } + }, + "weeklyStats": { + "type": "object", + "properties": { + "count": { + "type": "integer" + }, + "percentileRank": { + "type": "double" + } + } + }, + "monthlyStats": { + "type": "object", + "properties": { + "count": { + "type": "integer" + }, + "percentileRank": { + "type": "double" + } + } + } + } + }, + "metricExpression": { + "type": "object", + "properties": { + "language": { + "type": "keyword" + }, + "code": { + "type": "text", + "analyzer": "om_analyzer" + } + } + }, + "metricType": { + "type": "keyword" + }, + "unitOfMeasurement": { + "type": "keyword" + }, + "customUnitOfMeasurement": { + "type": "keyword" + }, + "granularity": { + "type": "keyword" + }, + "relatedMetrics": { + "type": "keyword" + } + } + } +} diff --git a/openmetadata-spec/src/main/resources/elasticsearch/ru/vector_search_index_es_native.json b/openmetadata-spec/src/main/resources/elasticsearch/ru/vector_search_index_es_native.json new file mode 100644 index 000000000000..6f621f1fdb80 --- /dev/null +++ b/openmetadata-spec/src/main/resources/elasticsearch/ru/vector_search_index_es_native.json @@ -0,0 +1,410 @@ +{ + "settings": { + "index": { + "max_ngram_diff": 17 + }, + "analysis": { + "tokenizer": { + "n_gram_tokenizer": { + "type": "ngram", + "min_gram": 3, + "max_gram": 20, + "token_chars": [ + "letter", + "digit" + ] + } + }, + "normalizer": { + "lowercase_normalizer": { + "type": "custom", + "filter": [ + "lowercase", + "asciifolding" + ] + } + }, + "filter": { + "word_delimiter_filter": { + "type": "word_delimiter", + "preserve_original": true + }, + "compound_word_delimiter_graph": { + "type": "word_delimiter_graph", + "generate_word_parts": true, + "generate_number_parts": true, + "split_on_case_change": true, + "split_on_numerics": true, + "catenate_words": false, + "catenate_numbers": false, + "catenate_all": false, + "preserve_original": true, + "stem_english_possessive": true + }, + "russian_stop": { + "type": "stop", + "stopwords": "_russian_" + }, + "english_stop": { + "type": "stop", + "stopwords": "_english_" + }, + "russian_snowball": { + "name": "russian", + "type": "stemmer" + }, + "om_kstem": { + "type": "kstem" + }, + "asciifolding": { + "type": "asciifolding" + } + }, + "analyzer": { + "om_analyzer": { + "tokenizer": "standard", + "filter": [ + "word_delimiter_filter", + "lowercase", + "asciifolding", + "russian_stop", + "russian_snowball", + "english_stop", + "om_kstem" + ] + }, + "om_ngram": { + "type": "custom", + "tokenizer": "n_gram_tokenizer", + "filter": [ + "lowercase" + ] + }, + "om_compound_analyzer": { + "tokenizer": "standard", + "filter": [ + "compound_word_delimiter_graph", + "lowercase", + "flatten_graph" + ] + } + } + } + }, + "mappings": { + "properties": { + "embedding": { + "type": "dense_vector", + "dims": 512, + "index": true, + "similarity": "cosine" + }, + "text_to_embed": { + "type": "text" + }, + "name": { + "type": "text", + "analyzer": "om_analyzer", + "fields": { + "keyword": { + "type": "keyword", + "normalizer": "lowercase_normalizer" + }, + "ngram": { + "type": "text", + "analyzer": "om_ngram" + }, + "compound": { + "type": "text", + "analyzer": "om_compound_analyzer" + } + } + }, + "fullyQualifiedName": { + "type": "keyword", + "normalizer": "lowercase_normalizer" + }, + "entityType": { + "type": "keyword", + "fields": { + "keyword": { + "type": "keyword", + "normalizer": "lowercase_normalizer", + "ignore_above": 256 + } + } + }, + "serviceType": { + "type": "keyword", + "normalizer": "lowercase_normalizer" + }, + "parent_id": { + "type": "keyword" + }, + "chunk_index": { + "type": "integer" + }, + "chunk_count": { + "type": "integer" + }, + "tags": { + "type": "nested", + "properties": { + "tagFQN": { + "type": "keyword" + }, + "name": { + "type": "keyword" + }, + "labelType": { + "type": "keyword" + }, + "description": { + "type": "text", + "analyzer": "om_analyzer" + }, + "source": { + "type": "keyword" + }, + "state": { + "type": "keyword" + } + } + }, + "tier": { + "type": "object", + "properties": { + "tagFQN": { + "type": "keyword" + }, + "name": { + "type": "keyword" + }, + "labelType": { + "type": "keyword" + }, + "description": { + "type": "text", + "analyzer": "om_analyzer" + }, + "source": { + "type": "keyword" + }, + "state": { + "type": "keyword" + } + } + }, + "certification": { + "type": "object", + "properties": { + "tagFQN": { + "type": "keyword" + }, + "name": { + "type": "keyword" + }, + "labelType": { + "type": "keyword" + }, + "description": { + "type": "text", + "analyzer": "om_analyzer" + }, + "source": { + "type": "keyword" + }, + "state": { + "type": "keyword" + } + } + }, + "domains": { + "type": "object", + "properties": { + "id": { + "type": "keyword" + }, + "name": { + "type": "keyword" + }, + "displayName": { + "type": "text", + "analyzer": "om_analyzer", + "fields": { + "keyword": { + "type": "keyword", + "normalizer": "lowercase_normalizer" + }, + "ngram": { + "type": "text", + "analyzer": "om_ngram" + }, + "compound": { + "type": "text", + "analyzer": "om_compound_analyzer" + } + } + } + } + }, + "owners": { + "type": "nested", + "properties": { + "id": { + "type": "keyword" + }, + "name": { + "type": "keyword" + }, + "type": { + "type": "keyword" + }, + "displayName": { + "type": "text", + "analyzer": "om_analyzer", + "fields": { + "keyword": { + "type": "keyword", + "normalizer": "lowercase_normalizer" + }, + "ngram": { + "type": "text", + "analyzer": "om_ngram" + }, + "compound": { + "type": "text", + "analyzer": "om_compound_analyzer" + } + } + } + } + }, + "customProperties": { + "type": "object" + }, + "sourceId": { + "type": "keyword" + }, + "deleted": { + "type": "boolean" + }, + "fingerprint": { + "type": "keyword" + }, + "upVotes": { + "type": "integer" + }, + "downVotes": { + "type": "integer" + }, + "totalVotes": { + "type": "integer" + }, + "followersCount": { + "type": "integer" + }, + "synonyms": { + "type": "keyword" + }, + "relatedTerms": { + "type": "nested", + "properties": { + "id": { + "type": "keyword" + }, + "name": { + "type": "keyword" + }, + "type": { + "type": "keyword" + }, + "displayName": { + "type": "text", + "analyzer": "om_analyzer", + "fields": { + "keyword": { + "type": "keyword", + "normalizer": "lowercase_normalizer" + }, + "ngram": { + "type": "text", + "analyzer": "om_ngram" + }, + "compound": { + "type": "text", + "analyzer": "om_compound_analyzer" + } + } + }, + "fullyQualifiedName": { + "type": "keyword", + "normalizer": "lowercase_normalizer" + } + } + }, + "usageSummary": { + "type": "object", + "properties": { + "dailyStats": { + "type": "object", + "properties": { + "count": { + "type": "integer" + } + } + }, + "weeklyStats": { + "type": "object", + "properties": { + "count": { + "type": "integer" + }, + "percentileRank": { + "type": "double" + } + } + }, + "monthlyStats": { + "type": "object", + "properties": { + "count": { + "type": "integer" + }, + "percentileRank": { + "type": "double" + } + } + } + } + }, + "metricExpression": { + "type": "object", + "properties": { + "language": { + "type": "keyword" + }, + "code": { + "type": "text", + "analyzer": "om_analyzer" + } + } + }, + "metricType": { + "type": "keyword" + }, + "unitOfMeasurement": { + "type": "keyword" + }, + "customUnitOfMeasurement": { + "type": "keyword" + }, + "granularity": { + "type": "keyword" + }, + "relatedMetrics": { + "type": "keyword" + } + } + } +} diff --git a/openmetadata-spec/src/main/resources/elasticsearch/zh/vector_search_index_es_native.json b/openmetadata-spec/src/main/resources/elasticsearch/zh/vector_search_index_es_native.json new file mode 100644 index 000000000000..1d42210083e8 --- /dev/null +++ b/openmetadata-spec/src/main/resources/elasticsearch/zh/vector_search_index_es_native.json @@ -0,0 +1,294 @@ +{ + "settings": { + "index": {}, + "analysis": { + "normalizer": { + "lowercase_normalizer": { + "type": "custom", + "filter": [ + "lowercase" + ] + } + }, + "filter": { + "om_stemmer": { + "type": "stemmer", + "name": "english" + }, + "word_delimiter_filter": { + "type": "word_delimiter", + "preserve_original": "true" + } + }, + "analyzer": { + "om_analyzer": { + "tokenizer": "standard", + "filter": [ + "lowercase", + "word_delimiter_filter", + "om_stemmer" + ] + } + } + } + }, + "mappings": { + "properties": { + "embedding": { + "type": "dense_vector", + "dims": 512, + "index": true, + "similarity": "cosine" + }, + "text_to_embed": { + "type": "text" + }, + "name": { + "type": "text", + "fields": { + "keyword": { + "type": "keyword" + } + } + }, + "fullyQualifiedName": { + "type": "keyword" + }, + "entityType": { + "type": "keyword", + "fields": { + "keyword": { + "type": "keyword", + "normalizer": "lowercase_normalizer", + "ignore_above": 256 + } + } + }, + "serviceType": { + "type": "keyword", + "normalizer": "lowercase_normalizer" + }, + "parent_id": { + "type": "keyword" + }, + "chunk_index": { + "type": "integer" + }, + "chunk_count": { + "type": "integer" + }, + "tags": { + "type": "nested", + "properties": { + "tagFQN": { + "type": "keyword" + }, + "name": { + "type": "keyword" + }, + "labelType": { + "type": "keyword" + }, + "description": { + "type": "text", + "analyzer": "om_analyzer" + }, + "source": { + "type": "keyword" + }, + "state": { + "type": "keyword" + } + } + }, + "tier": { + "type": "object", + "properties": { + "tagFQN": { + "type": "keyword" + }, + "name": { + "type": "keyword" + }, + "labelType": { + "type": "keyword" + }, + "description": { + "type": "text", + "analyzer": "om_analyzer" + }, + "source": { + "type": "keyword" + }, + "state": { + "type": "keyword" + } + } + }, + "certification": { + "type": "object", + "properties": { + "tagFQN": { + "type": "keyword" + }, + "name": { + "type": "keyword" + }, + "labelType": { + "type": "keyword" + }, + "description": { + "type": "text", + "analyzer": "om_analyzer" + }, + "source": { + "type": "keyword" + }, + "state": { + "type": "keyword" + } + } + }, + "domains": { + "type": "object", + "properties": { + "id": { + "type": "keyword" + }, + "name": { + "type": "keyword" + }, + "displayName": { + "type": "text" + } + } + }, + "owners": { + "type": "nested", + "properties": { + "id": { + "type": "keyword" + }, + "name": { + "type": "keyword" + }, + "type": { + "type": "keyword" + }, + "displayName": { + "type": "text" + } + } + }, + "customProperties": { + "type": "object" + }, + "sourceId": { + "type": "keyword" + }, + "deleted": { + "type": "boolean" + }, + "fingerprint": { + "type": "keyword" + }, + "upVotes": { + "type": "integer" + }, + "downVotes": { + "type": "integer" + }, + "totalVotes": { + "type": "integer" + }, + "followersCount": { + "type": "integer" + }, + "synonyms": { + "type": "keyword" + }, + "relatedTerms": { + "type": "nested", + "properties": { + "id": { + "type": "keyword" + }, + "name": { + "type": "keyword" + }, + "type": { + "type": "keyword" + }, + "displayName": { + "type": "text" + }, + "fullyQualifiedName": { + "type": "keyword" + } + } + }, + "usageSummary": { + "type": "object", + "properties": { + "dailyStats": { + "type": "object", + "properties": { + "count": { + "type": "integer" + } + } + }, + "weeklyStats": { + "type": "object", + "properties": { + "count": { + "type": "integer" + }, + "percentileRank": { + "type": "double" + } + } + }, + "monthlyStats": { + "type": "object", + "properties": { + "count": { + "type": "integer" + }, + "percentileRank": { + "type": "double" + } + } + } + } + }, + "metricExpression": { + "type": "object", + "properties": { + "language": { + "type": "keyword" + }, + "code": { + "type": "text", + "analyzer": "om_analyzer" + } + } + }, + "metricType": { + "type": "keyword" + }, + "unitOfMeasurement": { + "type": "keyword" + }, + "customUnitOfMeasurement": { + "type": "keyword" + }, + "granularity": { + "type": "keyword" + }, + "relatedMetrics": { + "type": "keyword" + } + } + } +} From b43b328117609d23a075b09730401f57e8836a97 Mon Sep 17 00:00:00 2001 From: Joao Amaral <7281460+joaopamaral@users.noreply.github.com> Date: Mon, 11 May 2026 18:42:46 -0300 Subject: [PATCH 2/7] fix(es-vector): wire dedicated-arch boot path end-to-end for Elasticsearch MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Three fixes to make the ES vector backport actually serve queries in 1.12.7's dedicated-index architecture: - SearchRepository.getIndexMapping: when SearchType=ELASTICSEARCH and the IndexMapping's resource path is the OpenSearch-format vector_search_index.json, swap to vector_search_index_es_native.json. Without this, ES rejects the template (unknown setting [index.knn]). - SearchRepository.reformatVectorIndexWithDimension: on Elasticsearch, patch dense_vector.dims (not OpenSearch's knn_vector.dimension). The string-replace fallback also targets the ES field name. - SearchRepository.createMissingIndexes / createOrUpdateIndexTemplates: skip the "vectorEmbedding" entry on Elasticsearch. Boot Phase 1 runs before embeddingClient is initialized, so the JSON template's placeholder dimension would be baked in — and dense_vector.dims is immutable on an existing ES index. ElasticSearchVectorService creates the index later in Phase 3 with the active model's real dimension. - VectorSearchResource: read vectorIndexService via SearchRepository instead of OpenSearchVectorService.getInstance(), so the resource works for both backends. ES queries previously returned 503 "Vector search service is not initialized" because the resource only checked the OS singleton. Co-Authored-By: Claude Opus 4.7 (1M context) --- .../search/VectorSearchResource.java | 6 +-- .../service/search/SearchRepository.java | 38 +++++++++++++++---- 2 files changed, 33 insertions(+), 11 deletions(-) diff --git a/openmetadata-service/src/main/java/org/openmetadata/service/resources/search/VectorSearchResource.java b/openmetadata-service/src/main/java/org/openmetadata/service/resources/search/VectorSearchResource.java index 695bc1670d5e..e867b24181a5 100644 --- a/openmetadata-service/src/main/java/org/openmetadata/service/resources/search/VectorSearchResource.java +++ b/openmetadata-service/src/main/java/org/openmetadata/service/resources/search/VectorSearchResource.java @@ -21,7 +21,7 @@ import lombok.extern.slf4j.Slf4j; import org.openmetadata.service.Entity; import org.openmetadata.service.resources.Collection; -import org.openmetadata.service.search.vector.OpenSearchVectorService; +import org.openmetadata.service.search.vector.VectorIndexService; import org.openmetadata.service.search.vector.utils.DTOs.FingerprintResponse; import org.openmetadata.service.search.vector.utils.DTOs.VectorSearchRequest; import org.openmetadata.service.search.vector.utils.DTOs.VectorSearchResponse; @@ -75,7 +75,7 @@ public Response vectorSearchPost( .build(); } - OpenSearchVectorService vectorService = OpenSearchVectorService.getInstance(); + VectorIndexService vectorService = Entity.getSearchRepository().getVectorIndexService(); if (vectorService == null) { return Response.status(Response.Status.SERVICE_UNAVAILABLE) .entity("{\"error\":\"Vector search service is not initialized\"}") @@ -119,7 +119,7 @@ public Response getFingerprint( .build(); } - OpenSearchVectorService vectorService = OpenSearchVectorService.getInstance(); + VectorIndexService vectorService = Entity.getSearchRepository().getVectorIndexService(); if (vectorService == null) { return Response.status(Response.Status.SERVICE_UNAVAILABLE) .entity("{\"error\":\"Vector search service is not initialized\"}") diff --git a/openmetadata-service/src/main/java/org/openmetadata/service/search/SearchRepository.java b/openmetadata-service/src/main/java/org/openmetadata/service/search/SearchRepository.java index 6b3f3c899cbb..0b279d31adca 100644 --- a/openmetadata-service/src/main/java/org/openmetadata/service/search/SearchRepository.java +++ b/openmetadata-service/src/main/java/org/openmetadata/service/search/SearchRepository.java @@ -323,7 +323,16 @@ public void updateIndexes() { public void createMissingIndexes() { LOG.info("Checking for missing search indexes..."); int created = 0; + boolean isEs = getSearchType() == ElasticSearchConfiguration.SearchType.ELASTICSEARCH; for (Map.Entry entry : entityIndexMap.entrySet()) { + // On Elasticsearch, defer vectorEmbedding index creation to ElasticSearchVectorService: + // it knows the active embedding model dimension and creates dense_vector with the + // correct dims. Creating it here would bake the placeholder dimension into the index + // (embeddingClient is not initialized yet at this boot phase) and dense_vector.dims + // is immutable on an existing ES index. + if (isEs && VectorIndexService.VECTOR_INDEX_KEY.equals(entry.getKey())) { + continue; + } try { if (!indexExists(entry.getValue())) { createIndex(entry.getValue()); @@ -348,7 +357,14 @@ public void createOrUpdateIndexTemplates() { LOG.info("Creating/updating index templates for all entities..."); int success = 0; int failed = 0; + boolean isEs = getSearchType() == ElasticSearchConfiguration.SearchType.ELASTICSEARCH; for (Map.Entry entry : entityIndexMap.entrySet()) { + // See createMissingIndexes: the vectorEmbedding index template would bake the + // placeholder dimension on Elasticsearch. ElasticSearchVectorService creates the + // actual index later with the correct dense_vector dims. + if (isEs && VectorIndexService.VECTOR_INDEX_KEY.equals(entry.getKey())) { + continue; + } try { IndexMapping indexMapping = entry.getValue(); String indexName = indexMapping.getIndexName(clusterAlias); @@ -526,10 +542,14 @@ public void deleteIndex(IndexMapping indexMapping) { } private String getIndexMapping(IndexMapping indexMapping) { + String mappingFile = indexMapping.getIndexMappingFile(); + if (getSearchType() == ElasticSearchConfiguration.SearchType.ELASTICSEARCH + && mappingFile != null + && mappingFile.endsWith("/vector_search_index.json")) { + mappingFile = mappingFile.replace("vector_search_index.json", "vector_search_index_es_native.json"); + } try (InputStream in = - getClass() - .getResourceAsStream( - String.format(indexMapping.getIndexMappingFile(), language.toLowerCase()))) { + getClass().getResourceAsStream(String.format(mappingFile, language.toLowerCase()))) { assert in != null; return new String(in.readAllBytes()); } catch (Exception e) { @@ -2277,6 +2297,8 @@ public void ensureVectorIndexDimension() { } private String reformatVectorIndexWithDimension(String mapping, int dimension) { + boolean isEs = getSearchType() == ElasticSearchConfiguration.SearchType.ELASTICSEARCH; + String dimensionField = isEs ? "dims" : "dimension"; try { com.fasterxml.jackson.databind.ObjectMapper mapper = new com.fasterxml.jackson.databind.ObjectMapper(); @@ -2287,7 +2309,7 @@ private String reformatVectorIndexWithDimension(String mapping, int dimension) { JsonNode properties = mappings.get("properties"); if (properties.has("embedding")) { ((com.fasterxml.jackson.databind.node.ObjectNode) properties.get("embedding")) - .put("dimension", dimension); + .put(dimensionField, dimension); } } JsonNode meta = @@ -2303,10 +2325,10 @@ private String reformatVectorIndexWithDimension(String mapping, int dimension) { LOG.warn( "Failed to parse mapping JSON for dimension patching, falling back to string replace"); return mapping - .replace("\"dimension\": 768", "\"dimension\": " + dimension) - .replace("\"dimension\":768", "\"dimension\":" + dimension) - .replace("\"dimension\": 512", "\"dimension\": " + dimension) - .replace("\"dimension\":512", "\"dimension\":" + dimension); + .replace("\"" + dimensionField + "\": 768", "\"" + dimensionField + "\": " + dimension) + .replace("\"" + dimensionField + "\":768", "\"" + dimensionField + "\":" + dimension) + .replace("\"" + dimensionField + "\": 512", "\"" + dimensionField + "\": " + dimension) + .replace("\"" + dimensionField + "\":512", "\"" + dimensionField + "\":" + dimension); } } From ab54710b5f9d1c739b80851f835481acf5779c6a Mon Sep 17 00:00:00 2001 From: Joao Amaral <7281460+joaopamaral@users.noreply.github.com> Date: Thu, 14 May 2026 09:42:14 -0300 Subject: [PATCH 3/7] fix(es-ilm): detach ILM via removePolicy endpoint instead of putSettings MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit `dettachIlmPolicyFromIndexes` previously called `client.indices().putSettings(s -> s.settings(idx -> idx.lifecycle(l -> l.name(null))))`. The ES Java client's Jackson serializer drops null fields, so the body serialized to `{settings: {index: {}}}` — empty — and Elasticsearch rejected the request with `action_request_validation_exception: no settings to update`. The migration `v172.removeOldDataInsightsObjects` swallowed the per-index error in its outer try/catch and continued, so the migration completed "successfully" but left old data-insights ILM policies attached to the data-stream backing indices and the orphan policies on the cluster. Switch to the dedicated `POST //_ilm/remove` endpoint via the typed `client.ilm().removePolicy(r -> r.index(indexName))` call. This is the purpose-built ES API for detaching ILM from an index and doesn't depend on null-field serialization behaviour. Verified by running `./bootstrap/openmetadata-ops.sh drop-create` from a dev image built on the fix branch: migration log now shows `Detached ILM policy from index: ` for every old data-insights backing index, the orphan policy is gone, and the reindex pipeline completes with 56/57 templates created (vectorEmbedding skipped on ES by design). Co-Authored-By: Claude Opus 4.7 (1M context) --- .../elasticsearch/ElasticSearchGenericManager.java | 9 +++++---- 1 file changed, 5 insertions(+), 4 deletions(-) diff --git a/openmetadata-service/src/main/java/org/openmetadata/service/search/elasticsearch/ElasticSearchGenericManager.java b/openmetadata-service/src/main/java/org/openmetadata/service/search/elasticsearch/ElasticSearchGenericManager.java index 25c64d6f3654..91914e7a8bdb 100644 --- a/openmetadata-service/src/main/java/org/openmetadata/service/search/elasticsearch/ElasticSearchGenericManager.java +++ b/openmetadata-service/src/main/java/org/openmetadata/service/search/elasticsearch/ElasticSearchGenericManager.java @@ -195,10 +195,11 @@ public void dettachIlmPolicyFromIndexes(String indexPattern) throws IOException for (String indexName : getIndexResponse.indices().keySet()) { try { - client - .indices() - .putSettings( - s -> s.index(indexName).settings(idx -> idx.lifecycle(l -> l.name(null)))); + // Use the dedicated `POST //_ilm/remove` endpoint instead of + // `PUT _settings { index.lifecycle.name: null }`. The typed putSettings call + // drops null fields from the JSON body, which makes ES reject the request as + // "no settings to update" and leaves the ILM policy attached. + client.ilm().removePolicy(r -> r.index(indexName)); LOG.info("Detached ILM policy from index: {}", indexName); } catch (ElasticsearchException e) { if (e.status() == 404) { From 12cc62664536cf75d58d19be802ce959e7c5b60c Mon Sep 17 00:00:00 2001 From: Joao Amaral <7281460+joaopamaral@users.noreply.github.com> Date: Thu, 14 May 2026 11:18:11 -0300 Subject: [PATCH 4/7] fix(ldap): load full user fields on login so teams are not wiped LdapAuthenticator.checkAndCreateUser fetched the existing OM user with only "id,name,email,roles". The subsequent PUT through UserUtil.addOrUpdateUser -> userRepository.createOrUpdate then ran the UserUpdater, whose entitySpecificUpdate unconditionally calls updateTeams / updatePersonas / etc. Because the in-memory user had teams = null, updateTeams executed deleteTo(user, HAS, TEAM) + assignTeams(null), which wiped every manually-assigned team on every LDAP login. The same path wiped personas, defaultPersona, profile, domains, personaPreferences, authenticationMechanism, and isEmailVerified. The deleteTo against a user with many teams also made login visibly slow. Switch the fetch to userRepository.getFieldsWithUserAuth("*"), matching the BasicAuthenticator path, so the PUT sees the user's full state and the updater preserves it. Adds LdapAuthCompleteFlowTest.testLoginPreservesManuallyAssignedTeams, a regression test that creates three pure-OM teams (no LDAP group/DN/role-mapping backing), manually assigns them to the LDAP user, logs in over LDAP, and asserts the three teams are still on the user afterwards. Co-Authored-By: Claude Opus 4.7 (1M context) --- .../security/auth/LdapAuthenticator.java | 9 +- .../auth/LdapAuthCompleteFlowTest.java | 116 ++++++++++++++++++ 2 files changed, 124 insertions(+), 1 deletion(-) diff --git a/openmetadata-service/src/main/java/org/openmetadata/service/security/auth/LdapAuthenticator.java b/openmetadata-service/src/main/java/org/openmetadata/service/security/auth/LdapAuthenticator.java index 0e69ef790614..8799795c644c 100644 --- a/openmetadata-service/src/main/java/org/openmetadata/service/security/auth/LdapAuthenticator.java +++ b/openmetadata-service/src/main/java/org/openmetadata/service/security/auth/LdapAuthenticator.java @@ -162,8 +162,15 @@ private User checkAndCreateUser(String userDn, String email, String userName) th // Check if the user exists in OM Database try { + // Load all updater-tracked fields (teams, personas, profile, defaultPersona, domains, + // personaPreferences, authenticationMechanism, isEmailVerified) so the subsequent PUT + // in UserUtil.addOrUpdateUser doesn't clobber them. UserUpdater.entitySpecificUpdate + // runs updateTeams / updatePersonas / etc. unconditionally; with a sparse fetch those + // fields arrive null and the updater wipes the corresponding relationships, which + // destroys the user's manually-assigned teams on every LDAP login and makes login slow + // (deleteTo does work proportional to the user's existing team count). User omUser = - userRepository.getByEmail(null, email, userRepository.getFields("id,name,email,roles")); + userRepository.getByEmail(null, email, userRepository.getFieldsWithUserAuth("*")); getRoleForLdap(userDn, omUser, Boolean.TRUE); finalUser = omUser; } catch (EntityNotFoundException ex) { diff --git a/openmetadata-service/src/test/java/org/openmetadata/service/security/auth/LdapAuthCompleteFlowTest.java b/openmetadata-service/src/test/java/org/openmetadata/service/security/auth/LdapAuthCompleteFlowTest.java index 2fe87da7fa31..688e782cf394 100644 --- a/openmetadata-service/src/test/java/org/openmetadata/service/security/auth/LdapAuthCompleteFlowTest.java +++ b/openmetadata-service/src/test/java/org/openmetadata/service/security/auth/LdapAuthCompleteFlowTest.java @@ -440,6 +440,122 @@ void testAdminPrincipalsGrantsAdminPrivileges() throws Exception { user.getName()); } + @Test + @Order(10) + void testLoginPreservesManuallyAssignedTeams() throws Exception { + // Regression for the bug where LDAP login wiped the user's existing team membership. + // The teams created/assigned here are manual OpenMetadata teams — they have no + // counterpart in the LDAP directory and are not part of any role/group mapping. They + // model the real-world setup: an admin curates teams in OM by hand and assigns users + // to them; the LDAP integration must not touch those assignments on login. + LOG.info("Testing that LDAP login preserves manually-assigned (non-LDAP) OM teams"); + + // Make sure the user has been created in OM by an earlier login. + LoginRequest initialLogin = new LoginRequest(); + initialLogin.setEmail(TEST_USER_EMAIL); + initialLogin.setPassword(Base64.getEncoder().encodeToString(TEST_USER_PASSWORD.getBytes())); + Response initial = + client + .target(getServerUrl() + AUTH_LOGIN_ENDPOINT) + .request(MediaType.APPLICATION_JSON) + .post(Entity.json(initialLogin)); + assertEquals(200, initial.getStatus(), "Initial login should succeed"); + + // Create three pure-OM teams directly via TeamRepository. These exist only inside + // OpenMetadata and have no LDAP group/DN/role-mapping backing them. Using the + // repository (not the HTTP API) avoids the TestUtils.ADMIN_AUTH_HEADERS path, which + // does not authenticate once the test class has switched the auth config to LDAP. + org.openmetadata.service.jdbi3.TeamRepository teamRepository = + (org.openmetadata.service.jdbi3.TeamRepository) + org.openmetadata.service.Entity.getEntityRepository( + org.openmetadata.service.Entity.TEAM); + String suffix = String.valueOf(System.currentTimeMillis()); + org.openmetadata.schema.entity.teams.Team team1 = + teamRepository.create( + null, + new org.openmetadata.schema.entity.teams.Team() + .withId(java.util.UUID.randomUUID()) + .withName("om-manual-team-1-" + suffix) + .withTeamType(org.openmetadata.schema.api.teams.CreateTeam.TeamType.GROUP) + .withUpdatedBy("admin") + .withUpdatedAt(System.currentTimeMillis())); + org.openmetadata.schema.entity.teams.Team team2 = + teamRepository.create( + null, + new org.openmetadata.schema.entity.teams.Team() + .withId(java.util.UUID.randomUUID()) + .withName("om-manual-team-2-" + suffix) + .withTeamType(org.openmetadata.schema.api.teams.CreateTeam.TeamType.GROUP) + .withUpdatedBy("admin") + .withUpdatedAt(System.currentTimeMillis())); + org.openmetadata.schema.entity.teams.Team team3 = + teamRepository.create( + null, + new org.openmetadata.schema.entity.teams.Team() + .withId(java.util.UUID.randomUUID()) + .withName("om-manual-team-3-" + suffix) + .withTeamType(org.openmetadata.schema.api.teams.CreateTeam.TeamType.GROUP) + .withUpdatedBy("admin") + .withUpdatedAt(System.currentTimeMillis())); + + // Manually assign the three OM teams to the LDAP user (admin-curated membership). + UserRepository userRepository = + (UserRepository) + org.openmetadata.service.Entity.getEntityRepository( + org.openmetadata.service.Entity.USER); + User userBeforeAssign = + userRepository.getByEmail( + null, TEST_USER_EMAIL, userRepository.getFieldsWithUserAuth("*")); + userBeforeAssign.setTeams( + java.util.List.of( + team1.getEntityReference(), team2.getEntityReference(), team3.getEntityReference())); + userRepository.createOrUpdate(null, userBeforeAssign, "admin"); + + // Sanity-check the assignment landed. + User userAfterAssign = + userRepository.getByEmail(null, TEST_USER_EMAIL, userRepository.getFields("teams")); + List assignedTeamNames = + userAfterAssign.getTeams().stream() + .map(org.openmetadata.schema.type.EntityReference::getName) + .sorted() + .toList(); + assertTrue( + assignedTeamNames.containsAll( + java.util.List.of(team1.getName(), team2.getName(), team3.getName())), + "Setup failed: user should have all three manually-assigned teams before LDAP login." + + " Got: " + + assignedTeamNames); + + // Trigger an LDAP login. This drives LdapAuthenticator.checkAndCreateUser through the + // PUT path that previously fetched only "id,name,email,roles" and let + // UserUpdater.updateTeams clobber the manual team assignments on every login. + LoginRequest relogin = new LoginRequest(); + relogin.setEmail(TEST_USER_EMAIL); + relogin.setPassword(Base64.getEncoder().encodeToString(TEST_USER_PASSWORD.getBytes())); + Response reloginResponse = + client + .target(getServerUrl() + AUTH_LOGIN_ENDPOINT) + .request(MediaType.APPLICATION_JSON) + .post(Entity.json(relogin)); + assertEquals(200, reloginResponse.getStatus(), "LDAP login should succeed"); + + // After the LDAP login the user MUST still belong to all three manually-assigned teams. + User userAfterLogin = + userRepository.getByEmail(null, TEST_USER_EMAIL, userRepository.getFields("teams")); + List teamsAfterLogin = + userAfterLogin.getTeams().stream() + .map(org.openmetadata.schema.type.EntityReference::getName) + .sorted() + .toList(); + assertTrue( + teamsAfterLogin.containsAll( + java.util.List.of(team1.getName(), team2.getName(), team3.getName())), + "LDAP login wiped the user's manual team assignments. Expected to still have " + + java.util.List.of(team1.getName(), team2.getName(), team3.getName()) + + " but got " + + teamsAfterLogin); + } + @Test @Order(9) void testMultipleLoginAttempts() throws Exception { From af38d8e903149b54ed9b76fd7778b08e8fa052f2 Mon Sep 17 00:00:00 2001 From: Joao Amaral <7281460+joaopamaral@users.noreply.github.com> Date: Thu, 14 May 2026 15:08:33 -0300 Subject: [PATCH 5/7] refactor(ldap): use getPutFields() instead of getFieldsWithUserAuth("*") The previous fix passed `getFieldsWithUserAuth("*")` to `getByEmail`, which returns the full allowed-field set and so triggers fetching heavy User relationships like `owns` and `follows` on every LDAP login. `UserRepository.setFields` runs `getOwns(user)` / `getFollows(user)` whenever those fields are requested, each of which is a DB query proportional to the user's owned/followed entity count. Switch to `userRepository.getPutFields()`, the cached `Fields` object built from `USER_UPDATE_FIELDS`: profile, roles, teams, authenticationMechanism, isEmailVerified, personas, defaultPersona, domains, personaPreferences. That is exactly the set `UserUpdater.entitySpecificUpdate` mutates on the PUT path, so the updater no longer wipes relationships it can't see, and we no longer over-fetch unrelated User relationships on every LDAP login. Co-Authored-By: Claude Opus 4.7 (1M context) --- .../security/auth/LdapAuthenticator.java | 19 ++++++++++--------- 1 file changed, 10 insertions(+), 9 deletions(-) diff --git a/openmetadata-service/src/main/java/org/openmetadata/service/security/auth/LdapAuthenticator.java b/openmetadata-service/src/main/java/org/openmetadata/service/security/auth/LdapAuthenticator.java index 8799795c644c..80ed137c721a 100644 --- a/openmetadata-service/src/main/java/org/openmetadata/service/security/auth/LdapAuthenticator.java +++ b/openmetadata-service/src/main/java/org/openmetadata/service/security/auth/LdapAuthenticator.java @@ -162,15 +162,16 @@ private User checkAndCreateUser(String userDn, String email, String userName) th // Check if the user exists in OM Database try { - // Load all updater-tracked fields (teams, personas, profile, defaultPersona, domains, - // personaPreferences, authenticationMechanism, isEmailVerified) so the subsequent PUT - // in UserUtil.addOrUpdateUser doesn't clobber them. UserUpdater.entitySpecificUpdate - // runs updateTeams / updatePersonas / etc. unconditionally; with a sparse fetch those - // fields arrive null and the updater wipes the corresponding relationships, which - // destroys the user's manually-assigned teams on every LDAP login and makes login slow - // (deleteTo does work proportional to the user's existing team count). - User omUser = - userRepository.getByEmail(null, email, userRepository.getFieldsWithUserAuth("*")); + // Load the same field set the PUT path uses (USER_UPDATE_FIELDS): roles, teams, + // profile, authenticationMechanism, isEmailVerified, personas, defaultPersona, + // domains, personaPreferences. UserUpdater.entitySpecificUpdate runs updateTeams / + // updatePersonas / etc. unconditionally; with a sparse fetch those fields arrive + // null and the updater wipes the corresponding relationships, which destroys the + // user's manually-assigned teams on every LDAP login and makes login slow (deleteTo + // does work proportional to the user's existing team count). Use getPutFields() + // (narrow) rather than getFieldsWithUserAuth("*") (wide) so we don't eagerly load + // the heavy owns/follows relationship sets on every login. + User omUser = userRepository.getByEmail(null, email, userRepository.getPutFields()); getRoleForLdap(userDn, omUser, Boolean.TRUE); finalUser = omUser; } catch (EntityNotFoundException ex) { From 7fd62f825087dedec3a9067f96e5377dfb3f9ed8 Mon Sep 17 00:00:00 2001 From: Joao Amaral <7281460+joaopamaral@users.noreply.github.com> Date: Thu, 14 May 2026 15:58:08 -0300 Subject: [PATCH 6/7] fix(bots): skip bot upsert when nothing changed to stop team-strip + reindex loop on boot MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit BotResource.initialize() runs UserUtil.addOrUpdateBotUser(user) for every bot on every OM boot. The in-memory User built by UserUtil.user(...) does not have the teams field populated, so the PUT path through userRepository.createOrUpdate -> UserUpdater.entitySpecificUpdate runs updateTeams(original, updated) with original.teams = [Organization] (or the bot's real stored teams) and updated.teams = null. updateTeams then executes deleteTo(user, HAS, TEAM) + assignTeams(null), which strips every stored team membership the bot had, bumps the user version, and triggers an ES reindex of both the user and each affected team. With several bots this fires on every restart and produces the reindex storm plus "Circular dependency detected in team hierarchy for team: Organization" warnings we see in the boot logs. Short-circuit when the incoming bot has no real change vs. the persisted row: compare description, displayName, and roles. If they all match, return the original user and skip the PUT entirely — no UserUpdater, no team strip, no version bump, no reindex. Two adjustments to make the guard actually fire: - retrieveWithAuthMechanism now also loads "roles" (was loading only "authenticationMechanism"); description and displayName are scalar JSON-column fields and were already populated by the base read. - Compare roles via listOrEmpty(...) on both sides because the database-loaded original returns an empty list while the freshly built in-memory user returns null, and Objects.equals(null, []) is false. Adds UserUtilBotTest.addOrUpdateBotUserDoesNotStripTeams as a regression test. The test creates a real (non-Organization) team, assigns the bot to it, calls addOrUpdateBotUser with a fresh User object that does not carry the teams field (matching what BotResource.initialize passes on boot), then asserts the bot is still in the real team. Before this fix the assertion fails with "Got: [Organization]" — the real team has been wiped and only the virtual default remains. Co-Authored-By: Claude Opus 4.7 (1M context) --- .../openmetadata/service/util/UserUtil.java | 19 ++- .../service/util/UserUtilBotTest.java | 117 ++++++++++++++++++ 2 files changed, 135 insertions(+), 1 deletion(-) create mode 100644 openmetadata-service/src/test/java/org/openmetadata/service/util/UserUtilBotTest.java diff --git a/openmetadata-service/src/main/java/org/openmetadata/service/util/UserUtil.java b/openmetadata-service/src/main/java/org/openmetadata/service/util/UserUtil.java index 2618adf9b45c..c8d2c4a96d84 100644 --- a/openmetadata-service/src/main/java/org/openmetadata/service/util/UserUtil.java +++ b/openmetadata-service/src/main/java/org/openmetadata/service/util/UserUtil.java @@ -336,6 +336,19 @@ public static boolean assignTeamsFromClaim(User user, List teamNames) { */ public static User addOrUpdateBotUser(User user) { User originalUser = retrieveWithAuthMechanism(user); + // Short-circuit when the incoming bot user has no real change vs. what's already in the + // database. Without this guard every OM boot calls into addOrUpdateUser -> + // userRepository.createOrUpdate, and UserUpdater.entitySpecificUpdate then runs + // updateTeams/updateRoles/etc. with the incoming `user.getTeams() == null`, which strips + // the bot's stored team relationships, bumps the version, and triggers an Elasticsearch + // reindex of the bot user (and any team membership change ripples into the team_search + // index too). With many bots this is a reindex storm on every restart. + if (originalUser != null + && Objects.equals(listOrEmpty(originalUser.getRoles()), listOrEmpty(user.getRoles())) + && Objects.equals(originalUser.getDescription(), user.getDescription()) + && Objects.equals(originalUser.getDisplayName(), user.getDisplayName())) { + return originalUser; + } AuthenticationMechanism authMechanism = originalUser != null ? originalUser.getAuthenticationMechanism() : null; // the user did not have an auth mechanism and auth config is present @@ -366,8 +379,12 @@ private static User retrieveWithAuthMechanism(User user) { EntityRepository userRepository = (UserRepository) Entity.getEntityRepository(Entity.USER); try { + // Include "roles" so the no-op short-circuit in addOrUpdateBotUser can compare it + // against the incoming user. description and displayName are scalar fields stored in + // the entity JSON column, so they're already populated by the base read without an + // extra field fetch. return userRepository.getByName( - null, user.getName(), new Fields(Set.of("authenticationMechanism"))); + null, user.getName(), new Fields(Set.of("authenticationMechanism", "roles"))); } catch (EntityNotFoundException e) { LOG.debug("Bot entity: {} does not exists.", user); return null; diff --git a/openmetadata-service/src/test/java/org/openmetadata/service/util/UserUtilBotTest.java b/openmetadata-service/src/test/java/org/openmetadata/service/util/UserUtilBotTest.java new file mode 100644 index 000000000000..cac9e5e8cb3b --- /dev/null +++ b/openmetadata-service/src/test/java/org/openmetadata/service/util/UserUtilBotTest.java @@ -0,0 +1,117 @@ +/* + * Copyright 2026 Collate + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * http://www.apache.org/licenses/LICENSE-2.0 + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.openmetadata.service.util; + +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertNotNull; +import static org.junit.jupiter.api.Assertions.assertTrue; + +import java.util.List; +import java.util.UUID; +import lombok.extern.slf4j.Slf4j; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.TestInstance; +import org.openmetadata.schema.entity.teams.User; +import org.openmetadata.schema.type.EntityReference; +import org.openmetadata.service.Entity; +import org.openmetadata.service.OpenMetadataApplicationTest; +import org.openmetadata.service.jdbi3.UserRepository; + +/** + * Regression coverage for the boot-time team-strip loop on bot users. + * + *

Bots are persisted with {@code teams = [Organization]} (the default parent every user gets + * when no explicit team is given). On every OM boot, {@code UserUtil.addOrUpdateBotUser} was + * called with an in-memory {@link User} that did not carry the existing {@code teams} field. The + * PUT path on {@code userRepository.createOrUpdate} then ran {@code UserUpdater.updateTeams}, + * which executed {@code deleteTo + assignTeams(null)} and wiped the bot's team membership. The + * change description recorded + * {@code fieldsDeleted=[FieldChange[name=teams, oldValue=[Organization], newValue=null]]} on + * every boot, each one triggering an Elasticsearch reindex of the bot user. + */ +@Slf4j +@TestInstance(TestInstance.Lifecycle.PER_CLASS) +class UserUtilBotTest extends OpenMetadataApplicationTest { + + @Test + void addOrUpdateBotUserDoesNotStripTeams() throws Exception { + UserRepository userRepository = + (UserRepository) Entity.getEntityRepository(Entity.USER); + + String suffix = String.valueOf(System.currentTimeMillis()); + String botName = "bot-team-strip-regression-" + suffix; + String email = botName + "@open-metadata.org"; + + // Create a real (non-Organization) team and assign the bot to it. Organization is a + // virtual default that UserRepository.getTeams adds back whenever there are zero stored + // relationships, so a test that only checks for Organization can't tell whether the + // upsert wiped real team membership. With this real team, a wipe is observable. + org.openmetadata.service.jdbi3.TeamRepository teamRepository = + (org.openmetadata.service.jdbi3.TeamRepository) + Entity.getEntityRepository(Entity.TEAM); + org.openmetadata.schema.entity.teams.Team realTeam = + teamRepository.create( + null, + new org.openmetadata.schema.entity.teams.Team() + .withId(UUID.randomUUID()) + .withName("bot-strip-team-" + suffix) + .withTeamType(org.openmetadata.schema.api.teams.CreateTeam.TeamType.GROUP) + .withUpdatedBy("admin") + .withUpdatedAt(System.currentTimeMillis())); + + // Build the initial bot the same way the boot path does: via UserUtil.user(...). Both + // initial create and the simulated boot upsert below construct the in-memory User this + // way, so scalar fields (displayName, description) line up and the short-circuit guard + // can actually fire on the second pass. Override teams with the real team for the + // initial create only. + User initialBot = + UserUtil.user(botName, "open-metadata.org", botName) + .withIsBot(true) + .withTeams(new java.util.ArrayList<>(List.of(realTeam.getEntityReference()))); + User stored = userRepository.create(null, initialBot); + + // Sanity-check the bot really is in the real team. + User beforeBoot = + userRepository.getByName(null, botName, userRepository.getFields("teams")); + List teamsBefore = beforeBoot.getTeams(); + assertNotNull(teamsBefore, "Setup failed: bot should have at least one team"); + assertTrue( + teamsBefore.stream().anyMatch(t -> realTeam.getName().equals(t.getName())), + "Setup failed: bot should be in real team before upsert. Got: " + teamsBefore); + + // Simulate the bootstrap path: a fresh User object describing the same bot, without the + // teams field populated. This is exactly what UserUtil.addOrUpdateBotUser is called with + // on every OM boot — the in-memory User from configuration carries id/name/email and a + // few scalar fields, but never teams. + // Build the in-memory User the same way BotResource.initialize does on boot: + // UserUtil.user(name, domain, updatedBy).withIsBot(true). Note that the result does NOT + // have `teams` populated. + User boundary = UserUtil.user(botName, "open-metadata.org", botName).withIsBot(true); + + // The call under test. + UserUtil.addOrUpdateBotUser(boundary); + + // After the boot upsert the bot MUST still be a member of the Organization team. With the + // bug present, this fails with teams=null (or empty), exactly matching the production logs + // we observed (fieldsDeleted=[teams: oldValue=[Organization], newValue=null]). + User afterBoot = + userRepository.getByName(null, botName, userRepository.getFields("teams")); + List teamsAfter = afterBoot.getTeams(); + assertNotNull( + teamsAfter, + "Boot upsert stripped the bot's teams (teams field is null after addOrUpdateBotUser)"); + assertTrue( + teamsAfter.stream().anyMatch(t -> realTeam.getName().equals(t.getName())), + "Boot upsert removed the bot's real team membership. Got: " + teamsAfter); + } +} From 3aee2762620292225e69c89702aa3984a4cc0f4b Mon Sep 17 00:00:00 2001 From: Joao Amaral <7281460+joaopamaral@users.noreply.github.com> Date: Fri, 15 May 2026 09:05:59 -0300 Subject: [PATCH 7/7] fix(mcp): resolve semantic_search vector service via SearchRepository abstraction SemanticSearchTool hard-coded OpenSearchVectorService.getInstance(), which returns null when the active vector service is ElasticSearchVectorService (i.e. when the backend is Elasticsearch). Every MCP semantic_search call against an ES-backed deployment then failed with "Vector search service is not initialized" even though ElasticSearchVectorService was initialized at boot and the REST endpoint /api/v1/search/vector/query worked. Switch to Entity.getSearchRepository().getVectorIndexService(), the same abstraction VectorSearchResource already uses. The returned VectorIndexService impl is whichever was registered at boot (ElasticSearchVectorService for ES, OpenSearchVectorService for OS), and both implement the search(query, filters, size, k, threshold) signature this tool calls. Co-Authored-By: Claude Opus 4.7 (1M context) --- .../org/openmetadata/mcp/tools/SemanticSearchTool.java | 9 +++++++-- 1 file changed, 7 insertions(+), 2 deletions(-) diff --git a/openmetadata-mcp/src/main/java/org/openmetadata/mcp/tools/SemanticSearchTool.java b/openmetadata-mcp/src/main/java/org/openmetadata/mcp/tools/SemanticSearchTool.java index a243d483acad..6dd3b2c7ffef 100644 --- a/openmetadata-mcp/src/main/java/org/openmetadata/mcp/tools/SemanticSearchTool.java +++ b/openmetadata-mcp/src/main/java/org/openmetadata/mcp/tools/SemanticSearchTool.java @@ -10,7 +10,7 @@ import org.openmetadata.schema.utils.JsonUtils; import org.openmetadata.service.Entity; import org.openmetadata.service.limits.Limits; -import org.openmetadata.service.search.vector.OpenSearchVectorService; +import org.openmetadata.service.search.vector.VectorIndexService; import org.openmetadata.service.search.vector.utils.DTOs.VectorSearchResponse; import org.openmetadata.service.security.Authorizer; import org.openmetadata.service.security.auth.CatalogSecurityContext; @@ -42,7 +42,12 @@ public Map execute( "Semantic search is not enabled. Configure vector embeddings in the OpenMetadata server settings."); } - OpenSearchVectorService vectorService = OpenSearchVectorService.getInstance(); + // Resolve via the SearchRepository abstraction so this tool works on both backends. + // Direct OpenSearchVectorService.getInstance() returned null when the active vector + // service was ElasticSearchVectorService (ES backend), causing every MCP + // semantic_search call to fail with "Vector search service is not initialized" even + // though the REST endpoint /api/v1/search/vector/query worked. + VectorIndexService vectorService = Entity.getSearchRepository().getVectorIndexService(); if (vectorService == null) { return errorResponse("Vector search service is not initialized"); }