
# `luigi` in the notebook


## Developing `luigi` jobs in a notebook

* Debugging luigi in the notebook
* Using targets effectively
* pandas and scikit-learn

In [8]:
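from __future__ import print_function
from luigi import *
import pandas as pd
import sklearn.datasets
from toolz.curried import compose, do  # assumption: source of the `do` and `compose` helpers used in `run`
from whatever.boilerplate import sklearn as bp
from whatever.chain import _x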

In [2]:
iris = bp.load('iris')

In [3]:
df = pd.DataFrame(*iris[:2])

In [4]:
df.sample(2)

Unnamed: 0,0,1,2,3
0,5.1,3.3,1.7,0.5
2,6.0,3.0,4.8,1.8


> Luigi generalizes the read and write problem

The simplest utility luigi provides is the target: an object that wraps a storage location and knows how to open it for reading or writing.

In [21]:
target = LocalTarget('iris.csv')
with target.open('w') as f: 
    df.to_csv(f)

> Tasks use targets to decide what needs to be scheduled and where to read and write when they run.

In [22]:
class MyTask(Task): pass

A common pattern: wrap an existing target in an `ExternalTask` so that other tasks can depend on it.

In [34]:
class ExternalizedTarget(ExternalTask):
    def output(self): 
        return target

def requires(self):
    return ExternalizedTarget()
MyTask.requires = requires

> `requires` returns a task.

In [24]:
MyTask().requires()

ExternalizedTarget()

> `input` returns the target that the required task outputs.

In [35]:
MyTask().input()

<luigi.file.LocalTarget at 0x11ce6a630>

> `output` declares another target: the file this task writes.

In [36]:
def output(self): 
    return LocalTarget('bar.csv')
MyTask.output = output

> `run` is where the computation happens.

In [37]:
def run(self):
    with self.input().open('r') as f: 
        df = pd.read_csv(f)

    value = (
        df.describe(include='all')
        # Print something to stdout
        .pipe(do(compose(print, "{} rows".format, len)))
    )

    with self.output().open('w') as f:
        value.to_csv(f)
MyTask.run = run

> Calling `run` directly executes the task without involving the scheduler.

In [39]:
MyTask().run()

8 rows


## `build`: running tasks with luigi's scheduler

In [40]:
build([MyTask()], local_scheduler=True)

DEBUG: Checking if MyTask() is complete
INFO: Informed scheduler that task   MyTask__99914b932b   has status   DONE
INFO: Done scheduling tasks
INFO: Running Worker with 1 processes
DEBUG: Asking scheduler for work...
DEBUG: Done
DEBUG: There are no more tasks to run at this time
INFO: Worker Worker(salt=234612706, workers=1, host=Admins-MacBook-Pro.local, username=tonyfast, pid=66619) was stopped. Shutting down Keep-Alive thread
INFO: 
===== Luigi Execution Summary =====

Scheduled 1 tasks of which:
* 1 present dependencies were encountered:
    - 1 MyTask()

Did not run any tasks
This progress looks :) because there were no failed tasks or missing external dependencies

===== Luigi Execution Summary =====



True

In [33]:
!rm bar.csv
build([MyTask()], local_scheduler=True)

DEBUG: Checking if MyTask() is complete
DEBUG: Checking if ExternalizedTarget() is complete
INFO: Informed scheduler that task   MyTask__99914b932b   has status   PENDING
INFO: Informed scheduler that task   ExternalizedTarget__99914b932b   has status   DONE
INFO: Done scheduling tasks
INFO: Running Worker with 1 processes
DEBUG: Asking scheduler for work...
DEBUG: Pending tasks: 1
INFO: [pid 66619] Worker Worker(salt=890415156, workers=1, host=Admins-MacBook-Pro.local, username=tonyfast, pid=66619) running   MyTask()
INFO: [pid 66619] Worker Worker(salt=890415156, workers=1, host=Admins-MacBook-Pro.local, username=tonyfast, pid=66619) done      MyTask()
DEBUG: 1 running tasks, waiting for next task to finish
INFO: Informed scheduler that task   MyTask__99914b932b   has status   DONE
DEBUG: Asking scheduler for work...
DEBUG: Done
DEBUG: There are no more tasks to run at this time
INFO: Worker Worker(salt=890415156, workers=1, host=Admins-MacBook-Pro.local, username=tonyfast, pid=66619

8 rows


True

In [None]:
!rm bar.csv
!luigi --local-scheduler 

In [31]:
with MyTask().output().open('r') as f:
    df2 = pd.read_csv(f)

In [32]:
df2

Unnamed: 0.2,Unnamed: 0,Unnamed: 0.1,0,1,2,3
0,count,150.0,150.0,150.0,150.0,150.0
1,mean,1.0,5.843333,3.054,3.758667,1.198667
2,std,0.819232,0.828066,0.433594,1.76442,0.763161
3,min,0.0,4.3,2.0,1.0,0.1
4,25%,0.0,5.1,2.8,1.6,0.3
5,50%,1.0,5.8,3.0,4.35,1.3
6,75%,2.0,6.4,3.3,5.1,1.8
7,max,2.0,7.9,4.4,6.9,2.5
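
The `Unnamed: 0` style columns appear when `to_csv` writes the index as an unlabelled first column and `read_csv` reads it back as ordinary data. A minimal pandas-only sketch, with a hypothetical two-row frame, of how `index_col=0` restores the index instead:

```python
import io

import pandas as pd

frame = pd.DataFrame({'x': [1.0, 2.0]}, index=['first', 'second'])
csv_text = frame.to_csv()  # the index is written as an unlabelled first column

# Without index_col, the old index comes back as a data column.
naive = pd.read_csv(io.StringIO(csv_text))

# With index_col=0, the first column is used as the index again.
restored = pd.read_csv(io.StringIO(csv_text), index_col=0)
```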
