# 3D Data Management - Health Check System

Analyzes production folder health at hyperspeed with surgical precision.
Detects duplicate versions, duplicate content, missing files, and naming violations.

## Health Parameters
- **No Duplicate Versions**: One file per type per version (screenshot_v1, mesh_v1, recording_v1)
- **No Duplicate Content**: Files with identical MD5 hash and size created within 5 minutes of each other are flagged as double-upload artifacts
- **Complete File Sets**: Each version has a screenshot + mesh + at least one recording
- **Sequential Videos**: No gaps in recording minor versions (v1.1, v1.2, v1.3...)
- **Proper Naming**: All files match folder naming convention

**Architecture**: Stability at Hyperspeed

In [None]:
# Cell 1: Authentication and Setup
import os
import re
import time
import random
import threading
from datetime import datetime
from concurrent.futures import ThreadPoolExecutor, as_completed
from threading import Lock
from collections import defaultdict
import json
import csv

# Google API imports
from google.colab import auth
from googleapiclient.discovery import build
from googleapiclient.errors import HttpError
import pandas as pd
import google.auth

# Configuration
# Drive folder ID of the production root. In the code visible here it is only
# echoed in the startup banner.
PRODUCTION_FOLDER_ID = "1abc123def456"  # Replace with actual production folder ID
# Spreadsheet whose 'Tasks' tab is read by get_stageable_folders().
SHEET_ID = "1HmDdq5g0Zk7d7Uodbh7fIUFq5XiONDZyvrBBZQgNK0w"
# Type tokens that appear in deliverable filenames: <folder>_<type>_<version>.<ext>
DELIVERABLE_TYPES = ['recording', 'screenshot', 'mesh']

# Regexes for files that are NOT deliverables and must be skipped by the health
# checks: the original client inputs and any "_v0." version. FileParser applies
# them with re.search and re.IGNORECASE.
IGNORED_PATTERNS = [
    r'^image\.jpg$', 
    r'^mask\.jpg$', 
    r'^img_mask\.jpg$',  # underscore, not hyphen (img_mask, not img-mask)
    r'_v0\.'
]

# Global state
# Cached Google credentials; filled in on first call to get_credentials().
_credentials = None
# Per-thread storage: each thread keeps its own Drive/Sheets service objects.
thread_local = threading.local()

def get_credentials():
    """Return the cached Google credentials, resolving them on first use.

    The result of ``google.auth.default()`` is stored in the module-level
    ``_credentials`` so later calls reuse the same object.
    """
    global _credentials
    if _credentials is not None:
        return _credentials

    resolved, _project_id = google.auth.default()
    _credentials = resolved
    return _credentials

def get_drive_service():
    """Return the calling thread's Drive v3 client, building it on first access."""
    try:
        return thread_local.drive
    except AttributeError:
        # First call on this thread: create and remember the client.
        thread_local.drive = build('drive', 'v3', credentials=get_credentials())
        return thread_local.drive

def get_sheets_service():
    """Return the calling thread's Sheets v4 client, building it on first access."""
    try:
        return thread_local.sheets
    except AttributeError:
        # First call on this thread: create and remember the client.
        thread_local.sheets = build('sheets', 'v4', credentials=get_credentials())
        return thread_local.sheets

# Cell-load side effects: authenticate the Colab user first, then resolve and
# cache credentials so worker threads started later reuse them.
print("🔐 Authenticating...")
auth.authenticate_user()
# get_credentials() already stores its result in the module global; the
# assignment simply makes that explicit at module level.
_credentials = get_credentials()
print("✅ Authentication established")
# Echo the active configuration so a wrong sheet/folder ID is noticed early.
print(f"📋 Sheet ID: {SHEET_ID}")
print(f"📁 Production Folder: {PRODUCTION_FOLDER_ID}")
print("🚫 Ignoring original client files: image.jpg, mask.jpg, img_mask.jpg")

In [None]:
# Cell 2: File Parser - Atomic file analysis with PNG support

