[CARBONDATA-3070] Fix partition load issue when custom location is added

Problem:
Loading data fails for the carbon file format when a custom partition location is added.

Reason:
Carbon generates its own file name for each carbondata file rather than using the file name proposed by Spark,
and it also needs to create an extra index file. For a custom partition location, Spark tracks the file names it
proposes and moves those files during commit. Since Carbon creates and maintains different files, a
FileNotFoundException is thrown.

Solution:
Use a custom commit protocol to manage the commit and the folder location when a custom partition location is used.
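
As a rough illustration (not part of this commit's diff), the hook Spark exposes for this is the
"spark.sql.sources.commitProtocolClass" setting: pointing it at a FileCommitProtocol subclass lets Carbon
control per-task temp file naming and the final job commit. A minimal sketch, assuming a Spark 2.x session
and the class name registered by this commit; the master/app name are placeholder values:

// Sketch only: swap in the custom commit protocol introduced by this commit.
import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder().master("local[2]").appName("carbon-commit-demo").getOrCreate()
spark.conf.set(
  "spark.sql.sources.commitProtocolClass",
  "org.apache.spark.sql.carbondata.execution.datasources.CarbonSQLHadoopMapReduceCommitProtocol")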

This closes #2873
ravipesala committed Nov 21, 2018
1 parent b9720d3 commit f947efe
Showing 3 changed files with 120 additions and 4 deletions.
@@ -17,6 +17,8 @@

package org.apache.spark.sql.carbondata.execution.datasources

import java.net.URI

import scala.collection.JavaConverters._
import scala.collection.mutable.ArrayBuffer

@@ -27,6 +29,7 @@ import org.apache.hadoop.mapreduce._
import org.apache.hadoop.mapreduce.task.TaskAttemptContextImpl
import org.apache.spark.TaskContext
import org.apache.spark.internal.Logging
import org.apache.spark.internal.io.FileCommitProtocol
import org.apache.spark.memory.MemoryMode
import org.apache.spark.sql._
import org.apache.spark.sql.carbondata.execution.datasources.readsupport.SparkUnsafeRowReadSuport
@@ -111,6 +114,13 @@ class SparkCarbonFileFormat extends FileFormat
Some(schema)
}

/**
* Add our own protocol to control the commit.
*/
SparkSession.getActiveSession.get.sessionState.conf.setConfString(
"spark.sql.sources.commitProtocolClass",
"org.apache.spark.sql.carbondata.execution.datasources.CarbonSQLHadoopMapReduceCommitProtocol")

/**
* Prepares a write job and returns an [[OutputWriterFactory]]. Client side job preparation is
* done here.
@@ -125,6 +135,7 @@ class SparkCarbonFileFormat extends FileFormat
val model = CarbonSparkDataSourceUtil.prepareLoadModel(options, dataSchema)
model.setLoadWithoutConverterStep(true)
CarbonTableOutputFormat.setLoadModel(conf, model)
conf.set(CarbonSQLHadoopMapReduceCommitProtocol.COMMIT_PROTOCOL, "true")

new OutputWriterFactory {
override def newInstance(
@@ -310,7 +321,6 @@ class SparkCarbonFileFormat extends FileFormat
vectorizedReader.toBoolean && schema.forall(_.dataType.isInstanceOf[AtomicType])
}


/**
* Returns whether this format support returning columnar batch or not.
*/
@@ -369,7 +379,7 @@ class SparkCarbonFileFormat extends FileFormat

if (file.filePath.endsWith(CarbonTablePath.CARBON_DATA_EXT)) {
val split = new CarbonInputSplit("null",
new Path(file.filePath),
new Path(new URI(file.filePath)),
file.start,
file.length,
file.locations,
@@ -380,10 +390,12 @@ class SparkCarbonFileFormat extends FileFormat
split.setDetailInfo(info)
info.setBlockSize(file.length)
// Read the footer offset and set.
val reader = FileFactory.getFileHolder(FileFactory.getFileType(file.filePath),
val reader = FileFactory.getFileHolder(FileFactory.getFileType(split.getPath.toString),
broadcastedHadoopConf.value.value)
val buffer = reader
.readByteBuffer(FileFactory.getUpdatedFilePath(file.filePath), file.length - 8, 8)
.readByteBuffer(FileFactory.getUpdatedFilePath(split.getPath.toString),
file.length - 8,
8)
info.setBlockFooterOffset(buffer.getLong)
info.setVersionNumber(split.getVersion.number())
info.setUseMinMaxForPruning(true)
@@ -447,7 +459,74 @@ class SparkCarbonFileFormat extends FileFormat
}
}

}

/**
 * Carbon writes two files per task (a carbondata file and an index file), but Spark tracks only
 * the single file name it proposes. This custom protocol moves Carbon's files into place when a
 * custom partition location is used.
 */
case class CarbonSQLHadoopMapReduceCommitProtocol(jobId: String, path: String, isAppend: Boolean)
extends SQLHadoopMapReduceCommitProtocol(jobId, path, isAppend) {

override def newTaskTempFileAbsPath(
taskContext: TaskAttemptContext, absoluteDir: String, ext: String): String = {
val carbonFlow = taskContext.getConfiguration.get(
CarbonSQLHadoopMapReduceCommitProtocol.COMMIT_PROTOCOL)
val tempPath = super.newTaskTempFileAbsPath(taskContext, absoluteDir, ext)
// Call only in case of carbon flow.
if (carbonFlow != null) {
// Create subfolder with uuid and write carbondata files
val path = new Path(tempPath)
val uuid = path.getName.substring(0, path.getName.indexOf("-part-"))
new Path(new Path(path.getParent, uuid), path.getName).toString
} else {
tempPath
}
}

override def commitJob(jobContext: JobContext,
taskCommits: Seq[FileCommitProtocol.TaskCommitMessage]): Unit = {
val carbonFlow = jobContext.getConfiguration.get(
CarbonSQLHadoopMapReduceCommitProtocol.COMMIT_PROTOCOL)
var updatedTaskCommits = taskCommits
// Call only in case of carbon flow.
if (carbonFlow != null) {
val (allAbsPathFiles, allPartitionPaths) =
// spark 2.1 and 2.2 case
if (taskCommits.exists(_.obj.isInstanceOf[Map[String, String]])) {
(taskCommits.map(_.obj.asInstanceOf[Map[String, String]]), null)
} else {
// spark 2.3 and above
taskCommits.map(_.obj.asInstanceOf[(Map[String, String], Set[String])]).unzip
}
val filesToMove = allAbsPathFiles.foldLeft(Map[String, String]())(_ ++ _)
val fs = new Path(path).getFileSystem(jobContext.getConfiguration)
// Move files from stage directory to actual location.
filesToMove.foreach{case (src, dest) =>
val srcPath = new Path(src)
val name = srcPath.getName
// Get uuid from spark's stage filename
val uuid = name.substring(0, name.indexOf("-part-"))
// List all the files under the uuid location
val list = fs.listStatus(new Path(new Path(src).getParent, uuid))
// Move all these files to actual folder.
list.foreach{ f =>
fs.rename(f.getPath, new Path(new Path(dest).getParent, f.getPath.getName))
}
}
updatedTaskCommits = if (allPartitionPaths == null) {
taskCommits.map(f => new FileCommitProtocol.TaskCommitMessage(Map.empty))
} else {
taskCommits.zipWithIndex.map{f =>
new FileCommitProtocol.TaskCommitMessage((Map.empty, allPartitionPaths(f._2)))
}
}
}
super.commitJob(jobContext, updatedTaskCommits)
}
}
object CarbonSQLHadoopMapReduceCommitProtocol {
val COMMIT_PROTOCOL = "carbon.commit.protocol"
}
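
For reference, the temp-path rewrite done in newTaskTempFileAbsPath above boils down to the following
standalone sketch (the file name is a made-up example, not output from this commit): the uuid prefix before
"-part-" is taken from Spark's proposed temp file name and used as a subfolder, so that commitJob can later
list that folder and move every Carbon file (data plus index) next to the destination.

// Sketch only: rewrite <parent>/<uuid>-part-... to <parent>/<uuid>/<uuid>-part-...
import org.apache.hadoop.fs.Path

val tempPath = new Path("/tmp/stage/uuid1234-part-00000.carbondata") // hypothetical temp file name
val name = tempPath.getName
val uuid = name.substring(0, name.indexOf("-part-"))                 // "uuid1234"
val rewritten = new Path(new Path(tempPath.getParent, uuid), name)
// rewritten = /tmp/stage/uuid1234/uuid1234-part-00000.carbondata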


@@ -464,6 +464,9 @@ private void checkPageLoaded() {
}

public void reset() {
if (isConstant) {
return;
}
isLoaded = false;
vector.reset();
}
@@ -1276,6 +1276,40 @@ class SparkCarbonDataSourceTest extends FunSuite with BeforeAndAfterAll {
}
}

test("test partition issue with add location") {
spark.sql("drop table if exists partitionTable_obs")
spark.sql("drop table if exists partitionTable_obs_par")
spark.sql(s"create table partitionTable_obs (id int,name String,email String) using carbon partitioned by(email) ")
spark.sql(s"create table partitionTable_obs_par (id int,name String,email String) using parquet partitioned by(email) ")
spark.sql("insert into partitionTable_obs select 1,'huawei','abc'")
spark.sql("insert into partitionTable_obs select 1,'huawei','bcd'")
spark.sql(s"alter table partitionTable_obs add partition (email='def') location '$warehouse1/test_folder121/'")
spark.sql("insert into partitionTable_obs select 1,'huawei','def'")

spark.sql("insert into partitionTable_obs_par select 1,'huawei','abc'")
spark.sql("insert into partitionTable_obs_par select 1,'huawei','bcd'")
spark.sql(s"alter table partitionTable_obs_par add partition (email='def') location '$warehouse1/test_folder122/'")
spark.sql("insert into partitionTable_obs_par select 1,'huawei','def'")

checkAnswer(spark.sql("select * from partitionTable_obs"), spark.sql("select * from partitionTable_obs_par"))
spark.sql("drop table if exists partitionTable_obs")
spark.sql("drop table if exists partitionTable_obs_par")
}

test("test multiple partition select issue") {
spark.sql("drop table if exists t_carbn01b_hive")
spark.sql(s"drop table if exists t_carbn01b")
spark.sql("create table t_carbn01b_hive(Qty_day_avg INT,Qty_total INT,Sell_price BIGINT,Sell_pricep DOUBLE,Profit DECIMAL(3,2),Item_code String,Item_name String,Outlet_name String,Create_date String,Active_status String,Item_type_cd INT, Update_time TIMESTAMP, Discount_price DOUBLE) using parquet partitioned by (Active_status,Item_type_cd, Update_time, Discount_price)")
spark.sql("create table t_carbn01b(Qty_day_avg INT,Qty_total INT,Sell_price BIGINT,Sell_pricep DOUBLE,Profit DECIMAL(3,2),Item_code String,Item_name String,Outlet_name String,Create_date String,Active_status String,Item_type_cd INT, Update_time TIMESTAMP, Discount_price DOUBLE) using carbon partitioned by (Active_status,Item_type_cd, Update_time, Discount_price)")
spark.sql("insert into t_carbn01b partition(Active_status, Item_type_cd,Update_time,Discount_price) select * from t_carbn01b_hive")
spark.sql("alter table t_carbn01b add partition (active_status='xyz',Item_type_cd=12,Update_time=NULL,Discount_price='3000')")
spark.sql("insert overwrite table t_carbn01b select 'xyz', 12, 74,3000,20000000,121.5,4.99,2.44,'RE3423ee','dddd', 'ssss','2012-01-02 23:04:05.12', '2012-01-20'")
spark.sql("insert overwrite table t_carbn01b_hive select 'xyz', 12, 74,3000,20000000,121.5,4.99,2.44,'RE3423ee','dddd', 'ssss','2012-01-02 23:04:05.12', '2012-01-20'")
checkAnswer(spark.sql("select * from t_carbn01b_hive"), spark.sql("select * from t_carbn01b"))
spark.sql("drop table if exists t_carbn01b_hive")
spark.sql(s"drop table if exists t_carbn01b")
}

override protected def beforeAll(): Unit = {
drop
createParquetTable
