Skip to content

Commit

Permalink
MORE OPTIMIZATION
Browse files Browse the repository at this point in the history
  • Loading branch information
DoctorEenot committed Apr 13, 2021
1 parent 1b868c9 commit 9075ba6
Show file tree
Hide file tree
Showing 2 changed files with 177 additions and 70 deletions.
168 changes: 120 additions & 48 deletions cluster_server.py
Original file line number Diff line number Diff line change
Expand Up @@ -148,12 +148,28 @@ def __repr__(self):

master_server_socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
master_server_socket.settimeout(15)
master_server_timeout = 15
master_server_is_connected = False

def connect_to_master():
def connect_to_master(dispatcher,event):
'''
event - {'t':'e',
'event':'connect_to_master'}
'''
logger.info('CONNECTING TO MASTER')
global master_server_socket
global masterServer_address
global masterServer_port
global master_server_timeout
global master_server_is_connected

try:
event.dict_representation['address']
return
except:
pass

master_server_is_connected = False

get_master_server_info()
while True:
Expand All @@ -163,11 +179,22 @@ def connect_to_master():
try:
master_server_socket.connect((str(masterServer_address),
int(masterServer_port)))
serverVersion = master_server_socket.recv(3).decode().rstrip("\n") # Get server version
except Exception as e:
#time.sleep(3)
yield
continue
break
master_server_socket.settimeout(0)
serverVersion = None
timeout_start = time.time()
while time.time()-timeout_start<master_server_timeout:
try:
serverVersion = master_server_socket.recv(3).decode().rstrip("\n") # Get server version
master_server_is_connected = True
break
except Exception as e:
yield
if serverVersion != None\
and serverVersion != '':
break



Expand Down Expand Up @@ -269,48 +296,57 @@ def job_start(dispatcher,event):
global algorithm
global JOBS_TO_PROCESS



logger.info('Job is starting')
if event.secret != JOB_START_SECRET:
logger.warning('bad secret')
return

counter = 1
for start_end,job in JOBS_TO_PROCESS.items():
for addr,device in devices.items():
if device.isbusy():
continue
data = json.dumps({'t':'e',
'event':'start_job',
'lastBlockHash':JOB[0],
'expectedHash':JOB[1],
'start':start_end[0],
'end':start_end[1],
'algorithm':algorithm})
device.job_started()
event.callback.sendto(data.encode('ascii'),addr)
job.set_device(device)
break
#if counter%2==0:
yield
logger.info('Job is starting')


counter = 0
jobs = list(JOBS_TO_PROCESS.items())

sent = True
for addr,device in devices.items():
if device.isbusy():
continue
start_end,job = jobs[counter]
data = json.dumps({'t':'e',
'event':'start_job',
'lastBlockHash':JOB[0],
'expectedHash':JOB[1],
'start':start_end[0],
'end':start_end[1],
'algorithm':algorithm})
device.job_started()
event.callback.sendto(data.encode('ascii'),addr)
job.set_device(device)
#if counter%(len(devices)//2)==0:
counter += 1
if counter == len(jobs):
return
yield




def send_results(result):
def send_results(dispatcher,result):
global algorithm
global minerVersion
global rigIdentifier
global HASH_COUNTER
global devices
global master_server_timeout

logger.info('Sending results')
logger.debug(str(result))
logger.info('Hashes were checked: '+str(HASH_COUNTER))
if HASH_COUNTER<result[0]:
HASH_COUNTER = result[0]

master_server_socket.settimeout(master_server_timeout)
while True:

