Skip to content

Commit 6c15703

Browse files
committed
fix: make spark example spark-ier
1 parent 6103590 commit 6c15703

12 files changed

Lines changed: 357 additions & 88 deletions

File tree

.github/agents/dev-relations.agent.md

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,5 @@
11
---
22
description: "Use when: writing release summaries, drafting changelogs for end-users, summarizing what changed between tagged releases."
3-
tools: [read, search, execute, edit]
43
model: claude-haiku-4.5
54
---
65

.github/workflows/release.yml

Lines changed: 45 additions & 47 deletions
Original file line numberDiff line numberDiff line change
@@ -189,50 +189,48 @@ jobs:
189189
--source https://api.nuget.org/v3/index.json \
190190
--skip-duplicate
191191
192-
# - name: Install Copilot CLI
193-
# if: steps.release.outputs.changed == 'true'
194-
# run: npm install -g @github/copilot
195-
196-
# - name: Generate release summary
197-
# if: steps.release.outputs.changed == 'true'
198-
# env:
199-
# COPILOT_GITHUB_TOKEN: ${{ secrets.COPILOT_CLI_TOKEN }}
200-
# run: |
201-
# HEAD_TAG="v${{ steps.release.outputs.version }}"
202-
# BASE_TAG="${{ steps.last-published.outputs.tag }}"
203-
# copilot -p "Generate a release summary. Base tag: ${BASE_TAG}, head tag: ${HEAD_TAG}. Summarize all changes between these two tags." \
204-
# --agent=dev-relations \
205-
# --model=claude-haiku-4.5 \
206-
# --available-tools='view,create,edit,bash' \
207-
# --allow-tool='shell(git:*)' \
208-
# --allow-tool='write' \
209-
# --allow-tool='view' \
210-
# --add-dir=. \
211-
# --deny-tool='shell(git push)' \
212-
# --deny-tool='shell(git reset)' \
213-
# --deny-tool='shell(git checkout)' \
214-
# --deny-tool='shell(git commit)' \
215-
# --deny-tool='shell(git merge)' \
216-
# --deny-tool='shell(git rebase)' \
217-
# --deny-tool='shell(git branch -d)' \
218-
# --deny-tool='shell(git tag -d)' \
219-
# --no-ask-user
220-
221-
# - name: Prepend summary to draft release
222-
# if: steps.release.outputs.changed == 'true'
223-
# env:
224-
# GITHUB_TOKEN: ${{ secrets.GITHUB_TOKEN }}
225-
# run: |
226-
# VERSION="v${{ steps.release.outputs.version }}"
227-
# SUMMARY_FILE="docs/scratch/release-${{ steps.release.outputs.version }}.md"
228-
# if [ -f "$SUMMARY_FILE" ]; then
229-
# EXISTING_NOTES=$(gh release view "${VERSION}" --json body -q '.body')
230-
# {
231-
# cat "$SUMMARY_FILE"
232-
# echo ""
233-
# echo "---"
234-
# echo ""
235-
# echo "$EXISTING_NOTES"
236-
# } > /tmp/combined-notes.md
237-
# gh release edit "${VERSION}" --notes-file /tmp/combined-notes.md
238-
# fi
192+
- name: Install Copilot CLI
193+
if: steps.release.outputs.changed == 'true'
194+
run: npm install -g @github/copilot
195+
196+
- name: Generate release summary
197+
if: steps.release.outputs.changed == 'true'
198+
env:
199+
COPILOT_GITHUB_TOKEN: ${{ secrets.COPILOT_CLI_TOKEN }}
200+
run: |
201+
HEAD_TAG="v${{ steps.release.outputs.version }}"
202+
BASE_TAG="${{ steps.last-published.outputs.tag }}"
203+
mkdir -p docs/scratch
204+
copilot -p "Generate a release summary. Base tag: ${BASE_TAG}, head tag: ${HEAD_TAG}. Summarize all changes between these two tags." \
205+
--agent=dev-relations \
206+
--model=claude-haiku-4.5 \
207+
--allow-tool='shell(git:*)' \
208+
--allow-tool='write' \
209+
--deny-tool='shell(git push)' \
210+
--deny-tool='shell(git reset)' \
211+
--deny-tool='shell(git checkout)' \
212+
--deny-tool='shell(git commit)' \
213+
--deny-tool='shell(git merge)' \
214+
--deny-tool='shell(git rebase)' \
215+
--deny-tool='shell(git branch -d)' \
216+
--deny-tool='shell(git tag -d)' \
217+
--no-ask-user
218+
219+
- name: Prepend summary to draft release
220+
if: steps.release.outputs.changed == 'true'
221+
env:
222+
GITHUB_TOKEN: ${{ secrets.GITHUB_TOKEN }}
223+
run: |
224+
VERSION="v${{ steps.release.outputs.version }}"
225+
SUMMARY_FILE="docs/scratch/release-${{ steps.release.outputs.version }}.md"
226+
if [ -f "$SUMMARY_FILE" ]; then
227+
EXISTING_NOTES=$(gh release view "${VERSION}" --json body -q '.body')
228+
{
229+
cat "$SUMMARY_FILE"
230+
echo ""
231+
echo "---"
232+
echo ""
233+
echo "$EXISTING_NOTES"
234+
} > /tmp/combined-notes.md
235+
gh release edit "${VERSION}" --notes-file /tmp/combined-notes.md
236+
fi
Lines changed: 8 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -1,22 +1,18 @@
11
using Flowthru.Core.Data;
2+
using Flowthru.Misc.DataFrames;
23
using KedroSpaceflightsSpark.Data._03_Primary.Schemas;
4+
using SparkFactory = Flowthru.Extensions.Spark.ItemFactory;
35

