Skip to content

Commit

Permalink
Better incremental refresh in Jobs View + AbortAll Button
Browse files Browse the repository at this point in the history
  • Loading branch information
Mikescher committed Jul 27, 2020
1 parent c687ff4 commit 8f9d5db
Show file tree
Hide file tree
Showing 15 changed files with 245 additions and 82 deletions.
4 changes: 2 additions & 2 deletions Controller/DataController.cs
Expand Up @@ -23,12 +23,12 @@ public static async Task RefreshData(HttpContext context)
{
var idx = int.Parse((string)context.Request.RouteValues["idx"]);

using (var proxy = JobRegistry.DataCollectJobs.StartOrQueue((man) => new DataCollectJob(man, idx)))
using (var proxy = JobRegistry.DataCollectJobs.StartOrQueue((man) => new DataCollectJob(man, idx, true)))
{
while (proxy.JobRunningOrWaiting) await Task.Delay(50);

if (proxy.Killed) { context.Response.StatusCode = 500; await context.Response.WriteAsync("Job was killed prematurely"); return; }
if (proxy.Job.Result == null) { context.Response.StatusCode = 500; await context.Response.WriteAsync("Job returned no data"); return; }
if (proxy.Job.Result == null) { context.Response.StatusCode = 500; await context.Response.WriteAsync("Job returned no data"); return; }

await context.Response.WriteAsync(proxy.Job.Result);
}
Expand Down
45 changes: 35 additions & 10 deletions Controller/JobController.cs
Expand Up @@ -25,11 +25,26 @@ public static async Task List(HttpContext context)
})
.ThenByDescending(p => p["StartTime"].Value<string>())
.ToList();

var vidcache = Program.GetAllCachedData();

var r = new JObject
(
new JProperty("Meta", new JObject
(
new JProperty("Jobs", new JObject
(
new JProperty("CountActive", JobRegistry.Managers.Sum(p => p.CountActive)),
new JProperty("CountQueued", JobRegistry.Managers.Sum(p => p.CountQueued))
)),

new JProperty("Videos", new JObject
(
new JProperty("CountCachedPreviews", vidcache.Count(p => p["meta"]["cached_previews"].Value<bool>())),
new JProperty("CountCachedVideos", vidcache.Count(p => p["meta"]["cached"].Value<bool>())),
new JProperty("CountTotal", vidcache.Count)
)),

new JProperty("CountActive", JobRegistry.Managers.Sum(p => p.CountActive)),
new JProperty("CountQueued", JobRegistry.Managers.Sum(p => p.CountQueued))
)),
Expand Down Expand Up @@ -80,7 +95,7 @@ public static async Task ManuallyForcePreviewJobs(HttpContext context)
if (File.Exists(pathCache)) continue;

count++;
JobRegistry.PreviewGenJobs.StartOrQueue((man) => new PreviewGenJob(man, pathVideo, pathCache, null), false);
JobRegistry.PreviewGenJobs.StartOrQueue((man) => new PreviewGenJob(man, pathVideo, pathCache, null, obj["meta"].Value<int>("datadirindex"), obj["meta"].Value<string>("uid")), false);
}

await context.Response.WriteAsync($"Started/Attached {count} new jobs");
Expand Down Expand Up @@ -125,7 +140,7 @@ public static async Task ManuallyForceTranscodeJobs(HttpContext context)
if (File.Exists(pathCache)) continue;

count++;
JobRegistry.ConvertJobs.StartOrQueue((man) => new ConvertJob(man, pathVideo, pathCache), false);
JobRegistry.ConvertJobs.StartOrQueue((man) => new ConvertJob(man, pathVideo, pathCache, obj["meta"].Value<int>("datadirindex"), obj["meta"].Value<string>("uid")), false);
}

