Skip to content

Commit

Permalink
add kafka reporter part 2
Browse files Browse the repository at this point in the history
  • Loading branch information
lu-xiaoshuang authored and Xiaoshuang LU committed Jul 3, 2023
1 parent 728c045 commit c8a26fa
Show file tree
Hide file tree
Showing 31 changed files with 1,081 additions and 135 deletions.
2 changes: 1 addition & 1 deletion build/version.props
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
<Project>
<PropertyGroup>
<VersionMajor>2</VersionMajor>
<VersionMinor>1</VersionMinor>
<VersionMinor>2</VersionMinor>
<VersionPatch>0</VersionPatch>
<VersionQuality></VersionQuality>
<VersionPrefix>$(VersionMajor).$(VersionMinor).$(VersionPatch)</VersionPrefix>
Expand Down
72 changes: 45 additions & 27 deletions sample/grpc/SkyApm.Sample.GrpcServer/skyapm.json
Original file line number Diff line number Diff line change
@@ -1,29 +1,47 @@
{
"SkyWalking": {
"ServiceName": "grpc-greeter-server",
"Namespace": "",
"HeaderVersions": [
"sw8"
],
"Sampling": {
"SamplePer3Secs": -1,
"Percentage": -1.0
},
"Logging": {
"Level": "Information",
"FilePath": "logs/skyapm-{Date}.log"
},
"Transport": {
"Interval": 3000,
"ProtocolVersion": "v8",
"QueueSize": 30000,
"BatchSize": 3000,
"gRPC": {
"Servers": "localhost:11800",
"Timeout": 100000,
"ConnectTimeout": 100000,
"ReportTimeout": 600000
}
"SkyWalking": {
"ServiceName": "grpc-greeter-server",
"Namespace": "",
"HeaderVersions": [
"sw8"
],
"Sampling": {
"SamplePer3Secs": -1,
"Percentage": -1.0
},
"Logging": {
"Level": "Information",
"FilePath": "logs/skyapm-{Date}.log"
},
"MeterActive": true,
"MetricActive": true,
"SegmentActive": true,
"ProfilingActive": true,
"ManagementActive": true,
"LogActive": true,
"Transport": {
"Interval": 3000,
"ProtocolVersion": "v8",
"QueueSize": 30000,
"BatchSize": 3000,
"Reporter": "kafka",
"gRPC": {
"Servers": "localhost:11800",
"Timeout": 100000,
"ConnectTimeout": 100000,
"ReportTimeout": 600000
},
"Kafka": {
"BootstrapServers": "localhost:19091,localhost:19092,localhost:19093",
"ProducerConfig": "",
"GetTopicTimeout": 3000,
"TopicMeters": "skywalking-meters",
"TopicMetrics": "skywalking-metrics",
"TopicSegments": "skywalking-segments",
"TopicProfilings": "skywalking-profilings",
"TopicManagements": "skywalking-managements",
"TopicLogs": "skywalking-logs"
}
}
}
}
}
}
7 changes: 7 additions & 0 deletions skyapm-dotnet.sln
Original file line number Diff line number Diff line change
Expand Up @@ -125,6 +125,8 @@ Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "SkyApm.PeerFormatters.MySql
EndProject
Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "SkyApm.Diagnostics.FreeRedis", "src\SkyApm.Diagnostics.FreeRedis\SkyApm.Diagnostics.FreeRedis.csproj", "{829F7955-3138-4CA2-8AA3-3E4D72C97599}"
EndProject
Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "SkyApm.Transport.Kafka", "src\SkyApm.Transport.Kafka\SkyApm.Transport.Kafka.csproj", "{8BCBAFD0-2C88-8886-5ECF-82D8A88888CB}"
EndProject
Global
GlobalSection(SolutionConfigurationPlatforms) = preSolution
Debug|Any CPU = Debug|Any CPU
Expand Down Expand Up @@ -287,6 +289,10 @@ Global
{829F7955-3138-4CA2-8AA3-3E4D72C97599}.Debug|Any CPU.Build.0 = Debug|Any CPU
{829F7955-3138-4CA2-8AA3-3E4D72C97599}.Release|Any CPU.ActiveCfg = Release|Any CPU
{829F7955-3138-4CA2-8AA3-3E4D72C97599}.Release|Any CPU.Build.0 = Release|Any CPU
{8BCBAFD0-2C88-8886-5ECF-82D8A88888CB}.Debug|Any CPU.ActiveCfg = Debug|Any CPU
{8BCBAFD0-2C88-8886-5ECF-82D8A88888CB}.Debug|Any CPU.Build.0 = Debug|Any CPU
{8BCBAFD0-2C88-8886-5ECF-82D8A88888CB}.Release|Any CPU.ActiveCfg = Release|Any CPU
{8BCBAFD0-2C88-8886-5ECF-82D8A88888CB}.Release|Any CPU.Build.0 = Release|Any CPU
EndGlobalSection
GlobalSection(SolutionProperties) = preSolution
HideSolutionNode = FALSE
Expand Down Expand Up @@ -341,6 +347,7 @@ Global
{5DBE2053-EBAE-404F-A7B3-9CE3CD2152D9} = {D122E6AE-6FE7-4C1A-826F-5964ABBF2C9D}
{2A313B7E-CC41-4556-8055-D87266C398BF} = {D122E6AE-6FE7-4C1A-826F-5964ABBF2C9D}
{829F7955-3138-4CA2-8AA3-3E4D72C97599} = {B5E677CF-2920-4B0A-A056-E73F6B2CF2BC}
{8BCBAFD0-2C88-8886-5ECF-82D8A88888CB} = {CBFF7EE0-69D7-4D6A-9BBD-8E567FF4D810}
EndGlobalSection
GlobalSection(ExtensibilityGlobals) = postSolution
SolutionGuid = {94C0DA2C-CCCB-4314-93A2-9809B5DD0583}
Expand Down
15 changes: 13 additions & 2 deletions src/SkyApm.Abstractions/Config/InstrumentConfig.cs
Original file line number Diff line number Diff line change
Expand Up @@ -32,12 +32,23 @@ public class InstrumentConfig

