# Akka.Streams Graph Lifecycles
Just like how Akka.NET actors have discrete lifecycles, so do Akka.Streams graphs!

* **Running** – the stream is currently processing elements and has not run out of elements to process.
* **Completed** – the stream has terminated because:
    * The `Source<T>` ran out of elements or
    * One or more downstream stages signaled completion.
* **Failed** – the stream terminated because an error was thrown in one of the stages.

Let's consider some examples.

## Completed Streams
A stream completes usually when it's completed processing all possible inputs - or if one of the graph stages signals that it wants to terminate, i.e. a [`KillSwitch`](https://getakka.net/api/Akka.Streams.IKillSwitch.html).

In [1]:
#r "nuget: Akka.Streams, 1.4.47"

using System.Linq;
using System.Collections.Immutable;
using System.Threading;
using Akka;
using Akka.Actor;
using Akka.Streams;
using Akka.Streams.Dsl;

ActorSystem actorSystem = ActorSystem.Create("StreamsExample");

IMaterializer materializer = actorSystem.Materializer();

// a source representing a range of integers
Source<int, NotUsed> source1 = Source.From(Enumerable.Range(1, 10));

// a source representing a single string value
Source<string, NotUsed> source2 = Source.Single("a");

// let's combine these two sources such that we create 10 int / string tuples
IAsyncEnumerable<(int i, string s)> merged1 = source1.Zip(source2).RunAsAsyncEnumerable(materializer);

await foreach(var (i, s) in merged1){
    Console.WriteLine($"{i}-->{s}");
}

1-->a


This stream terminated with a single output, despite the fact that we had `source1` defined as 

```csharp
Source<int, NotUsed> source1 = Source.From(Enumerable.Range(1, 10));
```

Which is at least 10 elements! What gives?

Well in this case, it's because we combined `source1` with `source2`, which is defined as

```csharp
Source<string, NotUsed> source2 = Source.Single("a");
```

A single element! So due to the semantics of how the `Zip<T>` stage works - it will "complete" once at least one of its upstream `Source<T>`s completes, the entire stream terminates with a single output.

So how could we fix this in order to ensure that we had all 10 outputs from `source1` get rendered?

In [5]:
// a source representing a single string value
// BUT this source will be repeated each time a downstream requests it.
Source<string, NotUsed> source3 = Source.Repeat("a");

// let's combine these two sources such that we create 10 int / string tuples
IAsyncEnumerable<(int i, string s)> merged1 = source1.Zip(source3).RunAsAsyncEnumerable(materializer);

await foreach(var (i, s) in merged1){
    Console.WriteLine($"{i}-->{s}");
}

1-->a
2-->a
3-->a
4-->a
5-->a
6-->a
7-->a
8-->a
9-->a
10-->a


Now we've had all 10 values from `source1` written out, all because we changed to a new `Source<T>` that had different semantics.

> **N.B.** This type of use case is exactly why it's a good idea to read "[Overview of Built-In Akka.Streams Stages and Their Semantics](https://getakka.net/articles/streams/builtinstages.html)" as it spells out exactly how these streams stages work with regard to lifetime, completion order, and so on.

## Infinite Streams
Many of the examples we've used so far have been for "finite" streams - where the number of possible inputs and outputs can be determined ahead of actually running the code.

I.e. when we create a `Source<int>` from a `List<int>`, we know how many items in the `List<int>` will need to be processed before we reach the end. That's an example of a finite stream.

An infinite stream, on the other hand, is unbounded - it has no discrete end, so it runs until it's told to terminate via some external or internal signal such as an `IKillSwitch`.

In [7]:
// create a source that will be materialized into an IActorRef
Source<string, IActorRef> actorSource = Source.ActorRef<string>(1000, OverflowStrategy.DropHead);
var (preMaterializedRef, standAloneSrc) = actorSource.PreMaterialize(materializer);

// materialize the rest of the stream into an IAsyncEnumerable
IAsyncEnumerable<string> strResponses = standAloneSrc.Via(Flow.Create<string>().Select(x => x.ToLowerInvariant())).RunAsAsyncEnumerable(materializer);

// send some messages to our head actor to drive the stream
preMaterializedRef.Tell("HIT1");
preMaterializedRef.Tell("HIT2");
preMaterializedRef.Tell("HIT3");
//preMaterializedRef.GracefulStop(TimeSpan.FromSeconds(1));

// need to timeout our IAsyncEnumerable otherwise it will run forever (by design)
var cts = new CancellationTokenSource(TimeSpan.FromMilliseconds(500));

await foreach(var str in strResponses.WithCancellation(cts.Token)){
    Console.WriteLine(str);
}

hit1
hit2
hit3


In order to get this code sample to terminate, we had to pass in a `CancellationToken` and invoke it behind the scenes - otherwise we would wait on our `IAsyncEnumerable` indefinitely!

### Using KillSwitches to Terminate Streams

