Skip to content

Commit

Permalink
Related to bug ID# 3172. Improved to split the big kettle job into
Browse files Browse the repository at this point in the history
multiple inner job to resolve the issue of Java Heap space exception.
  • Loading branch information
KirtiMistry committed Oct 26, 2016
1 parent 532fe7a commit 7403975
Show file tree
Hide file tree
Showing 8 changed files with 398 additions and 32 deletions.
2 changes: 1 addition & 1 deletion .classpath
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@
<classpathentry kind="src" path="generated"/>
<classpathentry kind="src" path="src/main/java"/>
<classpathentry kind="src" path="src/main/resources"/>
<classpathentry combineaccessrules="false" kind="src" path="/sqlpower_library"/>
<classpathentry combineaccessrules="false" kind="src" path="/sqlpower-library"/>
<classpathentry kind="lib" path="buildlib/ant.jar"/>
<classpathentry kind="lib" path="buildlib/gsbase-2.0.1.jar"/>
<classpathentry kind="lib" path="buildlib/junit.jar"/>
Expand Down
16 changes: 8 additions & 8 deletions src/main/java/ca/sqlpower/architect/CoreUserSettings.java
Original file line number Diff line number Diff line change
Expand Up @@ -97,14 +97,14 @@ private final void loadFromPrefs() {
logger.debug("Preferences class = " + prefs.getClass());

swingSettings.setBoolean(ArchitectSwingUserSettings.PLAYPEN_RENDER_ANTIALIASED,
prefs.getBoolean(ArchitectSwingUserSettings.PLAYPEN_RENDER_ANTIALIASED, false));
// if (getDefaultLocale() != null) {
String localeVal = prefs.get(ArchitectSwingUserSettings.DEFAULT_LOCALE, getDefaultLocale().getDisplayLanguage());
swingSettings.setString(ArchitectSwingUserSettings.DEFAULT_LOCALE,localeVal);
setDefaultLocale(new Locale(localeVal));
// } else {
// swingSettings.setString(ArchitectSwingUserSettings.DEFAULT_LOCALE,Locale.getDefault().getDisplayName());
// }
prefs.getBoolean(ArchitectSwingUserSettings.PLAYPEN_RENDER_ANTIALIASED, false));
if (getDefaultLocale() != null) {
String localeVal = prefs.get(ArchitectSwingUserSettings.DEFAULT_LOCALE, getDefaultLocale().getDisplayLanguage());
swingSettings.setString(ArchitectSwingUserSettings.DEFAULT_LOCALE,localeVal);
setDefaultLocale(new Locale(localeVal));
} else {
swingSettings.setString(ArchitectSwingUserSettings.DEFAULT_LOCALE,Locale.getDefault().getDisplayName());
}
etlUserSettings.setString(ETLUserSettings.PROP_PL_ENGINE_PATH,
prefs.get(ETLUserSettings.PROP_PL_ENGINE_PATH, ""));
etlUserSettings.setString(ETLUserSettings.PROP_ETL_LOG_PATH,
Expand Down
244 changes: 237 additions & 7 deletions src/main/java/ca/sqlpower/architect/etl/kettle/KettleJob.java
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,9 @@
import org.pentaho.di.core.util.EnvUtil;
import org.pentaho.di.job.JobHopMeta;
import org.pentaho.di.job.JobMeta;
import org.pentaho.di.job.entries.job.JobEntryJob;
import org.pentaho.di.job.entries.special.JobEntrySpecial;
import org.pentaho.di.job.entries.success.JobEntrySuccess;
import org.pentaho.di.job.entries.trans.JobEntryTrans;
import org.pentaho.di.job.entry.JobEntryCopy;
import org.pentaho.di.repository.Repository;
Expand Down Expand Up @@ -113,6 +115,15 @@ public class KettleJob implements Monitorable {
*/
private final KettleSettings settings;

/**
* used when big job spilitted into small
* list of job name
*/
private List<JobMeta> jobMetaList = new ArrayList<JobMeta>();
/**
* job number used create inner job
*/
private int job_no = 0;
public KettleJob(ArchitectSession session, KettleRepositoryDirectoryChooser chooser) {
this(session);
dirChooser = chooser;
Expand Down Expand Up @@ -310,6 +321,7 @@ public void doExport(List<SQLTable> tableList, SQLDatabase targetDB ) throws SQL
jm.addNote(new NotePadMeta(buffer.toString(), 0, 0, 125, 125));
}

//Start entry
JobEntryCopy startEntry = new JobEntryCopy();
JobEntrySpecial start = new JobEntrySpecial("Start", true, false);
startEntry.setEntry(start);
Expand Down Expand Up @@ -339,13 +351,29 @@ public void doExport(List<SQLTable> tableList, SQLDatabase targetDB ) throws SQL
i++;
}

// Success entry

JobEntrySuccess success = new JobEntrySuccess();
success.setName("Success");
JobEntryCopy successEntry = new JobEntryCopy();
successEntry.setEntry(success);
successEntry.setLocation(i*spacing, spacing);
successEntry.setDrawn();
jm.addJobEntry(successEntry);

JobHopMeta hop = new JobHopMeta(oldJobEntry, successEntry);
jm.addJobHop(hop);

if (monitor.isCancelled()) {
cancel();
return;
}

jm.setName(settings.getJobName());

String jobname = settings.getJobName();
if (settings.isSplittingJob() && getJob_no() > 0) {
jobname += "_"+getJob_no();
}
jm.setName(jobname);
// System.out.println("setting job name: "+jobname);
if (settings.isSavingToFile()) {
outputToXML(transformations, jm);
} else {
Expand All @@ -357,6 +385,176 @@ public void doExport(List<SQLTable> tableList, SQLDatabase targetDB ) throws SQL
}
}

