In [1]:
import pandas as pd
import luigi
import d6tflow
import collections

Loading postgres module without psycopg2 installed. Will crash at runtime if postgres functionality is used.
Loading S3 module without the python package boto3. Will crash at runtime if S3 functionality is used.


Welcome to d6tflow!


# Base code to put in a package for reuse

In [2]:
class requires_instance(object):
    """
    Modified version of luigi.util.requires, requiring instanced Tasks and ignoring inherits

    TODO: Name this better!
    """

    def __init__(self, *tasks_to_require):
        super().__init__()
        if not tasks_to_require:
            raise TypeError("tasks_to_require cannot be empty")

        self.tasks_to_require = tasks_to_require

    def __call__(self, task_that_requires):
        # task_that_requires = inherits(*self.tasks_to_require)(task_that_requires)

        # Modify task_that_requires by adding requires method.
        # If only one task is required, this single task is returned.
        # Otherwise, list of tasks is returned
        def requires(_self):
            # Should this be returning based on _self?  I'm not sure the difference...
            # Warning about using multiple requirements with d6tflow's inputLoad().  The logic looks for
            # a tuple, else a dict, else treats as a single Task.  For now force non-single case to a tuple, but really
            # the logic should be changed in d6tflow
            return self.tasks_to_require[0] if len(self.tasks_to_require) == 1 else tuple(self.tasks_to_require)
        task_that_requires.requires = requires

        return task_that_requires


class TaskMixin:
    @classmethod
    def make_subclass(cls, subclass_name=None, upstream_instances=None, **kwargs):
        """
        Returns a subclass of the base class,

        :param subclass_name: Name of the subclass.  Default is the parent class's name with "BaseTask" stripped from
                              its ends
        :param requires_instance: If set, adds a @requires_instance decorator on the subclass which maps the subclass's
                                  requires using INSTANCED Tasks (not Task classes like normal Luigi)
        :param kwargs: Anything to be overwritten on the parent class.  This is passed as the third argument of the
                       "type" function and can contain class variables and functions.  Suggested use:
                            target_dir
                            additional class parameters?

        :return:
            The subclass (not instantiated)
        """
        subclass_name = subclass_name if subclass_name else cls.__name__.strip("BaseTask")
        subclass = type(subclass_name, (cls,), kwargs)
        if upstream_instances:
            # Make everything a list that we can expand into the below decorator
            upstream_instances = make_iterable(upstream_instances)

            subclass = requires_instance(*upstream_instances)(subclass)
        return subclass


# Helpers

In [3]:
def is_iterable(arg):
    """
    Returns whether an argument is an iterable but not a string
    From stackoverflow: "how to tell a varaiable is iterable but not a string"
    Args:
        arg: some variable to be tested
    Returns:
        (bool)
    """
    return (
            isinstance(arg, collections.Iterable)
            and not isinstance(arg, str)
    )


def make_iterable(arg):
    """
    Makes arg into an iterable if it isn't already (note that strings are ignored and treated as non-iterable)
    """
    return arg if is_iterable(arg) else (arg,)

# Notes

Task output naming conventions:
  * (output_dir)/(target_dir)/(task_id)-(persist_output_name).extension
This schema is defined in d6tflow's TaskData._getpath() (a base class inherrited by all dt6flow Tasks), which sets the output paths for all Targets (outputs) of a Task
output_dir: (added by d6tflow)
  * a global output directory used by all Tasks in a pipeline in d6tflow.
  * Set by: d6tflow.set_dir()
target_dir: (added by d6tflow)
  * Subdir used by a task
  * Set by: cls.target_dir
task_id: (from luigi.Task)
  * Defined by luigi.Task.__init__() to be (task_namespace)_(task_params_as_kwargs)_(hash_of_params)
  * Set by: indirectly (via namespace and kwargs)
task_namespace: (from luigi.Task)
  * Also mentioned as a task_family in luigi code.  namespace prefix on all output files.  Feels like it argues a bit with target_dir.  Defaults to cls.__name__
  * luigi.Task.get_task_family() and luigi.Task.get_task_namespace() are the toolchain that use this
  * set by: cls.task_namespace
persist_output_name: (added by d6tflow)
  * Naming convention used for outputs.  This defines the output naming used by the .save() function and the suffix on the output files
  * default: cls.persist = ['data']  (single output named 'data')
  * If only one output, the output routing functions (eg: load()) deliver it directly.  If a list, load() loads a list of the data.  (Can it handle named references using dicts too?)
  * When .save()ing data in a Task, use a dict like: self.save({'output1': df_to_save1, 'output2': df_to_save2})
  * set by: cls.persist

