submit_spark.go
package commands

import (
	"errors"
	"fmt"
	"os"

	"github.com/kubeflow/arena/pkg/util"
	"github.com/kubeflow/arena/pkg/workflow"
	log "github.com/sirupsen/logrus"
	"github.com/spf13/cobra"
)
var (
	sparkChart = util.GetChartsFolder() + "/sparkjob"
)

const (
	defaultSparkJobTrainingType = "sparkjob"
)
// NewSparkApplicationCommand creates the cobra command for submitting a
// spark application job through
// https://github.com/GoogleCloudPlatform/spark-on-k8s-operator.
// SparkApplication is supported as the default resource type;
// ScheduledSparkApplication is not supported.
func NewSparkApplicationCommand() *cobra.Command {
	submitArgs := NewSubmitSparkJobArgs()
	command := &cobra.Command{
		Use:     "sparkjob",
		Short:   "Submit a common spark application job.",
		Aliases: []string{"spark"},
		Run: func(cmd *cobra.Command, args []string) {
			util.SetLogLevel(logLevel)
			setupKubeconfig()

			_, err := initKubeClient()
			if err != nil {
				fmt.Println(err)
				os.Exit(1)
			}

			err = updateNamespace(cmd)
			if err != nil {
				log.Debugf("Failed due to %v", err)
				fmt.Println(err)
				os.Exit(1)
			}

			err = submitSparkApplicationJob(args, submitArgs)
			if err != nil {
				fmt.Println(err)
				os.Exit(1)
			}
		},
	}

	submitArgs.addFlags(command)
	return command
}
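// For reference, the sparkjob chart is expected to render a SparkApplication
// resource for the operator. A rough sketch of such a manifest (the exact
// apiVersion and fields depend on the operator version; all values here are
// illustrative, echoing the flag defaults below):
//
//	apiVersion: sparkoperator.k8s.io/v1beta1
//	kind: SparkApplication
//	metadata:
//	  name: spark-pi
//	spec:
//	  mainClass: org.apache.spark.examples.SparkPi
//	  mainApplicationFile: local:///opt/spark/examples/jars/spark-examples_2.11-2.4.0.jar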
// NewSubmitSparkJobArgs returns submission arguments with empty driver and
// executor settings; defaults are filled in by addFlags.
func NewSubmitSparkJobArgs() *submitSparkJobArgs {
	return &submitSparkJobArgs{
		Driver:   &Driver{},
		Executor: &Executor{},
	}
}
type submitSparkJobArgs struct {
	Image     string    `yaml:"Image"`
	MainClass string    `yaml:"MainClass"`
	Jar       string    `yaml:"Jar"`
	Executor  *Executor `yaml:"Executor"`
	Driver    *Driver   `yaml:"Driver"`
}

// Driver holds the resource requests for the spark driver pod.
type Driver struct {
	CPURequest    int    `yaml:"CPURequest"`
	MemoryRequest string `yaml:"MemoryRequest"`
}

// Executor holds the replica count and resource requests for the spark
// executor pods.
type Executor struct {
	Replicas      int    `yaml:"Replicas"`
	CPURequest    int    `yaml:"CPURequest"`
	MemoryRequest string `yaml:"MemoryRequest"`
}
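// Given the yaml tags above, a populated submitSparkJobArgs marshals to
// values like the following (a sketch, assuming the sparkjob chart consumes
// these keys as its helm values; the defaults shown are those set in
// addFlags):
//
//	Image: registry.aliyuncs.com/acs/spark:v2.4.0
//	MainClass: org.apache.spark.examples.SparkPi
//	Jar: local:///opt/spark/examples/jars/spark-examples_2.11-2.4.0.jar
//	Executor:
//	  Replicas: 1
//	  CPURequest: 1
//	  MemoryRequest: 500m
//	Driver:
//	  CPURequest: 1
//	  MemoryRequest: 500m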
// addFlags registers the spark submission flags on the command.
func (sa *submitSparkJobArgs) addFlags(command *cobra.Command) {
	command.Flags().StringVar(&name, "name", "", "the name of the spark job (required)")
	command.MarkFlagRequired("name")
	command.Flags().StringVar(&sa.Image, "image", "registry.aliyuncs.com/acs/spark:v2.4.0", "the docker image of the spark job")
	command.Flags().IntVar(&sa.Executor.Replicas, "replicas", 1, "the number of executors to run the distributed training")
	command.Flags().StringVar(&sa.MainClass, "main-class", "org.apache.spark.examples.SparkPi", "the main class of your jar")
	command.Flags().StringVar(&sa.Jar, "jar", "local:///opt/spark/examples/jars/spark-examples_2.11-2.4.0.jar", "the path of the jar inside the image")

	// cpu and memory requests for the driver and executor pods
	command.Flags().IntVar(&sa.Driver.CPURequest, "driver-cpu-request", 1, "cpu request for the driver pod")
	command.Flags().StringVar(&sa.Driver.MemoryRequest, "driver-memory-request", "500m", "memory request for the driver pod (minimum is 500m)")
	command.Flags().IntVar(&sa.Executor.CPURequest, "executor-cpu-request", 1, "cpu request for each executor pod")
	command.Flags().StringVar(&sa.Executor.MemoryRequest, "executor-memory-request", "500m", "memory request for each executor pod (minimum is 500m)")
}
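// A sketch of an invocation using the flags above (assuming this command is
// registered under `arena submit`; all values are illustrative):
//
//	arena submit spark --name=spark-pi \
//		--image=registry.aliyuncs.com/acs/spark:v2.4.0 \
//		--replicas=2 \
//		--main-class=org.apache.spark.examples.SparkPi \
//		--jar=local:///opt/spark/examples/jars/spark-examples_2.11-2.4.0.jar \
//		--driver-cpu-request=1 --driver-memory-request=500m \
//		--executor-cpu-request=1 --executor-memory-request=500m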
// isValid checks the submission parameters.
// TODO: add more checks.
func (sa *submitSparkJobArgs) isValid() error {
	if sa.Executor != nil && sa.Executor.Replicas <= 0 {
		return errors.New("the executor replicas must be greater than 0")
	}
	return nil
}
// submitSparkApplicationJob validates the arguments, ensures that no job with
// the same name already exists, and then submits the job through the
// helm-based workflow.
func submitSparkApplicationJob(args []string, submitArgs *submitSparkJobArgs) error {
	err := submitArgs.isValid()
	if err != nil {
		return err
	}

	trainer := NewSparkJobTrainer(clientset)
	job, err := trainer.GetTrainingJob(name, namespace)
	if err != nil {
		return fmt.Errorf("failed to create sparkjob %s due to error %v", name, err)
	}
	if job != nil {
		return fmt.Errorf("the job %s already exists, please delete it first. use 'arena delete %s'", name, name)
	}

	err = workflow.SubmitJob(name, defaultSparkJobTrainingType, namespace, submitArgs, sparkChart)
	if err != nil {
		return err
	}

	log.Infof("The Job %s has been submitted successfully", name)
	log.Infof("You can run `arena get %s --type %s` to check the job status", name, defaultSparkJobTrainingType)
	return nil
}
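// After a successful submission, the job can be inspected and removed with
// the arena CLI, matching the hints logged above:
//
//	arena get spark-pi --type sparkjob
//	arena delete spark-pi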