<a href="https://colab.research.google.com/github/CKaniklides/BSc-Thesis-Algorithms/blob/main/Parademo_ALL_ALGORITHMS.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [1]:
# ------------------------------------------------------------------
# ------------------------------------------------------------------
# ------------------------------------------------------------------
### APPENDIX B - FULL PYTHON CODE LISTINGS & UNIT TESTS ###
# ------------------------------------------------------------------
# ------------------------------------------------------------------
# ------------------------------------------------------------------
## B1 - DELEGATION ROUTING ALGORITHM & UNIT TESTS ##
# ------------------------------------------------------------------
# ------------------------------------------------------------------
# 1. Helper: add_delegation()
#    Inserts src → dst into the delegation map
#    Refuses the edge if it would create a cycle
# ------------------------------------------------------------------
def add_delegation(delegation, src, dst):
    """
    Record the delegation src → dst in `delegation`, refusing cycles.

    delegation : { user -> delegate OR None }, mutated in place
    src        : user handing over their vote
    dst        : user receiving it (None stores "votes directly")
    raises     : ValueError if following the chain from dst reaches src
                 (the new edge would close a loop), or if the chain from
                 dst already runs into a cycle in the existing map.
    """
    seen = set()                     # nodes already walked on this chain
    cur = dst
    while cur is not None:
        if cur == src:
            raise ValueError(f"Cycle blocked: {src} → {dst}")
        if cur in seen:
            # The map was already cyclic before this call; without this
            # guard the walk would never terminate.
            raise ValueError(f"Existing delegation cycle reached from {dst}")
        seen.add(cur)
        cur = delegation.get(cur)    # missing key behaves like a sink
    delegation[src] = dst

# ------------------------------------------------------------------
# 2. Resolver: resolve_delegations()
#    Follows each chain to its terminal voter
#    Uses path compression so each edge is visited once
# ------------------------------------------------------------------
def resolve_delegations(delegation):
    """
    Map every user to the terminal voter at the end of their delegation chain.

    delegation : { user -> delegate OR None }
                 A delegate that is not itself a key is treated as voting
                 directly.
    returns    : { user -> final_holder } covering every key of `delegation`
                 and every delegate reached while walking the chains.
    raises     : ValueError if the map contains a delegation cycle.

    The walk is iterative, so chain length is not bounded by the interpreter
    recursion limit.
    """
    final = {}                       # memoised "who really votes"

    for start in delegation:
        path = []                    # unresolved nodes walked from `start`
        on_path = set()
        cur = start
        while cur not in final:
            if cur in on_path:
                raise ValueError(f"Delegation cycle detected at {cur}")
            path.append(cur)
            on_path.add(cur)
            nxt = delegation.get(cur)
            if nxt is None:          # cur votes directly
                final[cur] = cur
                break
            cur = nxt
        holder = final[cur]
        # Compress the path; assigning from the far end backwards keeps the
        # same insertion order as a depth-first recursive resolution.
        for node in reversed(path):
            final[node] = holder
    return final

# ------------------------------------------------------------------
# 3. Verbose test-suite
# ------------------------------------------------------------------
import random, time

# a) Long chain  A→B→C→D→E
chain = {'A': 'B', 'B': 'C', 'C': 'D', 'D': 'E', 'E': None}
resolved_chain = resolve_delegations(chain)
print("Test (a):", resolved_chain)
assert all(resolved_chain[u] == 'E' for u in chain)

# b) Three branches converging to a single sink
branch = {'X': 'M', 'Y': 'M', 'Z': 'M', 'M': 'N', 'N': 'O', 'O': None}
sinks = set(resolve_delegations(branch).values())
print("Test (b): sink →", next(iter(sinks)))
assert sinks == {'O'}

# c) Attempt to close a loop: D→A should be rejected
safe = {'A': 'B', 'B': 'C', 'C': 'D', 'D': None}
try:
    add_delegation(safe, 'D', 'A')
except ValueError as e:
    print("Test (c):")
    print(e)
else:
    # Without this branch the test would pass silently if the guard broke.
    raise AssertionError("Test (c): cycle D → A was not blocked")

# d) Mixed graph: add S→Q (valid), block W→U (cycle)
mixed = {'P': 'Q', 'Q': 'R', 'R': None}
add_delegation(mixed, 'S', 'Q')          # merges into R
mixed.update({'U': 'V', 'V': 'W'})
try:
    add_delegation(mixed, 'W', 'U')      # cycle blocked
except ValueError as e:
    print(e)
