Skip to content

Commit

Permalink
improved sync to work along side witness schedule in a separate proc
Browse files Browse the repository at this point in the history
  • Loading branch information
inertia186 committed Feb 18, 2019
1 parent 9999b7b commit 9b57197
Show file tree
Hide file tree
Showing 2 changed files with 48 additions and 32 deletions.
30 changes: 19 additions & 11 deletions Rakefile
Original file line number Diff line number Diff line change
Expand Up @@ -90,20 +90,21 @@ namespace :verify do
max_blocks = args[:max_blocks]
node_url = ENV.fetch('MEESEEKER_NODE_URL', 'https://api.steemit.com')
database_api = Steem::DatabaseApi.new(url: node_url)
until_block_num = nil
mode = ENV.fetch('MEESEEKER_STREAM_MODE', 'head').to_sym
until_block_num = if !!max_blocks
database_api.get_dynamic_global_properties do |dgpo|
raise 'Got empty dynamic_global_properties result.' if dgpo.nil?

case mode
when :head then dgpo.head_block_number
when :irreversible then dgpo.last_irreversible_block_num
else; abort "Unknown block mode: #{mode}"
end
end + max_blocks.to_i
end

Thread.new do
job = Meeseeker::BlockFollowerJob.new
mode = ENV.fetch('MEESEEKER_STREAM_MODE', 'head').to_sym
until_block_num = if !!max_blocks
database_api.get_dynamic_global_properties do |dgpo|
case mode
when :head then dgpo.head_block_number
when :irreversible then dgpo.last_irreversible_block_num
else; abort "Unknown block mode: #{mode}"
end
end + max_blocks.to_i
end

loop do
begin
Expand Down Expand Up @@ -146,6 +147,9 @@ namespace :verify do

if !!max_blocks
if block_num >= until_block_num
# We're done trailing blocks. Typically, this is used by unit
# tests so the test can halt.

subscription.unsubscribe
next
end
Expand All @@ -160,6 +164,8 @@ namespace :verify do
end

database_api.get_dynamic_global_properties do |dgpo|
raise 'Got empty dynamic_global_properties result.' if dgpo.nil?

(block_num - dgpo.last_irreversible_block_num).tap do |offset|
# This will block all channel callbacks until the first known block
# is irreversible. After that, the offsets should mostly go
Expand All @@ -177,6 +183,8 @@ namespace :verify do
expected_ids -= [Meeseeker::VIRTUAL_TRX_ID]

actual_ids, actual_witness = block_api.get_block(block_num: block_num) do |result|
raise 'Got empty block result.' if result.nil? || result.block.nil?

block = result.block
[block.transaction_ids, block.witness]
end
Expand Down
50 changes: 29 additions & 21 deletions lib/meeseeker/block_follower_job.rb
Original file line number Diff line number Diff line change
Expand Up @@ -45,8 +45,12 @@ def perform(options = {})
}

if Meeseeker.include_block_header
block_api.get_block_header(block_num: block_num) do |result|
block_payload = block_payload.merge(result.header.to_h)
catch :block_header do
block_api.get_block_header(block_num: block_num) do |result|
throw :block_header if result.nil || result.header.nil?

block_payload.merge!(result.header.to_h)
end
end
end

Expand Down Expand Up @@ -81,28 +85,32 @@ def stream_operations(options = {}, &block)
database_api = Steem::DatabaseApi.new(url: Meeseeker.node_url)
last_block_num = redis.get(LAST_BLOCK_NUM_KEY).to_i + 1

database_api.get_dynamic_global_properties do |dgpo|
block_num = case mode
when :head then dgpo.head_block_number
when :irreversible then dgpo.last_irreversible_block_num
else; abort "Unknown stream mode: #{mode}"
end

if Meeseeker.expire_keys == -1
last_block_num = [last_block_num, block_num].max

puts "Sync from: #{last_block_num}"
elsif block_num - last_block_num > Meeseeker.expire_keys / 3
last_block_num = block_num
block_num = catch :dynamic_global_properties do
database_api.get_dynamic_global_properties do |dgpo|
throw :dynamic_global_properties if dgpo.nil?

puts 'Starting new sync.'
else
behind_sec = block_num - last_block_num
behind_sec *= 3.0

puts "Resuming from #{behind_sec / 60} minutes ago ..."
case mode
when :head then dgpo.head_block_number
when :irreversible then dgpo.last_irreversible_block_num
else; abort "Unknown stream mode: #{mode}"
end
end
end

if Meeseeker.expire_keys == -1
last_block_num = [last_block_num, block_num].max

puts "Sync from: #{last_block_num}"
elsif block_num - last_block_num > Meeseeker.expire_keys / 3
last_block_num = block_num

puts 'Starting new sync.'
else
behind_sec = block_num - last_block_num
behind_sec *= 3.0

puts "Resuming from #{behind_sec / 60} minutes ago ..."
end
end

begin
Expand Down

0 comments on commit 9b57197

Please sign in to comment.