Setup

In [None]:
import warnings
warnings.filterwarnings('ignore')

from pyspark.sql import SparkSession
from pyspark.sql.functions import lit

# Build (or reuse) the Spark session. The jars list is a single
# comma-separated string made of the Hudi bundle, the RonDB ClusterJ
# library and the MySQL connector.
spark = (
    SparkSession.builder
    .appName("Foo")
    .config(
        "spark.jars",
        "/home/ralfs/Desktop/jars/hudi-spark3-bundle_2.12-0.10.0.1.jar,"
        "/home/ralfs/Desktop/jars/clusterj-rondb-21.04.3.jar,"
        "/home/ralfs/Desktop/jars/mysql-connector-java-8.0.28.jar",
    )
    .config("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
    .getOrCreate()
)

# Shortcut to the SparkContext; used below for log level and JVM access.
sc = spark.sparkContext

In [None]:
# pyspark
# Root working directory; the Hudi table, generated data, test inputs and
# results.csv are all placed underneath it (note the trailing slash, the
# paths below are built by plain string concatenation).
thesispath = "/home/ralfs/Desktop/temp_dir/"
# Host used as the HBase ZooKeeper quorum and as the MySQL/RonDB host.
head_node = "127.0.0.1"

## Setup spark

In [None]:
# Only show Spark log messages of level ERROR.
sc.setLogLevel("ERROR")

# Name of the Hudi table; also the directory name of the table under thesispath.
tableName = "hudi_trips_cow"

In [None]:
import os

# Locations derived from thesispath (which ends with a slash).
basePath = thesispath + tableName  # where the Hudi table is written
datapath = thesispath + "data"  # parquet data used to bootstrap the table
testpath = thesispath + "test"  # per-test parquet inputs and test_info.csv

# Value passed as 'hoodie.parquet.max.file.size' (bytes).
file_size = 125829120
# Batch size passed to the HBase and RonDB index options.
batch_size = 100

## Properties

In [None]:
# Shared name for the RonDB database and the HBase index table.
table = database = "hudi"

def add_to_dict(dict1, dict2):
    """Fill ``dict2`` in place with the entries of ``dict1`` it does not have yet.

    Keys already present in ``dict2`` keep their current value; ``dict1`` is
    left untouched. Nothing is returned.
    """
    for name in dict1:
        dict2.setdefault(name, dict1[name])

# Write settings shared by every operation and every index type.
hudi_options = {
    'hoodie.table.name': tableName,
    # Alternative table type: 'MERGE_ON_READ'
    'hoodie.datasource.write.table.type': 'COPY_ON_WRITE',
    'hoodie.datasource.write.recordkey.field': 'uuid',
    'hoodie.datasource.write.partitionpath.field': 'partitionpath',
    'hoodie.datasource.write.table.name': tableName,
    'hoodie.datasource.write.precombine.field': 'ts',
    'hoodie.upsert.shuffle.parallelism': 2,
    'hoodie.insert.shuffle.parallelism': 2,
    'hoodie.delete.shuffle.parallelism': 2,
    # Not used: 'write.parquet.max.file.size': 1 (default value: 120 MB)
    # Default value: 125829120 (125829120 bytes = 120 megabytes)
    'hoodie.parquet.max.file.size': file_size,
    # Not used: 'hoodie.cleaner.commits.retained': 0
}

# One dict per write operation: the operation key first, followed by all
# shared settings (hudi_options never contains the operation key itself).
hudi_insert_options = {
    'hoodie.datasource.write.operation': 'insert',
    **hudi_options,
}

hudi_upsert_options = {
    'hoodie.datasource.write.operation': 'upsert',
    **hudi_options,
}

hudi_delete_options = {
    'hoodie.datasource.write.operation': 'delete',
    **hudi_options,
}

def _options_per_action(index_options):
    """Return ``(insert, upsert, delete)`` option dicts for one index type.

    Each result starts as a shallow copy of ``index_options`` and is then
    completed with the matching per-operation defaults via ``add_to_dict``,
    so index-specific keys win over the shared ones.
    """
    completed = []
    for action_defaults in (hudi_insert_options, hudi_upsert_options, hudi_delete_options):
        merged = dict(index_options)
        add_to_dict(action_defaults, merged)
        completed.append(merged)
    return tuple(completed)


# Bloom

bloom_options = {
    'hoodie.index.type': 'GLOBAL_BLOOM',
    'hoodie.bloom.index.update.partition.path': True
}
bloom_insert_options, bloom_upsert_options, bloom_delete_options = \
    _options_per_action(bloom_options)

# Simple

simple_options = {
    'hoodie.index.type': 'GLOBAL_SIMPLE',
    'hoodie.simple.index.update.partition.path': True
}
simple_insert_options, simple_upsert_options, simple_delete_options = \
    _options_per_action(simple_options)

# HBase

hbase_options = {
    'hoodie.index.type': 'HBASE',
    'hoodie.index.hbase.get.batch.size': batch_size,
    'hoodie.index.hbase.put.batch.size': batch_size,
    'hoodie.index.hbase.max.qps.per.region.server': 1000,
    'hoodie.index.hbase.zkquorum': head_node,
    'hoodie.index.hbase.zkport': '2181',
    'hoodie.index.hbase.table': table,
    'hoodie.index.hbase.qps.allocator.class': 'org.apache.hudi.index.hbase.DefaultHBaseQPSResourceAllocator',
    'hoodie.hbase.index.update.partition.path': True
}
hbase_insert_options, hbase_upsert_options, hbase_delete_options = \
    _options_per_action(hbase_options)

# RonDB

rondb_options = {
    'hoodie.index.type': 'RONDB',
    'hoodie.index.rondb.batch.size': batch_size,
    'hoodie.index.rondb.update.partition.path': True
}
rondb_insert_options, rondb_upsert_options, rondb_delete_options = \
    _options_per_action(rondb_options)

# RonDB Cluster

rondb_cluster_options = {
    'hoodie.index.type': 'RONDB_CLUSTER',
    'hoodie.index.rondb.batch.size': batch_size,
    # NOTE(review): this value is a nested dict, and the same dict object is
    # shared by the three per-operation copies (shallow copy) -- confirm the
    # index implementation accepts it in this form.
    'hoodie.index.rondb.clusterj': {
        'com.mysql.clusterj.connectstring': '127.0.0.1:1186',
        'com.mysql.clusterj.database': database
    },
    'hoodie.index.rondb.update.partition.path': True
}
rondb_cluster_insert_options, rondb_cluster_upsert_options, rondb_cluster_delete_options = \
    _options_per_action(rondb_cluster_options)

# All options

# Lookup table: implementation name -> write operation -> option dict.
options_dict = {
    'bloom': dict(
        insert=bloom_insert_options,
        upsert=bloom_upsert_options,
        delete=bloom_delete_options,
    ),
    'simple': dict(
        insert=simple_insert_options,
        upsert=simple_upsert_options,
        delete=simple_delete_options,
    ),
    'hbase': dict(
        insert=hbase_insert_options,
        upsert=hbase_upsert_options,
        delete=hbase_delete_options,
    ),
    'rondb': dict(
        insert=rondb_insert_options,
        upsert=rondb_upsert_options,
        delete=rondb_delete_options,
    ),
    'rondb_cluster': dict(
        insert=rondb_cluster_insert_options,
        upsert=rondb_cluster_upsert_options,
        delete=rondb_cluster_delete_options,
    ),
}

## DB stuff

In [None]:
import happybase
import pymysql.cursors

def reset_hbase():
    """Recreate the HBase table named by ``table`` with an empty ``_s`` family.

    An existing table is disabled and deleted first. A missing table (first
    run, or a previous reset that failed half-way) is simply created instead
    of making the disable/delete calls fail. The connection is always closed.
    """
    connection = happybase.Connection()
    try:
        # Table names may come back as bytes; normalise before comparing.
        existing = {
            name.decode() if isinstance(name, bytes) else name
            for name in connection.tables()
        }
        if table in existing:
            connection.disable_table(table)
            connection.delete_table(table)

        connection.create_table(
            table,
            {
                '_s': dict() #the column family used by Hudi
            }
        )
    finally:
        connection.close()
    print("hbase reset")

reset_hbase()
    
def reset_rondb():
    """Drop the ``hudi`` database on the MySQL server at ``head_node``.

    Connects as ``root`` with an empty password on port 3306. The cursor and
    the connection are released even when the statement fails.
    """
    connection = pymysql.connect(
        host=head_node,
        port=3306,
        user='root',
        passwd=''
    )
    try:
        with connection.cursor() as mycursor:
            mycursor.execute("DROP DATABASE IF EXISTS hudi;")
        connection.commit()
    finally:
        connection.close()
    print("rondb reset")

reset_rondb()

def reset_db():
    """Reset both index backends: HBase first, then RonDB."""
    for reset_backend in (reset_hbase, reset_rondb):
        reset_backend()

# NOTE(review): this calls reset_rondb(), not reset_db() -- confirm that
# resetting only RonDB here is intended.
reset_rondb()

## Create entries used in tests

In [None]:
# Names of the write operations exercised by the tests; they double as the
# keys of the per-implementation entries in options_dict.
action_insert, action_upsert, action_delete = 'insert', 'upsert', 'delete'

# Order in which test inputs are generated.
action_types = [action_insert, action_upsert, action_delete]

In [None]:
from IPython.display import display, clear_output

def display_info(text):
    """Clear the current output and display ``text`` in its place."""
    clear_output(wait=True)
    display(text)

In [None]:
import time
import pandas as pd

def init_data(num_of_entries, dataGen, batch_size = 50000):
    """Generate ``num_of_entries`` insert records and store them under ``datapath``.

    Records are produced by ``dataGen.generateInserts`` in chunks of at most
    ``batch_size`` (a local parameter, unrelated to the module-level
    ``batch_size``) to avoid ``java.lang.OutOfMemoryError: GC overhead limit
    exceeded``. The first chunk overwrites ``datapath``; later chunks are
    appended. The running total is shown via ``display_info`` before the
    first chunk and after every chunk.
    """
    written = 0
    display_info(written)

    chunk = min(batch_size, num_of_entries - written)
    while chunk > 0:
        records = dataGen.generateInserts(chunk)
        json_rows = sc._jvm.org.apache.hudi.QuickstartUtils.convertToStringList(records)
        df = spark.read.json(spark.sparkContext.parallelize(json_rows, 2))

        # Nothing has been written yet only on the very first chunk.
        save_mode = "overwrite" if written == 0 else "append"
        df.write.mode(save_mode).parquet(datapath)

        written += chunk
        display_info(written)
        chunk = min(batch_size, num_of_entries - written)

#init_data(1000000, 50000)

def init_tests(action_type, num_of_tests, num_of_changes_in_test, dataGen, test_info_df):
    """Generate the input files for ``num_of_tests`` runs of one action.

    For each run a DataFrame with ``num_of_changes_in_test`` records is built
    and written (overwriting) as parquet to ``testpath/<action>_<run_nr>``:

    * insert: fresh records from ``dataGen.generateInserts``;
    * upsert: records from ``dataGen.generateUpdates`` whose ``partitionpath``
      is replaced by ``"space/space/space"`` so the update moves partitions;
    * anything else (delete): records from ``dataGen.generateUpdates``.

    Returns ``test_info_df`` extended by one row per run with the columns
    ``action``, ``run_nr`` and ``file_name``. The input frame is not modified.
    With ``num_of_tests == 0`` the frame is returned unchanged.
    """
    rows = []
    for test_nr in range(num_of_tests):
        if action_type == action_insert:
            data = dataGen.generateInserts(num_of_changes_in_test)
        else:
            data = dataGen.generateUpdates(num_of_changes_in_test)
        data_list = sc._jvm.org.apache.hudi.QuickstartUtils.convertToStringList(data)
        df = spark.read.json(spark.sparkContext.parallelize(data_list, 2))

        if action_type == action_upsert:
            df = df.withColumn("partitionpath", lit("space/space/space")) # to update path

        file_name = f"{action_type}_{test_nr}"
        df.write.mode("overwrite").parquet(os.path.join(testpath, file_name))
        rows.append({
            'action': action_type,
            'run_nr': test_nr,
            'file_name': file_name})

    if not rows:
        return test_info_df

    # DataFrame.append was removed in pandas 2.0; add all rows in one concat.
    new_rows = pd.DataFrame(rows, columns=['action', 'run_nr', 'file_name'])
    return pd.concat([test_info_df, new_rows], ignore_index=True)

#init_tests("insert")

def setup(num_of_entries, num_of_tests, num_of_changes_in_test):
    """Create the bootstrap data and all test inputs, then save the test index.

    One ``DataGenerator`` is shared by ``init_data`` and by ``init_tests`` for
    every entry of ``action_types``. The collected test descriptions are
    written to ``test_info.csv`` inside ``testpath``, replacing any old file.
    """
    generator = sc._jvm.org.apache.hudi.QuickstartUtils.DataGenerator()

    # Bootstrap data
    init_data(num_of_entries, generator)

    # Test inputs, one group per action type
    collected = pd.DataFrame(columns = ['action', 'run_nr', 'file_name'])
    for kind in action_types:
        print(f"    {kind}")
        collected = init_tests(kind, num_of_tests, num_of_changes_in_test, generator, collected)

    # Drop the local reference to the JVM generator.
    generator = None

    info_file = os.path.join(testpath, "test_info.csv")
    if os.path.exists(info_file):
        os.remove(info_file)

    collected.to_csv(info_file, index=False) # saves test info

#setup(5)

def bootstrap_hudi(options):
    """Initialise the Hudi table at ``basePath`` from the data in ``datapath``.

    The write uses ``options`` with the HBase get/put batch sizes raised to
    500 for the bootstrap only. The override is applied to a copy, so the
    caller's dict is never modified: it gains no HBase keys and is not left
    at 500 if the write fails. The elapsed time is printed.
    """
    #uses prev df saved in datapath and options to initialize hudi table
    start_time = time.time()
    df = spark.read.format("parquet").load(datapath)

    bootstrap_options = dict(options)
    bootstrap_options['hoodie.index.hbase.get.batch.size'] = 500
    bootstrap_options['hoodie.index.hbase.put.batch.size'] = 500

    df.write.format("hudi").options(**bootstrap_options).mode("overwrite").save(basePath)
    print(f"bootstrap took: {time.time()-start_time}")
    
    #sleep_sec = 10
    #print(f"now sleeping for: {sleep_sec}")
    #time.sleep(sleep_sec) # sleeping to remove impact from the bootstrap

#bootstrap_hudi(bloom_upsert_options)

## Test

In [None]:
def run_test(df, options):
    """Append ``df`` to the Hudi table at ``basePath`` and time the write.

    Returns the wall-clock duration of the write in seconds.
    """
    started = time.time()
    df.write.format("hudi").options(**options).mode("append").save(basePath)
    return time.time() - started

def test_implementation(implementation_name, test_info_df):
    times_list = []
    for index, row in test_info_df.iterrows():
        test_df = spark.read.format("parquet").load(os.path.join(testpath, row['file_name']))

        result_time = run_test(test_df, options_dict[implementation_name][row['action']])

        print(f"{row['action']}: {row['run_nr']} --- {result_time} seconds ---")
        times_list.append(result_time)
    return times_list


def test_implementations():
    """Benchmark every implementation in ``options_dict`` and save the timings.

    Reads ``test_info.csv`` from ``testpath``. For each implementation the
    databases are reset, the table is bootstrapped with that implementation's
    upsert options, and all tests are run; the durations become a new column
    named after the implementation. Finally the databases are reset once more
    and the table is written to ``results.csv`` under ``thesispath``,
    replacing any previous file.
    """
    #reset_path(resultspath) # resets results (remember to seve and rename after running)

    results = pd.read_csv(os.path.join(testpath, 'test_info.csv'))

    for name, per_action in options_dict.items():
        print(name)

        reset_db()
        bootstrap_hudi(per_action["upsert"]) # always upsert
        print(f"{name}: bootstraped")

        results[name] = test_implementation(name, results)

    reset_db()

    output_file = os.path.join(thesispath, "results.csv")
    if os.path.exists(output_file):
        os.remove(output_file)

    results.to_csv(output_file, index=False) # saves results

#test_implementation('bloom') # no db
#test_implementation('simple') # no db
#test_implementation('hbase') # hbase running
#test_implementation('rondb', pd.DataFrame()) # rondb running
#test_implementation('rondb_cluster') # rondb running

## Run test

In [None]:
# Has to be run separately; make sure the databases are working only for the current index.

setup(1000000, 10, 100) # run 1 time before all implementations
test_implementations()

full transparency

ideas
can see rollback times