Skip to content

Commit

Permalink
refactor!: move import progress from HTTP API to JS API (#125)
Browse files Browse the repository at this point in the history
This is the next step in [a larger refactor][0] where we replace the
HTTP API with a JS one.

[0]: #111

BREAKING CHANGE: `GET /imports/progress/:importId` replaced with
  `getImportProgress()`
  • Loading branch information
EvanHahn committed Feb 29, 2024
1 parent 0d4ebeb commit 399d38c
Show file tree
Hide file tree
Showing 18 changed files with 303 additions and 499 deletions.
3 changes: 3 additions & 0 deletions .eslintrc.js
Original file line number Diff line number Diff line change
Expand Up @@ -17,5 +17,8 @@ module.exports = {
'@typescript-eslint/no-var-requires': 'off',
'@typescript-eslint/no-empty-function': 'off',
'@typescript-eslint/ban-ts-comment': 'off',
'@typescript-eslint/no-unused-vars': ['error', {
varsIgnorePattern: '^_',
}],
},
}
47 changes: 0 additions & 47 deletions API.md
Original file line number Diff line number Diff line change
Expand Up @@ -162,50 +162,3 @@ As of now, only [MBTiles](https://github.com/mapbox/mbtiles-spec) files are supp
- `y: number`: The y coordinate of the tile.

Retrieve a tile for a given tileset. Note that this is usually used by a map client (based on a style definition) and not directly by the end user ([more info](https://docs.mapbox.com/mapbox-gl-js/style-spec/sources/)).

---

## Imports

### `GET /imports/progress/:importId`

- Params:
- `importId: string`: The ID for the desired import.

Subscribe to progress information for an import. This is a [Server-Sent Events (SSE)](https://developer.mozilla.org/en-US/docs/Web/API/Server-sent_events) endpoint, so it's expected to be used with an [EventSource](https://developer.mozilla.org/en-US/docs/Web/API/EventSource) by the client.

Messages that are received will have a `data` field with the following structure when deserialized:

- `type: string`: Type indicating the type of progress message. Can be one of the following values:
- `"progress"`: Import is still in progress
- `"complete"`: Import is complete
- `"error"`: Error occurred during import
- `importId: string`: ID for import
- `soFar: number`: Number of assets successfully imported so far
- `total: number`: Total number of assets to be imported

If a requested import is already completed or has errored, responds with `204 No Content`, which should prevent the event source from attempting to reconnect. Generally, the client should explicitly close the event source when:

1. Receiving a message and the deserialized `type` value in the event data is either `"complete"` or `"error"`.
2. Receiving an error

```js
const evtSource = new EventSource(
'http://localhost:3000/imports/progress/some-import-id'
)

evtSource.onmessage = (event) => {
const message = JSON.parse(event.data)

if (message.type === 'complete' || message.type === 'error') {
evtSource.close()
return
}

// Do something with message...
}

evtSource.onerror = (event) => {
evtSource.close()
}
```
9 changes: 9 additions & 0 deletions package-lock.json

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions package.json
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,7 @@
"fastify-plugin": "^3.0.1",
"got": "^11.8.5",
"is-url": "^1.2.4",
"iterpal": "^0.3.0",
"make-promises-safe": "^5.1.0",
"mem": "^8.1.1",
"piscina": "^3.2.0",
Expand Down
58 changes: 52 additions & 6 deletions src/api/imports.ts
Original file line number Diff line number Diff line change
@@ -1,13 +1,14 @@
import path from 'path'
import Database, { Database as DatabaseInstance } from 'better-sqlite3'
import { EventEmitter } from 'events'
import { MessageChannel, MessagePort } from 'worker_threads'
import { fromEvents } from 'iterpal'
import { MessageChannel } from 'worker_threads'

import { ImportRecord } from '../lib/imports'
import { PortMessage } from '../lib/mbtiles_import_worker'
import { TileJSON, validateTileJSON } from '../lib/tilejson'
import { mbTilesToTileJSON } from '../lib/mbtiles'
import { generateId, getTilesetId } from '../lib/utils'
import { generateId, getTilesetId, ExhaustivenessError } from '../lib/utils'
import { Api, Context, IdResource } from '.'
import {
MBTilesCannotReadError,
Expand All @@ -17,7 +18,7 @@ import {

export interface ImportsApi {
getImport(importId: string): ImportRecord | undefined
getImportPort(importId: string): MessagePort | undefined
getImportProgress(importId: string): AsyncIterableIterator<PortMessage>
importMBTiles(
filePath: string,
baseApiUrl: string
Expand Down Expand Up @@ -46,8 +47,32 @@ function createImportsApi({
)
.get(importId)
},
getImportPort(importId) {
return activeImports.get(importId)
async *getImportProgress(this: ImportsApi, importId) {
const importState = this.getImport(importId)?.state
switch (importState) {
case undefined:
case 'complete':
case 'error':
return
case 'active':
break
default:
throw new ExhaustivenessError(importState)
}

const port = activeImports.get(importId)
if (!port) {
throw new Error(
'Internal error: import is active but no port was found'
)
}

const messageEvents: AsyncIterable<any> = fromEvents(port, 'message')
for await (const { data } of messageEvents) {
assertIsPortMessage(data)
yield data
if (data.type !== 'progress') break
}
},
importMBTiles(filePath: string, baseApiUrl: string) {
const filePathWithExtension =
Expand Down Expand Up @@ -138,7 +163,8 @@ function createImportsApi({
workerDone = true
})

function handleFirstProgressMessage(message: PortMessage) {
function handleFirstProgressMessage(message: unknown) {
assertIsPortMessage(message)
if (message.type === 'progress') {
importStarted = true
port2.off('message', handleFirstProgressMessage)
Expand Down Expand Up @@ -193,3 +219,23 @@ function createImportsApi({
}

export default createImportsApi

function assertIsPortMessage(value: unknown): asserts value is PortMessage {
const isValid =
value !== null &&
!Array.isArray(value) &&
typeof value === 'object' &&
'type' in value &&
'importId' in value &&
'soFar' in value &&
'total' in value &&
(value.type === 'progress' ||
value.type === 'complete' ||
value.type === 'error') &&
typeof value.importId === 'string' &&
typeof value.soFar === 'number' &&
typeof value.total === 'number'
if (!isValid) {
throw new Error('Expected a port message')
}
}
82 changes: 0 additions & 82 deletions src/lib/mbtiles_import_worker.d.ts

This file was deleted.

100 changes: 74 additions & 26 deletions src/lib/mbtiles_import_worker.js → src/lib/mbtiles_import_worker.ts
Original file line number Diff line number Diff line change
@@ -1,40 +1,92 @@
// @ts-check
const path = require('node:path')
const Database = require('better-sqlite3')
import * as path from 'node:path'
import Database, { type Statement } from 'better-sqlite3'

const { IMPORT_ERRORS } = require('./imports')
const { extractMBTilesMetadata } = require('./mbtiles')
const { tileToQuadKey } = require('./tiles')
const { hash, encodeBase32 } = require('./utils')

/** @typedef {import('better-sqlite3').Database} Database */
/** @typedef {import('better-sqlite3').Statement} Statement */
import { IMPORT_ERRORS } from './imports'
import { extractMBTilesMetadata } from './mbtiles'
import { tileToQuadKey } from './tiles'
import { hash, encodeBase32 } from './utils'

const PROGRESS_THROTTLE = 200 // ms

module.exports = importMbTiles
interface ImportWorkerOptions {
dbPath: string
importId: string
mbTilesDbPath: string
styleId: string
tilesetId: string
port: MessagePort
}

/**
* @param {import('./mbtiles_import_worker').ImportWorkerOptions} options
*/
function importMbTiles({
interface Queries {
getMbTilesImportTotals(): { bytes: number; tiles: number }
getMbTilesTileRows(): IterableIterator<{
data: Buffer
z: number
y: number
x: number
}>
upsertOfflineArea: Statement<{
id: string
zoomLevel: string
boundingBox: string
name: string
styleId: string
}>
insertImport: Statement<{
id: string
totalResources: number
totalBytes: number
areaId: string
tilesetId?: string
}>
updateImport: Statement<{
id: string
importedResources: number
importedBytes: number
}>
setImportError: Statement<{
id: string
error: string
}>
completeImport: Statement<{
id: string
importedResources: number
importedBytes: number
}>
upsertTileData: Statement<{
data: Buffer
tileHash: string
tilesetId: string
}>
upsertTile: Statement<{
quadKey: string
tileHash: string
tilesetId: string
}>
}

export type PortMessage = {
type: 'progress' | 'complete' | 'error'
importId: string
soFar: number
total: number
}

export default function importMbTiles({
dbPath,
importId,
mbTilesDbPath,
tilesetId,
styleId,
port,
}) {
/** @type {Database} */
}: Readonly<ImportWorkerOptions>) {
const db = new Database(dbPath)
db.pragma('auto_vacuum = INCREMENTAL')
db.pragma('journal_mode = WAL')

/** @type {Database} */
const mbTilesDb = new Database(mbTilesDbPath, { readonly: true })

/** @type {import('./mbtiles_import_worker').Queries} */
const queries = {
const queries: Queries = {
getMbTilesImportTotals: () =>
mbTilesDb
.prepare(
Expand Down Expand Up @@ -212,13 +264,9 @@ function importMbTiles({

/**
* Creates an error indicating which fields from a tile row are unexpectedly null
*
* @param {{[key: string]: *}} row
* @returns {?Error}
*/
function validateTileRow(row) {
/** @type {string[]} */
const invalidFields = []
function validateTileRow(row: Readonly<Record<string, unknown>>): null | Error {
const invalidFields: string[] = []

Object.entries(row).forEach(([field, value]) => {
if (value == null) {
Expand Down
Loading

0 comments on commit 399d38c

Please sign in to comment.