-
Notifications
You must be signed in to change notification settings - Fork 6
/
Copy pathcommit-parser.ts
95 lines (83 loc) · 2.71 KB
/
commit-parser.ts
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
import { Counter } from 'prom-client';
import { cborToLexRecord, readCar } from '@atproto/repo';
import { BlobRef } from '@atproto/lexicon';
import { ids, lexicons } from '../lexicon/lexicons';
import { Record as PostRecord } from '../lexicon/types/app/bsky/feed/post';
import { Commit } from '../lexicon/types/com/atproto/sync/subscribeRepos';
// Prometheus counter for firehose operations. getOpsByType increments it once
// per operation it processes, labelled with the operation's action and the
// collection segment (the part of the record path before the first '/').
const firehose_operations = new Counter({
name: 'firehose_operations',
help: 'All operations seen on the firehose',
labelNames: ['action', 'collection'],
});
/**
 * Extracts the post records created by a single firehose commit.
 *
 * Every operation in the commit is counted in the `firehose_operations`
 * metric, regardless of whether the commit contains anything this parser
 * cares about. Only `create` operations in the post collection are decoded
 * and returned; updates and deletes are counted but otherwise ignored
 * (updates are not supported yet).
 *
 * @param evt - The commit event received from the firehose.
 * @returns The valid post creations found in the commit. A create is dropped
 *   when it has no CID, when its block is missing from the commit's CAR data,
 *   or when the decoded record fails post validation.
 */
export async function getOpsByType(evt: Commit): Promise<OperationsByType> {
  const opsByType: OperationsByType = {
    posts: { creates: [] },
  };

  // Count every operation before any early exit so the metric really reflects
  // all operations seen, and pick out the post creates in the same pass.
  const postCreates: Commit['ops'] = [];
  for (const op of evt.ops) {
    const [collection] = op.path.split('/');
    firehose_operations.inc({ action: op.action, collection });
    // Exact collection match: a prefix test would also accept sibling
    // collections whose NSID merely starts with the post NSID.
    if (op.action === 'create' && collection === ids.AppBskyFeedPost) {
      postCreates.push(op);
    }
  }

  // Skip CAR decoding entirely when the commit holds no post creates.
  if (postCreates.length === 0) {
    return opsByType;
  }

  const car = await readCar(evt.blocks);
  for (const op of postCreates) {
    if (!op.cid) continue;
    const recordBytes = car.blocks.get(op.cid);
    if (!recordBytes) continue;
    const record = cborToLexRecord(recordBytes);
    if (!isPost(record)) continue;
    opsByType.posts.creates.push({
      record,
      uri: `at://${evt.repo}/${op.path}`,
      cid: op.cid.toString(),
      author: evt.repo,
    });
  }
  return opsByType;
}
/**
 * Result of parsing one commit: the extracted operations grouped by record
 * type. Posts are the only record type currently tracked.
 */
type OperationsByType = {
posts: Operations<PostRecord>;
};
/**
 * Operations collected for a single record type `T`. Only creations are
 * recorded here.
 */
type Operations<T = Record<string, unknown>> = {
creates: CreateOp<T>[];
};
/** A single record creation extracted from a commit. */
type CreateOp<T> = {
// `at://<repo>/<path>` URI of the created record.
uri: string;
// String form of the record's CID.
cid: string;
// Repo the commit belongs to (taken from the commit's `repo` field).
author: string;
// The decoded record.
record: T;
};
/**
 * Type guard reporting whether `obj` validates as a post record.
 *
 * @param obj - Any decoded value.
 * @returns `true` when `obj` passes validation against the post lexicon.
 */
export const isPost = (obj: unknown): obj is PostRecord =>
  isType(obj, ids.AppBskyFeedPost);
/**
 * Checks whether `obj` is a valid record for the lexicon identified by `nsid`.
 * The value is passed through `fixBlobRefs` before validation. Any error
 * raised while validating is treated as "not valid" rather than propagated.
 *
 * @param obj - Any decoded value.
 * @param nsid - Identifier of the lexicon to validate against.
 * @returns `true` when validation succeeds, `false` otherwise.
 */
const isType = (obj: unknown, nsid: string): boolean => {
  let valid = true;
  try {
    lexicons.assertValidRecord(nsid, fixBlobRefs(obj));
  } catch {
    valid = false;
  }
  return valid;
};
/**
 * Returns a copy of `obj` in which every value whose class is named `BlobRef`
 * has been rebuilt with the `BlobRef` class imported by this module.
 *
 * @TODO Right now record validation fails on BlobRefs simply because multiple
 * packages have their own copy of the BlobRef class, causing `instanceof`
 * checks to fail. This is a temporary solution.
 *
 * Arrays and plain objects are copied recursively. Every other value
 * (primitives and instances of other classes such as typed arrays) is
 * returned unchanged.
 *
 * @param obj - Any decoded value.
 * @returns The value with foreign BlobRef instances replaced by local ones.
 */
const fixBlobRefs = (obj: unknown): unknown => {
  if (Array.isArray(obj)) {
    return obj.map(fixBlobRefs);
  }
  if (obj === null || typeof obj !== 'object') {
    return obj;
  }

  // Inspect the prototype rather than `obj.constructor`: the latter throws on
  // null-prototype objects and can be shadowed (or spoofed) by an own
  // "constructor" key in the record data.
  const proto = Object.getPrototypeOf(obj) as {
    constructor?: { name?: unknown };
  } | null;
  if (proto?.constructor?.name === 'BlobRef') {
    const blob = obj as BlobRef;
    return new BlobRef(blob.ref, blob.mimeType, blob.size, blob.original);
  }

  // Leave other class instances intact; flattening them into plain objects
  // would destroy the value.
  if (proto !== null && proto !== Object.prototype) {
    return obj;
  }

  // Object.fromEntries defines own properties, so a "__proto__" key is copied
  // as data instead of replacing the copy's prototype.
  return Object.fromEntries(
    Object.entries(obj).map(([key, val]) => [key, fixBlobRefs(val)]),
  );
};