In [1]:
#export
"""
This module is for color formats, units and whatnot. This is exposed
automatically with::

   from k1lib.imports import *
   fmt.txt # exposed
"""
import k1lib, math; from k1lib import cli
from typing import Dict, Iterator, Tuple
pygments = k1lib.dep("pygments")
__all__ = ["generic", "metricPrefixes", "size", "fromSize", "sizeOf",
           "comp", "compRate", "time", "item", "throughput", "txt", "code", "h", "pre", "row", "col", "colors"]

In [2]:
#export
k1lib.settings.add("fmt", k1lib.Settings().add("separator", True, "whether to have a space between the number and the unit"), "from k1lib.fmt module");
settings = k1lib.settings.fmt
metricPrefixes = {-8:"y",-7:"z",-6:"a",-5:"f",-4:"p",-3:"n",-2:"u",-1:"m",0:"",1:"k",2:"M",3:"G",4:"T",5:"P",6:"E",7:"Z",8:"Y"}
#metricPrefixes = ["", "k", "M", "G", "T", "P", "E", "Z", "Y"]
def generic(x, units:Dict[int, str]):
    c = " " if settings.separator else ""
    for i, unit in units.items():
        upperBound = 1000 * 1000**i
        if abs(x) < upperBound:
            return f"{round(1e3*x/upperBound, 2)}{c}{unit}"
    return (f"{round(1e3*x/upperBound, 2)}{c}{unit}").strip()

In [12]:
#export
sizes = {i: f"{p}B" for i, p in metricPrefixes.items() if i >= 0}; #sizes[0] = "bytes"
def size(_bytes=0):
    """Formats disk size.
Example::

    # returns "50.0 bytes"
    fmt.size(50)
    # returns "12.0 MB"
    fmt.size(1.2e7)
"""
    return generic(_bytes, sizes)
sizeInv = {"k": 1e3, "m": 1e6, "g": 1e9, "t": 1e12, "p": 1e15, "e": 1e18, "z": 1e21, "y": 1e24}
def fromSize(s:str) -> int: # this is ugly I know, doesn't fit well into others. But making a generalized version seems hard, and I just need this right now
    """Grabs size from string representation.
Example::

    fromSize("31.5k") # returns 31500
    fromSize("31.5kB") # also returns 31500
"""
    s = s.lower().replace(" ", "").rstrip("b"); ch = s[-1]
    if ch in sizeInv: return int(float(s[:-1])*sizeInv[ch])
    return int(s)

In [13]:
assert size(50) == "50.0 B"
assert size(1.2e7) == "12.0 MB"
assert fromSize("31.5k") == 31500
assert fromSize("31.5kB") == 31500

In [5]:
#export
def sizeOf(l:Iterator[float]) -> Tuple[str, Iterator[float]]:
    """Figures out appropriate scale, scales back the Iterator, and return both.
Example::

    x = torch.abs(torch.randn(2)) * 1e4 + 1e5
    label, t = fmt.sizeOf(x) # label is "kB"
    (t | toTensor()).min() # min value should be close to 100"""
    l = list(l | cli.apply(lambda n: abs(n)))
    v = l | cli.toMax()
    v = math.log10(v) if v > 0 else -math.log10(-v)
    idx = math.floor(v/3)
    coef = 1.0/1000**idx
    return sizes[idx], l | cli.apply(lambda x: x * coef) | cli.deref()

In [6]:
import torch
x = torch.abs(torch.randn(2)) * 1e4 + 1e5
label, i = sizeOf(x); assert label == "kB"
assert 50 < (i | cli.toTensor()).min() < 150

In [7]:
#export
computations = {i: f"{p}FLOPs" for i, p in metricPrefixes.items() if i >= 0}
def comp(flop=0):
    """Formats computation amount.
Example::

    # returns "50.0 FLOPs"
    fmt.computation(50)
    # returns "50.0 MFLOPs"
    fmt.computation(5e7)
"""
    return generic(flop, computations)

In [8]:
assert comp(50) == "50.0 FLOPs"
assert comp(5e7) == "50.0 MFLOPs"
assert comp(1e30) == "1000000.0 YFLOPs"

