models.py

import enum
from typing import Dict, Optional
from flyteidl.plugins import spark_pb2 as _spark_task
from flytekit.exceptions import user as _user_exceptions
from flytekit.models import common as _common


class SparkType(enum.Enum):
    PYTHON = 1
    SCALA = 2
    JAVA = 3
    R = 4


class SparkJob(_common.FlyteIdlEntity):
    def __init__(
        self,
        spark_type,
        application_file,
        main_class,
        spark_conf,
        hadoop_conf,
        databricks_conf,
        executor_path,
    ):
        """
        This defines a SparkJob target. It will execute the appropriate SparkJob.

        :param SparkType spark_type: The type of the Spark application (PYTHON, SCALA, JAVA, or R).
        :param Text application_file: The main application file to execute.
        :param Text main_class: The main class to execute, if applicable.
        :param dict[Text, Text] spark_conf: A definition of key-value pairs for spark config for the job.
        :param dict[Text, Text] hadoop_conf: A definition of key-value pairs for hadoop config for the job.
        :param Text databricks_conf: The Databricks job configuration for the job.
        :param Text executor_path: The python executable to use for the job.
        """
        self._application_file = application_file
        self._spark_type = spark_type
        self._main_class = main_class
        self._executor_path = executor_path
        self._spark_conf = spark_conf
        self._hadoop_conf = hadoop_conf
        self._databricks_conf = databricks_conf
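
    # Returns a copy of this SparkJob with the supplied configs swapped in; any
    # override that is None or empty falls back to this job's current value, so
    # the original job is never mutated.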
    def with_overrides(
        self,
        new_spark_conf: Optional[Dict[str, str]] = None,
        new_hadoop_conf: Optional[Dict[str, str]] = None,
        new_databricks_conf: Optional[str] = None,
    ) -> "SparkJob":
        if not new_spark_conf:
            new_spark_conf = self.spark_conf
        if not new_hadoop_conf:
            new_hadoop_conf = self.hadoop_conf
        if not new_databricks_conf:
            new_databricks_conf = self.databricks_conf
        return SparkJob(
            spark_type=self.spark_type,
            application_file=self.application_file,
            main_class=self.main_class,
            spark_conf=new_spark_conf,
            hadoop_conf=new_hadoop_conf,
            databricks_conf=new_databricks_conf,
            executor_path=self.executor_path,
        )

    @property
    def main_class(self):
        """
        The main class to execute
        :rtype: Text
        """
        return self._main_class

    @property
    def spark_type(self):
        """
        Spark Job Type
        :rtype: SparkType
        """
        return self._spark_type

    @property
    def application_file(self):
        """
        The main application file to execute
        :rtype: Text
        """
        return self._application_file

    @property
    def executor_path(self):
        """
        The python executable to use
        :rtype: Text
        """
        return self._executor_path

    @property
    def spark_conf(self):
        """
        A definition of key-value pairs for spark config for the job.
        :rtype: dict[Text, Text]
        """
        return self._spark_conf

    @property
    def hadoop_conf(self):
        """
        A definition of key-value pairs for hadoop config for the job.
        :rtype: dict[Text, Text]
        """
        return self._hadoop_conf

    @property
    def databricks_conf(self) -> str:
        """
        The Databricks job configuration for the job.
        :rtype: Text
        """
        return self._databricks_conf

    def to_flyte_idl(self):
        """
        :rtype: flyteidl.plugins.spark_pb2.SparkJob
        """
        if self.spark_type == SparkType.PYTHON:
            application_type = _spark_task.SparkApplication.PYTHON
        elif self.spark_type == SparkType.JAVA:
            application_type = _spark_task.SparkApplication.JAVA
        elif self.spark_type == SparkType.SCALA:
            application_type = _spark_task.SparkApplication.SCALA
        elif self.spark_type == SparkType.R:
            application_type = _spark_task.SparkApplication.R
        else:
            raise _user_exceptions.FlyteValidationException("Invalid Spark Application Type Specified")
        return _spark_task.SparkJob(
            applicationType=application_type,
            mainApplicationFile=self.application_file,
            mainClass=self.main_class,
            executorPath=self.executor_path,
            sparkConf=self.spark_conf,
            hadoopConf=self.hadoop_conf,
            databricksConf=self.databricks_conf,
        )

    @classmethod
    def from_flyte_idl(cls, pb2_object):
        """
        :param flyteidl.plugins.spark_pb2.SparkJob pb2_object:
        :rtype: SparkJob
        """
        application_type = SparkType.PYTHON
        if pb2_object.applicationType == _spark_task.SparkApplication.JAVA:
            application_type = SparkType.JAVA
        elif pb2_object.applicationType == _spark_task.SparkApplication.SCALA:
            application_type = SparkType.SCALA
        elif pb2_object.applicationType == _spark_task.SparkApplication.R:
            application_type = SparkType.R
        return cls(
            spark_type=application_type,
            spark_conf=pb2_object.sparkConf,
            application_file=pb2_object.mainApplicationFile,
            main_class=pb2_object.mainClass,
            hadoop_conf=pb2_object.hadoopConf,
            executor_path=pb2_object.executorPath,
            databricks_conf=pb2_object.databricksConf,
        )
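

if __name__ == "__main__":
    # Minimal usage sketch (illustrative only, not part of the original module):
    # builds a PySpark job, swaps in a new spark conf via with_overrides, and
    # round-trips the job through its protobuf form. The application file,
    # executor path, and conf values below are assumptions for demonstration.
    job = SparkJob(
        spark_type=SparkType.PYTHON,
        application_file="local:///usr/local/bin/my_app.py",  # hypothetical path
        main_class="",
        spark_conf={"spark.executor.instances": "2"},
        hadoop_conf={},
        databricks_conf="",
        executor_path="/usr/bin/python3",  # hypothetical interpreter
    )
    tuned = job.with_overrides(new_spark_conf={"spark.executor.instances": "4"})
    # Serializing and deserializing should preserve the overridden config.
    round_tripped = SparkJob.from_flyte_idl(tuned.to_flyte_idl())
    assert round_tripped.spark_conf["spark.executor.instances"] == "4"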