diff --git a/build.sbt b/build.sbt index bdf24932..33ca5d04 100644 --- a/build.sbt +++ b/build.sbt @@ -60,6 +60,8 @@ libraryDependencies += "net.dv8tion" % "JDA" % "4.ALPHA.0_82" //Serial Communication libraryDependencies += "com.fazecast" % "jSerialComm" % "[2.0.0,3.0.0)" +// Socket.io +libraryDependencies += "io.socket" % "socket.io-client"% "1.0.0" // --------------------------------------------------------------------------------------------------------------------- // PLUGIN FRAMEWORK DEFINITIONS // --------------------------------------------------------------------------------------------------------------------- diff --git a/src/main/scala/org/codeoverflow/chatoverflow/requirement/service/tipeeestream/TipeeeStreamConnector.scala b/src/main/scala/org/codeoverflow/chatoverflow/requirement/service/tipeeestream/TipeeeStreamConnector.scala index fef347fc..a5fc12b6 100644 --- a/src/main/scala/org/codeoverflow/chatoverflow/requirement/service/tipeeestream/TipeeeStreamConnector.scala +++ b/src/main/scala/org/codeoverflow/chatoverflow/requirement/service/tipeeestream/TipeeeStreamConnector.scala @@ -1,19 +1,104 @@ package org.codeoverflow.chatoverflow.requirement.service.tipeeestream +import java.util.function.Consumer + +import io.socket.client.{IO, Socket} +import org.codeoverflow.chatoverflow.WithLogger +import org.codeoverflow.chatoverflow.api.io.dto.event.tipeeestream.{TipeeeStreamDonation, TipeeeStreamEvent, TipeeeStreamFollow, TipeeeStreamSubscription} import org.codeoverflow.chatoverflow.connector.Connector +import org.json.{JSONException, JSONObject} -class TipeeeStreamConnector(override val sourceIdentifier: String) extends Connector(sourceIdentifier) { +import scala.collection.mutable.ListBuffer - override protected var requiredCredentialKeys: List[String] = List() +/** + * The tipeeestream connector connects to the socket.io service to work with incoming events. + * + * @param sourceIdentifier the name of the tipeeestream account + */ +class TipeeeStreamConnector(override val sourceIdentifier: String) extends Connector(sourceIdentifier) with WithLogger { + private val eventHandler = ListBuffer[Consumer[TipeeeStreamEvent]]() + private val apiKey = "apiKey" + private val username = "username" + override protected var requiredCredentialKeys: List[String] = List(apiKey, username) override protected var optionalCredentialKeys: List[String] = List() + private var socket: Socket = _ /** * Starts the connector, e.g. creates a connection with its platform. */ - override def start(): Boolean = ??? + override def start(): Boolean = { + socket = IO.socket("https://sso-cf.tipeeestream.com").connect() + socket.on("connect", (_: Any) => { + logger info "Connected to TipeeStream Socket.io" + }) + socket.emit("join-room", getAuthenticationObject) + logger info "emitted credentials to TipeeSetream Socket.io api" + socket.on("new-event", (objects: Array[AnyRef]) => { + serializeObjectToObject(objects) + }) + true + } + + def addIncomingEventHandler(handler: Consumer[TipeeeStreamEvent]): Unit = { + eventHandler += handler + } + + private def serializeObjectToObject(objects : Array[AnyRef]) : Unit = { + val json: JSONObject = objects(0).asInstanceOf[JSONObject] + val event: JSONObject = json.getJSONObject("event") + val eventType: String = event.getString("type") + eventType match { + case "subscription" => + Subscription(event) + case "donation" => + Donation(event) + case "follow" => + Follow(event) + case _ => + } + } + + @throws[JSONException] + private def Donation(event: JSONObject): Unit = { + val parameter = event.getJSONObject("parameters") + val user = parameter.getString("username") + val message = parameter.getString("formattedMessage") + val amount = parameter.getDouble("amount") + val donation: TipeeeStreamDonation = new TipeeeStreamDonation(null, user, message, amount, null, null) + eventHandler.foreach(_.accept(donation)) + } + + @throws[JSONException] + private def Subscription(event: JSONObject): Unit = { + val parameter = event.getJSONObject("parameters") + val user = parameter.getString("username") + val message = parameter.getString("formattedMessage") + val resub = parameter.getInt("resub") + val subscription: TipeeeStreamSubscription = new TipeeeStreamSubscription(null, user, message, resub) + eventHandler.foreach(_.accept(subscription)) + } + + @throws[JSONException] + private def Follow(event: JSONObject): Unit = { + val parameter = event.getJSONObject("parameters") + val user = parameter.getString("username") + val message = parameter.getString("message") + val follow: TipeeeStreamFollow = new TipeeeStreamFollow(null, user, message) + eventHandler.foreach(_.accept(follow)) + } /** * This stops the activity of the connector, e.g. by closing the platform connection. */ - override def stop(): Boolean = ??? + override def stop(): Boolean = { + socket.close() + true + } + + private def getAuthenticationObject: JSONObject = { + val obj = new JSONObject() + obj.put("room", credentials.get.getValue(apiKey).get) + obj.put("username", credentials.get.getValue(username).get) + obj + } } diff --git a/src/main/scala/org/codeoverflow/chatoverflow/requirement/service/tipeeestream/TipeeeStreamListener.scala b/src/main/scala/org/codeoverflow/chatoverflow/requirement/service/tipeeestream/TipeeeStreamListener.scala new file mode 100644 index 00000000..72254420 --- /dev/null +++ b/src/main/scala/org/codeoverflow/chatoverflow/requirement/service/tipeeestream/TipeeeStreamListener.scala @@ -0,0 +1,18 @@ +package org.codeoverflow.chatoverflow.requirement.service.tipeeestream + +import org.codeoverflow.chatoverflow.api.io.dto.event.tipeeestream.TipeeeStreamEvent + +import scala.collection.mutable.ListBuffer + +class TipeeeStreamListener { + + private val messageEventListener = ListBuffer[TipeeeStreamEvent => Unit]() + + def onMessage(event: TipeeeStreamEvent): Unit = { + messageEventListener.foreach(listener => listener(event)) + } + + def addMessageEventListener(listener: TipeeeStreamEvent => Unit): Unit = { + messageEventListener += listener + } +} diff --git a/src/main/scala/org/codeoverflow/chatoverflow/requirement/service/tipeeestream/impl/TipeeeStreamInputImpl.scala b/src/main/scala/org/codeoverflow/chatoverflow/requirement/service/tipeeestream/impl/TipeeeStreamInputImpl.scala index d8b40eec..75059555 100644 --- a/src/main/scala/org/codeoverflow/chatoverflow/requirement/service/tipeeestream/impl/TipeeeStreamInputImpl.scala +++ b/src/main/scala/org/codeoverflow/chatoverflow/requirement/service/tipeeestream/impl/TipeeeStreamInputImpl.scala @@ -1,14 +1,49 @@ package org.codeoverflow.chatoverflow.requirement.service.tipeeestream.impl -import org.codeoverflow.chatoverflow.api.io.input.event.SubscriptionEventInput -import org.codeoverflow.chatoverflow.requirement.Connection +import java.util.function.Consumer + +import org.codeoverflow.chatoverflow.WithLogger +import org.codeoverflow.chatoverflow.api.io.dto.event.tipeeestream.{TipeeeStreamDonation, TipeeeStreamEvent, TipeeeStreamFollow, TipeeeStreamSubscription} +import org.codeoverflow.chatoverflow.api.io.input.chat.TipeeeStreamInput +import org.codeoverflow.chatoverflow.registry.Impl +import org.codeoverflow.chatoverflow.requirement.InputImpl import org.codeoverflow.chatoverflow.requirement.service.tipeeestream.TipeeeStreamConnector -// TODO: This class should have probably a counterpart in the API. Not now, for testing only -class TipeeeStreamInputImpl extends Connection[TipeeeStreamConnector] with SubscriptionEventInput { - override def init(): Boolean = ??? +import scala.collection.mutable.ListBuffer + +@Impl(impl = classOf[TipeeeStreamInput], connector = classOf[TipeeeStreamConnector]) +class TipeeeStreamInputImpl extends InputImpl[TipeeeStreamConnector] with TipeeeStreamInput with WithLogger { + private val donationHandler = ListBuffer[Consumer[TipeeeStreamDonation]]() + private val subscriptionHandler = ListBuffer[Consumer[TipeeeStreamSubscription]]() + private val followHandler = ListBuffer[Consumer[TipeeeStreamFollow]]() + + def onEvent[T <: TipeeeStreamEvent] (event: T ): Unit = { + event match { + case event: TipeeeStreamDonation => donationHandler.foreach(_.accept(event.asInstanceOf[TipeeeStreamDonation])) + case event: TipeeeStreamFollow => followHandler.foreach(_.accept(event.asInstanceOf[TipeeeStreamFollow])) + case event: TipeeeStreamSubscription => subscriptionHandler.foreach(_.accept(event.asInstanceOf[TipeeeStreamSubscription])) + } + } + + override def start(): Boolean = { + sourceConnector.get.addIncomingEventHandler(onEvent) + true + } + + /** + * Let's you register a simple handler immediately react on new subscriptions + * + * @param handler the consumer t ehandle incoming messages + */ + override def registerSubscriptionHandler(handler: Consumer[TipeeeStreamSubscription]): Unit = { + subscriptionHandler += handler + } - override def serialize(): String = ??? + override def registerDonationHandler(handler: Consumer[TipeeeStreamDonation]): Unit = { + donationHandler += handler + } - override def deserialize(value: String): Unit = ??? + override def registerFollowHandler(handler: Consumer[TipeeeStreamFollow]): Unit = { + followHandler += handler + } }