await context.Response.WriteAsync($"Started/Attached {count} new jobs");
Expand All @@ -140,13 +155,13 @@ public static async Task ManuallyForceCollectData(HttpContext context)
for (var i = 0; i < Program.Args.DataDirs.Count; i++)
{
var ddidx = i;
JobRegistry.DataCollectJobs.StartOrQueue((man) => new DataCollectJob(man, ddidx), false);
JobRegistry.DataCollectJobs.StartOrQueue((man) => new DataCollectJob(man, ddidx, true), false);
await context.Response.WriteAsync($"Started/Attached {Program.Args.DataDirs.Count} new jobs");
}
}
else
{;
JobRegistry.DataCollectJobs.StartOrQueue((man) => new DataCollectJob(man, int.Parse(idx)), false);
JobRegistry.DataCollectJobs.StartOrQueue((man) => new DataCollectJob(man, int.Parse(idx), true), false);
await context.Response.WriteAsync($"Started/Attached 1 new jobs");
}
}
Expand All @@ -155,17 +170,27 @@ public static async Task AbortJob(HttpContext context)
{
var jobid = (string)context.Request.RouteValues["jobid"];

foreach (var man in JobRegistry.Managers)
if (jobid == "all" || jobid == "*")
{
var count = 0;
foreach (var man in JobRegistry.Managers) count += man.AbortAllJobs();
await context.Response.WriteAsync($"OK, {count} jobs aborted");
}
else
{
if (man.AbortJob(jobid))
foreach (var man in JobRegistry.Managers)
{
await context.Response.WriteAsync($"Job aborted");
return;
if (man.AbortJob(jobid))
{
await context.Response.WriteAsync($"Job aborted");
return;
}
}

context.Response.StatusCode = 404;
await context.Response.WriteAsync($"Job not found");
}

context.Response.StatusCode = 404;
await context.Response.WriteAsync($"Job not found");
return;
}

Expand Down
14 changes: 7 additions & 7 deletions Controller/ThumbnailController.cs
Expand Up @@ -37,7 +37,7 @@ public static async Task GetThumbnail(HttpContext context)
{
if (pathVideo == null) { context.Response.StatusCode = 404; await context.Response.WriteAsync("Video file not found"); return; }

await GetPreviewImage(context, pathVideo, 1);
await GetPreviewImage(context, pathVideo, idx, id, 1);
return;
}

Expand All @@ -47,7 +47,7 @@ public static async Task GetThumbnail(HttpContext context)
{
// ensure that for all videos the previews are pre-generated
// so we don't have to start ffmpeg when we first hover
JobRegistry.PreviewGenJobs.StartOrQueue((man) => new PreviewGenJob(man, pathVideo, pathCache, null), false); // runs as background job
JobRegistry.PreviewGenJobs.StartOrQueue((man) => new PreviewGenJob(man, pathVideo, pathCache, null, idx, id), false); // runs as background job
}

var data = await File.ReadAllBytesAsync(pathThumbnail);
Expand All @@ -73,7 +73,7 @@ public static async Task GetAutoThumbnail(HttpContext context)

var pathVideo = obj["meta"]?.Value<string>("path_video");

await GetPreviewImage(context, pathVideo, 1);
await GetPreviewImage(context, pathVideo, idx, id, 1);
}

public static async Task GetPreview(HttpContext context)
Expand All @@ -88,10 +88,10 @@ public static async Task GetPreview(HttpContext context)

var pathVideo = obj["meta"]?.Value<string>("path_video");

await GetPreviewImage(context, pathVideo, img);
await GetPreviewImage(context, pathVideo, idx, id, img);
}

