Skip to content

Commit

Permalink
Atlas Cloud Watch: Update the Spring config to supply firehose beans. (
Browse files Browse the repository at this point in the history
  • Loading branch information
manolama committed Mar 10, 2023
1 parent 605bc8a commit e2f9f90
Show file tree
Hide file tree
Showing 3 changed files with 131 additions and 41 deletions.
26 changes: 26 additions & 0 deletions atlas-cloudwatch/src/main/resources/reference.conf
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,25 @@ redis-io-dispatcher {
throughput = 16
}

iep.leader {
rediscluster {
# Note: The client app must set the URI to a Redis cluster. For AWS Elastic Cache
# it may look like:
#uri = "cluster.deadbeef.clustercfg.usw2.cache.amazonaws.com"

# Leader election doesn't need many resources.
connection.pool.max = 1
connection.port = 7101
cmd.timeout = 2s
}

leaderId = ${netflix.iep.env.instance-id}

resourceIds = [
${netflix.iep.env.cluster}
]
}

atlas {

poller {
Expand Down Expand Up @@ -131,6 +150,13 @@ atlas {
// How long to wait after the top of the step (usually minute) before starting the scrape.
publishOffset = 5s

// Jedis driver settings.
redis {
connection.pool.max = 8
connection.port = 7101
cmd.timeout = 2000
}

// Batch size for flushing data back to the poller manager
batch-size = 1000

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,19 +15,119 @@
*/
package com.netflix.atlas.spring

import com.netflix.iep.aws2.AwsClientFactory
import akka.actor.ActorSystem
import com.netflix.atlas.akka.AkkaHttpClient
import com.netflix.atlas.akka.DefaultAkkaHttpClient
import com.netflix.atlas.cloudwatch.CloudWatchMetricsProcessor
import com.netflix.atlas.cloudwatch.CloudWatchRules
import com.netflix.atlas.cloudwatch.NetflixTagger
import com.netflix.atlas.cloudwatch.PublishRouter
import com.netflix.atlas.cloudwatch.RedisClusterCloudWatchMetricsProcessor
import com.netflix.atlas.cloudwatch.Tagger
import com.netflix.iep.leader.api.LeaderStatus
import com.netflix.spectator.api.Registry
import com.netflix.spectator.api.Spectator.globalRegistry
import com.typesafe.config.Config
import com.typesafe.config.ConfigFactory
import com.typesafe.scalalogging.StrictLogging
import org.apache.commons.pool2.impl.GenericObjectPoolConfig
import org.springframework.context.annotation.Bean
import org.springframework.context.annotation.Configuration
import software.amazon.awssdk.services.cloudwatch.CloudWatchClient
import redis.clients.jedis.Connection
import redis.clients.jedis.HostAndPort
import redis.clients.jedis.JedisCluster

import java.util.Optional

/**
* Configures the binding for the cloudwatch client and poller.
*/
@Configuration
class CloudWatchConfiguration {
class CloudWatchConfiguration extends StrictLogging {

// REPLACE With the dyn config jar include
@Bean
def getConfig: Config = ConfigFactory.load()

@Bean
def cloudWatchRules(config: Config): CloudWatchRules = new CloudWatchRules(config)

@Bean
def tagger(config: Config): Tagger = new NetflixTagger(
config.getConfig("atlas.cloudwatch.tagger")
)

@Bean
def publishRouter(
config: Config,
registry: Optional[Registry],
tagger: Tagger,
httpClient: AkkaHttpClient,
system: ActorSystem
): PublishRouter = {
val r = registry.orElseGet(() => globalRegistry())
new PublishRouter(config, r, tagger, httpClient)(system)
}

@Bean
def redisCache(
config: Config,
registry: Optional[Registry],
tagger: Tagger,
jedis: JedisCluster,
leaderStatus: LeaderStatus,
rules: CloudWatchRules,
publishRouter: PublishRouter,
system: ActorSystem
): CloudWatchMetricsProcessor = {
val r = registry.orElseGet(() => globalRegistry())
new RedisClusterCloudWatchMetricsProcessor(
config,
r,
tagger,
jedis,
leaderStatus,
rules,
publishRouter
)(system)
}

@Bean
def getTagger(
config: Config
): NetflixTagger = {
new NetflixTagger(config.getConfig("atlas.cloudwatch.tagger"))
}

@Bean
def httpClient(
system: ActorSystem
): DefaultAkkaHttpClient = {
new DefaultAkkaHttpClient("PubProxy")(system)
}

/**
* Purposely giving each requestee a different cluster client in order to reduce
* connection pool contention when scraping gauges.
*
* @param config
* The Typesafe config.
* @return
* A Jedis cluster client.
*/
@Bean
def cloudWatchClient(factory: AwsClientFactory): CloudWatchClient = {
factory.newInstance(classOf[CloudWatchClient])
def getJedisClient(
config: Config
): JedisCluster = {
val poolConfig = new GenericObjectPoolConfig[Connection]()
poolConfig.setMaxTotal(config.getInt("atlas.cloudwatch.redis.connection.pool.max"))
val cluster =
config.getString("iep.leader.rediscluster.uri") // RedisClusterConfig.getClusterName(config)
logger.info(s"Using Redis cluster ${cluster}")
new JedisCluster(
new HostAndPort(cluster, config.getInt("atlas.cloudwatch.redis.connection.port")),
config.getInt("atlas.cloudwatch.redis.cmd.timeout"),
poolConfig
)
}
}

This file was deleted.

0 comments on commit e2f9f90

Please sign in to comment.