Skip to content
This repository has been archived by the owner on Apr 13, 2022. It is now read-only.

Commit

Permalink
Merge pull request #367 from ScorexFoundation/comments1
Browse files Browse the repository at this point in the history
Readme update, Message scaladoc, externalizing MessageSerializer
  • Loading branch information
kushti committed Aug 6, 2020
2 parents 07d30ca + 7f6b30d commit 940842a
Show file tree
Hide file tree
Showing 3 changed files with 109 additions and 76 deletions.
5 changes: 3 additions & 2 deletions README.md
Expand Up @@ -17,7 +17,8 @@ Motivation
* you can fork codebase of a Bitcoin or Ethereum client. However, such clients are optimized towards concrete
protocol, thus implementing something different would be a cumbersome task.
* there are some modular frameworks, such as Scorex, where you can change consensus layer, or transactional
layer, or both. Still, these modules have concrete interfaces, so for many design
layer, or both. Still, these modules have concrete interfaces, so for many designs more low-level and abstract
approach was needed.

We have read a lot of research papers to make Scorex 2 supporting their implementations. Its abstract core
allows for implementing a broad range of systems, including ones with multiple types of blocks and non-linear history.
Expand All @@ -32,7 +33,7 @@ Features
* Asynchronous networking layer on top of TCP
* JSON API
* Cryptographic primitives externalized into [separate scrypto framework](https://github.com/input-output-hk/scrypto)
* Some examples provided, including once working in production
* Some examples provided, including one working in production

Documentation and Communication.
--------------------------------
Expand Down
100 changes: 26 additions & 74 deletions src/main/scala/scorex/core/network/message/Message.scala
@@ -1,112 +1,64 @@
package scorex.core.network.message

import java.nio.ByteOrder

import akka.actor.DeadLetterSuppression
import akka.util.ByteString
import scorex.core.network.{ConnectedPeer, MaliciousBehaviorException}
import scorex.crypto.hash.Blake2b256

import scorex.core.network.ConnectedPeer
import scala.util.{Success, Try}


/**
* Wrapper for a network message, whether come from external peer or generated locally
*
* @param spec - message specification
* @param input - message being wrapped, whether in byte-array form (if from outside),
* or structured data (if formed locally)
* @param source - source peer, if the message is from outside
* @tparam Content - message data type
*/
case class Message[Content](spec: MessageSpec[Content],
input: Either[Array[Byte], Content],
source: Option[ConnectedPeer])
extends DeadLetterSuppression {

import Message._

/**
* Message data bytes
*/
lazy val dataBytes: Array[Byte] = input match {
case Left(db) => db
case Right(d) => spec.toBytes(d)
}

/**
* Structured message content
*/
lazy val data: Try[Content] = input match {
case Left(db) => spec.parseBytesTry(db)
case Right(d) => Success(d)
}

lazy val dataLength: Int = dataBytes.length

/**
* @return serialized message length in bytes
*/
def messageLength: Int = {
if (dataLength > 0) HeaderLength + ChecksumLength + dataLength else HeaderLength
}
}

class MessageSerializer(specs: Seq[MessageSpec[_]], magicBytes: Array[Byte]) {

import Message.{ChecksumLength, HeaderLength, MagicLength}

import scala.language.existentials

private implicit val byteOrder: ByteOrder = ByteOrder.BIG_ENDIAN

private val specsMap = Map(specs.map(s => s.messageCode -> s): _*)
.ensuring(m => m.size == specs.size, "Duplicate message codes")

def serialize(obj: Message[_]): ByteString = {
val builder = ByteString.createBuilder
.putBytes(magicBytes)
.putByte(obj.spec.messageCode)
.putInt(obj.dataLength)

if (obj.dataLength > 0) {
val checksum = Blake2b256.hash(obj.dataBytes).take(ChecksumLength)
builder.putBytes(checksum).putBytes(obj.dataBytes)
}

builder.result()
}

//MAGIC ++ Array(spec.messageCode) ++ Ints.toByteArray(dataLength) ++ dataWithChecksum
def deserialize(byteString: ByteString, sourceOpt: Option[ConnectedPeer]): Try[Option[Message[_]]] = Try {
if (byteString.length < HeaderLength) {
None
if (dataLength > 0) {
HeaderLength + ChecksumLength + dataLength
} else {
val it = byteString.iterator
val magic = it.getBytes(MagicLength)
val msgCode = it.getByte
val length = it.getInt

//peer is from another network
if (!java.util.Arrays.equals(magic, magicBytes)) {
throw MaliciousBehaviorException(s"Wrong magic bytes, expected ${magicBytes.mkString}, got ${magic.mkString}")
}

//peer is trying to cause buffer overflow or breaking the parsing
if (length < 0) {
throw MaliciousBehaviorException("Data length is negative!")
}

val spec = specsMap.getOrElse(msgCode, throw new Error(s"No message handler found for $msgCode"))

if (length != 0 && byteString.length < length + HeaderLength + ChecksumLength) {
None
} else {
val msgData = if (length > 0) {
val checksum = it.getBytes(ChecksumLength)
val data = it.getBytes(length)
val digest = Blake2b256.hash(data).take(ChecksumLength)

//peer reported incorrect checksum
if (!java.util.Arrays.equals(checksum, digest)) {
throw MaliciousBehaviorException(s"Wrong checksum, expected ${checksum.mkString}, got ${checksum.mkString}")
}
data
} else {
Array.empty[Byte]
}

Some(Message(spec, Left(msgData), sourceOpt))
}
HeaderLength
}
}

}

object Message {
type MessageCode = Byte

val MagicLength: Int = 4

val ChecksumLength: Int = 4

val HeaderLength: Int = MagicLength + 5
}
80 changes: 80 additions & 0 deletions src/main/scala/scorex/core/network/message/MessageSerializer.scala
@@ -0,0 +1,80 @@
package scorex.core.network.message

import java.nio.ByteOrder

import akka.util.ByteString
import scorex.core.network.{ConnectedPeer, MaliciousBehaviorException}
import scorex.crypto.hash.Blake2b256
import scala.util.Try


class MessageSerializer(specs: Seq[MessageSpec[_]], magicBytes: Array[Byte]) {

import Message.{ChecksumLength, HeaderLength, MagicLength}

import scala.language.existentials

private implicit val byteOrder: ByteOrder = ByteOrder.BIG_ENDIAN

private val specsMap = Map(specs.map(s => s.messageCode -> s): _*)
.ensuring(m => m.size == specs.size, "Duplicate message codes")

def serialize(obj: Message[_]): ByteString = {
val builder = ByteString.createBuilder
.putBytes(magicBytes)
.putByte(obj.spec.messageCode)
.putInt(obj.dataLength)

if (obj.dataLength > 0) {
val checksum = Blake2b256.hash(obj.dataBytes).take(ChecksumLength)
builder.putBytes(checksum).putBytes(obj.dataBytes)
}

builder.result()
}

//MAGIC ++ Array(spec.messageCode) ++ Ints.toByteArray(dataLength) ++ dataWithChecksum
def deserialize(byteString: ByteString, sourceOpt: Option[ConnectedPeer]): Try[Option[Message[_]]] = Try {
if (byteString.length < HeaderLength) {
None
} else {
val it = byteString.iterator
val magic = it.getBytes(MagicLength)
val msgCode = it.getByte
val length = it.getInt

//peer is from another network
if (!java.util.Arrays.equals(magic, magicBytes)) {
throw MaliciousBehaviorException(s"Wrong magic bytes, expected ${magicBytes.mkString}, got ${magic.mkString}")
}

//peer is trying to cause buffer overflow or breaking the parsing
if (length < 0) {
throw MaliciousBehaviorException("Data length is negative!")
}

val spec = specsMap.getOrElse(msgCode, throw new Error(s"No message handler found for $msgCode"))

if (length != 0 && byteString.length < length + HeaderLength + ChecksumLength) {
None
} else {
val msgData = if (length > 0) {
val checksum = it.getBytes(ChecksumLength)
val data = it.getBytes(length)
val digest = Blake2b256.hash(data).take(ChecksumLength)

//peer reported incorrect checksum
if (!java.util.Arrays.equals(checksum, digest)) {
throw MaliciousBehaviorException(s"Wrong checksum, expected ${checksum.mkString}, got ${checksum.mkString}")
}
data
} else {
Array.empty[Byte]
}

Some(Message(spec, Left(msgData), sourceOpt))
}
}
}

}

0 comments on commit 940842a

Please sign in to comment.