# Apache Hudi Core Conceptions (6) - COW: Bloom Index

## 1. Configuration

In [1]:
%%configure -f
{
    "conf": {
        "spark.serializer": "org.apache.spark.serializer.KryoSerializer",
        "spark.sql.extensions": "org.apache.spark.sql.hudi.HoodieSparkSessionExtension",
        "spark.sql.catalog.spark_catalog": "org.apache.spark.sql.hudi.catalog.HoodieCatalog",
        "spark.jars": "hdfs:///tmp/hudi-spark-bundle.jar"
    }
}

ID,YARN Application ID,Kind,State,Spark UI,Driver log,User,Current session?
102,application_1677377031637_0141,spark,idle,Link,Link,,


In [2]:
%%sh
# Stop at the first failing command so a broken deployment surfaces here,
# rather than later as an obscure Spark or hudi-stat.sh error.
set -e
# deploy hudi bundle jar to the HDFS path referenced by "spark.jars" in the %%configure cell
hdfs dfs -copyFromLocal -f /usr/lib/hudi/hudi-spark-bundle.jar /tmp/hudi-spark-bundle.jar
hdfs dfs -ls /tmp/hudi-spark-bundle.jar
# deploy hudi-stat.sh - a utility shell script that later cells run to print table state.
# `-q` replaces the bash-only `&>/dev/null` (plain POSIX sh parses `&>` as "run in background,
# then redirect"), and a failed download is now reported instead of being silently ignored.
wget -q -O ~/hudi-stat.sh https://github.com/bluishglc/hudi-core-conceptions/releases/download/v1.0/hudi-stat.sh \
    || { echo "ERROR: failed to download hudi-stat.sh" >&2; exit 1; }
chmod a+x ~/hudi-stat.sh
ls ~/hudi-stat.sh

-rw-r--r--   1 emr-notebook hdfsadmingroup   61421977 2023-03-04 08:24 /tmp/hudi-spark-bundle.jar
/home/emr-notebook/hudi-stat.sh


In [3]:
%%html
<!-- Float rendered tables (e.g. the KEY|VALUE option tables in this notebook) to the left. -->
<style>
    table { float: left; }
</style>

## 2. Test Case 1 - Gradually Increasing

如果要观察到符合预期的文件创建和split行为，需保证如下两点：
1. 每条记录的大小应基本相同 => 否则Hudi估算出的单条记录平均大小会很不准确，进而影响Hudi对输入数据记录数的估算，导致无法按预期进行文件split
2. 手动设置记录的平均大小 => 便于Hudi准确估算记录数，以便进行符合预期的文件split

[0MB, 100MB) -> [100MB, 120MB) -> [120MB, )

KEY|VALUE
:---|:---
hoodie.index.type|BLOOM
hoodie.copyonwrite.record.size.estimate|175

### 2.1. Set Variables

In [4]:
%%sql
-- Session variable that later SQL cells read back through Spark SQL variable substitution.
SET TABLE_NAME=reviews_cow_bloom_1

Starting Spark application


ID,YARN Application ID,Kind,State,Spark UI,Driver log,User,Current session?
103,application_1677377031637_0142,spark,idle,Link,Link,,✔


FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

SparkSession available as 'spark'.


FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

