# Flow-based Programming with Luigi for doing Science

A better way to make + manage data pipelines

# Harry Jack
harry.jack@outlook.com

https://github.com/hazbottles/luigi_talk


- 2013: B. Sc. (Mathematics) & B. Eng. (Chemical Engineering), __The University of Sydney__
- 2014 - 2017: Meteorologist / Forecast System Scientist, __Australian Bureau of Meteorology__
- 2018 - Present: Modeller / Lead Modeller, __Solcast__


# Data Pipelines

## Imperative Programming

```
data = []
dates = ['2019-04-01', '2019-04-02']
for date in dates:
    input1 = get_input1_data(date)
    output1 = make_output1(date, input1)
    
    input2 = get_input2_data(date)
    output2 = make_output2(date, input2, output1)
    
    data.append([output1, output2])
    
# debugging + recovering from errors?
# starting from half-done?
# making new outputs?
```

## Flow-based Programming

- specify Tasks with inputs+outputs
- let the framework decide how to execute the code
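
As a toy illustration of the two bullets above, here is a minimal sketch of a runner that works out execution order from declared dependencies and skips work that is already done. `ToyTask` and `build` are hypothetical names invented for this sketch; this is not how Luigi is implemented.

```python
class ToyTask:
    """Hypothetical stand-in for a framework task; not Luigi's API."""

    def __init__(self, name, requires=()):
        self.name = name
        self.requires = list(requires)
        self.done = False

    def complete(self):
        return self.done

    def run(self):
        self.done = True


def build(task, log):
    """Run `task` after its dependencies, skipping anything already complete."""
    if task.complete():
        return
    for dep in task.requires:
        build(dep, log)
    task.run()
    log.append(task.name)


# Same shape as the imperative example: output2 needs input2 and output1.
input1 = ToyTask('input1')
output1 = ToyTask('output1', requires=[input1])
input2 = ToyTask('input2')
output2 = ToyTask('output2', requires=[input2, output1])

input1.done = True  # pretend an earlier run already produced this

log = []
build(output2, log)
print(log)  # ['input2', 'output1', 'output2']
```

Calling `build(output2, log)` a second time runs nothing, which is the "starting from half-done" behaviour the imperative loop lacks.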

`luigi --module examples MakeData --first 2019-04-09 --last 2019-04-10`

![dependency graph](./example_dependency_graph.png)

## How can this make my data-pipelines better?


- modularise-ability
- reproduce-ability
- extend-ability
- port-ability
- parallelise-ability


## Luigi

https://github.com/spotify/luigi

https://luigi.readthedocs.io/en/latest/

### Installation

`pip install luigi`

`conda install luigi`

### Alternatives

- dask
- celery
- airflow

In [1]:
! python -V

Python 3.7.3


In [2]:
! pip show luigi

Name: luigi
Version: 2.8.3
Summary: Workflow mgmgt + task scheduling + dependency resolution
Home-page: https://github.com/spotify/luigi
Author: The Luigi Authors
Author-email: None
License: Apache License 2.0
Location: /home/harry/miniconda3/envs/luigi_talk/lib/python3.7/site-packages
Requires: tornado, python-dateutil, python-daemon
Required-by: 


In [1]:
from IPython.core.display import display, HTML
display(HTML("<style>.container { width:100% !important; }</style>"))

In [3]:
import datetime
import time
import os

import luigi

In [4]:
# make somewhere to dump luigi output
! rm -rf /tmp/luigi_talk_examples/
! mkdir /tmp/luigi_talk_examples/

# Basic Luigi Task

In [5]:
class DayName(luigi.Task):
    date = luigi.DateParameter() # cast to a datetime.date
    
    def output(self):
        return luigi.LocalTarget(
            # ISO-8601 date format on pain of death
            f'/tmp/luigi_talk_examples/DayName_{self.date:%Y-%m-%d}.txt'
        )
            
    def run(self):
            
        # simulate taking some time to process
        time.sleep(5)
        result = self.date.strftime('%A')  # self.date is a datetime.date
        
        # Number 1 Luigi gotcha: don't stream directly to output file!
        tmpfile = f'{self.output().path}.tmp'
        with open(tmpfile, 'w') as fd:
            fd.write(result)
        # make the output atomically
        os.rename(tmpfile, self.output().path)
            
# inspect the Task object
task = DayName(date=datetime.date(2019,4,9))
target = task.output()
print(f'task: {repr(task)}')
print(f'task.complete(): {task.complete()}')
print('')
print(f'target: {repr(target)}')
print(f'target.path: {target.path}')
print(f'target.exists(): {target.exists()}')

task: DayName(date=2019-04-09)
task.complete(): False

target: <luigi.local_target.LocalTarget object at 0x7f5a602a1668>
target.path: /tmp/luigi_talk_examples/DayName_2019-04-09.txt
target.exists(): False


In [6]:
# 1. check if tasks are complete `task.complete()`
# 2. schedule tasks which are not complete
# 3. run tasks `task.run()`

# command line: `luigi --module examples DayName --date 2019-04-09 --local-scheduler`
luigi.build(
    [DayName(date=datetime.date(2019,4,9))],
    local_scheduler=True,
)
print(DayName(date=datetime.date(2019,4,9)).complete())

INFO: Informed scheduler that task   DayName_2019_04_09_2e0ba28ae6   has status   PENDING
INFO: Done scheduling tasks
INFO: Running Worker with 1 processes
INFO: [pid 74] Worker Worker(salt=376876960, workers=1, host=DESKTOP-4FKKS4Q, username=harry, pid=74) running   DayName(date=2019-04-09)
INFO: [pid 74] Worker Worker(salt=376876960, workers=1, host=DESKTOP-4FKKS4Q, username=harry, pid=74) done      DayName(date=2019-04-09)
INFO: Informed scheduler that task   DayName_2019_04_09_2e0ba28ae6   has status   DONE
INFO: Worker Worker(salt=376876960, workers=1, host=DESKTOP-4FKKS4Q, username=harry, pid=74) was stopped. Shutting down Keep-Alive thread
INFO: 
===== Luigi Execution Summary =====

Scheduled 1 tasks of which:
* 1 ran successfully:
    - 1 DayName(date=2019-04-09)

This progress looks :) because there were no failed tasks or missing dependencies

===== Luigi Execution Summary =====



True


In [7]:
! ls /tmp/luigi_talk_examples

DayName_2019-04-09.txt


In [8]:
! cat /tmp/luigi_talk_examples/DayName_2019-04-09.txt

Tuesday

In [9]:
# 1. check if tasks are complete `task.complete()`
# 2. schedule tasks which are not complete
# 3. run tasks `task.run()`

# Task output already exists, so it isn't run!

# luigi --module examples DayName --date 2019-04-09 --local-scheduler
luigi.build(
    [DayName(date=datetime.date(2019,4,9))],
    local_scheduler=True,
)

INFO: Informed scheduler that task   DayName_2019_04_09_2e0ba28ae6   has status   DONE
INFO: Done scheduling tasks
INFO: Running Worker with 1 processes
INFO: Worker Worker(salt=706858153, workers=1, host=DESKTOP-4FKKS4Q, username=harry, pid=74) was stopped. Shutting down Keep-Alive thread
INFO: 
===== Luigi Execution Summary =====

Scheduled 1 tasks of which:
* 1 complete ones were encountered:
    - 1 DayName(date=2019-04-09)

Did not run any tasks
This progress looks :) because there were no failed tasks or missing dependencies

===== Luigi Execution Summary =====



True

# Importance of atomicity

In [10]:
class DayNameErrorful(luigi.Task):
    date = luigi.DateParameter() # a datetime.date
    
    def output(self):
        return luigi.LocalTarget(
            f'/tmp/luigi_talk_examples/DayNameErrorful_{self.date:%Y-%m-%d}.txt'
        )
            
    def run(self):
        time.sleep(3)
        with open(self.output().path, 'w') as fd:
            fd.write(self.date('%A'))  # will error but makes an empty file


# note: you get a proper stack trace
luigi.build(
    [DayNameErrorful(date=datetime.date(2019,4,9))],
    local_scheduler=True,
)

INFO: Informed scheduler that task   DayNameErrorful_2019_04_09_2e0ba28ae6   has status   PENDING
INFO: Done scheduling tasks
INFO: Running Worker with 1 processes
INFO: [pid 74] Worker Worker(salt=574673838, workers=1, host=DESKTOP-4FKKS4Q, username=harry, pid=74) running   DayNameErrorful(date=2019-04-09)
ERROR: [pid 74] Worker Worker(salt=574673838, workers=1, host=DESKTOP-4FKKS4Q, username=harry, pid=74) failed    DayNameErrorful(date=2019-04-09)
Traceback (most recent call last):
  File "/home/harry/miniconda3/envs/luigi_talk/lib/python3.7/site-packages/luigi/worker.py", line 199, in run
    new_deps = self._run_get_new_deps()
  File "/home/harry/miniconda3/envs/luigi_talk/lib/python3.7/site-packages/luigi/worker.py", line 139, in _run_get_new_deps
    task_gen = self.task.run()
  File "<ipython-input-10-2becc3ea137d>", line 12, in run
    fd.write(self.date('%A'))  # will error but makes an empty file
TypeError: 'datetime.date' object is not callable
INFO: Informed scheduler that

False

In [11]:
# we've made an empty file by mistake
! ls /tmp/luigi_talk_examples/ -lh

total 0
-rw-rw-rw- 1 harry harry 7 Apr  9 16:38 DayName_2019-04-09.txt
-rw-rw-rw- 1 harry harry 0 Apr  9 16:38 DayNameErrorful_2019-04-09.txt


In [12]:
# Future runs think it has succeeded!
luigi.build(
    [DayNameErrorful(date=datetime.date(2019,4,9))],
    local_scheduler=True,
)

INFO: Informed scheduler that task   DayNameErrorful_2019_04_09_2e0ba28ae6   has status   DONE
INFO: Done scheduling tasks
INFO: Running Worker with 1 processes
INFO: Worker Worker(salt=942812875, workers=1, host=DESKTOP-4FKKS4Q, username=harry, pid=74) was stopped. Shutting down Keep-Alive thread
INFO: 
===== Luigi Execution Summary =====

Scheduled 1 tasks of which:
* 1 complete ones were encountered:
    - 1 DayNameErrorful(date=2019-04-09)

Did not run any tasks
This progress looks :) because there were no failed tasks or missing dependencies

===== Luigi Execution Summary =====



True
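
One way to avoid the empty-file trap above is to wrap the write-to-temp-then-rename pattern from `DayName.run()` in a helper. The sketch below uses only the standard library; `atomic_write_text` is a hypothetical name, and the atomicity relies on the temp file living on the same filesystem as the destination.

```python
import os
import tempfile


def atomic_write_text(path, text):
    """Write `text` to `path` so the file only appears once fully written."""
    directory = os.path.dirname(os.path.abspath(path))
    # Create the temp file next to the destination so the rename stays on one filesystem.
    fd, tmp_path = tempfile.mkstemp(dir=directory, suffix='.tmp')
    try:
        with os.fdopen(fd, 'w') as tmp:
            tmp.write(text)
        os.replace(tmp_path, path)  # the output appears in a single step
    except BaseException:
        os.remove(tmp_path)  # clean up the partial temp file, then re-raise
        raise


demo_dir = tempfile.mkdtemp()
good = os.path.join(demo_dir, 'good.txt')
bad = os.path.join(demo_dir, 'bad.txt')

atomic_write_text(good, 'Tuesday')
try:
    atomic_write_text(bad, None)  # writing None raises TypeError mid-write
except TypeError:
    pass

print(os.path.exists(good), os.path.exists(bad))  # True False
```

Because the failed write never creates `bad.txt`, a later `exists()` check would correctly report the task as incomplete.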

# Task Dependencies

In [13]:
class DayNameFrench(luigi.Task):
    date = luigi.DateParameter()
    
    def requires(self):
        return DayName(date=self.date)
    
    def output(self):
        return luigi.LocalTarget(f'/tmp/luigi_talk_examples/DayNameFrench_{self.date:%Y-%m-%d}.txt')

    def run(self):
        with open(self.input().path, 'r') as fd:
            english = fd.read()
        french = f"Oh la la c'est le {english}"
        time.sleep(5)
        
        tmpfile = f'{self.output().path}.tmp'
        with open(tmpfile, 'w') as fd:
            fd.write(french)
        os.rename(tmpfile, self.output().path)

In [14]:
! ls /tmp/luigi_talk_examples/

DayName_2019-04-09.txt	DayNameErrorful_2019-04-09.txt


In [15]:
# Uses output of DayName(2019-04-09)
# i.e. we pick up the computation half-way through, without having to write special logic to do so!

# luigi --module examples DayNameFrench --date 2019-04-09 --local-scheduler
luigi.build([DayNameFrench(date=datetime.date(2019,4,9))], local_scheduler=True)

INFO: Informed scheduler that task   DayNameFrench_2019_04_09_2e0ba28ae6   has status   PENDING
INFO: Informed scheduler that task   DayName_2019_04_09_2e0ba28ae6   has status   DONE
INFO: Done scheduling tasks
INFO: Running Worker with 1 processes
INFO: [pid 74] Worker Worker(salt=709494939, workers=1, host=DESKTOP-4FKKS4Q, username=harry, pid=74) running   DayNameFrench(date=2019-04-09)
INFO: [pid 74] Worker Worker(salt=709494939, workers=1, host=DESKTOP-4FKKS4Q, username=harry, pid=74) done      DayNameFrench(date=2019-04-09)
INFO: Informed scheduler that task   DayNameFrench_2019_04_09_2e0ba28ae6   has status   DONE
INFO: Worker Worker(salt=709494939, workers=1, host=DESKTOP-4FKKS4Q, username=harry, pid=74) was stopped. Shutting down Keep-Alive thread
INFO: 
===== Luigi Execution Summary =====

Scheduled 2 tasks of which:
* 1 complete ones were encountered:
    - 1 DayName(date=2019-04-09)
* 1 ran successfully:
    - 1 DayNameFrench(date=2019-04-09)

This progress looks :) because 

True

In [16]:
! ls /tmp/luigi_talk_examples/

DayName_2019-04-09.txt		DayNameFrench_2019-04-09.txt
DayNameErrorful_2019-04-09.txt


In [17]:
# Runs both DayName(2019-04-08) and DayNameFrench(2019-04-08)

# luigi --module examples DayNameFrench --date 2019-04-08 --local-scheduler
luigi.build([DayNameFrench(date=datetime.date(2019,4,8))], local_scheduler=True)

INFO: Informed scheduler that task   DayNameFrench_2019_04_08_f7ae138f92   has status   PENDING
INFO: Informed scheduler that task   DayName_2019_04_08_f7ae138f92   has status   PENDING
INFO: Done scheduling tasks
INFO: Running Worker with 1 processes
INFO: [pid 74] Worker Worker(salt=701873738, workers=1, host=DESKTOP-4FKKS4Q, username=harry, pid=74) running   DayName(date=2019-04-08)
INFO: [pid 74] Worker Worker(salt=701873738, workers=1, host=DESKTOP-4FKKS4Q, username=harry, pid=74) done      DayName(date=2019-04-08)
INFO: Informed scheduler that task   DayName_2019_04_08_f7ae138f92   has status   DONE
INFO: [pid 74] Worker Worker(salt=701873738, workers=1, host=DESKTOP-4FKKS4Q, username=harry, pid=74) running   DayNameFrench(date=2019-04-08)
INFO: [pid 74] Worker Worker(salt=701873738, workers=1, host=DESKTOP-4FKKS4Q, username=harry, pid=74) done      DayNameFrench(date=2019-04-08)
INFO: Informed scheduler that task   DayNameFrench_2019_04_08_f7ae138f92   has status   DONE
INFO: Wo

True

In [18]:
! ls /tmp/luigi_talk_examples/

DayName_2019-04-08.txt		DayNameFrench_2019-04-08.txt
DayName_2019-04-09.txt		DayNameFrench_2019-04-09.txt
DayNameErrorful_2019-04-09.txt


# Multiple Dependencies

In [19]:
# Dependencies on multiple tasks

def iter_days(first, last):
    while first <= last:
        yield first
        first += datetime.timedelta(days=1)

class SummaryCsv(luigi.Task):
    first = luigi.DateParameter()
    last = luigi.DateParameter()
    
    def requires(self):
        return {
            date: [DayName(date), DayNameFrench(date)]
            for date in iter_days(self.first, self.last)
        }
    
    def output(self):
        return luigi.LocalTarget(
            f'/tmp/luigi_talk_examples/SummaryCsv_{self.first:%Y-%m-%d}_{self.last:%Y-%m-%d}.csv'
        )
    
    def run(self):
        lines = ['date,english,french']
        
        for date, (english_target, french_target) in self.input().items():
            # read each upstream output, closing the file handle when done
            with english_target.open('r') as fd:
                english = fd.read()
            with french_target.open('r') as fd:
                french = fd.read()
            lines.append(f'{date:%Y-%m-%d},{english},{french}')
        
        tmpfile = f'{self.output().path}.tmp'
        with open(tmpfile, 'w') as fd:
            fd.write('\n'.join(lines))
        os.rename(tmpfile, self.output().path)

![SummaryCsv(first=2019-04-01, last=2019-04-02) Dependency Graph](./SummaryCsv_dependency_graph.png)

In [20]:
# luigi --module examples SummaryCsv --first 2019-04-01 --last 2019-04-02 --local-scheduler
luigi.build(
    [SummaryCsv(first=datetime.date(2019,4,1), last=datetime.date(2019,4,2))],
    local_scheduler=True
)

INFO: Informed scheduler that task   SummaryCsv_2019_04_01_2019_04_02_2fcce54d60   has status   PENDING
INFO: Informed scheduler that task   DayNameFrench_2019_04_02_2ffbc63b3b   has status   PENDING
INFO: Informed scheduler that task   DayName_2019_04_02_2ffbc63b3b   has status   PENDING
INFO: Informed scheduler that task   DayNameFrench_2019_04_01_5b092e82ff   has status   PENDING
INFO: Informed scheduler that task   DayName_2019_04_01_5b092e82ff   has status   PENDING
INFO: Done scheduling tasks
INFO: Running Worker with 1 processes
INFO: [pid 74] Worker Worker(salt=249099519, workers=1, host=DESKTOP-4FKKS4Q, username=harry, pid=74) running   DayName(date=2019-04-02)
INFO: [pid 74] Worker Worker(salt=249099519, workers=1, host=DESKTOP-4FKKS4Q, username=harry, pid=74) done      DayName(date=2019-04-02)
INFO: Informed scheduler that task   DayName_2019_04_02_2ffbc63b3b   has status   DONE
INFO: [pid 74] Worker Worker(salt=249099519, workers=1, host=DESKTOP-4FKKS4Q, username=harry, pid

True

In [21]:
! cat /tmp/luigi_talk_examples/SummaryCsv_2019-04-01_2019-04-02.csv

date,english,french
2019-04-01,Monday,Oh la la c'est le Monday
2019-04-02,Tuesday,Oh la la c'est le Tuesday
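
`SummaryCsv.run()` relies on `self.input()` having the same nested shape as whatever `requires()` returned (here a dict of lists). The sketch below imitates that mapping with plain Python; `FakeTask` and `map_outputs` are hypothetical names for illustration, not Luigi functions.

```python
import datetime


class FakeTask:
    """Hypothetical task whose output is just a path string."""

    def __init__(self, name, date):
        self.name = name
        self.date = date

    def output(self):
        return f'{self.name}_{self.date:%Y-%m-%d}.txt'


def map_outputs(struct):
    """Replace every task in a nested dict/list structure with its output."""
    if isinstance(struct, dict):
        return {key: map_outputs(value) for key, value in struct.items()}
    if isinstance(struct, (list, tuple)):
        return [map_outputs(item) for item in struct]
    return struct.output()


days = [datetime.date(2019, 4, 1), datetime.date(2019, 4, 2)]
requires = {
    day: [FakeTask('DayName', day), FakeTask('DayNameFrench', day)]
    for day in days
}

inputs = map_outputs(requires)
for day, (english, french) in inputs.items():
    print(day, english, french)
```

The keys and the two-element lists survive the mapping, which is why the loop can unpack `(english_target, french_target)` per date.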

# Parallelisation

In [None]:
# luigi --module examples SummaryCsv --first 2019-04-01 --last 2019-04-09 --workers 4 --local-scheduler
luigi.build(
    [SummaryCsv(first=datetime.date(2019,4,1), last=datetime.date(2019,4,9))],
    local_scheduler=True,
    workers=4
)
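
Extra workers only help where tasks do not depend on each other. As a rough sketch of where that parallelism comes from, the hypothetical `parallel_waves` function below groups a dependency graph into "waves" of mutually independent tasks. Luigi's scheduler does not necessarily work in waves (it can hand out a task as soon as its own dependencies are done), so treat this as an illustration only.

```python
def parallel_waves(dependencies):
    """Group tasks into waves; tasks within a wave do not depend on each other."""
    remaining = {task: set(deps) for task, deps in dependencies.items()}
    finished = set()
    waves = []
    while remaining:
        ready = sorted(task for task, deps in remaining.items() if deps <= finished)
        if not ready:
            raise ValueError('dependency cycle detected')
        waves.append(ready)
        finished.update(ready)
        for task in ready:
            del remaining[task]
    return waves


# Hypothetical two-day version of the SummaryCsv graph.
graph = {
    'DayName(04-01)': [],
    'DayName(04-02)': [],
    'DayNameFrench(04-01)': ['DayName(04-01)'],
    'DayNameFrench(04-02)': ['DayName(04-02)'],
    'SummaryCsv': [
        'DayName(04-01)', 'DayName(04-02)',
        'DayNameFrench(04-01)', 'DayNameFrench(04-02)',
    ],
}

for wave in parallel_waves(graph):
    print(wave)
```

Here the two `DayName` tasks can run side by side, then the two `DayNameFrench` tasks, and `SummaryCsv` has to wait for all of them.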

# How can this make my data-pipelines better?

- modularise-ability
- reproduce-ability
- extend-ability
- port-ability
- parallelise-ability

Because Luigi:
- forces you to be explicit about each Task's inputs and outputs
- gives you a consistent framework (Targets) for reading and writing those inputs and outputs

# Calling luigi from command-line

This is the recommended way to run Luigi!

`luigi --module <module_name> <TaskName> --<param-name> <param-value> --local-scheduler`

e.g.

`luigi --module luigi_talk_examples DayName --date 2019-04-09 --local-scheduler`

# Using the central scheduler

Configure: https://luigi.readthedocs.io/en/stable/central_scheduler.html

- start the scheduler daemon with `luigid`
- its web interface is served at `localhost:8082` by default

On each worker machine (note: no `--local-scheduler`): `luigi --module my_module MyTask --my-param <param-value> --workers X`

1. Machine tells central scheduler which tasks it is prepared to run and how many workers it has.
1. Central scheduler tells machine's workers when to run those tasks.

![Central scheduler screenshot](./luigi_central_scheduler.png)

# Some guidance

#### Debugging
- `breakpoint()` / `import pdb; pdb.set_trace()` inside a Task's `.run()` method works when you run with `--local-scheduler` and `--workers 1`

#### Targets
- `task.output()` does not have to return a local-filesystem `luigi.LocalTarget`; any object with an `.exists()` method will do.
- e.g. `.exists()` could run a SQL query to check whether a result is already stored
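
To make the SQL idea concrete, here is a minimal sketch of a target whose `exists()` is a database lookup, using the standard-library `sqlite3` module. The class name `SqliteRowTarget`, the `completed` table and the `touch()` method are all assumptions made for this example; in a real pipeline you would probably subclass `luigi.Target` and call `touch()` at the end of the Task's `run()`.

```python
import os
import sqlite3
import tempfile


class SqliteRowTarget:
    """Hypothetical target: 'exists' when a marker row is present in a SQLite table."""

    def __init__(self, db_path, task_id):
        self.db_path = db_path
        self.task_id = task_id

    def _connect(self):
        conn = sqlite3.connect(self.db_path)
        conn.execute('CREATE TABLE IF NOT EXISTS completed (task_id TEXT PRIMARY KEY)')
        return conn

    def exists(self):
        conn = self._connect()
        try:
            row = conn.execute(
                'SELECT 1 FROM completed WHERE task_id = ?', (self.task_id,)
            ).fetchone()
        finally:
            conn.close()
        return row is not None

    def touch(self):
        """Record completion; call this only after the real work has succeeded."""
        conn = self._connect()
        try:
            with conn:  # commits the insert, or rolls back on error
                conn.execute('INSERT OR IGNORE INTO completed VALUES (?)', (self.task_id,))
        finally:
            conn.close()


db_path = os.path.join(tempfile.mkdtemp(), 'tasks.db')
target = SqliteRowTarget(db_path, 'DayName_2019-04-09')
print(target.exists())  # False
target.touch()
print(target.exists())  # True
```

Inserting the marker row in a single committed statement plays the same role as the atomic rename for file outputs: the target only "exists" once the work is finished.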

#### Making Tasks
- keep input parameters simple
- unique input parameters -> unique output (if two parameter sets share an output path, Luigi cannot tell which Task is complete)
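
One way to follow the last rule is to derive the output path mechanically from the task name and every parameter. The helper below is a hypothetical sketch (the name `output_path` and the `key=value` naming scheme are assumptions, not a Luigi convention).

```python
import datetime


def output_path(task_name, **params):
    """Build a deterministic file path from a task name and its parameters."""
    parts = [task_name]
    for key in sorted(params):  # sorted so keyword order does not change the path
        value = params[key]
        if isinstance(value, datetime.date):
            value = value.isoformat()  # ISO-8601, as in the examples above
        parts.append(f'{key}={value}')
    return '/tmp/luigi_talk_examples/' + '_'.join(parts) + '.txt'


a = output_path('DayName', date=datetime.date(2019, 4, 8))
b = output_path('DayName', date=datetime.date(2019, 4, 9))
print(a)  # /tmp/luigi_talk_examples/DayName_date=2019-04-08.txt
print(a != b)  # True
```

Keeping parameters simple (dates, strings, numbers) is what makes a scheme like this practical.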