Skip to content
This repository was archived by the owner on Aug 4, 2022. It is now read-only.
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
17 changes: 9 additions & 8 deletions lang/cs/Org.Apache.REEF.Client.Tests/JobResourceUploaderTests.cs
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading.Tasks;
using NSubstitute;
using Org.Apache.REEF.Client.Common;
using Org.Apache.REEF.Client.Yarn;
Expand Down Expand Up @@ -52,25 +53,25 @@ public void JobResourceUploaderCanInstantiateWithDefaultBindings()
}

[Fact]
public void UploadJobResourceCreatesResourceArchive()
public async Task UploadJobResourceCreatesResourceArchive()
{
var testContext = new TestContext();
var jobResourceUploader = testContext.GetJobResourceUploader();

jobResourceUploader.UploadArchiveResource(AnyDriverLocalFolderPath, AnyDriverResourceUploadPath);
await jobResourceUploader.UploadArchiveResourceAsync(AnyDriverLocalFolderPath, AnyDriverResourceUploadPath);

// Archive file generator recieved exactly one call with correct driver local folder path
testContext.ResourceArchiveFileGenerator.Received(1).CreateArchiveToUpload(AnyDriverLocalFolderPath);
}

[Fact]
public void UploadJobResourceReturnsJobResourceDetails()
public async Task UploadJobResourceReturnsJobResourceDetails()
{
var testContext = new TestContext();
var jobResourceUploader = testContext.GetJobResourceUploader();

var archiveJobResource = jobResourceUploader.UploadArchiveResource(AnyDriverLocalFolderPath, AnyDriverResourceUploadPath);
var fileJobResource = jobResourceUploader.UploadFileResource(AnyLocalJobFilePath, AnyDriverResourceUploadPath);
var archiveJobResource = await jobResourceUploader.UploadArchiveResourceAsync(AnyDriverLocalFolderPath, AnyDriverResourceUploadPath);
var fileJobResource = await jobResourceUploader.UploadFileResourceAsync(AnyLocalJobFilePath, AnyDriverResourceUploadPath);
var jobResources = new List<JobResource> { archiveJobResource, fileJobResource };

foreach (var resource in jobResources)
Expand All @@ -85,13 +86,13 @@ public void UploadJobResourceReturnsJobResourceDetails()
}

