# Project Management and Oversight for an Expanding Process

A technical implementation, with a pretend project, set out as a choose-your-own-adventure...

This Notebook simulates the execution of a particular process many, many times. It demonstrates how the ThingStore API is used to save, track, and re-use process components; it comes with an accompanying set of functionality that is used behind the scenes. You do not **need** to go digging into that to get the high-level takeaway, though it's a great demonstration of how to use this in a technical process.

## Environment Setup

This is implemented in Python 3.10, and should be runnable within 4 GB of working memory.

### Ibis - DataFrame solution

If you're an analytics professional you should check out the ibis framework. It's the DataFrame solution used here; I chose (this time) to back it with duckdb, instead of Polars or PySpark, but anything developed with ibis can swap the backend and 'just go'.

In [None]:
pip install 'ibis-framework[duckdb]' -q

### The Thing Store - Storage and Management Solution

If you like nice, neat, orderly Things, and you've not heard of the Thing Store, you should check it out. I'm going to use it as the data layer that implements the ThingStore (TS) API, and I'll be using it to store, track, and represent my work.

In [None]:
pip install -e /root/ThingStore/thethingstore -q

### plotnine - Pretty pictures!

Have you ever used ggplot2? Have you ever wished that you could use it in Python? You can!

In [None]:
pip install plotnine -q

In [None]:
pip install -e ..[dev] -q

In [None]:
# pip install thethingstore -q

### pyvis - Pictures of DAGs

Do you like to visualize DAGs? So does PyVis!

In [None]:
pip install pyvis -q

In [None]:
raise Exception('Restart your kernel')

## Project Highlights

I've got a pretend project in which I'm going to be directed, as part of my duties, to go from data collection to modeling to model publication.

So, I'm going to 'do the Thing' to mimic conducting that project. I'm going to pretend to pull data, build a model, etc...

I'm going to keep track of what I do, as I do it.

I'm not just going to do it once, I'm going to do it *a bunch of times*, because my particular problem is a recurring business problem (something akin to 'insurance rating' or 'resource prioritization').

This is neat, because it:

1. Creates a lot of functional examples, and
2. Shows how to conduct analysis on a lot of functional examples, and
3. Builds a corpus to do something neat, next time!

#### What Things are Useful?

There is a distinct set of Things which can be used to describe my project, the notional pieces of which can be seen below.

* Draw Data
* Model Ready
* Model
* Aggregate
* Review
* Publish

Each can be represented as a ThingStore compliant data structure, allowing it to be used simply; this will be demonstrated later when we start doing analysis.

### Simulation

Any one of these Things can be viewed as a probabilistic state within my project by providing next step probabilities. As an example, and **purely notional**, in 65% of our initial data draws we don't discover any new issues, and the SQL we execute 'just works'. 35% of the time we need to go *develop* on our data draw; perhaps there was an issue with the SQL, perhaps the underlying data changed in some manner, perhaps it is 'just because'.

Here I've provided initial notional probabilities which could be swapped with measured frequencies in real processes.

In [None]:
# From the source code for this example; this dictionary has a lot of information and is reused
#   across the source.
from stepfunctions import functions

for f in functions:
    print(f'''Function: {f}
    Description: {functions[f]["description"]}
    Potential Next Nodes (p): {functions[f]["next_node"]}''')
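The real `functions` dictionary lives in the accompanying source. As a minimal sketch of the idea, assuming only the `description` and `next_node` keys printed above, the block below builds a tiny stand-in and samples the next node from it. The node name `Develop Data Draw`, the descriptions, and the choice of `Model Ready` as the successful next node are hypothetical; the 65% / 35% split is the notional one from the text. It uses the standard library `random` module instead of numpy so that it runs anywhere.

```python
import random

# Hypothetical stand-in for the `functions` dictionary; only the
# 'description' and 'next_node' keys mirror the cell above.
functions_sketch = {
    "Draw Data": {
        "description": "Pull the data for a project.",
        "next_node": {"Model Ready": 0.65, "Develop Data Draw": 0.35},
    },
    "Develop Data Draw": {
        "description": "Fix the SQL or adapt to changed data.",
        "next_node": {"Draw Data": 1.0},
    },
}


def pick_next_node(current, rng):
    """Sample the next node using the current node's next-step probabilities."""
    next_node = functions_sketch[current]["next_node"]
    names = list(next_node)
    weights = list(next_node.values())
    return rng.choices(names, weights=weights, k=1)[0]


# Draw many times from one state and check the observed share.
rng_sketch = random.Random(1234)
draws = [pick_next_node("Draw Data", rng_sketch) for _i in range(10_000)]
share_ok = draws.count("Model Ready") / len(draws)
print(f"Share of draws that 'just work': {share_ok:.2f}")
```

With measured frequencies from a real process, only the numbers inside `next_node` would need to change.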

### Required Tools

These are the tools that will be used throughout the work; they're collected and described here in some small detail.

#### Imports

In [None]:
import ibis  # DataFrame solution
import tempfile  # Temporary storage directory to back ThingStore for transient experiments
from ibis import selectors as s  # This allows us to 'say' things in our dataframe manipulation which select one, or many, or no columns dynamically via patterns
from ibis import _  # Tool from ibis to allow saying 'the darn DataFrame that flows into this.'  See below for examples and think tidyverse.
from numpy.random import default_rng  # You can't simulate stuff without randomness (pseudo or otherwise.)
from pyarrow.fs import LocalFileSystem  # This is the filesystem which we'll use to back **our** data layer.
# Note that S3 and HDFS are among the implemented filesystems and this can (theoretically) use any fs spec compliant filesystem.
from stepfunctions import (
    random_project_generator,
    single_step,
    batch_step,
    get_step_files,
    step,
    convenience_table,
    vis_stepwise_thing_dist
)  # All the tools developed to back this Notebook
# These were all extracted to make this Notebook **not** have 2000 cells.
from thethingstore.thing_store_pa_fs import FileSystemThingStore as FSTS  # Heeeeere's Johnny. This is our data layer type.

#### Randomness

Pseudorandomness and experimentation go hand-in-hand. Simulation is a powerful tool for analysis and requires some sort of hat to pull numbers from that *you* do not make up. Let the computer make those up, for you!

The outcome of this is a random number generator from which we can draw a number of different distributional samples of varying characteristics.

In [None]:
# Seed my pseudorandomness
rng = default_rng(1234124312)

#### Storage (Happy little data layer)

Where are we going to store all of our stuff? The default (and the example in this Notebook) uses a ThingStore backed by a local filesystem, but it could just as easily be backed by S3, HDFS, or potentially any other fsspec-compliant filesystem.

Here, it's simply a temporary local storage directory. After running through the Notebook, and before purging the directory with fire, you may use any of the demonstrated tooling and methods to explore the stored data and see how it can be reused.

In [None]:
!rm -rf stupiddir

In [None]:
# Obviously notional and temporary directory

t = 'stupiddir'
data_layer = FSTS(  # <-- This is a FileSystemThingStore, in Python.
    metadata_filesystem=LocalFileSystem(),
    managed_location=f'{t}/managed_data'
)


### Time to get to work!

Now we're going to use some simulation tools that were developed for this Notebook to start a number of projects.

We're not actually **doing any work**, we're simply saying we are. Here we say that we 'draw data for project x' n times.

#### Project generation

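`random_project_generator` starts `n_projects` pretend projects in the data layer and returns the identifiers of the files logged as their starting points. The seed keeps the run repeatable.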

In [None]:
%%capture
project_start_points = random_project_generator(
    data_layer=data_layer,  # Any TS compliant data layer
    n_projects=10,  # The number of projects to generate
    seed=4236321  # Fix my pseudorandomness
)
# Ignore or capture the output below, there's a print statement embedded in the code to allow tracking status
#   of a large number of runs in serial which you may turn off / tune as necessary for your case.

#### Summarize the initiated projects

We're going to pretend that every step is a 'sprint' (a time period of roughly two working weeks); within every sprint we're going to collect the descriptive identifiers for the work done in a single file. We'll reuse that later!

To see these files collected you may use this boilerplate as an example. All of these summarization files are going to be given a notional project of -999 to allow distinguishing them.

```python
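# start_point_fileid is the summary file id logged in the next cell.
summary_ids = data_layer.get_dataset(
    start_point_fileid
).to_table().column('FILE_IDS').to_pandas()
# One row of parameters per file listed in the summary.
t = ibis.memtable(
    [data_layer.get_parameters(file_id) for file_id in summary_ids]
)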
```

In [None]:
# Summarize that activity
start_point_fileid = data_layer.log(
    dataset=ibis.memtable({'FILE_IDS': project_start_points}).to_pandas(),
    metadata={'PROJECT': -999, 'THINGTYPE': 'initial_projects'},
)
print(f"""Initial Projects Logged. Summary File: {start_point_fileid}""")
    

#### Simulate the next action

This is going to be the template we reuse for conducting every step within our experimentation.

We're going to:

1. Collect the metadata. (Replacing the metadata dataset with a logged file, and partitioning datasets automatically or manually, makes metadata retrieval far more feasible for large-scale datasets.)
2. Grab the metadata for the most recent step (see, reusing the file!),
3. Randomly select, for each row, the next action based on the specific probabilities,
4. Execute that next step to advance each project.
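The four steps above can be sketched without a data layer at all. This is not how `batch_step` or `single_step` are implemented; it is a toy version, assuming the metadata is a plain list of records and that each `THINGTYPE` has a table of next-step probabilities. The `STEP` field, the file id format, and the transition numbers are all hypothetical.

```python
import random
from collections import Counter

# Hypothetical transition table: THINGTYPE -> {next THINGTYPE: probability}.
transitions = {
    "Draw Data": {"Model Ready": 0.65, "Draw Data": 0.35},
    "Model Ready": {"Model": 1.0},
    "Model": {"Model": 1.0},
}

# Step 1: the 'metadata' is a list of records, one per logged file.
metadata = [
    {"FILE_ID": f"file-{i}", "PROJECT": i, "THINGTYPE": "Draw Data", "STEP": 0}
    for i in range(10)
]


def advance(metadata, rng):
    """Advance every project by one step and return the new records."""
    # Step 2: keep only the records from the most recent step.
    latest = max(row["STEP"] for row in metadata)
    current = [row for row in metadata if row["STEP"] == latest]
    new_rows = []
    for row in current:
        # Step 3: choose the next action from this row's probabilities.
        options = transitions[row["THINGTYPE"]]
        chosen = rng.choices(list(options), weights=list(options.values()), k=1)[0]
        # Step 4: 'execute' the step by recording a new file for the project.
        new_rows.append({
            "FILE_ID": f"{row['FILE_ID']}-s{latest + 1}",
            "PROJECT": row["PROJECT"],
            "THINGTYPE": chosen,
            "STEP": latest + 1,
        })
    return new_rows


rng_sketch = random.Random(4236321)
metadata += advance(metadata, rng_sketch)
# Count THINGTYPEs at the new step, like the group-by-and-count tables below.
counts = Counter(r["THINGTYPE"] for r in metadata if r["STEP"] == 1)
print(counts)
```

Calling `advance` again on the grown `metadata` list moves every project forward one more step, which is the loop the rest of this section runs against the real data layer.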

In [None]:
# Collect the metadata table (todo - prefiltering)
t = ibis.memtable(data_layer.browse())

In [None]:
# Show the on-disk layout of the managed data directory.
from thethingstore.thing_store_pa_fs import pyarrow_tree
pyarrow_tree(
    path='stupiddir/managed_data',
    filesystem=LocalFileSystem(),
    max_depth=3
)

In [None]:
# Simulate the next step action (this simply returns 'What will I do next, here'.)
current_step_files = data_layer.load(get_step_files(
    data_layer,
    latest=True
)).FILE_IDS.to_list()

t_1 = ibis.memtable(
    data_layer.get_metadata(_) for _ in current_step_files
).group_by('THINGTYPE').count().execute()

next_step = batch_step(
    data_layer=data_layer,
    current_state=current_step_files
)

In [None]:
# Execute the randomly selected steps.
steps = next_step.execute().apply(lambda x: single_step(data_layer, x.FILE_ID, x.next_step, t, rng), axis=1)

In [None]:
# Summarize that activity
import pandas as pd
project_state_fileid = data_layer.log(
    dataset=pd.DataFrame({'FILE_IDS': steps}),
    metadata={'PROJECT': -999, 'THINGTYPE': 'project_steps'},
)
print(f"""Project State Logged. Summary File: {project_state_fileid}""")
    

In [None]:
t_2 = ibis.memtable(
    data_layer.get_metadata(_) for _ in data_layer.load(project_state_fileid).FILE_IDS.to_list()
).group_by('THINGTYPE').count().execute()

In [None]:
t_1

In [None]:
t_2

In [None]:
# Collect the metadata table (todo - prefiltering)
t = ibis.memtable(data_layer.browse())
# Simulate the next step action (this simply returns 'What will I do next, here'.)
current_step_files = data_layer.load(get_step_files(
    data_layer,
    latest=True
)).FILE_IDS.to_list()

next_step = batch_step(
    data_layer=data_layer,
    current_state=current_step_files
)

# Execute the randomly selected steps.
steps = next_step.execute().apply(lambda x: single_step(data_layer, x.FILE_ID, x.next_step, t, rng), axis=1)

# Summarize that activity
import pandas as pd
project_state_fileid = data_layer.log(
    dataset=pd.DataFrame({'FILE_IDS': steps}),
    metadata={'PROJECT': -999, 'THINGTYPE': 'project_steps'},
)
print(f"""Project State Logged. Summary File: {project_state_fileid}""")

t_3 = ibis.memtable(
    data_layer.get_metadata(_) for _ in data_layer.load(project_state_fileid).FILE_IDS.to_list()
).group_by('THINGTYPE').count().execute()

In [None]:
t_1

In [None]:
t_2

In [None]:
t_3

Quite handily we've created a convenience function for this.

In [None]:
from stepfunctions import step
project_state_fileid = step(data_layer)
project_state_fileid

In [None]:
t_4 = ibis.memtable(
    data_layer.get_metadata(_) for _ in data_layer.load(project_state_fileid).FILE_IDS.to_list()
).group_by('THINGTYPE').count().execute()

In [None]:
t_4

#### Convenience Table

This is a table that uses the experimental project metadata (`_.PROJECT == -999`) to label all the folds of data.

In [None]:
convenience_table(data_layer, ibis.memtable(data_layer.browse()))

Now we're just going to run this for a number of time steps.

This piece isn't parallelized so it's not terribly quick, but it's a functional POC. Feel free to contribute!

#### Visualize ThingType distribution by 'step'

In [None]:
from IPython import display

In [None]:
for i in range(30):
    print(i)
    step(data_layer)
    display.display(vis_stepwise_thing_dist(data_layer))

## Project Analysis

This visualization shows the empirical CDF of ThingTypes at each simulated step.

In [None]:
vis_stepwise_thing_dist(data_layer)
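One way to read 'empirical CDF' here, assuming the stages are ordered as in the list of Things earlier in this Notebook, is the cumulative share of projects at or before each stage. The sketch below computes that for a single snapshot; `vis_stepwise_thing_dist` may compute it differently, and the snapshot values are made up purely for illustration.

```python
from collections import Counter

# Stage order taken from the list of Things earlier in this Notebook.
STAGES = ["Draw Data", "Model Ready", "Model", "Aggregate", "Review", "Publish"]


def stage_ecdf(thingtypes):
    """Return the cumulative share of projects at or before each stage."""
    counts = Counter(thingtypes)
    total = sum(counts[s] for s in STAGES)
    running, ecdf = 0, {}
    for stage in STAGES:
        running += counts[stage]
        ecdf[stage] = running / total
    return ecdf


# Made-up snapshot of ten projects at one step, purely for illustration.
snapshot = ["Draw Data"] * 3 + ["Model Ready"] * 2 + ["Model"] * 4 + ["Review"]
ecdf = stage_ecdf(snapshot)
for stage, share in ecdf.items():
    print(f"{stage:<12} {share:.1f}")
```

Computing one such curve per step and overlaying them shows the projects drifting toward the later stages over time.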

## Data Discovery

All those Things that we logged out there can be looked at; the tools below demonstrate how to do just that and they detail, by Thing, the components that are attached!

In [None]:
data_layer.browse()

This count shows, by project, how many files are in this data pool. (Uncomment the `.plot` call for a bar chart.)

In [None]:
data_layer.browse().groupby('PROJECT').FILE_ID.count()#.plot(kind='bar')

Here we're going to lift up each of the files used in a specific project; in each one we're going to identify the files that were used as input to these specific pieces of work.

In [None]:
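def get_project_data(data_layer, project: int) -> pd.DataFrame:
    """Return FILE_ID, THINGTYPE, and upstream file ids for one project."""
    def only_what_i_want(x):
        # Parameters whose key contains 'fileid' point at upstream files.
        return [v for k, v in data_layer.get_parameters(x).items() if 'fileid' in k]
    # browse() returns a pandas DataFrame, so this function does too.
    t_metadata = data_layer.browse().query('PROJECT == @project')[['FILE_ID', 'THINGTYPE']]
    t_metadata = t_metadata.assign(
        upstream = t_metadata.FILE_ID.apply(only_what_i_want)
    )

    return t_metadata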

project_data = get_project_data(data_layer, 72854088)
project_data
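The upstream lookup above boils down to a filter on parameter names. As a minimal sketch, assuming each file's parameters are a plain dictionary, the block below applies the same `'fileid' in k` test and turns the result into (upstream, downstream) pairs, which is the shape the graph edges take later on. The file ids and parameter names are hypothetical; the real parameters come from `data_layer.get_parameters`.

```python
def upstream_file_ids(parameters):
    """Keep parameter values whose key mentions 'fileid'."""
    return [v for k, v in parameters.items() if "fileid" in k]


# Hypothetical parameters per FILE_ID; the real ones come from
# data_layer.get_parameters.
parameters_by_file = {
    "a1": {"seed": 1},
    "b2": {"draw_fileid": "a1", "rows": 100},
    "c3": {"model_ready_fileid": "b2", "alpha": 0.1},
}

# One (upstream, downstream) pair per referenced file.
edges = [
    (upstream, file_id)
    for file_id, params in parameters_by_file.items()
    for upstream in upstream_file_ids(params)
]
print(edges)
```

A file with no `fileid` parameters, like `a1` here, contributes no edges and so shows up as a root of the graph.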

Now, we're going to make some pretties!

In [None]:
from pyvis.network import Network
from json import loads, dumps
from stepfunctions import functions

def build_alt_text(row_o_data):
    """Each row has elements required to make a nice string."""
    header=''
    fileid=row_o_data['FILE_ID']
    # metadata='JURISDICTION: ' + row_o_data['JURISDICTION'] + ' :::: THINGTYPE: ' + row_o_data['THINGTYPE']
    parameters=row_o_data['upstream_params']
    output_str = f"{header}\nFILE: {fileid}\n\n\tParameters:\n"
    for k, v in parameters.items():
        output_str += f'.     {k}: {v}\n'
    return output_str
    # return f"{header}\nFILE: {fileid}\n\n\tMetadata:\n{metadata}\n\n\tParameters:\n{parameters}\n\n"

# I have my data
def plot_project(data_layer, project):
    # Fetch all the data.
    project_data = ibis.memtable(get_project_data(data_layer, project))
    # Make the graph
    g = Network(
        notebook=True,
        directed=True,
        # neighborhood_highlight=True
        # select_menu = True,
        # layout='hierarchical',  # Couldn't get this to work right in studio
        bgcolor="#222222",
        # font_color="white",
        heading=str(project)
    )
    # Clean and clarify node information
    node_properties = ibis.memtable(
        [
            {**v['graph_properties'], 'THINGTYPE': k}
            for k, v in functions.items()
        ]
    )
    node_df = project_data.join(
        node_properties, 'THINGTYPE'
    ).execute()
    node_df = node_df.assign(
        upstream_params=node_df.FILE_ID.apply(data_layer.get_parameters)
    )
    node_df = node_df.assign(
        title=node_df.apply(build_alt_text, axis=1)
    )
    # Start adding nodes to it.
    for index, node in node_df.iterrows():
        g.add_node(
            node.FILE_ID,
            label=node.THINGTYPE,
            title=node.title,  # This is ALT TEXT
            color=node.color,
            shape=node['shape'],  # Stupid properties!
            #size=str(node['count'] * 10),  # Stupid properties!
            #mass=(node['count'] * 10),  # Stupid properties!
            font={'size': 20, 'color': 'white'}
        )
    # Start adding edges to it.
    for index, node in project_data.execute().iterrows():
        if node.upstream:
            for upstream_node in node.upstream:
                g.add_edge(upstream_node, node.FILE_ID)
    return g

This particular visualization displays the course of the project from initial data draw all the way to the final publication. It's interactive and has descriptive alt text!

In [None]:
plot_project(data_layer, 72854088).show('out.html')

## Modeling Parameters and Metrics

This section unpacks the parameters and metrics into tables.

In [None]:
thing = 'Model'

In [None]:
project_data = ibis.memtable(get_project_data(data_layer, 72854088)).filter(_.THINGTYPE==thing)

In [None]:
t_params = ibis.memtable(project_data.execute().FILE_ID.apply(data_layer.get_parameters).to_list()).mutate(i=ibis.row_number())

In [None]:
t_params.execute()

In [None]:
t_metrics = ibis.memtable(project_data.execute().FILE_ID.apply(data_layer.get_metrics).to_list()).mutate(i=ibis.row_number())

In [None]:
t_metrics.execute()

In [None]:
t_params.join(t_metrics, 'i').execute()