Skip to content

Commit

Permalink
Support dynamically generated streams to avoid having to materialize …
Browse files Browse the repository at this point in the history
…the raw payload up-front. Resolves #143
  • Loading branch information
qed- committed May 15, 2024
1 parent b939d00 commit 5264cd4
Show file tree
Hide file tree
Showing 4 changed files with 79 additions and 2 deletions.
19 changes: 19 additions & 0 deletions ClickHouse.Client.Tests/ConnectionTests.cs
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,8 @@
using System.Linq;
using System.Net;
using System.Net.Http;
using System.Text;
using System.Threading;
using System.Threading.Tasks;
using ClickHouse.Client.ADO;
using ClickHouse.Client.Utility;
Expand Down Expand Up @@ -191,5 +193,22 @@ public void ShouldExcludePasswordFromRedactedConnectionString()
Assert.That(conn.RedactedConnectionString, Is.Not.Contains($"Password={MOCK}"));
}

/// <summary>
/// Posts a CSV body that is generated line by line inside the callback overload of
/// PostStreamAsync. The test contains no explicit assertions; it passes when none of
/// the awaited calls throws.
/// </summary>
[Test]
public async Task ShouldPostDynamicallyGeneratedRawStream()
{
    var targetTable = "test.raw_stream";

    // Recreate the target table so the insert below always has somewhere to go.
    await connection.ExecuteStatementAsync($"DROP TABLE IF EXISTS {targetTable}");
    await connection.ExecuteStatementAsync($"CREATE TABLE IF NOT EXISTS {targetTable} (value Int32) ENGINE Null");

    await connection.PostStreamAsync(
        $"INSERT INTO {targetTable} FORMAT CSV",
        async (output, cancellation) =>
        {
            // Emit the values 1..1000, one CSV row per write.
            for (var value = 1; value <= 1000; value++)
            {
                var row = $"{value}\n";
                var encodedRow = Encoding.UTF8.GetBytes(row);
                await output.WriteAsync(encodedRow, cancellation);
            }
        },
        false,
        CancellationToken.None);
}

/// <summary>
/// Collects the ColumnName of every column in <paramref name="table"/>, in the order
/// the columns appear in table.Columns.
/// </summary>
private static string[] GetColumnNames(DataTable table)
{
    var columns = table.Columns;
    var names = new string[columns.Count];
    for (var index = 0; index < names.Length; index++)
    {
        names[index] = columns[index].ColumnName;
    }

    return names;
}
}
25 changes: 23 additions & 2 deletions ClickHouse.Client/ADO/ClickHouseConnection.cs
Original file line number Diff line number Diff line change
Expand Up @@ -277,7 +277,28 @@ public override async Task OpenAsync(CancellationToken cancellationToken)
/// <param name="isCompressed">indicates whether "Content-Encoding: gzip" header should be added</param>
/// <param name="token">Cancellation token</param>
/// <returns>Task representing the asynchronous POST operation</returns>
public async Task PostStreamAsync(string sql, Stream data, bool isCompressed, CancellationToken token)
public Task PostStreamAsync(string sql, Stream data, bool isCompressed, CancellationToken token) =>
    // Wrap the caller's stream in StreamContent and hand off to the HttpContent-based overload.
    PostStreamAsync(sql, new StreamContent(data), isCompressed, token);

/// <summary>
/// Warning: implementation-specific API. Exposed to allow custom optimizations.
/// May change in future versions.
/// Posts a request whose body is written on demand by <paramref name="callback"/>,
/// so the payload does not have to be materialized up-front.
/// </summary>
/// <param name="sql">SQL query to add to URL, may be empty</param>
/// <param name="callback">Callback invoked with the request body stream and <paramref name="token"/>; it writes the payload to that stream. The payload may contain SQL query at the beginning and may be gzip-compressed</param>
/// <param name="isCompressed">indicates whether "Content-Encoding: gzip" header should be added</param>
/// <param name="token">Cancellation token; also forwarded to <paramref name="callback"/></param>
/// <returns>Task representing the asynchronous POST operation</returns>
/// <exception cref="ArgumentNullException"><paramref name="callback"/> is null</exception>
public Task PostStreamAsync(string sql, Func<Stream, CancellationToken, Task> callback, bool isCompressed, CancellationToken token)
{
    // Reject a missing callback here; otherwise the failure would only surface later,
    // when the content is asked to serialize itself.
    if (callback is null)
    {
        throw new ArgumentNullException(nameof(callback));
    }

    var content = new StreamCallbackContent(callback, token);
    return PostStreamAsync(sql, content, isCompressed, token);
}

