In [1]:
#!pip3 install git+http://github.com/aarjaneiro/parallelqueue@cythonized
#!pip3 install pymc3
#!pip3 install altair
#!pip3 install bokeh

In [3]:
%load_ext Cython


In [23]:
%%cython --annotate

import matplotlib.pyplot as plt
import numpy as np
from parallelqueue.base_models import *
from parallelqueue.monitors import *
from typing import *
from pandas import DataFrame
import os
import pickle as pkl
import multiprocessing as mp
import bokeh.plotting as bkp
import bokeh.io as bki
import bokeh.palettes as bkpal
from bokeh.models import Span, FuncTickFormatter
import gc

cdef int ncpus = mp.cpu_count()

class Concurrent:
    def __init__(self, float maxtime = 1000, float SArgs = 1, float AArgs = 2, int d = 2, int r = 2,
                 order=range(2, 1000, 2), int seed=123):
        self.rhoJSim = None
        self.r = r
        self.order = order
        self.d = d
        self.AArgs = AArgs
        self.SArgs = SArgs
        print(f"rho = {AArgs / SArgs}")
        self.maxtime = maxtime
        self._ts = None
        self.seed = seed

    def rhochange(self, int which = 2):
        self.order = range(2, 30, 1)
        labels = {self.rhoRSim: f"Redundancy({self.d})", self.rhoJSim: f"JSQ({self.d})",
                  self.rhoTSim: f"Thresh({self.d},{self.r})"}
        test = list(labels.keys())[which]
        print(f"Running {test}")
        results = self.ParallelSim(test)
        # i is {N, copies}
        self.res = pd.DataFrame({i[0]: i[1] for i in results}, index=[i[0] for i in results][::-1])  # have to reverse
        self.res.plot(kind="bar", legend=None)
        plt.title("Average Time in System for Varying rho and N")
        self.__init__()  # Re-init

    def WriteEach(self, int simrep=1, of=["TSim"], bint ts=True):
        """                   v rep within run
        res['Thresh(2,2)'][0][0].keys()
                           ^run (N-size)
        Out[9]: dict_keys(['ReplicaSets', 'TimeQueueSize'])"""
        self._ts = ts
        self._sims = simrep
        labels = {self.RSim: f"Redundancy({self.d})", self.JSim: f"JSQ({self.d})",
                  self.TSim: f"Thresh({self.d},{self.r})"}
        self.res = {}
        for sim in [self.__getattribute__(i) for i in of]:
            print(f"Running {sim}")
            self.res[labels[sim]] = self.ParallelSim(sim)  # Annoying to unpack ¯\_(ツ)_/¯

    def DoEach(self, of=["TSim"], iters=1):
        self._ts = False
        labels = {self.RSim: f"Redundancy({self.d})", self.JSim: f"JSQ({self.d})",
                  self.TSim: f"Thresh({self.d},{self.r})"}
        self.res = {}
        for sim in [self.__getattribute__(i) for i in of]:
            print(f"Running {sim}")
            results = self.ParallelSim(sim)
            self.res[labels[sim]] = pd.DataFrame(results)
        for k, v in self.res.items():
            plt.plot(v, label=k)
        plt.legend()
        if iters == 1:
            plt.title("Average Time in System as N → ∞")

    def RSim(self, int N):
        mons = [ReplicaClassCounts]
        testvalues = []
        for i in range(self._sims):
            _sim = RedundancyQueueSystem(maxTime=self.maxtime, parallelism=N, seed=self.seed + 2331 * N + i, d=self.d,
                                         Arrival=random.expovariate,
                                         AArgs=self.AArgs, Service=random.expovariate, SArgs=self.SArgs,
                                         Monitors=mons)
            _sim.RunSim()
            testvalues.append(_sim.MonitorOutput)
        if not self._ts:
            return np.array(testvalues)
        else:
            return np.mean(testvalues)

    def JSim(self, int N):
        mons = [ReplicaClassCounts]
        testvalues = []
        for i in range(self._sims):
            _sim = JSQd(maxTime=self.maxtime, parallelism=N, seed=self.seed + 2331 * N + i, d=self.d,
                        Arrival=random.expovariate,
                        AArgs=self.AArgs, Service=random.expovariate, SArgs=self.SArgs,
                        Monitors=mons)
            _sim.RunSim()
            testvalues.append(_sim.MonitorOutput)
        if not self._ts:
            return np.array(testvalues)
        else:
            return np.mean(testvalues)

    def TSim(self, int N):
        mons = [ReplicaClassCounts]
        testvalues = []
        for i in range(self._sims):
            _sim = RedundancyQueueSystem(maxTime=self.maxtime, parallelism=N, seed=self.seed + 2331 * N + i, d=self.d,
                                         r=self.r,
                                         Arrival=random.expovariate,
                                         AArgs=self.AArgs, Service=random.expovariate, SArgs=self.SArgs,
                                         Monitors=mons)
            _sim.RunSim()
            testvalues.append(_sim.MonitorOutput)
        if not self._ts:
            return np.array(testvalues)
        else:
            return np.mean(testvalues)

    def rhoTSim(self, int N):
        testvalues = []
        for i in range(2, 30, 1):
            _sim = RedundancyQueueSystem(maxTime=self.maxtime, parallelism=i, seed=self.seed + 2331 * N + i, d=self.d,
                                         r=self.r,
                                         Arrival=random.expovariate,
                                         AArgs=self.AArgs + (N / 10), Service=random.expovariate, SArgs=self.SArgs,
                                         Monitors=[ReplicaSets])
            print(f"testing rho = {self.AArgs + (N / 10)}@i={i}")
            _sim.RunSim()
            interim = pd.DataFrame(_sim.MonitorOutput["ReplicaSets"]).transpose()
            testvalues.append((pd.DataFrame(interim["exit"] - interim["entry"]).mean()).values)
        return [N / 10, [i[0] for i in testvalues]]

    def rhoRSim(self, int N):
        testvalues = []
        for i in range(2, 30, 1):
            _sim = RedundancyQueueSystem(maxTime=self.maxtime, parallelism=i, seed=self.seed + 2331 * N + i, d=self.d,
                                         Arrival=random.expovariate,
                                         AArgs=self.AArgs + (N / 10), Service=random.expovariate, SArgs=self.SArgs,
                                         Monitors=[ReplicaSets])
            print(f"testing rho = {self.AArgs + (N / 10)}@i={i}")
            _sim.RunSim()
            interim = pd.DataFrame(_sim.MonitorOutput["ReplicaSets"]).transpose()
            testvalues.append((pd.DataFrame(interim["exit"] - interim["entry"]).mean()).values)
        return [N / 10, [i[0] for i in testvalues]]

    def ParallelSim(self, sim):
        with mp.Pool(processes=ncpus) as p:
            res = p.map(sim, self.order)
        return res

    def Results(self):
        return self.res
    
