In [1]:
# Display the value of every top-level expression in a cell, not just the last one.
# Several cells below list more than one bare expression and show an output for each.
from IPython.core.interactiveshell import InteractiveShell
InteractiveShell.ast_node_interactivity = "all"

<h1>Table of Contents<span class="tocSkip"></span></h1>
<div class="toc"><ul class="toc-item"><li><span><a href="#Table-of-Contents" data-toc-modified-id="Table-of-Contents-1"><span class="toc-item-num">1&nbsp;&nbsp;</span>Table of Contents</a></span></li><li><span><a href="#Misc" data-toc-modified-id="Misc-2"><span class="toc-item-num">2&nbsp;&nbsp;</span>Misc</a></span></li><li><span><a href="#Dictionary-manipulation" data-toc-modified-id="Dictionary-manipulation-3"><span class="toc-item-num">3&nbsp;&nbsp;</span>Dictionary manipulation</a></span></li><li><span><a href="#Manipulating-string-representations" data-toc-modified-id="Manipulating-string-representations-4"><span class="toc-item-num">4&nbsp;&nbsp;</span>Manipulating string representations</a></span><ul class="toc-item"><li><span><a href="#$k$-factors" data-toc-modified-id="$k$-factors-4.1"><span class="toc-item-num">4.1&nbsp;&nbsp;</span>$k$-factors</a></span></li></ul></li><li><span><a href="#Probability-distributions" data-toc-modified-id="Probability-distributions-5"><span class="toc-item-num">5&nbsp;&nbsp;</span>Probability distributions</a></span></li><li><span><a href="#Template-for-processing-w/-progress-report" data-toc-modified-id="Template-for-processing-w/-progress-report-6"><span class="toc-item-num">6&nbsp;&nbsp;</span>Template for processing w/ progress report</a></span></li><li><span><a href="#Saving/Loading-NumPy-arrays" data-toc-modified-id="Saving/Loading-NumPy-arrays-7"><span class="toc-item-num">7&nbsp;&nbsp;</span>Saving/Loading NumPy arrays</a></span></li><li><span><a href="#NumPy-examples" data-toc-modified-id="NumPy-examples-8"><span class="toc-item-num">8&nbsp;&nbsp;</span>NumPy examples</a></span></li></ul></div>

*Boilerplate Central*

# Misc

In [4]:
from functools import reduce

def union(Ss):
    """
    Return the union of a collection of sets as a new set.

    Ss: an iterable of sets (frozensets and other iterables of hashable
        items are accepted as well).

    An empty collection yields the empty set instead of raising TypeError.
    """
    return set().union(*Ss)

In [5]:
union([{0,1}, {1}, {1}, {2,3}])

{0, 1, 2, 3}

# Dictionary manipulation

In [6]:
def project_dict(the_dict, keys_to_keep):
    """
    Return a new dict holding only those entries of the_dict whose key is in keys_to_keep.

    Keys listed in keys_to_keep but absent from the_dict are ignored; entries keep
    the order they have in the_dict.
    """
    kept = {}
    for key, value in the_dict.items():
        if key in keys_to_keep:
            kept[key] = value
    return kept

In [7]:
def edit_dict(the_dict, the_key, the_new_value):
    '''
    In-place dictionary update that returns the dictionary so calls can be composed.

    The change is applied through the_dict's own `update` method, and the very
    same (now mutated) object is returned.
    '''
    single_entry = {the_key: the_new_value}
    the_dict.update(single_entry)
    return the_dict

def modify_dict(the_dict, the_key, the_new_value):
    '''
    Non-mutating counterpart of edit_dict: returns a fresh plain dict that equals
    the_dict except that the_key maps to the_new_value. the_dict is left untouched.
    '''
    new_dict = dict()
    for existing_key in the_dict:
        new_dict[existing_key] = the_dict[existing_key]
    new_dict[the_key] = the_new_value
    return new_dict

In [8]:
import random

In [9]:
def getRandomKey(a_dict, printKey = False):
    """
    Pick one key of a_dict uniformly at random (via random.choice) and return it.

    If printKey is true the chosen key is also printed.
    """
    candidates = list(a_dict.keys())
    chosen = random.choice(candidates)
    if printKey:
        print('Random key: {0}'.format(chosen))
    return chosen

def testRandomKey(a_dict, printKey = True, printVal = True):
    randKey = getRandomKey(a_dict)
    if printKey:
        print('Random key: {0}'.format(randKey))
    if printVal:
        print('value ⟶ {0}'.format(a_dict[randKey]))
    return {'key': randKey, 'val': a_dict[randKey]}

