Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

docs: KLIP 12 - Implement High-Availability for Pull queries #4022

Merged
merged 10 commits into from Jan 13, 2020

Conversation

vpapavas
Copy link
Member

@vpapavas vpapavas commented Dec 3, 2019

KLIP 12 - Implement High-Availability for Pull queries

Author: Vicky Papavasileiou, Vinoth Chandar |
Release Target: 5.5 |
Status: In Discussion
Discussion: link to the design discussion PR

tl;dr: Enables high-availability for Ksql pull queries, in case of server failures. Current
design for handling failure of an active ksql server, incurs an unavailability period of several
seconds (10+) to few minutes due to the Streams rebalancing procedure. This work is based on new
Kafka Streams APIs, introduced as a part of KIP-535,
which allow serving stale data from standby replicas to achieve high availability, while providing
eventual consistency.

Motivation and background

Stateful persistent queries persist their state in state stores. These state stores are partitioned
and replicated across a number of standbys for faster recovery in case of
failures.
A KSQL server may host partitions from multiple state stores serving as the active host for some partitions
and the standby for others. Without loss of generality, we will focus on one partition and one state
store for the remainder of this document. The active KSQL server is the server that hosts the
active partition whereas the standby servers are the ones that host the replicas.

Assume we have a load balancer (LB) and a cluster of three KSQL servers (A, B, C) where A
is the active server and B, C are the standby replicas. A receives updates from the source topic
partition and writes these updates to its state store and changelog topic. The changelog topic is
replicated into the state stores of servers B and C.

Now assume LB receives a pull query (1) and sends the query to B (2). B determines that A
is the active server and forwards the request to A (3). A executes the query successfully and
returns the response.

Failure scenario: Assume that A goes down. B receives the request, tries to forward it to A
and fails. What to do now? The current implementation tries to forward the request to A
(in a busy loop) for a configurable timeout KSQL_QUERY_PULL_ROUTING_TIMEOUT_MS . If it has not
succeeded in this timeout, the request fails. The next request B receives, it will again try to
forward it to A, since A is still the active, and it will fail again. This will happen until
rebalancing completes and a new active is elected for the partition.

There are two steps in the rebalancing procedure that are expensive:

  1. One of B or C is picked as the new active, this takes ~10 seconds based on default configs.
  2. Once a new active is chosen, depending on how far behind (lag) it is with respect to the changelog,
    it may take few seconds or even minutes before the standby has fully caught up to the last committed offset.

In total, it takes >>10 seconds before a query can start succeeding again. What can we do until a
new active is ready to serve requests?
KIP-535
has laid the groundwork to allow us to serve requests from standbys, even if they are still in
rebalancing state or have not caught up to the last committed offset of the active.

Every KSQL server (active or standby) now has the information of:

  1. Local current offset position per partition: The current offset position of a partition of the changelog topic that has been successfully written into the state store
  2. Local end offset position per partition: The last offset written to a partition of the changelog topic
  3. Routing Metadata for a key: The partition, active host and standy hosts per key

This information allows each server (with some communication, discussed below) to compute the global
lag information of every topic partition whether it is the standby or active. This enables us to
implement a policy where B having established that A is down (after trying to send a request
to A and seeing it failed), to decide whether it (B) will serve it or C. This effectively
means that there will be little to no down time in serving requests at the cost of consistency as
B or C may serve stale (not caught up) data. Hence, we achieve high availability at the cost of
consistency. Eventual consistency for the win!

What is in scope

Ensure availability for pull queries when KSQL servers fail (with ksql.streams.num.standby.replicas=N,
we can tolerate upto N such failures)

What is not in scope

  • Address unavailability caused by cold starts of KSQL server i.e when a new server is added to a
    KSQL cluster, it must first rebuild its state off the changelog topics and that process still could
    take a long time.
  • Try to improve consistency guarantees provided by KSQL i.e reduce the amount of time it takes to
    rebalance or standby replication to catch up.

Value/Return

The cluster of KSQl server will be able to serve requests even when the active is down. This mitigates
a large current gap in deploying KSQL pull queries for mission critical use-cases, by significantly
reducing failure rate of pull queries during server failures.

Public APIS

