Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

How do we use coroutine adapter? I can't find an example for the life of me. #114

Closed
Orpheus007 opened this issue Oct 25, 2019 · 8 comments

Comments

@Orpheus007
Copy link

Orpheus007 commented Oct 25, 2019

Can someone please give me an example on how to use kotlin coroutines with scarlet?
Like how to set it up and how to "Observe" the data etc..
Thank you

@ilyaklyukin
Copy link

ilyaklyukin commented Oct 25, 2019

@NaaleinGrohiik

Dependencies for Scarlet 2:
// The Scarlet version used below is a -SNAPSHOT build, so the Sonatype snapshots repository is added.
maven { url "https://oss.sonatype.org/content/repositories/snapshots" }

// Networking
//versions.scarlet : '0.2.5-SNAPSHOT'
// Scarlet core plus the OkHttp WebSocket protocol, Gson message adapter,
// coroutines stream adapter and Android lifecycle artifacts (all on the same version).
implementation "com.tinder.scarlet:scarlet:${versions.scarlet}"
implementation "com.tinder.scarlet:protocol-websocket-okhttp:${versions.scarlet}"
implementation "com.tinder.scarlet:message-adapter-gson:${versions.scarlet}"
implementation "com.tinder.scarlet:stream-adapter-coroutines:${versions.scarlet}"
implementation "com.tinder.scarlet:lifecycle-android:${versions.scarlet}"
// OkHttp client and its logging interceptor share one version.
implementation "com.squareup.okhttp3:okhttp:${versions.okhttp3}"
implementation "com.squareup.okhttp3:logging-interceptor:${versions.okhttp3}"
implementation "com.google.code.gson:gson:${googleVersions.gson}"

I'm using Dagger 2 to configure Scarlet 2:

    /**
     * Provides the WebSocket endpoint URL under the "apiBaseUrl" qualifier.
     *
     * The URL is "wss://", followed by the `R.string.api_url` resource, followed by "/game".
     * It is logged through Timber before being returned.
     */
    @Provides
    @Singleton
    @JvmStatic
    @Named("apiBaseUrl")
    fun provideApiBaseUrl(app: Application): String =
        "wss://${app.getString(R.string.api_url)}/game".also { endpoint ->
            Timber.d("${LogConfig.CONNECTION_TAG} using url: $endpoint")
        }

    /**
     * Builds the [Scarlet] instance on top of an OkHttp WebSocket.
     *
     * Every connection request targets [baseUrl] and shutdown uses [ShutdownReason.GRACEFUL].
     * The configuration wires a linear backoff, the Gson message adapter, the coroutines
     * stream adapter and an application-foreground lifecycle.
     */
    @Provides
    @Singleton
    @JvmStatic
    fun provideScarlet(
        app: Application,
        httpClient: OkHttpClient,
        @Named("apiBaseUrl") baseUrl: String
    ): Scarlet {
        val requestFactory = OkHttpWebSocket.SimpleRequestFactory(
            { Request.Builder().url(baseUrl).build() },
            { ShutdownReason.GRACEFUL }
        )
        val protocol = OkHttpWebSocket(httpClient, requestFactory)

        val configuration = Scarlet.Configuration(
            backoffStrategy = LinearBackoffStrategy(ClientConfig.RECONNECT_INTERVAL),
            messageAdapterFactories = listOf(GsonMessageAdapter.Factory()),
            streamAdapterFactories = listOf(CoroutinesStreamAdapterFactory()),
            lifecycle = AndroidLifecycle.ofApplicationForeground(app)
        )

        return Scarlet(protocol, configuration)
    }

