Skip to content

Commit

Permalink
[WIP] Upgrade spark integration version to 2.3.2
Browse files Browse the repository at this point in the history
Upgrade spark integration version to 2.3.2
  • Loading branch information
zzcclp committed Sep 27, 2018
1 parent 54bcf49 commit 586cf7b
Show file tree
Hide file tree
Showing 6 changed files with 64 additions and 4 deletions.
Expand Up @@ -296,7 +296,7 @@ object CarbonReflectionUtils {
classOf[LogicalPlan],
classOf[Seq[Attribute]],
classOf[SparkPlan])
method.invoke(dataSourceObj, mode, query, query.output, physicalPlan)
method.invoke(dataSourceObj, mode, query, query.output.map(_.name), physicalPlan)
.asInstanceOf[BaseRelation]
} else {
throw new UnsupportedOperationException("Spark version not supported")
Expand Down
2 changes: 1 addition & 1 deletion integration/spark-datasource/pom.xml
Expand Up @@ -278,7 +278,7 @@
<profile>
<id>spark-2.3</id>
<properties>
<spark.version>2.3.1</spark.version>
<spark.version>2.3.2</spark.version>
<scala.binary.version>2.11</scala.binary.version>
<scala.version>2.11.8</scala.version>
</properties>
Expand Down
5 changes: 4 additions & 1 deletion integration/spark2/pom.xml
Expand Up @@ -283,6 +283,7 @@
<configuration>
<sources>
<source>src/main/spark2.1</source>
<source>src/main/commonTo2.1And2.2</source>
</sources>
</configuration>
</execution>
Expand Down Expand Up @@ -328,6 +329,7 @@
<sources>
<source>src/main/spark2.2</source>
<source>src/main/commonTo2.2And2.3</source>
<source>src/main/commonTo2.1And2.2</source>
</sources>
</configuration>
</execution>
Expand All @@ -339,7 +341,7 @@
<profile>
<id>spark-2.3</id>
<properties>
<spark.version>2.3.1</spark.version>
<spark.version>2.3.2</spark.version>
<scala.binary.version>2.11</scala.binary.version>
<scala.version>2.11.8</scala.version>
</properties>
Expand All @@ -352,6 +354,7 @@
<excludes>
<exclude>src/main/spark2.1</exclude>
<exclude>src/main/spark2.2</exclude>
<exclude>src/main/commonTo2.1And2.2</exclude>
</excludes>
</configuration>
</plugin>
Expand Down
@@ -0,0 +1,55 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.spark.sql.execution.strategy

import org.apache.spark.rdd.RDD
import org.apache.spark.sql.catalyst.{InternalRow, TableIdentifier}
import org.apache.spark.sql.catalyst.expressions.{Attribute, SortOrder}
import org.apache.spark.sql.catalyst.plans.physical.Partitioning
import org.apache.spark.sql.execution.FileSourceScanExec
import org.apache.spark.sql.execution.datasources.{HadoopFsRelation, LogicalRelation}

/**
* Physical plan node for scanning data. It is applied for both tables
* USING carbondata and STORED AS CARBONDATA.
*/
class CarbonDataSourceScan(
// Output attributes of this scan, forwarded verbatim to FileSourceScanExec.
override val output: Seq[Attribute],
// Pre-built RDD of rows to scan; this is the only input RDD exposed (see inputRDDs).
val rdd: RDD[InternalRow],
// Underlying file-source relation; @transient so it is not captured on task serialization.
@transient override val relation: HadoopFsRelation,
// Caller-supplied output partitioning, reported as-is via outputPartitioning.
val partitioning: Partitioning,
// Caller-supplied metadata map, reported as-is via metadata.
val md: Map[String, String],
// Optional table identifier passed through to FileSourceScanExec for display/metrics.
identifier: Option[TableIdentifier],
// NOTE(review): not referenced anywhere in this class body — presumably retained so
// callers can carry the logical plan alongside the physical node; confirm it is needed.
@transient private val logicalRelation: LogicalRelation)
extends FileSourceScanExec(
relation,
output,
relation.dataSchema,
// NOTE(review): the two empty Seqs presumably map to partitionFilters and dataFilters
// in Spark 2.3's FileSourceScanExec constructor — confirm against that signature.
Seq.empty,
Seq.empty,
identifier) {

// Unconditionally reports columnar-batch support.
// NOTE(review): assumes the supplied RDD always yields ColumnarBatch-compatible data;
// if it can produce plain InternalRows, this override would be incorrect — confirm.
override lazy val supportsBatch: Boolean = true

// Report the caller-supplied partitioning and no guaranteed output ordering.
override lazy val (outputPartitioning, outputOrdering): (Partitioning, Seq[SortOrder]) =
(partitioning, Nil)

// Replace FileSourceScanExec's computed metadata with the caller-supplied map.
override lazy val metadata: Map[String, String] = md

// The pre-built RDD is the sole input; bypasses FileSourceScanExec's own file-reading RDD.
override def inputRDDs(): Seq[RDD[InternalRow]] = rdd :: Nil

}
4 changes: 3 additions & 1 deletion pom.xml
Expand Up @@ -522,6 +522,7 @@
<sourceDirectory>${basedir}/integration/spark2/src/main/scala</sourceDirectory>
<sourceDirectory>${basedir}/integration/spark2/src/main/java</sourceDirectory>
<sourceDirectory>${basedir}/integration/spark2/src/main/spark2.1</sourceDirectory>
<sourceDirectory>${basedir}/integration/spark2/src/main/commonTo2.1And2.2</sourceDirectory>
<sourceDirectory>${basedir}/integration/spark-common/src/main/scala</sourceDirectory>
<sourceDirectory>${basedir}/integration/spark-common/src/main/java</sourceDirectory>
<sourceDirectory>${basedir}/integration/spark-common-test/src/main/scala</sourceDirectory>
Expand Down Expand Up @@ -582,6 +583,7 @@
<sourceDirectory>${basedir}/hadoop/src/main/java</sourceDirectory>
<sourceDirectory>${basedir}/integration/spark2/src/main/scala</sourceDirectory>
<sourceDirectory>${basedir}/integration/spark2/src/main/spark2.2</sourceDirectory>
<sourceDirectory>${basedir}/integration/spark2/src/main/commonTo2.1And2.2</sourceDirectory>
<sourceDirectory>${basedir}/integration/spark2/src/main/commonTo2.2And2.3</sourceDirectory>
<sourceDirectory>${basedir}/integration/spark2/src/main/java</sourceDirectory>
<sourceDirectory>${basedir}/integration/spark-common/src/main/scala</sourceDirectory>
Expand All @@ -608,7 +610,7 @@
<profile>
<id>spark-2.3</id>
<properties>
<spark.version>2.3.1</spark.version>
<spark.version>2.3.2</spark.version>
<scala.binary.version>2.11</scala.binary.version>
<scala.version>2.11.8</scala.version>
</properties>
Expand Down

0 comments on commit 586cf7b

Please sign in to comment.