Skip to content
This repository has been archived by the owner on Aug 2, 2023. It is now read-only.

Introducing System.IO.Pipelines #980

Merged
merged 3 commits into from
Nov 15, 2016
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
198 changes: 129 additions & 69 deletions corefxlab.sln

Large diffs are not rendered by default.

38 changes: 38 additions & 0 deletions samples/System.IO.Pipelines.Samples/AspNetHttpServerSample.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,38 @@
using System.Text;
using System.IO.Pipelines.Samples.Http;
using Microsoft.AspNetCore.Builder;
using Microsoft.AspNetCore.Hosting;

namespace System.IO.Pipelines.Samples
{
public class AspNetHttpServerSample
{
private static readonly UTF8Encoding _utf8Encoding = new UTF8Encoding(false);
private static readonly byte[] _helloWorldPayload = Encoding.UTF8.GetBytes("Hello, World!");

public static void Run()
{
using (var httpServer = new HttpServer())
{
var host = new WebHostBuilder()
.UseUrls("http://*:5000")
.UseServer(httpServer)
// .UseKestrel()
.Configure(app =>
{
app.Run(context =>
{
context.Response.StatusCode = 200;
context.Response.ContentType = "text/plain";
// HACK: Setting the Content-Length header manually avoids the cost of serializing the int to a string.
// This is instead of: httpContext.Response.ContentLength = _helloWorldPayload.Length;
context.Response.Headers["Content-Length"] = "13";
return context.Response.Body.WriteAsync(_helloWorldPayload, 0, _helloWorldPayload.Length);
});
})
.Build();
host.Run();
}
}
}
}
41 changes: 41 additions & 0 deletions samples/System.IO.Pipelines.Samples/CompressionSample.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,41 @@
using System;
using System.IO;
using System.IO.Compression;
using System.IO.Pipelines.Compression;
using System.IO.Pipelines.File;

namespace System.IO.Pipelines.Samples
{
public class CompressionSample
{
public static void Run()
{
using (var factory = new PipelineFactory())
{
var filePath = Path.GetFullPath("Program.cs");

// This is what Stream looks like
//var fs = File.OpenRead(filePath);
//var compressed = new MemoryStream();
//var compressStream = new DeflateStream(compressed, CompressionMode.Compress);
//fs.CopyTo(compressStream);
//compressStream.Flush();
//compressed.Seek(0, SeekOrigin.Begin);

var input = factory.ReadFile(filePath)
.DeflateCompress(factory, CompressionLevel.Optimal)
.DeflateDecompress(factory);

// Wrap the console in a pipeline writer
var output = factory.CreateWriter(Console.OpenStandardOutput());

// Copy from the file reader to the console writer
input.CopyToAsync(output).GetAwaiter().GetResult();

input.Complete();
Copy link

@dazinator dazinator Dec 9, 2016

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What happens if you forget to call Complete() ? Do you think there would be any benefit if IPipelineReader / 'IPipelineReader' implemented IDisposable, and called Complete() in it's dispose?


output.Complete();
}
}
}
}
142 changes: 142 additions & 0 deletions samples/System.IO.Pipelines.Samples/Framing/Codec.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,142 @@
using System;
using System.IO;
using System.Net;
using System.Text;
using System.Text.Formatting;
using System.Threading.Tasks;
using System.IO.Pipelines.Networking.Libuv;
using System.IO.Pipelines.Text.Primitives;

namespace System.IO.Pipelines.Samples.Framing
{
public static class ProtocolHandling
{
public static void Run()
{
var ip = IPAddress.Any;
int port = 5000;
var thread = new UvThread();
var listener = new UvTcpListener(thread, new IPEndPoint(ip, port));
listener.OnConnection(async connection =>
{
var pipelineConnection = MakePipeline(connection);

var decoder = new LineDecoder();
var handler = new LineHandler();

// Initialize the handler with the connection
handler.Initialize(pipelineConnection);

try
{
while (true)
{
// Wait for data
var result = await pipelineConnection.Input.ReadAsync();
var input = result.Buffer;

try
{
if (input.IsEmpty && result.IsCompleted)
{
// No more data
break;
}

Line line;
while (decoder.TryDecode(ref input, out line))
{
await handler.HandleAsync(line);
}

if (!input.IsEmpty && result.IsCompleted)
{
// Didn't get the whole frame and the connection ended
throw new EndOfStreamException();
}
}
finally
{
// Consume the input
pipelineConnection.Input.Advance(input.Start, input.End);
}
}
}
finally
{
// Close the input, which will tell the producer to stop producing
pipelineConnection.Input.Complete();

// Close the output, which will close the connection
pipelineConnection.Output.Complete();
}
});

listener.StartAsync().GetAwaiter().GetResult();

Console.WriteLine($"Listening on {ip} on port {port}");
Console.ReadKey();

listener.Dispose();
thread.Dispose();
}

public static IPipelineConnection MakePipeline(IPipelineConnection connection)
{
// Do something fancy here to wrap the connection, SSL etc
return connection;
}
}

public class Line
{
public string Data { get; set; }
}

public class LineHandler : IFrameHandler<Line>
{
private PipelineTextOutput _textOutput;

public void Initialize(IPipelineConnection connection)
{
_textOutput = new PipelineTextOutput(connection.Output, EncodingData.InvariantUtf8);
}

public Task HandleAsync(Line message)
{
// Echo back to the caller
_textOutput.Append(message.Data);
return _textOutput.FlushAsync();
}
}

public class LineDecoder : IFrameDecoder<Line>
{
public bool TryDecode(ref ReadableBuffer input, out Line frame)
{
ReadableBuffer slice;
ReadCursor cursor;
if (input.TrySliceTo((byte)'\r', (byte)'\n', out slice, out cursor))
{
frame = new Line { Data = slice.GetUtf8String() };
input = input.Slice(cursor).Slice(1);
return true;
}

frame = null;
return false;
}
}

public interface IFrameDecoder<TInput>
{
bool TryDecode(ref ReadableBuffer input, out TInput frame);
}

public interface IFrameHandler<TInput>
{
void Initialize(IPipelineConnection connection);

Task HandleAsync(TInput message);
}
}