From c8a26faec7ab293dfc16506cf97efd41e7e8274d Mon Sep 17 00:00:00 2001 From: lu-xiaoshuang <121755080+lu-xiaoshuang@users.noreply.github.com> Date: Thu, 18 May 2023 20:00:00 +0800 Subject: [PATCH] add kafka reporter part 2 --- build/version.props | 2 +- .../grpc/SkyApm.Sample.GrpcServer/skyapm.json | 72 +++++++---- skyapm-dotnet.sln | 7 ++ .../Config/InstrumentConfig.cs | 15 ++- src/SkyApm.Abstractions/Logging/ILogger.cs | 2 +- .../Extensions/ServiceCollectionExtensions.cs | 78 ++++++++++-- .../SkyApm.Agent.Hosting.csproj | 1 + .../GrpcNetClientICarrierHeaderCollection.cs | 24 ++-- .../Client/ClientDiagnosticProcessor.cs | 4 +- .../HttpClientDiagnosticProcessor.cs | 24 ++-- .../Command/ConfigCommand.cs | 32 +++-- src/SkyApm.Transport.Grpc/CLRStatsReporter.cs | 6 + .../Common/SegmentV8Helpers.cs | 5 - src/SkyApm.Transport.Grpc/LogReporter.cs | 53 ++++++++ src/SkyApm.Transport.Grpc/SegmentReporter.cs | 16 ++- .../V8/CLRStatsReporter.cs | 11 +- src/SkyApm.Transport.Grpc/V8/LogReporter.cs | 14 +-- .../V8/SegmentReporter.cs | 8 +- .../CLRStatsReporter.cs | 52 ++++++++ .../Common/SegmentV8Helpers.cs | 115 +++++++++++++++++ src/SkyApm.Transport.Kafka/LogReporter.cs | 53 ++++++++ src/SkyApm.Transport.Kafka/PingCaller.cs | 45 +++++++ src/SkyApm.Transport.Kafka/SegmentReporter.cs | 53 ++++++++ src/SkyApm.Transport.Kafka/ServiceRegister.cs | 47 +++++++ .../SkyApm.Transport.Kafka.csproj | 25 ++++ .../V8/CLRStatsReporter.cs | 99 +++++++++++++++ src/SkyApm.Transport.Kafka/V8/LogReporter.cs | 116 ++++++++++++++++++ src/SkyApm.Transport.Kafka/V8/PingCaller.cs | 44 +++++++ .../V8/SegmentReporter.cs | 84 +++++++++++++ .../V8/ServiceRegister.cs | 50 ++++++++ .../ConfigurationBuilderExtensions.cs | 59 +++++---- 31 files changed, 1081 insertions(+), 135 deletions(-) create mode 100644 src/SkyApm.Transport.Grpc/LogReporter.cs create mode 100644 src/SkyApm.Transport.Kafka/CLRStatsReporter.cs create mode 100644 src/SkyApm.Transport.Kafka/Common/SegmentV8Helpers.cs create mode 100644 src/SkyApm.Transport.Kafka/LogReporter.cs create mode 100644 src/SkyApm.Transport.Kafka/PingCaller.cs create mode 100644 src/SkyApm.Transport.Kafka/SegmentReporter.cs create mode 100644 src/SkyApm.Transport.Kafka/ServiceRegister.cs create mode 100644 src/SkyApm.Transport.Kafka/SkyApm.Transport.Kafka.csproj create mode 100644 src/SkyApm.Transport.Kafka/V8/CLRStatsReporter.cs create mode 100644 src/SkyApm.Transport.Kafka/V8/LogReporter.cs create mode 100644 src/SkyApm.Transport.Kafka/V8/PingCaller.cs create mode 100644 src/SkyApm.Transport.Kafka/V8/SegmentReporter.cs create mode 100644 src/SkyApm.Transport.Kafka/V8/ServiceRegister.cs diff --git a/build/version.props b/build/version.props index 7dc91a1e..d705cb49 100644 --- a/build/version.props +++ b/build/version.props @@ -1,7 +1,7 @@ 2 - 1 + 2 0 $(VersionMajor).$(VersionMinor).$(VersionPatch) diff --git a/sample/grpc/SkyApm.Sample.GrpcServer/skyapm.json b/sample/grpc/SkyApm.Sample.GrpcServer/skyapm.json index ab6782cb..3243da89 100644 --- a/sample/grpc/SkyApm.Sample.GrpcServer/skyapm.json +++ b/sample/grpc/SkyApm.Sample.GrpcServer/skyapm.json @@ -1,29 +1,47 @@ { - "SkyWalking": { - "ServiceName": "grpc-greeter-server", - "Namespace": "", - "HeaderVersions": [ - "sw8" - ], - "Sampling": { - "SamplePer3Secs": -1, - "Percentage": -1.0 - }, - "Logging": { - "Level": "Information", - "FilePath": "logs/skyapm-{Date}.log" - }, - "Transport": { - "Interval": 3000, - "ProtocolVersion": "v8", - "QueueSize": 30000, - "BatchSize": 3000, - "gRPC": { - "Servers": "localhost:11800", - "Timeout": 100000, - "ConnectTimeout": 100000, - "ReportTimeout": 600000 - } + "SkyWalking": { + "ServiceName": "grpc-greeter-server", + "Namespace": "", + "HeaderVersions": [ + "sw8" + ], + "Sampling": { + "SamplePer3Secs": -1, + "Percentage": -1.0 + }, + "Logging": { + "Level": "Information", + "FilePath": "logs/skyapm-{Date}.log" + }, + "MeterActive": true, + "MetricActive": true, + "SegmentActive": true, + "ProfilingActive": true, + "ManagementActive": true, + "LogActive": true, + "Transport": { + "Interval": 3000, + "ProtocolVersion": "v8", + "QueueSize": 30000, + "BatchSize": 3000, + "Reporter": "kafka", + "gRPC": { + "Servers": "localhost:11800", + "Timeout": 100000, + "ConnectTimeout": 100000, + "ReportTimeout": 600000 + }, + "Kafka": { + "BootstrapServers": "localhost:19091,localhost:19092,localhost:19093", + "ProducerConfig": "", + "GetTopicTimeout": 3000, + "TopicMeters": "skywalking-meters", + "TopicMetrics": "skywalking-metrics", + "TopicSegments": "skywalking-segments", + "TopicProfilings": "skywalking-profilings", + "TopicManagements": "skywalking-managements", + "TopicLogs": "skywalking-logs" + } + } } - } -} \ No newline at end of file +} diff --git a/skyapm-dotnet.sln b/skyapm-dotnet.sln index 4448c226..4c33cff2 100644 --- a/skyapm-dotnet.sln +++ b/skyapm-dotnet.sln @@ -125,6 +125,8 @@ Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "SkyApm.PeerFormatters.MySql EndProject Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "SkyApm.Diagnostics.FreeRedis", "src\SkyApm.Diagnostics.FreeRedis\SkyApm.Diagnostics.FreeRedis.csproj", "{829F7955-3138-4CA2-8AA3-3E4D72C97599}" EndProject +Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "SkyApm.Transport.Kafka", "src\SkyApm.Transport.Kafka\SkyApm.Transport.Kafka.csproj", "{8BCBAFD0-2C88-8886-5ECF-82D8A88888CB}" +EndProject Global GlobalSection(SolutionConfigurationPlatforms) = preSolution Debug|Any CPU = Debug|Any CPU @@ -287,6 +289,10 @@ Global {829F7955-3138-4CA2-8AA3-3E4D72C97599}.Debug|Any CPU.Build.0 = Debug|Any CPU {829F7955-3138-4CA2-8AA3-3E4D72C97599}.Release|Any CPU.ActiveCfg = Release|Any CPU {829F7955-3138-4CA2-8AA3-3E4D72C97599}.Release|Any CPU.Build.0 = Release|Any CPU + {8BCBAFD0-2C88-8886-5ECF-82D8A88888CB}.Debug|Any CPU.ActiveCfg = Debug|Any CPU + {8BCBAFD0-2C88-8886-5ECF-82D8A88888CB}.Debug|Any CPU.Build.0 = Debug|Any CPU + {8BCBAFD0-2C88-8886-5ECF-82D8A88888CB}.Release|Any CPU.ActiveCfg = Release|Any CPU + {8BCBAFD0-2C88-8886-5ECF-82D8A88888CB}.Release|Any CPU.Build.0 = Release|Any CPU EndGlobalSection GlobalSection(SolutionProperties) = preSolution HideSolutionNode = FALSE @@ -341,6 +347,7 @@ Global {5DBE2053-EBAE-404F-A7B3-9CE3CD2152D9} = {D122E6AE-6FE7-4C1A-826F-5964ABBF2C9D} {2A313B7E-CC41-4556-8055-D87266C398BF} = {D122E6AE-6FE7-4C1A-826F-5964ABBF2C9D} {829F7955-3138-4CA2-8AA3-3E4D72C97599} = {B5E677CF-2920-4B0A-A056-E73F6B2CF2BC} + {8BCBAFD0-2C88-8886-5ECF-82D8A88888CB} = {CBFF7EE0-69D7-4D6A-9BBD-8E567FF4D810} EndGlobalSection GlobalSection(ExtensibilityGlobals) = postSolution SolutionGuid = {94C0DA2C-CCCB-4314-93A2-9809B5DD0583} diff --git a/src/SkyApm.Abstractions/Config/InstrumentConfig.cs b/src/SkyApm.Abstractions/Config/InstrumentConfig.cs index acbc79e2..1718fcf3 100644 --- a/src/SkyApm.Abstractions/Config/InstrumentConfig.cs +++ b/src/SkyApm.Abstractions/Config/InstrumentConfig.cs @@ -32,12 +32,23 @@ public class InstrumentConfig public string ServiceInstanceName { get; set; } - public string[] HeaderVersions { get; set; } + + public bool MeterActive { get; set; } = true; + + public bool MetricActive { get; set; } = true; + + public bool SegmentActive { get; set; } = true; + + public bool ProfilingActive { get; set; } = true; + + public bool ManagementActive { get; set; } = true; + + public bool LogActive { get; set; } = true; } public static class HeaderVersions { public static string SW8 { get; } = "sw8"; } -} \ No newline at end of file +} diff --git a/src/SkyApm.Abstractions/Logging/ILogger.cs b/src/SkyApm.Abstractions/Logging/ILogger.cs index 7ff9cb9b..723ed1a8 100644 --- a/src/SkyApm.Abstractions/Logging/ILogger.cs +++ b/src/SkyApm.Abstractions/Logging/ILogger.cs @@ -32,4 +32,4 @@ public interface ILogger void Trace(string message); } -} \ No newline at end of file +} \ No newline at end of file diff --git a/src/SkyApm.Agent.Hosting/Extensions/ServiceCollectionExtensions.cs b/src/SkyApm.Agent.Hosting/Extensions/ServiceCollectionExtensions.cs index 0f4c03c1..b6b2f1bb 100644 --- a/src/SkyApm.Agent.Hosting/Extensions/ServiceCollectionExtensions.cs +++ b/src/SkyApm.Agent.Hosting/Extensions/ServiceCollectionExtensions.cs @@ -28,7 +28,6 @@ using SkyApm.Service; using SkyApm.Tracing; using SkyApm.Transport; -using SkyApm.Transport.Grpc; using SkyApm.Utilities.Configuration; using SkyApm.Utilities.DependencyInjection; using SkyApm.Utilities.Logging; @@ -72,7 +71,10 @@ private static IServiceCollection AddSkyAPMCore(this IServiceCollection services services.AddSingleton(); services.AddSingleton(); services.AddSingleton(); - services.AddTracing().AddSampling().AddGrpcTransport().AddSkyApmLogging(); + services.AddTracing(); + services.AddSampling(); + services.AddTransport(); + services.AddSkyApmLogging(); var extensions = services.AddSkyApmExtensions() .AddHttpClient() .AddGrpcClient() @@ -115,15 +117,70 @@ private static IServiceCollection AddSampling(this IServiceCollection services) return services; } - private static IServiceCollection AddGrpcTransport(this IServiceCollection services) + private static IServiceCollection AddTransport(this IServiceCollection services) { - services.AddSingleton(); - services.AddSingleton(); - services.AddSingleton(); - services.AddSingleton(); - services.AddSingleton(); - services.AddSingleton(); - services.AddSingleton(); + // TODO + // HELP ME + // how to get `reporter' during dependency injection? + string reporter = "grpc"; + while (true) + { + //ServiceProvider serviceProvider = services.BuildServiceProvider(); + //if (serviceProvider == null) + //{ + // break; + //} + //IConfigAccessor configAccessor = serviceProvider.GetService(); + //if (configAccessor == null) + //{ + // break; + //} + //TransportConfig transportConfig = configAccessor.Get(); + //if (transportConfig == null) + //{ + // break; + //} + //reporter = transportConfig.Reporter; + //if (reporter == null) + //{ + // reporter = ""; + //} + break; + } + switch (reporter.ToLower()) + { + case "grpc": + services.AddTransportGrpc(); + break; + case "kafka": + services.AddTransportKafka(); + break; + default: + services.AddTransportGrpc(); + break; + } + return services; + } + + private static IServiceCollection AddTransportGrpc(this IServiceCollection services) + { + services.AddSingleton(); + services.AddSingleton(); + services.AddSingleton(); + services.AddSingleton(); + services.AddSingleton(); + services.AddSingleton(); + services.AddSingleton(); + return services; + } + + private static IServiceCollection AddTransportKafka(this IServiceCollection services) + { + services.AddSingleton(); + services.AddSingleton(); + services.AddSingleton(); + services.AddSingleton(); + services.AddSingleton(); return services; } @@ -132,6 +189,5 @@ private static IServiceCollection AddSkyApmLogging(this IServiceCollection servi services.AddSingleton(); return services; } - } } diff --git a/src/SkyApm.Agent.Hosting/SkyApm.Agent.Hosting.csproj b/src/SkyApm.Agent.Hosting/SkyApm.Agent.Hosting.csproj index 206c7ff3..56cf9456 100644 --- a/src/SkyApm.Agent.Hosting/SkyApm.Agent.Hosting.csproj +++ b/src/SkyApm.Agent.Hosting/SkyApm.Agent.Hosting.csproj @@ -44,6 +44,7 @@ + diff --git a/src/SkyApm.Diagnostics.Grpc.Net.Client/GrpcNetClientICarrierHeaderCollection.cs b/src/SkyApm.Diagnostics.Grpc.Net.Client/GrpcNetClientICarrierHeaderCollection.cs index 2da81587..2a627d54 100644 --- a/src/SkyApm.Diagnostics.Grpc.Net.Client/GrpcNetClientICarrierHeaderCollection.cs +++ b/src/SkyApm.Diagnostics.Grpc.Net.Client/GrpcNetClientICarrierHeaderCollection.cs @@ -36,19 +36,19 @@ public GrpcNetClientICarrierHeaderCollection(HttpRequestMessage request) public void Add(string key, string value) { _request.Headers.Add(key, value); - } - - public string Get(string key) - { - if (_request.Headers.TryGetValues(key, out var values)) - return values.FirstOrDefault(); - return null; - } - + } + + public string Get(string key) + { + if (_request.Headers.TryGetValues(key, out var values)) + return values.FirstOrDefault(); + return null; + } + public IEnumerator> GetEnumerator() - { - return _request.Headers.Select(x => - new KeyValuePair(x.Key, x.Value.FirstOrDefault())) + { + return _request.Headers.Select(x => + new KeyValuePair(x.Key, x.Value.FirstOrDefault())) .GetEnumerator(); } diff --git a/src/SkyApm.Diagnostics.Grpc/Client/ClientDiagnosticProcessor.cs b/src/SkyApm.Diagnostics.Grpc/Client/ClientDiagnosticProcessor.cs index 00dc19e7..1e555ef6 100644 --- a/src/SkyApm.Diagnostics.Grpc/Client/ClientDiagnosticProcessor.cs +++ b/src/SkyApm.Diagnostics.Grpc/Client/ClientDiagnosticProcessor.cs @@ -19,7 +19,7 @@ using Grpc.Core; using Grpc.Core.Interceptors; using SkyApm.Common; -using SkyApm.Config; +using SkyApm.Config; using SkyApm.Tracing; using SkyApm.Tracing.Segments; using System; @@ -40,7 +40,7 @@ public class ClientDiagnosticProcessor _tracingConfig = configAccessor.Get(); } - public Metadata BeginRequest(ClientInterceptorContext grpcContext) + public Metadata BeginRequest(ClientInterceptorContext grpcContext) where TRequest : class where TResponse : class { diff --git a/src/SkyApm.Diagnostics.HttpClient/HttpClientDiagnosticProcessor.cs b/src/SkyApm.Diagnostics.HttpClient/HttpClientDiagnosticProcessor.cs index add96ea3..6ca41211 100644 --- a/src/SkyApm.Diagnostics.HttpClient/HttpClientDiagnosticProcessor.cs +++ b/src/SkyApm.Diagnostics.HttpClient/HttpClientDiagnosticProcessor.cs @@ -21,9 +21,9 @@ using System.Linq; using System.Net.Http; using SkyApm.Common; -using SkyApm.Config; -using SkyApm.Diagnostics.HttpClient.Config; -using SkyApm.Diagnostics.HttpClient.Extensions; +using SkyApm.Config; +using SkyApm.Diagnostics.HttpClient.Config; +using SkyApm.Diagnostics.HttpClient.Extensions; using SkyApm.Diagnostics.HttpClient.Filters; using SkyApm.Tracing; @@ -35,7 +35,7 @@ public class HttpClientTracingDiagnosticProcessor : ITracingDiagnosticProcessor //private readonly IContextCarrierFactory _contextCarrierFactory; private readonly ITracingContext _tracingContext; - + private readonly IExitSegmentContextAccessor _contextAccessor; private readonly IEnumerable _requestDiagnosticHandlers; @@ -45,7 +45,7 @@ public class HttpClientTracingDiagnosticProcessor : ITracingDiagnosticProcessor private readonly HttpClientDiagnosticConfig _httpClientDiagnosticConfig; public HttpClientTracingDiagnosticProcessor(ITracingContext tracingContext, - IExitSegmentContextAccessor contextAccessor, + IExitSegmentContextAccessor contextAccessor, IEnumerable requestDiagnosticHandlers, IConfigAccessor configAccessor) { @@ -88,13 +88,13 @@ public void HttpResponse([Property(Name = "Response")] HttpResponseMessage respo context.Span.AddTag(Tags.STATUS_CODE, statusCode); - if(response.Content != null && _httpClientDiagnosticConfig.CollectResponseBodyContentTypes?.Count > 0) - { - var responseBody = response.Content.TryCollectAsString( - _httpClientDiagnosticConfig.CollectResponseBodyContentTypes, - _httpClientDiagnosticConfig.CollectBodyLengthThreshold); - if (!string.IsNullOrEmpty(responseBody)) - context.Span.AddTag(Tags.HTTP_RESPONSE_BODY, responseBody); + if(response.Content != null && _httpClientDiagnosticConfig.CollectResponseBodyContentTypes?.Count > 0) + { + var responseBody = response.Content.TryCollectAsString( + _httpClientDiagnosticConfig.CollectResponseBodyContentTypes, + _httpClientDiagnosticConfig.CollectBodyLengthThreshold); + if (!string.IsNullOrEmpty(responseBody)) + context.Span.AddTag(Tags.HTTP_RESPONSE_BODY, responseBody); } } diff --git a/src/SkyApm.DotNet.CLI/Command/ConfigCommand.cs b/src/SkyApm.DotNet.CLI/Command/ConfigCommand.cs index cf7e8614..1ff44793 100644 --- a/src/SkyApm.DotNet.CLI/Command/ConfigCommand.cs +++ b/src/SkyApm.DotNet.CLI/Command/ConfigCommand.cs @@ -105,7 +105,7 @@ private void Generate if (! GRPC.Equals(reporter, StringComparison.OrdinalIgnoreCase) && ! KAFKA.Equals(reporter, StringComparison.OrdinalIgnoreCase)) { - Console.WriteLine("Invalid reporter type {0}. Use default value.", reporter); + Console.WriteLine("Invalid reporter type {0}. Use default type.", reporter); reporter = GRPC; } @@ -115,7 +115,7 @@ private void Generate { "ProtocolVersion", "v8" }, { "QueueSize", 30000 }, { "BatchSize", 3000 }, - { "reporter", reporter.ToLower() }, + { "Reporter", reporter.ToLower() }, }; { @@ -150,14 +150,14 @@ private void Generate var loggingConfig = new Dictionary { - {"Level", "Information"}, - {"FilePath", Path.Combine("logs", "skyapm-{Date}.log")} + { "Level", "Information" }, + { "FilePath", Path.Combine("logs", "skyapm-{Date}.log") } }; var samplingConfig = new Dictionary { - {"SamplePer3Secs", -1}, - {"Percentage", -1d} + { "SamplePer3Secs", -1 }, + { "Percentage", -1d } }; var HeaderVersionsConfig = new string[] @@ -167,17 +167,23 @@ private void Generate var skyAPMConfig = new Dictionary { - {"ServiceName", serviceName}, - {"Namespace", string.Empty}, - {"HeaderVersions", HeaderVersionsConfig}, - {"Sampling", samplingConfig}, - {"Logging", loggingConfig}, - {"Transport", transportConfig} + { "ServiceName", serviceName }, + { "Namespace", string.Empty }, + { "HeaderVersions", HeaderVersionsConfig }, + { "Sampling", samplingConfig }, + { "Logging", loggingConfig }, + { "MeterActive", true }, + { "MetricActive", true }, + { "SegmentActive", true }, + { "ProfilingActive", true }, + { "ManagementActive", true }, + { "LogActive", true }, + { "Transport", transportConfig } }; var rootConfig = new Dictionary { - {"SkyWalking", skyAPMConfig} + { "SkyWalking", skyAPMConfig } }; using (var writer = configFile.CreateText()) diff --git a/src/SkyApm.Transport.Grpc/CLRStatsReporter.cs b/src/SkyApm.Transport.Grpc/CLRStatsReporter.cs index 373fdf09..22fbbea6 100644 --- a/src/SkyApm.Transport.Grpc/CLRStatsReporter.cs +++ b/src/SkyApm.Transport.Grpc/CLRStatsReporter.cs @@ -27,12 +27,14 @@ namespace SkyApm.Transport.Grpc { public class CLRStatsReporter : ICLRStatsReporter { + private readonly InstrumentConfig _instrumentConfig; private readonly TransportConfig _transportConfig; private readonly ICLRStatsReporter _clrStatsReporterV8; public CLRStatsReporter(ConnectionManager connectionManager, ILoggerFactory loggerFactory, IConfigAccessor configAccessor, IRuntimeEnvironment runtimeEnvironment) { + _instrumentConfig = configAccessor.Get(); _transportConfig = configAccessor.Get(); _clrStatsReporterV8 = new V8.CLRStatsReporter(connectionManager, loggerFactory, configAccessor, runtimeEnvironment); } @@ -40,6 +42,10 @@ public class CLRStatsReporter : ICLRStatsReporter public async Task ReportAsync(CLRStatsRequest statsRequest, CancellationToken cancellationToken = default(CancellationToken)) { + if (_instrumentConfig != null && ! _instrumentConfig.MeterActive) + { + return; + } if (_transportConfig.ProtocolVersion == ProtocolVersions.V8) await _clrStatsReporterV8.ReportAsync(statsRequest); } diff --git a/src/SkyApm.Transport.Grpc/Common/SegmentV8Helpers.cs b/src/SkyApm.Transport.Grpc/Common/SegmentV8Helpers.cs index 38802faa..889a72f3 100644 --- a/src/SkyApm.Transport.Grpc/Common/SegmentV8Helpers.cs +++ b/src/SkyApm.Transport.Grpc/Common/SegmentV8Helpers.cs @@ -18,13 +18,8 @@ using System; using System.Linq; -using Google.Protobuf; using SkyApm.Common; -using SkyApm.Tracing.Segments; using SkyWalking.NetworkProtocol.V3; -using SegmentReference = SkyWalking.NetworkProtocol.V3.SegmentReference; -using SpanLayer = SkyWalking.NetworkProtocol.V3.SpanLayer; -using SpanType = SkyWalking.NetworkProtocol.V3.SpanType; namespace SkyApm.Transport.Grpc.Common { diff --git a/src/SkyApm.Transport.Grpc/LogReporter.cs b/src/SkyApm.Transport.Grpc/LogReporter.cs new file mode 100644 index 00000000..0eab3316 --- /dev/null +++ b/src/SkyApm.Transport.Grpc/LogReporter.cs @@ -0,0 +1,53 @@ +/* + * Licensed to the SkyAPM under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The SkyAPM licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +using System.Collections.Generic; +using System.Threading; +using System.Threading.Tasks; + +using SkyApm.Config; +using SkyApm.Logging; + +namespace SkyApm.Transport.Grpc +{ + public class LogReporter : ILogReporter + { + private readonly InstrumentConfig _instrumentConfig; + private readonly TransportConfig _transportConfig; + private readonly ILogReporter _logReporter; + + public LogReporter(ConnectionManager connectionManager, ILoggerFactory loggerFactory, + IConfigAccessor configAccessor) + { + _instrumentConfig = configAccessor.Get(); + _transportConfig = configAccessor.Get(); + _logReporter = new V8.LogReporter(connectionManager, loggerFactory, configAccessor); + } + + public async Task ReportAsync(IReadOnlyCollection logRequests, + CancellationToken cancellationToken = default(CancellationToken)) + { + if (_instrumentConfig != null && ! _instrumentConfig.LogActive) + { + return; + } + if (_transportConfig.ProtocolVersion == ProtocolVersions.V8) + await _logReporter.ReportAsync(logRequests); + } + } +} diff --git a/src/SkyApm.Transport.Grpc/SegmentReporter.cs b/src/SkyApm.Transport.Grpc/SegmentReporter.cs index 534c416f..6aff292e 100644 --- a/src/SkyApm.Transport.Grpc/SegmentReporter.cs +++ b/src/SkyApm.Transport.Grpc/SegmentReporter.cs @@ -26,21 +26,27 @@ namespace SkyApm.Transport.Grpc { public class SegmentReporter : ISegmentReporter { - private readonly ISegmentReporter _segmentReporterV8; + private readonly InstrumentConfig _instrumentConfig; private readonly TransportConfig _transportConfig; + private readonly ISegmentReporter _segmentReporterV8; - public SegmentReporter(ConnectionManager connectionManager, IConfigAccessor configAccessor, - ILoggerFactory loggerFactory) + public SegmentReporter(ConnectionManager connectionManager, ILoggerFactory loggerFactory, + IConfigAccessor configAccessor) { + _instrumentConfig = configAccessor.Get(); _transportConfig = configAccessor.Get(); - _segmentReporterV8 = new V8.SegmentReporter(connectionManager, configAccessor, loggerFactory); + _segmentReporterV8 = new V8.SegmentReporter(connectionManager, loggerFactory, configAccessor); } public async Task ReportAsync(IReadOnlyCollection segmentRequests, CancellationToken cancellationToken = default(CancellationToken)) { + if (_instrumentConfig != null && ! _instrumentConfig.SegmentActive) + { + return; + } if (_transportConfig.ProtocolVersion == ProtocolVersions.V8) await _segmentReporterV8.ReportAsync(segmentRequests, cancellationToken); } } -} \ No newline at end of file +} diff --git a/src/SkyApm.Transport.Grpc/V8/CLRStatsReporter.cs b/src/SkyApm.Transport.Grpc/V8/CLRStatsReporter.cs index 3fb62ff3..10cb5c82 100644 --- a/src/SkyApm.Transport.Grpc/V8/CLRStatsReporter.cs +++ b/src/SkyApm.Transport.Grpc/V8/CLRStatsReporter.cs @@ -17,6 +17,7 @@ */ using System; +using System.Runtime.InteropServices; using System.Threading; using System.Threading.Tasks; using SkyApm.Config; @@ -28,20 +29,18 @@ namespace SkyApm.Transport.Grpc.V8 { internal class CLRStatsReporter : ICLRStatsReporter { - private readonly ConnectionManager _connectionManager; private readonly ILogger _logger; - private readonly GrpcConfig _config; - private readonly IRuntimeEnvironment _runtimeEnvironment; + private readonly ConnectionManager _connectionManager; private readonly InstrumentConfig _instrumentConfig; + private readonly GrpcConfig _config; public CLRStatsReporter(ConnectionManager connectionManager, ILoggerFactory loggerFactory, IConfigAccessor configAccessor, IRuntimeEnvironment runtimeEnvironment) { - _connectionManager = connectionManager; _logger = loggerFactory.CreateLogger(typeof(CLRStatsReporter)); - _config = configAccessor.Get(); - _runtimeEnvironment = runtimeEnvironment; + _connectionManager = connectionManager; _instrumentConfig = configAccessor.Get(); + _config = configAccessor.Get(); } public async Task ReportAsync(CLRStatsRequest statsRequest, diff --git a/src/SkyApm.Transport.Grpc/V8/LogReporter.cs b/src/SkyApm.Transport.Grpc/V8/LogReporter.cs index 2ee058ba..77c67270 100644 --- a/src/SkyApm.Transport.Grpc/V8/LogReporter.cs +++ b/src/SkyApm.Transport.Grpc/V8/LogReporter.cs @@ -28,22 +28,22 @@ using System.Threading; using System.Threading.Tasks; -namespace SkyApm.Transport.Grpc +namespace SkyApm.Transport.Grpc.V8 { public class LogReporter : ILogReporter { - private readonly ConnectionManager _connectionManager; private readonly ILogger _logger; - private readonly GrpcConfig _grpcConfig; + private readonly ConnectionManager _connectionManager; private readonly InstrumentConfig _instrumentConfig; + private readonly GrpcConfig _grpcConfig; - public LogReporter(ConnectionManager connectionManager, IConfigAccessor configAccessor, - ILoggerFactory loggerFactory) + public LogReporter(ConnectionManager connectionManager, ILoggerFactory loggerFactory, + IConfigAccessor configAccessor) { + _logger = loggerFactory.CreateLogger(typeof(LogReporter)); _connectionManager = connectionManager; - _grpcConfig = configAccessor.Get(); _instrumentConfig = configAccessor.Get(); - _logger = loggerFactory.CreateLogger(typeof(LogReporter)); + _grpcConfig = configAccessor.Get(); } public async Task ReportAsync(IReadOnlyCollection logRequests, diff --git a/src/SkyApm.Transport.Grpc/V8/SegmentReporter.cs b/src/SkyApm.Transport.Grpc/V8/SegmentReporter.cs index aca4ebdc..4bc34cbe 100644 --- a/src/SkyApm.Transport.Grpc/V8/SegmentReporter.cs +++ b/src/SkyApm.Transport.Grpc/V8/SegmentReporter.cs @@ -32,16 +32,16 @@ namespace SkyApm.Transport.Grpc.V8 { internal class SegmentReporter : ISegmentReporter { - private readonly ConnectionManager _connectionManager; private readonly ILogger _logger; + private readonly ConnectionManager _connectionManager; private readonly GrpcConfig _config; - public SegmentReporter(ConnectionManager connectionManager, IConfigAccessor configAccessor, - ILoggerFactory loggerFactory) + public SegmentReporter(ConnectionManager connectionManager, ILoggerFactory loggerFactory, + IConfigAccessor configAccessor) { + _logger = loggerFactory.CreateLogger(typeof(SegmentReporter)); _connectionManager = connectionManager; _config = configAccessor.Get(); - _logger = loggerFactory.CreateLogger(typeof(SegmentReporter)); } public async Task ReportAsync(IReadOnlyCollection segmentRequests, diff --git a/src/SkyApm.Transport.Kafka/CLRStatsReporter.cs b/src/SkyApm.Transport.Kafka/CLRStatsReporter.cs new file mode 100644 index 00000000..8f348a65 --- /dev/null +++ b/src/SkyApm.Transport.Kafka/CLRStatsReporter.cs @@ -0,0 +1,52 @@ +/* + * Licensed to the SkyAPM under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The SkyAPM licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +using System.Threading; +using System.Threading.Tasks; + +using SkyApm.Config; +using SkyApm.Logging; + +namespace SkyApm.Transport.Kafka +{ + public class CLRStatsReporter : ICLRStatsReporter + { + private readonly InstrumentConfig _instrumentConfig; + private readonly TransportConfig _transportConfig; + private readonly ICLRStatsReporter _clrStatsReporterV8; + + public CLRStatsReporter(ILoggerFactory loggerFactory, + IConfigAccessor configAccessor) + { + _instrumentConfig = configAccessor.Get(); + _transportConfig = configAccessor.Get(); + _clrStatsReporterV8 = new V8.CLRStatsReporter(loggerFactory, configAccessor); + } + + public async Task ReportAsync(CLRStatsRequest statsRequest, + CancellationToken cancellationToken = default(CancellationToken)) + { + if (_instrumentConfig != null && ! _instrumentConfig.MeterActive) + { + return; + } + if (_transportConfig.ProtocolVersion == ProtocolVersions.V8) + await _clrStatsReporterV8.ReportAsync(statsRequest); + } + } +} diff --git a/src/SkyApm.Transport.Kafka/Common/SegmentV8Helpers.cs b/src/SkyApm.Transport.Kafka/Common/SegmentV8Helpers.cs new file mode 100644 index 00000000..16295656 --- /dev/null +++ b/src/SkyApm.Transport.Kafka/Common/SegmentV8Helpers.cs @@ -0,0 +1,115 @@ +/* + * Licensed to the SkyAPM under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The SkyAPM licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +using System; +using System.Linq; + +using SkyApm.Common; +using SkyWalking.NetworkProtocol.V3; + +namespace SkyApm.Transport.Kafka.Common +{ + internal static class SegmentV8Helpers + { + public static SegmentObject Map(SegmentRequest request) + { + var traceSegment = new SegmentObject + { + TraceId = request.TraceId, //todo: is there chances request.UniqueIds.Count > 1 ? + TraceSegmentId = request.Segment.SegmentId, + Service = request.Segment.ServiceId, + ServiceInstance = request.Segment.ServiceInstanceId, + IsSizeLimited = false + }; + + traceSegment.Spans.Add(request.Segment.Spans.Select(MapToSpan).ToArray()); + return traceSegment; + } + + private static SpanObject MapToSpan(SpanRequest request) + { + var spanObject = new SpanObject + { + SpanId = request.SpanId, + ParentSpanId = request.ParentSpanId, + StartTime = request.StartTime, + EndTime = request.EndTime, + SpanType = (SpanType) request.SpanType, + SpanLayer = (SpanLayer) request.SpanLayer, + IsError = request.IsError, + }; + + ReadStringOrIntValue(spanObject, request.Component, ComponentReader, ComponentIdReader); + ReadStringOrIntValue(spanObject, request.OperationName, OperationNameReader, OperationNameIdReader); + ReadStringOrIntValue(spanObject, request.Peer, PeerReader, PeerIdReader); + + spanObject.Tags.Add(request.Tags.Select(x => new KeyStringValuePair {Key = x.Key, Value = x.Value ?? string.Empty})); + spanObject.Refs.AddRange(request.References.Select(MapToSegmentReference).ToArray()); + spanObject.Logs.AddRange(request.Logs.Select(MapToLogMessage).ToArray()); + + return spanObject; + } + + private static SegmentReference MapToSegmentReference(SegmentReferenceRequest referenceRequest) + { + var reference = new SkyWalking.NetworkProtocol.V3.SegmentReference + { + TraceId = referenceRequest.TraceId, + ParentService = referenceRequest.ParentServiceId, + ParentServiceInstance = referenceRequest.ParentServiceInstanceId, + ParentSpanId = referenceRequest.ParentSpanId, + RefType = (RefType) referenceRequest.RefType, + ParentTraceSegmentId = referenceRequest.ParentSegmentId, + ParentEndpoint = referenceRequest.ParentEndpointName.ToString(), + NetworkAddressUsedAtPeer = referenceRequest.NetworkAddress.ToString() + }; + + return reference; + } + + private static Log MapToLogMessage(LogDataRequest request) + { + var logMessage = new Log {Time = request.Timestamp}; + logMessage.Data.AddRange(request.Data.Select(x => new KeyStringValuePair {Key = x.Key, Value = x.Value ?? string.Empty}) + .ToArray()); + return logMessage; + } + + private static void ReadStringOrIntValue(T instance, StringOrIntValue stringOrIntValue, + Action stringValueReader, Action intValueReader) + { + // We should first check and prefer the int value to reduce the network transport payload + // in case both int and string value is present. + if (stringOrIntValue.HasIntValue) + { + intValueReader.Invoke(instance, stringOrIntValue.GetIntValue()); + } + else if (stringOrIntValue.HasStringValue) + { + stringValueReader.Invoke(instance, stringOrIntValue.GetStringValue()); + } + } + + private static readonly Action ComponentReader = (s, val) => { /*nolonger support*/}; + private static readonly Action ComponentIdReader = (s, val) => s.ComponentId = val; + private static readonly Action OperationNameReader = (s, val) => s.OperationName = val; + private static readonly Action OperationNameIdReader = (s, val) => { /*nolonger support*/ }; + private static readonly Action PeerReader = (s, val) => s.Peer = val; + private static readonly Action PeerIdReader = (s, val) => { /*nolonger support*/ }; + } +} diff --git a/src/SkyApm.Transport.Kafka/LogReporter.cs b/src/SkyApm.Transport.Kafka/LogReporter.cs new file mode 100644 index 00000000..d407a8af --- /dev/null +++ b/src/SkyApm.Transport.Kafka/LogReporter.cs @@ -0,0 +1,53 @@ +/* + * Licensed to the SkyAPM under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The SkyAPM licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +using System.Collections.Generic; +using System.Threading; +using System.Threading.Tasks; + +using SkyApm.Config; +using SkyApm.Logging; + +namespace SkyApm.Transport.Kafka +{ + public class LogReporter : ILogReporter + { + private readonly InstrumentConfig _instrumentConfig; + private readonly TransportConfig _transportConfig; + private readonly ILogReporter _logReporter; + + public LogReporter(ILoggerFactory loggerFactory, + IConfigAccessor configAccessor) + { + _instrumentConfig = configAccessor.Get(); + _transportConfig = configAccessor.Get(); + _logReporter = new V8.LogReporter(loggerFactory, configAccessor); + } + + public async Task ReportAsync(IReadOnlyCollection logRequests, + CancellationToken cancellationToken = default(CancellationToken)) + { + if (_instrumentConfig != null && ! _instrumentConfig.LogActive) + { + return; + } + if (_transportConfig.ProtocolVersion == ProtocolVersions.V8) + await _logReporter.ReportAsync(logRequests); + } + } +} diff --git a/src/SkyApm.Transport.Kafka/PingCaller.cs b/src/SkyApm.Transport.Kafka/PingCaller.cs new file mode 100644 index 00000000..dbca34c0 --- /dev/null +++ b/src/SkyApm.Transport.Kafka/PingCaller.cs @@ -0,0 +1,45 @@ +/* + * Licensed to the SkyAPM under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The SkyAPM licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +using System.Threading; +using System.Threading.Tasks; + +using SkyApm.Config; +using SkyApm.Logging; + +namespace SkyApm.Transport.Kafka +{ + public class PingCaller : IPingCaller + { + private readonly TransportConfig _transportConfig; + private readonly IPingCaller _pingCallerV8; + + public PingCaller(ILoggerFactory loggerFactory, + IConfigAccessor configAccessor) + { + _transportConfig = configAccessor.Get(); + _pingCallerV8 = new V8.PingCaller(loggerFactory, configAccessor); + } + + public async Task PingAsync(PingRequest request, CancellationToken cancellationToken = default(CancellationToken)) + { + if (_transportConfig.ProtocolVersion == ProtocolVersions.V8) + await _pingCallerV8.PingAsync(request, cancellationToken); + } + } +} diff --git a/src/SkyApm.Transport.Kafka/SegmentReporter.cs b/src/SkyApm.Transport.Kafka/SegmentReporter.cs new file mode 100644 index 00000000..754230e3 --- /dev/null +++ b/src/SkyApm.Transport.Kafka/SegmentReporter.cs @@ -0,0 +1,53 @@ +/* + * Licensed to the SkyAPM under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The SkyAPM licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +using System.Collections.Generic; +using System.Threading; +using System.Threading.Tasks; + +using SkyApm.Config; +using SkyApm.Logging; + +namespace SkyApm.Transport.Kafka +{ + public class SegmentReporter : ISegmentReporter + { + private readonly InstrumentConfig _instrumentConfig; + private readonly TransportConfig _transportConfig; + private readonly ISegmentReporter _segmentReporterV8; + + public SegmentReporter(ILoggerFactory loggerFactory, + IConfigAccessor configAccessor) + { + _instrumentConfig = configAccessor.Get(); + _transportConfig = configAccessor.Get(); + _segmentReporterV8 = new V8.SegmentReporter(loggerFactory, configAccessor); + } + + public async Task ReportAsync(IReadOnlyCollection segmentRequests, + CancellationToken cancellationToken = default(CancellationToken)) + { + if (_instrumentConfig != null && ! _instrumentConfig.SegmentActive) + { + return; + } + if (_transportConfig.ProtocolVersion == ProtocolVersions.V8) + await _segmentReporterV8.ReportAsync(segmentRequests, cancellationToken); + } + } +} diff --git a/src/SkyApm.Transport.Kafka/ServiceRegister.cs b/src/SkyApm.Transport.Kafka/ServiceRegister.cs new file mode 100644 index 00000000..bae702aa --- /dev/null +++ b/src/SkyApm.Transport.Kafka/ServiceRegister.cs @@ -0,0 +1,47 @@ +/* + * Licensed to the SkyAPM under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The SkyAPM licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +using System.Threading; +using System.Threading.Tasks; + +using SkyApm.Config; +using SkyApm.Logging; + +namespace SkyApm.Transport.Kafka +{ + public class ServiceRegister : IServiceRegister + { + private readonly TransportConfig _transportConfig; + private readonly IServiceRegister _serviceRegisterV8; + + public ServiceRegister(ILoggerFactory loggerFactory, + IConfigAccessor configAccessor) + { + _transportConfig = configAccessor.Get(); + _serviceRegisterV8 = new V8.ServiceRegister(loggerFactory, configAccessor); + } + + public async Task ReportInstancePropertiesAsync(ServiceInstancePropertiesRequest serviceInstancePropertiesRequest, + CancellationToken cancellationToken = default(CancellationToken)) + { + if (_transportConfig.ProtocolVersion == ProtocolVersions.V8) + return await _serviceRegisterV8.ReportInstancePropertiesAsync(serviceInstancePropertiesRequest, cancellationToken); + return true; + } + } +} diff --git a/src/SkyApm.Transport.Kafka/SkyApm.Transport.Kafka.csproj b/src/SkyApm.Transport.Kafka/SkyApm.Transport.Kafka.csproj new file mode 100644 index 00000000..e11dfc21 --- /dev/null +++ b/src/SkyApm.Transport.Kafka/SkyApm.Transport.Kafka.csproj @@ -0,0 +1,25 @@ + + + + + $(Product) kafka data transmitter. + $(PackagePrefix).Transport.Kafka + $(PackagePrefix).Transport.Kafka + $(PackagePrefix).Transport.Kafka + SkyWalking;Kafka + + + netstandard2.0 + SkyApm.Transport.Kafka + + + + + + + + + + + + diff --git a/src/SkyApm.Transport.Kafka/V8/CLRStatsReporter.cs b/src/SkyApm.Transport.Kafka/V8/CLRStatsReporter.cs new file mode 100644 index 00000000..0f503b31 --- /dev/null +++ b/src/SkyApm.Transport.Kafka/V8/CLRStatsReporter.cs @@ -0,0 +1,99 @@ +/* + * Licensed to the SkyAPM under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The SkyAPM licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +using System; +using System.Threading; +using System.Threading.Tasks; + +using Confluent.Kafka; +using Google.Protobuf; +using SkyApm.Config; +using SkyApm.Logging; +using SkyWalking.NetworkProtocol.V3; + +namespace SkyApm.Transport.Kafka.V8 +{ + internal class CLRStatsReporter : ICLRStatsReporter + { + private readonly ILogger _logger; + private readonly InstrumentConfig _instrumentConfig; + private readonly KafkaConfig _config; + private readonly ProducerConfig _producerConfig; + private readonly ProducerBuilder _producerBuilder; + private readonly IProducer _producer; + private readonly string _topic; + + public CLRStatsReporter(ILoggerFactory loggerFactory, + IConfigAccessor configAccessor) + { + _logger = loggerFactory.CreateLogger(typeof(CLRStatsReporter)); + _instrumentConfig = configAccessor.Get(); + _config = configAccessor.Get(); + _producerConfig = new ProducerConfig(); + _producerConfig.BootstrapServers = _config.BootstrapServers; + _producerBuilder = new ProducerBuilder(_producerConfig); + _producer = _producerBuilder.Build(); + _topic = _config.TopicMetrics; + } + + public async Task ReportAsync(CLRStatsRequest statsRequest, + CancellationToken cancellationToken = default(CancellationToken)) + { + // TODO + // check whether _producer is okay + + try + { + var request = new CLRMetricCollection + { + Service = _instrumentConfig.ServiceName, + ServiceInstance = _instrumentConfig.ServiceInstanceName, + }; + var metric = new CLRMetric + { + Cpu = new CPU + { + UsagePercent = statsRequest.CPU.UsagePercent + }, + Gc = new ClrGC + { + Gen0CollectCount = statsRequest.GC.Gen0CollectCount, + Gen1CollectCount = statsRequest.GC.Gen1CollectCount, + Gen2CollectCount = statsRequest.GC.Gen2CollectCount, + HeapMemory = statsRequest.GC.HeapMemory + }, + Thread = new ClrThread + { + AvailableWorkerThreads = statsRequest.Thread.AvailableWorkerThreads, + AvailableCompletionPortThreads = statsRequest.Thread.AvailableCompletionPortThreads, + MaxWorkerThreads = statsRequest.Thread.MaxWorkerThreads, + MaxCompletionPortThreads = statsRequest.Thread.MaxCompletionPortThreads + }, + Time = DateTimeOffset.UtcNow.ToUnixTimeMilliseconds() + }; + request.Metrics.Add(metric); + byte[] byteArray = request.ToByteArray(); + await _producer.ProduceAsync(_topic, new Message { Key = request.ServiceInstance, Value = byteArray }); + } + catch (Exception e) + { + _logger.Warning("Report CLR Stats error. " + e); + } + } + } +} diff --git a/src/SkyApm.Transport.Kafka/V8/LogReporter.cs b/src/SkyApm.Transport.Kafka/V8/LogReporter.cs new file mode 100644 index 00000000..fd76abef --- /dev/null +++ b/src/SkyApm.Transport.Kafka/V8/LogReporter.cs @@ -0,0 +1,116 @@ +/* + * Licensed to the SkyAPM under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The SkyAPM licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +using System; +using System.Collections.Generic; +using System.Diagnostics; +using System.IO; +using System.Threading; +using System.Threading.Tasks; + +using Confluent.Kafka; +using Google.Protobuf; +using SkyApm.Config; +using SkyApm.Logging; +using SkyWalking.NetworkProtocol.V3; + +namespace SkyApm.Transport.Kafka.V8 +{ + public class LogReporter : ILogReporter + { + private readonly ILogger _logger; + private readonly InstrumentConfig _instrumentConfig; + private readonly KafkaConfig _config; + private readonly ProducerConfig _producerConfig; + private readonly ProducerBuilder _producerBuilder; + private readonly IProducer _producer; + private readonly string _topic; + + public LogReporter(ILoggerFactory loggerFactory, + IConfigAccessor configAccessor) + { + _logger = loggerFactory.CreateLogger(typeof(LogReporter)); + _instrumentConfig = configAccessor.Get(); + _config = configAccessor.Get(); + _producerConfig = new ProducerConfig(); + _producerConfig.BootstrapServers = _config.BootstrapServers; + _producerBuilder = new ProducerBuilder(_producerConfig); + _producer = _producerBuilder.Build(); + _topic = _config.TopicLogs; + } + + public async Task ReportAsync(IReadOnlyCollection logRequests, + CancellationToken cancellationToken = default) + { + // TODO + // check whether _producer is okay? + + try + { + var stopwatch = Stopwatch.StartNew(); + foreach (var logRequest in logRequests) + { + var logBody = new LogData() + { + Timestamp = logRequest.Date, + Service = _instrumentConfig.ServiceName, + ServiceInstance = _instrumentConfig.ServiceInstanceName, + Endpoint = logRequest.Endpoint ?? "null", + Body = new LogDataBody() + { + Type = "text", + Text = new TextLog() + { + Text = logRequest.Message, + }, + }, + Tags = new LogTags(), + }; + if (logRequest.SegmentReference != null) + { + logBody.TraceContext = new TraceContext() + { + TraceId = logRequest.SegmentReference.TraceId, + TraceSegmentId = logRequest.SegmentReference.SegmentId, + }; + } + foreach (var tag in logRequest.Tags) + { + logBody.Tags.Data.Add(new KeyStringValuePair() + { + Key = tag.Key, + Value = tag.Value.ToString(), + }); + } + byte[] byteArray = logBody.ToByteArray(); + await _producer.ProduceAsync(_topic, new Message { Key = logBody.Service, Value = byteArray }); + } + stopwatch.Stop(); + _logger.Information($"Report {logRequests.Count} logs. cost: {stopwatch.Elapsed}s"); + } + catch (IOException ex) + { + _logger.Error("Report log fail.", ex); + } + catch (Exception ex) + { + _logger.Error("Report log fail.", ex); + } + } + } +} diff --git a/src/SkyApm.Transport.Kafka/V8/PingCaller.cs b/src/SkyApm.Transport.Kafka/V8/PingCaller.cs new file mode 100644 index 00000000..aec2b551 --- /dev/null +++ b/src/SkyApm.Transport.Kafka/V8/PingCaller.cs @@ -0,0 +1,44 @@ +/* + * Licensed to the SkyAPM under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The SkyAPM licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +using System.Threading; +using System.Threading.Tasks; + +using SkyApm.Config; +using SkyApm.Logging; + +namespace SkyApm.Transport.Kafka.V8 +{ + internal class PingCaller : IPingCaller + { + private readonly ILogger _logger; + private readonly KafkaConfig _config; + + public PingCaller(ILoggerFactory loggerFactory, + IConfigAccessor configAccessor) + { + _logger = loggerFactory.CreateLogger(typeof(PingCaller)); + _config = configAccessor.Get(); + } + + public Task PingAsync(PingRequest request, CancellationToken cancellationToken = default(CancellationToken)) + { + return Task.CompletedTask; + } + } +} diff --git a/src/SkyApm.Transport.Kafka/V8/SegmentReporter.cs b/src/SkyApm.Transport.Kafka/V8/SegmentReporter.cs new file mode 100644 index 00000000..4d984895 --- /dev/null +++ b/src/SkyApm.Transport.Kafka/V8/SegmentReporter.cs @@ -0,0 +1,84 @@ +/* + * Licensed to the SkyAPM under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The SkyAPM licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +using System; +using System.Collections.Generic; +using System.Diagnostics; +using System.Threading; +using System.Threading.Tasks; + +using Confluent.Kafka; +using Google.Protobuf; +using SkyApm.Config; +using SkyApm.Logging; +using SkyApm.Transport.Kafka.Common; +using SkyWalking.NetworkProtocol.V3; + +namespace SkyApm.Transport.Kafka.V8 +{ + internal class SegmentReporter : ISegmentReporter + { + private readonly ILogger _logger; + private readonly InstrumentConfig _instrumentConfig; + private readonly KafkaConfig _config; + private readonly ProducerConfig _producerConfig; + private readonly ProducerBuilder _producerBuilder; + private readonly IProducer _producer; + private readonly string _topic; + + public SegmentReporter(ILoggerFactory loggerFactory, + IConfigAccessor configAccessor) + { + _logger = loggerFactory.CreateLogger(typeof(SegmentReporter)); + _instrumentConfig = configAccessor.Get(); + _config = configAccessor.Get(); + _producerConfig = new ProducerConfig(); + _producerConfig.BootstrapServers = _config.BootstrapServers; + _producerBuilder = new ProducerBuilder(_producerConfig); + _producer = _producerBuilder.Build(); + _topic = _config.TopicSegments; + } + + public async Task ReportAsync(IReadOnlyCollection segmentRequests, + CancellationToken cancellationToken = default(CancellationToken)) + { + // TODO + // check whether producer is okay? + + try + { + var stopwatch = Stopwatch.StartNew(); + { + + foreach (var segment in segmentRequests) + { + SegmentObject segmentObject = SegmentV8Helpers.Map(segment); + byte[] byteArray = segmentObject.ToByteArray(); + await _producer.ProduceAsync(_topic, new Message { Key = segmentObject.TraceSegmentId, Value = byteArray }); + } + } + stopwatch.Stop(); + _logger.Information($"Report {segmentRequests.Count} trace segment. cost: {stopwatch.Elapsed}s"); + } + catch (Exception ex) + { + _logger.Error("Report trace segment fail.", ex); + } + } + } +} diff --git a/src/SkyApm.Transport.Kafka/V8/ServiceRegister.cs b/src/SkyApm.Transport.Kafka/V8/ServiceRegister.cs new file mode 100644 index 00000000..1db637d6 --- /dev/null +++ b/src/SkyApm.Transport.Kafka/V8/ServiceRegister.cs @@ -0,0 +1,50 @@ +/* + * Licensed to the SkyAPM under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The SkyAPM licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +using System.Threading; +using System.Threading.Tasks; + +using SkyApm.Config; +using SkyApm.Logging; + +namespace SkyApm.Transport.Kafka.V8 +{ + internal class ServiceRegister : IServiceRegister + { + private const string OS_NAME = "os_name"; + private const string HOST_NAME = "host_name"; + private const string IPV4 = "ipv4"; + private const string PROCESS_NO = "process_no"; + private const string LANGUAGE = "language"; + + private readonly ILogger _logger; + private readonly KafkaConfig _config; + + public ServiceRegister(ILoggerFactory loggerFactory, + IConfigAccessor configAccessor) + { + _logger = loggerFactory.CreateLogger(typeof(ServiceRegister)); + _config = configAccessor.Get(); + } + + public async Task ReportInstancePropertiesAsync(ServiceInstancePropertiesRequest serviceInstancePropertiesRequest, CancellationToken cancellationToken = default) + { + return true; + } + } +} diff --git a/src/SkyApm.Utilities.Configuration/ConfigurationBuilderExtensions.cs b/src/SkyApm.Utilities.Configuration/ConfigurationBuilderExtensions.cs index bda79c14..73378c32 100644 --- a/src/SkyApm.Utilities.Configuration/ConfigurationBuilderExtensions.cs +++ b/src/SkyApm.Utilities.Configuration/ConfigurationBuilderExtensions.cs @@ -34,33 +34,38 @@ public static IConfigurationBuilder AddSkyWalkingDefaultConfig(this IConfigurati var defaultLogFile = Path.Combine("logs", "skyapm-{Date}.log"); var defaultConfig = new Dictionary { - {"SkyWalking:Namespace", configuration?.GetSection("SkyWalking:Namespace").Value ?? string.Empty }, - {"SkyWalking:ServiceName", configuration?.GetSection("SkyWalking:ServiceName").Value ?? "My_Service" }, - {"Skywalking:ServiceInstanceName", configuration?.GetSection("SkyWalking:ServiceInstanceName").Value ?? BuildDefaultServiceInstanceName() }, - {"SkyWalking:HeaderVersions:0", configuration?.GetSection("SkyWalking:HeaderVersions:0").Value ?? HeaderVersions.SW8 }, - {"SkyWalking:Sampling:SamplePer3Secs", configuration?.GetSection("SkyWalking:Sampling:SamplePer3Secs").Value ?? "-1" }, - {"SkyWalking:Sampling:Percentage", configuration?.GetSection("SkyWalking:Sampling:Percentage").Value ?? "-1" }, - {"SkyWalking:Logging:Level", configuration?.GetSection("SkyWalking:Logging:Level").Value ?? "Information" }, - {"SkyWalking:Logging:FilePath", configuration?.GetSection("SkyWalking:Logging:FilePath").Value ?? defaultLogFile }, - {"SkyWalking:Transport:Interval", configuration?.GetSection("SkyWalking:Transport:Interval").Value ?? "3000" }, - {"SkyWalking:Transport:ProtocolVersion", configuration?.GetSection("SkyWalking:Transport:ProtocolVersion").Value ?? ProtocolVersions.V8 }, - {"SkyWalking:Transport:QueueSize", configuration?.GetSection("SkyWalking:Transport:QueueSize").Value ?? "30000" }, - {"SkyWalking:Transport:BatchSize", configuration?.GetSection("SkyWalking:Transport:BatchSize").Value ?? "3000" }, - {"SkyWalking:Transport:Reporter", configuration?.GetSection("SkyWalking:Transport:Reporter").Value ?? "grpc" }, - {"SkyWalking:Transport:gRPC:Servers",configuration?.GetSection("SkyWalking:Transport:gRPC:Servers").Value ?? "localhost:11800" }, - {"SkyWalking:Transport:gRPC:Timeout",configuration?.GetSection("SkyWalking:Transport:gRPC:Timeout").Value ?? "10000" }, - {"SkyWalking:Transport:gRPC:ReportTimeout",configuration?.GetSection("SkyWalking:Transport:gRPC:ReportTimeout").Value ?? "600000" }, - {"SkyWalking:Transport:gRPC:ConnectTimeout",configuration?.GetSection("SkyWalking:Transport:gRPC:ConnectTimeout").Value ?? "10000" }, - {"SkyWalking:Transport:Kafka:BootstrapServers",configuration?.GetSection("SkyWalking:Transport:Kafka:BootstrapServers").Value ?? "localhost:9092" }, - {"SkyWalking:Transport:Kafka:ProducerConfig",configuration?.GetSection("SkyWalking:Transport:Kafka:ProducerConfig").Value ?? "" }, - {"SkyWalking:Transport:Kafka:GetTopicTimeout",configuration?.GetSection("SkyWalking:Transport:Kafka:GetTopicTimeout").Value ?? "3000" }, - {"SkyWalking:Transport:Kafka:TopicMeters",configuration?.GetSection("SkyWalking:Transport:Kafka:TopicMeters").Value ?? "skywalking-meters" }, - {"SkyWalking:Transport:Kafka:TopicMetrics",configuration?.GetSection("SkyWalking:Transport:Kafka:TopicMetrics").Value ?? "skywalking-metrics" }, - {"SkyWalking:Transport:Kafka:TopicSegments",configuration?.GetSection("SkyWalking:Transport:Kafka:TopicSegments").Value ?? "skywalking-segments" }, - {"SkyWalking:Transport:Kafka:TopicProfilings",configuration?.GetSection("SkyWalking:Transport:Kafka:TopicProfilings").Value ?? "skywalking-profilings" }, - {"SkyWalking:Transport:Kafka:TopicManagements",configuration?.GetSection("SkyWalking:Transport:Kafka:TopicManagements").Value ?? "skywalking-managements" }, - {"SkyWalking:Transport:Kafka:TopicLogs",configuration?.GetSection("SkyWalking:Transport:Kafka:TopicLogs").Value ?? "skywalking-logs" } - + { "SkyWalking:Namespace", configuration?.GetSection("SkyWalking:Namespace").Value ?? string.Empty }, + { "SkyWalking:ServiceName", configuration?.GetSection("SkyWalking:ServiceName").Value ?? "My_Service" }, + { "Skywalking:ServiceInstanceName", configuration?.GetSection("SkyWalking:ServiceInstanceName").Value ?? BuildDefaultServiceInstanceName() }, + { "SkyWalking:HeaderVersions:0", configuration?.GetSection("SkyWalking:HeaderVersions:0").Value ?? HeaderVersions.SW8 }, + { "SkyWalking:Sampling:SamplePer3Secs", configuration?.GetSection("SkyWalking:Sampling:SamplePer3Secs").Value ?? "-1" }, + { "SkyWalking:Sampling:Percentage", configuration?.GetSection("SkyWalking:Sampling:Percentage").Value ?? "-1" }, + { "SkyWalking:Logging:Level", configuration?.GetSection("SkyWalking:Logging:Level").Value ?? "Information" }, + { "SkyWalking:Logging:FilePath", configuration?.GetSection("SkyWalking:Logging:FilePath").Value ?? defaultLogFile }, + { "Skywalking:MeterActive", configuration?.GetSection("SkyWalking:MeterActive").Value ?? "true" }, + { "SkyWalking:MetricActive", configuration?.GetSection("SkyWalking:MetricActive").Value ?? "true" }, + { "SkyWalking:SegmentActive", configuration?.GetSection("SkyWalking:SegmentActive").Value ?? "true" }, + { "SkyWalking:ProfilingActive", configuration?.GetSection("SkyWalking:ProfilingActive").Value ?? "true" }, + { "SkyWalking:ManagementActive", configuration?.GetSection("SkyWalking:ManagementActive").Value ?? "true" }, + { "SkyWalking:LogActive", configuration?.GetSection("SkyWalking:LogActive").Value ?? "true" }, + { "SkyWalking:Transport:Interval", configuration?.GetSection("SkyWalking:Transport:Interval").Value ?? "3000" }, + { "SkyWalking:Transport:ProtocolVersion", configuration?.GetSection("SkyWalking:Transport:ProtocolVersion").Value ?? ProtocolVersions.V8 }, + { "SkyWalking:Transport:QueueSize", configuration?.GetSection("SkyWalking:Transport:QueueSize").Value ?? "30000" }, + { "SkyWalking:Transport:BatchSize", configuration?.GetSection("SkyWalking:Transport:BatchSize").Value ?? "3000" }, + { "SkyWalking:Transport:Reporter", configuration?.GetSection("SkyWalking:Transport:Reporter").Value ?? "grpc" }, + { "SkyWalking:Transport:gRPC:Servers", configuration?.GetSection("SkyWalking:Transport:gRPC:Servers").Value ?? "localhost:11800" }, + { "SkyWalking:Transport:gRPC:Timeout", configuration?.GetSection("SkyWalking:Transport:gRPC:Timeout").Value ?? "10000" }, + { "SkyWalking:Transport:gRPC:ReportTimeout", configuration?.GetSection("SkyWalking:Transport:gRPC:ReportTimeout").Value ?? "600000" }, + { "SkyWalking:Transport:gRPC:ConnectTimeout", configuration?.GetSection("SkyWalking:Transport:gRPC:ConnectTimeout").Value ?? "10000" }, + { "SkyWalking:Transport:Kafka:BootstrapServers", configuration?.GetSection("SkyWalking:Transport:Kafka:BootstrapServers").Value ?? "localhost:9092" }, + { "SkyWalking:Transport:Kafka:ProducerConfig", configuration?.GetSection("SkyWalking:Transport:Kafka:ProducerConfig").Value ?? "" }, + { "SkyWalking:Transport:Kafka:GetTopicTimeout", configuration?.GetSection("SkyWalking:Transport:Kafka:GetTopicTimeout").Value ?? "3000" }, + { "SkyWalking:Transport:Kafka:TopicMeters", configuration?.GetSection("SkyWalking:Transport:Kafka:TopicMeters").Value ?? "skywalking-meters" }, + { "SkyWalking:Transport:Kafka:TopicMetrics", configuration?.GetSection("SkyWalking:Transport:Kafka:TopicMetrics").Value ?? "skywalking-metrics" }, + { "SkyWalking:Transport:Kafka:TopicSegments", configuration?.GetSection("SkyWalking:Transport:Kafka:TopicSegments").Value ?? "skywalking-segments" }, + { "SkyWalking:Transport:Kafka:TopicProfilings", configuration?.GetSection("SkyWalking:Transport:Kafka:TopicProfilings").Value ?? "skywalking-profilings" }, + { "SkyWalking:Transport:Kafka:TopicManagements", configuration?.GetSection("SkyWalking:Transport:Kafka:TopicManagements").Value ?? "skywalking-managements" }, + { "SkyWalking:Transport:Kafka:TopicLogs", configuration?.GetSection("SkyWalking:Transport:Kafka:TopicLogs").Value ?? "skywalking-logs" } }; return builder.AddInMemoryCollection(defaultConfig); }