class FileParser:
    """Classify filenames as ignored, valid deliverables, or naming violations.

    A valid deliverable is named ``<folder_name>_<type>_<version>.<ext>`` where
    ``type`` is recording/screenshot/mesh, ``version`` is ``v<major>`` or
    ``v<major>.<minor>``, and ``ext`` is one allowed for that type. Matching is
    case-insensitive; the parsed ``type``, ``version`` and ``extension`` are
    returned lower-cased so that downstream grouping does not depend on the
    casing used in the filename.
    """

    # Pattern breakdown: folder_name + type + version + extension.
    _DELIVERABLE_RE = re.compile(
        r'^(.+)_(recording|screenshot|mesh)_(v\d+(?:\.\d+)?)\.(mp4|jpg|jpeg|png|obj)$',
        re.IGNORECASE,
    )

    # Extensions accepted for each deliverable type.
    _VALID_EXTENSIONS = {
        'recording': ('mp4',),
        'screenshot': ('jpg', 'jpeg', 'png'),
        'mesh': ('obj',),
    }

    @staticmethod
    def is_deliverable(filename, patterns=None):
        """Return True unless ``filename`` matches one of the ignore patterns.

        Args:
            filename: Bare file name to test.
            patterns: Optional iterable of regex strings to ignore. Defaults to
                the module-level ``IGNORED_PATTERNS``.

        Returns:
            bool: False when any pattern matches (case-insensitive search).
        """
        if patterns is None:
            patterns = IGNORED_PATTERNS
        return not any(
            re.search(pattern, filename, re.IGNORECASE) for pattern in patterns
        )

    @staticmethod
    def parse_deliverable(filename, ignored_patterns=None):
        """Parse a filename into its deliverable components.

        Args:
            filename: Bare file name to parse.
            ignored_patterns: Optional ignore patterns forwarded to
                ``is_deliverable``; defaults to ``IGNORED_PATTERNS``.

        Returns:
            ``None`` when the file is ignored; ``{'valid': False, 'filename'}``
            when it does not follow the naming scheme or its extension does not
            suit its type; otherwise a dict with ``valid``, ``filename``,
            ``folder_name``, ``type``, ``version``, ``major``, ``minor`` and
            ``extension``.
        """
        if not FileParser.is_deliverable(filename, ignored_patterns):
            return None

        match = FileParser._DELIVERABLE_RE.match(filename)
        if not match:
            return {'valid': False, 'filename': filename}

        folder_name, file_type, version, extension = match.groups()

        # The regex is case-insensitive, so normalise before the table lookup;
        # otherwise "Screenshot" would match the regex yet fail validation, and
        # "V1"/"v1" would be treated as different versions downstream.
        file_type = file_type.lower()
        version = version.lower()
        extension = extension.lower()

        # .get(): IGNORECASE can match a few non-ASCII look-alikes that do not
        # lower-case to a known type; treat those as invalid rather than crash.
        if extension not in FileParser._VALID_EXTENSIONS.get(file_type, ()):
            return {'valid': False, 'filename': filename}

        major, minor = FileParser.extract_version_parts(version)
        return {
            'valid': True,
            'filename': filename,
            'folder_name': folder_name,
            'type': file_type,
            'version': version,
            'major': major,
            'minor': minor,
            'extension': extension,
        }

    @staticmethod
    def extract_version_parts(version_str):
        """Split ``'v<major>[.<minor>]'`` into ``(major, minor)`` integers.

        ``minor`` is 0 when the version has no minor component.
        """
        # Drop the leading 'v', then split once on the dot (if any).
        major_text, _, minor_text = version_str[1:].partition('.')
        return int(major_text), (int(minor_text) if minor_text else 0)

# Smoke-test the parser on one example of each outcome (valid / ignored)
test_files = [
    "mc_0_1300_screenshot_v1.jpg",                 # valid screenshot
    "sa3_1576545853_handsaw_2_screenshot_v1.png",  # valid PNG screenshot
    "mc_0_1300_recording_v1.2.mp4",                # valid recording with minor version
    "mc_0_1300_mesh_v1.obj",                       # valid mesh
    "image.jpg",                                   # ignored: original client file
    "img_mask.jpg",                                # ignored: original client file
    "mc_0_1300_mesh_v0.obj",                       # ignored: v0
]

print("🔍 File Parser ready with PNG support. Testing precision:")
for test_file in test_files:
    result = FileParser.parse_deliverable(test_file)
    if not result:
        # None means the file matched an ignore pattern.
        print(f"   🚫 {test_file} → ignored (correct)")
        continue
    status = "⚠️" if not result.get('valid') else "✅"
    print(f"   {status} {test_file} → {result}")

In [None]:
# Cell 3: Health Analyzers - Surgical issue detection with fixed recording logic

