# Complete Workflow testing
This notebook runs the full workflow, from raw data to a trained model, for later use with the preferred MLOps tool stack.
In this case, the relevant components are DVC for data versioning, MLflow for experiment tracking and the model registry, and Prefect for workflow orchestration.
Everything in this notebook is adapted to the specific customer segmentation project of a small car repair shop.

## Preparation

In [1]:
# Common utils
ERROR_MESSAGE_OSERROR = "File '{0}' not found."

## Extract
The extraction phase consists of:
- merging the raw text files
- converting the merged text file to a single csv file
- converting the csv file to a pandas DataFrame
- processing the data (header name conversion, dropping unnecessary columns, normalization, etc.)
- converting the final DataFrame to a parquet file

All steps are logged as MLflow runs with the relevant metadata and artifacts.
Each step is implemented as a Python function so that the corresponding Python scripts can be created easily.
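The cells below time each step by hand (`start_time = time.time()` followed by a `print`). A minimal stdlib-only sketch of collecting that per-step metadata in one place, assuming a hypothetical `run_step` helper that stores parameters and duration in a plain list. It does not call MLflow; in the real pipeline these values would presumably be passed to `mlflow.log_params` and `mlflow.log_metric` inside an `mlflow.start_run()` block.

```python
import time
from typing import Any, Callable, Dict, List

# Hypothetical in-memory stand-in for MLflow run logging (no MLflow calls are made here)
step_log: List[Dict[str, Any]] = []

def run_step(name: str, func: Callable[..., Any], **params: Any) -> Any:
    """Run one pipeline step and record its name, parameters and duration (hypothetical helper)."""
    start = time.perf_counter()
    result = func(**params)
    duration = time.perf_counter() - start
    # With MLflow this would roughly become: mlflow.log_params(params); mlflow.log_metric("duration_s", duration)
    step_log.append({"step": name, "params": params, "duration_s": duration})
    return result

# Usage with a trivial stand-in step
total = run_step("add", lambda a, b: a + b, a=2, b=3)
print(total, step_log[0]["step"])
```

Keeping the parameters as keyword arguments means the same dict can be logged and passed to the step function unchanged.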

### Define common variables
These variables will become parameters of the final Python scripts.

In [2]:
# Imports needed for extraction
import os
import time
from pathlib import Path

In [3]:
raw_data_path = 'data'
tmp_file_path = '/tmp'
raw_data_file_extensions = ['.TXT']
raw_data_merged_file_name = 'data_merged.TXT'
raw_data_encoding = 'iso8859_15'
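Because these variables are meant to become script parameters, one option is to expose them on the command line. A minimal sketch, assuming `argparse` is used (the notebook does not name a CLI library) and using the notebook values as defaults; the option names and `build_parser` are hypothetical.

```python
import argparse

def build_parser() -> argparse.ArgumentParser:
    """Command-line options mirroring the notebook variables (option names are hypothetical)."""
    parser = argparse.ArgumentParser(description="Merge raw TXT exports into one file.")
    parser.add_argument("--raw-data-path", default="data")
    parser.add_argument("--tmp-file-path", default="/tmp")
    # nargs="+" accepts one or more extensions, e.g. --raw-data-file-extensions .TXT .txt
    parser.add_argument("--raw-data-file-extensions", nargs="+", default=[".TXT"])
    parser.add_argument("--raw-data-merged-file-name", default="data_merged.TXT")
    parser.add_argument("--raw-data-encoding", default="iso8859_15")
    return parser

# Without arguments the notebook defaults are used; argparse maps dashes to underscores
args = build_parser().parse_args([])
print(args.raw_data_path, args.raw_data_file_extensions)
```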

### Merging the raw text files

In [4]:
def merge_raw_data(data_dir: str, out_file_path: str, *, data_dir_file_extensions: list, encoding='utf_8') -> str:
    """
    Merge contents of multiple files into one output file.

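    Parameters:
        data_dir (str): Path to the directory containing the raw input files.
        out_file_path (str): Full path to the output file (e.g. '/tmp/out.txt').
        data_dir_file_extensions (list): Allowed file extensions (e.g. ['.TXT']); if None or empty, all files are kept.
        encoding (str, optional): Encoding of the input and output files (default 'utf_8').

    Exceptions:
        Exception: Raised if no files are left after filtering.

    Returns:
        (str): Full path of the output file.
    """
    # Full relative path for each entry in data_dir
    raw_files = sorted([os.path.join(data_dir, filename) for filename in os.listdir(data_dir)])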
    print("Raw files after path extensions:")
    print(raw_files)
    
    # Filter directories (in our case only files are necessary and allowed)
    raw_files_dir_filter = [file_path for file_path in raw_files if os.path.isfile(file_path)]
    if len(raw_files) != len(raw_files_dir_filter):
        print("Raw files after filtering directories:")
        print(raw_files_dir_filter)
    else:
        print("No directories had to be filtered from raw files")
    raw_files = raw_files_dir_filter
    
    # If allowed extensions were provided (data_dir_file_extensions), keep only the files with those extensions
    if data_dir_file_extensions is not None and len(data_dir_file_extensions) > 0:
        raw_files_extension_filter = [filename for filename in raw_files if filename.endswith(tuple(data_dir_file_extensions))]
        if len(raw_files_extension_filter) != len(raw_files_dir_filter):
            print("Raw files after filtering by allowed extensions:")
            print(raw_files_extension_filter)
        else:
            print("No files had to be filtered by forbidden extension")
        raw_files = raw_files_extension_filter
    else:
        print("No allowed extensions were provided")

    # Perform merging
    if len(raw_files) == 0:
        raise Exception(f"No raw files left in '{data_dir}' after filtering")
    with open(out_file_path, 'w', encoding=encoding) as merged:
        for idx, file in enumerate(raw_files):
            with open(file, 'r', encoding=encoding) as current_raw_file:
                # The header line is only needed once
                if idx == 0:
                    merged.writelines(current_raw_file.readlines())
                else:
                    merged.writelines(current_raw_file.readlines()[1:])
                print(f"Finished processing file {file}")
        
    print(f"Files merged in {out_file_path}")
    return out_file_path

In [5]:
start_time = time.time()
raw_data_merged_file_path = merge_raw_data(
    raw_data_path, 
    os.path.join(tmp_file_path, raw_data_merged_file_name), 
    data_dir_file_extensions=raw_data_file_extensions, 
    encoding=raw_data_encoding)
print(f"Execution of merging took: {time.time() - start_time} seconds")

Raw files after path extensions:
['data/.ipynb_checkpoints', 'data/2010_01_01-2013_12_31.TXT', 'data/2014_01_01-2015_12_31.TXT', 'data/2016_01_01-2017_12_31.TXT', 'data/2018_01_01-2019_12_31.TXT', 'data/2020_01_01-2021_12_31.TXT', 'data/2022_01_01-2024_08_06.TXT']
Raw files after filtering directories:
['data/2010_01_01-2013_12_31.TXT', 'data/2014_01_01-2015_12_31.TXT', 'data/2016_01_01-2017_12_31.TXT', 'data/2018_01_01-2019_12_31.TXT', 'data/2020_01_01-2021_12_31.TXT', 'data/2022_01_01-2024_08_06.TXT']
No files had to be filtered by forbidden extension
Finished processing file data/2010_01_01-2013_12_31.TXT
Finished processing file data/2014_01_01-2015_12_31.TXT
Finished processing file data/2016_01_01-2017_12_31.TXT
Finished processing file data/2018_01_01-2019_12_31.TXT
Finished processing file data/2020_01_01-2021_12_31.TXT
Finished processing file data/2022_01_01-2024_08_06.TXT
Files merged in /tmp/data_merged.TXT
Execution of merging took: 0.8726873397827148 seconds
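To check the header handling in isolation, the merge idea (keep the header line of the first file only) can be reproduced on two small temporary files. This is a standalone sketch with a hypothetical `merge_files` helper, not the notebook's `merge_raw_data`. It streams line by line instead of calling `readlines()`, and appends a newline when a file does not end with one, so the last row of one file is not glued to the first data row of the next.

```python
import os
import tempfile

def merge_files(paths, out_path, encoding="utf_8"):
    """Concatenate text files, keeping only the first file's header (hypothetical helper)."""
    with open(out_path, "w", encoding=encoding) as merged:
        for idx, path in enumerate(paths):
            with open(path, "r", encoding=encoding) as src:
                for line_no, line in enumerate(src):
                    # Skip the header line of every file except the first
                    if idx > 0 and line_no == 0:
                        continue
                    # Guard against a missing trailing newline at the end of a file
                    merged.write(line if line.endswith("\n") else line + "\n")
    return out_path

with tempfile.TemporaryDirectory() as tmp:
    parts = {"a.TXT": "id\tname\n1\tAnna\n", "b.TXT": "id\tname\n2\tBen"}
    for name, text in parts.items():
        with open(os.path.join(tmp, name), "w", encoding="utf_8") as fh:
            fh.write(text)
    out = merge_files(sorted(os.path.join(tmp, n) for n in parts), os.path.join(tmp, "merged.TXT"))
    with open(out, encoding="utf_8") as fh:
        merged_text = fh.read()

print(merged_text)  # header once, then one row per input file
```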


### Converting TXT to CSV
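To convert the merged text file to a csv file, the tab delimiter is replaced with a semicolon. The function also replaces the middle dot character (·) with a hyphen and drops rows whose column count does not match the header row.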
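Joining fields with `';'.join(...)` produces a broken row if a field itself contains a semicolon. As a hedged alternative (not what the notebook does), the standard-library `csv` module can read the tab-separated text and write semicolon-separated output, quoting such fields automatically. The helper name `tsv_to_semicolon_csv` is hypothetical, and the reader is configured with `QUOTE_NONE` on the assumption that double quotes in the raw export are literal characters.

```python
import csv
import io

def tsv_to_semicolon_csv(text: str) -> str:
    """Convert tab-separated text to semicolon-separated CSV (hypothetical helper)."""
    # Same substitution as in the notebook: U+00B7 (middle dot) becomes a hyphen
    source = io.StringIO(text.replace("\u00b7", "-"))
    # QUOTE_NONE: treat double quotes in the tab-separated input as ordinary characters
    rows = csv.reader(source, delimiter="\t", quoting=csv.QUOTE_NONE)
    out = io.StringIO()
    # Default QUOTE_MINIMAL quotes only fields containing ';', '"' or line breaks
    writer = csv.writer(out, delimiter=";", lineterminator="\n")
    writer.writerows(rows)
    return out.getvalue()

sample = "id\tnote\n1\tOil change; brakes\n2\tA\u00b7B\n"
print(tsv_to_semicolon_csv(sample))
```

Unlike `strip()`, the csv reader also keeps an empty last field, so `"a\tb\t"` stays three columns. `pandas.read_csv(..., sep=';')` honors the double-quote quoting by default.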

In [6]:
def txt_to_csv(input_file_path: str, out_file_path: str = None, encoding='utf_8') -> str:
    """
    Converts a single text file with tab as delimiter to a csv file.

    Parameters:
        input_file_path (str): Full path to the text file to be converted.
        out_file_path (str, optional): Full path to the output file. Defaults to None, in which case the input path with a .csv extension is used.
        encoding (str, optional): Encoding of the input file (default 'utf_8').

    Exceptions:
        OSError: Raised if input_file_path does not exist or the directory of the specified out_file_path does not exist.

    Returns:
        (str): Full path to the created csv file
    """
    # Check input file path
    if not os.path.isfile(input_file_path):
        raise OSError(ERROR_MESSAGE_OSERROR.format(input_file_path))

    # Check output file path or set it if no parameter was set
    if out_file_path is None:
        out_file_path = os.path.join(os.path.dirname(input_file_path), Path(input_file_path).stem + '.csv')
        print(out_file_path)
    # A bare file name has an empty dirname, which refers to the current directory
    elif not os.path.isdir(os.path.dirname(out_file_path) or '.'):
        raise OSError(f"Path '{os.path.dirname(out_file_path)}' does not exist.")

    # Open and process text file to prepare csv
    with open(input_file_path, 'r', encoding=encoding) as merged_input:
        data = merged_input.readlines()
        # Replace unicode character U+00B7 (Middle Dot) with hyphen and split by tab delimiter
        # Results in a list of lists as base for converting to a csv file
        data = [x.strip().replace("·", "-").split("\t") for x in data]
        # Join the inner lists with semicolon as csv delimiter        
        data = [';'.join(x) for x in data]

        # Remove rows which do not match the column count
        data_lines = len(data)
        print(f"The data has {data_lines} rows.")
        column_count = len(data[0].split(";"))
        print(f"The data has {column_count} columns.")
        print("Skipping rows with an unexpected column count...")
        data = [line for line in data if len(line.split(";")) == column_count]
        print(f"Removed {data_lines - len(data)} rows.")

        # Write to output csv (explicit UTF-8 instead of the platform default encoding)
        with open(out_file_path, 'w', encoding='utf_8') as out_file:
            out_file.write("\n".join(data))

    return out_file_path

In [7]:
start_time = time.time()
raw_data_csv = txt_to_csv(raw_data_merged_file_path, encoding=raw_data_encoding)
print(f"Creating csv from merged txt took: {time.time() - start_time} seconds")

/tmp/data_merged.csv
The data has 138887 rows.
The data has 20 columns.
Skipping rows with an unexpected column count...
Removed 12 rows.
Creating csv from merged txt took: 1.6483495235443115 seconds
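The row filter keeps only lines whose field count equals that of the header. One detail worth checking: `str.strip()` also removes tab characters, so a line whose last field is empty loses that field before splitting and is then dropped. The snippet only demonstrates the effect on made-up lines; whether any of the removed rows are affected is not checked here.

```python
header = "id\tname\tphone"
row_with_empty_last_field = "7\tMeyer\t\n"

# strip() removes the trailing tab together with the newline: 2 fields instead of 3
fields_strip = row_with_empty_last_field.strip().split("\t")
# rstrip("\r\n") removes only the line ending and keeps the empty last field
fields_rstrip = row_with_empty_last_field.rstrip("\r\n").split("\t")

column_count = len(header.split("\t"))
print(len(fields_strip) == column_count, len(fields_rstrip) == column_count)  # False True
```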


In [8]:
raw_data_csv

'/tmp/data_merged.csv'

In [9]:
def clean_raw_data(input_file_path: str):
    """
    Clean the header names, drop obsolete columns and obscure personal names for privacy.
    """
    if not os.path.isfile(input_file_path):
        raise OSError(ERROR_MESSAGE_OSERROR.format(input_file_path))