/*
    @Provides
    @Singleton
    @JvmStatic
    fun createNoConnectionInterceptor(
        app: Application
    ): NoConnectionInterceptor = NoConnectionInterceptor(app)
*/
    /**
     * Builds the [OkHttpClient] used for the WebSocket connection.
     *
     * Connect, read and write timeouts are all set to [ClientConfig.RECONNECT_INTERVAL]
     * milliseconds and redirects are not followed. In debug builds an
     * [HttpLoggingInterceptor] at BODY level is added.
     */
    @Provides
    @Singleton
    @JvmStatic
    fun createHttpClient(
   //     noConnectionInterceptor: NoConnectionInterceptor
    ): OkHttpClient {
        val timeoutMillis = ClientConfig.RECONNECT_INTERVAL
        val clientBuilder = OkHttpClient.Builder()
            //.addInterceptor(noConnectionInterceptor)
            .connectTimeout(timeoutMillis, TimeUnit.MILLISECONDS)
            .readTimeout(timeoutMillis, TimeUnit.MILLISECONDS)
            .writeTimeout(timeoutMillis, TimeUnit.MILLISECONDS)
            .followRedirects(false)

        if (BuildConfig.DEBUG) {
            // Log full request/response bodies in debug builds only.
            clientBuilder.addInterceptor(
                HttpLoggingInterceptor().apply { level = HttpLoggingInterceptor.Level.BODY }
            )
        }

        return clientBuilder.build()
    }

    /** Creates the [RestApi] service implementation from the configured [Scarlet] instance. */
    @Provides
    @Singleton
    @JvmStatic
    fun createRestApi(scarlet: Scarlet): RestApi = scarlet.create()

    /** Wraps the given [RestApi] in a [WebSocketService]. */
    @Provides
    @Singleton
    @JvmStatic
    fun provideWebSocketService(restApi: RestApi): WebSocketService {
        return WebSocketService(restApi)
    }

Custom interface:

/**
 * Scarlet service interface for the WebSocket connection.
 *
 * Implementations are created by Scarlet (`scarlet.create()`), not written by hand.
 */
interface RestApi {
    /**
     * Sends [message] over the socket.
     *
     * @return a Boolean that callers in this example treat as "was the message sent".
     */
    @Send
    fun send(message: Any): Boolean

    /** Returns a channel of [WebSocketEvent]s received from the socket. */
    @Receive
    fun observeEvents(): ReceiveChannel<WebSocketEvent>

    /** Returns a channel of incoming messages typed as [ApiMessage]. */
    @Receive
    fun observeApiMessage(): ReceiveChannel<ApiMessage>
}

import com. ... .datasource.net.message.ApiMessage
import com.tinder.scarlet.websocket.WebSocketEvent
import kotlinx.coroutines.channels.ReceiveChannel
import javax.inject.Inject

/**
 * Thin injectable wrapper around [RestApi]; every method forwards directly to the
 * method of the same name on the wrapped service.
 */
class WebSocketService @Inject constructor(private val restApi: RestApi) {

    /** Forwards [message] to [RestApi.send] and returns its result. */
    fun send(message: Any): Boolean = restApi.send(message)

    /** Forwards to [RestApi.observeEvents]. */
    fun observeEvents(): ReceiveChannel<WebSocketEvent> = restApi.observeEvents()

    /** Forwards to [RestApi.observeApiMessage]. */
    fun observeApiMessage(): ReceiveChannel<ApiMessage> = restApi.observeApiMessage()
}

ApiMessage is the base class for the different message types, each of which is converted to JSON:
/** Base class for API messages. `type` is a plain constructor parameter and is not stored as a property here. */
abstract class ApiMessage(type: String)
for example:

/**
 * Example [ApiMessage] subtype.
 *
 * @property type message type discriminator; defaults to "NEED_ALIAS" and is passed to the [ApiMessage] constructor.
 * @property playerId player identifier carried by the message.
 */
data class NeedAlias(
    val type: String = "NEED_ALIAS",
    val playerId: Long
) : ApiMessage(type)

Our API uses WebSocket requests that each expect a response, so I've created a BaseRepository mechanism:

/**
 * Base class for repositories that use [WebSocketService] in a request/response fashion:
 * a request is sent and the next incoming [ApiMessage] is treated as its response.
 */
abstract class BaseRepository(private val webSocketService: WebSocketService) {