In [1]:
def transpose(d, row_keys, col_keys):
    """
    Transpose a dict-of-dicts d[col_key][row_key] into result[row_key][col_key].

    Every key in row_keys gets an entry in the result (possibly an empty dict);
    a (row, col) cell is included only when col_key is in d and row_key is in d[col_key].
    """
    transposed = {}
    for row_key in row_keys:
        row = {}
        for col_key in col_keys:
            if col_key in d and row_key in d[col_key]:
                row[col_key] = d[col_key][row_key]
        transposed[row_key] = row
    return transposed
    
def filter_dict(d, cond):
    """
    Return a new dict with the entries of d for which cond(key, value) is truthy.
    """
    selected = {}
    for key in d:
        value = d[key]
        if cond(key, value):
            selected[key] = value
    return selected

def gtZero(k, v):
    """
    Two-argument (key, value) predicate, shaped for use as the `cond` argument of
    filter_dict: true iff the value v is strictly greater than 0.0. The key k is ignored.

    Written as a named function rather than a lambda bound to a name, so it
    carries a real __name__ in tracebacks.
    """
    return v > 0.0

def mapValues(f, d):
    """
    Return a new dict with the same keys as d and each value replaced by f(value).
    """
    mapped = {}
    for key in d:
        mapped[key] = f(d[key])
    return mapped

# Manipulating string representations

In [10]:
def tupleToDottedString(pair):
    """
    Join a sequence of strings into one dotted string, e.g. ('a', 'b') -> 'a.b'.
    """
    separator = '.'
    return separator.join(pair)

def dottedStringToTuple(s):
    """
    Split a dotted string on '.' and return the pieces as a tuple, e.g. 'a.b' -> ('a', 'b').
    """
    pieces = s.split('.')
    return tuple(pieces)

# Short aliases for the two converters above; the k-factor helpers below use these names.
t2ds = tupleToDottedString
ds2t = dottedStringToTuple

In [11]:
def importSeqs(seq_fn, encoding='utf-8'):
    """
    Read a text file with one sequence per line and return the set of distinct lines.

    seq_fn:   path of the file to read
    encoding: text encoding of the file. Defaults to UTF-8 rather than the
              platform default, because the sequences in this notebook contain
              non-ASCII symbols (e.g. the edge markers).

    Trailing '\\r' / '\\n' characters are stripped from every line; other
    whitespace is kept. A blank line contributes the empty string.
    """
    with open(seq_fn, 'r', encoding=encoding) as the_file:
        return {row.rstrip('\r\n') for row in the_file}

def exportSeqs(seq_fn, seqs, encoding='utf-8'):
    """
    Write each sequence in seqs to the file seq_fn, one per line.

    seq_fn:   path of the file to (over)write
    seqs:     iterable of strings
    encoding: text encoding to write with. Defaults to UTF-8 rather than the
              platform default, so non-ASCII symbols such as the edge markers
              can always be written.
    """
    with open(seq_fn, 'w', encoding=encoding) as the_file:
        for seq in seqs:
            the_file.write(seq + '\n')

In [12]:
# Word-boundary marker symbols. randomString prepends leftEdge to the strings it generates.
leftEdge = '⋊'
rightEdge = '⋉'
# Both markers collected in one set.
edgeSymbols = {leftEdge, rightEdge}

## $k$-factors

In [13]:
from itertools import takewhile, product
from random import choice

def dsToKfactors(k, ds):
    """
    Return the set of k-factors (contiguous length-k windows) of a dotted string.

    k:  window length, counted in symbols
    ds: a dotted string such as 'a.b.c.d'

    Returns a set of dotted strings, e.g. dsToKfactors(2, 'a.b.c.d') gives
    {'a.b', 'b.c', 'c.d'}. When k exceeds the number of symbols the result is
    the empty set (this case used to return an empty tuple, so the return type
    depended on the input).
    """
    seq = ds2t(ds)
    l = len(seq)
    if k > l:
        return set()
    # Window start positions are 0 .. l-k, and never beyond the last symbol.
    stop = min(l, l - k + 1)
    return {t2ds(seq[start:start + k]) for start in range(stop)}

def dsTo2factors(ds):
    """Return the 2-factors of the dotted string ds (dsToKfactors with k fixed to 2)."""
    return dsToKfactors(k=2, ds=ds)
def dsTo3factors(ds):
    """Return the 3-factors of the dotted string ds (dsToKfactors with k fixed to 3)."""
    return dsToKfactors(k=3, ds=ds)

def lexiconToKfactors(DSs, k):
    """
    Return the set of all k-factors that occur in any dotted string of the lexicon.

    DSs: iterable of dotted strings
    k:   factor length, counted in symbols

    An empty lexicon yields the empty set (previously the reduction over an
    empty collection raised TypeError).
    """
    factors = set()
    for ds in DSs:
        factors.update(dsToKfactors(k, ds))
    return factors

