Skip to content

Commit

Permalink
Store information about which servers are in the cluster in the log, …
Browse files Browse the repository at this point in the history
…rather than in a registry.

This means either starting the cluster the first time WITH registrant log statements already IN the log OR
Registering all the servers you want in the cluster before one of them times out.
  • Loading branch information
chelseatroy committed Sep 9, 2020
1 parent 65c39e1 commit 4fed5b4
Show file tree
Hide file tree
Showing 5 changed files with 61 additions and 44 deletions.
9 changes: 9 additions & 0 deletions end_to_end_test.py
@@ -0,0 +1,9 @@
from src.server import Server


Server(name=str("Kermit"), port=int(10010)).start()
Server(name=str("MsPiggy"), port=int(10011)).start()
Server(name=str("Gonzo"), port=int(10012)).start()
Server(name=str("Beaker"), port=int(10013)).start()
Server(name=str("Fozzie"), port=int(10014)).start()

7 changes: 0 additions & 7 deletions src/config.py
Expand Up @@ -13,10 +13,3 @@ def server_nodes(path_to_registry="logs/server_registry.txt"):
return registry


def destination_addresses(server_name):
other_servers = {k: v for (k, v) in server_nodes().items() if k != server_name}
return list(other_servers.values())

def other_server_names(server_name):
other_servers = {k: v for (k, v) in server_nodes().items() if k != server_name}
return list(other_servers.keys())
19 changes: 19 additions & 0 deletions src/key_value_store.py
Expand Up @@ -9,6 +9,7 @@ class KeyValueStore:
def __init__(self, server_name):
self.server_name = server_name
self.data = {}
self.server_cluster = {}
self.log = []
self.catch_up_successful = False
self.current_term = 0
Expand Down Expand Up @@ -149,6 +150,12 @@ def write_to_state_machine(self, string_operation, term_absent, path_to_logs='')
self.log.append(string_operation)
self.delete(operands[key])
response = f"key {key} deleted"
elif operands[command] == "register":
self.server_cluster[operands[key]] = operands[values]
print("CURRENT OTHER SERVERS: " + str(self.server_cluster))
elif operands[command] == "deregister":
self.server_cluster.pop(operands[key])
print("CURRENT OTHER SERVERS: " + str(self.server_cluster))
else:
pass

Expand Down Expand Up @@ -210,3 +217,15 @@ def write_to_log(self, string_operation, term_absent, path_to_logs=''):

return ''

def destination_addresses(self, server_name):
other_servers = {k: v for (k, v) in self.server_cluster.items() if k != server_name}
return list(other_servers.values())

def other_server_names(self, server_name):
other_servers = {k: v for (k, v) in self.server_cluster.items() if k != server_name}
return list(other_servers.keys())





21 changes: 0 additions & 21 deletions src/parsing.py
@@ -1,22 +1 @@
import ast

from src.config import server_nodes, destination_addresses

def return_address_and_message(string_request):
address_with_message = string_request.split("@")
return address_with_message[0], "@".join(address_with_message[1:])


def broadcast(server, message):
print("Broadcasting " + message)
for other_server_address in destination_addresses(server.name):
server.send(message, to_server_address=other_server_address)

def with_return_address(server, response):
return server.name + "@" + response


def address_of(server_name):
return server_nodes()[server_name]


49 changes: 33 additions & 16 deletions src/server.py
Expand Up @@ -6,18 +6,17 @@
from src.message_pass import *

from src.key_value_store import KeyValueStore
from src.parsing import address_of, with_return_address, broadcast, return_address_and_message
from src.append_entries_call import AppendEntriesCall
from src.request_vote_call import RequestVoteCall

from src.config import other_server_names, server_nodes, destination_addresses
import ast


