Mini Project: Enhancing Data Workflow with Python and Gemini #50
Step 1 — Choose dataset + API (scoping, contracts, and setup)

1.1 Pick a dataset (small, clean, useful)
Decision checklist
Deliverables
1.2 Pick a public API that complements the dataset
Probe the API
Deliverables
1.3 Define the join story now
Write this down
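As a concrete illustration, here is a minimal sketch of the join once both sources are normalized. The `city_key` and `ts` keys below are the ones used later in Steps 3–4; rename them to fit your dataset/API pair, and the toy frames are placeholders for your real data.

```python
import pandas as pd

# Toy frames standing in for the normalized dataset and API rows
df_ds = pd.DataFrame({"city_key": ["hyd"], "ts": pd.to_datetime(["2025-08-01"], utc=True), "aqi": [92]})
df_api = pd.DataFrame({"city_key": ["hyd"], "ts": pd.to_datetime(["2025-08-01"], utc=True), "temp_c": [31.5]})

# Left join: every dataset row survives; API columns are added when a match exists
merged = df_ds.merge(df_api, on=["city_key", "ts"], how="left", validate="m:1")
print(merged)
```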
1.4 Repo and environment
Checkpoint commit
Step 2 — Ingest (raw → landing with retries, caching, and naming)

2.1 Function contracts (keep them pure)
Code hint

```python
import time
import requests


def get_with_retries(url, params=None, tries=3, timeout=10):
    """GET with a timeout and exponential backoff; re-raise after the last attempt."""
    for i in range(tries):
        try:
            r = requests.get(url, params=params, timeout=timeout)
            r.raise_for_status()
            return r
        except Exception:
            if i == tries - 1:
                raise
            time.sleep(2 ** i)  # back off 1s, 2s, ...
```

2.2 Raw-zone layout and file naming
Why JSONL?
Write helper

```python
import json
from pathlib import Path


def write_jsonl(rows, path):
    """Write an iterable of dicts to a JSON Lines file, creating parent dirs as needed."""
    Path(path).parent.mkdir(parents=True, exist_ok=True)
    with open(path, "w", encoding="utf-8") as f:
        for r in rows:
            f.write(json.dumps(r, ensure_ascii=False) + "\n")
```

2.3 Offline fallback (classroom-proof)
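One classroom-proof pattern is to replay the last cached JSONL whenever the live call fails. A sketch that reuses the helpers above; `fetch_api_or_cache` and the cache path are illustrative names, and it assumes the API returns a JSON array of rows:

```python
import json
from pathlib import Path


def fetch_api_or_cache(url, params, cache_path="data/raw/api_cache.jsonl"):
    """Try the live API; on any failure, replay the last cached JSONL instead."""
    try:
        resp = get_with_retries(url, params=params)
        rows = resp.json()                       # assumes a JSON array of row dicts
        write_jsonl(rows, cache_path)            # refresh the cache on success
        return rows
    except Exception:
        cached = Path(cache_path)
        if not cached.exists():
            raise                                # nothing to fall back to
        with open(cached, encoding="utf-8") as f:
            return [json.loads(line) for line in f]
```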
Acceptance
2.4 Minimal logging
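A minimal sketch of logging that is usually enough here; the logger name and messages are only examples:

```python
import logging

logging.basicConfig(
    level=logging.INFO,
    format="%(asctime)s %(levelname)s %(name)s - %(message)s",
)
log = logging.getLogger("etl.ingest")

# Example calls - emit one line per meaningful event, nothing more
log.info("fetched %d rows from %s", 120, "open-meteo")
log.warning("API failed, using cached file %s", "data/raw/api_cache.jsonl")
```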
Checkpoint commit
Step 3 — Transform (normalize, align, integrate, derive)

3.1 Normalize both sources early
Code hint

```python
import pandas as pd

# Flatten the API JSON and put both sources on the same UTC timestamp column
df_api = pd.json_normalize(api_rows)
df_api["ts"] = pd.to_datetime(df_api["time"], utc=True)
df_ds["ts"] = pd.to_datetime(df_ds["date_local"]).dt.tz_localize("UTC")
```

3.2 Align the time axis
3.3 Define and compute derived fields
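The project brief names temp_bin, aqi_flag, and hour_of_day as candidate derived fields; one possible way to compute them (the thresholds are placeholders, pick and document your own):

```python
import pandas as pd

df = pd.DataFrame({
    "ts": pd.to_datetime(["2025-08-01 02:00", "2025-08-01 14:00"], utc=True),
    "temp_c": [22.0, 34.5],
    "aqi": [45, 180],
})

# Placeholder cut-offs; record whatever you actually choose in the transform docs
df["temp_bin"] = pd.cut(df["temp_c"], bins=[-50, 15, 30, 60], labels=["cold", "mild", "hot"])
df["aqi_flag"] = (df["aqi"] > 100).map({True: "unhealthy", False: "ok"})
df["hour_of_day"] = df["ts"].dt.hour
print(df)
```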
3.4 Handle missingness and duplicates
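A sketch of one reasonable policy, assuming `city_key` + `ts` identify a row: drop exact duplicates, keep the last record per key, and leave genuinely missing measurements as NaN for the validate step to judge:

```python
import pandas as pd

merged = pd.DataFrame({
    "city_key": ["hyd", "hyd", "hyd"],
    "ts": pd.to_datetime(["2025-08-01", "2025-08-01", "2025-08-02"], utc=True),
    "aqi": [92, 92, None],   # second row is an exact duplicate; AQI missing on day 2
})

before = len(merged)
merged = merged.drop_duplicates()                                            # exact duplicates
merged = merged.sort_values("ts").drop_duplicates(["city_key", "ts"], keep="last")
print(f"dropped {before - len(merged)} duplicate rows")
# Missing measurements stay as NaN here; required-field gaps are quarantined in Step 4.2
```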
3.5 Document transform rules
Checkpoint commit
Step 4 — Validate (data contract, quarantine, summary)

4.1 Contract rules (start small, be explicit)
Severity levels
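One way to keep the contract explicit is a small list of named rules, each with a severity; the rule names, severities, and the `aqi` range below are illustrative only:

```python
import pandas as pd

# Illustrative contract: each rule returns a boolean mask of violating rows
CONTRACT = [
    ("ts_present",       "error",   lambda df: df["ts"].isna()),
    ("city_key_present", "error",   lambda df: df["city_key"].isna()),
    ("aqi_in_range",     "warning", lambda df: ~df["aqi"].between(0, 500)),
]


def check_contract(df: pd.DataFrame) -> pd.DataFrame:
    """Return one row per rule with its severity and violation count."""
    results = []
    for name, severity, rule in CONTRACT:
        bad = df[rule(df)]
        results.append({"rule": name, "severity": severity, "violations": len(bad)})
    return pd.DataFrame(results)
```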
4.2 Quarantine design
Code hint

```python
# Rows missing required keys go to quarantine; everything else continues downstream
bad_required = merged[merged["ts"].isna() | merged["city_key"].isna()]
good = merged.drop(bad_required.index)
bad_required.to_csv("data/processed/quarantine_required.csv", index=False)
```

4.3 Run summary (for governance later)
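A run summary can be a single JSON file per run; a sketch with hypothetical field names and counts:

```python
import json
import datetime as dt
from pathlib import Path

summary = {
    "run_at": dt.datetime.now(dt.timezone.utc).isoformat(),
    "rows_in": 1250,            # illustrative counts - use the real ones from your run
    "rows_good": 1238,
    "rows_quarantined": 12,
    "source_files": ["data/raw/api_cache.jsonl"],
}
Path("data/processed").mkdir(parents=True, exist_ok=True)
with open("data/processed/run_summary.json", "w", encoding="utf-8") as f:
    json.dump(summary, f, indent=2)
```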
Checkpoint commit
Step 5 — Load (formats, partitions, idempotency, and docs)

5.1 Output formats and directory layout
Atomic writes

```python
import os


def atomic_write_csv(df, path):
    """Write to a temp file first, then rename, so readers never see a half-written CSV."""
    tmp = str(path) + ".tmp"
    df.to_csv(tmp, index=False)
    os.replace(tmp, path)
```

5.2 Idempotency on re-run
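Idempotency here mostly means the same partition written twice ends up identical rather than appended. A sketch where the partition date is part of the path and each run overwrites its own partition; it reuses the atomic_write_csv helper from 5.1, and the layout is only a suggestion:

```python
from pathlib import Path


def load_partition(df, partition_dt: str, fmt: str = "csv"):
    """Re-running with the same partition_dt overwrites the same file, never appends."""
    out_dir = Path(f"data/processed/dt={partition_dt}")
    out_dir.mkdir(parents=True, exist_ok=True)
    out_path = out_dir / f"merged.{fmt}"
    if fmt == "csv":
        atomic_write_csv(df, out_path)      # helper from 5.1
    else:
        df.to_parquet(out_path, index=False)
    return out_path
```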
5.3 Load function and CLI
Code hint

```python
import argparse
import datetime as dt

ap = argparse.ArgumentParser()
ap.add_argument("--fmt", default="csv", choices=["csv", "parquet"])
ap.add_argument("--partition", default="today")
args = ap.parse_args()

# "today" resolves at run time; anything else is taken as an explicit ISO date
partition_dt = dt.date.today().isoformat() if args.partition == "today" else args.partition
```

5.4 Post-load verification
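Post-load verification can be as small as reading the file back and comparing row counts; a sketch assuming CSV output:

```python
import pandas as pd


def verify_load(df, out_path):
    """Read the written file back and assert it matches what we meant to write."""
    reread = pd.read_csv(out_path)
    assert len(reread) == len(df), f"row count mismatch: wrote {len(df)}, read {len(reread)}"
    assert not reread.empty, "loaded file is empty"
    print(f"verified {len(reread)} rows in {out_path}")
```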
5.5 Documentation and housekeeping
Checkpoint commit
Nuances and pro tips across Steps 1–5
Acceptance summary (per step)
Step 6 — GitHub, the right way (simple → powerful)

6.1 Set up once
6.2 Everyday loop (keep it boring = keep it safe)

```mermaid
flowchart LR
    I[Issue or TODO] --> B[Make a small change]
    B --> C[git add/commit]
    C --> P[git pull --rebase]
    P --> U[git push]
    U --> R[Open PR]
    R --> CI[CI runs]
    CI --> RV[Review]
    RV --> M[Merge to main]
```
6.3 Simple mode (single branch: main)

Use this if you have very little time.
6.4 Power mode (tiny PRs + one safety rail)

Use this when you want a touch more rigor while staying simple.
6.5 Write great commits (the two-line rule)
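For example (the wording is illustrative, not prescriptive):

```
transform: derive hour_of_day from ts

Needed for the hourly join with the API data; see Step 3.3.
```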
This pays off when you debug at 2 a.m.

6.6 Pull Requests that reviewers love
6.7 CI in 10 lines (prove your code works)

Add .github/workflows/ci.yml:

```yaml
name: CI
on: [push, pull_request]
jobs:
  lint-smoke:
    runs-on: ubuntu-latest
    steps:
      - uses: actions/checkout@v4
      - uses: actions/setup-python@v5
        with: { python-version: "3.10" }
      - run: pip install -r requirements.txt
      - run: pip install black flake8
      - run: black --check .
      - run: flake8 .
```
6.8 Use Issues to plan, not to decorate

Create 3–5 small issues and tag them. Simple Kanban (optional): Projects → Table with columns To do / In progress / Done.

6.9 One conflict on purpose (learn it once)

Now you know the drill. No fear next time.

6.10 Lightweight “release” (freeze a milestone)

Tag the version after your report is ready:

```
git tag -a v0.1.0 -m "First ETL milestone"
git push origin v0.1.0
```

This gives you a clean restore point.

6.11 Proof you used GitHub well (acceptance)
6.12 Tiny extras that feel pro
6.13 Quick checklist (run at the end)
Ask yourself:
Step 7 — Use the Gemini gem to design the Airflow plan

7.1 Set the scene
7.2 Feed the gem a clean requirement
7.3 Enforce a 3-step response from the gem
7.4 Guardrails to reduce hallucinations
7.5 Save the outputs
Acceptance
Step 8 — Validate and harden the gem output

8.1 JSON schema check (optional but recommended)
Run validator (example):

```
pip install jsonschema pyyaml
python validators/validate_plan.py orchestration/plan.json
```

Pass if the plan loads and matches the schema (dag, tasks, and dependencies all present).
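A minimal sketch of what validators/validate_plan.py might contain; the schema below is illustrative only, so extend it to match whatever the gem actually returns:

```python
import json
import sys

from jsonschema import validate

# Illustrative schema - adjust required fields to your plan format
PLAN_SCHEMA = {
    "type": "object",
    "required": ["dag", "tasks", "dependencies"],
    "properties": {
        "dag": {"type": "object", "required": ["id", "schedule"]},
        "tasks": {
            "type": "array",
            "minItems": 1,
            "items": {"type": "object", "required": ["id", "operator", "description"]},
        },
        "dependencies": {"type": "array"},
    },
}

if __name__ == "__main__":
    plan = json.load(open(sys.argv[1], encoding="utf-8"))
    validate(instance=plan, schema=PLAN_SCHEMA)
    print("plan matches the schema")
```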
8.2 Human review
Commit
Step 9 — Generate a TaskFlow scaffold from YAML (design-time)

9.1 Write a tiny generator
Snippet

```python
import yaml
from textwrap import indent

spec = yaml.safe_load(open("orchestration/plan.yaml"))

print("from airflow.decorators import dag, task")
print("from datetime import datetime")
print("@dag(dag_id='{}', start_date=datetime(2025,1,1), schedule='{}', catchup=False, tags={})"
      .format(spec["dag"]["id"], spec["dag"]["schedule"], spec["dag"].get("tags", ["evergent"])))
print("def generated_pipeline():")
for t in spec["tasks"]:
    doc = f'{t["operator"]} — {t["description"]}'
    fn = t["id"]
    print(indent(f'@task()\ndef {fn}():\n    """{doc}"""\n    ...\n', "    "))
print(indent("# wire dependencies per spec['dependencies']", "    "))
print("generated_pipeline()")
```

9.2 Sanity run

```
python dags/generator.py > dags/generated_pipeline.py
```
Commit
Step 10 — Dry-run the plan locally

10.1 Static checks
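A couple of cheap static checks you can run before any Airflow is involved; the field names assume the YAML shape requested in Step 7:

```python
import yaml

plan = yaml.safe_load(open("orchestration/plan.yaml"))

# Task IDs must be unique and the DAG must carry a schedule
ids = [t["id"] for t in plan["tasks"]]
assert len(ids) == len(set(ids)), "task IDs must be unique"
assert plan["dag"].get("schedule"), "dag.schedule is required"
assert plan.get("dependencies") is not None, "dependencies section is missing"
print(f"{len(ids)} tasks, schedule = {plan['dag']['schedule']}")
```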
10.2 YAML → table echo (consistency probe)

Snippet

```python
import yaml
import pandas as pd

y = yaml.safe_load(open("orchestration/plan.yaml"))
tbl = pd.DataFrame([{
    "Task ID": t["id"],
    "Operator": t["operator"],
    "Retry": t.get("retry"),
} for t in y["tasks"]])
print(tbl)
```

Acceptance
Step 11 — Version and review in GitHub

11.1 Open a PR titled:
11.2 PR body template
11.3 CI runs
Merge once green.

Step 12 — Write the project report (2–4 pages)

12.1 Structure
12.2 Export
Commit
Step 13 — Final quality gates

13.1 Functional
13.2 Orchestration design
13.3 Repo hygiene
Step 14 — Stretch (optional, bite-sized)
Step 15 — Hand-off to Airflow (when ready)

15.1 Map design → implementation
15.2 Composer specifics (GCP)
15.3 Test plan
One-glance acceptance for 7–15
Keep it simple. Keep it observable. Let the gem do the planning, and let GitHub keep you honest.
Project: Enhancing Data Workflow with Python and Gemini
Objective
Scope choices (pick one pair)
Tip: prefer APIs that need no key (Open-Meteo), or use a free key if everyone can obtain one quickly.
Deliverables
- GitHub repo with code, README, and CI config
- Processed outputs (CSV/Parquet) in data/processed/
- Gemini outputs:
- Report (PDF or Markdown) covering objectives, steps, code snippets, results, and pipeline improvements
Suggested repo structure
Step-by-step (semi-guided)
Step 1 — Pick data + API (15 min)
- data/raw/

Acceptance
- data/raw/
- docs/report.md
Step 2 — Ingest (30–40 min)
- etl/ingest.py: fetch_api(endpoint, params) with timeout + 3 retries + backoff
- Write API rows to data/raw/ as JSON Lines
- Read the dataset with pandas.read_csv
- Keep functions small and pure
Code hint
Acceptance
- ingest_dataset() returns a DataFrame
- ingest_api() writes a JSONL and returns a file path

Step 3 — Transform (40–50 min)
- etl/transform.py: derived fields (temp_bin, aqi_flag, hour_of_day)

Acceptance
Code hint
Step 4 — Validate (15–20 min)
- etl/validate.py: quarantine failing rows to data/processed/quarantine_*.csv
Acceptance
Code hint
Step 5 — Load (10–15 min)
- etl/load.py: write to *.tmp, then rename; support --fmt csv|parquet
- pipeline.py: python pipeline.py --fmt csv

Acceptance
- outputs land in data/processed/
Step 6 — GitHub usage (ongoing)
- Use main for simplicity in class. Commit small and often.
- .github/workflows/ci.yml
Acceptance
Step 7 — Use the Gemini gem for an Airflow plan (45–60 min)
Example:
“Ingest {dataset} daily at 02:00 IST, enrich with {API} hourly data. Validate required fields. Soft-fail if zero rows after clean. Load to a partitioned table. Send Slack alert on failure. Keep 7-day backfills. GCP Composer, BigQuery, GCS.”
Ask the gem to return (must-haves)
Task ID | Description | Operator | Executor | Sensor/Trigger | XComs (keys) | Retry Policy | Fallback | Error Handling | SLA/Alerts | Idempotency | Data Contract Check | Cost Notes
Save outputs
- orchestration/plan.yaml (keep both in one file or split YAML + MD)
- orchestration/plan.json

Acceptance
- plan.yaml loads with yaml.safe_load and includes dag + tasks + dependencies
Step 8 — Generate a DAG scaffold from YAML (design-time only, 20–30 min)
- dags/generator.py reads orchestration/plan.yaml and prints a minimal TaskFlow skeleton with task IDs and docstrings

Code hint
Acceptance
Step 9 — Report (Markdown or PDF, 2–4 pages)
Quality checks
- black, flake8, pylint
- Run pipeline.py twice → same outputs
- yaml.safe_load works; task IDs match the table

Rubric for auto-evaluation (100 points)
Completion (30)
Code quality (20)
Documentation (20)
GitHub usage (10)
Gemini + Airflow planning (20)
Pass mark: 70.
Stretch ideas
- Partitioned output: data/processed/dt=YYYY-MM-DD/
- CLI filters: python pipeline.py --city hyderabad --start 2025-08-01
- slugify() for keys or joins
- pandas-gbq if credentials exist

Verification steps (fast)
- python pipeline.py --fmt csv → check three files in data/processed/
- orchestration/plan.yaml → load with a 5-line Python snippet
- dags/generator.py → confirm one stub per task ID
- docs/report.md for the questions → table → YAML flow

Keep it crisp. Keep XComs tiny. Treat the Gemini output like a draft RFD and refine it with your judgment.