def lexiconTo2factors(DSs):
    """
    Return the set of all 2-factors occurring in the lexicon DSs.

    Delegates to lexiconToKfactors with k fixed to 2 instead of repeating its
    pipeline, so the two cannot drift apart.
    """
    return lexiconToKfactors(DSs, 2)
def lexiconTo3factors(DSs):
    """
    Return the set of all 3-factors occurring in the lexicon DSs.

    Delegates to lexiconToKfactors with k fixed to 3 instead of repeating its
    pipeline, so the two cannot drift apart.
    """
    return lexiconToKfactors(DSs, 3)


def compareKfactors(DSs_A, DSs_B, k):
    """
    Compare the k-factor sets of two lexicons A and B.

    Returns a dict with three entries: "A == B" (whether the sets are equal),
    "A - B" (factors only in A) and "B - A" (factors only in B).
    """
    factors_A = lexiconToKfactors(DSs_A, k)
    factors_B = lexiconToKfactors(DSs_B, k)
    report = {}
    report["A == B"] = factors_A == factors_B
    report["A - B"] = factors_A - factors_B
    report["B - A"] = factors_B - factors_A
    return report

def sameKfactors(DSs_A, DSs_B, k):
    """Return the "A == B" entry of compareKfactors(DSs_A, DSs_B, k)."""
    comparison = compareKfactors(DSs_A, DSs_B, k)
    return comparison["A == B"]

def hasIllicitKfactors(W, illicit_k_factors):
    """
    Report which of the given illicit k-factors occur in a word or a collection of words.

    W:                 a single dotted string, or an iterable of dotted strings
    illicit_k_factors: iterable of dotted strings (factors of possibly different lengths)

    Returns False when no illicit factor occurs. Otherwise:
      - for a single word, a tuple of the illicit factors found, in the order given;
      - for a collection, the set of illicit factors found in any of its words.

    In the collection case the factors found in each word are merged element-wise.
    (Previously each per-word tuple was joined into one dotted string, so a word
    violating both 'o.o' and 'i.i' was reported as the bogus factor 'o.o.i.i'.)
    """
    # Materialize once: the factors are traversed several times below.
    illicit_kfs = tuple(illicit_k_factors)
    if isinstance(W, str):
        # the distinct factor lengths (values of k) among the illicit factors
        illicit_factor_lengths = {len(ds2t(kf)) for kf in illicit_kfs}
        # map each k to the set of k-factors of the word
        kFactorSets = {kf_l: dsToKfactors(kf_l, W) for kf_l in illicit_factor_lengths}
        discovered = tuple(ikf for ikf in illicit_kfs
                           if ikf in kFactorSets[len(ds2t(ikf))])
        return discovered if discovered else False
    discovered = set()
    for w in W:
        hits = hasIllicitKfactors(w, illicit_kfs)
        if hits:
            discovered.update(hits)
    return discovered if discovered else False

def dsToKfactorSequence(k, ds):
    """
    Return the k-factors of the dotted string ds as a tuple, in left-to-right
    order and with repetitions kept (unlike dsToKfactors, which returns a set).

    The result is the empty tuple when k exceeds the number of symbols in ds.
    """
    symbols = ds2t(ds)
    n = len(symbols)
    if k > n:
        return tuple()
    windows = []
    for start in range(n):
        if start > n - k:
            # no full-length window starts here or later
            break
        windows.append(t2ds(symbols[start:start + k]))
    return tuple(windows)
    
def sigmaK(sigma, k):
    """
    Return an iterator over all length-k tuples of symbols drawn from sigma
    (the k-fold Cartesian product of sigma with itself).
    """
    all_k_tuples = product(sigma, repeat=k)
    return all_k_tuples

def randomString(sigma, l, hasLeftEdge=True):
    """
    Return a dotted string of l symbols drawn uniformly at random (with
    replacement) from the alphabet sigma.

    sigma:       iterable of symbols (strings)
    l:           number of random symbols to draw
    hasLeftEdge: if true, the left-edge marker is prepended as an extra first symbol

    The alphabet is materialized once up front, so a one-shot iterable works and
    the list is not rebuilt for every draw. With l == 0 the result is just the
    edge marker (or the empty string), not a string ending in a dangling '.'.
    """
    alphabet = list(sigma)
    symbols = tuple(choice(alphabet) for _ in range(l))
    if hasLeftEdge:
        symbols = (leftEdge,) + symbols
    return t2ds(symbols)

In [31]:
# Example: ten random symbols over {'p', 'i', 'b'}; the left-edge marker is prepended by default.
randomString({'p','i','b'}, 10)

'⋊.i.p.p.b.i.p.p.i.p.p'

In [15]:
# Sanity check for the `any(results)` test in hasIllicitKfactors: False and an empty set are both falsy.
any([False, set()])

