
Conversation

@jiangzho (Contributor):

What changes were proposed in this pull request?

This PR adds a Java API library for the Spark Operator, with the ability to generate the YAML spec.

Why are the changes needed?

The Spark Operator API refers to the CustomResourceDefinition (https://kubernetes.io/docs/tasks/extend-kubernetes/custom-resources/custom-resource-definitions/) that represents the spec for a Spark Application in k8s.

This module will be used by the operator controller and reconciler. It can also serve external services that access the k8s API server from Java.
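For illustration, here is a minimal sketch of how such an external Java service might create a resource with the fabric8 client (the SparkApplication class name and its package are assumptions based on this PR, not the exact final API surface):

```java
import io.fabric8.kubernetes.client.KubernetesClient;
import io.fabric8.kubernetes.client.KubernetesClientBuilder;

// Assumed import: the custom resource type generated by this module.
import org.apache.spark.kubernetes.operator.SparkApplication;

public class SubmitSparkApp {
  public static void main(String[] args) {
    try (KubernetesClient client = new KubernetesClientBuilder().build()) {
      SparkApplication app = new SparkApplication();
      app.getMetadata().setName("spark-pi");
      // Register the CRD type with the client and create the resource.
      client.resources(SparkApplication.class)
          .inNamespace("default")
          .resource(app)
          .create();
    }
  }
}
```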

Does this PR introduce any user-facing change?

No changes to the Apache Spark core API. A Spark Operator API is proposed.

To generate the SparkApplication spec YAML, use

./gradlew :spark-operator-api:finalizeGeneratedCRD

(this requires yq to be installed for patching additional printer columns)

The generated YAML file will be located at

spark-operator-api/build/classes/java/main/META-INF/fabric8/sparkapplications.org.apache.spark-v1.yml

For more details, please also refer to spark-operator-docs/spark_application.md

How was this patch tested?

This was tested locally.

Was this patch authored or co-authored using generative AI tooling?

No.


fabric8Version=6.12.1
commonsLang3Version=3.14.0
commonsIOVersion=2.16.1
Contributor Author:

fabric8 client, commons library, and log4j versions are designed to be in line with the Apache Spark dependency versions.

@jiangzho force-pushed the api branch 2 times, most recently from be1fb18 to 7cecb54 on April 23, 2024 07:12
# limitations under the License.
#

group=org.apache.spark.kubernetes.operator
Member:

Shall we use `k8s` instead of `kubernetes`?

*
*/

package org.apache.spark.kubernetes.operator;
Member:

ditto. k8s.

import org.apache.spark.kubernetes.operator.status.BaseStatus;

public class BaseResource<
S,
Member:

indentation?

Contributor Author:

This is introduced by the Spotless GoogleJavaStyle plugin. I have not yet dug into the implementation, but this seems to be related to the guide's rule:

> When line-wrapping, each line after the first (each continuation line) is indented at least +4 from the original line.

... and it attempts a +4+4 here.

Spotless GoogleJavaStyle does not allow additional customization. If this line-wrapping style becomes a concern, we may switch to our own style XML.

public static final String LABEL_SPARK_ROLE_EXECUTOR_VALUE = "executor";
public static final String SPARK_CONF_SENTINEL_DUMMY_FIELD = "sentinel.dummy.number";

public static final String SENTINEL_LABEL = "spark.operator/sentinel";
Member:

Why does this have a new group?

public static final String SENTINEL_LABEL = "spark.operator/sentinel";

// Default state messages
public static final String DriverRequestedMessage = "Requested driver from resource scheduler. ";
Member:

May I ask why this PR adds a space at the end, `scheduler. "`?

public static final String DriverRequestedMessage = "Requested driver from resource scheduler. ";
public static final String DriverCompletedMessage = "Spark application completed successfully. ";
public static final String DriverTerminatedBeforeInitializationMessage =
"Driver container is terminated without SparkContext / SparkSession initialization. ";
Member:

This is worse because we have two spaces, initialization. ".

"The Spark application is running with less than minimal number of requested "
+ "executors. ";
public static final String ExecutorLaunchTimeoutMessage =
"The Spark application failed to get enough executors in the given time threshold. ";
Member:

Adding a space manually and repeatedly is fragile. Please handle this in the message printing logic.

@Builder
@JsonInclude(JsonInclude.Include.NON_NULL)
@JsonIgnoreProperties(ignoreUnknown = true)
public class ApplicationTimeoutConfig {
Member:

May I ask where these magic numbers came from?

Contributor Author:

Added a few comments here. These numbers are actually the default values that we recommended to previous production customers. We may add more documentation regarding the default values.

@Builder.Default protected Long driverStartTimeoutMillis = 300 * 1000L;
@Builder.Default protected Long sparkSessionStartTimeoutMillis = 300 * 1000L;
@Builder.Default protected Long executorStartTimeoutMillis = 300 * 1000L;
@Builder.Default protected Long forceTerminationGracePeriodMillis = 300 * 1000L;
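As a usage sketch, a caller could override a single default while keeping the rest (a minimal sketch assuming the Lombok builder implied by `@Builder` above; exact accessor names are assumptions):

```java
// Override only the driver start timeout; the other fields keep their
// 300s @Builder.Default values shown above.
ApplicationTimeoutConfig timeouts =
    ApplicationTimeoutConfig.builder()
        .driverStartTimeoutMillis(600 * 1000L) // wait up to 10 minutes
        .build();
```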
Member:

It would be great if there were a reference for the source of these original values.

* policy is set to 'Never'.
*/
@Builder.Default
protected ResourceRetentionPolicy resourceRetentionPolicy = ResourceRetentionPolicy.AlwaysDelete;
Member (@dongjoon-hyun, Apr 23, 2024):

Does this include the Driver Pod and the ConfigMap itself? It would be great if you could mention that explicitly in the above comment.

* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
Member:

BTW, please double-check all license headers. These seem to be copied from a broken source. For example, we don't have this empty line.

public class ApplicationAttemptSummary extends BaseAttemptSummary {
// The state transition history for given attempt
// This is used when state history trimming is enabled
protected Map<Long, ApplicationState> stateTransitionHistory;
Member:

Just a question. Why do we use Map instead of List for this linear data?

Contributor Author:

This is for the sake of unique state identification. We attempt to assign a unique id to each ApplicationState, one that always increments across multiple attempts.

I also considered adding the state id inside ApplicationState instead of introducing a map of state id <-> state, but it ended up with many corner cases to achieve idempotency for state transitions.

It also helps with truncating the state transition history. Sometimes this history can get really long and cause large items in etcd. The map lets us avoid iterating over the full history each time we truncate it.
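A rough sketch of that truncation idea (assumed shape, not the PR's exact code):

```java
import java.util.TreeMap;

public class HistoryTrimmer {
  // With a sorted map keyed by an ever-increasing state id, trimming drops the
  // oldest entries without scanning the whole history.
  static void trim(TreeMap<Long, ApplicationState> history, int maxSize) {
    while (history.size() > maxSize) {
      history.pollFirstEntry(); // removes the smallest (oldest) state id
    }
  }
}
```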

Member:

Got it.

import java.util.Set;

public enum ApplicationStateSummary implements BaseStateSummary {
/** Spark application is submitted to the cluster but yet scheduled. */
Member:

nit. If you want to add `.` at the end, let's add it to all sentences.

/** A request has been made to start driver pod in the cluster */
DRIVER_REQUESTED,

/** Driver pod has reached running state */
Member:

To be clear, please use the matched K8s Pod Phase term exactly by using Running instead of running.

- /** Driver pod has reached running state */
+ /** Driver pod has reached `Running` phase */

/** Driver pod has reached running state */
DRIVER_STARTED,

/** Spark session is initialized */
Member:

Please describe what this means.

  • Specifically, this is irrelevant to Pod Liveness or Readiness.
  • What is Spark Session here?
  • What is required in K8s Spark Operator?

Member:

Specifically, I guess we need to mention the SparkContext or SparkSession concept together with this.

Contributor Author:

Updated the doc - yeah, this can be confusing. We'll name it in a way that reveals that this indeed means the driver is ready and can be exposed via a service.

/** Spark session is initialized */
DRIVER_READY,

/** Less that minimal required executor pods become ready during starting up */
Member:

I don't think this description is correct. At least, this is insufficient.

  • What is the meaning of Executor Pod Become Ready?
  • Executor Pod readiness doesn't imply that Driver JVM knows this executor.

Contributor Author:

Updated the description. You are absolutely right that executor pod state does not imply the executors' actual state from Spark's perspective. This is a 'best effort' from the operator side to observe app status without modifying the core.

We do have some ideas to optimize this in future versions, making the operator able to detect app status by:

  • connecting to the driver to get its registered executor information (instead of watching executor pods). We may use the existing Spark UI for this purpose - users should be able to opt in to this feature if they enable pod-to-pod communication between the operator and the driver.
  • having the driver update the CRD status, possibly via a listener, as sketched below.

These future enhancements may involve core / k8s module changes.
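For the listener idea, a hedged sketch of what a driver-side hook might look like (OperatorStatusListener and the reporting behavior are hypothetical; only the SparkListener API below is existing Spark):

```java
import org.apache.spark.scheduler.SparkListener;
import org.apache.spark.scheduler.SparkListenerExecutorAdded;

// Hypothetical: a driver-side listener that could report executor registration
// back to the operator (e.g. by patching the CRD status). No such reporting
// API exists yet, so this sketch only logs the event.
public class OperatorStatusListener extends SparkListener {
  @Override
  public void onExecutorAdded(SparkListenerExecutorAdded event) {
    System.out.println("Executor registered with driver: " + event.executorId());
  }
}
```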

INITIALIZED_BELOW_THRESHOLD_EXECUTORS,

/** All required executor pods started */
RUNNING_HEALTHY,
Member:

This is the same. 'All required executor pods started' doesn't mean that the Spark Driver JVM knows all of these executors.

FAILED,

/**
* The job has failed because of a scheduler side issue. e.g. driver scheduled on node with
Member:

This could be misleading because we have many schedulers inside Spark too. This means K8s's pod scheduler or YuniKorn/Volcano's batch scheduler, right?


public BaseStatus(STATE initState, AS currentAttemptSummary) {
this.currentState = initState;
this.stateTransitionHistory = new TreeMap<>();
Member:

May I ask why we choose this implementation?

# See the License for the specific language governing permissions and
# limitations under the License.
#
#
Member:

nit. redundant empty line.


script_path=$( cd -- "$( dirname -- "${BASH_SOURCE[0]}" )" &> /dev/null && pwd )
crd_path="${script_path}/../../../build/classes/java/main/META-INF/fabric8/sparkapplications.org.apache.spark-v1.yml"
yq -i '.spec.versions[0] += ({"additionalPrinterColumns": [{"jsonPath": ".status.currentState.currentStateSummary", "name": "Current State", "type": "string"}, {"jsonPath": ".metadata.creationTimestamp", "name": "Age", "type": "date"}]})' $crd_path
Member:

Although this yq requirement is mentioned before, this might be a headache later.

Contributor Author:

+1 - this is added as a workaround. Additional printer columns are "nice to have", so we did not make the build task invoke this as a mandatory step.

We may spend some time fixing the original issue and remove this workaround in a future version.
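One possible direction for removing the workaround - an assumption, since availability and nested-path support in the pinned crd-generator version would need verification - is fabric8's PrinterColumn annotation on the status field, so the generator emits the column itself:

```java
import io.fabric8.crd.generator.annotation.PrinterColumn;

// Hypothetical sketch: let the CRD generator emit additionalPrinterColumns
// instead of patching the generated YAML with yq afterwards.
public class ApplicationState {
  @PrinterColumn(name = "Current State")
  protected String currentStateSummary;
}
```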

@@ -0,0 +1,221 @@
## Spark Application API
Member:

Please remove all spark-operator-docs from this PR. We didn't start it yet, @jiangzho .

Member (@dongjoon-hyun) left a comment:

I finished the first round review, @jiangzho . Please address the review comments. Thank you.

package org.apache.spark.k8s.operator.status;

import static org.apache.spark.k8s.operator.status.ApplicationStateSummary.SUBMITTED;
import static org.apache.spark.k8s.operator.status.ApplicationStateSummary.SUCCEEDED;
Member:

Shall we use *?

Contributor Author:

Wildcard imports were disabled per the style guide - but we can fully qualify static imports.

Member:

Shy.. Too many lines.

Member:

Ack, if that's the way.

void testAppendNewState() {
ApplicationStatus applicationStatus = new ApplicationStatus();
ApplicationState newState =
new ApplicationState(ApplicationStateSummary.RUNNING_HEALTHY, "foo");
Member:

Like SUBMITTED and SUCCEEDED, we need to import this for consistency.

status1.terminateOrRestart(
noRetryConfig, ResourceRetainPolicy.Never, messageOverride, false);
Assertions.assertEquals(
ApplicationStateSummary.RESOURCE_RELEASED,
Member:

ditto. After importing *, let's use a simple form.

// without retry
ApplicationStatus status1 =
new ApplicationStatus().appendNewState(new ApplicationState(SUCCEEDED, "bar"));
ApplicationStatus updatedStatus11 =
Member (@dongjoon-hyun, Apr 25, 2024):

Shall we have a meaningful name instead of updatedStatus11? Does this mean Application 1's 2nd attempt?

}

@Test
void testTerminateOrRestart() {
Member:

It would be great if you split this long test method into smaller ones.

import io.fabric8.kubernetes.api.model.Pod;
import io.fabric8.kubernetes.api.model.PodTemplateSpec;
import io.fabric8.kubernetes.api.model.PodTemplateSpecBuilder;
import org.junit.jupiter.api.Assertions;
Member (@dongjoon-hyun, Apr 25, 2024):

Shall we do the following in all *Test.java?

- import org.junit.jupiter.api.Assertions;
+ import org.junit.jupiter.api.Assertions.*;

@JsonInclude(JsonInclude.Include.NON_NULL)
@JsonIgnoreProperties(ignoreUnknown = true)
public class AttemptInfo {
@Builder.Default protected final Long id = 0L;
Member:

If we don't want a complex one, let's change this Long object to AtomicLong.

java.util.concurrent.atomic.AtomicLong

Member:

We can use addAndGet instead of id + 1L.

Contributor Author:

Actually, attempt info and summary are designed to be immutable after creation. Using AtomicLong & addAndGet would mutate the underlying value and could cause unintentional updates.

In the most recent commit, I removed all setters on AttemptInfo and AttemptSummary; hope that reduces the confusion.
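A small sketch of that immutable flow (builder and getter names are assumed):

```java
// Each restart derives a fresh immutable AttemptInfo; the previous object is
// never mutated, unlike an AtomicLong counter would be.
AttemptInfo next = AttemptInfo.builder()
    .id(previous.getId() + 1L)
    .build();
```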

import org.apache.spark.k8s.operator.status.BaseStatus;

public class BaseResource<
S,
Member:

I guess 4-space would be enough because our default is 2-space.

Contributor Author:

ack. updated this format & turned off formatter for this class.
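For reference, the hand-formatted declaration looks roughly like this (the AS type parameter is inferred from BaseStatus<S, STATE, AS> in the snippet below, so this is an assumption about the full declaration):

```java
public class BaseResource<
    S,
    STATE extends BaseState<S>,
    AS extends BaseAttemptSummary,
    SPEC extends BaseSpec,
    STATUS extends BaseStatus<S, STATE, AS>>
    extends CustomResource<SPEC, STATUS> implements Namespaced {}
```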

STATE extends BaseState<S>,
SPEC extends BaseSpec,
STATUS extends BaseStatus<S, STATE, AS>>
extends CustomResource<SPEC, STATUS> implements Namespaced {}
Member:

Ditto. 2-space.

'org.apache.spark',
)
toggleOffOn()
targetExclude "**/BaseResource.java"
Member:

Oh. Got it. This is due to my request.. :(

* initExecutors: 5 maxExecutors: 10 sparkConf: spark.executor.instances: "10"
*
* <p>Spark would try to bring up 10 executors as defined in SparkConf. In addition, from SparkApp
* perspective, + If Spark app acquires less than 5 executors in given tine window
Member:

Does this + exist for styling?

* <p>Spark would try to bring up 10 executors as defined in SparkConf. In addition, from SparkApp
* perspective, + If Spark app acquires less than 5 executors in given tine window
* (.spec.applicationTolerations.applicationTimeoutConfig.executorStartTimeoutMillis) after
* submitted, it would be shut down proactively in order to avoid resource deadlock. + Spark app
Member:

ditto. +

* (.spec.applicationTolerations.applicationTimeoutConfig.executorStartTimeoutMillis) after
* submitted, it would be shut down proactively in order to avoid resource deadlock. + Spark app
* would be marked as 'RUNNING_WITH_PARTIAL_CAPACITY' if it loses executors after successfully start
* up. + Spark app would be marked as 'RunningHealthy' if it has at least min executors after
Member:

ditto. +

&& RunningHealthy.ordinal() > this.ordinal();
}

public boolean isTerminated() {
Member:

DriverEvicted is not a part of this?

Contributor Author:

It is not. It's 'stopping', like succeeded / failed, etc.

}

public boolean isTerminated() {
return ResourceReleased.equals(this) || TerminatedWithoutReleaseResources.equals(this);
Member:

I'm wondering if isTerminated considers the following.

  • ResourceRetainPolicy.Always
  • ResourceRetainPolicy.OnFailure

Contributor Author:

It does not - or rather, it does that at an earlier stage. The operator evaluates the retain policy on the 'stopping' states, and makes the app transition into one of the terminated states based on that.

}

public boolean isStopping() {
return RunningWithBelowThresholdExecutors.ordinal() < this.ordinal() && !isTerminated();
Member:

For my understanding: this means DriverStartTimedOut, ExecutorsStartTimedOut, and DriverReadyTimedOut are in isStopping?

Contributor Author:

Yes. In addition, all succeeded / failed / evicted states are considered 'stopping'. The operator proceeds to resource release and restart from these stopping states.

import static org.apache.spark.k8s.operator.status.ApplicationStateSummary.DriverStartTimedOut;
import static org.apache.spark.k8s.operator.status.ApplicationStateSummary.ExecutorsStartTimedOut;
import static org.apache.spark.k8s.operator.status.ApplicationStateSummary.Failed;
import static org.apache.spark.k8s.operator.status.ApplicationStateSummary.Succeeded;
Member:

I suggested importing *, but is this the result of one of the static analyzers?

Contributor Author:

It's actually the Java style guide that discourages wildcards - #8 (comment) - Spotless therefore marks them as a violation.

Member (@dongjoon-hyun) left a comment:

+1, LGTM. Thank you so much for your efforts, @jiangzho .
Merged to main.

@dongjoon-hyun dongjoon-hyun mentioned this pull request Apr 26, 2024
@jiangzho jiangzho deleted the api branch July 23, 2024 23:04
jiangzho referenced this pull request in jiangzho/spark-kubernetes-operator Jul 17, 2025
Updated formatting for ModelUtils
dongjoon-hyun added a commit that referenced this pull request Oct 2, 2025
### What changes were proposed in this pull request?

This PR aims to use `log4j2` instead of `log4j` by
- Use `log4j-slf4j2-impl` instead of `log4j-slf4j-impl`
- Remove `log4j-1.2-api`

### Why are the changes needed?

Apache Spark main repository has been using `log4j-slf4j2-impl` instead of `log4j-slf4j-impl` in order to use `Log4J2`.

- apache/spark#37844

The Apache Spark K8s Operator repository seems to have mistakenly used `log4j-slf4j-impl` in the initial implementation.
- #8

### Does this PR introduce _any_ user-facing change?

No behavior change.

### How was this patch tested?

Pass the CIs.

### Was this patch authored or co-authored using generative AI tooling?

No.

Closes #363 from dongjoon-hyun/SPARK-53783.

Authored-by: Dongjoon Hyun <dongjoon@apache.org>
Signed-off-by: Dongjoon Hyun <dongjoon@apache.org>