#### This notebook shows examples of how to use these modules:
##### Abstract Base Class: Task 
##### Decorators: init_auto_parameters, parse_auto_parameters, get_auto_parameters

## Overview

Tfm class:

    get_auto_parameters - provides kwargs for the Tfm class; the kwargs are built from vars in conf.

Pipeline class:

    register_dynamic_endpoint - registers a pipeline class in the _dynamic_endpoint dict
                                so that the Main class can call it by name.
    parse_auto_parameters - sets pipeline vars taken from conf as attributes on self.

Main class(Task/Handler):

    init_auto_parameters - stores SparkSession, conf, DBUtils and logger in a dict that is shared
                           across the pipeline.

## Transformation Class

"get_auto_parameters"

Decorator function for initializing transformation class objects.

    The decorator takes vars from the Tfm class section of conf and passes them to the Tfm class's
    __init__ according to its __init__.__code__.co_varnames (the vars defined in __init__).

Note:

    Takes conf, logger and spark from _pipeline_obj (created by the pipeline)
    and passes them to the function as keyword arguments;
    if not run from a pipeline, this decorator does nothing.

In [1]:
# import: datax in-house
from datax.utils.deployment_helper.decorator.auto_parameters import get_auto_parameters
import pyspark.sql.functions as F


class TransformationClassExample:
    """Example transformation (Tfm) class initialised through ``get_auto_parameters``.

    When instantiated from inside a pipeline, ``get_auto_parameters`` supplies
    ``__init__`` arguments (e.g. ``with_col_str``, ``spark``, ``logger``) from the
    pipeline context and the ``TransformationClassExample`` section of conf.
    Outside a pipeline the decorator does nothing, so every argument must be
    passed explicitly.
    """

    @get_auto_parameters
    def __init__(
        self,
        start_date,
        end_date,
        with_col_str,
        spark=None,
        logger=None,
    ):
        """Store the run parameters on the instance.

        Args:
            start_date: Start of the processing window (stored, not read by
                this example's transforms).
            end_date: End of the processing window (stored, not read by this
                example's transforms).
            with_col_str: Literal value written into the ``with_col_str`` column.
            spark: SparkSession used to build DataFrames.
            logger: Logger exposing ``warn`` (a log4j logger in this notebook).
        """
        self.spark = spark
        self.logger = logger

        self.start_date = start_date
        self.end_date = end_date

        self.with_col_str = with_col_str

    def create_df(self):
        """Return a small in-memory DataFrame with ``id`` and ``label`` columns."""
        df = self.spark.createDataFrame(
            [
                (1, "one"),
                (2, "two"),
                (3, "three"),
            ],
            ["id", "label"],
        )
        return df

    def transform_func(self, df):
        """Return ``df`` with a constant ``with_col_str`` column appended."""
        added_df = df.withColumn("with_col_str", F.lit(self.with_col_str))
        return added_df

    def prepare_data(self):
        """Log the Spark application name, then build and transform the demo DataFrame.

        Returns:
            The DataFrame from ``create_df`` with the ``with_col_str`` column added.
        """
        # Use the public getConf() (a copy of the SparkConf) rather than the
        # private ``sparkContext._conf`` attribute; the value is the same.
        app_name = self.spark.sparkContext.getConf().get("spark.app.name")
        self.logger.warn(app_name)

        df = self.create_df()
        fn_df = self.transform_func(df)

        return fn_df


## Pipeline Class

"register_dynamic_endpoint":

    Stores pipeline classes in the "_dynamic_endpoint" dict so that the handler can select a module dynamically.

"parse_auto_parameters":

    Sets SparkSession, logger and DBUtils on self of the pipeline class. It also takes vars from
    the pipeline section of conf and sets them on self as well
    (these conf vars are not passed to __init__; they are assigned to self directly).

Note:

    self.output_data_path and self.output_schema_path come from conf: since parse_auto_parameters
    already sets them on self, they could be used without the explicit assignments in __init__ below.

In [11]:
# import: datax in-house
from datax.utils.deployment_helper.decorator.dynamic_endpoint import register_dynamic_endpoint
from datax.utils.deployment_helper.decorator.auto_parameters import parse_auto_parameters


@register_dynamic_endpoint
class PipelineClassExample:
    """Example pipeline whose Tfm step takes ``with_col_str`` from conf.

    ``register_dynamic_endpoint`` adds the class to ``_dynamic_endpoint`` (the
    handler looks it up as "PipelineClassExample", so the registry is presumably
    keyed by class name).
    """

    @parse_auto_parameters
    def __init__(
        self,
        start_date,
        end_date,
        spark,
        conf_app,
        logger=None,
        dbutils=None,
    ):
        """Keep the run window, Spark handles and output paths on the instance.

        Args:
            start_date: Start of the processing window, forwarded to the Tfm class.
            end_date: End of the processing window, forwarded to the Tfm class.
            spark: SparkSession.
            conf_app: Conf dict; its "PipelineClassExample" section must provide
                "output_data_path" and "output_schema_path".
            logger: Logger exposing ``warn``.
            dbutils: Accepted for interface compatibility; not read here.
        """
        self.start_date, self.end_date = start_date, end_date

        self.spark = spark
        self.logger = logger

        section = conf_app["PipelineClassExample"]
        self.output_data_path = section["output_data_path"]
        self.output_schema_path = section["output_schema_path"]

    def execute(self):
        """Log the output paths, then run the Tfm class and return its DataFrame."""
        for path in (self.output_data_path, self.output_schema_path):
            self.logger.warn(path)

        # with_col_str is deliberately omitted: get_auto_parameters fills it
        # from the "TransformationClassExample" section of conf.
        return TransformationClassExample(self.start_date, self.end_date).prepare_data()


## Main Class (Handler)

"init_auto_parameters":

    Stores SparkSession, conf, DBUtils and logger in a dict for later use by the pipeline.

Note:

    In production, module, start_date and end_date come from the CLI via argparse
    (in this example, a SimpleNamespace is used to reproduce the parsed arguments).

In [3]:
# import: datax in-house
from datax.utils.deployment_helper.abstract_class.common import Task
from datax.utils.deployment_helper.decorator.dynamic_endpoint import _dynamic_endpoint
from datax.utils.deployment_helper.decorator.auto_parameters import init_auto_parameters
    
class TaskExample(Task):
    """Example handler that dispatches to a registered pipeline by module name."""

    @init_auto_parameters
    def launch(self, args):
        """Run the pipeline named by ``args.module`` and display its result.

        Args:
            args: Namespace with ``module``, ``start_date`` and ``end_date``
                attributes (argparse in production, SimpleNamespace here).

        Raises:
            KeyError: if ``args.module`` is not registered in ``_dynamic_endpoint``.
        """
        # self.logger is presumably provided by Task / init_auto_parameters.
        self.logger.info("Launching task")
        pipeline_obj = _dynamic_endpoint[args.module](args.start_date, args.end_date)
        ret = pipeline_obj.execute()

        # Explicit None check instead of truthiness: bool() is not a meaningful
        # "has a result" test for DataFrame-like objects (pandas even raises).
        if ret is not None:
            self.logger.info("Pipeline execution completed.")
            ret.show()


## Run Examples

Prepare SparkSession

In [4]:
from pyspark.sql import SparkSession

# Reuse the active SparkSession if one exists, otherwise create one.
spark = SparkSession.builder.getOrCreate()

# log4j logger reached through the Py4J JVM gateway; its name prefixes the
# "WARN auto_parameters_example: ..." lines in the log output below.
logger = spark._jvm.org.apache.log4j.LogManager.getLogger("auto_parameters_example")

# No DBUtils outside a Databricks runtime, so the examples pass None.
dbutils = None


Transformation Class

In [5]:
# Run the Tfm class directly (outside any pipeline): every argument,
# including spark and logger, must be passed explicitly.
test_tfm = TransformationClassExample(
    "2022-01-01",
    "2022-01-01",
    "Example",
    spark,
    logger,
)
test_tfm_df = test_tfm.prepare_data()
test_tfm_df.show()


+---+-----+------------+
| id|label|with_col_str|
+---+-----+------------+
|  1|  one|     Example|
|  2|  two|     Example|
|  3|three|     Example|
+---+-----+------------+



Log output

    10:47:10 WARN auto_parameters_example: pyspark-shell

Pipeline Class

In [9]:
# Conf sections are keyed by class name; the Tfm section supplies with_col_str.
conf_app = {
    "PipelineClassExample": {
        "data_processor_name": "example_processor",
        "main_transformation_name": "example_transformation",
        "output_data_path": "example_data_path",
        "output_schema_path": "example_schema_path.json",
    },
    "TransformationClassExample": {"with_col_str": "From_Conf"},
}

# run pipeline
test_pipeline = PipelineClassExample(
    "2022-01-01",
    "2022-01-01",
    spark,
    conf_app,
    logger,
    dbutils,
)
test_pipeline_df = test_pipeline.execute()
test_pipeline_df.show()

+---+-----+------------+
| id|label|with_col_str|
+---+-----+------------+
|  1|  one|   From_Conf|
|  2|  two|   From_Conf|
|  3|three|   From_Conf|
+---+-----+------------+



Log output

    11:11:25 WARN auto_parameters_example: example_data_path
    11:11:25 WARN auto_parameters_example: example_schema_path.json
    11:11:25 WARN auto_parameters_example: pyspark-shell

Pipeline Class with Hard-coded Tfm vars

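    Tfm vars passed explicitly (hard-coded) take precedence over the same vars from conf.

    Note: the cell below redefines PipelineClassExample and registers it again with
    register_dynamic_endpoint. On a fresh top-to-bottom run, the Handler section will therefore
    most likely dispatch to this hard-coded version (showing "From_Hard_coding") rather than
    producing the conf-driven output saved under it; re-run the first Pipeline Class cell before
    the Handler to reproduce that output.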

In [7]:
# import: datax in-house
from datax.utils.deployment_helper.decorator.dynamic_endpoint import register_dynamic_endpoint
from datax.utils.deployment_helper.decorator.auto_parameters import parse_auto_parameters

@register_dynamic_endpoint
class PipelineClassExample:
    """Variant of the example pipeline that hard-codes the Tfm's with_col_str.

    NOTE(review): this redefines the earlier PipelineClassExample and registers
    it again; since the handler looks pipelines up as "PipelineClassExample",
    this definition presumably replaces the earlier one in _dynamic_endpoint
    for every cell that runs after it.
    """

    @parse_auto_parameters
    def __init__(
        self,
        start_date,
        end_date,
        spark,
        conf_app,
        logger=None,
        dbutils=None,
    ):
        """Keep the run window, Spark handles and output paths on the instance.

        Args:
            start_date: Start of the processing window, forwarded to the Tfm class.
            end_date: End of the processing window, forwarded to the Tfm class.
            spark: SparkSession.
            conf_app: Conf dict; its "PipelineClassExample" section must provide
                "output_data_path" and "output_schema_path".
            logger: Logger exposing ``warn``.
            dbutils: Accepted for interface compatibility; not read here.
        """
        self.start_date, self.end_date = start_date, end_date

        self.spark = spark
        self.logger = logger

        section = conf_app["PipelineClassExample"]
        self.output_data_path = section["output_data_path"]
        self.output_schema_path = section["output_schema_path"]

    def execute(self):
        """Log the output paths, then run the Tfm class with a hard-coded column value."""
        for path in (self.output_data_path, self.output_schema_path):
            self.logger.warn(path)

        # Passing with_col_str explicitly wins over any value in the
        # "TransformationClassExample" section of conf.
        return TransformationClassExample(
            self.start_date,
            self.end_date,
            "From_Hard_coding",
        ).prepare_data()


In [6]:
# No "TransformationClassExample" section: with_col_str comes only from the
# hard-coded value inside the pipeline.
conf_app = {
    "PipelineClassExample": {
        "data_processor_name": "example_processor",
        "main_transformation_name": "example_transformation",
        "output_data_path": "example_data_path",
        "output_schema_path": "example_schema_path.json",
    },
}

# run pipeline
test_pipeline = PipelineClassExample(
    "2022-01-01",
    "2022-01-01",
    spark,
    conf_app,
    logger,
    dbutils,
)
test_pipeline_df = test_pipeline.execute()
test_pipeline_df.show()

+---+-----+----------------+
| id|label|    with_col_str|
+---+-----+----------------+
|  1|  one|From_Hard_coding|
|  2|  two|From_Hard_coding|
|  3|three|From_Hard_coding|
+---+-----+----------------+



In [7]:
# The Tfm section is present, but the hard-coded value passed by the pipeline
# takes precedence, so "This_will_not_be_used" never reaches the DataFrame.
conf_app = {
    "PipelineClassExample": {
        "data_processor_name": "example_processor",
        "main_transformation_name": "example_transformation",
        "output_data_path": "example_data_path",
        "output_schema_path": "example_schema_path.json",
    },
    "TransformationClassExample": {"with_col_str": "This_will_not_be_used"},
}

# run pipeline
test_pipeline = PipelineClassExample(
    "2022-01-01",
    "2022-01-01",
    spark,
    conf_app,
    logger,
    dbutils,
)
test_pipeline_df = test_pipeline.execute()
test_pipeline_df.show()

+---+-----+----------------+
| id|label|    with_col_str|
+---+-----+----------------+
|  1|  one|From_Hard_coding|
|  2|  two|From_Hard_coding|
|  3|three|From_Hard_coding|
+---+-----+----------------+



Handler

In [8]:
from types import SimpleNamespace

conf_app = {
    "PipelineClassExample": {
        "data_processor_name": "example_processor",
        "main_transformation_name": "example_transformation",
        "output_data_path": "example_data_path",
        "output_schema_path": "example_schema_path.json",
    },
    "TransformationClassExample": {"with_col_str": "From_Conf"},
}

# Name under which the pipeline is looked up in _dynamic_endpoint.
module_name = "PipelineClassExample"

# Stand-in for the argparse namespace the CLI would produce.
sn = SimpleNamespace(
    module=module_name,
    start_date="2022-01-01",
    end_date="2022-01-01",
)

test_task = TaskExample(init_conf_app=conf_app, module_name=module_name)
test_task.launch(sn)

+---+-----+------------+
| id|label|with_col_str|
+---+-----+------------+
|  1|  one|   From_Conf|
|  2|  two|   From_Conf|
|  3|three|   From_Conf|
+---+-----+------------+



Log output

    11:15:28 WARN TaskExample: No DBUtils defined in the runtime
    11:15:28 WARN TaskExample: example_data_path
    11:15:28 WARN TaskExample: example_schema_path.json
    11:15:28 WARN TaskExample: pyspark-shell

Note:

    The INFO messages logged by TaskExample.launch ("Launching task", "Pipeline execution completed.")
    do not show up in the output because the default log level is "WARN".