Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Sync staging with master #3037

Merged
merged 14 commits into from
May 10, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
34 changes: 33 additions & 1 deletion .github/workflows/chart-release-dispatcher.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -26,12 +26,42 @@ jobs:
run: |
echo "branch=${{ github.event.workflow_run.head_branch }}" >> $GITHUB_OUTPUT

# Checks-out your repository under $GITHUB_WORKSPACE, so your job can access it
- uses: actions/checkout@v3
with:
token: ${{ secrets.my_pat }}
ref: ${{ steps.extract_branch.outputs.branch }}
fetch-depth: 0

- name: Get SHA of the branch
id: get_sha
run: |
branch_name=${{ steps.extract_branch.outputs.branch }}
sha=$(git rev-parse "refs/heads/$branch_name")
echo "GIT_SHA: $sha"
echo "sha=${sha}" >> $GITHUB_OUTPUT

- name: Extract Repository Name
id: extract_repo_name
run: |
repo_name=$(basename $GITHUB_REPOSITORY)
echo "repo_name=${repo_name}" >> $GITHUB_OUTPUT

- name: Get PR url and PR User
id: get_pr_url_user
run: |
head_sha=$(curl -s -H "Authorization: Bearer ${{ secrets.my_pat }}" -H "Accept: application/vnd.github.v3+json" "https://api.github.com/repos/${{ github.repository }}/actions/runs/${{ github.event.workflow_run.id }}/jobs" | jq -r '.jobs[0].head_sha')
echo "Head SHA: $head_sha"
pr_url=$(curl -s -H "Authorization: Bearer ${{ secrets.my_pat }}" -H "Accept: application/vnd.github.v3+json" "https://api.github.com/search/issues?q=sha:$head_sha+type:pr" | jq -r '.items[0].html_url')
pr_user=$(curl -s -H "Authorization: Bearer ${{ secrets.my_pat }}" -H "Accept: application/vnd.github.v3+json" "https://api.github.com/search/issues?q=sha:$head_sha+type:pr" | jq -r '.items[0].user.login')
echo "pr_url=$pr_url" >> $GITHUB_OUTPUT
echo "pr_user=$pr_user" >> $GITHUB_OUTPUT

- name: echo PR_URL and PR_USER
run: |
echo "${{ steps.get_pr_url_user.outputs.pr_url }}"
echo "${{ steps.get_pr_url_user.outputs.pr_user }}"

- name: Repository Dispatch
uses: peter-evans/repository-dispatch@v2
with:
Expand All @@ -42,6 +72,8 @@ jobs:
{
"repo": {
"name": "${{ steps.extract_repo_name.outputs.repo_name }}",
"branch": "${{ steps.extract_branch.outputs.branch }}"
"branch": "${{ steps.extract_branch.outputs.branch }}",
"pr_url": "${{ steps.get_pr_url_user.outputs.pr_url }}",
"pr_user": "${{ steps.get_pr_url_user.outputs.pr_user }}"
}
}
3 changes: 2 additions & 1 deletion addons/policies/bootstrap_heka_policies.json
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
"policyServiceName": "heka",
"policyType": "deny",
"policyPriority": 1,
"isPolicyEnabled": false,
"policyUsers": [],
"policyGroups": [],
"policyRoles":
Expand All @@ -32,4 +33,4 @@
}
}
]
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -72,8 +72,11 @@ public class AtlasEntityHeader extends AtlasStruct implements Serializable {
private Date updateTime = null;
private String deleteHandler = null;
private Integer depth = null;
private Integer traversalOrder = null;

private Map<String, AtlasSearchResult> collapse = null;


public AtlasEntityHeader() {
this(null, null);
}
Expand Down Expand Up @@ -147,21 +150,17 @@ public AtlasEntityHeader(AtlasEntity entity) {
}
}

public String getGuid() {
return guid;
}
public String getGuid() { return guid; }

public void setGuid(String guid) {
this.guid = guid;
}
public void setGuid(String guid) { this.guid = guid; }

public Integer getDepth() {
return depth;
}
public Integer getDepth() { return depth; }

public void setDepth(Integer depth) {
this.depth = depth;
}
public void setDepth(Integer depth) { this.depth = depth; }

public Integer getTraversalOrder() { return traversalOrder; }

public void setTraversalOrder(Integer traversalOrder) { this.traversalOrder = traversalOrder; }

