Commit 3db7f28

Fix comments
ravipesala committed Sep 4, 2018
1 parent 2fce803 commit 3db7f28
Showing 2 changed files with 31 additions and 24 deletions.
@@ -16,6 +16,8 @@
  */
 package org.apache.spark.sql.carbondata.execution.datasources
 
+import scala.collection.mutable.ArrayBuffer
+
 import org.apache.hadoop.fs.Path
 import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
 import org.apache.spark.sql.catalyst.rules.Rule
@@ -68,9 +70,10 @@ class CarbonFileIndexReplaceRule extends Rule[LogicalPlan] {
       hadoopFsRelation: HadoopFsRelation): FileIndex = {
     if (fileIndex.isInstanceOf[InMemoryFileIndex] && fileIndex.rootPaths.length == 1) {
       val carbonFile = FileFactory.getCarbonFile(fileIndex.rootPaths.head.toUri.toString)
-      val carbonFiles = getDataFolders(carbonFile)
-      if (carbonFiles.nonEmpty && carbonFiles.length > 1) {
-        val paths = carbonFiles.map(p => new Path(p.getAbsolutePath))
+      val dataFolders = new ArrayBuffer[CarbonFile]()
+      getDataFolders(carbonFile, dataFolders)
+      if (dataFolders.nonEmpty && dataFolders.length > 1) {
+        val paths = dataFolders.map(p => new Path(p.getAbsolutePath))
         new InMemoryFileIndex(hadoopFsRelation.sparkSession,
           paths,
           hadoopFsRelation.options,
@@ -86,19 +89,19 @@ class CarbonFileIndexReplaceRule extends Rule[LogicalPlan] {
   /**
    * Get datafolders recursively
    */
-  private def getDataFolders(carbonFile: CarbonFile): Seq[CarbonFile] = {
-    val files = carbonFile.listFiles()
-    var folders: Seq[CarbonFile] = Seq()
+  private def getDataFolders(
+      tableFolder: CarbonFile,
+      dataFolders: ArrayBuffer[CarbonFile]): Unit = {
+    val files = tableFolder.listFiles()
     files.foreach { f =>
       if (f.isDirectory) {
         val files = f.listFiles()
         if (files.nonEmpty && !files(0).isDirectory) {
-          folders = Seq(f) ++ folders
+          dataFolders += f
         } else {
-          folders = getDataFolders(f) ++ folders
+          getDataFolders(f, dataFolders)
         }
       }
     }
-    folders
   }
 }
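
Aside: the refactor above swaps per-level Seq concatenation for a single mutable buffer threaded through the recursion, so the scan builds no intermediate collections. A minimal sketch of the same accumulator pattern, assuming plain java.io.File in place of CarbonFile (the names here are illustrative, not CarbonData API):

import java.io.File
import scala.collection.mutable.ArrayBuffer

object DataFolderScan {
  // Collect every directory whose first child is a plain file, threading a
  // shared buffer through the recursion instead of concatenating Seqs.
  def getDataFolders(tableFolder: File, dataFolders: ArrayBuffer[File]): Unit = {
    val files = tableFolder.listFiles()
    if (files != null) {
      files.foreach { f =>
        if (f.isDirectory) {
          val children = f.listFiles()
          if (children != null && children.nonEmpty && !children(0).isDirectory) {
            dataFolders += f // leaf folder holding data files
          } else {
            getDataFolders(f, dataFolders) // descend into nested folders
          }
        }
      }
    }
  }

  def main(args: Array[String]): Unit = {
    val folders = new ArrayBuffer[File]()
    getDataFolders(new File(args.headOption.getOrElse(".")), folders)
    folders.foreach(f => println(f.getAbsolutePath))
  }
}

A directory counts as a data folder when its first child is a regular file, mirroring the files(0).isDirectory check in the rule.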
@@ -591,23 +591,27 @@ class SparkCarbonDataSourceTest extends FunSuite with BeforeAndAfterAll {
   }
 
   test("test write using multi subfolder") {
-    FileFactory.deleteAllCarbonFilesOfDir(FileFactory.getCarbonFile(warehouse1 + "/test_folder"))
-    import spark.implicits._
-    val df = spark.sparkContext.parallelize(1 to 10)
-      .map(x => ("a" + x % 10, "b", x))
-      .toDF("c1", "c2", "number")
+    if (!spark.sparkContext.version.startsWith("2.1")) {
+      FileFactory.deleteAllCarbonFilesOfDir(FileFactory.getCarbonFile(warehouse1 + "/test_folder"))
+      import spark.implicits._
+      val df = spark.sparkContext.parallelize(1 to 10)
+        .map(x => ("a" + x % 10, "b", x))
+        .toDF("c1", "c2", "number")
 
-    // Saves dataframe to carbon file
-    df.write.format("carbon").save(warehouse1 + "/test_folder/1/"+System.nanoTime())
-    df.write.format("carbon").save(warehouse1 + "/test_folder/2/"+System.nanoTime())
-    df.write.format("carbon").save(warehouse1 + "/test_folder/3/"+System.nanoTime())
+      // Saves dataframe to carbon file
+      df.write.format("carbon").save(warehouse1 + "/test_folder/1/" + System.nanoTime())
+      df.write.format("carbon").save(warehouse1 + "/test_folder/2/" + System.nanoTime())
+      df.write.format("carbon").save(warehouse1 + "/test_folder/3/" + System.nanoTime())
 
-    val frame = spark.read.format("carbon").load(warehouse1 + "/test_folder")
-    assert(frame.count() == 30)
-    val mapSize = DataMapStoreManager.getInstance().getAllDataMaps.size()
-    DataMapStoreManager.getInstance().clearDataMaps(AbsoluteTableIdentifier.from(warehouse1+"/test_folder"))
-    assert(mapSize > DataMapStoreManager.getInstance().getAllDataMaps.size())
-    FileFactory.deleteAllCarbonFilesOfDir(FileFactory.getCarbonFile(warehouse1 + "/test_folder"))
+      val frame = spark.read.format("carbon").load(warehouse1 + "/test_folder")
+      assert(frame.count() == 30)
+      assert(frame.where("c1='a1'").count() == 3)
+      val mapSize = DataMapStoreManager.getInstance().getAllDataMaps.size()
+      DataMapStoreManager.getInstance()
+        .clearDataMaps(AbsoluteTableIdentifier.from(warehouse1 + "/test_folder"))
+      assert(mapSize > DataMapStoreManager.getInstance().getAllDataMaps.size())
+      FileFactory.deleteAllCarbonFilesOfDir(FileFactory.getCarbonFile(warehouse1 + "/test_folder"))
+    }
   }
   override protected def beforeAll(): Unit = {
     drop
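
The updated test pins down the behavior the rule enables: several carbon subfolders under one parent read back as a single dataset. A rough standalone sketch of the same flow, assuming a local SparkSession, the carbon datasource jar on the classpath, and a made-up /tmp path:

import org.apache.spark.sql.SparkSession

object MultiSubfolderExample {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .master("local[2]")
      .appName("carbon-multi-subfolder")
      .getOrCreate()
    import spark.implicits._

    val df = spark.sparkContext.parallelize(1 to 10)
      .map(x => ("a" + x % 10, "b", x))
      .toDF("c1", "c2", "number")

    val base = "/tmp/test_folder" // made-up location
    // Each save lands in its own subfolder, as in the test above.
    df.write.format("carbon").save(base + "/1/" + System.nanoTime())
    df.write.format("carbon").save(base + "/2/" + System.nanoTime())
    df.write.format("carbon").save(base + "/3/" + System.nanoTime())

    // Loading the parent resolves all three subfolders into one file
    // index, so the three writes read back as 30 rows.
    val frame = spark.read.format("carbon").load(base)
    assert(frame.count() == 30)
    spark.stop()
  }
}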
