Skip to content

Commit

Permalink
Fix for issue #52. Committing the latest version of CoordinationAffix.
Browse files Browse the repository at this point in the history
  • Loading branch information
monzum committed Jun 12, 2014
1 parent e1cd9f2 commit 398ea4d
Showing 1 changed file with 53 additions and 102 deletions.
155 changes: 53 additions & 102 deletions affix/affix_components/coordinationaffix.r2py
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
#!python
"""
<Program Name>
coordinationaffix.repy
coordinationaffix.r2py
<Author>
Danny Y. Huang, yh1@cs.williams.edu
Expand Down Expand Up @@ -34,17 +34,16 @@
"""

dy_import_module_symbols("baseaffix.r2py")
dy_import_module_symbols("advertise_objects.r2py")
dy_import_module_symbols("affix_exceptions.r2py")
dy_import_module_symbols("affixstackinterface.r2py")
cachedadvertise = dy_import_module("cachedadvertise.r2py")
advertisepipe = dy_import_module("advertisepipe.r2py")

# Whether we want to print debug statements.
_COORDINATION_AFFIX_DEBUG_MODE = False

zenodotus_subdomain = 'zenodotus.poly.edu'

coordination_lookup_cache = LookupCache(300)

class CoordinationAffix(BaseAffix):

# A dictionary that maps a server socket object to the corresponding
Expand All @@ -56,12 +55,6 @@ class CoordinationAffix(BaseAffix):
# Protects the dictionary above from concurrent modification.
_adv_handle_dict_lock = createlock()

# Whether we are actively advertising the zenodotus name -> IP mapping.
_zenodotus_advertise_active = False

# How long a zenodotus to IP mapping may remain valid (time-to-live).
_ZENODOTUS_NAME_TTL = 600


def __init__(self, next_affix, optional_args=None):

