Skip to content
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ import com.anytypeio.anytype.domain.search.ProfileSubscriptionManager
import com.anytypeio.anytype.domain.wallpaper.GetSpaceWallpapers
import com.anytypeio.anytype.domain.workspace.SpaceManager
import com.anytypeio.anytype.core_utils.tools.AppInfo
import com.anytypeio.anytype.domain.chats.ChatsDetailsSubscriptionContainer
import com.anytypeio.anytype.other.DefaultSpaceInviteResolver
import com.anytypeio.anytype.presentation.navigation.DeepLinkToObjectDelegate
import com.anytypeio.anytype.presentation.notifications.NotificationPermissionManager
Expand Down Expand Up @@ -122,4 +123,5 @@ interface VaultComponentDependencies : ComponentDependencies {
fun provideVaultChatPreviewContainer(): ChatPreviewContainer
fun appInfo(): AppInfo
@Named(DEFAULT_APP_COROUTINE_SCOPE) fun scope(): CoroutineScope
fun chatSubscriptionContainer(): ChatsDetailsSubscriptionContainer
}
Original file line number Diff line number Diff line change
Expand Up @@ -10,13 +10,15 @@ import com.anytypeio.anytype.domain.block.interactor.sets.GetObjectTypes
import com.anytypeio.anytype.domain.block.repo.BlockRepository
import com.anytypeio.anytype.domain.chats.ChatEventChannel
import com.anytypeio.anytype.domain.chats.ChatPreviewContainer
import com.anytypeio.anytype.domain.chats.ChatsDetailsSubscriptionContainer
import com.anytypeio.anytype.domain.config.ConfigStorage
import com.anytypeio.anytype.domain.debugging.DebugAccountSelectTrace
import com.anytypeio.anytype.domain.debugging.Logger
import com.anytypeio.anytype.domain.deeplink.PendingIntentStore
import com.anytypeio.anytype.domain.device.DeviceTokenStoringService
import com.anytypeio.anytype.domain.device.NetworkConnectionStatus
import com.anytypeio.anytype.domain.event.interactor.SpaceSyncAndP2PStatusProvider
import com.anytypeio.anytype.domain.library.CrossSpaceSubscriptionContainer
import com.anytypeio.anytype.domain.library.StorelessSubscriptionContainer
import com.anytypeio.anytype.domain.misc.DeepLinkResolver
import com.anytypeio.anytype.domain.multiplayer.ActiveSpaceMemberSubscriptionContainer
Expand Down Expand Up @@ -174,6 +176,34 @@ object SubscriptionsModule {
config = configStorage
)

@JvmStatic
@Provides
@Singleton
fun chatsViewSubscriptionContainer(
dispatchers: AppCoroutineDispatchers,
@Named(DEFAULT_APP_COROUTINE_SCOPE) scope: CoroutineScope,
container: CrossSpaceSubscriptionContainer,
awaitAccountStartManager: AwaitAccountStartManager,
logger: Logger,
): ChatsDetailsSubscriptionContainer = ChatsDetailsSubscriptionContainer.Default(
dispatchers = dispatchers,
scope = scope,
container = container,
awaitAccountStart = awaitAccountStartManager,
logger = logger,
)

@JvmStatic
@Provides
@Singleton
fun provideCrossSpaceSubscriptionContainer(
repo: BlockRepository,
channel: SubscriptionEventChannel,
dispatchers: AppCoroutineDispatchers,
logger: Logger
): CrossSpaceSubscriptionContainer =
CrossSpaceSubscriptionContainer.Impl(repo, channel, dispatchers, logger)

@JvmStatic
@Provides
@Singleton
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,6 @@ import com.anytypeio.anytype.core_models.chats.NotificationState
import com.anytypeio.anytype.core_models.membership.MembershipPaymentMethod
import com.anytypeio.anytype.core_models.membership.NameServiceNameType
import com.anytypeio.anytype.core_models.multiplayer.SpaceMemberPermissions
import com.anytypeio.anytype.core_models.multiplayer.SpaceUxType
import com.anytypeio.anytype.core_models.primitives.SpaceId
import com.anytypeio.anytype.core_models.primitives.TypeKey

