# Dynamic Programming

In [None]:
import numpy as np

In [None]:
import functools

In [None]:
!pip install frozendict
from frozendict import frozendict

Looking in indexes: https://pypi.org/simple, https://us-python.pkg.dev/colab-wheels/public/simple/
Collecting frozendict
  Downloading frozendict-2.3.4-cp38-cp38-manylinux_2_17_x86_64.manylinux2014_x86_64.whl (110 kB)
[K     |████████████████████████████████| 110 kB 4.0 MB/s 
[?25hInstalling collected packages: frozendict
Successfully installed frozendict-2.3.4


In [None]:
from numpy import argmax

In [None]:
from copy import deepcopy

In [None]:
from math import ceil

In [None]:
from ipsum import lorem

In [None]:
from custom_types import *

In [None]:
from fractions import Fraction

In [11]:
from random import randint

# Memoization

In [10]:
def memoize(func):
    """Memoize function returns"""
    
    memo = {}
    hits = misses = 0
    
    def cache_info():
        return f'Memo(hits={hits},misses={misses})'
    
    def deep_freeze(item):
        if isinstance(item, tuple):
            return tuple(deep_freeze(i) for i in item)

        if isinstance(item, list):
            # ensure all items are hashable
            return tuple(deep_freeze(i) for i in item)

        if isinstance(item, dict):
            # deepfreeze
            return frozendict({deep_freeze(k): deep_freeze(v) for k,v in item.items()})

        # if we can hash, no problem
        if item.__hash__:
            return item

        if item.__str__:
            return str(item)

        if item.__repr__:
            return repr(item)

        raise Exception(f"Unable to hash item: {item}")

    
    @functools.wraps(func)
    def wrapper_memo(*args, **kwargs):
        nonlocal hits, misses, memo
        
        # memoize
        arg_hash = (deep_freeze(args), deep_freeze(kwargs))
        
        # check if in memo
        if arg_hash in memo:
            hits += 1
            return deepcopy(memo[arg_hash])
        
        # call function
        misses += 1
        # save to memo
        memo[arg_hash] = func(*args, **kwargs)
        
        return deepcopy(memo[arg_hash])
    
    # attach cache metadata
    wrapper_memo.cache_info = cache_info
        
    return wrapper_memo

### Generate Random Numbers

In [12]:
g = list(dict.fromkeys(np.random.randint(0, 25, 20)))

In [13]:
nums = [1, 4, 2, 8, 5, 7]
nums

[1, 4, 2, 8, 5, 7]

## Find Largest Ascending Subset

In [14]:
@memoize
def pick_biggest(numbers: list, root=True):
    # base case 1:
    if len(numbers) == 0:
        return 0, [[]]
    
    # base case 2:
    if len(numbers) == 1:
        return numbers[0], [[numbers[0]]]
    
    all_max = []
    for idx, num in enumerate(numbers):
        # subproblem : pick or don't pick
        
        # list which certainly includes num.
        # i[1] is the number itself | i[0] is the index (from enumerate)
        must_include = [i[1] for i in filter(lambda v: v[1] < num and v[0] < idx or
                                                       v[1] > num and v[0] > idx, enumerate(numbers))]
        # calc max number and subset
        max_with_num, max_with_num_subset = pick_biggest(must_include, root=False)
        # add with num
        max_with_num += num
        # add (and sort) subset
        for subset in max_with_num_subset:
            subset.append(num)
            subset.sort()
        
        # try without
        not_include = numbers[:idx] + numbers[idx+1:]
        max_not_num, max_not_num_subset = pick_biggest(not_include, root=False)
        
        # add max to list
        if (max_with_num == max_not_num):
            solution = [max_with_num, max_with_num_subset + max_not_num_subset]
        elif (max_with_num > max_not_num):
            solution = [max_with_num, max_with_num_subset]
        else:
            solution = [max_not_num, max_not_num_subset]
            
        if solution not in all_max:
            all_max.append(solution)
    
    highest_sum = max(all_max, key=lambda x: x[0])[0]
    # get all values which add to highest sum
    solutions = list(filter(lambda x: x[0] == highest_sum, all_max))
    # return working subsets
    all_subsets = []
    for solution in solutions:
        for subset in solution[1]: # [1] select subsets and not num
            all_subsets.append(subset)
            
    return highest_sum, all_subsets
        
    

In [15]:
pick_biggest(nums)

(17, [[1, 4, 5, 7]])

In [16]:
pick_biggest.cache_info()

'Memo(hits=309,misses=64)'

# Find Leading Numbers

In [None]:
@memoize
def find_leading_numbers(n: int):
    
    # base case:
    if (n == 1):
        return [[1]]
    
    # find solutions
    solutions = []
    # loop from 1 to n/2
    for i in range(1, ceil(n/2)+1):
        # recusively find solutions for each
        # component/complement
        component = find_leading_numbers(i)
        complement = find_leading_numbers(n-i)
        
        # add component/complement solutions
        # i.e. n=3, add (1,2)
        solutions.append([i,n-i])
        
        # for every way to write the component number
        for c in component:
            # for every way to write the complement number
            for cx in complement:
                # propose a new solution combining both solutions
                solution = [*c, *cx]
                # sort them (so we don't double count (1,2) and (2,1)
                solution.sort()
                
                # ensure we don't add an existing solution
                if solution not in solutions:
                    solutions.append(solution)
                
    return solutions

In [None]:
n = find_leading_numbers(8)
n

[[1, 7],
 [1, 1, 6],
 [1, 1, 1, 5],
 [1, 1, 1, 1, 4],
 [1, 1, 1, 1, 1, 3],
 [1, 1, 1, 1, 1, 1, 2],
 [1, 1, 1, 1, 1, 1, 1, 1],
 [1, 1, 1, 1, 2, 2],
 [1, 1, 1, 2, 3],
 [1, 1, 2, 4],
 [1, 1, 3, 3],
 [1, 2, 5],
 [1, 3, 4],
 [1, 1, 2, 2, 2],
 [2, 6],
 [3, 5],
 [1, 2, 2, 3],
 [4, 4],
 [2, 2, 2, 2]]

In [None]:
find_leading_numbers.cache_info()

'Memo(hits=31,misses=8)'

### Test Cases

In [None]:
def test_no_dups():
    n = find_leading_numbers(10)
    n_set = set(tuple(s) for s in n)
    assert len(n) == len(n_set), 'Some duplicates exist!'
test_no_dups()

In [None]:
def test_all_sum():
    n = 10
    nums = find_leading_numbers(n)
    n_sums = [sum(l) == n for l in nums]
    assert all(n_sums)
test_all_sum()

# Graph

### Problem Setup

In [None]:
class Vertex:
    counter: int = 0
    index: int
    value: int
    
    def __init__(self, value):
        Vertex.counter += 1
        self.index = Vertex.counter
        self.value = value
        
    def __repr__(self):
        return f'Vertex(index={self.index},value={self.value})'

In [None]:
Vertex(1),Vertex(2)

(Vertex(index=1,value=1), Vertex(index=2,value=2))

### Create Vertex List

In [None]:
g = [Vertex(i) for i in range(0,5)]
g

[Vertex(index=3,value=0),
 Vertex(index=4,value=1),
 Vertex(index=5,value=2),
 Vertex(index=6,value=3),
 Vertex(index=7,value=4)]

In [None]:
@memoize
def find_max_independent(graph: list):  
    # base case 1 : no graph returns no value
    if len(graph) == 0:
        return 0, []
    
    # base case 2 : one vertex returns it's own value
    if len(graph) == 1:
        return graph[0].value, graph
    
    # base case 3 : two vertices returns the higher value    
    if len(graph) == 2:
        if (graph[0].value > graph[1].value):
            return graph[0].value, [graph[0]]
        
        else:
            return graph[1].value, [graph[1]]

    vertex = graph[0]
        
    # try with : range: +2->end (plus item value)
    # find the max if we take vertex
    max_with, max_with_subset = find_max_independent(graph[2:])
    max_with+=vertex.value # add value of vertex
    max_with_subset.append(vertex) # add the vertex

    # try without : range: +1->end (no item value)
    # find the max if we don't take the vertex
    max_without, max_without_subset = find_max_independent(graph[1:])

    # determine which subset to choose
    max_subset = max_with_subset if max_with > max_without else max_without_subset

    # calculate the sum of our chosen set
    total_sum = sum(x.value for x in max_subset)
    
    # return the sum and the corresponding set
    return total_sum, max_subset

In [None]:
find_max_independent(g)

(6, [Vertex(index=7,value=4), Vertex(index=5,value=2)])

In [None]:
find_max_independent.cache_info()

'Memo(hits=2,misses=5)'

# High / Low Stress Jobs

In [None]:
job_values = [
    [10, 1, 10, 10], # low stress
    [5, 50, 5, 1], # high stress
]

In [None]:
@memoize
def best_job(pay: list, is_first_week=True) -> int:
    # base cases
    if (len(pay[0])) == 1:
        # Only 1 week left, complete it as low-paying
        return pay[0][0]
    
    if (len(pay[0])) == 2:
        # Only 2 weeks left, pick higher
        # from 2 low-pay or 1 high-pay
        return max(sum(pay[0]), pay[1][1])
    
    # add pay of low-job now. See what's possible with remaining weeks
    choose_low = best_job([pay[0][1:],pay[1][1:]], False) + pay[0][0]
    # take this week off, add pay of next weeks high job. See what's possible with remaining weeks
    choose_high = best_job([pay[0][2:],pay[1][2:]], False) + pay[1][1]
    # first week exception : we can take a high stress the first week if wanted
    if is_first_week is True:
        # determine if it's worth doing
        high_first = best_job([pay[0][1:],pay[1][1:]], False) + pay[1][0]
        choose_high = high_first if high_first > choose_high else choose_high
    
    return max(choose_low, choose_high)

In [None]:
best_job(job_values)

70

In [None]:
best_job.cache_info()

'Memo(hits=2,misses=4)'

# Text Justify

In [None]:
words[:10]

['Lorem',
 'ipsum',
 'dolor',
 'sit',
 'amet,',
 'consectetur',
 'adipiscing',
 'elit.',
 'Integer',
 'ac']

In [None]:
words = lorem.replace("\n", " ").split(" ")

In [None]:
MAX_LINE = 70

# Minimize : Overall sum of slack
def slack(words: list):
    # Calculate how long the words take up on the current line
    words_len = len(' '.join(words))
    if words_len > MAX_LINE: return Infinity()
    
    # Calculate the slack
    return (MAX_LINE - words_len)**2

@memoize
def justify(words: list, pointer = 0):
    # print(f'recursion: {words} | pointer: {pointer} | slack: {slack(words[:pointer+1])}')
    # Base case
    if len(words) == 1:
        return slack(words), []
    
    if len(words) <= pointer:
        return slack(words), []
    
    # Grab the first word
    current_words = words[:pointer+1]
    current_slack = slack(current_words)
    
    # base case : current slack == infinity
    if Infinity.isInfinity(current_slack):
        return Infinity(), []
    
    # Calculate if we do not split here
    slack_without_split, without_split_idxs = justify(words, pointer=pointer+1)
    # adjust idxs
    
    # Calculate if we do split here
    slack_with_split, with_split_idxs = justify(words[pointer+1:])
    slack_with_split += current_slack
    
    if slack_with_split < slack_without_split:
        return slack_with_split, [pointer+1] + with_split_idxs
    else:
        return slack_without_split, without_split_idxs

In [None]:
j = words
# j = ['a' for _ in range(0, 100)]
min_sum, cutoffs = justify(j)
iwords = iter(j)
print(f'cutoffs: {cutoffs}')

lines = []
for cutoff in cutoffs:
    # setup splits
    lines.append([next(iwords) for _ in range(0, cutoff)])
# exhaust the iterator
lines.append(list(iwords))
_ = [print(' '.join(line)) for line in lines]

cutoffs: [10, 9, 11, 10, 10, 10, 9, 11, 8, 10, 11, 10, 11, 11, 9, 11, 9, 10, 11, 11, 9, 11, 9, 10, 12, 12, 11]
Lorem ipsum dolor sit amet, consectetur adipiscing elit. Integer ac
maximus ex. Curabitur dignissim, turpis nec ullamcorper posuere, arcu
lacus auctor dolor, a sollicitudin magna sem sit amet eros. Donec
tincidunt porta tincidunt. Nam dolor massa, posuere vel pretium et,
dignissim a justo. Donec tempus arcu vel ligula iaculis pharetra.
Aliquam erat volutpat. Integer placerat at arcu sit amet luctus.
Praesent quam dolor, venenatis quis sollicitudin nec, vulputate et
nisi. Fusce eu risus laoreet, gravida tortor id, gravida odio. 
Mauris condimentum suscipit lacus, nec volutpat tellus sollicitudin
vel. Sed ut commodo eros. Curabitur neque turpis, rutrum porta
ornare in, pulvinar vel mauris. Integer ut tempus nunc, id malesuada
est. Sed eget auctor nisl. Cras pretium fermentum fringilla. Donec
et urna tellus. Aenean sed odio id tortor luctus vulputate. Etiam
non metus et ex imperd

In [None]:
justify.cache_info()

'Memo(hits=2678,misses=3249)'

# Robot Detonations

In [None]:
robots_arrivals = [1, 10, 10, 1]

In [None]:
def emp_power(seconds: int):
    """
    The example provided in the book followed 2^(x-1)
    However, it could be easily adjusted to any other
    formula.
    """
    return 2**(seconds-1)

In [None]:
@memoize
def detonate(robots, emp_buildup=1):
    
    current_robot_arrivals = robots[0]
    
    # base case 1 : 1 robot arrival left
    if len(robots) == 1:
        return min(current_robot_arrivals, emp_power(emp_buildup)), [emp_buildup]
    
    # Compare whether it's better to detonate now or later
    detonate_now_robots, detonate_now_idxs = detonate(robots[1:])
    # add robots detonated now
    detonate_now_robots += min(current_robot_arrivals, emp_power(emp_buildup))
    
    # Detonate later, increment emp_buildup
    detonate_later_robots, detonate_later_idxs = detonate(robots[1:], emp_buildup=emp_buildup+1)
    
    if (detonate_now_robots > detonate_later_robots):
        return detonate_now_robots, [emp_buildup] + detonate_now_idxs
    else:
        return detonate_later_robots, detonate_later_idxs

In [None]:
robots_destroyed, detonations = detonate([1, 1, 1, 1, 1, 1, 30])
robots_destroyed, detonations

(31, [1, 6])

In [None]:
detonate.cache_info()

'Memo(hits=15,misses=28)'

In [None]:
[detonations[i] + (detonations[i-1] if i > 0 else 0) for i in range(0, len(detonations))]

[1, 7]

# Cyclic Stock Trading 

In [None]:
stocks = ['a', 'b', 'c', 'd', 'e', 'f', 'g']
stock_ratios = {
}

Generate some random equivalent stock ratios

In [None]:
def gen_ratios():
    for stock in stocks:
        for other_stock in stocks:
            # No ratio where i == j
            if stock == other_stock:
                continue
            
            from_ = randint(1, 1)
            to_ = randint(1, 1)
            
            stock_ratios[(stock, other_stock)] = Fraction(to_, from_)
            stock_ratios[(other_stock, stock)] = Fraction(from_,to_)
gen_ratios()
stock_ratios

{('a', 'b'): Fraction(1, 1),
 ('b', 'a'): Fraction(1, 1),
 ('a', 'c'): Fraction(1, 1),
 ('c', 'a'): Fraction(1, 1),
 ('a', 'd'): Fraction(1, 1),
 ('d', 'a'): Fraction(1, 1),
 ('a', 'e'): Fraction(1, 1),
 ('e', 'a'): Fraction(1, 1),
 ('a', 'f'): Fraction(1, 1),
 ('f', 'a'): Fraction(1, 1),
 ('a', 'g'): Fraction(1, 1),
 ('g', 'a'): Fraction(1, 1),
 ('b', 'c'): Fraction(1, 1),
 ('c', 'b'): Fraction(1, 1),
 ('b', 'd'): Fraction(1, 1),
 ('d', 'b'): Fraction(1, 1),
 ('b', 'e'): Fraction(1, 1),
 ('e', 'b'): Fraction(1, 1),
 ('b', 'f'): Fraction(1, 1),
 ('f', 'b'): Fraction(1, 1),
 ('b', 'g'): Fraction(1, 1),
 ('g', 'b'): Fraction(1, 1),
 ('c', 'd'): Fraction(1, 1),
 ('d', 'c'): Fraction(1, 1),
 ('c', 'e'): Fraction(1, 1),
 ('e', 'c'): Fraction(1, 1),
 ('c', 'f'): Fraction(1, 1),
 ('f', 'c'): Fraction(1, 1),
 ('c', 'g'): Fraction(1, 1),
 ('g', 'c'): Fraction(1, 1),
 ('d', 'e'): Fraction(1, 1),
 ('e', 'd'): Fraction(1, 1),
 ('d', 'f'): Fraction(1, 1),
 ('f', 'd'): Fraction(1, 1),
 ('d', 'g'): F

Adjust one of the ratios to add an opportunity cycle

In [None]:
key = ('c', 'a')
ratio = stock_ratios[key]
stock_ratios[key] = Fraction(r.denominator, ratio.numerator+1)
print(f'Switched {key} from {repr(ratio)} to {repr(stock_ratios[key])}')

NameError: name 'r' is not defined

In [None]:
def discover_opportunity_cycle(available_stocks):
    @memoize
    def detect_cycle(stock_pile, target_stock, previous_stock, current_product):
        # print(f'recursion: stock_pile={stock_pile} | target={target_stock} | previous={previous_stock} | current={current_product}')
        
        # calculate ratio if we trade back to the target
        target_ratio = (stock_ratios[(previous_stock, target_stock)] * current_product) \
                            if previous_stock != target_stock else 0
        
        # base case 1 : arrived at an opportunity cycle
        if target_ratio > 1:
            print(f'Found a cycle with stock: {previous_stock}')
            return target_ratio


        # base case 2 : no more stocks to trade, return ratio back to target
        if len(available_stocks) == 1:
            return target_ratio
               

        # last resort : try trading to another of the remaining stocks
        for candidate_stock_idx, candidate_stock in enumerate(stock_pile):
            # calculate product for this stock
            new_product_ratio = current_product * stock_ratios[(previous_stock, candidate_stock)]
            
            # copy stocks
            new_stock_pile = stock_pile.copy()

            # recurse
            detect_cycle(
                new_stock_pile[:candidate_stock_idx] + new_stock_pile[candidate_stock_idx+1:],
                target_stock,
                candidate_stock,
                new_product_ratio,
            )
                # , current_product=product, target_stock=target_stock)

#             # set the target stock for first iteration else inherit
#             target_stock = target_stock if target_stock is not None else current_stock

            # recurse       
    
    # Check every stock to see if we can get a cycle
    for stock in available_stocks:
        stock_pile = available_stocks.copy()
        stock_pile.remove(stock)
        detect_cycle(stock_pile, stock, stock, 1)
        # print(f'checked {stock}')
        
    return detect_cycle.cache_info()
    
    
    
    
    


In [None]:
discover_opportunity_cycle(stocks)

In [None]:
discover_opportunity_cycle.cache_info()