# Data Pipelines

```python
%matplotlib inline
import matplotlib.pyplot as plt
import numpy as np
import pandas as pd
from scipy import stats
import seaborn as sns
import warnings
import random
from datetime import datetime

# Seed with a numeric timestamp; random.seed() rejects datetime objects on Python 3.11+
random.seed(datetime.now().timestamp())
warnings.filterwarnings('ignore')

# Make plots larger
plt.rcParams['figure.figsize'] = (10, 6)
```

A data pipeline is a set of data processing elements connected in series, where the output of one element is the input of the next one. The elements of a pipeline are often executed in parallel or in time-sliced fashion; in that case, some amount of buffer storage is often inserted between elements.

In software pipelines, commands are written so that the output of one operation is automatically fed to the next. The Unix system call `pipe` is a classic example of this concept, although other operating systems support pipes as well.
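
As a rough illustration of elements connected in series, the sketch below chains three Python generators so that each stage consumes the output of the previous one. The stage names (`read_lines`, `strip_blank`, `to_upper`) and the `run_pipeline` helper are made up for this example; they are not part of any library.

```python
def read_lines(text):
    """Stage 1: emit the input one line at a time."""
    for line in text.splitlines():
        yield line


def strip_blank(lines):
    """Stage 2: drop empty lines and trim surrounding whitespace."""
    for line in lines:
        if line.strip():
            yield line.strip()


def to_upper(lines):
    """Stage 3: convert each remaining line to upper case."""
    for line in lines:
        yield line.upper()


def run_pipeline(text):
    # The output of each stage is the input of the next one.
    return list(to_upper(strip_blank(read_lines(text))))


result = run_pipeline("alpha\n\n  beta \ngamma\n")
print(result)
```

Because generators are lazy, each line flows through all three stages before the next line is read, which is similar in spirit to how a Unix pipe streams data between processes.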


## Queue


In computer science, a [queue](https://en.wikipedia.org/wiki/Queue_(abstract_data_type)) is a particular kind of abstract data type or collection in which the entities in the collection are kept in order and the principal (or only) operations on the collection are the addition of entities to the rear terminal position, known as *enqueue*, and the removal of entities from the front terminal position, known as *dequeue*. This makes the queue a First-In-First-Out (FIFO) data structure. In a FIFO data structure, the first element added to the queue will be the first one to be removed. This is equivalent to the requirement that once a new element is added, all elements that were added before have to be removed before the new element can be removed. Often a *peek* or *front* operation is also provided, returning the value of the front element without dequeuing it. A queue is an example of a linear data structure, or more abstractly a sequential collection.

![Representation of a FIFO (first in, first out) queue](http://nikbearbrown.com/YouTube/MachineLearning/IMG/405px-Data_Queue.svg.png)  

_Representation of a FIFO (first in, first out) queue_
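
The enqueue, dequeue, and peek operations described above can be sketched with `collections.deque` from the Python standard library. The `FifoQueue` class name is an assumption made for this example; it is a minimal sketch, not a production queue.

```python
from collections import deque


class FifoQueue:
    """Minimal FIFO queue built on collections.deque."""

    def __init__(self):
        self._items = deque()

    def enqueue(self, item):
        # Add to the rear of the queue.
        self._items.append(item)

    def dequeue(self):
        # Remove from the front of the queue.
        return self._items.popleft()

    def peek(self):
        # Look at the front element without removing it.
        return self._items[0]

    def __len__(self):
        return len(self._items)


q = FifoQueue()
for job in ["a", "b", "c"]:
    q.enqueue(job)

first = q.peek()
order = [q.dequeue() for _ in range(len(q))]
print(first, order)
```

A `deque` is used rather than a list because removing from the front of a list takes time proportional to its length, while `popleft()` on a deque is constant time.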
 

## Directed acyclic graph

In mathematics and computer science, a [directed acyclic graph](https://en.wikipedia.org/wiki/Directed_acyclic_graph)
(**DAG**) is a finite directed graph with no directed cycles. That
is, it consists of finitely many vertices and edges, with each edge
directed from one vertex to another, such that there is no way to start
at any vertex and follow a consistently-directed sequence of edges that
eventually loops back to that vertex again. Equivalently, a DAG is a directed graph
that has a topological ordering, a sequence of the vertices such that
every edge is directed from earlier to later in the sequence.

![A topological ordering of a directed acyclic graph](http://nikbearbrown.com/YouTube/MachineLearning/IMG/Topological_Ordering.svg.png)

_A topological ordering of a directed acyclic graph_  


![A DAG](http://nikbearbrown.com/YouTube/MachineLearning/IMG/252px-DAG-Tred-G.svg.png)

_A DAG_
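
To make the idea of a topological ordering concrete, here is a minimal sketch of Kahn's algorithm. The `topological_order` function and the pipeline-style vertex names are assumptions chosen for illustration. The graph is given as a dictionary mapping each vertex to the vertices its edges point to.

```python
from collections import deque


def topological_order(edges):
    """Return the vertices ordered so that every edge points forward."""
    # Count incoming edges for every vertex.
    indegree = {vertex: 0 for vertex in edges}
    for targets in edges.values():
        for target in targets:
            indegree[target] = indegree.get(target, 0) + 1

    # Vertices with no incoming edges can be placed first.
    ready = deque(sorted(v for v, degree in indegree.items() if degree == 0))
    order = []
    while ready:
        vertex = ready.popleft()
        order.append(vertex)
        for target in edges.get(vertex, []):
            indegree[target] -= 1
            if indegree[target] == 0:
                ready.append(target)

    # If some vertex was never released, the graph has a directed cycle.
    if len(order) != len(indegree):
        raise ValueError("graph contains a directed cycle")
    return order


dag = {
    "extract": ["clean"],
    "clean": ["aggregate", "validate"],
    "aggregate": ["report"],
    "validate": ["report"],
    "report": [],
}
order = topological_order(dag)
print(order)
```

Workflow tools covered later in this section rely on the same property: because task dependencies form a DAG, there is always an order in which every task runs after the tasks it depends on.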

## RabbitMQ 

[RabbitMQ](https://www.rabbitmq.com) is a message broker: it accepts and forwards messages. You can think about it as a post office: when you put the mail that you want posting in a post box, you can be sure that Mr. Postman will eventually deliver the mail to your recipient. In this analogy, RabbitMQ is a post box, a post office and a postman.

The major difference between RabbitMQ and the post office is that it doesn't deal with paper, instead it accepts, stores and forwards binary blobs of data ‒ messages.

RabbitMQ, and messaging in general, uses some jargon.

* Producing - _Producing_ means nothing more than sending. A program that sends messages is a producer.

* Queue - A _queue_ is the name for a post box which lives inside RabbitMQ. Although messages flow through RabbitMQ and your applications, they can only be stored inside a queue. A queue is only bound by the host's memory and disk limits; it's essentially a large message buffer. Many producers can send messages that go to one queue, and many consumers can try to receive data from one queue.

* Consuming - _Consuming_ has a similar meaning to receiving. A consumer is a program that mostly waits to receive messages.
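
The producer, queue, and consumer roles can be sketched inside a single Python process with the standard library's `queue.Queue` and a thread. This is only an in-process analogy, not RabbitMQ itself: there is no broker, no network, and no persistence, and the `None` sentinel used to stop the consumer is a convention of this example.

```python
import queue
import threading


def producer(mailbox, messages):
    """Send each message, then a sentinel meaning 'no more messages'."""
    for message in messages:
        mailbox.put(message)
    mailbox.put(None)


def consumer(mailbox, received):
    """Wait for messages and collect them until the sentinel arrives."""
    while True:
        message = mailbox.get()  # blocks until a message is available
        if message is None:
            break
        received.append(message)


mailbox = queue.Queue()  # plays the role of the message buffer
received = []

worker = threading.Thread(target=consumer, args=(mailbox, received))
worker.start()
producer(mailbox, ["hello", "world"])
worker.join()
print(received)
```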

### RabbitMQ   Tutorials   

* [https://www.rabbitmq.com/tutorials/tutorial-one-python.html]( https://www.rabbitmq.com/tutorials/tutorial-one-python.html)     
* [https://www.rabbitmq.com/getstarted.html](https://www.rabbitmq.com/getstarted.html)  
* [gitHub rabbitmq-tutorials](https://github.com/rabbitmq/rabbitmq-tutorials) 
 

## Celery

**Celery** is an open source asynchronous task queue (or job queue)
based on distributed message passing. While it supports
scheduling, its focus is on operations in real time. It is written in Python and heavily used by the Python community for task-based workloads.



### Overview   

The execution units, called *tasks*, are executed concurrently on one or
more worker nodes using multiprocessing, eventlet or gevent. Tasks
can execute asynchronously (in the background) or synchronously (wait
until ready). Celery is used in production systems, for instance at
Instagram, to process millions of tasks every day.
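
The difference between asynchronous and synchronous execution can be illustrated with `concurrent.futures` from the standard library. This is a stand-in used purely for illustration and is not Celery's API: Celery sends tasks to separate worker processes through a message broker, whereas this sketch uses a local thread pool.

```python
from concurrent.futures import ThreadPoolExecutor


def add(x, y):
    return x + y


with ThreadPoolExecutor(max_workers=2) as pool:
    # Asynchronous: submit() returns a future immediately,
    # while the work runs in the background.
    future = pool.submit(add, 4, 4)

    # Synchronous: result() blocks until the value is ready.
    background_sum = future.result(timeout=5)

# A plain function call is synchronous from start to finish.
direct_sum = add(4, 4)
print(background_sum, direct_sum)
```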

### Technology    

Celery is written in Python, but the protocol can be implemented in
any language. It can also operate with other languages using
webhooks.

The recommended message brokers are RabbitMQ and Redis. Additionally,
MongoDB, Beanstalk, Amazon SQS, CouchDB, IronMQ, and databases
(using SQLAlchemy or the Django ORM) are supported with
*experimental* status.




## First Steps with Celery  

Celery is a task queue with batteries included. It’s easy to use so that
you can get started without learning the full complexities of the
problem it solves. It’s designed around best practices so that your
product can scale and integrate with other languages, and it comes with
the tools and support you need to run such a system in production.

In this tutorial you’ll learn the absolute basics of using Celery.

You will learn about:

-   Choosing and installing a message transport (broker).   
-   Installing Celery and creating your first task.   
-   Starting the worker and calling tasks.    
-   Keeping track of tasks as they transition through different states,
    and inspecting return values.   

Celery may seem daunting at first - but don’t worry - this tutorial will
get you started in no time. It’s deliberately kept simple, so as to not
confuse you with advanced features. After you have finished this
tutorial, it’s a good idea to browse the rest of the documentation. For
example, the [Next Steps](http://docs.celeryproject.org/en/latest/getting-started/next-steps.html#next-steps) tutorial will showcase
Celery’s capabilities.


### Choosing a Broker 

Celery requires a solution to send and receive
messages; usually this comes in the form of a separate service called a
message broker.

There are several choices available, including:

#### RabbitMQ

RabbitMQ is feature-complete, stable, durable and easy to
install. It’s an excellent choice for a production environment. Detailed
information about using RabbitMQ with Celery:

[Using RabbitMQ](http://docs.celeryproject.org/en/latest/getting-started/brokers/rabbitmq.html)

If you’re using Ubuntu or Debian, install RabbitMQ by
executing this command:

```bash
   sudo apt-get install rabbitmq-server 
```

When the command completes, the
broker will already be running in the background, ready to move messages
for you. The installer reports this with the line `Starting rabbitmq-server: SUCCESS`.

Don’t worry if you’re not running Ubuntu or Debian, you can go to this
website to find similarly simple installation instructions for other
platforms, including Microsoft Windows:

[http://www.rabbitmq.com/download.html](http://www.rabbitmq.com/download.html) 

#### Redis 

Redis is also feature-complete, but is more susceptible to data
loss in the event of abrupt termination or power failures. Detailed
information about using Redis:

[Using Redis](http://docs.celeryproject.org/en/latest/getting-started/brokers/redis.html#broker-redis)   

#### Other brokers   

In addition to the above, there are other experimental
transport implementations to choose from, including [Amazon SQS](http://docs.celeryproject.org/en/latest/getting-started/brokers/sqs.html#broker-sqs).     

See [Broker Overview](http://docs.celeryproject.org/en/latest/getting-started/brokers/index.html#broker-overview) for a full list.


### Installing Celery  

Celery is on the Python Package Index (PyPI), so it
can be installed with standard Python tools like pip or easy\_install:

```bash
   pip install celery 
```

### Application 

The first thing you need is a Celery instance. We call this the Celery application or just app for short. As
this instance is used as the entry-point for everything you want to do
in Celery, like creating tasks and managing workers, it must be possible
for other modules to import it.

In this tutorial we keep everything contained in a single module, but
for larger projects you want to create a dedicated module.

Let’s create the file tasks.py:


```python
from celery import Celery

app = Celery('tasks', broker='pyamqp://guest@localhost//')

@app.task
def add(x, y):
    return x + y
```

The first argument to Celery is the name of the current module. This is
only needed so that names can be automatically generated when the tasks
are defined in the \_\_main\_\_ module.

The second argument is the broker keyword argument, specifying the URL
of the message broker you want to use. Here we use RabbitMQ (also the
default option).

See Choosing a Broker above for more choices – for RabbitMQ you can use
amqp://localhost, or for Redis you can use redis://localhost.

You defined a single task, called add, returning the sum of two numbers.


### Running the Celery worker server  

You can now run the worker by executing our program with the
`worker` argument:

```bash

   celery -A tasks worker --loglevel=info

```

In production you’ll want to run the worker in the background as a
daemon. To do this you need to use the tools provided by your platform,
or something like supervisord.

For a complete listing of the command-line options available, do:

```bash

      celery worker --help

```

There are also several other commands available, and help is also
available:

```bash

    celery --help
```

### Calling the task  

To call our task you can use the `delay()` method.

This is a handy shortcut to the `apply_async()` method that gives greater control of the task execution:

```python

    >>> from tasks import add
    >>> add.delay(4, 4)

```

The task has now been processed by the worker you started earlier. You
can verify this by looking at the worker’s console output.

Calling a task returns an `AsyncResult` instance. This can be used to check the state of the task, wait for the task to finish, or get its return value (or if the task failed, to get the exception and traceback).

Results are not enabled by default. In order to do remote procedure
calls or keep track of task results in a database, you will need to
configure Celery to use a result backend. This is described in the next
section.

### Keeping Results

If you want to keep track of the tasks’ states, Celery needs to store or
send the states somewhere. There are several built-in result backends to
choose from: [SQLAlchemy](http://www.sqlalchemy.org/)/[Django](https://www.djangoproject.com/) ORM, [Memcached](http://memcached.org/), [Redis](https://redis.io/),
[RPC](http://docs.celeryproject.org/en/latest/userguide/configuration.html#conf-rpc-result-backend) ([RabbitMQ](http://www.rabbitmq.com/)/AMQP), or you can define your own.

For this example we use the `rpc` result backend, that sends
states back as transient messages. The backend is specified via the
`backend` argument to `Celery`, (or via the `result_backend` setting if you choose to use a configuration module):

```python

    app = Celery('tasks', backend='rpc://', broker='pyamqp://')

```

Or if you want to use Redis as the result backend, but still use
RabbitMQ as the message broker (a popular combination):

```python

    app = Celery('tasks', backend='redis://localhost', broker='pyamqp://')

```

Now with the result backend configured, let’s call the task again. This
time you’ll hold on to the `AsyncResult` instance returned when you call a task:

```python

    >>> result = add.delay(4, 4)

```

The `ready()` method returns whether the task has finished processing or not:

```python

    >>> result.ready()
    False

```

You can wait for the result to complete, but this is rarely used since
it turns the asynchronous call into a synchronous one:

```python

    >>> result.get(timeout=1)
    8

```

In case the task raised an exception, `get()` will re-raise the exception, but you can override
this by specifying the `propagate` argument:

```python

    >>> result.get(propagate=False)

```

If the task raised an exception, you can also gain access to the
original traceback:

```python

    >>> result.traceback
    ...

```

See `celery.result` for the complete result object reference.

### Configuration  

Celery, like a consumer appliance, doesn’t need much configuration to
operate. It has an input and an output. The input must be connected to a
broker, and the output can be optionally connected to a result backend.
However, if you look closely at the back, there’s a lid revealing loads
of sliders, dials, and buttons: this is the configuration.

The default configuration should be good enough for most use cases, but
there are many options that can be configured to make Celery work
exactly as needed. Reading about the options available is a good idea to
familiarize yourself with what can be configured. You can read about the
options in the Configuration and defaults reference.

The configuration can be set on the app directly or by using a dedicated
configuration module. As an example you can configure the default
serializer used for serializing task payloads by changing the
`task_serializer` setting:

```python

    app.conf.task_serializer = 'json'

```

If you’re configuring many settings at once you can use `update`:

```python

    app.conf.update(
        task_serializer='json',
        accept_content=['json'],  # Ignore other content
        result_serializer='json',
        timezone='Europe/Oslo',
        enable_utc=True,
    )

```

For larger projects, a dedicated configuration module is recommended.
Hard coding periodic task intervals and task routing options is
discouraged. It is much better to keep these in a centralized location.
This is especially true for libraries, as it enables users to control
how their tasks behave. A centralized configuration will also allow your
SysAdmin to make simple changes in the event of system trouble.

You can tell your Celery instance to use a configuration module by
calling the `app.config_from_object()` method:

```python

    app.config_from_object('celeryconfig')

```

This module is often called “`celeryconfig`”, but
you can use any module name.

In the above case, a module named `celeryconfig.py`
must be available to load from the current directory or on the Python
path. It could look something like this:

`celeryconfig.py`:

```python

    broker_url = 'pyamqp://'
    result_backend = 'rpc://'

    task_serializer = 'json'
    result_serializer = 'json'
    accept_content = ['json']
    timezone = 'Europe/Oslo'
    enable_utc = True

```

To verify that your configuration file works properly and doesn’t
contain any syntax errors, you can try to import it:

```bash

    $ python -m celeryconfig

```

For a complete reference of configuration options, see [Configuration
and defaults](http://docs.celeryproject.org/en/latest/userguide/configuration.html#configuration).

To demonstrate the power of configuration files, this is how you’d route
a misbehaving task to a dedicated queue:

`celeryconfig.py`:

```python

    task_routes = {
        'tasks.add': 'low-priority',
    }

```
Or instead of routing it, you could rate limit the task so that
only 10 tasks of this type can be processed in a minute (10/m):

`celeryconfig.py`:

```python

    task_annotations = {
        'tasks.add': {'rate_limit': '10/m'}
    }

```
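
To see what a rate limit string such as `10/m` amounts to, the sketch below converts it to tasks per second. The `tasks_per_second` helper is hypothetical and written only for this illustration; it is not Celery's own parser, and it assumes the units `s`, `m`, and `h` for seconds, minutes, and hours.

```python
def tasks_per_second(rate_limit):
    """Convert a string like '10/m' into a tasks-per-second rate."""
    count, _, unit = rate_limit.partition("/")
    # Assumed units: seconds, minutes, hours. No unit is treated as per second.
    seconds_per_unit = {"s": 1, "m": 60, "h": 3600}[unit or "s"]
    return float(count) / seconds_per_unit


rate = tasks_per_second("10/m")
average_gap = 1 / rate  # average number of seconds between task starts
print(round(rate, 4), round(average_gap, 1))
```

Under these assumptions, `10/m` corresponds to roughly one task every six seconds on average.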

If you’re using RabbitMQ or Redis as the broker then you can also direct
the workers to set a new rate limit for the task at runtime:

```bash

    $ celery -A tasks control rate_limit tasks.add 10/m
    worker@example.com: OK
        new rate limit set successfully

```

See [Routing Tasks](http://docs.celeryproject.org/en/latest/userguide/routing.html#guide-routing) to read more about task routing,
and the `task_annotations`
setting for more about annotations, or [Monitoring and Management
Guide](http://docs.celeryproject.org/en/latest/userguide/monitoring.html#guide-monitoring) for more about remote control commands and how to
monitor what your workers are doing.


### Where to go from here

If you want to learn more you should continue to the [Next Steps](http://docs.celeryproject.org/en/latest/getting-started/next-steps.html#next-steps) tutorial, and after that you can read the [User Guide](http://docs.celeryproject.org/en/latest/userguide/index.html#guide).  

### Celery Tutorials   


* [First Steps With Celery tutorial](http://docs.celeryproject.org/en/latest/getting-started/first-steps-with-celery.html)   
* [Getting Started Scheduling Tasks with Celery](https://www.caktusgroup.com/blog/2014/06/23/scheduling-tasks-celery/)  
* [Introducing Celery for Python+Django](http://opensourceforu.com/2013/12/introducing-celery-pythondjango/)   
* [How To Use Celery with RabbitMQ to Queue Tasks on an Ubuntu VPS](https://www.digitalocean.com/community/tutorials/how-to-use-celery-with-rabbitmq-to-queue-tasks-on-an-ubuntu-vps) 
* [A 4 Minute Intro to Celery ](https://www.youtube.com/watch?v=68QWZU_gCDA) 
* [Asynchronous Tasks in Python - Getting Started With Celery](https://www.youtube.com/watch?v=fg-JfZBetpM) 
* [Introduction to Celery](https://www.youtube.com/watch?v=3cyq5DHjymw) 
* [3 Gotchas for Working with Celery](https://wiredcraft.com/blog/3-gotchas-for-celery/) 
* [Setting up an asynchronous task queue for Django using Celery and Redis](http://michal.karzynski.pl/blog/2014/05/18/setting-up-an-asynchronous-task-queue-for-django-using-celery-redis/) 
* [Python Celery & RabbitMQ Tutorial](https://tests4geeks.com/python-celery-rabbitmq-tutorial/) 


## Dask

Dask is a flexible parallel computing library for analytic computing. It is popular within the PyData community and has grown a fairly sophisticated distributed task scheduler.

Dask is composed of two components:

1. Dynamic task scheduling optimized for computation. This is similar to Airflow, Luigi, Celery, or Make, but optimized for interactive computational workloads.

2. “Big Data” collections like parallel arrays, dataframes, and lists that extend common interfaces like NumPy, Pandas, or Python iterators to larger-than-memory or distributed environments. These parallel collections run on top of the dynamic task schedulers.
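
Task scheduling of this kind works on a graph of tasks. As a rough sketch, the code below represents a graph as a plain dictionary in which a value is either data or a tuple of the form `(function, argument, ...)`, with arguments that may name other keys. The recursive `get` function here is a toy evaluator written for this example; it runs serially and does no caching, so it only illustrates the idea and is not Dask's scheduler.

```python
from operator import add, mul


def get(graph, key):
    """Evaluate one key of a task graph by resolving its dependencies."""
    value = graph[key]
    if isinstance(value, tuple) and value and callable(value[0]):
        func, *args = value
        # Arguments that name another key are computed first.
        resolved = [
            get(graph, arg) if isinstance(arg, str) and arg in graph else arg
            for arg in args
        ]
        return func(*resolved)
    return value  # plain data, nothing to compute


graph = {
    "x": 1,
    "y": 2,
    "sum": (add, "x", "y"),
    "scaled": (mul, "sum", 10),
}
result = get(graph, "scaled")
print(result)
```

A real scheduler walks the same kind of graph but decides which independent tasks can run at the same time and where to run them.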

Dask emphasizes the following virtues:

* Familiar: Provides parallelized NumPy array and Pandas DataFrame objects.  
* Flexible: Provides a task scheduling interface for more custom workloads and integration with other projects.
* Native: Enables distributed computing in pure Python with access to the PyData stack.  
* Fast: Operates with low overhead, low latency, and minimal serialization necessary for fast numerical algorithms.  
* Scales up: Runs resiliently on clusters with 1000s of cores.  
* Scales down: Trivial to set up and run on a laptop in a single process. 
* Responsive: Designed with interactive computing in mind, it provides rapid feedback and diagnostics to aid humans.   

### Dask documentation  

The Dask documentation is at [http://dask.pydata.org/en/latest/](http://dask.pydata.org/en/latest/)

### Dask Tutorials   

* [Dask and Celery](http://matthewrocklin.com/blog/work/2016/09/13/dask-and-celery)   
* [GitHub - dask/dask-tutorial: Dask tutorial](https://github.com/dask/dask-tutorial)  
* [dask-tutorial-pydata-seattle-2017](https://github.com/jcrist/dask-tutorial-pydata-seattle-2017) 
* [dask tutorial](http://dask.github.io/dask-tutorial/introduction.html#/) 
* [Distributed Pandas on a Cluster with Dask DataFrames](http://matthewrocklin.com/blog/work/2017/01/12/dask-dataframes) 
* [Parallelizing Scientific Python with Dask | SciPy 2017 Tutorial | James Crist](https://www.youtube.com/watch?v=mbfsog3e5DA) 
* [Matthew Rocklin | Using Dask for Parallel Computing in Python](https://www.youtube.com/watch?v=s4ChP7tc3tA) 
 

## Luigi

Luigi is a Python package that helps you build complex pipelines of batch
jobs. It handles dependency resolution, workflow management, visualization, handling failures, command line integration, and much more.

###  Getting Started

Run `pip install luigi` to install the latest stable version from [PyPI](https://pypi.python.org/pypi/luigi).

###  Background

The purpose of Luigi is to address all the plumbing typically associated
with long-running batch processes. You want to chain many tasks, automate them, and failures *will* happen. These tasks can be anything,
but are typically long running things like [Hadoop](http://hadoop.apache.org/) jobs, dumping data to/from
databases, running machine learning algorithms, or anything else.

There are other software packages that focus on lower level aspects of
data processing, like [Hive](http://hive.apache.org/), [Pig](http://pig.apache.org/), or [Cascading](http://www.cascading.org/). 

Luigi is not a framework to replace these. Instead it helps you stitch many tasks together, where each task can be a [Hive query](https://luigi.readthedocs.io/en/latest/api/luigi.contrib.hive.html), a [Hadoop job in Java](https://luigi.readthedocs.io/en/latest/api/luigi.contrib.hadoop_jar.html),
a [Spark job in Scala or Python](https://luigi.readthedocs.io/en/latest/api/luigi.contrib.spark.html),
a Python snippet, [SQL](https://luigi.readthedocs.io/en/latest/api/luigi.contrib.sqla.html)
from a database, or anything else. It's easy to build up
long-running pipelines that comprise thousands of tasks and take days or
weeks to complete. Luigi takes care of a lot of the workflow management
so that you can focus on the tasks themselves and their dependencies.

You can build pretty much any task you want, but Luigi also comes with a
*toolbox* of several common task templates that you can use. It includes
support for running [Python mapreduce jobs](https://luigi.readthedocs.io/en/latest/api/luigi.contrib.hadoop.html) in Hadoop, as well as [Hive](https://luigi.readthedocs.io/en/latest/api/luigi.contrib.hive.html)
and [Pig](https://luigi.readthedocs.io/en/latest/api/luigi.contrib.pig.html)
jobs. It also comes with file system abstractions for [HDFS](https://luigi.readthedocs.io/en/latest/api/luigi.contrib.hdfs.html)
and local files that ensure all file system operations are atomic. This
is important because it means your data pipeline will not crash in a
state containing partial data.
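
One common way to make a file write atomic is to write to a temporary file in the same directory and then rename it into place. The sketch below shows that general technique with the standard library; the `atomic_write` helper is an assumption made for this example and is not taken from Luigi's source code.

```python
import os
import tempfile


def atomic_write(path, text):
    """Write text to path so readers never see a partially written file."""
    directory = os.path.dirname(os.path.abspath(path))
    # The temporary file lives in the target directory so the final
    # rename stays on one file system.
    fd, tmp_path = tempfile.mkstemp(dir=directory)
    try:
        with os.fdopen(fd, "w") as tmp_file:
            tmp_file.write(text)
        # os.replace swaps the finished file into place in a single step.
        os.replace(tmp_path, path)
    except BaseException:
        # On failure, remove the partial temporary file and re-raise.
        if os.path.exists(tmp_path):
            os.remove(tmp_path)
        raise


with tempfile.TemporaryDirectory() as workdir:
    target = os.path.join(workdir, "artist_streams.tsv")
    atomic_write(target, "artist_1\t42\n")
    with open(target) as in_file:
        content = in_file.read()
    leftovers = sorted(os.listdir(workdir))

print(content, leftovers)
```

If the process crashes before the rename, the target path either does not exist or still holds its previous contents, so a downstream task never reads half-written data.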

### Visualiser page   

The Luigi server comes with a web interface too, so you can search and filter among all your tasks.

![Visualiser page]( https://raw.githubusercontent.com/spotify/luigi/master/doc/visualiser_front_page.png)

### Dependency graph example   

Just to give you an idea of what Luigi does, this is a screen shot from
something we are running in production. Using Luigi's visualiser, we get
a nice visual overview of the dependency graph of the workflow. Each
node represents a task which has to be run. Green tasks are already
completed whereas yellow tasks are yet to be run. Most of these tasks
are Hadoop jobs, but there are also some things that run locally and
build up data files.

![Dependency graph]( https://raw.githubusercontent.com/spotify/luigi/master/doc/user_recs.png)


### Philosophy  

Conceptually, Luigi is similar to [GNU
Make](http://www.gnu.org/software/make/) where you have certain tasks
and these tasks in turn may have dependencies on other tasks. There are
also some similarities to [Oozie](http://oozie.apache.org/) and [Azkaban](http://data.linkedin.com/opensource/azkaban). One major
difference is that Luigi is not just built specifically for Hadoop, and
it's easy to extend it with other kinds of tasks.

Everything in Luigi is in Python. Instead of XML configuration or
similar external data files, the dependency graph is specified *within
Python*. This makes it easy to build up complex dependency graphs of
tasks, where the dependencies can involve date algebra or recursive
references to other versions of the same task. However, the workflow can
trigger things not in Python, such as running [Pig scripts](https://luigi.readthedocs.io/en/latest/api/luigi.contrib.pig.html) or [scp'ing files](https://luigi.readthedocs.io/en/latest/api/luigi.contrib.ssh.html).

### Who uses Luigi?

Luigi is used internally at [Spotify](https://www.spotify.com) to run
thousands of tasks every day, organized in complex dependency graphs.
Most of these tasks are Hadoop jobs. Luigi provides an infrastructure
that powers all kinds of stuff including recommendations, toplists, A/B
test analysis, external reports, internal dashboards, etc.

Since Luigi is open source and without any registration walls, the exact number of Luigi users is unknown. But based on the number of unique contributors, we expect hundreds of enterprises to use it. Some users have written blog posts or held presentations about Luigi:

* [Spotify](https://www.spotify.com) [Luigi presentation, 2014](http://www.slideshare.net/erikbern/luigi-presentation-nyc-data-science)
* [Foursquare](https://foursquare.com/) [Luigi presentation, 2013](http://www.slideshare.net/OpenAnayticsMeetup/luigi-presentation-17-23199897)
* [Mortar Data (Datadog)](https://www.datadoghq.com/) [Luigi documentation / tutorial](http://help.mortardata.com/technologies/luigi)
* [Stripe](https://stripe.com/) [Luigi presentation, 2014](http://www.slideshare.net/PyData/python-as-part-of-a-production-machine-learning-stack-by-michael-manapat-pydata-sv-2014)
* [Asana](https://asana.com/) [Luigi blog, 2014](https://eng.asana.com/2014/11/stable-accessible-data-infrastructure-startup/)
* [Buffer](https://buffer.com/) [Luigi blog, 2014](https://overflow.bufferapp.com/2014/10/31/buffers-new-data-architecture/)
* [SeatGeek](https://seatgeek.com/) [Luigi blog, 2015](http://chairnerd.seatgeek.com/building-out-the-seatgeek-data-pipeline/)
* [Treasure Data](https://www.treasuredata.com/) [Luigi blog, 2015](http://blog.treasuredata.com/blog/2015/02/25/managing-the-data-pipeline-with-git-luigi/)
* [Growth Intelligence](http://growthintel.com/) [Luigi presentation, 2015](http://www.slideshare.net/growthintel/a-beginners-guide-to-building-data-pipelines-with-luigi)
* [AdRoll](https://www.adroll.com/) [Luigi blog, 2015](http://tech.adroll.com/blog/data/2015/09/22/data-pipelines-docker.html)
* 17zuoye [Luigi presentation, 2015](https://speakerdeck.com/mvj3/luiti-an-offline-task-management-framework)
* [Custobar](https://www.custobar.com/) [Luigi presentation, 2016](http://www.slideshare.net/teemukurppa/managing-data-workflows-with-luigi)
* [Blendle](https://launch.blendle.com/) [Luigi presentation](http://www.anneschuth.nl/wp-content/uploads/sea-anneschuth-streamingblendle.pdf#page=126)
* [TrustYou](http://www.trustyou.com/) [Luigi presentation, 2015](https://speakerdeck.com/mfcabrera/pydata-berlin-2015-processing-hotel-reviews-with-python)
* [Groupon](https://www.groupon.com/) and [OrderUp](https://orderup.com) [Luigi alternative implementation](https://github.com/groupon/luigi-warehouse)
* [Red Hat - Marketing Operations](https://www.redhat.com) [Luigi blog, 2017](https://github.com/rh-marketingops/rh-mo-scc-luigi)
* [GetNinjas](https://www.getninjas.com.br/) [Luigi blog, 2017](https://labs.getninjas.com.br/using-luigi-to-create-and-monitor-pipelines-of-batch-jobs-eb8b3cd2a574)

Some more companies are using Luigi but haven't had a chance yet to write about it:

* [Schibsted](http://www.schibsted.com/)    
* [enbrite.ly](http://enbrite.ly/)    
* [Dow Jones / The Wall Street Journal](http://wsj.com)    
* [Hotels.com](https://hotels.com)    
* [Newsela](https://newsela.com)    
* [Squarespace](https://www.squarespace.com/)    
* [OAO](https://adops.com/)    
* [Grovo](https://grovo.com/)    
* [Weebly](https://www.weebly.com/)    
* [Deloitte](https://www.Deloitte.co.uk/)    


### External links  

* [Mailing List](https://groups.google.com/d/forum/luigi-user/) for discussions and asking questions. _Google Groups_   
* [Releases](https://pypi.python.org/pypi/luigi) _PyPI_   
* [Source code](https://github.com/spotify/luigi) _Github_    
* [Hubot Integration](https://github.com/houzz/hubot-luigi) plugin for Slack, Hipchat, etc. _Github_    

### Luigi  Tutorials   


* [Getting Started — Luigi 2.7.1 documentation](https://luigi.readthedocs.io/en/stable/)   
* [Tutorial: Luigi for Scientific Workflows | Bionics IT](http://bionics.it/posts/luigi-tutorial)  
* [Building Data Pipelines with Python and Luigi – Marco Bonzanini](https://marcobonzanini.com/2015/10/24/building-data-pipelines-with-python-and-luigi/) 
* [
Intro to Building Data Pipelines in Python with Luigi - YouTube](https://www.youtube.com/watch?v=ymF2R_tY1f8) 
* [Building Data Pipelines with Python and Luigi - Erik Bernhardsson](https://vimeo.com/79533035) 
* [Building Task Pipelines Using Luigi - G B](http://gouthamanbalaraman.com/blog/building-luigi-task-pipeline.html) 
* [Managing the Data Pipeline with Git + Luigi](https://blog.treasuredata.com/blog/2015/02/25/managing-the-data-pipeline-with-git-luigi/) 
   

## Spotify Luigi example - Top Artists

This example is taken from the Luigi documentation: [https://luigi.readthedocs.io/en/stable/example_top_artists.html](https://luigi.readthedocs.io/en/stable/example_top_artists.html)  


**Purpose**

Aggregate all streams, find the top 10 artists and then put the results into Postgres.

### Step 1 - Aggregate Artist Streams  



```python
from collections import defaultdict

import luigi


class AggregateArtists(luigi.Task):
    date_interval = luigi.DateIntervalParameter()

    def output(self):
        return luigi.LocalTarget("data/artist_streams_%s.tsv" % self.date_interval)

    def requires(self):
        # Streams is a task defined elsewhere in examples/top_artists.py
        return [Streams(date) for date in self.date_interval]

    def run(self):
        artist_count = defaultdict(int)

        # self.input() returns the outputs of the required Streams tasks
        for target in self.input():
            with target.open('r') as in_file:
                for line in in_file:
                    timestamp, artist, track = line.strip().split()
                    artist_count[artist] += 1

        with self.output().open('w') as out_file:
            for artist, count in artist_count.items():
                print(artist, count, file=out_file)
```

Note that this is just a portion of the file examples/top_artists.py. In particular, Streams is defined as a Task, acting as a dependency for AggregateArtists. In addition, luigi.run() is called if the script is executed directly, allowing it to be run from the command line.

There are several pieces of this snippet that deserve more explanation.

* Any Task may be customized by instantiating one or more Parameter objects on the class level.  
* The output() method tells Luigi where the result of running the task will end up. The path can be some function of the parameters.   
* The requires() method specifies other tasks that we need to perform this task. In this case it’s an external dump named Streams which takes the date as the argument.  
* For plain Tasks, the run() method implements the task. This could be anything, including calling subprocesses, performing long running number crunching, etc. For some subclasses of Task you don’t have to implement the run method. For instance, for the JobTask subclass you implement a mapper and reducer instead.   
* LocalTarget is a built in class that makes it easy to read/write from/to the local filesystem. It also makes all file operations atomic, which is nice in case your script crashes for any reason.  

#### Running this Locally   

```bash
    cd examples
    luigi --module top_artists AggregateArtists --local-scheduler --date-interval 2012-06
```

### Step 2 – Find the Top Artists     

At this point, we’ve counted the number of streams for each artist over the full time period. We are left with a large file that contains mappings of artist -> count data, and we want to find the top 10 artists. Since we only have a few hundred thousand artists, and calculating the top artists is nontrivial to parallelize, we choose to do this not as a Hadoop job, but just as a plain old for-loop in Python.  


```python
from heapq import nlargest  # used in run() below


class Top10Artists(luigi.Task):
    date_interval = luigi.DateIntervalParameter()
    use_hadoop = luigi.BoolParameter()

    def requires(self):
        if self.use_hadoop:
            return AggregateArtistsHadoop(self.date_interval)
        else:
            return AggregateArtists(self.date_interval)

    def output(self):
        return luigi.LocalTarget("data/top_artists_%s.tsv" % self.date_interval)

    def run(self):
        top_10 = nlargest(10, self._input_iterator())
        with self.output().open('w') as out_file:
            for streams, artist in top_10:
                # Python 3 print() function; fields stay space-separated
                print(self.date_interval.date_a, self.date_interval.date_b, artist, streams, file=out_file)

    def _input_iterator(self):
        with self.input().open('r') as in_file:
            for line in in_file:
                artist, streams = line.strip().split()
                yield int(streams), int(artist)
```

The most interesting thing here is that this task (Top10Artists) defines a dependency on the previous task (AggregateArtists). This means that if the output of AggregateArtists does not exist, AggregateArtists will run before Top10Artists.

```bash
    luigi --module examples.top_artists Top10Artists --local-scheduler --date-interval 2012-07
```
This will run both tasks.
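
The "run upstream tasks only when their output is missing" behaviour can be sketched in plain Python. This is a toy model under simplifying assumptions, not Luigi's scheduler: the `ToyTask` class and `build` function are hypothetical, and a task counts as complete when its output file exists.

```python
import os
import tempfile


class ToyTask:
    """Hypothetical stand-in for a task: an output path plus upstream tasks."""

    def __init__(self, name, output_path, requires=()):
        self.name = name
        self.output_path = output_path
        self.requires = list(requires)

    def complete(self):
        # A task counts as done when its output file exists.
        return os.path.exists(self.output_path)

    def run(self):
        with open(self.output_path, "w") as out_file:
            out_file.write(self.name + "\n")


def build(task, log):
    """Run missing upstream tasks first, then the task itself."""
    if task.complete():
        return
    for upstream in task.requires:
        build(upstream, log)
    task.run()
    log.append(task.name)


workdir = tempfile.mkdtemp()
aggregate = ToyTask("AggregateArtists", os.path.join(workdir, "artist_streams.tsv"))
top10 = ToyTask("Top10Artists", os.path.join(workdir, "top_artists.tsv"), [aggregate])

first_run = []
build(top10, first_run)   # both outputs are missing, so both tasks run
second_run = []
build(top10, second_run)  # both outputs now exist, so nothing runs
```

Building the same task a second time does nothing, which is why rerunning a pipeline after a partial failure only redoes the missing steps.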

### Step 3 - Insert into Postgres  

This mainly serves as an example of a specific Task subclass that requires almost no code to be written. It’s also an example of how you can define task templates that you can reuse for a lot of different tasks.  


```python
class ArtistToplistToDatabase(luigi.contrib.postgres.CopyToTable):
    date_interval = luigi.DateIntervalParameter()
    use_hadoop = luigi.BoolParameter()

    host = "localhost"
    database = "toplists"
    user = "luigi"
    password = "abc123"  # ;)
    table = "top10"

    columns = [("date_from", "DATE"),
               ("date_to", "DATE"),
               ("artist", "TEXT"),
               ("streams", "INT")]

    def requires(self):
        return Top10Artists(self.date_interval, self.use_hadoop)
```

Just like previously, this defines a recursive dependency on the previous task. If you try to build the task, that will also trigger building all its upstream dependencies.

The `--local-scheduler` flag tells Luigi not to connect to a central scheduler. This is recommended for getting started and for development purposes. Once you start putting things in production, we strongly recommend running the central scheduler server. In addition to providing locking so that the same task is not run by multiple processes at the same time, this server also provides a pretty nice visualization of your current workflow.

If you drop the `--local-scheduler` flag, your script will try to connect to the central scheduler, by default at localhost port 8082. If you run


```bash
    luigid
```

in the background and then run your task without the --local-scheduler flag, then your script will now schedule through a centralized server. You need [Tornado](http://www.tornadoweb.org/en/stable/) for this to work.

Launching http://localhost:8082 should show something like this:


![luigi web server](http://nikbearbrown.com/YouTube/MachineLearning/IMG/luigi_web_server.png)

_Web server screenshot_

Looking at the dependency graph for any of the tasks yields something like this:  

![luigi aggregate artists](http://nikbearbrown.com/YouTube/MachineLearning/IMG/luigi_aggregate_artists.png)

_Aggregate artists screenshot_

In production, you’ll want to run the centralized scheduler. See: [Using the Central Scheduler](https://luigi.readthedocs.io/en/stable/central_scheduler.html) for more information.


## Apache Airflow 


Use Airflow to author workflows as directed acyclic graphs (DAGs) of tasks. The Airflow scheduler executes your tasks on an array of workers while following the specified dependencies. Rich command line utilities make performing complex surgeries on DAGs a snap. The rich user interface makes it easy to visualize pipelines running in production, monitor progress, and troubleshoot issues when needed.

When workflows are defined as code, they become more maintainable, versionable, testable, and collaborative.

![Apache Airflow](http://nikbearbrown.com/YouTube/MachineLearning/IMG/airflow.gif)

_Apache Airflow_  

### Principles

* Dynamic: Airflow pipelines are configuration as code (Python), allowing for dynamic pipeline generation. This allows for writing code that instantiates pipelines dynamically.
* Extensible: Easily define your own operators, executors and extend the library so that it fits the level of abstraction that suits your environment.
* Elegant: Airflow pipelines are lean and explicit. Parameterizing your scripts is built into the core of Airflow using the powerful Jinja templating engine.
* Scalable: Airflow has a modular architecture and uses a message queue to orchestrate an arbitrary number of workers. Airflow is ready to scale to infinity.

### Airflow Operators  

Airflow uses Operators as the fundamental unit of abstraction to define tasks, and uses a DAG (Directed Acyclic Graph) to define workflows using a set of operators. Operators are extensible which makes customizing workflows easy. Operators are divided into 3 types:   

* Action operators that perform some action such as executing a Python function or submitting a Spark Job.
* Transfer operators that move data between systems such as from Hive to MySQL or from S3 to Hive.  
* Sensors which trigger downstream tasks in the dependency graph when a certain criterion is met, for example checking for a certain file becoming available on S3 before using it downstream. Sensors are a powerful feature of Airflow allowing us to create complex workflows and easily manage their preconditions.   
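
The idea behind a sensor can be sketched as a polling loop in plain Python. This is an illustration only and does not use Airflow: the function `wait_for_file` is hypothetical, it checks a local path rather than S3, and the parameter names `poke_interval` and `timeout` are assumptions chosen for readability.

```python
import os
import tempfile
import time


def wait_for_file(path, poke_interval=0.1, timeout=2.0):
    """Poll until path exists; return True on success, False on timeout.

    Hypothetical helper for illustration; not an Airflow API.
    """
    deadline = time.monotonic() + timeout
    while time.monotonic() < deadline:
        if os.path.exists(path):
            return True
        # Sleep between checks so the loop does not spin.
        time.sleep(poke_interval)
    # One last check in case the file appeared during the final sleep.
    return os.path.exists(path)


workdir = tempfile.mkdtemp()
ready_marker = os.path.join(workdir, "input_ready")
open(ready_marker, "w").close()          # simulate the upstream file arriving
file_arrived = wait_for_file(ready_marker)
```

A downstream step would run only when the check returns True; a timeout gives the workflow a defined failure instead of waiting forever.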


### Beyond the Horizon

Airflow is not a data streaming solution. Tasks do not move data from one to the other (though tasks can exchange metadata!). Airflow is not in the Spark Streaming or Storm space; it is more comparable to Oozie or Azkaban.

Workflows are expected to be mostly static or slowly changing. You can think of the structure of the tasks in your workflow as slightly more dynamic than a database structure would be. Airflow workflows are expected to look similar from one run to the next; this allows for clarity around unit of work and continuity.

### Who uses Airflow?

* [Airbnb](https://www.airbnb.com/)  
* [Agari](https://www.agari.com/)    
* [Robinhood Engineering](https://robinhood.engineering/)  
* [Blue Yonder](https://www.blue-yonder.com/en)  
* [Glassdoor](https://www.glassdoor.com)  
* [Lyft](https://www.lyft.com/)  


## Apache Airflow Quick Start    

This is from the [Apache Airflow Quick Start](https://airflow.incubator.apache.org/start.html).  

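The installation is quick and straightforward. The commands below use the Airflow 1.x command-line interface; in later releases the PyPI package is named apache-airflow, and `airflow initdb` was replaced by `airflow db init`.   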

```bash
# airflow needs a home, ~/airflow is the default,
# but you can lay foundation somewhere else if you prefer
# (optional)
export AIRFLOW_HOME=~/airflow

# install from pypi using pip
pip install airflow

# initialize the database
airflow initdb

# start the web server, default port is 8080
airflow webserver -p 8080
```

Upon running these commands, Airflow will create the \$AIRFLOW_HOME folder and lay an “airflow.cfg” file with defaults that get you going fast. You can inspect the file either in \$AIRFLOW_HOME/airflow.cfg, or through the UI in the Admin->Configuration menu. The PID file for the webserver will be stored in \$AIRFLOW_HOME/airflow-webserver.pid or in /run/airflow/webserver.pid if started by systemd.    

Out of the box, Airflow uses a sqlite database, which you should outgrow fairly quickly since no parallelization is possible using this database backend. It works in conjunction with the SequentialExecutor which will only run task instances sequentially. While this is very limiting, it allows you to get up and running quickly and take a tour of the UI and the command line utilities.   

Here are a few commands that will trigger a few task instances. You should be able to see the status of the jobs change in the example_bash_operator DAG as you run the commands below.  

```bash
# run your first task instance
airflow run example_bash_operator runme_0 2015-01-01
# run a backfill over 2 days
airflow backfill example_bash_operator -s 2015-01-01 -e 2015-01-02
```

### Example Pipeline definition  

Here is an example of a basic pipeline definition. Do not worry if this looks complicated; a line-by-line explanation follows below.

```python  
"""
Code that goes along with the Airflow tutorial located at:
https://github.com/airbnb/airflow/blob/master/airflow/example_dags/tutorial.py
"""
from airflow import DAG
from airflow.operators.bash_operator import BashOperator
from datetime import datetime, timedelta


default_args = {
    'owner': 'airflow',
    'depends_on_past': False,
    'start_date': datetime(2015, 6, 1),
    'email': ['airflow@airflow.com'],
    'email_on_failure': False,
    'email_on_retry': False,
    'retries': 1,
    'retry_delay': timedelta(minutes=5),
    # 'queue': 'bash_queue',
    # 'pool': 'backfill',
    # 'priority_weight': 10,
    # 'end_date': datetime(2016, 1, 1),
}

dag = DAG(
    'tutorial', default_args=default_args, schedule_interval=timedelta(1))

# t1, t2 and t3 are examples of tasks created by instantiating operators
t1 = BashOperator(
    task_id='print_date',
    bash_command='date',
    dag=dag)

t2 = BashOperator(
    task_id='sleep',
    bash_command='sleep 5',
    retries=3,
    dag=dag)

templated_command = """
    {% for i in range(5) %}
        echo "{{ ds }}"
        echo "{{ macros.ds_add(ds, 7)}}"
        echo "{{ params.my_param }}"
    {% endfor %}
"""

t3 = BashOperator(
    task_id='templated',
    bash_command=templated_command,
    params={'my_param': 'Parameter I passed in'},
    dag=dag)

t2.set_upstream(t1)
t3.set_upstream(t1)
```

###  It’s a DAG definition file  

One thing to wrap your head around (it may not be very intuitive for everyone at first) is that this Airflow Python script is really just a configuration file specifying the DAG’s structure as code. The actual tasks defined here will run in a different context from the context of this script. Different tasks run on different workers at different points in time, which means that this script cannot be used to cross communicate between tasks. Note that for this purpose we have a more advanced feature called XCom.

People sometimes think of the DAG definition file as a place where they can do some actual data processing - that is not the case at all! The script’s purpose is to define a DAG object. It needs to evaluate quickly (seconds, not minutes) since the scheduler will execute it periodically to reflect the changes if any.

### Importing Modules  

An Airflow pipeline is just a Python script that happens to define an Airflow DAG object. Let’s start by importing the libraries we will need.

```python  
# The DAG object; we'll need this to instantiate a DAG
from airflow import DAG

# Operators; we need this to operate!
from airflow.operators.bash_operator import BashOperator
```


### Default Arguments  

We’re about to create a DAG and some tasks, and we have the choice to explicitly pass a set of arguments to each task’s constructor (which would become redundant), or (better!) we can define a dictionary of default parameters that we can use when creating tasks.  

```python  
from datetime import datetime, timedelta

default_args = {
    'owner': 'airflow',
    'depends_on_past': False,
    'start_date': datetime(2015, 6, 1),
    'email': ['airflow@airflow.com'],
    'email_on_failure': False,
    'email_on_retry': False,
    'retries': 1,
    'retry_delay': timedelta(minutes=5),
    # 'queue': 'bash_queue',
    # 'pool': 'backfill',
    # 'priority_weight': 10,
    # 'end_date': datetime(2016, 1, 1),
}
```

For more information about the BaseOperator’s parameters and what they do, refer to the airflow.models.BaseOperator documentation.  

Also, note that you could easily define different sets of arguments that would serve different purposes. An example of that would be to have different settings between a production and development environment.  

### Instantiate a DAG  

We’ll need a DAG object to nest our tasks into. Here we pass a string that defines the dag_id, which serves as a unique identifier for your DAG. We also pass the default argument dictionary that we just defined and define a schedule_interval of 1 day for the DAG.

```python  
dag = DAG(
    'tutorial', default_args=default_args, schedule_interval=timedelta(1))
```

### Tasks   

Tasks are generated when instantiating operator objects. An object instantiated from an operator is called a task. The first argument task_id acts as a unique identifier for the task.

```python  
t1 = BashOperator(
    task_id='print_date',
    bash_command='date',
    dag=dag)

t2 = BashOperator(
    task_id='sleep',
    bash_command='sleep 5',
    retries=3,
    dag=dag)
```

Notice how we pass a mix of operator specific arguments (bash_command) and an argument common to all operators (retries) inherited from BaseOperator to the operator’s constructor. This is simpler than passing every argument for every constructor call. Also, notice that in the second task we override the retries parameter with 3.

The precedence rules for a task are as follows:

1. Explicitly passed arguments    
2. Values that exist in the default_args dictionary   
3. The operator’s default value, if one exists   


A task must include or inherit the arguments task_id and owner, otherwise Airflow will raise an exception.
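
The precedence rules above can be sketched as a dictionary merge. This is a simplified illustration rather than Airflow's actual code: the function `resolve_arguments` and the `operator_defaults` values are hypothetical.

```python
def resolve_arguments(explicit, default_args, operator_defaults):
    """Merge argument sources, applying the lowest priority first.

    Hypothetical helper for illustration; not an Airflow API.
    """
    resolved = dict(operator_defaults)   # 3. the operator's own defaults
    resolved.update(default_args)        # 2. values from default_args
    resolved.update(explicit)            # 1. explicitly passed arguments
    return resolved


# Assumed operator defaults, made up for this example.
operator_defaults = {"retries": 0, "owner": "nobody"}
default_args = {"retries": 1, "owner": "airflow"}

t1_args = resolve_arguments({"task_id": "print_date"}, default_args, operator_defaults)
t2_args = resolve_arguments({"task_id": "sleep", "retries": 3}, default_args, operator_defaults)
```

Here `t1_args` picks up `retries` from default_args, while `t2_args` keeps the explicitly passed value, mirroring how the second task overrides retries with 3.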

### Templating with Jinja   

Airflow leverages the power of Jinja Templating and provides the pipeline author with a set of built-in parameters and macros. Airflow also provides hooks for the pipeline author to define their own parameters, macros and templates.

This tutorial barely scratches the surface of what you can do with templating in Airflow, but the goal of this section is to let you know this feature exists, get you familiar with double curly brackets, and point to the most common template variable: {{ ds }}.

```python  
templated_command = """
    {% for i in range(5) %}
        echo "{{ ds }}"
        echo "{{ macros.ds_add(ds, 7) }}"
        echo "{{ params.my_param }}"
    {% endfor %}
"""

t3 = BashOperator(
    task_id='templated',
    bash_command=templated_command,
    params={'my_param': 'Parameter I passed in'},
    dag=dag)
```

Notice that the templated_command contains code logic in {% %} blocks, references parameters like {{ ds }}, calls a function as in {{ macros.ds_add(ds, 7)}}, and references a user-defined parameter in {{ params.my_param }}.   

The params hook in BaseOperator allows you to pass a dictionary of parameters and/or objects to your templates. Please take the time to understand how the parameter my_param makes it through to the template.  

Files can also be passed to the bash_command argument, like bash_command='templated_command.sh', where the file location is relative to the directory containing the pipeline file (tutorial.py in this case). This may be desirable for many reasons, like separating your script’s logic and pipeline code, allowing for proper code highlighting in files composed in different languages, and general flexibility in structuring pipelines. It is also possible to define your template_searchpath as pointing to any folder locations in the DAG constructor call.   

For more information on the variables and macros that can be referenced in templates, make sure to read through the Macros section.   
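
To make the template concrete, here is a plain-Python sketch of what the two date expressions evaluate to for an execution date of 2015-06-01. The function below is a stand-in written for this example that mimics the behaviour of macros.ds_add as used in the template; it is an assumption about that behaviour, not Airflow's source.

```python
from datetime import datetime, timedelta


def ds_add(ds, days):
    """Shift a YYYY-MM-DD date string by a number of days.

    Stand-in written for illustration; mimics macros.ds_add.
    """
    shifted = datetime.strptime(ds, "%Y-%m-%d") + timedelta(days=days)
    return shifted.strftime("%Y-%m-%d")


ds = "2015-06-01"             # what {{ ds }} would hold for this run
week_later = ds_add(ds, 7)    # what {{ macros.ds_add(ds, 7) }} would hold
print(ds)
print(week_later)
```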

### Setting up Dependencies  

We have three tasks (t1, t2 and t3) that do not depend on each other. Here are a few ways you can define dependencies between them:

```python  
t2.set_upstream(t1)

# This means that t2 will depend on t1
# running successfully to run
# It is equivalent to
# t1.set_downstream(t2)

t3.set_upstream(t1)

# all of this is equivalent to
# dag.set_dependency('print_date', 'sleep')
# dag.set_dependency('print_date', 'templated')
```

Note that when executing your script, Airflow will raise exceptions when it finds cycles in your DAG or when a dependency is referenced more than once.   
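
Cycle detection of this kind can be sketched with the standard library's `graphlib` module (Python 3.9+). This only illustrates the idea and is not how Airflow implements its check; the helper `execution_order` is hypothetical.

```python
from graphlib import CycleError, TopologicalSorter  # Python 3.9+


def execution_order(upstream):
    """Return task ids in an order that respects the dependencies.

    upstream maps each task id to the set of task ids it depends on.
    Raises graphlib.CycleError if the dependencies contain a cycle.
    Hypothetical helper for illustration; not an Airflow API.
    """
    return list(TopologicalSorter(upstream).static_order())


# The tutorial DAG: sleep and templated both depend on print_date.
tutorial = {
    "print_date": set(),
    "sleep": {"print_date"},
    "templated": {"print_date"},
}
order = execution_order(tutorial)

# Two tasks that depend on each other form a cycle, which is rejected.
try:
    execution_order({"a": {"b"}, "b": {"a"}})
    has_cycle = False
except CycleError:
    has_cycle = True
```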

### Recap  

Alright, so we have a pretty basic DAG. At this point your code should look something like this:

```python  
"""
Code that goes along with the Airflow tutorial located at:
http://airflow.readthedocs.org/en/latest/tutorial.html
"""
from airflow import DAG
from airflow.operators.bash_operator import BashOperator
from datetime import datetime, timedelta


default_args = {
    'owner': 'airflow',
    'depends_on_past': False,
    'start_date': datetime(2015, 6, 1),
    'email': ['airflow@airflow.com'],
    'email_on_failure': False,
    'email_on_retry': False,
    'retries': 1,
    'retry_delay': timedelta(minutes=5),
    # 'queue': 'bash_queue',
    # 'pool': 'backfill',
    # 'priority_weight': 10,
    # 'end_date': datetime(2016, 1, 1),
}

dag = DAG(
    'tutorial', default_args=default_args, schedule_interval=timedelta(1))

# t1, t2 and t3 are examples of tasks created by instantiating operators
t1 = BashOperator(
    task_id='print_date',
    bash_command='date',
    dag=dag)

t2 = BashOperator(
    task_id='sleep',
    bash_command='sleep 5',
    retries=3,
    dag=dag)

templated_command = """
    {% for i in range(5) %}
        echo "{{ ds }}"
        echo "{{ macros.ds_add(ds, 7)}}"
        echo "{{ params.my_param }}"
    {% endfor %}
"""

t3 = BashOperator(
    task_id='templated',
    bash_command=templated_command,
    params={'my_param': 'Parameter I passed in'},
    dag=dag)

t2.set_upstream(t1)
t3.set_upstream(t1)
```

### Testing    

**Running the Script**

Time to run some tests. First let’s make sure that the pipeline parses. Let’s assume we’re saving the code from the previous step in tutorial.py in the DAGs folder referenced in your airflow.cfg. The default location for your DAGs is ~/airflow/dags.  

```bash  
    python ~/airflow/dags/tutorial.py
```

If the script does not raise an exception it means that you haven’t done anything horribly wrong, and that your Airflow environment is somewhat sound.  

### Command Line Metadata Validation   

Let’s run a few commands to validate this script further.

```bash  
# print the list of active DAGs
airflow list_dags

# prints the list of tasks in the "tutorial" DAG
airflow list_tasks tutorial

# prints the hierarchy of tasks in the tutorial DAG
airflow list_tasks tutorial --tree
```

### Testing Task Instances  

Let’s test by running the actual task instances on a specific date. The date specified in this context is an execution_date, which simulates the scheduler running your task or dag at a specific date + time:

```bash  
# command layout: command subcommand dag_id task_id date

# testing print_date
airflow test tutorial print_date 2015-06-01

# testing sleep
airflow test tutorial sleep 2015-06-01
```

Now remember what we did with templating earlier? See how this template gets rendered and executed by running this command:   

```bash  
# testing templated
airflow test tutorial templated 2015-06-01   
```
This should result in displaying a verbose log of events and ultimately running your bash command and printing the result.   

Note that the airflow test command runs task instances locally, outputs their log to stdout (on screen), doesn’t bother with dependencies, and doesn’t communicate state (running, success, failed, ...) to the database. It simply allows testing a single task instance.   


### Backfill   

Everything looks like it’s running fine so let’s run a backfill. backfill will respect your dependencies, emit logs into files and talk to the database to record status. If you do have a webserver up, you’ll be able to track the progress. airflow webserver will start a web server if you are interested in tracking the progress visually as your backfill progresses.

Note that if you use depends_on_past=True, individual task instances will depend on the success of the preceding task instance, except for the start_date specified itself, for which this dependency is disregarded.

The date range in this context is a start_date and optionally an end_date, which are used to populate the run schedule with task instances from this dag.  

```bash  
# optional, start a web server in debug mode in the background
# airflow webserver --debug &

# start your backfill on a date range
airflow backfill tutorial -s 2015-06-01 -e 2015-06-07
```
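
As a rough sketch of what the date range expands to, the snippet below lists the execution dates for a daily schedule. It assumes both ends of the range are inclusive and that the interval is one day, matching the schedule_interval used in this tutorial; the function `backfill_dates` is hypothetical and not part of Airflow.

```python
from datetime import date, timedelta


def backfill_dates(start, end, interval=timedelta(days=1)):
    """List execution dates from start to end, both inclusive.

    Hypothetical helper for illustration; not an Airflow API.
    """
    dates = []
    current = start
    while current <= end:
        dates.append(current)
        current += interval
    return dates


runs = backfill_dates(date(2015, 6, 1), date(2015, 6, 7))
for run_date in runs:
    print(run_date.isoformat())
```

Under these assumptions, each listed date corresponds to one run of the DAG, with one task instance per task in each run.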

### What’s Next?  

That’s it, you’ve written, tested and backfilled your very first Airflow pipeline. Merging your code into a code repository that has a master scheduler running against it should cause it to be triggered and run every day.

Here are a few things you might want to do next:

* Take an in-depth tour of the UI - click all the things!
* Keep reading the docs, especially the sections on:
    * Command line interface
    * Operators
    * Macros
* Write your first pipeline!


### Airflow Tutorials   

* [Tutorial — Airflow Documentation](https://airflow.incubator.apache.org/tutorial.html)   
* [hgrif/airflow-tutorial: Basic Airflow tutorial](https://github.com/hgrif/airflow-tutorial)  
* [Get started developing workflows with Apache Airflow - Michał Karzyński](http://michal.karzynski.pl/blog/2017/03/19/developing-workflows-with-apache-airflow/)
* [Technology: Airflow - Beginners Tutorial](http://tech.lalitbhatt.net/2016/04/airflow-beginners-tutorial.html) 
* [Building a Data Pipeline with Airflow - Mark Litwintschik](http://tech.marksblogg.com/airflow-postgres-redis-forex.html) 
* [ETL example — ETL Best Practices with Airflow v1.8 - GitHub Pages](https://gtoonstra.github.io/etl-with-airflow/etlexample.html) 
* [Apache Airflow introduction and best practices - YouTube](https://www.youtube.com/watch?v=Pr0FrvIIfTU) 
* [Matt Davis | A Practical Introduction to Airflow - YouTube](https://www.youtube.com/watch?v=cHATHSB_450) 
* [I'm sorry Cron, I've met AirBnB's Airflow – dani del valle](https://danidelvalle.me/2016/09/12/im-sorry-cron-ive-met-airbnbs-airflow/) 
* [Scheduling Spark jobs with Airflow – Insight Data](https://blog.insightdatascience.com/scheduling-spark-jobs-with-airflow-4c66f3144660)


## Cron Jobs

This section on cron jobs is from [https://help.ubuntu.com/community/CronHowto](https://help.ubuntu.com/community/CronHowto)   

Cron is a system daemon used to execute desired tasks (in the
background) at designated times.

A crontab file is a simple text file containing a list of commands meant
to be run at specified times. It is edited using the crontab command.
The commands in the crontab file (and their run times) are checked by
the cron daemon, which executes them in the system background.

Each user (including root) has a crontab file. The cron daemon checks a
user’s crontab file regardless of whether the user is actually logged
into the system or not.

To display the on-line help for crontab enter: 

```bash
     man crontab
```

On GNOME-based Ubuntu systems, the GNOME *Scheduled tasks* tool (from the
*gnome-schedule* package) in *Applications* -> *System Tools*
provides a graphical interface with prompting for using Cron. The
project website is at <http://gnome-schedule.sourceforge.net/>; the
software is installable from the Software Center or by typing:

```bash
    sudo apt-get install gnome-schedule
```

### Starting to Use Cron  

To use cron for tasks meant to run only for your user profile, add
entries to your own user’s crontab file. To edit the crontab file enter:

```bash
    crontab -e 
```

Edit the crontab using the format described in the next sections. Save
your changes. (Exiting without saving will leave your crontab
unchanged.) To display the on-line help describing the format of the
crontab file enter:  

```bash
    man 5 crontab
```


Commands that normally run with administrative privileges (i.e. they are
generally run using sudo) should be added to the root crontab. To edit
the root crontab enter: 

```bash
     sudo crontab -e
```


###  Crontab Lines   

Each line has five time-and-date fields, followed by a command, followed
by a newline character (’\\n’). The fields are
separated by spaces. The five time-and-date fields cannot contain
spaces. The five time-and-date fields are as follows: minute (0-59),
hour (0-23, 0 = midnight), day (1-31), month (1-12), weekday (0-6, 0 =
Sunday).  

```bash
    01 04 1 1 1 /usr/bin/somedirectory/somecommand
```

The above example will run /usr/bin/somedirectory/somecommand at 4:01am
on January 1st plus every Monday in January.   

An asterisk (\*) can be used so that every instance (every hour, every
weekday, every month, etc.) of a time period is used.   

```bash
    01 04 * * * /usr/bin/somedirectory/somecommand
```

The above example will run /usr/bin/somedirectory/somecommand at 4:01am
on every day of every month. 

Comma-separated values can be used to run more than one instance of a
particular command within a time period. Dash-separated values specify
an inclusive range, so the command runs at every value in that range. 

```bash
    01,31 04,05 1-15 1,6 * /usr/bin/somedirectory/somecommand
```

The above example will run /usr/bin/somedirectory/somecommand at 01 and
31 past the hours of 4:00am and 5:00am on the 1st through the 15th of
every January and June.   

The “/usr/bin/somedirectory/somecommand” text in the above examples
indicates the task which will be run at the specified times. It is
recommended that you use the full path to the desired commands as shown
in the above examples. Enter *which somecommand* in the terminal to find
the full path to *somecommand*. The crontab will begin running as soon
as it is properly edited and saved.   

You may want to run a script some number of times per time unit. For
example, if you want to run it every 10 minutes, use the following crontab
entry (it runs on minutes divisible by 10: 0, 10, 20, 30, etc.):  

```bash
    */10 * * * * /usr/bin/somedirectory/somecommand
```

which is also equivalent to the more cumbersome  

```bash
    0,10,20,30,40,50 * * * * /usr/bin/somedirectory/somecommand
```
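
The field rules described above (asterisk, comma lists, dash ranges and `/` steps) can be sketched as a small matcher in Python. This is a simplified illustration, not cron's implementation: the functions `expand_field` and `cron_matches` are hypothetical, names such as `jan` or `mon` and the special `@` strings are not handled, and the rule for combining the two day fields follows the behaviour shown in the January example above.

```python
from datetime import datetime


def expand_field(field, low, high):
    """Expand one crontab field into the sorted list of values it allows."""
    values = set()
    for part in field.split(","):
        step = 1
        if "/" in part:
            part, step_text = part.split("/")
            step = int(step_text)
        if part == "*":
            start, end = low, high
        elif "-" in part:
            start_text, end_text = part.split("-")
            start, end = int(start_text), int(end_text)
        else:
            start = end = int(part)
        values.update(range(start, end + 1, step))
    return sorted(values)


def cron_matches(expression, moment):
    """Return True if the five-field expression fires at the given minute."""
    minute, hour, dom, month, dow = expression.split()
    if moment.minute not in expand_field(minute, 0, 59):
        return False
    if moment.hour not in expand_field(hour, 0, 23):
        return False
    if moment.month not in expand_field(month, 1, 12):
        return False
    dom_ok = moment.day in expand_field(dom, 1, 31)
    # isoweekday() gives Monday=1 ... Sunday=7; cron uses Sunday=0.
    dow_ok = moment.isoweekday() % 7 in expand_field(dow, 0, 6)
    if dom.startswith("*") or dow.startswith("*"):
        return dom_ok and dow_ok
    # When both day fields are restricted, a match on either one is enough.
    return dom_ok or dow_ok


every_ten = expand_field("*/10", 0, 59)
fires_on_monday = cron_matches("01 04 1 1 1", datetime(2015, 1, 5, 4, 1))
```

The last branch reproduces the "January 1st plus every Monday in January" behaviour: when both the day-of-month and weekday fields are restricted, the job fires when either matches.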

Cron also offers some special strings, which can be used in place of the
five time-and-date fields:

| **string**    | **meaning**                        |
|---------------|------------------------------------|
|  @reboot      | Run once, at startup.              |
|  @yearly      | Run once a year, “0 0 1 1 \*”.     |
|  @annually    | (same as @yearly)                  |
|  @monthly     | Run once a month, “0 0 1 \* \*”.   |
|  @weekly      | Run once a week, “0 0 \* \* 0”.    |
|  @daily       | Run once a day, “0 0 \* \* \*”.    |
|  @midnight    | (same as @daily)                   |
|  @hourly      | Run once an hour, “0 \* \* \* \*”. |



```bash
    @reboot /path/to/executable1
```

The above example will execute /path/to/executable1 when the system
starts.  

For more information on special strings enter “man 5 crontab”.

### Crontab Options  

* The -l option causes the current crontab to be displayed on standard
output. 
* The -r option causes the current crontab to be removed.  
* The -e option is used to edit the current crontab using the editor
specified by the EDITOR environment variable.    

After you exit from the editor, the modified crontab is checked for
errors and, if there are no errors, it is installed automatically. The
file is stored in */var/spool/cron/crontabs* but should only be edited
using the crontab command.   

### Allowing/Denying User-Level Cron  

If the **/etc/cron.allow** file exists, then users must be listed in it
in order to be allowed to run the **crontab** command. If the
**/etc/cron.allow** file does not exist but the **/etc/cron.deny** file
does, then users must not be listed in the **/etc/cron.deny** file in
order to run **crontab**. 

In the case where neither file exists, the default on current Ubuntu
(and Debian, but not some other Linux and UNIX systems) is to allow all
users to run jobs with **crontab**.   

No cron.allow or cron.deny files exist in a standard Ubuntu install, so
all users should have cron available by default, until one of those
files is created. If a blank cron.deny file has been created, that will
change to the standard behavior users of other operating systems might
expect: cron only available to root or users in cron.allow.  

Note that user IDs on your system which do not appear in /etc/shadow will NOT
have operational crontabs. If you enter a user in /etc/passwd
but NOT in /etc/shadow, that user’s crontab will never run. To fix this, place an entry
in /etc/shadow for the user with a \* for the password crypt, e.g.:


```bash
    joeuser:*:15169::::::
```

###  Further Considerations   

Crontab commands are generally stored in the crontab file belonging to
your user account (and executed with your user’s level of permissions).
If you want to regularly run a command requiring administrative
permissions, edit the root crontab file:

```bash
    sudo crontab -e
```

Depending on the commands being run, you may need to expand the root
user’s PATH variable by putting the following line at the top of the root
crontab file:  

```bash
    PATH=/usr/sbin:/usr/bin:/sbin:/bin
```

**crontab -e** uses the EDITOR environment variable. To change the
editor to your own choice, just set that variable. You may want to set
EDITOR in your .bashrc because many commands use this variable. For
example, in order to set the editor to be nano (a very easy editor to
use) add this line to .bashrc:   


```bash
    export EDITOR=nano
```

It is sensible to test that your cron jobs work as intended. One method
for doing this is to set up the job to run a couple of minutes in the
future and then check the results before finalising the timing. You may
also find it useful to put the commands into script files that log their
success or failure, for example:   

```bash
    echo "Nightly Backup Successful: $(date)" >> /tmp/mybackup.log
```

If your machine is regularly switched off, you may also be interested in
**at** and **anacron**, which provide other approaches to scheduled
tasks. For example, **anacron** offers simple system-wide directories
for running commands hourly, daily, weekly, and monthly. Scripts to be
executed in said times can be placed in **/etc/cron.hourly/**,
**/etc/cron.daily/**, **/etc/cron.weekly/**, and **/etc/cron.monthly/**.
All scripts in each directory are run as root, and a specific order to
running the scripts can be specified by prefixing the scripts’ filenames
with numbers (see the **man** page for **run‑parts** for more details).
Although the directories contain periods in their names, run‑parts
**will not** accept a file name containing a period and will fail
silently when encountering one. Either rename the file or use a symlink
(without a period) to it instead (see, for example, *[python + cron without
login?]* and *[Problems with Hourly Cron Job]*).   


### Common Problems   

Edits to a user’s crontab and the cron jobs run are all logged by
default to **/var/log/syslog** and that’s the first place to check if
things are not running as you expect.   

If a user was not allowed to execute jobs when their crontab was last
edited, just adding them to the allow list won’t do anything. The user
needs to re-edit their crontab after being added to cron.allow before
their jobs will run.   

Note that user-specific crontabs (including the root crontab) do not
specify the user name after the date/time fields. If you accidentally
include the user name in a user-specific crontab, the system will try to
run the user name as a command.   

Cron jobs may not run with the environment, in particular the PATH, that
you expect. Try using full paths to files and programs if they’re not
being located as you expect.    
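One way to find out which full path to put in a crontab entry is to look a program up against a deliberately small PATH. The sketch below uses Python's `shutil.which`; the default search path `/usr/bin:/bin` is an assumption about a typical cron environment, not a value taken from your system, and `resolve_command` is a hypothetical helper name.

```python
import shutil


def resolve_command(name, cron_path="/usr/bin:/bin"):
    """Return the full path of `name` as found on `cron_path`, or None.

    `cron_path` is an assumed stand-in for cron's restricted PATH.
    """
    return shutil.which(name, path=cron_path)


print(resolve_command("sh"))
```

If the function returns `None` for a program that works in your interactive shell, the program lives outside that restricted PATH and the cron entry needs its full path.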

An unescaped “%” character in a cron command is treated as a newline,
and everything after the first “%” is passed to the command as standard
input. If you need to pass a literal “%” into a script, you need to
escape it as “\\%”.   
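If crontab lines are generated by a program, the escaping can be applied automatically. The following is a minimal sketch; `escape_percent` is a hypothetical helper, and it deliberately ignores the corner case of a literal backslash placed directly before a "%".

```python
import re


def escape_percent(command):
    """Put a backslash before every % that is not already escaped."""
    return re.sub(r"(?<!\\)%", r"\\%", command)


print(escape_percent("date +%Y-%m-%d"))
```

Because percent signs that are already escaped are left alone, running the function twice on the same command gives the same result as running it once.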

If you’re having trouble running a GUI application using cron, see the
GUI Applications section below.   


### Two Other Types of Crontab  

The crontab files discussed above are **user** crontabs. Each of the
above crontabs is associated with a user, even the root crontab, which
is associated with the root user. There are two other types of crontab,
with syntax as follows:  


```bash
  minute(s) hour(s) day(s)_of_month month(s) day(s)_of_week user command
```

Note that the only difference from the syntax of the user crontabs is
that the line specifies the user to run the job as.  
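The difference between the two layouts can be made concrete with a small parser. This is a simplified sketch: it assumes five whitespace-separated date/time fields and does not handle comments, environment assignments, or shortcuts such as `@reboot`. The function name `split_cron_line` and the sample entry are hypothetical.

```python
def split_cron_line(line, system=False):
    """Split one crontab entry into schedule, user, and command.

    With system=True the sixth field is read as the user name, as in the
    format shown above; otherwise the command starts right after the schedule.
    """
    leading = 6 if system else 5  # number of fields that precede the command
    parts = line.strip().split(None, leading)
    if len(parts) != leading + 1:
        raise ValueError(f"expected {leading} fields followed by a command: {line!r}")
    return {
        "schedule": parts[:5],
        "user": parts[5] if system else None,
        "command": parts[leading],
    }


# Hypothetical entry written in the system crontab format
entry = "30 02 * * * root /usr/local/bin/cleanup.sh --quiet"
print(split_cron_line(entry, system=True))
print(split_cron_line(entry))
```

Reading the same line as a user crontab makes `root` the first word of the command, which is the mistake described under Common Problems.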

The first type is as follows. As mentioned above, **anacron** uses the
**run‑parts** command and the **/etc/cron.hourly**, **/etc/cron.daily**,
**/etc/cron.weekly**, and **/etc/cron.monthly** directories. However,
**anacron** itself is invoked from the **/etc/crontab** file. This file
could be used for other cron commands, but probably shouldn’t be. Here’s
an example line from a fictitious **/etc/crontab**: 


```bash
    00 01 * * * rusty /home/rusty/rusty-list-files.sh
```

This would run Rusty’s command script as user **rusty** from his home
directory. However, it is not usual to add commands to this file. While
an experienced user should know about it, it is not recommended that you
add anything to **/etc/crontab**. Apart from anything else, this could
cause a problem if the **/etc/crontab** file is affected by updates!
Rusty could lose his command. 

The second type is to be found in the directory /etc/cron.d. This
directory can contain crontab files. The directory is often used by
packages, and the crontab files allow a user to be associated with the
commands in them.   

Example: Instead of adding a line to /etc/crontab, which Rusty knows is
not a good idea, he might well add a file to the directory /etc/cron.d
with the name **rusty**, containing his cron line above. This would not
be affected by updates but is a **well known** location.  

When would you use these alternate crontab locations? Well, on a
single-user machine or a shared machine such as a school or college
server, a **user** crontab would be the way to go. But in a large IT
department, where several people might look after a server, the
directory /etc/cron.d is probably the best place to install crontabs -
it’s a central point and saves searching for them!

You may not need to look at **/etc/crontab** or **/etc/cron.d**, let
alone edit them by hand. But an experienced user should perhaps know
about them and that the packages that he/she installs may use these
locations for their crontabs.  

### GUI Applications 

It is possible to run GUI applications via cron jobs. This can be done by
telling cron which display to use.  

```bash
    00 06 * * * env DISPLAY=:0 gui_appname
```

The *env DISPLAY=:0* portion will tell cron to use the current display
(desktop) for the program “gui\_appname”. 

And if you have multiple monitors, don’t forget to specify on which one
the program is to be run. For example, to run it on the first screen
(default screen) use:  

```bash
    00 06 * * * env DISPLAY=:0.0 gui_appname
```

The *env DISPLAY=:0.0* portion will tell cron to use the first screen of
the current display for the program “gui\_appname”.   


**Note:** GUI users may prefer to use gnome-schedule (aka “Scheduled
tasks”) to configure GUI cron jobs. In gnome-schedule, when editing a
GUI task, you have to select “X application” in a dropdown next to the
command field.   

**Note:** In Karmic (9.10), you have to enable the X ACL for local
connections before GUI applications will work:

     ~$ xhost +local:
    non-network local connections being added to access control list
     ~$ xhost
    access control enabled, only authorized clients can connect
    LOCAL:
    ...


### Crontab Example  

Below is an example of how to set up a crontab to run updatedb, which
updates the slocate database. Open a terminal, type “crontab -e”
(without the double quotes) and press Enter. Type the following line
into the editor, substituting the full path of the application you wish
to run for the one shown below: 

```bash
    45 04 * * * /usr/bin/updatedb
```

Save your changes and exit the editor. 

Crontab will let you know if you made any mistakes. The crontab will be
installed and begin running if there are no errors. That’s it. You now
have a cron job set up to run updatedb, which updates the slocate
database, every morning at 4:45. 

Note: The double-ampersand (&&) can also be used in the “command”
section to run multiple commands consecutively, but each command runs
only if the previous one exits successfully. A string of commands joined
by the double-ampersand will only get to the last command if all the
previous commands run successfully. If exit error-checking is not
required, string commands together separated with a semicolon (;). 

```bash
    45 04 * * * /usr/sbin/chkrootkit && /usr/bin/updatedb
```

The above example will run chkrootkit followed by updatedb at 4:45am
daily - providing you have all listed apps installed. If chkrootkit
fails, updatedb will NOT be run. 
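The difference between the two separators can be seen without waiting for cron by running short command chains from Python. This is a sketch that assumes a POSIX shell with the standard `true`, `false`, and `echo` commands, which stand in for chkrootkit and updatedb. The helper `run_chain` is hypothetical.

```python
import subprocess


def run_chain(chain):
    """Run a shell command chain and return (stdout, exit status)."""
    result = subprocess.run(chain, shell=True, capture_output=True, text=True)
    return result.stdout.strip(), result.returncode


# `false` always fails, so `&&` stops the chain before echo runs
print(run_chain("false && echo second"))
# `;` ignores the failure and runs echo anyway
print(run_chain("false ; echo second"))
```

With `&&` the second command never runs and the chain reports the failure. With `;` the second command runs regardless and the chain reports the status of the last command.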

### Cron Job Tutorials   

* [Newbie: Intro to cron - unixgeeks.org](http://www.unixgeeks.org/security/newbie/unix/cron-1.html)   
* [How to Run a Cron Job Every Day on a Linux System](https://www.tutorialspoint.com/articles/how-to-run-a-cron-job-every-day-on-a-linux-system)  
* [Cron Jobs and how to use them - an introduction - HowtoForge](https://www.howtoforge.com/a-short-introduction-to-cron-jobs)
* [Cron Jobs Tutorial - SiteGround](https://www.siteground.com/tutorials/cpanel/cron-jobs/) 
* [Crontab Basic Tutorial - BASH - Linux - YouTube](https://www.youtube.com/watch?v=7MFMnsnfBJs) 


Last update October 3, 2017 

The text is released under the [CC-BY-NC-ND license](https://creativecommons.org/licenses/by-nc-nd/3.0/us/legalcode), and code is released under the [MIT license](https://opensource.org/licenses/MIT).