In [1]:
pip install pyyaml pandas

Note: you may need to restart the kernel to use updated packages.


In [2]:
import pandas as pd
import yaml
import os

# --- Configuration ---
# File paths used by process_data(); relative paths resolve against the
# current working directory.
CONFIG_FILE = 'retailer_A_config.yaml'  # YAML rules: field_mappings / transformations / validations
INPUT_FILE = 'customer_data.csv'        # source rows, read with pandas.read_csv
OUTPUT_FILE = 'syndication_ready.csv'   # rows that passed every validation
ERROR_FILE = 'validation_errors.log'    # per-record validation failures; removed when a run has none

# --- Transformation Functions ---
# These functions will be called based on the 'transformations' in the YAML
def transform_titlecase(value):
    """Return ``value`` in title case when it is a string.

    Any non-string input (numbers, ``None``, NaN, ...) is handed back
    unchanged so the caller can apply this to arbitrary cell values.
    """
    return value.title() if isinstance(value, str) else value

def transform_uppercase(value):
    """Return ``value`` upper-cased when it is a string.

    Any non-string input (numbers, ``None``, NaN, ...) is handed back
    unchanged so the caller can apply this to arbitrary cell values.
    """
    return value.upper() if isinstance(value, str) else value

# --- Validation Functions ---
# These functions will be called based on the 'validations' in the YAML
def validate_is_positive_number(value):
    """Checks if a value is a finite number greater than 0.

    Args:
        value: The cell value to check; anything ``float()`` accepts
            (int, float, numeric string) is treated as a number.

    Returns:
        A ``(is_valid, error_message)`` tuple. ``error_message`` is the
        empty string when ``is_valid`` is True.
    """
    import math  # local import so this function does not rely on a file-level import

    try:
        if pd.isna(value):
            return False, "value is missing"
        number = float(value)
    except (ValueError, TypeError, OverflowError):
        # ValueError/TypeError: not parseable as a number.
        # OverflowError: an int too large to be represented as a float.
        return False, f"value '{value}' is not a valid number"

    # float() happily parses "inf" and "nan"; neither is a usable quantity,
    # and inf would otherwise pass the "> 0" check below.
    if not math.isfinite(number):
        return False, f"value '{value}' is not a finite number"

    if number > 0:
        return True, ""
    return False, f"value '{value}' is not greater than 0"

def validate_not_empty(value):
    """Report whether ``value`` holds something other than NaN/None/blank text.

    Returns a ``(is_valid, error_message)`` tuple; the message is the empty
    string on success.
    """
    failure = (False, "value is missing or empty")

    # Missing marker (None / NaN / NaT) as recognised by pandas.
    if pd.isna(value):
        return failure

    # Anything whose text form is only whitespace counts as empty.
    text = str(value)
    if text.strip() == "":
        return failure

    return True, ""

