# Introduction to ETL with Luigi
In this module we will cover the basic principles of how to craft a data pipeline.

This consists of three distinct stages:
- Extract (fetch data from a remote source)
- Transform (clean, aggregate, and modify the data)
- Load (insert the transformed data into a database)

## What is Luigi?
Luigi is a python framework for creating data pipelines, and provides two concepts we will use to represent what we need to accomplish.
- Task: unit of work being done
- Target: the things we're doing work on

In this case, we will make a task for each of the E T and L, and pass targets inbetween each task.  The targets will represent files at various stages of the pipeline.

## What goes into a Task?
Each task requires three things:
- The work being done (run method)
- Whether the task has completed or not (all the output targets exist)
- Any other tasks that need to run to completion before starting the task at hand (requires method)

## Anything else?
If a task needs to be configured, we can add Parameters to add changeable details.

Also, we'll be using a data manipulation library called `pandas` to transform the data.  If you're used to using Excel, you can think of it as the python code replacement (and much more!).  

Pandas offers us two concepts to work with:
- DataFrame: a multi dimentional data representation (spreadsheet with many columns)
- Series: a one dimentional data representation (single column)

## So what does a Task look like?

In [2]:
# Example adapted from http://help.mortardata.com/technologies/luigi/how_luigi_works
import luigi

class TransformTask(luigi.Task):

    # Example parameter for our task: a 
    # name to save out output file
    output_filename = luigi.Parameter(default='output.csv')

    def requires(self):
        """
        Which other Tasks need to be complete before
        this Task can start? Luigi will use this to 
        compute the task dependency graph.
        """
        return ExtractTask()

    def output(self):
        """
        When this Task is complete, where will it produce output?
        Luigi will check whether this output (specified as a Target) 
        exists to determine whether the Task needs to run at all.
        """
        return luigi.LocalTarget(self.output_filename)

    def run(self):
        """
        How do I run this Task?
        Luigi will call this method if the Task needs to be run.
        """
        # We can do anything we want in here, from calling python
        # methods to running shell scripts to calling APIs

## How do we use the targets when we run the task?
An example of the run method in a transform task uses both input and output targets

In [None]:
def run(self):
    # grab target from input
    input_target = self.input() # a.k.a self.requires().output()
    # grab the output target as well
    output_target = self.output()
    # open both target files
    with input_target.open('r') as it, output_target.open('w') as ot:
        # read file and convert to dataframe
        dataframe = pandas.read_csv(it)
        # do some stuff to the data
        dataframe['koolaid'] = pd.Series(['oh', 'yeeeeaaahh'])
        # output data as csv
        dataframe.to_csv(ot, index=False)

## How do we execute all of the tasks
Since all of the tasks are grouped in a chain, each requiring the previous to run before it can, we only need to call execution on the last task (LoadTask).  We do this by adding a main function to our script, which will look like this.

In [None]:
if __name__ == '__main__':
    luigi.run(['LoadTask', '--local-scheduler'])

Luigi will check LoadTask, see that it requires TransformTask to run before it, then see that TransformTask requires ExtractTask.  ExtractTask doesn't have any requirements, which means that while we specify LoadTask to execute, ExtractTask will run First.  

Finally, run your script as usual `python etl.py`

# Extra information to help you along
- use the `requests` package to fetch files from the web with the `get` method
- use the `pandas` package to aggregate the data
- also use `pandas` to load the data into the database with the `to_sql` method
- use the `sqlite3` package to act as a local database (doesn't require login credentials to get a connection)

# Goals for your pipeline
- Extract the titanic dataset from the url https://goo.gl/xMsy16
- Aggregate the number of passengers per class
  - groupby Pclass column, select Pclass column, then count()
- Load aggregate data into a database (sqlite3)
