# Automated Data Validation Pipeline

In [23]:
from os import chdir, getcwd, path, makedirs
from numpy import nan
from time import time
from logging import info, error
import subprocess
from yaml import YAMLError, safe_load, dump
import datetime
from re import sub, escape
from string import punctuation, ascii_letters
from random import random, choice, randint, uniform
from psutil import virtual_memory
from pandas import DataFrame, read_csv
from dask import dataframe as dd
import ray
from modin import pandas as mpd

In [2]:
cwd = getcwd()
print(cwd)
chdir('C:/Users/andre/Repositories/Professional.Portfolio/Sample6_Data.Validation.Pipeline')
print(cwd)


C:\Users\andre\Repositories\Professional.Portfolio\Sample6_Data.Validation.Pipeline
C:\Users\andre\Repositories\Professional.Portfolio\Sample6_Data.Validation.Pipeline


## Helper Functions

In [3]:
# function to keep track of df objects in memory
def dfs():
    """
    List all Pandas DataFrame objects currently in memory.

    This function lists all Pandas DataFrame objects present in the global namespace.

    Returns:
        list: A list of DataFrame objects.
    """
    dataframes = [var for var in globals() if isinstance(globals()[var], DataFrame)]
    print(dataframes)

In [4]:
dfs()

[]


In [5]:
def monitor_ram():
    """
    Get information about the system's RAM (Random Access Memory) usage.

    Returns:
        psutil._common.svmem: A named tuple representing RAM usage statistics.
    """
    memory_usage = virtual_memory()
    return memory_usage

In [6]:
monitor_ram()

svmem(total=34033319936, available=27926564864, percent=17.9, used=6106755072, free=27926564864)

## Functions to Create Mock Datasets >2.0Gb

In [7]:
def genermate_random_value():
    """
    Generate a random value with various data types and potential missing values.

    Returns:
        str, int, float, or None: A randomly generated value, which can be a string, integer, float,
        or None (representing a missing value).

    Description:
        This function generates random values with different data types and the possibility of missing values.
        - 10% chance of returning a missing value (None).
        - 20% chance of returning a string consisting of 5 random special characters.
        - 50% chance of returning a string consisting of 10 random alphanumeric characters.
        - 20% chance of returning either a random integer between 1 and 1000 (inclusive) or
          a random float between 0.1 and 1000.0 (inclusive).

    Example:
        Possible outputs:
        - 'ABc!@#' (string with special characters)
        - 123 (integer)
        - 456.789 (float)
        - None (missing value)
    """
    
    # 10% chance of missing value
    if random() < 0.1:  
        return nan
    # 20% chance of special character
    elif random() < 0.2:  
        return ''.join(choice(punctuation) for _ in range(5))
    # 50% chance of string
    elif random() < 0.5:  
        return ''.join(choice(ascii_letters) for _ in range(10))
     # 20% chance of number (integer or float)
    else: 
        return choice([randint(1, 1000), uniform(0.1, 1000.0)])

In [10]:
def generate_and_save_large_dataframe(file_type, postfix, delimiter=None):
    """
    Generate a large Pandas DataFrame with random data and save it to a CSV file when it exceeds 2GB in size.

    Description:
        This function generates random data and creates a Pandas DataFrame. It keeps adding rows to the DataFrame
        until its size exceeds 2GB. Once the size limit is reached, the DataFrame is saved to a CSV file
        with a filename indicating the size of the dataset in millions of rows (e.g., 'mock_dataset_6M.csv').

    Note:
        The function uses the `generate_random_value` function to create random data.

    Returns:
        None
    """
    start_time0 = time()
    data = {}
    num_columns = 25
    num_rows = 6000000

    while True:
        # Generate dict substructure for a Pandas dataframe
        for i in range(num_columns):
            column_name = f'column_{i+1}'
            data[column_name] = [generate_random_value() for _ in range(num_rows)]

        # Create a Pandas DataFrame
        df = DataFrame(data)
        df_size_bytes = df.memory_usage(index=True).sum()

        # Check size of dataframe: if > than 2GB, save the file
        if df_size_bytes > 2 * 1073741824: 
            filename = f'mock_dataset_{num_rows//1000000}M{postfix}.{file_type}'
            if file_type == 'psv':
                df.to_csv(filename, index=False, sep=delimiter)
            elif file_type == 'csv':
                df.to_csv(filename, index=False, sep=delimiter)
            elif file_type == 'tsv':
                df.to_csv(filename, index=False, sep=delimiter)
            print(f"Saved DataFrame to {filename}")
            break  
        # Else, add another 2000000 rows to the df
        else:
            del df 
            num_rows += 2000000

    end_time0 = time() - start_time0
    print(end_time0)


