Skip to content

Commit

Permalink
Successfully replicates logs when registering, then starting, a new s…
Browse files Browse the repository at this point in the history
…erver in an existing cluster!

Because we no longer have a static registry listing the ports, servers have to send their ports to each other in their messages.
  • Loading branch information
chelseatroy committed Sep 9, 2020
1 parent e270225 commit a406539
Show file tree
Hide file tree
Showing 3 changed files with 11 additions and 18 deletions.
4 changes: 2 additions & 2 deletions src/key_value_store.py
Expand Up @@ -152,10 +152,10 @@ def write_to_state_machine(self, string_operation, term_absent):
response = f"key {key} deleted"
elif operands[command] == "register":
self.server_cluster[operands[key]] = operands[values]
print("CURRENT OTHER SERVERS: " + str(self.server_cluster))
print("CURRENT SERVERS: " + str(self.server_cluster))
elif operands[command] == "deregister":
self.server_cluster.pop(operands[key])
print("CURRENT OTHER SERVERS: " + str(self.server_cluster))
print("CURRENT SERVERS: " + str(self.server_cluster))
else:
pass

Expand Down
2 changes: 1 addition & 1 deletion src/message_pass.py
Expand Up @@ -11,7 +11,7 @@ def send_message(sock, msg):
# send_message(sock, b'Hello World') ->>> b"<size>Hello World"

def receive_message(sock):
size = receive_size(sock) # get the message size
size = receive_size(sock) # get the message size
try:
msg = receive_exactly(sock, size) # Receive exactly this many bytes
except IOError as e:
Expand Down
23 changes: 8 additions & 15 deletions src/server.py
Expand Up @@ -88,11 +88,6 @@ def send(self, message, to_port):
def start(self):
server_address = ('localhost', self.port)

#TODO: Make the "register me" call here instead
# f = open("logs/server_registry.txt", "a")
# f.write(self.name + " localhost " + str(self.port) + '\n')
# f.close()

print("starting up on " + str(server_address[0]) + " port " + str(server_address[1]))

self.server_socket = socket(AF_INET, SOCK_STREAM)
Expand Down Expand Up @@ -162,15 +157,15 @@ def manage_messaging(self, connection, kvs):
operation = receive_message(connection)

if operation:
destination, response = self.respond(kvs, operation)
destination_name, destination_port, response = self.respond(kvs, operation)

if response == '':
break

if destination == "client":
if destination_name == "client":
send_message(connection, response.encode('utf-8'))
else:
self.send(response, to_port=self.port_of(destination))
self.send(response, to_port=int(destination_port))

else:
print("no more data")
Expand All @@ -185,19 +180,17 @@ def broadcast(self, server, message):
server.send(message, to_port=other_server_address)

def with_return_address(self, server, response):
return server.name + "@" + response

def port_of(self, server_name):
return self.key_value_store.server_cluster[server_name]
return server.name + "|" + str(server.port) + "@" + response

def return_address_and_message(self, string_request):
address_with_message = string_request.split("@")
return address_with_message[0], "@".join(address_with_message[1:])
name, port = address_with_message[0].split("|")
return name, port, "@".join(address_with_message[1:])

def respond(self, key_value_store, operation):
send_pending = True
string_request = operation.decode("utf-8")
server_name, string_operation = self.return_address_and_message(string_request)
server_name, server_port, string_operation = self.return_address_and_message(string_request)
print("from " + server_name + ": received " + string_operation)

response = ''
Expand Down Expand Up @@ -328,4 +321,4 @@ def respond(self, key_value_store, operation):
if send_pending:
response = self.with_return_address(self, response)

return server_name, response
return server_name, server_port, response

0 comments on commit a406539

Please sign in to comment.