[SUPPORT] Best method to have Hudi process single stream from multiple source tables #2355

@ghost

Description

We have a workflow that publishes data changes (CDC) from multiple tables to a single Kinesis stream. Each Kinesis record contains the schema and field values. Our Spark application uses Spark Structured Streaming to query new data changes from Kinesis. The stream uses foreachBatch, which contains logic to retrieve the schema and field values from the Kinesis records and determine the S3 output path. Within a single foreachBatch invocation, we process data from multiple source tables. Toward the end of foreachBatch, we use Hudi to save the Hudi-generated parquet files on S3.

Here is a general idea of how this works:

  1. Spark queries Kinesis and retrieves a batch of data
  2. Batch of data (Dataframe) enters foreachBatch block
  3. The foreachBatch logic groups the Dataframe by source table. For each source table, we retrieve the schema information and create a new Dataframe representing that table, which is then processed by Hudi. The for-loop continues until all distinct tables within the batch are done.
  4. Repeat steps 1-3 for next Spark batch.
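The steps above can be sketched roughly like this (a minimal, hypothetical sketch stripped of the Spark/Hudi specifics: `process_micro_batch` and `write_table` are illustrative names, and records are plain dicts instead of Dataframe rows):

```python
from collections import defaultdict

def process_micro_batch(records, write_table):
    """Group a micro-batch of CDC records by source table, then process
    each table's records one at a time (in the real job, each group
    becomes a Dataframe that Hudi writes to its own S3 path)."""
    groups = defaultdict(list)
    for rec in records:
        groups[rec["table"]].append(rec)   # group by source table name
    for table, rows in groups.items():
        write_table(table, rows)           # sequential: one table per iteration

# Example: record which tables get written, and with how many rows
written = []
process_micro_batch(
    [{"table": "orders", "id": 1}, {"table": "users", "id": 2},
     {"table": "orders", "id": 3}],
    lambda table, rows: written.append((table, len(rows))),
)
# written == [("orders", 2), ("users", 1)]
```

The key point is the second loop: the per-table Hudi writes happen strictly one after another, so batch latency grows linearly with the number of distinct tables in the batch.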

Assuming it takes 2 minutes to process the changed data from a single source table, a Spark batch containing changes from 10 tables may take 20 minutes to complete, since the logic above processes changed data one table at a time.

Another method I've read about is to publish changed data from each table to its own Kinesis stream, but this approach is difficult to manage since we have thousands of tables. With 10,000 source tables, we'd need to push to 10,000 Kinesis streams and run a separate Spark structured stream for each one: basically a 1:1:1 mapping of source table : Kinesis stream : Spark stream.

Is there another way to speed up our current process? Is it a good idea to implement threading inside the Spark foreachBatch so that multiple tables are processed at the same time?
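For the threading idea, here is a minimal sketch of what I have in mind, assuming each per-table Hudi write is an independent Spark action targeting its own table path, so driver-side threads can overlap them (`process_tables_concurrently` and `write_table` are hypothetical names; in the real job `write_table` would be the per-table Hudi save):

```python
from concurrent.futures import ThreadPoolExecutor, as_completed

def process_tables_concurrently(tables, write_table, max_workers=4):
    """Fan out per-table writes inside foreachBatch using a thread pool.
    tables: dict mapping table name -> rows for that table in this batch."""
    with ThreadPoolExecutor(max_workers=max_workers) as pool:
        futures = {pool.submit(write_table, table, rows): table
                   for table, rows in tables.items()}
        for fut in as_completed(futures):
            fut.result()  # re-raise any per-table failure in the driver

# Example: the two tables are written concurrently rather than in sequence
results = []
process_tables_concurrently(
    {"orders": [1, 2], "users": [3]},
    lambda table, rows: results.append((table, len(rows))),
    max_workers=2,
)
```

With this pattern, each thread submits its own Spark jobs, so batch latency would be closer to the slowest table rather than the sum of all tables. I believe `spark.scheduler.mode=FAIR` would also help the concurrent jobs share executors, though I'm not certain how this interacts with Hudi's timeline handling.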

I'm open to thoughts :)
