Skip to content

Async & TPL

Sandesh Kota edited this page Apr 1, 2019 · 23 revisions
  • Async : Responsiveness (backend)
  • Parallel : Performance (divide workload)
  • Risk:
    • Race Conditions
    • Dependency -> ensure that you wait for the dependent task to have the required data

Task

  • Creating a Task in parallel (depends on # of cores also)
    • Single core: threads are shared - Still best for responsiveness (ex: main thread is for UI)
    • Multi Core: threads run in parallel
      • Only the main thread/ UI thread can update UI
using System.Threading.Tasks;

Task t = new Task( () => { 
  // code
});
t.Start();

// another way to create and start task
task tnew = task.Factory.StartNew( _code_ );
  • Safe Way (typesafe, reduce the race condition issues)
Task<int> t = Task.Factory.StartNew( () => 
{
  // ...
  return 1;
});
int r = t.Result;      // implicitlly calls t.Wait()

WAIT

Task t = new Task( () => { 
  // code
});
t.Start();
t.Wait();

int r = t.Result; // implicitlly calls t.Wait()


// Multiple tasks
Task t1 = Task.Factory.startNew( code );
Task t2 = Task.Factory.startNew( code );
Task t3 = Task.Factory.startNew( code );

Task[] tasks = {t1, t2, t3 };
Task.WaitAll( tasks ); // wait for ALL to finish

int index = Task.WaitAny( tasks );  // wait for FIRST to finish
Task completedTask = tasks[index];

// Wait All One By One - (when some may fail | hide latency)
List<task<TResult> tasks = new List<task<TResult>();
for (int i =0 ....)
  tasks.Add( Task.Factory.StartNew( code ) );

while (tasks.Count > 0)
{
  int index = Task.WaitAny( tasks.ToArray() );
  // ... process tasks[index].Result
  tasks.removeAt(index);
}

Task Dependency

Task t1 = Task.Factory.StartNew( code );

Task t2 = Task.Factory.StartNew( ()=>
{
  t1.wait();
  // code
});
OR
Task t2 = t1.ContinueWith( (antecedent) => { } ); // starts when t1 completes / fails

Variations (WaitAll / WaitAny)

Task[] tasks = {t1, t2, t3};
Task.Factory.ContinueWhenAll(tasks, (setOfTasks) =>
{
  // when all are finished
});
Task.Factory.ContinueWhenAny(tasks, (firstTask) =>
{
  // when firstTask has finished
});

Exceptions

  • When a Task throws an exception it is stored and then re-thrown when .Result , .Wait OR .WaitAll is called
Task<int> t = Task.Factory.StartNew( code );
int r = t.Result;

try {
  int r = t.Result;
} catch(AggregateException ae){
  Console.WriteLine(ae.InnerException.Message);
}
  • If a Task creates sub-tasks, the result could be a tree of exception
catch(AggregateException ae){
  ae = ae.Flatten();
  foreach(var ex in ex.InnerExceptions)
    Console.WriteLine(ex.Message);
}
  • Exception Handling
    • You should design to observe all unhandled exception
      • otherwise exception is re-thrown when a task is garbage-collected
    • 4 techniques to observe (follow any 1)
      • call .Wait or .result
      • call Task.WaitAll
      • call .Exception property after task has completed
      • subscribe to TaskScheduler.UnobservedTaskException : The best since we can handle even exceptions from 3rd party libraries which uses TPL
      TaskScheduler.UnobservedTaskException += new EventHandler<UnobservedTaskExceptioneventArgs>(TaskEx_Handler);
      
      static void TaskEx_Handler(object sender, UnobservedTaskExceptioneventArgs e)
      {
        Console.WriteLine("Unobserver: " + e.Exception.Message);
        e.SetObserved();
      }
      

Task Cancellation

  • Creator passes a cancellation token, start task, later signals cancel...
  • Task monitors token, if cancelled performs cleanup & throws exception
var cts = new CancellationTokenSource();
var token = cts.Token;
Task t = Task.Factory.StartNew( () => 
  {
     // code
     // while    if(token.IsCancellationRequested) { //cleanup:    token.ThrowIfcancellationRequested(); }
  },
  token
);

// cancelling
cts.Cancel();

Task Priorities

  • Child tasks attach to parent
  • Parent task doesn't complete until all children complete
Task parent = Task.Factory.StartNew( () =>
{
  Task child1 = Task.Factory.StartNew( () => {}, TaskCreationOptions.AttachedToParent );
});

Scenarios

  • Issue
for (int i = 0; i < 10; i++ )
{
  Task.Factory.StartNew( () => {
    int taskid = i;
    Console.WriteLine(taskid);
  });
}

//prints: 10, 10, 10 ..... because the tasks are created in the loop but when the task actually runs the i value would have been incremented to 10 (i++) and so that is used for all the tasks (race condition)
  • Fix (pass i as argument to the task)
for (int i = 0; i < 10; i++ )
{
  Task.Factory.StartNew( (arg) => {
      int taskid = (int)arg;
      Console.WriteLine(taskid);
    },
    i
  );
}

Parallel Invoke

Parallel.Invoke(
  () => { ...code... },
  () => { ...code... },
  () => { ...code... }
);

Clone this wiki locally