Skip to content

Commit

Permalink
Merge pull request #175 from waveform80/reduce-duplicates
Browse files Browse the repository at this point in the history
Reduce duplicates
  • Loading branch information
waveform80 committed Jun 29, 2019
2 parents 81e4285 + fe41291 commit 610fe61
Show file tree
Hide file tree
Showing 3 changed files with 72 additions and 45 deletions.
15 changes: 9 additions & 6 deletions piwheels/master/big_brother.py
Original file line number Diff line number Diff line change
Expand Up @@ -70,7 +70,8 @@ def __init__(self, config):
'downloads_last_month': 0,
'downloads_all': 0,
}
self.last_run = datetime.now(tz=UTC) - timedelta(seconds=40)
self.last_stats_run = self.last_search_run = (
datetime.now(tz=UTC) - timedelta(minutes=10))
stats_queue = self.socket(
transport.PULL, protocol=protocols.big_brother)
stats_queue.hwm = 10
Expand Down Expand Up @@ -104,18 +105,20 @@ def handle_stats(self, queue):
self.stats['builds_pending'] = sum(data.values())
elif msg == 'HOME':
# Forced rebuild from Mr. Chase
self.last_run = datetime.now(tz=UTC) - timedelta(seconds=40)
self.last_search_run = datetime.now(tz=UTC) - timedelta(minutes=10)

def loop(self):
# Leave 30 seconds between each run of the stats and (expensive) search
# index queries
if datetime.now(tz=UTC) - self.last_run > timedelta(seconds=30):
# Leave 15 seconds between each run of the stats
if datetime.now(tz=UTC) - self.last_stats_run > timedelta(seconds=30):
rec = self.db.get_statistics()
# Rename a couple of columns
rec['builds_last_hour'] = rec.pop('builds_count_last_hour')
rec['builds_success'] = rec.pop('builds_count_success')
self.stats.update(rec)
self.web_queue.send_msg('HOME', self.stats)
self.status_queue.send_msg('STATS', self.stats)
self.last_stats_run = datetime.now(tz=UTC)
# Leave 5 minutes between each run of the (expensive) search index query
if datetime.now(tz=UTC) - self.last_search_run > timedelta(minutes=5):
self.web_queue.send_msg('SEARCH', self.db.get_search_index())
self.last_run = datetime.now(tz=UTC)
self.last_search_run = datetime.now(tz=UTC)
70 changes: 39 additions & 31 deletions piwheels/master/slave_driver.py
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,7 @@ def __init__(self, config):
super().__init__(config, control_protocol=protocols.master_control)
self.paused = False
self.abi_queues = {}
self.recent_builds = {}
slave_queue = self.socket(
transport.ROUTER, protocol=protocols.slave_driver)
slave_queue.bind(config.slave_queue)
Expand Down Expand Up @@ -179,19 +180,32 @@ def handle_build(self, queue):
queue query will return these packages again, and if no build slaves
are actively working on them at that time they will then be retried.
"""
active_builds = set(self.active_builds())
try:
msg, data = queue.recv_msg()
msg, new_queues = queue.recv_msg()
except IOError as e:
self.logger.error(str(e))
else:
now = datetime.now(tz=UTC)
# Prune expired entries from the recent_builds buffer and add empty
# dicts for new ABIs
for abi in new_queues:
if abi in self.recent_builds:
self.recent_builds[abi] = {
key: expires
for key, expires in self.recent_builds[abi].items()
if expires > now
}
else:
self.recent_builds[abi] = {}
# Set up the new queues without recent builds (and converting
# list-pairs into tuples)
self.abi_queues = {
abi: [
(package, version)
for package, version in build_queue
if (abi, package, version) not in active_builds
][::-1] # reversed to simplify popping stuff off the "front"
for abi, build_queue in data.items()
(package, version) for package, version in new_queue
if (package, version) not in recent_builds
]
for abi, new_queue in new_queues.items()
for recent_builds in (self.recent_builds[abi],)
}
self.stats_queue.send_msg('STATBQ', {
abi: len(queue) for (abi, queue) in self.abi_queues.items()
Expand Down Expand Up @@ -302,23 +316,28 @@ def do_idle(self, slave):
return 'SLEEP', protocols.NoData
else:
try:
package, version = self.abi_queues[slave.native_abi].pop()
except (KeyError, IndexError) as e:
pass
else:
if (package, version) not in self.active_builds():
self.logger.info(
'slave %d: build %s %s',
slave.slave_id, package, version)
return 'BUILD', [package, version]
abi_queue = self.abi_queues[slave.native_abi]
recent_builds = self.recent_builds[slave.native_abi]
except KeyError:
abi_queue = []
try:
while abi_queue:
package, version = abi_queue.pop(0)
if (package, version) not in recent_builds:
self.logger.info(
'slave %d (%s): build %s %s',
slave.slave_id, slave.label, package, version)
recent_builds[(package, version)] = (
datetime.now(tz=UTC) + slave.timeout)
return 'BUILD', [package, version]
self.logger.info(
'slave %d (%s): sleeping because no builds',
slave.slave_id, slave.label)
return 'SLEEP', protocols.NoData
finally:
self.stats_queue.send_msg('STATBQ', {
abi: len(queue) for (abi, queue) in self.abi_queues.items()
})
self.logger.info(
'slave %d (%s): sleeping because no builds',
slave.slave_id, slave.label)
return 'SLEEP', protocols.NoData

def do_built(self, slave):
"""
Expand Down Expand Up @@ -399,17 +418,6 @@ def do_sent(self, slave):
slave.build.next_file)
return 'SEND', slave.build.next_file

