-
Notifications
You must be signed in to change notification settings - Fork 0
/
SocketSession.scala
157 lines (146 loc) · 5.21 KB
/
SocketSession.scala
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
import cats.effect.kernel.Deferred
import cats.effect.{Concurrent, IO, Resource}
import cats.effect.std.Queue
import cats.effect.std.Supervisor
import cats.implicits.*
import com.github.lavrov.bittorrent.InfoHash
import com.github.lavrov.bittorrent.app.protocol.{Message, Event, Command}
import fs2.Stream
import org.legogroup.woof.{Logger, given}
import org.http4s.Response
import org.http4s.server.websocket.WebSocketBuilder
import org.http4s.websocket.WebSocketFrame
import cps.*
import cps.syntax.*
import cps.monads.catsEffect.{*, given}
import scala.concurrent.duration.*
import scala.util.Try
object SocketSession {
/** Builds the WebSocket response for a single client session.
  *
  * Two unbounded queues decouple the socket from the session logic: frames
  * received from the client are offered to `input`, and everything offered to
  * `output` is streamed back to the client. A [[CommandHandler]] is allocated
  * for the session, and two background fibers are started: one running
  * `processor` over `input`, one emitting a ping frame every 10 seconds.
  * When the socket closes, both fibers are cancelled and the handler's
  * finalizer is run.
  *
  * @param makeTorrent      factory passed through to the [[CommandHandler]]
  * @param metadataRegistry passed through to the [[CommandHandler]]
  * @param torrentIndex     passed through to the [[CommandHandler]]
  * @param webSocketBuilder http4s builder used to create the response
  * @return the WebSocket response wired to this session's queues
  */
def apply(
  makeTorrent: MakeTorrent,
  metadataRegistry: MetadataRegistry[IO],
  torrentIndex: TorrentIndex,
  webSocketBuilder: WebSocketBuilder[IO]
)(implicit
  F: Concurrent[IO],
  logger: Logger[IO]
): IO[Response[IO]] =
  for
    _ <- logger.info("Session started")
    input <- Queue.unbounded[IO, WebSocketFrame]
    output <- Queue.unbounded[IO, WebSocketFrame]
    send = (str: String) => output.offer(WebSocketFrame.Text(str))
    sendMessage = (m: Message) => send(upickle.default.write(m))
    // `allocated` hands back the handler together with its finalizer; the
    // finalizer is invoked from the on-close hook below.
    (handler, closeHandler) <- CommandHandler(sendMessage, makeTorrent, metadataRegistry, torrentIndex).allocated
    fiber <- processor(input, sendMessage, handler).compile.drain.start
    // Keep-alive pings go on the outbound queue so they are actually written
    // to the client. Offering them to `input` only fed them to `processor`,
    // which ignores every non-text frame, so no ping was ever sent.
    pingFiber <- (IO.sleep(10.seconds) >> output.offer(WebSocketFrame.Ping())).foreverM.start
    response <- webSocketBuilder
      .withOnClose(fiber.cancel >> pingFiber.cancel >> closeHandler >> logger.info("Session closed"))
      .build(
        Stream.fromQueueUnterminated(output),
        _.evalMap(input.offer),
      )
  yield response
/** Consumes inbound frames and reacts to the protocol messages they carry.
  *
  * Only text frames whose payload decodes via `JsonMessage` are acted on:
  * a `Message.Ping` is answered with `Message.Pong`, a [[Command]] is logged
  * and passed to `commandHandler`. Every other frame or message is ignored.
  *
  * A failure while handling one command is logged and swallowed here, because
  * a failed effect inside `evalMap` would otherwise fail the whole stream and
  * leave the session unable to process any later command.
  *
  * @param input          queue of frames received from the client
  * @param send           sends a protocol message back to the client
  * @param commandHandler handler invoked for every decoded command
  */
private def processor(
  input: Queue[IO, WebSocketFrame],
  send: Message => IO[Unit],
  commandHandler: CommandHandler
)(implicit logger: Logger[IO]): Stream[IO, Unit] =
  Stream.fromQueueUnterminated(input).evalMap {
    case WebSocketFrame.Text(JsonMessage(message), _) =>
      message match
        case Message.Ping =>
          send(Message.Pong)
        case command: Command =>
          val handled: IO[Unit] = async[IO] {
            !logger.debug(s"Received $command")
            !commandHandler.handle(command)
          }
          // Isolate the failure to this command so the stream keeps running.
          handled.handleErrorWith { error =>
            logger.error(s"Failed to handle $command: $error")
          }
        case _ => IO.unit
    case _ => IO.unit
  }
/** Partial function, usable as a pattern extractor, that is defined exactly
  * for the strings upickle can decode into a [[Message]]. Input whose decoding
  * throws a non-fatal exception is treated as "not defined".
  */
private val JsonMessage: PartialFunction[String, Message] =
  Function.unlift { (raw: String) =>
    Try(upickle.default.read[Message](raw)).toOption
  }
/** Executes client commands for one session.
  *
  * Work started on behalf of a command runs on fibers owned by `supervisor`.
  *
  * @param send             delivers an event to the client
  * @param makeTorrent      factory for the torrent resource of a request
  * @param metadataRegistry not used by the code in this class
  * @param torrentIndex     not used by the code in this class
  * @param supervisor       owner of the background fibers started here
  */
class CommandHandler(
  send: Event => IO[Unit],
  makeTorrent: MakeTorrent,
  metadataRegistry: MetadataRegistry[IO],
  torrentIndex: TorrentIndex,
  supervisor: Supervisor[IO]
)(implicit
  F: Concurrent[IO],
  logger: Logger[IO]
) {

  /** Acknowledges a torrent request and starts serving it in the background.
    *
    * Completes once the acknowledgement was sent and the background fiber was
    * started; it does not wait for the torrent itself.
    */
  def handle(command: Command): IO[Unit] =
    command match
      case Message.RequestTorrent(infoHash, trackers) =>
        for
          _ <- send(Message.RequestAccepted(infoHash))
          _ <- handleRequestTorrent(InfoHash(infoHash.bytes), trackers)
        yield ()

  /** Starts a supervised fiber that walks the torrent through its phases,
    * reporting progress to the client, and then streams statistics forever.
    *
    * Any failure (including the 5 minute timeout) is logged and reported to
    * the client as a `TorrentError` event.
    */
  private def handleRequestTorrent(infoHash: InfoHash, trackers: List[String]): IO[Unit] =
    supervisor.supervise(
      makeTorrent(infoHash, trackers)
        .use { getTorrent =>
          getTorrent
            .flatMap { (phase: ServerTorrent.Phase.PeerDiscovery) =>
              phase.done
            }
            .flatMap { (phase: ServerTorrent.Phase.FetchingMetadata) =>
              // Forward every peer-count update until `phase.done` finishes
              // (successfully or not), then move on to its result.
              phase.fromPeers.discrete
                .evalTap { count =>
                  send(Message.TorrentPeersDiscovered(infoHash, count))
                }
                .interruptWhen(phase.done.void.attempt)
                .compile
                .drain >>
              phase.done
            }
            .flatMap { (phase: ServerTorrent.Phase.Ready) =>
              val metadata = phase.serverTorrent.metadata.parsed
              val files = metadata.files.map(f => Message.File(f.path, f.length))
              send(Message.TorrentMetadataReceived(infoHash, metadata.name, files)) >>
              phase.serverTorrent.pure[IO]
            }
            // The timeout covers only the phases above; the statistics loop
            // below is attached afterwards and is not subject to it.
            .timeout(5.minutes)
            .flatMap { torrent =>
              sendTorrentStats(infoHash, torrent)
            }
        }
        .handleErrorWith { error =>
          // Record the real cause before telling the client; previously the
          // error was discarded without being logged.
          logger.error(s"Torrent request for $infoHash failed: $error") >>
          send(Message.TorrentError(infoHash, "Could not fetch metadata"))
        }
    ).void

  /** Sends the torrent's connection statistics to the client every 5 seconds, without end. */
  private def sendTorrentStats(infoHash: InfoHash, torrent: ServerTorrent): IO[Nothing] =
    val sendStats =
      for
        stats <- torrent.stats
        _ <- send(Message.TorrentStats(infoHash, stats.connected, stats.availability))
        _ <- IO.sleep(5.seconds)
      yield ()
    sendStats.foreverM
}
object CommandHandler {

