From 5264cd433b4e229ce498742f9d935d878d488980 Mon Sep 17 00:00:00 2001 From: David Chapman Date: Wed, 15 May 2024 13:20:35 +0100 Subject: [PATCH] Support dynamically generated streams to avoid having to materialize the raw payload up-front. Resolves #143 --- ClickHouse.Client.Tests/ConnectionTests.cs | 19 ++++++++++ ClickHouse.Client/ADO/ClickHouseConnection.cs | 25 +++++++++++-- .../ADO/StreamCallbackContent.cs | 36 +++++++++++++++++++ ClickHouse.Client/ClickHouse.Client.csproj | 1 + 4 files changed, 79 insertions(+), 2 deletions(-) create mode 100644 ClickHouse.Client/ADO/StreamCallbackContent.cs diff --git a/ClickHouse.Client.Tests/ConnectionTests.cs b/ClickHouse.Client.Tests/ConnectionTests.cs index f3e1930..85a0eb5 100644 --- a/ClickHouse.Client.Tests/ConnectionTests.cs +++ b/ClickHouse.Client.Tests/ConnectionTests.cs @@ -3,6 +3,8 @@ using System.Linq; using System.Net; using System.Net.Http; +using System.Text; +using System.Threading; using System.Threading.Tasks; using ClickHouse.Client.ADO; using ClickHouse.Client.Utility; @@ -191,5 +193,22 @@ public void ShouldExcludePasswordFromRedactedConnectionString() Assert.That(conn.RedactedConnectionString, Is.Not.Contains($"Password={MOCK}")); } + [Test] + public async Task ShouldPostDynamicallyGeneratedRawStream() + { + var targetTable = "test.raw_stream"; + + await connection.ExecuteStatementAsync($"DROP TABLE IF EXISTS {targetTable}"); + await connection.ExecuteStatementAsync($"CREATE TABLE IF NOT EXISTS {targetTable} (value Int32) ENGINE Null"); + await connection.PostStreamAsync($"INSERT INTO {targetTable} FORMAT CSV", async (stream, ct) => { + + foreach (var i in Enumerable.Range(1, 1000)) + { + var line = $"{i}\n"; + await stream.WriteAsync(Encoding.UTF8.GetBytes(line), ct); + } + }, false, CancellationToken.None); + } + private static string[] GetColumnNames(DataTable table) => table.Columns.Cast().Select(dc => dc.ColumnName).ToArray(); } diff --git a/ClickHouse.Client/ADO/ClickHouseConnection.cs b/ClickHouse.Client/ADO/ClickHouseConnection.cs index fd0e30c..b49defd 100644 --- a/ClickHouse.Client/ADO/ClickHouseConnection.cs +++ b/ClickHouse.Client/ADO/ClickHouseConnection.cs @@ -277,7 +277,28 @@ public override async Task OpenAsync(CancellationToken cancellationToken) /// indicates whether "Content-Encoding: gzip" header should be added /// Cancellation token /// Task-wrapped HttpResponseMessage object - public async Task PostStreamAsync(string sql, Stream data, bool isCompressed, CancellationToken token) + public Task PostStreamAsync(string sql, Stream data, bool isCompressed, CancellationToken token) + { + var content = new StreamContent(data); + return PostStreamAsync(sql, content, isCompressed, token); + } + + /// + /// Warning: implementation-specific API. Exposed to allow custom optimizations + /// May change in future versions + /// + /// SQL query to add to URL, may be empty + /// Callback invoked to write to the stream. May contain SQL query at the beginning. May be gzip-compressed + /// indicates whether "Content-Encoding: gzip" header should be added + /// Cancellation token + /// Task-wrapped HttpResponseMessage object + public Task PostStreamAsync(string sql, Func callback, bool isCompressed, CancellationToken token) + { + var content = new StreamCallbackContent(callback, token); + return PostStreamAsync(sql, content, isCompressed, token); + } + + private async Task PostStreamAsync(string sql, HttpContent content, bool isCompressed, CancellationToken token) { using var activity = this.StartActivity("PostStreamAsync"); activity.SetQuery(sql); @@ -286,7 +307,7 @@ public async Task PostStreamAsync(string sql, Stream data, bool isCompressed, Ca using var postMessage = new HttpRequestMessage(HttpMethod.Post, builder.ToString()); AddDefaultHttpHeaders(postMessage.Headers); - postMessage.Content = new StreamContent(data); + postMessage.Content = content; postMessage.Content.Headers.ContentType = new MediaTypeHeaderValue("application/octet-stream"); if (isCompressed) { diff --git a/ClickHouse.Client/ADO/StreamCallbackContent.cs b/ClickHouse.Client/ADO/StreamCallbackContent.cs new file mode 100644 index 0000000..15d6dd1 --- /dev/null +++ b/ClickHouse.Client/ADO/StreamCallbackContent.cs @@ -0,0 +1,36 @@ + +using System; +using System.IO; +using System.Net; +using System.Net.Http; +using System.Threading; +using System.Threading.Tasks; + +namespace ClickHouse.Client.ADO; + +/// +/// HttpContent implementation allowing streaming large payloads without having to materialize +/// the entire stream up-front. +/// +internal class StreamCallbackContent : HttpContent +{ + private readonly Func callback; + private readonly CancellationToken cancellationToken; + + public StreamCallbackContent(Func callback, CancellationToken cancellationToken) + { + this.callback = callback; + this.cancellationToken = cancellationToken; + } + + protected override Task SerializeToStreamAsync(Stream stream, TransportContext context) + { + return callback(stream, cancellationToken); + } + + protected override bool TryComputeLength(out long length) + { + length = 0; + return false; + } +} diff --git a/ClickHouse.Client/ClickHouse.Client.csproj b/ClickHouse.Client/ClickHouse.Client.csproj index 83646fb..03bc7ad 100644 --- a/ClickHouse.Client/ClickHouse.Client.csproj +++ b/ClickHouse.Client/ClickHouse.Client.csproj @@ -32,6 +32,7 @@ runtime; build; native; contentfiles; analyzers; buildtransitive +