Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
42 changes: 9 additions & 33 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -216,8 +216,6 @@ Flags:
- `--all` — with `--wait`, print the full results table instead of a summary
- `-o, --output <file>` — with `--wait`, write the results to FILE
(`.csv` or `.json`; format inferred from extension)
- `--stream` — emit one JSON event per line as the batch advances; implies
`--wait` and `--json` (see [NDJSON streaming](#ndjson-streaming))
- `--url <url>` — URL that will receive the batch results via HTTP POST
- `--retries=true|false` — retry verifications when mail servers return
certain responses, increasing accuracy (default: `true`)
Expand All @@ -240,8 +238,6 @@ Flags:
- `--all` — print the full results table inline instead of a summary
- `-o, --output <file>` — write the results to FILE (`.csv` or `.json`;
format inferred from extension)
- `--stream` — emit one JSON event per line as the batch advances; implies
`--wait` and `--json` (see [NDJSON streaming](#ndjson-streaming))

### Account

Expand All @@ -267,8 +263,7 @@ emailable account status --json

Payloads pass through from the [Emailable API](https://emailable.com/docs/api/?code_language=cli)
unchanged — the CLI doesn't re-shape or add fields. See the API docs for
the field reference. Error payloads and NDJSON stream events (below) are
CLI-specific.
the field reference. Error payloads are CLI-specific.

### Filtering with `--jq`

Expand All @@ -283,36 +278,17 @@ emailable batch get 5cfc... --jq '.emails[] | select(.state == "deliverable") |
```

A string result is printed raw (unquoted, one per line), like `jq -r`, so it
drops straight into a script. Objects and arrays are printed as JSON. Combined
with `--stream`, the filter runs against each NDJSON event as it arrives (see
below).
drops straight into a script. Objects and arrays are printed as JSON.

### NDJSON streaming

`batch verify --stream` and `batch get --stream` emit one JSON object per
line on stdout while polling, instead of one large object at the end.
Useful for AI agents and long-running scripts that want to react to progress
without waiting for completion. `--stream` automatically turns on `--wait`
and `--json`, so neither needs to be passed explicitly.

```bash
emailable batch verify emails.csv --stream
```

```
{"event":"submitted","id":"5cfc..."}
{"event":"progress","id":"5cfc...","processed":100,"total":1000}
{"event":"progress","id":"5cfc...","processed":500,"total":1000}
{"event":"complete","id":"5cfc...","status":"complete","reason_counts":{...},"emails":[...]}
```

Add `--jq` to filter each event as it streams. The filter sees the event
envelope (`event`, `id`, …), so guard on the event type; events the filter
doesn't match are skipped:
To stream batch results as [NDJSON](https://jsonlines.org/) — one result row
per line, ready to pipe into `while read`, `wc -l`, or another tool — filter
the completed batch's `emails` array with `.emails[]`. Pair it with `--wait`
so the payload is complete before filtering (a still-verifying batch has no
`emails` field, which would make `.emails[]` error):

```bash
emailable batch verify emails.csv --stream \
--jq 'select(.event == "complete") | .emails[] | .email'
emailable batch get 5cfc... --wait --jq '.emails[]' # one row per line
emailable batch get 5cfc... --wait --jq '.emails[] | select(.state == "deliverable") | .email'
```

### Errors
Expand Down
122 changes: 9 additions & 113 deletions cmd/batch.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,8 +2,6 @@ package cmd

import (
"context"
"encoding/json"
"errors"
"fmt"
"io"
"os"
Expand Down Expand Up @@ -52,10 +50,8 @@ func newBatchCmd() *cobra.Command {
partial, _ := cmd.Flags().GetBool("partial")
outPath, _ := cmd.Flags().GetString("output")
showAll, _ := cmd.Flags().GetBool("all")
stream, _ := cmd.Flags().GetBool("stream")
wait, jsonEff := applyStreamImplications(stream, wait, jsonOutput)

cctx, err := newCmdCtxFor(cmd, jsonEff)
cctx, err := newCmdCtxFor(cmd, jsonOutput)
if err != nil {
return err
}
Expand All @@ -68,14 +64,10 @@ func newBatchCmd() *cobra.Command {
if partial {
return fmt.Errorf("--wait and --partial can't be combined: --wait already polls until completion")
}
sw := newStreamerIfEnabled(cmd, stream)
s, err := waitForCompletion(cmd.Context(), client, id, jsonEff, sw, cmd.ErrOrStderr())
s, err := waitForCompletion(cmd.Context(), client, id, cctx.JSONMode || cctx.Quiet, cmd.ErrOrStderr())
if err != nil {
return err
}
if sw != nil {
return sw.emitComplete(id, s)
}
return renderBatchOutcome(cmd, cctx, s, id, outPath, showAll)
}

Expand All @@ -90,7 +82,6 @@ func newBatchCmd() *cobra.Command {
get.Flags().Bool("partial", false, "Include partial results while the batch is still verifying (batches ≤ 1,000 emails)")
get.Flags().StringP("output", "o", "", "Write results to FILE (.csv or .json; format inferred from extension)")
get.Flags().Bool("all", false, "Print the full results table inline instead of a summary")
get.Flags().Bool("stream", false, "Emit one JSON event per line while polling (implies --wait and --json)")

verify := &cobra.Command{
Use: "verify EMAIL_OR_FILE [EMAIL_OR_FILE...]",
Expand All @@ -104,19 +95,14 @@ func newBatchCmd() *cobra.Command {
emailable batch verify emails.csv --wait

# Verify two literal emails
emailable batch verify alice@example.com bob@example.com

# Stream NDJSON progress events to stdout
emailable batch verify emails.csv --stream`,
emailable batch verify alice@example.com bob@example.com`,
RunE: func(cmd *cobra.Command, args []string) error {
field, _ := cmd.Flags().GetString("field")
wait, _ := cmd.Flags().GetBool("wait")
outPath, _ := cmd.Flags().GetString("output")
showAll, _ := cmd.Flags().GetBool("all")
stream, _ := cmd.Flags().GetBool("stream")
wait, jsonEff := applyStreamImplications(stream, wait, jsonOutput)

cctx, err := newCmdCtxFor(cmd, jsonEff)
cctx, err := newCmdCtxFor(cmd, jsonOutput)
if err != nil {
return err
}
Expand All @@ -135,7 +121,7 @@ func newBatchCmd() *cobra.Command {
return err
}

f := newOutput(cmd.OutOrStdout(), jsonEff)
f := newOutput(cmd.OutOrStdout(), cctx.JSONMode)

submit, err := client.SubmitBatch(cmd.Context(), emails, submitOpts)
if err != nil {
Expand All @@ -144,22 +130,13 @@ func newBatchCmd() *cobra.Command {

if wait {
// Print before polling so ctrl-c mid-wait still leaves the id visible.
if !jsonEff && !cctx.Quiet {
if !cctx.JSONMode && !cctx.Quiet {
printBatchID(cmd.ErrOrStderr(), submit.ID)
}
sw := newStreamerIfEnabled(cmd, stream)
if sw != nil {
if err := sw.emitSubmitted(submit.ID); err != nil {
return err
}
}
final, err := waitForCompletion(cmd.Context(), client, submit.ID, jsonEff || cctx.Quiet, sw, cmd.ErrOrStderr())
final, err := waitForCompletion(cmd.Context(), client, submit.ID, cctx.JSONMode || cctx.Quiet, cmd.ErrOrStderr())
if err != nil {
return err
}
if sw != nil {
return sw.emitComplete(submit.ID, final)
}
return renderBatchOutcome(cmd, cctx, final, submit.ID, outPath, showAll)
}

Expand All @@ -170,7 +147,6 @@ func newBatchCmd() *cobra.Command {
verify.Flags().Bool("wait", false, "Poll until the batch completes")
verify.Flags().StringP("output", "o", "", "Write results to FILE (.csv or .json; format inferred from extension)")
verify.Flags().Bool("all", false, "Print the full results table inline instead of a summary")
verify.Flags().Bool("stream", false, "Emit one JSON event per line while polling (implies --wait and --json)")
verify.Flags().String("url", "", "URL that will receive the batch results via HTTP POST")
verify.Flags().Bool("retries", true, "Retry verifications when mail servers return certain responses, increasing accuracy")
verify.Flags().StringSlice("response-fields", nil, "Fields to include in the response (default: all)")
Expand Down Expand Up @@ -218,82 +194,6 @@ func printBatchID(w io.Writer, id string) {
fmt.Fprintf(w, "%s %s\n", label, id)
}

type batchStreamer struct {
f *output.JSON
}

func newStreamerIfEnabled(cmd *cobra.Command, stream bool) *batchStreamer {
if !stream {
return nil
}
return &batchStreamer{f: &output.JSON{W: cmd.OutOrStdout(), Compact: true, Query: jqQuery}}
}

func (s *batchStreamer) emit(payload map[string]any) error {
err := s.f.Print(payload)
// A --jq filter that errors on an event skips it, never aborting the stream.
var fe *output.FilterError
if errors.As(err, &fe) {
return nil
}
return err
}

func (s *batchStreamer) emitSubmitted(id string) error {
return s.emit(map[string]any{"event": "submitted", "id": id})
}

func (s *batchStreamer) emitProgress(id string, processed, total int) error {
return s.emit(map[string]any{
"event": "progress",
"id": id,
"processed": processed,
"total": total,
})
}

func (s *batchStreamer) emitComplete(id string, status *api.BatchStatus) error {
payload := map[string]any{
"event": "complete",
"id": id,
}

if raw := status.RawJSON(); len(raw) > 0 {
var fields map[string]json.RawMessage
if err := json.Unmarshal(raw, &fields); err == nil {
for k, v := range fields {
// Never let the API body shadow CLI-owned envelope keys.
if k == "event" || k == "id" {
continue
}
payload[k] = v
}
return s.emit(payload)
}
}

if status.Status != "" {
payload["status"] = status.Status
}
if len(status.Reason) > 0 {
payload["reason_counts"] = status.Reason
}
if len(status.Emails) > 0 {
payload["emails"] = status.Emails
}
if status.DownloadFile != "" {
payload["download_file"] = status.DownloadFile
}
return s.emit(payload)
}

func applyStreamImplications(stream, wait, jsonIn bool) (waitOut, jsonOut bool) {
if !stream {
return wait, jsonIn
}
return true, true
}

// Fast-then-slow polling: short interval for the first fastPollWindow, then back off.
const (
fastPollInterval = 1 * time.Second
Expand All @@ -302,11 +202,11 @@ const (
)

// waitForCompletion polls until completion. Progress goes to stderr so piped stdout stays clean.
func waitForCompletion(ctx context.Context, client *api.Client, id string, jsonMode bool, sw *batchStreamer, progressOut io.Writer) (*api.BatchStatus, error) {
func waitForCompletion(ctx context.Context, client *api.Client, id string, jsonMode bool, progressOut io.Writer) (*api.BatchStatus, error) {
if progressOut == nil {
progressOut = os.Stderr
}
uiEnabled := !jsonMode && sw == nil
uiEnabled := !jsonMode

var (
bar *ui.Bar
Expand Down Expand Up @@ -339,10 +239,6 @@ func waitForCompletion(ctx context.Context, client *api.Client, id string, jsonM
bar.Set(s.Processed, s.Total)
}

if sw != nil && s.Total > 0 {
_ = sw.emitProgress(id, s.Processed, s.Total)
}

if s.IsComplete() {
queueSpinner.Stop()

Expand Down
Loading
Loading