
Commit

address comments
xuanyuanking committed Sep 24, 2020
1 parent fad1976 commit 97761d2
Showing 6 changed files with 24 additions and 18 deletions.
@@ -1046,6 +1046,9 @@ class Analyzer(
     lazy val loaded = CatalogV2Util.loadTable(catalog, ident).map {
       case v1Table: V1Table =>
         if (isStreaming) {
+          if (v1Table.v1Table.tableType == CatalogTableType.VIEW) {
+            throw new AnalysisException("Stream reading does not support views.")
+          }
           SubqueryAlias(
             catalog.name +: ident.asMultipartIdentifier,
             UnresolvedCatalogRelation(v1Table.v1Table, options, isStreaming = true))
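The user-visible effect of the new guard above: a streaming read that resolves to a view now fails at analysis time with the message in the hunk. A minimal sketch, assuming a hypothetical table events and view events_view:

  spark.sql("CREATE TABLE events (data INT) USING parquet")
  spark.sql("CREATE VIEW events_view AS SELECT * FROM events")

  // A streaming read of a plain table resolves as before.
  val ok = spark.readStream.table("events")

  // A streaming read of a view now throws:
  //   org.apache.spark.sql.AnalysisException: Stream reading does not support views.
  val failing = spark.readStream.table("events_view")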
@@ -732,11 +732,3 @@ case class HiveTableRelation(
     s"$nodeName $metadataStr"
   }
 }
-
-/**
- * A V2 table with V1 fallback support. This is used to fallback to V1 table when the V2 one
- * doesn't implement specific capabilities but V1 already has.
- */
-trait V2TableWithV1Fallback extends Table {
-  def v1Table: CatalogTable
-}
@@ -80,3 +80,11 @@ private[sql] case class V1Table(v1Table: CatalogTable) extends Table {
 
   override def toString: String = s"V1Table($name)"
 }
+
+/**
+ * A V2 table with V1 fallback support. This is used to fallback to V1 table when the V2 one
+ * doesn't implement specific capabilities but V1 already has.
+ */
+trait V2TableWithV1Fallback extends Table {
+  def v1Table: CatalogTable
+}
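Together with the previous hunk, this moves V2TableWithV1Fallback out of the internal catalyst catalog package into the public connector catalog API, next to V1Table. A minimal sketch of a connector table implementing the trait (the class name and its contents are illustrative, not part of this commit):

  import java.util
  import org.apache.spark.sql.catalyst.catalog.CatalogTable
  import org.apache.spark.sql.connector.catalog.{TableCapability, V2TableWithV1Fallback}
  import org.apache.spark.sql.types.StructType

  class ExampleFallbackTable(catalogTable: CatalogTable) extends V2TableWithV1Fallback {
    override def name(): String = catalogTable.identifier.unquotedString
    override def schema(): StructType = catalogTable.schema
    // No V2 read capabilities declared, so the planner falls back to the V1 table.
    override def capabilities(): util.Set[TableCapability] = util.Collections.emptySet()
    override def v1Table: CatalogTable = catalogTable
  }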
@@ -298,12 +298,13 @@ class FindDataSourceTable(sparkSession: SparkSession) extends Rule[LogicalPlan]
     case s @ StreamingRelationV2(
         _, _, table, extraOptions, _, _, _, Some(UnresolvedCatalogRelation(tableMeta, _, true))) =>
       import org.apache.spark.sql.execution.datasources.v2.DataSourceV2Implicits._
+      val v1Relation = getStreamingRelation(tableMeta, extraOptions)
       if (table.isInstanceOf[SupportsRead]
           && table.supportsAny(MICRO_BATCH_READ, CONTINUOUS_READ)) {
-        s.copy(v1Relation = None)
+        s.copy(v1Relation = Some(v1Relation))
       } else {
         // Fallback to V1 relation
-        getStreamingRelation(tableMeta, extraOptions)
+        v1Relation
       }
   }
 }
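The rule now builds the V1 streaming relation once, up front, and always carries it: when the table supports a V2 streaming scan, the relation is attached as the v1Relation of the StreamingRelationV2 so later phases can still fall back; otherwise it replaces the node outright. A sketch, under assumed names, of the capability declaration this rule keys on:

  import java.util
  import org.apache.spark.sql.connector.catalog.{SupportsRead, Table, TableCapability}
  import org.apache.spark.sql.connector.read.ScanBuilder
  import org.apache.spark.sql.types.StructType
  import org.apache.spark.sql.util.CaseInsensitiveStringMap

  class MicroBatchOnlyTable(tableSchema: StructType) extends Table with SupportsRead {
    override def name(): String = "micro_batch_only"
    override def schema(): StructType = tableSchema
    // Advertising MICRO_BATCH_READ (or CONTINUOUS_READ) keeps the V2 read path.
    override def capabilities(): util.Set[TableCapability] =
      util.EnumSet.of(TableCapability.MICRO_BATCH_READ)
    override def newScanBuilder(options: CaseInsensitiveStringMap): ScanBuilder =
      throw new UnsupportedOperationException("illustrative only")
  }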
@@ -43,8 +43,7 @@ object TableCapabilityCheck extends (LogicalPlan => Unit) {
     case r: DataSourceV2Relation if !r.table.supports(BATCH_READ) =>
       failAnalysis(s"Table ${r.table.name()} does not support batch scan.")
 
-    case r: StreamingRelationV2
-      if !r.table.supportsAny(MICRO_BATCH_READ, CONTINUOUS_READ) && r.v1Relation.isEmpty =>
+    case r: StreamingRelationV2 if !r.table.supportsAny(MICRO_BATCH_READ, CONTINUOUS_READ) =>
       throw new AnalysisException(s"Table ${r.table.name()} does not support either " +
         "micro-batch or continuous scan.")
@@ -26,12 +26,13 @@ import org.scalatest.BeforeAndAfter
 import org.apache.spark.sql.{AnalysisException, Row}
 import org.apache.spark.sql.catalyst.TableIdentifier
 import org.apache.spark.sql.catalyst.analysis.TableAlreadyExistsException
-import org.apache.spark.sql.catalyst.catalog.{CatalogStorageFormat, CatalogTable, CatalogTableType, V2TableWithV1Fallback}
+import org.apache.spark.sql.catalyst.catalog.{CatalogStorageFormat, CatalogTable, CatalogTableType}
+import org.apache.spark.sql.catalyst.streaming.StreamingRelationV2
 import org.apache.spark.sql.connector.{FakeV2Provider, InMemoryTableCatalog}
-import org.apache.spark.sql.connector.catalog.{Identifier, SupportsRead, Table, TableCapability}
+import org.apache.spark.sql.connector.catalog.{Identifier, SupportsRead, Table, TableCapability, V2TableWithV1Fallback}
 import org.apache.spark.sql.connector.expressions.Transform
 import org.apache.spark.sql.connector.read.ScanBuilder
-import org.apache.spark.sql.execution.streaming.{MemoryStream, MemoryStreamScanBuilder, StreamingRelation}
+import org.apache.spark.sql.execution.streaming.{MemoryStream, MemoryStreamScanBuilder}
 import org.apache.spark.sql.internal.SQLConf
 import org.apache.spark.sql.streaming.StreamTest
 import org.apache.spark.sql.streaming.sources.FakeScanBuilder
@@ -158,11 +159,13 @@ class DataStreamTableAPISuite extends StreamTest with BeforeAndAfter {
     withTempDir { tempDir =>
       withTable(tblName) {
         spark.sql(s"CREATE TABLE $tblName (data int) USING $v2Source")
+
+        // Check the StreamingRelationV2 has been replaced by StreamingRelation
         val plan = spark.readStream.option("path", tempDir.getCanonicalPath).table(tblName)
           .queryExecution.analyzed.collectFirst {
-            case d: StreamingRelation => d
-          }
-        assert(plan.nonEmpty)
+            case d: StreamingRelationV2 => d
+          }
+        assert(plan.isEmpty)
       }
     }
   }
