Skip to content

Commit

Permalink
fix(deletes): Fixing system metadata index deletes (#3693)
Browse files Browse the repository at this point in the history
  • Loading branch information
jjoyce0510 committed Dec 8, 2021
1 parent 1a5121a commit 3bac7f7
Show file tree
Hide file tree
Showing 5 changed files with 45 additions and 13 deletions.
Expand Up @@ -90,6 +90,30 @@ public BulkByScrollResponse deleteByUrn(@Nonnull final String urn) {
return null;
}

public BulkByScrollResponse deleteByUrnAspect(
@Nonnull final String urn,
@Nonnull final String aspect
) {
BoolQueryBuilder finalQuery = QueryBuilders.boolQuery();
finalQuery.must(QueryBuilders.termQuery("urn", urn));
finalQuery.must(QueryBuilders.termQuery("aspect", aspect));

DeleteByQueryRequest deleteByQueryRequest = new DeleteByQueryRequest();

deleteByQueryRequest.setQuery(finalQuery);

deleteByQueryRequest.indices(indexConvention.getIndexName(INDEX_NAME));

try {
final BulkByScrollResponse deleteResponse = client.deleteByQuery(deleteByQueryRequest, RequestOptions.DEFAULT);
return deleteResponse;
} catch (IOException e) {
log.error("ERROR: Failed to delete by query. See stacktrace for a more detailed error:");
e.printStackTrace();
}
return null;
}

public SearchResponse findByParams(Map<String, String> searchParams) {
SearchRequest searchRequest = new SearchRequest();

Expand Down
Expand Up @@ -8,11 +8,11 @@
import com.linkedin.metadata.utils.elasticsearch.IndexConvention;
import com.linkedin.mxe.SystemMetadata;
import java.io.IOException;
import java.io.UnsupportedEncodingException;
import java.nio.charset.StandardCharsets;
import java.security.MessageDigest;
import java.security.NoSuchAlgorithmException;
import java.util.Arrays;
import java.util.Base64;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
Expand All @@ -24,7 +24,6 @@
import javax.annotation.Nullable;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.elasticsearch.action.delete.DeleteResponse;
import org.elasticsearch.action.search.SearchResponse;
import org.elasticsearch.client.RequestOptions;
import org.elasticsearch.client.RestHighLevelClient;
Expand Down Expand Up @@ -73,21 +72,19 @@ private String toDocId(@Nonnull final String urn, @Nonnull final String aspect)
String rawDocId = urn + DOC_DELIMETER + aspect;

try {
byte[] bytesOfRawDocID = rawDocId.getBytes("UTF-8");
byte[] bytesOfRawDocID = rawDocId.getBytes(StandardCharsets.UTF_8);
MessageDigest md = MessageDigest.getInstance("MD5");
byte[] thedigest = md.digest(bytesOfRawDocID);
return new String(thedigest, StandardCharsets.US_ASCII);
} catch (NoSuchAlgorithmException | UnsupportedEncodingException e) {
return Base64.getEncoder().encodeToString(thedigest);
} catch (NoSuchAlgorithmException e) {
e.printStackTrace();
return rawDocId;
}
}

@Override
public Boolean delete(String urn, String aspect) {
String docId = toDocId(urn, aspect);
DeleteResponse response = _esDAO.deleteByDocId(docId);
return response.status().getStatus() >= 200 && response.status().getStatus() < 300;
public void deleteAspect(String urn, String aspect) {
_esDAO.deleteByUrnAspect(urn, aspect);
}

@Override
Expand Down
Expand Up @@ -8,7 +8,13 @@


public interface SystemMetadataService {
Boolean delete(String urn, String aspect);
/**
* Deletes a specific aspect from the system metadata service.
*
* @param urn the urn of the entity
* @param aspect the aspect to delete
*/
void deleteAspect(String urn, String aspect);

void deleteUrn(String finalOldUrn);

Expand Down
Expand Up @@ -261,9 +261,14 @@ private void updateSystemMetadata(SystemMetadata systemMetadata, Urn urn, Aspect

private void deleteSystemMetadata(Urn urn, AspectSpec aspectSpec, Boolean isKeyAspect) {
if (isKeyAspect) {
// Delete all aspects
log.debug(String.format("Deleting all system metadata for urn: %s", urn));
_systemMetadataService.deleteUrn(urn.toString());
} else {
// Delete all aspects from system metadata service
log.debug(String.format("Deleting system metadata for urn: %s, aspect: %s", urn, aspectSpec.getName()));
_systemMetadataService.deleteAspect(urn.toString(), aspectSpec.getName());
}
_systemMetadataService.delete(urn.toString(), aspectSpec.getName());
}

private void deleteGraphData(Urn urn, AspectSpec aspectSpec, RecordTemplate aspect, Boolean isKeyAspect) {
Expand Down
4 changes: 2 additions & 2 deletions smoke-test/test_e2e.py
Expand Up @@ -594,8 +594,8 @@ def test_frontend_list_policies(frontend_session):
assert res_data["data"]
assert res_data["data"]["listPolicies"]
assert res_data["data"]["listPolicies"]["start"] is 0
assert res_data["data"]["listPolicies"]["count"] is 8
assert len(res_data["data"]["listPolicies"]["policies"]) is 8 # Length of default policies.
assert res_data["data"]["listPolicies"]["count"] > 0
assert len(res_data["data"]["listPolicies"]["policies"]) > 0

@pytest.mark.dependency(depends=["test_healthchecks", "test_run_ingestion", "test_frontend_list_policies"])
def test_frontend_update_policy(frontend_session):
Expand Down

0 comments on commit 3bac7f7

Please sign in to comment.