False

In [16]:
# Sanity check for the string type test used in hasIllicitKfactors.
type('ab') == str

True

In [17]:
# Scratch work behind dsToKfactors: the dotted string has 7 characters but only
# 4 symbols, and windows of c symbols are cut from the tuple form by slicing.
w = 'a.b.c.d'
len(w)
c = 2
w_ = ds2t(w)
w_
w_[0:c]
w_[1:1+c]

7

('a', 'b', 'c', 'd')

('a', 'b')

('b', 'c')

In [18]:
# (index, symbol) pairs: the indices are the candidate window start positions.
list(enumerate(w_))

[(0, 'a'), (1, 'b'), (2, 'c'), (3, 'd')]

In [19]:
# Keep only start positions that leave room for a full window of c_prime symbols.
l_prime = len(w_)
l_prime
c_prime = 2
list(takewhile(lambda pair: pair[0] <= l_prime-c_prime, enumerate(w_)))

4

[(0, 'a'), (1, 'b'), (2, 'c')]

In [20]:
# The window starting at the last admissible position (index 2).
w_[2:2+c_prime]

('c', 'd')

In [21]:
# Worked examples for the k-factor helpers defined above; one output per line below.
dsToKfactors(2, w)
lexiconToKfactors(['a.b.c.d','a.b.a.b', 'c.d.b.a'], 1)
lexiconToKfactors(['a.b.c.d','a.b.a.b', 'c.d.b.a'], 2)
lexiconTo3factors(['a.b.c.d','a.b.a.b', 'c.d.b.a'])
compareKfactors(['a.b.c.d','a.b.a.b', 'c.d.b.a'], ['a.b'], 2)
sameKfactors(['a.b.c.d','a.b.a.b', 'c.d.b.a'], ['a.b'], 2)
# Single word with a violation, single word without, then a list of words.
hasIllicitKfactors('f.o.i.o.i.i.o', ['o.o','i.i'])
hasIllicitKfactors('f.o.i.o.i.o', ['o.o','i.i'])
hasIllicitKfactors(['f.o.i.o.i.i.o', 'f.o.i.o.i.o','b.o.o','i.i.p'], ['o.o','i.i'])
# sigmaK returns an iterator, so it is wrapped in set() for display.
set(sigmaK('abc', 2))

{'a.b', 'b.c', 'c.d'}

{'a', 'b', 'c', 'd'}

{'a.b', 'b.a', 'b.c', 'c.d', 'd.b'}

{'a.b.a', 'a.b.c', 'b.a.b', 'b.c.d', 'c.d.b', 'd.b.a'}

{'A == B': False, 'A - B': {'b.a', 'b.c', 'c.d', 'd.b'}, 'B - A': set()}

False

('i.i',)

False

{'i.i', 'o.o'}

{('a', 'a'),
 ('a', 'b'),
 ('a', 'c'),
 ('b', 'a'),
 ('b', 'b'),
 ('b', 'c'),
 ('c', 'a'),
 ('c', 'b'),
 ('c', 'c')}

In [22]:
# Set vs. sequence: the repeated factor 'a.b' appears once in the set but twice in the sequence.
dsToKfactors(2, 'a.b.a.b')
dsToKfactorSequence(2, 'a.b.a.b')

{'a.b', 'b.a'}

('a.b', 'b.a', 'a.b')

# Probability distributions

In [1]:
# Default absolute tolerance for isNormalized / areNormalized when no epsilon is passed.
my_epsilon = 1e-13

def norm(dist):
    """Return the total mass of a distribution: the sum of the values of the mapping dist."""
    masses = dist.values()
    return sum(masses)

def norms(dists):
    """Lazily apply norm to each distribution in the iterable dists; returns a map iterator."""
    totals = map(norm, dists)
    return totals

def isNormalized(dist, epsilon = None):
    """
    Return True iff the values of dist sum to 1 within an absolute tolerance.

    dist:    mapping from outcomes to probabilities
    epsilon: tolerance; defaults to the module-level my_epsilon

    The supplied epsilon is now actually used in the comparison (it used to be
    ignored in favour of my_epsilon).
    """
    if epsilon is None:
        epsilon = my_epsilon
    return abs(norm(dist) - 1) < epsilon

def areNormalized(dists, epsilon = None):
    """
    Return True iff every distribution in the family dists is normalized.

    dists:   mapping from a conditioning event to a distribution
    epsilon: tolerance passed on to isNormalized; defaults to my_epsilon
             (it used to be accepted but never forwarded)

    An empty family is vacuously normalized.
    """
    if epsilon is None:
        epsilon = my_epsilon
    return all(isNormalized(dists[k], epsilon) for k in dists)

