### Como utilizar o Apache Hudi?

#### Objetivo: aprender a utilizar o Apache Hudi com PySpark
#### Data: 2022-09-15
#### Autor: Guilherme Gandolfi

In [6]:
%%configure -f
{
  "conf": {
    "spark.jars.packages": "org.apache.hudi:hudi-spark3.1-bundle_2.12:0.12.0",
    "spark.serializer": "org.apache.spark.serializer.KryoSerializer",
    "spark.sql.extensions": "org.apache.spark.sql.hudi.HoodieSparkSessionExtension"
  }
}

Starting Spark application


ID,YARN Application ID,Kind,State,Spark UI,Driver log,User,Current session?
5,,pyspark,idle,,,,✔


FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

SparkSession available as 'spark'.


ID,YARN Application ID,Kind,State,Spark UI,Driver log,User,Current session?
3,,pyspark,idle,,,,
5,,pyspark,idle,,,,✔


In [118]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import *
from pyspark.sql.types import *

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [119]:
# Obtain the SparkSession. A session already exists in this sparkmagic/Livy kernel,
# so getOrCreate() returns it instead of building a new one; the app name set here
# presumably does not rename that existing application.
spark_builder = SparkSession.builder.appName('bora_apache_hudi')
spark = spark_builder.getOrCreate()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [120]:
# Show the session object's default text representation to confirm `spark` is bound.
print(str(spark))

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

<pyspark.sql.session.SparkSession object at 0x7fd094faab90>

#### Carregando um DataFrame para operações de alteração dos dados

##### Base de dados utilizada:
##### Netflix Movies and TV Shows (https://www.kaggle.com/datasets/shivamb/netflix-shows)

In [121]:
%%sh
# List the input files, then print the absolute path of the import_bases/ directory
# (the location the CSV is read from in the load step).
# `pwd` takes no directory operand, so `pwd import_bases/` printed the current
# directory instead; cd into the directory inside a subshell and print that.
ls import_bases/
(cd import_bases/ && pwd)

netflix_titles.csv
/home/glue_user/workspace/jupyter_workspace


#### Load dataframe

In [122]:
# Explicit schema for netflix_titles.csv: every column is a nullable string except
# release_year, which is read as a nullable integer. The CSV reader applies a user
# schema by position, so this order must follow the file's column order.
schema = StructType([
    StructField(column_name, column_type, True)
    for column_name, column_type in [
        ("show_id", StringType()),
        ("type", StringType()),
        ("title", StringType()),
        ("director", StringType()),
        ("cast", StringType()),
        ("country", StringType()),
        ("date_added", StringType()),
        ("release_year", IntegerType()),
        ("rating", StringType()),
        ("duration", StringType()),
        ("listed_in", StringType()),
        ("description", StringType()),
    ]
])

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [123]:
# Load the Netflix titles CSV using the explicit `schema` defined in the previous cell.
# inferSchema is intentionally not passed: when a schema is supplied Spark uses it
# as-is and performs no type inference, so the old inferSchema='True' had no effect.
NETFLIX_CSV_PATH = '/home/glue_user/workspace/jupyter_workspace/import_bases/netflix_titles.csv'
df_netflix = spark.read.csv(NETFLIX_CSV_PATH, header=True, schema=schema)

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [124]:
# Print the DataFrame schema as a tree to confirm the explicit schema was applied to the CSV read.
df_netflix.printSchema()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

root
 |-- show_id: string (nullable = true)
 |-- type: string (nullable = true)
 |-- title: string (nullable = true)
 |-- director: string (nullable = true)
 |-- cast: string (nullable = true)
 |-- country: string (nullable = true)
 |-- date_added: string (nullable = true)
 |-- release_year: integer (nullable = true)
 |-- rating: string (nullable = true)
 |-- duration: string (nullable = true)
 |-- listed_in: string (nullable = true)
 |-- description: string (nullable = true)

### Write Files

In [125]:
# Hudi table name; it is passed both as hoodie.table.name and as the datasource
# write table name below.
tableName = 'netflix_new'

# Options passed to DataFrameWriter for every Hudi write in this notebook.
# Records are identified by show_id and stored in one partition directory per
# release_year. Writes use the 'upsert' operation, so a record whose show_id already
# exists is updated rather than duplicated.
# NOTE(review): the precombine field is the same column as the partition field, so all
# records of a partition share one precombine value and it cannot decide which version
# of a duplicated show_id wins. A real version/timestamp column would be a better choice.
hudi_options = dict([
    ('hoodie.table.name', tableName),
    ('hoodie.datasource.write.recordkey.field', 'show_id'),
    ('hoodie.datasource.write.partitionpath.field', 'release_year'),
    ('hoodie.datasource.write.table.name', tableName),
    ('hoodie.datasource.write.operation', 'upsert'),
    ('hoodie.datasource.write.precombine.field', 'release_year'),
    ('hoodie.upsert.shuffle.parallelism', 2),
    ('hoodie.insert.shuffle.parallelism', 2),
])

# Root directory of the Hudi table; partitions appear as sub-directories (e.g. 2021).
basePath = '/tmp/hudi_bases/netflix/'

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [126]:
# Keep only titles released in 2021 and create the Hudi table from them.
# Save mode "overwrite" recreates the table at basePath if it already exists, so
# re-running this cell starts from a fresh table (per the Hudi Spark quick start).
df_netflix_2021 = df_netflix.where(col('release_year') == 2021)
df_netflix_2021.write.save(basePath, format='hudi', mode='overwrite', **hudi_options)

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

### Verificando o particionamento dos arquivos

