Skip to content

Commit

Permalink
Wip: Node refactor
Browse files Browse the repository at this point in the history
  • Loading branch information
Christewart committed Feb 20, 2019
1 parent 6439bf1 commit b671122
Show file tree
Hide file tree
Showing 26 changed files with 580 additions and 250 deletions.
6 changes: 3 additions & 3 deletions node/src/main/scala/org/bitcoins/node/Main.scala
Expand Up @@ -20,10 +20,10 @@ object Main extends App {
val startHeader = BlockHeaderSyncActor.StartHeaders(Seq(gensisBlockHash))
Constants.database.executor*/
val blockHeaderSyncActor = BlockHeaderSyncActor(Constants.actorSystem,
/* val blockHeaderSyncActor = BlockHeaderSyncActor(Constants.actorSystem,
Constants.dbConfig,
Constants.networkParameters)
blockHeaderSyncActor ! StartAtLastSavedHeader
Constants.networkParameters)*/
//blockHeaderSyncActor ! StartAtLastSavedHeader
}

}
Expand Up @@ -5,7 +5,7 @@ import org.bitcoins.core.config.{MainNet, NetworkParameters, RegTest, TestNet3}
import org.bitcoins.core.protocol.blockchain.{ChainParams, MainNetChainParams, RegTestNetChainParams, TestNetChainParams}
import org.bitcoins.node.db.{DbConfig, MainNetDbConfig, RegTestDbConfig, TestNet3DbConfig}
import org.bitcoins.node.messages.control.VersionMessage
import org.bitcoins.node.versions.{ProtocolVersion70013, ProtocolVersion70015}
import org.bitcoins.node.versions.ProtocolVersion70013
import slick.jdbc.PostgresProfile.api._

