Skip to content

Commit

Permalink
Refactor messages, add ping & pong
Browse files Browse the repository at this point in the history
  • Loading branch information
lavrov committed Nov 13, 2022
1 parent 59dc0e1 commit 2fab204
Show file tree
Hide file tree
Showing 4 changed files with 45 additions and 46 deletions.
2 changes: 1 addition & 1 deletion .github/workflows/release.yml
Expand Up @@ -8,6 +8,7 @@ on:
jobs:

release:
if: startsWith(github.ref, 'refs/tags/v')
runs-on: ubuntu-latest
steps:

Expand All @@ -21,7 +22,6 @@ jobs:
java-package: jre

- name: Publish
if: startsWith(github.ref, 'refs/tags/v')
env:
SONATYPE_CREDS: ${{ secrets.SONATYPE_CREDS }}
PGP_SECRET_KEY: ${{ secrets.PGP_SECRET_KEY }}
Expand Down
2 changes: 1 addition & 1 deletion build.sbt
@@ -1,6 +1,6 @@
import sbt.Keys.credentials

lazy val root = project.in(file("."))
lazy val root = project.in(file(".")).aggregate(server)

lazy val protocol = crossProject(JSPlatform, JVMPlatform).crossType(CrossType.Pure)
.settings(commonSettings ++ publishSettings)
Expand Down
Expand Up @@ -4,19 +4,15 @@ import com.github.lavrov.bittorrent.InfoHash
import scodec.bits.ByteVector
import upickle.default.{macroRW, ReadWriter}