class HealthAnalyzer:
    """Stateless health checks over one folder's file records.

    Every check takes a list of file dicts and returns a list of issue dicts,
    each carrying a ``'type'`` key. The checks read Drive metadata keys
    (``name``, ``md5Checksum``, ``size``, ``createdTime``) and a ``'parsed'``
    entry shaped like the result of ``FileParser.parse_deliverable``.
    """

    # Identical files created closer together than this are treated as an
    # accidental double upload.
    DUPLICATE_WINDOW_SECONDS = 300  # 5 minutes

    @staticmethod
    def check_duplicate_versions(files):
        """Report every (type, version) pair that is held by more than one file.

        Returns:
            list of ``{'type': 'duplicate_version', 'version_key', 'files',
            'count'}`` dicts, one per colliding pair.
        """
        names_by_key = defaultdict(list)

        for entry in files:
            parsed = entry.get('parsed')
            if parsed and parsed.get('valid'):
                key = f"{parsed['type']}_{parsed['version']}"
                names_by_key[key].append(entry['name'])

        return [
            {
                'type': 'duplicate_version',
                'version_key': key,
                'files': names,
                'count': len(names),
            }
            for key, names in names_by_key.items()
            if len(names) > 1
        ]

    @staticmethod
    def check_duplicate_content(files):
        """Report groups of byte-identical files uploaded close together.

        Files are grouped by MD5 checksum plus size; entries lacking either
        field are skipped. A group is reported once if any two consecutive
        creation times (sorted) are less than ``DUPLICATE_WINDOW_SECONDS``
        apart.

        Returns:
            list of ``{'type': 'duplicate_content', 'files', 'md5', 'size',
            'time_gap_seconds'}`` dicts. ``files`` lists every name in the
            group, not only the pair that triggered the report.
        """
        groups = defaultdict(list)
        for entry in files:
            if entry.get('md5Checksum') and entry.get('size'):
                groups[f"{entry['md5Checksum']}_{entry['size']}"].append(entry)

        duplicates = []
        for group in groups.values():
            if len(group) < 2:
                continue

            # 'Z' is rewritten to an explicit offset so fromisoformat accepts
            # the timestamp on Python versions that reject the 'Z' suffix.
            created = sorted(
                datetime.fromisoformat(f['createdTime'].replace('Z', '+00:00'))
                for f in group
            )

            for earlier, later in zip(created, created[1:]):
                gap_seconds = (later - earlier).total_seconds()
                if gap_seconds < HealthAnalyzer.DUPLICATE_WINDOW_SECONDS:
                    duplicates.append({
                        'type': 'duplicate_content',
                        'files': [f['name'] for f in group],
                        'md5': group[0]['md5Checksum'],
                        'size': group[0]['size'],
                        'time_gap_seconds': int(gap_seconds),
                    })
                    break  # one report per group

        return duplicates

    @staticmethod
    def check_missing_files(files, folder_name):
        """Report incomplete file sets and gaps in recording minor versions.

        Valid files are bucketed by major version (``v<major>``). Each bucket
        must contain a screenshot, a mesh and at least one recording. When a
        bucket has several recordings, the first gap between consecutive
        positive minor versions is reported; the step from ``v1`` (minor 0) to
        any minor version is never treated as a gap.

        Args:
            files: File dicts with a ``'parsed'`` entry.
            folder_name: Unused; kept so all checks share a call shape.

        Returns:
            list of ``{'type': 'missing_file', 'version', 'file_type'}`` and
            ``{'type': 'recording_gap', 'version', 'missing'}`` dicts.
        """
        versions = defaultdict(lambda: {'screenshot': None, 'mesh': None, 'recordings': []})

        # Categorize files by major version.
        for entry in files:
            parsed = entry.get('parsed')
            if not (parsed and parsed.get('valid')):
                continue
            bucket = versions[f"v{parsed['major']}"]
            if parsed['type'] == 'recording':
                bucket['recordings'].append(parsed)
            else:
                bucket[parsed['type']] = parsed

        issues = []
        for version, bucket in versions.items():
            # Minimum file requirements, reported in a fixed order.
            required = (
                ('screenshot', bucket['screenshot']),
                ('mesh', bucket['mesh']),
                ('recording', bucket['recordings']),
            )
            for file_type, present in required:
                if not present:
                    issues.append({'type': 'missing_file', 'version': version, 'file_type': file_type})

            # With fewer than two recordings there are no consecutive pairs,
            # so a single recording is always gap-free.
            minors = sorted(rec['minor'] for rec in bucket['recordings'])
            for current, following in zip(minors, minors[1:]):
                if current > 0 and following > current + 1:
                    issues.append({
                        'type': 'recording_gap',
                        'version': version,
                        'missing': f"{version}.{current + 1}",
                    })
                    break  # only report the first gap

        return issues

    @staticmethod
    def check_naming_convention(files, folder_name):
        """Report files whose name is malformed or names a different folder.

        Files without a ``'parsed'`` entry are skipped.

        Returns:
            list of ``{'type': 'naming_violation', 'filename', 'issue', ...}``
            dicts where ``issue`` is ``'invalid_format'`` or
            ``'folder_mismatch'`` (the latter also carries ``expected`` and
            ``actual`` prefixes).
        """
        violations = []

        for entry in files:
            parsed = entry.get('parsed')
            if not parsed:
                continue

            if not parsed.get('valid'):
                violations.append({
                    'type': 'naming_violation',
                    'filename': parsed['filename'],
                    'issue': 'invalid_format',
                })
            elif parsed['folder_name'] != folder_name:
                violations.append({
                    'type': 'naming_violation',
                    'filename': parsed['filename'],
                    'issue': 'folder_mismatch',
                    'expected': f"{folder_name}_...",
                    'actual': f"{parsed['folder_name']}_...",
                })

        return violations

