Using Akka.DotNet.Streams to answer HTTP POST #6246
-
Hello. I was told to implement using Akka.Dotnet a System that receives a HTTP POST request, gives a response to said request and then uses the data to send 2 other POST requests. But 1 of the final endpoints can't receive as many requests as the one I am trying to implement. I thought about using Akka.Streams because I can then create a Source with .TcpStream() Sure, I want 1 input and 1 output, but in the middle of that I would like to be able to use a Broadcast with 3 "Sinks". 1 for each POST request that I want to make and the other one for the POST response. I also saw that a Flow can be created based on a FlowShape and the shape based on a Graph. But that didn't get me anywere. The following code DOES NOT work but it is similar to my attempt. static async Task Main(string[] args)
{
// define an incoming request processing logic
var actorSystem = ActorSystem.Create("AkkaStreams", @"akka.loglevel = DEBUG");
var materializer = actorSystem.Materializer();
Source<Akka.Streams.Dsl.Tcp.IncomingConnection, Task<Akka.Streams.Dsl.Tcp.ServerBinding>> connections =
actorSystem.TcpStream().Bind("127.0.0.1", 8888);
var post_1 = Sink.ForEach<ByteString>(/* send first POST */); // Use Something similar to .Grouped()
var post_2 = Sink.ForEach<ByteString>(/* send second POST */); // Use Something similar to .Grouped()
var response = Flow.Create<ByteString>(/* send HTTP response */);
var http_reciever = Flow.Create<ByteString>()
.Select(c =>
{
String str_value = Encoding.UTF8.GetString(Encoding.Default.GetBytes(c.ToString()));
// str_value.Replace(Environment.NewLine, "\n");
string[] body_helper = str_value.Split(Environment.NewLine + Environment.NewLine);
return ByteString.FromString((body_helper.Length > 1) ? body_helper[1] : "");
});
await connections.RunForeach(connection =>
{
Console.WriteLine($"New connection from: {connection.RemoteAddress}");
Flow<ByteString, ByteString, Akka.NotUsed> fl_shape = Flow.FromGraph(
GraphDsl.Create(b =>
{
var broadcast = b.Add(new Broadcast<ByteString>(3));
var flow1 = b.From(http_reciever).Via(broadcast).Via(response);
var flow2 = b.From(broadcast).Via(post_1);
var flow3 = b.From(broadcast).Via(post_2);
var final_shape = new FlowShape<ByteString, ByteString>(http_reciever, response); // This. Doesn't Compile. I tried something different but this is what I'd like to do
return final_shape; // When this is return would be to run I get a Run Time Error
}));
connection.HandleWith(final_shape, materializer);
}, materializer); Problems I am having:
Thx for reading |
Beta Was this translation helpful? Give feedback.
Replies: 2 comments 4 replies
-
@tiago-andrade-dvs some ideas
I have a sample you might also be interested in looking at: https://github.com/Aaronontheweb/Akka.Streams.Benchmark/blob/dev/src/Akka.Streams.Benchmark/Program.cs |
Beta Was this translation helpful? Give feedback.
-
var actorSystem = ActorSystem.Create("AkkaStreams", @"akka.loglevel = DEBUG");
var materializer = actorSystem.Materializer();
Source<Tcp.IncomingConnection, Task<Tcp.ServerBinding>> connections = actorSystem.TcpStream().Bind("127.0.0.1", 8888);
var (serverBind, source) = connections.PreMaterialize(materializer);
var sourceTask = source.RunForeach(connection =>
{
Console.WriteLine($"New connection from: {connection.RemoteAddress}");
Flow<ByteString, ByteString, Akka.NotUsed> flShape = Flow.FromGraph(
GraphDsl.Create(b =>
{
var httpReceiver = Flow.Create<ByteString>()
.Select(c =>
{
var strValue = c.ToString();
var bodyHelper = strValue.Split(Environment.NewLine + Environment.NewLine);
var result = (bodyHelper.Length > 1) ? bodyHelper[1] : "";
result = string.IsNullOrEmpty(result) ? "empty" : result;
Console.WriteLine($"Received: {result}");
return ByteString.FromString(result);
});
var post1 = Flow.Create<ByteString>()
.SelectAsync(1, async c =>
{
// Dont mind if Flow<ByteString, ByteString, Akka.NotUsed> response is "sent" and this isn't done yet
Console.WriteLine("Starting POST Request 1");
await Task.Delay(1000);
/* etc ...*/
Console.WriteLine("POST Request 1 Done");
return c;
})
.Grouped(3)
.Select(x => x.Aggregate(ByteString.Empty, (s, byteString) => s.Concat(byteString)));
var post2 = Flow.Create<ByteString>()
.SelectAsync(1, async c =>
{
// Dont mind if Flow<ByteString, ByteString, Akka.NotUsed> response is "sent" and this isn't done yet
Console.WriteLine("Starting POST Request 2");
await Task.Delay(1000);
/* etc ...*/
Console.WriteLine("POST Request 2 Done");
return c;
})
.Grouped(3)
.Select(x => x.Aggregate(ByteString.Empty, (s, byteString) => s.Concat(byteString)));
var response = Flow.Create<ByteString>()
.Select(c =>
{
var cont = c.ToString();
var sb = new StringBuilder("HTTP/1.1");
sb.Append(" 200 OK\n");
sb.Append("Content-Type: text/plain\n");
sb.Append("Content-Length: " + cont.Length);
sb.Append("\n\n");
sb.Append(cont);
Console.WriteLine($"Response: {sb}");
return ByteString.FromString(sb.ToString());
});
var httpReceiverFlow = b.Add(httpReceiver);
var receiveBroadcast = b.Add(new Broadcast<ByteString>(3));
var post1Flow = b.Add(post1);
var post2Flow = b.Add(post2);
var responseFlow = b.Add(response);
b.From(httpReceiverFlow).To(receiveBroadcast);
b.From(receiveBroadcast.Out(0)).To(responseFlow);
b.From(receiveBroadcast.Out(1))
.Via(post1Flow)
.To(Sink.ForEach<ByteString>(bs => Console.WriteLine($"post1: {bs}")));
b.From(receiveBroadcast.Out(2))
.Via(post2Flow)
.To(Sink.ForEach<ByteString>(bs => Console.WriteLine($"post2: {bs}")));
var finalShape = new FlowShape<ByteString, ByteString>(httpReceiverFlow.Inlet, responseFlow.Outlet);
return finalShape;
}));
connection.HandleWith(flShape, materializer);
}, materializer);
await serverBind;
await actorSystem.WhenTerminated; |
Beta Was this translation helpful? Give feedback.
@tiago-andrade-dvs