else:
    raise AssertionError("Test (d): cycle W → U was not blocked")
resolved_mixed = resolve_delegations(mixed)
print("Test (d):", resolved_mixed)
assert resolved_mixed == {'R': 'R', 'Q': 'R', 'P': 'R', 'S': 'R',
                          'W': 'W', 'V': 'W', 'U': 'W'}

# e) Large random acyclic graph
#    Each user delegates (with probability 0.5) to a strictly earlier user,
#    so no cycle can form.  enumerate + randrange avoids the O(n) index
#    lookup and O(n) slice copy per user.
users = [f'U{i}' for i in range(10000)]
big = {u: None for u in users}
for i, src in enumerate(users):
    if random.random() < 0.5 and i:
        add_delegation(big, src, users[random.randrange(i)])
t0 = time.perf_counter()
out_big = resolve_delegations(big)
print(f"Test (e): 10 000-node graph resolved in {time.perf_counter()-t0:.3f}s")
assert len(out_big) == len(users)
assert all(big[h] is None for h in out_big.values())   # every holder is a sink

print("\nAll delegation-routing tests passed.")

Test (a): {'E': 'E', 'D': 'E', 'C': 'E', 'B': 'E', 'A': 'E'}
Test (b): sink → O
Test (c):
Cycle blocked: D → A
Cycle blocked: W → U
Test (d): {'R': 'R', 'Q': 'R', 'P': 'R', 'S': 'R', 'W': 'W', 'V': 'W', 'U': 'W'}
Test (e): 10 000-node graph resolved in 0.006s

All delegation-routing tests passed.


In [2]:
## B2 - VOTE TALLYING ALGORITHM & UNIT TESTS ##
# ------------------------------------------------------------------
# ------------------------------------------------------------------
# 1. tally_votes(): sum token weights per option
# ------------------------------------------------------------------
def tally_votes(final_holders, votes):
    """
    Sum token weights per voting option.

    final_holders : { user -> final_holder }; each entry is worth one token
    votes         : { final_holder -> 'FOR' | 'AGAINST' | 'ABSTAIN' }
                    A holder with no entry, or with any other value, is
                    counted under 'NOT_VOTED'.
    returns       : dict with keys 'FOR', 'AGAINST', 'ABSTAIN', 'NOT_VOTED'
                    (in that order) mapping to token totals
    """
    recognised = ('FOR', 'AGAINST', 'ABSTAIN')

    # Tokens controlled by each final holder.
    tokens_held = {}
    for holder in final_holders.values():
        try:
            tokens_held[holder] += 1
        except KeyError:
            tokens_held[holder] = 1

    totals = dict.fromkeys(recognised + ('NOT_VOTED',), 0)
    for holder, count in tokens_held.items():
        cast = votes.get(holder)
        bucket = cast if cast in recognised else 'NOT_VOTED'
        totals[bucket] += count
    return totals


# ------------------------------------------------------------------
# 2. Verbose test-suite
# ------------------------------------------------------------------
def pretty(res):
    """Render a tally dict as 'KEY:value' cells joined by ' | '."""
    cells = []
    for label, count in res.items():
        cells.append(f"{label}:{count}")
    return " | ".join(cells)

# (a) Simple no-delegation case
final_a = {'A': 'A', 'B': 'B', 'C': 'C'}
votes_a = {'A': 'FOR', 'B': 'AGAINST', 'C': 'ABSTAIN'}
res_a = tally_votes(final_a, votes_a)
print("Test (a): ", pretty(res_a))
assert res_a == {'FOR': 1, 'AGAINST': 1, 'ABSTAIN': 1, 'NOT_VOTED': 0}

# (b) Delegate C holds A & B and votes FOR
final_b = {'A': 'C', 'B': 'C', 'C': 'C', 'D': 'D'}
votes_b = {'C': 'FOR', 'D': 'AGAINST'}
res_b = tally_votes(final_b, votes_b)
print("Test (b): ", pretty(res_b))
assert res_b == {'FOR': 3, 'AGAINST': 1, 'ABSTAIN': 0, 'NOT_VOTED': 0}

# (c) Same as (b) but delegate C forgets to vote
votes_c = {'D': 'AGAINST'}               # C missing
res_c = tally_votes(final_b, votes_c)
print("Test (c): ", pretty(res_c))
assert res_c == {'FOR': 0, 'AGAINST': 1, 'ABSTAIN': 0, 'NOT_VOTED': 3}

