Dev/sutyag/upgrade mobius (#697)
* basic

* Add extractor and outputter

* Add reducer (not done)

* Add procedure

* Kill node, broadcast, upload executable error feed to Cosmos, specify Avro or Parquet syntax

* Add more functions to HDFS.
Add submitter heartbeat
Update doc

* Redesign Cosmos download, add replication setting for HDFS

* Improve executable runner to deal with bad lines

* MERGE MOBIUS

* change dependency path

* Add registration method to mobius

* Major refactoring to add ISparkosmosModule to modularize everything
Start supporting streaming
Fixed a couple of Mobius bugs
Added integration tests
Reenabled unit tests
Added DatedPath

* Make SparkContext settable, fix SetJobGroup

* Expose more interfaces from Mobius

* Mobius change for Spark 2.3

* Fix version conflict, remove unused files

* Added support for multiple UDFs

* Fixed non-SQL UDF issue

* 1. Upgraded Mobius to Spark 2.3.1 2. Fixed UDF bugs 3. Added support for multiple UDFs

* 1. Added sample test cases 2. Updated references for examples

* Removed stashed files

* Fixed review comments

* Fixed review comments

* Fixed failed unit test cases

* Deleting all the things

* Updated version in appveyor

* Updated tartool download path

* Fixed Java process termination issue

* Revert access modifier to internal from public for JvmBridge
sutyag authored and skaarthik committed Nov 21, 2018
1 parent 09462ff commit 9aa97b9
Showing 92 changed files with 2,178 additions and 1,439 deletions.
2 changes: 1 addition & 1 deletion README.md
@@ -157,4 +157,4 @@ Mobius is licensed under the MIT license. See [LICENSE](LICENSE) file for full l
* tweet [@MobiusForSpark](http://twitter.com/MobiusForSpark)

## Code of Conduct
-This project has adopted the [Microsoft Open Source Code of Conduct](https://opensource.microsoft.com/codeofconduct/). For more information see the [Code of Conduct FAQ](https://opensource.microsoft.com/codeofconduct/faq/) or contact [opencode@microsoft.com](mailto:opencode@microsoft.com) with any additional questions or comments.
+This project has adopted the [Microsoft Open Source Code of Conduct](https://opensource.microsoft.com/codeofconduct/). For more information see the [Code of Conduct FAQ](https://opensource.microsoft.com/codeofconduct/faq/) or contact [opencode@microsoft.com](mailto:opencode@microsoft.com) with any additional questions or comments.
2 changes: 1 addition & 1 deletion appveyor.yml
@@ -1,4 +1,4 @@
-version: 2.0.2-SNAPSHOT.{build}
+version: 2.3.1-SNAPSHOT.{build}

environment:
securefile:
2 changes: 2 additions & 0 deletions build/Build.cmd
@@ -6,6 +6,8 @@ rem Copyright (c) Microsoft. All rights reserved.
rem Licensed under the MIT license. See LICENSE file in the project root for full license information.
rem

+SET MAVEN_OPTS=-Dhttps.protocols=TLSv1,TLSv1.1,TLSv1.2

if "%1" == "csharp" set buildCSharp=true

SET CMDHOME=%~dp0
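Note: the MAVEN_OPTS setting above pins which TLS versions the JVM's HTTPS client offers during the Maven build. This is presumably needed because older JDKs default to TLS 1.0, which Maven Central and the Apache mirrors had begun rejecting, so the Scala build's dependency downloads would otherwise fail.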
4 changes: 2 additions & 2 deletions build/localmode/RunSamples.cmd
@@ -47,7 +47,7 @@ if "%precheck%" == "bad" (goto :EOF)
@rem
@rem setup Hadoop and Spark versions
@rem
-set SPARK_VERSION=2.0.2
+set SPARK_VERSION=2.3.1
set HADOOP_VERSION=2.6
set APACHE_DIST_SERVER=archive.apache.org
@echo [RunSamples.cmd] SPARK_VERSION=%SPARK_VERSION%, HADOOP_VERSION=%HADOOP_VERSION%, APACHE_DIST_SERVER=%APACHE_DIST_SERVER%
@@ -100,7 +100,7 @@ if "!USER_EXE!"=="" (
call sparkclr-submit.cmd --conf spark.sql.warehouse.dir=%TEMP_DIR% %*
)

-@if ERRORLEVEL 1 GOTO :ErrorStop
+@if ERRORLEVEL 2 GOTO :ErrorStop

@GOTO :EOF

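Note on the ERRORLEVEL change above: in cmd, "IF ERRORLEVEL n" tests whether the exit code is n or greater, so raising the threshold from 1 to 2 means an exit code of 1 from sparkclr-submit no longer aborts the samples run; only exit codes of 2 and above do.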
2 changes: 1 addition & 1 deletion build/localmode/downloadtools.ps1
@@ -20,7 +20,7 @@ if ($stage.ToLower() -eq "run")
$hadoopVersion = if ($envValue -eq $null) { "2.6" } else { $envValue }

$envValue = [Environment]::GetEnvironmentVariable("SPARK_VERSION")
-$sparkVersion = if ($envValue -eq $null) { "2.0.2" } else { $envValue }
+$sparkVersion = if ($envValue -eq $null) { "2.3.1" } else { $envValue }

Write-Output "[downloadtools] hadoopVersion=$hadoopVersion, sparkVersion=$sparkVersion, apacheDistServer=$apacheDistServer"
}
2 changes: 1 addition & 1 deletion build/localmode/run-samples.sh
@@ -16,7 +16,7 @@ do
done

# setup Hadoop and Spark versions
-export SPARK_VERSION=2.0.2
+export SPARK_VERSION=2.3.1
export HADOOP_VERSION=2.6
export APACHE_DIST_SERVER=archive.apache.org
echo "[run-samples.sh] SPARK_VERSION=$SPARK_VERSION, HADOOP_VERSION=$HADOOP_VERSION, APACHE_DIST_SERVER=$APACHE_DIST_SERVER"
6 changes: 3 additions & 3 deletions cpp/Riosock/Riosock.vcxproj
@@ -1,5 +1,5 @@
<?xml version="1.0" encoding="utf-8"?>
-<Project DefaultTargets="Build" ToolsVersion="12.0" xmlns="http://schemas.microsoft.com/developer/msbuild/2003">
+<Project DefaultTargets="Build" ToolsVersion="14.0" xmlns="http://schemas.microsoft.com/developer/msbuild/2003">
<ItemGroup Label="ProjectConfigurations">
<ProjectConfiguration Include="Debug|x64">
<Configuration>Debug</Configuration>
@@ -20,13 +20,13 @@
<PropertyGroup Condition="'$(Configuration)|$(Platform)'=='Debug|x64'" Label="Configuration">
<ConfigurationType>DynamicLibrary</ConfigurationType>
<UseDebugLibraries>true</UseDebugLibraries>
-<PlatformToolset>v120</PlatformToolset>
+<PlatformToolset>v140</PlatformToolset>
<CharacterSet>Unicode</CharacterSet>
</PropertyGroup>
<PropertyGroup Condition="'$(Configuration)|$(Platform)'=='Release|x64'" Label="Configuration">
<ConfigurationType>DynamicLibrary</ConfigurationType>
<UseDebugLibraries>false</UseDebugLibraries>
-<PlatformToolset>v120</PlatformToolset>
+<PlatformToolset>v140</PlatformToolset>
<WholeProgramOptimization>true</WholeProgramOptimization>
<CharacterSet>Unicode</CharacterSet>
</PropertyGroup>
11 changes: 7 additions & 4 deletions csharp/Adapter/Microsoft.Spark.CSharp/Adapter.csproj
@@ -35,16 +35,17 @@
<ErrorReport>prompt</ErrorReport>
<WarningLevel>4</WarningLevel>
<DocumentationFile>..\documentation\Microsoft.Spark.CSharp.Adapter.Doc.XML</DocumentationFile>
+<AllowUnsafeBlocks>true</AllowUnsafeBlocks>
</PropertyGroup>
<PropertyGroup>
<StartupObject />
</PropertyGroup>
<ItemGroup>
<Reference Include="log4net">
<HintPath>..\..\packages\log4net.2.0.5\lib\net45-full\log4net.dll</HintPath>
<Reference Include="log4net, Version=2.0.8.0, Culture=neutral, PublicKeyToken=669e0ddf0bb1aa2a, processorArchitecture=MSIL">
<HintPath>..\..\packages\log4net.2.0.8\lib\net45-full\log4net.dll</HintPath>
</Reference>
<Reference Include="Newtonsoft.Json">
<HintPath>..\..\packages\Newtonsoft.Json.7.0.1\lib\net45\Newtonsoft.Json.dll</HintPath>
<Reference Include="Newtonsoft.Json, Version=11.0.0.0, Culture=neutral, PublicKeyToken=30ad4fe6b2a6aeed, processorArchitecture=MSIL">
<HintPath>..\..\packages\Newtonsoft.Json.11.0.2\lib\net45\Newtonsoft.Json.dll</HintPath>
</Reference>
<Reference Include="Razorvine.Pyrolite">
<HintPath>..\..\packages\Razorvine.Pyrolite.4.10.0.0\lib\net40\Razorvine.Pyrolite.dll</HintPath>
@@ -98,6 +99,7 @@
<Compile Include="Network\RioNative.cs" />
<Compile Include="Network\RioSocketWrapper.cs" />
<Compile Include="Network\SaeaSocketWrapper.cs" />
<Compile Include="Network\SocketInfo.cs" />
<Compile Include="Network\SocketStream.cs" />
<Compile Include="Network\SockDataToken.cs" />
<Compile Include="Network\SocketFactory.cs" />
@@ -184,6 +186,7 @@
<ItemGroup>
<None Include="packages.config" />
</ItemGroup>
+<ItemGroup />
<Import Project="$(MSBuildToolsPath)\Microsoft.CSharp.targets" />
<PropertyGroup>
<PostBuildEvent>
3 changes: 2 additions & 1 deletion csharp/Adapter/Microsoft.Spark.CSharp/Core/IRDDCollector.cs
@@ -3,6 +3,7 @@

using System;
using System.Collections.Generic;
+using Microsoft.Spark.CSharp.Network;

namespace Microsoft.Spark.CSharp.Core
{
@@ -11,6 +12,6 @@ namespace Microsoft.Spark.CSharp.Core
/// </summary>
interface IRDDCollector
{
-IEnumerable<dynamic> Collect(int port, SerializedMode serializedMode, Type type);
+IEnumerable<dynamic> Collect(SocketInfo info, SerializedMode serializedMode, Type type);
}
}
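The new Collect signature takes a SocketInfo, and the project file above adds Network\SocketInfo.cs, but that file's contents are not part of this excerpt. As a rough sketch only, inferred from the two members the new collect path reads (info.Port and info.Secret in RDDCollector below), the type plausibly looks like this; the real implementation may differ:

using System;

namespace Microsoft.Spark.CSharp.Network
{
    /// <summary>
    /// Hypothetical sketch of SocketInfo (the real file is not shown in this
    /// diff). At minimum it must expose the port to connect back on and the
    /// optional authentication secret introduced in Spark 2.3.x.
    /// </summary>
    public class SocketInfo
    {
        public int Port { get; private set; }
        public string Secret { get; private set; }

        public SocketInfo(int port, string secret)
        {
            Port = port;
            Secret = secret;
        }
    }
}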
24 changes: 13 additions & 11 deletions csharp/Adapter/Microsoft.Spark.CSharp/Core/RDD.cs
@@ -4,6 +4,7 @@
using System;
using System.Collections.Generic;
using System.Linq;
+using Microsoft.Spark.CSharp.Network;
using Microsoft.Spark.CSharp.Proxy;
using Microsoft.Spark.CSharp.Services;

@@ -60,6 +61,7 @@ public SparkContext
{
return sparkContext;
}
+set { sparkContext = value; }
}

/// <summary>
@@ -592,13 +594,13 @@ public void ForeachPartition(Action<IEnumerable<T>> f)
/// <returns></returns>
public T[] Collect()
{
-int port = RddProxy.CollectAndServe();
-return Collect(port).Cast<T>().ToArray();
+var info = RddProxy.CollectAndServe();
+return Collect(info).Cast<T>().ToArray();
}

-internal IEnumerable<dynamic> Collect(int port)
+internal IEnumerable<dynamic> Collect(SocketInfo info)
{
-return RddProxy.RDDCollector.Collect(port, serializedMode, typeof(T));
+return RddProxy.RDDCollector.Collect(info, serializedMode, typeof(T));
}

/// <summary>
@@ -830,9 +832,9 @@ public T[] Take(int num)


var mappedRDD = MapPartitionsWithIndex<T>(new TakeHelper<T>(left).Execute);
-int port = sparkContext.SparkContextProxy.RunJob(mappedRDD.RddProxy, partitions);
+var info = sparkContext.SparkContextProxy.RunJob(mappedRDD.RddProxy, partitions);

-IEnumerable<T> res = Collect(port).Cast<T>();
+IEnumerable<T> res = Collect(info).Cast<T>();

items.AddRange(res);
partsScanned += numPartsToTry;
@@ -925,7 +927,7 @@ public RDD<T> Subtract(RDD<T> other, int numPartitions = 0)
/// <returns></returns>
public RDD<T> Repartition(int numPartitions)
{
-return new RDD<T>(RddProxy.Repartition(numPartitions), sparkContext);
+return new RDD<T>(RddProxy.Repartition(numPartitions), sparkContext, serializedMode);
}

/// <summary>
@@ -942,8 +944,8 @@ public RDD<T> Repartition(int numPartitions)
/// <returns></returns>
public RDD<T> Coalesce(int numPartitions, bool shuffle = false)
{
-return new RDD<T>(RddProxy.Coalesce(numPartitions, shuffle), sparkContext);
-}
+return new RDD<T>(RddProxy.Coalesce(numPartitions, shuffle), sparkContext, serializedMode);
+}

/// <summary>
/// Zips this RDD with another one, returning key-value pairs with the
@@ -1065,8 +1067,8 @@ public IEnumerable<T> ToLocalIterator()
foreach (int partition in Enumerable.Range(0, GetNumPartitions()))
{
var mappedRDD = MapPartitionsWithIndex<T>((pid, iter) => iter);
-int port = sparkContext.SparkContextProxy.RunJob(mappedRDD.RddProxy, Enumerable.Range(partition, 1));
-foreach (T row in Collect(port))
+var info = sparkContext.SparkContextProxy.RunJob(mappedRDD.RddProxy, Enumerable.Range(partition, 1));
+foreach (T row in Collect(info))
yield return row;
}
}
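Note that Repartition and Coalesce now pass serializedMode through to the child RDD<T>; previously the child silently reverted to the default mode, which broke deserialization of string- or pair-serialized RDDs after a repartition. A toy sketch of the bug pattern and the fix (illustrative names only, not the actual Mobius types):

// Illustrative only: why a wrapper must propagate its serialization mode
// when it constructs a derived wrapper around a new proxy.
public enum ToySerializedMode { Byte, String, Pair }

public class ToyRdd
{
    public ToySerializedMode Mode { get; }

    public ToyRdd(ToySerializedMode mode = ToySerializedMode.Byte)
    {
        Mode = mode;
    }

    public ToyRdd Repartition()
    {
        // Before the fix: "return new ToyRdd();" reset the child to Byte,
        // so a String-serialized parent produced garbage on collect.
        return new ToyRdd(Mode); // fixed: carry the parent's mode across
    }
}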
22 changes: 20 additions & 2 deletions csharp/Adapter/Microsoft.Spark.CSharp/Core/RDDCollector.cs
@@ -11,6 +11,7 @@
using System.Text;
using Microsoft.Spark.CSharp.Interop.Ipc;
using Microsoft.Spark.CSharp.Network;
+using Microsoft.Spark.CSharp.Services;
using Microsoft.Spark.CSharp.Sql;

namespace Microsoft.Spark.CSharp.Core
@@ -20,14 +21,31 @@ namespace Microsoft.Spark.CSharp.Core
/// </summary>
class RDDCollector : IRDDCollector
{
-public IEnumerable<dynamic> Collect(int port, SerializedMode serializedMode, Type type)
+private static ILoggerService logger;
+private static ILoggerService Logger
+{
+get
+{
+if (logger != null) return logger;
+logger = LoggerServiceFactory.GetLogger(typeof(RDDCollector));
+return logger;
+}
+}

+public IEnumerable<dynamic> Collect(SocketInfo info, SerializedMode serializedMode, Type type)
{
IFormatter formatter = new BinaryFormatter();
var sock = SocketFactory.CreateSocket();
-sock.Connect(IPAddress.Loopback, port);
+sock.Connect(IPAddress.Loopback, info.Port, null);

using (var s = sock.GetStream())
{
+if (info.Secret != null)
+{
+SerDe.Write(s, info.Secret);
+var reply = SerDe.ReadString(s);
+Logger.LogDebug("Connect back to JVM: " + reply);
+}
byte[] buffer;
while ((buffer = SerDe.ReadBytes(s)) != null && buffer.Length > 0)
{
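For context: from Spark 2.3.x the JVM returns an authentication secret alongside the server port, and the client must send that secret and read the server's acknowledgement before any rows are streamed, which is what the info.Secret branch above does through SerDe. A self-contained sketch of the same handshake over raw .NET sockets; the framing (4-byte big-endian length prefix, UTF-8 payload) is an assumption mirroring what SerDe appears to use, not something this diff confirms:

using System;
using System.IO;
using System.Net;
using System.Net.Sockets;
using System.Text;

static class AuthenticatedConnect
{
    // Connects to a loopback port and, when a secret is supplied, performs
    // the write-secret / read-ack exchange before returning the stream.
    public static NetworkStream Open(int port, string secret)
    {
        var client = new TcpClient();
        client.Connect(IPAddress.Loopback, port);
        var stream = client.GetStream();
        if (secret != null)
        {
            WriteString(stream, secret);     // authenticate first
            var reply = ReadString(stream);  // server acks before streaming rows
            Console.WriteLine("Connect back to JVM: " + reply);
        }
        return stream;
    }

    static void WriteString(Stream s, string value)
    {
        var payload = Encoding.UTF8.GetBytes(value);
        var prefix = BitConverter.GetBytes(payload.Length);
        if (BitConverter.IsLittleEndian) Array.Reverse(prefix); // big-endian on the wire
        s.Write(prefix, 0, prefix.Length);
        s.Write(payload, 0, payload.Length);
    }

    static string ReadString(Stream s)
    {
        var prefix = ReadExactly(s, 4);
        if (BitConverter.IsLittleEndian) Array.Reverse(prefix);
        return Encoding.UTF8.GetString(ReadExactly(s, BitConverter.ToInt32(prefix, 0)));
    }

    static byte[] ReadExactly(Stream s, int count)
    {
        var buffer = new byte[count];
        for (var read = 0; read < count; )
        {
            var n = s.Read(buffer, read, count - read);
            if (n <= 0) throw new EndOfStreamException("socket closed during read");
            read += n;
        }
        return buffer;
    }
}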
@@ -36,7 +36,7 @@ private ISocketWrapper GetConnection()
if (!sockets.TryDequeue(out socket))
{
socket = SocketFactory.CreateSocket();
-socket.Connect(IPAddress.Loopback, portNumber);
+socket.Connect(IPAddress.Loopback, portNumber, null);
}
return socket;
}
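Note: ISocketWrapper.Connect evidently gained a third parameter for the authentication secret in this change; call sites like this one, which talk to a local endpoint that does not require the handshake, pass null to skip it.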
@@ -12,12 +12,12 @@ namespace Microsoft.Spark.CSharp.Interop.Ipc
/// Reference to object created in JVM
/// </summary>
[Serializable]
-internal class JvmObjectReference
+public class JvmObjectReference
{
public string Id { get; private set; }
private DateTime creationTime;

-public JvmObjectReference(string jvmReferenceId)
+internal JvmObjectReference(string jvmReferenceId)
{
Id = jvmReferenceId;
creationTime = DateTime.UtcNow;
@@ -48,6 +48,11 @@ public override int GetHashCode()
return base.GetHashCode();
}

+public string ObjectToString()
+{
+return SparkCLRIpcProxy.JvmBridge.CallNonStaticJavaMethod(this, "toString").ToString();
+}

public string GetDebugInfo()
{
var javaObjectReferenceForClassObject = new JvmObjectReference(SparkCLRIpcProxy.JvmBridge.CallNonStaticJavaMethod(this, "getClass").ToString());
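With JvmObjectReference now public (its constructor stays internal), code that holds a reference obtained from a proxy object can surface the JVM-side toString() for diagnostics. A hypothetical usage sketch:

using System;
using Microsoft.Spark.CSharp.Interop.Ipc;

static class JvmDiagnostics
{
    // Hypothetical helper: jvmRef must come from an existing proxy object,
    // since JvmObjectReference cannot be constructed outside the adapter.
    public static void Dump(JvmObjectReference jvmRef)
    {
        // ObjectToString() round-trips to the JVM and invokes toString() there.
        Console.WriteLine("JVM object: " + jvmRef.ObjectToString());
    }
}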
@@ -31,7 +31,9 @@ internal static ISparkCLRProxy SparkCLRProxy
}
}

-internal static IConfigurationService configurationService;
+internal static IJvmBridge JvmBridge => SparkCLRIpcProxy.JvmBridge;

+internal static IConfigurationService configurationService;

internal static IConfigurationService ConfigurationService
{
2 changes: 1 addition & 1 deletion csharp/Adapter/Microsoft.Spark.CSharp/Network/ByteBuf.cs
@@ -11,7 +11,7 @@ namespace Microsoft.Spark.CSharp.Network
/// ByteBuf delimits a section of a ByteBufChunk.
/// It is the smallest unit to be allocated.
/// </summary>
-internal class ByteBuf
+public class ByteBuf
{
private int readerIndex;
private int writerIndex;