    /** Sends [request] and maps the boolean outcome of [WebSocketService.send] to a [Result]. */
    private fun sendRequest(request: Any): Result<Unit> {
        val isSent = webSocketService.send(request)
        return if (isSent)
            Result.Success(Unit)
        else Result.Error(Failure.NetworkConnection)
    }

    /**
     * Sends [request], suspends until the next [ApiMessage] arrives and returns
     * `onResponse(message)`.
     *
     * Unlike a `suspendCoroutine` + `GlobalScope.launch` construction, every path returns:
     * - if the request cannot be sent, `Result.Error(Failure.NetworkConnection)` is returned
     *   immediately instead of leaving the caller suspended forever;
     * - if the channel fails or closes before a message arrives, the error is logged and
     *   `Result.Error(Failure.NetworkConnection)` is returned;
     * - if the calling coroutine is cancelled, the cancellation propagates normally.
     *
     * The message channel is always cancelled before returning.
     *
     * NOTE(review): there is no request/response correlation here; the first message that
     * arrives is assumed to answer [request] — confirm this matches the server protocol.
     *
     * @param request the request to send.
     * @param onResponse maps the received message to the result of the call; it runs in the
     *   caller's coroutine context.
     */
    @ExperimentalCoroutinesApi
    protected suspend fun processRequest(
        request: Request,
        onResponse: (message: ApiMessage) -> Result<Unit>
    ): Result<Unit> {
        // Subscribe before sending so a fast response cannot slip past us.
        val channel = webSocketService.observeApiMessage()
        try {
            if (sendRequest(request).isFailure) {
                return Result.Error(Failure.NetworkConnection)
            }

            val message = try {
                channel.receive()
            } catch (cancellation: kotlinx.coroutines.CancellationException) {
                // Never swallow cancellation; let structured concurrency handle it.
                throw cancellation
            } catch (throwable: Throwable) {
                Timber.e("SOCKET ERROR $throwable")
                return Result.Error(Failure.NetworkConnection)
            }

            return onResponse(message)
        } finally {
            // Only one response is consumed; release the subscription on every exit path.
            channel.cancel()
        }
    }

}
Example of a repository that sends a request:
/**
 * [LoginRepository] implementation that performs the login exchange through
 * [BaseRepository.processRequest] and maps each server reply to a [Result].
 */
class LoginRepositoryImpl @Inject constructor(
    webSocketService: WebSocketService,
    val resources: Resources
) : BaseRepository(webSocketService), LoginRepository {

    /**
     * Sends a [StandardLogin] request built from [email] and [password].
     *
     * Only [LoginOk] yields `Result.Success`; every other reply is mapped to a `Result.Error`,
     * with user-facing texts read from [resources].
     */
    @ExperimentalCoroutinesApi
    override suspend fun login(
        email: String,
        password: String
    ): Result<Unit> = processRequest(StandardLogin(email, password)) { reply ->
        when (reply) {
            is LoginOk -> Result.Success(Unit)
            is LoginFail -> Result.Error(
                Failure.ServerError(resources.getString(R.string.wrong_password))
            )
            is NeedAlias -> Result.Error(Failure.ServerError(EMPTY_ALIAS_AND_EMAIL))
            is NeedVerification -> Result.Error(
                Failure.ServerError(resources.getString(R.string.need_verification))
            )
            is Error -> {
                val code = ServerError.getByCode(reply.errorCode)
                val failure = when (code) {
                    // Both codes are shown to the user as a wrong password.
                    NO_PLAYER_FOUND,
                    NOT_PERMITTED_FOR_TRIAL_PLAYER ->
                        Failure.ServerError(resources.getString(R.string.wrong_password))
                    INCOMPLETE_EMAIL_VALIDATION ->
                        Failure.ServerError(resources.getString(R.string.need_verification))
                    else -> Failure.ServerError(code, reply.errorMessage)
                }
                Result.Error(failure)
            }
            else -> Result.Error(UnexpectedException(reply.toString()))
        }
    }
}

