Skip to content

Commit 5639dda

Browse files
committed
feat: step-based parameterized catalog entries in GQL
1 parent 3896a28 commit 5639dda

25 files changed

Lines changed: 794 additions & 13 deletions

File tree

examples/advanced/KedroSpaceflightsGQL/Data/_01_Raw/Catalog.Raw.cs

Lines changed: 39 additions & 1 deletion
Original file line number | Diff line number | Diff line change
@@ -70,7 +70,7 @@ public partial class Catalog
7070
);
7171

7272
/// <summary>
73-
/// Raw shuttle data queried from the GQL server.
73+
/// Raw shuttle data queried from the GQL server (all shuttles, unfiltered).
7474
/// </summary>
7575
public IItem<IEnumerable<IGetShuttles_Shuttles>> Shuttles =>
7676
CreateItem(
@@ -96,4 +96,42 @@ public partial class Catalog
9696
allowEmptyData: true
9797
)
9898
);
99+
100+
// ── Parameterized GQL entries — consumed by the Analytics flow ──────────
101+
102+
/// <summary>
/// In-memory catalog item holding the ID of the highest-rated company.
/// </summary>
/// <remarks>
/// The Analytics flow's <c>FindTopRatedCompany</c> step writes this value, and
/// <see cref="TopRatedCompanyShuttles"/> is declared with it as its parameter source.
/// </remarks>
public IItem<string> TopRatedCompanyId =>
    CreateItem(
        () => ItemFactory.Single.Memory<string>("TopRatedCompanyId")
    );
108+
109+
/// <summary>
/// Parameterized GQL catalog entry: the shuttles operated by the top-rated company.
/// </summary>
/// <remarks>
/// <para>
/// The entry is declared with <see cref="TopRatedCompanyId"/> as its parameter source. The
/// value read from that item is passed as the company ID to the
/// <c>GetShuttlesByCompanyId</c> query, so the filtering happens in the query itself and
/// the unfiltered shuttle dataset is not requested.
/// </para>
/// <para>
/// Because this item's adapter depends on <see cref="TopRatedCompanyId"/>, the dependency
/// analyzer schedules any step that consumes <see cref="TopRatedCompanyShuttles"/> after
/// the step that produces <see cref="TopRatedCompanyId"/>; the flow definition does not
/// have to spell that ordering out.
/// </para>
/// <para>
/// NOTE(review): <c>allowEmptyData</c> is set to <c>true</c>, presumably so that a company
/// without shuttles loads as an empty sequence rather than failing — confirm against the
/// factory's documentation.
/// </para>
/// </remarks>
public IItem<IEnumerable<IGetShuttlesByCompanyId_Shuttles>> TopRatedCompanyShuttles =>
    CreateItem(() =>
        GqlItemFactory.Enumerable.Query<
            string,
            IGetShuttlesByCompanyIdResult,
            IGetShuttlesByCompanyId_Shuttles
        >(
            label: "GQLTopRatedCompanyShuttles",
            parameterSource: TopRatedCompanyId,
            queryFunc: (id, cancellationToken) =>
                _client.GetShuttlesByCompanyId.ExecuteAsync(id, cancellationToken),
            selectData: result => result.Shuttles,
            allowEmptyData: true
        )
    );
99137
}

examples/advanced/KedroSpaceflightsGQL/Data/_08_Reporting/Catalog.Reporting.cs

Lines changed: 14 additions & 0 deletions
Original file line number | Diff line number | Diff line change
@@ -22,6 +22,20 @@ public partial class Catalog
2222
)
2323
);
2424

25+
/// <summary>
/// JSON-backed catalog item for the top-rated company's fleet summary.
/// </summary>
/// <remarks>
/// The Analytics flow writes this item as the output of its <c>AnalyzeTopCompanyFleet</c>
/// step. That step is handed already-materialized shuttle data; the parameterized GQL
/// query that fetched the data is not visible to it. The report is stored at
/// <c>_08_Reporting/Datasets/top_rated_company_report.json</c> beneath the catalog's
/// base path.
/// </remarks>
public IItem<TopRatedCompanyReport> TopRatedCompanyReport =>
    CreateItem(() =>
        ItemFactory.Single.Json<TopRatedCompanyReport>(
            label: "TopRatedCompanyReport",
            filePath: $"{_basePath}/_08_Reporting/Datasets/top_rated_company_report.json"
        )
    );
