Skip to content

[SUPPORT] inline's clustering truncates Path up to the first comma met. #12333

@serhii-donetskyi-datacubed

Description

Describe the problem you faced

When doing inline clustering, if table's path contains comma, it truncates path (up to the first comma) and fails, since no such directory exists.

To Reproduce

Steps to reproduce the behavior:

  1. Run the script:
import sys
import boto3

from pyspark.sql import SparkSession
from pyspark.context import SparkContext
import pyspark.sql.functions as F

spark = SparkSession.builder \
    .config("spark.sql.legacy.parquet.datetimeRebaseModeInRead", "CORRECTED") \
    .config("spark.sql.legacy.parquet.datetimeRebaseModeInWrite", "CORRECTED") \
    .config("spark.sql.legacy.parquet.int96RebaseModeInRead", "CORRECTED") \
    .config("spark.sql.legacy.parquet.int96RebaseModeInWrite", "CORRECTED") \
    .config('spark.sql.adaptive.enabled', 'true') \
    .config('spark.sql.adaptive.coalescePartitions.enabled', 'true') \
    .config('spark.serializer', 'org.apache.spark.serializer.KryoSerializer') \
    .config('spark.sql.extensions', 'org.apache.spark.sql.hudi.HoodieSparkSessionExtension') \
    .getOrCreate()

hudi_options = {
    'hoodie.database.name': 'test',
    'hoodie.table.name': 'test',
    'hoodie.enable.data.skipping': 'true',
    'hoodie.datasource.write.recordkey.field': 'id1,id2',
    'hoodie.datasource.write.precombine.field': 'ts',
    'hoodie.datasource.write.reconcile.schema': 'false',
    'hoodie.datasource.write.schema.allow.auto.evolution.column.drop': 'true',
    'hoodie.datasource.write.operation': 'insert',

    # 'hoodie.datasource.hive_sync.enable': 'true',
    # 'hoodie.datasource.hive_sync.use_jdbc': 'false',
    # 'hoodie.datasource.hive_sync.database': curated_db,
    # 'hoodie.datasource.hive_sync.table': curated_table,
    # 'hoodie.datasource.hive_sync.support_timestamp': 'true',
    # 'hoodie.datasource.hive_sync.mode': 'hms',

    'hoodie.parquet.outputtimestamptype': 'TIMESTAMP_MICROS',
    # 'hoodie.index.type': 'RECORD_INDEX',

    'hoodie.cleaner.policy': 'KEEP_LATEST_COMMITS',
    'hoodie.cleaner.commits.retained': 10,

    'hoodie.parquet.small.file.limit': 0,
    'hoodie.clustering.inline': 'true',
    'hoodie.clustering.inline.max.commits': 2,

    'format': 'hudi',
    'mode': 'append',
    'path': '/tmp/test/id1,id2/'
}

df = spark.sql('select 1 as id1, 1 as id2, now() as ts, 99 as col')

for _ in range(2):
    df.write.save(**hudi_options)

Expected behavior

Data is appended to hudi table and clustering service is executed without any errors.

Environment Description
Using EMR 7.3.0, which has Hudi version 0.15.0

  • Hudi version : 0.15.0

  • Spark version : 3.5.1

  • Hive version : 3.1.3

  • Hadoop version : 3.3.5

  • Storage (HDFS/S3/GCS..) : ALL

  • Running on Docker? (yes/no) : no

Stacktrace

---------------------------------------------------------------------------
Py4JJavaError                             Traceback (most recent call last)
Cell In[3], line 2
      1 for _ in range(2):
----> 2     df.write.save(**hudi_options)

File /usr/local/lib/python3.9/dist-packages/pyspark/sql/readwriter.py:1463, in DataFrameWriter.save(self, path, format, mode, partitionBy, **options)
   1461     self._jwrite.save()
   1462 else:
-> 1463     self._jwrite.save(path)

