Skip to content

Commit

Permalink
添加响应处理插件,用于记录服务响应的内容;添加插入运行级别,可指定同一类型插件运行的顺序。
Browse files Browse the repository at this point in the history
  • Loading branch information
beetlex-io committed Oct 11, 2019
1 parent 81ad7a1 commit 1085341
Show file tree
Hide file tree
Showing 15 changed files with 360 additions and 123 deletions.
5 changes: 5 additions & 0 deletions Bumblebee.BaseSample/Program.cs
Expand Up @@ -2,6 +2,7 @@
using System.Reflection;
using BeetleX.FastHttpApi;
using Bumblebee.Events;
using Bumblebee.Plugins;
using Bumblebee.Servers;
using Newtonsoft.Json.Linq;

Expand Down Expand Up @@ -39,6 +40,8 @@ public class RequestingTest : Plugins.IRequestingHandler

public string Description => "RequestingTest";

public PluginLevel Level => PluginLevel.None;

public void Execute(EventRequestingArgs e)
{
//e.Gateway.Response(e.Response, new NotFoundResult("Gateway not found!"));
Expand Down Expand Up @@ -68,6 +71,8 @@ public class RequestedTest : Plugins.IRequestedHandler

public string Description => "RequestedTest";

public PluginLevel Level => PluginLevel.None;

public void Execute(EventRequestCompletedArgs e)
{
Console.WriteLine($"{e.Url} request to {e.Server.Uri} user time {e.Time}ms");
Expand Down
8 changes: 1 addition & 7 deletions Bumblebee.ConsoleServer/Bumblebee.ConsoleServer.csproj
Expand Up @@ -6,17 +6,11 @@
</PropertyGroup>

<ItemGroup>
<PackageReference Include="BeetleX.Bumblebee.Configuration" Version="1.1.7" />
<PackageReference Include="Microsoft.Extensions.Hosting" Version="2.2.0" />
</ItemGroup>

<ItemGroup>
<ProjectReference Include="..\Bumblebee\Bumblebee.csproj" />
</ItemGroup>

<ItemGroup>
<None Update="Gateway.json">
<CopyToOutputDirectory>Always</CopyToOutputDirectory>
</None>
<None Update="HttpConfig.json">
<CopyToOutputDirectory>Always</CopyToOutputDirectory>
</None>
Expand Down
21 changes: 0 additions & 21 deletions Bumblebee.ConsoleServer/Gateway.json

This file was deleted.

11 changes: 11 additions & 0 deletions Bumblebee.ConsoleServer/Program.cs
Expand Up @@ -2,6 +2,7 @@
using Microsoft.Extensions.DependencyInjection;
using Microsoft.Extensions.Hosting;
using System;
using System.Diagnostics;
using System.Threading;
using System.Threading.Tasks;

Expand All @@ -28,6 +29,16 @@ public virtual Task StartAsync(CancellationToken cancellationToken)
{
g = new Gateway();
g.Open();
g.LoadPlugin(typeof(Bumblebee.Configuration.ErrorFilter).Assembly);
if (Environment.OSVersion.Platform == PlatformID.Win32NT)
{
var ps = new ProcessStartInfo("http://localhost:9090/__system/bumblebee/")
{
UseShellExecute = true,
Verb = "open"
};
Process.Start(ps);
}
return Task.CompletedTask;
}
public virtual Task StopAsync(CancellationToken cancellationToken)
Expand Down
4 changes: 2 additions & 2 deletions Bumblebee/Bumblebee.csproj
Expand Up @@ -2,7 +2,7 @@

<PropertyGroup>
<TargetFramework>netcoreapp2.1</TargetFramework>
<Version>1.0.2.8</Version>
<Version>1.1.6</Version>
<Authors>henryfan</Authors>
<Company>ikende.com</Company>
<Copyright>Copyright © ikende.com 2019 email:henryfan@msn.com</Copyright>
Expand All @@ -11,7 +11,7 @@
<PackageLicenseUrl>https://github.com/IKende/Bumblebee/blob/master/LICENSE</PackageLicenseUrl>
<PackageProjectUrl>https://github.com/IKende/Bumblebee</PackageProjectUrl>
<PackageId>BeetleX.Bumblebee</PackageId>
<AssemblyVersion>1.0.2.8</AssemblyVersion>
<AssemblyVersion>1.1.6.0</AssemblyVersion>
</PropertyGroup>

<ItemGroup>
Expand Down
16 changes: 9 additions & 7 deletions Bumblebee/Events/EventRequestCompletedArgs.cs
Expand Up @@ -5,12 +5,14 @@

namespace Bumblebee.Events
{
public class EventRequestCompletedArgs : EventArgs
public class EventRequestCompletedArgs
{
public EventRequestCompletedArgs(Routes.UrlRoute urlRoute, HttpRequest request, HttpResponse response, Gateway gateway, int code,
Servers.ServerAgent server, long useTime)
Servers.ServerAgent server, long useTime, long requestid, string error)

{
Error = error;
RequestID = requestid;
Gateway = gateway;
BaseUrl = request.BaseUrl;
Url = request.Url;
Expand All @@ -29,15 +31,15 @@ public class EventRequestCompletedArgs : EventArgs

public string Path { get; set; }

public Gateway Gateway { get; private set; }
public Gateway Gateway { get; set; }

public string RemoteIPAddress { get; private set; }
public string RemoteIPAddress { get; set; }

public IDictionary<string, object> Data { get; private set; }
public IDictionary<string, object> Data { get; set; }

public IDictionary<string, string> Cookies { get; private set; }
public IDictionary<string, string> Cookies { get; set; }

public IDictionary<string, string> Headers { get; private set; }
public IDictionary<string, string> Headers { get; set; }

public string Method { get; set; }

Expand Down
27 changes: 27 additions & 0 deletions Bumblebee/Events/EventRespondingArgs.cs
@@ -0,0 +1,27 @@
using Bumblebee.Servers;
using System;
using System.Collections.Generic;
using System.Text;

namespace Bumblebee.Events
{
public struct EventRespondingArgs
{

public string ResponseStatus { get; set; }

public BeetleX.FastHttpApi.Header Header { get; set; }

public Gateway Gateway { get; set; }

public BeetleX.FastHttpApi.HttpRequest Request { get; set; }

public ArraySegment<byte> Data { get; set; }

public bool Completed { get; set; }

public bool FirstReceive { get; set; }

public ServerAgent Server { get; set; }
}
}
98 changes: 55 additions & 43 deletions Bumblebee/Gateway.cs
Expand Up @@ -40,25 +40,25 @@ public class Gateway : IDisposable

public const int SERVER_PROCESS_ERROR_CODE = 582;

public static int BufferSize { get; set; } = 1024 * 8;

public static int PoolMaxSize { get; set; } = 1024 * 10;

static Gateway()
{

GATEWAY_SERVER_HEDER = Encoding.UTF8.GetBytes("Server: Bumblebee(BeetleX)\r\n");
KEEP_ALIVE = Encoding.UTF8.GetBytes("Connection: keep-alive\r\n");
BufferPool.BUFFER_SIZE = 1024 * 8;
BufferPool.POOL_MAX_SIZE = 1024 * 10;

}

public Gateway()
{
BufferPool.BUFFER_SIZE = BufferSize;
BufferPool.POOL_MAX_SIZE = PoolMaxSize;
HttpServer = new HttpApiServer();
Routes = new Routes.RouteCenter(this);
Agents = new Servers.ServerCenter(this);
this.PluginCenter = new PluginCenter(this);
this.Pluginer = new Pluginer(this, null);
//HttpServer.Options.IOQueueEnabled = true;

Statistics.Server = "Gateway";
AgentMaxSocketError = 3;
MaxStatsUrls = 2000;
Expand All @@ -67,9 +67,6 @@ public Gateway()
ThreadQueues = (Environment.ProcessorCount / 2);
if (ThreadQueues == 0)
ThreadQueues = 1;

AgentBufferSize = 1024 * 8;
AgentBufferPoolSize = 1024 * 200;
GatewayQueueSize = Environment.ProcessorCount * 500;
InstanceID = Guid.NewGuid().ToString("N");
}
Expand All @@ -80,10 +77,6 @@ public Gateway()

public bool OutputServerAddress { get; set; } = false;

public int AgentBufferSize { get; set; }

public int AgentBufferPoolSize { get; set; }

public int AgentMaxConnection { get; set; }

public int AgentMaxSocketError { get; set; }
Expand Down Expand Up @@ -207,7 +200,6 @@ private void OnRequest(object sender, EventHttpRequestArgs e)
EventResponseErrorArgs error = new EventResponseErrorArgs(
e.Request, e.Response, this, "Cluster server unavailable", Gateway.CLUSTER_SERVER_UNAVAILABLE
);
ProcessError(Gateway.CLUSTER_SERVER_UNAVAILABLE, e.Request);
OnResponseError(error);
}
else
Expand All @@ -223,14 +215,12 @@ private void OnRequest(object sender, EventHttpRequestArgs e)
{
Events.EventResponseErrorArgs erea = new Events.EventResponseErrorArgs(
e.Request, e.Response, this, $"The gateway queue overflow!", Gateway.GATEWAY_QUEUE_OVERFLOW);
this.ProcessError(Gateway.GATEWAY_QUEUE_OVERFLOW, e.Request);
this.OnResponseError(erea);
}
else
{
AddRequest(new Tuple<UrlRouteAgent, HttpRequest, HttpResponse>(item, e.Request, e.Response));
}
// item.Execute(e.Request, e.Response);
}
else
{
Expand Down Expand Up @@ -264,12 +254,6 @@ private void OnRequest(object sender, EventHttpRequestArgs e)

}

internal void ProcessError(int code, HttpRequest request)
{
Statistics.Add(code, 1);
Routes.GetUrlStatistics(request.BaseUrl).Add(code, 1, null);
}

public void Response(HttpResponse response, object result)
{
HttpServer.RequestExecuted();
Expand All @@ -282,62 +266,90 @@ internal void OnResponseError(EventResponseErrorArgs e)
{
HttpServer.Log(BeetleX.EventArgs.LogType.Warring, $"Gateway {e.Request.ID} {e.Request.RemoteIPAddress} {e.Request.Method} {e.Request.Url} error {e.Message}");
}

HttpServer.RequestExecuted();
if (Pluginer.RequestedEnabled)
{
EventRequestCompletedArgs se = new EventRequestCompletedArgs(
null,
e.Request,
e.Response,
this,
e.ErrorCode,
null,
1,
e.Request.ID,
e.Message
);
Pluginer.Requested(se);
}
IncrementRequestCompleted(e.Request, e.ErrorCode, 1, null);
this.Pluginer.ResponseError(e);
if (e.Result != null)
{
e.Response.Result(e.Result);
}
}

internal void OnResponding(RequestAgent request, ArraySegment<byte> data, bool completed)
{
if (request.Code == 200)
{
EventRespondingArgs e = new EventRespondingArgs();
e.Completed = completed;
e.FirstReceive = request.BodyReceives == 1;
e.Data = data;
e.Gateway = this;
e.ResponseStatus = request.ResponseStatus;
e.Header = request.ResponseHeader;
e.Request = request.Request;
e.Server = request.Server;
this.Pluginer.Responding(e);
}
}

internal void OnRequestCompleted(Servers.RequestAgent success)
public void IncrementRequestCompleted(HttpRequest request, int code, long time, Servers.ServerAgent server = null)
{
HttpServer.RequestExecuted();
if ((success.Code >= 200 && success.Code < 300) || (success.Code >= 500 && success.Code < 600))
if ((code >= 200 && code < 400) || (code >= 500 && code < 600))
{
var stats = Routes.GetUrlStatistics(success.Request.BaseUrl);
stats.Add(success.Code, success.Time, success.Server);

var stats = Routes.GetUrlStatistics(request.BaseUrl);
stats.Add(code, time, server);
}
else
{
if (Routes.UrlStatisticsCount < this.MaxStatsUrls)
if (Routes.UrlStatisticsCount < this.MaxStatsUrls && code != 404)
{
var stats = Routes.GetUrlStatistics(success.Request.BaseUrl);
stats.Add(success.Code, success.Time, success.Server);
var stats = Routes.GetUrlStatistics(request.BaseUrl);
stats.Add(code, time, server);
}

}
Statistics.Add(success.Code, success.Time);
Pluginer.Requested(success);
Statistics.Add(code, time);
}

internal void OnRequestCompleted(Servers.RequestAgent success)
{
IncrementRequestCompleted(success.Request, success.Code, success.Time, success.Server);
if (Pluginer.RequestedEnabled)
Pluginer.Requested(success.GetEventRequestCompletedArgs());
}

public void Open()
{
BufferPool.BUFFER_SIZE = AgentBufferSize;
BufferPool.POOL_MAX_SIZE = AgentBufferPoolSize;
BufferPool.BUFFER_SIZE = BufferSize;
BufferPool.POOL_MAX_SIZE = PoolMaxSize;
HttpServer[GATEWAY_TAG] = this;
HttpServer.ModuleManager.AssemblyLoding += (o, e) =>
{
LoadPlugin(e.Assembly);
};
if (HttpServer.Options.CacheLogMaxSize < 1000)
HttpServer.Options.CacheLogMaxSize = 1000;
if (HttpServer.Options.BufferPoolMaxMemory < 1024)
HttpServer.Options.BufferPoolMaxMemory = 1024;
if (HttpServer.Options.BufferSize < 1024 * 8)
HttpServer.Options.BufferSize = 1024 * 8;
mIOQueue = new BeetleX.Dispatchs.DispatchCenter<Tuple<UrlRouteAgent, HttpRequest, HttpResponse>>(OnRouteExecute,
ThreadQueues);
HttpServer.Options.UrlIgnoreCase = false;
// HttpServer.Options.IOQueueEnabled = true;
HttpServer.Open();
HttpServer.HttpRequesting += OnRequest;
LoadConfig();
//GatewayController controller = new GatewayController(this);
//HttpServer.ActionFactory.Register(controller);
PluginCenter.Load(typeof(Gateway).Assembly);
HttpServer.Log(BeetleX.EventArgs.LogType.Info, $"Gateway server started [v:{this.GetType().Assembly.GetName().Version}]");
mVerifyTimer = new Timer(OnVerifyTimer, null, 1000, 1000);
Expand Down

0 comments on commit 1085341

Please sign in to comment.