38+
2539
/// <summary>
2640
/// Shuttle passenger capacity bar chart (in-memory GenericChart).
2741
/// Intermediate chart object stored in memory for downstream export to PNG.
Lines changed: 31 additions & 0 deletions
Original file line number | Diff line number | Diff line change
@@ -0,0 +1,31 @@
1+
using Flowthru.Core.Abstractions;
2+
3+
namespace KedroSpaceflightsGQL.Data._08_Reporting.Schemas;
4+
5+
/// <summary>
/// Fleet summary for the top-rated company, written by the Analytics flow as its
/// demonstration output for parameterized GQL catalog items.
/// </summary>
/// <remarks>
/// Every property is <c>required</c> and init-only, so a report must be fully populated
/// when it is constructed and cannot be modified afterwards.
/// </remarks>
[FlowthruSchema]
public partial record TopRatedCompanyReport
{
    /// <summary>ID of the company this report describes.</summary>
    public required string CompanyId { get; init; }

    /// <summary>How many shuttles the company operates.</summary>
    public required int ShuttleCount { get; init; }

    /// <summary>Average price over the shuttles in the company's fleet.</summary>
    public required decimal AveragePrice { get; init; }

    /// <summary>Sum of the passenger capacity of every shuttle in the fleet.</summary>
    public required int TotalPassengerCapacity { get; init; }
}
Lines changed: 81 additions & 0 deletions
Original file line number | Diff line number | Diff line change
@@ -0,0 +1,81 @@
1+
using Flowthru.Core.Flows;
2+
using KedroSpaceflightsGQL.Data;
3+
using KedroSpaceflightsGQL.Flows.Analytics.Steps;
4+
5+
namespace KedroSpaceflightsGQL.Flows.Analytics;
6+
7+
/// <summary>
/// Builds the Analytics flow, which demonstrates parameterized GQL catalog items.
/// </summary>
/// <remarks>
/// <para>
/// <strong>Motivation:</strong> the GQL shuttle endpoint serves the shuttles of every
/// company. Fetching all of them and filtering afterwards inside a step spends network
/// bandwidth and server work on records that are thrown away, which matters most when only
/// a small subset is needed.
/// </para>
/// <para>
/// <strong>How the flow is put together:</strong>
/// <list type="number">
/// <item>
/// <c>FindTopRatedCompany</c> is a pure transform. It reads <c>GqlDatabaseSeeded</c> and
/// <c>Companies</c> and writes the highest-rated company's ID to the in-memory
/// <c>TopRatedCompanyId</c> catalog item.
/// </item>
/// <item>
/// <c>TopRatedCompanyShuttles</c> is a <em>parameterized</em> GQL catalog entry declared
/// with <c>TopRatedCompanyId</c> as its parameter source. Loading it passes that ID to the
/// <c>GetShuttlesByCompanyId</c> query, so only the matching shuttles are fetched.
/// </item>
/// <item>
/// <c>AnalyzeTopCompanyFleet</c> is a pure transform. It reads
/// <c>TopRatedCompanyShuttles</c> as a plain <c>IEnumerable</c> and writes
/// <c>TopRatedCompanyReport</c>; it knows nothing about the parameterization.
/// </item>
/// </list>
/// </para>
/// <para>
/// <strong>Ordering:</strong> this definition declares no explicit ordering between the
/// two steps. The dependency analyzer sees that the adapter behind
/// <c>TopRatedCompanyShuttles</c> depends on <c>TopRatedCompanyId</c> and therefore treats
/// <c>FindTopRatedCompany</c> as a transitive dependency of <c>AnalyzeTopCompanyFleet</c>.
/// </para>
/// </remarks>
public static class AnalyticsFlow
{
    /// <summary>
    /// Registers the two Analytics steps and returns the resulting flow.
    /// </summary>
    /// <param name="catalog">Catalog that supplies the items the steps read and write.</param>
    /// <returns>The flow produced by <c>FlowBuilder.CreateFlow</c>.</returns>
    public static Flow Create(Catalog catalog) =>
        FlowBuilder.CreateFlow(builder =>
        {
            // Step 1: pick the top-rated company and publish its ID.
            builder.AddStep(
                label: "FindTopRatedCompany",
                description: """
                Identifies the company with the highest rating across all GQL companies.
                GqlDatabaseSeeded is consumed as an explicit DAG gate ensuring Ingest has
                completed before this step executes.
                """,
                transform: FindTopRatedCompanyStep.Create(),
                input: (catalog.GqlDatabaseSeeded, catalog.Companies),
                output: catalog.TopRatedCompanyId
            );

            // Step 2: summarize that company's fleet from the parameterized catalog entry.
            builder.AddStep(
                label: "AnalyzeTopCompanyFleet",
                description: """
                Computes a fleet summary for the top-rated company. The input catalog item
                (TopRatedCompanyShuttles) is parameterized: its adapter reads TopRatedCompanyId at load
                time and fires a filtered GQL query — only that company's shuttles are transferred.
                This step is a pure transform with no awareness of the filtering mechanism.
                DAG ordering (runs after FindTopRatedCompany) is enforced automatically via the
                adapter's ItemDependencies declaration.
                """,
                transform: AnalyzeTopCompanyShuttlesStep.Create(),
                input: catalog.TopRatedCompanyShuttles,
                output: catalog.TopRatedCompanyReport
            );
        });
}
Lines changed: 44 additions & 0 deletions
Original file line number | Diff line number | Diff line change
@@ -0,0 +1,44 @@
1+
using Flowthru.Core.Steps;
2+
using KedroSpaceflightsGQL.Data._08_Reporting.Schemas;
3+
using KedroSpaceflightsGQL.Infra.GqlClient;
4+
5+
namespace KedroSpaceflightsGQL.Flows.Analytics.Steps;
6+
7+
/// <summary>
/// Turns the top-rated company's shuttle records into a <see cref="TopRatedCompanyReport"/>.
/// </summary>
/// <remarks>
/// The step is a pure transform over a plain <c>IEnumerable</c> of shuttle records. How
/// those records were obtained — a parameterized GQL query handled by the catalog layer —
/// is invisible here. Running after <c>FindTopRatedCompany</c> is not arranged by this
/// step either; the dependency analyzer derives it from the adapter dependencies of
/// <c>TopRatedCompanyShuttles</c>.
/// </remarks>
[FlowthruStep]
public static class AnalyzeTopCompanyShuttlesStep
{
    /// <summary>
    /// Creates the transform that summarizes a company's shuttle fleet.
    /// </summary>
    /// <returns>
    /// A function that materializes the shuttle sequence once and reports the company ID of
    /// the first shuttle (<c>"unknown"</c> when there is none or its ID is null), the number
    /// of shuttles, their average price (<c>0</c> for an empty fleet) and their summed
    /// passenger capacity.
    /// </returns>
    public static Func<
        IEnumerable<IGetShuttlesByCompanyId_Shuttles>,
        TopRatedCompanyReport
    > Create() =>
        shuttles =>
        {
            // Materialize once: the sequence is walked several times below.
            var fleet = shuttles.ToList();
            var hasShuttles = fleet.Count > 0;

            return new TopRatedCompanyReport
            {
                // The input is already filtered to one company, so the first record's ID
                // stands for the whole fleet.
                CompanyId = fleet.FirstOrDefault()?.CompanyId ?? "unknown",
                ShuttleCount = fleet.Count,
                // Average is only called when the fleet is non-empty.
                AveragePrice = hasShuttles ? fleet.Average(shuttle => shuttle.Price) : 0m,
                TotalPassengerCapacity = fleet.Sum(shuttle => shuttle.PassengerCapacity),
            };
        };
}
Lines changed: 39 additions & 0 deletions
Original file line number | Diff line number | Diff line change
@@ -0,0 +1,39 @@
1+
using Flowthru.Core.Steps;
2+
using KedroSpaceflightsGQL.Infra.GqlClient;
3+
4+
namespace KedroSpaceflightsGQL.Flows.Analytics.Steps;
5+
6+
/// <summary>
/// Selects the company with the highest rating and yields its ID, the string that
/// parameterizes the downstream shuttle query.
/// </summary>
/// <remarks>
/// The step is a pure transform. It does not know that its output feeds a parameterized
/// catalog entry; that wiring belongs to the catalog layer.
/// </remarks>
[FlowthruStep]
public static class FindTopRatedCompanyStep
{
    /// <summary>
    /// Creates the transform that selects the highest-rated company's ID.
    /// </summary>
    /// <remarks>
    /// <para>
    /// The first tuple element (a <c>bool</c>) is the <c>GqlDatabaseSeeded</c> gate. It is
    /// accepted only so that the DAG records a dependency on Ingest; its value is ignored.
    /// </para>
    /// <para>
    /// NOTE(review): <c>MaxBy</c> orders by the default comparer of <c>CompanyRating</c>'s
    /// type, which is not visible here. If the rating is textual (for example a percentage
    /// string), the ordering would be textual rather than numeric — confirm.
    /// </para>
    /// </remarks>
    /// <returns>
    /// A function mapping the gate and the company records to the <c>Id</c> of the
    /// top-rated company.
    /// </returns>
    /// <exception cref="InvalidOperationException">
    /// Thrown by the returned function when no top-rated company can be selected, i.e. when
    /// the companies collection is empty.
    /// </exception>
    public static Func<(bool, IEnumerable<IGetCompanies_Companies>), string> Create() =>
        input =>
        {
            // Item1 is the seeding gate and is deliberately unused.
            var topRated =
                input.Item2.MaxBy(company => company.CompanyRating)
                ?? throw new InvalidOperationException(
                    "Cannot determine top-rated company: the companies collection is empty."
                );

            return topRated.Id;
        };
}