LoginRepository is just an interface to be used from domain layer:

/**
 * Account-related operations exposed to the domain layer.
 *
 * Every operation is a suspending call that reports its outcome as a [Result] of [Unit].
 */
interface LoginRepository {

    /** Logs in with the given [email] and [password]. */
    suspend fun login(email: String, password: String): Result<Unit>

    /** Forgot-password operation for the given [email]. */
    suspend fun forgotPassword(email: String): Result<Unit>

    /** Chooses [alias] for the account. */
    suspend fun chooseAlias(alias: String): Result<Unit>

    /** Creates an account from [email] and [password]. */
    suspend fun createEmailAccount(email: String, password: String): Result<Unit>
}

@Orpheus007
Copy link
Author

Orpheus007 commented Oct 25, 2019

@ilyaklyukin Thank you so much my good sir! Not only did this answer my question it also gave me new ideas for wrapping results. Really appreciate it!

@lambdatamer
Copy link

If you prefer to use Kotlin Flows, here is the adapter implementation:

/**
 * [StreamAdapter] that exposes a Scarlet [Stream] as a [Flow].
 *
 * The stream is started when the flow is collected, and the subscription is disposed when
 * the flow is closed or its collector is cancelled, so the stream observer does not leak.
 */
class FlowStreamAdapter<T> : StreamAdapter<T, Flow<T>> {
    override fun adapt(stream: Stream<T>): Flow<T> = callbackFlow<T> {
        // Keep the handle returned by start() so the subscription can be released below.
        val subscription = stream.start(object : Stream.Observer<T> {
            override fun onComplete() {
                close()
            }

            override fun onError(throwable: Throwable) {
                close(cause = throwable)
            }

            override fun onNext(data: T) {
                if (!isClosedForSend) offer(data)
            }
        })
        // Runs on normal close and on collector cancellation alike.
        awaitClose { subscription.dispose() }
    }

    /** Factory that supplies a [FlowStreamAdapter] for service methods returning [Flow]. */
    object Factory : StreamAdapter.Factory {
        /**
         * @throws IllegalArgumentException when the raw type of [type] is not [Flow].
         */
        override fun create(type: Type): StreamAdapter<Any, Any> {
            return when (type.getRawType()) {
                Flow::class.java -> FlowStreamAdapter()
                else -> throw IllegalArgumentException()
            }
        }
    }
}

Then call .addStreamAdapterFactory(FlowStreamAdapter.Factory) on your Scarlet.Builder.
Now you can use Flow<T> return type in your services:

/** Example Scarlet service whose receive method returns a [Flow]. */
interface ExampleService {
    /** Returns incoming messages as a [Flow] of [String]. */
    @Receive
    fun example(): Flow<String>
}

@Orpheus007
Copy link
Author

I actually just started working with flow. Thank you for sharing!

@Amalip
Copy link

Amalip commented Feb 25, 2021

@Orpheus007 @lambdatamer do you have an example using Flows?

@ShubhamAgr
Copy link

ShubhamAgr commented May 30, 2021

If you prefer to use Kotlin Flows, here is the adapter implementation:

/**
 * [StreamAdapter] that exposes a Scarlet [Stream] as a [Flow].
 *
 * The stream is started when the flow is collected, and the subscription is disposed when
 * the flow is closed or its collector is cancelled, so the stream observer does not leak.
 */
class FlowStreamAdapter<T> : StreamAdapter<T, Flow<T>> {
    override fun adapt(stream: Stream<T>): Flow<T> = callbackFlow<T> {
        // Keep the handle returned by start() so the subscription can be released below.
        val subscription = stream.start(object : Stream.Observer<T> {
            override fun onComplete() {
                close()
            }

            override fun onError(throwable: Throwable) {
                close(cause = throwable)
            }

            override fun onNext(data: T) {
                if (!isClosedForSend) offer(data)
            }
        })
        // Runs on normal close and on collector cancellation alike.
        awaitClose { subscription.dispose() }
    }