private async Task PostStreamAsync(string sql, HttpContent content, bool isCompressed, CancellationToken token)
{
using var activity = this.StartActivity("PostStreamAsync");
activity.SetQuery(sql);
Expand All @@ -286,7 +307,7 @@ public async Task PostStreamAsync(string sql, Stream data, bool isCompressed, Ca
using var postMessage = new HttpRequestMessage(HttpMethod.Post, builder.ToString());
AddDefaultHttpHeaders(postMessage.Headers);

postMessage.Content = new StreamContent(data);
postMessage.Content = content;
postMessage.Content.Headers.ContentType = new MediaTypeHeaderValue("application/octet-stream");
if (isCompressed)
{
Expand Down
36 changes: 36 additions & 0 deletions ClickHouse.Client/ADO/StreamCallbackContent.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,36 @@


Check warning on line 1 in ClickHouse.Client/ADO/StreamCallbackContent.cs

View workflow job for this annotation

GitHub Actions / Short

using System;
using System.IO;
using System.Net;
using System.Net.Http;
using System.Threading;
using System.Threading.Tasks;

namespace ClickHouse.Client.ADO;

/// <summary>
/// HttpContent implementation allowing streaming large payloads without having to materialize
/// the entire stream up-front. The body is produced by a caller-supplied callback that writes
/// directly to the stream handed to <see cref="SerializeToStreamAsync"/>.
/// </summary>
internal class StreamCallbackContent : HttpContent
{
    private readonly Func<Stream, CancellationToken, Task> callback;
    private readonly CancellationToken cancellationToken;

    /// <summary>
    /// Creates content whose body is written by <paramref name="callback"/>.
    /// </summary>
    /// <param name="callback">Delegate that writes the payload to the supplied stream</param>
    /// <param name="cancellationToken">Token passed to <paramref name="callback"/> on every invocation</param>
    /// <exception cref="ArgumentNullException"><paramref name="callback"/> is null</exception>
    public StreamCallbackContent(Func<Stream, CancellationToken, Task> callback, CancellationToken cancellationToken)
    {
        // Fail at construction instead of with a NullReferenceException during serialization.
        this.callback = callback ?? throw new ArgumentNullException(nameof(callback));
        this.cancellationToken = cancellationToken;
    }

    /// <summary>
    /// Delegates serialization to the callback, passing the target stream and the stored token.
    /// </summary>
    /// <param name="stream">Stream the payload is written to</param>
    /// <param name="context">Transport information; not used by this implementation</param>
    /// <returns>The task returned by the callback</returns>
    protected override Task SerializeToStreamAsync(Stream stream, TransportContext context)
    {
        return callback(stream, cancellationToken);
    }

    /// <summary>
    /// Always reports that the length is unknown, because the payload is generated while writing.
    /// </summary>
    /// <param name="length">Set to 0; not meaningful since the method returns false</param>
    /// <returns>Always false</returns>
    protected override bool TryComputeLength(out long length)
    {
        length = 0;
        return false;
    }
}
1 change: 1 addition & 0 deletions ClickHouse.Client/ClickHouse.Client.csproj
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@
<IncludeAssets>runtime; build; native; contentfiles; analyzers; buildtransitive</IncludeAssets>
</PackageReference>
<PackageReference Include="System.Net.Http" Version="4.3.4" />
<!-- NOTE(review): no code visible in this change references System.Net.Http.Formatting; confirm this package is actually required before keeping it. -->
<PackageReference Include="System.Net.Http.Formatting" Version="4.0.20126.16343" />
<PackageReference Include="System.Text.Json" Version="8.0.0" />
</ItemGroup>

Expand Down

0 comments on commit 5264cd4

Please sign in to comment.