## From interactive programming to production-ready code

### Imports

In [4]:
from luigi.parameter import IntParameter, DateParameter
from luigi import LocalTarget, Task
from luigi.contrib.spark import PySparkTask

## A PySpark Task

In [5]:
class Classify(PySparkTask):
    """Luigi PySpark task that writes the classification result for one day.

    Requires ``ModelExists(version)`` and ``Clean(date)`` and writes a
    semicolon-separated CSV to ``daily/<MM-DD-YYYY>/ergebnis.csv``.
    """

    from datetime import date

    # Task parameters. The default date is computed once, when this class
    # body runs; the imported ``date`` name is then rebound to the parameter.
    date = DateParameter(default=date.today())
    version = IntParameter(default=1)

    # Spark resource settings
    # NOTE(review): presumably read by PySparkTask when submitting the job --
    # confirm against the Luigi Spark contrib documentation.
    driver_memory = '1g'
    executor_memory = '2g'
    executor_cores = '2'
    num_executors = '4'
    master = 'local'

    def requires(self):
        """Return the upstream tasks: the model first, the cleaned data second."""
        model_task = ModelExists(self.version)
        clean_task = Clean(self.date)
        return [model_task, clean_task]

    def output(self):
        """Return the local CSV target inside the folder named after the date."""
        day_folder = self.date.strftime("%m-%d-%Y")
        result_path = "daily/%s/ergebnis.csv" % day_folder
        return LocalTarget(result_path)

    def main(self, sc, *args):
        """Read the cleaned CSV input with Spark and write the result as CSV.

        ``sc`` and ``args`` are accepted but not used by the visible code.
        """
        from pyspark.sql.session import SparkSession
        from pyspark.ml import PipelineModel
        from pyspark.sql.functions import when

        sql = SparkSession.builder.getOrCreate()
        # Input index 1 is the second entry returned by requires(), i.e. the
        # Clean task; its file is semicolon-delimited with a header row.
        df = (
            sql.read.format("com.databricks.spark.csv")
            .option("delimiter", ";")
            .option("header", "true")
            .load(self.input()[1].path)
        )

        # Load model and run classification...
        # NOTE(review): `ergebnis` is not assigned anywhere in the visible
        # code; presumably the elided step above derives it from `df` (and
        # uses the PipelineModel / when imports) -- confirm before running.

        csv_options = dict(encoding='utf-8', index=False, sep=';')
        with self.output().open("w") as out:
            ergebnis.toPandas().to_csv(out, **csv_options)