Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Agent service discorvery #5120

Closed
wants to merge 23 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
23 commits
Select commit Hold shift + click to select a range
002f4c2
add SuperDataset tag for add super size dataset shards config in es
EvanLjp May 16, 2020
3fead29
explain the meaning of indexShardsNumber and superDatasetIndexShardsF…
EvanLjp May 16, 2020
f84bf3e
code style format
EvanLjp May 17, 2020
b9c0d17
Merge branch 'master' into master
EvanLjp May 17, 2020
f176bdb
code style format
EvanLjp May 17, 2020
34df429
code style format and change notes
EvanLjp May 17, 2020
e698ddf
Merge remote-tracking branch 'upstream/master'
EvanLjp May 20, 2020
52e21a2
Merge remote-tracking branch 'upstream/master'
EvanLjp May 27, 2020
87cc6a2
Merge remote-tracking branch 'upstream/master'
EvanLjp Jun 10, 2020
39669e6
Merge remote-tracking branch 'upstream/master'
EvanLjp Jun 22, 2020
172145b
Merge remote-tracking branch 'upstream/master'
EvanLjp Jul 12, 2020
2c32e00
Merge remote-tracking branch 'upstream/master'
EvanLjp Jul 14, 2020
2adb22a
add agent kubernetes endpoints discovery
EvanLjp Jul 14, 2020
c129e22
Merge remote-tracking branch 'upstream/master' into agent-service-dis…
EvanLjp Jul 17, 2020
d97bac3
keep the origin url
EvanLjp Jul 17, 2020
0bb68a7
keep the origin url
EvanLjp Jul 17, 2020
4fe3d3e
Merge branch 'agent-service-discorvery' of https://github.com/EvanLjp…
EvanLjp Jul 17, 2020
cb91517
add licence
EvanLjp Jul 17, 2020
7be9f07
Merge remote-tracking branch 'upstream/master'
EvanLjp Jul 20, 2020
9bf0783
Merge remote-tracking branch 'upstream/master'
EvanLjp Jul 29, 2020
2dd54c3
Merge remote-tracking branch 'upstream/master'
EvanLjp Aug 1, 2020
2e9a0dd
Merge branch 'master' into agent-service-discorvery
EvanLjp Aug 1, 2020
1c83209
add kubernetes service discovery plugin by custom config
EvanLjp Aug 1, 2020
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
@@ -0,0 +1,56 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/

package org.apache.skywalking.apm.agent.core.boot;

import lombok.Getter;

@Getter
public class Address {
private final String host;
private final int port;

public Address(String host, int port) {
this.host = host;
this.port = port;
}

@Override
public int hashCode() {
return toString().hashCode();
}

@Override
public boolean equals(Object obj) {
if (this == obj)
return true;
if (obj == null)
return false;
if (getClass() != obj.getClass())
return false;

Address address = (Address) obj;
return host.equals(address.host) && port == address.port;
}

@Override
public String toString() {
return host + ":" + port;
}

}
@@ -0,0 +1,34 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/

package org.apache.skywalking.apm.agent.core.boot;

import java.util.List;

/**
* declare service discovery interface
*/
public interface DiscoveryService extends BootService {
/**
* query remote oap receiver addresses
*
* @return receiver addresses
*/
List<Address> queryRemoteAddresses();

}
@@ -0,0 +1,66 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/

package org.apache.skywalking.apm.agent.core.discovery;

import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Objects;
import java.util.stream.Collectors;
import org.apache.skywalking.apm.agent.core.boot.Address;
import org.apache.skywalking.apm.agent.core.boot.DefaultImplementor;
import org.apache.skywalking.apm.agent.core.boot.DiscoveryService;
import org.apache.skywalking.apm.agent.core.conf.Config;
import org.apache.skywalking.apm.agent.core.plugin.PluginException;

@DefaultImplementor
public class DefaultDiscoveryService implements DiscoveryService {

private List<Address> addresses;

@Override
public void prepare() throws Throwable {
String backendService = Config.Collector.BACKEND_SERVICE;
if (backendService.trim().length() == 0) {
throw new PluginException("static backendService config is required in static service discovery plugin");
}
addresses = Arrays.stream(backendService.split(",")).map(item -> {
String[] remotes = item.split(":");
return new Address(remotes[0], Integer.parseInt(remotes[1]));
}).collect(Collectors.toList());
}

@Override
public void boot() throws Throwable {
}

@Override
public void onComplete() throws Throwable {
}

@Override
public void shutdown() throws Throwable {
}

@Override
public List<Address> queryRemoteAddresses() {
return Objects.nonNull(addresses) ? addresses : new ArrayList<>();
}

}
Expand Up @@ -18,21 +18,24 @@

