
Adding exactly once processing to Clonos

Pedro Silvestre edited this page Apr 2, 2020 · 24 revisions

Highly-available Streaming

The standby operators are currently able to take over execution of a failed operator and reconfigure connections. However, they produce duplicate records downstream, which we want to avoid in order to provide exactly-once processing guarantees.

Approach 1: RecordIDs

Assumptions:

  • Deterministic computations (no uses of random or physical time)
  • Order insensitive computations
    • For any two input records coming from different input streams, the results given by the operator should be the same no matter the order in which they are received. (This means that general stateful operators are not allowed?) If one requires two events to be ordered, they should have the same key and be ordered in Kafka.
  • Deterministic routing (no shuffle)

Idea:

Each record in a stream carries a unique RecordID. Operators that receive events from input sources generate a RecordID for each record. Subsequent operators combine input RecordIDs into output RecordIDs. After a successful checkpoint, RecordIDs are no longer required and are garbage-collected; thus, they can be reused.

Operators can deduplicate records in each of their input streams using the RecordIDs. This ensures that while replaying records during failure recovery, the same record is not processed twice.
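A minimal sketch of this per-input-stream deduplication, assuming a hypothetical `Deduplicator` kept for each input stream (the class name and methods are illustrative, not Clonos APIs):

```python
class Deduplicator:
    """Tracks RecordIDs seen on one input stream since the last checkpoint."""

    def __init__(self):
        self.seen = set()

    def accept(self, record_id):
        """Return True if the record is new; False if it is a replayed duplicate."""
        if record_id in self.seen:
            return False
        self.seen.add(record_id)
        return True

    def on_checkpoint_complete(self):
        # After a successful checkpoint the IDs are garbage-collected,
        # so they may be reused in the next stream-epoch.
        self.seen.clear()


dedup = Deduplicator()
assert dedup.accept("r1") is True
assert dedup.accept("r1") is False   # replayed duplicate is dropped
dedup.on_checkpoint_complete()
assert dedup.accept("r1") is True    # IDs may be reused after a checkpoint
```

Replayed records with already-seen RecordIDs are simply dropped, so the same record is never processed twice within an epoch.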

To guarantee that records in a stream-epoch have unique RecordIDs, each operator also has to detect the creation of duplicates in its outputs. If a duplicate is detected, a new ID is forcefully generated, and this relationship is synchronously recorded to stable storage. This is expensive but rare.

Lineage based RecordID generation

Let IRD be the Input RecordID and ORD be the Output RecordID, let H represent a hashing operation, and let (+) be some commutative and associative binary operation on RecordIDs (XOR or ADD). Different operator types will need to combine IRDs in different ways to generate their ORD. The ORD will have the value of some function F of the input RecordIDs and other state the operator may maintain:

  • Map: ORD = F(IRD) = IRD
  • FlatMap: ORD = F(IRD, count) = H(IRD + count), where count counts how many records the FlatMap has produced.
  • RollingReduction: ORD = F(IRD, RRI) = RRI (+) IRD, where RRI is a rolling (+) reduction of all previous RecordIDs. [Breaks order insensitive computation assumption]
  • Window: ORD = F({Record.IRD | Record \in Window Pane}) = (+) {IRD | Record \in Window Pane}
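The rules above can be sketched as follows, using XOR as the (+) operation and a simple truncated hash as H (both choices, and all function names, are assumptions for this illustration):

```python
def H(x):
    """Illustrative hash, truncated to 32 bits."""
    return hash(x) & 0xFFFFFFFF

def map_ord(ird):
    # Map: ORD = IRD (one output per input, identity lineage)
    return ird

def flatmap_ord(ird, count):
    # FlatMap: ORD = H(IRD + count), where count is the number
    # of records the FlatMap has produced so far
    return H(ird + count)

def rolling_ord(rri, ird):
    # RollingReduction: ORD = RRI (+) IRD, where RRI is the rolling
    # reduction of all previous RecordIDs
    return rri ^ ird

def window_ord(irds):
    # Window: ORD = (+) over all IRDs of records in the window pane
    out = 0
    for ird in irds:
        out ^= ird
    return out

# Because XOR is commutative and associative, the window ORD is
# insensitive to the order in which the pane's records arrived:
assert window_ord([1, 2, 3]) == window_ord([3, 1, 2])
```

Note that the rolling reduction folds every IRD into its running RRI, which is exactly why it is sensitive to arrival order, as discussed below.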

Issues

In this figure, a Rolling Sum operator is shown. It has an operator downstream from it, and two upstream operators, which are shown simply as queues. Each square in the queues is a record. Blue records have already affected the downstream causally (a corresponding output of Rolling Sum has been processed downstream).

On recovery from a failure of the "Rolling Sum" operator, the records from U_1 may all be replayed before the ones from U_2. Note that this is just one possibility; in fact, any interleaving may occur. In this particular example, the ID generated for input U_1^n will be the same as pre-failure and thus will be deduplicated. Records U_1^{n+1} and U_1^{n+2} are completely new and will not be deduplicated downstream, but they will also affect the reducer's current "reduction RecordID" (the ID it keeps and reduces new input RecordIDs into). Thus, when the records U_2^n and U_2^{n+1} (whose corresponding output records have already been processed by the downstream) are processed by the Rolling Sum, they will produce completely new RecordIDs and won't be deduplicated downstream.

  • What does this mean?
    • Records may be processed twice.
  • The approach doesn't work for stateful order-sensitive operators (it does work for window operators). Although this is listed in our assumptions, it seems to be a strong one, impacting stateful operators such as map and reduce.
  • What is the problem?
    • The order by which an operator will process elements.
  • When does the problem manifest?
    • When an operator has more than one input channel.
  • Why?
    • Although an operator is guaranteed to receive records from an input channel in the same order every single time (FIFO channel), there is no deterministic order that defines from which input channel records will arrive next (due to network effects, for instance). If the above-mentioned operator fails, replayed records may arrive at the failed operator's substitute in a different order, creating a different state and making it infeasible to deduplicate seen records downstream of the failure.

      Currently, we are deduplicating records downstream of a failure. For order-sensitive operators, I don't think this will work, because it doesn't capture the order in which an upstream operator processed records.

Approach 2: Lineage Record Counters

We will track the offset of each input stream that led to a record being produced.
Each record flowing from upstream operator U to operator O carries a vector clock C, which has an entry for every input stream of O, storing the latest record to causally affect this record. Thus, C[i] is the latest record of input stream i that affects the record.

Operators keep the latest vector clock processed for every input stream. During recovery, downstream operators only start processing when the records they receive are newer than the saved timestamp.

This can be visualized as a lattice starting at the point of replay, where every path represents a possible order in which records may be delivered. If a downstream operator has a current timestamp of {U_1^{m+2}, U_2^{n+1}} and replay is starting from offsets m and n, then downstream operators would start processing from the records in red:
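The replay filter can be sketched as follows, assuming each record carries a vector clock mapping input-stream id to the latest causally contributing offset (the function name and clock representation are illustrative):

```python
def is_new(record_clock, saved_clock):
    """Decide whether a replayed record should be processed downstream.

    record_clock: dict mapping input-stream id -> latest offset of that
                  stream that causally affects this record.
    saved_clock:  the latest vector clock the downstream operator saw
                  before the failure.

    A replayed record is processed only if some entry of its clock
    exceeds the corresponding saved entry, i.e. it carries causal
    information the downstream has not seen yet.
    """
    return any(record_clock[i] > saved_clock.get(i, -1) for i in record_clock)


saved = {"U1": 5, "U2": 3}                           # clock saved pre-failure
assert is_new({"U1": 5, "U2": 3}, saved) is False    # duplicate, skipped
assert is_new({"U1": 6, "U2": 3}, saved) is True     # genuinely new, processed
```

This is only the filtering step; as the next section shows, it is not sufficient on its own.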

Issues

Going back to the previous example, if again all the records from U_1 are replayed first, then the corresponding output records generated by input records U_1^{n+1} and U_1^{n+2} will not be processed at all by the downstream operators, as they don't yet take into account records U_2^n and U_2^{n+1}.

Again, this means that we aren't truly achieving exactly-once processing.

Approach 3: Lineage Input Channel and Output Record Counters

Causal Logging is better. You can skip this.

Let's consider again the rolling reduce example shown below that calculates a sum. Under normal operation, with each output record (the rolling sum) sent downstream, we also send the input channel of the input record(s) that produced that specific output record and the current counter of records output so far.

For instance, say U_1^n is processed first. We send downstream (U_1^n, 1, 1) where U_1^n is the current rolling sum, 1 is the input channel of the input record (U_1^n) that produced the current rolling sum, and 1 is the number of records output so far. To continue:

  • say the "Rolling Sum" operator processes U_2^n next. It will send downstream (U_1^n + U_2^n, 2, 2) where U_1^n + U_2^n is the current rolling sum, 2 is the input channel of the input record (U_2^n) that produced the current rolling sum, and 2 is the number of records output so far
  • say the "Rolling Sum" operator processes U_2^{n+1} next. It will send downstream (U_1^n + U_2^n + U2_{n+1}, 2, 3) where U_1^n + U_2^n + U_2{n+1} is the current rolling sum, 2 is the input channel of the input record (U_2^{n+1}) that produced the current rolling sum, and 3 is the number of records output so far.

Then consider that the "Rolling Sum" operator dies and that, on recovery, all records from U_1 are replayed before the ones from U_2.

  • The standby "Rolling sum" operator asks downstream to provide it with the input channel information that it keeps.

  • The standby "Rolling Sum" operator receives (1, 1), (2, 2), (2, 3).

  • The standby "Rolling Sum" operator infers that the first record to be processed should come from channel 1, the second from channel 2, and the third from channel 2.

  • The "Rolling Sum" operator starts processing records from its two input channels.

  • First, it receives three records in a row from input channel 1.

  • Since the first record should come from input channel 1, the "Rolling Sum" operator processes the first record, but doesn't send the output downstream.

  • Then it buffers the two next records from input channel 1.

  • Then, it receives three records from input channel 2. Since the next two records should come from input channel 2, it processes the next two records, but doesn't send the output downstream.

    This is indeed the actual order in which the standby "Rolling Sum" operator's predecessor had processed records, and this order produces a successful replay that rebuilds the correct state and achieves exactly-once processing. For the rest of the records, since no ordering has been seen so far, any ordering between the records of the two input channels works.

The approach involves a lineage counter of records coming from each input channel and a counter of output records.

Note that the approach generalises to multiple downstream operators because the order of records appearing in each input channel is fixed by the guarantee of FIFO channels and we can reconstruct the order of records sent to multiple downstream operators by using the output record counter.
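The replay procedure above can be sketched as follows, assuming the standby has already retrieved the (input channel, output counter) log from downstream (all names are illustrative):

```python
def replay(order_log, channels):
    """Replay records in the pre-failure processing order.

    order_log: list of input-channel ids, sorted by output counter, as
               recovered from the downstream operator.
    channels:  dict mapping channel id -> list of replayed records,
               in FIFO order per channel.

    Returns (silent, remaining): records processed without re-emitting
    output (the downstream already saw it), and records left to process
    in any order.
    """
    cursors = {ch: 0 for ch in channels}
    silent = []
    for ch in order_log:                   # enforce the logged order,
        rec = channels[ch][cursors[ch]]    # buffering other channels
        cursors[ch] += 1
        silent.append(rec)                 # output suppressed: already seen
    remaining = []
    for ch, recs in channels.items():      # beyond the log, any order works
        remaining.extend(recs[cursors[ch]:])
    return silent, remaining


channels = {1: ["U1_n", "U1_n1", "U1_n2"], 2: ["U2_n", "U2_n1", "U2_n2"]}
silent, remaining = replay([1, 2, 2], channels)
assert silent == ["U1_n", "U2_n", "U2_n1"]
assert set(remaining) == {"U1_n1", "U1_n2", "U2_n2"}
```

Even if channel 1's records all arrive first, the log [1, 2, 2] forces the standby to buffer them and wait for channel 2, reproducing the predecessor's state exactly.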

Approach 4: Causal Logging

As we have seen in other approaches, the main issue tends to be that the recovering operator must be able to reach an equivalent state as before failure.

Operator functions are more or less sensitive to the order in which records are processed. Some operators are completely insensitive to order (e.g. stateless operators, windows), while others can reach the same state as pre-failure by processing the same set of messages, even if in a different order (e.g. rolling sum); we will call these convergent-capable. Finally, there is a large set of operator functions that are highly sensitive to processing order and may diverge in state if even one message is processed in a different order (e.g. most UDFs).

Operator types:

  • Order insensitive: stateless operators, joins with no timeout, event-time windows
  • Order sensitive:
    • Convergent-capable: rolling aggregates
    • Divergent: Operators with timeouts, service-like operators (causal links between records)

This hierarchy of operator function types is very similar to Stonebraker's thinking in his high-availability work, shown in the figure below.

In the figure below, the allowed recovery paths for each of these types are shown. The green path is the path taken before failure. The yellow paths are the ones allowed by convergent-capable operators. All paths are allowed by order-insensitive operators, and only the green path is allowed for divergent ones.

Causal logging helps to recover even these most sensitive operators, as for any operator O, it records downstream of O the order in which O receives messages. To support an arbitrary number of failures, this information must be propagated to all downstream operators. This information is recorded downstream by piggybacking the order information on normal records.

A quick video visualization of how this happens.

Operators must keep an order log for each upstream operator, clearing this information whenever a checkpoint completes. Each operator must also record, for each of these order logs, how far into the log downstream operators are, to avoid sending repeated information. In other words, whenever a message is sent to a downstream operator, only the deltas (the new order information) added to each log are sent.
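A sketch of this delta-based piggybacking, assuming one such log per upstream operator and a per-downstream cursor into it (the class and method names are illustrative):

```python
class DeterminantLog:
    """Order log for one upstream operator, with per-downstream cursors."""

    def __init__(self):
        self.log = []     # order determinants recorded since last checkpoint
        self.sent = {}    # downstream id -> index in the log already sent

    def append(self, determinant):
        # Called when a record is received from this upstream.
        self.log.append(determinant)

    def deltas_for(self, downstream_id):
        """New determinants to piggyback on the next record to this downstream."""
        start = self.sent.get(downstream_id, 0)
        delta = self.log[start:]
        self.sent[downstream_id] = len(self.log)
        return delta

    def on_checkpoint_complete(self):
        # The log is only needed to replay since the last checkpoint.
        self.log.clear()
        self.sent = {d: 0 for d in self.sent}


dlog = DeterminantLog()
dlog.append(1)
dlog.append(2)
assert dlog.deltas_for("D1") == [1, 2]      # first record carries both
dlog.append(2)
assert dlog.deltas_for("D1") == [2]         # only the new delta is piggybacked
assert dlog.deltas_for("D2") == [1, 2, 2]   # another downstream gets the full log
```

Each downstream thus accumulates the full order log without any determinant being transmitted to it twice.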

Recording the order information is important precisely because it is nondeterministic. To completely realize causal logging, all types of nondeterministic events should be recorded. Other kinds of nondeterministic events are the use of random number generators, the use of physical time, and the contents of a processing-time window at the moment it fires.

Example execution

Following this example again, say the pre-failure order is the one stated in the image: [U_1^n, U_2^n, U_2^{n+1}]. When the operator processes input record U_1^n, it produces an output record O1, to which it attaches the order determinant "1", which states that the corresponding input was received on input channel 1. Next, when processing U_2^n, it emits (O2, "2"), and when processing U_2^{n+1} it emits (O3, "2").

The downstream operator keeps the determinants in an ordered list [1, 2, 2], which it can send back to the recovering operator on request. Let's say the rolling sum fails. The recovering operator uses this determinant list to deliver records in the correct order: first one from channel 1, then two from channel 2.

Advantages

  • Exactly-once processing without restrictions
  • Get to keep timers and random partitioning without issues.

Disadvantages

  • One more communication step during recovery
  • Memory cost (each operator keeps a determinant log for each upstream) (Though probably each is < 2MB)