Skip to content

Commit

Permalink
Feature/oss/update to v2 endpoints (datahub-project#4128)
Browse files Browse the repository at this point in the history
  • Loading branch information
RyanHolstien authored and hevandro-veiga committed Feb 18, 2022
1 parent adcca27 commit 2defc19
Show file tree
Hide file tree
Showing 77 changed files with 2,099 additions and 1,809 deletions.
@@ -1,16 +1,20 @@
package com.linkedin.datahub.graphql;

import com.linkedin.common.SubTypes;
import com.linkedin.common.urn.Urn;
import com.linkedin.datahub.graphql.generated.Entity;
import com.linkedin.entity.EntityResponse;
import com.linkedin.entity.client.EntityClient;
import com.linkedin.r2.RemoteInvocationException;
import graphql.schema.DataFetcher;
import graphql.schema.DataFetchingEnvironment;
import java.net.URISyntaxException;
import java.util.Collections;
import java.util.concurrent.CompletableFuture;
import javax.annotation.Nullable;
import lombok.AllArgsConstructor;
import lombok.extern.slf4j.Slf4j;

import java.util.Optional;
import java.util.concurrent.CompletableFuture;

@Slf4j
@AllArgsConstructor
Expand All @@ -21,21 +25,23 @@ public class SubTypesResolver implements DataFetcher<CompletableFuture<SubTypes>
String _aspectName;

@Override
@Nullable
public CompletableFuture<SubTypes> get(DataFetchingEnvironment environment) throws Exception {
return CompletableFuture.supplyAsync(() -> {
final QueryContext context = environment.getContext();
final String urn = ((Entity) environment.getSource()).getUrn();
Optional<SubTypes> subType;
SubTypes subType = null;
final String urnStr = ((Entity) environment.getSource()).getUrn();
try {
subType = _entityClient.getVersionedAspect(urn, _aspectName, 0L, SubTypes.class, context.getAuthentication());
} catch (RemoteInvocationException e) {
throw new RuntimeException("Failed to fetch aspect " + _aspectName + " for urn " + urn + " ", e);
}
if (subType.isPresent()) {
return subType.get();
} else {
return null;
final Urn urn = Urn.createFromString(urnStr);
EntityResponse entityResponse = _entityClient.batchGetV2(urn.getEntityType(), Collections.singleton(urn),
Collections.singleton(_aspectName), context.getAuthentication()).get(urn);
if (entityResponse != null && entityResponse.getAspects().containsKey(_aspectName)) {
subType = new SubTypes(entityResponse.getAspects().get(_aspectName).getValue().data());
}
} catch (RemoteInvocationException | URISyntaxException e) {
throw new RuntimeException("Failed to fetch aspect " + _aspectName + " for urn " + urnStr + " ", e);
}
return subType;
});
}
}
@@ -1,14 +1,15 @@
package com.linkedin.datahub.graphql;

import com.linkedin.common.urn.Urn;
import com.linkedin.data.DataMap;

