Skip to content

Commit

Permalink
add support for using top-down during main save
Browse files Browse the repository at this point in the history
  • Loading branch information
riina committed Mar 19, 2023
1 parent b1e8279 commit 96edc17
Show file tree
Hide file tree
Showing 6 changed files with 134 additions and 44 deletions.
24 changes: 24 additions & 0 deletions src/Art.M3U/IExtraSaverOperation.cs
@@ -0,0 +1,24 @@
namespace Art.M3U;

/// <summary>
/// Represents an extra operation that should be used when no new segments are immediately available in <see cref="M3UDownloaderContextProcessor"/>.
/// </summary>
/// <remarks>
/// The <see cref="TickAsync"/> method can be invoked multiple times, and
/// is intended to be used when no new segments are immediately available.
/// </remarks>
public interface IExtraSaverOperation
{
/// <summary>
/// Resets this operation.
/// </summary>
void Reset();

/// <summary>
/// Executes operation step.
/// </summary>
/// <param name="m3">Existing M3U file.</param>
/// <param name="cancellationToken">Cancellation token.</param>
/// <returns>Task that returns false if this operation is no longer useful.</returns>
Task<bool> TickAsync(M3UFile m3, CancellationToken cancellationToken = default);
}
3 changes: 2 additions & 1 deletion src/Art.M3U/M3UDownloaderContext.cs
Expand Up @@ -164,8 +164,9 @@ private async Task WriteAncillaryFileAsync(string file, ReadOnlyMemory<byte> dat
/// </summary>
/// <param name="oneOff">If true, only request target once.</param>
/// <param name="timeout">Timeout when waiting for new entries.</param>
/// <param name="extraOperation">Extra operation to invoke during down time.</param>
/// <returns>Downloader.</returns>
public M3UDownloaderContextStandardSaver CreateStandardSaver(bool oneOff, TimeSpan timeout) => new(this, oneOff, timeout);
public M3UDownloaderContextStandardSaver CreateStandardSaver(bool oneOff, TimeSpan timeout, IExtraSaverOperation? extraOperation = null) => new(this, oneOff, timeout, extraOperation);

/// <summary>
/// Creates a basic stream output downloader.
Expand Down
46 changes: 37 additions & 9 deletions src/Art.M3U/M3UDownloaderContextProcessor.cs
Expand Up @@ -109,9 +109,11 @@ private static async Task DelayOrThrowAsync(ArtHttpResponseMessageException exce
/// <param name="oneOff">If true, complete after one pass through playlist.</param>
/// <param name="timeout">Timeout to use to determine when a stream seems to have ended.</param>
/// <param name="playlistElementProcessor">Processor to handle playlist elements.</param>
/// <param name="extraOperation">Optional extra operation.</param>
/// <param name="cancellationToken">Cancellation token.</param>
protected async Task ProcessPlaylistAsync(bool oneOff, TimeSpan timeout, IPlaylistElementProcessor playlistElementProcessor, CancellationToken cancellationToken = default)
protected async Task ProcessPlaylistAsync(bool oneOff, TimeSpan timeout, IPlaylistElementProcessor playlistElementProcessor, IExtraSaverOperation? extraOperation = null, CancellationToken cancellationToken = default)
{
extraOperation?.Reset();
IOperationProgressContext? operationProgressContext = null;
try
{
Expand Down Expand Up @@ -174,21 +176,47 @@ protected async Task ProcessPlaylistAsync(bool oneOff, TimeSpan timeout, IPlayli
if (j != 0)
{
sw.Restart();
remainingTimeout = timeout;
}
else if (sw.IsRunning)
{
var elapsed = sw.Elapsed;
if (elapsed >= timeout)
if (extraOperation != null)
{
if (operationProgressContext != null)
try
{
operationProgressContext.Dispose();
operationProgressContext = null;
Context.Tool.LogInformation("No new segments, executing extra operation...");
bool shouldContinue = await extraOperation.TickAsync(m3, cancellationToken).ConfigureAwait(false);
if (!shouldContinue)
{
extraOperation = null;
}
}
Context.Tool.LogInformation($"No new entries for timeout {timeout}, stopping");
return;
catch (Exception e)
{
Context.Tool.LogError(e.Message, e.ToString());
extraOperation = null;
}
}
if (extraOperation != null)
{
sw.Restart();
remainingTimeout = timeout;
}
else
{
var elapsed = sw.Elapsed;
if (elapsed >= timeout)
{
if (operationProgressContext != null)
{
operationProgressContext.Dispose();
operationProgressContext = null;
}
Context.Tool.LogInformation($"No new entries for timeout {timeout}, stopping");
return;
}
remainingTimeout = timeout.Subtract(elapsed);
}
remainingTimeout = timeout.Subtract(elapsed);
}
else
{
Expand Down
6 changes: 4 additions & 2 deletions src/Art.M3U/M3UDownloaderContextStandardSaver.cs
Expand Up @@ -7,16 +7,18 @@ public class M3UDownloaderContextStandardSaver : M3UDownloaderContextSaver
{
private readonly bool _oneOff;
private readonly TimeSpan _timeout;
private readonly IExtraSaverOperation? _extraOperation;

internal M3UDownloaderContextStandardSaver(M3UDownloaderContext context, bool oneOff, TimeSpan timeout) : base(context)
internal M3UDownloaderContextStandardSaver(M3UDownloaderContext context, bool oneOff, TimeSpan timeout, IExtraSaverOperation? extraOperation) : base(context)
{
_oneOff = oneOff;
_timeout = timeout;
_extraOperation = extraOperation;
}

/// <inheritdoc />
public override Task RunAsync(CancellationToken cancellationToken = default)
{
return ProcessPlaylistAsync(_oneOff, _timeout, new SegmentDownloadPlaylistElementProcessor(Context), cancellationToken);
return ProcessPlaylistAsync(_oneOff, _timeout, new SegmentDownloadPlaylistElementProcessor(Context), _extraOperation, cancellationToken);
}
}
2 changes: 1 addition & 1 deletion src/Art.M3U/M3UDownloaderContextStreamOutputSaver.cs
Expand Up @@ -26,6 +26,6 @@ internal M3UDownloaderContextStreamOutputSaver(M3UDownloaderContext context, boo
/// <exception cref="ArtHttpResponseMessageException">Thrown on HTTP response indicating non-successful response.</exception>
public Task ExportAsync(Stream stream, CancellationToken cancellationToken = default)
{
return ProcessPlaylistAsync(_oneOff, _timeout, new StreamOutputPlaylistElementProcessor(Context, stream), cancellationToken);
return ProcessPlaylistAsync(_oneOff, _timeout, new StreamOutputPlaylistElementProcessor(Context, stream), null, cancellationToken);
}
}
97 changes: 66 additions & 31 deletions src/Art.M3U/M3UDownloaderContextTopDownSaver.cs
Expand Up @@ -8,7 +8,7 @@ namespace Art.M3U;
/// <summary>
/// Represents a top-down saver.
/// </summary>
public partial class M3UDownloaderContextTopDownSaver : M3UDownloaderContextSaver
public partial class M3UDownloaderContextTopDownSaver : M3UDownloaderContextSaver, IExtraSaverOperation
{
[GeneratedRegex("(^[\\S\\s]*[^\\d]|)\\d+(\\.\\w+)$")]
private static partial Regex GetBitRegex();
Expand All @@ -18,6 +18,8 @@ public partial class M3UDownloaderContextTopDownSaver : M3UDownloaderContextSave

private readonly long _top;
private readonly Func<string, long, string> _nameTransform;
private long _currentTop;
private bool _ended;

internal M3UDownloaderContextTopDownSaver(M3UDownloaderContext context, long top)
: this(context, top, TranslateNameDefault)
Expand All @@ -32,6 +34,7 @@ internal M3UDownloaderContextTopDownSaver(M3UDownloaderContext context, long top
internal M3UDownloaderContextTopDownSaver(M3UDownloaderContext context, long top, Func<string, long, string> nameTransform) : base(context)
{
_top = top;
_currentTop = _top;
_nameTransform = nameTransform;
}

Expand Down Expand Up @@ -81,45 +84,77 @@ public static string TranslateNameMatchLength(string name, string i)
/// <inheritdoc />
public override async Task RunAsync(CancellationToken cancellationToken = default)
{
FailCounter = 0;
long top = _top;
Reset();
while (true)
{
if (HeartbeatCallback != null) await HeartbeatCallback().ConfigureAwait(false);
M3UFile m3;
Context.Tool.LogInformation("Reading main...");
try
{
if (top < 0) break;
if (HeartbeatCallback != null) await HeartbeatCallback().ConfigureAwait(false);
Context.Tool.LogInformation("Reading main...");
M3UFile m3 = await Context.GetAsync(cancellationToken).ConfigureAwait(false);
string str = m3.DataLines.First();
Uri origUri = new(Context.MainUri, str);
int idx = str.IndexOf('?');
if (idx >= 0) str = str[..idx];
Uri uri = new UriBuilder(new Uri(Context.MainUri, _nameTransform(str, top))) { Query = origUri.Query }.Uri;
Context.Tool.LogInformation($"Downloading segment {uri.Segments[^1]}...");
try
{
// Don't assume MSN, and just accept failure (exception) when trying to decrypt with no IV
// Also don't depend on current file since it probably won't do us good anyway for this use case
await Context.DownloadSegmentAsync(uri, null, null, cancellationToken).ConfigureAwait(false);
top--;
}
catch (ArtHttpResponseMessageException e)
{
if (e.StatusCode == HttpStatusCode.NotFound)
{
Context.Tool.LogInformation("HTTP NotFound returned, ending operation");
return;
}
await HandleRequestExceptionAsync(e, cancellationToken).ConfigureAwait(false);
}
await Task.Delay(500, cancellationToken).ConfigureAwait(false);
FailCounter = 0;
m3 = await Context.GetAsync(cancellationToken).ConfigureAwait(false);
}
catch (ArtHttpResponseMessageException e)
{
await HandleRequestExceptionAsync(e, cancellationToken).ConfigureAwait(false);
continue;
}
bool shouldContinue = await TickAsync(m3, cancellationToken).ConfigureAwait(false);
if (!shouldContinue)
{
return;
}
await Task.Delay(500, cancellationToken).ConfigureAwait(false);
}
}

void IExtraSaverOperation.Reset()
{
Reset();
}

private void Reset()
{
_ended = false;
_currentTop = _top;
FailCounter = 0;
}

Task<bool> IExtraSaverOperation.TickAsync(M3UFile m3, CancellationToken cancellationToken)
{
return TickAsync(m3, cancellationToken);
}

private async Task<bool> TickAsync(M3UFile m3, CancellationToken cancellationToken = default)
{
if (_ended || _currentTop < 0)
{
return false;
}
string str = m3.DataLines.First();
Uri origUri = new(Context.MainUri, str);
int idx = str.IndexOf('?');
if (idx >= 0) str = str[..idx];
Uri uri = new UriBuilder(new Uri(Context.MainUri, _nameTransform(str, _currentTop))) { Query = origUri.Query }.Uri;
Context.Tool.LogInformation($"Top-downloading segment {uri.Segments[^1]}...");
try
{
// Don't assume MSN, and just accept failure (exception) when trying to decrypt with no IV
// Also don't depend on current file since it probably won't do us good anyway for this use case
await Context.DownloadSegmentAsync(uri, null, null, cancellationToken).ConfigureAwait(false);
_currentTop--;
}
catch (ArtHttpResponseMessageException e)
{
if (e.StatusCode == HttpStatusCode.NotFound)
{
Context.Tool.LogInformation("HTTP NotFound returned, ending top-down operation");
_ended = true;
return false;
}
await HandleRequestExceptionAsync(e, cancellationToken).ConfigureAwait(false);
}
FailCounter = 0;
return true;
}
}

0 comments on commit 96edc17

Please sign in to comment.