In [7]:
import pandas as pd
import numpy as np
import os
import shutil
import time
import matplotlib.pyplot as plt
import matplotlib.patches as patches
from tqdm import tqdm
from datetime import datetime
import glob

# ==========================================
# 1. CONFIGURATION & SCHEMA DEFINITION
# ==========================================

# Input CSV paths and the folder that receives every generated artefact.
LOCATIONS_FILE = 'locations_dummy.csv'
ALLOCATIONS_FILE = 'allocations.csv'
PARTS_FILE = 'synthetic_parts_generated.csv'
OUTPUT_DIR = 'validation_results'
# NOTE(review): not referenced by any code visible in this file -- confirm
# whether a time budget is still meant to be enforced somewhere.
MAX_EXECUTION_TIME_SEC = 300


def _schema_entry(path, *column_names):
    """Build one REQUIRED_SCHEMA record: source file plus mandatory columns."""
    return {'file': path, 'columns': list(column_names)}


# Columns every dataset must expose before the geometric checks can run.
REQUIRED_SCHEMA = {
    'LOCATIONS': _schema_entry(
        LOCATIONS_FILE,
        'loc_inst_code', 'width', 'depth', 'height', 'x', 'y', 'z',
    ),
    'ALLOCATIONS': _schema_entry(
        ALLOCATIONS_FILE,
        'LOCATION_ID', 'SKU', 'GRID_X', 'GRID_Y', 'GRID_Z',
        'ORIENT_X_MM', 'ORIENT_Y_MM', 'ORIENT_Z_MM', 'INIT_UNITS',
    ),
    'PARTS': _schema_entry(
        PARTS_FILE,
        'ITEM_ID', 'LEN_MM', 'WID_MM', 'DEP_MM', 'WT_KG',
    ),
}

# ==========================================
# 2. COLOR LOGGING UTILS
# ==========================================

class Colors:
    """ANSI escape sequences used to decorate console log lines."""

    # Text attributes
    RESET = '\033[0m'
    BOLD = '\033[1m'

    # Bright foreground colours
    RED = '\033[91m'
    GREEN = '\033[92m'
    YELLOW = '\033[93m'
    BLUE = '\033[94m'
    HEADER = '\033[95m'
    CYAN = '\033[96m'

# Uncoloured copy of every logged line, in the order it was emitted.
report_buffer = []

def log(message):
    """
    Record a timestamped line in ``report_buffer`` and echo it to the console.

    The buffered copy is always plain text. The console copy is decorated
    according to the first keyword (in the order listed below) found in
    ``message``: PASS, FAIL and WARN highlight just the keyword, while
    CRITICAL and '---' tint the entire message.
    """
    stamp = datetime.now().strftime("%Y-%m-%d %H:%M:%S")
    report_buffer.append(f"[{stamp}] {message}")

    # (keyword, Colors attributes to emit before the text, tint whole line?)
    # Order is significant: only the first matching rule is applied.
    highlight_rules = (
        ("PASS", ("GREEN", "BOLD"), False),
        ("FAIL", ("RED", "BOLD"), False),
        ("CRITICAL", ("RED", "BOLD"), True),
        ("WARN", ("YELLOW", "BOLD"), False),
        ("---", ("CYAN",), True),
    )

    rendered = message
    for keyword, style_names, whole_line in highlight_rules:
        if keyword not in message:
            continue
        prefix = "".join(getattr(Colors, attr) for attr in style_names)
        if whole_line:
            rendered = f"{prefix}{message}{Colors.RESET}"
        else:
            rendered = message.replace(keyword, f"{prefix}{keyword}{Colors.RESET}")
        break

    print(f"[{stamp}] {rendered}")

def setup_environment():
    """Recreate OUTPUT_DIR from scratch, discarding anything left from a prior run."""
    stale_output_present = os.path.exists(OUTPUT_DIR)
    if stale_output_present:
        # Wipes the whole tree so old results cannot mix with new ones.
        shutil.rmtree(OUTPUT_DIR)

    os.makedirs(OUTPUT_DIR)
    log(f"Output folder '{OUTPUT_DIR}' ready.")

