Skip to content

Commit

Permalink
Merge pull request #2987 from atlanhq/PLT-1300-search-cancellation-fr…
Browse files Browse the repository at this point in the history
…om-metastore-with-global-context-cache-using-redis

Plt 1300 search cancellation from metastore with global context cache using redis
  • Loading branch information
sumandas0 committed May 2, 2024
2 parents 8add61d + 5bf86cf commit 9b19d24
Show file tree
Hide file tree
Showing 23 changed files with 1,035 additions and 80 deletions.
70 changes: 70 additions & 0 deletions addons/policies/bootstrap_entity_policies.json
Original file line number Diff line number Diff line change
Expand Up @@ -3005,6 +3005,76 @@
"entity-delete"
]
}
},
{
"typeName": "AuthPolicy",
"attributes":
{
"name": "READ_DATA_CONTRACT",
"qualifiedName": "READ_DATA_CONTRACT",
"policyCategory": "bootstrap",
"policySubCategory": "default",
"policyServiceName": "atlas",
"policyType": "allow",
"policyPriority": 1,
"policyUsers":
[],
"policyGroups":
[],
"policyRoles":
[
"$admin",
"$member",
"$api-token-default-access"
],
"policyResourceCategory": "ENTITY",
"policyResources":
[
"entity-type:DataContract",
"entity-classification:*",
"entity:*"
],
"policyActions":
[
"entity-read"
]
}
},
{
"typeName": "AuthPolicy",
"attributes":
{
"name": "CU_DATA_CONTRACT",
"qualifiedName": "CU_DATA_CONTRACT",
"description": "cu allow for data contract",
"policyCategory": "bootstrap",
"policySubCategory": "default",
"policyServiceName": "atlas",
"policyType": "allow",
"policyPriority": 1,
"policyUsers":
[],
"policyGroups":
[],
"policyRoles":
[
"$admin",
"$member",
"$api-token-default-access"
],
"policyResourceCategory": "ENTITY",
"policyResources":
[
"entity-type:DataContract",
"entity-classification:*",
"entity:*"
],
"policyActions":
[
"entity-create",
"entity-update"
]
}
}
]
}
10 changes: 9 additions & 1 deletion common/src/main/java/org/apache/atlas/repository/Constants.java
Original file line number Diff line number Diff line change
Expand Up @@ -162,6 +162,13 @@ public final class Constants {
public static final String ASSET_README_EDGE_LABEL = "__Asset.readme";
public static final String ASSET_LINK_EDGE_LABEL = "__Asset.links";

/**
* Contract
*/
public static final String CONTRACT_ENTITY_TYPE = "DataContract";
public static final String ATTR_CONTRACT_VERSION = "dataContractVersion";


/**
* Lineage relations.
*/
Expand Down Expand Up @@ -403,7 +410,8 @@ public enum SupportedFileExtensions { XLSX, XLS, CSV }
public static final String ATTR_STARRED_DETAILS_LIST = "starredDetailsList";
public static final String ATTR_ASSET_STARRED_BY = "assetStarredBy";
public static final String ATTR_ASSET_STARRED_AT = "assetStarredAt";

public static final String ATTR_CERTIFICATE_STATUS = "certificateStatus";
public static final String ATTR_CONTRACT = "dataContractJson";
public static final String STRUCT_STARRED_DETAILS = "StarredDetails";

public static final String KEYCLOAK_ROLE_ADMIN = "$admin";
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ public interface MetricsRegistry {

void collect(String requestId, String requestUri, AtlasPerfMetrics metrics);

void collectIndexsearch(String requestId, String requestUri, List<AtlasPerfMetrics.Metric> applicationMetrics);
void collectIApplicationMetrics(String requestId, String requestUri, List<AtlasPerfMetrics.Metric> applicationMetrics);

void scrape(PrintWriter writer) throws IOException;

Expand Down
Original file line number Diff line number Diff line change
@@ -1,12 +1,10 @@
package org.apache.atlas.service.metrics;

import io.micrometer.core.instrument.Metrics;
import io.micrometer.core.instrument.Tag;
import io.micrometer.core.instrument.Tags;
import io.micrometer.core.instrument.Timer;
import io.micrometer.core.instrument.*;
import io.micrometer.prometheus.PrometheusMeterRegistry;
import org.apache.atlas.ApplicationProperties;
import org.apache.atlas.AtlasException;
import org.apache.atlas.utils.AtlasMetricType;
import org.apache.atlas.utils.AtlasPerfMetrics;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
Expand Down Expand Up @@ -64,35 +62,42 @@ public void collect(String requestId, String requestUri, AtlasPerfMetrics metric
}
}
//Use this if you want to publish Histograms
public void collectIndexsearch(String requestId, String requestUri, List<AtlasPerfMetrics.Metric> applicationMetrics){
public void collectIApplicationMetrics(String requestId, String requestUri, List<AtlasPerfMetrics.Metric> applicationMetrics){
try {
for(AtlasPerfMetrics.Metric metric : applicationMetrics){
Timer.builder(APPLICATION_LEVEL_METRICS_SUMMARY)
.serviceLevelObjectives(
Duration.ofMillis(500),
Duration.ofMillis(750),
Duration.ofMillis(1000),
Duration.ofMillis(1200),
Duration.ofMillis(1500),
Duration.ofSeconds(2),
Duration.ofSeconds(3),
Duration.ofSeconds(4),
Duration.ofSeconds(5),
Duration.ofSeconds(7),
Duration.ofSeconds(10),
Duration.ofSeconds(15),
Duration.ofSeconds(20),
Duration.ofSeconds(25),
Duration.ofSeconds(30),
Duration.ofSeconds(40),
Duration.ofSeconds(60),
Duration.ofSeconds(90),
Duration.ofSeconds(120),
Duration.ofSeconds(180)
)
.publishPercentiles(PERCENTILES)
.tags(convertToMicrometerTags(metric.getTags()))
.register(getMeterRegistry()).record(metric.getTotalTimeMSecs(), TimeUnit.MILLISECONDS);
if (metric.getMetricType() == AtlasMetricType.COUNTER) {
Counter.builder(metric.getName())
.tags(convertToMicrometerTags(metric.getTags()))
.register(getMeterRegistry())
.increment(metric.getInvocations());
} else {
Timer.builder(APPLICATION_LEVEL_METRICS_SUMMARY)
.serviceLevelObjectives(
Duration.ofMillis(500),
Duration.ofMillis(750),
Duration.ofMillis(1000),
Duration.ofMillis(1200),
Duration.ofMillis(1500),
Duration.ofSeconds(2),
Duration.ofSeconds(3),
Duration.ofSeconds(4),
Duration.ofSeconds(5),
Duration.ofSeconds(7),
Duration.ofSeconds(10),
Duration.ofSeconds(15),
Duration.ofSeconds(20),
Duration.ofSeconds(25),
Duration.ofSeconds(30),
Duration.ofSeconds(40),
Duration.ofSeconds(60),
Duration.ofSeconds(90),
Duration.ofSeconds(120),
Duration.ofSeconds(180)
)
.publishPercentiles(PERCENTILES)
.tags(convertToMicrometerTags(metric.getTags()))
.register(getMeterRegistry()).record(metric.getTotalTimeMSecs(), TimeUnit.MILLISECONDS);
}
}
} catch (Exception e) {
LOG.error("Failed to collect metrics", e);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ public abstract class AbstractRedisService implements RedisService {
private static final String ATLAS_METASTORE_SERVICE = "atlas-metastore-service";

RedissonClient redisClient;
RedissonClient redisCacheClient;
Map<String, RLock> keyLockMap;
Configuration atlasConfig;
long waitTimeInMS;
Expand Down Expand Up @@ -71,6 +72,25 @@ public void releaseDistributedLock(String key) {
}
}

@Override
public String getValue(String key) {
// If value doesn't exist, return null else return the value
return (String) redisCacheClient.getBucket(convertToNamespace(key)).get();
}

@Override
public String putValue(String key, String value) {
// Put the value in the redis cache with TTL
redisCacheClient.getBucket(convertToNamespace(key)).set(value, 30, TimeUnit.SECONDS);
return value;
}

@Override
public void removeValue(String key) {
// Remove the value from the redis cache
redisCacheClient.getBucket(convertToNamespace(key)).delete();
}

private String getHostAddress() throws UnknownHostException {
return InetAddress.getLocalHost().getHostAddress();
}
Expand All @@ -85,6 +105,11 @@ private Config initAtlasConfig() throws AtlasException {
return redisConfig;
}

private String convertToNamespace(String key){
// Append key with namespace :atlas
return "atlas:"+key;
}

Config getLocalConfig() throws AtlasException {
Config config = initAtlasConfig();
config.useSingleServer()
Expand Down Expand Up @@ -112,6 +137,26 @@ Config getProdConfig() throws AtlasException {
return config;
}

Config getCacheImplConfig() {
Config config = new Config();
config.useSentinelServers()
.setClientName(ATLAS_METASTORE_SERVICE+"-redisCache")
.setReadMode(ReadMode.MASTER_SLAVE)
.setCheckSentinelsList(false)
.setKeepAlive(true)
.setMasterConnectionMinimumIdleSize(10)
.setMasterConnectionPoolSize(20)
.setSlaveConnectionMinimumIdleSize(10)
.setSlaveConnectionPoolSize(20)
.setMasterName(atlasConfig.getString(ATLAS_REDIS_MASTER_NAME))
.addSentinelAddress(formatUrls(atlasConfig.getStringArray(ATLAS_REDIS_SENTINEL_URLS)))
.setUsername(atlasConfig.getString(ATLAS_REDIS_USERNAME))
.setPassword(atlasConfig.getString(ATLAS_REDIS_PASSWORD))
.setTimeout(10) //Setting UP timeout to 10ms
.setRetryAttempts(0);
return config;
}

private String[] formatUrls(String[] urls) throws IllegalArgumentException {
if (ArrayUtils.isEmpty(urls)) {
getLogger().error("Invalid redis cluster urls");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,21 @@ public void releaseDistributedLock(String key) {
//do nothing
}

@Override
public String getValue(String key) {
return null;
}

@Override
public String putValue(String key, String value) {
return null;
}

@Override
public void removeValue(String key) {

}

@Override
public Logger getLogger() {
return LOG;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,12 @@ public interface RedisService {

void releaseDistributedLock(String key);

String getValue(String key);

String putValue(String key, String value);

void removeValue(String key);

Logger getLogger();

}
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ public class RedisServiceImpl extends AbstractRedisService{
@PostConstruct
public void init() throws AtlasException {
redisClient = Redisson.create(getProdConfig());
redisCacheClient = Redisson.create(getCacheImplConfig());
LOG.info("Sentinel redis client created successfully.");
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,9 +18,25 @@ public class RedisServiceLocalImpl extends AbstractRedisService {
@PostConstruct
public void init() throws AtlasException {
redisClient = Redisson.create(getLocalConfig());
redisCacheClient = Redisson.create(getLocalConfig());
LOG.info("Local redis client created successfully.");
}

@Override
public String getValue(String key) {
return null;
}

@Override
public String putValue(String key, String value) {
return null;
}

@Override
public void removeValue(String key) {

}

@Override
public Logger getLogger() {
return LOG;
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
package org.apache.atlas.utils;

public enum AtlasMetricType {
COUNTER,
GAUGE,
HISTOGRAM,
METER,
TIMER
}
14 changes: 14 additions & 0 deletions common/src/main/java/org/apache/atlas/utils/AtlasPerfMetrics.java
Original file line number Diff line number Diff line change
Expand Up @@ -104,6 +104,8 @@ long getElapsedTime() {

public static class Metric {
private final String name;

private AtlasMetricType metricType;
private long invocations = 0;
private long totalTimeMSecs = 0;
HashMap<String, String> tags = new HashMap<>();
Expand All @@ -112,6 +114,14 @@ public Metric(String name) {
this.name = name;
}

public void setMetricType(AtlasMetricType metricType) {
this.metricType = metricType;
}

public AtlasMetricType getMetricType() {
return metricType;
}

public String getName() {
return name;
}
Expand All @@ -135,5 +145,9 @@ public HashMap<String, String> getTags() {
return tags;
}

public void incrementInvocations() {
invocations++;
}

}
}
4 changes: 4 additions & 0 deletions graphdb/janus/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -282,6 +282,10 @@
<artifactId>mockito-all</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.apache.atlas</groupId>
<artifactId>atlas-server-api</artifactId>
</dependency>
</dependencies>

<build>
Expand Down
Loading

0 comments on commit 9b19d24

Please sign in to comment.