Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 3 additions & 1 deletion pipeline/src/org/labkey/pipeline/PipelineModule.java
Original file line number Diff line number Diff line change
Expand Up @@ -70,6 +70,7 @@
import org.labkey.pipeline.api.PipelineStatusManager;
import org.labkey.pipeline.api.ScriptTaskFactory;
import org.labkey.pipeline.api.properties.ApplicationPropertiesSiteSettings;
import org.labkey.pipeline.cluster.ClusterStartup;
import org.labkey.pipeline.importer.FolderImportProvider;
import org.labkey.pipeline.mule.EPipelineContextListener;
import org.labkey.pipeline.mule.EPipelineQueueImpl;
Expand Down Expand Up @@ -269,7 +270,8 @@ public Set<Class> getIntegrationTests()
PipelineJobServiceImpl.IntegrationTestCase.class,
PipelineQueueImpl.TestCase.class,
PipelineServiceImpl.TestCase.class,
StatusController.TestCase.class
StatusController.TestCase.class,
ClusterStartup.TestCase.class
);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -999,7 +999,7 @@ public void testDummySubmit() throws Exception
{
Container c = JunitUtil.getTestContainer();

PipelineJob job = new DummyPipelineJob(c, TestContext.get().getUser());
PipelineJob job = new DummyPipelineJob(c, TestContext.get().getUser(), DummyPipelineJob.Worker.success);

PipelineService.get().queueJob(job);

Expand Down
175 changes: 161 additions & 14 deletions pipeline/src/org/labkey/pipeline/cluster/ClusterStartup.java
Original file line number Diff line number Diff line change
Expand Up @@ -16,19 +16,36 @@

package org.labkey.pipeline.cluster;

import org.apache.commons.lang3.SystemUtils;
import org.jetbrains.annotations.NotNull;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
import org.labkey.api.module.ModuleLoader;
import org.labkey.api.pipeline.PipelineJob;
import org.labkey.api.pipeline.PipelineJobException;
import org.labkey.api.pipeline.PipelineJobService;
import org.labkey.api.reader.Readers;
import org.labkey.api.util.ContextListener;
import org.labkey.api.util.FileUtil;
import org.labkey.api.util.JunitUtil;
import org.labkey.api.util.PageFlowUtil;
import org.labkey.api.util.TestContext;
import org.labkey.bootstrap.ClusterBootstrap;
import org.labkey.pipeline.AbstractPipelineStartup;
import org.labkey.pipeline.mule.test.DummyPipelineJob;
import org.mule.umo.manager.UMOManager;
import org.springframework.beans.factory.BeanFactory;

import javax.annotation.Nullable;
import java.io.BufferedReader;
import java.io.File;
import java.io.IOException;
import java.net.InetAddress;
import java.net.URI;
import java.net.URISyntaxException;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;

Expand All @@ -43,32 +60,50 @@ public class ClusterStartup extends AbstractPipelineStartup
/**
* This method is invoked by reflection - don't change its signature without changing org.labkey.bootstrap.ClusterBootstrap
*/
public void run(List<File> moduleFiles, List<File> moduleConfigFiles, List<File> customConfigFiles, File webappDir, String[] args) throws IOException, URISyntaxException, PipelineJobException
public void run(List<File> moduleFiles, List<File> moduleConfigFiles, List<File> customConfigFiles, File webappDir, String[] args) throws IOException, PipelineJobException
{
Map<String, BeanFactory> factories = initContext("org/labkey/pipeline/mule/config/cluster.log4j.properties", moduleFiles, moduleConfigFiles, customConfigFiles, webappDir, PipelineJobService.LocationType.RemoteExecutionEngine);

// First arg should be URI to XML file, based on the web server's file system
// Passing no args could be used to explode the modules and exit, preparing for future jobs
// First arg is URI to serialized job's JSON file, based on the web server's file system
if (args.length < 1)
{
System.out.println("No job file provided, exiting");
// Passing no args is used to explode the modules and exit, preparing for future jobs
System.out.println("No job file provided, exiting after extracting modules");
System.exit(0);
Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@labkey-jeckels This was on purpose. The use case is to verify the cluster process can start. we use it on our cluster after upgrades. It will also break the tests I just added

Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It would also be reasonable to support a new arg to explicitly say we dont expect to run a job, like "-dryRun" or something

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks for the review. I misinterpreted the comments. Please check the latest changes I pushed, which include core test coverage for the extract-only usage

Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

ok, looks good. glad we finally have this in place!

}

String localFile = PipelineJobService.get().getPathMapper().remoteToLocal(args[0]);
File file = new File(new URI(localFile));
if (!file.isFile())
UMOManager manager = null;
try
{
throw new IllegalArgumentException("Could not find file " + file.getAbsolutePath());
}
String originalURI = args[0];

URI localURI;
// Translate from the web server's path to the local file version
try
{
localURI = PipelineJobService.get().getPathMapper().remoteToLocal(new URI(originalURI));
}
catch (URISyntaxException e)
{
throw new IllegalArgumentException("Invalid URI. Could not find serialized job file: " + args[0], e);
}

doSharedStartup(moduleFiles);
if (!localURI.isAbsolute() || !"file".equals(localURI.getScheme()))
{
throw new IllegalArgumentException("Invalid URI. Could not find serialized job file: " + localURI);
}

String hostName = InetAddress.getLocalHost().getHostName();
UMOManager manager = setupMuleConfig("org/labkey/pipeline/mule/config/clusterRemoteMuleConfig.xml", factories, hostName);
File file = new File(localURI);
if (!file.isFile())
{
throw new IllegalArgumentException("Could not find serialized job file: " + localURI);
}

doSharedStartup(moduleFiles);

String hostName = InetAddress.getLocalHost().getHostName();
manager = setupMuleConfig("org/labkey/pipeline/mule/config/clusterRemoteMuleConfig.xml", factories, hostName);

try
{
PipelineJob job = PipelineJob.readFromFile(file);

System.out.println("Starting to run task for job " + job + " on host: " + hostName);
Expand Down Expand Up @@ -127,4 +162,116 @@ else if (job.getActiveTaskStatus() != PipelineJob.TaskStatus.complete)

//System.exit(0);
}

public static class TestCase
{
private File _tempDir;

@Before
public void setup() throws IOException
{
_tempDir = File.createTempFile("testJobDir", "dir");
if (!_tempDir.delete())
{
throw new RuntimeException("Failed to delete file " + _tempDir);
}
if (!_tempDir.mkdir())
{
throw new RuntimeException("Failed to create dir " + _tempDir);
}
}

@After
public void cleanup()
{
FileUtil.deleteDir(_tempDir);
}

@Test
public void testSuccess() throws IOException, InterruptedException
{
DummyPipelineJob job = new DummyPipelineJob(JunitUtil.getTestContainer(), TestContext.get().getUser(), DummyPipelineJob.Worker.success);
String output = executeJobRemote(createArgs(job), 0);
String jobLog = PageFlowUtil.getFileContentsAsString(job.getLogFile());
Assert.assertTrue("Couldn't find logging", jobLog.contains("Successful worker!"));
Assert.assertTrue("Couldn't find logging", output.contains("Exploding module archives"));
}

@Test
public void testFailure() throws IOException, InterruptedException
{
DummyPipelineJob job = new DummyPipelineJob(JunitUtil.getTestContainer(), TestContext.get().getUser(), DummyPipelineJob.Worker.failure);
executeJobRemote(createArgs(job), 1);
String jobLog = PageFlowUtil.getFileContentsAsString(job.getLogFile());
Assert.assertTrue("Couldn't find logging", jobLog.contains("Oopsies"));
Assert.assertTrue("Couldn't find logging", jobLog.contains("java.lang.UnsupportedOperationException"));
}

@Test
public void testExtractOnly() throws IOException, InterruptedException
{
List<String> args = createArgs(null);
String output = executeJobRemote(args, 0);
Assert.assertTrue("Couldn't find logging", output.contains("Exploding module archives"));
}

@Test
public void testBadPath() throws IOException, InterruptedException
{
DummyPipelineJob job = new DummyPipelineJob(JunitUtil.getTestContainer(), TestContext.get().getUser(), DummyPipelineJob.Worker.failure);
List<String> args = createArgs(job);
// Last argument is supposed to be the URI to the serialized job's file, hack it to something else
args.set(args.size() - 1, "NotAValidURI.json");
String output = executeJobRemote(args, 1);
Assert.assertTrue("Couldn't find logging", output.contains("Could not find serialized job file"));
}

protected String executeJobRemote(List<String> args, int expectedExitCode) throws IOException, InterruptedException
{
ProcessBuilder pb = new ProcessBuilder(args);
pb.directory(_tempDir);
pb.redirectErrorStream(true);
Process proc = pb.start();
StringBuilder sb = new StringBuilder();
try (BufferedReader procReader = Readers.getReader(proc.getInputStream()))
{
String line;
while ((line = procReader.readLine()) != null)
{
sb.append(line);
sb.append("\n");
}
}
proc.waitFor();
Assert.assertEquals("Wrong exit code", expectedExitCode, proc.exitValue());
return sb.toString();
}

@NotNull
private List<String> createArgs(@Nullable PipelineJob job) throws IOException
{
List<String> args = new ArrayList<>();
args.add(System.getProperty("java.home") + "/bin/java" + (SystemUtils.IS_OS_WINDOWS ? ".exe" : ""));
File labkeyBootstrap = new File(new File(new File(System.getProperty("catalina.home")), "lib"), "labkeyBootstrap.jar");

// Uncomment this line if you want to debug the forked process
// args.add("-agentlib:jdwp=transport=dt_socket,server=n,suspend=y,address=*:5005");
args.add("-cp");
args.add(labkeyBootstrap.getPath());
args.add(ClusterBootstrap.class.getName());
args.add("-webappdir=" + ModuleLoader.getServletContext().getRealPath(""));

if (job != null)
{
// Serialize to a file
File serializedJob = new File(_tempDir, "job.json");
File log = new File(_tempDir, "job.log");
job.setLogFile(log);
job.writeToFile(serializedJob);
args.add(serializedJob.toURI().toString());
}

return args;
}
}
}
77 changes: 58 additions & 19 deletions pipeline/src/org/labkey/pipeline/mule/test/DummyPipelineJob.java
Original file line number Diff line number Diff line change
@@ -1,23 +1,27 @@
/*
* Copyright (c) 2015-2016 LabKey Corporation
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
/*
* Copyright (c) 2015-2016 LabKey Corporation
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.labkey.pipeline.mule.test;

import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonProperty;
import org.apache.log4j.Logger;
import org.jetbrains.annotations.Nullable;
import org.labkey.api.data.Container;
import org.labkey.api.pipeline.PipelineJob;
import org.labkey.api.pipeline.PipelineJobService;
import org.labkey.api.pipeline.PipelineService;
import org.labkey.api.pipeline.TaskId;
import org.labkey.api.pipeline.TaskPipeline;
Expand All @@ -36,9 +40,40 @@
*/
public class DummyPipelineJob extends PipelineJob
{
public DummyPipelineJob(Container c, User user)
private Worker _worker;

public enum Worker
{
success
{
@Override
public void run(Logger log)
{
log.info("Successful worker!");
}
},
failure
{
@Override
public void run(Logger log)
{
throw new UnsupportedOperationException("Oopsies!");
}
};

public abstract void run(Logger log);
}

@JsonCreator
protected DummyPipelineJob()
{
super();
}

public DummyPipelineJob(Container c, User user, Worker worker)
{
super(null, new ViewBackgroundInfo(c, user, null), PipelineService.get().findPipelineRoot(c));
_worker = worker;
try
{
setLogFile(File.createTempFile("DummyPipelineJob", ".tmp"));
Expand All @@ -47,6 +82,12 @@ public DummyPipelineJob(Container c, User user)
{
throw new UnexpectedException(e);
}
setActiveTaskId(getTaskPipeline().getTaskProgression()[0], false);
}

public Worker getWorker()
{
return _worker;
}

@Override
Expand All @@ -65,8 +106,6 @@ public String getDescription()
@Override
public TaskPipeline getTaskPipeline()
{
TaskPipelineImpl result = new TaskPipelineImpl(new TaskId(DummyRemoteExecutionEngine.class, "DummyPipeline"));
result.setTaskProgression(new TaskId(DummyTaskFactory.class));
return result;
return PipelineJobService.get().getTaskPipeline(new TaskId(DummyPipelineJob.class));
}
}
9 changes: 8 additions & 1 deletion pipeline/src/org/labkey/pipeline/mule/test/DummyTask.java
Original file line number Diff line number Diff line change
Expand Up @@ -26,15 +26,22 @@
*/
public class DummyTask extends PipelineJob.Task
{
public DummyTask(PipelineJob job)
public DummyTask(DummyPipelineJob job)
{
super(null, job);
}

@Override
public DummyPipelineJob getJob()
{
return (DummyPipelineJob)super.getJob();
}

@NotNull
@Override
public RecordedActionSet run()
{
getJob().getWorker().run(getJob().getLogger());
return new RecordedActionSet();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@ public DummyTaskFactory()
@Override
public PipelineJob.Task createTask(PipelineJob job)
{
return new DummyTask(job);
return new DummyTask((DummyPipelineJob)job);
}

@Override
Expand Down
Loading