## From interactive programming to production-ready code

### Imports

In [4]:
from luigi.parameter import IntParameter, DateParameter, Parameter
from luigi import LocalTarget, Task
from luigi.contrib.external_program import ExternalProgramTask
from luigi.contrib.spark import PySparkTask

## A sample task

In [3]:
from luigi import Task, LocalTarget, Parameter
class Beispiel(Task):
    parameter1 = Parameter(default="Test 1")
    parameter2 = Parameter(default="Test 2")

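    def requires(self):
        # EinAndererTask ("another task") stands for any upstream task; it is not defined in this notebook
        return EinAndererTask(self.parameter2)

    def output(self):
        # A fixed path ignores parameter1: once this file exists, Luigi treats
        # every Beispiel instance as complete, whatever its parameters
        return LocalTarget("/tmp/test.checkpoint")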

    def run(self):
        with self.output().open('w') as f:
            f.write(self.parameter1)
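
Luigi decides what to run from these three methods: it resolves `requires()` first and, by default, treats a task as complete once every target returned by `output()` exists, in which case `run()` is skipped. The sketch below is a deliberately simplified, pure-Python model of that behaviour, not Luigi's implementation: `ToyTask`, `build`, `Upstream` and `Example` are hypothetical names, outputs are plain file paths instead of Luigi targets, and the scheduler, workers and parameters are ignored.

```python
import os
import tempfile


class ToyTask:
    """Simplified stand-in for a Luigi task (hypothetical, not Luigi's API)."""

    def requires(self):
        return []  # upstream tasks that must be complete first

    def output(self):
        raise NotImplementedError  # path whose existence marks completion

    def run(self):
        raise NotImplementedError

    def complete(self):
        # Mirrors Luigi's default rule: done once the output exists
        return os.path.exists(self.output())


def build(task, log):
    """Depth-first: build dependencies, then run the task only if incomplete."""
    for dep in task.requires():
        build(dep, log)
    if not task.complete():
        task.run()
        log.append(type(task).__name__)


class Upstream(ToyTask):
    def __init__(self, workdir):
        self.workdir = workdir

    def output(self):
        return os.path.join(self.workdir, "upstream.txt")

    def run(self):
        with open(self.output(), "w") as f:
            f.write("Test 2")


class Example(ToyTask):
    def __init__(self, workdir):
        self.workdir = workdir

    def requires(self):
        return [Upstream(self.workdir)]

    def output(self):
        return os.path.join(self.workdir, "test.checkpoint")

    def run(self):
        with open(self.output(), "w") as f:
            f.write("Test 1")


with tempfile.TemporaryDirectory() as workdir:
    first, second = [], []
    build(Example(workdir), first)   # runs Upstream, then Example
    build(Example(workdir), second)  # nothing to do: both outputs exist
    print(first, second)  # ['Upstream', 'Example'] []
```

Running `build` twice shows why such pipelines can simply be restarted: steps whose output already exists are skipped.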

## Downloading a dataset using *ExternalProgramTask* and curl

In [6]:
class DownloadDataset(ExternalProgramTask):

    dataset_version = IntParameter(default=1)
    dataset_name = Parameter(default="dataset")

    base_url = "http://plainpixels.work/resources/datasets"
    file_format = "zip"

    def output(self):
        return LocalTarget("/tmp/%s_v%d.%s" % (self.dataset_name,
                                               self.dataset_version,
                                               self.file_format))

    def program_args(self):
        url = "%s/%s_v%d.%s" % (self.base_url,
                                self.dataset_name,
                                self.dataset_version,
                                self.file_format)
        # -L follows redirects; --fail makes curl exit non-zero on HTTP errors,
        # so the task fails instead of saving an error page as the dataset
        return ["curl", "-L", "--fail",
                "-o", self.output().path,
                url]
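
`curl -o` writes straight to the output path, so an interrupted transfer can leave a partial file that Luigi would accept as a finished download, since the target only checks that the file exists. If you prefer to do the download in Python inside a plain `Task.run()`, one option is the write-then-rename pattern sketched below with the standard library only; the `download_atomically` helper is hypothetical, not part of Luigi.

```python
import os
import shutil
import tempfile
import urllib.request


def download_atomically(url, target_path):
    """Hypothetical helper: download `url` so `target_path` only appears when complete.

    The data is written to a temporary file in the target's directory and
    renamed at the end, so an interrupted transfer never leaves a partial file
    that a completeness check would accept.
    """
    target_dir = os.path.dirname(os.path.abspath(target_path))
    os.makedirs(target_dir, exist_ok=True)
    fd, tmp_path = tempfile.mkstemp(dir=target_dir, suffix=".part")
    try:
        # urlopen raises HTTPError for 4xx/5xx responses instead of saving the body
        with os.fdopen(fd, "wb") as tmp, urllib.request.urlopen(url) as response:
            shutil.copyfileobj(response, tmp)
        # Same directory, hence same filesystem: the rename is atomic on POSIX
        os.replace(tmp_path, target_path)
    except BaseException:
        os.remove(tmp_path)  # never leave the temporary file behind
        raise
```

In a task, `run()` could call `download_atomically(url, self.output().path)`. Luigi's own `LocalTarget.open('w')` follows the same idea by writing to a temporary file and moving it into place on close.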

## Extracting a dataset using *ExternalProgramTask*

In [5]:
class ExtractDataset(ExternalProgramTask):
    
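    dataset_version = IntParameter(default=1)
    dataset_name = Parameter(default="dataset")

    def requires(self):
        # Keyword arguments do not depend on the order the parameters are declared in
        return DownloadDataset(dataset_version=self.dataset_version,
                               dataset_name=self.dataset_name)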

    def output(self):
        return LocalTarget("datasets/fruit-images-dataset/%d" % self.dataset_version)

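    def program_args(self):
        # LocalTarget.makedirs() creates the missing parent folders; unzip -d
        # creates the target folder itself (-u: update/create files, -q: quiet)
        self.output().makedirs()
        return ["unzip", "-u", "-q",
                "-d", self.output().path,
                self.input().path]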
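
`unzip -d` fills the output directory while it extracts, so a failed or interrupted extraction can leave a half-filled `datasets/fruit-images-dataset/<version>` folder that Luigi would treat as complete, because the target only checks that the path exists. One way around this, sketched here with the standard library's `zipfile` in place of `unzip` (the `extract_atomically` helper is hypothetical), is to extract into a temporary sibling directory and rename it into place at the end.

```python
import os
import shutil
import tempfile
import zipfile


def extract_atomically(zip_path, target_dir):
    """Hypothetical helper: extract `zip_path` so `target_dir` only appears on success.

    Extraction happens in a temporary sibling directory that is renamed into
    place at the end, so a crash mid-extraction never leaves a half-filled
    `target_dir` behind.
    """
    parent = os.path.dirname(os.path.abspath(target_dir))
    os.makedirs(parent, exist_ok=True)
    tmp_dir = tempfile.mkdtemp(dir=parent, suffix=".partial")
    try:
        with zipfile.ZipFile(zip_path) as archive:
            archive.extractall(tmp_dir)
        # Rename within the same filesystem, so the directory appears in one step
        os.rename(tmp_dir, target_dir)
    except BaseException:
        shutil.rmtree(tmp_dir, ignore_errors=True)
        raise
```

Keeping the temporary directory next to the target is the important design choice: a rename across filesystems would turn into a copy and lose the all-or-nothing behaviour.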

## A PySpark Task

Luigi integrates with a variety of big data technologies, such as PySpark. The **PySparkTask** (from `luigi.contrib.spark`) lets us write the PySpark code directly inside the task's `main()` method and offers many options for configuring the resources the job requests, such as driver and executor memory.
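
Under the hood, **PySparkTask** builds on Luigi's `SparkSubmitTask` and launches the job with `spark-submit`; the resource attributes set on the **Classify** task further down correspond to `spark-submit` command-line options. The hypothetical `spark_submit_args` helper below only illustrates that mapping: it is not Luigi's code, and `job.py` is a placeholder application file.

```python
def spark_submit_args(settings, app):
    """Hypothetical helper: map task-style resource settings to spark-submit flags."""
    flag_names = {
        "master": "--master",
        "driver_memory": "--driver-memory",
        "executor_memory": "--executor-memory",
        "executor_cores": "--executor-cores",
        "num_executors": "--num-executors",
    }
    args = ["spark-submit"]
    for key, flag in flag_names.items():
        value = settings.get(key)
        if value is not None:  # unset options fall back to Spark's defaults
            args += [flag, str(value)]
    return args + [app]  # the application file comes after all options


# The values used by the Classify task in this notebook
classify_settings = {
    "driver_memory": "1g",
    "executor_memory": "2g",
    "executor_cores": "2",
    "num_executors": "4",
    "master": "local",
}
print(" ".join(spark_submit_args(classify_settings, "job.py")))
# spark-submit --master local --driver-memory 1g --executor-memory 2g --executor-cores 2 --num-executors 4 job.py
```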

We also see a new parameter type, the **DateParameter**. Parameterizing a task by date yields one task instance, and one output, per day, which is what makes Luigi a powerful data ingestion tool.

By calling
```bash
luigi --module 01_classification_pipeline RangeDailyBase --of Classify \
                                                         --stop=$(date +"%Y-%m-%d") \
                                                         --days-back 4 \
                                                         --Classify-version 1 \
                                                         --reverse
```
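we use the **RangeDailyBase** wrapper class from `luigi.tools.range`. It schedules one **Classify** task per day in the window defined by `--days-back 4` and `--stop`; the stop date is exclusive, so today itself is not included. Every instance gets `version=1`, days whose output already exists are skipped, and `--reverse` processes the newest missing day first. Rerunning the same command therefore only retries the days that have not completed yet, which makes it well suited to unreliable imports.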
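
The window logic can be illustrated with a simplified model. `missing_days` is a hypothetical function that works in whole calendar days; Luigi's actual arithmetic also takes the current timestamp and the `start`/`days_forward` parameters into account, so its edges may differ.

```python
import datetime


def missing_days(today, days_back, is_complete, stop=None, reverse=False):
    """Simplified model of a daily range wrapper, not Luigi's implementation.

    Looks at the `days_back` calendar days before `today`, treats `stop` as
    exclusive (defaulting to `today`) and keeps only incomplete days.
    """
    stop = stop or today
    day = today - datetime.timedelta(days=days_back)
    missing = []
    while day < stop:
        if not is_complete(day):
            missing.append(day)
        day += datetime.timedelta(days=1)
    # reverse=True mimics --reverse: newest missing day first
    return missing[::-1] if reverse else missing


today = datetime.date(2024, 5, 10)
done = {datetime.date(2024, 5, 7)}  # pretend this day already has its output
print(missing_days(today, 4, lambda d: d in done, reverse=True))
# [datetime.date(2024, 5, 9), datetime.date(2024, 5, 8), datetime.date(2024, 5, 6)]
```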


In [8]:
import datetime

class Classify(PySparkTask):
    # The default is evaluated once, when the class is defined, not on every run
    date = DateParameter(default=datetime.date.today())
    version = IntParameter(default=1)

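    # PySpark resource settings, passed to spark-submit. With master='local'
    # the job runs in one local JVM, so the executor settings only matter on
    # a cluster manager such as YARN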
    driver_memory = '1g'
    executor_memory = '2g'
    executor_cores = '2'
    num_executors = '4'
    master = 'local'

    def requires(self):
        # ModelExists and Clean are not shown in this notebook
        return [ModelExists(self.version), Clean(self.date)]

    def output(self):
        prefix = self.date.strftime("%m-%d-%Y")
        return LocalTarget("daily/%s/ergebnis.csv" % prefix)

    def main(self, sc, *args):
        from pyspark.sql.session import SparkSession
        sql = SparkSession.builder.getOrCreate()
        
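        # Clean's output: a semicolon-separated CSV with a header row
        df = sql.read.csv(self.input()[1].path, sep=";", header=True)

        # Load model and run classification...
        # (elided in this notebook; it has to produce the DataFrame `ergebnis`)
        ergebnis = df  # placeholder so the cell runs; replace with the predictions

        with self.output().open("w") as out:
            # Never do this on large data: toPandas() collects the whole
            # DataFrame on the driver before it is written out
            ergebnis.toPandas().to_csv(out,
                                       encoding='utf-8',
                                       index=False,
                                       sep=';')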
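
Because `output()` embeds the date, each day gets its own folder under `daily/`. One detail worth checking is the `%m-%d-%Y` prefix: compared as plain strings, such folder names do not sort chronologically across years, whereas ISO `%Y-%m-%d` names do. A small standard-library check follows; the `daily_output_path` helper is hypothetical and only mirrors the path pattern used above.

```python
import datetime


def daily_output_path(day, fmt):
    """Hypothetical helper mirroring the path pattern of Classify.output()."""
    return "daily/%s/ergebnis.csv" % day.strftime(fmt)


days = [datetime.date(2023, 12, 31), datetime.date(2024, 1, 1), datetime.date(2024, 2, 15)]

us_style = [daily_output_path(d, "%m-%d-%Y") for d in days]
iso_style = [daily_output_path(d, "%Y-%m-%d") for d in days]

# A plain string sort, as a directory listing would apply
print(sorted(us_style) == us_style)    # False: "12-31-2023" sorts after "01-01-2024"
print(sorted(iso_style) == iso_style)  # True: ISO dates sort chronologically
```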