# Luigi ETL
> "The docs are a head scratcher.  Let's explore the package together."

- toc: true
- branch: master
- badges: true
- comments: true
- author: Hamel Husain & Jeremy Howard
- categories: [fastpages, jupyter]

## About

I got excited when I first heard about Luigi.  It's a Python ETL framework built by Spotify.  I use pandas in my everyday job and have built numerous pipeline jobs to move, transform, and analyze data across my organization.  I thought Luigi would be a great addition to help manage these pipelines, but after reading their getting started docs I was left confused and scratching my head.

If you made it here then I assume the docs have you confused as well, and hopefully this post can provide a bit more clarity.  The post assumes you have already read the docs and have some knowledge of Luigi, so if you haven't, please read those first.

In [1]:
#hide
from utils import DisplayablePath

## Task Execution

Typical ETL Execution:
> Task A $\longrightarrow$ Task B $\longrightarrow$ Task C

Luigi ETL Execution:
> Task A $\longleftarrow$ Task B $\longleftarrow$ Task C

The most important thing to understand about Luigi is that it evaluates the ETL starting from the last task and working backwards.  It first checks whether the final task (Task C) is completed, and if not it moves backwards to check whether the previous task (Task B) is completed.  Once it finds the first completed task, it begins executing the remaining Tasks moving forward.

This approach can save you a lot of time in your ETL processes since completed tasks are not re-run, but it makes pipelines a bit trickier to implement: you can end up in a situation where Task B or Task C always reports that it is completed, so Task A never runs again.
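The backward check and forward run can be sketched with a toy model in plain Python.  This is not Luigi's scheduler; `ToyTask` and `build` are hypothetical names used only to illustrate the order of events, and the `done` flag stands in for "the output exists".

```python
class ToyTask:
    """Hypothetical stand-in for a task; not part of Luigi."""

    def __init__(self, name, requires=None):
        self.name = name
        self.requires = requires  # the upstream ToyTask, or None
        self.done = False         # plays the role of "output exists"

    def complete(self):
        return self.done

    def run(self):
        self.done = True


def build(task, log):
    """Check `task` first; only walk upstream if it is not complete."""
    log.append(f"check {task.name}")
    if task.complete():
        return  # complete: upstream tasks are never inspected
    if task.requires is not None:
        build(task.requires, log)
    log.append(f"run {task.name}")
    task.run()


task_a = ToyTask("A")
task_b = ToyTask("B", requires=task_a)
task_c = ToyTask("C", requires=task_b)

first, second = [], []
build(task_c, first)   # nothing is complete yet
build(task_c, second)  # C is now complete

print(first)
print(second)
```

On the second call only Task C is checked, which is exactly the situation where an upstream change would go unnoticed.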




## How are Tasks linked?

> Task A $\longleftarrow$ Task B $\longleftarrow$ Task C

With the exception of External Tasks, most Tasks depend on another Task.  You define this dependency by adding a `requires()` method to your Task class and returning the other Task(s) from it.  If a Task is completed, Luigi won't bother to check the Tasks it depends on.  This is a very important concept to understand.

## What defines a completed Task?

The Luigi docs state that a Task is considered completed when the Task's output exists.  So if Task A outputs `task_a.csv` and that file exists, Task A will be considered complete.  What the getting started docs fail to mention is that, in reality, Task A is considered complete when its `complete()` method returns `True`.  The default behaviour of `complete()` is to check whether the output exists, but we can override this behaviour, and I would bet that most people who have deployed Luigi into production do.

## Coding Demonstration of Task Execution

> Task A $\longleftarrow$ Task B

The code below is an example of how to set up a Luigi Task.  We have two classes, `Task_A` and `Task_B`, where `Task_B` is dependent on `Task_A`.  I've added print statements to help visualize the order of events that take place when the Tasks are run.

There are a few things to note about the code:
* `GlobalParams` is a helper class that provides a global variable so I can count the execution events, i.e. **1:** complete() ...
* I replaced the Luigi `complete()` method with a similar method that checks if the output file exists, so we can see the method executed in the print statements.
* `MockTarget` creates an in-memory file object that we can write to and check for existence.

In [2]:
import luigi
import pandas as pd
from luigi.mock import MockTarget

class GlobalParams(luigi.Config):
    count = luigi.IntParameter(default=1)

class Task_A(luigi.Task):

    def output(self):
        return MockTarget("Task_A")

    def run(self):
        print(f"{g.count}: run() {self.__class__.__name__} has no prior Task dependency. It is now running to complete the task")
        g.count += 1 
        out = self.output().open("w")
        out.write('complete')
        out.close()
    
    def complete(self):
        print(f'{g.count}: complete() Checking to see if {self.__class__.__name__} has been completed')
        g.count += 1 
        return self.output().exists() 

        
class Task_B(luigi.Task):
    
    def requires(self):
        print(f'{g.count}: requires() {self.__class__.__name__} is not completed, checking to see if previous tasks are required and completed')
        g.count += 1 
        return Task_A()
            
    def output(self):
        return MockTarget("Task_B")

    def run(self):
        print(f'{g.count}: run() All previous tasks are completed and {self.__class__.__name__} is running to complete the task')
        g.count += 1 
        out = self.output().open("w")
        out.write('complete')
        out.close()
        print(f'{g.count}: All Tasks are completed')
        
    def complete(self):
        print(f'{g.count}: complete() Checking to see if {self.__class__.__name__} has been completed')
        g.count += 1 
        
        if self.output().exists():
            print(f'{g.count}: All Tasks are completed')
        return self.output().exists()

g = GlobalParams()
luigi.build([Task_B()], local_scheduler=True)

DEBUG: Checking if Task_B() is complete
DEBUG: Checking if Task_A() is complete
INFO: Informed scheduler that task   Task_B__99914b932b   has status   PENDING
INFO: Informed scheduler that task   Task_A__99914b932b   has status   PENDING
INFO: Done scheduling tasks
INFO: Running Worker with 1 processes
DEBUG: Asking scheduler for work...
DEBUG: Pending tasks: 2
INFO: [pid 16140] Worker Worker(salt=665276073, workers=1, host=DESKTOP-BCU4BGH, username=Mike, pid=16140) running   Task_A()
INFO: [pid 16140] Worker Worker(salt=665276073, workers=1, host=DESKTOP-BCU4BGH, username=Mike, pid=16140) done      Task_A()
DEBUG: 1 running tasks, waiting for next task to finish
INFO: Informed scheduler that task   Task_A__99914b932b   has status   DONE
DEBUG: Asking scheduler for work...
DEBUG: Pending tasks: 1
INFO: [pid 16140] Worker Worker(salt=665276073, workers=1, host=DESKTOP-BCU4BGH, username=Mike, pid=16140) running   Task_B()
INFO: [pid 16140] Worker Worker(salt=665276073, workers=1, host=DE

1: complete() Checking to see if Task_B has been completed
2: requires() Task_B is not completed, checking to see if previous tasks are required and completed
3: complete() Checking to see if Task_A has been completed
4: run() Task_A has no prior Task dependency. It is now running to complete the task
5: requires() Task_B is not completed, checking to see if previous tasks are required and completed
6: complete() Checking to see if Task_A has been completed
7: run() All previous tasks are completed and Task_B is running to complete the task
8: All Tasks are completed


True

When we run the Task a second time, note how `Task_A` isn't even referenced.  Luigi checked to see if `Task_B` was complete and stopped the execution since it returned `True`.  That means that if some file upstream was updated and needed to be transformed by `Task_A`, it would never occur, since Luigi would always stop at `Task_B`.

In [3]:
g.count=1
luigi.build([Task_B()], local_scheduler=True)

DEBUG: Checking if Task_B() is complete
INFO: Informed scheduler that task   Task_B__99914b932b   has status   DONE
INFO: Done scheduling tasks
INFO: Running Worker with 1 processes
DEBUG: Asking scheduler for work...
DEBUG: Done
DEBUG: There are no more tasks to run at this time
INFO: Worker Worker(salt=240940604, workers=1, host=DESKTOP-BCU4BGH, username=Mike, pid=16140) was stopped. Shutting down Keep-Alive thread
INFO: 
===== Luigi Execution Summary =====

Scheduled 1 tasks of which:
* 1 complete ones were encountered:
    - 1 Task_B()

Did not run any tasks
This progress looks :) because there were no failed tasks or missing dependencies

===== Luigi Execution Summary =====



1: complete() Checking to see if Task_B has been completed
2: All Tasks are completed


True

In [4]:
#hide

# Clean up: remove the in-memory files so the cells can be run again with the same output
Task_B().output().remove()
Task_A().output().remove()

## Luigi Parameters

> Words $\longleftarrow$ Count

Parameters are Luigi's intended way to make sure tasks get updated on some frequency so they don't get stuck in a completed status.  Luigi offers its own `Parameter` objects, which mostly help with converting types when executing tasks from the command line.

Below we have created two new Tasks, `Words` and `Count`.  Each task takes a date as a parameter and appends the date to the output file name.  You'll also notice I removed the `complete()` method, which means it will default to the original method that checks if the output target exists.

In [5]:
import datetime
from pathlib import Path
OUTPUT_PATH = Path('output')

class Words(luigi.Task):
    date = luigi.DateParameter(default=datetime.date.today())
    
    def output(self):
        return luigi.LocalTarget(OUTPUT_PATH/f'words_{self.date}.csv')

    def run(self):
        words = ['apple','banana','grapefruit']

        df = pd.DataFrame(dict(words=words))
        df.to_csv(self.output().path, index=False)
        
class Count(luigi.Task):
    date = luigi.DateParameter(default=datetime.date.today())
    
    def requires(self):
        # Passing the Luigi parameter back to the upstream task
        return Words(self.date) 
            
    def output(self):
        return luigi.LocalTarget(OUTPUT_PATH/f'count_{self.date}.csv')

    def run(self):
        df = pd.read_csv(self.input().path)
        df['letter_count'] = df.words.map(len)
        df.to_csv(self.output().path, index=False)
        
luigi.build([Count()], local_scheduler=True)

DEBUG: Checking if Count(date=2020-10-26) is complete
INFO: Informed scheduler that task   Count_2020_10_26_424115e443   has status   DONE
INFO: Done scheduling tasks
INFO: Running Worker with 1 processes
DEBUG: Asking scheduler for work...
DEBUG: Done
DEBUG: There are no more tasks to run at this time
INFO: Worker Worker(salt=851362906, workers=1, host=DESKTOP-BCU4BGH, username=Mike, pid=16140) was stopped. Shutting down Keep-Alive thread
INFO: 
===== Luigi Execution Summary =====

Scheduled 1 tasks of which:
* 1 complete ones were encountered:
    - 1 Count(date=2020-10-26)

Did not run any tasks
This progress looks :) because there were no failed tasks or missing dependencies

===== Luigi Execution Summary =====



True

In [7]:
#hide_input

# Used to display the files for demonstration purposes
paths = DisplayablePath.make_tree(OUTPUT_PATH)
print('Output Directory Tree:')
for path in paths:
    print(path.displayable())

Output Directory Tree:
output/
├── count_2020-10-26.csv
└── words_2020-10-26.csv


Above, our Tasks ran successfully and saved the outputs in our output directory.  So what happens if we run it again?

In [8]:
luigi.build([Count()], local_scheduler=True)

DEBUG: Checking if Count(date=2020-10-26) is complete
INFO: Informed scheduler that task   Count_2020_10_26_424115e443   has status   DONE
INFO: Done scheduling tasks
INFO: Running Worker with 1 processes
DEBUG: Asking scheduler for work...
DEBUG: Done
DEBUG: There are no more tasks to run at this time
INFO: Worker Worker(salt=137048837, workers=1, host=DESKTOP-BCU4BGH, username=Mike, pid=16140) was stopped. Shutting down Keep-Alive thread
INFO: 
===== Luigi Execution Summary =====

Scheduled 1 tasks of which:
* 1 complete ones were encountered:
    - 1 Count(date=2020-10-26)

Did not run any tasks
This progress looks :) because there were no failed tasks or missing dependencies

===== Luigi Execution Summary =====



True

In [9]:
#hide_input

# Used to display the files for demonstration purposes
paths = DisplayablePath.make_tree(OUTPUT_PATH)
print('Output Directory Tree:')
for path in paths:
    print(path.displayable())

Output Directory Tree:
output/
├── count_2020-10-26.csv
└── words_2020-10-26.csv


As you can see, nothing happened since the `Count` task encountered an output that already existed with the same name.  This is where parameters become useful.  Below we'll provide a different date to the `Count` task.

In [10]:
luigi.build([Count(date=pd.to_datetime('10/25/2021'))], local_scheduler=True)

DEBUG: Checking if Count(date=2021-10-25) is complete
DEBUG: Checking if Words(date=2021-10-25) is complete
INFO: Informed scheduler that task   Count_2021_10_25_8a7563aba6   has status   PENDING
INFO: Informed scheduler that task   Words_2021_10_25_8a7563aba6   has status   PENDING
INFO: Done scheduling tasks
INFO: Running Worker with 1 processes
DEBUG: Asking scheduler for work...
DEBUG: Pending tasks: 2
INFO: [pid 16140] Worker Worker(salt=005117057, workers=1, host=DESKTOP-BCU4BGH, username=Mike, pid=16140) running   Words(date=2021-10-25)
INFO: [pid 16140] Worker Worker(salt=005117057, workers=1, host=DESKTOP-BCU4BGH, username=Mike, pid=16140) done      Words(date=2021-10-25)
DEBUG: 1 running tasks, waiting for next task to finish
INFO: Informed scheduler that task   Words_2021_10_25_8a7563aba6   has status   DONE
DEBUG: Asking scheduler for work...
DEBUG: Pending tasks: 1
INFO: [pid 16140] Worker Worker(salt=005117057, workers=1, host=DESKTOP-BCU4BGH, username=Mike, pid=16140) ru

True

In [11]:
#hide_input

# Used to display the files for demonstration purposes
paths = DisplayablePath.make_tree(OUTPUT_PATH)
print('Output Directory Tree:')
for path in paths:
    print(path.displayable())

Output Directory Tree:
output/
├── count_2020-10-26.csv
├── count_2021-10-25.csv
├── words_2020-10-26.csv
└── words_2021-10-25.csv


In [12]:
#hide

# Remove Files
for file in OUTPUT_PATH.glob('*'):
    file.unlink()

## External Tasks

If your pipeline starts with a dependency on something produced outside of Luigi, you can use the `ExternalTask` object, which is exactly the same as the `Task` object except it doesn't have a `run()` method.

This is useful because it allows for your task to gracefully end a job if the external source criteria is not met.

In [13]:
class Words(luigi.ExternalTask):
    def output(self):
        return luigi.LocalTarget(OUTPUT_PATH/f'words.csv')

luigi.build([Words()], local_scheduler=True)

DEBUG: Checking if Words() is complete
INFO: Informed scheduler that task   Words__99914b932b   has status   PENDING
INFO: Done scheduling tasks
INFO: Running Worker with 1 processes
DEBUG: Asking scheduler for work...
DEBUG: Done
DEBUG: There are no more tasks to run at this time
INFO: Worker Worker(salt=539334424, workers=1, host=DESKTOP-BCU4BGH, username=Mike, pid=16140) was stopped. Shutting down Keep-Alive thread
INFO: 
===== Luigi Execution Summary =====

Scheduled 1 tasks of which:
* 1 were left pending, among these:
    * 1 were missing external dependencies:
        - 1 Words()

Did not run any tasks
This progress looks :| because there were missing external dependencies

===== Luigi Execution Summary =====



True

Above the `Words` external task did not run because `words.csv`, the external dependency, was missing.

In [14]:
OUTPUT_PATH = Path('output')

words = ['apple','banana','grapefruit']

df = pd.DataFrame(dict(words=words))
df.to_csv(OUTPUT_PATH/'words.csv', index=False)

Now that we have created `words.csv`, our external task will report as completed and pass its output to the next Task, if there is one.

In [15]:
luigi.build([Words()], local_scheduler=True)

DEBUG: Checking if Words() is complete
INFO: Informed scheduler that task   Words__99914b932b   has status   DONE
INFO: Done scheduling tasks
INFO: Running Worker with 1 processes
DEBUG: Asking scheduler for work...
DEBUG: Done
DEBUG: There are no more tasks to run at this time
INFO: Worker Worker(salt=364581214, workers=1, host=DESKTOP-BCU4BGH, username=Mike, pid=16140) was stopped. Shutting down Keep-Alive thread
INFO: 
===== Luigi Execution Summary =====

Scheduled 1 tasks of which:
* 1 complete ones were encountered:
    - 1 Words()

Did not run any tasks
This progress looks :) because there were no failed tasks or missing dependencies

===== Luigi Execution Summary =====



True

## Alternate Complete Method

As I mentioned earlier, by default Luigi determines if a Task is complete by checking if the output exists.  However, there is a common use case in pipeline workflows where Tasks should be run when a file is updated.  Since Luigi only checks whether the output exists, it will determine that a Task is completed no matter how many times an upstream file gets updated.

We can change this behaviour by overriding the `complete()` method in our `Task` class and defining our own criteria.  It needs to return `False` if the task is not complete and `True` if the task is complete.

Below we will create our own `complete()` method that re-runs any task whose dependencies' output files have been updated since the task last ran.

In [16]:
import os

class MTimeMixin:
    """
        Mixin that flags a task as incomplete if any requirement
        is incomplete or has been updated more recently than this task.
        This is based on http://stackoverflow.com/a/29304506, but extends
        it to support multiple input / output dependencies.
    """
    
    def complete(self):
        def to_list(obj):
            if isinstance(obj, (list, tuple)):
                return obj
            else:
                return [obj]

        def mtime(path):
            # Return the raw timestamp (a float) so comparisons are chronological;
            # formatted strings such as time.ctime() do not sort by date.
            return os.path.getmtime(path)

        if not all(os.path.exists(out.path) for out in to_list(self.output())):
            return False

        self_mtime = min(mtime(out.path) for out in to_list(self.output()))

        # the below assumes a list of requirements, each with a list of outputs. YMMV
        for el in to_list(self.requires()):
            if not el.complete():
                # Fixes Windows FileExistsError
                if os.path.exists(self.output().path):
                    os.remove(self.output().path)
                return False
            for output in to_list(el.output()):
                if mtime(output.path) > self_mtime:
                    # Fixes Windows FileExistsError
                    os.remove(self.output().path)
                    
                    return False

        return True

The above code checks to see if the Task's output exists, and it checks whether the modified times (mtimes) of the dependency task(s) outputs are greater than that of the current Task's output.  It's put in a mixin class so our Task classes can inherit the `complete()` method and we avoid repeating ourselves.
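The comparison at the heart of this approach can be tried in isolation, without Luigi.  The sketch below uses a hypothetical helper, `is_stale`, and sets file timestamps explicitly with `os.utime` so the result does not depend on clock resolution.  It compares the raw floats returned by `os.path.getmtime`, since formatted date strings would not sort chronologically.

```python
import os
import tempfile


def is_stale(output_path, input_paths):
    """Return True if the output is missing or older than any input."""
    if not os.path.exists(output_path):
        return True
    out_mtime = os.path.getmtime(output_path)
    return any(os.path.getmtime(p) > out_mtime for p in input_paths)


workdir = tempfile.mkdtemp()
src = os.path.join(workdir, "words.csv")
dst = os.path.join(workdir, "count_letters.csv")

for path in (src, dst):
    with open(path, "w") as f:
        f.write("words\n")

# Input written before the output: the output is up to date.
os.utime(src, (1_000, 1_000))
os.utime(dst, (2_000, 2_000))
fresh = is_stale(dst, [src])

# Input touched after the output: the output needs rebuilding.
os.utime(src, (3_000, 3_000))
updated = is_stale(dst, [src])

print(fresh, updated)
```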

In [17]:
class Words(luigi.ExternalTask):
    def output(self):
        return luigi.LocalTarget(OUTPUT_PATH/'words.csv')
    
class CountLetters(MTimeMixin, luigi.Task):

    def requires(self):
        return Words()

    def output(self):
        return luigi.LocalTarget(OUTPUT_PATH/'count_letters.csv')

    def run(self):
        df = pd.read_csv(self.input().path)
        df['letter_count'] = df.words.map(len)
        df.to_csv(self.output().path, index=False)

In [18]:
luigi.build([CountLetters()], local_scheduler=True)

DEBUG: Checking if CountLetters() is complete
DEBUG: Checking if Words() is complete
INFO: Informed scheduler that task   CountLetters__99914b932b   has status   PENDING
INFO: Informed scheduler that task   Words__99914b932b   has status   DONE
INFO: Done scheduling tasks
INFO: Running Worker with 1 processes
DEBUG: Asking scheduler for work...
DEBUG: Pending tasks: 1
INFO: [pid 16140] Worker Worker(salt=407857158, workers=1, host=DESKTOP-BCU4BGH, username=Mike, pid=16140) running   CountLetters()
INFO: [pid 16140] Worker Worker(salt=407857158, workers=1, host=DESKTOP-BCU4BGH, username=Mike, pid=16140) done      CountLetters()
DEBUG: 1 running tasks, waiting for next task to finish
INFO: Informed scheduler that task   CountLetters__99914b932b   has status   DONE
DEBUG: Asking scheduler for work...
DEBUG: Done
DEBUG: There are no more tasks to run at this time
INFO: Worker Worker(salt=407857158, workers=1, host=DESKTOP-BCU4BGH, username=Mike, pid=16140) was stopped. Shutting down Keep-A

True

In [19]:
#hide_input

# Used to display the files for demonstration purposes
paths = DisplayablePath.make_tree(OUTPUT_PATH)
print('Output Directory Tree:')
for path in paths:
    print(path.displayable())

Output Directory Tree:
output/
├── count_letters.csv
└── words.csv


When we run the task again it doesn't run any tasks, because `count_letters.csv` exists and the modified time (mtime) of `words.csv` is not greater than that of `count_letters.csv`.

In [20]:
luigi.build([CountLetters()], local_scheduler=True)

DEBUG: Checking if CountLetters() is complete
INFO: Informed scheduler that task   CountLetters__99914b932b   has status   DONE
INFO: Done scheduling tasks
INFO: Running Worker with 1 processes
DEBUG: Asking scheduler for work...
DEBUG: Done
DEBUG: There are no more tasks to run at this time
INFO: Worker Worker(salt=834721361, workers=1, host=DESKTOP-BCU4BGH, username=Mike, pid=16140) was stopped. Shutting down Keep-Alive thread
INFO: 
===== Luigi Execution Summary =====

Scheduled 1 tasks of which:
* 1 complete ones were encountered:
    - 1 CountLetters()

Did not run any tasks
This progress looks :) because there were no failed tasks or missing dependencies

===== Luigi Execution Summary =====



True

We will now update `words.csv` to include a few more words and watch Luigi run the Task because of the updated modification time on the file.

In [21]:
words = ['apple','banana','grapefruit', 'cherry', 'orange']

df = pd.DataFrame(dict(words=words))
df.to_csv(OUTPUT_PATH/'words.csv', index=False)

luigi.build([CountLetters()], local_scheduler=True)

DEBUG: Checking if CountLetters() is complete
DEBUG: Checking if Words() is complete
INFO: Informed scheduler that task   CountLetters__99914b932b   has status   PENDING
INFO: Informed scheduler that task   Words__99914b932b   has status   DONE
INFO: Done scheduling tasks
INFO: Running Worker with 1 processes
DEBUG: Asking scheduler for work...
DEBUG: Pending tasks: 1
INFO: [pid 16140] Worker Worker(salt=009435230, workers=1, host=DESKTOP-BCU4BGH, username=Mike, pid=16140) running   CountLetters()
INFO: [pid 16140] Worker Worker(salt=009435230, workers=1, host=DESKTOP-BCU4BGH, username=Mike, pid=16140) done      CountLetters()
DEBUG: 1 running tasks, waiting for next task to finish
INFO: Informed scheduler that task   CountLetters__99914b932b   has status   DONE
DEBUG: Asking scheduler for work...
DEBUG: Done
DEBUG: There are no more tasks to run at this time
INFO: Worker Worker(salt=009435230, workers=1, host=DESKTOP-BCU4BGH, username=Mike, pid=16140) was stopped. Shutting down Keep-A

True

## SQL Tasks

> Words $\longleftarrow$ CountLetters $\longleftarrow$ StoreSQL $\longleftarrow$ PrintSQL

SQL is a common step in many pipelines, but the Luigi getting started docs barely cover the topic.  In this section we will create two new tasks.  The first task, `StoreSQL`, will take the output from `CountLetters` and store it in a `SQLite` database.  The second task, `PrintSQL`, will then read from our database and print both tables that Luigi created.

Luigi provides a target object called `SQLAlchemyTarget` which assists with working with SQL databases.  There are other SQL-like target objects, but I find this one provides the most abstraction so that it can be used with many different SQL flavors.

The big difference from `LocalTarget`, which we have been using so far, is that `SQLAlchemyTarget` creates and updates what is called a "Marker Table" to keep track of whether a task is complete or not.  You provide the Marker Table with an `update_id`, and Luigi checks whether that `update_id` exists before running the Task.
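The Marker Table idea can be sketched with the standard library's `sqlite3` module.  This is a toy model, not `SQLAlchemyTarget`'s actual code: the `exists` and `touch` helpers are hypothetical, and the table layout is an assumption based on the columns (`update_id`, `target_table`, `inserted`) that appear when the Marker Table is printed later in this post.

```python
import sqlite3

con = sqlite3.connect(":memory:")
con.execute(
    "CREATE TABLE table_updates ("
    "update_id TEXT PRIMARY KEY, target_table TEXT, inserted TEXT)"
)


def exists(update_id):
    """Has this update_id been recorded in the marker table?"""
    row = con.execute(
        "SELECT 1 FROM table_updates WHERE update_id = ?", (update_id,)
    ).fetchone()
    return row is not None


def touch(update_id, target_table):
    """Record the update_id, marking the load as complete."""
    con.execute(
        "INSERT OR REPLACE INTO table_updates VALUES (?, ?, datetime('now'))",
        (update_id, target_table),
    )


uid = "2020-10-26 13:59:26_letter_count"
before = exists(uid)
touch(uid, "letter_count")
after = exists(uid)
# A different update_id (for example, a newer mtime) is not yet marked.
new_version = exists("2020-10-26 13:59:57_letter_count")
print(before, after, new_version)
```

Changing the `update_id` is what makes a previously completed load look incomplete again.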

Below I've provided the code for our `StoreSQL` and `PrintSQL` tasks.  There are a couple of things worth noting.

* We have overridden the `complete()` method to check if the prior task `CountLetters` has been completed and if the `StoreSQL` task's output exists.  If either check returns `False` then the Task will run.
* We are creating a SQLite database called `my.db`.
* `self.output().touch()` is what marks the Task as complete and creates/updates the Marker Table.
* The `complete()` method of `PrintSQL` always returns `False` so that it always runs, for demonstration purposes.

In [22]:
#hide_input

# Used to display the files for demonstration purposes
paths = DisplayablePath.make_tree(OUTPUT_PATH)
print('Output Directory Tree:')
for path in paths:
    print(path.displayable())

Output Directory Tree:
output/
├── count_letters.csv
└── words.csv


In [23]:
from luigi.contrib import sqla
from sqlalchemy import create_engine

OUTPUT_PATH = Path('output')
connection_string = f"sqlite:///{OUTPUT_PATH}/my.db"

class StoreSQL(luigi.Task):
    connection_string = luigi.Parameter()
    target_table = luigi.Parameter()
    
    @property
    def update_id(self):
        mtime = os.path.getmtime(self.input().path)
        mtime = datetime.datetime.fromtimestamp(mtime).strftime("%Y-%m-%d %H:%M:%S")
        return mtime + '_' + self.target_table

    def complete(self):
        if not self.requires().complete():
            return False
        else:
            return self.output().exists()
        
    def requires(self):
        return CountLetters()

    def output(self):
        return sqla.SQLAlchemyTarget(
            connection_string=self.connection_string,
            target_table=self.target_table,
            update_id=self.update_id
        )

    def run(self):
        con = self.output().engine
        df = pd.read_csv(self.input().path)
        df.to_sql(name=self.target_table, con=con, if_exists='replace')

        # Update Marker Table
        self.output().touch()


class PrintSQL(luigi.Task):
    connection_string = luigi.Parameter()
    target_table = luigi.Parameter()
    
    def requires(self):
        return StoreSQL(self.connection_string, self.target_table)

    def complete(self):
        return False

    def output(self):
        pass

    def run(self):
        input = self.input()
        con = input.engine
        table = input.target_table
        
        print('// Letter Count Table')
        print(pd.read_sql(sql=table, con=con), end='\n\n')
        print('// Marker Table')
        print(pd.read_sql(sql='table_updates', con=con))

In [24]:
luigi.build([PrintSQL(connection_string, target_table='letter_count')], local_scheduler=True)

DEBUG: Checking if PrintSQL(connection_string=sqlite:///output/my.db, target_table=letter_count) is complete
DEBUG: Checking if StoreSQL(connection_string=sqlite:///output/my.db, target_table=letter_count) is complete
INFO: Informed scheduler that task   PrintSQL_sqlite____output_letter_count_4c5210e673   has status   PENDING
DEBUG: Checking if CountLetters() is complete
INFO: Informed scheduler that task   StoreSQL_sqlite____output_letter_count_4c5210e673   has status   PENDING
INFO: Informed scheduler that task   CountLetters__99914b932b   has status   DONE
INFO: Done scheduling tasks
INFO: Running Worker with 1 processes
DEBUG: Asking scheduler for work...
DEBUG: Pending tasks: 2
INFO: [pid 16140] Worker Worker(salt=698183415, workers=1, host=DESKTOP-BCU4BGH, username=Mike, pid=16140) running   StoreSQL(connection_string=sqlite:///output/my.db, target_table=letter_count)
INFO: [pid 16140] Worker Worker(salt=698183415, workers=1, host=DESKTOP-BCU4BGH, username=Mike, pid=16140) done  

// Letter Count Table
   index       words  letter_count
0      0       apple             5
1      1      banana             6
2      2  grapefruit            10
3      3      cherry             6
4      4      orange             6

// Marker Table
                          update_id  target_table                   inserted
0  2020-10-26 13:59:26_letter_count  letter_count 2020-10-26 13:59:41.795176


True

In [26]:
# Used to display the files for demonstration purposes
paths = DisplayablePath.make_tree(OUTPUT_PATH)
print('Output Directory Tree:')
for path in paths:
    print(path.displayable())

Output Directory Tree:
output/
├── count_letters.csv
├── my.db
└── words.csv


As you can see from above, our SQL Tasks have updated two tables, target and marker, and printed out the table results.  If you look at the Marker Table's `update_id` column you'll notice it is the concatenation of our `count_letters.csv` mtime and the target table name.

Now let's update our `words.csv` and see what happens when we run the task again.

In [27]:
words = ['apple','banana','grapefruit', 'cherry', 'orange', 'peach', 'strawberry']

df = pd.DataFrame(dict(words=words))
df.to_csv(OUTPUT_PATH/'words.csv', index=False)
luigi.build([PrintSQL(connection_string, target_table='letter_count')], local_scheduler=True)

DEBUG: Checking if PrintSQL(connection_string=sqlite:///output/my.db, target_table=letter_count) is complete
DEBUG: Checking if StoreSQL(connection_string=sqlite:///output/my.db, target_table=letter_count) is complete
INFO: Informed scheduler that task   PrintSQL_sqlite____output_letter_count_4c5210e673   has status   PENDING
DEBUG: Checking if CountLetters() is complete
INFO: Informed scheduler that task   StoreSQL_sqlite____output_letter_count_4c5210e673   has status   PENDING
DEBUG: Checking if Words() is complete
INFO: Informed scheduler that task   CountLetters__99914b932b   has status   PENDING
INFO: Informed scheduler that task   Words__99914b932b   has status   DONE
INFO: Done scheduling tasks
INFO: Running Worker with 1 processes
DEBUG: Asking scheduler for work...
DEBUG: Pending tasks: 3
INFO: [pid 16140] Worker Worker(salt=511017160, workers=1, host=DESKTOP-BCU4BGH, username=Mike, pid=16140) running   CountLetters()
INFO: [pid 16140] Worker Worker(salt=511017160, workers=1, 

// Letter Count Table
   index       words  letter_count
0      0       apple             5
1      1      banana             6
2      2  grapefruit            10
3      3      cherry             6
4      4      orange             6
5      5       peach             5
6      6  strawberry            10

// Marker Table
                          update_id  target_table                   inserted
0  2020-10-26 13:59:26_letter_count  letter_count 2020-10-26 13:59:41.795176
1  2020-10-26 13:59:57_letter_count  letter_count 2020-10-26 13:59:57.700856


True

As expected, our Letter Count table has been updated and the Marker Table has a new entry to show the update.

Let's run the task one more time without updating `words.csv` to really demonstrate what's occurring.

In [28]:
luigi.build([PrintSQL(connection_string, target_table='letter_count')], local_scheduler=True)

DEBUG: Checking if PrintSQL(connection_string=sqlite:///output/my.db, target_table=letter_count) is complete
DEBUG: Checking if StoreSQL(connection_string=sqlite:///output/my.db, target_table=letter_count) is complete
INFO: Informed scheduler that task   PrintSQL_sqlite____output_letter_count_4c5210e673   has status   PENDING
INFO: Informed scheduler that task   StoreSQL_sqlite____output_letter_count_4c5210e673   has status   DONE
INFO: Done scheduling tasks
INFO: Running Worker with 1 processes
DEBUG: Asking scheduler for work...
DEBUG: Pending tasks: 1
INFO: [pid 16140] Worker Worker(salt=975640648, workers=1, host=DESKTOP-BCU4BGH, username=Mike, pid=16140) running   PrintSQL(connection_string=sqlite:///output/my.db, target_table=letter_count)
INFO: [pid 16140] Worker Worker(salt=975640648, workers=1, host=DESKTOP-BCU4BGH, username=Mike, pid=16140) done      PrintSQL(connection_string=sqlite:///output/my.db, target_table=letter_count)
DEBUG: 1 running tasks, waiting for next task to 

// Letter Count Table
   index       words  letter_count
0      0       apple             5
1      1      banana             6
2      2  grapefruit            10
3      3      cherry             6
4      4      orange             6
5      5       peach             5
6      6  strawberry            10

// Marker Table
                          update_id  target_table                   inserted
0  2020-10-26 13:59:26_letter_count  letter_count 2020-10-26 13:59:41.795176
1  2020-10-26 13:59:57_letter_count  letter_count 2020-10-26 13:59:57.700856


True

As you can see, the only task that ran was `PrintSQL`, since it is always set to run.  Our other tasks didn't run, and note how the Marker Table was not updated.

In [29]:
#hide

# Remove Files
for file in OUTPUT_PATH.glob('*'):
    file.unlink()

## Resources
- https://stackoverflow.com/questions/40407936/mysql-targets-in-luigi-workflow/40423427#40423427
- https://stackoverflow.com/questions/40707004/using-luigi-to-update-postgres-table
- https://stackoverflow.com/questions/28793832/can-luigi-rerun-tasks-when-the-task-dependencies-become-out-of-date
- https://luigi.readthedocs.io/en/stable/_modules/luigi/contrib/sqla.html
- https://stackoverflow.com/questions/9727673/list-directory-tree-structure-in-python
- https://stackoverflow.com/questions/11349333/how-to-ignore-the-first-line-of-data-when-processing-csv-data
- https://stackoverflow.com/questions/35918605/how-to-delete-a-table-in-sqlalchemy
- https://stackoverflow.com/questions/11900553/sqlalchemy-table-already-exists
- https://stackoverflow.com/questions/237079/how-to-get-file-creation-modification-date-times-in-python
- https://stackoverflow.com/questions/48509083/how-to-make-a-parameter-available-to-all-luigi-tasks