Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

sync: Bulk push metadata-models changes #1811

Merged
merged 1 commit into from Aug 19, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
3 changes: 3 additions & 0 deletions build.gradle
Expand Up @@ -35,6 +35,7 @@ project.ext.externalDependency = [
'ebean': 'io.ebean:ebean:11.33.3',
'ebeanAgent': 'io.ebean:ebean-agent:11.27.1',
'elasticSearchRest': 'org.elasticsearch.client:elasticsearch-rest-high-level-client:5.6.8',
'elasticSearchTransport': 'org.elasticsearch.client:transport:5.6.8',
'findbugsAnnotations': 'com.google.code.findbugs:annotations:3.0.1',
'guice': 'com.google.inject:guice:4.2.2',
'guava': 'com.google.guava:guava:27.0.1-jre',
Expand Down Expand Up @@ -62,6 +63,7 @@ project.ext.externalDependency = [
'parseqTest': 'com.linkedin.parseq:parseq:3.0.7:test',
'playDocs': 'com.typesafe.play:play-docs_2.11:2.6.18',
'playGuice': 'com.typesafe.play:play-guice_2.11:2.6.18',
'playJavaJdbc': 'com.typesafe.play:play-java-jdbc_2.11:2.6.18',
'playTest': 'com.typesafe.play:play-test_2.11:2.6.18',
'postgresql': 'org.postgresql:postgresql:42.2.14',
'reflections': 'org.reflections:reflections:0.9.11',
Expand All @@ -70,6 +72,7 @@ project.ext.externalDependency = [
'springBeans': 'org.springframework:spring-beans:5.2.3.RELEASE',
'springContext': 'org.springframework:spring-context:5.2.3.RELEASE',
'springCore': 'org.springframework:spring-core:5.2.3.RELEASE',
'springJdbc': 'org.springframework:spring-jdbc:5.2.3.RELEASE',
'springWeb': 'org.springframework:spring-web:5.2.3.RELEASE',
'springBootAutoconfigure': 'org.springframework.boot:spring-boot-autoconfigure:2.1.2.RELEASE',
'springBootStarterWeb': 'org.springframework.boot:spring-boot-starter-web:2.1.2.RELEASE',
Expand Down
Expand Up @@ -9,7 +9,6 @@
import com.linkedin.dataset.UpstreamLineage;
import com.linkedin.metadata.dao.BaseLocalDAO;
import com.linkedin.metadata.dao.BaseQueryDAO;
import com.linkedin.metadata.dao.utils.SearchUtils;
import com.linkedin.metadata.entity.DatasetEntity;
import com.linkedin.metadata.query.CriterionArray;
import com.linkedin.metadata.query.Filter;
Expand Down Expand Up @@ -62,7 +61,7 @@ public DownstreamLineageResource() {
@RestMethod.Get
public Task<DownstreamLineage> get(@PathKeysParam @Nonnull PathKeys keys) {
final DatasetUrn datasetUrn = getUrn(keys);
final Filter filter = SearchUtils.getFilter(Collections.singletonMap("upstreams", datasetUrn.toString()));
final Filter filter = newFilter(Collections.singletonMap("upstreams", datasetUrn.toString()));

return RestliUtils.toTask(() -> {
final List<DatasetUrn> downstreamDatasets = _queryDAO
Expand Down Expand Up @@ -97,4 +96,4 @@ private DatasetUrn getUrn(@PathKeysParam @Nonnull PathKeys keys) {
DatasetKey key = keys.<ComplexResourceKey<DatasetKey, EmptyRecord>>get(DATASET_KEY).getKey();
return new DatasetUrn(key.getPlatform(), key.getName(), key.getOrigin());
}
}
}
Expand Up @@ -48,4 +48,4 @@ public void testGetDocumentsToUpdateFromCorpGroupSnapshot() {
assertEquals(actualDocs.get(0).getGroups(), Collections.singletonList(groupName));
assertEquals(actualDocs.get(0).getEmail(), email);
}
}
}
Expand Up @@ -49,4 +49,4 @@ public void testGetDocumentsToUpdateFromCorpUserSnapshot() {
assertEquals(actualDocs.get(2).getSkills(), Collections.emptyList());
assertEquals(actualDocs.get(2).getTeams(), Arrays.asList("team1", "team2"));
}
}
}
Expand Up @@ -9,10 +9,15 @@
import com.linkedin.metadata.dao.producer.BaseMetadataEventProducer;
import com.linkedin.metadata.dao.retention.TimeBasedRetention;
import com.linkedin.metadata.dao.retention.VersionBasedRetention;
import com.linkedin.metadata.dao.storage.LocalDAOStorageConfig;
import com.linkedin.metadata.dao.utils.ModelUtils;
import com.linkedin.metadata.dao.utils.RecordUtils;
import com.linkedin.metadata.query.ExtraInfo;
import com.linkedin.metadata.query.ExtraInfoArray;
import com.linkedin.metadata.query.IndexCriterion;
import com.linkedin.metadata.query.IndexCriterionArray;
import com.linkedin.metadata.query.IndexFilter;
import com.linkedin.metadata.query.IndexValue;
import com.linkedin.metadata.query.ListResultMetadata;
import io.ebean.DuplicateKeyException;
import io.ebean.EbeanServer;
Expand All @@ -25,8 +30,10 @@
import java.net.URISyntaxException;
import java.sql.Timestamp;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.Set;
import java.util.function.Function;
Expand All @@ -35,8 +42,10 @@
import javax.annotation.Nonnull;
import javax.annotation.Nullable;
import javax.persistence.RollbackException;
import lombok.Value;

import static com.linkedin.metadata.dao.EbeanMetadataAspect.*;
import static com.linkedin.metadata.dao.utils.RegisteredUrnPathExtractors.*;


/**
Expand All @@ -46,18 +55,52 @@ public class EbeanLocalDAO<ASPECT_UNION extends UnionTemplate, URN extends Urn>
extends BaseLocalDAO<ASPECT_UNION, URN> {

private static final String EBEAN_MODEL_PACKAGE = EbeanMetadataAspect.class.getPackage().getName();
private static final String EBEAN_INDEX_PACKAGE = EbeanMetadataIndex.class.getPackage().getName();

protected final EbeanServer _server;

@Value
static class GMAIndexPair {
public String valueType;
public Object value;
}

/**
* Constructor for EbeanLocalDAO
*
* @param aspectUnionClass containing union of all supported aspects. Must be a valid aspect union defined in com.linkedin.metadata.aspect
* @param producer {@link BaseMetadataEventProducer} for the metadata event producer
* @param serverConfig {@link ServerConfig} that defines the configuration of EbeanServer instances
*/
public EbeanLocalDAO(@Nonnull Class<ASPECT_UNION> aspectUnionClass, @Nonnull BaseMetadataEventProducer producer,
@Nonnull ServerConfig serverConfig) {
super(aspectUnionClass, producer);
_server = createServer(serverConfig);
}

/**
* Constructor for EbeanLocalDAO
*
* @param producer {@link BaseMetadataEventProducer} for the metadata event producer
* @param serverConfig {@link ServerConfig} that defines the configuration of EbeanServer instances
* @param storageConfig {@link LocalDAOStorageConfig} containing storage config of full list of supported aspects
*/
public EbeanLocalDAO(@Nonnull BaseMetadataEventProducer producer, @Nonnull ServerConfig serverConfig,
@Nonnull LocalDAOStorageConfig storageConfig) {
super(producer, storageConfig);
_server = createServer(serverConfig);
}

@Nonnull
private EbeanServer createServer(@Nonnull ServerConfig serverConfig) {
// Make sure that the serverConfig includes the package that contains DAO's Ebean model.
if (!serverConfig.getPackages().contains(EBEAN_MODEL_PACKAGE)) {
serverConfig.getPackages().add(EBEAN_MODEL_PACKAGE);
}
_server = EbeanServerFactory.create(serverConfig);
if (!serverConfig.getPackages().contains(EBEAN_INDEX_PACKAGE)) {
serverConfig.getPackages().add(EBEAN_INDEX_PACKAGE);
}
return EbeanServerFactory.create(serverConfig);
}

// For testing purpose
Expand All @@ -67,6 +110,12 @@ public EbeanLocalDAO(@Nonnull Class<ASPECT_UNION> aspectUnionClass, @Nonnull Bas
_server = server;
}

// For testing purpose
EbeanLocalDAO(@Nonnull BaseMetadataEventProducer producer, @Nonnull EbeanServer server, @Nonnull LocalDAOStorageConfig storageConfig) {
super(producer, storageConfig);
_server = server;
}

/**
* Return the {@link EbeanServer} server instance used for customized queries.
*/
Expand Down Expand Up @@ -155,6 +204,18 @@ protected <ASPECT extends RecordTemplate> long saveLatest(@Nonnull URN urn, @Non
return largestVersion;
}

@Override
protected <ASPECT extends RecordTemplate> void saveToLocalSecondaryIndex(@Nonnull URN urn,
@Nonnull ASPECT newValue, long version) {

// Process and save URN
// Only do this with the first version of each aspect
if (version == FIRST_VERSION) {
processAndSaveUrnToLocalSecondaryIndex(urn);
}
processAndSaveAspectToLocalSecondaryIndex(urn, newValue);
}

@Override
@Nullable
protected <ASPECT extends RecordTemplate> AspectEntry<ASPECT> getLatest(@Nonnull URN urn,
Expand Down Expand Up @@ -192,6 +253,47 @@ protected void save(@Nonnull URN urn, @Nonnull RecordTemplate value, @Nonnull Au
}
}

protected long saveSingleRecordToLocalSecondaryIndex(@Nonnull URN urn, @Nonnull String aspect,
@Nonnull String path, @Nonnull Object value) {

final EbeanMetadataIndex record = new EbeanMetadataIndex()
.setUrn(urn.toString())
.setAspect(aspect)
.setPath(path);
if (value instanceof Integer || value instanceof Long) {
record.setLongVal(Long.valueOf(value.toString()));
} else if (value instanceof Float || value instanceof Double) {
record.setDoubleVal(Double.valueOf(value.toString()));
} else {
record.setStringVal(value.toString());
}

_server.insert(record);
return record.getId();
}

@Nonnull
Map<Class<? extends RecordTemplate>, LocalDAOStorageConfig.AspectStorageConfig> getStrongConsistentIndexPaths() {
return Collections.unmodifiableMap(new HashMap<>(_storageConfig.getAspectStorageConfigMap()));
}

protected void processAndSaveUrnToLocalSecondaryIndex(@Nonnull URN urn) {
if (existsInLocalSecondaryIndex(urn)) {
return;
}

final Map<String, Object> pathValueMap = getUrnPathExtractor(urn.getClass()).extractPaths(urn);
pathValueMap.forEach(
(path, value) -> saveSingleRecordToLocalSecondaryIndex(urn, urn.getClass().getCanonicalName(), path, value)
);
}

// TODO: Will be implemented later
protected <ASPECT extends RecordTemplate> void processAndSaveAspectToLocalSecondaryIndex(@Nonnull URN urn,
@Nullable ASPECT newValue) {

}

@Override
protected <ASPECT extends RecordTemplate> long getNextVersion(@Nonnull URN urn, @Nonnull Class<ASPECT> aspectClass) {

Expand Down Expand Up @@ -248,6 +350,12 @@ protected <ASPECT extends RecordTemplate> void applyTimeBasedRetention(@Nonnull
.map(record -> toRecordTemplate(key.getAspectClass(), record))));
}