def SafeRun(int maxtime=1000, float SArgs=1, float AArgs=2, int d=2, int r=2, order=range(2, 20, 2),
        str of="TSim", int seed=123, int simrep=1, bint ts=True):  # Throws out Concurrent when done
    run = Concurrent(maxtime, SArgs, AArgs, d, r, order, seed)
    run.WriteEach(of=[of], simrep=simrep, ts=ts)
    return run.Results()


In [6]:
import json as js
import gc


In [7]:
%%cython --annotate
import json as js
cpdef void fileMaker(list results,list ToOrder,str name="Tsim",int reps=30):
    """
    Serializes data into a dictionary --> JSON(str(Data))
    """
    cdef dict ret, subret, mret, m
    cdef int N, i
    ret = {}
    for N in ToOrder:
        subret = {}
        for i in range(reps):
            mret = {}
            try:
                # WARNING: k can be NaN => must be cast to string
                for k, v in results[N][i]["ReplicaClassCounts"].items():
                    mret[str(k)] = eval(v.to_json()) # so as not to disturb structure 
            except Exception as e: 
                print(f"{e}@{N}-{i}")
                pass
            subret[i] = mret
        ret[N] = subret
    with open(f"{name}.json", mode="w") as fp:
        js.dump(ret,fp)


In [8]:
# print('start')
# fileMaker(results,list(range(len(ToOrder))),"Tsim")
# print("to jsim")
# fileMaker(results2,list(range(len(ToOrder))), "Jsim")
# print("to rsim")
# fileMaker(results3,list(range(len(ToOrder))), "Rsim")