In [2]:
# Example: a prior p(X) and a family of conditional distributions p(Y|X), keyed by the value of X.
pX = {'0':0.5, '1':0.5}
pYX = {'0':{'0':0.9, '1':0.1},
       '1':{'0':0.1, '1':0.9}}
isNormalized(pX)
areNormalized(pYX)
# Per-condition check, one boolean for each conditioning event.
[isNormalized(pYX[each]) for each in pYX]

[True, True]

In [25]:
import json, codecs

def exportProbDist(fn, dist):
    """
    Write the distribution dist to the file fn as indented UTF-8 JSON.

    fn:   path of the file to (over)write
    dist: JSON-serializable mapping (e.g. outcome -> probability)

    Non-ASCII characters are written as-is (ensure_ascii=False). The built-in
    open() is used, matching importProbDist; newline='\\n' keeps line endings
    untranslated, as they were with codecs.open.
    """
    with open(fn, 'w', encoding='utf-8', newline='\n') as f:
        json.dump(dist, f, ensure_ascii = False, indent = 4)
        
def importProbDist(fn):
    """Read the UTF-8 JSON file fn and return the decoded object (e.g. a dict of probabilities)."""
    with open(fn, encoding='utf-8') as data_file:
        return json.load(data_file)

In [1]:
# Modified version of Norvig's ProbDist code

import random
from fractions import Fraction
from collections import defaultdict, Counter

# An event counts as a predicate iff it is callable; P uses this to decide whether to filter the space.
is_predicate = callable

def P(event, space): 
    """The probability of an event, given either a sample space of equiprobable outcomes
    or a pmf. 
    event: a collection of outcomes, a predicate that is true of outcomes in the event,
           or (when space is a ProbDist) a single outcome of space.
    space: a set of outcomes or a probability distribution of {outcome: frequency} pairs.

    With a ProbDist space, a string event is always looked up as a single outcome.
    Any other hashable event that is itself an outcome of space is looked up the
    same way; without this, a tuple outcome (such as those produced by `joint`)
    would be misread as a collection of outcomes. Everything else is treated as
    a collection of outcomes whose probabilities are summed."""
    if is_predicate(event):
        event = such_that(event, space)
    if isinstance(space, ProbDist):
        if isinstance(event, str):
            return space[event]
        try:
            is_single_outcome = event in space
        except TypeError:
            # Unhashable events (sets, dicts, ProbDists) can only be collections.
            is_single_outcome = False
        if is_single_outcome:
            return space[event]
        return sum(space[o] for o in space if o in event)
    # Equiprobable outcomes: event is assumed to be a set-like collection.
    return Fraction(len(event & space), len(space))

def such_that(predicate, space): 
    """The outcomes in the sample space for which the predicate is true.
    If space is a ProbDist, the result is a ProbDist {outcome: frequency,...}
    built from the matching outcomes; otherwise it is a set {outcome,...}.
    In both cases only outcomes where predicate(outcome) is true are kept."""
    if not isinstance(space, ProbDist):
        return {outcome for outcome in space if predicate(outcome)}
    matching = {}
    for outcome in space:
        if predicate(outcome):
            matching[outcome] = space[outcome]
    return ProbDist(matching)

# class ProbDist(dict):
class ProbDist(Counter):
    "A Probability Distribution; an {outcome: probability} mapping where probabilities sum to 1."
    def __init__(self, mapping=(), **kwargs):
        # Load the raw weights, then rescale them so that they sum to 1.
        self.update(mapping, **kwargs)
        mass = sum(self.values())
        # An integer total is turned into a Fraction so the division below stays exact.
        normalizer = Fraction(mass, 1) if isinstance(mass, int) else mass
        for outcome, weight in list(self.items()):
            self[outcome] = weight / normalizer

    def __and__(self, predicate): # Call this method by writing `probdist & predicate`
        "A new ProbDist, restricted to the outcomes of this ProbDist for which the predicate is true."
        restricted = {}
        for outcome in self:
            if predicate(outcome):
                restricted[outcome] = self[outcome]
        return ProbDist(restricted)

    def __repr__(self):
        # One line per outcome; Fractions are shown both exactly and as a float.
        lines = []
        for outcome in self:
            prob = self[outcome]
            label = transcriptionReprHack(outcome)
            if isinstance(prob, Fraction):
                lines.append("{0}: {2}/{3} = {1}\n".format(label, float(prob), prob.numerator, prob.denominator))
            else:
                lines.append("{0}: {1}\n".format(label, float(prob)))
        return "".join(lines)

In [2]:
def transcriptionReprHack(k):
    """
    Display helper for ProbDist.__repr__: a plain tuple whose elements are all
    strings is rendered as a dotted string; anything else falls back to its own repr.
    """
    is_plain_tuple = type(k) is tuple
    if is_plain_tuple and all(type(el) is str for el in k):
        return tupleToDottedString(k)
    return k.__repr__()

