## From interactive programming to production-ready code

### Imports

In [87]:
from luigi.contrib.external_program import ExternalProgramTask
from luigi.contrib.spark import PySparkTask
from luigi.parameter import IntParameter, Parameter, DateParameter
from luigi import LocalTarget, Task
import luigi
import datetime

In [11]:
import ssl
# Replace the default HTTPS context factory of the ssl module with the
# unverified one, so HTTPS contexts created through it no longer validate
# certificates.
# NOTE(review): presumably a workaround so that the nltk.download() calls in
# this notebook succeed on machines with certificate problems -- confirm, and
# do not carry this into production code.
try:
    _create_unverified_https_context = ssl._create_unverified_context
except AttributeError:
    # ssl._create_unverified_context is not available; keep the default.
    pass
else:
    ssl._create_default_https_context = _create_unverified_https_context

In [102]:
class DateExists(Task):
    """Task whose output is the local file at ``path``.

    The class defines no ``run`` method of its own; it only exposes the file
    as a target that other tasks can depend on.
    """

    # Location of the file on the local file system.
    path = Parameter()

    def output(self):
        """Return the ``LocalTarget`` wrapping ``self.path``."""
        target = LocalTarget(self.path)
        return target

## Task No.1: Clean the dataset

The cleaning task tokenizes the posts, removes stop words, and stems the remaining words.

*Input*: the raw Reddit posts of one day (`datasets/reddit_ds_got/raw/<date>/roh.csv`) <br>
*Output*: A cleaned version of reddit posts

In [104]:
class Clean(Task):
    """Clean the raw Reddit posts of a single day.

    Title and selftext of every post are tokenized, lower-cased, filtered
    against the English stop-word list and stemmed. The columns ``id``,
    ``cleaned_words`` and ``subreddit`` are then written to
    ``datasets/reddit_ds_got/daily/<date>/cleaned.csv``.
    """

    from datetime import date
    import nltk

    # NOTE: the downloads run while the class body is executed, because the
    # stop-word list below is built at class-definition time.
    nltk.download('punkt')
    nltk.download('stopwords')

    # The day to process is passed in as a parameter.
    date = DateParameter(default=date.today())
    basepath = "datasets/reddit_ds_got/raw"

    # The list of stop words that are filtered out.
    stoppwoerter = nltk.corpus.stopwords.words('english')

    # The tokenizer in use: splits on runs of word characters.
    tokenizer = nltk.tokenize.RegexpTokenizer(r'\w+')

    # The stemmer for English words.
    stemmer = nltk.SnowballStemmer("english")

    def _date_prefix(self):
        """Return the task date formatted as the ``MM-DD-YYYY`` directory name."""
        return self.date.strftime("%m-%d-%Y")

    def requires(self):
        """Depend on the raw CSV file of the task date.

        The method must accept ``self``: luigi calls it on the task instance
        and the body reads ``self.date`` and ``self.basepath``.
        """
        return DateExists("%s/%s/roh.csv" % (self.basepath, self._date_prefix()))

    def output(self):
        """Return the target for the cleaned data.

        The data is stored under ``daily/<date>/cleaned.csv``.
        """
        return LocalTarget(
            "datasets/reddit_ds_got/daily/%s/cleaned.csv" % self._date_prefix())

    def run(self):
        """Tokenize the raw data, drop stop words, stem, and save the result."""
        posts = self.lade()
        tokenized = self.tokenize(posts)
        gefiltert = self.entferne(tokenized)
        wortstamm = self.stemme(gefiltert)
        posts["cleaned_words"] = wortstamm
        self.speichern(posts, self.output())

    def lade(self):
        """Load the raw CSV of the task date into a pandas DataFrame.

        Missing values are replaced by empty strings so that the text columns
        can be concatenated in ``tokenize``.
        """
        import pandas
        # The path comes from the required task instead of being rebuilt here,
        # so ``requires`` stays the single place that defines it.
        path = self.input().path
        return pandas.read_csv(path, encoding='utf-8', sep=';').fillna('')

    def tokenize(self, csv):
        """Return, per row of ``csv``, the tokens of ``title`` plus ``selftext``."""
        def tok(post):
            return self.tokenizer.tokenize(post["title"] + " " + post["selftext"])
        return csv.apply(tok, axis=1)

    def entferne(self, tokenized):
        """Lower-case every token and drop the ones found in the stop-word list."""
        # A set is built once so each membership test does not scan the list.
        stoppwoerter = set(self.stoppwoerter)
        lowercase = tokenized.apply(lambda post: [wort.lower() for wort in post])
        return lowercase.apply(
            lambda post: [wort for wort in post if wort not in stoppwoerter])

    def stemme(self, gefiltert):
        """Stem every token and join the stems of a post with single spaces."""
        stem = self.stemmer.stem
        return gefiltert.apply(
            lambda post: " ".join([stem(wort) for wort in post]))

    def speichern(self, dataframe, target):
        """Write ``id``, ``cleaned_words`` and ``subreddit`` to ``target`` as CSV."""
        with target.open("w") as out:
            dataframe[["id", "cleaned_words", "subreddit"]].to_csv(
                out, encoding='utf-8', index=False, sep=';')

[nltk_data] Downloading package punkt to
[nltk_data]     /Users/markkeinhorster/nltk_data...
[nltk_data]   Package punkt is already up-to-date!
[nltk_data] Downloading package stopwords to
[nltk_data]     /Users/markkeinhorster/nltk_data...
[nltk_data]   Package stopwords is already up-to-date!


