Skip to content

Commit

Permalink
Merge branch 'network-clock-improve' into 'master'
Browse files Browse the repository at this point in the history
Network clock improve

See merge request open-platform/chain!245
  • Loading branch information
evgeny-krylov committed Oct 17, 2018
2 parents d768295 + 3b51348 commit f03ef1c
Show file tree
Hide file tree
Showing 29 changed files with 315 additions and 236 deletions.
Expand Up @@ -5,8 +5,8 @@ import io.openfuture.chain.consensus.service.EpochService
import io.openfuture.chain.core.component.NodeKeyHolder
import io.openfuture.chain.core.service.GenesisBlockService
import io.openfuture.chain.core.service.MainBlockService
import io.openfuture.chain.core.sync.SyncStatus
import io.openfuture.chain.core.sync.SyncStatus.SyncStatusType.SYNCHRONIZED
import io.openfuture.chain.core.sync.SyncState
import io.openfuture.chain.core.sync.SyncState.SyncStatusType.SYNCHRONIZED
import org.slf4j.Logger
import org.slf4j.LoggerFactory
import org.springframework.stereotype.Component
Expand All @@ -23,7 +23,7 @@ class BlockProductionScheduler(
private val genesisBlockService: GenesisBlockService,
private val pendingBlockHandler: PendingBlockHandler,
private val consensusProperties: ConsensusProperties,
private val syncStatus: SyncStatus
private val syncState: SyncState
) {

companion object {
Expand All @@ -40,11 +40,11 @@ class BlockProductionScheduler(
}

private fun proceedProductionLoop() {
if (SYNCHRONIZED != syncStatus.getSyncStatus()) {
return
}

try {
if (SYNCHRONIZED != syncState.getNodeStatus()) {
log.warn("Node is not synchronized")
return
}
val slotOwner = epochService.getCurrentSlotOwner()
if (genesisBlockService.isGenesisBlockRequired()) {
val genesisBlock = genesisBlockService.create()
Expand Down
Expand Up @@ -4,7 +4,7 @@ import io.openfuture.chain.consensus.property.ConsensusProperties
import io.openfuture.chain.core.model.entity.Delegate
import io.openfuture.chain.core.model.entity.block.MainBlock
import io.openfuture.chain.core.service.GenesisBlockService
import io.openfuture.chain.network.component.NodeClock
import io.openfuture.chain.network.component.time.Clock
import org.springframework.stereotype.Service
import org.springframework.transaction.annotation.Transactional
import java.util.*
Expand All @@ -13,7 +13,7 @@ import java.util.*
class DefaultEpochService(
private val genesisBlockService: GenesisBlockService,
private val properties: ConsensusProperties,
private val clock: NodeClock
private val clock: Clock
) : EpochService {

override fun getEpochStart(): Long = genesisBlockService.getLast().timestamp
Expand All @@ -27,7 +27,7 @@ class DefaultEpochService(
@Transactional(readOnly = true)
override fun getCurrentSlotOwner(): Delegate {
val genesisBlock = genesisBlockService.getLast()
val random = Random(genesisBlock.height + getSlotNumber(clock.networkTime()))
val random = Random(genesisBlock.height + getSlotNumber(clock.currentTimeMillis()))
return genesisBlock.payload.activeDelegates.shuffled(random).first()
}

Expand All @@ -36,12 +36,12 @@ class DefaultEpochService(

override fun isInIntermission(time: Long): Boolean = (getTimeSlotFromStart(time) >= properties.timeSlotDuration!!)

override fun timeToNextTimeSlot(): Long = (getFullTimeSlotDuration() - getTimeSlotFromStart(clock.networkTime()))
override fun timeToNextTimeSlot(): Long = (getFullTimeSlotDuration() - getTimeSlotFromStart(clock.currentTimeMillis()))

override fun getSlotNumber(time: Long): Long = ((time - getEpochStart()) / getFullTimeSlotDuration())

override fun getEpochEndTime(): Long =
(getEpochStart() + getSlotNumber(clock.networkTime()) * getFullTimeSlotDuration())
(getEpochStart() + getSlotNumber(clock.currentTimeMillis()) * getFullTimeSlotDuration())

override fun getFullTimeSlotDuration(): Long = (properties.timeSlotDuration!! + properties.timeSlotInterval!!)

Expand Down
Expand Up @@ -2,8 +2,8 @@ package io.openfuture.chain.core.aspect

import io.openfuture.chain.core.annotation.BlockchainSynchronized
import io.openfuture.chain.core.exception.SynchronizationException
import io.openfuture.chain.core.sync.SyncStatus
import io.openfuture.chain.core.sync.SyncStatus.SyncStatusType.SYNCHRONIZED
import io.openfuture.chain.core.sync.SyncState
import io.openfuture.chain.core.sync.SyncState.SyncStatusType.SYNCHRONIZED
import org.aspectj.lang.annotation.Aspect
import org.aspectj.lang.annotation.Before
import org.springframework.stereotype.Component
Expand All @@ -12,13 +12,13 @@ import org.springframework.stereotype.Component
@Aspect
@Component
class BlockchainSynchronizationAspect(
private val syncStatus: SyncStatus
private val syncStatus: SyncState
) {

@Before("@annotation(annotation)")
fun annotated(annotation: BlockchainSynchronized) {
if (syncStatus.getSyncStatus() != SYNCHRONIZED) {
throw SynchronizationException("Application is not synchronized! Current sync status: ${syncStatus.getSyncStatus()}!")
if (syncStatus.getChainStatus() != SYNCHRONIZED) {
throw SynchronizationException("Application is not synchronized! Current sync status: ${syncStatus.getChainStatus()}!")
}
}

Expand Down
Expand Up @@ -12,8 +12,8 @@ import io.openfuture.chain.core.service.DefaultDelegateService
import io.openfuture.chain.core.service.GenesisBlockService
import io.openfuture.chain.core.service.WalletService
import io.openfuture.chain.core.sync.BlockchainLock
import io.openfuture.chain.core.sync.SyncStatus
import io.openfuture.chain.core.sync.SyncStatus.SyncStatusType.NOT_SYNCHRONIZED
import io.openfuture.chain.core.sync.SyncState
import io.openfuture.chain.core.sync.SyncState.SyncStatusType.NOT_SYNCHRONIZED
import io.openfuture.chain.crypto.util.SignatureUtils
import io.openfuture.chain.network.message.sync.GenesisBlockMessage
import io.openfuture.chain.rpc.domain.base.PageRequest
Expand All @@ -29,7 +29,7 @@ class DefaultGenesisBlockService(
delegateService: DefaultDelegateService,
repository: GenesisBlockRepository,
private val keyHolder: NodeKeyHolder,
private val syncStatus: SyncStatus,
private val syncStatus: SyncState,
private val consensusProperties: ConsensusProperties
) : BaseBlockService<GenesisBlock>(repository, blockService, walletService, delegateService), GenesisBlockService {

Expand Down Expand Up @@ -91,7 +91,7 @@ class DefaultGenesisBlockService(
val block = GenesisBlock.of(message, delegates)

if (!isSync(block)) {
syncStatus.setSyncStatus(NOT_SYNCHRONIZED)
syncStatus.setChainStatus(NOT_SYNCHRONIZED)
return
}

Expand Down
Expand Up @@ -16,11 +16,11 @@ import io.openfuture.chain.core.model.entity.transaction.unconfirmed.Unconfirmed
import io.openfuture.chain.core.repository.MainBlockRepository
import io.openfuture.chain.core.service.*
import io.openfuture.chain.core.sync.BlockchainLock
import io.openfuture.chain.core.sync.SyncStatus
import io.openfuture.chain.core.sync.SyncStatus.SyncStatusType.NOT_SYNCHRONIZED
import io.openfuture.chain.core.sync.SyncState
import io.openfuture.chain.core.sync.SyncState.SyncStatusType.NOT_SYNCHRONIZED
import io.openfuture.chain.crypto.util.HashUtils
import io.openfuture.chain.crypto.util.SignatureUtils
import io.openfuture.chain.network.component.NodeClock
import io.openfuture.chain.network.component.time.Clock
import io.openfuture.chain.network.message.consensus.PendingBlockMessage
import io.openfuture.chain.network.message.core.*
import io.openfuture.chain.rpc.domain.base.PageRequest
Expand All @@ -38,15 +38,15 @@ class DefaultMainBlockService(
repository: MainBlockRepository,
walletService: WalletService,
delegateService: DelegateService,
private val clock: NodeClock,
private val clock: Clock,
private val keyHolder: NodeKeyHolder,
private val walletVoteService: WalletVoteService,
private val voteTransactionService: VoteTransactionService,
private val delegateTransactionService: DelegateTransactionService,
private val transferTransactionService: TransferTransactionService,
private val rewardTransactionService: RewardTransactionService,
private val consensusProperties: ConsensusProperties,
private val syncStatus: SyncStatus,
private val syncStatus: SyncState,
private val throughput: TransactionThroughput
) : BaseBlockService<MainBlock>(repository, blockService, walletService, delegateService), MainBlockService {

Expand Down Expand Up @@ -76,7 +76,7 @@ class DefaultMainBlockService(
override fun create(): PendingBlockMessage {
BlockchainLock.readLock.lock()
try {
val timestamp = clock.networkTime()
val timestamp = clock.currentTimeMillis()
val lastBlock = blockService.getLast()
val height = lastBlock.height + 1
val previousHash = lastBlock.hash
Expand Down Expand Up @@ -118,7 +118,7 @@ class DefaultMainBlockService(
BlockchainLock.readLock.lock()
try {
if (!isSync(MainBlock.of(message))) {
syncStatus.setSyncStatus(NOT_SYNCHRONIZED)
syncStatus.setChainStatus(NOT_SYNCHRONIZED)
return false
}

Expand All @@ -143,7 +143,7 @@ class DefaultMainBlockService(

val block = MainBlock.of(message)
if (!isSync(block)) {
syncStatus.setSyncStatus(NOT_SYNCHRONIZED)
syncStatus.setChainStatus(NOT_SYNCHRONIZED)
return
}

Expand Down
@@ -1,8 +1,8 @@
package io.openfuture.chain.core.sync

import io.openfuture.chain.core.sync.SyncStatus.SyncStatusType.NOT_SYNCHRONIZED
import io.openfuture.chain.core.sync.SyncStatus.SyncStatusType.PROCESSING
import io.openfuture.chain.network.component.NodeClock
import io.openfuture.chain.core.sync.SyncState.SyncStatusType.NOT_SYNCHRONIZED
import io.openfuture.chain.core.sync.SyncState.SyncStatusType.PROCESSING
import io.openfuture.chain.network.component.time.Clock
import io.openfuture.chain.network.property.NodeProperties
import io.openfuture.chain.network.service.NetworkApiService
import org.springframework.scheduling.annotation.Scheduled
Expand All @@ -11,10 +11,10 @@ import org.springframework.stereotype.Component
@Component
class SyncBlockScheduler(
private val nodeProperties: NodeProperties,
private val syncStatus: SyncStatus,
private val syncState: SyncState,
private val syncManager: SyncManager,
private val networkApiService: NetworkApiService,
private val nodeClock: NodeClock
private val clock: Clock
) {

@Scheduled(fixedRateString = "\${node.synchronization-interval}")
Expand All @@ -23,13 +23,13 @@ class SyncBlockScheduler(
return
}

if (syncStatus.getSyncStatus() == NOT_SYNCHRONIZED || (syncStatus.getSyncStatus() == PROCESSING
if (syncState.getChainStatus() == NOT_SYNCHRONIZED || (syncState.getChainStatus() == PROCESSING
&& isResponseTimeOut())) {
syncManager.synchronize()
}
}

private fun isResponseTimeOut(): Boolean =
nodeClock.networkTime() - syncManager.getLastResponseTime() > nodeProperties.synchronizationResponseDelay!!
clock.currentTimeMillis() - syncManager.getLastResponseTime() > nodeProperties.synchronizationResponseDelay!!

}
22 changes: 11 additions & 11 deletions src/main/kotlin/io/openfuture/chain/core/sync/SyncManager.kt
Expand Up @@ -3,9 +3,9 @@ package io.openfuture.chain.core.sync
import io.openfuture.chain.core.service.BlockService
import io.openfuture.chain.core.service.GenesisBlockService
import io.openfuture.chain.core.service.MainBlockService
import io.openfuture.chain.core.sync.SyncStatus.SyncStatusType.PROCESSING
import io.openfuture.chain.core.sync.SyncStatus.SyncStatusType.SYNCHRONIZED
import io.openfuture.chain.network.component.NodeClock
import io.openfuture.chain.core.sync.SyncState.SyncStatusType.PROCESSING
import io.openfuture.chain.core.sync.SyncState.SyncStatusType.SYNCHRONIZED
import io.openfuture.chain.network.component.time.Clock
import io.openfuture.chain.network.entity.NodeInfo
import io.openfuture.chain.network.message.core.BlockMessage
import io.openfuture.chain.network.message.sync.*
Expand All @@ -23,8 +23,8 @@ class SyncManager(
private val networkApiService: NetworkApiService,
private val mainBlockService: MainBlockService,
private val genesisBlockService: GenesisBlockService,
private val syncStatus: SyncStatus,
private val nodeClock: NodeClock
private val syncStatus: SyncState,
private val clock: Clock
) {

companion object {
Expand All @@ -44,7 +44,7 @@ class SyncManager(
private var expectedHash: String = EMPTY

@Volatile
private var lastResponseTime: Long = nodeClock.networkTime()
private var lastResponseTime: Long = clock.currentTimeMillis()


@Synchronized
Expand All @@ -69,7 +69,7 @@ class SyncManager(
val delegateAddresses = activeDelegatesLastHash[message.hash]
if (null != delegateAddresses && !delegateAddresses.contains(nodeInfo)) {
delegateAddresses.add(nodeInfo)
if (delegateAddresses.size > (activeDelegateAddresses.size - 1) / 3 * 2) {
if (delegateAddresses.size > (activeDelegateAddresses.size * 2 / 3)) {
val currentLastHash = blockService.getLast().hash
if (currentLastHash == message.hash) {
unlock()
Expand Down Expand Up @@ -111,25 +111,25 @@ class SyncManager(
if (expectedHash == block.hash) {
unlock()
} else {
lastResponseTime = nodeClock.networkTime()
lastResponseTime = clock.currentTimeMillis()
}
}

private fun processing() {
reset()
syncStatus.setSyncStatus(PROCESSING)
syncStatus.setChainStatus(PROCESSING)
}

private fun reset() {
activeDelegateAddresses.clear()
activeDelegatesLastHash.clear()
expectedHash = EMPTY
lastResponseTime = nodeClock.networkTime()
lastResponseTime = clock.currentTimeMillis()
sessionId = UUID.randomUUID().toString()
}

private fun unlock() {
syncStatus.setSyncStatus(SYNCHRONIZED)
syncStatus.setChainStatus(SYNCHRONIZED)
}

}
43 changes: 43 additions & 0 deletions src/main/kotlin/io/openfuture/chain/core/sync/SyncState.kt
@@ -0,0 +1,43 @@
package io.openfuture.chain.core.sync

import io.openfuture.chain.core.sync.SyncState.SyncStatusType.NOT_SYNCHRONIZED
import org.springframework.stereotype.Component
import kotlin.math.min

@Component
class SyncState {

@Volatile
private var chainStatus: SyncStatusType = NOT_SYNCHRONIZED

@Volatile
private var clockStatus: SyncStatusType = NOT_SYNCHRONIZED


@Synchronized
fun getChainStatus(): SyncStatusType = chainStatus

@Synchronized
fun getClockStatus(): SyncStatusType = clockStatus

@Synchronized
fun getNodeStatus(): SyncStatusType = SyncStatusType.values()
.first { min(chainStatus.priority, clockStatus.priority) == it.priority }

@Synchronized
fun setChainStatus(status: SyncStatusType) {
chainStatus = status
}

@Synchronized
fun setClockStatus(status: SyncStatusType) {
clockStatus = status
}

enum class SyncStatusType(val priority: Int) {
SYNCHRONIZED(3),
PROCESSING(2),
NOT_SYNCHRONIZED(1)
}

}
27 changes: 0 additions & 27 deletions src/main/kotlin/io/openfuture/chain/core/sync/SyncStatus.kt

This file was deleted.

0 comments on commit f03ef1c

Please sign in to comment.