# with open("Tsim.json","r") as f:
#     test=js.loads(f.read())
#js.JSONEncoder().encode()
# fileMaker()
# to jsim
# to rsim
# CPU times: user 2min 18s, sys: 10.7 s, total: 2min 29s
# Wall time: 2min 33s


In [9]:
%%time

ToOrder = [25, 50, 100, 500, 1000]
results = SafeRun(of="TSim", order=ToOrder, maxtime=1000, AArgs=5, simrep=30, ts=False, seed=64)

results = results[list(results.keys())[0]]
print("Saving")
fileMaker(results,list(range(len(ToOrder))))

results2 = SafeRun(of="JSim", order=ToOrder, maxtime=1000, AArgs=5, simrep=30, ts=False, seed=1021)
# getPerN(2)[0]

# while True:  # cleans
#     arr3.remove({})

results2 = results2[list(results2.keys())[0]]

print("Saving")
fileMaker(results2,list(range(len(ToOrder))), "Jsim")


results3 = SafeRun(of="RSim", order=ToOrder, maxtime=1000, AArgs=5, simrep=30,ts=False, seed=1111)
results3 = results3[list(results3.keys())[0]]

print("Saving")
fileMaker(results3,list(range(len(ToOrder))), "Rsim")

# Saving
# CPU times: user 22min 30s, sys: 2min 5s, total: 24min 35s
# Wall time: 3h 42min 7s -> 36 mins, and now... CPU times: user 19min 34s, sys: 56.5 s, total: 20min 31s
# Wall time: 45min 34s

# CPU times: user 15min 34s, sys: 1min 32s, total: 17min 6s
# Wall time: 44min 31s

rho = 5.0
Running <bound method Concurrent.TSim of <__main__.Concurrent object at 0x7fad94b71eb8>>
Saving
rho = 5.0
Running <bound method Concurrent.JSim of <__main__.Concurrent object at 0x7faccc834eb8>>
Saving
rho = 5.0
Running <bound method Concurrent.RSim of <__main__.Concurrent object at 0x7fabc95fbef0>>
Saving
CPU times: user 17min 40s, sys: 1min 17s, total: 18min 58s
Wall time: 45min 6s


In [10]:
# %%time
# %%px --local
# CPU times: user 10min 15s, sys: 17.3 s, total: 10min 32s
# Wall time: 10min 54s
# gc.disable()
# ToOrder = [25, 50, 100, 500, 1000]
# try: type(results)
# except:
#     with open("Tsim3.pkl","rb") as p:
#         results = pkl.load(p)
# try: type(results2)
# except:
#     with open("Jsim3.pkl","rb") as p:
#         results2 = pkl.load(p)
# try: type(results3)
# except:
#     with open("Rsim3.pkl","rb") as p:
#         results3 = pkl.load(p)
# gc.enable()


In [11]:
import matplotlib.pyplot as plt
def getPerN(N, rep=0, result = results):
    return result[N][rep]["ReplicaClassCounts"]

def PrepPlot(nth, ToOrder=ToOrder, rep=0, result = results):
    N = list(ToOrder)[nth]
    vals = []
    for timeSlice in getPerN(nth, rep, result):
        try:
            zeroes = (N - len(timeSlice)) / N

            def Ecdf(m, zeroes=zeroes, slice=timeSlice):
                return zeroes + sum([1 for i in slice if i <= m]) / N

            vals.append(Ecdf)
        except:
            pass
    return vals