def Uniform(outcomes):
    """Return a ProbDist built from giving every outcome the same weight (1)."""
    equal_weights = dict.fromkeys(outcomes, 1)
    return ProbDist(equal_weights)

def joint(A, B):
    """The joint distribution of two independent probability distributions. 
    Every pair (a, b) of an outcome of A and an outcome of B gets the weight
    A[a] * B[b]; the result is returned as a ProbDist."""
    pair_weights = {}
    for a in A:
        for b in B:
            pair_weights[(a, b)] = A[a] * B[b]
    return ProbDist(pair_weights)

from itertools import product, accumulate
# from functools import reduce
import operator

def prod(iterable):
    """Return the product of the items of iterable, multiplied left to right starting from 1."""
    result = 1
    for factor in iterable:
        result = operator.mul(result, factor)
    return result

# def union(iterable):
#     return reduce(set.union, iterable)

def joint2(iter_of_dists):
    """
    The joint distribution of any number of independent distributions.

    iter_of_dists: iterable of {outcome: probability} mappings

    Each outcome of the result is a tuple holding one outcome per input
    distribution; its weight is the product of the component probabilities.

    The input is materialized first because it is traversed more than once:
    with a one-shot iterator the second pass used to see nothing, and every
    joint outcome silently received weight 1.
    """
    dists = tuple(iter_of_dists)
    return ProbDist({outcome: prod(dist[o] for dist, o in zip(dists, outcome))
                     for outcome in product(*dists)})

# first = lambda seq: seq[0]
# second = lambda seq: seq[1]

def first(seq):
    """Return the element at index 0 of seq."""
    head = seq[0]
    return head

def second(seq):
    """Return the element at index 1 of seq."""
    item = seq[1]
    return item

# from random import choices

from bisect import bisect

def choices(population, weights=None, k=1):
    """Return a k sized list of population elements chosen with replacement.

    Local stand-in for random.choices without the cum_weights argument.
    If no relative weights are given, the selections are made with equal
    probability; otherwise each element is chosen with probability
    proportional to its weight.

    Raises ValueError when the number of weights differs from the population size.
    """
    size = len(population)
    if weights is None:
        # Uniform case: scale a random float to an index.
        return [population[int(random.random() * size)] for _ in range(k)]
    cum_weights = list(accumulate(weights))
    if len(cum_weights) != size:
        raise ValueError('The number of weights does not match the population')
    total = cum_weights[-1]
    hi = size - 1
    # Weighted case: locate a random point of [0, total) among the cumulative weights.
    return [population[bisect(cum_weights, random.random() * total, 0, hi)]
            for _ in range(k)]


def sampleFrom(dist, num_samples = None):
    """
    Draw from a distribution given either as an {outcome: probability} mapping
    (a ProbDist or dict) or implicitly as a thunk that returns one sample per call.

    Without num_samples a single sample is returned. With num_samples, a thunk
    yields a generator of that many samples, while a mapping yields a tuple of
    that many samples.

    Any other kind of dist falls through and returns None.
    """
    want_single = num_samples is None
    if callable(dist):
        if want_single:
            return dist()
        return (dist() for each in range(num_samples))
    if isinstance(dist, (ProbDist, dict)):
        pairs = tuple(dist.items())
        outcomes = tuple(outcome for outcome, _ in pairs)
        weights = tuple(weight for _, weight in pairs)
        if want_single:
            return choices(population=outcomes, weights=weights)[0]
        return tuple(choices(population=outcomes, weights=weights, k=num_samples))
    return None
from collections import Counter

def frequencies(samples):
    """Count how often each distinct sample occurs; returns a collections.Counter."""
    tally = Counter(samples)
    return tally

def makeSampler(dist):
    """
    Wrap dist in a thunk: the returned zero-argument function draws one sample
    from dist (via sampleFrom) each time it is called.
    """
    def sampler():
        return sampleFrom(dist)
    return sampler

In [2]:
def support(dist):
    """Return the set of outcomes of dist whose value is strictly greater than 0.0."""
    positive = set()
    for outcome in dist:
        if dist[outcome] > 0.0:
            positive.add(outcome)
    return positive

def trimToSupport(dist):
    """
    Drop zero-probability outcomes from a distribution.

    dist: either a single {outcome: probability} mapping, or a dict mapping
          conditioning events to ProbDists (a family of conditional distributions).

    Returns a ProbDist restricted to the support of dist, or, for a family, a
    dict with every member trimmed.

    Whether dist is a family is decided from its first value. This used to
    inspect a randomly drawn value, which consumed global random state for no
    reason and raised on an empty mapping.
    """
    representative = next(iter(dist.values()), None)
    if isinstance(representative, ProbDist):
        return {conditioning_event: trimToSupport(dist[conditioning_event])
                for conditioning_event in dist}
    return ProbDist({e: dist[e] for e in support(dist)})

