Skip to content
This repository was archived by the owner on Aug 4, 2022. It is now read-only.
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -156,7 +156,9 @@ public void TestYarnREEFJobSerialization()
"\"securityTokenKind\":\"{0}\"," +
"\"securityTokenService\":\"{0}\"," +
"\"maxApplicationSubmissions\":{1}," +
"\"driverMemory\":{1}" +
"\"driverMemory\":{1}," +
"\"driverStdoutFilePath\":\"{0}\"," +
"\"driverStderrFilePath\":\"{0}\"" +
"}}";

var conf = YARNClientConfiguration.ConfigurationModule
Expand All @@ -173,6 +175,8 @@ public void TestYarnREEFJobSerialization()
.SetJobIdentifier(AnyString)
.SetMaxApplicationSubmissions(AnyInt)
.SetDriverMemory(AnyInt)
.SetDriverStderrFilePath(AnyString)
.SetDriverStdoutFilePath(AnyString)
.Build();

var serializedBytes = serializer.SerializeJobArgsToBytes(jobRequest.JobParameters, AnyString);
Expand Down
38 changes: 36 additions & 2 deletions lang/cs/Org.Apache.REEF.Client/API/JobParameters.cs
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
// under the License.

using System;
using Org.Apache.REEF.Utilities;

namespace Org.Apache.REEF.Client.API
{
Expand All @@ -29,19 +30,36 @@ public sealed class JobParameters
private readonly string _jobIdentifier;
private readonly int _maxApplicationSubmissions;
private readonly int _driverMemory;
private readonly Optional<string> _stdoutFilePath;
private readonly Optional<string> _stderrFilePath;

internal JobParameters(string jobIdentifier, int maxApplicationSubmissions, int driverMemory)
internal JobParameters(
string jobIdentifier,
int maxApplicationSubmissions,
int driverMemory,
string stdoutFilePath,
string stderrFilePath)
{
_jobIdentifier = jobIdentifier;
_maxApplicationSubmissions = maxApplicationSubmissions;
_driverMemory = driverMemory;

_stdoutFilePath = string.IsNullOrWhiteSpace(stdoutFilePath) ?
Optional<string>.Empty() : Optional<string>.Of(stdoutFilePath);

_stderrFilePath = string.IsNullOrWhiteSpace(stderrFilePath) ?
Optional<string>.Empty() : Optional<string>.Of(stderrFilePath);
}

[Obsolete("Introduced to bridge deprecation of IJobSubmission.")]
internal static JobParameters FromJobSubmission(IJobSubmission jobSubmission)
{
return new JobParameters(
jobSubmission.JobIdentifier, jobSubmission.MaxApplicationSubmissions, jobSubmission.DriverMemory);
jobSubmission.JobIdentifier,
jobSubmission.MaxApplicationSubmissions,
jobSubmission.DriverMemory,
null,
null);
}

/// <summary>
Expand All @@ -68,5 +86,21 @@ public int DriverMemoryInMB
{
get { return _driverMemory; }
}

/// <summary>
/// Gets the file path for stdout for the driver.
/// </summary>
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Could you add the same description here, too?

/// Certain runtimes may impose stricter policies on log file
/// output locations (e.g. YARN jobs have restricted write access
/// to certain folders).

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Whoops, sorry, I meant to remove that comment entirely, because it turned out not to be true -- I was just entering the paths incorrectly when I tried to test it out. I'll remove the message in the other places. Thanks!

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

:)

public Optional<string> StdoutFilePath
{
get { return _stdoutFilePath; }
}

/// <summary>
/// Gets the file path for stderr for the driver.
/// </summary>
public Optional<string> StderrFilePath
{
get { return _stderrFilePath; }
}
}
}
27 changes: 26 additions & 1 deletion lang/cs/Org.Apache.REEF.Client/API/JobParametersBuilder.cs
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,8 @@ public sealed class JobParametersBuilder
private string _jobIdentifier;
private int _maxApplicationSubmissions = 1;
private int _driverMemory = 512;
private string _stdoutFilePath = null;
private string _stderrFilePath = null;

private JobParametersBuilder()
{
Expand All @@ -44,7 +46,12 @@ public static JobParametersBuilder NewBuilder()
/// <returns></returns>
public JobParameters Build()
{
return new JobParameters(_jobIdentifier, _maxApplicationSubmissions, _driverMemory);
return new JobParameters(
_jobIdentifier,
_maxApplicationSubmissions,
_driverMemory,
_stdoutFilePath,
_stderrFilePath);
}

/// <summary>
Expand Down Expand Up @@ -76,5 +83,23 @@ public JobParametersBuilder SetDriverMemory(int driverMemoryInMb)
_driverMemory = driverMemoryInMb;
return this;
}