public string ServiceInstanceName { get; set; }


public string[] HeaderVersions { get; set; }

public bool MeterActive { get; set; } = true;

public bool MetricActive { get; set; } = true;

public bool SegmentActive { get; set; } = true;

public bool ProfilingActive { get; set; } = true;

public bool ManagementActive { get; set; } = true;

public bool LogActive { get; set; } = true;
}

public static class HeaderVersions
{
public static string SW8 { get; } = "sw8";
}
}
}
2 changes: 1 addition & 1 deletion src/SkyApm.Abstractions/Logging/ILogger.cs
Original file line number Diff line number Diff line change
Expand Up @@ -32,4 +32,4 @@ public interface ILogger

void Trace(string message);
}
}
}
78 changes: 67 additions & 11 deletions src/SkyApm.Agent.Hosting/Extensions/ServiceCollectionExtensions.cs
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,6 @@
using SkyApm.Service;
using SkyApm.Tracing;
using SkyApm.Transport;
using SkyApm.Transport.Grpc;
using SkyApm.Utilities.Configuration;
using SkyApm.Utilities.DependencyInjection;
using SkyApm.Utilities.Logging;
Expand Down Expand Up @@ -72,7 +71,10 @@ private static IServiceCollection AddSkyAPMCore(this IServiceCollection services
services.AddSingleton<IEnvironmentProvider, HostingEnvironmentProvider>();
services.AddSingleton<ISkyApmLogDispatcher, AsyncQueueSkyApmLogDispatcher>();
services.AddSingleton<IPeerFormatter, PeerFormatter>();
services.AddTracing().AddSampling().AddGrpcTransport().AddSkyApmLogging();
services.AddTracing();
services.AddSampling();
services.AddTransport();
services.AddSkyApmLogging();
var extensions = services.AddSkyApmExtensions()
.AddHttpClient()
.AddGrpcClient()
Expand Down Expand Up @@ -115,15 +117,70 @@ private static IServiceCollection AddSampling(this IServiceCollection services)
return services;
}