46
namespace KedroSpaceflightsSpark.Data;
57

68
public partial class Catalog
79
{
810
/// <summary>
9-
/// Unified model input table. Persisted to Parquet at this layer so the DataScience
10-
/// flow can consume it as a materialized IEnumerable without requiring Spark.
11-
/// The TypedFrame produced by CreateModelInputTableStep materializes implicitly
12-
/// when the Parquet serializer enumerates it.
11+
/// Unified model input table. Held as an in-memory TypedFrame so the DataScience
12+
/// and Reporting flows can continue to apply Spark operations (filter, window functions)
13+
/// before materialization. The deferred execution plan from CreateModelInputTableStep
14+
/// is passed through as-is; no Spark action is triggered at this catalog boundary.
1315
/// </summary>
14-
public IItem<IEnumerable<ModelInputTableSchema>> ModelInputTable =>
15-
CreateItem(
16-
() =>
17-
ItemFactory.Enumerable.Parquet<ModelInputTableSchema>(
18-
label: "ModelInputTable",
19-
filePath: $"{_basePath}/_03_Primary/Datasets/model_input_table.parquet"
20-
)
21-
);
16+
public IItem<TypedFrame<ModelInputTableSchema>> ModelInputTable =>
17+
CreateItem(() => SparkFactory.Frame.Memory<ModelInputTableSchema>(label: "ModelInputTable"));
2218
}
Lines changed: 11 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,9 @@
11
using Flowthru.Core.Data;
2+
using Flowthru.Misc.DataFrames;
23
using KedroSpaceflightsSpark.Data._08_Reporting.Schemas;
34
using Plotly.NET;
5+
using CoreFactory = Flowthru.Core.Data.ItemFactory;
6+
using SparkFactory = Flowthru.Extensions.Spark.ItemFactory;
47

58
namespace KedroSpaceflightsSpark.Data;
69