    /** Factory that supplies a [FlowStreamAdapter] for service methods returning [Flow]. */
    object Factory : StreamAdapter.Factory {
        /**
         * @throws IllegalArgumentException when the raw type of [type] is not [Flow].
         */
        override fun create(type: Type): StreamAdapter<Any, Any> {
            return when (type.getRawType()) {
                Flow::class.java -> FlowStreamAdapter()
                else -> throw IllegalArgumentException()
            }
        }
    }
}

Then call .addStreamAdapterFactory(FlowStreamAdapter.Factory) on your Scarlet.Builder.
Now you can use Flow<T> return type in your services:

/** Example Scarlet service whose receive method returns a [Flow]. */
interface ExampleService {
    /** Returns incoming messages as a [Flow] of [String]. */
    @Receive
    fun example(): Flow<String>
}

I think this is a much better way: emit the response and then collect it in the ViewModel.
https://github.com/kizok/tinder_scarlet_with_coroutine_adapter/blob/master/app/src/main/java/tech/kizok/sockettest/ScarletAdapter/ReceiveChannelStreamAdapter.kt

@jemshit
Copy link

jemshit commented Jul 25, 2021

Few important points:

  • ReceiveChannel can have single consumer at a time, if there are multiple consumers, only one of them gets the event. BroadcastChannel should be used for such use cases, all consumers/subscribers get all the events.
  • If ReceiveChannel is used with channel.consumeEach/consumeAsFlow, when consumer Scope is cancelled, underlying Channel is cancelled as well. If for (msg in channel){} is used for consumption and Scope is cancelled, then underlying Channel is not cancelled.
  • Each time @Receive fun someMethod is called from interface, it returns New Instance of ReceiveChannel/Flow/BroadcastChannel...
  • Flow is cold, it does not work until there is collector; Channel is hot, it keeps working even if there is no collector

BroadcastChannelStreamAdapter should be something like this:

import com.tinder.scarlet.Stream
import com.tinder.scarlet.StreamAdapter
import com.tinder.scarlet.utils.getRawType
import kotlinx.coroutines.ExperimentalCoroutinesApi
import kotlinx.coroutines.channels.BroadcastChannel
import java.lang.reflect.Type

/**
 * [StreamAdapter] that exposes a Scarlet [Stream] as a [BroadcastChannel] with a buffer of
 * 100 elements.
 */
class BroadcastChannelStreamAdapter<T> : StreamAdapter<T, BroadcastChannel<T>> {
    /**
     * Starts [stream] immediately and forwards its events into a new [BroadcastChannel].
     *
     * Normal completion closes the channel; an error closes it with that error as the cause.
     */
    @ExperimentalCoroutinesApi
    override fun adapt(stream: Stream<T>): BroadcastChannel<T> {
        val channel = BroadcastChannel<T>(100)
        stream.start(object : Stream.Observer<T> {
            override fun onComplete() {
                // close(), not cancel(): completion is a normal end of stream, and cancel()
                // would signal cancellation to subscribers and drop buffered elements.
                channel.close()
            }

            override fun onError(throwable: Throwable) {
                channel.close(cause = throwable)
            }

            override fun onNext(data: T) {
                if (!channel.isClosedForSend) {
                    channel.offer(data)
                }
            }
        })
        return channel
    }

    /** Factory that supplies a [BroadcastChannelStreamAdapter] for methods returning [BroadcastChannel]. */
    object Factory : StreamAdapter.Factory {
        /**
         * @throws IllegalArgumentException when the raw type of [type] is not [BroadcastChannel].
         */
        override fun create(type: Type): StreamAdapter<Any, Any> {
            return when (type.getRawType()) {
                BroadcastChannel::class.java -> BroadcastChannelStreamAdapter()
                else -> throw IllegalArgumentException()
            }
        }
    }
}

@taknikiniga
Copy link

taknikiniga commented Feb 14, 2022

@NaaleinGrohiik

