@@ -58,7 +58,7 @@ class SparkPipelinesSuite extends SparkSubmitTestUtils with BeforeAndAfterEach {
     val args = Array(
       "run",
       "--spec",
-      "pipeline.yml"
+      "spark-pipeline.yml"
     )
     assert(
       SparkPipelines.constructSparkSubmitArgs(
@@ -71,7 +71,7 @@ class SparkPipelinesSuite extends SparkSubmitTestUtils with BeforeAndAfterEach {
         "abc/python/pyspark/pipelines/cli.py",
         "run",
         "--spec",
-        "pipeline.yml"
+        "spark-pipeline.yml"
       )
     )
   }
@@ -83,7 +83,7 @@ class SparkPipelinesSuite extends SparkSubmitTestUtils with BeforeAndAfterEach {
       "run",
       "--supervise",
       "--spec",
-      "pipeline.yml",
+      "spark-pipeline.yml",
      "--conf",
       "spark.conf2=3"
     )
@@ -101,7 +101,7 @@ class SparkPipelinesSuite extends SparkSubmitTestUtils with BeforeAndAfterEach {
         "abc/python/pyspark/pipelines/cli.py",
         "run",
         "--spec",
-        "pipeline.yml"
+        "spark-pipeline.yml"
       )
     )
   }
4 changes: 2 additions & 2 deletions docs/declarative-pipelines-programming-guide.md
@@ -96,7 +96,7 @@ configuration:
   spark.sql.shuffle.partitions: "1000"
 ```
 
-It's conventional to name pipeline spec files `pipeline.yml`.
+It's conventional to name pipeline spec files `spark-pipeline.yml`.
 
 The `spark-pipelines init` command, described below, makes it easy to generate a pipeline project with default configuration and directory structure.
 
@@ -113,7 +113,7 @@ The `spark-pipelines` command line interface (CLI) is the primary way to execute
 
 ### `spark-pipelines run`
 
-`spark-pipelines run` launches an execution of a pipeline and monitors its progress until it completes. The `--spec` parameter allows selecting the pipeline spec file. If not provided, the CLI will look in the current directory and parent directories for a file named `pipeline.yml` or `pipeline.yaml`.
+`spark-pipelines run` launches an execution of a pipeline and monitors its progress until it completes. The `--spec` parameter allows selecting the pipeline spec file. If not provided, the CLI will look in the current directory and parent directories for a file named `spark-pipeline.yml` or `spark-pipeline.yaml`.
 
 ### `spark-pipelines dry-run`
 
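Side note on the lookup behavior described in the `spark-pipelines run` hunk above: when `--spec` is not given, the CLI searches the working directory and its ancestors for one of the renamed default file names. Below is a minimal sketch of that search, using the `PIPELINE_SPEC_FILE_NAMES` constant renamed in `python/pyspark/pipelines/cli.py` later in this diff; the `find_pipeline_spec` helper and the exception types are illustrative, not the actual PySpark implementation.

```python
from pathlib import Path

# Default spec file names after this change (see python/pyspark/pipelines/cli.py below).
PIPELINE_SPEC_FILE_NAMES = ["spark-pipeline.yaml", "spark-pipeline.yml"]


def find_pipeline_spec(start_dir: Path) -> Path:
    """Return the spec file found in start_dir or its closest readable ancestor."""
    for directory in [start_dir.resolve(), *start_dir.resolve().parents]:
        candidates = [directory / name for name in PIPELINE_SPEC_FILE_NAMES
                      if (directory / name).is_file()]
        if len(candidates) > 1:
            # The test suite treats both .yml and .yaml in one directory as an error.
            raise RuntimeError(f"Multiple pipeline spec files found in {directory}")
        if candidates:
            return candidates[0]
    raise FileNotFoundError(
        f"No spark-pipeline.yaml or spark-pipeline.yml found in `{start_dir}` "
        "or readable ancestor directories."
    )
```

With this change, a project whose spec is still named `pipeline.yml` would presumably no longer be discovered implicitly and would need an explicit `--spec pipeline.yml` or a rename.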
2 changes: 1 addition & 1 deletion python/pyspark/errors/error-conditions.json
@@ -908,7 +908,7 @@
   },
   "PIPELINE_SPEC_FILE_NOT_FOUND": {
     "message": [
-      "No pipeline.yaml or pipeline.yml file provided in arguments or found in directory `<dir_path>` or readable ancestor directories."
+      "No spark-pipeline.yaml or spark-pipeline.yml file provided in arguments or found in directory `<dir_path>` or readable ancestor directories."
     ]
   },
   "PIPELINE_SPEC_INVALID_GLOB_PATTERN": {
2 changes: 1 addition & 1 deletion python/pyspark/pipelines/cli.py
@@ -51,7 +51,7 @@
 
 from pyspark.pipelines.add_pipeline_analysis_context import add_pipeline_analysis_context
 
-PIPELINE_SPEC_FILE_NAMES = ["pipeline.yaml", "pipeline.yml"]
+PIPELINE_SPEC_FILE_NAMES = ["spark-pipeline.yaml", "spark-pipeline.yml"]
 
 
 @dataclass(frozen=True)
2 changes: 1 addition & 1 deletion python/pyspark/pipelines/init_cli.py
@@ -54,7 +54,7 @@ def init(name: str) -> None:
     storage_path = f"file://{storage_dir.resolve()}"
 
     # Write the spec file to the project directory
-    spec_file = project_dir / "pipeline.yml"
+    spec_file = project_dir / "spark-pipeline.yml"
     with open(spec_file, "w") as f:
         spec_content = SPEC.replace("{{ name }}", name).replace("{{ storage_root }}", storage_path)
         f.write(spec_content)
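The `init` hunk above only swaps the generated file name. As a self-contained sketch of the same substitution pattern: the `{{ name }}`/`{{ storage_root }}` placeholders and the `spark-pipeline.yml` file name come from the diff, while the `SPEC` template contents and the `write_spec_file` helper below are hypothetical (the real `SPEC` string lives in `pyspark/pipelines/init_cli.py` and may contain additional fields).

```python
from pathlib import Path

# Hypothetical template; the real SPEC in pyspark/pipelines/init_cli.py differs.
SPEC = """\
name: {{ name }}
storage: {{ storage_root }}
"""


def write_spec_file(project_dir: Path, name: str, storage_path: str) -> Path:
    # The generated spec now uses the new default file name.
    spec_file = project_dir / "spark-pipeline.yml"
    spec_content = SPEC.replace("{{ name }}", name).replace("{{ storage_root }}", storage_path)
    spec_file.write_text(spec_content)
    return spec_file


if __name__ == "__main__":
    out = write_spec_file(Path("."), "my_pipeline", "file:///tmp/pipeline-storage")
    print(f"Wrote {out}")
```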
22 changes: 11 additions & 11 deletions python/pyspark/pipelines/tests/test_cli.py
@@ -191,7 +191,7 @@ def test_unpack_pipeline_spec_bad_configuration(self):
 
     def test_find_pipeline_spec_in_current_directory(self):
         with tempfile.TemporaryDirectory() as temp_dir:
-            spec_path = Path(temp_dir) / "pipeline.yaml"
+            spec_path = Path(temp_dir) / "spark-pipeline.yaml"
             with spec_path.open("w") as f:
                 f.write(
                     """
@@ -208,7 +208,7 @@ def test_find_pipeline_spec_in_current_directory(self):
 
     def test_find_pipeline_spec_in_current_directory_yml(self):
         with tempfile.TemporaryDirectory() as temp_dir:
-            spec_path = Path(temp_dir) / "pipeline.yml"
+            spec_path = Path(temp_dir) / "spark-pipeline.yml"
             with spec_path.open("w") as f:
                 f.write(
                     """
@@ -225,10 +225,10 @@ def test_find_pipeline_spec_in_current_directory_yml(self):
 
     def test_find_pipeline_spec_in_current_directory_yml_and_yaml(self):
         with tempfile.TemporaryDirectory() as temp_dir:
-            with (Path(temp_dir) / "pipeline.yml").open("w") as f:
+            with (Path(temp_dir) / "spark-pipeline.yml").open("w") as f:
                 f.write("")
 
-            with (Path(temp_dir) / "pipeline.yaml").open("w") as f:
+            with (Path(temp_dir) / "spark-pipeline.yaml").open("w") as f:
                 f.write("")
 
             with self.assertRaises(PySparkException) as context:
@@ -241,7 +241,7 @@ def test_find_pipeline_spec_in_parent_directory(self):
             parent_dir = Path(temp_dir)
             child_dir = Path(temp_dir) / "child"
             child_dir.mkdir()
-            spec_path = parent_dir / "pipeline.yaml"
+            spec_path = parent_dir / "spark-pipeline.yaml"
             with spec_path.open("w") as f:
                 f.write(
                     """
@@ -296,7 +296,7 @@ def mv2():
 
         registry = LocalGraphElementRegistry()
         register_definitions(
-            outer_dir / "pipeline.yaml", registry, spec, self.spark, "test_graph_id"
+            outer_dir / "spark-pipeline.yaml", registry, spec, self.spark, "test_graph_id"
         )
         self.assertEqual(len(registry.outputs), 1)
         self.assertEqual(registry.outputs[0].name, "mv1")
@@ -319,7 +319,7 @@ def test_register_definitions_file_raises_error(self):
         registry = LocalGraphElementRegistry()
         with self.assertRaises(RuntimeError) as context:
             register_definitions(
-                outer_dir / "pipeline.yml", registry, spec, self.spark, "test_graph_id"
+                outer_dir / "spark-pipeline.yml", registry, spec, self.spark, "test_graph_id"
             )
         self.assertIn("This is a test exception", str(context.exception))
 
@@ -377,7 +377,7 @@ def test_python_import_current_directory(self):
         registry = LocalGraphElementRegistry()
         with change_dir(inner_dir2):
             register_definitions(
-                inner_dir1 / "pipeline.yaml",
+                inner_dir1 / "spark-pipeline.yaml",
                 registry,
                 PipelineSpec(
                     name="test_pipeline",
@@ -394,7 +394,7 @@ def test_full_refresh_all_conflicts_with_full_refresh(self):
     def test_full_refresh_all_conflicts_with_full_refresh(self):
         with tempfile.TemporaryDirectory() as temp_dir:
             # Create a minimal pipeline spec
-            spec_path = Path(temp_dir) / "pipeline.yaml"
+            spec_path = Path(temp_dir) / "spark-pipeline.yaml"
             with spec_path.open("w") as f:
                 f.write('{"name": "test_pipeline"}')
 
@@ -418,7 +418,7 @@ def test_full_refresh_all_conflicts_with_full_refresh(self):
     def test_full_refresh_all_conflicts_with_refresh(self):
         with tempfile.TemporaryDirectory() as temp_dir:
             # Create a minimal pipeline spec
-            spec_path = Path(temp_dir) / "pipeline.yaml"
+            spec_path = Path(temp_dir) / "spark-pipeline.yaml"
             with spec_path.open("w") as f:
                 f.write('{"name": "test_pipeline"}')
 
@@ -443,7 +443,7 @@ def test_full_refresh_all_conflicts_with_refresh(self):
     def test_full_refresh_all_conflicts_with_both(self):
         with tempfile.TemporaryDirectory() as temp_dir:
             # Create a minimal pipeline spec
-            spec_path = Path(temp_dir) / "pipeline.yaml"
+            spec_path = Path(temp_dir) / "spark-pipeline.yaml"
             with spec_path.open("w") as f:
                 f.write('{"name": "test_pipeline"}')