/**
*
* @param tableList
* @param targetDB
* @param splitno, is a number of tables(transformation) per job
* @throws Exception
*/
public void doSplitedJobExport(List<SQLTable> tableList, SQLDatabase targetDB ) throws Exception {
if (settings.isSplittingJob() && tableList.size() > settings.getSplitJobNo()) {
jobMetaList.clear();
// split a big job into small a jobs of splitno of (table)transformation/job
List<List<SQLTable>> splitedTableList = chopped(tableList, settings.getSplitJobNo());
for (List<SQLTable> newtableList:splitedTableList) {
job_no++;
doExport(newtableList,targetDB);
}
try {
EnvUtil.environmentInit();
LogWriter lw = LogWriter.getInstance();
JobMeta jm = new JobMeta(lw);
//Start Entry
JobEntryCopy startEntry = new JobEntryCopy();
JobEntrySpecial start = new JobEntrySpecial("Start", true, false);
startEntry.setEntry(start);
startEntry.setLocation(10, spacing);
startEntry.setDrawn();
jm.addJobEntry(startEntry);


JobEntryCopy oldJobEntry = null;
int i = 1;
for (JobMeta job : jobMetaList) {
JobEntryCopy entry = new JobEntryCopy();
JobEntryJob jn = new JobEntryJob(job.getName());
jn.setJobName(job.getName());
entry.setEntry(jn);
entry.setLocation(i*spacing, spacing);
entry.setDrawn();
jm.addJobEntry(entry);
if (oldJobEntry != null) {
JobHopMeta hop = new JobHopMeta(oldJobEntry, entry);
jm.addJobHop(hop);
} else {
JobHopMeta hop = new JobHopMeta(startEntry, entry);
jm.addJobHop(hop);
}
oldJobEntry = entry;
i++;
}

// // Success entry
JobEntrySuccess success = new JobEntrySuccess();
success.setName("Success");
JobEntryCopy successEntry = new JobEntryCopy();
successEntry.setEntry(success);
successEntry.setLocation(i*spacing, spacing);
successEntry.setDrawn();
jm.addJobEntry(successEntry);

JobHopMeta hop = new JobHopMeta(oldJobEntry, successEntry);
jm.addJobHop(hop);

if (monitor.isCancelled()) {
cancel();
return;
}

jm.setName( settings.getJobName());
job_no=0;
if (settings.isSavingToFile()) {
jobOutputToXML(jobMetaList, jm);
} else {
jm.setDirectory(new RepositoryDirectory());
jobOutputToRepository(jm, jobMetaList, createRepository());
}
} finally {
job_no=0;
}

} else {
doExport(tableList,targetDB);
}
}

private void jobOutputToRepository(JobMeta jm, List<JobMeta> jmList, Repository createRepository) {
// TODO Auto-generated method stub

}

