Skip to content

Commit

Permalink
Merge pull request #1586 from cloudflare/glen/d1-api-adding-results-f…
Browse files Browse the repository at this point in the history
…ormat

Adding resultsFormat to d1-api.ts to fix .raw() bug
  • Loading branch information
geelen committed Feb 2, 2024
2 parents 510143b + 0033cd4 commit 3c85053
Show file tree
Hide file tree
Showing 6 changed files with 584 additions and 200 deletions.
154 changes: 113 additions & 41 deletions src/cloudflare/internal/d1-api.ts
Original file line number Diff line number Diff line change
Expand Up @@ -6,31 +6,52 @@ interface Fetcher {
fetch: typeof fetch
}

interface D1Result<T = unknown> {
results: T[]
type D1Response = {
success: true
meta: Record<string, unknown>
error?: never
}

interface D1UpstreamFailure {
type D1Result<T = unknown> = D1Response & {
results: T[]
}

type D1RawOptions = {
columnNames?: boolean
}

type D1UpstreamFailure = {
results?: never
error: string
success: false
meta: Record<string, unknown>
}

type D1UpstreamResponse<T = unknown> = D1Result<T> | D1UpstreamFailure
type D1RowsColumns<T = unknown> = D1Response & {
results: {
columns: string[]
rows: T[][]
}
}

type D1UpstreamSuccess<T = unknown> =
| D1Result<T>
| D1Response
| D1RowsColumns<T>

interface D1ExecResult {
type D1UpstreamResponse<T = unknown> = D1UpstreamSuccess<T> | D1UpstreamFailure

type D1ExecResult = {
count: number
duration: number
}

interface SQLError {
type SQLError = {
error: string
}

type ResultsFormat = 'ARRAY_OF_OBJECTS' | 'ROWS_AND_COLUMNS' | 'NONE'

class D1Database {
private readonly fetcher: Fetcher

Expand All @@ -42,6 +63,7 @@ class D1Database {
return new D1PreparedStatement(this, query)
}

// DEPRECATED, TO BE REMOVED WITH NEXT BREAKING CHANGE
public async dump(): Promise<ArrayBuffer> {
const response = await this.fetcher.fetch('http://d1/dump', {
method: 'POST',
Expand All @@ -67,18 +89,18 @@ class D1Database {
public async batch<T = unknown>(
statements: D1PreparedStatement[]
): Promise<D1Result<T>[]> {
const exec = await this._sendOrThrow(
const exec = (await this._sendOrThrow(
'/query',
statements.map((s: D1PreparedStatement) => s.statement),
statements.map((s: D1PreparedStatement) => s.params)
)
return exec as D1Result<T>[]
statements.map((s: D1PreparedStatement) => s.params),
'ROWS_AND_COLUMNS'
)) as D1UpstreamSuccess<T>[]
return exec.map(toArrayOfObjects)
}

public async exec(query: string): Promise<D1ExecResult> {
// should be /execute - see CFSQL-52
const lines = query.trim().split('\n')
const _exec = await this._send('/query', lines, [])
const _exec = await this._send('/execute', lines, [], 'NONE')
const exec = Array.isArray(_exec) ? _exec : [_exec]
const error = exec
.map((r) => {
Expand Down Expand Up @@ -108,29 +130,31 @@ class D1Database {

public async _sendOrThrow<T = unknown>(
endpoint: string,
query: unknown,
params: unknown[]
): Promise<D1Result<T>[] | D1Result<T>> {
const results = await this._send(endpoint, query, params)
query: string | string[],
params: unknown[],
resultsFormat: ResultsFormat
): Promise<D1UpstreamSuccess<T>[] | D1UpstreamSuccess<T>> {
const results = await this._send(endpoint, query, params, resultsFormat)
const firstResult = firstIfArray(results)
if (!firstResult.success) {
throw new Error(`D1_ERROR: ${firstResult.error}`, {
cause: new Error(firstResult.error),
})
} else {
return results as D1Result<T>[] | D1Result<T>
return results as D1UpstreamSuccess<T>[] | D1UpstreamSuccess<T>
}
}

public async _send<T = unknown>(
endpoint: string,
query: unknown,
params: unknown[]
query: string | string[],
params: unknown[],
resultsFormat: ResultsFormat
): Promise<D1UpstreamResponse<T>[] | D1UpstreamResponse<T>> {
/* this needs work - we currently only support ordered ?n params */
const body = JSON.stringify(
typeof query == 'object'
? (query as string[]).map((s: string, index: number) => {
Array.isArray(query)
? query.map((s: string, index: number) => {
return { sql: s, params: params[index] }
})
: {
Expand All @@ -139,7 +163,9 @@ class D1Database {
}
)

const response = await this.fetcher.fetch(new URL(endpoint, 'http://d1'), {
const url = new URL(endpoint, 'http://d1')
url.searchParams.set('resultsFormat', resultsFormat)
const response = await this.fetcher.fetch(url.href, {
method: 'POST',
headers: {
'content-type': 'application/json',
Expand Down Expand Up @@ -235,11 +261,12 @@ class D1PreparedStatement {
await this.database._sendOrThrow<Record<string, T>>(
'/query',
this.statement,
this.params
this.params,
'ROWS_AND_COLUMNS'
)
)

const results = info.results
const results = toArrayOfObjects(info).results
const hasResults = results.length > 0
if (!hasResults) return null

Expand All @@ -256,49 +283,94 @@ class D1PreparedStatement {
}
}

public async run<T = Record<string, unknown>>(): Promise<D1Result<T>> {
public async run<T = Record<string, unknown>>(): Promise<D1Response> {
return firstIfArray(
await this.database._sendOrThrow<T>(
'/execute',
this.statement,
this.params
this.params,
'NONE'
)
)
}

public async all<T = Record<string, unknown>>(): Promise<D1Result<T[]>> {
return firstIfArray(
await this.database._sendOrThrow<T[]>(
'/query',
this.statement,
this.params
return toArrayOfObjects(
firstIfArray(
await this.database._sendOrThrow<T[]>(
'/query',
this.statement,
this.params,
'ROWS_AND_COLUMNS'
)
)
)
}

public async raw<T = unknown[]>(): Promise<T[]> {
public async raw<T = unknown[]>(options?: D1RawOptions): Promise<T[]> {
const s = firstIfArray(
await this.database._sendOrThrow<Record<string, unknown>>(
'/query',
this.statement,
this.params
this.params,
'ROWS_AND_COLUMNS'
)
)
const raw: T[] = []
for (const row of s.results) {
const entry = Object.keys(row).map((k) => {
return row[k]
})
raw.push(entry as T)
// If no results returned, return empty array
if (!('results' in s)) return []

// If ARRAY_OF_OBJECTS returned, extract cells
if (Array.isArray(s.results)) {
const raw: T[] = []
for (const row of s.results) {
if (options?.columnNames && raw.length === 0) {
raw.push(Array.from(Object.keys(row)) as T)
}
const entry = Object.keys(row).map((k) => {
return row[k]
})
raw.push(entry as T)
}
return raw
} else {
// Otherwise, data is already in the correct format
return [
...(options?.columnNames ? [s.results.columns as T] : []),
...(s.results.rows as T[]),
]
}
return raw
}
}

function firstIfArray<T>(results: T | T[]): T {
return Array.isArray(results) ? results[0]! : results
}

// This shim may be used against an older version of D1 that doesn't support
// the ROWS_AND_COLUMNS/NONE interchange format, so be permissive here
function toArrayOfObjects<T>(response: D1UpstreamSuccess<T>): D1Result<T> {
// If 'results' is missing from upstream, add an empty array
if (!('results' in response))
return {
...response,
results: [],
}

const results = response.results
if (Array.isArray(results)) {
return { ...response, results }
} else {
const { rows, columns } = results
return {
...response,
results: rows.map(
(row) =>
Object.fromEntries(row.map((cell, i) => [columns[i], cell])) as T
),
}
}
}

function mapD1Result<T>(result: D1UpstreamResponse<T>): D1UpstreamResponse<T> {
// The rest of the app can guarantee that success is true/false, but from the API
// we only guarantee that error is present/absent.
Expand All @@ -311,7 +383,7 @@ function mapD1Result<T>(result: D1UpstreamResponse<T>): D1UpstreamResponse<T> {
: {
success: true,
meta: result.meta || {},
results: result.results || [],
...('results' in result ? { results: result.results } : {}),
}
}

Expand Down
1 change: 1 addition & 0 deletions src/cloudflare/internal/test/d1/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ load("//:build/wd_test.bzl", "wd_test")

wd_test(
src = "d1-api-test.wd-test",
args = ["--experimental"],
data = glob([
"*.js",
"*.capnp",
Expand Down
Loading

0 comments on commit 3c85053

Please sign in to comment.