Skip to content

Commit

Permalink
Expose internal API to get Spark version on the driver side (#429)
Browse files Browse the repository at this point in the history
  • Loading branch information
imback82 committed Feb 23, 2020
1 parent 0d6498f commit 5c5c249
Show file tree
Hide file tree
Showing 2 changed files with 30 additions and 9 deletions.
14 changes: 14 additions & 0 deletions src/csharp/Microsoft.Spark/Interop/SparkEnvironment.cs
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,8 @@
// The .NET Foundation licenses this file to you under the MIT license.
// See the LICENSE file in the project root for more information.

using System;
using System.Dynamic;
using Microsoft.Spark.Interop.Ipc;
using Microsoft.Spark.Services;

Expand All @@ -12,6 +14,18 @@ namespace Microsoft.Spark.Interop
/// </summary>
internal static class SparkEnvironment
{
private static readonly Lazy<Version> s_sparkVersion = new Lazy<Version>(
() => new Version((string)JvmBridge.CallStaticJavaMethod(
"org.apache.spark.deploy.dotnet.DotnetRunner",
"SPARK_VERSION")));
internal static Version SparkVersion
{
get
{
return s_sparkVersion.Value;
}
}

private static IJvmBridge s_jvmBridge;
internal static IJvmBridge JvmBridge
{
Expand Down
25 changes: 16 additions & 9 deletions src/csharp/Microsoft.Spark/Sql/DataFrame.cs
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
using System.Collections.Generic;
using System.Linq;
using System.Net;
using Microsoft.Spark.Interop;
using Microsoft.Spark.Interop.Ipc;
using Microsoft.Spark.Network;
using Microsoft.Spark.Sql.Streaming;
Expand Down Expand Up @@ -902,20 +903,26 @@ private IEnumerable<Row> GetRows(string funcName)
/// <returns>A tuple of port number and secret string</returns>
private (int, string) GetConnectionInfo(string funcName)
{
var result = _jvmObject.Invoke(funcName);
if (result is int)
object result = _jvmObject.Invoke(funcName);
Version version = SparkEnvironment.SparkVersion;
return (version.Major, version.Minor, version.Build) switch
{
// In spark 2.3.0, PythonFunction.serveIterator() returns a port number.
return ((int)result, string.Empty);
}
else
{
(2, 3, 0) => ((int)result, string.Empty),
// From spark >= 2.3.1, PythonFunction.serveIterator() returns a pair
// where the first is a port number and the second is the secret
// string to use for the authentication.
var pair = (JvmObjectReference[])result;
return ((int)pair[0].Invoke("intValue"), (string)pair[1].Invoke("toString"));
}
(2, 3, _) => ParseConnectionInfo(result),
(2, 4, _) => ParseConnectionInfo(result),
(3, 0, _) => ParseConnectionInfo(result),
_ => throw new NotSupportedException($"Spark {version} not supported.")
};
}

private (int, string) ParseConnectionInfo(object info)
{
var pair = (JvmObjectReference[])info;
return ((int)pair[0].Invoke("intValue"), (string)pair[1].Invoke("toString"));
}

private DataFrame WrapAsDataFrame(object obj) => new DataFrame((JvmObjectReference)obj);
Expand Down

0 comments on commit 5c5c249

Please sign in to comment.