-
Notifications
You must be signed in to change notification settings - Fork 4.5k
/
StreamingResponse.cs
94 lines (83 loc) · 3.37 KB
/
StreamingResponse.cs
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
// Copyright (c) Microsoft Corporation. All rights reserved.
// Licensed under the MIT License.
using System;
using System.Collections.Generic;
using System.Threading;
namespace Azure.AI.OpenAI;
/// <summary>
/// Represents an operation response with streaming content that can be deserialized and enumerated while the response
/// is still being received.
/// </summary>
/// <typeparam name="T"> The data type representative of distinct, streamable items. </typeparam>
/// <remarks>
/// This type wraps a <see cref="Response"/> and disposes it when <see cref="Dispose()"/> is called.
/// </remarks>
public class StreamingResponse<T>
    : IDisposable
    , IAsyncEnumerable<T>
{
    // The response handed to the factory method; disposed together with this instance.
    private readonly Response _rawResponse;

    // The enumerable produced by the caller-supplied processor; all enumeration is delegated to it.
    private readonly IAsyncEnumerable<T> _asyncEnumerableSource;

    // Guards against disposing the underlying response more than once.
    private bool _disposedValue;

    private StreamingResponse() { }

    private StreamingResponse(
        Response rawResponse,
        Func<Response, IAsyncEnumerable<T>> asyncEnumerableProcessor)
    {
        // Fail with a descriptive argument exception rather than a NullReferenceException from Invoke below.
        if (asyncEnumerableProcessor is null)
        {
            throw new ArgumentNullException(nameof(asyncEnumerableProcessor));
        }

        _rawResponse = rawResponse;

        // The processor is invoked exactly once, at construction time, to obtain the enumerable source.
        _asyncEnumerableSource = asyncEnumerableProcessor.Invoke(rawResponse);
    }

    /// <summary>
    /// Creates a new instance of <see cref="StreamingResponse{T}"/> using the provided underlying HTTP response. The
    /// provided function will be used to resolve the response into an asynchronous enumeration of streamed response
    /// items.
    /// </summary>
    /// <param name="response">The HTTP response.</param>
    /// <param name="asyncEnumerableProcessor">
    /// The function that will resolve the provided response into an IAsyncEnumerable. It is invoked once, during
    /// this call.
    /// </param>
    /// <returns>
    /// A new instance of <see cref="StreamingResponse{T}"/> that will be capable of asynchronous enumeration of
    /// <typeparamref name="T"/> items from the HTTP response.
    /// </returns>
    /// <exception cref="ArgumentNullException">
    /// <paramref name="asyncEnumerableProcessor"/> is <c>null</c>.
    /// </exception>
    public static StreamingResponse<T> CreateFromResponse(
        Response response,
        Func<Response, IAsyncEnumerable<T>> asyncEnumerableProcessor)
    {
        return new(response, asyncEnumerableProcessor);
    }

    /// <summary>
    /// Gets the underlying <see cref="Response"/> instance that this <see cref="StreamingResponse{T}"/> may enumerate
    /// over.
    /// </summary>
    /// <returns> The <see cref="Response"/> instance attached to this <see cref="StreamingResponse{T}"/>. </returns>
    public Response GetRawResponse() => _rawResponse;

    /// <summary>
    /// Gets the asynchronously enumerable collection of distinct, streamable items in the response.
    /// </summary>
    /// <remarks>
    /// <para> The return value of this method may be used with the "await foreach" statement. </para>
    /// <para>
    /// As <see cref="StreamingResponse{T}"/> explicitly implements <see cref="IAsyncEnumerable{T}"/>, callers may
    /// enumerate a <see cref="StreamingResponse{T}"/> instance directly instead of calling this method.
    /// </para>
    /// </remarks>
    /// <returns>
    /// This same <see cref="StreamingResponse{T}"/> instance, typed as <see cref="IAsyncEnumerable{T}"/>.
    /// </returns>
    public IAsyncEnumerable<T> EnumerateValues() => this;

    /// <inheritdoc/>
    public void Dispose()
    {
        Dispose(disposing: true);
        GC.SuppressFinalize(this);
    }

    /// <summary>
    /// Disposes the underlying <see cref="Response"/>, if any, the first time it is called with
    /// <paramref name="disposing"/> set to <c>true</c>. Subsequent calls have no effect.
    /// </summary>
    /// <param name="disposing">
    /// <c>true</c> when invoked from <see cref="Dispose()"/>; the underlying response is only disposed in that case.
    /// </param>
    protected virtual void Dispose(bool disposing)
    {
        if (_disposedValue)
        {
            return;
        }

        if (disposing)
        {
            _rawResponse?.Dispose();
        }

        _disposedValue = true;
    }

    /// <inheritdoc/>
    IAsyncEnumerator<T> IAsyncEnumerable<T>.GetAsyncEnumerator(CancellationToken cancellationToken)
        => _asyncEnumerableSource.GetAsyncEnumerator(cancellationToken);
}