Merged
93 commits
f25436b
initial commit
suhsteve Dec 6, 2021
4e28a4b
update test
suhsteve Dec 6, 2021
4b53a64
ubuntu 1604 agent pool no longer supported.
suhsteve Dec 7, 2021
643c7ae
Trigger Build
suhsteve Dec 7, 2021
3bde5be
revert pool
suhsteve Dec 7, 2021
eded9a0
update pool
suhsteve Dec 8, 2021
f5f4310
update pool
suhsteve Dec 8, 2021
d3c8bfb
update pool
suhsteve Dec 8, 2021
672862d
update pool
suhsteve Dec 8, 2021
c3df844
update pool
suhsteve Dec 8, 2021
7bbc30a
dotnet task
suhsteve Dec 8, 2021
a093d9e
update pwsh winutils script
suhsteve Dec 8, 2021
aab077f
Use Hosted VS2017 pool for some versions
Niharikadutta Jan 7, 2022
5cfa12c
Changing all version pools to Hosted VS2017
Niharikadutta Jan 7, 2022
51f4d19
change if condition to check for linux
Niharikadutta Jan 10, 2022
16f6246
else option with hosted vs2017
Niharikadutta Jan 10, 2022
47a59cf
changing linux to Hosted Ubuntu 1804
Niharikadutta Jan 10, 2022
d8fafca
using ubuntu-latest and windows-latest images
Niharikadutta Jan 10, 2022
379c30a
reverting change
Niharikadutta Jan 10, 2022
071897a
change
Niharikadutta Jan 10, 2022
dbecb49
using vmImage
Niharikadutta Jan 11, 2022
68299ee
remove quotes around vmImage
Niharikadutta Jan 11, 2022
9643afb
using Hosted pools
Niharikadutta Jan 11, 2022
ab2283d
remove linux
Niharikadutta Jan 11, 2022
a12acf7
Using pool VS2019
Niharikadutta Jan 11, 2022
f1b478c
Add Delta 1.1.0 for Spark 3.1.2
Niharikadutta Jan 11, 2022
610e095
update pool
Niharikadutta Jan 11, 2022
0b034df
remove pool name
Niharikadutta Jan 11, 2022
e3f2046
Use Spark v3.1.2
Niharikadutta Jan 11, 2022
bf9d972
compat changes
Niharikadutta Jan 11, 2022
0208b98
Add 2.4.8 and 2.4.0 versions
Niharikadutta Jan 11, 2022
1128fa7
adding all versions
Niharikadutta Jan 11, 2022
814638e
use ubuntu-latest
Niharikadutta Jan 11, 2022
c2aa30a
Fix delta tests
Niharikadutta Jan 11, 2022
57f2cf5
remove dotnet installation step
Niharikadutta Jan 11, 2022
38f7f3a
using windows image Build.Server.Amd64.VS2019
Niharikadutta Jan 11, 2022
8d7406b
using image Build.Windows.Amd64.VS2022.Pre.Open
Niharikadutta Jan 11, 2022
a710571
using Build.Windows.10.Amd64.VS2019.Open
Niharikadutta Jan 11, 2022
0d6adce
use windows-latest
Niharikadutta Jan 12, 2022
61b1d21
Add simple test suite
Niharikadutta Jan 14, 2022
e62cffc
revert TestProject change
Niharikadutta Jan 14, 2022
2eb7320
adding null check logging
Niharikadutta Jan 14, 2022
435c6ad
using Build.Windows.10.Amd64.VS2019.Open
Niharikadutta Jan 14, 2022
8507d3c
use windows latest and ubuntu latest
Niharikadutta Jan 14, 2022
a44a2bf
fix typo
Niharikadutta Jan 14, 2022
cb02cbc
remove 1ES hosted pool
Niharikadutta Jan 15, 2022
b06f3a9
add simple tests
Niharikadutta Jan 18, 2022
a79971b
Removing .NET SDK reference in project
Niharikadutta Jan 18, 2022
4e19938
running Microsoft.Spark.E2ETest on windows-latest for Spark 2.4.0
Niharikadutta Jan 18, 2022
ec35ddc
Running Microsoft.Spark*.E2ETest on windows-latest for Spark 2.4.0
Niharikadutta Jan 19, 2022
455e7bc
Adding separate task for other test suites
Niharikadutta Jan 19, 2022
1165e77
Separate out all tests
Niharikadutta Jan 19, 2022
bf13d51
remove delta tests
Niharikadutta Jan 19, 2022
20551b0
Running only Delta tests
Niharikadutta Jan 19, 2022
849907a
logging delta fixture hanging
Niharikadutta Jan 19, 2022
f1a79f7
logging statements in spark fixture
Niharikadutta Jan 19, 2022
fa9413d
add more logging statements
Niharikadutta Jan 19, 2022
38c700b
logs
Niharikadutta Jan 19, 2022
f0e0433
Backend running debug mode check
Niharikadutta Jan 19, 2022
df5916d
string formatting
Niharikadutta Jan 19, 2022
2988025
Run all tests for all Spark versions
Niharikadutta Jan 20, 2022
b46b3b0
Testing 1ESPool
Niharikadutta Jan 20, 2022
72d7933
Running Microsoft.Spark.E2ETest only
Niharikadutta Jan 20, 2022
fef9f93
Running all separate tests for all spark versions and configure diagn…
Niharikadutta Jan 20, 2022
cd28e38
windows latest and Build.Ubuntu.1804.Amd64.Open
Niharikadutta Jan 20, 2022
fe40c27
windows latest and ubuntu latest
Niharikadutta Jan 21, 2022
6db8e43
Bintray fix for Spark packages
Niharikadutta Jan 21, 2022
a3d0c35
merging latest from master
Niharikadutta Jan 21, 2022
da91b44
Using Delta v1.0.0 for Spark 3.1.*
Niharikadutta Jan 21, 2022
03fb7a8
Remove logging statements
Niharikadutta Jan 21, 2022
26a0aa3
Adding Spark 3.2 tests and removing test projects
Niharikadutta Jan 21, 2022
350cdc0
Removing test project
Niharikadutta Jan 21, 2022
4b6d1b3
whiteline formatting
Niharikadutta Jan 21, 2022
078cf9b
fix
Niharikadutta Jan 21, 2022
b8af264
Fixing 1 failing test in 3.2.0 with setting config
Niharikadutta Jan 21, 2022
d3bf8a5
Writing process id to Scala as per worker contract change
Niharikadutta Jan 25, 2022
0ae38de
Change
Niharikadutta Jan 25, 2022
62d1c22
Fixing worker unit tests
Niharikadutta Jan 26, 2022
776faad
Add Spark version checking in TaskRunner
Niharikadutta Jan 26, 2022
1030b90
Fixing worker unit tests
Niharikadutta Jan 26, 2022
e72eafb
Test fix
Niharikadutta Jan 26, 2022
1d70719
Enable forward and backward compat tests
Niharikadutta Jan 26, 2022
faf56d7
installing mvn
Niharikadutta Jan 26, 2022
09b849f
using curl instead of wget
Niharikadutta Jan 26, 2022
badc4a5
maven options
Niharikadutta Jan 26, 2022
bdafea6
Disabling Spark 3.2 forward backward tests
Niharikadutta Jan 26, 2022
c7f31c3
Add tests for Spark 3.2.1
Niharikadutta Jan 26, 2022
6974760
Fixing compat test inconsistencies
Niharikadutta Jan 26, 2022
dd101de
Add support for Spark 3.2.1
Niharikadutta Jan 27, 2022
2b8d04c
Check Linux tests
Niharikadutta Jan 27, 2022
f9c9bb1
Remove Spark 3.2.x contract changes and tests
Niharikadutta Jan 27, 2022
ce3cc38
revert changes
Niharikadutta Jan 27, 2022
bbf8d5c
remove newline at end
Niharikadutta Jan 27, 2022
77 changes: 77 additions & 0 deletions src/scala/microsoft-spark-3-2/pom.xml
@@ -0,0 +1,77 @@
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
<modelVersion>4.0.0</modelVersion>
<parent>
<groupId>com.microsoft.scala</groupId>
<artifactId>microsoft-spark</artifactId>
<version>${microsoft-spark.version}</version>
</parent>
<artifactId>microsoft-spark-3-2_2.12</artifactId>
<inceptionYear>2019</inceptionYear>
<properties>
<encoding>UTF-8</encoding>
<scala.version>2.12.10</scala.version>
<scala.binary.version>2.12</scala.binary.version>
<spark.version>3.2.0</spark.version>
</properties>

<dependencies>
<dependency>
<groupId>org.scala-lang</groupId>
<artifactId>scala-library</artifactId>
<version>${scala.version}</version>
</dependency>
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-core_${scala.binary.version}</artifactId>
<version>${spark.version}</version>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-sql_${scala.binary.version}</artifactId>
<version>${spark.version}</version>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>junit</groupId>
<artifactId>junit</artifactId>
<version>4.13.1</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.specs</groupId>
<artifactId>specs</artifactId>
<version>1.2.5</version>
<scope>test</scope>
</dependency>
</dependencies>

<build>
<sourceDirectory>src/main/scala</sourceDirectory>
<testSourceDirectory>src/test/scala</testSourceDirectory>
<plugins>
<plugin>
<groupId>org.scala-tools</groupId>
<artifactId>maven-scala-plugin</artifactId>
<version>2.15.2</version>
<executions>
<execution>
<goals>
<goal>compile</goal>
<goal>testCompile</goal>
</goals>
</execution>
</executions>
<configuration>
<scalaVersion>${scala.version}</scalaVersion>
<args>
<arg>-target:jvm-1.8</arg>
<arg>-deprecation</arg>
<arg>-feature</arg>
</args>
</configuration>
</plugin>
</plugins>
</build>
</project>
72 changes: 72 additions & 0 deletions src/scala/microsoft-spark-3-2/src/main/scala/org/apache/spark/api/dotnet/CallbackClient.scala
@@ -0,0 +1,72 @@
/*
* Licensed to the .NET Foundation under one or more agreements.
* The .NET Foundation licenses this file to you under the MIT license.
* See the LICENSE file in the project root for more information.
*/

package org.apache.spark.api.dotnet

import java.io.DataOutputStream

import org.apache.spark.internal.Logging

import scala.collection.mutable.Queue

/**
 * CallbackClient is used to communicate with the Dotnet CallbackServer.
 * The client manages and maintains a pool of open CallbackConnections.
 * Each callback request is delegated to a new CallbackConnection or to
 * an unused connection from the pool.
 * @param serDe The SerDe used to read and write callback messages
 * @param address The address of the Dotnet CallbackServer
 * @param port The port of the Dotnet CallbackServer
 */
class CallbackClient(serDe: SerDe, address: String, port: Int) extends Logging {
private[this] val connectionPool: Queue[CallbackConnection] = Queue[CallbackConnection]()

private[this] var isShutdown: Boolean = false

final def send(callbackId: Int, writeBody: (DataOutputStream, SerDe) => Unit): Unit =
getOrCreateConnection() match {
case Some(connection) =>
try {
connection.send(callbackId, writeBody)
addConnection(connection)
} catch {
case e: Exception =>
logError(s"Error calling callback [callback id = $callbackId].", e)
connection.close()
throw e
}
case None => throw new Exception("Unable to get or create connection.")
}

private def getOrCreateConnection(): Option[CallbackConnection] = synchronized {
if (isShutdown) {
logInfo("Cannot get or create connection while client is shutdown.")
return None
}

if (connectionPool.nonEmpty) {
return Some(connectionPool.dequeue())
}

Some(new CallbackConnection(serDe, address, port))
}

private def addConnection(connection: CallbackConnection): Unit = synchronized {
assert(connection != null)
connectionPool.enqueue(connection)
}

def shutdown(): Unit = synchronized {
if (isShutdown) {
logInfo("Shutdown called, but already shutdown.")
return
}

logInfo("Shutting down.")
connectionPool.foreach(_.close())
connectionPool.clear()
isShutdown = true
}
}
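A minimal usage sketch of the client above (not part of this diff; the tracker instance, port 12345, and callback id 1 are illustrative placeholders, with SerDe constructed the same way DotnetBackend does below):

val tracker = new JVMObjectTracker
val client = new CallbackClient(new SerDe(tracker), "localhost", 12345)
// writeBody serializes whatever payload the .NET handler registered for
// callback id 1 expects; here a single int, mirroring serDe.writeInt above.
client.send(1, (out, serDe) => serDe.writeInt(out, 42))
client.shutdown() // closes every pooled connection and rejects further sends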
112 changes: 112 additions & 0 deletions src/scala/microsoft-spark-3-2/src/main/scala/org/apache/spark/api/dotnet/CallbackConnection.scala
@@ -0,0 +1,112 @@
/*
* Licensed to the .NET Foundation under one or more agreements.
* The .NET Foundation licenses this file to you under the MIT license.
* See the LICENSE file in the project root for more information.
*/

package org.apache.spark.api.dotnet

import java.io.{ByteArrayOutputStream, Closeable, DataInputStream, DataOutputStream}
import java.net.Socket

import org.apache.spark.internal.Logging

/**
 * CallbackConnection is used to process the callback communication
 * between the JVM and Dotnet. It uses a TCP socket to communicate with
 * the Dotnet CallbackServer, and the socket is expected to be reused.
 * @param serDe The SerDe used to read and write callback messages
 * @param address The address of the Dotnet CallbackServer
 * @param port The port of the Dotnet CallbackServer
 */
class CallbackConnection(serDe: SerDe, address: String, port: Int) extends Logging {
private[this] val socket: Socket = new Socket(address, port)
private[this] val inputStream: DataInputStream = new DataInputStream(socket.getInputStream)
private[this] val outputStream: DataOutputStream = new DataOutputStream(socket.getOutputStream)

def send(
callbackId: Int,
writeBody: (DataOutputStream, SerDe) => Unit): Unit = {
logInfo(s"Calling callback [callback id = $callbackId] ...")

try {
serDe.writeInt(outputStream, CallbackFlags.CALLBACK)
serDe.writeInt(outputStream, callbackId)

val byteArrayOutputStream = new ByteArrayOutputStream()
writeBody(new DataOutputStream(byteArrayOutputStream), serDe)
serDe.writeInt(outputStream, byteArrayOutputStream.size)
byteArrayOutputStream.writeTo(outputStream)
} catch {
case e: Exception => {
throw new Exception("Error writing to stream.", e)
}
}

logInfo(s"Signaling END_OF_STREAM.")
try {
serDe.writeInt(outputStream, CallbackFlags.END_OF_STREAM)
outputStream.flush()

val endOfStreamResponse = readFlag(inputStream)
endOfStreamResponse match {
case CallbackFlags.END_OF_STREAM =>
logInfo(s"Received END_OF_STREAM signal. Calling callback [callback id = $callbackId] successful.")
case _ => {
throw new Exception(s"Error verifying end of stream. Expected: ${CallbackFlags.END_OF_STREAM}, " +
s"Received: $endOfStreamResponse")
}
}
} catch {
case e: Exception => {
throw new Exception("Error while verifying end of stream.", e)
}
}
}

def close(): Unit = {
try {
serDe.writeInt(outputStream, CallbackFlags.CLOSE)
outputStream.flush()
} catch {
case e: Exception => logInfo("Unable to send close to .NET callback server.", e)
}

close(socket)
close(outputStream)
close(inputStream)
}

private def close(s: Socket): Unit = {
try {
assert(s != null)
s.close()
} catch {
case e: Exception => logInfo("Unable to close socket.", e)
}
}

private def close(c: Closeable): Unit = {
try {
assert(c != null)
c.close()
} catch {
case e: Exception => logInfo("Unable to close closeable.", e)
}
}

private def readFlag(inputStream: DataInputStream): Int = {
val callbackFlag = serDe.readInt(inputStream)
if (callbackFlag == CallbackFlags.DOTNET_EXCEPTION_THROWN) {
val exceptionMessage = serDe.readString(inputStream)
throw new DotnetException(exceptionMessage)
}
callbackFlag
}

private object CallbackFlags {
val CLOSE: Int = -1
val CALLBACK: Int = -2
val DOTNET_EXCEPTION_THROWN: Int = -3
val END_OF_STREAM: Int = -4
}
}
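For reference, the framing send() above puts on the socket is plain flagged, length-prefixed data. A standalone sketch of the same byte layout, assuming SerDe.writeInt emits a 4-byte big-endian int exactly as java.io.DataOutputStream does:

import java.io.{ByteArrayOutputStream, DataOutputStream}

val frame = new ByteArrayOutputStream()
val out = new DataOutputStream(frame)
out.writeInt(-2)                // CallbackFlags.CALLBACK
out.writeInt(7)                 // callback id (example value)
val body = Array[Byte](1, 2, 3) // bytes produced by writeBody
out.writeInt(body.length)       // length prefix for the body
out.write(body)
out.writeInt(-4)                // CallbackFlags.END_OF_STREAM
// On success the .NET side answers with END_OF_STREAM (-4); on failure it
// sends DOTNET_EXCEPTION_THROWN (-3) followed by the exception message.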
113 changes: 113 additions & 0 deletions src/scala/microsoft-spark-3-2/src/main/scala/org/apache/spark/api/dotnet/DotnetBackend.scala
@@ -0,0 +1,113 @@
/*
* Licensed to the .NET Foundation under one or more agreements.
* The .NET Foundation licenses this file to you under the MIT license.
* See the LICENSE file in the project root for more information.
*/

package org.apache.spark.api.dotnet

import java.net.InetSocketAddress
import java.util.concurrent.TimeUnit
import io.netty.bootstrap.ServerBootstrap
import io.netty.channel.nio.NioEventLoopGroup
import io.netty.channel.socket.SocketChannel
import io.netty.channel.socket.nio.NioServerSocketChannel
import io.netty.channel.{ChannelFuture, ChannelInitializer, EventLoopGroup}
import io.netty.handler.codec.LengthFieldBasedFrameDecoder
import io.netty.handler.codec.bytes.{ByteArrayDecoder, ByteArrayEncoder}
import org.apache.spark.internal.Logging
import org.apache.spark.internal.config.dotnet.Dotnet.DOTNET_NUM_BACKEND_THREADS
import org.apache.spark.{SparkConf, SparkEnv}

/**
 * Netty server that invokes JVM calls based upon receiving messages from .NET.
 * The implementation mirrors the RBackend.
 */
class DotnetBackend extends Logging {
self => // for accessing the this reference in the inner class (ChannelInitializer)
private[this] var channelFuture: ChannelFuture = _
private[this] var bootstrap: ServerBootstrap = _
private[this] var bossGroup: EventLoopGroup = _
private[this] val objectTracker = new JVMObjectTracker

@volatile
private[dotnet] var callbackClient: Option[CallbackClient] = None

def init(portNumber: Int): Int = {
val conf = Option(SparkEnv.get).map(_.conf).getOrElse(new SparkConf())
val numBackendThreads = conf.get(DOTNET_NUM_BACKEND_THREADS)
logInfo(s"The number of DotnetBackend threads is set to $numBackendThreads.")
bossGroup = new NioEventLoopGroup(numBackendThreads)
val workerGroup = bossGroup

bootstrap = new ServerBootstrap()
.group(bossGroup, workerGroup)
.channel(classOf[NioServerSocketChannel])

bootstrap.childHandler(new ChannelInitializer[SocketChannel]() {
def initChannel(ch: SocketChannel): Unit = {
ch.pipeline()
.addLast("encoder", new ByteArrayEncoder())
.addLast(
"frameDecoder",
// maxFrameLength = 2G
// lengthFieldOffset = 0
// lengthFieldLength = 4
// lengthAdjustment = 0
// initialBytesToStrip = 4, i.e. strip out the length field itself
new LengthFieldBasedFrameDecoder(Integer.MAX_VALUE, 0, 4, 0, 4))
.addLast("decoder", new ByteArrayDecoder())
.addLast("handler", new DotnetBackendHandler(self, objectTracker))
}
})

channelFuture = bootstrap.bind(new InetSocketAddress("localhost", portNumber))
channelFuture.syncUninterruptibly()
channelFuture.channel().localAddress().asInstanceOf[InetSocketAddress].getPort
}

private[dotnet] def setCallbackClient(address: String, port: Int): Unit = synchronized {
callbackClient = callbackClient match {
case Some(_) => throw new Exception("Callback client already set.")
case None =>
logInfo(s"Connecting to a callback server at $address:$port")
Some(new CallbackClient(new SerDe(objectTracker), address, port))
}
}

private[dotnet] def shutdownCallbackClient(): Unit = synchronized {
callbackClient match {
case Some(client) => client.shutdown()
case None => logInfo("Callback client has already been shut down.")
}
callbackClient = None
}

def run(): Unit = {
channelFuture.channel.closeFuture().syncUninterruptibly()
}

def close(): Unit = {
if (channelFuture != null) {
// close is a local operation and should finish within milliseconds; timeout just to be safe
channelFuture.channel().close().awaitUninterruptibly(10, TimeUnit.SECONDS)
channelFuture = null
}
if (bootstrap != null && bootstrap.config().group() != null) {
bootstrap.config().group().shutdownGracefully()
}
if (bootstrap != null && bootstrap.config().childGroup() != null) {
bootstrap.config().childGroup().shutdownGracefully()
}
bootstrap = null

objectTracker.clear()

// Send close to .NET callback server.
shutdownCallbackClient()

// Shutdown the thread pool whose executors could still be running.
ThreadPool.shutdown()
}
}
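A hedged lifecycle sketch for the backend above (not from this diff): init binds localhost and returns the port actually bound, so passing 0 lets the OS pick a free one, and run() then blocks until the channel closes:

val backend = new DotnetBackend()
val port = backend.init(0) // returns the ephemeral port Netty actually bound
println(s"DotnetBackend listening on localhost:$port")
try {
  backend.run() // blocks until the server channel is closed
} finally {
  backend.close() // also clears tracked JVM objects and shuts down the callback client
}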