Skip to content

Commit

Permalink
feat(bundles ans-104): emit unbundle complete events PE-3769
Browse files Browse the repository at this point in the history
Adds unbundle complete events containing - filter string used to match
data items, total data item count, matched data item count. These events
will be used to index bundles in the DB. The filter string is included
so that we know which bundles need reprocessing when it's changed.
  • Loading branch information
djwhitt committed Jul 18, 2023
1 parent 12056a4 commit ca3cfd8
Show file tree
Hide file tree
Showing 2 changed files with 36 additions and 9 deletions.
1 change: 1 addition & 0 deletions src/events.ts
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ export const ANS104_DATA_ITEM_DATA_INDEXED = 'ans104-data-item-data-indexed';
export const ANS104_DATA_ITEM_INDEXED = 'ans104-data-indexed';
export const ANS104_DATA_ITEM_MATCHED = 'asn104-data-item-matched';
export const ANS104_TX_INDEXED = 'ans104-tx-indexed';
export const ANS104_UNBUNDLE_COMPLETE = 'ans104-unbundle-complete';
export const BLOCK_FETCHED = 'block-fetched';
export const BLOCK_INDEXED = 'block-indexed';
export const BLOCK_TX_FETCHED = 'block-tx-fetched';
Expand Down
44 changes: 35 additions & 9 deletions src/lib/ans-104.ts
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,23 @@ import { fromB64Url, sha256B64Url, utf8ToB64Url } from './encoding.js';
// @ts-ignore
const { default: processStream } = arbundles;

type ParseEventName =
| 'data-item-matched'
| 'unbundle-complete'
| 'unbundle-error';

const DATA_ITEM_MATCHED: ParseEventName = 'data-item-matched';
const UNBUNDLE_COMPLETE: ParseEventName = 'unbundle-complete';
const UNBUNDLE_ERROR: ParseEventName = 'unbundle-error';

interface ParserMessage {
eventName: ParseEventName;
dataItem?: NormalizedDataItem;
dataItemIndexFilterString?: string;
itemCount?: number;
matchedItemCount?: number;
}

export function normalizeAns104DataItem({
rootTxId,
parentId,
Expand Down Expand Up @@ -66,7 +83,6 @@ export class Ans104Parser {
private worker: Worker;
private contiguousDataSource: ContiguousDataSource;
private unbundlePromise: Promise<void> | undefined;
private dataItemIndexFilterString: string;

constructor({
log,
Expand All @@ -81,29 +97,33 @@ export class Ans104Parser {
}) {
this.log = log.child({ class: 'Ans104Parser' });
this.contiguousDataSource = contiguousDataSource;
this.dataItemIndexFilterString = dataItemIndexFilterString;

const workerUrl = new URL('./ans-104.js', import.meta.url);
this.worker = new Worker(workerUrl, {
workerData: {
dataItemIndexFilterString: this.dataItemIndexFilterString,
dataItemIndexFilterString,
},
});

this.worker.on(
'message',
((message: any) => {
((message: ParserMessage) => {
switch (message.eventName) {
case 'data-item-matched':
case DATA_ITEM_MATCHED:
eventEmitter.emit(
events.ANS104_DATA_ITEM_MATCHED,
message.dataItem,
);
break;
case 'unbundle-complete':
case UNBUNDLE_COMPLETE:
const { eventName, ...eventBody } = message;
eventEmitter.emit(events.ANS104_UNBUNDLE_COMPLETE, {
dataItemIndexFilterString,
...eventBody,
});
this.unbundlePromise = undefined;
break;
case 'unbundle-error':
case UNBUNDLE_ERROR:
this.unbundlePromise = undefined;
break;
}
Expand Down Expand Up @@ -174,6 +194,7 @@ if (!isMainThread) {
const stream = fs.createReadStream(bundlePath);
const iterable = await processStream(stream);
const bundleLength = iterable.length;
let matchedItemCount = 0;

const fnLog = log.child({ rootTxId, parentId, bundleLength });
fnLog.info('Unbundling ANS-104 bundle stream data items...');
Expand Down Expand Up @@ -212,13 +233,18 @@ if (!isMainThread) {
});

if (await filter.match(normalizedDataItem)) {
matchedItemCount++;
parentPort?.postMessage({
eventName: 'data-item-matched',
eventName: DATA_ITEM_MATCHED,
dataItem: normalizedDataItem,
});
}
}
parentPort?.postMessage({ eventName: 'unbundle-complete' });
parentPort?.postMessage({
eventName: UNBUNDLE_COMPLETE,
itemCount: bundleLength,
matchedItemCount,
});
} catch (error) {
log.error('Error unbundling ANS-104 bundle stream', error);
parentPort?.postMessage({ eventName: 'unbundle-error' });
Expand Down

0 comments on commit ca3cfd8

Please sign in to comment.