In [3]:
from math import log2

def log(x):
    """
    Base-2 logarithm with the convention log(0) = 0.0, so that terms of the form
    p * log(p) vanish at p = 0 instead of raising.
    """
    return 0.0 if x == 0.0 else log2(x)

def h(event, space):
    """
    Surprisal of event under space, in bits: -log2 of P(event, space).
    Through the module's log convention, a zero-probability event yields 0.
    """
    return -1.0 * log(P(event, space))

def H(space, prior = None):
    """
    Given a ProbDist p(X), returns the Shannon entropy H(X) in bits.
    
    Given a dictionary representing a family of conditional 
    distributions p(Y|X) and a prior p(X), returns H(Y|X).

    space: an {outcome: probability} mapping, or (together with prior) a dict
           mapping each conditioning event to a ProbDist
    prior: optional distribution over the conditioning events; every key of
           space must lie in its support

    Probabilities are looked up directly by outcome, so outcomes need not be
    strings (tuple outcomes of joint distributions work). Whether space is a
    family is decided from its first value rather than a randomly drawn one,
    so no global random state is consumed.
    """
    is_family = (prior is not None
                 and isinstance(next(iter(space.values()), None), ProbDist))
    if is_family:
        support_of_prior = {k for k in prior.keys() if prior[k] > 0.0}
        assert all(e in support_of_prior for e in space)
        # H(Y|X) = sum over x of p(x) * H(Y | X = x)
        terms = tuple(prior[event] * H(space[event]) for event in sorted(space))
        return sum(terms)
    # H(X) = sum over x of p(x) * surprisal(x), with 0 * log(0) taken as 0 via `log`
    terms = tuple(space[event] * (-1.0 * log(space[event])) for event in sorted(space))
    return sum(terms)

def pDKL(event, p, q):
    """
    Pointwise log-ratio log2(p(event) / q(event)) used by DKL.

    When q assigns probability 0 to an event that p gives positive probability,
    the ratio is unbounded and +inf is returned. (Under the module's log(0) = 0
    convention this case used to give the finite value log2(p(event)), which
    made DKL report a finite divergence.) All other cases are unchanged.
    """
    p_event = P(event, p)
    q_event = P(event, q)
    if q_event == 0 and p_event > 0:
        return float('inf')
    return log(p_event) - log(q_event)

def DKL(p, q):
    """
    Given two spaces p, q over the same outcomes, returns the Kullback-Leibler
    divergence from P to Q: the sum over the events of p of P(event, p) times
    the pointwise divergence pDKL(event, p, q).
    """
    assert p.keys() == q.keys(), "P and Q must share a common event space."

    weighted = []
    for event in p:
        weighted.append(prod((P(event, p), pDKL(event, p, q))))
    return sum(tuple(weighted))

In [4]:
def condDistsAsProbDists(condDist):
    """Return a new dict in which every member of the family condDist is wrapped in a ProbDist."""
    converted = {}
    for condition in condDist:
        converted[condition] = ProbDist(condDist[condition])
    return converted

def condProbDistAsDicts(condDistFamily):
    """Return a new dict in which every member of the family condDistFamily is copied into a plain dict."""
    converted = {}
    for condition in condDistFamily:
        converted[condition] = dict(condDistFamily[condition])
    return converted

In [6]:
from math import isclose

def distsAreClose(dA, dB, keysToCompare = None):
    """
    Compare two distributions key by key with math.isclose (default tolerances).

    keysToCompare defaults to the keys of dA. Returns a dict mapping each
    compared key to True/False.
    """
    keys = dA.keys() if keysToCompare is None else keysToCompare
    verdicts = {}
    for k in keys:
        verdicts[k] = isclose(dA[k], dB[k])
    return verdicts

def condDistsAreClose(dA, dB):
    """
    Apply distsAreClose to each pair of conditional distributions dA[k], dB[k],
    for every key k of dA; returns a dict of the per-condition results.
    """
    verdicts = {}
    for condition in dA.keys():
        verdicts[condition] = distsAreClose(dA[condition], dB[condition])
    return verdicts

# Template for processing w/ progress report

In [26]:
# Module-level dict that `foo` (defined below) fills in as a side effect.
mydict = dict()

In [27]:
def bar(d):
    """Toy per-item computation for the template: returns d + 2."""
    offset = 2
    return d + offset

def foo(d):
    """Record bar(d) under key d in the module-level `mydict` (side effect only; returns None)."""
    entry = {d: bar(d)}
    mydict.update(entry)