File /usr/local/lib/python3.9/dist-packages/py4j/java_gateway.py:1322, in JavaMember.__call__(self, *args)
   1316 command = proto.CALL_COMMAND_NAME +\
   1317     self.command_header +\
   1318     args_command +\
   1319     proto.END_COMMAND_PART
   1321 answer = self.gateway_client.send_command(command)
-> 1322 return_value = get_return_value(
   1323     answer, self.gateway_client, self.target_id, self.name)
   1325 for temp_arg in temp_args:
   1326     if hasattr(temp_arg, "_detach"):

File /usr/local/lib/python3.9/dist-packages/pyspark/errors/exceptions/captured.py:179, in capture_sql_exception.<locals>.deco(*a, **kw)
    177 def deco(*a: Any, **kw: Any) -> Any:
    178     try:
--> 179         return f(*a, **kw)
    180     except Py4JJavaError as e:
    181         converted = convert_exception(e.java_exception)

File /usr/local/lib/python3.9/dist-packages/py4j/protocol.py:326, in get_return_value(answer, gateway_client, target_id, name)
    324 value = OUTPUT_CONVERTER[type](answer[2:], gateway_client)
    325 if answer[1] == REFERENCE_TYPE:
--> 326     raise Py4JJavaError(
    327         "An error occurred while calling {0}{1}{2}.\n".
    328         format(target_id, ".", name), value)
    329 else:
    330     raise Py4JError(
    331         "An error occurred while calling {0}{1}{2}. Trace:\n{3}\n".
    332         format(target_id, ".", name, value))

Py4JJavaError: An error occurred while calling o59.save.
: java.util.concurrent.CompletionException: org.apache.spark.sql.AnalysisException: [PATH_NOT_FOUND] Path does not exist: file:/tmp/test/id1.
	at java.util.concurrent.CompletableFuture.encodeThrowable(CompletableFuture.java:273)
	at java.util.concurrent.CompletableFuture.completeThrowable(CompletableFuture.java:280)
	at java.util.concurrent.CompletableFuture$AsyncSupply.run(CompletableFuture.java:1606)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
	at java.lang.Thread.run(Thread.java:750)
Caused by: org.apache.spark.sql.AnalysisException: [PATH_NOT_FOUND] Path does not exist: file:/tmp/test/id1.
	at org.apache.spark.sql.errors.QueryCompilationErrors$.dataPathNotExistError(QueryCompilationErrors.scala:1500)
	at org.apache.spark.sql.execution.datasources.DataSource$.$anonfun$checkAndGlobPathIfNecessary$4(DataSource.scala:757)
	at org.apache.spark.sql.execution.datasources.DataSource$.$anonfun$checkAndGlobPathIfNecessary$4$adapted(DataSource.scala:754)
	at org.apache.spark.util.ThreadUtils$.$anonfun$parmap$2(ThreadUtils.scala:384)
	at scala.concurrent.Future$.$anonfun$apply$1(Future.scala:659)
	at scala.util.Success.$anonfun$map$1(Try.scala:255)
	at scala.util.Success.map(Try.scala:213)
	at scala.concurrent.Future.$anonfun$map$1(Future.scala:292)
	at scala.concurrent.impl.Promise.liftedTree1$1(Promise.scala:33)
	at scala.concurrent.impl.Promise.$anonfun$transform$1(Promise.scala:33)
	at scala.concurrent.impl.CallbackRunnable.run(Promise.scala:64)
	at java.util.concurrent.ForkJoinTask$RunnableExecuteAction.exec(ForkJoinTask.java:1402)
	at java.util.concurrent.ForkJoinTask.doExec(ForkJoinTask.java:289)
	at java.util.concurrent.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1056)
	at java.util.concurrent.ForkJoinPool.runWorker(ForkJoinPool.java:1692)
	at java.util.concurrent.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:175)

Metadata

Metadata

Assignees

No one assigned

    Labels

    Type

    No type

    Projects

    Status

    ✅ Done

    Milestone

    No milestone

    Relationships

    None yet

    Development

    No branches or pull requests

    Issue actions