private static async Task GetPreviewImage(HttpContext context, string videopath, int imageIndex)
private static async Task GetPreviewImage(HttpContext context, string videopath, int datadirindex, string videouid, int imageIndex)
{
if (!Program.HasValidFFMPEG) { context.Response.StatusCode = 400; await context.Response.WriteAsync("No ffmpeg installation found"); return; }

Expand All @@ -101,7 +101,7 @@ private static async Task GetPreviewImage(HttpContext context, string videopath,

if (pathCache == null)
{
using (var proxy = JobRegistry.PreviewGenJobs.StartOrQueue((man) => new PreviewGenJob(man, videopath, null, imageIndex)))
using (var proxy = JobRegistry.PreviewGenJobs.StartOrQueue((man) => new PreviewGenJob(man, videopath, null, imageIndex, datadirindex, videouid)))
{
while (proxy.JobRunningOrWaiting) await Task.Delay(50);

Expand All @@ -120,7 +120,7 @@ private static async Task GetPreviewImage(HttpContext context, string videopath,

if (!File.Exists(pathCache))
{
using (var proxy = JobRegistry.PreviewGenJobs.StartOrQueue((man) => new PreviewGenJob(man, videopath, pathCache, null)))
using (var proxy = JobRegistry.PreviewGenJobs.StartOrQueue((man) => new PreviewGenJob(man, videopath, pathCache, null, datadirindex, videouid)))
{
while (proxy.JobRunningOrWaiting) await Task.Delay(50);
}
Expand Down
12 changes: 6 additions & 6 deletions Controller/VideoController.cs
Expand Up @@ -96,18 +96,18 @@ public static async Task GetVideoStream(HttpContext context)
}

if (pathCache != null)
await GetVideoStreamWithCache(context, pathVideo, pathCache);
await GetVideoStreamWithCache(context, pathVideo, pathCache, idx, id);
else
await GetVideoStreamWithoutCache(context, pathVideo);
await GetVideoStreamWithoutCache(context, pathVideo, idx, id);
}

private static async Task GetVideoStreamWithCache(HttpContext context, string pathVideo, string pathCache)
private static async Task GetVideoStreamWithCache(HttpContext context, string pathVideo, string pathCache, int datadirindex, string videouid)
{
if (!Program.HasValidFFMPEG) { context.Response.StatusCode = 400; await context.Response.WriteAsync("No ffmpeg installation found"); return; }

context.Response.Headers.Add(HeaderNames.ContentType, "video/webm");

using var proxy = JobRegistry.ConvertJobs.StartOrQueue((man) => new ConvertJob(man, pathVideo, pathCache));
using var proxy = JobRegistry.ConvertJobs.StartOrQueue((man) => new ConvertJob(man, pathVideo, pathCache, datadirindex, videouid));

while (proxy.JobRunningOrWaiting && !File.Exists(proxy.Job.Temp)) await Task.Delay(0);

Expand Down Expand Up @@ -148,13 +148,13 @@ private static async Task GetVideoStreamWithCache(HttpContext context, string pa
}
}

private static async Task GetVideoStreamWithoutCache(HttpContext context, string pathVideo)
private static async Task GetVideoStreamWithoutCache(HttpContext context, string pathVideo, int datadirindex, string videouid)
{
if (!Program.HasValidFFMPEG) { context.Response.StatusCode = 400; await context.Response.WriteAsync("No ffmpeg installation found"); return; }

context.Response.Headers.Add(HeaderNames.ContentType, "video/webm");

using var proxy = JobRegistry.ConvertJobs.StartOrQueue((man) => new ConvertJob(man, pathVideo, null));
using var proxy = JobRegistry.ConvertJobs.StartOrQueue((man) => new ConvertJob(man, pathVideo, null, datadirindex, videouid));

while (proxy.JobRunningOrWaiting && !File.Exists(proxy.Job.Temp)) await Task.Delay(0);

Expand Down
2 changes: 1 addition & 1 deletion CronMiddleware.cs
Expand Up @@ -42,7 +42,7 @@ private async Task Cron()
var ddidx = i;

await Console.Out.WriteLineAsync($"Start data refresh of [{ddidx}] by cron interval ({interval:g})");
JobRegistry.DataCollectJobs.StartOrQueue((man) => new DataCollectJob(man, ddidx), false);
JobRegistry.DataCollectJobs.StartOrQueue((man) => new DataCollectJob(man, ddidx, false), false);
}
}
}
Expand Down
2 changes: 2 additions & 0 deletions Jobs/AbsJobManager.cs
Expand Up @@ -32,5 +32,7 @@ protected AbsJobManager(int maxParallelism, string name)
public abstract bool AbortJob(string jobid);

public abstract void ClearFinishedJobs();

public abstract int AbortAllJobs();
}
}
18 changes: 14 additions & 4 deletions Jobs/Impl/ConvertJob.cs
Expand Up @@ -15,13 +15,18 @@ public class ConvertJob : Job
public readonly string Destination;
public readonly string Temp;

public readonly int DataDirIndex;
public readonly string VideoUID;

private (int, int) _progress = (0, 1);
public override (int, int) Progress => _progress;

public ConvertJob(AbsJobManager man, string src, string dst) : base(man, src)
public ConvertJob(AbsJobManager man, string src, string dst, int ddindex, string viduid) : base(man, src)
{
Destination = dst;
Temp = Path.Combine(Path.GetTempPath(), "yt_dl_v_" + Guid.NewGuid().ToString("B") + ".webm");
Destination = dst;
DataDirIndex = ddindex;
VideoUID = viduid;
Temp = Path.Combine(Path.GetTempPath(), "yt_dl_v_" + Guid.NewGuid().ToString("B") + ".webm");
}

