Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -16,15 +16,9 @@
* limitations under the License.
*/

package org.apache.paimon.flink;
package org.apache.paimon.fs;

import org.apache.paimon.catalog.CatalogContext;
import org.apache.paimon.fs.FileIO;
import org.apache.paimon.fs.FileIOLoader;
import org.apache.paimon.fs.FileStatus;
import org.apache.paimon.fs.Path;
import org.apache.paimon.fs.PositionOutputStream;
import org.apache.paimon.fs.SeekableInputStream;
import org.apache.paimon.fs.local.LocalFileIO;

import java.io.IOException;
Expand All @@ -40,7 +34,8 @@ public IsolatedDirectoryFileIO() {}

@Override
public void configure(CatalogContext context) {
root = new Path(context.options().get(ROOT_DIR));
String rootDir = context.options().get(ROOT_DIR);
root = rootDir == null ? null : new Path(rootDir);
}

@Override
Expand Down Expand Up @@ -103,6 +98,10 @@ private void checkPath(Path path) {
if (path == null) {
throw new NullPointerException("path is null");
}
if (root == null) {
throw new UnsupportedOperationException(
"Isolated file io requires option " + ROOT_DIR + ".");
}
if (!path.toString().startsWith(root.toString())) {
throw new UnsupportedOperationException(
"Isolated file io only supports reading child of root directory "
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,3 +15,4 @@

org.apache.paimon.fs.RequireOptionsFileIOLoader
org.apache.paimon.utils.HadoopUtilsITCase$TestFileIOLoader
org.apache.paimon.fs.IsolatedDirectoryFileIO$Loader
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
import org.apache.paimon.data.Blob;
import org.apache.paimon.data.BlobDescriptor;
import org.apache.paimon.fs.FileIO;
import org.apache.paimon.fs.IsolatedDirectoryFileIO;
import org.apache.paimon.fs.local.LocalFileIO;
import org.apache.paimon.options.Options;
import org.apache.paimon.utils.UriReaderFactory;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,4 +14,3 @@
# limitations under the License.

org.apache.paimon.flink.action.CopyFilesActionSlowFileIO$Loader
org.apache.paimon.flink.IsolatedDirectoryFileIO$Loader
Original file line number Diff line number Diff line change
Expand Up @@ -81,7 +81,7 @@ case class DataEvolutionPaimonWriter(paimonTable: FileStoreTable, dataSplits: Se
table.newBatchWriteBuilder(),
writeType,
firstRowIdToPartitionMap,
table.catalogEnvironment().catalogContext())
catalogContextForBlobDescriptor)
try {
iter.foreach(row => write.write(row))
Iterator.apply(write.commit)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -138,7 +138,7 @@ case class PaimonSparkWriter(
writeRowTracking,
fullCompactionDeltaCommits,
batchId,
table.catalogEnvironment().catalogContext(),
catalogContextForBlobDescriptor,
postponePartitionBucketComputer
)

Expand Down Expand Up @@ -263,7 +263,8 @@ case class PaimonSparkWriter(
coreOptions.dynamicBucketMaxBuckets
)
row => {
val sparkRow = new SparkRow(writeType, row)
val sparkRow =
new SparkRow(writeType, row, RowKind.INSERT, catalogContextForBlobDescriptor)
assigner.assign(
extractor.partition(sparkRow),
extractor.trimmedPrimaryKey(sparkRow).hashCode)
Expand Down Expand Up @@ -454,10 +455,8 @@ case class PaimonSparkWriter(
.bootstrap(numSparkPartitions, sparkPartitionId)
.toCloseableIterator
TaskContext.get().addTaskCompletionListener[Unit](_ => bootstrapIterator.close())
val toPaimonRow = SparkRowUtils.toPaimonRow(
rowType,
rowKindColIdx,
table.catalogEnvironment().catalogContext())
val toPaimonRow =
SparkRowUtils.toPaimonRow(rowType, rowKindColIdx, catalogContextForBlobDescriptor)

bootstrapIterator.asScala
.map(
Expand Down Expand Up @@ -491,7 +490,8 @@ case class PaimonSparkWriter(
val rowPartitionKeyExtractor = new RowPartitionKeyExtractor(tableSchema)
iterator.map(
row => {
val sparkRow = new SparkRow(writeType, row)
val sparkRow =
new SparkRow(writeType, row, RowKind.INSERT, catalogContextForBlobDescriptor)
val partitionHash = rowPartitionKeyExtractor.partition(sparkRow).hashCode
val keyHash = rowPartitionKeyExtractor.trimmedPrimaryKey(sparkRow).hashCode
(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ import org.apache.paimon.spark.util.{ScanPlanHelper, SparkRowUtils}
import org.apache.paimon.spark.write.{PaimonDataWrite, WriteTaskResult}
import org.apache.paimon.table.{BucketMode, FileStoreTable, PostponeUtils}
import org.apache.paimon.table.sink.{CommitMessage, CommitMessageImpl}
import org.apache.paimon.utils.BlobDescriptorUtils

import org.apache.spark.sql._
import org.apache.spark.sql.execution.datasources.v2.DataSourceV2Relation
Expand Down Expand Up @@ -96,6 +97,10 @@ case class SparkPostponeCompactProcedure(
val writeBuilder = realTable.newBatchWriteBuilder
val rowType = table.rowType()
val coreOptions = table.coreOptions()
val catalogContextForBlobDescriptor =
BlobDescriptorUtils.getCatalogContext(
table.catalogEnvironment().catalogContext(),
coreOptions.toConfiguration)

def newWrite() = PaimonDataWrite(
writeBuilder,
Expand All @@ -104,7 +109,7 @@ case class SparkPostponeCompactProcedure(
writeRowTracking = coreOptions.dataEvolutionEnabled(),
Option.apply(coreOptions.fullCompactionDeltaCommits()),
None,
table.catalogEnvironment().catalogContext(),
catalogContextForBlobDescriptor,
Some(postponePartitionBucketComputer)
)

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,7 @@ case class PaimonBatchWrite(
writeSchema,
dataSchema,
coreOptions,
table.catalogEnvironment().catalogContext())
catalogContextForBlobDescriptor)
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,11 +20,12 @@ package org.apache.paimon.spark.write

import org.apache.paimon.CoreOptions
import org.apache.paimon.CoreOptions.TagCreationMode
import org.apache.paimon.catalog.CatalogContext
import org.apache.paimon.partition.actions.PartitionMarkDoneAction
import org.apache.paimon.table.FileStoreTable
import org.apache.paimon.table.sink.CommitMessage
import org.apache.paimon.tag.TagBatchCreation
import org.apache.paimon.utils.{InternalRowPartitionComputer, PartitionPathUtils, PartitionStatisticsReporter, TypeUtils}
import org.apache.paimon.utils.{BlobDescriptorUtils, InternalRowPartitionComputer, PartitionPathUtils, PartitionStatisticsReporter, TypeUtils}

import org.apache.spark.internal.Logging

Expand All @@ -36,6 +37,11 @@ trait WriteHelper extends Logging {

lazy val coreOptions: CoreOptions = table.coreOptions()

lazy val catalogContextForBlobDescriptor: CatalogContext =
BlobDescriptorUtils.getCatalogContext(
table.catalogEnvironment().catalogContext(),
coreOptions.toConfiguration)

def postCommit(messages: Seq[CommitMessage]): Unit = {
if (messages.isEmpty) {
return
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,12 +18,13 @@

package org.apache.paimon.spark.sql

import org.apache.paimon.CoreOptions
import org.apache.paimon.catalog.CatalogContext
import org.apache.paimon.data.{Blob, BlobDescriptor}
import org.apache.paimon.fs.Path
import org.apache.paimon.fs.{IsolatedDirectoryFileIO, Path}
import org.apache.paimon.fs.local.LocalFileIO
import org.apache.paimon.options.Options
import org.apache.paimon.spark.PaimonSparkTestBase
import org.apache.paimon.spark.{PaimonSparkTestBase, SparkCatalog}
import org.apache.paimon.utils.UriReaderFactory

import org.apache.spark.SparkConf
Expand Down Expand Up @@ -199,6 +200,99 @@ class BlobTestBase extends PaimonSparkTestBase {
}
}

test("Blob: test write blob descriptor from external storage") {
val catalogName = "isolated_paimon"
val databaseName = "external_blob_db"
val paimonRoot = tempDBDir.getCanonicalPath + "/paimon-isolated-root"
val externalRoot = tempDBDir.getCanonicalPath + "/external-blob-root"
val isolatedPaimonRoot = "isolated://" + paimonRoot
val isolatedExternalRoot = "isolated://" + externalRoot
spark.conf.set(s"spark.sql.catalog.$catalogName", classOf[SparkCatalog].getName)
spark.conf.set(s"spark.sql.catalog.$catalogName.warehouse", isolatedPaimonRoot)
spark.conf.set(
s"spark.sql.catalog.$catalogName.${IsolatedDirectoryFileIO.ROOT_DIR}",
isolatedPaimonRoot)

try {
sql(s"CREATE DATABASE IF NOT EXISTS $catalogName.$databaseName")
sql(s"USE $catalogName.$databaseName")

val blobData = new Array[Byte](1024 * 1024)
RANDOM.nextBytes(blobData)
val blobPath = externalRoot + "/external_blob"
val fileIO = new LocalFileIO
val outputStream = fileIO.newOutputStream(new Path("file://" + blobPath), true)
try outputStream.write(blobData)
finally outputStream.close()

val isolatedPath = "isolated://" + blobPath

withTable("t") {
sql(
"CREATE TABLE t (id INT, data STRING, picture BINARY) TBLPROPERTIES (" +
"'row-tracking.enabled'='true', " +
"'data-evolution.enabled'='true', " +
"'blob-field'='picture', " +
"'blob-as-descriptor'='true')")

// 1. directly writing raise expected errors.
val error = intercept[Exception] {
sql("INSERT INTO t VALUES (1, 'paimon', sys.path_to_descriptor('" + isolatedPath + "'))")
}
assert(
exceptionContains(
error,
"Isolated file io only supports reading child of root directory") &&
exceptionContains(error, "paimon-isolated-root") &&
exceptionContains(error, "external-blob-root/external_blob"),
exceptionMessages(error)
)

// 2. inject blob-descriptor io info through dynamic params.
// this time writing should success.
val descriptorRootOption =
s"spark.paimon.$catalogName.$databaseName.t." +
CoreOptions.BLOB_DESCRIPTOR_PREFIX + IsolatedDirectoryFileIO.ROOT_DIR
withSparkSQLConf(descriptorRootOption -> isolatedExternalRoot) {
sql("INSERT INTO t VALUES (2, 'paimon', sys.path_to_descriptor('" + isolatedPath + "'))")

val newDescriptorBytes =
sql("SELECT picture FROM t WHERE id = 2").collect()(0).get(0).asInstanceOf[Array[Byte]]
val newBlobDescriptor = BlobDescriptor.deserialize(newDescriptorBytes)
val options = new Options()
options.set(IsolatedDirectoryFileIO.ROOT_DIR, isolatedPaimonRoot)
val catalogContext = CatalogContext.create(options)
val uriReaderFactory = new UriReaderFactory(catalogContext)
val blob =
Blob.fromDescriptor(uriReaderFactory.create(newBlobDescriptor.uri), newBlobDescriptor)
assert(util.Arrays.equals(blobData, blob.toData))
}
}
} finally {
sql(s"USE paimon.$dbName0")
sql(s"DROP DATABASE IF EXISTS $catalogName.$databaseName CASCADE")
}
}

private def exceptionContains(throwable: Throwable, message: String): Boolean = {
if (throwable == null) {
false
} else if (throwable.getMessage != null && throwable.getMessage.contains(message)) {
true
} else {
exceptionContains(throwable.getCause, message)
}
}

private def exceptionMessages(throwable: Throwable): String = {
if (throwable == null) {
""
} else {
throwable.getClass.getName + ": " + throwable.getMessage + "\n" +
exceptionMessages(throwable.getCause)
}
}

test("Blob: test compaction") {
withTable("t") {
sql(
Expand Down