Skip to content
This repository was archived by the owner on Aug 18, 2020. It is now read-only.

Commit 53d90a6

Browse files
committed
Add initial StreamElements implementation
1 parent 8478eaf commit 53d90a6

File tree

3 files changed

+224
-0
lines changed

3 files changed

+224
-0
lines changed
Lines changed: 100 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,100 @@
1+
package org.codeoverflow.chatoverflow.requirement.service.streamelements
2+
3+
import io.socket.client.Socket._
4+
import io.socket.client.{IO, Socket}
5+
import org.codeoverflow.chatoverflow.WithLogger
6+
import org.codeoverflow.chatoverflow.connector.EventConnector
7+
import org.json.JSONObject
8+
9+
class StreamElementsConnector(sourceIdentifier: String) extends EventConnector(sourceIdentifier) with WithLogger {
10+
override protected var requiredCredentialKeys: List[String] = List("jwt-token")
11+
override protected var optionalCredentialKeys: List[String] = List()
12+
13+
private val TIMEOUT = 10000
14+
private val SOCKET_URL = "https://realtime.streamelements.com"
15+
private var socket: Option[Socket] = None
16+
private var connected: Option[Boolean] = None
17+
private val listener = new StreamElementsListener
18+
listener.registerEventHandler((e, ct) => call(e)(ct)) // pass events to in/output.
19+
20+
override def start(): Boolean = {
21+
logger info "Connecting to the StreamElements websocket..."
22+
23+
val opts = new IO.Options()
24+
opts.transports = Array("websocket")
25+
26+
socket = Some(IO.socket(SOCKET_URL, opts).connect())
27+
registerSocketEvents(socket.get)
28+
29+
connected.synchronized {
30+
connected.wait(TIMEOUT)
31+
}
32+
33+
connected.getOrElse({
34+
logger warn "Could not connect to StreamElements socket: Timed out!"
35+
false
36+
})
37+
}
38+
39+
private def registerSocketEvents(s: Socket): Unit = {
40+
def setConnected(isConnected: Boolean): Unit = connected.synchronized {
41+
connected.notify()
42+
connected = Some(isConnected)
43+
}
44+
45+
s.on(EVENT_CONNECT, (_: Any) => {
46+
logger info "Successfully connected to the StreamElements websocket."
47+
48+
val authObj = new JSONObject()
49+
.put("method", "jwt")
50+
.put("token", credentials.get.getValue("jwt-token").get)
51+
52+
logger info "Authenticating with the StreamElements websocket..."
53+
s.emit("authenticate", authObj)
54+
})
55+
56+
s.on(EVENT_CONNECT_ERROR, (e: Array[AnyRef]) => {
57+
logger warn s"Could not connect to StreamElements socket:"
58+
logger warn e.mkString(", ")
59+
60+
setConnected(false)
61+
})
62+
63+
s.on(EVENT_CONNECT_TIMEOUT, (_: Any) => {
64+
setConnected(false)
65+
})
66+
67+
s.on(EVENT_ERROR, (e: Array[AnyRef]) => {
68+
logger warn s"StreamElements($sourceIdentifier) socket error:"
69+
logger warn e.mkString(", ")
70+
})
71+
72+
s.on(EVENT_DISCONNECT, (_: Any) => {
73+
logger info "Disconnected from the StreamElements websocket."
74+
})
75+
76+
s.on("authenticated", (_: Any) => {
77+
logger info "Successfully authenticated to the StreamElements websocket."
78+
79+
setConnected(true)
80+
})
81+
82+
s.on("event", (event: Array[AnyRef]) => listener.handleEvent(event))
83+
}
84+
85+
override def stop(): Boolean = {
86+
if (socket.isDefined) {
87+
socket.get.close()
88+
}
89+
connected = None
90+
socket = None
91+
true
92+
}
93+
}
94+
95+
object StreamElementsConnector {
96+
private[streamelements] sealed class StreamElementsEventJSON(json: JSONObject)
97+
private[streamelements] case class SubscriptionEventJSON(json: JSONObject) extends StreamElementsEventJSON(json)
98+
private[streamelements] case class DonationEventJSON(json: JSONObject) extends StreamElementsEventJSON(json)
99+
private[streamelements] case class FollowEventJSON(json: JSONObject) extends StreamElementsEventJSON(json)
100+
}
Lines changed: 38 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,38 @@
1+
package org.codeoverflow.chatoverflow.requirement.service.streamelements
2+
3+
import org.codeoverflow.chatoverflow.requirement.impl.EventManager
4+
import org.codeoverflow.chatoverflow.requirement.service.streamelements.StreamElementsConnector._
5+
import org.json.JSONObject
6+
7+
/**
8+
* Listener for websocket events that are emitted by the StreamElements websocket server.
9+
*/
10+
class StreamElementsListener extends EventManager {
11+
12+
def handleEvent(objects: Array[AnyRef]): Unit = {
13+
val json = objects(0).asInstanceOf[JSONObject]
14+
15+
val eventType = json.optString("type")
16+
if (eventType != null) {
17+
val provider = json.getString("provider")
18+
19+
eventType match {
20+
// Youtube's description differs from the usual ones.
21+
// Youtube's subscription is more like a follow of e.g. Twitch or Twitter, it is free for the user,
22+
// Youtube's sponsor is like a Twitch subscription, a paid extra for some perks and
23+
// Youtube's superchat is like a donation/tip, a donation of money that the streamer gets.
24+
// To unify this across all platforms a Youtube sub is a Follow, a Youtube sponsor is a subscription and
25+
// a Youtube superchat is a donation.
26+
case "subscription" if provider == "youtube" => call(FollowEventJSON(json))
27+
case "sponsor" => call(SubscriptionEventJSON(json))
28+
case "superchat" => call(DonationEventJSON(json))
29+
30+
// Twitch
31+
case "subscription" => call(SubscriptionEventJSON(json))
32+
case "tip" => call(DonationEventJSON(json))
33+
case "follow" => call(FollowEventJSON(json))
34+
case _ =>
35+
}
36+
}
37+
}
38+
}
Lines changed: 86 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,86 @@
1+
package org.codeoverflow.chatoverflow.requirement.service.streamelements.impl
2+
3+
import java.time.format.DateTimeFormatter
4+
import java.time.{LocalDateTime, OffsetDateTime, ZoneOffset}
5+
import java.util.Currency
6+
7+
import org.codeoverflow.chatoverflow.api.io.dto.User
8+
import org.codeoverflow.chatoverflow.api.io.dto.stat.stream.streamelements.{StreamElementsDonation, StreamElementsFollow, StreamElementsProvider, StreamElementsSubscription}
9+
import org.codeoverflow.chatoverflow.api.io.event.stream.streamelements.{StreamElementsDonationEvent, StreamElementsEvent, StreamElementsFollowEvent, StreamElementsSubscriptionEvent}
10+
import org.codeoverflow.chatoverflow.api.io.input.event.StreamElementsEventInput
11+
import org.codeoverflow.chatoverflow.registry.Impl
12+
import org.codeoverflow.chatoverflow.requirement.impl.EventInputImpl
13+
import org.codeoverflow.chatoverflow.requirement.service.streamelements.StreamElementsConnector
14+
import org.codeoverflow.chatoverflow.requirement.service.streamelements.StreamElementsConnector._
15+
import org.json.JSONObject
16+
17+
@Impl(impl = classOf[StreamElementsEventInput], connector = classOf[StreamElementsConnector])
18+
class StreamElementsEventInputImpl extends EventInputImpl[StreamElementsEvent, StreamElementsConnector] with StreamElementsEventInput {
19+
20+
override def start(): Boolean = {
21+
sourceConnector.get.registerEventHandler(onFollow _)
22+
sourceConnector.get.registerEventHandler(onSubscription _)
23+
sourceConnector.get.registerEventHandler(onDonation _)
24+
true
25+
}
26+
27+
private def onFollow(event: FollowEventJSON): Unit = {
28+
val json = event.json
29+
val data = json.getJSONObject("data")
30+
31+
val follow = new StreamElementsFollow(
32+
parseUser(data),
33+
parseTime(json),
34+
parseProvider(json)
35+
)
36+
call(new StreamElementsFollowEvent(follow))
37+
}
38+
39+
private def onSubscription(event: SubscriptionEventJSON): Unit = {
40+
val json = event.json
41+
val data = json.getJSONObject("data")
42+
43+
val sub = new StreamElementsSubscription(
44+
parseUser(data),
45+
data.getDouble("amount").toInt,
46+
parseTime(json),
47+
parseProvider(json),
48+
data.optBoolean("gifted", false)
49+
)
50+
call(new StreamElementsSubscriptionEvent(sub))
51+
}
52+
53+
private def onDonation(event: DonationEventJSON): Unit = {
54+
val json = event.json
55+
val data = json.getJSONObject("data")
56+
57+
val donation = new StreamElementsDonation(
58+
parseUser(data),
59+
data.getDouble("amount").toFloat,
60+
Currency.getInstance(data.getString("currency")),
61+
parseTime(json),
62+
data.getString("message")
63+
)
64+
call(new StreamElementsDonationEvent(donation))
65+
}
66+
67+
override def stop(): Boolean = {
68+
sourceConnector.get.unregisterAllEventListeners
69+
true
70+
}
71+
72+
// Common methods for JSON processing:
73+
74+
private def parseProvider(json: JSONObject): StreamElementsProvider = StreamElementsProvider.parse(json.getString("provider"))
75+
76+
private def parseUser(json: JSONObject): User = {
77+
val username = json.getString("username")
78+
val displayName = json.optString("displayName", username)
79+
new User(username, displayName)
80+
}
81+
82+
private def parseTime(json: JSONObject): OffsetDateTime = {
83+
val utcString = json.getString("createdAt")
84+
LocalDateTime.parse(utcString, DateTimeFormatter.ISO_DATE_TIME).atOffset(ZoneOffset.UTC)
85+
}
86+
}

0 commit comments

Comments
 (0)