# Please enter a `USER_NUMBER` below:
## Use a combination of the first 3 letters of your name & number assigned to you as shown in the example below

In [None]:
# Run the shared helper notebook with IPython's %run magic.
# NOTE(review): presumably this defines the helpers and constants used by later cells
# (get_spark_session, listFiles, presto, INPUT, SANDBOX_BASE_PATH, ...) — confirm.
%run HudiUtilityFunctions.ipynb
# Per-user identifier; later cells append it to table names and output paths.
USER_NUMBER = "dip7"

In [None]:
# Obtain the Spark session used by every later cell; the string argument is passed to the
# get_spark_session helper (presumably as the application name — defined outside this notebook).
spark = get_spark_session("Hudi Presto workshop")

## Dataset File Location (in Parquet)

In [None]:
# Show the location of the source Parquet dataset (INPUT is defined outside this notebook).
print(INPUT)

In [None]:
# List the files under the source dataset location using the listFiles helper.
listFiles(INPUT)

# Part 1: Create Hudi Tables, Read using Presto

### First, we will create a Hudi Copy-on-Write table using a 1 GB dataset (Brooklyn TPC-DS) that is stored in an S3 bucket. We will write the Hudi table to an S3 bucket.
### A few of the important parameters:
- `hoodie.table.keygenerator.class` : The class that extracts a key out of incoming records. `ComplexKeyGenerator` supports record keys and partition paths that comprise more than one field
- `hoodie.datasource.write.recordkey.field` :  Value to be used as the recordKey component of HoodieKey
- `hoodie.datasource.hive_sync.enable` : When set to `True`, syncs the table to Hive metastore
- `hoodie.datasource.write.precombine.field` : Field used in preCombining before actual write. Helps in choosing latest version of a record when multiple versions with the same record key exist

### `mode(Overwrite)` : overwrites and recreates the table if it already exists.
### Here we are using the default write operation : `upsert`. If you have a workload without updates, you can also issue `insert` or `bulk_insert` operations which could be faster

### Copy-on-Write (CoW): Re-Writes a new Parquet file version for updates (synchronous merge during write)

In [None]:
# Create (or recreate) this user's Copy-on-Write demo table from the Parquet dataset.
TABLE_NAME_1 = 'presto_hudi_demo_cow_' + USER_NUMBER

# Source rows: the Parquet dataset located at INPUT.
df_cow = spark.read.parquet(INPUT)

# Per-user output location of the Hudi table.
PATH = SANDBOX_BASE_PATH + USER_NUMBER + "/output/hudi/1gb/" + TABLE_NAME_1

hudi_options = {
    'hoodie.table.name': TABLE_NAME_1,
    # Composite record key (two fields), hence ComplexKeyGenerator.
    'hoodie.table.keygenerator.class': "org.apache.hudi.keygen.ComplexKeyGenerator",
    'hoodie.datasource.write.hive_style_partitioning': "true",
    'hoodie.datasource.write.recordkey.field': "ss_item_sk,ss_ticket_number",
    # Sync the table to the Hive metastore so it can be queried by name.
    'hoodie.datasource.hive_sync.enable': 'true',
    'hoodie.datasource.hive_sync.mode': "hms",
    'hoodie.datasource.write.precombine.field': "ss_sold_time_sk",
    # 12 MiB max base file size and 10 MiB small-file limit.
    'hoodie.parquet.max.file.size': '12582912',
    'hoodie.parquet.small.file.limit': '10485760',
}
# NOTE(review): the lock configs are left disabled for this table, unlike the MoR cell — confirm intended.
# hudi_options.update(ZOOKEEPER_LOCK_CONFIGS)
hudi_options.update(DISABLE_TIMELINE_CONFIGS)
print(hudi_options)

# Drop any existing catalog entry before rewriting the table.
spark.sql("DROP TABLE IF EXISTS " + TABLE_NAME_1)

# A single mode("overwrite") is enough; the original chained the same call twice.
df_cow.write.format("hudi").options(**hudi_options).mode("overwrite").save(PATH)

### Check the list of Files

In [None]:
# List the files under PATH. PATH is reassigned by the MoR cell, so run this right after
# the CoW write to see the CoW table's files.
listFiles(PATH)

# Run Presto Queries

In [None]:
# Count the rows of the CoW table whose ca_location_type is 'condo' (via the presto helper).
presto(f"SELECT count(*) FROM {TABLE_NAME_1} where ca_location_type = 'condo'")