[Fact]
public void UploadJobResourceMakesCorrectFileSystemCalls()
public async Task UploadJobResourceMakesCorrectFileSystemCalls()
{
var testContext = new TestContext();
var jobResourceUploader = testContext.GetJobResourceUploader();

jobResourceUploader.UploadArchiveResource(AnyDriverLocalFolderPath, AnyDriverResourceUploadPath);
jobResourceUploader.UploadFileResource(AnyLocalJobFilePath, AnyDriverResourceUploadPath);
await jobResourceUploader.UploadArchiveResourceAsync(AnyDriverLocalFolderPath, AnyDriverResourceUploadPath);
await jobResourceUploader.UploadFileResourceAsync(AnyLocalJobFilePath, AnyDriverResourceUploadPath);

testContext.FileSystem.Received(1).CreateUriForPath(AnyUploadedResourcePath);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
using System;
using System.Collections.Generic;
using System.IO;
using System.Threading.Tasks;
using NSubstitute;
using Org.Apache.REEF.Client.Common;
using Org.Apache.REEF.Client.Yarn;
Expand All @@ -37,75 +38,75 @@ public class LegacyJobResourceUploaderTests
private const long AnyResourceSize = 53092;

[Fact]
public void UploadJobResourceCreatesResourceArchive()
public async Task UploadJobResourceCreatesResourceArchive()
{
var testContext = new TestContext();
var jobResourceUploader = testContext.GetJobResourceUploader();

jobResourceUploader.UploadArchiveResource(AnyDriverLocalFolderPath, AnyDriverResourceUploadPath);
await jobResourceUploader.UploadArchiveResourceAsync(AnyDriverLocalFolderPath, AnyDriverResourceUploadPath);

// Archive file generator recieved exactly one call with correct driver local folder path with trailing \
testContext.ResourceArchiveFileGenerator.Received(1).CreateArchiveToUpload(AnyDriverLocalFolderPath + @"\");
}

[Fact]
public void UploadJobResourceJavaLauncherCalledWithCorrectArguments()
public async Task UploadJobResourceJavaLauncherCalledWithCorrectArguments()
{
var testContext = new TestContext();
var jobResourceUploader = testContext.GetJobResourceUploader();
const string anyLocalArchivePath = @"Any\Local\Archive\Path.zip";
var anyLocalJobFilePath = AnyDriverLocalFolderPath.TrimEnd('\\') + @"\job-submission-params.json";
testContext.ResourceArchiveFileGenerator.CreateArchiveToUpload(AnyDriverLocalFolderPath + @"\")
.Returns(anyLocalArchivePath);
jobResourceUploader.UploadArchiveResource(AnyDriverLocalFolderPath, AnyDriverResourceUploadPath);
jobResourceUploader.UploadFileResource(AnyLocalJobFilePath, AnyDriverResourceUploadPath);
await jobResourceUploader.UploadArchiveResourceAsync(AnyDriverLocalFolderPath, AnyDriverResourceUploadPath);
await jobResourceUploader.UploadFileResourceAsync(AnyLocalJobFilePath, AnyDriverResourceUploadPath);

const string javaClassNameForResourceUploader = @"org.apache.reef.bridge.client.JobResourceUploader";
Guid notUsed;
Guid notUsedGuid;

// Clientlauncher is called with correct class name, local archive path, upload path and temp file.
testContext.JavaClientLauncher.Received(1)
.Launch(javaClassNameForResourceUploader,
var notUsedTask = testContext.JavaClientLauncher.Received(1)
.LaunchAsync(javaClassNameForResourceUploader,
anyLocalArchivePath,
"ARCHIVE",
AnyDriverResourceUploadPath + "/",
Arg.Is<string>(
outputFilePath =>
Path.GetDirectoryName(outputFilePath) + @"\" == Path.GetTempPath()
&& Guid.TryParse(Path.GetFileName(outputFilePath), out notUsed)));
&& Guid.TryParse(Path.GetFileName(outputFilePath), out notUsedGuid)));

// Clientlauncher is called with correct class name, local job file path, upload path and temp file.
testContext.JavaClientLauncher.Received(1)
.Launch(javaClassNameForResourceUploader,
notUsedTask = testContext.JavaClientLauncher.Received(1)
.LaunchAsync(javaClassNameForResourceUploader,
anyLocalJobFilePath,
"FILE",
AnyDriverResourceUploadPath + "/",
Arg.Is<string>(
outputFilePath =>
Path.GetDirectoryName(outputFilePath) + @"\" == Path.GetTempPath()
&& Guid.TryParse(Path.GetFileName(outputFilePath), out notUsed)));
&& Guid.TryParse(Path.GetFileName(outputFilePath), out notUsedGuid)));
}

[Fact]
public void UploadJobResourceNoFileCreatedByJavaCallThrowsException()
public async Task UploadJobResourceNoFileCreatedByJavaCallThrowsException()
{
var testContext = new TestContext();
var jobResourceUploader = testContext.GetJobResourceUploader(fileExistsReturnValue: false);

// throws filenotfound exception
Assert.Throws<FileNotFoundException>(() => jobResourceUploader.UploadArchiveResource(AnyDriverLocalFolderPath, AnyDriverResourceUploadPath));
await Assert.ThrowsAsync<FileNotFoundException>(() => jobResourceUploader.UploadArchiveResourceAsync(AnyDriverLocalFolderPath, AnyDriverResourceUploadPath));
}

[Fact]
public void UploadJobResourceReturnsJobResourceDetails()
public async Task UploadJobResourceReturnsJobResourceDetails()
{
var testContext = new TestContext();
var jobResourceUploader = testContext.GetJobResourceUploader();

var jobResources = new List<JobResource>()
var jobResources = new List<JobResource>
{
jobResourceUploader.UploadArchiveResource(AnyDriverLocalFolderPath, AnyDriverResourceUploadPath),
jobResourceUploader.UploadFileResource(AnyLocalJobFilePath, AnyDriverResourceUploadPath)
await jobResourceUploader.UploadArchiveResourceAsync(AnyDriverLocalFolderPath, AnyDriverResourceUploadPath),
await jobResourceUploader.UploadFileResourceAsync(AnyLocalJobFilePath, AnyDriverResourceUploadPath)
};

Assert.Equal(jobResources.Count, 2);
Expand Down
3 changes: 2 additions & 1 deletion lang/cs/Org.Apache.REEF.Client/Common/IJavaClientLauncher.cs
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
// under the License.

using System.Collections.Generic;
using System.Threading.Tasks;
using Org.Apache.REEF.Tang.Annotations;

namespace Org.Apache.REEF.Client.Common
Expand All @@ -26,7 +27,7 @@ namespace Org.Apache.REEF.Client.Common
[DefaultImplementation(typeof(JavaClientLauncher))]
internal interface IJavaClientLauncher
{
void Launch(string javaClassName, params string[] parameters);
Task LaunchAsync(string javaClassName, params string[] parameters);

/// <summary>
/// Add entries to the end of the classpath of the java client.
Expand Down
15 changes: 10 additions & 5 deletions lang/cs/Org.Apache.REEF.Client/Common/JavaClientLauncher.cs
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
using System.Globalization;
using System.IO;
using System.Linq;
using System.Threading.Tasks;
using Org.Apache.REEF.Client.API.Exceptions;
using Org.Apache.REEF.Tang.Annotations;
using Org.Apache.REEF.Utilities.Diagnostics;
Expand Down Expand Up @@ -51,24 +52,26 @@ private JavaClientLauncher()
/// </summary>
/// <param name="javaClassName"></param>
/// <param name="parameters"></param>
public void Launch(string javaClassName, params string[] parameters)
public Task LaunchAsync(string javaClassName, params string[] parameters)
{
var startInfo = new ProcessStartInfo
{
Arguments = AssembleArguments(javaClassName, parameters),
FileName = GetJavaCommand(),
UseShellExecute = false,
RedirectStandardOutput = true,
RedirectStandardError = true
RedirectStandardError = true,
};

var msg = string.Format(CultureInfo.CurrentCulture, "Launch Java with command: {0} {1}",
startInfo.FileName, startInfo.Arguments);
Logger.Log(Level.Info, msg);

var process = Process.Start(startInfo);
var processExitTracker = new TaskCompletionSource<bool>();
if (process != null)
{
process.EnableRaisingEvents = true;
process.OutputDataReceived += delegate(object sender, DataReceivedEventArgs e)
{
if (!string.IsNullOrWhiteSpace(e.Data))
Expand All @@ -85,16 +88,18 @@ public void Launch(string javaClassName, params string[] parameters)
};
process.BeginErrorReadLine();
process.BeginOutputReadLine();
process.WaitForExit();
process.Exited += (sender, args) => { processExitTracker.SetResult(process.ExitCode == 0); };
}
else
{
Exceptions.Throw(new Exception("Java client process didn't start."), Logger);
processExitTracker.SetException(new Exception("Java client process didn't start."));
}

return processExitTracker.Task;
}

/// <summary>
/// Assembles the command line arguments. Used by Launch()
/// Assembles the command line arguments. Used by LaunchAsync()
/// </summary>
/// <param name="javaClassName"></param>
/// <param name="parameters"></param>
Expand Down
6 changes: 4 additions & 2 deletions lang/cs/Org.Apache.REEF.Client/Local/LocalClient.cs
Original file line number Diff line number Diff line change
Expand Up @@ -157,7 +157,8 @@ public void Submit(JobRequest jobRequest)
var driverFolder = PrepareDriverFolder(jobRequest);
var submissionJobArgsFilePath = CreateBootstrapAvroJobConfig(jobRequest.JobParameters, driverFolder);
var submissionAppArgsFilePath = CreateBootstrapAvroAppConfig(jobRequest.AppParameters, driverFolder);
_javaClientLauncher.Launch(JavaClassName, submissionJobArgsFilePath, submissionAppArgsFilePath);
_javaClientLauncher.LaunchAsync(JavaClassName, submissionJobArgsFilePath, submissionAppArgsFilePath)
.GetAwaiter().GetResult();
Logger.Log(Level.Info, "Submitted the Driver for execution.");
}

Expand All @@ -167,7 +168,8 @@ public IJobSubmissionResult SubmitAndGetJobStatus(JobRequest jobRequest)
var submissionJobArgsFilePath = CreateBootstrapAvroJobConfig(jobRequest.JobParameters, driverFolder);
var submissionAppArgsFilePath = CreateBootstrapAvroAppConfig(jobRequest.AppParameters, driverFolder);

Task.Run(() => _javaClientLauncher.Launch(JavaClassName, submissionJobArgsFilePath, submissionAppArgsFilePath));
_javaClientLauncher.LaunchAsync(JavaClassName, submissionJobArgsFilePath, submissionAppArgsFilePath)
.GetAwaiter().GetResult();

var fileName = Path.Combine(driverFolder, _fileNames.DriverHttpEndpoint);
JobSubmissionResult result = new LocalJobSubmissionResult(this, fileName);
Expand Down
5 changes: 3 additions & 2 deletions lang/cs/Org.Apache.REEF.Client/YARN/IJobResourceUploader.cs
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
// specific language governing permissions and limitations
// under the License.

using System.Threading.Tasks;
using Org.Apache.REEF.Tang.Annotations;

namespace Org.Apache.REEF.Client.Yarn
Expand All @@ -28,14 +29,14 @@ public interface IJobResourceUploader
/// <param name="driverLocalFolderPath">Local folder where REEF application resources are staged</param>
/// <param name="remoteUploadDirectoryPath">Remote directory path where we will upload resources</param>
/// <returns>Path, modification time and size of uploaded file as JobResource</returns>
JobResource UploadArchiveResource(string driverLocalFolderPath, string remoteUploadDirectoryPath);
Task<JobResource> UploadArchiveResourceAsync(string driverLocalFolderPath, string remoteUploadDirectoryPath);

/// <summary>
/// Locates a file resource and uploads it to DFS destination path.
/// </summary>
/// <param name="fileLocalPath">file path</param>
/// <param name="remoteUploadDirectoryPath">Remote directory path where we will upload resources</param>
/// <returns>Path, modification time and size of uploaded file as JobResource</returns>
JobResource UploadFileResource(string fileLocalPath, string remoteUploadDirectoryPath);
Task<JobResource> UploadFileResourceAsync(string fileLocalPath, string remoteUploadDirectoryPath);
}
}
15 changes: 8 additions & 7 deletions lang/cs/Org.Apache.REEF.Client/YARN/LegacyJobResourceUploader.cs
Original file line number Diff line number Diff line change
Expand Up @@ -16,8 +16,8 @@
// under the License.

