In [1]:
%%writefile ss/mockendpoint.py

import math
import re
import pprint

import numpy

class MockEndpoint(object):
    """A randomly generated call graph standing in for one traced endpoint.

    The graph is first built as a nested list of integers (``base_graph``):
    an integer is a run of that many sibling leaf spans, a nested list is a
    deeper call level.  It is then expanded into a nested list of names
    (``expanded_graph``) in which some leaves are relabelled ``dbcallN`` or
    ``externalN`` and every other leaf ``nodeN``.

    Args:
        r: random source providing ``normal`` and ``uniform`` with
            ``numpy.random.RandomState``-style signatures.
        desired_span_count: target number of spans for this endpoint.
        weight: value stored on the endpoint; not used by this class.
    """

    def __init__(self, r, desired_span_count, weight=0):
        super(MockEndpoint, self).__init__()

        self.r = r
        self.weight = weight
        self.displayed_graph = None
        # Probability that a chosen external location becomes a db call
        # rather than a generic external call.
        self.dbratio = 0.7

        # NOTE: the order of the random draws below is kept fixed so that a
        # seeded ``r`` keeps producing the same endpoint.
        self.desired_span_count = desired_span_count
        self.desired_depth = self.sub_sample(self.desired_span_count)
        self.desired_external_count = max(
            1, self.sub_sample(self.desired_span_count) - 1)

        self.base_graph = self.make_base_graph(
            self.desired_span_count, self.desired_depth)

        self.assign_externals(self.desired_external_count)
        self.total_spans = (
            self.actual_external_count
            + self.sum_base_graph_nodes(self.base_graph, include_links=True))

        self.pp = None

    def __repr__(self):
        """Summary lines followed by the pretty-printed expanded graph."""
        self.pp = self.pp or pprint.PrettyPrinter(indent=4)

        infostring = "%s // %s %s // %s" % (
            self.weight,
            self.desired_span_count, self.total_base_nodes,
            self.desired_depth,
        )

        external_string = "%s // %s (%s/%s)" % (
            self.desired_external_count, self.actual_external_count,
            self.iinfo['ce'], self.iinfo['cd']
        )

        pps = self.pp.pformat(self.expanded_graph)
        return "%s\n%s\n%s" % (infostring, external_string, pps)

    def sub_sample(self, mean):
        """Draw an integer >= 2 from ``normal(log(mean) + 1, 0.5)``.

        For ``mean < 3`` the result is always 2 and no random number is
        consumed.
        """
        if mean < 3:
            return 2
        return int(max(2, self.r.normal(math.log(mean) + 1, 0.5)))

    def allocate_spans(self, total_spans_remaining):
        """Split a span budget into a random number of integer chunks.

        Chunks are rounded independently, so they need not add up exactly
        to ``total_spans_remaining`` and individual chunks may be 0.
        """
        if total_spans_remaining < 2:
            return [max(1, total_spans_remaining)]

        num_chunks = self.sub_sample(total_spans_remaining)
        allocation = numpy.abs(self.r.normal(0, 1, num_chunks))

        return [int(x)
            for x in numpy.rint(
                total_spans_remaining * allocation / sum(allocation)
            )]

    def make_base_graph(self, total_spans, total_layers):
        """Build the nested-integer graph; one span is held back from the budget."""
        return self._make_base_graph(
            total_spans - 1, total_layers, total_layers)

    def _make_base_graph(self, total_spans_remaining, total_layers,
                         layers_remaining):
        """Recursively allocate spans to chunks, deepening some of them.

        A chunk larger than 2 is turned into a nested level with a
        probability that grows with its relative size and with the number
        of layers still available.
        """
        if layers_remaining == 1:
            return [total_spans_remaining]

        allocation = self.allocate_spans(total_spans_remaining)
        ma = float(max(allocation))

        bonus = max(0, (
            layers_remaining - (total_layers-1)/2) / float(total_layers))
        deepening_probability = [bonus + x/ma for x in allocation]

        graph = []
        for chunk, probability in zip(allocation, deepening_probability):
            # The draw happens for every chunk, even ones too small to deepen.
            is_deepened = self.r.uniform(0, 1) < probability

            if (chunk > 2) and is_deepened:
                graph.append(
                    self._make_base_graph(
                        chunk - 1,
                        total_layers,
                        layers_remaining - 1
                    ))
            else:
                graph.append(chunk)

        return graph

    def sum_base_graph_nodes(self, graph, include_links=False):
        """Total leaf count of a nested-integer graph.

        With ``include_links=True`` each nested list adds one extra span.
        """
        total = 0

        for node in graph:
            if isinstance(node, list):
                total += self.sum_base_graph_nodes(
                    node, include_links=include_links)
                if include_links:
                    total += 1
            else:
                total += node

        return total

    def expand_graph_nodes(self, graph):
        """Replace every integer ``k`` by ``k`` ``'node'`` placeholders."""
        expanded_graph = []

        for node in graph:
            if isinstance(node, list):
                expanded_graph.append(self.expand_graph_nodes(node))
            else:
                expanded_graph += ['node'] * node

        return expanded_graph

    def assign_externals(self, num_externals):
        """Expand ``base_graph`` and relabel a random subset of leaves.

        At most half of the leaves, and at most ``2 * num_externals``, are
        picked as db/external calls.  Sets ``total_base_nodes``,
        ``expanded_graph``, ``actual_external_count`` and the counters in
        ``iinfo`` (``ci`` leaves visited, ``cd`` db calls, ``ce`` externals,
        ``cn`` plain nodes).
        """
        self.total_base_nodes = self.sum_base_graph_nodes(self.base_graph)
        self.expanded_graph = self.expand_graph_nodes(self.base_graph)

        self.actual_external_count = min(
            int(self.total_base_nodes/2), 2*num_externals)
        scores = self.r.uniform(0, 1, self.total_base_nodes)

        aec = self.actual_external_count
        if aec > 0:
            # Indices of the ``aec`` largest scores.
            chosen = numpy.argpartition(scores, -aec)[-aec:]
            external_locs = set(int(i) for i in chosen)
        else:
            # ``[-0:]`` would select *every* index, and argpartition rejects
            # an empty array, so the zero case is handled explicitly.
            external_locs = set()

        self.iinfo = {x:0 for x in ['ci', 'cd', 'ce', 'cn']}
        self._assign_externals(external_locs, self.expanded_graph)

    def _assign_externals(self, external_locs, expanded_graph):
        """Walk the expanded graph depth-first, naming each placeholder."""
        for i, item in enumerate(expanded_graph):
            if item == 'node':
                if self.iinfo['ci'] in external_locs:
                    if self.r.uniform(0, 1) < self.dbratio:
                        expanded_graph[i] = 'dbcall' + str(self.iinfo['cd'])
                        self.iinfo['cd'] += 1

                    else:
                        expanded_graph[i] = 'external' + str(self.iinfo['ce'])
                        self.iinfo['ce'] += 1

                else:
                    expanded_graph[i] = 'node' + str(self.iinfo['cn'])
                    self.iinfo['cn'] += 1

                self.iinfo['ci'] += 1

            elif isinstance(item, list):
                self._assign_externals(external_locs, item)

