Skip to content

Commit

Permalink
Handled multiple partition columns for partition cache
Browse files Browse the repository at this point in the history
  • Loading branch information
nihal0107 committed Nov 3, 2020
1 parent 4a729e1 commit 9a294ce
Show file tree
Hide file tree
Showing 2 changed files with 25 additions and 6 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -112,14 +112,18 @@ object PartitionCacheManager extends Cache[PartitionCacheKey,

/**
 * Reads the segment file at `segmentFilePath` and converts its partition
 * information into catalog partitions.
 *
 * All unique partition directory names found in the segment's location map
 * (e.g. "b=1", "c=nihal") are combined into a single partition spec and a
 * single [[CatalogTablePartition]], so tables partitioned on multiple
 * columns yield one entry whose spec holds every column.
 *
 * @param identifier       cache key carrying the table path used to build the
 *                         partition location URI
 * @param segmentFilePath  absolute path of the segment file to read
 * @return a single-element sequence with the combined partition
 */
private def readPartition(identifier: PartitionCacheKey, segmentFilePath: String) = {
  val segmentFile = SegmentFileStore.readSegmentFile(segmentFilePath)
  // Unique partition directory names, e.g. Set("b=1", "c=nihal").
  // NOTE(review): Set iteration order is unspecified, so the order of path
  // segments in the URI below is not guaranteed — confirm callers only rely
  // on the spec map, not on the exact path segment order.
  val uniquePartitions = segmentFile.getLocationMap.values().asScala
    .flatMap(_.getPartitions.asScala).toSet
  // Path suffix like "/b=1/c=nihal" appended to the table path.
  val partitionPath = uniquePartitions
    .map(partition => CarbonCommonConstants.FILE_SEPARATOR + partition)
    .mkString
  // Split on the first '=' only, so partition values containing '=' keep
  // their full value instead of being truncated.
  val partitionSpec: Map[String, String] = uniquePartitions.map { partition =>
    val keyValue = partition.split("=", 2)
    keyValue(0) -> keyValue(1)
  }.toMap
  Seq(CatalogTablePartition(partitionSpec,
    CatalogStorageFormat(
      Some(new URI(identifier.tablePath + partitionPath)),
      None, None, None, compressed = false, Map())))
}

override def put(key: PartitionCacheKey,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -618,6 +618,21 @@ class StandardPartitionTableLoadingTestCase extends QueryTest with BeforeAndAfte
CarbonProperties.getInstance().addProperty("carbon.read.partition.hive.direct", "true")
}

// Verifies that the partition cache stores ONE combined CatalogTablePartition
// whose spec contains every partition column, for a table partitioned on
// multiple columns (b int, c String).
test("test partition cache on multiple columns") {
  // Disable direct hive partition reads so the PartitionCacheManager path is used.
  CarbonProperties.getInstance().addProperty("carbon.read.partition.hive.direct", "false")
  sql("drop table if exists partition_cache")
  sql("create table partition_cache(a string) partitioned by(b int, c String) stored as carbondata")
  sql("insert into partition_cache select 'k',1,'nihal'")
  checkAnswer(sql("select count(*) from partition_cache where b = 1"), Seq(Row(1)))
  // A partition-filtered query populates the partition cache as a side effect.
  sql("select * from partition_cache where b = 1").collect()
  val carbonTable = CarbonMetadata.getInstance().getCarbonTable("default", "partition_cache")
  val partitionSpecs: util.List[CatalogTablePartition] = PartitionCacheManager.getIfPresent(
    PartitionCacheKey(carbonTable.getTableId, carbonTable.getTablePath, 1L))
  // One combined partition entry is expected...
  assert(partitionSpecs.size == 1)
  // ...whose spec carries both partition columns (b and c).
  assert(partitionSpecs.get(0).spec.size == 2)
  // Restore the property so later tests see the default behavior.
  CarbonProperties.getInstance().addProperty("carbon.read.partition.hive.direct", "true")
}

test("test partition caching after load") {
CarbonProperties.getInstance().addProperty("carbon.read.partition.hive.direct", "false")
sql("drop table if exists partition_cache")
Expand Down

0 comments on commit 9a294ce

Please sign in to comment.