# --- Main Engine ---
def process_data():
    """Run the config-driven map -> transform -> validate pipeline.

    Loads the rule set from ``CONFIG_FILE`` (YAML) and the rows from
    ``INPUT_FILE`` (CSV). For every row it:

    1. builds an output record by copying columns per ``field_mappings``
       (``{output_field: input_column}``),
    2. applies the per-field ``transformations`` (``titlecase`` /
       ``uppercase``; other rule names are ignored),
    3. checks the per-field ``validations`` (``is_positive_number`` /
       ``not_empty``; any other rule name fails the record with
       "Unknown validation rule").

    Records with no validation errors are written to ``OUTPUT_FILE``;
    failures are listed in ``ERROR_FILE``, which is deleted when a run
    produces no failures.

    Returns:
        None. Progress and problems are reported via ``print``. The function
        returns early, writing nothing, when the config or input file cannot
        be loaded.
    """
    print("--- Starting Project 2: Config-Driven Validation Engine ---")

    # 1. Load Configuration
    try:
        with open(CONFIG_FILE, 'r', encoding='utf-8') as f:
            config = yaml.safe_load(f)
    except FileNotFoundError:
        print(f"ERROR: Config file '{CONFIG_FILE}' not found.")
        return
    except Exception as e:
        print(f"ERROR reading config file: {e}")
        return

    # An empty YAML file loads as None and a bare list/scalar is not a rule
    # set; report that clearly instead of failing later on .get().
    if not isinstance(config, dict):
        print(f"ERROR reading config file: expected a mapping at the top level, got {type(config).__name__}")
        return

    # The endpoint name is only used for this log line, so a missing key
    # should not abort the run.
    endpoint_name = config.get('endpoint_name', 'unknown')
    print(f"Successfully loaded configuration for '{endpoint_name}'")

    # 2. Load Input Data
    try:
        df = pd.read_csv(INPUT_FILE)
    except FileNotFoundError:
        print(f"ERROR: Input file '{INPUT_FILE}' not found.")
        return
    except pd.errors.EmptyDataError:
        # Raised by read_csv when the file has no columns to parse.
        print(f"ERROR: Input file '{INPUT_FILE}' is empty.")
        return
    print(f"Successfully loaded {len(df)} records from '{INPUT_FILE}'.")

    # Get rule definitions from config. `or {}` also covers a key that is
    # present but left empty in the YAML (which loads as None).
    mappings = config.get('field_mappings') or {}
    transformations = config.get('transformations') or {}
    validations = config.get('validations') or {}

    # Input column used to label a record in the error log (loop-invariant).
    id_column = mappings.get('product_id', 'ITEM_CODE')

    # Accumulators for the results
    clean_records = []
    error_logs = []
    failed_count = 0

    # 3. Process each row
    for index, row in df.iterrows():
        clean_record = {}
        errors_found = []
        item_id = row.get(id_column, f"Row {index}")  # Get an ID for logging

        # 3a. Apply Mappings (a column absent from the CSV yields None)
        for new_field, old_field in mappings.items():
            clean_record[new_field] = row.get(old_field)

        # 3b. Apply Transformations
        for field, transform_rule in transformations.items():
            if field in clean_record:
                if transform_rule == 'titlecase':
                    clean_record[field] = transform_titlecase(clean_record[field])
                elif transform_rule == 'uppercase':
                    clean_record[field] = transform_uppercase(clean_record[field])

        # 3c. Apply Validations
        for field, validation_rule in validations.items():
            value_to_check = clean_record.get(field)

            # Defaults make an unrecognised rule name fail the record.
            is_valid = False
            error_msg = "Unknown validation rule"

            if validation_rule == 'is_positive_number':
                is_valid, error_msg = validate_is_positive_number(value_to_check)
            elif validation_rule == 'not_empty':
                is_valid, error_msg = validate_not_empty(value_to_check)

            if not is_valid:
                errors_found.append(f"Field '{field}' failed validation '{validation_rule}': {error_msg}")

        # 4. Sort records
        if errors_found:
            # Count records directly: a record contributes a header, one line
            # per failed field and a blank line, so the number of log lines
            # says nothing reliable about the number of records.
            failed_count += 1
            error_logs.append(f"--- FAILED RECORD (ID: {item_id}) ---")
            error_logs.extend(errors_found)
            error_logs.append("")  # Add a blank line for readability
        else:
            # If no errors, add the clean record to our list
            clean_records.append(clean_record)

    # 5. Write Output Files

    # Write the clean file
    if clean_records:
        clean_df = pd.DataFrame(clean_records)
        clean_df.to_csv(OUTPUT_FILE, index=False)
        print(f"--- Success: {len(clean_records)} clean records saved to '{OUTPUT_FILE}' ---")
    else:
        print("No clean records were generated.")

    # Write the error log
    if error_logs:
        # Explicit UTF-8 so offending values with non-ASCII characters can
        # always be written, whatever the platform default encoding is.
        with open(ERROR_FILE, 'w', encoding='utf-8') as f:
            f.write("\n".join(error_logs))
        print(f"--- Notice: {failed_count} records failed validation. See '{ERROR_FILE}' for details. ---")
    else:
        print("All records processed successfully with no validation errors.")
        # Remove a log left over from an earlier run so it cannot be mistaken
        # for the result of this one.
        if os.path.exists(ERROR_FILE):
            os.remove(ERROR_FILE)

    print("\n--- Project 2 complete ---")

# --- Run the main function ---
# Run the pipeline only when this code is executed directly (as a script or
# in a notebook cell, where __name__ is "__main__"), not when it is imported.
if __name__ == "__main__":
    process_data()

--- Starting Project 2: Config-Driven Validation Engine ---
Successfully loaded configuration for 'Retailer A'
Successfully loaded 5 records from 'customer_data.csv'.
--- Success: 4 clean records saved to 'syndication_ready.csv' ---
--- Notice: 1 records failed validation. See 'validation_errors.log' for details. ---

--- Project 2 complete ---