Add configuration parameter to the WITH clause of CTAS statements to specify acceptable.offset.lag. Also, add property to the json request to specify the above property.

Design

The problem at hand is how to determine if a server is down and where to route a request when it is
down. Every KSQL server must have available the information of what other KSQL servers exist in the
cluster, their status and their lag per topic partition. There are three components to this:
Healthchecking, Lag reporting and Lag-aware routing.

Failure detection/Healthcheck

Failure detection/ healthcheck is implemented via a periodic hearbeat. KSQL servers either broadcast
their heartbeat (N^2 interconnections with N KSQL servers) or we implement a gossip protocol. In
the initial implementation, we will use the REST API to send heartbeats leveraging the N^2 mesh that
already exists between KSQL servers. Hence, we will implement a new REST endpoint, /heartbeat that
will register the heartbeats a server receives from other servers.

Cluster membership is determined using the information provided from the
Kafka Streams
instance and specifically StreamsMetadata
from which we can obtain the information of the host and port of the instance. A server periodically
polls for all currently running KS instances (local and remote) and updates the list of remote servers
it has seen. If no KS streams are currently running, cluster membership is not performed. Moreover,
this policy depends on the current design where a new KSQL server replays every command in the
command topic and hence runs every query.

We want to guarantee 99% uptime. This means that in a 5 minute window, it must take no longer than
2 seconds to determine the active server is down and to forward the request to one of the standbys.
The heartbeats must be light-weight so that we can send multiple heartbeats per second which will
provide us with more datapoints required to implement a well-informed policy for determining when a
server is up and when down.

Configuration parameters:

  1. Heartbeat INTERVAL (e.g send heartbeat every 200 ms)
  2. Heartbeat WINDOW (e.g every 2 seconds, count the missed/received heartbeats and determine if server is down/up)
  3. MISSED_THRESHOLD: How many heartbeats in a row constitute a node as down (e.g. 3 missed heartbeats = server is down)
  4. RECEIVED_THRESHOLD: How many heartbeats in a row constitute a node as up (e.g. 2 received heartbeats = server is up)

We will provide sane defaults out-of-box, to achieve a 99% uptime.

Pseudocode for REST endpoint for /heartbeat:

Map<KsqlServer, List<Long>> receivedHeartbeats;

@POST
public Response receiveHeartbeat(Request heartbeat) 
  return Response.ok(processRequest()).build();
}

private HeartbeatResponse processRequest() {
  KsqlServer server = request.getServer();
  receivedHeartbeats.get(server).add(request.getTimestamp());
}

Additionaly, we will implement a REST endpoint /clusterStatus that provides the current status of the cluster
i.e. which servers are up and which are down (from the viewpoint of the server that receives the
request).

Pseudocode for REST endpoint for /clusterStatus:

Map<KsqlServer, Boolean> hostStatus;

@GET
public Response checkClusterStatus(Request heartbeat) 
  return Response.ok(processRequest()).build();
}

private ClusterStatusResponse processRequest() {
  return hostStatus;
}

Lag reporting

Every server neeeds to periodically broadcast their local current offset and end offset positions. We will implement a new
REST endpoint /reportlag. The local offsets information at a server is obtained via
Map<String, Map<Integer, LagInfo>> localPositionsMap = kafkaStreams.allLocalStorePartitionLags();.

Pseudocode for REST endpoint for /reportlag:

Map<KsqlServer, Map<String, Map<Integer, LagInfo>>> globalPositionsMap;

@POST
public Response receiveLagInfo(Request lagMap) 
  return Response.ok(processRequest()).build();
}

private LagReportResponse processRequest() {
  KsqlServer server = request.getServer();
  Map<String, Map<Integer, LagInfo>>  localPositionsMap = request.getPositions();
  globalPositionsMap.put(server, localPositionsMap);
}

Lag-aware routing

Given the above information (alive + offsets), how does a KSQL server decide to which standby to route
the request? Every server knows the offsets (current and end) of all partitions hosted by all servers in the cluster.
Given this information, a server can compute the lag of itself and others by determining the maximum end offset
per partition as reported by all server and subtract from it their current offset.
This allows us to implement lag-aware routing where server B can a) determine that server A is
down and b) decide whether it will serve the request itself or forward it to C depending on who
has the smallest lag for the given key in the pull query.