Expand Down Expand Up @@ -120,37 +113,35 @@ class CoordinationAffix(BaseAffix):
"""
if _COORDINATION_AFFIX_DEBUG_MODE:
log("Coordination is about to start listening on '%s:%d'\n" % (localhost, localport))
# If we are listening on a zenodotus name, first translate it to the local
# IP and start the advertisement thread that constantly announces the
# zenodotus name -> IP mapping.
log(self, "Coordination is about to start listening on '%s:%d'\n" % (localhost, localport))
# If we are listening on a zenodotus name, translate it to the local IP
if localhost.endswith(zenodotus_subdomain):
try:
localip = getmyip()
except InternetConnectivityError:
raise AddressBindingError('Coordination Affix: Not connected to the network.')
self._zenodotus_start_advertise(localhost)

if _COORDINATION_AFFIX_DEBUG_MODE:
log("CoordinationAffix started zenodotus advertise for: %s\n" % str(localhost))

else:
localip = localhost

# Start listening so we have a complete view of the affix stack below us.
# We try up to 5 times as we want to give some time for the zenodotus name
# to establish, if user is using it for localhost.
exceptions_list = []
for i in range(5):
try:
tcpserversocket = self.peek().listenforconnection(localhost, localport)
break
except AddressBindingError:
sleep(0.1)
except (AddressBindingError, RepyArgumentError), e:
exceptions_list.append((self, type(e), str(e)))
sleep(1)
else:
self._zenodotus_advertise_active = False
raise AddressBindingError("Unable to bind with the localhost '%s' provided." % localhost)
raise AddressBindingError("Unable to bind with the localhost " +
str(localhost) + " (IP " + str(localip) +
") provided. Exceptions I saw: " + str(exceptions_list))

if _COORDINATION_AFFIX_DEBUG_MODE:
log("CoordinationAffix started listening socket on '%s:%d'\n" % (localip, localport))
log(self, "CoordinationAffix started listening socket on '%s:%d'\n" % (localip, localport))

# The underlying affix stack has been built completely. We can advertise the
# affix stack.
Expand All @@ -162,8 +153,8 @@ class CoordinationAffix(BaseAffix):
adv_key = str(hostname) + ',' + str(localport) + ',TCP'
adv_value = self.peek().get_advertisement_string()
if _COORDINATION_AFFIX_DEBUG_MODE:
log('CoordinationAffix: Advertised', adv_key, adv_value, '\n')
adv_handle = AdvertisePipe().add(adv_key, adv_value)
log(self, 'CoordinationAffix: Advertised', adv_key, adv_value, '\n')
adv_handle = advertisepipe.add_to_pipe(adv_key, adv_value)

# We need to save the advertisement handle to a global dictionary, so that
# we can keep a reference to the handle. Later on, when we stop listening,
Expand Down Expand Up @@ -228,21 +219,27 @@ class CoordinationAffix(BaseAffix):
# address.
if desthost.endswith(zenodotus_subdomain):
# If the destinationa address ends with zenodotus,
# we try to do a hostname lookup.
# use the vaue we added to the advertisepipe before.
try:
destip = gethostbyname(desthost)
except NetworkAddressError:
destip = cachedadvertise.lookup(desthost, timeout=0)[0]
except (NetworkAddressError, IndexError):
# We catch an indexerror as there is a possibility that
# the advertise lookup returns an empty list.
# In this situation we just set the destination IP as
# the hostname. Note that Affixes attempts to do a
# name lookup before it does a listenforconnection or
# openconnection.
destip = desthost
else:
destip = desthost


# Look up the server's affix stack under this key.
server_key = str(desthost) + ',' + str(destport) + ',TCP'
lookup_result_list = coordination_lookup_cache.lookup(server_key)
lookup_result_list = cachedadvertise.lookup(server_key)

if _COORDINATION_AFFIX_DEBUG_MODE:
log("Lookup result for '" + server_key + "' is: " + str(lookup_result_list) + "\n")
log(self, "Openconnection from", localip, localport, "to", server_key, " via " + str(lookup_result_list) + "\n")

# Reverse the results as the latest affix advertisement always comes last.
lookup_result_list.reverse()
Expand All @@ -261,7 +258,7 @@ class CoordinationAffix(BaseAffix):
# Build my affix stack and store it in the affix_context.
self.affix_context['next_affix'] = AffixStack(server_affix_stack_str, localhost).affix_stack_context['top_affix']
if _COORDINATION_AFFIX_DEBUG_MODE:
log('CoordinationAffix: Trying affix stack for openconnection: ', self.peek(), '\n')
log(self, 'CoordinationAffix: Trying affix stack for openconnection: ', self.peek(), '\n')
# Check if we have exceeded the time limit.
time_taken = getruntime() - start_time
if time_taken > timeout:
Expand All @@ -270,7 +267,7 @@ class CoordinationAffix(BaseAffix):
# Since we spent some time contacting the advertisement service, we have
# less time for the actual openconn.
if _COORDINATION_AFFIX_DEBUG_MODE:
log("Using affix string: " + self.peek().get_advertisement_string() + "to connect to '" +
log(self, "Using affix string: " + self.peek().get_advertisement_string() + "to connect to '" +
destip + ":" + str(destport) + "'\n")

try:
Expand Down Expand Up @@ -324,19 +321,35 @@ class CoordinationAffix(BaseAffix):
"""
# If we are listening on a zenodotus name, first translate it to the local
# IP and start the advertisement thread that constantly announces the
# zenodotus name -> IP mapping.
# IP and add the zenodotus name -> IP mapping to the advertisepipe.
if localhost.endswith(zenodotus_subdomain):
try:
localip = getmyip()
except InternetConnectivityError:
raise AddressBindingError('Coordination Affix: Not connected to the network.')
self._zenodotus_start_advertise(localhost)
adv_handle = advertisepipe.add_to_pipe(localhost, localip)
if _COORDINATION_AFFIX_DEBUG_MODE:
log(self, 'CoordinationAffix: Added', localhost, localip,
'to advertisepipe.\n')

