Merged
4 changes: 3 additions & 1 deletion docs/_data/authors.yml
@@ -24,4 +24,6 @@ vinoyang:
  name: Vino Yang
  web: https://cwiki.apache.org/confluence/display/~vinoyang


vbalaji:
  name: Balaji Varadarajan
  web: https://cwiki.apache.org/confluence/display/~vbalaji
175 changes: 175 additions & 0 deletions docs/_posts/2020-08-20-efficient-migration-of-large-parquet-tables.md
@@ -0,0 +1,175 @@
---
title: "Efficient Migration of Large Parquet Tables to Apache Hudi"
excerpt: "Migrating a large parquet table to Apache Hudi without having to rewrite the entire dataset."
author: vbalaji
category: blog
---

We will look at how to migrate a large parquet table to Hudi without having to rewrite the entire dataset.


# Motivation:

Apache Hudi maintains per-record metadata to perform core operations such as upserts and incremental pull. Until now, taking advantage of Hudi’s upsert and incremental processing support required users to rewrite their whole dataset as an Apache Hudi table. Hudi 0.6.0 comes with an ***experimental feature*** to support efficient migration of large Parquet tables to Hudi without the need to rewrite the entire dataset.


# High Level Idea:

## Per Record Metadata:

Apache Hudi maintains record-level metadata to perform efficient upserts and incremental pull.

![Per Record Metadata](/assets/images/blog/2020-08-20-per-record.png)

An Apache Hudi physical file contains 3 parts:

1. For each record, 5 Hudi metadata fields with column indices 0 to 4
1. For each record, the original data columns that comprise the record (Original Data)
1. Additional Hudi metadata at the file footer for index lookup

Parts (1) and (3) constitute what we term the “Hudi skeleton”. The Hudi skeleton contains the additional metadata that Hudi maintains in each physical parquet file to support Hudi primitives. The conceptual idea is to decouple the Hudi skeleton data from the original data (2). The Hudi skeleton can be stored in a Hudi file while the original data is stored in an external non-Hudi file. Migrating a large parquet table would then only create Hudi skeleton files, without having to rewrite the original data.

![skeleton](/assets/images/blog/2020-08-20-skeleton.png)
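
To make the decoupling concrete, here is a minimal Java sketch. It is not Hudi's actual reader. It assumes that a skeleton file and its source parquet file hold records in the same order, so the i-th skeleton row can be stitched to the i-th source row. The class name `SkeletonStitchSketch` and the column names (`commitTime`, `recordKey`, `rider`, `fare`) are hypothetical, and only two of the five metadata fields are modeled.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Illustrative only: skeleton rows and source rows are modeled as plain maps.
final class SkeletonStitchSketch {

    // Combine per-record metadata (skeleton) with the original columns (source),
    // assuming both lists are in the same record order.
    static List<Map<String, Object>> stitch(List<Map<String, Object>> skeletonRows,
                                            List<Map<String, Object>> sourceRows) {
        if (skeletonRows.size() != sourceRows.size()) {
            throw new IllegalArgumentException("skeleton and source must have the same row count");
        }
        List<Map<String, Object>> result = new ArrayList<>();
        for (int i = 0; i < skeletonRows.size(); i++) {
            // Metadata columns come first, followed by the original data columns.
            Map<String, Object> full = new LinkedHashMap<>(skeletonRows.get(i));
            full.putAll(sourceRows.get(i));
            result.add(full);
        }
        return result;
    }

    public static void main(String[] args) {
        Map<String, Object> skeleton = new LinkedHashMap<>();
        skeleton.put("commitTime", "001");   // hypothetical metadata column
        skeleton.put("recordKey", "key-1");  // hypothetical metadata column
        Map<String, Object> source = new LinkedHashMap<>();
        source.put("rider", "alice");        // hypothetical data column
        source.put("fare", 12.5);            // hypothetical data column
        System.out.println(stitch(Collections.singletonList(skeleton),
                                  Collections.singletonList(source)));
    }
}
```

The point of the sketch is that the source rows are only read, never copied: the skeleton side is the only data a METADATA_ONLY migration has to write.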

# Design Deep Dive:

For a deep dive on the internals, please take a look at the [RFC document](https://cwiki.apache.org/confluence/display/HUDI/RFC+-+12+%3A+Efficient+Migration+of+Large+Parquet+Tables+to+Apache+Hudi)

# Migration:

Hudi supports 2 modes when migrating parquet tables. We will use the terms bootstrap and migration interchangeably in this document.

* METADATA_ONLY : In this mode, only record-level metadata is generated for each source record and stored in the new bootstrap location.
* FULL_RECORD : In this mode, record-level metadata is generated for each source record, and both the original record and its metadata are copied.

You can choose between these modes at the partition level. A common strategy is to use FULL_RECORD mode for a small set of "hot" partitions that are accessed more frequently and METADATA_ONLY for a larger set of "warm" partitions.


## Query Engine Support:
For a METADATA_ONLY bootstrapped table, the Spark data source, Spark-Hive and native Hive query engines are supported. Presto support is in the works.

## Ways To Migrate :

There are 2 ways to migrate a large parquet table to Hudi.

- Spark Datasource Write
- Hudi DeltaStreamer

We will look at how to migrate using both these approaches.

## Configurations:

These are bootstrap-specific configurations that need to be set in addition to the regular Hudi write configurations.


|Configuration Name | Default | Mandatory ? | Description |
|---|---|---|---|
|hoodie.bootstrap.base.path| | Yes |Base Path of source parquet table.|
|hoodie.bootstrap.parallelism | 1500 | Yes | Spark Parallelism used when running bootstrap |
|hoodie.bootstrap.keygen.class | |Yes |Key generator class used to generate record keys for the bootstrapped records. |
|hoodie.bootstrap.mode.selector | org.apache.hudi.client.bootstrap.selector.MetadataOnlyBootstrapModeSelector | Yes |Bootstrap Mode Selector class. By default, Hudi employs METADATA_ONLY bootstrap for all partitions. |
|hoodie.bootstrap.partitionpath.translator.class |org.apache.hudi.client.bootstrap.translator.IdentityBootstrapPartitionPathTranslator | No | For METADATA_ONLY bootstrap, this class allows customization of partition paths used in Hudi target dataset. By default, no customization is done and the partition paths reflect what is available in the source parquet table. |
|hoodie.bootstrap.full.input.provider| org.apache.hudi.bootstrap.SparkParquetBootstrapDataProvider|No |For FULL_RECORD bootstrap, this class provides the input RDD of Hudi records to write. |
| hoodie.bootstrap.mode.selector.regex.mode |METADATA_ONLY |No |Bootstrap Mode used when the partition matches the regex pattern in hoodie.bootstrap.mode.selector.regex . Used only when hoodie.bootstrap.mode.selector set to BootstrapRegexModeSelector. |
| hoodie.bootstrap.mode.selector.regex |\.\* |No |Partition Regex used when hoodie.bootstrap.mode.selector set to BootstrapRegexModeSelector. |
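
As a small illustration of how these settings might be assembled before launching a bootstrap, the following Java sketch builds a `java.util.Properties` object and checks for the two keys that the table marks as mandatory and lists without a default. The class `BootstrapPropsSketch`, its `missingKeys` helper and the example path are hypothetical. Hudi performs its own validation, so this is only a pre-flight check under those assumptions.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Properties;

// Illustrative pre-flight check; not part of Hudi.
final class BootstrapPropsSketch {

    // Keys the table marks as mandatory and lists without a default value.
    static final List<String> REQUIRED_KEYS = Arrays.asList(
        "hoodie.bootstrap.base.path",
        "hoodie.bootstrap.keygen.class");

    // Return the required keys that are absent or blank in the given properties.
    static List<String> missingKeys(Properties props) {
        List<String> missing = new ArrayList<>();
        for (String key : REQUIRED_KEYS) {
            String value = props.getProperty(key);
            if (value == null || value.trim().isEmpty()) {
                missing.add(key);
            }
        }
        return missing;
    }

    public static void main(String[] args) {
        Properties props = new Properties();
        // Hypothetical source path, used only for illustration.
        props.setProperty("hoodie.bootstrap.base.path", "/data/source_parquet_table");
        props.setProperty("hoodie.bootstrap.keygen.class",
            "org.apache.hudi.keygen.SimpleKeyGenerator");
        System.out.println("Missing mandatory keys: " + missingKeys(props));
    }
}
```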

## Spark Data Source:

Here, we use a Spark Datasource Write to perform bootstrap.
Here is an example code snippet to perform METADATA_ONLY bootstrap.


```scala
import org.apache.hudi.{DataSourceWriteOptions, HoodieDataSourceHelpers}
import org.apache.hudi.config.{HoodieBootstrapConfig, HoodieWriteConfig}
import org.apache.hudi.keygen.SimpleKeyGenerator
import org.apache.spark.sql.SaveMode

// srcPath (base path of the source parquet table) and basePath (base path of the
// target Hudi table) are assumed to be defined earlier in the session.
// The source is given by the bootstrap base path, so the DataFrame itself is empty.
val bootstrapDF = spark.emptyDataFrame
bootstrapDF.write
  .format("hudi")
  .option(HoodieWriteConfig.TABLE_NAME, "hoodie_test")
  .option(DataSourceWriteOptions.OPERATION_OPT_KEY, DataSourceWriteOptions.BOOTSTRAP_OPERATION_OPT_VAL)
  .option(DataSourceWriteOptions.RECORDKEY_FIELD_OPT_KEY, "_row_key")
  .option(DataSourceWriteOptions.PARTITIONPATH_FIELD_OPT_KEY, "datestr")
  .option(HoodieBootstrapConfig.BOOTSTRAP_BASE_PATH_PROP, srcPath)
  .option(HoodieBootstrapConfig.BOOTSTRAP_KEYGEN_CLASS, classOf[SimpleKeyGenerator].getName)
  .mode(SaveMode.Overwrite)
  .save(basePath)
```

Here is an example code snippet to perform METADATA_ONLY bootstrap for August 20 2020 - August 29 2020 partitions and FULL_RECORD bootstrap for other partitions.


```scala
import org.apache.hudi.bootstrap.SparkParquetBootstrapDataProvider
import org.apache.hudi.client.bootstrap.selector.BootstrapRegexModeSelector
import org.apache.hudi.{DataSourceWriteOptions, HoodieDataSourceHelpers}
import org.apache.hudi.config.{HoodieBootstrapConfig, HoodieWriteConfig}
import org.apache.hudi.keygen.SimpleKeyGenerator
import org.apache.spark.sql.SaveMode

// srcPath (base path of the source parquet table) and basePath (base path of the
// target Hudi table) are assumed to be defined earlier in the session.
val bootstrapDF = spark.emptyDataFrame
bootstrapDF.write
  .format("hudi")
  .option(HoodieWriteConfig.TABLE_NAME, "hoodie_test")
  .option(DataSourceWriteOptions.OPERATION_OPT_KEY, DataSourceWriteOptions.BOOTSTRAP_OPERATION_OPT_VAL)
  .option(DataSourceWriteOptions.RECORDKEY_FIELD_OPT_KEY, "_row_key")
  .option(DataSourceWriteOptions.PARTITIONPATH_FIELD_OPT_KEY, "datestr")
  .option(DataSourceWriteOptions.PRECOMBINE_FIELD_OPT_KEY, "timestamp")
  .option(HoodieBootstrapConfig.BOOTSTRAP_BASE_PATH_PROP, srcPath)
  .option(HoodieBootstrapConfig.BOOTSTRAP_KEYGEN_CLASS, classOf[SimpleKeyGenerator].getName)
  // Partitions matching the regex use the regex mode below; all others use the other mode.
  .option(HoodieBootstrapConfig.BOOTSTRAP_MODE_SELECTOR, classOf[BootstrapRegexModeSelector].getName)
  .option(HoodieBootstrapConfig.BOOTSTRAP_MODE_SELECTOR_REGEX, "2020/08/2[0-9]")
  .option(HoodieBootstrapConfig.BOOTSTRAP_MODE_SELECTOR_REGEX_MODE, "METADATA_ONLY")
  // Supplies the input records for the partitions bootstrapped in FULL_RECORD mode.
  .option(HoodieBootstrapConfig.FULL_BOOTSTRAP_INPUT_PROVIDER, classOf[SparkParquetBootstrapDataProvider].getName)
  .mode(SaveMode.Overwrite)
  .save(basePath)
```
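
To check which partitions a regex such as `2020/08/2[0-9]` would send to each mode before running a bootstrap, a small stand-alone sketch can help. The Java class below is hypothetical and is not Hudi's `BootstrapRegexModeSelector`. It assumes that the regex must match the whole partition path and that non-matching partitions get the other mode, which is the behavior the example above describes.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.regex.Pattern;

// Illustrative only: mimics per-partition mode selection with a regex.
final class RegexModeSelectorSketch {

    enum Mode { METADATA_ONLY, FULL_RECORD }

    // Assign modeOnMatch to partitions fully matching the regex, the other mode to the rest.
    static Map<Mode, List<String>> select(List<String> partitions, String regex, Mode modeOnMatch) {
        Pattern pattern = Pattern.compile(regex);
        Mode otherMode = modeOnMatch == Mode.METADATA_ONLY ? Mode.FULL_RECORD : Mode.METADATA_ONLY;
        Map<Mode, List<String>> result = new LinkedHashMap<>();
        result.put(Mode.METADATA_ONLY, new ArrayList<>());
        result.put(Mode.FULL_RECORD, new ArrayList<>());
        for (String partition : partitions) {
            // Assumption: whole-path match, not a substring search.
            Mode mode = pattern.matcher(partition).matches() ? modeOnMatch : otherMode;
            result.get(mode).add(partition);
        }
        return result;
    }

    public static void main(String[] args) {
        List<String> partitions = Arrays.asList("2020/08/19", "2020/08/20", "2020/08/29", "2020/08/30");
        System.out.println(select(partitions, "2020/08/2[0-9]", Mode.METADATA_ONLY));
    }
}
```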

## Hoodie DeltaStreamer:

Hudi DeltaStreamer allows bootstrap to be performed using the --run-bootstrap command line option.

If you plan to use DeltaStreamer after the initial bootstrap to incrementally ingest data into the new Hudi dataset, you need to pass either --checkpoint or --initial-checkpoint-provider to set the initial checkpoint for DeltaStreamer.

Here is an example for running METADATA_ONLY bootstrap using DeltaStreamer.

```shell
spark-submit --packages org.apache.hudi:hudi-spark-bundle_2.11:0.6.0 \
--conf 'spark.serializer=org.apache.spark.serializer.KryoSerializer' \
--class org.apache.hudi.utilities.deltastreamer.HoodieDeltaStreamer \
--run-bootstrap \
--target-base-path <Hudi_Base_Path> \
--target-table <Hudi_Table_Name> \
--props <props_file> \
--checkpoint <initial_checkpoint_if_you_are_going_to_use_deltastreamer_to_incrementally_ingest> \
--hoodie-conf hoodie.bootstrap.base.path=<Parquet_Source_base_Path> \
--hoodie-conf hoodie.datasource.write.recordkey.field=_row_key \
--hoodie-conf hoodie.datasource.write.partitionpath.field=datestr \
--hoodie-conf hoodie.bootstrap.keygen.class=org.apache.hudi.keygen.SimpleKeyGenerator
```

Here is an example for running METADATA_ONLY bootstrap for the August 20 2020 - August 29 2020 partitions and FULL_RECORD bootstrap for other partitions using DeltaStreamer.


```shell
spark-submit --packages org.apache.hudi:hudi-spark-bundle_2.11:0.6.0 \
--conf 'spark.serializer=org.apache.spark.serializer.KryoSerializer' \
--class org.apache.hudi.utilities.deltastreamer.HoodieDeltaStreamer \
--run-bootstrap \
--target-base-path <Hudi_Base_Path> \
--target-table <Hudi_Table_Name> \
--props <props_file> \
--checkpoint <initial_checkpoint_if_you_are_going_to_use_deltastreamer_to_incrementally_ingest> \
--hoodie-conf hoodie.bootstrap.base.path=<Parquet_Source_base_Path> \
--hoodie-conf hoodie.datasource.write.recordkey.field=_row_key \
--hoodie-conf hoodie.datasource.write.partitionpath.field=datestr \
--hoodie-conf hoodie.bootstrap.keygen.class=org.apache.hudi.keygen.SimpleKeyGenerator \
--hoodie-conf hoodie.bootstrap.full.input.provider=org.apache.hudi.bootstrap.SparkParquetBootstrapDataProvider \
--hoodie-conf hoodie.bootstrap.mode.selector=org.apache.hudi.client.bootstrap.selector.BootstrapRegexModeSelector \
--hoodie-conf hoodie.bootstrap.mode.selector.regex="2020/08/2[0-9]" \
--hoodie-conf hoodie.bootstrap.mode.selector.regex.mode=METADATA_ONLY
```

## Known Caveats
1. Proper defaults are still needed for the bootstrap config hoodie.bootstrap.full.input.provider. Here is the [ticket](https://issues.apache.org/jira/browse/HUDI-1213).
1. DeltaStreamer manages checkpoints inside hoodie commit files and expects checkpoints in previously committed metadata. Users are expected to pass checkpoint or initial checkpoint provider when performing bootstrap through deltastreamer. Such support is not present when doing bootstrap using Spark Datasource. Here is the [ticket](https://issues.apache.org/jira/browse/HUDI-1214).
99 changes: 99 additions & 0 deletions docs/_posts/2020-08-21-async-compaction-deployment-model.md
@@ -0,0 +1,99 @@
---
title: "Async Compaction Deployment Models"
excerpt: "Mechanisms for executing compaction jobs in Hudi asynchronously"
author: vbalaji
category: blog
---

We will look at different deployment models for executing compactions asynchronously.

# Compaction

For a Merge-On-Read table, data is stored using a combination of columnar (e.g., parquet) and row-based (e.g., avro) file formats.
Updates are logged to delta files and later compacted to produce new versions of columnar files, either synchronously or
asynchronously. One of the main motivations behind Merge-On-Read is to reduce data latency when ingesting records.
Hence, it makes sense to run compaction asynchronously without blocking ingestion.
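
As a rough mental model of what compaction produces, the Java sketch below folds a log of updates into a base file to yield a new base version. It is a deliberate simplification with hypothetical names (`CompactionMergeSketch`, `compact`): records are plain key/value strings, the latest logged update for a key wins, and deletes and the precombine field are ignored.

```java
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Illustrative only: a base file is a key -> value map, a delta log is an ordered list of updates.
final class CompactionMergeSketch {

    // Produce a new base version by applying logged updates in order; the input base is not modified.
    static Map<String, String> compact(Map<String, String> baseFile, List<String[]> deltaLog) {
        Map<String, String> newBase = new LinkedHashMap<>(baseFile);
        for (String[] update : deltaLog) {
            newBase.put(update[0], update[1]); // update[0] = record key, update[1] = new value
        }
        return newBase;
    }

    public static void main(String[] args) {
        Map<String, String> base = new LinkedHashMap<>();
        base.put("key-1", "v1");
        base.put("key-2", "v1");
        List<String[]> log = Arrays.asList(
            new String[] {"key-2", "v2"},  // update to an existing record
            new String[] {"key-3", "v1"}); // insert of a new record
        System.out.println(compact(base, log));
    }
}
```

Until compaction runs, a reader has to do this merge at query time, which is why ingestion can stay fast while compaction is deferred.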


# Async Compaction

Async Compaction is performed in 2 steps:

1. ***Compaction Scheduling***: This is done by the ingestion job. In this step, Hudi scans the partitions and selects **file
slices** to be compacted. Finally, a compaction plan is written to the Hudi timeline.
1. ***Compaction Execution***: A separate process reads the compaction plan and performs compaction of file slices.
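
The two steps can be sketched in Java as a producer and a consumer sharing a queue of plans. This is a toy model, not Hudi's implementation: the queue stands in for the Hudi timeline, and the names `AsyncCompactionSketch`, `Plan`, `schedule` and `executePending`, as well as the instant time and slice names, are hypothetical.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.CopyOnWriteArrayList;

// Illustrative only: scheduling and execution are decoupled through a shared queue of plans.
final class AsyncCompactionSketch {

    // A compaction plan: an instant time plus the file slices selected for compaction.
    static final class Plan {
        final String instantTime;
        final List<String> fileSlices;

        Plan(String instantTime, List<String> fileSlices) {
            this.instantTime = instantTime;
            this.fileSlices = fileSlices;
        }
    }

    private final Queue<Plan> pendingPlans = new ConcurrentLinkedQueue<>(); // stand-in for the timeline
    private final List<String> compactedSlices = new CopyOnWriteArrayList<>();

    // Step 1: the ingestion job records a plan and returns immediately.
    void schedule(String instantTime, List<String> fileSlices) {
        pendingPlans.add(new Plan(instantTime, fileSlices));
    }

    // Step 2: a separate worker drains pending plans; returns how many plans it executed.
    int executePending() {
        int executed = 0;
        Plan plan;
        while ((plan = pendingPlans.poll()) != null) {
            compactedSlices.addAll(plan.fileSlices); // placeholder for the real compaction work
            executed++;
        }
        return executed;
    }

    List<String> compactedSlices() {
        return new ArrayList<>(compactedSlices);
    }

    public static void main(String[] args) throws InterruptedException {
        AsyncCompactionSketch sketch = new AsyncCompactionSketch();
        sketch.schedule("20200821103000", Arrays.asList("slice-1", "slice-2"));
        Thread executor = new Thread(sketch::executePending); // runs apart from the "ingestion" thread
        executor.start();
        executor.join();
        System.out.println(sketch.compactedSlices());
    }
}
```

Because scheduling only enqueues a plan, the ingestion side never waits for compaction work, which is the property the deployment models below rely on.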


# Deployment Models

There are a few ways in which we can execute compactions asynchronously.

## Spark Structured Streaming

With 0.6.0, we now have support for running async compactions in Spark
Structured Streaming jobs. Compactions are scheduled and executed asynchronously inside the
streaming job. Async compactions are enabled by default for structured streaming jobs
on Merge-On-Read tables.

Here is an example snippet in Java:

```java
import org.apache.hudi.DataSourceWriteOptions;
import org.apache.hudi.config.HoodieCompactionConfig;
import org.apache.hudi.config.HoodieWriteConfig;

import org.apache.spark.sql.Row;
import org.apache.spark.sql.streaming.DataStreamWriter;
import org.apache.spark.sql.streaming.OutputMode;
import org.apache.spark.sql.streaming.Trigger;

// streamingInput (a streaming Dataset<Row>), operationType, tableType, tableName,
// checkpointLocation and tablePath are assumed to be defined by the surrounding job.
DataStreamWriter<Row> writer = streamingInput.writeStream().format("org.apache.hudi")
    .option(DataSourceWriteOptions.OPERATION_OPT_KEY(), operationType)
    .option(DataSourceWriteOptions.TABLE_TYPE_OPT_KEY(), tableType)
    .option(DataSourceWriteOptions.RECORDKEY_FIELD_OPT_KEY(), "_row_key")
    .option(DataSourceWriteOptions.PARTITIONPATH_FIELD_OPT_KEY(), "partition")
    .option(DataSourceWriteOptions.PRECOMBINE_FIELD_OPT_KEY(), "timestamp")
    .option(HoodieCompactionConfig.INLINE_COMPACT_NUM_DELTA_COMMITS_PROP, "10")
    .option(DataSourceWriteOptions.ASYNC_COMPACT_ENABLE_OPT_KEY(), "true")
    .option(HoodieWriteConfig.TABLE_NAME, tableName)
    .option("checkpointLocation", checkpointLocation)
    .outputMode(OutputMode.Append());
// Trigger a micro-batch every 30 seconds (interval given in milliseconds).
writer.trigger(Trigger.ProcessingTime(30000)).start(tablePath);
```

## DeltaStreamer Continuous Mode
Hudi DeltaStreamer provides a continuous ingestion mode where a single long-running Spark application
ingests data into a Hudi table continuously from upstream sources. In this mode, Hudi supports managing asynchronous
compactions. Here is an example snippet for running in continuous mode with async compactions:

```shell
spark-submit --packages org.apache.hudi:hudi-utilities-bundle_2.11:0.6.0 \
--class org.apache.hudi.utilities.deltastreamer.HoodieDeltaStreamer \
--table-type MERGE_ON_READ \
--target-base-path <hudi_base_path> \
--target-table <hudi_table> \
--source-class org.apache.hudi.utilities.sources.JsonDFSSource \
--source-ordering-field ts \
--schemaprovider-class org.apache.hudi.utilities.schema.FilebasedSchemaProvider \
--props /path/to/source.properties \
--continuous
```

## Hudi CLI
Hudi CLI is yet another way to execute specific compactions asynchronously. Here is an example:

```properties
hudi:trips->compaction run --tableName <table_name> --parallelism <parallelism> --compactionInstant <InstantTime>
...
```

## Hudi Compactor Script
Hudi also provides a standalone tool to execute specific compactions asynchronously. Here is an example:

```shell
spark-submit --packages org.apache.hudi:hudi-utilities-bundle_2.11:0.6.0 \
--class org.apache.hudi.utilities.HoodieCompactor \
--base-path <base_path> \
--table-name <table_name> \
--instant-time <compaction_instant> \
--schema-file <schema_file>
```
Binary file added docs/assets/images/blog/2020-08-20-per-record.png
Binary file added docs/assets/images/blog/2020-08-20-skeleton.png