private static IServiceCollection AddGrpcTransport(this IServiceCollection services)
private static IServiceCollection AddTransport(this IServiceCollection services)
{
services.AddSingleton<ISegmentReporter, SegmentReporter>();
services.AddSingleton<ILogReporter, LogReporter>();
services.AddSingleton<ICLRStatsReporter, CLRStatsReporter>();
services.AddSingleton<ConnectionManager>();
services.AddSingleton<IPingCaller, PingCaller>();
services.AddSingleton<IServiceRegister, ServiceRegister>();
services.AddSingleton<IExecutionService, ConnectService>();
// TODO
// HELP ME
// how to get `reporter' during dependency injection?
string reporter = "grpc";
while (true)
{
//ServiceProvider serviceProvider = services.BuildServiceProvider();
//if (serviceProvider == null)
//{
// break;
//}
//IConfigAccessor configAccessor = serviceProvider.GetService<IConfigAccessor>();
//if (configAccessor == null)
//{
// break;
//}
//TransportConfig transportConfig = configAccessor.Get<TransportConfig>();
//if (transportConfig == null)
//{
// break;
//}
//reporter = transportConfig.Reporter;
//if (reporter == null)
//{
// reporter = "";
//}
break;
}
switch (reporter.ToLower())
{
case "grpc":
services.AddTransportGrpc();
break;
case "kafka":
services.AddTransportKafka();
break;
default:
services.AddTransportGrpc();
break;
}
return services;
}

private static IServiceCollection AddTransportGrpc(this IServiceCollection services)
{
services.AddSingleton<ISegmentReporter, SkyApm.Transport.Grpc.SegmentReporter>();
services.AddSingleton<ILogReporter, SkyApm.Transport.Grpc.LogReporter>();
services.AddSingleton<ICLRStatsReporter, SkyApm.Transport.Grpc.CLRStatsReporter>();
services.AddSingleton<SkyApm.Transport.Grpc.ConnectionManager>();
services.AddSingleton<IPingCaller, SkyApm.Transport.Grpc.PingCaller>();
services.AddSingleton<IServiceRegister, SkyApm.Transport.Grpc.ServiceRegister>();
services.AddSingleton<IExecutionService, SkyApm.Transport.Grpc.ConnectService>();
return services;
}

private static IServiceCollection AddTransportKafka(this IServiceCollection services)
{
services.AddSingleton<ISegmentReporter, SkyApm.Transport.Kafka.SegmentReporter>();
services.AddSingleton<ILogReporter, SkyApm.Transport.Kafka.LogReporter>();
services.AddSingleton<ICLRStatsReporter, SkyApm.Transport.Kafka.CLRStatsReporter>();
services.AddSingleton<IPingCaller, SkyApm.Transport.Kafka.PingCaller>();
services.AddSingleton<IServiceRegister, SkyApm.Transport.Kafka.ServiceRegister>();
return services;
}

Expand All @@ -132,6 +189,5 @@ private static IServiceCollection AddSkyApmLogging(this IServiceCollection servi
services.AddSingleton<ILoggerFactory, DefaultLoggerFactory>();
return services;
}

}
}
1 change: 1 addition & 0 deletions src/SkyApm.Agent.Hosting/SkyApm.Agent.Hosting.csproj
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,7 @@
<ProjectReference Include="..\SkyApm.PeerFormatters.MySqlConnector\SkyApm.PeerFormatters.MySqlConnector.csproj" />
<ProjectReference Include="..\SkyApm.PeerFormatters.SqlClient\SkyApm.PeerFormatters.SqlClient.csproj" />
<ProjectReference Include="..\SkyApm.Transport.Grpc\SkyApm.Transport.Grpc.csproj" />
<ProjectReference Include="..\SkyApm.Transport.Kafka\SkyApm.Transport.Kafka.csproj" />
<ProjectReference Include="..\SkyApm.Utilities.Configuration\SkyApm.Utilities.Configuration.csproj" />
<ProjectReference Include="..\SkyApm.Utilities.DependencyInjection\SkyApm.Utilities.DependencyInjection.csproj" />
<ProjectReference Include="..\SkyApm.Utilities.Logging\SkyApm.Utilities.Logging.csproj" />
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,19 +36,19 @@ public GrpcNetClientICarrierHeaderCollection(HttpRequestMessage request)
public void Add(string key, string value)
{
_request.Headers.Add(key, value);
}

public string Get(string key)
{
if (_request.Headers.TryGetValues(key, out var values))
return values.FirstOrDefault();
return null;
}

}