# We need to save the advertisement handle to a global dictionary, so that
# we can keep a reference to the handle. Later on, when we stop listening,
# we can conveniently remove this key value pair from the advertisement
# service by using this handle.
if isinstance(tcpserversocket, AffixTCPServerSocket):
adv_handle_key = repr(tcpserversocket._socket)
else:
adv_handle_key = repr(tcpserversocket)

self._adv_handle_dict_lock.acquire(True)
self._adv_handle_dict[adv_handle_key] = adv_handle
self._adv_handle_dict_lock.release()

else:
localip = localhost

if _COORDINATION_AFFIX_DEBUG_MODE:
log("Coordination is about to start UDP server socket on '%s:%d'\n" % (localhost, localport))
log(self, "Coordination is about to start UDP server socket on '%s:%d'\n" % (localhost, localport))

# Start listening so we have a complete view of the affix stack below us.
udpserversocket = self.peek().listenformessage(localip, localport)
Expand All @@ -345,11 +358,11 @@ class CoordinationAffix(BaseAffix):
# affix stack.
adv_key = str(localip) + ',' + str(localport) + ',UDP'
adv_value = self.peek().get_advertisement_string()
adv_handle = AdvertisePipe().add(adv_key, adv_value)
adv_handle = advertisepipe.add_to_pipe(adv_key, adv_value)


if _COORDINATION_AFFIX_DEBUG_MODE:
log('CoordinationAffix: Advertised', adv_key, adv_value, '\n')
log(self, 'CoordinationAffix: Advertised', adv_key, adv_value, '\n')

# We need to save the advertisement handle to a global dictionary, so that
# we can keep a reference to the handle. Later on, when we stop listening,
Expand Down Expand Up @@ -403,7 +416,7 @@ class CoordinationAffix(BaseAffix):

# Now we obtain the server's affix stack string representation from which we
# will construct the client's affix stack.
lookup_result_list = coordination_lookup_cache.lookup(server_key)
lookup_result_list = cachedadvertise.lookup(server_key)


if lookup_result_list:
Expand Down Expand Up @@ -447,70 +460,8 @@ class CoordinationAffix(BaseAffix):

# Stop advertising both our affix stack and the zenodotus name -> IP mapping.
else:
AdvertisePipe().remove(adv_handle)
self._zenodotus_advertise_active = False
advertisepipe.remove_from_pipe(adv_handle)

finally:
self._adv_handle_dict_lock.release()




def _zenodotus_start_advertise(self, zenodotus_name):
"""
Constantly advertises the zenodotus name -> IP mapping in a thread.
"""
def _thread_function():

# Enables the thread
self._zenodotus_advertise_active = True

# Keeps track of the IP changes.
my_ip = ''

# Keeps track of when we last updated the mappings.
last_update_time = getruntime()

while self._zenodotus_advertise_active:

# Also store the IP address in a global cache, so that others
# (particularly the mobility affix) don't have to always call getmyip().
# This is for efficiency. Store None in the global cache if the
# network is down.
try:
current_ip = getmyip()
mycontext['coordination_affix_getmyip_cache'] = current_ip
except InternetConnectivityError:
mycontext['coordination_affix_getmyip_cache'] = None
sleep(1)
continue

current_time = getruntime()

# Advertises the mappings when the IP has changed, or when we haven't
# updated the mappings for 2/3 of the TTL of the zenodotus name. Ignore
# all exceptions during advertisement.
if (current_ip != my_ip) or (current_time - last_update_time > self._ZENODOTUS_NAME_TTL * 0.67):

if _COORDINATION_AFFIX_DEBUG_MODE and current_ip != my_ip:
log('CoordinationAffix: IP changed from', my_ip, '->', current_ip)
log('for', zenodotus_name, '\n')

my_ip = current_ip
last_update_time = current_time

# Advertise till success.
while True:
try:
advertise_announce(zenodotus_name, my_ip, self._ZENODOTUS_NAME_TTL, timeout=10)
break
except:
sleep(1)

sleep(2)

# Clears the IP cache when the loop stops
del mycontext['coordination_affix_getmyip_cache']

createthread(_thread_function)

0 comments on commit 398ea4d

Please sign in to comment.