Pinot Long Term Data Store #7229

Open
suddendust opened this issue Jul 29, 2021 · 42 comments

Comments

@suddendust
Contributor

suddendust commented Jul 29, 2021

We have certain use cases in which we would like to move older data to a cheap object store such as S3 and keep only the most recent data in Pinot. One such use case is storing distributed trace data: our query patterns show that more than 90% of queries fall within the last 24 hours, yet we have a retention period of 30 days. In this case, we would like to keep only the last 24 hours' worth of data in Pinot and move the rest to a cheap store such as S3.

From what I concluded from our initial discussions, this would involve work on two fronts:

  1. Moving older data from Pinot to S3 (this already happens; mentioning it for completeness).
  2. A Pinot-Presto connector that can query both Pinot and S3, based on the time span of the query.
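
To make the second point concrete, here is a minimal sketch of how a query's time range could be split into a part served from S3 and a part served from Pinot. The 24-hour hot window comes from the use case above; the function name and the tuple-based return shape are assumptions for illustration, not part of any existing connector.

```python
from datetime import datetime, timedelta

# Assumption: Pinot keeps only the most recent 24 hours of data.
HOT_WINDOW = timedelta(hours=24)


def split_time_range(start, end, now, hot_window=HOT_WINDOW):
    """Split [start, end) into a cold range (S3) and a hot range (Pinot).

    Either part is None when the query does not touch that store.
    """
    boundary = now - hot_window
    cold = (start, min(end, boundary)) if start < boundary else None
    hot = (max(start, boundary), end) if end > boundary else None
    return cold, hot


now = datetime(2021, 7, 29, 12, 0)
cold, hot = split_time_range(now - timedelta(days=3), now, now)
```

A query for the last three days is split at the 24-hour boundary, while a query for the last hour yields no cold range at all.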

Here is a rough sketch of the design we came up with in our discussion the other day (this is relevant to the distributed tracing example I gave above):

[Image: Screenshot 2021-07-25 at 7 13 21 PM]

I have created this issue to get the discussion started.

Thanks!

@suddendust
Contributor Author

@kishoreg please let me know your thoughts, thanks!

@mcvsubbu
Contributor

Useful feature.
Another way to do this may be to evict the segments from the Pinot servers but load them on demand when a query comes in. We could mark something in the IDEALSTATE to indicate that the segment is not there, so that the broker is aware (this may or may not be needed).

@suddendust
Contributor Author

suddendust commented Jul 29, 2021

Yes @mcvsubbu, let us evaluate the pros and cons of each of these approaches. I mentioned this one specifically because I think work has already started on the connector side. If this is done on demand, though, won't it add to response times, since the entire segment might have to be loaded from S3 into Pinot?

@mcvsubbu
Contributor

> Yes @mcvsubbu, let us evaluate the pros and cons of each of these approaches. I mentioned this one specifically because I think work has already started on the connector side. If this is done on demand, though, won't it add to response times, since the entire segment might have to be loaded from S3 into Pinot?

True, but something worth considering

@sajjad-moradi
Contributor

Also, if lower granularity for older data (>24h) is an option, introducing roll-up could compact the older data, which would lower the cost of running Pinot servers. In that case, query latency would be sub-second even for older data.

@Aka-shi
Contributor

Aka-shi commented Jul 30, 2021

@sajjad-moradi Our trace data has very high-cardinality dimensions (trace_id, span_id, etc.), so compaction won't give us much benefit here (the roll-up ratio would be very poor). It would have helped if we were only storing aggregates.
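
A small illustration of why the roll-up ratio collapses with high-cardinality dimensions. The rows and the `rollup_ratio` helper are made up for this sketch; the ratio is simply rows before roll-up divided by the number of distinct dimension combinations.

```python
def rollup_ratio(rows, dimensions):
    """Rows before roll-up divided by rows left after grouping on `dimensions`."""
    groups = {tuple(row[d] for d in dimensions) for row in rows}
    return len(rows) / len(groups)


# Hypothetical span rows: one service, ten distinct minutes, unique span ids.
spans = [
    {"service": "checkout", "minute": i // 60, "span_id": f"s{i}"}
    for i in range(600)
]

low_cardinality = rollup_ratio(spans, ["service", "minute"])
with_span_id = rollup_ratio(spans, ["service", "minute", "span_id"])
```

Grouping on `service` and `minute` alone collapses 600 rows into 10, but as soon as a unique id such as `span_id` is kept as a dimension, every row is its own group and nothing is saved.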

@kishoreg
Member

This makes sense and is something worth doing. We need two things:

  • Enhance the tiered storage concept to move segments older than X to S3.
  • Either enhance the Presto Pinot connector to load data directly from S3, or enhance Pinot to lazy-load segments from S3 on demand. The latter option is probably better? Note that latency will be higher when we lazy-load a segment.

Thoughts?

@kishoreg
Member

Whether we do it from the connector or lazy-load in Pinot, the latency will be the same. Lazy load might be better if there is an SSD that can act as a local cache on the Pinot servers.

@suddendust
Contributor Author

suddendust commented Jul 30, 2021

@kishoreg just a thought: can we do something that doesn't require us to download the complete segment from S3? Some way of looking "inside" a segment and downloading only a part of it.

@kishoreg
Member

Possible, but it will be a huge undertaking. We can probably get there in small steps.

@mcvsubbu
Contributor

I would vote for lazy load in pinot as a first step. I will be glad to review the design and PRs if you can contribute.

@suddendust
Contributor Author

@mcvsubbu Will come up with a design proposal for this soon.

@suddendust
Contributor Author

suddendust commented Aug 4, 2021

@mcvsubbu @kishoreg I am thinking along the lines of what was proposed earlier. Introduce a new state like "EVICTED" in the segment ideal state. A segment can move from ONLINE -> EVICTED (this transition can take place when a segment's retention period expires). The invariant to maintain is that an evicted segment is always present in the deep store. While querying, the broker gets all of the segments to be queried, sees that some of them have been evicted, triggers a download to a subset of servers, and then proceeds with its regular business. The segments remain cached on these servers and can be replaced with some eviction policy if need be. What are your thoughts on this? (I thought it would be a good idea to get this approach vetted before I get on with the design.)
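
A rough sketch of the broker-side step described above, assuming the proposed EVICTED state existed. `SegmentState` and `plan_query` are hypothetical names for illustration; EVICTED is not an existing Pinot or Helix state.

```python
from enum import Enum


class SegmentState(Enum):
    ONLINE = "ONLINE"
    OFFLINE = "OFFLINE"
    EVICTED = "EVICTED"  # proposed in this thread, not an existing state


def plan_query(segments, ideal_state):
    """Return (segments ready to query, segments to download from deep store)."""
    ready, to_download = [], []
    for segment in segments:
        state = ideal_state[segment]
        if state is SegmentState.ONLINE:
            ready.append(segment)
        elif state is SegmentState.EVICTED:
            to_download.append(segment)
        # OFFLINE segments are skipped: they are not queryable.
    return ready, to_download


ideal_state = {
    "seg_today": SegmentState.ONLINE,
    "seg_last_week": SegmentState.EVICTED,
    "seg_disabled": SegmentState.OFFLINE,
}
ready, to_download = plan_query(list(ideal_state), ideal_state)
```

The broker would trigger downloads for `to_download` on a subset of servers before fanning out the query as usual.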

@mcvsubbu
Contributor

mcvsubbu commented Aug 4, 2021

If you are changing the state machine, it is best to draw a picture and float it. We can then look at each state transition. This has to be done in a compatible manner in Helix. @kishoreg can comment more on whether that is easy to do.

Can we even do this without modifying the state machine?

Say the query lands on the server and the server finds that it does not have the segment. It can then hold the query until the segment has been downloaded from the deep store (a lazy download), and return the response to the broker as before. The only thing that needs to be done here is to have a larger timeout to allow for the segment to be downloaded.

The table config can set some criteria for segment availability on servers (say, up to 5d recent, or up to 5d after push in refresh use cases). It can also set some retention time (say, 15m).
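
A minimal sketch of the server-side behaviour described above: download on a miss, keep the segment around for a retention time after its last use, then drop it. `LazySegmentStore`, its `download` callback, and the injectable clock are assumptions for illustration; query timeouts are not modelled here.

```python
import time


class LazySegmentStore:
    """Holds segments locally and fetches missing ones from deep store on demand."""

    def __init__(self, download, retention_seconds=15 * 60, clock=time.monotonic):
        self._download = download          # callable: segment name -> segment data
        self._retention = retention_seconds
        self._clock = clock
        self._local = {}                   # name -> (segment, last access time)

    def get(self, name):
        entry = self._local.get(name)
        # On a miss the query blocks here until the download finishes.
        segment = self._download(name) if entry is None else entry[0]
        self._local[name] = (segment, self._clock())
        return segment

    def evict_expired(self):
        """Drop segments not accessed within the retention time; return their names."""
        now = self._clock()
        expired = [n for n, (_, t) in self._local.items() if now - t > self._retention]
        for name in expired:
            del self._local[name]
        return expired


downloads = []
fake_now = [0.0]
store = LazySegmentStore(
    download=lambda name: downloads.append(name) or f"data:{name}",
    retention_seconds=900,
    clock=lambda: fake_now[0],
)
first = store.get("seg_1")   # miss: triggers a download
second = store.get("seg_1")  # hit: served from the local copy
```

With the 15-minute retention used here, calling `evict_expired()` after more than 900 seconds without access removes `seg_1`, and the next query for it downloads it again.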

Such code complexity will also need to be evaluated against memory-mapping on a fast local store (cloud costs for this may be high). You can memory-map 3TB worth of segments off an SSD onto (say) 64G of memory, and the OS will pretty much keep these segments on disk all the time until a query needs them, and then page them out in a timely manner. We have done this successfully at LinkedIn for several years now.
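
For readers less familiar with memory mapping, here is a small, self-contained illustration using Python's standard `mmap` module. The file is a throwaway stand-in for a segment; only the pages that are actually touched are brought into memory by the OS.

```python
import mmap
import os
import tempfile


def read_slice(path, offset, length):
    """Read `length` bytes at `offset` through a read-only memory map."""
    with open(path, "rb") as f:
        # Length 0 maps the whole file; pages are loaded lazily on access.
        with mmap.mmap(f.fileno(), 0, access=mmap.ACCESS_READ) as mapped:
            return mapped[offset:offset + length]


# Stand-in "segment": 4 KiB of padding followed by some column bytes.
with tempfile.NamedTemporaryFile(delete=False) as tmp:
    tmp.write(b"\x00" * 4096 + b"column-data")
    segment_path = tmp.name

chunk = read_slice(segment_path, 4096, 11)
os.remove(segment_path)
```

The same idea is what lets a server map far more segment data than it has RAM, at the cost of needing fast local storage for all of it.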

Have you considered this option, or will that still be more expensive than actually lazy-loading the segment from deep store?

@suddendust
Contributor Author

Thanks @mcvsubbu for the review and the pointers. As for the FSM, this is the new state I proposed:

[Image: Screenshot 2021-08-05 at 12 00 38 AM]

The reason I introduced a new state is that it looked like a cleaner way for the broker to determine that a segment has been purged and moved to the deep store. Certainly, the broker can also determine this by first noticing that the segment is absent and then looking up its S3 location in the segment config; the code will just be a bit less clean in that case. With the new state, I was thinking of defining an invariant that a segment moves to this state iff it was successfully uploaded AND its URL was successfully updated in its metadata (so the broker can be sure that the segment was actually uploaded just by looking at the new state, which can be helpful when the deep store was bypassed during commit for some reason). But on second thought, it appears this adds unnecessary complexity.

We haven't really done a cost comparison of lazy-loading vs. mmap on local Pinot servers, but I'll throw some numbers here. Our ingestion rate is increasing quite rapidly and we're looking at around 4-5T/day of data in the next few months (these are conservative numbers). With a retention period of 30 days (again, a minimum), we'll have to store up to 150T worth of segments on SSDs at any time. Storage costs can be prohibitive with this much data, not to mention that all of this would serve a small fraction of queries (< 10%). We'll try to do a proper cost analysis of this today.
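
The back-of-the-envelope numbers above, spelled out. This only restates the arithmetic from the figures quoted in this comment; it says nothing about actual cost per terabyte.

```python
def retained_terabytes(ingest_tb_per_day, retention_days):
    """Data volume held at steady state for a given ingestion rate and retention."""
    return ingest_tb_per_day * retention_days


all_on_ssd_low = retained_terabytes(4, 30)   # lower estimate, 30-day retention
all_on_ssd_high = retained_terabytes(5, 30)  # upper estimate, 30-day retention
hot_only_high = retained_terabytes(5, 1)     # only the last 24 hours kept local
```

At 5T/day, keeping just the last day local means holding 5T instead of 150T on SSD, with the remaining 29 days living in the deep store.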

@mcvsubbu
Contributor

mcvsubbu commented Aug 4, 2021

I was just proposing what worked for us in terms of cost. Your mileage may vary, of course.

In terms of the state machine, we need to be able to transition out of the "Moved" state to the ONLINE and OFFLINE states as well (what if a moved segment is deleted?).
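
One way to reason about such transitions is to write them down as a table and check each edge. The table below is a simplified assumption for illustration: it includes the MOVED -> ONLINE and MOVED -> OFFLINE edges mentioned here, but it is not the actual Helix state model used by Pinot.

```python
# Hypothetical, simplified transition table including the proposed MOVED state.
ALLOWED_TRANSITIONS = {
    "OFFLINE": {"ONLINE"},
    "ONLINE": {"OFFLINE", "MOVED"},
    "MOVED": {"ONLINE", "OFFLINE"},  # e.g. thaw, or delete a moved segment
}


def can_transition(src, dst):
    """True if the table allows moving a segment from `src` to `dst`."""
    return dst in ALLOWED_TRANSITIONS.get(src, set())


moved_can_go_offline = can_transition("MOVED", "OFFLINE")
offline_can_go_moved = can_transition("OFFLINE", "MOVED")
```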

Lastly, let us ask ourselves why the broker needs to know that a segment has been moved.

@afilipchik

+1 for tiered storage. We (@elonazoulay) had very similar ideas on the Presto-Pinot part (let it scan segments directly and offload the real-time serving part).

@kishoreg
Member

kishoreg commented Aug 5, 2021

@suddendust this is a pretty big undertaking. @noahprince22 had attempted this before. #6187

@kishoreg
Member

kishoreg commented Aug 5, 2021

From your initial diagram, which path are we trying to solve? Presto -> Pinot -> s3 or Presto -> pinot-connector -> s3

@suddendust
Contributor Author

suddendust commented Aug 5, 2021

@mcvsubbu @kishoreg here's the updated FSM. I had not marked any transitions out of MOVED earlier.

[Image: Screenshot 2021-08-05 at 11 00 15 AM]

We are trying to go with Presto -> Pinot -> S3 (no changes to the connector atm).

I haven't really given thought to how this design would behave when there are too many segments to load. With 2T/day and 500M segments, we have around 4000 segments per day. With a retention of 30 days, we are looking at 4000 * 30 = 120,000 segments. If someone makes a query that literally covers the last 30 days, we might have to load all of them (yikes!). Maybe this can be controlled with a max segment config, as has been done in this PR?
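
The segment-count estimate above, plus a sketch of what a cap on segments per query could look like. `segments_to_load` and its `max_segments` parameter are hypothetical and are not taken from the PR mentioned above; decimal units (1T = 10^12 bytes, 1M = 10^6 bytes) are assumed.

```python
TB = 10**12
MB = 10**6


def segments_per_day(ingest_bytes_per_day, segment_bytes):
    """Number of segments produced per day at a fixed segment size."""
    return ingest_bytes_per_day // segment_bytes


def segments_to_load(requested, max_segments):
    """Cap the segments loaded for one query; return (to load, number skipped)."""
    return requested[:max_segments], max(0, len(requested) - max_segments)


per_day = segments_per_day(2 * TB, 500 * MB)
total_retained = per_day * 30

requested = [f"seg_{i}" for i in range(total_retained)]
to_load, skipped = segments_to_load(requested, max_segments=1000)
```

With a cap in place, a 30-day query would load only the first 1000 segments and could report the rest as skipped, which is where partial results would come in.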

I guess we can avoid much of the complexity by not touching the FSM? Essentially, offload all the download business to the servers themselves: they determine that they don't have the segment, trigger a download, and respond some time later asynchronously.

> Lastly, let us ask ourselves why the broker needs to know that a segment has been moved.

@mcvsubbu I gave it some thought and it looks like it doesn't. It is the responsibility of the servers to furnish the segment, either from local storage or from the deep store. The broker just needs to query them and wait, as it is doing right now. It doesn't need to be concerned about where the segment is.

This implementation does seem to come with a fair number of caveats: too many segments to download, a chatty cluster due to too many state transitions, long download times, etc.

I am sure I am missing some minute but important details here, could definitely use some advice. Thanks :)

@mcvsubbu
Contributor

mcvsubbu commented Aug 5, 2021

OK, so we have some conclusions here, let me know if this sounds right.

Happy path:

  • The broker does not need to know whether a segment is in memory or not. So, maybe we don't need to change the segment state machine after all (let me know if you think otherwise).
  • When the server gets a request, it needs to handle the case where the segment is not on its own disk. So, we need to have the server initiate the load and wait for the segment to load before executing the query.

We will need to change the table config to indicate a few things:

  • When will segments be unloaded? (Number of hours after create/update may be one criterion. Total number of loaded segments may be another: we will unload older ones as the newer ones come in. Maybe there are other ways to specify this as well.)
  • Maybe we should limit the number of queries that use deep store segments within a certain time frame. Limiting the overall number of loaded segments may also help, since the other queries can indicate partial results.
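
A sketch of how the two unload criteria above (age since create/update, and a cap on the total number of loaded segments) could be combined. The function and parameter names are hypothetical and do not correspond to existing table config keys.

```python
def segments_to_unload(loaded, now_hours, max_age_hours, max_loaded):
    """Pick segments to unload.

    `loaded` maps segment name -> create/update time in hours. Segments older
    than `max_age_hours` go first; if more than `max_loaded` remain, the
    oldest of the remainder are unloaded as well.
    """
    too_old = {n for n, t in loaded.items() if now_hours - t > max_age_hours}
    remaining = sorted((n for n in loaded if n not in too_old), key=lambda n: loaded[n])
    overflow = max(0, len(remaining) - max_loaded)
    return sorted(too_old | set(remaining[:overflow]))


loaded = {"seg_a": 0, "seg_b": 90, "seg_c": 95, "seg_d": 99}
unload = segments_to_unload(loaded, now_hours=100, max_age_hours=24, max_loaded=2)
```

Here `seg_a` is dropped for age, and `seg_b` is dropped because only two segments may stay loaded.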

Not-so-happy-path:

  • The server may fail the download, or take so long that the query has already timed out.
  • Servers may be asked to download too many segments, causing the server to thrash. We can think of a few solutions here. Maybe limit the number of old segments each server will contain. Any more than that, and either we return partial results or the server evicts one of the segments and downloads another, increasing query latency.
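
One possible shape for the "limit the number of old segments each server will contain" idea is a bounded least-recently-used cache. This is only a sketch under that assumption; `BoundedSegmentCache` and its `download` callback are hypothetical names.

```python
from collections import OrderedDict


class BoundedSegmentCache:
    """Keeps at most `capacity` old segments, evicting the least recently used."""

    def __init__(self, capacity, download):
        self._capacity = capacity
        self._download = download        # callable: segment name -> segment data
        self._segments = OrderedDict()   # least recently used entry first
        self.evicted = []                # names evicted so far, for inspection

    def get(self, name):
        if name in self._segments:
            self._segments.move_to_end(name)  # mark as most recently used
            return self._segments[name]
        # Download first, so a failed download does not evict anything.
        segment = self._download(name)
        if len(self._segments) >= self._capacity:
            oldest, _ = self._segments.popitem(last=False)
            self.evicted.append(oldest)
        self._segments[name] = segment
        return segment


cache = BoundedSegmentCache(capacity=2, download=lambda name: f"data:{name}")
for name in ["seg_a", "seg_b", "seg_a", "seg_c"]:
    cache.get(name)
```

Loading `seg_c` pushes out `seg_b`, the least recently used segment, which is the "ousts one and downloads another" branch; returning partial results instead would skip the download entirely.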

@kishoreg, does something like this seem a reasonable path to a usable feature?

@amrishlal
Contributor

> I haven't really given thought to how this design would behave when there are too many segments to load. With 2T/day and 500M segments, we have around 4000 segments per day. With a retention of 30 days, we are looking at 4000 * 30 = 120,000 segments. If someone makes a query that literally covers the last 30 days, we might have to load all of them (yikes!). Maybe this can be controlled with a max segment config, as has been done in this PR?

Possibly two things to consider here:

  • I am wondering whether data in S3 needs to have the same granularity as the data in Pinot, or whether we can aggregate the data to a higher-level dimension while aging it out to S3. For example, if data in the latest segment has a granularity of 1 second, then data in a segment 10 days old may have a granularity of 10 seconds (thereby reducing the data size by a factor of 10), and data 30 days old may have a granularity of 1 hour (thereby reducing the data size by a factor of ~4000).
  • Also, would adding a segment cache between Pinot and S3 help with latency? Usually I would expect some sort of locality of reference when we pull in data from S3.
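
To illustrate the first point, here is a minimal sketch of coarsening time granularity by summing values into fixed-size buckets. It assumes a single metric with no other dimensions, which is the best case for this kind of roll-up; `roll_up` is a hypothetical helper.

```python
from collections import defaultdict


def roll_up(points, bucket_seconds):
    """Sum (epoch_seconds, value) points into buckets of `bucket_seconds`."""
    buckets = defaultdict(int)
    for ts, value in points:
        buckets[ts - ts % bucket_seconds] += value
    return sorted(buckets.items())


per_second = [(t, 1) for t in range(60)]   # one point per second for a minute
per_10_seconds = roll_up(per_second, 10)   # ten points collapse into each bucket
```

Sixty one-second points become six ten-second points, a tenfold reduction; the reduction shrinks quickly once extra dimensions have to be preserved per bucket.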

@kishoreg
Member

kishoreg commented Aug 5, 2021

> Possibly two things to consider here:
>
>   • I am wondering whether data in S3 needs to have the same granularity as the data in Pinot, or whether we can aggregate the data to a higher-level dimension while aging it out to S3. For example, if data in the latest segment has a granularity of 1 second, then data in a segment 10 days old may have a granularity of 10 seconds (thereby reducing the data size by a factor of 10), and data 30 days old may have a granularity of 1 hour (thereby reducing the data size by a factor of ~4000).

This is a good point, but I consider it more of an optimization, and the feature should not be designed around it. There are cases where users would not want to coarsen the granularity. Moreover, one of the main reasons here is that S3 is cheaper and users want to keep the data for a long time.

>   • Also, would adding a segment cache between Pinot and S3 help with latency? Usually I would expect some sort of locality of reference when we pull in data from S3.

This is definitely possible and something to consider once we have the ability to load a segment when we receive the query.

@kishoreg
Member

kishoreg commented Aug 5, 2021

> @kishoreg, does something like this seem a reasonable path to a usable feature?

I don't know. This needs more thought. We took a similar approach last time when Noah Prince wanted to solve it, but we abandoned it after hitting a lot of obstacles. Should we start a proposal doc this time?

@suddendust
Contributor Author

I'll start with the proposal doc for this today.

@suddendust
Contributor Author

@kishoreg @mcvsubbu Here is the design proposal on the feature. One section is still in todo (evaluating the performance implications of this feature).

@suddendust
Contributor Author

@mcvsubbu @kishoreg any thoughts on this?

@mcvsubbu
Contributor

It's been a very busy week; I will hopefully take a look this week.

@jackjlli
Contributor

@mcvsubbu

> The broker does not need to know whether a segment is in memory or not. So, maybe we don't need to change the Segment State machine, after all (let me know if you think otherwise)

The broker timeout may be one reason for the broker to know about the status of the segments, since it takes time for servers to download and load stale segments into memory.

@mcvsubbu
Contributor

@suddendust changing the state machine is not what we agreed on. It is a LOT easier to keep the state machine as it is, and pull the segment from deepstore when needed.

@suddendust
Contributor Author

suddendust commented Aug 17, 2021

@mcvsubbu We're not changing the SM. I put it there just because it was discussed here. I'll state this more clearly in the doc or remove the section altogether.

Edit: I have stated this in the doc. I did not remove the section as it would have deleted all the associated comments as well.

Edit2: Moved to appendix

@mcvsubbu
Contributor

Good. It is always nice to have the rejected alternatives. Just add them as an appendix. Thanks. I will go through it again.

@kishoreg
Member

I will also review this in detail. The best way to implement this would be to either add a new implementation of existing interfaces such as the segment fetcher, or enhance/add interfaces. That way, this can be implemented as a plugin in the first version, and once it's tested in production, we can make the plugin available by default.
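
To show the general plugin shape being suggested, here is a language-neutral sketch in Python. The `SegmentFetcher` protocol, the registry, and the in-memory fetcher are stand-ins invented for this example; they do not mirror the signatures of Pinot's Java interfaces.

```python
from typing import Dict, Protocol


class SegmentFetcher(Protocol):
    """Hypothetical fetcher contract: turn a segment URI into raw bytes."""

    def fetch(self, uri: str) -> bytes: ...


class InMemoryDeepStoreFetcher:
    """Toy fetcher backed by a dict, standing in for a real deep store client."""

    def __init__(self, objects: Dict[str, bytes]):
        self._objects = objects

    def fetch(self, uri: str) -> bytes:
        return self._objects[uri]


# Registry keyed by URI scheme, so new fetchers plug in without core changes.
FETCHERS: Dict[str, SegmentFetcher] = {}


def register_fetcher(scheme: str, fetcher: SegmentFetcher) -> None:
    FETCHERS[scheme] = fetcher


def fetch_segment(uri: str) -> bytes:
    scheme = uri.split("://", 1)[0]
    return FETCHERS[scheme].fetch(uri)


register_fetcher("mem", InMemoryDeepStoreFetcher({"mem://table/seg_1": b"segment-bytes"}))
payload = fetch_segment("mem://table/seg_1")
```

The point of the pattern is that the lazy-loading behaviour lives behind an interface and a registration step, so it can ship as an optional plugin before becoming a default.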

Let's try to see if we can achieve this without touching pinot-core.

@suddendust
Contributor Author

@mcvsubbu thanks for the thorough review of the proposal. There are some gaps in it that I am working on right now. I will resolve them soon and ask you for a re-review. Thanks again :)

@mcvsubbu
Contributor

mcvsubbu commented Sep 8, 2021

Please also add a link to https://docs.google.com/document/d/1Z4FLg3ezHpqvc6zhy0jR6Wi2OL8wLO_lRC6aLkskFgs/edit#heading=h.sbtdm9byxqnc in the design document, and comment on how your solution may work with or complement tiered storage, if we build that.

@suddendust
Contributor Author

suddendust commented Sep 13, 2021

Thanks @mcvsubbu, have updated the proposal. I have a question though that I couldn't figure out a solution to. When servers download/evict segments, what would be the best way to update the segment idealstate? Similarly for rebalancing from broker for even distribution of frozen segments. Certainly servers shouldn't be doing this and the controller can't get to know about them.

@mcvsubbu
Contributor

@suddendust our current design is to NOT change the idealstate. Please confirm that this is indeed the case. This is so precisely for the reasons you mention.

One way to let the controller know is via some API, but we will need to manage multiple replicas doing this same thing, etc. We can get to this later. Let us do one version in which the idealstate is not touched.

@suddendust
Contributor Author

suddendust commented Sep 13, 2021

@mcvsubbu not adding any new state here. But as segments are downloaded, they'll need to be moved back to ONLINE before we can serve queries off them, right?

Edit: The design is essentially moving segments to OFFLINE when they are frozen and to ONLINE when they are thawed.

@mcvsubbu
Contributor

mcvsubbu commented Sep 13, 2021

The segments should be in the ONLINE state in the idealstate. A server may find a segment missing from its local store and download it before serving the query. If you move a segment to OFFLINE, the broker will not route queries to that segment.
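
A tiny sketch of why the OFFLINE approach breaks routing. It assumes the idealstate is a map of segment -> server -> state and that the broker only routes to segments with at least one ONLINE replica; `routable_segments` is a hypothetical helper, not broker code.

```python
def routable_segments(ideal_state):
    """Return segments that have at least one ONLINE replica."""
    return sorted(
        segment
        for segment, replicas in ideal_state.items()
        if "ONLINE" in replicas.values()
    )


ideal_state = {
    "seg_recent": {"server_1": "ONLINE", "server_2": "ONLINE"},
    "seg_frozen_online": {"server_1": "ONLINE"},    # not on local disk, still ONLINE
    "seg_frozen_offline": {"server_1": "OFFLINE"},  # marked OFFLINE when frozen
}
routable = routable_segments(ideal_state)
```

The frozen segment that stays ONLINE still receives queries and can be lazily downloaded by the server, while the one marked OFFLINE never gets a chance to be thawed.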

@suddendust
Contributor Author

suddendust commented Sep 13, 2021

This makes the design much simpler, as I don't have to deal with the vagaries of state transitions. I'll update the doc with this (my thought process was that an unmapped segment should be moved to OFFLINE as an invariant; I was doing this solely to adhere to that).

@suddendust
Contributor Author

@mcvsubbu @npawar Just a reminder on the proposal, thanks! As I see it, this has two aspects: segment freezing and segment thawing. I think segment freezing is relatively simple thanks to the tiered storage that is already implemented. Shall I start with a PoC implementation for it?

@suddendust
Contributor Author

@npawar Any updates on this? Thanks!
