In [1]:
import numpy as np

import copy

from collections import Counter, defaultdict
from itertools import chain, cycle
from functools import partial

In [2]:
sample_size = 5000
products = ["A", "B", "C", "D"]
totals = [10, 5, 15, 8]
# totals = [10, 10, 10, 10]
flush_lock = 21
transition_lock = 3

In [3]:
def generate_batches(batch_size, products, totals, sample_size):
    sizes = (np.ceil(np.array(totals) / np.min(totals)) * batch_size).astype(int).tolist()
    iterators = [[product]*size for product, size in zip(products, sizes)]
    base = cycle(chain(*iterators))
    return list(dict(zip(range(sample_size), base)).values())

In [4]:
# sequence 1
denominator = sum(totals)
sequence1 = np.random.choice(products, size=sample_size, p=[t/denominator for t in totals]).tolist()

# sequence 2
sequence2 = generate_batches(20, products, totals, sample_size)

# sequence 3
sequence3 = generate_batches(40, products, totals, sample_size)

In [5]:
def no_priority(machine_tuple, product=None):
    
    if product is None:
        raise RuntimeError("No product set")
    
    return machine_tuple[0]

def sum_of_product_priority(machine_tuple, product=None):
    
    if product is None:
        raise RuntimeError("No product set")
    
    return sum(machine_tuple[1].values())

def product_count_priority(machine_tuple, product=None):
    
    if product is None:
        raise RuntimeError("No product set")
    
    return machine_tuple[1][product]

def double_priority(machine_tuple, product=None):
    
    if product is None:
        raise RuntimeError("No product set")
    
    return (machine_tuple[1][product], sum(machine_tuple[1].values()))

In [6]:
def simulate(
    num_of_machines,
    sequence,
    products,
    totals,
    flush_lock,
    transition_lock,
    priority,
    reverse=True
):

    mix = dict(zip(products, totals))
    machines = {i: {p: 0 for p in products + ["lock"]} for i in range(num_of_machines)}
    flushes = {i: 0 for i in range(num_of_machines)}
    trash = []
    data = []

    for product in sequence:

        consumed = False
        for i, mach in sorted(machines.items(), key=partial(priority, product=product), reverse=reverse):
            
            # Machine is available to accept products
            machine_is_available = machines[i]["lock"] == 0
            
            # Machine has space to accept product
            product_fits = machines[i][product] < mix[product]
            
            # Complete cases for 3 binary variables
            if not consumed and machine_is_available and product_fits:
                machines[i][product] += 1
                machines[i]["lock"] = transition_lock
                consumed = True
            
            elif not consumed and machine_is_available and not product_fits:
                continue
    
            elif not consumed and not machine_is_available and product_fits:
                machines[i]["lock"] -= 1
    
            elif not consumed and not machine_is_available and not product_fits:
                machines[i]["lock"] -= 1
    
            elif consumed and machine_is_available and product_fits:
                continue
    
            elif consumed and machine_is_available and not product_fits:
                continue
    
            elif consumed and not machine_is_available and product_fits:
                machines[i]["lock"] -= 1
    
            elif consumed and not machine_is_available and not product_fits:
                machines[i]["lock"] -= 1
                
            
        # Trash un-consumed products
        if not consumed:
            trash.append(product)

        # Save the data
        data.append(copy.deepcopy(machines))
        
        
        # Flush ready machines
        for j in machines:
            if all(machines[j][p] == mix[p] for p in products):
                machines[j] = {p: 0 for p in products + ["lock"]}
                machines[j]["lock"] = flush_lock
                flushes[j] += 1
    
    return ({
        "trash": len(trash), 
        "total": len(sequence), 
        "consumed": len(sequence) - len(trash),
        "flushes": flushes
    }, data)

In [7]:
print(simulate(5, sequence1, products, totals, flush_lock, transition_lock, no_priority, reverse=False)[0])
print(simulate(5, sequence2, products, totals, flush_lock, transition_lock, no_priority, reverse=False)[0])
print(simulate(5, sequence3, products, totals, flush_lock, transition_lock, no_priority, reverse=False)[0])
print()
print(simulate(5, sequence1, products, totals, flush_lock, transition_lock, double_priority, reverse=True)[0])
print(simulate(5, sequence2, products, totals, flush_lock, transition_lock, double_priority, reverse=True)[0])
print(simulate(5, sequence3, products, totals, flush_lock, transition_lock, double_priority, reverse=True)[0])

