Skip to content

Commit

Permalink
Fix task id in file format
Browse files Browse the repository at this point in the history
  • Loading branch information
ajantha-bhat committed Jul 13, 2019
1 parent ebe78dc commit 498ce1f
Show file tree
Hide file tree
Showing 2 changed files with 29 additions and 2 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,7 @@ public CarbonThreadFactory(String name, boolean withTime) {
@Override public Thread newThread(Runnable r) {
final Thread thread = defaultFactory.newThread(r);
if (withTime) {
thread.setName(name + "_" + System.currentTimeMillis());
thread.setName(name + "_" + System.nanoTime());
} else {
thread.setName(name);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,8 @@
package org.apache.spark.sql.carbondata.execution.datasources

import java.net.URI
import java.util.concurrent.ConcurrentHashMap
import java.util.concurrent.atomic.AtomicLong

import scala.collection.JavaConverters._
import scala.collection.mutable.ArrayBuffer
Expand Down Expand Up @@ -143,6 +145,13 @@ class SparkCarbonFileFormat extends FileFormat
conf.set(CarbonSQLHadoopMapReduceCommitProtocol.COMMIT_PROTOCOL, "true")

new OutputWriterFactory {

/**
 * Source of the sequence numbers that are folded into generated task numbers.
 * Starts at zero.
 */
val counter: AtomicLong = new AtomicLong(0L)

/**
 * Remembers, per output path, the sequence number that was handed out for that path.
 */
val taskIdMap: ConcurrentHashMap[String, java.lang.Long] =
  new ConcurrentHashMap[String, java.lang.Long]()

override def newInstance(
path: String,
dataSchema: StructType,
Expand All @@ -153,10 +162,28 @@ class SparkCarbonFileFormat extends FileFormat
path
}
context.getConfiguration.set("carbon.outputformat.writepath", updatedPath)
context.getConfiguration.set("carbon.outputformat.taskno", System.nanoTime() + "")
context.getConfiguration
.set("carbon.outputformat.taskno", generateTaskNumber(updatedPath, context))
new CarbonOutputWriter(path, context, dataSchema.fields)
}

/**
 * Generates the task number for a writer by combining the Spark task id with a sequence
 * number that is assigned once per output path.
 *
 * The first call for a given path draws the next value from `counter` and registers it in
 * `taskIdMap`; every later call for the same path reuses the registered value.
 *
 * @param path    output path the sequence number is keyed on
 * @param context task attempt context that supplies the task id
 * @return the task id and the sequence number, each offset by 10^6, rendered as decimal
 *         strings and concatenated
 */
private def generateTaskNumber(path: String,
    context: TaskAttemptContext): String = {
  val offset = Math.pow(10, 6).toInt
  val count: java.lang.Long = Option(taskIdMap.get(path)).getOrElse {
    val fresh: java.lang.Long = counter.incrementAndGet()
    // Register atomically: if another caller registered this path between the lookup above
    // and this point, keep its value so that one path never maps to two sequence numbers.
    // The value drawn here is then simply left unused.
    Option(taskIdMap.putIfAbsent(path, fresh)).getOrElse(fresh)
  }
  // Spark task id part followed by the per-path sequence part.
  String.valueOf(offset + context.getTaskAttemptID.getTaskID.getId) +
    String.valueOf(count + offset)
}

/**
 * Returns the file extension reported by this factory.
 *
 * @param context task attempt context; not consulted
 * @return the value of `CarbonTablePath.CARBON_DATA_EXT`
 */
override def getFileExtension(context: TaskAttemptContext): String =
  CarbonTablePath.CARBON_DATA_EXT
Expand Down

0 comments on commit 498ce1f

Please sign in to comment.