-
Notifications
You must be signed in to change notification settings - Fork 2.2k
Expand file tree
/
Copy pathresources.py
More file actions
54 lines (38 loc) · 1.53 KB
/
Copy pathresources.py
File metadata and controls
54 lines (38 loc) · 1.53 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
from dagster_spark.configs_spark import spark_config
from dagster_spark.utils import flatten_dict
from pyspark.sql import SparkSession
import dagster._check as check
from dagster import resource
def spark_session_from_config(spark_conf=None):
    """Build (or fetch) a SparkSession configured from ``spark_conf``.

    Args:
        spark_conf: Optional dict of Spark configuration. It is passed through
            ``check.opt_dict_param`` and then ``flatten_dict``; every
            ``(key, value)`` pair produced by the latter is applied to the
            session builder via ``builder.config(key, value)``.

    Returns:
        The result of ``SparkSession.builder...getOrCreate()`` after all
        configuration pairs have been applied.
    """
    conf = check.opt_dict_param(spark_conf, "spark_conf")
    session_builder = SparkSession.builder

    # NOTE(review): flatten_dict presumably turns nested config into dotted
    # Spark option names; here we only rely on it yielding 2-item pairs.
    for option in flatten_dict(conf):
        name, setting = option
        session_builder = session_builder.config(name, setting)

    return session_builder.getOrCreate()
class PySparkResource:
    """Holds a SparkSession created from a Spark configuration dict.

    The session is created once, at construction time, by
    ``spark_session_from_config`` and exposed through read-only properties.
    """

    def __init__(self, spark_conf):
        """Create the underlying session from ``spark_conf``."""
        session = spark_session_from_config(spark_conf)
        self._spark_session = session

    @property
    def spark_session(self):
        """The session object created when this resource was constructed."""
        return self._spark_session

    @property
    def spark_context(self):
        """The ``sparkContext`` attribute of :attr:`spark_session`."""
        session = self.spark_session
        return session.sparkContext
@resource({"spark_conf": spark_config()})
def pyspark_resource(init_context):
    """This resource provides access to a PySpark SparkSession for executing PySpark code within Dagster.

    The resource reads the ``"spark_conf"`` entry of its resource config and
    returns a ``PySparkResource`` built from it; the session is then reachable
    as ``context.resources.<key>.spark_session``.

    Example:
        .. code-block:: python

            @op(required_resource_keys={"pyspark"})
            def my_op(context):
                spark_session = context.resources.pyspark.spark_session
                dataframe = spark_session.read.json("examples/src/main/resources/people.json")

            my_pyspark_resource = pyspark_resource.configured(
                {"spark_conf": {"spark.executor.memory": "2g"}}
            )

            @job(resource_defs={"pyspark": my_pyspark_resource})
            def my_spark_job():
                my_op()
    """
    spark_conf = init_context.resource_config["spark_conf"]
    return PySparkResource(spark_conf)