Possible workflows:
  -   Use classmethod that subclasses and names the subclass/target_dir to keep things identifiable
  -   have an initialization constructor rather than subclassing constructor
      -   This CANNOT use task_namespace from luigi.Task directly as it's init() statically defines task_id based on
          task_namespace at init() time.  task_namespace must be defined on the CLASS rather than instance because
          it is gotten by a class methods (get_task_namespace).  I THINK target_dir would work without subclassing
          but haven't tested fully
      -   We could override some of these methods and then avoid needing to subclass things.
      -   If we do a different workflow than subclassing we'd need something like:
              my_instance = requires_decorator(upstream_task_instance)(MyClass.\[SOMEHOW APPLY NAMESPACE/DIR AND RETURN CLS\])(instance kwargs)
  -   In both cases, I can instance ahead of time (for parameter setting) and use @requires_instance to add
      requirements

# Examples of reusable Task classes
(these are what we'd define in a package for reusing to build a pipeline from)

In [4]:
# Helper for fake data...
DATA_VALUES = [1, 2, 3]

class BaseTaskGetTwoData(TaskMixin, d6tflow.tasks.TaskPqPandas):
    """
    This is a fake 'data generation' task that creates two dataframes with a little data
    """
    
    # Define parameters as luigi.Parameter subclasses.  Luigi looks for these if you ever use inherrits(), and I think
    # also checks to make sure they're defined early in a pipeline run?
    columns = luigi.ListParameter(default=['a', 'b', 'c'])

    # Set persist to explicitly name the outputs of a process and/or define more than one
    # persist = ['data']  # Default set internally
    persist = ['output1', 'output2']

    def run(self):
        # Fabricate some data
        df = pd.DataFrame([{k: v*i_k for i_k, k in enumerate(self.columns)} for v in DATA_VALUES])

        to_save = {output_name: df for output_name in self.persist}
        self.save(to_save)


class BaseTaskGetOneData(TaskMixin, d6tflow.tasks.TaskPqPandas):
    """
    This is a fake 'data generation' task that creates one dataframe with a little data
    """
    columns = luigi.ListParameter(default=['a', 'b', 'c'])

    def run(self):
        # Fabricate some data
        df = pd.DataFrame([{k: v*i_k for i_k, k in enumerate(self.columns)} for v in DATA_VALUES])

        self.save(df)


class BaseTaskPrintDf(TaskMixin, d6tflow.tasks.TaskCache):
    """
    This task prints one or more dataframes to the screen
    """
    def run(self):
        # Basic Luigi: .input() returns whatever self.requires() returns, so to use input() you need to interpret
        # whatever comes from upstream (could be a Target, Tuple(Target1, ...), Tuple(Dict('name1': Target1, ...), ...)
        # dfs = self.input().load()  # For a single input.  self.input() returns a Target, which has a .load()
        # For loading all of multiple inputs (can also do key's here I think?).  Note it always flattens to a list.
        # This function could be improved to reflect the structure of whatever is in requires() better
        dfs = self.inputLoad()

        # Do some "work"
        print("Printing results:")
        for i, df in enumerate(dfs):
            print(f"df {i}")
            print(df)


class BaseTaskDropColumn(TaskMixin, d6tflow.tasks.TaskPqPandas):
    """
    Drop a column from a dataframe, saving the resulting dataframe for future tasks
    """
    column = luigi.Parameter()

    def run(self):
        df = self.input().load()

        df = df.drop(columns=[self.column])
        self.save(df)

# Example pipeline

## Define

<img src="images/schematic_of_pipeline.png">

In [5]:
# Example of defining output by making a Subclass and then instantiating it
# Subclassing gives us control over the target_dir.  Everything from the same subclass goes to the same target_dir, so
# each Task (atomic action in a pipeline) needs its own subclass of a Task
TaskGetTwoData = BaseTaskGetTwoData.make_subclass(subclass_name="MyGetTwoData", target_dir='my_get_two_data')
my_get_two_data = TaskGetTwoData(columns=['a', 'b', 'c'])

# We can also do it all in one command
my_get_two_data2 = BaseTaskGetTwoData.make_subclass(subclass_name="MyGetTwoData2",
                                                    target_dir='my_get_two_data2',)(columns=['d', 'e', 'f'])

my_get_one_data = BaseTaskGetOneData.make_subclass(subclass_name="MyGetOneData",
                                                   target_dir='my_get_one_data')(columns=['aa', 'bb'])

# Downstream tasks can consume upstream, fully instantiated tasks
my_drop_column = BaseTaskDropColumn.make_subclass(subclass_name="MyDropColumn", 
                                                  target_dir="my_drop_column", 
                                                  upstream_instances=[my_get_one_data])(column='aa')

my_print_df_1 = BaseTaskPrintDf.make_subclass(subclass_name="MyPrintDf1", upstream_instances=my_drop_column)()

# We can also map multiple inputs to a single task
# (target_dir doesn't matter here - no output!)
my_print_df_2 = BaseTaskPrintDf.make_subclass(upstream_instances=[
                                                                  my_get_two_data,
                                                                  my_get_two_data2,
                                                                  my_drop_column,
                                                                  ],
                                              target_dir='this_doesnt_matter_but')()


  # This is added back by InteractiveShellApp.init_path()


## Run

This runs everything it needs because nothing is pre-computed

In [6]:
# .run() is supposed to support multiple tasks but it wasn't working.  We run once for each (could also have a single
# aggregation task that requires all the above tasks)
d6tflow.run(my_print_df_1)

INFO: Informed scheduler that task   MyPrintDf1__99914b932b   has status   PENDING
INFO: Informed scheduler that task   MyDropColumn_aa_8d8cf8322c   has status   DONE
INFO: Done scheduling tasks
INFO: Running Worker with 1 processes
INFO: [pid 20463] Worker Worker(salt=174418553, workers=1, host=scribs-desktop, username=scribs, pid=20463) running   MyPrintDf1()
INFO: [pid 20463] Worker Worker(salt=174418553, workers=1, host=scribs-desktop, username=scribs, pid=20463) done      MyPrintDf1()
INFO: Informed scheduler that task   MyPrintDf1__99914b932b   has status   DONE
INFO: Worker Worker(salt=174418553, workers=1, host=scribs-desktop, username=scribs, pid=20463) was stopped. Shutting down Keep-Alive thread
INFO: 
===== Luigi Execution Summary =====

Scheduled 2 tasks of which:
* 1 complete ones were encountered:
    - 1 MyDropColumn(column=aa)
* 1 ran successfully:
    - 1 MyPrintDf1()

This progress looks :) because there were no failed tasks or missing dependencies

