Skip to content

Commit

Permalink
added steem:witness:schedule channel
Browse files Browse the repository at this point in the history
  • Loading branch information
inertia186 committed Feb 18, 2019
1 parent 4b4312b commit 9999b7b
Show file tree
Hide file tree
Showing 6 changed files with 265 additions and 11 deletions.
22 changes: 21 additions & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -107,7 +107,9 @@ When `meeseeker sync` starts for the first time, it initializes from the last ir

For `redis-cli`, please see: https://redis.io/topics/pubsub

Channels available for `meeseeker`:
##### Sync

When running `meeseeker sync`, the following channels are available:

* `steem:block`
* `steem:transaction`
Expand Down Expand Up @@ -193,6 +195,24 @@ end

Many other clients are supported: https://redis.io/clients

##### Witness Schedule

When running `meeseeker witness:schedule`, the `steem:witness:schedule` channel is available. This is offered as a separate command because most applications don't need to worry about this level of blockchain logistics.

For example, from `redis-cli`, if we wanted to subscribe to the witness schedule:

```
$ redis-cli
127.0.0.1:6379> subscribe steem:witness:schedule
Reading messages... (press Ctrl-C to quit)
1) "subscribe"
2) "steem:witness:schedule"
3) (integer) 1
1) "message"
2) "steem:witness:schedule"
3) "{\"id\":0,\"current_virtual_time\":\"415293532210075480213212125\",\"next_shuffle_block_num\":30035208,\"current_shuffled_witnesses\":[\"thecryptodrive\",\"timcliff\",\"utopian-io\",\"themarkymark\",\"aggroed\",\"smooth.witness\",\"someguy123\",\"gtg\",\"followbtcnews\",\"yabapmatt\",\"therealwolf\",\"ausbitbank\",\"curie\",\"clayop\",\"drakos\",\"blocktrades\",\"good-karma\",\"roelandp\",\"lukestokes.mhth\",\"liondani\",\"anyx\"],\"num_scheduled_witnesses\":21,\"elected_weight\":1,\"timeshare_weight\":5,\"miner_weight\":1,\"witness_pay_normalization_factor\":25,\"median_props\":{\"account_creation_fee\":{\"amount\":\"3000\",\"precision\":3,\"nai\":\"@@000000021\"},\"maximum_block_size\":65536,\"sbd_interest_rate\":0,\"account_subsidy_budget\":797,\"account_subsidy_decay\":347321},\"majority_version\":\"0.20.8\",\"max_voted_witnesses\":20,\"max_miner_witnesses\":0,\"max_runner_witnesses\":1,\"hardfork_required_witnesses\":17,\"account_subsidy_rd\":{\"resource_unit\":10000,\"budget_per_time_unit\":797,\"pool_eq\":157691079,\"max_pool_size\":157691079,\"decay_params\":{\"decay_per_time_unit\":347321,\"decay_per_time_unit_denom_shift\":36},\"min_decay\":0},\"account_subsidy_witness_rd\":{\"resource_unit\":10000,\"budget_per_time_unit\":996,\"pool_eq\":9384019,\"max_pool_size\":9384019,\"decay_params\":{\"decay_per_time_unit\":7293741,\"decay_per_time_unit_denom_shift\":36},\"min_decay\":257},\"min_witness_account_subsidy_decay\":0}"
```

#### Using `SCAN`

From the redis manual:
Expand Down
164 changes: 162 additions & 2 deletions Rakefile
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,8 @@ require "bundler/gem_tasks"
require "rake/testtask"
require 'meeseeker'

defined? Thread.report_on_exception and Thread.report_on_exception = true

Rake::TestTask.new(:test) do |t|
t.libs << 'test'
t.libs << 'lib'
Expand Down Expand Up @@ -44,6 +46,14 @@ task(:sync, [:at_block_num] => [:check_schema]) do |t, args|
job.perform(at_block_num: args[:at_block_num])
end

namespace :witness do
desc 'Publish the witness schedule every minute or so (steem:witness:schedule).'
task :schedule do
job = Meeseeker::WitnessScheduleJob.new
job.perform
end
end

task(:find, [:what, :key] => [:check_schema]) do |t, args|
redis = Meeseeker.redis
match = case args[:what].downcase.to_sym
Expand Down Expand Up @@ -77,8 +87,6 @@ end
namespace :verify do
desc 'Verifies transactions land where they should.'
task :block_org, [:max_blocks] do |t, args|
defined? Thread.report_on_exception and Thread.report_on_exception = true

max_blocks = args[:max_blocks]
node_url = ENV.fetch('MEESEEKER_NODE_URL', 'https://api.steemit.com')
database_api = Steem::DatabaseApi.new(url: node_url)
Expand Down Expand Up @@ -204,4 +212,156 @@ namespace :verify do
end
end
end