# Cell-load banner: confirms HealthAnalyzer is defined and restates the
# recording-gap rules it applies.
print("🔬 Health Analyzers calibrated with FIXED recording gap logic")
print("✅ Single recordings (v1 only) are now considered healthy")
print("⚠️ Gaps only flagged when multiple recordings exist with missing sequences")

In [None]:
# Cell 4: Folder Scanner - Concurrent exploration with enhanced debugging

class FolderScanner:
    """Scan Drive folders concurrently and score each folder's health.

    ``scan_folder`` runs on worker threads; the completion counter and the
    per-folder status line are updated under ``progress_lock``.
    """

    _FOLDER_MIME_TYPE = 'application/vnd.google-apps.folder'
    # nextPageToken must be requested explicitly, otherwise the response never
    # carries it and pagination cannot continue.
    _LIST_FIELDS = 'nextPageToken, files(id,name,size,md5Checksum,createdTime,mimeType)'
    _PAGE_SIZE = 1000

    def __init__(self):
        self.progress_lock = Lock()
        self.completed_count = 0

    @staticmethod
    def _list_children(folder_id):
        """Return metadata for every non-trashed child of ``folder_id``.

        Follows ``nextPageToken`` until the listing is exhausted. A single
        ``files.list`` call returns one page only, so without this loop larger
        folders would be silently truncated and report false "missing file"
        issues.
        """
        drive = get_drive_service()
        query = f"'{folder_id}' in parents and trashed=false"

        children = []
        page_token = None
        while True:
            response = drive.files().list(
                q=query,
                fields=FolderScanner._LIST_FIELDS,
                pageSize=FolderScanner._PAGE_SIZE,
                pageToken=page_token,
            ).execute()
            children.extend(response.get('files', []))
            page_token = response.get('nextPageToken')
            if not page_token:
                return children

    def scan_folder(self, folder_id, folder_name):
        """Scan one folder and run every health check on its files.

        Args:
            folder_id: Drive ID of the folder to list.
            folder_name: Name used in progress output and the naming check.

        Returns:
            On success, a dict with ``folder_name``, ``folder_id``,
            ``health_score`` (0.0-1.0), ``total_files`` (valid deliverables),
            ``raw_file_count``, ``ignored_file_count``, ``issues`` and
            ``scan_time``. On any exception, a dict with ``folder_name``,
            ``folder_id``, ``health_score`` 0, ``error`` and ``scan_time``;
            the exception is not re-raised.
        """
        try:
            raw_files = self._list_children(folder_id)
            total_raw_files = len(raw_files)

            ignored_files = []
            deliverable_files = []
            files = []

            # Filter out sub-folders and ignored files, parse everything else.
            for raw_file in raw_files:
                if raw_file['mimeType'] == self._FOLDER_MIME_TYPE:
                    continue

                filename = raw_file['name']
                if not FileParser.is_deliverable(filename):
                    ignored_files.append(filename)
                    continue

                parsed = FileParser.parse_deliverable(filename)
                files.append({**raw_file, 'parsed': parsed})

                if parsed and parsed.get('valid'):
                    deliverable_files.append(filename)

            # Debug output for the first few folders. The counter is read
            # without the lock, so "first 3" is best-effort under concurrency.
            if self.completed_count < 3:
                print(f"🔍 DEBUG {folder_name}:")
                print(f"   Total files: {total_raw_files}")
                print(f"   Ignored files: {len(ignored_files)} {ignored_files[:3]}")
                print(f"   Valid deliverables: {len(deliverable_files)} {deliverable_files[:3]}")

            # Run health checks.
            issues = []
            issues.extend(HealthAnalyzer.check_duplicate_versions(files))
            issues.extend(HealthAnalyzer.check_duplicate_content(files))
            issues.extend(HealthAnalyzer.check_missing_files(files, folder_name))
            issues.extend(HealthAnalyzer.check_naming_convention(files, folder_name))

            # Health score: 1 minus issues per valid deliverable, floored at 0.
            # max(..., 1) in the divisor means a folder with no valid
            # deliverables scores 1.0 only when it also has no issues; with
            # issues (e.g. nothing but misnamed files) it scores 0.0.
            deliverable_count = len(deliverable_files)
            health_score = max(0.0, 1 - (len(issues) / max(deliverable_count, 1)))

            with self.progress_lock:
                self.completed_count += 1
                status = "✅" if not issues else "⚠️"
                print(f"{status} ({self.completed_count}) {folder_name}: {len(issues)} issues, score {health_score:.2f}, {deliverable_count} deliverables")

            return {
                'folder_name': folder_name,
                'folder_id': folder_id,
                'health_score': health_score,
                'total_files': deliverable_count,
                'raw_file_count': total_raw_files,
                'ignored_file_count': len(ignored_files),
                'issues': issues,
                'scan_time': datetime.now().isoformat()
            }

        except Exception as e:
            # Deliberate catch-all: one failing folder must not abort the
            # batch; the failure is reported in the result instead.
            with self.progress_lock:
                self.completed_count += 1
                print(f"❌ ({self.completed_count}) {folder_name}: Scan failed - {e}")

            return {
                'folder_name': folder_name,
                'folder_id': folder_id,
                'health_score': 0,
                'error': str(e),
                'scan_time': datetime.now().isoformat()
            }

    def batch_scan_folders(self, folder_data, max_workers=3):
        """Scan many folders in chunks using a small thread pool.

        Args:
            folder_data: Sequence of dicts with at least ``'id'`` and
                ``'name'`` keys.
            max_workers: Requested thread count; clamped to the range 1-3.

        Returns:
            list of ``scan_folder`` results, in completion order within each
            chunk (not input order).
        """
        self.completed_count = 0
        results = []

        chunk_size = 10
        # Cap at 3 workers as before, but never below 1: ThreadPoolExecutor
        # raises ValueError for max_workers <= 0.
        actual_workers = max(1, min(max_workers, 3))

        print(f"🚀 Scanning {len(folder_data)} folders with {actual_workers} workers")
        print(f"📊 Processing in chunks of {chunk_size}")

        start_time = time.time()

        # Process in chunks, with a pause between them.
        for chunk_start in range(0, len(folder_data), chunk_size):
            chunk_end = min(chunk_start + chunk_size, len(folder_data))
            chunk_folders = folder_data[chunk_start:chunk_end]

            print(f"\n🔄 Chunk {chunk_start//chunk_size + 1}: folders {chunk_start+1}-{chunk_end}")

            with ThreadPoolExecutor(max_workers=actual_workers) as executor:
                futures = [
                    executor.submit(self.scan_folder, folder['id'], folder['name'])
                    for folder in chunk_folders
                ]
                for future in as_completed(futures):
                    results.append(future.result())

            # Gentle pause between chunks (skipped after the last one).
            if chunk_end < len(folder_data):
                time.sleep(2)

        duration = time.time() - start_time
        successful = sum(1 for r in results if 'error' not in r)
        total_issues = sum(len(r.get('issues', [])) for r in results if 'error' not in r)

        print(f"\n🎯 Scan Complete: {successful}/{len(results)} successful in {duration:.1f}s")
        print(f"🔍 Total issues found across all folders: {total_issues}")
        return results