In [None]:
# Average ss_quantity over the 'condo' rows of the CoW table.
presto(f"SELECT AVG(ss_quantity) FROM {TABLE_NAME_1} where ca_location_type = 'condo'")

In [None]:
# Total sales price per ca_zip, keeping only zips that have exactly 3 distinct location types.
presto(f"""
SELECT SUM(ss_sales_price), ca_zip FROM {TABLE_NAME_1} 
GROUP BY ca_zip HAVING COUNT(DISTINCT ca_location_type) = 3""")

In [None]:
# Sales totals rolled up by (ca_zip, ca_location_type); the HAVING clause drops the
# rollup rows where ca_location_type is NULL.
presto(f"""
SELECT ca_location_type, ca_zip, SUM(ss_sales_price) FROM {TABLE_NAME_1} 
GROUP BY ROLLUP (ca_zip, ca_location_type) HAVING ca_location_type IS NOT NULL ORDER BY ca_zip""")

In [None]:
# Row count and average sales price per city/county for 'apartment' rows, ordered by county.
presto(f"""
SELECT COUNT(*), ca_city, ca_county, avg(ss_sales_price) from {TABLE_NAME_1} 
GROUP BY ca_location_type, ca_city, ca_county HAVING ca_location_type = 'apartment' ORDER BY ca_county""")

## 

# Create MoR Table

### Merge-on-Read (MoR): Stores data using a combination of Parquet + row-based log files. Updates are logged to log files & later compacted to produce new versions of columnar files synchronously or asynchronously.

In [None]:
# Create (or recreate) this user's Merge-on-Read demo table from the same Parquet dataset.
TABLE_NAME_2 = 'presto_hudi_demo_mor_' + USER_NUMBER

# Source rows: the Parquet dataset located at INPUT.
df_mor = spark.read.parquet(INPUT)

# Per-user output location of the Hudi table (this reassigns PATH from the CoW cell).
PATH = SANDBOX_BASE_PATH + USER_NUMBER + "/output/hudi/1gb/" + TABLE_NAME_2

hudi_options = {
    'hoodie.table.name': TABLE_NAME_2,
    # The table type is the only option that differs from the CoW table's base options.
    'hoodie.datasource.write.table.type': 'MERGE_ON_READ',
    # Composite record key (two fields), hence ComplexKeyGenerator.
    'hoodie.table.keygenerator.class': "org.apache.hudi.keygen.ComplexKeyGenerator",
    'hoodie.datasource.write.hive_style_partitioning': "true",
    'hoodie.datasource.write.recordkey.field': "ss_item_sk,ss_ticket_number",
    # Sync the table to the Hive metastore so it can be queried by name.
    'hoodie.datasource.hive_sync.enable': 'true',
    'hoodie.datasource.hive_sync.mode': "hms",
    'hoodie.datasource.write.precombine.field': "ss_sold_time_sk",
    # 12 MiB max base file size and 10 MiB small-file limit.
    'hoodie.parquet.max.file.size': '12582912',
    'hoodie.parquet.small.file.limit': '10485760',
}
hudi_options.update(ZOOKEEPER_LOCK_CONFIGS)
hudi_options.update(DISABLE_TIMELINE_CONFIGS)
print(hudi_options)

# Drop any existing catalog entry before rewriting the table.
spark.sql("DROP TABLE IF EXISTS " + TABLE_NAME_2)

# A single mode("overwrite") is enough; the original chained the same call twice.
df_mor.write.format("hudi").options(**hudi_options).mode("overwrite").save(PATH)

## 1. Run Real Time Queries using Presto
### Exposes near-real time data by merging the base and log files of the latest file slice on-the-fly

In [None]:
# Count 'condo' rows through the MoR table's `_rt` table name.
presto(f"SELECT count(*) FROM {TABLE_NAME_2}_rt where ca_location_type = 'condo'")

## 2. Run Read Optimized Queries using Presto
### Exposes only the base/columnar files in the latest file slices

In [None]:
# Count 'condo' rows through the MoR table's `_ro` table name.
presto(f"SELECT count(*) FROM {TABLE_NAME_2}_ro where ca_location_type = 'condo'")

# 

# Part 2: Run Clustering Service using Spark SQL

### A common challenge in analytical workloads is managing the discrepancy between arrival time and event time. We want to write data as it arrives, which means the data is laid out by 'arrival time'. As a result, the records a query needs can be scattered across many different files.

### So, query engines might have to read the entire set of data files. Even with predicate pushdown, we can end up with scanning a lot of data.