public string Get(string key)
{
if (_request.Headers.TryGetValues(key, out var values))
return values.FirstOrDefault();
return null;
}

public IEnumerator<KeyValuePair<string, string>> GetEnumerator()
{
return _request.Headers.Select(x =>
new KeyValuePair<string, string>(x.Key, x.Value.FirstOrDefault()))
{
return _request.Headers.Select(x =>
new KeyValuePair<string, string>(x.Key, x.Value.FirstOrDefault()))
.GetEnumerator();
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@
using Grpc.Core;
using Grpc.Core.Interceptors;
using SkyApm.Common;
using SkyApm.Config;
using SkyApm.Config;
using SkyApm.Tracing;
using SkyApm.Tracing.Segments;
using System;
Expand All @@ -40,7 +40,7 @@ public class ClientDiagnosticProcessor
_tracingConfig = configAccessor.Get<TracingConfig>();
}

public Metadata BeginRequest<TRequest, TResponse>(ClientInterceptorContext<TRequest, TResponse> grpcContext)
public Metadata BeginRequest<TRequest, TResponse>(ClientInterceptorContext<TRequest, TResponse> grpcContext)
where TRequest : class
where TResponse : class
{
Expand Down
24 changes: 12 additions & 12 deletions src/SkyApm.Diagnostics.HttpClient/HttpClientDiagnosticProcessor.cs
Original file line number Diff line number Diff line change
Expand Up @@ -21,9 +21,9 @@
using System.Linq;
using System.Net.Http;
using SkyApm.Common;
using SkyApm.Config;
using SkyApm.Diagnostics.HttpClient.Config;
using SkyApm.Diagnostics.HttpClient.Extensions;
using SkyApm.Config;
using SkyApm.Diagnostics.HttpClient.Config;
using SkyApm.Diagnostics.HttpClient.Extensions;
using SkyApm.Diagnostics.HttpClient.Filters;
using SkyApm.Tracing;

Expand All @@ -35,7 +35,7 @@ public class HttpClientTracingDiagnosticProcessor : ITracingDiagnosticProcessor

//private readonly IContextCarrierFactory _contextCarrierFactory;
private readonly ITracingContext _tracingContext;


private readonly IExitSegmentContextAccessor _contextAccessor;

private readonly IEnumerable<IRequestDiagnosticHandler> _requestDiagnosticHandlers;
Expand All @@ -45,7 +45,7 @@ public class HttpClientTracingDiagnosticProcessor : ITracingDiagnosticProcessor
private readonly HttpClientDiagnosticConfig _httpClientDiagnosticConfig;

public HttpClientTracingDiagnosticProcessor(ITracingContext tracingContext,
IExitSegmentContextAccessor contextAccessor,
IExitSegmentContextAccessor contextAccessor,
IEnumerable<IRequestDiagnosticHandler> requestDiagnosticHandlers,
IConfigAccessor configAccessor)
{
Expand Down Expand Up @@ -88,13 +88,13 @@ public void HttpResponse([Property(Name = "Response")] HttpResponseMessage respo

context.Span.AddTag(Tags.STATUS_CODE, statusCode);

if(response.Content != null && _httpClientDiagnosticConfig.CollectResponseBodyContentTypes?.Count > 0)
{
var responseBody = response.Content.TryCollectAsString(
_httpClientDiagnosticConfig.CollectResponseBodyContentTypes,
_httpClientDiagnosticConfig.CollectBodyLengthThreshold);
if (!string.IsNullOrEmpty(responseBody))
context.Span.AddTag(Tags.HTTP_RESPONSE_BODY, responseBody);
if(response.Content != null && _httpClientDiagnosticConfig.CollectResponseBodyContentTypes?.Count > 0)
{
var responseBody = response.Content.TryCollectAsString(
_httpClientDiagnosticConfig.CollectResponseBodyContentTypes,
_httpClientDiagnosticConfig.CollectBodyLengthThreshold);
if (!string.IsNullOrEmpty(responseBody))
context.Span.AddTag(Tags.HTTP_RESPONSE_BODY, responseBody);
}
}

Expand Down

0 comments on commit c8a26fa

Please sign in to comment.