[MINOR] Spelling sql/core
### What changes were proposed in this pull request?

This PR intends to fix typos in the sub-modules:
* `sql/core`

Split out per srowen's request in #30323 (comment).

NOTE: The misspellings have been reported at jsoref@706a726#commitcomment-44064356

### Why are the changes needed?

Misspelled words make content harder to read and understand.

### Does this PR introduce _any_ user-facing change?

There are various fixes to documentation, etc.

### How was this patch tested?

No testing was performed.

Closes #30531 from jsoref/spelling-sql-core.

Authored-by: Josh Soref <jsoref@users.noreply.github.com>
Signed-off-by: Sean Owen <srowen@gmail.com>
(cherry picked from commit a093d6f)
Signed-off-by: Dongjoon Hyun <dhyun@apple.com>
jsoref authored and dongjoon-hyun committed Dec 22, 2020
1 parent 999bbb6 commit 154917c
Showing 64 changed files with 208 additions and 205 deletions.
@@ -87,14 +87,14 @@ function preprocessGraphLayout(g) {
var node = g.node(nodes[i]);
node.padding = "5";

- var firstSearator;
+ var firstSeparator;
var secondSeparator;
var splitter;
if (node.isCluster) {
- firstSearator = secondSeparator = labelSeparator;
+ firstSeparator = secondSeparator = labelSeparator;
splitter = "\\n";
} else {
- firstSearator = "<span class='stageId-and-taskId-metrics'>";
+ firstSeparator = "<span class='stageId-and-taskId-metrics'>";
secondSeparator = "</span>";
splitter = "<br>";
}
@@ -104,7 +104,7 @@ function preprocessGraphLayout(g) {
if (newTexts) {
node.label = node.label.replace(
newTexts[0],
- newTexts[1] + firstSearator + newTexts[2] + secondSeparator + newTexts[3]);
+ newTexts[1] + firstSeparator + newTexts[2] + secondSeparator + newTexts[3]);
}
});
}
10 changes: 5 additions & 5 deletions sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala
@@ -1363,7 +1363,7 @@ class Dataset[T] private[sql](
// Attach the dataset id and column position to the column reference, so that we can detect
// ambiguous self-join correctly. See the rule `DetectAmbiguousSelfJoin`.
// This must be called before we return a `Column` that contains `AttributeReference`.
- // Note that, the metadata added here are only avaiable in the analyzer, as the analyzer rule
+ // Note that, the metadata added here are only available in the analyzer, as the analyzer rule
// `DetectAmbiguousSelfJoin` will remove it.
private def addDataFrameIdToCol(expr: NamedExpression): NamedExpression = {
val newExpr = expr transform {
@@ -1665,10 +1665,10 @@ class Dataset[T] private[sql](
* See [[RelationalGroupedDataset]] for all the available aggregate functions.
*
* {{{
- * // Compute the average for all numeric columns rolluped by department and group.
+ * // Compute the average for all numeric columns rolled up by department and group.
* ds.rollup($"department", $"group").avg()
*
- * // Compute the max age and average salary, rolluped by department and gender.
+ * // Compute the max age and average salary, rolled up by department and gender.
* ds.rollup($"department", $"gender").agg(Map(
* "salary" -> "avg",
* "age" -> "max"
@@ -1794,10 +1794,10 @@ class Dataset[T] private[sql](
* (i.e. cannot construct expressions).
*
* {{{
- * // Compute the average for all numeric columns rolluped by department and group.
+ * // Compute the average for all numeric columns rolled up by department and group.
* ds.rollup("department", "group").avg()
*
- * // Compute the max age and average salary, rolluped by department and gender.
+ * // Compute the max age and average salary, rolled up by department and gender.
* ds.rollup($"department", $"gender").agg(Map(
* "salary" -> "avg",
* "age" -> "max"
@@ -284,7 +284,7 @@ case class FileSourceScanExec(
//
// Sort ordering would be over the prefix subset of `sort columns` being read
// from the table.
- // eg.
+ // e.g.
// Assume (col0, col2, col3) are the columns read from the table
// If sort columns are (col0, col1), then sort ordering would be considered as (col0)
// If sort columns are (col1, col0), then sort ordering would be empty as per rule #2
@@ -379,12 +379,12 @@ case class FileSourceScanExec(
case (key, _) if (key.equals("Location")) =>
val location = relation.location
val numPaths = location.rootPaths.length
- val abbreviatedLoaction = if (numPaths <= 1) {
+ val abbreviatedLocation = if (numPaths <= 1) {
location.rootPaths.mkString("[", ", ", "]")
} else {
"[" + location.rootPaths.head + s", ... ${numPaths - 1} entries]"
}
- s"$key: ${location.getClass.getSimpleName} ${redact(abbreviatedLoaction)}"
+ s"$key: ${location.getClass.getSimpleName} ${redact(abbreviatedLocation)}"
case (key, value) => s"$key: ${redact(value)}"
}

@@ -28,14 +28,14 @@ import org.apache.spark.sql.execution.adaptive.{AdaptiveSparkPlanExec, AdaptiveS
object ExplainUtils extends AdaptiveSparkPlanHelper {
/**
* Given a input physical plan, performs the following tasks.
- * 1. Computes the operator id for current operator and records it in the operaror
+ * 1. Computes the operator id for current operator and records it in the operator
* by setting a tag.
* 2. Computes the whole stage codegen id for current operator and records it in the
* operator by setting a tag.
* 3. Generate the two part explain output for this plan.
* 1. First part explains the operator tree with each operator tagged with an unique
* identifier.
- * 2. Second part explans each operator in a verbose manner.
+ * 2. Second part explains each operator in a verbose manner.
*
* Note : This function skips over subqueries. They are handled by its caller.
*
@@ -117,7 +117,7 @@ object ExplainUtils extends AdaptiveSparkPlanHelper {
}

/**
- * Traverses the supplied input plan in a bottem-up fashion does the following :
+ * Traverses the supplied input plan in a bottom-up fashion does the following :
* 1. produces a map : operator identifier -> operator
* 2. Records the operator id via setting a tag in the operator.
* Note :
@@ -210,7 +210,7 @@ object ExplainUtils extends AdaptiveSparkPlanHelper {

/**
* Given a input plan, returns an array of tuples comprising of :
- * 1. Hosting opeator id.
+ * 1. Hosting operator id.
* 2. Hosting expression
* 3. Subquery plan
*/
@@ -87,7 +87,7 @@ private[sql] class ExternalAppendOnlyUnsafeRowArray(
def isEmpty: Boolean = numRows == 0

/**
- * Clears up resources (eg. memory) held by the backing storage
+ * Clears up resources (e.g. memory) held by the backing storage
*/
def clear(): Unit = {
if (spillableArray != null) {
@@ -386,25 +386,25 @@ class SparkSqlAstBuilder extends AstBuilder {
* - '/path/to/fileOrJar'
*/
override def visitManageResource(ctx: ManageResourceContext): LogicalPlan = withOrigin(ctx) {
- val mayebePaths = if (ctx.STRING != null) string(ctx.STRING) else remainder(ctx.identifier).trim
+ val maybePaths = if (ctx.STRING != null) string(ctx.STRING) else remainder(ctx.identifier).trim
ctx.op.getType match {
case SqlBaseParser.ADD =>
ctx.identifier.getText.toLowerCase(Locale.ROOT) match {
- case "file" => AddFileCommand(mayebePaths)
- case "jar" => AddJarCommand(mayebePaths)
+ case "file" => AddFileCommand(maybePaths)
+ case "jar" => AddJarCommand(maybePaths)
case other => operationNotAllowed(s"ADD with resource type '$other'", ctx)
}
case SqlBaseParser.LIST =>
ctx.identifier.getText.toLowerCase(Locale.ROOT) match {
case "files" | "file" =>
- if (mayebePaths.length > 0) {
- ListFilesCommand(mayebePaths.split("\\s+"))
+ if (maybePaths.length > 0) {
+ ListFilesCommand(maybePaths.split("\\s+"))
} else {
ListFilesCommand()
}
case "jars" | "jar" =>
- if (mayebePaths.length > 0) {
- ListJarsCommand(mayebePaths.split("\\s+"))
+ if (maybePaths.length > 0) {
+ ListJarsCommand(maybePaths.split("\\s+"))
} else {
ListJarsCommand()
}
@@ -670,7 +670,7 @@ case class WholeStageCodegenExec(child: SparkPlan)(val codegenStageId: Int)
}

${ctx.registerComment(
- s"""Codegend pipeline for stage (id=$codegenStageId)
+ s"""Codegened pipeline for stage (id=$codegenStageId)
|${this.treeString.trim}""".stripMargin,
"wsc_codegenPipeline")}
${ctx.registerComment(s"codegenStageId=$codegenStageId", "wsc_codegenStageId", true)}
@@ -115,7 +115,7 @@ trait AdaptiveSparkPlanHelper {

/**
* Returns a sequence containing the subqueries in this plan, also including the (nested)
- * subquries in its children
+ * subqueries in its children
*/
def subqueriesAll(p: SparkPlan): Seq[SparkPlan] = {
val subqueries = flatMap(p)(_.subqueries)
@@ -36,7 +36,7 @@ import org.apache.spark.sql.execution.datasources._
* @param storage storage format used to describe how the query result is stored.
* @param provider the data source type to be used
* @param query the logical plan representing data to write to
- * @param overwrite whthere overwrites existing directory
+ * @param overwrite whether overwrites existing directory
*/
case class InsertIntoDataSourceDirCommand(
storage: CatalogStorageFormat,
@@ -89,8 +89,8 @@ case class CreateDatabaseCommand(
* A command for users to remove a database from the system.
*
* 'ifExists':
- * - true, if database_name does't exist, no action
- * - false (default), if database_name does't exist, a warning message will be issued
+ * - true, if database_name doesn't exist, no action
+ * - false (default), if database_name doesn't exist, a warning message will be issued
* 'cascade':
* - true, the dependent objects are automatically dropped before dropping database.
* - false (default), it is in the Restrict mode. The database cannot be dropped if
@@ -355,7 +355,7 @@ case class LoadDataCommand(
// entire string will be considered while making a Path instance,this is mainly done
// by considering the wild card scenario in mind.as per old logic query param is
// been considered while creating URI instance and if path contains wild card char '?'
- // the remaining charecters after '?' will be removed while forming URI instance
+ // the remaining characters after '?' will be removed while forming URI instance
LoadDataCommand.makeQualified(defaultFS, uriPath, loadPath)
}
}
@@ -211,7 +211,7 @@ case class DataSource(
s"Unable to infer schema for $format. It must be specified manually.")
}

- // We just print a waring message if the data schema and partition schema have the duplicate
+ // We just print a warning message if the data schema and partition schema have the duplicate
// columns. This is because we allow users to do so in the previous Spark releases and
// we have the existing tests for the cases (e.g., `ParquetHadoopFsRelationSuite`).
// See SPARK-18108 and SPARK-21144 for related discussions.
@@ -167,7 +167,7 @@ class DynamicPartitionDataWriter(

private var fileCounter: Int = _
private var recordsInFile: Long = _
- private var currentPartionValues: Option[UnsafeRow] = None
+ private var currentPartitionValues: Option[UnsafeRow] = None
private var currentBucketId: Option[Int] = None

/** Extracts the partition values out of an input row. */
@@ -247,19 +247,19 @@ class DynamicPartitionDataWriter(
val nextPartitionValues = if (isPartitioned) Some(getPartitionValues(record)) else None
val nextBucketId = if (isBucketed) Some(getBucketId(record)) else None

- if (currentPartionValues != nextPartitionValues || currentBucketId != nextBucketId) {
+ if (currentPartitionValues != nextPartitionValues || currentBucketId != nextBucketId) {
// See a new partition or bucket - write to a new partition dir (or a new bucket file).
- if (isPartitioned && currentPartionValues != nextPartitionValues) {
- currentPartionValues = Some(nextPartitionValues.get.copy())
- statsTrackers.foreach(_.newPartition(currentPartionValues.get))
+ if (isPartitioned && currentPartitionValues != nextPartitionValues) {
+ currentPartitionValues = Some(nextPartitionValues.get.copy())
+ statsTrackers.foreach(_.newPartition(currentPartitionValues.get))
}
if (isBucketed) {
currentBucketId = nextBucketId
statsTrackers.foreach(_.newBucket(currentBucketId.get))
}

fileCounter = 0
- newOutputWriter(currentPartionValues, currentBucketId)
+ newOutputWriter(currentPartitionValues, currentBucketId)
} else if (description.maxRecordsPerFile > 0 &&
recordsInFile >= description.maxRecordsPerFile) {
// Exceeded the threshold in terms of the number of records per file.
@@ -268,7 +268,7 @@ class DynamicPartitionDataWriter(
assert(fileCounter < MAX_FILE_COUNTER,
s"File counter $fileCounter is beyond max value $MAX_FILE_COUNTER")

- newOutputWriter(currentPartionValues, currentBucketId)
+ newOutputWriter(currentPartitionValues, currentBucketId)
}
val outputRow = getOutputRow(record)
currentWriter.write(outputRow)
@@ -164,7 +164,7 @@ object FileFormatWriter extends Logging {

SQLExecution.checkSQLExecutionId(sparkSession)

- // propagate the decription UUID into the jobs, so that committers
+ // propagate the description UUID into the jobs, so that committers
// get an ID guaranteed to be unique.
job.getConfiguration.set("spark.sql.sources.writeJobUUID", description.uuid)

@@ -453,7 +453,7 @@ object PartitioningUtils {
val decimalTry = Try {
// `BigDecimal` conversion can fail when the `field` is not a form of number.
val bigDecimal = new JBigDecimal(raw)
- // It reduces the cases for decimals by disallowing values having scale (eg. `1.1`).
+ // It reduces the cases for decimals by disallowing values having scale (e.g. `1.1`).
require(bigDecimal.scale <= 0)
// `DecimalType` conversion can fail when
// 1. The precision is bigger than 38.
@@ -168,7 +168,7 @@ case class ReplaceTableAsSelectExec(
* A new table will be created using the schema of the query, and rows from the query are appended.
* If the table exists, its contents and schema should be replaced with the schema and the contents
* of the query. This implementation is atomic. The table replacement is staged, and the commit
- * operation at the end should perform tne replacement of the table's metadata and contents. If the
+ * operation at the end should perform the replacement of the table's metadata and contents. If the
* write fails, the table is instructed to roll back staged changes and any previously written table
* is left untouched.
*/
@@ -426,9 +426,9 @@ private[joins] class UnsafeHashedRelation(
readBuffer(valuesBuffer, 0, valuesSize)

val loc = binaryMap.lookup(keyBuffer, Platform.BYTE_ARRAY_OFFSET, keySize)
- val putSuceeded = loc.append(keyBuffer, Platform.BYTE_ARRAY_OFFSET, keySize,
+ val putSucceeded = loc.append(keyBuffer, Platform.BYTE_ARRAY_OFFSET, keySize,
valuesBuffer, Platform.BYTE_ARRAY_OFFSET, valuesSize)
- if (!putSuceeded) {
+ if (!putSucceeded) {
binaryMap.free()
throw new IOException("Could not allocate memory to grow BytesToBytesMap")
}
@@ -102,7 +102,7 @@ object ExtractGroupingPythonUDFFromAggregate extends Rule[LogicalPlan] {
case p: PythonUDF =>
// This is just a sanity check, the rule PullOutNondeterministic should
// already pull out those nondeterministic expressions.
- assert(p.udfDeterministic, "Non-determinstic PythonUDFs should not appear " +
+ assert(p.udfDeterministic, "Non-deterministic PythonUDFs should not appear " +
"in grouping expression")
val canonicalized = p.canonicalized.asInstanceOf[PythonUDF]
if (attributeMap.contains(canonicalized)) {
@@ -174,7 +174,7 @@ object ExtractPythonUDFs extends Rule[LogicalPlan] with PredicateHelper {
}

private def collectEvaluableUDFsFromExpressions(expressions: Seq[Expression]): Seq[PythonUDF] = {
- // If fisrt UDF is SQL_SCALAR_PANDAS_ITER_UDF, then only return this UDF,
+ // If first UDF is SQL_SCALAR_PANDAS_ITER_UDF, then only return this UDF,
// otherwise check if subsequent UDFs are of the same type as the first UDF. (since we can only
// extract UDFs of the same eval type)

@@ -268,7 +268,7 @@ object ExtractPythonUDFs extends Rule[LogicalPlan] with PredicateHelper {
case PythonEvalType.SQL_SCALAR_PANDAS_UDF | PythonEvalType.SQL_SCALAR_PANDAS_ITER_UDF =>
ArrowEvalPython(validUdfs, resultAttrs, child, evalType)
case _ =>
- throw new AnalysisException("Unexcepted UDF evalType")
+ throw new AnalysisException("Unexpected UDF evalType")
}

attributeMap ++= validUdfs.map(canonicalizeDeterministic).zip(resultAttrs)
@@ -288,7 +288,7 @@ abstract class CompactibleFileStreamLog[T <: AnyRef : ClassTag](

/**
* Delete expired log entries that proceed the currentBatchId and retain
- * sufficient minimum number of batches (given by minBatchsToRetain). This
+ * sufficient minimum number of batches (given by minBatchesToRetain). This
* equates to retaining the earliest compaction log that proceeds
* batch id position currentBatchId + 1 - minBatchesToRetain. All log entries
* prior to the earliest compaction log proceeding that position will be removed.
@@ -685,6 +685,6 @@ object StreamExecution {

/**
* A special thread to run the stream query. Some codes require to run in the QueryExecutionThread
- * and will use `classOf[QueryxecutionThread]` to check.
+ * and will use `classOf[QueryExecutionThread]` to check.
*/
abstract class QueryExecutionThread(name: String) extends UninterruptibleThread(name)
@@ -77,7 +77,7 @@ object FlatMapGroupsWithStateExecHelper {
// =========================== Private implementations of StateManager ===========================
// ===============================================================================================

- /** Commmon methods for StateManager implementations */
+ /** Common methods for StateManager implementations */
private abstract class StateManagerImplBase(shouldStoreTimestamp: Boolean)
extends StateManager {

@@ -65,7 +65,7 @@ object HiveSerDe {
outputFormat = Option("org.apache.hadoop.hive.ql.io.avro.AvroContainerOutputFormat"),
serde = Option("org.apache.hadoop.hive.serde2.avro.AvroSerDe")))

- // `HiveSerDe` in `serdeMap` should be dintinct.
+ // `HiveSerDe` in `serdeMap` should be distinct.
val serdeInverseMap: Map[HiveSerDe, String] = serdeMap.flatMap {
case ("sequencefile", _) => None
case ("rcfile", _) => None
@@ -387,8 +387,8 @@ final class DataStreamWriter[T] private[sql](ds: Dataset[T]) {
}
val sink = new MemorySink()
val resultDf = Dataset.ofRows(df.sparkSession, new MemoryPlan(sink, df.schema.toAttributes))
- val recoverFromChkpoint = outputMode == OutputMode.Complete()
- val query = startQuery(sink, extraOptions, recoverFromCheckpoint = recoverFromChkpoint)
+ val recoverFromCheckpoint = outputMode == OutputMode.Complete()
+ val query = startQuery(sink, extraOptions, recoverFromCheckpoint = recoverFromCheckpoint)
resultDf.createOrReplaceTempView(query.name)
query
} else if (source == SOURCE_NAME_FOREACH) {