In [11]:
monitor_ram()

svmem(total=34033319936, available=27896766464, percent=18.0, used=6136553472, free=27896766464)

## Timing Functions for Pandas, Modin & Dask

In [12]:
## > Pandas

def load_csv_with_pandas(file_path):
    """
    Load a CSV file using Pandas and measure the loading time.

    Parameters:
        file_path (str): The path to the CSV file.

    Returns:
        float: The time taken to load the CSV file using Pandas (in seconds).
    """
    start_time1 = time()
    df = read_csv(file_path)
    end_time1 = time() - start_time1
    print(end_time1, ' seconds')
   

In [13]:
## > Modin

def load_csv_with_modin(file_path):
    """
    Load a CSV file using Modin and measure the loading time.

    Parameters:
        file_path (str): The path to the CSV file.

    Returns:
        float: The time taken to load the CSV file using Modin (in seconds).
    """
    ray.init(runtime_env={'env_vars': {'__MODIN_AUTOIMPORT_PANDAS__': '1'}})
    start_time3 = time()
    mdf = mpd.read_csv(file_path)
    end_time3 = time() - start_time3
    print(end_time3, ' seconds')
  


In [14]:
## > Dask

def load_csv_with_dask(file_path, delimiter):
    """
    Load a CSV file using Dask and measure the loading time.

    Parameters:
        file_path (str): The path to the CSV file.

    Returns:
        float: The time taken to load the CSV file using Dask (in seconds).
    """
    start_time2 = time()
    ddf = dd.read_csv(file_path, delimiter=delimiter)
    end_time2 = time() - start_time2
    print(end_time2, ' seconds')
    
    return ddf
    

In [15]:
monitor_ram()

svmem(total=34033319936, available=27898613760, percent=18.0, used=6134706176, free=27898613760)

## Utility Functions for Automated Script

In [28]:

def read_config_file(filepath):
    
    """
    Read and parse a YAML configuration file.

    Parameters:
        filepath (str): The path to the YAML configuration file.

    Returns:
        dict: A dictionary containing the parsed configuration data.
    """
    
    # Open specified filepath
    with open(filepath, 'r') as datacreek:
        # Try-Catch for YAMLError
        try:
            return safe_load(datacreek)
        except YAMLError as exc:
            # Logging library error sent to 'stdout'
            error(exc)


def replacer(string, char):
    """
    Replace two or more consecutive occurrences of a character in a string with a single occurrence.

    Parameters:
        string (str): The string to be processed.
        char (str): The character to be replaced.

    Returns:
        str: The string with consecutive occurrences replaced.
    """
    # Use regular expression to replace two or more consecutive occurrences with a single occurrence
    return sub(f'{escape(char)}+', char, string)


    
    
