[SPARK-27521][SQL] Move data source v2 to catalyst module
## What changes were proposed in this pull request?

Currently we are in the odd state where some data source v2 interfaces (the catalog-related ones) live in sql/catalyst, while other data source v2 interfaces (Table, ScanBuilder, DataReader, etc.) live in sql/core.

I don't see a reason to keep the data source v2 API split across two modules. If we have to pick one, I think sql/catalyst is the one to go with.

The catalyst module already contains user-facing classes such as DataType and Row. Moreover, we have to update `Analyzer` and `SessionCatalog` to support the new catalog plugin, and since those classes live in catalyst, the catalog plugin API needs to be there as well.
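
As a quick illustration (not from this PR): both classes below are defined in sql/catalyst yet are everyday user-facing API.

import org.apache.spark.sql.Row
import org.apache.spark.sql.types.{IntegerType, StringType, StructField, StructType}

// Row and the DataType hierarchy already live in the catalyst module.
val schema = StructType(Seq(
  StructField("id", IntegerType),
  StructField("name", StringType)))
val row = Row(1, "a")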

This PR solves the problem we ran into in #24246.

## How was this patch tested?

Existing tests.

Closes #24416 from cloud-fan/move.

Authored-by: Wenchen Fan <wenchen@databricks.com>
Signed-off-by: gatorsmile <gatorsmile@gmail.com>
cloud-fan authored and gatorsmile committed Jun 5, 2019
1 parent 3f102a8 · commit 8b6232b
Showing 60 changed files with 65 additions and 28 deletions.
39 changes: 39 additions & 0 deletions project/MimaExcludes.scala
@@ -291,6 +291,45 @@ object MimaExcludes {
      case _ => true
    },

+    // [SPARK-27521][SQL] Move data source v2 to catalyst module
+    ProblemFilters.exclude[MissingClassProblem]("org.apache.spark.sql.vectorized.ColumnarBatch"),
+    ProblemFilters.exclude[MissingClassProblem]("org.apache.spark.sql.vectorized.ArrowColumnVector"),
+    ProblemFilters.exclude[MissingClassProblem]("org.apache.spark.sql.vectorized.ColumnarRow"),
+    ProblemFilters.exclude[MissingClassProblem]("org.apache.spark.sql.vectorized.ColumnarArray"),
+    ProblemFilters.exclude[MissingClassProblem]("org.apache.spark.sql.vectorized.ColumnarMap"),
+    ProblemFilters.exclude[MissingClassProblem]("org.apache.spark.sql.vectorized.ColumnVector"),
+    ProblemFilters.exclude[MissingClassProblem]("org.apache.spark.sql.sources.GreaterThanOrEqual"),
+    ProblemFilters.exclude[MissingClassProblem]("org.apache.spark.sql.sources.StringEndsWith"),
+    ProblemFilters.exclude[MissingClassProblem]("org.apache.spark.sql.sources.LessThanOrEqual$"),
+    ProblemFilters.exclude[MissingClassProblem]("org.apache.spark.sql.sources.In$"),
+    ProblemFilters.exclude[MissingClassProblem]("org.apache.spark.sql.sources.Not"),
+    ProblemFilters.exclude[MissingClassProblem]("org.apache.spark.sql.sources.IsNotNull"),
+    ProblemFilters.exclude[MissingClassProblem]("org.apache.spark.sql.sources.LessThan"),
+    ProblemFilters.exclude[MissingClassProblem]("org.apache.spark.sql.sources.LessThanOrEqual"),
+    ProblemFilters.exclude[MissingClassProblem]("org.apache.spark.sql.sources.EqualNullSafe$"),
+    ProblemFilters.exclude[MissingClassProblem]("org.apache.spark.sql.sources.GreaterThan$"),
+    ProblemFilters.exclude[MissingClassProblem]("org.apache.spark.sql.sources.In"),
+    ProblemFilters.exclude[MissingClassProblem]("org.apache.spark.sql.sources.And"),
+    ProblemFilters.exclude[MissingClassProblem]("org.apache.spark.sql.sources.StringStartsWith$"),
+    ProblemFilters.exclude[MissingClassProblem]("org.apache.spark.sql.sources.EqualNullSafe"),
+    ProblemFilters.exclude[MissingClassProblem]("org.apache.spark.sql.sources.StringEndsWith$"),
+    ProblemFilters.exclude[MissingClassProblem]("org.apache.spark.sql.sources.GreaterThanOrEqual$"),
+    ProblemFilters.exclude[MissingClassProblem]("org.apache.spark.sql.sources.Not$"),
+    ProblemFilters.exclude[MissingClassProblem]("org.apache.spark.sql.sources.IsNull$"),
+    ProblemFilters.exclude[MissingClassProblem]("org.apache.spark.sql.sources.LessThan$"),
+    ProblemFilters.exclude[MissingClassProblem]("org.apache.spark.sql.sources.IsNotNull$"),
+    ProblemFilters.exclude[MissingClassProblem]("org.apache.spark.sql.sources.Or"),
+    ProblemFilters.exclude[MissingClassProblem]("org.apache.spark.sql.sources.EqualTo$"),
+    ProblemFilters.exclude[MissingClassProblem]("org.apache.spark.sql.sources.GreaterThan"),
+    ProblemFilters.exclude[MissingClassProblem]("org.apache.spark.sql.sources.StringContains"),
+    ProblemFilters.exclude[MissingClassProblem]("org.apache.spark.sql.sources.Filter"),
+    ProblemFilters.exclude[MissingClassProblem]("org.apache.spark.sql.sources.IsNull"),
+    ProblemFilters.exclude[MissingClassProblem]("org.apache.spark.sql.sources.EqualTo"),
+    ProblemFilters.exclude[MissingClassProblem]("org.apache.spark.sql.sources.And$"),
+    ProblemFilters.exclude[MissingClassProblem]("org.apache.spark.sql.sources.Or$"),
+    ProblemFilters.exclude[MissingClassProblem]("org.apache.spark.sql.sources.StringStartsWith"),
+    ProblemFilters.exclude[MissingClassProblem]("org.apache.spark.sql.sources.StringContains$"),
+
// [SPARK-26216][SQL] Do not use case class as public API (UserDefinedFunction)
ProblemFilters.exclude[MissingClassProblem]("org.apache.spark.sql.expressions.UserDefinedFunction$"),
ProblemFilters.exclude[AbstractClassProblem]("org.apache.spark.sql.expressions.UserDefinedFunction"),
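
A note on the exclusions added above for SPARK-27521: MiMa compares each artifact against the previous release, so classes that merely moved from the sql/core jar to the sql/catalyst jar are reported as missing and must be suppressed; the entries ending in `$` cover the Scala companion objects. A minimal sketch of the mechanism, using MiMa's ProblemFilters API (the val name is illustrative):

import com.typesafe.tools.mima.core._

// Suppress the "missing class" report for one class that moved modules;
// the trailing-$ variant would match its companion object.
val exampleExcludes = Seq(
  ProblemFilters.exclude[MissingClassProblem]("org.apache.spark.sql.sources.Filter"))
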
4 changes: 4 additions & 0 deletions sql/catalyst/pom.xml
@@ -114,6 +114,10 @@
      <version>2.7.3</version>
      <type>jar</type>
    </dependency>
+    <dependency>
+      <groupId>org.apache.arrow</groupId>
+      <artifactId>arrow-vector</artifactId>
+    </dependency>
</dependencies>
<build>
<outputDirectory>target/scala-${scala.binary.version}/classes</outputDirectory>
@@ -18,7 +18,6 @@
package org.apache.spark.sql.sources.v2;

import org.apache.spark.annotation.Evolving;
-import org.apache.spark.sql.sources.DataSourceRegister;
import org.apache.spark.sql.types.StructType;
import org.apache.spark.sql.util.CaseInsensitiveStringMap;

@@ -56,13 +55,7 @@ public interface TableProvider {
   * @throws UnsupportedOperationException
   */
  default Table getTable(CaseInsensitiveStringMap options, StructType schema) {
-    String name;
-    if (this instanceof DataSourceRegister) {
-      name = ((DataSourceRegister) this).shortName();
-    } else {
-      name = this.getClass().getName();
-    }
    throw new UnsupportedOperationException(
-      name + " source does not support user-specified schema");
+      this.getClass().getSimpleName() + " source does not support user-specified schema");
  }
}
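
For illustration (not part of this commit): a source that does accept a user-specified schema overrides the default above. A minimal Scala sketch; the provider name is hypothetical and the Table methods are assumed from this era's API, so exact signatures may differ.

import java.util.Collections

import org.apache.spark.sql.sources.v2.{Table, TableCapability, TableProvider}
import org.apache.spark.sql.types.StructType
import org.apache.spark.sql.util.CaseInsensitiveStringMap

class ExampleSchemaProvider extends TableProvider {
  // Without a user-specified schema, fall back to a fixed default.
  override def getTable(options: CaseInsensitiveStringMap): Table =
    getTable(options, new StructType().add("value", "string"))

  // Overrides the default above, which throws UnsupportedOperationException.
  override def getTable(options: CaseInsensitiveStringMap, userSchema: StructType): Table =
    new Table {
      override def name(): String = "example"
      override def schema(): StructType = userSchema
      override def capabilities(): java.util.Set[TableCapability] =
        Collections.emptySet[TableCapability]()
    }
}
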
@@ -23,7 +23,7 @@
import org.apache.arrow.vector.holders.NullableVarCharHolder;

import org.apache.spark.annotation.Evolving;
-import org.apache.spark.sql.execution.arrow.ArrowUtils;
+import org.apache.spark.sql.util.ArrowUtils;
import org.apache.spark.sql.types.*;
import org.apache.spark.unsafe.types.UTF8String;

@@ -15,7 +15,7 @@
 * limitations under the License.
 */

-package org.apache.spark.sql.execution.arrow
+package org.apache.spark.sql.util

import scala.collection.JavaConverters._

@@ -15,9 +15,9 @@
 * limitations under the License.
 */

-package org.apache.spark.sql.execution.arrow
+package org.apache.spark.sql.util

-import org.apache.arrow.vector.types.pojo.{ArrowType, Field, FieldType, Schema}
+import org.apache.arrow.vector.types.pojo.ArrowType

import org.apache.spark.SparkFunSuite
import org.apache.spark.sql.catalyst.util.DateTimeUtils
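
The move keeps the ArrowUtils helpers intact; only the package changes from org.apache.spark.sql.execution.arrow to org.apache.spark.sql.util. A hedged usage sketch (toArrowSchema is assumed from this era's helper surface):

import org.apache.spark.sql.types.{LongType, StructField, StructType}
import org.apache.spark.sql.util.ArrowUtils

// Convert a Spark schema to an Arrow schema, as the Python/R runners do.
val sparkSchema = StructType(Seq(StructField("id", LongType)))
val arrowSchema = ArrowUtils.toArrowSchema(sparkSchema, "UTC")
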
4 changes: 0 additions & 4 deletions sql/core/pom.xml
@@ -112,10 +112,6 @@
      <groupId>com.fasterxml.jackson.core</groupId>
      <artifactId>jackson-databind</artifactId>
    </dependency>
-    <dependency>
-      <groupId>org.apache.arrow</groupId>
-      <artifactId>arrow-vector</artifactId>
-    </dependency>
<dependency>
<groupId>org.apache.xbean</groupId>
<artifactId>xbean-asm7-shaded</artifactId>
@@ -34,6 +34,7 @@ import org.apache.spark.network.util.JavaUtils
import org.apache.spark.sql.{DataFrame, SQLContext}
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.types._
+import org.apache.spark.sql.util.ArrowUtils
import org.apache.spark.sql.vectorized.{ArrowColumnVector, ColumnarBatch, ColumnVector}
import org.apache.spark.util.{ByteBufferOutputStream, Utils}

@@ -25,6 +25,7 @@ import org.apache.arrow.vector.complex._
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.catalyst.expressions.SpecializedGetters
import org.apache.spark.sql.types._
+import org.apache.spark.sql.util.ArrowUtils

object ArrowWriter {

@@ -28,8 +28,8 @@ import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.catalyst.expressions._
import org.apache.spark.sql.catalyst.plans.physical.{AllTuples, ClusteredDistribution, Distribution, Partitioning}
import org.apache.spark.sql.execution.{GroupedIterator, SparkPlan, UnaryExecNode}
-import org.apache.spark.sql.execution.arrow.ArrowUtils
import org.apache.spark.sql.types.{DataType, StructField, StructType}
+import org.apache.spark.sql.util.ArrowUtils
import org.apache.spark.util.Utils

/**
@@ -24,8 +24,8 @@ import org.apache.spark.api.python.{ChainedPythonFunctions, PythonEvalType}
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.catalyst.expressions._
import org.apache.spark.sql.execution.SparkPlan
-import org.apache.spark.sql.execution.arrow.ArrowUtils
import org.apache.spark.sql.types.StructType
+import org.apache.spark.sql.util.ArrowUtils

/**
* Grouped a iterator into batches.
@@ -29,8 +29,9 @@ import org.apache.arrow.vector.ipc.{ArrowStreamReader, ArrowStreamWriter}
import org.apache.spark._
import org.apache.spark.api.python._
import org.apache.spark.sql.catalyst.InternalRow
-import org.apache.spark.sql.execution.arrow.{ArrowUtils, ArrowWriter}
+import org.apache.spark.sql.execution.arrow.ArrowWriter
import org.apache.spark.sql.types._
+import org.apache.spark.sql.util.ArrowUtils
import org.apache.spark.sql.vectorized.{ArrowColumnVector, ColumnarBatch, ColumnVector}
import org.apache.spark.util.Utils

@@ -27,9 +27,9 @@ import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.catalyst.expressions._
import org.apache.spark.sql.catalyst.plans.physical.{AllTuples, ClusteredDistribution, Distribution, Partitioning}
import org.apache.spark.sql.execution.{GroupedIterator, SparkPlan, UnaryExecNode}
-import org.apache.spark.sql.execution.arrow.ArrowUtils
import org.apache.spark.sql.types.StructType
-import org.apache.spark.sql.vectorized.{ArrowColumnVector, ColumnarBatch, ColumnVector}
+import org.apache.spark.sql.util.ArrowUtils
+import org.apache.spark.sql.vectorized.{ArrowColumnVector, ColumnarBatch}

/**
* Physical node for [[org.apache.spark.sql.catalyst.plans.logical.FlatMapGroupsInPandas]]
@@ -29,9 +29,9 @@ import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.catalyst.expressions._
import org.apache.spark.sql.catalyst.plans.physical.{AllTuples, ClusteredDistribution, Distribution, Partitioning}
import org.apache.spark.sql.execution.{ExternalAppendOnlyUnsafeRowArray, SparkPlan}
-import org.apache.spark.sql.execution.arrow.ArrowUtils
import org.apache.spark.sql.execution.window._
import org.apache.spark.sql.types._
+import org.apache.spark.sql.util.ArrowUtils
import org.apache.spark.util.Utils

/**
@@ -31,8 +31,9 @@ import org.apache.spark.api.r._
import org.apache.spark.api.r.SpecialLengths
import org.apache.spark.broadcast.Broadcast
import org.apache.spark.sql.catalyst.InternalRow
-import org.apache.spark.sql.execution.arrow.{ArrowUtils, ArrowWriter}
+import org.apache.spark.sql.execution.arrow.ArrowWriter
import org.apache.spark.sql.types.StructType
+import org.apache.spark.sql.util.ArrowUtils
import org.apache.spark.sql.vectorized.{ArrowColumnVector, ColumnarBatch, ColumnVector}
import org.apache.spark.util.Utils

@@ -36,6 +36,7 @@ import org.apache.spark.sql.catalyst.util.DateTimeUtils
import org.apache.spark.sql.internal.SQLConf
import org.apache.spark.sql.test.SharedSQLContext
import org.apache.spark.sql.types.{BinaryType, Decimal, IntegerType, StructField, StructType}
+import org.apache.spark.sql.util.ArrowUtils
import org.apache.spark.util.Utils


@@ -305,7 +305,7 @@ class RateStreamProviderSuite extends StreamTest {
    .load()
}
assert(exception.getMessage.contains(
-  "rate source does not support user-specified schema"))
+  "RateStreamProvider source does not support user-specified schema"))
}

test("continuous data") {
@@ -204,7 +204,7 @@ class TextSocketStreamSuite extends StreamTest with SharedSQLContext with Before
provider.getTable(new CaseInsensitiveStringMap(params.asJava), userSpecifiedSchema)
}
assert(exception.getMessage.contains(
-  "socket source does not support user-specified schema"))
+  "TextSocketSourceProvider source does not support user-specified schema"))
}

test("input row metrics") {
@@ -21,8 +21,8 @@ import org.apache.arrow.vector._
import org.apache.arrow.vector.complex._

import org.apache.spark.SparkFunSuite
-import org.apache.spark.sql.execution.arrow.ArrowUtils
import org.apache.spark.sql.types._
+import org.apache.spark.sql.util.ArrowUtils
import org.apache.spark.sql.vectorized.ArrowColumnVector
import org.apache.spark.unsafe.types.UTF8String

@@ -31,9 +31,9 @@ import org.apache.spark.SparkFunSuite
import org.apache.spark.memory.MemoryMode
import org.apache.spark.sql.{RandomDataGenerator, Row}
import org.apache.spark.sql.catalyst.InternalRow
-import org.apache.spark.sql.execution.arrow.ArrowUtils
import org.apache.spark.sql.types._
-import org.apache.spark.sql.vectorized.{ArrowColumnVector, ColumnarBatch, ColumnVector}
+import org.apache.spark.sql.util.ArrowUtils
+import org.apache.spark.sql.vectorized.{ArrowColumnVector, ColumnarBatch}
import org.apache.spark.unsafe.Platform
import org.apache.spark.unsafe.types.CalendarInterval
