[MINOR] Fix Call Procedure code style (apache#6186)
* Fix Call Procedure code style: rename Call procedures to verb-first, singular names (e.g. create_savepoint, create_metadata_table, rollback_to_savepoint) and convert camelCase parameter names to snake_case.
Co-authored-by: superche <superche@tencent.com>
hechao-ustc authored and fengjian committed Apr 5, 2023
1 parent 876f57f commit 00a9576
Showing 33 changed files with 167 additions and 162 deletions.
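
The renames are visible to end users through Spark SQL's CALL syntax, so existing call sites must be updated. A minimal before/after sketch, assuming the Hudi Spark bundle is on the classpath and using a hypothetical table hudi_tbl:

import org.apache.spark.sql.SparkSession

// Hypothetical local session; HoodieSparkSessionExtension is Hudi's real SQL extension class.
val spark = SparkSession.builder()
  .master("local[*]")
  .appName("call-procedure-demo")
  .config("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
  .config("spark.sql.extensions", "org.apache.spark.sql.hudi.HoodieSparkSessionExtension")
  .getOrCreate()

// Before this commit: plural procedure name, mixed-case parameter.
//   CALL create_savepoints(table => 'hudi_tbl', commit_Time => '20230405103000')
// After this commit: singular name, consistent snake_case.
spark.sql("CALL create_savepoint(table => 'hudi_tbl', commit_time => '20230405103000')")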

MetadataCreateProcedure.scala → CreateMetadataTableProcedure.scala

@@ -29,7 +29,7 @@ import org.apache.spark.sql.types._
 import java.io.FileNotFoundException
 import java.util.function.Supplier
 
-class MetadataCreateProcedure extends BaseProcedure with ProcedureBuilder with SparkAdapterSupport {
+class CreateMetadataTableProcedure extends BaseProcedure with ProcedureBuilder with SparkAdapterSupport {
   private val PARAMETERS = Array[ProcedureParameter](
     ProcedureParameter.required(0, "table", DataTypes.StringType, None)
   )
@@ -67,14 +67,14 @@ class MetadataCreateProcedure extends BaseProcedure with ProcedureBuilder with SparkAdapterSupport {
     Seq(Row("Created Metadata Table in " + metadataPath + " (duration=" + timer.endTimer / 1000.0 + "secs)"))
   }
 
-  override def build = new MetadataCreateProcedure()
+  override def build = new CreateMetadataTableProcedure()
 }
 
-object MetadataCreateProcedure {
-  val NAME = "metadata_create"
+object CreateMetadataTableProcedure {
+  val NAME = "create_metadata_table"
   var metadataBaseDirectory: Option[String] = None
 
   def builder: Supplier[ProcedureBuilder] = new Supplier[ProcedureBuilder] {
-    override def get() = new MetadataCreateProcedure()
+    override def get() = new CreateMetadataTableProcedure()
   }
 }
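
For illustration, the three renamed metadata-table procedures as they would now be called, reusing the spark session sketched above (the table name remains hypothetical):

spark.sql("CALL create_metadata_table(table => 'hudi_tbl')") // was metadata_create
spark.sql("CALL delete_metadata_table(table => 'hudi_tbl')") // was metadata_delete
spark.sql("CALL init_metadata_table(table => 'hudi_tbl')")   // was metadata_init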

CreateSavepointsProcedure.scala → CreateSavepointProcedure.scala

@@ -26,10 +26,10 @@ import org.apache.spark.sql.types.{DataTypes, Metadata, StructField, StructType}
 
 import java.util.function.Supplier
 
-class CreateSavepointsProcedure extends BaseProcedure with ProcedureBuilder with Logging {
+class CreateSavepointProcedure extends BaseProcedure with ProcedureBuilder with Logging {
   private val PARAMETERS = Array[ProcedureParameter](
     ProcedureParameter.required(0, "table", DataTypes.StringType, None),
-    ProcedureParameter.required(1, "commit_Time", DataTypes.StringType, None),
+    ProcedureParameter.required(1, "commit_time", DataTypes.StringType, None),
     ProcedureParameter.optional(2, "user", DataTypes.StringType, ""),
     ProcedureParameter.optional(3, "comments", DataTypes.StringType, "")
   )
@@ -75,14 +75,14 @@ class CreateSavepointsProcedure extends BaseProcedure with ProcedureBuilder with Logging {
     Seq(Row(result))
   }
 
-  override def build: Procedure = new CreateSavepointsProcedure()
+  override def build: Procedure = new CreateSavepointProcedure()
 }
 
-object CreateSavepointsProcedure {
-  val NAME: String = "create_savepoints"
+object CreateSavepointProcedure {
+  val NAME: String = "create_savepoint"
 
   def builder: Supplier[ProcedureBuilder] = new Supplier[ProcedureBuilder] {
-    override def get(): CreateSavepointsProcedure = new CreateSavepointsProcedure()
+    override def get(): CreateSavepointProcedure = new CreateSavepointProcedure()
   }
 }

DeleteMarkerProcedure.scala

@@ -29,7 +29,7 @@ import scala.util.{Failure, Success, Try}
 class DeleteMarkerProcedure extends BaseProcedure with ProcedureBuilder with Logging {
   private val PARAMETERS = Array[ProcedureParameter](
     ProcedureParameter.required(0, "table", DataTypes.StringType, None),
-    ProcedureParameter.required(1, "instant_Time", DataTypes.StringType, None)
+    ProcedureParameter.required(1, "instant_time", DataTypes.StringType, None)
   )
 
   private val OUTPUT_TYPE = new StructType(Array[StructField](

MetadataDeleteProcedure.scala → DeleteMetadataTableProcedure.scala

@@ -27,7 +27,7 @@ import org.apache.spark.sql.types._
 import java.io.FileNotFoundException
 import java.util.function.Supplier
 
-class MetadataDeleteProcedure extends BaseProcedure with ProcedureBuilder with SparkAdapterSupport {
+class DeleteMetadataTableProcedure extends BaseProcedure with ProcedureBuilder with SparkAdapterSupport {
   private val PARAMETERS = Array[ProcedureParameter](
     ProcedureParameter.required(0, "table", DataTypes.StringType, None)
   )
@@ -58,14 +58,14 @@ class MetadataDeleteProcedure extends BaseProcedure with ProcedureBuilder with SparkAdapterSupport {
     Seq(Row("Removed Metadata Table from " + metadataPath))
   }
 
-  override def build = new MetadataDeleteProcedure()
+  override def build = new DeleteMetadataTableProcedure()
 }
 
-object MetadataDeleteProcedure {
-  val NAME = "metadata_delete"
+object DeleteMetadataTableProcedure {
+  val NAME = "delete_metadata_table"
   var metadataBaseDirectory: Option[String] = None
 
   def builder: Supplier[ProcedureBuilder] = new Supplier[ProcedureBuilder] {
-    override def get() = new MetadataDeleteProcedure()
+    override def get() = new DeleteMetadataTableProcedure()
   }
 }

DeleteSavepointsProcedure.scala → DeleteSavepointProcedure.scala

@@ -26,7 +26,7 @@ import org.apache.spark.sql.types.{DataTypes, Metadata, StructField, StructType}
 
 import java.util.function.Supplier
 
-class DeleteSavepointsProcedure extends BaseProcedure with ProcedureBuilder with Logging {
+class DeleteSavepointProcedure extends BaseProcedure with ProcedureBuilder with Logging {
   private val PARAMETERS = Array[ProcedureParameter](
     ProcedureParameter.required(0, "table", DataTypes.StringType, None),
     ProcedureParameter.required(1, "instant_time", DataTypes.StringType, None)
@@ -74,14 +74,14 @@ class DeleteSavepointsProcedure extends BaseProcedure with ProcedureBuilder with Logging {
     Seq(Row(result))
   }
 
-  override def build: Procedure = new DeleteSavepointsProcedure()
+  override def build: Procedure = new DeleteSavepointProcedure()
 }
 
-object DeleteSavepointsProcedure {
-  val NAME: String = "delete_savepoints"
+object DeleteSavepointProcedure {
+  val NAME: String = "delete_savepoint"
 
   def builder: Supplier[ProcedureBuilder] = new Supplier[ProcedureBuilder] {
-    override def get(): DeleteSavepointsProcedure = new DeleteSavepointsProcedure()
+    override def get(): DeleteSavepointProcedure = new DeleteSavepointProcedure()
   }
 }

ExportInstantsProcedure.scala

@@ -49,7 +49,7 @@ class ExportInstantsProcedure extends BaseProcedure with ProcedureBuilder with Logging {
 
   private val PARAMETERS = Array[ProcedureParameter](
     ProcedureParameter.required(0, "table", DataTypes.StringType, None),
-    ProcedureParameter.required(1, "localFolder", DataTypes.StringType, None),
+    ProcedureParameter.required(1, "local_folder", DataTypes.StringType, None),
     ProcedureParameter.optional(2, "limit", DataTypes.IntegerType, -1),
     ProcedureParameter.optional(3, "actions", DataTypes.StringType, defaultActions),
     ProcedureParameter.optional(4, "desc", DataTypes.BooleanType, false)
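
A usage sketch for the renamed parameter, assuming the registered name export_instants (the export folder and table name are placeholders):

spark.sql("CALL export_instants(table => 'hudi_tbl', local_folder => '/tmp/hudi_export', limit => 20)")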

HdfsParquetImportProcedure.scala

@@ -28,17 +28,17 @@ import scala.language.higherKinds
 class HdfsParquetImportProcedure extends BaseProcedure with ProcedureBuilder with Logging {
   private val PARAMETERS = Array[ProcedureParameter](
     ProcedureParameter.required(0, "table", DataTypes.StringType, None),
-    ProcedureParameter.required(1, "tableType", DataTypes.StringType, None),
-    ProcedureParameter.required(2, "srcPath", DataTypes.StringType, None),
-    ProcedureParameter.required(3, "targetPath", DataTypes.StringType, None),
-    ProcedureParameter.required(4, "rowKey", DataTypes.StringType, None),
-    ProcedureParameter.required(5, "partitionKey", DataTypes.StringType, None),
-    ProcedureParameter.required(6, "schemaFilePath", DataTypes.StringType, None),
+    ProcedureParameter.required(1, "table_type", DataTypes.StringType, None),
+    ProcedureParameter.required(2, "src_path", DataTypes.StringType, None),
+    ProcedureParameter.required(3, "target_path", DataTypes.StringType, None),
+    ProcedureParameter.required(4, "row_key", DataTypes.StringType, None),
+    ProcedureParameter.required(5, "partition_key", DataTypes.StringType, None),
+    ProcedureParameter.required(6, "schema_file_path", DataTypes.StringType, None),
     ProcedureParameter.optional(7, "format", DataTypes.StringType, "parquet"),
     ProcedureParameter.optional(8, "command", DataTypes.StringType, "insert"),
     ProcedureParameter.optional(9, "retry", DataTypes.IntegerType, 0),
     ProcedureParameter.optional(10, "parallelism", DataTypes.IntegerType, jsc.defaultParallelism),
-    ProcedureParameter.optional(11, "propsFilePath", DataTypes.StringType, "")
+    ProcedureParameter.optional(11, "props_file_path", DataTypes.StringType, "")
   )
 
   private val OUTPUT_TYPE = new StructType(Array[StructField](
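
With every required parameter now snake_case, a call reads uniformly. A sketch assuming the registered name hdfs_parquet_import, with placeholder paths and key names (none of these values are defaults):

spark.sql(
  """CALL hdfs_parquet_import(
    |  table => 'hudi_tbl',
    |  table_type => 'COPY_ON_WRITE',
    |  src_path => 'hdfs:///data/in/parquet',
    |  target_path => 'hdfs:///data/out/hudi_tbl',
    |  row_key => 'id',
    |  partition_key => 'dt',
    |  schema_file_path => 'hdfs:///data/in/schema.avsc')""".stripMargin)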

HoodieProcedures.scala

@@ -35,9 +35,9 @@ object HoodieProcedures {
   val mapBuilder: ImmutableMap.Builder[String, Supplier[ProcedureBuilder]] = ImmutableMap.builder()
   mapBuilder.put(RunCompactionProcedure.NAME, RunCompactionProcedure.builder)
   mapBuilder.put(ShowCompactionProcedure.NAME, ShowCompactionProcedure.builder)
-  mapBuilder.put(CreateSavepointsProcedure.NAME, CreateSavepointsProcedure.builder)
-  mapBuilder.put(DeleteSavepointsProcedure.NAME, DeleteSavepointsProcedure.builder)
-  mapBuilder.put(RollbackSavepointsProcedure.NAME, RollbackSavepointsProcedure.builder)
+  mapBuilder.put(CreateSavepointProcedure.NAME, CreateSavepointProcedure.builder)
+  mapBuilder.put(DeleteSavepointProcedure.NAME, DeleteSavepointProcedure.builder)
+  mapBuilder.put(RollbackToSavepointProcedure.NAME, RollbackToSavepointProcedure.builder)
   mapBuilder.put(RollbackToInstantTimeProcedure.NAME, RollbackToInstantTimeProcedure.builder)
   mapBuilder.put(RunClusteringProcedure.NAME, RunClusteringProcedure.builder)
   mapBuilder.put(ShowClusteringProcedure.NAME, ShowClusteringProcedure.builder)
@@ -66,13 +66,13 @@ object HoodieProcedures {
   mapBuilder.put(ShowBootstrapPartitionsProcedure.NAME, ShowBootstrapPartitionsProcedure.builder)
   mapBuilder.put(UpgradeTableProcedure.NAME, UpgradeTableProcedure.builder)
   mapBuilder.put(DowngradeTableProcedure.NAME, DowngradeTableProcedure.builder)
-  mapBuilder.put(ListMetadataFilesProcedure.NAME, ListMetadataFilesProcedure.builder)
-  mapBuilder.put(ListMetadataPartitionsProcedure.NAME, ListMetadataPartitionsProcedure.builder)
-  mapBuilder.put(MetadataCreateProcedure.NAME, MetadataCreateProcedure.builder)
-  mapBuilder.put(MetadataDeleteProcedure.NAME, MetadataDeleteProcedure.builder)
-  mapBuilder.put(MetadataInitProcedure.NAME, MetadataInitProcedure.builder)
-  mapBuilder.put(ShowMetadataStatsProcedure.NAME, ShowMetadataStatsProcedure.builder)
-  mapBuilder.put(ValidateMetadataFilesProcedure.NAME, ValidateMetadataFilesProcedure.builder)
+  mapBuilder.put(ShowMetadataTableFilesProcedure.NAME, ShowMetadataTableFilesProcedure.builder)
+  mapBuilder.put(ShowMetadataTablePartitionsProcedure.NAME, ShowMetadataTablePartitionsProcedure.builder)
+  mapBuilder.put(CreateMetadataTableProcedure.NAME, CreateMetadataTableProcedure.builder)
+  mapBuilder.put(DeleteMetadataTableProcedure.NAME, DeleteMetadataTableProcedure.builder)
+  mapBuilder.put(InitMetadataTableProcedure.NAME, InitMetadataTableProcedure.builder)
+  mapBuilder.put(ShowMetadataTableStatsProcedure.NAME, ShowMetadataTableStatsProcedure.builder)
+  mapBuilder.put(ValidateMetadataTableFilesProcedure.NAME, ValidateMetadataTableFilesProcedure.builder)
   mapBuilder.put(ShowFsPathDetailProcedure.NAME, ShowFsPathDetailProcedure.builder)
   mapBuilder.put(CopyToTableProcedure.NAME, CopyToTableProcedure.builder)
   mapBuilder.put(RepairAddpartitionmetaProcedure.NAME, RepairAddpartitionmetaProcedure.builder)
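
The registry entries above all follow one pattern: each companion object exposes a NAME constant plus a Supplier[ProcedureBuilder], and HoodieProcedures keys builders by name so a CALL statement resolves via string lookup. A stripped-down sketch of that flow, with stand-in traits rather than the actual Hudi interfaces:

import java.util.function.Supplier

// Simplified stand-ins for Hudi's Procedure and ProcedureBuilder types.
trait Procedure
trait ProcedureBuilder { def build: Procedure }

class CreateSavepointProcedure extends Procedure with ProcedureBuilder {
  override def build: Procedure = new CreateSavepointProcedure()
}

object CreateSavepointProcedure {
  val NAME = "create_savepoint"
  def builder: Supplier[ProcedureBuilder] = () => new CreateSavepointProcedure()
}

object ProcedureRegistryDemo {
  // Name -> builder map, mirroring the mapBuilder.put(...) calls above.
  val procedures: Map[String, Supplier[ProcedureBuilder]] =
    Map(CreateSavepointProcedure.NAME -> CreateSavepointProcedure.builder)

  // CALL resolution is a lookup on the (now snake_case) procedure name.
  def lookup(name: String): Option[Procedure] =
    procedures.get(name).map(_.get().build)
}

This is why each rename touches both the companion object's NAME constant and the corresponding mapBuilder.put entry.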

MetadataInitProcedure.scala → InitMetadataTableProcedure.scala

@@ -30,10 +30,10 @@ import org.apache.spark.sql.types._
 import java.io.FileNotFoundException
 import java.util.function.Supplier
 
-class MetadataInitProcedure extends BaseProcedure with ProcedureBuilder with SparkAdapterSupport with Logging {
+class InitMetadataTableProcedure extends BaseProcedure with ProcedureBuilder with SparkAdapterSupport with Logging {
   private val PARAMETERS = Array[ProcedureParameter](
     ProcedureParameter.required(0, "table", DataTypes.StringType, None),
-    ProcedureParameter.optional(1, "readOnly", DataTypes.BooleanType, false)
+    ProcedureParameter.optional(1, "read_only", DataTypes.BooleanType, false)
   )
 
   private val OUTPUT_TYPE = new StructType(Array[StructField](
@@ -71,14 +71,14 @@ class MetadataInitProcedure extends BaseProcedure with ProcedureBuilder with SparkAdapterSupport with Logging {
     Seq(Row(action + " Metadata Table in " + metadataPath + " (duration=" + timer.endTimer / 1000.0 + "sec)"))
   }
 
-  override def build = new MetadataInitProcedure()
+  override def build = new InitMetadataTableProcedure()
 }
 
-object MetadataInitProcedure {
-  val NAME = "metadata_init"
+object InitMetadataTableProcedure {
+  val NAME = "init_metadata_table"
   var metadataBaseDirectory: Option[String] = None
 
   def builder: Supplier[ProcedureBuilder] = new Supplier[ProcedureBuilder] {
-    override def get() = new MetadataInitProcedure()
+    override def get() = new InitMetadataTableProcedure()
   }
 }

RollbackSavepointsProcedure.scala → RollbackToSavepointProcedure.scala

@@ -26,7 +26,7 @@ import org.apache.spark.sql.types.{DataTypes, Metadata, StructField, StructType}
 
 import java.util.function.Supplier
 
-class RollbackSavepointsProcedure extends BaseProcedure with ProcedureBuilder with Logging {
+class RollbackToSavepointProcedure extends BaseProcedure with ProcedureBuilder with Logging {
   private val PARAMETERS = Array[ProcedureParameter](
     ProcedureParameter.required(0, "table", DataTypes.StringType, None),
     ProcedureParameter.required(1, "instant_time", DataTypes.StringType, None)
@@ -74,14 +74,14 @@ class RollbackSavepointsProcedure extends BaseProcedure with ProcedureBuilder with Logging {
     Seq(Row(result))
   }
 
-  override def build: Procedure = new RollbackSavepointsProcedure()
+  override def build: Procedure = new RollbackToSavepointProcedure()
 }
 
-object RollbackSavepointsProcedure {
-  val NAME: String = "rollback_savepoints"
+object RollbackToSavepointProcedure {
+  val NAME: String = "rollback_to_savepoint"
 
   def builder: Supplier[ProcedureBuilder] = new Supplier[ProcedureBuilder] {
-    override def get(): RollbackSavepointsProcedure = new RollbackSavepointsProcedure()
+    override def get(): RollbackToSavepointProcedure = new RollbackToSavepointProcedure()
   }
 }
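
Together with create_savepoint earlier, the renamed pair gives the savepoint lifecycle a uniform surface. A sketch reusing the session from the first example (the instant time is a placeholder):

spark.sql("CALL delete_savepoint(table => 'hudi_tbl', instant_time => '20230405103000')")
spark.sql("CALL rollback_to_savepoint(table => 'hudi_tbl', instant_time => '20230405103000')")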

RunBootstrapProcedure.scala

@@ -37,22 +37,22 @@ import java.util.function.Supplier
 class RunBootstrapProcedure extends BaseProcedure with ProcedureBuilder with Logging {
   private val PARAMETERS = Array[ProcedureParameter](
     ProcedureParameter.required(0, "table", DataTypes.StringType, None),
-    ProcedureParameter.required(1, "tableType", DataTypes.StringType, None),
-    ProcedureParameter.required(2, "bootstrapPath", DataTypes.StringType, None),
-    ProcedureParameter.required(3, "basePath", DataTypes.StringType, None),
-    ProcedureParameter.required(4, "rowKeyField", DataTypes.StringType, None),
-    ProcedureParameter.optional(5, "baseFileFormat", DataTypes.StringType, "PARQUET"),
-    ProcedureParameter.optional(6, "partitionPathField", DataTypes.StringType, ""),
-    ProcedureParameter.optional(7, "bootstrapIndexClass", DataTypes.StringType, "org.apache.hudi.common.bootstrap.index.HFileBootstrapIndex"),
-    ProcedureParameter.optional(8, "selectorClass", DataTypes.StringType, "org.apache.hudi.client.bootstrap.selector.MetadataOnlyBootstrapModeSelector"),
-    ProcedureParameter.optional(9, "keyGeneratorClass", DataTypes.StringType, "org.apache.hudi.keygen.SimpleKeyGenerator"),
-    ProcedureParameter.optional(10, "fullBootstrapInputProvider", DataTypes.StringType, "org.apache.hudi.bootstrap.SparkParquetBootstrapDataProvider"),
-    ProcedureParameter.optional(11, "schemaProviderClass", DataTypes.StringType, ""),
-    ProcedureParameter.optional(12, "payloadClass", DataTypes.StringType, "org.apache.hudi.common.model.OverwriteWithLatestAvroPayload"),
+    ProcedureParameter.required(1, "table_type", DataTypes.StringType, None),
+    ProcedureParameter.required(2, "bootstrap_path", DataTypes.StringType, None),
+    ProcedureParameter.required(3, "base_path", DataTypes.StringType, None),
+    ProcedureParameter.required(4, "row_key_field", DataTypes.StringType, None),
+    ProcedureParameter.optional(5, "base_file_format", DataTypes.StringType, "PARQUET"),
+    ProcedureParameter.optional(6, "partition_path_field", DataTypes.StringType, ""),
+    ProcedureParameter.optional(7, "bootstrap_index_class", DataTypes.StringType, "org.apache.hudi.common.bootstrap.index.HFileBootstrapIndex"),
+    ProcedureParameter.optional(8, "selector_class", DataTypes.StringType, "org.apache.hudi.client.bootstrap.selector.MetadataOnlyBootstrapModeSelector"),
+    ProcedureParameter.optional(9, "key_generator_class", DataTypes.StringType, "org.apache.hudi.keygen.SimpleKeyGenerator"),
+    ProcedureParameter.optional(10, "full_bootstrap_input_provider", DataTypes.StringType, "org.apache.hudi.bootstrap.SparkParquetBootstrapDataProvider"),
+    ProcedureParameter.optional(11, "schema_provider_class", DataTypes.StringType, ""),
+    ProcedureParameter.optional(12, "payload_class", DataTypes.StringType, "org.apache.hudi.common.model.OverwriteWithLatestAvroPayload"),
     ProcedureParameter.optional(13, "parallelism", DataTypes.IntegerType, 1500),
-    ProcedureParameter.optional(14, "enableHiveSync", DataTypes.BooleanType, false),
-    ProcedureParameter.optional(15, "propsFilePath", DataTypes.StringType, ""),
-    ProcedureParameter.optional(16, "bootstrapOverwrite", DataTypes.BooleanType, false)
+    ProcedureParameter.optional(14, "enable_hive_sync", DataTypes.BooleanType, false),
+    ProcedureParameter.optional(15, "props_file_path", DataTypes.StringType, ""),
+    ProcedureParameter.optional(16, "bootstrap_overwrite", DataTypes.BooleanType, false)
   )
 
   private val OUTPUT_TYPE = new StructType(Array[StructField](
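
Only the five required parameters need to be spelled out; the class-valued options keep the defaults listed in the diff. A sketch assuming the registered name run_bootstrap, with placeholder paths:

spark.sql(
  """CALL run_bootstrap(
    |  table => 'hudi_tbl',
    |  table_type => 'COPY_ON_WRITE',
    |  bootstrap_path => 'hdfs:///data/source_tbl',
    |  base_path => 'hdfs:///data/hudi_tbl',
    |  row_key_field => 'id')""".stripMargin)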

RunCleanProcedure.scala

@@ -30,10 +30,10 @@ class RunCleanProcedure extends BaseProcedure with ProcedureBuilder with Logging {
 
   private val PARAMETERS = Array[ProcedureParameter](
     ProcedureParameter.required(0, "table", DataTypes.StringType, None),
-    ProcedureParameter.optional(1, "skipLocking", DataTypes.BooleanType, false),
-    ProcedureParameter.optional(2, "scheduleInLine", DataTypes.BooleanType, true),
-    ProcedureParameter.optional(3, "cleanPolicy", DataTypes.StringType, None),
-    ProcedureParameter.optional(4, "retainCommits", DataTypes.IntegerType, 10)
+    ProcedureParameter.optional(1, "skip_locking", DataTypes.BooleanType, false),
+    ProcedureParameter.optional(2, "schedule_in_line", DataTypes.BooleanType, true),
+    ProcedureParameter.optional(3, "clean_policy", DataTypes.StringType, None),
+    ProcedureParameter.optional(4, "retain_commits", DataTypes.IntegerType, 10)
   )
 
   private val OUTPUT_TYPE = new StructType(Array[StructField](
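
All four renamed parameters are optional, so omitted ones fall back to their defaults (skip_locking = false, schedule_in_line = true, retain_commits = 10). A sketch assuming the registered name run_clean:

spark.sql("CALL run_clean(table => 'hudi_tbl', retain_commits => 5)")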

ShowArchivedCommitsProcedure.scala

@@ -36,8 +36,8 @@ class ShowArchivedCommitsProcedure(includeExtraMetadata: Boolean) extends BaseProcedure
   private val PARAMETERS = Array[ProcedureParameter](
     ProcedureParameter.required(0, "table", DataTypes.StringType, None),
     ProcedureParameter.optional(1, "limit", DataTypes.IntegerType, 10),
-    ProcedureParameter.optional(2, "startTs", DataTypes.StringType, ""),
-    ProcedureParameter.optional(3, "endTs", DataTypes.StringType, "")
+    ProcedureParameter.optional(2, "start_ts", DataTypes.StringType, ""),
+    ProcedureParameter.optional(3, "end_ts", DataTypes.StringType, "")
   )
 
   private val OUTPUT_TYPE = new StructType(Array[StructField](
@@ -63,7 +63,7 @@ class ShowArchivedCommitsProcedure(includeExtraMetadata: Boolean) extends BaseProcedure
     StructField("num_update_writes", DataTypes.LongType, nullable = true, Metadata.empty),
     StructField("total_errors", DataTypes.LongType, nullable = true, Metadata.empty),
     StructField("total_log_blocks", DataTypes.LongType, nullable = true, Metadata.empty),
-    StructField("total_corrupt_logblocks", DataTypes.LongType, nullable = true, Metadata.empty),
+    StructField("total_corrupt_log_blocks", DataTypes.LongType, nullable = true, Metadata.empty),
     StructField("total_rollback_blocks", DataTypes.LongType, nullable = true, Metadata.empty),
     StructField("total_log_records", DataTypes.LongType, nullable = true, Metadata.empty),
     StructField("total_updated_records_compacted", DataTypes.LongType, nullable = true, Metadata.empty),
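
Finally, a sketch of the renamed time-range parameters, assuming the registered name show_archived_commits (the timestamps are placeholder Hudi instant times):

spark.sql(
  "CALL show_archived_commits(table => 'hudi_tbl', start_ts => '20230101000000', end_ts => '20230301000000')"
).show(truncate = false)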