def col_header_val(df, columns):
    """
    Validate and standardize column names in a Dask DataFrame based on a table configuration.

    Parameters:
        df (dd.DataFrame): The Dask DataFrame to be validated.
        columns (list): The list of expected column names.

    Returns:
        tuple: A tuple containing a boolean indicating whether validation passed, and the validated Dask DataFrame.
    """
    
    # Convert all strings to lowercase
    df.columns = df.columns.str.lower()
 
    # Replace all whitespce at the start of col names
    df.columns = df.columns.str.replace('[^\w]', '_', regex=True)
    
    # Removes underscores from beginning & end of col names
    df.columns = list(map(lambda x: x.strip('_'), list(df.columns)))
   
    # Replaces 2 or more consecutive underscores with single underscore
    df.columns = list(map(lambda x: replacer(x, '_'), list(df.columns)))
    
    # Converts expected col_names for 'columns' to ensure case insensitivity during comaparison
    expected_col = list(map(lambda x: x.lower(), columns))
   
  
    # Ensures case insensitivity when comparing with expected col_names
    df.columns = list(map(lambda x: x.lower(), list(df.columns)))
    
    # Sort the DataFrame by multiple columns
    df = df[columns]

    if len(df.columns) == len(expected_col) and list(expected_col) == list(df.columns):
        print('Column name and column length validation passed')
        return True, df

    # If the above is false, then we check what the differences are between df.col and exp_col and print them
    else:
        print('Column name and column length validation failed')
        # Uses set operations for taking the difference between df.col and exp_col
        mismatched_columns_file = list(set(df.columns).difference(expected_col))
        print('The following columns are not in the expected list:', mismatched_columns_file )
        # Uses set operations to check diff between exp_col and df_col
        missing_expected_columns = list(set(expected_col).difference(df.columns))
        print('The following expected columns are not in the uploaded file:', missing_expected_columns)
        # log results
        info(f'df columns: {df.columns}')
        info(f'expected columns: {expected_col}')
        return False, df



def generate_yaml_config(file_path, yaml_path, file_type, file_name, table_name, in_del, out_del, columns):
    """
    Generate a YAML configuration file for data processing and save it.

    Parameters:
        file_path (str): The path to the data file.
        yaml_path (str): The path where the YAML configuration file will be saved.
        file_type (str): The type of the data file (e.g., 'csv', 'parquet').
        file_name (str): The name of the data file.
        table_name (str): The name of the data processing table.
        in_del (str): The inbound delimiter character.
        out_del (str): The outbound delimiter character.
        columns (list): A list of column names.

    Returns:
        tuple: A tuple containing the generated file_path, yaml_path, file_name, file_type, table_name, in_del, out_del, columns.
    """
    
    # Create a configuration dictionary
    config_data = {
        'file_path': file_path,
        'yaml_path': yaml_path,
        'file_name': file_name,
        'file_type': file_type,
        'table_name': table_name,
        'inbound_delimiter': in_del,
        'outbound_delimiter': out_del,
        'skip_leading_rows': 1,
        'columns': columns,
    }

    # Convert the dictionary to YAML format
    yaml_config = dump(config_data, default_flow_style=False)

    # Save the YAML configuration to a file
    with open(yaml_path + f'{table_name}_config.yaml', 'w') as yaml_file:
        yaml_file.write(yaml_config)
        
    return file_path, yaml_path, file_name, file_type, table_name, in_del, out_del, columns




def read_file(file_type, source_file, delimiter):
    """
    Read data from a file into a Dask DataFrame based on the file type.

    Parameters:
        file_type (str): The type of the data file (e.g., 'csv', 'excel', 'parquet').
        source_file (str): The path to the source data file.
        delimiter (str): The delimiter used in the file.

    Returns:
        dd.DataFrame: A Dask DataFrame containing the data from the file.
    """
    if file_type == 'csv':
        return dd.read_csv(source_file, sep=delimiter)
    elif file_type == 'tsv':
        return dd.read_csv(source_file, sep=delimiter)
    elif file_type == 'psv':
        return dd.read_csv(source_file, sep=delimiter)
    # Add more file types as needed...


def construct_dataset_paths(file_path, file_name, file_type):
    """
    Construct the path to the dataset.

    Parameters:
        file_path (str): The path to the directory containing the dataset.
        file_name (str): The name of the dataset file.
        file_type (str): The type of the dataset file (e.g., 'csv', 'parquet').

    Returns:
        str: The complete path to the dataset file.
    """
    # Construct the path to the dataset
    source_file = path.join(file_path, f'{file_name}.{file_type}')
    return source_file

    
    

