This repository contains ready-to-run example pipelines for Source Watcher.
Each file is a JSON pipeline definition validated against the pipeline schema. A pipeline is a steps array of extractors, transformers, and loaders.
- Source Watcher API running locally (default: http://localhost:8181)
- A valid JWT token (obtain via POST /api/v1/credentials)
Place .json pipeline files inside .source-watcher/transformations/ in the API container's working directory, then run them from the board UI or via curl.
TOKEN="your_jwt_token_here"
curl -X POST http://localhost:8181/api/v1/transformation-run \
-H "Content-Type: application/json" \
-H "x-access-token: $TOKEN" \
-d '{"name": "pipeline-name-without-extension"}'

- Open the board at http://localhost:8282
- Select the pipeline from the dropdown
- Click Load, draw connections between steps, then click Run Saved
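For scripted runs, the curl call above can be expressed with Python's standard library. This is a minimal sketch: the helper name `build_run_request` is hypothetical, and the URL, header names, and body are taken from the curl example. It only builds the request; sending it requires a running API and a real token.

```python
import json
import urllib.request

API_URL = "http://localhost:8181/api/v1/transformation-run"


def build_run_request(pipeline_name: str, token: str) -> urllib.request.Request:
    """Build (but do not send) the POST request that starts a pipeline run."""
    body = json.dumps({"name": pipeline_name}).encode("utf-8")
    return urllib.request.Request(
        API_URL,
        data=body,
        headers={
            "Content-Type": "application/json",
            "x-access-token": token,
        },
        method="POST",
    )


request = build_run_request("pipeline-name-without-extension", "your_jwt_token_here")
# To trigger the run, pass the request to urllib.request.urlopen(request).
```
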
Steps: CSV Extractor → Convert Case → Database Loader
Fetches the Oscar Female Winners CSV from a public URL, converts the Year, Name, and Movie column names to lowercase, and loads the result into a local SQLite database.
| Detail | Value |
|---|---|
| Source | https://people.sc.fsu.edu/~jburkardt/data/csv/oscar_age_female.csv |
| Output table | people |
| Output file | .source-watcher/csv-lower.db |
sqlite3 .source-watcher/csv-lower.db "SELECT * FROM people LIMIT 5;"

Steps: CSV Extractor → Convert Case → Rename Columns → Database Loader
Same source CSV as above. Converts column names to lowercase, then renames movie to preferred_movie before loading into SQLite.
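The effect of the two transformer steps on a row can be sketched in plain Python. This is an illustration of the idea, not the transformers' actual implementation; the function names and the sample row values are placeholders.

```python
def lowercase_columns(rows):
    """Return copies of the rows with every column name lowercased."""
    return [{key.lower(): value for key, value in row.items()} for row in rows]


def rename_columns(rows, mapping):
    """Return copies of the rows with column names replaced per `mapping`."""
    return [
        {mapping.get(key, key): value for key, value in row.items()}
        for row in rows
    ]


# Placeholder row; real values come from the source CSV.
rows = [{"Year": "0000", "Name": "Example Name", "Movie": "Example Movie"}]

lowered = lowercase_columns(rows)
renamed = rename_columns(lowered, {"movie": "preferred_movie"})
print(list(renamed[0].keys()))
```

Lowercasing runs first, so the rename mapping has to refer to the already-lowercased name `movie`.
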
| Detail | Value |
|---|---|
| Source | https://people.sc.fsu.edu/~jburkardt/data/csv/oscar_age_female.csv |
| Output table | people |
| Output file | .source-watcher/csv-lower-rename.db |
sqlite3 .source-watcher/csv-lower-rename.db "SELECT * FROM people LIMIT 5;"

Steps: CSV Extractor → Convert Case (title) → Rename Columns → Database Loader
Fetches the Oscar CSV, applies Title Case to the Movie column name, then renames Movie to Preferred_Movie.
| Detail | Value |
|---|---|
| Source | https://people.sc.fsu.edu/~jburkardt/data/csv/oscar_age_female.csv |
| Output table | people |
| Output file | .source-watcher/csv-title-rename-1.db |
csv-title-rename-to-sqlite-2 and csv-title-rename-to-sqlite-3 are variants that write to csv-title-rename-2.db and csv-title-rename-3.db respectively, testing different column mapping styles.
Steps: JSON Extractor (URL) → Database Loader
Fetches the CVE record for CVE-2026-3494 from the MITRE CVE API and extracts top-level metadata fields into a SQLite table using JSONPath mappings.
| Detail | Value |
|---|---|
| Source | https://cveawg.mitre.org/api/cve/CVE-2026-3494 |
| Columns | dataType, dataVersion, cveId, state, assignerShortName, dateReserved, datePublished, dateUpdated, title |
| Output table | cve_metadata |
| Output file | .source-watcher/cve-metadata.db |
sqlite3 .source-watcher/cve-metadata.db "SELECT cveId, title, state FROM cve_metadata;"

Steps: JSON Extractor (URL) → Database Loader
Same CVE source as above, but extracts deeper nested fields - including arrays stored as JSON strings - giving a richer view of the record.
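Flattening a nested record into table columns, with arrays kept as JSON strings, might look like the sketch below. It uses simple dotted paths rather than real JSONPath. The paths, the sample record, and the helper names are assumptions for illustration, not the pipeline's actual mappings.

```python
import json


def get_path(document, path):
    """Follow a dotted path such as 'cveMetadata.cveId'; return None if missing."""
    current = document
    for key in path.split("."):
        if not isinstance(current, dict) or key not in current:
            return None
        current = current[key]
    return current


def to_row(document, mappings):
    """Build one flat row; lists and objects are serialized to JSON strings."""
    row = {}
    for column, path in mappings.items():
        value = get_path(document, path)
        if isinstance(value, (list, dict)):
            value = json.dumps(value)
        row[column] = value
    return row


# Made-up record shaped like a CVE document (assumed layout, placeholder values).
record = {
    "cveMetadata": {"cveId": "CVE-0000-0000", "state": "PUBLISHED"},
    "containers": {"cna": {"references": [{"url": "https://example.com"}]}},
}
mappings = {
    "cveId": "cveMetadata.cveId",
    "state": "cveMetadata.state",
    "referencesJson": "containers.cna.references",
}
print(to_row(record, mappings))
```

Storing arrays as JSON text keeps the output a single flat table; the JSON can be parsed again when the rows are read back.
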
| Detail | Value |
|---|---|
| Source | https://cveawg.mitre.org/api/cve/CVE-2026-3494 |
| Columns | dataType, dataVersion, cveId, state, assignerShortName, datePublished, dateUpdated, title, descriptionText, descriptionsJson, affectedJson, metricsJson, referencesJson, problemTypesJson |
| Output table | cve_deep |
| Output file | .source-watcher/cve-json-deep.db |
sqlite3 .source-watcher/cve-json-deep.db "SELECT cveId, title, descriptionText FROM cve_deep;"

Steps: Database Extractor (remote SQLite URL) → Database Loader
Downloads the Chinook sample database directly from a public URL, runs a SQL JOIN query across the Artist and Album tables, and loads the first 50 rows of the result into a local SQLite database.
Demonstrates the remote SQLite file download capability of the Database extractor.
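The extract-then-load flow can be approximated with Python's `sqlite3` module. In this sketch an in-memory database with two made-up rows stands in for the downloaded Chinook file (the download step is omitted). The join condition and the helper name `copy_query_result` are assumptions, since the pipeline's full query is not shown here.

```python
import sqlite3


def copy_query_result(source, target, query, table, columns):
    """Run `query` on `source` and insert the resulting rows into `table` on `target`."""
    rows = source.execute(query).fetchall()
    column_list = ", ".join(columns)
    placeholders = ", ".join("?" for _ in columns)
    target.execute(f"CREATE TABLE IF NOT EXISTS {table} ({column_list})")
    target.executemany(
        f"INSERT INTO {table} ({column_list}) VALUES ({placeholders})", rows
    )
    target.commit()
    return len(rows)


# Stand-in for the downloaded database: same table names, placeholder data.
source = sqlite3.connect(":memory:")
source.executescript("""
CREATE TABLE Artist (ArtistId INTEGER PRIMARY KEY, Name TEXT);
CREATE TABLE Album (AlbumId INTEGER PRIMARY KEY, Title TEXT, ArtistId INTEGER);
INSERT INTO Artist VALUES (1, 'Example Artist');
INSERT INTO Album VALUES (1, 'Example Album', 1), (2, 'Another Album', 1);
""")

# Assumed join condition; the ORDER BY makes LIMIT deterministic.
QUERY = """
SELECT Artist.ArtistId, Name AS ArtistName, Title AS AlbumTitle
FROM Artist JOIN Album ON Album.ArtistId = Artist.ArtistId
ORDER BY Artist.ArtistId, Album.AlbumId
LIMIT 50
"""

target = sqlite3.connect(":memory:")
copied = copy_query_result(
    source, target, QUERY, "artist_albums", ["ArtistId", "ArtistName", "AlbumTitle"]
)
print(copied)
```
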
| Detail | Value |
|---|---|
| Source | Chinook SQLite via https://raw.githubusercontent.com/... |
| Query | SELECT ArtistId, Name AS ArtistName, Title AS AlbumTitle FROM Artist JOIN Album … LIMIT 50 |
| Output table | artist_albums |
| Output file | .source-watcher/chinook-artists.db |
sqlite3 .source-watcher/chinook-artists.db "SELECT * FROM artist_albums LIMIT 10;"

Note: Requires allow_url_fopen = On in the PHP container (enabled by default).
Steps: Txt Extractor → Convert Case → Database Loader
Reads a plain text file line by line (each line becomes one row), applies Title Case to the line content, and loads into SQLite.
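As a rough illustration of the line-per-row idea, the sketch below splits text into rows and title-cases each one. The column name `line` and the function name are hypothetical. Python's `str.title()` is only an approximation of the transformer's Title Case rule: it also capitalizes after apostrophes, for example.

```python
def text_to_rows(text, column="line"):
    """Turn each line of `text` into one row, with the content in Title Case."""
    return [{column: line.title()} for line in text.splitlines()]


sample = "hello world\nsecond line"
for row in text_to_rows(sample):
    print(row)
```
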
| Detail | Value |
|---|---|
| Source | /var/www/html/.source-watcher/data/sample.txt (local file inside the container) |
| Output table | lines |
| Output file | .source-watcher/txt-lines.db |
sqlite3 .source-watcher/txt-lines.db "SELECT * FROM lines;"

Steps: CSV Extractor → Find Missing From Sequence → Database Loader
Reads a CSV file containing a numeric id column with intentional gaps (1, 2, 3, 5, 6, 9, 10), finds the missing integers in the sequence (4, 7, 8), and writes them to a SQLite table.
Demonstrates the FindMissingFromSequenceExtractor, which chains from the previous extractor's result, sorts the numeric column, and outputs any integers absent between the min and max values.
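The gap-finding logic described above can be sketched in a few lines of Python. This illustrates the technique (sort, then list integers absent between min and max); it is not the extractor's actual code, and the function name is hypothetical.

```python
def find_missing(values):
    """Return the integers absent between the min and max of `values`."""
    present = sorted({int(value) for value in values})
    if not present:
        return []
    existing = set(present)
    return [n for n in range(present[0], present[-1] + 1) if n not in existing]


ids = [1, 2, 3, 5, 6, 9, 10]  # the sample sequence from the description
print(find_missing(ids))  # [4, 7, 8]
```
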
| Detail | Value |
|---|---|
| Source | .source-watcher/data/sample-sequence.csv (local) |
| Sequence column | id |
| Output table | missing_ids |
| Output file | .source-watcher/find-missing-ids.db |
sqlite3 .source-watcher/find-missing-ids.db "SELECT * FROM missing_ids;"
# Expected: rows with id = 4, 7, 8

Steps: CSV Extractor → Guess Gender → Database Loader
Reads a CSV with id, first_name, and last_name columns, uses a name dictionary to guess the gender from the first_name column, adds a gender column to each row, and loads the enriched data into SQLite.
| Detail | Value |
|---|---|
| Source | .source-watcher/data/sample-names.csv (local) |
| First name column | first_name |
| Output gender column | gender |
| Country dictionary | usa |
| Output table | people_with_gender |
| Output file | .source-watcher/guess-gender.db |
sqlite3 .source-watcher/guess-gender.db "SELECT first_name, last_name, gender FROM people_with_gender;"

The transformer only fills in the gender column if it is currently empty. Rows that already have a value are left unchanged.
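The fill-only-if-empty behavior can be illustrated with a short sketch. The tiny dictionary, the `male`/`female` labels, and the function name are all hypothetical stand-ins; the real transformer uses its own name dictionary.

```python
# Hypothetical stand-in for the transformer's name dictionary.
NAME_DICTIONARY = {"mary": "female", "john": "male"}


def fill_gender(rows, first_name_column="first_name", gender_column="gender"):
    """Return copies of the rows, guessing gender only where it is empty."""
    result = []
    for row in rows:
        enriched = dict(row)
        if not enriched.get(gender_column):
            first_name = str(enriched.get(first_name_column, "")).strip().lower()
            # Unknown names yield None in this sketch.
            enriched[gender_column] = NAME_DICTIONARY.get(first_name)
        result.append(enriched)
    return result


rows = [
    {"first_name": "Mary", "gender": ""},
    {"first_name": "John", "gender": "unknown"},  # existing value is kept
]
print(fill_gender(rows))
```
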
Each pipeline file is a JSON object with a $schema reference and a steps array:
{
"$schema": "https://raw.githubusercontent.com/TheCocoTeam/source-watcher-api/master/pipeline.schema.json",
"steps": [
{
"type": "extractor",
"name": "Csv",
"options": { "filePath": "...", "columns": ["A", "B"] },
"x": 80,
"y": 100
},
{
"type": "loader",
"name": "Database",
"options": { "driver": "pdo_sqlite", "tableName": "my_table", "path": "/path/to/output.db" },
"x": 300,
"y": 100
}
]
}

| Field | Description |
|---|---|
| $schema | Points to the pipeline JSON Schema for editor validation and autocomplete |
| type | extractor, execution-extractor, transformer, or loader |
| name | Core step name (e.g. Csv, Json, ConvertCase, Database) |
| options | Step-specific configuration |
| x, y | Canvas position (used by the board UI; ignored by the API) |
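Before uploading a pipeline file, a quick structural check based on the fields above can catch obvious mistakes. This sketch does not replace validation against the JSON Schema. Treating `name` as required and `options` as an object are assumptions here, and `check_pipeline` is a hypothetical helper.

```python
VALID_TYPES = {"extractor", "execution-extractor", "transformer", "loader"}


def check_pipeline(pipeline):
    """Return a list of human-readable problems; an empty list means none found."""
    steps = pipeline.get("steps")
    if not isinstance(steps, list):
        return ["'steps' must be an array"]
    problems = []
    for index, step in enumerate(steps):
        if step.get("type") not in VALID_TYPES:
            problems.append(f"step {index}: unknown type {step.get('type')!r}")
        if not step.get("name"):
            problems.append(f"step {index}: missing name")
        if not isinstance(step.get("options", {}), dict):
            problems.append(f"step {index}: options must be an object")
    return problems


pipeline = {
    "steps": [
        {"type": "extractor", "name": "Csv", "options": {"filePath": "..."}},
        {"type": "loader", "name": "Database", "options": {"driver": "pdo_sqlite"}},
    ]
}
print(check_pipeline(pipeline))  # []
```
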