VBox(children=(HBox(children=(HTML(value='Type:'), Button(description='Table', layout=Layout(width='70px'), st…

Output()

In [5]:
%env TABLE_NAME reviews_cow_bloom_1
# Mirror the SQL variable into the process environment so the %%sh cells can use $TABLE_NAME.

env: TABLE_NAME=reviews_cow_bloom_1


In [6]:
%%sql
-- Storage location of the test table, read back through variable substitution by CREATE TABLE.
SET TABLE_PATH=s3://glc-examples/hudi-core-conceptions/reviews_cow_bloom_1

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

VBox(children=(HBox(children=(HTML(value='Type:'), Button(description='Table', layout=Layout(width='70px'), st…

Output()

In [7]:
%env TABLE_PATH s3://glc-examples/hudi-core-conceptions/reviews_cow_bloom_1
# Mirror the SQL variable into the process environment so the %%sh cells can use $TABLE_PATH.

env: TABLE_PATH=s3://glc-examples/hudi-core-conceptions/reviews_cow_bloom_1


### 2.2. Create Table

In [8]:
%%sh
# Reset state left by a previous run of this test case.
# Fail fast if the %env cells were skipped: with an empty TABLE_NAME the rm below
# would expand to `rm -rf ~/` and wipe the whole home directory (including hudi-stat.sh).
: "${TABLE_PATH:?TABLE_PATH is not set - run the %env cells first}"
: "${TABLE_NAME:?TABLE_NAME is not set - run the %env cells first}"
basename "$TABLE_PATH"
# Delete the table's files on S3; output and errors are discarded, as before
# (POSIX redirection instead of the bash-only `&>`).
aws s3 rm "$TABLE_PATH" --recursive >/dev/null 2>&1
# Remove the local directory named after the table.
rm -rf ~/"${TABLE_NAME}"
# Short pause before the table is recreated; presumably lets the S3 deletes settle - TODO confirm it is still needed.
sleep 5

reviews_cow_bloom_1


In [9]:
%%sql
-- Remove the table definition left over from a previous run, if there is one.
DROP TABLE IF EXISTS ${TABLE_NAME}

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

VBox(children=(HBox(), EncodingWidget(children=(VBox(children=(HTML(value='Encoding:'), Dropdown(description='…

Output()

In [10]:
%%sql
-- Copy-on-write Hudi table keyed by review_id, partitioned by parity,
-- using the Bloom index and an explicit per-record size estimate.
CREATE TABLE IF NOT EXISTS ${TABLE_NAME} (
    review_id   STRING,
    star_rating INT,
    review_body STRING,
    review_date DATE,
    year        LONG,
    timestamp   LONG,
    parity      INT
)
USING hudi
LOCATION '${TABLE_PATH}'
PARTITIONED BY (parity)
OPTIONS (
    type = 'cow',
    -- record key, plus the field that decides which duplicate wins
    primaryKey = 'review_id',
    preCombineField = 'timestamp',
    hoodie.index.type = 'BLOOM',
    -- fixed average record size in bytes, so Hudi's record-count estimate is predictable
    hoodie.copyonwrite.record.size.estimate = '175'
);

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

VBox(children=(HBox(), EncodingWidget(children=(VBox(children=(HTML(value='Encoding:'), Dropdown(description='…

Output()

### 2.3. Batch 1 - Insert ( 0 -> 96MB / Partition )

In [11]:
%%sql
-- Batch 1: load the 2003 reviews. The precombine field is the load time in epoch seconds,
-- and each row goes to partition 0 or 1 by the parity of CRC32(review_id).
INSERT INTO ${TABLE_NAME}
SELECT
    review_id,
    star_rating,
    review_body,
    review_date,
    year,
    UNIX_TIMESTAMP(CURRENT_TIMESTAMP()) AS timestamp,
    MOD(CRC32(review_id), 2)            AS parity
FROM reviews
WHERE year = 2003;

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

VBox(children=(HBox(), EncodingWidget(children=(VBox(children=(HTML(value='Encoding:'), Dropdown(description='…

Output()

In [12]:
%%sh
# Print the table's timeline, commits and storage via the helper deployed at the top.
# The :? guard stops with a clear message if the %env TABLE_PATH cell was not run; without it
# the arguments shift and hudi-stat.sh would presumably read "timeline" as the table path.
~/hudi-stat.sh "${TABLE_PATH:?TABLE_PATH is not set - run the %env cells first}" timeline commits storage


[ TIMELINE ]

╔═════╤═══════════════════╤════════╤═══════════╤═════════════╤═════════════╤═════════════╗
║ No. │ Instant           │ Action │ State     │ Requested   │ Inflight    │ Completed   ║
║     │                   │        │           │ Time        │ Time        │ Time        ║
╠═════╪═══════════════════╪════════╪═══════════╪═════════════╪═════════════╪═════════════╣
║ 0   │ 20230304082527210 │ commit │ COMPLETED │ 03-04 08:25 │ 03-04 08:25 │ 03-04 08:26 ║
╚═════╧═══════════════════╧════════╧═══════════╧═════════════╧═════════════╧═════════════╝

[ COMMITS ]

╔═══════════════════╤═════════════════════╤═══════════════════╤═════════════════════╤══════════════════════════╤═══════════════════════╤══════════════════════════════╤══════════════╗
║ CommitTime        │ Total Bytes Written │ Total Files Added │ Total Files Updated │ Total Partitions Written │ Total Records Written │ Total Update Records Written │ Total Errors ║
╠═══════════════════╪═════════════════════╪════════════════

### 2.4. Batch 2 - Insert ( 96MB -> 110MB / Partition )

In [13]:
%%sql
-- Batch 2: append the 1998 reviews. The precombine field and the parity partitioning
-- are computed exactly as in batch 1.
INSERT INTO ${TABLE_NAME}
SELECT
    review_id,
    star_rating,
    review_body,
    review_date,
    year,
    UNIX_TIMESTAMP(CURRENT_TIMESTAMP()) AS timestamp,
    MOD(CRC32(review_id), 2)            AS parity
FROM reviews
WHERE year = 1998;

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

VBox(children=(HBox(), EncodingWidget(children=(VBox(children=(HTML(value='Encoding:'), Dropdown(description='…

Output()

In [14]:
%%sh
# Print the table's timeline, commits and storage via the helper deployed at the top.
# The :? guard stops with a clear message if the %env TABLE_PATH cell was not run; without it
# the arguments shift and hudi-stat.sh would presumably read "timeline" as the table path.
~/hudi-stat.sh "${TABLE_PATH:?TABLE_PATH is not set - run the %env cells first}" timeline commits storage


[ TIMELINE ]

╔═════╤═══════════════════╤════════╤═══════════╤═════════════╤═════════════╤═════════════╗
║ No. │ Instant           │ Action │ State     │ Requested   │ Inflight    │ Completed   ║
║     │                   │        │           │ Time        │ Time        │ Time        ║
╠═════╪═══════════════════╪════════╪═══════════╪═════════════╪═════════════╪═════════════╣
║ 0   │ 20230304082527210 │ commit │ COMPLETED │ 03-04 08:25 │ 03-04 08:25 │ 03-04 08:26 ║
╟─────┼───────────────────┼────────┼───────────┼─────────────┼─────────────┼─────────────╢
║ 1   │ 20230304082751852 │ commit │ COMPLETED │ 03-04 08:27 │ 03-04 08:28 │ 03-04 08:28 ║
╚═════╧═══════════════════╧════════╧═══════════╧═════════════╧═════════════╧═════════════╝

[ COMMITS ]

╔═══════════════════╤═════════════════════╤═══════════════════╤═════════════════════╤══════════════════════════╤═══════════════════════╤══════════════════════════════╤══════════════╗
║ CommitTime        │ Total Bytes Written │ Total Files Adde

### 2.5. Batch 3 - Insert ( 110MB -> 113.7MB / Partition )

In [15]:
%%sql
-- Batch 3: append the 1997 reviews. The precombine field and the parity partitioning
-- are computed exactly as in the earlier batches.
INSERT INTO ${TABLE_NAME}
SELECT
    review_id,
    star_rating,
    review_body,
    review_date,
    year,
    UNIX_TIMESTAMP(CURRENT_TIMESTAMP()) AS timestamp,
    MOD(CRC32(review_id), 2)            AS parity
FROM reviews
WHERE year = 1997;

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

VBox(children=(HBox(), EncodingWidget(children=(VBox(children=(HTML(value='Encoding:'), Dropdown(description='…

Output()

In [16]:
%%sh
# Print the table's timeline, commits and storage via the helper deployed at the top.
# The :? guard stops with a clear message if the %env TABLE_PATH cell was not run; without it
# the arguments shift and hudi-stat.sh would presumably read "timeline" as the table path.
~/hudi-stat.sh "${TABLE_PATH:?TABLE_PATH is not set - run the %env cells first}" timeline commits storage


[ TIMELINE ]

╔═════╤═══════════════════╤════════╤═══════════╤═════════════╤═════════════╤═════════════╗
║ No. │ Instant           │ Action │ State     │ Requested   │ Inflight    │ Completed   ║
║     │                   │        │           │ Time        │ Time        │ Time        ║
╠═════╪═══════════════════╪════════╪═══════════╪═════════════╪═════════════╪═════════════╣
║ 0   │ 20230304082527210 │ commit │ COMPLETED │ 03-04 08:25 │ 03-04 08:25 │ 03-04 08:26 ║
╟─────┼───────────────────┼────────┼───────────┼─────────────┼─────────────┼─────────────╢
║ 1   │ 20230304082751852 │ commit │ COMPLETED │ 03-04 08:27 │ 03-04 08:28 │ 03-04 08:28 ║
╟─────┼───────────────────┼────────┼───────────┼─────────────┼─────────────┼─────────────╢
║ 2   │ 20230304082906058 │ commit │ COMPLETED │ 03-04 08:29 │ 03-04 08:29 │ 03-04 08:29 ║
╚═════╧═══════════════════╧════════╧═══════════╧═════════════╧═════════════╧═════════════╝

[ COMMITS ]

╔═══════════════════╤═════════════════════╤══════════════════

## 3. Test Case 2 - Sharply Increasing

如果要观察到符合预期的文件创建和split行为，需保证如下两点：
1. 每条记录的大小应基本相同 => 否则Hudi估算出的单条记录平均大小会很不准确，进而影响Hudi对输入数据记录数的估算，导致无法按预期进行文件split
2. 手动设置记录的平均大小 => 便于Hudi准确估算记录数，以便进行符合预期的文件split

[0MB, 100MB) -> [100MB, 120MB) -> [120MB, )

KEY|VALUE
:---|:---
hoodie.index.type|BLOOM
hoodie.copyonwrite.record.size.estimate|175

### 3.1. Set Variables

In [17]:
%%sql
-- Session variable that later SQL cells read back through Spark SQL variable substitution.
SET TABLE_NAME=reviews_cow_bloom_2

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

VBox(children=(HBox(children=(HTML(value='Type:'), Button(description='Table', layout=Layout(width='70px'), st…

Output()

In [18]:
%env TABLE_NAME reviews_cow_bloom_2
# Mirror the SQL variable into the process environment so the %%sh cells can use $TABLE_NAME.

env: TABLE_NAME=reviews_cow_bloom_2


In [19]:
%%sql
-- Storage location of the test table, read back through variable substitution by CREATE TABLE.
SET TABLE_PATH=s3://glc-examples/hudi-core-conceptions/reviews_cow_bloom_2

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

VBox(children=(HBox(children=(HTML(value='Type:'), Button(description='Table', layout=Layout(width='70px'), st…

Output()

In [20]:
%env TABLE_PATH s3://glc-examples/hudi-core-conceptions/reviews_cow_bloom_2
# Mirror the SQL variable into the process environment so the %%sh cells can use $TABLE_PATH.

env: TABLE_PATH=s3://glc-examples/hudi-core-conceptions/reviews_cow_bloom_2


### 3.2. Create Table

In [21]:
%%sh
# Reset state left by a previous run of this test case.
# Fail fast if the %env cells were skipped: with an empty TABLE_NAME the rm below
# would expand to `rm -rf ~/` and wipe the whole home directory (including hudi-stat.sh).
: "${TABLE_PATH:?TABLE_PATH is not set - run the %env cells first}"
: "${TABLE_NAME:?TABLE_NAME is not set - run the %env cells first}"
basename "$TABLE_PATH"
# Delete the table's files on S3; output and errors are discarded, as before
# (POSIX redirection instead of the bash-only `&>`).
aws s3 rm "$TABLE_PATH" --recursive >/dev/null 2>&1
# Remove the local directory named after the table.
rm -rf ~/"${TABLE_NAME}"
# Short pause before the table is recreated; presumably lets the S3 deletes settle - TODO confirm it is still needed.
sleep 5

reviews_cow_bloom_2


In [22]:
%%sql
-- Remove the table definition left over from a previous run, if there is one.
DROP TABLE IF EXISTS ${TABLE_NAME}

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

VBox(children=(HBox(), EncodingWidget(children=(VBox(children=(HTML(value='Encoding:'), Dropdown(description='…

Output()

In [23]:
%%sql
-- Copy-on-write Hudi table keyed by review_id, partitioned by parity,
-- using the Bloom index and an explicit per-record size estimate.
CREATE TABLE IF NOT EXISTS ${TABLE_NAME} (
    review_id   STRING,
    star_rating INT,
    review_body STRING,
    review_date DATE,
    year        LONG,
    timestamp   LONG,
    parity      INT
)
USING hudi
LOCATION '${TABLE_PATH}'
PARTITIONED BY (parity)
OPTIONS (
    type = 'cow',
    -- record key, plus the field that decides which duplicate wins
    primaryKey = 'review_id',
    preCombineField = 'timestamp',
    hoodie.index.type = 'BLOOM',
    -- fixed average record size in bytes, so Hudi's record-count estimate is predictable
    hoodie.copyonwrite.record.size.estimate = '175'
);

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

VBox(children=(HBox(), EncodingWidget(children=(VBox(children=(HTML(value='Encoding:'), Dropdown(description='…

Output()

### 3.3. Batch 1 - Insert ( 0 -> 341MB / Partition )

In [24]:
%%sql
-- Batch 1: load the 2010 reviews in one large insert. The precombine field is the load time
-- in epoch seconds, and each row goes to partition 0 or 1 by the parity of CRC32(review_id).
INSERT INTO ${TABLE_NAME}
SELECT
    review_id,
    star_rating,
    review_body,
    review_date,
    year,
    UNIX_TIMESTAMP(CURRENT_TIMESTAMP()) AS timestamp,
    MOD(CRC32(review_id), 2)            AS parity
FROM reviews
WHERE year = 2010;

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

VBox(children=(HBox(), EncodingWidget(children=(VBox(children=(HTML(value='Encoding:'), Dropdown(description='…

Output()

In [25]:
%%sh
# Print the table's timeline, commits and storage via the helper deployed at the top.
# The :? guard stops with a clear message if the %env TABLE_PATH cell was not run; without it
# the arguments shift and hudi-stat.sh would presumably read "timeline" as the table path.
~/hudi-stat.sh "${TABLE_PATH:?TABLE_PATH is not set - run the %env cells first}" timeline commits storage


[ TIMELINE ]

╔═════╤═══════════════════╤════════╤═══════════╤═════════════╤═════════════╤═════════════╗
║ No. │ Instant           │ Action │ State     │ Requested   │ Inflight    │ Completed   ║
║     │                   │        │           │ Time        │ Time        │ Time        ║
╠═════╪═══════════════════╪════════╪═══════════╪═════════════╪═════════════╪═════════════╣
║ 0   │ 20230304083130910 │ commit │ COMPLETED │ 03-04 08:31 │ 03-04 08:32 │ 03-04 08:32 ║
╚═════╧═══════════════════╧════════╧═══════════╧═════════════╧═════════════╧═════════════╝

[ COMMITS ]

╔═══════════════════╤═════════════════════╤═══════════════════╤═════════════════════╤══════════════════════════╤═══════════════════════╤══════════════════════════════╤══════════════╗
║ CommitTime        │ Total Bytes Written │ Total Files Added │ Total Files Updated │ Total Partitions Written │ Total Records Written │ Total Update Records Written │ Total Errors ║
╠═══════════════════╪═════════════════════╪════════════════

## 4. Test Case 3 - Default Settings

### 4.1. Set Variables

In [None]:
%%sql
-- Session variable that later SQL cells read back through Spark SQL variable substitution.
SET TABLE_NAME=reviews_cow_bloom_3

In [None]:
%env TABLE_NAME reviews_cow_bloom_3
# Mirror the SQL variable into the process environment so the %%sh cells can use $TABLE_NAME.

In [None]:
%%sql
-- Storage location of the test table, read back through variable substitution by CREATE TABLE.
SET TABLE_PATH=s3://glc-examples/hudi-core-conceptions/reviews_cow_bloom_3

In [None]:
%env TABLE_PATH s3://glc-examples/hudi-core-conceptions/reviews_cow_bloom_3
# Mirror the SQL variable into the process environment so the %%sh cells can use $TABLE_PATH.

### 4.2. Create Table

In [None]:
%%sh
# Reset state left by a previous run of this test case.
# Fail fast if the %env cells were skipped: with an empty TABLE_NAME the rm below
# would expand to `rm -rf ~/` and wipe the whole home directory (including hudi-stat.sh).
: "${TABLE_PATH:?TABLE_PATH is not set - run the %env cells first}"
: "${TABLE_NAME:?TABLE_NAME is not set - run the %env cells first}"
basename "$TABLE_PATH"
# Delete the table's files on S3; output and errors are discarded, as before
# (POSIX redirection instead of the bash-only `&>`).
aws s3 rm "$TABLE_PATH" --recursive >/dev/null 2>&1
# Remove the local directory named after the table.
rm -rf ~/"${TABLE_NAME}"
# Short pause before the table is recreated; presumably lets the S3 deletes settle - TODO confirm it is still needed.
sleep 5

In [None]:
%%sql
-- Remove the table definition left over from a previous run, if there is one.
DROP TABLE IF EXISTS ${TABLE_NAME}

In [None]:
%%sql
-- Same copy-on-write, Bloom-indexed layout as the earlier test cases, but the per-record
-- size estimate is deliberately NOT set, so this run uses Hudi's default estimation.
CREATE TABLE IF NOT EXISTS ${TABLE_NAME} (
    review_id   STRING,
    star_rating INT,
    review_body STRING,
    review_date DATE,
    year        LONG,
    timestamp   LONG,
    parity      INT
)
USING hudi
LOCATION '${TABLE_PATH}'
PARTITIONED BY (parity)
OPTIONS (
    type = 'cow',
    -- record key, plus the field that decides which duplicate wins
    primaryKey = 'review_id',
    preCombineField = 'timestamp',
    hoodie.index.type = 'BLOOM'
    -- hoodie.copyonwrite.record.size.estimate = '175'   (disabled on purpose for this test case)
);

### 4.3. Batch 1 - Insert ( 0 -> 96MB / Partition )

In [None]:
%%sql
-- Batch 1: load the 2003 reviews. The precombine field is the load time in epoch seconds,
-- and each row goes to partition 0 or 1 by the parity of CRC32(review_id).
INSERT INTO ${TABLE_NAME}
SELECT
    review_id,
    star_rating,
    review_body,
    review_date,
    year,
    UNIX_TIMESTAMP(CURRENT_TIMESTAMP()) AS timestamp,
    MOD(CRC32(review_id), 2)            AS parity
FROM reviews
WHERE year = 2003;

In [None]:
%%sh
# Print the table's timeline, commits and storage via the helper deployed at the top.
# The :? guard stops with a clear message if the %env TABLE_PATH cell was not run; without it
# the arguments shift and hudi-stat.sh would presumably read "timeline" as the table path.
~/hudi-stat.sh "${TABLE_PATH:?TABLE_PATH is not set - run the %env cells first}" timeline commits storage

### 4.4. Batch 2 - Insert ( 96MB -> 417MB / Partition )

In [None]:
%%sql
-- Batch 2: append the 2010 reviews in one large insert. The precombine field and the parity
-- partitioning are computed exactly as in batch 1.
INSERT INTO ${TABLE_NAME}
SELECT
    review_id,
    star_rating,
    review_body,
    review_date,
    year,
    UNIX_TIMESTAMP(CURRENT_TIMESTAMP()) AS timestamp,
    MOD(CRC32(review_id), 2)            AS parity
FROM reviews
WHERE year = 2010;

In [None]:
%%sh
# Print the table's timeline, commits and storage via the helper deployed at the top.
# The :? guard stops with a clear message if the %env TABLE_PATH cell was not run; without it
# the arguments shift and hudi-stat.sh would presumably read "timeline" as the table path.
~/hudi-stat.sh "${TABLE_PATH:?TABLE_PATH is not set - run the %env cells first}" timeline commits storage