import com.linkedin.data.codec.JacksonDataCodec;
import com.linkedin.datahub.graphql.generated.AspectParams;
import com.linkedin.datahub.graphql.generated.AspectRenderSpec;
import com.linkedin.datahub.graphql.generated.Entity;
import com.linkedin.datahub.graphql.generated.EntityType;
import com.linkedin.datahub.graphql.generated.RawAspect;
import com.linkedin.datahub.graphql.resolvers.EntityTypeMapper;
import com.linkedin.entity.EntityResponse;
import com.linkedin.entity.client.EntityClient;
import com.linkedin.metadata.models.AspectSpec;
import com.linkedin.metadata.models.EntitySpec;
Expand All @@ -17,7 +18,9 @@
import graphql.schema.DataFetcher;
import graphql.schema.DataFetchingEnvironment;
import java.io.IOException;
import java.net.URISyntaxException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import lombok.AllArgsConstructor;
Expand All @@ -44,17 +47,24 @@ public CompletableFuture<List<RawAspect>> get(DataFetchingEnvironment environmen
List<RawAspect> results = new ArrayList<>();

final QueryContext context = environment.getContext();
final String urn = ((Entity) environment.getSource()).getUrn();
final String urnStr = ((Entity) environment.getSource()).getUrn();
final EntityType entityType = ((Entity) environment.getSource()).getType();
final String entityTypeName = EntityTypeMapper.getName(entityType);
final AspectParams input = bindArgument(environment.getArgument("input"), AspectParams.class);

EntitySpec entitySpec = _entityRegistry.getEntitySpec(entityTypeName);
entitySpec.getAspectSpecs().stream().filter(aspectSpec -> shouldReturnAspect(aspectSpec, input)).forEach(aspectSpec -> {
try {
Urn urn = Urn.createFromString(urnStr);
RawAspect result = new RawAspect();
DataMap resolvedAspect =
_entityClient.getRawAspect(urn, aspectSpec.getName(), 0L, context.getAuthentication());
EntityResponse entityResponse =
_entityClient.batchGetV2(urn.getEntityType(), Collections.singleton(urn),
Collections.singleton(aspectSpec.getName()), context.getAuthentication()).get(urn);
if (entityResponse == null || !entityResponse.getAspects().containsKey(aspectSpec.getName())) {
return;
}

DataMap resolvedAspect = entityResponse.getAspects().get(aspectSpec.getName()).getValue().data();
if (resolvedAspect == null || resolvedAspect.keySet().size() != 1) {
return;
}
Expand All @@ -74,8 +84,8 @@ public CompletableFuture<List<RawAspect>> get(DataFetchingEnvironment environmen
result.setRenderSpec(resultRenderSpec);

results.add(result);
} catch (IOException | RemoteInvocationException e) {
throw new RuntimeException("Failed to fetch aspect " + aspectSpec.getName() + " for urn " + urn + " ", e);
} catch (IOException | RemoteInvocationException | URISyntaxException e) {
throw new RuntimeException("Failed to fetch aspect " + aspectSpec.getName() + " for urn " + urnStr + " ", e);
}
});
return results;
Expand Down
Expand Up @@ -5,21 +5,23 @@
import com.datahub.authorization.Authorizer;
import com.linkedin.common.urn.Urn;
import com.linkedin.datahub.graphql.QueryContext;
import com.linkedin.datahub.graphql.generated.AuthenticatedUser;
import com.linkedin.datahub.graphql.generated.CorpUser;
import com.linkedin.datahub.graphql.generated.PlatformPrivileges;
import com.linkedin.datahub.graphql.types.corpuser.mappers.CorpUserMapper;
import com.linkedin.entity.EntityResponse;
import com.linkedin.entity.client.EntityClient;
import com.linkedin.metadata.authorization.PoliciesConfig;
import com.linkedin.datahub.graphql.generated.AuthenticatedUser;
import com.linkedin.datahub.graphql.generated.CorpUser;
import com.linkedin.datahub.graphql.types.corpuser.mappers.CorpUserSnapshotMapper;
import com.linkedin.metadata.snapshot.CorpUserSnapshot;
import com.linkedin.r2.RemoteInvocationException;
import graphql.schema.DataFetcher;
import graphql.schema.DataFetchingEnvironment;
import java.net.URISyntaxException;
import java.util.Collections;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;

import static com.linkedin.datahub.graphql.resolvers.ingest.IngestionAuthUtils.*;
import static com.linkedin.metadata.Constants.*;


/**
Expand All @@ -45,10 +47,9 @@ public CompletableFuture<AuthenticatedUser> get(DataFetchingEnvironment environm
try {
// 1. Get currently logged in user profile.
final Urn userUrn = Urn.createFromString(context.getActorUrn());
final CorpUserSnapshot gmsUser = _entityClient.get(userUrn, context.getAuthentication())
.getValue()
.getCorpUserSnapshot();
final CorpUser corpUser = CorpUserSnapshotMapper.map(gmsUser);
final EntityResponse gmsUser = _entityClient.batchGetV2(CORP_USER_ENTITY_NAME,
Collections.singleton(userUrn), null, context.getAuthentication()).get(userUrn);
final CorpUser corpUser = CorpUserMapper.map(gmsUser);

// 2. Get platform privileges
final PlatformPrivileges platformPrivileges = new PlatformPrivileges();
Expand Down
Expand Up @@ -8,19 +8,20 @@
import com.linkedin.datahub.graphql.exception.DataHubGraphQLErrorCode;
import com.linkedin.datahub.graphql.exception.DataHubGraphQLException;
import com.linkedin.datahub.graphql.generated.AddGroupMembersInput;
import com.linkedin.entity.EntityResponse;
import com.linkedin.entity.client.EntityClient;
import com.linkedin.events.metadata.ChangeType;
import com.linkedin.identity.GroupMembership;
import com.linkedin.metadata.Constants;
import com.linkedin.metadata.aspect.VersionedAspect;
import com.linkedin.metadata.utils.GenericAspectUtils;
import com.linkedin.mxe.MetadataChangeProposal;
import graphql.schema.DataFetcher;
import graphql.schema.DataFetchingEnvironment;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.CompletableFuture;

import static com.linkedin.datahub.graphql.resolvers.ResolverUtils.*;
import static com.linkedin.metadata.Constants.*;


/**
Expand Down Expand Up @@ -51,9 +52,9 @@ public CompletableFuture<Boolean> get(final DataFetchingEnvironment environment)
}
})
.thenApply(ignored -> CompletableFuture.allOf(
userUrnStrs.stream().map(userUrnStr -> CompletableFuture.runAsync(() -> {
addUserToGroup(userUrnStr, groupUrnStr, context);
})).toArray(CompletableFuture[]::new)))
userUrnStrs.stream().map(userUrnStr -> CompletableFuture.runAsync(() ->
addUserToGroup(userUrnStr, groupUrnStr, context)
)).toArray(CompletableFuture[]::new)))
.thenApply((ignored) -> Boolean.TRUE);
}
throw new AuthorizationException("Unauthorized to perform this action. Please contact your DataHub administrator.");
Expand All @@ -62,12 +63,13 @@ public CompletableFuture<Boolean> get(final DataFetchingEnvironment environment)
private void addUserToGroup(final String userUrnStr, final String groupUrnStr, final QueryContext context) {
try {
// First, fetch user's group membership aspect.
final VersionedAspect gmsAspect =
_entityClient.getAspectOrNull(userUrnStr, Constants.GROUP_MEMBERSHIP_ASPECT_NAME,
Constants.ASPECT_LATEST_VERSION, context.getAuthentication());
Urn userUrn = Urn.createFromString(userUrnStr);
final EntityResponse entityResponse =
_entityClient.batchGetV2(CORP_USER_ENTITY_NAME, Collections.singleton(userUrn),
Collections.singleton(GROUP_MEMBERSHIP_ASPECT_NAME), context.getAuthentication()).get(userUrn);

GroupMembership groupMembership;
if (gmsAspect == null) {
if (entityResponse == null || !entityResponse.getAspects().containsKey(GROUP_MEMBERSHIP_ASPECT_NAME)) {
// Verify the user exists
if (!userExists(userUrnStr, context)) {
throw new DataHubGraphQLException("Failed to add member to group. User does not exist.", DataHubGraphQLErrorCode.NOT_FOUND);
Expand All @@ -76,7 +78,8 @@ private void addUserToGroup(final String userUrnStr, final String groupUrnStr, f
groupMembership = new GroupMembership();
groupMembership.setGroups(new UrnArray());
} else {
groupMembership = gmsAspect.getAspect().getGroupMembership();
groupMembership = new GroupMembership(entityResponse.getAspects()
.get(GROUP_MEMBERSHIP_ASPECT_NAME).getValue().data());
}
// Handle the duplicate case.
final Urn groupUrn = Urn.createFromString(groupUrnStr);
Expand All @@ -86,8 +89,8 @@ private void addUserToGroup(final String userUrnStr, final String groupUrnStr, f
// Finally, create the MetadataChangeProposal.
final MetadataChangeProposal proposal = new MetadataChangeProposal();
proposal.setEntityUrn(Urn.createFromString(userUrnStr));
proposal.setEntityType(Constants.CORP_USER_ENTITY_NAME);
proposal.setAspectName(Constants.GROUP_MEMBERSHIP_ASPECT_NAME);
proposal.setEntityType(CORP_USER_ENTITY_NAME);
proposal.setAspectName(GROUP_MEMBERSHIP_ASPECT_NAME);
proposal.setAspect(GenericAspectUtils.serializeAspect(groupMembership));
proposal.setChangeType(ChangeType.UPSERT);
_entityClient.ingestProposal(proposal, context.getAuthentication());
Expand All @@ -98,25 +101,27 @@ private void addUserToGroup(final String userUrnStr, final String groupUrnStr, f

private boolean groupExists(final String groupUrnStr, final QueryContext context) {
try {
final VersionedAspect keyAspect = _entityClient.getAspectOrNull(
groupUrnStr,
Constants.CORP_GROUP_KEY_ASPECT_NAME,
Constants.ASPECT_LATEST_VERSION,
context.getAuthentication());
return keyAspect != null;
Urn groupUrn = Urn.createFromString(groupUrnStr);
final EntityResponse entityResponse = _entityClient.batchGetV2(
CORP_GROUP_ENTITY_NAME,
Collections.singleton(groupUrn),
Collections.singleton(CORP_GROUP_KEY_ASPECT_NAME),
context.getAuthentication()).get(groupUrn);
return entityResponse != null && entityResponse.getAspects().containsKey(CORP_GROUP_KEY_ASPECT_NAME);
} catch (Exception e) {
throw new DataHubGraphQLException("Failed to fetch group!", DataHubGraphQLErrorCode.SERVER_ERROR);
}
}

private boolean userExists(final String userUrnStr, final QueryContext context) {
try {
final VersionedAspect keyAspect = _entityClient.getAspectOrNull(
userUrnStr,
Constants.CORP_USER_KEY_ASPECT_NAME,
Constants.ASPECT_LATEST_VERSION,
context.getAuthentication());
return keyAspect != null;
Urn userUrn = Urn.createFromString(userUrnStr);
final EntityResponse entityResponse = _entityClient.batchGetV2(
CORP_USER_ENTITY_NAME,
Collections.singleton(userUrn),
Collections.singleton(CORP_USER_KEY_ASPECT_NAME),
context.getAuthentication()).get(userUrn);
return entityResponse != null && entityResponse.getAspects().containsKey(CORP_USER_KEY_ASPECT_NAME);
} catch (Exception e) {
throw new DataHubGraphQLException("Failed to fetch user!", DataHubGraphQLErrorCode.SERVER_ERROR);
}
Expand Down
Expand Up @@ -7,23 +7,23 @@
import com.linkedin.datahub.graphql.generated.CorpGroup;
import com.linkedin.datahub.graphql.generated.ListGroupsInput;
import com.linkedin.datahub.graphql.generated.ListGroupsResult;
import com.linkedin.datahub.graphql.types.corpgroup.mappers.CorpGroupSnapshotMapper;
import com.linkedin.entity.Entity;
import com.linkedin.datahub.graphql.types.corpgroup.mappers.CorpGroupMapper;
import com.linkedin.entity.EntityResponse;
import com.linkedin.entity.client.EntityClient;
import com.linkedin.metadata.Constants;
import com.linkedin.metadata.query.ListResult;
import com.linkedin.metadata.snapshot.CorpGroupSnapshot;
import graphql.schema.DataFetcher;
import graphql.schema.DataFetchingEnvironment;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.stream.Collectors;

import static com.linkedin.datahub.graphql.resolvers.ResolverUtils.*;
import static com.linkedin.metadata.Constants.*;


public class ListGroupsResolver implements DataFetcher<CompletableFuture<ListGroupsResult>> {

Expand All @@ -50,10 +50,11 @@ public CompletableFuture<ListGroupsResult> get(final DataFetchingEnvironment env
try {
// First, get all group Urns.
final ListResult gmsResult =
_entityClient.list(Constants.CORP_GROUP_ENTITY_NAME, Collections.emptyMap(), start, count, context.getAuthentication());
_entityClient.list(CORP_GROUP_ENTITY_NAME, Collections.emptyMap(), start, count, context.getAuthentication());

// Then, get hydrate all groups.
final Map<Urn, Entity> entities = _entityClient.batchGet(new HashSet<>(gmsResult.getEntities()), context.getAuthentication());
final Map<Urn, EntityResponse> entities = _entityClient.batchGetV2(CORP_GROUP_ENTITY_NAME,
new HashSet<>(gmsResult.getEntities()), null, context.getAuthentication());

// Now that we have entities we can bind this to a result.
final ListGroupsResult result = new ListGroupsResult();
Expand All @@ -70,16 +71,9 @@ public CompletableFuture<ListGroupsResult> get(final DataFetchingEnvironment env
throw new AuthorizationException("Unauthorized to perform this action. Please contact your DataHub administrator.");
}

private List<CorpGroup> mapEntities(final Collection<Entity> entities) {
final List<CorpGroup> results = new ArrayList<>();
for (final Entity entity : entities) {
final CorpGroupSnapshot snapshot = entity.getValue().getCorpGroupSnapshot();
results.add(mapCorpGroupSnapshot(snapshot));
}
return results;
}

private CorpGroup mapCorpGroupSnapshot(final CorpGroupSnapshot snapshot) {
return CorpGroupSnapshotMapper.map(snapshot);
private List<CorpGroup> mapEntities(final Collection<EntityResponse> entities) {
return entities.stream()
.map(CorpGroupMapper::map)
.collect(Collectors.toList());
}
}

0 comments on commit 2defc19

Please sign in to comment.