In [9]:
#export
computationRates = {i: f"{p}FLOPS" for i, p in metricPrefixes.items() if i >= 0}
def compRate(flops=0):
    """Formats computation rate.
Example::

    # returns "50.0 FLOPS"
    fmt.computationRate(50)
    # returns "50.0 MFLOPS"
    fmt.computationRate(5e7)
"""
    return generic(flops, computationRates)

In [10]:
assert compRate(50) == "50.0 FLOPS"
assert compRate(5e7) == "50.0 MFLOPS"
assert compRate(1e30) == "1000000.0 YFLOPS"

In [11]:
#export
times = {i:f"{p}s" for i, p in metricPrefixes.items() if i <= 0}
def time(seconds=0):
    """Formats small times.
Example::

    fmt.time(50) # returns "50.0 s"
    fmt.time(4000) # returns "4000.0 s"
    fmt.time(0.02) # returns "20.0 ms"
    fmt.time(1e-5) # returns "10.0 us"
"""
    return generic(seconds, times)

In [12]:
assert time(50) == "50.0 s"
assert time(4000) == "4000.0 s"
assert time(0.02) == "20.0 ms"
assert time(1e-5) == "10.0 us"
assert time(1e-10) == "100.0 ps"
assert time(1e-25) == "0.1 ys"

In [13]:
#export
items = {0: "", 1: "k", 2: "M", 3: "B", 4: "T"}
def item(n=0):
    """Formats generic item.
Example::

    # returns "50.0"
    fmt.item(50)
    # returns "500.0 k"
    fmt.item(5e5)
"""
    return generic(n, items)

In [14]:
assert item(50) == "50.0 "
assert item(5e5) == "500.0 k"

In [15]:
#export
def throughput(n, unit=""):
    """Formats item throughput.
Example::

    # returns "3.16/year"
    fmt.throughput(1e-7)
    # returns "2.63/month"
    fmt.throughput(1e-6)
    # returns "3.6/hour"
    fmt.throughput(1e-3)
    # returns "100.0 k/s"
    throughput(1e5)
    
    # returns "100.0 k epochs/s"
    throughput(1e5, " epochs")

:param n: items per second
:param unit: optional item unit"""
    if n < 10/(365.25*86400): return item(n*(365.25*86400)) + f"{unit}/year"
    if n < 10/(30.4375*86400): return item(n*(30.4375*86400)) + f"{unit}/month"
    if n < 10/86400: return item(n*86400) + f"{unit}/day"
    if n < 10/3600: return item(n*3600) + f"{unit}/hour"
    if n < 10/60: return item(n*60) + f"{unit}/minute"
    return item(n) + f"{unit}/s"

In [18]:
assert throughput(1e-7) == "3.16 /year"
assert throughput(1e-6) == "2.63 /month"
assert throughput(1e-3) == "3.6 /hour"
assert throughput(1e5) == "100.0 k/s"
assert throughput(1e5, " epochs") == "100.0 k epochs/s"

In [19]:
#export
_esc = '\033['
_end = f'{_esc}0m'
class txt:
    """Text formatting.
Example::

    # will print out red text
    print(fmt.txt.red("some text"))"""
    @staticmethod
    def darkcyan(s:str):  return f"{_esc}36m{s}{_end}"
    @staticmethod
    def red(s:str):       return f"{_esc}91m{s}{_end}"
    @staticmethod
    def green(s:str):     return f"{_esc}92m{s}{_end}"
    @staticmethod
    def yellow(s:str):    return f"{_esc}93m{s}{_end}"
    @staticmethod
    def blue(s:str):      return f"{_esc}94m{s}{_end}"
    @staticmethod
    def purple(s:str):    return f"{_esc}95m{s}{_end}"
    @staticmethod
    def cyan(s:str):      return f"{_esc}96m{s}{_end}"
    @staticmethod
    def bold(s:str):      return f"{_esc}1m{s}{_end}"
    @staticmethod
    def grey(s:str):      return f"{_esc}38;2;150;150;150m{s}{_end}"
    @staticmethod
    def darkgrey(s:str):  return f"{_esc}38;2;100;100;100m{s}{_end}"
    @staticmethod
    def underline(s:str): return f"{_esc}4m{s}{_end}"
    @staticmethod
    def identity(s:str):  return f"{s}"