In [12]:
getPerN(0)

{0: choices
 (8, 1)    1
 Name: entry, dtype: int64,
 0.5608963966369629: choices
 (8, 1)     1
 (2, 20)    1
 Name: entry, dtype: int64,
 1.0386778712272644: choices
 (8, 1)        1
 (2, 20, 7)    2
 Name: entry, dtype: int64,
 1.0461418218910694: choices
 (8, 1)        1
 (2, 20, 7)    2
 (16, 12)      1
 Name: entry, dtype: int64,
 1.1031557135283947: choices
 (8, 1)        1
 (2, 20, 7)    2
 (16, 12)      1
 (0, 21)       1
 Name: entry, dtype: int64,
 1.1466121934354305: choices
 (8, 1)        1
 (2, 20, 7)    2
 (16, 12)      1
 (0, 21)       1
 (14, 6)       1
 Name: entry, dtype: int64,
 1.2774790860712528: choices
 (8, 1)        1
 (2, 20, 7)    2
 (16, 12)      1
 (0, 21)       1
 (14, 6)       1
 Name: entry, dtype: int64,
 1.2946128249168396: choices
 (8, 1)        1
 (2, 20, 7)    2
 (0, 21)       1
 (14, 6)       1
 Name: entry, dtype: int64,
 1.3269823752343655: choices
 (8, 1)     1
 (20, 7)    1
 (0, 21)    1
 (14, 6)    1
 Name: entry, dtype: int64,
 1.3341966532170

In [13]:
# from bokeh.resources import INLINE
# from bokeh.models import ColumnDataSource
# from bokeh.io import output_notebook
# output_notebook(INLINE)

In [14]:
# fig = bkp.figure(x_axis_label = "Time",y_axis_label='ECDF deviation', x_range = (0,1000))
# ev=0
# local = pd.DataFrame([1- j(ev) for j in PrepPlot(1,rep = i)] for i in range(30)).transpose()
# mid = local.mean(1)
# var = np.sqrt(local.var(1))
# a =pd.DataFrame(mid + 1.96*var/30, columns=["Upper CI"])
# b =pd.DataFrame(mid - 1.96*var/30, columns=["Lower CI"])
# source = pd.DataFrame(mid, columns=["Mean"])
# source["Upper CI"] = a
# source["Lower CI"] = b
# source = ColumnDataSource(source)

# fig.circle(y = "Upper CI", source=source)
# fig.circle(y = "Lower CI", source=source)
# fig.circle(y = "Mean", source=source)

# fig.title.text = "ECDF Supremum Difference From 1"
# bkp.save(fig,"dev.html")

In [15]:
# for ev in range(6):
#     local = pd.DataFrame([1- j(ev) for j in PrepPlot(1,rep = i, result=results3)] for i in range(30)).transpose()
#     mid = local.mean(1)
#     var = np.sqrt(local.var(1))
#     (mid + 1.96*var/30).plot(color=f"C{ev}", alpha=0.5)
#     (mid - 1.96*var/30).plot(color=f"C{ev}", alpha=0.5)
#     mid.plot(color=f"C{ev}")
    

In [16]:
# epsiloncheck = lambda x,ep: 1 if np.abs(x) > ep else 0
# local = pd.DataFrame([epsiloncheck(1- j(0), 0.09) for j in PrepPlot(1,rep = i, result=results)] for i in range(30)).transpose()
# mid = local.mean(1)
# var = np.sqrt(local.var(1))
# (mid + 1.96*var/30).plot(color=f"C{ev}", alpha=0.5)
# (mid - 1.96*var/30).plot(color=f"C{ev}", alpha=0.5)
# plt.plot([2*np.exp(-2*n*(0.09**2)) for n in range(mid.shape[0])])
# mid.plot(color=f"C{ev}")