class Server:
def __init__(self, name, port=10000, voting=True):
self.port = port
self.name = name
self.server_cluster = {}
self.key_value_store = KeyValueStore(server_name=name)
self.key_value_store.catch_up()
self.latest_leader = "yet unelected"
Expand All @@ -36,10 +35,10 @@ def __init__(self, name, port=10000, voting=True):
self.election_countdown.start()
self.voted_for_me = {}

for server_name in other_server_names(name):
for server_name in self.key_value_store.other_server_names(name):
self.followers_with_update_status[server_name] = False

for server_name in other_server_names(name):
for server_name in self.key_value_store.other_server_names(name):
self.voted_for_me[server_name] = False
self.voted_for_me[self.name] = False

Expand All @@ -53,7 +52,7 @@ def start_election(self):
self.election_countdown.start()

self.voted_for_me[self.name] = True
broadcast(self, with_return_address(
self.broadcast(self, self.with_return_address(
self,
RequestVoteCall(
for_term=str(self.key_value_store.current_term),
Expand All @@ -62,17 +61,19 @@ def start_election(self):
).to_message()
))

def send(self, message, to_server_address):
print(f"connecting to {to_server_address[0]} port {to_server_address[1]}")
def send(self, message, to_port):
print(f"connecting to port {to_port}")

to_address = ("localhost", int(to_port))

peer_socket = socket(AF_INET, SOCK_STREAM)

try:
peer_socket.connect(to_server_address)
peer_socket.connect(to_address)
encoded_message = message.encode('utf-8')

try:
print(f"sending {encoded_message} to {to_server_address}")
print(f"sending {encoded_message} to {to_port}")
send_message(peer_socket, encoded_message)
time.sleep(0.5)
peer_socket.close()
Expand All @@ -82,15 +83,16 @@ def send(self, message, to_server_address):
except OSError as e:
print("Bad file descriptor, supposedly: " + str(e))
except ConnectionRefusedError as e:
print(f"Ope, looks like {to_server_address[0]} port {to_server_address[1]} isn't up right now")
print(f"Ope, looks like port {to_port} isn't up right now")


def start(self):
server_address = ('localhost', self.port)

f = open("logs/server_registry.txt", "a")
f.write(self.name + " localhost " + str(self.port) + '\n')
f.close()
#TODO: Make the "register me" call here instead
# f = open("logs/server_registry.txt", "a")
# f.write(self.name + " localhost " + str(self.port) + '\n')
# f.close()

print("starting up on " + str(server_address[0]) + " port " + str(server_address[1]))

Expand Down Expand Up @@ -169,7 +171,7 @@ def manage_messaging(self, connection, kvs):
if destination == "client":
send_message(connection, response.encode('utf-8'))
else:
self.send(response, to_server_address=address_of(destination))
self.send(response, to_port=self.port_of(destination))

else:
print("no more data")
Expand All @@ -178,10 +180,25 @@ def manage_messaging(self, connection, kvs):
finally:
connection.close()

def broadcast(self, server, message):
print("Broadcasting " + message)
for other_server_address in self.key_value_store.destination_addresses(server.name):
server.send(message, to_port=other_server_address)

def with_return_address(self, server, response):
return server.name + "@" + response

def port_of(self, server_name):
return self.server_cluster[server_name]

def return_address_and_message(self, string_request):
address_with_message = string_request.split("@")
return address_with_message[0], "@".join(address_with_message[1:])

def respond(self, key_value_store, operation):
send_pending = True
string_request = operation.decode("utf-8")
server_name, string_operation = return_address_and_message(string_request)
server_name, string_operation = self.return_address_and_message(string_request)
print("from " + server_name + ": received " + string_operation)

response = ''
Expand Down Expand Up @@ -310,6 +327,6 @@ def respond(self, key_value_store, operation):
response = "I am not the leader. The last leader I heard from is " + str(self.latest_leader) + "."

if send_pending:
response = with_return_address(self, response)
response = self.with_return_address(self, response)

return server_name, response

0 comments on commit 4fed5b4

Please sign in to comment.