# Merge-on-Read (MoR)

## Set up the PySpark kernel

In [None]:
%%configure -f
{
    "conf": {
        "spark.jars":"hdfs:///apps/hudi/lib/hudi-spark-bundle.jar,hdfs:///apps/hudi/lib/spark-avro.jar",
        "spark.serializer":"org.apache.spark.serializer.KryoSerializer",
        "spark.sql.hive.convertMetastoreParquet":"false"
    }
}

## Read and clean the sample data

In [None]:
# Load the sample trip records from S3. The first CSV row supplies the column
# names and Spark infers each column's type from the data.
raw = (
    spark.read
    .options(header=True, inferSchema=True)
    .csv('s3://aws-data-analytics-blog/emrimmersionday/tripdata.csv')
)

In [None]:
# Mark the raw DataFrame for caching, then print its inferred schema.
# cache() hands back the same DataFrame, so the two calls can be chained.
raw.cache().printSchema()

In [None]:
# Preview a single record, printed one field per line.
raw.show(n=1, vertical=True)

In [None]:
from pyspark.sql.functions import (
    unix_timestamp, col, from_unixtime,
    year, month, dayofmonth, hour, row_number
)
from pyspark.sql import Window

# Clean the raw trip records and assign each row a sequential `id`.
#
#   * `ehail_fee`: null values are replaced with '0', then the column is cast
#     to int.
#   * pickup/dropoff: strings in 'M/d/yy H:mm' form are parsed to epoch
#     seconds, rendered back as a timestamp string and cast to a timestamp.
#   * `_year`, `_month`, `_day`, `_hour`: derived from the pickup timestamp.
#   * `id`: row_number() over the whole dataset ordered by (pickup, dropoff).
#
# The window has no partitionBy and carries its own ordering, so Spark sorts
# the full dataset for it. A DataFrame-level orderBy in front of the window
# would only add a second global sort and is therefore not used.
#
# NOTE(review): rows sharing the same pickup and dropoff values are numbered
# in an arbitrary order, and `df` is not persisted, so that order may differ
# each time `df` is recomputed -- confirm this is acceptable wherever `id`
# serves as the Hudi record key.
df = raw.na.fill(
        {'ehail_fee': '0'}
    ).withColumn(
        'ehail_fee', col('ehail_fee').cast('int')
    ).withColumn(
        'lpep_pickup_datetime',
        from_unixtime(
            unix_timestamp(col('lpep_pickup_datetime'), 'M/d/yy H:mm')
        ).cast('timestamp')
    ).withColumn(
        'lpep_dropoff_datetime',
        from_unixtime(
            unix_timestamp(col('lpep_dropoff_datetime'), 'M/d/yy H:mm')
        ).cast('timestamp')
    ).withColumn(
        '_year', year(col('lpep_pickup_datetime'))
    ).withColumn(
        '_month', month(col('lpep_pickup_datetime'))
    ).withColumn(
        '_day', dayofmonth(col('lpep_pickup_datetime'))
    ).withColumn(
        '_hour', hour(col('lpep_pickup_datetime'))
    ).withColumn(
        'id',
        row_number().over(
            Window.orderBy('lpep_pickup_datetime', 'lpep_dropoff_datetime')
        )
    )

# Print the cleaned schema, then preview ten rows limited to the key, the
# timestamps, the derived date parts and a few trip measures.
df.printSchema()

df.select([
    'id',
    'VendorID',
    'lpep_pickup_datetime',
    'lpep_dropoff_datetime',
    '_year',
    '_month',
    '_day',
    '_hour',
    'ehail_fee',
    'passenger_count',
    'trip_distance',
]).show(n=10)

In [None]:
# Collect the distinct `_hour` values, in ascending order, as a list of Rows.
hourly = (
    df.select('_hour')
    .distinct()
    .sort('_hour')
    .collect()
)
hourly

In [None]:
# One filtered DataFrame per collected hour, in the same order as `hourly`.
hourly_df = [
    df.filter(f"_hour = {hour_row['_hour']}")
    for hour_row in hourly
]

## Create MoR table

In [None]:
# Build a DataFrame with df's schema on top of an empty RDD, so it has the
# table's columns but no rows.
empty_df = spark.createDataFrame(
    data=spark.sparkContext.emptyRDD(),
    schema=df.schema,
)

# Show that the schema carried over.
empty_df.printSchema()

# Row count of the empty DataFrame.
empty_df.count()

In [None]:
# Catalog coordinates and storage type of the Hudi table.
database_name = 'default'
table_name = 'mor_tripdata'
table_type = 'MERGE_ON_READ'

# S3 bucket holding the table; left blank here, so it has to be filled in
# before base_path points at a usable location.
bucket_name = ''
base_path = f's3://{bucket_name}/hudi/{table_name}'

# The writer options are grouped by concern and merged, in this order, into
# the single `hudi_options` dict used by every write below.

# Table identity and metadata table.
_table_opts = {
    'hoodie.table.name': table_name,
    'hoodie.table.type': table_type,
    'hoodie.metadata.enable': 'true',
}

