Permalink
Browse files

Refactored APIs a bit. Moved some responsibilities around. Need to st…

…art writing more tests.
  • Loading branch information...
1 parent 4e813cd commit cf6dcd8c94edaf57b932785b69a0733413e181a7 unknown committed Oct 12, 2011
View
@@ -4,4 +4,5 @@ obj
*.suo
App_Data
*.results.xml
-/src/packages
+/src/packages
+_ReSharper.*
@@ -1,7 +0,0 @@
-using System.Data.Entity;
-
-namespace WebBackgrounder.EntityFramework.Entities {
- public class JobContext : DbContext {
- public DbSet<Job> Jobs { get; set; }
- }
-}
@@ -1,9 +1,9 @@
using System;
namespace WebBackgrounder.EntityFramework.Entities {
- public class Job {
+ public class JobWorker {
public int Id { get; set; }
- public string JobName { get; set; }
+ public string Name { get; set; }
public Guid WorkerId { get; set; }
public int Status { get; set; }
}
@@ -0,0 +1,7 @@
+using System.Data.Entity;
+
+namespace WebBackgrounder.EntityFramework.Entities {
+ public class JobsContext : DbContext {
+ public DbSet<JobWorker> JobWorkers { get; set; }
+ }
+}
@@ -1,82 +0,0 @@
-using System;
-using System.Linq;
-using WebBackgrounder.EntityFramework.Entities;
-
-namespace WebBackgrounder.EntityFramework {
- public class EntityJobRepository : IJobRepository {
- readonly JobContext _context;
- public EntityJobRepository(JobContext context) {
- _context = context;
- }
-
- public bool PendingJobsExist(string jobName) {
- var pending = from job in _context.Jobs
- where job.JobName == jobName
- && job.Status != (int)JobStatus.Done
- && job.Status != (int)JobStatus.Ignored
- select job;
- return pending.Any();
- }
-
- public void CreateJobRequest(string jobName, Guid workerId) {
- var job = new Job { JobName = jobName, WorkerId = workerId, Status = (int)JobStatus.Ready };
- _context.Jobs.Add(job);
- _context.SaveChanges();
- }
-
- public Guid GetWorkerIdForJob(string jobName) {
- // Look for the oldest ready job.
- var winner = (from job in _context.Jobs
- where job.JobName == jobName
- && (job.Status == (int)JobStatus.Ready
- || job.Status == (int)JobStatus.Started)
- orderby job.Id ascending
- select job).FirstOrDefault();
-
- if (winner == null) {
- return Guid.Empty;
- }
-
- return winner.WorkerId;
- }
-
- public void CompleteJob(string jobName, Guid workerId) {
- var losers = from job in _context.Jobs
- where job.JobName == jobName
- && job.Status == (int)JobStatus.Ready
- && job.WorkerId != workerId
- select job;
-
- foreach (var loser in losers) {
- loser.Status = (int)JobStatus.Ignored;
- }
- _context.SaveChanges();
-
- var winner = (from job in _context.Jobs
- where job.JobName == jobName
- && job.Status == (int)JobStatus.Ready
- && job.WorkerId == workerId
- select job).FirstOrDefault();
- if (winner == null) {
- throw new InvalidOperationException("No job ready for worker " + workerId);
- }
-
- winner.Status = (int)JobStatus.Done;
- _context.SaveChanges();
- }
-
- public void StartWork(string jobName, Guid workerId) {
- var winner = (from job in _context.Jobs
- where job.JobName == jobName
- && job.Status == (int)JobStatus.Ready
- && job.WorkerId == workerId
- select job).FirstOrDefault();
- if (winner == null) {
- throw new InvalidOperationException("No job ready for worker " + workerId);
- }
-
- winner.Status = (int)JobStatus.Started;
- _context.SaveChanges();
- }
- }
-}
@@ -0,0 +1,78 @@
+using System;
+using System.Collections.Generic;
+using System.Linq;
+using WebBackgrounder.EntityFramework.Entities;
+
+namespace WebBackgrounder.EntityFramework {
+ public class EntityJobWorkerRepository {
+ readonly JobsContext _context;
+ readonly IEnumerable<JobWorker> _workers;
+ readonly string _jobName;
+
+ public EntityJobWorkerRepository(JobsContext context, string jobName) {
+ _context = context;
+ _jobName = jobName;
+ _workers = from worker in _context.JobWorkers
+ where worker.Name == jobName
+ && worker.Status < (int)JobStatus.Complete
+ select worker;
+ }
+
+ public bool AnyActiveWorkers() {
+ return _workers.Any();
+ }
+
+ public JobUnitOfWork ReserveWorker(Guid workerId) {
+ var worker = new JobWorker { Name = _jobName, WorkerId = workerId, Status = (int)JobStatus.Ready };
+ _context.JobWorkers.Add(worker);
+ _context.SaveChanges();
+
+ var currentWorker = GetCurrentWorker();
+ return currentWorker.WorkerId == workerId ? new JobUnitOfWork(this, currentWorker) : null;
+ }
+
+ // Retrieves the oldest (aka first inserted) job with the same job name.
+ JobWorker GetCurrentWorker() {
+ // REVIEW: Make absolutely sure this queries the database and doesn't look in some context cache.
+ var currentWorker = (from worker in _context.JobWorkers
+ where worker.Name == _jobName
+ && worker.Status == (int)JobStatus.Ready
+ orderby worker.Id ascending
+ select worker).FirstOrDefault();
+
+ if (currentWorker == null) {
+ throw new InvalidOperationException("Could not find a job to handle this worker, even though we should have just created one.");
+ }
+
+ return currentWorker;
+ }
+
+ public void SetWorkerStarted(JobWorker worker) {
+ worker.Status = (int)JobStatus.Started;
+ _context.SaveChanges();
+ }
+
+ public void SetWorkerComplete(JobWorker worker) {
+ worker.Status = (int)JobStatus.Started;
+ _context.SaveChanges();
+ }
+
+ public void SetWorkerFailed(JobWorker worker) {
+ worker.Status = (int)JobStatus.Started;
+ _context.SaveChanges();
+ }
+
+ public void UpdateIgnoredWorkers(JobWorker currentWorker) {
+ var ignoredWorkers = from worker in _workers
+ where worker.Status == (int)JobStatus.Ready
+ && worker.Name == currentWorker.Name
+ && worker.WorkerId != currentWorker.WorkerId
+ select worker;
+
+ foreach (var loser in ignoredWorkers) {
+ loser.Status = (int)JobStatus.Ignored;
+ }
+ _context.SaveChanges();
+ }
+ }
+}
@@ -1,37 +0,0 @@
-using System;
-
-namespace WebBackgrounder.EntityFramework {
- public interface IJobRepository {
- /// <summary>
- /// Returns true if the specified job is currently pending or running.
- /// </summary>
- /// <param name="jobName"></param>
- /// <returns></returns>
- bool PendingJobsExist(string jobName);
-
- /// <summary>
- /// Creates a request from a worker that it's ready to take on a
- /// job. It might not necessarily get it.
- /// </summary>
- void CreateJobRequest(string jobName, Guid workerId);
-
- /// <summary>
- /// Returns the worker Id that gets to do the job.
- /// </summary>
- /// <param name="jobName"></param>
- /// <returns></returns>
- Guid GetWorkerIdForJob(string jobName);
-
- /// <summary>
- /// Indicates that the worker is about to start.
- /// </summary>
- /// <param name="jobName"></param>
- void StartWork(string jobName, Guid workerId);
-
- /// <summary>
- /// Marks the job as complete.
- /// </summary>
- /// <param name="jobName"></param>
- void CompleteJob(string jobName, Guid workerId);
- }
-}
@@ -1,9 +1,10 @@
namespace WebBackgrounder.EntityFramework {
public enum JobStatus {
- Ignored = 0,
Ready = 1,
Started = 2,
- Done = 3
+ Complete = 3,
+ Failed = 4,
+ Ignored = 5
}
}
@@ -0,0 +1,28 @@
+using System;
+using WebBackgrounder.EntityFramework.Entities;
+
+namespace WebBackgrounder.EntityFramework {
+ public class JobUnitOfWork : IDisposable {
+ readonly EntityJobWorkerRepository _repository;
+ readonly JobWorker _currentJob;
+ bool _finished;
+
+ public JobUnitOfWork(EntityJobWorkerRepository repository, JobWorker job) {
+ _currentJob = job;
+ _repository = repository;
+ _repository.SetWorkerStarted(_currentJob);
+ }
+
+ public void Complete() {
+ _repository.UpdateIgnoredWorkers(_currentJob);
+ _repository.SetWorkerComplete(_currentJob);
+ _finished = true;
+ }
+
+ public void Dispose() {
+ if (!_finished) {
+ _repository.SetWorkerFailed(_currentJob);
+ }
+ }
+ }
+}
@@ -48,11 +48,11 @@
<Reference Include="System.Xml" />
</ItemGroup>
<ItemGroup>
- <Compile Include="Entities\Job.cs" />
- <Compile Include="Entities\JobContext.cs" />
- <Compile Include="IJobRepository.cs" />
+ <Compile Include="Entities\JobWorker.cs" />
+ <Compile Include="Entities\JobsContext.cs" />
<Compile Include="JobStatus.cs" />
- <Compile Include="EntityJobRepository.cs" />
+ <Compile Include="EntityJobWorkerRepository.cs" />
+ <Compile Include="JobUnitOfWork.cs" />
<Compile Include="WebFarmJobCoordinator.cs" />
<Compile Include="Properties\AssemblyInfo.cs" />
</ItemGroup>
@@ -1,53 +1,35 @@
using System;
-using System.Transactions;
+using WebBackgrounder.EntityFramework.Entities;
namespace WebBackgrounder.EntityFramework {
/// <summary>
/// Uses the database accessed via EF Code First to coordinate jobs in a web farm.
/// </summary>
public class WebFarmJobCoordinator : IJobCoordinator {
- readonly IJobRepository _repository;
+ readonly JobsContext _context;
+ readonly Guid _workerId = Guid.NewGuid();
- public WebFarmJobCoordinator(IJobRepository repository) {
- _repository = repository;
+ public WebFarmJobCoordinator(JobsContext context) {
+ _context = context;
}
- public bool CanDoWork(string jobName, Guid workerId) {
- bool canDoWork = false;
- using (var transaction = new TransactionScope()) {
- if (!_repository.PendingJobsExist(jobName)) {
- _repository.CreateJobRequest(jobName, workerId);
- canDoWork = _repository.GetWorkerIdForJob(jobName) == workerId;
- }
- transaction.Complete();
+ public void PerformWork(IJob jobWorker) {
+ // We need a new instance every time we perform work.
+ var repository = new EntityJobWorkerRepository(_context, jobWorker.Name);
+
+ // TODO: If the pending job belongs to this worker, we need to deal with that.
+ if (repository.AnyActiveWorkers()) {
+ return;
}
- return canDoWork;
- }
-
- public void Done(string jobName, Guid workerId) {
- _repository.CompleteJob(jobName, workerId);
- }
-
- public IDisposable StartWork(string jobName, Guid workerId) {
- using (var transaction = new TransactionScope()) {
- _repository.StartWork(jobName, workerId);
- }
- return new WorkScope(this, jobName, workerId);
- }
-
- private class WorkScope : IDisposable {
- IJobCoordinator _coordinator;
- string _jobName;
- Guid _workerId;
- public WorkScope(IJobCoordinator coordinator, string jobName, Guid workerId) {
- _coordinator = coordinator;
- _jobName = jobName;
- _workerId = workerId;
+ var unitOfWork = repository.ReserveWorker(_workerId);
+ if (unitOfWork == null) {
+ return;
}
- public void Dispose() {
- _coordinator.Done(_jobName, _workerId);
+ using (unitOfWork) {
+ jobWorker.Execute();
@davidfowl
davidfowl Oct 12, 2011 Member

This is async, should be:

jobWorker.Execute().ContinueWith(_ => unitOfWork.Dispose());
+ unitOfWork.Complete();
}
}
}
Oops, something went wrong.

0 comments on commit cf6dcd8

Please sign in to comment.