
Ability to "revive" archived series #1976

Closed
shanson7 opened this issue May 12, 2021 · 15 comments · Fixed by #1992
Closed

Ability to "revive" archived series #1976

shanson7 opened this issue May 12, 2021 · 15 comments · Fixed by #1992
Labels

Comments

@shanson7
Collaborator

Is your feature request related to a problem? Please describe.

We use mt-index-prune to keep the metric_idx table lean for speedy start up. It would be nice to have a tool that could be run to "revive" series by partition/pattern.

Describe the solution you'd like

This would be something that could run in the background similar to mt-index-prune itself. Reviving a series would likely entail putting that series back into the metric_idx table and somehow triggering a re-index operation in metrictank (to avoid needing a rolling restart).

Describe alternatives you've considered

On several occasions we've needed to restore archived series; currently we use a script that queries cassandra and publishes fake data. Making this a "feature" of metrictank would clean this process up and simplify it a little.

@Dieterbe
Contributor

Dieterbe commented May 12, 2021

Reviving a series would likely entail putting that series back into the metric_idx table and somehow triggering a re-index operation in metrictank (to avoid needing a rolling restart).

you'd also need to increment the LastUpdate field, otherwise it would still be considered stale and subsequently

  1. pruned out of the in-memory index at the next pruning run
  2. skipped at index load time
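
To make that concrete, here is a minimal, hedged sketch of what such a revival write could look like. The table layout and column names are simplified assumptions for illustration, not metrictank's actual metric_idx schema; the point is only that the row goes back in with a fresh lastupdate.

    package revive

    import (
        "time"

        "github.com/gocql/gocql"
    )

    // reviveDef puts an archived definition back into the live index table with
    // a fresh lastupdate, so it is neither pruned at the next pruning run nor
    // skipped at index load time. Table and column names are illustrative only.
    func reviveDef(sess *gocql.Session, id string, partition int32, name string) error {
        return sess.Query(
            `INSERT INTO metric_idx (id, partition, name, lastupdate) VALUES (?, ?, ?, ?)`,
            id, partition, name, time.Now().Unix(),
        ).Exec()
    }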

A big challenge here is the re-indexing. Trying to resync a live index with what's in the persistent index seems problematic.
I think the best way to go about this is to have a tool that:

  1. turns the query pattern/requested partition/whatever into a list of metricdefinitions
  2. submits special messages into kafka that say "please re-add this metricdefinition", similar to the current ingestion path (which AddOrUpdates into the index, sets LastUpdate, etc.), but minus the part where we actually add any data to mdata.AggMetrics

(a similar new "message type" has come up before. Not sure if we documented this anywhere, but we were discussing at one point how a metric delete api call is undone when an instance restarts and consumes data for that metric that was submitted before the delete call was triggered. If we were to do deletes via kafka messages, it would make sure the instances always correctly execute them, even if they were temporarily down at the time of the rest api call, or had to restart.)

@shanson7
Collaborator Author

Trying to resync a live index with what's in the persistent index seems problematic.

Is it though? IMO long write locks are the only danger. If we limit the scope to just "add missing" (i.e. not a full diff), we can call Get(mkey) (just a read lock) and, if there is a result in the index, move on. For definitions that need to be added, we have 2 options:

  1. call Load(defs) in reasonable (configurable?) size batches
  2. call AddOrUpdate for each def and it would be similar to loading them in from Kafka.
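
A rough sketch of option 1, assuming a stand-in Index interface (the real memory index signatures differ), just to show the batched, read-lock-first flow:

    package addmissing

    // MetricDefinition is a stand-in for schema.MetricDefinition.
    type MetricDefinition struct {
        Id         string
        Name       string
        LastUpdate int64
    }

    // Index is the minimal surface this approach needs: Get takes only a read
    // lock, Load takes a write lock for the duration of one batch.
    type Index interface {
        Get(id string) (MetricDefinition, bool)
        Load(defs []MetricDefinition) int
    }

    // addMissing re-adds only the definitions absent from the live index,
    // loading them in batches of batchSize to keep write-lock hold times short.
    func addMissing(ix Index, defs []MetricDefinition, batchSize int) {
        pending := make([]MetricDefinition, 0, batchSize)
        for _, def := range defs {
            if _, ok := ix.Get(def.Id); ok {
                continue // already in the index, nothing to do
            }
            pending = append(pending, def)
            if len(pending) == batchSize {
                ix.Load(pending)
                pending = make([]MetricDefinition, 0, batchSize)
            }
        }
        if len(pending) > 0 {
            ix.Load(pending)
        }
    }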

@shanson7
Collaborator Author

This is on my near/mid-term roadmap, so I can take the implementation in a month or two if we settle on the details.

@Dieterbe
Contributor

Dieterbe commented May 19, 2021

Maybe I'm missing something, but...

If we were to do this via a tool that hits metrictank's api endpoint, then I envision the tool would first add the entries to the index table and then do RPC calls to all metrictank shards to add those entries to their index. But if an instance is starting up it may miss the addition to the index table (e.g. if it just finished loading that partition but is still loading other partitions). Trying to do an RPC call against such an instance to add index entries may be problematic because, at least currently, all index methods are only accepted once the instance is ready (loaded all index partitions and replayed kafka, amongst others). Technically, the new rpc method could bypass that restriction, but that seems like a hacky custom code path that goes against the current design, so I would rather avoid it. Also, if you hit the index while it's initializing, you compete with very long write locks (CasIdx.rebuildIndex() calls UnpartitionedMemoryIdx.Load for entire partitions at once; these could be split up, but I'd rather not).

In other words, this would only be guaranteed to work properly if the cluster is fully healthy, i.e. if all replicas of all shards are up. Generally, the cluster should be functional even if it's degraded (i.e. if for each shard at least a single replica is up). At least that is true for ingestion and querying; I would want it to be true for this "reviving" functionality as well.

So the alternative I propose is to extend https://github.com/grafana/metrictank/blob/master/schema/msg/format.go
we'd add a message format for MetricData revivals. The Handler interface would get a new method to process these and simply hand them off to the index. This way we make sure all instances process the "rpc call" (which is transported through a kafka message). As mentioned earlier, this approach also makes sense as a better way to do deletes.
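
A hedged sketch of what that extension could look like. None of the names below exist in metrictank today; they only illustrate the shape of a "revive" message type and the extra Handler method, mirroring the role of schema/msg/format.go:

    // Package revivemsg sketches the proposed kafka message type for index
    // revivals. All names and values are illustrative assumptions.
    package revivemsg

    // Format identifies the payload encoding of a kafka message, mirroring the
    // role of the Format constants in schema/msg/format.go.
    type Format uint8

    // FormatMetricDefinitionRevive would mean "please re-add this
    // metricdefinition to the index"; the value chosen here is arbitrary.
    const FormatMetricDefinitionRevive Format = 100

    // MetricData is a stand-in for schema.MetricData.
    type MetricData struct {
        Id       string
        Name     string
        Interval int
        Tags     []string
        Time     int64
    }

    // Handler is a stand-in for the input handler interface. Revive is the
    // proposed new method: it AddOrUpdates the definition into the index and
    // bumps LastUpdate, but never creates any mdata.AggMetrics state.
    type Handler interface {
        ProcessMetricData(md *MetricData, partition int32)
        Revive(md *MetricData, partition int32)
    }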

@shanson7
Collaborator Author

But if an instance is starting up it may miss the addition to the index table

Ah, yes, that is a corner case. As you allude to later this is an existing case for deletes. In fact, I believe the delete requests currently fail if the entire cluster isn't healthy.

This way we make sure all instances process the "rpc call" (which is transported through a kafka message). As mentioned earlier, this approach also makes sense as a better way to do deletes.

This would make deletes asynchronous in the request path? Which instance is responsible for actually deleting/archiving the record from cassandra?

So long as the client doesn't need to know about kafka, partitions, how to format the mkey, etc. I think that this is reasonable.

@Dieterbe
Contributor

Dieterbe commented May 20, 2021

This would make deletes asynchronous in the request path?

True, but it's still better than executing incorrectly.

Which instance is responsible for actually deleting/archiving the record from cassandra?

The answer to this doesn't really change whether the delete comes in via REST or via kafka.
I note that currently any request to "/metrics/delete" results in MetricIndex.Delete being called on all peers.
Whether this affects deletes in cassandra is governed by this flag:

        casIdx.BoolVar(&CliConfig.updateCassIdx, "update-cassandra-index", CliConfig.updateCassIdx, "synchronize index changes to cassandra. not all your nodes need to do this.")

It's probably harmless for multiple replicas of the same shard to execute this query redundantly, but only 1 replica per shard needs to do it. That's also the recommended setup (write pods have update-cassandra-index set to true, and the rest have it disabled).
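
For illustration, that split could look roughly like this in the two ini configs, assuming the standard cassandra-idx section and the key registered by the flag above:

    # write pods: one replica set per shard persists index changes
    [cassandra-idx]
    enabled = true
    update-cassandra-index = true

    # read/query pods: serve the index from memory, don't write back
    [cassandra-idx]
    enabled = true
    update-cassandra-index = false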

So long as the client doesn't need to know about kafka, partitions, how to format the mkey, etc. I think that this is reasonable.

For revival:
Well, somebody/something will need to do it. At first glance, mt-gateway seems like an appropriate place, as it already takes in data ingestion over REST and publishes the messages to kafka. But would we want to add querying the index's archive tables to its responsibilities? Maybe, or maybe this could be a separate microservice altogether. I don't have very strong opinions on who should do the rest->kafka conversion, as long as it's a stateless service. I wouldn't want to add this to any metrictank instance because they already have plenty of responsibilities.

For deletion: the user submits a query over rest, and the query gets published to kafka (whereas revival publishes MetricDatas to be re-ingested, deletions are simply the query pattern). MT peers consume the query, execute it against the live index, and update the cassandra index accordingly (as described above).

@shanson7
Collaborator Author

That's also the recommended setup (write pods have update-cassandra-index set to true, and the rest have it disabled).

Funnily enough, that is how we used to have it configured, but we had to set read pods to true because our write pods are a completely different cluster (so they don't get the delete request at all).

For revival:
Well, somebody/something will need to do it. At first glance, mt-gateway

We don't use mt-gateway. I imagine that the revival tool could have the "smarts" to do this. Since it's already crawling the archive index table, it has the partition and id. Really, it just needs the kafka broker/topic information.
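
A hedged sketch of what such a standalone tool could look like: take the rows returned by a query against the archive table (the query itself is elided), and publish one revive message per definition to the partition recorded in the archive table. The topic name, message encoding, and ArchivedDef shape are assumptions, and a real implementation would use the proper wire format rather than JSON:

    package main

    import (
        "encoding/json"
        "log"

        "github.com/Shopify/sarama"
    )

    // ArchivedDef is a stand-in for a row read from the archive index table;
    // the real table carries a full metric definition.
    type ArchivedDef struct {
        Id        string `json:"id"`
        Name      string `json:"name"`
        Partition int32  `json:"partition"`
    }

    func main() {
        cfg := sarama.NewConfig()
        cfg.Producer.Return.Successes = true
        // the archive table already tells us the partition, so partition manually
        cfg.Producer.Partitioner = sarama.NewManualPartitioner

        producer, err := sarama.NewSyncProducer([]string{"kafka:9092"}, cfg)
        if err != nil {
            log.Fatal(err)
        }
        defer producer.Close()

        // defs would come from crawling the archive index table by pattern/tags.
        defs := []ArchivedDef{{Id: "1.0123456789abcdef0123456789abcdef", Name: "some.metric", Partition: 3}}

        for _, def := range defs {
            payload, _ := json.Marshal(def)
            _, _, err := producer.SendMessage(&sarama.ProducerMessage{
                Topic:     "mdm", // assumed topic name
                Partition: def.Partition,
                Value:     sarama.ByteEncoder(payload),
            })
            if err != nil {
                log.Printf("failed to publish revive message for %s: %v", def.Id, err)
            }
        }
    }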

For deletion: user submits query over rest. query gets published to kafka

Hmm, I thought that there would be a message per series (similar to how kafka-mdm-in works today). Putting the query in opens the window to differing behaviors. For example, to save on memory we prune more aggressively in our write instances than we do in the read instances. That means the write instances might not have the same "view" of the index.

@Dieterbe
Contributor

Dieterbe commented May 20, 2021

Hmm, I thought that there would be a message per series (similar to how kafka-mdm-in works today). Putting the query in opens the window to differing behaviors.

Somehow you still need to execute the query though; who will it be if not your metrictank cluster?

I think the proposal works for both the standard (same pruning rules on all instances, update-cassandra-index only enabled on write pods), as well as your custom deployment:

  1. on standard deployments, the same change will be made in all indexes (as they have the same pruning rules), and only the write nodes need to make the changes to cassandra
  2. on your custom deployment, you have update-cassandra-index enabled on your read pods, so they will execute the query in the most complete interpretation (meaning more data due to less aggressive pruning) and make the changes to cassandra as desired

IOW perhaps the real requirement is, "whoever has the least aggressive pruning (and thus has the most complete view of the index) is the one who should update-cassandra-index". That's really what it comes down to, and both the standard and your deployment satisfy this.

@shanson7
Collaborator Author

Somehow you still need to execute the query though, who will it be if not your metrictank cluster?

Right...but my thought would be to process the query synchronously at least. Existing endpoints will likely need this anyway (to return the count of deleted series). Send the request to peers and either:

A) collect matching definitions to one node and produce the kafka messages
B) the peers would produce the messages (possibly with duplicates)

This means that so long as any replica is healthy for a shard group the message gets produced and can be processed later by unhealthy instances when they catch up.

IOW perhaps the real requirement is, "whoever has the least aggressive pruning (and thus has the most complete view of the index), is the one who should update-cassandra-index"

This does introduce write amplification since read nodes are generally run with replicas (we set the update-interval to 100 days in read nodes to prevent numerous updates). Maybe the cassandra index just needs configurations that differentiate updates and deletes. That would likely address both concerns.

@Dieterbe
Contributor

Dieterbe commented Jun 3, 2021

Existing endpoints will likely need this anyway (to return the count of deleted series).

The only thing metrictank needs, at a minimum, is compatibility with Graphite, which has no api to delete metrics
(it does have an api to delete from tagdb but it doesn't seem to return counts)
That said, I agree it's a nice feature that metrictank's delete api can display the count. But I would like to keep production of kafka messages out of the scope of read and query nodes. (Note that writers currently already write "metricpersist" messages to kafka.)

So it sounds like what we need is an api server that can receive:

  1. delete queries, which it executes against the configured url (typically pool of query nodes)
  2. a list of partition/MKey pairs for reviving.

In both cases it gets a list of MKeys, which can be published to kafka to the proper partitions for consumption by the right metrictank shards.

Something like that? (I know deletes are out of scope for this issue, but I find it useful to mention them here as there seems to be some common ground.)
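
A hedged skeleton of that api server idea: two HTTP endpoints that both reduce their input to (partition, MKey) pairs and hand them to a single kafka publish path. The endpoint paths, payload shapes, and helper functions are assumptions for illustration; query resolution and the actual kafka producer are stubbed out:

    package main

    import (
        "encoding/json"
        "log"
        "net/http"
    )

    // Entry is a (partition, MKey) pair destined for kafka.
    type Entry struct {
        Partition int32  `json:"partition"`
        MKey      string `json:"mkey"`
    }

    // publish would encode each entry as the appropriate kafka message
    // (delete or revive) and produce it to the entry's partition. Stubbed here.
    func publish(kind string, entries []Entry) error {
        log.Printf("would publish %d %s messages", len(entries), kind)
        return nil
    }

    // resolveQuery would execute the pattern against the configured query nodes
    // and return the matching (partition, MKey) pairs. Stubbed here.
    func resolveQuery(query string) []Entry { return nil }

    // handleDelete accepts a query pattern and publishes delete messages for
    // every matching series.
    func handleDelete(w http.ResponseWriter, r *http.Request) {
        var req struct {
            Query string `json:"query"`
        }
        if err := json.NewDecoder(r.Body).Decode(&req); err != nil {
            http.Error(w, err.Error(), http.StatusBadRequest)
            return
        }
        entries := resolveQuery(req.Query)
        if err := publish("delete", entries); err != nil {
            http.Error(w, err.Error(), http.StatusInternalServerError)
            return
        }
        json.NewEncoder(w).Encode(map[string]int{"deleted": len(entries)})
    }

    // handleRevive accepts an explicit list of partition/MKey pairs.
    func handleRevive(w http.ResponseWriter, r *http.Request) {
        var entries []Entry
        if err := json.NewDecoder(r.Body).Decode(&entries); err != nil {
            http.Error(w, err.Error(), http.StatusBadRequest)
            return
        }
        if err := publish("revive", entries); err != nil {
            http.Error(w, err.Error(), http.StatusInternalServerError)
            return
        }
        w.WriteHeader(http.StatusAccepted)
    }

    func main() {
        http.HandleFunc("/delete", handleDelete)
        http.HandleFunc("/revive", handleRevive)
        log.Fatal(http.ListenAndServe(":8080", nil))
    }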

@shanson7
Collaborator Author

shanson7 commented Jun 11, 2021

So it sounds like what we need is an api server

I think this sounds like a clean solution. It would also make it trivial to put the API/admin server behind authentication and cleaner to add non-graphite standard endpoints there. Some open questions/notes (don't need answers now):

  1. Should this server be part of the cluster? If not we would need to expose some additional information via the query nodes (e.g. the partition/mkey for a series). I think this would just be another format for findSeries. TBH, that sounds pretty useful.
  2. The "revive" case is more "background". The API server would crawl the archive table and discover series that need to be revived.
  3. Would it make sense to remove the delByQuery endpoint from the query nodes and just put it in the API server?
  4. Should the API server make the database changes synchronously and just use kafka control messages to keep in-memory state up-to-date?

@Dieterbe
Contributor

The "revive" case is more "background". The API server would crawl the archive table and discover series that need to be revived.

What are some use cases for revival? Or parameters to control which series should be revived?
I had assumed they would vary a lot over time (or that they would be specific to one's deployment/situation), and thus I thought it would make more sense to write and adjust a worker script or tool (as needed for each use case) to collect the list of series from the archive table. Having this logic in a server daemon might mean a lot of work building a generalized interface with parameters for use cases that may be one-offs, initiating a whole upgrade cycle to deploy new code for new use cases, etc.

Should the API server make the database changes synchronously and just use kafka control messages to keep in-memory state up-to-date?

You mean for the api server to make modifications to the persistent index (both the live and archive tables)?

@shanson7
Collaborator Author

What are some use cases for revival? Or parameters to control which series should be revived?

For us, the use case is "Revive series matching this name and these tags". I expect it will be an infrequent operation, so a standalone tool is fine.

Should the API server make the database changes synchronously and just use kafka control messages to keep in-memory state up-to-date?

You mean for the api server to make modifications to the persistent index (both the live and archive tables)?

Yes. Should the deletes be synchronous, or rely on a metrictank instance configured to update the index (e.g. via update-cassandra-index = true or the bigtable equivalent) to consume and process them?

@Dieterbe
Contributor

For us, the use case is "Revive series matching this name and these tags". I expect it will be an infrequent operation, so a standalone tool is fine.

I guess that is common enough. I now think a standalone service will be simpler than a service + a cli tool, even if we have to grow the api over time to accommodate some use cases (e.g. "revive series matching this pattern and also another one but not if they match this substring, and only if their lastUpdate is bigger than...")

Yes. Should the deletes be synchronous, or rely on a metrictank instance configured to update the index (e.g. via update-cassandra-index = true or the bigtable equivalent) to consume and process them?

This is a bit misleading, because synchronous would usually imply that when the request returns, the deleted data will no longer show up. This is not true here: deleting data from the persistent store may be synchronous, but queries get served by metrictanks, and they would still show the data until they consume the message. "True sync" seems impossible in light of the problems we want to solve (instances being down and/or restarting).

What's the advantage to this? I feel like it may be related to your deployment style, but right now I gotta go and can't re-read what you said on this before. However, I wanted to get some thoughts out for now.
I can't think of any advantages right now but I do have a couple disadvantages:

  • Synchronous becomes an issue if the time needed to execute all deletes against the store becomes unreasonable (the user thinks "did something break?"), or worse, triggers a timeout somewhere (e.g. in a reverse proxy sitting between the client and the server). So if some deletes will have to be done async, it seems reasonable to simplify and say "all deletes are async", even benign ones.
  • In the context of an otherwise async system, making this call sync may not be that useful.
    Because you should wait until all data for the series has been received before triggering the delete (lest it show up again).
    So in the case where you know you stopped ingesting data into kafka, but don't know exactly when the last point has been received by the relevant peers, you have to wait before triggering the delete, without knowing exactly how long.
    Admittedly this is a weak argument, because typically all peers will be in sync (and it's pretty trivial to check this) and usually there will be some time between stopping sending a metric and deleting it.
  • Thus far we've always thought of metrictank peers as the authority over the persistent index. The path of least resistance is to keep doing so (saves all MT devs and operators some extra mental gymnastics).
  • Imagine the api server first executing the persistent index operations, but something happens (e.g. an api server crash) before it writes everything to kafka. We now put the burden to "retry until success" on the client, otherwise kafka will miss messages and the metrictanks will not fully delete from their index. Alternatively, if we make the api server first produce everything to kafka and then do the persistent index operations, we have the same problem if something happens during the persistent index operations: we put the burden on the client to retry until success.

@shanson7
Collaborator Author

This is a bit misleading because synchronous would usually imply when the request returns, the deleted data will no longer show up

That is one possible definition, but in "eventually consistent" platforms that isn't really true. In this case it's a consistency guarantee, I suppose. The benefit, IMO, is that you don't need to know that any cluster members are consuming your control messages and are configured to update cassandra. You know that the table has been cleaned up and that the change will eventually be reflected in the running instances.

It seems easy enough to start with fully async behavior and add in "consistency" declarations to the API if ever needed. In my case, I don't think I actually need it.

Yes. Should the deletes be synchronous, or rely on a metrictank instance configured to update the index (e.g. via update-cassandra-index = true or the bigtable equivalent) to consume and process them?

Also, I think I didn't accurately convey that it wouldn't be an exclusive "or". In my head, all instances configured to update the index would always do so (redundantly, if the API server was also issuing the delete/archive/revive operations to the index table). This is needed to handle small race conditions with the data input. The synchronous design would just add a consistency guarantee. As mentioned, we don't need to worry about that for now.