Overwriting ss/mockendpoint.py


In [2]:
%%writefile ss/mockexecution.py

class MockExecution(object):
    """One sampled execution of an endpoint, as seen by the reservoir.

    Args:
        r: random source; only ``poisson`` is used, and only when
            ``completed`` is exactly ``False``.
        endpoint_index: index of the endpoint that was executed.
        completed: whether the execution ran to completion.
        path_spans: span count of the path (or, for an incomplete
            execution, the Poisson mean used to draw the span count).
        priority: sort key used by the reservoir.
    """

    def __init__(self, r, endpoint_index, completed, path_spans, priority):
        self.r = r

        self.endpoint_index = endpoint_index
        self.priority = priority
        self.completed = completed
        self.partial = False

        # An execution flagged incomplete gets a randomly drawn length.
        if completed is False:
            path_spans = r.poisson(path_spans)
        self.path_spans = path_spans

    def __repr__(self):
        fields = (
            self.endpoint_index,
            self.priority,
            self.path_spans,
            self.partial,
            self.completed,
        )
        return " ".join(str(field) for field in fields)

    def mark_partial(self, spans_not_fitting_in_reservoir):
        """Drop the overflowing spans and flag the execution as truncated."""
        self.path_spans -= spans_not_fitting_in_reservoir
        self.completed = False
        self.partial = True

Overwriting ss/mockexecution.py


In [3]:
%%writefile ss/mockreservoir.py

