Skip to content
Permalink
Browse files

Fix the logic that determines when the key value store must account f…

…or the term in reads and writes

+ In the last commit, I assumed that the cases where the key value store needs to write to its log and the cases where the term is not present in the command were the same. They are not. If a leader sends a follower server a command to catch up its log, that log has term numbers in it, but the follower server also needs to write those commands to its own log.

So we really have three DIFFERENT cases:
1. Terms present, do not write—A server restarts and is catching up its in memory store from its own log
2. Terms present, DO write-A follower server is getting caught up from a leader server
3. Terms absent, DO write-A leader server receives requests to write to its logs from a client
  • Loading branch information
chelseatroy committed Dec 19, 2019
1 parent 9112582 commit 45c21c1d1d65dbc3092383cb25c0189db73ee59f
Showing with 27 additions and 22 deletions.
  1. +11 −12 key_value_store.py
  2. +16 −10 server.py
@@ -28,7 +28,7 @@ def catch_up(self):
f.close()

for command in log.split('\n'):
self.execute(command, write=False)
self.execute(command, term_absent=False, write=False)

self.catch_up_successful = True

@@ -40,36 +40,35 @@ def get_latest_term(self):
return self.latest_term


def execute(self, string_operation, write=True):
if write:
command, key, values = 0, 1, 2
term = self.latest_term
else:
term, command, key, values = 0, 1, 2, 3
def execute(self, string_operation, term_absent, write=True):
print(string_operation)

if len(string_operation) == 0:
return

self.log.append(string_operation)

if term_absent:
string_operation = str(self.latest_term) + " " + string_operation

operands = string_operation.split(" ")
term, command, key, values = 0, 1, 2, 3

response = "Sorry, I don't understand that command."

with self.client_lock:
if not write:
self.latest_term = int(operands[term])
self.latest_term = int(operands[term])

if operands[command] == "get":
response = self.get(operands[key])
elif operands[command] == "set":
value = " ".join(operands[values:])

self.log.append(string_operation)
if write:
self.write_to_log(term, string_operation)
self.set(operands[key], value)
response = f"key {operands[key]} set to {value}"
elif operands[command] == "delete":
self.log.append(string_operation)
if write:
self.write_to_log(term, string_operation)
self.delete(operands[key])
@@ -83,5 +82,5 @@ def execute(self, string_operation, write=True):

def write_to_log(self, current_term, string_operation):
f = open(self.server_name + "_log.txt", "a+")
f.write(str(current_term) + " " + string_operation + '\n')
f.write(string_operation + '\n')
f.close()
@@ -25,15 +25,19 @@ def tell(self, message, to_server_address):
print(f"connecting to {to_server_address[0]} port {to_server_address[1]}")

self.client_socket = socket(AF_INET, SOCK_STREAM)
self.client_socket.connect(to_server_address)
encoded_message = message.encode('utf-8')

try:
print(f"sending {encoded_message} to {to_server_address}")
send_message(self.client_socket, encoded_message)
except Exception as e:
print(f"closing socket due to {str(e)}")
self.client_socket.close()
self.client_socket.connect(to_server_address)
encoded_message = message.encode('utf-8')

try:
print(f"sending {encoded_message} to {to_server_address}")
send_message(self.client_socket, encoded_message)
except Exception as e:
print(f"closing socket due to {str(e)}")
self.client_socket.close()
except ConnectionRefusedError as e:
print(f"Ope, looks like {to_server_address[0]} port {to_server_address[1]} isn't up right now")


def start(self):
@@ -69,7 +73,7 @@ def handle_client(self, connection, kvs):
string_request = operation.decode("utf-8")
server_name, string_operation = self.return_address_and_message(string_request)

print("received " + string_operation)
print("from " + server_name + ": received " + string_operation)

if string_operation == "log_length?":
response = "log_length " + str(len(self.key_value_store.log))
@@ -82,11 +86,13 @@ def handle_client(self, connection, kvs):
response = "Your info is at least as good as mine!"
elif string_operation.split(" ")[0] == "catch_up_logs":
logs_to_append = ast.literal_eval(string_operation.split("catch_up_logs ")[1])
[self.key_value_store.execute(log) for log in logs_to_append]
[self.key_value_store.execute(log, term_absent=False) for log in logs_to_append]

response = "Caught up. Thanks!"
elif string_operation == "term":
response = str(self.term)
elif string_operation == "destination_addresses":
response = str(self.destination_addresses())
elif string_operation == "show_log":
response = str(self.key_value_store.log)
elif string_operation == "youre_the_leader":
@@ -100,7 +106,7 @@ def handle_client(self, connection, kvs):
]:
send_pending = False
else:
response = kvs.execute(string_operation)
response = kvs.execute(string_operation, term_absent=True)

if send_pending:
response = self.with_return_address(response)

0 comments on commit 45c21c1

Please sign in to comment.
You can’t perform that action at this time.