However, another approach we could have used is an `IKillSwitch` to programmatically terminate the stream when we're done with it - even though we could, in theory, continue to process more events.

In [8]:
// create another source that will be materialized into an IActorRef
Source<string, IActorRef> actorSource2 = Source.ActorRef<string>(1000, OverflowStrategy.DropTail);
var (preMaterializedRef2, standAloneSrc2) = actorSource2.PreMaterialize(materializer);

// going to use this as part of our KillSwitch
var cts = new CancellationTokenSource();

// materialize the rest of the stream into an IAsyncEnumerable
IAsyncEnumerable<string> strResponses2 = standAloneSrc2.Via(Flow.Create<string>()
    .Select(x => x.ToLowerInvariant()))
    .Via(cts.Token.AsFlow<string>(true))
    //.Via(KillSwitches.AsFlow<string>(cts.Token, cancelGracefully:true))
    .RunAsAsyncEnumerable(materializer);

// send some messages to our head actor to drive the stream
preMaterializedRef2.Tell("HIT1");
preMaterializedRef2.Tell("HIT2");
preMaterializedRef2.Tell("HIT3");

var count = 0;
await foreach(var str in strResponses2){
    Console.WriteLine(str);
    if(++count == 3){
        cts.Cancel(); // shut down the stream
    }
}
Console.WriteLine("Stream completed!");

hit1
hit2
hit3
Stream completed!


There are other ways to instrument a kill switch - including "shared" kill switches that can be used to terminate multiple streams at once, but this is the most idiomatic way of doing it in .NET.

### Other Ways of Completing Streams
Some stream stages support their own custom methods of completion - the `Source.ActorRef<T>` being one of them. We can complete this stage by stopping the `IActorRef` we receive from the materializer.

In [None]:
// create a source that will be materialized into an IActorRef
Source<string, IActorRef> actorSource3 = Source.ActorRef<string>(1000, OverflowStrategy.DropTail);
var (preMaterializedRef3, standAloneSrc3) = actorSource3.PreMaterialize(materializer);

// materialize the rest of the stream into an IAsyncEnumerable
IAsyncEnumerable<string> strResponses3 = standAloneSrc3.Via(Flow.Create<string>().Select(x => x.ToLowerInvariant())).RunAsAsyncEnumerable(materializer);

// send some messages to our head actor to drive the stream
preMaterializedRef3.Tell("HIT1");
preMaterializedRef3.Tell("HIT2");
preMaterializedRef3.Tell("HIT3");
preMaterializedRef3.Tell(PoisonPill.Instance);

await foreach(var str in strResponses3){
    Console.WriteLine(str);
}
Console.WriteLine("Stream completed!");

hit1
hit2
hit3
Stream completed!


And there you have it - stream completed via sending the `IActorRef` a `PoisonPill` message.

## Dealing with Failures
Failures are a part of every programming exercise - and Akka.Streams is no different.

In [9]:
// create a source of integers - including one bad apple (zero)
Source<int, NotUsed> numbers = Source.From(new []{ 9,8,7,6,0,5,4,3,2,1 });

IAsyncEnumerable<string> integerDivision = numbers.Via(Flow.Create<int>()
    .Select(x => $"1/{x} is {1/x} w/ integer division"))
    .RunAsAsyncEnumerable(materializer);

await foreach(var d in integerDivision){
    Console.WriteLine(d);
}

1/9 is 0 w/ integer division
1/8 is 0 w/ integer division
1/7 is 0 w/ integer division
1/6 is 0 w/ integer division


Error: System.DivideByZeroException: Attempted to divide by zero.
   at Submission#10.<>c.<<Initialize>>b__0_0(Int32 x)
   at Akka.Streams.Implementation.Fusing.Select`2.Logic.OnPush()
--- End of stack trace from previous location ---
   at Akka.Streams.Dsl.SinkQueueAsyncEnumerator`1.MoveNextAsync()
   at Submission#10.<<Initialize>>d__0.MoveNext()
--- End of stack trace from previous location ---
   at Submission#10.<<Initialize>>d__0.MoveNext()
--- End of stack trace from previous location ---
   at Microsoft.CodeAnalysis.Scripting.ScriptExecutionState.RunSubmissionsAsync[TResult](ImmutableArray`1 precedingExecutors, Func`2 currentExecutor, StrongBox`1 exceptionHolderOpt, Func`2 catchExceptionOpt, CancellationToken cancellationToken)

When the stream attempts to divide by zero we get a `System.DivideByZeroException` and the stream stops processing any additional items, even though there might still be outstanding work to perform inside the stream.

In this example it would be pretty easy to fix the source of the bug by just checking for zero or converting to a floating-point number first, but let's take a look at how we can recover from failures using Akka.Streams.