@@ -9,35 +12,38 @@ public partial class Catalog
912
public IItem<IEnumerable<ShuttleCapacityReport>> ShuttleCapacityReport =>
1013
CreateItem(
1114
() =>
12-
ItemFactory.Enumerable.Json<ShuttleCapacityReport>(
15+
CoreFactory.Enumerable.Json<ShuttleCapacityReport>(
1316
label: "ShuttleCapacityReport",
1417
filePath: $"{_basePath}/_08_Reporting/Datasets/shuttle_capacity_report.json"
1518
)
1619
);
1720

1821
public IItem<GenericChart> ShuttlePassengerCapacityChart =>
1922
CreateItem(
20-
() => ItemFactory.Single.Memory<GenericChart>(label: "ShuttlePassengerCapacityChart")
23+
() => CoreFactory.Single.Memory<GenericChart>(label: "ShuttlePassengerCapacityChart")
2124
);
2225

2326
public IItem<byte[]> ShuttlePassengerCapacityPlotPng =>
2427
CreateItem(
2528
() =>
26-
ItemFactory.Single.Binary(
29+
CoreFactory.Single.Binary(
2730
label: "ShuttlePassengerCapacityPlotPng",
2831
filePath: $"{_basePath}/_08_Reporting/Images/shuttle_passenger_capacity_plot.png"
2932
)
3033
);
3134

3235
public IItem<GenericChart> ConfusionMatrixChart =>
33-
CreateItem(() => ItemFactory.Single.Memory<GenericChart>(label: "ConfusionMatrixChart"));
36+
CreateItem(() => CoreFactory.Single.Memory<GenericChart>(label: "ConfusionMatrixChart"));
3437

3538
public IItem<byte[]> ConfusionMatrixPlotPng =>
3639
CreateItem(
3740
() =>
38-
ItemFactory.Single.Binary(
41+
CoreFactory.Single.Binary(
3942
label: "ConfusionMatrixPlotPng",
4043
filePath: $"{_basePath}/_08_Reporting/Images/confusion_matrix_plot.png"
4144
)
4245
);
46+
47+
public IItem<TypedFrame<ShuttlePriceRankSchema>> ShuttlePriceRanks =>
48+
CreateItem(() => SparkFactory.Frame.Memory<ShuttlePriceRankSchema>(label: "ShuttlePriceRanks"));
4349
}
Lines changed: 31 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,31 @@
1+
using Flowthru.Core.Abstractions;
2+
3+
namespace KedroSpaceflightsSpark.Data._08_Reporting.Schemas;
4+
5+
/// <summary>
6+
/// Per-shuttle price ranking within shuttle type.
7+
/// Produced by RankShuttlesByPriceStep using a Spark window function over a
8+
/// PartitionBy(ShuttleType).OrderBy(Price) window spec, annotating each row with
9+
/// its DenseRank and the window-average price for its type.
10+
/// </summary>
11+
[FlowthruSchema]
12+
public partial record ShuttlePriceRankSchema
13+
{
14+
[SerializedLabel("shuttle_id")]
15+
public required string ShuttleId { get; init; }
16+
17+
[SerializedLabel("shuttle_type")]
18+
public required string ShuttleType { get; init; }
19+
20+
[SerializedLabel("company_id")]
21+
public required string CompanyId { get; init; }
22+
23+
[SerializedLabel("price")]
24+
public required double Price { get; init; }
25+
26+
[SerializedLabel("price_rank")]
27+
public required long PriceRank { get; init; }
28+
29+
[SerializedLabel("avg_price_for_type")]
30+
public required double AvgPriceForType { get; init; }
31+
}

examples/starter/KedroSpaceflightsSpark/Flows/DataProcessing/Steps/CreateModelInputTableStep.cs

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -13,10 +13,10 @@ namespace KedroSpaceflightsSpark.Flows.DataProcessing.Steps;
1313
/// 1. Shuttles ⋈ ParsedReviews (on shuttle.Id = review.ShuttleId)
1414
/// 2. Result ⋈ Companies (on shuttleReview.CompanyId = company.Id)
1515
///
16-
/// The result is a TypedFrame&lt;ModelInputTableSchema&gt;. The output catalog item is
17-
/// IItem&lt;IEnumerable&lt;ModelInputTableSchema&gt;&gt; backed by Parquet. Materialization
18-
/// (TypedFrame → IEnumerable) happens implicitly when the Parquet serializer enumerates
19-
/// the frame — no explicit Collect() call required.
16+
/// The result is a TypedFrame<ModelInputTableSchema> stored in a memory catalog item.
17+
/// The execution plan remains deferred — no Spark action is triggered here.
18+
/// Downstream steps (SplitDataStep, RankShuttlesByPriceStep) can continue to apply
19+
/// Spark operations before any materialization occurs.
2020
/// </summary>
2121
[FlowthruStep]
2222
public static class CreateModelInputTableStep

examples/starter/KedroSpaceflightsSpark/Flows/DataScience/Steps/EvaluateModelStep.cs

Lines changed: 10 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -21,7 +21,12 @@ public static Func<
2121
if (data.Count == 0)
2222
{
2323
return (
24-
new ModelMetrics { R2Score = 0, MeanAbsoluteError = 0, MaxError = 0 },
24+
new ModelMetrics
25+
{
26+
R2Score = 0,
27+
MeanAbsoluteError = 0,
28+
MaxError = 0,
29+
},
2530
Enumerable.Empty<ModelPredictions>()
2631
);
2732
}
@@ -30,7 +35,10 @@ public static Func<
3035
var actuals = data.Select(d => d.Label).ToList();
3136

3237
var predictionPairs = actuals
33-
.Zip(predictions, (actual, predicted) => new ModelPredictions { Actual = actual, Predicted = predicted })
38+
.Zip(
39+
predictions,
40+
(actual, predicted) => new ModelPredictions { Actual = actual, Predicted = predicted }
41+
)
3442
.ToList();
3543

3644
return (

examples/starter/KedroSpaceflightsSpark/Flows/DataScience/Steps/SplitDataStep.cs

Lines changed: 7 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,5 @@
11
using Flowthru.Core.Steps;
2+
using Flowthru.Misc.DataFrames;
23
using KedroSpaceflightsSpark.Data._03_Primary.Schemas;
34
using KedroSpaceflightsSpark.Data._05_ModelInput.Schemas;
45

@@ -15,13 +16,17 @@ public record ModelOptions
1516
}
1617

1718
public static Func<
18-
IEnumerable<ModelInputTableSchema>,
19+
TypedFrame<ModelInputTableSchema>,
1920
(IEnumerable<TrainingData>, IEnumerable<TestData>)
2021
> Create(ModelOptions options)
2122
{
2223
return (input) =>
2324
{
24-
var data = input.ToList();
25+
// Filter out rows with non-positive prices in Spark before materializing.
26+
// This is the last point where Spark operations can be applied — the shuffle
27+
// below requires the full dataset in memory.
28+
var data = input.Where(r => r.Price > 0).ToList();
29+
2530
var random = new Random(options.RandomState);
2631
var shuffled = data.OrderBy(_ => random.Next()).ToList();
2732
var splitIndex = (int)(shuffled.Count * (1 - options.TestSize));

examples/starter/KedroSpaceflightsSpark/Flows/Reporting/ReportingFlow.cs

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -38,6 +38,17 @@ public static Flow Create(Catalog catalog, Params? parameters = null)
3838
input: catalog.ModelPredictions,
3939
output: catalog.ConfusionMatrixChart
4040
);
41+
42+
pipeline.AddStep(
43+
label: "RankShuttlesByPrice",
44+
description: """
45+
Annotates each shuttle with its dense price rank and average price within its shuttle
46+
type using Spark window functions (SelectOver + FrameWindowSpec).
47+
""",
48+
transform: RankShuttlesByPriceStep.Create(),
49+
input: catalog.ModelInputTable,
50+
output: catalog.ShuttlePriceRanks
51+
);
4152
});
4253
}
4354
}