# ==========================================
# 3. DATA LOADING
# ==========================================

def load_and_validate_dataset(key, config):
    """Read one CSV and verify it carries every column the schema demands.

    Args:
        key: Dataset label used in the log messages (e.g. 'LOCATIONS').
        config: Mapping with 'file' (CSV path) and 'columns' (required names).

    Returns:
        A ``(df, ok)`` tuple: ``(None, False)`` when the file is missing or
        cannot be read, ``(df, False)`` when it loads but lacks required
        columns, and ``(df, True)`` otherwise. Every value is read as a string.
    """
    source_path, expected = config['file'], config['columns']

    log(f"--- Loading {key} ---")

    if not os.path.exists(source_path):
        log(f"CRITICAL: File {source_path} not found.")
        return None, False

    try:
        # sep=None makes the python engine detect the delimiter; dtype=str
        # leaves numeric parsing to the caller.
        frame = pd.read_csv(source_path, sep=None, engine='python', dtype=str)

        # Trim header whitespace, then drop a mis-decoded UTF-8 BOM prefix.
        trimmed = frame.columns.str.strip()
        frame.columns = trimmed.str.replace('^ï»¿', '', regex=True)

        absent = [name for name in expected if name not in frame.columns]
        if not absent:
            log(f"SUCCESS: {key} loaded ({len(frame)} rows).")
            return frame, True

        log(f"CRITICAL SCHEMA ERROR in {key}.")
        log(f"   Missing: {absent}")
        return frame, False

    except Exception as err:
        log(f"CRITICAL ERROR reading {source_path}: {err}")
        return None, False

def convert_numeric(df, cols):
    """Cast the listed columns of ``df`` to numbers, in place.

    Names in ``cols`` that are not columns of ``df`` are ignored. Values that
    cannot be parsed (and missing values) become 0. The same ``df`` object is
    returned for convenience.
    """
    present = [name for name in cols if name in df.columns]
    for name in present:
        parsed = pd.to_numeric(df[name], errors='coerce')
        df[name] = parsed.fillna(0)
    return df

def load_all_data():
    """Reset the output folder, then load and type-convert all three datasets.

    Returns:
        ``(datasets, valid_flags)``: two dicts keyed by 'LOCATIONS',
        'ALLOCATIONS' and 'PARTS'. ``datasets`` holds whatever the loader
        returned (possibly ``None``); ``valid_flags`` holds the schema verdict.
        Numeric conversion is applied only to datasets that passed validation.
    """
    setup_environment()

    # Columns to cast to numbers for each dataset, listed in load order.
    numeric_columns = (
        ('LOCATIONS', ['width', 'depth', 'height', 'x', 'y', 'z']),
        ('ALLOCATIONS', ['GRID_X', 'GRID_Y', 'GRID_Z',
                         'ORIENT_X_MM', 'ORIENT_Y_MM', 'ORIENT_Z_MM', 'INIT_UNITS']),
        ('PARTS', ['LEN_MM', 'WID_MM', 'DEP_MM', 'WT_KG']),
    )

    datasets, valid_flags = {}, {}
    for name, to_cast in numeric_columns:
        frame, is_valid = load_and_validate_dataset(name, REQUIRED_SCHEMA[name])
        if is_valid:
            frame = convert_numeric(frame, to_cast)
        datasets[name] = frame
        valid_flags[name] = is_valid

    return datasets, valid_flags

# ==========================================
# 4. VISUALIZATION LOGIC (NEW)
# ==========================================