class MockReservoir(object):
    """Fixed-capacity span reservoir filled in descending priority order.

    Executions are admitted highest priority first until the span budget
    ``reservoir_size`` is used up.  The execution that crosses the budget is
    still admitted, but is told (via ``mark_partial``) how many of its spans
    did not fit.

    Args:
        reservoir_size: span capacity of the reservoir.
        sampledTrue_count: number of sampled executions offered.
        raw_results: iterable of execution objects exposing ``priority``,
            ``path_spans`` and ``mark_partial``.
    """

    def __init__(self, reservoir_size, sampledTrue_count, raw_results):
        self.reservoir_size = reservoir_size
        self.sampledTrue_count = sampledTrue_count
        self.raw_results = raw_results

        by_priority = list(raw_results)
        by_priority.sort(key=lambda execution: execution.priority,
                         reverse=True)

        self.reservoir = []
        space_left = self.reservoir_size
        for execution in by_priority:
            self.reservoir.append(execution)
            space_left -= execution.path_spans

            if space_left > 0:
                continue

            # Budget exhausted: truncate the last admission if it overflowed.
            if space_left < 0:
                execution.mark_partial(-space_left)
            break

    def __repr__(self):
        total_spans_sent = sum(entry.path_spans for entry in self.reservoir)
        lines = [str(entry) for entry in self.reservoir]
        header = "**%s %s\n  " % (self.sampledTrue_count, total_spans_sent)
        return header + "\n  ".join(lines)

Overwriting ss/mockreservoir.py


In [4]:
%%writefile ss/mockagent.py

import math
import numpy
import sys

from ss.mockendpoint import MockEndpoint
from ss.mockexecution import MockExecution
from ss.mockreservoir import MockReservoir

class MockAgent(object):
    """A simulated agent with a random set of weighted endpoints.

    Args:
        r: random source with ``numpy.random.RandomState``-style
            ``uniform``, ``normal`` and ``lognormal`` methods.
        sampler: object exposing ``sampling_target`` and
            ``generate_samples(fan_in)``.
        reservoir_size: span capacity handed to every ``MockReservoir``.
        bundle_span_counts: when ``False`` (default) endpoint lengths are
            drawn around one shared lognormal expectation; otherwise each
            endpoint length is an independent lognormal draw.
    """

    def __init__(self, r, sampler,
                 reservoir_size=1000, bundle_span_counts=False): # seed=1234567
        self.r = r

        self.reservoir_size = reservoir_size
        self.sampler = sampler

        self.completion_threshold = 1.00

        # NOTE: the order of random draws is kept fixed so seeded runs
        # reproduce the same agent.
        self.num_endpoints = int(self.r.uniform(5, 11))
        self.weights = self.r.uniform(0, 1, self.num_endpoints)
        self.weights /= sum(self.weights)

        # for scipy: lognorm(s=1.22, loc=0, scale=14)
        if bundle_span_counts is False:
            expected_span_count = int(
                max(8, self.r.lognormal(math.log(14), 1.22)))
            span_sigma = (math.sqrt(expected_span_count)
                          * math.log(expected_span_count) / 2)
            path_span_counts = self.r.normal(
                expected_span_count, span_sigma, self.num_endpoints)

        else:
            path_span_counts = self.r.lognormal(
                math.log(14), 1.22, self.num_endpoints)

        # Every endpoint has at least 8 spans.
        self.path_span_counts = [
            max(8, int(x)) for x in numpy.rint(path_span_counts)]
        self.span_count_estimate = numpy.mean(self.path_span_counts)
        self.span_count_variation = numpy.std(self.path_span_counts, ddof=1)

        self.endpoint_paths = [
            MockEndpoint(self.r, self.path_span_counts[i], self.weights[i])
                for i in range(self.num_endpoints)]

        self.cum_weights = numpy.cumsum(self.weights)

    def __repr__(self):
        """Endpoints rendered heaviest first, separated by blank lines."""
        return "\n\n".join([
            str(x) for x in sorted(
                self.endpoint_paths,
                key=lambda x: x.weight,
                reverse=True)])

    def montecarlo_simulate(self, fan_in, num_harvests):
        """Run harvests until the sampled-trace target is met and summarise.

        The target is ``sampling_target * (fan_in + 1) * num_harvests``
        sampled executions.

        Returns:
            A formatted line: ``fan_in + 1``, the fraction of sampled
            executions that stayed complete in the reservoir, a literal
            ``0.0`` placeholder, and the summed squared error between the
            observed endpoint distribution and ``self.weights``.
        """
        choice_hist = numpy.zeros(self.num_endpoints)
        target_trace_count = (
            self.sampler.sampling_target * (fan_in + 1) * num_harvests)

        total_trace_count = 0
        total_sampled_count = 0

        while target_trace_count > 0:
            result = self.simulate_harvest(fan_in)
            target_trace_count -= result.sampledTrue_count

            for execution in result.reservoir:
                if execution.completed:
                    choice_hist[execution.endpoint_index] += 1
                    total_trace_count += 1

            total_sampled_count += result.sampledTrue_count

        # A zero target means no harvest ran; report 0 instead of dividing
        # by zero.
        if total_sampled_count > 0:
            percent_traced = total_trace_count / total_sampled_count
        else:
            percent_traced = 0.0

        s = sum(choice_hist)
        if s > 0:
            choice_hist /= s
        error_hist = (choice_hist - self.weights) ** 2

        header = "  %d, %.3f/%.3f, %.3E" % (
            fan_in+1, percent_traced, 0.0, sum(error_hist))

        return header

    def simulate_harvest(self, fan_in):
        """Simulate one harvest and return the resulting ``MockReservoir``.

        Each sampled execution picks an endpoint according to
        ``self.weights`` and gets a uniform random priority.
        """
        sampledTrue_count = self.sampler.generate_samples(fan_in)

        if sampledTrue_count == 0:
            return MockReservoir(self.reservoir_size, 0, [])

        endpoint_path_choices = self.r.uniform(0, 1, sampledTrue_count)
        priorities = self.r.uniform(0, 1, sampledTrue_count)

        last_index = self.num_endpoints - 1

        executions = []
        for i in range(0, sampledTrue_count):
            choice = endpoint_path_choices[i]
            priority = priorities[i]

            # Inverse-CDF lookup.  The floating-point cumulative sum can end
            # a hair below 1.0, so clamp to the last endpoint instead of
            # indexing past the end.
            chosen_index = min(
                int(sum(choice > self.cum_weights)), last_index)
            path = self.endpoint_paths[chosen_index]

            e = MockExecution(
                self.r, chosen_index, True,
                path.total_spans, priority)

            executions.append(e)

        return MockReservoir(self.reservoir_size, sampledTrue_count, executions)
    