# (d) All tokens delegated to E; E votes ABSTAIN
final_d = {u: 'E' for u in ['A', 'B', 'C', 'D', 'E']}
votes_d = {'E': 'ABSTAIN'}
res_d = tally_votes(final_d, votes_d)
print("Test (d): ", pretty(res_d))
assert res_d == {'FOR': 0, 'AGAINST': 0, 'ABSTAIN': 5, 'NOT_VOTED': 0}

# (e) 1 000-token random sanity check
import random, time
users = [f'U{i}' for i in range(1000)]
final_e = {u: random.choice(users) for u in users}     # random delegates
votes_e = {u: random.choice(['FOR', 'AGAINST', 'ABSTAIN'])
           for u in random.sample(users, 600)}         # 60 % vote
t0 = time.perf_counter()
res_e = tally_votes(final_e, votes_e)
elapsed = time.perf_counter() - t0
print("Test (e): ", pretty(res_e),
      f"(resolved in {elapsed:.3f}s)")
# The split is random, but every token must land in exactly one bucket.
assert sum(res_e.values()) == len(users)

print("\nAll vote-tally tests passed.")


Test (a):  FOR:1 | AGAINST:1 | ABSTAIN:1 | NOT_VOTED:0
Test (b):  FOR:3 | AGAINST:1 | ABSTAIN:0 | NOT_VOTED:0
Test (c):  FOR:0 | AGAINST:1 | ABSTAIN:0 | NOT_VOTED:3
Test (d):  FOR:0 | AGAINST:0 | ABSTAIN:5 | NOT_VOTED:0
Test (e):  FOR:215 | AGAINST:198 | ABSTAIN:218 | NOT_VOTED:369 (resolved in 0.000s)

All vote-tally tests passed.


In [3]:
## B3 - REPUTATION COMPUTATION ALGORITHM & UNIT TESTS ##
# ------------------------------------------------------------------
# ------------------------------------------------------------------
import random, time
# ------------------------------------------------------------------
# penalized_pagerank(): PageRank + Selective NOT_VOTED penalty
# ------------------------------------------------------------------
def penalized_pagerank(delegation, *, alpha=0.15, tol=1e-6, max_iter=100,
                       penalty=0.5, final_holders=None, votes=None):
    """
    PageRank over the delegation graph with an optional NOT_VOTED penalty.

    delegation     : { user -> delegate OR None }  (out-degree ≤ 1)
                     A delegate that is not itself a key is added to the
                     graph as a sink.
    alpha          : share of total rank spread uniformly each iteration;
                     the remaining (1 - alpha) follows delegation edges
    tol            : stop once the largest per-node change is below this
    max_iter       : upper bound on power-iteration steps
    final_holders  : mapping user -> final_holder
    votes          : mapping final_holder -> choice  (those who voted)
    penalty        : fraction by which to reduce score for non-voting delegates
                     who held someone ELSE's token.  If final_holders or votes
                     are omitted, the function behaves like plain PageRank.
    returns        : { user -> reputation score }  (scores sum to 1.0 up to
                     floating-point error); {} for an empty graph
    """

    # ----- PageRank core -----
    users = list(delegation)
    known = set(users)
    for v in delegation.values():
        # Delegates that never appear as keys still need a slot in the graph.
        if v is not None and v not in known:
            known.add(v)
            users.append(v)

    N = len(users)
    if N == 0:                       # nothing to rank; avoids 1.0/0
        return {}
    idx = {u: i for i, u in enumerate(users)}

    # successor array: self-loop if sink
    succ = list(range(N))
    for u, v in delegation.items():
        if v is not None:
            succ[idx[u]] = idx[v]

    rank = [1.0 / N] * N
    for _ in range(max_iter):
        new = [alpha / N] * N
        for j, nxt in enumerate(succ):
            new[nxt] += (1 - alpha) * rank[j]
        # On convergence the previous iterate is kept; it differs from
        # `new` by less than tol in every component.
        if max(abs(a - b) for a, b in zip(rank, new)) < tol:
            break
        rank = new
    pr = {users[i]: rank[i] for i in range(N)}

    # ----- Selective penalty phase -----
    if final_holders is not None and votes is not None:
        # token counts per holder + external tokens received
        total_tok = {}
        external = {}
        for u, h in final_holders.items():
            total_tok[h] = total_tok.get(h, 0) + 1
            if u != h:                          # token came from someone else
                external[h] = external.get(h, 0) + 1

        # Only values of existing keys are reassigned, so iterating
        # pr.items() while updating is safe.
        for h, score in pr.items():
            # penalise only if  (a) delegate missed vote AND (b) had externals
            if h not in votes and external.get(h, 0) > 0:
                frac = external[h] / total_tok[h]      # share of tokens lost
                pr[h] = score * (1 - penalty * frac)

        # renormalise (fall back to 1 if every score was driven to zero)
        s = sum(pr.values()) or 1
        for h in pr:
            pr[h] /= s

    return pr