def _draw_stack_view(ax, bin_size, item_size, counts, facecolor, title, ylabel):
    """Draw one 2-D projection: the bin outline plus a grid of item rectangles.

    ``bin_size``, ``item_size`` and ``counts`` are ``(horizontal, vertical)``
    pairs for the projection being drawn.
    """
    bin_across, bin_up = bin_size
    item_across, item_up = item_size
    count_across, count_up = counts

    # Bin outline
    ax.add_patch(patches.Rectangle((0, 0), bin_across, bin_up, fill=False,
                                   edgecolor='red', linewidth=3, label='Bin Boundary'))

    # One rectangle per grid cell, anchored at its bottom-left corner
    for row in range(count_up):
        for col in range(count_across):
            ax.add_patch(patches.Rectangle((col * item_across, row * item_up),
                                           item_across, item_up,
                                           linewidth=1, edgecolor='black',
                                           facecolor=facecolor, alpha=0.6))

    # Fixed 100-unit margin around the bin so the outline is not clipped
    ax.set_xlim(-100, bin_across + 100)
    ax.set_ylim(-100, bin_up + 100)
    ax.set_title(title)
    ax.set_xlabel("Width (mm)")
    ax.set_ylabel(ylabel)
    ax.grid(True, linestyle='--', alpha=0.3)
    ax.set_aspect('equal')


def visualize_top_utilization(datasets):
    """Plot front and top views of the allocation with the highest utilization.

    Utilization is the stacked volume (grid count x oriented item size on each
    axis) divided by the location volume (width x depth x height), as a
    percentage. The figure is written to
    ``OUTPUT_DIR/top_utilization_visual.png``.

    Args:
        datasets: Mapping holding the 'ALLOCATIONS' and 'LOCATIONS' DataFrames.
            Assumes their dimension/grid columns are already numeric (as
            produced by load_all_data) -- TODO confirm for other callers.

    Returns:
        None. A WARN line is logged and nothing is drawn when no allocation
        matches a location with a positive volume.
    """
    log("--- Generating Top Utilization Plot ---")

    df_alloc = datasets['ALLOCATIONS']
    df_loc = datasets['LOCATIONS']

    # Merge to get dimensions
    merged = df_alloc.merge(df_loc, left_on='LOCATION_ID', right_on='loc_inst_code', how='inner')

    # Stack Vol = Grid Count * Orient Dim on each axis; Bin Vol = W * D * H
    merged['STACK_VOL'] = (merged['GRID_X'] * merged['ORIENT_X_MM']) * \
                          (merged['GRID_Y'] * merged['ORIENT_Y_MM']) * \
                          (merged['GRID_Z'] * merged['ORIENT_Z_MM'])
    merged['LOC_VOL'] = merged['width'] * merged['depth'] * merged['height']

    # Avoid div by zero. The copy gives an independent frame, so the column
    # assignment below does not write into a slice of the merge result.
    merged = merged[merged['LOC_VOL'] > 0].copy()

    # Checked AFTER the volume filter: the filter can remove every row, and
    # taking .iloc[0] of an empty frame would raise IndexError.
    if merged.empty:
        log("WARN: No merged data available for plotting.")
        return

    merged['UTILIZATION_PCT'] = (merged['STACK_VOL'] / merged['LOC_VOL']) * 100

    # Get Top Item
    top_row = merged.sort_values(by='UTILIZATION_PCT', ascending=False).iloc[0]

    log(f"Top Utilization Found: {top_row['UTILIZATION_PCT']:.2f}% at {top_row['LOCATION_ID']}")

    # --- Data Prep ---
    bin_w, bin_d, bin_h = top_row['width'], top_row['depth'], top_row['height']
    item_w, item_d, item_h = (top_row['ORIENT_X_MM'], top_row['ORIENT_Y_MM'],
                              top_row['ORIENT_Z_MM'])
    grid_x = int(top_row['GRID_X'])
    grid_y = int(top_row['GRID_Y'])
    grid_z = int(top_row['GRID_Z'])

    save_path = f"{OUTPUT_DIR}/top_utilization_visual.png"

    fig, (ax1, ax2) = plt.subplots(1, 2, figsize=(16, 8))
    try:
        fig.suptitle(f"Top Utilization: {top_row['LOCATION_ID']} (SKU: {top_row['SKU']}) - {top_row['UTILIZATION_PCT']:.1f}% Full", fontsize=16)

        # Front view: width (X) against height (Z)
        _draw_stack_view(ax1, (bin_w, bin_h), (item_w, item_h), (grid_x, grid_z),
                         'skyblue',
                         f"FRONT VIEW (X-Z)\nGrid: {grid_x} Wide x {grid_z} High",
                         "Height (mm)")

        # Top view: width (X) against depth (Y), i.e. the footprint
        _draw_stack_view(ax2, (bin_w, bin_d), (item_w, item_d), (grid_x, grid_y),
                         'orange',
                         f"TOP VIEW (X-Y)\nGrid: {grid_x} Wide x {grid_y} Deep",
                         "Depth (mm)")

        fig.tight_layout()
        # Lets the function work even if the output folder was not prepared.
        os.makedirs(OUTPUT_DIR, exist_ok=True)
        fig.savefig(save_path)
    finally:
        # Close this specific figure even when drawing/saving fails, so it
        # does not linger in pyplot's global figure registry.
        plt.close(fig)

    log(f"Visualization saved to {save_path}")


