Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -282,6 +282,12 @@ internal fun createFeedsClient(
maxStrongSubscriptions = Integer.MAX_VALUE,
maxWeakSubscriptions = Integer.MAX_VALUE,
),
stateEventsSubscriptionManager =
StreamSubscriptionManager(
logProvider.taggedLogger("StateEventSubscriptions"),
maxStrongSubscriptions = Integer.MAX_VALUE,
maxWeakSubscriptions = Integer.MAX_VALUE,
),
feedWatchHandler = feedWatchHandler,
errorBus = errorBus,
scope = clientScope,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -90,7 +90,9 @@ import io.getstream.feeds.android.client.internal.state.MemberListImpl
import io.getstream.feeds.android.client.internal.state.ModerationConfigListImpl
import io.getstream.feeds.android.client.internal.state.PollListImpl
import io.getstream.feeds.android.client.internal.state.PollVoteListImpl
import io.getstream.feeds.android.client.internal.state.event.toModel
import io.getstream.feeds.android.client.internal.subscribe.FeedsEventListener
import io.getstream.feeds.android.client.internal.subscribe.StateUpdateEventListener
import io.getstream.feeds.android.network.models.ActivityRequest
import io.getstream.feeds.android.network.models.AddActivityRequest
import io.getstream.feeds.android.network.models.DeleteActivitiesRequest
Expand All @@ -108,6 +110,7 @@ import kotlinx.coroutines.launch
internal class FeedsClientImpl(
private val coreClient: StreamClient,
private val feedsEventsSubscriptionManager: StreamSubscriptionManager<FeedsEventListener>,
private val stateEventsSubscriptionManager: StreamSubscriptionManager<StateUpdateEventListener>,
override val apiKey: StreamApiKey,
override val user: User,
private val connectionRecoveryHandler: ConnectionRecoveryHandler,
Expand Down Expand Up @@ -148,6 +151,11 @@ internal class FeedsClientImpl(
logger.v { "[onEvent] Received event from core: $event" }
_events.tryEmit(event)
feedsEventsSubscriptionManager.forEach { it.onEvent(event) }
event.toModel()?.let { stateEvent ->
stateEventsSubscriptionManager.forEach { listener ->
listener.onEvent(stateEvent)
}
}
} else {
logger.e { "[onEvent] Received non-WSEvent: $event" }
}
Expand Down Expand Up @@ -214,7 +222,8 @@ internal class FeedsClientImpl(
activitiesRepository = activitiesRepository,
commentsRepository = commentsRepository,
pollsRepository = pollsRepository,
subscriptionManager = feedsEventsSubscriptionManager,
subscriptionManager = stateEventsSubscriptionManager,
socketSubscriptionManager = feedsEventsSubscriptionManager,
commentList =
ActivityCommentListImpl(
query =
Expand All @@ -225,7 +234,7 @@ internal class FeedsClientImpl(
),
currentUserId = user.id,
commentsRepository = commentsRepository,
subscriptionManager = feedsEventsSubscriptionManager,
subscriptionManager = stateEventsSubscriptionManager,
),
)

Expand Down Expand Up @@ -286,7 +295,7 @@ internal class FeedsClientImpl(
query = query,
currentUserId = user.id,
commentsRepository = commentsRepository,
subscriptionManager = feedsEventsSubscriptionManager,
subscriptionManager = stateEventsSubscriptionManager,
)

override fun commentReplyList(query: CommentRepliesQuery): CommentReplyList =
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -125,7 +125,7 @@ internal interface CommentsRepository {
suspend fun addCommentReaction(
commentId: String,
request: AddCommentReactionRequest,
): Result<Pair<FeedsReactionData, String>>
): Result<Pair<FeedsReactionData, CommentData>>

/**
* Deletes a reaction from a comment.
Expand All @@ -137,7 +137,7 @@ internal interface CommentsRepository {
suspend fun deleteCommentReaction(
commentId: String,
type: String,
): Result<Pair<FeedsReactionData, String>>
): Result<Pair<FeedsReactionData, CommentData>>

/**
* Queries reactions for a specific comment.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -143,17 +143,17 @@ internal class CommentsRepositoryImpl(
override suspend fun addCommentReaction(
commentId: String,
request: AddCommentReactionRequest,
): Result<Pair<FeedsReactionData, String>> = runSafely {
): Result<Pair<FeedsReactionData, CommentData>> = runSafely {
val response = api.addCommentReaction(commentId, request)
Pair(response.reaction.toModel(), response.comment.id)
Pair(response.reaction.toModel(), response.comment.toModel())
}

override suspend fun deleteCommentReaction(
commentId: String,
type: String,
): Result<Pair<FeedsReactionData, String>> = runSafely {
): Result<Pair<FeedsReactionData, CommentData>> = runSafely {
val response = api.deleteCommentReaction(id = commentId, type = type)
Pair(response.reaction.toModel(), response.comment.id)
Pair(response.reaction.toModel(), response.comment.toModel())
}

override suspend fun queryCommentReactions(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ import io.getstream.feeds.android.client.api.state.ActivityCommentListState
import io.getstream.feeds.android.client.api.state.query.ActivityCommentsQuery
import io.getstream.feeds.android.client.internal.repository.CommentsRepository
import io.getstream.feeds.android.client.internal.state.event.handler.ActivityCommentListEventHandler
import io.getstream.feeds.android.client.internal.subscribe.FeedsEventListener
import io.getstream.feeds.android.client.internal.subscribe.StateUpdateEventListener

/**
* A paginated list of activities that supports real-time updates and filtering.
Expand All @@ -39,7 +39,7 @@ internal class ActivityCommentListImpl(
override val query: ActivityCommentsQuery,
private val currentUserId: String,
private val commentsRepository: CommentsRepository,
private val subscriptionManager: StreamSubscriptionManager<FeedsEventListener>,
subscriptionManager: StreamSubscriptionManager<StateUpdateEventListener>,
) : ActivityCommentList {

private val _state: ActivityCommentListStateImpl =
Expand Down Expand Up @@ -73,10 +73,6 @@ internal class ActivityCommentListImpl(
return queryComments(nextQuery)
}

/** Internal property to access the mutable state of the comment list. */
internal val mutableState: ActivityCommentListMutableState
get() = _state

private suspend fun queryComments(
query: ActivityCommentsQuery
): Result<List<ThreadedCommentData>> {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,8 +31,11 @@ import io.getstream.feeds.android.client.api.state.ActivityState
import io.getstream.feeds.android.client.internal.repository.ActivitiesRepository
import io.getstream.feeds.android.client.internal.repository.CommentsRepository
import io.getstream.feeds.android.client.internal.repository.PollsRepository
import io.getstream.feeds.android.client.internal.state.event.StateUpdateEvent
import io.getstream.feeds.android.client.internal.state.event.handler.ActivityEventHandler
import io.getstream.feeds.android.client.internal.subscribe.FeedsEventListener
import io.getstream.feeds.android.client.internal.subscribe.StateUpdateEventListener
import io.getstream.feeds.android.client.internal.subscribe.onEvent
import io.getstream.feeds.android.client.internal.utils.flatMap
import io.getstream.feeds.android.network.models.AddCommentReactionRequest
import io.getstream.feeds.android.network.models.CastPollVoteRequest
Expand All @@ -58,7 +61,8 @@ import io.getstream.feeds.android.network.models.UpdatePollRequest
* @property commentsRepository The repository used to fetch and manage comments.
* @property pollsRepository The repository used to fetch and manage polls.
* @property commentList The list of comments associated with this activity.
* @property subscriptionManager The manager for WebSocket subscriptions to receive real-time
* @property subscriptionManager The manager for state update subscriptions.
* @property socketSubscriptionManager The manager for WebSocket subscriptions to receive real-time
* updates.
*/
internal class ActivityImpl(
Expand All @@ -69,7 +73,8 @@ internal class ActivityImpl(
private val commentsRepository: CommentsRepository,
private val pollsRepository: PollsRepository,
private val commentList: ActivityCommentListImpl,
private val subscriptionManager: StreamSubscriptionManager<FeedsEventListener>,
private val subscriptionManager: StreamSubscriptionManager<StateUpdateEventListener>,
socketSubscriptionManager: StreamSubscriptionManager<FeedsEventListener>,
) : Activity {

private val _state: ActivityStateImpl = ActivityStateImpl(currentUserId, commentList.state)
Expand All @@ -78,7 +83,7 @@ internal class ActivityImpl(
ActivityEventHandler(fid = fid, activityId = activityId, state = _state)

init {
subscriptionManager.subscribe(eventHandler)
socketSubscriptionManager.subscribe(eventHandler)
}

override val state: ActivityState
Expand All @@ -101,8 +106,8 @@ internal class ActivityImpl(
}

override suspend fun getComment(commentId: String): Result<CommentData> {
return commentsRepository.getComment(commentId).onSuccess {
commentList.mutableState.onCommentUpdated(it)
return commentsRepository.getComment(commentId).onSuccess { comment ->
subscriptionManager.onEvent(StateUpdateEvent.CommentUpdated(comment))
}
}

Expand All @@ -112,7 +117,7 @@ internal class ActivityImpl(
): Result<CommentData> {
return commentsRepository
.addComment(request = request, attachmentUploadProgress = attachmentUploadProgress)
.onSuccess { commentList.mutableState.onCommentAdded(ThreadedCommentData(it)) }
.onSuccess { subscriptionManager.onEvent(StateUpdateEvent.CommentAdded(it)) }
}

override suspend fun addCommentsBatch(
Expand All @@ -121,18 +126,15 @@ internal class ActivityImpl(
): Result<List<CommentData>> {
return commentsRepository.addCommentsBatch(requests, attachmentUploadProgress).onSuccess {
comments ->
val threadedComments = comments.map(::ThreadedCommentData)
threadedComments.forEach { threadedComment ->
commentList.mutableState.onCommentAdded(threadedComment)
}
comments.forEach { subscriptionManager.onEvent(StateUpdateEvent.CommentAdded(it)) }
}
}

override suspend fun deleteComment(commentId: String, hardDelete: Boolean?): Result<Unit> {
return commentsRepository
.deleteComment(commentId, hardDelete)
.onSuccess { (comment, activity) ->
commentList.mutableState.onCommentRemoved(comment.id)
subscriptionManager.onEvent(StateUpdateEvent.CommentDeleted(comment))
_state.onActivityUpdated(activity)
}
.map {}
Expand All @@ -143,7 +145,7 @@ internal class ActivityImpl(
request: UpdateCommentRequest,
): Result<CommentData> {
return commentsRepository.updateComment(commentId, request).onSuccess {
commentList.mutableState.onCommentUpdated(it)
subscriptionManager.onEvent(StateUpdateEvent.CommentUpdated(it))
}
}

Expand All @@ -153,7 +155,11 @@ internal class ActivityImpl(
): Result<FeedsReactionData> {
return commentsRepository
.addCommentReaction(commentId, request)
.onSuccess { commentList.mutableState.onCommentReactionAdded(it.second, it.first) }
.onSuccess { (reaction, comment) ->
subscriptionManager.onEvent(
StateUpdateEvent.CommentReactionAdded(comment, reaction)
)
}
.map { it.first }
}

Expand All @@ -163,7 +169,11 @@ internal class ActivityImpl(
): Result<FeedsReactionData> {
return commentsRepository
.deleteCommentReaction(commentId, type)
.onSuccess { commentList.mutableState.onCommentReactionRemoved(it.second, it.first) }
.onSuccess { (reaction, comment) ->
subscriptionManager.onEvent(
StateUpdateEvent.CommentReactionDeleted(comment, reaction)
)
}
.map { it.first }
}

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,62 @@
/*
* Copyright (c) 2014-2025 Stream.io Inc. All rights reserved.
*
* Licensed under the Stream License;
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* https://github.com/GetStream/stream-feeds-android/blob/main/LICENSE
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.getstream.feeds.android.client.internal.state.event

import io.getstream.feeds.android.client.api.model.CommentData
import io.getstream.feeds.android.client.api.model.FeedsReactionData
import io.getstream.feeds.android.client.api.model.toModel
import io.getstream.feeds.android.network.models.CommentAddedEvent
import io.getstream.feeds.android.network.models.CommentDeletedEvent
import io.getstream.feeds.android.network.models.CommentReactionAddedEvent
import io.getstream.feeds.android.network.models.CommentReactionDeletedEvent
import io.getstream.feeds.android.network.models.CommentUpdatedEvent
import io.getstream.feeds.android.network.models.WSEvent

/**
* Represents an event that may trigger a state update. These events are typically the result of
* receiving a WebSocket event or having executed a successful API call that can modify the state.
*/
internal sealed interface StateUpdateEvent {

data class CommentAdded(val comment: CommentData) : StateUpdateEvent

data class CommentDeleted(val comment: CommentData) : StateUpdateEvent

data class CommentUpdated(val comment: CommentData) : StateUpdateEvent

data class CommentReactionAdded(val comment: CommentData, val reaction: FeedsReactionData) :
StateUpdateEvent

data class CommentReactionDeleted(val comment: CommentData, val reaction: FeedsReactionData) :
StateUpdateEvent
}

internal fun WSEvent.toModel(): StateUpdateEvent? =
when (this) {
is CommentAddedEvent -> StateUpdateEvent.CommentAdded(comment.toModel())

is CommentUpdatedEvent -> StateUpdateEvent.CommentUpdated(comment.toModel())

is CommentDeletedEvent -> StateUpdateEvent.CommentDeleted(comment.toModel())

is CommentReactionAddedEvent ->
StateUpdateEvent.CommentReactionAdded(comment.toModel(), reaction.toModel())

is CommentReactionDeletedEvent ->
StateUpdateEvent.CommentReactionDeleted(comment.toModel(), reaction.toModel())

else -> null
}
Original file line number Diff line number Diff line change
Expand Up @@ -16,51 +16,45 @@
package io.getstream.feeds.android.client.internal.state.event.handler

import io.getstream.feeds.android.client.api.model.ThreadedCommentData
import io.getstream.feeds.android.client.api.model.toModel
import io.getstream.feeds.android.client.internal.state.ActivityCommentListStateUpdates
import io.getstream.feeds.android.client.internal.subscribe.FeedsEventListener
import io.getstream.feeds.android.network.models.CommentAddedEvent
import io.getstream.feeds.android.network.models.CommentDeletedEvent
import io.getstream.feeds.android.network.models.CommentReactionAddedEvent
import io.getstream.feeds.android.network.models.CommentReactionDeletedEvent
import io.getstream.feeds.android.network.models.CommentUpdatedEvent
import io.getstream.feeds.android.network.models.WSEvent
import io.getstream.feeds.android.client.internal.state.event.StateUpdateEvent
import io.getstream.feeds.android.client.internal.subscribe.StateUpdateEventListener

internal class ActivityCommentListEventHandler(
private val objectId: String,
private val objectType: String,
private val state: ActivityCommentListStateUpdates,
) : FeedsEventListener {
) : StateUpdateEventListener {

override fun onEvent(event: WSEvent) {
override fun onEvent(event: StateUpdateEvent) {
when (event) {
is CommentAddedEvent -> {
is StateUpdateEvent.CommentAdded -> {
if (event.comment.objectId == objectId && event.comment.objectType == objectType) {
state.onCommentAdded(ThreadedCommentData(event.comment.toModel()))
state.onCommentAdded(ThreadedCommentData(event.comment))
}
}

is CommentDeletedEvent -> {
is StateUpdateEvent.CommentDeleted -> {
if (event.comment.objectId == objectId && event.comment.objectType == objectType) {
state.onCommentRemoved(event.comment.id)
}
}

is CommentUpdatedEvent -> {
is StateUpdateEvent.CommentUpdated -> {
if (event.comment.objectId == objectId && event.comment.objectType == objectType) {
state.onCommentUpdated(event.comment.toModel())
state.onCommentUpdated(event.comment)
}
}

is CommentReactionAddedEvent -> {
is StateUpdateEvent.CommentReactionAdded -> {
if (event.comment.objectId == objectId && event.comment.objectType == objectType) {
state.onCommentReactionAdded(event.comment.id, event.reaction.toModel())
state.onCommentReactionAdded(event.comment.id, event.reaction)
}
}

is CommentReactionDeletedEvent -> {
is StateUpdateEvent.CommentReactionDeleted -> {
if (event.comment.objectId == objectId && event.comment.objectType == objectType) {
state.onCommentReactionRemoved(event.comment.id, event.reaction.toModel())
state.onCommentReactionRemoved(event.comment.id, event.reaction)
}
}
}
Expand Down
Loading
Loading