Skip to content

Commit 2efb386

Browse files
committed
refactor(incremental): introduce StreamRecord type
Replicates graphql/graphql-js@eceeb4c
1 parent d664609 commit 2efb386

File tree

5 files changed

+65
-39
lines changed

5 files changed

+65
-39
lines changed

src/graphql/execution/__init__.py

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,7 @@
2323
InitialIncrementalExecutionResult,
2424
PendingResult,
2525
SubsequentIncrementalExecutionResult,
26+
StreamRecord,
2627
)
2728
from .middleware import MiddlewareManager
2829
from .values import get_argument_values, get_directive_values, get_variable_values
@@ -60,6 +61,7 @@
6061
"Middleware",
6162
"MiddlewareManager",
6263
"PendingResult",
64+
"StreamRecord",
6365
"SubsequentIncrementalExecutionResult",
6466
"create_source_event_stream",
6567
"default_field_resolver",

src/graphql/execution/execute.py

Lines changed: 9 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -104,7 +104,7 @@
104104
ReconcilableStreamItemsResult,
105105
StreamItemsRecord,
106106
StreamItemsResult,
107-
SubsequentResultRecord,
107+
StreamRecord,
108108
TerminatingStreamItemsResult,
109109
is_reconcilable_stream_items_result,
110110
)
@@ -957,10 +957,10 @@ async def complete_async_iterator_value(
957957
early_return = async_iterator.aclose() # type: ignore
958958
except AttributeError:
959959
early_return = None
960-
stream_record: SubsequentResultRecord | CancellableStreamRecord
960+
stream_record: StreamRecord
961961

962962
if early_return is None:
963-
stream_record = SubsequentResultRecord(path, stream_usage.label)
963+
stream_record = StreamRecord(path, stream_usage.label)
964964
else:
965965
stream_record = CancellableStreamRecord(
966966
early_return, path, stream_usage.label
@@ -1117,7 +1117,7 @@ def complete_iterable_value(
11171117
except StopIteration:
11181118
break
11191119
if stream_usage and index >= stream_usage.initial_count:
1120-
stream_record = SubsequentResultRecord(path, stream_usage.label)
1120+
stream_record = StreamRecord(path, stream_usage.label)
11211121

11221122
first_stream_items = self.first_sync_stream_items(
11231123
stream_record,
@@ -1726,7 +1726,7 @@ async def await_result() -> DeferredGroupedFieldSetResult:
17261726

17271727
def first_sync_stream_items(
17281728
self,
1729-
stream_record: SubsequentResultRecord,
1729+
stream_record: StreamRecord,
17301730
initial_item: AwaitableOrValue[Any],
17311731
initial_index: int,
17321732
iterator: Iterable[Any],
@@ -1798,7 +1798,7 @@ async def await_result() -> StreamItemsResult:
17981798

17991799
def first_async_stream_items(
18001800
self,
1801-
stream_record: SubsequentResultRecord,
1801+
stream_record: StreamRecord,
18021802
path: Path,
18031803
initial_index: int,
18041804
async_iterator: AsyncIterator[Any],
@@ -1822,7 +1822,7 @@ def first_async_stream_items(
18221822

18231823
async def get_next_async_stream_items_result(
18241824
self,
1825-
stream_record: SubsequentResultRecord,
1825+
stream_record: StreamRecord,
18261826
path: Path,
18271827
index: int,
18281828
async_iterator: AsyncIterator[Any],
@@ -1868,7 +1868,7 @@ async def get_next_async_stream_items_result(
18681868

18691869
def complete_stream_items(
18701870
self,
1871-
stream_record: SubsequentResultRecord,
1871+
stream_record: StreamRecord,
18721872
item_path: Path,
18731873
item: Any,
18741874
incremental_context: IncrementalContext,
@@ -2371,7 +2371,7 @@ def prepend_next_resolved_stream_items(
23712371

23722372
def build_stream_items_result(
23732373
errors: list[GraphQLError] | None,
2374-
stream_record: SubsequentResultRecord,
2374+
stream_record: StreamRecord,
23752375
result: GraphQLWrappedResult[Any],
23762376
) -> StreamItemsResult:
23772377
"""Build a stream items result."""

src/graphql/execution/incremental_graph.py

Lines changed: 6 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -16,7 +16,7 @@
1616
)
1717

1818
from graphql.execution.types import (
19-
SubsequentResultRecord,
19+
StreamRecord,
2020
is_deferred_grouped_field_set_record,
2121
)
2222

@@ -31,6 +31,7 @@
3131
ReconcilableDeferredGroupedFieldSetResult,
3232
StreamItemsRecord,
3333
StreamItemsResult,
34+
SubsequentResultRecord,
3435
)
3536

3637
try:
@@ -67,7 +68,7 @@ def __init__(self, deferred_fragment_record: DeferredFragmentRecord) -> None:
6768
self.children = []
6869

6970

70-
SubsequentResultNode = Union[DeferredFragmentNode, SubsequentResultRecord]
71+
SubsequentResultNode = Union[DeferredFragmentNode, StreamRecord]
7172

7273

7374
def is_deferred_fragment_node(
@@ -79,9 +80,9 @@ def is_deferred_fragment_node(
7980

8081
def is_stream_node(
8182
node: SubsequentResultNode | None,
82-
) -> TypeGuard[SubsequentResultRecord]:
83+
) -> TypeGuard[StreamRecord]:
8384
"""Check whether the given result node is a stream node."""
84-
return isinstance(node, SubsequentResultRecord)
85+
return isinstance(node, StreamRecord)
8586

8687

8788
class IncrementalGraph:
@@ -251,7 +252,7 @@ def remove_deferred_fragment(
251252
for child in deferred_fragment_node.children: # pragma: no cover
252253
self.remove_deferred_fragment(child.deferred_fragment_record)
253254

254-
def remove_stream(self, stream_record: SubsequentResultRecord) -> None:
255+
def remove_stream(self, stream_record: StreamRecord) -> None:
255256
"""Remove a stream record as no longer pending."""
256257
self._remove_pending(stream_record)
257258

src/graphql/execution/types.py

Lines changed: 38 additions & 25 deletions
Original file line numberDiff line numberDiff line change
@@ -799,18 +799,25 @@ def __init__(
799799
self.deferred_fragment_records = deferred_fragment_records
800800

801801

802-
class SubsequentResultRecord:
803-
"""Subsequent result record"""
802+
class DeferredFragmentRecord:
803+
"""Deferred fragment record"""
804804

805805
path: Path | None
806806
label: str | None
807807
id: str | None
808+
parent: DeferredFragmentRecord | None
808809

809-
__slots__ = "id", "label", "path"
810+
__slots__ = "id", "label", "parent", "path"
810811

811-
def __init__(self, path: Path | None, label: str | None = None) -> None:
812+
def __init__(
813+
self,
814+
path: Path | None = None,
815+
label: str | None = None,
816+
parent: DeferredFragmentRecord | None = None,
817+
) -> None:
812818
self.path = path
813819
self.label = label
820+
self.parent = parent
814821
self.id = None
815822

816823
def __repr__(self) -> str:
@@ -823,38 +830,44 @@ def __repr__(self) -> str:
823830
return f"{name}({', '.join(args)})"
824831

825832

826-
class DeferredFragmentRecord(SubsequentResultRecord):
827-
"""Deferred fragment record
833+
class StreamRecord:
834+
"""Stream record"""
828835

829-
For internal use only.
830-
"""
831-
832-
parent: DeferredFragmentRecord | None
836+
path: Path
837+
label: str | None
838+
id: str | None
833839

834-
__slots__ = ("parent",)
840+
__slots__ = "id", "label", "path"
835841

836842
def __init__(
837843
self,
838-
path: Path | None = None,
844+
path: Path,
839845
label: str | None = None,
840-
parent: DeferredFragmentRecord | None = None,
841846
) -> None:
842-
super().__init__(path, label)
843-
self.parent = parent
847+
self.path = path
848+
self.label = label
849+
self.id = None
850+
851+
def __repr__(self) -> str:
852+
name = self.__class__.__name__
853+
args: list[str] = [f"path={self.path.as_list()!r}"]
854+
if self.label:
855+
args.append(f"label={self.label!r}")
856+
return f"{name}({', '.join(args)})"
857+
844858

859+
SubsequentResultRecord = Union[DeferredFragmentRecord, StreamRecord]
845860

846-
class CancellableStreamRecord(SubsequentResultRecord):
861+
862+
class CancellableStreamRecord(StreamRecord):
847863
"""Cancellable stream record"""
848864

849865
early_return: Awaitable[None]
850866

851867
__slots__ = ("early_return",)
852868

853869
def __init__(
854-
self,
855-
early_return: Awaitable[None],
856-
path: Path | None = None,
857-
label: str | None = None,
870+
self, early_return: Awaitable[None], path: Path, label: str | None = None
858871
) -> None:
859872
super().__init__(path, label)
860873
self.early_return = early_return
@@ -870,7 +883,7 @@ def is_cancellable_stream_record(
870883
class ReconcilableStreamItemsResult(NamedTuple):
871884
"""Reconcilable stream items result"""
872885

873-
stream_record: SubsequentResultRecord
886+
stream_record: StreamRecord
874887
result: BareStreamItemsResult
875888
incremental_data_records: list[IncrementalDataRecord] | None = None
876889
errors: None = None
@@ -886,7 +899,7 @@ def is_reconcilable_stream_items_result(
886899
class TerminatingStreamItemsResult(NamedTuple):
887900
"""Terminating stream items result"""
888901

889-
stream_record: SubsequentResultRecord
902+
stream_record: StreamRecord
890903
result: None = None
891904
incremental_data_record: None = None
892905
errors: None = None
@@ -895,7 +908,7 @@ class TerminatingStreamItemsResult(NamedTuple):
895908
class NonReconcilableStreamItemsResult(NamedTuple):
896909
"""Non-reconcilable stream items result"""
897910

898-
stream_record: SubsequentResultRecord
911+
stream_record: StreamRecord
899912
errors: list[GraphQLError]
900913
result: None = None
901914

@@ -912,12 +925,12 @@ class StreamItemsRecord:
912925

913926
__slots__ = "result", "stream_record"
914927

915-
stream_record: SubsequentResultRecord
928+
stream_record: StreamRecord
916929
result: AwaitableOrValue[StreamItemsResult]
917930

918931
def __init__(
919932
self,
920-
stream_record: SubsequentResultRecord,
933+
stream_record: StreamRecord,
921934
result: AwaitableOrValue[StreamItemsResult],
922935
) -> None:
923936
self.stream_record = stream_record

tests/execution/test_stream.py

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -10,9 +10,11 @@
1010
ExecutionResult,
1111
ExperimentalIncrementalExecutionResults,
1212
IncrementalStreamResult,
13+
StreamRecord,
1314
experimental_execute_incrementally,
1415
)
1516
from graphql.language import DocumentNode, parse
17+
from graphql.pyutils import Path
1618
from graphql.type import (
1719
GraphQLField,
1820
GraphQLID,
@@ -244,6 +246,14 @@ def can_hash_incremental_stream_result():
244246
IncrementalStreamResult(**modified_args(args, extensions={"baz": 1}))
245247
)
246248

249+
def can_print_stream_record():
250+
"""Can print a StreamRecord"""
251+
path = Path(None, "bar", "Bar")
252+
record = StreamRecord(path)
253+
assert str(record) == "StreamRecord(path=['bar'])"
254+
record = StreamRecord(path, "foo")
255+
assert str(record) == "StreamRecord(path=['bar'], label='foo')"
256+
247257
async def can_stream_a_list_field():
248258
"""Can stream a list field"""
249259
document = parse("{ scalarList @stream(initialCount: 1) }")

0 commit comments

Comments
 (0)