# Cell-load banner: confirms FolderScanner is defined.
print("📡 Folder Scanner ready with enhanced debugging for issue detection")

In [None]:
# Cell 5: Report Generator - Minimal, actionable intelligence with fixed CSV export

class ReportGenerator:
    """Summarise scan results and export them as CSV or JSON.

    All methods are static and operate on the list of result dicts produced by
    ``FolderScanner`` (results containing an ``'error'`` key are failed scans).
    """

    # Advice shown for each known issue type, in display order.
    _RECOMMENDATIONS = (
        ('duplicate_version', "Keep one file per type and version; remove or re-version the extras"),
        ('duplicate_content', "Remove duplicate files to reclaim storage space"),
        ('missing_file', "Complete missing file sets before staging"),
        ('recording_gap', "Fill video sequence gaps or remove orphaned versions"),
        ('naming_violation', "Standardize file naming to match folder conventions"),
    )

    _CSV_FIELDS = [
        'folder_name', 'health_score', 'total_files', 'issue_count',
        'scan_time', 'error', 'issue_type', 'issue_details'
    ]

    @staticmethod
    def generate_summary(health_results):
        """Aggregate per-folder results into one summary dict.

        Returns:
            dict with ``scan_timestamp``, ``total_folders``,
            ``successful_scans``, ``failed_scans``, ``average_health_score``,
            ``perfect_folders``, ``critical_folders_count``,
            ``critical_folders`` (up to 10 names, lowest score first),
            ``issue_breakdown`` and ``recommendations``. When no scan
            succeeded the same keys are present with zero/empty values and an
            additional ``'error'`` message, so callers can index the summary
            unconditionally.
        """
        total_folders = len(health_results)
        successful_scans = [r for r in health_results if 'error' not in r]

        summary = {
            'scan_timestamp': datetime.now().isoformat(),
            'total_folders': total_folders,
            'successful_scans': len(successful_scans),
            'failed_scans': total_folders - len(successful_scans),
            'average_health_score': 0.0,
            'perfect_folders': 0,
            'critical_folders_count': 0,
            'critical_folders': [],
            'issue_breakdown': {},
            'recommendations': [],
        }

        if not successful_scans:
            summary['error'] = 'No successful scans to analyze'
            return summary

        avg_health = sum(r['health_score'] for r in successful_scans) / len(successful_scans)

        # A folder is perfect only if it scored 1.0 AND reported no issues; a
        # score alone can be 1.0 for a folder without valid deliverables.
        perfect_folders = sum(
            1 for r in successful_scans
            if r['health_score'] == 1.0 and not r.get('issues')
        )

        # Worst first, so the truncated list really is the 10 worst folders.
        # sorted() is stable: equal scores keep their scan order.
        critical = sorted(
            (r for r in successful_scans if r['health_score'] < 0.5),
            key=lambda r: r['health_score'],
        )

        issue_counts = defaultdict(int)
        for result in successful_scans:
            for issue in result.get('issues', []):
                issue_counts[issue['type']] += 1

        summary.update({
            'average_health_score': round(avg_health, 3),
            'perfect_folders': perfect_folders,
            'critical_folders_count': len(critical),
            'critical_folders': [r['folder_name'] for r in critical[:10]],
            'issue_breakdown': dict(issue_counts),
            'recommendations': ReportGenerator._generate_recommendations(issue_counts),
        })
        return summary

    @staticmethod
    def _generate_recommendations(issue_counts):
        """Map issue counts to a list of remediation hints.

        Args:
            issue_counts: Mapping of issue type to occurrence count.

        Returns:
            One hint per known issue type with a positive count. The
            "All folders meet health standards" message is returned only when
            no issue of any type was counted.
        """
        recommendations = [
            advice
            for issue_type, advice in ReportGenerator._RECOMMENDATIONS
            if issue_counts.get(issue_type, 0) > 0
        ]
        if recommendations:
            return recommendations

        # Issues of a type with no specific advice must not be reported as
        # "healthy".
        if any(count > 0 for count in issue_counts.values()):
            return ["Review the reported issues manually"]

        return ["All folders meet health standards"]

    @staticmethod
    def export_csv(health_results, summary, filename=None):
        """Write results to a CSV file, one row per issue.

        Folders without issues (including failed scans) get a single row with
        empty ``issue_type``/``issue_details``. The header row is written even
        when there are no results, so the returned path always exists.

        Args:
            health_results: List of scan result dicts.
            summary: Unused; accepted so both exporters share a signature.
            filename: Output path; defaults to a timestamped name in the
                current directory.

        Returns:
            The path that was written.
        """
        if not filename:
            filename = f"health_check_{datetime.now().strftime('%Y%m%d_%H%M%S')}.csv"

        rows = []
        for result in health_results:
            issues = result.get('issues', [])
            base_row = {
                'folder_name': result['folder_name'],
                'health_score': result['health_score'],
                'total_files': result.get('total_files', 0),
                'issue_count': len(issues),
                'scan_time': result['scan_time'],
                'error': result.get('error', ''),
                'issue_type': '',
                'issue_details': '',
            }

            if not issues:
                rows.append(base_row)
                continue

            for issue in issues:
                rows.append({
                    **base_row,
                    'issue_type': issue['type'],
                    'issue_details': json.dumps(issue),
                })

        with open(filename, 'w', newline='', encoding='utf-8') as f:
            writer = csv.DictWriter(f, fieldnames=ReportGenerator._CSV_FIELDS)
            writer.writeheader()
            writer.writerows(rows)

        print(f"📄 CSV exported: {filename}")
        return filename

    @staticmethod
    def export_json(health_results, summary, filename=None):
        """Write the summary and raw results to a JSON file.

        Args:
            health_results: List of scan result dicts (stored under
                ``'results'``).
            summary: Summary dict (stored under ``'summary'``).
            filename: Output path; defaults to a timestamped name in the
                current directory.

        Returns:
            The path that was written.
        """
        if not filename:
            filename = f"health_check_{datetime.now().strftime('%Y%m%d_%H%M%S')}.json"

        export_data = {
            'summary': summary,
            'results': health_results
        }

        with open(filename, 'w', encoding='utf-8') as f:
            json.dump(export_data, f, indent=2, ensure_ascii=False)

        print(f"📋 JSON exported: {filename}")
        return filename

