diff --git a/build.gradle b/build.gradle index 7d673dc145717..7ca3acc2721a4 100644 --- a/build.gradle +++ b/build.gradle @@ -35,6 +35,7 @@ project.ext.externalDependency = [ 'ebean': 'io.ebean:ebean:11.33.3', 'ebeanAgent': 'io.ebean:ebean-agent:11.27.1', 'elasticSearchRest': 'org.elasticsearch.client:elasticsearch-rest-high-level-client:5.6.8', + 'elasticSearchTransport': 'org.elasticsearch.client:transport:5.6.8', 'findbugsAnnotations': 'com.google.code.findbugs:annotations:3.0.1', 'guice': 'com.google.inject:guice:4.2.2', 'guava': 'com.google.guava:guava:27.0.1-jre', @@ -62,6 +63,7 @@ project.ext.externalDependency = [ 'parseqTest': 'com.linkedin.parseq:parseq:3.0.7:test', 'playDocs': 'com.typesafe.play:play-docs_2.11:2.6.18', 'playGuice': 'com.typesafe.play:play-guice_2.11:2.6.18', + 'playJavaJdbc': 'com.typesafe.play:play-java-jdbc_2.11:2.6.18', 'playTest': 'com.typesafe.play:play-test_2.11:2.6.18', 'postgresql': 'org.postgresql:postgresql:42.2.14', 'reflections': 'org.reflections:reflections:0.9.11', @@ -70,6 +72,7 @@ project.ext.externalDependency = [ 'springBeans': 'org.springframework:spring-beans:5.2.3.RELEASE', 'springContext': 'org.springframework:spring-context:5.2.3.RELEASE', 'springCore': 'org.springframework:spring-core:5.2.3.RELEASE', + 'springJdbc': 'org.springframework:spring-jdbc:5.2.3.RELEASE', 'springWeb': 'org.springframework:spring-web:5.2.3.RELEASE', 'springBootAutoconfigure': 'org.springframework.boot:spring-boot-autoconfigure:2.1.2.RELEASE', 'springBootStarterWeb': 'org.springframework.boot:spring-boot-starter-web:2.1.2.RELEASE', diff --git a/gms/impl/src/main/java/com/linkedin/metadata/resources/dataset/DownstreamLineageResource.java b/gms/impl/src/main/java/com/linkedin/metadata/resources/dataset/DownstreamLineageResource.java index a1bb2e95db66c..6c6e7cb5b38fe 100644 --- a/gms/impl/src/main/java/com/linkedin/metadata/resources/dataset/DownstreamLineageResource.java +++ 
b/gms/impl/src/main/java/com/linkedin/metadata/resources/dataset/DownstreamLineageResource.java @@ -9,7 +9,6 @@ import com.linkedin.dataset.UpstreamLineage; import com.linkedin.metadata.dao.BaseLocalDAO; import com.linkedin.metadata.dao.BaseQueryDAO; -import com.linkedin.metadata.dao.utils.SearchUtils; import com.linkedin.metadata.entity.DatasetEntity; import com.linkedin.metadata.query.CriterionArray; import com.linkedin.metadata.query.Filter; @@ -62,7 +61,7 @@ public DownstreamLineageResource() { @RestMethod.Get public Task get(@PathKeysParam @Nonnull PathKeys keys) { final DatasetUrn datasetUrn = getUrn(keys); - final Filter filter = SearchUtils.getFilter(Collections.singletonMap("upstreams", datasetUrn.toString())); + final Filter filter = newFilter(Collections.singletonMap("upstreams", datasetUrn.toString())); return RestliUtils.toTask(() -> { final List downstreamDatasets = _queryDAO @@ -97,4 +96,4 @@ private DatasetUrn getUrn(@PathKeysParam @Nonnull PathKeys keys) { DatasetKey key = keys.>get(DATASET_KEY).getKey(); return new DatasetUrn(key.getPlatform(), key.getName(), key.getOrigin()); } -} \ No newline at end of file +} diff --git a/metadata-builders/src/test/java/com/linkedin/metadata/builders/search/CorpGroupIndexBuilderTest.java b/metadata-builders/src/test/java/com/linkedin/metadata/builders/search/CorpGroupIndexBuilderTest.java index 475581d2cb977..cc4a1d8a06463 100644 --- a/metadata-builders/src/test/java/com/linkedin/metadata/builders/search/CorpGroupIndexBuilderTest.java +++ b/metadata-builders/src/test/java/com/linkedin/metadata/builders/search/CorpGroupIndexBuilderTest.java @@ -48,4 +48,4 @@ public void testGetDocumentsToUpdateFromCorpGroupSnapshot() { assertEquals(actualDocs.get(0).getGroups(), Collections.singletonList(groupName)); assertEquals(actualDocs.get(0).getEmail(), email); } -} \ No newline at end of file +} diff --git a/metadata-builders/src/test/java/com/linkedin/metadata/builders/search/CorpUserInfoIndexBuilderTest.java 
b/metadata-builders/src/test/java/com/linkedin/metadata/builders/search/CorpUserInfoIndexBuilderTest.java index 94c63f2842173..413aed2444fd8 100644 --- a/metadata-builders/src/test/java/com/linkedin/metadata/builders/search/CorpUserInfoIndexBuilderTest.java +++ b/metadata-builders/src/test/java/com/linkedin/metadata/builders/search/CorpUserInfoIndexBuilderTest.java @@ -49,4 +49,4 @@ public void testGetDocumentsToUpdateFromCorpUserSnapshot() { assertEquals(actualDocs.get(2).getSkills(), Collections.emptyList()); assertEquals(actualDocs.get(2).getTeams(), Arrays.asList("team1", "team2")); } -} \ No newline at end of file +} diff --git a/metadata-dao-impl/ebean-dao/src/main/java/com/linkedin/metadata/dao/EbeanLocalDAO.java b/metadata-dao-impl/ebean-dao/src/main/java/com/linkedin/metadata/dao/EbeanLocalDAO.java index 77cb52eaca1f1..9abcc0d5d506a 100644 --- a/metadata-dao-impl/ebean-dao/src/main/java/com/linkedin/metadata/dao/EbeanLocalDAO.java +++ b/metadata-dao-impl/ebean-dao/src/main/java/com/linkedin/metadata/dao/EbeanLocalDAO.java @@ -9,10 +9,15 @@ import com.linkedin.metadata.dao.producer.BaseMetadataEventProducer; import com.linkedin.metadata.dao.retention.TimeBasedRetention; import com.linkedin.metadata.dao.retention.VersionBasedRetention; +import com.linkedin.metadata.dao.storage.LocalDAOStorageConfig; import com.linkedin.metadata.dao.utils.ModelUtils; import com.linkedin.metadata.dao.utils.RecordUtils; import com.linkedin.metadata.query.ExtraInfo; import com.linkedin.metadata.query.ExtraInfoArray; +import com.linkedin.metadata.query.IndexCriterion; +import com.linkedin.metadata.query.IndexCriterionArray; +import com.linkedin.metadata.query.IndexFilter; +import com.linkedin.metadata.query.IndexValue; import com.linkedin.metadata.query.ListResultMetadata; import io.ebean.DuplicateKeyException; import io.ebean.EbeanServer; @@ -25,8 +30,10 @@ import java.net.URISyntaxException; import java.sql.Timestamp; import java.util.Collections; +import java.util.HashMap; 
import java.util.List; import java.util.Map; +import java.util.Objects; import java.util.Optional; import java.util.Set; import java.util.function.Function; @@ -35,8 +42,10 @@ import javax.annotation.Nonnull; import javax.annotation.Nullable; import javax.persistence.RollbackException; +import lombok.Value; import static com.linkedin.metadata.dao.EbeanMetadataAspect.*; +import static com.linkedin.metadata.dao.utils.RegisteredUrnPathExtractors.*; /** @@ -46,18 +55,52 @@ public class EbeanLocalDAO extends BaseLocalDAO { private static final String EBEAN_MODEL_PACKAGE = EbeanMetadataAspect.class.getPackage().getName(); + private static final String EBEAN_INDEX_PACKAGE = EbeanMetadataIndex.class.getPackage().getName(); protected final EbeanServer _server; + @Value + static class GMAIndexPair { + public String valueType; + public Object value; + } + + /** + * Constructor for EbeanLocalDAO + * + * @param aspectUnionClass containing union of all supported aspects. Must be a valid aspect union defined in com.linkedin.metadata.aspect + * @param producer {@link BaseMetadataEventProducer} for the metadata event producer + * @param serverConfig {@link ServerConfig} that defines the configuration of EbeanServer instances + */ public EbeanLocalDAO(@Nonnull Class aspectUnionClass, @Nonnull BaseMetadataEventProducer producer, @Nonnull ServerConfig serverConfig) { super(aspectUnionClass, producer); + _server = createServer(serverConfig); + } + + /** + * Constructor for EbeanLocalDAO + * + * @param producer {@link BaseMetadataEventProducer} for the metadata event producer + * @param serverConfig {@link ServerConfig} that defines the configuration of EbeanServer instances + * @param storageConfig {@link LocalDAOStorageConfig} containing storage config of full list of supported aspects + */ + public EbeanLocalDAO(@Nonnull BaseMetadataEventProducer producer, @Nonnull ServerConfig serverConfig, + @Nonnull LocalDAOStorageConfig storageConfig) { + super(producer, storageConfig); + _server 
= createServer(serverConfig); + } + @Nonnull + private EbeanServer createServer(@Nonnull ServerConfig serverConfig) { // Make sure that the serverConfig includes the package that contains DAO's Ebean model. if (!serverConfig.getPackages().contains(EBEAN_MODEL_PACKAGE)) { serverConfig.getPackages().add(EBEAN_MODEL_PACKAGE); } - _server = EbeanServerFactory.create(serverConfig); + if (!serverConfig.getPackages().contains(EBEAN_INDEX_PACKAGE)) { + serverConfig.getPackages().add(EBEAN_INDEX_PACKAGE); + } + return EbeanServerFactory.create(serverConfig); } // For testing purpose @@ -67,6 +110,12 @@ public EbeanLocalDAO(@Nonnull Class aspectUnionClass, @Nonnull Bas _server = server; } + // For testing purpose + EbeanLocalDAO(@Nonnull BaseMetadataEventProducer producer, @Nonnull EbeanServer server, @Nonnull LocalDAOStorageConfig storageConfig) { + super(producer, storageConfig); + _server = server; + } + /** * Return the {@link EbeanServer} server instance used for customized queries. */ @@ -155,6 +204,18 @@ protected long saveLatest(@Nonnull URN urn, @Non return largestVersion; } + @Override + protected void saveToLocalSecondaryIndex(@Nonnull URN urn, + @Nonnull ASPECT newValue, long version) { + + // Process and save URN + // Only do this with the first version of each aspect + if (version == FIRST_VERSION) { + processAndSaveUrnToLocalSecondaryIndex(urn); + } + processAndSaveAspectToLocalSecondaryIndex(urn, newValue); + } + @Override @Nullable protected AspectEntry getLatest(@Nonnull URN urn, @@ -192,6 +253,47 @@ protected void save(@Nonnull URN urn, @Nonnull RecordTemplate value, @Nonnull Au } } + protected long saveSingleRecordToLocalSecondaryIndex(@Nonnull URN urn, @Nonnull String aspect, + @Nonnull String path, @Nonnull Object value) { + + final EbeanMetadataIndex record = new EbeanMetadataIndex() + .setUrn(urn.toString()) + .setAspect(aspect) + .setPath(path); + if (value instanceof Integer || value instanceof Long) { + 
record.setLongVal(Long.valueOf(value.toString())); + } else if (value instanceof Float || value instanceof Double) { + record.setDoubleVal(Double.valueOf(value.toString())); + } else { + record.setStringVal(value.toString()); + } + + _server.insert(record); + return record.getId(); + } + + @Nonnull + Map, LocalDAOStorageConfig.AspectStorageConfig> getStrongConsistentIndexPaths() { + return Collections.unmodifiableMap(new HashMap<>(_storageConfig.getAspectStorageConfigMap())); + } + + protected void processAndSaveUrnToLocalSecondaryIndex(@Nonnull URN urn) { + if (existsInLocalSecondaryIndex(urn)) { + return; + } + + final Map pathValueMap = getUrnPathExtractor(urn.getClass()).extractPaths(urn); + pathValueMap.forEach( + (path, value) -> saveSingleRecordToLocalSecondaryIndex(urn, urn.getClass().getCanonicalName(), path, value) + ); + } + + // TODO: Will be implemented later + protected void processAndSaveAspectToLocalSecondaryIndex(@Nonnull URN urn, + @Nullable ASPECT newValue) { + + } + @Override protected long getNextVersion(@Nonnull URN urn, @Nonnull Class aspectClass) { @@ -248,6 +350,12 @@ protected void applyTimeBasedRetention(@Nonnull .map(record -> toRecordTemplate(key.getAspectClass(), record)))); } + public boolean existsInLocalSecondaryIndex(@Nonnull URN urn) { + return _server.find(EbeanMetadataIndex.class) + .where().eq(URN_COLUMN, urn.toString()) + .exists(); + } + @Nonnull private List batchGet(@Nonnull Set> keys) { @@ -379,8 +487,7 @@ public ListResult list(@Nonnull Class ASPECT toRecordTemplate(@Nonnull Class aspectClass, @Nonnull EbeanMetadataAspect aspect) { @@ -396,8 +513,8 @@ private static ASPECT toRecordTemplate(@Nonnull @Nonnull private ListResult toListResult(@Nonnull List values, @Nullable ListResultMetadata listResultMetadata, - @Nonnull PagedList pagedList, int start) { - final int nextStart = pagedList.hasNext() ? 
start + pagedList.getList().size() : ListResult.INVALID_NEXT_START; + @Nonnull PagedList pagedList, @Nullable Integer start) { + final int nextStart = (start != null && pagedList.hasNext()) ? start.intValue() + pagedList.getList().size() : ListResult.INVALID_NEXT_START; return ListResult.builder() // Format .values(values) @@ -460,4 +577,82 @@ public long newNumericId(@Nonnull String namespace, int maxTransactionRetry) { return id; }, maxTransactionRetry).getId(); } + + @Nonnull + static GMAIndexPair getGMAIndexPair(@Nonnull IndexValue indexValue) { + final Object object; + if (indexValue.isBoolean()) { + object = indexValue.getBoolean().toString(); + return new GMAIndexPair(EbeanMetadataIndex.STRING_COLUMN, object); + } else if (indexValue.isDouble()) { + object = indexValue.getDouble(); + return new GMAIndexPair(EbeanMetadataIndex.DOUBLE_COLUMN, object); + } else if (indexValue.isFloat()) { + object = indexValue.getFloat(); + return new GMAIndexPair(EbeanMetadataIndex.DOUBLE_COLUMN, object); + } else if (indexValue.isInt()) { + object = indexValue.getInt(); + return new GMAIndexPair(EbeanMetadataIndex.LONG_COLUMN, object); + } else if (indexValue.isLong()) { + object = indexValue.getLong(); + return new GMAIndexPair(EbeanMetadataIndex.LONG_COLUMN, object); + } else if (indexValue.isString()) { + object = indexValue.getString(); + return new GMAIndexPair(EbeanMetadataIndex.STRING_COLUMN, object); + } else { + throw new IllegalArgumentException("Invalid index value " + indexValue); + } + } + + /** + * Returns list of urns from strongly consistent secondary index that satisfy the given filter conditions. + * Results are sorted in increasing alphabetical order of urn. + * NOTE: Currently this works for only one filter condition + * TODO: Extend the support for multiple filter conditions + * + * @param indexFilter {@link IndexFilter} containing filter conditions to be applied + * @param lastUrn last urn of the previous fetched page. 
This eliminates the need to use offset which + * is known to slow down performance of MySQL queries. For the first page, this should be set as NULL + * @param pageSize maximum number of distinct urns to return + * @return {@link ListResult} of urns from strongly consistent secondary index that satisfy the given filter conditions + */ + @Override + @Nonnull + public ListResult listUrns(@Nonnull IndexFilter indexFilter, @Nullable URN lastUrn, int pageSize) { + if (!isLocalSecondaryIndexEnabled()) { + throw new UnsupportedOperationException("Local secondary index isn't supported by EbeanLocalDAO"); + } + final IndexCriterionArray indexCriterionArray = indexFilter.getCriteria(); + if (indexCriterionArray.size() == 0) { + throw new UnsupportedOperationException("Empty Index Filter is not supported by EbeanLocalDAO"); + } + if (indexCriterionArray.size() > 1) { + throw new UnsupportedOperationException("Currently only one filter condition is supported by EbeanLocalDAO"); + } + + final IndexCriterion criterion = indexCriterionArray.get(0); + ExpressionList expressionList = _server.find(EbeanMetadataIndex.class) + .setDistinct(true) + .select(EbeanMetadataIndex.URN_COLUMN) + .where() + .gt(EbeanMetadataIndex.URN_COLUMN, lastUrn == null ? 
"" : lastUrn.toString()) + .eq(EbeanMetadataIndex.ASPECT_COLUMN, criterion.getAspect()); + if (criterion.hasPathParams()) { + final GMAIndexPair gmaIndexPair = getGMAIndexPair(criterion.getPathParams().getValue()); + expressionList = expressionList + .eq(EbeanMetadataIndex.PATH_COLUMN, criterion.getPathParams().getPath()) + .eq(gmaIndexPair.valueType, gmaIndexPair.value); + } + final PagedList pagedList = expressionList + .orderBy() + .asc(EbeanMetadataIndex.URN_COLUMN) + .setMaxRows(pageSize) + .findPagedList(); + final List urns = pagedList.getList() + .stream() + .map(EbeanLocalDAO::extractUrn) + .filter(Objects::nonNull) + .collect(Collectors.toList()); + return toListResult(urns, null, pagedList, null); + } } diff --git a/metadata-dao-impl/ebean-dao/src/main/java/com/linkedin/metadata/dao/EbeanMetadataIndex.java b/metadata-dao-impl/ebean-dao/src/main/java/com/linkedin/metadata/dao/EbeanMetadataIndex.java index 0d81cbb509ea9..bb7100760408e 100644 --- a/metadata-dao-impl/ebean-dao/src/main/java/com/linkedin/metadata/dao/EbeanMetadataIndex.java +++ b/metadata-dao-impl/ebean-dao/src/main/java/com/linkedin/metadata/dao/EbeanMetadataIndex.java @@ -11,6 +11,7 @@ import lombok.Getter; import lombok.NonNull; import lombok.Setter; +import lombok.experimental.Accessors; @Getter @@ -35,6 +36,7 @@ EbeanMetadataIndex.URN_COLUMN }) @Entity +@Accessors(chain = true) @Table(name = "metadata_index") public class EbeanMetadataIndex extends Model { @@ -74,5 +76,4 @@ public class EbeanMetadataIndex extends Model { @Column(name = DOUBLE_COLUMN) protected Double doubleVal; - } \ No newline at end of file diff --git a/metadata-dao-impl/ebean-dao/src/main/java/com/linkedin/metadata/dao/utils/DatasetUrnPathExtractor.java b/metadata-dao-impl/ebean-dao/src/main/java/com/linkedin/metadata/dao/utils/DatasetUrnPathExtractor.java new file mode 100644 index 0000000000000..452c4e0bcb02e --- /dev/null +++ 
b/metadata-dao-impl/ebean-dao/src/main/java/com/linkedin/metadata/dao/utils/DatasetUrnPathExtractor.java @@ -0,0 +1,22 @@ +package com.linkedin.metadata.dao.utils; + +import com.linkedin.common.urn.DatasetUrn; +import java.util.Collections; +import java.util.HashMap; +import java.util.Map; +import javax.annotation.Nonnull; + + +public class DatasetUrnPathExtractor implements UrnPathExtractor { + @Override + public Map extractPaths(@Nonnull DatasetUrn urn) { + return Collections.unmodifiableMap(new HashMap() { + { + put("/platform", urn.getPlatformEntity().toString()); + put("/datasetName", urn.getDatasetNameEntity()); + put("/origin", urn.getOriginEntity().toString()); + put("/platform/platformName", urn.getPlatformEntity().getPlatformNameEntity()); + } + }); + } +} diff --git a/metadata-dao-impl/ebean-dao/src/main/java/com/linkedin/metadata/dao/utils/RegisteredUrnPathExtractors.java b/metadata-dao-impl/ebean-dao/src/main/java/com/linkedin/metadata/dao/utils/RegisteredUrnPathExtractors.java new file mode 100644 index 0000000000000..47293506e0f22 --- /dev/null +++ b/metadata-dao-impl/ebean-dao/src/main/java/com/linkedin/metadata/dao/utils/RegisteredUrnPathExtractors.java @@ -0,0 +1,39 @@ +package com.linkedin.metadata.dao.utils; + +import com.linkedin.common.urn.DatasetUrn; +import com.linkedin.common.urn.Urn; +import java.util.HashMap; +import java.util.Map; +import javax.annotation.Nonnull; + + +/** + * A class that holds all the registered {@link UrnPathExtractor}s. + * + * Register new type of urn path extractors by adding them to {@link #REGISTERED_URN_PATH_EXTRACTORS}. 
+ */ +public class RegisteredUrnPathExtractors { + + private static final Map, UrnPathExtractor> REGISTERED_URN_PATH_EXTRACTORS = new HashMap() { + { + put(DatasetUrn.class, new DatasetUrnPathExtractor()); + } + }; + + private RegisteredUrnPathExtractors() { + // Util class + } + + @Nonnull + public static UrnPathExtractor getUrnPathExtractor(@Nonnull Class clazz) { + return REGISTERED_URN_PATH_EXTRACTORS.get(clazz); + } + + // For testing purpose + public static void registerUrnPathExtractor(@Nonnull Class clazz, @Nonnull UrnPathExtractor extractor) { + if (REGISTERED_URN_PATH_EXTRACTORS.containsKey(clazz)) { + throw new RuntimeException("Path extractor for " + clazz.getCanonicalName() + " already registered!"); + } + REGISTERED_URN_PATH_EXTRACTORS.put(clazz, extractor); + } +} diff --git a/metadata-dao-impl/ebean-dao/src/main/java/com/linkedin/metadata/dao/utils/UrnPathExtractor.java b/metadata-dao-impl/ebean-dao/src/main/java/com/linkedin/metadata/dao/utils/UrnPathExtractor.java new file mode 100644 index 0000000000000..d743cdbed78ce --- /dev/null +++ b/metadata-dao-impl/ebean-dao/src/main/java/com/linkedin/metadata/dao/utils/UrnPathExtractor.java @@ -0,0 +1,11 @@ +package com.linkedin.metadata.dao.utils; + +import com.linkedin.common.urn.Urn; +import java.util.Map; +import javax.annotation.Nonnull; + + +public interface UrnPathExtractor { + @Nonnull + Map extractPaths(@Nonnull URN urn); +} diff --git a/metadata-dao-impl/ebean-dao/src/test/java/com/linkedin/metadata/dao/EbeanLocalDAOTest.java b/metadata-dao-impl/ebean-dao/src/test/java/com/linkedin/metadata/dao/EbeanLocalDAOTest.java index 0db54156d96aa..1f26fb6ebf774 100644 --- a/metadata-dao-impl/ebean-dao/src/test/java/com/linkedin/metadata/dao/EbeanLocalDAOTest.java +++ b/metadata-dao-impl/ebean-dao/src/test/java/com/linkedin/metadata/dao/EbeanLocalDAOTest.java @@ -1,5 +1,7 @@ package com.linkedin.metadata.dao; +import com.google.common.collect.ImmutableList; +import com.google.common.collect.ImmutableMap; 
import com.google.common.collect.ImmutableSet; import com.linkedin.common.AuditStamp; import com.linkedin.common.urn.CorpuserUrn; @@ -12,14 +14,26 @@ import com.linkedin.metadata.dao.producer.BaseMetadataEventProducer; import com.linkedin.metadata.dao.retention.TimeBasedRetention; import com.linkedin.metadata.dao.retention.VersionBasedRetention; +import com.linkedin.metadata.dao.storage.LocalDAOStorageConfig; +import com.linkedin.metadata.dao.utils.BarUrnPathExtractor; +import com.linkedin.metadata.dao.utils.BazUrnPathExtractor; +import com.linkedin.metadata.dao.utils.FooUrnPathExtractor; import com.linkedin.metadata.dao.utils.ModelUtils; import com.linkedin.metadata.dao.utils.RecordUtils; import com.linkedin.metadata.query.ExtraInfo; +import com.linkedin.metadata.query.IndexCriterion; +import com.linkedin.metadata.query.IndexCriterionArray; +import com.linkedin.metadata.query.IndexFilter; +import com.linkedin.metadata.query.IndexPathParams; +import com.linkedin.metadata.query.IndexValue; import com.linkedin.metadata.query.ListResultMetadata; import com.linkedin.testing.AspectBar; import com.linkedin.testing.AspectFoo; import com.linkedin.testing.AspectInvalid; import com.linkedin.testing.EntityAspectUnion; +import com.linkedin.testing.urn.BarUrn; +import com.linkedin.testing.urn.BazUrn; +import com.linkedin.testing.urn.FooUrn; import io.ebean.EbeanServer; import io.ebean.EbeanServerFactory; import io.ebean.Transaction; @@ -27,18 +41,21 @@ import java.time.Clock; import java.util.ArrayList; import java.util.Arrays; +import java.util.HashMap; import java.util.HashSet; import java.util.LinkedList; import java.util.List; import java.util.Map; import java.util.Optional; +import java.util.Set; import java.util.stream.Collectors; import javax.persistence.RollbackException; import org.mockito.InOrder; +import org.testng.annotations.BeforeClass; import org.testng.annotations.BeforeMethod; import org.testng.annotations.Test; -import static 
com.linkedin.metadata.dao.BaseReadDAO.*; +import static com.linkedin.metadata.dao.utils.RegisteredUrnPathExtractors.*; import static com.linkedin.metadata.utils.TestUtils.*; import static com.linkedin.testing.TestUtils.*; import static org.mockito.Mockito.*; @@ -51,6 +68,13 @@ public class EbeanLocalDAOTest { private BaseMetadataEventProducer _mockProducer; private AuditStamp _dummyAuditStamp; + @BeforeClass + public void setupClass() { + registerUrnPathExtractor(BarUrn.class, new BarUrnPathExtractor()); + registerUrnPathExtractor(BazUrn.class, new BazUrnPathExtractor()); + registerUrnPathExtractor(FooUrn.class, new FooUrnPathExtractor()); + } + @BeforeMethod public void setupTest() { _server = EbeanServerFactory.create(EbeanLocalDAO.createTestingH2ServerConfig()); @@ -364,6 +388,7 @@ public void testGetMultipleAspectsForMultipleUrns() { public void testBackfill() { EbeanLocalDAO dao = new EbeanLocalDAO(EntityAspectUnion.class, _mockProducer, _server); Urn urn = makeUrn(1); + AspectFoo expected = new AspectFoo().setValue("foo"); addMetadata(urn, AspectFoo.class.getCanonicalName(), 0, expected); @@ -375,6 +400,82 @@ public void testBackfill() { verifyNoMoreInteractions(_mockProducer); } + @Test + public void testLocalSecondaryIndexBackfill() { + EbeanLocalDAO dao = new EbeanLocalDAO(EntityAspectUnion.class, _mockProducer, _server); + + FooUrn urn = makeFooUrn(1); + AspectFoo expected = new AspectFoo().setValue("foo"); + addMetadata(urn, AspectFoo.class.getCanonicalName(), 0, expected); + + // Check if backfilled: _writeToLocalSecondary = false and _backfillLocalSecondaryIndex = false + dao.backfill(AspectFoo.class, urn); + assertEquals(getAllRecordsFromLocalSecondaryIndex(urn).size(), 0); + + // Check if backfilled: _writeToLocalSecondary = true and _backfillLocalSecondaryIndex = false + dao.enableLocalSecondaryIndex(true); + dao.backfill(AspectFoo.class, urn); + assertEquals(getAllRecordsFromLocalSecondaryIndex(urn).size(), 0); + + // Check if backfilled: 
_writeToLocalSecondary = true and _backfillLocalSecondaryIndex = true + dao.setBackfillLocalSecondaryIndex(true); + dao.backfill(AspectFoo.class, urn); + List fooRecords = getAllRecordsFromLocalSecondaryIndex(urn); + assertEquals(fooRecords.size(), 1); + EbeanMetadataIndex fooRecord = fooRecords.get(0); + assertEquals(fooRecord.getUrn(), urn.toString()); + assertEquals(fooRecord.getAspect(), FooUrn.class.getCanonicalName()); + assertEquals(fooRecord.getPath(), "/fooId"); + assertEquals(fooRecord.getLongVal().longValue(), 1L); + } + + @Test + public void testBatchBackfill() { + EbeanLocalDAO dao = new EbeanLocalDAO(EntityAspectUnion.class, _mockProducer, _server); + List urns = ImmutableList.of(makeUrn(1), makeUrn(2), makeUrn(3)); + + Map, RecordTemplate>> aspects = new HashMap<>(); + + urns.forEach(urn -> { + AspectFoo aspectFoo = new AspectFoo().setValue("foo"); + AspectBar aspectBar = new AspectBar().setValue("bar"); + aspects.put(urn, ImmutableMap.of(AspectFoo.class, aspectFoo, AspectBar.class, aspectBar)); + addMetadata(urn, AspectFoo.class.getCanonicalName(), 0, aspectFoo); + addMetadata(urn, AspectBar.class.getCanonicalName(), 0, aspectBar); + }); + + // Backfill single aspect for set of urns + Map> backfilledAspects1 = dao.backfill(AspectFoo.class, new HashSet<>(urns)); + for (Urn urn: urns) { + RecordTemplate aspect = aspects.get(urn).get(AspectFoo.class); + assertEquals(backfilledAspects1.get(urn).get(), aspect); + verify(_mockProducer, times(1)).produceMetadataAuditEvent(urn, aspect, aspect); + } + clearInvocations(_mockProducer); + + // Backfill set of aspects for a single urn + Map, Optional> backfilledAspects2 = + dao.backfill(ImmutableSet.of(AspectFoo.class, AspectBar.class), urns.get(0)); + for (Class clazz: aspects.get(urns.get(0)).keySet()) { + RecordTemplate aspect = aspects.get(urns.get(0)).get(clazz); + assertEquals(backfilledAspects2.get(clazz).get(), aspect); + verify(_mockProducer, times(1)).produceMetadataAuditEvent(urns.get(0), aspect, 
aspect); + } + clearInvocations(_mockProducer); + + // Backfill set of aspects for set of urns + Map, Optional>> backfilledAspects3 = + dao.backfill(ImmutableSet.of(AspectFoo.class, AspectBar.class), new HashSet<>(urns)); + for (Urn urn: urns) { + for (Class clazz: aspects.get(urn).keySet()) { + RecordTemplate aspect = aspects.get(urn).get(clazz); + assertEquals(backfilledAspects3.get(urn).get(clazz).get(), aspect); + verify(_mockProducer, times(1)).produceMetadataAuditEvent(urn, aspect, aspect); + } + } + verifyNoMoreInteractions(_mockProducer); + } + @Test public void testListVersions() { EbeanLocalDAO dao = new EbeanLocalDAO(EntityAspectUnion.class, _mockProducer, _server); @@ -502,6 +603,28 @@ public void testList() { assertNotNull(results.getMetadata()); } + @Test + void testStrongConsistentIndexPaths() { + // construct LocalDAOStorageConfig object + LocalDAOStorageConfig.PathStorageConfig pathStorageConfig = LocalDAOStorageConfig.PathStorageConfig.builder().strongConsistentSecondaryIndex(true).build(); + Map pathStorageConfigMap = new HashMap<>(); + pathStorageConfigMap.put("/value", pathStorageConfig); + LocalDAOStorageConfig.AspectStorageConfig aspectStorageConfig = + LocalDAOStorageConfig.AspectStorageConfig.builder().pathStorageConfigMap(pathStorageConfigMap).build(); + Map, LocalDAOStorageConfig.AspectStorageConfig> aspectStorageConfigMap = new HashMap<>(); + aspectStorageConfigMap.put(AspectFoo.class, aspectStorageConfig); + LocalDAOStorageConfig storageConfig = LocalDAOStorageConfig.builder().aspectStorageConfigMap(aspectStorageConfigMap).build(); + + EbeanLocalDAO dao = new EbeanLocalDAO(_mockProducer, _server, storageConfig); + Map, LocalDAOStorageConfig.AspectStorageConfig> aspectToPaths = dao.getStrongConsistentIndexPaths(); + + assertNotNull(aspectToPaths); + Set> setAspects = aspectToPaths.keySet(); + assertEquals(setAspects, new HashSet<>(Arrays.asList(AspectFoo.class))); + LocalDAOStorageConfig.AspectStorageConfig config = 
aspectToPaths.get(AspectFoo.class); + assertTrue(config.getPathStorageConfigMap().get("/value").isStrongConsistentSecondaryIndex()); + } + @Test public void testListAspectsForAllUrns() { EbeanLocalDAO dao = new EbeanLocalDAO(EntityAspectUnion.class, _mockProducer, _server); @@ -579,6 +702,232 @@ void testNewNumericId() { assertEquals(id3, 1); } + @Test + void testSaveSingleEntryToLocalSecondaryIndex() { + EbeanLocalDAO dao = new EbeanLocalDAO(EntityAspectUnion.class, _mockProducer, _server); + BarUrn urn = makeBarUrn(0); + + // Test indexing integer typed value + long recordId = dao.saveSingleRecordToLocalSecondaryIndex(urn, BarUrn.class.getCanonicalName(), "/intFoo", 0); + EbeanMetadataIndex record = getRecordFromLocalSecondaryIndex(recordId); + assertNotNull(record); + assertEquals(record.getUrn(), urn.toString()); + assertEquals(record.getAspect(), BarUrn.class.getCanonicalName()); + assertEquals(record.getPath(), "/intFoo"); + assertEquals(record.getLongVal().longValue(), 0L); + + // Test indexing long typed value + recordId = dao.saveSingleRecordToLocalSecondaryIndex(urn, BarUrn.class.getCanonicalName(), "/longFoo", 1L); + record = getRecordFromLocalSecondaryIndex(recordId); + assertNotNull(record); + assertEquals(record.getUrn(), urn.toString()); + assertEquals(record.getAspect(), BarUrn.class.getCanonicalName()); + assertEquals(record.getPath(), "/longFoo"); + assertEquals(record.getLongVal().longValue(), 1L); + + // Test indexing boolean typed value + recordId = dao.saveSingleRecordToLocalSecondaryIndex(urn, BarUrn.class.getCanonicalName(), "/boolFoo", true); + record = getRecordFromLocalSecondaryIndex(recordId); + assertNotNull(record); + assertEquals(record.getUrn(), urn.toString()); + assertEquals(record.getAspect(), BarUrn.class.getCanonicalName()); + assertEquals(record.getPath(), "/boolFoo"); + assertEquals(record.getStringVal(), "true"); + + // Test indexing float typed value + recordId = dao.saveSingleRecordToLocalSecondaryIndex(urn, 
BarUrn.class.getCanonicalName(), "/floatFoo", 12.34f); + record = getRecordFromLocalSecondaryIndex(recordId); + assertNotNull(record); + assertEquals(record.getUrn(), urn.toString()); + assertEquals(record.getAspect(), BarUrn.class.getCanonicalName()); + assertEquals(record.getPath(), "/floatFoo"); + assertEquals(record.getDoubleVal(), 12.34); + + // Test indexing double typed value + recordId = dao.saveSingleRecordToLocalSecondaryIndex(urn, BarUrn.class.getCanonicalName(), "/doubleFoo", 23.45); + record = getRecordFromLocalSecondaryIndex(recordId); + assertNotNull(record); + assertEquals(record.getUrn(), urn.toString()); + assertEquals(record.getAspect(), BarUrn.class.getCanonicalName()); + assertEquals(record.getPath(), "/doubleFoo"); + assertEquals(record.getDoubleVal(), 23.45); + + // Test indexing string typed value + recordId = dao.saveSingleRecordToLocalSecondaryIndex(urn, BarUrn.class.getCanonicalName(), "/stringFoo", "valFoo"); + record = getRecordFromLocalSecondaryIndex(recordId); + assertNotNull(record); + assertEquals(record.getUrn(), urn.toString()); + assertEquals(record.getAspect(), BarUrn.class.getCanonicalName()); + assertEquals(record.getPath(), "/stringFoo"); + assertEquals(record.getStringVal(), "valFoo"); + } + + @Test + void testExistsInLocalSecondaryIndex() { + EbeanLocalDAO dao = new EbeanLocalDAO(EntityAspectUnion.class, _mockProducer, _server); + BarUrn urn = makeBarUrn(0); + + assertFalse(dao.existsInLocalSecondaryIndex(urn)); + + dao.saveSingleRecordToLocalSecondaryIndex(urn, BarUrn.class.getCanonicalName(), "/barId", 0); + assertTrue(dao.existsInLocalSecondaryIndex(urn)); + } + + @Test + void testProcessAndSaveUrnToLocalSecondaryIndex() { + EbeanLocalDAO dao = new EbeanLocalDAO(EntityAspectUnion.class, _mockProducer, _server); + BarUrn barUrn = makeBarUrn(1); + BazUrn bazUrn = makeBazUrn(2); + + dao.processAndSaveUrnToLocalSecondaryIndex(barUrn); + dao.processAndSaveUrnToLocalSecondaryIndex(bazUrn); + + List barRecords = 
getAllRecordsFromLocalSecondaryIndex(barUrn); + assertEquals(barRecords.size(), 1); + EbeanMetadataIndex barRecord = barRecords.get(0); + assertEquals(barRecord.getUrn(), barUrn.toString()); + assertEquals(barRecord.getAspect(), BarUrn.class.getCanonicalName()); + assertEquals(barRecord.getPath(), "/barId"); + assertEquals(barRecord.getLongVal().longValue(), 1L); + + List bazRecords = getAllRecordsFromLocalSecondaryIndex(bazUrn); + assertEquals(bazRecords.size(), 1); + EbeanMetadataIndex bazRecord = bazRecords.get(0); + assertEquals(bazRecord.getUrn(), bazUrn.toString()); + assertEquals(bazRecord.getAspect(), BazUrn.class.getCanonicalName()); + assertEquals(bazRecord.getPath(), "/bazId"); + assertEquals(bazRecord.getLongVal().longValue(), 2L); + + // Test if new record is inserted with an existing urn + dao.processAndSaveUrnToLocalSecondaryIndex(barUrn); + assertEquals(getAllRecordsFromLocalSecondaryIndex(barUrn).size(), 1); + } + + @Test + void testSaveToLocalSecondaryIndex() { + EbeanLocalDAO dao = new EbeanLocalDAO(EntityAspectUnion.class, _mockProducer, _server); + BarUrn urn = makeBarUrn(1); + AspectFoo aspect = new AspectFoo(); + + dao.saveToLocalSecondaryIndex(urn, aspect, 0); + List barRecords = getAllRecordsFromLocalSecondaryIndex(urn); + assertEquals(barRecords.size(), 1); + EbeanMetadataIndex barRecord = barRecords.get(0); + assertEquals(barRecord.getUrn(), urn.toString()); + assertEquals(barRecord.getAspect(), BarUrn.class.getCanonicalName()); + assertEquals(barRecord.getPath(), "/barId"); + assertEquals(barRecord.getLongVal().longValue(), 1L); + + // Test if new record is inserted with an aspect version different than 0 + dao.saveToLocalSecondaryIndex(urn, aspect, 1); + assertEquals(getAllRecordsFromLocalSecondaryIndex(urn).size(), 1); + } + + @Test + void testGetGMAIndexPair() { + IndexValue indexValue = new IndexValue(); + // 1. 
IndexValue pair corresponds to boolean + indexValue.setBoolean(false); + EbeanLocalDAO.GMAIndexPair gmaIndexPair = EbeanLocalDAO.getGMAIndexPair(indexValue); + assertEquals(EbeanMetadataIndex.STRING_COLUMN, gmaIndexPair.valueType); + assertEquals("false", gmaIndexPair.value); + // 2. IndexValue pair corresponds to double + double dVal = 0.000001; + indexValue.setDouble(dVal); + gmaIndexPair = EbeanLocalDAO.getGMAIndexPair(indexValue); + assertEquals(EbeanMetadataIndex.DOUBLE_COLUMN, gmaIndexPair.valueType); + assertEquals(dVal, gmaIndexPair.value); + // 3. IndexValue pair corresponds to float + float fVal = 0.0001f; + indexValue.setFloat(fVal); + gmaIndexPair = EbeanLocalDAO.getGMAIndexPair(indexValue); + assertEquals(EbeanMetadataIndex.DOUBLE_COLUMN, gmaIndexPair.valueType); + assertEquals(fVal, gmaIndexPair.value); + // 4. IndexValue pair corresponds to int + int iVal = 100; + indexValue.setInt(iVal); + gmaIndexPair = EbeanLocalDAO.getGMAIndexPair(indexValue); + assertEquals(EbeanMetadataIndex.LONG_COLUMN, gmaIndexPair.valueType); + assertEquals(iVal, gmaIndexPair.value); + // 5. 
IndexValue pair corresponds to long + long lVal = 1L; + indexValue.setLong(lVal); + gmaIndexPair = EbeanLocalDAO.getGMAIndexPair(indexValue); + assertEquals(EbeanMetadataIndex.LONG_COLUMN, gmaIndexPair.valueType); + assertEquals(lVal, gmaIndexPair.value); + // 6/ IndexValue pair corresponds to string + String sVal = "testVal"; + indexValue.setString(sVal); + gmaIndexPair = EbeanLocalDAO.getGMAIndexPair(indexValue); + assertEquals(EbeanMetadataIndex.STRING_COLUMN, gmaIndexPair.valueType); + assertEquals(sVal, gmaIndexPair.value); + } + + @Test + void testListUrnsFromIndex() { + EbeanLocalDAO dao = new EbeanLocalDAO(EntityAspectUnion.class, _mockProducer, _server); + FooUrn urn1 = makeFooUrn(1); + FooUrn urn2 = makeFooUrn(2); + FooUrn urn3 = makeFooUrn(3); + addIndex(urn1, "aspect1", "/path1", "val1"); + addIndex(urn1, "aspect1", "/path2", "val2"); + addIndex(urn1, "aspect1", "/path3", "val3"); + addIndex(urn2, "aspect1", "/path1", "val1"); + addIndex(urn3, "aspect1", "/path1", "val1"); + + // 1. local secondary index is not enabled, should throw exception + IndexCriterion indexCriterion = new IndexCriterion().setAspect("aspect1"); + final IndexFilter indexFilter1 = new IndexFilter().setCriteria(new IndexCriterionArray(indexCriterion)); + dao.enableLocalSecondaryIndex(false); + + assertThrows(UnsupportedOperationException.class, () -> dao.listUrns(indexFilter1, null, 2)); + + // for the remaining tests, enable writes to local secondary index + dao.enableLocalSecondaryIndex(true); + + // 2. index criterion array is empty, should throw exception + final IndexFilter indexFilter2 = new IndexFilter().setCriteria(new IndexCriterionArray()); + + assertThrows(UnsupportedOperationException.class, () -> dao.listUrns(indexFilter2, null, 2)); + + // 3. 
index criterion array contains more than 1 criterion, should throw an exception + IndexCriterion indexCriterion1 = new IndexCriterion().setAspect("aspect1"); + IndexCriterion indexCriterion2 = new IndexCriterion().setAspect("aspect2"); + final IndexFilter indexFilter3 = new IndexFilter().setCriteria(new IndexCriterionArray(indexCriterion1, indexCriterion2)); + + assertThrows(UnsupportedOperationException.class, () -> dao.listUrns(indexFilter3, null, 2)); + + // 4. only aspect is provided in Index Filter + indexCriterion = new IndexCriterion().setAspect("aspect1"); + final IndexFilter indexFilter4 = new IndexFilter().setCriteria(new IndexCriterionArray(indexCriterion)); + + ListResult urns = dao.listUrns(indexFilter4, null, 2); + + assertEquals(urns.getValues(), Arrays.asList(urn1, urn2)); + assertEquals(urns.getTotalCount(), 3); + + // 5. aspect with path and value is provided in index filter + IndexValue indexValue = new IndexValue(); + indexValue.setString("val1"); + IndexPathParams indexPathParams = new IndexPathParams().setPath("/path1").setValue(indexValue); + indexCriterion = new IndexCriterion().setAspect("aspect1").setPathParams(indexPathParams); + final IndexFilter indexFilter5 = new IndexFilter().setCriteria(new IndexCriterionArray(indexCriterion)); + + urns = dao.listUrns(indexFilter5, urn1, 2); + + assertEquals(urns.getValues(), Arrays.asList(urn2, urn3)); + + // 6. 
aspect with correct path but incorrect value + indexValue.setString("valX"); + indexPathParams = new IndexPathParams().setPath("/path1").setValue(indexValue); + indexCriterion = new IndexCriterion().setAspect("aspect1").setPathParams(indexPathParams); + final IndexFilter indexFilter6 = new IndexFilter().setCriteria(new IndexCriterionArray(indexCriterion)); + + urns = dao.listUrns(indexFilter6, urn1, 2); + + assertEquals(urns.getTotalCount(), 0); + } + private void addMetadata(Urn urn, String aspectName, long version, RecordTemplate metadata) { EbeanMetadataAspect aspect = new EbeanMetadataAspect(); aspect.setKey(new EbeanMetadataAspect.PrimaryKey(urn.toString(), aspectName, version)); @@ -589,6 +938,25 @@ private void addMetadata(Urn urn, String aspectName, long version, RecordTemplat _server.save(aspect); } + private EbeanMetadataIndex getRecordFromLocalSecondaryIndex(long id) { + return _server.find(EbeanMetadataIndex.class, id); + } + + private List getAllRecordsFromLocalSecondaryIndex(URN urn) { + return _server.find(EbeanMetadataIndex.class).where() + .eq(EbeanMetadataIndex.URN_COLUMN, urn.toString()) + .findList(); + } + + private void addIndex(Urn urn, String aspectName, String pathName, String sVal) { + EbeanMetadataIndex index = new EbeanMetadataIndex(); + index.setUrn(urn.toString()) + .setAspect(aspectName) + .setPath(pathName) + .setStringVal(sVal); + _server.save(index); + } + private EbeanMetadataAspect getMetadata(Urn urn, String aspectName, long version) { return _server.find(EbeanMetadataAspect.class, new EbeanMetadataAspect.PrimaryKey(urn.toString(), aspectName, version)); diff --git a/metadata-dao-impl/ebean-dao/src/test/java/com/linkedin/metadata/dao/utils/BarUrnPathExtractor.java b/metadata-dao-impl/ebean-dao/src/test/java/com/linkedin/metadata/dao/utils/BarUrnPathExtractor.java new file mode 100644 index 0000000000000..23d5d12227631 --- /dev/null +++ 
b/metadata-dao-impl/ebean-dao/src/test/java/com/linkedin/metadata/dao/utils/BarUrnPathExtractor.java @@ -0,0 +1,19 @@ +package com.linkedin.metadata.dao.utils; + +import com.linkedin.testing.urn.BarUrn; +import java.util.Collections; +import java.util.HashMap; +import java.util.Map; +import javax.annotation.Nonnull; + + +public class BarUrnPathExtractor implements UrnPathExtractor { + @Override + public Map extractPaths(@Nonnull BarUrn urn) { + return Collections.unmodifiableMap(new HashMap() { + { + put("/barId", urn.getBarIdEntity()); + } + }); + } +} diff --git a/metadata-dao-impl/ebean-dao/src/test/java/com/linkedin/metadata/dao/utils/BazUrnPathExtractor.java b/metadata-dao-impl/ebean-dao/src/test/java/com/linkedin/metadata/dao/utils/BazUrnPathExtractor.java new file mode 100644 index 0000000000000..cd61a7a967ea2 --- /dev/null +++ b/metadata-dao-impl/ebean-dao/src/test/java/com/linkedin/metadata/dao/utils/BazUrnPathExtractor.java @@ -0,0 +1,19 @@ +package com.linkedin.metadata.dao.utils; + +import com.linkedin.testing.urn.BazUrn; +import java.util.Collections; +import java.util.HashMap; +import java.util.Map; +import javax.annotation.Nonnull; + + +public class BazUrnPathExtractor implements UrnPathExtractor { + @Override + public Map extractPaths(@Nonnull BazUrn urn) { + return Collections.unmodifiableMap(new HashMap() { + { + put("/bazId", urn.getBazIdEntity()); + } + }); + } +} diff --git a/metadata-dao-impl/ebean-dao/src/test/java/com/linkedin/metadata/dao/utils/FooUrnPathExtractor.java b/metadata-dao-impl/ebean-dao/src/test/java/com/linkedin/metadata/dao/utils/FooUrnPathExtractor.java new file mode 100644 index 0000000000000..7478798e658f2 --- /dev/null +++ b/metadata-dao-impl/ebean-dao/src/test/java/com/linkedin/metadata/dao/utils/FooUrnPathExtractor.java @@ -0,0 +1,19 @@ +package com.linkedin.metadata.dao.utils; + +import com.linkedin.testing.urn.FooUrn; +import java.util.Collections; +import java.util.HashMap; +import java.util.Map; +import 
javax.annotation.Nonnull; + + +public class FooUrnPathExtractor implements UrnPathExtractor { + @Override + public Map extractPaths(@Nonnull FooUrn urn) { + return Collections.unmodifiableMap(new HashMap() { + { + put("/fooId", urn.getFooIdEntity()); + } + }); + } +} diff --git a/metadata-dao-impl/elasticsearch-dao/build.gradle b/metadata-dao-impl/elasticsearch-dao/build.gradle index 599a3f885fb92..24c06f5765dea 100644 --- a/metadata-dao-impl/elasticsearch-dao/build.gradle +++ b/metadata-dao-impl/elasticsearch-dao/build.gradle @@ -5,6 +5,7 @@ dependencies { compile project(':metadata-models') compile externalDependency.elasticSearchRest + compile externalDependency.elasticSearchTransport compile externalDependency.guava compile externalDependency.lombok compile externalDependency.commonsIo @@ -17,4 +18,4 @@ dependencies { testCompile project(':metadata-testing:metadata-test-utils') testCompile project(':metadata-testing:metadata-test-models') testCompile externalDependency.mockito -} \ No newline at end of file +} diff --git a/metadata-dao-impl/elasticsearch-dao/src/main/java/com/linkedin/metadata/dao/search/ESBulkWriterDAO.java b/metadata-dao-impl/elasticsearch-dao/src/main/java/com/linkedin/metadata/dao/search/ESBulkWriterDAO.java new file mode 100644 index 0000000000000..f6f29c518cf69 --- /dev/null +++ b/metadata-dao-impl/elasticsearch-dao/src/main/java/com/linkedin/metadata/dao/search/ESBulkWriterDAO.java @@ -0,0 +1,75 @@ +package com.linkedin.metadata.dao.search; + +import com.linkedin.data.template.RecordTemplate; +import com.linkedin.metadata.dao.BaseSearchWriterDAO; +import com.linkedin.metadata.dao.utils.RecordUtils; +import javax.annotation.Nonnull; +import org.elasticsearch.action.bulk.BulkProcessor; +import org.elasticsearch.action.delete.DeleteRequest; +import org.elasticsearch.action.index.IndexRequest; +import org.elasticsearch.action.update.UpdateRequest; +import org.elasticsearch.common.xcontent.XContentType; + + +/** + * A {@link 
BaseSearchWriterDAO} that uses ElasticSearch's bulk update API. + */ +public final class ESBulkWriterDAO extends BaseSearchWriterDAO { + private static final String DEFAULT_DOCUMENT_TYPE = "doc"; + private static final int MAX_RETRIES = 3; + + private final BulkProcessor _bulkProcessor; + private final String _indexName; + private final String _documentType; + + /** + * Constructor. + * + * @param documentClass schema of the class to index + * @param bulkProcessor the bulk process to use to write to ES + * @param indexName the name of the index to write updates to + */ + public ESBulkWriterDAO(@Nonnull Class documentClass, @Nonnull BulkProcessor bulkProcessor, + @Nonnull String indexName) { + this(documentClass, bulkProcessor, indexName, DEFAULT_DOCUMENT_TYPE); + } + + /** + * Constructor. + * + * @param documentClass schema of the class to index + * @param bulkProcessor the bulk process to use to write to ES + * @param indexName the name of the index to write updates to + * @param documentType the type of document + */ + public ESBulkWriterDAO(@Nonnull Class documentClass, @Nonnull BulkProcessor bulkProcessor, + @Nonnull String indexName, @Nonnull String documentType) { + super(documentClass); + _bulkProcessor = bulkProcessor; + _indexName = indexName; + _documentType = documentType; + } + + @Override + public void upsertDocument(@Nonnull DOCUMENT document, @Nonnull String docId) { + final String documentJson = RecordUtils.toJsonString(document); + final IndexRequest indexRequest = + new IndexRequest(_indexName, _documentType, docId).source(documentJson, XContentType.JSON); + final UpdateRequest updateRequest = + new UpdateRequest(_indexName, _documentType, docId).doc(documentJson, XContentType.JSON) + .detectNoop(false) + .upsert(indexRequest) + .retryOnConflict(MAX_RETRIES); + _bulkProcessor.add(updateRequest); + } + + @Override + public void deleteDocument(@Nonnull String docId) { + _bulkProcessor.add(new DeleteRequest(_indexName, _documentType, docId)); + } + 
+ @Override + public void close() { + _bulkProcessor.close(); + } +} diff --git a/metadata-dao-impl/elasticsearch-dao/src/main/java/com/linkedin/metadata/dao/utils/SearchUtils.java b/metadata-dao-impl/elasticsearch-dao/src/main/java/com/linkedin/metadata/dao/utils/SearchUtils.java index 44ba87296d68f..40f734b8dcfea 100644 --- a/metadata-dao-impl/elasticsearch-dao/src/main/java/com/linkedin/metadata/dao/utils/SearchUtils.java +++ b/metadata-dao-impl/elasticsearch-dao/src/main/java/com/linkedin/metadata/dao/utils/SearchUtils.java @@ -2,12 +2,10 @@ import com.linkedin.metadata.query.Condition; import com.linkedin.metadata.query.Criterion; -import com.linkedin.metadata.query.CriterionArray; import com.linkedin.metadata.query.Filter; import java.io.IOException; import java.io.InputStream; import java.util.Collections; -import java.util.List; import java.util.Map; import java.util.stream.Collectors; import javax.annotation.Nonnull; @@ -33,16 +31,15 @@ private SearchUtils() { */ @Nonnull public static Map getRequestMap(@Nullable Filter requestParams) { - if (requestParams == null) { return Collections.emptyMap(); } - if (requestParams.getCriteria() - .stream() - .anyMatch(criterion -> criterion.getCondition() != com.linkedin.metadata.query.Condition.EQUAL)) { - throw new IllegalArgumentException("Invalid List criteria - condition must be EQUAL: "); - } + requestParams.getCriteria().forEach(criterion -> { + if (criterion.getCondition() != com.linkedin.metadata.query.Condition.EQUAL) { + throw new UnsupportedOperationException("Unsupported condition: " + criterion.getCondition()); + } + }); return requestParams.getCriteria().stream().collect(Collectors.toMap(Criterion::getField, Criterion::getValue)); } @@ -68,21 +65,7 @@ public static QueryBuilder getQueryBuilderFromCriterion(@Nonnull Criterion crite return QueryBuilders.rangeQuery(criterion.getField()).lte(criterion.getValue().trim()); } - throw new IllegalArgumentException("Unsupported condition: " + condition); - } - - 
/** - * Converts a requestMap to a filter - * - * @param requestMap a map of fields and values - * @return the search filter - */ - @Nonnull - public static Filter getFilter(@Nonnull Map requestMap) { - List criterionList = requestMap.entrySet().stream() - .map(entry -> new Criterion().setField(entry.getKey()).setValue(entry.getValue())) - .collect(Collectors.toList()); - return new Filter().setCriteria(new CriterionArray(criterionList)); + throw new UnsupportedOperationException("Unsupported condition: " + condition); } @Nonnull diff --git a/metadata-dao-impl/elasticsearch-dao/src/test/java/com/linkedin/metadata/dao/search/ESSearchDAOTest.java b/metadata-dao-impl/elasticsearch-dao/src/test/java/com/linkedin/metadata/dao/search/ESSearchDAOTest.java index c55ba59d022ec..3ec6f2d2c8e9e 100644 --- a/metadata-dao-impl/elasticsearch-dao/src/test/java/com/linkedin/metadata/dao/search/ESSearchDAOTest.java +++ b/metadata-dao-impl/elasticsearch-dao/src/test/java/com/linkedin/metadata/dao/search/ESSearchDAOTest.java @@ -186,14 +186,14 @@ public void testFilteredQueryWithRangeFilter() throws IOException { } @Test - public void testFilteredQueryUnsupportedCondition() throws IOException { + public void testFilteredQueryUnsupportedCondition() { int from = 0; int size = 10; final Filter filter2 = new Filter().setCriteria(new CriterionArray(Arrays.asList( new Criterion().setField("field_contain").setValue("value_contain").setCondition(Condition.CONTAIN) ))); SortCriterion sortCriterion = new SortCriterion().setOrder(SortOrder.ASCENDING).setField("urn"); - assertThrows(IllegalArgumentException.class, () -> _searchDAO.getFilteredSearchQuery(filter2, sortCriterion, from, size)); + assertThrows(UnsupportedOperationException.class, () -> _searchDAO.getFilteredSearchQuery(filter2, sortCriterion, from, size)); } private static SearchHit makeSearchHit(int id) { diff --git a/metadata-dao-impl/elasticsearch-dao/src/test/java/com/linkedin/metadata/dao/utils/SearchUtilsTest.java 
b/metadata-dao-impl/elasticsearch-dao/src/test/java/com/linkedin/metadata/dao/utils/SearchUtilsTest.java index a89f3009a39eb..79f1788dc48b3 100644 --- a/metadata-dao-impl/elasticsearch-dao/src/test/java/com/linkedin/metadata/dao/utils/SearchUtilsTest.java +++ b/metadata-dao-impl/elasticsearch-dao/src/test/java/com/linkedin/metadata/dao/utils/SearchUtilsTest.java @@ -1,19 +1,26 @@ package com.linkedin.metadata.dao.utils; +import com.linkedin.metadata.query.Condition; +import com.linkedin.metadata.query.Criterion; +import com.linkedin.metadata.query.CriterionArray; import com.linkedin.metadata.query.Filter; import java.util.Collections; import java.util.HashMap; import java.util.Map; import org.testng.annotations.Test; +import static com.linkedin.metadata.dao.utils.SearchUtils.*; import static org.testng.Assert.*; public class SearchUtilsTest { @Test public void testGetRequestMap() { + // Empty filter final Filter filter1 = QueryUtils.newFilter(null); - final Map actual1 = SearchUtils.getRequestMap(filter1); + final Map actual1 = getRequestMap(filter1); assertTrue(actual1.isEmpty()); + + // Filter with criteria with default condition final Map requestParams = Collections.unmodifiableMap(new HashMap() { { put("key1", "value1"); @@ -21,8 +28,13 @@ public void testGetRequestMap() { } }); final Filter filter2 = QueryUtils.newFilter(requestParams); - final Map actual2 = SearchUtils.getRequestMap(filter2); + final Map actual2 = getRequestMap(filter2); assertEquals(actual2, requestParams); - } + // Filter with unsupported condition + final Filter filter3 = new Filter().setCriteria(new CriterionArray( + new Criterion().setField("key").setValue("value").setCondition(Condition.CONTAIN) + )); + assertThrows(UnsupportedOperationException.class, () -> getRequestMap(filter3)); + } } diff --git a/metadata-dao-impl/kafka-producer/src/main/java/com/linkedin/metadata/dao/producer/KafkaMetadataEventProducer.java 
b/metadata-dao-impl/kafka-producer/src/main/java/com/linkedin/metadata/dao/producer/KafkaMetadataEventProducer.java index 852ee988073ca..e5488adb98cf0 100644 --- a/metadata-dao-impl/kafka-producer/src/main/java/com/linkedin/metadata/dao/producer/KafkaMetadataEventProducer.java +++ b/metadata-dao-impl/kafka-producer/src/main/java/com/linkedin/metadata/dao/producer/KafkaMetadataEventProducer.java @@ -20,6 +20,7 @@ import java.util.Optional; import javax.annotation.Nonnull; import javax.annotation.Nullable; +import lombok.extern.slf4j.Slf4j; import org.apache.avro.generic.GenericRecord; import org.apache.avro.generic.IndexedRecord; import org.apache.avro.specific.SpecificRecord; @@ -31,6 +32,7 @@ /** * A Kafka implementation of {@link BaseMetadataEventProducer}. */ +@Slf4j public class KafkaMetadataEventProducer extends BaseMetadataEventProducer { @@ -114,14 +116,17 @@ record = EventUtils.pegasusToAvroMAE(metadataAuditEvent); @Override public void produceAspectSpecificMetadataAuditEvent(@Nonnull URN urn, @Nullable ASPECT oldValue, @Nonnull ASPECT newValue) { - - validateAspectSpecificTopic(ModelUtils.getAspectSpecificMAETopicName(urn, newValue)); + final String topicKey = ModelUtils.getAspectSpecificMAETopicName(urn, newValue); + if (!isValidateAspectSpecificTopic(topicKey)) { + log.error("The aspect specific topic {} is not registered.", topicKey); + return; + } String topic; Class maeAvroClass; RecordTemplate metadataAuditEvent; try { - topic = (String) Topics.class.getField(ModelUtils.getAspectSpecificMAETopicName(urn, newValue)).get(null); + topic = (String) Topics.class.getField(topicKey).get(null); maeAvroClass = Configs.TOPIC_SCHEMA_CLASS_MAP.get(topic); metadataAuditEvent = (RecordTemplate) EventUtils.getPegasusClass(maeAvroClass).newInstance(); @@ -159,9 +164,7 @@ private Snapshot makeSnapshot(@Nonnull URN urn, @Nonnull RecordTemplate value) { return snapshot; } - static void validateAspectSpecificTopic(@Nonnull String topic) { - if 
(!Arrays.stream(Topics.class.getFields()).anyMatch(field -> field.getName().equals(topic))) { - throw new IllegalArgumentException(String.format("The aspect specific topic %s is not registered.", topic)); - } + static boolean isValidateAspectSpecificTopic(@Nonnull String topic) { + return Arrays.stream(Topics.class.getFields()).anyMatch(field -> field.getName().equals(topic)); } } \ No newline at end of file diff --git a/metadata-dao/src/main/java/com/linkedin/metadata/dao/BaseLocalDAO.java b/metadata-dao/src/main/java/com/linkedin/metadata/dao/BaseLocalDAO.java index 6613ad9bdc4cd..1e08e387d0812 100644 --- a/metadata-dao/src/main/java/com/linkedin/metadata/dao/BaseLocalDAO.java +++ b/metadata-dao/src/main/java/com/linkedin/metadata/dao/BaseLocalDAO.java @@ -18,13 +18,16 @@ import com.linkedin.metadata.dao.retention.Retention; import com.linkedin.metadata.dao.retention.TimeBasedRetention; import com.linkedin.metadata.dao.retention.VersionBasedRetention; +import com.linkedin.metadata.dao.storage.LocalDAOStorageConfig; import com.linkedin.metadata.query.ExtraInfo; +import com.linkedin.metadata.query.IndexFilter; import java.time.Clock; import java.util.HashMap; import java.util.LinkedList; import java.util.List; import java.util.Map; import java.util.Optional; +import java.util.Set; import java.util.UUID; import java.util.function.BiConsumer; import java.util.function.Function; @@ -64,6 +67,7 @@ static class AddResult { private static final int DEFAULT_MAX_TRANSACTION_RETRY = 3; protected final BaseMetadataEventProducer _producer; + protected final LocalDAOStorageConfig _storageConfig; // Maps an aspect class to the corresponding retention policy private final Map, Retention> _aspectRetentionMap = new HashMap<>(); @@ -81,11 +85,39 @@ static class AddResult { // Always emit MAE on every update regardless if there's any actual change in value private boolean _alwaysEmitAuditEvent = false; + // Opt in to emit Aspect Specific MAE, at initial migration stage, always emit 
the event + private boolean _emitAspectSpecificAuditEvent = false; + + // Flag for enabling reads and writes to local secondary index + private boolean _enableLocalSecondaryIndex = false; + + // Flag for backfilling local secondary index + private boolean _backfillLocalSecondaryIndex = false; + private Clock _clock = Clock.systemUTC(); + /** + * Constructor for BaseLocalDAO + * + * @param aspectUnionClass containing union of all supported aspects. Must be a valid aspect union defined in com.linkedin.metadata.aspect + * @param producer {@link BaseMetadataEventProducer} for the metadata event producer + */ public BaseLocalDAO(@Nonnull Class aspectUnionClass, @Nonnull BaseMetadataEventProducer producer) { super(aspectUnionClass); _producer = producer; + _storageConfig = LocalDAOStorageConfig.builder().build(); + } + + /** + * Constructor for BaseLocalDAO + * + * @param producer {@link BaseMetadataEventProducer} for the metadata event producer + * @param storageConfig {@link LocalDAOStorageConfig} containing storage config of full list of supported aspects + */ + public BaseLocalDAO(@Nonnull BaseMetadataEventProducer producer, @Nonnull LocalDAOStorageConfig storageConfig) { + super(storageConfig.getAspectStorageConfigMap().keySet()); + _producer = producer; + _storageConfig = storageConfig; } /** @@ -170,6 +202,42 @@ public void setAlwaysEmitAuditEvent(boolean alwaysEmitAuditEvent) { _alwaysEmitAuditEvent = alwaysEmitAuditEvent; } + /** + * Sets if aspect specific MAE should be always emitted after each update even if there's no actual value change. 
+ */ + public void setEmitAspectSpecificAuditEvent(boolean emitAspectSpecificAuditEvent) { + _emitAspectSpecificAuditEvent = emitAspectSpecificAuditEvent; + } + + /** + * Sets if writes to local secondary index enabled + * @deprecated Use {@link #enableLocalSecondaryIndex(boolean)} instead + */ + public void setWriteToLocalSecondaryIndex(boolean writeToLocalSecondaryIndex) { + _enableLocalSecondaryIndex = writeToLocalSecondaryIndex; + } + + /** + * Enables reads from and writes to local secondary index + */ + public void enableLocalSecondaryIndex(boolean enableLocalSecondaryIndex) { + _enableLocalSecondaryIndex = enableLocalSecondaryIndex; + } + + /** + * Gets if reads and writes to local secondary index are enabled + */ + public boolean isLocalSecondaryIndexEnabled() { + return _enableLocalSecondaryIndex; + } + + /** + * Sets if local secondary index backfilling is enabled + */ + public void setBackfillLocalSecondaryIndex(boolean backfillLocalSecondaryIndex) { + _backfillLocalSecondaryIndex = backfillLocalSecondaryIndex; + } + /** * Adds a new version of aspect for an entity. * @@ -214,18 +282,29 @@ public ASPECT add(@Nonnull URN urn, @Nonnull Cla // 4. Apply retention policy applyRetention(urn, aspectClass, getRetention(aspectClass), largestVersion); + // 5. Save to local secondary index + if (_enableLocalSecondaryIndex) { + saveToLocalSecondaryIndex(urn, newValue, largestVersion); + } + return new AddResult<>(oldValue, newValue); }, maxTransactionRetry); final ASPECT oldValue = result.getOldValue(); final ASPECT newValue = result.getNewValue(); - // 5. Produce MAE after a successful update + // 6. Produce MAE after a successful update if (_alwaysEmitAuditEvent || oldValue != newValue) { _producer.produceMetadataAuditEvent(urn, oldValue, newValue); } - // 6. Invoke post-update hooks if there's any + // TODO: Replace step 6 with step 6.1 with diff option after pipeline is fully migrated to aspect specific events. + // 6.1. 
Produce aspect specific MAE after a successful update + if (_emitAspectSpecificAuditEvent) { + _producer.produceAspectSpecificMetadataAuditEvent(urn, oldValue, newValue); + } + + // 7. Invoke post-update hooks if there's any if (_aspectPostUpdateHooksMap.containsKey(aspectClass)) { _aspectPostUpdateHooksMap.get(aspectClass).forEach(hook -> hook.accept(urn, newValue)); } @@ -283,6 +362,27 @@ protected abstract long saveLatest(@Nonnull URN @Nonnull Class aspectClass, @Nullable ASPECT oldEntry, @Nullable AuditStamp oldAuditStamp, @Nonnull ASPECT newEntry, @Nonnull AuditStamp newAuditStamp); + /** + * Saves the new value of an aspect to local secondary index + * + * @param urn the URN for the entity the aspect is attached to + * @param newValue {@link RecordTemplate} of the new value of aspect + * @param version version of the aspect + */ + protected abstract void saveToLocalSecondaryIndex(@Nonnull URN urn, + @Nullable ASPECT newValue, long version); + + /** + * Returns list of urns from local secondary index that satisfy the given filter conditions. + * + * @param indexFilter {@link IndexFilter} containing filter conditions to be applied + * @param lastUrn last urn of the previous fetched page. For the first page, this should be set as NULL + * @param pageSize maximum number of distinct urns to return + * @return {@link ListResult} of urns from local secondary index that satisfy the given filter conditions + */ + @Nonnull + public abstract ListResult listUrns(@Nonnull IndexFilter indexFilter, @Nullable URN lastUrn, int pageSize); + /** * Runs the given lambda expression in a transaction with a limited number of retries. * @@ -350,7 +450,8 @@ protected abstract void applyTimeBasedRetention( @Nonnull URN urn, @Nonnull TimeBasedRetention retention, long currentTime); /** - * Emits backfill MetadataAuditEvent for the latest version of an aspect for an entity. 
+ * Emits backfill MAE for the latest version of an aspect of an entity and also backfills local + * secondary index if writes & backfill enabled * * @param aspectClass the type of aspect to backfill * @param urn {@link Urn} for the entity @@ -362,10 +463,63 @@ public Optional backfill(@Nonnull Class< @Nonnull URN urn) { checkValidAspect(aspectClass); Optional aspect = get(aspectClass, urn, LATEST_VERSION); - aspect.ifPresent(value -> _producer.produceMetadataAuditEvent(urn, value, value)); + aspect.ifPresent(value -> backfill(value, urn)); return aspect; } + /** + * Similar to {@link #backfill(Class, URN)} but gets a set of aspect classes and do a batch backfill + */ + @Nonnull + public Map, Optional> backfill( + @Nonnull Set> aspectClasses, @Nonnull URN urn) { + checkValidAspects(aspectClasses); + Map, Optional> aspects = get(aspectClasses, urn); + aspects.forEach((aspectClass, aspect) -> aspect.ifPresent(value -> backfill(value, urn))); + return aspects; + } + + /** + * Similar to {@link #backfill(Class, URN)} but gets a set of urns and do a batch backfill + */ + @Nonnull + public Map> backfill(@Nonnull Class aspectClass, + @Nonnull Set urns) { + checkValidAspect(aspectClass); + final Map> urnToAspects = get(aspectClass, urns); + urnToAspects.forEach((urn, aspect) -> aspect.ifPresent(value -> backfill(value, urn))); + return urnToAspects; + } + + /** + * Similar to {@link #backfill(Class, URN)} but gets a set of aspect classes and a set of URNs and do a batch backfill + */ + @Nonnull + public Map, Optional>> backfill( + @Nonnull Set> aspectClasses, @Nonnull Set urns) { + checkValidAspects(aspectClasses); + final Map, Optional>> urnToAspects = get(aspectClasses, urns); + urnToAspects.forEach((urn, aspects) -> { + aspects.forEach((aspectClass, aspect) -> aspect.ifPresent(value -> backfill(value, urn))); + }); + return urnToAspects; + } + + /** + * Emits backfill MAE for an aspect of an entity and also backfills local secondary index if writes & backfill enabled + 
* + * @param aspect aspect to backfill + * @param urn {@link Urn} for the entity + * @param must be a supported aspect type in {@code ASPECT_UNION}. + */ + private void backfill(@Nonnull ASPECT aspect, @Nonnull URN urn) { + // Backfill local secondary index as well if writes & backfill enabled + if (_enableLocalSecondaryIndex && _backfillLocalSecondaryIndex) { + saveToLocalSecondaryIndex(urn, aspect, FIRST_VERSION); + } + _producer.produceMetadataAuditEvent(urn, aspect, aspect); + } + /** * Paginates over all available versions of an aspect for an entity. * diff --git a/metadata-dao/src/main/java/com/linkedin/metadata/dao/BaseReadDAO.java b/metadata-dao/src/main/java/com/linkedin/metadata/dao/BaseReadDAO.java index 764af772e3b4b..f89913d7c9146 100644 --- a/metadata-dao/src/main/java/com/linkedin/metadata/dao/BaseReadDAO.java +++ b/metadata-dao/src/main/java/com/linkedin/metadata/dao/BaseReadDAO.java @@ -18,6 +18,7 @@ public abstract class BaseReadDAO { + public static final long FIRST_VERSION = 0; public static final long LATEST_VERSION = 0; // A set of pre-computed valid metadata types @@ -29,6 +30,10 @@ public BaseReadDAO(@Nonnull Class aspectUnionClass) { _validMetadataAspects = ModelUtils.getValidAspectTypes(aspectUnionClass); } + public BaseReadDAO(@Nonnull Set> aspects) { + _validMetadataAspects = aspects; + } + /** * Batch retrieves metadata aspects using multiple {@link AspectKey}s. 
* @@ -124,4 +129,8 @@ protected void checkValidAspect(@Nonnull Class aspectC throw new InvalidMetadataType(aspectClass + " is not a supported metadata aspect type"); } } + + protected void checkValidAspects(@Nonnull Set> aspectClasses) { + aspectClasses.forEach(aspectClass -> checkValidAspect(aspectClass)); + } } diff --git a/metadata-dao/src/main/java/com/linkedin/metadata/dao/BaseSearchWriterDAO.java b/metadata-dao/src/main/java/com/linkedin/metadata/dao/BaseSearchWriterDAO.java new file mode 100644 index 0000000000000..0c5b28cb9f032 --- /dev/null +++ b/metadata-dao/src/main/java/com/linkedin/metadata/dao/BaseSearchWriterDAO.java @@ -0,0 +1,39 @@ +package com.linkedin.metadata.dao; + +import com.linkedin.data.template.RecordTemplate; +import com.linkedin.metadata.validator.DocumentValidator; +import javax.annotation.Nonnull; + + +/** + * A base class for all Search Writer DAOs. + * + * Search Writer DAO is a standardized interface to update a search index. + */ +public abstract class BaseSearchWriterDAO { + + protected final Class _documentClass; + + public BaseSearchWriterDAO(@Nonnull Class documentClass) { + DocumentValidator.validateDocumentSchema(documentClass); + _documentClass = documentClass; + } + + /** + * Updates or inserts the given search document. + * + * @param document the document to update / insert + * @param docId the ID of the document + */ + public abstract void upsertDocument(@Nonnull DOCUMENT document, @Nonnull String docId); + + /** + * Deletes the document with the given document ID from the index. + */ + public abstract void deleteDocument(@Nonnull String docId); + + /** + * Closes this writer, releasing any associated resources. 
+ */ + public abstract void close(); +} diff --git a/metadata-dao/src/main/java/com/linkedin/metadata/dao/storage/LocalDAOStorageConfig.java b/metadata-dao/src/main/java/com/linkedin/metadata/dao/storage/LocalDAOStorageConfig.java new file mode 100644 index 0000000000000..23a71fda1f52b --- /dev/null +++ b/metadata-dao/src/main/java/com/linkedin/metadata/dao/storage/LocalDAOStorageConfig.java @@ -0,0 +1,47 @@ +package com.linkedin.metadata.dao.storage; + +import com.linkedin.data.template.RecordTemplate; +import java.util.Map; +import lombok.Builder; +import lombok.Value; + + +/** + * Immutable class that holds the storage config for different paths of different metadata aspects + */ +@Value +@Builder +public final class LocalDAOStorageConfig { + + /** + * Map of corresponding {@link Class} of metadata aspect to {@link AspectStorageConfig} config + */ + Map, AspectStorageConfig> aspectStorageConfigMap; + + /** + * Immutable class that holds the storage config of different pegasus paths of a given metadata aspect + */ + @Value + @Builder + public final static class AspectStorageConfig { + + /** + * Map of string representation of Pegasus Path to {@link PathStorageConfig} config + */ + Map pathStorageConfigMap; + } + + /** + * Immutable class that holds the storage config of a given pegasus path of a given metadata aspect + */ + @Value + @Builder + public final static class PathStorageConfig { + + /** + * Whether to index the pegasus path to local secondary index + */ + @Builder.Default + boolean strongConsistentSecondaryIndex = false; + } +} \ No newline at end of file diff --git a/metadata-dao/src/main/java/com/linkedin/metadata/dao/utils/ModelUtils.java b/metadata-dao/src/main/java/com/linkedin/metadata/dao/utils/ModelUtils.java index baf0b496a3b38..9e651fd8686aa 100644 --- a/metadata-dao/src/main/java/com/linkedin/metadata/dao/utils/ModelUtils.java +++ b/metadata-dao/src/main/java/com/linkedin/metadata/dao/utils/ModelUtils.java @@ -524,4 +524,11 @@ public static 
String getAspectS return String.format("%s_%s_%s", METADATA_AUDIT_EVENT_PREFIX, urnStr.substring(0, urnStr.length() - "Urn".length()), newValue.getClass().getSimpleName().toUpperCase()); } + + /** + * Return true if the aspect is defined in common namespace + */ + public static boolean isCommonAspect(@Nonnull Class clazz) { + return clazz.getPackage().getName().startsWith("com.linkedin.common"); + } } diff --git a/metadata-dao/src/main/pegasus/com/linkedin/metadata/query/IndexCriterion.pdl b/metadata-dao/src/main/pegasus/com/linkedin/metadata/query/IndexCriterion.pdl new file mode 100644 index 0000000000000..089cdf9a0a0e2 --- /dev/null +++ b/metadata-dao/src/main/pegasus/com/linkedin/metadata/query/IndexCriterion.pdl @@ -0,0 +1,17 @@ +namespace com.linkedin.metadata.query + +/** + * A criterion for matching a field with given value + */ +record IndexCriterion { + + /** + * FQCN of the aspect class in the index table that this criterion refers to e.g. com.linkedin.common.Status + */ + aspect: string + + /** + * Corresponding path, value and condition that this criterion refers to + */ + pathParams: optional IndexPathParams +} diff --git a/metadata-dao/src/main/pegasus/com/linkedin/metadata/query/IndexFilter.pdl b/metadata-dao/src/main/pegasus/com/linkedin/metadata/query/IndexFilter.pdl new file mode 100644 index 0000000000000..01fb323de24f6 --- /dev/null +++ b/metadata-dao/src/main/pegasus/com/linkedin/metadata/query/IndexFilter.pdl @@ -0,0 +1,12 @@ +namespace com.linkedin.metadata.query + +/** + * Filters for finding records in the index table + */ +record IndexFilter { + + /** + * A list of criteria to filter records from the index table, AND being the logical operator + */ + criteria: array[IndexCriterion] +} diff --git a/metadata-dao/src/main/pegasus/com/linkedin/metadata/query/IndexPathParams.pdl b/metadata-dao/src/main/pegasus/com/linkedin/metadata/query/IndexPathParams.pdl new file mode 100644 index 0000000000000..e7281c95877a9 --- /dev/null +++ 
b/metadata-dao/src/main/pegasus/com/linkedin/metadata/query/IndexPathParams.pdl @@ -0,0 +1,22 @@ +namespace com.linkedin.metadata.query + +/** + * Model combining index path, value and the condition for the criterion to be satisfied + */ +record IndexPathParams { + + /** + * Corresponding path column of the index table that this criterion refers to e.g. /removed (corresponding to field "removed" of com.linkedin.common.Status aspect) + */ + path: string + + /** + * Value of the corresponding path of the aspect + */ + value: IndexValue + + /** + * Condition for the criterion to be satisfied e.g. EQUAL, START_WITH + */ + condition: Condition = "EQUAL" +} \ No newline at end of file diff --git a/metadata-dao/src/main/pegasus/com/linkedin/metadata/query/IndexValue.pdl b/metadata-dao/src/main/pegasus/com/linkedin/metadata/query/IndexValue.pdl new file mode 100644 index 0000000000000..3a8085048b530 --- /dev/null +++ b/metadata-dao/src/main/pegasus/com/linkedin/metadata/query/IndexValue.pdl @@ -0,0 +1,13 @@ +namespace com.linkedin.metadata.query + +/** + * A union of all supported value types in the index table + */ +typeref IndexValue = union[ + boolean, + double, + float, + int, + long, + string +] diff --git a/metadata-dao/src/test/java/com/linkedin/metadata/dao/BaseLocalDAOTest.java b/metadata-dao/src/test/java/com/linkedin/metadata/dao/BaseLocalDAOTest.java index 1fe442d83aced..1170be233217f 100644 --- a/metadata-dao/src/test/java/com/linkedin/metadata/dao/BaseLocalDAOTest.java +++ b/metadata-dao/src/test/java/com/linkedin/metadata/dao/BaseLocalDAOTest.java @@ -7,6 +7,7 @@ import com.linkedin.metadata.dao.retention.TimeBasedRetention; import com.linkedin.metadata.dao.retention.VersionBasedRetention; import com.linkedin.metadata.query.ExtraInfo; +import com.linkedin.metadata.query.IndexFilter; import com.linkedin.testing.AspectFoo; import com.linkedin.testing.EntityAspectUnion; import com.linkedin.testing.urn.FooUrn; @@ -19,6 +20,8 @@ import
java.util.function.BiConsumer; import java.util.function.BiFunction; import java.util.function.Supplier; +import javax.annotation.Nonnull; +import javax.annotation.Nullable; import org.mockito.stubbing.OngoingStubbing; import org.testng.annotations.BeforeMethod; import org.testng.annotations.Test; @@ -46,6 +49,12 @@ protected long saveLatest(FooUrn urn, Class void saveToLocalSecondaryIndex(@Nonnull FooUrn urn, + @Nullable ASPECT newValue, long version) { + + } + @Override protected T runInTransactionWithRetry(Supplier block, int maxTransactionRetry) { return block.get(); @@ -90,6 +99,11 @@ public ListResult listUrns(Class as return null; } + @Override + public ListResult listUrns(@Nonnull IndexFilter indexFilter, @Nullable FooUrn lastUrn, int pageSize) { + return null; + } + @Override public ListResult list(Class aspectClass, FooUrn urn, int start, int pageSize) { diff --git a/metadata-dao/src/test/java/com/linkedin/metadata/dao/utils/ModelUtilsTest.java b/metadata-dao/src/test/java/com/linkedin/metadata/dao/utils/ModelUtilsTest.java index 666de814251e5..e574a69b70a14 100644 --- a/metadata-dao/src/test/java/com/linkedin/metadata/dao/utils/ModelUtilsTest.java +++ b/metadata-dao/src/test/java/com/linkedin/metadata/dao/utils/ModelUtilsTest.java @@ -1,6 +1,7 @@ package com.linkedin.metadata.dao.utils; import com.google.common.collect.ImmutableSet; +import com.linkedin.common.Ownership; import com.linkedin.common.urn.Urn; import com.linkedin.testing.EntityFoo; import com.linkedin.testing.urn.BarUrn; @@ -273,4 +274,13 @@ public void testGetMAETopicName() throws URISyntaxException { assertEquals(ModelUtils.getAspectSpecificMAETopicName(urn, foo), "METADATA_AUDIT_EVENT_FOO_ASPECTFOO"); } + + @Test + public void testIsCommonAspect() { + boolean result = ModelUtils.isCommonAspect(AspectFoo.class); + assertFalse(result); + + result = ModelUtils.isCommonAspect(Ownership.class); + assertTrue(result); + } } diff --git 
a/metadata-models-generator/src/main/java/com/linkedin/metadata/generator/EventSpec.java b/metadata-models-generator/src/main/java/com/linkedin/metadata/generator/EventSpec.java index f0be592e53ea3..9ba1e0f7ac6b8 100644 --- a/metadata-models-generator/src/main/java/com/linkedin/metadata/generator/EventSpec.java +++ b/metadata-models-generator/src/main/java/com/linkedin/metadata/generator/EventSpec.java @@ -11,29 +11,29 @@ */ @Data public class EventSpec { - // doc of the model, such as: For unit tests. - protected String doc; + // delta model for partial update, such as: com.linkedin.datasetGroup.MembershipDelta. + protected String delta; - // specType of the model, such as: MetadataChangeEvent. - protected String specType; - - // fullValueType of the model, such as: com.linkedin.testing.AnnotatedAspectBaz. + // fullValueType of the model, such as: com.linkedin.identity.CorpUserInfo. protected String fullValueType; - // namespace of the model, such as: com.linkedin.testing. + // namespace of the model, such as: com.linkedin.identity. protected String namespace; - // entities leverage the model, such as: com.linkedin.common.FooBarUrn. + // specType of the model, such as: MetadataChangeEvent. + protected String specType; + + // entities leverage the model, such as: com.linkedin.common.CorpuserUrn. protected Set urnSet = new HashSet<>(); - // valueType of the model, such as: AnnotatedAspectBaz. + // valueType of the model, such as: CorpUserInfo. 
protected String valueType; public EventSpec() { } - public boolean hasDoc() { - return doc != null && !doc.isEmpty(); + public boolean hasDelta() { + return delta != null && !delta.isEmpty(); } public void setValueType(@Nonnull String schemaFullName) { diff --git a/metadata-models-generator/src/main/java/com/linkedin/metadata/generator/SchemaAnnotationRetriever.java b/metadata-models-generator/src/main/java/com/linkedin/metadata/generator/SchemaAnnotationRetriever.java index 31dba015f58dc..c06134542cdf4 100644 --- a/metadata-models-generator/src/main/java/com/linkedin/metadata/generator/SchemaAnnotationRetriever.java +++ b/metadata-models-generator/src/main/java/com/linkedin/metadata/generator/SchemaAnnotationRetriever.java @@ -53,13 +53,11 @@ private void generate(@Nonnull RecordDataSchema schema, @Nonnull List specs.add(eventSpec); if (annotationInfo != null && annotationInfo.containsKey(ENTITY_URNS)) { eventSpec.setUrnSet(new HashSet<>((List) annotationInfo.get(ENTITY_URNS))); - } else { - log.debug(String.format("No recognized urn annotation is presented in %s.", schema.getFullName())); + } + if (annotationInfo != null && annotationInfo.containsKey(DELTA)) { + eventSpec.setDelta((String) annotationInfo.get(DELTA)); } eventSpec.setNamespace(schema.getNamespace()); eventSpec.setValueType(schema.getFullName()); - if (schema.getDoc() != null) { - eventSpec.setDoc(schema.getDoc()); - } } } \ No newline at end of file diff --git a/metadata-models-generator/src/main/java/com/linkedin/metadata/generator/SchemaGeneratorConstants.java b/metadata-models-generator/src/main/java/com/linkedin/metadata/generator/SchemaGeneratorConstants.java index 2fa6aa29693ed..1375508571328 100644 --- a/metadata-models-generator/src/main/java/com/linkedin/metadata/generator/SchemaGeneratorConstants.java +++ b/metadata-models-generator/src/main/java/com/linkedin/metadata/generator/SchemaGeneratorConstants.java @@ -14,6 +14,7 @@ private SchemaGeneratorConstants() { // used in 
SchemaAnnotationRetriever static final String ASPECT = "Aspect"; + static final String DELTA = "Delta"; static final String ENTITY_URNS = "EntityUrns"; // used in EventSchemaComposer diff --git a/metadata-models-generator/src/main/resources/FailedMetadataChangeEvent.rythm b/metadata-models-generator/src/main/resources/FailedMetadataChangeEvent.rythm index 645e262fad5ca..befdf659addd5 100644 --- a/metadata-models-generator/src/main/resources/FailedMetadataChangeEvent.rythm +++ b/metadata-models-generator/src/main/resources/FailedMetadataChangeEvent.rythm @@ -7,23 +7,23 @@ namespace com.linkedin.mxe@(nameSpace) import com.linkedin.avro2pegasus.events.KafkaAuditHeader /** -* FailedMetadataChangeEvent for the @(entityName)Urn with @(eventSpec.getValueType()) aspect. -*/ + * FailedMetadataChangeEvent for the @(entityName)Urn with @(eventSpec.getValueType()) aspect. + */ @@FailedMetadataChangeEvent record FailedMetadataChangeEvent { /** - * Kafka event for capturing a failure to process a MetadataChangeEvent. - */ + * Kafka event for capturing a failure to process a MetadataChangeEvent. + */ auditHeader: optional KafkaAuditHeader /** - * The event that failed to be processed. - */ + * The event that failed to be processed. + */ metadataChangeEvent: MetadataChangeEvent /** * The error message or the stacktrace for the failure. */ error: string -} +} \ No newline at end of file diff --git a/metadata-models-generator/src/main/resources/MetadataAuditEvent.rythm b/metadata-models-generator/src/main/resources/MetadataAuditEvent.rythm index 9c54ade503d3f..179c0c94170c3 100644 --- a/metadata-models-generator/src/main/resources/MetadataAuditEvent.rythm +++ b/metadata-models-generator/src/main/resources/MetadataAuditEvent.rythm @@ -33,4 +33,4 @@ record MetadataAuditEvent { * Aspect of the @eventSpec.getValueType() after the update. 
*/ newValue: @eventSpec.getValueType() -} +} \ No newline at end of file diff --git a/metadata-models-generator/src/main/resources/MetadataChangeEvent.rythm b/metadata-models-generator/src/main/resources/MetadataChangeEvent.rythm index bda64b9bb23a7..bba40884eee07 100644 --- a/metadata-models-generator/src/main/resources/MetadataChangeEvent.rythm +++ b/metadata-models-generator/src/main/resources/MetadataChangeEvent.rythm @@ -7,27 +7,35 @@ namespace com.linkedin.mxe@(nameSpace) import com.linkedin.avro2pegasus.events.KafkaAuditHeader import @entityUrn import @eventSpec.getFullValueType() +@if (eventSpec.hasDelta()) { +import @eventSpec.getDelta() + } /** -* MetadataChangeEvent for the @(entityName)Urn with @(eventSpec.getValueType()) aspect. -*/ + * MetadataChangeEvent for the @(entityName)Urn with @(eventSpec.getValueType()) aspect. + */ @@MetadataChangeEvent record MetadataChangeEvent { /** - * Kafka audit header. See go/kafkaauditheader for more info. - */ + * Kafka audit header. See go/kafkaauditheader for more info. + */ auditHeader: optional KafkaAuditHeader /** - * @(entityName)Urn as the key for the MetadataChangeEvent. - */ + * @(entityName)Urn as the key for the MetadataChangeEvent. + */ urn: @(entityName)Urn - @if (eventSpec.hasDoc()) { /** - * @eventSpec.getDoc() - */ + * Value of the proposed @eventSpec.getValueType() change. + */ + proposedValue: optional @eventSpec.getValueType() + @if (eventSpec.hasDelta()) { + + /** + * Delta of the proposed @SchemaGeneratorUtil.stripNamespace(eventSpec.getDelta()) partial update. 
+ */ + proposedDelta: optional @SchemaGeneratorUtil.stripNamespace(eventSpec.getDelta()) } - @SchemaGeneratorUtil.deCapitalize(eventSpec.getValueType()): optional @eventSpec.getValueType() } \ No newline at end of file diff --git a/metadata-models-generator/src/test/resources/com/linkedin/mxe/bar/annotatedAspectBar/FailedMetadataChangeEvent.pdl b/metadata-models-generator/src/test/resources/com/linkedin/mxe/bar/annotatedAspectBar/FailedMetadataChangeEvent.pdl index 8cb221c975b0b..54b1f57cee0d5 100644 --- a/metadata-models-generator/src/test/resources/com/linkedin/mxe/bar/annotatedAspectBar/FailedMetadataChangeEvent.pdl +++ b/metadata-models-generator/src/test/resources/com/linkedin/mxe/bar/annotatedAspectBar/FailedMetadataChangeEvent.pdl @@ -3,23 +3,23 @@ namespace com.linkedin.mxe.bar.annotatedAspectBar import com.linkedin.avro2pegasus.events.KafkaAuditHeader /** -* FailedMetadataChangeEvent for the BarUrn with AnnotatedAspectBar aspect. -*/ + * FailedMetadataChangeEvent for the BarUrn with AnnotatedAspectBar aspect. + */ @FailedMetadataChangeEvent record FailedMetadataChangeEvent { /** - * Kafka event for capturing a failure to process a MetadataChangeEvent. - */ + * Kafka event for capturing a failure to process a MetadataChangeEvent. + */ auditHeader: optional KafkaAuditHeader /** - * The event that failed to be processed. - */ + * The event that failed to be processed. + */ metadataChangeEvent: MetadataChangeEvent /** * The error message or the stacktrace for the failure. 
*/ error: string -} +} \ No newline at end of file diff --git a/metadata-models-generator/src/test/resources/com/linkedin/mxe/bar/annotatedAspectBar/MetadataAuditEvent.pdl b/metadata-models-generator/src/test/resources/com/linkedin/mxe/bar/annotatedAspectBar/MetadataAuditEvent.pdl index 567bea65e6b5a..49776e310958d 100644 --- a/metadata-models-generator/src/test/resources/com/linkedin/mxe/bar/annotatedAspectBar/MetadataAuditEvent.pdl +++ b/metadata-models-generator/src/test/resources/com/linkedin/mxe/bar/annotatedAspectBar/MetadataAuditEvent.pdl @@ -29,4 +29,4 @@ record MetadataAuditEvent { * Aspect of the AnnotatedAspectBar after the update. */ newValue: AnnotatedAspectBar -} +} \ No newline at end of file diff --git a/metadata-models-generator/src/test/resources/com/linkedin/mxe/bar/annotatedAspectBar/MetadataChangeEvent.pdl b/metadata-models-generator/src/test/resources/com/linkedin/mxe/bar/annotatedAspectBar/MetadataChangeEvent.pdl index a7a5f2ccbe0a9..fa5ae2e07ab7a 100644 --- a/metadata-models-generator/src/test/resources/com/linkedin/mxe/bar/annotatedAspectBar/MetadataChangeEvent.pdl +++ b/metadata-models-generator/src/test/resources/com/linkedin/mxe/bar/annotatedAspectBar/MetadataChangeEvent.pdl @@ -3,25 +3,31 @@ namespace com.linkedin.mxe.bar.annotatedAspectBar import com.linkedin.avro2pegasus.events.KafkaAuditHeader import com.linkedin.testing.BarUrn import com.linkedin.testing.AnnotatedAspectBar +import com.linkedin.testing.AnnotatedAspectBarDelta /** -* MetadataChangeEvent for the BarUrn with AnnotatedAspectBar aspect. -*/ + * MetadataChangeEvent for the BarUrn with AnnotatedAspectBar aspect. + */ @MetadataChangeEvent record MetadataChangeEvent { /** - * Kafka audit header. See go/kafkaauditheader for more info. - */ + * Kafka audit header. See go/kafkaauditheader for more info. + */ auditHeader: optional KafkaAuditHeader /** - * BarUrn as the key for the MetadataChangeEvent. - */ + * BarUrn as the key for the MetadataChangeEvent. 
+ */ urn: BarUrn /** - * For unit tests - */ - annotatedAspectBar: optional AnnotatedAspectBar + * Value of the proposed AnnotatedAspectBar change. + */ + proposedValue: optional AnnotatedAspectBar + + /** + * Delta of the proposed AnnotatedAspectBarDelta partial update. + */ + proposedDelta: optional AnnotatedAspectBarDelta } \ No newline at end of file diff --git a/metadata-models-generator/src/test/resources/com/linkedin/testing/AnnotatedAspectBar.pdl b/metadata-models-generator/src/test/resources/com/linkedin/testing/AnnotatedAspectBar.pdl index d61ae86d7187f..e3220655768b1 100644 --- a/metadata-models-generator/src/test/resources/com/linkedin/testing/AnnotatedAspectBar.pdl +++ b/metadata-models-generator/src/test/resources/com/linkedin/testing/AnnotatedAspectBar.pdl @@ -3,7 +3,8 @@ namespace com.linkedin.testing /** * For unit tests */ -@Aspect.EntityUrns = ["com.linkedin.testing.FooUrn", "com.linkedin.testing.BarUrn"] +@Aspect = { "Delta": "com.linkedin.testing.AnnotatedAspectBarDelta", + "EntityUrns": ["com.linkedin.testing.FooUrn", "com.linkedin.testing.BarUrn"]} record AnnotatedAspectBar { /** For unit tests */ diff --git a/metadata-models-generator/src/test/resources/com/linkedin/testing/AnnotatedAspectBarDelta.pdl b/metadata-models-generator/src/test/resources/com/linkedin/testing/AnnotatedAspectBarDelta.pdl new file mode 100644 index 0000000000000..5ff78d8d37c67 --- /dev/null +++ b/metadata-models-generator/src/test/resources/com/linkedin/testing/AnnotatedAspectBarDelta.pdl @@ -0,0 +1,10 @@ +namespace com.linkedin.testing + +/** + * For unit tests + */ +record AnnotatedAspectBarDelta { + + /** For unit tests */ + stringFieldToUpdate: string +} \ No newline at end of file diff --git a/metadata-restli-resource/src/main/java/com/linkedin/metadata/restli/BaseBrowsableEntityResource.java b/metadata-restli-resource/src/main/java/com/linkedin/metadata/restli/BaseBrowsableEntityResource.java index 3e75149835d72..8f808f6764277 100644 --- 
a/metadata-restli-resource/src/main/java/com/linkedin/metadata/restli/BaseBrowsableEntityResource.java +++ b/metadata-restli-resource/src/main/java/com/linkedin/metadata/restli/BaseBrowsableEntityResource.java @@ -46,6 +46,11 @@ public BaseBrowsableEntityResource(@Nonnull Class snapshotClass, super(snapshotClass, aspectUnionClass); } + public BaseBrowsableEntityResource(@Nonnull Class snapshotClass, + @Nonnull Class aspectUnionClass, @Nonnull Class urnClass) { + super(snapshotClass, aspectUnionClass, urnClass); + } + /** * Returns a {@link BaseBrowseDAO}. */ diff --git a/metadata-restli-resource/src/main/java/com/linkedin/metadata/restli/BaseEntityResource.java b/metadata-restli-resource/src/main/java/com/linkedin/metadata/restli/BaseEntityResource.java index 9363b5ca387b6..e9c97955a9ed7 100644 --- a/metadata-restli-resource/src/main/java/com/linkedin/metadata/restli/BaseEntityResource.java +++ b/metadata-restli-resource/src/main/java/com/linkedin/metadata/restli/BaseEntityResource.java @@ -7,6 +7,9 @@ import com.linkedin.metadata.dao.AspectKey; import com.linkedin.metadata.dao.BaseLocalDAO; import com.linkedin.metadata.dao.utils.ModelUtils; +import com.linkedin.metadata.query.IndexCriterion; +import com.linkedin.metadata.query.IndexCriterionArray; +import com.linkedin.metadata.query.IndexFilter; import com.linkedin.parseq.Task; import com.linkedin.restli.common.ComplexResourceKey; import com.linkedin.restli.common.EmptyRecord; @@ -27,6 +30,7 @@ import java.util.function.Function; import java.util.stream.Collectors; import javax.annotation.Nonnull; +import javax.annotation.Nullable; import static com.linkedin.metadata.dao.BaseReadDAO.*; import static com.linkedin.metadata.restli.RestliConstants.*; @@ -58,13 +62,20 @@ public abstract class BaseEntityResource< private final Class _snapshotClass; private final Class _aspectUnionClass; private final Set> _supportedAspectClasses; + private final Class _urnClass; public BaseEntityResource(@Nonnull Class snapshotClass, 
@Nonnull Class aspectUnionClass) { + this(snapshotClass, aspectUnionClass, null); + } + + public BaseEntityResource(@Nonnull Class snapshotClass, + @Nonnull Class aspectUnionClass, @Nullable Class urnClass) { super(); ModelUtils.validateSnapshotAspect(snapshotClass, aspectUnionClass); _snapshotClass = snapshotClass; _aspectUnionClass = aspectUnionClass; _supportedAspectClasses = ModelUtils.getValidAspectTypes(_aspectUnionClass); + _urnClass = urnClass; } /** @@ -117,7 +128,7 @@ protected BaseRestliAuditor getAuditor() { @RestMethod.Get @Nonnull public Task get(@Nonnull ComplexResourceKey id, - @QueryParam(PARAM_ASPECTS) @Optional("[]") @Nonnull String[] aspectNames) { + @QueryParam(PARAM_ASPECTS) @Optional @Nullable String[] aspectNames) { return RestliUtils.toTask(() -> { final URN urn = toUrn(id.getKey()); @@ -136,7 +147,7 @@ public Task get(@Nonnull ComplexResourceKey id, @Nonnull public Task, VALUE>> batchGet( @Nonnull Set> ids, - @QueryParam(PARAM_ASPECTS) @Optional("[]") @Nonnull String[] aspectNames) { + @QueryParam(PARAM_ASPECTS) @Optional @Nullable String[] aspectNames) { return RestliUtils.toTask(() -> { final Map, URN> urnMap = ids.stream().collect(Collectors.toMap(Function.identity(), id -> toUrn(id.getKey()))); @@ -177,7 +188,7 @@ protected Task ingestInternal(@Nonnull SNAPSHOT snapshot, @Action(name = ACTION_GET_SNAPSHOT) @Nonnull public Task getSnapshot(@ActionParam(PARAM_URN) @Nonnull String urnString, - @ActionParam(PARAM_ASPECTS) @Optional("[]") @Nonnull String[] aspectNames) { + @ActionParam(PARAM_ASPECTS) @Optional @Nullable String[] aspectNames) { return RestliUtils.toTask(() -> { final URN urn = parseUrnParam(urnString); @@ -202,7 +213,7 @@ public Task getSnapshot(@ActionParam(PARAM_URN) @Nonnull String urnStr @Action(name = ACTION_BACKFILL) @Nonnull public Task backfill(@ActionParam(PARAM_URN) @Nonnull String urnString, - @ActionParam(PARAM_ASPECTS) @Optional("[]") @Nonnull String[] aspectNames) { + @ActionParam(PARAM_ASPECTS) @Optional 
@Nullable String[] aspectNames) { return RestliUtils.toTask(() -> { final URN urn = parseUrnParam(urnString); @@ -215,9 +226,63 @@ public Task backfill(@ActionParam(PARAM_URN) @Nonnull String urnString }); } + /** + * An action method for emitting MAE backfill messages for a set of entities. + */ + @Action(name = ACTION_BATCH_BACKFILL) + @Nonnull + public Task batchBackfill(@ActionParam(PARAM_URNS) @Nonnull String[] urns, + @ActionParam(PARAM_ASPECTS) @Optional @Nullable String[] aspectNames) { + + return RestliUtils.toTask(() -> { + final Set urnSet = Arrays.stream(urns).map(urnString -> parseUrnParam(urnString)).collect(Collectors.toSet()); + getLocalDAO().backfill(parseAspectsParam(aspectNames), urnSet); + return null; + }); + } + + /** + * For strongly consistent local secondary index, this provides {@link IndexFilter} which uses FQCN of the entity urn to filter + * on the aspect field of the index table. This serves the purpose of returning urns that are of given entity type from index table. + */ + @Nonnull + private IndexFilter getDefaultIndexFilter() { + if (_urnClass == null) { + throw new UnsupportedOperationException("Urn class has not been defined in BaseEntityResource"); + } + final IndexCriterion indexCriterion = new IndexCriterion().setAspect(_urnClass.getCanonicalName()); + return new IndexFilter().setCriteria(new IndexCriterionArray(indexCriterion)); + } + + /** + * An action method for getting filtered urns from local secondary index. + * If no filter conditions are provided, then it returns urns of given entity type. + * + * @param indexFilter {@link IndexFilter} that defines the filter conditions + * @param lastUrn last urn of the previous fetched page. 
For the first page, this should be set as NULL + * @param limit maximum number of distinct urns to return + * @return Array of urns represented as string + */ + @Action(name = ACTION_LIST_URNS_FROM_INDEX) + @Nonnull + public Task listUrnsFromIndex(@ActionParam(PARAM_FILTER) @Optional @Nullable IndexFilter indexFilter, + @ActionParam(PARAM_URN) @Optional @Nullable String lastUrn, @ActionParam(PARAM_LIMIT) int limit) { + + final IndexFilter filter = indexFilter == null ? getDefaultIndexFilter() : indexFilter; + + return RestliUtils.toTask(() -> + getLocalDAO() + .listUrns(filter, lastUrn == null ? null : parseUrnParam(lastUrn), limit) + .getValues() + .stream() + .map(Urn::toString) + .collect(Collectors.toList()) + .toArray(new String[0])); + } + @Nonnull - protected Set> parseAspectsParam(@Nonnull String[] aspectNames) { - if (aspectNames.length == 0) { + protected Set> parseAspectsParam(@Nullable String[] aspectNames) { + if (aspectNames == null) { return _supportedAspectClasses; } return Arrays.asList(aspectNames).stream().map(ModelUtils::getAspectClass).collect(Collectors.toSet()); diff --git a/metadata-restli-resource/src/main/java/com/linkedin/metadata/restli/BaseEntitySimpleKeyResource.java b/metadata-restli-resource/src/main/java/com/linkedin/metadata/restli/BaseEntitySimpleKeyResource.java index 68775536d0218..949c90f55eb1c 100644 --- a/metadata-restli-resource/src/main/java/com/linkedin/metadata/restli/BaseEntitySimpleKeyResource.java +++ b/metadata-restli-resource/src/main/java/com/linkedin/metadata/restli/BaseEntitySimpleKeyResource.java @@ -25,6 +25,7 @@ import java.util.function.Function; import java.util.stream.Collectors; import javax.annotation.Nonnull; +import javax.annotation.Nullable; import static com.linkedin.metadata.dao.BaseReadDAO.*; import static com.linkedin.metadata.restli.RestliConstants.*; @@ -117,7 +118,7 @@ protected BaseRestliAuditor getAuditor() { @Nonnull public Task get( @Nonnull KEY id, - @QueryParam(PARAM_ASPECTS) 
@Optional("[]") @Nonnull String[] aspectNames) { + @QueryParam(PARAM_ASPECTS) @Optional @Nullable String[] aspectNames) { return RestliUtils.toTask(() -> { final URN urn = toUrn(id); @@ -136,7 +137,7 @@ public Task get( @Nonnull public Task> batchGet( @Nonnull Set ids, - @QueryParam(PARAM_ASPECTS) @Optional("[]") @Nonnull String[] aspectNames) { + @QueryParam(PARAM_ASPECTS) @Optional @Nullable String[] aspectNames) { return RestliUtils.toTask(() -> { final Map urnIdMap = ids.stream() @@ -177,7 +178,7 @@ public Task ingest(@ActionParam(PARAM_SNAPSHOT) @Nonnull SNAPSHOT snapshot @Nonnull public Task getSnapshot( @ActionParam(PARAM_URN) @Nonnull String urnString, - @ActionParam(PARAM_ASPECTS) @Optional("[]") @Nonnull String[] aspectNames) { + @ActionParam(PARAM_ASPECTS) @Optional @Nullable String[] aspectNames) { return RestliUtils.toTask(() -> { final URN urn = parseUrnParam(urnString); @@ -205,7 +206,7 @@ public Task getSnapshot( @Action(name = ACTION_BACKFILL) @Nonnull public Task backfill(@ActionParam(PARAM_URN) @Nonnull String urnString, - @ActionParam(PARAM_ASPECTS) @Optional("[]") @Nonnull String[] aspectNames) { + @ActionParam(PARAM_ASPECTS) @Optional @Nullable String[] aspectNames) { return RestliUtils.toTask(() -> { final URN urn = parseUrnParam(urnString); @@ -218,8 +219,8 @@ public Task backfill(@ActionParam(PARAM_URN) @Nonnull String urnString } @Nonnull - protected Set> parseAspectsParam(@Nonnull String[] aspectNames) { - if (aspectNames.length == 0) { + protected Set> parseAspectsParam(@Nullable String[] aspectNames) { + if (aspectNames == null) { return _supportedAspectClasses; } return Arrays.stream(aspectNames) diff --git a/metadata-restli-resource/src/main/java/com/linkedin/metadata/restli/BaseSearchableClient.java b/metadata-restli-resource/src/main/java/com/linkedin/metadata/restli/BaseSearchableClient.java index 89c90c6c0be54..36b3e68efd3d0 100644 --- a/metadata-restli-resource/src/main/java/com/linkedin/metadata/restli/BaseSearchableClient.java 
+++ b/metadata-restli-resource/src/main/java/com/linkedin/metadata/restli/BaseSearchableClient.java @@ -34,16 +34,16 @@ public BaseSearchableClient(@Nonnull Client restliClient) { * @throws RemoteInvocationException when the rest.li request fails */ @Nonnull - public abstract CollectionResponse search(@Nonnull String input, @Nonnull StringArray aspectNames, @Nullable Map requestFilters, + public abstract CollectionResponse search(@Nonnull String input, @Nullable StringArray aspectNames, @Nullable Map requestFilters, @Nullable SortCriterion sortCriterion, int start, int count) throws RemoteInvocationException; /** - * Similar to {@link #search(String, StringArray, Map, SortCriterion, int, int)} with empty list for aspect names, meaning all aspects will be returned + * Similar to {@link #search(String, StringArray, Map, SortCriterion, int, int)} with null for aspect names, meaning all aspects will be returned */ @Nonnull public CollectionResponse search(@Nonnull String input, @Nullable Map requestFilters, @Nullable SortCriterion sortCriterion, int start, int count) throws RemoteInvocationException { - return search(input, new StringArray(), requestFilters, sortCriterion, start, count); + return search(input, null, requestFilters, sortCriterion, start, count); } /** diff --git a/metadata-restli-resource/src/main/java/com/linkedin/metadata/restli/BaseSearchableEntityResource.java b/metadata-restli-resource/src/main/java/com/linkedin/metadata/restli/BaseSearchableEntityResource.java index 71b1827af650b..22f2d0f6c1535 100644 --- a/metadata-restli-resource/src/main/java/com/linkedin/metadata/restli/BaseSearchableEntityResource.java +++ b/metadata-restli-resource/src/main/java/com/linkedin/metadata/restli/BaseSearchableEntityResource.java @@ -62,6 +62,11 @@ public BaseSearchableEntityResource(@Nonnull Class snapshotClass, super(snapshotClass, aspectUnionClass); } + public BaseSearchableEntityResource(@Nonnull Class snapshotClass, + @Nonnull Class aspectUnionClass, @Nonnull 
Class urnClass) { + super(snapshotClass, aspectUnionClass, urnClass); + } + /** * Returns a document-specific {@link BaseSearchDAO}. */ @@ -81,7 +86,7 @@ public BaseSearchableEntityResource(@Nonnull Class snapshotClass, @RestMethod.GetAll @Nonnull public Task> getAll(@Nonnull PagingContext pagingContext, - @QueryParam(PARAM_ASPECTS) @Optional("[]") @Nonnull String[] aspectNames, + @QueryParam(PARAM_ASPECTS) @Optional @Nullable String[] aspectNames, @QueryParam(PARAM_FILTER) @Optional @Nullable Filter filter, @QueryParam(PARAM_SORT) @Optional @Nullable SortCriterion sortCriterion) { @@ -97,7 +102,7 @@ public Task> getAll(@Nonnull PagingContext pagingContext, @Finder(FINDER_SEARCH) @Nonnull public Task> search(@QueryParam(PARAM_INPUT) @Nonnull String input, - @QueryParam(PARAM_ASPECTS) @Optional("[]") @Nonnull String[] aspectNames, + @QueryParam(PARAM_ASPECTS) @Optional @Nullable String[] aspectNames, @QueryParam(PARAM_FILTER) @Optional @Nullable Filter filter, @QueryParam(PARAM_SORT) @Optional @Nullable SortCriterion sortCriterion, @PagingContextParam @Nonnull PagingContext pagingContext) { @@ -119,7 +124,7 @@ public Task autocomplete(@ActionParam(PARAM_QUERY) @Nonnull @Nonnull private CollectionResult getSearchQueryCollectionResult(@Nonnull SearchResult searchResult, - @Nonnull String[] aspectNames) { + @Nullable String[] aspectNames) { final List matchedUrns = searchResult.getDocumentList() .stream() diff --git a/metadata-restli-resource/src/main/java/com/linkedin/metadata/restli/BaseSearchableEntitySimpleKeyResource.java b/metadata-restli-resource/src/main/java/com/linkedin/metadata/restli/BaseSearchableEntitySimpleKeyResource.java index 09016297adc17..bf32147b1d81a 100644 --- a/metadata-restli-resource/src/main/java/com/linkedin/metadata/restli/BaseSearchableEntitySimpleKeyResource.java +++ b/metadata-restli-resource/src/main/java/com/linkedin/metadata/restli/BaseSearchableEntitySimpleKeyResource.java @@ -73,7 +73,7 @@ public 
BaseSearchableEntitySimpleKeyResource( @Nonnull public Task> getAll( @Nonnull PagingContext pagingContext, - @QueryParam(PARAM_ASPECTS) @Optional("[]") @Nonnull String[] aspectNames, + @QueryParam(PARAM_ASPECTS) @Optional @Nullable String[] aspectNames, @QueryParam(PARAM_FILTER) @Optional @Nullable Filter filter, @QueryParam(PARAM_SORT) @Optional @Nullable SortCriterion sortCriterion) { @@ -91,7 +91,7 @@ public Task> getAll( @Nonnull public Task> search( @QueryParam(PARAM_INPUT) @Nonnull String input, - @QueryParam(PARAM_ASPECTS) @Optional("[]") @Nonnull String[] aspectNames, + @QueryParam(PARAM_ASPECTS) @Optional @Nullable String[] aspectNames, @QueryParam(PARAM_FILTER) @Optional @Nullable Filter filter, @QueryParam(PARAM_SORT) @Optional @Nullable SortCriterion sortCriterion, @PagingContextParam @Nonnull PagingContext pagingContext) { diff --git a/metadata-restli-resource/src/main/java/com/linkedin/metadata/restli/BaseSingleAspectEntitySimpleKeyResource.java b/metadata-restli-resource/src/main/java/com/linkedin/metadata/restli/BaseSingleAspectEntitySimpleKeyResource.java index afd320c0d948e..5ba2546f177bd 100644 --- a/metadata-restli-resource/src/main/java/com/linkedin/metadata/restli/BaseSingleAspectEntitySimpleKeyResource.java +++ b/metadata-restli-resource/src/main/java/com/linkedin/metadata/restli/BaseSingleAspectEntitySimpleKeyResource.java @@ -5,8 +5,18 @@ import com.linkedin.data.template.RecordTemplate; import com.linkedin.data.template.UnionTemplate; import com.linkedin.metadata.dao.AspectKey; +import com.linkedin.metadata.dao.ListResult; +import com.linkedin.metadata.dao.utils.ModelUtils; import com.linkedin.metadata.dao.utils.RecordUtils; +import com.linkedin.metadata.query.ListResultMetadata; +import com.linkedin.metadata.validator.ValidationUtils; +import com.linkedin.parseq.Task; +import com.linkedin.restli.server.CollectionResult; +import com.linkedin.restli.server.PagingContext; +import com.linkedin.restli.server.annotations.PagingContextParam; 
+import com.linkedin.restli.server.annotations.RestMethod; import java.util.Collection; +import java.util.List; import java.util.Map; import java.util.Optional; import java.util.Set; @@ -43,11 +53,9 @@ public abstract class BaseSingleAspectEntitySimpleKeyResource< /** * Constructor. - * */ - public BaseSingleAspectEntitySimpleKeyResource( - @Nonnull Class aspectClass, - @Nonnull Class aspectUnionClass, - @Nonnull Class valueClass, + */ + public BaseSingleAspectEntitySimpleKeyResource(@Nonnull Class aspectClass, + @Nonnull Class aspectUnionClass, @Nonnull Class valueClass, @Nonnull Class snapshotClass) { super(aspectUnionClass, snapshotClass); @@ -62,7 +70,7 @@ public BaseSingleAspectEntitySimpleKeyResource( * @param partialEntity the partial entity. * @param urn urn of the entity. * @return the complete entity. - * */ + */ @Nonnull protected abstract VALUE createEntity(@Nonnull VALUE partialEntity, @Nonnull URN urn); @@ -70,20 +78,18 @@ public BaseSingleAspectEntitySimpleKeyResource( * Override {@link BaseEntitySimpleKeyResource}'s method to override the default logic of returning entity values * for each urn. The base classes assumes that the aspects are fields in the entity value whereas in this class * the aspect is included in the value. 
- * */ + */ @Override @Nonnull - protected Map getUrnEntityMap( - @Nonnull Collection urns, + protected Map getUrnEntityMap(@Nonnull Collection urns, @Nonnull Set> aspectClasses) { return getUrnEntityMapInternal(urns); } @Nonnull private Map getUrnEntityMapInternal(@Nonnull Collection urns) { - final Set> aspectKeys = urns.stream() - .map(urn -> new AspectKey<>(_aspectClass, urn, LATEST_VERSION)) - .collect(Collectors.toSet()); + final Set> aspectKeys = + urns.stream().map(urn -> new AspectKey<>(_aspectClass, urn, LATEST_VERSION)).collect(Collectors.toSet()); final Map, Optional> aspectKeyOptionalAspects = getLocalDAO().get(aspectKeys); @@ -91,20 +97,18 @@ private Map getUrnEntityMapInternal(@Nonnull Collection urns) { return aspectKeyOptionalAspects.entrySet() .stream() .filter(entry -> entry.getValue().isPresent()) - .collect(Collectors.toMap( - entry -> entry.getKey().getUrn(), - entry -> { - final URN urn = entry.getKey().getUrn(); - @SuppressWarnings("unchecked") - ASPECT aspect = (ASPECT) entry.getValue().get(); - return createEntity(createPartialEntityFromAspect(aspect), urn); - })); + .collect(Collectors.toMap(entry -> entry.getKey().getUrn(), entry -> { + final URN urn = entry.getKey().getUrn(); + @SuppressWarnings("unchecked") + ASPECT aspect = (ASPECT) entry.getValue().get(); + return createEntity(createPartialEntityFromAspect(aspect), urn); + })); } - /*** + /** * Creates a partial entity value from the aspect. The other fields in the value are set using * the {@link #createEntity(ASPECT, URN)} method. - * */ + */ @Nonnull private VALUE createPartialEntityFromAspect(@Nonnull ASPECT aspect) { try { @@ -121,7 +125,7 @@ private VALUE createPartialEntityFromAspect(@Nonnull ASPECT aspect) { * Throwing an exception with a `not implemented` error message as this method is only required * by parent class {@link BaseEntitySimpleKeyResource} method- {@link #getUrnEntityMap(Collection, Set)}, * which has been overridden here. 
- * */ + */ @Override @Nonnull protected VALUE toValue(@Nonnull SNAPSHOT snapshot) { @@ -132,10 +136,43 @@ protected VALUE toValue(@Nonnull SNAPSHOT snapshot) { * Throwing an exception with a `not implemented` error message as this method is only required * by parent class {@link BaseEntitySimpleKeyResource} method- {@link #getUrnEntityMap(Collection, Set)}, * which has been overridden here. - * */ + */ @Override @Nonnull protected SNAPSHOT toSnapshot(@Nonnull VALUE value, @Nonnull URN urn) { throw new RuntimeException("Not implemented."); } + + /** + * Gets all {@link VALUE} objects from DB for an entity with single aspect + * Warning: this works only if the aspect is not shared with other entities. + * + * It paginates over the latest version of a specific aspect for all Urns + * By default the list is sorted in ascending order of urn + * + * @param pagingContext Paging context. + * @return collection of latest resource(s). + */ + @RestMethod.GetAll + @Nonnull + public Task> getAllWithMetadata( + @PagingContextParam @Nonnull PagingContext pagingContext) { + + if (ModelUtils.isCommonAspect(_aspectClass)) { + ValidationUtils.invalidSchema("Aspect '%s' is a common aspect that could be shared between multiple entities." 
+ + "Please use BaseSingleAspectSearchableEntitySimpleKeyResource's GetAll method instead", + _aspectClass.getCanonicalName()); + } + + return RestliUtils.toTask(() -> { + + final ListResult aspects = + getLocalDAO().list(_aspectClass, pagingContext.getStart(), pagingContext.getCount()); + + final List entities = + aspects.getValues().stream().map(this::createPartialEntityFromAspect).collect(Collectors.toList()); + + return new CollectionResult<>(entities, aspects.getMetadata()); + }); + } } diff --git a/metadata-restli-resource/src/main/java/com/linkedin/metadata/restli/RestliConstants.java b/metadata-restli-resource/src/main/java/com/linkedin/metadata/restli/RestliConstants.java index 5e28421ba7ddf..e83ab72ec499c 100644 --- a/metadata-restli-resource/src/main/java/com/linkedin/metadata/restli/RestliConstants.java +++ b/metadata-restli-resource/src/main/java/com/linkedin/metadata/restli/RestliConstants.java @@ -7,10 +7,12 @@ private RestliConstants() { } public static final String ACTION_AUTOCOMPLETE = "autocomplete"; public static final String ACTION_BACKFILL = "backfill"; + public static final String ACTION_BATCH_BACKFILL = "batchBackfill"; public static final String ACTION_BROWSE = "browse"; public static final String ACTION_GET_BROWSE_PATHS = "getBrowsePaths"; public static final String ACTION_GET_SNAPSHOT = "getSnapshot"; public static final String ACTION_INGEST = "ingest"; + public static final String ACTION_LIST_URNS_FROM_INDEX = "listUrnsFromIndex"; public static final String PARAM_INPUT = "input"; public static final String PARAM_ASPECTS = "aspects"; @@ -23,4 +25,5 @@ private RestliConstants() { } public static final String PARAM_LIMIT = "limit"; public static final String PARAM_SNAPSHOT = "snapshot"; public static final String PARAM_URN = "urn"; + public static final String PARAM_URNS = "urns"; } diff --git a/metadata-restli-resource/src/test/java/com/linkedin/metadata/restli/BaseEntityResourceTest.java 
b/metadata-restli-resource/src/test/java/com/linkedin/metadata/restli/BaseEntityResourceTest.java index 7b54a63cd4739..ca1e42dfe3d82 100644 --- a/metadata-restli-resource/src/test/java/com/linkedin/metadata/restli/BaseEntityResourceTest.java +++ b/metadata-restli-resource/src/test/java/com/linkedin/metadata/restli/BaseEntityResourceTest.java @@ -6,8 +6,12 @@ import com.linkedin.data.template.RecordTemplate; import com.linkedin.metadata.dao.AspectKey; import com.linkedin.metadata.dao.BaseLocalDAO; +import com.linkedin.metadata.dao.ListResult; import com.linkedin.metadata.dao.utils.ModelUtils; import com.linkedin.metadata.dao.utils.RecordUtils; +import com.linkedin.metadata.query.IndexCriterion; +import com.linkedin.metadata.query.IndexCriterionArray; +import com.linkedin.metadata.query.IndexFilter; import com.linkedin.parseq.BaseEngineTest; import com.linkedin.restli.common.HttpStatus; import com.linkedin.restli.server.ResourceContext; @@ -19,6 +23,7 @@ import com.linkedin.testing.EntityKey; import com.linkedin.testing.EntitySnapshot; import com.linkedin.testing.EntityValue; +import com.linkedin.testing.urn.FooUrn; import java.net.URISyntaxException; import java.util.Arrays; import java.util.Collections; @@ -46,7 +51,7 @@ public class BaseEntityResourceTest extends BaseEngineTest { class TestResource extends BaseEntityResource { public TestResource() { - super(EntitySnapshot.class, EntityAspectUnion.class); + super(EntitySnapshot.class, EntityAspectUnion.class, Urn.class); } @Nonnull @@ -128,7 +133,7 @@ public void testGet() { when(_mockLocalDAO.get(new HashSet<>(Arrays.asList(aspect1Key, aspect2Key)))).thenReturn( Collections.singletonMap(aspect1Key, Optional.of(foo))); - EntityValue value = runAndWait(_resource.get(makeResourceKey(urn), new String[0])); + EntityValue value = runAndWait(_resource.get(makeResourceKey(urn), null)); assertEquals(value.getFoo(), foo); assertFalse(value.hasBar()); @@ -201,7 +206,7 @@ public void testBatchGet() { 
ImmutableMap.of(aspectFooKey1, Optional.of(foo), aspectFooKey2, Optional.of(bar))); Map keyValueMap = runAndWait( - _resource.batchGet(ImmutableSet.of(makeResourceKey(urn1), makeResourceKey(urn2)), new String[0])).entrySet() + _resource.batchGet(ImmutableSet.of(makeResourceKey(urn1), makeResourceKey(urn2)), null)).entrySet() .stream() .collect(Collectors.toMap(e -> e.getKey().getKey(), e -> e.getValue())); @@ -283,7 +288,7 @@ public void testGetSnapshotWithAllAspects() { Set> aspectKeys = ImmutableSet.of(fooKey, barKey); when(_mockLocalDAO.get(aspectKeys)).thenReturn(ImmutableMap.of(fooKey, Optional.of(foo), barKey, Optional.of(bar))); - EntitySnapshot snapshot = runAndWait(_resource.getSnapshot(urn.toString(), new String[0])); + EntitySnapshot snapshot = runAndWait(_resource.getSnapshot(urn.toString(), null)); assertEquals(snapshot.getUrn(), urn); @@ -323,7 +328,7 @@ public void testBackfillAllAspects() { when(_mockLocalDAO.backfill(AspectFoo.class, urn)).thenReturn(Optional.of(foo)); when(_mockLocalDAO.backfill(AspectBar.class, urn)).thenReturn(Optional.of(bar)); - String[] backfilledAspects = runAndWait(_resource.backfill(urn.toString(), new String[0])); + String[] backfilledAspects = runAndWait(_resource.backfill(urn.toString(), null)); assertEquals(ImmutableSet.copyOf(backfilledAspects), ImmutableSet.of(ModelUtils.getAspectName(AspectFoo.class), ModelUtils.getAspectName(AspectBar.class))); @@ -340,4 +345,65 @@ public void testBackfillWithInvalidUrn() { fail("No exception thrown"); } + + @Test + public void testBatchBackfill() { + Urn urn1 = makeUrn(1); + Urn urn2 = makeUrn(2); + + runAndWait(_resource.batchBackfill(new String[]{urn1.toString(), urn2.toString()}, + new String[] {"com.linkedin.testing.AspectFoo", "com.linkedin.testing.AspectBar"})); + + verify(_mockLocalDAO, times(1)) + .backfill(ImmutableSet.of(AspectFoo.class, AspectBar.class), ImmutableSet.of(urn1, urn2)); + } + + @Test + public void testListUrnsFromIndex() { + // case 1: indexFilter is 
non-null + IndexCriterion indexCriterion1 = new IndexCriterion().setAspect("aspect1"); + IndexFilter indexFilter1 = new IndexFilter().setCriteria(new IndexCriterionArray(indexCriterion1)); + FooUrn urn1 = makeFooUrn(1); + FooUrn urn2 = makeFooUrn(2); + FooUrn urn3 = makeFooUrn(3); + List urns1 = Arrays.asList(urn2, urn3); + ListResult listResult1 = ListResult.builder().values(urns1).totalCount(100).build(); + + when(_mockLocalDAO.listUrns(indexFilter1, urn1, 2)).thenReturn(listResult1); + String[] actual = runAndWait(_resource.listUrnsFromIndex(indexFilter1, urn1.toString(), 2)); + assertEquals(actual, new String[] {urn2.toString(), urn3.toString()}); + + // case 2: indexFilter is null + IndexCriterion indexCriterion2 = new IndexCriterion().setAspect("com.linkedin.common.urn.Urn"); + IndexFilter indexFilter2 = new IndexFilter().setCriteria(new IndexCriterionArray(indexCriterion2)); + when(_mockLocalDAO.listUrns(indexFilter2, urn1, 2)).thenReturn(listResult1); + actual = runAndWait(_resource.listUrnsFromIndex(null, urn1.toString(), 2)); + assertEquals(actual, new String[] {urn2.toString(), urn3.toString()}); + + // case 3: lastUrn is null + List urns3 = Arrays.asList(urn1, urn2); + ListResult listResult3 = ListResult.builder().values(urns3).totalCount(100).build(); + when(_mockLocalDAO.listUrns(indexFilter2, null, 2)).thenReturn(listResult3); + actual = runAndWait(_resource.listUrnsFromIndex(null, null, 2)); + assertEquals(actual, new String[] {urn1.toString(), urn2.toString()}); + } + + @Test + public void testParseAspectsParam() { + // Only 1 aspect + Set> aspectClasses = _resource + .parseAspectsParam(new String[] {AspectFoo.class.getCanonicalName()}); + assertEquals(aspectClasses.size(), 1); + assertTrue(aspectClasses.contains(AspectFoo.class)); + + // No aspect + aspectClasses = _resource.parseAspectsParam(new String[] {}); + assertEquals(aspectClasses.size(), 0); + + // All aspects + aspectClasses = _resource.parseAspectsParam(null); + 
assertEquals(aspectClasses.size(), 2); + assertTrue(aspectClasses.contains(AspectFoo.class)); + assertTrue(aspectClasses.contains(AspectBar.class)); + } } diff --git a/metadata-restli-resource/src/test/java/com/linkedin/metadata/restli/BaseEntitySimpleKeyResourceTest.java b/metadata-restli-resource/src/test/java/com/linkedin/metadata/restli/BaseEntitySimpleKeyResourceTest.java index e4f20d5702ae0..3a023ce98a0cf 100644 --- a/metadata-restli-resource/src/test/java/com/linkedin/metadata/restli/BaseEntitySimpleKeyResourceTest.java +++ b/metadata-restli-resource/src/test/java/com/linkedin/metadata/restli/BaseEntitySimpleKeyResourceTest.java @@ -59,7 +59,7 @@ public void testGet() { when(_mockLocalDAO.get(new HashSet<>(Arrays.asList(aspect1Key, aspect2Key)))) .thenReturn(Collections.singletonMap(aspect1Key, Optional.of(foo))); - EntityValue value = runAndWait(_resource.get(id, new String[0])); + EntityValue value = runAndWait(_resource.get(id, null)); assertEquals(value.getFoo(), foo); assertFalse(value.hasBar()); @@ -134,7 +134,7 @@ public void testBatchGet() { when(_mockLocalDAO.get(ImmutableSet.of(aspectFooKey1, aspectBarKey1, aspectFooKey2, aspectBarKey2))).thenReturn( ImmutableMap.of(aspectFooKey1, Optional.of(foo), aspectFooKey2, Optional.of(bar))); - Map keyValueMap = runAndWait(_resource.batchGet(ImmutableSet.of(id1, id2), new String[0])) + Map keyValueMap = runAndWait(_resource.batchGet(ImmutableSet.of(id1, id2), null)) .entrySet() .stream() .collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue)); @@ -204,7 +204,7 @@ public void testGetSnapshotWithAllAspects() { Set> aspectKeys = ImmutableSet.of(fooKey, barKey); when(_mockLocalDAO.get(aspectKeys)).thenReturn(ImmutableMap.of(fooKey, Optional.of(foo), barKey, Optional.of(bar))); - EntitySnapshot snapshot = runAndWait(_resource.getSnapshot(urn.toString(), new String[0])); + EntitySnapshot snapshot = runAndWait(_resource.getSnapshot(urn.toString(), null)); assertEquals(snapshot.getUrn(), urn); @@ -241,7 
+241,7 @@ public void testBackfillAllAspects() { when(_mockLocalDAO.backfill(AspectFoo.class, urn)).thenReturn(Optional.of(foo)); when(_mockLocalDAO.backfill(AspectBar.class, urn)).thenReturn(Optional.of(bar)); - String[] backfilledAspects = runAndWait(_resource.backfill(urn.toString(), new String[0])); + String[] backfilledAspects = runAndWait(_resource.backfill(urn.toString(), null)); assertEquals(ImmutableSet.copyOf(backfilledAspects), ImmutableSet.of(ModelUtils.getAspectName(AspectFoo.class), ModelUtils.getAspectName(AspectBar.class))); diff --git a/metadata-restli-resource/src/test/java/com/linkedin/metadata/restli/BaseSingleAspectEntitySimpleKeyResourceTest.java b/metadata-restli-resource/src/test/java/com/linkedin/metadata/restli/BaseSingleAspectEntitySimpleKeyResourceTest.java index 9a58e3884dda2..6377ffcc94279 100644 --- a/metadata-restli-resource/src/test/java/com/linkedin/metadata/restli/BaseSingleAspectEntitySimpleKeyResourceTest.java +++ b/metadata-restli-resource/src/test/java/com/linkedin/metadata/restli/BaseSingleAspectEntitySimpleKeyResourceTest.java @@ -1,11 +1,20 @@ package com.linkedin.metadata.restli; +import com.google.common.collect.ImmutableList; +import com.linkedin.common.AuditStamp; +import com.linkedin.common.urn.Urn; import com.linkedin.data.template.RecordTemplate; import com.linkedin.metadata.dao.AspectKey; import com.linkedin.metadata.dao.BaseLocalDAO; +import com.linkedin.metadata.dao.ListResult; import com.linkedin.metadata.dao.utils.ModelUtils; +import com.linkedin.metadata.query.ExtraInfo; +import com.linkedin.metadata.query.ExtraInfoArray; +import com.linkedin.metadata.query.ListResultMetadata; import com.linkedin.parseq.BaseEngineTest; import com.linkedin.restli.common.HttpStatus; +import com.linkedin.restli.server.CollectionResult; +import com.linkedin.restli.server.PagingContext; import com.linkedin.restli.server.ResourceContext; import com.linkedin.restli.server.RestLiServiceException; import com.linkedin.testing.AspectBar; 
@@ -27,9 +36,11 @@ import org.testng.annotations.Test; import static com.linkedin.metadata.dao.BaseReadDAO.LATEST_VERSION; +import static com.linkedin.metadata.utils.TestUtils.*; +import static com.linkedin.testing.TestUtils.*; import static org.mockito.Mockito.*; -import static org.testng.Assert.assertEquals; -import static org.testng.Assert.assertTrue; +import static org.testng.Assert.*; + public class BaseSingleAspectEntitySimpleKeyResourceTest extends BaseEngineTest { @@ -50,8 +61,8 @@ public void testGet() throws URISyntaxException { SingleAspectEntityUrn urn = new SingleAspectEntityUrn(id1); AspectBar aspect = new AspectBar().setValue(field1); AspectKey aspectKey = new AspectKey<>(AspectBar.class, urn, LATEST_VERSION); - when(_mockLocalDao.get(Collections.singleton(aspectKey))) - .thenReturn(Collections.singletonMap(aspectKey, Optional.of(aspect))); + when(_mockLocalDao.get(Collections.singleton(aspectKey))).thenReturn( + Collections.singletonMap(aspectKey, Optional.of(aspect))); EntityValue result = runAndWait(_resource.get(id1, new String[0])); @@ -76,8 +87,10 @@ public void testBatchGet() throws URISyntaxException { AspectBar aspect2 = new AspectBar().setValue(field11); AspectKey aspectKey2 = new AspectKey<>(AspectBar.class, urn2, LATEST_VERSION); - Set> keys = new HashSet<>(Arrays.asList(aspectKey1, aspectKey2)); - Map, Optional> keyAspectMap = new HashMap<>(); + Set> keys = + new HashSet<>(Arrays.asList(aspectKey1, aspectKey2)); + Map, Optional> keyAspectMap = + new HashMap<>(); keyAspectMap.put(aspectKey1, Optional.of(aspect1)); keyAspectMap.put(aspectKey2, Optional.of(aspect2)); @@ -98,8 +111,7 @@ public void testGetNotFound() throws URISyntaxException { SingleAspectEntityUrn urn = new SingleAspectEntityUrn(id1); AspectKey aspectKey = new AspectKey<>(AspectBar.class, urn, LATEST_VERSION); - when(_mockLocalDao.get(Collections.singleton(aspectKey))) - .thenReturn(Collections.emptyMap()); + 
when(_mockLocalDao.get(Collections.singleton(aspectKey))).thenReturn(Collections.emptyMap()); try { runAndWait(_resource.get(id1, new String[0])); @@ -114,10 +126,9 @@ public void testIngest() throws URISyntaxException { String field1 = "foo"; SingleAspectEntityUrn urn = new SingleAspectEntityUrn(id1); - AspectBar aspect = new AspectBar() - .setValue(field1); - List aspectUnions = Collections.singletonList( - ModelUtils.newAspectUnion(EntityAspectUnion.class, aspect)); + AspectBar aspect = new AspectBar().setValue(field1); + List aspectUnions = + Collections.singletonList(ModelUtils.newAspectUnion(EntityAspectUnion.class, aspect)); EntitySnapshot snapshot = ModelUtils.newSnapshot(EntitySnapshot.class, urn, aspectUnions); @@ -134,16 +145,15 @@ public void testGetSnapshot() throws URISyntaxException { int field2 = 1000; SingleAspectEntityUrn urn = new SingleAspectEntityUrn(id1); - AspectBar aspect = new AspectBar() - .setValue(field1); - List aspectUnions = Collections.singletonList( - ModelUtils.newAspectUnion(EntityAspectUnion.class, aspect)); + AspectBar aspect = new AspectBar().setValue(field1); + List aspectUnions = + Collections.singletonList(ModelUtils.newAspectUnion(EntityAspectUnion.class, aspect)); AspectKey aspectKey = new AspectKey<>(AspectBar.class, urn, LATEST_VERSION); - when(_mockLocalDao.get(Collections.singleton(aspectKey))) - .thenReturn(Collections.singletonMap(aspectKey, Optional.of(aspect))); + when(_mockLocalDao.get(Collections.singleton(aspectKey))).thenReturn( + Collections.singletonMap(aspectKey, Optional.of(aspect))); - EntitySnapshot resultSnapshot = runAndWait(_resource.getSnapshot(urn.toString(), new String[0])); + EntitySnapshot resultSnapshot = runAndWait(_resource.getSnapshot(urn.toString(), null)); assertEquals(resultSnapshot, ModelUtils.newSnapshot(EntitySnapshot.class, urn, aspectUnions)); } @@ -168,7 +178,7 @@ public void testBackfill() throws URISyntaxException { SingleAspectEntityUrn urn = new SingleAspectEntityUrn(id1); - 
runAndWait(_resource.backfill(urn.toString(), new String[0])); + runAndWait(_resource.backfill(urn.toString(), null)); verify(_mockLocalDao, times(1)).backfill(eq(AspectBar.class), eq(urn)); verifyNoMoreInteractions(_mockLocalDao); } @@ -182,11 +192,32 @@ public void testBackfillWithInvalidUrn() { } } + @Test + public void testGetAll() { + List bars = ImmutableList.of(new AspectBar().setValue("e1"), new AspectBar().setValue("e2")); + ExtraInfo extraInfo1 = makeExtraInfo(makeUrn(1), LATEST_VERSION, makeAuditStamp("bar1")); + ExtraInfo extraInfo2 = makeExtraInfo(makeUrn(2), LATEST_VERSION, makeAuditStamp("bar2")); + ListResultMetadata listResultMetadata = + new ListResultMetadata().setExtraInfos(new ExtraInfoArray(ImmutableList.of(extraInfo1, extraInfo2))); + ListResult listResult = ListResult.builder().values(bars).metadata(listResultMetadata).build(); + + PagingContext pagingContext = new PagingContext(0, 2); + when(_mockLocalDao.list(AspectBar.class, pagingContext.getStart(), pagingContext.getCount())).thenReturn(listResult); + + CollectionResult entities = runAndWait(_resource.getAllWithMetadata(pagingContext)); + assertEquals(entities.getElements(), bars); + assertEquals(entities.getMetadata(), listResultMetadata); + } + + private ExtraInfo makeExtraInfo(Urn urn, Long version, AuditStamp audit) { + return new ExtraInfo().setUrn(urn).setVersion(version).setAudit(audit); + } + /** * Test implementation of BaseSingleAspectEntitySimpleKeyResource. 
* */ - private class TestResource extends BaseSingleAspectEntitySimpleKeyResource< - Long, EntityValue, SingleAspectEntityUrn, AspectBar, EntityAspectUnion, EntitySnapshot> { + private class TestResource extends + BaseSingleAspectEntitySimpleKeyResource { TestResource() { super(AspectBar.class, EntityAspectUnion.class, EntityValue.class, EntitySnapshot.class); diff --git a/metadata-testing/metadata-test-models/src/main/java/com/linkedin/testing/urn/BarUrn.java b/metadata-testing/metadata-test-models/src/main/java/com/linkedin/testing/urn/BarUrn.java index 441fda8254288..4845aa4b4b97a 100644 --- a/metadata-testing/metadata-test-models/src/main/java/com/linkedin/testing/urn/BarUrn.java +++ b/metadata-testing/metadata-test-models/src/main/java/com/linkedin/testing/urn/BarUrn.java @@ -7,12 +7,26 @@ public final class BarUrn extends Urn { public static final String ENTITY_TYPE = "entityBar"; + // Can be obtained via getEntityKey, but not in open source. We need to unify the internal / external URN definitions. + private final int _id; public BarUrn(int id) throws URISyntaxException { super(ENTITY_TYPE, Integer.toString(id)); + this._id = id; } - public static BarUrn createFromString(String rawUrn) throws URISyntaxException { - return new BarUrn(Urn.createFromString(rawUrn).getIdAsInt()); + public int getBarIdEntity() { + return _id; + } + + @Override + public boolean equals(Object obj) { + // Override for FindBugs, but delegate to super implementation, both in open source and internally. 
+ return super.equals(obj); + } + + @Override + public int hashCode() { + return super.hashCode(); } } diff --git a/metadata-testing/metadata-test-models/src/main/java/com/linkedin/testing/urn/BazUrn.java b/metadata-testing/metadata-test-models/src/main/java/com/linkedin/testing/urn/BazUrn.java index 916ffd247b713..58f5b277a0c52 100644 --- a/metadata-testing/metadata-test-models/src/main/java/com/linkedin/testing/urn/BazUrn.java +++ b/metadata-testing/metadata-test-models/src/main/java/com/linkedin/testing/urn/BazUrn.java @@ -7,12 +7,26 @@ public final class BazUrn extends Urn { public static final String ENTITY_TYPE = "entityBaz"; + // Can be obtained via getEntityKey, but not in open source. We need to unify the internal / external URN definitions. + private final int _id; public BazUrn(int id) throws URISyntaxException { super(ENTITY_TYPE, Integer.toString(id)); + this._id = id; } - public static BazUrn createFromString(String rawUrn) throws URISyntaxException { - return new BazUrn(Urn.createFromString(rawUrn).getIdAsInt()); + public int getBazIdEntity() { + return _id; + } + + @Override + public boolean equals(Object obj) { + // Override for FindBugs, but delegate to super implementation, both in open source and internally. + return super.equals(obj); + } + + @Override + public int hashCode() { + return super.hashCode(); } } diff --git a/metadata-testing/metadata-test-models/src/main/java/com/linkedin/testing/urn/FooUrn.java b/metadata-testing/metadata-test-models/src/main/java/com/linkedin/testing/urn/FooUrn.java index c07df418097b0..ec80058748472 100644 --- a/metadata-testing/metadata-test-models/src/main/java/com/linkedin/testing/urn/FooUrn.java +++ b/metadata-testing/metadata-test-models/src/main/java/com/linkedin/testing/urn/FooUrn.java @@ -7,12 +7,26 @@ public final class FooUrn extends Urn { public static final String ENTITY_TYPE = "entityFoo"; + // Can be obtained via getEntityKey, but not in open source. 
We need to unify the internal / external URN definitions. + private final int _id; public FooUrn(int id) throws URISyntaxException { super(ENTITY_TYPE, Integer.toString(id)); + this._id = id; } - public static FooUrn createFromString(String rawUrn) throws URISyntaxException { - return new FooUrn(Urn.createFromString(rawUrn).getIdAsInt()); + public int getFooIdEntity() { + return _id; + } + + @Override + public boolean equals(Object obj) { + // Override for FindBugs, but delegate to super implementation, both in open source and internally. + return super.equals(obj); + } + + @Override + public int hashCode() { + return super.hashCode(); } } diff --git a/metadata-testing/metadata-test-models/src/main/pegasus/com/linkedin/testing/BarSearchDocument.pdl b/metadata-testing/metadata-test-models/src/main/pegasus/com/linkedin/testing/BarSearchDocument.pdl new file mode 100644 index 0000000000000..01315df5f5475 --- /dev/null +++ b/metadata-testing/metadata-test-models/src/main/pegasus/com/linkedin/testing/BarSearchDocument.pdl @@ -0,0 +1,16 @@ +namespace com.linkedin.testing + +/** + * For unit testing + */ +record BarSearchDocument { + /** + * Urn of the entity. + */ + urn: BarUrn + + /** + * Value of the entity. + */ + value: optional string +} \ No newline at end of file