Skip to content

Commit

Permalink
Support Envoy {AccessLog,Metrics}Service API V3
Browse files Browse the repository at this point in the history
  • Loading branch information
kezhenxu94 committed Dec 15, 2020
1 parent 62b690b commit a1c543b
Show file tree
Hide file tree
Showing 38 changed files with 2,238 additions and 374 deletions.
Expand Up @@ -18,34 +18,35 @@

package org.apache.skywalking.oap.server.receiver.envoy;

import io.envoyproxy.envoy.service.accesslog.v2.AccessLogServiceGrpc;
import io.envoyproxy.envoy.service.accesslog.v2.StreamAccessLogsMessage;
import io.envoyproxy.envoy.service.accesslog.v2.StreamAccessLogsResponse;
import io.grpc.stub.StreamObserver;
import com.google.protobuf.Message;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.ServiceLoader;
import lombok.extern.slf4j.Slf4j;
import org.apache.skywalking.aop.server.receiver.mesh.TelemetryDataDispatcher;
import org.apache.skywalking.apm.network.servicemesh.v3.ServiceMeshMetric;
import org.apache.skywalking.oap.server.library.module.ModuleManager;
import org.apache.skywalking.oap.server.library.module.ModuleStartException;
import org.apache.skywalking.oap.server.receiver.envoy.als.ALSHTTPAnalysis;
import org.apache.skywalking.oap.server.receiver.envoy.als.ProtoMessages;
import org.apache.skywalking.oap.server.receiver.envoy.als.Role;
import org.apache.skywalking.oap.server.receiver.envoy.als.wrapper.Identifier;
import org.apache.skywalking.oap.server.telemetry.TelemetryModule;
import org.apache.skywalking.oap.server.telemetry.api.CounterMetrics;
import org.apache.skywalking.oap.server.telemetry.api.HistogramMetrics;
import org.apache.skywalking.oap.server.telemetry.api.MetricsCreator;
import org.apache.skywalking.oap.server.telemetry.api.MetricsTag;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class AccessLogServiceGRPCHandler extends AccessLogServiceGrpc.AccessLogServiceImplBase {
private static final Logger LOGGER = LoggerFactory.getLogger(AccessLogServiceGRPCHandler.class);
private final List<ALSHTTPAnalysis> envoyHTTPAnalysisList;
@Slf4j
public class AccessLogServiceGRPCHandler {
protected final List<ALSHTTPAnalysis> envoyHTTPAnalysisList;

private final CounterMetrics counter;
private final HistogramMetrics histogram;
private final CounterMetrics sourceDispatcherCounter;
protected final CounterMetrics counter;

protected final HistogramMetrics histogram;

protected final CounterMetrics sourceDispatcherCounter;

public AccessLogServiceGRPCHandler(ModuleManager manager, EnvoyMetricReceiverConfig config) throws ModuleStartException {
ServiceLoader<ALSHTTPAnalysis> alshttpAnalyses = ServiceLoader.load(ALSHTTPAnalysis.class);
Expand All @@ -59,75 +60,51 @@ public AccessLogServiceGRPCHandler(ModuleManager manager, EnvoyMetricReceiverCon
}
}

LOGGER.debug("envoy HTTP analysis: " + envoyHTTPAnalysisList);
log.debug("envoy HTTP analysis: " + envoyHTTPAnalysisList);

MetricsCreator metricCreator = manager.find(TelemetryModule.NAME).provider().getService(MetricsCreator.class);
counter = metricCreator.createCounter("envoy_als_in_count", "The count of envoy ALS metric received", MetricsTag.EMPTY_KEY, MetricsTag.EMPTY_VALUE);
histogram = metricCreator.createHistogramMetric("envoy_als_in_latency", "The process latency of service ALS metric receiver", MetricsTag.EMPTY_KEY, MetricsTag.EMPTY_VALUE);
sourceDispatcherCounter = metricCreator.createCounter("envoy_als_source_dispatch_count", "The count of envoy ALS metric received", MetricsTag.EMPTY_KEY, MetricsTag.EMPTY_VALUE);
}

public StreamObserver<StreamAccessLogsMessage> streamAccessLogs(
StreamObserver<StreamAccessLogsResponse> responseObserver) {
return new StreamObserver<StreamAccessLogsMessage>() {
private volatile boolean isFirst = true;
private Role role;
private StreamAccessLogsMessage.Identifier identifier;

@Override
public void onNext(StreamAccessLogsMessage message) {
counter.inc();

HistogramMetrics.Timer timer = histogram.createTimer();
try {
if (isFirst) {
identifier = message.getIdentifier();
isFirst = false;
role = Role.NONE;
for (ALSHTTPAnalysis analysis : envoyHTTPAnalysisList) {
role = analysis.identify(identifier, role);
}
}

StreamAccessLogsMessage.LogEntriesCase logCase = message.getLogEntriesCase();

if (LOGGER.isDebugEnabled()) {
LOGGER.debug("Messaged is identified from Envoy[{}], role[{}] in [{}]. Received msg {}", identifier
.getNode()
.getId(), role, logCase, message);
}

switch (logCase) {
case HTTP_LOGS:
StreamAccessLogsMessage.HTTPAccessLogEntries logs = message.getHttpLogs();

List<ServiceMeshMetric.Builder> sourceResult = new ArrayList<>();
for (ALSHTTPAnalysis analysis : envoyHTTPAnalysisList) {
logs.getLogEntryList().forEach(log -> {
sourceResult.addAll(analysis.analysis(identifier, log, role));
});
}

sourceDispatcherCounter.inc(sourceResult.size());
sourceResult.forEach(TelemetryDataDispatcher::process);
break;
}
} finally {
timer.finish();
private Role role;

private volatile boolean isFirst = true;

private Identifier identifier;

protected void handle(Message message) {
counter.inc();

try (final HistogramMetrics.Timer ignored = histogram.createTimer()) {
if (isFirst) {
identifier = new Identifier(ProtoMessages.findField(message, "identifier", null));
isFirst = false;
role = Role.NONE;
for (ALSHTTPAnalysis analysis : envoyHTTPAnalysisList) {
role = analysis.identify(identifier, role);
}
}

@Override
public void onError(Throwable throwable) {
LOGGER.error("Error in receiving access log from envoy", throwable);
responseObserver.onCompleted();
if (log.isDebugEnabled()) {
log.debug("Messaged is identified from Envoy[{}], role[{}]. Received msg {}", identifier, role, message);
}

@Override
public void onCompleted() {
responseObserver.onNext(StreamAccessLogsResponse.newBuilder().build());
responseObserver.onCompleted();
final List<Message> logs = ProtoMessages.findField(message, "http_logs.log_entry", Collections.emptyList());
final List<ServiceMeshMetric.Builder> sourceResult = new ArrayList<>();
for (ALSHTTPAnalysis analysis : envoyHTTPAnalysisList) {
logs.forEach(log -> sourceResult.addAll(analysis.analysis(identifier, log, role)));
}
};

sourceDispatcherCounter.inc(sourceResult.size());
sourceResult.forEach(TelemetryDataDispatcher::process);
}
}

public void reset() {
role = null;
isFirst = true;
identifier = null;
}
}
@@ -0,0 +1,62 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/

package org.apache.skywalking.oap.server.receiver.envoy;

import io.envoyproxy.envoy.service.accesslog.v2.AccessLogServiceGrpc;
import io.envoyproxy.envoy.service.accesslog.v2.StreamAccessLogsMessage;
import io.envoyproxy.envoy.service.accesslog.v2.StreamAccessLogsResponse;
import io.grpc.stub.StreamObserver;
import lombok.extern.slf4j.Slf4j;
import org.apache.skywalking.oap.server.library.module.ModuleManager;
import org.apache.skywalking.oap.server.library.module.ModuleStartException;

@Slf4j
public class AccessLogServiceV2GRPCHandler extends AccessLogServiceGrpc.AccessLogServiceImplBase {
private final AccessLogServiceGRPCHandler handler;

public AccessLogServiceV2GRPCHandler(final ModuleManager manager, final EnvoyMetricReceiverConfig config) throws ModuleStartException {
handler = new AccessLogServiceGRPCHandler(manager, config);
}

@Override
public StreamObserver<StreamAccessLogsMessage> streamAccessLogs(final StreamObserver<StreamAccessLogsResponse> responseObserver) {
handler.reset();

return new StreamObserver<StreamAccessLogsMessage>() {
@Override
public void onNext(StreamAccessLogsMessage message) {
handler.handle(message);
}

@Override
public void onError(Throwable throwable) {
log.error("Error in receiving access log from envoy", throwable);
responseObserver.onCompleted();
handler.reset();
}

@Override
public void onCompleted() {
responseObserver.onNext(StreamAccessLogsResponse.newBuilder().build());
responseObserver.onCompleted();
handler.reset();
}
};
}
}
@@ -0,0 +1,61 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/

package org.apache.skywalking.oap.server.receiver.envoy;

import io.envoyproxy.envoy.service.accesslog.v3.AccessLogServiceGrpc;
import io.envoyproxy.envoy.service.accesslog.v3.StreamAccessLogsMessage;
import io.envoyproxy.envoy.service.accesslog.v3.StreamAccessLogsResponse;
import io.grpc.stub.StreamObserver;
import lombok.extern.slf4j.Slf4j;
import org.apache.skywalking.oap.server.library.module.ModuleManager;
import org.apache.skywalking.oap.server.library.module.ModuleStartException;

@Slf4j
public class AccessLogServiceV3GRPCHandler extends AccessLogServiceGrpc.AccessLogServiceImplBase {
private final AccessLogServiceGRPCHandler handler;

public AccessLogServiceV3GRPCHandler(final ModuleManager manager, final EnvoyMetricReceiverConfig config) throws ModuleStartException {
handler = new AccessLogServiceGRPCHandler(manager, config);
}

public StreamObserver<StreamAccessLogsMessage> streamAccessLogs(final StreamObserver<StreamAccessLogsResponse> responseObserver) {
handler.reset();

return new StreamObserver<StreamAccessLogsMessage>() {
@Override
public void onNext(StreamAccessLogsMessage message) {
handler.handle(message);
}

@Override
public void onError(Throwable throwable) {
log.error("Error in receiving access log from envoy", throwable);
responseObserver.onCompleted();
handler.reset();
}

@Override
public void onCompleted() {
responseObserver.onNext(StreamAccessLogsResponse.newBuilder().build());
responseObserver.onCompleted();
handler.reset();
}
};
}
}
Expand Up @@ -68,9 +68,11 @@ public void start() throws ServiceNotProvidedException, ModuleStartException {
.getService(OALEngineLoaderService.class)
.load(EnvoyOALDefine.INSTANCE);

service.addHandler(new MetricServiceGRPCHandler(getManager()));
service.addHandler(new MetricServiceV2GRPCHandler(getManager()));
service.addHandler(new MetricServiceV3GRPCHandler(getManager()));
}
service.addHandler(new AccessLogServiceGRPCHandler(getManager(), config));
service.addHandler(new AccessLogServiceV2GRPCHandler(getManager(), config));
service.addHandler(new AccessLogServiceV3GRPCHandler(getManager(), config));
}

@Override
Expand Down

0 comments on commit a1c543b

Please sign in to comment.