Dependencies for Scarlet 2:
maven { url "https://oss.sonatype.org/content/repositories/snapshots" }

// Networking
//versions.scarlet : '0.2.5-SNAPSHOT'
implementation "com.tinder.scarlet:scarlet:${versions.scarlet}"
implementation "com.tinder.scarlet:protocol-websocket-okhttp:${versions.scarlet}"
implementation "com.tinder.scarlet:message-adapter-gson:${versions.scarlet}"
implementation "com.tinder.scarlet:stream-adapter-coroutines:${versions.scarlet}"
implementation "com.tinder.scarlet:lifecycle-android:${versions.scarlet}"
implementation "com.squareup.okhttp3:okhttp:${versions.okhttp3}"
implementation "com.squareup.okhttp3:logging-interceptor:${versions.okhttp3}"
implementation "com.google.code.gson:gson:${googleVersions.gson}"

I'm using Dagger 2 to configure Scarlet 2:

    /**
     * Provides the WebSocket endpoint URL under the "apiBaseUrl" qualifier.
     *
     * The URL is "wss://", followed by the `R.string.api_url` resource, followed by "/game".
     * It is logged through Timber before being returned.
     */
    @Provides
    @Singleton
    @JvmStatic
    @Named("apiBaseUrl")
    fun provideApiBaseUrl(app: Application): String =
        "wss://${app.getString(R.string.api_url)}/game".also { endpoint ->
            Timber.d("${LogConfig.CONNECTION_TAG} using url: $endpoint")
        }

    /**
     * Builds the [Scarlet] instance on top of an OkHttp WebSocket.
     *
     * Every connection request targets [baseUrl] and shutdown uses [ShutdownReason.GRACEFUL].
     * The configuration wires a linear backoff, the Gson message adapter, the coroutines
     * stream adapter and an application-foreground lifecycle.
     */
    @Provides
    @Singleton
    @JvmStatic
    fun provideScarlet(
        app: Application,
        httpClient: OkHttpClient,
        @Named("apiBaseUrl") baseUrl: String
    ): Scarlet {
        val requestFactory = OkHttpWebSocket.SimpleRequestFactory(
            { Request.Builder().url(baseUrl).build() },
            { ShutdownReason.GRACEFUL }
        )
        val protocol = OkHttpWebSocket(httpClient, requestFactory)

        val configuration = Scarlet.Configuration(
            backoffStrategy = LinearBackoffStrategy(ClientConfig.RECONNECT_INTERVAL),
            messageAdapterFactories = listOf(GsonMessageAdapter.Factory()),
            streamAdapterFactories = listOf(CoroutinesStreamAdapterFactory()),
            lifecycle = AndroidLifecycle.ofApplicationForeground(app)
        )

        return Scarlet(protocol, configuration)
    }

/*
    @Provides
    @Singleton
    @JvmStatic
    fun createNoConnectionInterceptor(
        app: Application
    ): NoConnectionInterceptor = NoConnectionInterceptor(app)
*/
    /**
     * Builds the [OkHttpClient] used for the WebSocket connection.
     *
     * Connect, read and write timeouts are all set to [ClientConfig.RECONNECT_INTERVAL]
     * milliseconds and redirects are not followed. In debug builds an
     * [HttpLoggingInterceptor] at BODY level is added.
     */
    @Provides
    @Singleton
    @JvmStatic
    fun createHttpClient(
   //     noConnectionInterceptor: NoConnectionInterceptor
    ): OkHttpClient {
        val timeoutMillis = ClientConfig.RECONNECT_INTERVAL
        val clientBuilder = OkHttpClient.Builder()
            //.addInterceptor(noConnectionInterceptor)
            .connectTimeout(timeoutMillis, TimeUnit.MILLISECONDS)
            .readTimeout(timeoutMillis, TimeUnit.MILLISECONDS)
            .writeTimeout(timeoutMillis, TimeUnit.MILLISECONDS)
            .followRedirects(false)

        if (BuildConfig.DEBUG) {
            // Log full request/response bodies in debug builds only.
            clientBuilder.addInterceptor(
                HttpLoggingInterceptor().apply { level = HttpLoggingInterceptor.Level.BODY }
            )
        }

        return clientBuilder.build()
    }

    /** Creates the [RestApi] service implementation from the configured [Scarlet] instance. */
    @Provides
    @Singleton
    @JvmStatic
    fun createRestApi(scarlet: Scarlet): RestApi = scarlet.create()

    /** Wraps the given [RestApi] in a [WebSocketService]. */
    @Provides
    @Singleton
    @JvmStatic
    fun provideWebSocketService(restApi: RestApi): WebSocketService {
        return WebSocketService(restApi)
    }