===== Luigi Execu

Printing results:
df 0
bb


True

This only runs what didn't run above

In [7]:
d6tflow.run(my_print_df_2)

INFO: Informed scheduler that task   PrintDf__99914b932b   has status   PENDING
INFO: Informed scheduler that task   MyDropColumn_aa_8d8cf8322c   has status   DONE
INFO: Informed scheduler that task   MyGetTwoData2___d____e____f___a0fcbe71f2   has status   DONE
INFO: Informed scheduler that task   MyGetTwoData___a____b____c___bc7315d43a   has status   DONE
INFO: Done scheduling tasks
INFO: Running Worker with 1 processes
INFO: [pid 20463] Worker Worker(salt=538919802, workers=1, host=scribs-desktop, username=scribs, pid=20463) running   PrintDf()
INFO: [pid 20463] Worker Worker(salt=538919802, workers=1, host=scribs-desktop, username=scribs, pid=20463) done      PrintDf()
INFO: Informed scheduler that task   PrintDf__99914b932b   has status   DONE
INFO: Worker Worker(salt=538919802, workers=1, host=scribs-desktop, username=scribs, pid=20463) was stopped. Shutting down Keep-Alive thread
INFO: 
===== Luigi Execution Summary =====

Scheduled 4 tasks of which:
* 3 complete ones were encoun

Printing results:
df 0
   a  b  c
0  0  1  2
1  0  2  4
2  0  3  6
df 1
   a  b  c
0  0  1  2
1  0  2  4
2  0  3  6
df 2
   d  e  f
0  0  1  2
1  0  2  4
2  0  3  6
df 3
   d  e  f
0  0  1  2
1  0  2  4
2  0  3  6
df 4
bb


True

And we can see the results for these cached in the data directory

There is a directory for each pipeline step (defined by target_dir) and the filenames depend on the schema described in notes above

In [8]:
!ls -lR data

