[CARBONDATA-2355] Support run SQL on carbondata files directly
Support run SQL on carbondata files directly

This closes #2181
xubo245 authored and chenliang613 committed Jun 1, 2018
1 parent 4d22ddc commit 9469e6b
Showing 9 changed files with 164 additions and 9 deletions.
7 changes: 7 additions & 0 deletions docs/sdk-guide.md
@@ -128,7 +128,14 @@ Each of SQL data types are mapped into data types of SDK. Following are the mapping:
| STRING | DataTypes.STRING |
| DECIMAL | DataTypes.createDecimalType(precision, scale) |
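
For instance, these mappings surface in the SDK schema API roughly as follows (a sketch: the column names and the precision/scale values are illustrative):

```
import org.apache.carbondata.core.metadata.datatype.DataTypes
import org.apache.carbondata.sdk.file.{Field, Schema}

// a STRING column and a DECIMAL column, per the mapping table above
val fields = Array(
  new Field("name", DataTypes.STRING),
  new Field("salary", DataTypes.createDecimalType(10, 2)) // precision 10, scale 2
)
val schema = new Schema(fields)
```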

## Run SQL on files directly
Instead of creating a table and querying it, you can also query carbondata files directly with SQL.

### Example
```
SELECT * FROM carbonfile.`$Path`
```
Find example code at [DirectSQLExample](https://github.com/apache/carbondata/blob/master/examples/spark2/src/main/scala/org/apache/carbondata/examples/DirectSQLExample.scala) in the CarbonData repo.
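
As an end-to-end sketch (not from the repository: the object name and output path below are illustrative, and while this shows a plain `SparkSession`, the bundled example obtains its session through `ExampleUtils.createCarbonSession`):

```
import org.apache.spark.sql.SparkSession

object DirectQuerySketch {
  def main(args: Array[String]): Unit = {
    // any Spark session with the CarbonData jars on the classpath;
    // the bundled example uses ExampleUtils.createCarbonSession instead
    val spark = SparkSession.builder()
      .appName("DirectQuerySketch")
      .master("local[*]")
      .getOrCreate()

    // illustrative SDK writer output: the segment directory holding .carbondata files
    val path = "/tmp/carbonFile/Fact/Part0/Segment_null"

    // no CREATE TABLE needed: query the files in place
    spark.sql(s"SELECT * FROM carbonfile.`$path`").show()

    spark.stop()
  }
}
```
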
## API List

### Class org.apache.carbondata.sdk.file.CarbonWriterBuilder
100 changes: 100 additions & 0 deletions examples/spark2/src/main/scala/org/apache/carbondata/examples/DirectSQLExample.scala
@@ -0,0 +1,100 @@
/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements. See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License. You may obtain a copy of the License at
 *
 *    http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.carbondata.examples

import java.io.File

import org.apache.commons.io.FileUtils

import org.apache.carbondata.core.metadata.datatype.DataTypes
import org.apache.carbondata.examples.util.ExampleUtils
import org.apache.carbondata.sdk.file.{CarbonWriter, Field, Schema}

/**
 * Running SQL on carbon files directly
 * No need to create table first
 * TODO: support more than one carbon file
 */
object DirectSQLExample {

  // prepare SDK writer output
  def buildTestData(
      path: String,
      num: Int = 3,
      persistSchema: Boolean = false): Any = {

    // getCanonicalPath gives path with \, but the code expects /.
    val writerPath = path.replace("\\", "/");

    val fields: Array[Field] = new Array[Field](3)
    fields(0) = new Field("name", DataTypes.STRING)
    fields(1) = new Field("age", DataTypes.INT)
    fields(2) = new Field("height", DataTypes.DOUBLE)

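    // write `num` rows through the SDK CarbonWriter; buildWriterForCSVInput yields a writer that takes one Array[String] per row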
    try {
      val builder = CarbonWriter
        .builder()
        .outputPath(writerPath)
        .isTransactionalTable(true)
        .uniqueIdentifier(System.currentTimeMillis)
        .withBlockSize(2)
      if (persistSchema) {
        builder.persistSchemaFile(true)
      }
      val writer = builder.buildWriterForCSVInput(new Schema(fields))
      var i = 0
      while (i < num) {
        writer.write(Array[String]("robot" + i, String.valueOf(i), String.valueOf(i.toDouble / 2)))
        i += 1
      }
      writer.close()
    } catch {
      case e: Exception => throw e
    }
  }

  def cleanTestData(path: String): Unit = {
    FileUtils.deleteDirectory(new File(path))
  }

  // scalastyle:off
  def main(args: Array[String]) {
    val carbonSession = ExampleUtils.createCarbonSession("DirectSQLExample")
    val rootPath = new File(this.getClass.getResource("/").getPath
      + "../../../..").getCanonicalPath
    val path = s"$rootPath/examples/spark2/target/carbonFile/"

    import carbonSession._
    // 1. generate data file
    cleanTestData(path)
    buildTestData(path, 20)
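    // the SDK writer above (isTransactionalTable(true)) places its files under Fact/Part0/Segment_null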
    val readPath = path + "Fact/Part0/Segment_null"

    println("Running SQL on carbon files directly")
    try {
      // 2. run queries directly, no need to create table first
      sql(s"""select * FROM carbonfile.`$readPath` limit 10""".stripMargin).show()
    } catch {
      case e: Exception => throw e
    } finally {
      // 3. delete data files
      cleanTestData(path)
    }
  }
  // scalastyle:on
}
2 changes: 1 addition & 1 deletion S3UsingSDKExample.scala
@@ -36,7 +36,7 @@ object S3UsingSDKExample {
      num: Int = 3,
      persistSchema: Boolean = false): Any = {

-    // getCanonicalPath gives path with \, so code expects /. Need to handle in code ?
+    // getCanonicalPath gives path with \, but the code expects /.
    val writerPath = path.replace("\\", "/");

    val fields: Array[Field] = new Array[Field](3)
2 changes: 1 addition & 1 deletion TestCarbonFileInputFormatWithExternalCarbonTable.scala
@@ -38,7 +38,7 @@ class TestCarbonFileInputFormatWithExternalCarbonTable extends QueryTest with BeforeAndAfterAll {
"../." +
"./src/test/resources/SparkCarbonFileFormat/WriterOutput/")
.getCanonicalPath
//getCanonicalPath gives path with \, so code expects /. Need to handle in code ?
//getCanonicalPath gives path with \, but the code expects /.
writerPath = writerPath.replace("\\", "/");


30 changes: 29 additions & 1 deletion TestCreateTableUsingSparkCarbonFileFormat.scala
@@ -46,7 +46,7 @@ class TestCreateTableUsingSparkCarbonFileFormat extends QueryTest with BeforeAndAfterAll {
"../." +
"./src/test/resources/SparkCarbonFileFormat/WriterOutput/")
.getCanonicalPath
//getCanonicalPath gives path with \, so code expects /. Need to handle in code ?
//getCanonicalPath gives path with \, but the code expects /.
writerPath = writerPath.replace("\\", "/");

val filePath = writerPath + "/Fact/Part0/Segment_null/"
@@ -153,6 +153,34 @@ class TestCreateTableUsingSparkCarbonFileFormat extends QueryTest with BeforeAndAfterAll {
    cleanTestData()
  }

+  test("Running SQL directly and read carbondata files (sdk Writer Output) using the SparkCarbonFileFormat ") {
+    buildTestData(false)
+    assert(new File(filePath).exists())
+    sql("DROP TABLE IF EXISTS sdkOutputTable")
+
+    //data source file format
+    if (sqlContext.sparkContext.version.startsWith("2.1")) {
+      //data source file format
+      sql(s"""CREATE TABLE sdkOutputTable USING carbonfile OPTIONS (PATH '$filePath') """)
+    } else if (sqlContext.sparkContext.version.startsWith("2.2")) {
+      //data source file format
+      sql(
+        s"""CREATE TABLE sdkOutputTable USING carbonfile LOCATION
+           |'$filePath' """.stripMargin)
+    } else {
+      // TO DO
+    }
+
+    val directSQL = sql(s"""select * FROM carbonfile.`$filePath`""".stripMargin)
+    directSQL.show(false)
+    checkAnswer(sql("select * from sdkOutputTable"), directSQL)
+
+    sql("DROP TABLE sdkOutputTable")
+    // drop table should not delete the files
+    assert(new File(filePath).exists())
+    cleanTestData()
+  }


  test("should not allow to alter datasource carbontable ") {
    buildTestData(false)
2 changes: 1 addition & 1 deletion TestNonTransactionalCarbonTable.scala
@@ -55,7 +55,7 @@ class TestNonTransactionalCarbonTable extends QueryTest with BeforeAndAfterAll {
"../." +
"./target/SparkCarbonFileFormat/WriterOutput/")
.getCanonicalPath
//getCanonicalPath gives path with \, so code expects /. Need to handle in code ?
//getCanonicalPath gives path with \, but the code expects /.
writerPath = writerPath.replace("\\", "/")

def buildTestDataSingleFile(): Any = {
2 changes: 1 addition & 1 deletion TestNonTransactionalCarbonTableWithComplexType.scala
@@ -39,7 +39,7 @@ class TestNonTransactionalCarbonTableWithComplexType extends QueryTest with BeforeAndAfterAll {
"../." +
"./src/test/resources/SparkCarbonFileFormat/WriterOutput/")
.getCanonicalPath
//getCanonicalPath gives path with \, so code expects /. Need to handle in code ?
//getCanonicalPath gives path with \, but the code expects /.
writerPath = writerPath.replace("\\", "/")


2 changes: 1 addition & 1 deletion TestSparkCarbonFileFormatWithSparkSession.scala
@@ -36,7 +36,7 @@ object TestSparkCarbonFileFormatWithSparkSession {
"../." +
"./src/test/resources/SparkCarbonFileFormat/WriterOutput/")
.getCanonicalPath
//getCanonicalPath gives path with \, so code expects /. Need to handle in code ?
//getCanonicalPath gives path with \, but the code expects /.
writerPath = writerPath.replace("\\", "/");

val filePath = writerPath + "/Fact/Part0/Segment_null/"
26 changes: 23 additions & 3 deletions SparkCarbonFileFormat.scala
@@ -19,6 +19,7 @@ package org.apache.spark.sql.execution.datasources

import java.net.URI

+import scala.collection.JavaConverters._
import scala.collection.mutable.ArrayBuffer

import org.apache.hadoop.conf.Configuration
@@ -68,8 +69,23 @@ class SparkCarbonFileFormat extends FileFormat
  override def inferSchema(sparkSession: SparkSession,
      options: Map[String, String],
      files: Seq[FileStatus]): Option[StructType] = {
-    val filePaths = CarbonUtil.getFilePathExternalFilePath(
-      options.get("path").get)
+    val filePaths = if (options.isEmpty) {
+      val carbondataFiles = files.seq.filter { each =>
+        if (each.isFile) {
+          each.getPath.getName.contains(".carbondata")
+        } else {
+          false
+        }
+      }
+
+      carbondataFiles.map { each =>
+        each.getPath.toString
+      }.toList.asJava
+    } else {
+      CarbonUtil.getFilePathExternalFilePath(
+        options.get("path").get)
+    }
+
    if (filePaths.size() == 0) {
      throw new SparkException("CarbonData file is not present in the location mentioned in DDL")
    }
@@ -193,7 +209,11 @@ class SparkCarbonFileFormat extends FileFormat
    val fileSplit =
      new FileSplit(new Path(new URI(file.filePath)), file.start, file.length, Array.empty)

-    val path: String = options.get("path").get
+    val path: String = if (options.isEmpty) {
+      file.filePath
+    } else {
+      options.get("path").get
+    }
    val endindex: Int = path.indexOf("Fact") - 1
    val tablePath = path.substring(0, endindex)
    lazy val identifier: AbsoluteTableIdentifier = AbsoluteTableIdentifier.from(