public boolean existsInLocalSecondaryIndex(@Nonnull URN urn) {
return _server.find(EbeanMetadataIndex.class)
.where().eq(URN_COLUMN, urn.toString())
.exists();
}

@Nonnull
private List<EbeanMetadataAspect> batchGet(@Nonnull Set<AspectKey<URN, ? extends RecordTemplate>> keys) {

Expand Down Expand Up @@ -379,15 +487,24 @@ public <ASPECT extends RecordTemplate> ListResult<ASPECT> list(@Nonnull Class<AS
}

@Nonnull
private static Urn extractUrn(@Nonnull EbeanMetadataAspect aspect) {
final String urn = aspect.getKey().getUrn();
private static Urn extractUrn(@Nonnull String urn) {
try {
return new Urn(urn);
} catch (URISyntaxException e) {
throw new ModelConversionException("Invalid URN: " + urn);
}
}

@Nonnull
private static Urn extractUrn(@Nonnull EbeanMetadataAspect aspect) {
return extractUrn(aspect.getKey().getUrn());
}

@Nonnull
private static Urn extractUrn(@Nonnull EbeanMetadataIndex index) {
return extractUrn(index.getUrn());
}

@Nonnull
private static <ASPECT extends RecordTemplate> ASPECT toRecordTemplate(@Nonnull Class<ASPECT> aspectClass,
@Nonnull EbeanMetadataAspect aspect) {
Expand All @@ -396,8 +513,8 @@ private static <ASPECT extends RecordTemplate> ASPECT toRecordTemplate(@Nonnull

@Nonnull
private <T> ListResult<T> toListResult(@Nonnull List<T> values, @Nullable ListResultMetadata listResultMetadata,
@Nonnull PagedList<?> pagedList, int start) {
final int nextStart = pagedList.hasNext() ? start + pagedList.getList().size() : ListResult.INVALID_NEXT_START;
@Nonnull PagedList<?> pagedList, @Nullable Integer start) {
final int nextStart = (start != null && pagedList.hasNext()) ? start.intValue() + pagedList.getList().size() : ListResult.INVALID_NEXT_START;
return ListResult.<T>builder()
// Format
.values(values)
Expand Down Expand Up @@ -460,4 +577,82 @@ public long newNumericId(@Nonnull String namespace, int maxTransactionRetry) {
return id;
}, maxTransactionRetry).getId();
}

@Nonnull
static GMAIndexPair getGMAIndexPair(@Nonnull IndexValue indexValue) {
final Object object;
if (indexValue.isBoolean()) {
object = indexValue.getBoolean().toString();
return new GMAIndexPair(EbeanMetadataIndex.STRING_COLUMN, object);
} else if (indexValue.isDouble()) {
object = indexValue.getDouble();
return new GMAIndexPair(EbeanMetadataIndex.DOUBLE_COLUMN, object);
} else if (indexValue.isFloat()) {
object = indexValue.getFloat();
return new GMAIndexPair(EbeanMetadataIndex.DOUBLE_COLUMN, object);
} else if (indexValue.isInt()) {
object = indexValue.getInt();
return new GMAIndexPair(EbeanMetadataIndex.LONG_COLUMN, object);
} else if (indexValue.isLong()) {
object = indexValue.getLong();
return new GMAIndexPair(EbeanMetadataIndex.LONG_COLUMN, object);
} else if (indexValue.isString()) {
object = indexValue.getString();
return new GMAIndexPair(EbeanMetadataIndex.STRING_COLUMN, object);
} else {
throw new IllegalArgumentException("Invalid index value " + indexValue);
}
}

/**
* Returns list of urns from strongly consistent secondary index that satisfy the given filter conditions.
* Results are sorted in increasing alphabetical order of urn.
* NOTE: Currently this works for only one filter condition
* TODO: Extend the support for multiple filter conditions
*
* @param indexFilter {@link IndexFilter} containing filter conditions to be applied
* @param lastUrn last urn of the previous fetched page. This eliminates the need to use offset which
* is known to slow down performance of MySQL queries. For the first page, this should be set as NULL
* @param pageSize maximum number of distinct urns to return
* @return {@link ListResult} of urns from strongly consistent secondary index that satisfy the given filter conditions
*/
@Override
@Nonnull
public ListResult<Urn> listUrns(@Nonnull IndexFilter indexFilter, @Nullable URN lastUrn, int pageSize) {
if (!isLocalSecondaryIndexEnabled()) {
throw new UnsupportedOperationException("Local secondary index isn't supported by EbeanLocalDAO");
}
final IndexCriterionArray indexCriterionArray = indexFilter.getCriteria();
if (indexCriterionArray.size() == 0) {
throw new UnsupportedOperationException("Empty Index Filter is not supported by EbeanLocalDAO");
}
if (indexCriterionArray.size() > 1) {
throw new UnsupportedOperationException("Currently only one filter condition is supported by EbeanLocalDAO");
}

final IndexCriterion criterion = indexCriterionArray.get(0);
ExpressionList<EbeanMetadataIndex> expressionList = _server.find(EbeanMetadataIndex.class)
.setDistinct(true)
.select(EbeanMetadataIndex.URN_COLUMN)
.where()
.gt(EbeanMetadataIndex.URN_COLUMN, lastUrn == null ? "" : lastUrn.toString())
.eq(EbeanMetadataIndex.ASPECT_COLUMN, criterion.getAspect());
if (criterion.hasPathParams()) {
final GMAIndexPair gmaIndexPair = getGMAIndexPair(criterion.getPathParams().getValue());
expressionList = expressionList
.eq(EbeanMetadataIndex.PATH_COLUMN, criterion.getPathParams().getPath())
.eq(gmaIndexPair.valueType, gmaIndexPair.value);
}
final PagedList<EbeanMetadataIndex> pagedList = expressionList
.orderBy()
.asc(EbeanMetadataIndex.URN_COLUMN)
.setMaxRows(pageSize)
.findPagedList();
final List<Urn> urns = pagedList.getList()
.stream()
.map(EbeanLocalDAO::extractUrn)
.filter(Objects::nonNull)
.collect(Collectors.toList());
return toListResult(urns, null, pagedList, null);
}
}
Expand Up @@ -11,6 +11,7 @@
import lombok.Getter;
import lombok.NonNull;
import lombok.Setter;
import lombok.experimental.Accessors;


@Getter
Expand All @@ -35,6 +36,7 @@
EbeanMetadataIndex.URN_COLUMN
})
@Entity
@Accessors(chain = true)
@Table(name = "metadata_index")
public class EbeanMetadataIndex extends Model {

Expand Down Expand Up @@ -74,5 +76,4 @@ public class EbeanMetadataIndex extends Model {

@Column(name = DOUBLE_COLUMN)
protected Double doubleVal;

}