Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
31 changes: 19 additions & 12 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -170,35 +170,42 @@ for pipeline in pipelines:
print(f"State: {pipeline['state']}")
```

### Pause / Resume Pipeline
### Stop / Terminate / Resume Pipeline

```python
pipeline = client.get_pipeline("my-pipeline-id")
pipeline.pause()
pipeline.stop()
print(pipeline.status)
```

```
STOPPING
```

```python
pipeline = client.get_pipeline("my-pipeline-id")
pipeline.resume()
# Stop a pipeline ungracefully (terminate)
client.stop_pipeline("my-pipeline-id", terminate=True)
print(pipeline.status)
```

### Stop pipeline
```
TERMINATING
```

```python
# Stop a pipeline gracefully
client.stop_pipeline("my-pipeline-id")

# Stop a pipeline ungracefully (terminate)
client.stop_pipeline("my-pipeline-id", terminate=True)
pipeline = client.get_pipeline("my-pipeline-id")
pipeline.resume()
print(pipeline.status)
```

# Or stop via pipeline instance
pipeline.stop()
```
RESUMING
```

### Delete pipeline

Only stopped or terminated pipelines can be deleted.

```python
# Delete a pipeline
client.delete_pipeline("my-pipeline-id")
Expand Down
2 changes: 0 additions & 2 deletions src/glassflow/etl/models/pipeline.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,8 +14,6 @@
class PipelineStatus(CaseInsensitiveStrEnum):
CREATED = "Created"
RUNNING = "Running"
PAUSING = "Pausing"
PAUSED = "Paused"
RESUMING = "Resuming"
STOPPING = "Stopping"
STOPPED = "Stopped"
Expand Down
31 changes: 7 additions & 24 deletions src/glassflow/etl/pipeline.py
Original file line number Diff line number Diff line change
Expand Up @@ -148,11 +148,11 @@ def update(
def delete(self) -> None:
"""
Deletes the pipeline from the database. Only pipelines that are stopped or
terminating can be deleted.
terminated can be deleted.

Raises:
PipelineDeletionStateViolationError: If pipeline is not stopped or
terminating
terminated
PipelineNotFoundError: If pipeline is not found
APIError: If the API request fails
"""
Expand All @@ -162,9 +162,8 @@ def delete(self) -> None:

def stop(self, terminate: bool = False) -> Pipeline:
"""
Stops the pipeline. Gracefully by default, ungracefully if terminate is True.
Ungracefully means deleting all the pipeline components without waiting for the
events in the pipeline to be processed.
Stops the pipeline, waiting for all the events in the pipeline to be processed.
If terminate is True, the pipeline will be terminated instead.

Args:
terminate: Whether to terminate the pipeline (i.e. delete all the pipeline
Expand Down Expand Up @@ -192,26 +191,10 @@ def stop(self, terminate: bool = False) -> Pipeline:
self.status = next_status
return self

def pause(self) -> Pipeline:
"""Pauses the pipeline with the given ID.

Returns:
Pipeline: A Pipeline instance for the paused pipeline

Raises:
PipelineInTransitionError: If pipeline is in transition
PipelineNotFoundError: If pipeline is not found
InvalidStatusTransitionError: If pipeline is not in a state that can be
paused
APIError: If the API request fails
"""
endpoint = f"{self.ENDPOINT}/{self.pipeline_id}/pause"
self._request("POST", endpoint, event_name="PipelinePaused")
self.status = models.PipelineStatus.PAUSING
return self

def resume(self) -> Pipeline:
"""Resumes the pipeline with the given ID.
"""
Resumes the pipeline with the given ID.
Only stopped or terminated pipelines can be resumed.

Returns:
Pipeline: A Pipeline instance for the resumed pipeline
Expand Down
7 changes: 3 additions & 4 deletions tests/test_pipeline.py
Original file line number Diff line number Diff line change
Expand Up @@ -73,13 +73,12 @@ def test_create_http_error_scenarios(self, pipeline, scenario):


class TestPipelineLifecycle:
"""Tests for pause, resume, stop, terminate, delete operations."""
"""Tests for resume, stop, terminate, delete operations."""

@pytest.mark.parametrize(
"operation,method,endpoint,params,status",
[
("get", "GET", "", {}, models.PipelineStatus.RUNNING),
("pause", "POST", "/pause", {}, models.PipelineStatus.PAUSING),
("resume", "POST", "/resume", {}, models.PipelineStatus.RESUMING),
("delete", "DELETE", "", {}, models.PipelineStatus.DELETED),
(
Expand Down Expand Up @@ -137,14 +136,14 @@ def test_lifecycle_operations(
assert result == pipeline
assert pipeline.status == status

@pytest.mark.parametrize("operation", ["get", "delete", "pause", "resume", "stop"])
@pytest.mark.parametrize("operation", ["get", "delete", "resume", "stop"])
def test_lifecycle_not_found(self, pipeline, mock_not_found_response, operation):
"""Test lifecycle operations when pipeline is not found."""
with patch("httpx.Client.request", return_value=mock_not_found_response):
with pytest.raises(errors.PipelineNotFoundError):
getattr(pipeline, operation)()

@pytest.mark.parametrize("operation", ["get", "delete", "pause", "resume", "stop"])
@pytest.mark.parametrize("operation", ["get", "delete", "resume", "stop"])
def test_lifecycle_connection_error(
self, pipeline, mock_connection_error, operation
):
Expand Down