Custom interface:

/**
 * Scarlet service interface for the WebSocket connection.
 *
 * Implementations are created by Scarlet (`scarlet.create()`), not written by hand.
 */
interface RestApi {
    /**
     * Sends [message] over the socket.
     *
     * @return a Boolean that callers in this example treat as "was the message sent".
     */
    @Send
    fun send(message: Any): Boolean

    /** Returns a channel of [WebSocketEvent]s received from the socket. */
    @Receive
    fun observeEvents(): ReceiveChannel<WebSocketEvent>

    /** Returns a channel of incoming messages typed as [ApiMessage]. */
    @Receive
    fun observeApiMessage(): ReceiveChannel<ApiMessage>
}
import com. ... .datasource.net.message.ApiMessage
import com.tinder.scarlet.websocket.WebSocketEvent
import kotlinx.coroutines.channels.ReceiveChannel
import javax.inject.Inject

/**
 * Thin injectable wrapper around [RestApi]; every method forwards directly to the
 * method of the same name on the wrapped service.
 */
class WebSocketService @Inject constructor(private val restApi: RestApi) {

    /** Forwards [message] to [RestApi.send] and returns its result. */
    fun send(message: Any): Boolean = restApi.send(message)

    /** Forwards to [RestApi.observeEvents]. */
    fun observeEvents(): ReceiveChannel<WebSocketEvent> = restApi.observeEvents()

    /** Forwards to [RestApi.observeApiMessage]. */
    fun observeApiMessage(): ReceiveChannel<ApiMessage> = restApi.observeApiMessage()
}

ApiMessage is the base class for the different message types, each of which is converted to JSON: `abstract class ApiMessage(type: String)`. For example:

/**
 * Example [ApiMessage] subtype.
 *
 * @property type message type discriminator; defaults to "NEED_ALIAS" and is passed to the [ApiMessage] constructor.
 * @property playerId player identifier carried by the message.
 */
data class NeedAlias(
    val type: String = "NEED_ALIAS",
    val playerId: Long
) : ApiMessage(type)

Our API uses WebSocket requests that each expect a response, so I've created a BaseRepository mechanism:

/**
 * Base class for repositories that use [WebSocketService] in a request/response fashion:
 * a request is sent and the next incoming [ApiMessage] is treated as its response.
 */
abstract class BaseRepository(private val webSocketService: WebSocketService) {

    /** Sends [request] and maps the boolean outcome of [WebSocketService.send] to a [Result]. */
    private fun sendRequest(request: Any): Result<Unit> {
        val isSent = webSocketService.send(request)
        return if (isSent)
            Result.Success(Unit)
        else Result.Error(Failure.NetworkConnection)
    }