Overwriting ss/mockagent.py


In [16]:
%%writefile ss/simulate_degradation.py

import sys
import numpy

from ss.mockagent import MockAgent
from ss.mocksampler import MockSampler

# Harvest multiplier passed to MockAgent.montecarlo_simulate for every fan-in.
num_runs = 1000
# Number of independently seeded MockAgent instances simulated per sampler.
num_agents = 1000

# Each entry is (reservoir_size, sampling_target), as consumed by the sweep
# loop at the bottom of this script.
#param_sets = [(100, 1), (100, 10), (1000, 10)]
param_sets = [(1000, 10)]

class UniformSampler(MockSampler):
    """Sampler whose raw per-source draws are uniformly distributed."""

    def _generate_samples(self, fan_in):
        """Return ``fan_in + 1`` draws from ``uniform(0, 2 * sampling_target)``.

        NOTE(review): how the base class turns these draws into a sample
        count is not visible here.
        """
        upper_bound = 2 * self.sampling_target
        draw_count = fan_in + 1
        return self.r.uniform(0, upper_bound, draw_count)
        
class NormalSampler(MockSampler):
    """Sampler whose raw per-source draws are normally distributed."""

    def _generate_samples(self, fan_in):
        """Return ``fan_in + 1`` normal draws centred on ``sampling_target``.

        The standard deviation is ``log(max(2, sampling_target))``, so it
        never drops below ``log(2)``.
        """
        spread = numpy.log(max(2, self.sampling_target))
        draw_count = fan_in + 1
        return self.r.normal(self.sampling_target, spread, draw_count)
        