public override string Name => $"Convert::'{Path.GetFileName(Source)}'";
Expand Down Expand Up @@ -118,8 +123,9 @@ protected override void Run()
else
{
ChangeState(JobState.Finished);
_progress = (1, 1);

_progress = (1, 1);

while (ProxyCount != 0) // Wait for proxies
{
if (AbortRequest) { ChangeState(JobState.Aborted); return; }
Expand All @@ -139,6 +145,10 @@ protected override void Run()
{
File.Move(Temp, Destination);
if (File.Exists(Destination) && new FileInfo(Destination).Length == 0) File.Delete(Destination);

Program.PatchDataCache(DataDirIndex, VideoUID, new[]{"meta", "cached"}, true);
Program.PatchDataCache(DataDirIndex, VideoUID, new[]{"meta", "cache_file"}, Destination);

break;
}
catch (IOException)
Expand Down
11 changes: 7 additions & 4 deletions Jobs/Impl/DataCollectJob.cs
Expand Up @@ -13,16 +13,18 @@ namespace youtube_dl_viewer.Jobs
public class DataCollectJob : Job
{
public readonly int Index;
public readonly bool ClearOld;

public string Result = null;
public (string json, Dictionary<string, JObject> obj)? FullResult = null;

private (int, int) _progress = (0, 1);
public override (int, int) Progress => _progress;

public DataCollectJob(AbsJobManager man, int index) : base(man, "self::"+index)
public DataCollectJob(AbsJobManager man, int index, bool clearOld) : base(man, "self::"+index)
{
Index = index;
Index = index;
ClearOld = clearOld;
}

public override string Name => $"DataCollect::{Index}::'{((Index>=0 && Index <= Program.Args.DataDirs.Count) ? Program.DataDirToString(Program.Args.DataDirs[Index]) : "ERR")}'";
Expand All @@ -34,9 +36,9 @@ public override void Abort()

protected override void Run()
{
lock (Program.DataCache)
if (ClearOld)
{
Program.DataCache[Index] = (null, null);
lock (Program.DataCache) Program.DataCache[Index] = (null, null);
}

var (jsonstr, jsonobj) = CreateData(Index);
Expand Down Expand Up @@ -138,6 +140,7 @@ protected override void Run()
new JProperty("meta", new JObject
(
new JProperty("uid", id),
new JProperty("datadirindex", index),

new JProperty("directory", dir),

Expand Down
25 changes: 18 additions & 7 deletions Jobs/Impl/PreviewGenJob.cs
Expand Up @@ -23,6 +23,9 @@ public class PreviewGenJob : Job
{
public readonly string Destination;
public readonly string TempDir;

public readonly int DataDirIndex;
public readonly string VideoUID;

private readonly int? _queryImageIndex;

Expand All @@ -32,11 +35,13 @@ public class PreviewGenJob : Job
private (int, int) _progress = (0, Program.Args.MaxPreviewImageCount+1);
public override (int, int) Progress => _progress;

public PreviewGenJob(AbsJobManager man, string src, string dst, int? imgIdx) : base(man, src)
public PreviewGenJob(AbsJobManager man, string src, string dst, int? imgIdx, int ddindex, string viduid) : base(man, src)
{
Destination = dst;
Destination = dst;
DataDirIndex = ddindex;
VideoUID = viduid;
_queryImageIndex = imgIdx;
TempDir = Path.Combine(Path.GetTempPath(), "yt_dl_p_" + Guid.NewGuid().ToString("B"));
TempDir = Path.Combine(Path.GetTempPath(), "yt_dl_p_" + Guid.NewGuid().ToString("B"));
Directory.CreateDirectory(TempDir);
}

Expand Down Expand Up @@ -181,11 +186,17 @@ protected override void Run()

ms.Write(bin);
}
using (var fs = new FileStream(Destination, FileMode.Create, FileAccess.Write))

if (Destination != null)
{
ms.Seek(0, SeekOrigin.Begin);
ms.CopyTo(fs);
using (var fs = new FileStream(Destination, FileMode.Create, FileAccess.Write))
{
ms.Seek(0, SeekOrigin.Begin);
ms.CopyTo(fs);
}

Program.PatchDataCache(DataDirIndex, VideoUID, new[]{"meta", "cached_previews"}, true);
Program.PatchDataCache(DataDirIndex, VideoUID, new[]{"meta", "previewscache_file"}, Destination);
}
}

Expand Down
2 changes: 1 addition & 1 deletion Jobs/Job.cs
Expand Up @@ -55,7 +55,7 @@ public TimeSpan Time

protected Job(AbsJobManager man, string source)
{
ID = Guid.NewGuid().ToString("B").ToUpper();
ID = Guid.NewGuid().ToString("N").ToUpper();
Source = source;
_manager = man;
}
Expand Down

0 comments on commit 8f9d5db

Please sign in to comment.