-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathuseJetstream.ts
More file actions
93 lines (77 loc) · 2.32 KB
/
useJetstream.ts
File metadata and controls
93 lines (77 loc) · 2.32 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
import { collector, mapped, sampled } from "@/lib/streams/transforms";
import { streamMessages } from "@/lib/streams/web-socket";
import { AppBskyFeedPost } from "@atproto/api";
import { concat } from "lodash";
import { useCallback, useEffect, useState } from "react";
import { useDocumentVisibility } from "./useDocumentVisibility";
/**
 * Jetstream hostnames accepted by `getJetstreamUrl` and `useJetstream`.
 *
 * Declared `as const` so that `(typeof hosts)[number]` is the union of these
 * exact string literals rather than plain `string`.
 */
export const hosts = [
  // US-East
  "jetstream1.us-east.bsky.network",
  "jetstream2.us-east.bsky.network",

  // US-West
  "jetstream1.us-west.bsky.network",
  "jetstream2.us-west.bsky.network",
] as const;
/**
 * Builds the secure WebSocket URL for the `subscribe` endpoint of the given
 * Jetstream host, asking only for the `app.bsky.feed.post` collection.
 *
 * @param host - One of the hostnames listed in `hosts`.
 * @returns A `URL` of the form
 *   `wss://<host>/subscribe?wantedCollections=app.bsky.feed.post`.
 */
function getJetstreamUrl(host: (typeof hosts)[number]): URL {
  const base = `wss://${host}`;
  const endpoint = new URL("subscribe", base);
  // The URL is freshly created with no query string, so `set` adds the single
  // parameter exactly once.
  endpoint.searchParams.set("wantedCollections", "app.bsky.feed.post");
  return endpoint;
}
/**
 * A feed post extracted from a Jetstream commit message by `extractPost`.
 */
export type JetstreamPost = {
  /** Value of the message's top-level `did` field. */
  did: string;
  /** Value of the message's `commit.collection` field. */
  collection: string;
  /** Value of the message's `commit.rkey` field. */
  rkey: string;
  /** Value of the message's `commit.cid` field. */
  cid: string;
  /** The `commit.record` payload, accepted by `AppBskyFeedPost.isRecord`. */
  record: AppBskyFeedPost.Record;
};
/**
 * React hook that subscribes to a Jetstream host over WebSocket and keeps a
 * rolling, newest-first buffer of the posts it receives.
 *
 * A connection is only open while `active` is true and the document is
 * visible (as reported by `useDocumentVisibility`). It is torn down and
 * re-created whenever `active`, visibility, `sampleRate`, `bufferSize` or
 * `host` changes. Posts collected so far stay in the buffer while paused.
 *
 * @param sampleRate - Forwarded to `sampled()`. NOTE(review): exact semantics
 *   (fraction vs. interval) live in `@/lib/streams/transforms` — confirm there.
 * @param bufferSize - Maximum number of posts retained in `posts`.
 * @param active - When false, no connection is opened.
 * @param host - Jetstream host to connect to; one of `hosts`.
 * @returns An object whose `posts` array holds the retained posts, newest first.
 */
export function useJetstream(
  sampleRate: number,
  bufferSize: number,
  active: boolean,
  host: (typeof hosts)[number]
) {
  const isVisible = useDocumentVisibility();
  const [posts, setPosts] = useState<JetstreamPost[]>([]);

  const onNewPost = useCallback(
    (post: JetstreamPost | undefined) => {
      if (post) {
        // Prepend first and truncate afterwards so the buffer never holds more
        // than `bufferSize` entries (truncating before prepending kept
        // `bufferSize + 1`). The clamp makes a negative size mean "keep none"
        // instead of slicing relative to the end of the array.
        setPosts((posts) =>
          concat(post, posts).slice(0, Math.max(0, bufferSize))
        );
      }
    },
    [bufferSize]
  );

  useEffect(() => {
    if (!active || !isVisible) {
      return;
    }

    const controller = new AbortController();
    const ws = new WebSocket(getJetstreamUrl(host));

    streamMessages(ws)
      .pipeThrough(sampled(sampleRate))
      .pipeThrough(mapped(extractPost))
      .pipeTo(collector(onNewPost), { signal: controller.signal })
      .catch((error: unknown) => {
        // Aborting the pipe during cleanup rejects this promise by design.
        // Swallow that expected rejection, but still surface real failures.
        if (!controller.signal.aborted) {
          console.error("Jetstream pipeline failed", error);
        }
      });

    return () => {
      controller.abort();
      // This effect created the socket, so it closes it explicitly rather than
      // relying on the stream's cancel path. close() does nothing on a socket
      // that is already closing or closed.
      ws.close();
    };
  }, [active, isVisible, sampleRate, onNewPost, host]);

  return { posts };
}
/**
 * Converts a raw Jetstream WebSocket message into a `JetstreamPost`.
 *
 * Only messages whose `kind` is `"commit"`, whose `commit.operation` is
 * `"create"`, and whose `commit.record` passes `AppBskyFeedPost.isRecord`
 * are accepted. Everything else — empty or non-text frames, malformed JSON,
 * other message kinds, or commits without a payload — yields `undefined`
 * rather than throwing, so a single bad frame is skipped instead of being
 * raised to the caller.
 *
 * @param message - The WebSocket message event; `message.data` is expected to
 *   be a JSON string.
 * @returns The extracted post, or `undefined` if the message is not a newly
 *   created feed post.
 */
function extractPost(message: MessageEvent): JetstreamPost | undefined {
  // Binary frames (Blob / ArrayBuffer) and empty strings cannot be JSON text.
  if (typeof message.data !== "string" || message.data.length === 0) {
    return undefined;
  }

  const data = parseJson(message.data);
  // Optional chaining guards against payloads such as `null`, primitives, or
  // commit messages that lack the `commit` object.
  const commit = data?.commit;
  if (data?.kind !== "commit" || commit?.operation !== "create") {
    return undefined;
  }

  const record = commit.record;
  if (!record || !AppBskyFeedPost.isRecord(record)) {
    return undefined;
  }

  return {
    record,
    rkey: commit.rkey,
    did: data.did,
    cid: commit.cid,
    collection: commit.collection,
  };
}

/** Parses `text` as JSON, returning `undefined` when it is not valid JSON. */
function parseJson(text: string) {
  try {
    return JSON.parse(text);
  } catch {
    return undefined;
  }
}