using System;
using System.Collections.Generic;
using System.IO;
using System.Threading.Tasks;
using Org.Apache.REEF.Client.Common;
using Org.Apache.REEF.Client.YARN.RestClient.DataModel;
using Org.Apache.REEF.Common.Files;
Expand Down Expand Up @@ -62,24 +62,24 @@ private LegacyJobResourceUploader(
_reefFileNames = reefFileNames;
}

public JobResource UploadArchiveResource(string driverLocalFolderPath, string remoteUploadDirectoryPath)
public async Task<JobResource> UploadArchiveResourceAsync(string driverLocalFolderPath, string remoteUploadDirectoryPath)
{
driverLocalFolderPath = driverLocalFolderPath.TrimEnd('\\') + @"\";
var driverUploadPath = remoteUploadDirectoryPath.TrimEnd('/') + @"/";
Log.Log(Level.Info, "DriverFolderPath: {0} DriverUploadPath: {1}", driverLocalFolderPath, driverUploadPath);

var archivePath = _resourceArchiveFileGenerator.CreateArchiveToUpload(driverLocalFolderPath);
return GetJobResource(archivePath, ResourceType.ARCHIVE, driverUploadPath, _reefFileNames.GetReefFolderName());
return await UploadResourceAndGetInfoAsync(archivePath, ResourceType.ARCHIVE, driverUploadPath, _reefFileNames.GetReefFolderName());
}

