Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,8 @@
import java.util.stream.Collectors;
import java.util.stream.Stream;

import static org.apache.hudi.common.table.timeline.HoodieTimeline.SCHEMA_COMMIT_ACTION;

/**
* <code>HoodieTableMetaClient</code> allows to access meta-data about a hoodie table It returns meta-data about
* commits, savepoints, compactions, cleanups as a <code>HoodieTimeline</code> Create an instance of the
Expand Down Expand Up @@ -553,6 +555,32 @@ public HoodieTimeline getCommitTimeline() {
}
}

/**
* Return whether an available historySchema file exist in schema folder or not.
*/
public boolean isValidHistorySchemaExist() {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What's history-schema?

try {
Path baseSchemaPath = new Path(this.getSchemaFolderName());
FileSystem fs = this.getFs();
if (fs.exists(baseSchemaPath)) {
FileStatus[] allSchemaFileStatus = fs.listStatus(baseSchemaPath);
if (allSchemaFileStatus.length == 0) {
return false;
}
List<String> validateCommits = this.getCommitsTimeline()
.filterCompletedInstants().getInstants().map(f -> f.getTimestamp()).collect(Collectors.toList());
Stream<String> schemaFileStream = Arrays.stream(allSchemaFileStatus)
.filter(f -> f.isFile() && f.getPath().getName().endsWith(SCHEMA_COMMIT_ACTION))
.map(file -> file.getPath().getName());

return schemaFileStream.anyMatch(f -> validateCommits.contains(f.split("\\.")[0]));
}
} catch (IOException io) {
throw new HoodieException(io);
}
return false;
}

/**
* Gets the commit action type.
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@ import org.apache.hudi.common.util.StringUtils
import org.apache.hudi.common.util.ValidationUtils.checkState
import org.apache.hudi.internal.schema.{HoodieSchemaException, InternalSchema}
import org.apache.hudi.internal.schema.convert.AvroInternalSchemaConverter
import org.apache.hudi.internal.schema.io.FileBasedInternalSchemaStorageManager
import org.apache.hudi.io.storage.HoodieHFileReader
import org.apache.spark.execution.datasources.HoodieInMemoryFileIndex
import org.apache.spark.internal.Logging
Expand All @@ -49,9 +50,9 @@ import org.apache.spark.sql.sources.{BaseRelation, Filter, PrunedFilteredScan}
import org.apache.spark.sql.types.{StringType, StructField, StructType}
import org.apache.spark.sql.{Row, SQLContext, SparkSession}
import org.apache.spark.unsafe.types.UTF8String

import java.net.URI
import java.util.Locale

import scala.collection.JavaConverters._
import scala.util.control.NonFatal
import scala.util.{Failure, Success, Try}
Expand Down Expand Up @@ -509,12 +510,21 @@ abstract class HoodieBaseRelation(val sqlContext: SQLContext,
StructType(dataStructSchema.filterNot(f => partitionColumns.contains(f.name)))

private def isSchemaEvolutionEnabled = {
  // Auto-detects whether this table has undergone schema evolution by checking for a
  // valid persisted history schema. Used as a fallback when the caller did not enable
  // schema evolution explicitly.
  //
  // NOTE: This is a pure read — unlike the earlier revision, it deliberately does NOT
  //       write the detected value back into the Spark session configuration. Mutating
  //       session-wide config from a relation's read path would leak state into every
  //       other query running in the same session.
  def detectSchemaEvolution(): Boolean =
    metaClient.isValidHistorySchemaExist

  // NOTE: Schema evolution could be configured both through optional parameters as well as
  //       through Spark Session configuration (for ex, for Spark SQL); failing both, it is
  //       auto-detected from the table's persisted schema history.
  optParams.getOrElse(DataSourceReadOptions.SCHEMA_EVOLUTION_ENABLED.key,
    DataSourceReadOptions.SCHEMA_EVOLUTION_ENABLED.defaultValue.toString).toBoolean ||
    sparkSession.conf.get(DataSourceReadOptions.SCHEMA_EVOLUTION_ENABLED.key,
      DataSourceReadOptions.SCHEMA_EVOLUTION_ENABLED.defaultValue.toString).toBoolean ||
    detectSchemaEvolution()
}
}

Expand Down