namespace :witness do
desc 'Verifies witnessses in the schedule produced a block.'
task :schedule, [:max_blocks] do |t, args|
max_blocks = args[:max_blocks]
node_url = ENV.fetch('MEESEEKER_NODE_URL', 'https://api.steemit.com')
database_api = Steem::DatabaseApi.new(url: node_url)
mode = ENV.fetch('MEESEEKER_STREAM_MODE', 'head').to_sym
until_block_num = if !!max_blocks
database_api.get_dynamic_global_properties do |dgpo|
raise 'Got empty dynamic_global_properties result.' if dgpo.nil?

case mode
when :head then dgpo.head_block_number
when :irreversible then dgpo.last_irreversible_block_num
else; abort "Unknown block mode: #{mode}"
end
end + max_blocks.to_i
end

Thread.new do
job = Meeseeker::WitnessScheduleJob.new

loop do
begin
job.perform(mode: mode, until_block_num: until_block_num)
rescue => e
puts e.inspect
sleep 5
end

break # success
end

puts 'Background sync finished ...'
end

begin
block_api = Steem::BlockApi.new(url: node_url)
schedule_channel = 'steem:witness:schedule'
redis_url = ENV.fetch('MEESEEKER_REDIS_URL', 'redis://127.0.0.1:6379/0')
subscription = Redis.new(url: redis_url)
ctx = Redis.new(url: redis_url)
timeout = (max_blocks).to_i * 3

subscribe_mode, subscribe_args = if timeout > 0
[:subscribe_with_timeout, [timeout, [schedule_channel]]]
else
[:subscribe, [[schedule_channel]]]
end

# Check if the redis context is still available right before we
# subscribe.
break unless subscription.ping == 'PONG'

subscription.send(subscribe_mode, *subscribe_args) do |on|
on.subscribe do |channel, subscriptions|
puts "Subscribed to ##{channel} (subscriptions: #{subscriptions})"
end

on.message do |channel, message|
payload = JSON[message]
next_shuffle_block_num = payload['next_shuffle_block_num']
current_shuffled_witnesses = payload['current_shuffled_witnesses']
num_witnesses = current_shuffled_witnesses.size
from_block_num = next_shuffle_block_num - num_witnesses + 1
to_block_num = from_block_num + num_witnesses - 1
block_range = from_block_num..to_block_num # typically 21 blocks

if !!max_blocks
if block_range.include? until_block_num
# We're done trailing blocks. Typically, this is used by unit
# tests so the test can halt.

subscription.unsubscribe
end
end

begin
# We write witnesses to this hash until all 21 produce blocks.
actual_witnesses = {}
tries = 0

while actual_witnesses.size != num_witnesses
# Allow the immediate node to catch up in case it's behind by a
# block.
sleep 3

# Typically, nodes will allow up to 50 block headers in one
# request, if backed by jussi. We only need 21, so each
# request should only make a single response with the entire
# round. Under normal circumstances, this call happens only
# once. But if the there's additional p2p or cache latency,
# it might have missing headers.

block_api.get_block_headers(block_range: block_range) do |header, block_num|
unless !!header
# Can happen when there's excess p2p latency and/or jussi
# cache is under load.
puts "Waiting for block header: #{block_num}"

next
end

actual_witnesses[header.witness] = block_num
end

break if (tries += 1) > 5
end

# If there are multiple tries due to high p2p latency, even though
# we got all 21 block headers, seeing this message could be an
# early-warning of other problems on the blockchain.

# If there's a missing block header, this will always show 5
# tries.

puts "Tries: #{tries}" if tries > 1

missing_witnesses = current_shuffled_witnesses - actual_witnesses.keys
extra_witnesses = actual_witnesses.keys - current_shuffled_witnesses

if missing_witnesses.any? || extra_witnesses.any?
puts "Expected only these witness to produce a block in #{block_range}."
puts "Missing witnesses: #{missing_witnesses.join(', ')}"
puts "Extra witnesses: #{extra_witnesses.join(', ')}"

puts "\nWitnesses and block numbers in range:"
actual_witnesses.sort_by{ |k, v| v }.each do |k, v|
puts "#{v}: #{k}"
end
puts "Count: #{actual_witnesses.size}"

# Non-zero exit to notify the shell caller that there's a
# problem.

exit(-(missing_witnesses.size + extra_witnesses.size))
end
end

# Perfect round.

puts "Found all #{num_witnesses} expected witnesses in block range #{block_range}: √"
end