### Handling Failed Akka.Streams Stages
One of the simplest methods we can use to [handle errors in Akka.Streams stages is the `Recover` method](https://getakka.net/articles/streams/error-handling.html#recover).

In [10]:
using Akka.Util; // need this for Option<T>, which Recover depends upon

IAsyncEnumerable<string> integerDivision2 = numbers.Via(Flow.Create<int>()
    .Select(x => $"1/{x} is {1/x} w/ integer division"))
    .Recover(ex => {
        if(ex is DivideByZeroException){
            return new Option<string>("Whoops - attempted to divide by zero!");
        }

        // otherwise just return nothing - a gap will appear in the output
        return Option<string>.None;
    })
    .RunAsAsyncEnumerable(materializer);

await foreach(var d in integerDivision2){
    Console.WriteLine(d);
}

1/9 is 0 w/ integer division
1/8 is 0 w/ integer division
1/7 is 0 w/ integer division
1/6 is 0 w/ integer division
Whoops - attempted to divide by zero!


In this instance, the stream still terminated without processing all of its inputs - but rather than throwing an exception we had a chance to return a final output that could be processed by our `Sink<T>` instead.

What if we want something more robust? What if we want the opportunity to retry a failed stream stage and keep processing the rest of the upstream events? This is [where `RecoverWithRetries<T>` can be very helpful](https://getakka.net/articles/streams/error-handling.html#recover-with-retries)!

In [13]:
IAsyncEnumerable<string> integerDivision3 = numbers.Via(Flow.Create<int>()
    .Select(x => $"1/{x} is {1/x} w/ integer division")
    .RecoverWithRetries(ex => {
        if(ex is DivideByZeroException){
            // have to return a new Source<int> here
            return numbers.Via(Flow.Create<int>()
                .SkipWhile(x => x != 0)
                .Where(x => x != 0) // so we can skip the 0 element itself too
                .Select(x => $"1/{x} is {1/x} w/ integer division"));
        }

        // otherwise just bail and don't attempt to recover
        return null;
    }, attempts: 3)) // allow up to three restart attempts
    .RunAsAsyncEnumerable(materializer);

await foreach(var d in integerDivision3){
    Console.WriteLine(d);
}

1/9 is 0 w/ integer division
1/8 is 0 w/ integer division
1/7 is 0 w/ integer division
1/6 is 0 w/ integer division
1/5 is 0 w/ integer division
1/4 is 0 w/ integer division
1/3 is 0 w/ integer division
1/2 is 0 w/ integer division
1/1 is 1 w/ integer division


Alright! We got all of the non-zero output this time - but that `RecoverWithRetries` stage was really cumbersome to write for this particular case. I wonder if there's an even better way to do this?

In [15]:
using Akka.Streams.Supervision; // needed for stream-stage deciders

// create a custom Decider with a "Restart" directive in the event of DivideByZeroException
Akka.Streams.Supervision.Decider decider = cause => cause is DivideByZeroException
    ? Akka.Streams.Supervision.Directive.Restart
    : Akka.Streams.Supervision.Directive.Stop;

IAsyncEnumerable<string> integerDivision = numbers.Via(Flow.Create<int>()
    .Select(x => $"1/{x} is {1/x} w/ integer division"))
    .WithAttributes(ActorAttributes.CreateSupervisionStrategy(decider))
    .RunAsAsyncEnumerable(materializer);

await foreach(var d in integerDivision){
    Console.WriteLine(d);
}

1/9 is 0 w/ integer division
1/8 is 0 w/ integer division
1/7 is 0 w/ integer division
1/6 is 0 w/ integer division
1/5 is 0 w/ integer division
1/4 is 0 w/ integer division
1/3 is 0 w/ integer division
1/2 is 0 w/ integer division
1/1 is 1 w/ integer division


Similar to [how actors are programmed to handle their children's failures, primarily by restarting them](https://petabridge.com/blog/akkadotnet-actors-restart/), so too can Akka.Streams stages be handled using some metadata we provide during stream construction.

This is exactly what we've done by [defining a `Decider`](https://getakka.net/api/Akka.Streams.Supervision.Decider.html) - the algorithm that gets used to evaluate a stream stage's `Exception`s when they're thrown:

```csharp
Akka.Streams.Supervision.Decider decider = cause => cause is DivideByZeroException
    ? Akka.Streams.Supervision.Directive.Restart
    : Akka.Streams.Supervision.Directive.Stop;
```

By default a `Directive.Stop` will be returned - hence why we had to go through so much trouble using the `RecoverWithRetries` stage to rebuild the stream - we literally had to recreate the entire upstream part of the stream in order to resume processing.

In this case, however, we can simplify our error handling by configuring the underlying actors that are created by the `IMaterializer` to execute the graph to simply restart in the event of a `DivideByZeroException`. This is exactly what we did when we passed in these `ActorAttributes`:

```csharp
.WithAttributes(ActorAttributes.CreateSupervisionStrategy(decider))
```

There are a number of other attributes you can configure in your streams, such as:

* The name of the stream stage actor;
* The dispatcher it runs on;
* And a variety of other settings, which need to be more well-documented.

But for now you should have the tools to properly manage the lifecycles of your streams.