[SPARK-25647][k8s] Add spark streaming compatibility suite for kubernetes. #22639
Conversation
0a4e222 to d2230fc (compare)
@@ -120,7 +120,7 @@ private[spark] object SparkAppLauncher extends Logging {
      appConf.toStringArray :+ appArguments.mainAppResource

    if (appArguments.appArgs.nonEmpty) {
-     commandLine += appArguments.appArgs.mkString(" ")
+     commandLine ++= appArguments.appArgs
It's either a space-separated single argument or multiple distinct arguments. If we do .mkString(" "), then multiple arguments get collapsed into a single space-separated argument.
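For illustration (the argument values here are made up), the difference in behavior is:

// Illustrative sketch: why ++= preserves argument boundaries.
val appArgs = Array("10", "words.txt")
val commandLine = scala.collection.mutable.ArrayBuffer("run-example")

// Old: appends ONE element, "10 words.txt" -- the app sees a single argument.
// commandLine += appArgs.mkString(" ")

// New: appends TWO elements, "10" and "words.txt" -- the app sees two arguments.
commandLine ++= appArgs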
Kubernetes integration test starting
Kubernetes integration test status success
Test build #96987 has finished for PR 22639 at commit
Kubernetes integration test starting
Test build #96989 has finished for PR 22639 at commit
Kubernetes integration test status failure
Jenkins, retest this please
Kubernetes integration test starting
Kubernetes integration test status success
Test build #96994 has finished for PR 22639 at commit
I think a lot of this could be cleaner if we deployed in cluster mode. Also, deploying a separate pod to run the socket server (with the extra connection for the word count to read from) would be cleaner in terms of reasoning about resource management and not leaking sockets.
k8sSuite: KubernetesSuite =>

  private def startSocketServer(): (String, Int, ServerSocket) = {
Is this the submission client starting a server, and then the pod needs to be able to connect to the submission client host and port? An alternative is to deploy a separate pod that does this so that network communications are pod-to-pod instead of pod-to-host.
If we do it that way, it's a lot easier to clean up the resources and avoid trouble with sockets hanging open on the Jenkins bare-metal host; for example, we can just delete the whole server pod.
logInfo(s"Received connection on $socket") | ||
for (i <- 1 to 10 ) { | ||
if (socket.isConnected && !serverSocket.isClosed) { | ||
socket.getOutputStream.write("spark-streaming-kube test.\n".getBytes()) |
Specify encoding as UTF-8.
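Something along these lines (a minimal sketch; socket is the socket from the quoted code above):

import java.nio.charset.StandardCharsets

// Make the encoding explicit instead of relying on the platform default charset.
socket.getOutputStream.write("spark-streaming-kube test.\n".getBytes(StandardCharsets.UTF_8))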
      }
    }
private def getRunLog(_driverPodName: String): String = kubernetesTestComponents.kubernetesClient |
Why the underscore in _driverPodName? If it's a name conflict, I would presume the driver pod name is already available elsewhere, and we can just use the local var and not have an argument.
.withImagePullPolicy("IfNotPresent") | ||
.withCommand("/opt/spark/bin/run-example") | ||
.addToArgs("--master", s"k8s://https://kubernetes.default.svc") | ||
.addToArgs("--deploy-mode", mode) |
If we're creating the pod manually, the deploy mode should always be client and not cluster, so this should just be hardcoded, and the argument removed from the function signature.
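A rough sketch of what that could look like (a hypothetical helper, not code from this PR):

// Sketch only: with a manually created driver pod the deploy mode is always
// client, so it can be hardcoded and the mode parameter dropped from the signature.
private def runExampleArgs(exampleClass: String): Seq[String] = Seq(
  "/opt/spark/bin/run-example",
  "--master", "k8s://https://kubernetes.default.svc",
  "--deploy-mode", "client", // hardcoded: client mode only
  exampleClass)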
Also, why are we running in client mode if we're going to be deploying a pod anyway? We can deploy this in cluster mode using spark-submit and then set up the service in front of the pod.
  private def driverServiceSetup(_driverPodName: String): (Int, Int, Service) = {
    val labels = Map("spark-app-selector" -> _driverPodName)
    val driverPort = 7077
driverPort and blockManagerPort can be constants in a companion object.
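For example, a sketch of that (the block manager port value is a placeholder; only 7077 appears in the diff above):

// Sketch: hoist the ports into the suite's companion object as constants.
private[spark] object StreamingCompatibilitySuite {
  val DriverPort: Int = 7077
  val BlockManagerPort: Int = 7079 // placeholder value
}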
@mccheah Thanks for taking a look. Overall a nice suggestion; I am okay with the idea of having a pod, but I am struggling with creating a pod for the socket server. Can you please suggest how to go about it?
Not sure what the command for running it should be.
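For illustration only, one possible shape using the fabric8 client the suite already uses; the image, names, and netcat command are placeholders, not a tested recipe:

import io.fabric8.kubernetes.api.model.{Pod, PodBuilder}

// Sketch: a throwaway pod that keeps a line-oriented TCP server listening on
// port 9999 and writes a test line to each connection. Everything here
// (image, labels, command) is a placeholder.
private def createSocketServerPod(): Pod = {
  val pod = new PodBuilder()
    .withNewMetadata()
      .withName("streaming-socket-server")
      .addToLabels("role", "socket-server")
    .endMetadata()
    .withNewSpec()
      .addNewContainer()
        .withName("socket-server")
        .withImage("alpine")
        .withCommand("sh", "-c",
          "while true; do echo 'spark-streaming-kube test.' | nc -l -p 9999; done")
      .endContainer()
    .endSpec()
    .build()
  kubernetesTestComponents.kubernetesClient.pods().create(pod)
}

Deleting that pod at the end of the test would then also tear down the socket, which is the resource-cleanup benefit discussed above.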
Kubernetes integration test starting
Test build #97250 has finished for PR 22639 at commit
Kubernetes integration test status failure
  import StreamingCompatibilitySuite._

  test("Run spark streaming in client mode.", k8sTestTag) {
    val (host, port, serverSocket) = startSocketServer()
We could use a custom source as an alternative for feeding the stream. Re-using existing code is also nice.
Please correct my understanding: a custom source would either have to live in the examples, or a separate image would have to be published with the custom source on its classpath.
I am more inclined to add it in the examples. It was just an alternative option.
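For reference, the custom-source alternative would roughly mean a receiver along these lines (a sketch, not part of this PR):

import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming.receiver.Receiver

// Sketch: a receiver that generates its own test data, so no external socket
// server is needed. It would have to live somewhere on the driver/executor
// classpath, e.g. in the examples.
class TestDataReceiver extends Receiver[String](StorageLevel.MEMORY_ONLY) {

  override def onStart(): Unit = {
    new Thread("test-data-generator") {
      override def run(): Unit = {
        while (!isStopped()) {
          store("spark-streaming-kube test.")
          Thread.sleep(1000)
        }
      }
    }.start()
  }

  override def onStop(): Unit = { /* the generator thread checks isStopped() */ }
}

Usage would then be ssc.receiverStream(new TestDataReceiver) instead of ssc.socketTextStream(host, port).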
Jenkins, retest this please
Test build #97381 has finished for PR 22639 at commit
Kubernetes integration test starting
Kubernetes integration test status failure
      .withLabels(labels.asJava)
    .endMetadata()
    .withNewSpec()
      .withServiceAccountName(kubernetesTestComponents.serviceAccountName)
The flakiness of the tests seems to be related to this service account. Could you investigate the rbac.yml that would need to be set up to ensure that these failures don't come up?
The same service account is used in the "run in client mode" test.
I am not sure what is causing it. Do you have any clue?
I don't believe this was solved in the client-mode tests, despite the addition of the spark-rbac.yml.
I think this might require investigation outside the scope of this PR.
I have run these tests on my own minikube setup, and I am unable to reproduce the failure that occurred on Jenkins. It is possible that it is related to how minikube is set up on Jenkins.
Jenkins, retest this please.
Kubernetes integration test starting
Test build #98006 has finished for PR 22639 at commit
Kubernetes integration test status success
@ScrapCodes could you update the PR according to @mccheah's comments? I think we will be good to go then.
If this isn't being worked on, it should probably be closed.
I intend to finish this soon; if you like, I can keep it closed until then?
What changes were proposed in this pull request?
Adds integration tests for Spark Streaming compatibility with Kubernetes (K8s) mode.
How was this patch tested?
By running the test suites.