# Cell-load banner: confirms ReportGenerator is defined.
print("📊 Report Generator ready with FIXED CSV export field handling")

In [None]:
# Cell 6: CLI Orchestrator - User-guided health assessment

def get_stageable_folders():
    """Load folders whose sheet row is Status=complete and Review Status=passed.

    Reads the ``Tasks!A:AZ`` range of the configured spreadsheet, treats the
    first row as headers, and keeps rows whose 'Production Folder' cell yields
    a Drive folder ID.

    Returns:
        list of ``{'name', 'id', 'batch_id', 'agent'}`` dicts. Any exception is
        printed and results in an empty list.
    """
    try:
        response = (
            get_sheets_service()
            .spreadsheets()
            .values()
            .get(spreadsheetId=SHEET_ID, range='Tasks!A:AZ')
            .execute()
        )
        grid = response.get('values', [])
        if not grid:
            return []

        header_row, *body = grid

        # Rows may be ragged: widen the header with placeholder names up to the
        # longest row, then right-pad shorter rows with empty cells.
        width = max([len(header_row)] + [len(cells) for cells in body])
        header_row.extend(f'Column_{i}' for i in range(len(header_row), width))
        table = pd.DataFrame(
            [cells + [''] * (width - len(cells)) for cells in body],
            columns=header_row,
        )

        def _normalized(column):
            # Trimmed, lower-cased text so comparisons ignore case and padding.
            return table[column].fillna('').astype(str).str.strip().str.lower()

        # Filter criteria
        table['status_clean'] = _normalized('Status')
        table['review_clean'] = _normalized('Review Status')
        is_complete = table['status_clean'] == 'complete'
        is_passed = table['review_clean'] == 'passed'
        ready = table[is_complete & is_passed]

        folders = []
        for _, record in ready.iterrows():
            drive_id = extract_folder_id(record.get('Production Folder', ''))
            if not drive_id:
                continue
            folders.append({
                'name': record['Folder Name'],
                'id': drive_id,
                'batch_id': record.get('Batch ID', ''),
                'agent': record.get('Agent Email', ''),
            })
        return folders

    except Exception as exc:
        print(f"❌ Failed to load folders: {exc}")
        return []

