Skip to content

Commit

Permalink
[pinpoint-apm#8619] Extract zookeeper cluster module
Browse files Browse the repository at this point in the history
  • Loading branch information
emeroad committed Jan 27, 2022
1 parent 9ff0617 commit 54e397f
Show file tree
Hide file tree
Showing 52 changed files with 279 additions and 192 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -22,9 +22,7 @@
import com.navercorp.pinpoint.web.cluster.connection.ClusterConnectionManager;
import com.navercorp.pinpoint.web.cluster.connection.ClusterConnector;
import com.navercorp.pinpoint.web.config.WebClusterConfig;
import org.apache.zookeeper.KeeperException;

import java.io.IOException;
import java.util.List;

/**
Expand All @@ -38,7 +36,7 @@ public EmptyClusterConnectionManager(WebClusterConfig config) {
}

@Override
public void start() throws InterruptedException, IOException, KeeperException {
public void start() {
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,17 +18,15 @@

import com.navercorp.pinpoint.web.cluster.ClusterDataManager;
import com.navercorp.pinpoint.web.vo.AgentInfo;
import org.apache.zookeeper.KeeperException;

import java.io.IOException;
import java.util.List;

/**
* @author minwoo.jung
*/
public class EmptyClusterDataManager implements ClusterDataManager {
@Override
public void start() throws InterruptedException, IOException, KeeperException, Exception {
public void start() {
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,9 +20,7 @@
import com.navercorp.pinpoint.web.cluster.ClusterManager;
import com.navercorp.pinpoint.web.config.WebClusterConfig;
import com.navercorp.pinpoint.web.vo.AgentInfo;
import org.apache.zookeeper.KeeperException;

import java.io.IOException;
import java.util.List;

/**
Expand All @@ -35,7 +33,7 @@ public EmptyClusterManager() {
};

@Override
public void start() throws InterruptedException, IOException, KeeperException {
public void start() {
}

@Override
Expand Down
4 changes: 4 additions & 0 deletions collector/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,10 @@
<groupId>com.navercorp.pinpoint</groupId>
<artifactId>pinpoint-commons-server</artifactId>
</dependency>
<dependency>
<groupId>com.navercorp.pinpoint</groupId>
<artifactId>pinpoint-commons-server-cluster</artifactId>
</dependency>
<dependency>
<groupId>com.navercorp.pinpoint</groupId>
<artifactId>pinpoint-commons-hbase</artifactId>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,18 +20,16 @@
import com.navercorp.pinpoint.common.server.cluster.zookeeper.CuratorZookeeperClient;
import com.navercorp.pinpoint.common.server.cluster.zookeeper.ZookeeperClient;
import com.navercorp.pinpoint.common.server.cluster.zookeeper.ZookeeperEventWatcher;
import com.navercorp.pinpoint.common.server.util.concurrent.CommonState;
import com.navercorp.pinpoint.common.server.util.concurrent.CommonStateContext;
import org.apache.zookeeper.KeeperException;
import com.navercorp.pinpoint.common.server.cluster.zookeeper.exception.PinpointZookeeperException;
import com.navercorp.pinpoint.common.server.cluster.zookeeper.util.CommonState;
import com.navercorp.pinpoint.common.server.cluster.zookeeper.util.CommonStateContext;
import org.apache.zookeeper.WatchedEvent;
import org.apache.zookeeper.Watcher.Event.EventType;
import org.apache.logging.log4j.Logger;
import org.apache.logging.log4j.LogManager;
import org.springframework.util.Assert;

import javax.annotation.PostConstruct;
import javax.annotation.PreDestroy;
import java.io.IOException;
import java.util.Objects;

/**
Expand All @@ -40,6 +38,7 @@
public class FlinkClusterService {

private final Logger logger = LogManager.getLogger(this.getClass());

private final CommonStateContext serviceState;
private final FlinkConfiguration config;
private final FlinkClusterConnectionManager clusterConnectionManager;
Expand All @@ -56,7 +55,7 @@ public FlinkClusterService(FlinkConfiguration config, FlinkClusterConnectionMana
}

@PostConstruct
public void setUp() throws KeeperException, IOException, InterruptedException {
public void setUp() {
if (!config.isFlinkClusterEnable()) {
logger.info("flink cluster disable.");
return;
Expand All @@ -69,7 +68,11 @@ public void setUp() throws KeeperException, IOException, InterruptedException {

ClusterManagerWatcher watcher = new ClusterManagerWatcher(pinpointFlinkClusterPath);
this.client = new CuratorZookeeperClient(config.getFlinkClusterZookeeperAddress(), config.getFlinkClusterSessionTimeout(), watcher);
this.client.connect();
try {
this.client.connect();
} catch (PinpointZookeeperException e) {
throw new RuntimeException("ZookeeperClient connect failed", e);
}

this.zookeeperClusterManager = new ZookeeperClusterManager(client, pinpointFlinkClusterPath, clusterConnectionManager);
this.zookeeperClusterManager.start();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,13 +21,13 @@
import com.navercorp.pinpoint.collector.util.AddressParser;
import com.navercorp.pinpoint.collector.util.MultipleAddress;
import com.navercorp.pinpoint.common.profiler.concurrent.PinpointThreadFactory;

import com.navercorp.pinpoint.common.server.cluster.zookeeper.ZookeeperClient;
import com.navercorp.pinpoint.common.server.cluster.zookeeper.ZookeeperConstants;
import com.navercorp.pinpoint.common.server.cluster.zookeeper.exception.ConnectionException;
import com.navercorp.pinpoint.common.server.cluster.zookeeper.exception.PinpointZookeeperException;
import com.navercorp.pinpoint.common.server.util.concurrent.CommonState;
import com.navercorp.pinpoint.common.server.util.concurrent.CommonStateContext;

import com.navercorp.pinpoint.common.server.cluster.zookeeper.util.CommonState;
import com.navercorp.pinpoint.common.server.cluster.zookeeper.util.CommonStateContext;
import com.navercorp.pinpoint.common.util.StringUtils;
import org.apache.curator.utils.ZKPaths;
import org.apache.logging.log4j.Logger;
Expand Down Expand Up @@ -123,7 +123,7 @@ public void stop() {
if (!(this.workerState.changeStateDestroying())) {
CommonState state = this.workerState.getCurrentState();

logger.info("{} already {}.", this.getClass().getSimpleName(), state.toString());
logger.info("{} already {}.", this.getClass().getSimpleName(), state);
return;
}

Expand Down Expand Up @@ -164,7 +164,7 @@ public void handleAndRegisterWatcher(String path) {
}
} else {
CommonState state = this.workerState.getCurrentState();
logger.info("{} invalid state {}.", this.getClass().getSimpleName(), state.toString());
logger.info("{} invalid state {}.", this.getClass().getSimpleName(), state);
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,8 +29,8 @@
import com.navercorp.pinpoint.common.server.cluster.zookeeper.CuratorZookeeperClient;
import com.navercorp.pinpoint.common.server.cluster.zookeeper.ZookeeperClient;
import com.navercorp.pinpoint.common.server.cluster.zookeeper.ZookeeperEventWatcher;
import com.navercorp.pinpoint.common.server.util.concurrent.CommonState;
import com.navercorp.pinpoint.common.server.util.concurrent.CommonStateContext;
import com.navercorp.pinpoint.common.server.cluster.zookeeper.util.CommonState;
import com.navercorp.pinpoint.common.server.cluster.zookeeper.util.CommonStateContext;
import com.navercorp.pinpoint.common.util.Assert;

import org.apache.commons.lang3.StringUtils;
Expand Down Expand Up @@ -153,7 +153,7 @@ public void tearDown() {
if (!(this.serviceState.changeStateDestroying())) {
CommonState state = this.serviceState.getCurrentState();

logger.info("{} already {}.", this.getClass().getSimpleName(), state.toString());
logger.info("{} already {}.", this.getClass().getSimpleName(), state);
return;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@
import com.navercorp.pinpoint.common.server.cluster.zookeeper.CreateNodeMessage;
import com.navercorp.pinpoint.common.server.cluster.zookeeper.ZookeeperClient;
import com.navercorp.pinpoint.common.server.cluster.zookeeper.exception.PinpointZookeeperException;
import com.navercorp.pinpoint.common.server.util.concurrent.CommonStateContext;
import com.navercorp.pinpoint.common.server.cluster.zookeeper.util.CommonStateContext;
import com.navercorp.pinpoint.common.util.BytesUtils;
import com.navercorp.pinpoint.common.util.CollectionUtils;
import com.navercorp.pinpoint.rpc.util.ClassUtils;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,8 +20,8 @@
import com.navercorp.pinpoint.collector.cluster.ClusterPointRepository;
import com.navercorp.pinpoint.collector.cluster.ProfilerClusterManager;
import com.navercorp.pinpoint.common.server.cluster.zookeeper.ZookeeperClient;
import com.navercorp.pinpoint.common.server.util.concurrent.CommonStateContext;

import com.navercorp.pinpoint.common.server.cluster.zookeeper.util.CommonStateContext;
import org.apache.commons.lang3.StringUtils;
import org.apache.logging.log4j.Logger;
import org.apache.logging.log4j.LogManager;
Expand Down
4 changes: 2 additions & 2 deletions collector/src/main/resources/applicationContext-collector.xml
Original file line number Diff line number Diff line change
Expand Up @@ -20,15 +20,15 @@
com.navercorp.pinpoint.collector.service,
com.navercorp.pinpoint.common.server.bo.codec,
com.navercorp.pinpoint.common.server.util,
com.navercorp.pinpoint.common.server.bo,
com.navercorp.pinpoint.common.server.config" />
com.navercorp.pinpoint.common.server.bo" />

<import resource="classpath:applicationContext-collector-profile.xml"/>
<import resource="classpath:applicationContext-collector-grpc.xml"/>
<import resource="classpath:applicationContext-collector-thrift.xml"/>
<import resource="classpath:applicationContext-collector-hbase.xml"/>
<import resource="classpath:applicationContext-collector-namespace.xml"/>

<bean class="com.navercorp.pinpoint.common.server.cluster.zookeeper.config.ClusterConfigurationFactory"/>

<bean id="metricRegistry" class="com.codahale.metrics.MetricRegistry">
</bean>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -52,26 +52,26 @@ public InMemoryZookeeperClient(boolean throwException) {
}

@Override
public void connect() throws IOException {
public void connect() throws PinpointZookeeperException {
connected = true;
}

@Override
public synchronized void createPath(String value) throws PinpointZookeeperException, InterruptedException {
public synchronized void createPath(String value) throws PinpointZookeeperException {
ZKPaths.PathAndNode pathAndNode = ZKPaths.getPathAndNode(value);
contents.put(pathAndNode.getPath(), EMPTY_BYTE);
}

@Override
public synchronized void createNode(CreateNodeMessage createNodeMessage) throws PinpointZookeeperException, InterruptedException {
public synchronized void createNode(CreateNodeMessage createNodeMessage) throws PinpointZookeeperException {
byte[] bytes = contents.putIfAbsent(createNodeMessage.getNodePath(), createNodeMessage.getData());
if (bytes != null) {
throw new BadOperationException("node already exist");
}
}

@Override
public synchronized void createOrSetNode(CreateNodeMessage createNodeMessage) throws PinpointZookeeperException, KeeperException, InterruptedException {
public synchronized void createOrSetNode(CreateNodeMessage createNodeMessage) throws PinpointZookeeperException {
if (intAdder.incrementAndGet() % 2 == 1 && throwException) {
throw new PinpointZookeeperException("exception");
}
Expand All @@ -80,18 +80,18 @@ public synchronized void createOrSetNode(CreateNodeMessage createNodeMessage) th
}

@Override
public synchronized byte[] getData(String path) throws PinpointZookeeperException, InterruptedException {
public synchronized byte[] getData(String path) throws PinpointZookeeperException {
byte[] bytes = contents.get(path);
return bytes;
}

@Override
public byte[] getData(String path, boolean watch) throws PinpointZookeeperException, InterruptedException {
public byte[] getData(String path, boolean watch) throws PinpointZookeeperException {
return contents.get(path);
}

@Override
public synchronized void delete(String path) throws PinpointZookeeperException, InterruptedException {
public synchronized void delete(String path) throws PinpointZookeeperException {
contents.remove(path);
}

Expand Down
117 changes: 117 additions & 0 deletions commons-server-cluster/pom.xml
Original file line number Diff line number Diff line change
@@ -0,0 +1,117 @@
<?xml version="1.0" encoding="UTF-8"?>
<!--
~ Copyright 2019 NAVER Corp.
~
~ Licensed under the Apache License, Version 2.0 (the "License");
~ you may not use this file except in compliance with the License.
~ You may obtain a copy of the License at
~
~ http://www.apache.org/licenses/LICENSE-2.0
~
~ Unless required by applicable law or agreed to in writing, software
~ distributed under the License is distributed on an "AS IS" BASIS,
~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
~ See the License for the specific language governing permissions and
~ limitations under the License.
-->

<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
<modelVersion>4.0.0</modelVersion>
<parent>
<groupId>com.navercorp.pinpoint</groupId>
<artifactId>pinpoint</artifactId>
<version>2.4.0-SNAPSHOT</version>
</parent>

<artifactId>pinpoint-commons-server-cluster</artifactId>
<packaging>jar</packaging>

<properties>
<jdk.version>11</jdk.version>
<jdk.home>${env.JAVA_11_HOME}</jdk.home>
<plugin.animal-sniffer.skip>true</plugin.animal-sniffer.skip>
<test.jdk.home>${jdk.home}</test.jdk.home>

<log4j2.version>${log4j2-jdk8.version}</log4j2.version>
</properties>

<dependencies>
<dependency>
<groupId>org.apache.zookeeper</groupId>
<artifactId>zookeeper</artifactId>
<exclusions>
<exclusion>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-log4j12</artifactId>
</exclusion>
<exclusion>
<groupId>log4j</groupId>
<artifactId>log4j</artifactId>
</exclusion>
</exclusions>
</dependency>
<dependency>
<groupId>org.apache.curator</groupId>
<artifactId>curator-framework</artifactId>
</dependency>
<dependency>
<groupId>org.springframework</groupId>
<artifactId>spring-context</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot</artifactId>
<version>${spring.boot.version}</version>
</dependency>


<!-- Logging dependencies -->
<dependency>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-api</artifactId>
</dependency>
<dependency>
<groupId>org.apache.logging.log4j</groupId>
<artifactId>log4j-slf4j-impl</artifactId>
</dependency>
<dependency>
<groupId>org.apache.logging.log4j</groupId>
<artifactId>log4j-core</artifactId>
</dependency>
<dependency>
<groupId>org.apache.logging.log4j</groupId>
<artifactId>log4j-jcl</artifactId>
<exclusions>
<exclusion>
<groupId>commons-logging</groupId>
<artifactId>commons-logging</artifactId>
</exclusion>
</exclusions>
</dependency>

<dependency>
<groupId>org.springframework</groupId>
<artifactId>spring-test</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.apache.curator</groupId>
<artifactId>curator-test</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>com.navercorp.pinpoint</groupId>
<artifactId>pinpoint-testcase</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.awaitility</groupId>
<artifactId>awaitility</artifactId>
<scope>test</scope>
</dependency>

</dependencies>


</project>
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@

package com.navercorp.pinpoint.common.server.cluster.zookeeper;


import org.springframework.util.Assert;

import java.util.Objects;
Expand Down
Loading

0 comments on commit 54e397f

Please sign in to comment.