/// <summary>
/// Sets the file path to the stdout file for the driver.
/// </summary>
public JobParametersBuilder SetDriverStdoutFilePath(string stdoutFilePath)
{
_stdoutFilePath = stdoutFilePath;
return this;
}

/// <summary>
/// Sets the file path to the stderr file for the driver.
/// </summary>
public JobParametersBuilder SetDriverStderrFilePath(string stderrFilePath)
{
_stderrFilePath = stderrFilePath;
return this;
}
}
}
18 changes: 18 additions & 0 deletions lang/cs/Org.Apache.REEF.Client/API/JobRequestBuilder.cs
Original file line number Diff line number Diff line change
Expand Up @@ -155,6 +155,24 @@ public JobRequestBuilder SetMaxApplicationSubmissions(int maxAppSubmissions)
return this;
}

/// <summary>
/// Sets the stdout file path for the driver.
/// </summary>
public JobRequestBuilder SetDriverStdoutFilePath(string driverStdoutFilePath)
{
_jobParametersBuilder.SetDriverStdoutFilePath(driverStdoutFilePath);
return this;
}

/// <summary>
/// Sets the stderr file path for the driver.
/// </summary>
public JobRequestBuilder SetDriverStderrFilePath(string driverStderrFilePath)
{
_jobParametersBuilder.SetDriverStderrFilePath(driverStderrFilePath);
return this;
}

/// <summary>
/// Driver config file contents (Org.Apache.REEF.Bridge.exe.config) contents
/// Can be use to redirect assembly versions
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,85 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
//<auto-generated />

using System.Runtime.Serialization;
using Org.Apache.REEF.Utilities.Attributes;

