Conversation

@rossgray rossgray commented May 23, 2025

Problem

Batch export queries can consume a considerable amount of ClickHouse resources, particularly when exporting a large amount of data:

  • We keep the connection to ClickHouse open while we export data to the destination
  • If the batch export fails midway through, we have to retry, meaning additional queries to ClickHouse.

Changes

Here, we introduce an intermediate S3 stage: we export the data directly from ClickHouse to our internal S3 bucket, and then export it from S3 to the final destination.
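Roughly, the first activity issues a query of the following shape (a sketch only; the `$`-placeholders, the key layout and the partition count are illustrative, not the exact implementation):

```python
# Sketch of the staging query: ClickHouse writes the batch straight to our
# internal S3 bucket as Parquet, split across a fixed number of files.
# All $-placeholders below are illustrative.
EXPORT_TO_S3_STAGE_QUERY = """
INSERT INTO FUNCTION s3(
    '$s3_path/export_{_partition_id}.parquet',
    '$s3_key',
    '$s3_secret',
    'Parquet'
)
PARTITION BY rand() % 10
SELECT *
FROM events
WHERE team_id = $team_id
  AND _inserted_at >= $data_interval_start
  AND _inserted_at < $data_interval_end
"""
```

The second activity then reads those files back from our S3 bucket and writes them to the destination, reusing the existing consumers.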

By separating the reading of data from ClickHouse from the writing of data to the destination, we get some benefits:

  • If we fail to write data to the destination for whatever reason, we can continue from where we left off, and don't need to read the data again from ClickHouse.
  • The reading and writing of data is decoupled, meaning we can optimize these independently. For example, we can control memory consumption in ClickHouse better by controlling how we write data to S3 (eg changing the number of partitions to improve speed/reduce memory usage).
  • We now have a copy of the data we send to the destination, which is useful for debugging.
  • I also removed a significant number of our S3 tests which were just testing different permutations of parametrized inputs. This brings the number down from 1173 to 282 (not all of these run in CI but probably around half do)

Initial version

Note that this is just an initial MVP, and needs thorough testing before rolling it out to all users. Therefore, it will only be enabled for certain teams using an environment variable.
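For illustration, the gate might look roughly like this (a sketch; the setting name is the one added in this PR, but the exact check is an assumption):

```python
from django.conf import settings


def use_internal_s3_stage(team_id: int) -> bool:
    # Sketch only: assumes the setting holds team IDs as a list of strings,
    # as suggested by the get_list(os.getenv(...)) settings change below.
    return str(team_id) in settings.BATCH_EXPORT_USE_INTERNAL_S3_STAGE_TEAM_IDS
```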

The code could definitely be made nicer. I'm also open to suggestions on structure/organization: I've put most of the new code in a new posthog/temporal/batch_exports/pre_export_stage.py module to keep it separate, but am happy to put it somewhere else.

Limitations

  • I've only implemented this for S3 so far
  • Haven't thought too much about heartbeating and how we recover from failure halfway through (our current S3 implementation doesn't support this anyway)
  • We now have 2 main activities instead of 1, so setting an overall start-to-close timeout is trickier (I am not yet sure how the time will be distributed across the 2 activities for a typical batch export, and it may differ considerably based on the destination)
  • I haven't thought too much about error handling

Questions

  • Which S3 bucket should we use for this? Should we create a new one? Also, should we use our own set of environment variables for this, or is it ok to reuse the existing ones? I think it makes sense to use a new one.

TODO

  • Implement versions of all batch export queries (so far only done it for one case)
    • Still need to do this for sessions model but this can be done in a follow up PR
  • Add log_comment to queries
  • Add some tests to make sure the data exported remains the same
  • Test performance & memory usage and see how altering number of partitions affects this. Not sure if I will be able to test this locally or if this will need to be done in dev/prod.
  • Remove some debug logging on ClickHouse memory usage
  • Set up S3 bucket(s)

Did you write or update any docs for this change?

Docs will need updating once this is live for all users.

How did you test this code?

Local testing to ensure data exported looks correct.

Have added some test cases which use the new activities and assert the data is the same as before (although it seems to be in a different order, so I have updated the tests to not care about the ordering of exported events, since I don't think this is something we guarantee anyway).

Performance testing

We can improve performance by varying the number of file partitions we use when writing to S3 from ClickHouse.

Query performance for exporting 100k events locally:

| Version | Memory usage (MiB) | Query duration (ms) | Entire workflow duration (s) |
| --- | --- | --- | --- |
| Using current activity (just reading data from CH) | 44.5 | 196 | 15.35 |
| Using 1 partition | 107 | 165 | 15.0 |
| Using 5 partitions | 92.3 | 210 | 13.9 |
| Using 10 partitions | 89.3 | 237 | 13.9 |
| Using 20 partitions | 85.7 | 294 | 14.0 |

@rossgray rossgray requested a review from a team May 23, 2025 13:44
tomasfarias commented May 23, 2025

By separating the reading of data from ClickHouse from the writing of data to the destination, we get some benefits:

  • If we fail to write data to the destination for whatever reason, we can continue from where we left off, and don't need to read the data again from ClickHouse.

I mean, we have to read it again from S3 instead of CH. I don't see how we can continue from where we left off with S3 (without reading the entire file again). In fact, we had that feature already with CH (as with CH we can query from a new timestamp), but it had to be removed for some reason.

EDIT: I guess what I am asking is, could you clarify this?

tomasfarias commented May 23, 2025

For example, we can control memory consumption in ClickHouse better by controlling how we write data to S3 (eg changing the number of partitions to improve speed/reduce memory usage).

Are you aware if this is even possible? Concretely: Can we impact memory consumption by controlling the number of partitions when exporting to s3?

EDIT: If anything, I would assume that more partitions = more parallelization = more resources can be used to gain more speed. So, we gain speed but in fact use up more memory.

Comment on lines 476 to 512
# TODO - remove this once testing over
# need to wait for query info to become available in system.query_log
await asyncio.sleep(5)
memory_usage = await client.read_query(
f"SELECT formatReadableSize(memory_usage) as memory_used FROM system.query_log WHERE query_id = '{query_id}' AND type='QueryFinish' ORDER BY event_time DESC LIMIT 1",
)
await logger.ainfo(f"Query memory usage = {memory_usage.decode('utf-8').strip()}")
Contributor

Maybe consider putting this in a background task instead so as to not block the main thread? I know we are testing but still: What if it takes longer than 5 seconds?

Contributor Author

I think I can probably just remove this now

@rossgray
Contributor Author

I mean, we have to read it again from S3 instead of CH. I don't see how we can continue from where we left off with S3 (without reading the entire file again). In fact, we had that feature already with CH (as with CH we can query from a new timestamp), but it had to be removed for some reason.

yeah sorry, I meant we can continue from the 2nd activity (reading from S3) rather than needing to read the data again from ClickHouse.

We would indeed need to find a better way to handle failures (for example, storing multi-part upload progress in the DB)

'$s3_secret',
'Parquet'
)
PARTITION BY rand() %% 10
Contributor

I haven't read that far so maybe this is covered later, if so just tell me to look it up later:

This partitioning scheme doesn't consider data size. We could end up with a 100 row export (very light) partitioned into up to 10 files (assuming a uniform distribution, each one should be expected to have 10 rows).

Why did we choose 10? Are we considering that these 10 files will need to be joined together later?

Contributor

I found my solution for one of the two questions: We are letting arrow deal with multiple files.

Contributor

Interestingly, whereas partitioning will result in faster writes from ClickHouse, it will result in slower reads on our side, as we HAVE to read all of the files.

Contributor

Not sure where to position ourselves with this tradeoff, I think we'll have to experiment.

Contributor Author

Good questions!

10 is just an arbitrary value I picked for now - as mentioned, I want to do some more testing of this to see how it affects performance and memory usage.

Initially, I was thinking of keeping the number of partitions constant in order to make the load on ClickHouse as predictable as possible, which I think is more important than file size.

We could try partitioning based on max file size but then if there's a lot of data we could be trying to write say 100 partitions at once, which I presume would consume a lot of memory in CH (again, just an assumption at this point). I suppose we could make it dynamic with an upper limit but at this point I'm not sure if there is much benefit or not.

I chose to use pyarrow.datasets to read in the data from S3. I have not worked with it before but it sounds like a very performant way of reading in data from S3 which could be contained in multiple files, and also has the benefit of working with our existing code (our Consumer expects a queue of RecordBatches).
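For reference, the read side looks roughly like this (a sketch with illustrative names; the endpoint, credentials and key prefix are placeholders, not the actual code in pre_export_stage.py):

```python
import pyarrow.dataset as ds
from pyarrow import fs

# Sketch: read the staged Parquet files back from S3 as RecordBatches.
# Endpoint, credentials and key prefix below are illustrative placeholders.
s3 = fs.S3FileSystem(
    access_key="...",  # e.g. from the OBJECT_STORAGE_* settings
    secret_key="...",
    endpoint_override="http://objectstorage:19000",  # e.g. local MinIO
)
dataset = ds.dataset(
    "my-internal-bucket/batch-exports/<some-key-prefix>/",
    format="parquet",
    filesystem=s3,
)

for record_batch in dataset.to_batches():
    ...  # hand each pyarrow.RecordBatch to the existing Consumer's queue
```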

If we wanted to copy the data directly from our own S3 to the customer's S3 then I agree, we would probably want to control the creation of these files but for now I think it's better to keep the implementation generic so it can be used across all destinations.

Contributor

If we wanted to copy the data directly from our own S3 to the customer's S3 then I agree

Nah, I don't think I would want to go that way personally. I think keeping the extra level of indirection is valuable even for S3.

Maaaaaaybe later down the line as a super-optimization for some really time sensitive exports, but otherwise no.

Contributor

@tomasfarias tomasfarias May 23, 2025

I chose to use pyarrow.datasets to read in the data from S3.

Yeah, let's talk about this one. I've pinged you.

Contributor

Initially, I was thinking of keeping the number of partitions constant in order to make the load on ClickHouse as predictable as possible, which I think is more important than file size.

Keep in mind that we will now have to deal with a new kind of memory pressure: Pod memory pressure. I think this is also impacted by the choice of parquet format. I've brought this up in the thread I've opened up with you too.

@rossgray
Contributor Author

Are you aware if this is even possible? Concretely: Can we impact memory consumption by controlling the number of partitions when exporting to s3?

I still need to test this out. This is just an assumption from what I read in the docs:

It is unlikely you will want to export your data as a single file. Most tools, including ClickHouse, will achieve higher throughput performance when reading and writing to multiple files due to the possibility of parallelism. We could execute our INSERT command multiple times, targeting a subset of the data. ClickHouse offers a means of automatic splitting files using a PARTITION key.

I assume writing to multiple files at once would be a lot faster but at the cost of higher memory usage, but still need to test it. I could also try playing around with different file formats & compression.

tomasfarias commented May 23, 2025

I still need to test this out. This is just an assumption from what I read in the docs:

Yeah, I read the same docs and arrived at the same assumption:

I assume writing to multiple files at once would be a lot faster but at the cost of higher memory usage

I just asked because the PR calls out a potential benefit to memory usage, whereas in fact the benefit would be in speed at the cost of memory usage:

(eg changing the number of partitions to improve speed/reduce memory usage).

EDIT: As we can't have negative partitions of course!


class ProducerFromInternalS3Stage:
    """
    This is an alternative implementation of the `spmc.Producer` class that reads data from the internal S3 staging area.
Contributor

Seems like we could inherit from spmc.Producer

Contributor

Maybe fine to do this later though

Contributor Author

The method signatures are different so not sure it would help that much to inherit, apart from documentation. Also, if we decide to migrate to this new architecture for all teams then we'll no longer need the old Producer anyway


# Read in batches
try:
    for batch in dataset.to_batches():
Contributor

This looks very blocking

Contributor

I think we will need to extend asyncpa to support this.

Contributor

Hmm yeah, internally threads are used, we may not be able to use async python at all without changing that

Contributor Author

Just pushed a commit to fix this: edb36d7 (#32594)
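For context, one way to avoid blocking the event loop (a sketch only, not necessarily the exact approach in edb36d7) is to push the blocking `next()` call onto a worker thread:

```python
import asyncio

import pyarrow.dataset as ds


async def stream_batches(dataset: ds.Dataset, queue: asyncio.Queue) -> None:
    """Iterate a blocking pyarrow batch iterator without blocking the event loop.

    Sketch only; the function and argument names are illustrative.
    """
    batches = dataset.to_batches()
    sentinel = object()
    while True:
        # next() performs blocking I/O, so run it in a worker thread.
        batch = await asyncio.to_thread(next, batches, sentinel)
        if batch is sentinel:
            break
        await queue.put(batch)
```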

@rossgray rossgray force-pushed the batch-exports-pre-export-stage branch 2 times, most recently from 18b1579 to edb36d7 on June 2, 2025 11:06
@rossgray rossgray marked this pull request as ready for review June 3, 2025 12:07
@rossgray rossgray requested a review from tomasfarias June 3, 2025 12:09
@greptile-apps greptile-apps bot left a comment

PR Summary

This PR introduces a two-stage batch export process that first writes data to an internal S3 staging area before exporting to the final destination. The change aims to improve resource management and error recovery by decoupling data reading from ClickHouse and writing operations.

  • Adds new pre_export_stage.py module implementing the core S3 staging functionality with configurable partitioning for performance optimization
  • Introduces feature flag BATCH_EXPORT_USE_INTERNAL_S3_STAGE_TEAM_IDS to gradually roll out the feature to specific teams
  • Improves memory usage and query performance through configurable partitioning (5 partitions by default) when writing to S3
  • Adds ability to resume failed exports from S3 stage without re-querying ClickHouse, reducing database load
  • Maintains data copies in S3 staging for debugging purposes while properly cleaning up after successful exports

14 file(s) reviewed, 5 comment(s)

BATCH_EXPORT_USE_INTERNAL_S3_STAGE_TEAM_IDS: list[str] = get_list(
    os.getenv("BATCH_EXPORT_USE_INTERNAL_S3_STAGE_TEAM_IDS", "")
)
BATCH_EXPORT_INTERNAL_STAGING_BUCKET: str = os.getenv("BATCH_EXPORT_INTERNAL_STAGING_BUCKET", "posthog")
Contributor

style: Default bucket name 'posthog' is too generic and could conflict with existing buckets. Consider a more specific default like 'posthog-batch-export-staging'

Contributor Author

this is mainly for the local environment where a posthog bucket is used elsewhere

],
select_from=ast.JoinExpr(table=ast.Field(chain=["sessions"])),
order_by=[ast.OrderExpr(expr=ast.Field(chain=["_inserted_at"]), order="ASC")],
# TODO: Add log_comment
Contributor

style: TODO comment about adding log_comment should be removed since log_comment has been implemented in all new queries

Suggested change:
- # TODO: Add log_comment
+ # log_comment is used in all export queries

Contributor Author

it hasn't been implemented for sessions yet

Comment on lines 959 to 965
# TODO - remove this once testing over
# need to wait for query info to become available in system.query_log
await asyncio.sleep(5)
memory_usage = await client.read_query(
f"SELECT formatReadableSize(memory_usage) as memory_used FROM system.query_log WHERE query_id = '{query_id}' AND type='QueryFinish' ORDER BY event_time DESC LIMIT 1",
)
await self.logger.ainfo(f"Query memory usage = {memory_usage.decode('utf-8').strip()}")
Contributor

style: Remove debug logging block before production deployment

is_backfill: bool = False
# TODO - pass these in to all inherited classes
batch_export_id: str | None = None
destination_default_fields: list[BatchExportField] | None = None
Contributor

Is BatchExportField serializable? Probably since it's a dict I guess...

Comment on lines 563 to 568
# TODO - should we use our own set of env vars for this?
# TODO - check these are available in production workers
aws_access_key_id=settings.OBJECT_STORAGE_ACCESS_KEY_ID,
aws_secret_access_key=settings.OBJECT_STORAGE_SECRET_ACCESS_KEY,
endpoint_url=settings.OBJECT_STORAGE_ENDPOINT,
region_name=settings.OBJECT_STORAGE_REGION,
Contributor

I am not familiar with these env variables, so I think we should look up what they are used for, just in case they get changed out from under us.

Contributor

May be easier to just use our own.

Contributor Author

yes, I've confirmed we'll be using our own bucket with existing credentials

Contributor Author

will update this

Comment on lines 515 to 516
ORDER BY
_inserted_at, event
Contributor

We can get rid of this.

@tomasfarias tomasfarias left a comment

I think we'll need to test this with some real data to make the call. Initially, I have high hopes. Let's just get rid of the ordering constraints before shipping.

@rossgray rossgray force-pushed the batch-exports-pre-export-stage branch 3 times, most recently from f2270a8 to 4cf4bcf on June 4, 2025 12:43
@rossgray rossgray force-pushed the batch-exports-pre-export-stage branch from 8d8b715 to 7b1d3e5 on June 5, 2025 09:14
@rossgray rossgray merged commit 2b77e0d into master Jun 5, 2025
95 checks passed
@rossgray rossgray deleted the batch-exports-pre-export-stage branch June 5, 2025 10:39
rossgray added a commit that referenced this pull request Jun 5, 2025
…nal S3 stage prior to exporting (#32594)"

This reverts commit 2b77e0d.
jose-sequeira pushed a commit that referenced this pull request Jun 6, 2025