sealed trait Command
object Command {
case class RequestTorrent(infoHash: InfoHash, trackers: List[String]) extends Command
sealed trait Message
sealed trait Command extends Message
sealed trait Event extends Message

import CommonFormats.*
implicit val rw: ReadWriter[Command] =
ReadWriter.merge(
macroRW[RequestTorrent],
)
}
object Message {

sealed trait Event
object Event {
case object Ping extends Message
case object Pong extends Message
case class RequestTorrent(infoHash: InfoHash, trackers: List[String]) extends Command
case class RequestAccepted(infoHash: InfoHash) extends Event

case class TorrentPeersDiscovered(infoHash: InfoHash, connected: Int) extends Event
Expand All @@ -27,23 +23,21 @@ object Event {

case class TorrentStats(infoHash: InfoHash, connected: Int, availability: List[Double]) extends Event

import CommonFormats.*
implicit val infoHashRW: ReadWriter[InfoHash] =
implicitly[ReadWriter[String]].bimap(
infoHash => infoHash.bytes.toHex,
string => InfoHash(ByteVector.fromValidHex(string))
)
implicit val fileRW: ReadWriter[File] = macroRW
implicit val eventRW: ReadWriter[Event] =
implicit val eventRW: ReadWriter[Message] =
ReadWriter.merge(
macroRW[Ping.type],
macroRW[Pong.type],
macroRW[RequestTorrent],
macroRW[RequestAccepted],
macroRW[TorrentPeersDiscovered],
macroRW[TorrentMetadataReceived],
macroRW[TorrentError],
macroRW[TorrentStats],
)
}

object CommonFormats {

implicit val infoHashRW: ReadWriter[InfoHash] =
implicitly[ReadWriter[String]].bimap(
infoHash => infoHash.bytes.toHex,
string => InfoHash(ByteVector.fromValidHex(string))
)
}
}
47 changes: 26 additions & 21 deletions server/src/main/scala/SocketSession.scala
Expand Up @@ -4,12 +4,15 @@ import cats.effect.std.Queue
import cats.effect.std.Supervisor
import cats.implicits.*
import com.github.lavrov.bittorrent.InfoHash
import com.github.lavrov.bittorrent.app.protocol.{Command, Event}
import com.github.lavrov.bittorrent.app.protocol.{Message, Event, Command}
import fs2.Stream
import org.legogroup.woof.{Logger, given}
import org.http4s.Response
import org.http4s.server.websocket.WebSocketBuilder
import org.http4s.websocket.WebSocketFrame
import cps.*
import cps.syntax.*
import cps.monads.catsEffect.{*, given}

import scala.concurrent.duration.*
import scala.util.Try
Expand All @@ -30,9 +33,9 @@ object SocketSession {
input <- Queue.unbounded[IO, WebSocketFrame]
output <- Queue.unbounded[IO, WebSocketFrame]
send = (str: String) => output.offer(WebSocketFrame.Text(str))
sendEvent = (e: Event) => send(upickle.default.write(e))
(handler, closeHandler) <- CommandHandler(sendEvent, makeTorrent, metadataRegistry, torrentIndex).allocated
fiber <- processor(input, send, handler).compile.drain.start
sendMessage = (m: Message) => send(upickle.default.write(m))
(handler, closeHandler) <- CommandHandler(sendMessage, makeTorrent, metadataRegistry, torrentIndex).allocated
fiber <- processor(input, sendMessage, handler).compile.drain.start
pingFiber <- (IO.sleep(10.seconds) >> input.offer(WebSocketFrame.Ping())).foreverM.start
response <- webSocketBuilder
.withOnClose(fiber.cancel >> pingFiber.cancel >> closeHandler >> logger.info("Session closed"))
Expand All @@ -44,22 +47,24 @@ object SocketSession {

private def processor(
input: Queue[IO, WebSocketFrame],
send: String => IO[Unit],
send: Message => IO[Unit],
commandHandler: CommandHandler
)(implicit logger: Logger[IO]): Stream[IO, Unit] =
Stream.fromQueueUnterminated(input).evalMap {
case WebSocketFrame.Text("ping", _) =>
send("pong")
case WebSocketFrame.Text(Cmd(command), _) =>
for
_ <- logger.debug(s"Received $command")
_ <- commandHandler.handle(command)
yield ()
case WebSocketFrame.Text(JsonMessage(message), _) =>
message match
case Message.Ping =>
send(Message.Pong)
case command: Command => async[IO] {
!logger.debug(s"Received $command")
!commandHandler.handle(command)
}
case _ => IO.unit
case _ => IO.unit
}

private val Cmd: PartialFunction[String, Command] =
((input: String) => Try(upickle.default.read[Command](input)).toOption).unlift
private val JsonMessage: PartialFunction[String, Message] =
((input: String) => Try(upickle.default.read[Message](input)).toOption).unlift

class CommandHandler(
send: Event => IO[Unit],
Expand All @@ -74,9 +79,9 @@ object SocketSession {

def handle(command: Command): IO[Unit] =
command match
case Command.RequestTorrent(infoHash, trackers) =>
case Message.RequestTorrent(infoHash, trackers) =>
for
_ <- send(Event.RequestAccepted(infoHash))
_ <- send(Message.RequestAccepted(infoHash))
_ <- handleRequestTorrent(InfoHash(infoHash.bytes), trackers)
yield ()

Expand All @@ -91,7 +96,7 @@ object SocketSession {
.flatMap { (phase: ServerTorrent.Phase.FetchingMetadata) =>
phase.fromPeers.discrete
.evalTap { count =>
send(Event.TorrentPeersDiscovered(infoHash, count))
send(Message.TorrentPeersDiscovered(infoHash, count))
}
.interruptWhen(phase.done.void.attempt)
.compile
Expand All @@ -100,8 +105,8 @@ object SocketSession {
}
.flatMap { (phase: ServerTorrent.Phase.Ready) =>
val metadata = phase.serverTorrent.metadata.parsed
val files = metadata.files.map(f => Event.File(f.path, f.length))
send(Event.TorrentMetadataReceived(infoHash, metadata.name, files)) >>
val files = metadata.files.map(f => Message.File(f.path, f.length))
send(Message.TorrentMetadataReceived(infoHash, metadata.name, files)) >>
phase.serverTorrent.pure[IO]
}
.timeout(5.minutes)
Expand All @@ -110,15 +115,15 @@ object SocketSession {
}
}
.orElse {
send(Event.TorrentError(infoHash, "Could not fetch metadata"))
send(Message.TorrentError(infoHash, "Could not fetch metadata"))
}
).void

private def sendTorrentStats(infoHash: InfoHash, torrent: ServerTorrent): IO[Nothing] =
val sendStats =
for
stats <- torrent.stats
_ <- send(Event.TorrentStats(infoHash, stats.connected, stats.availability))
_ <- send(Message.TorrentStats(infoHash, stats.connected, stats.availability))
_ <- IO.sleep(5.seconds)
yield ()
sendStats.foreverM
Expand Down

0 comments on commit 2fab204

Please sign in to comment.