In [95]:
# Run the Clean task for 2018-02-19 from within the notebook, using the
# local scheduler.
luigi.build([Clean(datetime.date(2018,2,19))], local_scheduler=True, no_lock=True)

DEBUG: Checking if Clean(date=2018-02-19) is complete
INFO: Informed scheduler that task   Clean_2018_02_19_999079b9db   has status   PENDING
INFO: Done scheduling tasks
INFO: Running Worker with 1 processes
DEBUG: Asking scheduler for work...
DEBUG: Pending tasks: 1
INFO: [pid 2392] Worker Worker(salt=356792662, workers=1, host=Marks-MacBook-Pro-2.local, username=markkeinhorster, pid=2392) running   Clean(date=2018-02-19)
INFO: [pid 2392] Worker Worker(salt=356792662, workers=1, host=Marks-MacBook-Pro-2.local, username=markkeinhorster, pid=2392) done      Clean(date=2018-02-19)
DEBUG: 1 running tasks, waiting for next task to finish
INFO: Informed scheduler that task   Clean_2018_02_19_999079b9db   has status   DONE
DEBUG: Asking scheduler for work...
DEBUG: Done
DEBUG: There are no more tasks to run at this time
INFO: Worker Worker(salt=356792662, workers=1, host=Marks-MacBook-Pro-2.local, username=markkeinhorster, pid=2392) was stopped. Shutting down Keep-Alive thread
INFO: 
===== L

True

## Task No.2: Check existing model

Task No.2 checks if the model exists.

*Input*: nothing <br>
*Output*: A file representing the model

In [98]:
class ModelExists(Task):
    """Task whose output is the stored model of a given version.

    The class defines no ``run`` method of its own; it only exposes
    ``model/<version>`` as a target that other tasks can depend on.
    """

    # Version number of the model; it selects the path ``model/<version>``.
    version = IntParameter(default=1)

    def output(self):
        """Return the ``LocalTarget`` for ``model/<version>``."""
        model_path = "model/%d" % self.version
        return LocalTarget(model_path)

## Task No.3: Classify

Task No.3 classifies the posts of one day.

*Input*: ModelExists, Clean <br>
*Output*: A file with the classification results

Execute from the command line:
```bash
PYTHONPATH='.' luigi --module 01_classification_pipeline Classify --date=2018-02-19 --local-scheduler

PYTHONPATH='.' luigi --module 01_classification_pipeline RangeDailyBase --of Classify \
                                                                        --start=2018-02-19 \
                                                                        --stop=2018-02-23 \
                                                                        --days-back 365 \
                                                                        --Classify-version 1 \
                                                                        --reverse \
                                                                        --local-scheduler

PYTHONPATH='.' luigi --module 01_classification_pipeline RangeDailyBase --of Classify \
                                                                        --stop=$(date +"%Y-%m-%d") \
                                                                        --days-back 4 \
                                                                        --Classify-version 1 \
                                                                        --reverse \
                                                                        --local-scheduler
```

In [101]:
class Classify(PySparkTask):
    """Classify the cleaned posts of one day with a stored Spark ML pipeline.

    The result is written to
    ``datasets/reddit_ds_got/daily/<date>/ergebnis.csv``.
    """

    from datetime import date

    # Day whose posts are classified, and the model version to use.
    date = DateParameter(default=date.today())
    version = IntParameter(default=1)

    # PySpark settings
    driver_memory = '1g'
    executor_memory = '2g'
    executor_cores = '2'
    num_executors = '4'
    master = 'local'

    def requires(self):
        """Return the dependencies: ``ModelExists`` first, then ``Clean``."""
        model_task = ModelExists(self.version)
        clean_task = Clean(self.date)
        return [model_task, clean_task]

    def output(self):
        """Return the target for the classification result.

        The data is stored under ``daily/<date>/ergebnis.csv``.
        """
        day = self.date.strftime("%m-%d-%Y")
        return LocalTarget("datasets/reddit_ds_got/daily/%s/ergebnis.csv" % day)

    def main(self, sc, *args):
        """Load data and model, classify the posts and write the result as CSV."""
        from pyspark.sql.session import SparkSession
        from pyspark.ml import PipelineModel
        from pyspark.sql.functions import when

        # Inputs arrive in the order given by ``requires``.
        model_target, cleaned_target = self.input()

        # Initialise the Spark session with Hive support.
        builder = (
            SparkSession.builder
            .enableHiveSupport()
            .config("hive.exec.dynamic.partition", "true")
            .config("hive.exec.dynamic.partition.mode", "nonstrict")
            .config("hive.exec.max.dynamic.partitions", "4096")
        )
        sql = builder.getOrCreate()

        # Load the cleaned data.
        reader = (
            sql.read.format("com.databricks.spark.csv")
            .option("delimiter", ";")
            .option("header", "true")
        )
        df = reader.load(cleaned_target.path)

        # Load the model that was trained with Spark ML beforehand.
        model = PipelineModel.load(model_target.path)

        # Classify the records of the day and keep only the result columns.
        wanted_columns = ["id", "subreddit", "probability", "prediction"]
        predictions = model.transform(df)[wanted_columns]

        # Small post-processing step: class "1" is named "datascience",
        # every other prediction is labelled "gameofthrones".
        is_datascience = predictions.prediction == 1
        label = when(is_datascience, "datascience").otherwise("gameofthrones")
        labelled = predictions.withColumn("prediction_label", label)

        # For simplicity the Spark DataFrame is converted to a pandas
        # DataFrame. This should be avoided for large amounts of data.
        with self.output().open("w") as out:
            labelled.toPandas().to_csv(
                out, encoding='utf-8', index=False, sep=';')