def save_file(df, file_type, target_path, delimiter):
    """
    Save a Dask DataFrame to a file with the specified file type.

    Parameters:
        df (dd.DataFrame): The Dask DataFrame to be saved.
        file_type (str): The file type (e.g., 'csv', 'tsv', 'psv').
        target_path (str): The path where the file should be saved.
        delimiter (str): The delimiter character for the file.

    Returns:
        bool: True if the file was saved successfully or already exists, False otherwise.
    """
    try:
        if file_type == 'csv':
            if path.exists(target_path):
                print(f"File already exists: {target_path}")
            else:
                df.to_csv(target_path, index=False, single_file=True, sep=delimiter)
        elif file_type == 'tsv':
            if path.exists(target_path):
                print(f"File already exists: {target_path}")
            else:
                df.to_csv(target_path, index=False, single_file=True, sep=delimiter)
        elif file_type == 'psv':
            if path.exists(target_path):
                print(f"File already exists: {target_path}")
            else:
                df.to_csv(target_path, index=False, single_file=True, sep=delimiter)
        # Add more file types as needed
        else:
            print(f"Unsupported file type: {file_type}")
            return False

        print(f"File saved successfully: {target_path}")
        return True
    except Exception as e:
        print(f"Error saving file: {e}")
        return False



In [17]:
monitor_ram()

svmem(total=34033319936, available=27896643584, percent=18.0, used=6136676352, free=27896643584)

## Generate >2.0Gb Dataset 

In [None]:
# Create 2.6GB dataset. Time to create: ~20min
generate_and_save_large_dataframe(file_type='csv', '0')
generate_and_save_large_dataframe(file_type='tsv', postfix='1', delimiter='\t')
generate_and_save_large_dataframe(file_type='psv', postfix='2', delimiter='|')

## Test each Library for Speed in Loading >2.0Gb Dataframe

In [18]:
# Load 2.6Gb dataset with pandas and evaluate time taken
load_csv_with_pandas('./mock_dataset_12M0.csv')
display(dfs())
display(monitor_ram())

48.54285383224487  seconds
[]


None

svmem(total=34033319936, available=27619098624, percent=18.8, used=6414221312, free=27619098624)

In [19]:
load_csv_with_modin('./mock_dataset_12M0.csv')
display(dfs())
display(monitor_ram())

2024-01-19 14:29:46,646	INFO worker.py:1724 -- Started a local Ray instance.


46.23307228088379  seconds
[]


None

svmem(total=34033319936, available=22098743296, percent=35.1, used=11934576640, free=22098743296)

In [20]:
# Load files into dataframes with dask
ddf0 = load_csv_with_dask('./mock_dataset_12M0.csv', delimiter=',')
display(dfs())
display(monitor_ram())

0.05746150016784668  seconds
[]


None

svmem(total=34033319936, available=22054789120, percent=35.2, used=11978530816, free=22054789120)

In [21]:
# Load the other two mock-datasets
ddf1 = load_csv_with_dask('./mock_dataset_12M1.tsv', delimiter='\t')
ddf2 = load_csv_with_dask('./mock_dataset_12M2.psv', delimiter='|')
display(dfs())
display(monitor_ram())

0.014030933380126953  seconds
0.01730966567993164  seconds
[]


None

svmem(total=34033319936, available=22042451968, percent=35.2, used=11990867968, free=22042451968)

## Main Script: Automated Data Validation Pipeline: Using Dask

In [31]:

## > Main Script

