
Support data preprocessing in Spark framework #7299

Open
jackjlli wants to merge 1 commit into master from support-spark-preprocessing

Conversation

@jackjlli (Member) commented Aug 13, 2021

Description

Currently the data preprocessing Hadoop job lives in the pinot-hadoop module, and there is no equivalent Spark job. In order to reuse the common data preprocessing logic (e.g. support for data in AVRO and ORC formats) across both the MR and Spark frameworks, the shared code is moved from pinot-hadoop to the pinot-ingestion-common module.

This PR:

  • adds data preprocessing support for Spark.
  • refactors some code from pinot-hadoop into pinot-ingestion-common so that it can be reused by both the MR and Spark jobs.
  • supports both AVRO and ORC formats in both the MR and Spark jobs.

Upgrade Notes

Does this PR prevent a zero-downtime upgrade? (Assume upgrade order: Controller, Broker, Server, Minion)

  • Yes (Please label as backward-incompat, and complete the section below on Release Notes)

Does this PR fix a zero-downtime upgrade introduced earlier?

  • Yes (Please label this as backward-incompat, and complete the section below on Release Notes)

Does this PR otherwise need attention when creating release notes? Things to consider:

  • New configuration options
  • Deprecation of configurations
  • Signature changes to public methods/interfaces
  • New plugins added or old plugins removed
  • Yes (Please label this PR as release-notes and complete the section on Release Notes)

Release Notes

Documentation

@codecov-commenter commented Aug 13, 2021

Codecov Report

Merging #7299 (cf0d18c) into master (ce2c367) will decrease coverage by 31.08%.
The diff coverage is 0.00%.

❗ Current head cf0d18c differs from pull request most recent head 475542c. Consider uploading reports for the commit 475542c to get more accurate results
Impacted file tree graph

@@              Coverage Diff              @@
##             master    #7299       +/-   ##
=============================================
- Coverage     70.39%   39.30%   -31.09%     
+ Complexity     3299       92     -3207     
=============================================
  Files          1508     1526       +18     
  Lines         74754    75359      +605     
  Branches      10846    10951      +105     
=============================================
- Hits          52621    29618    -23003     
- Misses        18508    43482    +24974     
+ Partials       3625     2259     -1366     
Flag          Coverage Δ
integration1  30.72% <ø> (?)
integration2  29.15% <ø> (-0.03%) ⬇️
unittests1    ?
unittests2    14.37% <0.00%> (-0.14%) ⬇️

Flags with carried forward coverage won't be shown.

Impacted Files Coverage Δ
.../pinot/ingestion/jobs/SegmentPreprocessingJob.java 0.00% <0.00%> (ø)
...estion/preprocess/AvroDataPreprocessingHelper.java 0.00% <0.00%> (ø)
.../ingestion/preprocess/DataPreprocessingHelper.java 0.00% <0.00%> (ø)
...ion/preprocess/DataPreprocessingHelperFactory.java 0.00% <ø> (ø)
...gestion/preprocess/OrcDataPreprocessingHelper.java 0.00% <0.00%> (ø)
...reprocess/mappers/AvroDataPreprocessingMapper.java 0.00% <ø> (ø)
...preprocess/mappers/OrcDataPreprocessingMapper.java 0.00% <ø> (ø)
...preprocess/mappers/SegmentPreprocessingMapper.java 0.00% <ø> (ø)
...partitioners/AvroDataPreprocessingPartitioner.java 0.00% <ø> (ø)
...on/preprocess/partitioners/GenericPartitioner.java 0.00% <ø> (ø)
... and 899 more

Continue to review full report at Codecov.

Legend: Δ = absolute <relative> (impact), ø = not affected, ? = missing data
Powered by Codecov. Last update ce2c367...475542c. Read the comment docs.

@kishoreg (Member)

is this needed only for v0_deprecated/spark? Can you please add more to the description on what is the change and why it's needed.

@jackjlli (Member, Author)

is this needed only for v0_deprecated/spark? Can you please add more to the description on what is the change and why it's needed.

I've updated the description of the PR. Basically, this PR adds a data preprocessing Spark job to Pinot. Since some logic can be reused by both the MR and Spark jobs, I refactored some code from the pinot-hadoop module into the pinot-ingestion-common module. At LinkedIn, this data preprocessing job has already been used along with the PinotBuildAndPushJob from the v0_deprecated module.

@jackjlli force-pushed the support-spark-preprocessing branch from 3bdd50c to aa0792c on August 18, 2021 17:47
// Repartition and sort within partitions.
Comparator<Object> comparator = new SparkDataPreprocessingComparator();
JavaPairRDD<Object, Row> partitionedSortedPairRDD =
    pairRDD.repartitionAndSortWithinPartitions(sparkPartitioner, comparator);
Contributor

Sometimes using coalesce() (then sortWithinPartitions()) can be more efficient than repartition(). Is it feasible to allow coalesce()?

Member, Author

Good question! The coalesce() method uses its default partition function, which is not murmur2, and there is no way to specify a custom partition function for it. That's why repartitionAndSortWithinPartitions is used here: it lets us plug in our own custom partition function.
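To illustrate the reply above: repartitionAndSortWithinPartitions accepts a custom Spark Partitioner, which is what lets the job keep murmur2-based partition assignment consistent with the MR job, while coalesce() offers no such hook. The following is a hedged, plain-Java sketch of what the partition-id computation behind such a partitioner might look like; the class and method names are hypothetical, and the murmur2 implementation shown is the widely used Kafka-style variant, assumed here for illustration rather than taken from the PR:

```java
import java.nio.charset.StandardCharsets;

// Hypothetical helper: maps a partition-column value to a partition id with a
// murmur2-style hash, mirroring how a custom Spark Partitioner could keep the
// Spark job's partition layout consistent with the MR job's.
class Murmur2PartitionFunction {

  // Kafka-style murmur2 hash over a byte array.
  static int murmur2(byte[] data) {
    int length = data.length;
    int seed = 0x9747b28c;
    final int m = 0x5bd1e995;
    final int r = 24;
    int h = seed ^ length;
    int length4 = length / 4;
    for (int i = 0; i < length4; i++) {
      final int i4 = i * 4;
      int k = (data[i4] & 0xff) + ((data[i4 + 1] & 0xff) << 8)
          + ((data[i4 + 2] & 0xff) << 16) + ((data[i4 + 3] & 0xff) << 24);
      k *= m;
      k ^= k >>> r;
      k *= m;
      h *= m;
      h ^= k;
    }
    // Handle the trailing 1-3 bytes (intentional switch fall-through).
    switch (length % 4) {
      case 3:
        h ^= (data[(length & ~3) + 2] & 0xff) << 16;
      case 2:
        h ^= (data[(length & ~3) + 1] & 0xff) << 8;
      case 1:
        h ^= data[length & ~3] & 0xff;
        h *= m;
    }
    h ^= h >>> 13;
    h *= m;
    h ^= h >>> 15;
    return h;
  }

  // Deterministically maps a key to the range [0, numPartitions).
  static int partition(String key, int numPartitions) {
    byte[] bytes = key.getBytes(StandardCharsets.UTF_8);
    return (murmur2(bytes) & 0x7fffffff) % numPartitions;
  }
}
```

A custom Partitioner's getPartition(key) would delegate to a function like this; with coalesce() the records would instead land wherever the default partitioning puts them, which may disagree with the MR job's murmur2 layout.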


@Override
public int getPartition(Object key) {
  SparkDataPreprocessingJobKey jobKey = (SparkDataPreprocessingJobKey) key;
Contributor

Why not just call generatePartitionId() in this method?

Member, Author

The key of the KV pair is used not only for partitioning but also for sorting. It's better to precompute the partition index once, in order to minimize the footprint of the key, since the partition column can be of any data type (e.g. string).
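The reply above describes a composite key that carries a precomputed partition index alongside the sort value, so the partitioner never re-hashes a potentially large partition-column value. As a hedged sketch (the field names, the long sort value, and the comparator are assumptions for illustration; the PR's actual SparkDataPreprocessingJobKey may differ), such a key might look like:

```java
import java.util.Comparator;

// Hypothetical sketch of a composite key for repartitionAndSortWithinPartitions:
// the partition id is computed once up front (e.g. via murmur2 on the partition
// column) and cached, keeping the key small even when the partition column is a
// string. The PR's actual SparkDataPreprocessingJobKey may differ.
class SparkDataPreprocessingJobKey {
  private final int _partitionId;         // precomputed partition index
  private final long _sortedColumnValue;  // value used for in-partition sorting

  SparkDataPreprocessingJobKey(int partitionId, long sortedColumnValue) {
    _partitionId = partitionId;
    _sortedColumnValue = sortedColumnValue;
  }

  int getPartitionId() {
    // The partitioner's getPartition(key) can simply return this cached id
    // instead of re-hashing the partition-column value per record.
    return _partitionId;
  }

  // Orders keys by partition first, then by the sorted column within a partition.
  static final Comparator<Object> COMPARATOR = (a, b) -> {
    SparkDataPreprocessingJobKey ka = (SparkDataPreprocessingJobKey) a;
    SparkDataPreprocessingJobKey kb = (SparkDataPreprocessingJobKey) b;
    int cmp = Integer.compare(ka._partitionId, kb._partitionId);
    return cmp != 0 ? cmp : Long.compare(ka._sortedColumnValue, kb._sortedColumnValue);
  };
}
```

With a key shaped like this, getPartition can stay a cheap field read while the comparator still has everything it needs for the sort.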

@xiangfu0 (Contributor)

Shall we consider using the pinot-ingestion-spark module?

@jackjlli (Member, Author)

Shall we consider using the pinot-ingestion-spark module?

Good question! I also thought about putting it in the pinot-ingestion-spark module, but this data preprocessing Spark job uses common code that is also used by the preprocessing MR job in the pinot-hadoop module. That's why I put it under the same 'v0_deprecated/' directory for now. The core logic is simple and can easily be moved to the pinot-ingestion-spark module later on.

@jackjlli force-pushed the support-spark-preprocessing branch 2 times, most recently from f9204f7 to 475542c on August 23, 2021 22:37
@jackjlli force-pushed the support-spark-preprocessing branch from 475542c to e2e92fc on November 17, 2021 00:22

Labels: none yet

5 participants