package org.apache.skywalking.apm.agent.core.remote;

import com.google.common.base.Joiner;
import io.grpc.Channel;
import io.grpc.Status;
import io.grpc.StatusRuntimeException;
import java.util.Arrays;
import java.util.Collections;
import java.util.LinkedList;
import java.util.List;
import java.util.Random;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import org.apache.skywalking.apm.agent.core.boot.Address;
import org.apache.skywalking.apm.agent.core.boot.BootService;
import org.apache.skywalking.apm.agent.core.boot.DefaultImplementor;
import org.apache.skywalking.apm.agent.core.boot.DefaultNamedThreadFactory;
import org.apache.skywalking.apm.agent.core.boot.ServiceManager;
import org.apache.skywalking.apm.agent.core.conf.Config;
import org.apache.skywalking.apm.agent.core.discovery.DefaultDiscoveryService;
import org.apache.skywalking.apm.agent.core.logging.api.ILog;
import org.apache.skywalking.apm.agent.core.logging.api.LogManager;
import org.apache.skywalking.apm.util.RunnableWithExceptionProtection;
Expand All @@ -46,8 +49,7 @@ public class GRPCChannelManager implements BootService, Runnable {
private volatile boolean reconnect = true;
private final Random random = new Random();
private final List<GRPCChannelListener> listeners = Collections.synchronizedList(new LinkedList<>());
private volatile List<String> grpcServers;
private volatile int selectedIdx = -1;
private volatile Address selectedAddress = null;
private volatile int reconnectCount = 0;

@Override
Expand All @@ -57,12 +59,7 @@ public void prepare() {

@Override
public void boot() {
if (Config.Collector.BACKEND_SERVICE.trim().length() == 0) {
logger.error("Collector server addresses are not set.");
logger.error("Agent will not uplink any data.");
return;
}
grpcServers = Arrays.asList(Config.Collector.BACKEND_SERVICE.split(","));

connectCheckFuture = Executors.newSingleThreadScheduledExecutor(
new DefaultNamedThreadFactory("GRPCChannelManager")
).scheduleAtFixedRate(
Expand Down Expand Up @@ -93,21 +90,18 @@ public void shutdown() {
public void run() {
logger.debug("Selected collector grpc service running, reconnect:{}.", reconnect);
if (reconnect) {
List<Address> grpcServers = ServiceManager.INSTANCE.findService(DefaultDiscoveryService.class).queryRemoteAddresses();
logger.info("running server addresses:{}", Joiner.on(",").join(grpcServers));
if (grpcServers.size() > 0) {
String server = "";
Address server = selectAddress(grpcServers);
try {
int index = Math.abs(random.nextInt()) % grpcServers.size();
if (index != selectedIdx) {
selectedIdx = index;

server = grpcServers.get(index);
String[] ipAndPort = server.split(":");

if (selectedAddress == null || !selectedAddress.equals(server)) {
selectedAddress = server;
if (managedChannel != null) {
managedChannel.shutdownNow();
}

managedChannel = GRPCChannel.newBuilder(ipAndPort[0], Integer.parseInt(ipAndPort[1]))
managedChannel = GRPCChannel.newBuilder(server.getHost(), server.getPort())
.addManagedChannelBuilder(new StandardChannelBuilder())
.addManagedChannelBuilder(new TLSChannelBuilder())
.addChannelDecorator(new AgentIDDecorator())
Expand Down Expand Up @@ -138,6 +132,11 @@ public void run() {
}
}

private Address selectAddress(List<Address> grpcServers) {
int index = Math.abs(random.nextInt()) % grpcServers.size();
return grpcServers.get(index);
}

public void addChannelListener(GRPCChannelListener listener) {
listeners.add(listener);
}
Expand Down
Expand Up @@ -30,4 +30,5 @@ org.apache.skywalking.apm.agent.core.profile.ProfileTaskChannelService
org.apache.skywalking.apm.agent.core.profile.ProfileSnapshotSender
org.apache.skywalking.apm.agent.core.profile.ProfileTaskExecutionService
org.apache.skywalking.apm.agent.core.meter.MeterService
org.apache.skywalking.apm.agent.core.meter.MeterSender
org.apache.skywalking.apm.agent.core.meter.MeterSender
org.apache.skywalking.apm.agent.core.discovery.DefaultDiscoveryService
Expand Up @@ -58,7 +58,7 @@ public static void afterClass() {
public void testServiceDependencies() throws Exception {
HashMap<Class, BootService> registryService = getFieldValue(ServiceManager.INSTANCE, "bootedServices");

assertThat(registryService.size(), is(15));
assertThat(registryService.size(), is(16));

assertTraceSegmentServiceClient(ServiceManager.INSTANCE.findService(TraceSegmentServiceClient.class));
assertContextManager(ServiceManager.INSTANCE.findService(ContextManager.class));
Expand Down
5 changes: 5 additions & 0 deletions apm-sniffer/config/agent.config
Expand Up @@ -90,3 +90,8 @@ logging.level=${SW_LOGGING_LEVEL:INFO}

# Kafka producer configuration
plugin.kafka.bootstrap_servers=${SW_KAFKA_BOOTSTRAP_SERVERS:localhost:9092}
# service discovery configuration
# kubernetes
#plugin.kubernetesservice.namespace=${SW_CLUSTER_K8S_NAMESPACE:default}
#plugin.kubernetesservice.label_selector=${SW_CLUSTER_LABEL_SELECTOR:app=skywalking-oap,release=skywalking}
#plugin.kubernetesservice.port_name=${SW_CLUSTER_PORT_NAME:GRPC}
1 change: 1 addition & 0 deletions apm-sniffer/optional-plugins/pom.xml
Expand Up @@ -48,6 +48,7 @@
<module>zookeeper-3.4.x-plugin</module>
<module>customize-enhance-plugin</module>
<module>kotlin-coroutine-plugin</module>
<module>service-discovery-plugins</module>
</modules>

<dependencies>
Expand Down
@@ -0,0 +1,74 @@
<?xml version="1.0" encoding="UTF-8"?>
<!--
~ Licensed to the Apache Software Foundation (ASF) under one or more
~ contributor license agreements. See the NOTICE file distributed with
~ this work for additional information regarding copyright ownership.
~ The ASF licenses this file to You under the Apache License, Version 2.0
~ (the "License"); you may not use this file except in compliance with
~ the License. You may obtain a copy of the License at
~
~ http://www.apache.org/licenses/LICENSE-2.0
~
~ Unless required by applicable law or agreed to in writing, software
~ distributed under the License is distributed on an "AS IS" BASIS,
~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
~ See the License for the specific language governing permissions and
~ limitations under the License.
~
-->
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<parent>
<artifactId>service-discovery-plugins</artifactId>
<groupId>org.apache.skywalking</groupId>
<version>8.2.0-SNAPSHOT</version>
</parent>
<modelVersion>4.0.0</modelVersion>
<artifactId>kubernetes-servicediscovery-plugin</artifactId>
<properties>
<kubernetes.version>8.0.0</kubernetes.version>
</properties>
<dependencies>
<dependency>
<groupId>io.kubernetes</groupId>
<artifactId>client-java</artifactId>
<version>${kubernetes.version}</version>
</dependency>
</dependencies>
<build>
<plugins>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-shade-plugin</artifactId>
<executions>
<execution>
<phase>package</phase>
<goals>
<goal>shade</goal>
</goals>
<configuration>
<minimizeJar>true</minimizeJar>
<relocations>
<relocation>
<pattern>com.google.gson</pattern>
<shadedPattern>shaded.com.google.gson</shadedPattern>
</relocation>
</relocations>
<filters>
<filter>
<artifact>*:*</artifact>
<excludes>
<exclude>META-INF/*.SF</exclude>
<exclude>META-INF/*.DSA</exclude>
<exclude>META-INF/*.RSA</exclude>
</excludes>
</filter>
</filters>
</configuration>
</execution>
</executions>
</plugin>
</plugins>
</build>
</project>