### Clustering is a data layout optimization technique. It is extremely important to organize & lay out your data in storage in an optimized way.


### [`Clustering`](https://hudi.apache.org/docs/next/clustering) in Hudi lets you deal with this.

### 2 Steps to cluster: 

- `Schedule`: create a clustering plan
- `Execute`: process the plan to create new files & replace old files


### Spark SQL Procedure: `run_clustering()`

### Parameters:
- `hoodie.clustering.async.max.commits` : Config to control frequency of async clustering
- `hoodie.clustering.plan.strategy.sort.columns` : Columns to sort the data by when clustering
- `hoodie.clustering.plan.strategy.small.file.limit` : Files smaller than the size specified here are candidates for clustering
- `hoodie.write.concurrency.mode` : Concurrency modes for write operations (OCC- Multiple writers can operate on the table with lazy conflict resolution using locks)

**scheduleandexecute: Make a clustering plan first and execute that plan immediately**

In [None]:
# Schedule and immediately execute clustering on the CoW table, sorting by ca_location_type.
# The `options` value is one comma-separated list of key=value pairs:
#   - small.file.limit=629145600 (600 MiB) and target.file.max.bytes=1073741824 (1 GiB)
#   - optimistic concurrency control with a ZooKeeper-based lock provider
# .show() prints the result rows returned by the procedure.
spark.sql(f"""
    CALL run_clustering(
        table => '{TABLE_NAME_1}',
        op => 'scheduleandexecute',
        options => 'hoodie.clustering.async.max.commits=4,
                    hoodie.clustering.plan.strategy.sort.columns=ca_location_type,
                    hoodie.clustering.plan.strategy.small.file.limit=629145600,
                    hoodie.clustering.plan.strategy.target.file.max.bytes=1073741824,
                    hoodie.write.markers.type=direct,
                    hoodie.embed.timeline.server=false,
                    hoodie.cleaner.policy.failed.writes=LAZY,
                    hoodie.write.concurrency.mode=optimistic_concurrency_control,
                    hoodie.write.lock.provider=org.apache.hudi.client.transaction.lock.ZookeeperBasedLockProvider,
                    hoodie.write.lock.zookeeper.url=zk-cs.hudi-infra.svc.cluster.local,
                    hoodie.write.lock.zookeeper.port=2181,
                    hoodie.write.lock.zookeeper.base_path=/test'
    )
""").show()

# Run Presto Queries after Clustering

In [None]:
# Same 'condo' count as before clustering, re-run for comparison.
presto(f"SELECT count(*) FROM {TABLE_NAME_1} where ca_location_type = 'condo'")

In [None]:
# Same average-quantity query as before clustering, re-run for comparison.
presto(f"SELECT AVG(ss_quantity) FROM {TABLE_NAME_1} where ca_location_type = 'condo'")

In [None]:
# Total sales price per ca_zip for zips with exactly 3 distinct location types (post-clustering run).
presto(f"""
SELECT SUM(ss_sales_price), ca_zip FROM {TABLE_NAME_1} 
GROUP BY ca_zip HAVING COUNT(DISTINCT ca_location_type) = 3""")

In [None]:
# Sales totals rolled up by (ca_zip, ca_location_type), excluding rows where
# ca_location_type is NULL (post-clustering run).
presto(f"""
SELECT ca_location_type, ca_zip, SUM(ss_sales_price) FROM {TABLE_NAME_1} 
GROUP BY ROLLUP (ca_zip, ca_location_type) HAVING ca_location_type IS NOT NULL ORDER BY ca_zip""")

In [None]:
# Row count and average sales price per city/county for 'apartment' rows (post-clustering run).
presto(f"""
SELECT COUNT(*), ca_city, ca_county, avg(ss_sales_price) from {TABLE_NAME_1} 
GROUP BY ca_location_type, ca_city, ca_county HAVING ca_location_type = 'apartment' ORDER BY ca_county""")

### We can see that even at a scale of 1 GB, there are differences in query execution time. Imagine a real-world scenario with TB- or PB-scale datasets.

### PrestoUI provides clarity on the number of records scanned by the query engine. Let's check the Presto UI

## 

# Part 3: Using Hudi's Metadata Table with Queries

### To get the current state of a table, costly file-listing operations on S3 have to be performed while reading or writing the table. This can be avoided by enabling and using the Hudi metadata table's index features.

- `key = "hive.hudi_metadata_enabled"`
- `key = "hudi.hudi_metadata_table_enabled"`