def simulate_for_reservoir(num_runs, num_agents, reservoir_size,
                           sampling_target, sampler_class):
    """Simulate many agents for one sampler/reservoir setup and log results.

    A master ``RandomState`` with a fixed seed produces one seed per agent
    and also drives the shared sampler.  For every agent a summary line is
    written, followed by one result line per fan-in value 0..9, to
    ``ss/degradation_results_<sampler>_<reservoir>_<target>.txt``.
    Progress is reported on stderr after every tenth agent.

    Args:
        num_runs: passed to ``MockAgent.montecarlo_simulate`` as the number
            of harvests.
        num_agents: how many agents to simulate.
        reservoir_size: reservoir capacity given to every agent.
        sampling_target: target handed to the sampler constructor.
        sampler_class: class called as ``sampler_class(r, sampling_target)``.
    """
    r = numpy.random.RandomState(1234567)
    seeds = list(map(int, r.uniform(1, 999999999, num_agents)))
    sampler = sampler_class(r, sampling_target)

    out_filename = 'ss/degradation_results_%s_%d_%d.txt' % (
        sampler.name, reservoir_size, sampler.sampling_target)

    with open(out_filename, 'w') as outfile:
        for agent_id, agent_seed in enumerate(seeds):
            agent_rng = numpy.random.RandomState(agent_seed)
            agent = MockAgent(
                agent_rng, sampler=sampler, reservoir_size=reservoir_size)

            summary = (
                "for agent with %d endpoints with mean/std length %.3f/%.3f:"
                % (agent.num_endpoints,
                   agent.span_count_estimate,
                   agent.span_count_variation))
            outfile.write(summary)
            outfile.write("\n")

            for fan_in in range(10):
                line = agent.montecarlo_simulate(fan_in, num_runs)
                outfile.write(line)
                outfile.write("\n")

            outfile.write("\n")

            finished_batch_of_ten = (agent_id % 10) == 9
            if finished_batch_of_ten:
                progress = (
                    "For sampler %s, reservoir size %d, and target %d, done with run %d"
                    % (sampler.name, reservoir_size,
                       sampler.sampling_target, agent_id))
                print(progress, file=sys.stderr)

        print("", file=sys.stderr)

# Sweep every sampler class over every (reservoir_size, sampling_target) pair.
for sampler_class in (UniformSampler, NormalSampler):
    for params in param_sets:
        reservoir_size, sampling_target = params[0], params[1]
        simulate_for_reservoir(
            num_runs, num_agents,
            reservoir_size, sampling_target,
            sampler_class)

        

Overwriting ss/simulate_degradation.py


In [17]:
%run ss/simulate_degradation.py


For sampler uniform, reservoir size 1000, and target 10, done with run 9
For sampler uniform, reservoir size 1000, and target 10, done with run 19
For sampler uniform, reservoir size 1000, and target 10, done with run 29
For sampler uniform, reservoir size 1000, and target 10, done with run 39
For sampler uniform, reservoir size 1000, and target 10, done with run 49
For sampler uniform, reservoir size 1000, and target 10, done with run 59
For sampler uniform, reservoir size 1000, and target 10, done with run 69
For sampler uniform, reservoir size 1000, and target 10, done with run 79
For sampler uniform, reservoir size 1000, and target 10, done with run 89
For sampler uniform, reservoir size 1000, and target 10, done with run 99
For sampler uniform, reservoir size 1000, and target 10, done with run 109
For sampler uniform, reservoir size 1000, and target 10, done with run 119
For sampler uniform, reservoir size 1000, and target 10, done with run 129
For sampler uniform, reservoir size 

For sampler normal, reservoir size 1000, and target 10, done with run 109
For sampler normal, reservoir size 1000, and target 10, done with run 119
For sampler normal, reservoir size 1000, and target 10, done with run 129
For sampler normal, reservoir size 1000, and target 10, done with run 139
For sampler normal, reservoir size 1000, and target 10, done with run 149
For sampler normal, reservoir size 1000, and target 10, done with run 159
For sampler normal, reservoir size 1000, and target 10, done with run 169
For sampler normal, reservoir size 1000, and target 10, done with run 179
For sampler normal, reservoir size 1000, and target 10, done with run 189
For sampler normal, reservoir size 1000, and target 10, done with run 199
For sampler normal, reservoir size 1000, and target 10, done with run 209
For sampler normal, reservoir size 1000, and target 10, done with run 219
For sampler normal, reservoir size 1000, and target 10, done with run 229
For sampler normal, reservoir size 100

In [7]:
%%writefile ss/mocktrace.py

import pprint
import functools
import numpy

from ss.mockspan import MockSpan