public AtlasEntity.Status getStatus() {
return status;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -283,6 +283,7 @@ private AtlasLineageOnDemandInfo getLineageInfoOnDemand(String guid, AtlasLineag

LineageOnDemandConstraints lineageConstraintsByGuid = getAndValidateLineageConstraintsByGuid(guid, atlasLineageOnDemandContext);
AtlasLineageOnDemandInfo.LineageDirection direction = lineageConstraintsByGuid.getDirection();
int level = 0;
int depth = lineageConstraintsByGuid.getDepth();
AtlasLineageOnDemandInfo ret = initializeLineageOnDemandInfo(guid);

Expand All @@ -293,33 +294,37 @@ private AtlasLineageOnDemandInfo getLineageInfoOnDemand(String guid, AtlasLineag

AtomicInteger inputEntitiesTraversed = new AtomicInteger(0);
AtomicInteger outputEntitiesTraversed = new AtomicInteger(0);
AtomicInteger traversalOrder = new AtomicInteger(1);
if (isDataSet) {
AtlasVertex datasetVertex = AtlasGraphUtilsV2.findByGuid(this.graph, guid);
if (direction == AtlasLineageOnDemandInfo.LineageDirection.INPUT || direction == AtlasLineageOnDemandInfo.LineageDirection.BOTH)
traverseEdgesOnDemand(datasetVertex, true, depth, new HashSet<>(), atlasLineageOnDemandContext, ret, guid, inputEntitiesTraversed);
traverseEdgesOnDemand(datasetVertex, true, depth, level, new HashSet<>(), atlasLineageOnDemandContext, ret, guid, inputEntitiesTraversed, traversalOrder);
if (direction == AtlasLineageOnDemandInfo.LineageDirection.OUTPUT || direction == AtlasLineageOnDemandInfo.LineageDirection.BOTH)
traverseEdgesOnDemand(datasetVertex, false, depth, new HashSet<>(), atlasLineageOnDemandContext, ret, guid, outputEntitiesTraversed);
traverseEdgesOnDemand(datasetVertex, false, depth, level, new HashSet<>(), atlasLineageOnDemandContext, ret, guid, outputEntitiesTraversed, traversalOrder);
AtlasEntityHeader baseEntityHeader = entityRetriever.toAtlasEntityHeader(datasetVertex, atlasLineageOnDemandContext.getAttributes());
baseEntityHeader.setDepth(level);
baseEntityHeader.setTraversalOrder(0);
ret.getGuidEntityMap().put(guid, baseEntityHeader);
} else {
AtlasVertex processVertex = AtlasGraphUtilsV2.findByGuid(this.graph, guid);
// make one hop to the next dataset vertices from process vertex and traverse with 'depth = depth - 1'
if (direction == AtlasLineageOnDemandInfo.LineageDirection.INPUT || direction == AtlasLineageOnDemandInfo.LineageDirection.BOTH) {
Iterator<AtlasEdge> processEdges = processVertex.getEdges(AtlasEdgeDirection.OUT, PROCESS_INPUTS_EDGE).iterator();
traverseEdgesOnDemand(processEdges, true, depth, atlasLineageOnDemandContext, ret, processVertex, guid, inputEntitiesTraversed);
traverseEdgesOnDemand(processEdges, true, depth, level, atlasLineageOnDemandContext, ret, processVertex, guid, inputEntitiesTraversed, traversalOrder);
}
if (direction == AtlasLineageOnDemandInfo.LineageDirection.OUTPUT || direction == AtlasLineageOnDemandInfo.LineageDirection.BOTH) {
Iterator<AtlasEdge> processEdges = processVertex.getEdges(AtlasEdgeDirection.OUT, PROCESS_OUTPUTS_EDGE).iterator();
traverseEdgesOnDemand(processEdges, false, depth, atlasLineageOnDemandContext, ret, processVertex, guid, outputEntitiesTraversed);
traverseEdgesOnDemand(processEdges, false, depth, level, atlasLineageOnDemandContext, ret, processVertex, guid, outputEntitiesTraversed, traversalOrder);
}
}
RequestContext.get().endMetricRecord(metricRecorder);
return ret;
}


private void traverseEdgesOnDemand(Iterator<AtlasEdge> processEdges, boolean isInput, int depth, AtlasLineageOnDemandContext atlasLineageOnDemandContext, AtlasLineageOnDemandInfo ret, AtlasVertex processVertex, String baseGuid, AtomicInteger entitiesTraversed) throws AtlasBaseException {
private void traverseEdgesOnDemand(Iterator<AtlasEdge> processEdges, boolean isInput, int depth, int level, AtlasLineageOnDemandContext atlasLineageOnDemandContext, AtlasLineageOnDemandInfo ret, AtlasVertex processVertex, String baseGuid, AtomicInteger entitiesTraversed, AtomicInteger traversalOrder) throws AtlasBaseException {
AtlasLineageOnDemandInfo.LineageDirection direction = isInput ? AtlasLineageOnDemandInfo.LineageDirection.INPUT : AtlasLineageOnDemandInfo.LineageDirection.OUTPUT;
int nextLevel = isInput ? level - 1: level + 1;

while (processEdges.hasNext()) {
AtlasEdge processEdge = processEdges.next();
AtlasVertex datasetVertex = processEdge.getInVertex();
Expand All @@ -336,7 +341,8 @@ private void traverseEdgesOnDemand(Iterator<AtlasEdge> processEdges, boolean isI
if (incrementAndCheckIfRelationsLimitReached(processEdge, isInputEdge, atlasLineageOnDemandContext, ret, depth, entitiesTraversed, direction)) {
break;
} else {
addEdgeToResult(processEdge, ret, atlasLineageOnDemandContext);
addEdgeToResult(processEdge, ret, atlasLineageOnDemandContext, nextLevel, traversalOrder);
traversalOrder.incrementAndGet();
}

String inGuid = AtlasGraphUtilsV2.getIdFromVertex(datasetVertex);
Expand All @@ -346,17 +352,17 @@ private void traverseEdgesOnDemand(Iterator<AtlasEdge> processEdges, boolean isI
ret.getRelationsOnDemand().put(inGuid, new LineageInfoOnDemand(inGuidLineageConstrains));
}

traverseEdgesOnDemand(datasetVertex, isInput, depth - 1, new HashSet<>(), atlasLineageOnDemandContext, ret, baseGuid, entitiesTraversed);
traverseEdgesOnDemand(datasetVertex, isInput, depth - 1, nextLevel, new HashSet<>(), atlasLineageOnDemandContext, ret, baseGuid, entitiesTraversed, traversalOrder);
}
}

private void traverseEdgesOnDemand(AtlasVertex datasetVertex, boolean isInput, int depth, Set<String> visitedVertices, AtlasLineageOnDemandContext atlasLineageOnDemandContext, AtlasLineageOnDemandInfo ret, String baseGuid, AtomicInteger entitiesTraversed) throws AtlasBaseException {
private void traverseEdgesOnDemand(AtlasVertex datasetVertex, boolean isInput, int depth, int level, Set<String> visitedVertices, AtlasLineageOnDemandContext atlasLineageOnDemandContext, AtlasLineageOnDemandInfo ret, String baseGuid, AtomicInteger entitiesTraversed, AtomicInteger traversalOrder) throws AtlasBaseException {
if (isEntityTraversalLimitReached(entitiesTraversed))
return;
if (depth != 0) { // base condition of recursion for depth
AtlasPerfMetrics.MetricRecorder metricRecorder = RequestContext.get().startMetricRecord("traverseEdgesOnDemand");
AtlasLineageOnDemandInfo.LineageDirection direction = isInput ? AtlasLineageOnDemandInfo.LineageDirection.INPUT : AtlasLineageOnDemandInfo.LineageDirection.OUTPUT;

int nextLevel = isInput ? level - 1: level + 1;
// keep track of visited vertices to avoid circular loop
visitedVertices.add(getId(datasetVertex));

Expand Down Expand Up @@ -385,7 +391,7 @@ private void traverseEdgesOnDemand(AtlasVertex datasetVertex, boolean isInput, i
else
continue;
} else {
addEdgeToResult(incomingEdge, ret, atlasLineageOnDemandContext);
addEdgeToResult(incomingEdge, ret, atlasLineageOnDemandContext, level, traversalOrder);
}

AtlasPerfMetrics.MetricRecorder traverseEdgesOnDemandGetEdgesOut = RequestContext.get().startMetricRecord("traverseEdgesOnDemandGetEdgesOut");
Expand Down Expand Up @@ -413,13 +419,14 @@ private void traverseEdgesOnDemand(AtlasVertex datasetVertex, boolean isInput, i
else
continue;
} else {
addEdgeToResult(outgoingEdge, ret, atlasLineageOnDemandContext);
addEdgeToResult(outgoingEdge, ret, atlasLineageOnDemandContext, nextLevel, traversalOrder);
entitiesTraversed.incrementAndGet();
traversalOrder.incrementAndGet();
if (isEntityTraversalLimitReached(entitiesTraversed))
setEntityLimitReachedFlag(isInput, ret);
}
if (entityVertex != null && !visitedVertices.contains(getId(entityVertex))) {
traverseEdgesOnDemand(entityVertex, isInput, depth - 1, visitedVertices, atlasLineageOnDemandContext, ret, baseGuid, entitiesTraversed); // execute inner depth
traverseEdgesOnDemand(entityVertex, isInput, depth - 1, nextLevel, visitedVertices, atlasLineageOnDemandContext, ret, baseGuid, entitiesTraversed, traversalOrder); // execute inner depth
}
}
}
Expand Down Expand Up @@ -883,9 +890,9 @@ private void addEdgeToResult(AtlasEdge edge, AtlasLineageInfo lineageInfo,
}
}

private void addEdgeToResult(AtlasEdge edge, AtlasLineageOnDemandInfo lineageInfo, AtlasLineageOnDemandContext atlasLineageOnDemandContext) throws AtlasBaseException {
private void addEdgeToResult(AtlasEdge edge, AtlasLineageOnDemandInfo lineageInfo, AtlasLineageOnDemandContext atlasLineageOnDemandContext, int level, AtomicInteger traversalOrder) throws AtlasBaseException {
if (!lineageContainsVisitedEdgeV2(lineageInfo, edge)) {
processEdge(edge, lineageInfo, atlasLineageOnDemandContext);
processEdge(edge, lineageInfo, atlasLineageOnDemandContext, level, traversalOrder);
}
}

Expand Down Expand Up @@ -1455,26 +1462,41 @@ private void processEdge(final AtlasEdge edge, final Map<String, AtlasEntityHead
}
}

private void processEdge(final AtlasEdge edge, final AtlasLineageOnDemandInfo lineageInfo, AtlasLineageOnDemandContext atlasLineageOnDemandContext) throws AtlasBaseException {
processEdge(edge, lineageInfo.getGuidEntityMap(), lineageInfo.getRelations(), lineageInfo.getVisitedEdges(), atlasLineageOnDemandContext.getAttributes());
private void processEdge(final AtlasEdge edge, final AtlasLineageOnDemandInfo lineageInfo, AtlasLineageOnDemandContext atlasLineageOnDemandContext, int level, AtomicInteger traversalOrder) throws AtlasBaseException {
processEdge(edge, lineageInfo.getGuidEntityMap(), lineageInfo.getRelations(), lineageInfo.getVisitedEdges(), atlasLineageOnDemandContext.getAttributes(), level, traversalOrder);
}

private void processEdge(final AtlasEdge edge, final Map<String, AtlasEntityHeader> entities, final Set<AtlasLineageOnDemandInfo.LineageRelation> relations, final Set<String> visitedEdges, final Set<String> attributes) throws AtlasBaseException {
private void processEdge(final AtlasEdge edge, final Map<String, AtlasEntityHeader> entities, final Set<AtlasLineageOnDemandInfo.LineageRelation> relations, final Set<String> visitedEdges, final Set<String> attributes, int level, AtomicInteger traversalOrder) throws AtlasBaseException {
AtlasPerfMetrics.MetricRecorder metricRecorder = RequestContext.get().startMetricRecord("processEdge");
AtlasVertex inVertex = edge.getInVertex();
AtlasVertex outVertex = edge.getOutVertex();

String inTypeName = AtlasGraphUtilsV2.getTypeName(inVertex);
AtlasEntityType inEntityType = atlasTypeRegistry.getEntityTypeByName(inTypeName);
if (inEntityType == null) {
throw new AtlasBaseException(AtlasErrorCode.TYPE_NAME_NOT_FOUND, inTypeName);
}
boolean inIsProcess = inEntityType.getTypeAndAllSuperTypes().contains(PROCESS_SUPER_TYPE);

String inGuid = AtlasGraphUtilsV2.getIdFromVertex(inVertex);
String outGuid = AtlasGraphUtilsV2.getIdFromVertex(outVertex);
String relationGuid = AtlasGraphUtilsV2.getEncodedProperty(edge, RELATIONSHIP_GUID_PROPERTY_KEY, String.class);
boolean isInputEdge = edge.getLabel().equalsIgnoreCase(PROCESS_INPUTS_EDGE);


if (!entities.containsKey(inGuid)) {
AtlasEntityHeader entityHeader = entityRetriever.toAtlasEntityHeader(inVertex, attributes);
if (!inIsProcess) {
entityHeader.setDepth(level);
entityHeader.setTraversalOrder(traversalOrder.get());
}
entities.put(inGuid, entityHeader);
}
if (!entities.containsKey(outGuid)) {
AtlasEntityHeader entityHeader = entityRetriever.toAtlasEntityHeader(outVertex, attributes);
if (inIsProcess) {
entityHeader.setDepth(level);
entityHeader.setTraversalOrder(traversalOrder.get());
}
entities.put(outGuid, entityHeader);
}
if (isInputEdge) {
Expand Down
Loading