Description
I have a Hudi setup on Glue streaming (Glue 4.0) reading from Kinesis. The configuration is as follows:
- ingestion rate - 5000 rps
- number of workers - 20 (g2.x)
- kinesis shards - 40
- target - s3
Each record is 2.5 KB, and around 20000 partitions are being written to. With a window size of either 5 min or 15 min, the lag is more than half an hour, even though we have tried:
- increasing workers
- removing compression
- trying MOR instead of COW (a little better, but still >20 min); for our use case we need to spike out max latency, so we are using COW
- playing around with Kinesis fetch configs
- playing around with shuffle parallelism (increased from 80 to 160 to 320)
- playing around with file sizing (didn't impact much)
The only thing that brought the lag down to between 10 and 15 min was reducing the partition size. Am I missing something here? Is there some other configuration to tweak?
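For context, the numbers above can be turned into per-window volumes with a little arithmetic. The following is only a rough sketch: it assumes records are spread evenly across all 20000 partitions and that every partition receives data in every window, neither of which is stated in the report. The function name `window_stats` is made up for illustration.

```python
# Workload figures quoted in the description above.
RECORDS_PER_SECOND = 5_000
RECORD_SIZE_KB = 2.5
PARTITIONS = 20_000
SHARDS = 40


def window_stats(window_minutes):
    """Return per-window volumes for a given micro-batch window.

    Assumes an even spread of records over all partitions (an assumption,
    not something the report confirms).
    """
    records = RECORDS_PER_SECOND * window_minutes * 60
    size_gb = records * RECORD_SIZE_KB / 1_000_000  # KB -> GB (decimal units)
    return {
        "records": records,
        "size_gb": size_gb,
        "records_per_partition": records / PARTITIONS,
    }


# Sustained ingest rate, overall and per Kinesis shard.
throughput_mb_s = RECORDS_PER_SECOND * RECORD_SIZE_KB / 1_000
per_shard_mb_s = throughput_mb_s / SHARDS

for minutes in (5, 15):
    print(minutes, window_stats(minutes))
print(throughput_mb_s, per_shard_mb_s)
```

Under those assumptions each window carries only a few hundred records per partition, so a COPY_ON_WRITE upsert would be rewriting a base file in nearly every partition on every batch. That would be consistent with the observation that reducing the partitioning was the only change that helped, though it is not verified here.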
Environment Description
- Hudi version : 0.12.1
- Spark version : 3.3.0
- Storage (HDFS/S3/GCS..) : S3
- Running on Docker? (yes/no) : no
SCRIPT
from pyspark.sql.functions import col, from_json, from_unixtime, expr, regexp_extract, udf
from pyspark.sql.session import SparkSession
from awsglue.context import GlueContext
from awsglue.transforms import *
from pyspark.sql.types import StructType, StructField, StringType, IntegerType, ArrayType, DoubleType, LongType
import uuid
from awsglue import DynamicFrame
# Worker type: G2.x (4 vCPU and 16 GB RAM), 20 workers
# Initialize Spark session
spark = SparkSession.builder \
    .appName("GlueKinesisToHudi") \
    .config("spark.serializer", "org.apache.spark.serializer.KryoSerializer") \
    .config("spark.sql.hive.convertMetastoreParquet", "false") \
    .getOrCreate()
sparkContext = spark.sparkContext
glueContext = GlueContext(sparkContext)
schema = ...  # schema (omitted)
parsed_df_k = glueContext.create_data_frame.from_options(
    connection_type="kinesis",
    connection_options={
        "typeOfData": "kinesis",
        "streamARN": "# kinesis arn",
        "classification": "json",
        "startingPosition": "latest",
        "inferSchema": "true",
        "emitConsumerLagMetrics": "true",
        "maxFetchTimeInMs": 60000
    },
    transformation_ctx="dataframe_KinesisStream_node21_c",
)
def process_batch(spark_df, batch_id):
    if spark_df.count() > 0:
        dynamicframe = DynamicFrame.fromDF(spark_df, glueContext, "from_data_frame")
        parsed_df = dynamicframe.toDF()
        # column transformations here
        # Define Hudi options
        hudi_options = {
            "hoodie.table.name": "compression_15_run1",
            "hoodie.datasource.write.table.type": "COPY_ON_WRITE",
            "hoodie.datasource.write.operation": "upsert",
            "hoodie.datasource.write.recordkey.field": "id",
            "hoodie.datasource.write.partitionpath.field": "container_id,year,month,day",
            "hoodie.datasource.write.precombine.field": "timestamp",
            "hoodie.datasource.write.hive_style.partitioning": "true",
            "hoodie.upsert.shuffle.parallelism": 160,
            "hoodie.insert.shuffle.parallelism": 160,
            "hoodie.bulkinsert.shuffle.parallelism": 160,
            "hoodie.datasource.hive_sync.mode": "hms",
            'hoodie.datasource.hive_sync.sync_as_datasource': 'false',
            'hoodie.datasource.hive_sync.database': 'glue_ctx_db',
            'hoodie.datasource.hive_sync.table': 'compression_15_run1',
            'hoodie.datasource.hive_sync.use_jdbc': 'false',
            'hoodie.datasource.hive_sync.partition_extractor_class': 'org.apache.hudi.hive.MultiPartKeysValueExtractor',
            'hoodie.datasource.write.hive_style_partitioning': 'true',
            'hoodie.datasource.hive_sync.enable': 'true',
            'hoodie.index.type': 'SIMPLE',
            "hoodie.parquet.compression.codec": "gzip",
            "hoodie.cleaner.policy": "KEEP_LATEST_FILE_VERSIONS",
            "hoodie.cleaner.fileversions.retained": "1",
            "hoodie.parquet.compression.ratio": "0.95",
            'hoodie.copyonwrite.record.size.estimate': '2560',
            'hoodie.cleaner.commits.retained': 1,
            'hoodie.combine.before.insert': True,
        }
        warehouse_path = "s3://hudi/run_1/output"
        # Upsert this micro-batch into the Hudi table
        parsed_df.write.format("hudi") \
            .options(**hudi_options) \
            .mode("append") \
            .save(warehouse_path)
glueContext.forEachBatch(
    frame=parsed_df_k,
    batch_function=process_batch,
    options={
        "windowSize": "15 minutes",
        "checkpointLocation": "s3://hudi/run_1/checkpoint",
    },
)
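One way to check whether the lag comes from each batch taking longer than the window is to time the batch function itself. Below is a minimal sketch of such a wrapper in plain Python. `timed_batch` is a hypothetical helper, not part of Glue or Hudi, and it only assumes that the batch function takes `(data_frame, batch_id)` as in the script above.

```python
import time
from functools import wraps


def timed_batch(batch_fn, window_seconds, log=print):
    """Wrap a (df, batch_id) batch function and report how long each call takes.

    Hypothetical helper for diagnosis only; it does not change what the
    wrapped function does.
    """
    @wraps(batch_fn)
    def wrapper(df, batch_id):
        start = time.monotonic()
        try:
            return batch_fn(df, batch_id)
        finally:
            elapsed = time.monotonic() - start
            # Flag batches that run longer than the configured window.
            status = "OVER" if elapsed > window_seconds else "ok"
            log(f"batch {batch_id}: {elapsed:.1f}s of {window_seconds}s window [{status}]")
    return wrapper
```

It could then be passed as `batch_function=timed_batch(process_batch, 15 * 60)` in the `forEachBatch` call. Batches repeatedly marked `OVER` would suggest that the write itself, rather than the Kinesis fetch, is what falls behind.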