# Python 3.9 StatsProfile

Programmatically access and analyze the *Stats* profiling data of your Python application.

You can read [this Medium article](https://medium.com/p/9dd6847eb802/) for a more in-depth discussion of the change.

In [None]:
#@title Manually import the StatsProfile Change

#@markdown The [change](https://github.com/python/cpython/pull/15495/files) that introduces `Stats.get_stats_profile` will only be available in Python 3.9, so for now we need to manually import the code, with some minor modifications so it's usable in the notebook.

#@markdown *Note: You can expand this cell to view the code.*

from pstats import func_std_string, f8
from dataclasses import dataclass
from typing import Dict

@dataclass(unsafe_hash=True)
class FunctionProfile:
    """Profiling figures for a single function, as filled in by ``get_stats_profile``."""
    # Call count rendered as text: a single number when both recorded call
    # counts agree, otherwise "nc/cc" (hence ``str``, not ``int``).
    ncalls: str
    # Time spent in this function alone.
    tottime: float
    # ``tottime`` divided by the call count, or -1 when that count is zero.
    percall_tottime: float
    # Time spent in the function plus all functions that it called.
    cumtime: float
    # ``cumtime`` divided by the call count, or -1 when that count is zero.
    percall_cumtime: float
    # Source file and line taken from the profiler's function key.
    file_name: str
    line_number: int

@dataclass(unsafe_hash=True)
class StatsProfile:
    """Profile of a whole profiling run: the overall time plus one
    ``FunctionProfile`` per function name.

    NOTE: ``func_profiles`` is a dict, so calling ``hash()`` on an instance
    raises ``TypeError`` even though ``unsafe_hash=True`` generates a
    ``__hash__`` method.
    """
    # Total time reported by the stats object (0 when there is nothing to report).
    total_tt: float
    # Function name -> that function's profile.
    func_profiles: Dict[str, FunctionProfile]
        
def get_stats_profile(stats):
    """Build a ``StatsProfile`` from a ``pstats.Stats``-style object.

    The functions reported are ``stats.fcn_list`` when it is non-empty,
    otherwise every key of ``stats.stats``. Each one is turned into a
    ``FunctionProfile`` and stored under its bare function name, so two
    functions sharing a name overwrite each other (last one wins).

    All times are passed through ``pstats.f8`` and parsed back to ``float``.
    Per-call figures are -1 when the corresponding call count is zero.
    When there is nothing to report, ``StatsProfile(0, {})`` is returned.
    """
    def _per_call(total, count):
        # -1 marks "no calls recorded" instead of dividing by zero.
        return -1 if count == 0 else float(f8(total / count))

    if stats.fcn_list:
        selected = stats.fcn_list[:]
    else:
        selected = list(stats.stats.keys())
    if not selected:
        return StatsProfile(0, {})

    by_name = {}
    profile = StatsProfile(float(f8(stats.total_tt)), by_name)

    for key in selected:
        cc, nc, own_time, cum_time, _callers = stats.stats[key]
        path, lineno, name = key
        if nc == cc:
            calls_text = str(nc)
        else:
            calls_text = str(nc) + '/' + str(cc)
        by_name[name] = FunctionProfile(
            calls_text,
            float(f8(own_time)),       # time spent in this function alone
            _per_call(own_time, nc),
            float(f8(cum_time)),       # time including everything it called
            _per_call(cum_time, cc),
            path,
            lineno,
        )

    return profile

In [None]:
#@title Log Generation

#@markdown Timestamped logging is simulated by appending *(timestamp, stats_profile)* tuples to *timestamped_stats_profiles* over several iterations.

# Durations handed to time.sleep() by sleep1(), sleep2() and sleep3() respectively.
sleep1_time = 0.1  #@param {type: "number"}
sleep2_time = 0.2  #@param {type: "number"}
sleep3_time = 0.3  #@param {type: "number"}
# Number of profiled iterations run by the loop further down in this cell.
# NOTE(review): the name looks like a typo for "num_iterations"; it is kept
# because the loop refers to it by this exact spelling.
num_terations = 10  #@param {type: "number"}

import cProfile, pstats
import time
from random import randint

# Whole-second epoch time captured when this cell runs; used as the origin
# for the time buckets computed in the aggregation cell.
START_TIME = int(time.time())

# Simulated log: one (timestamp, stats_profile) tuple per profiled iteration.
timestamped_stats_profiles = []

def sleep1():
    """Simulated workload: pause for ``sleep1_time`` seconds."""
    pause = sleep1_time
    time.sleep(pause)

def sleep2():
    """Simulated workload: pause for ``sleep2_time`` seconds."""
    pause = sleep2_time
    time.sleep(pause)
    
def sleep3():
    """Simulated workload: pause for ``sleep3_time`` seconds."""
    pause = sleep3_time
    time.sleep(pause)

# Each iteration profiles a random amount of simulated work and records the
# resulting StatsProfile together with the time at which it was captured.
for _ in range(num_terations):
    pr = cProfile.Profile()
    pr.enable()

    # Run each workload a random 1-10 times, in the order sleep1, sleep2, sleep3.
    for workload in (sleep1, sleep2, sleep3):
        for _ in range(randint(1, 10)):
            workload()

    pr.create_stats()

    stats_profile = get_stats_profile(pstats.Stats(pr))
    captured_at = int(time.time())
    timestamped_stats_profiles.append((captured_at, stats_profile))

In [None]:
#@title Log aggregation

#@markdown The process of aggregating time-sliced data will probably be specific to the logging/visualization platform you use (DataDog, Sumo Logic, Elasticsearch, etc.). Below is just a naive Python solution to bucket and aggregate cumulative runtimes.

# Width of each aggregation bucket, in the same units as the recorded
# timestamps (whole seconds from time.time()); the form default is 10.
time_slice = 10  #@param {type: "number"}

from collections import Counter
import itertools

def time_to_bucket(time):
    """Return the index of the ``time_slice``-wide bucket containing ``time``.

    Buckets are counted from ``START_TIME``, which falls in bucket 0.
    """
    elapsed = time - START_TIME
    return elapsed // time_slice

def bucket_to_time(bucket):
    """Return the timestamp at which bucket number ``bucket`` starts."""
    offset = bucket * time_slice
    return offset + START_TIME

# One (bucket start time, Counter of cumtime per function name) pair per bucket.
time_sliced_counters = []
# Every function name seen in any bucket; used later as the chart series.
headers = set()

# groupby only merges adjacent entries, which works here because the entries
# were appended in chronological order.
grouped_by_bucket = itertools.groupby(
    timestamped_stats_profiles,
    key=lambda entry: time_to_bucket(entry[0]),
)
for bucket, grp in grouped_by_bucket:
    time_slice_counter = Counter()
    for _timestamp, stats_profile in grp:
        for raw_name, f_profile in stats_profile.func_profiles.items():
            # matplotlib can't allow legend values to start with an underscore
            label = raw_name.lstrip("_")
            headers.add(label)
            time_slice_counter[label] += f_profile.cumtime
    time_sliced_counters.append((bucket_to_time(bucket), time_slice_counter))

In [None]:
#@title Results Visualization

#@markdown Use a stacked bar graph to visualize the cumtime of each python call over the timeslice specified above.

# Width passed to plt.bar for every stacked segment.
bar_width = 0.4  #@param {type: "number"}

import numpy as np
import matplotlib
import matplotlib.pyplot as plt
from matplotlib.ticker import FormatStrFormatter

# Stacked bar chart: one bar per time slice, one stacked segment per function
# name, segment height = cumulative time recorded for that name in the slice.
ind = np.arange(len(time_sliced_counters))

# Loop variables deliberately avoid the name `time`: a module-level loop
# variable called `time` would replace the imported `time` module.
x_axis = tuple(slice_start for (slice_start, _) in time_sliced_counters)

# Sort the names so stacking order, colours and legend are the same on every
# run; iterating the raw set gives an order that can change between runs.
stack_order = sorted(headers)
y_axis = [
    [counter[header] for (_, counter) in time_sliced_counters]
    for header in stack_order
]

fig = plt.gcf()
fig.set_size_inches(18.5, 10.5)

boxes = []
titles = []
# Sized from the number of slices (not y_axis[0]) so that an empty data set
# does not raise IndexError.
bottom = np.zeros(len(time_sliced_counters))
for header, series in zip(stack_order, y_axis):
    p = plt.bar(ind, tuple(series), bar_width, bottom=bottom)
    bottom += np.array(series)
    boxes.append(p[0])
    titles.append(header)

# Leave 5 units of headroom above the tallest stack; fall back to 0 when
# there are no slices at all.
tallest = float(bottom.max()) if bottom.size else 0.0
y_range = round(tallest + 5)
plt.ylabel("Total time (s)", fontsize=12)
plt.yticks(np.arange(0, y_range, y_range/10), fontsize=12)
plt.gca().yaxis.set_major_formatter(FormatStrFormatter('%.2f'))

plt.xlabel('time (epoch)', fontsize=12)
plt.xticks(ind, x_axis, fontsize=12, rotation=45)

plt.title(f"Application profile at {time_slice}s time slices")
if boxes:
    # Only draw a legend when there is at least one series to describe.
    plt.legend(tuple(boxes), tuple(titles), fontsize=12, ncol=3, fancybox=True, loc='upper center', bbox_to_anchor=(0.5, -0.2), shadow=True)

plt.show()