def extract_folder_id(folder_link):
    """Extract a Drive folder ID from a share URL or a bare ID.

    Recognised forms (surrounding whitespace is ignored):
      * ``.../folders/<id>`` URLs,
      * ``...?id=<id>`` / ``...&id=<id>`` URLs,
      * a bare ID of 20 or more ``[A-Za-z0-9_-]`` characters.

    Args:
        folder_link: Cell value from the sheet; may be empty or ``None``.

    Returns:
        The folder ID string, or ``None`` when the input is empty, not a
        string, or contains no recognisable ID.
    """
    if not folder_link or not isinstance(folder_link, str):
        return None

    # Sheet cells frequently carry stray spaces or newlines.
    candidate = folder_link.strip()

    match = (
        re.search(r'folders/([A-Za-z0-9_-]+)', candidate)
        or re.search(r'[?&]id=([A-Za-z0-9_-]+)', candidate)
    )
    if match:
        return match.group(1)

    if re.fullmatch(r'[A-Za-z0-9_-]{20,}', candidate):
        return candidate

    return None

def run_health_check():
    """Run the interactive health-check workflow end to end.

    Steps: load eligible folders from the sheet, optionally filter by batch ID
    and agent email, ask for confirmation and an export format, scan the
    folders, print a summary, and export to CSV/JSON if requested.

    Returns:
        None. Returns early when no folders are found, none remain after
        filtering, or the user declines to proceed.

    Raises:
        Exception: anything raised during the workflow is printed and
        re-raised.
    """
    print("\n" + "="*60)
    print("🏥 3D DATA MANAGEMENT - HEALTH CHECK SYSTEM")
    print("="*60)

    try:
        # Step 1: Load eligible folders
        print("\n📋 Loading folders ready for health check...")
        folders = get_stageable_folders()

        if not folders:
            print("❌ No folders found matching criteria (status=complete, review=passed)")
            return

        print(f"📁 Found {len(folders)} folders eligible for health check")

        # Step 2: Apply filters
        batch_filter = input("\n🔍 Filter by batch ID (optional): ").strip() or None
        agent_filter = input("🔍 Filter by agent email contains (optional): ").strip() or None

        if batch_filter:
            folders = [f for f in folders if f['batch_id'] == batch_filter]
            print(f"📊 After batch filter: {len(folders)} folders")

        if agent_filter:
            folders = [f for f in folders if agent_filter.lower() in f['agent'].lower()]
            print(f"📊 After agent filter: {len(folders)} folders")

        if not folders:
            print("❌ No folders remain after filtering")
            return

        # Step 3: Preview and confirm
        print(f"\n📋 Folders to scan ({len(folders)}):")
        for i, folder in enumerate(folders[:5], 1):
            print(f"   {i}. {folder['name']} (Batch: {folder.get('batch_id', 'N/A')})")
        if len(folders) > 5:
            print(f"   ... and {len(folders) - 5} more")

        # strip() so that " y" or "y " is still accepted as confirmation.
        confirm = input(f"\n🚀 Start health check on {len(folders)} folders? [y/N]: ").strip().lower()
        if confirm != 'y':
            print("❌ Health check cancelled")
            return

        # Step 4: Choose export format
        print("\n📄 Export format:")
        print("   [1] Console only (fastest)")
        print("   [2] CSV file (spreadsheet-friendly)")
        print("   [3] JSON file (programmatic)")
        print("   [4] Both CSV and JSON")

        export_choice = input("Choose export format [1]: ").strip() or "1"
        if export_choice not in ("1", "2", "3", "4"):
            # Say so explicitly instead of silently skipping the export.
            print(f"⚠️ Unknown export choice '{export_choice}', using console only")
            export_choice = "1"

        # Step 5: Execute health scan
        print(f"\n🔬 Starting health assessment...")
        scanner = FolderScanner()
        results = scanner.batch_scan_folders(folders)

        # Step 6: Generate summary
        summary = ReportGenerator.generate_summary(results)
        # The summary carries 'error' when no scan succeeded; in that case the
        # metric keys may be absent, so do not index them.
        all_failed = 'error' in summary

        # Step 7: Display results
        print("\n" + "="*60)
        print("📋 HEALTH CHECK SUMMARY")
        print("="*60)

        if all_failed:
            print(f"❌ {summary['error']}")
            print(f"🔍 Total folders scanned: {len(results)}")
            print(f"❌ Failed scans: {len(results)}")
        else:
            print(f"🔍 Total folders scanned: {summary['total_folders']}")
            print(f"✅ Successful scans: {summary['successful_scans']}")
            print(f"❌ Failed scans: {summary['failed_scans']}")
            print(f"📊 Average health score: {summary['average_health_score']}")
            print(f"🏆 Perfect folders: {summary['perfect_folders']}")
            print(f"⚠️ Critical folders: {summary['critical_folders_count']}")

            if summary.get('issue_breakdown'):
                print("\n🔍 Issue Breakdown:")
                for issue_type, count in summary['issue_breakdown'].items():
                    print(f"   • {issue_type}: {count}")

            if summary.get('recommendations'):
                print("\n💡 Recommendations:")
                for rec in summary['recommendations']:
                    print(f"   • {rec}")

        # Step 8: Export results (also when every scan failed, so the
        # per-folder errors can be inspected).
        if export_choice in ["2", "4"]:
            ReportGenerator.export_csv(results, summary)

        if export_choice in ["3", "4"]:
            ReportGenerator.export_json(results, summary)

        if all_failed:
            print("\n🏁 Health check finished, but no folder could be scanned")
        else:
            print("\n🏁 Health check completed successfully!")

    except Exception as e:
        print(f"\n💥 Health check failed: {e}")
        raise

# Cell-load banner: confirms the CLI is defined and shows how to start it.
print("\n🎯 Health Check CLI ready for deployment")
print("📞 Execute: run_health_check()")

In [None]:
# Cell 7: Execute Health Check

# Launch the health assessment workflow.
# Interactive: prompts for filters, confirmation and export format via input().
run_health_check()