on.unsubscribe do |channel, subscriptions|
puts "Unsubscribed from ##{channel} (subscriptions: #{subscriptions})"
end
end
end
end
end
end
6 changes: 4 additions & 2 deletions bin/meeseeker
Original file line number Diff line number Diff line change
Expand Up @@ -18,12 +18,12 @@ filename = __FILE__.split('/').last

case ARGV[0]
when 'console' then Rake::Task['console'].invoke
when 'sync'
when 'sync', 'witness:schedule'
backoff = 0.01
max_backoff = 30

loop do; begin
Rake::Task['sync'].invoke(ARGV[1])
Rake::Task[ARGV[0]].invoke(ARGV[1])
rescue => e
puts "Error: #{e.inspect}"
backoff = [backoff, max_backoff].min
Expand All @@ -36,6 +36,8 @@ when 'reset' then Rake::Task['reset'].invoke
else
puts "\nBegin/resume sync:"
puts "\t#{filename} sync\n\n"
puts "Publish witness schedule:"
puts "\t#{filename} witness:schedule\n\n"
puts "Start in the ruby console:"
puts "\t#{filename} console\n\n"
puts 'Find block or transaction:'
Expand Down
1 change: 1 addition & 0 deletions lib/meeseeker.rb
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@

require 'meeseeker/version'
require 'meeseeker/block_follower_job'
require 'meeseeker/witness_schedule_job'

module Meeseeker
LAST_BLOCK_NUM_KEY = 'steem:meeseeker:last_block_num'
Expand Down
68 changes: 68 additions & 0 deletions lib/meeseeker/witness_schedule_job.rb
Original file line number Diff line number Diff line change
@@ -0,0 +1,68 @@
module Meeseeker
class WitnessScheduleJob
def perform(options = {})
database_api = Steem::DatabaseApi.new(url: Meeseeker.node_url)
redis = Meeseeker.redis
mode = options.delete(:mode) || Meeseeker.stream_mode
schedule = nil
last_shuffle_block_num = nil

loop do
# Using hammer assignment will ensure we only request a new schedule
# after we've published.

schedule ||= catch :witness_schedule do
database_api.get_witness_schedule do |result|
throw :witness_schedule if result.nil?

result
end
end

next_shuffle_block_num = schedule.next_shuffle_block_num
block_num = catch :dynamic_global_properties do
database_api.get_dynamic_global_properties do |dgpo|
throw :dynamic_global_properties if dgpo.nil?

case mode
when :head then dgpo.head_block_number
when :irreversible then dgpo.last_irreversible_block_num
else; abort "Unknown stream mode: #{mode}"
end
end
end

# Find out how far away we are from the next schedule.

remaining_blocks = [next_shuffle_block_num - block_num - 1.5, 0].max

# It's better for the schedule to publish a little late than to miss
# an entire schedule, so we subtract 1.5 blocks from the total.
# Sometimes we check a little early and sometimes we check a little
# late. But it all averages out.

if remaining_blocks > 0
delay = [remaining_blocks * 3.0, 0.25].max
puts "Sleeping for #{delay} seconds (remaining blocks: #{remaining_blocks})."
sleep delay
next
end

# Now that we've reached the current schedule, check if we've published
# it already. If not, publish and reset for the next schedule.

if next_shuffle_block_num != last_shuffle_block_num
puts "next_shuffle_block_num: #{next_shuffle_block_num}; current_shuffled_witnesses: #{schedule.current_shuffled_witnesses.join(', ')}"
redis.publish('steem:witness:schedule', schedule.to_json)
last_shuffle_block_num = next_shuffle_block_num
end

schedule = nil # re-enabled hammer assignment

if !!options[:until_block_num]
break if block_num >= options[:until_block_num].to_i
end
end
end
end
end
15 changes: 9 additions & 6 deletions test/meeseeker/meeseeker_test.rb
Original file line number Diff line number Diff line change
Expand Up @@ -12,16 +12,19 @@ def setup
Rake.application.init
Rake.application.load_rakefile
Dir.chdir(pwd)
end

def test_verify_block_org
max_blocks = 30 # must be at least 15 to get past irreversible

if !!Meeseeker.redis.get(Meeseeker::LAST_BLOCK_NUM_KEY)
fail "Found existing keys. Please use 'rake reset' to enable this test."
end

end

def test_verify_all_jobs
max_blocks = 30 # must be at least 15 to get past irreversible

assert Rake::Task['verify:block_org'].invoke(max_blocks)
assert Rake::Task['reset'].invoke
assert Rake::Task['verify:witness:schedule'].invoke(max_blocks)

Rake::Task['reset'].invoke
end
end
end

0 comments on commit 9999b7b

Please sign in to comment.