Skip to content

Commit

Permalink
Merge pull request #1056 from allegro/auto-switching-to-read-only-mod…
Browse files Browse the repository at this point in the history
…e-1052-2

Auto switching to read only mode #1052
  • Loading branch information
jewertow committed Jul 5, 2019
2 parents 6cf6f39 + 8de06ea commit 51185b3
Show file tree
Hide file tree
Showing 9 changed files with 295 additions and 6 deletions.
21 changes: 21 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,27 @@

### ...

## 1.1.1 (05.07.2019)

### Enhancements

#### ([1052](https://github.com/allegro/hermes/issues/1052)) Auto switching to read only mode in hermes-management

Hermes-management verifies whether all zookeeper clusters are available.

It writes periodically a timestamp to each one of them.

When the timestamp write fails on one of the zk clusters then management switches into ReadOnly mode.

This feature is disabled by default. Enable with:

```yaml
management:
health:
periodSeconds: 30
enabled: true
```

## 1.1.0 (02.07.2019)

### Enhancements
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ public class ZookeeperPaths {
public static final String BLACKLIST_PATH = "blacklist";
public static final String MAX_RATE_PATH = "max-rate";
public static final String MAX_RATE_HISTORY_PATH = "history";
public static final String STORAGE_HEALTH_PATH = "storage-health";

private final String basePath;

Expand Down Expand Up @@ -172,4 +173,8 @@ public String oAuthProvidersPath() {
public String oAuthProviderPath(String oAuthProviderName) {
return Joiner.on(URL_SEPARATOR).join(oAuthProvidersPath(), oAuthProviderName);
}

public String nodeHealthPathForManagementHost(String host, String port) {
return Joiner.on(URL_SEPARATOR).join(basePath, STORAGE_HEALTH_PATH, String.format("%s_%s", host, port));
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
package pl.allegro.tech.hermes.management.domain.health;

class CouldNotResolveHostNameException extends RuntimeException {
CouldNotResolveHostNameException(Throwable cause) {
super(cause);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,47 @@
package pl.allegro.tech.hermes.management.domain.health;

import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.util.concurrent.ThreadFactoryBuilder;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
import org.springframework.stereotype.Component;
import pl.allegro.tech.hermes.infrastructure.zookeeper.ZookeeperPaths;
import pl.allegro.tech.hermes.management.domain.mode.ModeService;
import pl.allegro.tech.hermes.management.infrastructure.zookeeper.ZookeeperClientManager;

import javax.annotation.PostConstruct;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

@Component
@ConditionalOnProperty(name = "management.health.enabled", havingValue = "true")
public class HealthCheckScheduler {

private static final Logger logger = LoggerFactory.getLogger(HealthCheckScheduler.class);

private final HealthCheckTask healthCheckTask;
private final Long period;
private final ScheduledExecutorService executorService = Executors.newSingleThreadScheduledExecutor(
new ThreadFactoryBuilder().setNameFormat("storage-health-check-scheduler-%d").build()
);

public HealthCheckScheduler(ZookeeperClientManager zookeeperClientManager,
ZookeeperPaths zookeeperPaths,
NodeDataProvider nodeDataProvider,
ObjectMapper objectMapper,
ModeService modeService,
@Value("${management.health.periodSeconds}") Long periodSeconds) {
String healthCheckPath = zookeeperPaths.nodeHealthPathForManagementHost(nodeDataProvider.getHostname(), nodeDataProvider.getServerPort());
this.period = periodSeconds;
this.healthCheckTask = new HealthCheckTask(zookeeperClientManager.getClients(), healthCheckPath, objectMapper, modeService);
}

@PostConstruct
public void scheduleHealthCheck() {
logger.info("Starting the storage health check scheduler");
executorService.scheduleAtFixedRate(healthCheckTask, 0, period, TimeUnit.SECONDS);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,65 @@
package pl.allegro.tech.hermes.management.domain.health;

import com.fasterxml.jackson.databind.ObjectMapper;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import pl.allegro.tech.hermes.management.domain.mode.ModeService;
import pl.allegro.tech.hermes.management.infrastructure.zookeeper.ZookeeperClient;

import java.time.LocalDateTime;
import java.time.format.DateTimeFormatter;
import java.util.Collection;
import java.util.List;
import java.util.stream.Collectors;

class HealthCheckTask implements Runnable {

private static final Logger logger = LoggerFactory.getLogger(HealthCheckTask.class);

private final Collection<ZookeeperClient> zookeeperClients;
private final String healthCheckPath;
private final ObjectMapper objectMapper;
private final ModeService modeService;

HealthCheckTask(Collection<ZookeeperClient> zookeeperClients, String healthCheckPath, ObjectMapper objectMapper, ModeService modeService) {
this.zookeeperClients = zookeeperClients;
this.healthCheckPath = healthCheckPath;
this.objectMapper = objectMapper;
this.modeService = modeService;
}

@Override
public void run() {
final List<HealthCheckResult> healthCheckResults = zookeeperClients.stream()
.map(this::doHealthCheck)
.collect(Collectors.toList());
updateMode(healthCheckResults);
}

private HealthCheckResult doHealthCheck(ZookeeperClient zookeeperClient) {
final String timestamp = LocalDateTime.now().format(DateTimeFormatter.ISO_LOCAL_DATE_TIME);
try {
zookeeperClient.ensurePathExists(healthCheckPath);
zookeeperClient.getCuratorFramework()
.setData()
.forPath(healthCheckPath, objectMapper.writeValueAsBytes(timestamp));
logger.info("Storage healthy for datacenter {}", zookeeperClient.getDatacenterName());
return HealthCheckResult.HEALTHY;
} catch (Exception e) {
logger.error("Storage health check failed for datacenter {}", zookeeperClient.getDatacenterName(), e);
return HealthCheckResult.UNHEALTHY;
}
}

private void updateMode(List<HealthCheckResult> healthCheckResults) {
if (healthCheckResults.contains(HealthCheckResult.UNHEALTHY)) {
modeService.setMode(ModeService.ManagementMode.READ_ONLY);
} else {
modeService.setMode(ModeService.ManagementMode.READ_WRITE);
}
}

private enum HealthCheckResult {
HEALTHY, UNHEALTHY
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,29 @@
package pl.allegro.tech.hermes.management.domain.health;

import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Component;

import java.net.InetAddress;
import java.net.UnknownHostException;

@Component
class NodeDataProvider {

private final String serverPort;

NodeDataProvider(@Value("${server.port}") String serverPort) {
this.serverPort = serverPort;
}

String getHostname() {
try {
return InetAddress.getLocalHost().getHostName();
} catch (UnknownHostException e) {
throw new CouldNotResolveHostNameException(e);
}
}

String getServerPort() {
return serverPort;
}
}
3 changes: 3 additions & 0 deletions hermes-management/src/main/resources/application.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,9 @@ management:
server:
servlet:
context-path: /
health:
periodSeconds: 30
enabled: true

schema.repository.type: schema_registry

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,109 @@
package pl.allegro.tech.hermes.management.domain.health

import com.fasterxml.jackson.databind.ObjectMapper
import pl.allegro.tech.hermes.management.config.storage.StorageClustersProperties
import pl.allegro.tech.hermes.management.config.storage.StorageProperties
import pl.allegro.tech.hermes.management.domain.mode.ModeService
import pl.allegro.tech.hermes.management.infrastructure.zookeeper.ZookeeperClient
import pl.allegro.tech.hermes.management.infrastructure.zookeeper.ZookeeperClientManager
import pl.allegro.tech.hermes.management.utils.MultiZookeeperIntegrationTest

class HealthCheckTaskTest extends MultiZookeeperIntegrationTest {

def healthCheckPath = '/hermes/storage-health/hostname_8080'
def modeService = new ModeService()
ZookeeperClientManager manager
HealthCheckTask healthCheckTask

def setup() {
manager = buildZookeeperClientManager()
manager.start()
assertZookeeperClientsConnected(manager.clients)
manager.clients.each { client -> setupZookeeperPath(client, healthCheckPath) }
healthCheckTask = new HealthCheckTask(manager.clients, healthCheckPath, new ObjectMapper(), modeService)
}

def cleanup() {
manager.stop()
}

def "should not change mode in case of successful health check"() {
given:
assert !modeService.readOnlyEnabled

when:
healthCheckTask.run()

then:
!modeService.readOnlyEnabled
}

def "should change mode to READ_ONLY in case of failed health check"() {
given:
assert !modeService.readOnlyEnabled

and:
zookeeper1.stop()

when:
healthCheckTask.run()

then:
modeService.readOnlyEnabled
}

def "should change mode to READ_ONLY in case of failed health check and set READ_WRITE back again in case of successful next connection"() {
given:
assert !modeService.readOnlyEnabled

and:
zookeeper1.stop()

when:
healthCheckTask.run()

then:
modeService.readOnlyEnabled

and:
zookeeper1.restart()

and:
healthCheckTask.run()

and:
!modeService.readOnlyEnabled
}

static buildZookeeperClientManager(String dc = "dc1") {
def properties = new StorageClustersProperties(clusters: [
new StorageProperties(connectionString: "localhost:$DC_1_ZOOKEEPER_PORT", datacenter: DC_1_NAME),
new StorageProperties(connectionString: "localhost:$DC_2_ZOOKEEPER_PORT", datacenter: DC_2_NAME)
])
new ZookeeperClientManager(properties, new TestDatacenterNameProvider(dc))
}

static findClientByDc(List<ZookeeperClient> clients, String dcName) {
clients.find { it.datacenterName == dcName }
}

static setupZookeeperPath(ZookeeperClient zookeeperClient, String path) {
def healthCheckPathExists = zookeeperClient.curatorFramework
.checkExists()
.forPath(path) != null
if (!healthCheckPathExists) {
zookeeperClient.curatorFramework
.create()
.creatingParentContainersIfNeeded()
.forPath(path)
}
}

static assertZookeeperClientsConnected(List<ZookeeperClient> clients) {
def dc1Client = findClientByDc(clients, DC_1_NAME)
assert assertClientConnected(dc1Client)

def dc2Client = findClientByDc(clients, DC_2_NAME)
assert assertClientConnected(dc2Client)
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -3,24 +3,27 @@ package pl.allegro.tech.hermes.management.utils
import org.apache.curator.test.TestingServer
import pl.allegro.tech.hermes.management.infrastructure.dc.DatacenterNameProvider
import pl.allegro.tech.hermes.management.infrastructure.zookeeper.ZookeeperClient
import pl.allegro.tech.hermes.test.helper.util.Ports
import spock.lang.Specification

abstract class MultiZookeeperIntegrationTest extends Specification {

static final int DC_1_ZOOKEEPER_PORT = 9500
static final int DC_1_ZOOKEEPER_PORT = Ports.nextAvailable()
static final String DC_1_NAME = "dc1"
static final int DC_2_ZOOKEEPER_PORT = 9501
static final int DC_2_ZOOKEEPER_PORT = Ports.nextAvailable()
static final String DC_2_NAME = "dc2"

static zookeeper1 = new TestingServer(DC_1_ZOOKEEPER_PORT, false)
static zookeeper2 = new TestingServer(DC_2_ZOOKEEPER_PORT, false)
TestingServer zookeeper1
TestingServer zookeeper2

def setupSpec() {
def setup() {
zookeeper1 = new TestingServer(DC_1_ZOOKEEPER_PORT, false)
zookeeper2 = new TestingServer(DC_2_ZOOKEEPER_PORT, false)
zookeeper1.start()
zookeeper2.start()
}

def cleanupSpec(){
def cleanup() {
zookeeper1.stop()
zookeeper2.stop()
}
Expand Down

0 comments on commit 51185b3

Please sign in to comment.