private void jobOutputToXML(List<JobMeta> jmList, JobMeta jm) throws IOException {
Map<File, String> outputs = new LinkedHashMap<File, String>();


//This sets the location of the transformations in the job
//The first entry is not a transformation so skip it
//This is done here so we know where the files are being saved and that they are saved
for (int i = 1; i < jm.nrJobEntries() -1; i++) {
JobEntryJob jobs = (JobEntryJob)(jm.getJobEntry(i).getEntry());
jobs.setFileName(getJobFilePath(jobs.getName()));
System.out.println("\n jobOutputToXML::jobs fileName: "+jobs.getFileName());
}

String fileName = settings.getFilePath() ;
if (!fileName.toUpperCase().endsWith(".KJB")) {
fileName += ".kjb";
}
jm.setFilename(fileName);
outputs.put(new File(fileName), jm.getXML());
if (getJob_no() > 0) {
jobMetaList.add(jm);
}
UserPrompter up = session.createUserPrompter(
"The file {0} already exists. Overwrite?", UserPromptType.BOOLEAN, UserPromptOptions.OK_NOTOK_CANCEL, UserPromptResponse.NOT_OK,
false, "Overwrite", "Don't Overwrite", "Cancel");
for (File f : outputs.keySet()) {
try {
logger.debug("jobOutputToXML :: The file to output is " + f.getPath());
if (f.exists()) {
UserPromptResponse overwriteOption = up.promptUser(f.getAbsolutePath());
if (overwriteOption == UserPromptResponse.OK) {
f.delete();
} else if (overwriteOption == UserPromptResponse.NOT_OK) {
continue;
} else if (overwriteOption == UserPromptResponse.CANCEL) {
cancel();
return;
} else {
throw new IllegalStateException(
"Unknown response value from user prompt: " + overwriteOption);
}
}
f.createNewFile();
BufferedWriter out = new BufferedWriter(new OutputStreamWriter(new
FileOutputStream(f), "utf-8"));
out.write(outputs.get(f));
out.flush();
out.close();
monitor.setProgress(monitor.getProgress() + 1);

if (monitor.isCancelled()) {
cancel();
return;
}
} catch (IOException er) {
tasksToDo.clear();
tasksToDo.add("File " + f.getName() + " was not created");
throw er;
}
}

}

private String getJobFilePath(String jobName) {
String parentPath = new File(settings.getFilePath()).getParentFile().getPath();
logger.debug("Parent file path is " + parentPath +" for job");
return new File(parentPath, jobName + ".kjb").getPath();
}

// chops a list into sublists of length L
static <T> List<List<SQLTable>> chopped(List<SQLTable> list, final int L) {
List<List<SQLTable>> parts = new ArrayList<List<SQLTable>>();
final int N = list.size();
for (int i = 0; i < N; i += L) {
parts.add(new ArrayList<SQLTable>(
list.subList(i, Math.min(N, i + L)))
);
}
return parts;
}

/**
* This is a helper method that sets the taskToDo list with
* the correct message to display.
Expand Down Expand Up @@ -410,6 +608,7 @@ void outputToXML(List<TransMeta> transformations, JobMeta job) throws IOExceptio

for (TransMeta transMeta : transformations) {
File file = new File(getTransFilePath(transMeta.getName()));
System.out.println("\n transformation file: "+file.getAbsolutePath());
transMeta.setFilename(file.getName());
try {
outputs.put(file, transMeta.getXML());
Expand All @@ -425,24 +624,31 @@ void outputToXML(List<TransMeta> transformations, JobMeta job) throws IOExceptio
//This sets the location of the transformations in the job
//The first entry is not a transformation so skip it
//This is done here so we know where the files are being saved and that they are saved
for (int i = 1; i < job.nrJobEntries(); i++) {
for (int i = 1; i < job.nrJobEntries()-1; i++) {
JobEntryTrans trans = (JobEntryTrans)(job.getJobEntry(i).getEntry());
trans.setFileName(getTransFilePath(trans.getName()));
}

String fileName = settings.getFilePath();
String fileName = settings.getFilePath() ;
if (settings.isSplittingJob() && getJob_no() > 0) {
String parentPath = new File(settings.getFilePath()).getParentFile().getPath();
fileName = parentPath+"/"+job.getName()+".kjb";
}
if (!fileName.toUpperCase().endsWith(".KJB")) {

fileName += ".kjb";
}
job.setFilename(fileName);
outputs.put(new File(fileName), job.getXML());

if (settings.isSplittingJob() && getJob_no() > 0) {
jobMetaList.add(job);
}
UserPrompter up = session.createUserPrompter(
"The file {0} already exists. Overwrite?", UserPromptType.BOOLEAN, UserPromptOptions.OK_NOTOK_CANCEL, UserPromptResponse.NOT_OK,
false, "Overwrite", "Don't Overwrite", "Cancel");
for (File f : outputs.keySet()) {
try {
logger.debug("The file to output is " + f.getPath());
logger.debug("outputToXML :: The file to output is " + f.getPath());
if (f.exists()) {
UserPromptResponse overwriteOption = up.promptUser(f.getAbsolutePath());
if (overwriteOption == UserPromptResponse.OK) {
Expand Down Expand Up @@ -784,4 +990,28 @@ public void setRepositoryDirectoryChooser(KettleRepositoryDirectoryChooser choos
public SPDataSource getRepository() {
return settings.getRepository();
}

public int getSplitJobNo() {
return settings.getSplitJobNo();
}


public void setSplitJobNo(int newValue) {
settings.setSplitJobNo(newValue);
}

/**
* return the current job no
*/
private int getJob_no() {
return job_no;
}

public boolean isSplittingJob() {
return settings.isSplittingJob();
}

public void setSplittingJob(boolean newValue) {
settings.setSplittingJob(newValue);
}
}
Loading

0 comments on commit 7403975

Please sign in to comment.