From 46f22b061f6873dac60fce649453db608a396102 Mon Sep 17 00:00:00 2001 From: konstantiniiv Date: Thu, 20 Nov 2025 10:52:21 +0100 Subject: [PATCH 1/9] DROID-4127 new command --- .../com/anytypeio/anytype/core_models/Command.kt | 15 +++++++++++++++ .../data/auth/repo/block/BlockDataRepository.kt | 4 ++++ .../anytype/data/auth/repo/block/BlockRemote.kt | 4 ++++ .../anytype/domain/block/repo/BlockRepository.kt | 9 +++++++++ .../anytype/middleware/block/BlockMiddleware.kt | 6 ++++++ 5 files changed, 38 insertions(+) diff --git a/core-models/src/main/java/com/anytypeio/anytype/core_models/Command.kt b/core-models/src/main/java/com/anytypeio/anytype/core_models/Command.kt index 56cebb6383..5f334bfa0b 100644 --- a/core-models/src/main/java/com/anytypeio/anytype/core_models/Command.kt +++ b/core-models/src/main/java/com/anytypeio/anytype/core_models/Command.kt @@ -606,6 +606,21 @@ sealed class Command { } } + /** + * Subscribe to search results across multiple spaces. + * Unlike regular subscriptions that are scoped to a single space, + * this allows searching and subscribing to objects across all user spaces. + */ + data class CrossSpaceSearchSubscribe( + val subscription: Id, + val filters: List = emptyList(), + val sorts: List = emptyList(), + val keys: List, + val source: List = emptyList(), + val noDepSubscription: Boolean = false, + val collectionId: Id? = null + ) : Command() + data class ProcessCancel( val processId: Id ) : Command() diff --git a/data/src/main/java/com/anytypeio/anytype/data/auth/repo/block/BlockDataRepository.kt b/data/src/main/java/com/anytypeio/anytype/data/auth/repo/block/BlockDataRepository.kt index bed8d77a1d..a2712e18bb 100644 --- a/data/src/main/java/com/anytypeio/anytype/data/auth/repo/block/BlockDataRepository.kt +++ b/data/src/main/java/com/anytypeio/anytype/data/auth/repo/block/BlockDataRepository.kt @@ -462,6 +462,10 @@ class BlockDataRepository( keys = keys ) + override suspend fun crossSpaceSearchSubscribe( + command: Command.CrossSpaceSearchSubscribe + ): SearchResult = remote.crossSpaceSearchSubscribe(command) + override suspend fun cancelObjectSearchSubscription( subscriptions: List ) = remote.cancelObjectSearchSubscription(subscriptions).also { diff --git a/data/src/main/java/com/anytypeio/anytype/data/auth/repo/block/BlockRemote.kt b/data/src/main/java/com/anytypeio/anytype/data/auth/repo/block/BlockRemote.kt index daa4443fb3..9fbac23ad3 100644 --- a/data/src/main/java/com/anytypeio/anytype/data/auth/repo/block/BlockRemote.kt +++ b/data/src/main/java/com/anytypeio/anytype/data/auth/repo/block/BlockRemote.kt @@ -195,6 +195,10 @@ interface BlockRemote { keys: List ): SearchResult + suspend fun crossSpaceSearchSubscribe( + command: Command.CrossSpaceSearchSubscribe + ): SearchResult + suspend fun cancelObjectSearchSubscription(subscriptions: List) suspend fun addRelationToObject(ctx: Id, relation: Key): Payload? diff --git a/domain/src/main/java/com/anytypeio/anytype/domain/block/repo/BlockRepository.kt b/domain/src/main/java/com/anytypeio/anytype/domain/block/repo/BlockRepository.kt index 2bd3e6c600..d51069b4fd 100644 --- a/domain/src/main/java/com/anytypeio/anytype/domain/block/repo/BlockRepository.kt +++ b/domain/src/main/java/com/anytypeio/anytype/domain/block/repo/BlockRepository.kt @@ -236,6 +236,15 @@ interface BlockRepository { keys: List ): SearchResult + /** + * Subscribe to search results across all user spaces. + * Unlike searchObjectsWithSubscription which is scoped to a single space, + * this searches across all spaces globally. + */ + suspend fun crossSpaceSearchSubscribe( + command: Command.CrossSpaceSearchSubscribe + ): SearchResult + suspend fun cancelObjectSearchSubscription(subscriptions: List) suspend fun addRelationToObject(ctx: Id, relation: Key): Payload? diff --git a/middleware/src/main/java/com/anytypeio/anytype/middleware/block/BlockMiddleware.kt b/middleware/src/main/java/com/anytypeio/anytype/middleware/block/BlockMiddleware.kt index 8aa9cd7b1b..67cfe10c26 100644 --- a/middleware/src/main/java/com/anytypeio/anytype/middleware/block/BlockMiddleware.kt +++ b/middleware/src/main/java/com/anytypeio/anytype/middleware/block/BlockMiddleware.kt @@ -422,6 +422,12 @@ class BlockMiddleware( keys = keys ) + override suspend fun crossSpaceSearchSubscribe( + command: Command.CrossSpaceSearchSubscribe + ): SearchResult = middleware.objectCrossSpaceSearchSubscribe( + command + ) + override suspend fun cancelObjectSearchSubscription( subscriptions: List ) = middleware.objectSearchUnsubscribe(subscriptions = subscriptions) From a9a38d4cb5c7c2bfc37c05a422438c45bd66094d Mon Sep 17 00:00:00 2001 From: konstantiniiv Date: Thu, 20 Nov 2025 12:42:24 +0100 Subject: [PATCH 2/9] DROID-4127 mw --- .../anytypeio/anytype/core_models/Command.kt | 1 - .../auth/repo/block/BlockDataRepository.kt | 4 ++ .../data/auth/repo/block/BlockRemote.kt | 2 + .../domain/block/repo/BlockRepository.kt | 2 + .../middleware/block/BlockMiddleware.kt | 4 ++ .../middleware/interactor/Middleware.kt | 44 +++++++++++++++++++ .../middleware/service/MiddlewareService.kt | 6 +++ .../MiddlewareServiceImplementation.kt | 26 +++++++++++ 8 files changed, 88 insertions(+), 1 deletion(-) diff --git a/core-models/src/main/java/com/anytypeio/anytype/core_models/Command.kt b/core-models/src/main/java/com/anytypeio/anytype/core_models/Command.kt index 5f334bfa0b..fc9862ae36 100644 --- a/core-models/src/main/java/com/anytypeio/anytype/core_models/Command.kt +++ b/core-models/src/main/java/com/anytypeio/anytype/core_models/Command.kt @@ -5,7 +5,6 @@ import com.anytypeio.anytype.core_models.chats.NotificationState import com.anytypeio.anytype.core_models.membership.MembershipPaymentMethod import com.anytypeio.anytype.core_models.membership.NameServiceNameType import com.anytypeio.anytype.core_models.multiplayer.SpaceMemberPermissions -import com.anytypeio.anytype.core_models.multiplayer.SpaceUxType import com.anytypeio.anytype.core_models.primitives.SpaceId import com.anytypeio.anytype.core_models.primitives.TypeKey diff --git a/data/src/main/java/com/anytypeio/anytype/data/auth/repo/block/BlockDataRepository.kt b/data/src/main/java/com/anytypeio/anytype/data/auth/repo/block/BlockDataRepository.kt index a2712e18bb..03159ffc1c 100644 --- a/data/src/main/java/com/anytypeio/anytype/data/auth/repo/block/BlockDataRepository.kt +++ b/data/src/main/java/com/anytypeio/anytype/data/auth/repo/block/BlockDataRepository.kt @@ -466,6 +466,10 @@ class BlockDataRepository( command: Command.CrossSpaceSearchSubscribe ): SearchResult = remote.crossSpaceSearchSubscribe(command) + override suspend fun objectCrossSpaceUnsubscribe(subscription: String) { + remote.objectCrossSpaceUnsubscribe(subscription) + } + override suspend fun cancelObjectSearchSubscription( subscriptions: List ) = remote.cancelObjectSearchSubscription(subscriptions).also { diff --git a/data/src/main/java/com/anytypeio/anytype/data/auth/repo/block/BlockRemote.kt b/data/src/main/java/com/anytypeio/anytype/data/auth/repo/block/BlockRemote.kt index 9fbac23ad3..399dc46306 100644 --- a/data/src/main/java/com/anytypeio/anytype/data/auth/repo/block/BlockRemote.kt +++ b/data/src/main/java/com/anytypeio/anytype/data/auth/repo/block/BlockRemote.kt @@ -199,6 +199,8 @@ interface BlockRemote { command: Command.CrossSpaceSearchSubscribe ): SearchResult + suspend fun objectCrossSpaceUnsubscribe(subscription: String) + suspend fun cancelObjectSearchSubscription(subscriptions: List) suspend fun addRelationToObject(ctx: Id, relation: Key): Payload? diff --git a/domain/src/main/java/com/anytypeio/anytype/domain/block/repo/BlockRepository.kt b/domain/src/main/java/com/anytypeio/anytype/domain/block/repo/BlockRepository.kt index d51069b4fd..eb8f77b380 100644 --- a/domain/src/main/java/com/anytypeio/anytype/domain/block/repo/BlockRepository.kt +++ b/domain/src/main/java/com/anytypeio/anytype/domain/block/repo/BlockRepository.kt @@ -245,6 +245,8 @@ interface BlockRepository { command: Command.CrossSpaceSearchSubscribe ): SearchResult + suspend fun objectCrossSpaceUnsubscribe(subscription: String) + suspend fun cancelObjectSearchSubscription(subscriptions: List) suspend fun addRelationToObject(ctx: Id, relation: Key): Payload? diff --git a/middleware/src/main/java/com/anytypeio/anytype/middleware/block/BlockMiddleware.kt b/middleware/src/main/java/com/anytypeio/anytype/middleware/block/BlockMiddleware.kt index 67cfe10c26..a1a4b44ab5 100644 --- a/middleware/src/main/java/com/anytypeio/anytype/middleware/block/BlockMiddleware.kt +++ b/middleware/src/main/java/com/anytypeio/anytype/middleware/block/BlockMiddleware.kt @@ -428,6 +428,10 @@ class BlockMiddleware( command ) + override suspend fun objectCrossSpaceUnsubscribe(subscription: String) { + middleware.objectCrossSpaceUnsubscribe(subscription) + } + override suspend fun cancelObjectSearchSubscription( subscriptions: List ) = middleware.objectSearchUnsubscribe(subscriptions = subscriptions) diff --git a/middleware/src/main/java/com/anytypeio/anytype/middleware/interactor/Middleware.kt b/middleware/src/main/java/com/anytypeio/anytype/middleware/interactor/Middleware.kt index fe7ecd5c3c..a5fe916a55 100644 --- a/middleware/src/main/java/com/anytypeio/anytype/middleware/interactor/Middleware.kt +++ b/middleware/src/main/java/com/anytypeio/anytype/middleware/interactor/Middleware.kt @@ -42,6 +42,7 @@ import com.anytypeio.anytype.core_models.Url import com.anytypeio.anytype.core_models.WidgetLayout import com.anytypeio.anytype.core_models.chats.Chat import com.anytypeio.anytype.core_models.chats.NotificationState +import com.anytypeio.anytype.core_models.ext.isValidObject import com.anytypeio.anytype.core_models.history.DiffVersionResponse import com.anytypeio.anytype.core_models.history.ShowVersionResponse import com.anytypeio.anytype.core_models.history.Version @@ -1380,6 +1381,49 @@ class Middleware @Inject constructor( ) } + @Throws(Exception::class) + fun objectCrossSpaceSearchSubscribe( + command: Command.CrossSpaceSearchSubscribe + ): SearchResult { + val request = Rpc.Object.CrossSpaceSearchSubscribe.Request( + subId = command.subscription, + filters = command.filters.map { it.toMiddlewareModel() }, + sorts = command.sorts.map { it.toMiddlewareModel() }, + keys = command.keys, + source = command.source, + noDepSubscription = command.noDepSubscription, + collectionId = command.collectionId.orEmpty() + ) + logRequestIfDebug(request) + val (response, time) = measureTimedValue { service.objectCrossSpaceSubscribe(request) } + logResponseIfDebug(response, time) + return SearchResult( + results = response.records.mapNotNull { record -> + if (record != null && record.isNotEmpty() && record.isValidObject()) + ObjectWrapper.Basic(record) + else + null + }, + dependencies = response.dependencies.mapNotNull { record -> + if (record != null && record.isNotEmpty() && record.isValidObject()) + ObjectWrapper.Basic(record) + else + null + }, + counter = response.counters?.parse() + ) + } + + @Throws(Exception::class) + fun objectCrossSpaceUnsubscribe(subscription: String) { + val request = Rpc.Object.CrossSpaceSearchUnsubscribe.Request( + subId = subscription + ) + logRequestIfDebug(request) + val (response, time) = measureTimedValue { service.objectCrossSpaceUnsubscribe(request) } + logResponseIfDebug(response, time) + } + @Throws(Exception::class) fun objectSearchUnsubscribe(subscriptions: List) { val request = Rpc.Object.SearchUnsubscribe.Request( diff --git a/middleware/src/main/java/com/anytypeio/anytype/middleware/service/MiddlewareService.kt b/middleware/src/main/java/com/anytypeio/anytype/middleware/service/MiddlewareService.kt index a4e18c8528..f2ed595299 100644 --- a/middleware/src/main/java/com/anytypeio/anytype/middleware/service/MiddlewareService.kt +++ b/middleware/src/main/java/com/anytypeio/anytype/middleware/service/MiddlewareService.kt @@ -105,6 +105,12 @@ interface MiddlewareService { @Throws(Exception::class) fun objectSearchSubscribe(request: Rpc.Object.SearchSubscribe.Request): Rpc.Object.SearchSubscribe.Response + @Throws(Exception::class) + fun objectCrossSpaceSubscribe(request: Rpc.Object.CrossSpaceSearchSubscribe.Request): Rpc.Object.CrossSpaceSearchSubscribe.Response + + @Throws(Exception::class) + fun objectCrossSpaceUnsubscribe(request: Rpc.Object.CrossSpaceSearchUnsubscribe.Request): Rpc.Object.CrossSpaceSearchUnsubscribe.Response + @Throws(Exception::class) fun objectSearchUnsubscribe(request: Rpc.Object.SearchUnsubscribe.Request): Rpc.Object.SearchUnsubscribe.Response diff --git a/middleware/src/main/java/com/anytypeio/anytype/middleware/service/MiddlewareServiceImplementation.kt b/middleware/src/main/java/com/anytypeio/anytype/middleware/service/MiddlewareServiceImplementation.kt index 0b15cfd982..81717ee22e 100644 --- a/middleware/src/main/java/com/anytypeio/anytype/middleware/service/MiddlewareServiceImplementation.kt +++ b/middleware/src/main/java/com/anytypeio/anytype/middleware/service/MiddlewareServiceImplementation.kt @@ -2896,4 +2896,30 @@ class MiddlewareServiceImplementation @Inject constructor( return response } } + + override fun objectCrossSpaceSubscribe(request: Rpc.Object.CrossSpaceSearchSubscribe.Request): Rpc.Object.CrossSpaceSearchSubscribe.Response { + val encoded = Service.objectCrossSpaceSearchSubscribe( + Rpc.Object.CrossSpaceSearchSubscribe.Request.ADAPTER.encode(request) + ) + val response = Rpc.Object.CrossSpaceSearchSubscribe.Response.ADAPTER.decode(encoded) + val error = response.error + if (error != null && error.code != Rpc.Object.CrossSpaceSearchSubscribe.Response.Error.Code.NULL) { + throw Exception(error.description) + } else { + return response + } + } + + override fun objectCrossSpaceUnsubscribe(request: Rpc.Object.CrossSpaceSearchUnsubscribe.Request): Rpc.Object.CrossSpaceSearchUnsubscribe.Response { + val encoded = Service.objectCrossSpaceSearchUnsubscribe( + Rpc.Object.CrossSpaceSearchUnsubscribe.Request.ADAPTER.encode(request) + ) + val response = Rpc.Object.CrossSpaceSearchUnsubscribe.Response.ADAPTER.decode(encoded) + val error = response.error + if (error != null && error.code != Rpc.Object.CrossSpaceSearchUnsubscribe.Response.Error.Code.NULL) { + throw Exception(error.description) + } else { + return response + } + } } From fe5219b0704aedb7bee14a9f139aa18fe4f48b98 Mon Sep 17 00:00:00 2001 From: konstantiniiv Date: Thu, 20 Nov 2025 12:42:40 +0100 Subject: [PATCH 3/9] DROID-4127 di --- .../anytype/di/main/SubscriptionsModule.kt | 30 +++++++++++++++++++ 1 file changed, 30 insertions(+) diff --git a/app/src/main/java/com/anytypeio/anytype/di/main/SubscriptionsModule.kt b/app/src/main/java/com/anytypeio/anytype/di/main/SubscriptionsModule.kt index 7264e7ab06..677bca8dc4 100644 --- a/app/src/main/java/com/anytypeio/anytype/di/main/SubscriptionsModule.kt +++ b/app/src/main/java/com/anytypeio/anytype/di/main/SubscriptionsModule.kt @@ -10,6 +10,7 @@ import com.anytypeio.anytype.domain.block.interactor.sets.GetObjectTypes import com.anytypeio.anytype.domain.block.repo.BlockRepository import com.anytypeio.anytype.domain.chats.ChatEventChannel import com.anytypeio.anytype.domain.chats.ChatPreviewContainer +import com.anytypeio.anytype.domain.chats.ChatsSubscriptionContainer import com.anytypeio.anytype.domain.config.ConfigStorage import com.anytypeio.anytype.domain.debugging.DebugAccountSelectTrace import com.anytypeio.anytype.domain.debugging.Logger @@ -17,6 +18,7 @@ import com.anytypeio.anytype.domain.deeplink.PendingIntentStore import com.anytypeio.anytype.domain.device.DeviceTokenStoringService import com.anytypeio.anytype.domain.device.NetworkConnectionStatus import com.anytypeio.anytype.domain.event.interactor.SpaceSyncAndP2PStatusProvider +import com.anytypeio.anytype.domain.library.CrossSpaceSubscriptionContainer import com.anytypeio.anytype.domain.library.StorelessSubscriptionContainer import com.anytypeio.anytype.domain.misc.DeepLinkResolver import com.anytypeio.anytype.domain.multiplayer.ActiveSpaceMemberSubscriptionContainer @@ -174,6 +176,34 @@ object SubscriptionsModule { config = configStorage ) + @JvmStatic + @Provides + @Singleton + fun chatsViewSubscriptionContainer( + dispatchers: AppCoroutineDispatchers, + @Named(DEFAULT_APP_COROUTINE_SCOPE) scope: CoroutineScope, + container: CrossSpaceSubscriptionContainer, + awaitAccountStartManager: AwaitAccountStartManager, + logger: Logger, + ): ChatsSubscriptionContainer = ChatsSubscriptionContainer.Default( + dispatchers = dispatchers, + scope = scope, + container = container, + awaitAccountStart = awaitAccountStartManager, + logger = logger, + ) + + @JvmStatic + @Provides + @Singleton + fun provideCrossSpaceSubscriptionContainer( + repo: BlockRepository, + channel: SubscriptionEventChannel, + dispatchers: AppCoroutineDispatchers, + logger: Logger + ): CrossSpaceSubscriptionContainer = + CrossSpaceSubscriptionContainer.Impl(repo, channel, dispatchers, logger) + @JvmStatic @Provides @Singleton From 134699f9e384a719ea4bdc08314d1e4986182b87 Mon Sep 17 00:00:00 2001 From: konstantiniiv Date: Thu, 20 Nov 2025 12:43:51 +0100 Subject: [PATCH 4/9] DROID-4127 cross space subscription container --- .../CrossSpaceSubscriptionContainer.kt | 160 ++++++++++++++++++ .../domain/library/StoreSearchParams.kt | 14 ++ 2 files changed, 174 insertions(+) create mode 100644 domain/src/main/java/com/anytypeio/anytype/domain/library/CrossSpaceSubscriptionContainer.kt diff --git a/domain/src/main/java/com/anytypeio/anytype/domain/library/CrossSpaceSubscriptionContainer.kt b/domain/src/main/java/com/anytypeio/anytype/domain/library/CrossSpaceSubscriptionContainer.kt new file mode 100644 index 0000000000..1b0a798bb3 --- /dev/null +++ b/domain/src/main/java/com/anytypeio/anytype/domain/library/CrossSpaceSubscriptionContainer.kt @@ -0,0 +1,160 @@ +package com.anytypeio.anytype.domain.library + +import com.anytypeio.anytype.core_models.Command +import com.anytypeio.anytype.core_models.Id +import com.anytypeio.anytype.core_models.ObjectWrapper +import com.anytypeio.anytype.core_models.SubscriptionEvent +import com.anytypeio.anytype.domain.base.AppCoroutineDispatchers +import com.anytypeio.anytype.domain.block.repo.BlockRepository +import com.anytypeio.anytype.domain.debugging.Logger +import com.anytypeio.anytype.domain.library.processors.EventAddProcessor +import com.anytypeio.anytype.domain.library.processors.EventAmendProcessor +import com.anytypeio.anytype.domain.library.processors.EventPositionProcessor +import com.anytypeio.anytype.domain.library.processors.EventRemoveProcessor +import com.anytypeio.anytype.domain.library.processors.EventSetProcessor +import com.anytypeio.anytype.domain.library.processors.EventUnsetProcessor +import com.anytypeio.anytype.domain.search.SubscriptionEventChannel +import javax.inject.Inject +import kotlinx.coroutines.flow.Flow +import kotlinx.coroutines.flow.catch +import kotlinx.coroutines.flow.emitAll +import kotlinx.coroutines.flow.flow +import kotlinx.coroutines.flow.flowOn +import kotlinx.coroutines.flow.map +import kotlinx.coroutines.flow.scan + +/** + * Subscription container for cross-space searches. + * Unlike StorelessSubscriptionContainer which is scoped to a single space, + * this allows subscribing to objects across all user spaces. + */ +interface CrossSpaceSubscriptionContainer { + + fun subscribe(searchParams: CrossSpaceSearchParams): Flow> + suspend fun unsubscribe(subscription: Id) + + class Impl @Inject constructor( + private val repo: BlockRepository, + private val channel: SubscriptionEventChannel, + private val dispatchers: AppCoroutineDispatchers, + private val logger: Logger + ) : CrossSpaceSubscriptionContainer { + + private val addEventProcessor by lazy { EventAddProcessor() } + private val unsetEventProcessor by lazy { EventUnsetProcessor() } + private val removeEventProcessor by lazy { EventRemoveProcessor() } + private val setEventProcessor by lazy { EventSetProcessor() } + private val amendEventProcessor by lazy { EventAmendProcessor(logger = logger) } + private val positionEventProcessor by lazy { EventPositionProcessor() } + + private fun subscribe(subscriptions: List) = + channel.subscribe(subscriptions).map { payload -> + payload.sortedBy { event -> + when (event) { + is SubscriptionEvent.Add -> 1 + is SubscriptionEvent.Remove -> 2 + is SubscriptionEvent.Set -> 3 + is SubscriptionEvent.Amend -> 4 + is SubscriptionEvent.Unset -> 5 + is SubscriptionEvent.Position -> 6 + is SubscriptionEvent.Counter -> 7 + } + } + } + + override fun subscribe(searchParams: CrossSpaceSearchParams): Flow> = + flow { + with(searchParams) { + val command = Command.CrossSpaceSearchSubscribe( + subscription = subscription, + filters = filters, + sorts = sorts, + keys = keys, + source = source, + noDepSubscription = true, + collectionId = collection + ) + val initial = repo.crossSpaceSearchSubscribe(command).results.map { + SubscriptionObject( + id = it.id, objectWrapper = it + ) + }.toMutableList() + emitAll( + buildObjectsFlow( + subscription = searchParams.subscription, + initial = initial + ) + ) + } + }.catch { + logger.logException( + it, + "Error in cross-space subscription container :[${searchParams.subscription}]" + ) + }.flowOn(dispatchers.io) + + private fun buildObjectsFlow( + subscription: Id, + initial: MutableList + ): Flow> { + val objectsFlow = subscribe(listOf(subscription)).scan(initial) { dataItems, payload -> + var result = dataItems + payload.forEach { event -> + when (event) { + is SubscriptionEvent.Add -> { + if (event.subscription == subscription) { + result = addEventProcessor.process(event, result) + } + } + + is SubscriptionEvent.Amend -> { + if (event.subscriptions.contains(subscription)) { + result = amendEventProcessor.process(event, result) + } + } + + is SubscriptionEvent.Position -> { + result = positionEventProcessor.process(event, result) + } + + is SubscriptionEvent.Remove -> { + if (event.subscription == subscription) { + result = removeEventProcessor.process(event, result) + } + } + + is SubscriptionEvent.Set -> { + if (event.subscriptions.contains(subscription)) { + result = setEventProcessor.process(event, result) + } + } + + is SubscriptionEvent.Unset -> { + if (event.subscriptions.contains(subscription)) { + result = unsetEventProcessor.process(event, result) + } + } + + else -> { + logger.logWarning("Ignoring subscription event") + } + } + } + result + }.map { result -> + result.mapNotNull { item -> + if (item.objectWrapper?.isValid == true) { + item.objectWrapper + } else { + null + } + } + } + return objectsFlow + } + + override suspend fun unsubscribe(subscription: Id) { + repo.objectCrossSpaceUnsubscribe(subscription) + } + } +} diff --git a/domain/src/main/java/com/anytypeio/anytype/domain/library/StoreSearchParams.kt b/domain/src/main/java/com/anytypeio/anytype/domain/library/StoreSearchParams.kt index 4084b74c7f..70ce0b6eb6 100644 --- a/domain/src/main/java/com/anytypeio/anytype/domain/library/StoreSearchParams.kt +++ b/domain/src/main/java/com/anytypeio/anytype/domain/library/StoreSearchParams.kt @@ -23,4 +23,18 @@ data class StoreSearchByIdsParams( val subscription: Id, val keys: List, val targets: List, +) + +/** + * Parameters for cross-space search subscription. + * Unlike StoreSearchParams, this doesn't have a space parameter + * since it searches across all spaces. + */ +data class CrossSpaceSearchParams( + val subscription: Id, + val sorts: List = emptyList(), + val filters: List = emptyList(), + val keys: List = emptyList(), + val source: List = emptyList(), + val collection: Id? = null ) \ No newline at end of file From b2593f6a13a92a488d5b3089fc0f22afdd41ebdb Mon Sep 17 00:00:00 2001 From: konstantiniiv Date: Thu, 20 Nov 2025 13:00:34 +0100 Subject: [PATCH 5/9] DROID-4127 kdoc --- .../CrossSpaceSubscriptionContainer.kt | 47 +++++++++++++++++-- 1 file changed, 44 insertions(+), 3 deletions(-) diff --git a/domain/src/main/java/com/anytypeio/anytype/domain/library/CrossSpaceSubscriptionContainer.kt b/domain/src/main/java/com/anytypeio/anytype/domain/library/CrossSpaceSubscriptionContainer.kt index 1b0a798bb3..6abd02e91f 100644 --- a/domain/src/main/java/com/anytypeio/anytype/domain/library/CrossSpaceSubscriptionContainer.kt +++ b/domain/src/main/java/com/anytypeio/anytype/domain/library/CrossSpaceSubscriptionContainer.kt @@ -24,9 +24,50 @@ import kotlinx.coroutines.flow.map import kotlinx.coroutines.flow.scan /** - * Subscription container for cross-space searches. - * Unlike StorelessSubscriptionContainer which is scoped to a single space, - * this allows subscribing to objects across all user spaces. + * Storeless subscription container for cross-space searches. + * + * ## What "Storeless" Means + * + * This container does NOT maintain persistent state between subscriptions: + * - **No instance-level caching** of subscription results + * - **State exists only within the Flow scope** using the `scan` operator + * - **Each subscribe() call creates a fresh, independent Flow** with isolated state + * - **State is automatically garbage collected** when the Flow collection ends + * + * ## Benefits of Storeless Design + * + * - **Multiple concurrent subscriptions** without state conflicts + * - **Automatic cleanup** when subscribers stop collecting + * - **Memory efficiency** - no leaked state or stale data + * - **Thread safety** - no shared mutable state to synchronize + * + * ## Cross-Space vs Single-Space + * + * Unlike [StorelessSubscriptionContainer] which is scoped to a single space, + * this container searches and subscribes to objects **across all user spaces** using + * the `crossSpaceSearchSubscribe` middleware API. + * + * ## Flow Lifecycle + * + * 1. Subscribe with [CrossSpaceSearchParams] (filters, sorts, keys) + * 2. Initial results fetched via [BlockRepository.crossSpaceSearchSubscribe] + * 3. Results mapped to internal state ([SubscriptionObject] list) + * 4. State accumulated via `scan` operator processing [SubscriptionEvent]s from [SubscriptionEventChannel] + * 5. Final results emitted as [ObjectWrapper.Basic] list (only valid objects) + * 6. **State discarded** when Flow collection ends + * + * ## Event Processing Order + * + * Events are processed in priority order to ensure consistency: + * 1. **Add** - New objects added to subscription + * 2. **Remove** - Objects removed from subscription + * 3. **Set** - Object field values set + * 4. **Amend** - Object field values updated + * 5. **Unset** - Object field values cleared + * 6. **Position** - Object positions reordered + * 7. **Counter** - Counters updated (ignored by this container) + * @see StorelessSubscriptionContainer for single-space subscriptions + * @see SubscriptionObject internal state representation */ interface CrossSpaceSubscriptionContainer { From ff7b1c644438e281d950ed583a2e8414e1ec4cd0 Mon Sep 17 00:00:00 2001 From: konstantiniiv Date: Thu, 20 Nov 2025 13:01:24 +0100 Subject: [PATCH 6/9] DROID-4127 chats subscription --- .../chats/ChatsSubscriptionContainer.kt | 164 ++++++++++++++++++ 1 file changed, 164 insertions(+) create mode 100644 domain/src/main/java/com/anytypeio/anytype/domain/chats/ChatsSubscriptionContainer.kt diff --git a/domain/src/main/java/com/anytypeio/anytype/domain/chats/ChatsSubscriptionContainer.kt b/domain/src/main/java/com/anytypeio/anytype/domain/chats/ChatsSubscriptionContainer.kt new file mode 100644 index 0000000000..2a7d279fc3 --- /dev/null +++ b/domain/src/main/java/com/anytypeio/anytype/domain/chats/ChatsSubscriptionContainer.kt @@ -0,0 +1,164 @@ +package com.anytypeio.anytype.domain.chats + +import com.anytypeio.anytype.core_models.DVFilter +import com.anytypeio.anytype.core_models.DVFilterCondition +import com.anytypeio.anytype.core_models.Id +import com.anytypeio.anytype.core_models.ObjectType +import com.anytypeio.anytype.core_models.ObjectWrapper +import com.anytypeio.anytype.core_models.Relations +import com.anytypeio.anytype.domain.account.AwaitAccountStartManager +import com.anytypeio.anytype.domain.base.AppCoroutineDispatchers +import com.anytypeio.anytype.domain.debugging.Logger +import com.anytypeio.anytype.domain.library.CrossSpaceSearchParams +import com.anytypeio.anytype.domain.library.CrossSpaceSubscriptionContainer +import javax.inject.Inject +import kotlinx.coroutines.CoroutineScope +import kotlinx.coroutines.Job +import kotlinx.coroutines.flow.Flow +import kotlinx.coroutines.flow.MutableStateFlow +import kotlinx.coroutines.flow.catch +import kotlinx.coroutines.flow.mapNotNull +import kotlinx.coroutines.launch + +/** + * Container for subscribing to and observing all chat objects (Layout.CHAT_DERIVED). + * Maintains a global subscription to chat objects across all spaces. + */ +interface ChatsSubscriptionContainer { + + fun start() + fun stop() + fun observe(): Flow> + fun observe(chatId: Id): Flow + + fun get(): List + fun get(chatId: Id): ObjectWrapper.Basic? + + class Default @Inject constructor( + private val container: CrossSpaceSubscriptionContainer, + private val scope: CoroutineScope, + private val dispatchers: AppCoroutineDispatchers, + private val awaitAccountStart: AwaitAccountStartManager, + private val logger: Logger + ) : ChatsSubscriptionContainer { + + private val data = MutableStateFlow>(emptyList()) + private val jobs = mutableListOf() + + init { + logger.logInfo("ChatsSubscriptionContainer initialized") + scope.launch { + awaitAccountStart.state().collect { state -> + when (state) { + AwaitAccountStartManager.State.Init -> { + logger.logInfo("AwaitAccountStartManager.State.Init - waiting for account start") + // Do nothing + } + + AwaitAccountStartManager.State.Started -> { + logger.logInfo("AwaitAccountStartManager.State.Started - starting chat subscription") + start() + } + + AwaitAccountStartManager.State.Stopped -> { + logger.logInfo("AwaitAccountStartManager.State.Stopped - stopping chat subscription") + stop() + } + } + } + } + } + + override fun observe(): Flow> { + return data + } + + override fun observe(chatId: Id): Flow { + return data.mapNotNull { all -> + all.firstOrNull { chat -> chat.id == chatId } + } + } + + override fun get(): List { + return data.value + } + + override fun get(chatId: Id): ObjectWrapper.Basic? { + return data.value.find { chat -> chat.id == chatId } + } + + override fun start() { + logger.logInfo("Starting ChatsSubscriptionContainer") + jobs += scope.launch(dispatchers.io) { + proceedWithSubscription() + } + } + + private suspend fun proceedWithSubscription() { + container.subscribe( + CrossSpaceSearchParams( + subscription = GLOBAL_CHATS_SUBSCRIPTION, + keys = listOf( + Relations.ID, + Relations.NAME, + Relations.PLURAL_NAME + ), + filters = listOf( + DVFilter( + relation = Relations.LAYOUT, + value = listOf(ObjectType.Layout.CHAT_DERIVED.code.toDouble()), + condition = DVFilterCondition.IN + ), + DVFilter( + relation = Relations.IS_ARCHIVED, + condition = DVFilterCondition.NOT_EQUAL, + value = true + ), + DVFilter( + relation = Relations.IS_HIDDEN, + condition = DVFilterCondition.NOT_EQUAL, + value = true + ), + DVFilter( + relation = Relations.IS_DELETED, + condition = DVFilterCondition.NOT_EQUAL, + value = true + ), + DVFilter( + relation = Relations.IS_HIDDEN_DISCOVERY, + condition = DVFilterCondition.NOT_EQUAL, + value = true + ) + ) + ) + ).catch { error -> + logger.logException( + e = error, + msg = "Failed to subscribe to chats" + ) + }.collect { + data.value = it + } + } + + override fun stop() { + logger.logInfo("Stopping ChatsSubscriptionContainer") + jobs.forEach { it.cancel() } + scope.launch(dispatchers.io) { + runCatching { + container.unsubscribe(GLOBAL_CHATS_SUBSCRIPTION) + }.onFailure { error -> + logger.logException( + e = error, + msg = "Failed to unsubscribe from $GLOBAL_CHATS_SUBSCRIPTION" + ) + } + .onSuccess { logger.logInfo("Successfully unsubscribed from $GLOBAL_CHATS_SUBSCRIPTION") } + } + } + + companion object { + const val GLOBAL_CHATS_SUBSCRIPTION = "subscription.global.chats" + } + } +} From 0a4912cb3d8f454bc6a9aeb176dc0cecc0096e48 Mon Sep 17 00:00:00 2001 From: konstantiniiv Date: Thu, 20 Nov 2025 13:01:44 +0100 Subject: [PATCH 7/9] DROID-4127 di --- .../main/java/com/anytypeio/anytype/di/feature/vault/VaultDI.kt | 2 ++ 1 file changed, 2 insertions(+) diff --git a/app/src/main/java/com/anytypeio/anytype/di/feature/vault/VaultDI.kt b/app/src/main/java/com/anytypeio/anytype/di/feature/vault/VaultDI.kt index d95446eeda..094077be21 100644 --- a/app/src/main/java/com/anytypeio/anytype/di/feature/vault/VaultDI.kt +++ b/app/src/main/java/com/anytypeio/anytype/di/feature/vault/VaultDI.kt @@ -29,6 +29,7 @@ import com.anytypeio.anytype.domain.search.ProfileSubscriptionManager import com.anytypeio.anytype.domain.wallpaper.GetSpaceWallpapers import com.anytypeio.anytype.domain.workspace.SpaceManager import com.anytypeio.anytype.core_utils.tools.AppInfo +import com.anytypeio.anytype.domain.chats.ChatsSubscriptionContainer import com.anytypeio.anytype.other.DefaultSpaceInviteResolver import com.anytypeio.anytype.presentation.navigation.DeepLinkToObjectDelegate import com.anytypeio.anytype.presentation.notifications.NotificationPermissionManager @@ -122,4 +123,5 @@ interface VaultComponentDependencies : ComponentDependencies { fun provideVaultChatPreviewContainer(): ChatPreviewContainer fun appInfo(): AppInfo @Named(DEFAULT_APP_COROUTINE_SCOPE) fun scope(): CoroutineScope + fun chatSubscriptionContainer(): ChatsSubscriptionContainer } \ No newline at end of file From 558cf6a6efc3c06c24e3e53fcd15997761e3063a Mon Sep 17 00:00:00 2001 From: konstantiniiv Date: Thu, 20 Nov 2025 13:18:13 +0100 Subject: [PATCH 8/9] DROID-4127 fixes --- .../anytype/di/feature/vault/VaultDI.kt | 4 ++-- .../anytype/di/main/SubscriptionsModule.kt | 4 ++-- ...kt => ChatsDetailsSubscriptionContainer.kt} | 18 +++++++++--------- 3 files changed, 13 insertions(+), 13 deletions(-) rename domain/src/main/java/com/anytypeio/anytype/domain/chats/{ChatsSubscriptionContainer.kt => ChatsDetailsSubscriptionContainer.kt} (90%) diff --git a/app/src/main/java/com/anytypeio/anytype/di/feature/vault/VaultDI.kt b/app/src/main/java/com/anytypeio/anytype/di/feature/vault/VaultDI.kt index 094077be21..8e1995610c 100644 --- a/app/src/main/java/com/anytypeio/anytype/di/feature/vault/VaultDI.kt +++ b/app/src/main/java/com/anytypeio/anytype/di/feature/vault/VaultDI.kt @@ -29,7 +29,7 @@ import com.anytypeio.anytype.domain.search.ProfileSubscriptionManager import com.anytypeio.anytype.domain.wallpaper.GetSpaceWallpapers import com.anytypeio.anytype.domain.workspace.SpaceManager import com.anytypeio.anytype.core_utils.tools.AppInfo -import com.anytypeio.anytype.domain.chats.ChatsSubscriptionContainer +import com.anytypeio.anytype.domain.chats.ChatsDetailsSubscriptionContainer import com.anytypeio.anytype.other.DefaultSpaceInviteResolver import com.anytypeio.anytype.presentation.navigation.DeepLinkToObjectDelegate import com.anytypeio.anytype.presentation.notifications.NotificationPermissionManager @@ -123,5 +123,5 @@ interface VaultComponentDependencies : ComponentDependencies { fun provideVaultChatPreviewContainer(): ChatPreviewContainer fun appInfo(): AppInfo @Named(DEFAULT_APP_COROUTINE_SCOPE) fun scope(): CoroutineScope - fun chatSubscriptionContainer(): ChatsSubscriptionContainer + fun chatSubscriptionContainer(): ChatsDetailsSubscriptionContainer } \ No newline at end of file diff --git a/app/src/main/java/com/anytypeio/anytype/di/main/SubscriptionsModule.kt b/app/src/main/java/com/anytypeio/anytype/di/main/SubscriptionsModule.kt index 677bca8dc4..8fec97e372 100644 --- a/app/src/main/java/com/anytypeio/anytype/di/main/SubscriptionsModule.kt +++ b/app/src/main/java/com/anytypeio/anytype/di/main/SubscriptionsModule.kt @@ -10,7 +10,7 @@ import com.anytypeio.anytype.domain.block.interactor.sets.GetObjectTypes import com.anytypeio.anytype.domain.block.repo.BlockRepository import com.anytypeio.anytype.domain.chats.ChatEventChannel import com.anytypeio.anytype.domain.chats.ChatPreviewContainer -import com.anytypeio.anytype.domain.chats.ChatsSubscriptionContainer +import com.anytypeio.anytype.domain.chats.ChatsDetailsSubscriptionContainer import com.anytypeio.anytype.domain.config.ConfigStorage import com.anytypeio.anytype.domain.debugging.DebugAccountSelectTrace import com.anytypeio.anytype.domain.debugging.Logger @@ -185,7 +185,7 @@ object SubscriptionsModule { container: CrossSpaceSubscriptionContainer, awaitAccountStartManager: AwaitAccountStartManager, logger: Logger, - ): ChatsSubscriptionContainer = ChatsSubscriptionContainer.Default( + ): ChatsDetailsSubscriptionContainer = ChatsDetailsSubscriptionContainer.Default( dispatchers = dispatchers, scope = scope, container = container, diff --git a/domain/src/main/java/com/anytypeio/anytype/domain/chats/ChatsSubscriptionContainer.kt b/domain/src/main/java/com/anytypeio/anytype/domain/chats/ChatsDetailsSubscriptionContainer.kt similarity index 90% rename from domain/src/main/java/com/anytypeio/anytype/domain/chats/ChatsSubscriptionContainer.kt rename to domain/src/main/java/com/anytypeio/anytype/domain/chats/ChatsDetailsSubscriptionContainer.kt index 2a7d279fc3..5c0f389875 100644 --- a/domain/src/main/java/com/anytypeio/anytype/domain/chats/ChatsSubscriptionContainer.kt +++ b/domain/src/main/java/com/anytypeio/anytype/domain/chats/ChatsDetailsSubscriptionContainer.kt @@ -24,7 +24,7 @@ import kotlinx.coroutines.launch * Container for subscribing to and observing all chat objects (Layout.CHAT_DERIVED). * Maintains a global subscription to chat objects across all spaces. */ -interface ChatsSubscriptionContainer { +interface ChatsDetailsSubscriptionContainer { fun start() fun stop() @@ -40,13 +40,13 @@ interface ChatsSubscriptionContainer { private val dispatchers: AppCoroutineDispatchers, private val awaitAccountStart: AwaitAccountStartManager, private val logger: Logger - ) : ChatsSubscriptionContainer { + ) : ChatsDetailsSubscriptionContainer { private val data = MutableStateFlow>(emptyList()) private val jobs = mutableListOf() init { - logger.logInfo("ChatsSubscriptionContainer initialized") + logger.logInfo("ChatsDetailsSubscriptionContainer initialized") scope.launch { awaitAccountStart.state().collect { state -> when (state) { @@ -56,12 +56,12 @@ interface ChatsSubscriptionContainer { } AwaitAccountStartManager.State.Started -> { - logger.logInfo("AwaitAccountStartManager.State.Started - starting chat subscription") + logger.logInfo("AwaitAccountStartManager.State.Started - starting chat details subscription") start() } AwaitAccountStartManager.State.Stopped -> { - logger.logInfo("AwaitAccountStartManager.State.Stopped - stopping chat subscription") + logger.logInfo("AwaitAccountStartManager.State.Stopped - stopping chat details subscription") stop() } } @@ -88,7 +88,7 @@ interface ChatsSubscriptionContainer { } override fun start() { - logger.logInfo("Starting ChatsSubscriptionContainer") + logger.logInfo("Starting ChatsDetailsSubscriptionContainer") jobs += scope.launch(dispatchers.io) { proceedWithSubscription() } @@ -134,7 +134,7 @@ interface ChatsSubscriptionContainer { ).catch { error -> logger.logException( e = error, - msg = "Failed to subscribe to chats" + msg = "Failed to subscribe to chats details" ) }.collect { data.value = it @@ -142,7 +142,7 @@ interface ChatsSubscriptionContainer { } override fun stop() { - logger.logInfo("Stopping ChatsSubscriptionContainer") + logger.logInfo("Stopping ChatsDetailsSubscriptionContainer") jobs.forEach { it.cancel() } scope.launch(dispatchers.io) { runCatching { @@ -158,7 +158,7 @@ interface ChatsSubscriptionContainer { } companion object { - const val GLOBAL_CHATS_SUBSCRIPTION = "subscription.global.chats" + const val GLOBAL_CHATS_SUBSCRIPTION = "global-chats-details-subscription" } } } From 2e2abd965670da0154385fe607e6a8dfd0c9da9d Mon Sep 17 00:00:00 2001 From: konstantiniiv Date: Thu, 20 Nov 2025 13:58:36 +0100 Subject: [PATCH 9/9] DROID-4127 log --- .../domain/chats/ChatsDetailsSubscriptionContainer.kt | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/domain/src/main/java/com/anytypeio/anytype/domain/chats/ChatsDetailsSubscriptionContainer.kt b/domain/src/main/java/com/anytypeio/anytype/domain/chats/ChatsDetailsSubscriptionContainer.kt index 5c0f389875..81d9b6d0e4 100644 --- a/domain/src/main/java/com/anytypeio/anytype/domain/chats/ChatsDetailsSubscriptionContainer.kt +++ b/domain/src/main/java/com/anytypeio/anytype/domain/chats/ChatsDetailsSubscriptionContainer.kt @@ -51,17 +51,17 @@ interface ChatsDetailsSubscriptionContainer { awaitAccountStart.state().collect { state -> when (state) { AwaitAccountStartManager.State.Init -> { - logger.logInfo("AwaitAccountStartManager.State.Init - waiting for account start") + logger.logInfo("ChatsDetailsSubscriptionContainer, AwaitAccountStartManager.State.Init - waiting for account start") // Do nothing } AwaitAccountStartManager.State.Started -> { - logger.logInfo("AwaitAccountStartManager.State.Started - starting chat details subscription") + logger.logInfo("ChatsDetailsSubscriptionContainer, AwaitAccountStartManager.State.Started - starting chat details subscription") start() } AwaitAccountStartManager.State.Stopped -> { - logger.logInfo("AwaitAccountStartManager.State.Stopped - stopping chat details subscription") + logger.logInfo("ChatsDetailsSubscriptionContainer, AwaitAccountStartManager.State.Stopped - stopping chat details subscription") stop() } }