# ------------------------------------------------------------------
# 3. Verbose test-suite
# (Assumes resolve_delegations is already defined in a previous cell)
# ------------------------------------------------------------------
def show_scores(title, scores):
    """Print `title` (left-padded to 30 columns) and the five highest scores."""
    ranked = sorted(scores.items(), key=lambda pair: -pair[1])
    cells = []
    for user, score in ranked[:5]:
        cells.append(f"{user}:{score:.3f}")
    top = ", ".join(cells)
    print(f"{title:<30} {top}")

# (a) Star graph: everyone votes
deleg_a = {'A': 'D', 'B': 'D', 'C': 'D', 'D': None}
final_a = resolve_delegations(deleg_a)
votes_a = {'D': 'FOR'}
print("Test (a) star:")
raw_a = penalized_pagerank(deleg_a)
pen_a = penalized_pagerank(deleg_a, final_holders=final_a, votes=votes_a)
show_scores("  before penalty:", raw_a)
show_scores("  after  penalty:", pen_a)
# The only holder voted, so the penalty phase must leave scores unchanged.
assert all(abs(raw_a[u] - pen_a[u]) < 1e-9 for u in deleg_a)

# (b) Chain: Z fails to vote
deleg_b = {'X': 'Y', 'Y': 'Z', 'Z': None}
final_b = resolve_delegations(deleg_b)
votes_b = {'Y': 'FOR'}  # Z did not vote
print("\nTest (b) chain (and Z not voted):")
raw_b = penalized_pagerank(deleg_b)
pen_b = penalized_pagerank(deleg_b, final_holders=final_b, votes=votes_b)
show_scores("  before penalty:", raw_b)
show_scores("  after  penalty:", pen_b)
assert pen_b['Z'] < raw_b['Z']           # non-voting holder loses share

# (c) Mixed scenario: R fails to vote
deleg_c = {'P': 'Q', 'Q': 'R', 'R': None, 'S': 'Q', 'T': None}
final_c = resolve_delegations(deleg_c)
votes_c = {'Q': 'FOR', 'T': 'AGAINST'}  # R did not vote
print("\nTest (c) mixed (and R not voted):")
raw_c = penalized_pagerank(deleg_c)
pen_c = penalized_pagerank(deleg_c, final_holders=final_c, votes=votes_c)
show_scores("  before penalty:", raw_c)
show_scores("  after  penalty:", pen_c)
assert pen_c['R'] < raw_c['R']           # R held external tokens and abstained
assert pen_c['T'] > raw_c['T']           # T voted, so gains after renormalising

# (d) Random 10 000-node graph
#     Each user delegates (with probability 0.5) to a strictly earlier user.
#     enumerate + randrange avoids the O(n) index lookup and slice per user.
users = [f'U{i}' for i in range(10000)]
deleg_d = {u: None for u in users}
for i, u in enumerate(users):
    if random.random() < 0.5 and i:
        deleg_d[u] = users[random.randrange(i)]

# Resolve final holders and pick ~60% to vote “FOR”
final_d = resolve_delegations(deleg_d)
holders = list(set(final_d.values()))
votes_d = {h: 'FOR' for h in random.sample(holders, int(0.6 * len(holders)))}

# Time only the reputation computation itself
t0 = time.perf_counter()
_ = penalized_pagerank(deleg_d, final_holders=final_d, votes=votes_d)
print(f"\nTest (d): 10 000-node graph resolved in {time.perf_counter()-t0:.3f}s")

print("\nAll reputation computaion tests passed.")

Test (a) star:
  before penalty:              D:0.888, A:0.037, B:0.037, C:0.037
  after  penalty:              D:0.888, A:0.037, B:0.037, C:0.037

Test (b) chain (and Z not voted):
  before penalty:              Z:0.857, Y:0.092, X:0.050
  after  penalty:              Z:0.800, Y:0.130, X:0.070

Test (c) mixed (and R not voted):
  before penalty:              R:0.659, T:0.200, Q:0.081, P:0.030, S:0.030
  after  penalty:              R:0.547, T:0.266, Q:0.108, P:0.040, S:0.040

Test (d): 10 000-node graph resolved in 0.037s

All reputation computaion tests passed.