if __name__ == '__main__':

    # --- CREATE - DIRECTORIES: - FOR - CLEANSED DATASETS - AND - YAML - FILES --- #

    makedirs('./csv_files')
    makedirs('./csv_files/csv_config_files')
    makedirs('./tsv_files')
    makedirs('./tsv_files/tsv_config_files')
    makedirs('./psv_files')
    makedirs('./psv_files/psv_config_files')

    

    # --- CREATING - YAML - FILES --- #
    
    # List of file configurations
    file_configurations = [
    
        {   'file_path': './tsv_files/',
            'yaml_path': './tsv_files/tsv_config_files/',
           
            'file_name': 'mock_dataset_12M0',
            'file_type': 'csv',
            'table_name': 'table1',
            'in_del': ',',
            'out_del': '\t',
            'columns': ddf0.columns
        },
        {
            'file_path': './psv_files/',
            'yaml_path': './psv_files/psv_config_files/',
            'file_name': 'mock_dataset_12M1',
            'file_type': 'tsv',
            'table_name': 'table2',
            'in_del': '\t',
            'out_del': '|',
            'columns': ddf1.columns
        },
        {
            'file_path': './csv_files/',
            'yaml_path': './csv_files/csv_config_files/',
            'file_name': 'mock_dataset_12M2',
            'file_type': 'psv',
            'table_name': 'table2',
            'in_del': '|',
            'out_del': ',',
            'columns': ddf2.columns
        }
    
        # ... Add more configurations as needed ... #
    ]
    
    
    # Create a loop to create each YAML file for each dataset
    for config in file_configurations:
        file_path, yaml_path, file_name, file_type, table_name, in_del, out_del, columns = generate_yaml_config(
       

                    file_path=config['file_path'],
                    yaml_path=config['yaml_path'],
                    file_name=config['file_name'],
                    file_type=config['file_type'],
                    table_name=config['table_name'],
                    in_del=config['in_del'],
                    out_del=config['out_del'],
                    columns=config['columns']
        )
        
        

        # --- CONSTRUCT - PATHS - TO - DATASETS --- #

        # Construct paths
        source_file = construct_dataset_paths(file_path=file_path, file_name=file_name, file_type=file_type)

       
        # --- CLEANSING - OPERATIONS --- #

        # ... Add cleansing ops as needed ... #
        
        # Read the dataset in to be cleansed using configurations
        ddf = read_file(file_type=file_type, source_file='./' + file_name + f'.{file_type}', delimiter=in_del)
        
        # Perform column validation
        bool, ddf = col_header_val(df=ddf, columns=columns)

        # Print cols for visual verification
        print(ddf.columns, end='\n')
        


        # --- SAVING - CLEANSED - DATA - AT - SPECIFIED - PATHS --- #

        if bool:

            if out_del == ',': 
                # Define the target file path and file type
                target_file_path = file_path + f'cleansed_{file_name}.csv'
            elif out_del == '|':
                target_file_path = file_path + f'cleansed_{file_name}.psv'
            elif out_del == '\t':
                target_file_path = file_path + f'cleansed_{file_name}.tsv'
    
            # Save the file using the save_file function
            save_result = save_file(df=ddf, file_type=file_type, target_path=target_file_path, delimiter=out_del)
    
            if save_result:
                print(f'Successfully processed and saved: {file_name}')
            else:
                print(f'Error saving the file for: {file_name}')
        else:
            print(f'Validation failed for: {file_name}')

    

    

Column name and column length validation passed
Index(['column_1', 'column_2', 'column_3', 'column_4', 'column_5', 'column_6',
       'column_7', 'column_8', 'column_9', 'column_10', 'column_11',
       'column_12', 'column_13', 'column_14', 'column_15', 'column_16',
       'column_17', 'column_18', 'column_19', 'column_20', 'column_21',
       'column_22', 'column_23', 'column_24', 'column_25'],
      dtype='object')
File saved successfully: ./tsv_files/cleansed_mock_dataset_12M0.tsv
Successfully processed and saved: mock_dataset_12M0
Column name and column length validation passed
Index(['column_1', 'column_2', 'column_3', 'column_4', 'column_5', 'column_6',
       'column_7', 'column_8', 'column_9', 'column_10', 'column_11',
       'column_12', 'column_13', 'column_14', 'column_15', 'column_16',
       'column_17', 'column_18', 'column_19', 'column_20', 'column_21',
       'column_22', 'column_23', 'column_24', 'column_25'],
      dtype='object')
File saved successfully: ./psv_files

In [32]:
monitor_ram()

svmem(total=34033319936, available=13152710656, percent=61.4, used=20880609280, free=13152710656)

In [33]:
!pip freeze > requirements.txt