import scala.concurrent.duration.DurationInt
Expand Down
Expand Up @@ -14,7 +14,7 @@ import scala.concurrent.{ExecutionContext, Future}
* You are responsible for the create function. You also need to specify
* the table and the database you are connecting to.
*/
trait CRUD[T, PrimaryKeyType] extends BitcoinSLogger {
abstract class CRUD[T, PrimaryKeyType] extends BitcoinSLogger {
implicit def ec: ExecutionContext

/** The table inside our database we are inserting into */
Expand Down
32 changes: 32 additions & 0 deletions node/src/main/scala/org/bitcoins/node/models/CRUDAutoInc.scala
@@ -0,0 +1,32 @@
package org.bitcoins.node.models
import slick.dbio.Effect.Write
import slick.jdbc.PostgresProfile.api._
import slick.lifted.TableQuery

import scala.concurrent.Future

abstract class CRUDAutoInc[T <: DbRowAutoInc[T]] extends CRUD[T,Long] {

/** The table inside our database we are inserting into */
val table: TableQuery[_ <: TableAutoInc[T]]


override def createAll(ts: Vector[T]): Future[Vector[T]] = {
val query = table
.returning(table.map(_.id))
.into((t, id) => t.copyWithId(id = id))
val actions: Vector[DBIOAction[query.SingleInsertResult, NoStream, Write]] =
ts.map(r => query.+=(r))
database.runVec(DBIO.sequence(actions))
}

override def findByPrimaryKeys(ids: Vector[Long]): Query[Table[_], T, Seq] = {
table.filter(_.id.inSet(ids))
}


override def findAll(ts: Vector[T]): Query[Table[_], T, Seq] = {
val ids = ts.filter(_.id.isDefined).map(_.id.get)
findByPrimaryKeys(ids)
}
}
18 changes: 17 additions & 1 deletion node/src/main/scala/org/bitcoins/node/models/ColumnMappers.scala
@@ -1,8 +1,9 @@
package org.bitcoins.node.models

import org.bitcoins.core.crypto.DoubleSha256Digest
import org.bitcoins.core.number.{Int32, UInt32}
import org.bitcoins.core.number.{Int32, UInt32, UInt64}
import org.bitcoins.core.protocol.transaction.TransactionOutput
import org.bitcoins.node.messages.control.ServiceIdentifier
import slick.jdbc.PostgresProfile.api._

/**
Expand Down Expand Up @@ -38,6 +39,21 @@ trait ColumnMappers {
TransactionOutput(_)
)
}

implicit val uint64Mapper: BaseColumnType[UInt64] = {
MappedColumnType.base[UInt64, BigDecimal](
{ u64: UInt64 => BigDecimal(u64.toBigInt.bigInteger) } ,
//this has the potential to throw
{ bigDec: BigDecimal => UInt64(bigDec.toBigIntExact().get) }
)
}

implicit val serviceIdentifierMapper: BaseColumnType[ServiceIdentifier] = {
MappedColumnType.base[ServiceIdentifier, UInt64](
_.num,
ServiceIdentifier(_)
)
}
}

object ColumnMappers extends ColumnMappers
17 changes: 17 additions & 0 deletions node/src/main/scala/org/bitcoins/node/models/DbRowAutoInc.scala
@@ -0,0 +1,17 @@
package org.bitcoins.node.models


/** This is meant to be coupled with [[org.bitcoins.node.models.CRUDAutoInc]]
* and [[TableAutoInc]] to allow for automatically incrementing an id
* when inserting something into a database. This removes the boiler
* boiler plate from this having to happen every where a [[CRUD]]
* is created
*/
abstract class DbRowAutoInc[T] {

def id: Option[Long]


def copyWithId(id: Long): T

}
24 changes: 24 additions & 0 deletions node/src/main/scala/org/bitcoins/node/models/Peer.scala
@@ -0,0 +1,24 @@
package org.bitcoins.node.models

import java.net.InetSocketAddress

import org.bitcoins.node.util.NetworkIpAddress

case class Peer(id: Option[Long], networkIpAddress: NetworkIpAddress) extends DbRowAutoInc[Peer] {

def socket: InetSocketAddress = new InetSocketAddress(networkIpAddress.address, networkIpAddress.port)


override def copyWithId(id: Long): Peer = {
this.copy(id = Some(id))
}


}


object Peer {
def fromNetworkIpAddress(networkIpAddress: NetworkIpAddress): Peer = {
Peer(None, networkIpAddress)
}
}
21 changes: 21 additions & 0 deletions node/src/main/scala/org/bitcoins/node/models/PeerDAO.scala
@@ -0,0 +1,21 @@
package org.bitcoins.node.models

import org.bitcoins.node.db.DbConfig
import slick.jdbc.PostgresProfile.api._

import scala.concurrent.ExecutionContext

abstract class PeerDAO extends CRUDAutoInc[Peer] {
override val table = TableQuery[PeerTable]
}


object PeerDAO {
private case class PeerDAOImpl(dbConfig: DbConfig) (override implicit val ec: ExecutionContext) extends PeerDAO



def apply(dbConfig: DbConfig)(implicit ec: ExecutionContext): PeerDAO = {
PeerDAOImpl(dbConfig)
}
}
10 changes: 10 additions & 0 deletions node/src/main/scala/org/bitcoins/node/models/PeerTable.scala
@@ -0,0 +1,10 @@
package org.bitcoins.node.models

import slick.jdbc.PostgresProfile.api._
import slick.lifted.Tag

class PeerTable(tag: Tag) extends TableAutoInc[Peer](tag, "peer_table") {


def * = ???
}
18 changes: 18 additions & 0 deletions node/src/main/scala/org/bitcoins/node/models/TableAutoInc.scala
@@ -0,0 +1,18 @@
package org.bitcoins.node.models


import slick.jdbc.PostgresProfile.api._


/** Defines a table that has an auto incremented fields that is named id.
* This is useful for things we want to store that don't have an
* inherent id such as a hash.
* @param tag
* @param tableName
* @tparam T
*/
abstract class TableAutoInc[T](tag: Tag, tableName: String) extends Table[T](tag,tableName) {

def id: Rep[Long] = column[Long]("id",O.PrimaryKey,O.AutoInc)

}
Expand Up @@ -10,7 +10,6 @@ import org.bitcoins.node.constant.Constants
import org.bitcoins.node.db.DbConfig
import org.bitcoins.node.messages.data.{GetDataMessage, Inventory}
import org.bitcoins.node.messages.{BlockMessage, MsgBlock}
import org.bitcoins.node.networking.peer.PeerMessageHandler

/**
* Created by chris on 7/10/16.
Expand All @@ -19,9 +18,10 @@ sealed abstract class BlockActor extends Actor with BitcoinSLogger {

def dbConfig: DbConfig

def peerMsgHandler: ActorRef

def receive: Receive = LoggingReceive {
case hash: DoubleSha256Digest =>
val peerMsgHandler = PeerMessageHandler(dbConfig)(context.system)
val inv = Inventory(MsgBlock, hash)
val getDataMessage = GetDataMessage(inv)
val networkMessage =
Expand All @@ -40,12 +40,14 @@ sealed abstract class BlockActor extends Actor with BitcoinSLogger {
}

object BlockActor {
private case class BlockActorImpl(dbConfig: DbConfig) extends BlockActor
private case class BlockActorImpl(peerMsgHandler: ActorRef, dbConfig: DbConfig) extends BlockActor

def props(dbConfig: DbConfig): Props = Props(classOf[BlockActorImpl], dbConfig)
def props(peerMsgHandler: ActorRef,dbConfig: DbConfig): Props = {
Props(classOf[BlockActorImpl], peerMsgHandler, dbConfig)
}

def apply(dbConfig: DbConfig)(implicit context: ActorContext): ActorRef = {
context.actorOf(props(dbConfig))
def apply(peerMsgHandler: ActorRef,dbConfig: DbConfig)(implicit context: ActorContext): ActorRef = {
context.actorOf(props(peerMsgHandler,dbConfig))
}

}
38 changes: 27 additions & 11 deletions node/src/main/scala/org/bitcoins/node/networking/Client.scala
Expand Up @@ -9,9 +9,7 @@ import org.bitcoins.core.util.BitcoinSLogger
import org.bitcoins.node.NetworkMessage
import org.bitcoins.node.constant.Constants
import org.bitcoins.node.messages.NetworkPayload
import org.bitcoins.node.util.BitcoinSpvNodeUtil
import org.bitcoins.node.NetworkMessage
import org.bitcoins.node.messages._
import org.bitcoins.node.networking.peer.PeerMessageReceiver
import org.bitcoins.node.util.BitcoinSpvNodeUtil
import scodec.bits.ByteVector

Expand All @@ -23,14 +21,14 @@ import scodec.bits.ByteVector
* with the p2p network. It's responsibly is to deal with low
* level [[Tcp.Message]].
*
* If the [[Client]] receives a [[NetworkMessage]], from a [[org.bitcoins.node.networking.peer.PeerMessageHandler]]
* If the [[Client]] receives a [[NetworkMessage]], from a [[org.bitcoins.node.networking.peer.PeerMessageSender]]
* it serializes the message to it to a [[akka.util.ByteString]] and then sends it to the [[manager]]
* which streams the data to our peer on the bitcoin network.
*
* If the [[Client]] receives a [[Tcp.Received]] message, it means we have received
* a message from our peer on the bitcoin p2p network. This means we try to parse
* the [[ByteString]] into a [[NetworkMessage]]. If we successfully parse the message
* we relay that message to the [[org.bitcoins.node.networking.peer.PeerMessageHandler]]
* we relay that message to the [[org.bitcoins.node.networking.peer.PeerMessageSender]]
* that created the Client Actor.
*
* In this class you will see a 'unalignedBytes' value passed around in a lot of methods
Expand All @@ -39,7 +37,15 @@ import scodec.bits.ByteVector
* CANNOT fit in a single tcp packet. This means we must cache
* the bytes and wait for the rest of them to be sent.
*/
sealed abstract class Client extends Actor with BitcoinSLogger {
sealed abstract class ClientActor extends Actor with BitcoinSLogger {


/** The place we send messages that we successfully parsed from our
* peer on the p2p network. This is mostly likely a [[org.bitcoins.node.networking.peer.PeerMessageSender]]
*
* @return
*/
def peerMsgHandlerReceiver: PeerMessageReceiver

/**
* The manager is an actor that handles the underlying low level I/O resources (selectors, channels)
Expand Down Expand Up @@ -155,7 +161,7 @@ sealed abstract class Client extends Actor with BitcoinSLogger {
//send them to 'context.parent' -- this is the
//PeerMessageHandler that is responsible for
//creating this Client Actor
messages.foreach(m => context.parent ! m)
messages.foreach(m => peerMsgHandlerReceiver.actor ! m)

newUnalignedBytes
}
Expand Down Expand Up @@ -189,11 +195,21 @@ sealed abstract class Client extends Actor with BitcoinSLogger {

}


case class Client(peer: ActorRef)


object Client {
private case class ClientImpl() extends Client
private case class ClientActorImpl(peerMsgHandlerReceiver: PeerMessageReceiver) extends ClientActor

def props: Props = Props(classOf[ClientImpl])
def props(peerMsgHandlerReceiver: PeerMessageReceiver): Props = Props(classOf[ClientActorImpl], peerMsgHandlerReceiver)

def apply(context: ActorContext, peerMessageReceiver: PeerMessageReceiver): Client = {
val peer = context.actorOf(
props(peerMsgHandlerReceiver = peerMessageReceiver),
BitcoinSpvNodeUtil.createActorName(this.getClass))

Client(peer)
}

def apply(context: ActorContext): ActorRef =
context.actorOf(props, BitcoinSpvNodeUtil.createActorName(this.getClass))
}
35 changes: 12 additions & 23 deletions node/src/main/scala/org/bitcoins/node/networking/PaymentActor.scala
Expand Up @@ -15,7 +15,6 @@ import org.bitcoins.node.db.DbConfig
import org.bitcoins.node.messages._
import org.bitcoins.node.messages.control.FilterLoadMessage
import org.bitcoins.node.messages.data.{GetDataMessage, Inventory}
import org.bitcoins.node.networking.peer.PeerMessageHandler
import org.bitcoins.node.util.BitcoinSpvNodeUtil

/**
Expand All @@ -33,10 +32,12 @@ import org.bitcoins.node.util.BitcoinSpvNodeUtil
* to our peer on the network to see if the tx was included on that block
* 7.) If it was, send the actor that that requested this [[PaymentActor.SuccessfulPayment]] message back
*/
sealed trait PaymentActor extends Actor with BitcoinSLogger {
sealed abstract class PaymentActor extends Actor with BitcoinSLogger {

def dbConfig: DbConfig

def peerMsgHandler: ActorRef

def receive = LoggingReceive {
case hash: Sha256Hash160Digest =>
paymentToHash(hash)
Expand All @@ -50,25 +51,9 @@ sealed trait PaymentActor extends Actor with BitcoinSLogger {
val bloomFilter =
BloomFilter(10, 0.0001, UInt32.zero, BloomUpdateNone).insert(hash)
val filterLoadMsg = FilterLoadMessage(bloomFilter)
val peerMsgHandler = PeerMessageHandler(dbConfig)(context.system)
val bloomFilterNetworkMsg =
NetworkMessage(Constants.networkParameters, filterLoadMsg)
peerMsgHandler ! bloomFilterNetworkMsg
logger.debug("Switching to awaitTransactionInventoryMessage")
context.become(awaitTransactionInventoryMessage(hash, peerMsgHandler))
}

/** Waits for a transaction inventory message on the p2p network,
* once we receive one we switch to teh awaitTransactionGetDataMessage context */
def awaitTransactionInventoryMessage(
hash: Sha256Hash160Digest,
peerMessageHandler: ActorRef): Receive = LoggingReceive {
case invMsg: InventoryMessage =>
//txs are broadcast by nodes on the network when they are seen by a node
//filter out the txs we do not care about
val txInventories = invMsg.inventories.filter(_.typeIdentifier == MsgTx)
handleTransactionInventoryMessages(txInventories, peerMessageHandler)
context.become(awaitTransactionGetDataMessage(hash, peerMessageHandler))
}

/** Awaits for a [[GetDataMessage]] that requested a transaction. We can also fire off more [[GetDataMessage]] inside of this context */
Expand Down Expand Up @@ -101,12 +86,16 @@ sealed trait PaymentActor extends Actor with BitcoinSLogger {
/** Sends a [[GetDataMessage]] to get the full transaction for a transaction inventory message */
private def handleTransactionInventoryMessages(
inventory: Seq[Inventory],
peerMessageHandler: ActorRef) =
peerMessageHandler: ActorRef): Unit = {
for {
txInv <- inventory
inventory = GetDataMessage(txInv)
} yield peerMessageHandler ! inventory

()
}


/** This context waits for a block announcement on the network,
* then constructs a [[MerkleBlockMessage]] to check
* if the txid was included in that block */
Expand Down Expand Up @@ -165,12 +154,12 @@ sealed trait PaymentActor extends Actor with BitcoinSLogger {
}

object PaymentActor {
private case class PaymentActorImpl(dbConfig: DbConfig) extends PaymentActor
private case class PaymentActorImpl(peerMsgHandler: ActorRef, dbConfig: DbConfig) extends PaymentActor

def props(dbConfig: DbConfig): Props = Props(classOf[PaymentActorImpl], dbConfig)
def props(peerMsgHandler: ActorRef, dbConfig: DbConfig): Props = Props(classOf[PaymentActorImpl], peerMsgHandler, dbConfig)

def apply(dbConfig: DbConfig)(implicit context: ActorRefFactory): ActorRef =
context.actorOf(props(dbConfig), BitcoinSpvNodeUtil.createActorName(this.getClass))
def apply(peerMsgHandler: ActorRef, dbConfig: DbConfig)(implicit context: ActorRefFactory): ActorRef =
context.actorOf(props(peerMsgHandler, dbConfig), BitcoinSpvNodeUtil.createActorName(this.getClass))

sealed trait PaymentActorMessage
case class SuccessfulPayment(
Expand Down

0 comments on commit b671122

Please sign in to comment.