try:
master_server_socket.send(bytes(
str(result[0])
Expand All @@ -326,10 +362,14 @@ def send_results(result):
feedback = master_server_socket.recv(8).decode().rstrip("\n")

except Exception as e:
connect_to_master()
event = {'t':'e',
'event':'connect_to_master'}
event = Event(event)
dispatcher.add_to_queue(event)
#connect_to_master()
logger.warning('Giving up on that hash')
break
continue
#continue

if feedback == 'GOOD':
logger.info('Hash accepted')
Expand All @@ -339,7 +379,7 @@ def send_results(result):

elif feedback == '':
logger.info('Connection with master is lost')
connect_to_master()
#connect_to_master()
continue
else:
logger.info('Hash rejected')
Expand Down Expand Up @@ -477,7 +517,7 @@ def job_done(dispatcher,event):
logger.warning('STOP JOB ON WRONG JOB')
return
HASH_COUNTER += event.result[1]
send_results(event.result)
send_results(dispatcher,event.result)
JOBS_TO_PROCESS = {}
data_dict = {'t':'e',
'event':'stop_job',
Expand All @@ -490,6 +530,7 @@ def job_done(dispatcher,event):
device.job_stopped()
if addr != event.address:
event.callback.sendto(data,addr)
yield
JOB = None


Expand All @@ -508,12 +549,15 @@ def request_job(dispatcher,event):
global master_server_socket
global JOBS_TO_PROCESS
global INC_COEF
global master_server_timeout
global master_server_is_connected

logger.info('requesting job')
if event.secret != JOB_START_SECRET:
logger.warning('bad secret')
return
while True:
job = None
while job == None or job == '':
try:
if algorithm == "XXHASH":
master_server_socket.send(bytes(
Expand All @@ -530,26 +574,46 @@ def request_job(dispatcher,event):
+ str(requestedDiff),
encoding="utf8"))
except Exception as e:
master_server_is_connected = False
logger.error('asking for job error accured')
connect_to_master()
continue
try:
job = master_server_socket.recv(128).decode().rstrip("\n")
except:
connect_to_master()
continue
event = {'t':'e',
'event':'connect_to_master'}
event = Event(event)
dispatcher.add_to_queue(event)
break
master_server_socket.settimeout(0)
timeout_start = time.time()
master_server_is_connected = False
while time.time()-timeout_start<master_server_timeout:
try:
job = master_server_socket.recv(128).decode().rstrip("\n")
master_server_is_connected = True
break
except:
#event = {'t':'e',
# 'event':'connect_to_master'}
#event = Event(event)
#dispatcher.add_to_queue(event)
yield
continue
job = job.split(",")
if job[0] == 'BAD':
logger.warning('GOT "BAD" PACKET IN RESPONSE')
return
elif job[0] == '':
logger.warning('CONNECTION WITH MASTER SERVER WAS BROKEN')
connect_to_master()
#connect_to_master()
continue
logger.info('job accepted')
logger.info('Difficulty: '+str(job[2]))
logger.debug(str(job))

event = Event({'t':'e',
'event':'job_start',
'secret':JOB_START_SECRET,
'callback':server_socket})
dispatcher.add_to_queue(event)

JOBS_TO_PROCESS = {}
parts = len(devices)+INC_COEF

Expand Down Expand Up @@ -578,7 +642,7 @@ def request_job(dispatcher,event):
def clean_up_devices(dispatcher,event):
try:
# if event was recieved by server
dict(event)['address']
event.dict_representation['address']
return None
except:
pass
Expand Down Expand Up @@ -642,7 +706,6 @@ def iter_through_active_list(self):
continue
counter += 1


def dispatch_event(self,count=1):
for i in range(count):
try:
Expand Down Expand Up @@ -673,8 +736,17 @@ def server():
event_dispatcher.register('job_done',job_done)
event_dispatcher.register('request_job',request_job)
event_dispatcher.register('clean_up_devices',clean_up_devices)
event_dispatcher.register('connect_to_master',connect_to_master)
logger.debug('Dispatcher initialized')

event = {'t':'e',
'event':'connect_to_master'}
event = Event(event)
event_dispatcher.add_to_queue(event)
event_dispatcher.dispatch_event()
event_dispatcher.iter_through_active_list()


last_devices_cleenup = time.time()

while True:
Expand Down Expand Up @@ -727,7 +799,7 @@ def server():


# request job and start it
if len(devices)>0:
if len(devices)>0 and master_server_is_connected:
if JOB == None:
#MIN_PARTS = len(devices)+INC_COEF
#logger.debug('MIN_PARTS is setted to '+str(MIN_PARTS))
Expand All @@ -738,11 +810,11 @@ def server():
'parts':20})
event_dispatcher.add_to_queue(event)
#request_job(event_dispatcher,event)
event = Event({'t':'e',
'event':'job_start',
'secret':JOB_START_SECRET,
'callback':server_socket})
event_dispatcher.add_to_queue(event)
#event = Event({'t':'e',
# 'event':'job_start',
# 'secret':JOB_START_SECRET,
# 'callback':server_socket})
#event_dispatcher.add_to_queue(event)
#job_start(event_dispatcher,event)


Expand All @@ -763,7 +835,7 @@ def server():
logger.info('STARTING SERVER')
loadConfig()

connect_to_master()
#connect_to_master()
try:
server()
except Exception as e:
Expand Down
Loading

0 comments on commit 9075ba6

Please sign in to comment.