    /**
     * Sends [request], suspends until the next [ApiMessage] arrives and returns
     * `onResponse(message)`.
     *
     * Unlike a `suspendCoroutine` + `GlobalScope.launch` construction, every path returns:
     * - if the request cannot be sent, `Result.Error(Failure.NetworkConnection)` is returned
     *   immediately instead of leaving the caller suspended forever;
     * - if the channel fails or closes before a message arrives, the error is logged and
     *   `Result.Error(Failure.NetworkConnection)` is returned;
     * - if the calling coroutine is cancelled, the cancellation propagates normally.
     *
     * The message channel is always cancelled before returning.
     *
     * NOTE(review): there is no request/response correlation here; the first message that
     * arrives is assumed to answer [request] — confirm this matches the server protocol.
     *
     * @param request the request to send.
     * @param onResponse maps the received message to the result of the call; it runs in the
     *   caller's coroutine context.
     */
    @ExperimentalCoroutinesApi
    protected suspend fun processRequest(
        request: Request,
        onResponse: (message: ApiMessage) -> Result<Unit>
    ): Result<Unit> {
        // Subscribe before sending so a fast response cannot slip past us.
        val channel = webSocketService.observeApiMessage()
        try {
            if (sendRequest(request).isFailure) {
                return Result.Error(Failure.NetworkConnection)
            }

            val message = try {
                channel.receive()
            } catch (cancellation: kotlinx.coroutines.CancellationException) {
                // Never swallow cancellation; let structured concurrency handle it.
                throw cancellation
            } catch (throwable: Throwable) {
                Timber.e("SOCKET ERROR $throwable")
                return Result.Error(Failure.NetworkConnection)
            }

            return onResponse(message)
        } finally {
            // Only one response is consumed; release the subscription on every exit path.
            channel.cancel()
        }
    }

}
Example of a repository that sends a request:
/**
 * [LoginRepository] implementation that performs the login exchange through
 * [BaseRepository.processRequest] and maps each server reply to a [Result].
 */
class LoginRepositoryImpl @Inject constructor(
    webSocketService: WebSocketService,
    val resources: Resources
) : BaseRepository(webSocketService), LoginRepository {

    /**
     * Sends a [StandardLogin] request built from [email] and [password].
     *
     * Only [LoginOk] yields `Result.Success`; every other reply is mapped to a `Result.Error`,
     * with user-facing texts read from [resources].
     */
    @ExperimentalCoroutinesApi
    override suspend fun login(
        email: String,
        password: String
    ): Result<Unit> = processRequest(StandardLogin(email, password)) { reply ->
        when (reply) {
            is LoginOk -> Result.Success(Unit)
            is LoginFail -> Result.Error(
                Failure.ServerError(resources.getString(R.string.wrong_password))
            )
            is NeedAlias -> Result.Error(Failure.ServerError(EMPTY_ALIAS_AND_EMAIL))
            is NeedVerification -> Result.Error(
                Failure.ServerError(resources.getString(R.string.need_verification))
            )
            is Error -> {
                val code = ServerError.getByCode(reply.errorCode)
                val failure = when (code) {
                    // Both codes are shown to the user as a wrong password.
                    NO_PLAYER_FOUND,
                    NOT_PERMITTED_FOR_TRIAL_PLAYER ->
                        Failure.ServerError(resources.getString(R.string.wrong_password))
                    INCOMPLETE_EMAIL_VALIDATION ->
                        Failure.ServerError(resources.getString(R.string.need_verification))
                    else -> Failure.ServerError(code, reply.errorMessage)
                }
                Result.Error(failure)
            }
            else -> Result.Error(UnexpectedException(reply.toString()))
        }
    }
}

LoginRepository is just an interface to be used from domain layer:

/**
 * Account-related operations exposed to the domain layer.
 *
 * Every operation is a suspending call that reports its outcome as a [Result] of [Unit].
 */
interface LoginRepository {

    /** Logs in with the given [email] and [password]. */
    suspend fun login(email: String, password: String): Result<Unit>

    /** Forgot-password operation for the given [email]. */
    suspend fun forgotPassword(email: String): Result<Unit>

    /** Chooses [alias] for the account. */
    suspend fun chooseAlias(alias: String): Result<Unit>

    /** Creates an account from [email] and [password]. */
    suspend fun createEmailAccount(email: String, password: String): Result<Unit>
}
  • Does this work with socket.io?
  • I am unable to import WebSocketEvent.
  • How do I use Dagger + coroutines + Flow with socket.io? Please make a post about that as well.
  • I would also like to see the Result code.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

No branches or pull requests

7 participants