  /** Creates a [[CommandHandler]] together with the `Supervisor` it runs its
    * background fibers on. The handler is only valid while the returned
    * resource is open.
    *
    * @param send             delivers an event to the client
    * @param makeTorrent      factory handed to the new handler
    * @param metadataRegistry handed to the new handler
    * @param torrentIndex     handed to the new handler
    */
  def apply(
    send: Event => IO[Unit],
    makeTorrent: MakeTorrent,
    metadataRegistry: MetadataRegistry[IO],
    torrentIndex: TorrentIndex
  )(implicit
    F: Concurrent[IO],
    logger: Logger[IO]
  ): Resource[IO, CommandHandler] =
    Supervisor[IO].map { fiberSupervisor =>
      new CommandHandler(send, makeTorrent, metadataRegistry, torrentIndex, fiberSupervisor)
    }
}
/** Factory for the torrent that serves one client request.
  *
  * The result is a `Resource` whose value is itself an `IO` producing the
  * first phase, [[ServerTorrent.Phase.PeerDiscovery]].
  */
trait MakeTorrent {
/** @param infoHash identifies the requested torrent
  * @param trackers tracker strings supplied with the client's request
  *                 (their exact format is not visible here)
  */
def apply(infoHash: InfoHash, trackers: List[String]): Resource[IO, IO[ServerTorrent.Phase.PeerDiscovery]]
}
}