Recipe Build a Data Pipeline

Joel Natividad edited this page May 13, 2026 · 2 revisions

Recipe: Build a Data Pipeline

Tier: Advanced

Commands used: input, safenames, stats, schema, validate, sqlp, pivotp, template, to

Anchor dataset: Allegheny County property sales (resources/test/allegheny_property_sales*.csv)

Problem

You want to take a raw CSV all the way from "what is this thing?" to "publishable analytics report" in a single repeatable pipeline. The output should be:

  • clean (normalized headers, validated against a schema)
  • fast (reuses the stats cache and a Polars schema)
  • analytical (SQL queries, pivoted summaries)
  • human-readable (a Markdown report ready to ship)
  • machine-consumable (Parquet output for downstream systems)

This recipe is a template — copy it, swap the dataset, and you have a production pipeline.

Data

ls resources/test/allegheny_property_sales*
# allegheny_property_sales.csv
# allegheny_property_sales.csv.idx     (pre-built index)
# allegheny_property_sales.stats.csv   (pre-built stats)

If you're not in the qsv repo:

curl -LO https://raw.githubusercontent.com/dathere/qsv/master/resources/test/allegheny_property_sales.csv

Solution

Stage 1 — clean

# 1.1: Normalize encoding, trim whitespace, skip preamble
qsv input --auto-skip --trim-headers --trim-fields \
  allegheny_property_sales.csv > stage1_input.csv

# 1.2: Make column names DB-safe
qsv safenames stage1_input.csv > stage1_clean.csv

# 1.3: Index it (one-time cost, payoff downstream)
qsv index stage1_clean.csv
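
A quick way to confirm that step 1.2 did its job is to inspect the header row before moving on. The sketch below is a plain-shell check, not a qsv feature: the function name `check_safe_headers` is hypothetical, and the regex (letters, digits, underscores, no leading digit) is an assumption about what "DB-safe" means here.

```shell
# check_safe_headers FILE
# Succeeds only if every header is made of letters, digits, and underscores
# and does not start with a digit. The rule is an assumption, not qsv's own.
# Assumes the header row has no quoted commas, which holds for safe names.
check_safe_headers() {
  head -n 1 "$1" | tr -d '\r' | tr ',' '\n' |
    awk '
      $0 !~ /^[A-Za-z_][A-Za-z0-9_]*$/ { print "unsafe header: " $0; bad = 1 }
      END { exit bad }
    '
}

# Demo on two made-up files.
demo_dir=$(mktemp -d)
printf 'sale_date,sale_price\n2024-01-05,100000\n' > "$demo_dir/safe.csv"
printf 'Sale Date,2nd owner\n2024-01-05,x\n' > "$demo_dir/unsafe.csv"

check_safe_headers "$demo_dir/safe.csv" && echo "headers OK"
check_safe_headers "$demo_dir/unsafe.csv" || echo "headers need safenames"
```

In a pipeline you would point it at stage1_clean.csv and stop if it returns non-zero.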

Stage 2 — profile

# 2.1: Full stats with cardinality + date inference; write the sidecar JSONL
qsv stats --everything --infer-dates --infer-boolean --stats-jsonl \
  stage1_clean.csv > stage2_stats.csv

# 2.2: Generate a Polars schema for fast sqlp/joinp/pivotp
qsv schema --polars stage1_clean.csv
ls stage1_clean.*
# .csv  .csv.idx  .stats.csv  .stats.csv.data.jsonl  .pschema.json
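
Before relying on date functions in Stage 4, it can help to check which columns were actually inferred as dates. The following is a minimal sketch that reads the stats CSV with awk; it assumes the stats output has `field` and `type` columns and that the field names contain no commas. The helper name `fields_of_type` and the three-row excerpt are made up for illustration.

```shell
# fields_of_type STATS_CSV TYPE
# Prints the names of the columns whose inferred type equals TYPE.
fields_of_type() {
  awk -F, -v want="$2" '
    NR == 1 {
      # Locate the "field" and "type" columns by name.
      for (i = 1; i <= NF; i++) {
        if ($i == "field") f = i
        if ($i == "type")  t = i
      }
      next
    }
    f && t && $t == want { print $f }
  ' "$1"
}

# Demo on a hypothetical excerpt shaped like stage2_stats.csv.
demo_dir=$(mktemp -d)
cat > "$demo_dir/stats.csv" <<'EOF'
field,type,cardinality
sale_date,Date,3
sale_price,Integer,3
property_class,String,2
EOF

fields_of_type "$demo_dir/stats.csv" Date
# prints: sale_date
```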

Stage 3 — validate

# 3.1: Generate a JSON Schema from representative data
qsv schema stage1_clean.csv      # produces stage1_clean.csv.schema.json

# 3.2: Hand-edit the schema (in a real pipeline, this is checked in to git)
#      For this recipe, the generated schema is good enough.

# 3.3: Validate the data against the schema
qsv validate stage1_clean.csv stage1_clean.csv.schema.json
# Exit 0 = clean. Non-zero = .invalid.csv + .validation-errors.tsv produced.
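
When validation fails, a one-line summary of the error report is often enough to decide whether to stop or investigate. This sketch counts the data rows in the report, assuming it is a TSV with a single header row. The helper name `count_validation_errors` and the sample rows are hypothetical.

```shell
# count_validation_errors REPORT_TSV
# Prints the number of error rows, skipping the header and blank lines.
# A missing report is treated as zero errors.
count_validation_errors() {
  [ -f "$1" ] || { echo 0; return 0; }
  awk 'NR > 1 && NF { n++ } END { print n + 0 }' "$1"
}

# Demo on a made-up report with two error rows.
demo_dir=$(mktemp -d)
printf 'row_number\tfield\terror\n2\tsale_price\tnot an integer\n7\tsale_date\tnot a date\n' \
  > "$demo_dir/errors.tsv"

count_validation_errors "$demo_dir/errors.tsv"
# prints: 2
```

In the recipe you would pass the .validation-errors.tsv file that qsv validate writes next to stage1_clean.csv.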

Stage 4 — analyze

# 4.1: Polars SQL aggregation — sales by year and property class
qsv sqlp stage1_clean.csv \
  "SELECT
     strftime(sale_date, '%Y') AS year,
     property_class,
     COUNT(*) AS sales,
     SUM(sale_price) AS total_revenue,
     AVG(sale_price) AS avg_price,
     MIN(sale_price) AS min_price,
     MAX(sale_price) AS max_price
   FROM stage1_clean
   WHERE sale_price > 0
   GROUP BY year, property_class
   ORDER BY year DESC, total_revenue DESC" \
  > stage4_summary.csv

# 4.2: Pivot to a wide view: one row per year, one column per property class
qsv pivotp property_class stage4_summary.csv \
  --index year --values total_revenue --agg sum \
  > stage4_pivot.csv
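
To make the reshaping in 4.2 concrete, here is a rough sketch of the same long-to-wide idea in plain awk, run on three made-up rows. It only illustrates the change of shape and is not how pivotp is implemented. The function name `pivot_long_to_wide` and the demo values are hypothetical.

```shell
# pivot_long_to_wide FILE
# Expects three columns: index, pivot column, value.
# Emits one row per index value and one column per pivot value, summing cells.
pivot_long_to_wide() {
  awk -F, '
    NR == 1 { index_name = $1; next }
    {
      # Remember first-seen order of rows and columns.
      if (!($1 in seen_row)) { seen_row[$1] = 1; rows[++nr] = $1 }
      if (!($2 in seen_col)) { seen_col[$2] = 1; cols[++nc] = $2 }
      cell[$1 SUBSEP $2] += $3
    }
    END {
      line = index_name
      for (c = 1; c <= nc; c++) line = line "," cols[c]
      print line
      for (r = 1; r <= nr; r++) {
        line = rows[r]
        for (c = 1; c <= nc; c++) {
          key = rows[r] SUBSEP cols[c]
          line = line "," ((key in cell) ? cell[key] : "")
        }
        print line
      }
    }
  ' "$1"
}

# Demo: a tiny summary in the same long shape as stage4_summary.csv.
demo_dir=$(mktemp -d)
cat > "$demo_dir/summary.csv" <<'EOF'
year,property_class,total_revenue
2024,Residential,300
2024,Commercial,500
2023,Residential,200
EOF

pivot_long_to_wide "$demo_dir/summary.csv"
# year,Residential,Commercial
# 2024,300,500
# 2023,200,
```

The empty cell on the last row shows what happens when a year has no sales for a class.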

# 4.3: Score-before-run is great for production queries — checks for anti-patterns
qsv scoresql stage1_clean.csv \
  "SELECT * FROM stage1_clean WHERE sale_price > 1000000" \
  > stage4_query_score.txt

Stage 5 — report

A MiniJinja template that renders the per-year summary as Markdown:

{# report.j2 — one row per year/class #}
# Allegheny Property Sales — {{ year }} — {{ property_class }}

- **Sales count:** {{ sales | human_count }}
- **Total revenue:** ${{ total_revenue | format_float(2) }}
- **Average price:** ${{ avg_price | format_float(2) }}
- **Price range:** ${{ min_price | format_float(2) }} – ${{ max_price | format_float(2) }}

---

Render one Markdown file per summary row:

qsv template --template-file report.j2 \
  --outfilename '{{ year }}_{{ property_class }}.md' \
  stage4_summary.csv reports_outdir/
ls reports_outdir/
# 2024_Residential.md  2024_Commercial.md  2023_Residential.md ...

Or render everything into one combined Markdown report:

qsv template --template-file report.j2 stage4_summary.csv > full_report.md

Stage 6 — publish

# 6.1: Parquet for downstream analytics (DuckDB, Spark, etc.)
qsv to parquet publish/ stage1_clean.csv stage4_summary.csv stage4_pivot.csv
ls publish/
# stage1_clean.parquet  stage4_summary.parquet  stage4_pivot.parquet

# 6.2: A Data Package descriptor for FAIR data sharing
qsv to datapackage --stats datapackage.json stage1_clean.csv

Stage 7 — gate (CI-ready)

# Exit 1 if anything is wrong; bash-pipe-safe
set -euo pipefail
qsv validate stage1_clean.csv stage1_clean.csv.schema.json || {
  echo "::error::Validation failed"; exit 1;
}
test -f publish/stage1_clean.parquet || {
  echo "::error::Parquet output missing"; exit 1;
}
echo "Pipeline OK."
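
The gate above checks a single Parquet file. If you publish several, a small helper can check them all and report every gap at once. This is a sketch under the assumption that each input CSV yields a same-named .parquet file in the output directory, as the listing in 6.1 suggests. The function name `check_parquet_outputs` is hypothetical.

```shell
# check_parquet_outputs OUTDIR CSV...
# Returns non-zero if any expected .parquet file is missing or empty.
check_parquet_outputs() {
  cpo_outdir=$1; shift
  cpo_missing=0
  for cpo_csv in "$@"; do
    cpo_target="$cpo_outdir/$(basename "$cpo_csv" .csv).parquet"
    if [ ! -s "$cpo_target" ]; then
      echo "::error::missing or empty: $cpo_target"
      cpo_missing=1
    fi
  done
  return "$cpo_missing"
}

# Demo with a placeholder file standing in for real Parquet output.
demo_dir=$(mktemp -d)
mkdir -p "$demo_dir/publish"
printf 'placeholder' > "$demo_dir/publish/stage1_clean.parquet"

check_parquet_outputs "$demo_dir/publish" stage1_clean.csv && echo "outputs OK"
check_parquet_outputs "$demo_dir/publish" stage1_clean.csv stage4_pivot.csv \
  || echo "gate would fail here"
```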

Variations

Replace Stage 4 with a SQL-RAG describegpt chat

For exploratory analysis, swap the hand-written SQL for a natural-language prompt:

qsv describegpt stage1_clean.csv \
  --prompt "Show me the top 10 most expensive sales per year." \
  > stage4_summary.md

qsv writes the SQL itself in SQL-RAG sub-mode and runs it via DuckDB or Polars SQL.

See Recipe: Stats → Insights.

Make-driven orchestration

# Makefile fragment
stage1_clean.csv: raw.csv
	qsv input --auto-skip $< | qsv safenames - > $@

stage1_clean.csv.idx: stage1_clean.csv
	qsv index $<

stage2_stats.csv stage1_clean.stats.csv.data.jsonl: stage1_clean.csv stage1_clean.csv.idx
	qsv stats --everything --infer-dates --stats-jsonl $< > stage2_stats.csv

stage1_clean.csv.schema.json: stage1_clean.csv stage1_clean.stats.csv.data.jsonl
	qsv schema $<

stage4_summary.csv: stage1_clean.csv stage1_clean.csv.schema.json
	qsv validate $< $<.schema.json
	qsv sqlp $< 'SELECT ...' > $@

publish: stage4_summary.csv
	qsv to parquet publish/ $<

Each target is rebuilt only when one of its prerequisites is newer, so make reruns only what changed.

GitHub Actions integration

# .github/workflows/data-pipeline.yml
name: Data pipeline
on:
  push:
    paths: ['data/raw/*.csv']
jobs:
  pipeline:
    runs-on: ubuntu-latest
    steps:
      - uses: actions/checkout@v4
      - run: curl -fL https://github.com/dathere/qsv/releases/latest/download/qsv-x86_64-unknown-linux-gnu.zip -o qsv.zip
      - run: unzip qsv.zip && sudo install qsv /usr/local/bin/
      - run: make publish
      - uses: actions/upload-artifact@v4
        with:
          name: publish
          path: publish/

Performance notes

  • The stats cache and Polars schema substantially accelerate stages 3, 4, and 5. On the mid-sized Allegheny dataset, the full pipeline runs in seconds.
  • For multi-GB inputs, swap sort / dedup for extsort / extdedup, and set QSV_AUTOINDEX_SIZE in your environment for automatic indexing.
  • Stage 7 is the "production gate" — make it part of your CI. The exit codes from qsv validate make this easy.
  • The Polars schema (.pschema.json) is the biggest unexplored optimization for most users. Generate it once and check it into git, and every sqlp / joinp / pivotp run becomes faster.
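
As a small illustration of the QSV_AUTOINDEX_SIZE note, the snippet below sets a threshold in the shell before running the pipeline. It assumes the variable is read as a minimum file size in bytes. The 100 MiB figure is an arbitrary example, not a recommended setting.

```shell
# Auto-index any CSV of at least 100 MiB (value assumed to be in bytes).
QSV_AUTOINDEX_SIZE=$((100 * 1024 * 1024))
export QSV_AUTOINDEX_SIZE

echo "QSV_AUTOINDEX_SIZE=$QSV_AUTOINDEX_SIZE"
# prints: QSV_AUTOINDEX_SIZE=104857600
```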

See also
