## A working example of the postupdateprocesses function: `secfsdstools.x_examples.automation.memory_optimized_automation.define_extra_processes` (introduced in 2.2.0)


### What this pipeline creates

The pipeline creates the following bags:

- a single joined bag per statement (BS, IS, CF, ...) that contains the data from all available quarters.
- single standardized bags for each of BS, IS, CF which contain data from all the available quarters.
- a single joined bag containing all the data from all statements from all available quarters.

Moreover, all these bags are updated efficiently as soon as new data becomes available on the SEC website.

This version has a low memory footprint and should run without problems on a machine with 16 GB of memory.


### How to use the example


You can use this function directly by adding it to your configuration file together with some additional configuration parameters used by it: 
<pre>
[DEFAULT]
...
postupdateprocesses=secfsdstools.x_examples.automation.memory_optimized_automation.define_extra_processes

[Filter]
filtered_joined_by_stmt_dir = C:/data/sec/automated/_1_by_quarter/_1_filtered_joined_by_stmt
parallelize = True

[Standardizer]
standardized_by_stmt_dir = C:/data/sec/automated/_1_by_quarter/_2_standardized_by_stmt

[Concat]
concat_joined_by_stmt_dir = C:/data/sec/automated/_2_all/_1_joined_by_stmt
concat_joined_all_dir = C:/data/sec/automated/_2_all/_2_joined
concat_standardized_by_stmt_dir = C:/data/sec/automated/_2_all/_3_standardized_by_stmt
</pre>
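
Since these settings are plain INI values, they can be inspected with Python's standard `configparser` module. The following is only a minimal sketch: the helper `read_pipeline_settings` and the inline example text are assumptions made for illustration and are not part of secfsdstools. It also shows why a flag like `parallelize` is best read with `getboolean`, since a plain `get` returns a string.

```python
import configparser

# Example content; only two of the sections shown above are used here.
EXAMPLE_CONFIG = """
[Filter]
filtered_joined_by_stmt_dir = C:/data/sec/automated/_1_by_quarter/_1_filtered_joined_by_stmt
parallelize = False

[Standardizer]
standardized_by_stmt_dir = C:/data/sec/automated/_1_by_quarter/_2_standardized_by_stmt
"""


def read_pipeline_settings(text: str) -> dict:
    """Hypothetical helper: parse the example sections into a plain dict."""
    parser = configparser.ConfigParser()
    parser.read_string(text)
    return {
        "filtered_joined_by_stmt_dir": parser.get("Filter", "filtered_joined_by_stmt_dir"),
        # getboolean understands True/False, yes/no, on/off and 1/0.
        # A plain get() would return the non-empty, and therefore truthy, string "False".
        "parallelize": parser.getboolean("Filter", "parallelize", fallback=True),
        "standardized_by_stmt_dir": parser.get("Standardizer", "standardized_by_stmt_dir"),
    }


settings = read_pipeline_settings(EXAMPLE_CONFIG)
print(settings["parallelize"])
```

If the `parallelize` option is left out, the `fallback=True` argument makes the sketch default to parallel execution, which matches the default described for the first step.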

The function will add 5 additional steps.

The first step creates a joined bag for every zip file. The data is filtered for 10-K and 10-Q reports only,
and the filters `ReportPeriodRawFilter`, `MainCoregRawFilter`, `USDOnlyRawFilter`, and `OfficialTagsOnlyRawFilter` are applied as well.
Furthermore, the data is split by statement (stmt). If you set `parallelize = False`, the step uses less memory during the initial run
but is a little slower. Once the data from all past quarters has been processed, the setting should no longer matter.

The filtered joined bag is stored under the path that is defined under `filtered_joined_by_stmt_dir` in the configuration file.
The resulting directory structure will look like this:


    <filtered_joined_by_stmt_dir>
        quarter
            2009q2.zip
                BS
                CF
                CI
                CP
                EQ
                IS
            ...

The second step uses the results of the first step and creates standardized bags for every quarter.
The results are stored under the path that is defined under `standardized_by_stmt_dir` and the structure will look like this:

    <standardized_by_stmt_dir>
        2009q2.zip
            BS
            CF
            IS
        2009q3.zip
            BS
            CF
            IS
        ...

The third step concatenates, per statement, all available data from the first step.
So, you will have one bag with all BS information for all quarters, one for CF, and so on.
The results are stored under the path that is defined under `concat_joined_by_stmt_dir` and the structure will look like this:

    <concat_joined_by_stmt_dir>
        BS
        CF
        CI
        CP
        EQ
        IS
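
The per-statement folders are selected with path filters such as `*/BS`. One way to picture this is as a glob pattern that is evaluated relative to the quarter root. The sketch below works under that assumption (the actual matching inside `ConcatByNewSubfoldersProcess` may differ) and uses a temporary directory with empty folders that mimic the layout produced by the first step. The helper names are hypothetical.

```python
import tempfile
from pathlib import Path

STATEMENTS = ["BS", "CF", "CI", "CP", "EQ", "IS"]


def build_example_tree(root: Path, quarters: list) -> None:
    """Create empty <root>/<quarter>.zip/<stmt> folders that mimic the step-one layout."""
    for quarter in quarters:
        for stmt in STATEMENTS:
            (root / f"{quarter}.zip" / stmt).mkdir(parents=True)


def folders_for_statement(root: Path, stmt: str) -> list:
    """Return the relative paths of all quarter subfolders for one statement, sorted."""
    return sorted(p.relative_to(root).as_posix() for p in root.glob(f"*/{stmt}"))


with tempfile.TemporaryDirectory() as tmp:
    root = Path(tmp)
    build_example_tree(root, ["2009q2", "2009q3"])
    bs_folders = folders_for_statement(root, "BS")
    print(bs_folders)  # ['2009q2.zip/BS', '2009q3.zip/BS']
```

Each statement therefore gets its own list of quarter folders, which is what allows one concatenated bag per statement.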

The fourth step concatenates the results from the third step into a single bag. 
So, you will have all data from all quarters in one bag. If you use predicate pushdown when loading, you will still get
reasonable load performance.

The resulting bag is stored under the path that is defined under `concat_joined_all_dir`.


The fifth step concatenates the standardized bags (per statement). You will get a single standardized bag for each of
BS, CF, and IS, containing all the data from all quarters.

The results are stored under the path that is defined under `concat_standardized_by_stmt_dir` and the structure will look like this:

    <concat_standardized_by_stmt_dir>
        BS
        CF
        IS
        all


**Hint: These bags can now be loaded directly with the load method of JoinedDataBag and StandardizedBag, respectively.**


### How the example is implemented

Let us have a look at the implementation of the function `define_extra_processes`:


def define_extra_processes(configuration: Configuration) -> List[AbstractProcess]:

    # First, the parameters in the config file are read.
    filtered_joined_by_stmt_dir = configuration.config_parser.get(
        section="Filter",
        option="filtered_joined_by_stmt_dir")

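    # getboolean is used here because get() returns a string,
    # and the non-empty string "False" would evaluate as truthy.
    filter_parallelize = configuration.config_parser.getboolean(
        section="Filter",
        option="parallelize",
        fallback=True
    )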

    standardized_by_stmt_dir = configuration.config_parser.get(
        section="Standardizer",
        option="standardized_by_stmt_dir")

    concat_joined_by_stmt_dir = configuration.config_parser.get(
        section="Concat",
        option="concat_joined_by_stmt_dir")

    concat_joined_all_dir = configuration.config_parser.get(
        section="Concat",
        option="concat_joined_all_dir")

    concat_standardized_by_stmt_dir = configuration.config_parser.get(
        section="Concat",
        option="concat_standardized_by_stmt_dir")

    processes: List[AbstractProcess] = []


    # The first step filters the data. It is applied to the data of every available transformed parquet folder.
    # If nothing else is configured, it will filter for 10-K and 10-Q reports only.
    # Moreover, it will also apply the filters ReportPeriodRawFilter, MainCoregRawFilter, USDOnlyRawFilter, and OfficialTagsOnlyRawFilter.
    # You can configure whether you want the data to be saved as a RawDataBag or a JoinedDataBag.
    # In our case, we will use the JoinedDataBag.
    # As another parameter, we can configure that the data is split up by stmt, so the data for every statement is saved in its own subfolder.
    # Therefore, the result will be a folder for every quarter containing subfolders for every statement (BS, CF, CI, CP, EQ, and IS).
    # Note that the execution is processed in parallel if the `parallelize` option is missing or set to True.
    processes.append(
        FilterProcess(db_dir=configuration.db_dir,
                      target_dir=filtered_joined_by_stmt_dir,
                      bag_type="joined",
                      save_by_stmt=True,
                      execute_serial=not filter_parallelize
                      )
    )

    
    # The second step creates standardized bags for every quarter. 
    # It expects that either the provided root_dir contains the folders BS, CF, IS directly,
    # or contains subfolders with folders BS, CF, IS. In this case, we process the data from the
    # first step and therefore, we produce a new subfolder for every quarter in target_dir.
    processes.append(
        StandardizeProcess(root_dir=f"{filtered_joined_by_stmt_dir}/quarter",
                           target_dir=standardized_by_stmt_dir),
    )

    # The third step creates a single bag for every statement (BS, CF, CI, ...).
    processes.extend([
        ConcatByNewSubfoldersProcess(root_dir=f"{filtered_joined_by_stmt_dir}/quarter",
                                     target_dir=f"{concat_joined_by_stmt_dir}/BS",
                                     pathfilter="*/BS"
                                     ),
        ConcatByNewSubfoldersProcess(root_dir=f"{filtered_joined_by_stmt_dir}/quarter",
                                     target_dir=f"{concat_joined_by_stmt_dir}/CF",
                                     pathfilter="*/CF"
                                     ),
        ConcatByNewSubfoldersProcess(root_dir=f"{filtered_joined_by_stmt_dir}/quarter",
                                     target_dir=f"{concat_joined_by_stmt_dir}/CI",
                                     pathfilter="*/CI"
                                     ),
        ConcatByNewSubfoldersProcess(root_dir=f"{filtered_joined_by_stmt_dir}/quarter",
                                     target_dir=f"{concat_joined_by_stmt_dir}/CP",
                                     pathfilter="*/CP"
                                     ),
        ConcatByNewSubfoldersProcess(root_dir=f"{filtered_joined_by_stmt_dir}/quarter",
                                     target_dir=f"{concat_joined_by_stmt_dir}/EQ",
                                     pathfilter="*/EQ"
                                     ),
        ConcatByNewSubfoldersProcess(root_dir=f"{filtered_joined_by_stmt_dir}/quarter",
                                     target_dir=f"{concat_joined_by_stmt_dir}/IS",
                                     pathfilter="*/IS"
                                     )
    ])

    # The fourth step creates a single joined bag containing all the data from all quarters and statements.
    # We do this by concatenating the results from the previous steps.
    processes.append(
        ConcatByChangedTimestampProcess(
            root_dir=concat_joined_by_stmt_dir,
            target_dir=concat_joined_all_dir,
        )
    )

    # The fifth step creates a single standardized bag for each of the statements BS, CF, and IS.
    # It does so by concatenating the results from the second step.
    processes.extend([
        ConcatByNewSubfoldersProcess(root_dir=standardized_by_stmt_dir,
                                     target_dir=f"{concat_standardized_by_stmt_dir}/BS",
                                     pathfilter="*/BS",
                                     in_memory=True  # StandardizedBag only works with in_memory
                                     ),
        ConcatByNewSubfoldersProcess(root_dir=standardized_by_stmt_dir,
                                     target_dir=f"{concat_standardized_by_stmt_dir}/CF",
                                     pathfilter="*/CF",
                                     in_memory=True  # StandardizedBag only works with in_memory
                                     ),
        ConcatByNewSubfoldersProcess(root_dir=standardized_by_stmt_dir,
                                     target_dir=f"{concat_standardized_by_stmt_dir}/IS",
                                     pathfilter="*/IS",
                                     in_memory=True  # StandardizedBag only works with in_memory
                                     )
    ])

    return processes
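
The six calls of the third step differ only in the statement name, so the same list could also be generated in a loop. The following is a sketch of that idea, not the library's implementation: `ConcatSpec` is a hypothetical stand-in that only mirrors the keyword arguments `root_dir`, `target_dir`, and `pathfilter`, so the example runs without secfsdstools installed. The directory names are placeholders.

```python
from dataclasses import dataclass
from typing import List


@dataclass
class ConcatSpec:
    """Hypothetical stand-in that mirrors the keyword arguments used in step three."""
    root_dir: str
    target_dir: str
    pathfilter: str


def concat_specs(root_dir: str, target_root: str, statements: List[str]) -> List[ConcatSpec]:
    """Build one spec per statement; only the statement name varies."""
    return [
        ConcatSpec(root_dir=root_dir,
                   target_dir=f"{target_root}/{stmt}",
                   pathfilter=f"*/{stmt}")
        for stmt in statements
    ]


# Placeholder directories, chosen only for this illustration.
specs = concat_specs("filtered/quarter", "concat", ["BS", "CF", "CI", "CP", "EQ", "IS"])
print(len(specs), specs[0].target_dir, specs[0].pathfilter)  # 6 concat/BS */BS
```

In real code the stand-in would be replaced by `ConcatByNewSubfoldersProcess`, and the fifth step could be built the same way with the additional `in_memory=True` argument. The explicit calls in the example are easier to read one by one, while the loop avoids copy-and-paste errors when the list of statements changes.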