public JobResource UploadFileResource(string fileLocalPath, string remoteUploadDirectoryPath)
public async Task<JobResource> UploadFileResourceAsync(string fileLocalPath, string remoteUploadDirectoryPath)
{
var driverUploadPath = remoteUploadDirectoryPath.TrimEnd('/') + @"/";
var jobArgsFilePath = fileLocalPath;
return GetJobResource(jobArgsFilePath, ResourceType.FILE, driverUploadPath);
return await UploadResourceAndGetInfoAsync(jobArgsFilePath, ResourceType.FILE, driverUploadPath);
}

private JobResource GetJobResource(string filePath, ResourceType resourceType, string driverUploadPath, string localizedName = null)
private async Task<JobResource> UploadResourceAndGetInfoAsync(string filePath, ResourceType resourceType, string driverUploadPath, string localizedName = null)
{
if (!_file.Exists(filePath))
{
Expand All @@ -92,7 +92,7 @@ private JobResource GetJobResource(string filePath, ResourceType resourceType, s

try
{
_javaLauncher.Launch(JavaClassNameForResourceUploader,
await _javaLauncher.LaunchAsync(JavaClassNameForResourceUploader,
filePath,
resourceType.ToString(),
driverUploadPath,
Expand All @@ -119,6 +119,7 @@ private JobResource ParseGeneratedOutputFile(string resourceDetailsOutputPath, s
Log);
}

// Single line file, easier to deal with sync read
string fileContent = _file.ReadAllText(resourceDetailsOutputPath).Trim();

Log.Log(Level.Info, "Java uploader returned content: " + fileContent);
Expand Down
Loading