import os
# Appends a hard-coded Graphviz bin directory to PATH at import time.
# NOTE(review): this path is specific to one machine's Homebrew install
# (version 2.40.1) -- presumably needed so the `graphviz` package can find
# the `dot` binary; confirm and consider making it configurable.
os.environ["PATH"] += os.pathsep + '/usr/local/Cellar/graphviz/2.40.1/bin/'

from graphviz import Digraph
from IPython.display import SVG

class MockTrace(object):
    """Span tree built from an endpoint's expanded graph, plus scoring and
    clipping simulations on top of it.

    Args:
        r: random source; only ``uniform(0, 1)`` is used.
        endpoint: object exposing ``expanded_graph``, a nested list whose
            string entries are leaf span names and whose non-string entries
            are nested levels.
    """

    # Span types (keys of ``spans_by_type``) that always count as important.
    important_types = ['root', 'external', 'dbcall', 'long_generics']

    def __init__(self, r, endpoint):
        self.r = r
        self.expanded_graph = endpoint.expanded_graph

        self.spans_by_name = {
            'root': MockSpan(0, 'root', None)
        }

        self.spans_by_type = {
            'root': set(['root']),
            'node': set(),
            'call': set(),
            'external': set(),
            'dbcall': set(),
            'long_generics': set(),
            'relevant': set()
        }

        # gvid: next graph id to hand out; fcount: next 'callN' suffix.
        self.gvid = 1
        self.fcount = 0
        self._convert_expanded_graph(
            self.spans_by_name['root'], self.expanded_graph)

        self.spans_by_category = {
            'unimportant': set(),
            'relevant': set(),
            'important': set().union(*[
                self.spans_by_type[z] for z in MockTrace.important_types
            ])
        }

        self.pp = None
        self.displayed_graph = None

    def _convert_expanded_graph(self, current_parent, expanded_graph):
        """Create a ``MockSpan`` per graph entry, recursing into nested levels.

        A nested level becomes a synthetic ``callN`` span whose children are
        the entries of that level.
        """
        for span in expanded_graph:
            is_nested = not isinstance(span, str)
            if is_nested:
                name = 'call' + str(self.fcount)
                self.fcount += 1
            else:
                name = span

            s = MockSpan(self.gvid, name, current_parent)
            self.spans_by_name[name] = s
            self.spans_by_type[s.node_type].add(name)
            if s.long_generic:
                self.spans_by_type['long_generics'].add(name)

            self.gvid += 1

            if is_nested:
                self._convert_expanded_graph(s, span)

    def count_spans(self):
        """Total number of spans in the tree, root included."""
        return self.spans_by_name['root'].count_spans()

    def count_nonprunable_spans(self):
        """Number of spans currently marked 'Green'."""
        return self.spans_by_name['root'].count_nonprunable_spans()

    def mark_prunable_spans(self):
        """Colour the whole tree, then force the root to 'Green'."""
        self.spans_by_name['root'].mark_prunable_spans()
        self.spans_by_name['root'].mark_color = 'Green'

    #############################################################
    #
    # first, assign 1s:
    #   dbs and externals get a 1
    #   "long segments" get a 1
    #   root gets a 1
    #
    # next, assign 0s:
    #   spans marked 'Red' and non-long plain leaf nodes get 0
    #
    # next, count total spans, and subtract off all the 1s and 0s so far;
    # the remaining ("unassigned") spans share a budget of half their count.
    # that budget is split into 2 parts:
    #    a. 1/3 gifted directly as an equal baseline
    #    b. 2/3 distributed based on which spans have the most '1s' children
    #

    def calculate_importance(self):
        """Assign ``importance_score`` to every span that is neither
        important nor unimportant and record it as 'relevant'.

        NOTE(review): once the pool of important-children counts is
        exhausted (``cic == 0``), the remaining relevant spans keep their
        existing score and do not receive the baseline -- confirm intended.
        """
        self.mark_prunable_spans()

        total = self.spans_by_name['root'].count_spans()
        total_1s = len(self.spans_by_category['important'])

        total_0s = 0
        for span in self.spans_by_name.values():
            if span.is_unimportant:
                self.spans_by_category['unimportant'].add(span.name)
                total_0s += 1

        total_unassigned = total - total_1s - total_0s
        total_to_assign = float(total_unassigned / 2)
        total_to_distribute = (2.0/3.0) * total_to_assign

        if total_unassigned == 0:
            return

        baseline = (1.0/3.0) * total_to_assign / total_unassigned

        cic = 0 # cumulative_important_children
        weight_index = {}
        for name in self.spans_by_name:
            span = self.spans_by_name[name]
            if span.is_unimportant or span.is_important:
                continue

            ic = span.count_important_children()
            cic += ic
            weight_index[name] = ic

        # Heaviest first, so capped shares (score <= 1) roll over to the
        # spans that follow.
        sorted_order = sorted(
            weight_index.keys(),
            key=lambda x: weight_index[x],
            reverse=True)

        for name in sorted_order:
            self.spans_by_category['relevant'].add(name)
            span = self.spans_by_name[name]

            if cic > 0:
                fraction_share = weight_index[name] / cic
                weight_share = min(
                    1 - baseline, fraction_share * total_to_distribute)

                cic -= weight_index[name]
                total_to_distribute -= weight_share
                span.importance_score = weight_share + baseline

    def display_graph(self, display_raw=False, show_importance=False):
        """Render the span tree as an inline SVG via Graphviz.

        Args:
            display_raw: also pretty-print the raw expanded graph first.
            show_importance: append each span's importance score to its label.
        """
        # Imported explicitly: the bare name ``display`` only exists when an
        # interactive IPython shell has injected it into builtins.
        from IPython.display import display

        if display_raw:
            print("RAW FORM:\n")
            self.pp = self.pp or pprint.PrettyPrinter(indent=4)
            self.pp.pprint(self.expanded_graph)

        self.displayed_graph = Digraph()
        self.displayed_graph.attr(rankdir='LR', scale='0.5')

        self.spans_by_name['root'].display_span(
            self.displayed_graph, show_importance)
        display(SVG(self.displayed_graph.pipe(format='svg')))

    #############################################################

    def reset_seen_counts(self):
        """Zero the seen counter on every span."""
        for name in self.spans_by_name:
            self.spans_by_name[name].reset_seen_count()

    def simulate_secondary_sort(self, harvests, fraction_captured, random_factor):
        """Simulate repeated clipped harvests until span sets are fully seen.

        Three sets are tracked: 'full' (all spans), 'basic' (important
        spans) and 'relevant' (important plus relevant spans).

        Returns:
            dict mapping each set name to the 0-based harvest index at which
            its last span was captured, or ``harvests + 1`` if it never was.
        """
        self.reset_seen_counts()

        finish_times = {}
        traces = {
            'full': set(self.spans_by_name.keys()),
            'basic': set(self.spans_by_category['important'])
        }
        traces['relevant'] = (
            traces['basic'] | set(self.spans_by_category['relevant']))

        for i in range(0, harvests):
            if len(finish_times) == 3:
                return finish_times

            seen = self.simulate_harvest_clipping(
                fraction_captured, random_factor)

            for trace in traces:
                if trace in finish_times:
                    continue

                traces[trace] -= seen
                if len(traces[trace]) == 0:
                    finish_times[trace] = i

        for trace in traces:
            if trace not in finish_times:
                finish_times[trace] = harvests + 1

        return finish_times

    def simulate_harvest_clipping(self, fraction_captured, random_factor):
        """Capture the top-scoring fraction of spans for one harvest.

        Returns:
            set of captured span names; each captured span is marked seen.
        """
        sorted_spans = self.sort_spans_for_clipping(random_factor)
        num_captured = int(numpy.rint(fraction_captured * self.count_spans()))

        spans_captured = sorted_spans[0:num_captured]
        for name in spans_captured:
            self.spans_by_name[name].mark_seen()

        return set(spans_captured)

    def sort_spans_for_clipping(self, random_factor):
        """Return span names ordered by descending noisy importance.

        Important spans score ``1 + noise``; relevant and unimportant spans
        score ``importance_score + noise``, with
        ``noise = random_factor * uniform(0, 1)``.

        Names are visited in sorted order within each category so that the
        pairing of random draws with spans does not depend on set iteration
        order (which varies with string-hash randomization between runs).
        """
        ordered_spans = [
            (name, 1 + random_factor * self.r.uniform(0, 1))
                for name in sorted(self.spans_by_category['important'])]

        for category in ['relevant', 'unimportant']:
            for name in sorted(self.spans_by_category[category]):
                span = self.spans_by_name[name]
                score = (span.importance_score
                         + random_factor * self.r.uniform(0, 1))
                ordered_spans.append((name, score))

        sorted_spans = sorted(ordered_spans, key=lambda x: x[1], reverse=True)
        return [y[0] for y in sorted_spans]