In [28]:
# Show mydict before and after one call to foo: the call adds the entry 4 -> bar(4).
mydict
foo(4)
mydict

{}

{4: 6}

In [29]:
from time import localtime, strftime
def stamp():
    """Return the current local time formatted as 'HH:MM:SS'."""
    now = localtime()
    return strftime('%H:%M:%S', now)

def stampedNote(note):
    """Print note followed by ' @ ' and the current time stamp."""
    line = '{0} @ {1}'.format(note, stamp())
    print(line)

def startNote(note):
    """Print a time-stamped 'Start <note>' line."""
    message = 'Start ' + note
    stampedNote(message)
    
def endNote(note):
    """Print a time-stamped 'End <note>' line."""
    message = 'End ' + note
    stampedNote(message)
    
def processDataWProgressUpdates(f, data):
    """
    Apply f to every item of data in order, printing time-stamped progress lines.

    f:    function called once per item (its return value is ignored)
    data: a sized collection (len(data) is needed to place the checkpoints)

    A progress line is printed just before processing the item at each
    checkpoint index, i.e. the positions corresponding to fixed percentages of
    the data. 'Start' and 'Finish' lines bracket the run.
    """
    print('Start @ {0}'.format(stamp()))
    total = len(data)
    percentages = (1, 2, 3, 5, 10, 20, 30, 40, 50, 60, 70, 80, 90, 95, 96, 97, 98, 99, 100)
    checkpoints = {round(pct / 100.0 * total) for pct in percentages}
    for index, item in enumerate(data):
        if index in checkpoints:
            print('{0} | {0}/{1} = {2} | {3} | {4}'.format(index, total, index / total, item, stamp()))
        f(item)
    print('Finish @ {0}'.format(stamp()))
        
def constructDictWProgressUpdates(f, data, a_dict):
    """
    Fill a_dict in place with d -> f(d) for every d in data, reporting progress
    through processDataWProgressUpdates. Returns None.
    """
    def store_result(d):
        entry = {d: f(d)}
        a_dict.update(entry)
    processDataWProgressUpdates(store_result, data)

In [3]:
def parallelDictDefinition(f, data, jobs, backend="multiprocessing", verbosity=5):
    """
    Build the dictionary {d: f(d) for d in data}, evaluating f through `Parallel`.

    f:         function applied to each element of data
    data:      iterable of items; each one becomes a key of the result
    jobs:      forwarded to Parallel as n_jobs
    backend:   forwarded to Parallel as backend
    verbosity: forwarded to Parallel as verbose

    NOTE(review): `Parallel` and `delayed` are not imported anywhere in the
    visible notebook -- presumably joblib's; confirm and add the import.
    """
    def g(d):
        # Pair each input with its result so the output can be fed straight to dict().
        return (d, f(d))
    return dict( Parallel(n_jobs=jobs, backend=backend, verbose=verbosity)(delayed(g)(d) for d in data) )

# Saving/Loading NumPy arrays

In [1]:
import numpy as np

In [2]:
%ls *.npy

'Hammond-aligned_destressed_pseudocount0.01 p3Xhat012X012_np.npy'
'Hammond-aligned_destressed_pseudocount0.01 p3Xhat1_np.npy'
'Hammond-aligned_destressed_pseudocount0.01 p3Xhat1X012_np.npy'
'Hammond-aligned_destressed_pseudocount0.01 p3Xhat1X1_np.npy'


In [3]:
?np.load

In [4]:
# One of the .npy files listed above, used for the load/save round trip below.
my_fn = 'Hammond-aligned_destressed_pseudocount0.01 p3Xhat1_np.npy'

In [5]:
# Load the array from disk with pickle support explicitly disabled.
a_numpy_matrix = np.load(my_fn, allow_pickle=False)

In [6]:
# Save the array, again without pickle. NOTE: this writes to the same path it was loaded from.
np.save(my_fn, a_numpy_matrix, allow_pickle=False)

# NumPy examples

Filling a 3x4 matrix by calling a function $bar(i,j)$ of the indices:

In [34]:
import numpy as np

In [40]:
# NOTE: this rebinds the name `bar`, which the "Template for processing" section
# above defined as a one-argument function called by `foo`.
# np.fromfunction calls the function once with whole index arrays rather than
# once per cell, so float(i+j) fails with the TypeError shown below.
bar = lambda i, j: float(i+j)
np.fromfunction(bar, (3,4), dtype=float)

TypeError: only size-1 arrays can be converted to Python scalars

In [39]:
# Wrapping bar in np.vectorize applies it element by element, which gives the intended 3x4 matrix.
np.fromfunction(np.vectorize(bar), (3,4), dtype=float)

array([[0., 1., 2., 3.],
       [1., 2., 3., 4.],
       [2., 3., 4., 5.]])