Expand Down Expand Up @@ -606,6 +605,21 @@ sealed class Command {
}
}

/**
* Subscribe to search results across multiple spaces.
* Unlike regular subscriptions that are scoped to a single space,
* this allows searching and subscribing to objects across all user spaces.
*/
data class CrossSpaceSearchSubscribe(
val subscription: Id,
val filters: List<DVFilter> = emptyList(),
val sorts: List<DVSort> = emptyList(),
val keys: List<String>,
val source: List<String> = emptyList(),
val noDepSubscription: Boolean = false,
val collectionId: Id? = null
) : Command()

data class ProcessCancel(
val processId: Id
) : Command()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -462,6 +462,14 @@ class BlockDataRepository(
keys = keys
)

override suspend fun crossSpaceSearchSubscribe(
command: Command.CrossSpaceSearchSubscribe
): SearchResult = remote.crossSpaceSearchSubscribe(command)

override suspend fun objectCrossSpaceUnsubscribe(subscription: String) {
remote.objectCrossSpaceUnsubscribe(subscription)
}

override suspend fun cancelObjectSearchSubscription(
subscriptions: List<Id>
) = remote.cancelObjectSearchSubscription(subscriptions).also {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -195,6 +195,12 @@ interface BlockRemote {
keys: List<String>
): SearchResult

suspend fun crossSpaceSearchSubscribe(
command: Command.CrossSpaceSearchSubscribe
): SearchResult

suspend fun objectCrossSpaceUnsubscribe(subscription: String)

suspend fun cancelObjectSearchSubscription(subscriptions: List<Id>)

suspend fun addRelationToObject(ctx: Id, relation: Key): Payload?
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -236,6 +236,17 @@ interface BlockRepository {
keys: List<String>
): SearchResult

/**
* Subscribe to search results across all user spaces.
* Unlike searchObjectsWithSubscription which is scoped to a single space,
* this searches across all spaces globally.
*/
suspend fun crossSpaceSearchSubscribe(
command: Command.CrossSpaceSearchSubscribe
): SearchResult

suspend fun objectCrossSpaceUnsubscribe(subscription: String)

suspend fun cancelObjectSearchSubscription(subscriptions: List<Id>)

suspend fun addRelationToObject(ctx: Id, relation: Key): Payload?
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,164 @@
package com.anytypeio.anytype.domain.chats

import com.anytypeio.anytype.core_models.DVFilter
import com.anytypeio.anytype.core_models.DVFilterCondition
import com.anytypeio.anytype.core_models.Id
import com.anytypeio.anytype.core_models.ObjectType
import com.anytypeio.anytype.core_models.ObjectWrapper
import com.anytypeio.anytype.core_models.Relations
import com.anytypeio.anytype.domain.account.AwaitAccountStartManager
import com.anytypeio.anytype.domain.base.AppCoroutineDispatchers
import com.anytypeio.anytype.domain.debugging.Logger
import com.anytypeio.anytype.domain.library.CrossSpaceSearchParams
import com.anytypeio.anytype.domain.library.CrossSpaceSubscriptionContainer
import javax.inject.Inject
import kotlinx.coroutines.CoroutineScope
import kotlinx.coroutines.Job
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.MutableStateFlow
import kotlinx.coroutines.flow.catch
import kotlinx.coroutines.flow.mapNotNull
import kotlinx.coroutines.launch

/**
* Container for subscribing to and observing all chat objects (Layout.CHAT_DERIVED).
* Maintains a global subscription to chat objects across all spaces.
*/
interface ChatsDetailsSubscriptionContainer {

fun start()
fun stop()
fun observe(): Flow<List<ObjectWrapper.Basic>>
fun observe(chatId: Id): Flow<ObjectWrapper.Basic>

fun get(): List<ObjectWrapper.Basic>
fun get(chatId: Id): ObjectWrapper.Basic?

class Default @Inject constructor(
private val container: CrossSpaceSubscriptionContainer,
private val scope: CoroutineScope,
private val dispatchers: AppCoroutineDispatchers,
private val awaitAccountStart: AwaitAccountStartManager,
private val logger: Logger
) : ChatsDetailsSubscriptionContainer {

private val data = MutableStateFlow<List<ObjectWrapper.Basic>>(emptyList())
private val jobs = mutableListOf<Job>()

init {
logger.logInfo("ChatsDetailsSubscriptionContainer initialized")
scope.launch {
awaitAccountStart.state().collect { state ->
when (state) {
AwaitAccountStartManager.State.Init -> {
logger.logInfo("ChatsDetailsSubscriptionContainer, AwaitAccountStartManager.State.Init - waiting for account start")
// Do nothing
}

AwaitAccountStartManager.State.Started -> {
logger.logInfo("ChatsDetailsSubscriptionContainer, AwaitAccountStartManager.State.Started - starting chat details subscription")
start()
}

AwaitAccountStartManager.State.Stopped -> {
logger.logInfo("ChatsDetailsSubscriptionContainer, AwaitAccountStartManager.State.Stopped - stopping chat details subscription")
stop()
}
}
}
}
}

override fun observe(): Flow<List<ObjectWrapper.Basic>> {
return data
}

override fun observe(chatId: Id): Flow<ObjectWrapper.Basic> {
return data.mapNotNull { all ->
all.firstOrNull { chat -> chat.id == chatId }
}
}

override fun get(): List<ObjectWrapper.Basic> {
return data.value
}

override fun get(chatId: Id): ObjectWrapper.Basic? {
return data.value.find { chat -> chat.id == chatId }
}

override fun start() {
logger.logInfo("Starting ChatsDetailsSubscriptionContainer")
jobs += scope.launch(dispatchers.io) {
proceedWithSubscription()
}
}

private suspend fun proceedWithSubscription() {
container.subscribe(
CrossSpaceSearchParams(
subscription = GLOBAL_CHATS_SUBSCRIPTION,
keys = listOf(
Relations.ID,
Relations.NAME,
Relations.PLURAL_NAME
),
filters = listOf(
DVFilter(
relation = Relations.LAYOUT,
value = listOf(ObjectType.Layout.CHAT_DERIVED.code.toDouble()),
condition = DVFilterCondition.IN
),
DVFilter(
relation = Relations.IS_ARCHIVED,
condition = DVFilterCondition.NOT_EQUAL,
value = true
),
DVFilter(
relation = Relations.IS_HIDDEN,
condition = DVFilterCondition.NOT_EQUAL,
value = true
),
DVFilter(
relation = Relations.IS_DELETED,
condition = DVFilterCondition.NOT_EQUAL,
value = true
),
DVFilter(
relation = Relations.IS_HIDDEN_DISCOVERY,
condition = DVFilterCondition.NOT_EQUAL,
value = true
)
)
)
).catch { error ->
logger.logException(
e = error,
msg = "Failed to subscribe to chats details"
)
}.collect {
data.value = it
}
}

override fun stop() {
logger.logInfo("Stopping ChatsDetailsSubscriptionContainer")
jobs.forEach { it.cancel() }
scope.launch(dispatchers.io) {
runCatching {
container.unsubscribe(GLOBAL_CHATS_SUBSCRIPTION)
}.onFailure { error ->
logger.logException(
e = error,
msg = "Failed to unsubscribe from $GLOBAL_CHATS_SUBSCRIPTION"
)
}
.onSuccess { logger.logInfo("Successfully unsubscribed from $GLOBAL_CHATS_SUBSCRIPTION") }
}
}

companion object {
const val GLOBAL_CHATS_SUBSCRIPTION = "global-chats-details-subscription"
}
}
}
Loading