Pseudocode for routing:

// Map populated periodically through heartbeat information
Map<KsqlNode, Boolean> aliveNodes;

// Map populated periodically through lag reporting
// KsqlServer to store name to partition to lag information
Map<KsqlServer, Map<String, Map<Integer, LagInfo>>> globalPositionsMap;
 
// Fetch the metadata related to the key
KeyQueryMetadata queryMetadata = queryMetadataForKey(store, key, serializer);
 
// Acceptable lag for the query
final long acceptableOffsetLag = 10000;

// Max end offset position
Map<String, Map<Integer, Long>> maxEndOffsetPerStorePerPartition;
for (KsqlServer server:  globalPositionsMap) {
    for (String store: globalPositionsMap.get(server)) {
        for (Integer partition: globalPositionsMap.get(server).get(store)) {
            long offset = globalPositionsMap.get(server).get(store).get(partition);
            maxEndOffsetPerStorePerPartition.computeIfAbsent(store, Map::new);
            maxEndOffsetPerStorePerPartition.get(store).computeIfAbsent(partition, Map::new);
            maxEndOffsetPerStorePerPartition.get(store).putIfAbsent(partition, -1);
            long currentMax = maxEndOffsetPerStore.get(store).get(partition);
            maxEndOffsetPerStorePerPartition.get(store).put(partition, Math.max(offset, currentMax);       
        }
    }
}

// Ordered list of servers, in order of most caught-up to least
List<KsqlNode> nodesToQuery;
KsqlNode active = queryMetadata.getActiveHost();
if (aliveNodes.get(active)) {
  nodesToQuery.add(active)  
}

// add standbys
nodesToQuery.addAll(queryMetadata.getStandbyHosts().stream()
    // filter out all the standbys that are down
    .filter(standbyHost -> aliveNodes.get(standByHost) != null)
    // get the lag at each standby host for the key's store partition
    .map(standbyHost -> new Pair(standbyHostInfo,
        maxEndOffsetPerStorePerPartition.get(storeName).get(queryMetadata.partition()) - 
        globalPositionsMap.get(standbyHost).get(storeName).get(queryMetadata.partition()).currentOffsetPosition))
    // Sort by offset lag, i.e smallest lag first
    .sorted(Comaparator.comparing(Pair::getRight())
    .filter(standbyHostLagPair -> standbyHostLagPair.getRight() < acceptableOffsetLag)
    .map(standbyHostLagPair -> standbyHostLagPair.getLeft())
    .collect(Collectors.toList()));
 
// Query available nodes, starting from active if up
List<Row> result = null;
for (KsqlNode server : nodesToQuery) {
  try {
    result = query(store, key, server);
  } catch (Exception e) {
    System.err.println("Querying server %s failed", server);
  }
  if (result != null) {
    break;
  }
}

if (result == null) {
  throw new Exception("Unable to serve request. All nodes are down or too far behind");
}

return result;

We will also introduce a per-query configuration parameter acceptable.offset.lag, that will provide
applications the ability to control how much stale data they are willing to tolerate on a per query
basis. If a standby lags behind by more than the tolerable limit, pull queries will fail.
This parameter can be configured either as part of the WITH clause of CTAS queries or be given as
arguments to the request's JSON payload. This is a very
useful knob to handle the scenario of cold start explained above. In such a case, a newly added KSQL
server could be lagging by a lot as it rebuilds the entire state and thus the usefulness of the data
returned by pull queries may diminish significantly.

Tradeoffs

  1. We decouple the failure detection mechanism from the lag reporting to make the heartbeats
    light-weight and achieve smaller heartbeat interval. This way, heartbeats can be sent at a higher
    interval than lag information (which is much larger in size). As our goal is to achieve high-availability,
    receiving less frequent lag updates is ok as this affects consistency and not availability.
  2. We decouple routing decision from healthchecking. The decision of where to route a query is local
    (i.e. does not require remote calls) as the information about the status of other servers is already
    there. This provides flexibility in changing the lag reporting mechanism down the line more easily.
  3. We choose to keep the initial design simple (no request based failure detection, gossip protocols)
    and closer to choices made in Kafka/Kafka Streams (no Zookeeper based failure detection), for ease
    of deployment and troubleshooting.

Rejected/Alternate approaches

Retrieve lag information on-demand

We employ a pull model where servers don't explicitly send their lag information. Rather, communication
happens only when needed, i.e. when a server tries to forward a request to another server. Once
server B has determined that server A is down, it needs to determine what other server should
evaluate the query. At this point,B doesn't have any knowledge of the lag information of the other
standbys. So, in addition to evaluating the query locally, B also sends the request to C. B
then has both its own result and C’s result of query evaluation and decides which one is the freshest
to include in the response. B can make this decision because the lag information is piggybacked
with the query evaluation result. The advantages of this approach is that it results in less
communication overhead: Lag information is exchanged only when the active is down. Moreover, it is
piggy-backed on the request. On the other side, all standbys need to evaluate the same query. Moreover,
the communication between B and C involves large messages as they contain the query result
(can be many rows). Finally, latency for a request increases as B needs to wait to receive a
response from C with query result and lag information. Then only can B send a response back to
the client.

More efficient lag propagation

Instead of broadcasting lag information, we could also build a gossip protocol to disseminate this
information, with more round trips but lower network bandwidth consumption. While we concede that
this is an effective and proven technique, debugging such protocols is hard in practice. So, we
decided to keep things simple, learn and iterate. Similarly, we could also encode lag information
in the regular pull query responses themselves, providing very upto-date lag estimates. However,
sending lag information for all local stores in every response will be prohibitively expensive and
we would need a more intelligent, selective propagation that only piggybacks a few stores's lag in
each response. We may pursue both of these approaches in the future, based on initial experience.

Failure detection based on actual request failures

In this proposal, we have argued for a separate health checking mechanism (i.e separate control plane),
while we could have used the pull query requests between servers themselves to gauge whether another
server is up or down. But, any scheme like that would require a fallback mechanism that periodically
probes other servers anyway to keep the availability information upto date. While we recognize that
such an approach could provide much quicker failure detection (and potentially higher number of
samples to base failure detection on) and less communication overhead, it also requires significant
tuning to handle transient application pauses or other corner cases.

We intend to use the simple heartbeat mechanism proposed here as a baseline implementation, that can
be extended to more advanced schemes like these down the line.

Test plan

We will do unit tests and integration tests with failure scenarios where we cover the cases:

  1. Request is forwarded to a standby.
  2. Request is forward to the most caught-up standby.
  3. Request fails if lag of standbys is more than acceptable lag configuration.

We will look into muckrake or cc-system-tests.

Documentation Updates

Need to add documentation about the configuration parameters regarding the failure detection policy,
acceptable lag, new endpoints.

Compatibility Implications

N/A

Performance Implications

Improve performance of pull queries in case of failure.

Security Implications

N/A

@vpapavas vpapavas requested a review from a team as a code owner December 3, 2019 02:11
@vinothchandar
Copy link
Contributor

@AlanConfluent could you review this design as well? @purplefox do you want to sign up for reviewing this? :)

@vinothchandar vinothchandar requested a review from a team December 6, 2019 16:57
@vinothchandar
Copy link
Contributor

@rodesai do you want to assign this review to yourself?

Copy link
Contributor

@agavra agavra left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is going to be a massive improvement for ksqlDB as a serving layer! Thanks for the the KLIP @vpapavas - some comments in line.

Copy link
Contributor

@big-andy-coates big-andy-coates left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks @vpapavas - great to see the NFRs being looked at :D

I think we need to gossip the OFFSET, not the LAG

As I've commented above, I don't think communicating lag is of any meaningful use as lag is relative to a moving target: the head offset. A more standard metric to communicate is simply the offset. Offset is exact. Offset can be used to determine lag. Lag can not be used to determine offset. Lag also requires each node to contact Kafka to retrieve the head offset, where as each knows its offset already.

I don't think LAG is meaningful for choosing which node to serve a request

Having a config to control acceptable lag is OK. But what does a lag of 1000 messages actually mean? It's not a meaningful value really. It's got no relation to wall-clock time, event-time, or the number of updates to a key.

I'm assuming we're using lag to 'control' how stale reads are. But I don't think it does so in useful way.

A better pattern might be to allow the caller to control what they are willing to accept as a response when doing a read...

Consider some use-cases:

  1. I want to read the most up-to-date data from KSQL, but don't care about previous reads.
  2. I want to read data from KSQL, but don't care if it's stale: I care about latency and throughput.
  3. I've just written some data to KSQL/Kafka and I want to know I'm reading a result that includes that data.
  4. I've just read some data from KSQL and want to ensure my next read is not reading stale data, i.e. data older than my last read.

The first two use-cases are achievable by including some options with the request to control who serves the request: any node that has the correct state, or only the most up to date?

The last two use-cases introduce read-after-write consistency constraints. To meet these we'd need to include partition-offset information in any read or write response. Clients can then pass this back to us if they require their next read to be at least at that offset.

It's important to note that providing this read-after-write consistency helps in the rebalance scenario, but also when reading from the active: the active node can be lagging too!.

I'm thinking...

We may choose to offer a C* style hardcoded set of read consistency levels, e.g. something like:

  • ANY: read from any node, regardless of lag. (cold nodes not included).
  • LATEST: read from the node with the most up-to-date value.
  • ACTIVE: only read from the active. (with user supplied timeout?)

Combining the above read consistency level with an optional partition-offset would allow users fine-grained control of their reads.

We might also choose to allow users to control what happens if their requirements aren't met, e.g do they want to wait/block, (for some user-supplied amount of time) or just fail?

This information, along with info of each state replicas offset, would be enough for a node to know which nodes meet these requirements and hence choose which, if any, node should serve the request.

This has benefits:

  1. it allows the caller to have finer grained control over read-after-write consistency for all reads, not just in a rebalance. This i s much more meaningful that some acceptable amount of lag.
  2. Allows users to control consistency and choose to trade scalability for consistency.
  3. Clients can even use the offsets they've received back from a writing to a source topic using a standard Kafka topic if they wanted. It's not just available from using KSQL directly.

Solution needs to work beyond KEY lookups

In the near future we'll want to support more scatter-gather style operations, e.g. select * from table. The design needs to be keep this in mind so that we're not making it harder to add such functionality.

@big-andy-coates big-andy-coates self-assigned this Dec 18, 2019
@vinothchandar
Copy link
Contributor

vinothchandar commented Dec 18, 2019

A more standard metric to communicate is simply the offset

We have had a similar discussion on the KIP. Without the endOffset the offsets don't mean much, as all standbys could be really behind and user may not want to be querying any of the standbys. So we need to fetch the endOffsets anyway. I am also mulling if we send both head, offset and be more future proof. I left a comment above on this.

Lag also requires each node to contact Kafka to retrieve the head offset, where as each knows its offset already.

Original assumption in KIP and this KLIP was that this endOffset information is already in memory in Streams. But we found it to be untrue last week. If we could obtain the endOffset locally from underlying KafkaConsumer, then we could address the freshness of the lag value.

It's not a meaningful value really. It's got no relation to wall-clock time, event-time, or the number of updates to a key.

We are aware of this. Doing something based on event-time is definitely more user-friendly, but is also larger scoped on Streams. Let's see what this would take .

  • Each active needs to report the timestamp of the last record it produced to changelog topic
  • Each standby needs to report the timestamp of the last record it consumed from the topic.
    From this we could compute a time lag (similar to replica.lag.time.max.ms in kafka), but picking a standby with the lowest "time lag" again has the same issues that you raised with offset lag. If an active and standby are apart by 10 seconds, it does not mean it will take 10 seconds to catch. It could be lower or higher, depending on how replication performs.

A holistic approach also needs to take into account : event-time based lag from the source topics or even higher upstream, late arriving records etc.. This is not in scope for this KLIP (I can clarify this). While most of the attention here has been on lag propagation, the primary value we are adding now is

  • Being even able to query standbys
  • Helping the user configure a large enough acceptable lag, that fences off querying from really laggy standbys or restoring actives.
    This value has been attested by a real streams IQ user, who also co-authored the KIP.

On cases 1,2:

I want to read the most up-to-date data from KSQL, but don't care about previous reads.

If this means, up-to-date with the endOffset of a changelog, we can handle this still in the current proposal, since even restoring actives will report a lag in that state, so we could send the query to one of the standbys.

I want to read data from KSQL, but don't care if it's stale: I care about latency and throughput.

I feel unbounded staleness is not very useful. This proposal is to introduce some large tolerable bound on staleness.. Otherwise, it should address this.

On cases 3,4, for providing different consistency levels, I would like for us to get the highly available, eventually-consistent story working, at the table level first, before heading down tunable knobs per query. Again, consistency levels seem like a good thing to pursue. but not in the scope of this KLIP.

@big-andy-coates
Copy link
Contributor

big-andy-coates commented Dec 19, 2019

I am also mulling if we send both head, offset and be more future proof

I'm not sure what use getting the head from each node would be. Do you have something in mind? To provide this information each node ask Kafka before reporting its offsets, so this is not free. It could be added later if needed, so I wouldn't include it initially unless there is a need.

I want to read data from KSQL, but don't care if it's stale: I care about latency and throughput.

I feel unbounded staleness is not very useful.

Well, that's your opinion. However, there are some very successful database out there that off exactly this. On a well provisioned servers, i.e. not maxed out, the standbys will be kept pretty much in-sync with the master. Hence being able to serve requests from any standby is a useful pattern for maximizing the read load the cluster can handle and minimising latency, at the cost of consistency. Though we could of course introduce something similar to Kafka's ISR sets if we wanted.

This proposal is to introduce some large tolerable bound on staleness..

The main proposal of making use of the new metadata in KS to provide more read availability is great. Something we should definitely do. (Though, as I say above, it needs to keep in mind non-KEY requests too). However, I strongly feel that adding a 'max acceptable lag' setting is meaningless. It's a knob that sounds useful until you try and use it in a real world situation, where you quickly realize there's not really any right value for it, it will always depend on what the system is doing at that point in time. It's a bit like the old config in streams that could be used to suppress intermediate results: it was never really fit for purpose. Consider a system which occasionally has large bursts of messages, e.g. when some batch processing finishes, or black friday shopping, etc. At these times lag spikes of messages are produced. If the user chooses a low max lag they'll lose availability during a rebalance during the bursts, so they have to set it high, but that can mean a struggling node outside of a burst can continue to serve stale reads for a long long time. Which does the user choose?

So all I'm saying is our efforts would be better spent on a more complete and useful solution that some 'max lag'.

I realize the setting is partly being used to avoid issues with cold ksql nodes starting up and being way behind. However, IMHO this is better fixed in some other way, e.g. not adding cold nodes to the available list until they're warm. (I believe KS already has some concept of when its warm).

Oh, and I had some more thoughts on the health check: at the moment queries can fail on a machine. So we could have the situation where one node isn't running the queries topology. A general 'which nodes are healthy in the cluster' approach might not cover such a situation as the node itself is healthy, it's just one query that's failed. Not sure of the best solution, just food for thought.

@vinothchandar
Copy link
Contributor

It could be added later if needed, so I wouldn't include it initially unless there is a need.

We cannot iterate that easily with Apache Kafka, this is a public API and there will be a KIP process for any change to this API.

To provide this information each node ask Kafka before reporting its offsets, so this is not free.

The Fetcher (under KafkaConsumer) already has the endOffsets, in fact this is how the KafkaConsumer lag is monitored. We could actually fetch this just out of memory, with some API changes (needs another KIP). At that point, the endOffsets will be pretty consistent and updated in real-time at each node and it can also broadcast its offset/head (endOffset) much more realtime, which will improve the scenario we discussed before.

However, there are some very successful database out there that off exactly this.

Yes. It's my opinion, but not just mine. Like I mentioned before, there was a KIP process in which this was discussed by many others also. Systems that I know of, that do "only" async replication e.g Apache HBase, don't allow you to query standbys until the lag is caught up. (Much like what we do, and also have similar poor rap for availability). Systems like C* do synchronous replication to keep the lag itself under control during writes. Right now, a standby can be lagging by hours or an active replica can be restoring and days behind in that state.. It would be really stretching our imagination to call the datastore available, in this state. Queries will see totally out-of-date/un-useful data.

(I believe KS already has some concept of when its warm
Not at the moment. With KIP-441, Streams will try to ensure an active is always the most "warm". Still, the decision of whether is acceptable or not to query is on ksql.

So all I'm saying is our efforts would be better spent on a more complete and useful solution that some 'max lag'.

Streams is getting the lag API regardless since another streams user wants it. We can add a config to control whether we use the lag to fail queries reading stale data or not. Give users the control and find out what's right. Before we expose the capability for a query to be failed if the computed results in the ksql table are more than X minutes stale, we need to engineer the system so that it can offer such guarantees most of the time. So, this is a more long term effort.

Even for any of the issues you mentioned before, being able to query standbys is a critical piece that needs to happen. I would like it to happen in the next release. I don't think we have hold up this entire efforts on this issue.

@big-andy-coates
Copy link
Contributor

big-andy-coates commented Dec 20, 2019

Even for any of the issues you mentioned before, being able to query standbys is a critical piece that needs to happen. I would like it to happen in the next release. I don't think we have hold up this entire efforts on this issue.

Totally. As I said above I think querying standbys is a great step forward. I'm not trying to block anything. I'm not saying we shouldn't do the standby reads unless we have read-after-write consistency. I'm merely pointing out that 'max lag' setting has limited real world application and we should look at implementing something more useful soon.

Copy link
Contributor

@apurvam apurvam left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks @vpapavas , this looks good to me. I think this is a good first step toward getting HA for pull queries. I left a few questions, but it looks good to me overall.

@apurvam apurvam added the design-proposal Tag KLIP Prs with this label label Jan 2, 2020
@vinothchandar
Copy link
Contributor

I'm merely pointing out that 'max lag' setting has limited real world application and we should look at implementing something more useful soon.

@big-andy-coates , then we are both in agreement. The gap I think is : In first phase ,I want to first expose querying standbys in some form, iron out wrinkles and harden these new components we are building. The usefulness of 'max lag' (we can agree to disagree here, we anyway provide a knob to the user) will be merely to ensure pull queries don't query totally incorrect/out-of-date state.

In the next phase (another KLIP), we can explore how to build some end-end guarantees of data staleness i.e guarantees between state and source/input topics, by considering all cases holistically.. We may need additional APIs from KafkaStreams on how caught up each operator is w.r.t source topic data, without having to compute it ourselves out-of-band. For example, we could have a query filter out all data for the past hour and if we simply measure based on records we see in the final state store/aggregation operator, we may infer this as staleness, while in-fact we are caught up. Also, we need to understand end-end what sort of processing latency, Streams can provide for different types of topologies. Without this, we cannot guide the users on setting tighter bounds on staleness anyway. We are only beginning to understand these things at the streams side.

Hope this sequencing makes sense to you. @vpapavas could you please resolve existing conversations and update the KLIP based on the discussions.. Hopefully, we can have a final round of comments and get this landed in the next day or so..

@vinothchandar vinothchandar added this to the 0.7.0 milestone Jan 6, 2020
@vinothchandar vinothchandar added the P0 Denotes must-have for a given milestone label Jan 6, 2020
Copy link
Contributor

@big-andy-coates big-andy-coates left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks @vpapavas / @vinothchandar

This is going to be a big step forward.

I really hope there is time to implement features that allow the caller to specify acceptable lag, rather than just a config. But this can always come later.

I'm concerned that having our own group-membership protocol is non-trivial and could be a big drain on resources to get this right and later enhance to make more configurable. I've suggested an alternative below. Likely you've already considered and rejected it, but thought I'd put it out there as it doesn't match any rejected alternatives.

Look forward to seeing HA pull queries!

design-proposals/klip-12-pull-high-availability.md Outdated Show resolved Hide resolved
@vpapavas vpapavas merged commit af8498e into confluentinc:master Jan 13, 2020
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
design-proposal Tag KLIP Prs with this label P0 Denotes must-have for a given milestone
Projects
None yet
Development

Successfully merging this pull request may close these issues.

None yet

9 participants