# Inline compaction thresholds.
_compaction_opts = {
    # Default Value: 3600, since v0.9.0
    'hoodie.compact.inline.max.delta.seconds': 3600,
    # Default Value: 10
    'hoodie.compact.inline.max.delta.commits': 10,
}

# Cleaning and archival.
_cleaner_opts = {
    # Default Value: false, since v0.9.0
    'hoodie.cleaner.delete.bootstrap.base.file': 'false',
    # Default Value: 10
    'hoodie.cleaner.commits.retained': 10,
    # Default Value: 10
    'hoodie.commits.archival.batch': 10,
}

# Datasource write settings: record key, partition path, precombine field.
_write_opts = {
    'hoodie.datasource.write.table.name': table_name,
    'hoodie.datasource.write.table.type': table_type,
    'hoodie.datasource.write.recordkey.field': 'id',
    'hoodie.datasource.write.partitionpath.field': '_year,_month,_day',
    'hoodie.datasource.write.precombine.field': 'lpep_pickup_datetime',
    'hoodie.datasource.write.keygenerator.class': 'org.apache.hudi.keygen.ComplexKeyGenerator',
    'hoodie.datasource.write.hive_style_partitioning': 'true',
}

# Parquet file sizing and compression.
_parquet_opts = {
    # default: 104857600 Bytes (100 MB)
    'hoodie.parquet.small.file.limit': 104857600,
    # default: 125829120 Bytes (120 MB)
    'hoodie.parquet.max.file.size': 125829120,
    # default: 125829120 Bytes (120 MB)
    'hoodie.parquet.block.size': 125829120,
    # default: 1048576 Bytes (1 MB)
    'hoodie.parquet.page.size': 1048576,
    'hoodie.parquet.compression.codec': 'snappy',
}

# Hive sync target and partition extraction.
_hive_sync_opts = {
    'hoodie.datasource.hive_sync.enable': 'true',
    'hoodie.datasource.hive_sync.database': database_name,
    'hoodie.datasource.hive_sync.table': table_name,
    'hoodie.datasource.hive_sync.partition_fields': '_year,_month,_day',
    'hoodie.datasource.hive_sync.partition_extractor_class': 'org.apache.hudi.hive.MultiPartKeysValueExtractor',
}

# Shuffle parallelism per write operation.
_parallelism_opts = {
    'hoodie.insert.shuffle.parallelism': 2,
    'hoodie.upsert.shuffle.parallelism': 2,
    'hoodie.bulkinsert.shuffle.parallelism': 2,
    'hoodie.delete.shuffle.parallelism': 2,
}

# hoodie options
hudi_options = {
    **_table_opts,
    **_compaction_opts,
    **_cleaner_opts,
    **_write_opts,
    **_parquet_opts,
    **_hive_sync_opts,
    **_parallelism_opts,
}

In [None]:
# Write the zero-row DataFrame to base_path as a Hudi insert. "overwrite"
# mode replaces whatever is already stored at that path.
(
    empty_df.write
    .format('hudi')
    .options(**hudi_options)
    .option('hoodie.datasource.write.operation', 'insert')
    .mode('overwrite')
    .save(base_path)
)

## Insert data into the table

In [None]:
# Append the first hourly slice to the table using the upsert operation.
(
    hourly_df[0].write
    .format('hudi')
    .options(**hudi_options)
    .option('hoodie.datasource.write.operation', 'upsert')
    .mode('append')
    .save(base_path)
)

In [None]:
# Append the second hourly slice, this time with the insert operation.
(
    hourly_df[1].write
    .format('hudi')
    .options(**hudi_options)
    .option('hoodie.datasource.write.operation', 'insert')
    .mode('append')
    .save(base_path)
)

In [None]:
# Append every remaining hourly slice (third onward), one write per slice,
# each using the insert operation.
for _df in hourly_df[2:]:
    (
        _df.write
        .format('hudi')
        .options(**hudi_options)
        .option('hoodie.datasource.write.operation', 'insert')
        .mode('append')
        .save(base_path)
    )

## Read table

### Read snapshot

In [None]:
# Snapshot query: load the table from base_path with the Hudi reader.
tripsSnapshotDF = spark.read.format('hudi').load(base_path)

print(tripsSnapshotDF.count())

tripsSnapshotDF.printSchema()

# Hudi bookkeeping columns displayed in the previews below.
hoodie_meta_columns = [
    '_hoodie_commit_time',
    '_hoodie_commit_seqno',
    '_hoodie_record_key',
    '_hoodie_partition_path',
    '_hoodie_file_name',
]

# Show three records from each end of the id range: lowest ids first, then
# highest ids.
for ascending in (True, False):
    (
        tripsSnapshotDF
        .orderBy('id', ascending=ascending)
        .select(*hoodie_meta_columns)
        .show(3, vertical=True, truncate=False)
    )

### Incremental query