{'trash': 628, 'total': 5000, 'consumed': 4372, 'flushes': {0: 26, 1: 26, 2: 24, 3: 23, 4: 13}}
{'trash': 561, 'total': 5000, 'consumed': 4439, 'flushes': {0: 28, 1: 28, 2: 27, 3: 27, 4: 4}}
{'trash': 1862, 'total': 5000, 'consumed': 3138, 'flushes': {0: 17, 1: 17, 2: 16, 3: 16, 4: 14}}

{'trash': 489, 'total': 5000, 'consumed': 4511, 'flushes': {0: 24, 1: 23, 2: 23, 3: 23, 4: 23}}
{'trash': 257, 'total': 5000, 'consumed': 4743, 'flushes': {0: 25, 1: 25, 2: 25, 3: 24, 4: 24}}
{'trash': 1765, 'total': 5000, 'consumed': 3235, 'flushes': {0: 17, 1: 17, 2: 17, 3: 17, 4: 16}}


In [8]:
sim = simulate(9, sequence1, products, totals, flush_lock, transition_lock, double_priority, reverse=True)
data = sim[1] 

In [10]:
data[0]

{0: {'A': 0, 'B': 0, 'C': 1, 'D': 0, 'lock': 3},
 1: {'A': 0, 'B': 0, 'C': 0, 'D': 0, 'lock': 0},
 2: {'A': 0, 'B': 0, 'C': 0, 'D': 0, 'lock': 0},
 3: {'A': 0, 'B': 0, 'C': 0, 'D': 0, 'lock': 0},
 4: {'A': 0, 'B': 0, 'C': 0, 'D': 0, 'lock': 0},
 5: {'A': 0, 'B': 0, 'C': 0, 'D': 0, 'lock': 0},
 6: {'A': 0, 'B': 0, 'C': 0, 'D': 0, 'lock': 0},
 7: {'A': 0, 'B': 0, 'C': 0, 'D': 0, 'lock': 0},
 8: {'A': 0, 'B': 0, 'C': 0, 'D': 0, 'lock': 0}}

In [None]:
import matplotlib.pyplot as plt
from matplotlib import animation
import matplotlib.colors as mcolors

COLORS = list(mcolors.TABLEAU_COLORS.keys())

fig, axes = plt.subplots(3,3, figsize=(16, 9))

fig.tight_layout(pad=3.0)

n = len(data)

axes = axes.ravel()

bars = []
for a, ax in enumerate(axes):
    bar = ax.bar(data[0][a].keys(), data[0][a].values(), color=COLORS[:len(data[0][a])])
    bars.append(bar)

def annotate_axis(ax, bar, title=None, xlabel=None):
    for r, rect in enumerate(bar[:-1]):
        height = totals[r]
        ax.annotate('{}'.format(height),
            xy=(rect.get_x() + rect.get_width() / 2, height),
            arrowprops=dict(facecolor='black', arrowstyle="simple"),
            xytext=(0, 15),
            textcoords="offset points",
            ha='center',
            va='bottom'
        )
        ax.set_title(title)
        if xlabel:
            ax.set_xlabel("Products")

for a, (ax, bar) in enumerate(zip(axes, bars)):
    annotate_axis(ax, bar, title=f"Machine {a}", xlabel=False)

for ax in axes:
    ax.set_ylim(0, 21)

def animate(i):
    for machine, mix in data[i].items():
        for i, b in enumerate(bars[machine]):
            b.set_height(list(mix.values())[i])
    
ani = animation.FuncAnimation(fig, animate, repeat=False, blit=False,frames=n, interval=200)

f = r"animation.gif" 
writergif = animation.PillowWriter(fps=15) 
ani.save(f, writer=writergif)

# from IPython.display import HTML
# display(HTML(ani.to_jshtml()))
# plt.close()