Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Sync staging with master #3011

Merged
merged 17 commits into from
May 7, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
25 changes: 25 additions & 0 deletions addons/policies/bootstrap_admin_policies.json
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,31 @@
"admin-task-cud"
]
}
},
{
"typeName": "AuthPolicy",
"attributes": {
"name": "ADMIN_ALLOW_FEATURE_FLAG_CUD",
"qualifiedName": "ADMIN_ALLOW_FEATURE_FLAG_CUD",
"policyCategory": "bootstrap",
"policySubCategory": "default",
"policyServiceName": "atlas",
"policyType": "allow",
"policyPriority": 1,
"policyUsers": [
"service-account-atlan-argo",
"service-account-atlan-backend"
],
"policyGroups": [],
"policyRoles": [],
"policyResourceCategory": "ADMIN",
"policyResources": [
"atlas-service:*"
],
"policyActions": [
"admin-featureFlag-cud"
]
}
}
]
}
Original file line number Diff line number Diff line change
Expand Up @@ -450,6 +450,11 @@
"itemId": 24,
"name": "admin-task-cud",
"label": "Admin task CUD API"
},
{
"itemId": 25,
"name": "admin-featureFlag-cud",
"label": "Admin featureflag CUD API"
}

],
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,9 @@ public enum AtlasPrivilege {
ADMIN_ENTITY_AUDITS("admin-entity-audits"),
ADMIN_REPAIR_INDEX("admin-repair-index"),

ADMIN_TASK_CUD("admin-task-cud");
ADMIN_TASK_CUD("admin-task-cud"),

ADMIN_FEATURE_FLAG_CUD("admin-featureFlag-cud");
private final String type;

AtlasPrivilege(String actionType){
Expand Down
29 changes: 24 additions & 5 deletions common/src/main/java/org/apache/atlas/repository/Constants.java
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,9 @@
package org.apache.atlas.repository;

import org.apache.atlas.ApplicationProperties;
import org.apache.atlas.AtlasConfiguration;
import org.apache.atlas.AtlasException;
import org.apache.atlas.service.FeatureFlagStore;
import org.apache.commons.configuration.Configuration;
import org.apache.commons.lang3.StringUtils;
import org.slf4j.Logger;
Expand Down Expand Up @@ -133,6 +135,12 @@ public final class Constants {
public static final String GLOSSARY_TERMS_EDGE_LABEL = "r:AtlasGlossaryTermAnchor";
public static final String GLOSSARY_CATEGORY_EDGE_LABEL = "r:AtlasGlossaryCategoryAnchor";

/**
* MESH property keys.
*/
public static final String DATA_DOMAIN_ENTITY_TYPE = "DataDomain";
public static final String DATA_PRODUCT_ENTITY_TYPE = "DataProduct";


/**
* SQL property keys.
Expand Down Expand Up @@ -378,7 +386,6 @@ public enum SupportedFileExtensions { XLSX, XLS, CSV }

public static final String CATALOG_PROCESS_INPUT_RELATIONSHIP_LABEL = "__Process.inputs";
public static final String CATALOG_PROCESS_OUTPUT_RELATIONSHIP_LABEL = "__Process.outputs";
public static final String COLUMN_LINEAGE_RELATIONSHIP_LABEL = "__Process.columnProcesses";
public static final String CLASSIFICATION_PROPAGATION_MODE_DEFAULT ="DEFAULT";
public static final String CLASSIFICATION_PROPAGATION_MODE_RESTRICT_LINEAGE ="RESTRICT_LINEAGE";

Expand All @@ -388,14 +395,12 @@ public enum SupportedFileExtensions { XLSX, XLS, CSV }
public static final HashMap<String, ArrayList<String>> CLASSIFICATION_PROPAGATION_MODE_LABELS_MAP = new HashMap<String, ArrayList<String>>(){{
put(CLASSIFICATION_PROPAGATION_MODE_RESTRICT_LINEAGE, new ArrayList<>(
Arrays.asList(CATALOG_PROCESS_INPUT_RELATIONSHIP_LABEL,
CATALOG_PROCESS_OUTPUT_RELATIONSHIP_LABEL,
COLUMN_LINEAGE_RELATIONSHIP_LABEL
CATALOG_PROCESS_OUTPUT_RELATIONSHIP_LABEL
)));
put(CLASSIFICATION_PROPAGATION_MODE_DEFAULT, null);
put(CLASSIFICATION_PROPAGATION_MODE_RESTRICT_HIERARCHY, new ArrayList<>(
Arrays.asList(CATALOG_PROCESS_INPUT_RELATIONSHIP_LABEL,
CATALOG_PROCESS_OUTPUT_RELATIONSHIP_LABEL,
COLUMN_LINEAGE_RELATIONSHIP_LABEL
CATALOG_PROCESS_OUTPUT_RELATIONSHIP_LABEL
)));
}};

Expand Down Expand Up @@ -453,6 +458,20 @@ private static String getEncodedTypePropertyKey(String defaultKey) {
}
}

public static String getESIndex() {
String indexSuffix = null;
if(AtlasConfiguration.ATLAS_MAINTENANCE_MODE.getBoolean()) {
try {
if (FeatureFlagStore.evaluate("use_temp_es_index", "true")) {
indexSuffix = "_temp";
}
} catch (Exception e) {
LOG.error("Failed to evaluate feature flag with error", e);
}
}
return indexSuffix == null ? VERTEX_INDEX_NAME : VERTEX_INDEX_NAME + indexSuffix;
}

public static String getStaticFileAsString(String fileName) throws IOException {
String atlasHomeDir = System.getProperty("atlas.home");
atlasHomeDir = StringUtils.isEmpty(atlasHomeDir) ? "." : atlasHomeDir;
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,45 @@
package org.apache.atlas.service;

import org.apache.atlas.service.redis.RedisService;
import org.apache.commons.lang.StringUtils;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.stereotype.Component;

@Component
public class FeatureFlagStore {
private static RedisService redisService = null;
public FeatureFlagStore(@Qualifier("redisServiceImpl") RedisService redisService) {
FeatureFlagStore.redisService = redisService;
}

public static boolean evaluate(String key, String expectedValue) {
boolean ret = false;
try{
if (StringUtils.isEmpty(key) || StringUtils.isEmpty(expectedValue))
return ret;
String value = redisService.getValue(addFeatureFlagNamespace(key));
ret = StringUtils.equals(value, expectedValue);
} catch (Exception e) {
return ret;
}
return ret;
}

public static void setFlag(String key, String value) {
if (StringUtils.isEmpty(key) || StringUtils.isEmpty(value))
return;

redisService.putValue(addFeatureFlagNamespace(key), value);
}

public static void deleteFlag(String key) {
if (StringUtils.isEmpty(key))
return;

redisService.removeValue(addFeatureFlagNamespace(key));
}

private static String addFeatureFlagNamespace(String key) {
return "ff:"+key;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ public interface MetricsRegistry {

void collect(String requestId, String requestUri, AtlasPerfMetrics metrics);

void collectIApplicationMetrics(String requestId, String requestUri, List<AtlasPerfMetrics.Metric> applicationMetrics);
void collectApplicationMetrics(String requestId, String requestUri, List<AtlasPerfMetrics.Metric> applicationMetrics);

void scrape(PrintWriter writer) throws IOException;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -62,7 +62,7 @@ public void collect(String requestId, String requestUri, AtlasPerfMetrics metric
}
}
//Use this if you want to publish Histograms
public void collectIApplicationMetrics(String requestId, String requestUri, List<AtlasPerfMetrics.Metric> applicationMetrics){
public void collectApplicationMetrics(String requestId, String requestUri, List<AtlasPerfMetrics.Metric> applicationMetrics){
try {
for(AtlasPerfMetrics.Metric metric : applicationMetrics){
if (metric.getMetricType() == AtlasMetricType.COUNTER) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -81,7 +81,14 @@ public String getValue(String key) {
@Override
public String putValue(String key, String value) {
// Put the value in the redis cache with TTL
redisCacheClient.getBucket(convertToNamespace(key)).set(value, 30, TimeUnit.SECONDS);
redisCacheClient.getBucket(convertToNamespace(key)).set(value);
return value;
}

@Override
public String putValue(String key, String value, int timeout) {
// Put the value in the redis cache with TTL
redisCacheClient.getBucket(convertToNamespace(key)).set(value, timeout, TimeUnit.SECONDS);
return value;
}

Expand Down Expand Up @@ -152,7 +159,7 @@ Config getCacheImplConfig() {
.addSentinelAddress(formatUrls(atlasConfig.getStringArray(ATLAS_REDIS_SENTINEL_URLS)))
.setUsername(atlasConfig.getString(ATLAS_REDIS_USERNAME))
.setPassword(atlasConfig.getString(ATLAS_REDIS_PASSWORD))
.setTimeout(50) //Setting UP timeout to 10ms
.setTimeout(50) //Setting UP timeout to 50ms
.setRetryAttempts(0);
return config;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@ public String getValue(String key) {
}

@Override
public String putValue(String key, String value) {
public String putValue(String key, String value, int timeout) {
return null;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,8 @@ public interface RedisService {

String putValue(String key, String value);

String putValue(String key, String value, int timeout);

void removeValue(String key);

Logger getLogger();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ public String getValue(String key) {
}

@Override
public String putValue(String key, String value) {
public String putValue(String key, String value, int timeout) {
return null;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -180,11 +180,11 @@ private DirectIndexQueryResult performAsyncDirectIndexQuery(SearchParams searchP
boolean contextIdExists = StringUtils.isNotEmpty(searchParams.getSearchContextId()) && searchParams.getSearchContextSequenceNo() != null;
try {
if(contextIdExists) {
// If the search context id and greater sequence no is present, then we need to delete the previous search context async
// If the search context id and greater sequence no is present,
// then we need to delete the previous search context async
processRequestWithSameSearchContextId(searchParams);
}
AsyncQueryResult response = submitAsyncSearch(searchParams, false).get();
//Sleep for 5 seconds to allow ES to process the request
if(response.isRunning()) {
/*
* If the response is still running, then we need to wait for the response
Expand All @@ -196,11 +196,7 @@ private DirectIndexQueryResult performAsyncDirectIndexQuery(SearchParams searchP
String searchContextId = searchParams.getSearchContextId();
Integer searchContextSequenceNo = searchParams.getSearchContextSequenceNo();
if (contextIdExists) {
try {
CompletableFuture.runAsync(() -> SearchContextCache.put(searchContextId, searchContextSequenceNo, esSearchId));
} catch (Exception e) {
LOG.error("Failed to update the search context cache {}", e.getMessage());
}
CompletableFuture.runAsync(() -> SearchContextCache.put(searchContextId, searchContextSequenceNo, esSearchId));
}
response = getAsyncSearchResponse(searchParams, esSearchId).get();
if (response == null) {
Expand Down
Original file line number Diff line number Diff line change
@@ -1,24 +1,16 @@
package org.apache.atlas.repository.graphdb.janus;

import com.google.common.cache.Cache;
import com.google.common.cache.CacheBuilder;
import org.apache.atlas.RequestContext;
import org.apache.atlas.service.redis.RedisService;
import org.apache.atlas.utils.AtlasPerfMetrics;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.stereotype.Component;

import java.util.concurrent.TimeUnit;

@Component
public class SearchContextCache {
private static final Logger LOG = LoggerFactory.getLogger(SearchContextCache.class);
private static RedisService redisService = null;

private static final Cache<String, String> searchContextLocalCache = CacheBuilder.newBuilder()
.maximumSize(200)
.expireAfterWrite(30, TimeUnit.SECONDS)
.build();

public static final String INVALID_SEQUENCE = "invalid_sequence";


Expand All @@ -32,21 +24,16 @@ public static void put(String key, Integer sequence, String esAsyncId) {
try {
// Build the string in format `sequence/esAsyncId` and store it in redis
String val = sequence + "/" + esAsyncId;
redisService.putValue(key, val);
searchContextLocalCache.put(key, val);
redisService.putValue(key, val, 30);
} finally {
RequestContext.get().endMetricRecord(metric);
}
}
public static String get(String key) {
String ret = null;
try {
ret = searchContextLocalCache.getIfPresent(key);
if (ret == null) {
ret = redisService.getValue(key);
}
return ret;
return redisService.getValue(key);
} catch (Exception e) {
LOG.error("Error while fetching value from Redis", e);
return null;
}

Expand Down Expand Up @@ -80,7 +67,6 @@ public static String getESAsyncSearchIdFromContextCache(String key, Integer sequ
public static void remove(String key) {
AtlasPerfMetrics.MetricRecorder metric = RequestContext.get().startMetricRecord("removeFromCache");
try {
searchContextLocalCache.invalidate(key);
redisService.removeValue(key);
} finally {
RequestContext.get().endMetricRecord(metric);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,7 @@
import org.apache.atlas.repository.store.graph.v2.EntityGraphRetriever;
import org.apache.atlas.repository.userprofile.UserProfileService;
import org.apache.atlas.searchlog.ESSearchLogger;
import org.apache.atlas.service.FeatureFlagStore;
import org.apache.atlas.stats.StatsClient;
import org.apache.atlas.type.*;
import org.apache.atlas.type.AtlasBuiltInTypes.AtlasObjectIdType;
Expand Down Expand Up @@ -75,9 +76,7 @@
import static org.apache.atlas.SortOrder.ASCENDING;
import static org.apache.atlas.model.instance.AtlasEntity.Status.ACTIVE;
import static org.apache.atlas.model.instance.AtlasEntity.Status.DELETED;
import static org.apache.atlas.repository.Constants.ASSET_ENTITY_TYPE;
import static org.apache.atlas.repository.Constants.OWNER_ATTRIBUTE;
import static org.apache.atlas.repository.Constants.VERTEX_INDEX_NAME;
import static org.apache.atlas.repository.Constants.*;
import static org.apache.atlas.util.AtlasGremlinQueryProvider.AtlasGremlinQuery.BASIC_SEARCH_STATE_FILTER;
import static org.apache.atlas.util.AtlasGremlinQueryProvider.AtlasGremlinQuery.TO_RANGE_LIST;

Expand Down Expand Up @@ -1134,8 +1133,10 @@ public List<AtlasEntityHeader> searchUsingTermQualifiedName(int from, int size,
}

private String getIndexName(IndexSearchParams params) throws AtlasBaseException {
String vertexIndexName = getESIndex();

if (StringUtils.isEmpty(params.getPersona()) && StringUtils.isEmpty(params.getPurpose())) {
return VERTEX_INDEX_NAME;
return vertexIndexName;
}

String qualifiedName = "";
Expand All @@ -1151,7 +1152,7 @@ private String getIndexName(IndexSearchParams params) throws AtlasBaseException
if (StringUtils.isNotEmpty(aliasName)) {
if(params.isAccessControlExclusive()) {
accessControlExclusiveDsl(params, aliasName);
aliasName = aliasName+","+VERTEX_INDEX_NAME;
aliasName = aliasName+","+vertexIndexName;
}
return aliasName;
} else {
Expand Down
Loading
Loading