In [20]:
import IPython

In [21]:
#export
class code:
    def python(code:str):
        """Formats Python code.
Example::

    fmt.code.python(\"\"\"
    def f(x:int):
        return x + 3
    \"\"\") | aS(IPython.display.HTML)
"""
        a = "<style>" + pygments.formatters.HtmlFormatter().get_style_defs(".highlight") + "</style>"
        b = pygments.highlight(code, pygments.lexers.PythonLexer(), pygments.formatters.HtmlFormatter())
        return a + b

In [22]:
IPython.display.HTML(code.python("""
def f(x:int):
    return x + 3
"""))

In [43]:
#export
def h(code:str, level:int=1) -> str:
    """Wraps content inside a 'h' html tag.
Example::

    fmt.h("abc", 2) # returns "<h2>abc</h2>"

:param level: what's the header level?"""
    return f"<h{level}>{code}</h{level}>"
def pre(code:str) -> str:
    """Wraps content inside a 'pre' html tag.
Example::

    fmt.pre("abc")
"""
    return f"<pre style='font-family: courier'>{code}</pre>"
def col(args):
    """Creates a html col of all the elements.
Example::

    fmt.col(["abc", "def"]) | aS(IPython.display.HTML)
"""
    return args | cli.apply(lambda x: f"<div style='margin: 10px'>{x}</div>") | cli.join("") | cli.aS(lambda x: f"<div style='display: flex; flex-direction: column'>{x}</div>")
def row(args):
    """Creates a html row of all the elements.
Example::

    fmt.row(["abc", "def"]) | aS(IPython.display.HTML)
"""
    return args | cli.apply(lambda x: f"<div style='margin: 10px'>{x}</div>") | cli.join("") | cli.aS(lambda x: f"<div style='display: flex; flex-direction: row'>{x}</div>")

In [44]:
row(["abc", "def"]) | cli.aS(IPython.display.HTML)

In [45]:
col(["abc", "def"]) | cli.aS(IPython.display.HTML)

In [None]:
#export
settings.add("colors", ["#8dd3c7", "#ffffb3", "#bebada", "#fb8072", "#80b1d3", "#fdb462", "#b3de69", "#fccde5", "#d9d9d9", "#bc80bd", "#ccebc5", "#ffed6f"], "List of colors to cycle through in fmt.colors()")
def colors():
    """Returns an infinite iterator that cycles through 12 colors.
Example::

    fmt.colors() | head(3) | deref()

Color scheme taken from https://colorbrewer2.org/#type=qualitative&scheme=Set3&n=12"""
    return settings.colors | cli.repeatFrom()

In [6]:
!../export.py fmt

Current dir: /home/kelvin/repos/labs/k1lib, /home/kelvin/repos/labs/k1lib/k1lib/../export.py
rm: cannot remove '__pycache__': No such file or directory
Found existing installation: k1lib 1.4
Uninstalling k1lib-1.4:
  Successfully uninstalled k1lib-1.4
running install
running bdist_egg
running egg_info
creating k1lib.egg-info
writing k1lib.egg-info/PKG-INFO
writing dependency_links to k1lib.egg-info/dependency_links.txt
writing requirements to k1lib.egg-info/requires.txt
writing top-level names to k1lib.egg-info/top_level.txt
writing manifest file 'k1lib.egg-info/SOURCES.txt'
reading manifest file 'k1lib.egg-info/SOURCES.txt'
adding license file 'LICENSE'
writing manifest file 'k1lib.egg-info/SOURCES.txt'
installing library code to build/bdist.linux-x86_64/egg
running install_lib
running build_py
creating build
creating build/lib
creating build/lib/k1lib
copying k1lib/_learner.py -> build/lib/k1lib
copying k1lib/fmt.py -> build/lib/k1lib
copying k1lib/_k1a.py -> build/lib/k1lib
copying 