-
Notifications
You must be signed in to change notification settings - Fork 7
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
Begin implementation of Jetstream service API.
This allows developers to access the firehose in a more friendly way than via the subscribeRepos() method. In this way, consumers can receive friendly JSON output instead of CBOR-encoded blocks, as well as easy filtering on specific collections or DIDs. This also supports the zstd compression option for JVM and JS which can reduce message sizes considerably. "A Jetstream consumer that only cares about posts and has zstd compression enabled can get by on as little as ~25.5GB/mo, <99% of the full weight firehose." https://jazco.dev/2024/09/24/jetstream/
- Loading branch information
1 parent
aa92189
commit fcc9e39
Showing
20 changed files
with
919 additions
and
0 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1 @@ | ||
/build/ |
Large diffs are not rendered by default.
Oops, something went wrong.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,49 @@ | ||
import org.jetbrains.dokka.gradle.AbstractDokkaTask | ||
import org.jetbrains.kotlin.gradle.tasks.KotlinCompile | ||
|
||
plugins { | ||
id("ozone-dokka") | ||
id("ozone-multiplatform") | ||
id("ozone-publish") | ||
id("sh.christian.ozone.generator") | ||
id("org.jetbrains.kotlinx.binary-compatibility-validator") | ||
} | ||
|
||
ozone { | ||
js() | ||
jvm() | ||
} | ||
|
||
dependencies { | ||
lexicons(fileTree("schemas") { | ||
include("**/*.json") | ||
}) | ||
} | ||
|
||
kotlin { | ||
sourceSets { | ||
val commonMain by getting { | ||
dependencies { | ||
implementation(libs.ktor.logging) | ||
} | ||
} | ||
val jvmMain by getting { | ||
dependencies { | ||
implementation(libs.zstd) | ||
} | ||
} | ||
val jsMain by getting { | ||
dependencies { | ||
implementation(npm("zstd-codec", "0.1.5")) | ||
} | ||
} | ||
} | ||
} | ||
|
||
val generateLexicons = tasks.generateLexicons | ||
tasks.apiDump.configure { dependsOn(generateLexicons) } | ||
tasks.apiCheck.configure { dependsOn(generateLexicons) } | ||
|
||
tasks.withType<AbstractDokkaTask>().configureEach { | ||
dependsOn(tasks.withType<KotlinCompile>()) | ||
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,2 @@ | ||
POM_NAME=AT Protocol for Kotlin - Jetstream API | ||
POM_DESCRIPTION=Jetstream API bindings for Kotlin. |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,182 @@ | ||
{ | ||
"lexicon": 1, | ||
"id": "app.bsky.jetstream.subscribe", | ||
"defs": { | ||
"main": { | ||
"type": "subscription", | ||
"description": "Consume lightweight, friendly JSON converted from an ATProto `com.atproto.sync.subscribeRepos` stream.", | ||
"parameters": { | ||
"type": "params", | ||
"properties": { | ||
"wantedCollections": { | ||
"type": "array", | ||
"maxLength": 100, | ||
"items": { | ||
"type": "string", | ||
"format": "nsid" | ||
}, | ||
"description": "Which records you receive on your stream (default empty = all collections)." | ||
}, | ||
"wantedDids": { | ||
"type": "array", | ||
"maxLength": 10000, | ||
"items": { | ||
"type": "string", | ||
"format": "did" | ||
}, | ||
"description": "Which records you receive on your stream (default empty = all repos)." | ||
}, | ||
"maxMessageSizeBytes": { | ||
"type": "integer", | ||
"minimum": 0, | ||
"default": 0, | ||
"description": "The maximum size of a payload that this client would like to receive." | ||
}, | ||
"cursor": { | ||
"type": "integer", | ||
"description": "A unix microseconds timestamp cursor to begin playback from." | ||
}, | ||
"compress": { | ||
"type": "boolean", | ||
"default": false, | ||
"description": "Set to true to enable zstd compression." | ||
}, | ||
"requireHello": { | ||
"type": "boolean", | ||
"default": false, | ||
"description": "Set to true to pause replay/live-tail until the server receives a SubscribeOptionsUpdate." | ||
} | ||
} | ||
}, | ||
"message": { | ||
"schema": { | ||
"type": "ref", | ||
"ref": "#event" | ||
} | ||
} | ||
}, | ||
"event": { | ||
"type": "object", | ||
"required": ["did", "time_us", "kind"], | ||
"nullable": ["commit", "identity", "account"], | ||
"properties": { | ||
"did": { | ||
"type": "string", | ||
"format": "did" | ||
}, | ||
"time_us": { | ||
"type": "integer" | ||
}, | ||
"kind": { | ||
"type": "string", | ||
"knownValues": ["commit", "identity", "account"] | ||
}, | ||
"commit": { | ||
"type": "ref", | ||
"ref": "#commit" | ||
}, | ||
"identity": { | ||
"type": "ref", | ||
"ref": "#identity" | ||
}, | ||
"account": { | ||
"type": "ref", | ||
"ref": "#account" | ||
} | ||
} | ||
}, | ||
"commit": { | ||
"type": "object", | ||
"required": ["rev", "operation", "collection", "rkey"], | ||
"nullable": ["record"], | ||
"properties": { | ||
"rev": { "type": "string" }, | ||
"operation": { | ||
"type": "string", | ||
"knownValues": ["create", "update", "delete"] | ||
}, | ||
"collection": { | ||
"type": "string", | ||
"format": "nsid" | ||
}, | ||
"rkey": { | ||
"type": "string", | ||
"format": "record-key" | ||
}, | ||
"record": { "type": "unknown" }, | ||
"cid": { | ||
"type": "string", | ||
"format": "cid" | ||
} | ||
} | ||
}, | ||
"identity": { | ||
"type": "object", | ||
"required": ["did", "handle", "seq", "time"], | ||
"properties": { | ||
"did": { | ||
"type": "string", | ||
"format": "did" | ||
}, | ||
"handle": { | ||
"type": "string", | ||
"format": "handle" | ||
}, | ||
"seq": { "type": "integer" }, | ||
"time": { "type": "string" } | ||
} | ||
}, | ||
"account": { | ||
"type": "object", | ||
"required": ["active", "did", "seq", "time"], | ||
"properties": { | ||
"active": { "type": "boolean" }, | ||
"did": { | ||
"type": "string", | ||
"format": "did" | ||
}, | ||
"seq": { "type": "integer" }, | ||
"time": { "type": "string" } | ||
} | ||
}, | ||
"sourcedMessage": { | ||
"type": "object", | ||
"description": "Send messages back to Jetstream over the websocket.", | ||
"required": ["type", "payload"], | ||
"properties": { | ||
"type": { "type": "string" }, | ||
"payload": { "type": "unknown" } | ||
} | ||
}, | ||
"optionsUpdate": { | ||
"type": "object", | ||
"description": "Update subscription filter after connecting to the socket.", | ||
"properties": { | ||
"wantedCollections": { | ||
"type": "array", | ||
"maxLength": 100, | ||
"items": { | ||
"type": "string", | ||
"format": "nsid" | ||
}, | ||
"description": "Which records you receive on your stream (default empty = all collections)." | ||
}, | ||
"wantedDids": { | ||
"type": "array", | ||
"maxLength": 10000, | ||
"items": { | ||
"type": "string", | ||
"format": "did" | ||
}, | ||
"description": "Which records you receive on your stream (default empty = all repos)." | ||
}, | ||
"maxMessageSizeBytes": { | ||
"type": "integer", | ||
"minimum": 0, | ||
"default": 0, | ||
"description": "The maximum size of a payload that this client would like to receive." | ||
} | ||
} | ||
} | ||
} | ||
} |
136 changes: 136 additions & 0 deletions
136
jetstream/src/commonMain/kotlin/sh/christian/ozone/jetstream/JetstreamApi.kt
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,136 @@ | ||
package sh.christian.ozone.jetstream | ||
|
||
import app.bsky.jetstream.SubscribeMessage | ||
import app.bsky.jetstream.SubscribeOptionsUpdate | ||
import app.bsky.jetstream.SubscribeQueryParams | ||
import app.bsky.jetstream.SubscribeSourcedMessage | ||
import io.ktor.client.HttpClient | ||
import io.ktor.client.plugins.DefaultRequest | ||
import io.ktor.client.plugins.websocket.DefaultClientWebSocketSession | ||
import io.ktor.client.plugins.websocket.WebSockets | ||
import io.ktor.client.plugins.websocket.wss | ||
import io.ktor.client.request.parameter | ||
import io.ktor.websocket.Frame | ||
import kotlinx.coroutines.cancelAndJoin | ||
import kotlinx.coroutines.channels.ReceiveChannel | ||
import kotlinx.coroutines.flow.Flow | ||
import kotlinx.coroutines.flow.MutableSharedFlow | ||
import kotlinx.coroutines.flow.catch | ||
import kotlinx.coroutines.flow.emitAll | ||
import kotlinx.coroutines.flow.flow | ||
import kotlinx.coroutines.flow.map | ||
import kotlinx.coroutines.flow.mapNotNull | ||
import kotlinx.coroutines.flow.receiveAsFlow | ||
import kotlinx.coroutines.launch | ||
import kotlinx.serialization.encodeToString | ||
import sh.christian.ozone.api.model.JsonContent | ||
import sh.christian.ozone.api.runtime.BlueskyJson | ||
import sh.christian.ozone.api.xrpc.defaultHttpClient | ||
|
||
/** | ||
* Implementation to interact with a hosted Jetstream service. | ||
* | ||
* @constructor Construct a new instance using an existing [HttpClient]. | ||
*/ | ||
class JetstreamApi(httpClient: HttpClient) : JetstreamInterface { | ||
|
||
/** Construct a new instance using a free-form [hostName]. */ | ||
constructor(hostName: String) : this( | ||
defaultHttpClient.config { | ||
install(DefaultRequest) { | ||
url.host = hostName | ||
} | ||
} | ||
) | ||
|
||
/** Construct a new instance using a well-known [JetstreamHost] instance. */ | ||
constructor(host: JetstreamHost) : this("jetstream${host.instance}.${host.region}.bsky.network") | ||
|
||
/** Construct a new instance that connects to the [JetstreamHost.JETSTREAM_1_US_EAST] instance. */ | ||
constructor() : this(JetstreamHost.JETSTREAM_1_US_EAST) | ||
|
||
private val client: HttpClient = httpClient.config { | ||
install(WebSockets) | ||
} | ||
|
||
override suspend fun subscribe(params: SubscribeQueryParams): Flow<SubscribeMessage> = flow { | ||
withSubscribe(params) { | ||
emitAll(incoming.messages()) | ||
} | ||
} | ||
|
||
override suspend fun subscribe( | ||
params: SubscribeQueryParams, | ||
block: suspend SubscriptionContext.() -> Unit, | ||
) { | ||
withSubscribe(params) { | ||
val incomingMessages = incoming.messages() | ||
val outgoingMessages = MutableSharedFlow<SubscribeSourcedMessage>() | ||
|
||
val outgoingJob = launch { | ||
outgoingMessages.collect { sourcedMessage -> | ||
outgoing.send(Frame.Text(BlueskyJson.encodeToString(sourcedMessage))) | ||
} | ||
} | ||
|
||
try { | ||
DefaultSubscriptionContext(outgoingMessages, incomingMessages).apply { | ||
block() | ||
} | ||
} finally { | ||
outgoingJob.cancelAndJoin() | ||
} | ||
} | ||
} | ||
|
||
private suspend fun withSubscribe( | ||
params: SubscribeQueryParams, | ||
block: suspend DefaultClientWebSocketSession.() -> Unit | ||
) { | ||
client.wss( | ||
path = "/subscribe", | ||
request = { params.asList().forEach { (key, value) -> parameter(key, value) } }, | ||
) { | ||
initZstd() | ||
block() | ||
} | ||
} | ||
|
||
private fun ReceiveChannel<Frame>.messages(): Flow<SubscribeMessage> { | ||
return receiveAsFlow() | ||
.mapNotNull { frame -> | ||
when (frame) { | ||
// zstd-compressed json | ||
is Frame.Binary -> decompressZstd(frame.data) | ||
// raw json | ||
is Frame.Text -> frame.data | ||
// ignored | ||
is Frame.Close, | ||
is Frame.Ping, | ||
is Frame.Pong -> null | ||
else -> null | ||
} | ||
} | ||
.map { data -> | ||
BlueskyJson.decodeFromString( | ||
deserializer = SubscribeMessage.serializer(), | ||
string = data.decodeToString(), | ||
) | ||
} | ||
.catch { it.printStackTrace() } | ||
} | ||
|
||
private inner class DefaultSubscriptionContext( | ||
private val sourcedMessages: MutableSharedFlow<SubscribeSourcedMessage>, | ||
override val messages: Flow<SubscribeMessage>, | ||
) : SubscriptionContext { | ||
override suspend fun updateSubscription(message: SubscribeOptionsUpdate) { | ||
sourcedMessages.emit( | ||
SubscribeSourcedMessage( | ||
type = "options_update", | ||
payload = JsonContent.encodeFrom(message), | ||
) | ||
) | ||
} | ||
} | ||
} |
Oops, something went wrong.