-
Notifications
You must be signed in to change notification settings - Fork 6
/
subscription.ts
68 lines (60 loc) · 2.23 KB
/
subscription.ts
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
import { AsyncIterable } from 'ix';
import { RichText } from '@atproto/api';
import { Counter } from 'prom-client';
import { Commit } from './lexicon/types/com/atproto/sync/subscribeRepos';
import { FirehoseSubscriptionBase } from './util/subscription';
import { extractTextLanguage, hasHebrewLetters } from './util/hebrew';
import { Record as PostRecord } from './lexicon/types/app/bsky/feed/post';
import { getOpsByType } from './util/commit-parser';
import { min } from 'date-fns';
import logger from './logger';
/**
 * Prometheus counter, incremented by `FirehoseSubscription.handleCommits`
 * with the number of post rows it hands to the `post` table insert.
 */
const indexerPostsCreated = new Counter({
  help: 'Posts indexed',
  name: 'indexer_posts_created',
});
/**
 * Firehose consumer that indexes newly created posts whose text contains
 * Hebrew letters into the `post` table.
 */
export class FirehoseSubscription extends FirehoseSubscriptionBase {
  /**
   * Handles one batch of repo commits.
   *
   * Collects post-create operations from the commits and keeps only posts
   * whose text contains Hebrew letters. It then inserts one row per post into
   * the `post` table. Rows that conflict with existing ones are skipped
   * (`onConflict(... doNothing())`).
   *
   * `effectiveTimestamp` is the earlier of the indexing time and the post's
   * self-reported `createdAt`, so a `createdAt` in the future is clamped to
   * the indexing time. A `createdAt` that cannot be parsed as a date falls
   * back to the indexing time instead of aborting the whole batch.
   *
   * @param commits - commits received from the firehose subscription.
   */
  async handleCommits(commits: Commit[]): Promise<void> {
    // Arrow wrapper so getOpsByType only ever receives the commit, never
    // Array.prototype.map's extra (index, array) arguments.
    const ops = await Promise.all(commits.map((commit) => getOpsByType(commit)));
    const postsToCreate = await AsyncIterable.from(ops)
      .flatMap((op) => op.posts.creates)
      .filter((op) => hasHebrewLetters(op.record.text))
      .map(async (create) => {
        const language = await extractTextLanguage(removeFacets(create.record));
        const indexedAt = new Date();
        // createdAt comes from the author's record, so it may not be a valid
        // date. Calling toISOString() on an Invalid Date throws RangeError,
        // which would reject the entire batch, so guard it here.
        const createdAt = new Date(create.record.createdAt);
        let effectiveDate = indexedAt;
        if (Number.isNaN(createdAt.getTime())) {
          logger.warn(
            { uri: create.uri, createdAt: create.record.createdAt },
            'Unparseable createdAt; using indexedAt as effectiveTimestamp',
          );
        } else {
          effectiveDate = min([indexedAt, createdAt]);
        }
        const effectiveTimestamp = effectiveDate.toISOString();
        logger.info(
          { uri: create.uri, text: create.record.text, effectiveTimestamp },
          "Indexing new post",
        );
        return {
          uri: create.uri,
          author: create.author,
          cid: create.cid,
          replyTo: create.record.reply?.parent.uri,
          replyRoot: create.record.reply?.root.uri,
          indexedAt: indexedAt.toISOString(),
          createdAt: create.record.createdAt,
          effectiveTimestamp,
          language,
        };
      })
      .toArray();

    if (postsToCreate.length === 0) {
      return;
    }

    await this.db
      .insertInto('post')
      .values(postsToCreate)
      .onConflict((oc) => oc.doNothing())
      .execute();
    // Increment only after the insert succeeded, so failed writes are not
    // reported as indexed posts.
    indexerPostsCreated.inc(postsToCreate.length);
  }
}
/**
 * Returns the post's text with every facet byte range removed, trimmed of
 * surrounding whitespace. The result is the text passed to language detection.
 *
 * The caller's `record` is not modified.
 *
 * @param record - the post record whose text and facets/entities are read.
 * @returns the remaining text after all facet spans are deleted.
 */
function removeFacets(record: PostRecord) {
  // RichText.delete adjusts the byte indices of the facet objects it holds in
  // place. Hand it copies (with a cloned `index`) so record.facets is not
  // mutated as a side effect.
  const facets = record.facets?.map((facet) => ({
    ...facet,
    index: { ...facet.index },
  }));
  const richText = new RichText({
    text: record.text,
    facets,
    entities: record.entities,
  });
  // Each delete shifts the indices of the remaining facet objects, so every
  // iteration reads already-adjusted byte offsets. Facets emptied by an
  // earlier delete end up at (0, 0), which deletes nothing.
  for (const facet of richText.facets ?? []) {
    richText.delete(facet.index.byteStart, facet.index.byteEnd);
  }
  return richText.text.trim();
}