In [None]:
import sys

import numpy as np
import pandas as pd
import dask.dataframe as ddf
import dask.array as dda
from dask.diagnostics import ProgressBar

sys.path.append('../')
from sed.core.workflow import WorkflowManager, WorkflowStep


# generate a test dataframe

In [None]:
# Synthetic event table: an integer-valued 'tof' column drawn from [600, 900)
# and two standard-normal coordinate columns 'x' and 'y'.
# A fixed seed keeps the notebook reproducible under Restart & Run All.
rng = np.random.default_rng(42)
n_events = 1000
arr = rng.standard_normal((n_events, 3))
arr[:, 0] = rng.integers(600, 900, n_events)  # upper bound is exclusive
df = pd.DataFrame(data=arr, columns=['tof', 'x', 'y'])
# Split into 2 partitions so the workflow is exercised on a multi-partition dask frame.
dsk = ddf.from_pandas(df, npartitions=2)

In [None]:
# Display the dask DataFrame built from `df` in the previous cell.
dsk

# Define a workflow step class

In [None]:
class Tof_to_energy(WorkflowStep):
    """Workflow step that converts a time-of-flight column into kinetic energy.

    The step computes, column-wise::

        E = 0.5 * m_e * (tof_distance / (tof - tof_offset))**2 / e - energy_offset

    where ``m_e`` is the electron mass and ``e`` the elementary charge.
    The factor ``1e18 == (1e9)**2`` suggests ``tof`` is given in ns and
    ``tof_distance`` in m, which would yield energies in eV.
    NOTE(review): these units are inferred from the constant; confirm them.

    Only ``func`` must be implemented by subclasses. Everything else (calling,
    output-column handling, duplicate handling) is inherited from
    ``WorkflowStep``.
    """

    def __init__(
        self,
        tof_column: str,
        tof_offset: float,
        tof_distance: float,
        energy_offset: float = 0,
        out_cols='energy',
        duplicate_policy='raise'
    ) -> None:
        """Store the conversion parameters and configure the parent step.

        Args:
            tof_column: name of the column holding the time-of-flight values.
            tof_offset: time offset subtracted from ``tof_column`` before conversion.
            tof_distance: flight distance used in the velocity ``d / (t - t0)``.
            energy_offset: constant subtracted from the computed energy.
            out_cols: output column name(s), forwarded to ``WorkflowStep``.
            duplicate_policy: forwarded to ``WorkflowStep``; controls what happens
                when the step is applied more than once (``'raise'`` by default).
        """
        self.tof_column = tof_column
        self.tof_offset = tof_offset
        self.tof_distance = tof_distance
        self.energy_offset = energy_offset
        super().__init__(
            out_cols=out_cols,
            duplicate_policy=duplicate_policy,
        )

    # Arbitrary helper methods may be defined here and called from ``func``.

    # The main required method. It is called and mapped on the dataframe, and is
    # the only hard requirement for subclasses: all other functionality is
    # inherited from the WorkflowStep parent class.
    def func(self, df: ddf.DataFrame) -> ddf.Series:
        """Return the kinetic energy computed from ``df[self.tof_column]``.

        Args:
            df: dataframe containing ``self.tof_column``.

        Returns:
            A Series of energies (one value per row), shifted by ``-energy_offset``.
            Rows where ``tof == tof_offset`` divide by zero.
        """
        electron_mass = 9.10938e-31  # kg
        elementary_charge = 1.602177e-19  # C
        ns2_to_s2 = 1e18  # (1e9)**2, presumably converts ns**-2 to s**-2
        # Same left-to-right evaluation order as 0.5 * 1e18 * m_e / e, so the
        # floating-point value of k is unchanged.
        k = 0.5 * ns2_to_s2 * electron_mass / elementary_charge
        velocity = self.tof_distance / (df[self.tof_column] - self.tof_offset)
        return k * np.power(velocity, 2.) - self.energy_offset


# initialize the workflow step
This is not strictly necessary, but it shows that the step can be instantiated and applied on its own.

In [None]:
# Conversion parameters for the synthetic data generated above; the result is
# written to the 'energy' column.
fn = Tof_to_energy(
    tof_column='tof',
    tof_offset=1,
    tof_distance=1,
    energy_offset=100,
    out_cols='energy',
)

In [None]:
# Display the configured workflow step object.
fn

In [None]:
# Apply the step directly to the dask frame, materialize it with .compute(),
# and show the first rows.
fn(dsk).compute().head()

# Now the actual workflow manager

In [None]:
from sed.core.metadata import MetaHandler

In [None]:
# Steps in this list are performed in order; more steps can be appended.
workflow_steps = [fn]
manager_config = {'binning': {'num_cores': 8}}
wf = WorkflowManager(
    dataframe=dsk,
    workflow=workflow_steps,
    metadata=MetaHandler(),
    config=manager_config,
)

In [None]:
# Display the workflow manager.
wf

# run the workflow
This runs through the workflow queue and applies all transformations. If a step has already been applied, it raises a `DuplicateEntryError`, unless a different `duplicate_policy` is defined in the workflow step.

In [None]:
# Apply every queued step in order. Re-running this cell re-applies the steps,
# which raises DuplicateEntryError unless the step's duplicate_policy allows it.
wf.apply_workflow()

# do some binning

In [None]:
# One (min, max) range per axis, presumably paired with `axes` by position.
binning_axes = ['energy', 'x', 'y']
binning_ranges = [(-96, -93), (-2, 2), (-2, 2)]
out = wf.compute_binning(
    bins=10,
    axes=binning_axes,
    ranges=binning_ranges,
    num_cores=8,
)

In [None]:
# Display the binned result returned by compute_binning.
out

# TODOs:
- define how to handle complex steps which require generating parameters
- fetch parameters from config file
- create the workflow steps based on the transformations we are currently familiar with (e.g. jittering)