In [14]:
import threading

class Node:
    """One participant in a unidirectional ring used to elect the maximum value.

    Every node stores a ``number`` and a reference to its ``right`` neighbour.
    A node is ``active`` until a query carrying a value greater than or equal
    to its own number reaches it (see ``ask_right``), or until
    ``start_asking`` deactivates it after one of its own queries fails.
    """

    def __init__(self, number, right):
        """Create an active node.

        Args:
            number: Value this node competes with.
            right: The next node in the ring, or None if not linked yet.
        """
        self.number = number
        self.right = right
        self.active = True

    def __str__(self):
        """Return ``[number -> right.number]`` while active, else a marker."""
        if self.active:
            # A node may be printed before the ring is closed (right is None).
            right_number = None if self.right is None else self.right.number
            return "[{} -> {}]".format(self.number, right_number)
        else:
            return "-[inactive]-"

    def __repr__(self):
        """Use the same text as ``__str__`` so containers of nodes print readably."""
        return self.__str__()

    def prepare_work(self, total_hops):
        """Store the hop bound that ``start_asking`` will iterate up to.

        Args:
            total_hops: Exclusive upper bound of the hop distances to query.
        """
        self.total_hops = total_hops

    def start_asking(self, thread_id):
        """Query nodes at hop distances ``1 .. total_hops - 1`` in order.

        Stops at the first failed query and marks this node inactive.
        ``prepare_work`` must have been called first.

        Args:
            thread_id: Identifier of the calling worker; not used by the logic.
        """
        for hop in range(1, self.total_hops):
            if not self.ask_right(hop):
                self.active = False
                break

    def ask_right(self, hop):
        """Challenge the node ``hop`` steps to the right with this node's number.

        Args:
            hop: Number of ``right`` links to follow from this node.

        Returns:
            True if this node is active and its number is greater than or
            equal to the target's number (the target is then deactivated);
            False otherwise. The asking node is never modified here.
        """
        if not self.active:
            return False
        else:
            return self.__ask_right(self.number, hop)

    def __ask_right(self, value, hop):
        """Walk ``hop`` links to the right and compare ``value`` with the target.

        Implemented as a loop rather than recursion so that large rings do not
        depend on the interpreter's recursion limit or a thread's stack size.

        Returns:
            True (after deactivating the target) when ``value`` is greater
            than or equal to the target's number, otherwise False.
        """
        target = self
        for _ in range(hop):
            target = target.right
        if value >= target.number:
            target.active = False
            return True
        return False

In [39]:
import numpy as np
import sys
import time

# Raise the interpreter's recursion limit so that deep call chains (for
# example a recursive ring walk that uses one frame per hop) do not raise
# RecursionError on the 5000-node rings built below.
sys.setrecursionlimit(10000)

def synchronous():
    """Run the ring max-election round by round on the calling thread.

    Builds a ring of 5000 nodes with random values below 10000, lets every
    node query each hop distance in lock-step, then prints the nodes that are
    still active, the true maximum for comparison, and the elapsed time.
    """
    started_at = time.time()

    # Small hand-checkable alternative: node_values = [4,3,1,2,3,5,10]
    node_values = list(np.random.randint(10000, size=5000))

    # One node per value, in input order; each node's right neighbour is the
    # next node, and the last node wraps around to the first.
    nodes = [Node(value, None) for value in node_values]
    for current, successor in zip(nodes, nodes[1:] + nodes[:1]):
        current.right = successor

    # Lock-step rounds: every node asks at distance `hop` before any node
    # moves on to distance `hop + 1`.
    ring_size = len(nodes)
    for hop in range(1, ring_size):
        for node in nodes:
            node.ask_right(hop)

    survivors = [node for node in nodes if node.active]
    for survivor in survivors:
        print("max node is {}".format(survivor))

    print("max is actually: {}".format(max(node_values)))
    elapsed = time.time() - started_at
    print("synchronous took {}".format(elapsed))
    
# Run the lock-step simulation once; the function prints its own results and timing.
synchronous()

max node is [9998 -> 3540]
max is actually: 9998
synchronous took 13.740337133407593


In [40]:
# distributed
import numpy as np
from random import shuffle


# node_values = [4,3,1,2,3,5,10]
def distributed():
    """Run the ring max-election with one thread per node.

    Builds a ring of 5000 nodes with random values below 10000, starts a
    thread per node that runs ``Node.start_asking``, waits for all of them,
    then prints the nodes that are still active, the true maximum for
    comparison, and the elapsed time.
    """
    started_at = time.time()

    node_values = list(np.random.randint(10000, size=5000))

    # One node per value, in input order; each node's right neighbour is the
    # next node, and the last node wraps around to the first.
    nodes = [Node(value, None) for value in node_values]
    for current, successor in zip(nodes, nodes[1:] + nodes[:1]):
        current.right = successor

    workers = []
    for index, node in enumerate(nodes):
        # Configure the node right before its own thread starts.
        node.prepare_work(len(nodes) - 1)
        worker = threading.Thread(target=node.start_asking, args=(index,))
        worker.start()
        workers.append(worker)

    # Join in random order.
    shuffle(workers)
    for worker in workers:
        worker.join()

    survivors = [node for node in nodes if node.active]
    for survivor in survivors:
        print("max node is {}".format(survivor))

    print("max is actually: {}".format(max(node_values)))

    elapsed = time.time() - started_at
    print("distributed took {}".format(elapsed))
    
# Run the thread-per-node simulation once; the function prints its own results and timing.
distributed()

max node is [9997 -> 1816]
max is actually: 9997
distributed took 6.68111777305603


In [33]:
# Standalone Python threading example: two threads increment a shared counter under a lock.

import threading
 
# Shared module-level counter: incremented by thread_task, reset to 0 by main_task.
x = 0

def thread_task(lock):
    """Increment the module-level counter ``x`` 100000 times, guarded by ``lock``.

    Args:
        lock: A lock usable as a context manager (e.g. ``threading.Lock``)
            that is shared by every thread updating ``x``.
    """
    global x
    for _ in range(100000):
        # ``with`` releases the lock even if the update raises, so a failure
        # in one thread cannot leave the others blocked forever.
        with lock:
            x += 1
 
def main_task():
    """Reset the shared counter ``x`` and run two ``thread_task`` workers to completion.

    Both workers share one freshly created lock; the function returns only
    after both have been joined.
    """
    global x
    x = 0

    shared_lock = threading.Lock()
    workers = [
        threading.Thread(target=thread_task, args=(shared_lock,))
        for _ in range(2)
    ]

    for worker in workers:
        worker.start()

    for worker in workers:
        worker.join()
 
# Repeat the two-thread experiment 10 times. Each run resets x and has two
# threads add 100000 each, so with the lock in place every iteration is
# expected to print 200000.
for i in range(10):
    main_task()
    print("Iteration {0}: x = {1}".format(i,x))


Iteration 0: x = 200000
Iteration 1: x = 200000
Iteration 2: x = 200000
Iteration 3: x = 200000
Iteration 4: x = 200000
Iteration 5: x = 200000
Iteration 6: x = 200000
Iteration 7: x = 200000
Iteration 8: x = 200000
Iteration 9: x = 200000
