Skip to content

Commit

Permalink
Send transaction (#73)
Browse files Browse the repository at this point in the history
  • Loading branch information
tmedetbekov committed Oct 26, 2018
1 parent b247ca6 commit 2bdfd1d
Show file tree
Hide file tree
Showing 5 changed files with 99 additions and 44 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ class TransactionMessage() : Message("tx") {
}

override fun getPayload(): ByteArray {
TODO("not implemented")
return transaction.toByteArray()
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -25,9 +25,8 @@ class Peer(val host: String, private val network: NetworkParameters, private val
}

private val peerConnection = PeerConnection(host, network, this)
private var relayedTransactions: MutableMap<ByteArray, Transaction> = mutableMapOf()

private var tasks = mutableListOf<PeerTask>()

var connected = false
var synced = false
var blockHashesSynced = false
Expand Down Expand Up @@ -64,12 +63,9 @@ class Peer(val host: String, private val network: NetworkParameters, private val
is TransactionMessage -> handleTransactionMessage(message)
is InvMessage -> handleInvMessage(message)
is GetDataMessage -> {

//handle relayed transactions
message.inventory.filter { it.type == InventoryItem.MSG_TX }.forEach {
relayedTransactions[it.hash]?.let { tx ->
peerConnection.sendMessage(TransactionMessage(tx))
relayedTransactions.remove(tx.hash)
for (inv in message.inventory) {
for (task in tasks) if (task.handleGetDataInventoryItem(inv)) {
break
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,10 +6,7 @@ import io.horizontalsystems.bitcoinkit.managers.BloomFilterManager
import io.horizontalsystems.bitcoinkit.models.InventoryItem
import io.horizontalsystems.bitcoinkit.models.MerkleBlock
import io.horizontalsystems.bitcoinkit.models.Transaction
import io.horizontalsystems.bitcoinkit.network.PeerTask.GetBlockHashesTask
import io.horizontalsystems.bitcoinkit.network.PeerTask.GetMerkleBlocksTask
import io.horizontalsystems.bitcoinkit.network.PeerTask.PeerTask
import io.horizontalsystems.bitcoinkit.network.PeerTask.RequestTransactionsTask
import io.horizontalsystems.bitcoinkit.network.PeerTask.*
import io.horizontalsystems.bitcoinkit.transactions.TransactionSyncer
import java.util.concurrent.ConcurrentHashMap
import java.util.concurrent.Executors
Expand All @@ -27,12 +24,15 @@ class PeerGroup(private val peerManager: PeerManager, val bloomFilterManager: Bl

private val logger = Logger.getLogger("PeerGroup")
private val peerMap = ConcurrentHashMap<String, Peer>()
private var pendingTransactions: MutableList<Transaction> = mutableListOf()

// @Volatile???
private var syncPeer: Peer? = null

@Volatile
private var running = false
private val syncPeerQueue = Executors.newSingleThreadExecutor()
private val localQueue = Executors.newSingleThreadExecutor()

init {
bloomFilterManager.listener = this
Expand Down Expand Up @@ -83,8 +83,6 @@ class PeerGroup(private val peerManager: PeerManager, val bloomFilterManager: Bl
}
}

private val syncPeerQueue = Executors.newSingleThreadExecutor()

override fun connected(peer: Peer) {
peerMap[peer.host] = peer
bloomFilterManager.bloomFilter?.let {
Expand Down Expand Up @@ -149,12 +147,15 @@ class PeerGroup(private val peerManager: PeerManager, val bloomFilterManager: Bl
}

fun relay(transaction: Transaction) {
TODO()
localQueue.execute {
pendingTransactions.add(transaction)
dispatchTasks()
}
}

override fun onReady(peer: Peer) {
if (peer == syncPeer) {
downloadBlockchain()
localQueue.execute {
dispatchTasks(peer)
}
}

Expand Down Expand Up @@ -225,6 +226,26 @@ class PeerGroup(private val peerManager: PeerManager, val bloomFilterManager: Bl
}
}

private fun dispatchTasks(peer: Peer? = null) {
if (peer != null)
return handleReady(peer)

peerMap.values.filter { it.ready }.forEach {
handleReady(it)
}
}

private fun handleReady(peer: Peer) {
if (!peer.ready)
return

if (peer == syncPeer)
return downloadBlockchain()

pendingTransactions.forEach { peer.addTask(RelayTransactionTask(it)) }
pendingTransactions = mutableListOf()
}

private fun handleRelayedTransaction(hash: ByteArray): Boolean {
return peerMap.any { (_, peer) -> peer.handleRelayedTransaction(hash) }
}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,30 @@
package io.horizontalsystems.bitcoinkit.network.PeerTask

import io.horizontalsystems.bitcoinkit.models.InventoryItem
import io.horizontalsystems.bitcoinkit.models.Transaction

class RelayTransactionTask(val transaction: Transaction) : PeerTask() {
override fun start() {
requester?.sendTransactionInventory(transaction.hash)
}

override fun handleGetDataInventoryItem(item: InventoryItem): Boolean {
if (item.type == InventoryItem.MSG_TX && item.hash.contentEquals(transaction.hash)) {
requester?.send(transaction)

return true
}

return false
}

override fun handleRelayedTransaction(hash: ByteArray): Boolean {
if (hash.contentEquals(transaction.hash)) {
delegate?.onTaskCompleted(this)

return true
}

return false
}
}
Original file line number Diff line number Diff line change
@@ -1,65 +1,61 @@
package io.horizontalsystems.bitcoinkit.network

import android.content.Context
import com.nhaarman.mockito_kotlin.whenever
import io.horizontalsystems.bitcoinkit.managers.BloomFilterManager
import io.realm.Realm
import io.realm.RealmConfiguration
import io.realm.internal.RealmCore
import io.horizontalsystems.bitcoinkit.models.Transaction
import io.horizontalsystems.bitcoinkit.network.PeerTask.RelayTransactionTask
import org.junit.Before
import org.junit.Test
import org.junit.runner.RunWith
import org.mockito.Mockito.*
import org.mockito.Mockito.mock
import org.mockito.Mockito.verify
import org.powermock.api.mockito.PowerMockito
import org.powermock.api.mockito.PowerMockito.mockStatic
import org.powermock.core.classloader.annotations.PrepareForTest
import org.powermock.modules.junit4.PowerMockRunner
import java.net.SocketTimeoutException

@RunWith(PowerMockRunner::class)
@PrepareForTest(PeerGroup::class, Realm::class, RealmConfiguration::class, RealmCore::class)
@PrepareForTest(PeerGroup::class)

class PeerGroupTest {
private lateinit var peerGroup: PeerGroup
private lateinit var peer: Peer
private lateinit var peer2: Peer
private lateinit var peerManager: PeerManager
private lateinit var bloomFilterManager: BloomFilterManager

private var peer1 = mock(Peer::class.java)
private var peer2 = mock(Peer::class.java)
private var peerManager = mock(PeerManager::class.java)
private var bloomFilterManager = mock(BloomFilterManager::class.java)
private var relayTransactionTask = mock(RelayTransactionTask::class.java)

private val peerIp = "8.8.8.8"
private val peerIp2 = "5.5.5.5"
private val network = MainNet()

@Before
fun setup() {
peerManager = mock(PeerManager::class.java)
bloomFilterManager = mock(BloomFilterManager::class.java)
peerGroup = PeerGroup(peerManager, bloomFilterManager, network, 2)
peer = mock(Peer::class.java)
peer2 = mock(Peer::class.java)
whenever(peer.host).thenReturn(peerIp)
whenever(peer1.host).thenReturn(peerIp)
whenever(peer2.host).thenReturn(peerIp2)

whenever(peerManager.getPeerIp())
.thenReturn(peerIp, peerIp2)

// Peer
PowerMockito.whenNew(Peer::class.java)
.withAnyArguments()
.thenReturn(peer, peer2)
.thenReturn(peer1, peer2)

// Realm initialize
mockStatic(Realm::class.java)
mockStatic(RealmConfiguration::class.java)
mockStatic(RealmCore::class.java)
// RelayTransactionTask
PowerMockito.whenNew(RelayTransactionTask::class.java)
.withAnyArguments()
.thenReturn(relayTransactionTask)

RealmCore.loadLibrary(any(Context::class.java))
peerGroup = PeerGroup(peerManager, bloomFilterManager, network, 2)
}

@Test
fun run() { // creates peer connection with given IP address
peerGroup.start()

Thread.sleep(500L)
verify(peer).start()
verify(peer1).start()

// close thread:
peerGroup.close()
Expand All @@ -69,8 +65,20 @@ class PeerGroupTest {

@Test
fun disconnected_withError() { // removes peer from connection list
peerGroup.disconnected(peer, SocketTimeoutException("Some Error"))
peerGroup.disconnected(peer1, SocketTimeoutException("Some Error"))

verify(peerManager).markFailed(peerIp)
}

@Test
fun relay() { // send transaction
whenever(peer1.ready).thenReturn(true)
peerGroup.connected(peer1)

val transaction = Transaction()
peerGroup.relay(transaction)

Thread.sleep(100) // wait thread executor
verify(peer1).addTask(relayTransactionTask)
}
}

0 comments on commit 2bdfd1d

Please sign in to comment.