In [1]:
# this gets rid of uuid problem
%pip uninstall -y mlflow

Note: you may need to restart the kernel to use updated packages.


In [2]:
%pip install altair pyarrow pandas==1.0.3 matplotlib>=2.2.3 numba>=0.38.1 scipy>=1.1.0

You should consider upgrading via the 'pip install --upgrade pip' command.[0m
Note: you may need to restart the kernel to use updated packages.


In [3]:
from scipy import stats
import numpy as np
import pandas as pd
import altair as alt
import heapq
import time

In [4]:
def simulate(event_generators, initial_time=0):    
    def setup_e(e, i):
        offset, result = next(e)
        return ((offset + i), result, e)
    
    pq = [setup_e(event, initial_time)
          for event in event_generators]
    heapq.heapify(pq)
    
    while True:
        timestamp, result, event = pq[0]
        offset, next_result = event.send(timestamp)
        heapq.heappushpop(pq, (timestamp + offset, next_result, event))
        yield (timestamp, result)

In [5]:
def makeprng(func):
    import time
    def call_with_prng(*args, prng=None, seed=None, **kwargs):
        if prng is None:
            if seed is None:
                seed = int(1000*time.time()) & ((1 << 32) - 1)
            prng = np.random.default_rng(seed=seed)
        return func(*args, prng=prng, seed=seed, **kwargs)
    return call_with_prng

In [6]:
def makedist(dist_cls, seed=None, prng=None, **kwargs):
    d = dist_cls(**kwargs)
    d.random_state = (seed and seed) or prng.integers((1 << 32) - 1)
    return d

### A stream of events of some particular kind coming from a piece of factory equipment

In [7]:
@makeprng
def equipment_event_stream(eqid, event, rate, prng=None, seed=None):
    # exponential inter-arrival distribution yields poisson event distribution
    iadist = makedist(stats.expon, prng=prng, scale=(1.0 / float(rate)))
    while True:
        yield (int(iadist.rvs()), (eqid, event))

In [8]:
demo = equipment_event_stream("donut-stamper", "force", 1.0 / 10.0)
[next(demo) for x in range(5)]

[(4, ('donut-stamper', 'force')),
 (28, ('donut-stamper', 'force')),
 (13, ('donut-stamper', 'force')),
 (14, ('donut-stamper', 'force')),
 (14, ('donut-stamper', 'force'))]

### generate all events from a piece of equipment

In [9]:
@makeprng
def equipment_stream(eqid, events, rates=None, functions=None, prng=None, seed=None):
    n = len(events)
    # default once / minute
    rates = rates or [1.0 / 60.0 for x in range(n)]
    assert len(rates) == n
    # default is id(event-time)
    functions = functions or [lambda t: None for x in range(n)]
    assert len(functions) == n
    fmap = {e:f for e,f in zip(events,functions)}
    estreams = [equipment_event_stream(eqid, e, r, prng=prng, seed=seed) for e,r in zip(events, rates)]
    esim = simulate(estreams, initial_time=0)
    while True:
        (t, (eqi, e)) = next(esim)
        sv = fmap[e](t)
        yt = (eqi, e, sv) if sv else (eqi, e)
        yield (t, yt)

In [10]:
demo = equipment_stream('donut-stamper', ['force', 'enthusiasm'], functions=[lambda t: 5, lambda t: None])
[next(demo) for x in range(5)]

[(153, ('donut-stamper', 'enthusiasm')),
 (176, ('donut-stamper', 'enthusiasm')),
 (224, ('donut-stamper', 'force', 5)),
 (253, ('donut-stamper', 'force', 5)),
 (287, ('donut-stamper', 'force', 5))]

### some event state output functions for temperature and pressure

In [11]:
# Frobulators operate at just over the boiling point of water (centigrade)
gdt = stats.norm(loc=105, scale=1)
def tempfunc(t):
    return gdt.rvs(random_state = t)

# Also they operate at 5 atmospheres (kilopascals)
gdp = stats.norm(loc=550, scale=10)
def presfunc(t):
    return gdp.rvs(random_state = t)

### simulate some frobulators on the factory floor

In [12]:
frob1stream = equipment_stream("frob1", ['temperature', 'pressure'],[0.1,0.05],[tempfunc,presfunc], seed=77)
frob2stream = equipment_stream("frob2", ['temperature', 'pressure'],[0.1,0.05],[tempfunc,presfunc], seed=88)
frob3stream = equipment_stream("frob3", ['temperature', 'pressure'],[0.1,0.05],[tempfunc,presfunc], seed=99)

frobsim = simulate([frob1stream, frob2stream, frob3stream], initial_time=0)

simdata = [next(frobsim) for xx in range(25)]
simdata

[(0, ('frob2', 'pressure', 567.6405234596766)),
 (2, ('frob1', 'pressure', 545.8324215259453)),
 (9, ('frob3', 'temperature', 105.00110855471222)),
 (19, ('frob1', 'pressure', 552.7626589002132)),
 (23, ('frob2', 'temperature', 105.66698805635347)),
 (24, ('frob3', 'pressure', 546.8767151845645)),
 (44, ('frob1', 'temperature', 105.22827308966609)),
 (57, ('frob3', 'pressure', 546.8114649489187)),
 (68, ('frob2', 'pressure', 550.2637477284957)),
 (72, ('frob1', 'temperature', 105.44633237787005)),
 (92, ('frob3', 'pressure', 531.1026328991214)),
 (118, ('frob2', 'pressure', 534.3964789131635)),
 (131, ('frob1', 'temperature', 103.33071647860528)),
 (142, ('frob3', 'temperature', 103.43964789131634)),
 (170, ('frob2', 'temperature', 105.51947584061416)),
 (196, ('frob1', 'pressure', 541.6633517686731)),
 (206, ('frob3', 'temperature', 106.10032293923169)),
 (226, ('frob2', 'temperature', 103.96235682449742)),
 (261, ('frob1', 'temperature', 104.16633517686732)),
 (291, ('frob2', 'pressu

In [13]:
import pyarrow as pa
import pyarrow.parquet as pq

In [14]:
pa.array([1,2,3],type=pa.timestamp('s', tz='+00:00'))

<pyarrow.lib.TimestampArray object at 0x7f2a73557ee8>
[
  1970-01-01 00:00:01,
  1970-01-01 00:00:02,
  1970-01-01 00:00:03
]

In [15]:
# get a 'raw' parquet table, that is one with a default schema,
# from some common data formats
def data_to_rawtab(data):
    if isinstance(data, pa.Table):
        return data
    if isinstance(data, pd.DataFrame):
        return pa.Table.from_pandas(data)
    if isinstance(data, np.ndarray):
        return pa.Table.from_pandas(pd.DataFrame(data))
    return pa.Table.from_pandas(pd.DataFrame(data))

# get a parquet table from some tabular data object (list-of-lists, pandas, etc)
# optionally, impose a specific schema on it, otherwise it will be inferred
def data_to_tab(data, schema=None):
    # getting a table with default schema and then reassembling its columns with a new schema
    # is the only way I found to impose a schema cleanly
    tab = data_to_rawtab(data)
    schema = schema or tab.schema
    return pa.Table.from_arrays(tab.columns, schema=schema)

def append_to_parquet(data, filepath=None, writer=None, schema=None):
    table = data_to_tab(data, schema=schema)
    schema = table.schema
    close = False
    if writer is None:
        writer = pq.ParquetWriter(filepath, schema, flavor='spark')
        close = True
    writer.write_table(table=table)
    if close:
        writer.close()

In [16]:
frobschema = pa.schema([
    ('ts',pa.timestamp('s', tz='+00:00')),
    ('equip', pa.string()),
    ('event', pa.string()),
    ('data', pa.float64())
])
frobschema

ts: timestamp[s, tz=+00:00]
equip: string
event: string
data: double

In [17]:
@makeprng
def generate_data(nrecs = 10, t0=None, seed=None, prng=None):
    frob1stream = equipment_stream("frob1", ['temperature', 'pressure'],[0.1,0.05],[tempfunc,presfunc], seed=prng.integers(100))
    frob2stream = equipment_stream("frob2", ['temperature', 'pressure'],[0.1,0.05],[tempfunc,presfunc], seed=prng.integers(100))
    frob3stream = equipment_stream("frob3", ['temperature', 'pressure'],[0.1,0.05],[tempfunc,presfunc], seed=prng.integers(100))

    t0 = t0 or int(time.time())
    frobsim = simulate([frob1stream, frob2stream, frob3stream], initial_time=int(t0))
    simdata = [next(frobsim) for xx in range(nrecs)]
    return [[ts] + list(r) for ts, r in simdata]

In [18]:
raw = generate_data(nrecs = 100)
append_to_parquet(raw, filepath="frob", schema=frobschema)

In [19]:
df = pd.read_parquet("frob")
df

Unnamed: 0,ts,equip,event,data
0,2020-07-31 23:53:24,frob1,temperature,106.764052
1,2020-07-31 23:53:24,frob1,temperature,106.764052
2,2020-07-31 23:53:29,frob3,pressure,554.412275
3,2020-07-31 23:53:31,frob1,temperature,106.690526
4,2020-07-31 23:53:35,frob2,pressure,567.494547
...,...,...,...,...
95,2020-08-01 00:41:35,frob1,temperature,106.367858
96,2020-08-01 00:42:42,frob2,temperature,105.405948
97,2020-08-01 00:44:11,frob3,temperature,104.703773
98,2020-08-01 00:44:45,frob1,temperature,105.709691


In [20]:
writer = pq.ParquetWriter("frob2", frobschema, flavor='spark')
raw = generate_data(nrecs = 100)
append_to_parquet(raw, writer=writer, schema=frobschema)
raw = generate_data(nrecs = 100)
append_to_parquet(raw, writer=writer, schema=frobschema)
writer.close()

In [21]:
df = pd.read_parquet("frob2")
df

Unnamed: 0,ts,equip,event,data
0,2020-07-31 23:53:26,frob2,temperature,104.583242
1,2020-07-31 23:53:27,frob1,temperature,106.788628
2,2020-07-31 23:53:30,frob2,temperature,105.050562
3,2020-07-31 23:53:33,frob1,temperature,104.688216
4,2020-07-31 23:53:34,frob3,pressure,563.315865
...,...,...,...,...
195,2020-08-01 00:31:45,frob3,temperature,106.090775
196,2020-08-01 00:32:47,frob2,pressure,554.502232
197,2020-08-01 00:34:27,frob3,temperature,106.124590
198,2020-08-01 00:34:28,frob1,temperature,105.528954


In [23]:
tab = data_to_tab(generate_data(nrecs = 100), schema=frobschema)
pq.write_to_dataset(tab, "frob3", partition_cols=['event'], flavor='spark')
tab = data_to_tab(generate_data(nrecs = 100), schema=frobschema)
pq.write_to_dataset(tab, "frob3", partition_cols=['event'], flavor='spark')