Skip to content

Commit

Permalink
Merge 1b25660 into 019f5cd
Browse files Browse the repository at this point in the history
  • Loading branch information
akashrn5 committed Oct 26, 2018
2 parents 019f5cd + 1b25660 commit 9735f80
Show file tree
Hide file tree
Showing 5 changed files with 18 additions and 13 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -236,6 +236,12 @@ public synchronized OutputCommitter getOutputCommitter(TaskAttemptContext contex
public RecordWriter<NullWritable, ObjectArrayWritable> getRecordWriter(
final TaskAttemptContext taskAttemptContext) throws IOException {
final CarbonLoadModel loadModel = getLoadModel(taskAttemptContext.getConfiguration());
String appName =
taskAttemptContext.getConfiguration().get(CarbonCommonConstants.CARBON_WRITTEN_BY_APPNAME);
if (null != appName) {
CarbonProperties.getInstance()
.addProperty(CarbonCommonConstants.CARBON_WRITTEN_BY_APPNAME, appName);
}
//if loadModel having taskNo already(like in SDK) then no need to overwrite
short sdkWriterCores = loadModel.getSdkWriterCores();
int itrSize = (sdkWriterCores > 0) ? sdkWriterCores : 1;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -62,8 +62,6 @@ public class CarbonTableInputFormatTest {
addProperty(CarbonCommonConstants.CARBON_BADRECORDS_LOC, "/tmp/carbon/badrecords");
CarbonProperties.getInstance()
.addProperty(CarbonCommonConstants.CARBON_SYSTEM_FOLDER_LOCATION, "/tmp/carbon/");
CarbonProperties.getInstance()
.addProperty(CarbonCommonConstants.CARBON_WRITTEN_BY_APPNAME, "CarbonTableInputFormatTest");
try {
creator = new StoreCreator(new File("target/store").getAbsolutePath(),
new File("../hadoop/src/test/resources/data.csv").getCanonicalPath());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -53,8 +53,6 @@ public class CarbonTableOutputFormatTest {
addProperty(CarbonCommonConstants.CARBON_BADRECORDS_LOC, "/tmp/carbon/badrecords");
CarbonProperties.getInstance()
.addProperty(CarbonCommonConstants.CARBON_SYSTEM_FOLDER_LOCATION, "/tmp/carbon/");
CarbonProperties.getInstance()
.addProperty(CarbonCommonConstants.CARBON_WRITTEN_BY_APPNAME, "CarbonTableOutputFormatTest");
try {
carbonLoadModel = new StoreCreator(new File("target/store").getAbsolutePath(),
new File("../hadoop/src/test/resources/data.csv").getCanonicalPath()).createTableAndLoadModel();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -121,15 +121,13 @@ class SparkCarbonFileFormat extends FileFormat
dataSchema: StructType): OutputWriterFactory = {

val conf = job.getConfiguration

conf
.set(CarbonCommonConstants.CARBON_WRITTEN_BY_APPNAME,
sparkSession.sqlContext.getConf("spark.app.name"))
val model = CarbonSparkDataSourceUtil.prepareLoadModel(options, dataSchema)
model.setLoadWithoutConverterStep(true)
CarbonTableOutputFormat.setLoadModel(conf, model)

CarbonProperties.getInstance()
.addProperty(CarbonCommonConstants.CARBON_WRITTEN_BY_APPNAME,
sparkSession.sparkContext.getConf.get("spark.app.name"))

new OutputWriterFactory {
override def newInstance(
path: String,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -121,10 +121,6 @@ with Serializable {
model.setDictionaryServerPort(options.getOrElse("dictport", "-1").toInt)
CarbonTableOutputFormat.setOverwrite(conf, options("overwrite").toBoolean)
model.setLoadWithoutConverterStep(true)
carbonProperty
.addProperty(CarbonCommonConstants.CARBON_WRITTEN_BY_APPNAME,
sparkSession.sparkContext.getConf.get("spark.app.name"))

val staticPartition = options.getOrElse("staticpartition", null)
if (staticPartition != null) {
conf.set("carbon.staticpartition", staticPartition)
Expand Down Expand Up @@ -159,6 +155,9 @@ with Serializable {
if (updateTimeStamp.isDefined) {
conf.set(CarbonTableOutputFormat.UPADTE_TIMESTAMP, updateTimeStamp.get)
}
conf
.set(CarbonCommonConstants.CARBON_WRITTEN_BY_APPNAME,
sparkSession.sqlContext.getConf("spark.app.name"))
CarbonTableOutputFormat.setLoadModel(conf, model)

new OutputWriterFactory {
Expand All @@ -175,6 +174,12 @@ with Serializable {
dataSchema: StructType,
context: TaskAttemptContext): OutputWriter = {
val model = CarbonTableOutputFormat.getLoadModel(context.getConfiguration)
val appName = context.getConfiguration.get(CarbonCommonConstants.CARBON_WRITTEN_BY_APPNAME)
if (null != appName) {
CarbonProperties.getInstance()
.addProperty(CarbonCommonConstants.CARBON_WRITTEN_BY_APPNAME,
appName)
}
val taskNumber = generateTaskNumber(path, context, model.getSegmentId)
val storeLocation = CommonUtil.getTempStoreLocations(taskNumber)
CarbonTableOutputFormat.setTempStoreLocations(context.getConfiguration, storeLocation)
Expand Down

0 comments on commit 9735f80

Please sign in to comment.