examples/advanced/KedroSpaceflightsGQL/Infra/GqlClient/Operations.graphql

Lines changed: 14 additions & 0 deletions
Original file line number | Diff line number | Diff line change
@@ -21,6 +21,20 @@ query GetShuttles {
2121
}
2222
}
2323

24+
# Shuttles of a single company: $companyId is forwarded to the `companyId`
# argument of the `shuttles` field.
query GetShuttlesByCompanyId($companyId: String!) {
  shuttles(companyId: $companyId) {
    id
    shuttleType
    companyId
    engines
    passengerCapacity
    crew
    price
    dCheckComplete
    moonClearanceComplete
  }
}
37+
2438
query GetReviews {
2539
reviews {
2640
shuttleId

examples/advanced/KedroSpaceflightsGQL/Infra/GqlClient/schema.graphql

Lines changed: 1 addition & 1 deletion
Original file line number | Diff line number | Diff line change
@@ -50,7 +50,7 @@ input AddReviewInput {
5050

5151
type Query {
5252
companies: [CompanyRecord!]!
53-
shuttles: [ShuttleRecord!]!
53+
shuttles(companyId: String): [ShuttleRecord!]!
5454
reviews: [ReviewRecord!]!
5555
}
5656

examples/advanced/KedroSpaceflightsGQL/Infra/GqlServer/SpaceflightsGqlServer.cs

Lines changed: 4 additions & 2 deletions
Original file line number | Diff line number | Diff line change
@@ -12,8 +12,10 @@ public class Query
1212
public IReadOnlyList<CompanyRecord> GetCompanies([Service] SpaceflightsRepository repo) =>
1313
repo.GetCompanies();
1414

15-
public IReadOnlyList<ShuttleRecord> GetShuttles([Service] SpaceflightsRepository repo) =>
16-
repo.GetShuttles();
15+
/// <summary>
/// Resolves the shuttle list, optionally restricted to a single company.
/// </summary>
/// <param name="repo">Repository the shuttle records are read from.</param>
/// <param name="companyId">
/// Company to filter by. When <c>null</c> (the default) every shuttle is returned;
/// otherwise the result of <c>repo.GetShuttlesByCompanyId</c> for that ID.
/// </param>
/// <returns>The shuttle records selected as described above.</returns>
public IReadOnlyList<ShuttleRecord> GetShuttles(
    [Service] SpaceflightsRepository repo,
    string? companyId = null
) => companyId is null ? repo.GetShuttles() : repo.GetShuttlesByCompanyId(companyId);
1719

1820
public IReadOnlyList<ReviewRecord> GetReviews([Service] SpaceflightsRepository repo) =>
1921
repo.GetReviews();

examples/advanced/KedroSpaceflightsGQL/Infra/GqlServer/SpaceflightsRepository.cs

Lines changed: 8 additions & 0 deletions
Original file line number | Diff line number | Diff line change
@@ -36,6 +36,14 @@ public IReadOnlyList<ShuttleRecord> GetShuttles()
3636
}
3737
}
3838

39+
/// <summary>
/// Returns the shuttles whose <c>CompanyId</c> equals <paramref name="companyId"/>.
/// </summary>
/// <param name="companyId">Company ID to match against each shuttle's <c>CompanyId</c>.</param>
/// <returns>
/// A newly created list of the matching shuttles; empty when none match.
/// </returns>
public IReadOnlyList<ShuttleRecord> GetShuttlesByCompanyId(string companyId)
{
    lock (_lock)
    {
        // ToList() runs inside the lock, so the filter is fully evaluated before
        // _lock is released.
        var matches = _shuttles.Where(shuttle => shuttle.CompanyId == companyId).ToList();
        return matches;
    }
}
46+
3947
public IReadOnlyList<ReviewRecord> GetReviews()
4048
{
4149
lock (_lock)

0 commit comments

Comments
 (0)