Overwriting ss/mocktrace.py


In [8]:
%%writefile ss/mockspan.py

import re

class MockSpan(object):
    """A single span (node) in a mock trace tree.

    Args:
        gvid: graph id; stored as a string and used as the Graphviz node id.
        name: span name; its type is the name with all digits removed.
        parent: parent ``MockSpan`` or ``None`` for the root.  A non-``None``
            parent gets this span appended to its children.
    """

    def __init__(self, gvid, name, parent):
        self.gvid = str(gvid)
        self.name = name
        self.node_type = re.sub(r'\d', '' , name)
        self.parent = parent

        self.children = []
        self.mark_color = None
        self.times_seen = 0
        self.long_generic = False

        # Roots, db calls, externals and long generics start fully important.
        starts_important = (
            self.node_type in ['root', 'dbcall', 'external']
            or self.long_generic)
        self.importance_score = 1 if starts_important else 0

        if parent is not None:
            self.parent.add_child(self)

    @property
    def is_important(self):
        """True when the importance score is exactly 1."""
        return self.importance_score == 1

    @property
    def is_unimportant(self):
        """True for 'Red' spans and for plain, childless, non-long nodes.

        Side effect: when True, the importance score is reset to 0.
        """
        marked_red = self.mark_color == 'Red'
        plain_leaf = (
            self.node_type == 'node'
            and not self.children
            and self.long_generic is False)

        if not (marked_red or plain_leaf):
            return False

        self.importance_score = 0
        return True

    def reset_seen_count(self):
        """Forget how often this span has been captured."""
        self.times_seen = 0

    def add_child(self, child):
        """Register ``child`` as the last child of this span."""
        self.children.append(child)

    def count_spans(self):
        """Size of the subtree rooted here, this span included."""
        below = sum(kid.count_spans() for kid in self.children)
        return below + 1

    def count_important_children(self):
        """Number of spans in this subtree (self included) with score 1."""
        own = 1 if self.importance_score == 1 else 0
        return own + sum(
            kid.count_important_children() for kid in self.children)

    def count_nonprunable_spans(self):
        """Number of spans in this subtree (self included) marked 'Green'."""
        own = 1 if self.mark_color == 'Green' else 0
        return own + sum(
            kid.count_nonprunable_spans() for kid in self.children)

    def mark_prunable_spans(self):
        """Colour this subtree bottom-up.

        Leaves become 'Green'; an inner span becomes 'Red' when every child
        is 'Green', otherwise 'Green'.  A span that already has a colour is
        left alone and its colour is returned.
        """
        if self.mark_color is not None:
            return self.mark_color

        if not self.children:
            self.mark_color = 'Green'
            return

        non_green_children = 0
        for kid in self.children:
            kid.mark_prunable_spans()
            if kid.mark_color != 'Green':
                non_green_children += 1

        if non_green_children == 0:
            self.mark_color = 'Red'
        else:
            self.mark_color = 'Green'

    def mark_seen(self):
        """Record one more capture of this span."""
        self.times_seen += 1

    #############################################################

    # Font colours keyed by span type, fill colours keyed by mark colour.
    span_colors = {
        'root': '#FF00FF',
        'external': '#FF0000',
        'dbcall': '#00FF00',
        'call': '#666666',
        'node': '#000000',
        'Green': '#ceffe7',
        'Red': '#ffd6ce'
    }

    @property
    def fontcolor(self):
        """Label colour, chosen by span type."""
        return MockSpan.span_colors[self.node_type]

    @property
    def fillcolor(self):
        """Fill colour, chosen by mark colour (white when unmarked)."""
        if self.mark_color is not None:
            return MockSpan.span_colors[self.mark_color]
        return '#ffffff'

    def display_span(self, displayed_graph, show_importance):
        """Add this span, its parent edge and its subtree to a graph.

        ``displayed_graph`` must provide ``node`` and ``edge`` methods.
        """
        label = self.name
        if show_importance:
            label = label + ' / %.3f' % self.importance_score

        displayed_graph.node(
            self.gvid,
            label,
            style='filled',
            fillcolor=self.fillcolor,
            fontcolor=self.fontcolor)

        if self.parent is not None:
            displayed_graph.edge(self.parent.gvid, self.gvid)

        for kid in self.children:
            kid.display_span(displayed_graph, show_importance)

Overwriting ss/mockspan.py