namespace Org.Apache.REEF.Client.Avro.Local
{
/// <summary>
/// Used to serialize and deserialize Avro record
/// org.apache.reef.reef.bridge.client.avro.AvroLocalJobSubmissionParameters.
/// This is a (mostly) auto-generated class.
/// For instructions on how to regenerate, please view the README.md in the same folder.
/// </summary>
[Private]
[DataContract(Namespace = "org.apache.reef.reef.bridge.client.avro")]
public sealed class AvroLocalJobSubmissionParameters
{
private const string JsonSchema = @"{""type"":""record"",""name"":""org.apache.reef.reef.bridge.client.avro.AvroLocalJobSubmissionParameters"",""doc"":""Job submission parameters used by the local runtime"",""fields"":[{""name"":""sharedJobSubmissionParameters"",""type"":{""type"":""record"",""name"":""org.apache.reef.reef.bridge.client.avro.AvroJobSubmissionParameters"",""doc"":""General cross-language job submission parameters shared by all runtimes"",""fields"":[{""name"":""jobId"",""type"":""string""},{""name"":""jobSubmissionFolder"",""type"":""string""}]}},{""name"":""driverStdoutFilePath"",""type"":""string""},{""name"":""driverStderrFilePath"",""type"":""string""}]}";

/// <summary>
/// Gets the schema.
/// </summary>
public static string Schema
{
get
{
return JsonSchema;
}
}

/// <summary>
/// Gets or sets the sharedJobSubmissionParameters field.
/// </summary>
[DataMember]
public AvroJobSubmissionParameters sharedJobSubmissionParameters { get; set; }

/// <summary>
/// Gets or sets the driverStdoutFilePath field.
/// </summary>
[DataMember]
public string driverStdoutFilePath { get; set; }

/// <summary>
/// Gets or sets the driverStderrFilePath field.
/// </summary>
[DataMember]
public string driverStderrFilePath { get; set; }

/// <summary>
/// Initializes a new instance of the <see cref="AvroLocalJobSubmissionParameters"/> class.
/// </summary>
public AvroLocalJobSubmissionParameters()
{
}

/// <summary>
/// Initializes a new instance of the <see cref="AvroLocalJobSubmissionParameters"/> class.
/// </summary>
/// <param name="sharedJobSubmissionParameters">The sharedJobSubmissionParameters.</param>
/// <param name="driverStdoutFilePath">The driverStdoutFilePath.</param>
/// <param name="driverStderrFilePath">The driverStderrFilePath.</param>
public AvroLocalJobSubmissionParameters(AvroJobSubmissionParameters sharedJobSubmissionParameters, string driverStdoutFilePath, string driverStderrFilePath)
{
this.sharedJobSubmissionParameters = sharedJobSubmissionParameters;
this.driverStdoutFilePath = driverStdoutFilePath;
this.driverStderrFilePath = driverStderrFilePath;
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ namespace Org.Apache.REEF.Client.Avro.YARN
[DataContract(Namespace = "org.apache.reef.reef.bridge.client.avro")]
public sealed class AvroYarnClusterJobSubmissionParameters
{
private const string JsonSchema = @"{""type"":""record"",""name"":""org.apache.reef.reef.bridge.client.avro.AvroYarnClusterJobSubmissionParameters"",""doc"":""Cross-language submission parameters to the YARN runtime using Hadoop's submission client"",""fields"":[{""name"":""yarnJobSubmissionParameters"",""type"":{""type"":""record"",""name"":""org.apache.reef.reef.bridge.client.avro.AvroYarnJobSubmissionParameters"",""doc"":""General cross-language submission parameters to the YARN runtime"",""fields"":[{""name"":""sharedJobSubmissionParameters"",""type"":{""type"":""record"",""name"":""org.apache.reef.reef.bridge.client.avro.AvroJobSubmissionParameters"",""doc"":""General cross-language job submission parameters shared by all runtimes"",""fields"":[{""name"":""jobId"",""type"":""string""},{""name"":""jobSubmissionFolder"",""type"":""string""}]}},{""name"":""dfsJobSubmissionFolder"",""type"":""string""},{""name"":""jobSubmissionDirectoryPrefix"",""type"":""string""}]}},{""name"":""securityTokenKind"",""type"":""string""},{""name"":""securityTokenService"",""type"":""string""},{""name"":""driverMemory"",""type"":""int""},{""name"":""maxApplicationSubmissions"",""type"":""int""}]}";
private const string JsonSchema = @"{""type"":""record"",""name"":""org.apache.reef.reef.bridge.client.avro.AvroYarnClusterJobSubmissionParameters"",""doc"":""Cross-language submission parameters to the YARN runtime using Hadoop's submission client"",""fields"":[{""name"":""yarnJobSubmissionParameters"",""type"":{""type"":""record"",""name"":""org.apache.reef.reef.bridge.client.avro.AvroYarnJobSubmissionParameters"",""doc"":""General cross-language submission parameters to the YARN runtime"",""fields"":[{""name"":""sharedJobSubmissionParameters"",""type"":{""type"":""record"",""name"":""org.apache.reef.reef.bridge.client.avro.AvroJobSubmissionParameters"",""doc"":""General cross-language job submission parameters shared by all runtimes"",""fields"":[{""name"":""jobId"",""type"":""string""},{""name"":""jobSubmissionFolder"",""type"":""string""}]}},{""name"":""dfsJobSubmissionFolder"",""type"":""string""},{""name"":""jobSubmissionDirectoryPrefix"",""type"":""string""}]}},{""name"":""securityTokenKind"",""type"":""string""},{""name"":""securityTokenService"",""type"":""string""},{""name"":""driverMemory"",""type"":""int""},{""name"":""maxApplicationSubmissions"",""type"":""int""},{""name"":""driverStdoutFilePath"",""type"":""string""},{""name"":""driverStderrFilePath"",""type"":""string""}]}";

/// <summary>
/// Gets the schema.
Expand Down Expand Up @@ -71,6 +71,18 @@ public static string Schema
[DataMember]
public int maxApplicationSubmissions { get; set; }

/// <summary>
/// Gets or sets the driverStdoutFilePath field.
/// </summary>
[DataMember]
public string driverStdoutFilePath { get; set; }

/// <summary>
/// Gets or sets the driverStderrFilePath field.
/// </summary>
[DataMember]
public string driverStderrFilePath { get; set; }

/// <summary>
/// Initializes a new instance of the <see cref="AvroYarnClusterJobSubmissionParameters"/> class.
/// </summary>
Expand All @@ -88,13 +100,17 @@ public AvroYarnClusterJobSubmissionParameters()
/// <param name="securityTokenService">The securityTokenService.</param>
/// <param name="driverMemory">The driverMemory.</param>
/// <param name="maxApplicationSubmissions">The maxApplicationSubmissions.</param>
public AvroYarnClusterJobSubmissionParameters(AvroYarnJobSubmissionParameters yarnJobSubmissionParameters, string securityTokenKind, string securityTokenService, int driverMemory, int maxApplicationSubmissions)
/// <param name="driverStdoutFilePath">The driverStdoutFilePath.</param>
/// <param name="driverStderrFilePath">The driverStderrFilePath.</param>
public AvroYarnClusterJobSubmissionParameters(AvroYarnJobSubmissionParameters yarnJobSubmissionParameters, string securityTokenKind, string securityTokenService, int driverMemory, int maxApplicationSubmissions, string driverStdoutFilePath, string driverStderrFilePath)
{
this.yarnJobSubmissionParameters = yarnJobSubmissionParameters;
this.securityTokenKind = securityTokenKind;
this.securityTokenService = securityTokenService;
this.driverMemory = driverMemory;
this.maxApplicationSubmissions = maxApplicationSubmissions;
this.driverStdoutFilePath = driverStdoutFilePath;
this.driverStderrFilePath = driverStderrFilePath;
}
}
}
9 changes: 8 additions & 1 deletion lang/cs/Org.Apache.REEF.Client/Local/LocalClient.cs
Original file line number Diff line number Diff line change
Expand Up @@ -99,10 +99,17 @@ private string CreateBootstrapAvroJobConfig(JobParameters jobParameters, string
jobId = jobParameters.JobIdentifier,
};

var bootstrapLocalJobArgs = new AvroLocalJobSubmissionParameters
{
sharedJobSubmissionParameters = bootstrapJobArgs,
driverStdoutFilePath = jobParameters.StdoutFilePath.IsPresent() ? jobParameters.StdoutFilePath.Value : _fileNames.GetDriverStdoutFileName(),
driverStderrFilePath = jobParameters.StderrFilePath.IsPresent() ? jobParameters.StderrFilePath.Value : _fileNames.GetDriverStderrFileName()
};

var submissionArgsFilePath = Path.Combine(driverFolder, _fileNames.GetJobSubmissionParametersFile());
using (var argsFileStream = new FileStream(submissionArgsFilePath, FileMode.CreateNew))
{
var serializedArgs = AvroJsonSerializer<AvroJobSubmissionParameters>.ToBytes(bootstrapJobArgs);
var serializedArgs = AvroJsonSerializer<AvroLocalJobSubmissionParameters>.ToBytes(bootstrapLocalJobArgs);
argsFileStream.Write(serializedArgs, 0, serializedArgs.Length);
}

Expand Down
4 changes: 4 additions & 0 deletions lang/cs/Org.Apache.REEF.Client/Org.Apache.REEF.Client.csproj
Original file line number Diff line number Diff line change
Expand Up @@ -78,6 +78,7 @@ under the License.
<Compile Include="Avro\AvroAppSubmissionParameters.cs" />
<Compile Include="Avro\AvroJobSubmissionParameters.cs" />
<Compile Include="Avro\Local\AvroLocalAppSubmissionParameters.cs" />
<Compile Include="Avro\Local\AvroLocalJobSubmissionParameters.cs" />
<Compile Include="Avro\YARN\AvroYarnAppSubmissionParameters.cs" />
<Compile Include="Avro\YARN\AvroYarnJobSubmissionParameters.cs" />
<Compile Include="Avro\YARN\AvroYarnClusterJobSubmissionParameters.cs" />
Expand Down Expand Up @@ -112,6 +113,8 @@ under the License.
<Compile Include="YARN\IJobSubmissionDirectoryProvider.cs" />
<Compile Include="YARN\Parameters\DriverMaxMemoryAllicationPoolSizeMB.cs" />
<Compile Include="YARN\Parameters\DriverMaxPermSizeMB.cs" />
<Compile Include="YARN\Parameters\DriverStderrFilePath.cs" />
<Compile Include="YARN\Parameters\DriverStdoutFilePath.cs" />
<Compile Include="YARN\RestClient\HttpClient.cs" />
<Compile Include="YARN\RestClient\IDeserializer.cs" />
<Compile Include="YARN\RestClient\IHttpClient.cs" />
Expand Down Expand Up @@ -154,6 +157,7 @@ under the License.
<Compile Include="YARN\RestClient\IUrlProvider.cs" />
<Compile Include="YARN\RestClient\FileSystemJobResourceUploader.cs" />
<Compile Include="YARN\RestClient\MultipleRMUrlProvider.cs" />
<Compile Include="YARN\YarnCommandProviderConfiguration.cs" />
<Compile Include="YARN\YarnJobSubmissionResult.cs" />
<Compile Include="YARN\YARNREEFClient.cs" />
<Compile Include="YARN\RestClient\RestRequestExecutor.cs" />
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,29 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

using Org.Apache.REEF.Tang.Annotations;

namespace Org.Apache.REEF.Client.YARN.Parameters
{
[NamedParameter("Driver stderr file path for YARN.", defaultValue: "<LOG_DIR>/driver.stderr")]
internal sealed class DriverStderrFilePath : Name<string>
{
private DriverStderrFilePath()
{
}
}
}
Loading