# ==========================================
# 5. CORE VALIDATION LOGIC
# ==========================================

def func_geometric_fit(df_alloc, df_loc, quiet=False, *, tolerance=1.0):
    """Find allocations whose stacked items exceed their location on any axis.

    Each allocation is matched to its location (``LOCATION_ID`` ==
    ``loc_inst_code``). The space used per axis is ``GRID_* x ORIENT_*_MM``;
    it is compared with the location's width (X), depth (Y) and height (Z).

    Args:
        df_alloc: Allocations with LOCATION_ID, GRID_X/Y/Z and
            ORIENT_X/Y/Z_MM columns (numeric).
        df_loc: Locations with loc_inst_code, width, depth and height columns.
        quiet: When False, wrap the row loop in a tqdm progress bar.
        tolerance: Slack added to each location dimension before comparing,
            in the same unit as the dimensions (presumably millimetres --
            confirm against the data). Defaults to 1.0.

    Returns:
        DataFrame with columns LOCATION_ID, Issue and Details, one row per
        allocation that does not fit. The columns are present even when the
        result is empty. Allocations without a matching location are skipped.
    """
    issue_columns = ['LOCATION_ID', 'Issue', 'Details']

    merged = df_alloc.merge(df_loc, left_on='LOCATION_ID', right_on='loc_inst_code', how='left')
    issues = []

    rows = merged.iterrows()
    if not quiet:
        rows = tqdm(rows, total=merged.shape[0])

    for _, row in rows:
        # Left merge leaves width as NaN when the location is unknown; there
        # is nothing to compare against, so skip the row.
        if pd.isna(row['width']):
            continue

        used_x = row['GRID_X'] * row['ORIENT_X_MM']
        used_y = row['GRID_Y'] * row['ORIENT_Y_MM']
        used_z = row['GRID_Z'] * row['ORIENT_Z_MM']

        fail_x = used_x > (row['width'] + tolerance)
        fail_y = used_y > (row['depth'] + tolerance)
        fail_z = used_z > (row['height'] + tolerance)

        if fail_x or fail_y or fail_z:
            issues.append({
                'LOCATION_ID': row['LOCATION_ID'],
                'Issue': 'Geometric Fail',
                'Details': f"Loc: {row['width']}x{row['depth']}x{row['height']} | Stack: {used_x}x{used_y}x{used_z}"
            })

    # Explicit columns keep the schema stable when no issue was found.
    return pd.DataFrame(issues, columns=issue_columns)

def check_referential_integrity(datasets):
    """Log whether every allocation references a known location and a known part.

    Two independent checks are reported as PASS/FAIL log lines:
    ``ALLOCATIONS.LOCATION_ID`` against ``LOCATIONS.loc_inst_code``, then
    ``ALLOCATIONS.SKU`` against ``PARTS.ITEM_ID``. Nothing is returned.
    """
    log("--- Checking Referential Integrity ---")

    allocations = datasets['ALLOCATIONS']

    # Allocation -> Location
    has_location = allocations['LOCATION_ID'].isin(datasets['LOCATIONS']['loc_inst_code'])
    unknown_location_count = int((~has_location).sum())
    if unknown_location_count == 0:
        log("PASS: All Allocations match valid Locations.")
    else:
        log(f"FAIL: {unknown_location_count} Allocations point to unknown Locations.")

    # Allocation -> Part
    has_part = allocations['SKU'].isin(datasets['PARTS']['ITEM_ID'])
    unknown_sku_count = int((~has_part).sum())
    if unknown_sku_count == 0:
        log("PASS: All Allocations match valid Parts.")
    else:
        log(f"FAIL: {unknown_sku_count} Allocations point to unknown SKUs.")

