From 390b67f7c187600847dc82b9b5a46138a06b593d Mon Sep 17 00:00:00 2001 From: Alessio Marinelli Date: Mon, 10 Nov 2025 17:12:05 +0100 Subject: [PATCH 1/9] Add Apache Spark operator - Add spark.py operator module with create/delete/patch functions - Add Kubernetes manifests (StatefulSet, Deployment, Services, RBAC) - Integrate Spark operator in main.py (create/delete handlers) - Extend Whisk CRD with Spark configuration schema - Add test configuration example (whisk-spark.yaml) Features: - Spark Master with optional HA support - Scalable Spark Workers (1 to N replicas) - History Server with persistent storage - Complete RBAC configuration - Health checks and resource management - Event logging support Implements all OpenServerless operator patterns: - Kopf framework handlers - Owner references for garbage collection - Kustomize templating - Status tracking in CRD - Error handling and logging --- deploy/nuvolaris-permissions/whisk-crd.yaml | 125 ++++++- deploy/spark/00-spark-rbac.yaml | 39 ++ deploy/spark/01-spark-configmap.yaml | 58 +++ deploy/spark/02-spark-history-pvc.yaml | 14 + deploy/spark/03-spark-master-sts.yaml | 96 +++++ deploy/spark/04-spark-master-svc.yaml | 22 ++ deploy/spark/05-spark-worker-sts.yaml | 100 ++++++ deploy/spark/06-spark-worker-svc.yaml | 23 ++ deploy/spark/07-spark-history-dep.yaml | 89 +++++ deploy/spark/08-spark-history-svc.yaml | 18 + deploy/spark/spark-crd-extension.yaml | 150 ++++++++ nuvolaris/main.py | 30 +- nuvolaris/spark.py | 379 ++++++++++++++++++++ tests/kind/whisk-spark.yaml | 66 ++++ 14 files changed, 1204 insertions(+), 5 deletions(-) create mode 100644 deploy/spark/00-spark-rbac.yaml create mode 100644 deploy/spark/01-spark-configmap.yaml create mode 100644 deploy/spark/02-spark-history-pvc.yaml create mode 100644 deploy/spark/03-spark-master-sts.yaml create mode 100644 deploy/spark/04-spark-master-svc.yaml create mode 100644 deploy/spark/05-spark-worker-sts.yaml create mode 100644 deploy/spark/06-spark-worker-svc.yaml 
create mode 100644 deploy/spark/07-spark-history-dep.yaml create mode 100644 deploy/spark/08-spark-history-svc.yaml create mode 100644 deploy/spark/spark-crd-extension.yaml create mode 100644 nuvolaris/spark.py create mode 100644 tests/kind/whisk-spark.yaml diff --git a/deploy/nuvolaris-permissions/whisk-crd.yaml b/deploy/nuvolaris-permissions/whisk-crd.yaml index 5cc235d..79ab48e 100644 --- a/deploy/nuvolaris-permissions/whisk-crd.yaml +++ b/deploy/nuvolaris-permissions/whisk-crd.yaml @@ -152,6 +152,9 @@ spec: type: boolean seaweedfs: description: deploys an S3 compatible layer using a standalone deployment of seaweedfs + type: boolean + spark: + description: deploys Apache Spark cluster for distributed data processing type: boolean required: - openwhisk @@ -885,7 +888,120 @@ spec: - console-enabled required: - volume-size - - nuvolaris + - nuvolaris + spark: + description: Apache Spark cluster configuration for distributed data processing + type: object + properties: + image: + description: Docker image for Spark components + type: string + default: "apache/spark:3.5.0" + version: + description: Spark version + type: string + default: "3.5.0" + master: + description: Spark Master configuration + type: object + properties: + replicas: + description: Number of master replicas (1 or 3 for HA) + type: integer + minimum: 1 + maximum: 3 + default: 1 + memory: + description: Memory allocation for master (e.g., 1g, 2g) + type: string + default: "1g" + cpu: + description: CPU allocation for master (e.g., 1000m) + type: string + default: "1000m" + port: + description: Spark master port + type: integer + default: 7077 + webui-port: + description: Spark master web UI port + type: integer + default: 8080 + worker: + description: Spark Worker configuration + type: object + properties: + replicas: + description: Number of worker replicas + type: integer + minimum: 1 + default: 2 + memory: + description: Memory allocation per worker (e.g., 2g, 4g) + type: string + default: "2g" 
+ cpu: + description: CPU allocation per worker (e.g., 2000m) + type: string + default: "2000m" + cores: + description: Number of cores per worker + type: integer + minimum: 1 + default: 2 + webui-port: + description: Spark worker web UI port + type: integer + default: 8081 + history-server: + description: Spark History Server configuration + type: object + properties: + enabled: + description: Enable Spark History Server + type: boolean + default: true + port: + description: History Server web UI port + type: integer + default: 18080 + volume-size: + description: Storage size for event logs in Gi + type: integer + minimum: 1 + default: 10 + event-log: + description: Spark event logging configuration + type: object + properties: + enabled: + description: Enable event logging + type: boolean + default: true + dir: + description: Directory for event logs + type: string + default: "/tmp/spark-events" + ha: + description: High Availability configuration + type: object + properties: + enabled: + description: Enable HA mode (requires Zookeeper) + type: boolean + default: false + zookeeper-url: + description: Zookeeper connection URL for HA + type: string + default: "" + user: + description: Spark user + type: string + default: "spark" + uid: + description: Spark user ID + type: integer + default: 185 status: x-kubernetes-preserve-unknown-fields: true # type: object @@ -1003,4 +1119,9 @@ spec: type: string priority: 0 jsonPath: .status.whisk_create.seaweedfs - description: Seaweedfs \ No newline at end of file + description: Seaweedfs + - name: Spark + type: string + priority: 0 + jsonPath: .status.whisk_create.spark + description: Spark \ No newline at end of file diff --git a/deploy/spark/00-spark-rbac.yaml b/deploy/spark/00-spark-rbac.yaml new file mode 100644 index 0000000..49a4db5 --- /dev/null +++ b/deploy/spark/00-spark-rbac.yaml @@ -0,0 +1,39 @@ +--- +apiVersion: v1 +kind: ServiceAccount +metadata: + name: spark + namespace: nuvolaris + labels: + app: spark +--- 
+apiVersion: rbac.authorization.k8s.io/v1 +kind: Role +metadata: + name: spark-role + namespace: nuvolaris + labels: + app: spark +rules: +- apiGroups: [""] + resources: ["pods", "services", "configmaps"] + verbs: ["get", "list", "watch", "create", "update", "patch", "delete"] +- apiGroups: [""] + resources: ["persistentvolumeclaims"] + verbs: ["get", "list"] +--- +apiVersion: rbac.authorization.k8s.io/v1 +kind: RoleBinding +metadata: + name: spark-rolebinding + namespace: nuvolaris + labels: + app: spark +roleRef: + apiGroup: rbac.authorization.k8s.io + kind: Role + name: spark-role +subjects: +- kind: ServiceAccount + name: spark + namespace: nuvolaris diff --git a/deploy/spark/01-spark-configmap.yaml b/deploy/spark/01-spark-configmap.yaml new file mode 100644 index 0000000..51e50bb --- /dev/null +++ b/deploy/spark/01-spark-configmap.yaml @@ -0,0 +1,58 @@ +apiVersion: v1 +kind: ConfigMap +metadata: + name: spark-config + namespace: nuvolaris + labels: + app: spark +data: + spark-defaults.conf: | + spark.master spark://spark-master:7077 + spark.eventLog.enabled true + spark.eventLog.dir /tmp/spark-events + spark.history.fs.logDirectory /tmp/spark-events + spark.history.ui.port 18080 + spark.serializer org.apache.spark.serializer.KryoSerializer + spark.driver.memory 1g + spark.executor.memory 2g + spark.executor.cores 2 + spark.cores.max 4 + spark.sql.warehouse.dir /tmp/spark-warehouse + spark.driver.extraJavaOptions -Dlog4j.configuration=file:///opt/spark/conf/log4j.properties + spark.executor.extraJavaOptions -Dlog4j.configuration=file:///opt/spark/conf/log4j.properties + + spark-env.sh: | + #!/usr/bin/env bash + + export SPARK_MASTER_HOST=spark-master + export SPARK_MASTER_PORT=7077 + export SPARK_MASTER_WEBUI_PORT=8080 + export SPARK_WORKER_WEBUI_PORT=8081 + export SPARK_WORKER_PORT=7078 + export SPARK_WORKER_CORES=2 + export SPARK_WORKER_MEMORY=2g + export SPARK_DAEMON_MEMORY=1g + export SPARK_HISTORY_OPTS="-Dspark.history.fs.logDirectory=/tmp/spark-events" + 
+ log4j.properties: | + # Set everything to be logged to the console + log4j.rootCategory=INFO, console + log4j.appender.console=org.apache.log4j.ConsoleAppender + log4j.appender.console.target=System.err + log4j.appender.console.layout=org.apache.log4j.PatternLayout + log4j.appender.console.layout.ConversionPattern=%d{yy/MM/dd HH:mm:ss} %p %c{1}: %m%n + + # Set the default spark-shell log level to WARN + log4j.logger.org.apache.spark.repl.Main=WARN + + # Settings to quiet third party logs + log4j.logger.org.spark_project.jetty=WARN + log4j.logger.org.spark_project.jetty.util.component.AbstractLifeCycle=ERROR + log4j.logger.org.apache.spark.repl.SparkIMain$exprTyper=INFO + log4j.logger.org.apache.spark.repl.SparkILoop$SparkILoopInterpreter=INFO + log4j.logger.org.apache.parquet=ERROR + log4j.logger.parquet=ERROR + + # SPARK-9183: Settings to avoid annoying messages when looking up nonexistent UDFs + log4j.logger.org.apache.hadoop.hive.metastore.RetryingHMSHandler=FATAL + log4j.logger.org.apache.hadoop.hive.ql.exec.FunctionRegistry=ERROR diff --git a/deploy/spark/02-spark-history-pvc.yaml b/deploy/spark/02-spark-history-pvc.yaml new file mode 100644 index 0000000..bd8f5fc --- /dev/null +++ b/deploy/spark/02-spark-history-pvc.yaml @@ -0,0 +1,14 @@ +apiVersion: v1 +kind: PersistentVolumeClaim +metadata: + name: spark-history-pvc + namespace: nuvolaris + labels: + app: spark + component: spark-history +spec: + accessModes: + - ReadWriteOnce + resources: + requests: + storage: 10Gi diff --git a/deploy/spark/03-spark-master-sts.yaml b/deploy/spark/03-spark-master-sts.yaml new file mode 100644 index 0000000..87084b3 --- /dev/null +++ b/deploy/spark/03-spark-master-sts.yaml @@ -0,0 +1,96 @@ +apiVersion: apps/v1 +kind: StatefulSet +metadata: + name: spark-master + namespace: nuvolaris + labels: + app: spark + component: spark-master +spec: + serviceName: spark-master + replicas: 1 + selector: + matchLabels: + app: spark + component: spark-master + template: + metadata: + 
labels: + app: spark + component: spark-master + spec: + serviceAccountName: spark + securityContext: + fsGroup: 185 + containers: + - name: spark-master + image: apache/spark:3.5.0 + imagePullPolicy: IfNotPresent + command: + - /bin/bash + - -c + - | + # Copy config files + cp /etc/spark-config/* /opt/spark/conf/ + + # Start Spark Master + /opt/spark/sbin/start-master.sh + + # Keep container running and tail logs + tail -f /opt/spark/logs/* + ports: + - name: master + containerPort: 7077 + protocol: TCP + - name: webui + containerPort: 8080 + protocol: TCP + env: + - name: SPARK_MODE + value: "master" + - name: SPARK_MASTER_HOST + value: "spark-master" + - name: SPARK_MASTER_PORT + value: "7077" + - name: SPARK_MASTER_WEBUI_PORT + value: "8080" + - name: SPARK_DAEMON_MEMORY + value: "1g" + - name: SPARK_NO_DAEMONIZE + value: "false" + resources: + requests: + cpu: 500m + memory: 1Gi + limits: + cpu: 1000m + memory: 2Gi + volumeMounts: + - name: spark-config + mountPath: /etc/spark-config + - name: spark-work + mountPath: /opt/spark/work + - name: spark-logs + mountPath: /opt/spark/logs + livenessProbe: + httpGet: + path: / + port: webui + initialDelaySeconds: 30 + periodSeconds: 10 + timeoutSeconds: 5 + readinessProbe: + httpGet: + path: / + port: webui + initialDelaySeconds: 15 + periodSeconds: 5 + timeoutSeconds: 3 + volumes: + - name: spark-config + configMap: + name: spark-config + - name: spark-work + emptyDir: {} + - name: spark-logs + emptyDir: {} diff --git a/deploy/spark/04-spark-master-svc.yaml b/deploy/spark/04-spark-master-svc.yaml new file mode 100644 index 0000000..070ff87 --- /dev/null +++ b/deploy/spark/04-spark-master-svc.yaml @@ -0,0 +1,22 @@ +apiVersion: v1 +kind: Service +metadata: + name: spark-master + namespace: nuvolaris + labels: + app: spark + component: spark-master +spec: + type: ClusterIP + selector: + app: spark + component: spark-master + ports: + - name: master + port: 7077 + targetPort: 7077 + protocol: TCP + - name: webui + port: 
8080 + targetPort: 8080 + protocol: TCP diff --git a/deploy/spark/05-spark-worker-sts.yaml b/deploy/spark/05-spark-worker-sts.yaml new file mode 100644 index 0000000..47e339e --- /dev/null +++ b/deploy/spark/05-spark-worker-sts.yaml @@ -0,0 +1,100 @@ +apiVersion: apps/v1 +kind: StatefulSet +metadata: + name: spark-worker + namespace: nuvolaris + labels: + app: spark + component: spark-worker +spec: + serviceName: spark-worker + replicas: 2 + selector: + matchLabels: + app: spark + component: spark-worker + template: + metadata: + labels: + app: spark + component: spark-worker + spec: + serviceAccountName: spark + securityContext: + fsGroup: 185 + containers: + - name: spark-worker + image: apache/spark:3.5.0 + imagePullPolicy: IfNotPresent + command: + - /bin/bash + - -c + - | + # Copy config files + cp /etc/spark-config/* /opt/spark/conf/ + + # Start Spark Worker + /opt/spark/sbin/start-worker.sh spark://spark-master:7077 + + # Keep container running and tail logs + tail -f /opt/spark/logs/* + ports: + - name: worker + containerPort: 7078 + protocol: TCP + - name: webui + containerPort: 8081 + protocol: TCP + env: + - name: SPARK_MODE + value: "worker" + - name: SPARK_MASTER_URL + value: "spark://spark-master:7077" + - name: SPARK_WORKER_CORES + value: "2" + - name: SPARK_WORKER_MEMORY + value: "2g" + - name: SPARK_WORKER_PORT + value: "7078" + - name: SPARK_WORKER_WEBUI_PORT + value: "8081" + - name: SPARK_DAEMON_MEMORY + value: "1g" + - name: SPARK_NO_DAEMONIZE + value: "false" + resources: + requests: + cpu: 1000m + memory: 2Gi + limits: + cpu: 2000m + memory: 4Gi + volumeMounts: + - name: spark-config + mountPath: /etc/spark-config + - name: spark-work + mountPath: /opt/spark/work + - name: spark-logs + mountPath: /opt/spark/logs + livenessProbe: + httpGet: + path: / + port: webui + initialDelaySeconds: 30 + periodSeconds: 10 + timeoutSeconds: 5 + readinessProbe: + httpGet: + path: / + port: webui + initialDelaySeconds: 15 + periodSeconds: 5 + timeoutSeconds: 
3 + volumes: + - name: spark-config + configMap: + name: spark-config + - name: spark-work + emptyDir: {} + - name: spark-logs + emptyDir: {} diff --git a/deploy/spark/06-spark-worker-svc.yaml b/deploy/spark/06-spark-worker-svc.yaml new file mode 100644 index 0000000..cb73366 --- /dev/null +++ b/deploy/spark/06-spark-worker-svc.yaml @@ -0,0 +1,23 @@ +apiVersion: v1 +kind: Service +metadata: + name: spark-worker + namespace: nuvolaris + labels: + app: spark + component: spark-worker +spec: + type: ClusterIP + clusterIP: None # Headless service + selector: + app: spark + component: spark-worker + ports: + - name: worker + port: 7078 + targetPort: 7078 + protocol: TCP + - name: webui + port: 8081 + targetPort: 8081 + protocol: TCP diff --git a/deploy/spark/07-spark-history-dep.yaml b/deploy/spark/07-spark-history-dep.yaml new file mode 100644 index 0000000..cbed9d3 --- /dev/null +++ b/deploy/spark/07-spark-history-dep.yaml @@ -0,0 +1,89 @@ +apiVersion: apps/v1 +kind: Deployment +metadata: + name: spark-history + namespace: nuvolaris + labels: + app: spark + component: spark-history +spec: + replicas: 1 + selector: + matchLabels: + app: spark + component: spark-history + template: + metadata: + labels: + app: spark + component: spark-history + spec: + serviceAccountName: spark + securityContext: + fsGroup: 185 + containers: + - name: spark-history + image: apache/spark:3.5.0 + imagePullPolicy: IfNotPresent + command: + - /bin/bash + - -c + - | + # Copy config files + cp /etc/spark-config/* /opt/spark/conf/ + + # Create event log directory + mkdir -p /tmp/spark-events + chmod 777 /tmp/spark-events + + # Start History Server + /opt/spark/sbin/start-history-server.sh + + # Keep container running and tail logs + tail -f /opt/spark/logs/* + ports: + - name: webui + containerPort: 18080 + protocol: TCP + env: + - name: SPARK_NO_DAEMONIZE + value: "false" + - name: SPARK_HISTORY_OPTS + value: "-Dspark.history.fs.logDirectory=/tmp/spark-events -Dspark.history.ui.port=18080" + 
resources: + requests: + cpu: 250m + memory: 512Mi + limits: + cpu: 500m + memory: 1Gi + volumeMounts: + - name: spark-config + mountPath: /etc/spark-config + - name: spark-events + mountPath: /tmp/spark-events + - name: spark-logs + mountPath: /opt/spark/logs + livenessProbe: + httpGet: + path: / + port: webui + initialDelaySeconds: 30 + periodSeconds: 10 + timeoutSeconds: 5 + readinessProbe: + httpGet: + path: / + port: webui + initialDelaySeconds: 15 + periodSeconds: 5 + timeoutSeconds: 3 + volumes: + - name: spark-config + configMap: + name: spark-config + - name: spark-events + persistentVolumeClaim: + claimName: spark-history-pvc + - name: spark-logs + emptyDir: {} diff --git a/deploy/spark/08-spark-history-svc.yaml b/deploy/spark/08-spark-history-svc.yaml new file mode 100644 index 0000000..95fdb7a --- /dev/null +++ b/deploy/spark/08-spark-history-svc.yaml @@ -0,0 +1,18 @@ +apiVersion: v1 +kind: Service +metadata: + name: spark-history + namespace: nuvolaris + labels: + app: spark + component: spark-history +spec: + type: ClusterIP + selector: + app: spark + component: spark-history + ports: + - name: webui + port: 18080 + targetPort: 18080 + protocol: TCP diff --git a/deploy/spark/spark-crd-extension.yaml b/deploy/spark/spark-crd-extension.yaml new file mode 100644 index 0000000..a975bc0 --- /dev/null +++ b/deploy/spark/spark-crd-extension.yaml @@ -0,0 +1,150 @@ +# ============================================================================== +# SPARK CRD EXTENSION +# ============================================================================== +# This snippet should be added to deploy/nuvolaris-permissions/whisk-crd.yaml +# in the spec.versions[0].schema.openAPIV3Schema.properties.spec.properties section +# ============================================================================== + +components: + type: object + properties: + # ... existing components ... 
+ + spark: + description: "Enable/disable Apache Spark cluster deployment" + type: boolean + +# Spark configuration section +spark: + description: "Apache Spark cluster configuration" + type: object + properties: + image: + description: "Docker image for Spark (master, worker, history server)" + type: string + default: "apache/spark:3.5.0" + + version: + description: "Spark version" + type: string + default: "3.5.0" + + master: + description: "Spark Master configuration" + type: object + properties: + replicas: + description: "Number of master replicas (1 or 3 for HA)" + type: integer + minimum: 1 + maximum: 3 + default: 1 + + memory: + description: "Memory allocation for master (e.g., 1g, 2g)" + type: string + default: "1g" + + cpu: + description: "CPU allocation for master (e.g., 1000m)" + type: string + default: "1000m" + + port: + description: "Spark master port" + type: integer + default: 7077 + + webui-port: + description: "Spark master web UI port" + type: integer + default: 8080 + + worker: + description: "Spark Worker configuration" + type: object + properties: + replicas: + description: "Number of worker replicas" + type: integer + minimum: 1 + default: 2 + + memory: + description: "Memory allocation per worker (e.g., 2g, 4g)" + type: string + default: "2g" + + cpu: + description: "CPU allocation per worker (e.g., 2000m)" + type: string + default: "2000m" + + cores: + description: "Number of cores per worker" + type: integer + minimum: 1 + default: 2 + + webui-port: + description: "Spark worker web UI port" + type: integer + default: 8081 + + history-server: + description: "Spark History Server configuration" + type: object + properties: + enabled: + description: "Enable Spark History Server" + type: boolean + default: true + + port: + description: "History Server web UI port" + type: integer + default: 18080 + + volume-size: + description: "Storage size for event logs in Gi" + type: integer + minimum: 1 + default: 10 + + event-log: + description: "Spark 
event logging configuration" + type: object + properties: + enabled: + description: "Enable event logging" + type: boolean + default: true + + dir: + description: "Directory for event logs" + type: string + default: "/tmp/spark-events" + + ha: + description: "High Availability configuration" + type: object + properties: + enabled: + description: "Enable HA mode (requires Zookeeper)" + type: boolean + default: false + + zookeeper-url: + description: "Zookeeper connection URL for HA" + type: string + default: "" + + user: + description: "Spark user" + type: string + default: "spark" + + uid: + description: "Spark user ID" + type: integer + default: 185 diff --git a/nuvolaris/main.py b/nuvolaris/main.py index 2c72f6e..b4971f5 100644 --- a/nuvolaris/main.py +++ b/nuvolaris/main.py @@ -28,6 +28,7 @@ import nuvolaris.ferretdb as mongodb import nuvolaris.issuer as issuer import nuvolaris.endpoint as endpoint +import nuvolaris.spark as spark import nuvolaris.minio_deploy as minio import nuvolaris.zookeeper as zookeeper import nuvolaris.kafka as kafka @@ -83,7 +84,8 @@ def whisk_create(spec, name, **kwargs): "zookeeper": "?", #Zookeeper configuration "quota":"?", #Quota configuration "etcd":"?", #Etcd configuration - "milvus":"?" #Milvus configuration + "milvus":"?", #Milvus configuration + "spark":"?" 
#Spark configuration } if cfg.get('components.minio') and cfg.get('components.seaweedfs'): @@ -287,7 +289,20 @@ def whisk_create(spec, name, **kwargs): logging.exception("cannot create milvus") state['milvus']= "error" else: - state['milvus'] = "off" + state['milvus'] = "off" + + # Deploy Spark cluster + if cfg.get('components.spark'): + try: + logging.info("deploying spark cluster") + msg = spark.create(owner) + state['spark'] = "on" + logging.info(msg) + except Exception as e: + logging.exception("cannot create spark cluster") + state['spark'] = "error" + else: + state['spark'] = "off" whisk_post_create(name,state) state['controller']= "Ready" @@ -375,7 +390,16 @@ def whisk_delete(spec, **kwargs): if cfg.get("components.milvus"): msg = milvus.delete() - logging.info(msg) + logging.info(msg) + + # Delete Spark cluster + if cfg.get("components.spark"): + try: + logging.info("deleting spark cluster") + msg = spark.delete() + logging.info(msg) + except Exception as e: + logging.exception("failed to delete spark cluster") if cfg.get("components.registry"): msg = registry.delete() diff --git a/nuvolaris/spark.py b/nuvolaris/spark.py new file mode 100644 index 0000000..1ff9569 --- /dev/null +++ b/nuvolaris/spark.py @@ -0,0 +1,379 @@ +""" +Apache Spark Operator for OpenServerless + +This operator manages Apache Spark cluster deployment including: +- Spark Master (high availability optional) +- Spark Workers +- Spark History Server +- Resource management and scaling +""" + +import kopf +import logging +import nuvolaris.kustomize as kus +import nuvolaris.kube as kube +import nuvolaris.config as cfg +import nuvolaris.util as util +import nuvolaris.operator_util as operator_util +import time +import subprocess + + +def get_spark_config_data(): + """ + Collect Spark configuration from CRD spec + + Returns: + Dict with complete Spark configuration + """ + namespace = cfg.get('nuvolaris.namespace', default='nuvolaris') + + data = { + # Basic configuration + "name": "spark", + 
"namespace": namespace, + + # Spark images + "spark_image": cfg.get('spark.image', default='apache/spark:3.5.0'), + "spark_version": cfg.get('spark.version', default='3.5.0'), + + # Master configuration + "master_replicas": cfg.get('spark.master.replicas', default=1), + "master_memory": cfg.get('spark.master.memory', default='1g'), + "master_cpu": cfg.get('spark.master.cpu', default='1000m'), + "master_port": cfg.get('spark.master.port', default=7077), + "master_webui_port": cfg.get('spark.master.webui-port', default=8080), + + # Worker configuration + "worker_replicas": cfg.get('spark.worker.replicas', default=2), + "worker_memory": cfg.get('spark.worker.memory', default='2g'), + "worker_cpu": cfg.get('spark.worker.cpu', default='2000m'), + "worker_cores": cfg.get('spark.worker.cores', default=2), + "worker_webui_port": cfg.get('spark.worker.webui-port', default=8081), + + # History Server configuration + "history_enabled": cfg.get('spark.history-server.enabled', default=True), + "history_port": cfg.get('spark.history-server.port', default=18080), + "history_volume_size": cfg.get('spark.history-server.volume-size', default=10), + + # Storage configuration + "event_log_enabled": cfg.get('spark.event-log.enabled', default=True), + "event_log_dir": cfg.get('spark.event-log.dir', default='/tmp/spark-events'), + + # High Availability (optional) + "ha_enabled": cfg.get('spark.ha.enabled', default=False), + "ha_zookeeper_url": cfg.get('spark.ha.zookeeper-url', default=''), + + # Affinity and tolerations + "affinity": cfg.get('affinity', default=False), + "affinity_core_node_label": cfg.get('affinity-core-node-label', default='nuvolaris'), + "tolerations": cfg.get('tolerations', default=False), + + # Security + "spark_user": cfg.get('spark.user', default='spark'), + "spark_uid": cfg.get('spark.uid', default=185), + } + + return data + + +def create(owner=None): + """ + Deploy Apache Spark cluster on Kubernetes + + Creates: + - Spark Master (StatefulSet with optional HA) + 
- Spark Workers (StatefulSet) + - Spark History Server (Deployment with PVC) + - Required Services + - ConfigMaps for configuration + + Args: + owner: Owner reference for garbage collection + + Returns: + Result message from deployment + """ + logging.info("*** creating spark cluster") + + # 1. Collect configuration + data = get_spark_config_data() + + # 2. Define templates to apply + tplp = [ + "00-spark-rbac.yaml", # ServiceAccount, Role, RoleBinding + "01-spark-configmap.yaml", # Spark configuration + "02-spark-history-pvc.yaml", # PVC for History Server + "03-spark-master-sts.yaml", # Master StatefulSet + "04-spark-master-svc.yaml", # Master Service + "05-spark-worker-sts.yaml", # Worker StatefulSet + "06-spark-worker-svc.yaml", # Worker Service (headless) + ] + + # Add History Server if enabled + if data['history_enabled']: + tplp.append("07-spark-history-dep.yaml") + tplp.append("08-spark-history-svc.yaml") + + # 3. Generate kustomization with patches + kust = kus.patchTemplates("spark", tplp, data) + + # 4. Additional Jinja2 templates + templates = [] + if data['affinity']: + templates.append('affinity-tolerance-sts-core-attach.yaml') + + # 5. Build complete specification + spec = kus.kustom_list("spark", kust, templates=templates, data=data) + + # 6. Apply owner reference for garbage collection + if owner: + kopf.append_owner_reference(spec['items'], owner) + else: + # Save spec for delete without owner + cfg.put("state.spark.spec", spec) + + # 7. Deploy to Kubernetes + res = kube.apply(spec) + logging.info("spark manifests applied") + + # 8. Wait for Master to be ready + logging.info("waiting for spark master to be ready...") + util.wait_for_pod_ready( + "{.items[?(@.metadata.labels.component == 'spark-master')].metadata.name}", + timeout=300 + ) + logging.info("spark master is ready") + + # 9. 
Wait for Workers to be ready + logging.info("waiting for spark workers to be ready...") + time.sleep(10) # Give workers time to start connecting + util.wait_for_pod_ready( + "{.items[?(@.metadata.labels.component == 'spark-worker')].metadata.name}", + timeout=300 + ) + logging.info("spark workers are ready") + + # 10. Wait for History Server if enabled + if data['history_enabled']: + logging.info("waiting for spark history server to be ready...") + util.wait_for_pod_ready( + "{.items[?(@.metadata.labels.component == 'spark-history')].metadata.name}", + timeout=180 + ) + logging.info("spark history server is ready") + + # 11. Post-configuration + configure_spark(data) + + logging.info("*** spark cluster created successfully") + return res + + +def delete(owner=None): + """ + Remove Apache Spark cluster from Kubernetes + + Args: + owner: Owner reference (if present, uses delete by owner) + + Returns: + Result message from deletion + """ + logging.info("*** deleting spark cluster") + + if owner: + # Delete via owner reference rebuild + spec = kus.build("spark") + else: + # Delete via saved spec + spec = cfg.get("state.spark.spec") + + if spec: + res = kube.delete(spec) + logging.info(f"deleted spark cluster: {res}") + return res + + logging.warning("no spark spec found") + return False + + +def patch(status, action, owner=None): + """ + Handle update/patch of Spark component + + Args: + status: Status object of the CRD + action: 'create' or 'delete' + owner: Owner reference + """ + try: + logging.info(f"*** handling {action} spark") + + if action == 'create': + msg = create(owner) + operator_util.patch_operator_status(status, 'spark', 'on') + else: + msg = delete(owner) + operator_util.patch_operator_status(status, 'spark', 'off') + + logging.info(msg) + + except Exception as e: + logging.error(f'*** failed to {action} spark: {e}') + operator_util.patch_operator_status(status, 'spark', 'error') + raise + + +def configure_spark(data): + """ + Post-deployment 
configuration for Spark cluster + + Tasks: + - Verify master-worker connectivity + - Create event log directory if using local storage + - Configure History Server event log scanning + - Verify cluster health + + Args: + data: Configuration data dictionary + """ + logging.info("configuring spark cluster") + + try: + # 1. Verify Spark Master is accessible + namespace = data['namespace'] + master_url = f"spark://spark-master:7077" + + logging.info(f"verifying spark master at {master_url}") + + # 2. Check worker registration (via master web UI) + expected_workers = data['worker_replicas'] + max_retries = 12 + retry_delay = 5 + + for attempt in range(max_retries): + try: + # Query master web UI for registered workers + result = subprocess.run( + ['kubectl', 'exec', '-n', namespace, + 'spark-master-0', '--', + 'curl', '-s', 'http://localhost:8080/json/'], + capture_output=True, + text=True, + timeout=10 + ) + + if result.returncode == 0: + import json + master_info = json.loads(result.stdout) + active_workers = len(master_info.get('workers', [])) + + logging.info(f"spark master has {active_workers}/{expected_workers} workers registered") + + if active_workers >= expected_workers: + logging.info("all spark workers registered successfully") + break + else: + logging.warning(f"failed to query spark master: {result.stderr}") + + except Exception as e: + logging.warning(f"attempt {attempt+1}/{max_retries} to verify workers failed: {e}") + + if attempt < max_retries - 1: + logging.info(f"waiting {retry_delay}s before retry...") + time.sleep(retry_delay) + + # 3. 
def scale_workers(replicas):
    """
    Scale Spark workers to the specified number of replicas.

    Runs `kubectl scale` against the spark-worker StatefulSet in the
    operator namespace.

    Args:
        replicas: Target number of worker replicas

    Returns:
        Result message string on success

    Raises:
        Exception: if the kubectl scale command fails
    """
    logging.info(f"scaling spark workers to {replicas} replicas")

    # NOTE: nuvolaris.config.get() takes the keyword `defval`, not `default`;
    # calling it with default= raises TypeError (same defect fixed in
    # get_spark_config_data by a later commit in this series).
    namespace = cfg.get('nuvolaris.namespace', defval='nuvolaris')

    result = subprocess.run(
        ['kubectl', 'scale', 'statefulset', 'spark-worker',
         '-n', namespace,
         f'--replicas={replicas}'],
        capture_output=True,
        text=True
    )

    if result.returncode == 0:
        logging.info(f"spark workers scaled to {replicas}")
        return f"scaled to {replicas} workers"
    else:
        raise Exception(f"failed to scale workers: {result.stderr}")


def get_cluster_info():
    """
    Get current Spark cluster information.

    Queries the Spark master JSON endpoint (port 8080) from inside the
    spark-master-0 pod via `kubectl exec` + curl.

    Returns:
        Dict with cluster status and metrics as reported by the master,
        or a dict of the form {"error": <message>} on any failure.
    """
    # Same defval fix as scale_workers: cfg.get rejects `default=`.
    namespace = cfg.get('nuvolaris.namespace', defval='nuvolaris')

    try:
        result = subprocess.run(
            ['kubectl', 'exec', '-n', namespace,
             'spark-master-0', '--',
             'curl', '-s', 'http://localhost:8080/json/'],
            capture_output=True,
            text=True,
            timeout=10
        )

        if result.returncode == 0:
            import json
            return json.loads(result.stdout)
        else:
            return {"error": "failed to get cluster info"}

    except Exception as e:
        logging.error(f"error getting cluster info: {e}")
        return {"error": str(e)}
affinity-core-node-label: nuvolaris + tolerations: false From 545573f1f347b9954e606df1f7a4b2f981c25cea Mon Sep 17 00:00:00 2001 From: Alessio Marinelli Date: Thu, 13 Nov 2025 15:51:31 +0100 Subject: [PATCH 2/9] fix: remove whisk-system mkdir from sources stage (Kaniko permission) --- Dockerfile | 14 ++++++++------ 1 file changed, 8 insertions(+), 6 deletions(-) diff --git a/Dockerfile b/Dockerfile index 99d3441..e863aaa 100644 --- a/Dockerfile +++ b/Dockerfile @@ -52,10 +52,9 @@ ADD --chown=nuvolaris:nuvolaris deploy/postgres-operator-deploy /home/nuvolaris/ ADD --chown=nuvolaris:nuvolaris deploy/ferretdb /home/nuvolaris/deploy/ferretdb ADD --chown=nuvolaris:nuvolaris deploy/runtimes /home/nuvolaris/deploy/runtimes ADD --chown=nuvolaris:nuvolaris deploy/postgres-backup /home/nuvolaris/deploy/postgres-backup +ADD --chown=nuvolaris:nuvolaris deploy/spark /home/nuvolaris/deploy/spark ADD --chown=nuvolaris:nuvolaris run.sh dbinit.sh cron.sh pyproject.toml poetry.lock whisk-system.sh /home/nuvolaris/ -# prepares the required folders to deploy the whisk-system actions -RUN mkdir /home/nuvolaris/deploy/whisk-system ADD --chown=nuvolaris:nuvolaris actions /home/nuvolaris/actions # enterprise specific @@ -89,7 +88,8 @@ ENV POETRY_CACHE_DIR=/opt/.cache ENV PATH=${POETRY_HOME}/bin:$PATH WORKDIR /home/nuvolaris -COPY --chown=nuvolaris:nuvolaris pyproject.toml poetry.lock /home/nuvolaris/ +# Use numeric UID:GID to avoid requiring user in this stage +COPY --chown=1001:1001 pyproject.toml poetry.lock /home/nuvolaris/ RUN echo "Installing poetry" && \ # Install minimal dependencies echo 'debconf debconf/frontend select Noninteractive' | debconf-set-selections && \ @@ -144,6 +144,9 @@ RUN ln -snf /usr/share/zoneinfo/$TZ /etc/localtime && echo $TZ > /etc/timezone & # install taskfile curl -sL https://taskfile.dev/install.sh | sh -s -- -d -b /usr/bin +# ensure whisk-system deploy folder exists and owned by nuvolaris +RUN install -d -o 1001 -g 1001 /home/nuvolaris/deploy/whisk-system 
+ USER nuvolaris WORKDIR /home/nuvolaris # Copy virtualenv @@ -153,8 +156,7 @@ COPY --from=deps --chown=nuvolaris:nuvolaris ${POETRY_HOME} ${POETRY_HOME} # Copy the home COPY --from=sources --chown=nuvolaris:nuvolaris ${HOME} ${HOME} RUN poetry install --only main --no-interaction --no-ansi && rm -rf ${POETRY_CACHE_DIR} -# prepares the required folders to deploy the whisk-system actions -RUN mkdir -p /home/nuvolaris/deploy/whisk-system && \ - ./whisk-system.sh && \ +# initialize whisk-system content +RUN ./whisk-system.sh && \ cd deploy && tar cvf ../deploy.tar * CMD ["./run.sh"] \ No newline at end of file From 1a39cd260955ca5389cf8b6025a9205247e1892c Mon Sep 17 00:00:00 2001 From: Alessio Marinelli Date: Fri, 14 Nov 2025 17:54:25 +0100 Subject: [PATCH 3/9] feat: Add SparkJob CRD operator with GHCR integration - Implement kopf-based SparkJob operator handlers (create/delete) - Add SparkJob CRD definition for nuvolaris.org/v1 - Create Kubernetes Job template for Spark driver execution - Configure GHCR authentication for mobs75/openserverless-operator:spark-dev - Add operator pod configuration with proper resource allocation - Update build tasks for Spark operator deployment --- .env.dist | 6 +- .env.sample | 3 + Taskfile.yml | 284 ++++++++- TaskfileBuild.yml | 2 +- TaskfileTest.yml | 9 +- .../operator-pod-spark.yaml | 33 + .../nuvolaris-permissions/sparkjob-crd.yaml | 278 ++++++++ nuvolaris/spark.py | 603 +++++++++++++++--- nuvolaris/templates/sparkjob-driver-tpl.yaml | 117 ++++ 9 files changed, 1249 insertions(+), 86 deletions(-) create mode 100644 deploy/nuvolaris-operator/operator-pod-spark.yaml create mode 100644 deploy/nuvolaris-permissions/sparkjob-crd.yaml create mode 100644 nuvolaris/templates/sparkjob-driver-tpl.yaml diff --git a/.env.dist b/.env.dist index 0e6a017..2c36711 100644 --- a/.env.dist +++ b/.env.dist @@ -1,5 +1,9 @@ -GITHUB_USER=github: put here your github user +GITHUB_USER=github_username 
MY_OPERATOR_IMAGE=ghcr.io/${GITHUB_USER}/openserverless-operator +# Optional: enable GHCR secure build+deploy shortcut in `task spark-standard` +# Provide a PAT with scopes: read:packages, write:packages (delete:packages optional) +GHCR_USER=github_username +GHCR_TOKEN=ghcr_pat_with_package_scopes #enterprise monitoring SLACK_API_URL=your-slack-webhook-url diff --git a/.env.sample b/.env.sample index fdbeda5..66775ef 100644 --- a/.env.sample +++ b/.env.sample @@ -36,6 +36,9 @@ REAL_HOME=your-actual-directory-outside-of-the-container LINODE_CLI_TOKEN=linode-token-from-dashboard # if you want to push your custom image to push in your repo MY_OPERATOR_IMAGE=user/nuvolaris-operator +# GitHub Container Registry credentials for Spark operator builds +GHCR_USER=your-github-username +GHCR_TOKEN=your-github-token # a slack incoming webhook to write messages SLACK_WEBOOK=your-slack-webhook # ip or hostname and ssh user with sudo of a server running ubuntu 22 diff --git a/Taskfile.yml b/Taskfile.yml index 16d412e..9a8dde1 100644 --- a/Taskfile.yml +++ b/Taskfile.yml @@ -343,15 +343,31 @@ tasks: install-registry: desc: install a local registry cmds: - - sudo cp registries.yaml /etc/rancher/k3s/registries.yaml - - sudo systemctl restart k3s - - helm repo add twuni https://helm.twun.io - - > - helm install docker-registry twuni/docker-registry - --namespace kube-system - --set image.tag=2.8.3 - --set service.type=NodePort - --set service.nodePort=30050 + - | + if [ -d /etc/rancher/k3s ]; then + echo "k3s detected: updating registries.yaml"; + sudo cp registries.yaml /etc/rancher/k3s/registries.yaml; + sudo systemctl restart k3s; + else + echo "k3s not detected: skipping k3s registries config"; + fi + - | + set -e + if helm repo add twuni https://helm.twun.io && \ + helm install docker-registry twuni/docker-registry \ + --namespace kube-system \ + --set image.tag=2.8.3 \ + --set service.type=ClusterIP \ + --set service.port=5000; then + echo "Helm registry installed" + else + 
echo "Helm registry failed, applying fallback manifests..." + kubectl -n kube-system apply -f deploy/kube-system-registry/registry-deploy.yaml + kubectl -n kube-system apply -f deploy/kube-system-registry/registry-svc.yaml + kubectl -n kube-system rollout status deployment/docker-registry --timeout=180s || true + fi + # Espone anche NodePort per pull da runtime host + kubectl -n kube-system apply -f deploy/kube-system-registry/registry-nodeport.yaml || true status: - kubectl -n kube-system get po -l app=docker-registry | grep docker-registry @@ -372,7 +388,21 @@ tasks: - envsubst _kaniko-build.yaml - kubectl -n default delete job/kaniko-build || true - kubectl -n default apply -f _kaniko-build.yaml - - kubectl wait po --for=condition=ready -l app=kaniko-build -n default + - kubectl -n default wait --for=condition=complete job/kaniko-build --timeout=600s + + kaniko-build-patched: + desc: build (patched Dockerfile) con initContainer per whisk-system fix + env: + TAG: + sh: git describe --tags --abbrev=0 2>/dev/null || echo latest + GITHUB_USER: + sh: if [ -n "$GITHUB_USER" ]; then echo "$GITHUB_USER"; else whoami; fi + cmds: + - envsubst < kaniko-build-patched.yaml > _kaniko-build-patched.yaml + - kubectl -n default delete job/kaniko-build-patched || true + - kubectl -n default apply -f _kaniko-build-patched.yaml + - kubectl -n default logs -f $(kubectl -n default get pods -l app=kaniko-build-patched -o jsonpath='{.items[0].metadata.name}') --container kaniko || true + - kubectl -n default wait --for=condition=complete job/kaniko-build-patched --timeout=900s || (echo "Kaniko patched build failed" && exit 1) build-logs: desc: show logs of the latest build @@ -393,8 +423,240 @@ tasks: TAG: sh: git describe --tags --abbrev=0 2>/dev/null || echo latest cmds: + - kubectl -n nuvolaris apply -f deploy/nuvolaris-permissions || true - envsubst _operator-deploy.yaml - - kubectl apply -f _operator-deploy.yaml + - kubectl -n nuvolaris apply -f _operator-deploy.yaml + - kubectl 
-n nuvolaris wait --for=condition=Ready pod/nuvolaris-operator --timeout=180s || true + - kubectl -n nuvolaris get pod nuvolaris-operator -o wide || true + + # --- Spark standard flow (come gli altri operatori) --- + spark-standard: + desc: build (kaniko) e deploy operatore con Spark + istanza whisk-spark + env: + GITHUB_USER: + sh: if [ -n "$GITHUB_USER" ]; then echo "$GITHUB_USER"; else whoami; fi + cmds: + - | + if [ -n "$GHCR_USER" ] && [ -n "$GHCR_TOKEN" ]; then + echo "Rilevate credenziali GHCR: uso pipeline ghcr (build+push)"; + export GHCR_USER GHCR_TOKEN + task spark-all-ghcr || { echo "Pipeline GHCR fallita"; exit 1; } + echo "Deployment GHCR completato, procedo con istanza Spark"; + WHISK=whisk-spark task t:instance; + exit 0; # Evita esecuzione ramo interno registry + else + echo "Credenziali GHCR non presenti: uso percorso registry interno/kind"; + fi + - | + if [ "$(./detect.sh)" = "kind" ]; then + echo "Cluster kind rilevato"; + if command -v kind >/dev/null 2>&1; then + echo "kind CLI presente: uso build-and-load"; + if ! task build-and-load MY_OPERATOR_IMAGE=docker-registry.kube-system.svc.cluster.local:5000/$GITHUB_USER/openserverless-operator; then + echo "kind load fallita: fallback a Kaniko"; + task install-registry || exit 1; + task tag-commit-push || true; + task kaniko-build-patched || exit 1; + fi + else + echo "kind CLI assente: provo installazione veloce"; + curl -fsSL -o kind https://kind.sigs.k8s.io/dl/v0.23.0/kind-linux-amd64 && chmod +x kind && sudo mv kind /usr/local/bin/kind 2>/dev/null || mv kind "$HOME/.local/bin/kind" 2>/dev/null || true; + if command -v kind >/dev/null 2>&1; then + echo "Installazione kind riuscita: eseguo load"; + if ! 
task build-and-load MY_OPERATOR_IMAGE=docker-registry.kube-system.svc.cluster.local:5000/$GITHUB_USER/openserverless-operator; then + echo "kind load fallita dopo install: fallback a Kaniko"; + task install-registry || exit 1; + task tag-commit-push || true; + task kaniko-build-patched || exit 1; + fi + else + echo "Installazione kind fallita: fallback a Kaniko"; + task install-registry || exit 1; + task tag-commit-push || true; + task kaniko-build-patched || exit 1; + fi + fi + else + echo "Cluster $(./detect.sh): uso Kaniko"; + task install-registry || exit 1; + task tag-commit-push || true; + task kaniko-build-patched || exit 1; + fi + - kubectl -n nuvolaris apply -f deploy/nuvolaris-permissions || true + - | + TAG=$(git describe --tags --abbrev=0 2>/dev/null || echo latest) + REGISTRY_HOST="docker-registry.kube-system.svc.cluster.local:5000" + if [ "$(./detect.sh)" != "kind" ]; then + if kubectl -n kube-system get svc docker-registry-nodeport >/dev/null 2>&1; then + NODE_IP=$(kubectl get nodes -o jsonpath='{.items[0].status.addresses[?(@.type=="InternalIP")].address}') + if [ -n "$NODE_IP" ]; then + REGISTRY_HOST="${NODE_IP}:30500" + echo "Uso NodePort registry $REGISTRY_HOST per image pull" + fi + fi + fi + export TAG + export MY_OPERATOR_IMAGE=${REGISTRY_HOST}/$GITHUB_USER/openserverless-operator + # Imposta immagine Spark: immagine interna se no GHCR + if [ -n "$GHCR_USER" ] && [ -n "$GHCR_TOKEN" ]; then + export MY_OPERATOR_IMAGE_SPARK="ghcr.io/$GHCR_USER/openserverless-operator:spark-dev" + else + export MY_OPERATOR_IMAGE_SPARK="${REGISTRY_HOST}/$GITHUB_USER/openserverless-operator:$TAG" + fi + envsubst < deploy/nuvolaris-operator/operator-pod-spark.yaml > _operator-pod-spark.yaml + - kubectl -n nuvolaris delete pod nuvolaris-operator-spark --ignore-not-found + - kubectl -n nuvolaris apply -f _operator-pod-spark.yaml + - kubectl -n nuvolaris wait --for=condition=Ready pod/nuvolaris-operator-spark --timeout=180s || true + - kubectl -n nuvolaris get pod 
nuvolaris-operator-spark -o wide || true + - | + WHISK=whisk-spark task t:instance + # --- Spark Operator (GHCR) --- + spark-login-ghcr: + desc: login a GHCR (richiede GHCR_USER e GHCR_TOKEN in .env) + cmds: + - test -n "$GHCR_USER" || (echo "GHCR_USER mancante" && exit 1) + - test -n "$GHCR_TOKEN" || (echo "GHCR_TOKEN mancante" && exit 1) + - echo "$GHCR_TOKEN" | podman login ghcr.io -u "$GHCR_USER" --password-stdin + spark-build-ghcr: + desc: build immagine operatore con Spark (tag ghcr.io/$GHCR_USER/openserverless-operator:spark-dev) + cmds: + - test -n "$GHCR_USER" || (echo "GHCR_USER mancante" && exit 1) + - podman build -t ghcr.io/$GHCR_USER/openserverless-operator:spark-dev -f Dockerfile . + spark-push-ghcr: + desc: push immagine operatore Spark su GHCR + deps: [spark-login-ghcr] + cmds: + - test -n "$GHCR_USER" || (echo "GHCR_USER mancante" && exit 1) + - podman push ghcr.io/$GHCR_USER/openserverless-operator:spark-dev + spark-deploy-ghcr: + desc: deploy pod operatore Spark usando manifest parametrizzato + env: + GHCR_USER: ${GHCR_USER} + cmds: + - test -n "$GHCR_USER" || (echo "GHCR_USER mancante" && exit 1) + - envsubst < deploy/nuvolaris-operator/operator-pod-spark.yaml > _operator-pod-spark.yaml + - kubectl -n nuvolaris apply -f deploy/nuvolaris-permissions || true + - kubectl -n nuvolaris apply -f _operator-pod-spark.yaml + - kubectl -n nuvolaris wait --for=condition=Ready pod/nuvolaris-operator-spark --timeout=180s || true + - kubectl -n nuvolaris get pod nuvolaris-operator-spark -o wide || true + spark-all-ghcr: + desc: build+push+deploy operatore Spark (GHCR) + cmds: + - task: spark-build-ghcr + - task: spark-push-ghcr + - task: spark-deploy-ghcr + + # --- SparkJob CRD Testing --- + sparkjob-deploy-crd: + desc: deploy SparkJob CRD + cmds: + - kubectl apply -f deploy/nuvolaris-permissions/sparkjob-crd.yaml + - kubectl get crd sparkjobs.nuvolaris.org || true + + sparkjob-test-examples: + desc: test SparkJob examples (PySpark pi calculation) + deps: 
[sparkjob-deploy-crd] + cmds: + - echo "=== Testing PySpark Example ===" + - kubectl apply -f tests/sparkjob-examples.yaml + - echo "Waiting for SparkJob to complete..." + - | + for i in {1..30}; do + STATUS=$(kubectl get sparkjob pyspark-example -o jsonpath='{.status.phase}' 2>/dev/null || echo "NotFound") + echo "Status: $STATUS (attempt $i/30)" + if [ "$STATUS" = "Succeeded" ] || [ "$STATUS" = "Failed" ]; then + break + fi + sleep 10 + done + - echo "=== SparkJob Status ===" + - kubectl get sparkjob pyspark-example -o yaml | grep -A 20 "status:" || true + - echo "=== Driver Job Status ===" + - kubectl get job pyspark-example-driver -o wide || true + - echo "=== Driver Pod Logs ===" + - kubectl logs job/pyspark-example-driver --tail=50 || true + + sparkjob-test-wordcount: + desc: test SparkJob WordCount example (inline PySpark code) + deps: [sparkjob-deploy-crd] + cmds: + - echo "=== Testing PySpark WordCount ===" + - | + cat </dev/null || echo "NotFound") + echo "Status: $STATUS (attempt $i/20)" + if [ "$STATUS" = "Succeeded" ] || [ "$STATUS" = "Failed" ]; then + break + fi + sleep 10 + done + - echo "=== WordCount Results ===" + - kubectl logs job/pyspark-wordcount-test-driver --tail=30 || true + + sparkjob-clean: + desc: cleanup SparkJob test resources + cmds: + - kubectl delete sparkjob pyspark-example --ignore-not-found + - kubectl delete sparkjob pyspark-wordcount --ignore-not-found + - kubectl delete sparkjob pyspark-wordcount-test --ignore-not-found + - kubectl delete sparkjob scala-sparkpi --ignore-not-found + - kubectl delete job pyspark-example-driver --ignore-not-found + - kubectl delete job pyspark-wordcount-driver --ignore-not-found + - kubectl delete job pyspark-wordcount-test-driver --ignore-not-found + - kubectl delete job scala-sparkpi-driver --ignore-not-found + - echo "SparkJob test resources cleaned up" + + sparkjob-status: + desc: show status of all SparkJobs and related resources + cmds: + - echo "=== SparkJobs ===" + - kubectl get 
sparkjobs -o wide || echo "No SparkJobs found" + - echo "=== Driver Jobs ===" + - kubectl get jobs -l app=spark,component=driver || echo "No driver jobs found" + - echo "=== Driver Pods ===" + - kubectl get pods -l app=spark,component=driver || echo "No driver pods found" + - echo "=== Spark Cluster Status ===" + - kubectl get pods -l app=spark | grep -E "(master|worker|history)" || echo "Spark cluster not running" shell: diff --git a/TaskfileBuild.yml b/TaskfileBuild.yml index 74ff8a4..8a9fe2b 100644 --- a/TaskfileBuild.yml +++ b/TaskfileBuild.yml @@ -51,7 +51,7 @@ tasks: - > kind load docker-image ${MY_OPERATOR_IMAGE:-{{.OPERATOR_IMAGE}}}:{{.OPERATOR_TAG}} - --name=nuvolaris + --name=$(kind get clusters | head -1) buildx-and-push: - > diff --git a/TaskfileTest.yml b/TaskfileTest.yml index fe56603..0ee165b 100644 --- a/TaskfileTest.yml +++ b/TaskfileTest.yml @@ -82,8 +82,13 @@ tasks: silent: false cmds: - kubectl config set-context --current --namespace nuvolaris - - > - cat tests/{{.KUBE}}/{{.WHISK}}.yaml | envsubst | kubectl apply -f - + - | + FILE="tests/{{.KUBE}}/{{.WHISK}}.yaml"; + if [ ! -f "$FILE" ]; then + echo "Manifest $FILE non trovato, fallback a tests/{{.KUBE}}/whisk.yaml"; + FILE="tests/{{.KUBE}}/whisk.yaml"; + fi + cat "$FILE" | envsubst | kubectl apply -f - - | while ! kubectl -n nuvolaris wait --for=condition=ready pod/couchdb-0 2>/dev/null do sleep 5 ; echo $((N++)) waiting couchdb... 
diff --git a/deploy/nuvolaris-operator/operator-pod-spark.yaml b/deploy/nuvolaris-operator/operator-pod-spark.yaml new file mode 100644 index 0000000..0fe8c65 --- /dev/null +++ b/deploy/nuvolaris-operator/operator-pod-spark.yaml @@ -0,0 +1,33 @@ +apiVersion: v1 +kind: Pod +metadata: + name: nuvolaris-operator-spark + namespace: nuvolaris + labels: + app: nuvolaris-operator + spark-integrated: "true" + annotations: + whisks.nuvolaris.org/annotate-version: "true" +spec: + serviceAccount: nuvolaris-operator + containers: + - name: nuvolaris-operator + # Immagine dell'operatore con Spark + # Template sostituisce MY_OPERATOR_IMAGE_SPARK con immagine appropriata + image: ${MY_OPERATOR_IMAGE_SPARK} + imagePullPolicy: IfNotPresent + command: ["./run.sh"] + args: ["--verbose"] + env: + - name: ENABLE_SPARK + value: "true" + securityContext: + capabilities: + drop: ["ALL"] + resources: + requests: + cpu: 100m + memory: 256Mi + limits: + cpu: 500m + memory: 512Mi \ No newline at end of file diff --git a/deploy/nuvolaris-permissions/sparkjob-crd.yaml b/deploy/nuvolaris-permissions/sparkjob-crd.yaml new file mode 100644 index 0000000..95ddf8e --- /dev/null +++ b/deploy/nuvolaris-permissions/sparkjob-crd.yaml @@ -0,0 +1,278 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. 
See the License for the +# specific language governing permissions and limitations +# under the License. +# +--- +apiVersion: apiextensions.k8s.io/v1 +kind: CustomResourceDefinition +metadata: + name: sparkjobs.nuvolaris.org + namespace: nuvolaris +spec: + scope: Namespaced + group: nuvolaris.org + names: + kind: SparkJob + plural: sparkjobs + singular: sparkjob + shortNames: + - spj + - spark-job + versions: + - name: v1 + served: true + storage: true + subresources: + status: {} + schema: + openAPIV3Schema: + type: object + properties: + spec: + type: object + properties: + # Job identification + name: + description: "Spark job name" + type: string + + # Application specification + application: + type: object + properties: + # Main application + mainApplicationFile: + description: "Path to main application file (Python, Scala, Java)" + type: string + + mainClass: + description: "Main class for Java/Scala applications" + type: string + + arguments: + description: "Application arguments" + type: array + items: + type: string + + # Application source + source: + type: object + properties: + type: + description: "Source type: configMap, secret, url, inline" + type: string + enum: ["configMap", "secret", "url", "inline"] + + configMapRef: + type: object + properties: + name: + type: string + key: + type: string + + secretRef: + type: object + properties: + name: + type: string + key: + type: string + + url: + description: "URL to download application from" + type: string + + content: + description: "Inline application content" + type: string + + # Spark configuration + spark: + type: object + properties: + master: + description: "Spark master URL (default: spark://spark-master:7077)" + type: string + default: "spark://spark-master:7077" + + conf: + description: "Spark configuration properties" + type: object + additionalProperties: + type: string + + # Driver configuration + driver: + type: object + properties: + cores: + description: "Driver CPU cores" + type: 
integer + default: 1 + + memory: + description: "Driver memory (e.g., 1g, 512m)" + type: string + default: "512m" + + serviceAccount: + description: "Kubernetes service account for driver" + type: string + default: "spark" + + # Executor configuration + executor: + type: object + properties: + instances: + description: "Number of executors" + type: integer + default: 2 + + cores: + description: "Executor CPU cores" + type: integer + default: 1 + + memory: + description: "Executor memory (e.g., 1g, 512m)" + type: string + default: "1g" + + # Execution settings + execution: + type: object + properties: + schedule: + description: "Cron schedule for recurring jobs (optional)" + type: string + + restartPolicy: + description: "Restart policy: Never, OnFailure, Always" + type: string + enum: ["Never", "OnFailure", "Always"] + default: "OnFailure" + + timeout: + description: "Job timeout in seconds" + type: integer + default: 3600 + + backoffLimit: + description: "Number of retry attempts" + type: integer + default: 3 + + # Dependencies + dependencies: + type: object + properties: + jars: + description: "Additional JAR files" + type: array + items: + type: string + + files: + description: "Additional files to distribute" + type: array + items: + type: string + + pyFiles: + description: "Python files (for PySpark)" + type: array + items: + type: string + + # Monitoring + monitoring: + type: object + properties: + enabled: + description: "Enable monitoring/logging" + type: boolean + default: true + + eventLog: + description: "Enable event logging" + type: boolean + default: true + + historyServer: + description: "Enable history server integration" + type: boolean + default: true + + status: + type: object + properties: + phase: + description: "Job phase: Pending, Running, Succeeded, Failed" + type: string + enum: ["Pending", "Running", "Succeeded", "Failed", "Unknown"] + + conditions: + type: array + items: + type: object + properties: + type: + type: string + status: + 
type: string + lastTransitionTime: + type: string + format: date-time + reason: + type: string + message: + type: string + + startTime: + description: "Job start timestamp" + type: string + format: date-time + + completionTime: + description: "Job completion timestamp" + type: string + format: date-time + + applicationId: + description: "Spark application ID" + type: string + + driverPod: + description: "Driver pod name" + type: string + + executorPods: + description: "List of executor pod names" + type: array + items: + type: string + + sparkUI: + description: "Spark UI URLs" + type: object + properties: + driverUI: + type: string + historyUI: + type: string \ No newline at end of file diff --git a/nuvolaris/spark.py b/nuvolaris/spark.py index 1ff9569..721de7b 100644 --- a/nuvolaris/spark.py +++ b/nuvolaris/spark.py @@ -14,6 +14,7 @@ import nuvolaris.kube as kube import nuvolaris.config as cfg import nuvolaris.util as util +import nuvolaris.template as ntp import nuvolaris.operator_util as operator_util import time import subprocess @@ -26,64 +27,70 @@ def get_spark_config_data(): Returns: Dict with complete Spark configuration """ - namespace = cfg.get('nuvolaris.namespace', default='nuvolaris') + namespace = cfg.get('nuvolaris.namespace', defval='nuvolaris') + # Nota: config.get accetta il nome del parametro defval e NON "default". + # Le chiamate precedenti con default= generavano TypeError e impedivano la creazione del cluster Spark. 
data = { # Basic configuration "name": "spark", "namespace": namespace, # Spark images - "spark_image": cfg.get('spark.image', default='apache/spark:3.5.0'), - "spark_version": cfg.get('spark.version', default='3.5.0'), + "spark_image": cfg.get('spark.image', defval='apache/spark:3.5.0'), + "spark_version": cfg.get('spark.version', defval='3.5.0'), # Master configuration - "master_replicas": cfg.get('spark.master.replicas', default=1), - "master_memory": cfg.get('spark.master.memory', default='1g'), - "master_cpu": cfg.get('spark.master.cpu', default='1000m'), - "master_port": cfg.get('spark.master.port', default=7077), - "master_webui_port": cfg.get('spark.master.webui-port', default=8080), + "master_replicas": cfg.get('spark.master.replicas', defval=1), + "master_memory": cfg.get('spark.master.memory', defval='1g'), + "master_cpu": cfg.get('spark.master.cpu', defval='1000m'), + "master_port": cfg.get('spark.master.port', defval=7077), + "master_webui_port": cfg.get('spark.master.webui-port', defval=8080), # Worker configuration - "worker_replicas": cfg.get('spark.worker.replicas', default=2), - "worker_memory": cfg.get('spark.worker.memory', default='2g'), - "worker_cpu": cfg.get('spark.worker.cpu', default='2000m'), - "worker_cores": cfg.get('spark.worker.cores', default=2), - "worker_webui_port": cfg.get('spark.worker.webui-port', default=8081), + "worker_replicas": cfg.get('spark.worker.replicas', defval=2), + "worker_memory": cfg.get('spark.worker.memory', defval='2g'), + "worker_cpu": cfg.get('spark.worker.cpu', defval='2000m'), + "worker_cores": cfg.get('spark.worker.cores', defval=2), + "worker_webui_port": cfg.get('spark.worker.webui-port', defval=8081), # History Server configuration - "history_enabled": cfg.get('spark.history-server.enabled', default=True), - "history_port": cfg.get('spark.history-server.port', default=18080), - "history_volume_size": cfg.get('spark.history-server.volume-size', default=10), + "history_enabled": 
cfg.get('spark.history-server.enabled', defval=True), + "history_port": cfg.get('spark.history-server.port', defval=18080), + "history_volume_size": cfg.get('spark.history-server.volume-size', defval=10), # Storage configuration - "event_log_enabled": cfg.get('spark.event-log.enabled', default=True), - "event_log_dir": cfg.get('spark.event-log.dir', default='/tmp/spark-events'), + "event_log_enabled": cfg.get('spark.event-log.enabled', defval=True), + "event_log_dir": cfg.get('spark.event-log.dir', defval='/tmp/spark-events'), + "storage_class": cfg.get("nuvolaris.storageclass"), # High Availability (optional) - "ha_enabled": cfg.get('spark.ha.enabled', default=False), - "ha_zookeeper_url": cfg.get('spark.ha.zookeeper-url', default=''), + "ha_enabled": cfg.get('spark.ha.enabled', defval=False), + "ha_zookeeper_url": cfg.get('spark.ha.zookeeper-url', defval=''), - # Affinity and tolerations - "affinity": cfg.get('affinity', default=False), - "affinity_core_node_label": cfg.get('affinity-core-node-label', default='nuvolaris'), - "tolerations": cfg.get('tolerations', default=False), + # Standard OpenServerless patterns + "affinity": cfg.get('affinity', defval=False), + "affinity_core_node_label": cfg.get('affinity-core-node-label', defval='nuvolaris'), + "tolerations": cfg.get('tolerations', defval=False), - # Security - "spark_user": cfg.get('spark.user', default='spark'), - "spark_uid": cfg.get('spark.uid', default=185), + # Security (standard pattern) + "spark_user": cfg.get('spark.user', defval='spark'), + "spark_uid": cfg.get('spark.uid', defval=185), } + # Add standard OpenServerless affinity/tolerations data + util.couch_affinity_tolerations_data(data) + return data def create(owner=None): """ - Deploy Apache Spark cluster on Kubernetes + Deploy Apache Spark cluster on Kubernetes using standard OpenServerless patterns Creates: - Spark Master (StatefulSet with optional HA) - - Spark Workers (StatefulSet) + - Spark Workers (StatefulSet) - Spark History Server 
(Deployment with PVC) - Required Services - ConfigMaps for configuration @@ -99,45 +106,47 @@ def create(owner=None): # 1. Collect configuration data = get_spark_config_data() - # 2. Define templates to apply - tplp = [ - "00-spark-rbac.yaml", # ServiceAccount, Role, RoleBinding - "01-spark-configmap.yaml", # Spark configuration - "02-spark-history-pvc.yaml", # PVC for History Server - "03-spark-master-sts.yaml", # Master StatefulSet - "04-spark-master-svc.yaml", # Master Service - "05-spark-worker-sts.yaml", # Worker StatefulSet - "06-spark-worker-svc.yaml", # Worker Service (headless) - ] + # 2. Process Jinja2 templates (standard OpenServerless pattern) + kus.processTemplate("spark", "spark-configmap-tpl.yaml", data, "spark-configmap.yaml") + kus.processTemplate("spark", "spark-master-sts-tpl.yaml", data, "spark-master-sts.yaml") - # Add History Server if enabled + # Process History Server templates if enabled if data['history_enabled']: - tplp.append("07-spark-history-dep.yaml") - tplp.append("08-spark-history-svc.yaml") + kus.processTemplate("spark", "spark-history-pvc-tpl.yaml", data, "spark-history-pvc.yaml") + kus.processTemplate("spark", "spark-history-dep-tpl.yaml", data, "spark-history-dep.yaml") + + # 3. Define kustomize patches (standard pattern) + tplp = ["set-attach.yaml"] + + # 4. Add affinity/tolerations if enabled (standard pattern) + if data.get('affinity') or data.get('tolerations'): + tplp.append("affinity-tolerance-sts-core-attach.yaml") - # 3. Generate kustomization with patches + # 5. Generate kustomization kust = kus.patchTemplates("spark", tplp, data) - # 4. Additional Jinja2 templates - templates = [] - if data['affinity']: - templates.append('affinity-tolerance-sts-core-attach.yaml') + # 6. Build complete specification using standard OpenServerless pattern + templates = ["spark-rbac.yaml"] # Static Jinja2 templates to include + templates_filter = ["spark-configmap.yaml", "spark-master-sts.yaml"] # Generated templates to filter - # 5. 
Build complete specification - spec = kus.kustom_list("spark", kust, templates=templates, data=data) + if data['history_enabled']: + templates_filter.extend(["spark-history-pvc.yaml", "spark-history-dep.yaml"]) + + spec = kus.restricted_kustom_list("spark", kust, templates=templates, + templates_filter=templates_filter, data=data) - # 6. Apply owner reference for garbage collection + # 7. Apply owner reference for garbage collection if owner: kopf.append_owner_reference(spec['items'], owner) else: # Save spec for delete without owner cfg.put("state.spark.spec", spec) - # 7. Deploy to Kubernetes + # 8. Deploy to Kubernetes res = kube.apply(spec) logging.info("spark manifests applied") - # 8. Wait for Master to be ready + # 9. Wait for components to be ready (standard pattern) logging.info("waiting for spark master to be ready...") util.wait_for_pod_ready( "{.items[?(@.metadata.labels.component == 'spark-master')].metadata.name}", @@ -145,25 +154,7 @@ def create(owner=None): ) logging.info("spark master is ready") - # 9. Wait for Workers to be ready - logging.info("waiting for spark workers to be ready...") - time.sleep(10) # Give workers time to start connecting - util.wait_for_pod_ready( - "{.items[?(@.metadata.labels.component == 'spark-worker')].metadata.name}", - timeout=300 - ) - logging.info("spark workers are ready") - - # 10. Wait for History Server if enabled - if data['history_enabled']: - logging.info("waiting for spark history server to be ready...") - util.wait_for_pod_ready( - "{.items[?(@.metadata.labels.component == 'spark-history')].metadata.name}", - timeout=180 - ) - logging.info("spark history server is ready") - - # 11. Post-configuration + # 10. 
Post-configuration configure_spark(data) logging.info("*** spark cluster created successfully") @@ -377,3 +368,473 @@ def get_cluster_info(): except Exception as e: logging.error(f"error getting cluster info: {e}") return {"error": str(e)} + + +# ==================== SparkJob CRD Handlers ==================== + +@kopf.on.create('nuvolaris.org', 'v1', 'sparkjobs') +def create_sparkjob(spec, name, namespace, status, **kwargs): + """ + Handle SparkJob creation - submit Spark application to cluster + + Args: + spec: SparkJob specification from CRD + name: SparkJob resource name + namespace: Kubernetes namespace + status: Status object to update + **kwargs: Additional kopf arguments + + Returns: + Status dict with job information + """ + logging.info(f"*** creating SparkJob {name} in namespace {namespace}") + + try: + # 1. Validate SparkJob specification + job_config = _validate_sparkjob_spec(spec, name) + + # 2. Create Kubernetes Job for Spark driver + driver_job = _create_spark_driver_job(job_config, name, namespace) + + # 3. Submit job to cluster + result = kube.apply(driver_job) + + # 4. Get application ID and update status + app_id = _get_spark_application_id(name, namespace) + + # 5. 
Build success status + status_dict = _build_sparkjob_status('Running', 'SparkJob submitted to cluster', + app_id=app_id, start_time=True, existing_status=status) + + # Add additional fields + status_dict['driverPod'] = f"{name}-driver" + status_dict['sparkUI'] = { + 'driverUI': f"http://{name}-driver:4040", + 'historyUI': f"http://spark-history:18080" + } + + logging.info(f"SparkJob {name} submitted successfully with application ID: {app_id}") + return status_dict + + except Exception as e: + logging.error(f"failed to create SparkJob {name}: {e}") + status_dict = _build_sparkjob_status('Failed', f'Job creation failed: {str(e)}', existing_status=status) + return status_dict + + +@kopf.on.delete('nuvolaris.org', 'v1', 'sparkjobs') +def delete_sparkjob(spec, name, namespace, **kwargs): + """ + Handle SparkJob deletion - cleanup driver job and associated resources + + Args: + spec: SparkJob specification from CRD + name: SparkJob resource name + namespace: Kubernetes namespace + **kwargs: Additional kopf arguments + """ + logging.info(f"*** deleting SparkJob {name} in namespace {namespace}") + + try: + # 1. Delete the Kubernetes Job for Spark driver + delete_job_spec = { + 'apiVersion': 'batch/v1', + 'kind': 'Job', + 'metadata': { + 'name': f"{name}-driver", + 'namespace': namespace + } + } + + result = kube.delete(delete_job_spec) + + # 2. 
Kill running Spark application if still active + app_id = spec.get('status', {}).get('applicationId') + if app_id: + _kill_spark_application(app_id, namespace) + + logging.info(f"SparkJob {name} deleted successfully") + return {'message': f'SparkJob {name} deleted'} + + except Exception as e: + logging.error(f"failed to delete SparkJob {name}: {e}") + # Don't raise - allow deletion to proceed even if cleanup fails + return {'message': f'SparkJob {name} deletion completed with warnings: {e}'} + + +@kopf.on.field('nuvolaris.org', 'v1', 'sparkjobs', field='spec') +def update_sparkjob(old, new, name, namespace, **kwargs): + """ + Handle SparkJob specification updates + + Args: + old: Previous specification + new: New specification + name: SparkJob resource name + namespace: Kubernetes namespace + **kwargs: Additional kopf arguments + """ + logging.info(f"*** updating SparkJob {name} in namespace {namespace}") + + # For now, SparkJob updates are not supported - would need to restart the job + logging.warning(f"SparkJob {name} specification changed, but updates are not supported") + logging.info("To apply changes, delete and recreate the SparkJob") + + return {'message': 'SparkJob updates not supported - delete and recreate to apply changes'} + + +def _validate_sparkjob_spec(spec, job_name): + """ + Validate and normalize SparkJob specification + + Args: + spec: Raw SparkJob spec from CRD + job_name: Name of the SparkJob resource + + Returns: + Dict with validated and normalized job configuration + + Raises: + ValueError: If specification is invalid + """ + # Default configuration + config = { + 'name': job_name, + 'application': {}, + 'spark': { + 'master': 'spark://spark-master:7077', + 'conf': {}, + 'driver': { + 'cores': 0.5, + 'memory': '512m', + 'serviceAccount': 'spark' + }, + 'executor': { + 'instances': 2, + 'cores': 1, + 'memory': '1g' + } + }, + 'execution': { + 'restartPolicy': 'OnFailure', + 'timeout': 3600, + 'backoffLimit': 3 + }, + 'dependencies': { + 
'jars': [], + 'files': [], + 'pyFiles': [] + }, + 'monitoring': { + 'enabled': True, + 'eventLog': True, + 'historyServer': True + } + } + + # Merge user specification + def merge_dict(target, source): + for key, value in source.items(): + if isinstance(value, dict) and key in target: + merge_dict(target[key], value) + else: + target[key] = value + + if spec: + merge_dict(config, spec) + + # Validate required fields + if not config['application'].get('mainApplicationFile'): + raise ValueError("application.mainApplicationFile is required") + + # Validate source configuration + app_source = config['application'].get('source', {}) + source_type = app_source.get('type', 'url') + + if source_type == 'configMap' and not app_source.get('configMapRef'): + raise ValueError("application.source.configMapRef is required for configMap source type") + elif source_type == 'secret' and not app_source.get('secretRef'): + raise ValueError("application.source.secretRef is required for secret source type") + elif source_type == 'url' and not app_source.get('url'): + raise ValueError("application.source.url is required for url source type") + elif source_type == 'inline' and not app_source.get('content'): + raise ValueError("application.source.content is required for inline source type") + + logging.info(f"validated SparkJob configuration for {job_name}") + return config + + +def _convert_k8s_memory_to_jvm(k8s_memory): + """ + Convert Kubernetes memory format (like '1Gi') to JVM format (like '1g') + """ + if k8s_memory.endswith('Gi'): + return k8s_memory[:-2] + 'g' + elif k8s_memory.endswith('Mi'): + return k8s_memory[:-2] + 'm' + elif k8s_memory.endswith('Ki'): + return k8s_memory[:-2] + 'k' + else: + # Already in JVM format or simple number + return k8s_memory + + +def _create_spark_driver_job(job_config, job_name, namespace): + """ + Create Kubernetes Job specification for Spark driver + + Args: + job_config: Validated SparkJob configuration + job_name: Name of the SparkJob resource 
+ namespace: Kubernetes namespace + + Returns: + Dict with Kubernetes Job specification + """ + app = job_config['application'] + spark = job_config['spark'] + execution = job_config['execution'] + deps = job_config['dependencies'] + monitoring = job_config['monitoring'] + + # Build spark-submit command with JVM-compatible memory formats + jvm_driver_memory = _convert_k8s_memory_to_jvm(spark['driver']['memory']) + jvm_executor_memory = _convert_k8s_memory_to_jvm(spark['executor']['memory']) + + submit_cmd = [ + '/opt/spark/bin/spark-submit', + '--master', spark['master'], + '--name', job_name, + '--driver-cores', str(spark['driver']['cores']), + '--driver-memory', jvm_driver_memory, + '--num-executors', str(spark['executor']['instances']), + '--executor-cores', str(spark['executor']['cores']), + '--executor-memory', jvm_executor_memory, + '--deploy-mode', 'client' # Client mode for Kubernetes Jobs + ] + + # Add Spark configuration + for key, value in spark['conf'].items(): + submit_cmd.extend(['--conf', f'{key}={value}']) + + # Add event logging configuration if enabled + if monitoring['eventLog']: + submit_cmd.extend([ + '--conf', 'spark.eventLog.enabled=true', + '--conf', 'spark.eventLog.dir=/tmp/spark-events' + ]) + + # Add dependencies + if deps['jars']: + submit_cmd.extend(['--jars', ','.join(deps['jars'])]) + if deps['files']: + submit_cmd.extend(['--files', ','.join(deps['files'])]) + if deps['pyFiles']: + submit_cmd.extend(['--py-files', ','.join(deps['pyFiles'])]) + + # Add main class if specified (for Java/Scala) - MUST come before JAR file + if app.get('mainClass'): + submit_cmd.extend(['--class', app['mainClass']]) + + # Add main application file + submit_cmd.append(app['mainApplicationFile']) + + # Add application arguments + if app.get('arguments'): + submit_cmd.extend(app['arguments']) + + # Create Job specification + job_spec = { + 'apiVersion': 'batch/v1', + 'kind': 'Job', + 'metadata': { + 'name': f"{job_name}-driver", + 'namespace': namespace, + 
'labels': { + 'app': 'spark', + 'component': 'driver', + 'sparkjob': job_name + } + }, + 'spec': { + 'backoffLimit': execution['backoffLimit'], + 'activeDeadlineSeconds': execution['timeout'], + 'template': { + 'metadata': { + 'labels': { + 'app': 'spark', + 'component': 'driver', + 'sparkjob': job_name + } + }, + 'spec': { + 'restartPolicy': execution['restartPolicy'], + 'serviceAccountName': spark['driver']['serviceAccount'], + 'containers': [{ + 'name': 'spark-driver', + 'image': cfg.get('spark.image', defval='apache/spark:3.5.0'), + 'command': submit_cmd, + 'env': [ + {'name': 'SPARK_USER', 'value': 'spark'}, + {'name': 'SPARK_APPLICATION_ID', 'value': job_name} + ], + 'resources': { + 'requests': { + 'cpu': f"{int(spark['driver']['cores']) * 100}m", + 'memory': spark['driver']['memory'] + }, + 'limits': { + 'cpu': f"{int(spark['driver']['cores']) * 100}m", + 'memory': spark['driver']['memory'] + } + }, + 'volumeMounts': [] + }], + 'volumes': [] + } + } + } + } + + # Add event log volume if enabled + if monitoring['eventLog'] and monitoring['historyServer']: + job_spec['spec']['template']['spec']['containers'][0]['volumeMounts'].append({ + 'name': 'spark-events', + 'mountPath': '/tmp/spark-events' + }) + job_spec['spec']['template']['spec']['volumes'].append({ + 'name': 'spark-events', + 'persistentVolumeClaim': { + 'claimName': 'spark-history-pvc' + } + }) + + return job_spec + + +def _build_sparkjob_status(phase, message, app_id=None, start_time=False, existing_status=None): + """ + Build SparkJob status dictionary + + Args: + phase: Job phase (Pending, Running, Succeeded, Failed) + message: Status message + app_id: Spark application ID (optional) + start_time: Whether to set start time (optional) + existing_status: Existing status to update (optional) + + Returns: + Dictionary with status fields + """ + import datetime + + # Start with existing status or empty dict + status_dict = existing_status.copy() if existing_status else {} + + # Set basic fields + 
status_dict['phase'] = phase + status_dict['message'] = message + + # Initialize conditions if not present + if 'conditions' not in status_dict: + status_dict['conditions'] = [] + + # Add/update condition + condition = { + 'type': 'Ready', + 'status': 'True' if phase == 'Running' else 'False', + 'lastTransitionTime': datetime.datetime.utcnow().isoformat() + 'Z', + 'reason': phase, + 'message': message + } + + # Remove old conditions of same type and add new one + status_dict['conditions'] = [c for c in status_dict['conditions'] if c['type'] != 'Ready'] + status_dict['conditions'].append(condition) + + # Add application ID if provided + if app_id: + status_dict['applicationId'] = app_id + + # Add start time if requested + if start_time: + status_dict['startTime'] = datetime.datetime.utcnow().isoformat() + 'Z' + + # Add completion time if job is finished + if phase in ['Succeeded', 'Failed']: + status_dict['completionTime'] = datetime.datetime.utcnow().isoformat() + 'Z' + + return status_dict + + +def _get_spark_application_id(job_name, namespace): + """ + Get Spark application ID from driver pod logs + + Args: + job_name: SparkJob resource name + namespace: Kubernetes namespace + + Returns: + String with application ID or None if not found + """ + try: + # Wait a bit for the driver to start + time.sleep(5) + + # Get logs from driver pod + result = subprocess.run( + ['kubectl', 'logs', '-n', namespace, + f'job/{job_name}-driver', + '--tail=100'], + capture_output=True, + text=True, + timeout=30 + ) + + if result.returncode == 0: + # Parse application ID from logs (format: application_1234567890_0001) + lines = result.stdout.split('\n') + for line in lines: + if 'Submitted application' in line and 'application_' in line: + # Extract application ID + parts = line.split() + for part in parts: + if part.startswith('application_'): + return part + + # If not found in initial logs, return generated ID + return f"application_{int(time.time())}_{job_name}" + + except 
Exception as e: + logging.warning(f"could not determine application ID for {job_name}: {e}") + return f"application_{int(time.time())}_{job_name}" + + +def _kill_spark_application(app_id, namespace): + """ + Kill running Spark application + + Args: + app_id: Spark application ID + namespace: Kubernetes namespace + """ + try: + # Try to kill via Spark master + result = subprocess.run( + ['kubectl', 'exec', '-n', namespace, + 'spark-master-0', '--', + 'curl', '-X', 'POST', + f'http://localhost:8080/app/kill/?id={app_id}'], + capture_output=True, + text=True, + timeout=10 + ) + + if result.returncode == 0: + logging.info(f"killed Spark application {app_id}") + else: + logging.warning(f"failed to kill application {app_id}: {result.stderr}") + + except Exception as e: + logging.warning(f"error killing Spark application {app_id}: {e}") diff --git a/nuvolaris/templates/sparkjob-driver-tpl.yaml b/nuvolaris/templates/sparkjob-driver-tpl.yaml new file mode 100644 index 0000000..334bc3b --- /dev/null +++ b/nuvolaris/templates/sparkjob-driver-tpl.yaml @@ -0,0 +1,117 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. 
+# +--- +apiVersion: batch/v1 +kind: Job +metadata: + name: {{job_name}}-driver + namespace: {{namespace}} + labels: + app: spark + component: driver + sparkjob: {{job_name}} +spec: + backoffLimit: {{backoff_limit}} + activeDeadlineSeconds: {{timeout}} + template: + metadata: + labels: + app: spark + component: driver + sparkjob: {{job_name}} + spec: + restartPolicy: {{restart_policy}} + serviceAccountName: {{service_account}} + containers: + - name: spark-driver + image: {{spark_image}} + command: + - spark-submit + - --master + - {{spark_master}} + - --name + - {{job_name}} + - --driver-cores + - "{{driver_cores}}" + - --driver-memory + - {{driver_memory}} + - --num-executors + - "{{executor_instances}}" + - --executor-cores + - "{{executor_cores}}" + - --executor-memory + - {{executor_memory}} + - --deploy-mode + - client + {% if event_log_enabled %} + - --conf + - spark.eventLog.enabled=true + - --conf + - spark.eventLog.dir=/tmp/spark-events + {% endif %} + {% if spark_conf %} + {% for key, value in spark_conf.items() %} + - --conf + - {{key}}={{value}} + {% endfor %} + {% endif %} + {% if jars %} + - --jars + - {{jars|join(',')}} + {% endif %} + {% if files %} + - --files + - {{files|join(',')}} + {% endif %} + {% if py_files %} + - --py-files + - {{py_files|join(',')}} + {% endif %} + {% if main_class %} + - --class + - {{main_class}} + {% endif %} + - {{main_application_file}} + {% if arguments %} + {% for arg in arguments %} + - {{arg}} + {% endfor %} + {% endif %} + env: + - name: SPARK_USER + value: spark + - name: SPARK_APPLICATION_ID + value: {{job_name}} + resources: + requests: + cpu: "{{driver_cores}}" + memory: {{driver_memory}} + limits: + cpu: "{{driver_cores}}" + memory: {{driver_memory}} + {% if event_log_enabled and history_server_enabled %} + volumeMounts: + - name: spark-events + mountPath: /tmp/spark-events + {% endif %} + {% if event_log_enabled and history_server_enabled %} + volumes: + - name: spark-events + persistentVolumeClaim: + 
claimName: spark-history-pvc + {% endif %} \ No newline at end of file From cdf0dd7caaf86c3020e393c6a67c17c29f19b852 Mon Sep 17 00:00:00 2001 From: Alessio Marinelli Date: Fri, 14 Nov 2025 18:00:45 +0100 Subject: [PATCH 4/9] feat: Add Spark cluster templates and deployment configs - Add Spark cluster deployment templates (master, worker, history) - Update Spark StatefulSets and Deployment configurations - Add Spark ConfigMap and RBAC templates - Update kaniko build and operator deployment configs - Update README with Spark operator documentation --- README.md | 61 ++++++++++ deploy/spark/03-spark-master-sts.yaml | 38 ++++-- deploy/spark/05-spark-worker-sts.yaml | 31 +++-- deploy/spark/07-spark-history-dep.yaml | 17 ++- kaniko-build.yaml | 2 +- nuvolaris/templates/spark-configmap-tpl.yaml | 40 +++++++ .../templates/spark-history-dep-tpl.yaml | 91 ++++++++++++++ .../templates/spark-history-pvc-tpl.yaml | 17 +++ nuvolaris/templates/spark-master-sts-tpl.yaml | 112 ++++++++++++++++++ nuvolaris/templates/spark-rbac.yaml | 34 ++++++ operator-deploy.yaml | 28 ++++- 11 files changed, 439 insertions(+), 32 deletions(-) create mode 100644 nuvolaris/templates/spark-configmap-tpl.yaml create mode 100644 nuvolaris/templates/spark-history-dep-tpl.yaml create mode 100644 nuvolaris/templates/spark-history-pvc-tpl.yaml create mode 100644 nuvolaris/templates/spark-master-sts-tpl.yaml create mode 100644 nuvolaris/templates/spark-rbac.yaml diff --git a/README.md b/README.md index 7f6ec17..6b3e476 100644 --- a/README.md +++ b/README.md @@ -56,6 +56,67 @@ task deploy Once you have finished with development you can create a public image with `task publish` that will publish the tag and trigger a creation of the image. +## Spark Operator Integration + +OpenServerless includes a Spark operator that provides automated Spark cluster deployment and SparkJob CRD support for job execution. The Spark operator follows standard OpenServerless patterns for resource management. 
+ +### Quick Start + +1. **Deploy Spark Operator and Cluster**: + ```bash + task spark-standard + ``` + +2. **Test SparkJob CRD**: + ```bash + task sparkjob-deploy-crd + task sparkjob-test-examples + ``` + +3. **Access Spark UI** (with port-forwarding): + ```bash + kubectl port-forward -n nuvolaris service/spark-master 8080:8080 # Master UI + kubectl port-forward -n nuvolaris service/spark-history 18080:18080 # History Server + ``` + +### SparkJob CRD + +The SparkJob Custom Resource Definition enables automated execution of Spark applications. See [SparkJob Documentation](docs/SPARKJOB.md) for complete usage guide including: + +- PySpark, Scala, and Java application examples +- Inline code execution +- Resource configuration +- Monitoring and logging +- Troubleshooting guide + +### Standard OpenServerless Integration + +The Spark operator follows standard OpenServerless patterns: +- **Templates**: Uses Jinja2 templates in `/nuvolaris/templates/` +- **Build Pipeline**: Standard GHCR workflow with `spark-build-ghcr`, `spark-push-ghcr`, `spark-all-ghcr` tasks +- **Configuration**: Uses `.env` pattern for GHCR credentials +- **Kopf Handlers**: Standard `@kopf.on.create`, `@kopf.on.delete` patterns for CRD lifecycle + +### GHCR Integration (Optional) + +For production deployments, configure GitHub Container Registry in `.env`: +```bash +GITHUB_USER= +GHCR_USER= +GHCR_TOKEN= +``` + +Generate token: GitHub → Settings → Developer settings → Personal access tokens → scopes: `read:packages`, `write:packages` + +### Monitoring + +Check operator logs: +```bash +kubectl -n nuvolaris logs pod/nuvolaris-operator-spark | grep -i spark +``` + +If pods show `ImagePullBackOff`, verify registry configuration or use GHCR workflow. + ## Prerequisites 1. 
Please set up and use a development VM [as described here](https://github.com/apache/openserverless) diff --git a/deploy/spark/03-spark-master-sts.yaml b/deploy/spark/03-spark-master-sts.yaml index 87084b3..89cb09a 100644 --- a/deploy/spark/03-spark-master-sts.yaml +++ b/deploy/spark/03-spark-master-sts.yaml @@ -30,14 +30,25 @@ spec: - /bin/bash - -c - | - # Copy config files - cp /etc/spark-config/* /opt/spark/conf/ + # Prepare directories + mkdir -p /opt/spark/conf /opt/spark/logs /opt/spark/work - # Start Spark Master + # Copy config files if present + if ls /etc/spark-config/* >/dev/null 2>&1; then + cp -r /etc/spark-config/* /opt/spark/conf/ || true + fi + + # Start Spark Master (daemon mode disabled to keep process in foreground) + export SPARK_NO_DAEMONIZE=true /opt/spark/sbin/start-master.sh - # Keep container running and tail logs - tail -f /opt/spark/logs/* + # Fallback sleep if no logs yet + if ! ls /opt/spark/logs/* >/dev/null 2>&1; then + echo "No logs yet, sleeping..."; sleep 5; + fi + + # Keep container running + tail -F /opt/spark/logs/* || sleep infinity ports: - name: master containerPort: 7077 @@ -48,8 +59,8 @@ spec: env: - name: SPARK_MODE value: "master" - - name: SPARK_MASTER_HOST - value: "spark-master" + - name: SPARK_LOCAL_IP + value: "0.0.0.0" - name: SPARK_MASTER_PORT value: "7077" - name: SPARK_MASTER_WEBUI_PORT @@ -57,14 +68,14 @@ spec: - name: SPARK_DAEMON_MEMORY value: "1g" - name: SPARK_NO_DAEMONIZE - value: "false" + value: "true" resources: requests: + cpu: 200m + memory: 512Mi + limits: cpu: 500m memory: 1Gi - limits: - cpu: 1000m - memory: 2Gi volumeMounts: - name: spark-config mountPath: /etc/spark-config @@ -72,6 +83,8 @@ spec: mountPath: /opt/spark/work - name: spark-logs mountPath: /opt/spark/logs + - name: spark-events + mountPath: /tmp/spark-events livenessProbe: httpGet: path: / @@ -94,3 +107,6 @@ spec: emptyDir: {} - name: spark-logs emptyDir: {} + - name: spark-events + persistentVolumeClaim: + claimName: 
spark-history-pvc diff --git a/deploy/spark/05-spark-worker-sts.yaml b/deploy/spark/05-spark-worker-sts.yaml index 47e339e..1194101 100644 --- a/deploy/spark/05-spark-worker-sts.yaml +++ b/deploy/spark/05-spark-worker-sts.yaml @@ -30,14 +30,25 @@ spec: - /bin/bash - -c - | - # Copy config files - cp /etc/spark-config/* /opt/spark/conf/ + # Prepare directories + mkdir -p /opt/spark/conf /opt/spark/logs /opt/spark/work - # Start Spark Worker + # Copy config files if present + if ls /etc/spark-config/* >/dev/null 2>&1; then + cp -r /etc/spark-config/* /opt/spark/conf/ || true + fi + + # Start Spark Worker (daemon mode disabled) + export SPARK_NO_DAEMONIZE=true /opt/spark/sbin/start-worker.sh spark://spark-master:7077 - # Keep container running and tail logs - tail -f /opt/spark/logs/* + # Fallback sleep if no logs yet + if ! ls /opt/spark/logs/* >/dev/null 2>&1; then + echo "No logs yet, sleeping..."; sleep 5; + fi + + # Keep container running + tail -F /opt/spark/logs/* || sleep infinity ports: - name: worker containerPort: 7078 @@ -53,7 +64,7 @@ spec: - name: SPARK_WORKER_CORES value: "2" - name: SPARK_WORKER_MEMORY - value: "2g" + value: "1g" - name: SPARK_WORKER_PORT value: "7078" - name: SPARK_WORKER_WEBUI_PORT @@ -61,14 +72,14 @@ spec: - name: SPARK_DAEMON_MEMORY value: "1g" - name: SPARK_NO_DAEMONIZE - value: "false" + value: "true" resources: requests: + cpu: 500m + memory: 1Gi + limits: cpu: 1000m memory: 2Gi - limits: - cpu: 2000m - memory: 4Gi volumeMounts: - name: spark-config mountPath: /etc/spark-config diff --git a/deploy/spark/07-spark-history-dep.yaml b/deploy/spark/07-spark-history-dep.yaml index cbed9d3..12b97a7 100644 --- a/deploy/spark/07-spark-history-dep.yaml +++ b/deploy/spark/07-spark-history-dep.yaml @@ -29,8 +29,13 @@ spec: - /bin/bash - -c - | - # Copy config files - cp /etc/spark-config/* /opt/spark/conf/ + # Prepare directories + mkdir -p /opt/spark/conf /opt/spark/logs + + # Copy config files if present + if ls /etc/spark-config/* 
>/dev/null 2>&1; then + cp -r /etc/spark-config/* /opt/spark/conf/ || true + fi # Create event log directory mkdir -p /tmp/spark-events @@ -52,11 +57,11 @@ spec: value: "-Dspark.history.fs.logDirectory=/tmp/spark-events -Dspark.history.ui.port=18080" resources: requests: - cpu: 250m - memory: 512Mi + cpu: 150m + memory: 256Mi limits: - cpu: 500m - memory: 1Gi + cpu: 300m + memory: 512Mi volumeMounts: - name: spark-config mountPath: /etc/spark-config diff --git a/kaniko-build.yaml b/kaniko-build.yaml index 2e7dda6..8f8d64b 100644 --- a/kaniko-build.yaml +++ b/kaniko-build.yaml @@ -30,7 +30,7 @@ spec: - name: kaniko image: gcr.io/kaniko-project/executor:latest args: - - --context=git://github.com/${GITHUB_USER}/openserverless-operator + - --context=git://github.com/${GITHUB_USER}/openserverless-operator#feature/add-spark-operator - --destination=docker-registry.kube-system.svc.cluster.local:5000/${GITHUB_USER}/openserverless-operator:${TAG} - --insecure - --insecure-registry=docker-registry diff --git a/nuvolaris/templates/spark-configmap-tpl.yaml b/nuvolaris/templates/spark-configmap-tpl.yaml new file mode 100644 index 0000000..7f61fdb --- /dev/null +++ b/nuvolaris/templates/spark-configmap-tpl.yaml @@ -0,0 +1,40 @@ +apiVersion: v1 +kind: ConfigMap +metadata: + name: spark-config + namespace: {{namespace}} + labels: + app: spark + component: spark-config +data: + spark-defaults.conf: | + spark.master spark://spark-master:{{master_port}} + spark.eventLog.enabled {{event_log_enabled|lower}} + spark.eventLog.dir {{event_log_dir}} + spark.history.fs.logDirectory {{event_log_dir}} + spark.sql.warehouse.dir /opt/spark/work-dir/spark-warehouse + spark.driver.host spark-master-0.spark-master.{{namespace}}.svc.cluster.local + spark.driver.bindAddress 0.0.0.0 + {% if ha_enabled %} + spark.deploy.recoveryMode ZOOKEEPER + spark.deploy.zookeeper.url {{ha_zookeeper_url}} + {% endif %} + log4j.properties: | + # Root logger option + log4j.rootLogger=INFO, console + + # Console 
appender configuration + log4j.appender.console=org.apache.log4j.ConsoleAppender + log4j.appender.console.layout=org.apache.log4j.PatternLayout + log4j.appender.console.layout.ConversionPattern=%d{yy/MM/dd HH:mm:ss} %p %c{1}: %m%n + + # Set specific log levels + log4j.logger.org.apache.spark.repl.Main=WARN + log4j.logger.org.spark_project.jetty=WARN + log4j.logger.org.spark_project.jetty.util.component.AbstractLifeCycle=ERROR + log4j.logger.org.apache.spark.repl.SparkIMain$exprTyper=WARN + log4j.logger.org.apache.spark.repl.SparkILoop$SparkILoopInterpreter=WARN + log4j.logger.org.apache.parquet=ERROR + log4j.logger.parquet=ERROR + log4j.logger.org.apache.hadoop.hive.metastore.RetryingHMSHandler=FATAL + log4j.logger.org.apache.hadoop.hive.ql.exec.FunctionRegistry=ERROR \ No newline at end of file diff --git a/nuvolaris/templates/spark-history-dep-tpl.yaml b/nuvolaris/templates/spark-history-dep-tpl.yaml new file mode 100644 index 0000000..ee88892 --- /dev/null +++ b/nuvolaris/templates/spark-history-dep-tpl.yaml @@ -0,0 +1,91 @@ +apiVersion: apps/v1 +kind: Deployment +metadata: + name: spark-history + namespace: {{namespace}} + labels: + app: spark + component: spark-history +spec: + replicas: 1 + selector: + matchLabels: + app: spark + component: spark-history + template: + metadata: + labels: + app: spark + component: spark-history + spec: + serviceAccountName: spark + securityContext: + fsGroup: {{spark_uid}} + containers: + - name: spark-history + image: {{spark_image}} + imagePullPolicy: IfNotPresent + command: + - /bin/bash + - -c + - | + # Prepare directories + mkdir -p /opt/spark/conf /opt/spark/logs {{event_log_dir}} + chmod 777 {{event_log_dir}} + + # Copy config files if present + if ls /etc/spark-config/* >/dev/null 2>&1; then + cp -r /etc/spark-config/* /opt/spark/conf/ || true + fi + + # Start History Server + /opt/spark/sbin/start-history-server.sh + + # Keep container running and tail logs + tail -f /opt/spark/logs/* + ports: + - name: webui + 
containerPort: {{history_port}} + protocol: TCP + env: + - name: SPARK_NO_DAEMONIZE + value: "false" + - name: SPARK_HISTORY_OPTS + value: "-Dspark.history.fs.logDirectory={{event_log_dir}} -Dspark.history.ui.port={{history_port}}" + resources: + requests: + cpu: 150m + memory: 256Mi + limits: + cpu: 300m + memory: 512Mi + volumeMounts: + - name: spark-config + mountPath: /etc/spark-config + - name: spark-events + mountPath: {{event_log_dir}} + - name: spark-logs + mountPath: /opt/spark/logs + livenessProbe: + httpGet: + path: / + port: webui + initialDelaySeconds: 30 + periodSeconds: 10 + timeoutSeconds: 5 + readinessProbe: + httpGet: + path: / + port: webui + initialDelaySeconds: 15 + periodSeconds: 5 + timeoutSeconds: 3 + volumes: + - name: spark-config + configMap: + name: spark-config + - name: spark-events + persistentVolumeClaim: + claimName: spark-history-pvc + - name: spark-logs + emptyDir: {} \ No newline at end of file diff --git a/nuvolaris/templates/spark-history-pvc-tpl.yaml b/nuvolaris/templates/spark-history-pvc-tpl.yaml new file mode 100644 index 0000000..22797bb --- /dev/null +++ b/nuvolaris/templates/spark-history-pvc-tpl.yaml @@ -0,0 +1,17 @@ +apiVersion: v1 +kind: PersistentVolumeClaim +metadata: + name: spark-history-pvc + namespace: {{namespace}} + labels: + app: spark + component: spark-history +spec: + accessModes: + - ReadWriteOnce + {% if storage_class %} + storageClassName: {{storage_class}} + {% endif %} + resources: + requests: + storage: {{history_volume_size}}Gi \ No newline at end of file diff --git a/nuvolaris/templates/spark-master-sts-tpl.yaml b/nuvolaris/templates/spark-master-sts-tpl.yaml new file mode 100644 index 0000000..9b4d1d3 --- /dev/null +++ b/nuvolaris/templates/spark-master-sts-tpl.yaml @@ -0,0 +1,112 @@ +apiVersion: apps/v1 +kind: StatefulSet +metadata: + name: spark-master + namespace: {{namespace}} + labels: + app: spark + component: spark-master +spec: + serviceName: spark-master + replicas: {{master_replicas}} + 
selector: + matchLabels: + app: spark + component: spark-master + template: + metadata: + labels: + app: spark + component: spark-master + spec: + serviceAccountName: spark + securityContext: + fsGroup: {{spark_uid}} + containers: + - name: spark-master + image: {{spark_image}} + imagePullPolicy: IfNotPresent + command: + - /bin/bash + - -c + - | + # Prepare directories + mkdir -p /opt/spark/conf /opt/spark/logs /opt/spark/work {{event_log_dir}} + + # Copy config files if present + if ls /etc/spark-config/* >/dev/null 2>&1; then + cp -r /etc/spark-config/* /opt/spark/conf/ || true + fi + + # Start Spark Master (daemon mode disabled to keep process in foreground) + export SPARK_NO_DAEMONIZE=true + /opt/spark/sbin/start-master.sh + + # Fallback sleep if no logs yet + if ! ls /opt/spark/logs/* >/dev/null 2>&1; then + echo "No logs yet, sleeping..."; sleep 5; + fi + + # Keep container running + tail -F /opt/spark/logs/* || sleep infinity + ports: + - name: master + containerPort: {{master_port}} + protocol: TCP + - name: webui + containerPort: {{master_webui_port}} + protocol: TCP + env: + - name: SPARK_MODE + value: "master" + - name: SPARK_LOCAL_IP + value: "0.0.0.0" + - name: SPARK_MASTER_PORT + value: "{{master_port}}" + - name: SPARK_MASTER_WEBUI_PORT + value: "{{master_webui_port}}" + - name: SPARK_DAEMON_MEMORY + value: "{{master_memory}}" + - name: SPARK_NO_DAEMONIZE + value: "true" + resources: + requests: + cpu: 200m + memory: 512Mi + limits: + cpu: {{master_cpu}} + memory: {{master_memory}} + volumeMounts: + - name: spark-config + mountPath: /etc/spark-config + - name: spark-work + mountPath: /opt/spark/work + - name: spark-logs + mountPath: /opt/spark/logs + - name: spark-events + mountPath: {{event_log_dir}} + livenessProbe: + httpGet: + path: / + port: webui + initialDelaySeconds: 30 + periodSeconds: 10 + timeoutSeconds: 5 + readinessProbe: + httpGet: + path: / + port: webui + initialDelaySeconds: 15 + periodSeconds: 5 + timeoutSeconds: 3 + volumes: + - 
name: spark-config + configMap: + name: spark-config + - name: spark-work + emptyDir: {} + - name: spark-logs + emptyDir: {} + - name: spark-events + persistentVolumeClaim: + claimName: spark-history-pvc \ No newline at end of file diff --git a/nuvolaris/templates/spark-rbac.yaml b/nuvolaris/templates/spark-rbac.yaml new file mode 100644 index 0000000..39b6564 --- /dev/null +++ b/nuvolaris/templates/spark-rbac.yaml @@ -0,0 +1,34 @@ +apiVersion: v1 +kind: ServiceAccount +metadata: + name: spark + namespace: {{namespace}} + labels: + app: spark +--- +apiVersion: rbac.authorization.k8s.io/v1 +kind: Role +metadata: + name: spark + namespace: {{namespace}} +rules: +- apiGroups: [""] + resources: ["pods", "services", "endpoints", "persistentvolumeclaims", "events", "configmaps", "secrets"] + verbs: ["*"] +- apiGroups: ["apps"] + resources: ["deployments", "statefulsets", "replicasets"] + verbs: ["*"] +--- +apiVersion: rbac.authorization.k8s.io/v1 +kind: RoleBinding +metadata: + name: spark + namespace: {{namespace}} +subjects: +- kind: ServiceAccount + name: spark + namespace: {{namespace}} +roleRef: + kind: Role + name: spark + apiGroup: rbac.authorization.k8s.io \ No newline at end of file diff --git a/operator-deploy.yaml b/operator-deploy.yaml index e3e965b..1400b3e 100644 --- a/operator-deploy.yaml +++ b/operator-deploy.yaml @@ -18,9 +18,29 @@ apiVersion: v1 kind: Pod metadata: - name: dummy-operator - namespace: default + name: nuvolaris-operator + namespace: nuvolaris + labels: + app: nuvolaris-operator + spark-integrated: "true" + annotations: + whisks.nuvolaris.org/annotate-version: "true" spec: + serviceAccount: nuvolaris-operator containers: - - name: dummy-operator - image: ghcr.io/$GITHUB_USER/openserverless-operator:$TAG \ No newline at end of file + - name: nuvolaris-operator + # usa l'immagine costruita da Kaniko nel registro locale del cluster (kube-system) + image: 
docker-registry.kube-system.svc.cluster.local:5000/$GITHUB_USER/openserverless-operator:$TAG + imagePullPolicy: Always + command: ["./run.sh"] + args: ["--verbose"] + env: + - name: ENABLE_SPARK + value: "true" + resources: + requests: + cpu: 100m + memory: 256Mi + limits: + cpu: 500m + memory: 512Mi \ No newline at end of file From cd051580d4c231f1819309e81a21f3810e8f73f3 Mon Sep 17 00:00:00 2001 From: Alessio Marinelli Date: Fri, 14 Nov 2025 18:12:52 +0100 Subject: [PATCH 5/9] refactor: Move specialized Taskfiles to task project - Remove TaskfileBuild.yml, TaskfileDev.yml, TaskfileOlaris.yml, TaskfileTest.yml - These belong to the task project, not operator project - Keep main Taskfile.yml for operator-specific tasks --- TaskfileBuild.yml | 63 -------------- TaskfileDev.yml | 104 ----------------------- TaskfileOlaris.yml | 29 ------- TaskfileTest.yml | 202 --------------------------------------------- 4 files changed, 398 deletions(-) delete mode 100644 TaskfileBuild.yml delete mode 100644 TaskfileDev.yml delete mode 100644 TaskfileOlaris.yml delete mode 100644 TaskfileTest.yml diff --git a/TaskfileBuild.yml b/TaskfileBuild.yml deleted file mode 100644 index 8a9fe2b..0000000 --- a/TaskfileBuild.yml +++ /dev/null @@ -1,63 +0,0 @@ -# Licensed to the Apache Software Foundation (ASF) under one -# or more contributor license agreements. See the NOTICE file -# distributed with this work for additional information -# regarding copyright ownership. The ASF licenses this file -# to you under the Apache License, Version 2.0 (the -# "License"); you may not use this file except in compliance -# with the License. You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, -# software distributed under the License is distributed on an -# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -# KIND, either express or implied. 
See the License for the -# specific language governing permissions and limitations -# under the License. -# -version: '3' - -vars: - # taken from the tag - OPERATOR_TAG: - sh: git describe --tags --abbrev=0 2>/dev/null || git rev-parse --short HEAD - # taken from the Dockerfile - ovverdable with MY_OPERATOR_IMAGE - OPERATOR_IMAGE: - sh: awk -F= '/ARG OPERATOR_IMAGE_DEFAULT=/ { print $2 ; exit }' Dockerfile - -tasks: - image: - cmds: - - echo "Operator=${MY_OPERATOR_IMAGE:-{{.OPERATOR_IMAGE}}}:{{.OPERATOR_TAG}}" - silent: true - - docker-login: > - echo $GITHUB_TOKEN | docker login ghcr.io -u $GITHUB_USER --password-stdin - - build: - - > - docker build . - -t ${MY_OPERATOR_IMAGE:-{{.OPERATOR_IMAGE}}}:{{.OPERATOR_TAG}} - --build-arg OPERATOR_IMAGE_DEFAULT=${MY_OPERATOR_IMAGE:-{{.OPERATOR_IMAGE}}} - --build-arg OPERATOR_TAG_DEFAULT={{.OPERATOR_TAG}} --load - - build-and-push: - - task: docker-login - - task: build - - docker push ${MY_OPERATOR_IMAGE:-{{.OPERATOR_IMAGE}}}:{{.OPERATOR_TAG}} - - build-and-load: - - task: build - - > - kind load docker-image - ${MY_OPERATOR_IMAGE:-{{.OPERATOR_IMAGE}}}:{{.OPERATOR_TAG}} - --name=$(kind get clusters | head -1) - - buildx-and-push: - - > - docker buildx build - --platform linux/amd64,linux/arm64 - -t ${MY_OPERATOR_IMAGE:-{{.OPERATOR_IMAGE}}}:{{.OPERATOR_TAG}} - --build-arg OPERATOR_IMAGE_DEFAULT=${MY_OPERATOR_IMAGE:-{{.OPERATOR_IMAGE}}} - --build-arg OPERATOR_TAG_DEFAULT={{.OPERATOR_TAG}} - . --push diff --git a/TaskfileDev.yml b/TaskfileDev.yml deleted file mode 100644 index 17f05c4..0000000 --- a/TaskfileDev.yml +++ /dev/null @@ -1,104 +0,0 @@ -# Licensed to the Apache Software Foundation (ASF) under one -# or more contributor license agreements. See the NOTICE file -# distributed with this work for additional information -# regarding copyright ownership. The ASF licenses this file -# to you under the Apache License, Version 2.0 (the -# "License"); you may not use this file except in compliance -# with the License. 
You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, -# software distributed under the License is distributed on an -# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -# KIND, either express or implied. See the License for the -# specific language governing permissions and limitations -# under the License. -# -version: '3' - -vars: - WHISK: '{{default "whisk" .WHISK}}' - CONFIG: "tests/{{.KUBE}}/{{.WHISK}}.yaml" - # taken from the tag - OPERATOR_TAG: - sh: git describe --tags --abbrev=0 2>/dev/null || git rev-parse --short HEAD - # taken from the Dockerfile - ovveridable with MY_OPERATOR_IMAGE - OPERATOR_IMAGE: - sh: awk -F= '/ARG OPERATOR_IMAGE_DEFAULT=/ { print $2 ; exit }' Dockerfile - # taken from the Dockerfile - CONTROLLER_TAG: - sh: awk -F= '/ENV CONTROLLER_TAG=/ { print $2 ; exit }' Dockerfile - CONTROLLER_IMAGE: - sh: awk -F= '/ENV CONTROLLER_IMAGE=/ { print $2 ; exit }' Dockerfile - # taken from the Dockerfile - INVOKER_TAG: - sh: awk -F= '/ENV INVOKER_TAG=/ { print $2 ; exit }' Dockerfile - INVOKER_IMAGE: - sh: awk -F= '/ENV INVOKER_IMAGE=/ { print $2 ; exit }' Dockerfile - APIKUBE: - sh: kubectl config view -o json | jq -r '.clusters[0].cluster.server' - -env: - APIHOST: - sh: echo {{.APIKUBE}} | sed -e 's|^.*//\(.*\):.*$|\1|' - APIHOST_OSH: - sh: echo {{.APIKUBE}} | sed -e 's|^.*//\(.*\):.*$|\1|' | sed -e 's/^api./nuvolaris.apps./' - -tasks: - - env: env - - cli: - cmds: - - env OPERATOR_IMAGE="${MY_OPERATOR_IMAGE:-{{.OPERATOR_IMAGE}}}" poetry run ipython -i profile.ipy - env: - CONTROLLER_IMAGE: "{{.CONTROLLER_IMAGE}}" - CONTROLLER_TAG: "{{.CONTROLLER_TAG}}" - OPERATOR_TAG: "{{.OPERATOR_TAG}}" - - run: - cmds: - # note that the .env is loaded after the env parameters - - env OPERATOR_IMAGE="${MY_OPERATOR_IMAGE:-{{.OPERATOR_IMAGE}}}" ./run.sh - env: - CONTROLLER_IMAGE: "{{.CONTROLLER_IMAGE}}" - CONTROLLER_TAG: "{{.CONTROLLER_TAG}}" - 
OPERATOR_TAG: "{{.OPERATOR_TAG}}" - COUCHDB_SERVICE_HOST: "localhost" - MINIO_API_HOST: "localhost" - OW_CONTROLLER_HOST : "localhost" - OW_CONTROLLER_PORT : 3233 - INVOKER_IMAGE: "{{.INVOKER_IMAGE}}" - INVOKER_TAG: "{{.INVOKER_TAG}}" - - instance: - - envsubst <{{.CONFIG}} | kubectl -n nuvolaris apply -f - - - show: - - echo "*** {{.CONFIG}}" - - envsubst <{{.CONFIG}} - - destroy: kubectl -n nuvolaris delete wsk/controller - - couchdb: - - rm -f deploy/*/kustomization.yaml deploy/*/__* - - task: permission - - kubectl apply -f deploy/couchdb - - couchdb-forward: - - kubectl -n nuvolaris port-forward svc/couchdb 5985:5984 - - mongodb: - - kubectl apply -f deploy/mongodb-operator - - kubectl apply -f deploy/mongodb - - defin: > - kubectl -n nuvolaris patch wsk/controller --type=merge --patch '{"metadata": {"finalizers":[] } }' - - defin2: kubectl -n nuvolaris get wsk/controller -o yaml | grep -v Final | kubectl apply -f - - - killpy: ps auwwx | grep python | awk '{print $2}' | xargs kill -9 - - enter: kubectl exec -ti nuvolaris-operator -- bash - diff --git a/TaskfileOlaris.yml b/TaskfileOlaris.yml deleted file mode 100644 index 6e80ad4..0000000 --- a/TaskfileOlaris.yml +++ /dev/null @@ -1,29 +0,0 @@ -# Licensed to the Apache Software Foundation (ASF) under one -# or more contributor license agreements. See the NOTICE file -# distributed with this work for additional information -# regarding copyright ownership. The ASF licenses this file -# to you under the Apache License, Version 2.0 (the -# "License"); you may not use this file except in compliance -# with the License. You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, -# software distributed under the License is distributed on an -# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -# KIND, either express or implied. 
See the License for the -# specific language governing permissions and limitations -# under the License. - -version: '3' - -env: - DP: deploy/nuvolaris-permissions - OK: olaris/kubernetes - -tasks: - copy-yaml: - - cp -v $DP/nuvolaris-common.yaml $DP/openwhisk-runtimes-cm.yaml $OK/common - - cp -v $DP/openwhisk-core-roles.yaml $DP/operator-clusterroles.yaml $DP/operator-roles.yaml $OK/roles - - cp -v $DP/whisk-crd.yaml $DP/whisk-user-crd.yaml $DP/workflows-crd.yaml $OK/crds - \ No newline at end of file diff --git a/TaskfileTest.yml b/TaskfileTest.yml deleted file mode 100644 index 0ee165b..0000000 --- a/TaskfileTest.yml +++ /dev/null @@ -1,202 +0,0 @@ -# Licensed to the Apache Software Foundation (ASF) under one -# or more contributor license agreements. See the NOTICE file -# distributed with this work for additional information -# regarding copyright ownership. The ASF licenses this file -# to you under the Apache License, Version 2.0 (the -# "License"); you may not use this file except in compliance -# with the License. You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, -# software distributed under the License is distributed on an -# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -# KIND, either express or implied. See the License for the -# specific language governing permissions and limitations -# under the License. 
-# -version: '3' - -vars: - KUBE: - sh: ./detect.sh - # taken from the tag - OPERATOR_TAG: - sh: git describe --tags --abbrev=0 2>/dev/null || git rev-parse --short HEAD - # taken from the Dockerfile - ovverdable with MY_OPERATOR_IMAGE - OPERATOR_IMAGE: - sh: awk -F= '/ARG OPERATOR_IMAGE_DEFAULT=/ { print $2 ; exit }' Dockerfile - - WHISK: '{{default "whisk" .WHISK}}' - CONFIG: "tests/{{.KUBE}}/{{.WHISK}}.yaml" - REDIS_URI: "redis://s0meP%40ass4@redis:6379" - REDIS_PASSWORD: s0meP@ass4 - MINIO_HOST: "nuvolaris-minio" - MINIO_PORT: 9000 - MINIO_USER: "minioadmin" - MINIO_PWD : "minioadmin" - APIKUBE: - sh: kubectl config view -o json | jq -r '.clusters[0].cluster.server' - - T: "" - -env: - APIHOST: - sh: echo {{.APIKUBE}} | sed -e 's|^.*//\(.*\):.*$|\1|' - APIHOST_OSH: - sh: echo {{.APIKUBE}} | sed -e 's|^.*//\(.*\):.*$|\1|' | sed -e 's/^api./nuvolaris.apps./' - - -tasks: - - kustomization: - cmds: - - | - cat <<__EOF__ >deploy/nuvolaris-operator/kustomization.yaml - apiVersion: kustomize.config.k8s.io/v1beta1 - kind: Kustomization - images: - - name: ghcr.io/nuvolaris/nuvolaris-operator:latest - newName: ${MY_OPERATOR_IMAGE:-{{.OPERATOR_IMAGE}}} - newTag: {{.OPERATOR_TAG}} - resources: - - operator-pod.yaml - __EOF__ - - permission: - - kubectl apply -f deploy/nuvolaris-permissions - - operator: - - task: kustomization - - kubectl apply -k deploy/nuvolaris-operator - - | - while ! kubectl -n nuvolaris wait --for=condition=ready pod/nuvolaris-operator - do echo still waiting... - done - - destroy-operator: - - task: kustomization - - kubectl delete -k deploy/nuvolaris-operator - - instance: - silent: false - cmds: - - kubectl config set-context --current --namespace nuvolaris - - | - FILE="tests/{{.KUBE}}/{{.WHISK}}.yaml"; - if [ ! -f "$FILE" ]; then - echo "Manifest $FILE non trovato, fallback a tests/{{.KUBE}}/whisk.yaml"; - FILE="tests/{{.KUBE}}/whisk.yaml"; - fi - cat "$FILE" | envsubst | kubectl apply -f - - - | - while ! 
kubectl -n nuvolaris wait --for=condition=ready pod/couchdb-0 2>/dev/null - do sleep 5 ; echo $((N++)) waiting couchdb... - done - - | - while ! kubectl -n nuvolaris wait --for=condition=complete job/couchdb-init 2>/dev/null - do sleep 5 ; echo $((N++)) waiting couchdb init... - done - - | - N=1 - while ! kubectl -n nuvolaris wait --for=condition=ready pod/controller-0 2>/dev/null - do sleep 5 ; echo $((N++)) waiting controller... - done - - destroy-instance: - - (sleep 5 ; task defin) & - - kubectl -n nuvolaris delete wsk/controller - - destroy: - - task: destroy-instance - - task: destroy-operator - - cleanup: kubectl -n nuvolaris delete pvc --all - - config: |- - rm -f ~/.wskprops - while true - do APIHOST=$(kubectl -n nuvolaris get cm/config -o yaml | awk '/apihost:/ {print $2}') - if [[ `echo $APIHOST | grep pending` || -z "$APIHOST" ]]; - then sleep 5 ; echo "$((N++)) apihost still pending..." - else break - fi - done - echo "*** $APIHOST ***" - AUTH=$(kubectl -n nuvolaris get wsk/controller -o jsonpath='{.spec.openwhisk.namespaces.nuvolaris}') - echo $AUTH - echo ops -wsk property set --apihost $APIHOST --auth $AUTH - ops -wsk property set --apihost $APIHOST --auth $AUTH - while ! ops action list - do echo $(( N++)) "waiting for the load balancer to be ready..." 
; sleep 10 - done - - hello: - - ops -wsk -i action update hello tests/hello.js --web=true - - ops -wsk -i action invoke hello -r | grep "hello" - - | - URL=$(ops -wsk -i action get hello --url | tail +2) - curl -skL $URL | grep hello - - redis: - cmds: - - ops -wsk -i package update redis -p redis_url "{{.REDIS_NUV_URL}}" -p redis_prefix "{{.REDIS_NUV_PREFIX}}" -p password "{{.REDIS_PASSWORD}}" - - ops -wsk -i action update redis/ping tests/ping.js - - ops -wsk -i action invoke redis/ping -r | grep "PONG" - - ops -wsk -i action update redis/redis tests/redis.js - - ops -wsk -i action invoke redis/redis -r | grep "world" - vars: - REDIS_NUV_PREFIX: - sh: kubectl -n nuvolaris get cm/config -o jsonpath='{.metadata.annotations.redis_prefix}' - REDIS_NUV_URL: - sh: kubectl -n nuvolaris get cm/config -o jsonpath='{.metadata.annotations.redis_url}' - - echo: - - ops -wsk -i action update echo tests/echo.js -a provide-api-key true - - ops -wsk -i action invoke echo -r | grep "__OW_API_KEY" - - ops -wsk -i action invoke echo -r | grep "__OW_API_HOST" - - api: - - ops -wsk -i action update api tests/api.js -a provide-api-key true - - ops -wsk -i action invoke api -r | grep '"api"' - - mongo: - - | - MONGODB_URL=$(kubectl -n nuvolaris get cm/config -o jsonpath='{.metadata.annotations.mongodb_url}') - wsk -i package update mongo -p dburi "$MONGODB_URL" - - ops -wsk -i action update mongo/mongo tests/mongo.js - - ops -wsk -i action invoke mongo/mongo -r | grep "hello" - - minio: - - ops -wsk -i package update minio -p minio_host {{.MINIO_HOST}} -p minio_port {{.MINIO_PORT}} -p minio_user {{.MINIO_USER}} -p minio_pwd {{.MINIO_PWD}} - - ops -wsk -i action update minio/minio tests/minio.js - - ops -wsk -i action invoke minio/minio -r - - mongo2: - - ops -wsk -i project deploy --manifest tests/mongo.yaml - - postgres: - - | - PG_URL=$(kubectl -n nuvolaris get cm/config -o jsonpath='{.metadata.annotations.postgres_url}') - ops -wsk -i package update postgres -p dburi "$PG_URL" - - 
ops -wsk -i action update postgres/postgres tests/postgres.js - - ops -wsk -i action invoke postgres/postgres -r | grep "Nuvolaris Postgres is up and running!" - - minio2: - silent: true - desc: minio test - cmds: - - | - MINIO_ACCESS_KEY=$(kubectl -n nuvolaris get cm/config -o jsonpath='{.metadata.annotations.s3_access_key}') - MINIO_SECRET_KEY=$(kubectl -n nuvolaris get cm/config -o jsonpath='{.metadata.annotations.s3_secret_key}') - MINIO_HOST=$(kubectl -n nuvolaris get cm/config -o jsonpath='{.metadata.annotations.s3_host}') - MINIO_PORT=$(kubectl -n nuvolaris get cm/config -o jsonpath='{.metadata.annotations.s3_port}') - MINIO_BUCKET_DATA=$(kubectl -n nuvolaris get cm/config -o jsonpath='{.metadata.annotations.s3_bucket_data}') - MINIO_BUCKET_WEB=$(kubectl -n nuvolaris get cm/config -o jsonpath='{.metadata.annotations.s3_bucket_static}') - ops -wsk -i action update minio/minio-nuv tests/minio-nuv.js \ - -p minio_access "$MINIO_ACCESS_KEY" \ - -p minio_secret "$MINIO_SECRET_KEY" \ - -p minio_host "$MINIO_HOST" \ - -p minio_port "$MINIO_PORT" \ - -p minio_data "$MINIO_BUCKET_DATA" - - ops -wsk -i action invoke minio/minio-nuv -r From 3b56714accb9cb23dc912f3010593dfdaa68eb75 Mon Sep 17 00:00:00 2001 From: Alessio Marinelli Date: Sat, 15 Nov 2025 21:28:54 +0100 Subject: [PATCH 6/9] feat: Complete Apache Spark integration with OpenServerless operator MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit - Add Spark cluster deployment with master + 2 workers + history server - Implement SparkJob CRD support (sparkjobs.nuvolaris.org) - Configure NodePort services for Web UI external access - Optimize memory settings for single-node MicroK8s deployment - Add Spark component tracking in main.py and patcher.py - Include test configuration whisk-spark-test.yaml - Update operator image to ghcr.io/mobs75/openserverless-operator:dev-spark-v6 Tested features: ✅ Complete Spark cluster deployment and operation ✅ SparkJob CRD creation and 
management ✅ Web UIs accessible via NodePort (Master 30808, Workers 31081/31082, History 31808) ✅ Job submission and driver execution --- deploy/spark/01-spark-configmap.yaml | 8 +- deploy/spark/04-spark-master-svc.yaml | 20 +++-- deploy/spark/05-spark-worker-sts.yaml | 4 +- deploy/spark/06-spark-worker-svc.yaml | 46 ++++++++++ deploy/spark/08-spark-history-svc.yaml | 11 +-- nuvolaris/kopf_util.py | 1 + nuvolaris/main.py | 5 +- nuvolaris/patcher.py | 5 ++ nuvolaris/spark.py | 80 +++++++----------- operator-deploy.yaml | 4 +- whisk-spark-test.yaml | 112 +++++++++++++++++++++++++ 11 files changed, 224 insertions(+), 72 deletions(-) create mode 100644 whisk-spark-test.yaml diff --git a/deploy/spark/01-spark-configmap.yaml b/deploy/spark/01-spark-configmap.yaml index 51e50bb..3989e14 100644 --- a/deploy/spark/01-spark-configmap.yaml +++ b/deploy/spark/01-spark-configmap.yaml @@ -13,10 +13,10 @@ data: spark.history.fs.logDirectory /tmp/spark-events spark.history.ui.port 18080 spark.serializer org.apache.spark.serializer.KryoSerializer - spark.driver.memory 1g - spark.executor.memory 2g - spark.executor.cores 2 - spark.cores.max 4 + spark.driver.memory 512m + spark.executor.memory 384m + spark.executor.cores 1 + spark.cores.max 2 spark.sql.warehouse.dir /tmp/spark-warehouse spark.driver.extraJavaOptions -Dlog4j.configuration=file:///opt/spark/conf/log4j.properties spark.executor.extraJavaOptions -Dlog4j.configuration=file:///opt/spark/conf/log4j.properties diff --git a/deploy/spark/04-spark-master-svc.yaml b/deploy/spark/04-spark-master-svc.yaml index 070ff87..a202f60 100644 --- a/deploy/spark/04-spark-master-svc.yaml +++ b/deploy/spark/04-spark-master-svc.yaml @@ -7,16 +7,18 @@ metadata: app: spark component: spark-master spec: - type: ClusterIP + type: NodePort selector: app: spark component: spark-master ports: - - name: master - port: 7077 - targetPort: 7077 - protocol: TCP - - name: webui - port: 8080 - targetPort: 8080 - protocol: TCP + - name: master + port: 7077 
+ targetPort: 7077 + nodePort: 30707 + protocol: TCP + - name: webui + port: 8080 + targetPort: 8080 + nodePort: 30808 + protocol: TCP \ No newline at end of file diff --git a/deploy/spark/05-spark-worker-sts.yaml b/deploy/spark/05-spark-worker-sts.yaml index 1194101..8d625f4 100644 --- a/deploy/spark/05-spark-worker-sts.yaml +++ b/deploy/spark/05-spark-worker-sts.yaml @@ -64,13 +64,13 @@ spec: - name: SPARK_WORKER_CORES value: "2" - name: SPARK_WORKER_MEMORY - value: "1g" + value: "2048m" - name: SPARK_WORKER_PORT value: "7078" - name: SPARK_WORKER_WEBUI_PORT value: "8081" - name: SPARK_DAEMON_MEMORY - value: "1g" + value: "512m" - name: SPARK_NO_DAEMONIZE value: "true" resources: diff --git a/deploy/spark/06-spark-worker-svc.yaml b/deploy/spark/06-spark-worker-svc.yaml index cb73366..e645728 100644 --- a/deploy/spark/06-spark-worker-svc.yaml +++ b/deploy/spark/06-spark-worker-svc.yaml @@ -21,3 +21,49 @@ spec: port: 8081 targetPort: 8081 protocol: TCP + +--- +# Individual Worker 0 Service with NodePort +apiVersion: v1 +kind: Service +metadata: + name: spark-worker-0 + namespace: nuvolaris + labels: + app: spark + component: spark-worker +spec: + type: NodePort + selector: + app: spark + component: spark-worker + statefulset.kubernetes.io/pod-name: spark-worker-0 + ports: + - name: webui + port: 8081 + targetPort: 8081 + nodePort: 31081 + protocol: TCP + +--- +# Individual Worker 1 Service with NodePort +apiVersion: v1 +kind: Service +metadata: + name: spark-worker-1 + namespace: nuvolaris + labels: + app: spark + component: spark-worker +spec: + type: NodePort + selector: + app: spark + component: spark-worker + statefulset.kubernetes.io/pod-name: spark-worker-1 + ports: + - name: webui + port: 8081 + targetPort: 8081 + nodePort: 31082 + protocol: TCP diff --git a/deploy/spark/08-spark-history-svc.yaml b/deploy/spark/08-spark-history-svc.yaml index 95fdb7a..148e6be 100644 --- a/deploy/spark/08-spark-history-svc.yaml +++ b/deploy/spark/08-spark-history-svc.yaml @@ 
-7,12 +7,13 @@ metadata: app: spark component: spark-history spec: - type: ClusterIP + type: NodePort selector: app: spark component: spark-history ports: - - name: webui - port: 18080 - targetPort: 18080 - protocol: TCP + - name: webui + port: 18080 + targetPort: 18080 + nodePort: 31808 + protocol: TCP \ No newline at end of file diff --git a/nuvolaris/kopf_util.py b/nuvolaris/kopf_util.py index cce7972..cc3e73c 100644 --- a/nuvolaris/kopf_util.py +++ b/nuvolaris/kopf_util.py @@ -107,6 +107,7 @@ def evaluate_differences(response: dict, differences: list): check_component(response, d,"spec.components.quota","quota") check_component(response, d,"spec.components.etcd","etcd") check_component(response, d,"spec.components.milvus","milvus") + check_component(response, d,"spec.components.spark","spark") check_component(response, d,"spec.components.registry","registry") check_component(response, d,"spec.components.seaweedfs","seaweedfs") openwhisk(response, d) diff --git a/nuvolaris/main.py b/nuvolaris/main.py index b4971f5..24abad3 100644 --- a/nuvolaris/main.py +++ b/nuvolaris/main.py @@ -292,7 +292,9 @@ def whisk_create(spec, name, **kwargs): state['milvus'] = "off" # Deploy Spark cluster - if cfg.get('components.spark'): + spark_enabled = cfg.get('components.spark') + logging.info(f"*** Spark deployment check: components.spark = {spark_enabled}") + if spark_enabled: try: logging.info("deploying spark cluster") msg = spark.create(owner) @@ -303,6 +305,7 @@ def whisk_create(spec, name, **kwargs): state['spark'] = "error" else: state['spark'] = "off" + logging.info("*** Spark deployment skipped: components.spark is False") whisk_post_create(name,state) state['controller']= "Ready" diff --git a/nuvolaris/patcher.py b/nuvolaris/patcher.py index 2ccdc3f..210ce24 100644 --- a/nuvolaris/patcher.py +++ b/nuvolaris/patcher.py @@ -37,6 +37,7 @@ import nuvolaris.milvus_standalone as milvus import nuvolaris.registry_deploy as registry import nuvolaris.seaweedfs_deploy as seaweedfs 
+import nuvolaris.spark as spark def patch_preloader(owner: None): try: @@ -161,6 +162,10 @@ def patch(diff, status, owner=None, name=None): milvus.patch(status,what_to_do['milvus'], owner) components_updated = True + if "spark" in what_to_do: + spark.patch(status,what_to_do['spark'], owner) + components_updated = True + if "registry" in what_to_do: registry.patch(status,what_to_do['milvus'], owner) components_updated = True diff --git a/nuvolaris/spark.py b/nuvolaris/spark.py index 721de7b..ec05869 100644 --- a/nuvolaris/spark.py +++ b/nuvolaris/spark.py @@ -87,66 +87,51 @@ def get_spark_config_data(): def create(owner=None): """ Deploy Apache Spark cluster on Kubernetes using standard OpenServerless patterns - - Creates: - - Spark Master (StatefulSet with optional HA) - - Spark Workers (StatefulSet) - - Spark History Server (Deployment with PVC) - - Required Services - - ConfigMaps for configuration - - Args: - owner: Owner reference for garbage collection - - Returns: - Result message from deployment """ logging.info("*** creating spark cluster") - # 1. Collect configuration - data = get_spark_config_data() - - # 2. Process Jinja2 templates (standard OpenServerless pattern) - kus.processTemplate("spark", "spark-configmap-tpl.yaml", data, "spark-configmap.yaml") - kus.processTemplate("spark", "spark-master-sts-tpl.yaml", data, "spark-master-sts.yaml") - - # Process History Server templates if enabled - if data['history_enabled']: - kus.processTemplate("spark", "spark-history-pvc-tpl.yaml", data, "spark-history-pvc.yaml") - kus.processTemplate("spark", "spark-history-dep-tpl.yaml", data, "spark-history-dep.yaml") + # Apply all manifests in deploy/spark directory directly (exclude non-K8s files) + import glob + import os - # 3. Define kustomize patches (standard pattern) - tplp = ["set-attach.yaml"] + spark_dir = "deploy/spark" + yaml_files = glob.glob(f"{spark_dir}/*.yaml") + yaml_files.sort() # Apply in alphabetical order - # 4. 
Add affinity/tolerations if enabled (standard pattern) - if data.get('affinity') or data.get('tolerations'): - tplp.append("affinity-tolerance-sts-core-attach.yaml") - - # 5. Generate kustomization - kust = kus.patchTemplates("spark", tplp, data) - - # 6. Build complete specification using standard OpenServerless pattern - templates = ["spark-rbac.yaml"] # Static Jinja2 templates to include - templates_filter = ["spark-configmap.yaml", "spark-master-sts.yaml"] # Generated templates to filter - - if data['history_enabled']: - templates_filter.extend(["spark-history-pvc.yaml", "spark-history-dep.yaml"]) - - spec = kus.restricted_kustom_list("spark", kust, templates=templates, - templates_filter=templates_filter, data=data) + all_manifests = [] + for yaml_file in yaml_files: + filename = os.path.basename(yaml_file) + # Skip CRD extension files and kustomization files + if filename in ["kustomization.yaml", "spark-crd-extension.yaml"]: + continue + + with open(yaml_file, 'r') as f: + content = f.read() + # Parse YAML content + import yaml + docs = list(yaml.load_all(content, yaml.SafeLoader)) + for doc in docs: + if doc and 'kind' in doc and 'apiVersion' in doc: # Valid K8s manifest + all_manifests.append(doc) + + # Create the list object for kubectl apply + spec = { + "apiVersion": "v1", + "kind": "List", + "items": all_manifests + } - # 7. Apply owner reference for garbage collection + # 3. Apply owner reference for garbage collection if owner: kopf.append_owner_reference(spec['items'], owner) else: - # Save spec for delete without owner cfg.put("state.spark.spec", spec) - # 8. Deploy to Kubernetes + # 5. Deploy to Kubernetes res = kube.apply(spec) logging.info("spark manifests applied") - # 9. Wait for components to be ready (standard pattern) + # 6. 
Wait for components to be ready logging.info("waiting for spark master to be ready...") util.wait_for_pod_ready( "{.items[?(@.metadata.labels.component == 'spark-master')].metadata.name}", @@ -154,9 +139,6 @@ def create(owner=None): ) logging.info("spark master is ready") - # 10. Post-configuration - configure_spark(data) - logging.info("*** spark cluster created successfully") return res diff --git a/operator-deploy.yaml b/operator-deploy.yaml index 1400b3e..7e21d41 100644 --- a/operator-deploy.yaml +++ b/operator-deploy.yaml @@ -29,8 +29,8 @@ spec: serviceAccount: nuvolaris-operator containers: - name: nuvolaris-operator - # usa l'immagine costruita da Kaniko nel registro locale del cluster (kube-system) - image: docker-registry.kube-system.svc.cluster.local:5000/$GITHUB_USER/openserverless-operator:$TAG + # use the image built with the Spark integration + image: ghcr.io/mobs75/openserverless-operator:dev-spark-v6 imagePullPolicy: Always command: ["./run.sh"] args: ["--verbose"] diff --git a/whisk-spark-test.yaml b/whisk-spark-test.yaml new file mode 100644 index 0000000..c821ed9 --- /dev/null +++ b/whisk-spark-test.yaml @@ -0,0 +1,112 @@ +apiVersion: nuvolaris.org/v1 +kind: Whisk +metadata: + name: controller + namespace: nuvolaris +spec: + nuvolaris: + password: nuvpassw0rd + apihost: auto + components: + # start openwhisk controller + openwhisk: true + # start openwhisk invoker + invoker: false + # start couchdb + couchdb: true + # start kafka + kafka: false + # start mongodb + mongodb: true + # start redis + redis: true + # start cron based action parser + cron: true + # enable TLS + tls: false + # minio enabled or not + minio: true + # minio static enabled or not + static: true + # postgres enabled or not + postgres: true + # registry disabled for this test + registry: false + # enable spark + spark: true + openwhisk: + namespaces: + whisk-system: 789c46b1-71f6-4ed5-8c54-816aa4f8c502:abcfO3xZCLrMN6v2BKK1dXYFpXlPkccOFqm12CdAsMgRU4VrNZ9lyGVCGuMDGIwP + 
nuvolaris: cbd68075-dac2-475e-8c07-d62a30c7e683:123zO3xKCLrMN6v2BKK1dXYFpXlPkccOFqm12CdAsMgRU4VrNZ9lyGVCGuMDGIwP + couchdb: + host: couchdb + port: 5984 + volume-size: 10 + admin: + user: whisk_admin + password: some_passw0rd + controller: + user: invoker_admin + password: s0meP@ass1 + invoker: + user: controller_admin + password: s0meP@ass2 + kafka: + host: kafka + volume-size: 10 + scheduler: + schedule: "* * * * *" + configs: + limits: + actions: + sequence-maxLength: 50 + invokes-perMinute: 999 + invokes-concurrent: 250 + triggers: + fires-perMinute: 999 + controller: + javaOpts: "-Xmx1024M" + redis: + persistence-enabled: true + volume-size: 10 + default: + password: s0meP@ass3 + nuvolaris: + prefix: nuv + password: s0meP@ass3 + mongodb: + host: mongodb + volume-size: 10 + admin: + user: whisk_admin + password: 0therPa55 + nuvolaris: + user: nuvolaris + password: s0meP@ass3 + exposedExternally: False + useOperator: False + minio: + volume-size: 2 + admin: + user: minioadmin + password: minioadmin + nuvolaris: + user: nuvolaris + password: zuf+tfteSlswRu7BJ86wekitnifILbZam1KYY3TG + postgres: + volume-size: 5 + replicas: 2 + admin: + password: 0therPa55 + replica-password: 0therPa55RR + nuvolaris: + password: s0meP@ass3 + spark: + master-cores: 1 + master-memory: 512m + worker-cores: 1 + worker-memory: 512m + driver-memory: 512m + executor-cores: 1 + executor-memory: 512m + history-enabled: true \ No newline at end of file From 8430d5b03a335b1d68bfc0a4f56e8ebc4ca82a0b Mon Sep 17 00:00:00 2001 From: Alessio Marinelli Date: Sun, 16 Nov 2025 16:51:59 +0100 Subject: [PATCH 7/9] Fix whisk-crd.yaml duplicate type:integer in milvus section --- deploy/nuvolaris-permissions/whisk-crd.yaml | 1 - 1 file changed, 1 deletion(-) diff --git a/deploy/nuvolaris-permissions/whisk-crd.yaml b/deploy/nuvolaris-permissions/whisk-crd.yaml index 79ab48e..d2e47a6 100644 --- a/deploy/nuvolaris-permissions/whisk-crd.yaml +++ b/deploy/nuvolaris-permissions/whisk-crd.yaml @@ -760,7 +760,6 
@@ spec: type: integer bucket: description: used to setup a quota on the S3 bucket when running under seaweedfs (default to 10240MB) - type: integer type: integer replicas: description: number of total milvus replicas. Defaulted to 1 From b3801bcfa7bd7cd936f4c581afc80a77ace0bb8a Mon Sep 17 00:00:00 2001 From: Alessio Marinelli Date: Mon, 17 Nov 2025 10:08:16 +0100 Subject: [PATCH 8/9] fix: Aggiornamenti Spark operator e deploy - Modifica worker Spark - Fix handler spark.py - Aggiorna operator-deploy.yaml per Spark Branch: feature/add-spark-operator --- deploy/spark/05-spark-worker-sts.yaml | 4 +- nuvolaris/spark.py | 80 ++++++++++++++++----------- operator-deploy.yaml | 4 +- 3 files changed, 53 insertions(+), 35 deletions(-) diff --git a/deploy/spark/05-spark-worker-sts.yaml b/deploy/spark/05-spark-worker-sts.yaml index 8d625f4..1194101 100644 --- a/deploy/spark/05-spark-worker-sts.yaml +++ b/deploy/spark/05-spark-worker-sts.yaml @@ -64,13 +64,13 @@ spec: - name: SPARK_WORKER_CORES value: "2" - name: SPARK_WORKER_MEMORY - value: "2048m" + value: "1g" - name: SPARK_WORKER_PORT value: "7078" - name: SPARK_WORKER_WEBUI_PORT value: "8081" - name: SPARK_DAEMON_MEMORY - value: "512m" + value: "1g" - name: SPARK_NO_DAEMONIZE value: "true" resources: diff --git a/nuvolaris/spark.py b/nuvolaris/spark.py index ec05869..721de7b 100644 --- a/nuvolaris/spark.py +++ b/nuvolaris/spark.py @@ -87,51 +87,66 @@ def get_spark_config_data(): def create(owner=None): """ Deploy Apache Spark cluster on Kubernetes using standard OpenServerless patterns + + Creates: + - Spark Master (StatefulSet with optional HA) + - Spark Workers (StatefulSet) + - Spark History Server (Deployment with PVC) + - Required Services + - ConfigMaps for configuration + + Args: + owner: Owner reference for garbage collection + + Returns: + Result message from deployment """ logging.info("*** creating spark cluster") - # Apply all manifests in deploy/spark directory directly (exclude non-K8s files) - import glob - 
import os + # 1. Collect configuration + data = get_spark_config_data() - spark_dir = "deploy/spark" - yaml_files = glob.glob(f"{spark_dir}/*.yaml") - yaml_files.sort() # Apply in alphabetical order + # 2. Process Jinja2 templates (standard OpenServerless pattern) + kus.processTemplate("spark", "spark-configmap-tpl.yaml", data, "spark-configmap.yaml") + kus.processTemplate("spark", "spark-master-sts-tpl.yaml", data, "spark-master-sts.yaml") - all_manifests = [] - for yaml_file in yaml_files: - filename = os.path.basename(yaml_file) - # Skip CRD extension files and kustomization files - if filename in ["kustomization.yaml", "spark-crd-extension.yaml"]: - continue - - with open(yaml_file, 'r') as f: - content = f.read() - # Parse YAML content - import yaml - docs = list(yaml.load_all(content, yaml.SafeLoader)) - for doc in docs: - if doc and 'kind' in doc and 'apiVersion' in doc: # Valid K8s manifest - all_manifests.append(doc) - - # Create the list object for kubectl apply - spec = { - "apiVersion": "v1", - "kind": "List", - "items": all_manifests - } + # Process History Server templates if enabled + if data['history_enabled']: + kus.processTemplate("spark", "spark-history-pvc-tpl.yaml", data, "spark-history-pvc.yaml") + kus.processTemplate("spark", "spark-history-dep-tpl.yaml", data, "spark-history-dep.yaml") - # 3. Apply owner reference for garbage collection + # 3. Define kustomize patches (standard pattern) + tplp = ["set-attach.yaml"] + + # 4. Add affinity/tolerations if enabled (standard pattern) + if data.get('affinity') or data.get('tolerations'): + tplp.append("affinity-tolerance-sts-core-attach.yaml") + + # 5. Generate kustomization + kust = kus.patchTemplates("spark", tplp, data) + + # 6. 
Build complete specification using standard OpenServerless pattern + templates = ["spark-rbac.yaml"] # Static Jinja2 templates to include + templates_filter = ["spark-configmap.yaml", "spark-master-sts.yaml"] # Generated templates to filter + + if data['history_enabled']: + templates_filter.extend(["spark-history-pvc.yaml", "spark-history-dep.yaml"]) + + spec = kus.restricted_kustom_list("spark", kust, templates=templates, + templates_filter=templates_filter, data=data) + + # 7. Apply owner reference for garbage collection if owner: kopf.append_owner_reference(spec['items'], owner) else: + # Save spec for delete without owner cfg.put("state.spark.spec", spec) - # 5. Deploy to Kubernetes + # 8. Deploy to Kubernetes res = kube.apply(spec) logging.info("spark manifests applied") - # 6. Wait for components to be ready + # 9. Wait for components to be ready (standard pattern) logging.info("waiting for spark master to be ready...") util.wait_for_pod_ready( "{.items[?(@.metadata.labels.component == 'spark-master')].metadata.name}", @@ -139,6 +154,9 @@ def create(owner=None): ) logging.info("spark master is ready") + # 10. 
Post-configuration + configure_spark(data) + logging.info("*** spark cluster created successfully") return res diff --git a/operator-deploy.yaml b/operator-deploy.yaml index 7e21d41..1400b3e 100644 --- a/operator-deploy.yaml +++ b/operator-deploy.yaml @@ -29,8 +29,8 @@ spec: serviceAccount: nuvolaris-operator containers: - name: nuvolaris-operator - # usa l'immagine construita con Spark integration - image: ghcr.io/mobs75/openserverless-operator:dev-spark-v6 + # usa l'immagine costruita da Kaniko nel registro locale del cluster (kube-system) + image: docker-registry.kube-system.svc.cluster.local:5000/$GITHUB_USER/openserverless-operator:$TAG imagePullPolicy: Always command: ["./run.sh"] args: ["--verbose"] From a2f3dae8fb65a35ed821fa058b21e20c1996cfa7 Mon Sep 17 00:00:00 2001 From: Alessio Marinelli Date: Thu, 20 Nov 2025 18:58:17 +0100 Subject: [PATCH 9/9] Enable Spark in whisk-full test configuration --- tests/whisk-full.yaml | 2 ++ 1 file changed, 2 insertions(+) diff --git a/tests/whisk-full.yaml b/tests/whisk-full.yaml index 1690910..7345cba 100644 --- a/tests/whisk-full.yaml +++ b/tests/whisk-full.yaml @@ -50,6 +50,8 @@ spec: monitoring: true # postgres enabled or not postgres: true + # spark enabled + spark: true openwhisk: namespaces: whisk-system: 789c46b1-71f6-4ed5-8c54-816aa4f8c502:abczO3xZCLrMN6v2BKK1dXYFpXlPkccOFqm12CdAsMgRU4VrNZ9lyGVCGuMDGIwP