Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Auto switching to read only mode #1052 #1056

Merged
merged 13 commits into from
Jul 5, 2019
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
21 changes: 21 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,27 @@

### ...

## 1.1.1 (05.07.2019)

### Enhancements

#### ([1052](https://github.com/allegro/hermes/issues/1052)) Auto switching to read only mode in hermes-management

Hermes-management verifies whether all zookeeper clusters are available.

It periodically writes a timestamp to each of them.

When writing the timestamp fails on any of the ZooKeeper clusters, management switches into ReadOnly mode; it switches back to ReadWrite mode once the write succeeds on all clusters again.

This feature is disabled by default. Enable with:

```yaml
management:
  health:
    periodSeconds: 30
    enabled: true
```

## 1.1.0 (02.07.2019)

### Enhancements
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ public class ZookeeperPaths {
public static final String BLACKLIST_PATH = "blacklist";
public static final String MAX_RATE_PATH = "max-rate";
public static final String MAX_RATE_HISTORY_PATH = "history";
public static final String STORAGE_HEALTH_PATH = "storage-health";

private final String basePath;

Expand Down Expand Up @@ -172,4 +173,8 @@ public String oAuthProvidersPath() {
/**
 * Builds the path of a single OAuth provider node by appending the provider name
 * to the OAuth providers path, using {@code URL_SEPARATOR} as the delimiter.
 *
 * @param oAuthProviderName name of the OAuth provider
 * @return the providers path joined with {@code oAuthProviderName}
 */
public String oAuthProviderPath(String oAuthProviderName) {
    final String providersRoot = oAuthProvidersPath();
    return Joiner.on(URL_SEPARATOR).join(providersRoot, oAuthProviderName);
}

/**
 * Builds the storage-health node path for a single management instance:
 * {@code <basePath>/storage-health/<host>_<port>} (segments joined with {@code URL_SEPARATOR}).
 *
 * @param host host name of the management instance
 * @param port server port of the management instance
 * @return path of the health node for the given host and port
 */
public String nodeHealthPathForManagementHost(String host, String port) {
    final String nodeName = host + "_" + port;
    return Joiner.on(URL_SEPARATOR).join(basePath, STORAGE_HEALTH_PATH, nodeName);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
package pl.allegro.tech.hermes.management.domain.health;

/**
 * Unchecked wrapper for a failure to resolve the host name.
 *
 * <p>The original failure is always kept as the {@linkplain #getCause() cause}.
 */
class CouldNotResolveHostNameException extends RuntimeException {

    /**
     * @param cause the underlying failure that prevented host name resolution
     */
    CouldNotResolveHostNameException(Throwable cause) {
        super(cause);
    }
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,47 @@
package pl.allegro.tech.hermes.management.domain.health;

import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.util.concurrent.ThreadFactoryBuilder;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
import org.springframework.stereotype.Component;
import pl.allegro.tech.hermes.infrastructure.zookeeper.ZookeeperPaths;
import pl.allegro.tech.hermes.management.domain.mode.ModeService;
import pl.allegro.tech.hermes.management.infrastructure.zookeeper.ZookeeperClientManager;

import javax.annotation.PostConstruct;
import javax.annotation.PreDestroy;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

@Component
@ConditionalOnProperty(name = "management.health.enabled", havingValue = "true")
public class HealthCheckScheduler {

private static final Logger logger = LoggerFactory.getLogger(HealthCheckScheduler.class);

private final HealthCheckTask healthCheckTask;
private final Long period;
private final ScheduledExecutorService executorService = Executors.newSingleThreadScheduledExecutor(
new ThreadFactoryBuilder().setNameFormat("storage-health-check-scheduler-%d").build()
);

public HealthCheckScheduler(ZookeeperClientManager zookeeperClientManager,
ZookeeperPaths zookeeperPaths,
NodeDataProvider nodeDataProvider,
ObjectMapper objectMapper,
ModeService modeService,
@Value("${management.health.periodSeconds}") Long periodSeconds) {
String healthCheckPath = zookeeperPaths.nodeHealthPathForManagementHost(nodeDataProvider.getHostname(), nodeDataProvider.getServerPort());
this.period = periodSeconds;
this.healthCheckTask = new HealthCheckTask(zookeeperClientManager.getClients(), healthCheckPath, objectMapper, modeService);
}

@PostConstruct
public void scheduleHealthCheck() {
jewertow marked this conversation as resolved.
Show resolved Hide resolved
logger.info("Starting the storage health check scheduler");
executorService.scheduleAtFixedRate(healthCheckTask, 0, period, TimeUnit.SECONDS);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,65 @@
package pl.allegro.tech.hermes.management.domain.health;

import com.fasterxml.jackson.databind.ObjectMapper;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import pl.allegro.tech.hermes.management.domain.mode.ModeService;
import pl.allegro.tech.hermes.management.infrastructure.zookeeper.ZookeeperClient;

import java.time.LocalDateTime;
import java.time.format.DateTimeFormatter;
import java.util.Collection;
import java.util.List;
import java.util.stream.Collectors;

class HealthCheckTask implements Runnable {

private static final Logger logger = LoggerFactory.getLogger(HealthCheckTask.class);

private final Collection<ZookeeperClient> zookeeperClients;
private final String healthCheckPath;
private final ObjectMapper objectMapper;
private final ModeService modeService;

HealthCheckTask(Collection<ZookeeperClient> zookeeperClients, String healthCheckPath, ObjectMapper objectMapper, ModeService modeService) {
this.zookeeperClients = zookeeperClients;
this.healthCheckPath = healthCheckPath;
this.objectMapper = objectMapper;
this.modeService = modeService;
}

@Override
public void run() {
final List<HealthCheckResult> healthCheckResults = zookeeperClients.stream()
.map(this::doHealthCheck)
.collect(Collectors.toList());
updateMode(healthCheckResults);
}

private HealthCheckResult doHealthCheck(ZookeeperClient zookeeperClient) {
final String timestamp = LocalDateTime.now().format(DateTimeFormatter.ISO_LOCAL_DATE_TIME);
try {
zookeeperClient.ensurePathExists(healthCheckPath);
zookeeperClient.getCuratorFramework()
.setData()
.forPath(healthCheckPath, objectMapper.writeValueAsBytes(timestamp));
cristaloleg marked this conversation as resolved.
Show resolved Hide resolved
logger.info("Storage healthy for datacenter {}", zookeeperClient.getDatacenterName());
return HealthCheckResult.HEALTHY;
} catch (Exception e) {
logger.error("Storage health check failed for datacenter {}", zookeeperClient.getDatacenterName(), e);
return HealthCheckResult.UNHEALTHY;
}
}

private void updateMode(List<HealthCheckResult> healthCheckResults) {
if (healthCheckResults.contains(HealthCheckResult.UNHEALTHY)) {
modeService.setMode(ModeService.ManagementMode.READ_ONLY);
} else {
modeService.setMode(ModeService.ManagementMode.READ_WRITE);
}
}

private enum HealthCheckResult {
HEALTHY, UNHEALTHY
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,29 @@
package pl.allegro.tech.hermes.management.domain.health;

import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Component;

import java.net.InetAddress;
import java.net.UnknownHostException;

@Component
class NodeDataProvider {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I will not change it.


private final String serverPort;

NodeDataProvider(@Value("${server.port}") String serverPort) {
this.serverPort = serverPort;
}

String getHostname() {
try {
return InetAddress.getLocalHost().getHostName();
} catch (UnknownHostException e) {
throw new CouldNotResolveHostNameException(e);
}
}

String getServerPort() {
return serverPort;
}
}
3 changes: 3 additions & 0 deletions hermes-management/src/main/resources/application.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,9 @@ management:
server:
servlet:
context-path: /
health:
periodSeconds: 30
enabled: true

schema.repository.type: schema_registry

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,109 @@
package pl.allegro.tech.hermes.management.domain.health

import com.fasterxml.jackson.databind.ObjectMapper
import pl.allegro.tech.hermes.management.config.storage.StorageClustersProperties
import pl.allegro.tech.hermes.management.config.storage.StorageProperties
import pl.allegro.tech.hermes.management.domain.mode.ModeService
import pl.allegro.tech.hermes.management.infrastructure.zookeeper.ZookeeperClient
import pl.allegro.tech.hermes.management.infrastructure.zookeeper.ZookeeperClientManager
import pl.allegro.tech.hermes.management.utils.MultiZookeeperIntegrationTest

/**
 * Integration test of {@link HealthCheckTask} against two test Zookeeper servers.
 *
 * Verifies that the management mode follows the outcome of the health check:
 * read-only when a cluster is unavailable, read-write when all of them are available.
 */
class HealthCheckTaskTest extends MultiZookeeperIntegrationTest {

    def healthCheckPath = '/hermes/storage-health/hostname_8080'
    def modeService = new ModeService()
    ZookeeperClientManager manager
    HealthCheckTask healthCheckTask

    def setup() {
        manager = buildZookeeperClientManager()
        manager.start()
        assertZookeeperClientsConnected(manager.clients)
        // Pre-create the health node on every cluster before the task is built.
        manager.clients.each { client -> setupZookeeperPath(client, healthCheckPath) }
        healthCheckTask = new HealthCheckTask(manager.clients, healthCheckPath, new ObjectMapper(), modeService)
    }

    def cleanup() {
        manager.stop()
    }

    def "should not change mode in case of successful health check"() {
        given:
        assert !modeService.readOnlyEnabled

        when:
        healthCheckTask.run()

        then:
        !modeService.readOnlyEnabled
    }

    def "should change mode to READ_ONLY in case of failed health check"() {
        given:
        assert !modeService.readOnlyEnabled

        and:
        zookeeper1.stop()

        when:
        healthCheckTask.run()

        then:
        modeService.readOnlyEnabled
    }

    def "should change mode to READ_ONLY in case of failed health check and set READ_WRITE back again in case of successful next connection"() {
        given:
        assert !modeService.readOnlyEnabled

        and:
        zookeeper1.stop()

        when:
        healthCheckTask.run()

        then:
        modeService.readOnlyEnabled

        // Second stimulus/response pair: actions belong in when:, conditions in then:.
        when:
        zookeeper1.restart()

        and:
        healthCheckTask.run()

        then:
        !modeService.readOnlyEnabled
    }

    /** Builds a client manager configured with both test clusters. */
    static buildZookeeperClientManager(String dc = "dc1") {
        def properties = new StorageClustersProperties(clusters: [
                new StorageProperties(connectionString: "localhost:$DC_1_ZOOKEEPER_PORT", datacenter: DC_1_NAME),
                new StorageProperties(connectionString: "localhost:$DC_2_ZOOKEEPER_PORT", datacenter: DC_2_NAME)
        ])
        new ZookeeperClientManager(properties, new TestDatacenterNameProvider(dc))
    }

    /** Returns the first client whose datacenter name equals {@code dcName}, or null. */
    static findClientByDc(List<ZookeeperClient> clients, String dcName) {
        clients.find { it.datacenterName == dcName }
    }

    /** Creates {@code path} (with parents) on the given client unless it already exists. */
    static setupZookeeperPath(ZookeeperClient zookeeperClient, String path) {
        def healthCheckPathExists = zookeeperClient.curatorFramework
                .checkExists()
                .forPath(path) != null
        if (!healthCheckPathExists) {
            zookeeperClient.curatorFramework
                    .create()
                    .creatingParentContainersIfNeeded()
                    .forPath(path)
        }
    }

    /** Asserts that the clients of both test datacenters are connected. */
    static assertZookeeperClientsConnected(List<ZookeeperClient> clients) {
        def dc1Client = findClientByDc(clients, DC_1_NAME)
        assert assertClientConnected(dc1Client)

        def dc2Client = findClientByDc(clients, DC_2_NAME)
        assert assertClientConnected(dc2Client)
    }
}
Original file line number Diff line number Diff line change
Expand Up @@ -3,24 +3,27 @@ package pl.allegro.tech.hermes.management.utils
import org.apache.curator.test.TestingServer
import pl.allegro.tech.hermes.management.infrastructure.dc.DatacenterNameProvider
import pl.allegro.tech.hermes.management.infrastructure.zookeeper.ZookeeperClient
import pl.allegro.tech.hermes.test.helper.util.Ports
import spock.lang.Specification

abstract class MultiZookeeperIntegrationTest extends Specification {

static final int DC_1_ZOOKEEPER_PORT = 9500
static final int DC_1_ZOOKEEPER_PORT = Ports.nextAvailable()
static final String DC_1_NAME = "dc1"
static final int DC_2_ZOOKEEPER_PORT = 9501
static final int DC_2_ZOOKEEPER_PORT = Ports.nextAvailable()
static final String DC_2_NAME = "dc2"

static zookeeper1 = new TestingServer(DC_1_ZOOKEEPER_PORT, false)
static zookeeper2 = new TestingServer(DC_2_ZOOKEEPER_PORT, false)
TestingServer zookeeper1
TestingServer zookeeper2

def setupSpec() {
def setup() {
zookeeper1 = new TestingServer(DC_1_ZOOKEEPER_PORT, false)
zookeeper2 = new TestingServer(DC_2_ZOOKEEPER_PORT, false)
zookeeper1.start()
zookeeper2.start()
}

def cleanupSpec(){
def cleanup() {
zookeeper1.stop()
zookeeper2.stop()
}
Expand Down