# ==========================================
# 6. RUNNER
# ==========================================

def _save_report():
    """Write every line logged so far to the plain-text validation report."""
    report_path = f"{OUTPUT_DIR}/validation_report.txt"
    # Explicit UTF-8 so non-ASCII text in log lines (IDs, exception messages)
    # cannot fail on platforms whose default encoding is narrower.
    with open(report_path, "w", encoding="utf-8") as f:
        f.write("\n".join(report_buffer))
    log("Report saved.")


def run_full_diagnostic():
    """Run the whole validation pipeline and persist its results.

    Steps: load the three datasets, stop early if any schema is invalid,
    check referential integrity, run the geometric fit check (writing
    ``fit_issues.csv`` when problems are found), render the top-utilization
    plot, and save the text report. The report is also saved on the early-stop
    path so the schema errors that caused it are not lost.
    """
    datasets, valid_flags = load_all_data()

    if not all(valid_flags.values()):
        log("STOPPING: Fix Schema Errors defined above to proceed.")
        _save_report()
        return

    # 1. Integrity
    check_referential_integrity(datasets)

    # 2. Geometric Fit (every allocation row is checked)
    log("--- Geometric Fit Analysis ---")
    fit_issues = func_geometric_fit(datasets['ALLOCATIONS'], datasets['LOCATIONS'], quiet=False)

    if not fit_issues.empty:
        log(f"FAIL: Found {len(fit_issues)} allocations that do not fit.")
        fit_issues.to_csv(f"{OUTPUT_DIR}/fit_issues.csv", index=False)
    else:
        log("PASS: All allocations fit geometrically.")

    # 3. Visualization
    visualize_top_utilization(datasets)

    # Save Log
    _save_report()

# Run the full diagnostic only when this file is executed as a script,
# not when it is imported as a module.
if __name__ == "__main__":
    run_full_diagnostic()

[2025-12-28 14:14:10] Output folder 'validation_results' ready.
[2025-12-28 14:14:10] [96m--- Loading LOCATIONS ---[0m
[2025-12-28 14:14:10] SUCCESS: LOCATIONS loaded (357 rows).
[2025-12-28 14:14:10] [96m--- Loading ALLOCATIONS ---[0m
[2025-12-28 14:14:10] SUCCESS: ALLOCATIONS loaded (10 rows).
[2025-12-28 14:14:10] [96m--- Loading PARTS ---[0m
[2025-12-28 14:14:10] SUCCESS: PARTS loaded (75 rows).
[2025-12-28 14:14:10] [96m--- Checking Referential Integrity ---[0m
[2025-12-28 14:14:10] [92m[1mPASS[0m: All Allocations match valid Locations.
[2025-12-28 14:14:10] [91m[1mFAIL[0m: 10 Allocations point to unknown SKUs.
[2025-12-28 14:14:10] [96m--- Geometric Fit Analysis ---[0m


100%|████████████████████████████████████████████████████████████████████████████████| 10/10 [00:00<00:00, 3687.95it/s]

[2025-12-28 14:14:10] [92m[1mPASS[0m: All allocations fit geometrically.
[2025-12-28 14:14:10] [96m--- Generating Top Utilization Plot ---[0m
[2025-12-28 14:14:10] Top Utilization Found: 87.36% at A1-00004





[2025-12-28 14:14:11] Visualization saved to validation_results/top_utilization_visual.png
[2025-12-28 14:14:11] Report saved.