### This creates a files index under the table path's `.hoodie/metadata/files` prefix and avoids costly file listings during reads or writes by reading the file list from that prefix instead of scanning the whole table prefix

# Run Presto Query with Metadata table enabled

In [None]:
# Run the 'condo' count through the presto_with_metadata_enabled helper
# (defined outside this notebook; presumably sets the metadata-table session keys — confirm).
presto_with_metadata_enabled(f"SELECT count(*) FROM {TABLE_NAME_1} where ca_location_type = 'condo'")

In [None]:
# Run the average-quantity query through the presto_with_metadata_enabled helper.
presto_with_metadata_enabled(f"SELECT AVG(ss_quantity) FROM {TABLE_NAME_1} where ca_location_type = 'condo'")

# 

# Additional Operations

## Perform Record-Level Writes: UPDATE

### Set the Timeline server to `True` to rely on the timeline server for metadata transactions and to ensure data consistency.

In [None]:
# Enable the embedded timeline server for the session before the record-level write.
# The value must be unquoted: Spark's SET stores the raw text after '=', so 'true'
# (with quotes) would be stored as a quoted string that does not parse as boolean true.
spark.sql("SET hoodie.embed.timeline.server=true")

In [None]:
# Preview five rows of the CoW table as a pandas DataFrame.
spark.sql(f"SELECT * from {TABLE_NAME_1} LIMIT 5").toPandas()

In [None]:
# Look up one record by its two record-key fields; ss_list_price is selected first so the
# value to be updated is easy to spot.
spark.sql(f"SELECT ss_list_price, * FROM {TABLE_NAME_1} WHERE ss_item_sk = 17587 AND ss_ticket_number = 129234").toPandas()

## Run a record-level `UPDATE` on the Data Lake

In [None]:
# Set ss_list_price to 150.0 for the single record identified by its two record-key fields.
# The trailing ';' keeps the notebook from echoing the returned DataFrame.
spark.sql(f'''
UPDATE {TABLE_NAME_1}
SET ss_list_price = 150.0
WHERE ss_item_sk = 17587 AND ss_ticket_number = 129234''');


### Check if the record changed

In [None]:
# Re-read the same record; ss_list_price should now show 150.0.
spark.sql(f"SELECT ss_list_price, * FROM {TABLE_NAME_1} WHERE ss_item_sk = 17587 AND ss_ticket_number = 129234").toPandas()

## Clustering Strategies: Z-order/Hilbert (Multi-dimensional clustering)

In [None]:
# Schedule and immediately execute z-order clustering on the MoR table over two columns.
# The sort columns go through the procedure's `order` parameter, not through `options`:
# `options` is split on commas into key=value pairs, so a comma-separated column list
# placed there is broken apart (only the first column is kept, and the remainder is
# merged with the next entry, which is then lost).
# NOTE(review): the procedure is pointed at the `_rt` table name — confirm that it
# resolves to the MoR table's storage location in this environment.
spark.sql(f"""
    CALL run_clustering(
        table => '{TABLE_NAME_2}_rt',
        op => 'scheduleandexecute',
        order => 'ca_location_type,c_birth_country',
        order_strategy => 'z-order',
        options => 'hoodie.clustering.async.max.commits=4,
                    hoodie.clustering.plan.strategy.small.file.limit=629145600,
                    hoodie.clustering.plan.strategy.target.file.max.bytes=1073741824,
                    hoodie.write.markers.type=direct,
                    hoodie.embed.timeline.server=false,
                    hoodie.cleaner.policy.failed.writes=LAZY,
                    hoodie.write.concurrency.mode=optimistic_concurrency_control,
                    hoodie.write.lock.provider=org.apache.hudi.client.transaction.lock.ZookeeperBasedLockProvider,
                    hoodie.write.lock.zookeeper.url=zk-cs.hudi-infra.svc.cluster.local,
                    hoodie.write.lock.zookeeper.port=2181,
                    hoodie.write.lock.zookeeper.base_path=/test'
    )
""").show()

In [None]:
# Count 'condo' rows through the MoR table's `_rt` name after z-order clustering.
presto(f"SELECT count(*) FROM {TABLE_NAME_2}_rt where ca_location_type = 'condo'")

In [None]:
# Count rows filtered on both clustering columns (ca_location_type and c_birth_country).
presto(f"SELECT count(*) FROM {TABLE_NAME_2}_rt where ca_location_type = 'condo' and c_birth_country = 'UNITED STATES'")

# Notebook Ends