Permalink
Browse files

update

  • Loading branch information...
1 parent f5a4c8d commit 97fc3675f9e0e4e8e84bb274c60d3e2dd70decd1 @cenwenchu committed Jan 12, 2012
@@ -327,21 +327,21 @@ protected void createTimeStampFile(String dir) {
@Override
public void exportEntryData(Job job) {
- JobDataOperation jobDataOperation = new JobDataOperation(job,AnalysisConstants.JOBMANAGER_EVENT_EXPORTDATA);
+ JobDataOperation jobDataOperation = new JobDataOperation(job,AnalysisConstants.JOBMANAGER_EVENT_EXPORTDATA,this.config);
createReportFileThreadPool.execute(jobDataOperation);
}
@Override
public void loadEntryData(Job job) {
- JobDataOperation jobDataOperation = new JobDataOperation(job,AnalysisConstants.JOBMANAGER_EVENT_LOADDATA);
+ JobDataOperation jobDataOperation = new JobDataOperation(job,AnalysisConstants.JOBMANAGER_EVENT_LOADDATA,this.config);
createReportFileThreadPool.submit(jobDataOperation);
}
@Override
public void loadEntryDataToTmp(Job job) {
- JobDataOperation jobDataOperation = new JobDataOperation(job,AnalysisConstants.JOBMANAGER_EVENT_LOADDATA_TO_TMP);
+ JobDataOperation jobDataOperation = new JobDataOperation(job,AnalysisConstants.JOBMANAGER_EVENT_LOADDATA_TO_TMP,this.config);
createReportFileThreadPool.submit(jobDataOperation);
}
@@ -525,7 +525,7 @@ protected void exportOrCleanTrunk(Job job)
//删除临时文件,防止重复载入使得清空不生效
if (config.getSaveTmpResultToFile())
{
- JobDataOperation jobDataOperation = new JobDataOperation(job,AnalysisConstants.JOBMANAGER_EVENT_DEL_DATAFILE);
+ JobDataOperation jobDataOperation = new JobDataOperation(job,AnalysisConstants.JOBMANAGER_EVENT_DEL_DATAFILE,this.config);
jobDataOperation.run();
}
@@ -538,7 +538,7 @@ protected void exportOrCleanTrunk(Job job)
{
logger.warn("@ disk2Mem mode: " + job.getJobName() + " store trunk to disk now .");
- JobDataOperation jobDataOperation = new JobDataOperation(job,AnalysisConstants.JOBMANAGER_EVENT_SETNULL_EXPORTDATA);
+ JobDataOperation jobDataOperation = new JobDataOperation(job,AnalysisConstants.JOBMANAGER_EVENT_SETNULL_EXPORTDATA,this.config);
jobDataOperation.run();
}
@@ -549,7 +549,7 @@ protected void exportOrCleanTrunk(Job job)
{
logger.warn("export job: " + job.getJobName() + " trunk to disk.");
- JobDataOperation jobDataOperation = new JobDataOperation(job,AnalysisConstants.JOBMANAGER_EVENT_EXPORTDATA);
+ JobDataOperation jobDataOperation = new JobDataOperation(job,AnalysisConstants.JOBMANAGER_EVENT_EXPORTDATA,this.config);
jobDataOperation.run();
}
}
@@ -157,7 +157,7 @@ public void merge(Job job,BlockingQueue<JobMergedResult> branchResultQueue
if (job.getNeedLoadResultFile().compareAndSet(true, false))
{
- new Thread(new JobDataOperation(job,AnalysisConstants.JOBMANAGER_EVENT_LOADDATA_TO_TMP)).start();
+ new Thread(new JobDataOperation(job,AnalysisConstants.JOBMANAGER_EVENT_LOADDATA_TO_TMP,this.config)).start();
}
}
@@ -25,6 +25,8 @@
import org.apache.commons.lang.StringUtils;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
+
+import com.taobao.top.analysis.config.MasterConfig;
import com.taobao.top.analysis.exception.AnalysisException;
import com.taobao.top.analysis.node.job.Job;
import com.taobao.top.analysis.util.AnalysisConstants;
@@ -43,12 +45,14 @@
private final Log logger = LogFactory.getLog(JobDataOperation.class);
Job job;
String operation;
+ MasterConfig config;
- public JobDataOperation(Job job,String operation)
+ public JobDataOperation(Job job,String operation,MasterConfig config)
{
this.job = job;
this.operation = operation;
+ this.config = config;
}
@Override
@@ -460,7 +464,10 @@ public String getDestDir()
if (!output.endsWith(File.separator))
output = output + File.separator;
- return output + "tmp" + File.separator;
+ if (config != null && config.getMasterName() != null)
+ return output + config.getMasterName() + File.separator + "tmp" + File.separator;
+ else
+ return output + "tmp" + File.separator;
}
/**
@@ -125,7 +125,7 @@ void mergeTrunk(long beg) throws InterruptedException
{
if (job.getNeedLoadResultFile().compareAndSet(true, false))
{
- new JobDataOperation(job,AnalysisConstants.JOBMANAGER_EVENT_LOADDATA_TO_TMP).run();
+ new JobDataOperation(job,AnalysisConstants.JOBMANAGER_EVENT_LOADDATA_TO_TMP,this.config).run();
}
boolean gotLock = job.getLoadLock().tryLock(80, TimeUnit.SECONDS);
@@ -157,7 +157,7 @@ void mergeTrunk(long beg) throws InterruptedException
if (!config.getSaveTmpResultToFile() &&
job.getJobResult() == null)
{
- new JobDataOperation(job,AnalysisConstants.JOBMANAGER_EVENT_LOADDATA).run();
+ new JobDataOperation(job,AnalysisConstants.JOBMANAGER_EVENT_LOADDATA,this.config).run();
}
}

0 comments on commit 97fc367

Please sign in to comment.