examples/starter/KedroSpaceflightsSpark/Flows/Reporting/Steps/GeneratePassengerCapacityChartStep.cs

Lines changed: 16 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
using Flowthru.Core.Steps;
22
using Flowthru.Misc.DataFrames;
33
using KedroSpaceflightsSpark.Data._02_Intermediate.Schemas;
4+
using KedroSpaceflightsSpark.Data._08_Reporting.Schemas;
45
using Microsoft.Extensions.Logging;
56
using Plotly.NET;
67
using Plotly.NET.LayoutObjects;
@@ -10,8 +11,10 @@ namespace KedroSpaceflightsSpark.Flows.Reporting.Steps;
1011

1112
/// <summary>
1213
/// Generates a bar chart comparing average passenger capacity by shuttle type.
13-
/// Receives a TypedFrame and enumerates it (triggering Spark materialization) to
14-
/// produce the aggregated data needed by Plotly.NET.
14+
///
15+
/// The aggregation runs entirely in Spark: the 15k-row TypedFrame is filtered,
16+
/// grouped, and aggregated before materialization. Only the small per-type summary
17+
/// (~31 rows) is collected into .NET memory to feed Plotly.NET.
1518
/// </summary>
1619
[FlowthruStep]
1720
public static class GeneratePassengerCapacityChartStep
@@ -22,23 +25,22 @@ public static Func<TypedFrame<PreprocessedShuttleSchema>, GenericChart> Create(
2225
{
2326
return (input) =>
2427
{
25-
var shuttles = input.ToList();
26-
27-
logger?.LogInformation(
28-
"Generating passenger capacity chart from {Count} shuttle records",
29-
shuttles.Count
30-
);
31-
32-
var aggregated = shuttles
28+
var aggregated = input
29+
.Where(s => s.PassengerCapacity > 0)
3330
.GroupBy(s => s.ShuttleType)
34-
.Select(g => new
31+
.Aggregate(ctx => new ShuttleCapacityReport
3532
{
36-
ShuttleType = g.Key,
37-
AvgPassengerCapacity = g.Average(s => s.PassengerCapacity),
33+
ShuttleType = ctx.Key,
34+
AvgPassengerCapacity = ctx.Avg(s => (double)s.PassengerCapacity),
3835
})
39-
.OrderByDescending(x => x.AvgPassengerCapacity)
36+
.OrderByDescending(r => r.AvgPassengerCapacity)
4037
.ToList();
4138

39+
logger?.LogInformation(
40+
"Generating passenger capacity chart from {Count} shuttle types",
41+
aggregated.Count
42+
);
43+
4244
var shuttleTypes = aggregated.Select(x => x.ShuttleType).ToList();
4345
var capacities = aggregated.Select(x => x.AvgPassengerCapacity).ToList();
4446

0 commit comments

Comments (0)