Skip to content

Commit

Permalink
Update metadata-models to head! (datahub-project#1811)
Browse files Browse the repository at this point in the history
metadata-models 80.0.0 -> 90.0.13:

   90.0.13: Roll forward: Fix the open source build by avoiding URN method that isn't part of the open source URN.
    90.0.2: Refactor listUrnsFromIndex method
    90.0.0: Start distinguishing between [] aspects vs null aspects input param
    89.0.4: Fix the open source build by avoiding URN method that isn't part of the open source URN.
    89.0.2: fix some test case name
    89.0.0: META-12686: Made the MXE_v5 topics become strictly ACL'ed to avoid the wildcard write ACL as "MetadataXEvent.+"
    88.0.6: change DAO to take Storage Config as input
    88.0.3: Add a comment on lack of avro generation for MXEv5 + add MXEv5 to the pegasus validation task.
   87.0.15: META-12651: Integrate the metadata-models-ext with metadata-models
   87.0.13: add StorageConfig to Local DAO
    87.0.3: Treat empty aspect vs optional aspect same until all clients are migrated
    87.0.2: Treat empty aspect vs optional aspect differently
    87.0.1: META-12533: Skip processing unregistered aspect specific MAE.
    83.0.6: action method to return list of urns from strong consistent index
    83.0.4: Change input param type for batch backfill
    83.0.3: Implement batch backfill
    83.0.1: Implement support for OR filter in browse query
   82.0.10: Throw UnsupportedOperationException for unsupported condition types in search filter
    82.0.6: Implement local secondary backfilling index as part of backfill method
    82.0.5: [strongly consistent index] implement getUrns method
    82.0.4: Add indexing urn fields to the local secondary index
    82.0.0: Render Delta fiels in the MCE_v5.
    81.0.1: Add pegasus to avro conversion for FMCE
    80.0.4: add get all support for BaseSingleAspectEntitySimpleKeyResource
    80.0.2: Add a BaseSearchWriterDAO with an ESBulkWriterDAO implementation.
    80.0.1: META-12254: Produce aspect specific MAE with always emit option
    80.0.0: Convert getNodesInTraversedPath to getSubgraph to return complete view of the subgraph (nodes+edges)
  • Loading branch information
John Plaisted authored and arunvasudevan committed Sep 10, 2020
1 parent 1863800 commit 17b8161
Show file tree
Hide file tree
Showing 56 changed files with 1,593 additions and 185 deletions.
3 changes: 3 additions & 0 deletions build.gradle
Expand Up @@ -35,6 +35,7 @@ project.ext.externalDependency = [
'ebean': 'io.ebean:ebean:11.33.3',
'ebeanAgent': 'io.ebean:ebean-agent:11.27.1',
'elasticSearchRest': 'org.elasticsearch.client:elasticsearch-rest-high-level-client:5.6.8',
'elasticSearchTransport': 'org.elasticsearch.client:transport:5.6.8',
'findbugsAnnotations': 'com.google.code.findbugs:annotations:3.0.1',
'guice': 'com.google.inject:guice:4.2.2',
'guava': 'com.google.guava:guava:27.0.1-jre',
Expand Down Expand Up @@ -62,6 +63,7 @@ project.ext.externalDependency = [
'parseqTest': 'com.linkedin.parseq:parseq:3.0.7:test',
'playDocs': 'com.typesafe.play:play-docs_2.11:2.6.18',
'playGuice': 'com.typesafe.play:play-guice_2.11:2.6.18',
'playJavaJdbc': 'com.typesafe.play:play-java-jdbc_2.11:2.6.18',
'playTest': 'com.typesafe.play:play-test_2.11:2.6.18',
'postgresql': 'org.postgresql:postgresql:42.2.14',
'reflections': 'org.reflections:reflections:0.9.11',
Expand All @@ -70,6 +72,7 @@ project.ext.externalDependency = [
'springBeans': 'org.springframework:spring-beans:5.2.3.RELEASE',
'springContext': 'org.springframework:spring-context:5.2.3.RELEASE',
'springCore': 'org.springframework:spring-core:5.2.3.RELEASE',
'springJdbc': 'org.springframework:spring-jdbc:5.2.3.RELEASE',
'springWeb': 'org.springframework:spring-web:5.2.3.RELEASE',
'springBootAutoconfigure': 'org.springframework.boot:spring-boot-autoconfigure:2.1.2.RELEASE',
'springBootStarterWeb': 'org.springframework.boot:spring-boot-starter-web:2.1.2.RELEASE',
Expand Down
Expand Up @@ -9,7 +9,6 @@
import com.linkedin.dataset.UpstreamLineage;
import com.linkedin.metadata.dao.BaseLocalDAO;
import com.linkedin.metadata.dao.BaseQueryDAO;
import com.linkedin.metadata.dao.utils.SearchUtils;
import com.linkedin.metadata.entity.DatasetEntity;
import com.linkedin.metadata.query.CriterionArray;
import com.linkedin.metadata.query.Filter;
Expand Down Expand Up @@ -62,7 +61,7 @@ public DownstreamLineageResource() {
@RestMethod.Get
public Task<DownstreamLineage> get(@PathKeysParam @Nonnull PathKeys keys) {
final DatasetUrn datasetUrn = getUrn(keys);
final Filter filter = SearchUtils.getFilter(Collections.singletonMap("upstreams", datasetUrn.toString()));
final Filter filter = newFilter(Collections.singletonMap("upstreams", datasetUrn.toString()));

return RestliUtils.toTask(() -> {
final List<DatasetUrn> downstreamDatasets = _queryDAO
Expand Down Expand Up @@ -97,4 +96,4 @@ private DatasetUrn getUrn(@PathKeysParam @Nonnull PathKeys keys) {
DatasetKey key = keys.<ComplexResourceKey<DatasetKey, EmptyRecord>>get(DATASET_KEY).getKey();
return new DatasetUrn(key.getPlatform(), key.getName(), key.getOrigin());
}
}
}
Expand Up @@ -48,4 +48,4 @@ public void testGetDocumentsToUpdateFromCorpGroupSnapshot() {
assertEquals(actualDocs.get(0).getGroups(), Collections.singletonList(groupName));
assertEquals(actualDocs.get(0).getEmail(), email);
}
}
}
Expand Up @@ -49,4 +49,4 @@ public void testGetDocumentsToUpdateFromCorpUserSnapshot() {
assertEquals(actualDocs.get(2).getSkills(), Collections.emptyList());
assertEquals(actualDocs.get(2).getTeams(), Arrays.asList("team1", "team2"));
}
}
}
Expand Up @@ -9,10 +9,15 @@
import com.linkedin.metadata.dao.producer.BaseMetadataEventProducer;
import com.linkedin.metadata.dao.retention.TimeBasedRetention;
import com.linkedin.metadata.dao.retention.VersionBasedRetention;
import com.linkedin.metadata.dao.storage.LocalDAOStorageConfig;
import com.linkedin.metadata.dao.utils.ModelUtils;
import com.linkedin.metadata.dao.utils.RecordUtils;
import com.linkedin.metadata.query.ExtraInfo;
import com.linkedin.metadata.query.ExtraInfoArray;
import com.linkedin.metadata.query.IndexCriterion;
import com.linkedin.metadata.query.IndexCriterionArray;
import com.linkedin.metadata.query.IndexFilter;
import com.linkedin.metadata.query.IndexValue;
import com.linkedin.metadata.query.ListResultMetadata;
import io.ebean.DuplicateKeyException;
import io.ebean.EbeanServer;
Expand All @@ -25,8 +30,10 @@
import java.net.URISyntaxException;
import java.sql.Timestamp;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.Set;
import java.util.function.Function;
Expand All @@ -35,8 +42,10 @@
import javax.annotation.Nonnull;
import javax.annotation.Nullable;
import javax.persistence.RollbackException;
import lombok.Value;

import static com.linkedin.metadata.dao.EbeanMetadataAspect.*;
import static com.linkedin.metadata.dao.utils.RegisteredUrnPathExtractors.*;


/**
Expand All @@ -46,18 +55,52 @@ public class EbeanLocalDAO<ASPECT_UNION extends UnionTemplate, URN extends Urn>
extends BaseLocalDAO<ASPECT_UNION, URN> {

private static final String EBEAN_MODEL_PACKAGE = EbeanMetadataAspect.class.getPackage().getName();
private static final String EBEAN_INDEX_PACKAGE = EbeanMetadataIndex.class.getPackage().getName();

protected final EbeanServer _server;

@Value
static class GMAIndexPair {
public String valueType;
public Object value;
}

/**
* Constructor for EbeanLocalDAO
*
* @param aspectUnionClass containing union of all supported aspects. Must be a valid aspect union defined in com.linkedin.metadata.aspect
* @param producer {@link BaseMetadataEventProducer} for the metadata event producer
* @param serverConfig {@link ServerConfig} that defines the configuration of EbeanServer instances
*/
public EbeanLocalDAO(@Nonnull Class<ASPECT_UNION> aspectUnionClass, @Nonnull BaseMetadataEventProducer producer,
@Nonnull ServerConfig serverConfig) {
super(aspectUnionClass, producer);
_server = createServer(serverConfig);
}

/**
* Constructor for EbeanLocalDAO
*
* @param producer {@link BaseMetadataEventProducer} for the metadata event producer
* @param serverConfig {@link ServerConfig} that defines the configuration of EbeanServer instances
* @param storageConfig {@link LocalDAOStorageConfig} containing storage config of full list of supported aspects
*/
public EbeanLocalDAO(@Nonnull BaseMetadataEventProducer producer, @Nonnull ServerConfig serverConfig,
@Nonnull LocalDAOStorageConfig storageConfig) {
super(producer, storageConfig);
_server = createServer(serverConfig);
}

@Nonnull
private EbeanServer createServer(@Nonnull ServerConfig serverConfig) {
// Make sure that the serverConfig includes the package that contains DAO's Ebean model.
if (!serverConfig.getPackages().contains(EBEAN_MODEL_PACKAGE)) {
serverConfig.getPackages().add(EBEAN_MODEL_PACKAGE);
}
_server = EbeanServerFactory.create(serverConfig);
if (!serverConfig.getPackages().contains(EBEAN_INDEX_PACKAGE)) {
serverConfig.getPackages().add(EBEAN_INDEX_PACKAGE);
}
return EbeanServerFactory.create(serverConfig);
}

// For testing purpose
Expand All @@ -67,6 +110,12 @@ public EbeanLocalDAO(@Nonnull Class<ASPECT_UNION> aspectUnionClass, @Nonnull Bas
_server = server;
}

// For testing purpose
EbeanLocalDAO(@Nonnull BaseMetadataEventProducer producer, @Nonnull EbeanServer server, @Nonnull LocalDAOStorageConfig storageConfig) {
super(producer, storageConfig);
_server = server;
}

/**
* Return the {@link EbeanServer} server instance used for customized queries.
*/
Expand Down Expand Up @@ -155,6 +204,18 @@ protected <ASPECT extends RecordTemplate> long saveLatest(@Nonnull URN urn, @Non
return largestVersion;
}

@Override
protected <ASPECT extends RecordTemplate> void saveToLocalSecondaryIndex(@Nonnull URN urn,
@Nonnull ASPECT newValue, long version) {

// Process and save URN
// Only do this with the first version of each aspect
if (version == FIRST_VERSION) {
processAndSaveUrnToLocalSecondaryIndex(urn);
}
processAndSaveAspectToLocalSecondaryIndex(urn, newValue);
}

@Override
@Nullable
protected <ASPECT extends RecordTemplate> AspectEntry<ASPECT> getLatest(@Nonnull URN urn,
Expand Down Expand Up @@ -192,6 +253,47 @@ protected void save(@Nonnull URN urn, @Nonnull RecordTemplate value, @Nonnull Au
}
}

protected long saveSingleRecordToLocalSecondaryIndex(@Nonnull URN urn, @Nonnull String aspect,
@Nonnull String path, @Nonnull Object value) {

final EbeanMetadataIndex record = new EbeanMetadataIndex()
.setUrn(urn.toString())
.setAspect(aspect)
.setPath(path);
if (value instanceof Integer || value instanceof Long) {
record.setLongVal(Long.valueOf(value.toString()));
} else if (value instanceof Float || value instanceof Double) {
record.setDoubleVal(Double.valueOf(value.toString()));
} else {
record.setStringVal(value.toString());
}

_server.insert(record);
return record.getId();
}

@Nonnull
Map<Class<? extends RecordTemplate>, LocalDAOStorageConfig.AspectStorageConfig> getStrongConsistentIndexPaths() {
return Collections.unmodifiableMap(new HashMap<>(_storageConfig.getAspectStorageConfigMap()));
}

protected void processAndSaveUrnToLocalSecondaryIndex(@Nonnull URN urn) {
if (existsInLocalSecondaryIndex(urn)) {
return;
}

final Map<String, Object> pathValueMap = getUrnPathExtractor(urn.getClass()).extractPaths(urn);
pathValueMap.forEach(
(path, value) -> saveSingleRecordToLocalSecondaryIndex(urn, urn.getClass().getCanonicalName(), path, value)
);
}

// TODO: Will be implemented later
protected <ASPECT extends RecordTemplate> void processAndSaveAspectToLocalSecondaryIndex(@Nonnull URN urn,
@Nullable ASPECT newValue) {

}

@Override
protected <ASPECT extends RecordTemplate> long getNextVersion(@Nonnull URN urn, @Nonnull Class<ASPECT> aspectClass) {

Expand Down Expand Up @@ -248,6 +350,12 @@ protected <ASPECT extends RecordTemplate> void applyTimeBasedRetention(@Nonnull
.map(record -> toRecordTemplate(key.getAspectClass(), record))));
}

public boolean existsInLocalSecondaryIndex(@Nonnull URN urn) {
return _server.find(EbeanMetadataIndex.class)
.where().eq(URN_COLUMN, urn.toString())
.exists();
}

@Nonnull
private List<EbeanMetadataAspect> batchGet(@Nonnull Set<AspectKey<URN, ? extends RecordTemplate>> keys) {

Expand Down Expand Up @@ -379,15 +487,24 @@ public <ASPECT extends RecordTemplate> ListResult<ASPECT> list(@Nonnull Class<AS
}

@Nonnull
private static Urn extractUrn(@Nonnull EbeanMetadataAspect aspect) {
final String urn = aspect.getKey().getUrn();
private static Urn extractUrn(@Nonnull String urn) {
try {
return new Urn(urn);
} catch (URISyntaxException e) {
throw new ModelConversionException("Invalid URN: " + urn);
}
}

@Nonnull
private static Urn extractUrn(@Nonnull EbeanMetadataAspect aspect) {
return extractUrn(aspect.getKey().getUrn());
}

@Nonnull
private static Urn extractUrn(@Nonnull EbeanMetadataIndex index) {
return extractUrn(index.getUrn());
}

@Nonnull
private static <ASPECT extends RecordTemplate> ASPECT toRecordTemplate(@Nonnull Class<ASPECT> aspectClass,
@Nonnull EbeanMetadataAspect aspect) {
Expand All @@ -396,8 +513,8 @@ private static <ASPECT extends RecordTemplate> ASPECT toRecordTemplate(@Nonnull

@Nonnull
private <T> ListResult<T> toListResult(@Nonnull List<T> values, @Nullable ListResultMetadata listResultMetadata,
@Nonnull PagedList<?> pagedList, int start) {
final int nextStart = pagedList.hasNext() ? start + pagedList.getList().size() : ListResult.INVALID_NEXT_START;
@Nonnull PagedList<?> pagedList, @Nullable Integer start) {
final int nextStart = (start != null && pagedList.hasNext()) ? start.intValue() + pagedList.getList().size() : ListResult.INVALID_NEXT_START;
return ListResult.<T>builder()
// Format
.values(values)
Expand Down Expand Up @@ -460,4 +577,82 @@ public long newNumericId(@Nonnull String namespace, int maxTransactionRetry) {
return id;
}, maxTransactionRetry).getId();
}

@Nonnull
static GMAIndexPair getGMAIndexPair(@Nonnull IndexValue indexValue) {
final Object object;
if (indexValue.isBoolean()) {
object = indexValue.getBoolean().toString();
return new GMAIndexPair(EbeanMetadataIndex.STRING_COLUMN, object);
} else if (indexValue.isDouble()) {
object = indexValue.getDouble();
return new GMAIndexPair(EbeanMetadataIndex.DOUBLE_COLUMN, object);
} else if (indexValue.isFloat()) {
object = indexValue.getFloat();
return new GMAIndexPair(EbeanMetadataIndex.DOUBLE_COLUMN, object);
} else if (indexValue.isInt()) {
object = indexValue.getInt();
return new GMAIndexPair(EbeanMetadataIndex.LONG_COLUMN, object);
} else if (indexValue.isLong()) {
object = indexValue.getLong();
return new GMAIndexPair(EbeanMetadataIndex.LONG_COLUMN, object);
} else if (indexValue.isString()) {
object = indexValue.getString();
return new GMAIndexPair(EbeanMetadataIndex.STRING_COLUMN, object);
} else {
throw new IllegalArgumentException("Invalid index value " + indexValue);
}
}

/**
* Returns list of urns from strongly consistent secondary index that satisfy the given filter conditions.
* Results are sorted in increasing alphabetical order of urn.
* NOTE: Currently this works for only one filter condition
* TODO: Extend the support for multiple filter conditions
*
* @param indexFilter {@link IndexFilter} containing filter conditions to be applied
* @param lastUrn last urn of the previous fetched page. This eliminates the need to use offset which
* is known to slow down performance of MySQL queries. For the first page, this should be set as NULL
* @param pageSize maximum number of distinct urns to return
* @return {@link ListResult} of urns from strongly consistent secondary index that satisfy the given filter conditions
*/
@Override
@Nonnull
public ListResult<Urn> listUrns(@Nonnull IndexFilter indexFilter, @Nullable URN lastUrn, int pageSize) {
if (!isLocalSecondaryIndexEnabled()) {
throw new UnsupportedOperationException("Local secondary index isn't supported by EbeanLocalDAO");
}
final IndexCriterionArray indexCriterionArray = indexFilter.getCriteria();
if (indexCriterionArray.size() == 0) {
throw new UnsupportedOperationException("Empty Index Filter is not supported by EbeanLocalDAO");
}
if (indexCriterionArray.size() > 1) {
throw new UnsupportedOperationException("Currently only one filter condition is supported by EbeanLocalDAO");
}

final IndexCriterion criterion = indexCriterionArray.get(0);
ExpressionList<EbeanMetadataIndex> expressionList = _server.find(EbeanMetadataIndex.class)
.setDistinct(true)
.select(EbeanMetadataIndex.URN_COLUMN)
.where()
.gt(EbeanMetadataIndex.URN_COLUMN, lastUrn == null ? "" : lastUrn.toString())
.eq(EbeanMetadataIndex.ASPECT_COLUMN, criterion.getAspect());
if (criterion.hasPathParams()) {
final GMAIndexPair gmaIndexPair = getGMAIndexPair(criterion.getPathParams().getValue());
expressionList = expressionList
.eq(EbeanMetadataIndex.PATH_COLUMN, criterion.getPathParams().getPath())
.eq(gmaIndexPair.valueType, gmaIndexPair.value);
}
final PagedList<EbeanMetadataIndex> pagedList = expressionList
.orderBy()
.asc(EbeanMetadataIndex.URN_COLUMN)
.setMaxRows(pageSize)
.findPagedList();
final List<Urn> urns = pagedList.getList()
.stream()
.map(EbeanLocalDAO::extractUrn)
.filter(Objects::nonNull)
.collect(Collectors.toList());
return toListResult(urns, null, pagedList, null);
}
}
Expand Up @@ -11,6 +11,7 @@
import lombok.Getter;
import lombok.NonNull;
import lombok.Setter;
import lombok.experimental.Accessors;


@Getter
Expand All @@ -35,6 +36,7 @@
EbeanMetadataIndex.URN_COLUMN
})
@Entity
@Accessors(chain = true)
@Table(name = "metadata_index")
public class EbeanMetadataIndex extends Model {

Expand Down Expand Up @@ -74,5 +76,4 @@ public class EbeanMetadataIndex extends Model {

@Column(name = DOUBLE_COLUMN)
protected Double doubleVal;

}

0 comments on commit 17b8161

Please sign in to comment.