data:
total 16
drwxr-xr-x 2 scribs scribs 4096 Apr 30 14:00 my_drop_column
drwxr-xr-x 2 scribs scribs 4096 Apr 30 14:00 my_get_one_data
drwxr-xr-x 2 scribs scribs 4096 Apr 30 14:00 my_get_two_data
drwxr-xr-x 2 scribs scribs 4096 Apr 30 14:00 my_get_two_data2

data/my_drop_column:
total 4
-rw-r--r-- 1 scribs scribs 1685 Apr 30 14:00 MyDropColumn_aa_8d8cf8322c-data.parquet

data/my_get_one_data:
total 4
-rw-r--r-- 1 scribs scribs 2317 Apr 30 14:00 MyGetOneData___aa____bb___24e1609319-data.parquet

data/my_get_two_data:
total 8
-rw-r--r-- 1 scribs scribs 2928 Apr 30 14:00 MyGetTwoData___a____b____c___bc7315d43a-output1.parquet
-rw-r--r-- 1 scribs scribs 2928 Apr 30 14:00 MyGetTwoData___a____b____c___bc7315d43a-output2.parquet

data/my_get_two_data2:
total 8
-rw-r--r-- 1 scribs scribs 2928 Apr 30 14:00 MyGetTwoData2___d____e____f___a0fcbe71f2-output1.parquet
-rw-r--r-- 1 scribs scribs 2928 Apr 30 14:00 MyGetTwoData2___d____e____f___a0fcbe71f2-output2.parquet


And reusing parts (or all) of the above pipeline.  We see here they don't run a second time (except the prints)

In [9]:
d6tflow.run(my_print_df_1)

INFO: Informed scheduler that task   MyPrintDf1__99914b932b   has status   PENDING
INFO: Informed scheduler that task   MyDropColumn_aa_8d8cf8322c   has status   DONE
INFO: Done scheduling tasks
INFO: Running Worker with 1 processes
INFO: [pid 20463] Worker Worker(salt=675251088, workers=1, host=scribs-desktop, username=scribs, pid=20463) running   MyPrintDf1()
INFO: [pid 20463] Worker Worker(salt=675251088, workers=1, host=scribs-desktop, username=scribs, pid=20463) done      MyPrintDf1()
INFO: Informed scheduler that task   MyPrintDf1__99914b932b   has status   DONE
INFO: Worker Worker(salt=675251088, workers=1, host=scribs-desktop, username=scribs, pid=20463) was stopped. Shutting down Keep-Alive thread
INFO: 
===== Luigi Execution Summary =====

Scheduled 2 tasks of which:
* 1 complete ones were encountered:
    - 1 MyDropColumn(column=aa)
* 1 ran successfully:
    - 1 MyPrintDf1()

This progress looks :) because there were no failed tasks or missing dependencies

===== Luigi Execu

Printing results:
df 0
bb


True

In [10]:
d6tflow.run(my_print_df_2)

INFO: Informed scheduler that task   PrintDf__99914b932b   has status   PENDING
INFO: Informed scheduler that task   MyDropColumn_aa_8d8cf8322c   has status   DONE
INFO: Informed scheduler that task   MyGetTwoData2___d____e____f___a0fcbe71f2   has status   DONE
INFO: Informed scheduler that task   MyGetTwoData___a____b____c___bc7315d43a   has status   DONE
INFO: Done scheduling tasks
INFO: Running Worker with 1 processes
INFO: [pid 20463] Worker Worker(salt=136766269, workers=1, host=scribs-desktop, username=scribs, pid=20463) running   PrintDf()
INFO: [pid 20463] Worker Worker(salt=136766269, workers=1, host=scribs-desktop, username=scribs, pid=20463) done      PrintDf()
INFO: Informed scheduler that task   PrintDf__99914b932b   has status   DONE
INFO: Worker Worker(salt=136766269, workers=1, host=scribs-desktop, username=scribs, pid=20463) was stopped. Shutting down Keep-Alive thread
INFO: 
===== Luigi Execution Summary =====

Scheduled 4 tasks of which:
* 3 complete ones were encoun

Printing results:
df 0
   a  b  c
0  0  1  2
1  0  2  4
2  0  3  6
df 1
   a  b  c
0  0  1  2
1  0  2  4
2  0  3  6
df 2
   d  e  f
0  0  1  2
1  0  2  4
2  0  3  6
df 3
   d  e  f
0  0  1  2
1  0  2  4
2  0  3  6
df 4
bb


True