diff --git a/hudi-common/src/main/java/org/apache/hudi/common/table/HoodieTableMetaClient.java b/hudi-common/src/main/java/org/apache/hudi/common/table/HoodieTableMetaClient.java index 9945eb0650feb..690a25fa8b301 100644 --- a/hudi-common/src/main/java/org/apache/hudi/common/table/HoodieTableMetaClient.java +++ b/hudi-common/src/main/java/org/apache/hudi/common/table/HoodieTableMetaClient.java @@ -66,6 +66,8 @@ import java.util.stream.Collectors; import java.util.stream.Stream; +import static org.apache.hudi.common.table.timeline.HoodieTimeline.SCHEMA_COMMIT_ACTION; + /** * HoodieTableMetaClient allows to access meta-data about a hoodie table It returns meta-data about * commits, savepoints, compactions, cleanups as a HoodieTimeline Create an instance of the @@ -553,6 +555,32 @@ public HoodieTimeline getCommitTimeline() { } } + /** + * Return whether an available historySchema file exist in schema folder or not. + */ + public boolean isValidHistorySchemaExist() { + try { + Path baseSchemaPath = new Path(this.getSchemaFolderName()); + FileSystem fs = this.getFs(); + if (fs.exists(baseSchemaPath)) { + FileStatus[] allSchemaFileStatus = fs.listStatus(baseSchemaPath); + if (allSchemaFileStatus.length == 0) { + return false; + } + List validateCommits = this.getCommitsTimeline() + .filterCompletedInstants().getInstants().map(f -> f.getTimestamp()).collect(Collectors.toList()); + Stream schemaFileStream = Arrays.stream(allSchemaFileStatus) + .filter(f -> f.isFile() && f.getPath().getName().endsWith(SCHEMA_COMMIT_ACTION)) + .map(file -> file.getPath().getName()); + + return schemaFileStream.anyMatch(f -> validateCommits.contains(f.split("\\.")[0])); + } + } catch (IOException io) { + throw new HoodieException(io); + } + return false; + } + /** * Gets the commit action type. 
/**
 * Tells whether schema evolution should be applied when reading this table.
 *
 * Three sources are consulted in order, stopping at the first one that says "enabled":
 *  1. the relation's optional parameters,
 *  2. the Spark session configuration (for ex, when running Spark SQL),
 *  3. auto-detection: the meta client reports a valid history-schema file. In this case the
 *     flag is also written into the session's SQL conf.
 */
private def isSchemaEvolutionEnabled = {
  val evolutionKey = DataSourceReadOptions.SCHEMA_EVOLUTION_ENABLED.key
  val evolutionDefault = DataSourceReadOptions.SCHEMA_EVOLUTION_ENABLED.defaultValue.toString

  // Explicitly requested through the read options
  def enabledByOptions: Boolean =
    optParams.getOrElse(evolutionKey, evolutionDefault).toBoolean

  // Explicitly requested through the Spark session
  def enabledBySession: Boolean =
    sparkSession.conf.get(evolutionKey, evolutionDefault).toBoolean

  // Auto-detected from the table; records the outcome in the session conf when positive
  def enabledByHistorySchema: Boolean = {
    val historySchemaFound = metaClient.isValidHistorySchemaExist
    if (historySchemaFound) {
      sparkSession.sessionState.conf.setConfString(evolutionKey, historySchemaFound.toString)
    }
    historySchemaFound
  }

  // `||` short-circuits, so the table is only probed when neither config enables the feature
  enabledByOptions || enabledBySession || enabledByHistorySchema
}