Skip to content

Commit

Permalink
Fixed some bugs
Browse files Browse the repository at this point in the history
  • Loading branch information
DoctorEenot committed Apr 13, 2021
1 parent 05327b2 commit 1b868c9
Showing 1 changed file with 31 additions and 26 deletions.
57 changes: 31 additions & 26 deletions cluster_server.py
Original file line number Diff line number Diff line change
Expand Up @@ -292,8 +292,8 @@ def job_start(dispatcher,event):
event.callback.sendto(data.encode('ascii'),addr)
job.set_device(device)
break
if counter%2==0:
yield True
#if counter%2==0:
yield
counter += 1


Expand Down Expand Up @@ -416,12 +416,14 @@ def job_done(dispatcher,event):
'message':'another device already solved hash'}
data = json.dumps(data_dict).encode('ascii')
CURRENT_JOB.set_done()

for device in CURRENT_JOB.get_devices():
if device.address != event.address\
and device.isbusy():
device.job_stopped()
event.callback.sendto(data,device.address)
del JOBS_TO_PROCESS[recieved_start_end]
yield
#del JOBS_TO_PROCESS[recieved_start_end]
#CURRENT_JOB.unclaim()

else:
Expand Down Expand Up @@ -573,6 +575,26 @@ def request_job(dispatcher,event):

break

def clean_up_devices(dispatcher,event):
try:
# if event was recieved by server
dict(event)['address']
return None
except:
pass

counter = 0
items = list(devices.items())
while counter<len(devices):
address,device = items[counter]
if not device.is_alive():
del devices[address]
del items[counter]
continue
counter += 1
yield



class Event(object):
def __init__(self,input:dict):
Expand Down Expand Up @@ -650,6 +672,7 @@ def server():
event_dispatcher.register('job_start',job_start)
event_dispatcher.register('job_done',job_done)
event_dispatcher.register('request_job',request_job)
event_dispatcher.register('clean_up_devices',clean_up_devices)
logger.debug('Dispatcher initialized')

last_devices_cleenup = time.time()
Expand Down Expand Up @@ -727,10 +750,10 @@ def server():
if time.time()-last_devices_cleenup>time_for_device:
last_devices_cleenup = time.time()
logger.debug('Cleaning up devices')
for address,device in devices.items():
if not device.is_alive():
del devices[address]
#break
event = {'t':'e',
'event':'clean_up_devices'}
event = Event(event)
event_dispatcher.add_to_queue(event)

time.sleep(0.1)

Expand All @@ -739,29 +762,11 @@ def server():
if __name__ == '__main__':
logger.info('STARTING SERVER')
loadConfig()
#logger.info('Getting Master server info')
#while True:
# try:
# res = requests.get(serveripfile, data=None)
# break
# except:
# pass
# logger.info('getting data again')
# time.sleep(10)

#if res.status_code == 200:
# logger.info('Master server info accepted')
# # Read content and split into lines
# content = (res.content.decode().splitlines())
# masterServer_address = content[0] # Line 1 = pool address
# masterServer_port = content[1] # Line 2 = pool port
#else:
# raise Exception('CANT GET MASTER SERVER ADDRESS')

connect_to_master()
try:
server()
except Exception as e:
#tr = traceback.format_exc()
logger.error('ERROR ACCURED',exc_info=e)

input()
Expand Down

0 comments on commit 1b868c9

Please sign in to comment.