In [None]:
# Distinct commit times present in the snapshot, oldest first, capped at 50.
rows = (
    tripsSnapshotDF
    .select('_hoodie_commit_time')
    .distinct()
    .orderBy('_hoodie_commit_time')
    .limit(50)
    .collect()
)

commits = [row['_hoodie_commit_time'] for row in rows]
print(commits, '\n')

# Begin the incremental read at the entry three positions from the end of
# the collected commit list.
beginTime = commits[len(commits) - 3]
print(beginTime, '\n')

# incrementally query data
incremental_read_options = {
    'hoodie.datasource.query.type': 'incremental',
    'hoodie.datasource.read.begin.instanttime': beginTime,
}

incremental_df = (
    spark.read.format('hudi')
    .options(**incremental_read_options)
    .load(base_path)
)

print(incremental_df.count(), '\n')

# Preview the Hudi bookkeeping columns of the earliest returned records.
(
    incremental_df
    .select(
        '_hoodie_commit_time',
        '_hoodie_commit_seqno',
        '_hoodie_record_key',
        '_hoodie_partition_path',
        '_hoodie_file_name',
    )
    .orderBy('_hoodie_commit_time')
    .show(3, vertical=True, truncate=False)
)

### Point in time query

In [None]:
# "000" as the begin instant represents all commits later than this time;
# the end instant is the entry three positions from the end of `commits`.
beginTime = "000"
endTime = commits[len(commits) - 3]

# query point in time data
point_in_time_read_options = {
    'hoodie.datasource.query.type': 'incremental',
    'hoodie.datasource.read.begin.instanttime': beginTime,
    'hoodie.datasource.read.end.instanttime': endTime,
}

incremental_df = (
    spark.read.format('hudi')
    .options(**point_in_time_read_options)
    .load(base_path)
)

print(incremental_df.count(), '\n')

# Preview the Hudi bookkeeping columns, latest commit time first.
(
    incremental_df
    .select(
        '_hoodie_commit_time',
        '_hoodie_commit_seqno',
        '_hoodie_record_key',
        '_hoodie_partition_path',
        '_hoodie_file_name',
    )
    .orderBy('_hoodie_commit_time', ascending=False)
    .show(3, vertical=True, truncate=False)
)

## Update data

In [None]:
from pyspark.sql.functions import lit

# Set passenger_count to -1 on every record of the first hourly slice and
# upsert the result into the table.
(
    hourly_df[0]
    .withColumn('passenger_count', lit(-1))
    .write.format('hudi')
    .options(**hudi_options)
    .option('hoodie.datasource.write.operation', 'upsert')
    .mode('append')
    .save(base_path)
)

# Re-read the table and preview the bookkeeping columns together with the
# record id and passenger_count.
(
    spark.read.format('hudi')
    .load(base_path)
    .select(
        '_hoodie_commit_time',
        '_hoodie_commit_seqno',
        '_hoodie_record_key',
        '_hoodie_partition_path',
        '_hoodie_file_name',
        'id',
        'passenger_count',
    )
    .show(3, vertical=True, truncate=False)
)

# Row count of the `_ro` table in the catalog.
ro_count_sql = f'''
select count(1) as ro_count
from {database_name}.{table_name}_ro
'''
spark.sql(ro_count_sql).show()

# Row count of the `_rt` table in the catalog.
rt_count_sql = f'''
select count(1) as rt_count
from {database_name}.{table_name}_rt
'''
spark.sql(rt_count_sql).show()

### Delete data

In [None]:
# Imported in this cell as well, so the section still runs when the
# "Update data" cell (which also imports `lit`) was skipped.
from pyspark.sql.functions import lit

# Keys of the records to delete: every row with VendorID = 1, reduced to the
# record key (`id`) and the partition fields, plus a constant placeholder in
# the precombine column.
# NOTE(review): the table's precombine column is a timestamp while the
# placeholder is a double -- confirm the Hudi version in use accepts this for
# the delete operation.
to_be_delete = (
    df.where('VendorID = 1')
    .select('id', '_year', '_month', '_day')
    .withColumn('lpep_pickup_datetime', lit(0.0))
)
print(to_be_delete.count())

# Issue the delete against the table.
(
    to_be_delete.write
    .format('hudi')
    .options(**hudi_options)
    .option('hoodie.datasource.write.operation', 'delete')
    .mode('append')
    .save(base_path)
)

# Re-read the table and preview the bookkeeping columns together with the
# record id and VendorID.
(
    spark.read.format('hudi')
    .load(base_path)
    .select(
        '_hoodie_commit_time',
        '_hoodie_commit_seqno',
        '_hoodie_record_key',
        '_hoodie_partition_path',
        '_hoodie_file_name',
        'id',
        'VendorID',
    )
    .show(3, vertical=True, truncate=False)
)

# Row count of the `_ro` table in the catalog after the delete.
spark.sql(
f'''
select count(1) as ro_count
from {database_name}.{table_name}_ro
''').show()

# Row count of the `_rt` table in the catalog after the delete.
spark.sql(
f'''
select count(1) as rt_count
from {database_name}.{table_name}_rt
''').show()