Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
81 changes: 42 additions & 39 deletions typescript/amp/src/cli/commands/query.ts
Original file line number Diff line number Diff line change
Expand Up @@ -3,12 +3,10 @@ import { createGrpcTransport, type GrpcTransportOptions } from "@connectrpc/conn
import * as Args from "@effect/cli/Args"
import * as Command from "@effect/cli/Command"
import * as Options from "@effect/cli/Options"
import { Table } from "apache-arrow"
import * as Chunk from "effect/Chunk"
import * as Console from "effect/Console"
import * as Effect from "effect/Effect"
import * as Layer from "effect/Layer"
import * as Match from "effect/Match"
import * as Option from "effect/Option"
import * as Schema from "effect/Schema"
import * as Stream from "effect/Stream"
Expand Down Expand Up @@ -41,47 +39,52 @@ export const query = Command.make("query", {
Command.withHandler(
Effect.fn(function*({ args }) {
const flight = yield* ArrowFlight.ArrowFlight
const table = yield* flight.query(args.query).pipe(
Stream.runCollect,
Effect.map((_) => Chunk.toArray(_)),
Effect.map((array) =>
Option.match(args.limit, {
onSome: (limit) => new Table(array.slice(0, limit)),
onNone: () => new Table(array),
})
),
)

const schema = Arrow.generateSchema(table.schema)
const effect = Match.value(args.format).pipe(
Match.when("table", () =>
Effect.succeed([...table]).pipe(
Effect.flatMap(Schema.encodeUnknown(Schema.Array(schema))),
Effect.flatMap(Console.table),
)),
Match.when("json", () =>
Effect.succeed([...table]).pipe(
Effect.flatMap(Schema.encodeUnknown(Schema.Array(schema))),
Effect.map((_) => JSON.stringify(_, null, 2)),
Effect.flatMap(Console.log),
)),
Match.when("jsonl", () =>
Stream.fromIterable([...table]).pipe(
Stream.mapEffect(Schema.encodeUnknown(schema)),
Stream.map((_) => JSON.stringify(_)),
Stream.runForEach(Console.log),
)),
Match.when("pretty", () =>
Stream.fromIterable([...table]).pipe(
// Decode every row from every record batch as it arrives.
// This works for both streaming (infinite) and non-streaming (finite) queries.
const rows = flight.query(args.query).pipe(
Stream.flatMap((batch) => {
const schema = Arrow.generateSchema(batch.schema)
return Stream.fromIterable([...batch]).pipe(
Stream.mapEffect(Schema.encodeUnknown(schema)),
Stream.runForEach(Console.log),
)),
Match.exhaustive,
)
}),
)

yield* effect.pipe(Effect.catchTags({
ParseError: (cause) => Effect.die(cause),
}))
const limited = Option.match(args.limit, {
onSome: (limit) => Stream.take(rows, limit),
onNone: () => rows,
})

if (args.format === "jsonl") {
yield* limited.pipe(
Stream.map((_) => JSON.stringify(_)),
Stream.runForEach(Console.log),
Effect.catchTags({ ParseError: (cause) => Effect.die(cause) }),
)
} else if (args.format === "pretty") {
yield* limited.pipe(
Stream.runForEach(Console.log),
Effect.catchTags({ ParseError: (cause) => Effect.die(cause) }),
)
} else {
// table / json: collect rows then format. Stream.take above ensures this
// terminates even for streaming queries when --limit is provided.
const collected = yield* limited.pipe(
Stream.runCollect,
Effect.map(Chunk.toArray),
Effect.catchTags({ ParseError: (cause) => Effect.die(cause) }),
)
if (args.format === "table") {
yield* Console.table(collected)
} else {
yield* Console.log(JSON.stringify(collected, null, 2))
}
}

// Force exit to close any open gRPC connections (streaming queries never
// send EOS so the event loop would otherwise hang indefinitely).
process.exit(0)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is slop.

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If I just remove it, streaming queries with a --limit option hang and the CLI command doesn't terminate.

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If you have suggestions here I'm happy to try them in a follow up

}),
),
Command.provide(({ args }) =>
Expand Down
Loading