def active_builds(self):
"""
Generator method which yields all (abi, package, version) tuples
currently being built by build slaves.
"""
for slave in self.slaves.values():
if slave.reply is not None and slave.reply[0] == 'BUILD':
if slave.last_seen + slave.timeout > datetime.now(tz=UTC):
msg, (package, version) = slave.reply
yield (slave.native_abi, package, version)


def build_armv6l_hack(build):
"""
Expand Down
32 changes: 24 additions & 8 deletions tests/master/test_big_brother.py
Original file line number Diff line number Diff line change
Expand Up @@ -123,7 +123,8 @@ def task(request, master_config):
def test_gen_skip(master_status_queue, web_queue, task):
with mock.patch('piwheels.master.big_brother.datetime') as dt:
dt.now.return_value = datetime(2018, 1, 1, 12, 30, 0, tzinfo=UTC)
task.last_run = datetime(2018, 1, 1, 12, 30, 0, tzinfo=UTC)
task.last_stats_run = task.last_search_run = (
datetime(2018, 1, 1, 12, 30, 0, tzinfo=UTC))
task.loop() # crank the handle once
with pytest.raises(transport.Error):
master_status_queue.recv_msg(flags=transport.NOBLOCK)
Expand All @@ -135,7 +136,22 @@ def test_gen_stats(db_queue, master_status_queue, web_queue, task,
stats_result, stats_dict):
with mock.patch('piwheels.master.big_brother.datetime') as dt:
dt.now.return_value = datetime(2018, 1, 1, 12, 30, 40, tzinfo=UTC)
task.last_run = datetime(2018, 1, 1, 12, 30, 0, tzinfo=UTC)
task.last_stats_run = task.last_search_run = (
datetime(2018, 1, 1, 12, 30, 0, tzinfo=UTC))
db_queue.expect('GETSTATS')
db_queue.send('OK', stats_result)
task.loop() # crank the handle once
db_queue.check()
assert master_status_queue.recv_msg() == ('STATS', stats_dict)
assert web_queue.recv_msg() == ('HOME', stats_dict)


def test_gen_stats_and_search(db_queue, master_status_queue, web_queue, task,
stats_result, stats_dict):
with mock.patch('piwheels.master.big_brother.datetime') as dt:
dt.now.return_value = datetime(2018, 1, 1, 12, 30, 40, tzinfo=UTC)
task.last_stats_run = task.last_search_run = (
datetime(2018, 1, 1, 12, 20, 40, tzinfo=UTC))
db_queue.expect('GETSTATS')
db_queue.send('OK', stats_result)
db_queue.expect('GETSEARCH')
Expand All @@ -151,7 +167,8 @@ def test_gen_disk_stats(db_queue, master_status_queue, web_queue, task,
stats_queue, stats_result, stats_dict, stats_disk):
with mock.patch('piwheels.master.big_brother.datetime') as dt:
dt.now.return_value = datetime(2018, 1, 1, 12, 30, 40, tzinfo=UTC)
task.last_run = datetime(2018, 1, 1, 12, 30, 0, tzinfo=UTC)
task.last_stats_run = task.last_search_run = (
datetime(2018, 1, 1, 12, 30, 0, tzinfo=UTC))
stats_queue.send_msg('STATFS', stats_disk)
while task.stats['disk_free'] == 0:
task.poll()
Expand All @@ -160,20 +177,18 @@ def test_gen_disk_stats(db_queue, master_status_queue, web_queue, task,
stats_dict['disk_size'] = frsize * blocks
db_queue.expect('GETSTATS')
db_queue.send('OK', stats_result)
db_queue.expect('GETSEARCH')
db_queue.send('OK', {'foo': (10, 100)})
task.loop()
db_queue.check()
assert web_queue.recv_msg() == ('HOME', stats_dict)
assert master_status_queue.recv_msg() == ('STATS', stats_dict)
assert web_queue.recv_msg() == ('SEARCH', {'foo': [10, 100]})


def test_gen_queue_stats(db_queue, master_status_queue, web_queue, task,
stats_queue, stats_result, stats_dict):
with mock.patch('piwheels.master.big_brother.datetime') as dt:
dt.now.return_value = datetime(2018, 1, 1, 12, 30, 40, tzinfo=UTC)
task.last_run = datetime(2018, 1, 1, 12, 30, 0, tzinfo=UTC)
task.last_stats_run = task.last_search_run = (
datetime(2018, 1, 1, 12, 20, 40, tzinfo=UTC))
stats_queue.send_msg('STATBQ', {'cp34m': 1, 'cp35m': 0})
while task.stats['builds_pending'] == 0:
task.poll()
Expand All @@ -194,7 +209,8 @@ def test_bad_stats(db_queue, master_status_queue, web_queue, task,
task.logger = mock.Mock()
with mock.patch('piwheels.master.big_brother.datetime') as dt:
dt.now.return_value = datetime(2018, 1, 1, 12, 30, 40, tzinfo=UTC)
task.last_run = datetime(2018, 1, 1, 12, 30, 0, tzinfo=UTC)
task.last_stats_run = task.last_search_run = (
datetime(2018, 1, 1, 12, 30, 0, tzinfo=UTC))
stats_queue.send(b'FOO')
task.poll()
assert task.logger.error.call_args == mock.call(
Expand Down

0 comments on commit 610fe61

Please sign in to comment.