In [127]:
%%sh
# List the table root (one sub-directory per release_year value), then the data
# files of the 2021 partition. Plain `ls` hides dot-entries, so Hudi's metadata
# directory (presumably .hoodie) does not appear here.
TABLE_DIR=/tmp/hudi_bases/netflix
ls "$TABLE_DIR/"
ls "$TABLE_DIR/2021"

2021
30c7b8c0-74bc-48d2-9555-dba135b7c2fb-0_0-510-2031_20220916015619988.parquet


### Query data

In [128]:
# Read the Hudi table stored at basePath and expose it as the temp view
# 'df_netflix_hudi' so the following cells can query it with spark.sql.
df_netflix_hudi = spark.read.load(basePath, format='hudi')
df_netflix_hudi.createOrReplaceTempView('df_netflix_hudi')

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [129]:
# Print the schema of the table read back from Hudi: it contains Hudi's _hoodie_*
# metadata columns in addition to the CSV columns, and release_year (the partition
# field) is listed last.
df_netflix_hudi.printSchema()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

root
 |-- _hoodie_commit_time: string (nullable = true)
 |-- _hoodie_commit_seqno: string (nullable = true)
 |-- _hoodie_record_key: string (nullable = true)
 |-- _hoodie_partition_path: string (nullable = true)
 |-- _hoodie_file_name: string (nullable = true)
 |-- show_id: string (nullable = true)
 |-- type: string (nullable = true)
 |-- title: string (nullable = true)
 |-- director: string (nullable = true)
 |-- cast: string (nullable = true)
 |-- country: string (nullable = true)
 |-- date_added: string (nullable = true)
 |-- rating: string (nullable = true)
 |-- duration: string (nullable = true)
 |-- listed_in: string (nullable = true)
 |-- description: string (nullable = true)
 |-- release_year: integer (nullable = true)

In [130]:
# Show Hudi's per-record metadata (commit time, record key, partition path) for the
# first rows, then the total number of records in the table.
for query in (
    'select _hoodie_commit_time, _hoodie_record_key, _hoodie_partition_path from df_netflix_hudi',
    'select count (*) from df_netflix_hudi',
):
    spark.sql(query).show(10)

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

+-------------------+------------------+----------------------+
|_hoodie_commit_time|_hoodie_record_key|_hoodie_partition_path|
+-------------------+------------------+----------------------+
|  20220916015619988|             s1286|                  2021|
|  20220916015619988|             s8438|                  2021|
|  20220916015619988|              s645|                  2021|
|  20220916015619988|              s272|                  2021|
|  20220916015619988|              s654|                  2021|
|  20220916015619988|             s1343|                  2021|
|  20220916015619988|              s889|                  2021|
|  20220916015619988|              s256|                  2021|
|  20220916015619988|             s1349|                  2021|
|  20220916015619988|              s401|                  2021|
+-------------------+------------------+----------------------+
only showing top 10 rows

+--------+
|count(1)|
+--------+
|     589|
+--------+

### Update data

In [131]:
# Count records per `type` before the update, to compare with the same query after the upsert.
type_counts_sql = 'select type, count (*) from df_netflix_hudi group by type'
spark.sql(type_counts_sql).show(10)

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

+-------+--------+
|   type|count(1)|
+-------+--------+
|TV Show|     314|
|  Movie|     275|
+-------+--------+

In [132]:
# Change the type "TV Show" to "TV Show - Hudi" so the following upsert has a visible
# change to apply. The exact equality match keeps this cell idempotent. The previous
# unanchored regexp_replace('TV Show', ...) also matched inside "TV Show - Hudi".
# Re-running the cell would then produce "TV Show - Hudi - Hudi".
# Other values (e.g. "Movie") and nulls pass through unchanged via otherwise().
df_netflix_2021 = df_netflix_2021.withColumn(
    'type',
    when(col('type') == 'TV Show', lit('TV Show - Hudi')).otherwise(col('type')),
)

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [133]:
# Write the modified 2021 rows back to the same Hudi table. Save mode "append" keeps
# the existing table, and the 'upsert' operation in hudi_options updates rows whose
# show_id already exists instead of adding duplicates.
df_netflix_2021.write.save(basePath, format='hudi', mode='append', **hudi_options)

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [134]:
# Re-read the Hudi table after the upsert and re-register the 'df_netflix_hudi'
# temp view so the next query sees the updated data.
df_netflix_hudi = spark.read.load(basePath, format='hudi')
df_netflix_hudi.createOrReplaceTempView('df_netflix_hudi')

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [135]:
# Count records per `type` after the upsert; "TV Show" rows should now read "TV Show - Hudi".
type_counts_sql = 'select type, count (*) from df_netflix_hudi group by type'
spark.sql(type_counts_sql).show(10)

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

+--------------+--------+
|          type|count(1)|
+--------------+--------+
|TV Show - Hudi|     314|
|         Movie|     275|
+--------------+--------+

In [136]:
%%sh
# List the table root after the upsert: the update touched only release_year 2021,
# so no new partition directory is expected.
TABLE_DIR=/tmp/hudi_bases/netflix
ls "$TABLE_DIR/"

2021


In [137]:
%%sh
# List the 2021 partition's data files. Each Hudi commit's file name ends with that
# commit's timestamp, so a file from the upsert commit appears next to the file from
# the initial write.
PARTITION_DIR=/tmp/hudi_bases/netflix/2021
ls "$PARTITION_DIR"

30c7b8c0-74bc-48d2-9555-dba135b7c2fb-0_0-510-2031_20220916015619988.parquet
30c7b8c0-74bc-48d2-9555-dba135b7c2fb-0_0-554-2070_20220916015627680.parquet
